From e3e7b69c7f31ceae3311a7478fafcc571f1b7c35 Mon Sep 17 00:00:00 2001 From: Ajay Krishnan Date: Tue, 19 May 2026 20:06:40 -0700 Subject: [PATCH] =?UTF-8?q?Initial=20opencode-ruby=20v0.0.1.alpha1=20?= =?UTF-8?q?=E2=80=94=20hand-rolled=20HTTP+SSE=20client=20for=20OpenCode?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Headline API: reply = client.stream(session_id, "Explain monads") do |part| print part["content"] if part["type"] == "text" end reply.full_text # final accumulated text Sources ported from ajaynomics/ajent-rails lib/opencode/client/ after the Phase-1+2 tier carve + Phase-2.5 boundary cleanup (see ajent-rails PRs #840 and #843). Rails-runtime coupling stripped: - Defaults read from ENV[OPENCODE_BASE_URL/SERVER_PASSWORD/TIMEOUT] instead of Rails.application.config.x.opencode_blackline.* - EventTraceable.timed_event(...) calls swapped for Opencode::Instrumentation.instrument(...) — pluggable adapter (default no-op) that callers wire to ActiveSupport::Notifications, OpenTelemetry, stdout, etc. Runtime dependency: activesupport (>= 6.1, < 9.0) for the small core_ext surface (blank?/present?/presence/truncate/duplicable?/ megabytes). ActiveSupport is NOT Rails — it's a standalone helpers gem that most Ruby apps already have transitively. What's in the gem: Opencode::Client HTTP + SSE client; #stream block-form API Opencode::Reply SSE-event accumulator with observer protocol Opencode::Reply::Result typed Struct value object Opencode::ReplyObserver observer protocol module (no-op defaults) Opencode::Prompts per-Reply pending question/permission registry Opencode::Tracer callable that prefixes event names Opencode::Instrumentation pluggable adapter Opencode::ResponseParser wire-format extractors Opencode::ToolPart canonical tool-part hash shape Opencode::PartSource wire-vs-stream-only discriminator Opencode::Todo todo status canonicalization Opencode::Error (+ 7 subclasses) What's out (per design D18 — wait for demand signal): - acts_as_opencode_session concern - ActiveRecord-backed session lifecycle - rails generators - opencode-rails as a separate gem Instead, examples/conversation_recipe.rb ships as a ~140-line plain-ActiveRecord blueprint demonstrating session lifecycle, with_lock, update_columns mid-stream pattern, and CAS-safe finalize. Tests: 12 runs, 25 assertions, 0 failures (smoke test against WebMock-stubbed OpenCode endpoints — covers the postcard, error model, Instrumentation, and Reply::Result shape). Authored against ajent-rails commit 02954eeb (opencode-gem/phase-3-prep). --- .gitignore | 8 + CHANGELOG.md | 33 ++ Gemfile | 3 + README.md | 161 ++++++++- Rakefile | 12 + examples/conversation_recipe.rb | 154 +++++++++ lib/opencode-ruby.rb | 26 ++ lib/opencode/client.rb | 567 ++++++++++++++++++++++++++++++++ lib/opencode/error.rb | 28 ++ lib/opencode/instrumentation.rb | 44 +++ lib/opencode/part_source.rb | 62 ++++ lib/opencode/prompts.rb | 87 +++++ lib/opencode/reply.rb | 549 +++++++++++++++++++++++++++++++ lib/opencode/reply_observer.rb | 101 ++++++ lib/opencode/response_parser.rb | 170 ++++++++++ lib/opencode/todo.rb | 43 +++ lib/opencode/tool_part.rb | 152 +++++++++ lib/opencode/tracer.rb | 51 +++ lib/opencode/version.rb | 5 + opencode-ruby.gemspec | 44 +++ test/opencode/smoke_test.rb | 169 ++++++++++ test/test_helper.rb | 10 + 22 files changed, 2478 insertions(+), 1 deletion(-) create mode 100644 .gitignore create mode 100644 CHANGELOG.md create mode 100644 Gemfile create mode 100644 Rakefile create mode 100644 examples/conversation_recipe.rb create mode 100644 lib/opencode-ruby.rb create mode 100644 lib/opencode/client.rb create mode 100644 lib/opencode/error.rb create mode 100644 lib/opencode/instrumentation.rb create mode 100644 lib/opencode/part_source.rb create mode 100644 lib/opencode/prompts.rb create mode 100644 lib/opencode/reply.rb create mode 100644 lib/opencode/reply_observer.rb create mode 100644 lib/opencode/response_parser.rb create mode 100644 lib/opencode/todo.rb create mode 100644 lib/opencode/tool_part.rb create mode 100644 lib/opencode/tracer.rb create mode 100644 lib/opencode/version.rb create mode 100644 opencode-ruby.gemspec create mode 100644 test/opencode/smoke_test.rb create mode 100644 test/test_helper.rb diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..290f8a8 --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +*.gem +.bundle/ +Gemfile.lock +pkg/ +tmp/ +.ruby-version +.byebug_history +coverage/ diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..888882f --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,33 @@ +# Changelog + +## 0.0.1.alpha1 — Unreleased + +First public alpha. HTTP + SSE client for OpenCode REST API. + +### What's in + +- `Opencode::Client` — Net::HTTP-based HTTP client with SSE streaming + automatic reconnection. + - `#create_session(title:, permissions:)`, `#get_messages(session_id)`, `#list_sessions`, `#delete_session(id)`, `#abort_session(id)`. + - `#send_message(session_id, text, model:, ...)` — synchronous send-and-poll. + - `#send_message_async(session_id, text, ...)` — async send. + - `#stream(session_id, text, ...) { |part| ... } → Opencode::Reply::Result` — **the headline.** Block-form streaming with internal Reply accumulation and final-exchange merge. + - `#stream_events(session_id:, ...) { |event| ... }` — lower-level SSE event firehose for power users. + - `#reply_question(request_id:, answers:)` / `#reply_permission(request_id:, reply:)` — answer interactive prompts. +- `Opencode::Reply` — live state machine accumulating SSE events into the assistant's reply. Documented observer protocol (`Opencode::ReplyObserver`). +- `Opencode::Reply::Result` — typed Struct value object returned by `Client#stream` and `Reply#result`. Fields: `:parts_json`, `:full_text`, `:reasoning_text`, `:tool_parts`. +- `Opencode::Instrumentation` — pluggable adapter (default no-op). Plug in `ActiveSupport::Notifications`, OpenTelemetry, stdout, etc. +- `Opencode::ResponseParser`, `Opencode::ToolPart`, `Opencode::PartSource`, `Opencode::Todo` — wire-format helpers used by `Reply` and reusable by callers building their own SSE handling. +- `Opencode::Prompts` — per-Reply registry of pending question/permission prompts (used by `Reply` internally; exposed for callers that need to peek). +- `Opencode::Tracer` — callable that prefixes event names before forwarding to a host emitter. +- Error hierarchy: `Opencode::Error` and seven subclasses (`ConnectionError`, `TimeoutError`, `SessionNotFoundError`, `StaleSessionError`, `IdleStreamError`, `ServerError`, `BadRequestError`). + +### What's out + +- ActiveRecord-backed session lifecycle, `acts_as_opencode_session`, generators — deferred to `opencode-rails` if external demand materializes. See `examples/conversation_recipe.rb` for the canonical Rails wiring pattern. +- Multi-tenant per-user Docker container orchestration — application glue, not a gem's concern. + +### Compatibility + +- Ruby ≥ 3.2 +- OpenCode server ≥ 1.15 (tested against the message bus schema in `packages/opencode/src/session/message-v2.ts`) +- Runtime dependency: `activesupport (>= 6.1)` for `blank?`/`present?`/`presence`/`truncate`/`duplicable?`/`megabytes`. ActiveSupport is *not* Rails — it's a standalone helpers gem. diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..b4e2a20 --- /dev/null +++ b/Gemfile @@ -0,0 +1,3 @@ +source "https://rubygems.org" + +gemspec diff --git a/README.md b/README.md index 0670139..83bf856 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,162 @@ # opencode-ruby -Idiomatic Ruby client for OpenCode (HTTP + SSE). Hand-rolled alternative to opencode_client. \ No newline at end of file +Idiomatic Ruby client for [OpenCode](https://opencode.ai). Block-form streaming, value-object responses, automatic SSE reconnection. + +```ruby +require "opencode-ruby" + +client = Opencode::Client.new(base_url: "http://localhost:4096") +session = client.create_session(title: "My session") + +reply = client.stream(session[:id], "Explain monads in two sentences.") do |part| + print part["content"] if part["type"] == "text" +end + +puts +puts reply.full_text +puts "(#{reply.tool_parts.size} tool calls, #{reply.parts_json.size} parts total)" +``` + +Three lines of setup, four lines of work. Block fires every time a part appears, grows, finalizes, or (for tool calls) advances state. The final return value is a typed `Opencode::Reply::Result` you can persist or inspect. + +## Install + +```ruby +# Gemfile +gem "opencode-ruby" +``` + +Or: + +```sh +gem install opencode-ruby +``` + +Then `require "opencode-ruby"`. + +## Configuration + +```ruby +client = Opencode::Client.new( + base_url: "http://localhost:4096", # or ENV["OPENCODE_BASE_URL"] + password: "secret", # or ENV["OPENCODE_SERVER_PASSWORD"] + timeout: 120 # or ENV["OPENCODE_TIMEOUT"], seconds +) +``` + +Multi-tenant apps construct multiple clients with different `base_url`s — each `Opencode::Client` holds its own Net::HTTP connection, no shared state. + +## Core API + +### Streaming (the headline) + +```ruby +reply = client.stream(session_id, "What's 2 + 2?") do |part| + case part["type"] + when "text" then print part["content"] + when "reasoning" then # ignore, or render in a separate UI + when "tool" then puts " [tool: #{part['tool']} → #{part['status']}]" + end +end + +reply.full_text # => "2 + 2 = 4." +reply.tool_parts # => array of terminal tool-call parts +reply.reasoning_text # => the model's hidden reasoning, if any +reply.parts_json # => the full ordered parts array, ready for persistence +``` + +### Synchronous send (no streaming) + +```ruby +result = client.send_message(session_id, "Quick yes/no: is Ruby fun?") +# result is the OpenCode response hash; see API docs for fields. +``` + +### Lower-level event firehose + +If you need raw SSE events (every server tick, todo update, prompt asked/replied), use `stream_events` directly: + +```ruby +client.stream_events(session_id: session_id) do |event| + puts event[:type] # "message.part.delta", "todo.updated", "session.idle", ... +end +``` + +### Interactive prompts + +When the agent uses the `question` or `permission` tools, opencode emits `question.asked` / `permission.asked` events. Answer them via: + +```ruby +client.reply_question(request_id: "que_...", answers: [["yes"]]) +client.reply_permission(request_id: "per_...", reply: "always") +``` + +## Error model + +Every method that hits the network raises `Opencode::Error` (or a subclass) on failure. Catch the parent or the specific subclass: + +```ruby +begin + client.health +rescue Opencode::ConnectionError # server unreachable +rescue Opencode::TimeoutError # client-side timeout +rescue Opencode::SessionNotFoundError # 404 on a session +rescue Opencode::StaleSessionError # session.idle never arrived +rescue Opencode::IdleStreamError # mid-turn SSE wedge +rescue Opencode::ServerError # 5xx +rescue Opencode::BadRequestError # 4xx other than 404 +rescue Opencode::Error # catch-all +end +``` + +## Instrumentation + +Want to see what the gem is doing? Plug in an adapter. Default behaviour is silent no-op — the gem ships zero opinion about your observability stack. + +```ruby +# stdout for debugging: +Opencode::Instrumentation.adapter = ->(name, payload, &block) { + puts "[#{name}] #{payload.inspect}" + block.call +} + +# ActiveSupport::Notifications in a Rails app: +Opencode::Instrumentation.adapter = ->(name, payload, &block) { + ActiveSupport::Notifications.instrument(name, payload, &block) +} +``` + +Event names emitted today: + +| Event | Payload | +|---|---| +| `opencode.request` | `:method`, `:path` | + +## Want this in a Rails app? + +See [`examples/conversation_recipe.rb`](examples/conversation_recipe.rb) for a ~60-line plain-ActiveRecord blueprint covering session lifecycle (`with_lock`, `update_columns` mid-stream snapshots, CAS-safe finalize). Drop it into your app and adapt. + +If enough Rails developers do that and want it as a one-liner, we'll ship `opencode-rails` with `acts_as_opencode_session`. **File an issue if that's you** — your issue is the signal. + +## Position against `opencode_client` + +Want every OpenCode endpoint auto-generated from the OpenAPI spec? Use [`opencode_client`](https://rubygems.org/gems/opencode_client). This gem is the hand-rolled idiomatic alternative — smaller surface, opinionated defaults, block-form streaming. Pick whichever fits how you want to write Ruby. + +## Compatibility + +- Ruby ≥ 3.2 +- OpenCode server ≥ 1.15 +- Runtime dependency: `activesupport (>= 6.1)` — *not* Rails. ActiveSupport is a standalone helpers gem (`blank?`, `present?`, `presence`, `truncate`, etc.). + +## Development + +```sh +bundle install +bundle exec rake test +``` + +12-test smoke covers Client end-to-end against WebMock-stubbed OpenCode endpoints. + +## License + +MIT. See [LICENSE](LICENSE). diff --git a/Rakefile b/Rakefile new file mode 100644 index 0000000..8ca97e1 --- /dev/null +++ b/Rakefile @@ -0,0 +1,12 @@ +# frozen_string_literal: true + +require "bundler/gem_tasks" +require "rake/testtask" + +Rake::TestTask.new(:test) do |t| + t.libs << "test" + t.libs << "lib" + t.test_files = FileList["test/**/*_test.rb"] +end + +task default: :test diff --git a/examples/conversation_recipe.rb b/examples/conversation_recipe.rb new file mode 100644 index 0000000..cd3106c --- /dev/null +++ b/examples/conversation_recipe.rb @@ -0,0 +1,154 @@ +# frozen_string_literal: true + +# Rails integration recipe — copy + adapt. +# +# This is NOT part of opencode-ruby. It's the canonical pattern showing +# how to wire the gem's primitives into a Rails ActiveRecord app. Drop +# this file into your `app/models/` (rename it), adapt the schema, and +# you have a working block-streaming chat with row-locked session +# lifecycle and CAS-safe finalize. +# +# What this recipe demonstrates: +# +# 1. Schema (below in a comment) — the migration you'll need +# 2. Session lifecycle — idempotent ensure! with with_lock +# 3. Mid-stream parts persistence via update_columns (bypasses +# AR callbacks so Turbo broadcasts don't fire per-part) +# 4. CAS-safe finalize — concurrent cancel wins +# 5. Recovery from SessionNotFoundError — recreate once + retry +# +# If you want this as a one-liner (`acts_as_opencode_session`), file an +# issue at https://gitea.krishnan.ca/ajaynomics/opencode-ruby/issues. +# That issue is the demand signal for opencode-rails. The gem ships +# this recipe instead of the concern because we don't yet know what +# shape Rails developers actually want — and shipping a half-built +# concern is worse than shipping a clear blueprint. +# +# Suggested schema (adapt naming to your domain): +# +# create_table :conversations do |t| +# t.references :user, null: false, foreign_key: true +# t.string :title +# t.string :opencode_session_id +# t.timestamps +# t.index :opencode_session_id, unique: true, +# where: "opencode_session_id IS NOT NULL" # partial unique +# end +# +# create_table :messages do |t| +# t.references :conversation, null: false, foreign_key: true +# t.string :role, null: false # "user" or "assistant" +# t.integer :status, null: false, default: 0 # see enum below +# t.text :content, null: false, default: "" +# t.json :parts_json, null: false, default: [] +# t.json :tool_calls_json, null: false, default: [] +# t.decimal :cost, precision: 10, scale: 6 +# t.integer :input_tokens +# t.integer :output_tokens +# t.timestamps +# end + +class Conversation < ApplicationRecord + belongs_to :user + has_many :messages, dependent: :destroy + + # Returns the OpenCode session id for this conversation, creating one + # if needed. Idempotent. Race-safe via row-lock + double-check. + def ensure_opencode_session!(client) + return opencode_session_id if opencode_session_id.present? + + with_lock do + return opencode_session_id if opencode_session_id.present? + session = client.create_session(title: title) + update!(opencode_session_id: session[:id] || session["id"]) + end + opencode_session_id + rescue ActiveRecord::RecordNotUnique + # Another worker raced past the partial unique index. Loser reloads. + reload + opencode_session_id + end + + # Replace a stale upstream session. Used by SessionNotFoundError + # recovery in the streaming job below. + def recreate_opencode_session!(client) + pre_id = opencode_session_id + with_lock do + return opencode_session_id if opencode_session_id.present? && opencode_session_id != pre_id + session = client.create_session(title: title) + update!(opencode_session_id: session[:id] || session["id"]) + end + opencode_session_id + end +end + +class Message < ApplicationRecord + belongs_to :conversation + + enum status: { pending: 0, streaming: 1, completed: 2, cancelled: 3, errored: 4 } +end + +# The streaming job. Compose Opencode::Client + ActiveRecord; that's it. +class GenerateAssistantReplyJob < ApplicationJob + def perform(message_id, user_prompt) + message = Message.find(message_id) + return unless message.pending? + + client = Opencode::Client.new( + base_url: ENV.fetch("OPENCODE_BASE_URL"), + password: ENV["OPENCODE_SERVER_PASSWORD"] + ) + + session_id = message.conversation.ensure_opencode_session!(client) + message.update!(status: :streaming) + + attempted_recreate = false + begin + reply = client.stream(session_id, user_prompt) do |part| + # Mid-stream snapshot: update_columns bypasses AR callbacks so + # an after_update_commit broadcasts_refreshes_to(conversation) + # doesn't fire per-part and clobber per-part Turbo broadcasts + # you might be doing separately. The final write below uses + # update! to fire callbacks deliberately. + message.update_columns( + parts_json: reply_parts_so_far(part, message), + updated_at: Time.current + ) + end + + # CAS-safe finalize: only land the final state if no concurrent + # cancel got there first. + message.with_lock do + return unless message.reload.pending? || message.streaming? + message.update!( + status: :completed, + content: reply.full_text, + parts_json: reply.parts_json, + tool_calls_json: reply.tool_parts + ) + end + rescue Opencode::SessionNotFoundError, Opencode::StaleSessionError + raise if attempted_recreate + message.conversation.recreate_opencode_session!(client) + attempted_recreate = true + retry + end + rescue StandardError => e + message&.update!(status: :errored, content: "An error occurred: #{e.message.truncate(200)}") + end + + private + + # Builds the parts array up to (and including) the current part by + # poking the gem's internal Reply state. In practice you'd capture + # the Reply instance from the block via a closure, OR derive from + # `part` if you only need the latest part. + def reply_parts_so_far(part, message) + parts = (message.parts_json || []).dup + # Trivial dedup: replace or append by part id, if your wire-format + # includes one. For real merge logic, lift Opencode::Reply's + # part_index_by_id / append_part pattern. + parts << part unless parts.any? { |existing| existing["id"] == part["id"] } + parts + end +end diff --git a/lib/opencode-ruby.rb b/lib/opencode-ruby.rb new file mode 100644 index 0000000..c4cc0d4 --- /dev/null +++ b/lib/opencode-ruby.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +# Minimal ActiveSupport surface — `present?`, `blank?`, `presence`, +# `truncate`, `duplicable?`. We deliberately load only the core_ext bits +# we use, not all of activesupport, to keep the boot footprint small in +# non-Rails apps. +require "active_support/core_ext/object/blank" # provides blank?, present?, presence +require "active_support/core_ext/object/duplicable" +require "active_support/core_ext/string/filters" # provides String#truncate +require "active_support/core_ext/numeric/bytes" # provides Integer#megabytes + +require_relative "opencode/version" +require_relative "opencode/error" +require_relative "opencode/instrumentation" +require_relative "opencode/response_parser" +require_relative "opencode/part_source" +require_relative "opencode/tool_part" +require_relative "opencode/todo" +require_relative "opencode/prompts" +require_relative "opencode/reply_observer" +require_relative "opencode/reply" +require_relative "opencode/tracer" +require_relative "opencode/client" + +module Opencode +end diff --git a/lib/opencode/client.rb b/lib/opencode/client.rb new file mode 100644 index 0000000..43518b1 --- /dev/null +++ b/lib/opencode/client.rb @@ -0,0 +1,567 @@ +# frozen_string_literal: true + +require "net/http" +require "json" +require "base64" + +module Opencode + # HTTP client for OpenCode REST API. + # Thread safety: Each instance creates its own Net::HTTP connection. + # Do NOT share instances across threads. Create per-job. + class Client + attr_reader :directory + + def initialize( + base_url: ENV["OPENCODE_BASE_URL"] || "http://localhost:4096", + password: ENV["OPENCODE_SERVER_PASSWORD"], + timeout: (ENV["OPENCODE_TIMEOUT"] || 120).to_i, + directory: nil, + workspace: nil + ) + @uri = URI.parse(base_url) + @password = password + @timeout = timeout || 120 + @directory = directory + @workspace = workspace + end + + def create_session(title: nil, permissions: nil) + body = { title: title, permission: permissions }.compact + post("/session", body) + end + + def send_message( + session_id, text, + parts: nil, + model: nil, + agent: nil, + system: nil, + message_id: nil, + no_reply: nil, + tools: nil, + format: nil, + variant: nil + ) + body = prompt_payload( + text, + parts: parts, + model: model, + agent: agent, + system: system, + message_id: message_id, + no_reply: no_reply, + tools: tools, + format: format, + variant: variant + ) + post("/session/#{session_id}/message", body) + end + + def send_message_async( + session_id, text, + parts: nil, + model: nil, + agent: nil, + system: nil, + message_id: nil, + no_reply: nil, + tools: nil, + format: nil, + variant: nil + ) + body = prompt_payload( + text, + parts: parts, + model: model, + agent: agent, + system: system, + message_id: message_id, + no_reply: no_reply, + tools: tools, + format: format, + variant: variant + ) + post("/session/#{session_id}/prompt_async", body) + end + + # Block-form streaming — the headline API for callers who want the + # full async-prompt + SSE-loop + final-exchange-merge flow in one + # call. Returns the final Opencode::Reply::Result value object once + # the agent finishes. + # + # reply = client.stream(session_id, "Explain monads") do |part| + # print part["content"] if part["type"] == "text" + # end + # reply.full_text # => the final accumulated text + # reply.tool_parts # => array of terminal tool parts + # + # The block is invoked every time a part is added, grows, finalizes, + # or (for tool parts) advances state — i.e., whenever a user-visible + # change happens. The block receives the current `part` hash (string + # keys: "type", "content", "tool", "status", "input", ...). + # + # If you need raw events (every server.* tick, todo.updated, prompt + # asked/replied, etc.), use #stream_events instead. + # + # Optional kwargs are forwarded to send_message_async — model, agent, + # system prompt override, and the SSE pacing knobs supported by + # stream_events. + def stream( + session_id, text, + model: nil, agent: nil, system: nil, message_id: nil, + stream_timeout: 600, + first_event_timeout: 120, + idle_stream_timeout: nil, + on_activity_tick: nil, + &block + ) + send_message_async( + session_id, text, + model: model, agent: agent, system: system, message_id: message_id + ) + + reply = Opencode::Reply.new + reply.add_observer(StreamBlockObserver.new(&block)) if block_given? + + stream_events( + session_id: session_id, + timeout: stream_timeout, + first_event_timeout: first_event_timeout, + idle_stream_timeout: idle_stream_timeout, + reply: reply, + on_activity_tick: on_activity_tick + ) do |event| + reply.apply(event) + end + + merge_final_exchange(session_id, reply) + reply.result + end + + def list_sessions + uri = build_uri("/session") + request = Net::HTTP::Get.new(uri) + execute(request) + end + + def children(session_id) + uri = build_uri("/session/#{session_id}/children") + request = Net::HTTP::Get.new(uri) + execute(request) + end + + def delete_session(session_id) + uri = build_uri("/session/#{session_id}") + request = Net::HTTP::Delete.new(uri) + execute(request) + end + + def session_status + uri = build_uri("/session/status") + request = Net::HTTP::Get.new(uri) + execute(request) + end + + def get_messages(session_id) + uri = build_uri("/session/#{session_id}/message") + request = Net::HTTP::Get.new(uri) + execute(request) + end + + def abort_session(session_id) + post("/session/#{session_id}/abort", {}) + end + + def reply_question(request_id:, answers:) + post("/question/#{request_id}/reply", { answers: answers }) + end + + def reject_question(request_id:) + post("/question/#{request_id}/reject", {}) + end + + def reply_permission(request_id:, reply:, message: nil) + body = { reply: reply } + body[:message] = message if message.present? + post("/permission/#{request_id}/reply", body) + end + + # Returns pending question requests as an Array of Hashes with + # SYMBOL keys, consistent with every other endpoint that flows + # through handle_response (e.g., health, list_sessions, get_messages). + # Callers that compare against persisted JSON column data should + # symbolize their side, not desymbolize this side. + def list_questions + uri = build_uri("/question") + request = Net::HTTP::Get.new(uri) + add_auth_header(request) + + response = Opencode::Instrumentation.instrument("opencode.request", method: request.method, path: request.path) do + http_client.request(request) + end + + unless response.code.to_i.between?(200, 299) + raise ServerError, "list_questions failed: HTTP #{response.code} — #{response.body.to_s[0, 200]}" + end + + return [] if response.body.blank? + JSON.parse(response.body, symbolize_names: true) + rescue JSON::ParserError => e + raise ServerError, "list_questions returned invalid JSON: #{e.message}" + rescue Net::OpenTimeout, Net::ReadTimeout, Net::WriteTimeout => e + raise TimeoutError, "OpenCode timeout after #{@timeout}s: #{e.message}" + rescue Errno::ECONNREFUSED, SocketError => e + raise ConnectionError, "OpenCode unreachable: #{e.message}" + end + + def health + uri = build_uri("/global/health", scoped: false) + request = Net::HTTP::Get.new(uri) + execute(request) + end + + MAX_SSE_BUFFER = 1_048_576 # 1 MB — safety valve against pathological server responses + SSE_RECONNECT_DELAY = 0.1 + TRANSIENT_SSE_ERRORS = [ + EOFError, + IOError, + Net::OpenTimeout, + Net::ReadTimeout, + Errno::ECONNREFUSED, + Errno::ECONNRESET, + Errno::EPIPE + ].freeze + + # Opens SSE connection to GET /event, yields parsed events filtered by session_id. + # Blocks until session goes idle or timeout, reconnecting across dropped + # event-stream connections. + # + # first_event_timeout: seconds to wait for a session-specific event before + # declaring the session stale. Server heartbeats don't count — they're global + # keep-alives that flow regardless of session state. + # + # Default 120s rather than the more aggressive 30s we shipped originally: + # slow-thinking models (moonshot/kimi-k2.5 for Raven raven-legal, gpt-5* + # reasoning models) routinely spend 30–90s of pure reasoning before + # emitting their first message.part.* event, especially on cold sessions + # with long system prompts that prescribe "read these playbooks first" + # (see config/opencode/prompts/raven-legal.md). 30s ended up false-positive + # tripping on legitimate first turns and converting them to + # `StaleSessionError -> "Sorry, something went wrong"` while the agent was + # still happily planning. 120s catches genuine zombies (the only thing + # this gate exists for) without nuking real reasoning. Callers that want + # tighter timing for short-prompt agents can override. + # + # idle_stream_timeout: seconds to wait BETWEEN meaningful events once the + # session has started producing them. Default nil = no check (preserves + # existing behavior). Opt-in heartbeat watchdog for callers like + # AIGL::AgentForwardJob whose user-facing surface (a chat loading page) + # needs to fail fast rather than sit forever when an upstream LLM stream + # wedges mid-turn. Distinct from first_event_timeout (which only protects + # cold-start) and from the overall `timeout` ceiling of 600s (which is too + # forgiving — a Hung OpenAI stream holding a thread for 10 minutes is + # already a bad UX). When the window is exceeded the call raises + # Opencode::IdleStreamError, which the caller is expected to catch and + # translate into a user-visible error / retry affordance. + def stream_events(session_id:, timeout: 600, first_event_timeout: 120, + idle_stream_timeout: nil, + reply: nil, on_activity_tick: nil, &block) + uri = build_uri("/event") + deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout + first_event_deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + first_event_timeout + received_session_event = false + last_meaningful_event_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + + loop do + now = Process.clock_gettime(Process::CLOCK_MONOTONIC) + deadline = check_deadline_or_suspend(now, deadline, timeout, reply) + + # NOTE: first_event_deadline is *not* suspension-eligible. If the agent + # never gets started we want to fail fast — a session that's blocked on + # a prompt has, by definition, already produced events. + if !received_session_event && now > first_event_deadline + raise StaleSessionError, "No events for session #{session_id} within #{first_event_timeout}s" + end + + if idle_stream_timeout && received_session_event && + (now - last_meaningful_event_at) > idle_stream_timeout + raise IdleStreamError, + "No meaningful events for session #{session_id} within #{idle_stream_timeout}s " \ + "(SSE heartbeats still arriving — upstream likely wedged mid-turn)" + end + + request = Net::HTTP::Get.new(uri) + request["Accept"] = "text/event-stream" + request["Cache-Control"] = "no-cache" + add_auth_header(request) + + http = Net::HTTP.new(@uri.host, @uri.port) + http.use_ssl = @uri.scheme == "https" + http.open_timeout = 10 + http.read_timeout = 30 + + begin + buffer = String.new + + http.request(request) do |response| + unless response.is_a?(Net::HTTPSuccess) + raise ServerError, "SSE connection failed: HTTP #{response.code}" + end + + response.read_body do |chunk| + now = Process.clock_gettime(Process::CLOCK_MONOTONIC) + deadline = check_deadline_or_suspend(now, deadline, timeout, reply) + + if !received_session_event && now > first_event_deadline + raise StaleSessionError, "No events for session #{session_id} within #{first_event_timeout}s" + end + + if idle_stream_timeout && received_session_event && + (now - last_meaningful_event_at) > idle_stream_timeout + raise IdleStreamError, + "No meaningful events for session #{session_id} within #{idle_stream_timeout}s " \ + "(SSE heartbeats still arriving — upstream likely wedged mid-turn)" + end + + buffer << chunk + if buffer.bytesize > MAX_SSE_BUFFER + raise ServerError, "SSE buffer exceeded #{MAX_SSE_BUFFER} bytes" + end + + while (idx = buffer.index("\n\n")) + raw_event = buffer.slice!(0, idx + 2) + event = parse_sse_event(raw_event, session_id) + next unless event + + unless event[:type]&.start_with?("server.") + received_session_event = true + last_meaningful_event_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + + # Tick activity on EVERY event, including server.heartbeat — + # that's the whole point: a healthy long wait (user thinking + # for 30 minutes) keeps the container warm via heartbeats so + # the reaper doesn't kill it mid-wait. + on_activity_tick&.call(event) + block.call(event) + return if event[:type] == "session.idle" + end + end + end + rescue *TRANSIENT_SSE_ERRORS + # Treat transport-level SSE disconnects like clean EOF: reconnect + # until session.idle, the overall timeout, or first-event timeout. + ensure + begin + http&.finish if http&.started? + rescue IOError + # Connection already closed — network partition or server shutdown + end + end + + cutoff = received_session_event ? deadline : first_event_deadline + sleep_for = [ SSE_RECONNECT_DELAY, cutoff - Process.clock_gettime(Process::CLOCK_MONOTONIC) ].min + if sleep_for.positive? + sleep sleep_for + end + end + end + + def close + @http&.finish if @http&.started? + rescue IOError + # already closed + end + + private + + # Best-effort merge of the polled message exchange into the live + # reply. Catches the stream-only / poll-only asymmetry — todo.updated + # is poll-only on some opencode versions; pure-streaming would miss + # the terminal todo state otherwise. If the session API is also down + # at this point (network partition, container teardown mid-call), we + # silently keep whatever the stream accumulated rather than raising; + # the caller's reply is still a usable Result either way. + def merge_final_exchange(session_id, reply) + exchange = get_messages(session_id) + last_assistant = Array(exchange).reverse_each.find do |message| + message.dig(:info, :role) == "assistant" + end + return unless last_assistant + + polled = Opencode::ResponseParser.extract_interleaved_parts(last_assistant) + reply.sync_recovered_parts(polled) if polled.any? + rescue Opencode::Error + # Stream's result is still complete; the merge was a polish, not a + # requirement. + end + + # Healthy wait: opencode is suspended on a question/permission deferred + # and heartbeats are keeping the connection alive. Reset the deadline + # to "from now" so the full stuck-stream protection is restored once + # the prompt resolves. Otherwise apply the normal deadline check. + def check_deadline_or_suspend(now, deadline, timeout, reply) + return now + timeout if reply&.prompt_blocked? + raise TimeoutError, "SSE stream timed out after #{timeout}s" if now > deadline + + deadline + end + + def prompt_payload(text, parts:, model:, agent:, system:, message_id:, no_reply:, tools:, format:, variant:) + message_parts = parts || [ { type: "text", text: text } ] + { + messageID: message_id, + parts: message_parts, + model: format_model(model), + agent: agent, + noReply: no_reply, + tools: tools, + format: format, + system: system, + variant: variant + }.compact + end + + def format_model(model) + return nil unless model + return model if model.is_a?(Hash) + + provider, model_id = model.split("/", 2) + { providerID: provider, modelID: model_id } + end + + def post(path, body) + uri = build_uri(path) + request = Net::HTTP::Post.new(uri) + request.body = body.to_json + execute(request) + end + + def build_uri(path, scoped: true) + uri = @uri.dup + uri.path = path + + if scoped + query = URI.decode_www_form(uri.query.to_s) + query << [ "directory", @directory ] if @directory.present? + query << [ "workspace", @workspace ] if @workspace.present? + uri.query = query.any? ? URI.encode_www_form(query) : nil + end + + uri + end + + def add_auth_header(request) + request["Content-Type"] = "application/json" + if @password.present? + request["Authorization"] = "Basic #{Base64.strict_encode64("opencode:#{@password}")}" + end + end + + def execute(request) + add_auth_header(request) + + response = nil + result = Opencode::Instrumentation.instrument("opencode.request", method: request.method, path: request.path) do + response = http_client.request(request) + handle_response(response) + end + + result + rescue Net::OpenTimeout, Net::ReadTimeout, Net::WriteTimeout => e + raise TimeoutError, "OpenCode timeout after #{@timeout}s: #{e.message}" + rescue Errno::ECONNREFUSED, SocketError => e + raise ConnectionError, "OpenCode unreachable: #{e.message}" + end + + def http_client + @http ||= Net::HTTP.new(@uri.host, @uri.port).tap do |http| + http.use_ssl = @uri.scheme == "https" + http.open_timeout = 10 + http.read_timeout = @timeout + http.write_timeout = 30 + end + end + + def parse_sse_event(raw, session_id) + data_line = raw.lines.find { |l| l.start_with?("data: ") } + return nil unless data_line + + json = JSON.parse(data_line.sub("data: ", "").strip, symbolize_names: true) + + event_session = json.dig(:properties, :sessionID) || + json.dig(:properties, :info, :sessionID) || + json.dig(:properties, :part, :sessionID) + + return json if json[:type] == "server.heartbeat" + return json if json[:type] == "server.connected" + return nil unless event_session == session_id + + json + rescue JSON::ParserError + nil + end + + def handle_response(response) + return {} if response.code.to_i == 204 + + body = if response.body.present? + JSON.parse(response.body, symbolize_names: true) + else + {} + end + + case response.code.to_i + when 200..299 then body + when 400 then raise BadRequestError.new(error_message(body, "Bad request"), response: body) + when 404 then raise SessionNotFoundError.new(error_message(body, "Session not found"), response: body) + when 500..599 then raise ServerError.new(error_message(body, "Server error"), response: body) + else raise Error.new("Unexpected response: #{response.code}", response: body) + end + rescue JSON::ParserError + raise ServerError.new("Invalid JSON from OpenCode (HTTP #{response.code}): #{response.body&.truncate(200)}") + end + + # OpenCode HTTP error bodies use a wrapped shape: { name:, data: { message:, kind?: } }. + # v1.14.51 stopped exposing internal defect details from the HTTP API, so + # `body[:message]` is no longer populated for errors — only `body[:data][:message]`. + # We read both to keep older mock servers working in tests. + def error_message(body, fallback) + body.dig(:data, :message) || body[:message] || fallback + end + end + + # Internal Reply observer that bridges Reply's multi-callback protocol + # to a single user-supplied block for Client#stream. Each part-level + # callback (part_added, part_changed, part_finalized, tool_progressed) + # forwards the current part to the user's block. + # + # Non-part-level callbacks (step_finished, session_*, message_updated, + # todos_changed, question_*, permission_*) are intentionally NOT + # forwarded — they're either telemetry the gem owns internally, or + # interactive-protocol concerns that callers route through + # #stream_events directly when they need them. + class StreamBlockObserver + include Opencode::ReplyObserver + + def initialize(&block) + @block = block + end + + def part_added(part:, **) + @block.call(part) + end + + def part_changed(part:, **) + @block.call(part) + end + + def part_finalized(part:, **) + @block.call(part) + end + + def tool_progressed(part:, **) + @block.call(part) + end + end +end diff --git a/lib/opencode/error.rb b/lib/opencode/error.rb new file mode 100644 index 0000000..778bdf5 --- /dev/null +++ b/lib/opencode/error.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +module Opencode + class Error < StandardError + attr_reader :response + + def initialize(message = nil, response: nil) + @response = response + super(message) + end + end + + class ConnectionError < Error; end + class TimeoutError < Error; end + class SessionNotFoundError < Error; end + class StaleSessionError < Error; end + # Raised by stream_events when meaningful (non-`server.*`) events stop + # arriving for longer than the caller's `idle_stream_timeout` window, + # even though the SSE socket itself is still alive (heartbeats are + # still flowing). Distinct from StaleSessionError, which fires when + # the session never produced any events in the first place. This one + # fires when the session WAS producing events and then went silent — + # the classic "OpenAI stream wedged mid-turn while the SSE keep- + # alive ticks on" failure mode. + class IdleStreamError < Error; end + class ServerError < Error; end + class BadRequestError < Error; end +end diff --git a/lib/opencode/instrumentation.rb b/lib/opencode/instrumentation.rb new file mode 100644 index 0000000..7b36db5 --- /dev/null +++ b/lib/opencode/instrumentation.rb @@ -0,0 +1,44 @@ +# frozen_string_literal: true + +module Opencode + # Pluggable instrumentation adapter. opencode-ruby ships zero + # dependencies on Rails or any specific instrumentation library. Users + # plug in their own emitter: + # + # # ActiveSupport::Notifications (Rails apps): + # Opencode::Instrumentation.adapter = ->(name, payload, &block) { + # ActiveSupport::Notifications.instrument(name, payload, &block) + # } + # + # # stdout (debugging, non-Rails scripts): + # Opencode::Instrumentation.adapter = ->(name, payload, &block) { + # puts "[#{name}] #{payload.inspect}" + # block.call + # } + # + # When no adapter is set (default), instrumentation is a no-op pass- + # through that yields the block and returns its value. The Client emits + # events for HTTP requests, SSE stream lifecycle, and recovery paths. + # + # Event names the Client emits: + # + # - opencode.request — every HTTP request to OpenCode server + # + # If you wire a real adapter, the payload hash carries `:method` and + # `:path` for opencode.request. Other events may add fields in future + # versions; treat the payload as forward-compatible. + module Instrumentation + class << self + attr_accessor :adapter + end + + # Yields the block, optionally routed through the adapter if one is + # set. Always returns the block's return value (so call sites can + # wrap their work transparently). + def self.instrument(name, payload = {}) + return yield unless adapter + + adapter.call(name, payload) { yield } + end + end +end diff --git a/lib/opencode/part_source.rb b/lib/opencode/part_source.rb new file mode 100644 index 0000000..013db51 --- /dev/null +++ b/lib/opencode/part_source.rb @@ -0,0 +1,62 @@ +# frozen_string_literal: true + +require "set" + +module Opencode + # A Part's provenance — where it came from in the OpenCode wire model. + # + # Two source classes exist: + # + # - Wire parts: emitted by the OpenCode message-parts pipeline and + # echoed back by `GET /session/:id/message`. These are authoritative + # for finalization — when the final exchange poll lands, wire parts + # overwrite whatever streaming captured. + # + # - Stream-only parts: synthesized from bus events that OpenCode does + # NOT persist as message parts. The host's Opencode::Reply + # materializes them so per-product ReplyStream observers can render + # them through the same tool partials as real tool parts, and + # Opencode::Turn preserves them across exchange-finalization so the + # final assistant message keeps what the user watched live. + # + # `todo.updated` is the first stream-only source (OpenCode emits the + # full todo list on a bus event but never records it as a message part). + # Future sources land here too: add the constant, add it to STREAM_ONLY, + # both `Reply#append_part` callers and `Turn#stream_only_part?` keep + # working with no further edits. + # + # This module exists because the previous shape coupled Reply and Turn + # through a magic-string comparison of `metadata.source == + # Opencode::Reply::TODO_STREAM_SOURCE`. Two classes carrying the same + # discriminator string is a "next time someone adds a source they'll + # only update one place" bug waiting to happen. The source-of-truth + # now lives here; both consumers go through `stream_only?(part)`. + module PartSource + TODO_UPDATED = "todo.updated" + STREAM_ONLY = Set[TODO_UPDATED].freeze + + module_function + + # True iff the part's metadata.source is one of the stream-only + # sources. Tolerates non-Hash input (returns false) so callers don't + # have to guard before asking. + def stream_only?(part) + return false unless part.is_a?(Hash) + + STREAM_ONLY.include?(part.dig("metadata", "source")) + end + + # Stamps `source:` into part_hash's metadata. Raises ArgumentError on + # an unknown source so typos surface at write time, not at the next + # `stream_only?` check (which would silently return false). + # Mutates and returns the input hash for chaining. + def stamp(part_hash, source:) + raise ArgumentError, "unknown stream-only source #{source.inspect}; " \ + "register it in Opencode::PartSource::STREAM_ONLY first" unless STREAM_ONLY.include?(source) + + part_hash["metadata"] ||= {} + part_hash["metadata"]["source"] = source + part_hash + end + end +end diff --git a/lib/opencode/prompts.rb b/lib/opencode/prompts.rb new file mode 100644 index 0000000..2d744b8 --- /dev/null +++ b/lib/opencode/prompts.rb @@ -0,0 +1,87 @@ +# frozen_string_literal: true + +module Opencode + # Per-Reply registry of interactive prompts (questions + permissions) + # opencode has asked the user but not yet resolved. Lives on + # Opencode::Reply for the lifetime of one streaming turn. + # + # Two access patterns: + # + # * by request id ("que_..." or "per_...") — for the controller + # posting a user's answer back. + # * by {message_id, call_id} — for the order-race fix where + # `question.asked` may arrive before the matching tool part. + # + # The registry also exposes a `prompt_blocked?` predicate that + # Opencode::Client uses to suspend the SSE deadline check while + # a healthy wait is in progress. + class Prompts + Entry = Struct.new(:kind, :request, :asked_at, keyword_init: true) + + def initialize + @entries = {} + @by_call = {} + end + + def record_question(request) + record(:question, request) + end + + def record_permission(request) + record(:permission, request) + end + + # Returns the raw request hash (not the Entry wrapper) so callers + # don't depend on internal bookkeeping shape. + def find(request_id) + @entries[request_id]&.request + end + + # Returns the raw request hash, same shape as #find. + def find_by_call(message_id:, call_id:) + key = call_key(message_id, call_id) + @by_call[key]&.request + end + + def resolve(request_id) + entry = @entries.delete(request_id) + return unless entry + + tool = entry.request[:tool] + return unless tool + + @by_call.delete(call_key(tool[:messageID], tool[:callID])) + end + + def each_pending + @entries.each_value { |entry| yield(entry.kind, entry.request) } + end + + def any_pending? + @entries.any? + end + alias_method :prompt_blocked?, :any_pending? + + def asked_at(request_id) + @entries[request_id]&.asked_at + end + + private + + def record(kind, request) + entry = Entry.new( + kind: kind, + request: request, + asked_at: Process.clock_gettime(Process::CLOCK_MONOTONIC) + ) + @entries[request[:id]] = entry + + tool = request[:tool] + @by_call[call_key(tool[:messageID], tool[:callID])] = entry if tool + end + + def call_key(message_id, call_id) + [ message_id, call_id ].join(":") + end + end +end diff --git a/lib/opencode/reply.rb b/lib/opencode/reply.rb new file mode 100644 index 0000000..861a109 --- /dev/null +++ b/lib/opencode/reply.rb @@ -0,0 +1,549 @@ +# frozen_string_literal: true + +module Opencode + # An assistant's reply as it is being composed, live, from OpenCode SSE + # events. A Reply accumulates parts (text, reasoning, tool invocations) + # in the order the agent emits them and notifies observers of domain + # transitions — parts appearing, parts growing, tools advancing, + # sessions erroring. + # + # Responsibilities + # ---------------- + # + # * Translate raw OpenCode SSE events into domain callbacks. + # * Own the canonical state of an in-flight reply (parts list, indices, + # first-token seen, message info). + # * Apply the tail-drop safety net: when part.updated carries + # authoritative :text that differs from what deltas accumulated + # (z.ai GLM-5.1 drops trailing deltas), rewrite the part's content. + # * Preserve the original tool name when OpenCode later renames a tool + # to "invalid" mid-stream. + # + # Not responsibilities + # -------------------- + # + # * Rendering HTML or broadcasting Turbo Streams (observer concern). + # * Persisting parts to a database (observer concern). + # * Fetching the event stream (Opencode::Client). + # * Retry / session recovery (job concern). + # + # Event contract + # -------------- + # + # Events match OpenCode's bus schema (packages/opencode/src/session/ + # message-v2.ts, status.ts, todo.ts): + # + # message.part.delta { properties: { partID, field, delta, ... } } + # message.part.updated { properties: { part: { id, type, ... } } } + # message.updated { properties: { info: { tokens, cost, ... } } } + # session.status { properties: { status: { type, ... } } } + # session.error { properties: { error: { name, data, ... } } } + # todo.updated { properties: { todos: [...] } } + # + # Observer callbacks + # ------------------ + # + # See Opencode::ReplyObserver for the full callback surface. Observers + # are duck-typed — only the callbacks they define are invoked. + # + # Example + # ------- + # + # reply = Opencode::Reply.new + # reply.add_observer(Blackline::ReplyStream.new(message:, conversation:)) + # client.stream_events(session_id: id) { |event| reply.apply(event) } + # reply.result + # # => { parts_json:, full_text:, reasoning_text:, tool_parts: } + # + class Reply + STREAMABLE_TYPES = %w[text reasoning tool].freeze + TERMINAL_TOOL_STATUSES = %w[completed error].freeze + TODO_TOOLS = %w[todowrite todoread].freeze + + # The denormalized output of a Reply once streaming completes (or + # recovery via Reply.distill produces an equivalent shape). Symmetric + # with Opencode::Turn::Result. Accessible by both message-style + # (`result.full_text`) and hash-style (`result[:full_text]`) syntax + # — Struct supports both natively — but the typed shape stops + # callers from poking arbitrary keys. + Result = Struct.new(:parts_json, :full_text, :reasoning_text, :tool_parts, keyword_init: true) + + attr_reader :parts, :info, :total_cost, :total_input_tokens, :total_output_tokens, :prompts + + def initialize + @parts = [] + @part_index_by_id = {} + @part_type_by_id = {} + @observers = [] + @first_text_seen = false + @info = nil + @total_cost = 0.0 + @total_input_tokens = 0 + @total_output_tokens = 0 + @todo_part_index = nil + @prompts = Opencode::Prompts.new + # Keyed by [message_id, call_id]: question.asked payloads that + # arrived before their matching tool part. Drained when the tool + # part shows up in apply_tool_state. + @pending_question_payloads = {} + end + + # True while any interactive prompt (question or permission) is + # awaiting a user reply. Opencode::Client uses this to suspend the + # SSE inactivity deadline — a wait on the human is healthy, not a + # hang. + def prompt_blocked? + @prompts.prompt_blocked? + end + + def add_observer(observer) + @observers << observer + self + end + + # Drive the state machine forward with one SSE event. Unknown event + # types are ignored — OpenCode may add new events, and we shouldn't + # crash on them. + def apply(event) + case event[:type] + when "message.part.delta" then apply_part_delta(event) + when "message.part.updated" then apply_part_updated(event) + when "message.updated" then apply_message_updated(event) + when "session.status" then apply_session_status(event) + when "session.error" then apply_session_error(event) + when "todo.updated" then apply_todo_updated(event) + when "question.asked" then apply_question_asked(event) + when "question.replied" then apply_question_replied(event) + when "question.rejected" then apply_question_rejected(event) + when "permission.asked" then apply_permission_asked(event) + when "permission.replied" then apply_permission_replied(event) + end + end + + # Treat `recovered_parts` as a clean-slate baseline: replace parts, + # clear the id→index map (recovered parts have no OpenCode part IDs), + # and reset the running cost/token totals plus the first-text flag. + # + # Why reset totals: step-finish events that produced the pre-crash + # totals are not in the recovery payload; keeping them would + # double-count when post-recovery step-finish events accumulate + # against the same counters. + # + # Used only by the recovery path — during normal streaming, parts + # accrete via apply_* helpers and totals flow through step-finish. + def replace_parts(recovered_parts) + @parts = recovered_parts + @part_index_by_id.clear + @part_type_by_id.clear + @total_cost = 0.0 + @total_input_tokens = 0 + @total_output_tokens = 0 + @first_text_seen = false + end + + # Bring the live reply up to a recovered/polled exchange snapshot and + # notify observers for new or changed parts. This is the streaming + # counterpart to replace_parts: when the SSE connection ends before + # OpenCode's multi-message tool loop has produced final text, Turn polls + # the message exchange. Those recovered parts still need to hit Turbo as + # incremental append/update events, not only the final row replacement. + def sync_recovered_parts(recovered_parts) + Array(recovered_parts).each_with_index do |part, index| + next if @parts[index] == part + + part = deep_dup_part(part) + if index < @parts.length + @parts[index] = part + notify_recovered_part_updated(part, index) + else + @parts << part + notify(:part_added, part: part, index: index) + notify_recovered_part_updated(part, index) + end + + @first_text_seen ||= part["type"] == "text" && part["content"].present? + end + end + + # Record a part that originated OUTSIDE the OpenCode event stream — + # used when an observer synthesizes a part (e.g., a session error + # notice) that isn't a real message.part.* event but should still + # appear in the persisted parts_json. Returns the new index. + # + # Does NOT fire part_added — the injecting observer has already done + # whatever rendering it needed. Other observers can poll `parts` if + # they care about injected content. + def inject_part(part_hash) + @parts << part_hash + @parts.size - 1 + end + + def first_text_seen? + @first_text_seen + end + + def tool_count + @parts.count { |p| p["type"] == "tool" } + end + + # The denormalized result once streaming completes, matching the + # shape jobs persist to the message table: full_text for :content, + # reasoning_text for :reasoning, tool_parts for :tool_calls_json, + # and parts_json for :parts_json. + def result + self.class.distill(@parts) + end + + # Pure function: given a parts array, return the denormalized result + # as an Opencode::Reply::Result value object. Exposed so a recovery + # path (fetch messages from the session API and map them through + # ResponseParser.extract_interleaved_parts) produces the same shape + # as live streaming. + def self.distill(parts) + Result.new( + parts_json: parts, + full_text: join_content(parts, "text"), + reasoning_text: join_content(parts, "reasoning"), + tool_parts: parts.select { |p| p["type"] == "tool" && TERMINAL_TOOL_STATUSES.include?(p["status"]) } + ) + end + + def self.join_content(parts, type) + parts.select { |p| p["type"] == type }.map { |p| p["content"].to_s }.join("\n\n") + end + private_class_method :join_content + + private + + def apply_part_delta(event) + field = event.dig(:properties, :field) + return unless %w[text reasoning].include?(field) + + part_id = event.dig(:properties, :partID) + delta = event.dig(:properties, :delta).to_s + return if delta.empty? + + index = @part_index_by_id[part_id] + if index.nil? + # Delta before part.updated. Pre-1.2 OpenCode streams occasionally + # emit in this order; downstream part.updated for this id will + # reconcile via reconcile_final_content. + type = @part_type_by_id[part_id] || (field == "reasoning" ? "reasoning" : "text") + index = append_part({ "type" => type, "content" => +"" }, part_id: part_id) + end + + @parts[index]["content"] << delta + @first_text_seen ||= (field == "text" && @parts[index]["type"] == "text") + + notify(:part_changed, part: @parts[index], index: index, delta: delta) + end + + def apply_part_updated(event) + part = event.dig(:properties, :part) || {} + part_id = part[:id] + part_type = part[:type] + + case part_type + when "step-finish" + cost = part[:cost].to_f + tokens = part[:tokens] || {} + @total_cost += cost + @total_input_tokens += tokens[:input].to_i + @total_output_tokens += tokens[:output].to_i + notify(:step_finished, cost: cost, tokens: tokens) + when "text", "reasoning" + @part_type_by_id[part_id] = part_type if part_id + if @part_index_by_id.key?(part_id) + reconcile_final_content(part_id, part) + elsif part[:text].present? + # Extreme tail-drop path: part.updated carries the full text + # but no deltas ever arrived. Materialize it as a one-shot part + # so the content isn't lost. + append_part({ "type" => part_type, "content" => part[:text].dup }, part_id: part_id) + end + when "tool" + register_tool(part_id, part) unless @part_index_by_id.key?(part_id) + apply_tool_state(part_id, part) + end + end + + def apply_message_updated(event) + info = event.dig(:properties, :info) + return unless info.is_a?(Hash) + + @info = info + notify(:message_updated, info: info) + end + + def apply_session_status(event) + case event.dig(:properties, :status, :type) + when "retry" + notify(:session_retried, + attempt: event.dig(:properties, :status, :attempt), + message: event.dig(:properties, :status, :message).to_s) + end + end + + def apply_session_error(event) + error = event.dig(:properties, :error) || {} + name = error[:name].to_s + message = error.dig(:data, :message).to_s + text = [ name, message ].reject(&:blank?).join(": ") + + notify(:session_errored, text: text, raw: error) + end + + # Close out a text/reasoning part: always fires :part_finalized so + # observers can flush any throttled broadcast, and rewrites content if + # part.updated carries an authoritative :text that diverges from the + # deltas we accumulated (tail-drop safety net for providers like + # z.ai GLM-5.1 that sometimes drop trailing deltas). + def reconcile_final_content(part_id, part) + index = @part_index_by_id[part_id] + final = part[:text] + return if final.blank? + + @parts[index]["content"] = final.dup unless @parts[index]["content"] == final + notify(:part_finalized, part: @parts[index], index: index) + end + + def register_tool(part_id, part) + append_part({ + "type" => "tool", + "tool" => part[:tool], + "status" => part.dig(:state, :status) + }, part_id: part_id) + end + + # Merge an incoming `message.part.updated` event state into the + # existing tool record. Delegates the field-by-field shape to + # Opencode::ToolPart so the streaming and recovery paths share one + # canonical definition of what a tool part looks like. + def apply_tool_state(part_id, part) + index = @part_index_by_id[part_id] + return unless index + + record = @parts[index] + Opencode::ToolPart.merge_streaming_state(record, part) + @todo_part_index = index if todo_tool_part?(record) + + notify(:tool_progressed, + part: record, + index: index, + status: record["status"], + raw: part) + + drain_pending_question_payload(record) + end + + def apply_todo_updated(event) + todos = event.dig(:properties, :todos) || [] + notify(:todos_changed, todos: todos) + return unless todos.is_a?(Array) + + canonical_todos = Opencode::Todo.canonicalize_all(todos) + + index = current_todo_part_index + if index + refresh_existing_todo_part(index, canonical_todos, event) + else + @todo_part_index = append_part(Opencode::PartSource.stamp({ + "type" => "tool", + "tool" => "todowrite", + "status" => "completed", + "input" => { "todos" => canonical_todos } + }, source: Opencode::PartSource::TODO_UPDATED)) + end + end + + # Refresh path for an existing todo part — either a real `todowrite` + # tool part materialized from message.part.updated, OR our own + # previously-stamped stream-only part. Either way we MERGE into + # `input` rather than replace it, so any non-todos fields a real + # tool call carried survive the refresh. + # + # We intentionally do NOT touch `part["title"]`. Upstream opencode's + # title is "N remaining todos" (a progress indicator like "2 todos" + # when 2 of 3 are still incomplete, "0 todos" when all done) and is + # set on the original message.part.updated event. Stomping it with + # our own value would clobber that semantic. + def refresh_existing_todo_part(index, canonical_todos, event) + part = @parts[index] + part["status"] = part["status"].presence || "completed" + part["input"] = (part["input"] || {}).merge("todos" => canonical_todos) + notify(:tool_progressed, part: part, index: index, status: part["status"], raw: event) + end + + def current_todo_part_index + return @todo_part_index if @todo_part_index && todo_tool_part?(@parts[@todo_part_index]) + + @todo_part_index = @parts.rindex { |part| todo_tool_part?(part) } + end + + def todo_tool_part?(part) + part.is_a?(Hash) && part["type"] == "tool" && TODO_TOOLS.include?(part["tool"].to_s) + end + + def deep_dup_part(part) + case part + when Hash + part.transform_values { |value| deep_dup_part(value) } + when Array + part.map { |value| deep_dup_part(value) } + else + part.duplicable? ? part.dup : part + end + end + + def notify_recovered_part_updated(part, index) + case part["type"] + when "tool" + notify(:tool_progressed, part: part, index: index, status: part["status"], raw: {}) + when "text", "reasoning" + notify(:part_finalized, part: part, index: index) + end + end + + def append_part(part_hash, part_id: nil) + @parts << part_hash + index = @parts.size - 1 + if part_id + @part_index_by_id[part_id] = index + @part_type_by_id[part_id] = part_hash["type"] + end + notify(:part_added, part: @parts[index], index: index) + index + end + + def notify(callback, **payload) + @observers.each do |observer| + observer.public_send(callback, **payload) if observer.respond_to?(callback) + end + end + + # --- interactive prompts ----------------------------------------- + + def apply_question_asked(event) + request = (event[:properties] || {}).dup + return unless request[:id].is_a?(String) + + @prompts.record_question(request) + + if (tool = request[:tool]) + @pending_question_payloads[[ tool[:messageID].to_s, tool[:callID].to_s ]] = request + end + + merge_pending_question_into_existing_tool_part(request) + + notify(:question_asked, request: request, raw: event) + end + + def apply_question_replied(event) + props = event[:properties] || {} + request_id = props[:requestID] + answers = props[:answers] || [] + return unless request_id + + asked_at = @prompts.asked_at(request_id) + @prompts.resolve(request_id) + notify(:question_replied, request_id: request_id, answers: answers, raw: event, asked_at: asked_at) + end + + def apply_question_rejected(event) + props = event[:properties] || {} + request_id = props[:requestID] + return unless request_id + + asked_at = @prompts.asked_at(request_id) + @prompts.resolve(request_id) + notify(:question_rejected, request_id: request_id, raw: event, asked_at: asked_at) + end + + def apply_permission_asked(event) + request = (event[:properties] || {}).dup + return unless request[:id].is_a?(String) + + @prompts.record_permission(request) + notify(:permission_asked, request: request, raw: event) + end + + def apply_permission_replied(event) + props = event[:properties] || {} + request_id = props[:requestID] + return unless request_id + + asked_at = @prompts.asked_at(request_id) + @prompts.resolve(request_id) + notify(:permission_replied, + request_id: request_id, + reply: props[:reply], + raw: event, + asked_at: asked_at) + end + + # Merge a pending question payload into the matching tool part if + # the tool part exists. Reads record["callID"] / record["messageID"] + # which are persisted by ToolPart.merge_streaming_state (per Task 2.0). + # Decorates the part's "input" with both the question content AND the + # opencode identifiers the view + controller need. + # + # Called from two paths: + # 1. apply_question_asked, when the tool part already exists + # 2. apply_tool_state, when the tool part arrives AFTER question.asked + def merge_pending_question_into_existing_tool_part(request) + tool = request[:tool] + return unless tool + + call_id = tool[:callID].to_s + message_id = tool[:messageID].to_s + return if call_id.empty? + + index = @parts.index do |part| + part.is_a?(Hash) && part["type"] == "tool" && part["tool"] == "question" && + part["callID"] == call_id + end + return unless index + + part = @parts[index] + # Stringify keys so the in-memory shape matches what's persisted + # via the parts_json JSON column round-trip. Otherwise direct-render + # callers (e.g., integration tests, future debug tooling) hit + # symbol-keyed nested hashes while the partials read string keys — + # silent broken HTML. + input = (part["input"] || {}).merge( + "questions" => deep_stringify_keys(request[:questions]), + "opencode_request_id" => request[:id], + "opencode_message_id" => message_id, + "opencode_call_id" => call_id + ) + part["input"] = input + + notify(:tool_progressed, part: part, index: index, status: part["status"], + raw: { type: "question.asked.synthesized" }) + end + + # Order-race fix: if question.asked arrived before this tool part, + # its payload is parked in @pending_question_payloads keyed by + # {messageID, callID}. Drain it now so the part's input carries + # the questions + opencode_* identifiers the view expects. + def drain_pending_question_payload(record) + return unless record["tool"] == "question" && record["callID"].present? + + key = [ record["messageID"].to_s, record["callID"].to_s ] + pending = @pending_question_payloads.delete(key) + merge_pending_question_into_existing_tool_part(pending) if pending + end + + # Recursively converts hash keys to strings — used at the SSE/JSON + # boundary so in-memory parts match the shape they have after a + # parts_json (JSON column) round-trip. Same semantics as Rails' + # Hash#deep_stringify_keys but iterates arrays too. + def deep_stringify_keys(obj) + case obj + when Hash then obj.each_with_object({}) { |(k, v), h| h[k.to_s] = deep_stringify_keys(v) } + when Array then obj.map { |x| deep_stringify_keys(x) } + else obj + end + end + end +end diff --git a/lib/opencode/reply_observer.rb b/lib/opencode/reply_observer.rb new file mode 100644 index 0000000..1668ebd --- /dev/null +++ b/lib/opencode/reply_observer.rb @@ -0,0 +1,101 @@ +# frozen_string_literal: true + +module Opencode + # The canonical observer protocol for Opencode::Reply — every event + # Reply dispatches, documented in one place, with safe no-op defaults. + # + # Include this module in a reply-stream class to get two things: + # + # 1. **Compile-time checklist.** Override only the callbacks you care + # about; the rest inherit a no-op. Forgetting to handle a new event + # never crashes the stream. + # 2. **Protocol documentation that can't rot.** The signatures here are + # the contract. If Reply's dispatch shape ever drifts, every observer + # using this module updates in lockstep. + # + # Callbacks are duck-typed in Reply — features may choose not to + # include this module and implement the methods directly, but then + # they lose the two benefits above. + # + # Every callback takes keyword arguments, so adding a new keyword later + # only requires existing observers to add `**_` if they want to opt out + # of breakage. + module ReplyObserver + # A new part was appended to the reply's parts list. + def part_added(part:, index:) + end + + # An existing part's content grew by a delta (streaming text or + # reasoning). + def part_changed(part:, index:, delta:) + end + + # An existing part's content was rewritten to the authoritative + # value from part.updated. Fires unconditionally when a part closes + # so throttled observers can flush, regardless of whether content + # actually diverged from what deltas accumulated. + def part_finalized(part:, index:) + end + + # A tool part transitioned status (pending → running → completed/error), + # or its state payload (title/input/error) changed. + def tool_progressed(part:, index:, status:, raw:) + end + + # A step boundary with usage info. `tokens` is the raw tokens hash + # from the step-finish part (keys: :input, :output, :reasoning, :cache). + def step_finished(cost:, tokens:) + end + + # The upstream session is retrying an LLM call (e.g., provider + # rate-limit backoff). Attempt is nullable; message is a short + # reason string. + def session_retried(attempt:, message:) + end + + # A session-level error surfaced. Text is a human-readable summary + # ("ErrorName: details"); raw is the full error hash. + def session_errored(text:, raw:) + end + + # The authoritative message.info was updated (cost, tokens, provider + # error metadata). Fires late in the stream after the agent closes. + def message_updated(info:) + end + + # Agent's internal todo list changed. Todos are whatever shape the + # agent's task tool uses. + def todos_changed(todos:) + end + + # opencode emitted a question.asked event — the agent's `question` + # tool is suspended waiting for the user's reply. `request` is the + # full QuestionRequest hash ({id, sessionID, questions, tool?}). + def question_asked(request:, raw:) + end + + # opencode emitted a question.replied event — the user submitted + # answers (Array>, one inner array per question). + # `asked_at` is the monotonic clock value when question.asked was + # observed, for latency telemetry; nil if asked never arrived. + def question_replied(request_id:, answers:, raw:, asked_at:) + end + + # opencode emitted a question.rejected event — the user dismissed + # the prompt, or it was cancelled (e.g., container shutdown). + def question_rejected(request_id:, raw:, asked_at:) + end + + # opencode emitted a permission.asked event — a tool is requesting + # user permission to proceed. `request` is the PermissionRequest + # hash ({id, sessionID, permission, patterns, metadata, always, tool?}). + def permission_asked(request:, raw:) + end + + # opencode emitted a permission.replied event — the user chose + # once/always/reject. `reply` is the string. `asked_at` per + # question_replied semantics. + def permission_replied(request_id:, reply:, raw:, asked_at:) + end + end +end diff --git a/lib/opencode/response_parser.rb b/lib/opencode/response_parser.rb new file mode 100644 index 0000000..a4b9646 --- /dev/null +++ b/lib/opencode/response_parser.rb @@ -0,0 +1,170 @@ +# frozen_string_literal: true + +module Opencode + module ResponseParser + def self.extract_text(response_body) + parts = response_body[:parts] || [] + parts + .select { |p| p[:type] == "text" } + .map { |p| p[:text] } + .join("\n\n") + end + + def self.extract_reasoning(response_body) + parts = response_body[:parts] || [] + reasoning = parts + .select { |p| p[:type] == "reasoning" } + .map { |p| p[:text] } + .join("\n\n") + reasoning.presence + end + + TERMINAL_STATUSES = %w[completed error].freeze + + # Terminal-only tool list. Returned as canonical string-keyed hashes + # (same shape `extract_interleaved_parts` returns) so callers do not + # have to know which path produced the data. + def self.extract_tool_summary(response_body) + parts = response_body[:parts] || [] + parts + .select { |p| p[:type] == "tool" && p.dig(:state, :status).in?(TERMINAL_STATUSES) } + .map { |p| build_tool_summary(p) } + end + + def self.extract_interleaved_parts(response_body) + parts = response_body[:parts] || [] + + parts.filter_map do |part| + case part[:type] + when "text" + { "type" => "text", "content" => part[:text] } + when "reasoning" + { "type" => "reasoning", "content" => part[:text] } + when "tool" + status = part.dig(:state, :status) + next unless status.in?(TERMINAL_STATUSES) + + build_tool_summary(part) + else + nil + end + end + end + + # Canonical tool-part shape from one OpenCode message part. Delegates + # to Opencode::ToolPart so the streaming path (Reply#apply_tool_state) + # and recovery path (this method) cannot drift. + def self.build_tool_summary(part) + Opencode::ToolPart.from_message_part(part) + end + + private_class_method :build_tool_summary + + def self.extract_tokens(response_body) + response_body.dig(:info, :tokens) + end + + def self.extract_cost(response_body) + response_body.dig(:info, :cost) + end + + def self.extract_cache_tokens(response_body) + tokens = response_body.dig(:info, :tokens) || {} + { + cache_read: tokens.dig(:cache, :read) || 0, + cache_write: tokens.dig(:cache, :write) || 0 + } + end + + def self.extract_error(response_body) + error = response_body.dig(:info, :error) + return nil unless error.is_a?(Hash) + + { + name: error[:name], + message: error.dig(:data, :message), + status_code: error.dig(:data, :statusCode), + retryable: error.dig(:data, :isRetryable), + url: error.dig(:data, :metadata, :url) + }.compact + end + + MAX_ARTIFACT_SIZE = 10.megabytes + ARTIFACT_TOOLS = %w[write apply_patch].freeze + + def self.extract_artifact_files(response_body) + parts = response_body[:parts] || [] + completed_tools = parts.select do |p| + p[:type] == "tool" && + ARTIFACT_TOOLS.include?(p[:tool]) && + p.dig(:state, :status) == "completed" + end + return [] if completed_tools.empty? + + files = completed_tools.flat_map { |part| extract_files_from_tool_part(part) } + files.uniq { |f| f[:filename] } + end + + def self.extract_artifacts_from_messages(messages) + return [] unless messages.is_a?(Array) + + messages + .select { |m| m.dig(:info, :role) == "assistant" } + .flat_map { |m| extract_artifact_files(m) } + .uniq { |f| f[:filename] } + end + + def self.extract_files_from_tool_part(part) + case part[:tool] + when "write" + extract_from_write(part) + when "apply_patch" + extract_from_apply_patch(part) + else + [] + end + end + + def self.extract_from_write(part) + content = part.dig(:state, :input, :content) + file_path = part.dig(:state, :input, :filePath) + return [] if content.blank? || file_path.blank? + return [] if content.bytesize > MAX_ARTIFACT_SIZE + + filename = File.basename(file_path) + content_type = Marcel::MimeType.for(extension: File.extname(filename)) + [ { filename: filename, content: content, content_type: content_type } ] + end + + # apply_patch tool metadata shape changed materially between the early + # opencode versions this code originally targeted (which exposed + # `before` + `after` post-write file content as inline strings) and + # v1.4.0+ (which dropped them and only exposes the diff text in `patch` + # plus a `files` array of { filePath, relativePath, type, patch, + # additions, deletions, movePath? } descriptors). Source of truth: + # https://raw.githubusercontent.com/anomalyco/opencode/v1.15.0/packages/opencode/src/tool/apply_patch.ts + # + # With no `after` field in the v1.15.0 wire shape, this method previously + # silently returned [] for every real apply_patch invocation while still + # passing its (now-stale-shape) unit test — the worst kind of bug: a + # green test paired with a dead production path. + # + # Current behavior (intentional, until apply_patch becomes a hot path): + # we accept the v1.15.0 shape and return []. None of the active agents + # (travel-agent, employment-lawyer, contract-lawyer, raven-legal, + # better-*) use apply_patch — they write whole files via the `write` + # tool — so the practical-impact-today is zero. When the first + # apply_patch-using agent ships, Opencode::Exchange#tool_artifacts + # emits `opencode.apply_patch.artifacts_dropped` so operators see the + # silent drop and route through the missing sandbox-read path. + # + # The event emission lives on Exchange (not here) because ResponseParser + # is a pure module — every other method takes a hash and returns a hash. + # Pure functions stay pure. + def self.extract_from_apply_patch(_part) + [] + end + + private_class_method :extract_files_from_tool_part, :extract_from_write, :extract_from_apply_patch + end +end diff --git a/lib/opencode/todo.rb b/lib/opencode/todo.rb new file mode 100644 index 0000000..7cb92c2 --- /dev/null +++ b/lib/opencode/todo.rb @@ -0,0 +1,43 @@ +# frozen_string_literal: true + +module Opencode + # One todo item the OpenCode `todowrite` tool and `todo.updated` bus + # event carry: `content` + `status` + (optional) `priority`. + # Source-of-truth canonicalization lives here so Reply, ToolDisplay, + # and any future consumer all share one definition of "what does this + # todo look like once we've normalized it." + # + # Status canonicalization: OpenCode bus events have been observed + # emitting the hyphenated `"in-progress"` form. The rest of the + # codebase (per-product views, todowrite tool input shape per the + # v1.15+ openapi spec) uses the underscored `"in_progress"`. + # Canonicalize to underscore at every entry point so downstream code + # never has to handle both. + module Todo + HYPHENATED_TO_CANONICAL_STATUS = { + "in-progress" => "in_progress" + }.freeze + + module_function + + def canonical_status(status) + raw = status.to_s + HYPHENATED_TO_CANONICAL_STATUS.fetch(raw) { raw.tr("-", "_") } + end + + # Canonicalize one todo hash: string-keyed, normalized status. + # Returns the input unchanged when it isn't a Hash (the substrate + # tolerates wire-shape drift defensively). + def canonicalize(todo) + return todo unless todo.is_a?(Hash) + + result = todo.deep_stringify_keys + result["status"] = canonical_status(result["status"]) if result.key?("status") + result + end + + def canonicalize_all(todos) + Array(todos).map { |t| canonicalize(t) } + end + end +end diff --git a/lib/opencode/tool_part.rb b/lib/opencode/tool_part.rb new file mode 100644 index 0000000..b2c4f3b --- /dev/null +++ b/lib/opencode/tool_part.rb @@ -0,0 +1,152 @@ +# frozen_string_literal: true + +module Opencode + # Canonical shape of a tool part in an assistant reply. + # + # A tool part starts `pending` and transitions through `running` to a + # terminal `completed` or `error`. The complete representation carries + # seven fields, all string-keyed so views read consistent keys whether + # the part came from a live streaming event or a post-stream message + # poll: + # + # "type" => "tool" + # "tool" => "edit" + # "status" => "completed" + # "title" => "Edited /INDEX.md" + # "input" => { ... } # full args the agent passed, deep-stringified + # "metadata" => { ... } # tool-specific output: diff, preview, stdout, etc. + # "output" => "Edited successfully." + # "error" => "..." # only when status == "error", truncated to 200 chars + # + # The shape is produced two ways: + # + # 1. Opencode::Reply#apply_tool_state — live, mid-stream, merging + # incoming event state into an in-memory record (previous values + # survive when the new event omits a field). + # + # 2. Opencode::ResponseParser.build_tool_summary — post-stream, built + # fresh from a complete OpenCode message returned by + # /session/:id/message during recovery / final-exchange polling. + # + # Existence reason: the two paths used to drift. ResponseParser stripped + # `metadata` and whitelisted `input` to a fixed key list, so `parts_json` + # saved on finalize had strictly less data than the streaming DOM had + # shown. The visible symptom was "I saw the diff while streaming and it + # disappeared when the turn finished". This class is the single source of + # truth that prevents that drift. + module ToolPart + MAX_ERROR_LEN = 200 + INVALID_TOOL = "invalid" + + module_function + + # Build a fresh canonical tool-part hash from one OpenCode message + # part (the shape that arrives through /session/:id/message). + # Used by ResponseParser for recovery and final-exchange polling. + def from_message_part(part) + state = state_of(part) + build_canonical( + tool: part[:tool] || part["tool"], + status: state_value(state, :status), + title: state_value(state, :title), + input: state_value(state, :input), + metadata: state_value(state, :metadata), + output: state_value(state, :output), + error: state_value(state, :error) + ) + end + + # Merge an incoming `message.part.updated` event state into an + # existing record. Used by Reply#apply_tool_state during streaming. + # + # Fields the event omits (or that arrive empty) leave the record's + # previous value intact. Mid-tool events are partial by design. + # + # In addition to the canonical render fields (status, title, input, + # metadata, output, error), this also persists `callID` and + # `messageID` from the incoming state. Those identifiers are needed + # by downstream lookups (e.g. matching an ask-user reply event back + # to the originating tool part by callID) and would otherwise be + # silently dropped on the way into Reply.parts JSON. + # + # Returns the (mutated) record for chaining. + def merge_streaming_state(record, part) + state = state_of(part) + + tool = part[:tool] || part["tool"] + # Preserve original tool name if OpenCode later renames to "invalid" + # mid-session — we want to keep rendering the original name. + record["tool"] = tool if tool.present? && tool != INVALID_TOOL + + status = state_value(state, :status) + record["status"] = status if status + + title = state_value(state, :title) + record["title"] = title if title.present? + + input = state_value(state, :input) + record["input"] = stringify_deep(input) if input.present? + + metadata = state_value(state, :metadata) + record["metadata"] = stringify_deep(metadata) if metadata.present? + + output = state_value(state, :output) + record["output"] = output if output.present? + + error = state_value(state, :error) + record["error"] = error.to_s.truncate(MAX_ERROR_LEN) if error.present? + + # callID and messageID moved from state.* to the part's top level + # somewhere in opencode v1.15.x. Read top-level first, fall back + # to state.* for any older versions that may still be in flight. + # Without this, merge_pending_question_into_existing_tool_part + # (which searches @parts by callID) silently no-ops, and the + # question form renders with no questions or routing IDs. + call_id = part[:callID] || part["callID"] || state_value(state, :callID) + record["callID"] = call_id if call_id.present? + + message_id = part[:messageID] || part["messageID"] || state_value(state, :messageID) + record["messageID"] = message_id if message_id.present? + + record + end + + class << self + private + + def state_of(part) + part[:state] || part["state"] || {} + end + + def state_value(state, key) + return nil unless state.is_a?(Hash) + state[key] || state[key.to_s] + end + + def build_canonical(tool:, status:, title:, input:, metadata:, output:, error:) + hash = { + "type" => "tool", + "tool" => tool.to_s.presence, + "status" => status, + "title" => title.presence, + "input" => stringify_deep(input).presence, + "metadata" => stringify_deep(metadata).presence, + "output" => output.presence + } + hash["error"] = error.to_s.truncate(MAX_ERROR_LEN).presence if status == "error" + hash.compact + end + + def stringify_deep(value) + case value + when Hash + value.each_with_object({}) { |(k, v), h| h[k.to_s] = stringify_deep(v) } + when Array + value.map { |v| stringify_deep(v) } + else + value + end + end + end + end +end diff --git a/lib/opencode/tracer.rb b/lib/opencode/tracer.rb new file mode 100644 index 0000000..7f1d3ae --- /dev/null +++ b/lib/opencode/tracer.rb @@ -0,0 +1,51 @@ +# frozen_string_literal: true + +module Opencode + # A namespacing trace emitter. + # + # Opencode::Turn emits unprefixed event names like "response.started" + # and "session.recreated". The host product wraps Turn in a Tracer + # whose job is to prepend a product prefix and forward to whatever + # actually emits trace events (typically the host job's + # `EventTraceable#trace_event`). + # + # Two responsibilities live here, and only here: + # + # 1. Callable interface: `tracer.call(name, **payload)` — the + # contract Turn relies on. + # 2. Namespacing strategy: prepend "." to every event name. + # + # Pre-extraction this lived in a closure at every Turn-construction + # site: + # + # tracer: ->(name, **payload) { trace_event("blackline.#{name}", **payload) } + # + # That closure conflates the two responsibilities; every product had + # to rediscover the prefix-with-period rule, and a typo would only + # show up in production trace data. Making it a real role removes + # that risk and makes the rule visible at one place. + # + # Usage: + # + # Opencode::Tracer.new(prefix: "blackline", emitter: self) + # + # `emitter` must respond to `trace_event(name, **payload)`. + class Tracer + def initialize(prefix:, emitter:) + @prefix = prefix + @emitter = emitter + end + + # Tracer is callable so existing call sites that treated the tracer + # as a lambda (`tracer.call(name, **payload)`) keep working without + # change. Turn uses this exclusively. + # + # Uses `send` because EventTraceable's `trace_event` is a private + # method of the including class — the convention is "private inside + # the job, but the substrate's Tracer is allowed to dispatch to it + # the same way the job's own perform method would." + def call(name, **payload) + @emitter.send(:trace_event, "#{@prefix}.#{name}", **payload) + end + end +end diff --git a/lib/opencode/version.rb b/lib/opencode/version.rb new file mode 100644 index 0000000..c09bd31 --- /dev/null +++ b/lib/opencode/version.rb @@ -0,0 +1,5 @@ +# frozen_string_literal: true + +module Opencode + VERSION = "0.0.1.alpha1" +end diff --git a/opencode-ruby.gemspec b/opencode-ruby.gemspec new file mode 100644 index 0000000..0560213 --- /dev/null +++ b/opencode-ruby.gemspec @@ -0,0 +1,44 @@ +# frozen_string_literal: true + +require_relative "lib/opencode/version" + +Gem::Specification.new do |spec| + spec.name = "opencode-ruby" + spec.version = Opencode::VERSION + spec.authors = ["Ajay Krishnan"] + spec.email = ["ajay@krishnan.ca"] + + spec.summary = "Idiomatic Ruby client for OpenCode (HTTP + SSE)." + spec.description = <<~DESC + Hand-rolled, opinionated Ruby SDK for OpenCode's REST + SSE API. + Block-form streaming, value-object responses, automatic SSE + reconnection. Complement to opencode_client (auto-generated from + OpenAPI) — pick this one if you want a small Ruby-idiomatic surface; + pick opencode_client if you want every endpoint with generated types. + DESC + spec.homepage = "https://gitea.krishnan.ca/ajaynomics/opencode-ruby" + spec.license = "MIT" + spec.required_ruby_version = ">= 3.2.0" + + spec.metadata["source_code_uri"] = spec.homepage + spec.metadata["changelog_uri"] = "#{spec.homepage}/src/branch/main/CHANGELOG.md" + spec.metadata["bug_tracker_uri"] = "#{spec.homepage}/issues" + + spec.files = Dir.glob("lib/**/*.rb") + + Dir.glob("examples/**/*.rb") + + %w[README.md LICENSE CHANGELOG.md opencode-ruby.gemspec] + spec.require_paths = ["lib"] + + # The only runtime dependency is ActiveSupport (NOT Rails). ActiveSupport + # is a standalone gem providing the `present?`/`blank?`/`presence`/ + # `truncate`/`duplicable?` helpers used in this gem's code. It does NOT + # pull in ActiveRecord, ActionView, ActionController, Turbo, or any other + # Rails-only piece. Most Ruby apps in the wild already have ActiveSupport + # transitively via another gem; in the rare case yours doesn't, ~250 LOC + # of core_ext is added when this gem installs. + spec.add_runtime_dependency "activesupport", ">= 6.1", "< 9.0" + + spec.add_development_dependency "minitest", "~> 5.20" + spec.add_development_dependency "rake", "~> 13.0" + spec.add_development_dependency "webmock", "~> 3.20" +end diff --git a/test/opencode/smoke_test.rb b/test/opencode/smoke_test.rb new file mode 100644 index 0000000..562ce6d --- /dev/null +++ b/test/opencode/smoke_test.rb @@ -0,0 +1,169 @@ +# frozen_string_literal: true + +require "test_helper" + +# End-to-end smoke test of the gem's public surface. Validates that the +# headline `client.stream(...)` API + Reply::Result + error model + the +# pluggable Instrumentation adapter all work against a fully mocked +# OpenCode server. This is the test we'd point at in the README to +# prove the postcard works. +class SmokeTest < Minitest::Test + BASE = "http://opencode.test" + PASSWORD = "test-secret" + SESSION_ID = "ses_smoke_1" + + def setup + @client = Opencode::Client.new( + base_url: BASE, + password: PASSWORD, + timeout: 5 + ) + Opencode::Instrumentation.adapter = nil + end + + def test_VERSION_is_a_string + assert_kind_of String, Opencode::VERSION + assert_match(/\A\d+\.\d+\.\d+/, Opencode::VERSION) + end + + def test_constants_are_loaded + assert_equal "Opencode::Client", Opencode::Client.name + assert_equal "Opencode::Reply", Opencode::Reply.name + assert_equal "Opencode::Reply::Result", Opencode::Reply::Result.name + assert_equal "Opencode::Error", Opencode::Error.name + assert Opencode::ConnectionError < Opencode::Error + end + + def test_health_endpoint_round_trip + stub_request(:get, "#{BASE}/global/health") + .to_return(status: 200, body: { healthy: true, version: "1.15.5" }.to_json, + headers: { "Content-Type" => "application/json" }) + + response = @client.health + assert_equal true, response[:healthy] + assert_equal "1.15.5", response[:version] + end + + def test_create_session_returns_session_id + stub_request(:post, "#{BASE}/session") + .to_return(status: 200, body: { id: SESSION_ID, title: "smoke" }.to_json, + headers: { "Content-Type" => "application/json" }) + + response = @client.create_session(title: "smoke", permissions: []) + assert_equal SESSION_ID, response[:id] + end + + def test_send_message_async_returns_empty_body + stub_request(:post, "#{BASE}/session/#{SESSION_ID}/prompt_async") + .to_return(status: 204, body: "") + + response = @client.send_message_async(SESSION_ID, "ping") + assert_equal({}, response) + end + + def test_stream_returns_typed_Reply_Result_with_full_text + stub_request(:post, "#{BASE}/session/#{SESSION_ID}/prompt_async") + .to_return(status: 204, body: "") + + sse = [ + { type: "message.part.delta", + properties: { sessionID: SESSION_ID, partID: "p1", field: "text", delta: "hello " } }, + { type: "message.part.delta", + properties: { sessionID: SESSION_ID, partID: "p1", field: "text", delta: "world" } }, + { type: "session.idle", properties: { sessionID: SESSION_ID } } + ].map { |e| "data: #{e.to_json}\n\n" }.join + + stub_request(:get, %r{#{Regexp.escape(BASE)}/event(\?.*)?\z}) + .to_return(status: 200, body: sse, + headers: { "Content-Type" => "text/event-stream" }) + + stub_request(:get, "#{BASE}/session/#{SESSION_ID}/message") + .to_return(status: 200, body: [].to_json, + headers: { "Content-Type" => "application/json" }) + + parts_yielded = [] + reply = @client.stream(SESSION_ID, "ping") do |part| + parts_yielded << part.dup + end + + assert_kind_of Opencode::Reply::Result, reply + assert_equal "hello world", reply.full_text + # Struct value object supports both message and hash style access. + assert_equal "hello world", reply[:full_text] + refute_empty parts_yielded + end + + def test_stream_block_is_optional + stub_request(:post, "#{BASE}/session/#{SESSION_ID}/prompt_async") + .to_return(status: 204, body: "") + + sse = [ + { type: "message.part.delta", + properties: { sessionID: SESSION_ID, partID: "p1", field: "text", delta: "ack" } }, + { type: "session.idle", properties: { sessionID: SESSION_ID } } + ].map { |e| "data: #{e.to_json}\n\n" }.join + + stub_request(:get, %r{#{Regexp.escape(BASE)}/event(\?.*)?\z}) + .to_return(status: 200, body: sse, + headers: { "Content-Type" => "text/event-stream" }) + + stub_request(:get, "#{BASE}/session/#{SESSION_ID}/message") + .to_return(status: 200, body: [].to_json, + headers: { "Content-Type" => "application/json" }) + + reply = @client.stream(SESSION_ID, "ping") + assert_equal "ack", reply.full_text + end + + def test_connection_refused_raises_ConnectionError + stub_request(:get, "http://opencode.dead/global/health") + .to_raise(Errno::ECONNREFUSED) + + bad = Opencode::Client.new(base_url: "http://opencode.dead", timeout: 1) + assert_raises(Opencode::ConnectionError) { bad.health } + end + + def test_404_on_session_endpoint_raises_SessionNotFoundError + stub_request(:get, "#{BASE}/session/missing/message") + .to_return(status: 404, body: { error: "not found" }.to_json, + headers: { "Content-Type" => "application/json" }) + + assert_raises(Opencode::SessionNotFoundError) do + @client.get_messages("missing") + end + end + + def test_instrumentation_adapter_receives_request_events + events = [] + Opencode::Instrumentation.adapter = ->(name, payload, &block) { + events << [ name, payload ] + block.call + } + + stub_request(:get, "#{BASE}/global/health") + .to_return(status: 200, body: "{}", + headers: { "Content-Type" => "application/json" }) + + @client.health + assert events.any? { |name, _| name == "opencode.request" }, + "instrumentation adapter must receive opencode.request events" + end + + def test_Reply_distill_returns_typed_Result + parts = [ + { "type" => "text", "content" => "hi" }, + { "type" => "text", "content" => "there" }, + { "type" => "tool", "tool" => "read", "status" => "completed" } + ] + result = Opencode::Reply.distill(parts) + + assert_kind_of Opencode::Reply::Result, result + assert_equal "hi\n\nthere", result.full_text + assert_equal 1, result.tool_parts.size + end + + def test_Instrumentation_no_op_default_yields_block_value + Opencode::Instrumentation.adapter = nil + assert_equal 42, Opencode::Instrumentation.instrument("x") { 42 } + end +end diff --git a/test/test_helper.rb b/test/test_helper.rb new file mode 100644 index 0000000..272f31a --- /dev/null +++ b/test/test_helper.rb @@ -0,0 +1,10 @@ +# frozen_string_literal: true + +$LOAD_PATH.unshift File.expand_path("../lib", __dir__) + +require "opencode-ruby" +require "minitest/autorun" +require "webmock/minitest" + +# Tests run against WebMock-stubbed endpoints; never hit the network. +WebMock.disable_net_connect!