org-agent-contrib/skills/org-skill-gateway-matrix.org

SKILL: Matrix Gateway (Universal Literate Note)

Overview

The Matrix Gateway provides bi-directional communication via the Matrix Client-Server API. It features an asynchronous polling sensor using the `/sync` endpoint and a registered actuator for outbound `m.room.message` events.

Phase A: Demand (PRD)

1. Purpose

Integrate OpenCortex into the Matrix federation for secure, distributed chat.

2. Success Criteria

  • Inbound: Messages from Matrix rooms are normalized and injected into the harness Bus.
  • Outbound: The `:matrix` target correctly routes messages to specific room IDs.
  • State: The `since` token is kept for the lifetime of a session so that events already processed are not delivered again.

Phase B: Blueprint (PROTOCOL)

1. Architectural Intent

A background thread (created through `bordeaux-threads`, nickname `bt`) polls the Matrix homeserver autonomously. `dexador` handles HTTP and `cl-json` handles JSON encoding and decoding.

2. Semantic Interfaces

  • `(:type :EVENT :meta (:source :matrix :room-id "…") :payload (:sensor :user-input :text "…"))`
  • `(:type :REQUEST :target :matrix :payload (:action :message :text "…"))`
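Both interface forms are nested property lists, so they can be built with `list` and read back with `getf`. The sketch below is illustrative only: the room ID, the message text, and the helper `sample-reply-to` are hypothetical. Carrying the room in `:meta` mirrors how the actuator in this note resolves its destination.

```lisp
;; Hypothetical sample values; the field layout follows the two interface forms above.
(defparameter *sample-event*
  (list :type :EVENT
        :meta (list :source :matrix :room-id "!room:example.org")
        :payload (list :sensor :user-input :text "hello")))

(defun sample-reply-to (event text)
  "Builds a :REQUEST aimed at the room that EVENT came from (illustrative helper)."
  (list :type :REQUEST
        :target :matrix
        ;; Copy the room ID from the inbound event's :meta plist.
        :meta (list :room-id (getf (getf event :meta) :room-id))
        :payload (list :action :message :text text)))

(defparameter *sample-request* (sample-reply-to *sample-event* "hi there"))
```

Reading `(getf (getf *sample-request* :meta) :room-id)` returns the room ID of the original event, which is what routes the reply back to the right room.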

Phase D: Build (Implementation)

Package Context

State: Sync Token

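Holds the `next_batch` token from the most recent `/sync` response. It is sent back as the `since` parameter so that the homeserver returns only newer events. The token lives in memory only, so a fresh process starts with a full initial sync.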

(defvar *matrix-since-token* nil)
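The sync URL in this note relies on the `~@[ ... ~]` format directive, which emits its clause only when the argument is non-nil. A minimal sketch of that behavior follows; `sample-sync-url` and the homeserver name are hypothetical, while the format string is the one used by the sensor.

```lisp
(defun sample-sync-url (homeserver since-token)
  "Builds a /sync URL, appending &since= only when SINCE-TOKEN is non-nil."
  (format nil "~a/_matrix/client/v3/sync?timeout=30000~@[&since=~a~]"
          homeserver since-token))

;; First request of a session: no token yet, so no since parameter.
(sample-sync-url "https://matrix.example.org" nil)
;; => "https://matrix.example.org/_matrix/client/v3/sync?timeout=30000"

;; Later requests: the stored token is appended.
(sample-sync-url "https://matrix.example.org" "s72595")
;; => "https://matrix.example.org/_matrix/client/v3/sync?timeout=30000&since=s72595"
```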

State: Polling Thread

Reference to the background thread responsible for sync requests.

(defvar *matrix-polling-thread* nil)

Credential Retrieval: Homeserver

(defun get-matrix-homeserver () (vault-get-secret :matrix-homeserver))

Credential Retrieval: Token

(defun get-matrix-token () (vault-get-secret :matrix-token))

Actuator: sendMessage

Sends an `m.room.message` to a Matrix room.

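(defvar *matrix-txn-counter* 0
  "Per-process counter that keeps transaction IDs unique within one second.")

(defun execute-matrix-action (action context)
  "Sends an m.room.message event via the Matrix Client-Server API."
  (declare (ignore context))
  (let* ((payload (getf action :payload))
         (meta (getf action :meta))
         (room-id (or (getf meta :room-id) (getf payload :room-id) (getf action :room-id)))
         (text (or (getf payload :text) (getf action :text)))
         (hs (get-matrix-homeserver))
         (token (get-matrix-token)))
    (when (and hs token room-id text)
      ;; The homeserver deduplicates on the transaction ID, so a bare
      ;; second-resolution timestamp would drop a second message sent within
      ;; the same second. The INCF is not atomic; guard it with a lock if
      ;; several threads can call this actuator.
      (let* ((txn-id (format nil "~d-~d" (get-universal-time) (incf *matrix-txn-counter*)))
             ;; Room IDs contain `!' and `:', so percent-encode the path
             ;; segment. Assumes quri (a dexador dependency) is loaded.
             (url (format nil "~a/_matrix/client/v3/rooms/~a/send/m.room.message/~a"
                          hs (quri:url-encode room-id) txn-id)))
        (harness-log "MATRIX: Sending message to ~a..." room-id)
        (handler-case
            (dex:put url
                     :headers `(("Authorization" . ,(format nil "Bearer ~a" token))
                                ("Content-Type" . "application/json"))
                     :content (cl-json:encode-json-to-string
                               `((msgtype . "m.text") (body . ,text))))
          (error (c) (harness-log "MATRIX ERROR: ~a" c)))))))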
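Matrix room IDs contain characters such as `!` and `:` that are not safe to place raw inside a URL path segment. The following dependency-free sketch shows what percent-encoding does to such an ID before it is spliced into the send URL. Both helper names are hypothetical, and the encoder deliberately handles ASCII input only; a real gateway would more likely call a URL library.

```lisp
(defun percent-encode-ascii (string)
  "Percent-encodes every character of STRING outside the RFC 3986 unreserved set.
Assumes STRING is pure ASCII; any other character signals an error."
  (with-output-to-string (out)
    (loop for ch across string
          for code = (char-code ch)
          do (cond ((> code 127)
                    (error "Non-ASCII character ~s in ~s" ch string))
                   ;; Letters, digits, and - . _ ~ pass through unchanged.
                   ((or (alphanumericp ch) (find ch "-._~"))
                    (write-char ch out))
                   ;; Everything else becomes %XX with two uppercase hex digits.
                   (t (format out "%~2,'0X" code))))))

(defun sample-send-url (homeserver room-id txn-id)
  "Builds the PUT URL for an m.room.message event (illustrative helper)."
  (format nil "~a/_matrix/client/v3/rooms/~a/send/m.room.message/~a"
          homeserver (percent-encode-ascii room-id) txn-id))

(sample-send-url "https://matrix.example.org" "!abc:example.org" "t1")
;; => "https://matrix.example.org/_matrix/client/v3/rooms/%21abc%3Aexample.org/send/m.room.message/t1"
```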

Sensor: Sync loop & Injection

Polls the `/sync` endpoint and processes timeline events.

(defun matrix-json-get (key object)
  "Returns the value stored under string KEY in a decoded JSON OBJECT (an alist)."
  (cdr (assoc key object :test #'equal)))

(defun matrix-process-sync ()
  "Calls Matrix sync and injects new messages."
  (let* ((hs (get-matrix-homeserver))
         (token (get-matrix-token))
         (url (format nil "~a/_matrix/client/v3/sync?timeout=30000~@[&since=~a~]"
                      hs *matrix-since-token*)))
    (when (and hs token)
      (handler-case
          (let* ((response (dex:get url
                                    :headers `(("Authorization" . ,(format nil "Bearer ~a" token)))
                                    ;; The server may hold the request for 30 s, so
                                    ;; allow more than dexador's default read timeout.
                                    :read-timeout 40))
                 ;; Keep object keys as plain strings. cl-json's default
                 ;; camel-case conversion would mangle case-sensitive room IDs
                 ;; and intern a new keyword for every key it sees.
                 (json (let ((cl-json:*json-identifier-name-to-lisp* #'identity)
                             (cl-json:*identifier-name-to-key* #'identity))
                         (cl-json:decode-json-from-string response)))
                 (next-batch (matrix-json-get "next_batch" json))
                 (joined (matrix-json-get "join" (matrix-json-get "rooms" json))))

            (when next-batch
              (setf *matrix-since-token* next-batch))

            (dolist (room-entry joined)
              ;; Each entry is (room-id . room-data); the ID is used verbatim.
              (let* ((room-id (car room-entry))
                     (timeline (matrix-json-get "timeline" (cdr room-entry)))
                     (events (matrix-json-get "events" timeline)))
                (dolist (event events)
                  (let* ((type (matrix-json-get "type" event))
                         (sender (matrix-json-get "sender" event))
                         (body (matrix-json-get "body" (matrix-json-get "content" event))))
                    (when (and (equal type "m.room.message") body)
                      (harness-log "MATRIX: Received message from ~a in ~a" sender room-id)
                      (inject-stimulus
                       (list :type :EVENT
                             :meta (list :source :matrix :room-id room-id :sender sender)
                             :payload (list :sensor :user-input
                                            :text body)))))))))
        (error (c) (harness-log "MATRIX SYNC ERROR: ~a" c))))))
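The traversal that the sensor performs (joined rooms, then each room's timeline, then each event) can be tried without a homeserver. The sketch below walks a hand-built stand-in for a decoded `/sync` response. It assumes JSON objects are decoded to alists with string keys; the sample data and the helpers `sample-get` and `sample-extract-messages` are hypothetical.

```lisp
(defun sample-get (key object)
  "Looks up string KEY in an alist standing in for a decoded JSON object."
  (cdr (assoc key object :test #'equal)))

;; A tiny stand-in for a /sync response: one room, one message, one
;; membership event that should be ignored.
(defparameter *sample-sync*
  (let* ((message (list (cons "type" "m.room.message")
                        (cons "sender" "@alice:example.org")
                        (cons "content" (list (cons "msgtype" "m.text")
                                              (cons "body" "hello")))))
         (member-event (list (cons "type" "m.room.member")
                             (cons "sender" "@bob:example.org")
                             (cons "content" (list (cons "membership" "join")))))
         (timeline (list (cons "events" (list message member-event))))
         (room (list (cons "timeline" timeline))))
    (list (cons "next_batch" "s2")
          (cons "rooms" (list (cons "join" (list (cons "!room:example.org" room))))))))

(defun sample-extract-messages (sync)
  "Returns a list of (room-id sender body) for each m.room.message in SYNC."
  (loop for (room-id . room) in (sample-get "join" (sample-get "rooms" sync))
        append (loop for event in (sample-get "events" (sample-get "timeline" room))
                     for body = (sample-get "body" (sample-get "content" event))
                     when (and (equal (sample-get "type" event) "m.room.message") body)
                       collect (list room-id (sample-get "sender" event) body))))

(sample-extract-messages *sample-sync*)
;; => (("!room:example.org" "@alice:example.org" "hello"))
```

Only the `m.room.message` event with a `body` survives the filter, which is the same condition the sensor applies before it injects a stimulus.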

Start Polling

Initializes the Matrix background thread.

(defun start-matrix-gateway ()
  "Initializes the Matrix background thread."
  (unless (and *matrix-polling-thread* (bt:thread-alive-p *matrix-polling-thread*))
    (setf *matrix-polling-thread*
          (bt:make-thread 
           (lambda ()
             (loop
               (matrix-process-sync)
               (sleep 2)))
           :name "opencortex-matrix-gateway"))
    (harness-log "MATRIX: Gateway sync active.")))

Stop Polling

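Terminates the background thread. `bt:destroy-thread` stops the thread abruptly, so a sync request that is in flight is abandoned rather than completed.

(defun stop-matrix-gateway ()
  "Destroys the Matrix polling thread if it is still alive."
  (when (and *matrix-polling-thread* (bt:thread-alive-p *matrix-polling-thread*))
    (bt:destroy-thread *matrix-polling-thread*)
    (setf *matrix-polling-thread* nil)))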

Registration: Actuator

Register the Matrix channel as a physical actuator.

(register-actuator :matrix #'execute-matrix-action)

Registration: Skill

Define the passive skill entry for the gateway.

(defskill :skill-gateway-matrix
  :priority 150
  :trigger (lambda (ctx) (declare (ignore ctx)) nil)
  :probabilistic nil
  :deterministic (lambda (action ctx) (declare (ignore ctx)) action))

Initialization

Trigger the sync loop upon loading.

(start-matrix-gateway)