Port lib/opencode/rails/ source files; strip Rails.event/Rails.error
Eleven source files moved from ajent-rails:lib/opencode/rails/ to
opencode-rails:lib/opencode/ (flat layout — modules are Opencode::*, not
Opencode::Rails::*; matches opencode-ruby).
artifact.rb 63 LOC
exchange.rb 77 LOC
impostor.rb 48 LOC
message_artifacts.rb 133 LOC
sandbox_file.rb 81 LOC
sandbox.rb 71 LOC
session.rb 168 LOC
tool_display.rb 423 LOC
transform.rb 77 LOC
turn.rb 642 LOC
uploaded_files_prompt.rb 85 LOC
----
total 1,868 LOC
Surgical Rails strips:
exchange.rb:
Rails.event.notify(name, payload)
-> Opencode::Instrumentation.instrument(name, payload) { }
message_artifacts.rb (1 call), turn.rb (6 calls):
Rails.error.report(error, **opts)
-> Opencode::ErrorReporter.report(error, **opts)
Comments/docstrings referencing Rails.error.report / Rails.event left
in place — they document how to wire the host adapter.
ActiveSupport core_ext requires expanded in lib/opencode-rails.rb to
cover Numeric#seconds, Hash#deep_stringify_keys, String#squish/truncate,
String#demodulize. Bundle install + smoke load confirms all 12
gem-provided constants resolve cleanly.
This commit is contained in:
@@ -10,10 +10,12 @@
|
||||
|
||||
require "opencode-ruby"
|
||||
|
||||
require "active_support/core_ext/object/blank"
|
||||
require "active_support/core_ext/object/blank" # blank?, present?, presence
|
||||
require "active_support/core_ext/object/try"
|
||||
require "active_support/core_ext/hash/keys"
|
||||
require "active_support/core_ext/string/inflections"
|
||||
require "active_support/core_ext/hash/keys" # deep_stringify_keys, deep_symbolize_keys
|
||||
require "active_support/core_ext/string/inflections" # demodulize, underscore, camelize
|
||||
require "active_support/core_ext/string/filters" # squish, truncate
|
||||
require "active_support/core_ext/numeric/time" # 2.seconds, 5.minutes, etc.
|
||||
|
||||
require_relative "opencode/rails/version"
|
||||
require_relative "opencode/error_reporter"
|
||||
|
||||
63
lib/opencode/artifact.rb
Normal file
63
lib/opencode/artifact.rb
Normal file
@@ -0,0 +1,63 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module Opencode
|
||||
# A file the host wants to attach to an assistant message: filename,
|
||||
# content bytes, MIME type, and an optional trust-metadata hash.
|
||||
#
|
||||
# Artifacts come from two places in the substrate:
|
||||
#
|
||||
# - Opencode::Exchange.tool_artifacts — content lives inside a tool
|
||||
# call's input/metadata (write tool).
|
||||
# - Opencode::SandboxFile#as_artifact — identity conversion of a
|
||||
# sandbox-resident file (the default path for Blackline + Raven).
|
||||
#
|
||||
# Transforms also return Artifacts; that's why FlightResultsTransform
|
||||
# returns one with the host-rendered HTML + trust metadata stamp.
|
||||
#
|
||||
# An Artifact knows how to attach itself to a message, idempotently:
|
||||
# it consults `message.artifacts` to skip if its filename is already
|
||||
# there. The attaching verb belongs to the Artifact (the noun whose
|
||||
# state the verb consults), not to a separate Attacher class.
|
||||
class Artifact
|
||||
attr_reader :filename, :content, :content_type, :metadata
|
||||
|
||||
def initialize(filename:, content:, content_type:, metadata: {})
|
||||
@filename = filename
|
||||
@content = content
|
||||
@content_type = content_type
|
||||
@metadata = metadata
|
||||
end
|
||||
|
||||
# Idempotent attach. Returns true if newly attached, false if the
|
||||
# filename was already present on the message (so callers can count
|
||||
# what they actually persisted vs what was already there).
|
||||
def attach_to(message)
|
||||
return false if already_attached_to?(message)
|
||||
|
||||
message.artifacts.attach(
|
||||
io: StringIO.new(content),
|
||||
filename: filename,
|
||||
content_type: content_type,
|
||||
metadata: metadata
|
||||
)
|
||||
true
|
||||
end
|
||||
|
||||
def already_attached_to?(message)
|
||||
message.artifacts.any? { |a| a.filename.to_s == filename }
|
||||
end
|
||||
|
||||
def ==(other)
|
||||
other.is_a?(Artifact) &&
|
||||
other.filename == filename &&
|
||||
other.content == content &&
|
||||
other.content_type == content_type &&
|
||||
other.metadata == metadata
|
||||
end
|
||||
alias_method :eql?, :==
|
||||
|
||||
def hash
|
||||
[ filename, content, content_type, metadata ].hash
|
||||
end
|
||||
end
|
||||
end
|
||||
79
lib/opencode/exchange.rb
Normal file
79
lib/opencode/exchange.rb
Normal file
@@ -0,0 +1,79 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module Opencode
|
||||
# The OpenCode messages produced by a single turn (the array returned
|
||||
# by GET /session/:id/message and consumed by Opencode::Turn's
|
||||
# recovery + finalization paths).
|
||||
#
|
||||
# First-class noun rather than a bare array because:
|
||||
#
|
||||
# - It owns the "give me the tool-produced artifacts" question, so
|
||||
# callers don't reach into ResponseParser for that. The parser is
|
||||
# about wire-shape extraction; the Exchange is about the domain
|
||||
# concept of "what files came out of this turn."
|
||||
#
|
||||
# - It owns the "opencode.apply_patch.artifacts_dropped" event
|
||||
# emission, keeping ResponseParser a pure module (no instrumentation
|
||||
# side effects). Pure functions stay pure. The event flows through
|
||||
# Opencode::Instrumentation, so hosts wire AS::Notifications /
|
||||
# Rails.event / OpenTelemetry / etc. via the adapter.
|
||||
class Exchange
|
||||
def initialize(messages)
|
||||
@messages = Array(messages)
|
||||
end
|
||||
|
||||
# Returns Opencode::Artifact values for every file produced by a
|
||||
# tool call in this exchange (currently the `write` tool; apply_patch
|
||||
# is acknowledged-but-empty in v1.15+, see ResponseParser).
|
||||
#
|
||||
# `exclude:` filters by destination filename — used by the substrate
|
||||
# to keep tool-extracted Artifacts from racing per-message transforms
|
||||
# that own the same filenames.
|
||||
def tool_artifacts(exclude: [])
|
||||
excluded = Set.new(exclude)
|
||||
raw = Opencode::ResponseParser.extract_artifacts_from_messages(@messages)
|
||||
notify_drops(raw)
|
||||
|
||||
raw.filter_map do |file_data|
|
||||
next if excluded.include?(file_data[:filename])
|
||||
|
||||
Artifact.new(
|
||||
filename: file_data[:filename],
|
||||
content: file_data[:content],
|
||||
content_type: file_data[:content_type]
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
# ResponseParser annotates dropped apply_patch parts on the messages
|
||||
# it processes (since v1.15+ wire shape carries no inline post-write
|
||||
# content). The notify lives here, not in the parser, so the parser
|
||||
# stays a pure function. Operators see one event per assistant
|
||||
# message that contained an apply_patch tool call.
|
||||
def notify_drops(_)
|
||||
@messages.each do |message|
|
||||
next unless message.dig(:info, :role) == "assistant"
|
||||
|
||||
parts = message[:parts] || []
|
||||
parts.each do |part|
|
||||
next unless part[:type] == "tool" && part[:tool] == "apply_patch"
|
||||
next unless part.dig(:state, :status) == "completed"
|
||||
|
||||
file_entries = part.dig(:state, :metadata, :files) || []
|
||||
eligible = file_entries.reject { |e| e[:type] == "delete" }
|
||||
next if eligible.empty?
|
||||
|
||||
Opencode::Instrumentation.instrument("opencode.apply_patch.artifacts_dropped",
|
||||
file_count: eligible.size,
|
||||
relative_paths: eligible.filter_map { |e| e[:relativePath] }.first(5),
|
||||
message_id: part[:messageID],
|
||||
session_id: part[:sessionID],
|
||||
reason: "apply_patch v1.15+ metadata does not include post-write file content; " \
|
||||
"extraction requires sandbox-read which is not yet wired into ResponseParser") { }
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
48
lib/opencode/impostor.rb
Normal file
48
lib/opencode/impostor.rb
Normal file
@@ -0,0 +1,48 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module Opencode
|
||||
# An ActiveStorage::Attachment on an assistant message that uses a
|
||||
# trusted Transform's destination filename but fails the transform's
|
||||
# `#trusted?` predicate. In plain English: a same-named attachment
|
||||
# that wasn't produced by the host-trusted renderer pipeline.
|
||||
#
|
||||
# Where impostors come from:
|
||||
#
|
||||
# 1. A previous job retry attached the destination filename via the
|
||||
# tool-extracted path (the agent wrote a file with that name and
|
||||
# it landed before the trusted render did).
|
||||
# 2. A pre-substrate code path persisted an agent-authored HTML file
|
||||
# with the destination filename (the historical AIGL exploit
|
||||
# surface that motivated the trust boundary in the first place).
|
||||
# 3. A previous transform version stamped different metadata and the
|
||||
# trust check now correctly rejects it.
|
||||
#
|
||||
# The Impostor knows how to remove itself. The orchestrator just asks
|
||||
# "are there impostors of this transform on this message?" and tells
|
||||
# each one to `purge!`. Purging is a verb that belongs to the
|
||||
# impostor — it's the noun whose state the purge mutates.
|
||||
class Impostor
|
||||
# Finds impostors of `transform` on `message` — attachments whose
|
||||
# filename matches the transform's destination but whose contents
|
||||
# fail the transform's trust predicate.
|
||||
def self.for(message:, transform:)
|
||||
target = transform.destination_filename
|
||||
message.artifacts
|
||||
.select { |a| a.filename.to_s == target }
|
||||
.reject { |a| transform.trusted?(a) }
|
||||
.map { |a| new(attachment: a) }
|
||||
end
|
||||
|
||||
def initialize(attachment:)
|
||||
@attachment = attachment
|
||||
end
|
||||
|
||||
def purge!
|
||||
@attachment.purge
|
||||
end
|
||||
|
||||
def filename
|
||||
@attachment.filename.to_s
|
||||
end
|
||||
end
|
||||
end
|
||||
133
lib/opencode/message_artifacts.rb
Normal file
133
lib/opencode/message_artifacts.rb
Normal file
@@ -0,0 +1,133 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module Opencode
|
||||
# The collection of new artifacts attached to an assistant message as
|
||||
# a result of one turn. The orchestrator that used to live in
|
||||
# Opencode::ArtifactCollector now lives on this collection — instead
|
||||
# of a "Collector" verb-class, the collection knows how to populate
|
||||
# itself from sources (tool exchange, sandbox) and how to attach.
|
||||
#
|
||||
# Two-line usage:
|
||||
#
|
||||
# Opencode::MessageArtifacts.new(message: m, feature: "blackline", transforms: [])
|
||||
# .attach_from(exchange: exchange, sandbox: sandbox)
|
||||
#
|
||||
# All four phases (tool extract, transform routing, impostor purge,
|
||||
# default sandbox attach) live as small named methods. The substrate
|
||||
# never special-cases a product — `:feature` is only for error-report
|
||||
# context, and `:transforms` (default []) is per-product policy.
|
||||
#
|
||||
# Idempotent under retry: `Opencode::Artifact#attach_to` already
|
||||
# skips when the filename is present on the message, and the
|
||||
# tool-extracted phase excludes filenames the transforms own.
|
||||
class MessageArtifacts
|
||||
MAX_SANDBOX_ARTIFACTS = 20
|
||||
|
||||
# default_attach values:
|
||||
# :all — Blackline/Raven default. Every safe sandbox file that
|
||||
# no transform claims falls through to identity attach.
|
||||
# The agent's `write` outputs are final document bytes the
|
||||
# host serves back unchanged.
|
||||
# :none — AIGL. The agent's sandbox is full of internal working
|
||||
# scratch (notes.md, map.md, timeline.md) plus the one
|
||||
# file the transform claims (flight-results.json). Only
|
||||
# transform-claimed files attach; everything else stays
|
||||
# agent-internal.
|
||||
def initialize(message:, feature:, transforms: [], default_attach: :all,
|
||||
max_sandbox_files: MAX_SANDBOX_ARTIFACTS)
|
||||
@message = message
|
||||
@feature = feature
|
||||
@transforms = transforms
|
||||
@default_attach = default_attach
|
||||
@max_sandbox_files = max_sandbox_files
|
||||
end
|
||||
|
||||
# Drains both sources and attaches. Returns self so callers can
|
||||
# chain off it if they want to count what landed.
|
||||
def attach_from(exchange: nil, sandbox: nil, cutoff: nil, upload_echo: [])
|
||||
attach_from_exchange(exchange) if exchange
|
||||
attach_from_sandbox(sandbox, cutoff: cutoff, upload_echo: upload_echo) if sandbox
|
||||
self
|
||||
rescue StandardError => e
|
||||
report(e, action: "attach_artifacts")
|
||||
self
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
attr_reader :message, :feature, :transforms, :max_sandbox_files, :default_attach
|
||||
|
||||
# Tool-produced artifacts (write tool's input content). Skip any
|
||||
# filename a transform owns — those land via the sandbox path so the
|
||||
# transform's trust pipeline (render + metadata stamp) is the only
|
||||
# way the bytes reach the user.
|
||||
def attach_from_exchange(exchange)
|
||||
exchange.tool_artifacts(exclude: transform_owned_filenames).each do |artifact|
|
||||
artifact.attach_to(message)
|
||||
end
|
||||
rescue StandardError => e
|
||||
report(e, action: "attach_from_exchange")
|
||||
end
|
||||
|
||||
def attach_from_sandbox(sandbox, cutoff:, upload_echo:)
|
||||
return unless sandbox.exists?
|
||||
|
||||
uploaded = Set.new(upload_echo)
|
||||
attached = 0
|
||||
|
||||
sandbox.files(after: cutoff).each do |file|
|
||||
break if attached >= max_sandbox_files
|
||||
next if uploaded.include?(file.basename)
|
||||
|
||||
if (transform = transforms.find { |t| t.applies_to?(file) })
|
||||
attached += 1 if apply_transform(transform, file)
|
||||
elsif default_attach == :all
|
||||
# Default identity path. Blackline/Raven default — every safe
|
||||
# sandbox file that no transform claims attaches as-is. AIGL
|
||||
# passes default_attach: :none so non-transform files (the
|
||||
# agent's notes.md / map.md / timeline.md scratch) don't
|
||||
# auto-attach.
|
||||
attached += 1 if file.as_artifact.attach_to(message)
|
||||
end
|
||||
end
|
||||
rescue StandardError => e
|
||||
report(e, action: "attach_from_sandbox")
|
||||
end
|
||||
|
||||
# Returns true if a fresh trusted artifact was attached. Falsy on
|
||||
# already-trusted-attached, transform-raised, or duplicate-filename.
|
||||
def apply_transform(transform, file)
|
||||
if transform.purge_impostors?
|
||||
purged = Impostor.for(message: message, transform: transform)
|
||||
if purged.any?
|
||||
purged.each(&:purge!)
|
||||
# ActiveStorage purges the attachment + blob, but `message.artifacts`
|
||||
# holds the pre-purge collection in memory. Without resetting,
|
||||
# Artifact#already_attached_to? still sees the (just-purged) row
|
||||
# and shortcuts the trusted attach below.
|
||||
message.artifacts.reset
|
||||
end
|
||||
end
|
||||
return false if trusted_present?(transform)
|
||||
|
||||
artifact = transform.render(file)
|
||||
artifact.attach_to(message)
|
||||
rescue Transform::Error => e
|
||||
report(e, action: "transform_#{transform.class.name.demodulize}")
|
||||
false
|
||||
end
|
||||
|
||||
def trusted_present?(transform)
|
||||
message.artifacts.any? { |a| transform.trusted?(a) }
|
||||
end
|
||||
|
||||
def transform_owned_filenames
|
||||
transforms.flat_map(&:owned_filenames)
|
||||
end
|
||||
|
||||
def report(error, action:)
|
||||
Opencode::ErrorReporter.report(error, handled: true, severity: :warning,
|
||||
context: { feature: feature, action: action, message_id: message.id })
|
||||
end
|
||||
end
|
||||
end
|
||||
71
lib/opencode/sandbox.rb
Normal file
71
lib/opencode/sandbox.rb
Normal file
@@ -0,0 +1,71 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module Opencode
|
||||
# The per-user (or per-trip) sandbox directory the agent's container
|
||||
# writes into. A first-class noun rather than a path-string with
|
||||
# primitives sprinkled around the codebase: the Sandbox knows its
|
||||
# own path, knows how to walk itself, knows what "fresh enough" means
|
||||
# for a given turn, and yields SandboxFile values that carry their
|
||||
# own safety predicate.
|
||||
#
|
||||
# Used by Opencode::MessageArtifacts. Construct one with the path,
|
||||
# then ask it for `files(after:)` where `after` is the user message's
|
||||
# created_at time (minus CUTOFF_SLACK). Files older than the cutoff
|
||||
# are stale leftovers from a previous turn — never attached.
|
||||
class Sandbox
|
||||
# Two-second slack absorbs clock skew between the Rails app and the
|
||||
# per-user OpenCode container. Without it, a file written by the
|
||||
# container in the same wall-clock second as the user message could
|
||||
# be (mtime < created_at) and get rejected.
|
||||
CUTOFF_SLACK = 2.seconds
|
||||
|
||||
attr_reader :path
|
||||
|
||||
def initialize(path:, max_file_bytes: Opencode::ResponseParser::MAX_ARTIFACT_SIZE)
|
||||
@path = path
|
||||
@max_file_bytes = max_file_bytes
|
||||
end
|
||||
|
||||
def exists?
|
||||
path.present? && Dir.exist?(path)
|
||||
end
|
||||
|
||||
# Yields SandboxFile values for every file in the sandbox that
|
||||
# passes its own #safe? predicate AND was modified after the cutoff.
|
||||
# When `after:` is nil (callers without a user_message handle, e.g.
|
||||
# AIGL on certain finalize paths), no mtime filter is applied —
|
||||
# only safety + filetype.
|
||||
def files(after: nil)
|
||||
return enum_for(:files, after: after) unless block_given?
|
||||
return unless exists?
|
||||
|
||||
cutoff = after && (after.to_time - CUTOFF_SLACK)
|
||||
|
||||
Dir.glob(File.join(path, "*")).each do |entry|
|
||||
next unless File.file?(entry)
|
||||
next if cutoff && File.mtime(entry) < cutoff
|
||||
|
||||
file = SandboxFile.new(
|
||||
path: entry,
|
||||
sandbox_prefix: prefix,
|
||||
max_bytes: @max_file_bytes
|
||||
)
|
||||
next unless file.safe?
|
||||
|
||||
yield file
|
||||
end
|
||||
end
|
||||
|
||||
def file(basename, after: nil)
|
||||
files(after: after).find { |f| f.basename == basename }
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
# Separator-terminated prefix so /sandbox-1 doesn't false-positive
|
||||
# on /sandbox-10/foo when SandboxFile checks realpath containment.
|
||||
def prefix
|
||||
@prefix ||= File.join(path, "")
|
||||
end
|
||||
end
|
||||
end
|
||||
81
lib/opencode/sandbox_file.rb
Normal file
81
lib/opencode/sandbox_file.rb
Normal file
@@ -0,0 +1,81 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
require "pathname"
|
||||
|
||||
module Opencode
|
||||
# One file living inside an Opencode::Sandbox.
|
||||
#
|
||||
# Carries the safety predicate inline (#safe?) so the orchestrator
|
||||
# doesn't have to know what "safe" means — symlink, realpath inside
|
||||
# the sandbox, size cap. Carries the default identity conversion to
|
||||
# Artifact (#as_artifact) so non-transform code can attach a sandbox
|
||||
# file as-is without re-implementing the marcel + StringIO ceremony.
|
||||
#
|
||||
# mtime-cutoff freshness lives on Opencode::Sandbox#files(after:),
|
||||
# not here — the file doesn't know which turn opened "after." That's
|
||||
# a property of the scan, not a property of the file.
|
||||
class SandboxFile
|
||||
attr_reader :path, :sandbox_prefix
|
||||
|
||||
def initialize(path:, sandbox_prefix:, max_bytes:)
|
||||
@path = path
|
||||
@sandbox_prefix = sandbox_prefix
|
||||
@max_bytes = max_bytes
|
||||
end
|
||||
|
||||
def basename
|
||||
File.basename(path)
|
||||
end
|
||||
|
||||
def size
|
||||
File.size(path)
|
||||
end
|
||||
|
||||
def mtime
|
||||
File.mtime(path)
|
||||
end
|
||||
|
||||
def content
|
||||
File.read(path)
|
||||
end
|
||||
|
||||
def content_type
|
||||
Marcel::MimeType.for(name: basename)
|
||||
end
|
||||
|
||||
# Defense-in-depth on individual file paths the scan yielded:
|
||||
#
|
||||
# - Reject symlinks (no follow-the-link escape).
|
||||
# - The resolved realpath of the path must lie inside the sandbox
|
||||
# with a separator-terminated prefix so /sandbox-1 doesn't false-
|
||||
# positive on /sandbox-10/foo.
|
||||
# - Reject anything over the size cap (default
|
||||
# Opencode::ResponseParser::MAX_ARTIFACT_SIZE = 10 MB).
|
||||
#
|
||||
# The Sandbox scan filters non-files (directories, FIFOs) before
|
||||
# yielding, so we don't re-check #file? here.
|
||||
def safe?
|
||||
return false if File.symlink?(path)
|
||||
return false unless Pathname.new(path).realpath.to_s.start_with?(sandbox_prefix)
|
||||
return false if size > @max_bytes
|
||||
|
||||
true
|
||||
rescue Errno::ENOENT
|
||||
# Concurrent deletion between scan-yield and safety-check — treat
|
||||
# as unsafe so the orchestrator skips rather than crashing.
|
||||
false
|
||||
end
|
||||
|
||||
# Identity conversion: this sandbox file → an Artifact carrying the
|
||||
# file's own bytes. Used by the substrate's default (non-transform)
|
||||
# path for Blackline + Raven, whose agents write document bytes
|
||||
# directly to the sandbox and expect them attached unchanged.
|
||||
def as_artifact
|
||||
Artifact.new(
|
||||
filename: basename,
|
||||
content: content,
|
||||
content_type: content_type
|
||||
)
|
||||
end
|
||||
end
|
||||
end
|
||||
168
lib/opencode/session.rb
Normal file
168
lib/opencode/session.rb
Normal file
@@ -0,0 +1,168 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module Opencode
|
||||
# Owns the lifecycle of an OpenCode session against a domain record.
|
||||
#
|
||||
# Three near-identical implementations of this lifecycle existed on
|
||||
# Blackline::Conversation, Raven::Conversation, and AIGL::Trip. Each
|
||||
# had subtle differences (Blackline and Raven didn't take a row-level
|
||||
# lock; AIGL did). Sandi Metz flagged the shotgun surgery in the
|
||||
# architectural review — a change to the lifecycle had to be made in
|
||||
# three places that looked alike but disagreed on locking. This PORO
|
||||
# is the consolidated role.
|
||||
#
|
||||
# Usage:
|
||||
#
|
||||
# service = Opencode::Session.new(
|
||||
# conversation,
|
||||
# permissions_for: ->(record) { permission_rules_for(record) },
|
||||
# on_error: ->(e, **opts) { Rails.error.report(e, **opts) } # optional
|
||||
# )
|
||||
# session_id = service.ensure!(client) # idempotent create-or-resolve
|
||||
# service.recreate!(client) # always create fresh
|
||||
# service.abort!(client) # best-effort upstream abort
|
||||
#
|
||||
# The permissions_for: callable receives the record at mint! time and
|
||||
# returns the permissions array for client.create_session. Product-
|
||||
# specific scoping (e.g. AIGL's workspace_key/trip_id branching) lives
|
||||
# in the caller's lambda, not in this class — that keeps Session free
|
||||
# of any reference to permission-building helpers and preserves the
|
||||
# rails-tier -> containers-tier boundary the design doc locks in.
|
||||
#
|
||||
# The on_error: callable is invoked when abort! catches an
|
||||
# Opencode::Error during teardown. Callers wire their own observability
|
||||
# (Rails.error.report, OpenTelemetry, Sentry, custom logging) here.
|
||||
# Defaults to nil — silently swallowing teardown errors, which matches
|
||||
# the pre-inversion behaviour. Substrate has no opinion about how the
|
||||
# host reports.
|
||||
#
|
||||
# The record must respond to:
|
||||
# - #title (String) — passed as session title
|
||||
# - #opencode_session_id / #opencode_session_id= — string column
|
||||
# - #with_lock(&block) — ActiveRecord row-level lock
|
||||
# - #update! / #reload — standard ActiveRecord
|
||||
# - #id — for error reporting context
|
||||
class Session
|
||||
def initialize(record, permissions_for:, on_error: nil)
|
||||
@record = record
|
||||
@permissions_for = permissions_for
|
||||
@on_error = on_error
|
||||
@just_created = false
|
||||
end
|
||||
|
||||
# True iff the most recent ensure!/recreate! actually created a
|
||||
# fresh upstream session (vs resolving to an existing id).
|
||||
#
|
||||
# Exists so consumers can distinguish "we just minted this" from
|
||||
# "we found an existing one" without poking at ActiveRecord dirty
|
||||
# tracking on the record. The right object to ask is the object
|
||||
# that did the work; that's this one.
|
||||
def just_created?
|
||||
@just_created
|
||||
end
|
||||
|
||||
# Returns the session id for the record, creating an OpenCode session
|
||||
# if none exists yet. Idempotent. Race-safe via row-level locking and
|
||||
# double-check shortcuts that avoid the lock entirely when an id is
|
||||
# already persisted.
|
||||
def ensure!(client)
|
||||
@just_created = false
|
||||
return @record.opencode_session_id if @record.opencode_session_id.present?
|
||||
|
||||
@record.with_lock do
|
||||
# Double-check inside the lock: another worker may have set the
|
||||
# id between our first read and acquiring the lock.
|
||||
return @record.opencode_session_id if @record.opencode_session_id.present?
|
||||
|
||||
mint!(client)
|
||||
end
|
||||
rescue ActiveRecord::RecordNotUnique
|
||||
# Two workers raced past the unique-index gate. The loser reloads
|
||||
# to pick up the winner's id. The winner is the one that
|
||||
# just_created?; the loser sees @just_created = false.
|
||||
@record.reload
|
||||
@record.opencode_session_id
|
||||
end
|
||||
|
||||
# Always creates a fresh session and overwrites the persisted id.
|
||||
# Used by Opencode::Turn recovery when the upstream session has gone
|
||||
# stale (StaleSessionError / SessionNotFoundError).
|
||||
#
|
||||
# Race semantics: two concurrent recreate! callers serialize through
|
||||
# with_lock; the first mints, the second observes the freshly-minted
|
||||
# id (different from its own pre-lock snapshot) and returns that
|
||||
# rather than minting again. Both callers converge on one upstream
|
||||
# session — no orphan leak.
|
||||
def recreate!(client)
|
||||
@just_created = false
|
||||
pre_lock_id = @record.opencode_session_id
|
||||
|
||||
@record.with_lock do
|
||||
# If another recreate! caller minted while we were waiting for
|
||||
# the lock, the id changed under us. Treat their fresh mint as
|
||||
# fresh enough for us too — recreate! returns "a session that's
|
||||
# newer than what I saw at method entry", not "specifically my
|
||||
# own mint".
|
||||
current_id = @record.opencode_session_id
|
||||
if current_id.present? && current_id != pre_lock_id
|
||||
return current_id
|
||||
end
|
||||
|
||||
mint!(client)
|
||||
end
|
||||
end
|
||||
|
||||
# Best-effort upstream abort. Swallows Opencode::Error so callers
|
||||
# never have to wrap this in a rescue — aborts run inside cleanup
|
||||
# paths where re-raising would mask the real cause of teardown.
|
||||
def abort!(client)
|
||||
return unless @record.opencode_session_id.present?
|
||||
|
||||
client.abort_session(@record.opencode_session_id)
|
||||
rescue Opencode::Error => e
|
||||
@on_error&.call(e, action: "abort_session", record_id: @record.id)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
# The atomic create-and-persist unit shared by ensure! and recreate!.
|
||||
#
|
||||
# One operation, two failure modes:
|
||||
# - client.create_session raises -> nothing to clean up, re-raise
|
||||
# - update! raises RecordInvalid -> upstream session exists,
|
||||
# delete it before re-raising
|
||||
# so we never leak orphans
|
||||
#
|
||||
# Sets @just_created = true on success; callers reset to false at
|
||||
# method entry so a no-op call (existing id) reports false correctly.
|
||||
def mint!(client)
|
||||
session_id = nil
|
||||
|
||||
begin
|
||||
result = client.create_session(title: @record.title, permissions: @permissions_for.call(@record))
|
||||
session_id = extract_session_id(result)
|
||||
@record.update!(opencode_session_id: session_id)
|
||||
@just_created = true
|
||||
session_id
|
||||
rescue ActiveRecord::RecordInvalid
|
||||
safely_delete(client, session_id) if session_id
|
||||
raise
|
||||
end
|
||||
end
|
||||
|
||||
# OpenCode HTTP responses use symbol keys when parsed via JSON.parse
|
||||
# with symbolize_names: true (Opencode::Client) but mocks/stubs in
|
||||
# tests often produce string-keyed hashes. Accept both.
|
||||
def extract_session_id(result)
|
||||
result[:id] || result["id"]
|
||||
end
|
||||
|
||||
def safely_delete(client, session_id)
|
||||
client.delete_session(session_id)
|
||||
rescue Opencode::Error
|
||||
# Best-effort cleanup; the orphan may be gone already or the
|
||||
# upstream is unavailable. Either way, we already have a real
|
||||
# error to raise.
|
||||
end
|
||||
end
|
||||
end
|
||||
423
lib/opencode/tool_display.rb
Normal file
423
lib/opencode/tool_display.rb
Normal file
@@ -0,0 +1,423 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module Opencode
|
||||
# A value object that wraps an OpenCode tool part (the shape produced by
|
||||
# Opencode::Reply from `message.part.updated` events) and exposes the
|
||||
# information a renderer needs — canonical tool name, human labels,
|
||||
# target (filepath / pattern / url / command), semantic accessors for
|
||||
# rich content (unified diffs, todo lists, bash output, etc.), and an
|
||||
# icon identifier.
|
||||
#
|
||||
# Pure Ruby over ActiveSupport. Lives in the shared Opencode namespace
|
||||
# so Blackline views, AIGL views, and any future OpenCode-backed
|
||||
# feature can render tool calls consistently.
|
||||
#
|
||||
# ## Data shape (Opencode::Reply writes this into `parts_json`)
|
||||
#
|
||||
# {
|
||||
# "type" => "tool",
|
||||
# "tool" => "read" | "edit" | "exa_web_search_exa" | ...,
|
||||
# "status" => "pending" | "running" | "completed" | "error",
|
||||
# "input" => { "filePath" => ..., "content" => ..., ... },
|
||||
# "title" => "..." (optional, from tool-result)
|
||||
# "error" => "..." (only when status == "error")
|
||||
# "metadata" => { (optional, from tool-result)
|
||||
# "diff" => "...unified diff text...",
|
||||
# "diagnostics" => { filePath => [LSP diagnostics] },
|
||||
# "preview" => "...file preview...",
|
||||
# "matches" => Integer,
|
||||
# "count" => Integer,
|
||||
# "output" => "...bash stdout...",
|
||||
# "stdout" => "...bash stdout (legacy key)...",
|
||||
# "description" => "...bash description...",
|
||||
# "error" => truthy when the tool ran but returned an error,
|
||||
# },
|
||||
# "output" => "...raw tool output string..."
|
||||
# }
|
||||
#
|
||||
# ## MCP prefix handling
|
||||
#
|
||||
# OpenCode's MCP adapter prefixes tools with the server name: Exa's
|
||||
# `web_search_exa` becomes `exa_web_search_exa` on the wire (double
|
||||
# suffix because Exa also names the tool with an `_exa` suffix).
|
||||
# `#canonical_tool` strips known MCP prefixes so switching logic can
|
||||
# treat `exa_web_search_exa` and `web_search_exa` as the same tool.
|
||||
#
|
||||
# ## Adding a new tool
|
||||
#
|
||||
# Add one row to the `TOOLS` table below — every derived concern (kind,
|
||||
# gerund, icon, past-tense verb, KNOWN membership) is computed from it.
|
||||
#
|
||||
# ## Example
|
||||
#
|
||||
# display = Opencode::ToolDisplay.new(part)
|
||||
# display.canonical_tool # => "edit"
|
||||
# display.kind # => "Edit"
|
||||
# display.gerund # => "Editing"
|
||||
# display.target # => "app/models/user.rb"
|
||||
# display.diff # => "@@ -1,3 +1,4 @@\n..."
|
||||
# display.icon # => :pencil_square
|
||||
#
|
||||
class ToolDisplay
|
||||
# The single source of truth for every tool we render with dedicated
|
||||
# affordances. One row per tool, five columns:
|
||||
#
|
||||
# :kind — noun label ("Read", "Web search")
|
||||
# :gerund — present-progressive phrase ("Reading", "Searching the web")
|
||||
# :icon — abstract icon name the view layer maps to an SVG
|
||||
# :past — past-tense verb ("Read", "Searched", "Wrote")
|
||||
#
|
||||
# Anything not in this table falls back to generic rendering (humanize
|
||||
# the canonical name + OpenCode's title).
|
||||
TOOLS = {
|
||||
"read" => { kind: "Read", gerund: "Reading", icon: :document, past: "Read" },
|
||||
"write" => { kind: "Write", gerund: "Writing", icon: :document_plus, past: "Wrote" },
|
||||
"edit" => { kind: "Edit", gerund: "Editing", icon: :pencil_square, past: "Edited" },
|
||||
"multiedit" => { kind: "Edit", gerund: "Editing", icon: :pencil_square, past: "Edited" },
|
||||
"apply_patch" => { kind: "Patch", gerund: "Applying changes", icon: :pencil_square, past: "Applied changes to" },
|
||||
"bash" => { kind: "Bash", gerund: "Running command", icon: :command_line, past: "Ran" },
|
||||
"grep" => { kind: "Grep", gerund: "Searching files", icon: :document_magnifying_glass, past: "Searched for" },
|
||||
"glob" => { kind: "Glob", gerund: "Searching files", icon: :magnifying_glass, past: "Searched for" },
|
||||
"list" => { kind: "LS", gerund: "Listing files", icon: :rectangle_stack, past: "Listed" },
|
||||
"ls" => { kind: "LS", gerund: "Listing files", icon: :rectangle_stack, past: "Listed" },
|
||||
"webfetch" => { kind: "Fetch", gerund: "Reading web page", icon: :globe, past: "Fetched" },
|
||||
"websearch" => { kind: "Web search", gerund: "Searching the web", icon: :globe, past: "Searched" },
|
||||
"codesearch" => { kind: "Code search", gerund: "Searching code", icon: :magnifying_glass, past: "Searched code for" },
|
||||
"web_search_exa" => { kind: "Web search", gerund: "Searching the web", icon: :globe, past: "Searched" },
|
||||
# @zhafron/mcp-web-search exposes two tools: search_web (SearXNG meta-
|
||||
# search) and fetch_url (Mozilla Readability page fetch). OpenCode
|
||||
# prefixes them with the MCP server name from config.json
|
||||
# (`local-web-search_`), which `canonical_tool` strips before lookup.
|
||||
"search_web" => { kind: "Web search", gerund: "Searching the web", icon: :globe, past: "Searched" },
|
||||
"fetch_url" => { kind: "Fetch", gerund: "Reading web page", icon: :globe, past: "Fetched" },
|
||||
"get_code_context_exa" => { kind: "Code lookup", gerund: "Looking up code", icon: :document_magnifying_glass, past: "Looked up" },
|
||||
"company_research_exa" => { kind: "Company research", gerund: "Researching company", icon: :globe, past: "Researched" },
|
||||
"todowrite" => { kind: "Plan", gerund: "Planning", icon: :queue_list, past: "Updated plan" },
|
||||
"todoread" => { kind: "Plan", gerund: "Reading plan", icon: :queue_list, past: "Read plan" },
|
||||
"task" => { kind: "Task", gerund: "Researching", icon: :robot, past: "Ran subtask" },
|
||||
"skill" => { kind: "Skill", gerund: "Loading skill", icon: :sparkles, past: "Loaded skill" }
|
||||
}.freeze
|
||||
|
||||
KNOWN = TOOLS.keys.freeze
|
||||
DEFAULT_ICON = :sparkles
|
||||
|
||||
# MCP server prefixes to strip, paired with the tools they canonicalize
|
||||
# to (for `#provider` classification). Prefixes sorted by length
|
||||
# descending in case future additions overlap.
|
||||
PROVIDERS = {
|
||||
"exa" => { prefix: "exa_", canonical: %w[web_search_exa get_code_context_exa company_research_exa].freeze },
|
||||
"brave" => { prefix: "brave_", canonical: [].freeze },
|
||||
"serper" => { prefix: "serper_", canonical: [].freeze },
|
||||
"tavily" => { prefix: "tavily_", canonical: [].freeze },
|
||||
# The local-web-search MCP server registered in
|
||||
# config/opencode/<product>/config.json. Hyphenated server name plus
|
||||
# underscore separator; canonical tools are search_web and fetch_url.
|
||||
"local-web-search" => { prefix: "local-web-search_", canonical: %w[search_web fetch_url].freeze }
|
||||
}.freeze
|
||||
|
||||
MCP_PREFIXES = PROVIDERS.values.map { |p| p[:prefix] }.freeze
|
||||
|
||||
attr_reader :part
|
||||
|
||||
def initialize(part)
|
||||
@part = part || {}
|
||||
end
|
||||
|
||||
# Convenience constructor for raw OpenCode API parts (symbol keys,
|
||||
# nested under `state`). Flattens into the canonical `parts_json`
|
||||
# shape Reply persists.
|
||||
#
|
||||
# raw = { type: "tool", tool: "bash", callID: ...,
|
||||
# state: { status: "running", input: {...}, title: "..." } }
|
||||
# Opencode::ToolDisplay.from_raw(raw)
|
||||
def self.from_raw(raw)
|
||||
raw = (raw || {}).deep_stringify_keys
|
||||
state = raw["state"] || {}
|
||||
new(
|
||||
"type" => "tool",
|
||||
"tool" => raw["tool"],
|
||||
"status" => state["status"],
|
||||
"input" => state["input"] || {},
|
||||
"output" => state["output"],
|
||||
"title" => state["title"],
|
||||
"error" => state["error"],
|
||||
"metadata" => state["metadata"] || {}
|
||||
)
|
||||
end
|
||||
|
||||
# ----- Identity -----------------------------------------------------
|
||||
|
||||
def tool_name
|
||||
@part["tool"].to_s
|
||||
end
|
||||
|
||||
# Strips MCP prefixes — and, when present, the matching MCP suffix —
|
||||
# so tools render cleanly regardless of how the server namespaces them.
|
||||
#
|
||||
# Examples:
|
||||
# exa_web_search_exa → web_search_exa (KNOWN tool, preserved)
|
||||
# exa_web_fetch_exa → web_fetch (not KNOWN; cleaned for display)
|
||||
# exa_nonexistent → nonexistent (prefix-stripped fallback)
|
||||
# brave_read → read (KNOWN after prefix strip)
|
||||
#
|
||||
# The double-strip handles Exa's naming convention: the MCP server
|
||||
# exports tools like `web_fetch_exa`, and OpenCode prepends `exa_`,
|
||||
# producing `exa_web_fetch_exa`. Stripping both yields a readable
|
||||
# `web_fetch`.
|
||||
def canonical_tool
|
||||
name = tool_name
|
||||
return name if name.empty? || KNOWN.include?(name)
|
||||
|
||||
MCP_PREFIXES.each do |prefix|
|
||||
stripped = strip_mcp_decoration(name, prefix)
|
||||
return stripped if stripped
|
||||
end
|
||||
name
|
||||
end
|
||||
|
||||
def known?
|
||||
KNOWN.include?(canonical_tool)
|
||||
end
|
||||
|
||||
def icon
|
||||
TOOLS.dig(canonical_tool, :icon) || DEFAULT_ICON
|
||||
end
|
||||
|
||||
# "Read", "Edit", "Bash", or a humanized *canonical* name for unknowns.
|
||||
# Humanizes `canonical_tool`, not `tool_name` — otherwise an MCP tool
|
||||
# like `exa_web_fetch_exa` whose canonical form (`web_fetch`) isn't
|
||||
# in TOOLS would fall back to the raw, MCP-prefixed name and display
|
||||
# as "Exa web fetch exa". Using the canonical form gives the clean
|
||||
# "Web fetch" label.
|
||||
def kind
|
||||
TOOLS.dig(canonical_tool, :kind) || canonical_tool.humanize
|
||||
end
|
||||
|
||||
# "Reading", "Editing", "Running command", falls back to "<kind>...".
|
||||
def gerund
|
||||
TOOLS.dig(canonical_tool, :gerund) || "#{kind}..."
|
||||
end
|
||||
|
||||
# ----- Status -------------------------------------------------------
|
||||
|
||||
def status
|
||||
@part["status"].to_s
|
||||
end
|
||||
|
||||
def pending? = status == "pending"
|
||||
def running? = status == "running"
|
||||
def completed? = status == "completed"
|
||||
def errored? = status == "error"
|
||||
def terminal? = completed? || errored?
|
||||
def in_flight? = pending? || running?
|
||||
|
||||
# ----- Raw payloads -------------------------------------------------
|
||||
|
||||
def input
|
||||
@part["input"].is_a?(Hash) ? @part["input"] : {}
|
||||
end
|
||||
|
||||
def metadata
|
||||
@part["metadata"].is_a?(Hash) ? @part["metadata"] : {}
|
||||
end
|
||||
|
||||
def output
|
||||
@part["output"].to_s
|
||||
end
|
||||
|
||||
def error_text
|
||||
@part["error"].to_s
|
||||
end
|
||||
|
||||
# OpenCode-supplied title (from tool-result). Used as a fallback for
|
||||
# unknown MCP tools that don't match KNOWN.
|
||||
def opencode_title
|
||||
@part["title"].to_s
|
||||
end
|
||||
|
||||
# ----- Target (what the tool is operating on) ----------------------
|
||||
|
||||
# Returns a single-string representation of the tool's primary target,
|
||||
# or nil when the tool has no meaningful single target. Callers can
|
||||
# substitute display-friendly names (e.g., sandbox filenames).
|
||||
def target
|
||||
raw = case canonical_tool
|
||||
when "read", "write", "edit", "multiedit", "apply_patch"
|
||||
input["filePath"] || input["path"]
|
||||
when "bash"
|
||||
input["command"]
|
||||
when "grep", "glob"
|
||||
input["pattern"]
|
||||
when "list", "ls"
|
||||
input["path"]
|
||||
when "webfetch", "fetch_url"
|
||||
input["url"]
|
||||
when "websearch", "codesearch",
|
||||
"web_search_exa", "get_code_context_exa", "company_research_exa"
|
||||
input["query"]
|
||||
when "search_web"
|
||||
# @zhafron/mcp-web-search names the argument `q`, not `query`.
|
||||
input["q"]
|
||||
when "task"
|
||||
input["description"]
|
||||
when "skill"
|
||||
input["skill_name"] || input["name"]
|
||||
end
|
||||
raw.to_s.presence
|
||||
end
|
||||
|
||||
# A short label combining kind + target, suitable for a one-line
|
||||
# summary. Falls back to the OpenCode-supplied title for unknown MCP
|
||||
# tools, then to just kind.
|
||||
def title
|
||||
if known?
|
||||
target.present? ? "#{kind}: #{target}" : kind
|
||||
else
|
||||
opencode_title.presence || kind
|
||||
end
|
||||
end
|
||||
|
||||
# Past-tense "done" variant: "Read foo.rb", "Wrote contract.pdf",
|
||||
# "Edited user.rb", "Ran `ls -la`". Used after completion.
|
||||
def past_tense_title
|
||||
if known?
|
||||
verb = TOOLS.dig(canonical_tool, :past)
|
||||
return kind unless verb
|
||||
target.present? ? "#{verb} #{target}" : verb
|
||||
else
|
||||
opencode_title.presence || kind
|
||||
end
|
||||
end
|
||||
|
||||
# ----- Semantic accessors (rich content) ---------------------------
|
||||
|
||||
# Unified-diff text produced by the edit tool (OpenCode attaches this
|
||||
# under metadata.diff). Present only after completion.
|
||||
def diff
|
||||
metadata["diff"].presence
|
||||
end
|
||||
|
||||
# Sorted list of todo hashes { "content", "status", "priority", "id" }.
|
||||
# Available during running (input is populated) and completed states.
|
||||
# Order: in_progress first, pending next, completed last.
|
||||
# Canonicalization (string keys + status hyphen→underscore) is
|
||||
# delegated to Opencode::Todo so Reply and ToolDisplay can't drift.
|
||||
def todos
|
||||
return [] unless %w[todowrite todoread].include?(canonical_tool)
|
||||
items = input["todos"]
|
||||
return [] unless items.is_a?(Array)
|
||||
order = { "in_progress" => 0, "pending" => 1, "completed" => 2 }
|
||||
items
|
||||
.select { |t| t.is_a?(Hash) }
|
||||
.map { |todo| Opencode::Todo.canonicalize(todo) }
|
||||
.sort_by { |todo| order[todo["status"]] || 99 }
|
||||
end
|
||||
|
||||
# File content that the write tool is creating (lives in input.content).
|
||||
def file_content
|
||||
return nil unless canonical_tool == "write"
|
||||
input["content"].presence
|
||||
end
|
||||
|
||||
# Syntax-highlighting language hint based on the target filename.
|
||||
# Falls back to the raw extension so unknown file types still get a
|
||||
# hint their syntax-highlighter may recognize heuristically.
|
||||
def file_lang
|
||||
name = File.basename(target.to_s)
|
||||
return nil if name.empty?
|
||||
ext = File.extname(name).delete_prefix(".").downcase
|
||||
LANG_BY_EXT[ext] || ext.presence
|
||||
end
|
||||
|
||||
LANG_BY_EXT = {
|
||||
"md" => "markdown", "markdown" => "markdown",
|
||||
"rb" => "ruby", "rake" => "ruby",
|
||||
"py" => "python",
|
||||
"js" => "javascript", "mjs" => "javascript",
|
||||
"ts" => "typescript", "tsx" => "typescript",
|
||||
"jsx" => "jsx",
|
||||
"json" => "json", "yml" => "yaml", "yaml" => "yaml",
|
||||
"html" => "html", "erb" => "erb",
|
||||
"css" => "css", "scss" => "scss",
|
||||
"sh" => "shell", "bash" => "shell", "zsh" => "shell",
|
||||
"sql" => "sql",
|
||||
"go" => "go", "rs" => "rust",
|
||||
"c" => "c", "h" => "c",
|
||||
"cpp" => "cpp", "hpp" => "cpp",
|
||||
"java" => "java", "kt" => "kotlin",
|
||||
"swift" => "swift", "php" => "php",
|
||||
"lua" => "lua", "toml" => "toml",
|
||||
"xml" => "xml", "conf" => "shell"
|
||||
}.freeze
|
||||
|
||||
# Bash-specific accessors.
|
||||
def bash_command
|
||||
return nil unless canonical_tool == "bash"
|
||||
input["command"].presence
|
||||
end
|
||||
|
||||
def bash_output
|
||||
return nil unless canonical_tool == "bash"
|
||||
(metadata["output"] || metadata["stdout"] || output).to_s.presence
|
||||
end
|
||||
|
||||
def bash_description
|
||||
return nil unless canonical_tool == "bash"
|
||||
metadata["description"].presence
|
||||
end
|
||||
|
||||
# Read preview (OpenCode populates metadata.preview after the read).
|
||||
def read_preview
|
||||
return nil unless canonical_tool == "read"
|
||||
metadata["preview"].presence
|
||||
end
|
||||
|
||||
# Grep/Glob match counts.
|
||||
def match_count
|
||||
case canonical_tool
|
||||
when "grep" then metadata["matches"].to_i
|
||||
when "glob" then metadata["count"].to_i
|
||||
end
|
||||
end
|
||||
|
||||
# ----- Provider identification for log tagging ---------------------
|
||||
|
||||
# Groups tools by which MCP server / built-in provides them, for
|
||||
# operational logs and metrics. Adding a new provider = one row in
|
||||
# PROVIDERS.
|
||||
def provider
|
||||
name = tool_name
|
||||
canonical = canonical_tool
|
||||
PROVIDERS.each do |provider_name, config|
|
||||
return provider_name if name.start_with?(config[:prefix])
|
||||
return provider_name if config[:canonical].include?(canonical)
|
||||
end
|
||||
KNOWN.include?(canonical) ? "opencode-builtin" : "unknown"
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
# Returns `name` with `prefix` (and the matching MCP suffix, where
|
||||
# Exa double-encodes) removed, or nil when `name` doesn't carry that
|
||||
# prefix. Precedence:
|
||||
#
|
||||
# 1. Prefer the single-stripped form when it's KNOWN
|
||||
# (exa_web_search_exa → web_search_exa, which IS a TOOLS key).
|
||||
# 2. Otherwise prefer the clean double-stripped form when both
|
||||
# prefix and suffix are present
|
||||
# (exa_web_fetch_exa → web_fetch, for humanization).
|
||||
# 3. Fall back to single-stripped when double-stripped is empty.
|
||||
def strip_mcp_decoration(name, prefix)
|
||||
return nil unless name.start_with?(prefix)
|
||||
|
||||
stripped = name.delete_prefix(prefix)
|
||||
return stripped if KNOWN.include?(stripped)
|
||||
|
||||
suffix = "_#{prefix.chomp('_')}"
|
||||
return stripped unless stripped.end_with?(suffix)
|
||||
|
||||
double = stripped.delete_suffix(suffix)
|
||||
double.empty? ? stripped : double
|
||||
end
|
||||
end
|
||||
end
|
||||
77
lib/opencode/transform.rb
Normal file
77
lib/opencode/transform.rb
Normal file
@@ -0,0 +1,77 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module Opencode
|
||||
# A per-product rule that converts an Opencode::SandboxFile into an
|
||||
# Opencode::Artifact, owning the trust boundary between "bytes the
|
||||
# agent wrote" and "bytes the host signs and attaches."
|
||||
#
|
||||
# The default substrate path is identity: any sandbox file the
|
||||
# allowlist accepts gets attached as-is. Blackline and Raven use
|
||||
# the default — their agents `write` final document bytes the host
|
||||
# serves back unchanged. AIGL's contract is structurally different:
|
||||
# the agent writes JSON, the **host** must render that JSON into
|
||||
# trusted HTML before attaching, because the resulting HTML gets
|
||||
# served inline from the app origin and an agent-written filename
|
||||
# can't be permitted as stored-XSS.
|
||||
#
|
||||
# Subclass hooks (override these — none have a generic default
|
||||
# that's safe to inherit):
|
||||
#
|
||||
# source_filename — basename in the sandbox the transform
|
||||
# reads from
|
||||
# destination_filename — filename of the Artifact the transform
|
||||
# returns from #render
|
||||
# render(sandbox_file) — return an Artifact carrying the rendered
|
||||
# bytes + trust metadata. Raise
|
||||
# Opencode::Transform::Error to abort just
|
||||
# this file (substrate logs + skips).
|
||||
# trusted?(attachment) — true if the attachment was produced by
|
||||
# this transform (used by Impostor.for and
|
||||
# by view code that decides inline-render
|
||||
# vs download). Default: filename match.
|
||||
# purge_impostors? — if true, before attaching the substrate
|
||||
# deletes any existing attachment whose
|
||||
# filename matches destination_filename
|
||||
# but fails trusted?. Default: false.
|
||||
#
|
||||
# `applies_to?(sandbox_file)` is the routing predicate the substrate
|
||||
# uses to decide whether to send this file through this transform.
|
||||
# Default is exact match against source_filename; override for
|
||||
# multi-file or glob-style ownership.
|
||||
class Transform
|
||||
Error = Class.new(StandardError)
|
||||
|
||||
def destination_filename
|
||||
raise NotImplementedError, "#{self.class.name} must implement #destination_filename"
|
||||
end
|
||||
|
||||
def source_filename
|
||||
raise NotImplementedError, "#{self.class.name} must implement #source_filename"
|
||||
end
|
||||
|
||||
def applies_to?(sandbox_file)
|
||||
sandbox_file.basename == source_filename
|
||||
end
|
||||
|
||||
def render(_sandbox_file)
|
||||
raise NotImplementedError, "#{self.class.name} must implement #render"
|
||||
end
|
||||
|
||||
def trusted?(attachment)
|
||||
attachment.filename.to_s == destination_filename
|
||||
end
|
||||
|
||||
def purge_impostors?
|
||||
false
|
||||
end
|
||||
|
||||
# Names this transform owns end-to-end. The substrate uses this to
|
||||
# keep its tool-extracted phase from racing the transform — the
|
||||
# agent's raw payload (source_filename) and the rendered output
|
||||
# (destination_filename) are both off-limits to the default attach
|
||||
# path so the transform owns the slot.
|
||||
def owned_filenames
|
||||
[ source_filename, destination_filename ]
|
||||
end
|
||||
end
|
||||
end
|
||||
642
lib/opencode/turn.rb
Normal file
642
lib/opencode/turn.rb
Normal file
@@ -0,0 +1,642 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module Opencode
|
||||
# Opencode::Turn is an INTERNAL procedure object.
|
||||
#
|
||||
# Its constructor signature (14 keyword arguments) is NOT part of the
|
||||
# gem's public API. Use the higher-level affordances on
|
||||
# Opencode::Client instead:
|
||||
#
|
||||
# Opencode::Client#stream(session_id, prompt) { |part| ... }
|
||||
# → block-form streaming for live partials, returns Reply::Result
|
||||
#
|
||||
# Opencode::Client#send_message(session_id, prompt)
|
||||
# → sync send-and-poll for the simple no-streaming case
|
||||
#
|
||||
# If you find yourself instantiating Turn directly, file an issue —
|
||||
# that's a signal we need a higher-level API you can't yet reach.
|
||||
#
|
||||
# Subject to change without major-version bump. See lib/opencode/CLAUDE.md
|
||||
# 'Conventions and known debt' section.
|
||||
# One streaming turn against an Opencode session.
|
||||
#
|
||||
# A "turn" is one user-message + one assistant-response cycle. Turn drives
|
||||
# that cycle to completion: ensure the session, send the query, stream
|
||||
# events into a Reply, recover from common failures, persist the final
|
||||
# assistant message, and produce an Opencode::Turn::Result.
|
||||
#
|
||||
# Honest about the shape: this is a procedure object that wraps the data
|
||||
# of one turn (message, subject, exchange, reply) with the strategies
|
||||
# that drive it (session lifecycle, observer, system context, agent name)
|
||||
# and the sinks that consume the result (tracer, callbacks). The design
|
||||
# alternatives — abstract base class with hook methods, or factoring out
|
||||
# an explicit Pipeline state machine — were considered. The first is the
|
||||
# POODR-flagged inheritance-for-code-reuse anti-pattern. The second adds
|
||||
# a layer without changing the size of the procedure. We picked the
|
||||
# smallest honest shape.
|
||||
#
|
||||
# Composition over inheritance: every product-specific concern is a
|
||||
# collaborator passed in. Turn never sees Blackline, Raven, or AIGL by
|
||||
# name.
|
||||
#
|
||||
# Collaborators
|
||||
# -------------
|
||||
#
|
||||
# session_for Opencode::Session-shaped. Responds to:
|
||||
# - #ensure!(client) -> session_id String
|
||||
# - #recreate!(client) -> session_id String
|
||||
# - #just_created? -> Boolean (true iff the most recent
|
||||
# ensure!/recreate! actually minted a fresh session).
|
||||
#
|
||||
# observer_factory callable: ->(message) returning an observer that
|
||||
# responds to #watch(reply). Concretely:
|
||||
# ->(message) { Blackline::ReplyStream.new(...) }.
|
||||
#
|
||||
# system_context callable: ->(subject) -> String system prompt.
|
||||
#
|
||||
# agent_name callable: ->(subject) -> String agent slug.
|
||||
#
|
||||
# tracer Opencode::Tracer-shaped. Responds to
|
||||
# #call(name, **payload). Receives unprefixed event
|
||||
# names; the tracer prepends the product namespace.
|
||||
#
|
||||
# on_finalized callable: ->(message, exchange) called after the
|
||||
# assistant message is persisted in :completed.
|
||||
# Errors raised here are reported and contained;
|
||||
# they do not flip the message back to :error.
|
||||
#
|
||||
# on_turn_finished callable: ->(result) where result is an
|
||||
# Opencode::Turn::Result. Called once at the end of
|
||||
# every turn (any path). Errors raised here are
|
||||
# reported and contained.
|
||||
#
|
||||
# on_activity_tick callable: ->(subject) called periodically during
|
||||
# streaming so callers can keep the user's container
|
||||
# warm. Default: no-op.
|
||||
#
|
||||
# Required record-shape contract on `subject`:
|
||||
#
|
||||
# subject.id
|
||||
# subject.opencode_session_id
|
||||
#
|
||||
# Required record-shape contract on `message` (assistant message):
|
||||
#
|
||||
# message.id
|
||||
# message.reload
|
||||
# message.cancelled?
|
||||
# message.finalize!(**attrs) # CAS update from :pending state
|
||||
# message.update!(content:, status:) # for cancellation + error fallback
|
||||
#
|
||||
# Public API: only `#call`. Never raises in normal operation; all errors
|
||||
# are translated into a marked-error message and an on_turn_finished
|
||||
# callback with `result.failed?`.
|
||||
class Turn
|
||||
DEFAULT_EMPTY_STREAM_RETRY_DELAY = 2.seconds
|
||||
DEFAULT_FINAL_EXCHANGE_TIMEOUT = 120.seconds
|
||||
DEFAULT_FINAL_EXCHANGE_RETRY_DELAY = 2.seconds
|
||||
ACTIVITY_TOUCH_INTERVAL = 5.minutes.to_i
|
||||
ERROR_FALLBACK_CONTENT = "Sorry, an error occurred while generating this response."
|
||||
|
||||
# The result of running one Turn. A value object so the Symbol-vs-String
|
||||
# status confusion that lived inside the old `emit_turn_finished` payload
|
||||
# has one source of truth: the Result. Callbacks ask `result.completed?`;
|
||||
# trace consumers ask `result.trace_payload`.
|
||||
class Result
|
||||
attr_reader :status, :message, :duration_ms, :cost,
|
||||
:input_tokens, :output_tokens, :error
|
||||
|
||||
# status: :completed | :cancelled | :error | :failed
|
||||
def initialize(status:, message:, duration_ms:, error: nil)
|
||||
@status = status
|
||||
@message = message
|
||||
@duration_ms = duration_ms
|
||||
@error = error
|
||||
if message.respond_to?(:cost)
|
||||
@cost = message.cost
|
||||
@input_tokens = message.input_tokens
|
||||
@output_tokens = message.output_tokens
|
||||
end
|
||||
end
|
||||
|
||||
def completed? = @status == :completed
|
||||
def cancelled? = @status == :cancelled
|
||||
def errored? = @status == :error
|
||||
def failed? = @status == :failed
|
||||
|
||||
# The trace-event-shaped payload. Status as String to keep dashboard
|
||||
# query compatibility with pre-refactor traces. tool_count optional.
|
||||
def trace_payload(tool_count: nil)
|
||||
payload = {
|
||||
status: @status.to_s,
|
||||
duration_ms: @duration_ms,
|
||||
cost: @cost,
|
||||
input_tokens: @input_tokens,
|
||||
output_tokens: @output_tokens
|
||||
}
|
||||
if @error
|
||||
payload[:error] = @error.class.name
|
||||
payload[:error_message] = @error.message.to_s.truncate(200)
|
||||
end
|
||||
payload[:tool_count] = tool_count if tool_count
|
||||
payload.compact
|
||||
end
|
||||
end
|
||||
|
||||
def initialize(
|
||||
message:,
|
||||
subject:,
|
||||
query_text:,
|
||||
client:,
|
||||
session_for:,
|
||||
observer_factory:,
|
||||
system_context:,
|
||||
agent_name:,
|
||||
tracer:,
|
||||
on_finalized: ->(_msg, _ex) { },
|
||||
on_turn_finished: ->(_result) { },
|
||||
on_activity_tick: ->(_subject) { },
|
||||
empty_stream_retry_delay: DEFAULT_EMPTY_STREAM_RETRY_DELAY,
|
||||
final_exchange_timeout: DEFAULT_FINAL_EXCHANGE_TIMEOUT,
|
||||
final_exchange_retry_delay: DEFAULT_FINAL_EXCHANGE_RETRY_DELAY,
|
||||
error_fallback_content: ERROR_FALLBACK_CONTENT,
|
||||
error_feature: "opencode.turn"
|
||||
)
|
||||
@message = message
|
||||
@subject = subject
|
||||
@query_text = query_text
|
||||
@client = client
|
||||
@session_for = session_for
|
||||
@observer_factory = observer_factory
|
||||
@system_context = system_context
|
||||
@agent_name = agent_name
|
||||
@tracer = tracer
|
||||
@on_finalized = on_finalized
|
||||
@on_turn_finished = on_turn_finished
|
||||
@on_activity_tick = on_activity_tick
|
||||
@empty_stream_retry_delay = empty_stream_retry_delay
|
||||
@final_exchange_timeout = final_exchange_timeout
|
||||
@final_exchange_retry_delay = final_exchange_retry_delay
|
||||
@error_fallback_content = error_fallback_content
|
||||
@error_feature = error_feature
|
||||
@pre_turn_message_count = 0
|
||||
end
|
||||
|
||||
def call
|
||||
@turn_started_at = monotonic_now
|
||||
emit("response.started", subject_id: @subject.id, message_id: @message.id)
|
||||
|
||||
attempted_recreate = false
|
||||
begin
|
||||
run_turn
|
||||
rescue Opencode::SessionNotFoundError, Opencode::StaleSessionError
|
||||
raise if attempted_recreate
|
||||
@session_for.recreate!(@client)
|
||||
# Distinguish the recovery-with-resend path: if our original
|
||||
# async send was already accepted upstream, the recreate means
|
||||
# the upstream may now have orphan work it's still spending on.
|
||||
# The on-call engineer needs this distinction at 3am.
|
||||
emit("session.recreated_with_resend",
|
||||
session_id: @subject.opencode_session_id, subject_id: @subject.id)
|
||||
attempted_recreate = true
|
||||
retry
|
||||
end
|
||||
rescue StandardError => e
|
||||
handle_unexpected_error(e)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
# ---- Pipeline -------------------------------------------------------
|
||||
|
||||
def run_turn
|
||||
session_id = @session_for.ensure!(@client)
|
||||
emit_session_created_if_new
|
||||
validate_session!(session_id)
|
||||
|
||||
@client.send_message_async(
|
||||
session_id, @query_text,
|
||||
agent: @agent_name.call(@subject),
|
||||
system: @system_context.call(@subject)
|
||||
)
|
||||
|
||||
stream_result = stream_response(session_id)
|
||||
exchange = fetch_current_exchange(session_id)
|
||||
stream_result, exchange = wait_for_final_exchange_result(session_id, stream_result, exchange)
|
||||
last_assistant = exchange.reverse_each.detect { |m| m.dig(:info, :role) == "assistant" }
|
||||
|
||||
@message.reload
|
||||
if @message.cancelled?
|
||||
save_cancelled_response(stream_result, last_assistant)
|
||||
elsif stream_result[:full_text].blank?
|
||||
recover_empty_stream(session_id, last_assistant, exchange)
|
||||
else
|
||||
finalize_response(stream_result, last_assistant, exchange)
|
||||
end
|
||||
end
|
||||
|
||||
def validate_session!(session_id)
|
||||
messages = @client.get_messages(session_id)
|
||||
@pre_turn_message_count = messages.is_a?(Array) ? messages.size : 0
|
||||
end
|
||||
|
||||
# `session.created` is emitted iff the session was *just* created by
|
||||
# the call to `ensure!` above. We ask the Session — which did the
|
||||
# work and knows the answer — instead of the subject's AR dirty
|
||||
# tracking, which would couple Turn to ActiveRecord-shaped records.
|
||||
def emit_session_created_if_new
|
||||
return unless @session_for.respond_to?(:just_created?)
|
||||
return unless @session_for.just_created?
|
||||
emit("session.created", session_id: @subject.opencode_session_id, subject_id: @subject.id)
|
||||
end
|
||||
|
||||
# ---- Streaming ------------------------------------------------------
|
||||
|
||||
def stream_response(session_id)
|
||||
reply = Opencode::Reply.new
|
||||
@reply = reply
|
||||
@observer_factory.call(@message).watch(reply)
|
||||
|
||||
stream_started_at = monotonic_now
|
||||
last_activity_touch_at = stream_started_at
|
||||
first_token_at = nil
|
||||
event_count = 0
|
||||
|
||||
begin
|
||||
release_active_record_connections
|
||||
# Throttled activity tick — fires on EVERY event including heartbeats
|
||||
# (via the stream_events :on_activity_tick kwarg). We need heartbeats
|
||||
# to count so that a user taking 30+ minutes to answer an ask-user
|
||||
# prompt keeps the container warm: the agent itself emits no events
|
||||
# while suspended, only the server's keep-alive does.
|
||||
#
|
||||
# The 5-minute throttle bounds DB write rate (one
|
||||
# update_column per tick, not per heartbeat). Reaper safety
|
||||
# is independent: the reaper's 30-minute idle threshold gives
|
||||
# 6× headroom over this throttle, so even if several ticks
|
||||
# miss the container survives.
|
||||
#
|
||||
# Wrapped in rescue so a transient DB blip on touch_activity
|
||||
# is observable but doesn't kill an otherwise-healthy in-flight
|
||||
# stream (heartbeats are advisory; next tick retries).
|
||||
activity_tick = ->(_event) {
|
||||
if (monotonic_now - last_activity_touch_at) >= ACTIVITY_TOUCH_INTERVAL
|
||||
begin
|
||||
@on_activity_tick.call(@subject)
|
||||
last_activity_touch_at = monotonic_now
|
||||
rescue StandardError => e
|
||||
Opencode::ErrorReporter.report(e, handled: true, severity: :warning,
|
||||
context: { feature: @error_feature, hook: :on_activity_tick })
|
||||
end
|
||||
end
|
||||
}
|
||||
|
||||
@client.stream_events(
|
||||
session_id: session_id,
|
||||
reply: reply,
|
||||
on_activity_tick: activity_tick
|
||||
) do |event|
|
||||
event_count += 1
|
||||
reply.apply(event)
|
||||
first_token_at ||= monotonic_now if reply.first_text_seen?
|
||||
end
|
||||
|
||||
emit("stream.completed",
|
||||
duration_ms: elapsed_ms(stream_started_at),
|
||||
first_token_ms: first_token_at && ((first_token_at - stream_started_at) * 1000).round,
|
||||
event_count: event_count,
|
||||
tool_count: reply.respond_to?(:tool_count) ? reply.tool_count : nil)
|
||||
rescue Opencode::SessionNotFoundError
|
||||
raise
|
||||
rescue StandardError => e
|
||||
Opencode::ErrorReporter.report(e, handled: true, severity: :warning,
|
||||
context: { feature: @error_feature, error_class: e.class.name })
|
||||
emit("stream.interrupted",
|
||||
duration_ms: elapsed_ms(stream_started_at),
|
||||
event_count: event_count,
|
||||
error: e.class.name,
|
||||
error_message: e.message.to_s.truncate(200))
|
||||
attempt_stream_recovery(session_id, reply)
|
||||
end
|
||||
|
||||
reply.result
|
||||
end
|
||||
|
||||
def release_active_record_connections
|
||||
return unless defined?(ActiveRecord::Base)
|
||||
|
||||
ActiveRecord::Base.connection_handler.clear_active_connections!
|
||||
end
|
||||
|
||||
# If the session API is still reachable, fetch the current exchange
|
||||
# and rebaseline `reply` to whatever the server has. If the API is
|
||||
# also unreachable, keep whatever the reply accumulated before the
|
||||
# interruption.
|
||||
def attempt_stream_recovery(session_id, reply)
|
||||
exchange = fetch_current_exchange(session_id)
|
||||
last_msg = exchange.reverse_each.detect { |m| m.dig(:info, :role) == "assistant" }
|
||||
return unless last_msg
|
||||
|
||||
recovered_parts = Opencode::ResponseParser.extract_interleaved_parts(last_msg)
|
||||
reply.replace_parts(recovered_parts) if recovered_parts.any?
|
||||
rescue StandardError
|
||||
# Session API also unreachable; keep whatever the reply has.
|
||||
end
|
||||
|
||||
# ---- Finalize -------------------------------------------------------
|
||||
|
||||
def finalize_response(stream_result, last_assistant, exchange)
|
||||
result = authoritative_result(stream_result, exchange)
|
||||
attrs = {
|
||||
content: result[:full_text],
|
||||
tool_calls_json: result[:tool_parts],
|
||||
parts_json: result[:parts_json],
|
||||
status: :completed
|
||||
}
|
||||
attrs[:reasoning] = result[:reasoning_text] if result[:reasoning_text].present?
|
||||
attrs.merge!(extract_cost(last_assistant)) if last_assistant
|
||||
|
||||
unless @message.finalize!(**attrs)
|
||||
# finalize! returns false if message was cancelled/errored mid-flight.
|
||||
emit_turn_finished(status: :cancelled)
|
||||
return
|
||||
end
|
||||
|
||||
# Callbacks run AFTER the system of record is durable. If a callback
|
||||
# raises (Redis flake on Turbo broadcast, ActiveJob enqueue hiccup
|
||||
# on title generation), the turn is still completed; the failure
|
||||
# is reported and isolated. Without this isolation a successful
|
||||
# turn could be flipped to :error by an unrelated infra hiccup.
|
||||
safe_callback(:on_finalized) { @on_finalized.call(@message, exchange) }
|
||||
emit_turn_finished(status: :completed)
|
||||
end
|
||||
|
||||
def authoritative_result(stream_result, exchange)
|
||||
exchange_result = current_turn_result(exchange)
|
||||
return stream_result unless exchange_result
|
||||
return stream_result if exchange_result[:full_text].blank?
|
||||
|
||||
merge_stream_only_parts(stream_result, exchange_result)
|
||||
end
|
||||
|
||||
# The final session poll is authoritative for answer text and terminal
|
||||
# tool payloads, but OpenCode emits some events (`todo.updated`, and
|
||||
# whatever future bus events join Opencode::PartSource::STREAM_ONLY)
|
||||
# that never persist as message parts. Preserve those synthetic
|
||||
# stream parts across finalization so the refresh-rendered UI does
|
||||
# not drop the live state the user watched stream in.
|
||||
def merge_stream_only_parts(stream_result, exchange_result)
|
||||
stream_parts = Array(stream_result[:parts_json])
|
||||
return exchange_result unless stream_parts.any? { |part| Opencode::PartSource.stream_only?(part) }
|
||||
|
||||
exchange_parts = Array(exchange_result[:parts_json]).dup
|
||||
merged = []
|
||||
|
||||
stream_parts.each do |part|
|
||||
if Opencode::PartSource.stream_only?(part)
|
||||
merged << part
|
||||
elsif exchange_parts.any?
|
||||
merged << exchange_parts.shift
|
||||
end
|
||||
end
|
||||
|
||||
merged.concat(exchange_parts)
|
||||
Opencode::Reply.distill(merged)
|
||||
end
|
||||
|
||||
def wait_for_final_exchange_result(session_id, stream_result, exchange)
|
||||
result = authoritative_result(stream_result, exchange)
|
||||
sync_reply_from_result(result)
|
||||
return [ result, exchange ] if terminal_exchange_result?(result, exchange)
|
||||
return [ result, exchange ] unless exchange_indicates_more_work?(exchange)
|
||||
|
||||
emit("response.waiting_for_final_text", subject_id: @subject.id, message_id: @message.id)
|
||||
deadline = monotonic_now + @final_exchange_timeout
|
||||
|
||||
loop do
|
||||
return [ result, exchange ] if monotonic_now >= deadline
|
||||
|
||||
sleep @final_exchange_retry_delay if @final_exchange_retry_delay.positive?
|
||||
exchange = fetch_current_exchange(session_id)
|
||||
result = authoritative_result(stream_result, exchange)
|
||||
sync_reply_from_result(result)
|
||||
return [ result, exchange ] if terminal_exchange_result?(result, exchange)
|
||||
return [ result, exchange ] unless exchange_indicates_more_work?(exchange)
|
||||
end
|
||||
end
|
||||
|
||||
def sync_reply_from_result(result)
|
||||
return unless @reply.respond_to?(:sync_recovered_parts)
|
||||
return if result[:parts_json].blank?
|
||||
|
||||
@reply.sync_recovered_parts(result[:parts_json])
|
||||
end
|
||||
|
||||
def terminal_exchange_result?(result, exchange)
|
||||
return false if result[:full_text].blank?
|
||||
|
||||
last_assistant = current_turn_assistant_messages(exchange).last
|
||||
return true unless last_assistant
|
||||
return false if assistant_in_progress?(last_assistant)
|
||||
|
||||
assistant_finish(last_assistant) != "tool-calls"
|
||||
end
|
||||
|
||||
def exchange_indicates_more_work?(exchange)
|
||||
last_assistant = current_turn_assistant_messages(exchange).last
|
||||
return false unless last_assistant
|
||||
|
||||
assistant_finish(last_assistant) == "tool-calls" || assistant_in_progress?(last_assistant)
|
||||
end
|
||||
|
||||
def assistant_finish(assistant_message)
|
||||
assistant_message.dig(:info, :finish).to_s
|
||||
end
|
||||
|
||||
def assistant_in_progress?(assistant_message)
|
||||
time = assistant_message.dig(:info, :time)
|
||||
return false unless time.is_a?(Hash)
|
||||
return false unless time.key?(:created)
|
||||
|
||||
time[:completed].blank?
|
||||
end
|
||||
|
||||
def current_turn_result(exchange)
|
||||
parts = current_turn_assistant_messages(exchange).flat_map do |assistant_message|
|
||||
Opencode::ResponseParser.extract_interleaved_parts(assistant_message)
|
||||
end
|
||||
return nil if parts.empty?
|
||||
|
||||
Opencode::Reply.distill(parts)
|
||||
end
|
||||
|
||||
def current_turn_assistant_messages(exchange)
|
||||
Array(exchange).select { |message| message.dig(:info, :role) == "assistant" }
|
||||
end
|
||||
|
||||
def save_cancelled_response(stream_result, last_assistant)
|
||||
content = stream_result[:full_text].presence || "Response was stopped."
|
||||
attrs = {
|
||||
content: content,
|
||||
tool_calls_json: stream_result[:tool_parts],
|
||||
parts_json: stream_result[:parts_json]
|
||||
}
|
||||
attrs[:reasoning] = stream_result[:reasoning_text] if stream_result[:reasoning_text].present?
|
||||
attrs.merge!(extract_cost(last_assistant)) if last_assistant
|
||||
@message.update!(**attrs)
|
||||
emit_turn_finished(status: :cancelled)
|
||||
end
|
||||
|
||||
# ---- Empty-stream recovery ------------------------------------------
|
||||
|
||||
def recover_empty_stream(session_id, last_assistant, exchange)
|
||||
recovered = recover_from_exchange(last_assistant)
|
||||
|
||||
unless recovered
|
||||
sleep @empty_stream_retry_delay if @empty_stream_retry_delay.positive?
|
||||
exchange = fetch_current_exchange(session_id)
|
||||
last_assistant = exchange.reverse_each.detect { |m| m.dig(:info, :role) == "assistant" }
|
||||
recovered = recover_from_exchange(last_assistant)
|
||||
end
|
||||
|
||||
if recovered
|
||||
emit("response.recovered_from_exchange")
|
||||
finalize_response(recovered, last_assistant, exchange)
|
||||
elsif detect_upstream_error(last_assistant)
|
||||
mark_error(reason: "upstream_llm_error")
|
||||
else
|
||||
mark_error(reason: "empty_stream")
|
||||
end
|
||||
end
|
||||
|
||||
def recover_from_exchange(assistant_message)
|
||||
return nil unless assistant_message
|
||||
parts_json = Opencode::ResponseParser.extract_interleaved_parts(assistant_message)
|
||||
return nil if parts_json.empty?
|
||||
result = Opencode::Reply.distill(parts_json)
|
||||
return nil if result[:full_text].blank?
|
||||
|
||||
result
|
||||
end
|
||||
|
||||
def detect_upstream_error(assistant_message)
|
||||
return nil unless assistant_message
|
||||
error = Opencode::ResponseParser.extract_error(assistant_message)
|
||||
return nil unless error
|
||||
|
||||
emit("response.upstream_error",
|
||||
error_name: error[:name],
|
||||
error_message: error[:message],
|
||||
status_code: error[:status_code],
|
||||
provider_url: error[:url])
|
||||
|
||||
Opencode::ErrorReporter.report(
|
||||
Opencode::Error.new("Upstream LLM error: #{error[:name]} - #{error[:message]}"),
|
||||
handled: true,
|
||||
severity: :error,
|
||||
context: { feature: @error_feature, **error }
|
||||
)
|
||||
|
||||
error
|
||||
end
|
||||
|
||||
# ---- Error paths ----------------------------------------------------
|
||||
|
||||
# Both error paths transition the message to :error through the
|
||||
# CAS-safe Message#error! contract — a concurrent cancel that already
|
||||
# moved the row out of :pending wins, and the canceller's terminal
|
||||
# state survives. emit_turn_finished re-reads the persisted state
|
||||
# (Result.message is reloaded) so callbacks receive the actual
|
||||
# current state, not the state we wished we wrote.
|
||||
|
||||
def handle_unexpected_error(e)
|
||||
Opencode::ErrorReporter.report(e, handled: true, severity: :error,
|
||||
context: { feature: @error_feature, message_id: @message.id, error_class: e.class.name })
|
||||
@message.error!(@error_fallback_content)
|
||||
emit_turn_finished(status: :failed, error: e)
|
||||
end
|
||||
|
||||
def mark_error(reason:)
|
||||
emit("response.error", reason: reason, message_id: @message.id, subject_id: @subject.id)
|
||||
@message.error!(@error_fallback_content)
|
||||
emit_turn_finished(status: :error)
|
||||
end
|
||||
|
||||
# ---- Trace + callback helpers --------------------------------------
|
||||
|
||||
def emit(name, **payload)
|
||||
@tracer.call(name, **payload)
|
||||
end
|
||||
|
||||
def emit_turn_finished(status:, error: nil)
|
||||
@message.reload if @message.respond_to?(:reload)
|
||||
result = Result.new(
|
||||
status: status,
|
||||
message: @message,
|
||||
duration_ms: elapsed_ms(@turn_started_at),
|
||||
error: error
|
||||
)
|
||||
|
||||
safe_callback(:on_turn_finished) { @on_turn_finished.call(result) }
|
||||
|
||||
tool_count = @message.respond_to?(:tool_calls_json) ? @message.tool_calls_json&.size.to_i : nil
|
||||
emit("turn.finished", **result.trace_payload(tool_count: tool_count))
|
||||
end
|
||||
|
||||
# Run a callback, report any exception, but keep the turn in its
|
||||
# current durable state. Side-effect callbacks (broadcast, artifact
|
||||
# collection, title enqueueing) are not allowed to overwrite
|
||||
# :completed → :error after the message is already persisted.
|
||||
def safe_callback(name)
|
||||
yield
|
||||
rescue StandardError => e
|
||||
Opencode::ErrorReporter.report(e, handled: true, severity: :warning,
|
||||
context: { feature: @error_feature, callback: name, message_id: @message.id, error_class: e.class.name })
|
||||
emit("callback.error", callback: name.to_s, error_class: e.class.name)
|
||||
end
|
||||
|
||||
# ---- Exchange + cost helpers ---------------------------------------
|
||||
|
||||
def fetch_current_exchange(session_id)
|
||||
messages = @client.get_messages(session_id)
|
||||
return [] unless messages.is_a?(Array) && messages.any?
|
||||
|
||||
search_start_idx = [ @pre_turn_message_count.to_i, messages.length ].min
|
||||
last_user_idx = nil
|
||||
(messages.length - 1).downto(search_start_idx) do |idx|
|
||||
message = messages[idx]
|
||||
if message.dig(:info, :role) == "user" && user_message_text(message) == @query_text.to_s
|
||||
last_user_idx = idx
|
||||
break
|
||||
end
|
||||
end
|
||||
return [] unless last_user_idx
|
||||
messages[(last_user_idx + 1)..]
|
||||
rescue Opencode::Error => e
|
||||
Opencode::ErrorReporter.report(e, handled: true, severity: :warning,
|
||||
context: { feature: @error_feature, action: "fetch_current_exchange", session_id: session_id })
|
||||
[]
|
||||
end
|
||||
|
||||
def user_message_text(message)
|
||||
Opencode::ResponseParser.extract_text(message).to_s
|
||||
end
|
||||
|
||||
def extract_cost(assistant_msg)
|
||||
cost = Opencode::ResponseParser.extract_cost(assistant_msg)
|
||||
cache = Opencode::ResponseParser.extract_cache_tokens(assistant_msg)
|
||||
tokens = Opencode::ResponseParser.extract_tokens(assistant_msg) || {}
|
||||
{
|
||||
cost: cost,
|
||||
input_tokens: tokens[:input],
|
||||
output_tokens: tokens[:output],
|
||||
cache_read_tokens: cache[:cache_read],
|
||||
cache_write_tokens: cache[:cache_write]
|
||||
}.compact
|
||||
end
|
||||
|
||||
# ---- Time helpers ---------------------------------------------------
|
||||
|
||||
def monotonic_now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
|
||||
def elapsed_ms(t) = ((monotonic_now - t) * 1000).round
|
||||
end
|
||||
end
|
||||
85
lib/opencode/uploaded_files_prompt.rb
Normal file
85
lib/opencode/uploaded_files_prompt.rb
Normal file
@@ -0,0 +1,85 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module Opencode
|
||||
# The prompt body to send to an OpenCode agent when the user attached
|
||||
# files: the user's text plus an instruction block naming each file by
|
||||
# its sandboxed filename so the agent can read it with the `read` tool.
|
||||
#
|
||||
# Two outputs, both explicit:
|
||||
#
|
||||
# text — the prompt body to pass to send_message_async
|
||||
# sandbox_file_names — map of sandbox_name => original filename,
|
||||
# used by ReplyStream to show the user a
|
||||
# recognizable name when the agent reads the
|
||||
# file back.
|
||||
#
|
||||
# Previously this work lived in `Opencode::SandboxFiles`, an ActiveSupport
|
||||
# concern that mutated a hidden `@sandbox_file_names` instance variable on
|
||||
# the including job. ReplyStream then read that ivar back through a
|
||||
# closure. State across class boundaries via shared mutable ivars is the
|
||||
# kind of Sandi-smelly action-at-a-distance that breaks the moment
|
||||
# someone forgets the contract. This value object replaces that with two
|
||||
# named return values.
|
||||
#
|
||||
# Side effect, unchanged from the concern: file bytes are copied from
|
||||
# ActiveStorage into the per-user OpenCode sandbox directory so the
|
||||
# agent can read them with the `read` tool. The copy is path-escape
|
||||
# guarded (the cleanpath of the destination must start with the
|
||||
# sandbox dir prefix, no symlink trickery).
|
||||
class UploadedFilesPrompt
|
||||
attr_reader :text, :sandbox_file_names
|
||||
|
||||
def initialize(user_message:, sandbox_path:, sandbox_name_for:)
|
||||
@user_message = user_message
|
||||
@sandbox_path = sandbox_path
|
||||
@sandbox_name_for = sandbox_name_for
|
||||
@sandbox_file_names = {}
|
||||
@text = build_text
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def build_text
|
||||
raw = @user_message.content.to_s
|
||||
return raw unless @user_message.files.attached?
|
||||
|
||||
file_instructions = @user_message.files.map do |file|
|
||||
sandbox_file = copy_to_sandbox(file)
|
||||
@sandbox_file_names[sandbox_file.sandbox_name] = file.filename.to_s
|
||||
"#{file.filename} -> #{sandbox_file.sandbox_name} (#{file.content_type}, #{file.byte_size} bytes)"
|
||||
end
|
||||
|
||||
[
|
||||
raw,
|
||||
"",
|
||||
"The user uploaded #{file_instructions.size} file(s). Read each file thoroughly, then consult your reference materials and verify any legal claims before responding:",
|
||||
*file_instructions
|
||||
].join("\n").strip
|
||||
end
|
||||
|
||||
def copy_to_sandbox(file)
|
||||
FileUtils.mkdir_p(@sandbox_path)
|
||||
|
||||
sandbox_name = @sandbox_name_for.call(file)
|
||||
dest = File.join(@sandbox_path, sandbox_name)
|
||||
|
||||
resolved = Pathname.new(dest).cleanpath.to_s
|
||||
unless resolved.start_with?(@sandbox_path)
|
||||
raise ArgumentError, "Filename escapes sandbox: #{sandbox_name}"
|
||||
end
|
||||
|
||||
File.open(dest, "wb") { |f| f.write(file.download) }
|
||||
Placement.new(sandbox_name, dest)
|
||||
end
|
||||
|
||||
# Tiny value pair returned by copy_to_sandbox: the canonical filename
|
||||
# the agent should read by, and the on-disk path the file ended up at.
|
||||
# Internal to UploadedFilesPrompt — the caller (UploadedFilesPrompt
|
||||
# itself) only needs the sandbox_name to embed in the prompt text.
|
||||
Placement = Struct.new(:sandbox_name, :path) do
|
||||
def to_s
|
||||
path
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
Reference in New Issue
Block a user