diff --git a/lib/opencode-rails.rb b/lib/opencode-rails.rb index e36db1b..69f5c26 100644 --- a/lib/opencode-rails.rb +++ b/lib/opencode-rails.rb @@ -10,10 +10,12 @@ require "opencode-ruby" -require "active_support/core_ext/object/blank" +require "active_support/core_ext/object/blank" # blank?, present?, presence require "active_support/core_ext/object/try" -require "active_support/core_ext/hash/keys" -require "active_support/core_ext/string/inflections" +require "active_support/core_ext/hash/keys" # deep_stringify_keys, deep_symbolize_keys +require "active_support/core_ext/string/inflections" # demodulize, underscore, camelize +require "active_support/core_ext/string/filters" # squish, truncate +require "active_support/core_ext/numeric/time" # 2.seconds, 5.minutes, etc. require_relative "opencode/rails/version" require_relative "opencode/error_reporter" diff --git a/lib/opencode/artifact.rb b/lib/opencode/artifact.rb new file mode 100644 index 0000000..1d24913 --- /dev/null +++ b/lib/opencode/artifact.rb @@ -0,0 +1,63 @@ +# frozen_string_literal: true + +module Opencode + # A file the host wants to attach to an assistant message: filename, + # content bytes, MIME type, and an optional trust-metadata hash. + # + # Artifacts come from two places in the substrate: + # + # - Opencode::Exchange.tool_artifacts — content lives inside a tool + # call's input/metadata (write tool). + # - Opencode::SandboxFile#as_artifact — identity conversion of a + # sandbox-resident file (the default path for Blackline + Raven). + # + # Transforms also return Artifacts; that's why FlightResultsTransform + # returns one with the host-rendered HTML + trust metadata stamp. + # + # An Artifact knows how to attach itself to a message, idempotently: + # it consults `message.artifacts` to skip if its filename is already + # there. The attaching verb belongs to the Artifact (the noun whose + # state the verb consults), not to a separate Attacher class. 
+ class Artifact + attr_reader :filename, :content, :content_type, :metadata + + def initialize(filename:, content:, content_type:, metadata: {}) + @filename = filename + @content = content + @content_type = content_type + @metadata = metadata + end + + # Idempotent attach. Returns true if newly attached, false if the + # filename was already present on the message (so callers can count + # what they actually persisted vs what was already there). + def attach_to(message) + return false if already_attached_to?(message) + + message.artifacts.attach( + io: StringIO.new(content), + filename: filename, + content_type: content_type, + metadata: metadata + ) + true + end + + def already_attached_to?(message) + message.artifacts.any? { |a| a.filename.to_s == filename } + end + + def ==(other) + other.is_a?(Artifact) && + other.filename == filename && + other.content == content && + other.content_type == content_type && + other.metadata == metadata + end + alias_method :eql?, :== + + def hash + [ filename, content, content_type, metadata ].hash + end + end +end diff --git a/lib/opencode/exchange.rb b/lib/opencode/exchange.rb new file mode 100644 index 0000000..ddd5b99 --- /dev/null +++ b/lib/opencode/exchange.rb @@ -0,0 +1,79 @@ +# frozen_string_literal: true + +module Opencode + # The OpenCode messages produced by a single turn (the array returned + # by GET /session/:id/message and consumed by Opencode::Turn's + # recovery + finalization paths). + # + # First-class noun rather than a bare array because: + # + # - It owns the "give me the tool-produced artifacts" question, so + # callers don't reach into ResponseParser for that. The parser is + # about wire-shape extraction; the Exchange is about the domain + # concept of "what files came out of this turn." + # + # - It owns the "opencode.apply_patch.artifacts_dropped" event + # emission, keeping ResponseParser a pure module (no instrumentation + # side effects). Pure functions stay pure. 
The event flows through + # Opencode::Instrumentation, so hosts wire AS::Notifications / + # Rails.event / OpenTelemetry / etc. via the adapter. + class Exchange + def initialize(messages) + @messages = Array(messages) + end + + # Returns Opencode::Artifact values for every file produced by a + # tool call in this exchange (currently the `write` tool; apply_patch + # is acknowledged-but-empty in v1.15+, see ResponseParser). + # + # `exclude:` filters by destination filename — used by the substrate + # to keep tool-extracted Artifacts from racing per-message transforms + # that own the same filenames. + def tool_artifacts(exclude: []) + excluded = Set.new(exclude) + raw = Opencode::ResponseParser.extract_artifacts_from_messages(@messages) + notify_drops(raw) + + raw.filter_map do |file_data| + next if excluded.include?(file_data[:filename]) + + Artifact.new( + filename: file_data[:filename], + content: file_data[:content], + content_type: file_data[:content_type] + ) + end + end + + private + + # ResponseParser annotates dropped apply_patch parts on the messages + # it processes (since v1.15+ wire shape carries no inline post-write + # content). The notify lives here, not in the parser, so the parser + # stays a pure function. Operators see one event per completed + # apply_patch part that has at least one non-delete file entry. + def notify_drops(_) + @messages.each do |message| + next unless message.dig(:info, :role) == "assistant" + + parts = message[:parts] || [] + parts.each do |part| + next unless part[:type] == "tool" && part[:tool] == "apply_patch" + next unless part.dig(:state, :status) == "completed" + + file_entries = part.dig(:state, :metadata, :files) || [] + eligible = file_entries.reject { |e| e[:type] == "delete" } + next if eligible.empty? 
+ + Opencode::Instrumentation.instrument("opencode.apply_patch.artifacts_dropped", + file_count: eligible.size, + relative_paths: eligible.filter_map { |e| e[:relativePath] }.first(5), + message_id: part[:messageID], + session_id: part[:sessionID], + reason: "apply_patch v1.15+ metadata does not include post-write file content; " \ + "extraction requires sandbox-read which is not yet wired into ResponseParser") { } + end + end + end + end +end diff --git a/lib/opencode/impostor.rb b/lib/opencode/impostor.rb new file mode 100644 index 0000000..57bd20c --- /dev/null +++ b/lib/opencode/impostor.rb @@ -0,0 +1,48 @@ +# frozen_string_literal: true + +module Opencode + # An ActiveStorage::Attachment on an assistant message that uses a + # trusted Transform's destination filename but fails the transform's + # `#trusted?` predicate. In plain English: a same-named attachment + # that wasn't produced by the host-trusted renderer pipeline. + # + # Where impostors come from: + # + # 1. A previous job retry attached the destination filename via the + # tool-extracted path (the agent wrote a file with that name and + # it landed before the trusted render did). + # 2. A pre-substrate code path persisted an agent-authored HTML file + # with the destination filename (the historical AIGL exploit + # surface that motivated the trust boundary in the first place). + # 3. A previous transform version stamped different metadata and the + # trust check now correctly rejects it. + # + # The Impostor knows how to remove itself. The orchestrator just asks + # "are there impostors of this transform on this message?" and tells + # each one to `purge!`. Purging is a verb that belongs to the + # impostor — it's the noun whose state the purge mutates. + class Impostor + # Finds impostors of `transform` on `message` — attachments whose + # filename matches the transform's destination but whose contents + # fail the transform's trust predicate. 
+ def self.for(message:, transform:) + target = transform.destination_filename + message.artifacts + .select { |a| a.filename.to_s == target } + .reject { |a| transform.trusted?(a) } + .map { |a| new(attachment: a) } + end + + def initialize(attachment:) + @attachment = attachment + end + + def purge! + @attachment.purge + end + + def filename + @attachment.filename.to_s + end + end +end diff --git a/lib/opencode/message_artifacts.rb b/lib/opencode/message_artifacts.rb new file mode 100644 index 0000000..3591679 --- /dev/null +++ b/lib/opencode/message_artifacts.rb @@ -0,0 +1,133 @@ +# frozen_string_literal: true + +module Opencode + # The collection of new artifacts attached to an assistant message as + # a result of one turn. The orchestrator that used to live in + # Opencode::ArtifactCollector now lives on this collection — instead + # of a "Collector" verb-class, the collection knows how to populate + # itself from sources (tool exchange, sandbox) and how to attach. + # + # Two-line usage: + # + # Opencode::MessageArtifacts.new(message: m, feature: "blackline", transforms: []) + # .attach_from(exchange: exchange, sandbox: sandbox) + # + # All four phases (tool extract, transform routing, impostor purge, + # default sandbox attach) live as small named methods. The substrate + # never special-cases a product — `:feature` is only for error-report + # context, and `:transforms` (default []) is per-product policy. + # + # Idempotent under retry: `Opencode::Artifact#attach_to` already + # skips when the filename is present on the message, and the + # tool-extracted phase excludes filenames the transforms own. + class MessageArtifacts + MAX_SANDBOX_ARTIFACTS = 20 + + # default_attach values: + # :all — Blackline/Raven default. Every safe sandbox file that + # no transform claims falls through to identity attach. + # The agent's `write` outputs are final document bytes the + # host serves back unchanged. + # :none — AIGL. 
The agent's sandbox is full of internal working + # scratch (notes.md, map.md, timeline.md) plus the one + # file the transform claims (flight-results.json). Only + # transform-claimed files attach; everything else stays + # agent-internal. + def initialize(message:, feature:, transforms: [], default_attach: :all, + max_sandbox_files: MAX_SANDBOX_ARTIFACTS) + @message = message + @feature = feature + @transforms = transforms + @default_attach = default_attach + @max_sandbox_files = max_sandbox_files + end + + # Drains both sources and attaches. Returns self so callers can + # chain off it if they want to count what landed. + def attach_from(exchange: nil, sandbox: nil, cutoff: nil, upload_echo: []) + attach_from_exchange(exchange) if exchange + attach_from_sandbox(sandbox, cutoff: cutoff, upload_echo: upload_echo) if sandbox + self + rescue StandardError => e + report(e, action: "attach_artifacts") + self + end + + private + + attr_reader :message, :feature, :transforms, :max_sandbox_files, :default_attach + + # Tool-produced artifacts (write tool's input content). Skip any + # filename a transform owns — those land via the sandbox path so the + # transform's trust pipeline (render + metadata stamp) is the only + # way the bytes reach the user. + def attach_from_exchange(exchange) + exchange.tool_artifacts(exclude: transform_owned_filenames).each do |artifact| + artifact.attach_to(message) + end + rescue StandardError => e + report(e, action: "attach_from_exchange") + end + + def attach_from_sandbox(sandbox, cutoff:, upload_echo:) + return unless sandbox.exists? + + uploaded = Set.new(upload_echo) + attached = 0 + + sandbox.files(after: cutoff).each do |file| + break if attached >= max_sandbox_files + next if uploaded.include?(file.basename) + + if (transform = transforms.find { |t| t.applies_to?(file) }) + attached += 1 if apply_transform(transform, file) + elsif default_attach == :all + # Default identity path. 
Blackline/Raven default — every safe + # sandbox file that no transform claims attaches as-is. AIGL + # passes default_attach: :none so non-transform files (the + # agent's notes.md / map.md / timeline.md scratch) don't + # auto-attach. + attached += 1 if file.as_artifact.attach_to(message) + end + end + rescue StandardError => e + report(e, action: "attach_from_sandbox") + end + + # Returns true if a fresh trusted artifact was attached. Falsy on + # already-trusted-attached, transform-raised, or duplicate-filename. + def apply_transform(transform, file) + if transform.purge_impostors? + purged = Impostor.for(message: message, transform: transform) + if purged.any? + purged.each(&:purge!) + # ActiveStorage purges the attachment + blob, but `message.artifacts` + # holds the pre-purge collection in memory. Without resetting, + # Artifact#already_attached_to? still sees the (just-purged) row + # and shortcuts the trusted attach below. + message.artifacts.reset + end + end + return false if trusted_present?(transform) + + artifact = transform.render(file) + artifact.attach_to(message) + rescue Transform::Error => e + report(e, action: "transform_#{transform.class.name.demodulize}") + false + end + + def trusted_present?(transform) + message.artifacts.any? { |a| transform.trusted?(a) } + end + + def transform_owned_filenames + transforms.flat_map(&:owned_filenames) + end + + def report(error, action:) + Opencode::ErrorReporter.report(error, handled: true, severity: :warning, + context: { feature: feature, action: action, message_id: message.id }) + end + end +end diff --git a/lib/opencode/sandbox.rb b/lib/opencode/sandbox.rb new file mode 100644 index 0000000..5192fa8 --- /dev/null +++ b/lib/opencode/sandbox.rb @@ -0,0 +1,71 @@ +# frozen_string_literal: true + +module Opencode + # The per-user (or per-trip) sandbox directory the agent's container + # writes into. 
A first-class noun rather than a path-string with + # primitives sprinkled around the codebase: the Sandbox knows its + # own path, knows how to walk itself, knows what "fresh enough" means + # for a given turn, and yields SandboxFile values that carry their + # own safety predicate. + # + # Used by Opencode::MessageArtifacts. Construct one with the path, + # then ask it for `files(after:)` where `after` is the user message's + # created_at time (minus CUTOFF_SLACK). Files older than the cutoff + # are stale leftovers from a previous turn — never attached. + class Sandbox + # Two-second slack absorbs clock skew between the Rails app and the + # per-user OpenCode container. Without it, a file written by the + # container in the same wall-clock second as the user message could + # be (mtime < created_at) and get rejected. + CUTOFF_SLACK = 2.seconds + + attr_reader :path + + def initialize(path:, max_file_bytes: Opencode::ResponseParser::MAX_ARTIFACT_SIZE) + @path = path + @max_file_bytes = max_file_bytes + end + + def exists? + path.present? && Dir.exist?(path) + end + + # Yields SandboxFile values for every file in the sandbox that + # passes its own #safe? predicate AND was modified after the cutoff. + # When `after:` is nil (callers without a user_message handle, e.g. + # AIGL on certain finalize paths), no mtime filter is applied — + # only safety + filetype. + def files(after: nil) + return enum_for(:files, after: after) unless block_given? + return unless exists? + + cutoff = after && (after.to_time - CUTOFF_SLACK) + + Dir.glob(File.join(path, "*")).each do |entry| + next unless File.file?(entry) + next if cutoff && File.mtime(entry) < cutoff + + file = SandboxFile.new( + path: entry, + sandbox_prefix: prefix, + max_bytes: @max_file_bytes + ) + next unless file.safe? 
+ + yield file + end + end + + def file(basename, after: nil) + files(after: after).find { |f| f.basename == basename } + end + + private + + # Separator-terminated prefix so /sandbox-1 doesn't false-positive + # on /sandbox-10/foo when SandboxFile checks realpath containment. + def prefix + @prefix ||= File.join(path, "") + end + end +end diff --git a/lib/opencode/sandbox_file.rb b/lib/opencode/sandbox_file.rb new file mode 100644 index 0000000..554fda3 --- /dev/null +++ b/lib/opencode/sandbox_file.rb @@ -0,0 +1,81 @@ +# frozen_string_literal: true + +require "pathname" + +module Opencode + # One file living inside an Opencode::Sandbox. + # + # Carries the safety predicate inline (#safe?) so the orchestrator + # doesn't have to know what "safe" means — symlink, realpath inside + # the sandbox, size cap. Carries the default identity conversion to + # Artifact (#as_artifact) so non-transform code can attach a sandbox + # file as-is without re-implementing the marcel + StringIO ceremony. + # + # mtime-cutoff freshness lives on Opencode::Sandbox#files(after:), + # not here — the file doesn't know which turn opened "after." That's + # a property of the scan, not a property of the file. + class SandboxFile + attr_reader :path, :sandbox_prefix + + def initialize(path:, sandbox_prefix:, max_bytes:) + @path = path + @sandbox_prefix = sandbox_prefix + @max_bytes = max_bytes + end + + def basename + File.basename(path) + end + + def size + File.size(path) + end + + def mtime + File.mtime(path) + end + + def content + File.read(path) + end + + def content_type + Marcel::MimeType.for(name: basename) + end + + # Defense-in-depth on individual file paths the scan yielded: + # + # - Reject symlinks (no follow-the-link escape). + # - The resolved realpath of the path must lie inside the sandbox + # with a separator-terminated prefix so /sandbox-1 doesn't false- + # positive on /sandbox-10/foo. 
+ # - Reject anything over the size cap (default + # Opencode::ResponseParser::MAX_ARTIFACT_SIZE = 10 MB). + # + # The Sandbox scan filters non-files (directories, FIFOs) before + # yielding, so we don't re-check #file? here. Both sides of the containment check are realpaths, so a sandbox reached through a symlinked directory still contains its own files. + def safe? + return false if File.symlink?(path) + return false unless Pathname.new(path).realpath.to_s.start_with?(File.join(File.realpath(sandbox_prefix), "")) + return false if size > @max_bytes + + true + rescue Errno::ENOENT + # Concurrent deletion between scan-yield and safety-check — treat + # as unsafe so the orchestrator skips rather than crashing. + false + end + + # Identity conversion: this sandbox file → an Artifact carrying the + # file's own bytes. Used by the substrate's default (non-transform) + # path for Blackline + Raven, whose agents write document bytes + # directly to the sandbox and expect them attached unchanged. + def as_artifact + Artifact.new( + filename: basename, + content: content, + content_type: content_type + ) + end + end +end diff --git a/lib/opencode/session.rb b/lib/opencode/session.rb new file mode 100644 index 0000000..6cd2c90 --- /dev/null +++ b/lib/opencode/session.rb @@ -0,0 +1,168 @@ +# frozen_string_literal: true + +module Opencode + # Owns the lifecycle of an OpenCode session against a domain record. + # + # Three near-identical implementations of this lifecycle existed on + # Blackline::Conversation, Raven::Conversation, and AIGL::Trip. Each + # had subtle differences (Blackline and Raven didn't take a row-level + # lock; AIGL did). Sandi Metz flagged the shotgun surgery in the + # architectural review — a change to the lifecycle had to be made in + # three places that looked alike but disagreed on locking. This PORO + # is the consolidated role. 
+ # + # Usage: + # + # service = Opencode::Session.new( + # conversation, + # permissions_for: ->(record) { permission_rules_for(record) }, + # on_error: ->(e, **opts) { Rails.error.report(e, **opts) } # optional + # ) + # session_id = service.ensure!(client) # idempotent create-or-resolve + # service.recreate!(client) # always create fresh + # service.abort!(client) # best-effort upstream abort + # + # The permissions_for: callable receives the record at mint! time and + # returns the permissions array for client.create_session. Product- + # specific scoping (e.g. AIGL's workspace_key/trip_id branching) lives + # in the caller's lambda, not in this class — that keeps Session free + # of any reference to permission-building helpers and preserves the + # rails-tier -> containers-tier boundary the design doc locks in. + # + # The on_error: callable is invoked when abort! catches an + # Opencode::Error during teardown. Callers wire their own observability + # (Rails.error.report, OpenTelemetry, Sentry, custom logging) here. + # Defaults to nil — silently swallowing teardown errors, which matches + # the pre-inversion behaviour. Substrate has no opinion about how the + # host reports. + # + # The record must respond to: + # - #title (String) — passed as session title + # - #opencode_session_id / #opencode_session_id= — string column + # - #with_lock(&block) — ActiveRecord row-level lock + # - #update! / #reload — standard ActiveRecord + # - #id — for error reporting context + class Session + def initialize(record, permissions_for:, on_error: nil) + @record = record + @permissions_for = permissions_for + @on_error = on_error + @just_created = false + end + + # True iff the most recent ensure!/recreate! actually created a + # fresh upstream session (vs resolving to an existing id). + # + # Exists so consumers can distinguish "we just minted this" from + # "we found an existing one" without poking at ActiveRecord dirty + # tracking on the record. 
The right object to ask is the object + # that did the work; that's this one. + def just_created? + @just_created + end + + # Returns the session id for the record, creating an OpenCode session + # if none exists yet. Idempotent. Race-safe via row-level locking and + # double-check shortcuts that avoid the lock entirely when an id is + # already persisted. + def ensure!(client) + @just_created = false + return @record.opencode_session_id if @record.opencode_session_id.present? + + @record.with_lock do + # Double-check inside the lock: another worker may have set the + # id between our first read and acquiring the lock. + return @record.opencode_session_id if @record.opencode_session_id.present? + + mint!(client) + end + rescue ActiveRecord::RecordNotUnique + # Two workers raced past the unique-index gate. The loser reloads + # to pick up the winner's id. The winner is the one that + # just_created?; the loser sees @just_created = false. + @record.reload + @record.opencode_session_id + end + + # Always creates a fresh session and overwrites the persisted id. + # Used by Opencode::Turn recovery when the upstream session has gone + # stale (StaleSessionError / SessionNotFoundError). + # + # Race semantics: two concurrent recreate! callers serialize through + # with_lock; the first mints, the second observes the freshly-minted + # id (different from its own pre-lock snapshot) and returns that + # rather than minting again. Both callers converge on one upstream + # session — no orphan leak. + def recreate!(client) + @just_created = false + pre_lock_id = @record.opencode_session_id + + @record.with_lock do + # If another recreate! caller minted while we were waiting for + # the lock, the id changed under us. Treat their fresh mint as + # fresh enough for us too — recreate! returns "a session that's + # newer than what I saw at method entry", not "specifically my + # own mint". + current_id = @record.opencode_session_id + if current_id.present? 
&& current_id != pre_lock_id + return current_id + end + + mint!(client) + end + end + + # Best-effort upstream abort. Swallows Opencode::Error so callers + # never have to wrap this in a rescue — aborts run inside cleanup + # paths where re-raising would mask the real cause of teardown. + def abort!(client) + return unless @record.opencode_session_id.present? + + client.abort_session(@record.opencode_session_id) + rescue Opencode::Error => e + @on_error&.call(e, action: "abort_session", record_id: @record.id) + end + + private + + # The atomic create-and-persist unit shared by ensure! and recreate!. + # + # One operation, two failure modes: + # - client.create_session raises -> nothing to clean up, re-raise + # - update! raises RecordInvalid -> upstream session exists, + # delete it before re-raising + # so we never leak orphans + # + # Sets @just_created = true on success; callers reset to false at + # method entry so a no-op call (existing id) reports false correctly. + def mint!(client) + session_id = nil + + begin + result = client.create_session(title: @record.title, permissions: @permissions_for.call(@record)) + session_id = extract_session_id(result) + @record.update!(opencode_session_id: session_id) + @just_created = true + session_id + rescue ActiveRecord::RecordInvalid + safely_delete(client, session_id) if session_id + raise + end + end + + # OpenCode HTTP responses use symbol keys when parsed via JSON.parse + # with symbolize_names: true (Opencode::Client) but mocks/stubs in + # tests often produce string-keyed hashes. Accept both. + def extract_session_id(result) + result[:id] || result["id"] + end + + def safely_delete(client, session_id) + client.delete_session(session_id) + rescue Opencode::Error + # Best-effort cleanup; the orphan may be gone already or the + # upstream is unavailable. Either way, we already have a real + # error to raise. 
+ end + end +end diff --git a/lib/opencode/tool_display.rb b/lib/opencode/tool_display.rb new file mode 100644 index 0000000..d66f33a --- /dev/null +++ b/lib/opencode/tool_display.rb @@ -0,0 +1,423 @@ +# frozen_string_literal: true + +module Opencode + # A value object that wraps an OpenCode tool part (the shape produced by + # Opencode::Reply from `message.part.updated` events) and exposes the + # information a renderer needs — canonical tool name, human labels, + # target (filepath / pattern / url / command), semantic accessors for + # rich content (unified diffs, todo lists, bash output, etc.), and an + # icon identifier. + # + # Pure Ruby over ActiveSupport. Lives in the shared Opencode namespace + # so Blackline views, AIGL views, and any future OpenCode-backed + # feature can render tool calls consistently. + # + # ## Data shape (Opencode::Reply writes this into `parts_json`) + # + # { + # "type" => "tool", + # "tool" => "read" | "edit" | "exa_web_search_exa" | ..., + # "status" => "pending" | "running" | "completed" | "error", + # "input" => { "filePath" => ..., "content" => ..., ... }, + # "title" => "..." (optional, from tool-result) + # "error" => "..." (only when status == "error") + # "metadata" => { (optional, from tool-result) + # "diff" => "...unified diff text...", + # "diagnostics" => { filePath => [LSP diagnostics] }, + # "preview" => "...file preview...", + # "matches" => Integer, + # "count" => Integer, + # "output" => "...bash stdout...", + # "stdout" => "...bash stdout (legacy key)...", + # "description" => "...bash description...", + # "error" => truthy when the tool ran but returned an error, + # }, + # "output" => "...raw tool output string..." + # } + # + # ## MCP prefix handling + # + # OpenCode's MCP adapter prefixes tools with the server name: Exa's + # `web_search_exa` becomes `exa_web_search_exa` on the wire (double + # suffix because Exa also names the tool with an `_exa` suffix). 
+ # `#canonical_tool` strips known MCP prefixes so switching logic can + # treat `exa_web_search_exa` and `web_search_exa` as the same tool. + # + # ## Adding a new tool + # + # Add one row to the `TOOLS` table below — every derived concern (kind, + # gerund, icon, past-tense verb, KNOWN membership) is computed from it. + # + # ## Example + # + # display = Opencode::ToolDisplay.new(part) + # display.canonical_tool # => "edit" + # display.kind # => "Edit" + # display.gerund # => "Editing" + # display.target # => "app/models/user.rb" + # display.diff # => "@@ -1,3 +1,4 @@\n..." + # display.icon # => :pencil_square + # + class ToolDisplay + # The single source of truth for every tool we render with dedicated + # affordances. One row per tool, four columns: + # + # :kind — noun label ("Read", "Web search") + # :gerund — present-progressive phrase ("Reading", "Searching the web") + # :icon — abstract icon name the view layer maps to an SVG + # :past — past-tense verb ("Read", "Searched", "Wrote") + # + # Anything not in this table falls back to generic rendering (humanize + # the canonical name + OpenCode's title). 
+ TOOLS = { + "read" => { kind: "Read", gerund: "Reading", icon: :document, past: "Read" }, + "write" => { kind: "Write", gerund: "Writing", icon: :document_plus, past: "Wrote" }, + "edit" => { kind: "Edit", gerund: "Editing", icon: :pencil_square, past: "Edited" }, + "multiedit" => { kind: "Edit", gerund: "Editing", icon: :pencil_square, past: "Edited" }, + "apply_patch" => { kind: "Patch", gerund: "Applying changes", icon: :pencil_square, past: "Applied changes to" }, + "bash" => { kind: "Bash", gerund: "Running command", icon: :command_line, past: "Ran" }, + "grep" => { kind: "Grep", gerund: "Searching files", icon: :document_magnifying_glass, past: "Searched for" }, + "glob" => { kind: "Glob", gerund: "Searching files", icon: :magnifying_glass, past: "Searched for" }, + "list" => { kind: "LS", gerund: "Listing files", icon: :rectangle_stack, past: "Listed" }, + "ls" => { kind: "LS", gerund: "Listing files", icon: :rectangle_stack, past: "Listed" }, + "webfetch" => { kind: "Fetch", gerund: "Reading web page", icon: :globe, past: "Fetched" }, + "websearch" => { kind: "Web search", gerund: "Searching the web", icon: :globe, past: "Searched" }, + "codesearch" => { kind: "Code search", gerund: "Searching code", icon: :magnifying_glass, past: "Searched code for" }, + "web_search_exa" => { kind: "Web search", gerund: "Searching the web", icon: :globe, past: "Searched" }, + # @zhafron/mcp-web-search exposes two tools: search_web (SearXNG meta- + # search) and fetch_url (Mozilla Readability page fetch). OpenCode + # prefixes them with the MCP server name from config.json + # (`local-web-search_`), which `canonical_tool` strips before lookup. 
+ "search_web" => { kind: "Web search", gerund: "Searching the web", icon: :globe, past: "Searched" }, + "fetch_url" => { kind: "Fetch", gerund: "Reading web page", icon: :globe, past: "Fetched" }, + "get_code_context_exa" => { kind: "Code lookup", gerund: "Looking up code", icon: :document_magnifying_glass, past: "Looked up" }, + "company_research_exa" => { kind: "Company research", gerund: "Researching company", icon: :globe, past: "Researched" }, + "todowrite" => { kind: "Plan", gerund: "Planning", icon: :queue_list, past: "Updated plan" }, + "todoread" => { kind: "Plan", gerund: "Reading plan", icon: :queue_list, past: "Read plan" }, + "task" => { kind: "Task", gerund: "Researching", icon: :robot, past: "Ran subtask" }, + "skill" => { kind: "Skill", gerund: "Loading skill", icon: :sparkles, past: "Loaded skill" } + }.freeze + + KNOWN = TOOLS.keys.freeze + DEFAULT_ICON = :sparkles + + # MCP server prefixes to strip, paired with the tools they canonicalize + # to (for `#provider` classification). Prefixes are tried in the order + # listed here, so list longer ones first if future additions overlap. + PROVIDERS = { + "exa" => { prefix: "exa_", canonical: %w[web_search_exa get_code_context_exa company_research_exa].freeze }, + "brave" => { prefix: "brave_", canonical: [].freeze }, + "serper" => { prefix: "serper_", canonical: [].freeze }, + "tavily" => { prefix: "tavily_", canonical: [].freeze }, + # The local-web-search MCP server registered in + # config/opencode//config.json. Hyphenated server name plus + # underscore separator; canonical tools are search_web and fetch_url. + "local-web-search" => { prefix: "local-web-search_", canonical: %w[search_web fetch_url].freeze } + }.freeze + + MCP_PREFIXES = PROVIDERS.values.map { |p| p[:prefix] }.freeze + + attr_reader :part + + def initialize(part) + @part = part || {} + end + + # Convenience constructor for raw OpenCode API parts (symbol keys, + # nested under `state`). Flattens into the canonical `parts_json` + # shape Reply persists. 
+ # + # raw = { type: "tool", tool: "bash", callID: ..., + # state: { status: "running", input: {...}, title: "..." } } + # Opencode::ToolDisplay.from_raw(raw) + def self.from_raw(raw) + raw = (raw || {}).deep_stringify_keys + state = raw["state"] || {} + new( + "type" => "tool", + "tool" => raw["tool"], + "status" => state["status"], + "input" => state["input"] || {}, + "output" => state["output"], + "title" => state["title"], + "error" => state["error"], + "metadata" => state["metadata"] || {} + ) + end + + # ----- Identity ----------------------------------------------------- + + def tool_name + @part["tool"].to_s + end + + # Strips MCP prefixes — and, when present, the matching MCP suffix — + # so tools render cleanly regardless of how the server namespaces them. + # + # Examples: + # exa_web_search_exa → web_search_exa (KNOWN tool, preserved) + # exa_web_fetch_exa → web_fetch (not KNOWN; cleaned for display) + # exa_nonexistent → nonexistent (prefix-stripped fallback) + # brave_read → read (KNOWN after prefix strip) + # + # The double-strip handles Exa's naming convention: the MCP server + # exports tools like `web_fetch_exa`, and OpenCode prepends `exa_`, + # producing `exa_web_fetch_exa`. Stripping both yields a readable + # `web_fetch`. + def canonical_tool + name = tool_name + return name if name.empty? || KNOWN.include?(name) + + MCP_PREFIXES.each do |prefix| + stripped = strip_mcp_decoration(name, prefix) + return stripped if stripped + end + name + end + + def known? + KNOWN.include?(canonical_tool) + end + + def icon + TOOLS.dig(canonical_tool, :icon) || DEFAULT_ICON + end + + # "Read", "Edit", "Bash", or a humanized *canonical* name for unknowns. + # Humanizes `canonical_tool`, not `tool_name` — otherwise an MCP tool + # like `exa_web_fetch_exa` whose canonical form (`web_fetch`) isn't + # in TOOLS would fall back to the raw, MCP-prefixed name and display + # as "Exa web fetch exa". Using the canonical form gives the clean + # "Web fetch" label. 
def kind
  TOOLS.dig(canonical_tool, :kind) || canonical_tool.humanize
end

# "Reading", "Editing", "Running command"; tools without a configured
# gerund fall back to "<kind>..." (e.g. "Web fetch...").
def gerund
  TOOLS.dig(canonical_tool, :gerund) || "#{kind}..."
end

# ----- Status -------------------------------------------------------

# Status string stored on the part; "" when absent.
def status
  @part["status"].to_s
end

def pending? = status == "pending"
def running? = status == "running"
def completed? = status == "completed"
def errored? = status == "error"
def terminal? = completed? || errored?
def in_flight? = pending? || running?

# ----- Raw payloads -------------------------------------------------

# Tool input hash; {} when the stored value is missing or not a Hash.
def input
  @part["input"].is_a?(Hash) ? @part["input"] : {}
end

# Tool metadata hash; {} when the stored value is missing or not a Hash.
def metadata
  @part["metadata"].is_a?(Hash) ? @part["metadata"] : {}
end

# Tool output as a String ("" when absent).
def output
  @part["output"].to_s
end

# Tool error as a String ("" when absent).
def error_text
  @part["error"].to_s
end

# OpenCode-supplied title (from tool-result). Used as a fallback for
# unknown MCP tools that don't match KNOWN.
def opencode_title
  @part["title"].to_s
end

# ----- Target (what the tool is operating on) ----------------------

# Returns a single-string representation of the tool's primary target,
# or nil when the tool has no meaningful single target (including tools
# not listed below and blank input values). Callers can substitute
# display-friendly names (e.g., sandbox filenames).
def target
  raw = case canonical_tool
        when "read", "write", "edit", "multiedit", "apply_patch"
          input["filePath"] || input["path"]
        when "bash"
          input["command"]
        when "grep", "glob"
          input["pattern"]
        when "list", "ls"
          input["path"]
        when "webfetch", "fetch_url"
          input["url"]
        when "websearch", "codesearch",
             "web_search_exa", "get_code_context_exa", "company_research_exa"
          input["query"]
        when "search_web"
          # @zhafron/mcp-web-search names the argument `q`, not `query`.
          input["q"]
        when "task"
          input["description"]
        when "skill"
          input["skill_name"] || input["name"]
        end
  raw.to_s.presence
end

# A short label combining kind + target, suitable for a one-line
# summary. Falls back to the OpenCode-supplied title for unknown MCP
# tools, then to just kind.
def title
  if known?
    target.present? ? "#{kind}: #{target}" : kind
  else
    opencode_title.presence || kind
  end
end

# Past-tense "done" variant: "Read foo.rb", "Wrote contract.pdf",
# "Edited user.rb", "Ran `ls -la`". Used after completion. A known tool
# with no :past verb in TOOLS falls back to its kind.
def past_tense_title
  if known?
    verb = TOOLS.dig(canonical_tool, :past)
    return kind unless verb

    target.present? ? "#{verb} #{target}" : verb
  else
    opencode_title.presence || kind
  end
end

# ----- Semantic accessors (rich content) ---------------------------

# Unified-diff text produced by the edit tool (OpenCode attaches this
# under metadata.diff). Present only after completion.
def diff
  metadata["diff"].presence
end

# Sorted list of todo hashes { "content", "status", "priority", "id" }.
# Available during running (input is populated) and completed states.
# Order: in_progress first, pending next, completed last; any other
# status sorts after those. Todos sharing a status keep the order the
# agent supplied them in.
# Canonicalization (string keys + status hyphen→underscore) is
# delegated to Opencode::Todo so Reply and ToolDisplay can't drift.
# Returns [] for non-todo tools, a non-Array `todos` input, and skips
# non-Hash entries.
def todos
  return [] unless %w[todowrite todoread].include?(canonical_tool)

  items = input["todos"]
  return [] unless items.is_a?(Array)

  order = { "in_progress" => 0, "pending" => 1, "completed" => 2 }
  items
    .select { |t| t.is_a?(Hash) }
    .map { |todo| Opencode::Todo.canonicalize(todo) }
    # sort_by alone is not guaranteed stable, so the input position is
    # part of the key: equal-status todos must not shuffle between renders.
    .sort_by.with_index { |todo, index| [order[todo["status"]] || 99, index] }
end

# File content that the write tool is creating (lives in input.content).
# nil for every other tool and when the content is blank.
def file_content
  return nil unless canonical_tool == "write"

  input["content"].presence
end

# Syntax-highlighting language hint based on the target filename.
# Falls back to the raw extension so unknown file types still get a
# hint their syntax-highlighter may recognize heuristically.
# Returns nil when there is no target or the basename has no extension.
def file_lang
  name = File.basename(target.to_s)
  return nil if name.empty?

  ext = File.extname(name).delete_prefix(".").downcase
  LANG_BY_EXT[ext] || ext.presence
end

# Lower-cased file extension (without the dot) => highlighter language.
LANG_BY_EXT = {
  "md" => "markdown", "markdown" => "markdown",
  "rb" => "ruby", "rake" => "ruby",
  "py" => "python",
  "js" => "javascript", "mjs" => "javascript",
  "ts" => "typescript", "tsx" => "typescript",
  "jsx" => "jsx",
  "json" => "json", "yml" => "yaml", "yaml" => "yaml",
  "html" => "html", "erb" => "erb",
  "css" => "css", "scss" => "scss",
  "sh" => "shell", "bash" => "shell", "zsh" => "shell",
  "sql" => "sql",
  "go" => "go", "rs" => "rust",
  "c" => "c", "h" => "c",
  "cpp" => "cpp", "hpp" => "cpp",
  "java" => "java", "kt" => "kotlin",
  "swift" => "swift", "php" => "php",
  "lua" => "lua", "toml" => "toml",
  "xml" => "xml", "conf" => "shell"
}.freeze

# Bash-specific accessors. Each returns nil for any other tool.

# The command line from input.command, or nil when blank.
def bash_command
  return nil unless canonical_tool == "bash"

  input["command"].presence
end

# Command output: the first non-blank of metadata.output,
# metadata.stdout, and the part's own output; nil when all are blank.
def bash_output
  return nil unless canonical_tool == "bash"

  # Each source is checked for blankness on its own: an empty-string
  # metadata.output is truthy in Ruby and would otherwise shadow a
  # populated stdout / final output further down the chain.
  (metadata["output"].presence || metadata["stdout"].presence || output.presence)&.to_s
end

# The description from metadata.description, or nil when blank.
def bash_description
  return nil unless canonical_tool == "bash"

  metadata["description"].presence
end

# Read preview (OpenCode populates metadata.preview after the read).
# nil for other tools and when the preview is blank.
def read_preview
  return nil unless canonical_tool == "read"

  metadata["preview"].presence
end

# Grep/Glob match counts: metadata.matches for grep, metadata.count for
# glob (missing values coerce to 0). nil for every other tool.
def match_count
  case canonical_tool
  when "grep" then metadata["matches"].to_i
  when "glob" then metadata["count"].to_i
  end
end

# ----- Provider identification for log tagging ---------------------

# Groups tools by which MCP server / built-in provides them, for
# operational logs and metrics. Adding a new provider = one row in
# PROVIDERS.
+ def provider + name = tool_name + canonical = canonical_tool + PROVIDERS.each do |provider_name, config| + return provider_name if name.start_with?(config[:prefix]) + return provider_name if config[:canonical].include?(canonical) + end + KNOWN.include?(canonical) ? "opencode-builtin" : "unknown" + end + + private + + # Returns `name` with `prefix` (and the matching MCP suffix, where + # Exa double-encodes) removed, or nil when `name` doesn't carry that + # prefix. Precedence: + # + # 1. Prefer the single-stripped form when it's KNOWN + # (exa_web_search_exa → web_search_exa, which IS a TOOLS key). + # 2. Otherwise prefer the clean double-stripped form when both + # prefix and suffix are present + # (exa_web_fetch_exa → web_fetch, for humanization). + # 3. Fall back to single-stripped when double-stripped is empty. + def strip_mcp_decoration(name, prefix) + return nil unless name.start_with?(prefix) + + stripped = name.delete_prefix(prefix) + return stripped if KNOWN.include?(stripped) + + suffix = "_#{prefix.chomp('_')}" + return stripped unless stripped.end_with?(suffix) + + double = stripped.delete_suffix(suffix) + double.empty? ? stripped : double + end + end +end diff --git a/lib/opencode/transform.rb b/lib/opencode/transform.rb new file mode 100644 index 0000000..5bd3097 --- /dev/null +++ b/lib/opencode/transform.rb @@ -0,0 +1,77 @@ +# frozen_string_literal: true + +module Opencode + # A per-product rule that converts an Opencode::SandboxFile into an + # Opencode::Artifact, owning the trust boundary between "bytes the + # agent wrote" and "bytes the host signs and attaches." + # + # The default substrate path is identity: any sandbox file the + # allowlist accepts gets attached as-is. Blackline and Raven use + # the default — their agents `write` final document bytes the host + # serves back unchanged. 
AIGL's contract is structurally different: + # the agent writes JSON, the **host** must render that JSON into + # trusted HTML before attaching, because the resulting HTML gets + # served inline from the app origin and an agent-written filename + # can't be permitted as stored-XSS. + # + # Subclass hooks (override these — none have a generic default + # that's safe to inherit): + # + # source_filename — basename in the sandbox the transform + # reads from + # destination_filename — filename of the Artifact the transform + # returns from #render + # render(sandbox_file) — return an Artifact carrying the rendered + # bytes + trust metadata. Raise + # Opencode::Transform::Error to abort just + # this file (substrate logs + skips). + # trusted?(attachment) — true if the attachment was produced by + # this transform (used by Impostor.for and + # by view code that decides inline-render + # vs download). Default: filename match. + # purge_impostors? — if true, before attaching the substrate + # deletes any existing attachment whose + # filename matches destination_filename + # but fails trusted?. Default: false. + # + # `applies_to?(sandbox_file)` is the routing predicate the substrate + # uses to decide whether to send this file through this transform. + # Default is exact match against source_filename; override for + # multi-file or glob-style ownership. + class Transform + Error = Class.new(StandardError) + + def destination_filename + raise NotImplementedError, "#{self.class.name} must implement #destination_filename" + end + + def source_filename + raise NotImplementedError, "#{self.class.name} must implement #source_filename" + end + + def applies_to?(sandbox_file) + sandbox_file.basename == source_filename + end + + def render(_sandbox_file) + raise NotImplementedError, "#{self.class.name} must implement #render" + end + + def trusted?(attachment) + attachment.filename.to_s == destination_filename + end + + def purge_impostors? 
+ false + end + + # Names this transform owns end-to-end. The substrate uses this to + # keep its tool-extracted phase from racing the transform — the + # agent's raw payload (source_filename) and the rendered output + # (destination_filename) are both off-limits to the default attach + # path so the transform owns the slot. + def owned_filenames + [ source_filename, destination_filename ] + end + end +end diff --git a/lib/opencode/turn.rb b/lib/opencode/turn.rb new file mode 100644 index 0000000..ed2011f --- /dev/null +++ b/lib/opencode/turn.rb @@ -0,0 +1,642 @@ +# frozen_string_literal: true + +module Opencode + # Opencode::Turn is an INTERNAL procedure object. + # + # Its constructor signature (14 keyword arguments) is NOT part of the + # gem's public API. Use the higher-level affordances on + # Opencode::Client instead: + # + # Opencode::Client#stream(session_id, prompt) { |part| ... } + # → block-form streaming for live partials, returns Reply::Result + # + # Opencode::Client#send_message(session_id, prompt) + # → sync send-and-poll for the simple no-streaming case + # + # If you find yourself instantiating Turn directly, file an issue — + # that's a signal we need a higher-level API you can't yet reach. + # + # Subject to change without major-version bump. See lib/opencode/CLAUDE.md + # 'Conventions and known debt' section. + # One streaming turn against an Opencode session. + # + # A "turn" is one user-message + one assistant-response cycle. Turn drives + # that cycle to completion: ensure the session, send the query, stream + # events into a Reply, recover from common failures, persist the final + # assistant message, and produce an Opencode::Turn::Result. + # + # Honest about the shape: this is a procedure object that wraps the data + # of one turn (message, subject, exchange, reply) with the strategies + # that drive it (session lifecycle, observer, system context, agent name) + # and the sinks that consume the result (tracer, callbacks). 
The design + # alternatives — abstract base class with hook methods, or factoring out + # an explicit Pipeline state machine — were considered. The first is the + # POODR-flagged inheritance-for-code-reuse anti-pattern. The second adds + # a layer without changing the size of the procedure. We picked the + # smallest honest shape. + # + # Composition over inheritance: every product-specific concern is a + # collaborator passed in. Turn never sees Blackline, Raven, or AIGL by + # name. + # + # Collaborators + # ------------- + # + # session_for Opencode::Session-shaped. Responds to: + # - #ensure!(client) -> session_id String + # - #recreate!(client) -> session_id String + # - #just_created? -> Boolean (true iff the most recent + # ensure!/recreate! actually minted a fresh session). + # + # observer_factory callable: ->(message) returning an observer that + # responds to #watch(reply). Concretely: + # ->(message) { Blackline::ReplyStream.new(...) }. + # + # system_context callable: ->(subject) -> String system prompt. + # + # agent_name callable: ->(subject) -> String agent slug. + # + # tracer Opencode::Tracer-shaped. Responds to + # #call(name, **payload). Receives unprefixed event + # names; the tracer prepends the product namespace. + # + # on_finalized callable: ->(message, exchange) called after the + # assistant message is persisted in :completed. + # Errors raised here are reported and contained; + # they do not flip the message back to :error. + # + # on_turn_finished callable: ->(result) where result is an + # Opencode::Turn::Result. Called once at the end of + # every turn (any path). Errors raised here are + # reported and contained. + # + # on_activity_tick callable: ->(subject) called periodically during + # streaming so callers can keep the user's container + # warm. Default: no-op. 
+ # + # Required record-shape contract on `subject`: + # + # subject.id + # subject.opencode_session_id + # + # Required record-shape contract on `message` (assistant message): + # + # message.id + # message.reload + # message.cancelled? + # message.finalize!(**attrs) # CAS update from :pending state + # message.update!(content:, status:) # for cancellation + error fallback + # + # Public API: only `#call`. Never raises in normal operation; all errors + # are translated into a marked-error message and an on_turn_finished + # callback with `result.failed?`. + class Turn + DEFAULT_EMPTY_STREAM_RETRY_DELAY = 2.seconds + DEFAULT_FINAL_EXCHANGE_TIMEOUT = 120.seconds + DEFAULT_FINAL_EXCHANGE_RETRY_DELAY = 2.seconds + ACTIVITY_TOUCH_INTERVAL = 5.minutes.to_i + ERROR_FALLBACK_CONTENT = "Sorry, an error occurred while generating this response." + + # The result of running one Turn. A value object so the Symbol-vs-String + # status confusion that lived inside the old `emit_turn_finished` payload + # has one source of truth: the Result. Callbacks ask `result.completed?`; + # trace consumers ask `result.trace_payload`. + class Result + attr_reader :status, :message, :duration_ms, :cost, + :input_tokens, :output_tokens, :error + + # status: :completed | :cancelled | :error | :failed + def initialize(status:, message:, duration_ms:, error: nil) + @status = status + @message = message + @duration_ms = duration_ms + @error = error + if message.respond_to?(:cost) + @cost = message.cost + @input_tokens = message.input_tokens + @output_tokens = message.output_tokens + end + end + + def completed? = @status == :completed + def cancelled? = @status == :cancelled + def errored? = @status == :error + def failed? = @status == :failed + + # The trace-event-shaped payload. Status as String to keep dashboard + # query compatibility with pre-refactor traces. tool_count optional. 
+ def trace_payload(tool_count: nil) + payload = { + status: @status.to_s, + duration_ms: @duration_ms, + cost: @cost, + input_tokens: @input_tokens, + output_tokens: @output_tokens + } + if @error + payload[:error] = @error.class.name + payload[:error_message] = @error.message.to_s.truncate(200) + end + payload[:tool_count] = tool_count if tool_count + payload.compact + end + end + + def initialize( + message:, + subject:, + query_text:, + client:, + session_for:, + observer_factory:, + system_context:, + agent_name:, + tracer:, + on_finalized: ->(_msg, _ex) { }, + on_turn_finished: ->(_result) { }, + on_activity_tick: ->(_subject) { }, + empty_stream_retry_delay: DEFAULT_EMPTY_STREAM_RETRY_DELAY, + final_exchange_timeout: DEFAULT_FINAL_EXCHANGE_TIMEOUT, + final_exchange_retry_delay: DEFAULT_FINAL_EXCHANGE_RETRY_DELAY, + error_fallback_content: ERROR_FALLBACK_CONTENT, + error_feature: "opencode.turn" + ) + @message = message + @subject = subject + @query_text = query_text + @client = client + @session_for = session_for + @observer_factory = observer_factory + @system_context = system_context + @agent_name = agent_name + @tracer = tracer + @on_finalized = on_finalized + @on_turn_finished = on_turn_finished + @on_activity_tick = on_activity_tick + @empty_stream_retry_delay = empty_stream_retry_delay + @final_exchange_timeout = final_exchange_timeout + @final_exchange_retry_delay = final_exchange_retry_delay + @error_fallback_content = error_fallback_content + @error_feature = error_feature + @pre_turn_message_count = 0 + end + + def call + @turn_started_at = monotonic_now + emit("response.started", subject_id: @subject.id, message_id: @message.id) + + attempted_recreate = false + begin + run_turn + rescue Opencode::SessionNotFoundError, Opencode::StaleSessionError + raise if attempted_recreate + @session_for.recreate!(@client) + # Distinguish the recovery-with-resend path: if our original + # async send was already accepted upstream, the recreate means + # the 
upstream may now have orphan work it's still spending on. + # The on-call engineer needs this distinction at 3am. + emit("session.recreated_with_resend", + session_id: @subject.opencode_session_id, subject_id: @subject.id) + attempted_recreate = true + retry + end + rescue StandardError => e + handle_unexpected_error(e) + end + + private + + # ---- Pipeline ------------------------------------------------------- + + def run_turn + session_id = @session_for.ensure!(@client) + emit_session_created_if_new + validate_session!(session_id) + + @client.send_message_async( + session_id, @query_text, + agent: @agent_name.call(@subject), + system: @system_context.call(@subject) + ) + + stream_result = stream_response(session_id) + exchange = fetch_current_exchange(session_id) + stream_result, exchange = wait_for_final_exchange_result(session_id, stream_result, exchange) + last_assistant = exchange.reverse_each.detect { |m| m.dig(:info, :role) == "assistant" } + + @message.reload + if @message.cancelled? + save_cancelled_response(stream_result, last_assistant) + elsif stream_result[:full_text].blank? + recover_empty_stream(session_id, last_assistant, exchange) + else + finalize_response(stream_result, last_assistant, exchange) + end + end + + def validate_session!(session_id) + messages = @client.get_messages(session_id) + @pre_turn_message_count = messages.is_a?(Array) ? messages.size : 0 + end + + # `session.created` is emitted iff the session was *just* created by + # the call to `ensure!` above. We ask the Session — which did the + # work and knows the answer — instead of the subject's AR dirty + # tracking, which would couple Turn to ActiveRecord-shaped records. + def emit_session_created_if_new + return unless @session_for.respond_to?(:just_created?) + return unless @session_for.just_created? 
+ emit("session.created", session_id: @subject.opencode_session_id, subject_id: @subject.id) + end + + # ---- Streaming ------------------------------------------------------ + + def stream_response(session_id) + reply = Opencode::Reply.new + @reply = reply + @observer_factory.call(@message).watch(reply) + + stream_started_at = monotonic_now + last_activity_touch_at = stream_started_at + first_token_at = nil + event_count = 0 + + begin + release_active_record_connections + # Throttled activity tick — fires on EVERY event including heartbeats + # (via the stream_events :on_activity_tick kwarg). We need heartbeats + # to count so that a user taking 30+ minutes to answer an ask-user + # prompt keeps the container warm: the agent itself emits no events + # while suspended, only the server's keep-alive does. + # + # The 5-minute throttle bounds DB write rate (one + # update_column per tick, not per heartbeat). Reaper safety + # is independent: the reaper's 30-minute idle threshold gives + # 6× headroom over this throttle, so even if several ticks + # miss the container survives. + # + # Wrapped in rescue so a transient DB blip on touch_activity + # is observable but doesn't kill an otherwise-healthy in-flight + # stream (heartbeats are advisory; next tick retries). + activity_tick = ->(_event) { + if (monotonic_now - last_activity_touch_at) >= ACTIVITY_TOUCH_INTERVAL + begin + @on_activity_tick.call(@subject) + last_activity_touch_at = monotonic_now + rescue StandardError => e + Opencode::ErrorReporter.report(e, handled: true, severity: :warning, + context: { feature: @error_feature, hook: :on_activity_tick }) + end + end + } + + @client.stream_events( + session_id: session_id, + reply: reply, + on_activity_tick: activity_tick + ) do |event| + event_count += 1 + reply.apply(event) + first_token_at ||= monotonic_now if reply.first_text_seen? 
+ end + + emit("stream.completed", + duration_ms: elapsed_ms(stream_started_at), + first_token_ms: first_token_at && ((first_token_at - stream_started_at) * 1000).round, + event_count: event_count, + tool_count: reply.respond_to?(:tool_count) ? reply.tool_count : nil) + rescue Opencode::SessionNotFoundError + raise + rescue StandardError => e + Opencode::ErrorReporter.report(e, handled: true, severity: :warning, + context: { feature: @error_feature, error_class: e.class.name }) + emit("stream.interrupted", + duration_ms: elapsed_ms(stream_started_at), + event_count: event_count, + error: e.class.name, + error_message: e.message.to_s.truncate(200)) + attempt_stream_recovery(session_id, reply) + end + + reply.result + end + + def release_active_record_connections + return unless defined?(ActiveRecord::Base) + + ActiveRecord::Base.connection_handler.clear_active_connections! + end + + # If the session API is still reachable, fetch the current exchange + # and rebaseline `reply` to whatever the server has. If the API is + # also unreachable, keep whatever the reply accumulated before the + # interruption. + def attempt_stream_recovery(session_id, reply) + exchange = fetch_current_exchange(session_id) + last_msg = exchange.reverse_each.detect { |m| m.dig(:info, :role) == "assistant" } + return unless last_msg + + recovered_parts = Opencode::ResponseParser.extract_interleaved_parts(last_msg) + reply.replace_parts(recovered_parts) if recovered_parts.any? + rescue StandardError + # Session API also unreachable; keep whatever the reply has. + end + + # ---- Finalize ------------------------------------------------------- + + def finalize_response(stream_result, last_assistant, exchange) + result = authoritative_result(stream_result, exchange) + attrs = { + content: result[:full_text], + tool_calls_json: result[:tool_parts], + parts_json: result[:parts_json], + status: :completed + } + attrs[:reasoning] = result[:reasoning_text] if result[:reasoning_text].present? 
+ attrs.merge!(extract_cost(last_assistant)) if last_assistant + + unless @message.finalize!(**attrs) + # finalize! returns false if message was cancelled/errored mid-flight. + emit_turn_finished(status: :cancelled) + return + end + + # Callbacks run AFTER the system of record is durable. If a callback + # raises (Redis flake on Turbo broadcast, ActiveJob enqueue hiccup + # on title generation), the turn is still completed; the failure + # is reported and isolated. Without this isolation a successful + # turn could be flipped to :error by an unrelated infra hiccup. + safe_callback(:on_finalized) { @on_finalized.call(@message, exchange) } + emit_turn_finished(status: :completed) + end + + def authoritative_result(stream_result, exchange) + exchange_result = current_turn_result(exchange) + return stream_result unless exchange_result + return stream_result if exchange_result[:full_text].blank? + + merge_stream_only_parts(stream_result, exchange_result) + end + + # The final session poll is authoritative for answer text and terminal + # tool payloads, but OpenCode emits some events (`todo.updated`, and + # whatever future bus events join Opencode::PartSource::STREAM_ONLY) + # that never persist as message parts. Preserve those synthetic + # stream parts across finalization so the refresh-rendered UI does + # not drop the live state the user watched stream in. + def merge_stream_only_parts(stream_result, exchange_result) + stream_parts = Array(stream_result[:parts_json]) + return exchange_result unless stream_parts.any? { |part| Opencode::PartSource.stream_only?(part) } + + exchange_parts = Array(exchange_result[:parts_json]).dup + merged = [] + + stream_parts.each do |part| + if Opencode::PartSource.stream_only?(part) + merged << part + elsif exchange_parts.any? 
+ merged << exchange_parts.shift + end + end + + merged.concat(exchange_parts) + Opencode::Reply.distill(merged) + end + + def wait_for_final_exchange_result(session_id, stream_result, exchange) + result = authoritative_result(stream_result, exchange) + sync_reply_from_result(result) + return [ result, exchange ] if terminal_exchange_result?(result, exchange) + return [ result, exchange ] unless exchange_indicates_more_work?(exchange) + + emit("response.waiting_for_final_text", subject_id: @subject.id, message_id: @message.id) + deadline = monotonic_now + @final_exchange_timeout + + loop do + return [ result, exchange ] if monotonic_now >= deadline + + sleep @final_exchange_retry_delay if @final_exchange_retry_delay.positive? + exchange = fetch_current_exchange(session_id) + result = authoritative_result(stream_result, exchange) + sync_reply_from_result(result) + return [ result, exchange ] if terminal_exchange_result?(result, exchange) + return [ result, exchange ] unless exchange_indicates_more_work?(exchange) + end + end + + def sync_reply_from_result(result) + return unless @reply.respond_to?(:sync_recovered_parts) + return if result[:parts_json].blank? + + @reply.sync_recovered_parts(result[:parts_json]) + end + + def terminal_exchange_result?(result, exchange) + return false if result[:full_text].blank? 
+ + last_assistant = current_turn_assistant_messages(exchange).last + return true unless last_assistant + return false if assistant_in_progress?(last_assistant) + + assistant_finish(last_assistant) != "tool-calls" + end + + def exchange_indicates_more_work?(exchange) + last_assistant = current_turn_assistant_messages(exchange).last + return false unless last_assistant + + assistant_finish(last_assistant) == "tool-calls" || assistant_in_progress?(last_assistant) + end + + def assistant_finish(assistant_message) + assistant_message.dig(:info, :finish).to_s + end + + def assistant_in_progress?(assistant_message) + time = assistant_message.dig(:info, :time) + return false unless time.is_a?(Hash) + return false unless time.key?(:created) + + time[:completed].blank? + end + + def current_turn_result(exchange) + parts = current_turn_assistant_messages(exchange).flat_map do |assistant_message| + Opencode::ResponseParser.extract_interleaved_parts(assistant_message) + end + return nil if parts.empty? + + Opencode::Reply.distill(parts) + end + + def current_turn_assistant_messages(exchange) + Array(exchange).select { |message| message.dig(:info, :role) == "assistant" } + end + + def save_cancelled_response(stream_result, last_assistant) + content = stream_result[:full_text].presence || "Response was stopped." + attrs = { + content: content, + tool_calls_json: stream_result[:tool_parts], + parts_json: stream_result[:parts_json] + } + attrs[:reasoning] = stream_result[:reasoning_text] if stream_result[:reasoning_text].present? + attrs.merge!(extract_cost(last_assistant)) if last_assistant + @message.update!(**attrs) + emit_turn_finished(status: :cancelled) + end + + # ---- Empty-stream recovery ------------------------------------------ + + def recover_empty_stream(session_id, last_assistant, exchange) + recovered = recover_from_exchange(last_assistant) + + unless recovered + sleep @empty_stream_retry_delay if @empty_stream_retry_delay.positive? 
+ exchange = fetch_current_exchange(session_id) + last_assistant = exchange.reverse_each.detect { |m| m.dig(:info, :role) == "assistant" } + recovered = recover_from_exchange(last_assistant) + end + + if recovered + emit("response.recovered_from_exchange") + finalize_response(recovered, last_assistant, exchange) + elsif detect_upstream_error(last_assistant) + mark_error(reason: "upstream_llm_error") + else + mark_error(reason: "empty_stream") + end + end + + def recover_from_exchange(assistant_message) + return nil unless assistant_message + parts_json = Opencode::ResponseParser.extract_interleaved_parts(assistant_message) + return nil if parts_json.empty? + result = Opencode::Reply.distill(parts_json) + return nil if result[:full_text].blank? + + result + end + + def detect_upstream_error(assistant_message) + return nil unless assistant_message + error = Opencode::ResponseParser.extract_error(assistant_message) + return nil unless error + + emit("response.upstream_error", + error_name: error[:name], + error_message: error[:message], + status_code: error[:status_code], + provider_url: error[:url]) + + Opencode::ErrorReporter.report( + Opencode::Error.new("Upstream LLM error: #{error[:name]} - #{error[:message]}"), + handled: true, + severity: :error, + context: { feature: @error_feature, **error } + ) + + error + end + + # ---- Error paths ---------------------------------------------------- + + # Both error paths transition the message to :error through the + # CAS-safe Message#error! contract — a concurrent cancel that already + # moved the row out of :pending wins, and the canceller's terminal + # state survives. emit_turn_finished re-reads the persisted state + # (Result.message is reloaded) so callbacks receive the actual + # current state, not the state we wished we wrote. 
+ + def handle_unexpected_error(e) + Opencode::ErrorReporter.report(e, handled: true, severity: :error, + context: { feature: @error_feature, message_id: @message.id, error_class: e.class.name }) + @message.error!(@error_fallback_content) + emit_turn_finished(status: :failed, error: e) + end + + def mark_error(reason:) + emit("response.error", reason: reason, message_id: @message.id, subject_id: @subject.id) + @message.error!(@error_fallback_content) + emit_turn_finished(status: :error) + end + + # ---- Trace + callback helpers -------------------------------------- + + def emit(name, **payload) + @tracer.call(name, **payload) + end + + def emit_turn_finished(status:, error: nil) + @message.reload if @message.respond_to?(:reload) + result = Result.new( + status: status, + message: @message, + duration_ms: elapsed_ms(@turn_started_at), + error: error + ) + + safe_callback(:on_turn_finished) { @on_turn_finished.call(result) } + + tool_count = @message.respond_to?(:tool_calls_json) ? @message.tool_calls_json&.size.to_i : nil + emit("turn.finished", **result.trace_payload(tool_count: tool_count)) + end + + # Run a callback, report any exception, but keep the turn in its + # current durable state. Side-effect callbacks (broadcast, artifact + # collection, title enqueueing) are not allowed to overwrite + # :completed → :error after the message is already persisted. + def safe_callback(name) + yield + rescue StandardError => e + Opencode::ErrorReporter.report(e, handled: true, severity: :warning, + context: { feature: @error_feature, callback: name, message_id: @message.id, error_class: e.class.name }) + emit("callback.error", callback: name.to_s, error_class: e.class.name) + end + + # ---- Exchange + cost helpers --------------------------------------- + + def fetch_current_exchange(session_id) + messages = @client.get_messages(session_id) + return [] unless messages.is_a?(Array) && messages.any? 
+ + search_start_idx = [ @pre_turn_message_count.to_i, messages.length ].min + last_user_idx = nil + (messages.length - 1).downto(search_start_idx) do |idx| + message = messages[idx] + if message.dig(:info, :role) == "user" && user_message_text(message) == @query_text.to_s + last_user_idx = idx + break + end + end + return [] unless last_user_idx + messages[(last_user_idx + 1)..] + rescue Opencode::Error => e + Opencode::ErrorReporter.report(e, handled: true, severity: :warning, + context: { feature: @error_feature, action: "fetch_current_exchange", session_id: session_id }) + [] + end + + def user_message_text(message) + Opencode::ResponseParser.extract_text(message).to_s + end + + def extract_cost(assistant_msg) + cost = Opencode::ResponseParser.extract_cost(assistant_msg) + cache = Opencode::ResponseParser.extract_cache_tokens(assistant_msg) + tokens = Opencode::ResponseParser.extract_tokens(assistant_msg) || {} + { + cost: cost, + input_tokens: tokens[:input], + output_tokens: tokens[:output], + cache_read_tokens: cache[:cache_read], + cache_write_tokens: cache[:cache_write] + }.compact + end + + # ---- Time helpers --------------------------------------------------- + + def monotonic_now = Process.clock_gettime(Process::CLOCK_MONOTONIC) + def elapsed_ms(t) = ((monotonic_now - t) * 1000).round + end +end diff --git a/lib/opencode/uploaded_files_prompt.rb b/lib/opencode/uploaded_files_prompt.rb new file mode 100644 index 0000000..2eb80f7 --- /dev/null +++ b/lib/opencode/uploaded_files_prompt.rb @@ -0,0 +1,85 @@ +# frozen_string_literal: true + +module Opencode + # The prompt body to send to an OpenCode agent when the user attached + # files: the user's text plus an instruction block naming each file by + # its sandboxed filename so the agent can read it with the `read` tool. 
+ # + # Two outputs, both explicit: + # + # text — the prompt body to pass to send_message_async + # sandbox_file_names — map of sandbox_name => original filename, + # used by ReplyStream to show the user a + # recognizable name when the agent reads the + # file back. + # + # Previously this work lived in `Opencode::SandboxFiles`, an ActiveSupport + # concern that mutated a hidden `@sandbox_file_names` instance variable on + # the including job. ReplyStream then read that ivar back through a + # closure. State across class boundaries via shared mutable ivars is the + # kind of Sandi-smelly action-at-a-distance that breaks the moment + # someone forgets the contract. This value object replaces that with two + # named return values. + # + # Side effect, unchanged from the concern: file bytes are copied from + # ActiveStorage into the per-user OpenCode sandbox directory so the + # agent can read them with the `read` tool. The copy is path-escape + # guarded (the cleanpath of the destination must start with the + # sandbox dir prefix, no symlink trickery). + class UploadedFilesPrompt + attr_reader :text, :sandbox_file_names + + def initialize(user_message:, sandbox_path:, sandbox_name_for:) + @user_message = user_message + @sandbox_path = sandbox_path + @sandbox_name_for = sandbox_name_for + @sandbox_file_names = {} + @text = build_text + end + + private + + def build_text + raw = @user_message.content.to_s + return raw unless @user_message.files.attached? + + file_instructions = @user_message.files.map do |file| + sandbox_file = copy_to_sandbox(file) + @sandbox_file_names[sandbox_file.sandbox_name] = file.filename.to_s + "#{file.filename} -> #{sandbox_file.sandbox_name} (#{file.content_type}, #{file.byte_size} bytes)" + end + + [ + raw, + "", + "The user uploaded #{file_instructions.size} file(s). 
Read each file thoroughly, then consult your reference materials and verify any legal claims before responding:", + *file_instructions + ].join("\n").strip + end + + def copy_to_sandbox(file) + FileUtils.mkdir_p(@sandbox_path) + + sandbox_name = @sandbox_name_for.call(file) + dest = File.join(@sandbox_path, sandbox_name) + + resolved = Pathname.new(dest).cleanpath.to_s + unless resolved.start_with?(@sandbox_path) + raise ArgumentError, "Filename escapes sandbox: #{sandbox_name}" + end + + File.open(dest, "wb") { |f| f.write(file.download) } + Placement.new(sandbox_name, dest) + end + + # Tiny value pair returned by copy_to_sandbox: the canonical filename + # the agent should read by, and the on-disk path the file ended up at. + # Internal to UploadedFilesPrompt — the caller (UploadedFilesPrompt + # itself) only needs the sandbox_name to embed in the prompt text. + Placement = Struct.new(:sandbox_name, :path) do + def to_s + path + end + end + end +end