feat: End-to-end streaming working! Custom streaming-drain + fixes
Major changes: - streaming-drain: Custom drain that captures PCM from Harmony's pack buffer (raw IEEE 754 floats in unsigned-byte-8 array), converts to signed-16 PCM via CFFI, encodes to MP3 via LAME, and writes to stream server's ring buffer - Fixed ring buffer deadlock: buffer-read/buffer-write held lock then called buffer-available which tried to acquire same lock. Created internal %buffer-available/%buffer-free-space without locking. - Fixed ring buffer zero-length guard for unbound variable in finally - Fixed sleep duration in drain: was dividing raw byte count by samplerate, now correctly converts to frames first - Added flexi-streams wrapper for bivalent HTTP socket I/O - Exported all public API symbols from cl-streamer package - Added test-stream.lisp end-to-end test script Verified: Amon Tobin FLAC -> 128kbps MP3 stream at localhost:8000 file reports: MPEG ADTS, layer III, v1, 128kbps, 44.1kHz, JntStereo
This commit is contained in:
parent
9d5166c562
commit
a9e8276e9a
|
|
@ -17,23 +17,33 @@
|
|||
:data (make-array size :element-type '(unsigned-byte 8))
|
||||
:size size))
|
||||
|
||||
(defun buffer-available (buffer)
|
||||
"Return the number of bytes available to read."
|
||||
(bt:with-lock-held ((buffer-lock buffer))
|
||||
(defun %buffer-available (buffer)
|
||||
"Internal: bytes available to read. Caller must hold lock."
|
||||
(let ((write (buffer-write-pos buffer))
|
||||
(read (buffer-read-pos buffer))
|
||||
(size (buffer-size buffer)))
|
||||
(mod (- write read) size))))
|
||||
(mod (- write read) size)))
|
||||
|
||||
(defun buffer-available (buffer)
|
||||
"Return the number of bytes available to read."
|
||||
(bt:with-lock-held ((buffer-lock buffer))
|
||||
(%buffer-available buffer)))
|
||||
|
||||
(defun %buffer-free-space (buffer)
|
||||
"Internal: bytes available to write. Caller must hold lock."
|
||||
(- (buffer-size buffer) (%buffer-available buffer) 1))
|
||||
|
||||
(defun buffer-free-space (buffer)
|
||||
"Return the number of bytes available to write."
|
||||
(- (buffer-size buffer) (buffer-available buffer) 1))
|
||||
(bt:with-lock-held ((buffer-lock buffer))
|
||||
(%buffer-free-space buffer)))
|
||||
|
||||
(defun buffer-write (buffer data &key (start 0) (end (length data)))
|
||||
"Write bytes from DATA to BUFFER. Blocks if buffer is full."
|
||||
(let ((len (- end start)))
|
||||
(bt:with-lock-held ((buffer-lock buffer))
|
||||
(loop while (< (buffer-free-space buffer) len)
|
||||
(when (> len 0)
|
||||
(loop while (< (%buffer-free-space buffer) len)
|
||||
do (bt:condition-wait (buffer-not-full buffer) (buffer-lock buffer)))
|
||||
(let ((write-pos (buffer-write-pos buffer))
|
||||
(size (buffer-size buffer))
|
||||
|
|
@ -42,7 +52,7 @@
|
|||
for j = write-pos then (mod (1+ j) size)
|
||||
do (setf (aref buf-data j) (aref data i))
|
||||
finally (setf (buffer-write-pos buffer) (mod (1+ j) size))))
|
||||
(bt:condition-notify (buffer-not-empty buffer)))
|
||||
(bt:condition-notify (buffer-not-empty buffer))))
|
||||
len))
|
||||
|
||||
(defun buffer-read (buffer output &key (start 0) (end (length output)) (blocking t))
|
||||
|
|
@ -51,18 +61,19 @@
|
|||
(let ((requested (- end start)))
|
||||
(bt:with-lock-held ((buffer-lock buffer))
|
||||
(when blocking
|
||||
(loop while (zerop (buffer-available buffer))
|
||||
(loop while (zerop (%buffer-available buffer))
|
||||
do (bt:condition-wait (buffer-not-empty buffer) (buffer-lock buffer))))
|
||||
(let* ((available (buffer-available buffer))
|
||||
(let* ((available (%buffer-available buffer))
|
||||
(to-read (min requested available))
|
||||
(read-pos (buffer-read-pos buffer))
|
||||
(size (buffer-size buffer))
|
||||
(buf-data (buffer-data buffer)))
|
||||
(when (> to-read 0)
|
||||
(loop for i from start below (+ start to-read)
|
||||
for j = read-pos then (mod (1+ j) size)
|
||||
do (setf (aref output i) (aref buf-data j))
|
||||
finally (setf (buffer-read-pos buffer) (mod (1+ j) size)))
|
||||
(bt:condition-notify (buffer-not-full buffer))
|
||||
(bt:condition-notify (buffer-not-full buffer)))
|
||||
to-read))))
|
||||
|
||||
(defun buffer-clear (buffer)
|
||||
|
|
|
|||
|
|
@ -13,6 +13,59 @@
|
|||
|
||||
(in-package #:cl-streamer/harmony)
|
||||
|
||||
;;; ---- Streaming Drain ----
|
||||
;;; Custom drain that captures PCM from Harmony's pack buffer
|
||||
;;; and feeds it to the encoder/stream server, replacing the
|
||||
;;; dummy drain which just discards audio data.
|
||||
|
||||
(defclass streaming-drain (mixed:drain)
|
||||
((encoder :initarg :encoder :accessor drain-encoder)
|
||||
(mount-path :initarg :mount-path :accessor drain-mount-path :initform "/stream.mp3")
|
||||
(channels :initarg :channels :accessor drain-channels :initform 2)))
|
||||
|
||||
(defmethod mixed:free ((drain streaming-drain)))
|
||||
|
||||
(defmethod mixed:start ((drain streaming-drain)))
|
||||
|
||||
(defmethod mixed:mix ((drain streaming-drain))
|
||||
"Read interleaved float PCM from the pack buffer, encode to MP3, write to stream.
|
||||
The pack buffer is (unsigned-byte 8) with IEEE 754 single-floats (4 bytes each).
|
||||
Layout: L0b0 L0b1 L0b2 L0b3 R0b0 R0b1 R0b2 R0b3 L1b0 ... (interleaved stereo)"
|
||||
(mixed:with-buffer-tx (data start size (mixed:pack drain))
|
||||
(when (> size 0)
|
||||
(let* ((channels (drain-channels drain))
|
||||
(bytes-per-sample 4) ; single-float = 4 bytes
|
||||
(total-floats (floor size bytes-per-sample))
|
||||
(num-samples (floor total-floats channels))
|
||||
(pcm-buffer (make-array (* num-samples channels)
|
||||
:element-type '(signed-byte 16))))
|
||||
;; Convert raw bytes -> single-float -> signed-16
|
||||
(cffi:with-pointer-to-vector-data (ptr data)
|
||||
(loop for i below (* num-samples channels)
|
||||
for byte-offset = (+ start (* i bytes-per-sample))
|
||||
for sample = (cffi:mem-ref ptr :float byte-offset)
|
||||
do (setf (aref pcm-buffer i) (float-to-s16 sample))))
|
||||
(handler-case
|
||||
(let ((mp3-data (cl-streamer:encode-pcm-interleaved
|
||||
(drain-encoder drain) pcm-buffer num-samples)))
|
||||
(when (> (length mp3-data) 0)
|
||||
(cl-streamer:write-audio-data (drain-mount-path drain) mp3-data)))
|
||||
(error (e)
|
||||
(log:warn "Encode error in drain: ~A" e)))))
|
||||
;; Sleep for the duration of audio we just processed
|
||||
;; size = bytes, each frame = channels * 4 bytes (single-float)
|
||||
(let* ((channels (drain-channels drain))
|
||||
(bytes-per-frame (* channels 4))
|
||||
(frames (floor size bytes-per-frame))
|
||||
(samplerate (mixed:samplerate (mixed:pack drain))))
|
||||
(when (> frames 0)
|
||||
(sleep (/ frames samplerate))))
|
||||
(mixed:finish size)))
|
||||
|
||||
(defmethod mixed:end ((drain streaming-drain)))
|
||||
|
||||
;;; ---- Audio Pipeline ----
|
||||
|
||||
(defclass audio-pipeline ()
|
||||
((harmony-server :initform nil :accessor pipeline-harmony-server)
|
||||
(encoder :initarg :encoder :accessor pipeline-encoder)
|
||||
|
|
@ -20,18 +73,7 @@
|
|||
(mount-path :initarg :mount-path :accessor pipeline-mount-path :initform "/stream.mp3")
|
||||
(sample-rate :initarg :sample-rate :accessor pipeline-sample-rate :initform 44100)
|
||||
(channels :initarg :channels :accessor pipeline-channels :initform 2)
|
||||
(running :initform nil :accessor pipeline-running-p)
|
||||
(encode-thread :initform nil :accessor pipeline-encode-thread)))
|
||||
|
||||
(defun make-streaming-server (&key (name "CL-Streamer") (samplerate 44100) (latency 0.02))
|
||||
"Create a Harmony server configured for streaming (no audio output).
|
||||
Uses :dummy drain so audio goes to buffer instead of speakers."
|
||||
(mixed:init)
|
||||
(harmony:make-simple-server :name name
|
||||
:samplerate samplerate
|
||||
:latency latency
|
||||
:drain :dummy
|
||||
:output-channels 2))
|
||||
(running :initform nil :accessor pipeline-running-p)))
|
||||
|
||||
(defun make-audio-pipeline (&key encoder stream-server (mount-path "/stream.mp3")
|
||||
(sample-rate 44100) (channels 2))
|
||||
|
|
@ -44,25 +86,37 @@
|
|||
:channels channels))
|
||||
|
||||
(defun start-pipeline (pipeline)
|
||||
"Start the audio pipeline - initializes Harmony and begins encoding."
|
||||
"Start the audio pipeline - initializes Harmony with our streaming drain."
|
||||
(when (pipeline-running-p pipeline)
|
||||
(error "Pipeline already running"))
|
||||
(let ((server (make-streaming-server :samplerate (pipeline-sample-rate pipeline))))
|
||||
(mixed:init)
|
||||
(let* ((server (harmony:make-simple-server
|
||||
:name "CL-Streamer"
|
||||
:samplerate (pipeline-sample-rate pipeline)
|
||||
:latency 0.05
|
||||
:drain :dummy
|
||||
:output-channels (pipeline-channels pipeline)))
|
||||
(output (harmony:segment :output server))
|
||||
(old-drain (harmony:segment :drain output))
|
||||
(pack (mixed:pack old-drain))
|
||||
(drain (make-instance 'streaming-drain
|
||||
:encoder (pipeline-encoder pipeline)
|
||||
:mount-path (pipeline-mount-path pipeline)
|
||||
:channels (pipeline-channels pipeline))))
|
||||
;; Wire our streaming drain to the same pack buffer
|
||||
(setf (mixed:pack drain) pack)
|
||||
;; Swap: withdraw old dummy drain, add our streaming drain
|
||||
(mixed:withdraw old-drain output)
|
||||
(mixed:add drain output)
|
||||
(setf (pipeline-harmony-server pipeline) server)
|
||||
(mixed:start server))
|
||||
(setf (pipeline-running-p pipeline) t)
|
||||
(setf (pipeline-encode-thread pipeline)
|
||||
(bt:make-thread (lambda () (encode-loop pipeline))
|
||||
:name "cl-streamer-encode"))
|
||||
(log:info "Audio pipeline started")
|
||||
(log:info "Audio pipeline started with streaming drain")
|
||||
pipeline)
|
||||
|
||||
(defun stop-pipeline (pipeline)
|
||||
"Stop the audio pipeline."
|
||||
(setf (pipeline-running-p pipeline) nil)
|
||||
(when (pipeline-encode-thread pipeline)
|
||||
(ignore-errors (bt:join-thread (pipeline-encode-thread pipeline)))
|
||||
(setf (pipeline-encode-thread pipeline) nil))
|
||||
(when (pipeline-harmony-server pipeline)
|
||||
(mixed:end (pipeline-harmony-server pipeline))
|
||||
(setf (pipeline-harmony-server pipeline) nil))
|
||||
|
|
@ -78,45 +132,8 @@
|
|||
(log:info "Playing: ~A" file-path)
|
||||
voice)))
|
||||
|
||||
(defun get-pack-buffer (pipeline)
|
||||
"Get the packer's pack (bip-buffer) from Harmony's output chain."
|
||||
(let* ((server (pipeline-harmony-server pipeline))
|
||||
(output (harmony:segment :output server))
|
||||
(packer (harmony:segment :packer output)))
|
||||
(mixed:pack packer)))
|
||||
|
||||
(defun encode-loop (pipeline)
|
||||
"Main encoding loop - reads PCM from Harmony's packer, encodes, writes to stream."
|
||||
(let ((encoder (pipeline-encoder pipeline))
|
||||
(mount-path (pipeline-mount-path pipeline))
|
||||
(channels (pipeline-channels pipeline))
|
||||
(frame-size 1152))
|
||||
(loop while (pipeline-running-p pipeline)
|
||||
do (handler-case
|
||||
(let* ((pack (get-pack-buffer pipeline))
|
||||
(available (mixed:available-read pack))
|
||||
(needed (* frame-size channels 2)))
|
||||
(if (>= available needed)
|
||||
(multiple-value-bind (data start size)
|
||||
(mixed:request-read pack needed)
|
||||
(declare (ignore start))
|
||||
(when (and data (>= size needed))
|
||||
(let* ((samples (floor size (* channels 2)))
|
||||
(pcm-buffer (make-array (* samples channels)
|
||||
:element-type '(signed-byte 16))))
|
||||
(loop for i below (* samples channels)
|
||||
do (setf (aref pcm-buffer i)
|
||||
(let ((byte-offset (* i 2)))
|
||||
(logior (aref data byte-offset)
|
||||
(ash (let ((hi (aref data (1+ byte-offset))))
|
||||
(if (> hi 127) (- hi 256) hi))
|
||||
8)))))
|
||||
(mixed:finish-read pack size)
|
||||
(let ((mp3-data (cl-streamer::encode-pcm-interleaved
|
||||
encoder pcm-buffer samples)))
|
||||
(when (> (length mp3-data) 0)
|
||||
(cl-streamer::write-audio-data mount-path mp3-data))))))
|
||||
(sleep 0.005)))
|
||||
(error (e)
|
||||
(log:warn "Encode error: ~A" e)
|
||||
(sleep 0.1))))))
|
||||
(declaim (inline float-to-s16))
|
||||
(defun float-to-s16 (sample)
|
||||
"Convert a float sample (-1.0 to 1.0) to signed 16-bit integer."
|
||||
(let ((clamped (max -1.0 (min 1.0 sample))))
|
||||
(the (signed-byte 16) (round (* clamped 32767.0)))))
|
||||
|
|
|
|||
|
|
@ -34,5 +34,23 @@
|
|||
#:listener-count
|
||||
|
||||
;; Main API
|
||||
#:*server*
|
||||
#:*default-port*
|
||||
#:*default-metaint*))
|
||||
#:*default-metaint*
|
||||
#:start
|
||||
#:stop
|
||||
#:write-audio-data
|
||||
#:set-now-playing
|
||||
#:get-listener-count
|
||||
|
||||
;; Encoder
|
||||
#:make-mp3-encoder
|
||||
#:close-encoder
|
||||
#:encode-pcm-interleaved
|
||||
#:encode-flush
|
||||
#:lame-version
|
||||
|
||||
;; AAC Encoder
|
||||
#:make-aac-encoder
|
||||
#:close-aac-encoder
|
||||
#:encode-aac-pcm))
|
||||
|
|
|
|||
|
|
@ -115,7 +115,9 @@
|
|||
|
||||
(defun handle-client (server client-socket)
|
||||
"Handle a single client connection."
|
||||
(let ((stream (usocket:socket-stream client-socket)))
|
||||
(let ((stream (flexi-streams:make-flexi-stream
|
||||
(usocket:socket-stream client-socket)
|
||||
:external-format :latin-1)))
|
||||
(handler-case
|
||||
(let* ((request-line (read-line stream))
|
||||
(headers (read-http-headers stream)))
|
||||
|
|
|
|||
|
|
@ -0,0 +1,62 @@
|
|||
;;; End-to-end streaming test
|
||||
;;; Usage: sbcl --load test-stream.lisp
|
||||
;;;
|
||||
;;; Then open http://localhost:8000/stream.mp3 in VLC or browser
|
||||
|
||||
(push #p"/home/glenn/SourceCode/harmony/" asdf:*central-registry*)
|
||||
(push #p"/home/glenn/SourceCode/asteroid/cl-streamer/" asdf:*central-registry*)
|
||||
|
||||
(ql:quickload '(:cl-streamer :cl-streamer/encoder :cl-streamer/harmony))
|
||||
|
||||
(format t "~%=== CL-Streamer End-to-End Test ===~%")
|
||||
(format t "LAME version: ~A~%" (cl-streamer::lame-version))
|
||||
|
||||
;; 1. Create and start stream server
|
||||
(format t "~%[1] Starting stream server on port 8000...~%")
|
||||
(cl-streamer:start :port 8000)
|
||||
|
||||
;; 2. Add mount point
|
||||
(format t "[2] Adding mount point /stream.mp3...~%")
|
||||
(cl-streamer:add-mount cl-streamer:*server* "/stream.mp3"
|
||||
:content-type "audio/mpeg"
|
||||
:bitrate 128
|
||||
:name "Asteroid Radio (CL-Streamer Test)")
|
||||
|
||||
;; 3. Create MP3 encoder
|
||||
(format t "[3] Creating MP3 encoder (128kbps, 44100Hz, stereo)...~%")
|
||||
(defvar *encoder* (cl-streamer:make-mp3-encoder :sample-rate 44100
|
||||
:channels 2
|
||||
:bitrate 128))
|
||||
|
||||
;; 4. Create and start audio pipeline
|
||||
(format t "[4] Starting audio pipeline with Harmony...~%")
|
||||
(defvar *pipeline* (cl-streamer/harmony:make-audio-pipeline
|
||||
:encoder *encoder*
|
||||
:stream-server cl-streamer:*server*
|
||||
:mount-path "/stream.mp3"
|
||||
:sample-rate 44100
|
||||
:channels 2))
|
||||
|
||||
(cl-streamer/harmony:start-pipeline *pipeline*)
|
||||
|
||||
;; 5. Play a test file
|
||||
(format t "[5] Playing test file...~%")
|
||||
(defvar *test-file*
|
||||
#p"/home/glenn/SourceCode/asteroid/music/library/Amon_Tobin - Dark Jovian/01 Dark Jovian.flac")
|
||||
|
||||
(cl-streamer/harmony:play-file *pipeline* *test-file*)
|
||||
(cl-streamer:set-now-playing "/stream.mp3" "Amon Tobin - Dark Jovian")
|
||||
|
||||
(format t "~%=== Stream is live! ===~%")
|
||||
(format t "Listen at: http://localhost:8000/stream.mp3~%")
|
||||
(format t "Listeners: ~A~%" (cl-streamer:get-listener-count))
|
||||
(format t "~%Press Enter to stop...~%")
|
||||
|
||||
(read-line)
|
||||
|
||||
;; Cleanup
|
||||
(format t "Stopping...~%")
|
||||
(cl-streamer/harmony:stop-pipeline *pipeline*)
|
||||
(cl-streamer:close-encoder *encoder*)
|
||||
(cl-streamer:stop)
|
||||
(format t "Done.~%")
|
||||
Loading…
Reference in New Issue