Stage 1: Perceive (perceive.lisp)

Architectural Intent: Sensory Normalization

The Perceive stage is the "sensory cortex" of OpenCortex. Its job is to take raw stimuli from the outside world and transform them into standardized Signals that the rest of the pipeline can process.

Raw stimuli come from diverse sources:

  • Terminal input (CLI)
  • Emacs org-mode buffers (via swank)
  • Telegram/Signal messages
  • Heartbeats (internal clock)
  • Shell command outputs

Each source has its own format and protocol. Perceive normalizes all of them into the Signal format:

(:TYPE :EVENT :META (...) :PAYLOAD (...))

Why Normalize?

Without normalization, each downstream component (Reason, Act) would need to understand each input format. With normalization:

  1. The gateway layer (CLI, Emacs, Telegram) just sends raw messages
  2. Perceive transforms them into Signals
  3. Reason and Act work with a single, consistent format
  4. Adding new input sources only requires gateway code, not changes to the core
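
To make point 4 concrete, here is a minimal sketch of two gateways that emit the same message shape, so that downstream code needs only one accessor. The function names, the :text and :sender keys, and the :cli/:telegram source tags are illustrative assumptions, not names taken from the OpenCortex source.

```lisp
;; Hypothetical gateways. Each wraps its raw input in the same plist shape.
;; The :text and :sender keys are assumptions made for this sketch.
(defun cli->raw-message (line)
  (list :type :EVENT
        :meta (list :source :cli)
        :payload (list :sensor :user-input :text line)))

(defun chat->raw-message (sender body)
  (list :type :EVENT
        :meta (list :source :telegram :sender sender)
        :payload (list :sensor :chat-message :text body)))

;; Downstream code reads one shape, whatever the source was.
(defun signal-text (signal)
  (getf (getf signal :payload) :text))

(mapcar #'signal-text
        (list (cli->raw-message "ls")
              (chat->raw-message "alice" "hi")))
;; => ("ls" "hi")
```

A third source would only need another function of the same kind; signal-text would not change.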

The Signal Format

Signals are property lists with a consistent structure:

| Key              | Description                                    |
|------------------+------------------------------------------------|
| :type            | :EVENT, :REQUEST, :RESPONSE, :LOG              |
| :payload         | The actual content (sensor data, actions, etc.) |
| :meta            | Metadata: source, session, reply stream        |
| :status          | Processing status: :perceived, :reasoned, :acted |
| :depth           | Recursion depth for feedback loops             |
| :approved-action | Set by Reason, executed by Act                 |
| :foveal-focus    | ID of the node the user is interacting with    |
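
As an illustration of the keys above, the following sketch builds one Signal as a property list and reads it back with GETF. The variable name, the "demo" session id, and the :unprocessed default are assumptions chosen for the example.

```lisp
;; A hand-built Signal. The values here are illustrative only.
(defparameter *example-signal*
  (list :type :EVENT
        :meta (list :source :emacs :session-id "demo")
        :payload (list :sensor :point-update)
        :depth 0))

;; Top-level keys are read directly; nested keys need two GETF calls.
(getf *example-signal* :type)                 ; => :EVENT
(getf (getf *example-signal* :meta) :source)  ; => :EMACS

;; GETF takes an optional default for keys that are not set yet.
;; :unprocessed is a hypothetical placeholder, not a real status.
(getf *example-signal* :status :unprocessed)  ; => :UNPROCESSED
```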

Async vs Sync Processing

Some sensors (user input, chat messages) are processed asynchronously in dedicated threads. This prevents:

  • A slow API call from blocking the entire system
  • Simultaneous inputs from waiting on one another

Other sensors (heartbeats, interrupts) are processed synchronously to maintain ordering guarantees.

Package Context

(in-package :opencortex)

Sensor Configuration

Async Sensor Registry

(defvar *async-sensors* '(:chat-message :delegation :user-command)
  "Sensors that are processed in dedicated threads.

  These sensors can block (waiting for API responses, user input, etc.)
  so they run in separate threads to avoid blocking the main pipeline.

  Other sensors (:heartbeat, :interrupt, :buffer-update) are processed
  synchronously to maintain temporal ordering.")
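
A short sketch of how the registry might be extended and queried. The DEFVAR is repeated only so the snippet runs on its own; :voice-input and async-sensor-p are hypothetical names, not part of the source.

```lisp
;; Local stand-in for the registry so this sketch is self-contained.
(defvar *async-sensors* '(:chat-message :delegation :user-command))

;; Register a new blocking sensor. :voice-input is a hypothetical name.
;; PUSHNEW conses onto the front and skips keys already present.
(pushnew :voice-input *async-sensors*)

(defun async-sensor-p (sensor)
  "Return T when SENSOR is listed in *ASYNC-SENSORS*, otherwise NIL."
  (and (member sensor *async-sensors*) t))

(async-sensor-p :voice-input) ; => T
(async-sensor-p :heartbeat)   ; => NIL
```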

Foveal Focus State

(defvar *foveal-focus-id* nil
  "The Org ID of the node the user is currently interacting with.

  This enables the reasoning engine to provide contextually relevant
  responses. When editing a specific note, the agent knows which
  note you're referring to without needing explicit ID references.

  Updated on :point-update events from Emacs.")

Stimulus Injection

inject-stimulus: Entry Point

(defun inject-stimulus (raw-message &key stream (depth 0))
  "Inject a raw message into the signal processing pipeline.

  RAW-MESSAGE is a property list that will be normalized into a Signal.
  STREAM is an optional output stream for responses (used by TUI/CLI).
  DEPTH tracks recursion depth for feedback loops.

  This function determines whether to process synchronously or
  asynchronously based on the sensor type, then calls process-signal
  to run through the Perceive -> Reason -> Act pipeline.

  Error handling: Uses restarts to prevent individual signals from
  crashing the entire system. Failed signals are logged and dropped."

  (let* ((payload (getf raw-message :payload))
         (sensor (getf payload :sensor))
         (meta (getf raw-message :meta))
         (async-p (or (getf payload :async-p)
                     (member sensor *async-sensors*))))

    ;; Ensure metadata exists
    (unless meta
      (setf meta (list :SOURCE :SYSTEM :SESSION-ID "internal")))

    ;; Attach reply stream if provided
    (when stream
      (setf (getf meta :reply-stream) stream))

    (setf (getf raw-message :meta) meta)

    (if async-p
        ;; Async: process in dedicated thread
        (bt:make-thread
         (lambda ()
           (restart-case
               (handler-bind ((error (lambda (c)
                                       (harness-log "ASYNC ERROR: ~a" c)
                                       (invoke-restart 'skip-event))))
                 (process-signal raw-message))
             (skip-event () nil)))
         :name "opencortex-async-task")

        ;; Sync: process in main thread with recovery
        (restart-case
            (handler-bind ((error (lambda (c)
                                    (harness-log "SYSTEM ERROR: ~a" c)
                                    (invoke-restart 'skip-event))))
              (process-signal raw-message))
          (skip-event ()
            (harness-log "SYSTEM RECOVERY: Stimulus dropped."))))))
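
The recovery pattern used in both branches can be tried in isolation. The sketch below is an assumption-laden reduction: call-with-skip-event is a hypothetical helper, FORMAT to *ERROR-OUTPUT* stands in for harness-log, and the :dropped return value is chosen only to make the outcome visible.

```lisp
(defun call-with-skip-event (thunk)
  "Call THUNK. On any ERROR, log it and return :DROPPED via SKIP-EVENT."
  (restart-case
      ;; HANDLER-BIND runs the handler without unwinding, so the
      ;; SKIP-EVENT restart established outside is still available.
      (handler-bind ((error (lambda (c)
                              (format *error-output* "ERROR: ~a~%" c)
                              (invoke-restart 'skip-event))))
        (funcall thunk))
    (skip-event ()
      :dropped)))

(call-with-skip-event (lambda () (+ 1 2)))         ; => 3
(call-with-skip-event (lambda () (error "boom")))  ; => :DROPPED
```

Because the handler only invokes a named restart, an interactive session could also choose skip-event by hand from the debugger if the handler were removed.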

The Perceive Gate

perceive-gate: Signal Normalization

(defun perceive-gate (signal)
  "Stage 1 of the metabolic pipeline: Normalize sensory input.

  This function:
  1. Logs the incoming signal for debugging
  2. Handles special sensor types (:buffer-update, :point-update, etc.)
  3. Updates the Memory graph with incoming data
  4. Tracks foveal focus (user's current node)
  5. Sets :status to :perceived

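  Sets these keys on the signal and returns it for the next stage.
  Callers should use the return value: when a key is absent, SETF of
  GETF may cons new cells onto the front of the plist rather than
  mutate the caller's list.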

  Memory snapshots are taken before AST updates to enable rollback
  if the update causes issues."

  (let* ((payload (getf signal :payload))
         (type (getf signal :type))
         (meta (getf signal :meta))
         (sensor (getf payload :sensor)))

    ;; Log the incoming signal for debugging
    (harness-log "GATE [Perceive]: ~a (~a) [Source: ~s]"
                 type (or sensor "no-sensor") (getf meta :source))

    ;; Handle EVENT type sensors
    (cond ((eq type :EVENT)
           (case sensor

             ;; Org buffer was modified - update memory
             (:buffer-update
              (let ((ast (getf payload :ast)))
                (when ast
                  (snapshot-memory)  ; Enable rollback if update causes issues
                  (ingest-ast ast))))

             ;; Point moved to different org node - update focus
             (:point-update
              (let ((element (getf payload :element)))
                (when element
                  (snapshot-memory)
                  ;; Track foveal focus for contextual reasoning
                  (setf *foveal-focus-id*
                        (ignore-errors (getf element :id)))
                  (ingest-ast element))))

             ;; System interrupt - trigger shutdown
             (:interrupt
              (bt:with-lock-held (*interrupt-lock*)
                (setf *interrupt-flag* t)))))

          ;; Log responses from actuators
          ((eq type :RESPONSE)
           (harness-log "GATE [Perceive]: Act Result -> ~a"
                       (getf payload :status))))

    ;; Update signal status
    (setf (getf signal :status) :perceived)
    (setf (getf signal :foveal-focus) *foveal-focus-id*)
    signal))
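
One detail of the final SETF forms is worth a small demonstration: the return value of perceive-gate is what carries the new keys. The sketch below uses a hypothetical mark-perceived function that performs only the status update. The NIL shown for the original list is the behavior of common implementations such as SBCL, where a missing key is added by consing onto the front; the standard also permits other strategies.

```lisp
(defun mark-perceived (signal)
  ;; SIGNAL is a local variable. When :status is absent, SETF of GETF
  ;; typically rebinds it to a longer list rather than mutating the
  ;; list the caller still holds.
  (setf (getf signal :status) :perceived)
  signal)

(let* ((original (list :type :EVENT))
       (result (mark-perceived original)))
  (list (getf original :status)   ; typically NIL
        (getf result :status)))   ; => :PERCEIVED
```

This is why the test suite inspects the value returned by perceive-gate rather than the list it passed in.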

Sensor Types Reference

| Sensor         | Source          | Processing | Description                       |
|----------------+-----------------+------------+-----------------------------------|
| :user-input    | CLI/TUI         | Async      | Text input from terminal          |
| :chat-message  | Telegram/Signal | Async      | Messages from messaging apps      |
| :heartbeat     | Internal        | Sync       | Periodic maintenance trigger      |
| :buffer-update | Emacs           | Sync       | Org buffer was modified           |
| :point-update  | Emacs           | Sync       | Cursor moved to different headline |
| :interrupt     | System          | Sync       | SIGINT received                   |
| :tool-output   | Internal        | Sync       | Result from cognitive tool        |
| :loop-error    | Internal        | Sync       | Error during signal processing    |

Test Suite

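These tests verify the Perceive pipeline. From within the test package, run them with (fiveam:run! 'pipeline-perceive-suite); from any other package, qualify the exported symbol: (fiveam:run! 'opencortex-pipeline-perceive-tests:pipeline-perceive-suite)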

(defpackage :opencortex-pipeline-perceive-tests
  (:use :cl :fiveam :opencortex)
  (:export #:pipeline-perceive-suite))

(in-package :opencortex-pipeline-perceive-tests)

(def-suite pipeline-perceive-suite
  :description "Test suite for Perceive pipeline")

(in-suite pipeline-perceive-suite)

(test test-perceive-gate
  "Perceive gate should update the object store and normalize signal."
  (clrhash opencortex::*memory*)
  (let* ((ast (list :type :HEADLINE
                    :properties (list :ID "test-node" :TITLE "Test")
                    :contents nil))
         (signal (list :type :EVENT
                       :payload (list :sensor :buffer-update :ast ast)))
         (result (perceive-gate signal)))
    (is (eq :perceived (getf result :status)))
    (is (not (null (gethash "test-node" opencortex::*memory*))))))

(test test-depth-limiting
  "Verify that the pipeline terminates runaway feedback loops."
  (let ((runaway-signal (list :type :EVENT :depth 11 :payload (list :sensor :heartbeat))))
    (is (null (process-signal runaway-signal)))))
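
The depth check itself lives in process-signal, which is not shown in this file. As a rough sketch of what test-depth-limiting exercises, a guard of the following shape would return NIL for a signal at depth 11. The limit of 10 and every name below are assumptions, not values read from the source.

```lisp
;; Hypothetical limit. The real threshold is not shown in this file.
(defparameter *max-signal-depth* 10)

(defun depth-exceeded-p (signal)
  "True when SIGNAL's :depth (default 0) is above the assumed limit."
  (> (getf signal :depth 0) *max-signal-depth*))

(defun process-signal-sketch (signal)
  "Return NIL for runaway signals, otherwise SIGNAL unchanged."
  (unless (depth-exceeded-p signal)
    signal))

(process-signal-sketch (list :type :EVENT :depth 11)) ; => NIL
```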