From fcda723577a8b794ab1d98ad2de999099f050050 Mon Sep 17 00:00:00 2001 From: Glenn Thompson Date: Tue, 3 Mar 2026 18:15:31 +0300 Subject: [PATCH] feat: Broadcast buffer, sequential playlist, ICY metadata - Broadcast buffer: single-producer multi-consumer ring buffer with per-client read cursors. 32KB burst-on-connect for fast playback. Never blocks producer (overwrites old data for slow clients). - Sequential playlist: play-list runs tracks one at a time using Harmony's on-end callback + condition variable for completion. - ICY metadata: set-now-playing called on each track change. - Fixed string vs pathname bug in harmony:play (etypecase mismatch). - Debug logging for client disconnect diagnosis. Verified: browser plays shuffled FLAC playlist via 128kbps MP3 stream. --- cl-streamer/buffer.lisp | 115 ++++++++++++++++--------------- cl-streamer/harmony-backend.lisp | 56 +++++++++++++-- cl-streamer/package.lisp | 8 ++- cl-streamer/stream-server.lisp | 40 +++++++---- cl-streamer/test-stream.lisp | 35 +++++++--- 5 files changed, 166 insertions(+), 88 deletions(-) diff --git a/cl-streamer/buffer.lisp b/cl-streamer/buffer.lisp index 0f8f07c..f5688ef 100644 --- a/cl-streamer/buffer.lisp +++ b/cl-streamer/buffer.lisp @@ -1,84 +1,87 @@ (in-package #:cl-streamer) -(defclass ring-buffer () +;;; ---- Broadcast Ring Buffer ---- +;;; Single-producer, multi-consumer circular buffer. +;;; The writer advances write-pos; each client has its own read cursor. +;;; Old data is overwritten when the buffer wraps — slow clients lose data +;;; rather than blocking the producer (appropriate for live streaming). + +(defclass broadcast-buffer () ((data :initarg :data :accessor buffer-data) (size :initarg :size :reader buffer-size) - (read-pos :initform 0 :accessor buffer-read-pos) (write-pos :initform 0 :accessor buffer-write-pos) - (lock :initform (bt:make-lock "ring-buffer-lock") :reader buffer-lock) + (lock :initform (bt:make-lock "broadcast-buffer-lock") :reader buffer-lock) (not-empty :initform (bt:make-condition-variable :name "buffer-not-empty") :reader buffer-not-empty) - (not-full :initform (bt:make-condition-variable :name "buffer-not-full") - :reader buffer-not-full))) + (burst-size :initarg :burst-size :reader buffer-burst-size + :initform (* 32 1024) + :documentation "Bytes of recent data to send on new client connect"))) (defun make-ring-buffer (size) - "Create a ring buffer with SIZE bytes capacity." - (make-instance 'ring-buffer + "Create a broadcast ring buffer with SIZE bytes capacity." + (make-instance 'broadcast-buffer :data (make-array size :element-type '(unsigned-byte 8)) :size size)) -(defun %buffer-available (buffer) - "Internal: bytes available to read. Caller must hold lock." - (let ((write (buffer-write-pos buffer)) - (read (buffer-read-pos buffer)) - (size (buffer-size buffer))) - (mod (- write read) size))) - -(defun buffer-available (buffer) - "Return the number of bytes available to read." - (bt:with-lock-held ((buffer-lock buffer)) - (%buffer-available buffer))) - -(defun %buffer-free-space (buffer) - "Internal: bytes available to write. Caller must hold lock." - (- (buffer-size buffer) (%buffer-available buffer) 1)) - -(defun buffer-free-space (buffer) - "Return the number of bytes available to write." - (bt:with-lock-held ((buffer-lock buffer)) - (%buffer-free-space buffer))) - (defun buffer-write (buffer data &key (start 0) (end (length data))) - "Write bytes from DATA to BUFFER. Blocks if buffer is full." + "Write bytes into the broadcast buffer. Never blocks; overwrites old data." (let ((len (- end start))) - (bt:with-lock-held ((buffer-lock buffer)) - (when (> len 0) - (loop while (< (%buffer-free-space buffer) len) - do (bt:condition-wait (buffer-not-full buffer) (buffer-lock buffer))) + (when (> len 0) + (bt:with-lock-held ((buffer-lock buffer)) (let ((write-pos (buffer-write-pos buffer)) (size (buffer-size buffer)) (buf-data (buffer-data buffer))) (loop for i from start below end - for j = write-pos then (mod (1+ j) size) + for j = (mod write-pos size) then (mod (1+ j) size) do (setf (aref buf-data j) (aref data i)) - finally (setf (buffer-write-pos buffer) (mod (1+ j) size)))) + finally (setf (buffer-write-pos buffer) (+ write-pos len)))) (bt:condition-notify (buffer-not-empty buffer)))) len)) -(defun buffer-read (buffer output &key (start 0) (end (length output)) (blocking t)) - "Read bytes from BUFFER into OUTPUT. Returns number of bytes read. - If BLOCKING is T, waits for data. Otherwise returns 0 if empty." +(defun buffer-read-from (buffer read-pos output &key (start 0) (end (length output))) + "Read bytes from BUFFER starting at READ-POS into OUTPUT. + Returns (values bytes-read new-read-pos). + READ-POS is the client's absolute position in the stream." (let ((requested (- end start))) (bt:with-lock-held ((buffer-lock buffer)) - (when blocking - (loop while (zerop (%buffer-available buffer)) - do (bt:condition-wait (buffer-not-empty buffer) (buffer-lock buffer)))) - (let* ((available (%buffer-available buffer)) - (to-read (min requested available)) - (read-pos (buffer-read-pos buffer)) + (let* ((write-pos (buffer-write-pos buffer)) (size (buffer-size buffer)) - (buf-data (buffer-data buffer))) - (when (> to-read 0) - (loop for i from start below (+ start to-read) - for j = read-pos then (mod (1+ j) size) - do (setf (aref output i) (aref buf-data j)) - finally (setf (buffer-read-pos buffer) (mod (1+ j) size))) - (bt:condition-notify (buffer-not-full buffer))) - to-read)))) + (buf-data (buffer-data buffer)) + ;; Clamp read-pos: if client is too far behind, skip ahead + (oldest-available (max 0 (- write-pos size))) + (effective-read (max read-pos oldest-available)) + (available (- write-pos effective-read)) + (to-read (min requested available))) + (if (> to-read 0) + (progn + (loop for i from start below (+ start to-read) + for j = (mod effective-read size) then (mod (1+ j) size) + do (setf (aref output i) (aref buf-data j))) + (values to-read (+ effective-read to-read))) + (values 0 effective-read)))))) + +(defun buffer-wait-for-data (buffer read-pos) + "Block until new data is available past READ-POS." + (bt:with-lock-held ((buffer-lock buffer)) + (loop while (<= (buffer-write-pos buffer) read-pos) + do (bt:condition-wait (buffer-not-empty buffer) (buffer-lock buffer))))) + +(defun buffer-current-pos (buffer) + "Return the current write position (for new client burst start)." + (bt:with-lock-held ((buffer-lock buffer)) + (buffer-write-pos buffer))) + +(defun buffer-burst-start (buffer) + "Return a read position that gives BURST-SIZE bytes of recent data. + This lets new clients start playing immediately." + (bt:with-lock-held ((buffer-lock buffer)) + (let* ((write-pos (buffer-write-pos buffer)) + (size (buffer-size buffer)) + (oldest (max 0 (- write-pos size))) + (burst-start (max oldest (- write-pos (buffer-burst-size buffer))))) + burst-start))) (defun buffer-clear (buffer) - "Clear all data from the buffer." + "Clear the buffer." (bt:with-lock-held ((buffer-lock buffer)) - (setf (buffer-read-pos buffer) 0 - (buffer-write-pos buffer) 0) - (bt:condition-notify (buffer-not-full buffer)))) + (setf (buffer-write-pos buffer) 0))) diff --git a/cl-streamer/harmony-backend.lisp b/cl-streamer/harmony-backend.lisp index 7932bac..18e7f66 100644 --- a/cl-streamer/harmony-backend.lisp +++ b/cl-streamer/harmony-backend.lisp @@ -7,6 +7,7 @@ #:start-pipeline #:stop-pipeline #:play-file + #:play-list #:pipeline-encoder #:pipeline-server #:make-streaming-server)) @@ -123,15 +124,58 @@ (log:info "Audio pipeline stopped") pipeline) -(defun play-file (pipeline file-path &key (mixer :music)) +(defun play-file (pipeline file-path &key (mixer :music) title (on-end :free)) "Play an audio file through the pipeline. - The file will be decoded by Harmony and encoded for streaming." - (let* ((server (pipeline-harmony-server pipeline)) - (harmony:*server* server)) - (let ((voice (harmony:play file-path :mixer mixer))) - (log:info "Playing: ~A" file-path) + The file will be decoded by Harmony and encoded for streaming. + If TITLE is given, update ICY metadata with it. + FILE-PATH can be a string or pathname. + ON-END is passed to harmony:play (default :free)." + (let* ((path (pathname file-path)) + (server (pipeline-harmony-server pipeline)) + (harmony:*server* server) + (display-title (or title (pathname-name path)))) + ;; Update ICY metadata so listeners see the track name + (cl-streamer:set-now-playing (pipeline-mount-path pipeline) display-title) + (let ((voice (harmony:play path :mixer mixer :on-end on-end))) + (log:info "Now playing: ~A" display-title) voice))) +(defun play-list (pipeline file-list &key (gap 0.5)) + "Play a list of file paths sequentially through the pipeline. + Each entry can be a string (path) or a plist (:file path :title title). + GAP is seconds of silence between tracks." + (bt:make-thread + (lambda () + (loop for entry in file-list + while (pipeline-running-p pipeline) + do (multiple-value-bind (path title) + (if (listp entry) + (values (getf entry :file) (getf entry :title)) + (values entry nil)) + (handler-case + (let* ((done-lock (bt:make-lock "track-done")) + (done-cv (bt:make-condition-variable :name "track-done")) + (done-p nil) + (server (pipeline-harmony-server pipeline)) + (harmony:*server* server) + (voice (play-file pipeline path + :title title + :on-end (lambda (voice) + (declare (ignore voice)) + (bt:with-lock-held (done-lock) + (setf done-p t) + (bt:condition-notify done-cv)))))) + (declare (ignore voice)) + ;; Wait for the track to finish via callback + (bt:with-lock-held (done-lock) + (loop until (or done-p (not (pipeline-running-p pipeline))) + do (bt:condition-wait done-cv done-lock))) + (when (> gap 0) (sleep gap))) + (error (e) + (log:warn "Error playing ~A: ~A" path e) + (sleep 1)))))) + :name "cl-streamer-playlist")) + (declaim (inline float-to-s16)) (defun float-to-s16 (sample) "Convert a float sample (-1.0 to 1.0) to signed 16-bit integer." diff --git a/cl-streamer/package.lisp b/cl-streamer/package.lisp index c137917..ca1fce1 100644 --- a/cl-streamer/package.lisp +++ b/cl-streamer/package.lisp @@ -7,11 +7,13 @@ #:encoding-error ;; Buffer - #:ring-buffer + #:broadcast-buffer #:make-ring-buffer #:buffer-write - #:buffer-read - #:buffer-available + #:buffer-read-from + #:buffer-wait-for-data + #:buffer-current-pos + #:buffer-burst-start #:buffer-clear ;; ICY Protocol diff --git a/cl-streamer/stream-server.lisp b/cl-streamer/stream-server.lisp index c4c7860..c14c491 100644 --- a/cl-streamer/stream-server.lisp +++ b/cl-streamer/stream-server.lisp @@ -29,6 +29,8 @@ (mount :initarg :mount :accessor client-mount) (wants-metadata :initarg :wants-metadata :accessor client-wants-metadata-p) (bytes-since-meta :initform 0 :accessor client-bytes-since-meta) + (read-pos :initform 0 :accessor client-read-pos + :documentation "Client's absolute position in the broadcast buffer") (thread :initform nil :accessor client-thread) (active :initform t :accessor client-active-p))) @@ -126,7 +128,9 @@ (let ((mount (gethash path (server-mounts server)))) (if mount (serve-stream server client-socket stream mount wants-meta) - (send-404 stream path))))) + (progn + (log:debug "404 for path: ~A" path) + (send-404 stream path)))))) (error (e) (log:debug "Client error: ~A" e) (ignore-errors (usocket:socket-close client-socket)))))) @@ -167,25 +171,33 @@ (log:info "Client disconnected from ~A" (mount-path mount))))) (defun stream-to-client (client) - "Stream audio data to a client, inserting metadata as needed." + "Stream audio data to a client from the broadcast buffer. + Starts with a burst of recent data for fast playback start." (let* ((mount (client-mount client)) (buffer (mount-buffer mount)) (stream (client-stream client)) (chunk-size 4096) (chunk (make-array chunk-size :element-type '(unsigned-byte 8)))) + ;; Start from burst position for fast playback + (setf (client-read-pos client) (buffer-burst-start buffer)) (loop while (client-active-p client) - do (let ((bytes-read (buffer-read buffer chunk :blocking t))) - (when (zerop bytes-read) - (sleep 0.01) - (return)) - (handler-case - (if (client-wants-metadata-p client) - (write-with-metadata client chunk bytes-read) - (write-sequence chunk stream :end bytes-read)) - (error () - (setf (client-active-p client) nil) - (return))) - (force-output stream))))) + do (multiple-value-bind (bytes-read new-pos) + (buffer-read-from buffer (client-read-pos client) chunk) + (if (zerop bytes-read) + ;; No data yet — wait for producer + (buffer-wait-for-data buffer (client-read-pos client)) + (progn + (setf (client-read-pos client) new-pos) + (handler-case + (progn + (if (client-wants-metadata-p client) + (write-with-metadata client chunk bytes-read) + (write-sequence chunk stream :end bytes-read)) + (force-output stream)) + (error (e) + (log:debug "Client stream error: ~A" e) + (setf (client-active-p client) nil) + (return))))))))) (defun write-with-metadata (client data length) "Write audio data with ICY metadata injection." diff --git a/cl-streamer/test-stream.lisp b/cl-streamer/test-stream.lisp index 6d68fd0..d11ec58 100644 --- a/cl-streamer/test-stream.lisp +++ b/cl-streamer/test-stream.lisp @@ -1,14 +1,15 @@ -;;; End-to-end streaming test +;;; End-to-end streaming test with playlist ;;; Usage: sbcl --load test-stream.lisp ;;; ;;; Then open http://localhost:8000/stream.mp3 in VLC or browser +;;; ICY metadata will show track names as they change. (push #p"/home/glenn/SourceCode/harmony/" asdf:*central-registry*) (push #p"/home/glenn/SourceCode/asteroid/cl-streamer/" asdf:*central-registry*) (ql:quickload '(:cl-streamer :cl-streamer/encoder :cl-streamer/harmony)) -(format t "~%=== CL-Streamer End-to-End Test ===~%") +(format t "~%=== CL-Streamer Playlist Test ===~%") (format t "LAME version: ~A~%" (cl-streamer::lame-version)) ;; 1. Create and start stream server @@ -39,17 +40,33 @@ (cl-streamer/harmony:start-pipeline *pipeline*) -;; 5. Play a test file -(format t "[5] Playing test file...~%") -(defvar *test-file* - #p"/home/glenn/SourceCode/asteroid/music/library/Amon_Tobin - Dark Jovian/01 Dark Jovian.flac") +;; 5. Build a playlist from the music library +(format t "[5] Building playlist from music library...~%") +(defvar *music-dir* #p"/home/glenn/SourceCode/asteroid/music/library/") -(cl-streamer/harmony:play-file *pipeline* *test-file*) -(cl-streamer:set-now-playing "/stream.mp3" "Amon Tobin - Dark Jovian") +(defvar *playlist* + (let ((files nil)) + (dolist (dir (directory (merge-pathnames "*/" *music-dir*))) + (dolist (flac (directory (merge-pathnames "**/*.flac" dir))) + (push (list :file (namestring flac) + :title (format nil "~A - ~A" + (car (last (pathname-directory flac))) + (pathname-name flac))) + files))) + ;; Shuffle and take first 10 tracks + (subseq (alexandria:shuffle (copy-list files)) + 0 (min 10 (length files))))) + +(format t "Queued ~A tracks:~%" (length *playlist*)) +(dolist (entry *playlist*) + (format t " ~A~%" (getf entry :title))) + +;; 6. Start playlist playback +(format t "~%[6] Starting playlist...~%") +(cl-streamer/harmony:play-list *pipeline* *playlist*) (format t "~%=== Stream is live! ===~%") (format t "Listen at: http://localhost:8000/stream.mp3~%") -(format t "Listeners: ~A~%" (cl-streamer:get-listener-count)) (format t "~%Press Enter to stop...~%") (read-line)