SKILL: Matrix Gateway (Universal Literate Note)
Overview
The Matrix Gateway provides bi-directional communication via the Matrix Client-Server API. It features an asynchronous polling sensor using the `/sync` endpoint and a registered actuator for outbound `m.room.message` events.
Phase A: Demand (PRD)
1. Purpose
Integrate OpenCortex into the Matrix federation for secure, distributed chat.
2. Success Criteria
- Inbound: Messages from Matrix rooms are normalized and injected into the harness Bus.
- Outbound: The `:matrix` target correctly routes messages to specific room IDs.
- State: The `since` token is kept in memory for the duration of a session so that each sync returns only new events, which prevents the same message from being processed twice.
Phase B: Blueprint (PROTOCOL)
1. Architectural Intent
Autonomous background polling of the Matrix homeserver. Uses `dexador` for HTTP and `cl-json` for parsing.
2. Semantic Interfaces
- `(:sensor :chat-message :channel :matrix …)`
- `(:type :REQUEST :target :matrix :room-id "…" :text "…")`
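To make the two shapes concrete, here is a minimal sketch of both property lists as plain Lisp data. The room ID, sender, and text values are invented for illustration, and `action-field` is a hypothetical helper (not part of the harness) that looks under `:payload` first and then at the top level.

```lisp
;; Illustrative values only; the IDs and text are made up.
(defparameter *example-inbound*
  (list :sensor :chat-message
        :channel :matrix
        :room-id "!room:example.org"
        :sender "@alice:example.org"
        :text "hello"))

(defparameter *example-outbound*
  (list :type :REQUEST
        :target :matrix
        :room-id "!room:example.org"
        :text "hello back"))

;; Hypothetical helper: prefer the nested :payload, fall back to the top level.
(defun action-field (action key)
  (or (getf (getf action :payload) key)
      (getf action key)))

(action-field *example-outbound* :text)
;; => "hello back"
```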
Phase D: Build (Implementation)
Package Context
State: Sync Token
Tracks the last processed event to ensure we only receive new messages.
(defvar *matrix-since-token* nil)
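The effect of the token on the request can be sketched with a small standalone function. `example-sync-url` and the homeserver address are hypothetical; the format string is the one this note uses for the sync URL.

```lisp
;; Hypothetical helper showing how the sync URL changes once a token is known.
(defun example-sync-url (homeserver since-token)
  ;; ~@[ ... ~] emits its clause only when the argument is non-NIL and
  ;; leaves that argument available to the ~a inside the clause.
  (format nil "~a/_matrix/client/v3/sync?timeout=30000~@[&since=~a~]"
          homeserver since-token))

(example-sync-url "https://matrix.example.org" nil)
;; => "https://matrix.example.org/_matrix/client/v3/sync?timeout=30000"

(example-sync-url "https://matrix.example.org" "s72594_4483_1934")
;; => "https://matrix.example.org/_matrix/client/v3/sync?timeout=30000&since=s72594_4483_1934"
```

The first call corresponds to an initial sync, the second to every later sync in the same session.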
State: Polling Thread
Reference to the background thread responsible for sync requests.
(defvar *matrix-polling-thread* nil)
Credential Retrieval: Homeserver
(defun get-matrix-homeserver () (vault-get-secret :matrix-homeserver))
Credential Retrieval: Token
(defun get-matrix-token () (vault-get-secret :matrix-token))
Actuator: sendMessage
Sends an `m.room.message` to a Matrix room.
(defun execute-matrix-action (action context)
  "Sends an m.room.message event via the Matrix Client-Server API."
  (declare (ignore context))
  (let* ((payload (getf action :payload))
         ;; Accept fields either nested under :payload or at the top level.
         (room-id (or (getf payload :room-id) (getf action :room-id)))
         (text (or (getf payload :text) (getf action :text)))
         (hs (get-matrix-homeserver))
         (token (get-matrix-token))
         ;; The transaction ID must differ for every send; a bare timestamp
         ;; repeats within one second, so append a random suffix.
         (txn-id (format nil "~d-~d" (get-universal-time) (random 1000000))))
    (when (and hs token room-id text)
      (harness-log "MATRIX: Sending message to ~a..." room-id)
      (handler-case
          ;; Room IDs contain "!" and ":", so percent-encode the path segment.
          ;; quri is already loaded as a dependency of dexador.
          (dex:put (format nil "~a/_matrix/client/v3/rooms/~a/send/m.room.message/~a"
                           hs (quri:url-encode room-id) txn-id)
                   :headers `(("Authorization" . ,(format nil "Bearer ~a" token))
                              ("Content-Type" . "application/json"))
                   :content (cl-json:encode-json-to-string
                             `((msgtype . "m.text") (body . ,text))))
        (error (c) (harness-log "MATRIX ERROR: ~a" c))))))
Sensor: Sync loop & Injection
Polls the `/sync` endpoint and processes timeline events.
(defun matrix-process-sync ()
  "Calls Matrix sync and injects new messages."
  (let* ((hs (get-matrix-homeserver))
         (token (get-matrix-token))
         (url (format nil "~a/_matrix/client/v3/sync?timeout=30000~@[&since=~a~]"
                      hs *matrix-since-token*)))
    (when (and hs token)
      (handler-case
          (flet ((field (key alist) (cdr (assoc key alist :test #'equal))))
            (let* ((response (dex:get url
                                      :headers `(("Authorization" . ,(format nil "Bearer ~a" token)))
                                      ;; The server may hold the request for 30 s, so the
                                      ;; read timeout must exceed dexador's 10 s default.
                                      :read-timeout 40))
                   ;; Keep JSON keys as strings: interning them as keywords
                   ;; would mangle case-sensitive room IDs.
                   (json (let ((cl-json:*json-identifier-name-to-lisp* #'identity)
                               (cl-json:*identifier-name-to-key* #'identity))
                           (cl-json:decode-json-from-string response)))
                   (next-batch (field "next_batch" json))
                   (joined (field "join" (field "rooms" json))))
              (when next-batch
                (setf *matrix-since-token* next-batch))
              (dolist (room-entry joined)
                (let ((room-id (car room-entry))
                      (events (field "events" (field "timeline" (cdr room-entry)))))
                  (dolist (event events)
                    (let ((type (field "type" event))
                          (sender (field "sender" event))
                          (body (field "body" (field "content" event))))
                      (when (and (equal type "m.room.message") body)
                        (harness-log "MATRIX: Received message from ~a in ~a" sender room-id)
                        (inject-stimulus
                         (list :type :EVENT
                               :payload (list :sensor :chat-message
                                              :channel :matrix
                                              :room-id room-id
                                              :sender sender
                                              :text body))))))))))
        (error (c) (harness-log "MATRIX SYNC ERROR: ~a" c))))))
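The nesting that the sync handler walks (rooms, then join, then one entry per room, then timeline, then events) can be tried out without a homeserver. The sketch below uses a hand-written alist in place of a decoded response; the string keys assume a decoder configured to leave JSON identifiers untouched, and `field`, `example-text-messages`, and all sample values are hypothetical.

```lisp
;; Hand-written stand-in for a decoded /sync response (sample data only).
(defparameter *example-sync*
  '(("next_batch" . "s2")
    ("rooms"
     ("join"
      ("!Room1:example.org"
       ("timeline"
        ("events"
         (("type" . "m.room.message")
          ("sender" . "@alice:example.org")
          ("content" ("msgtype" . "m.text") ("body" . "hi")))
         (("type" . "m.room.member")
          ("sender" . "@bob:example.org")
          ("content" ("membership" . "join"))))))))))

(defun field (key alist)
  "Hypothetical accessor: the value stored under the string KEY in ALIST."
  (cdr (assoc key alist :test #'equal)))

(defun example-text-messages (sync)
  "Return (room-id sender body) for every m.room.message event in SYNC."
  (let ((result '()))
    (dolist (room (field "join" (field "rooms" sync)) (nreverse result))
      (dolist (event (field "events" (field "timeline" (cdr room))))
        (let ((body (field "body" (field "content" event))))
          (when (and (equal (field "type" event) "m.room.message") body)
            (push (list (car room) (field "sender" event) body) result)))))))

(example-text-messages *example-sync*)
;; => (("!Room1:example.org" "@alice:example.org" "hi"))
```

The membership event is skipped because its type is not `m.room.message`, and the room ID keeps its original capitalization because it is never converted to a symbol.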
Start Polling
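Starts the background thread, which calls `matrix-process-sync` in a loop and pauses two seconds between calls. Does nothing if the thread is already alive.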
(defun start-matrix-gateway ()
  "Initializes the Matrix background thread."
  (unless (and *matrix-polling-thread* (bt:thread-alive-p *matrix-polling-thread*))
    (setf *matrix-polling-thread*
          (bt:make-thread
           (lambda ()
             (loop
               (matrix-process-sync)
               (sleep 2)))
           :name "opencortex-matrix-gateway"))
    (harness-log "MATRIX: Gateway sync active.")))
Stop Polling
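Terminates the background thread. `bt:destroy-thread` stops it immediately, possibly in the middle of a sync request, so this is a forced stop rather than a cooperative one.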
(defun stop-matrix-gateway ()
  (when (and *matrix-polling-thread* (bt:thread-alive-p *matrix-polling-thread*))
    (bt:destroy-thread *matrix-polling-thread*)
    (setf *matrix-polling-thread* nil)))
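If a cooperative shutdown is preferred, one possible alternative is a flag that the loop checks on every iteration. This is a sketch under assumptions, not the gateway's actual behaviour: `*example-running*`, `*example-thread*`, `example-start`, and `example-stop` are hypothetical names, and bordeaux-threads is assumed to be loaded as elsewhere in this note.

```lisp
;; Sketch only: all names below are hypothetical.
(defvar *example-running* nil)
(defvar *example-thread* nil)

(defun example-start (work-fn)
  "Run WORK-FN repeatedly in a background thread until the flag is cleared."
  (setf *example-running* t
        *example-thread*
        (bt:make-thread (lambda ()
                          (loop while *example-running*
                                do (funcall work-fn)
                                   (sleep 0.1)))
                        :name "example-poller")))

(defun example-stop ()
  "Clear the flag, then wait for the thread to finish its current iteration."
  (setf *example-running* nil)
  (when *example-thread*
    (bt:join-thread *example-thread*)
    (setf *example-thread* nil)))
```

The trade-off is latency: `example-stop` blocks until the current call to the work function returns, which for a long-polling sync request could take as long as the request timeout.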
Registration: Actuator
Register the Matrix channel as a physical actuator.
(register-actuator :matrix #'execute-matrix-action)
Registration: Skill
Define the passive skill entry for the gateway.
(defskill :skill-gateway-matrix
  :priority 150
  :trigger (lambda (ctx) (declare (ignore ctx)) nil)
  :probabilistic nil
  :deterministic (lambda (action ctx) (declare (ignore ctx)) action))
Initialization
Trigger the sync loop upon loading.
(start-matrix-gateway)