add pulseaudio thx boatbod

This commit is contained in:
Max 2020-08-05 21:31:11 -04:00
parent 1180f9d8b6
commit 28ef6960cc
3 changed files with 417 additions and 254 deletions

View File

@ -29,7 +29,8 @@ from optparse import OptionParser
from sockaudio import socket_audio
def signal_handler(signal, frame):
audiothread.stop()
sys.stderr.write("audio.py shutting down\n")
audio_handler.stop()
sys.exit(0)
parser = OptionParser()
@ -38,16 +39,16 @@ parser.add_option("-H", "--host-ip", type="string", default="0.0.0.0", help="IP
parser.add_option("-u", "--wireshark-port", type="int", default=23456, help="Wireshark port")
parser.add_option("-2", "--two-channel", action="store_true", default=False, help="single or two channel audio")
parser.add_option("-x", "--audio-gain", type="float", default="1.0", help="audio gain (default = 1.0)")
parser.add_option("-s", "--stdout", action="store_true", default=False, help="write to stdout instead of audio device")
(options, args) = parser.parse_args()
if len(args) != 0:
parser.print_help()
sys.exit(1)
audiothread = socket_audio(options.host_ip, options.wireshark_port, options.audio_output, options.two_channel, options.audio_gain)
audio_handler = socket_audio(options.host_ip, options.wireshark_port, options.audio_output, options.two_channel, options.audio_gain, options.stdout)
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler)
while True:
time.sleep(1)
audio_handler.run()

View File

@ -68,7 +68,7 @@ from gr_gnuplot import mixer_sink_c
from gr_gnuplot import setup_correlation
from terminal import op25_terminal
from sockaudio import socket_audio
from sockaudio import audio_thread
#speeds = [300, 600, 900, 1200, 1440, 1800, 1920, 2400, 2880, 3200, 3600, 3840, 4000, 4800, 6000, 6400, 7200, 8000, 9600, 14400, 19200]
speeds = [4800, 6000]
@ -233,7 +233,7 @@ class p25_rx_block (gr.top_block):
# attach audio thread
if self.options.udp_player:
self.audio = socket_audio("127.0.0.1", self.options.wireshark_port, self.options.audio_output, False, self.options.audio_gain)
self.audio = audio_thread("127.0.0.1", self.options.wireshark_port, self.options.audio_output, False, self.options.audio_gain)
else:
self.audio = None

View File

@ -22,6 +22,7 @@
# 02110-1301, USA.
from ctypes import *
import os
import sys
import time
import threading
@ -29,16 +30,18 @@ import select
import socket
import errno
import struct
import ctypes
import numpy as np
from log_ts import log_ts
# OP25 defaults
PCM_RATE = 8000 # audio sample rate (Hz)
PCM_BUFFER_SIZE = 4000 # size of ALSA buffer in frames
PCM_RATE = 8000 # audio sample rate (Hz)
PCM_BUFFER_SIZE = 4000 # size of ALSA buffer in frames
MAX_SUPERFRAME_SIZE = 320 # maximum size of incoming UDP audio buffer
MAX_SUPERFRAME_SIZE = 320 # maximum size of incoming UDP audio buffer
# Debug
LOG_AUDIO_XRUNS = True # log audio underruns to stderr
LOG_AUDIO_XRUNS = True # log audio underruns to stderr
# Alsa PCM constants
SND_PCM_FORMAT_S8 = c_int(0)
@ -104,285 +107,444 @@ SND_PCM_ACCESS_RW_INTERLEAVED = c_int(3)
SND_PCM_ACCESS_RW_NONINTERLEAVED = c_int(4)
SND_PCM_ACCESS_LAST = SND_PCM_ACCESS_RW_NONINTERLEAVED
PA_STREAM_PLAYBACK = 1
PA_SAMPLE_S16LE = 3
# Python CTypes wrapper to Alsa libasound2
class alsasound(object):
def __init__(self):
self.libasound = cdll.LoadLibrary("libasound.so.2")
self.c_pcm = c_void_p()
self.format = 0
self.channels = 0
self.rate = 0
self.framesize = 0
def __init__(self):
self.libasound = cdll.LoadLibrary("libasound.so.2")
self.c_pcm = c_void_p()
self.format = 0
self.channels = 0
self.rate = 0
self.framesize = 0
def open(self, hwdev):
b_hwdev = create_string_buffer(str.encode(hwdev))
c_stream = SND_PCM_STREAM_PLAYBACK
err = self.libasound.snd_pcm_open(byref(self.c_pcm), b_hwdev, c_stream, SND_PCM_NORMAL)
return err
def open(self, hwdev):
b_hwdev = create_string_buffer(str.encode(hwdev))
c_stream = SND_PCM_STREAM_PLAYBACK
err = self.libasound.snd_pcm_open(byref(self.c_pcm), b_hwdev, c_stream, SND_PCM_NORMAL)
return err
def close(self):
if (self.c_pcm.value == None):
return
self.libasound.snd_pcm_close(self.c_pcm)
self.c_pcm.value = None
def close(self):
if (self.c_pcm.value == None):
return
self.libasound.snd_pcm_close(self.c_pcm)
self.c_pcm.value = None
def setup(self, pcm_format, pcm_channels, pcm_rate, pcm_buffer_size):
if (self.c_pcm.value == None):
return
def setup(self, pcm_format, pcm_channels, pcm_rate, pcm_buffer_size):
if (self.c_pcm.value == None):
return
self.format = pcm_format
self.channels = pcm_channels
self.rate = pcm_rate
pcm_buf_sz = c_ulong(pcm_buffer_size)
self.format = pcm_format
self.channels = pcm_channels
self.rate = pcm_rate
pcm_buf_sz = c_ulong(pcm_buffer_size)
c_pars = (c_void_p * int(self.libasound.snd_pcm_hw_params_sizeof() / sizeof(c_void_p)))()
err = self.libasound.snd_pcm_hw_params_any(self.c_pcm, c_pars)
if err < 0:
sys.stderr.write("hw_params_any failed: %d\n" % err)
return err
c_pars = (c_void_p * int(self.libasound.snd_pcm_hw_params_sizeof() / sizeof(c_void_p)))()
err = self.libasound.snd_pcm_hw_params_any(self.c_pcm, c_pars)
if err < 0:
sys.stderr.write("hw_params_any failed: %d\n" % err)
return err
err = self.libasound.snd_pcm_hw_params_set_access(self.c_pcm, c_pars, SND_PCM_ACCESS_RW_INTERLEAVED)
if err < 0:
sys.stderr.write("set_access failed: %d\n" % err)
return err
err = self.libasound.snd_pcm_hw_params_set_format(self.c_pcm, c_pars, c_uint(self.format))
if err < 0:
sys.stderr.write("set_format failed: %d\n" % err)
return err
err = self.libasound.snd_pcm_hw_params_set_channels(self.c_pcm, c_pars, c_uint(self.channels))
if err < 0:
sys.stderr.write("set_channels failed: %d\n" % err)
return err
err = self.libasound.snd_pcm_hw_params_set_rate(self.c_pcm, c_pars, c_uint(self.rate), c_int(0))
if err < 0:
sys.stderr.write("set_rate failed: %d\n" % err)
return err
err = self.libasound.snd_pcm_hw_params_set_buffer_size_near(self.c_pcm, c_pars, byref(pcm_buf_sz))
if err < 0:
sys.stderr.write("set_buffer_size_near failed: %d\n" % err)
return err
if pcm_buf_sz.value != pcm_buffer_size:
sys.stderr.write("set_buffer_size_near requested %d, but returned %d\n" % (pcm_buffer_size, pcm_buf_sz.value))
err = self.libasound.snd_pcm_hw_params(self.c_pcm, c_pars)
if err < 0:
sys.stderr.write("hw_params failed: %d\n" % err)
return err
err = self.libasound.snd_pcm_hw_params_set_access(self.c_pcm, c_pars, SND_PCM_ACCESS_RW_INTERLEAVED)
if err < 0:
sys.stderr.write("set_access failed: %d\n" % err)
return err
err = self.libasound.snd_pcm_hw_params_set_format(self.c_pcm, c_pars, c_uint(self.format))
if err < 0:
sys.stderr.write("set_format failed: %d\n" % err)
return err
err = self.libasound.snd_pcm_hw_params_set_channels(self.c_pcm, c_pars, c_uint(self.channels))
if err < 0:
sys.stderr.write("set_channels failed: %d\n" % err)
return err
err = self.libasound.snd_pcm_hw_params_set_rate(self.c_pcm, c_pars, c_uint(self.rate), c_int(0))
if err < 0:
sys.stderr.write("set_rate failed: %d\n" % err)
return err
err = self.libasound.snd_pcm_hw_params_set_buffer_size_near(self.c_pcm, c_pars, byref(pcm_buf_sz))
if err < 0:
sys.stderr.write("set_buffer_size_near failed: %d\n" % err)
return err
if pcm_buf_sz.value != pcm_buffer_size:
sys.stderr.write("set_buffer_size_near requested %d, but returned %d\n" % (pcm_buffer_size, pcm_buf_sz.value))
err = self.libasound.snd_pcm_hw_params(self.c_pcm, c_pars)
if err < 0:
sys.stderr.write("hw_params failed: %d\n" % err)
return err
self.libasound.snd_pcm_hw_params_current(self.c_pcm, c_pars)
c_bits = self.libasound.snd_pcm_hw_params_get_sbits(c_pars)
self.framesize = self.channels * c_bits//8
self.libasound.snd_pcm_hw_params_current(self.c_pcm, c_pars)
c_bits = self.libasound.snd_pcm_hw_params_get_sbits(c_pars)
self.framesize = self.channels * c_bits//8
c_sw_pars = (c_void_p * int(self.libasound.snd_pcm_sw_params_sizeof() / sizeof(c_void_p)))()
err = self.libasound.snd_pcm_sw_params_current(self.c_pcm, c_sw_pars)
if err < 0:
sys.stderr.write("get_sw_params_current failed: %d\n" % err)
return err
pcm_start_threshold = int(pcm_buf_sz.value * 0.75)
err = self.libasound.snd_pcm_sw_params_set_start_threshold(self.c_pcm, c_sw_pars, c_uint(pcm_start_threshold))
if err < 0:
sys.stderr.write("set_sw_params_start_threshold failed: %d\n" % err)
return err
err = self.libasound.snd_pcm_sw_params(self.c_pcm, c_sw_pars)
if err < 0:
sys.stderr.write("sw_params failed: %d\n" % err)
return err
c_sw_pars = (c_void_p * int(self.libasound.snd_pcm_sw_params_sizeof() / sizeof(c_void_p)))()
err = self.libasound.snd_pcm_sw_params_current(self.c_pcm, c_sw_pars)
if err < 0:
sys.stderr.write("get_sw_params_current failed: %d\n" % err)
return err
pcm_start_threshold = int(pcm_buf_sz.value * 0.75)
err = self.libasound.snd_pcm_sw_params_set_start_threshold(self.c_pcm, c_sw_pars, c_uint(pcm_start_threshold))
if err < 0:
sys.stderr.write("set_sw_params_start_threshold failed: %d\n" % err)
return err
err = self.libasound.snd_pcm_sw_params(self.c_pcm, c_sw_pars)
if err < 0:
sys.stderr.write("sw_params failed: %d\n" % err)
return err
ret = self.libasound.snd_pcm_prepare(self.c_pcm)
#self.dump()
return ret
ret = self.libasound.snd_pcm_prepare(self.c_pcm)
#self.dump()
return ret
def write(self, pcm_data):
datalen = len(pcm_data)
n_frames = c_ulong(datalen // self.framesize)
c_data = c_char_p(pcm_data)
ret = 0
def write(self, pcm_data):
datalen = len(pcm_data)
n_frames = c_ulong(datalen // self.framesize)
c_data = c_char_p(pcm_data)
ret = 0
if (self.c_pcm.value == None):
sys.stderr.write("PCM device is closed\n")
return -1
if (self.c_pcm.value == None):
sys.stderr.write("PCM device is closed\n")
return -1
ret = self.libasound.snd_pcm_writei(self.c_pcm, cast(c_data, POINTER(c_void_p)), n_frames)
if (ret < 0):
if (ret == -errno.EPIPE): # underrun
if (LOG_AUDIO_XRUNS):
sys.stderr.write("%f PCM underrun\n" % time.time())
ret = self.libasound.snd_pcm_recover(self.c_pcm, ret, 1)
if (ret >= 0):
ret = self.libasound.snd_pcm_writei(self.c_pcm, cast(c_data, POINTER(c_void_p)), n_frames)
else:
ret = self.libasound.snd_pcm_prepare(self.c_pcm)
ret = self.libasound.snd_pcm_writei(self.c_pcm, cast(c_data, POINTER(c_void_p)), n_frames)
elif (ret == -errno.ESTRPIPE): # suspended
while True:
ret = self.libasound.snd_pcm_resume(self.c_pcm)
if (ret != -errno.EAGAIN):
break
time.sleep(1)
if (ret < 0):
ret = self.libasound.snd_pcm_prepare(self.c_pcm)
elif (ret < 0): # other error
ret = self.libasound.snd_pcm_prepare(self.c_pcm)
ret = self.libasound.snd_pcm_writei(self.c_pcm, cast(c_data, POINTER(c_void_p)), n_frames)
if (ret < 0):
if (ret == -errno.EPIPE): # underrun
if (LOG_AUDIO_XRUNS):
sys.stderr.write("%s PCM underrun\n" % log_ts.get())
ret = self.libasound.snd_pcm_recover(self.c_pcm, ret, 1)
if (ret >= 0):
ret = self.libasound.snd_pcm_writei(self.c_pcm, cast(c_data, POINTER(c_void_p)), n_frames)
else:
ret = self.libasound.snd_pcm_prepare(self.c_pcm)
ret = self.libasound.snd_pcm_writei(self.c_pcm, cast(c_data, POINTER(c_void_p)), n_frames)
elif (ret == -errno.ESTRPIPE): # suspended
while True:
ret = self.libasound.snd_pcm_resume(self.c_pcm)
if (ret != -errno.EAGAIN):
break
time.sleep(1)
if (ret < 0):
ret = self.libasound.snd_pcm_prepare(self.c_pcm)
elif (ret < 0): # other error
ret = self.libasound.snd_pcm_prepare(self.c_pcm)
return ret
return ret
def drain(self):
ret = self.libasound.snd_pcm_drain(self.c_pcm)
if (ret == -errno.ESTRPIPE): # suspended
while True:
ret = self.libasound.snd_pcm_resume(self.c_pcm)
if (ret != -errno.EAGAIN):
break
time.sleep(1)
ret = self.libasound.snd_pcm_prepare(self.c_pcm)
def drain(self):
ret = self.libasound.snd_pcm_drain(self.c_pcm)
if (ret == -errno.ESTRPIPE): # suspended
while True:
ret = self.libasound.snd_pcm_resume(self.c_pcm)
if (ret != -errno.EAGAIN):
break
time.sleep(1)
ret = self.libasound.snd_pcm_prepare(self.c_pcm)
return ret
def drop(self):
ret = self.libasound.snd_pcm_drop(self.c_pcm)
if (ret == -errno.ESTRPIPE): # suspended
while True:
ret = self.libasound.snd_pcm_resume(self.c_pcm)
if (ret != -errno.EAGAIN):
break
time.sleep(1)
ret = self.libasound.snd_pcm_prepare(self.c_pcm)
def drop(self):
ret = self.libasound.snd_pcm_drop(self.c_pcm)
if (ret == -errno.ESTRPIPE): # suspended
while True:
ret = self.libasound.snd_pcm_resume(self.c_pcm)
if (ret != -errno.EAGAIN):
break
time.sleep(1)
ret = self.libasound.snd_pcm_prepare(self.c_pcm)
return ret
def dump(self):
if (self.c_pcm.value == None):
return
def dump(self):
if (self.c_pcm.value == None):
return
c_buf_p = c_void_p()
c_str_p = c_char_p()
c_strlen = c_uint(0)
self.libasound.snd_output_buffer_open(byref(c_buf_p))
self.libasound.snd_pcm_dump_setup(self.c_pcm, c_buf_p)
c_strlen = self.libasound.snd_output_buffer_string(c_buf_p, byref(c_str_p))
sys.stderr.write("%s\n" % c_str_p.value[0:c_strlen-1])
self.libasound.snd_output_close(c_buf_p)
c_buf_p = c_void_p()
c_str_p = c_char_p()
c_strlen = c_uint(0)
self.libasound.snd_output_buffer_open(byref(c_buf_p))
self.libasound.snd_pcm_dump_setup(self.c_pcm, c_buf_p)
c_strlen = self.libasound.snd_output_buffer_string(c_buf_p, byref(c_str_p))
sys.stderr.write("%s\n" % c_str_p.value[0:c_strlen-1])
self.libasound.snd_output_close(c_buf_p)
# OP25 thread to receive UDP audio samples and send to Alsa driver
class socket_audio(threading.Thread):
def __init__(self, udp_host, udp_port, pcm_device, two_channels = False, audio_gain = 1.0, **kwds):
threading.Thread.__init__(self, **kwds)
self.setDaemon(1)
self.keep_running = True
self.two_channels = two_channels
self.audio_gain = audio_gain
self.sock_a = None
self.sock_b = None
self.pcm = alsasound()
self.setup_sockets(udp_host, udp_port)
self.setup_pcm(pcm_device)
self.start()
return
def check(self):
return 0
def run(self):
while self.keep_running:
readable, writable, exceptional = select.select( [self.sock_a, self.sock_b], [], [self.sock_a, self.sock_b] )
in_a = None
in_b = None
data_a = ""
data_b = ""
flag_a = -1
flag_b = -1
class _struct_pa_sample_spec(Structure):
_fields_ = [("format", c_int),
("rate", c_uint32),
("channels", c_uint8)]
# Data received on the udp port is 320 bytes for an audio frame or 2 bytes for a flag
if self.sock_a in readable:
in_a = self.sock_a.recvfrom(MAX_SUPERFRAME_SIZE)
class pa_sound(object):
def __init__(self):
self.error = c_int(0)
self.libpa = cdll.LoadLibrary("libpulse-simple.so.0")
self.libpa.strerror.restype = c_char_p
self.ss = _struct_pa_sample_spec(PA_SAMPLE_S16LE, 8000, 2)
if self.sock_b in readable:
in_b = self.sock_b.recvfrom(MAX_SUPERFRAME_SIZE)
def open(self, hwdevice):
self.out = c_void_p(self.libpa.pa_simple_new(None,
"OP25".encode("ascii"),
PA_STREAM_PLAYBACK,
None,
"OP25 Playback".encode('ascii'),
byref(self.ss),
None,
None,
byref(self.error)))
if in_a is not None:
len_a = len(in_a[0])
if len_a == 2:
flag_a = np.frombuffer(in_a[0], dtype=np.int16)[0]
elif len_a > 0:
data_a = in_a[0]
if not self.out:
sys.stderr.write("%s Could not open PulseAudio stream: %s\n" % (log_ts.get(), self.libpa.strerror(self.error)))
if in_b is not None:
len_b = len(in_b[0])
if len_b == 2:
flag_b = np.frombuffer(in_b[0], dtype=np.int16)[0]
elif len_b > 0:
data_b = in_b[0]
return self.error.value
if (((flag_a == 0) and (flag_b == 0)) or
((flag_a == 0) and ((in_b is None) or (flag_b == 1))) or
((flag_b == 0) and ((in_a is None) or (flag_a == 1)))):
self.pcm.drain()
continue
def close(self):
self.libpa.pa_simple_free(self.out)
if (((flag_a == 1) and (flag_b == 1)) or
((flag_a == 1) and (in_b is None)) or
((flag_b == 1) and (in_a is None))):
self.pcm.drop()
continue
def setup(self, pcm_format, pcm_channels, pcm_rate, pcm_buffer_size):
self.ss.format = PA_SAMPLE_S16LE # fixed format
self.ss.channels = pcm_channels
self.ss.rate = pcm_rate
return 0
if not self.two_channels:
data_a = self.scale(data_a)
self.pcm.write(self.interleave(data_a, data_a))
else:
data_a = self.scale(data_a)
data_b = self.scale(data_b)
self.pcm.write(self.interleave(data_a, data_b))
def write(self, pcm_data):
self.libpa.pa_simple_write(self.out, pcm_data, len(pcm_data), byref(self.error))
return self.error
self.close_sockets()
self.close_pcm()
return
def drain(self):
self.libpa.pa_simple_drain(self.out, byref(self.error))
return self.error.value
def scale(self, data):
arr = np.array(np.frombuffer(data, dtype=np.int16), dtype=np.float32)
result = np.zeros(len(arr), dtype=np.int16)
arr = np.clip(arr*self.audio_gain, -32767, 32766, out=result)
return result.tobytes('C')
def drop(self):
self.libpa.pa_simple_flush(self.out, byref(self.error))
return self.error.value
def interleave(self, data_a, data_b):
arr_a = np.frombuffer(data_a, dtype=np.int16)
arr_b = np.frombuffer(data_b, dtype=np.int16)
d_len = max(len(arr_a), len(arr_b))
result = np.zeros(d_len*2, dtype=np.int16)
if len(arr_a):
# copy arr_a to result[0,2,4, ...]
result[ range(0, len(arr_a)*2, 2) ] = arr_a
if len(arr_b):
# copy arr_b to result[1,3,5, ...]
result[ range(1, len(arr_b)*2, 2) ] = arr_b
return result.tobytes('C')
def dump(self):
return 0
def stop(self):
self.keep_running = False
return
def check(self):
return 0
def setup_sockets(self, udp_host, udp_port):
self.sock_a = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock_b = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock_a.setblocking(0)
self.sock_b.setblocking(0)
self.sock_a.bind((udp_host, udp_port))
self.sock_b.bind((udp_host, udp_port + 2))
return
def close_sockets(self):
self.sock_a.close()
self.sock_b.close()
return
def setup_pcm(self, hwdevice):
sys.stderr.write('audio device: %s\n' % hwdevice)
err = self.pcm.open(hwdevice)
if err < 0:
sys.stderr.write('failed to open audio device: %s\n' % hwdevice)
self.pcm.dump()
self.keep_running = False
return
# Wrapper to emulate pcm writes of sound samples to stdout (for liquidsoap)
class stdout_wrapper(object):
def __init__(self):
self.silence = chr(0) * 640
pass
err = self.pcm.setup(SND_PCM_FORMAT_S16_LE.value, 2, PCM_RATE, PCM_BUFFER_SIZE)
if err < 0:
sys.stderr.write('failed to set up pcm stream\n')
self.keep_running = False
return
return
def open(self, hwdev):
return 0
def close_pcm(self):
self.pcm.close()
return
def close(self):
return 0
def setup(self, pcm_format, pcm_channels, pcm_rate, pcm_buffer_size):
return 0
def drain(self):
try:
sys.stdout.flush()
except IOError: # IOError means listener has terminated
return -1
return 0
def drop(self):
return 0
def write(self, pcm_data):
try:
sys.stdout.write(pcm_data)
except IOError: # IOError means listener has terminated
return -1
return 0
def check(self):
rc = 0
if (self.write(self.silence) < 0) or (self.drain() < 0): # write silence to check pipe connectivity
rc = -1
return rc
def dump(self):
pass
# Main class that receives UDP audio samples and sends them to a PCM subsystem (currently ALSA or STDOUT)
class socket_audio(object):
def __init__(self, udp_host, udp_port, pcm_device, two_channels = False, audio_gain = 1.0, dest_stdout = False, **kwds):
self.keep_running = True
self.two_channels = two_channels
self.audio_gain = audio_gain
self.dest_stdout = dest_stdout
self.sock_a = None
self.sock_b = None
self.pcm = None
if dest_stdout:
pcm_device = "stdout"
sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0) # reopen stdout with buffering disabled
self.pcm = stdout_wrapper()
else:
if pcm_device.lower() == "pulse":
try:
self.pcm = pa_sound() # first try to use PulseAudio
sys.stderr.write("using PulseAudio sound system\n")
except:
sys.stderr.write("unable to load PulseAudio library\n")
pcm_device = "default"
if self.pcm is None:
try:
self.pcm = alsasound() # if PulseAudio not available, try to use ALSA
sys.stderr.write("using ALSA sound system\n")
except:
sys.stderr.write("unable to load ALSA library\n")
if self.pcm is not None:
self.setup_pcm(pcm_device)
else:
self.keep_running = False
self.setup_sockets(udp_host, udp_port)
def run(self):
rc = 0
while self.keep_running and (rc >= 0):
readable, writable, exceptional = select.select( [self.sock_a, self.sock_b], [], [self.sock_a, self.sock_b], 5.0)
in_a = None
in_b = None
data_a = ""
data_b = ""
flag_a = -1
flag_b = -1
# Check for select() polling timeout and pcm self-check
if (not readable) and (not writable) and (not exceptional):
rc = self.pcm.check()
if isinstance(rc, ctypes.c_int):
rc = rc.value
continue
# Data received on the udp port is 320 bytes for an audio frame or 2 bytes for a flag
if self.sock_a in readable:
in_a = self.sock_a.recvfrom(MAX_SUPERFRAME_SIZE)
if self.sock_b in readable:
in_b = self.sock_b.recvfrom(MAX_SUPERFRAME_SIZE)
if in_a is not None:
len_a = len(in_a[0])
if len_a == 2:
flag_a = np.frombuffer(in_a[0], dtype=np.int16)[0]
elif len_a > 0:
data_a = in_a[0]
if in_b is not None:
len_b = len(in_b[0])
if len_b == 2:
flag_b = np.frombuffer(in_b[0], dtype=np.int16)[0]
elif len_b > 0:
data_b = in_b[0]
if (flag_a == 0) or (flag_b == 0):
rc = self.pcm.drain()
if isinstance(rc, ctypes.c_int):
rc = rc.value
continue
if (((flag_a == 1) and (flag_b == 1)) or
((flag_a == 1) and (in_b is None)) or
((flag_b == 1) and (in_a is None))):
rc = self.pcm.drop()
if isinstance(rc, ctypes.c_int):
rc = rc.value
continue
if not self.two_channels:
data_a = self.scale(data_a)
rc = self.pcm.write(self.interleave(data_a, data_a))
if isinstance(rc, ctypes.c_int):
rc = rc.value
else:
data_a = self.scale(data_a)
data_b = self.scale(data_b)
rc = self.pcm.write(self.interleave(data_a, data_b))
if isinstance(rc, ctypes.c_int):
rc = rc.value
self.close_sockets()
self.close_pcm()
return
def scale(self, data): # crude amplitude scaler (volume) for S16_LE samples
arr = np.array(np.frombuffer(data, dtype=np.int16), dtype=np.float32)
result = np.zeros(len(arr), dtype=np.int16)
arr = np.clip(arr*self.audio_gain, -32767, 32766, out=result)
return result.tobytes('C')
def interleave(self, data_a, data_b):
arr_a = np.frombuffer(data_a, dtype=np.int16)
arr_b = np.frombuffer(data_b, dtype=np.int16)
d_len = max(len(arr_a), len(arr_b))
result = np.zeros(d_len*2, dtype=np.int16)
if len(arr_a):
# copy arr_a to result[0,2,4, ...]
result[ range(0, len(arr_a)*2, 2) ] = arr_a
if len(arr_b):
# copy arr_b to result[1,3,5, ...]
result[ range(1, len(arr_b)*2, 2) ] = arr_b
return result.tobytes('C')
def stop(self):
self.keep_running = False
return
def setup_sockets(self, udp_host, udp_port):
sys.stderr.write("Listening on %s:%d\n" % (udp_host, udp_port))
self.sock_a = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock_b = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock_a.setblocking(0)
self.sock_b.setblocking(0)
self.sock_a.bind((udp_host, udp_port))
self.sock_b.bind((udp_host, udp_port + 2))
return
def close_sockets(self):
self.sock_a.close()
self.sock_b.close()
return
def setup_pcm(self, hwdevice):
sys.stderr.write('audio device: %s\n' % hwdevice)
err = self.pcm.open(hwdevice)
if err < 0:
sys.stderr.write('failed to open audio device: %s\n' % hwdevice)
self.pcm.dump()
self.keep_running = False
return
err = self.pcm.setup(SND_PCM_FORMAT_S16_LE.value, 2, PCM_RATE, PCM_BUFFER_SIZE)
if err < 0:
sys.stderr.write('failed to set up pcm stream\n')
self.keep_running = False
return
return
def close_pcm(self):
sys.stderr.write('audio closing\n')
if self.pcm is not None:
self.pcm.close()
return
class audio_thread(threading.Thread):
def __init__(self, udp_host, udp_port, pcm_device, two_channels = False, audio_gain = 1.0, dest_stdout = False, **kwds):
threading.Thread.__init__(self, **kwds)
self.setDaemon(True)
self.keep_running = True
self.sock_audio = socket_audio(udp_host, udp_port, pcm_device, two_channels, audio_gain, dest_stdout, **kwds)
self.start()
return
def run(self):
self.sock_audio.run()
def stop(self):
self.sock_audio.stop()