(in-package :passepartout) (defvar *gateway-configs* (make-hash-table :test 'equal) "Maps platform name to plist (:token :thread :interval :enabled)") (defvar *gateway-registry* (make-hash-table :test 'equal) "Maps platform name to plist (:poll-fn :send-fn :default-interval)") (defun telegram-get-token () (vault-get-secret :telegram)) (defun telegram-poll () "Polls Telegram for new messages and injects them into the harness." (let* ((token (telegram-get-token))) (when token (let* ((last-id (getf (gethash "telegram" *gateway-configs*) :last-update-id 0)) (url (format nil "https://api.telegram.org/bot~a/getUpdates?offset=~a" token (1+ last-id)))) (handler-case (let* ((response (dex:get url)) (json (cl-json:decode-json-from-string response)) (updates (cdr (assoc :result json)))) (dolist (update updates) (let* ((update-id (cdr (assoc :update--id update))) (message (cdr (assoc :message update))) (chat (cdr (assoc :chat message))) (chat-id (cdr (assoc :id chat))) (text (cdr (assoc :text message)))) (setf (getf (gethash "telegram" *gateway-configs*) :last-update-id) update-id) (when (and text chat-id) (log-message "TELEGRAM: Received message from ~a" chat-id) (unless (ignore-errors (hitl-handle-message text :telegram)) (stimulus-inject (list :type :EVENT :meta (list :source :telegram :chat-id (format nil "~a" chat-id)) :payload (list :sensor :user-input :text text)))))))) (error (c) (log-message "TELEGRAM POLL ERROR: ~a" c))))))) (defun telegram-send (action context) "Sends a message via Telegram." (declare (ignore context)) (let* ((payload (getf action :payload)) (meta (getf action :meta)) (chat-id (or (getf meta :chat-id) (getf payload :chat-id) (getf action :chat-id))) (text (or (getf payload :text) (getf action :text))) (token (telegram-get-token))) (when (and token chat-id text) (handler-case (let ((url (format nil "https://api.telegram.org/bot~a/sendMessage" token))) (dex:post url :headers '(("Content-Type" . "application/json")) :content (cl-json:encode-json-to-string `((chat_id . ,chat-id) (text . ,text))))) (error (c) (log-message "TELEGRAM ERROR: ~a" c)))))) (defun signal-get-account () (vault-get-secret :signal)) (defun signal-poll () "Polls Signal for new messages and injects them into the harness." (let ((account (signal-get-account))) (when account (handler-case (let* ((output (uiop:run-program (list "signal-cli" "-u" account "receive" "--json") :output :string :error-output :string :ignore-error-status t)) (lines (cl-ppcre:split "\\\\n" output))) (dolist (line lines) (when (and line (> (length line) 0)) (let* ((json (ignore-errors (cl-json:decode-json-from-string line))) (envelope (cdr (assoc :envelope json))) (source (cdr (assoc :source envelope))) (data-message (cdr (assoc :data-message envelope))) (text (cdr (assoc :message data-message)))) (when (and source text) (log-message "SIGNAL: Received message from ~a" source) (unless (ignore-errors (hitl-handle-message text :signal)) (stimulus-inject (list :type :EVENT :meta (list :source :signal :chat-id source) :payload (list :sensor :user-input :text text))))))))) (error (c) (log-message "SIGNAL POLL ERROR: ~a" c)))))) (defun signal-send (action context) "Sends a message via Signal." (declare (ignore context)) (let* ((payload (getf action :payload)) (meta (getf action :meta)) (chat-id (or (getf meta :chat-id) (getf payload :chat-id) (getf action :chat-id))) (text (or (getf payload :text) (getf action :text))) (account (signal-get-account))) (when (and account chat-id text) (handler-case (uiop:run-program (list "signal-cli" "-u" account "send" "-m" text chat-id) :output :string :error-output :string) (error (c) (log-message "SIGNAL ERROR: ~a" c)))))) (defun gateway-registry-initialize () "Registers all built-in gateway handlers." (setf (gethash "telegram" *gateway-registry*) (list :poll-fn #'telegram-poll :send-fn #'telegram-send :default-interval 3)) (setf (gethash "signal" *gateway-registry*) (list :poll-fn #'signal-poll :send-fn #'signal-send :default-interval 5))) (defun gateway-configured-p (platform) "Returns T if a platform has a stored token." (let ((config (gethash platform *gateway-configs*))) (and config (getf config :token)))) (defun gateway-active-p (platform) "Returns T if a platform's polling thread is alive." (let ((config (gethash platform *gateway-configs*))) (and config (getf config :thread) (bt:thread-alive-p (getf config :thread))))) (defun messaging-link (platform token) "Links a platform with a token and starts polling." (let ((platform-lc (string-downcase platform))) (unless (gethash platform-lc *gateway-registry*) (error "Unknown platform: ~a. Available: ~{~a~^, ~}" platform (loop for k being the hash-keys of *gateway-registry* collect k))) (when (or (null token) (zerop (length token))) (error "Token cannot be empty")) (log-message "MESSAGING: Linking to ~a..." platform-lc) (gateway-unlink platform-lc) (let* ((registry-entry (gethash platform-lc *gateway-registry*)) (interval (or (getf registry-entry :default-interval) 5))) (setf (gethash platform-lc *gateway-configs*) (list :token token :interval interval :enabled t)) (vault-set-secret (intern (string-upcase platform-lc) :keyword) token) (gateway-start platform-lc) (log-message "MESSAGING: Successfully linked ~a" platform-lc) (format t "Successfully linked ~a gateway. Token stored securely.~%" platform-lc) t))) (defun messaging-unlink (platform) "Unlinks a platform and stops its polling thread." (let ((platform-lc (string-downcase platform))) (gateway-stop platform-lc) (remhash platform-lc *gateway-configs*) (log-message "MESSAGING: Unlinked ~a" platform-lc) (format t "Successfully unlinked ~a gateway.~%" platform-lc) t)) (defun gateway-start (platform) "Starts the polling thread for a linked gateway." (let ((platform-lc (string-downcase platform))) (let ((config (gethash platform-lc *gateway-configs*))) (when (and config (getf config :enabled) (not (gateway-active-p platform-lc))) (let ((poll-fn (getf (gethash platform-lc *gateway-registry*) :poll-fn))) (when poll-fn (let ((interval (getf config :interval))) (setf (getf config :thread) (bt:make-thread (lambda () (loop (when (getf (gethash platform-lc *gateway-configs*) :enabled) (funcall poll-fn)) (sleep interval))) :name (format nil "passepartout-~a-gateway" platform-lc))) (log-message "MESSAGING: Started ~a polling (interval: ~as)" platform-lc interval)))))))) (defun gateway-stop (platform) "Stops the polling thread for a gateway." (let ((platform-lc (string-downcase platform))) (let ((config (gethash platform-lc *gateway-configs*))) (when (and config (getf config :thread)) (when (bt:thread-alive-p (getf config :thread)) (log-message "MESSAGING: Stopping ~a polling thread" platform-lc) (bt:destroy-thread (getf config :thread)))) (setf (getf config :thread) nil)))) (defun messaging-list () "Returns a list of all gateways with their status." (loop for platform being the hash-keys of *gateway-registry* collect (let ((configured (gateway-configured-p platform)) (active (gateway-active-p platform))) (list :platform platform :configured configured :active active)))) (defun messaging-list-print () "Prints a formatted table of gateways." (format t "~%") (format t " ~20@A ~12@A ~10@A~%" "PLATFORM" "CONFIGURED" "STATUS") (dolist (gw (messaging-list)) (format t " ~20@A ~12@A ~10@A~%" (getf gw :platform) (if (getf gw :configured) "yes" "no") (cond ((getf gw :active) "ACTIVE") ((getf gw :configured) "stopped") (t "not linked")))) (format t "~%")) (defun gateway-start-all () "Called at boot to start all configured gateways." (dolist (config (loop for platform being the hash-keys of *gateway-configs* collect (list platform (gethash platform *gateway-configs*)))) (destructuring-bind (platform config) config (when (and (getf config :enabled) (not (gateway-active-p platform))) (gateway-start platform))))) (register-actuator :telegram #'telegram-send) (register-actuator :signal #'signal-send) (defskill :passepartout-gateway-messaging :priority 150 :trigger (lambda (ctx) (declare (ignore ctx)) nil)) (gateway-registry-initialize) (gateway-start-all) (eval-when (:compile-toplevel :load-toplevel :execute) (ql:quickload :fiveam :silent t)) (defpackage :passepartout-gateway-messaging-tests (:use :cl :fiveam :passepartout) (:export #:messaging-suite)) (in-package :passepartout-gateway-messaging-tests) (def-suite messaging-suite :description "Verification of Gateway Messaging") (in-suite messaging-suite) (test test-gateway-registry-initialize "Contract 1: gateway-registry-initialize populates the registry." (clrhash passepartout::*gateway-registry*) (gateway-registry-initialize) (is (not (zerop (hash-table-count passepartout::*gateway-registry*)))) (is (getf (gethash "telegram" passepartout::*gateway-registry*) :configured)))