Fixed race of starting thread in constructor. Added "replace" attach method.

Added capability to write .lbc and .au files with headers.


git-svn-id: http://voip.null.ro/svn/yate@1222 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2007-03-19 23:06:06 +00:00
parent 21bb8d8521
commit 4f6688f5b3
1 changed files with 210 additions and 42 deletions

View File

@ -36,12 +36,14 @@ namespace { // anonymous
class WaveSource : public ThreadedSource
{
public:
WaveSource(const String& file, CallEndpoint* chan, bool autoclose = true, bool autorepeat = false);
static WaveSource* create(const String& file, CallEndpoint* chan, bool autoclose = true, bool autorepeat = false);
~WaveSource();
virtual void run();
virtual void cleanup();
void setNotify(const String& id);
private:
WaveSource(const char* file, CallEndpoint* chan, bool autoclose);
void init(const String& file, bool autorepeat);
void detectAuFormat();
void detectWavFormat();
void detectIlbcFormat();
@ -64,14 +66,25 @@ private:
class WaveConsumer : public DataConsumer
{
public:
WaveConsumer(const String& file, CallEndpoint* chan = 0, unsigned maxlen = 0);
enum Header {
None = 0,
Au,
Ilbc,
};
WaveConsumer(const String& file, CallEndpoint* chan, unsigned maxlen, const char* format = 0);
~WaveConsumer();
virtual bool setFormat(const DataFormat& format);
virtual void Consume(const DataBlock& data, unsigned long tStamp);
inline void setNotify(const String& id)
{ m_id = id; }
private:
void writeIlbcHeader() const;
void writeAuHeader();
CallEndpoint* m_chan;
int m_fd;
bool m_swap;
bool m_locked;
Header m_header;
unsigned m_total;
unsigned m_maxlen;
u_int64_t m_time;
@ -81,7 +94,7 @@ private:
class WaveChan : public Channel
{
public:
WaveChan(const String& file, bool record, unsigned maxlen = 0, bool autorepeat = false);
WaveChan(const String& file, bool record, unsigned maxlen, bool autorepeat, const char* format = 0);
~WaveChan();
};
@ -128,12 +141,27 @@ bool s_asyncDelete = true;
INIT_PLUGIN(WaveFileDriver);
WaveSource::WaveSource(const String& file, CallEndpoint* chan, bool autoclose, bool autorepeat)
: m_chan(chan), m_fd(-1), m_swap(false), m_brate(0), m_repeatPos(-1),
m_total(0), m_time(0), m_autoclose(autoclose), m_autoclean(false),
m_nodata(false)
typedef struct {
uint32_t sign;
uint32_t offs;
uint32_t len;
uint32_t form;
uint32_t freq;
uint32_t chan;
} AuHeader;
#define ILBC_HEADER_LEN 9
WaveSource* WaveSource::create(const String& file, CallEndpoint* chan, bool autoclose, bool autorepeat)
{
WaveSource* tmp = new WaveSource(file,chan,autoclose);
tmp->init(file,autorepeat);
return tmp;
}
void WaveSource::init(const String& file, bool autorepeat)
{
Debug(&__plugin,DebugAll,"WaveSource::WaveSource(\"%s\",%p) [%p]",file.c_str(),chan,this);
if (file == "-") {
m_nodata = true;
m_brate = 8000;
@ -178,6 +206,14 @@ WaveSource::WaveSource(const String& file, CallEndpoint* chan, bool autoclose, b
}
}
WaveSource::WaveSource(const char* file, CallEndpoint* chan, bool autoclose)
: m_chan(chan), m_fd(-1), m_swap(false), m_brate(0), m_repeatPos(-1),
m_total(0), m_time(0), m_autoclose(autoclose), m_autoclean(false),
m_nodata(false)
{
Debug(&__plugin,DebugAll,"WaveSource::WaveSource(\"%s\",%p) [%p]",file,chan,this);
}
WaveSource::~WaveSource()
{
Debug(&__plugin,DebugAll,"WaveSource::~WaveSource() [%p] total=%u stamp=%lu",this,m_total,timeStamp());
@ -197,14 +233,7 @@ WaveSource::~WaveSource()
void WaveSource::detectAuFormat()
{
struct {
uint32_t sign;
uint32_t offs;
uint32_t len;
uint32_t form;
uint32_t freq;
uint32_t chan;
} header;
AuHeader header;
if ((::read(m_fd,&header,sizeof(header)) != sizeof(header)) ||
(ntohl(header.sign) != 0x2E736E64)) {
Debug(DebugMild,"Invalid .au file header, assuming raw signed linear");
@ -240,7 +269,6 @@ void WaveSource::detectWavFormat()
Debug(DebugMild,".wav not supported yet, assuming raw signed linear");
}
#define ILBC_HEADER_LEN 9
void WaveSource::detectIlbcFormat()
{
char header[ILBC_HEADER_LEN+1];
@ -363,6 +391,17 @@ void WaveSource::setNotify(const String& id)
bool WaveSource::notify(DataSource* source, const char* reason)
{
if (!m_chan) {
if (m_id) {
DDebug(&__plugin,DebugAll,"WaveSource enqueueing notify message [%p]",this);
Message* m = new Message("chan.notify");
m->addParam("targetid",m_id);
if (reason)
m->addParam("reason",reason);
Engine::enqueue(m);
}
return false;
}
if (m_id || m_autoclose) {
DDebug(&__plugin,DebugInfo,"Preparing '%s' disconnector for '%s' chan '%s' source=%p [%p]",
reason,m_id.c_str(),(m_chan ? m_chan->id().c_str() : ""),source,this);
@ -373,11 +412,16 @@ bool WaveSource::notify(DataSource* source, const char* reason)
}
WaveConsumer::WaveConsumer(const String& file, CallEndpoint* chan, unsigned maxlen)
: m_chan(chan), m_fd(-1), m_total(0), m_maxlen(maxlen), m_time(0)
WaveConsumer::WaveConsumer(const String& file, CallEndpoint* chan, unsigned maxlen, const char* format)
: m_chan(chan), m_fd(-1), m_swap(false), m_locked(false), m_header(None),
m_total(0), m_maxlen(maxlen), m_time(0)
{
Debug(&__plugin,DebugAll,"WaveConsumer::WaveConsumer(\"%s\",%p,%u) [%p]",
file.c_str(),chan,maxlen,this);
Debug(&__plugin,DebugAll,"WaveConsumer::WaveConsumer(\"%s\",%p,%u,\"%s\") [%p]",
file.c_str(),chan,maxlen,format,this);
if (format) {
m_locked = true;
m_format = format;
}
if (file == "-")
return;
else if (file.endsWith(".gsm"))
@ -390,6 +434,10 @@ WaveConsumer::WaveConsumer(const String& file, CallEndpoint* chan, unsigned maxl
m_format = "ilbc20";
else if (file.endsWith(".ilbc30"))
m_format = "ilbc30";
else if (file.endsWith(".lbc"))
m_header=Ilbc;
else if (file.endsWith(".au"))
m_header=Au;
m_fd = ::open(file.safe(),O_WRONLY|O_CREAT|O_TRUNC|O_NOCTTY|O_BINARY,S_IRUSR|S_IWUSR);
if (m_fd < 0)
Debug(DebugWarn,"Creating '%s': error %d: %s",
@ -412,13 +460,94 @@ WaveConsumer::~WaveConsumer()
}
}
void WaveConsumer::writeIlbcHeader() const
{
if (m_format == "ilbc20")
::write(m_fd,"#!iLBC20\n",ILBC_HEADER_LEN);
else if (m_format == "ilbc30")
::write(m_fd,"#!iLBC30\n",ILBC_HEADER_LEN);
else
Debug(DebugMild,"Invalid iLBC format '%s', not writing header",m_format.c_str());
}
void WaveConsumer::writeAuHeader()
{
AuHeader header;
if (m_format == "slin") {
m_swap = true;
header.form = ntohl(3);
}
else if (m_format == "mulaw")
header.form = ntohl(1);
else if (m_format == "alaw")
header.form = ntohl(27);
else {
Debug(DebugMild,"Invalid au format '%s', not writing header",m_format.c_str());
return;
}
header.sign = htonl(0x2E736E64);
header.offs = htonl(sizeof(header));
header.freq = ntohl(8000);
header.chan = ntohl(1);
header.len = 0;
::write(m_fd,&header,sizeof(header));
}
bool WaveConsumer::setFormat(const DataFormat& format)
{
if (m_locked || (format == "slin"))
return false;
bool ok = false;
switch (m_header) {
case Ilbc:
if ((format == "ilbc20") || (format == "ilbc30"))
ok = true;
break;
case Au:
if ((format == "mulaw") || (format == "alaw"))
ok = true;
break;
default:
break;
}
if (ok) {
DDebug(DebugInfo,"WaveConsumer new format '%s'",format.c_str());
m_format = format;
m_locked = true;
return true;
}
return false;
}
void WaveConsumer::Consume(const DataBlock& data, unsigned long tStamp)
{
if (!data.null()) {
if (!m_time)
m_time = Time::now();
if (m_fd >= 0)
::write(m_fd,data.data(),data.length());
if (m_fd >= 0) {
switch (m_header) {
case Ilbc:
writeIlbcHeader();
break;
case Au:
writeAuHeader();
break;
default:
break;
}
m_header = None;
if (m_swap) {
unsigned int n = data.length();
DataBlock swapped(0,n);
const uint16_t* s = (const uint16_t*)data.data();
uint16_t* d = (uint16_t*)swapped.data();
for (unsigned int i = 0; i < n; i+= 2)
*d++ = ntohs(*s++);
::write(m_fd,swapped.data(),swapped.length());
}
else
::write(m_fd,data.data(),data.length());
}
m_total += data.length();
if (m_maxlen && (m_total >= m_maxlen)) {
m_maxlen = 0;
@ -442,7 +571,7 @@ Disconnector::Disconnector(CallEndpoint* chan, const String& id, DataSource* sou
: m_chan(chan), m_msg(0), m_source(source), m_disc(disc)
{
if (id) {
Message *m = new Message("chan.notify");
Message* m = new Message("chan.notify");
if (chan)
m->addParam("id",chan->id());
m->addParam("targetid",id);
@ -495,16 +624,16 @@ void Disconnector::run()
}
WaveChan::WaveChan(const String& file, bool record, unsigned maxlen, bool autorepeat)
WaveChan::WaveChan(const String& file, bool record, unsigned maxlen, bool autorepeat, const char* format)
: Channel(__plugin)
{
Debug(this,DebugAll,"WaveChan::WaveChan(%s) [%p]",(record ? "record" : "play"),this);
if (record) {
setConsumer(new WaveConsumer(file,this,maxlen));
setConsumer(new WaveConsumer(file,this,maxlen,format));
getConsumer()->deref();
}
else {
setSource(new WaveSource(file,this,true,autorepeat));
setSource(WaveSource::create(file,this,true,autorepeat));
getSource()->deref();
}
}
@ -517,7 +646,7 @@ WaveChan::~WaveChan()
bool AttachHandler::received(Message &msg)
{
int more = 3;
int more = 4;
String src(msg.getValue("source"));
if (src.null())
more--;
@ -531,11 +660,11 @@ bool AttachHandler::received(Message &msg)
else {
Debug(DebugWarn,"Could not attach source with method '%s', use 'play'",
src.matchString(1).c_str());
src = "";
src.clear();
}
}
else
src = "";
src.clear();
}
String cons(msg.getValue("consumer"));
@ -551,11 +680,11 @@ bool AttachHandler::received(Message &msg)
else {
Debug(DebugWarn,"Could not attach consumer with method '%s', use 'record'",
cons.matchString(1).c_str());
cons = "";
cons.clear();
}
}
else
cons = "";
cons.clear();
}
String ovr(msg.getValue("override"));
@ -571,14 +700,34 @@ bool AttachHandler::received(Message &msg)
else {
Debug(DebugWarn,"Could not attach override with method '%s', use 'play'",
ovr.matchString(1).c_str());
ovr = "";
ovr.clear();
}
}
else
ovr = "";
ovr.clear();
}
if (src.null() && cons.null() && ovr.null())
String repl(msg.getValue("replace"));
if (repl.null())
more--;
else {
Regexp r("^wave/\\([^/]*\\)/\\(.*\\)$");
if (repl.matches(r)) {
if (repl.matchString(1) == "play") {
repl = repl.matchString(2);
more--;
}
else {
Debug(DebugWarn,"Could not attach replacement with method '%s', use 'play'",
repl.matchString(1).c_str());
repl.clear();
}
}
else
repl.clear();
}
if (src.null() && cons.null() && ovr.null() && repl.null())
return false;
// if single attach was requested we can return true if everything is ok
@ -598,7 +747,7 @@ bool AttachHandler::received(Message &msg)
}
if (!src.null()) {
WaveSource* s = new WaveSource(src,ch,false,msg.getBoolValue("autorepeat"));
WaveSource* s = WaveSource::create(src,ch,false,msg.getBoolValue("autorepeat"));
ch->setSource(s);
s->setNotify(msg.getValue("notify"));
s->deref();
@ -606,7 +755,7 @@ bool AttachHandler::received(Message &msg)
}
if (!cons.null()) {
WaveConsumer* c = new WaveConsumer(cons,ch,maxlen);
WaveConsumer* c = new WaveConsumer(cons,ch,maxlen,msg.getValue("format"));
c->setNotify(msg.getValue("notify"));
ch->setConsumer(c);
c->deref();
@ -620,7 +769,7 @@ bool AttachHandler::received(Message &msg)
ret = false;
break;
}
WaveSource* s = new WaveSource(ovr,0,false,msg.getBoolValue("autorepeat"));
WaveSource* s = WaveSource::create(ovr,0,false,msg.getBoolValue("autorepeat"));
s->setNotify(msg.getValue("notify"));
if (DataTranslator::attachChain(s,c,true))
msg.clearParam("override");
@ -632,6 +781,25 @@ bool AttachHandler::received(Message &msg)
break;
}
while (!repl.null()) {
DataConsumer* c = ch->getConsumer();
if (!c) {
Debug(DebugWarn,"Wave replacement '%s' attach request with no consumer!",repl.c_str());
ret = false;
break;
}
WaveSource* s = WaveSource::create(repl,0,false,msg.getBoolValue("autorepeat"));
s->setNotify(msg.getValue("notify"));
if (DataTranslator::attachChain(s,c,false))
msg.clearParam("replace");
else {
Debug(DebugWarn,"Failed to replacement attach wave '%s' to consumer %p",repl.c_str(),c);
s->deref();
ret = false;
}
break;
}
// Stop dispatching if we handled all requested
return ret && !more;
}
@ -701,14 +869,14 @@ bool RecordHandler::received(Message &msg)
}
if (!c1.null()) {
WaveConsumer* c = new WaveConsumer(c1,ch,maxlen);
WaveConsumer* c = new WaveConsumer(c1,ch,maxlen,msg.getValue("format"));
c->setNotify(msg.getValue("notify"));
de->setCallRecord(c);
c->deref();
}
if (!c2.null()) {
WaveConsumer* c = new WaveConsumer(c2,ch,maxlen);
WaveConsumer* c = new WaveConsumer(c2,ch,maxlen,msg.getValue("format"));
c->setNotify(msg.getValue("notify"));
de->setPeerRecord(c);
c->deref();
@ -740,7 +908,7 @@ bool WaveFileDriver::msgExecute(Message& msg, String& dest)
if (ch) {
Debug(this,DebugInfo,"%s wave file '%s'", (meth ? "Record to" : "Play from"),
dest.matchString(2).c_str());
WaveChan *c = new WaveChan(dest.matchString(2),meth,maxlen,msg.getBoolValue("autorepeat"));
WaveChan *c = new WaveChan(dest.matchString(2),meth,maxlen,msg.getBoolValue("autorepeat"),msg.getValue("format"));
if (ch->connect(c,msg.getValue("reason"))) {
msg.setParam("peerid",c->id());
c->deref();
@ -774,7 +942,7 @@ bool WaveFileDriver::msgExecute(Message& msg, String& dest)
}
m = "call.execute";
m.addParam("callto",callto);
WaveChan *c = new WaveChan(dest.matchString(2),meth,maxlen,msg.getBoolValue("autorepeat"));
WaveChan *c = new WaveChan(dest.matchString(2),meth,maxlen,msg.getBoolValue("autorepeat"),msg.getValue("format"));
m.setParam("id",c->id());
m.userData(c);
if (Engine::dispatch(m)) {