passepartout/org/core-communication.org
Amr Gharbeia d35aea391e feat(v0.3.0): Event Orchestrator skill
- New system-event-orchestrator skill with hook registry, cron registry, and tier classifier

- Three dispatch tiers: :reflex (no LLM), :cognition (light), :reasoning (full)

- Org-mode timestamp parsing for repeat patterns (+1w, +1d, +1m)

- Registers on heartbeat via defskill, dispatches due cron jobs

- Fix all remaining harness-log → log-message references across org files
2026-05-02 22:36:39 -04:00


Communication Protocol (communication.lisp)

Overview: Architectural Intent

The Communication Protocol defines how Passepartout speaks to the outside world. It sits between the metabolic loop and the network, providing framed, length-prefixed message transport over TCP.

Every message is an S-expression (plist) prefixed with a 6-character hex length:

00003D(:TYPE :EVENT :PAYLOAD (:ACTION :HANDSHAKE :VERSION "0.2.0"))

This is a deliberate rejection of JSON, Protocol Buffers, or any other serialization format. The message format is Lisp-native because:

  1. The agent generates and consumes these messages inside the cognitive loop, so the standard Lisp printer and reader are the only serialization layer needed
  2. The format is human-readable and trivially debuggable with a text editor
  3. The length prefix removes framing ambiguity (no "read until newline" guesswork)

Why Length-Prefixed Framing?

A naive TCP protocol that reads until newline fails when:

  • A message contains a newline character (which Lisp plists can)
  • A message is split across TCP packets (read returns partial data)
  • A malicious client sends an infinite stream without newlines

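The length prefix solves all three problems. The reader reads exactly 6 characters (the hex length), then reads exactly that many additional characters. Termination is never ambiguous, embedded newlines are ordinary payload characters, and no single read is larger than the declared length. A read that comes up short can only mean the stream closed.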
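
The framing rule can be tried in isolation with string streams. The following is a minimal sketch, not the code from communication.lisp: sketch-frame and sketch-unframe are hypothetical helpers that work on plain strings, so it runs without a socket or the passepartout package.

```lisp
;; Hypothetical helpers for illustration only; not part of communication.lisp.
(defun sketch-frame (payload)
  "Prefix PAYLOAD (a string) with its length as 6 hex digits."
  (format nil "~6,'0X~A" (length payload) payload))

(defun sketch-unframe (stream)
  "Read one frame from STREAM. Return the payload string, or NIL on a short read."
  (let ((header (make-string 6)))
    (when (= 6 (read-sequence header stream))
      (let* ((len (parse-integer header :radix 16))
             (body (make-string len)))
        (when (= len (read-sequence body stream))
          body)))))

;; Two frames sent back to back; the first payload contains a newline.
(with-input-from-string (in (concatenate 'string
                                         (sketch-frame (format nil "(:TEXT \"a~%b\")"))
                                         (sketch-frame "(:TEXT \"c\")")))
  (list (sketch-unframe in) (sketch-unframe in)))
```

The embedded newline comes back as part of the first payload, and the second frame starts exactly where the first one ended, with no delimiter scanning.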

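The 6-character hex length supports payloads of up to 0xFFFFFF (16,777,215) characters, which is about 16 MB when every character encodes to a single byte. The count is in characters rather than bytes, because frame-message measures the printed string and read-framed-message fills a string of that length. This is sufficient for any single message the agent would produce. Larger payloads should be split across multiple messages.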
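
The limit follows from the header width: 6 hex digits can express at most 16^6 - 1. A small sketch of that arithmetic, using hypothetical names (*sketch-max-frame-length*, sketch-frame-header) that do not exist in the source:

```lisp
;; Hypothetical names for illustration; not part of communication.lisp.
(defparameter *sketch-max-frame-length* #xFFFFFF
  "Largest length expressible in 6 hex digits: 16^6 - 1 = 16,777,215.")

(defun sketch-frame-header (len)
  "Return the 6-digit hex header for LEN, or signal an error if LEN does not fit."
  (unless (<= 0 len *sketch-max-frame-length*)
    (error "Length ~D does not fit in 6 hex digits" len))
  (format nil "~6,'0X" len))

(sketch-frame-header 44)          ; a 44-character payload gets the header 00002C
(sketch-frame-header #xFFFFFF)    ; the largest valid header
;; Without the guard, the directive pads but never truncates, so one
;; character past the limit yields a 7-digit header:
(length (format nil "~6,'0X" #x1000000))   ; => 7
```

A 7-digit header would be misread by a peer that always consumes exactly 6 characters, which is why an explicit bound check is worth having on the sending side.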

Implementation

Package Context

(in-package :passepartout)

Actuator Registry

The global registry mapping target keywords (:cli, :telegram, :signal, etc.) to their physical actuator functions. Extensible at runtime — skills can register new actuators via actuator-register.

(defvar *actuator-registry* (make-hash-table :test 'equalp)
  "Global registry mapping target keywords to their physical actuator functions.")

(defun actuator-register (name fn) 
  "Registers an actuator function. Actuators receive: (ACTION CONTEXT)."
  (let ((key (if (keywordp name) name (intern (string-upcase (string name)) :keyword))))
    (setf (gethash key *actuator-registry*) fn)))
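
How a registered actuator might be looked up and called is not shown in this file. The following is a self-contained sketch under that caveat: the SKETCH- names and the :no-actuator fallback are assumptions for illustration, and only the key normalization mirrors actuator-register above.

```lisp
;; Illustrative sketch; every SKETCH- name is hypothetical.
(defvar *sketch-registry* (make-hash-table :test 'eq))

(defun sketch-normalize-target (name)
  "Coerce NAME (keyword, symbol, or string) to an upper-case keyword."
  (if (keywordp name)
      name
      (intern (string-upcase (string name)) :keyword)))

(defun sketch-register (name fn)
  "Store FN under the normalized form of NAME."
  (setf (gethash (sketch-normalize-target name) *sketch-registry*) fn))

(defun sketch-dispatch (target action context)
  "Call the actuator registered for TARGET, or return :NO-ACTUATOR."
  (let ((fn (gethash (sketch-normalize-target target) *sketch-registry*)))
    (if fn
        (funcall fn action context)
        :no-actuator)))

;; Registering under the string "cli" and dispatching to :cli hit the same key.
(sketch-register "cli" (lambda (action context)
                         (declare (ignore context))
                         (list :printed action)))
(sketch-dispatch :cli '(:say "hi") nil)        ; => (:PRINTED (:SAY "hi"))
(sketch-dispatch :telegram '(:say "hi") nil)   ; => :NO-ACTUATOR
```

Normalizing on both registration and lookup is what lets a skill register with a string or plain symbol while callers use keywords.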

Message Framing

Three functions handle the full message lifecycle: sanitize (strip non-serializable state), frame (serialize + prefix), and read (parse from stream).

Sanitize Protocol Message

Strips transient runtime state (:reply-stream, :socket, :stream) from a message plist before sending it over the network. These are Lisp stream objects that cannot be serialized and have no meaning to the remote end.

(defun protocol-message-sanitize (msg)
  "Recursively strips non-serializable objects from a protocol plist."
  ;; Only walk values that look like plists (keyword first, even length); other
  ;; lists are returned untouched so odd-length lists do not gain a trailing NIL.
  (if (and (consp msg) (keywordp (first msg))
           (ignore-errors (evenp (length msg))))
      (loop for (k v) on msg by #'cddr
            unless (member k '(:reply-stream :socket :stream))
              append (list k (protocol-message-sanitize v)))
      msg))
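
Why these keys have to go can be checked at the REPL: a stream prints as an unreadable #<...> object, and the reader refuses that syntax. A minimal sketch, with sketch-round-trips-p as a hypothetical helper:

```lisp
;; Hypothetical helper for illustration; not part of communication.lisp.
(defun sketch-round-trips-p (object)
  "Return T if OBJECT survives printing and reading back, NIL otherwise."
  (handler-case
      (let ((*read-eval* nil))
        (equal object (read-from-string (prin1-to-string object))))
    (error () nil)))

;; A plain plist survives the trip.
(sketch-round-trips-p '(:type :event :payload (:text "hi")))   ; => T

;; A plist carrying a live stream does not: the printed form contains #<...>,
;; which signals a reader error on the receiving side.
(sketch-round-trips-p
 (list :type :event :stream (make-string-output-stream)))      ; => NIL
```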

Frame Message

Serializes a plist to a length-prefixed string: 6-character hex length followed by the prin1 representation.

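(defun frame-message (msg)
  "Serializes a message plist and prefixes it with a 6-character hex length."
  (let* ((sanitized (protocol-message-sanitize msg))
         ;; Print on one line; *print-pretty* would insert newlines and indentation.
         (payload (let ((*print-pretty* nil) (*read-eval* nil)) (format nil "~s" sanitized)))
         (len (length payload)))
    ;; A length above #xFFFFFF needs more than 6 hex digits and would corrupt the framing.
    (when (> len #xFFFFFF)
      (error "Message too long to frame: ~d characters" len))
    (format nil "~6,'0x~a" len payload)))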
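
One caveat worth illustrating: the printed payload depends on the printer variables in effect at the call site, and frame-message binds only *print-pretty*. Whether Passepartout ever runs with *print-length* or *print-level* bound is not stated in the source, so the following is an optional hardening sketch under that assumption. sketch-serialize is a hypothetical name.

```lisp
;; Hypothetical helper for illustration; not part of communication.lisp.
(defun sketch-serialize (msg)
  "Print MSG with standard printer settings, ignoring caller-side customizations."
  (with-standard-io-syntax
    (let ((*print-readably* nil))   ; tolerate objects that lack a readable form
      (prin1-to-string msg))))

;; With *print-length* bound, the default printer abbreviates the list,
;; while the hardened version prints it in full.
(let ((*print-length* 2))
  (list (prin1-to-string '(:a 1 :b 2))
        (sketch-serialize '(:a 1 :b 2))))
;; => ("(:A 1 ...)" "(:A 1 :B 2)")
```

An abbreviated payload still frames cleanly, but the receiver reads something other than what was sent, so the failure would be silent.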

Read Framed Message

Reads a complete framed message from a TCP stream. Handles leading whitespace between messages, partial reads, and malformed length headers gracefully. Returns the parsed S-expression, or :eof if the stream is closed, or :error if the message is malformed.

(defun read-framed-message (stream)
  "Reads a hex-length prefixed S-expression from the stream securely."
  (let ((length-buffer (make-string 6)))
    (handler-case
        (progn
          ;; Skip any whitespace separating frames.
          (loop for char = (peek-char nil stream nil :eof)
                while (and (not (eq char :eof)) (member char '(#\Space #\Newline #\Tab #\Return)))
                do (read-char stream))
          (let ((count (read-sequence length-buffer stream)))
            (if (< count 6)
                :eof
                (let ((len (ignore-errors (parse-integer length-buffer :radix 16))))
                  (if (not len)
                      :error
                      (let ((msg-buffer (make-string len)))
                        ;; A short read means the stream closed mid-message.
                        (if (< (read-sequence msg-buffer stream) len)
                            :eof
                            (let ((*read-eval* nil))
                              (handler-case (read-from-string msg-buffer)
                                (error () :error))))))))))
      (error () :error))))
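
The two defensive steps in the reader, header parsing and reading with *read-eval* disabled, can be exercised on their own. A minimal sketch with hypothetical helper names (sketch-parse-header, sketch-safe-read) that mirror those steps without needing a socket:

```lisp
;; Hypothetical helpers for illustration; not part of communication.lisp.
(defun sketch-parse-header (header)
  "Return the length encoded in HEADER, or NIL if it is not a hex number."
  (ignore-errors (parse-integer header :radix 16)))

(defun sketch-safe-read (text)
  "Read one S-expression from TEXT with #. evaluation disabled; :ERROR on failure."
  (let ((*read-eval* nil))
    (handler-case (values (read-from-string text))
      (error () :error))))

(sketch-parse-header "00002C")        ; => 44
(sketch-parse-header "GARBAG")        ; => NIL, not valid hex
(sketch-safe-read "(:TYPE :EVENT)")   ; => (:TYPE :EVENT)
(sketch-safe-read "#.(+ 1 2)")        ; => :ERROR, the form is never evaluated
(sketch-safe-read "(:TYPE :EVENT")    ; => :ERROR, unbalanced input
```

Binding *read-eval* to NIL matters because the #. reader macro would otherwise let a remote peer run arbitrary code just by sending a message.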

Server Listener (start-daemon)

The TCP server that accepts connections from CLI and TUI clients. Each connection gets a dedicated thread (client-handle-connection).

The daemon sends a handshake message on connection, then enters a read loop, injecting each received message into the metabolic loop via inject-stimulus. The :health-check message type is handled inline (not sent to the cognitive loop) so that health checks work even when the agent is busy.

(defvar *daemon-socket* nil)

(defun client-handle-connection (socket)
  "Handles a single TUI/CLI client connection in a dedicated thread."
  (let ((stream (usocket:socket-stream socket)))
    (handler-case
        (progn
          (format stream "~a" (frame-message (make-hello-message "0.2.0")))
          (finish-output stream)
          (loop
            (let ((msg (read-framed-message stream)))
              (cond
                ((eq msg :eof) (return))
                ((eq msg :error) (return))
                ((eq (getf msg :type) :health-check)
                 (let ((health-msg (list :type :health-response 
                                         :status (or (and (boundp 'passepartout::*system-health*) 
                                                         (symbol-value 'passepartout::*system-health*))
                                                     :unknown)
                                         :checked-p (or (and (boundp 'passepartout::*health-check-ran*)
                                                             (symbol-value 'passepartout::*health-check-ran*))
                                                     nil))))
                   (format stream "~a" (frame-message health-msg))
                   (finish-output stream)))
                (t (inject-stimulus msg :stream stream))))))
      (error (c) (log-message "CLIENT ERROR: ~a" c)))
    (ignore-errors (usocket:socket-close socket))))

(defun start-daemon (&key (port 9105))
  "Starts the network listener for TUI/CLI clients."
  (setf *daemon-socket* (usocket:socket-listen "127.0.0.1" port :reuse-address t))
  (log-message "DAEMON: Listening on localhost:~a" port)
  (bt:make-thread
   (lambda ()
     (loop
       (let ((client-socket (usocket:socket-accept *daemon-socket*)))
         (when client-socket
           (bt:make-thread (lambda () (client-handle-connection client-socket))
                          :name "passepartout-client-handler")))))
   :name "passepartout-server-listener"))
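
The health-check branch reads *system-health* and *health-check-ran* through boundp and symbol-value rather than referencing them directly, presumably so the handler still works when the code that defines those variables has not been loaded. That reading is an assumption; the idiom itself can be sketched on its own with hypothetical names:

```lisp
;; Hypothetical names for illustration; not part of communication.lisp.
(defvar *sketch-health* :ok)

(defun sketch-value-or (symbol default)
  "Return SYMBOL's global value if it is bound and non-NIL, else DEFAULT."
  (or (and (boundp symbol) (symbol-value symbol))
      default))

(sketch-value-or '*sketch-health* :unknown)        ; => :OK
(sketch-value-or 'sketch-never-defined :unknown)   ; => :UNKNOWN
```

Note that a variable bound to NIL also falls through to the default, which matches how the handler reports :unknown.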

Handshake Logic

The first message sent to every new connection. The client can use this to verify the protocol version and the daemon's capabilities.

(defun make-hello-message (version)
  "Constructs the standard HELLO handshake message."
  (list :TYPE :EVENT 
        :PAYLOAD (list :ACTION :handshake 
                       :VERSION version 
                       :CAPABILITIES '(:AUTH :ORG-AST))))
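
On the client side, the handshake can be inspected with plain getf calls. The source does not show a client-side check, so this is a sketch: sketch-hello-capable-p is a hypothetical helper, and *sketch-hello* is a literal shaped like the plist make-hello-message builds.

```lisp
;; Hypothetical helper for illustration; not part of communication.lisp.
(defun sketch-hello-capable-p (hello capability)
  "Return T if HELLO is a handshake event that advertises CAPABILITY."
  (let ((payload (getf hello :payload)))
    (and (eq (getf hello :type) :event)
         (eq (getf payload :action) :handshake)
         (member capability (getf payload :capabilities))
         t)))

;; A literal with the same shape as the daemon's hello message.
(defparameter *sketch-hello*
  '(:type :event
    :payload (:action :handshake
              :version "0.2.0"
              :capabilities (:auth :org-ast))))

(sketch-hello-capable-p *sketch-hello* :org-ast)   ; => T
(sketch-hello-capable-p *sketch-hello* :video)     ; => NIL
(getf (getf *sketch-hello* :payload) :version)     ; => "0.2.0"
```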

Structural Validation

Validates that an incoming message has the minimum required structure: a plist with a valid :type field. Used by the protocol validator skill to reject malformed messages before they enter the cognitive loop.

(in-package :passepartout)

(defun protocol-schema-validate (msg)
  "Strict structural validation for incoming protocol messages."
  (unless (listp msg) (error "Message must be a plist"))
  (let ((type (proto-get msg :type)))
    (unless (member type '(:REQUEST :EVENT :RESPONSE :LOG :STATUS))
      (error "Invalid message type '~a'" type))
    t))
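
Because the validator signals an error rather than returning NIL, a caller that wants a yes/no answer has to wrap it. A sketch of that pattern follows. sketch-validate is a stand-in that uses getf in place of proto-get (whose definition is not shown here), and sketch-accept-p is a hypothetical wrapper.

```lisp
;; Stand-in validator for illustration; uses GETF where the original uses PROTO-GET.
(defun sketch-validate (msg)
  "Signal an error unless MSG is a plist with a known :TYPE."
  (unless (listp msg) (error "Message must be a plist"))
  (unless (member (getf msg :type) '(:request :event :response :log :status))
    (error "Invalid message type '~a'" (getf msg :type)))
  t)

;; Hypothetical wrapper: converts the signaled error into a NIL result.
(defun sketch-accept-p (msg)
  "Return T if MSG validates; otherwise NIL plus the error text as a second value."
  (handler-case (sketch-validate msg)
    (error (c) (values nil (princ-to-string c)))))

(sketch-accept-p '(:type :event :payload (:text "hi")))   ; => T
(sketch-accept-p '(:type :bogus))                          ; => NIL, plus the error text
(sketch-accept-p "not a plist")                            ; => NIL, plus the error text
```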

Protocol Smoke Test (manual for REPL evaluation)

Use this function to manually verify that the daemon is alive and the framing protocol works end-to-end. It connects to a running daemon, reads the HELLO handshake, sends a "hi" message, and reads the response.

(defun test-daemon-protocol ()
  (handler-case
      (let* ((socket (usocket:socket-connect "127.0.0.1" 9105))
             (stream (usocket:socket-stream socket)))
        (format t "Connected.~%")
        (let* ((len-buf (make-string 6))
               (count (read-sequence len-buf stream)))
          (when (= count 6)
            (let* ((len (parse-integer len-buf :radix 16))
                   (msg-buf (make-string len)))
              (read-sequence msg-buf stream)
              (format t "HELLO: ~a~%" msg-buf))))
        (let* ((msg '(:TYPE :EVENT :META (:SOURCE :tui) :PAYLOAD (:SENSOR :user-input :TEXT "hi")))
               (framed (frame-message msg)))
          (format stream "~a" framed)
          (finish-output stream)
          (let* ((len-buf (make-string 6))
                 (count (read-sequence len-buf stream)))
            (when (= count 6)
              (let* ((len (parse-integer len-buf :radix 16))
                     (msg-buf (make-string len)))
                (read-sequence msg-buf stream)
                (format t "Response: ~a~%" msg-buf)))))
        (usocket:socket-close socket))
    (error (c) (format t "Error: ~a~%" c))))

Test Suite

Verifies that frame-message produces the expected length prefix. The suite currently covers serialization only; read-framed-message has no test here.

(eval-when (:compile-toplevel :load-toplevel :execute)
  (ql:quickload :fiveam :silent t))

(defpackage :passepartout-communication-tests
  (:use :cl :fiveam :passepartout)
  (:export #:communication-protocol-suite))
(in-package :passepartout-communication-tests)

(def-suite communication-protocol-suite :description "Communication Protocol Suite")
(in-suite communication-protocol-suite)

(test test-framing
  (let* ((msg '(:type :EVENT :payload (:action :handshake)))
         (framed (frame-message msg)))
    (is (string= "00002C" (string-upcase (subseq framed 0 6))))))