Stage 1: Perceive (perceive.lisp)
- Stage 1: Perceive (perceive.lisp)
- Package Context
- Sensor Configuration
- Stimulus Injection
- The Perceive Gate
- Test Suite
Architectural Intent: Sensory Normalization
The Perceive stage is the "sensory cortex" of OpenCortex. Its job is to take raw stimuli from the outside world and transform them into standardized Signals that the rest of the pipeline can process.
Raw stimuli come from diverse sources:
- Terminal input (CLI)
- Emacs org-mode buffers (via swank)
- Telegram/Signal messages
- Heartbeats (internal clock)
- Shell command outputs
Each source has its own format and protocol. Perceive normalizes all of them into the Signal format:
(:type :EVENT :meta (...) :payload (...))
Why Normalize?
Without normalization, each downstream component (Reason, Act) would need to understand each input format. With normalization:
- The gateway layer (CLI, Emacs, Telegram) just sends raw messages
- Perceive transforms them into Signals
- Reason and Act work with a single, consistent format
- Adding new input sources only requires gateway code, not changes to the core
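As a rough illustration of that last point, the sketch below shows how two gateways might wrap their raw input in the same message shape. The helper names `cli-line->message`, `chat-text->message`, and `message-sensor`, the `:text` payload key, and the `:source` and `:session-id` values are assumptions made for this example and are not part of OpenCortex; only the `:type`, `:meta`, `:payload`, and `:sensor` keys come from this document.

```lisp
;; Hypothetical gateway helpers: each wraps source-specific input in the
;; same plist shape, so downstream stages never see the raw format.
(defun cli-line->message (line)
  "Wrap a line of terminal input as a raw :EVENT message (illustrative)."
  (list :type :EVENT
        :meta (list :source :cli :session-id "cli-0")      ; assumed values
        :payload (list :sensor :user-input :text line)))   ; :text is assumed

(defun chat-text->message (text chat-id)
  "Wrap a chat message as a raw :EVENT message (illustrative)."
  (list :type :EVENT
        :meta (list :source :telegram :session-id chat-id) ; assumed values
        :payload (list :sensor :chat-message :text text)))

;; Downstream code reads both messages the same way.
(defun message-sensor (message)
  "Return the :sensor keyword stored in MESSAGE's payload."
  (getf (getf message :payload) :sensor))

(message-sensor (cli-line->message "status"))        ; => :USER-INPUT
(message-sensor (chat-text->message "hello" "42"))   ; => :CHAT-MESSAGE
```

A new input source would only need one more helper of this kind; nothing that consumes the message has to change.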
The Signal Format
Signals are property lists with a consistent structure:
| Key | Description |
|---|---|
| :type | :EVENT, :REQUEST, :RESPONSE, :LOG |
| :payload | The actual content (sensor data, actions, etc.) |
| :meta | Metadata: source, session, reply stream |
| :status | Processing status: :perceived, :reasoned, :acted |
| :depth | Recursion depth for feedback loops |
| :approved-action | Set by Reason, executed by Act |
| :foveal-focus | ID of the node the user is interacting with |
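Because a Signal is an ordinary property list, it can be built with `list` and inspected with `getf`. The following is a minimal sketch of a shape check based on the table above; `*known-signal-types*`, `well-formed-signal-p`, and `*example-signal*` are hypothetical names introduced here, and OpenCortex may validate signals differently or not at all.

```lisp
(defparameter *known-signal-types* '(:EVENT :REQUEST :RESPONSE :LOG)
  "The four signal types listed in the table above.")

(defun well-formed-signal-p (signal)
  "Illustrative check: SIGNAL is a plist whose :type is a known type."
  (and (listp signal)
       (evenp (length signal))          ; a plist has key/value pairs
       (member (getf signal :type) *known-signal-types*)
       t))

(defparameter *example-signal*
  (list :type :EVENT
        :meta (list :source :system :session-id "internal")
        :payload (list :sensor :heartbeat)
        :depth 0)
  "An example signal made up for this sketch.")

(well-formed-signal-p *example-signal*)          ; => T
(well-formed-signal-p (list :type :bogus))       ; => NIL
(getf (getf *example-signal* :payload) :sensor)  ; => :HEARTBEAT
(getf *example-signal* :status)                  ; => NIL, not yet perceived
```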
Async vs Sync Processing
Some sensors (user input, chat messages) are processed asynchronously in dedicated threads. This prevents:
- A slow API call from blocking the entire system
- Simultaneous inputs from queuing behind one another
Other sensors (heartbeats, interrupts) are processed synchronously to maintain ordering guarantees.
Package Context
(in-package :opencortex)
Sensor Configuration
Async Sensor Registry
(defvar *async-sensors* '(:chat-message :delegation :user-command)
  "Sensors that are processed in dedicated threads.
  These sensors can block (waiting for API responses, user input, etc.)
  so they run in separate threads to avoid blocking the main pipeline.
  Other sensors (:heartbeat, :interrupt, :buffer-update) are processed
  synchronously to maintain temporal ordering.")
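The async decision reduces to a membership test plus a per-message override. The sketch below isolates that test so it can be tried at the REPL; `*sketch-async-sensors*` is a local copy of the registry and `sketch-async-p` is a hypothetical helper, neither of which exists in OpenCortex under these names.

```lisp
(defparameter *sketch-async-sensors* '(:chat-message :delegation :user-command)
  "Local copy of the registry above, so the sketch runs on its own.")

(defun sketch-async-p (payload)
  "True when PAYLOAD should be handled in its own thread.
  An explicit :async-p flag in the payload overrides the registry."
  (and (or (getf payload :async-p)
           (member (getf payload :sensor) *sketch-async-sensors*))
       t))

(sketch-async-p '(:sensor :chat-message))        ; => T
(sketch-async-p '(:sensor :heartbeat))           ; => NIL
(sketch-async-p '(:sensor :heartbeat :async-p t)) ; => T, forced by the flag
```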
Foveal Focus State
(defvar *foveal-focus-id* nil
  "The Org ID of the node the user is currently interacting with.
  This enables the reasoning engine to provide contextually relevant
  responses. When editing a specific note, the agent knows which
  note you're referring to without needing explicit ID references.
  Updated on :point-update events from Emacs.")
Stimulus Injection
inject-stimulus: Entry Point
(defun inject-stimulus (raw-message &key stream (depth 0))
  "Inject a raw message into the signal processing pipeline.
  RAW-MESSAGE is a property list that will be normalized into a Signal.
  STREAM is an optional output stream for responses (used by TUI/CLI).
  DEPTH tracks recursion depth for feedback loops.
  This function determines whether to process synchronously or
  asynchronously based on the sensor type, then calls process-signal
  to run through the Perceive -> Reason -> Act pipeline.
  Error handling: Uses restarts to prevent individual signals from
  crashing the entire system. Failed signals are logged and dropped."
  (let* ((payload (getf raw-message :payload))
         (sensor (getf payload :sensor))
         (meta (getf raw-message :meta))
         (async-p (or (getf payload :async-p)
                      (member sensor *async-sensors*))))
    ;; Ensure metadata exists
    (unless meta
      (setf meta (list :SOURCE :SYSTEM :SESSION-ID "internal")))
    ;; Attach reply stream if provided
    (when stream
      (setf (getf meta :reply-stream) stream))
    (setf (getf raw-message :meta) meta)
    (if async-p
        ;; Async: process in dedicated thread
        (bt:make-thread
         (lambda ()
           (restart-case
               (handler-bind ((error (lambda (c)
                                       (harness-log "ASYNC ERROR: ~a" c)
                                       (invoke-restart 'skip-event))))
                 (process-signal raw-message))
             (skip-event () nil)))
         :name "opencortex-async-task")
        ;; Sync: process in main thread with recovery
        (restart-case
            (handler-bind ((error (lambda (c)
                                    (harness-log "SYSTEM ERROR: ~a" c)
                                    (invoke-restart 'skip-event))))
              (process-signal raw-message))
          (skip-event ()
            (harness-log "SYSTEM RECOVERY: Stimulus dropped."))))))
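The recovery pattern used in both branches can be studied in isolation. The sketch below uses only standard Common Lisp: `risky-step` is a made-up stand-in for `process-signal`, `guarded-step` is a hypothetical wrapper, and the `:dropped` return value is an assumption chosen to make the outcome visible (the real function returns whatever its restart clause returns).

```lisp
(defun risky-step (x)
  "Stand-in for process-signal: signals an error on negative input."
  (if (minusp x)
      (error "negative input: ~a" x)
      (* 2 x)))

(defun guarded-step (x)
  "Run RISKY-STEP; on any error, log it and return :DROPPED."
  (restart-case
      ;; The handler runs while the SKIP-EVENT restart is still active,
      ;; so it can transfer control to the restart clause below.
      (handler-bind ((error (lambda (c)
                              (format *error-output* "ERROR: ~a~%" c)
                              (invoke-restart 'skip-event))))
        (risky-step x))
    (skip-event () :dropped)))

(guarded-step 21)   ; => 42
(guarded-step -1)   ; logs the error, => :DROPPED
```

Using `handler-bind` with a named restart, rather than `handler-case`, keeps the `skip-event` restart available to outer handlers or an interactive debugger as well.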
The Perceive Gate
perceive-gate: Signal Normalization
(defun perceive-gate (signal)
  "Stage 1 of the metabolic pipeline: Normalize sensory input.
  This function:
  1. Logs the incoming signal for debugging
  2. Handles special sensor types (:buffer-update, :point-update, etc.)
  3. Updates the Memory graph with incoming data
  4. Tracks foveal focus (user's current node)
  5. Sets :status to :perceived
  Returns the updated signal for the next stage. Callers should use the
  return value: (setf getf) may add new keys by consing a new list head,
  so the list the caller passed in is not guaranteed to be updated.
  Memory snapshots are taken before AST updates to enable rollback
  if the update causes issues."
  (let* ((payload (getf signal :payload))
         (type (getf signal :type))
         (meta (getf signal :meta))
         (sensor (getf payload :sensor)))
    ;; Log the incoming signal for debugging
    (harness-log "GATE [Perceive]: ~a (~a) [Source: ~s]"
                 type (or sensor "no-sensor") (getf meta :source))
    ;; Handle EVENT type sensors
    (cond ((eq type :EVENT)
           (case sensor
             ;; Org buffer was modified - update memory
             (:buffer-update
              (let ((ast (getf payload :ast)))
                (when ast
                  (snapshot-memory) ; Enable rollback if update causes issues
                  (ingest-ast ast))))
             ;; Point moved to different org node - update focus
             (:point-update
              (let ((element (getf payload :element)))
                (when element
                  (snapshot-memory)
                  ;; Track foveal focus for contextual reasoning
                  (setf *foveal-focus-id*
                        (ignore-errors (getf element :id)))
                  (ingest-ast element))))
             ;; System interrupt - trigger shutdown
             (:interrupt
              (bt:with-lock-held (*interrupt-lock*)
                (setf *interrupt-flag* t)))))
          ;; Log responses from actuators
          ((eq type :RESPONSE)
           (harness-log "GATE [Perceive]: Act Result -> ~a"
                        (getf payload :status))))
    ;; Update signal status
    (setf (getf signal :status) :perceived)
    (setf (getf signal :foveal-focus) *foveal-focus-id*)
    signal))
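One detail of the final lines is worth a small experiment: `(setf (getf signal :status) ...)` updates the function's local variable, and when the key is absent the standard allows the implementation to build a new list head instead of altering the caller's list. The sketch below uses a hypothetical helper, `mark-perceived`, to show why the return value is the thing to keep. Whether the caller's original list also gains the key is implementation-dependent, so the example makes no claim about it.

```lisp
(defun mark-perceived (signal)
  "Return SIGNAL with :status set to :perceived (illustrative helper)."
  (setf (getf signal :status) :perceived)
  signal)

(let* ((original (list :type :EVENT))
       (result (mark-perceived original)))
  ;; The returned plist always carries the new key.
  (getf result :status))          ; => :PERCEIVED

;; Chaining stages on return values stays correct either way.
(getf (mark-perceived (mark-perceived (list :type :EVENT))) :type)  ; => :EVENT
```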
Sensor Types Reference
| Sensor | Source | Processing | Description |
|---|---|---|---|
| :user-input | CLI/TUI | Async | Text input from terminal |
| :chat-message | Telegram/Signal | Async | Messages from messaging apps |
| :heartbeat | Internal | Sync | Periodic maintenance trigger |
| :buffer-update | Emacs | Sync | Org buffer was modified |
| :point-update | Emacs | Sync | Cursor moved to different headline |
| :interrupt | System | Sync | SIGINT received |
| :tool-output | Internal | Sync | Result from cognitive tool |
| :loop-error | Internal | Sync | Error during signal processing |
Test Suite
These tests verify the Perceive pipeline. Run with:
(fiveam:run! 'opencortex-pipeline-perceive-tests:pipeline-perceive-suite)
(defpackage :opencortex-pipeline-perceive-tests
  (:use :cl :fiveam :opencortex)
  (:export #:pipeline-perceive-suite))

(in-package :opencortex-pipeline-perceive-tests)

(def-suite pipeline-perceive-suite
  :description "Test suite for Perceive pipeline")

(in-suite pipeline-perceive-suite)

(test test-perceive-gate
  "Perceive gate should update the object store and normalize signal."
  (clrhash opencortex::*memory*)
  (let* ((signal (list :type :EVENT
                       :payload (list :sensor :buffer-update
                                      :ast (list :type :HEADLINE
                                                 :properties (list :ID "test-node" :TITLE "Test")
                                                 :contents nil))))
         (result (perceive-gate signal)))
    (is (eq :perceived (getf result :status)))
    (is (not (null (gethash "test-node" opencortex::*memory*))))))

(test test-depth-limiting
  "Verify that the pipeline terminates runaway feedback loops."
  (let ((runaway-signal (list :type :EVENT :depth 11 :payload (list :sensor :heartbeat))))
    (is (null (process-signal runaway-signal)))))
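The second test implies that process-signal refuses signals whose :depth exceeds some limit. That limit is not shown in this file, so the sketch below assumes a threshold of 10, which is merely consistent with a depth of 11 being rejected. `*sketch-max-depth*` and `sketch-process` are hypothetical names and do not reflect the real implementation.

```lisp
(defparameter *sketch-max-depth* 10
  "Assumed limit; the real threshold lives in process-signal, not shown here.")

(defun sketch-process (signal)
  "Return NIL for signals deeper than the limit, else the signal itself.
  A missing :depth key is treated as depth 0."
  (if (> (getf signal :depth 0) *sketch-max-depth*)
      nil
      signal))

(sketch-process (list :type :EVENT :depth 11))   ; => NIL, loop cut off
(sketch-process (list :type :EVENT :depth 3))    ; => (:TYPE :EVENT :DEPTH 3)
```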