Video and other media support in SIP, removed PosgreSQL modules.

git-svn-id: http://voip.null.ro/svn/yate@493 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2005-09-06 02:51:09 +00:00
parent 6c96628aa8
commit b6e7a421e9
31 changed files with 732 additions and 1122 deletions

View File

@ -19,3 +19,7 @@ yate.core*
*.orig
*~
.*.swp
*.log
*.out
*.csv
*.tsv

View File

@ -1,9 +0,0 @@
[general]
tcp = yes
host = localhost
port = 5432
database = yate
user = yate
password =
socket =
priority = 50

View File

@ -1,64 +0,0 @@
DROP TABLE route;
DROP TABLE preroute;
DROP TABLE cdr;
DROP SEQUENCE route_routeid_seq;
DROP SEQUENCE preroute_prerouteid_seq;
DROP SEQUENCE cdr_cdrid_seq;
CREATE TABLE route (
routeid bigserial,
context varchar(15),
prefix varchar(50),
tehno varchar(20),
data varchar(100)
);
CREATE TABLE preroute (
prerouteid bigserial,
tehno varchar(20),
channel varchar(20),
caller varchar(30),
called varchar(30),
context varchar(15)
);
CREATE TABLE cdr (
cdrid bigserial,
time TIMESTAMP with time zone NOT NULL DEFAULT now(),
channel varchar(20),
caller varchar(30),
called varchar(30),
billtime INTEGER NOT NULL default '0',
ringtime INTEGER NOT NULL default '0',
duration INTEGER NOT NULL default '0',
status varchar(15)
);
INSERT INTO route (context,prefix,tehno,data) VALUES ('paul','1','SIP','jen');
INSERT INTO route (context,prefix,tehno,data) VALUES ('bell','112','SIP','bell');
INSERT INTO route (context,prefix,tehno,data) VALUES ('gigi','4021','ZAP','1');
INSERT INTO route (context,prefix,tehno,data) VALUES ('whatever','40238','ZAP','g1');
INSERT INTO route (context,prefix,tehno,data) VALUES ('default','1','SIP','jen');
INSERT INTO route (context,prefix,tehno,data) VALUES ('default','2','SIP','bell');
INSERT INTO route (context,prefix,tehno,data) VALUES ('default','3','ZAP','g1');
INSERT INTO route (context,prefix,tehno,data) VALUES ('default','4','ZAP','g2');
INSERT INTO route (context,prefix,tehno,data) VALUES ('default','11','ZAP','g11');
INSERT INTO route (context,prefix,tehno,data) VALUES ('default','12','ZAP','g12');
INSERT INTO route (context,prefix,tehno,data) VALUES ('default','13','ZAP','g13');
INSERT INTO route (context,prefix,tehno,data) VALUES ('default','14','ZAP','g14');
INSERT INTO route (context,prefix,tehno,data) VALUES ('default','15','ZAP','g15');
INSERT INTO route (context,prefix,tehno,data) VALUES ('default','16','ZAP','g16');
INSERT INTO route (context,prefix,tehno,data) VALUES ('default','17','ZAP','g17');
INSERT INTO route (context,prefix,tehno,data) VALUES ('default','18','ZAP','g18');
INSERT INTO route (context,prefix,tehno,data) VALUES ('default','19','ZAP','g19');
INSERT INTO preroute (tehno,channel,caller,called,context) VALUES ('Zap','1','5556','','default');
INSERT INTO preroute (tehno,channel,caller,called,context) VALUES ('Zap','1','1','','default');
INSERT INTO preroute (tehno,channel,caller,called,context) VALUES ('Zap','1','2','','bell');
INSERT INTO preroute (tehno,channel,caller,called,context) VALUES ('Zap','1','12','','gigi');
INSERT INTO cdr (channel,caller,called,billtime,ringtime,duration) VALUES ('Zap','1','12','34','10','45');

View File

@ -1,22 +0,0 @@
DROP TABLE register;
DROP TABLE routepaid;
DROP SEQUENCE register_registerid_seq;
DROP SEQUENCE routepaid_routepaidid_seq;
CREATE TABLE register (
registerid bigserial,
username varchar(20),
password varchar(20),
e164 varchar(30),
credit varchar(20),
context varchar(15)
);
CREATE TABLE routepaid (
routepaidid bigserial,
context varchar(15),
prefix varchar(50),
tehno varchar(20),
data varchar(100),
price INTEGER NOT NULL default '0'
);

View File

@ -3,6 +3,8 @@ AC_INIT(YATE, 0.9.0)
AC_CONFIG_SRCDIR([README])
AC_PREREQ(2.52)
PACKAGE_RELEASE="pre1"
PACKAGE_VERSION_MAJOR="${PACKAGE_VERSION%%.*}"
PACKAGE_VERSION_MINOR="${PACKAGE_VERSION#*.}"
PACKAGE_VERSION_MINOR="${PACKAGE_VERSION_MINOR%.*}"
@ -10,6 +12,7 @@ PACKAGE_VERSION_BUILD="${PACKAGE_VERSION##*.}"
AC_SUBST(PACKAGE_VERSION_MAJOR)
AC_SUBST(PACKAGE_VERSION_MINOR)
AC_SUBST(PACKAGE_VERSION_BUILD)
AC_SUBST(PACKAGE_RELEASE)
# Checks for programs.
AC_PROG_CXX
@ -125,28 +128,6 @@ AC_SUBST(FDSIZE_HACK)
# Checks for optional libraries.
HAVE_PGSQL=no
PGSQL_INC=""
AC_ARG_WITH(libpq,AC_HELP_STRING([--with-libpq=DIR],[use Postgress SQL from DIR (default /usr)]),[ac_cv_use_libpq=$withval],[ac_cv_use_libpq=/usr])
if [[ "x$ac_cv_use_libpq" != "xno" ]]; then
AC_MSG_CHECKING([for Postgress SQL in $ac_cv_use_libpq])
libpq="$ac_cv_use_libpq/lib/libpq.so"
for i in include include/pgsql pgsql/include include/postgresql; do
incpq="$ac_cv_use_libpq/$i"
test -f "$incpq/libpq-fe.h" && break
done
if [[ ! -f "$incpq/libpq-fe.h" ]]; then
incpq="$incpq/pgsql"
fi
if [[ -f "$incpq/libpq-fe.h" -a -f "$libpq" ]]; then
HAVE_PGSQL=yes
PGSQL_INC="-I$incpq"
fi
AC_MSG_RESULT([$HAVE_PGSQL])
fi
AC_SUBST(HAVE_PGSQL)
AC_SUBST(PGSQL_INC)
HAVE_PRI=no
HAVE_PRI_CB=no
AC_ARG_WITH(libpri,AC_HELP_STRING([--with-libpri],[use ISDN PRI if available (default)]),[ac_cv_use_libpri=$withval],[ac_cv_use_libpri=yes])

View File

@ -168,17 +168,23 @@ bool RTPReceiver::decodeEvent(bool marker, unsigned int timestamp, const void* d
int vol = pc[1] & 0x3f;
bool end = (pc[1] & 0x80) != 0;
int duration = ((int)pc[2] << 8) | pc[3];
if (m_evTs) {
if (m_evTs && (m_evNum >= 0)) {
if ((m_evNum != event) && (m_evTs <= timestamp))
pushEvent(m_evNum,timestamp - m_evTs,m_evVol,m_evTs);
}
m_evTs = timestamp;
m_evNum = event;
m_evVol = vol;
if (!end)
if (!end) {
m_evTs = timestamp;
m_evNum = event;
return true;
// FIXME: add code to push the event and also filter dumb senders
return false;
}
if (m_evTs > timestamp)
return false;
// make sure we don't see the same event again
m_evTs = timestamp+1;
m_evNum = -1;
pushEvent(event,duration,vol,timestamp);
return true;
}
bool RTPReceiver::decodeSilence(bool marker, unsigned int timestamp, const void* data, int len)
@ -188,7 +194,7 @@ bool RTPReceiver::decodeSilence(bool marker, unsigned int timestamp, const void*
void RTPReceiver::finishEvent(unsigned int timestamp)
{
if (!m_evTs)
if ((m_evNum < 0) || !m_evTs)
return;
int duration = timestamp - m_evTs;
if (duration < 10000)

View File

@ -39,6 +39,7 @@ static TokenDict sip_responses[] = {
{ "Queued", 182 },
{ "Session Progress", 183 },
{ "OK", 200 },
{ "Accepted", 202 },
{ "Multiple Choices", 300 },
{ "Moved Permanently", 301 },
{ "Moved Temporarily", 302 },
@ -70,6 +71,7 @@ static TokenDict sip_responses[] = {
{ "Busy Here", 486 },
{ "Request Terminated", 487 },
{ "Not Acceptable Here", 488 },
{ "Bad Event", 489 },
{ "Request Pending", 491 },
{ "Undecipherable", 493 },
{ "Server Internal Error", 500 },

View File

@ -276,7 +276,7 @@ SIPMessage::SIPMessage(const char* _method, const char* _uri, const char* _versi
SIPMessage::SIPMessage(SIPParty* ep, const char* buf, int len)
: body(0), m_ep(ep), m_valid(false), m_answer(false), m_outgoing(false), m_ack(false), m_cseq(-1)
{
Debug(DebugInfo,"SIPMessage::SIPMessage(%p,%d) [%p]\n------\n%s------",
DDebug(DebugInfo,"SIPMessage::SIPMessage(%p,%d) [%p]\n------\n%s------",
buf,len,this,buf);
if (m_ep)
m_ep->ref();
@ -740,10 +740,13 @@ const DataBlock& SIPMessage::getBuffer() const
m_data += "Content-Length: 0\r\n\r\n";
if (body)
m_data += body->getBody();
String buf((char*)m_data.data(),m_data.length());
Debug(DebugInfo,"SIPMessage::getBuffer() [%p]\n------\n%s------",
this,buf.c_str());
#ifdef DEBUG
if (debugAt(DebugInfo)) {
String buf((char*)m_data.data(),m_data.length());
Debug(DebugInfo,"SIPMessage::getBuffer() [%p]\n------\n%s------",
this,buf.c_str());
}
#endif
}
return m_data;
}

View File

@ -159,6 +159,27 @@ DataEndpoint* CallEndpoint::setEndpoint(const char* type)
return dat;
}
void CallEndpoint::clearEndpoint(const char* type)
{
if (null(type)) {
ObjList* l = m_data.skipNull();
for (; l; l=l->skipNext()) {
DataEndpoint* e = static_cast<DataEndpoint*>(l->get());
DDebug(DebugAll,"Endpoint at %p type '%s' peer %p",e,e->name().c_str(),e->getPeer());
e->disconnect();
}
m_data.clear();
}
else {
DataEndpoint* dat = getEndpoint(type);
if (dat) {
m_data.remove(dat,false);
dat->disconnect();
dat->destruct();
}
}
}
void CallEndpoint::setSource(DataSource* source, const char* type)
{
DataEndpoint* dat = source ? setEndpoint(type) : getEndpoint(type);

View File

@ -49,18 +49,20 @@ class SimpleTranslator : public DataTranslator
public:
SimpleTranslator(const DataFormat& sFormat, const DataFormat& dFormat)
: DataTranslator(sFormat,dFormat) { }
virtual void Consume(const DataBlock& data, unsigned long timeDelta)
virtual void Consume(const DataBlock& data, unsigned long tStamp)
{
ref();
if (getTransSource()) {
DataBlock oblock;
if (oblock.convert(data, m_format, getTransSource()->getFormat())) {
if (!timeDelta) {
timeDelta = data.length();
if (timeDelta > oblock.length())
timeDelta = oblock.length();
if (!tStamp) {
unsigned int delta = data.length();
if (delta > oblock.length())
delta = oblock.length();
tStamp = m_timestamp + delta;
}
getTransSource()->Forward(oblock, timeDelta);
m_timestamp = tStamp;
getTransSource()->Forward(oblock, tStamp);
}
}
deref();
@ -175,22 +177,23 @@ void* DataConsumer::getObject(const String& name) const
return DataNode::getObject(name);
}
void DataSource::Forward(const DataBlock& data, unsigned long timeDelta)
void DataSource::Forward(const DataBlock& data, unsigned long tStamp)
{
// no number of samples provided - try to guess
if (!timeDelta) {
Lock lock(m_mutex);
// no timestamp provided - try to guess
if (!tStamp) {
tStamp = m_timestamp;
const FormatInfo* f = m_format.getInfo();
if (f)
timeDelta = f->guessSamples(data.length());
tStamp += f->guessSamples(data.length());
}
Lock lock(m_mutex);
m_timestamp = tStamp;
ref();
ObjList *l = m_consumers.skipNull();
for (; l; l=l->skipNext()) {
DataConsumer *c = static_cast<DataConsumer *>(l->get());
c->Consume(data,timeDelta);
c->Consume(data,m_timestamp);
}
m_timestamp += timeDelta;
deref();
}

View File

@ -22,15 +22,11 @@ MKDEPS := ../config.status
PROGS := cdrbuild.yate cdrfile.yate \
regexroute.yate regfile.yate accfile.yate \
tonegen.yate wavefile.yate conference.yate moh.yate \
callgen.yate rmanager.yate \
callgen.yate rmanager.yate pbx.yate \
extmodule.yate osschan.yate \
ysipchan.yate yrtpchan.yate
LIBS :=
ifneq (@HAVE_PGSQL@,no)
PROGS := $(PROGS) pgsqlroute.yate cdrpgsql.yate register.yate
endif
ifeq (@HAVE_PRI@_@HAVE_ZAP@,yes_yes)
PROGS := $(PROGS) zapchan.yate
endif
@ -133,8 +129,6 @@ wpchan.yate: LOCALFLAGS = libypri.o -lpri
h323chan.yate: LOCALFLAGS = -DPHAS_TEMPLATES -D_REENTRANT -DP_HAS_SEMAPHORES @H323_INC@ @H323_LIB@
pgsqlroute.yate cdrpgsql.yate register.yate: LOCALFLAGS = @PGSQL_INC@ -lpq
iaxchan.yate: @IAX2_DEP@
iaxchan.yate: LOCALLIBS = @IAX2_DEP@
iaxchan.yate: LOCALFLAGS = @IAX2_INC@ @IAX2_LIB@

View File

@ -1,150 +0,0 @@
/**
* cdrpgsql.cpp
* This file is part of the YATE Project http://YATE.null.ro
*
* Write the CDR to a PostgreSQL database
*
* Yet Another Telephony Engine - a fully featured software PBX and IVR
* Copyright (C) 2004, 2005 Null Team
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
#include <yatengine.h>
#include <stdio.h>
#include <libpq-fe.h>
using namespace TelEngine;
static PGconn *conn=0;
Mutex dbmutex;
class CdrPgsqlHandler : public MessageHandler
{
public:
CdrPgsqlHandler(const char *name)
: MessageHandler(name) { }
virtual bool received(Message &msg);
private:
};
bool CdrPgsqlHandler::received(Message &msg)
{
String op(msg.getValue("operation"));
if (op != "finalize")
return false;
// const char *calltime = c_safe(msg.getValue("time"));
const char *channel = c_safe(msg.getValue("channel"));
const char *called = c_safe(msg.getValue("called"));
const char *caller = c_safe(msg.getValue("caller"));
const char *billtime = c_safe(msg.getValue("billtime"));
const char *ringtime = c_safe(msg.getValue("ringtime"));
const char *duration = c_safe(msg.getValue("duration"));
const char *status = c_safe(msg.getValue("status"));
Lock lock(dbmutex);
if (!conn)
return false;
char buffer[2048];
snprintf(buffer,sizeof(buffer),"INSERT INTO cdr"
" (channel,caller,called,billtime,ringtime,duration,status)"
" VALUES ('%s','%s','%s','%s','%s','%s','%s')",
channel,caller,called,billtime,ringtime,duration,status);
PGresult *respgsql = PQexec(conn,buffer);
if (!respgsql || PQresultStatus(respgsql) != PGRES_COMMAND_OK)
Debug(DebugGoOn,"Failed to insert in database: %s",
PQerrorMessage(conn));
return false;
};
class StatusHandler : public MessageHandler
{
public:
StatusHandler(const char *name, unsigned prio = 1)
: MessageHandler(name,prio) { }
virtual bool received(Message &msg);
};
bool StatusHandler::received(Message &msg)
{
msg.retValue() << "name=cdrpgsql,type=misc;conn=" << (conn != 0) <<"\n";
return false;
}
class CdrPgsqlPlugin : public Plugin
{
public:
CdrPgsqlPlugin();
~CdrPgsqlPlugin();
virtual void initialize();
private:
CdrPgsqlHandler *m_handler;
};
CdrPgsqlPlugin::CdrPgsqlPlugin()
: m_handler(0)
{
Output("Loaded module CdrPgsql");
}
CdrPgsqlPlugin::~CdrPgsqlPlugin()
{
Output("Unloading module CdrPgsql");
if (conn) {
PQfinish(conn);
conn = 0;
}
}
void CdrPgsqlPlugin::initialize()
{
char *pgoptions=NULL,
*pgtty=NULL;
Output("Initializing module Cdr for PostgreSQL");
Configuration cfg(Engine::configFile("cdrpgsql"));
const char *pghost = c_safe(cfg.getValue("general","host","localhost"));
const char *pgport = c_safe(cfg.getValue("general","port","5432"));
const char *dbName = c_safe(cfg.getValue("general","database","yate"));
const char *dbUser = c_safe(cfg.getValue("general","user","postgres"));
const char *dbPass = c_safe(cfg.getValue("general","password"));
Lock lock(dbmutex);
if (conn)
PQfinish(conn);
conn = PQsetdbLogin(pghost,pgport,pgoptions,pgtty,dbName,dbUser,dbPass);
if (PQstatus(conn) == CONNECTION_BAD) {
Debug(DebugGoOn, "Connection to database '%s' failed.", dbName);
Debug(DebugGoOn, "%s", PQerrorMessage(conn));
PQfinish(conn);
conn = 0;
return;
}
if (!m_handler) {
Output("Installing Cdr for PostgreSQL handler");
m_handler = new CdrPgsqlHandler("call.cdr");
Engine::install(m_handler);
Engine::install(new StatusHandler("engine.status"));
}
}
INIT_PLUGIN(CdrPgsqlPlugin);
/* vi: set ts=8 sw=4 sts=4 noet: */

View File

@ -64,7 +64,7 @@ class DSoundConsumer : public DataConsumer
public:
DSoundConsumer();
~DSoundConsumer();
virtual void Consume(const DataBlock &data, unsigned long timeDelta);
virtual void Consume(const DataBlock &data, unsigned long tStamp);
private:
DSoundPlay* m_dsound;
};
@ -485,7 +485,7 @@ DSoundConsumer::~DSoundConsumer()
m_dsound->terminate();
}
void DSoundConsumer::Consume(const DataBlock &data, unsigned long timeDelta)
void DSoundConsumer::Consume(const DataBlock &data, unsigned long tStamp)
{
if (m_dsound)
m_dsound->put(data);
@ -563,7 +563,7 @@ bool SoundDriver::msgExecute(Message& msg, String& dest)
CallEndpoint* ch = static_cast<CallEndpoint*>(msg.userData());
if (ch) {
DSoundChan *ds = new DSoundChan;
if (ch->connect(ds)) {
if (ch->connect(ds,msg.getValue("reason"))) {
msg.setParam("peerid",ds->id());
ds->deref();
}

View File

@ -56,7 +56,7 @@ class GsmCodec : public DataTranslator
public:
GsmCodec(const char* sFormat, const char* dFormat, bool encoding);
~GsmCodec();
virtual void Consume(const DataBlock& data, unsigned long timeDelta);
virtual void Consume(const DataBlock& data, unsigned long tStamp);
private:
bool m_encoding;
gsm m_gsm;
@ -83,7 +83,7 @@ GsmCodec::~GsmCodec()
}
}
void GsmCodec::Consume(const DataBlock& data, unsigned long timeDelta)
void GsmCodec::Consume(const DataBlock& data, unsigned long tStamp)
{
if (!(m_gsm && getTransSource()))
return;
@ -101,7 +101,8 @@ void GsmCodec::Consume(const DataBlock& data, unsigned long timeDelta)
(gsm_signal*)(((gsm_block *)m_data.data())+i),
(gsm_byte*)(((gsm_frame *)outdata.data())+i));
}
timeDelta = consumed / 2;
if (!tStamp)
tStamp = timeStamp() + (consumed / 2);
}
else {
frames = m_data.length() / sizeof(gsm_frame);
@ -113,13 +114,14 @@ void GsmCodec::Consume(const DataBlock& data, unsigned long timeDelta)
(gsm_byte*)(((gsm_frame *)m_data.data())+i),
(gsm_signal*)(((gsm_block *)outdata.data())+i));
}
timeDelta = frames*sizeof(gsm_block) / 2;
if (!tStamp)
tStamp = timeStamp() + (frames*sizeof(gsm_block) / 2);
}
XDebug("GsmCodec",DebugAll,"%scoding %d frames of %d input bytes (consumed %d) in %d output bytes",
m_encoding ? "en" : "de",frames,m_data.length(),consumed,outdata.length());
if (frames) {
m_data.cut(-consumed);
getTransSource()->Forward(outdata,timeDelta);
getTransSource()->Forward(outdata,tStamp);
}
deref();
}

View File

@ -73,7 +73,7 @@ public:
IAXSource(const char *frm) : DataSource(frm),m_total(0),m_time(Time::now())
{ Debug(DebugInfo,"IAXSource::IAXSource [%p] frm %s",this,frm);};
~IAXSource();
void Forward(const DataBlock &data, unsigned long timeDelta = 0);
void Forward(const DataBlock &data, unsigned long tStamp = 0);
private:
unsigned m_total;
u_int64_t m_time;
@ -87,7 +87,7 @@ public:
~IAXAudioConsumer();
virtual void Consume(const DataBlock &data, unsigned long timeDelta);
virtual void Consume(const DataBlock &data, unsigned long tStamp);
private:
IAXConnection *m_conn;
@ -769,10 +769,10 @@ IAXSource::~IAXSource()
}
void IAXSource::Forward(const DataBlock &data, unsigned long timeDelta)
void IAXSource::Forward(const DataBlock &data, unsigned long tStamp)
{
m_total += data.length();
DataSource::Forward(data, timeDelta);
DataSource::Forward(data, tStamp);
}
IAXAudioConsumer::IAXAudioConsumer(IAXConnection *conn, int ast_format, const char *format)
@ -795,7 +795,7 @@ IAXAudioConsumer::~IAXAudioConsumer()
}
void IAXAudioConsumer::Consume(const DataBlock &data, unsigned long timeDelta)
void IAXAudioConsumer::Consume(const DataBlock &data, unsigned long tStamp)
{
m_total += data.length();
if (m_conn)

View File

@ -66,7 +66,7 @@ public:
OssConsumer(OssDevice* dev);
~OssConsumer();
bool init();
virtual void Consume(const DataBlock &data, unsigned long timeDelta);
virtual void Consume(const DataBlock &data, unsigned long tStamp);
private:
OssDevice* m_device;
unsigned m_total;
@ -269,7 +269,7 @@ OssConsumer::~OssConsumer()
m_device->deref();
}
void OssConsumer::Consume(const DataBlock &data, unsigned long timeDelta)
void OssConsumer::Consume(const DataBlock &data, unsigned long tStamp)
{
if (m_device->closed() || data.null())
return;

95
modules/pbx.cpp Normal file
View File

@ -0,0 +1,95 @@
/**
* pbx.cpp
* This file is part of the YATE Project http://YATE.null.ro
*
* Basic PBX message handlers
*
* Yet Another Telephony Engine - a fully featured software PBX and IVR
* Copyright (C) 2004, 2005 Null Team
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
#include <yatephone.h>
using namespace TelEngine;
class ConnHandler : public MessageHandler
{
public:
ConnHandler()
: MessageHandler("chan.connect",90)
{ }
virtual bool received(Message &msg);
};
class PbxPlugin : public Plugin
{
public:
PbxPlugin();
virtual ~PbxPlugin();
virtual void initialize();
bool m_first;
};
// Utility function to get a pointer to a call endpoint (or its peer) by id
static CallEndpoint* locateChan(const String& id, bool peer = false)
{
if (id.null())
return 0;
Message m("chan.locate");
m.addParam("id",id);
if (!Engine::dispatch(m))
return 0;
CallEndpoint* ce = static_cast<CallEndpoint*>(m.userObject("CallEndpoint"));
if (!ce)
return 0;
return peer ? ce->getPeer() : ce;
}
bool ConnHandler::received(Message &msg)
{
RefPointer<CallEndpoint> c1(locateChan(msg.getValue("id"),msg.getBoolValue("id_peer")));
RefPointer<CallEndpoint> c2(locateChan(msg.getValue("targetid"),msg.getBoolValue("targetid_peer")));
if (!(c1 && c2))
return false;
return c1->connect(c2,msg.getValue("reason"));
}
PbxPlugin::PbxPlugin()
: Plugin("PBX"), m_first(true)
{
Output("Loaded module PBX");
}
PbxPlugin::~PbxPlugin()
{
Output("Unloading module PBX");
}
void PbxPlugin::initialize()
{
if (m_first) {
m_first = false;
Engine::install(new ConnHandler);
}
}
INIT_PLUGIN(PbxPlugin);
/* vi: set ts=8 sw=4 sts=4 noet: */

View File

@ -1,233 +0,0 @@
/**
* pgsqlroute.cpp
* This file is part of the YATE Project http://YATE.null.ro
*
* Postgres SQL based routing
*
* Yet Another Telephony Engine - a fully featured software PBX and IVR
* Copyright (C) 2004, 2005 Null Team
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
#include <yatephone.h>
#include <libpq-fe.h>
#include <string.h>
using namespace TelEngine;
static PGconn *conn=0;
static Mutex dbmutex;
static unsigned s_route_rq = 0;
static unsigned s_route_err = 0;
static unsigned s_route_yes = 0;
static unsigned s_route_no = 0;
class RouteHandler : public MessageHandler
{
public:
RouteHandler(const char *name, unsigned prio = 1)
: MessageHandler(name,prio) { }
virtual bool received(Message &msg);
};
bool RouteHandler::received(Message &msg)
{
char buffer[2048];
u_int64_t tmr = Time::now();
String called(msg.getValue("called"));
if (called.null())
return false;
Lock lock(dbmutex);
if (!conn)
return false;
s_route_rq++;
const char *context = c_safe(msg.getValue("context","default"));
snprintf(buffer,sizeof(buffer),"SELECT tehno,data,length (prefix) as lll"
" from route where prefix= substring('%s',1,length(prefix))"
" and context='%s' order by lll desc LIMIT 1",called.c_str(),context);
PGresult *respgsql = PQexec(conn,buffer);
if (!respgsql || PQresultStatus(respgsql) != PGRES_TUPLES_OK)
{
Debug(DebugWarn,"Failed to query from database: %s",
PQerrorMessage(conn));
s_route_err++;
return false;
}
if (PQntuples(respgsql) == 0) {
Debug(DebugAll,"No route.");
s_route_no++;
return false;
}
msg.retValue() = String(PQgetvalue(respgsql,0,0))+"/" + String(PQgetvalue(respgsql,0,1));
Debug(DebugInfo,"Routing call to '%s' in context '%s' using '%s' tehnology and data in " FMT64 " usec",
called.c_str(),context,msg.retValue().c_str(),Time::now()-tmr);
s_route_yes++;
return true;
};
class PrerouteHandler : public MessageHandler
{
public:
PrerouteHandler(const char *name, unsigned prio = 1)
: MessageHandler(name,prio) { }
virtual bool received(Message &msg);
};
bool PrerouteHandler::received(Message &msg)
{
char buffer[2048];
// char select_called[200];
// char select_channel[200];
u_int64_t tmr = Time::now();
// return immediately if there is already a context
if (msg.getValue("context"))
return false;
String caller(msg.getValue("caller"));
if (caller.null())
return false;
Lock lock(dbmutex);
if (!conn)
return false;
String called(msg.getValue("called"));
if (!caller.null())
// snprintf(select_called,sizeof(select_called),"and called='%s'",called.c_str());
snprintf(buffer,sizeof(buffer),"SELECT context,length (caller) as lll from preroute where caller= substring('%s',1,length(caller)) order by lll desc limit 1;",caller.c_str());
PGresult *respgsql = PQexec(conn,buffer);
if (!respgsql || PQresultStatus(respgsql) != PGRES_TUPLES_OK)
{
Debug(DebugWarn,"Failed to query from database: %s",
PQerrorMessage(conn));
return false;
}
if (PQntuples(respgsql) == 0) {
Debug(DebugAll,"No preroute.");
return false;
}
msg.addParam("context",PQgetvalue(respgsql,0,0));
Debug(DebugInfo,"Classifying caller '%s' in context '%s' in " FMT64 " usec",
caller.c_str(),msg.getValue("context"),Time::now()-tmr);
return true;
#if 0
NamedList *l = s_cfg.getSection("contexts");
if (l) {
unsigned int len = l->length();
for (unsigned int i=0; i<len; i++) {
NamedString *n = l->getParam(i);
if (n) {
Regexp r(n->name());
if (s.matches(r)) {
msg.addParam("context",s.replaceMatches(*n));
Debug(DebugInfo,"Classifying caller '%s' in context '%s' by rule #%u '%s' in " FMT64 " usec",
s.c_str(),msg.getValue("context"),i+1,r.c_str(),Time::now()-tmr);
return true;
}
}
}
}
Debug(DebugInfo,"Could not classify call from '%s', wasted " FMT64 " usec",
s.c_str(),Time::now()-tmr);
return false;
#endif
};
class StatusHandler : public MessageHandler
{
public:
StatusHandler(const char *name, unsigned prio = 1)
: MessageHandler(name,prio) { }
virtual bool received(Message &msg);
};
bool StatusHandler::received(Message &msg)
{
const char *sel = msg.getValue("module");
if (sel && ::strcmp(sel,"pgsqlroute"))
return false;
msg.retValue() << "name=pgsqlroute,type=misc";
msg.retValue() << ";conn=" << (conn != 0);
msg.retValue() << ",total=" << s_route_rq << ",errors=" << s_route_err;
msg.retValue() << ",routed=" << s_route_yes << ",noroute=" << s_route_no;
msg.retValue() << "\n";
return false;
}
class PGSQLRoutePlugin : public Plugin
{
public:
PGSQLRoutePlugin();
~PGSQLRoutePlugin();
virtual void initialize();
private:
bool m_first;
};
PGSQLRoutePlugin::PGSQLRoutePlugin()
: m_first(true)
{
Output("Loaded module PGSQLRoute");
}
PGSQLRoutePlugin::~PGSQLRoutePlugin()
{
if (conn) {
PQfinish(conn);
conn = 0;
}
}
void PGSQLRoutePlugin::initialize()
{
char *pgoptions=NULL,
*pgtty=NULL;
Output("Initializing module PGSQLRoute");
Configuration cfg(Engine::configFile("pgsqlroute"));
const char *pghost = c_safe(cfg.getValue("general","host","localhost"));
const char *pgport = c_safe(cfg.getValue("general","port","5432"));
const char *dbName = c_safe(cfg.getValue("general","database","yate"));
const char *dbUser = c_safe(cfg.getValue("general","user","postgres"));
const char *dbPass = c_safe(cfg.getValue("general","password"));
Lock lock(dbmutex);
if (conn)
PQfinish(conn);
conn = PQsetdbLogin(pghost,pgport,pgoptions,pgtty,dbName,dbUser,dbPass);
if (PQstatus(conn) == CONNECTION_BAD) {
Debug(DebugWarn, "Connection to database '%s' failed.", dbName);
Debug(DebugWarn, "%s", PQerrorMessage(conn));
PQfinish(conn);
conn = 0;
return;
}
// don't bother to install handlers until we are connected
if (m_first && conn) {
m_first = false;
unsigned prio = cfg.getIntValue("general","priority",100);
Engine::install(new PrerouteHandler("call.preroute",prio));
Engine::install(new RouteHandler("call.route",prio));
Engine::install(new StatusHandler("engine.status"));
}
}
INIT_PLUGIN(PGSQLRoutePlugin);
/* vi: set ts=8 sw=4 sts=4 noet: */

View File

@ -1,314 +0,0 @@
/**
* register.cpp
* This file is part of the YATE Project http://YATE.null.ro
*
* Ask for a registration from this module.
*
* Yet Another Telephony Engine - a fully featured software PBX and IVR
* Copyright (C) 2004, 2005 Null Team
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
#include <yatengine.h>
#include <stdio.h>
#include <libpq-fe.h>
using namespace TelEngine;
static PGconn *conn=0;
Mutex dbmutex;
static unsigned s_route_rq = 0;
static unsigned s_route_err = 0;
static unsigned s_route_yes = 0;
static unsigned s_route_no = 0;
class AuthHandler : public MessageHandler
{
public:
AuthHandler(const char *name)
: MessageHandler(name) { }
virtual bool received(Message &msg);
};
class RegistHandler : public MessageHandler
{
public:
RegistHandler(const char *name)
: MessageHandler(name),m_init(false) { }
virtual bool received(Message &msg);
bool init();
bool m_init;
};
class UnRegistHandler : public MessageHandler
{
public:
UnRegistHandler(const char *name)
: MessageHandler(name) { }
virtual bool received(Message &msg);
};
class RouteHandler : public MessageHandler
{
public:
RouteHandler(const char *name)
: MessageHandler(name) { }
virtual bool received(Message &msg);
};
class StatusHandler : public MessageHandler
{
public:
StatusHandler(const char *name, unsigned prio = 1)
: MessageHandler(name,prio) { }
virtual bool received(Message &msg);
};
/**
* I can't remeber why i have made this class :)
class RegistThread : public Thread
{
public:
RegistThread();
~RegistThread();
void run(void);
};
*/
class RegistPlugin : public Plugin
{
public:
RegistPlugin();
~RegistPlugin();
virtual void initialize();
private:
AuthHandler *m_authhandler;
RegistHandler *m_registhandler;
UnRegistHandler *m_unregisthandler;
RouteHandler *m_routehandler;
StatusHandler *m_statushandler;
};
bool AuthHandler::received(Message &msg)
{
// const char *calltime = c_safe(msg.getValue("time"));
String username = c_safe(msg.getValue("username"));
Lock lock(dbmutex);
if (!conn)
return false;
String s = "SELECT password FROM register WHERE username='" + username + "'";
PGresult *respgsql = PQexec(conn,(const char *)s);
if (!respgsql || PQresultStatus(respgsql) != PGRES_TUPLES_OK)
{
Debug(DebugWarn,"Failed to query from database: %s",
PQerrorMessage(conn));
return false;
}
if (PQntuples(respgsql) == 0) {
Debug(DebugAll,"No user.");
return false;
}
msg.retValue() << PQgetvalue(respgsql,0,0);
return true;
};
bool RegistHandler::init()
{
/**
* We must clear the routing table when loading the new table, to not
* leave any garbage there
*/
String s = "DELETE FROM routepaid";
PGresult *respgsql = PQexec(conn,(const char *)s);
if (PQresultStatus(respgsql) != PGRES_COMMAND_OK)
{
Debug(DebugWarn,"Failed to clear the routepaid table: %s",
PQerrorMessage(conn));
return false;
}
return true;
}
bool RegistHandler::received(Message &msg)
{
if (!m_init)
{
init();
m_init= true;
}
String username = c_safe(msg.getValue("username"));
String techno = c_safe(msg.getValue("driver"));
String data = c_safe(msg.getValue("data"));
Lock lock(dbmutex);
if (!conn)
return false;
String c = "SELECT credit,price,e164,context FROM register WHERE username='" + username + "'";
PGresult *respgsql = PQexec(conn,(const char *)c);
if (!respgsql || PQresultStatus(respgsql) != PGRES_TUPLES_OK)
{
Debug(DebugWarn,"Failed to query from database: %s",
PQerrorMessage(conn));
return false;
}
if (PQntuples(respgsql) == 0) {
Debug(DebugAll,"No credit.");
return false;
}
String price = PQgetvalue(respgsql,0,1);
String prefix = PQgetvalue(respgsql,0,2);
String context = PQgetvalue(respgsql,0,3);
if (price.null())
price = 0;
if (context.null())
context = "default";
c = "INSERT INTO routepaid (context,prefix,tehno,data,price,username) VALUES ('" + context + "','" + prefix + "','" + techno + "','" + data + "'," + price +",'" + username + "')";
PGresult *respgsql1 = PQexec(conn,(const char *)c);
if (!respgsql1 || PQresultStatus(respgsql1) != PGRES_COMMAND_OK)
Debug(DebugWarn,"Failed to insert in database: %s",
PQerrorMessage(conn));
msg.retValue() = prefix;
return true;
};
bool UnRegistHandler::received(Message &msg)
{
String username = c_safe(msg.getValue("username"));
Lock lock(dbmutex);
if (!conn)
return false;
String s = "DELETE from routepaid WHERE username='" + username + "'";
PGresult *respgsql = PQexec(conn,(const char *)s);
if (PQresultStatus(respgsql) != PGRES_COMMAND_OK)
{
Debug(DebugWarn,"Failed to query from database: %s",
PQerrorMessage(conn));
return false;
}
return true;
};
bool RouteHandler::received(Message &msg)
{
u_int64_t tmr = Time::now();
String called(msg.getValue("called"));
if (called.null())
return false;
Lock lock(dbmutex);
if (!conn)
return false;
s_route_rq++;
String context = c_safe(msg.getValue("context","default"));
String s = "SELECT tehno,data,length (prefix) as lll,price"
" from routepaid where prefix= substring('" + called + "',1,length(prefix))"
" and context='" + context + "' order by lll desc LIMIT 1";
Debug(DebugInfo,"%s",s.c_str());
PGresult *respgsql = PQexec(conn,(const char *)s);
if (!respgsql || PQresultStatus(respgsql) != PGRES_TUPLES_OK)
{
Debug(DebugWarn,"Failed to query from database: %s",
PQerrorMessage(conn));
s_route_err++;
return false;
}
if (PQntuples(respgsql) == 0) {
Debug(DebugAll,"No route.");
s_route_no++;
return false;
}
msg.setParam("driver",PQgetvalue(respgsql,0,0));
msg.retValue() = PQgetvalue(respgsql,0,1);
Debug(DebugInfo,"Routing call to '%s' in context '%s' using '%s' tehnology and data in " FMT64 " usec",
called.c_str(),context.c_str(),msg.retValue().c_str(),Time::now()-tmr);
s_route_yes++;
return true;
};
bool StatusHandler::received(Message &msg)
{
msg.retValue() << "name=register,type=misc;conn=" << (conn != 0) <<"\n";
return false;
}
RegistPlugin::RegistPlugin()
: m_authhandler(0),m_registhandler(0),m_routehandler(0),m_statushandler(0)
{
Output("Loaded module Registration");
}
RegistPlugin::~RegistPlugin()
{
if (conn) {
PQfinish(conn);
conn = 0;
}
}
void RegistPlugin::initialize()
{
char *pgoptions=NULL, *pgtty=NULL;
Output("Initializing module Register for PostgreSQL");
Configuration cfg(Engine::configFile("register"));
const char *pghost = c_safe(cfg.getValue("general","host","localhost"));
const char *pgport = c_safe(cfg.getValue("general","port","5432"));
const char *dbName = c_safe(cfg.getValue("general","database","yate"));
const char *dbUser = c_safe(cfg.getValue("general","user","postgres"));
const char *dbPass = c_safe(cfg.getValue("general","password"));
Lock lock(dbmutex);
if (conn)
PQfinish(conn);
conn = PQsetdbLogin(pghost,pgport,pgoptions,pgtty,dbName,dbUser,dbPass);
if (PQstatus(conn) == CONNECTION_BAD) {
Debug(DebugWarn, "Connection to database '%s' failed.", dbName);
Debug(DebugWarn, "%s", PQerrorMessage(conn));
PQfinish(conn);
conn = 0;
return;
}
if (!m_registhandler) {
Output("Installing Registering handler");
Engine::install(m_registhandler = new RegistHandler("user.register"));
}
if (!m_unregisthandler) {
Output("Installing UnRegistering handler");
Engine::install(m_unregisthandler = new UnRegistHandler("user.unregister"));
}
if (!m_authhandler) {
Output("Installing Authentification handler");
Engine::install(m_authhandler = new AuthHandler("user.auth"));
}
if (!m_routehandler) {
Output("Installing Route handler");
Engine::install(m_routehandler = new RouteHandler("call.route"));
}
if (!m_statushandler) {
Output("Installing Status handler");
Engine::install(m_statushandler = new StatusHandler("engine.status"));
}
}
INIT_PLUGIN(RegistPlugin);
/* vi: set ts=8 sw=4 sts=4 noet: */

View File

@ -154,16 +154,6 @@ vbox=>
button=1,,75,30,acc_edit,Modify
leave=<hbox
leave=<vbox
tabname=Test
vbox=>
radio=,,,,r1,Radio 1
radio=,,,,r2,
newradio=
radio=,,,,r3,Radio 3
radio=,,,,r4,Radio 4
check=,,,,c1,Check 1
check=,,,,c2,
leave=<vbox
leave=<tabbed
leave=<vbox

View File

@ -211,7 +211,7 @@ void ToneSource::run()
XDebug(&__plugin,DebugAll,"ToneSource sleeping for " FMT64 " usec",dly);
Thread::usleep((unsigned long)dly);
}
Forward(m_data,m_data.length()/2);
Forward(m_data,m_total/2);
m_total += m_data.length();
tpos += (m_data.length()*(u_int64_t)1000000/m_brate);
};

View File

@ -60,7 +60,7 @@ class WaveConsumer : public DataConsumer
public:
WaveConsumer(const String& file, CallEndpoint* chan = 0, unsigned maxlen = 0);
~WaveConsumer();
virtual void Consume(const DataBlock& data, unsigned long timeDelta);
virtual void Consume(const DataBlock& data, unsigned long tStamp);
inline void setNotify(const String& id)
{ m_id = id; }
private:
@ -219,6 +219,7 @@ void WaveSource::detectWavFormat()
void WaveSource::run()
{
unsigned long ts = 0;
int r = 0;
// wait until at least one consumer is attached
while (!r) {
@ -254,7 +255,8 @@ void WaveSource::run()
XDebug(&__plugin,DebugAll,"WaveSource sleeping for " FMT64 " usec",dly);
Thread::usleep((unsigned long)dly);
}
Forward(m_data,m_data.length()*8000/m_brate);
Forward(m_data,ts);
ts += m_data.length()*8000/m_brate;
m_total += r;
tpos += (r*(u_int64_t)1000000/m_brate);
} while (r > 0);
@ -310,7 +312,7 @@ WaveConsumer::~WaveConsumer()
}
}
void WaveConsumer::Consume(const DataBlock& data, unsigned long timeDelta)
void WaveConsumer::Consume(const DataBlock& data, unsigned long tStamp)
{
if (!data.null()) {
if (!m_time)
@ -318,7 +320,7 @@ void WaveConsumer::Consume(const DataBlock& data, unsigned long timeDelta)
if (m_fd >= 0)
::write(m_fd,data.data(),data.length());
m_total += data.length();
m_timestamp += timeDelta;
m_timestamp = tStamp;
if (m_maxlen && (m_total >= m_maxlen)) {
m_maxlen = 0;
if (m_fd >= 0) {

View File

@ -83,7 +83,7 @@ public:
WpConsumer(WpChan *owner, const char* format, unsigned int bufsize);
~WpConsumer();
virtual void Consume(const DataBlock &data, unsigned long timeDelta);
virtual void Consume(const DataBlock &data, unsigned long tStamp);
};
class WpChan : public PriChan
@ -306,7 +306,7 @@ WpConsumer::~WpConsumer()
static_cast<WpChan*>(m_owner)->m_wp_c = 0;
}
void WpConsumer::Consume(const DataBlock &data, unsigned long timeDelta)
void WpConsumer::Consume(const DataBlock &data, unsigned long tStamp)
{
const unsigned char* buf = (const unsigned char*)data.data();
for (unsigned int i = 0; i < data.length(); i++)

View File

@ -92,7 +92,7 @@ public:
WpConsumer(WpChan *owner, const char* format, unsigned int bufsize);
~WpConsumer();
virtual void Consume(const DataBlock &data, unsigned long timeDelta);
virtual void Consume(const DataBlock &data, unsigned long tStamp);
};
class WpChan : public PriChan
@ -421,7 +421,7 @@ WpConsumer::~WpConsumer()
static_cast<WpChan*>(m_owner)->m_wp_c = 0;
}
void WpConsumer::Consume(const DataBlock &data, unsigned long timeDelta)
void WpConsumer::Consume(const DataBlock &data, unsigned long tStamp)
{
const unsigned char* buf = (const unsigned char*)data.data();
for (unsigned int i = 0; i < data.length(); i++)

View File

@ -87,6 +87,8 @@ public:
{ return m_port; }
inline void setMaster(const char* master)
{ if (master) m_master = master; }
inline bool isAudio() const
{ return m_audio; }
void addDirection(RTPSession::Direction direction);
static YRTPWrapper* find(const CallEndpoint* conn, const String& media);
static YRTPWrapper* find(const String& id);
@ -102,6 +104,7 @@ private:
String m_master;
unsigned int m_bufsize;
unsigned int m_port;
bool m_audio;
};
class YRTPSession : public RTPSession
@ -139,12 +142,9 @@ class YRTPConsumer : public DataConsumer
public:
YRTPConsumer(YRTPWrapper* wrap);
~YRTPConsumer();
virtual void Consume(const DataBlock &data, unsigned long timeDelta);
inline int timestamp() const
{ return m_timestamp; }
virtual void Consume(const DataBlock &data, unsigned long tStamp);
private:
YRTPWrapper* m_wrap;
int m_timestamp;
};
class AttachHandler : public MessageHandler
@ -192,6 +192,7 @@ YRTPWrapper::YRTPWrapper(const char* localip, CallEndpoint* conn, const char* me
localip,conn,media,lookup(direction,dict_yrtp_dir),this);
m_id = "yrtp/";
m_id << (unsigned int)::random();
m_audio = (m_media == "audio");
s_mutex.lock();
s_calls.append(this);
setupRTP(localip);
@ -200,8 +201,8 @@ YRTPWrapper::YRTPWrapper(const char* localip, CallEndpoint* conn, const char* me
YRTPWrapper::~YRTPWrapper()
{
Debug(&splugin,DebugAll,"YRTPWrapper::~YRTPWrapper() %s [%p]",
lookup(m_dir,dict_yrtp_dir),this);
Debug(&splugin,DebugAll,"YRTPWrapper::~YRTPWrapper() %s '%s' [%p]",
lookup(m_dir,dict_yrtp_dir),m_media.c_str(),this);
s_mutex.lock();
s_calls.remove(this,false);
if (m_rtp) {
@ -326,22 +327,22 @@ bool YRTPWrapper::startRTP(const char* raddr, unsigned int rport, int payload, i
if (m_source) {
if (m_conn) {
m_source->ref();
m_conn->setSource();
m_conn->setSource(0,m_media);
}
m_source->m_format = format;
if (m_conn) {
m_conn->setSource(m_source);
m_conn->setSource(m_source,m_media);
m_source->deref();
}
}
if (m_consumer) {
if (m_conn) {
m_consumer->ref();
m_conn->setConsumer();
m_conn->setConsumer(0,m_media);
}
m_consumer->m_format = format;
if (m_conn) {
m_conn->setConsumer(m_consumer);
m_conn->setConsumer(m_consumer,m_media);
m_consumer->deref();
}
}
@ -404,7 +405,7 @@ bool YRTPSession::rtpRecvData(bool marker, unsigned int timestamp, const void* d
return false;
DataBlock block;
block.assign((void*)data, len, false);
source->Forward(block);
source->Forward(block,timestamp);
block.clear(false);
return true;
}
@ -462,7 +463,7 @@ YRTPSource::~YRTPSource()
}
YRTPConsumer::YRTPConsumer(YRTPWrapper *wrap)
: m_wrap(wrap), m_timestamp(0)
: m_wrap(wrap)
{
Debug(&splugin,DebugAll,"YRTPConsumer::YRTPConsumer(%p) [%p]",wrap,this);
m_format.clear();
@ -474,7 +475,7 @@ YRTPConsumer::YRTPConsumer(YRTPWrapper *wrap)
YRTPConsumer::~YRTPConsumer()
{
Debug(&splugin,DebugAll,"YRTPConsumer::~YRTPConsumer() [%p] wrapper=%p ts=%d",this,m_wrap,m_timestamp);
Debug(&splugin,DebugAll,"YRTPConsumer::~YRTPConsumer() [%p] wrapper=%p ts=%lu",this,m_wrap,m_timestamp);
if (m_wrap) {
YRTPWrapper* tmp = m_wrap;
m_wrap = 0;
@ -483,27 +484,28 @@ YRTPConsumer::~YRTPConsumer()
}
}
void YRTPConsumer::Consume(const DataBlock &data, unsigned long timeDelta)
void YRTPConsumer::Consume(const DataBlock &data, unsigned long tStamp)
{
if (!(m_wrap && m_wrap->bufSize() && m_wrap->rtp()))
return;
XDebug(&splugin,DebugAll,"YRTPConsumer writing %d bytes, delta=%lu ts=%d [%p]",
data.length(),timeDelta,m_timestamp,this);
unsigned long delta = tStamp ? (tStamp - m_timestamp) : 0;
XDebug(&splugin,DebugAll,"YRTPConsumer writing %d bytes, delta=%lu ts=%lu [%p]",
data.length(),delta,m_timestamp,this);
unsigned int buf = m_wrap->bufSize();
const char* ptr = (const char*)data.data();
unsigned int len = data.length();
// make it safe to break a long octet buffer
if (len == timeDelta)
timeDelta = 0;
if (len == delta)
delta = 0;
while (len && m_wrap && m_wrap->rtp()) {
unsigned int sz = len;
if ((sz > buf) && !timeDelta) {
if (m_wrap->isAudio() && (sz > buf) && !delta) {
DDebug(&splugin,DebugAll,"Creating %u bytes fragment of %u bytes buffer",buf,len);
sz = buf;
}
m_wrap->rtp()->rtpSendData(false,m_timestamp,ptr,sz);
// if timestamp increment is not provided we have to guess...
m_timestamp += timeDelta ? timeDelta : sz;
m_timestamp += delta ? delta : sz;
len -= sz;
ptr += sz;
}
@ -555,7 +557,7 @@ bool AttachHandler::received(Message &msg)
return false;
}
YRTPWrapper *w = YRTPWrapper::find(ch);
YRTPWrapper *w = YRTPWrapper::find(ch,media);
if (!w)
w = YRTPWrapper::find(msg.getValue("rtpid"));
if (!w) {
@ -625,7 +627,7 @@ bool RtpHandler::received(Message &msg)
String rip(msg.getValue("remoteip"));
String rport(msg.getValue("remoteport"));
YRTPWrapper *w = YRTPWrapper::find(ch);
YRTPWrapper *w = YRTPWrapper::find(ch,media);
if (w)
Debug(&splugin,DebugAll,"YRTPWrapper %p found by CallEndpoint",w);
if (!w) {
@ -651,13 +653,13 @@ bool RtpHandler::received(Message &msg)
w->addDirection(direction);
}
if (d_recv && !ch->getSource()) {
if (d_recv && !ch->getSource(media)) {
YRTPSource* s = new YRTPSource(w);
ch->setSource(s,media);
s->deref();
}
if (d_send && !ch->getConsumer()) {
if (d_send && !ch->getConsumer(media)) {
YRTPConsumer* c = new YRTPConsumer(w);
ch->setConsumer(c,media);
c->deref();

View File

@ -42,6 +42,9 @@ static TokenDict dict_payloads[] = {
{ "g723", 4 },
{ "g728", 15 },
{ "g729", 18 },
{ "h261", 31 },
{ "h263", 34 },
{ "mpv", 32 },
{ 0, 0 },
};
@ -57,6 +60,9 @@ static TokenDict dict_rtpmap[] = {
{ "G723/8000", 4 },
{ "G728/8000", 15 },
{ "G729/8000", 18 },
{ "H261/90000", 31 },
{ "H263/90000", 34 },
{ "MPV/90000", 32 },
{ 0, 0 },
};
@ -76,6 +82,44 @@ static TokenDict dict_errors[] = {
{ 0, 0 },
};
class RtpMedia : public String
{
public:
RtpMedia(const char* media, const char* formats, int rport = -1, int lport = -1);
virtual ~RtpMedia();
inline bool isAudio() const
{ return m_audio; }
inline const String& suffix() const
{ return m_suffix; }
inline const String& id() const
{ return m_id; }
inline const String& format() const
{ return m_format; }
inline const String& formats() const
{ return m_formats; }
inline const String& remotePort() const
{ return m_rPort; }
inline const String& localPort() const
{ return m_lPort; }
const char* fmtList() const;
bool update(const char* formats, int rport = -1, int lport = -1);
void update(const Message& msg, bool pickFormat);
private:
bool m_audio;
// suffix used for this type
String m_suffix;
// list of supported format names
String m_formats;
// format used for sending data
String m_format;
// id of the local RTP channel
String m_id;
// remote RTP port
String m_rPort;
// local RTP port
String m_lPort;
};
class YateUDPParty : public SIPParty
{
public:
@ -181,6 +225,11 @@ public:
Established = 3,
Cleared = 4,
};
enum {
MediaMissing,
MediaStarted,
MediaMuted
};
YateSIPConnection(SIPEvent* ev, SIPTransaction* tr);
YateSIPConnection(Message& msg, const String& uri, const char* target = 0);
~YateSIPConnection();
@ -217,13 +266,15 @@ public:
inline int getPort() const
{ return m_port; }
private:
void setMedia(ObjList* media);
void clearTransaction();
SIPMessage* createDlgMsg(const char* method, const char* uri = 0);
bool emitPRACK(const SIPMessage* msg);
SDPBody* createSDP(const char* addr, const char* port, const char* formats, const char* format = 0);
SDPBody* createProvisionalSDP(Message &msg);
SDPBody* createPasstroughSDP(Message &msg);
SDPBody* createRtpSDP(SIPMessage* msg, const char* formats);
bool dispatchRtp(RtpMedia* media, const char* addr, bool start, bool pick);
SDPBody* createSDP(const char* addr = 0, ObjList* mediaList = 0);
SDPBody* createProvisionalSDP(Message& msg);
SDPBody* createPasstroughSDP(Message& msg);
SDPBody* createRtpSDP(const char* addr, const Message& msg);
SDPBody* createRtpSDP(bool start = false);
bool startRtp();
bool addRtpParams(Message& msg, const String& natAddr = String::empty());
@ -241,23 +292,16 @@ private:
URI m_uri;
// if we do RTP forwarding or not
bool m_rtpForward;
// id of the local RTP channel
String m_rtpid;
// remote RTP address
String m_rtpAddr;
// remote RTP port
String m_rtpPort;
// format used for sending data
String m_rtpFormat;
// local RTP address
String m_rtpLocalAddr;
// local RTP port
String m_rtpLocalPort;
// list of media descriptors
ObjList* m_rtpMedia;
// unique SDP session number
int m_sdpSession;
// SDP version number, incremented each time we generate a new SDP
int m_sdpVersion;
String m_formats;
String m_host;
String m_user;
String m_line;
@ -265,6 +309,7 @@ private:
Message* m_route;
ObjList* m_routes;
bool m_authBye;
int m_mediaStatus;
};
class UserHandler : public MessageHandler
@ -299,7 +344,8 @@ static ObjList s_lines;
static Configuration s_cfg;
static int s_maxForwards = 20;
static void parseSDP(SDPBody* sdp, String& addr, String& port, String& formats, const char* media = "audio")
// Parse a SDP and return a possibly filtered list of SDP media
static ObjList* parseSDP(const SDPBody* sdp, String& addr, ObjList* oldMedia = 0, const char* media = 0)
{
const NamedString* c = sdp->getLine("c");
if (c) {
@ -312,31 +358,50 @@ static void parseSDP(SDPBody* sdp, String& addr, String& port, String& formats,
addr = tmp;
}
}
ObjList* lst = 0;
c = sdp->getLine("m");
while (c) {
for (; c; c = sdp->getNextLine(c)) {
String tmp(*c);
if (tmp.startSkip(media)) {
int var = 0;
tmp >> var >> " RTP/AVP";
if (var > 0)
port = var;
String fmt;
bool defcodecs = s_cfg.getBoolValue("codecs","default",true);
while (tmp[0] == ' ') {
var = -1;
tmp >> " " >> var;
const char* payload = lookup(var,dict_payloads);
if (payload && s_cfg.getBoolValue("codecs",payload,defcodecs && DataTranslator::canConvert(payload))) {
if (fmt)
fmt << ",";
fmt << payload;
}
int sep = tmp.find(' ');
if (sep < 1)
continue;
String type = tmp.substr(0,sep);
tmp >> " ";
if (media && (type != media))
continue;
int port = 0;
tmp >> port >> " RTP/AVP";
String fmt;
bool defcodecs = s_cfg.getBoolValue("codecs","default",true);
while (tmp[0] == ' ') {
int var = -1;
tmp >> " " >> var;
const char* payload = lookup(var,dict_payloads);
XDebug(&plugin,DebugAll,"Payload %d format '%s'",var,payload);
if (payload && s_cfg.getBoolValue("codecs",payload,defcodecs && DataTranslator::canConvert(payload))) {
if (fmt)
fmt << ",";
fmt << payload;
}
formats = fmt;
return;
}
c = sdp->getNextLine(c);
RtpMedia* rtp = 0;
// try to take the media descriptor from the old list
if (oldMedia) {
ObjList* om = oldMedia->find(type);
if (om)
rtp = static_cast<RtpMedia*>(om->remove(false));
}
if (rtp)
rtp->update(fmt,port);
else
rtp = new RtpMedia(type,fmt,port);
if (!lst)
lst = new ObjList;
lst->append(rtp);
if (media)
return lst;
}
return lst;
}
static bool isPrivateAddr(const String& host)
@ -366,11 +431,9 @@ static void copySipHeaders(Message& msg, const SIPMessage& sip)
"proxy-authorization",
0
};
const ObjList* l = &sip.header;
for (; l; l = l->next()) {
const ObjList* l = sip.header.skipNull();
for (; l; l = l->skipNext()) {
const SIPHeaderLine* t = static_cast<const SIPHeaderLine*>(l->get());
if (!t)
continue;
String name(t->name());
name.toLower();
const char** hdr = rejectHeaders;
@ -383,6 +446,79 @@ static void copySipHeaders(Message& msg, const SIPMessage& sip)
}
}
RtpMedia::RtpMedia(const char* media, const char* formats, int rport, int lport)
: String(media), m_audio(true), m_formats(formats)
{
DDebug(&plugin,DebugAll,"RtpMedia::RtpMedia('%s','%s',%d,%d) [%p]",
media,formats,rport,lport,this);
if (operator!=("audio")) {
m_audio = false;
m_suffix << "_" << media;
}
int q = m_formats.find(',');
m_format = m_formats.substr(0,q);
if (rport >= 0)
m_rPort = rport;
if (lport >= 0)
m_lPort = lport;
}
RtpMedia::~RtpMedia()
{
DDebug(&plugin,DebugAll,"RtpMedia::~RtpMedia() '%s' [%p]",c_str(),this);
}
const char* RtpMedia::fmtList() const
{
if (m_format)
return m_format.c_str();
if (m_formats)
return m_formats.c_str();
// unspecified audio assumed to support G711
if (m_audio)
return "alaw,mulaw";
return 0;
}
// Update members with data taken from a SDP, return true if something changed
bool RtpMedia::update(const char* formats, int rport, int lport)
{
DDebug(&plugin,DebugAll,"RtpMedia::update('%s',%d,%d) [%p]",
formats,rport,lport,this);
bool chg = false;
String tmp(formats);
if (m_formats != tmp) {
chg = true;
m_formats = tmp;
int q = m_formats.find(',');
m_format = m_formats.substr(0,q);
}
if (rport >= 0) {
tmp = rport;
if (m_rPort != tmp) {
chg = true;
m_rPort = tmp;
}
}
if (lport >= 0) {
tmp = lport;
if (m_lPort != tmp) {
chg = true;
m_lPort = tmp;
}
}
return chg;
}
// Update members from a dispatched "chan.rtp" message
void RtpMedia::update(const Message& msg, bool pickFormat)
{
m_id = msg.getValue("rtpid",m_id);
m_lPort = msg.getValue("localport",m_lPort);
if (pickFormat)
m_format = msg.getValue("format");
}
YateUDPParty::YateUDPParty(Socket* sock, const SocketAddr& addr, int local)
: m_sock(sock), m_addr(addr)
{
@ -418,8 +554,11 @@ void YateUDPParty::transmit(SIPEvent* event)
tmp << "code " << msg->code;
else
tmp << "'" << msg->method << " " << msg->uri << "'";
Debug(&plugin,DebugInfo,"Sending %s %p to %s:%d",
tmp.c_str(),msg,m_addr.host().c_str(),m_addr.port());
if (plugin.debugAt(DebugInfo)) {
String buf((char*)msg->getBuffer().data(),msg->getBuffer().length());
Debug(&plugin,DebugInfo,"Sending %s %p to %s:%d\n------\n%s------",
tmp.c_str(),msg,m_addr.host().c_str(),m_addr.port(),buf.c_str());
}
m_sock->sendTo(
msg->getBuffer().data(),
msg->getBuffer().length(),
@ -564,9 +703,6 @@ bool YateSIPEndPoint::buildParty(SIPMessage* message, const char* host, int port
bool YateSIPEndPoint::Init()
{
/*
* This part have been taken from libiax after i have lost my sip driver for bayonne
*/
if (m_sock) {
Debug(&plugin,DebugInfo,"Already initialized.");
return true;
@ -626,10 +762,10 @@ void YateSIPEndPoint::run()
Debug(DebugGoOn,"SIP error on read: %d", m_sock->error());
}
} else if (res >= 72) {
Debug(&plugin,DebugInfo,"Received %d bytes SIP message from %s:%d",
res,m_addr.host().c_str(),m_addr.port());
// we got already the buffer and here we start to do "good" stuff
buf[res]=0;
Debug(&plugin,DebugInfo,"Received %d bytes SIP message from %s:%d\n------\n%s------",
res,m_addr.host().c_str(),m_addr.port(),buf);
// we got already the buffer and here we start to do "good" stuff
m_engine->addMessage(new YateUDPParty(m_sock,m_addr,m_port),buf,res);
}
#ifdef DEBUG
@ -816,9 +952,9 @@ bool YateSIPEndPoint::generic(SIPEvent* e, SIPTransaction* t)
YateSIPConnection::YateSIPConnection(SIPEvent* ev, SIPTransaction* tr)
: Channel(plugin,0,false),
m_tr(tr), m_hungup(false), m_byebye(true), m_retry(false),
m_state(Incoming), m_rtpForward(false),
m_state(Incoming), m_rtpForward(false), m_rtpMedia(0),
m_sdpSession(0), m_sdpVersion(0), m_port(0), m_route(0), m_routes(0),
m_authBye(true)
m_authBye(true), m_mediaStatus(MediaMissing)
{
Debug(this,DebugAll,"YateSIPConnection::YateSIPConnection(%p,%p) [%p]",ev,tr,this);
setReason();
@ -865,26 +1001,27 @@ YateSIPConnection::YateSIPConnection(SIPEvent* ev, SIPTransaction* tr)
m->addParam("sip_contact",ev->getMessage()->getHeaderValue("Contact"));
m->addParam("sip_user-agent",ev->getMessage()->getHeaderValue("User-Agent"));
if (ev->getMessage()->body && ev->getMessage()->body->isSDP()) {
parseSDP(static_cast<SDPBody*>(ev->getMessage()->body),m_rtpAddr,m_rtpPort,m_formats);
if (m_rtpAddr) {
setMedia(parseSDP(static_cast<SDPBody*>(ev->getMessage()->body),m_rtpAddr,m_rtpMedia));
if (m_rtpMedia) {
m_rtpForward = true;
// guess if the call comes from behind a NAT
if (s_cfg.getBoolValue("general","nat",true) && isPrivateAddr(m_rtpAddr) && !isPrivateAddr(m_host)) {
Debug(this,DebugInfo,"NAT detected: private '%s' public '%s' port %s",
m_rtpAddr.c_str(),m_host.c_str(),m_rtpPort.c_str());
Debug(this,DebugInfo,"NAT detected: private '%s' public '%s'",
m_rtpAddr.c_str(),m_host.c_str());
m->addParam("rtp_nat_addr",m_rtpAddr);
m_rtpAddr = m_host;
}
m->addParam("rtp_forward","possible");
m->addParam("rtp_addr",m_rtpAddr);
m->addParam("rtp_port",m_rtpPort);
m->addParam("formats",m_formats);
ObjList* l = m_rtpMedia->skipNull();
for (; l; l = l->skipNext()) {
RtpMedia* r = static_cast<RtpMedia*>(l->get());
m->addParam("rtp_port"+r->suffix(),r->remotePort());
m->addParam("formats"+r->suffix(),r->formats());
}
}
int q = m_formats.find(',');
m_rtpFormat = m_formats.substr(0,q);
}
DDebug(this,DebugAll,"RTP addr '%s' port %s formats '%s' format '%s'",
m_rtpAddr.c_str(),m_rtpPort.c_str(),m_formats.c_str(),m_rtpFormat.c_str());
DDebug(this,DebugAll,"RTP addr '%s' [%p]",m_rtpAddr.c_str(),this);
m_route = m;
Engine::enqueue(message("chan.startup"));
}
@ -893,9 +1030,9 @@ YateSIPConnection::YateSIPConnection(SIPEvent* ev, SIPTransaction* tr)
YateSIPConnection::YateSIPConnection(Message& msg, const String& uri, const char* target)
: Channel(plugin,0,true),
m_tr(0), m_hungup(false), m_byebye(true), m_retry(true),
m_state(Outgoing), m_rtpForward(false),
m_state(Outgoing), m_rtpForward(false), m_rtpMedia(0),
m_sdpSession(0), m_sdpVersion(0), m_port(0), m_route(0), m_routes(0),
m_authBye(false)
m_authBye(false), m_mediaStatus(MediaMissing)
{
Debug(this,DebugAll,"YateSIPConnection::YateSIPConnection(%p,'%s') [%p]",
&msg,uri.c_str(),this);
@ -938,7 +1075,7 @@ YateSIPConnection::YateSIPConnection(Message& msg, const String& uri, const char
m_dialog = *m;
SDPBody* sdp = createPasstroughSDP(msg);
if (!sdp)
sdp = createRtpSDP(m,msg.getValue("formats"));
sdp = createRtpSDP(m_host,msg);
m->setBody(sdp);
m_tr = plugin.ep()->engine()->addMessage(m);
if (m_tr) {
@ -955,6 +1092,7 @@ YateSIPConnection::~YateSIPConnection()
Debug(this,DebugAll,"YateSIPConnection::~YateSIPConnection() [%p]",this);
hangup();
clearTransaction();
setMedia(0);
if (m_route) {
delete m_route;
m_route = 0;
@ -965,6 +1103,22 @@ YateSIPConnection::~YateSIPConnection()
}
}
void YateSIPConnection::setMedia(ObjList* media)
{
if (media == m_rtpMedia)
return;
ObjList* tmp = m_rtpMedia;
m_rtpMedia = media;
if (tmp) {
ObjList* l = tmp->skipNull();
for (; l; l = l->skipNext()) {
RtpMedia* m = static_cast<RtpMedia*>(l->get());
clearEndpoint(*m);
}
tmp->destruct();
}
}
void YateSIPConnection::startRouter()
{
Message* m = m_route;
@ -1125,11 +1279,11 @@ bool YateSIPConnection::emitPRACK(const SIPMessage* msg)
}
// Creates a SDP for provisional (1xx) messages
SDPBody* YateSIPConnection::createProvisionalSDP(Message &msg)
SDPBody* YateSIPConnection::createProvisionalSDP(Message& msg)
{
if (m_rtpForward)
return createPasstroughSDP(msg);
// check if our peer can source data
// check if our peer can source at least audio data
if (!(getPeer() && getPeer()->getSource()))
return 0;
if (m_rtpAddr.null())
@ -1138,164 +1292,288 @@ SDPBody* YateSIPConnection::createProvisionalSDP(Message &msg)
}
// Creates a SDP from RTP address data present in message
SDPBody* YateSIPConnection::createPasstroughSDP(Message &msg)
SDPBody* YateSIPConnection::createPasstroughSDP(Message& msg)
{
String tmp = msg.getValue("rtp_forward");
msg.clearParam("rtp_forward");
if (!(m_rtpForward && tmp.toBoolean()))
return 0;
SDPBody* sdp = 0;
tmp = msg.getValue("rtp_port");
int port = tmp.toInteger();
String addr(msg.getValue("rtp_addr"));
if (port && addr) {
m_rtpLocalAddr = addr;
m_rtpLocalPort = tmp;
sdp = createSDP(addr,tmp,msg.getValue("formats"));
if (addr.null())
return 0;
ObjList* lst = 0;
unsigned int n = msg.length();
for (unsigned int i = 0; i < n; i++) {
const NamedString* p = msg.getParam(i);
if (!p)
continue;
// search for rtp_port or rtp_port_MEDIANAME parameters
tmp = p->name();
if (!tmp.startSkip("rtp_port",false))
continue;
if (tmp && (tmp[0] != '_'))
continue;
// now tmp holds the suffix for the media, null for audio
bool audio = tmp.null();
// check if media is supported, default only for audio
if (!msg.getBoolValue("media"+tmp,audio))
continue;
int port = p->toInteger();
if (!port)
continue;
const char* fmts = msg.getValue("formats"+tmp);
if (!fmts)
continue;
if (audio)
tmp = "audio";
else
tmp >> "_";
RtpMedia* rtp = 0;
// try to take the media descriptor from the old list
if (m_rtpMedia) {
ObjList* om = m_rtpMedia->find(tmp);
if (om)
rtp = static_cast<RtpMedia*>(om->remove(false));
}
if (rtp)
rtp->update(fmts,-1,port);
else
rtp = new RtpMedia(tmp,fmts,-1,port);
if (!lst)
lst = new ObjList;
lst->append(rtp);
}
if (!lst)
return 0;
m_rtpLocalAddr = addr;
setMedia(lst);
SDPBody* sdp = createSDP(m_rtpLocalAddr);
if (sdp)
msg.setParam("rtp_forward","accepted");
return sdp;
}
// Creates an unstarted external RTP channel from remote addr and builds SDP from it
SDPBody* YateSIPConnection::createRtpSDP(SIPMessage* msg, const char* formats)
// Dispatches a RTP message for a media, optionally start RTP and pick parameters
bool YateSIPConnection::dispatchRtp(RtpMedia* media, const char* addr, bool start, bool pick)
{
if (!(media && addr))
return false;
Message m("chan.rtp");
complete(m,true);
m.addParam("direction","bidir");
m.addParam("remoteip",msg->getParty()->getPartyAddr());
m.userData(static_cast<CallEndpoint *>(this));
if (Engine::dispatch(m)) {
m_rtpForward = false;
m_rtpid = m.getValue("rtpid");
m_rtpLocalAddr = m.getValue("localip",m_rtpLocalAddr);
m_rtpLocalPort = m.getValue("localport",m_rtpLocalPort);
return createSDP(m_rtpLocalAddr,m_rtpLocalPort,formats);
m.addParam("media",*media);
m.addParam("direction","bidir");
if (m_rtpLocalAddr)
m.addParam("localip",m_rtpLocalAddr);
m.addParam("remoteip",addr);
if (start) {
m.addParam("remoteport",media->remotePort());
m.addParam("format",media->format());
}
return 0;
if (!Engine::dispatch(m))
return false;
if (!pick)
return true;
m_rtpForward = false;
m_rtpLocalAddr = m.getValue("localip",m_rtpLocalAddr);
m_mediaStatus = MediaStarted;
media->update(m,start);
return true;
}
// Creates a started external RTP channel from remote addr and builds SDP from it
// Creates a set of unstarted external RTP channels from remote addr and builds SDP from them
SDPBody* YateSIPConnection::createRtpSDP(const char* addr, const Message& msg)
{
bool defaults = true;
ObjList* lst = 0;
unsigned int n = msg.length();
for (unsigned int i = 0; i < n; i++) {
const NamedString* p = msg.getParam(i);
if (!p)
continue;
// search for rtp_port or rtp_port_MEDIANAME parameters
String tmp(p->name());
if (!tmp.startSkip("media",false))
continue;
if (tmp && (tmp[0] != '_'))
continue;
// since we found at least one media declaration disable defaults
defaults = false;
// now tmp holds the suffix for the media, null for audio
bool audio = tmp.null();
// check if media is supported, default only for audio
if (!p->toBoolean(audio))
continue;
const char* fmts = msg.getValue("formats"+tmp);
if (audio && !fmts)
fmts = "alaw,mulaw";
if (!fmts)
continue;
if (audio)
tmp = "audio";
else
tmp >> "_";
RtpMedia* rtp = 0;
// try to take the media descriptor from the old list
if (m_rtpMedia) {
ObjList* om = m_rtpMedia->find(tmp);
if (om)
rtp = static_cast<RtpMedia*>(om->remove(false));
}
if (rtp)
rtp->update(fmts);
else
rtp = new RtpMedia(tmp,fmts);
if (!lst)
lst = new ObjList;
lst->append(rtp);
}
if (defaults && !lst) {
lst = new ObjList;
lst->append(new RtpMedia("audio","alaw,mulaw"));
}
setMedia(lst);
ObjList* l = m_rtpMedia->skipNull();
for (; l; l = l->skipNext()) {
RtpMedia* m = static_cast<RtpMedia*>(l->get());
if (!dispatchRtp(m,addr,false,true))
return 0;
}
return createSDP(m_rtpLocalAddr);
}
// Creates a set of started external RTP channels from remote addr and builds SDP from them
SDPBody* YateSIPConnection::createRtpSDP(bool start)
{
if (m_rtpAddr.null()) {
m_rtpid = "-";
return createSDP(0,m_rtpLocalPort,m_formats);
m_mediaStatus = MediaMuted;
return createSDP(0);
}
Message m("chan.rtp");
complete(m,true);
m.addParam("direction","bidir");
m.addParam("remoteip",m_rtpAddr);
if (start) {
m.addParam("remoteport",m_rtpPort);
m.addParam("format",m_rtpFormat);
ObjList* l = m_rtpMedia->skipNull();
for (; l; l = l->skipNext()) {
RtpMedia* m = static_cast<RtpMedia*>(l->get());
if (!dispatchRtp(m,m_rtpAddr,start,true))
return 0;
}
m.userData(static_cast<CallEndpoint *>(this));
if (Engine::dispatch(m)) {
m_rtpForward = false;
m_rtpid = m.getValue("rtpid");
m_rtpLocalAddr = m.getValue("localip",m_rtpLocalAddr);
m_rtpLocalPort = m.getValue("localport",m_rtpLocalPort);
if (start)
m_rtpFormat = m.getValue("format");
return createSDP(m_rtpLocalAddr,m_rtpLocalPort,m_formats,m_rtpFormat);
}
return 0;
return createSDP(m_rtpLocalAddr);
}
// Starts an already created external RTP channel
// Starts an already created set of external RTP channels
bool YateSIPConnection::startRtp()
{
if (m_rtpid.null() || m_rtpid == "-")
if (m_mediaStatus != MediaStarted)
return false;
DDebug(this,DebugAll,"YateSIPConnection::startRtp() [%p]",this);
Message m("chan.rtp");
complete(m,true);
m.addParam("rtpid",m_rtpid);
m.addParam("direction","bidir");
m.addParam("remoteip",m_rtpAddr);
m.addParam("remoteport",m_rtpPort);
m.addParam("format",m_rtpFormat);
m.userData(static_cast<CallEndpoint *>(this));
return Engine::dispatch(m);
bool ok = true;
ObjList* l = m_rtpMedia->skipNull();
for (; l; l = l->skipNext()) {
RtpMedia* m = static_cast<RtpMedia*>(l->get());
ok = dispatchRtp(m,m_rtpAddr,true,false) && ok;
}
return ok;
}
// Creates a SDP body from transport address and list of formats
SDPBody* YateSIPConnection::createSDP(const char* addr, const char* port, const char* formats, const char* format)
// Creates a SDP body from transport address and list of media descriptors
SDPBody* YateSIPConnection::createSDP(const char* addr, ObjList* mediaList)
{
DDebug(this,DebugAll,"YateSIPConnection::createSDP('%s','%s','%s') [%p]",
addr,port,formats,this);
// if we got no port we simply create no SDP
if (!port)
DDebug(this,DebugAll,"YateSIPConnection::createSDP('%s',%p) [%p]",
addr,mediaList,this);
if (!mediaList)
mediaList = m_rtpMedia;
// if we got no media descriptors we simply create no SDP
if (!mediaList)
return 0;
if (m_sdpSession)
++m_sdpVersion;
else
m_sdpVersion = m_sdpSession = Time::secNow();
String owner;
owner << "yate " << m_sdpSession << " " << m_sdpVersion << " IN IP4 " << addr;
// no address means on hold or muted
if (!addr)
addr = "0.0.0.0";
String tmp;
tmp << "IN IP4 " << addr;
String frm(format ? format : formats);
if (frm.null())
frm = "alaw,mulaw";
ObjList* l = frm.split(',',false);
frm = "audio ";
frm << port << " RTP/AVP";
ObjList rtpmap;
ObjList* f = l;
bool defcodecs = s_cfg.getBoolValue("codecs","default",true);
for (; f; f = f->next()) {
String* s = static_cast<String*>(f->get());
if (s) {
int payload = s->toInteger(dict_payloads,-1);
if (payload >= 0) {
const char* map = lookup(payload,dict_rtpmap);
if (map && s_cfg.getBoolValue("codecs",*s,defcodecs && DataTranslator::canConvert(*s))) {
frm << " " << payload;
String* temp = new String("rtpmap:");
*temp << payload << " " << map;
rtpmap.append(temp);
}
}
}
}
delete l;
// always claim to support telephone events
frm << " 101";
rtpmap.append(new String("rtpmap:101 telephone-event/8000"));
// no address means on hold or muted
String origin;
origin << "yate " << m_sdpSession << " " << m_sdpVersion << " IN IP4 " << (addr ? addr : m_host.safe());
String conn;
conn << "IN IP4 " << (addr ? addr : "0.0.0.0");
SDPBody* sdp = new SDPBody;
sdp->addLine("v","0");
sdp->addLine("o",owner);
sdp->addLine("s","Session");
sdp->addLine("c",tmp);
sdp->addLine("o",origin);
sdp->addLine("s","SIP Call");
sdp->addLine("c",conn);
sdp->addLine("t","0 0");
sdp->addLine("m",frm);
for (f = &rtpmap; f; f = f->next()) {
String* s = static_cast<String*>(f->get());
if (s)
sdp->addLine("a",*s);
bool defcodecs = s_cfg.getBoolValue("codecs","default",true);
for (ObjList* ml = mediaList->skipNull(); ml; ml = ml->skipNext()) {
RtpMedia* m = static_cast<RtpMedia*>(ml->get());
String frm(m->fmtList());
ObjList* l = frm.split(',',false);
frm = *m;
frm << " " << m->localPort() << " RTP/AVP";
ObjList rtpmap;
ObjList* f = l;
for (; f; f = f->next()) {
String* s = static_cast<String*>(f->get());
if (s) {
int payload = s->toInteger(dict_payloads,-1);
if (payload >= 0) {
const char* map = lookup(payload,dict_rtpmap);
if (map && s_cfg.getBoolValue("codecs",*s,defcodecs && DataTranslator::canConvert(*s))) {
frm << " " << payload;
String* temp = new String("rtpmap:");
*temp << payload << " " << map;
rtpmap.append(temp);
}
}
}
}
delete l;
if (*m == "audio") {
// always claim to support telephone events
frm << " 101";
rtpmap.append(new String("rtpmap:101 telephone-event/8000"));
}
sdp->addLine("m",frm);
for (f = rtpmap.skipNull(); f; f = f->skipNext()) {
String* s = static_cast<String*>(f->get());
if (s)
sdp->addLine("a",*s);
}
}
rtpmap.clear();
return sdp;
}
// Add RTP forwarding parameters to a message
bool YateSIPConnection::addRtpParams(Message& msg, const String& natAddr)
{
if (m_rtpPort && m_rtpAddr && !startRtp() && m_rtpForward) {
if (!(m_rtpMedia && m_rtpAddr))
return false;
ObjList* l = m_rtpMedia->skipNull();
for (; l; l = l->skipNext()) {
RtpMedia* m = static_cast<RtpMedia*>(l->get());
msg.addParam("formats"+m->suffix(),m->formats());
msg.addParam("media"+m->suffix(),"yes");
}
if (!startRtp() && m_rtpForward) {
if (natAddr)
msg.addParam("rtp_nat_addr",natAddr);
msg.addParam("rtp_forward","yes");
msg.addParam("rtp_addr",m_rtpAddr);
msg.addParam("rtp_port",m_rtpPort);
msg.addParam("formats",m_formats);
l = m_rtpMedia->skipNull();
for (; l; l = l->skipNext()) {
RtpMedia* m = static_cast<RtpMedia*>(l->get());
msg.addParam("rtp_port"+m->suffix(),m->remotePort());
}
return true;
}
return false;
@ -1356,19 +1634,15 @@ bool YateSIPConnection::process(SIPEvent* ev)
String natAddr;
if (msg->body && msg->body->isSDP()) {
DDebug(this,DebugInfo,"YateSIPConnection got SDP [%p]",this);
parseSDP(static_cast<SDPBody*>(msg->body),
m_rtpAddr,m_rtpPort,m_formats);
setMedia(parseSDP(static_cast<SDPBody*>(msg->body),m_rtpAddr,m_rtpMedia));
// guess if the call comes from behind a NAT
if (s_cfg.getBoolValue("general","nat",true) && isPrivateAddr(m_rtpAddr) && !isPrivateAddr(m_host)) {
Debug(this,DebugInfo,"NAT detected: private '%s' public '%s' port %s",
m_rtpAddr.c_str(),m_host.c_str(),m_rtpPort.c_str());
Debug(this,DebugInfo,"NAT detected: private '%s' public '%s'",
m_rtpAddr.c_str(),m_host.c_str());
natAddr = m_rtpAddr;
m_rtpAddr = m_host;
}
int q = m_formats.find(',');
m_rtpFormat = m_formats.substr(0,q);
DDebug(this,DebugAll,"RTP addr '%s' port %s formats '%s' format '%s'",
m_rtpAddr.c_str(),m_rtpPort.c_str(),m_formats.c_str(),m_rtpFormat.c_str());
DDebug(this,DebugAll,"RTP addr '%s' [%p]",m_rtpAddr.c_str(),this);
}
if ((!m_routes) && msg->isAnswer() && (msg->code > 100) && (msg->code < 300))
m_routes = msg->getRoutes();
@ -1414,30 +1688,27 @@ void YateSIPConnection::reInvite(SIPTransaction* t)
// hack: use a while instead of if so we can return or break out of it
while (t->initialMessage()->body && t->initialMessage()->body->isSDP()) {
// accept re-INVITE only for local RTP, not for pass-trough
if (m_rtpForward || m_rtpid.null())
if (m_rtpForward || (m_mediaStatus == MediaMissing))
break;
String addr,port,formats;
parseSDP(static_cast<SDPBody*>(t->initialMessage()->body),addr,port,formats);
int q = formats.find(',');
String frm = formats.substr(0,q);
if (port.null() || frm.null())
String addr;
ObjList* lst = parseSDP(static_cast<SDPBody*>(t->initialMessage()->body),addr);
if (!lst)
break;
m_rtpAddr = addr;
m_rtpPort = port;
m_rtpFormat = frm;
m_formats = formats;
// guess if the call comes from behind a NAT
if (s_cfg.getBoolValue("general","nat",true) && isPrivateAddr(m_rtpAddr) && !isPrivateAddr(m_host)) {
Debug(this,DebugInfo,"NAT detected: private '%s' public '%s' port %s",
m_rtpAddr.c_str(),m_host.c_str(),m_rtpPort.c_str());
m_rtpAddr = m_host;
if (s_cfg.getBoolValue("general","nat",true) && isPrivateAddr(addr) && !isPrivateAddr(m_host)) {
Debug(this,DebugInfo,"NAT detected: private '%s' public '%s'",
addr.c_str(),m_host.c_str());
addr = m_host;
}
Debug(this,DebugAll,"New RTP addr '%s' port %s formats '%s' format '%s'",
m_rtpAddr.c_str(),m_rtpPort.c_str(),m_formats.c_str(),m_rtpFormat.c_str());
m_rtpAddr = addr;
setMedia(lst);
Debug(this,DebugAll,"New RTP addr '%s'",m_rtpAddr.c_str());
m_rtpid.clear();
setSource();
setConsumer();
m_mediaStatus = MediaMissing;
// let RTP guess again the local interface
m_rtpLocalAddr.clear();
// clear all data endpoints - createRtpSDP will build new ones
clearEndpoint();
SIPMessage* m = new SIPMessage(t->initialMessage(), 200);
SDPBody* sdp = createRtpSDP(true);
@ -1543,7 +1814,8 @@ bool YateSIPConnection::msgAnswered(Message& msg)
SDPBody* sdp = createPasstroughSDP(msg);
if (!sdp) {
m_rtpForward = false;
sdp = createRtpSDP();
// don't start RTP yet, only when we get the ACK
sdp = createRtpSDP(false);
}
m->setBody(sdp);
m_tr->setResponse(m);
@ -1555,9 +1827,13 @@ bool YateSIPConnection::msgAnswered(Message& msg)
bool YateSIPConnection::msgTone(Message& msg, const char* tone)
{
if (m_rtpid && (m_rtpid != "-")) {
msg.setParam("targetid",m_rtpid);
return false;
if (m_rtpMedia && (m_mediaStatus == MediaStarted)) {
ObjList* l = m_rtpMedia->find("audio");
const RtpMedia* m = static_cast<const RtpMedia*>(l ? l->get() : 0);
if (m) {
msg.setParam("targetid",m->id());
return false;
}
}
// FIXME: when muted or doing RTP forwarding we should use INFO messages
return false;
@ -1823,10 +2099,10 @@ YateSIPConnection* SIPDriver::findCall(const String& callid)
{
XDebug(this,DebugAll,"SIPDriver finding call '%s'",callid.c_str());
Lock mylock(this);
ObjList* l = &channels();
for (; l; l = l->next()) {
ObjList* l = channels().skipNull();
for (; l; l = l->skipNext()) {
YateSIPConnection* c = static_cast<YateSIPConnection*>(l->get());
if (c && (c->callid() == callid))
if (c->callid() == callid)
return c;
}
return 0;
@ -1836,10 +2112,10 @@ YateSIPConnection* SIPDriver::findDialog(const SIPDialog& dialog)
{
XDebug(this,DebugAll,"SIPDriver finding dialog '%s'",dialog.c_str());
Lock mylock(this);
ObjList* l = &channels();
for (; l; l = l->next()) {
ObjList* l = channels().skipNull();
for (; l; l = l->skipNext()) {
YateSIPConnection* c = static_cast<YateSIPConnection*>(l->get());
if (c && (c->dialog() == dialog))
if (c->dialog() == dialog)
return c;
}
return 0;
@ -1864,12 +2140,9 @@ bool SIPDriver::validLine(const String& line)
bool SIPDriver::received(Message &msg, int id)
{
if (id == Timer) {
ObjList* l = &s_lines;
for (; l; l = l->next()) {
YateSIPLine* line = static_cast<YateSIPLine*>(l->get());
if (line)
line->timer(msg.msgTime());
}
ObjList* l = s_lines.skipNull();
for (; l; l = l->skipNext())
static_cast<YateSIPLine*>(l->get())->timer(msg.msgTime());
}
else if (id == Halt)
s_lines.clear();

View File

@ -83,7 +83,7 @@ class ZapConsumer : public PriConsumer
public:
ZapConsumer(ZapChan *owner, const char* format, unsigned int bufsize);
~ZapConsumer();
virtual void Consume(const DataBlock &data, unsigned long timeDelta);
virtual void Consume(const DataBlock &data, unsigned long tStamp);
private:
unsigned int m_bufsize;
};
@ -308,7 +308,7 @@ ZapConsumer::~ZapConsumer()
Debug(m_owner,DebugAll,"ZapConsumer::~ZapConsumer() [%p]",this);
}
void ZapConsumer::Consume(const DataBlock &data, unsigned long timeDelta)
void ZapConsumer::Consume(const DataBlock &data, unsigned long tStamp)
{
int fd = static_cast<ZapChan*>(m_owner)->fd();
XDebug(DebugAll,"ZapConsumer fd=%d datalen=%u",fd,data.length());

View File

@ -19,6 +19,7 @@
#define YATE_MINOR_S "9"
#define YATE_BUILD_S "0"
#define YATE_VERSION "0.9.0"
#define YATE_RELEASE "pre1"
/* Windows version resource - file and string style */
#define YATE_WINVER_F 0,9,0,0

View File

@ -1,7 +1,7 @@
Summary: Yet Another Telephony Engine
Name: yate
Version: @PACKAGE_VERSION@
Release: 1
Release: @PACKAGE_RELEASE@
Copyright: GPL
Packager: Paul Chitescu <paulc-devel@null.ro>
Source: http://yate.null.ro/%{name}-%{version}.tar.gz
@ -34,7 +34,7 @@ for small to large scale projects.
/usr/share/man/*/yate.*
/etc/rc.d/init.d/yate
%dir /usr/lib/yate
/usr/lib/yate/*
/usr/lib/yate/*.*
%post
ldconfig
@ -42,6 +42,22 @@ ldconfig
%postun
ldconfig
%package client
Summary: Client package for yate
Group: Applications/Communication
Requires: %{name} = %{version}
%description client
The yate-client package includes the files needed to use YATE as a VoIP client
%files client
%defattr(-, root, root)
/usr/bin/yate-gtk2
%dir /usr/lib/yate/skin
/usr/lib/yate/skin/*
%dir /usr/lib/yate/gtk2
/usr/lib/yate/gtk2/*
%package devel
Summary: Development package for yate
Group: Development/Libraries

View File

@ -317,9 +317,9 @@ public:
/**
* Consumes the data sent to it from a source
* @param data The raw data block to process; an empty block ends data
* @param timeDelta Timestamp increment of data - typically samples
* @param tStamp Timestamp of data - typically samples
*/
virtual void Consume(const DataBlock& data, unsigned long timeDelta) = 0;
virtual void Consume(const DataBlock& data, unsigned long tStamp) = 0;
/**
* Get the data source of this object if it's connected
@ -371,9 +371,9 @@ public:
/**
* Forwards the data to its consumers
* @param data The raw data block to forward; an empty block ends data
* @param timeDelta Timestamp increment of data - typically samples
* @param tStamp Timestamp of data - typically samples
*/
void Forward(const DataBlock& data, unsigned long timeDelta = 0);
void Forward(const DataBlock& data, unsigned long tStamp = 0);
/**
* Attach a data consumer
@ -858,6 +858,12 @@ public:
*/
DataEndpoint* setEndpoint(const char* type = "audio");
/**
* Clear one or all data endpoints of this object
* @param type Type of data endpoint: "audio", "video", "text", NULL to clear all
*/
void clearEndpoint(const char* type = 0);
/**
* Set a data source of this object
* @param source A pointer to the new source or NULL

View File

@ -19,6 +19,7 @@
#define YATE_MINOR_S "@PACKAGE_VERSION_MINOR@"
#define YATE_BUILD_S "@PACKAGE_VERSION_BUILD@"
#define YATE_VERSION "@PACKAGE_VERSION@"
#define YATE_RELEASE "@PACKAGE_RELEASE@"
/* Windows version resource - file and string style */
#define YATE_WINVER_F @PACKAGE_VERSION_MAJOR@,@PACKAGE_VERSION_MINOR@,@PACKAGE_VERSION_BUILD@,0