refactor: implement Reactive Signal Pipeline and flatten cognitive loop
This commit is contained in:
267
src/core.lisp
267
src/core.lisp
@@ -8,6 +8,21 @@
|
||||
(defvar *skill-telemetry* (make-hash-table :test 'equal))
|
||||
(defvar *telemetry-lock* (bt:make-lock "kernel-telemetry-lock"))
|
||||
|
||||
(defvar *actuator-registry* (make-hash-table :test 'equal))
|
||||
|
||||
(defun register-actuator (name fn)
|
||||
"Registers an actuator function. Actuators receive two arguments: (ACTION CONTEXT)."
|
||||
(setf (gethash name *actuator-registry*) fn))
|
||||
|
||||
(defun dispatch-action (action context)
|
||||
"Routes an approved action to its registered physical actuator."
|
||||
(when (and action (listp action))
|
||||
(let* ((target (or (ignore-errors (getf action :target)) :emacs))
|
||||
(actuator-fn (gethash target *actuator-registry*)))
|
||||
(if actuator-fn
|
||||
(funcall actuator-fn action context)
|
||||
(kernel-log "DISPATCH ERROR: No actuator for ~a" target)))))
|
||||
|
||||
(defun kernel-track-telemetry (skill-name duration status)
|
||||
"Updates performance metrics for a specific skill."
|
||||
(when skill-name (bt:with-lock-held (*telemetry-lock*)
|
||||
@@ -21,39 +36,18 @@
|
||||
(bt:with-lock-held (*logs-lock*) (push msg *system-logs*) (when (> (length *system-logs*) *max-log-history*) (setf *system-logs* (subseq *system-logs* 0 *max-log-history*))))
|
||||
(format t "~a~%" msg) (finish-output)))
|
||||
|
||||
(defvar *heartbeat-thread* nil)
|
||||
(defvar *actuator-registry* (make-hash-table :test 'equal))
|
||||
(defun register-actuator (name fn)
|
||||
"Registers an actuator function. Actuators receive two arguments: (ACTION CONTEXT)."
|
||||
(setf (gethash name *actuator-registry*) fn))
|
||||
|
||||
(defun inject-stimulus (raw-message &key stream (depth 0))
|
||||
"Enqueues a raw message into the cognitive loop, handling async/sync execution and recovery."
|
||||
"Enqueues a raw message into the reactive signal pipeline, handling async/sync execution and recovery."
|
||||
(let* ((payload (getf raw-message :payload))
|
||||
(sensor (getf payload :sensor))
|
||||
;; Force Chat and Delegation to be async
|
||||
(async-p (or (getf payload :async-p) (member sensor '(:chat-message :delegation :user-command)))))
|
||||
(when stream (setf (getf raw-message :reply-stream) stream))
|
||||
(if async-p (bt:make-thread (lambda () (restart-case (handler-bind ((error (lambda (c) (kernel-log "ASYNC ERROR: ~a" c) (invoke-restart 'skip-event))))
|
||||
(cognitive-loop raw-message depth)) (skip-event () nil))) :name "org-agent-async-task")
|
||||
(restart-case (handler-bind ((error (lambda (c) (kernel-log "SYSTEM ERROR: ~a" c) (invoke-restart 'skip-event)))) (cognitive-loop raw-message depth))
|
||||
(process-signal raw-message)) (skip-event () nil))) :name "org-agent-async-task")
|
||||
(restart-case (handler-bind ((error (lambda (c) (kernel-log "SYSTEM ERROR: ~a" c) (invoke-restart 'skip-event)))) (process-signal raw-message))
|
||||
(skip-event () (kernel-log "SYSTEM RECOVERY: Stimulus dropped.~%"))))))
|
||||
|
||||
(defun spawn-task (task-description &key (async-p t))
|
||||
"Creates a new background cognitive task from a description."
|
||||
(inject-stimulus `(:type :EVENT :payload (:sensor :delegation :query ,task-description :async-p ,async-p))))
|
||||
|
||||
(defun send-swarm-packet (target-url payload)
|
||||
"Transmits a JSON payload to a remote swarm node."
|
||||
(let* ((json-payload (cl-json:encode-json-to-string payload)) (headers '(("Content-Type" . "application/json"))))
|
||||
(handler-case (dex:post target-url :headers headers :content json-payload) (error (c) (kernel-log "SWARM ERROR: ~a" c) nil))))
|
||||
|
||||
(defun dispatch-action (action context)
|
||||
"Routes an approved action to its registered physical actuator."
|
||||
(when (and action (listp action))
|
||||
(let* ((target (or (ignore-errors (getf action :target)) :emacs)) (actuator-fn (gethash target *actuator-registry*)))
|
||||
(if actuator-fn (funcall actuator-fn action context) (kernel-log "DISPATCH ERROR: No actuator for ~a" target)))))
|
||||
|
||||
(defun execute-system-action (action context)
|
||||
"Processes internal kernel commands like skill creation or environment updates."
|
||||
(declare (ignore context))
|
||||
@@ -74,104 +68,135 @@
|
||||
(:message (kernel-log "ACTUATOR [System] - ~a" (getf payload :text)))
|
||||
(t (kernel-log "ACTUATOR [System] - Unknown command ~s" cmd)))))
|
||||
|
||||
(defun cognitive-loop (raw-message &optional (depth 0))
|
||||
"The main recursive OODA cycle: Perceive, Think, Decide, Act."
|
||||
(when (> depth 10)
|
||||
(kernel-log "SYSTEM ERROR: Maximum cognitive depth reached.")
|
||||
(return-from cognitive-loop nil))
|
||||
(when (bt:with-lock-held (*interrupt-lock*) *interrupt-flag*)
|
||||
(kernel-log "SYSTEM: Loop interrupted.")
|
||||
(bt:with-lock-held (*interrupt-lock*) (setf *interrupt-flag* nil))
|
||||
(return-from cognitive-loop nil))
|
||||
(defun perceive-gate (signal)
|
||||
"Initial processing: Normalizes raw stimuli and updates memory."
|
||||
(let* ((payload (getf signal :payload))
|
||||
(type (getf signal :type))
|
||||
(sensor (getf payload :sensor)))
|
||||
(kernel-log "GATE [Perceive]: ~a (~a)" type (or sensor "no-sensor"))
|
||||
(snapshot-object-store)
|
||||
(cond ((eq type :EVENT)
|
||||
(case sensor
|
||||
(:buffer-update (let ((ast (getf payload :ast))) (when ast (ingest-ast ast))))
|
||||
(:point-update (let ((element (getf payload :element))) (when element (ingest-ast element))))
|
||||
(:interrupt (bt:with-lock-held (*interrupt-lock*) (setf *interrupt-flag* t)))))
|
||||
((eq type :RESPONSE)
|
||||
(kernel-log "GATE [Perceive]: Act Result -> ~a" (getf payload :status))))
|
||||
(setf (getf signal :status) :perceived)
|
||||
signal))
|
||||
|
||||
(handler-case
|
||||
(let* ((start-time (get-internal-real-time))
|
||||
(type (getf raw-message :type))
|
||||
(perceive-fn (find-symbol "PERCEIVE" :org-agent))
|
||||
(context (if perceive-fn (funcall perceive-fn raw-message) raw-message)))
|
||||
(snapshot-object-store)
|
||||
(if (eq type :REQUEST)
|
||||
(dispatch-action raw-message context)
|
||||
(let* ((skill (find-triggered-skill context))
|
||||
(skill-name (when skill (skill-name skill)))
|
||||
(proposed-action (think context))
|
||||
(approved-action (decide proposed-action context))
|
||||
(status (if (and proposed-action (null approved-action)) :rejected :success))
|
||||
(duration (- (get-internal-real-time) start-time)))
|
||||
(when skill-name (kernel-track-telemetry skill-name duration status))
|
||||
(defun neuro-gate (signal)
|
||||
"System 1: Intuition and proposed actions."
|
||||
(unless (eq (getf signal :type) :EVENT)
|
||||
(return-from neuro-gate signal))
|
||||
(kernel-log "GATE [Neuro]: Consulting System 1...")
|
||||
(let ((thought (think signal)))
|
||||
(setf (getf signal :proposals) (if thought (list thought) nil))
|
||||
(setf (getf signal :status) :thought)
|
||||
signal))
|
||||
|
||||
(let* ((payload (getf approved-action :payload))
|
||||
(target (getf approved-action :target))
|
||||
(action (or (getf payload :action) (getf approved-action :action)))
|
||||
(tool-name (or (getf payload :tool) (getf approved-action :tool)))
|
||||
(tool-args (or (getf payload :args) (getf approved-action :args))))
|
||||
(if (and approved-action (eq target :tool) (eq action :call))
|
||||
;; Internal Tool Execution
|
||||
(let* ((tool (gethash (string-downcase (string tool-name)) *cognitive-tools*)))
|
||||
(if tool
|
||||
(progn
|
||||
(kernel-log "SYSTEM 2: Executing tool '~a'..." tool-name)
|
||||
(handler-case
|
||||
(let* ((clean-args (if (and (listp tool-args) (listp (car tool-args))) (car tool-args) tool-args))
|
||||
(tool-result (funcall (cognitive-tool-body tool) clean-args))
|
||||
(next-stimulus `(:type :EVENT :payload (:sensor :tool-output :result ,tool-result :tool ,tool-name))))
|
||||
(when (getf raw-message :reply-stream) (setf (getf next-stimulus :reply-stream) (getf raw-message :reply-stream)))
|
||||
(cognitive-loop next-stimulus (1+ depth)))
|
||||
(error (c)
|
||||
(kernel-log "SYSTEM ERROR: Tool '~a' failed: ~a" tool-name c)
|
||||
(let ((err-stimulus `(:type :EVENT :payload (:sensor :tool-error :tool ,tool-name :message ,(format nil "~a" c)))))
|
||||
(when (getf raw-message :reply-stream) (setf (getf err-stimulus :reply-stream) (getf raw-message :reply-stream)))
|
||||
(cognitive-loop err-stimulus (1+ depth))))))
|
||||
(progn
|
||||
(kernel-log "SYSTEM ERROR: Tool '~a' not found in registry." tool-name)
|
||||
(let ((err-stimulus `(:type :EVENT :payload (:sensor :tool-error :message "Tool not found"))))
|
||||
(when (getf raw-message :reply-stream) (setf (getf err-stimulus :reply-stream) (getf raw-message :reply-stream)))
|
||||
(cognitive-loop err-stimulus (1+ depth))))))
|
||||
(defun consensus-gate (signal)
|
||||
"Resolves multiple proposals into a single candidate action."
|
||||
(let ((proposals (getf signal :proposals)))
|
||||
(setf (getf signal :candidate) (first proposals))
|
||||
(setf (getf signal :status) :consensus)
|
||||
signal))
|
||||
|
||||
;; Physical Actuation (Emacs, Shell, etc.)
|
||||
(let ((result (dispatch-action approved-action context)))
|
||||
(when (and result (not (member target '(:emacs :system-message))))
|
||||
(let ((fallback-stimulus `(:type :EVENT :payload (:sensor :tool-output :result ,result :tool ,approved-action))))
|
||||
(when (getf raw-message :reply-stream) (setf (getf fallback-stimulus :reply-stream) (getf raw-message :reply-stream)))
|
||||
(cognitive-loop fallback-stimulus (1+ depth))))))))))
|
||||
(error (c)
|
||||
(kernel-log "LOOP CRASH - Error in recursive turn: ~a~%" c)
|
||||
;; IMMUNE SYSTEM: Inject loop failure as a new stimulus if not too deep
|
||||
;; And ensure we are not already handling an error to prevent infinite recursion
|
||||
(let ((sensor (ignore-errors (getf (getf raw-message :payload) :sensor))))
|
||||
(unless (or (> depth 2) (member sensor '(:loop-error :tool-error)))
|
||||
(inject-stimulus `(:type :EVENT :payload (:sensor :loop-error :message ,(format nil "~a" c) :depth ,depth))
|
||||
:stream (getf raw-message :reply-stream)
|
||||
:depth (1+ depth))))
|
||||
nil)))
|
||||
(defun decide-gate (signal)
|
||||
"System 2: Safety and validation."
|
||||
(let ((candidate (getf signal :candidate)))
|
||||
(if candidate
|
||||
(let ((approved (decide candidate signal)))
|
||||
(setf (getf signal :approved-action) approved)
|
||||
(unless approved (kernel-log "GATE [Decide]: REJECTED by System 2")))
|
||||
(setf (getf signal :approved-action) nil))
|
||||
(setf (getf signal :status) :decided)
|
||||
signal))
|
||||
|
||||
(defun perceive (raw-message)
|
||||
"Initial processing of raw stimuli, updating the Object Store if needed."
|
||||
(handler-case
|
||||
(let ((type (getf raw-message :type)) (payload (getf raw-message :payload)))
|
||||
(kernel-log "PERCEIVE: ~a (~a)" type (or (getf payload :sensor) "no-sensor"))
|
||||
(cond ((eq type :EVENT) (let ((sensor (getf payload :sensor)))
|
||||
(case sensor
|
||||
(:buffer-update (let ((ast (getf payload :ast))) (when ast (ingest-ast ast))))
|
||||
(:point-update (let ((element (getf payload :element))) (when element (ingest-ast element))))
|
||||
(:interrupt (bt:with-lock-held (*interrupt-lock*) (setf *interrupt-flag* t))))))
|
||||
((eq type :RESPONSE)
|
||||
(kernel-log "ACT RESULT: ~a~%PAYLOAD: ~s~%" (getf payload :status) payload)))
|
||||
raw-message)
|
||||
(error (c)
|
||||
(kernel-log "PERCEIVE ERROR: Malformed stimulus received: ~a" c)
|
||||
nil)))
|
||||
(defun dispatch-gate (signal)
|
||||
"Final Stage: Actuation and feedback generation."
|
||||
(let* ((approved (getf signal :approved-action))
|
||||
(type (getf signal :type))
|
||||
(depth (getf signal :depth 0))
|
||||
(feedback nil))
|
||||
(case type
|
||||
(:REQUEST (dispatch-action signal signal))
|
||||
(:EVENT
|
||||
(when approved
|
||||
(let* ((payload (getf approved :payload))
|
||||
(target (getf approved :target))
|
||||
(action (or (getf payload :action) (getf approved :action)))
|
||||
(tool-name (or (getf payload :tool) (getf approved :tool)))
|
||||
(tool-args (or (getf payload :args) (getf approved :args))))
|
||||
(if (and (eq target :tool) (eq action :call))
|
||||
(let ((tool (gethash (string-downcase (string tool-name)) *cognitive-tools*)))
|
||||
(if tool
|
||||
(handler-case
|
||||
(let* ((clean-args (if (and (listp tool-args) (listp (car tool-args))) (car tool-args) tool-args))
|
||||
(result (funcall (cognitive-tool-body tool) clean-args)))
|
||||
(setf feedback (list :type :EVENT :depth (1+ depth) :reply-stream (getf signal :reply-stream)
|
||||
:payload (list :sensor :tool-output :result result :tool tool-name))))
|
||||
(error (c)
|
||||
(setf feedback (list :type :EVENT :depth (1+ depth) :reply-stream (getf signal :reply-stream)
|
||||
:payload (list :sensor :tool-error :tool tool-name :message (format nil "~a" c))))))
|
||||
(setf feedback (list :type :EVENT :depth (1+ depth) :reply-stream (getf signal :reply-stream)
|
||||
:payload (list :sensor :tool-error :message "Tool not found")))))
|
||||
(let ((result (dispatch-action approved signal)))
|
||||
(when (and result (not (member target '(:emacs :system-message))))
|
||||
(setf feedback (list :type :EVENT :depth (1+ depth) :reply-stream (getf signal :reply-stream)
|
||||
:payload (list :sensor :tool-output :result result :tool approved))))))))))
|
||||
(setf (getf signal :status) :dispatched)
|
||||
feedback))
|
||||
|
||||
(defun process-signal (signal)
|
||||
"The entry point to the Reactive Signal Pipeline."
|
||||
(let ((current-signal signal))
|
||||
(loop while current-signal do
|
||||
(let ((depth (getf current-signal :depth 0)))
|
||||
(when (> depth 10)
|
||||
(kernel-log "PIPELINE ERROR: Max depth reached.")
|
||||
(return nil))
|
||||
(when (bt:with-lock-held (*interrupt-lock*) *interrupt-flag*)
|
||||
(kernel-log "PIPELINE: Interrupted.")
|
||||
(bt:with-lock-held (*interrupt-lock*) (setf *interrupt-flag* nil))
|
||||
(return nil))
|
||||
|
||||
(handler-case
|
||||
(progn
|
||||
(setf current-signal (perceive-gate current-signal))
|
||||
(setf current-signal (neuro-gate current-signal))
|
||||
(setf current-signal (consensus-gate current-signal))
|
||||
(setf current-signal (decide-gate current-signal))
|
||||
(setf current-signal (dispatch-gate current-signal)))
|
||||
(error (c)
|
||||
(kernel-log "PIPELINE CRASH: ~a" c)
|
||||
(let ((sensor (ignore-errors (getf (getf current-signal :payload) :sensor))))
|
||||
(if (or (> depth 2) (member sensor '(:loop-error :tool-error)))
|
||||
(setf current-signal nil)
|
||||
(setf current-signal (list :type :EVENT :depth (1+ depth) :reply-stream (getf current-signal :reply-stream)
|
||||
:payload (list :sensor :loop-error :message (format nil "~a" c) :depth depth)))))))))))
|
||||
|
||||
(defvar *heartbeat-thread* nil)
|
||||
|
||||
(defun start-heartbeat (&optional (interval 60))
|
||||
"Spawns a thread that periodically injects a heartbeat stimulus."
|
||||
(setf *heartbeat-thread* (bt:make-thread (lambda () (loop (sleep interval) (kernel-log "KERNEL: Heartbeat pulse...")
|
||||
(inject-stimulus `(:type :EVENT :payload (:sensor :heartbeat :unix-time ,(get-universal-time)))))) :name "org-agent-heartbeat")))
|
||||
(setf *heartbeat-thread*
|
||||
(bt:make-thread
|
||||
(lambda ()
|
||||
(loop
|
||||
(sleep interval)
|
||||
(kernel-log "KERNEL: Heartbeat pulse...")
|
||||
(inject-stimulus (list :type :EVENT :payload (list :sensor :heartbeat :unix-time (get-universal-time))))))
|
||||
:name "org-agent-heartbeat")))
|
||||
|
||||
(defun stop-heartbeat () (when (and *heartbeat-thread* (bt:thread-alive-p *heartbeat-thread*)) (bt:destroy-thread *heartbeat-thread*) (setf *heartbeat-thread* nil)))
|
||||
(defun stop-heartbeat ()
|
||||
"Gracefully terminates the heartbeat pulse thread."
|
||||
(defun load-all-skills ()
|
||||
"Scans the skills directory and hot-loads them in dependency order."
|
||||
"Scans the directory defined by SKILLS_DIR and hot-loads skills using topological order."
|
||||
(when (and *heartbeat-thread* (bt:thread-alive-p *heartbeat-thread*))
|
||||
(bt:destroy-thread *heartbeat-thread*)
|
||||
(setf *heartbeat-thread* nil)))
|
||||
|
||||
(defun load-all-skills ()
|
||||
"Scans the directory defined by SKILLS_DIR and hot-loads skills using topological order."
|
||||
(let* ((env-path (uiop:getenv "SKILLS_DIR"))
|
||||
(skills-dir-str (or env-path (namestring (merge-pathnames "notes/" (user-homedir-pathname)))))
|
||||
(resolved-path (context-resolve-path skills-dir-str))
|
||||
@@ -208,29 +233,20 @@
|
||||
(loop
|
||||
(handler-case
|
||||
(progn
|
||||
;; 1. Skip leading whitespace/newlines
|
||||
(loop for char = (peek-char nil stream nil :eof)
|
||||
while (and (not (eq char :eof)) (member char '(#\Space #\Newline #\Return #\Tab)))
|
||||
do (read-char stream))
|
||||
|
||||
(let ((peek (peek-char nil stream nil :eof)))
|
||||
(if (eq peek :eof) (return))
|
||||
(let* ((len-prefix (make-string 6)))
|
||||
;; 2. Read the 6-character length prefix
|
||||
(unless (read-sequence len-prefix stream)
|
||||
(return))
|
||||
(unless (read-sequence len-prefix stream) (return))
|
||||
(let* ((len (parse-integer len-prefix :radix 16))
|
||||
(msg-payload (make-string len)))
|
||||
;; 3. Read the actual message payload
|
||||
(unless (read-sequence msg-payload stream)
|
||||
(return))
|
||||
;; 4. Parse and process
|
||||
(unless (read-sequence msg-payload stream) (return))
|
||||
(let ((msg (read-from-string msg-payload)))
|
||||
(kernel-log "DAEMON: Received stimulus (~a characters)~%" len)
|
||||
(inject-stimulus msg :stream stream))))))
|
||||
(error (c)
|
||||
(kernel-log "DAEMON CLIENT ERROR: ~a~%" c)
|
||||
(return))))
|
||||
(error (c) (kernel-log "DAEMON CLIENT ERROR: ~a~%" c) (return))))
|
||||
(kernel-log "DAEMON: Client disconnected.~%")
|
||||
(unregister-emacs-client stream)
|
||||
(ignore-errors (close stream))))
|
||||
@@ -267,5 +283,4 @@
|
||||
(let ((interval (or (ignore-errors (parse-integer (uiop:getenv "HEARTBEAT_INTERVAL") :junk-allowed t)) 60)))
|
||||
(format t "KERNEL: Heartbeat interval set to ~a seconds.~%" interval)
|
||||
(start-daemon :interval interval))
|
||||
;; Keep the process alive.
|
||||
(loop (sleep 3600)))
|
||||
|
||||
@@ -45,12 +45,13 @@
|
||||
#:context-get-skill-telemetry
|
||||
#:context-assemble-global-awareness
|
||||
|
||||
;; --- Cognitive Loop & Event Bus ---
|
||||
#:perceive
|
||||
#:think
|
||||
#:decide
|
||||
#:act
|
||||
#:cognitive-loop
|
||||
;; --- Reactive Signal Pipeline ---
|
||||
#:process-signal
|
||||
#:perceive-gate
|
||||
#:neuro-gate
|
||||
#:consensus-gate
|
||||
#:decide-gate
|
||||
#:dispatch-gate
|
||||
#:inject-stimulus
|
||||
#:dispatch-action
|
||||
#:register-actuator
|
||||
|
||||
Reference in New Issue
Block a user