feat: Initial cl-streamer skeleton for CL-native streaming
- Architecture document outlining Icecast/Liquidsoap replacement - Core streaming server with ICY metadata protocol support - Thread-safe ring buffer for audio data - Mount point abstraction with metadata updates - Multi-client connection handling This is experimental groundwork for integrating with Harmony/cl-mixed and playlisp/parsector for a pure CL streaming solution.
This commit is contained in:
parent
5f9dc80ac8
commit
e1be88a54a
|
|
@ -0,0 +1,69 @@
|
||||||
|
#+TITLE: CL-Streamer
|
||||||
|
#+AUTHOR: Glenn Thompson
|
||||||
|
#+DATE: 2026-03-03
|
||||||
|
|
||||||
|
* Overview
|
||||||
|
|
||||||
|
CL-Streamer is a Common Lisp audio streaming server designed to replace
|
||||||
|
Icecast and Liquidsoap in the Asteroid Radio project.
|
||||||
|
|
||||||
|
* Status
|
||||||
|
|
||||||
|
*EXPERIMENTAL* - This is an early proof-of-concept.
|
||||||
|
|
||||||
|
* Features
|
||||||
|
|
||||||
|
- HTTP streaming with ICY metadata protocol
|
||||||
|
- Multiple mount points
|
||||||
|
- Thread-safe ring buffers for audio data
|
||||||
|
- Listener statistics
|
||||||
|
|
||||||
|
* Dependencies
|
||||||
|
|
||||||
|
- alexandria
|
||||||
|
- bordeaux-threads
|
||||||
|
- usocket
|
||||||
|
- flexi-streams
|
||||||
|
- chunga
|
||||||
|
- log4cl
|
||||||
|
- split-sequence
|
||||||
|
|
||||||
|
Optional (for audio backend):
|
||||||
|
- harmony
|
||||||
|
- cl-mixed
|
||||||
|
- cl-mixed-mpg123
|
||||||
|
|
||||||
|
* Quick Start
|
||||||
|
|
||||||
|
#+begin_src common-lisp
|
||||||
|
(ql:quickload :cl-streamer)
|
||||||
|
|
||||||
|
;; Create and start server
|
||||||
|
(cl-streamer:start :port 8000)
|
||||||
|
|
||||||
|
;; Add a mount point
|
||||||
|
(cl-streamer:add-mount cl-streamer:*server* "/stream.mp3"
|
||||||
|
:content-type "audio/mpeg"
|
||||||
|
:bitrate 128
|
||||||
|
:name "Asteroid Radio")
|
||||||
|
|
||||||
|
;; Update now-playing metadata
|
||||||
|
(cl-streamer:set-now-playing "/stream.mp3" "Artist - Track Title")
|
||||||
|
|
||||||
|
;; Write audio data (from encoder)
|
||||||
|
(cl-streamer:write-audio-data "/stream.mp3" encoded-mp3-bytes)
|
||||||
|
|
||||||
|
;; Check listeners
|
||||||
|
(cl-streamer:get-listener-count)
|
||||||
|
|
||||||
|
;; Stop server
|
||||||
|
(cl-streamer:stop)
|
||||||
|
#+end_src
|
||||||
|
|
||||||
|
* Architecture
|
||||||
|
|
||||||
|
See =docs/CL-STREAMING-ARCHITECTURE.org= for the full design document.
|
||||||
|
|
||||||
|
* License
|
||||||
|
|
||||||
|
AGPL-3.0
|
||||||
|
|
@ -0,0 +1,73 @@
|
||||||
|
(in-package #:cl-streamer)
|
||||||
|
|
||||||
|
(defclass ring-buffer ()
|
||||||
|
((data :initarg :data :accessor buffer-data)
|
||||||
|
(size :initarg :size :reader buffer-size)
|
||||||
|
(read-pos :initform 0 :accessor buffer-read-pos)
|
||||||
|
(write-pos :initform 0 :accessor buffer-write-pos)
|
||||||
|
(lock :initform (bt:make-lock "ring-buffer-lock") :reader buffer-lock)
|
||||||
|
(not-empty :initform (bt:make-condition-variable :name "buffer-not-empty")
|
||||||
|
:reader buffer-not-empty)
|
||||||
|
(not-full :initform (bt:make-condition-variable :name "buffer-not-full")
|
||||||
|
:reader buffer-not-full)))
|
||||||
|
|
||||||
|
(defun make-ring-buffer (size)
|
||||||
|
"Create a ring buffer with SIZE bytes capacity."
|
||||||
|
(make-instance 'ring-buffer
|
||||||
|
:data (make-array size :element-type '(unsigned-byte 8))
|
||||||
|
:size size))
|
||||||
|
|
||||||
|
(defun buffer-available (buffer)
|
||||||
|
"Return the number of bytes available to read."
|
||||||
|
(bt:with-lock-held ((buffer-lock buffer))
|
||||||
|
(let ((write (buffer-write-pos buffer))
|
||||||
|
(read (buffer-read-pos buffer))
|
||||||
|
(size (buffer-size buffer)))
|
||||||
|
(mod (- write read) size))))
|
||||||
|
|
||||||
|
(defun buffer-free-space (buffer)
|
||||||
|
"Return the number of bytes available to write."
|
||||||
|
(- (buffer-size buffer) (buffer-available buffer) 1))
|
||||||
|
|
||||||
|
(defun buffer-write (buffer data &key (start 0) (end (length data)))
|
||||||
|
"Write bytes from DATA to BUFFER. Blocks if buffer is full."
|
||||||
|
(let ((len (- end start)))
|
||||||
|
(bt:with-lock-held ((buffer-lock buffer))
|
||||||
|
(loop while (< (buffer-free-space buffer) len)
|
||||||
|
do (bt:condition-wait (buffer-not-full buffer) (buffer-lock buffer)))
|
||||||
|
(let ((write-pos (buffer-write-pos buffer))
|
||||||
|
(size (buffer-size buffer))
|
||||||
|
(buf-data (buffer-data buffer)))
|
||||||
|
(loop for i from start below end
|
||||||
|
for j = write-pos then (mod (1+ j) size)
|
||||||
|
do (setf (aref buf-data j) (aref data i))
|
||||||
|
finally (setf (buffer-write-pos buffer) (mod (1+ j) size))))
|
||||||
|
(bt:condition-notify (buffer-not-empty buffer))))
|
||||||
|
len)
|
||||||
|
|
||||||
|
(defun buffer-read (buffer output &key (start 0) (end (length output)) (blocking t))
|
||||||
|
"Read bytes from BUFFER into OUTPUT. Returns number of bytes read.
|
||||||
|
If BLOCKING is T, waits for data. Otherwise returns 0 if empty."
|
||||||
|
(let ((requested (- end start)))
|
||||||
|
(bt:with-lock-held ((buffer-lock buffer))
|
||||||
|
(when blocking
|
||||||
|
(loop while (zerop (buffer-available buffer))
|
||||||
|
do (bt:condition-wait (buffer-not-empty buffer) (buffer-lock buffer))))
|
||||||
|
(let* ((available (buffer-available buffer))
|
||||||
|
(to-read (min requested available))
|
||||||
|
(read-pos (buffer-read-pos buffer))
|
||||||
|
(size (buffer-size buffer))
|
||||||
|
(buf-data (buffer-data buffer)))
|
||||||
|
(loop for i from start below (+ start to-read)
|
||||||
|
for j = read-pos then (mod (1+ j) size)
|
||||||
|
do (setf (aref output i) (aref buf-data j))
|
||||||
|
finally (setf (buffer-read-pos buffer) (mod (1+ j) size)))
|
||||||
|
(bt:condition-notify (buffer-not-full buffer))
|
||||||
|
to-read))))
|
||||||
|
|
||||||
|
(defun buffer-clear (buffer)
|
||||||
|
"Clear all data from the buffer."
|
||||||
|
(bt:with-lock-held ((buffer-lock buffer))
|
||||||
|
(setf (buffer-read-pos buffer) 0
|
||||||
|
(buffer-write-pos buffer) 0)
|
||||||
|
(bt:condition-notify (buffer-not-full buffer))))
|
||||||
|
|
@ -0,0 +1,35 @@
|
||||||
|
(asdf:defsystem #:cl-streamer
|
||||||
|
:description "Common Lisp audio streaming server for Asteroid Radio"
|
||||||
|
:author "Glenn Thompson <glenn@asteroid.radio>"
|
||||||
|
:license "AGPL-3.0"
|
||||||
|
:version "0.1.0"
|
||||||
|
:serial t
|
||||||
|
:depends-on (#:alexandria
|
||||||
|
#:bordeaux-threads
|
||||||
|
#:usocket
|
||||||
|
#:flexi-streams
|
||||||
|
#:chunga
|
||||||
|
#:trivial-gray-streams
|
||||||
|
#:split-sequence
|
||||||
|
#:log4cl)
|
||||||
|
:components ((:file "package")
|
||||||
|
(:file "conditions")
|
||||||
|
(:file "buffer")
|
||||||
|
(:file "icy-protocol")
|
||||||
|
(:file "stream-server")
|
||||||
|
(:file "cl-streamer")))
|
||||||
|
|
||||||
|
(asdf:defsystem #:cl-streamer/harmony
|
||||||
|
:description "Harmony audio backend for cl-streamer"
|
||||||
|
:depends-on (#:cl-streamer
|
||||||
|
#:harmony
|
||||||
|
#:cl-mixed
|
||||||
|
#:cl-mixed-mpg123)
|
||||||
|
:components ((:file "harmony-backend")))
|
||||||
|
|
||||||
|
(asdf:defsystem #:cl-streamer/encoder
|
||||||
|
:description "Audio encoding for cl-streamer (LAME MP3)"
|
||||||
|
:depends-on (#:cl-streamer
|
||||||
|
#:cffi)
|
||||||
|
:components ((:file "lame-ffi")
|
||||||
|
(:file "encoder")))
|
||||||
|
|
@ -0,0 +1,38 @@
|
||||||
|
(in-package #:cl-streamer)
|
||||||
|
|
||||||
|
(defvar *server* nil
|
||||||
|
"The global stream server instance.")
|
||||||
|
|
||||||
|
(defun ensure-server (&key (port *default-port*))
|
||||||
|
"Ensure a server instance exists, creating one if needed."
|
||||||
|
(unless *server*
|
||||||
|
(setf *server* (make-stream-server :port port)))
|
||||||
|
*server*)
|
||||||
|
|
||||||
|
(defun start (&key (port *default-port*))
|
||||||
|
"Start the streaming server with default configuration."
|
||||||
|
(let ((server (ensure-server :port port)))
|
||||||
|
(start-server server)))
|
||||||
|
|
||||||
|
(defun stop ()
|
||||||
|
"Stop the streaming server."
|
||||||
|
(when *server*
|
||||||
|
(stop-server *server*)))
|
||||||
|
|
||||||
|
(defun write-audio-data (mount-path data &key (start 0) (end (length data)))
|
||||||
|
"Write audio data to a mount point's buffer.
|
||||||
|
This is called by the audio pipeline to feed encoded audio."
|
||||||
|
(let* ((server (ensure-server))
|
||||||
|
(mount (gethash mount-path (server-mounts server))))
|
||||||
|
(when mount
|
||||||
|
(buffer-write (mount-buffer mount) data :start start :end end))))
|
||||||
|
|
||||||
|
(defun set-now-playing (mount-path title &optional url)
|
||||||
|
"Update the now-playing metadata for a mount point."
|
||||||
|
(let ((server (ensure-server)))
|
||||||
|
(update-metadata server mount-path :title title :url url)))
|
||||||
|
|
||||||
|
(defun get-listener-count (&optional mount-path)
|
||||||
|
"Get the current listener count."
|
||||||
|
(let ((server (ensure-server)))
|
||||||
|
(listener-count server mount-path)))
|
||||||
|
|
@ -0,0 +1,20 @@
|
||||||
|
(in-package #:cl-streamer)
|
||||||
|
|
||||||
|
(define-condition streamer-error (error)
|
||||||
|
((message :initarg :message :reader streamer-error-message))
|
||||||
|
(:report (lambda (c stream)
|
||||||
|
(format stream "Streamer error: ~A" (streamer-error-message c)))))
|
||||||
|
|
||||||
|
(define-condition connection-error (streamer-error)
|
||||||
|
((client :initarg :client :reader connection-error-client))
|
||||||
|
(:report (lambda (c stream)
|
||||||
|
(format stream "Connection error for ~A: ~A"
|
||||||
|
(connection-error-client c)
|
||||||
|
(streamer-error-message c)))))
|
||||||
|
|
||||||
|
(define-condition encoding-error (streamer-error)
|
||||||
|
((format :initarg :format :reader encoding-error-format))
|
||||||
|
(:report (lambda (c stream)
|
||||||
|
(format stream "Encoding error (~A): ~A"
|
||||||
|
(encoding-error-format c)
|
||||||
|
(streamer-error-message c)))))
|
||||||
|
|
@ -0,0 +1,57 @@
|
||||||
|
(in-package #:cl-streamer)
|
||||||
|
|
||||||
|
(defparameter *default-metaint* 16000
|
||||||
|
"Default ICY metadata interval in bytes.")
|
||||||
|
|
||||||
|
(defclass icy-metadata ()
|
||||||
|
((title :initarg :title :accessor icy-metadata-title :initform nil)
|
||||||
|
(url :initarg :url :accessor icy-metadata-url :initform nil)))
|
||||||
|
|
||||||
|
(defun make-icy-metadata (&key title url)
|
||||||
|
"Create an ICY metadata object."
|
||||||
|
(make-instance 'icy-metadata :title title :url url))
|
||||||
|
|
||||||
|
(defun encode-icy-metadata (metadata)
|
||||||
|
"Encode metadata into ICY protocol format.
|
||||||
|
Returns a byte vector with length prefix."
|
||||||
|
(let* ((stream-title (or (icy-metadata-title metadata) ""))
|
||||||
|
(stream-url (or (icy-metadata-url metadata) ""))
|
||||||
|
(meta-string (format nil "StreamTitle='~A';StreamUrl='~A';"
|
||||||
|
stream-title stream-url))
|
||||||
|
(meta-bytes (flexi-streams:string-to-octets meta-string :external-format :utf-8))
|
||||||
|
(meta-len (length meta-bytes))
|
||||||
|
(padded-len (* 16 (ceiling meta-len 16)))
|
||||||
|
(length-byte (floor padded-len 16))
|
||||||
|
(result (make-array (1+ padded-len) :element-type '(unsigned-byte 8)
|
||||||
|
:initial-element 0)))
|
||||||
|
(setf (aref result 0) length-byte)
|
||||||
|
(replace result meta-bytes :start1 1)
|
||||||
|
result))
|
||||||
|
|
||||||
|
(defun parse-icy-request (request-line headers)
|
||||||
|
"Parse an ICY/HTTP request. Returns (values mount-point wants-metadata-p).
|
||||||
|
HEADERS is an alist of (name . value) pairs."
|
||||||
|
(let* ((parts (split-sequence:split-sequence #\Space request-line))
|
||||||
|
(method (first parts))
|
||||||
|
(path (second parts))
|
||||||
|
(icy-metadata-header (cdr (assoc "icy-metadata" headers :test #'string-equal))))
|
||||||
|
(values path
|
||||||
|
(and icy-metadata-header
|
||||||
|
(string= icy-metadata-header "1")))))
|
||||||
|
|
||||||
|
(defun write-icy-response-headers (stream &key content-type metaint
|
||||||
|
(name "CL-Streamer")
|
||||||
|
(genre "Various")
|
||||||
|
(bitrate 128))
|
||||||
|
"Write ICY/HTTP response headers to STREAM."
|
||||||
|
(format stream "HTTP/1.1 200 OK~C~C" #\Return #\Linefeed)
|
||||||
|
(format stream "Content-Type: ~A~C~C" content-type #\Return #\Linefeed)
|
||||||
|
(format stream "icy-name: ~A~C~C" name #\Return #\Linefeed)
|
||||||
|
(format stream "icy-genre: ~A~C~C" genre #\Return #\Linefeed)
|
||||||
|
(format stream "icy-br: ~A~C~C" bitrate #\Return #\Linefeed)
|
||||||
|
(when metaint
|
||||||
|
(format stream "icy-metaint: ~A~C~C" metaint #\Return #\Linefeed))
|
||||||
|
(format stream "Cache-Control: no-cache, no-store~C~C" #\Return #\Linefeed)
|
||||||
|
(format stream "Connection: close~C~C" #\Return #\Linefeed)
|
||||||
|
(format stream "~C~C" #\Return #\Linefeed)
|
||||||
|
(force-output stream))
|
||||||
|
|
@ -0,0 +1,38 @@
|
||||||
|
(defpackage #:cl-streamer
|
||||||
|
(:use #:cl #:alexandria)
|
||||||
|
(:export
|
||||||
|
;; Conditions
|
||||||
|
#:streamer-error
|
||||||
|
#:connection-error
|
||||||
|
#:encoding-error
|
||||||
|
|
||||||
|
;; Buffer
|
||||||
|
#:ring-buffer
|
||||||
|
#:make-ring-buffer
|
||||||
|
#:buffer-write
|
||||||
|
#:buffer-read
|
||||||
|
#:buffer-available
|
||||||
|
#:buffer-clear
|
||||||
|
|
||||||
|
;; ICY Protocol
|
||||||
|
#:icy-metadata
|
||||||
|
#:make-icy-metadata
|
||||||
|
#:icy-metadata-title
|
||||||
|
#:icy-metadata-url
|
||||||
|
#:encode-icy-metadata
|
||||||
|
#:icy-metaint
|
||||||
|
|
||||||
|
;; Stream Server
|
||||||
|
#:stream-server
|
||||||
|
#:make-stream-server
|
||||||
|
#:start-server
|
||||||
|
#:stop-server
|
||||||
|
#:server-running-p
|
||||||
|
#:add-mount
|
||||||
|
#:remove-mount
|
||||||
|
#:update-metadata
|
||||||
|
#:listener-count
|
||||||
|
|
||||||
|
;; Main API
|
||||||
|
#:*default-port*
|
||||||
|
#:*default-metaint*))
|
||||||
|
|
@ -0,0 +1,216 @@
|
||||||
|
(in-package #:cl-streamer)
|
||||||
|
|
||||||
|
(defparameter *default-port* 8000
|
||||||
|
"Default port for the streaming server.")
|
||||||
|
|
||||||
|
(defclass stream-server ()
|
||||||
|
((port :initarg :port :accessor server-port :initform *default-port*)
|
||||||
|
(socket :initform nil :accessor server-socket)
|
||||||
|
(running :initform nil :accessor server-running-p)
|
||||||
|
(mounts :initform (make-hash-table :test 'equal) :accessor server-mounts)
|
||||||
|
(clients :initform nil :accessor server-clients)
|
||||||
|
(clients-lock :initform (bt:make-lock "clients-lock") :reader server-clients-lock)
|
||||||
|
(accept-thread :initform nil :accessor server-accept-thread)))
|
||||||
|
|
||||||
|
(defclass mount-point ()
|
||||||
|
((path :initarg :path :accessor mount-path)
|
||||||
|
(content-type :initarg :content-type :accessor mount-content-type
|
||||||
|
:initform "audio/mpeg")
|
||||||
|
(bitrate :initarg :bitrate :accessor mount-bitrate :initform 128)
|
||||||
|
(name :initarg :name :accessor mount-name :initform "CL-Streamer")
|
||||||
|
(genre :initarg :genre :accessor mount-genre :initform "Various")
|
||||||
|
(buffer :initarg :buffer :accessor mount-buffer)
|
||||||
|
(metadata :initform (make-icy-metadata) :accessor mount-metadata)
|
||||||
|
(metadata-lock :initform (bt:make-lock "metadata-lock") :reader mount-metadata-lock)))
|
||||||
|
|
||||||
|
(defclass client-connection ()
|
||||||
|
((socket :initarg :socket :accessor client-socket)
|
||||||
|
(stream :initarg :stream :accessor client-stream)
|
||||||
|
(mount :initarg :mount :accessor client-mount)
|
||||||
|
(wants-metadata :initarg :wants-metadata :accessor client-wants-metadata-p)
|
||||||
|
(bytes-since-meta :initform 0 :accessor client-bytes-since-meta)
|
||||||
|
(thread :initform nil :accessor client-thread)
|
||||||
|
(active :initform t :accessor client-active-p)))
|
||||||
|
|
||||||
|
(defun make-stream-server (&key (port *default-port*))
|
||||||
|
"Create a new stream server instance."
|
||||||
|
(make-instance 'stream-server :port port))
|
||||||
|
|
||||||
|
(defun add-mount (server path &key (content-type "audio/mpeg")
|
||||||
|
(bitrate 128)
|
||||||
|
(name "CL-Streamer")
|
||||||
|
(genre "Various")
|
||||||
|
(buffer-size (* 1024 1024)))
|
||||||
|
"Add a mount point to the server."
|
||||||
|
(let ((mount (make-instance 'mount-point
|
||||||
|
:path path
|
||||||
|
:content-type content-type
|
||||||
|
:bitrate bitrate
|
||||||
|
:name name
|
||||||
|
:genre genre
|
||||||
|
:buffer (make-ring-buffer buffer-size))))
|
||||||
|
(setf (gethash path (server-mounts server)) mount)
|
||||||
|
mount))
|
||||||
|
|
||||||
|
(defun remove-mount (server path)
|
||||||
|
"Remove a mount point from the server."
|
||||||
|
(remhash path (server-mounts server)))
|
||||||
|
|
||||||
|
(defun update-metadata (server path &key title url)
|
||||||
|
"Update the metadata for a mount point."
|
||||||
|
(let ((mount (gethash path (server-mounts server))))
|
||||||
|
(when mount
|
||||||
|
(bt:with-lock-held ((mount-metadata-lock mount))
|
||||||
|
(let ((meta (mount-metadata mount)))
|
||||||
|
(when title (setf (icy-metadata-title meta) title))
|
||||||
|
(when url (setf (icy-metadata-url meta) url)))))))
|
||||||
|
|
||||||
|
(defun listener-count (server &optional path)
|
||||||
|
"Return the number of connected listeners.
|
||||||
|
If PATH is specified, count only listeners on that mount."
|
||||||
|
(bt:with-lock-held ((server-clients-lock server))
|
||||||
|
(if path
|
||||||
|
(count-if (lambda (c) (and (client-active-p c)
|
||||||
|
(string= path (mount-path (client-mount c)))))
|
||||||
|
(server-clients server))
|
||||||
|
(count-if #'client-active-p (server-clients server)))))
|
||||||
|
|
||||||
|
(defun start-server (server)
|
||||||
|
"Start the streaming server."
|
||||||
|
(when (server-running-p server)
|
||||||
|
(error 'streamer-error :message "Server already running"))
|
||||||
|
(setf (server-socket server)
|
||||||
|
(usocket:socket-listen "0.0.0.0" (server-port server)
|
||||||
|
:reuse-address t
|
||||||
|
:element-type '(unsigned-byte 8)))
|
||||||
|
(setf (server-running-p server) t)
|
||||||
|
(setf (server-accept-thread server)
|
||||||
|
(bt:make-thread (lambda () (accept-loop server))
|
||||||
|
:name "cl-streamer-accept"))
|
||||||
|
(log:info "CL-Streamer started on port ~A" (server-port server))
|
||||||
|
server)
|
||||||
|
|
||||||
|
(defun stop-server (server)
|
||||||
|
"Stop the streaming server."
|
||||||
|
(setf (server-running-p server) nil)
|
||||||
|
(bt:with-lock-held ((server-clients-lock server))
|
||||||
|
(dolist (client (server-clients server))
|
||||||
|
(setf (client-active-p client) nil)
|
||||||
|
(ignore-errors (usocket:socket-close (client-socket client)))))
|
||||||
|
(ignore-errors (usocket:socket-close (server-socket server)))
|
||||||
|
(log:info "CL-Streamer stopped")
|
||||||
|
server)
|
||||||
|
|
||||||
|
(defun accept-loop (server)
|
||||||
|
"Main accept loop for incoming connections."
|
||||||
|
(loop while (server-running-p server)
|
||||||
|
do (handler-case
|
||||||
|
(let ((client-socket (usocket:socket-accept (server-socket server))))
|
||||||
|
(bt:make-thread (lambda () (handle-client server client-socket))
|
||||||
|
:name "cl-streamer-client"))
|
||||||
|
(usocket:socket-error (e)
|
||||||
|
(unless (server-running-p server)
|
||||||
|
(return))
|
||||||
|
(log:warn "Accept error: ~A" e)))))
|
||||||
|
|
||||||
|
(defun handle-client (server client-socket)
|
||||||
|
"Handle a single client connection."
|
||||||
|
(let ((stream (usocket:socket-stream client-socket)))
|
||||||
|
(handler-case
|
||||||
|
(let* ((request-line (read-line stream))
|
||||||
|
(headers (read-http-headers stream)))
|
||||||
|
(multiple-value-bind (path wants-meta)
|
||||||
|
(parse-icy-request request-line headers)
|
||||||
|
(let ((mount (gethash path (server-mounts server))))
|
||||||
|
(if mount
|
||||||
|
(serve-stream server client-socket stream mount wants-meta)
|
||||||
|
(send-404 stream path)))))
|
||||||
|
(error (e)
|
||||||
|
(log:debug "Client error: ~A" e)
|
||||||
|
(ignore-errors (usocket:socket-close client-socket))))))
|
||||||
|
|
||||||
|
(defun read-http-headers (stream)
|
||||||
|
"Read HTTP headers from STREAM. Returns alist of (name . value)."
|
||||||
|
(loop for line = (read-line stream nil nil)
|
||||||
|
while (and line (> (length line) 1))
|
||||||
|
for colon-pos = (position #\: line)
|
||||||
|
when colon-pos
|
||||||
|
collect (cons (string-trim '(#\Space #\Return) (subseq line 0 colon-pos))
|
||||||
|
(string-trim '(#\Space #\Return) (subseq line (1+ colon-pos))))))
|
||||||
|
|
||||||
|
(defun serve-stream (server client-socket stream mount wants-meta)
|
||||||
|
"Serve audio stream to a client."
|
||||||
|
(let ((client (make-instance 'client-connection
|
||||||
|
:socket client-socket
|
||||||
|
:stream stream
|
||||||
|
:mount mount
|
||||||
|
:wants-metadata wants-meta)))
|
||||||
|
(bt:with-lock-held ((server-clients-lock server))
|
||||||
|
(push client (server-clients server)))
|
||||||
|
(log:info "Client connected to ~A (metadata: ~A)"
|
||||||
|
(mount-path mount) wants-meta)
|
||||||
|
(write-icy-response-headers stream
|
||||||
|
:content-type (mount-content-type mount)
|
||||||
|
:metaint (when wants-meta *default-metaint*)
|
||||||
|
:name (mount-name mount)
|
||||||
|
:genre (mount-genre mount)
|
||||||
|
:bitrate (mount-bitrate mount))
|
||||||
|
(unwind-protect
|
||||||
|
(stream-to-client client)
|
||||||
|
(setf (client-active-p client) nil)
|
||||||
|
(ignore-errors (usocket:socket-close client-socket))
|
||||||
|
(bt:with-lock-held ((server-clients-lock server))
|
||||||
|
(setf (server-clients server)
|
||||||
|
(remove client (server-clients server))))
|
||||||
|
(log:info "Client disconnected from ~A" (mount-path mount)))))
|
||||||
|
|
||||||
|
(defun stream-to-client (client)
|
||||||
|
"Stream audio data to a client, inserting metadata as needed."
|
||||||
|
(let* ((mount (client-mount client))
|
||||||
|
(buffer (mount-buffer mount))
|
||||||
|
(stream (client-stream client))
|
||||||
|
(chunk-size 4096)
|
||||||
|
(chunk (make-array chunk-size :element-type '(unsigned-byte 8))))
|
||||||
|
(loop while (client-active-p client)
|
||||||
|
do (let ((bytes-read (buffer-read buffer chunk :blocking t)))
|
||||||
|
(when (zerop bytes-read)
|
||||||
|
(sleep 0.01)
|
||||||
|
(return))
|
||||||
|
(handler-case
|
||||||
|
(if (client-wants-metadata-p client)
|
||||||
|
(write-with-metadata client chunk bytes-read)
|
||||||
|
(write-sequence chunk stream :end bytes-read))
|
||||||
|
(error ()
|
||||||
|
(setf (client-active-p client) nil)
|
||||||
|
(return)))
|
||||||
|
(force-output stream)))))
|
||||||
|
|
||||||
|
(defun write-with-metadata (client data length)
|
||||||
|
"Write audio data with ICY metadata injection."
|
||||||
|
(let* ((stream (client-stream client))
|
||||||
|
(mount (client-mount client))
|
||||||
|
(metaint *default-metaint*)
|
||||||
|
(pos 0))
|
||||||
|
(loop while (< pos length)
|
||||||
|
do (let ((bytes-until-meta (- metaint (client-bytes-since-meta client)))
|
||||||
|
(bytes-remaining (- length pos)))
|
||||||
|
(if (<= bytes-until-meta bytes-remaining)
|
||||||
|
(progn
|
||||||
|
(write-sequence data stream :start pos :end (+ pos bytes-until-meta))
|
||||||
|
(incf pos bytes-until-meta)
|
||||||
|
(setf (client-bytes-since-meta client) 0)
|
||||||
|
(let ((meta-bytes (bt:with-lock-held ((mount-metadata-lock mount))
|
||||||
|
(encode-icy-metadata (mount-metadata mount)))))
|
||||||
|
(write-sequence meta-bytes stream)))
|
||||||
|
(progn
|
||||||
|
(write-sequence data stream :start pos :end length)
|
||||||
|
(incf (client-bytes-since-meta client) bytes-remaining)
|
||||||
|
(setf pos length)))))))
|
||||||
|
|
||||||
|
(defun send-404 (stream path)
|
||||||
|
"Send a 404 response for unknown mount points."
|
||||||
|
(format stream "HTTP/1.1 404 Not Found~C~C" #\Return #\Linefeed)
|
||||||
|
(format stream "Content-Type: text/plain~C~C" #\Return #\Linefeed)
|
||||||
|
(format stream "~C~C" #\Return #\Linefeed)
|
||||||
|
(format stream "Mount point not found: ~A~%" path)
|
||||||
|
(force-output stream))
|
||||||
|
|
@ -0,0 +1,198 @@
|
||||||
|
#+TITLE: CL-Native Streaming Architecture
|
||||||
|
#+AUTHOR: Glenn Thompson
|
||||||
|
#+DATE: 2026-03-03
|
||||||
|
#+OPTIONS: toc:2
|
||||||
|
|
||||||
|
* Overview
|
||||||
|
|
||||||
|
This document outlines the architecture for replacing Icecast and Liquidsoap
|
||||||
|
with a pure Common Lisp streaming solution for Asteroid Radio.
|
||||||
|
|
||||||
|
** Goals
|
||||||
|
|
||||||
|
- Eliminate external dependencies on Icecast and Liquidsoap
|
||||||
|
- Leverage existing CL audio ecosystem (Harmony, cl-mixed, playlisp)
|
||||||
|
- Maintain compatibility with current Asteroid features
|
||||||
|
- Enable tighter integration between web app and streaming engine
|
||||||
|
|
||||||
|
** Current Architecture (Icecast/Liquidsoap)
|
||||||
|
|
||||||
|
#+begin_example
|
||||||
|
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
|
||||||
|
│ Asteroid │────▶│ Liquidsoap │────▶│ Icecast │
|
||||||
|
│ (Radiance) │ │ (Source) │ │ (Server) │
|
||||||
|
└─────────────────┘ └─────────────────┘ └─────────────────┘
|
||||||
|
│ │ │
|
||||||
|
│ Telnet commands │ Audio stream │ HTTP streams
|
||||||
|
│ Playlist updates │ (SOURCE protocol) │ to listeners
|
||||||
|
▼ ▼ ▼
|
||||||
|
playlists/ Decode/Encode /asteroid.mp3
|
||||||
|
stream-queue.m3u Crossfade /asteroid.aac
|
||||||
|
#+end_example
|
||||||
|
|
||||||
|
** Proposed Architecture (CL-Native)
|
||||||
|
|
||||||
|
#+begin_example
|
||||||
|
┌─────────────────────────────────────────────────────────────────┐
|
||||||
|
│ ASTEROID │
|
||||||
|
│ ┌───────────────────────────────────────────────────────────┐ │
|
||||||
|
│ │ CL-STREAMER │ │
|
||||||
|
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────┐ │ │
|
||||||
|
│ │ │ Playlist │ │ Audio │ │ HTTP Stream │ │ │
|
||||||
|
│ │ │ Engine │──▶│ Pipeline │──▶│ Server │ │ │
|
||||||
|
│ │ │ (playlisp) │ │ (Harmony) │ │ (ICY protocol) │ │ │
|
||||||
|
│ │ └─────────────┘ └─────────────┘ └─────────────────┘ │ │
|
||||||
|
│ │ │ │ │ │ │
|
||||||
|
│ │ ▼ ▼ ▼ │ │
|
||||||
|
│ │ Parse M3U Decode/Mix/ Multi-client │ │
|
||||||
|
│ │ Track queue Encode streaming │ │
|
||||||
|
│ └───────────────────────────────────────────────────────────┘ │
|
||||||
|
│ │
|
||||||
|
│ Existing: Web UI, API, Scheduler, User features │
|
||||||
|
└─────────────────────────────────────────────────────────────────┘
|
||||||
|
#+end_example
|
||||||
|
|
||||||
|
* Components
|
||||||
|
|
||||||
|
** 1. Playlist Engine (playlisp/parsector)
|
||||||
|
|
||||||
|
Already implemented by Fade. Provides:
|
||||||
|
|
||||||
|
- M3U parsing into CLOS objects
|
||||||
|
- Track metadata (title, artist, duration, path)
|
||||||
|
- Arithmetic expressions in duration fields
|
||||||
|
- Playlist construction API
|
||||||
|
|
||||||
|
Location: =~/SourceCode/playlisp=
|
||||||
|
|
||||||
|
** 2. Audio Pipeline (Harmony + cl-mixed)
|
||||||
|
|
||||||
|
Shinmera's audio system provides:
|
||||||
|
|
||||||
|
*** Decoding (via cl-mixed extensions)
|
||||||
|
- =cl-mixed-mpg123= - MP3 decoding
|
||||||
|
- =cl-flac= - FLAC decoding
|
||||||
|
- =cl-vorbis= - OGG/Vorbis decoding
|
||||||
|
- =cl-opus= - OGG/Opus decoding
|
||||||
|
|
||||||
|
*** Mixing
|
||||||
|
- Multiple mixers (music, sfx, etc.)
|
||||||
|
- Voice management
|
||||||
|
- Effects chain (filters, EQ, etc.)
|
||||||
|
|
||||||
|
*** Crossfading
|
||||||
|
- Environment system for horizontal mixing
|
||||||
|
- Transition between segments with configurable fade times
|
||||||
|
|
||||||
|
*** Output
|
||||||
|
- Platform drains (PulseAudio, CoreAudio, WASAPI)
|
||||||
|
- Need: Network/buffer output for streaming
|
||||||
|
|
||||||
|
** 3. Audio Encoding (TO BE IMPLEMENTED)
|
||||||
|
|
||||||
|
Need to encode PCM audio to streaming formats:
|
||||||
|
|
||||||
|
*** Options
|
||||||
|
- =cl-lame= - FFI bindings to LAME (MP3 encoding)
|
||||||
|
- =cl-fdkaac= - FFI bindings to FDK-AAC (AAC encoding)
|
||||||
|
- =cl-opus= - May support encoding (verify)
|
||||||
|
|
||||||
|
*** Implementation Notes
|
||||||
|
- Harmony outputs PCM samples
|
||||||
|
- Need to capture these and encode in real-time
|
||||||
|
- Ring buffer between mixer output and encoder input
|
||||||
|
|
||||||
|
** 4. HTTP Stream Server (TO BE IMPLEMENTED)
|
||||||
|
|
||||||
|
Replace Icecast with CL-native HTTP streaming:
|
||||||
|
|
||||||
|
*** Requirements
|
||||||
|
- HTTP/1.1 chunked transfer encoding
|
||||||
|
- ICY metadata protocol (track titles in stream)
|
||||||
|
- Multiple mount points (/asteroid.mp3, /asteroid.aac)
|
||||||
|
- Concurrent client connections
|
||||||
|
- Listener statistics
|
||||||
|
|
||||||
|
*** ICY Protocol
|
||||||
|
#+begin_example
|
||||||
|
Client request:
|
||||||
|
GET /stream HTTP/1.1
|
||||||
|
Icy-MetaData: 1
|
||||||
|
|
||||||
|
Server response:
|
||||||
|
HTTP/1.1 200 OK
|
||||||
|
Content-Type: audio/mpeg
|
||||||
|
icy-metaint: 16000
|
||||||
|
|
||||||
|
[audio data - 16000 bytes]
|
||||||
|
[metadata block]
|
||||||
|
[audio data - 16000 bytes]
|
||||||
|
[metadata block]
|
||||||
|
...
|
||||||
|
#+end_example
|
||||||
|
|
||||||
|
*** Implementation Options
|
||||||
|
- Build on Hunchentoot (acceptor + handler)
|
||||||
|
- Build on usocket directly (more control)
|
||||||
|
- Use Chunga for chunked encoding
|
||||||
|
|
||||||
|
* Dependencies
|
||||||
|
|
||||||
|
** Existing (in Quicklisp/Shirakumo dist)
|
||||||
|
- harmony
|
||||||
|
- cl-mixed
|
||||||
|
- cl-mixed-mpg123
|
||||||
|
- cl-flac
|
||||||
|
- chunga
|
||||||
|
- usocket
|
||||||
|
- bordeaux-threads
|
||||||
|
|
||||||
|
** From Fade's repos
|
||||||
|
- playlisp (~/SourceCode/playlisp)
|
||||||
|
- parsector (~/SourceCode/parsector)
|
||||||
|
|
||||||
|
** To Be Created/Found
|
||||||
|
- MP3 encoder bindings (LAME)
|
||||||
|
- AAC encoder bindings (FDK-AAC) - optional
|
||||||
|
|
||||||
|
* Implementation Phases
|
||||||
|
|
||||||
|
** Phase 1: Proof of Concept
|
||||||
|
- [ ] Load Harmony and play audio files from playlist
|
||||||
|
- [ ] Capture PCM output to buffer
|
||||||
|
- [ ] Encode buffer to MP3 using LAME FFI
|
||||||
|
- [ ] Serve single HTTP stream to one client
|
||||||
|
|
||||||
|
** Phase 2: Core Streaming
|
||||||
|
- [ ] Implement ICY metadata injection
|
||||||
|
- [ ] Multi-client connection handling
|
||||||
|
- [ ] Ring buffer for audio data
|
||||||
|
- [ ] Mount point abstraction
|
||||||
|
|
||||||
|
** Phase 3: Integration
|
||||||
|
- [ ] Replace liquidsoap-command calls with direct CL calls
|
||||||
|
- [ ] Integrate with existing playlist scheduler
|
||||||
|
- [ ] Crossfading between tracks
|
||||||
|
- [ ] Listener statistics
|
||||||
|
|
||||||
|
** Phase 4: Feature Parity
|
||||||
|
- [ ] Multiple stream formats (MP3, AAC)
|
||||||
|
- [ ] Multiple bitrates
|
||||||
|
- [ ] Admin controls (skip, reload)
|
||||||
|
- [ ] YP directory registration
|
||||||
|
|
||||||
|
* Open Questions
|
||||||
|
|
||||||
|
1. Should cl-streamer be a separate ASDF system or part of asteroid?
|
||||||
|
2. How to handle the transition period (run both systems)?
|
||||||
|
3. Performance: Can CL handle 100+ concurrent listeners?
|
||||||
|
4. Licensing: LAME is LGPL, FDK-AAC has patent issues
|
||||||
|
|
||||||
|
* References
|
||||||
|
|
||||||
|
- [[https://codeberg.org/shirakumo/harmony][Harmony - CL Sound System]]
|
||||||
|
- [[https://codeberg.org/shirakumo/cl-mixed][cl-mixed - Audio Mixing Library]]
|
||||||
|
- [[https://github.com/fade/playlisp][playlisp - M3U Parser]]
|
||||||
|
- [[https://github.com/fade/parsector][parsector - Parser Combinators]]
|
||||||
|
- [[https://cast.readme.io/docs/icy][ICY Protocol Documentation]]
|
||||||
|
- [[https://www.icecast.org/docs/][Icecast Documentation]]
|
||||||
Loading…
Reference in New Issue