v0.7.1: Streaming + Markdown + URLs + Interrupt — TDD
Some checks failed
Deploy (Gitea) / deploy (push) Failing after 2s
Some checks failed
Deploy (Gitea) / deploy (push) Failing after 2s
Stream-chunk protocol: SSE streaming via provider-openai-stream, cascade-stream with fboundp guard in think(). TUI renders live. Stream interrupt: Esc during streaming marks [interrupted], finalizes msg. SSE cancel infrastructure: *stream-cancel* check in read loop. Markdown inline: **bold**, *italic*, `code` via parse-markdown-spans. Code blocks: parse-markdown-blocks + syntax-highlight (keywords/strings/fns). URL detection + Tab-to-activate: https:// URLs in dim, Tab opens. Watchdog: 30s stall detection via Dexador read-timeout. [streaming] indicator in status bar. Pre-existing TUI test fixes (7): first→aref, nil→zerop, add-msg arg. Core: 65/65 Neuro: 13/13 TUI View: 22/22 TUI Main: 65/65 Total: 165 tests, 0 failures.
This commit is contained in:
@@ -31,6 +31,18 @@ Providers register themselves at boot. No API key? That provider doesn't registe
|
||||
when the LLM returns a tool call, or the existing ~:content~ path otherwise.
|
||||
4. (provider-cascade-initialize): reads ~PROVIDER_CASCADE~ from env and
|
||||
sets ~*provider-cascade*~.
|
||||
5. (provider-openai-stream prompt system-prompt callback &key model provider tools):
|
||||
v0.7.1 — executes a streaming OpenAI-compatible /v1/chat/completions
|
||||
request. Sends ~"stream": true~ in the request body. Reads Server-Sent
|
||||
Events (SSE) from the response stream, parsing ~data: ...~ lines. For
|
||||
each delta with content, calls CALLBACK with the delta string. After
|
||||
all deltas, calls CALLBACK with ~""~ to signal end-of-stream. Returns
|
||||
~(:status :success)~ on completion or ~(:status :error :message ...)~.
|
||||
If ~*stream-cancel*~ is set to T (by another thread), exits the SSE
|
||||
loop and calls CALLBACK with ~""~.
|
||||
6. (parse-sse-line line): parses an SSE line. Returns the data content
|
||||
for ~data: <content>~ lines, ~:done~ for ~data: [DONE]~, and ~nil~
|
||||
for comment lines (starting with ~:~), empty lines, or non-data lines.
|
||||
|
||||
* Implementation
|
||||
|
||||
@@ -202,6 +214,142 @@ If API-KEY is nil, reads from environment."
|
||||
:trigger (lambda (ctx) (declare (ignore ctx)) nil))
|
||||
#+end_src
|
||||
|
||||
* v0.7.1 — Streaming Backend
|
||||
:PROPERTIES:
|
||||
:ID: id-v071-streaming
|
||||
:CREATED: [2026-05-08 Fri]
|
||||
:END:
|
||||
|
||||
** SSE Parser
|
||||
|
||||
*** RED
|
||||
#+begin_example
|
||||
test-parse-sse-line-data: 0/2 pass — stub returns nil instead of content
|
||||
test-parse-sse-line-done: 0/1 pass — stub returns nil instead of :done
|
||||
test-parse-sse-line-nil: 3/3 pass — stub correctly returns nil
|
||||
#+end_example
|
||||
|
||||
*** GREEN
|
||||
#+begin_example
|
||||
test-parse-sse-line-data: 2/2 pass (100%)
|
||||
test-parse-sse-line-done: 1/1 pass (100%)
|
||||
test-parse-sse-line-nil: 3/3 pass (100%)
|
||||
test-provider-openai-stream-calls-callback: 3/3 pass (100%)
|
||||
llm-gateway-suite: 13/13 pass (100%)
|
||||
#+end_example
|
||||
|
||||
** Cascade Stream
|
||||
#+begin_src lisp
|
||||
(defun cascade-stream (prompt system-prompt callback)
|
||||
"Streaming cascade: calls provider-openai-stream on the first available backend.
|
||||
Calls CALLBACK with each delta string, then with '' to signal end-of-stream."
|
||||
(dolist (backend *provider-cascade*)
|
||||
(when (gethash backend *probabilistic-backends*)
|
||||
(let ((result (provider-openai-stream prompt system-prompt callback
|
||||
:provider backend)))
|
||||
(when (eq (getf result :status) :success)
|
||||
(return cascade-stream))))))
|
||||
#+end_src
|
||||
#+begin_src lisp
|
||||
(in-package :passepartout)
|
||||
|
||||
(defun parse-sse-line (line)
|
||||
"Parse an SSE line. Returns data string, :done for [DONE], nil otherwise."
|
||||
(cond
|
||||
((or (null line) (string= line "")) nil)
|
||||
((char= (char line 0) #\:) nil)
|
||||
((and (>= (length line) 6) (string-equal (subseq line 0 6) "data: "))
|
||||
(let ((content (subseq line 6)))
|
||||
(if (string= content "[DONE]")
|
||||
:done
|
||||
content)))
|
||||
(t nil)))
|
||||
#+end_src
|
||||
|
||||
** Streaming request
|
||||
#+begin_src lisp
|
||||
(defvar *stream-cancel* nil
|
||||
"When T, the streaming SSE loop exits early.")
|
||||
|
||||
(defun provider-openai-stream (prompt system-prompt callback &key model (provider :openrouter) tools)
|
||||
"Streaming OpenAI-compatible request. Calls CALLBACK with each delta, then ''."
|
||||
(let* ((config (provider-config provider))
|
||||
(base-url (getf config :base-url))
|
||||
(key-env (getf config :key-env))
|
||||
(url-env (getf config :url-env))
|
||||
(default-model (getf config :default-model))
|
||||
(api-key (when key-env (uiop:getenv key-env)))
|
||||
(model-id (or model default-model))
|
||||
(url (if url-env
|
||||
(let ((host (uiop:getenv url-env)))
|
||||
(if host
|
||||
(format nil "http://~a/v1/chat/completions" host)
|
||||
(format nil "~a/chat/completions" base-url)))
|
||||
(format nil "~a/chat/completions" base-url)))
|
||||
(timeout (or (ignore-errors (parse-integer (uiop:getenv "LLM_REQUEST_TIMEOUT"))) 30))
|
||||
(req-headers (list (cons "Content-Type" "application/json")))
|
||||
(base `((model . ,model-id)
|
||||
(messages . (( (role . "system") (content . ,system-prompt) )
|
||||
( (role . "user") (content . ,prompt) )))
|
||||
(stream . t))))
|
||||
(when api-key
|
||||
(push (cons "Authorization" (format nil "Bearer ~a" api-key)) req-headers))
|
||||
(when (eq provider :openrouter)
|
||||
(setf req-headers
|
||||
(append req-headers
|
||||
`(("HTTP-Referer" . "https://github.com/amrgharbeia/passepartout")
|
||||
("X-Title" . "Passepartout")))))
|
||||
(let ((body (if tools
|
||||
(append base
|
||||
`((tools . ,(loop for tool in tools
|
||||
collect (list (cons :|type| "function")
|
||||
(cons :|function|
|
||||
(loop for (k v) on tool by #'cddr
|
||||
collect (cons (intern (string-upcase (string k)) "KEYWORD") v))))))
|
||||
(:|tool_choice| . "auto")))
|
||||
base)))
|
||||
(handler-case
|
||||
(let* ((body-json (cl-json:encode-json-to-string body))
|
||||
(stall-seconds 30)
|
||||
(s (dex:post url :headers req-headers :content body-json
|
||||
:connect-timeout (min 5 timeout)
|
||||
:read-timeout stall-seconds
|
||||
:want-stream t)))
|
||||
;; v0.7.1: track stall timer — reset on each successful chunk
|
||||
(let ((last-chunk-time (get-universal-time)))
|
||||
(loop for raw = (handler-case (read-line s nil nil)
|
||||
(error (c)
|
||||
(declare (ignore c))
|
||||
nil))
|
||||
while raw
|
||||
do (when *stream-cancel* ; v0.7.1: cancel check
|
||||
(setf *stream-cancel* nil)
|
||||
(funcall callback " [cancelled]")
|
||||
(return))
|
||||
(let ((parsed (parse-sse-line raw)))
|
||||
(cond
|
||||
((null parsed))
|
||||
((eq parsed :done) (return))
|
||||
(t (handler-case
|
||||
(let* ((json (cl-json:decode-json-from-string parsed))
|
||||
(choices (cdr (assoc :choices json)))
|
||||
(choice (car choices))
|
||||
(delta (cdr (assoc :delta choice)))
|
||||
(content (cdr (assoc :content delta))))
|
||||
(when content
|
||||
(funcall callback content)
|
||||
(setf last-chunk-time (get-universal-time))))
|
||||
(error ())))))
|
||||
(when (> (- (get-universal-time) last-chunk-time) stall-seconds)
|
||||
(funcall callback "[Response stalled — timed out at 30s]")
|
||||
(return))))
|
||||
(funcall callback "")
|
||||
(close s)
|
||||
(list :status :success))
|
||||
(error (c)
|
||||
(list :status :error :message (format nil "~a Stream Failure: ~a" provider c)))))))
|
||||
#+end_src
|
||||
|
||||
* Test Suite
|
||||
#+begin_src lisp
|
||||
(eval-when (:compile-toplevel :load-toplevel :execute)
|
||||
@@ -231,4 +379,32 @@ If API-KEY is nil, reads from environment."
|
||||
"Contract 4: provider-openai-request accepts :tools parameter without error."
|
||||
(let ((result (provider-openai-request "test" "system" :tools (list))))
|
||||
(fiveam:is (member (getf result :status) '(:success :error)))))
|
||||
|
||||
;; ── v0.7.1 Streaming ──
|
||||
|
||||
(fiveam:test test-parse-sse-line-data
|
||||
"Contract 6: parse-sse-line extracts content from data: lines."
|
||||
(fiveam:is (string= "hello world" (passepartout::parse-sse-line "data: hello world")))
|
||||
(fiveam:is (string= "{\"a\":1}" (passepartout::parse-sse-line "data: {\"a\":1}"))))
|
||||
|
||||
(fiveam:test test-parse-sse-line-done
|
||||
"Contract 6: parse-sse-line returns :done for [DONE]."
|
||||
(fiveam:is (eq :done (passepartout::parse-sse-line "data: [DONE]"))))
|
||||
|
||||
(fiveam:test test-parse-sse-line-nil
|
||||
"Contract 6: parse-sse-line returns nil for comment, empty, non-data lines."
|
||||
(fiveam:is (null (passepartout::parse-sse-line "")))
|
||||
(fiveam:is (null (passepartout::parse-sse-line ":ok")))
|
||||
(fiveam:is (null (passepartout::parse-sse-line "event: ping"))))
|
||||
|
||||
(fiveam:test test-provider-openai-stream-calls-callback
|
||||
"Contract 5: provider-openai-stream calls callback with deltas and final empty string."
|
||||
(let ((collected '()))
|
||||
(flet ((collector (text) (push text collected)))
|
||||
(passepartout::provider-openai-stream "hi" "sys" #'collector :provider :openrouter))
|
||||
(let* ((reversed (nreverse collected))
|
||||
(last (car (last reversed))))
|
||||
(fiveam:is (stringp last))
|
||||
(fiveam:is (string= "" last))
|
||||
(fiveam:is (>= (length reversed) 2)))))
|
||||
#+end_src
|
||||
|
||||
Reference in New Issue
Block a user