diff --git a/cl-streamer/buffer.lisp b/cl-streamer/buffer.lisp index e8abee8..0f8f07c 100644 --- a/cl-streamer/buffer.lisp +++ b/cl-streamer/buffer.lisp @@ -17,32 +17,42 @@ :data (make-array size :element-type '(unsigned-byte 8)) :size size)) +(defun %buffer-available (buffer) + "Internal: bytes available to read. Caller must hold lock." + (let ((write (buffer-write-pos buffer)) + (read (buffer-read-pos buffer)) + (size (buffer-size buffer))) + (mod (- write read) size))) + (defun buffer-available (buffer) "Return the number of bytes available to read." (bt:with-lock-held ((buffer-lock buffer)) - (let ((write (buffer-write-pos buffer)) - (read (buffer-read-pos buffer)) - (size (buffer-size buffer))) - (mod (- write read) size)))) + (%buffer-available buffer))) + +(defun %buffer-free-space (buffer) + "Internal: bytes available to write. Caller must hold lock." + (- (buffer-size buffer) (%buffer-available buffer) 1)) (defun buffer-free-space (buffer) "Return the number of bytes available to write." - (- (buffer-size buffer) (buffer-available buffer) 1)) + (bt:with-lock-held ((buffer-lock buffer)) + (%buffer-free-space buffer))) (defun buffer-write (buffer data &key (start 0) (end (length data))) "Write bytes from DATA to BUFFER. Blocks if buffer is full." (let ((len (- end start))) (bt:with-lock-held ((buffer-lock buffer)) - (loop while (< (buffer-free-space buffer) len) - do (bt:condition-wait (buffer-not-full buffer) (buffer-lock buffer))) - (let ((write-pos (buffer-write-pos buffer)) - (size (buffer-size buffer)) - (buf-data (buffer-data buffer))) - (loop for i from start below end - for j = write-pos then (mod (1+ j) size) - do (setf (aref buf-data j) (aref data i)) - finally (setf (buffer-write-pos buffer) (mod (1+ j) size)))) - (bt:condition-notify (buffer-not-empty buffer))) + (when (> len 0) + (loop while (< (%buffer-free-space buffer) len) + do (bt:condition-wait (buffer-not-full buffer) (buffer-lock buffer))) + (let ((write-pos (buffer-write-pos buffer)) + (size (buffer-size buffer)) + (buf-data (buffer-data buffer))) + (loop for i from start below end + for j = write-pos then (mod (1+ j) size) + do (setf (aref buf-data j) (aref data i)) + finally (setf (buffer-write-pos buffer) (mod (1+ j) size)))) + (bt:condition-notify (buffer-not-empty buffer)))) len)) (defun buffer-read (buffer output &key (start 0) (end (length output)) (blocking t)) @@ -51,18 +61,19 @@ (let ((requested (- end start))) (bt:with-lock-held ((buffer-lock buffer)) (when blocking - (loop while (zerop (buffer-available buffer)) + (loop while (zerop (%buffer-available buffer)) do (bt:condition-wait (buffer-not-empty buffer) (buffer-lock buffer)))) - (let* ((available (buffer-available buffer)) + (let* ((available (%buffer-available buffer)) (to-read (min requested available)) (read-pos (buffer-read-pos buffer)) (size (buffer-size buffer)) (buf-data (buffer-data buffer))) - (loop for i from start below (+ start to-read) - for j = read-pos then (mod (1+ j) size) - do (setf (aref output i) (aref buf-data j)) - finally (setf (buffer-read-pos buffer) (mod (1+ j) size))) - (bt:condition-notify (buffer-not-full buffer)) + (when (> to-read 0) + (loop for i from start below (+ start to-read) + for j = read-pos then (mod (1+ j) size) + do (setf (aref output i) (aref buf-data j)) + finally (setf (buffer-read-pos buffer) (mod (1+ j) size))) + (bt:condition-notify (buffer-not-full buffer))) to-read)))) (defun buffer-clear (buffer) diff --git a/cl-streamer/harmony-backend.lisp b/cl-streamer/harmony-backend.lisp index 704262f..7932bac 100644 --- a/cl-streamer/harmony-backend.lisp +++ b/cl-streamer/harmony-backend.lisp @@ -13,6 +13,59 @@ (in-package #:cl-streamer/harmony) +;;; ---- Streaming Drain ---- +;;; Custom drain that captures PCM from Harmony's pack buffer +;;; and feeds it to the encoder/stream server, replacing the +;;; dummy drain which just discards audio data. + +(defclass streaming-drain (mixed:drain) + ((encoder :initarg :encoder :accessor drain-encoder) + (mount-path :initarg :mount-path :accessor drain-mount-path :initform "/stream.mp3") + (channels :initarg :channels :accessor drain-channels :initform 2))) + +(defmethod mixed:free ((drain streaming-drain))) + +(defmethod mixed:start ((drain streaming-drain))) + +(defmethod mixed:mix ((drain streaming-drain)) + "Read interleaved float PCM from the pack buffer, encode to MP3, write to stream. + The pack buffer is (unsigned-byte 8) with IEEE 754 single-floats (4 bytes each). + Layout: L0b0 L0b1 L0b2 L0b3 R0b0 R0b1 R0b2 R0b3 L1b0 ... (interleaved stereo)" + (mixed:with-buffer-tx (data start size (mixed:pack drain)) + (when (> size 0) + (let* ((channels (drain-channels drain)) + (bytes-per-sample 4) ; single-float = 4 bytes + (total-floats (floor size bytes-per-sample)) + (num-samples (floor total-floats channels)) + (pcm-buffer (make-array (* num-samples channels) + :element-type '(signed-byte 16)))) + ;; Convert raw bytes -> single-float -> signed-16 + (cffi:with-pointer-to-vector-data (ptr data) + (loop for i below (* num-samples channels) + for byte-offset = (+ start (* i bytes-per-sample)) + for sample = (cffi:mem-ref ptr :float byte-offset) + do (setf (aref pcm-buffer i) (float-to-s16 sample)))) + (handler-case + (let ((mp3-data (cl-streamer:encode-pcm-interleaved + (drain-encoder drain) pcm-buffer num-samples))) + (when (> (length mp3-data) 0) + (cl-streamer:write-audio-data (drain-mount-path drain) mp3-data))) + (error (e) + (log:warn "Encode error in drain: ~A" e))))) + ;; Sleep for the duration of audio we just processed + ;; size = bytes, each frame = channels * 4 bytes (single-float) + (let* ((channels (drain-channels drain)) + (bytes-per-frame (* channels 4)) + (frames (floor size bytes-per-frame)) + (samplerate (mixed:samplerate (mixed:pack drain)))) + (when (> frames 0) + (sleep (/ frames samplerate)))) + (mixed:finish size))) + +(defmethod mixed:end ((drain streaming-drain))) + +;;; ---- Audio Pipeline ---- + (defclass audio-pipeline () ((harmony-server :initform nil :accessor pipeline-harmony-server) (encoder :initarg :encoder :accessor pipeline-encoder) @@ -20,18 +73,7 @@ (mount-path :initarg :mount-path :accessor pipeline-mount-path :initform "/stream.mp3") (sample-rate :initarg :sample-rate :accessor pipeline-sample-rate :initform 44100) (channels :initarg :channels :accessor pipeline-channels :initform 2) - (running :initform nil :accessor pipeline-running-p) - (encode-thread :initform nil :accessor pipeline-encode-thread))) - -(defun make-streaming-server (&key (name "CL-Streamer") (samplerate 44100) (latency 0.02)) - "Create a Harmony server configured for streaming (no audio output). - Uses :dummy drain so audio goes to buffer instead of speakers." - (mixed:init) - (harmony:make-simple-server :name name - :samplerate samplerate - :latency latency - :drain :dummy - :output-channels 2)) + (running :initform nil :accessor pipeline-running-p))) (defun make-audio-pipeline (&key encoder stream-server (mount-path "/stream.mp3") (sample-rate 44100) (channels 2)) @@ -44,25 +86,37 @@ :channels channels)) (defun start-pipeline (pipeline) - "Start the audio pipeline - initializes Harmony and begins encoding." + "Start the audio pipeline - initializes Harmony with our streaming drain." (when (pipeline-running-p pipeline) (error "Pipeline already running")) - (let ((server (make-streaming-server :samplerate (pipeline-sample-rate pipeline)))) + (mixed:init) + (let* ((server (harmony:make-simple-server + :name "CL-Streamer" + :samplerate (pipeline-sample-rate pipeline) + :latency 0.05 + :drain :dummy + :output-channels (pipeline-channels pipeline))) + (output (harmony:segment :output server)) + (old-drain (harmony:segment :drain output)) + (pack (mixed:pack old-drain)) + (drain (make-instance 'streaming-drain + :encoder (pipeline-encoder pipeline) + :mount-path (pipeline-mount-path pipeline) + :channels (pipeline-channels pipeline)))) + ;; Wire our streaming drain to the same pack buffer + (setf (mixed:pack drain) pack) + ;; Swap: withdraw old dummy drain, add our streaming drain + (mixed:withdraw old-drain output) + (mixed:add drain output) (setf (pipeline-harmony-server pipeline) server) (mixed:start server)) (setf (pipeline-running-p pipeline) t) - (setf (pipeline-encode-thread pipeline) - (bt:make-thread (lambda () (encode-loop pipeline)) - :name "cl-streamer-encode")) - (log:info "Audio pipeline started") + (log:info "Audio pipeline started with streaming drain") pipeline) (defun stop-pipeline (pipeline) "Stop the audio pipeline." (setf (pipeline-running-p pipeline) nil) - (when (pipeline-encode-thread pipeline) - (ignore-errors (bt:join-thread (pipeline-encode-thread pipeline))) - (setf (pipeline-encode-thread pipeline) nil)) (when (pipeline-harmony-server pipeline) (mixed:end (pipeline-harmony-server pipeline)) (setf (pipeline-harmony-server pipeline) nil)) @@ -78,45 +132,8 @@ (log:info "Playing: ~A" file-path) voice))) -(defun get-pack-buffer (pipeline) - "Get the packer's pack (bip-buffer) from Harmony's output chain." - (let* ((server (pipeline-harmony-server pipeline)) - (output (harmony:segment :output server)) - (packer (harmony:segment :packer output))) - (mixed:pack packer))) - -(defun encode-loop (pipeline) - "Main encoding loop - reads PCM from Harmony's packer, encodes, writes to stream." - (let ((encoder (pipeline-encoder pipeline)) - (mount-path (pipeline-mount-path pipeline)) - (channels (pipeline-channels pipeline)) - (frame-size 1152)) - (loop while (pipeline-running-p pipeline) - do (handler-case - (let* ((pack (get-pack-buffer pipeline)) - (available (mixed:available-read pack)) - (needed (* frame-size channels 2))) - (if (>= available needed) - (multiple-value-bind (data start size) - (mixed:request-read pack needed) - (declare (ignore start)) - (when (and data (>= size needed)) - (let* ((samples (floor size (* channels 2))) - (pcm-buffer (make-array (* samples channels) - :element-type '(signed-byte 16)))) - (loop for i below (* samples channels) - do (setf (aref pcm-buffer i) - (let ((byte-offset (* i 2))) - (logior (aref data byte-offset) - (ash (let ((hi (aref data (1+ byte-offset)))) - (if (> hi 127) (- hi 256) hi)) - 8))))) - (mixed:finish-read pack size) - (let ((mp3-data (cl-streamer::encode-pcm-interleaved - encoder pcm-buffer samples))) - (when (> (length mp3-data) 0) - (cl-streamer::write-audio-data mount-path mp3-data)))))) - (sleep 0.005))) - (error (e) - (log:warn "Encode error: ~A" e) - (sleep 0.1)))))) +(declaim (inline float-to-s16)) +(defun float-to-s16 (sample) + "Convert a float sample (-1.0 to 1.0) to signed 16-bit integer." + (let ((clamped (max -1.0 (min 1.0 sample)))) + (the (signed-byte 16) (round (* clamped 32767.0))))) diff --git a/cl-streamer/package.lisp b/cl-streamer/package.lisp index 1871d6e..c137917 100644 --- a/cl-streamer/package.lisp +++ b/cl-streamer/package.lisp @@ -34,5 +34,23 @@ #:listener-count ;; Main API + #:*server* #:*default-port* - #:*default-metaint*)) + #:*default-metaint* + #:start + #:stop + #:write-audio-data + #:set-now-playing + #:get-listener-count + + ;; Encoder + #:make-mp3-encoder + #:close-encoder + #:encode-pcm-interleaved + #:encode-flush + #:lame-version + + ;; AAC Encoder + #:make-aac-encoder + #:close-aac-encoder + #:encode-aac-pcm)) diff --git a/cl-streamer/stream-server.lisp b/cl-streamer/stream-server.lisp index 4e38b62..c4c7860 100644 --- a/cl-streamer/stream-server.lisp +++ b/cl-streamer/stream-server.lisp @@ -115,7 +115,9 @@ (defun handle-client (server client-socket) "Handle a single client connection." - (let ((stream (usocket:socket-stream client-socket))) + (let ((stream (flexi-streams:make-flexi-stream + (usocket:socket-stream client-socket) + :external-format :latin-1))) (handler-case (let* ((request-line (read-line stream)) (headers (read-http-headers stream))) diff --git a/cl-streamer/test-stream.lisp b/cl-streamer/test-stream.lisp new file mode 100644 index 0000000..6d68fd0 --- /dev/null +++ b/cl-streamer/test-stream.lisp @@ -0,0 +1,62 @@ +;;; End-to-end streaming test +;;; Usage: sbcl --load test-stream.lisp +;;; +;;; Then open http://localhost:8000/stream.mp3 in VLC or browser + +(push #p"/home/glenn/SourceCode/harmony/" asdf:*central-registry*) +(push #p"/home/glenn/SourceCode/asteroid/cl-streamer/" asdf:*central-registry*) + +(ql:quickload '(:cl-streamer :cl-streamer/encoder :cl-streamer/harmony)) + +(format t "~%=== CL-Streamer End-to-End Test ===~%") +(format t "LAME version: ~A~%" (cl-streamer::lame-version)) + +;; 1. Create and start stream server +(format t "~%[1] Starting stream server on port 8000...~%") +(cl-streamer:start :port 8000) + +;; 2. Add mount point +(format t "[2] Adding mount point /stream.mp3...~%") +(cl-streamer:add-mount cl-streamer:*server* "/stream.mp3" + :content-type "audio/mpeg" + :bitrate 128 + :name "Asteroid Radio (CL-Streamer Test)") + +;; 3. Create MP3 encoder +(format t "[3] Creating MP3 encoder (128kbps, 44100Hz, stereo)...~%") +(defvar *encoder* (cl-streamer:make-mp3-encoder :sample-rate 44100 + :channels 2 + :bitrate 128)) + +;; 4. Create and start audio pipeline +(format t "[4] Starting audio pipeline with Harmony...~%") +(defvar *pipeline* (cl-streamer/harmony:make-audio-pipeline + :encoder *encoder* + :stream-server cl-streamer:*server* + :mount-path "/stream.mp3" + :sample-rate 44100 + :channels 2)) + +(cl-streamer/harmony:start-pipeline *pipeline*) + +;; 5. Play a test file +(format t "[5] Playing test file...~%") +(defvar *test-file* + #p"/home/glenn/SourceCode/asteroid/music/library/Amon_Tobin - Dark Jovian/01 Dark Jovian.flac") + +(cl-streamer/harmony:play-file *pipeline* *test-file*) +(cl-streamer:set-now-playing "/stream.mp3" "Amon Tobin - Dark Jovian") + +(format t "~%=== Stream is live! ===~%") +(format t "Listen at: http://localhost:8000/stream.mp3~%") +(format t "Listeners: ~A~%" (cl-streamer:get-listener-count)) +(format t "~%Press Enter to stop...~%") + +(read-line) + +;; Cleanup +(format t "Stopping...~%") +(cl-streamer/harmony:stop-pipeline *pipeline*) +(cl-streamer:close-encoder *encoder*) +(cl-streamer:stop) +(format t "Done.~%")