Files
passepartout/literate/loop.org

450 lines
22 KiB
Org Mode

#+TITLE: The Cognitive Loop (loop.lisp)
#+AUTHOR: Amr
#+FILETAGS: :harness:loop:
#+STARTUP: content
* The Cognitive Loop (loop.lisp)
** Architectural Intent: The Reactive Signal Pipeline
The core of the ~org-agent~ harness is a functional transformation pipeline. In traditional agent architectures, events are handled through deep, asynchronous recursion, which leads to fragile Lisp stacks and makes it difficult to implement advanced features like multi-model consensus.
We have evolved the harness into a **Reactive Signal Pipeline**. Every event—whether it is a user keystroke, a heartbeat timer pulse, or a suggested action from an LLM—is treated as a discrete **Signal**.
Signals move through a series of formal **Gates**. Each gate transforms or validates the signal until it is either physically dispatched to an actuator or safely rejected by the Deliberate Engine.
*** Advantages of the Pipeline Model:
- **Consensus Ready:** By treating reasoning as a signal moving through a pipe, we can "split" the pipe to query multiple LLM backends simultaneously. A Consensus Gate later in the pipe compares these proposals.
- **Flat Execution:** Using a central orchestrator (~process-signal~) flattens the execution stack. Feedback from tools or errors is re-injected as a new signal rather than creating a nested function call.
- **Micro-Rollbacks:** Because every signal turn is discrete, the harness can snapshot the Object Store before a turn and instantly roll back if a skill crashes.
** The Signal Pipeline Architecture
#+begin_src mermaid
flowchart TD
S1[Signal: External Stimulus] --> P[Perceive Gate]
S2[Signal: Heartbeat Pulse] --> P
P --> N[Associative Gate]
N --> C[Consensus Gate]
C --> V[Validation Gate]
V --> D[Dispatch Gate]
D -- Feedback Signal --> S1
#+end_src
** Package Context
We ensure we are in the correct isolated namespace.
#+begin_src lisp :tangle ../src/loop.lisp
(in-package :org-agent)
(defvar *interrupt-flag* nil)
#+end_src
** Interrupt Lock
A thread-safe lock used to signal the pipeline to halt execution gracefully.
#+begin_src lisp :tangle ../src/loop.lisp
(defvar *interrupt-lock* (bt:make-lock "harness-interrupt-lock"))
#+end_src
** Physical Dispatch (dispatch-action)
The final stage of the pipeline. It routes an approved action to its registered physical actuator (Emacs, Shell, etc.).
#+begin_src lisp :tangle ../src/loop.lisp
(defun dispatch-action (action context)
"Routes an approved action to its registered physical actuator."
(when (and action (listp action))
(let* ((target (or (ignore-errors (getf action :target)) :emacs))
(actuator-fn (gethash target *actuator-registry*)))
(if actuator-fn
(funcall actuator-fn action context)
(harness-log "DISPATCH ERROR: No actuator for ~a" target)))))
#+end_src
** Performance Tracking (harness-track-telemetry)
Updates execution metrics for skills. This allows the harness to monitor which skills are consuming the most time or failing most frequently.
#+begin_src lisp :tangle ../src/loop.lisp
(defun harness-track-telemetry (skill-name duration status)
"Updates performance metrics for a specific skill."
(when skill-name (bt:with-lock-held (*telemetry-lock*)
(let ((entry (or (gethash skill-name *skill-telemetry*) (list :executions 0 :total-time 0 :failures 0))))
(incf (getf entry :executions)) (incf (getf entry :total-time) duration)
(when (eq status :rejected) (incf (getf entry :failures))) (setf (gethash skill-name *skill-telemetry*) entry)))))
#+end_src
** Stimulus Injection (inject-stimulus)
The entry point for all events into the harness. It enqueues raw messages into the pipeline, handling the transition from asynchronous threads to the synchronous pipeline execution.
#+begin_src lisp :tangle ../src/loop.lisp
(defun inject-stimulus (raw-message &key stream (depth 0))
"Enqueues a raw message into the reactive signal pipeline, handling async/sync execution and recovery."
(let* ((payload (getf raw-message :payload))
(sensor (getf payload :sensor))
;; Force Chat and Delegation to be async
(async-p (or (getf payload :async-p) (member sensor '(:chat-message :delegation :user-command)))))
(when stream (setf (getf raw-message :reply-stream) stream))
(if async-p (bt:make-thread (lambda () (restart-case (handler-bind ((error (lambda (c) (harness-log "ASYNC ERROR: ~a" c) (invoke-restart 'skip-event))))
(process-signal raw-message)) (skip-event () nil))) :name "org-agent-async-task")
(restart-case (handler-bind ((error (lambda (c) (harness-log "SYSTEM ERROR: ~a" c) (invoke-restart 'skip-event)))) (process-signal raw-message))
(skip-event () (harness-log "SYSTEM RECOVERY: Stimulus dropped.~%"))))))
#+end_src
** Internal Tool Execution
Handles harness-level operations that are not delegated to external actuators, such as hot-loading skills or evaluating Lisp code for system maintenance.
#+begin_src lisp :tangle ../src/loop.lisp
(defun execute-system-action (action context)
"Processes internal harness commands like skill creation or environment updates."
(declare (ignore context))
(let* ((payload (ignore-errors (getf action :payload))) (cmd (ignore-errors (getf payload :action))))
(case cmd
(:eval (let ((code (getf payload :code)))
(harness-log "ACTUATOR [System] - Evaluating: ~a" code)
(handler-case (let ((result (eval (read-from-string code))))
(harness-log "ACTUATOR [System] - Result: ~s" result)
result)
(error (c) (harness-log "ACTUATOR ERROR [System] - Eval failed: ~a" c)))))
(:create-skill (let* ((filename (getf payload :filename)) (content (getf payload :content))
(skills-dir (merge-pathnames "skills/" (asdf:system-source-directory :org-agent))) (full-path (merge-pathnames filename skills-dir)))
(harness-log "ACTUATOR [System] - Creating skill ~a..." filename)
(with-open-file (out full-path :direction :output :if-exists :supersede) (write-string content out))
(load-skill-from-org full-path)))
(:set-cascade (setf *provider-cascade* (getf payload :cascade)))
(:message (harness-log "ACTUATOR [System] - ~a" (getf payload :text)))
(t (harness-log "ACTUATOR [System] - Unknown command ~s" cmd)))))
#+end_src
** The Reactive Signal Pipeline (process-signal)
This is the core functional loop. It moves a signal through the gates sequentially.
*** Perceive Gate
The Perceive Gate is responsible for data normalization and sensory intake. It takes raw stimulus and updates the global Object Store graph.
#+begin_src lisp :tangle ../src/loop.lisp
(defun perceive-gate (signal)
"Initial processing: Normalizes raw stimuli and updates memory."
(let* ((payload (getf signal :payload))
(type (getf signal :type))
(sensor (getf payload :sensor)))
(harness-log "GATE [Perceive]: ~a (~a)" type (or sensor "no-sensor"))
(snapshot-object-store)
(cond ((eq type :EVENT)
(case sensor
(:buffer-update (let ((ast (getf payload :ast))) (when ast (ingest-ast ast))))
(:point-update (let ((element (getf payload :element))) (when element (ingest-ast element))))
(:interrupt (bt:with-lock-held (*interrupt-lock*) (setf *interrupt-flag* t)))))
((eq type :RESPONSE)
(harness-log "GATE [Perceive]: Act Result -> ~a" (getf payload :status))))
(setf (getf signal :status) :perceived)
signal))
#+end_src
*** Associative Gate
The Associative Gate invokes the neural reasoning engine. It takes the current context and generates a list of "intuitions" or proposed actions.
#+begin_src lisp :tangle ../src/loop.lisp
(defun neuro-gate (signal)
"Associative: Neural intuition and proposed actions."
(unless (eq (getf signal :type) :EVENT)
(return-from neuro-gate signal))
(harness-log "GATE [Associative]: Consulting LLM...")
(let ((thoughts (think signal)))
(setf (getf signal :proposals) (if (and (listp thoughts) (listp (car thoughts)))
thoughts
(if thoughts (list thoughts) nil)))
(setf (getf signal :status) :thought)
signal))
#+end_src
*** Consensus Gate
When multiple LLM backends provide diverging thoughts, the Consensus Gate resolves them into a single candidate action.
#+begin_src lisp :tangle ../src/loop.lisp
(defun resolve-consensus (proposals signal)
"Resolves diverging proposals by selecting the most consistent one."
(declare (ignore signal))
(harness-log "CONSENSUS: ~a proposals found. Resolving..." (length proposals))
(let ((counts (make-hash-table :test 'equal)))
(dolist (p proposals) (incf (gethash p counts 0)))
(let ((winner (first proposals)) (max-count 0))
(maphash (lambda (p count) (when (> count max-count) (setq max-count count winner p))) counts)
(harness-log "CONSENSUS: Winner selected with ~a votes." max-count)
winner)))
(defun consensus-gate (signal)
"Resolves multiple proposals into a single candidate action."
(let ((proposals (getf signal :proposals)))
(if (and proposals (cdr proposals))
(let ((winner (resolve-consensus proposals signal)))
(setf (getf signal :candidate) winner))
(setf (getf signal :candidate) (first proposals)))
(setf (getf signal :status) :consensus)
signal))
#+end_src
*** Decide Gate
The Decide Gate is the final deterministic safety net. It runs the candidate action through all loaded skill safety gates (The Deliberate Engine) before allowing it to proceed.
#+begin_src lisp :tangle ../src/loop.lisp
(defun decide-gate (signal)
"Deliberate: Deterministic safety and validation."
(let ((candidate (getf signal :candidate)))
(if candidate
(let* ((normalized-candidate (if (listp candidate) candidate (list :type :RESPONSE :payload (list :text candidate))))
(decision (decide normalized-candidate signal)))
(setf (getf signal :approved-action) decision))
(setf (getf signal :approved-action) nil))
(setf (getf signal :status) :decided)
signal))
#+end_src
*** Dispatch Gate
The Dispatch Gate performs the final actuation. If the action produces output (like a tool result), it returns a new signal to be re-injected into the pipeline.
#+begin_src lisp :tangle ../src/loop.lisp
(defun dispatch-gate (signal)
"Final Stage: Actuation and feedback generation."
(let* ((approved (getf signal :approved-action))
(type (getf signal :type))
(depth (getf signal :depth 0))
(feedback nil))
(case type
(:REQUEST (dispatch-action signal signal))
(:EVENT
(when approved
(let* ((payload (getf approved :payload))
(target (getf approved :target))
(action (or (getf payload :action) (getf approved :action)))
(tool-name (or (getf payload :tool) (getf approved :tool)))
(tool-args (or (getf payload :args) (getf approved :args))))
(if (and (eq target :tool) (eq action :call))
(let ((tool (gethash (string-downcase (string tool-name)) *cognitive-tools*)))
(if tool
(handler-case
(let* ((clean-args (if (and (listp tool-args) (listp (car tool-args))) (car tool-args) tool-args))
(result (funcall (cognitive-tool-body tool) clean-args)))
(setf feedback (list :type :EVENT :depth (1+ depth) :reply-stream (getf signal :reply-stream)
:payload (list :sensor :tool-output :result result :tool tool-name))))
(error (c)
(setf feedback (list :type :EVENT :depth (1+ depth) :reply-stream (getf signal :reply-stream)
:payload (list :sensor :tool-error :tool tool-name :message (format nil "~a" c))))))
(setf feedback (list :type :EVENT :depth (1+ depth) :reply-stream (getf signal :reply-stream)
:payload (list :sensor :tool-error :message "Tool not found")))))
(let ((result (dispatch-action approved signal)))
(when (and result (not (member target '(:emacs :system-message))))
(setf feedback (list :type :EVENT :depth (1+ depth) :reply-stream (getf signal :reply-stream)
:payload (list :sensor :tool-output :result result :tool approved))))))))))
(setf (getf signal :status) :dispatched)
feedback))
#+end_src
*** Pipeline Orchestrator (process-signal)
This is the entry point to the functional pipeline. It iterates through the gates and handles micro-rollbacks if a pipeline stage crashes.
#+begin_src lisp :tangle ../src/loop.lisp
(defun process-signal (signal)
"The entry point to the Reactive Signal Pipeline."
(let ((current-signal signal))
(loop while current-signal do
(let ((depth (getf current-signal :depth 0)))
(when (> depth 10) (harness-log "PIPELINE ERROR: Max depth reached.") (return nil))
(when (bt:with-lock-held (*interrupt-lock*) *interrupt-flag*)
(harness-log "PIPELINE: Interrupted.")
(bt:with-lock-held (*interrupt-lock*) (setf *interrupt-flag* nil))
(return nil))
(handler-case
(progn
(setf current-signal (perceive-gate current-signal))
(setf current-signal (neuro-gate current-signal))
(setf current-signal (consensus-gate current-signal))
(setf current-signal (decide-gate current-signal))
(setf current-signal (dispatch-gate current-signal)))
(error (c)
(harness-log "PIPELINE CRASH: ~a - Initiating Micro-Rollback." c)
(rollback-object-store 0)
(let ((sensor (ignore-errors (getf (getf current-signal :payload) :sensor))))
(if (or (> depth 2) (member sensor '(:loop-error :tool-error)))
(setf current-signal nil)
(setf current-signal (list :type :EVENT :depth (1+ depth) :reply-stream (getf current-signal :reply-stream)
:payload (list :sensor :loop-error :message (format nil "~a" c) :depth depth)))))))))))
#+end_src
** Delegation Mechanisms
Allows the harness to hand off tasks to specialized background agents or processes.
#+begin_src lisp :tangle ../src/loop.lisp
(defun delegate-task (task-id recipient &key context)
"Enqueues a task for another agent or background process."
(harness-log "ORCHESTRATOR: Delegating task ~a to ~a" task-id recipient)
(inject-stimulus (list :type :EVENT
:payload (list :sensor :delegation
:task-id task-id
:recipient recipient
:context context))))
#+end_src
** Heartbeat Mechanism
Periodically injects a "pulse" into the system to trigger temporal skills.
#+begin_src lisp :tangle ../src/loop.lisp
(defvar *default-heartbeat-interval* 60 "Default interval for the system heartbeat pulse in seconds.")
(defvar *heartbeat-thread* nil)
(defun start-heartbeat (&optional (interval *default-heartbeat-interval*))
"Spawns a thread that periodically injects a heartbeat stimulus."
(setf *heartbeat-thread*
(bt:make-thread
(lambda ()
(loop
(sleep interval)
(harness-log "HARNESS: Heartbeat pulse...")
(inject-stimulus (list :type :EVENT :payload (list :sensor :heartbeat :unix-time (get-universal-time))))))
:name "org-agent-heartbeat")))
(defun stop-heartbeat ()
"Gracefully terminates the heartbeat pulse thread."
(when (and *heartbeat-thread* (bt:thread-alive-p *heartbeat-thread*))
(bt:destroy-thread *heartbeat-thread*)
(setf *heartbeat-thread* nil)))
#+end_src
** Main Entry Point
The execution entry point for the harness binary.
#+begin_src lisp :tangle ../src/loop.lisp
(defun main ()
"The entry point for the compiled standalone binary."
(let* ((home (uiop:getenv "HOME"))
(env-file (uiop:merge-pathnames* ".local/share/org-agent/.env" (uiop:ensure-directory-pathname home))))
(if (uiop:file-exists-p env-file)
(progn
(format t "HARNESS: Loading environment from ~a~%" env-file)
(cl-dotenv:load-env env-file))
(format t "HARNESS ERROR: .env not found at ~a~%" env-file)))
(let ((interval (or (ignore-errors (parse-integer (uiop:getenv "HEARTBEAT_INTERVAL") :junk-allowed t)) *default-heartbeat-interval*)))
(format t "HARNESS: Heartbeat interval set to ~a seconds.~%" interval)
(start-daemon :interval interval))
(loop (sleep 3600)))
#+end_src
* Phase E: Chaos (Verification)
The Reactive Signal Pipeline must be empirically verified through automated testing to ensure architectural integrity.
#+begin_src lisp :tangle ../tests/pipeline-tests.lisp
(defpackage :org-agent-pipeline-tests
(:use :cl :fiveam :org-agent))
(in-package :org-agent-pipeline-tests)
(def-suite pipeline-suite
:description "Verification of the Reactive Signal Pipeline.")
(in-suite pipeline-suite)
(defun setup-mock-skills ()
"Register mock skills for testing."
(clrhash org-agent::*skills-registry*)
(org-agent::defskill :mock-refactor
:priority 100
:trigger (lambda (ctx) (eq (getf (getf ctx :payload) :command) :organize-subtree))
:neuro (lambda (ctx) "Mock neuro prompt")
:symbolic (lambda (action ctx)
`(:type :REQUEST :id 123
:payload (:action :refactor-subtree
:target-id nil
:properties (("ID" . "node-123"))))))
(org-agent::defskill :mock-safety
:priority 50
:trigger (lambda (ctx) t) ; always triggers
:neuro (lambda (ctx) "Mock neuro")
:symbolic (lambda (action ctx) nil))) ; rejects everything
(test test-perceive-gate
"Perceive gate should update the object store and normalize signal."
(clrhash org-agent::*object-store*)
(let* ((signal (list :type :EVENT :payload (list :sensor :buffer-update :ast (list :type :HEADLINE :properties (list :ID "test-node" :TITLE "Test") :contents nil))))
(result (perceive-gate signal)))
(is (eq :perceived (getf result :status)))
(is (not (null (gethash "test-node" org-agent::*object-store*))))))
(test test-decide-gate-safety
"Decide gate should block unsafe LLM proposals."
(setup-mock-skills)
(let* ((candidate (list :type :REQUEST :payload (list :action :eval :code "(shell-command \"rm -rf /\")")))
(signal (list :type :EVENT :candidate candidate))
(result (decide-gate signal)))
(is (eq :decided (getf result :status)))
(let ((approved (getf result :approved-action)))
(is (eq :LOG (getf approved :type)))
(is (search "Action rejected by skill heuristics" (getf (getf approved :payload) :text))))))
(test test-pipeline-flow-flat
"Verify that process-signal correctly executes a signal through gates."
(setup-mock-skills)
(clrhash org-agent::*object-store*)
(let ((signal (list :type :EVENT :payload (list :sensor :buffer-update))))
(process-signal signal)
(pass "Pipeline completed execution.")))
(test test-depth-limiting
"Verify that the pipeline terminates runaway feedback loops."
(let ((runaway-signal (list :type :EVENT :depth 11 :payload (list :sensor :heartbeat))))
(is (null (process-signal runaway-signal)))))
(test test-env-loading
"Verify that environment variables are accessible."
(setf (uiop:getenv "LLM_ENDPOINT") "http://mock")
(setf (uiop:getenv "MEMEX_USER") "Amr")
(is (not (null (uiop:getenv "LLM_ENDPOINT"))))
(is (stringp (org-agent::get-env "MEMEX_USER"))))
(test test-path-resolution
"Verify that context-resolve-path expands environment variables."
(setf (uiop:getenv "MEMEX_USER") "Amr")
(let ((path "$MEMEX_USER/test"))
(is (search "Amr/test" (context-resolve-path path)))))
(test test-skill-dependencies
"Verify that resolve-skill-dependencies correctly flattens the graph."
(setup-mock-skills)
(org-agent::defskill :mock-dependent
:priority 10
:dependencies (list "mock-safety")
:trigger (lambda (ctx) nil)
:neuro nil
:symbolic nil)
(let ((deps (org-agent::resolve-skill-dependencies "mock-dependent")))
(is (member "mock-safety" deps :test #'string-equal))
(is (member "mock-dependent" deps :test #'string-equal))))
(test test-log-buffering
"Verify that harness-log correctly populates the system logs."
(harness-log "Engineering TEST LOG")
(let ((logs (context-get-system-logs 5)))
(is (cl:some (lambda (line) (search "Engineering TEST LOG" line)) logs))))
(test test-global-awareness-assembly
"Verify that context-assemble-global-awareness reports active projects."
(clrhash org-agent::*object-store*)
(ingest-ast (list :type :HEADLINE :properties (list :ID "proj-1" :TITLE "Project Alpha" :TAGS "project") :contents nil))
(let ((awareness (context-assemble-global-awareness)))
(is (search "Project Alpha" awareness))
(is (search "proj-1" awareness))))
(test test-micro-rollback
"Verify that a pipeline crash triggers an automatic Object Store rollback."
(clrhash org-agent::*object-store*)
(clrhash org-agent::*history-store*)
(setf org-agent::*object-store-snapshots* nil)
;; State A
(ingest-ast (list :type :HEADLINE :properties (list :ID "node-1" :TITLE "State A") :contents nil))
(setup-mock-skills)
(org-agent::defskill :crashing-skill
:priority 200
:trigger (lambda (ctx) t)
:neuro (lambda (ctx) (list :type :REQUEST :payload (list :action :eval :code "(error \"BOOM\")")))
:symbolic (lambda (action ctx) (error "CRASH IN SYSTEM 2")))
(process-signal (list :type :EVENT :payload (list :sensor :test)))
;; Verify that we are still in State A
(let ((obj (lookup-object "node-1")))
(is (not (null obj)))
(is (equal (getf (org-object-attributes obj) :TITLE) "State A"))))
#+end_src