Made sure only one Consumer and Source is created for each RTP wrapper.

git-svn-id: http://yate.null.ro/svn/yate/trunk@1613 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2008-01-09 16:58:42 +00:00
parent 8f46411769
commit 0e5fde9550
1 changed files with 38 additions and 7 deletions

View File

@ -125,6 +125,8 @@ public:
{ if (master) m_master = master; }
inline bool isAudio() const
{ return m_audio; }
YRTPSource* getSource();
YRTPConsumer* getConsumer();
void addDirection(RTPSession::Direction direction);
static YRTPWrapper* find(const CallEndpoint* conn, const String& media);
static YRTPWrapper* find(const String& id);
@ -232,6 +234,7 @@ static ObjList s_calls;
static Mutex s_mutex;
static Mutex s_srcMutex;
YRTPWrapper::YRTPWrapper(const char* localip, CallEndpoint* conn, const char* media, RTPSession::Direction direction, bool rtcp)
: m_rtp(0), m_dir(direction), m_conn(conn),
m_source(0), m_consumer(0), m_media(media),
@ -478,6 +481,20 @@ void YRTPWrapper::guessLocal(const char* remoteip, String& localip)
Debug(&splugin,DebugInfo,"Guessed local IP '%s' for remote '%s'",localip.c_str(),remoteip);
}
YRTPSource* YRTPWrapper::getSource()
{
if (m_source && m_source->ref())
return m_source;
return new YRTPSource(this);
}
YRTPConsumer* YRTPWrapper::getConsumer()
{
if (m_consumer && m_consumer->ref())
return m_consumer;
return new YRTPConsumer(this);
}
void YRTPWrapper::addDirection(RTPSession::Direction direction)
{
m_dir = (RTPSession::Direction)(m_dir | direction);
@ -485,6 +502,7 @@ void YRTPWrapper::addDirection(RTPSession::Direction direction)
m_rtp->direction(m_dir);
}
YRTPSession::~YRTPSession()
{
// disconnect thread and transport before our virtual methods become invalid
@ -544,6 +562,7 @@ void YRTPSession::rtpNewSSRC(u_int32_t newSsrc)
}
}
YRTPSource::YRTPSource(YRTPWrapper* wrap)
: m_wrap(wrap), m_busy(false)
{
@ -561,9 +580,12 @@ YRTPSource::~YRTPSource()
if (m_wrap) {
s_srcMutex.lock();
YRTPWrapper* tmp = m_wrap;
const YRTPSource* s = tmp->m_source;
m_wrap = 0;
tmp->m_source = 0;
s_srcMutex.unlock();
if (s != this)
Debug(&splugin,DebugGoOn,"Wrapper %p held source %p not [%p]",tmp,s,this);
// we have just to wait for any YRTPSession::rtpRecvData() to finish
while (m_busy)
Thread::yield();
@ -571,6 +593,7 @@ YRTPSource::~YRTPSource()
}
}
YRTPConsumer::YRTPConsumer(YRTPWrapper *wrap)
: m_wrap(wrap), m_splitable(false)
{
@ -587,9 +610,12 @@ YRTPConsumer::~YRTPConsumer()
Debug(&splugin,DebugAll,"YRTPConsumer::~YRTPConsumer() [%p] wrapper=%p ts=%lu",this,m_wrap,m_timestamp);
if (m_wrap) {
YRTPWrapper* tmp = m_wrap;
const YRTPConsumer* c = tmp->m_consumer;
m_wrap = 0;
tmp->m_consumer = 0;
tmp->deref();
if (c != this)
Debug(&splugin,DebugGoOn,"Wrapper %p held consumer %p not [%p]",tmp,c,this);
}
}
@ -620,6 +646,7 @@ void YRTPConsumer::Consume(const DataBlock &data, unsigned long tStamp)
}
}
bool AttachHandler::received(Message &msg)
{
int more = 2;
@ -668,13 +695,13 @@ bool AttachHandler::received(Message &msg)
w->setMaster(msg.getValue("id"));
if (!src.null()) {
YRTPSource* s = new YRTPSource(w);
YRTPSource* s = w->getSource();
ch->setSource(s,media);
s->deref();
}
if (!cons.null()) {
YRTPConsumer* c = new YRTPConsumer(w);
YRTPConsumer* c = w->getConsumer();
ch->setConsumer(c,media);
c->deref();
}
@ -693,6 +720,7 @@ bool AttachHandler::received(Message &msg)
return !more;
}
bool RtpHandler::received(Message &msg)
{
String trans = msg.getValue("transport");
@ -722,12 +750,13 @@ bool RtpHandler::received(Message &msg)
if (ch) {
w = YRTPWrapper::find(ch,media);
if (w)
Debug(&splugin,DebugAll,"Wrapper %p found by CallEndpoint",w);
Debug(&splugin,DebugAll,"Wrapper %p found by CallEndpoint %p",w,ch);
}
if (!w) {
w = YRTPWrapper::find(msg.getValue("rtpid"));
const char* rid = msg.getValue("rtpid");
w = YRTPWrapper::find(rid);
if (w)
Debug(&splugin,DebugAll,"Wrapper %p found by ID",w);
Debug(&splugin,DebugAll,"Wrapper %p found by ID '%s'",w,rid);
}
if (!(ch || w)) {
Debug(&splugin,DebugWarn,"Neither call channel nor RTP wrapper found!");
@ -758,13 +787,13 @@ bool RtpHandler::received(Message &msg)
}
if (d_recv && ch && !ch->getSource(media)) {
YRTPSource* s = new YRTPSource(w);
YRTPSource* s = w->getSource();
ch->setSource(s,media);
s->deref();
}
if (d_send && ch && !ch->getConsumer(media)) {
YRTPConsumer* c = new YRTPConsumer(w);
YRTPConsumer* c = w->getConsumer();
ch->setConsumer(c,media);
c->deref();
}
@ -783,6 +812,7 @@ bool RtpHandler::received(Message &msg)
return true;
}
bool DTMFHandler::received(Message &msg)
{
String targetid(msg.getValue("targetid"));
@ -802,6 +832,7 @@ bool DTMFHandler::received(Message &msg)
return false;
}
YRTPPlugin::YRTPPlugin()
: Module("yrtp","misc"), m_first(true)
{