|
|
|
@ -29,6 +29,7 @@ import select |
|
|
|
|
import socket |
|
|
|
|
import errno |
|
|
|
|
import struct |
|
|
|
|
import numpy as np |
|
|
|
|
|
|
|
|
|
# OP25 defaults |
|
|
|
|
PCM_RATE = 8000 # audio sample rate (Hz) |
|
|
|
@ -169,7 +170,7 @@ class alsasound(object): |
|
|
|
|
|
|
|
|
|
self.libasound.snd_pcm_hw_params_current(self.c_pcm, c_pars) |
|
|
|
|
c_bits = self.libasound.snd_pcm_hw_params_get_sbits(c_pars) |
|
|
|
|
self.framesize = self.channels * c_bits/8 |
|
|
|
|
self.framesize = self.channels * c_bits//8 |
|
|
|
|
|
|
|
|
|
c_sw_pars = (c_void_p * int(self.libasound.snd_pcm_sw_params_sizeof() / sizeof(c_void_p)))() |
|
|
|
|
err = self.libasound.snd_pcm_sw_params_current(self.c_pcm, c_sw_pars) |
|
|
|
@ -192,7 +193,7 @@ class alsasound(object): |
|
|
|
|
|
|
|
|
|
def write(self, pcm_data): |
|
|
|
|
datalen = len(pcm_data) |
|
|
|
|
n_frames = c_ulong(datalen / self.framesize) |
|
|
|
|
n_frames = c_ulong(datalen // self.framesize) |
|
|
|
|
c_data = c_char_p(pcm_data) |
|
|
|
|
ret = 0 |
|
|
|
|
|
|
|
|
@ -293,14 +294,14 @@ class socket_audio(threading.Thread): |
|
|
|
|
if in_a is not None: |
|
|
|
|
len_a = len(in_a[0]) |
|
|
|
|
if len_a == 2: |
|
|
|
|
flag_a = ord(in_a[0][0]) + (ord(in_a[0][1]) << 8) # 16 bit little endian |
|
|
|
|
flag_a = np.frombuffer(in_a[0], dtype=np.int16)[0] |
|
|
|
|
elif len_a > 0: |
|
|
|
|
data_a = in_a[0] |
|
|
|
|
|
|
|
|
|
if in_b is not None: |
|
|
|
|
len_b = len(in_b[0]) |
|
|
|
|
if len_b == 2: |
|
|
|
|
flag_b = ord(in_b[0][0]) + (ord(in_b[0][1]) << 8) # 16 bit little endian |
|
|
|
|
flag_b = np.frombuffer(in_b[0], dtype=np.int16)[0] |
|
|
|
|
elif len_b > 0: |
|
|
|
|
data_b = in_b[0] |
|
|
|
|
|
|
|
|
@ -328,31 +329,24 @@ class socket_audio(threading.Thread): |
|
|
|
|
self.close_pcm() |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
def scale(self, data): # crude amplitude scaler (volume) for S16_LE samples |
|
|
|
|
scaled_data = "" |
|
|
|
|
d_len = len(data) / 2 |
|
|
|
|
iter_d = iter(data) |
|
|
|
|
i = 0; |
|
|
|
|
while i < d_len: |
|
|
|
|
i += 1 |
|
|
|
|
pcm_r = struct.unpack('<h', next(iter_d, chr(0)) + next(iter_d, chr(0)))[0] |
|
|
|
|
pcm_s = min(max((int)(pcm_r * self.audio_gain), -32768), 32767) |
|
|
|
|
scaled_data += struct.pack('<h', pcm_s) |
|
|
|
|
return scaled_data |
|
|
|
|
def scale(self, data): |
|
|
|
|
arr = np.array(np.frombuffer(data, dtype=np.int16), dtype=np.float32) |
|
|
|
|
result = np.zeros(len(arr), dtype=np.int16) |
|
|
|
|
arr = np.clip(arr*self.audio_gain, -32767, 32766, out=result) |
|
|
|
|
return result.tobytes('C') |
|
|
|
|
|
|
|
|
|
def interleave(self, data_a, data_b): |
|
|
|
|
combi_data = "" |
|
|
|
|
d_len = max(len(data_a), len(data_b)) |
|
|
|
|
iter_a = iter(data_a) |
|
|
|
|
iter_b = iter(data_b) |
|
|
|
|
i = 0 |
|
|
|
|
while i < d_len: |
|
|
|
|
i += 2 |
|
|
|
|
combi_data += next(iter_a, chr(0)) |
|
|
|
|
combi_data += next(iter_a, chr(0)) |
|
|
|
|
combi_data += next(iter_b, chr(0)) |
|
|
|
|
combi_data += next(iter_b, chr(0)) |
|
|
|
|
return combi_data |
|
|
|
|
arr_a = np.frombuffer(data_a, dtype=np.int16) |
|
|
|
|
arr_b = np.frombuffer(data_b, dtype=np.int16) |
|
|
|
|
d_len = max(len(arr_a), len(arr_b)) |
|
|
|
|
result = np.zeros(d_len*2, dtype=np.int16) |
|
|
|
|
if len(arr_a): |
|
|
|
|
# copy arr_a to result[0,2,4, ...] |
|
|
|
|
result[ range(0, len(arr_a)*2, 2) ] = arr_a |
|
|
|
|
if len(arr_b): |
|
|
|
|
# copy arr_b to result[1,3,5, ...] |
|
|
|
|
result[ range(1, len(arr_b)*2, 2) ] = arr_b |
|
|
|
|
return result.tobytes('C') |
|
|
|
|
|
|
|
|
|
def stop(self): |
|
|
|
|
self.keep_running = False |
|
|
|
|