Initial public release v0.0.1.alpha2
Some checks failed
Test / test (3.2) (push) Failing after 9m43s
Test / test (3.3) (push) Failing after 9m43s
Test / test (3.4) (push) Failing after 9m42s

opencode-ruby — idiomatic Ruby client for OpenCode (HTTP + SSE).

Hand-rolled, opinionated Ruby SDK with block-form streaming, value-
object responses, and automatic SSE reconnection. Pluggable
Opencode::Instrumentation adapter for routing events to
ActiveSupport::Notifications, OpenTelemetry, stdout, or any custom
emitter. Companion to opencode-rails for AR-coupled Rails apps.

What this version ships:
  - Opencode::Client (Net::HTTP + SSE)
  - Opencode::Reply / Reply::Result / ReplyObserver
  - Opencode::Tracer, Opencode::Prompts
  - Opencode::ResponseParser, ToolPart, PartSource, Todo
  - Opencode::Instrumentation (instrument + notify)
  - Opencode::Error and seven subclasses
  - examples/conversation_recipe.rb — canonical Rails wiring blueprint

15 smoke tests. CI on Ruby 3.2/3.3/3.4.

Ruby >= 3.2. Runtime dep: activesupport >= 6.1, < 9.0.

See CHANGELOG.md for the alpha1 -> alpha2 delta.
This commit is contained in:
2026-05-20 21:41:30 -07:00
commit 889d38332f
24 changed files with 2616 additions and 0 deletions

564
lib/opencode/client.rb Normal file
View File

@@ -0,0 +1,564 @@
# frozen_string_literal: true
require "net/http"
require "json"
require "base64"
module Opencode
# HTTP client for OpenCode REST API.
# Thread safety: Each instance creates its own Net::HTTP connection.
# Do NOT share instances across threads. Create per-job.
class Client
attr_reader :directory
def initialize(
base_url: ENV["OPENCODE_BASE_URL"] || "http://localhost:4096",
password: ENV["OPENCODE_SERVER_PASSWORD"],
timeout: (ENV["OPENCODE_TIMEOUT"] || 120).to_i,
directory: nil,
workspace: nil
)
@uri = URI.parse(base_url)
@password = password
@timeout = timeout || 120
@directory = directory
@workspace = workspace
end
def create_session(title: nil, permissions: nil)
body = { title: title, permission: permissions }.compact
post("/session", body)
end
def send_message(
session_id, text,
parts: nil,
model: nil,
agent: nil,
system: nil,
message_id: nil,
no_reply: nil,
tools: nil,
format: nil,
variant: nil
)
body = prompt_payload(
text,
parts: parts,
model: model,
agent: agent,
system: system,
message_id: message_id,
no_reply: no_reply,
tools: tools,
format: format,
variant: variant
)
post("/session/#{session_id}/message", body)
end
def send_message_async(
session_id, text,
parts: nil,
model: nil,
agent: nil,
system: nil,
message_id: nil,
no_reply: nil,
tools: nil,
format: nil,
variant: nil
)
body = prompt_payload(
text,
parts: parts,
model: model,
agent: agent,
system: system,
message_id: message_id,
no_reply: no_reply,
tools: tools,
format: format,
variant: variant
)
post("/session/#{session_id}/prompt_async", body)
end
# Block-form streaming — the headline API for callers who want the
# full async-prompt + SSE-loop + final-exchange-merge flow in one
# call. Returns the final Opencode::Reply::Result value object once
# the agent finishes.
#
# reply = client.stream(session_id, "Explain monads") do |part|
# print part["content"] if part["type"] == "text"
# end
# reply.full_text # => the final accumulated text
# reply.tool_parts # => array of terminal tool parts
#
# The block is invoked every time a part is added, grows, finalizes,
# or (for tool parts) advances state — i.e., whenever a user-visible
# change happens. The block receives the current `part` hash (string
# keys: "type", "content", "tool", "status", "input", ...).
#
# If you need raw events (every server.* tick, todo.updated, prompt
# asked/replied, etc.), use #stream_events instead.
#
# Optional kwargs are forwarded to send_message_async — model, agent,
# system prompt override, and the SSE pacing knobs supported by
# stream_events.
def stream(
session_id, text,
model: nil, agent: nil, system: nil, message_id: nil,
stream_timeout: 600,
first_event_timeout: 120,
idle_stream_timeout: nil,
on_activity_tick: nil,
&block
)
send_message_async(
session_id, text,
model: model, agent: agent, system: system, message_id: message_id
)
reply = Opencode::Reply.new
reply.add_observer(StreamBlockObserver.new(&block)) if block_given?
stream_events(
session_id: session_id,
timeout: stream_timeout,
first_event_timeout: first_event_timeout,
idle_stream_timeout: idle_stream_timeout,
reply: reply,
on_activity_tick: on_activity_tick
) do |event|
reply.apply(event)
end
merge_final_exchange(session_id, reply)
reply.result
end
def list_sessions
uri = build_uri("/session")
request = Net::HTTP::Get.new(uri)
execute(request)
end
def children(session_id)
uri = build_uri("/session/#{session_id}/children")
request = Net::HTTP::Get.new(uri)
execute(request)
end
def delete_session(session_id)
uri = build_uri("/session/#{session_id}")
request = Net::HTTP::Delete.new(uri)
execute(request)
end
def session_status
uri = build_uri("/session/status")
request = Net::HTTP::Get.new(uri)
execute(request)
end
def get_messages(session_id)
uri = build_uri("/session/#{session_id}/message")
request = Net::HTTP::Get.new(uri)
execute(request)
end
def abort_session(session_id)
post("/session/#{session_id}/abort", {})
end
def reply_question(request_id:, answers:)
post("/question/#{request_id}/reply", { answers: answers })
end
def reject_question(request_id:)
post("/question/#{request_id}/reject", {})
end
def reply_permission(request_id:, reply:, message: nil)
body = { reply: reply }
body[:message] = message if message.present?
post("/permission/#{request_id}/reply", body)
end
# Returns pending question requests as an Array of Hashes with
# SYMBOL keys, consistent with every other endpoint that flows
# through handle_response (e.g., health, list_sessions, get_messages).
# Callers that compare against persisted JSON column data should
# symbolize their side, not desymbolize this side.
def list_questions
uri = build_uri("/question")
request = Net::HTTP::Get.new(uri)
add_auth_header(request)
response = Opencode::Instrumentation.instrument("opencode.request", method: request.method, path: request.path) do
http_client.request(request)
end
unless response.code.to_i.between?(200, 299)
raise ServerError, "list_questions failed: HTTP #{response.code}#{response.body.to_s[0, 200]}"
end
return [] if response.body.blank?
JSON.parse(response.body, symbolize_names: true)
rescue JSON::ParserError => e
raise ServerError, "list_questions returned invalid JSON: #{e.message}"
rescue Net::OpenTimeout, Net::ReadTimeout, Net::WriteTimeout => e
raise TimeoutError, "OpenCode timeout after #{@timeout}s: #{e.message}"
rescue Errno::ECONNREFUSED, SocketError => e
raise ConnectionError, "OpenCode unreachable: #{e.message}"
end
def health
uri = build_uri("/global/health", scoped: false)
request = Net::HTTP::Get.new(uri)
execute(request)
end
MAX_SSE_BUFFER = 1_048_576 # 1 MB — safety valve against pathological server responses
SSE_RECONNECT_DELAY = 0.1
TRANSIENT_SSE_ERRORS = [
EOFError,
IOError,
Net::OpenTimeout,
Net::ReadTimeout,
Errno::ECONNREFUSED,
Errno::ECONNRESET,
Errno::EPIPE
].freeze
# Opens SSE connection to GET /event, yields parsed events filtered by session_id.
# Blocks until session goes idle or timeout, reconnecting across dropped
# event-stream connections.
#
# first_event_timeout: seconds to wait for a session-specific event before
# declaring the session stale. Server heartbeats don't count — they're global
# keep-alives that flow regardless of session state.
#
# Default 120s rather than the more aggressive 30s used originally:
# slow-thinking reasoning models (Kimi K2, GPT-5 with extended thinking,
# etc.) routinely spend 30-90s of pure reasoning before emitting their
# first `message.part.*` event, especially on cold sessions with long
# system prompts. 30s false-positive trips on legitimate first turns
# and converts them to `StaleSessionError`; 120s catches genuine zombies
# without nuking real reasoning. Callers that know their agent is
# short-prompt + fast can pass a lower value.
#
# idle_stream_timeout: seconds to wait BETWEEN meaningful events once
# the session has started producing them. Default nil = no check
# (preserves the overall `timeout` ceiling behavior). Opt-in heartbeat
# watchdog for callers whose user-facing surface needs to fail fast
# rather than sit forever when an upstream LLM stream wedges mid-turn.
# Distinct from first_event_timeout (which only protects cold-start)
# and from the overall `timeout` ceiling of 600s (which is forgiving
# — a hung stream holding a thread for 10 minutes is already a bad
# UX). When the window is exceeded the call raises
# Opencode::IdleStreamError, which the caller is expected to catch and
# translate into a user-visible error / retry affordance.
def stream_events(session_id:, timeout: 600, first_event_timeout: 120,
idle_stream_timeout: nil,
reply: nil, on_activity_tick: nil, &block)
uri = build_uri("/event")
deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
first_event_deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + first_event_timeout
received_session_event = false
last_meaningful_event_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
loop do
now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
deadline = check_deadline_or_suspend(now, deadline, timeout, reply)
# NOTE: first_event_deadline is *not* suspension-eligible. If the agent
# never gets started we want to fail fast — a session that's blocked on
# a prompt has, by definition, already produced events.
if !received_session_event && now > first_event_deadline
raise StaleSessionError, "No events for session #{session_id} within #{first_event_timeout}s"
end
if idle_stream_timeout && received_session_event &&
(now - last_meaningful_event_at) > idle_stream_timeout
raise IdleStreamError,
"No meaningful events for session #{session_id} within #{idle_stream_timeout}s " \
"(SSE heartbeats still arriving — upstream likely wedged mid-turn)"
end
request = Net::HTTP::Get.new(uri)
request["Accept"] = "text/event-stream"
request["Cache-Control"] = "no-cache"
add_auth_header(request)
http = Net::HTTP.new(@uri.host, @uri.port)
http.use_ssl = @uri.scheme == "https"
http.open_timeout = 10
http.read_timeout = 30
begin
buffer = String.new
http.request(request) do |response|
unless response.is_a?(Net::HTTPSuccess)
raise ServerError, "SSE connection failed: HTTP #{response.code}"
end
response.read_body do |chunk|
now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
deadline = check_deadline_or_suspend(now, deadline, timeout, reply)
if !received_session_event && now > first_event_deadline
raise StaleSessionError, "No events for session #{session_id} within #{first_event_timeout}s"
end
if idle_stream_timeout && received_session_event &&
(now - last_meaningful_event_at) > idle_stream_timeout
raise IdleStreamError,
"No meaningful events for session #{session_id} within #{idle_stream_timeout}s " \
"(SSE heartbeats still arriving — upstream likely wedged mid-turn)"
end
buffer << chunk
if buffer.bytesize > MAX_SSE_BUFFER
raise ServerError, "SSE buffer exceeded #{MAX_SSE_BUFFER} bytes"
end
while (idx = buffer.index("\n\n"))
raw_event = buffer.slice!(0, idx + 2)
event = parse_sse_event(raw_event, session_id)
next unless event
unless event[:type]&.start_with?("server.")
received_session_event = true
last_meaningful_event_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
# Tick activity on EVERY event, including server.heartbeat —
# that's the whole point: a healthy long wait (user thinking
# for 30 minutes) keeps the container warm via heartbeats so
# the reaper doesn't kill it mid-wait.
on_activity_tick&.call(event)
block.call(event)
return if event[:type] == "session.idle"
end
end
end
rescue *TRANSIENT_SSE_ERRORS
# Treat transport-level SSE disconnects like clean EOF: reconnect
# until session.idle, the overall timeout, or first-event timeout.
ensure
begin
http&.finish if http&.started?
rescue IOError
# Connection already closed — network partition or server shutdown
end
end
cutoff = received_session_event ? deadline : first_event_deadline
sleep_for = [ SSE_RECONNECT_DELAY, cutoff - Process.clock_gettime(Process::CLOCK_MONOTONIC) ].min
if sleep_for.positive?
sleep sleep_for
end
end
end
def close
@http&.finish if @http&.started?
rescue IOError
# already closed
end
private
# Best-effort merge of the polled message exchange into the live
# reply. Catches the stream-only / poll-only asymmetry — todo.updated
# is poll-only on some opencode versions; pure-streaming would miss
# the terminal todo state otherwise. If the session API is also down
# at this point (network partition, container teardown mid-call), we
# silently keep whatever the stream accumulated rather than raising;
# the caller's reply is still a usable Result either way.
def merge_final_exchange(session_id, reply)
exchange = get_messages(session_id)
last_assistant = Array(exchange).reverse_each.find do |message|
message.dig(:info, :role) == "assistant"
end
return unless last_assistant
polled = Opencode::ResponseParser.extract_interleaved_parts(last_assistant)
reply.sync_recovered_parts(polled) if polled.any?
rescue Opencode::Error
# Stream's result is still complete; the merge was a polish, not a
# requirement.
end
# Healthy wait: opencode is suspended on a question/permission deferred
# and heartbeats are keeping the connection alive. Reset the deadline
# to "from now" so the full stuck-stream protection is restored once
# the prompt resolves. Otherwise apply the normal deadline check.
def check_deadline_or_suspend(now, deadline, timeout, reply)
return now + timeout if reply&.prompt_blocked?
raise TimeoutError, "SSE stream timed out after #{timeout}s" if now > deadline
deadline
end
def prompt_payload(text, parts:, model:, agent:, system:, message_id:, no_reply:, tools:, format:, variant:)
message_parts = parts || [ { type: "text", text: text } ]
{
messageID: message_id,
parts: message_parts,
model: format_model(model),
agent: agent,
noReply: no_reply,
tools: tools,
format: format,
system: system,
variant: variant
}.compact
end
def format_model(model)
return nil unless model
return model if model.is_a?(Hash)
provider, model_id = model.split("/", 2)
{ providerID: provider, modelID: model_id }
end
def post(path, body)
uri = build_uri(path)
request = Net::HTTP::Post.new(uri)
request.body = body.to_json
execute(request)
end
def build_uri(path, scoped: true)
uri = @uri.dup
uri.path = path
if scoped
query = URI.decode_www_form(uri.query.to_s)
query << [ "directory", @directory ] if @directory.present?
query << [ "workspace", @workspace ] if @workspace.present?
uri.query = query.any? ? URI.encode_www_form(query) : nil
end
uri
end
def add_auth_header(request)
request["Content-Type"] = "application/json"
if @password.present?
request["Authorization"] = "Basic #{Base64.strict_encode64("opencode:#{@password}")}"
end
end
def execute(request)
add_auth_header(request)
response = nil
result = Opencode::Instrumentation.instrument("opencode.request", method: request.method, path: request.path) do
response = http_client.request(request)
handle_response(response)
end
result
rescue Net::OpenTimeout, Net::ReadTimeout, Net::WriteTimeout => e
raise TimeoutError, "OpenCode timeout after #{@timeout}s: #{e.message}"
rescue Errno::ECONNREFUSED, SocketError => e
raise ConnectionError, "OpenCode unreachable: #{e.message}"
end
def http_client
@http ||= Net::HTTP.new(@uri.host, @uri.port).tap do |http|
http.use_ssl = @uri.scheme == "https"
http.open_timeout = 10
http.read_timeout = @timeout
http.write_timeout = 30
end
end
def parse_sse_event(raw, session_id)
data_line = raw.lines.find { |l| l.start_with?("data: ") }
return nil unless data_line
json = JSON.parse(data_line.sub("data: ", "").strip, symbolize_names: true)
event_session = json.dig(:properties, :sessionID) ||
json.dig(:properties, :info, :sessionID) ||
json.dig(:properties, :part, :sessionID)
return json if json[:type] == "server.heartbeat"
return json if json[:type] == "server.connected"
return nil unless event_session == session_id
json
rescue JSON::ParserError
nil
end
def handle_response(response)
return {} if response.code.to_i == 204
body = if response.body.present?
JSON.parse(response.body, symbolize_names: true)
else
{}
end
case response.code.to_i
when 200..299 then body
when 400 then raise BadRequestError.new(error_message(body, "Bad request"), response: body)
when 404 then raise SessionNotFoundError.new(error_message(body, "Session not found"), response: body)
when 500..599 then raise ServerError.new(error_message(body, "Server error"), response: body)
else raise Error.new("Unexpected response: #{response.code}", response: body)
end
rescue JSON::ParserError
raise ServerError.new("Invalid JSON from OpenCode (HTTP #{response.code}): #{response.body&.truncate(200)}")
end
# OpenCode HTTP error bodies use a wrapped shape: { name:, data: { message:, kind?: } }.
# v1.14.51 stopped exposing internal defect details from the HTTP API, so
# `body[:message]` is no longer populated for errors — only `body[:data][:message]`.
# We read both to keep older mock servers working in tests.
def error_message(body, fallback)
body.dig(:data, :message) || body[:message] || fallback
end
end
# Internal Reply observer that bridges Reply's multi-callback protocol
# to a single user-supplied block for Client#stream. Each part-level
# callback (part_added, part_changed, part_finalized, tool_progressed)
# forwards the current part to the user's block.
#
# Non-part-level callbacks (step_finished, session_*, message_updated,
# todos_changed, question_*, permission_*) are intentionally NOT
# forwarded — they're either telemetry the gem owns internally, or
# interactive-protocol concerns that callers route through
# #stream_events directly when they need them.
class StreamBlockObserver
include Opencode::ReplyObserver
def initialize(&block)
@block = block
end
def part_added(part:, **)
@block.call(part)
end
def part_changed(part:, **)
@block.call(part)
end
def part_finalized(part:, **)
@block.call(part)
end
def tool_progressed(part:, **)
@block.call(part)
end
end
end

28
lib/opencode/error.rb Normal file
View File

@@ -0,0 +1,28 @@
# frozen_string_literal: true
module Opencode
class Error < StandardError
attr_reader :response
def initialize(message = nil, response: nil)
@response = response
super(message)
end
end
class ConnectionError < Error; end
class TimeoutError < Error; end
class SessionNotFoundError < Error; end
class StaleSessionError < Error; end
# Raised by stream_events when meaningful (non-`server.*`) events stop
# arriving for longer than the caller's `idle_stream_timeout` window,
# even though the SSE socket itself is still alive (heartbeats are
# still flowing). Distinct from StaleSessionError, which fires when
# the session never produced any events in the first place. This one
# fires when the session WAS producing events and then went silent —
# the classic "OpenAI stream wedged mid-turn while the SSE keep-
# alive ticks on" failure mode.
class IdleStreamError < Error; end
class ServerError < Error; end
class BadRequestError < Error; end
end

View File

@@ -0,0 +1,76 @@
# frozen_string_literal: true
module Opencode
# Pluggable instrumentation adapter. opencode-ruby ships zero
# dependencies on Rails or any specific instrumentation library. Users
# plug in their own emitter:
#
# # ActiveSupport::Notifications (Rails apps):
# Opencode::Instrumentation.adapter = ->(name, payload, &block) {
# ActiveSupport::Notifications.instrument(name, payload, &block)
# }
#
# # stdout (debugging, non-Rails scripts):
# Opencode::Instrumentation.adapter = ->(name, payload, &block) {
# puts "[#{name}] #{payload.inspect}"
# block.call
# }
#
# When no adapter is set (default), instrumentation is a no-op pass-
# through that yields the block and returns its value. The Client emits
# events for HTTP requests, SSE stream lifecycle, and recovery paths.
#
# Event names the Client emits:
#
# - opencode.request — every HTTP request to OpenCode server
#
# If you wire a real adapter, the payload hash carries `:method` and
# `:path` for opencode.request. Other events may add fields in future
# versions; treat the payload as forward-compatible.
#
# Two emission shapes:
#
# .instrument(name, payload) { ... } — wrap a block; the duration
# of the block becomes part
# of the event (when the
# adapter is ActiveSupport::
# Notifications-shaped).
#
# .notify(name, payload) — fire-and-forget; no block,
# no duration. Use for
# point-in-time observations
# (e.g. "this artifact was
# dropped").
module Instrumentation
class << self
attr_accessor :adapter
end
# Yields the block, optionally routed through the adapter if one is
# set. Always returns the block's return value (so call sites can
# wrap their work transparently).
def self.instrument(name, payload = {})
return yield unless adapter
adapter.call(name, payload) { yield }
end
# Fire-and-forget event. No block, no return value (the adapter's
# return is ignored). Use for point-in-time observations where
# duration doesn't apply — apply_patch.artifacts_dropped,
# session.recreated, etc.
#
# Implementation: invokes the same adapter as #instrument but with
# an empty block. Hosts that adapt to ActiveSupport::Notifications
# will see a zero-duration event; hosts that adapt to a structured-
# event API (Rails.event.notify, OpenTelemetry span events) can
# detect the empty-block convention if they need to. Most hosts
# don't need to care.
def self.notify(name, payload = {})
return unless adapter
adapter.call(name, payload) { }
nil
end
end
end

View File

@@ -0,0 +1,62 @@
# frozen_string_literal: true
require "set"
module Opencode
# A Part's provenance — where it came from in the OpenCode wire model.
#
# Two source classes exist:
#
# - Wire parts: emitted by the OpenCode message-parts pipeline and
# echoed back by `GET /session/:id/message`. These are authoritative
# for finalization — when the final exchange poll lands, wire parts
# overwrite whatever streaming captured.
#
# - Stream-only parts: synthesized from bus events that OpenCode does
# NOT persist as message parts. The host's Opencode::Reply
# materializes them so per-product ReplyStream observers can render
# them through the same tool partials as real tool parts, and
# Opencode::Turn preserves them across exchange-finalization so the
# final assistant message keeps what the user watched live.
#
# `todo.updated` is the first stream-only source (OpenCode emits the
# full todo list on a bus event but never records it as a message part).
# Future sources land here too: add the constant, add it to STREAM_ONLY,
# both `Reply#append_part` callers and `Turn#stream_only_part?` keep
# working with no further edits.
#
# This module exists because the previous shape coupled Reply and Turn
# through a magic-string comparison of `metadata.source ==
# Opencode::Reply::TODO_STREAM_SOURCE`. Two classes carrying the same
# discriminator string is a "next time someone adds a source they'll
# only update one place" bug waiting to happen. The source-of-truth
# now lives here; both consumers go through `stream_only?(part)`.
module PartSource
TODO_UPDATED = "todo.updated"
STREAM_ONLY = Set[TODO_UPDATED].freeze
module_function
# True iff the part's metadata.source is one of the stream-only
# sources. Tolerates non-Hash input (returns false) so callers don't
# have to guard before asking.
def stream_only?(part)
return false unless part.is_a?(Hash)
STREAM_ONLY.include?(part.dig("metadata", "source"))
end
# Stamps `source:` into part_hash's metadata. Raises ArgumentError on
# an unknown source so typos surface at write time, not at the next
# `stream_only?` check (which would silently return false).
# Mutates and returns the input hash for chaining.
def stamp(part_hash, source:)
raise ArgumentError, "unknown stream-only source #{source.inspect}; " \
"register it in Opencode::PartSource::STREAM_ONLY first" unless STREAM_ONLY.include?(source)
part_hash["metadata"] ||= {}
part_hash["metadata"]["source"] = source
part_hash
end
end
end

87
lib/opencode/prompts.rb Normal file
View File

@@ -0,0 +1,87 @@
# frozen_string_literal: true
module Opencode
# Per-Reply registry of interactive prompts (questions + permissions)
# opencode has asked the user but not yet resolved. Lives on
# Opencode::Reply for the lifetime of one streaming turn.
#
# Two access patterns:
#
# * by request id ("que_..." or "per_...") — for the controller
# posting a user's answer back.
# * by {message_id, call_id} — for the order-race fix where
# `question.asked` may arrive before the matching tool part.
#
# The registry also exposes a `prompt_blocked?` predicate that
# Opencode::Client uses to suspend the SSE deadline check while
# a healthy wait is in progress.
class Prompts
Entry = Struct.new(:kind, :request, :asked_at, keyword_init: true)
def initialize
@entries = {}
@by_call = {}
end
def record_question(request)
record(:question, request)
end
def record_permission(request)
record(:permission, request)
end
# Returns the raw request hash (not the Entry wrapper) so callers
# don't depend on internal bookkeeping shape.
def find(request_id)
@entries[request_id]&.request
end
# Returns the raw request hash, same shape as #find.
def find_by_call(message_id:, call_id:)
key = call_key(message_id, call_id)
@by_call[key]&.request
end
def resolve(request_id)
entry = @entries.delete(request_id)
return unless entry
tool = entry.request[:tool]
return unless tool
@by_call.delete(call_key(tool[:messageID], tool[:callID]))
end
def each_pending
@entries.each_value { |entry| yield(entry.kind, entry.request) }
end
def any_pending?
@entries.any?
end
alias_method :prompt_blocked?, :any_pending?
def asked_at(request_id)
@entries[request_id]&.asked_at
end
private
def record(kind, request)
entry = Entry.new(
kind: kind,
request: request,
asked_at: Process.clock_gettime(Process::CLOCK_MONOTONIC)
)
@entries[request[:id]] = entry
tool = request[:tool]
@by_call[call_key(tool[:messageID], tool[:callID])] = entry if tool
end
def call_key(message_id, call_id)
[ message_id, call_id ].join(":")
end
end
end

549
lib/opencode/reply.rb Normal file
View File

@@ -0,0 +1,549 @@
# frozen_string_literal: true
module Opencode
# An assistant's reply as it is being composed, live, from OpenCode SSE
# events. A Reply accumulates parts (text, reasoning, tool invocations)
# in the order the agent emits them and notifies observers of domain
# transitions — parts appearing, parts growing, tools advancing,
# sessions erroring.
#
# Responsibilities
# ----------------
#
# * Translate raw OpenCode SSE events into domain callbacks.
# * Own the canonical state of an in-flight reply (parts list, indices,
# first-token seen, message info).
# * Apply the tail-drop safety net: when part.updated carries
# authoritative :text that differs from what deltas accumulated
# (z.ai GLM-5.1 drops trailing deltas), rewrite the part's content.
# * Preserve the original tool name when OpenCode later renames a tool
# to "invalid" mid-stream.
#
# Not responsibilities
# --------------------
#
# * Rendering HTML or broadcasting Turbo Streams (observer concern).
# * Persisting parts to a database (observer concern).
# * Fetching the event stream (Opencode::Client).
# * Retry / session recovery (job concern).
#
# Event contract
# --------------
#
# Events match OpenCode's bus schema (packages/opencode/src/session/
# message-v2.ts, status.ts, todo.ts):
#
# message.part.delta { properties: { partID, field, delta, ... } }
# message.part.updated { properties: { part: { id, type, ... } } }
# message.updated { properties: { info: { tokens, cost, ... } } }
# session.status { properties: { status: { type, ... } } }
# session.error { properties: { error: { name, data, ... } } }
# todo.updated { properties: { todos: [...] } }
#
# Observer callbacks
# ------------------
#
# See Opencode::ReplyObserver for the full callback surface. Observers
# are duck-typed — only the callbacks they define are invoked.
#
# Example
# -------
#
# reply = Opencode::Reply.new
# reply.add_observer(MyApp::ReplyStream.new(message:)) # your observer
# client.stream_events(session_id: id) { |event| reply.apply(event) }
# reply.result
# # => Opencode::Reply::Result with parts_json, full_text, reasoning_text, tool_parts
#
class Reply
STREAMABLE_TYPES = %w[text reasoning tool].freeze
TERMINAL_TOOL_STATUSES = %w[completed error].freeze
TODO_TOOLS = %w[todowrite todoread].freeze
# The denormalized output of a Reply once streaming completes (or
# recovery via Reply.distill produces an equivalent shape). Symmetric
# with Opencode::Turn::Result. Accessible by both message-style
# (`result.full_text`) and hash-style (`result[:full_text]`) syntax
# — Struct supports both natively — but the typed shape stops
# callers from poking arbitrary keys.
Result = Struct.new(:parts_json, :full_text, :reasoning_text, :tool_parts, keyword_init: true)
attr_reader :parts, :info, :total_cost, :total_input_tokens, :total_output_tokens, :prompts
def initialize
@parts = []
@part_index_by_id = {}
@part_type_by_id = {}
@observers = []
@first_text_seen = false
@info = nil
@total_cost = 0.0
@total_input_tokens = 0
@total_output_tokens = 0
@todo_part_index = nil
@prompts = Opencode::Prompts.new
# Keyed by [message_id, call_id]: question.asked payloads that
# arrived before their matching tool part. Drained when the tool
# part shows up in apply_tool_state.
@pending_question_payloads = {}
end
# True while any interactive prompt (question or permission) is
# awaiting a user reply. Opencode::Client uses this to suspend the
# SSE inactivity deadline — a wait on the human is healthy, not a
# hang.
def prompt_blocked?
@prompts.prompt_blocked?
end
def add_observer(observer)
@observers << observer
self
end
# Drive the state machine forward with one SSE event. Unknown event
# types are ignored — OpenCode may add new events, and we shouldn't
# crash on them.
def apply(event)
case event[:type]
when "message.part.delta" then apply_part_delta(event)
when "message.part.updated" then apply_part_updated(event)
when "message.updated" then apply_message_updated(event)
when "session.status" then apply_session_status(event)
when "session.error" then apply_session_error(event)
when "todo.updated" then apply_todo_updated(event)
when "question.asked" then apply_question_asked(event)
when "question.replied" then apply_question_replied(event)
when "question.rejected" then apply_question_rejected(event)
when "permission.asked" then apply_permission_asked(event)
when "permission.replied" then apply_permission_replied(event)
end
end
# Treat `recovered_parts` as a clean-slate baseline: replace parts,
# clear the id→index map (recovered parts have no OpenCode part IDs),
# and reset the running cost/token totals plus the first-text flag.
#
# Why reset totals: step-finish events that produced the pre-crash
# totals are not in the recovery payload; keeping them would
# double-count when post-recovery step-finish events accumulate
# against the same counters.
#
# Used only by the recovery path — during normal streaming, parts
# accrete via apply_* helpers and totals flow through step-finish.
def replace_parts(recovered_parts)
@parts = recovered_parts
@part_index_by_id.clear
@part_type_by_id.clear
@total_cost = 0.0
@total_input_tokens = 0
@total_output_tokens = 0
@first_text_seen = false
end
# Bring the live reply up to a recovered/polled exchange snapshot and
# notify observers for new or changed parts. This is the streaming
# counterpart to replace_parts: when the SSE connection ends before
# OpenCode's multi-message tool loop has produced final text, Turn polls
# the message exchange. Those recovered parts still need to hit Turbo as
# incremental append/update events, not only the final row replacement.
def sync_recovered_parts(recovered_parts)
Array(recovered_parts).each_with_index do |part, index|
next if @parts[index] == part
part = deep_dup_part(part)
if index < @parts.length
@parts[index] = part
notify_recovered_part_updated(part, index)
else
@parts << part
notify(:part_added, part: part, index: index)
notify_recovered_part_updated(part, index)
end
@first_text_seen ||= part["type"] == "text" && part["content"].present?
end
end
# Record a part that originated OUTSIDE the OpenCode event stream —
# used when an observer synthesizes a part (e.g., a session error
# notice) that isn't a real message.part.* event but should still
# appear in the persisted parts_json. Returns the new index.
#
# Does NOT fire part_added — the injecting observer has already done
# whatever rendering it needed. Other observers can poll `parts` if
# they care about injected content.
def inject_part(part_hash)
@parts << part_hash
@parts.size - 1
end
def first_text_seen?
@first_text_seen
end
def tool_count
@parts.count { |p| p["type"] == "tool" }
end
# The denormalized result once streaming completes, matching the
# shape jobs persist to the message table: full_text for :content,
# reasoning_text for :reasoning, tool_parts for :tool_calls_json,
# and parts_json for :parts_json.
def result
self.class.distill(@parts)
end
# Pure function: given a parts array, return the denormalized result
# as an Opencode::Reply::Result value object. Exposed so a recovery
# path (fetch messages from the session API and map them through
# ResponseParser.extract_interleaved_parts) produces the same shape
# as live streaming.
def self.distill(parts)
Result.new(
parts_json: parts,
full_text: join_content(parts, "text"),
reasoning_text: join_content(parts, "reasoning"),
tool_parts: parts.select { |p| p["type"] == "tool" && TERMINAL_TOOL_STATUSES.include?(p["status"]) }
)
end
def self.join_content(parts, type)
parts.select { |p| p["type"] == type }.map { |p| p["content"].to_s }.join("\n\n")
end
private_class_method :join_content
private
def apply_part_delta(event)
field = event.dig(:properties, :field)
return unless %w[text reasoning].include?(field)
part_id = event.dig(:properties, :partID)
delta = event.dig(:properties, :delta).to_s
return if delta.empty?
index = @part_index_by_id[part_id]
if index.nil?
# Delta before part.updated. Pre-1.2 OpenCode streams occasionally
# emit in this order; downstream part.updated for this id will
# reconcile via reconcile_final_content.
type = @part_type_by_id[part_id] || (field == "reasoning" ? "reasoning" : "text")
index = append_part({ "type" => type, "content" => +"" }, part_id: part_id)
end
@parts[index]["content"] << delta
@first_text_seen ||= (field == "text" && @parts[index]["type"] == "text")
notify(:part_changed, part: @parts[index], index: index, delta: delta)
end
def apply_part_updated(event)
part = event.dig(:properties, :part) || {}
part_id = part[:id]
part_type = part[:type]
case part_type
when "step-finish"
cost = part[:cost].to_f
tokens = part[:tokens] || {}
@total_cost += cost
@total_input_tokens += tokens[:input].to_i
@total_output_tokens += tokens[:output].to_i
notify(:step_finished, cost: cost, tokens: tokens)
when "text", "reasoning"
@part_type_by_id[part_id] = part_type if part_id
if @part_index_by_id.key?(part_id)
reconcile_final_content(part_id, part)
elsif part[:text].present?
# Extreme tail-drop path: part.updated carries the full text
# but no deltas ever arrived. Materialize it as a one-shot part
# so the content isn't lost.
append_part({ "type" => part_type, "content" => part[:text].dup }, part_id: part_id)
end
when "tool"
register_tool(part_id, part) unless @part_index_by_id.key?(part_id)
apply_tool_state(part_id, part)
end
end
def apply_message_updated(event)
info = event.dig(:properties, :info)
return unless info.is_a?(Hash)
@info = info
notify(:message_updated, info: info)
end
def apply_session_status(event)
case event.dig(:properties, :status, :type)
when "retry"
notify(:session_retried,
attempt: event.dig(:properties, :status, :attempt),
message: event.dig(:properties, :status, :message).to_s)
end
end
def apply_session_error(event)
error = event.dig(:properties, :error) || {}
name = error[:name].to_s
message = error.dig(:data, :message).to_s
text = [ name, message ].reject(&:blank?).join(": ")
notify(:session_errored, text: text, raw: error)
end
# Close out a text/reasoning part: always fires :part_finalized so
# observers can flush any throttled broadcast, and rewrites content if
# part.updated carries an authoritative :text that diverges from the
# deltas we accumulated (tail-drop safety net for providers like
# z.ai GLM-5.1 that sometimes drop trailing deltas).
def reconcile_final_content(part_id, part)
index = @part_index_by_id[part_id]
final = part[:text]
return if final.blank?
@parts[index]["content"] = final.dup unless @parts[index]["content"] == final
notify(:part_finalized, part: @parts[index], index: index)
end
def register_tool(part_id, part)
append_part({
"type" => "tool",
"tool" => part[:tool],
"status" => part.dig(:state, :status)
}, part_id: part_id)
end
# Merge an incoming `message.part.updated` event state into the
# existing tool record. Delegates the field-by-field shape to
# Opencode::ToolPart so the streaming and recovery paths share one
# canonical definition of what a tool part looks like.
def apply_tool_state(part_id, part)
index = @part_index_by_id[part_id]
return unless index
record = @parts[index]
Opencode::ToolPart.merge_streaming_state(record, part)
@todo_part_index = index if todo_tool_part?(record)
notify(:tool_progressed,
part: record,
index: index,
status: record["status"],
raw: part)
drain_pending_question_payload(record)
end
def apply_todo_updated(event)
todos = event.dig(:properties, :todos) || []
notify(:todos_changed, todos: todos)
return unless todos.is_a?(Array)
canonical_todos = Opencode::Todo.canonicalize_all(todos)
index = current_todo_part_index
if index
refresh_existing_todo_part(index, canonical_todos, event)
else
@todo_part_index = append_part(Opencode::PartSource.stamp({
"type" => "tool",
"tool" => "todowrite",
"status" => "completed",
"input" => { "todos" => canonical_todos }
}, source: Opencode::PartSource::TODO_UPDATED))
end
end
# Refresh path for an existing todo part — either a real `todowrite`
# tool part materialized from message.part.updated, OR our own
# previously-stamped stream-only part. Either way we MERGE into
# `input` rather than replace it, so any non-todos fields a real
# tool call carried survive the refresh.
#
# We intentionally do NOT touch `part["title"]`. Upstream opencode's
# title is "N remaining todos" (a progress indicator like "2 todos"
# when 2 of 3 are still incomplete, "0 todos" when all done) and is
# set on the original message.part.updated event. Stomping it with
# our own value would clobber that semantic.
def refresh_existing_todo_part(index, canonical_todos, event)
part = @parts[index]
part["status"] = part["status"].presence || "completed"
part["input"] = (part["input"] || {}).merge("todos" => canonical_todos)
notify(:tool_progressed, part: part, index: index, status: part["status"], raw: event)
end
def current_todo_part_index
return @todo_part_index if @todo_part_index && todo_tool_part?(@parts[@todo_part_index])
@todo_part_index = @parts.rindex { |part| todo_tool_part?(part) }
end
def todo_tool_part?(part)
part.is_a?(Hash) && part["type"] == "tool" && TODO_TOOLS.include?(part["tool"].to_s)
end
def deep_dup_part(part)
case part
when Hash
part.transform_values { |value| deep_dup_part(value) }
when Array
part.map { |value| deep_dup_part(value) }
else
part.duplicable? ? part.dup : part
end
end
def notify_recovered_part_updated(part, index)
case part["type"]
when "tool"
notify(:tool_progressed, part: part, index: index, status: part["status"], raw: {})
when "text", "reasoning"
notify(:part_finalized, part: part, index: index)
end
end
def append_part(part_hash, part_id: nil)
@parts << part_hash
index = @parts.size - 1
if part_id
@part_index_by_id[part_id] = index
@part_type_by_id[part_id] = part_hash["type"]
end
notify(:part_added, part: @parts[index], index: index)
index
end
def notify(callback, **payload)
@observers.each do |observer|
observer.public_send(callback, **payload) if observer.respond_to?(callback)
end
end
# --- interactive prompts -----------------------------------------
def apply_question_asked(event)
request = (event[:properties] || {}).dup
return unless request[:id].is_a?(String)
@prompts.record_question(request)
if (tool = request[:tool])
@pending_question_payloads[[ tool[:messageID].to_s, tool[:callID].to_s ]] = request
end
merge_pending_question_into_existing_tool_part(request)
notify(:question_asked, request: request, raw: event)
end
def apply_question_replied(event)
props = event[:properties] || {}
request_id = props[:requestID]
answers = props[:answers] || []
return unless request_id
asked_at = @prompts.asked_at(request_id)
@prompts.resolve(request_id)
notify(:question_replied, request_id: request_id, answers: answers, raw: event, asked_at: asked_at)
end
def apply_question_rejected(event)
props = event[:properties] || {}
request_id = props[:requestID]
return unless request_id
asked_at = @prompts.asked_at(request_id)
@prompts.resolve(request_id)
notify(:question_rejected, request_id: request_id, raw: event, asked_at: asked_at)
end
def apply_permission_asked(event)
request = (event[:properties] || {}).dup
return unless request[:id].is_a?(String)
@prompts.record_permission(request)
notify(:permission_asked, request: request, raw: event)
end
def apply_permission_replied(event)
props = event[:properties] || {}
request_id = props[:requestID]
return unless request_id
asked_at = @prompts.asked_at(request_id)
@prompts.resolve(request_id)
notify(:permission_replied,
request_id: request_id,
reply: props[:reply],
raw: event,
asked_at: asked_at)
end
# Merge a pending question payload into the matching tool part if
# the tool part exists. Reads record["callID"] / record["messageID"]
# which are persisted by ToolPart.merge_streaming_state (per Task 2.0).
# Decorates the part's "input" with both the question content AND the
# opencode identifiers the view + controller need.
#
# Called from two paths:
# 1. apply_question_asked, when the tool part already exists
# 2. apply_tool_state, when the tool part arrives AFTER question.asked
def merge_pending_question_into_existing_tool_part(request)
tool = request[:tool]
return unless tool
call_id = tool[:callID].to_s
message_id = tool[:messageID].to_s
return if call_id.empty?
index = @parts.index do |part|
part.is_a?(Hash) && part["type"] == "tool" && part["tool"] == "question" &&
part["callID"] == call_id
end
return unless index
part = @parts[index]
# Stringify keys so the in-memory shape matches what's persisted
# via the parts_json JSON column round-trip. Otherwise direct-render
# callers (e.g., integration tests, future debug tooling) hit
# symbol-keyed nested hashes while the partials read string keys —
# silent broken HTML.
input = (part["input"] || {}).merge(
"questions" => deep_stringify_keys(request[:questions]),
"opencode_request_id" => request[:id],
"opencode_message_id" => message_id,
"opencode_call_id" => call_id
)
part["input"] = input
notify(:tool_progressed, part: part, index: index, status: part["status"],
raw: { type: "question.asked.synthesized" })
end
# Order-race fix: if question.asked arrived before this tool part,
# its payload is parked in @pending_question_payloads keyed by
# {messageID, callID}. Drain it now so the part's input carries
# the questions + opencode_* identifiers the view expects.
def drain_pending_question_payload(record)
return unless record["tool"] == "question" && record["callID"].present?
key = [ record["messageID"].to_s, record["callID"].to_s ]
pending = @pending_question_payloads.delete(key)
merge_pending_question_into_existing_tool_part(pending) if pending
end
# Recursively converts hash keys to strings — used at the SSE/JSON
# boundary so in-memory parts match the shape they have after a
# parts_json (JSON column) round-trip. Same semantics as Rails'
# Hash#deep_stringify_keys but iterates arrays too.
def deep_stringify_keys(obj)
case obj
when Hash then obj.each_with_object({}) { |(k, v), h| h[k.to_s] = deep_stringify_keys(v) }
when Array then obj.map { |x| deep_stringify_keys(x) }
else obj
end
end
end
end

View File

@@ -0,0 +1,101 @@
# frozen_string_literal: true
module Opencode
# The canonical observer protocol for Opencode::Reply — every event
# Reply dispatches, documented in one place, with safe no-op defaults.
#
# Include this module in a reply-stream class to get two things:
#
# 1. **Compile-time checklist.** Override only the callbacks you care
# about; the rest inherit a no-op. Forgetting to handle a new event
# never crashes the stream.
# 2. **Protocol documentation that can't rot.** The signatures here are
# the contract. If Reply's dispatch shape ever drifts, every observer
# using this module updates in lockstep.
#
# Callbacks are duck-typed in Reply — features may choose not to
# include this module and implement the methods directly, but then
# they lose the two benefits above.
#
# Every callback takes keyword arguments, so adding a new keyword later
# only requires existing observers to add `**_` if they want to opt out
# of breakage.
module ReplyObserver
# A new part was appended to the reply's parts list.
def part_added(part:, index:)
end
# An existing part's content grew by a delta (streaming text or
# reasoning).
def part_changed(part:, index:, delta:)
end
# An existing part's content was rewritten to the authoritative
# value from part.updated. Fires unconditionally when a part closes
# so throttled observers can flush, regardless of whether content
# actually diverged from what deltas accumulated.
def part_finalized(part:, index:)
end
# A tool part transitioned status (pending → running → completed/error),
# or its state payload (title/input/error) changed.
def tool_progressed(part:, index:, status:, raw:)
end
# A step boundary with usage info. `tokens` is the raw tokens hash
# from the step-finish part (keys: :input, :output, :reasoning, :cache).
def step_finished(cost:, tokens:)
end
# The upstream session is retrying an LLM call (e.g., provider
# rate-limit backoff). Attempt is nullable; message is a short
# reason string.
def session_retried(attempt:, message:)
end
# A session-level error surfaced. Text is a human-readable summary
# ("ErrorName: details"); raw is the full error hash.
def session_errored(text:, raw:)
end
# The authoritative message.info was updated (cost, tokens, provider
# error metadata). Fires late in the stream after the agent closes.
def message_updated(info:)
end
# Agent's internal todo list changed. Todos are whatever shape the
# agent's task tool uses.
def todos_changed(todos:)
end
# opencode emitted a question.asked event — the agent's `question`
# tool is suspended waiting for the user's reply. `request` is the
# full QuestionRequest hash ({id, sessionID, questions, tool?}).
def question_asked(request:, raw:)
end
# opencode emitted a question.replied event — the user submitted
# answers (Array<Array<String>>, one inner array per question).
# `asked_at` is the monotonic clock value when question.asked was
# observed, for latency telemetry; nil if asked never arrived.
def question_replied(request_id:, answers:, raw:, asked_at:)
end
# opencode emitted a question.rejected event — the user dismissed
# the prompt, or it was cancelled (e.g., container shutdown).
def question_rejected(request_id:, raw:, asked_at:)
end
# opencode emitted a permission.asked event — a tool is requesting
# user permission to proceed. `request` is the PermissionRequest
# hash ({id, sessionID, permission, patterns, metadata, always, tool?}).
def permission_asked(request:, raw:)
end
# opencode emitted a permission.replied event — the user chose
# once/always/reject. `reply` is the string. `asked_at` per
# question_replied semantics.
def permission_replied(request_id:, reply:, raw:, asked_at:)
end
end
end

View File

@@ -0,0 +1,169 @@
# frozen_string_literal: true
module Opencode
module ResponseParser
def self.extract_text(response_body)
parts = response_body[:parts] || []
parts
.select { |p| p[:type] == "text" }
.map { |p| p[:text] }
.join("\n\n")
end
def self.extract_reasoning(response_body)
parts = response_body[:parts] || []
reasoning = parts
.select { |p| p[:type] == "reasoning" }
.map { |p| p[:text] }
.join("\n\n")
reasoning.presence
end
TERMINAL_STATUSES = %w[completed error].freeze
# Terminal-only tool list. Returned as canonical string-keyed hashes
# (same shape `extract_interleaved_parts` returns) so callers do not
# have to know which path produced the data.
def self.extract_tool_summary(response_body)
parts = response_body[:parts] || []
parts
.select { |p| p[:type] == "tool" && p.dig(:state, :status).in?(TERMINAL_STATUSES) }
.map { |p| build_tool_summary(p) }
end
def self.extract_interleaved_parts(response_body)
parts = response_body[:parts] || []
parts.filter_map do |part|
case part[:type]
when "text"
{ "type" => "text", "content" => part[:text] }
when "reasoning"
{ "type" => "reasoning", "content" => part[:text] }
when "tool"
status = part.dig(:state, :status)
next unless status.in?(TERMINAL_STATUSES)
build_tool_summary(part)
else
nil
end
end
end
# Canonical tool-part shape from one OpenCode message part. Delegates
# to Opencode::ToolPart so the streaming path (Reply#apply_tool_state)
# and recovery path (this method) cannot drift.
def self.build_tool_summary(part)
Opencode::ToolPart.from_message_part(part)
end
private_class_method :build_tool_summary
def self.extract_tokens(response_body)
response_body.dig(:info, :tokens)
end
def self.extract_cost(response_body)
response_body.dig(:info, :cost)
end
def self.extract_cache_tokens(response_body)
tokens = response_body.dig(:info, :tokens) || {}
{
cache_read: tokens.dig(:cache, :read) || 0,
cache_write: tokens.dig(:cache, :write) || 0
}
end
def self.extract_error(response_body)
error = response_body.dig(:info, :error)
return nil unless error.is_a?(Hash)
{
name: error[:name],
message: error.dig(:data, :message),
status_code: error.dig(:data, :statusCode),
retryable: error.dig(:data, :isRetryable),
url: error.dig(:data, :metadata, :url)
}.compact
end
MAX_ARTIFACT_SIZE = 10.megabytes
ARTIFACT_TOOLS = %w[write apply_patch].freeze
def self.extract_artifact_files(response_body)
parts = response_body[:parts] || []
completed_tools = parts.select do |p|
p[:type] == "tool" &&
ARTIFACT_TOOLS.include?(p[:tool]) &&
p.dig(:state, :status) == "completed"
end
return [] if completed_tools.empty?
files = completed_tools.flat_map { |part| extract_files_from_tool_part(part) }
files.uniq { |f| f[:filename] }
end
def self.extract_artifacts_from_messages(messages)
return [] unless messages.is_a?(Array)
messages
.select { |m| m.dig(:info, :role) == "assistant" }
.flat_map { |m| extract_artifact_files(m) }
.uniq { |f| f[:filename] }
end
def self.extract_files_from_tool_part(part)
case part[:tool]
when "write"
extract_from_write(part)
when "apply_patch"
extract_from_apply_patch(part)
else
[]
end
end
def self.extract_from_write(part)
content = part.dig(:state, :input, :content)
file_path = part.dig(:state, :input, :filePath)
return [] if content.blank? || file_path.blank?
return [] if content.bytesize > MAX_ARTIFACT_SIZE
filename = File.basename(file_path)
content_type = Marcel::MimeType.for(extension: File.extname(filename))
[ { filename: filename, content: content, content_type: content_type } ]
end
# apply_patch tool metadata shape changed materially between the early
# opencode versions this code originally targeted (which exposed
# `before` + `after` post-write file content as inline strings) and
# v1.4.0+ (which dropped them and only exposes the diff text in `patch`
# plus a `files` array of { filePath, relativePath, type, patch,
# additions, deletions, movePath? } descriptors). Source of truth:
# https://raw.githubusercontent.com/anomalyco/opencode/v1.15.0/packages/opencode/src/tool/apply_patch.ts
#
# With no `after` field in the v1.15.0 wire shape, this method previously
# silently returned [] for every real apply_patch invocation while still
# passing its (now-stale-shape) unit test — the worst kind of bug: a
# green test paired with a dead production path.
#
# Current behavior (intentional, until apply_patch becomes a hot path
# for the gem's users): we accept the v1.15.0 shape and return []. Most
# agents write whole files via the `write` tool rather than patching,
# so the practical impact today is zero. When you do use apply_patch,
# opencode-rails' `Opencode::Exchange#tool_artifacts` emits
# `opencode.apply_patch.artifacts_dropped` so operators see the silent
# drop and can route through the missing sandbox-read path.
#
# The event emission lives on Exchange (not here) because ResponseParser
# is a pure module — every other method takes a hash and returns a hash.
# Pure functions stay pure.
def self.extract_from_apply_patch(_part)
[]
end
private_class_method :extract_files_from_tool_part, :extract_from_write, :extract_from_apply_patch
end
end

43
lib/opencode/todo.rb Normal file
View File

@@ -0,0 +1,43 @@
# frozen_string_literal: true
module Opencode
# One todo item the OpenCode `todowrite` tool and `todo.updated` bus
# event carry: `content` + `status` + (optional) `priority`.
# Source-of-truth canonicalization lives here so Reply, ToolDisplay,
# and any future consumer all share one definition of "what does this
# todo look like once we've normalized it."
#
# Status canonicalization: OpenCode bus events have been observed
# emitting the hyphenated `"in-progress"` form. The rest of the
# codebase (per-product views, todowrite tool input shape per the
# v1.15+ openapi spec) uses the underscored `"in_progress"`.
# Canonicalize to underscore at every entry point so downstream code
# never has to handle both.
module Todo
HYPHENATED_TO_CANONICAL_STATUS = {
"in-progress" => "in_progress"
}.freeze
module_function
def canonical_status(status)
raw = status.to_s
HYPHENATED_TO_CANONICAL_STATUS.fetch(raw) { raw.tr("-", "_") }
end
# Canonicalize one todo hash: string-keyed, normalized status.
# Returns the input unchanged when it isn't a Hash (the substrate
# tolerates wire-shape drift defensively).
def canonicalize(todo)
return todo unless todo.is_a?(Hash)
result = todo.deep_stringify_keys
result["status"] = canonical_status(result["status"]) if result.key?("status")
result
end
def canonicalize_all(todos)
Array(todos).map { |t| canonicalize(t) }
end
end
end

152
lib/opencode/tool_part.rb Normal file
View File

@@ -0,0 +1,152 @@
# frozen_string_literal: true
module Opencode
# Canonical shape of a tool part in an assistant reply.
#
# A tool part starts `pending` and transitions through `running` to a
# terminal `completed` or `error`. The complete representation carries
# seven fields, all string-keyed so views read consistent keys whether
# the part came from a live streaming event or a post-stream message
# poll:
#
# "type" => "tool"
# "tool" => "edit"
# "status" => "completed"
# "title" => "Edited /INDEX.md"
# "input" => { ... } # full args the agent passed, deep-stringified
# "metadata" => { ... } # tool-specific output: diff, preview, stdout, etc.
# "output" => "Edited successfully."
# "error" => "..." # only when status == "error", truncated to 200 chars
#
# The shape is produced two ways:
#
# 1. Opencode::Reply#apply_tool_state — live, mid-stream, merging
# incoming event state into an in-memory record (previous values
# survive when the new event omits a field).
#
# 2. Opencode::ResponseParser.build_tool_summary — post-stream, built
# fresh from a complete OpenCode message returned by
# /session/:id/message during recovery / final-exchange polling.
#
# Existence reason: the two paths used to drift. ResponseParser stripped
# `metadata` and whitelisted `input` to a fixed key list, so `parts_json`
# saved on finalize had strictly less data than the streaming DOM had
# shown. The visible symptom was "I saw the diff while streaming and it
# disappeared when the turn finished". This class is the single source of
# truth that prevents that drift.
module ToolPart
MAX_ERROR_LEN = 200
INVALID_TOOL = "invalid"
module_function
# Build a fresh canonical tool-part hash from one OpenCode message
# part (the shape that arrives through /session/:id/message).
# Used by ResponseParser for recovery and final-exchange polling.
def from_message_part(part)
state = state_of(part)
build_canonical(
tool: part[:tool] || part["tool"],
status: state_value(state, :status),
title: state_value(state, :title),
input: state_value(state, :input),
metadata: state_value(state, :metadata),
output: state_value(state, :output),
error: state_value(state, :error)
)
end
# Merge an incoming `message.part.updated` event state into an
# existing record. Used by Reply#apply_tool_state during streaming.
#
# Fields the event omits (or that arrive empty) leave the record's
# previous value intact. Mid-tool events are partial by design.
#
# In addition to the canonical render fields (status, title, input,
# metadata, output, error), this also persists `callID` and
# `messageID` from the incoming state. Those identifiers are needed
# by downstream lookups (e.g. matching an ask-user reply event back
# to the originating tool part by callID) and would otherwise be
# silently dropped on the way into Reply.parts JSON.
#
# Returns the (mutated) record for chaining.
def merge_streaming_state(record, part)
state = state_of(part)
tool = part[:tool] || part["tool"]
# Preserve original tool name if OpenCode later renames to "invalid"
# mid-session — we want to keep rendering the original name.
record["tool"] = tool if tool.present? && tool != INVALID_TOOL
status = state_value(state, :status)
record["status"] = status if status
title = state_value(state, :title)
record["title"] = title if title.present?
input = state_value(state, :input)
record["input"] = stringify_deep(input) if input.present?
metadata = state_value(state, :metadata)
record["metadata"] = stringify_deep(metadata) if metadata.present?
output = state_value(state, :output)
record["output"] = output if output.present?
error = state_value(state, :error)
record["error"] = error.to_s.truncate(MAX_ERROR_LEN) if error.present?
# callID and messageID moved from state.* to the part's top level
# somewhere in opencode v1.15.x. Read top-level first, fall back
# to state.* for any older versions that may still be in flight.
# Without this, merge_pending_question_into_existing_tool_part
# (which searches @parts by callID) silently no-ops, and the
# question form renders with no questions or routing IDs.
call_id = part[:callID] || part["callID"] || state_value(state, :callID)
record["callID"] = call_id if call_id.present?
message_id = part[:messageID] || part["messageID"] || state_value(state, :messageID)
record["messageID"] = message_id if message_id.present?
record
end
class << self
private
def state_of(part)
part[:state] || part["state"] || {}
end
def state_value(state, key)
return nil unless state.is_a?(Hash)
state[key] || state[key.to_s]
end
def build_canonical(tool:, status:, title:, input:, metadata:, output:, error:)
hash = {
"type" => "tool",
"tool" => tool.to_s.presence,
"status" => status,
"title" => title.presence,
"input" => stringify_deep(input).presence,
"metadata" => stringify_deep(metadata).presence,
"output" => output.presence
}
hash["error"] = error.to_s.truncate(MAX_ERROR_LEN).presence if status == "error"
hash.compact
end
def stringify_deep(value)
case value
when Hash
value.each_with_object({}) { |(k, v), h| h[k.to_s] = stringify_deep(v) }
when Array
value.map { |v| stringify_deep(v) }
else
value
end
end
end
end
end

50
lib/opencode/tracer.rb Normal file
View File

@@ -0,0 +1,50 @@
# frozen_string_literal: true
module Opencode
# A namespacing trace emitter.
#
# Opencode::Turn emits unprefixed event names like "response.started"
# and "session.recreated". The host product wraps Turn in a Tracer
# whose job is to prepend a product prefix and forward to whatever
# actually emits trace events (typically the host job's
# `EventTraceable#trace_event`).
#
# Two responsibilities live here, and only here:
#
# 1. Callable interface: `tracer.call(name, **payload)` — the
# contract Turn relies on.
# 2. Namespacing strategy: prepend "<prefix>." to every event name.
#
# A closure-based alternative that mixes both concerns looks like:
#
# tracer: ->(name, **payload) { trace_event("myapp.#{name}", **payload) }
#
# That closure conflates the two responsibilities; every caller has
# to rediscover the prefix-with-period rule, and a typo only shows up
# in production trace data. Making it a real role removes that risk
# and makes the rule visible in one place.
#
# Usage:
#
# Opencode::Tracer.new(prefix: "myapp", emitter: self)
#
# `emitter` must respond to `trace_event(name, **payload)`.
class Tracer
def initialize(prefix:, emitter:)
@prefix = prefix
@emitter = emitter
end
# Tracer is callable so existing call sites that treated the tracer
# as a lambda (`tracer.call(name, **payload)`) keep working without
# change. Turn uses this exclusively.
#
# Uses `send` because EventTraceable's `trace_event` is a private
# method of the including class — the convention is "private inside
# the job, but the substrate's Tracer is allowed to dispatch to it
# the same way the job's own perform method would."
def call(name, **payload)
@emitter.send(:trace_event, "#{@prefix}.#{name}", **payload)
end
end
end

5
lib/opencode/version.rb Normal file
View File

@@ -0,0 +1,5 @@
# frozen_string_literal: true
module Opencode
VERSION = "0.0.1.alpha2"
end