Files
opencode-ruby/lib/opencode/client.rb
Ajay Krishnan 889d38332f
Some checks failed
Test / test (3.2) (push) Failing after 9m43s
Test / test (3.3) (push) Failing after 9m43s
Test / test (3.4) (push) Failing after 9m42s
Initial public release v0.0.1.alpha2
opencode-ruby — idiomatic Ruby client for OpenCode (HTTP + SSE).

Hand-rolled, opinionated Ruby SDK with block-form streaming, value-
object responses, and automatic SSE reconnection. Pluggable
Opencode::Instrumentation adapter for routing events to
ActiveSupport::Notifications, OpenTelemetry, stdout, or any custom
emitter. Companion to opencode-rails for AR-coupled Rails apps.

What this version ships:
  - Opencode::Client (Net::HTTP + SSE)
  - Opencode::Reply / Reply::Result / ReplyObserver
  - Opencode::Tracer, Opencode::Prompts
  - Opencode::ResponseParser, ToolPart, PartSource, Todo
  - Opencode::Instrumentation (instrument + notify)
  - Opencode::Error and seven subclasses
  - examples/conversation_recipe.rb — canonical Rails wiring blueprint

15 smoke tests. CI on Ruby 3.2/3.3/3.4.

Ruby >= 3.2. Runtime dep: activesupport >= 6.1, < 9.0.

See CHANGELOG.md for the alpha1 -> alpha2 delta.
2026-05-20 21:41:30 -07:00

565 lines
20 KiB
Ruby

# frozen_string_literal: true
require "net/http"
require "json"
require "base64"
module Opencode
# HTTP client for OpenCode REST API.
# Thread safety: Each instance creates its own Net::HTTP connection.
# Do NOT share instances across threads. Create per-job.
class Client
attr_reader :directory
def initialize(
base_url: ENV["OPENCODE_BASE_URL"] || "http://localhost:4096",
password: ENV["OPENCODE_SERVER_PASSWORD"],
timeout: (ENV["OPENCODE_TIMEOUT"] || 120).to_i,
directory: nil,
workspace: nil
)
@uri = URI.parse(base_url)
@password = password
@timeout = timeout || 120
@directory = directory
@workspace = workspace
end
def create_session(title: nil, permissions: nil)
body = { title: title, permission: permissions }.compact
post("/session", body)
end
def send_message(
session_id, text,
parts: nil,
model: nil,
agent: nil,
system: nil,
message_id: nil,
no_reply: nil,
tools: nil,
format: nil,
variant: nil
)
body = prompt_payload(
text,
parts: parts,
model: model,
agent: agent,
system: system,
message_id: message_id,
no_reply: no_reply,
tools: tools,
format: format,
variant: variant
)
post("/session/#{session_id}/message", body)
end
def send_message_async(
session_id, text,
parts: nil,
model: nil,
agent: nil,
system: nil,
message_id: nil,
no_reply: nil,
tools: nil,
format: nil,
variant: nil
)
body = prompt_payload(
text,
parts: parts,
model: model,
agent: agent,
system: system,
message_id: message_id,
no_reply: no_reply,
tools: tools,
format: format,
variant: variant
)
post("/session/#{session_id}/prompt_async", body)
end
# Block-form streaming — the headline API for callers who want the
# full async-prompt + SSE-loop + final-exchange-merge flow in one
# call. Returns the final Opencode::Reply::Result value object once
# the agent finishes.
#
# reply = client.stream(session_id, "Explain monads") do |part|
# print part["content"] if part["type"] == "text"
# end
# reply.full_text # => the final accumulated text
# reply.tool_parts # => array of terminal tool parts
#
# The block is invoked every time a part is added, grows, finalizes,
# or (for tool parts) advances state — i.e., whenever a user-visible
# change happens. The block receives the current `part` hash (string
# keys: "type", "content", "tool", "status", "input", ...).
#
# If you need raw events (every server.* tick, todo.updated, prompt
# asked/replied, etc.), use #stream_events instead.
#
# Optional kwargs are forwarded to send_message_async — model, agent,
# system prompt override, and the SSE pacing knobs supported by
# stream_events.
def stream(
session_id, text,
model: nil, agent: nil, system: nil, message_id: nil,
stream_timeout: 600,
first_event_timeout: 120,
idle_stream_timeout: nil,
on_activity_tick: nil,
&block
)
send_message_async(
session_id, text,
model: model, agent: agent, system: system, message_id: message_id
)
reply = Opencode::Reply.new
reply.add_observer(StreamBlockObserver.new(&block)) if block_given?
stream_events(
session_id: session_id,
timeout: stream_timeout,
first_event_timeout: first_event_timeout,
idle_stream_timeout: idle_stream_timeout,
reply: reply,
on_activity_tick: on_activity_tick
) do |event|
reply.apply(event)
end
merge_final_exchange(session_id, reply)
reply.result
end
def list_sessions
uri = build_uri("/session")
request = Net::HTTP::Get.new(uri)
execute(request)
end
def children(session_id)
uri = build_uri("/session/#{session_id}/children")
request = Net::HTTP::Get.new(uri)
execute(request)
end
def delete_session(session_id)
uri = build_uri("/session/#{session_id}")
request = Net::HTTP::Delete.new(uri)
execute(request)
end
def session_status
uri = build_uri("/session/status")
request = Net::HTTP::Get.new(uri)
execute(request)
end
def get_messages(session_id)
uri = build_uri("/session/#{session_id}/message")
request = Net::HTTP::Get.new(uri)
execute(request)
end
def abort_session(session_id)
post("/session/#{session_id}/abort", {})
end
def reply_question(request_id:, answers:)
post("/question/#{request_id}/reply", { answers: answers })
end
def reject_question(request_id:)
post("/question/#{request_id}/reject", {})
end
def reply_permission(request_id:, reply:, message: nil)
body = { reply: reply }
body[:message] = message if message.present?
post("/permission/#{request_id}/reply", body)
end
# Returns pending question requests as an Array of Hashes with
# SYMBOL keys, consistent with every other endpoint that flows
# through handle_response (e.g., health, list_sessions, get_messages).
# Callers that compare against persisted JSON column data should
# symbolize their side, not desymbolize this side.
def list_questions
uri = build_uri("/question")
request = Net::HTTP::Get.new(uri)
add_auth_header(request)
response = Opencode::Instrumentation.instrument("opencode.request", method: request.method, path: request.path) do
http_client.request(request)
end
unless response.code.to_i.between?(200, 299)
raise ServerError, "list_questions failed: HTTP #{response.code}#{response.body.to_s[0, 200]}"
end
return [] if response.body.blank?
JSON.parse(response.body, symbolize_names: true)
rescue JSON::ParserError => e
raise ServerError, "list_questions returned invalid JSON: #{e.message}"
rescue Net::OpenTimeout, Net::ReadTimeout, Net::WriteTimeout => e
raise TimeoutError, "OpenCode timeout after #{@timeout}s: #{e.message}"
rescue Errno::ECONNREFUSED, SocketError => e
raise ConnectionError, "OpenCode unreachable: #{e.message}"
end
def health
uri = build_uri("/global/health", scoped: false)
request = Net::HTTP::Get.new(uri)
execute(request)
end
MAX_SSE_BUFFER = 1_048_576 # 1 MB — safety valve against pathological server responses
SSE_RECONNECT_DELAY = 0.1
TRANSIENT_SSE_ERRORS = [
EOFError,
IOError,
Net::OpenTimeout,
Net::ReadTimeout,
Errno::ECONNREFUSED,
Errno::ECONNRESET,
Errno::EPIPE
].freeze
# Opens SSE connection to GET /event, yields parsed events filtered by session_id.
# Blocks until session goes idle or timeout, reconnecting across dropped
# event-stream connections.
#
# first_event_timeout: seconds to wait for a session-specific event before
# declaring the session stale. Server heartbeats don't count — they're global
# keep-alives that flow regardless of session state.
#
# Default 120s rather than the more aggressive 30s used originally:
# slow-thinking reasoning models (Kimi K2, GPT-5 with extended thinking,
# etc.) routinely spend 30-90s of pure reasoning before emitting their
# first `message.part.*` event, especially on cold sessions with long
# system prompts. 30s false-positive trips on legitimate first turns
# and converts them to `StaleSessionError`; 120s catches genuine zombies
# without nuking real reasoning. Callers that know their agent is
# short-prompt + fast can pass a lower value.
#
# idle_stream_timeout: seconds to wait BETWEEN meaningful events once
# the session has started producing them. Default nil = no check
# (preserves the overall `timeout` ceiling behavior). Opt-in heartbeat
# watchdog for callers whose user-facing surface needs to fail fast
# rather than sit forever when an upstream LLM stream wedges mid-turn.
# Distinct from first_event_timeout (which only protects cold-start)
# and from the overall `timeout` ceiling of 600s (which is forgiving
# — a hung stream holding a thread for 10 minutes is already a bad
# UX). When the window is exceeded the call raises
# Opencode::IdleStreamError, which the caller is expected to catch and
# translate into a user-visible error / retry affordance.
def stream_events(session_id:, timeout: 600, first_event_timeout: 120,
idle_stream_timeout: nil,
reply: nil, on_activity_tick: nil, &block)
uri = build_uri("/event")
deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
first_event_deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + first_event_timeout
received_session_event = false
last_meaningful_event_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
loop do
now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
deadline = check_deadline_or_suspend(now, deadline, timeout, reply)
# NOTE: first_event_deadline is *not* suspension-eligible. If the agent
# never gets started we want to fail fast — a session that's blocked on
# a prompt has, by definition, already produced events.
if !received_session_event && now > first_event_deadline
raise StaleSessionError, "No events for session #{session_id} within #{first_event_timeout}s"
end
if idle_stream_timeout && received_session_event &&
(now - last_meaningful_event_at) > idle_stream_timeout
raise IdleStreamError,
"No meaningful events for session #{session_id} within #{idle_stream_timeout}s " \
"(SSE heartbeats still arriving — upstream likely wedged mid-turn)"
end
request = Net::HTTP::Get.new(uri)
request["Accept"] = "text/event-stream"
request["Cache-Control"] = "no-cache"
add_auth_header(request)
http = Net::HTTP.new(@uri.host, @uri.port)
http.use_ssl = @uri.scheme == "https"
http.open_timeout = 10
http.read_timeout = 30
begin
buffer = String.new
http.request(request) do |response|
unless response.is_a?(Net::HTTPSuccess)
raise ServerError, "SSE connection failed: HTTP #{response.code}"
end
response.read_body do |chunk|
now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
deadline = check_deadline_or_suspend(now, deadline, timeout, reply)
if !received_session_event && now > first_event_deadline
raise StaleSessionError, "No events for session #{session_id} within #{first_event_timeout}s"
end
if idle_stream_timeout && received_session_event &&
(now - last_meaningful_event_at) > idle_stream_timeout
raise IdleStreamError,
"No meaningful events for session #{session_id} within #{idle_stream_timeout}s " \
"(SSE heartbeats still arriving — upstream likely wedged mid-turn)"
end
buffer << chunk
if buffer.bytesize > MAX_SSE_BUFFER
raise ServerError, "SSE buffer exceeded #{MAX_SSE_BUFFER} bytes"
end
while (idx = buffer.index("\n\n"))
raw_event = buffer.slice!(0, idx + 2)
event = parse_sse_event(raw_event, session_id)
next unless event
unless event[:type]&.start_with?("server.")
received_session_event = true
last_meaningful_event_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
# Tick activity on EVERY event, including server.heartbeat —
# that's the whole point: a healthy long wait (user thinking
# for 30 minutes) keeps the container warm via heartbeats so
# the reaper doesn't kill it mid-wait.
on_activity_tick&.call(event)
block.call(event)
return if event[:type] == "session.idle"
end
end
end
rescue *TRANSIENT_SSE_ERRORS
# Treat transport-level SSE disconnects like clean EOF: reconnect
# until session.idle, the overall timeout, or first-event timeout.
ensure
begin
http&.finish if http&.started?
rescue IOError
# Connection already closed — network partition or server shutdown
end
end
cutoff = received_session_event ? deadline : first_event_deadline
sleep_for = [ SSE_RECONNECT_DELAY, cutoff - Process.clock_gettime(Process::CLOCK_MONOTONIC) ].min
if sleep_for.positive?
sleep sleep_for
end
end
end
def close
@http&.finish if @http&.started?
rescue IOError
# already closed
end
private
# Best-effort merge of the polled message exchange into the live
# reply. Catches the stream-only / poll-only asymmetry — todo.updated
# is poll-only on some opencode versions; pure-streaming would miss
# the terminal todo state otherwise. If the session API is also down
# at this point (network partition, container teardown mid-call), we
# silently keep whatever the stream accumulated rather than raising;
# the caller's reply is still a usable Result either way.
def merge_final_exchange(session_id, reply)
exchange = get_messages(session_id)
last_assistant = Array(exchange).reverse_each.find do |message|
message.dig(:info, :role) == "assistant"
end
return unless last_assistant
polled = Opencode::ResponseParser.extract_interleaved_parts(last_assistant)
reply.sync_recovered_parts(polled) if polled.any?
rescue Opencode::Error
# Stream's result is still complete; the merge was a polish, not a
# requirement.
end
# Healthy wait: opencode is suspended on a question/permission deferred
# and heartbeats are keeping the connection alive. Reset the deadline
# to "from now" so the full stuck-stream protection is restored once
# the prompt resolves. Otherwise apply the normal deadline check.
def check_deadline_or_suspend(now, deadline, timeout, reply)
return now + timeout if reply&.prompt_blocked?
raise TimeoutError, "SSE stream timed out after #{timeout}s" if now > deadline
deadline
end
def prompt_payload(text, parts:, model:, agent:, system:, message_id:, no_reply:, tools:, format:, variant:)
message_parts = parts || [ { type: "text", text: text } ]
{
messageID: message_id,
parts: message_parts,
model: format_model(model),
agent: agent,
noReply: no_reply,
tools: tools,
format: format,
system: system,
variant: variant
}.compact
end
def format_model(model)
return nil unless model
return model if model.is_a?(Hash)
provider, model_id = model.split("/", 2)
{ providerID: provider, modelID: model_id }
end
def post(path, body)
uri = build_uri(path)
request = Net::HTTP::Post.new(uri)
request.body = body.to_json
execute(request)
end
def build_uri(path, scoped: true)
uri = @uri.dup
uri.path = path
if scoped
query = URI.decode_www_form(uri.query.to_s)
query << [ "directory", @directory ] if @directory.present?
query << [ "workspace", @workspace ] if @workspace.present?
uri.query = query.any? ? URI.encode_www_form(query) : nil
end
uri
end
def add_auth_header(request)
request["Content-Type"] = "application/json"
if @password.present?
request["Authorization"] = "Basic #{Base64.strict_encode64("opencode:#{@password}")}"
end
end
def execute(request)
add_auth_header(request)
response = nil
result = Opencode::Instrumentation.instrument("opencode.request", method: request.method, path: request.path) do
response = http_client.request(request)
handle_response(response)
end
result
rescue Net::OpenTimeout, Net::ReadTimeout, Net::WriteTimeout => e
raise TimeoutError, "OpenCode timeout after #{@timeout}s: #{e.message}"
rescue Errno::ECONNREFUSED, SocketError => e
raise ConnectionError, "OpenCode unreachable: #{e.message}"
end
def http_client
@http ||= Net::HTTP.new(@uri.host, @uri.port).tap do |http|
http.use_ssl = @uri.scheme == "https"
http.open_timeout = 10
http.read_timeout = @timeout
http.write_timeout = 30
end
end
def parse_sse_event(raw, session_id)
data_line = raw.lines.find { |l| l.start_with?("data: ") }
return nil unless data_line
json = JSON.parse(data_line.sub("data: ", "").strip, symbolize_names: true)
event_session = json.dig(:properties, :sessionID) ||
json.dig(:properties, :info, :sessionID) ||
json.dig(:properties, :part, :sessionID)
return json if json[:type] == "server.heartbeat"
return json if json[:type] == "server.connected"
return nil unless event_session == session_id
json
rescue JSON::ParserError
nil
end
def handle_response(response)
return {} if response.code.to_i == 204
body = if response.body.present?
JSON.parse(response.body, symbolize_names: true)
else
{}
end
case response.code.to_i
when 200..299 then body
when 400 then raise BadRequestError.new(error_message(body, "Bad request"), response: body)
when 404 then raise SessionNotFoundError.new(error_message(body, "Session not found"), response: body)
when 500..599 then raise ServerError.new(error_message(body, "Server error"), response: body)
else raise Error.new("Unexpected response: #{response.code}", response: body)
end
rescue JSON::ParserError
raise ServerError.new("Invalid JSON from OpenCode (HTTP #{response.code}): #{response.body&.truncate(200)}")
end
# OpenCode HTTP error bodies use a wrapped shape: { name:, data: { message:, kind?: } }.
# v1.14.51 stopped exposing internal defect details from the HTTP API, so
# `body[:message]` is no longer populated for errors — only `body[:data][:message]`.
# We read both to keep older mock servers working in tests.
def error_message(body, fallback)
body.dig(:data, :message) || body[:message] || fallback
end
end
# Internal Reply observer that bridges Reply's multi-callback protocol
# to a single user-supplied block for Client#stream. Each part-level
# callback (part_added, part_changed, part_finalized, tool_progressed)
# forwards the current part to the user's block.
#
# Non-part-level callbacks (step_finished, session_*, message_updated,
# todos_changed, question_*, permission_*) are intentionally NOT
# forwarded — they're either telemetry the gem owns internally, or
# interactive-protocol concerns that callers route through
# #stream_events directly when they need them.
class StreamBlockObserver
include Opencode::ReplyObserver
def initialize(&block)
@block = block
end
def part_added(part:, **)
@block.call(part)
end
def part_changed(part:, **)
@block.call(part)
end
def part_finalized(part:, **)
@block.call(part)
end
def tool_progressed(part:, **)
@block.call(part)
end
end
end