351 lines
14 KiB
Org Mode
351 lines
14 KiB
Org Mode
#+PROPERTY: header-args:lisp :tangle (expand-file-name "loop.lisp" (or (identity (getenv "INSTALL_DIR")) (file-name-directory (buffer-file-name))))
|
|
#+TITLE: The Metabolic Loop (loop.lisp)
|
|
#+AUTHOR: Amr
|
|
#+FILETAGS: :harness:loop:
|
|
#+STARTUP: content
|
|
|
|
* The Metabolic Loop (loop.lisp)
|
|
** Architectural Intent
|
|
|
|
The Metabolic Loop is the /cranial nerve reflex/ of OpenCortex. While skills provide specialized intelligence, the loop provides the fundamental rhythm of existence: the continuous processing of signals from perception through cognition to action.
|
|
|
|
Unlike a simple event loop, the Metabolic Loop implements a sophisticated error recovery model. When the system encounters an error, it distinguishes between:
|
|
|
|
1. *Transient errors* (tool failures, network timeouts) - recoverable, no state rollback
|
|
2. *Critical errors* (undefined functions, malformed data structures) - require memory rollback
|
|
3. *Recursive loops* (signals generating more signals indefinitely) - depth limit enforcement
|
|
|
|
This design ensures the agent remains stable under adverse conditions while preserving the ability to recover from genuine system failures.
|
|
|
|
** Why Separate Perceive-Reason-Act?
|
|
|
|
The three-stage pipeline mirrors the classical sense-think-act paradigm but with a crucial difference: each stage is a self-contained function that takes a signal and hands its result to the next stage. This allows:
|
|
|
|
- *Perceive* to normalize raw input into a standardized signal format
|
|
- *Reason* to transform the perceived signal into an approved action (or reject it)
|
|
- *Act* to execute the approved action and potentially generate a feedback signal
|
|
|
|
The feedback loop (Act returning a signal that feeds back into Perceive) enables complex multi-step operations where each action can trigger subsequent reasoning.
|
|
|
|
** The Metabolic Pipeline
|
|
|
|
Every signal in OpenCortex moves through the same three-stage pipeline:
|
|
|
|
1. *Perceive:* Normalize raw input into a standardized Signal
|
|
2. *Reason:* Generate a proposal via LLM, verify via skills
|
|
3. *Act:* Execute the approved action, generate feedback
|
|
|
|
#+begin_src mermaid
|
|
sequenceDiagram
|
|
participant User
|
|
participant Gateway
|
|
participant Perceive
|
|
participant Reason
|
|
participant Act
|
|
|
|
User->>Gateway: "Write a note about X"
|
|
Gateway->>Perceive: Raw message
|
|
Perceive->>Perceive: Normalize to Signal
|
|
Perceive->>Reason: Signal
|
|
Reason->>Reason: LLM generates proposal
|
|
Reason->>Reason: Skills verify proposal
|
|
Reason->>Act: Approved action
|
|
Act->>Act: Execute action
|
|
Act->>Reason: Feedback signal
|
|
Reason->>Perceive: New signal
|
|
Perceive->>Gateway: Response
|
|
Gateway->>User: "Done"
|
|
#+end_src
|
|
|
|
|
|
** Thread Safety
|
|
|
|
The loop operates in a multi-threaded environment:
|
|
- The main thread starts the heartbeat thread and then runs the idle loop
|
|
- Async sensors spawn threads for non-blocking I/O
|
|
- Interrupt handling requires mutex protection to prevent race conditions
|
|
|
|
* Package and Thread-Safe Variables
|
|
|
|
#+begin_src lisp
|
|
(in-package :opencortex)
|
|
|
|
(defvar *interrupt-flag* nil
  "Shutdown request flag.
When non-NIL, PROCESS-SIGNAL abandons the signal it is working on and the
idle loop in MAIN saves memory and returns. Read or write it only while
holding *INTERRUPT-LOCK*.")

(defvar *interrupt-lock* (bt:make-lock "harness-interrupt-lock")
  "Lock guarding every access to *INTERRUPT-FLAG*.
Holding it keeps a reader from racing with the code that sets or clears
the flag.")

(defvar *heartbeat-thread* nil
  "Thread object created by START-HEARTBEAT, or NIL before it has run.
Kept so the heartbeat can be stopped explicitly at shutdown.")
|
|
#+end_src
|
|
|
|
* The Metabolic Pipeline
|
|
|
|
** process-signal: The Core Engine
|
|
|
|
This function implements the Perceive-Reason-Act pipeline. It processes a signal through all three stages and handles the feedback loop where Actions can generate new signals.
|
|
|
|
The depth counter prevents infinite recursion—a signal that generates another signal that generates another, etc. By limiting to depth 10, we ensure the system eventually converges or gracefully terminates.
|
|
|
|
#+begin_src lisp
|
|
(defun process-signal (signal)
  "Run SIGNAL through the Metabolic Pipeline: Perceive -> Reason -> Act.

SIGNAL is a property list with the following keys:
- :type    - :EVENT, :REQUEST, :RESPONSE, etc.
- :payload - The actual content (sensor data, approved actions, etc.)
- :meta    - Metadata including source, session, reply stream
- :depth   - Recursion depth counter (treated as 0 when absent)
- :status  - Processing status (:perceived, :reasoned, :acted)

Whenever ACT-GATE returns a feedback signal, that signal is sent through the
pipeline again; it inherits :meta from the signal that produced it when it
carries none of its own. Processing stops when ACT-GATE returns NIL, when a
signal's :depth exceeds 10, when a shutdown interrupt is pending, or when
error recovery gives up.

Any ERROR signalled by a stage is logged. Unless the failing signal's sensor
is :loop-error, :tool-error or :syntax-error, memory is rolled back with
(ROLLBACK-MEMORY 0). The error is then re-injected as a :loop-error event at
depth + 1, except when depth is already above 2 or the failing signal was
itself a :loop-error or :tool-error, in which case processing ends.

Always returns NIL."
  (let ((current-signal signal))
    (loop while current-signal do
      (let ((depth (getf current-signal :depth 0))
            (meta (getf current-signal :meta)))
        ;; Depth limiting prevents infinite recursion from feedback loops.
        ;; NOTE(review): only the error path below increments :depth; the
        ;; depth of a feedback signal is whatever ACT-GATE put in it --
        ;; confirm that ACT-GATE increments it.
        (when (> depth 10)
          (harness-log "METABOLISM ERROR: Max recursion depth reached.")
          (return nil))

        ;; Check for a pending shutdown interrupt. The flag is tested and
        ;; cleared under a single lock acquisition so that a request set
        ;; between the test and the clear cannot be lost.
        (when (bt:with-lock-held (*interrupt-lock*)
                (when *interrupt-flag*
                  (setf *interrupt-flag* nil)
                  t))
          (harness-log "METABOLISM: Interrupted by shutdown signal.")
          (return nil))

        ;; The three-stage pipeline wrapped in error handling.
        (handler-case
            (progn
              ;; Stage 1: Perceive - normalize sensory input
              (setf current-signal (perceive-gate current-signal))
              ;; Stage 2: Reason - generate and verify action proposals
              (setf current-signal (reason-gate current-signal))
              ;; Stage 3: Act - execute approved actions. A non-NIL result is
              ;; a feedback signal that becomes the next iteration's input;
              ;; NIL ends the loop.
              (let ((feedback (act-gate current-signal)))
                (when (and feedback (null (getf feedback :meta)))
                  ;; Preserve metadata from the signal that produced it.
                  (setf (getf feedback :meta) meta))
                (setf current-signal feedback)))

          ;; Error recovery with differentiated response.
          (error (c)
            ;; CURRENT-SIGNAL may already hold the output of an earlier
            ;; stage, so the sensor is read defensively.
            (let ((sensor (ignore-errors
                           (getf (getf current-signal :payload) :sensor))))
              (harness-log "METABOLISM CRASH [~a]: ~a" (or sensor :unknown) c)

              ;; Only roll back memory on critical errors, not on transient
              ;; tool failures, so one bad API call does not cost context.
              (unless (member sensor '(:loop-error :tool-error :syntax-error))
                (harness-log "CRITICAL ERROR: Initiating Micro-Rollback.")
                ;; This clause runs outside the protection of the enclosing
                ;; HANDLER-CASE; guard the rollback so that its own failure
                ;; cannot escape PROCESS-SIGNAL.
                (handler-case (rollback-memory 0)
                  (error (rollback-failure)
                    (harness-log "CRITICAL ERROR: Micro-Rollback failed: ~a"
                                 rollback-failure))))

              ;; At deep recursion or for known error sensors, stop;
              ;; otherwise turn the error into a :loop-error signal so the
              ;; pipeline gets a chance to react to it.
              (setf current-signal
                    (if (or (> depth 2)
                            (member sensor '(:loop-error :tool-error)))
                        nil
                        (list :type :EVENT
                              :depth (1+ depth)
                              :meta meta
                              :payload (list :sensor :loop-error
                                             :message (format nil "~a" c)
                                             :depth depth)))))))))))
|
|
#+end_src
|
|
|
|
** The Feedback Loop Explained
|
|
|
|
The pipeline implements a feedback loop where Act can return a new signal:
|
|
|
|
1. User input arrives → Perceive normalizes it
|
|
2. Reason generates an action → Act executes it
|
|
3. If the action was a tool call that returned new information → Act returns a feedback signal
|
|
4. Feedback signal feeds back into step 1 for further reasoning
|
|
|
|
This enables multi-step workflows where each action can trigger additional analysis.
|
|
|
|
* Heartbeat Mechanism
|
|
|
|
The heartbeat thread ensures the agent remains alive even without external input. It drives two critical functions:
|
|
|
|
1. *Latent reflection* - the agent can think without external prompting
2. *Periodic maintenance* - memory auto-save, orphan detection, etc.
|
|
|
|
** Heartbeat Configuration Variables
|
|
|
|
#+begin_src lisp
|
|
(defvar *auto-save-interval* 300
  "Seconds between automatic memory saves; 300 (five minutes) by default.
START-HEARTBEAT overwrites it with MEMORY_AUTO_SAVE_INTERVAL when that
environment variable parses as an integer.")

(defvar *heartbeat-save-counter* 0
  "Number of heartbeats since the last automatic save.
START-HEARTBEAT zeroes it on startup and again each time a save is due.")
|
|
#+end_src
|
|
|
|
** start-heartbeat: The Pulsing Heart
|
|
|
|
#+begin_src lisp
|
|
(defun start-heartbeat ()
  "Start the background heartbeat thread, store it in *HEARTBEAT-THREAD*
and return it.

The heartbeat runs in its own thread so that it never blocks signal
processing. After each sleep of INTERVAL seconds it:

1. Increments *HEARTBEAT-SAVE-COUNTER* and, once the counter reaches
   *AUTO-SAVE-INTERVAL* / INTERVAL, resets it and calls SAVE-MEMORY-TO-DISK.
2. Injects an :EVENT signal whose payload sensor is :heartbeat via
   INJECT-STIMULUS.

An error in either step is logged and the thread keeps beating.

Configuration via environment:
- HEARTBEAT_INTERVAL: Seconds between heartbeats (default: 60). Values that
  are missing, not an integer, zero or negative fall back to the default.
- MEMORY_AUTO_SAVE_INTERVAL: Seconds between auto-saves (default: the
  current value of *AUTO-SAVE-INTERVAL*, initially 300)."
  (let* ((requested (ignore-errors
                     (parse-integer (uiop:getenv "HEARTBEAT_INTERVAL"))))
         ;; A zero interval would divide by zero below and a negative one
         ;; makes SLEEP signal an error, so only positive values are used.
         (interval (if (and requested (plusp requested)) requested 60))
         (auto-save (or (ignore-errors
                         (parse-integer (uiop:getenv "MEMORY_AUTO_SAVE_INTERVAL")))
                        *auto-save-interval*)))
    (setf *auto-save-interval* auto-save)
    (setf *heartbeat-save-counter* 0)
    (setf *heartbeat-thread*
          (bt:make-thread
           (lambda ()
             (loop
               ;; Wait for one interval.
               (sleep interval)

               ;; Update the counter and save when enough beats have passed.
               ;; Guarded so that a failed save does not kill the thread.
               (handler-case
                   (progn
                     (incf *heartbeat-save-counter*)
                     (when (>= *heartbeat-save-counter*
                               (/ *auto-save-interval* interval))
                       (setf *heartbeat-save-counter* 0)
                       (save-memory-to-disk)))
                 (error (c)
                   (harness-log "HEARTBEAT ERROR: Auto-save failed: ~a" c)))

               ;; Inject the heartbeat signal, guarded separately so that a
               ;; save failure above never suppresses the beat itself.
               ;; NOTE(review): the value stored under :unix-time comes from
               ;; GET-UNIVERSAL-TIME, which counts from 1900, not from the
               ;; Unix epoch -- confirm what consumers expect.
               (handler-case
                   (inject-stimulus
                    (list :type :EVENT
                          :payload (list :sensor :heartbeat
                                         :unix-time (get-universal-time))))
                 (error (c)
                   (harness-log "HEARTBEAT ERROR: Stimulus injection failed: ~a" c)))))
           :name "opencortex-heartbeat"))))
|
|
#+end_src
|
|
|
|
* Main Entry Point
|
|
|
|
** Shutdown Configuration
|
|
|
|
#+begin_src lisp
|
|
(defvar *shutdown-save-enabled* t
  "Controls whether MAIN writes memory to disk while shutting down.
Non-NIL (the default) saves. Set it to NIL for tests, or when another
component is responsible for persisting memory.")
|
|
#+end_src
|
|
|
|
** main: System Bootstrap and Idle Loop
|
|
|
|
The main function orchestrates system startup:
|
|
|
|
1. Load environment variables from ~/.local/share/opencortex/.env
|
|
2. Restore memory from previous snapshot (crash recovery)
|
|
3. Initialize actuators and load all skills
|
|
4. Start the heartbeat thread
|
|
5. Register SIGINT handler for graceful Ctrl+C shutdown
|
|
6. Enter idle loop (sleeping in increments of DAEMON_SLEEP_INTERVAL seconds; 1 hour by default)
|
|
|
|
#+begin_src lisp
|
|
(defun main ()
  "Entry point for OpenCortex. Initializes the system and enters the idle loop.

Startup sequence:
1. Load environment from ~/.local/share/opencortex/.env (if the file exists)
2. Restore memory from disk via LOAD-MEMORY-FROM-DISK
3. Initialize actuators
4. Load all skills
5. Start the heartbeat thread
6. On SBCL, register a SIGINT handler that saves memory and quits
7. Enter the idle loop, sleeping DAEMON_SLEEP_INTERVAL seconds per cycle
   (default 3600; missing, non-integer, zero or negative values use the
   default)

Before each sleep the idle loop checks *INTERRUPT-FLAG*; when it is set,
memory is saved (if *SHUTDOWN-SAVE-ENABLED*) and MAIN returns NIL."
  ;; Step 1: Load environment variables from the standard location.
  ;; Fall back to USER-HOMEDIR-PATHNAME when HOME is unset or empty, since
  ;; UIOP:ENSURE-DIRECTORY-PATHNAME signals an error when given NIL.
  (let* ((home-env (uiop:getenv "HOME"))
         (home (if (and home-env (plusp (length home-env)))
                   (uiop:ensure-directory-pathname home-env)
                   (user-homedir-pathname)))
         (env-file (uiop:merge-pathnames* ".local/share/opencortex/.env" home)))
    (when (uiop:file-exists-p env-file)
      (cl-dotenv:load-env env-file)))

  ;; Step 2: Crash recovery - load memory from the previous snapshot.
  (load-memory-from-disk)

  ;; Steps 3-4: Initialize actuators and load skills.
  (initialize-actuators)
  (initialize-all-skills)

  ;; Step 5: Start the heartbeat.
  (start-heartbeat)

  ;; Step 6: Register the graceful shutdown handler.
  ;; SBCL-specific: catches Ctrl+C (SIGINT) and saves before exit.
  #+sbcl
  (sb-sys:enable-interrupt
   sb-unix:sigint
   (lambda (sig code scp)
     (declare (ignore sig code scp))
     (harness-log "SHUTDOWN: SIGINT received. Saving memory...")
     ;; A failed save is logged instead of propagating, so that Ctrl+C
     ;; always reaches UIOP:QUIT and terminates the process.
     (when *shutdown-save-enabled*
       (handler-case (save-memory-to-disk)
         (error (c)
           (harness-log "SHUTDOWN: Memory save failed: ~a" c))))
     (uiop:quit 0)))

  ;; Step 7: Idle loop - sleep in chunks, checking for interrupts.
  (let* ((requested (ignore-errors
                     (parse-integer (uiop:getenv "DAEMON_SLEEP_INTERVAL"))))
         ;; Zero would turn this into a busy loop and a negative value makes
         ;; SLEEP signal an error, so only positive values are accepted.
         (sleep-interval (if (and requested (plusp requested))
                             requested
                             3600)))
    (loop
      ;; Check for an interrupt before each sleep cycle.
      (when (bt:with-lock-held (*interrupt-lock*) *interrupt-flag*)
        (harness-log "SHUTDOWN: Interrupt flag set. Saving memory...")
        (when *shutdown-save-enabled*
          (save-memory-to-disk))
        (return))

      ;; Sleep for the configured interval (default: 1 hour).
      (sleep sleep-interval))))
|
|
#+end_src
|
|
|
|
* Test Suite
|
|
|
|
These tests verify the metabolic loop and error recovery. Run with:
|
|
~(fiveam:run! 'opencortex-immune-system-tests:immune-suite)~
|
|
|
|
#+begin_src lisp :tangle (expand-file-name "immune-system-tests.lisp" (concat (or (identity (getenv "INSTALL_DIR")) ".") "/tests"))
|
|
;; Dedicated package for the immune-system tests. Only the suite name is
;; exported, so a runner in another package can refer to it.
(defpackage #:opencortex-immune-system-tests
  (:use #:cl #:fiveam #:opencortex)
  (:export #:immune-suite))

(in-package #:opencortex-immune-system-tests)

;; FiveAM suite grouping the error-recovery checks; the tests that follow
;; are registered in it by IN-SUITE.
(def-suite immune-suite
  :description "Verification of the Immune System (Core Error Hooks)")

(in-suite immune-suite)
|
|
|
|
(test loop-error-injection
  "A skill whose :probabilistic hook signals an error is registered as the
only skill, a :user-input event is processed, and the system logs are then
expected to contain the error message.

The skills registry is snapshotted first and restored afterwards, so the
test does not leave the global registry emptied or holding :evil-skill."
  (let ((saved-skills '()))
    (maphash (lambda (name skill) (push (cons name skill) saved-skills))
             opencortex::*skills-registry*)
    (unwind-protect
         (progn
           (clrhash opencortex::*skills-registry*)
           (opencortex:defskill :evil-skill
             :priority 100
             :trigger (lambda (ctx)
                        (eq (getf (getf ctx :payload) :sensor) :user-input))
             :probabilistic (lambda (ctx)
                              (declare (ignore ctx))
                              (error "CRITICAL BRAIN FAILURE"))
             :deterministic nil)
           (opencortex:harness-log "CLEAN LOG")
           ;; Build the signal with LIST rather than a quoted literal: the
           ;; pipeline uses (SETF GETF) on signals, and modifying literal
           ;; data has undefined consequences.
           (opencortex:process-signal
            (list :type :EVENT :payload (list :sensor :user-input)))
           (let ((logs (opencortex:context-get-system-logs 20)))
             (is (not (null (find-if (lambda (line)
                                       (search "CRITICAL BRAIN FAILURE" line))
                                     logs))))))
      ;; Put the registry back exactly as it was found.
      (clrhash opencortex::*skills-registry*)
      (loop for (name . skill) in saved-skills
            do (setf (gethash name opencortex::*skills-registry*) skill)))))
|
|
#+end_src |