Phase 1: Define CLOS protocol layer for cl-streamer

- New cl-streamer/protocol.lisp with generic functions for server,
  pipeline, encoder, and hook protocols
- harmony-backend.lisp: convert defuns to defmethod on protocol generics,
  import/re-export protocol symbols, add hook system, backward-compat aliases
- encoder.lisp, aac-encoder.lisp: add encoder-encode/encoder-close methods
- package.lisp: export all protocol symbols
- cl-streamer.asd: add protocol.lisp to components

Runtime verified: audio streams, metadata displays, crossfades work.
This commit is contained in:
Glenn Thompson 2026-03-08 11:09:50 +03:00
parent 37a3b761db
commit c79cebcfa7
6 changed files with 334 additions and 26 deletions

View File

@ -135,3 +135,11 @@
(replace result chunk :start1 pos)
(incf pos (length chunk)))
result))))
;;; ---- Protocol Methods ----
(defmethod encoder-encode ((encoder aac-encoder) pcm-buffer num-samples)
(encode-aac-pcm encoder pcm-buffer num-samples))
(defmethod encoder-close ((encoder aac-encoder))
(close-aac-encoder encoder))

View File

@ -17,7 +17,8 @@
(:file "buffer")
(:file "icy-protocol")
(:file "stream-server")
(:file "cl-streamer")))
(:file "cl-streamer")
(:file "protocol")))
(asdf:defsystem #:cl-streamer/harmony
:description "Harmony audio backend for cl-streamer"

View File

@ -94,3 +94,14 @@
(defun lame-version ()
"Return the LAME library version string."
(get-lame-version))
;;; ---- Protocol Methods ----
(defmethod encoder-encode ((encoder mp3-encoder) pcm-buffer num-samples)
(encode-pcm-interleaved encoder pcm-buffer num-samples))
(defmethod encoder-flush ((encoder mp3-encoder))
(encode-flush encoder))
(defmethod encoder-close ((encoder mp3-encoder))
(close-encoder encoder))

View File

@ -2,23 +2,51 @@
(:use #:cl #:alexandria)
(:local-nicknames (#:harmony #:org.shirakumo.fraf.harmony)
(#:mixed #:org.shirakumo.fraf.mixed))
(:export #:audio-pipeline
#:make-audio-pipeline
#:add-pipeline-output
#:start-pipeline
#:stop-pipeline
#:play-file
#:play-list
#:pipeline-server
#:make-streaming-server
;; Track state & control
#:pipeline-current-track
#:pipeline-on-track-change
;; Import protocol generics — we define methods on these
(:import-from #:cl-streamer
#:pipeline-start
#:pipeline-stop
#:pipeline-running-p
#:pipeline-play-file
#:pipeline-play-list
#:pipeline-skip
#:pipeline-queue-files
#:pipeline-get-queue
#:pipeline-clear-queue
#:pipeline-current-track
#:pipeline-listener-count
#:pipeline-update-metadata
#:pipeline-add-hook
#:pipeline-remove-hook
#:pipeline-fire-hook)
(:export #:audio-pipeline
#:make-audio-pipeline
#:add-pipeline-output
;; Re-export protocol generics so callers can use cl-streamer/harmony:X
#:pipeline-start
#:pipeline-stop
#:pipeline-running-p
#:pipeline-play-file
#:pipeline-play-list
#:pipeline-skip
#:pipeline-queue-files
#:pipeline-get-queue
#:pipeline-clear-queue
#:pipeline-current-track
#:pipeline-listener-count
#:pipeline-update-metadata
#:pipeline-add-hook
#:pipeline-remove-hook
#:pipeline-fire-hook
;; Backward-compatible aliases (delegate to protocol generics)
#:start-pipeline
#:stop-pipeline
#:play-file
#:play-list
;; Harmony-specific (not in protocol)
#:pipeline-server
#:make-streaming-server
#:pipeline-on-track-change
#:pipeline-pending-playlist-path
#:pipeline-on-playlist-change
;; Metadata helpers
@ -111,9 +139,9 @@
(mount-path :initarg :mount-path :accessor pipeline-mount-path :initform "/stream.mp3")
(sample-rate :initarg :sample-rate :accessor pipeline-sample-rate :initform 44100)
(channels :initarg :channels :accessor pipeline-channels :initform 2)
(running :initform nil :accessor pipeline-running-p)
(running :initform nil :accessor %pipeline-running)
;; Track state
(current-track :initform nil :accessor pipeline-current-track
(current-track :initform nil :accessor %pipeline-current-track
:documentation "Plist of current track: (:title :artist :album :file :display-title)")
(on-track-change :initarg :on-track-change :initform nil
:accessor pipeline-on-track-change
@ -129,7 +157,10 @@
:documentation "Playlist path queued by scheduler, applied when tracks start playing")
(on-playlist-change :initarg :on-playlist-change :initform nil
:accessor pipeline-on-playlist-change
:documentation "Callback (lambda (pipeline playlist-path)) called when scheduler playlist starts")))
:documentation "Callback (lambda (pipeline playlist-path)) called when scheduler playlist starts")
;; Hook system
(hooks :initform (make-hash-table :test 'eq) :reader pipeline-hooks
:documentation "Hash table mapping event keywords to lists of hook functions")))
(defun make-audio-pipeline (&key encoder stream-server (mount-path "/stream.mp3")
(sample-rate 44100) (channels 2))
@ -155,9 +186,9 @@
(make-instance 'streaming-drain :channels (pipeline-channels pipeline))))
(drain-add-output (pipeline-drain pipeline) encoder mount-path))
(defun start-pipeline (pipeline)
(defmethod pipeline-start ((pipeline audio-pipeline))
"Start the audio pipeline - initializes Harmony with our streaming drain."
(when (pipeline-running-p pipeline)
(when (%pipeline-running pipeline)
(error "Pipeline already running"))
(mixed:init)
(let* ((server (harmony:make-simple-server
@ -187,14 +218,14 @@
(mixed:add drain output))
(setf (pipeline-harmony-server pipeline) server)
(mixed:start server))
(setf (pipeline-running-p pipeline) t)
(setf (%pipeline-running pipeline) t)
(log:info "Audio pipeline started with streaming drain (~A outputs)"
(length (drain-outputs (pipeline-drain pipeline))))
pipeline)
(defun stop-pipeline (pipeline)
(defmethod pipeline-stop ((pipeline audio-pipeline))
"Stop the audio pipeline."
(setf (pipeline-running-p pipeline) nil)
(setf (%pipeline-running pipeline) nil)
(when (pipeline-harmony-server pipeline)
(mixed:end (pipeline-harmony-server pipeline))
(setf (pipeline-harmony-server pipeline) nil))
@ -203,12 +234,12 @@
;;; ---- Pipeline Control ----
(defun pipeline-skip (pipeline)
(defmethod pipeline-skip ((pipeline audio-pipeline))
"Skip the current track. The play-list loop will detect this and advance."
(setf (pipeline-skip-flag pipeline) t)
(log:info "Skip requested"))
(defun pipeline-queue-files (pipeline file-entries &key (position :end))
(defmethod pipeline-queue-files ((pipeline audio-pipeline) file-entries &key (position :end))
"Add file entries to the pipeline queue.
Each entry is a string (path) or plist (:file path :title title).
POSITION is :end (append) or :next (prepend)."
@ -220,12 +251,12 @@
(append (pipeline-file-queue pipeline) file-entries)))))
(log:info "Queued ~A files (~A)" (length file-entries) position))
(defun pipeline-get-queue (pipeline)
(defmethod pipeline-get-queue ((pipeline audio-pipeline))
"Get the current file queue (copy)."
(bt:with-lock-held ((pipeline-queue-lock pipeline))
(copy-list (pipeline-file-queue pipeline))))
(defun pipeline-clear-queue (pipeline)
(defmethod pipeline-clear-queue ((pipeline audio-pipeline))
"Clear the file queue."
(bt:with-lock-held ((pipeline-queue-lock pipeline))
(setf (pipeline-file-queue pipeline) nil))
@ -301,9 +332,13 @@
(dolist (output (drain-outputs (pipeline-drain pipeline)))
(cl-streamer:set-now-playing (cdr output) display-title)))
(defmethod pipeline-update-metadata ((pipeline audio-pipeline) title)
"Update ICY metadata on all mount points (protocol method)."
(update-all-mounts-metadata pipeline title))
(defun notify-track-change (pipeline track-info)
"Update pipeline state and fire the on-track-change callback."
(setf (pipeline-current-track pipeline) track-info)
(setf (%pipeline-current-track pipeline) track-info)
(when (pipeline-on-track-change pipeline)
(handler-case
(funcall (pipeline-on-track-change pipeline) pipeline track-info)
@ -421,7 +456,7 @@
(idx 0)
(remaining-list (list (copy-list file-list)))
(current-list (list (copy-list file-list))))
(loop while (pipeline-running-p pipeline)
(loop while (%pipeline-running pipeline)
for entry = (next-entry pipeline remaining-list current-list)
do (cond
;; No entry and loop mode: re-queue current playlist
@ -481,7 +516,7 @@
(ignore-errors (mixed:frame-position voice))
(ignore-errors (mixed:frame-count voice))
(ignore-errors (mixed:samplerate voice))))
(loop while (and (pipeline-running-p pipeline)
(loop while (and (%pipeline-running pipeline)
(not (mixed:done-p voice))
(not (pipeline-skip-flag pipeline)))
for remaining = (voice-remaining-seconds voice)
@ -514,3 +549,51 @@
(log:error "play-list thread crashed: ~A" e))))
:name "cl-streamer-playlist"))
;;; ---- Backward-Compatible Aliases ----
;;; These allow existing code using cl-streamer/harmony:start-pipeline etc.
;;; to continue working while we transition to the protocol generics.
(defun start-pipeline (pipeline)
"Start the audio pipeline. Alias for (pipeline-start pipeline)."
(pipeline-start pipeline))
(defun stop-pipeline (pipeline)
"Stop the audio pipeline. Alias for (pipeline-stop pipeline)."
(pipeline-stop pipeline))
;;; ---- Protocol Method Implementations ----
(defmethod pipeline-running-p ((pipeline audio-pipeline))
"Return T if the pipeline is currently running."
(%pipeline-running pipeline))
(defmethod pipeline-current-track ((pipeline audio-pipeline))
"Return the current track info plist, or NIL."
(%pipeline-current-track pipeline))
(defmethod pipeline-listener-count ((pipeline audio-pipeline) &optional mount)
"Return the listener count from the stream server."
(cl-streamer:get-listener-count mount))
;;; ---- Hook System ----
(defmethod pipeline-add-hook ((pipeline audio-pipeline) event function)
"Register FUNCTION to be called when EVENT occurs.
Events: :track-change, :playlist-change"
(push function (gethash event (pipeline-hooks pipeline)))
(log:debug "Hook added for ~A" event))
(defmethod pipeline-remove-hook ((pipeline audio-pipeline) event function)
"Remove FUNCTION from the hook list for EVENT."
(setf (gethash event (pipeline-hooks pipeline))
(remove function (gethash event (pipeline-hooks pipeline))))
(log:debug "Hook removed for ~A" event))
(defmethod pipeline-fire-hook ((pipeline audio-pipeline) event &rest args)
"Fire all hooks registered for EVENT."
(dolist (fn (gethash event (pipeline-hooks pipeline)))
(handler-case
(apply fn pipeline args)
(error (e)
(log:warn "Hook error (~A): ~A" event e)))))

View File

@ -55,4 +55,38 @@
;; AAC Encoder
#:make-aac-encoder
#:close-aac-encoder
#:encode-aac-pcm))
#:encode-aac-pcm
;; Protocol — Server
#:server-start
#:server-stop
#:server-running-p
#:server-add-mount
#:server-remove-mount
#:server-update-metadata
#:server-listener-count
#:server-write-audio
;; Protocol — Pipeline
#:pipeline-start
#:pipeline-stop
#:pipeline-running-p
#:pipeline-play-file
#:pipeline-play-list
#:pipeline-skip
#:pipeline-queue-files
#:pipeline-get-queue
#:pipeline-clear-queue
#:pipeline-current-track
#:pipeline-listener-count
#:pipeline-update-metadata
;; Protocol — Pipeline Hooks
#:pipeline-add-hook
#:pipeline-remove-hook
#:pipeline-fire-hook
;; Protocol — Encoder
#:encoder-encode
#:encoder-flush
#:encoder-close))

171
cl-streamer/protocol.lisp Normal file
View File

@ -0,0 +1,171 @@
;;;; protocol.lisp - Protocol definitions for cl-streamer
;;;; Defines the generic function protocol that decouples application code
;;;; from specific backend implementations (Harmony, encoders, server).
;;;;
;;;; Applications program against these generics; backends provide methods.
(in-package #:cl-streamer)
;;; ============================================================
;;; Stream Server Protocol
;;; ============================================================
;;; The stream server handles HTTP connections, mount points,
;;; ICY metadata injection, and client lifecycle.
(defgeneric server-start (server)
(:documentation "Start the stream server, begin accepting connections."))
(defgeneric server-stop (server)
(:documentation "Stop the stream server, disconnect all clients."))
(defgeneric server-running-p (server)
(:documentation "Return T if the server is currently running."))
(defgeneric server-add-mount (server path &key content-type bitrate name genre buffer-size)
(:documentation "Add a mount point to the server. Returns the mount-point object."))
(defgeneric server-remove-mount (server path)
(:documentation "Remove a mount point from the server."))
(defgeneric server-update-metadata (server path &key title url)
(:documentation "Update ICY metadata for a mount point."))
(defgeneric server-listener-count (server &optional path)
(:documentation "Return the number of connected listeners.
If PATH is given, count only listeners on that mount."))
(defgeneric server-write-audio (server mount-path data &key start end)
(:documentation "Write encoded audio data to a mount point's broadcast buffer."))
;;; ============================================================
;;; Audio Pipeline Protocol
;;; ============================================================
;;; The pipeline connects an audio source (e.g. Harmony) to
;;; encoders and the stream server. It manages playback,
;;; queueing, crossfading, and metadata propagation.
(defgeneric pipeline-start (pipeline)
(:documentation "Start the audio pipeline."))
(defgeneric pipeline-stop (pipeline)
(:documentation "Stop the audio pipeline."))
(defgeneric pipeline-running-p (pipeline)
(:documentation "Return T if the pipeline is currently running."))
(defgeneric pipeline-play-file (pipeline file-path &key title)
(:documentation "Play a single audio file through the pipeline.
Returns (values voice display-title track-info)."))
(defgeneric pipeline-play-list (pipeline file-list &key crossfade-duration
fade-in fade-out
loop-queue)
(:documentation "Play a list of files sequentially with crossfading.
Each entry can be a string (path) or plist (:file path :title title).
Runs in a background thread."))
(defgeneric pipeline-skip (pipeline)
(:documentation "Skip the currently playing track."))
(defgeneric pipeline-queue-files (pipeline file-entries &key position)
(:documentation "Add file entries to the playback queue.
POSITION is :end (append, default) or :next (prepend)."))
(defgeneric pipeline-get-queue (pipeline)
(:documentation "Return a copy of the current playback queue."))
(defgeneric pipeline-clear-queue (pipeline)
(:documentation "Clear the playback queue."))
(defgeneric pipeline-current-track (pipeline)
(:documentation "Return the current track info plist, or NIL.
Plist keys: :file :display-title :artist :title :album"))
(defgeneric pipeline-listener-count (pipeline &optional mount)
(:documentation "Return the listener count (delegates to the server)."))
(defgeneric pipeline-update-metadata (pipeline title)
(:documentation "Update ICY metadata on all mount points."))
;;; ============================================================
;;; Pipeline Hook Protocol
;;; ============================================================
;;; Hooks replace direct slot access for callbacks.
;;; Events: :track-change, :playlist-change
(defgeneric pipeline-add-hook (pipeline event function)
(:documentation "Register FUNCTION to be called when EVENT occurs.
Events:
:track-change (lambda (pipeline track-info))
:playlist-change (lambda (pipeline playlist-path))"))
(defgeneric pipeline-remove-hook (pipeline event function)
(:documentation "Remove FUNCTION from the hook list for EVENT."))
(defgeneric pipeline-fire-hook (pipeline event &rest args)
(:documentation "Fire all hooks registered for EVENT with ARGS.
Called internally by the pipeline implementation."))
;;; ============================================================
;;; Encoder Protocol
;;; ============================================================
;;; Encoders convert PCM audio data into a streaming format
;;; (MP3, AAC, Opus, etc).
(defgeneric encoder-encode (encoder pcm-buffer num-samples)
(:documentation "Encode PCM samples. Returns encoded byte vector.
PCM-BUFFER is a (signed-byte 16) array of interleaved stereo samples.
NUM-SAMPLES is the number of sample frames (not individual samples)."))
(defgeneric encoder-flush (encoder)
(:documentation "Flush any remaining data from the encoder. Returns byte vector."))
(defgeneric encoder-close (encoder)
(:documentation "Release encoder resources."))
;;; ============================================================
;;; Default method implementations
;;; ============================================================
;;; Server protocol — default methods on the existing stream-server class
;;; These delegate to the existing functions so nothing breaks.
(defmethod server-start ((server stream-server))
(start-server server))
(defmethod server-stop ((server stream-server))
(stop-server server))
(defmethod server-running-p ((server stream-server))
(slot-value server 'running))
(defmethod server-add-mount ((server stream-server) path
&key (content-type "audio/mpeg")
(bitrate 128)
(name "CL-Streamer")
(genre "Various")
(buffer-size (* 1024 1024)))
(add-mount server path
:content-type content-type
:bitrate bitrate
:name name
:genre genre
:buffer-size buffer-size))
(defmethod server-remove-mount ((server stream-server) path)
(remove-mount server path))
(defmethod server-update-metadata ((server stream-server) path &key title url)
(update-metadata server path :title title :url url))
(defmethod server-listener-count ((server stream-server) &optional path)
(listener-count server path))
(defmethod server-write-audio ((server stream-server) mount-path data
&key (start 0) (end (length data)))
(let ((mount (gethash mount-path (server-mounts server))))
(when mount
(buffer-write (mount-buffer mount) data :start start :end end))))
;;; Encoder protocol methods are defined in encoder.lisp and aac-encoder.lisp
;;; alongside their respective class definitions (separate ASDF subsystems).