#+TITLE: The Cognitive Loop (loop.lisp)
#+AUTHOR: Amr
#+FILETAGS: :harness:loop:
#+STARTUP: content

* The Cognitive Loop (loop.lisp)

** Architectural Intent: The Reactive Signal Pipeline

The core of the ~org-agent~ harness is a functional transformation pipeline. In traditional agent architectures, events are handled through deep, asynchronous recursion, which leads to fragile Lisp stacks and makes it difficult to implement advanced features like multi-model consensus.

We have evolved the harness into a **Reactive Signal Pipeline**. Every event, whether it is a user keystroke, a heartbeat timer pulse, or a suggested action from an LLM, is treated as a discrete **Signal**. Signals move through a series of formal **Gates**. Each gate transforms or validates the signal until it is either physically dispatched to an actuator or safely rejected by the Deterministic Engine.

*** Advantages of the Pipeline Model:

- **Consensus Ready:** By treating reasoning as a signal moving through a pipe, we can "split" the pipe to query multiple LLM backends simultaneously. A Consensus Gate later in the pipe compares these proposals.
- **Flat Execution:** Using a central orchestrator (~process-signal~) flattens the execution stack. Feedback from tools or errors is re-injected as a new signal rather than creating a nested function call.
- **Micro-Rollbacks:** Because every signal turn is discrete, the harness can snapshot the Object Store before a turn and instantly roll back if a skill crashes.

** The Signal Pipeline Architecture

#+begin_src mermaid
flowchart TD
    S1[Signal: External Stimulus] --> P[Perceive Gate]
    S2[Signal: Heartbeat Pulse] --> P
    P --> N[Probabilistic Gate]
    N --> C[Consensus Gate]
    C --> V[Decide Gate]
    V --> D[Dispatch Gate]
    D -- Feedback Signal --> S1
#+end_src

** Package Context

We ensure we are in the correct isolated namespace and declare the interrupt flag that the pipeline polls.
#+begin_src lisp :tangle ../src/loop.lisp
(in-package :org-agent)

(defvar *interrupt-flag* nil
  "Set to T by the Perceive Gate on an :interrupt event; polled and cleared by process-signal.")
#+end_src

** Interrupt Lock

A thread-safe lock that guards ~*interrupt-flag*~, the flag used to signal the pipeline to halt execution gracefully.

#+begin_src lisp :tangle ../src/loop.lisp
(defvar *interrupt-lock* (bt:make-lock "harness-interrupt-lock"))
#+end_src

** Physical Dispatch (dispatch-action)

The final stage of the pipeline. It routes an approved action to its registered physical actuator (Emacs, Shell, etc.).

#+begin_src lisp :tangle ../src/loop.lisp
(defun dispatch-action (action context)
  "Routes an approved action to its registered physical actuator."
  (when (and action (listp action))
    ;; Actions without an explicit :target default to the Emacs actuator.
    (let* ((target (or (ignore-errors (getf action :target)) :emacs))
           (actuator-fn (gethash target *actuator-registry*)))
      (if actuator-fn
          (funcall actuator-fn action context)
          (harness-log "DISPATCH ERROR: No actuator for ~a" target)))))
#+end_src

** Performance Tracking (harness-track-telemetry)

Updates execution metrics for skills. This allows the harness to monitor which skills are consuming the most time or failing most frequently. A failure is an execution whose status is ~:rejected~.

#+begin_src lisp :tangle ../src/loop.lisp
(defun harness-track-telemetry (skill-name duration status)
  "Updates performance metrics for a specific skill."
  (when skill-name
    (bt:with-lock-held (*telemetry-lock*)
      (let ((entry (or (gethash skill-name *skill-telemetry*)
                       (list :executions 0 :total-time 0 :failures 0))))
        (incf (getf entry :executions))
        (incf (getf entry :total-time) duration)
        ;; Only a :rejected status counts as a failure.
        (when (eq status :rejected)
          (incf (getf entry :failures)))
        (setf (gethash skill-name *skill-telemetry*) entry)))))
#+end_src

** Stimulus Injection (inject-stimulus)

The entry point for all events into the harness. It hands each raw message to ~process-signal~. Messages flagged ~:async-p~, or coming from the chat, delegation, or user-command sensors, run on a dedicated thread; everything else runs synchronously in the caller's thread. In both cases an error is logged and the stimulus is dropped through the ~skip-event~ restart.
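The routing rule applied by the definition below can be read in isolation. The following is a minimal sketch that is not tangled into the source; ~async-stimulus-p~ is a hypothetical helper name, and the message shape (a plist whose ~:payload~ plist carries ~:sensor~ and an optional ~:async-p~) is inferred from the harness code.

#+begin_src lisp
(defun async-stimulus-p (raw-message)
  "Hypothetical helper: true when RAW-MESSAGE would run on its own thread.
Mirrors the rule in inject-stimulus: an explicit :async-p flag, or a sensor
that is always handled asynchronously."
  (let* ((payload (getf raw-message :payload))
         (sensor (getf payload :sensor)))
    ;; (and ... t) collapses the MEMBER tail into a plain boolean.
    (and (or (getf payload :async-p)
             (member sensor '(:chat-message :delegation :user-command)))
         t)))

(async-stimulus-p '(:type :EVENT :payload (:sensor :chat-message)))  ; => T
(async-stimulus-p '(:type :EVENT :payload (:sensor :buffer-update))) ; => NIL
#+end_src

A caller can therefore force background execution of any stimulus by adding ~:async-p t~ to its payload.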
#+begin_src lisp :tangle ../src/loop.lisp
(defun inject-stimulus (raw-message &key stream (depth 0))
  "Feeds RAW-MESSAGE into the reactive signal pipeline, handling async/sync execution and recovery."
  (declare (ignorable depth)) ; currently unused; depth travels in the signal's :depth key
  (let* ((payload (getf raw-message :payload))
         (sensor (getf payload :sensor))
         ;; Chat, delegation, and user-command stimuli always run asynchronously.
         (async-p (or (getf payload :async-p)
                      (member sensor '(:chat-message :delegation :user-command)))))
    (when stream
      (setf (getf raw-message :reply-stream) stream))
    (if async-p
        (bt:make-thread
         (lambda ()
           (restart-case
               (handler-bind ((error (lambda (c)
                                       (harness-log "ASYNC ERROR: ~a" c)
                                       (invoke-restart 'skip-event))))
                 (process-signal raw-message))
             (skip-event () nil)))
         :name "org-agent-async-task")
        (restart-case
            (handler-bind ((error (lambda (c)
                                    (harness-log "SYSTEM ERROR: ~a" c)
                                    (invoke-restart 'skip-event))))
              (process-signal raw-message))
          (skip-event ()
            (harness-log "SYSTEM RECOVERY: Stimulus dropped.~%"))))))
#+end_src

** Internal Tool Execution

Handles harness-level operations that are not delegated to external actuators, such as hot-loading skills or evaluating Lisp code for system maintenance.

#+begin_src lisp :tangle ../src/loop.lisp
(defun execute-system-action (action context)
  "Processes internal harness commands like skill creation or environment updates."
  (declare (ignore context))
  (let* ((payload (ignore-errors (getf action :payload)))
         (cmd (ignore-errors (getf payload :action))))
    (case cmd
      (:eval
       (let ((code (getf payload :code)))
         (harness-log "ACTUATOR [System] - Evaluating: ~a" code)
         (handler-case
             (let ((result (eval (read-from-string code))))
               (harness-log "ACTUATOR [System] - Result: ~s" result)
               result)
           (error (c)
             (harness-log "ACTUATOR ERROR [System] - Eval failed: ~a" c)))))
      (:create-skill
       (let* ((filename (getf payload :filename))
              (content (getf payload :content))
              (skills-dir (merge-pathnames "skills/" (asdf:system-source-directory :org-agent)))
              (full-path (merge-pathnames filename skills-dir)))
         (harness-log "ACTUATOR [System] - Creating skill ~a..." filename)
         (with-open-file (out full-path :direction :output :if-exists :supersede)
           (write-string content out))
         (load-skill-from-org full-path)))
      (:set-cascade
       (setf *provider-cascade* (getf payload :cascade)))
      (:message
       (harness-log "ACTUATOR [System] - ~a" (getf payload :text)))
      (t
       (harness-log "ACTUATOR [System] - Unknown command ~s" cmd)))))
#+end_src

** The Reactive Signal Pipeline (process-signal)

This is the core functional loop. It moves a signal through the gates sequentially.

*** Perceive Gate

The Perceive Gate is responsible for data normalization and sensory intake. It snapshots the Object Store so that a later crash can be rolled back, then takes the raw stimulus and updates the global Object Store graph.

#+begin_src lisp :tangle ../src/loop.lisp
(defun perceive-gate (signal)
  "Initial processing: Normalizes raw stimuli and updates memory."
  (let* ((payload (getf signal :payload))
         (type (getf signal :type))
         (sensor (getf payload :sensor)))
    (harness-log "GATE [Perceive]: ~a (~a)" type (or sensor "no-sensor"))
    (snapshot-object-store)
    (cond
      ((eq type :EVENT)
       (case sensor
         (:buffer-update
          (let ((ast (getf payload :ast)))
            (when ast (ingest-ast ast))))
         (:point-update
          (let ((element (getf payload :element)))
            (when element (ingest-ast element))))
         (:interrupt
          (bt:with-lock-held (*interrupt-lock*)
            (setf *interrupt-flag* t)))))
      ((eq type :RESPONSE)
       (harness-log "GATE [Perceive]: Act Result -> ~a" (getf payload :status))))
    (setf (getf signal :status) :perceived)
    signal))
#+end_src

*** Probabilistic Gate

The Probabilistic Gate (~neuro-gate~) invokes the neural reasoning engine. It takes the current context and generates a list of "intuitions" or proposed actions. Only ~:EVENT~ signals are sent to the LLM; other signal types pass through unchanged.

#+begin_src lisp :tangle ../src/loop.lisp
(defun neuro-gate (signal)
  "Probabilistic: Neural intuition and proposed actions."
  (unless (eq (getf signal :type) :EVENT)
    (return-from neuro-gate signal))
  (harness-log "GATE [Probabilistic]: Consulting LLM...")
  (let ((thoughts (think signal)))
    ;; Normalize to a list of proposals: a list of lists passes through,
    ;; a single proposal is wrapped, and NIL yields no proposals.
    (setf (getf signal :proposals)
          (if (and (listp thoughts) (listp (car thoughts)))
              thoughts
              (if thoughts (list thoughts) nil)))
    (setf (getf signal :status) :thought)
    signal))
#+end_src

*** Consensus Gate

When multiple LLM backends provide diverging thoughts, the Consensus Gate resolves them into a single candidate action by majority vote: proposals that are ~equal~ are counted together and the most frequent one wins.

#+begin_src lisp :tangle ../src/loop.lisp
(defun resolve-consensus (proposals signal)
  "Resolves diverging proposals by selecting the most consistent one."
  (declare (ignore signal))
  (harness-log "CONSENSUS: ~a proposals found. Resolving..." (length proposals))
  ;; Majority vote: proposals that are EQUAL share one counter.
  (let ((counts (make-hash-table :test 'equal)))
    (dolist (p proposals)
      (incf (gethash p counts 0)))
    ;; Ties are broken by hash-table iteration order, which is unspecified.
    (let ((winner (first proposals))
          (max-count 0))
      (maphash (lambda (p count)
                 (when (> count max-count)
                   (setq max-count count
                         winner p)))
               counts)
      (harness-log "CONSENSUS: Winner selected with ~a votes."
                   max-count)
      winner)))

(defun consensus-gate (signal)
  "Resolves multiple proposals into a single candidate action."
  (let ((proposals (getf signal :proposals)))
    ;; A vote is only needed when there is more than one proposal.
    (if (and proposals (cdr proposals))
        (let ((winner (resolve-consensus proposals signal)))
          (setf (getf signal :candidate) winner))
        (setf (getf signal :candidate) (first proposals)))
    (setf (getf signal :status) :consensus)
    signal))
#+end_src

*** Decide Gate

The Decide Gate is the final deterministic safety net. It runs the candidate action through all loaded skill safety gates (The Deterministic Engine) before allowing it to proceed.

#+begin_src lisp :tangle ../src/loop.lisp
(defun decide-gate (signal)
  "Deterministic: Safety checks and validation."
  (let ((candidate (getf signal :candidate)))
    (if candidate
        ;; A bare (non-list) candidate such as a string is wrapped as a :RESPONSE.
        (let* ((normalized-candidate
                 (if (listp candidate)
                     candidate
                     (list :type :RESPONSE :payload (list :text candidate))))
               (decision (decide normalized-candidate signal)))
          (setf (getf signal :approved-action) decision))
        (setf (getf signal :approved-action) nil))
    (setf (getf signal :status) :decided)
    signal))
#+end_src

*** Dispatch Gate

The Dispatch Gate performs the final actuation. If the action produces output (like a tool result), it returns a new signal to be re-injected into the pipeline.

#+begin_src lisp :tangle ../src/loop.lisp
(defun dispatch-gate (signal)
  "Final Stage: Actuation and feedback generation."
  (let* ((approved (getf signal :approved-action))
         (type (getf signal :type))
         (depth (getf signal :depth 0))
         (feedback nil))
    (flet ((make-feedback (payload)
             ;; Feedback is a fresh :EVENT one level deeper that keeps the reply stream.
             (list :type :EVENT
                   :depth (1+ depth)
                   :reply-stream (getf signal :reply-stream)
                   :payload payload)))
      (case type
        (:REQUEST (dispatch-action signal signal))
        (:EVENT
         (when approved
           (let* ((payload (getf approved :payload))
                  (target (getf approved :target))
                  (action (or (getf payload :action) (getf approved :action)))
                  (tool-name (or (getf payload :tool) (getf approved :tool)))
                  (tool-args (or (getf payload :args) (getf approved :args))))
             (if (and (eq target :tool) (eq action :call))
                 ;; Cognitive tools are looked up by lower-cased name and run in-process.
                 (let ((tool (gethash (string-downcase (string tool-name)) *cognitive-tools*)))
                   (if tool
                       (handler-case
                           (let* ((clean-args (if (and (listp tool-args) (listp (car tool-args)))
                                                  (car tool-args)
                                                  tool-args))
                                  (result (funcall (cognitive-tool-body tool) clean-args)))
                             (setf feedback (make-feedback
                                             (list :sensor :tool-output :result result :tool tool-name))))
                         (error (c)
                           (setf feedback (make-feedback
                                           (list :sensor :tool-error :tool tool-name
                                                 :message (format nil "~a" c))))))
                       (setf feedback (make-feedback
                                       (list :sensor :tool-error :message "Tool not found")))))
                 ;; Everything else goes to a physical actuator.
                 (let ((result (dispatch-action approved signal)))
                   (when (and result (not (member target '(:emacs :system-message))))
                     (setf feedback (make-feedback
                                     (list :sensor :tool-output :result result :tool approved)))))))))))
    (setf (getf signal :status) :dispatched)
    feedback))
#+end_src

*** Pipeline Orchestrator (process-signal)

This is the entry point to the functional pipeline. It iterates through the gates and handles micro-rollbacks if a pipeline stage crashes. Each turn first checks the depth limit (10) and the interrupt flag. When a gate signals an error, the Object Store is rolled back and the failure is re-injected as a ~:loop-error~ signal, unless the crash occurred beyond depth 2 or while handling an error signal, in which case the loop stops.

#+begin_src lisp :tangle ../src/loop.lisp
(defun process-signal (signal)
  "The entry point to the Reactive Signal Pipeline."
  (let ((current-signal signal))
    (loop while current-signal
          do (let ((depth (getf current-signal :depth 0)))
               ;; Guard against runaway feedback loops.
               (when (> depth 10)
                 (harness-log "PIPELINE ERROR: Max depth reached.")
                 (return nil))
               ;; Honor a pending interrupt, then clear the flag.
               (when (bt:with-lock-held (*interrupt-lock*) *interrupt-flag*)
                 (harness-log "PIPELINE: Interrupted.")
                 (bt:with-lock-held (*interrupt-lock*)
                   (setf *interrupt-flag* nil))
                 (return nil))
               (handler-case
                   (progn
                     (setf current-signal (perceive-gate current-signal))
                     (setf current-signal (neuro-gate current-signal))
                     (setf current-signal (consensus-gate current-signal))
                     (setf current-signal (decide-gate current-signal))
                     (setf current-signal (dispatch-gate current-signal)))
                 (error (c)
                   (harness-log "PIPELINE CRASH: ~a - Initiating Micro-Rollback." c)
                   (rollback-object-store 0)
                   (let ((sensor (ignore-errors (getf (getf current-signal :payload) :sensor))))
                     ;; Do not retry deep failures or failures of error signals themselves.
                     (if (or (> depth 2) (member sensor '(:loop-error :tool-error)))
                         (setf current-signal nil)
                         (setf current-signal
                               (list :type :EVENT
                                     :depth (1+ depth)
                                     :reply-stream (getf current-signal :reply-stream)
                                     :payload (list :sensor :loop-error
                                                    :message (format nil "~a" c)
                                                    :depth depth)))))))))))
#+end_src

** Delegation Mechanisms

Allows the harness to hand off tasks to specialized background agents or processes. Because ~:delegation~ is one of the sensors that ~inject-stimulus~ always treats as asynchronous, each delegated task runs on its own thread.

#+begin_src lisp :tangle ../src/loop.lisp
(defun delegate-task (task-id recipient &key context)
  "Enqueues a task for another agent or background process."
  (harness-log "ORCHESTRATOR: Delegating task ~a to ~a" task-id recipient)
  (inject-stimulus
   (list :type :EVENT
         :payload (list :sensor :delegation
                        :task-id task-id
                        :recipient recipient
                        :context context))))
#+end_src

** Heartbeat Mechanism

Periodically injects a "pulse" into the system to trigger temporal skills.
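To illustrate what a pulse looks like to a skill, the sketch below builds a signal with the same shape that the heartbeat thread injects and tests it with a trigger-style predicate. ~make-heartbeat-signal~ and ~heartbeat-signal-p~ are hypothetical names used only here; the plist layout follows the definition in this section.

#+begin_src lisp
(defun make-heartbeat-signal (&optional (now (get-universal-time)))
  "Hypothetical constructor: a plist shaped like the heartbeat stimulus.
NOW is a Common Lisp universal time (seconds since 1900-01-01)."
  (list :type :EVENT
        :payload (list :sensor :heartbeat :unix-time now)))

(defun heartbeat-signal-p (ctx)
  "Hypothetical trigger predicate: true when CTX carries a heartbeat pulse."
  (eq (getf (getf ctx :payload) :sensor) :heartbeat))

(heartbeat-signal-p (make-heartbeat-signal))                           ; => T
(heartbeat-signal-p '(:type :EVENT :payload (:sensor :buffer-update))) ; => NIL
#+end_src

A temporal skill could use a predicate of this kind as its trigger, assuming triggers receive the signal plist as their context argument.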
#+begin_src lisp :tangle ../src/loop.lisp
(defvar *default-heartbeat-interval* 60
  "Default interval for the system heartbeat pulse in seconds.")

(defvar *heartbeat-thread* nil)

(defun start-heartbeat (&optional (interval *default-heartbeat-interval*))
  "Spawns a thread that periodically injects a heartbeat stimulus."
  (setf *heartbeat-thread*
        (bt:make-thread
         (lambda ()
           (loop
             (sleep interval)
             (harness-log "HARNESS: Heartbeat pulse...")
             (inject-stimulus
              ;; Note: GET-UNIVERSAL-TIME counts seconds from 1900-01-01,
              ;; not from the Unix epoch, despite the :unix-time key.
              (list :type :EVENT
                    :payload (list :sensor :heartbeat
                                   :unix-time (get-universal-time))))))
         :name "org-agent-heartbeat")))

(defun stop-heartbeat ()
  "Terminates the heartbeat pulse thread."
  (when (and *heartbeat-thread* (bt:thread-alive-p *heartbeat-thread*))
    (bt:destroy-thread *heartbeat-thread*)
    (setf *heartbeat-thread* nil)))
#+end_src

** Main Entry Point

The execution entry point for the harness binary.

#+begin_src lisp :tangle ../src/loop.lisp
(defun main ()
  "The entry point for the compiled standalone binary."
  (let* ((home (uiop:getenv "HOME"))
         (env-file (uiop:merge-pathnames* ".local/share/org-agent/.env"
                                          (uiop:ensure-directory-pathname home))))
    (if (uiop:file-exists-p env-file)
        (progn
          (format t "HARNESS: Loading environment from ~a~%" env-file)
          (cl-dotenv:load-env env-file))
        (format t "HARNESS ERROR: .env not found at ~a~%" env-file)))
  ;; Fall back to the default when HEARTBEAT_INTERVAL is unset or not numeric.
  (let ((interval (or (ignore-errors
                       (parse-integer (uiop:getenv "HEARTBEAT_INTERVAL") :junk-allowed t))
                      *default-heartbeat-interval*)))
    (format t "HARNESS: Heartbeat interval set to ~a seconds.~%" interval)
    (start-daemon :interval interval))
  ;; Keep the main thread alive.
  (loop (sleep 3600)))
#+end_src

* Phase E: Chaos (Verification)

The Reactive Signal Pipeline must be empirically verified through automated testing to ensure architectural integrity.
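As a self-contained illustration of the depth cap that the suite checks, the sketch below runs a toy feedback loop with no harness dependencies. ~run-flat-loop~, its ~step-fn~ argument, and ~runaway-step~ are hypothetical; only the ~:depth~ key and the limit of 10 are taken from ~process-signal~.

#+begin_src lisp
(defun run-flat-loop (signal step-fn &key (max-depth 10))
  "Hypothetical miniature of the orchestrator loop.
STEP-FN takes a signal plist and returns a feedback signal or NIL.
Returns the number of turns executed before the loop stopped."
  (let ((turns 0))
    (loop while signal
          do (when (> (getf signal :depth 0) max-depth)
               (return))                          ; depth cap reached
             (incf turns)
             ;; Feedback replaces the current signal; no nested call is made.
             (setf signal (funcall step-fn signal)))
    turns))

(defun runaway-step (signal)
  "A step that always produces feedback one level deeper."
  (list :type :EVENT :depth (1+ (getf signal :depth 0))))

(run-flat-loop (list :type :EVENT :depth 0) #'runaway-step)  ; => 11
(run-flat-loop (list :type :EVENT :depth 11) #'runaway-step) ; => 0
#+end_src

The first call runs depths 0 through 10 and stops when the feedback reaches depth 11; the second never executes a turn, which is the situation the depth-limiting test constructs.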
#+begin_src lisp :tangle ../tests/pipeline-tests.lisp
(defpackage :org-agent-pipeline-tests
  (:use :cl :fiveam :org-agent))
(in-package :org-agent-pipeline-tests)

(def-suite pipeline-suite
  :description "Verification of the Reactive Signal Pipeline.")
(in-suite pipeline-suite)

(defun setup-mock-skills ()
  "Register mock skills for testing."
  (clrhash org-agent::*skills-registry*)
  (org-agent::defskill :mock-refactor
    :priority 100
    :trigger (lambda (ctx) (eq (getf (getf ctx :payload) :command) :organize-subtree))
    :neuro (lambda (ctx) "Mock neuro prompt")
    :symbolic (lambda (action ctx)
                `(:type :REQUEST :id 123
                  :payload (:action :refactor-subtree :target-id nil
                            :properties (("ID" . "node-123"))))))
  (org-agent::defskill :mock-safety
    :priority 50
    :trigger (lambda (ctx) t) ; always triggers
    :neuro (lambda (ctx) "Mock neuro")
    :symbolic (lambda (action ctx) nil))) ; rejects everything

(test test-perceive-gate
  "Perceive gate should update the object store and normalize signal."
  (clrhash org-agent::*object-store*)
  (let* ((signal (list :type :EVENT
                       :payload (list :sensor :buffer-update
                                      :ast (list :type :HEADLINE
                                                 :properties (list :ID "test-node" :TITLE "Test")
                                                 :contents nil))))
         (result (perceive-gate signal)))
    (is (eq :perceived (getf result :status)))
    (is (not (null (gethash "test-node" org-agent::*object-store*))))))

(test test-decide-gate-safety
  "Decide gate should block unsafe LLM proposals."
  (setup-mock-skills)
  (let* ((candidate (list :type :REQUEST
                          :payload (list :action :eval
                                         :code "(shell-command \"rm -rf /\")")))
         (signal (list :type :EVENT :candidate candidate))
         (result (decide-gate signal)))
    (is (eq :decided (getf result :status)))
    (let ((approved (getf result :approved-action)))
      (is (eq :LOG (getf approved :type)))
      (is (search "Action rejected by skill heuristics"
                  (getf (getf approved :payload) :text))))))

(test test-pipeline-flow-flat
  "Verify that process-signal correctly executes a signal through gates."
  (setup-mock-skills)
  (clrhash org-agent::*object-store*)
  (let ((signal (list :type :EVENT :payload (list :sensor :buffer-update))))
    (process-signal signal)
    (pass "Pipeline completed execution.")))

(test test-depth-limiting
  "Verify that the pipeline terminates runaway feedback loops."
  ;; process-signal returns NIL on every exit path, so this checks that the
  ;; call terminates rather than distinguishing the depth-limit exit.
  (let ((runaway-signal (list :type :EVENT :depth 11 :payload (list :sensor :heartbeat))))
    (is (null (process-signal runaway-signal)))))

(test test-env-loading
  "Verify that environment variables are accessible."
  (setf (uiop:getenv "LLM_ENDPOINT") "http://mock")
  (setf (uiop:getenv "MEMEX_USER") "Amr")
  (is (not (null (uiop:getenv "LLM_ENDPOINT"))))
  (is (stringp (org-agent::get-env "MEMEX_USER"))))

(test test-path-resolution
  "Verify that context-resolve-path expands environment variables."
  (setf (uiop:getenv "MEMEX_USER") "Amr")
  (let ((path "$MEMEX_USER/test"))
    (is (search "Amr/test" (context-resolve-path path)))))

(test test-skill-dependencies
  "Verify that resolve-skill-dependencies correctly flattens the graph."
  (setup-mock-skills)
  (org-agent::defskill :mock-dependent
    :priority 10
    :dependencies (list "mock-safety")
    :trigger (lambda (ctx) nil)
    :neuro nil
    :symbolic nil)
  (let ((deps (org-agent::resolve-skill-dependencies "mock-dependent")))
    (is (member "mock-safety" deps :test #'string-equal))
    (is (member "mock-dependent" deps :test #'string-equal))))

(test test-log-buffering
  "Verify that harness-log correctly populates the system logs."
  (harness-log "Engineering TEST LOG")
  (let ((logs (context-get-system-logs 5)))
    (is (cl:some (lambda (line) (search "Engineering TEST LOG" line)) logs))))

(test test-global-awareness-assembly
  "Verify that context-assemble-global-awareness reports active projects."
  (clrhash org-agent::*object-store*)
  (ingest-ast (list :type :HEADLINE
                    :properties (list :ID "proj-1" :TITLE "Project Alpha" :TAGS "project")
                    :contents nil))
  (let ((awareness (context-assemble-global-awareness)))
    (is (search "Project Alpha" awareness))
    (is (search "proj-1" awareness))))

(test test-micro-rollback
  "Verify that a pipeline crash triggers an automatic Object Store rollback."
  (clrhash org-agent::*object-store*)
  (clrhash org-agent::*history-store*)
  (setf org-agent::*object-store-snapshots* nil)
  ;; State A
  (ingest-ast (list :type :HEADLINE
                    :properties (list :ID "node-1" :TITLE "State A")
                    :contents nil))
  (setup-mock-skills)
  (org-agent::defskill :crashing-skill
    :priority 200
    :trigger (lambda (ctx) t)
    :neuro (lambda (ctx)
             (list :type :REQUEST
                   :payload (list :action :eval :code "(error \"BOOM\")")))
    :symbolic (lambda (action ctx) (error "CRASH IN DETERMINISTIC ENGINE")))
  (process-signal (list :type :EVENT :payload (list :sensor :test)))
  ;; Verify that we are still in State A
  (let ((obj (lookup-object "node-1")))
    (is (not (null obj)))
    (is (equal (getf (org-object-attributes obj) :TITLE) "State A"))))
#+end_src
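
The rollback behavior exercised by the last test can also be sketched without the harness. The code below is a miniature built on assumptions: ~copy-store~, ~run-turn-with-rollback~, and the use of a plain ~equal~ hash table as the store are hypothetical stand-ins, not the harness's own ~snapshot-object-store~ and ~rollback-object-store~ implementation.

#+begin_src lisp
(defun copy-store (store)
  "Return a shallow copy of the hash table STORE."
  (let ((copy (make-hash-table :test (hash-table-test store))))
    (maphash (lambda (k v) (setf (gethash k copy) v)) store)
    copy))

(defun run-turn-with-rollback (store turn-fn)
  "Call TURN-FN on STORE. If it signals an error, restore STORE to the
contents it had before the turn. Returns :ok or :rolled-back."
  (let ((snapshot (copy-store store)))
    (handler-case (progn (funcall turn-fn store) :ok)
      (error ()
        ;; Discard partial mutations and reinstate the snapshot.
        (clrhash store)
        (maphash (lambda (k v) (setf (gethash k store) v)) snapshot)
        :rolled-back))))

(let ((store (make-hash-table :test 'equal)))
  (setf (gethash "node-1" store) "State A")
  (run-turn-with-rollback
   store
   (lambda (s)
     (setf (gethash "node-1" s) "State B") ; partial mutation
     (error "BOOM")))
  (gethash "node-1" store)) ; => "State A", T
#+end_src

A shallow copy is enough here because the stored values are immutable strings; a store holding mutable objects would need a deeper snapshot.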