Added direction changing feature in the YRTP channel and improved the H.323 one.

git-svn-id: http://yate.null.ro/svn/yate/trunk@409 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2005-06-17 18:09:07 +00:00
parent ef847da06c
commit 891db9cf7e
4 changed files with 136 additions and 93 deletions

View File

@ -88,8 +88,14 @@ void RTPReceiver::rtpData(const void* data, int len)
}
// check if the SSRC is unchanged
if (ss != m_ssrc)
if (ss != m_ssrc) {
if (m_warn) {
m_warn = false;
Debug(DebugWarn,"RTP Received SSRC %08X but expecting %08X [%p]",
ss,m_ssrc,this);
}
return;
}
// skip over header and any CSRC
pc += 12+(4*cc);
@ -301,16 +307,14 @@ void RTPSender::timerTick(const Time& when)
RTPSession::RTPSession()
: m_transport(0), m_direction(FullStop), m_send(0), m_recv(0)
{
XDebug(DebugInfo,"RTPSession::RTPSession() [%p]",this);
DDebug(DebugInfo,"RTPSession::RTPSession() [%p]",this);
}
RTPSession::~RTPSession()
{
XDebug(DebugInfo,"RTPSession::~RTPSession() [%p]",this);
DDebug(DebugInfo,"RTPSession::~RTPSession() [%p]",this);
direction(FullStop);
group(0);
sender(0);
receiver(0);
if (m_transport) {
RTPTransport* tmp = m_transport;
m_transport = 0;
@ -440,14 +444,24 @@ void RTPSession::receiver(RTPReceiver* recv)
bool RTPSession::direction(Direction dir)
{
XDebug(DebugInfo,"RTPSession::direction(%d) old=%d [%p]",dir,m_direction,this);
DDebug(DebugInfo,"RTPSession::direction(%d) old=%d [%p]",dir,m_direction,this);
if ((dir != FullStop) && !m_transport)
return false;
// make sure we have sender and/or receiver for our direction
if ((dir & RecvOnly) && !m_recv)
receiver(createReceiver());
if ((dir & SendOnly) && !m_send)
sender(createSender());
if (dir & RecvOnly) {
if (!m_recv)
receiver(createReceiver());
}
else
receiver(0);
if (dir & SendOnly) {
if (!m_send)
sender(createSender());
}
else
sender(0);
m_direction = dir;
return true;
}

View File

@ -317,6 +317,18 @@ public:
inline void reset()
{ m_ssrc = 0; }
/**
* Get the value of the current SSRC, zero if not initialized yet
*/
inline unsigned int ssrc() const
{ return m_ssrc; }
/**
* Force a new known SSRC for all further packets
*/
inline void ssrc(unsigned int src)
{ m_ssrc = src; }
protected:
/**
* Method called periodically to keep the data flowing
@ -350,7 +362,7 @@ public:
* Constructor
*/
inline RTPReceiver(RTPSession* session = 0)
: RTPBaseIO(session)
: RTPBaseIO(session), m_warn(true)
{ }
/**
@ -412,6 +424,7 @@ private:
bool decodeSilence(bool marker, unsigned int timestamp, const void* data, int len);
void finishEvent(unsigned int timestamp);
bool pushEvent(int event, int duration, int volume, unsigned int timestamp);
bool m_warn;
};
/**
@ -688,6 +701,24 @@ public:
*/
bool direction(Direction dir);
/**
* Add a direction of this session. A transport must exist for this
* method to succeed.
* @param dir New Direction to add for this session
* @return True if direction was set, false if a failure occured
*/
inline bool addDirection(Direction dir)
{ return direction((Direction)(m_direction | dir)); }
/**
* Delete a direction of this session. A transport must exist for this
* method to succeed.
* @param dir Direction to remove for this session
* @return True if direction was set, false if a failure occured
*/
inline bool delDirection(Direction dir)
{ return direction((Direction)(m_direction & ~dir)); }
/**
* Set the data payload type for both receiver and sender.
* @param type Payload type, -1 to disable

View File

@ -648,8 +648,8 @@ YateH323Connection::YateH323Connection(YateH323EndPoint& endpoint,
&endpoint,callReference,userdata,this);
Message* msg = static_cast<Message*>(userdata);
m_chan = new YateH323Chan(this,userdata,(transport ? (const char*)transport->GetRemoteAddress() : 0));
Engine::enqueue(m_chan->message("chan.startup"));
m_chan = new YateH323Chan(this,userdata,
((transport && !userdata) ? (const char*)transport->GetRemoteAddress() : 0));
if (!msg)
return;
@ -758,13 +758,12 @@ void YateH323Connection::rtpForward(Message& msg, bool init)
Debug(DebugAll,"YateH323Connection::rtpForward(%p,%d) [%p]",
&msg,init,this);
String tmp = msg.getValue("rtp_forward");
if (!(init || m_passtrough && tmp))
if (!((init || m_passtrough) && tmp))
return;
m_passtrough = tmp.toBoolean();
if (!m_passtrough)
return;
tmp = msg.getValue("rtp_port");
int port = tmp.toInteger();
int port = msg.getIntValue("rtp_port");
String addr(msg.getValue("rtp_addr"));
if (port && addr) {
m_rtpAddr = addr;
@ -806,18 +805,10 @@ void YateH323Connection::OnCleared()
{
int reason = GetCallEndReason();
const char* rtext = CallEndReasonText(reason);
const char* err = lookup(reason,dict_errors);
Debug(DebugInfo,"YateH323Connection::OnCleared() reason: %s (%d) [%p]",
rtext,reason,this);
if (!m_chan)
return;
m_chan->status("cleared");
Message *m = m_chan->message("chan.hangup");
if (err)
m->setParam("error",err);
m->setParam("reason",rtext);
Engine::enqueue(m);
m_chan->disconnect(rtext);
if (m_chan)
m_chan->disconnect(rtext);
}
BOOL YateH323Connection::OnAlerting(const H323SignalPDU &alertingPDU, const PString &user)
@ -913,8 +904,7 @@ H323Channel* YateH323Connection::CreateRealTimeLogicalChannel(const H323Capabili
m.addParam("direction",sdir);
if (Engine::dispatch(m)) {
m_rtpid = m.getValue("rtpid");
String p(m.getValue("localport"));
externalPort = p.toInteger();
externalPort = m.getIntValue("localport");
}
}
if (externalPort || s_passtrough) {
@ -1104,7 +1094,8 @@ YateH323_ExternalRTPChannel::YateH323_ExternalRTPChannel(
YateH323_ExternalRTPChannel::~YateH323_ExternalRTPChannel()
{
Debug(DebugInfo,"YateH323_ExternalRTPChannel::~YateH323_ExternalRTPChannel [%p]",this);
Debug(DebugInfo,"YateH323_ExternalRTPChannel::~YateH323_ExternalRTPChannel %s%s [%p]",
lookup(GetDirection(),dict_h323_dir),(isRunning ? " running" : ""),this);
if (isRunning) {
isRunning = FALSE;
if (m_conn)
@ -1431,20 +1422,37 @@ YateH323Chan::YateH323Chan(YateH323Connection* conn,bool outgoing,const char* ad
Debug(DebugAll,"YateH323Chan::YateH323Chan(%p,%s) %s [%p]",
conn,addr,direction(),this);
m_address = addr;
Engine::enqueue(message("chan.startup"));
}
YateH323Chan::~YateH323Chan()
{
Debug(DebugAll,"YateH323Chan::~YateH323Chan() %s %s [%p]",
m_status.c_str(),m_id.c_str(),this);
Message *m = message("chan.hangup");
drop();
YateH323Connection* tmp = m_conn;
m_conn = 0;
if (tmp) {
const char* err = 0;
const char* txt = "Normal cleanup";
int reason = tmp->GetCallEndReason();
if (reason != H323Connection::NumCallEndReasons) {
err = lookup(reason,dict_errors);
txt = CallEndReasonText(reason);
}
if (err)
m->setParam("error",err);
if (txt)
m->setParam("reason",txt);
Engine::enqueue(m);
PSyncPoint sync;
tmp->cleanups();
tmp->ClearCallSynchronous(&sync);
}
else
Engine::enqueue(m);
}
void YateH323Chan::disconnected(bool final, const char *reason)
@ -1521,7 +1529,9 @@ bool YateH323Chan::msgRinging(Message& msg)
Channel::msgRinging(msg);
if (!m_conn)
return false;
m_conn->rtpForward(msg);
if (msg.getParam("rtp_forward"))
m_conn->rtpForward(msg);
// FIXME: with or without media?
m_conn->AnsweringCall(H323Connection::AnswerCallAlertWithMedia);
return true;
}

View File

@ -81,7 +81,8 @@ public:
{ return m_port; }
inline void setMaster(const char* master)
{ if (master) m_master = master; }
static YRTPWrapper* find(const CallEndpoint* conn, RTPSession::Direction direction = RTPSession::SendRecv);
void addDirection(RTPSession::Direction direction);
static YRTPWrapper* find(const CallEndpoint* conn);
static YRTPWrapper* find(const String& id);
static void guessLocal(const char* remoteip, String& localip);
private:
@ -209,13 +210,13 @@ YRTPWrapper::~YRTPWrapper()
s_mutex.unlock();
}
YRTPWrapper* YRTPWrapper::find(const CallEndpoint* conn, RTPSession::Direction direction)
YRTPWrapper* YRTPWrapper::find(const CallEndpoint* conn)
{
Lock lock(s_mutex);
ObjList* l = &s_calls;
for (; l; l=l->next()) {
const YRTPWrapper *p = static_cast<const YRTPWrapper *>(l->get());
if (p && (p->conn() == conn) && (p->dir() == direction))
if (p && (p->conn() == conn))
return const_cast<YRTPWrapper *>(p);
}
return 0;
@ -334,7 +335,7 @@ bool YRTPWrapper::startRTP(const char* raddr, unsigned int rport, int payload, i
return false;
m_rtp->dataPayload(payload);
m_rtp->eventPayload(evpayload);
m_bufsize = s_cfg.getIntValue("rtp","buffer",160);
m_bufsize = s_cfg.getIntValue("rtp","buffer",240);
return true;
}
@ -375,6 +376,13 @@ void YRTPWrapper::guessLocal(const char* remoteip, String& localip)
Debug(DebugInfo,"Guessed local IP '%s' for remote '%s'",localip.c_str(),remoteip);
}
void YRTPWrapper::addDirection(RTPSession::Direction direction)
{
m_dir = (RTPSession::Direction)(m_dir | direction);
if (m_rtp && m_bufsize)
m_rtp->direction(m_dir);
}
bool YRTPSession::rtpRecvData(bool marker, unsigned int timestamp, const void* data, int len)
{
YRTPAudioSource* source = m_wrap ? m_wrap->m_source : 0;
@ -429,43 +437,6 @@ YRTPAudioSource::~YRTPAudioSource()
m_mutex.unlock();
}
#if 0
void YRTPAudioSource::run(void)
{
// wait until the rtp is fully initialized
while (!m_wrap->bufSize())
::usleep(20000);
int bufsize = m_wrap->bufSize();
int timestamp = 0;
int have_more;
m_buffer = (char*)::malloc(bufsize);
::memset(m_buffer, 0, bufsize);
Debug(DebugAll,"YRTPAudioSource::run() entering loop [%p]",this);
for (;;) {
Lock lock(m_mutex);
if (!(m_wrap && m_wrap->rtp()))
break;
int rd = ::rtp_session_recv_with_ts(m_wrap->rtp(),
m_buffer, bufsize, timestamp, &have_more);
if (rd > 0) {
XDebug(DebugAll,"YRTPAudioSource read %d bytes, ts=%d, more=%d [%p]",
rd,timestamp,have_more,this);
m_data.assign(m_buffer, rd, false);
lock.drop();
// FIXME: find number of samples from format - if known...
Forward(m_data);
if (!m_wrap)
break;
m_data.clear(false);
timestamp += bufsize;
}
lock.drop();
Thread::yield();
}
Debug(DebugAll,"YRTPAudioSource::run() exiting loop [%p]",this);
}
#endif
YRTPAudioConsumer::YRTPAudioConsumer(YRTPWrapper *wrap)
: m_wrap(wrap), m_timestamp(0)
{
@ -492,11 +463,24 @@ void YRTPAudioConsumer::Consume(const DataBlock &data, unsigned long timeDelta)
{
if (!(m_wrap && m_wrap->bufSize() && m_wrap->rtp()))
return;
XDebug(DebugAll,"YRTPAudioConsumer writing %d bytes, ts=%d [%p]",
data.length(),m_timestamp,this);
m_wrap->rtp()->rtpSendData(false,m_timestamp,data.data(),data.length());
// if timestamp increment is not provided we have to guess...
m_timestamp += timeDelta ? timeDelta : data.length();
XDebug(DebugAll,"YRTPAudioConsumer writing %d bytes, delta=%lu ts=%d [%p]",
data.length(),timeDelta,m_timestamp,this);
unsigned int buf = m_wrap->bufSize();
const char* ptr = (const char*)data.data();
unsigned int len = data.length();
// make it safe to break a long octet buffer
if (len == timeDelta)
timeDelta = 0;
while (len && m_wrap && m_wrap->rtp()) {
unsigned int sz = len;
if ((sz > buf) && !timeDelta)
sz = buf;
m_wrap->rtp()->rtpSendData(false,m_timestamp,ptr,sz);
// if timestamp increment is not provided we have to guess...
m_timestamp += timeDelta ? timeDelta : sz;
len -= sz;
ptr += sz;
}
}
bool AttachHandler::received(Message &msg)
@ -613,7 +597,7 @@ bool RtpHandler::received(Message &msg)
String rip(msg.getValue("remoteip"));
String rport(msg.getValue("remoteport"));
YRTPWrapper *w = YRTPWrapper::find(ch,direction);
YRTPWrapper *w = YRTPWrapper::find(ch);
if (w)
Debug(DebugAll,"YRTPWrapper %p found by CallEndpoint",w);
if (!w) {
@ -633,22 +617,26 @@ bool RtpHandler::received(Message &msg)
w = new YRTPWrapper(lip,ch,direction);
w->setMaster(msg.getValue("id"));
if (d_recv) {
YRTPAudioSource* s = new YRTPAudioSource(w);
ch->setSource(s);
s->deref();
}
if (d_send) {
YRTPAudioConsumer* c = new YRTPAudioConsumer(w);
ch->setConsumer(c);
c->deref();
}
if (w->deref())
return false;
}
else {
w->ref();
w->addDirection(direction);
}
if (d_recv && !ch->getSource()) {
YRTPAudioSource* s = new YRTPAudioSource(w);
ch->setSource(s);
s->deref();
}
if (d_send && !ch->getConsumer()) {
YRTPAudioConsumer* c = new YRTPAudioConsumer(w);
ch->setConsumer(c);
c->deref();
}
if (w->deref())
return false;
if (rip && rport) {
String p(msg.getValue("payload"));