Files
passepartout/harness/loop.org

351 lines
14 KiB
Org Mode

#+PROPERTY: header-args:lisp :tangle (expand-file-name "harness/loop.lisp" (expand-file-name "harness/"))
#+TITLE: The Metabolic Loop (loop.lisp)
#+AUTHOR: Amr
#+FILETAGS: :harness:loop:
#+STARTUP: content
* The Metabolic Loop (loop.lisp)
** Architectural Intent
The Metabolic Loop is the /cranial nerve reflex/ of OpenCortex. While skills provide specialized intelligence, the loop provides the fundamental rhythm of existence: the continuous processing of signals from perception through cognition to action.
Unlike a simple event loop, the Metabolic Loop implements a sophisticated error recovery model. When the system encounters an error, it distinguishes between:
1. *Transient errors* (tool failures, network timeouts) - recoverable, no state rollback
2. *Critical errors* (undefined functions, malformed data structures) - require memory rollback
3. *Recursive loops* (signals generating more signals indefinitely) - depth limit enforcement
This design ensures the agent remains stable under adverse conditions while preserving the ability to recover from genuine system failures.
** Why Separate Perceive-Reason-Act?
The three-stage pipeline mirrors the classical sense-think-act paradigm but with a crucial difference: each stage is a pure function that transforms a signal. This allows:
- *Perceive* to normalize raw input into a standardized signal format
- *Reason* to transform the perceived signal into an approved action (or reject it)
- *Act* to execute the approved action and potentially generate a feedback signal
The feedback loop (Act returning a signal that feeds back into Perceive) enables complex multi-step operations where each action can trigger subsequent reasoning.
** The Metabolic Pipeline
Every signal in openCortex moves through the same three-stage pipeline:
1. *Perceive:* Normalize raw input into a standardized Signal
2. *Reason:* Generate a proposal via LLM, verify via skills
3. *Act:* Execute the approved action, generate feedback
#+begin_src mermaid
sequenceDiagram
participant User
participant Gateway
participant Perceive
participant Reason
participant Act
participant User
User->>Gateway: "Write a note about X"
Gateway->>Perceive: Raw message
Perceive->>Perceive: Normalize to Signal
Perceive->>Reason: Signal
Reason->>Reason: LLM generates proposal
Reason->>Reason: Skills verify proposal
Reason->>Act: Approved action
Act->>Act: Execute action
Act->>Reason: Feedback signal
Reason->>Perceive: New signal
Perceive->>Gateway: Response
Gateway->>User: "Done"
#+end_src
** Thread Safety
The loop operates in a multi-threaded environment:
- The main thread runs the heartbeat and idle loop
- Async sensors spawn threads for non-blocking I/O
- Interrupt handling requires mutex protection to prevent race conditions
* Package and Thread-Safe Variables
#+begin_src lisp
(in-package :opencortex)
(defvar *interrupt-flag* nil
"Atomic flag set by signal handlers to trigger graceful shutdown.
Using a dedicated variable avoids race conditions in interrupt handling.")
(defvar *interrupt-lock* (bt:make-lock "harness-interrupt-lock")
"Mutex protecting *interrupt-flag* access.
Locking is required because SBCL's interrupt handlers run in uncertain contexts.")
(defvar *heartbeat-thread* nil
"Handle to the heartbeat thread, allowing explicit termination on shutdown.")
#+end_src
* The Metabolic Pipeline
** process-signal: The Core Engine
This function implements the Perceive-Reason-Act pipeline. It processes a signal through all three stages and handles the feedback loop where Actions can generate new signals.
The depth counter prevents infinite recursion—a signal that generates another signal that generates another, etc. By limiting to depth 10, we ensure the system eventually converges or gracefully terminates.
#+begin_src lisp
(defun process-signal (signal)
"The entry point to the Metabolic Pipeline: Perceive -> Reason -> Act.
SIGNAL is a property list with the following structure:
- :type - :EVENT, :REQUEST, :RESPONSE, etc.
- :payload - The actual content (sensor data, approved actions, etc.)
- :meta - Metadata including source, session, reply stream
- :depth - Recursion depth counter (starts at 0)
- :status - Processing status (:perceived, :reasoned, :acted)
Returns NIL when processing is complete, or a new signal for feedback loop."
(let ((current-signal signal))
(loop while current-signal do
;; Depth limiting prevents infinite recursion from feedback loops
(let ((depth (getf current-signal :depth 0))
(meta (getf current-signal :meta)))
(when (> depth 10)
(harness-log "METABOLISM ERROR: Max recursion depth reached.")
(return nil))
;; Check for graceful shutdown interrupt
(when (bt:with-lock-held (*interrupt-lock*) *interrupt-flag*)
(harness-log "METABOLISM: Interrupted by shutdown signal.")
(bt:with-lock-held (*interrupt-lock*) (setf *interrupt-flag* nil))
(return nil))
;; The three-stage pipeline wrapped in error handling
(handler-case
(progn
;; Stage 1: Perceive - normalize sensory input
(setf current-signal (perceive-gate current-signal))
;; Stage 2: Reason - generate and verify action proposals
(setf current-signal (reason-gate current-signal))
;; Stage 3: Act - execute approved actions
(let ((feedback (act-gate current-signal)))
(if feedback
;; Action generated a feedback signal - continue processing
(progn
;; Preserve metadata from original signal
(unless (getf feedback :meta)
(setf (getf feedback :meta) meta))
(setf current-signal feedback))
;; No feedback - pipeline complete
(setf current-signal nil))))
;; Error recovery with differentiated response
(error (c)
(let ((sensor (ignore-errors (getf (getf current-signal :payload) :sensor))))
(harness-log "METABOLISM CRASH [~a]: ~a" (or sensor :unknown) c)
;; Only rollback memory on critical errors, not transient tool failures
;; This prevents losing recent context due to a single bad API call
(unless (member sensor '(:loop-error :tool-error :syntax-error))
(harness-log "CRITICAL ERROR: Initiating Micro-Rollback.")
(rollback-memory 0))
;; At deep recursion or known error types, terminate gracefully
(if (or (> depth 2) (member sensor '(:loop-error :tool-error)))
(setf current-signal nil)
;; Otherwise, convert error to a loop-error signal for retry
(setf current-signal
(list :type :EVENT
:depth (1+ depth)
:meta meta
:payload (list :sensor :loop-error
:message (format nil "~a" c)
:depth depth)))))))))))
#+end_src
** The Feedback Loop Explained
The pipeline implements a feedback loop where Act can return a new signal:
1. User input arrives → Perceive normalizes it
2. Reason generates an action → Act executes it
3. If the action was a tool call that returned new information → Act returns a feedback signal
4. Feedback signal feeds back into step 1 for further reasoning
This enables multi-step workflows where each action can trigger additional analysis.
* Heartbeat Mechanism
The heartbeat thread ensures the agent remains alive even without external input. It drives two critical functions:
1. **Latent reflection** - the agent can think without external prompting
2. **Periodic maintenance** - memory auto-save, orphan detection, etc.
** Heartbeat Configuration Variables
#+begin_src lisp
(defvar *auto-save-interval* 300
"Interval in seconds between automatic memory saves.
Defaults to 300 seconds (5 minutes). Set via MEMORY_AUTO_SAVE_INTERVAL env var.")
(defvar *heartbeat-save-counter* 0
"Tracks heartbeats since last save, used to calculate auto-save timing.")
#+end_src
** start-heartbeat: The Pulsing Heart
#+begin_src lisp
(defun start-heartbeat ()
"Starts the background heartbeat thread.
The heartbeat runs in a dedicated thread to avoid blocking the main
signal processing loop. Each heartbeat:
1. Injects a :HEARTBEAT signal into the metabolic pipeline
2. Checks if memory should be auto-saved (based on interval ratio)
Configuration via environment:
- HEARTBEAT_INTERVAL: Seconds between heartbeats (default: 60)
- MEMORY_AUTO_SAVE_INTERVAL: Seconds between auto-saves (default: 300)"
(let ((interval (or (ignore-errors (parse-integer (getenv "HEARTBEAT_INTERVAL"))) 60))
(auto-save (or (ignore-errors (parse-integer (getenv "MEMORY_AUTO_SAVE_INTERVAL"))) *auto-save-interval*)))
(setf *auto-save-interval* auto-save)
(setf *heartbeat-save-counter* 0)
(setf *heartbeat-thread*
(bt:make-thread
(lambda ()
(loop
;; Wait for interval
(sleep interval)
;; Update counter and check if it's time to save
(incf *heartbeat-save-counter*)
(when (>= *heartbeat-save-counter* (/ *auto-save-interval* interval))
(setf *heartbeat-save-counter* 0)
(save-memory-to-disk))
;; Inject heartbeat signal - this runs through the full pipeline
;; allowing the agent to do latent reflection even with no input
(inject-stimulus
(list :type :EVENT
:payload (list :sensor :heartbeat
:unix-time (get-universal-time)))))
:name "opencortex-heartbeat")))))
#+end_src
* Main Entry Point
** Shutdown Configuration
#+begin_src lisp
(defvar *shutdown-save-enabled* t
"When T, save memory to disk on graceful shutdown.
Disable for testing or when memory persistence is handled externally.")
#+end_src
** main: System Bootstrap and Idle Loop
The main function orchestrates system startup:
1. Load environment variables from ~/.local/share/opencortex/.env
2. Restore memory from previous snapshot (crash recovery)
3. Initialize actuators and load all skills
4. Start the heartbeat thread
5. Register SIGINT handler for graceful Ctrl+C shutdown
6. Enter idle loop (sleeping in 1-hour increments)
#+begin_src lisp
(defun main ()
"Entry point for OpenCortex. Initializes the system and enters idle loop.
Startup sequence:
1. Load environment from ~/.local/share/opencortex/.env
2. Restore memory from disk (if snapshot exists)
3. Initialize actuators (shell, cli, system)
4. Load all skills from SKILLS_DIR
5. Start heartbeat thread
6. Register SIGINT handler for graceful shutdown
7. Enter idle loop (sleeps in DAEMON_SLEEP_INTERVAL chunks)
The idle loop checks for interrupts and saves memory before exit."
;; Step 1: Load environment variables from standard location
(let* ((home (getenv "HOME"))
(env-file (uiop:merge-pathnames*
".local/share/opencortex/.env"
(uiop:ensure-directory-pathname home))))
(when (uiop:file-exists-p env-file)
(cl-dotenv:load-env env-file)))
;; Step 2: Crash recovery - load memory from previous snapshot
(load-memory-from-disk)
;; Step 3-4: Initialize actuators and load skills
(initialize-actuators)
(initialize-all-skills)
;; Step 5: Start the heartbeat
(start-heartbeat)
;; Step 6: Register graceful shutdown handler
;; SBCL-specific: catches Ctrl+C (SIGINT) and saves before exit
#+sbcl
(sb-sys:enable-interrupt sb-unix:sigint
(lambda (sig code scp)
(declare (ignore sig code scp))
(harness-log "SHUTDOWN: SIGINT received. Saving memory...")
(when *shutdown-save-enabled*
(save-memory-to-disk))
(uiop:quit 0)))
;; Step 7: Idle loop - sleep in chunks, checking for interrupts
(let ((sleep-interval (or (ignore-errors
(parse-integer (getenv "DAEMON_SLEEP_INTERVAL")))
3600)))
(loop
;; Check for interrupt before each sleep cycle
(when (bt:with-lock-held (*interrupt-lock*) *interrupt-flag*)
(harness-log "SHUTDOWN: Interrupt flag set. Saving memory...")
(when *shutdown-save-enabled*
(save-memory-to-disk))
(return))
;; Sleep in configured intervals (default: 1 hour)
(sleep sleep-interval))))
#+end_src
* Test Suite
These tests verify the metabolic loop and error recovery. Run with:
~(fiveam:run! 'immune-suite)~
#+begin_src lisp :tangle (expand-file-name "harness/immune-system-tests.lisp" (concat (concat (or (getenv "INSTALL_DIR") ".") "/harness") "/tests"))
(defpackage :opencortex-immune-system-tests
(:use :cl :fiveam :opencortex)
(:export #:immune-suite))
(in-package :opencortex-immune-system-tests)
(def-suite immune-suite
:description "Verification of the Immune System (Core Error Hooks)")
(in-suite immune-suite)
(test loop-error-injection
"Verify that a crash in think/decide triggers a :loop-error stimulus."
(clrhash opencortex::*skills-registry*)
(opencortex:defskill :evil-skill
:priority 100
:trigger (lambda (ctx) (eq (getf (getf ctx :payload) :sensor) :user-input))
:probabilistic (lambda (ctx) (error "CRITICAL BRAIN FAILURE"))
:deterministic nil)
(opencortex:harness-log "CLEAN LOG")
(opencortex:process-signal '(:type :EVENT :payload (:sensor :user-input)))
(let ((logs (opencortex:context-get-system-logs 20)))
(is (not (null (find-if (lambda (line) (search "CRITICAL BRAIN FAILURE" line)) logs))))))
#+end_src