Files
passepartout/literate/core.org

23 KiB

The Cognitive Loop (core.lisp)

The Cognitive Loop (core.lisp)

Deep Reasoning: Beyond Asynchronous Recursion

The original `cognitive-loop` used asynchronous recursion to handle stimuli. While responsive, it led to deep Lisp stacks and made multi-backend consensus difficult to implement.

  • The Circuit Board Model: We have evolved the kernel into a functional transformation pipeline. Every event—be it a keystroke, a timer pulse, or a neural proposal—is a Signal.
  • Consensus Gates: By treating reasoning as a signal moving through a pipe, we can "split" the pipe to ask multiple LLMs simultaneously. A Consensus Gate later in the pipe compares the proposals and selects the most mathematically consistent one.
  • Multi-Modal Fusion: The pipeline can blend disparate signals (e.g. User Prompt + Low Battery Alert or Heartbeat) into a single coherent cognitive event.
  • Flat Execution: By using a feedback-loop orchestrator (`process-signal`), we flatten the execution stack. Tool outputs and errors are re-injected as new signals rather than creating nested function calls.

The Signal Pipeline (Architecture)

graph TD
    S1[Signal: User Message] --> P[Perceive Gate]
    S2[Signal: Heartbeat] --> P
    P --> N[Neuro Gate: Multi-Backend]
    N --> C[Consensus Gate]
    C --> V[Validation Gate: System 2]
    V --> D[Dispatch Gate: Actuators]
    D -- Feedback Signal --> S1

Package Context

(in-package :org-agent)

Global Kernel State

The kernel maintains several thread-safe global variables for logging, telemetry, and execution control.

(defvar *system-logs* nil)
(defvar *logs-lock* (bt:make-lock "kernel-logs-lock"))
(defvar *max-log-history* 100)
(defvar *interrupt-flag* nil)
(defvar *interrupt-lock* (bt:make-lock "kernel-interrupt-lock"))
(defvar *skill-telemetry* (make-hash-table :test 'equal))
(defvar *telemetry-lock* (bt:make-lock "kernel-telemetry-lock"))

Physical Dispatch (dispatch-action)

Routes an approved action to its registered physical actuator.

(defun dispatch-action (action context)
  "Routes an approved action to its registered physical actuator."
  (when (and action (listp action))
    (let* ((target (or (ignore-errors (getf action :target)) :emacs)) 
           (actuator-fn (gethash target *actuator-registry*)))
      (if actuator-fn 
          (funcall actuator-fn action context) 
          (kernel-log "DISPATCH ERROR: No actuator for ~a" target)))))

Performance Tracking (kernel-track-telemetry)

Updates performance metrics for a specific skill, tracking execution counts, total duration, and failure rates.

(defun kernel-track-telemetry (skill-name duration status)
  "Updates performance metrics for a specific skill."
  (when skill-name (bt:with-lock-held (*telemetry-lock*)
                     (let ((entry (or (gethash skill-name *skill-telemetry*) (list :executions 0 :total-time 0 :failures 0))))
                       (incf (getf entry :executions)) (incf (getf entry :total-time) duration)
                       (when (eq status :rejected) (incf (getf entry :failures))) (setf (gethash skill-name *skill-telemetry*) entry)))))

System Logging (kernel-log)

A centralized logging function that outputs to standard output and maintains a rolling in-memory buffer for context-aware reasoning.

(defun kernel-log (fmt &rest args)
  "Records a formatted message to the system log and standard output."
  (let ((msg (apply #'format nil fmt args)))
    (bt:with-lock-held (*logs-lock*) (push msg *system-logs*) (when (> (length *system-logs*) *max-log-history*) (setf *system-logs* (subseq *system-logs* 0 *max-log-history*))))
    (format t "~a~%" msg) (finish-output)))

Stimulus Injection (inject-stimulus)

This is the entry point for all events into the kernel. It initializes a signal and passes it to the `process-signal` pipeline.

(defun inject-stimulus (raw-message &key stream (depth 0))
  "Enqueues a raw message into the reactive signal pipeline, handling async/sync execution and recovery."
  (let* ((payload (getf raw-message :payload)) 
         (sensor (getf payload :sensor))
         ;; Force Chat and Delegation to be async
         (async-p (or (getf payload :async-p) (member sensor '(:chat-message :delegation :user-command)))))
    (when stream (setf (getf raw-message :reply-stream) stream))
    (if async-p (bt:make-thread (lambda () (restart-case (handler-bind ((error (lambda (c) (kernel-log "ASYNC ERROR: ~a" c) (invoke-restart 'skip-event))))
                                                           (process-signal raw-message)) (skip-event () nil))) :name "org-agent-async-task")
        (restart-case (handler-bind ((error (lambda (c) (kernel-log "SYSTEM ERROR: ~a" c) (invoke-restart 'skip-event)))) (process-signal raw-message))
          (skip-event () (kernel-log "SYSTEM RECOVERY: Stimulus dropped.~%"))))))

Internal Tool Execution

The `execute-system-action` function handles kernel-level operations such as hot-loading skills, evaluating raw Lisp, or setting environment variables.

(defun execute-system-action (action context)
  "Processes internal kernel commands like skill creation or environment updates."
  (declare (ignore context))
  (let* ((payload (ignore-errors (getf action :payload))) (cmd (ignore-errors (getf payload :action))))
    (case cmd
      (:eval (let ((code (getf payload :code)))
               (kernel-log "ACTUATOR [System] - Evaluating: ~a" code)
               (handler-case (let ((result (eval (read-from-string code))))
                               (kernel-log "ACTUATOR [System] - Result: ~s" result)
                               result)
                 (error (c) (kernel-log "ACTUATOR ERROR [System] - Eval failed: ~a" c)))))
      (:create-skill (let* ((filename (getf payload :filename)) (content (getf payload :content))
                            (skills-dir (merge-pathnames "skills/" (asdf:system-source-directory :org-agent))) (full-path (merge-pathnames filename skills-dir)))
                       (kernel-log "ACTUATOR [System] - Creating skill ~a..." filename)
                       (with-open-file (out full-path :direction :output :if-exists :supersede) (write-string content out))
                       (load-skill-from-org full-path)))
      (:set-cascade (setf *provider-cascade* (getf payload :cascade)))
      (:message (kernel-log "ACTUATOR [System] - ~a" (getf payload :text)))
      (t (kernel-log "ACTUATOR [System] - Unknown command ~s" cmd)))))

The Reactive Signal Pipeline (process-signal)

The kernel has evolved into a functional transformation pipeline. Every event—be it a keystroke, a timer pulse, or a neural proposal—is a Signal. Signals flow through a series of "Gates" that progressively enrich and validate the event until it is dispatched to an actuator.

Perceive Gate

Normalizes raw stimuli and updates the Object Store knowledge graph.

(defun perceive-gate (signal)
  "Initial processing: Normalizes raw stimuli and updates memory."
  (let* ((payload (getf signal :payload))
         (type (getf signal :type))
         (sensor (getf payload :sensor)))
    (kernel-log "GATE [Perceive]: ~a (~a)" type (or sensor "no-sensor"))
    (snapshot-object-store)
    (cond ((eq type :EVENT)
           (case sensor
             (:buffer-update (let ((ast (getf payload :ast))) (when ast (ingest-ast ast))))
             (:point-update (let ((element (getf payload :element))) (when element (ingest-ast element))))
             (:interrupt (bt:with-lock-held (*interrupt-lock*) (setf *interrupt-flag* t)))))
          ((eq type :RESPONSE)
           (kernel-log "GATE [Perceive]: Act Result -> ~a" (getf payload :status))))
    (setf (getf signal :status) :perceived)
    signal))

Neuro Gate

Invokes the neural System 1 engine to generate intuition-based proposals. If parallel consensus is enabled, this gate returns a list of proposals.

(defun neuro-gate (signal)
  "System 1: Intuition and proposed actions."
  (unless (eq (getf signal :type) :EVENT)
    (return-from neuro-gate signal))
  (kernel-log "GATE [Neuro]: Consulting System 1...")
  (let ((thoughts (think signal)))
    (setf (getf signal :proposals) (if (and thoughts (listp thoughts) (listp (car thoughts))) 
                                       thoughts 
                                       (if thoughts (list thoughts) nil)))
    (setf (getf signal :status) :thought)
    signal))

Consensus Gate

Compares multiple proposals (from parallel backends) and selects the most consistent one.

(defun resolve-consensus (proposals signal)
  "Resolves diverging proposals by voting or selecting the safest one."
  (declare (ignore signal))
  (kernel-log "CONSENSUS: ~a proposals found. Resolving..." (length proposals))
  ;; Simplified consensus: Majority vote or first safe one
  ;; For now, we'll select the proposal that appears most frequently.
  (let ((counts (make-hash-table :test 'equal)))
    (dolist (p proposals)
      (incf (gethash p counts 0)))
    (let ((winner (first proposals))
          (max-count 0))
      (maphash (lambda (p count)
                 (when (> count max-count)
                   (setq max-count count
                         winner p)))
               counts)
      (kernel-log "CONSENSUS: Winner selected with ~a votes." max-count)
      winner)))

(defun consensus-gate (signal)
  "Resolves multiple proposals into a single candidate action."
  (let ((proposals (getf signal :proposals)))
    (if (and proposals (cdr proposals))
        (let ((winner (resolve-consensus proposals signal)))
          (setf (getf signal :candidate) winner))
        (setf (getf signal :candidate) (first proposals)))
    (setf (getf signal :status) :consensus)
    signal))

Decide Gate

The System 2 safety gate. Validates the candidate action against formal rules and PSF invariants.

(defun decide-gate (signal)
  "System 2: Safety and validation."
  (let ((candidate (getf signal :candidate)))
    (if candidate
        (let ((approved (decide candidate signal)))
          (setf (getf signal :approved-action) approved)
          (unless approved (kernel-log "GATE [Decide]: REJECTED by System 2")))
        (setf (getf signal :approved-action) nil))
    (setf (getf signal :status) :decided)
    signal))

Dispatch Gate

Routes approved actions to actuators. If an action results in new information (like tool output), it returns a FEEDBACK signal to be re-injected.

(defun dispatch-gate (signal)
  "Final Stage: Actuation and feedback generation."
  (let* ((approved (getf signal :approved-action))
         (type (getf signal :type))
         (depth (getf signal :depth 0))
         (feedback nil))
    (case type
      (:REQUEST (dispatch-action signal signal))
      (:EVENT 
       (when approved
         (let* ((payload (getf approved :payload))
                (target (getf approved :target))
                (action (or (getf payload :action) (getf approved :action)))
                (tool-name (or (getf payload :tool) (getf approved :tool)))
                (tool-args (or (getf payload :args) (getf approved :args))))
           (if (and (eq target :tool) (eq action :call))
               (let ((tool (gethash (string-downcase (string tool-name)) *cognitive-tools*)))
                 (if tool
                     (handler-case
                         (let* ((clean-args (if (and (listp tool-args) (listp (car tool-args))) (car tool-args) tool-args))
                                (result (funcall (cognitive-tool-body tool) clean-args)))
                           (setf feedback (list :type :EVENT :depth (1+ depth) :reply-stream (getf signal :reply-stream)
                                                :payload (list :sensor :tool-output :result result :tool tool-name))))
                       (error (c)
                         (setf feedback (list :type :EVENT :depth (1+ depth) :reply-stream (getf signal :reply-stream)
                                              :payload (list :sensor :tool-error :tool tool-name :message (format nil "~a" c))))))
                     (setf feedback (list :type :EVENT :depth (1+ depth) :reply-stream (getf signal :reply-stream)
                                          :payload (list :sensor :tool-error :message "Tool not found")))))
               (let ((result (dispatch-action approved signal)))
                 (when (and result (not (member target '(:emacs :system-message))))
                   (setf feedback (list :type :EVENT :depth (1+ depth) :reply-stream (getf signal :reply-stream)
                                        :payload (list :sensor :tool-output :result result :tool approved))))))))))
    (setf (getf signal :status) :dispatched)
    feedback))

Pipeline Orchestrator (process-signal)

Moves a signal through the gates in a flat loop, handling feedback signals without increasing the Lisp stack depth.

(defun process-signal (signal)
  "The entry point to the Reactive Signal Pipeline."
  (let ((current-signal signal))
    (loop while current-signal do
      (let ((depth (getf current-signal :depth 0)))
        (when (> depth 10)
          (kernel-log "PIPELINE ERROR: Max depth reached.")
          (return nil))
        (when (bt:with-lock-held (*interrupt-lock*) *interrupt-flag*)
          (kernel-log "PIPELINE: Interrupted.")
          (bt:with-lock-held (*interrupt-lock*) (setf *interrupt-flag* nil))
          (return nil))
        
        (handler-case
            (progn
              (setf current-signal (perceive-gate current-signal))
              (setf current-signal (neuro-gate current-signal))
              (setf current-signal (consensus-gate current-signal))
              (setf current-signal (decide-gate current-signal))
              (setf current-signal (dispatch-gate current-signal)))
          (error (c)
            (kernel-log "PIPELINE CRASH: ~a - Initiating Micro-Rollback." c)
            (rollback-object-store 0)
            (let ((sensor (ignore-errors (getf (getf current-signal :payload) :sensor))))
              (if (or (> depth 2) (member sensor '(:loop-error :tool-error)))
                  (setf current-signal nil)
                  (setf current-signal (list :type :EVENT :depth (1+ depth) :reply-stream (getf current-signal :reply-stream)
                                             :payload (list :sensor :loop-error :message (format nil "~a" c) :depth depth)))))))))))

Delegation Mechanisms

Allows the core to hand off tasks to specialized background agents or processes.

(defun delegate-task (task-id recipient &key context)
  "Enqueues a task for another agent or background process."
  (kernel-log "ORCHESTRATOR: Delegating task ~a to ~a" task-id recipient)
  (inject-stimulus (list :type :EVENT 
                         :payload (list :sensor :delegation 
                                        :task-id task-id 
                                        :recipient recipient 
                                        :context context))))

Heartbeat Mechanism

Periodically injects a "pulse" into the system to trigger temporal skills (like cron jobs or reminders).

(defvar *heartbeat-thread* nil)

(defun start-heartbeat (&optional (interval 60))
  "Spawns a thread that periodically injects a heartbeat stimulus."
  (setf *heartbeat-thread* 
        (bt:make-thread 
         (lambda () 
           (loop 
             (sleep interval) 
             (kernel-log "KERNEL: Heartbeat pulse...")
             (inject-stimulus (list :type :EVENT :payload (list :sensor :heartbeat :unix-time (get-universal-time)))))) 
         :name "org-agent-heartbeat")))

(defun stop-heartbeat () 
  "Gracefully terminates the heartbeat pulse thread."
  (when (and *heartbeat-thread* (bt:thread-alive-p *heartbeat-thread*)) 
    (bt:destroy-thread *heartbeat-thread*) 
    (setf *heartbeat-thread* nil)))

Boot Sequence (initialize-all-skills)

The kernel initialization sequence has been moved to the Micro-Loader in the skills module. It remains exported for consistency.

(defun load-all-skills ()
  "Deprecated: use initialize-all-skills. Centralized boot orchestrator."
  (initialize-all-skills))

Main Entry Point

The execution entry point for the kernel binary.

(defun main ()
  "The entry point for the compiled standalone binary."
  (let* ((home (uiop:getenv "HOME"))
         (env-file (uiop:merge-pathnames* ".local/share/org-agent/.env" (uiop:ensure-directory-pathname home))))
    (if (uiop:file-exists-p env-file)
        (progn
          (format t "KERNEL: Loading environment from ~a~%" env-file)
          (cl-dotenv:load-env env-file))
        (format t "KERNEL ERROR: .env not found at ~a~%" env-file)))
  (let ((interval (or (ignore-errors (parse-integer (uiop:getenv "HEARTBEAT_INTERVAL") :junk-allowed t)) 60)))
    (format t "KERNEL: Heartbeat interval set to ~a seconds.~%" interval)
    (start-daemon :interval interval))
  (loop (sleep 3600)))

Phase E: Chaos (Verification)

Following the PSF mandates, the Reactive Signal Pipeline must be empirically verified. The following test suite ensures that signals flow correctly through the gates, that feedback is handled without stack recursion, and that depth limits are strictly enforced.

(defpackage :org-agent-pipeline-tests
  (:use :cl :fiveam :org-agent))
(in-package :org-agent-pipeline-tests)

(def-suite pipeline-suite
  :description "Verification of the Reactive Signal Pipeline.")
(in-suite pipeline-suite)

(defun setup-mock-skills ()
  "Register mock skills for testing."
  (clrhash org-agent::*skills-registry*)
  
  (org-agent::defskill :mock-refactor
    :priority 100
    :trigger (lambda (ctx) (eq (getf (getf ctx :payload) :command) :organize-subtree))
    :neuro (lambda (ctx) "Mock neuro prompt")
    :symbolic (lambda (action ctx) 
                `(:type :REQUEST :id 123 
                  :payload (:action :refactor-subtree 
                            :target-id nil 
                            :properties (("ID" . "node-123"))))))

  (org-agent::defskill :mock-safety
    :priority 50
    :trigger (lambda (ctx) t) ; always triggers
    :neuro (lambda (ctx) "Mock neuro")
    :symbolic (lambda (action ctx) nil))) ; rejects everything

(test test-perceive-gate
  "Perceive gate should update the object store and normalize signal."
  (clrhash org-agent::*object-store*)
  (let* ((signal (list :type :EVENT :payload (list :sensor :buffer-update :ast (list :type :HEADLINE :properties (list :ID "test-node" :TITLE "Test") :contents nil))))
         (result (perceive-gate signal)))
    (is (eq :perceived (getf result :status)))
    (is (not (null (gethash "test-node" org-agent::*object-store*))))))

(test test-decide-gate-safety
  "Decide gate should block unsafe LLM proposals."
  (setup-mock-skills)
  (let* ((candidate (list :type :REQUEST :payload (list :action :eval :code "(shell-command \"rm -rf /\")")))
         (signal (list :type :EVENT :candidate candidate))
         (result (decide-gate signal)))
    (is (eq :decided (getf result :status)))
    (let ((approved (getf result :approved-action)))
      (is (eq :LOG (getf approved :type)))
      (is (search "Action rejected by skill heuristics" (getf (getf approved :payload) :text))))))

(test test-pipeline-flow-flat
  "Verify that process-signal correctly executes a signal through gates."
  (setup-mock-skills)
  (clrhash org-agent::*object-store*)
  (let ((signal (list :type :EVENT :payload (list :sensor :buffer-update))))
    (process-signal signal)
    (pass "Pipeline completed execution.")))

(test test-depth-limiting
  "Verify that the pipeline terminates runaway feedback loops."
  (let ((runaway-signal (list :type :EVENT :depth 11 :payload (list :sensor :heartbeat))))
    (is (null (process-signal runaway-signal)))))

(test test-env-loading
  "Verify that environment variables are accessible."
  (setf (uiop:getenv "LLM_ENDPOINT") "http://mock")
  (setf (uiop:getenv "MEMEX_USER") "Amr")
  (is (not (null (uiop:getenv "LLM_ENDPOINT"))))
  (is (stringp (org-agent::get-env "MEMEX_USER"))))

(test test-path-resolution
  "Verify that context-resolve-path expands environment variables."
  (setf (uiop:getenv "MEMEX_USER") "Amr")
  (let ((path "$MEMEX_USER/test"))
    (is (search "Amr/test" (context-resolve-path path)))))

(test test-skill-dependencies
  "Verify that resolve-skill-dependencies correctly flattens the graph."
  (setup-mock-skills)
  (org-agent::defskill :mock-dependent
    :priority 10
    :dependencies (list "mock-safety")
    :trigger (lambda (ctx) nil)
    :neuro nil
    :symbolic nil)
  (let ((deps (org-agent::resolve-skill-dependencies "mock-dependent")))
    (is (member "mock-safety" deps :test #'string-equal))
    (is (member "mock-dependent" deps :test #'string-equal))))

(test test-log-buffering
  "Verify that kernel-log correctly populates the system logs."
  (kernel-log "PSF TEST LOG")
  (let ((logs (context-get-system-logs 5)))
    (is (cl:some (lambda (line) (search "PSF TEST LOG" line)) logs))))

(test test-global-awareness-assembly
  "Verify that context-assemble-global-awareness reports active projects."
  (clrhash org-agent::*object-store*)
  (ingest-ast (list :type :HEADLINE :properties (list :ID "proj-1" :TITLE "Project Alpha" :TAGS "project") :contents nil))
  (let ((awareness (context-assemble-global-awareness)))
    (is (search "Project Alpha" awareness))
    (is (search "proj-1" awareness))))

(test test-micro-rollback
  "Verify that a pipeline crash triggers an automatic Object Store rollback."
  (clrhash org-agent::*object-store*)
  (clrhash org-agent::*history-store*)
  (setf org-agent::*object-store-snapshots* nil)
  
  ;; State A
  (ingest-ast (list :type :HEADLINE :properties (list :ID "node-1" :TITLE "State A") :contents nil))
  
  (setup-mock-skills)
  ;; Skill that crashes in Symbolic Gate
  (org-agent::defskill :crashing-skill
    :priority 200
    :trigger (lambda (ctx) t)
    :neuro (lambda (ctx) (list :type :REQUEST :payload (list :action :eval :code "(error \"BOOM\")")))
    :symbolic (lambda (action ctx) (error "CRASH IN SYSTEM 2")))

  ;; Run pipeline. This turn will:
  ;; 1. Perceive (Take snapshot of State A)
  ;; 2. Neuro (Think)
  ;; 3. Decide (Crash!)
  ;; 4. Rollback to State A.
  (process-signal (list :type :EVENT :payload (list :sensor :test)))
  
  ;; Verify that we are still in State A
  (let ((obj (lookup-object "node-1")))
    (is (not (null obj)))
    (is (equal (getf (org-object-attributes obj) :TITLE) "State A"))))