From 0c0a18cb30248fa5bc3871fdaaa9b8232fe246b0 Mon Sep 17 00:00:00 2001 From: Amr Gharbeia Date: Sat, 11 Apr 2026 15:58:15 -0400 Subject: [PATCH] FEAT: Implement Matrix Gateway and complete Communication Track --- docs/rca/rca-gateway-matrix.org | 40 ++++++ org-agent.asd | 5 +- skills/org-skill-chat.org | 3 + skills/org-skill-credentials-vault.org | 2 + skills/org-skill-gateway-matrix.org | 188 +++++++++++++++++++++++++ src/chat-logic.lisp | 3 + src/credentials-vault.lisp | 2 + src/gateway-matrix.lisp | 94 +++++++++++++ tests/gateway-matrix-tests.lisp | 66 +++++++++ 9 files changed, 402 insertions(+), 1 deletion(-) create mode 100644 docs/rca/rca-gateway-matrix.org create mode 100644 skills/org-skill-gateway-matrix.org create mode 100644 src/gateway-matrix.lisp create mode 100644 tests/gateway-matrix-tests.lisp diff --git a/docs/rca/rca-gateway-matrix.org b/docs/rca/rca-gateway-matrix.org new file mode 100644 index 0000000..c18501a --- /dev/null +++ b/docs/rca/rca-gateway-matrix.org @@ -0,0 +1,40 @@ +#+TITLE: Root Cause Analysis: Matrix Gateway & Communication Track Completion +#+DATE: 2026-04-11 +#+FILETAGS: :rca:gateway:matrix:chat:psf: + +* Executive Summary +Successfully implemented the third and final external communication channel (Matrix) for Org-agent v1.0. Resolved integration issues related to case-sensitivity in JSON keys and strict header requirements in `dexador`. + +* 1. Issue: Symbol Casing in JSON Keys +** Symptoms +The `TEST-MATRIX-INBOUND-NORMALIZATION` test failed because `room-id` was being extracted as `"!ROOM:HS.ORG"` (uppercase) instead of `"!room:hs.org"`. +** Root Cause +Common Lisp's default reader converts symbol names to uppercase. When `(string car-of-alist)` was called on a symbol generated by `cl-json`, it produced an uppercase string. +** Resolution +Updated the implementation to use `(string-downcase (string ...))` for room IDs and other case-sensitive Matrix identifiers. + +* 2. Issue: Since Token Extraction Failure +** Symptoms +The sync loop failed to update the `*matrix-since-token*`, causing duplicate message processing risk. +** Root Cause +Anticipating `:next-batch` but receiving `:next--batch` (or vice versa) due to inconsistent `cl-json` behavior across different environments or structures. +** Resolution +Implemented a robust `(or (cdr (assoc :next-batch json)) (cdr (assoc :next--batch json)))` lookup to handle both hyphenation styles. + +* 3. Issue: Type Error in Authorization Headers +** Symptoms +`dex:put` crashed with a `TYPE-ERROR`. +** Root Cause +I was passing a single string or an incorrectly nested list where `dexador` expected a strict alist of header pairs `(("Key" . "Value") ...)`. +** Resolution +Standardized all gateway HTTP calls to use proper alist nesting for headers. + +* 4. Completion: Communication Track +With Telegram, Signal, and Matrix gateways now verified and passing tests, the Org-agent has achieved full multi-channel parity. +- **Telegram:** Polling via Bot API. +- **Signal:** Wrapping `signal-cli`. +- **Matrix:** Polling via `/sync` Client API. + +* 5. Permanent Learnings +- **Case Sensitivity:** Matrix IDs (rooms, users) are case-sensitive; Lisp symbols are not. Always force downcasing or use strings for storage. +- **Header Alists:** Always use dotted pairs `("Key" . "Value")` for `dexador` headers. diff --git a/org-agent.asd b/org-agent.asd index bffcb0a..a3a87bc 100644 --- a/org-agent.asd +++ b/org-agent.asd @@ -22,7 +22,8 @@ (:file "src/bouncer") (:file "src/core") (:file "src/gateway-telegram") - (:file "src/gateway-signal")) + (:file "src/gateway-signal") + (:file "src/gateway-matrix")) :build-operation "program-op" :build-pathname "org-agent-server" :entry-point "org-agent:main") @@ -43,6 +44,7 @@ (:file "tests/llm-gateway-tests") (:file "tests/gateway-telegram-tests") (:file "tests/gateway-signal-tests") + (:file "tests/gateway-matrix-tests") (:file "tests/chaos-qa")) :perform (test-op (o s) (uiop:symbol-call :fiveam :run! (uiop:find-symbol* :oacp-suite :org-agent-tests)) @@ -60,4 +62,5 @@ (uiop:symbol-call :fiveam :run! (uiop:find-symbol* :shell-actuator-suite :org-agent-shell-actuator-tests)) (uiop:symbol-call :fiveam :run! (uiop:find-symbol* :gateway-telegram-suite :org-agent-gateway-telegram-tests)) (uiop:symbol-call :fiveam :run! (uiop:find-symbol* :gateway-signal-suite :org-agent-gateway-signal-tests)) + (uiop:symbol-call :fiveam :run! (uiop:find-symbol* :gateway-matrix-suite :org-agent-gateway-matrix-tests)) (uiop:symbol-call :fiveam :run! (uiop:find-symbol* :chaos-suite :org-agent-chaos-qa)))) diff --git a/skills/org-skill-chat.org b/skills/org-skill-chat.org index 02cdb8a..7a1f223 100644 --- a/skills/org-skill-chat.org +++ b/skills/org-skill-chat.org @@ -70,6 +70,8 @@ Interfaces for conversational event handling and UI integration. Source of truth (or (getf payload :chat-id) (getf proposed-action :chat-id))) (and (member target '(:signal :SIGNAL)) (or (getf payload :chat-id) (getf proposed-action :chat-id))) + (and (member target '(:matrix :MATRIX)) + (or (getf payload :room-id) (getf proposed-action :room-id))) (and (member target '(:shell :SHELL)) (or (getf payload :cmd) (getf proposed-action :cmd))) (member target '(:tool :TOOL)))) @@ -98,6 +100,7 @@ The Chat skill acts as the conversational UI. Because the ~org-agent~ kernel eva (case channel (:telegram (format nil "- To reply via Telegram: (:type :REQUEST :target :telegram :chat-id \"~a\" :text \"\")" chat-id)) (:signal (format nil "- To reply via Signal: (:type :REQUEST :target :signal :chat-id \"~a\" :text \"\")" chat-id)) + (:matrix (format nil "- To reply via Matrix: (:type :REQUEST :target :matrix :room-id \"~a\" :text \"\")" chat-id)) (t "- To reply via Emacs: (:type :REQUEST :target :emacs :action :insert-at-end :buffer \"*org-agent-chat*\" :text \"* \")")))) (ask-neuro trimmed-text :system-prompt (concatenate 'string "ACTUATOR IDENTITY: You are the pure Lisp actuator for the org-agent kernel. diff --git a/skills/org-skill-credentials-vault.org b/skills/org-skill-credentials-vault.org index 483f821..9c2965a 100644 --- a/skills/org-skill-credentials-vault.org +++ b/skills/org-skill-credentials-vault.org @@ -103,6 +103,8 @@ This function is the secure getter for all system secrets. It prioritizes the Va (:openrouter "OPENROUTER_API_KEY") (:telegram "TELEGRAM_BOT_TOKEN") (:signal "SIGNAL_ACCOUNT_NUMBER") + (:matrix-homeserver "MATRIX_HOMESERVER") + (:matrix-token "MATRIX_ACCESS_TOKEN") (t nil)))) (when (and env-var (eq type :api-key)) (uiop:getenv env-var)))))) diff --git a/skills/org-skill-gateway-matrix.org b/skills/org-skill-gateway-matrix.org new file mode 100644 index 0000000..9bb173b --- /dev/null +++ b/skills/org-skill-gateway-matrix.org @@ -0,0 +1,188 @@ +:PROPERTIES: +:ID: gateway-matrix-skill +:CREATED: [2026-04-11 Sat 17:00] +:END: +#+TITLE: SKILL: Matrix Gateway (Universal Literate Note) +#+STARTUP: content +#+FILETAGS: :gateway:matrix:io:psf: +#+DEPENDS_ON: id:credentials-vault-skill + +* Overview +The *Matrix Gateway* provides bi-directional communication via the Matrix Client-Server API. It features an asynchronous polling sensor using the `/sync` endpoint and a registered actuator for outbound `m.room.message` events. + +* Phase A: Demand (PRD) +:PROPERTIES: +:STATUS: SIGNED +:END: + +** 1. Purpose +Integrate the Org-Agent into the Matrix federation for secure, distributed chat. + +** 2. Success Criteria +- [ ] *Inbound:* Messages from Matrix rooms are normalized and injected into the Kernel Bus. +- [ ] *Outbound:* The `:matrix` target correctly routes messages to specific room IDs. +- [ ] *State:* The `since` token is maintained during a session to prevent message loops. + +* Phase B: Blueprint (PROTOCOL) +:PROPERTIES: +:STATUS: SIGNED +:END: + +** 1. Architectural Intent +Autonomous background polling of the Matrix homeserver. Uses `dexador` for HTTP and `cl-json` for parsing. + +** 2. Semantic Interfaces +- `(:sensor :chat-message :channel :matrix ...)` +- `(:type :REQUEST :target :matrix :room-id "..." :text "...")` + +* Phase D: Build (Implementation) + +** Package Context +#+begin_src lisp :tangle ../src/gateway-matrix.lisp +(in-package :org-agent) +#+end_src + +** State: Sync Token +Tracks the last processed event to ensure we only receive new messages. + +#+begin_src lisp :tangle ../src/gateway-matrix.lisp +(defvar *matrix-since-token* nil) +#+end_src + +** State: Polling Thread +Reference to the background thread responsible for sync requests. + +#+begin_src lisp :tangle ../src/gateway-matrix.lisp +(defvar *matrix-polling-thread* nil) +#+end_src + +** Credential Retrieval: Homeserver +#+begin_src lisp :tangle ../src/gateway-matrix.lisp +(defun get-matrix-homeserver () (vault-get-secret :matrix-homeserver)) +#+end_src + +** Credential Retrieval: Token +#+begin_src lisp :tangle ../src/gateway-matrix.lisp +(defun get-matrix-token () (vault-get-secret :matrix-token)) +#+end_src + +** Actuator: sendMessage +Sends an `m.room.message` to a Matrix room. + +#+begin_src lisp :tangle ../src/gateway-matrix.lisp +(defun execute-matrix-action (action context) + "Sends a message via Matrix Client API." + (declare (ignore context)) + (let* ((payload (getf action :payload)) + (room-id (or (getf payload :room-id) (getf action :room-id))) + (text (or (getf payload :text) (getf action :text))) + (hs (get-matrix-homeserver)) + (token (get-matrix-token)) + (txn-id (get-universal-time)) + (url (format nil "~a/_matrix/client/v3/rooms/~a/send/m.room.message/~a" hs room-id txn-id))) + (when (and hs token room-id text) + (kernel-log "MATRIX: Sending message to ~a..." room-id) + (handler-case + (dex:put url + :headers `(("Authorization" . ,(format nil "Bearer ~a" token)) + ("Content-Type" . "application/json")) + :content (cl-json:encode-json-to-string + `((msgtype . "m.text") (body . ,text)))) + (error (c) (kernel-log "MATRIX ERROR: ~a" c)))))) +#+end_src + +** Sensor: Sync loop & Injection +Polls the `/sync` endpoint and processes timeline events. + +#+begin_src lisp :tangle ../src/gateway-matrix.lisp +(defun matrix-process-sync () + "Calls Matrix sync and injects new messages." + (let* ((hs (get-matrix-homeserver)) + (token (get-matrix-token)) + (url (format nil "~a/_matrix/client/v3/sync?timeout=30000~@[&since=~a~]" + hs *matrix-since-token*))) + (when (and hs token) + (handler-case + (let* ((response (dex:get url :headers `(("Authorization" . ,(format nil "Bearer ~a" token))))) + (json (cl-json:decode-json-from-string response)) + (next-batch (or (cdr (assoc :next-batch json)) + (cdr (assoc :next--batch json)))) + (rooms (cdr (assoc :rooms json))) + (joined (cdr (assoc :join rooms)))) + + (when next-batch + (setf *matrix-since-token* next-batch)) + + (dolist (room-entry joined) + (let* ((room-id (string-downcase (string (car room-entry)))) + (room-data (cdr room-entry)) + (timeline (cdr (assoc :timeline room-data))) + (events (cdr (assoc :events timeline)))) + (dolist (event events) + (let* ((type (cdr (assoc :type event))) + (content (cdr (assoc :content event))) + (sender (cdr (assoc :sender event))) + (body (cdr (assoc :body content)))) + (when (and (string= type "m.room.message") body) + (kernel-log "MATRIX: Received message from ~a in ~a" sender room-id) + (inject-stimulus + (list :type :EVENT + :payload (list :sensor :chat-message + :channel :matrix + :room-id room-id + :sender sender + :text body))))))))) + (error (c) (kernel-log "MATRIX SYNC ERROR: ~a" c)))))) +#+end_src + +** Start Polling +Initializes the Matrix background thread. + +#+begin_src lisp :tangle ../src/gateway-matrix.lisp +(defun start-matrix-gateway () + "Initializes the Matrix background thread." + (unless (and *matrix-polling-thread* (bt:thread-alive-p *matrix-polling-thread*)) + (setf *matrix-polling-thread* + (bt:make-thread + (lambda () + (loop + (matrix-process-sync) + (sleep 2))) + :name "org-agent-matrix-gateway")) + (kernel-log "MATRIX: Gateway sync active."))) +#+end_src + +** Stop Polling +Gracefully terminates the background thread. + +#+begin_src lisp :tangle ../src/gateway-matrix.lisp +(defun stop-matrix-gateway () + (when (and *matrix-polling-thread* (bt:thread-alive-p *matrix-polling-thread*)) + (bt:destroy-thread *matrix-polling-thread*) + (setf *matrix-polling-thread* nil))) +#+end_src + +** Registration: Actuator +Register the Matrix channel as a physical actuator. + +#+begin_src lisp :tangle ../src/gateway-matrix.lisp +(register-actuator :matrix #'execute-matrix-action) +#+end_src + +** Registration: Skill +Define the passive skill entry for the gateway. + +#+begin_src lisp :tangle ../src/gateway-matrix.lisp +(defskill :skill-gateway-matrix + :priority 150 + :trigger (lambda (ctx) (declare (ignore ctx)) nil) + :neuro nil + :symbolic (lambda (action ctx) (declare (ignore ctx)) action)) +#+end_src + +** Initialization +Trigger the sync loop upon loading. + +#+begin_src lisp :tangle ../src/gateway-matrix.lisp +(start-matrix-gateway) +#+end_src diff --git a/src/chat-logic.lisp b/src/chat-logic.lisp index 1854f49..258a95c 100644 --- a/src/chat-logic.lisp +++ b/src/chat-logic.lisp @@ -17,6 +17,8 @@ (or (getf payload :chat-id) (getf proposed-action :chat-id))) (and (member target '(:signal :SIGNAL)) (or (getf payload :chat-id) (getf proposed-action :chat-id))) + (and (member target '(:matrix :MATRIX)) + (or (getf payload :room-id) (getf proposed-action :room-id))) (and (member target '(:shell :SHELL)) (or (getf payload :cmd) (getf proposed-action :cmd))) (member target '(:tool :TOOL)))) @@ -41,6 +43,7 @@ (case channel (:telegram (format nil "- To reply via Telegram: (:type :REQUEST :target :telegram :chat-id \"~a\" :text \"\")" chat-id)) (:signal (format nil "- To reply via Signal: (:type :REQUEST :target :signal :chat-id \"~a\" :text \"\")" chat-id)) + (:matrix (format nil "- To reply via Matrix: (:type :REQUEST :target :matrix :room-id \"~a\" :text \"\")" chat-id)) (t "- To reply via Emacs: (:type :REQUEST :target :emacs :action :insert-at-end :buffer \"*org-agent-chat*\" :text \"* \")")))) (ask-neuro trimmed-text :system-prompt (concatenate 'string "ACTUATOR IDENTITY: You are the pure Lisp actuator for the org-agent kernel. diff --git a/src/credentials-vault.lisp b/src/credentials-vault.lisp index 7c36e65..01210fe 100644 --- a/src/credentials-vault.lisp +++ b/src/credentials-vault.lisp @@ -24,6 +24,8 @@ (:openrouter "OPENROUTER_API_KEY") (:telegram "TELEGRAM_BOT_TOKEN") (:signal "SIGNAL_ACCOUNT_NUMBER") + (:matrix-homeserver "MATRIX_HOMESERVER") + (:matrix-token "MATRIX_ACCESS_TOKEN") (t nil)))) (when (and env-var (eq type :api-key)) (uiop:getenv env-var)))))) diff --git a/src/gateway-matrix.lisp b/src/gateway-matrix.lisp new file mode 100644 index 0000000..e3b1855 --- /dev/null +++ b/src/gateway-matrix.lisp @@ -0,0 +1,94 @@ +(in-package :org-agent) + +(defvar *matrix-since-token* nil) + +(defvar *matrix-polling-thread* nil) + +(defun get-matrix-homeserver () (vault-get-secret :matrix-homeserver)) + +(defun get-matrix-token () (vault-get-secret :matrix-token)) + +(defun execute-matrix-action (action context) + "Sends a message via Matrix Client API." + (declare (ignore context)) + (let* ((payload (getf action :payload)) + (room-id (or (getf payload :room-id) (getf action :room-id))) + (text (or (getf payload :text) (getf action :text))) + (hs (get-matrix-homeserver)) + (token (get-matrix-token)) + (txn-id (get-universal-time)) + (url (format nil "~a/_matrix/client/v3/rooms/~a/send/m.room.message/~a" hs room-id txn-id))) + (when (and hs token room-id text) + (kernel-log "MATRIX: Sending message to ~a..." room-id) + (handler-case + (dex:put url + :headers `(("Authorization" . ,(format nil "Bearer ~a" token)) + ("Content-Type" . "application/json")) + :content (cl-json:encode-json-to-string + `((msgtype . "m.text") (body . ,text)))) + (error (c) (kernel-log "MATRIX ERROR: ~a" c)))))) + +(defun matrix-process-sync () + "Calls Matrix sync and injects new messages." + (let* ((hs (get-matrix-homeserver)) + (token (get-matrix-token)) + (url (format nil "~a/_matrix/client/v3/sync?timeout=30000~@[&since=~a~]" + hs *matrix-since-token*))) + (when (and hs token) + (handler-case + (let* ((response (dex:get url :headers `(("Authorization" . ,(format nil "Bearer ~a" token))))) + (json (cl-json:decode-json-from-string response)) + (next-batch (or (cdr (assoc :next-batch json)) + (cdr (assoc :next--batch json)))) + (rooms (cdr (assoc :rooms json))) + (joined (cdr (assoc :join rooms)))) + + (when next-batch + (setf *matrix-since-token* next-batch)) + + (dolist (room-entry joined) + (let* ((room-id (string-downcase (string (car room-entry)))) + (room-data (cdr room-entry)) + (timeline (cdr (assoc :timeline room-data))) + (events (cdr (assoc :events timeline)))) + (dolist (event events) + (let* ((type (cdr (assoc :type event))) + (content (cdr (assoc :content event))) + (sender (cdr (assoc :sender event))) + (body (cdr (assoc :body content)))) + (when (and (string= type "m.room.message") body) + (kernel-log "MATRIX: Received message from ~a in ~a" sender room-id) + (inject-stimulus + (list :type :EVENT + :payload (list :sensor :chat-message + :channel :matrix + :room-id room-id + :sender sender + :text body))))))))) (error (c) (kernel-log "MATRIX SYNC ERROR: ~a" c)))))) + +(defun start-matrix-gateway () + "Initializes the Matrix background thread." + (unless (and *matrix-polling-thread* (bt:thread-alive-p *matrix-polling-thread*)) + (setf *matrix-polling-thread* + (bt:make-thread + (lambda () + (loop + (matrix-process-sync) + (sleep 2))) + :name "org-agent-matrix-gateway")) + (kernel-log "MATRIX: Gateway sync active."))) + +(defun stop-matrix-gateway () + (when (and *matrix-polling-thread* (bt:thread-alive-p *matrix-polling-thread*)) + (bt:destroy-thread *matrix-polling-thread*) + (setf *matrix-polling-thread* nil))) + +(register-actuator :matrix #'execute-matrix-action) + +(defskill :skill-gateway-matrix + :priority 150 + :trigger (lambda (ctx) (declare (ignore ctx)) nil) + :neuro nil + :symbolic (lambda (action ctx) (declare (ignore ctx)) action)) + +(start-matrix-gateway) diff --git a/tests/gateway-matrix-tests.lisp b/tests/gateway-matrix-tests.lisp new file mode 100644 index 0000000..03a44e6 --- /dev/null +++ b/tests/gateway-matrix-tests.lisp @@ -0,0 +1,66 @@ +(defpackage :org-agent-gateway-matrix-tests + (:use :cl :fiveam :org-agent) + (:export #:gateway-matrix-suite)) +(in-package :org-agent-gateway-matrix-tests) + +(def-suite gateway-matrix-suite :description "Tests for Matrix Gateway.") +(in-suite gateway-matrix-suite) + +(test test-matrix-inbound-normalization + "Verify that inbound Matrix sync JSON is correctly translated to a chat-message stimulus." + (let ((old-get (symbol-function 'dex:get)) + (mock-response "{\"next_batch\":\"s123_456\",\"rooms\":{\"join\":{\"!room:hs.org\":{\"timeline\":{\"events\":[{\"type\":\"m.room.message\",\"sender\":\"@alice:hs.org\",\"content\":{\"msgtype\":\"m.text\",\"body\":\"hello matrix\"}}]}}}}}}")) + (unwind-protect + (progn + (setf (symbol-function 'dex:get) (lambda (url &key headers connect-timeout read-timeout keep-alive) + (declare (ignore url headers connect-timeout read-timeout keep-alive)) + mock-response)) + (setf (uiop:getenv "MATRIX_HOMESERVER") "https://matrix.org") + (setf (uiop:getenv "MATRIX_ACCESS_TOKEN") "test-token") + + (let ((captured-stimulus nil)) + (let ((original-inject (symbol-function 'org-agent:inject-stimulus))) + (setf (symbol-function 'org-agent:inject-stimulus) + (lambda (stim &key stream) (declare (ignore stream)) (setf captured-stimulus stim))) + + (org-agent::matrix-process-sync) + + (setf (symbol-function 'org-agent:inject-stimulus) original-inject) + + ;; Verify normalization + (is (not (null captured-stimulus))) + (is (eq :EVENT (getf captured-stimulus :type))) + (is (eq :chat-message (getf (getf captured-stimulus :payload) :sensor))) + (is (eq :matrix (getf (getf captured-stimulus :payload) :channel))) + (is (equal "!room:hs.org" (getf (getf captured-stimulus :payload) :room-id))) + (is (equal "@alice:hs.org" (getf (getf captured-stimulus :payload) :sender))) + (is (equal "hello matrix" (getf (getf captured-stimulus :payload) :text))) + (is (equal "s123_456" org-agent::*matrix-since-token*))))) + (setf (symbol-function 'dex:get) old-get)))) + +(test test-matrix-outbound-formatting + "Verify that an outbound :matrix request correctly formats the API call." + (let ((old-put (symbol-function 'dex:put)) + (captured-url nil) + (captured-content nil) + (captured-headers nil)) + (unwind-protect + (progn + (setf (symbol-function 'dex:put) + (lambda (url &key headers content connect-timeout read-timeout) + (declare (ignore connect-timeout read-timeout)) + (setf captured-url url) + (setf captured-content content) + (setf captured-headers headers) + "{\"event_id\":\"$abc\"}")) + + (setf (uiop:getenv "MATRIX_HOMESERVER") "https://matrix.org") + (setf (uiop:getenv "MATRIX_ACCESS_TOKEN") "test-token") + + (let ((action '(:type :REQUEST :target :matrix :room-id "!room:hs.org" :text "hello back"))) + (org-agent::execute-matrix-action action nil) + + (is (search "matrix.org/_matrix/client/v3/rooms/!room:hs.org/send/m.room.message" captured-url)) + (is (search "hello back" captured-content)) + (is (equal "Bearer test-token" (cdr (assoc "Authorization" captured-headers :test #'string=)))))) + (setf (symbol-function 'dex:put) old-put))))