Interlocking related fixes.

git-svn-id: http://voip.null.ro/svn/yate@640 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2006-01-12 19:06:25 +00:00
parent 3490d2d891
commit 4fd9fe2dc9
4 changed files with 179 additions and 97 deletions

View File

@ -907,6 +907,7 @@ void Driver::dropAll(Message &msg)
DDebug(this,DebugAll,"Dropping %s channel %p [%p]",
name().c_str(),static_cast<Channel*>(c),this);
c->msgDrop(msg,reason);
c = 0;
lock();
}
}

View File

@ -193,14 +193,15 @@ ThreadPrivate::~ThreadPrivate()
Debugger debug("ThreadPrivate::~ThreadPrivate()"," %p '%s' [%p]",m_thread,m_name,this);
#endif
m_running = false;
tmutex.lock();
Lock lock(tmutex);
threads.remove(this,false);
if (m_thread && m_updest) {
Thread *t = m_thread;
m_thread = 0;
// let other threads access the list while we delete our upper layer
lock.drop();
delete t;
}
tmutex.unlock();
}
void ThreadPrivate::destroy()

View File

@ -270,7 +270,9 @@ bool ConnHandler::received(Message &msg, int id)
String callid(msg.getValue("targetid"));
if (!callid.startsWith("callgen/",false))
return false;
GenConnection *conn = GenConnection::find(callid);
s_mutex.lock();
RefPointer<GenConnection> conn = GenConnection::find(callid);
s_mutex.unlock();
if (!conn) {
Debug(DebugInfo,"Target '%s' was not found in list",callid.c_str());
return false;
@ -315,17 +317,18 @@ void CleanThread::run()
Debug("CallGen",DebugInfo,"CleanThread::run() [%p]",this);
while (!Engine::exiting()) {
Thread::usleep(100000);
Lock lock(s_mutex);
s_mutex.lock();
Time t;
ObjList* l = &s_calls;
while (l) {
GenConnection* c = static_cast<GenConnection*>(l->get());
if (c && c->oldAge(t)) {
ListIterator iter(s_calls);
for (;;) {
RefPointer<GenConnection> c = static_cast<GenConnection*>(iter.get());
s_mutex.unlock();
if (!c)
break;
if (c->oldAge(t))
c->destruct();
if (c != l->get())
continue;
}
l = l->next();
c = 0;
s_mutex.lock();
}
}
}

View File

@ -85,6 +85,7 @@
using namespace TelEngine;
static bool s_externalRtp;
static bool s_fallbackRtp;
static bool s_passtrough;
static Configuration s_cfg;
@ -367,7 +368,7 @@ protected:
YateGkRegThread* m_thread;
};
class YateH323Connection : public H323Connection
class YateH323Connection : public H323Connection, public DebugEnabler
{
PCLASSINFO(YateH323Connection, H323Connection)
friend class YateH323Chan;
@ -405,8 +406,12 @@ public:
static BOOL decodeCapability(const H323Capability& capability, const char** dataFormat, int* payload = 0, String* capabName = 0);
inline bool hasRemoteAddress() const
{ return m_passtrough && (m_remotePort > 0); }
inline bool nativeRtp() const
{ return m_nativeRtp; }
private:
String m_chanId;
YateH323Chan* m_chan;
Mutex* m_mutex;
bool m_externalRtp;
bool m_nativeRtp;
bool m_passtrough;
@ -451,9 +456,9 @@ class YateH323Chan : public Channel
public:
YateH323Chan(YateH323Connection* conn,Message* msg,const char* addr);
~YateH323Chan();
BOOL openAudioChannel(BOOL isEncoding, H323AudioCodec &codec);
PChannel* openAudioChannel(BOOL isEncoding);
bool stopDataLinks();
void hangup(bool dropChan = true);
void hangup(bool dropChan = true, bool clearCall = true);
void finish();
virtual void zeroRefs();
@ -466,6 +471,7 @@ public:
virtual bool callRouted(Message& msg);
virtual void callAccept(Message& msg);
virtual void callRejected(const char* error, const char* reason, const Message* msg);
virtual bool setDebug(Message& msg);
inline void setTarget(const char* targetid)
{ m_targetid = targetid; }
private:
@ -891,7 +897,7 @@ bool YateH323EndPoint::internalGkClient(int mode, const PString& name)
YateH323Connection::YateH323Connection(YateH323EndPoint& endpoint,
H323Transport* transport, unsigned callReference, void* userdata)
: H323Connection(endpoint,callReference), m_chan(0),
: H323Connection(endpoint,callReference), m_chan(0), m_mutex(0),
m_externalRtp(s_externalRtp), m_nativeRtp(false), m_passtrough(false),
m_rtpPort(0), m_remotePort(0), m_needMedia(true)
{
@ -903,6 +909,10 @@ YateH323Connection::YateH323Connection(YateH323EndPoint& endpoint,
Message* msg = static_cast<Message*>(userdata);
m_chan = new YateH323Chan(this,msg,
((transport && !userdata) ? (const char*)transport->GetRemoteAddress() : 0));
m_chanId = m_chan->id();
m_mutex = m_chan->mutex();
debugCopy(m_chan);
debugName(m_chanId);
if (!msg) {
m_passtrough = s_passtrough;
return;
@ -921,19 +931,22 @@ YateH323Connection::YateH323Connection(YateH323EndPoint& endpoint,
}
}
// Called by the cleaner thread after CleanUpOnCallEnd() and OnCleared()
YateH323Connection::~YateH323Connection()
{
Debug(&hplugin,DebugAll,"YateH323Connection::~YateH323Connection() [%p]",this);
Debug(this,DebugAll,"YateH323Connection::~YateH323Connection() [%p]",this);
YateH323Chan* tmp = m_chan;
m_chan = 0;
if (tmp)
tmp->finish();
cleanups();
debugName(0);
}
// Called by the cleaner thread before OnCleared() and the destructor
void YateH323Connection::CleanUpOnCallEnd()
{
Debug(&hplugin,DebugAll,"YateH323Connection::CleanUpOnCallEnd() [%p]",this);
Debug(this,DebugAll,"YateH323Connection::CleanUpOnCallEnd() [%p]",this);
if (m_chan)
m_chan->stopDataLinks();
H323Connection::CleanUpOnCallEnd();
@ -943,26 +956,33 @@ void YateH323Connection::cleanups(bool closeChans, bool dropChan)
{
if (dropChan)
m_chan = 0;
if (closeChans) {
if (closeChans && Lock()) {
CloseAllLogicalChannels(true);
CloseAllLogicalChannels(false);
Unlock();
}
}
H323Connection::AnswerCallResponse YateH323Connection::OnAnswerCall(const PString &caller,
const H323SignalPDU &setupPDU, H323SignalPDU &connectPDU)
{
Debug(m_chan,DebugInfo,"YateH323Connection::OnAnswerCall caller='%s' chan=%p [%p]",
Debug(this,DebugInfo,"YateH323Connection::OnAnswerCall caller='%s' chan=%p [%p]",
(const char *)caller,m_chan,this);
if (!m_chan)
TelEngine::Lock lock(m_mutex);
if (!(m_chan && m_chan->alive()))
return H323Connection::AnswerCallDenied;
if (!hplugin.canRoute()) {
Debug(DebugWarn,"Not answering H.323 call, full or exiting");
Debug(this,DebugWarn,"Not answering H.323 call, full or exiting");
YateH323Chan* tmp = m_chan;
m_chan = 0;
tmp->hangup(false,false);
tmp->deref();
return H323Connection::AnswerCallDenied;
}
const YateH323EndPoint& ep = static_cast<const YateH323EndPoint&>(GetEndPoint());
Message *m = m_chan->message("call.preroute",false,true);
lock.drop();
const YateH323EndPoint& ep = static_cast<const YateH323EndPoint&>(GetEndPoint());
if (ep.c_str())
m->setParam("in_line",ep.c_str());
const char *s = s_cfg.getValue("incoming","context");
@ -971,28 +991,28 @@ H323Connection::AnswerCallResponse YateH323Connection::OnAnswerCall(const PStrin
m->setParam("callername",caller);
s = GetRemotePartyNumber();
Debug(m_chan,DebugInfo,"GetRemotePartyNumber()='%s'",s);
Debug(this,DebugInfo,"GetRemotePartyNumber()='%s'",s);
m->setParam("caller",s ? s : (const char *)("h323/"+caller));
const Q931& q931 = setupPDU.GetQ931();
const H225_Setup_UUIE& setup = setupPDU.m_h323_uu_pdu.m_h323_message_body;
const H225_ArrayOf_AliasAddress& adr = setup.m_destinationAddress;
for (int i = 0; i<adr.GetSize(); i++)
Debug(m_chan,DebugAll,"adr[%d]='%s'",i,(const char *)H323GetAliasAddressString(adr[i]));
Debug(this,DebugAll,"adr[%d]='%s'",i,(const char *)H323GetAliasAddressString(adr[i]));
String called;
if (adr.GetSize() > 0)
called = (const char *)H323GetAliasAddressString(adr[0]);
if (called)
Debug(m_chan,DebugInfo,"Called number (alias) is '%s'",called.c_str());
Debug(this,DebugInfo,"Called number (alias) is '%s'",called.c_str());
else {
PString cal;
if (q931.GetCalledPartyNumber(cal)) {
called=(const char *)cal;
Debug(m_chan,DebugInfo,"Called-Party-Number (IE) is '%s'",called.c_str());
Debug(this,DebugInfo,"Called-Party-Number (IE) is '%s'",called.c_str());
}
}
if (called.null()) {
Debug(m_chan,DebugMild,"No called number present!");
Debug(this,DebugMild,"No called number present!");
called = s_cfg.getValue("incoming","called");
}
if (called)
@ -1000,7 +1020,7 @@ H323Connection::AnswerCallResponse YateH323Connection::OnAnswerCall(const PStrin
#if 0
s = GetRemotePartyAddress();
Debug(m_chan,DebugInfo,"GetRemotePartyAddress()='%s'",s);
Debug(this,DebugInfo,"GetRemotePartyAddress()='%s'",s);
if (s)
m->setParam("calledname",s);
#endif
@ -1020,7 +1040,7 @@ H323Connection::AnswerCallResponse YateH323Connection::OnAnswerCall(const PStrin
void YateH323Connection::rtpExecuted(Message& msg)
{
Debug(m_chan,DebugAll,"YateH323Connection::rtpExecuted(%p) [%p]",
Debug(this,DebugAll,"YateH323Connection::rtpExecuted(%p) [%p]",
&msg,this);
m_needMedia = msg.getBoolValue("needmedia",m_needMedia);
if (!m_passtrough)
@ -1028,12 +1048,12 @@ void YateH323Connection::rtpExecuted(Message& msg)
String tmp = msg.getValue("rtp_forward");
m_passtrough = (tmp == "accepted");
if (m_passtrough)
Debug(m_chan,DebugInfo,"H323 Peer accepted RTP forward");
Debug(this,DebugInfo,"H323 Peer accepted RTP forward");
}
void YateH323Connection::rtpForward(Message& msg, bool init)
{
Debug(m_chan,DebugAll,"YateH323Connection::rtpForward(%p,%d) [%p]",
Debug(this,DebugAll,"YateH323Connection::rtpForward(%p,%d) [%p]",
&msg,init,this);
String tmp = msg.getValue("rtp_forward");
if (!((init || m_passtrough) && tmp))
@ -1048,12 +1068,12 @@ void YateH323Connection::rtpForward(Message& msg, bool init)
m_rtpPort = port;
m_formats = msg.getValue("formats");
msg.setParam("rtp_forward","accepted");
Debug(m_chan,DebugInfo,"Accepted RTP forward %s:%d formats '%s'",
Debug(this,DebugInfo,"Accepted RTP forward %s:%d formats '%s'",
addr.c_str(),port,m_formats.safe());
}
else {
m_passtrough = false;
Debug(m_chan,DebugInfo,"Disabling RTP forward [%p]",this);
Debug(this,DebugInfo,"Disabling RTP forward [%p]",this);
}
}
@ -1062,8 +1082,11 @@ void YateH323Connection::answerCall(AnswerCallResponse response)
bool media = false;
if (hasRemoteAddress() && m_rtpPort)
media = true;
else if (m_chan && m_chan->getConsumer() && m_chan->getConsumer()->getConnSource())
media = true;
else {
TelEngine::Lock lock(m_mutex);
if (m_chan && m_chan->alive() && m_chan->getConsumer() && m_chan->getConsumer()->getConnSource())
media = true;
}
// modify responses to indicate we have early media (remote ringing)
if (media) {
switch (response) {
@ -1082,7 +1105,8 @@ void YateH323Connection::answerCall(AnswerCallResponse response)
void YateH323Connection::OnEstablished()
{
Debug(m_chan,DebugInfo,"YateH323Connection::OnEstablished() [%p]",this);
TelEngine::Lock lock(m_mutex);
Debug(this,DebugInfo,"YateH323Connection::OnEstablished() [%p]",this);
if (!m_chan)
return;
if (HadAnsweredCall()) {
@ -1092,6 +1116,7 @@ void YateH323Connection::OnEstablished()
m_chan->status("answered");
m_chan->maxcall(0);
Message *m = m_chan->message("call.answered",false,true);
lock.drop();
if (m_passtrough) {
if (m_remotePort) {
m->addParam("rtp_forward","yes");
@ -1100,7 +1125,7 @@ void YateH323Connection::OnEstablished()
m->addParam("formats",m_remoteFormats);
}
else {
Debug(m_chan,DebugWarn,"H323 RTP passtrough with no remote address! [%p]",this);
Debug(this,DebugWarn,"H323 RTP passtrough with no remote address! [%p]",this);
if (m_needMedia)
ClearCall(EndedByCapabilityExchange);
}
@ -1108,24 +1133,31 @@ void YateH323Connection::OnEstablished()
Engine::enqueue(m);
}
// Called by the cleaner thread between CleanUpOnCallEnd() and the destructor
void YateH323Connection::OnCleared()
{
int reason = GetCallEndReason();
const char* rtext = CallEndReasonText(reason);
const char* error = lookup(reason,dict_errors);
Debug(m_chan,DebugInfo,"YateH323Connection::OnCleared() error: '%s' reason: %s (%d) [%p]",
Debug(this,DebugInfo,"YateH323Connection::OnCleared() error: '%s' reason: %s (%d) [%p]",
error,rtext,reason,this);
if (m_chan)
TelEngine::Lock lock(m_mutex);
if (m_chan && m_chan->ref()) {
lock.drop();
m_chan->disconnect(error ? error : rtext);
m_chan->deref();
}
}
BOOL YateH323Connection::OnAlerting(const H323SignalPDU &alertingPDU, const PString &user)
{
Debug(m_chan,DebugInfo,"YateH323Connection::OnAlerting '%s' [%p]",(const char *)user,this);
Debug(this,DebugInfo,"YateH323Connection::OnAlerting '%s' [%p]",(const char *)user,this);
TelEngine::Lock lock(m_mutex);
if (!m_chan)
return FALSE;
m_chan->status("ringing");
Message *m = m_chan->message("call.ringing",false,true);
lock.drop();
if (hasRemoteAddress()) {
m->addParam("rtp_forward","yes");
m->addParam("rtp_addr",m_remoteAddr);
@ -1138,13 +1170,15 @@ BOOL YateH323Connection::OnAlerting(const H323SignalPDU &alertingPDU, const PStr
BOOL YateH323Connection::OnReceivedProgress(const H323SignalPDU& pdu)
{
Debug(m_chan,DebugInfo,"YateH323Connection::OnReceivedProgress [%p]",this);
Debug(this,DebugInfo,"YateH323Connection::OnReceivedProgress [%p]",this);
if (!H323Connection::OnReceivedProgress(pdu))
return FALSE;
TelEngine::Lock lock(m_mutex);
if (!m_chan)
return FALSE;
m_chan->status("progressing");
Message *m = m_chan->message("call.progress",false,true);
lock.drop();
if (hasRemoteAddress()) {
m->addParam("rtp_forward","yes");
m->addParam("rtp_addr",m_remoteAddr);
@ -1157,13 +1191,15 @@ BOOL YateH323Connection::OnReceivedProgress(const H323SignalPDU& pdu)
void YateH323Connection::OnUserInputTone(char tone, unsigned duration, unsigned logicalChannel, unsigned rtpTimestamp)
{
Debug(m_chan,DebugInfo,"YateH323Connection::OnUserInputTone '%c' duration=%u [%p]",tone,duration,this);
Debug(this,DebugInfo,"YateH323Connection::OnUserInputTone '%c' duration=%u [%p]",tone,duration,this);
TelEngine::Lock lock(m_mutex);
if (!m_chan)
return;
Message *m = m_chan->message("chan.dtmf",false,true);
lock.drop();
char buf[2];
buf[0] = tone;
buf[1] = 0;
Message *m = m_chan->message("chan.dtmf",false,true);
m->addParam("text",buf);
m->addParam("duration",String(duration));
Engine::enqueue(m);
@ -1171,12 +1207,14 @@ void YateH323Connection::OnUserInputTone(char tone, unsigned duration, unsigned
void YateH323Connection::OnUserInputString(const PString &value)
{
Debug(m_chan,DebugInfo,"YateH323Connection::OnUserInputString '%s' [%p]",(const char *)value,this);
Debug(this,DebugInfo,"YateH323Connection::OnUserInputString '%s' [%p]",(const char *)value,this);
TelEngine::Lock lock(m_mutex);
if (!m_chan)
return;
String text((const char *)value);
const char *type = text.startSkip("MSG") ? "chan.text" : "chan.dtmf";
Message *m = m_chan->message(type,false,true);
lock.drop();
m->addParam("text",text);
Engine::enqueue(m);
}
@ -1184,14 +1222,19 @@ void YateH323Connection::OnUserInputString(const PString &value)
BOOL YateH323Connection::OpenAudioChannel(BOOL isEncoding, unsigned bufferSize,
H323AudioCodec &codec)
{
Debug(m_chan,DebugInfo,"YateH323Connection::OpenAudioChannel [%p]",this);
Debug(this,DebugInfo,"YateH323Connection::OpenAudioChannel [%p]",this);
if (!m_nativeRtp) {
Debug(DebugGoOn,"YateH323Connection::OpenAudioChannel for non-native RTP in [%p]",this);
if (m_needMedia)
ClearCall(EndedByCapabilityExchange);
return FALSE;
}
return m_chan && m_chan->openAudioChannel(isEncoding,codec);
PChannel* achan = 0;
TelEngine::Lock lock(m_mutex);
if (m_chan && m_chan->alive())
achan = m_chan->openAudioChannel(isEncoding);
lock.drop();
return achan && codec.AttachChannel(achan);
}
#ifdef NEED_RTP_QOS_PARAM
@ -1200,18 +1243,18 @@ H323Channel* YateH323Connection::CreateRealTimeLogicalChannel(const H323Capabili
H323Channel* YateH323Connection::CreateRealTimeLogicalChannel(const H323Capability& capability,H323Channel::Directions dir,unsigned sessionID,const H245_H2250LogicalChannelParameters* param)
#endif
{
Debug(m_chan,DebugAll,"H323Connection::CreateRealTimeLogicalChannel%s%s [%p]",
Debug(this,DebugAll,"H323Connection::CreateRealTimeLogicalChannel%s%s [%p]",
m_externalRtp ? " external" : "",m_passtrough ? " passtrough" : "",this);
if (m_externalRtp || m_passtrough) {
const char* sdir = lookup(dir,dict_h323_dir);
const char *format = 0;
decodeCapability(capability,&format);
Debug(m_chan,DebugAll,"Capability '%s' format '%s' session %u %s",
Debug(this,DebugAll,"Capability '%s' format '%s' session %u %s",
(const char *)capability.GetFormatName(),format,sessionID,sdir);
// disallow codecs not supported by remote receiver
if (m_passtrough && !(m_formats.null() || (m_formats.find(format) >= 0))) {
Debug(m_chan,DebugMild,"Refusing to create '%s' not in remote '%s'",format,m_formats.c_str());
Debug(this,DebugMild,"Refusing to create '%s' not in remote '%s'",format,m_formats.c_str());
return 0;
}
@ -1224,17 +1267,25 @@ H323Channel* YateH323Connection::CreateRealTimeLogicalChannel(const H323Capabili
}
PIPSocket::Address externalIpAddress;
GetControlChannel().GetLocalAddress().GetIpAddress(externalIpAddress);
Debug(m_chan,DebugInfo,"address '%s'",(const char *)externalIpAddress.AsString());
Debug(this,DebugInfo,"address '%s'",(const char *)externalIpAddress.AsString());
WORD externalPort = 0;
if (!m_passtrough) {
Message m("chan.rtp");
m.addParam("localip",externalIpAddress.AsString());
m.userData(m_chan);
if (sdir)
m.addParam("direction",sdir);
if (Engine::dispatch(m)) {
m_rtpid = m.getValue("rtpid");
externalPort = m.getIntValue("localport");
TelEngine::Lock lock(m_mutex);
if (m_chan && m_chan->alive()) {
Message m("chan.rtp");
m.userData(m_chan);
lock.drop();
m.addParam("localip",externalIpAddress.AsString());
if (sdir)
m.addParam("direction",sdir);
if (Engine::dispatch(m)) {
m_rtpid = m.getValue("rtpid");
externalPort = m.getIntValue("localport");
}
}
else {
Debug(this,DebugInfo,"Not creating logical channel for a dead channel [%p]",this);
return 0;
}
}
if (externalPort || m_passtrough) {
@ -1245,7 +1296,12 @@ H323Channel* YateH323Connection::CreateRealTimeLogicalChannel(const H323Capabili
}
return new YateH323_ExternalRTPChannel(*this, capability, dir, sessionID, externalIpAddress, externalPort);
}
Debug(m_chan,DebugWarn,"YateH323Connection falling back to native RTP [%p]",this);
if (s_fallbackRtp)
Debug(this,DebugWarn,"YateH323Connection falling back to native RTP [%p]",this);
else {
Debug(this,DebugWarn,"YateH323Connection RTP failed but not falling back! [%p]",this);
return 0;
}
}
m_nativeRtp = true;
@ -1258,7 +1314,7 @@ H323Channel* YateH323Connection::CreateRealTimeLogicalChannel(const H323Capabili
void YateH323Connection::OnSetLocalCapabilities()
{
Debug(m_chan,DebugAll,"YateH323Connection::OnSetLocalCapabilities()%s%s [%p]",
Debug(this,DebugAll,"YateH323Connection::OnSetLocalCapabilities()%s%s [%p]",
m_externalRtp ? " external" : "",m_passtrough ? " passtrough" : "",this);
H323Connection::OnSetLocalCapabilities();
if (m_formats.null())
@ -1271,7 +1327,7 @@ void YateH323Connection::OnSetLocalCapabilities()
decodeCapability(localCapabilities[i],&format,0,&fname);
if (format) {
if (m_formats.find(format) < 0) {
Debug(m_chan,DebugAll,"Removing capability '%s' (%s) not in remote '%s'",
Debug(this,DebugAll,"Removing capability '%s' (%s) not in remote '%s'",
fname.c_str(),format,m_formats.c_str());
localCapabilities.Remove(fname.c_str());
i--;
@ -1289,13 +1345,13 @@ void YateH323Connection::OnSetLocalCapabilities()
BOOL YateH323Connection::OnStartLogicalChannel(H323Channel & channel)
{
Debug(m_chan,DebugInfo,"YateH323Connection::OnStartLogicalChannel(%p) [%p]",&channel,this);
Debug(this,DebugInfo,"YateH323Connection::OnStartLogicalChannel(%p) [%p]",&channel,this);
return m_nativeRtp ? H323Connection::OnStartLogicalChannel(channel) : TRUE;
}
BOOL YateH323Connection::OnCreateLogicalChannel(const H323Capability & capability, H323Channel::Directions dir, unsigned & errorCode )
{
Debug(m_chan,DebugInfo,"YateH323Connection::OnCreateLogicalChannel('%s',%s) [%p]",(const char *)capability.GetFormatName(),lookup(dir,dict_h323_dir),this);
Debug(this,DebugInfo,"YateH323Connection::OnCreateLogicalChannel('%s',%s) [%p]",(const char *)capability.GetFormatName(),lookup(dir,dict_h323_dir),this);
return H323Connection::OnCreateLogicalChannel(capability,dir,errorCode);
}
@ -1333,7 +1389,7 @@ BOOL YateH323Connection::decodeCapability(const H323Capability& capability, cons
void YateH323Connection::setRemoteAddress(const char* remoteIP, WORD remotePort)
{
if (!m_remotePort) {
Debug(m_chan,DebugInfo,"Copying remote RTP address [%p]",this);
Debug(this,DebugInfo,"Copying remote RTP address [%p]",this);
m_remotePort = remotePort;
m_remoteAddr = remoteIP;
}
@ -1342,12 +1398,12 @@ void YateH323Connection::setRemoteAddress(const char* remoteIP, WORD remotePort)
BOOL YateH323Connection::startExternalRTP(const char* remoteIP, WORD remotePort, H323Channel::Directions dir, YateH323_ExternalRTPChannel* chan)
{
const char* sdir = lookup(dir,dict_h323_dir);
Debug(m_chan,DebugAll,"YateH323Connection::startExternalRTP(\"%s\",%u,%s,%p) [%p]",
Debug(this,DebugAll,"YateH323Connection::startExternalRTP(\"%s\",%u,%s,%p) [%p]",
remoteIP,remotePort,sdir,chan,this);
if (m_passtrough && m_rtpPort) {
setRemoteAddress(remoteIP,remotePort);
Debug(m_chan,DebugInfo,"Passing RTP to %s:%d",m_rtpAddr.c_str(),m_rtpPort);
Debug(this,DebugInfo,"Passing RTP to %s:%d",m_rtpAddr.c_str(),m_rtpPort);
const PIPSocket::Address ip(m_rtpAddr.safe());
WORD dataPort = m_rtpPort;
chan->SetExternalAddress(H323TransportAddress(ip, dataPort), H323TransportAddress(ip, dataPort+1));
@ -1357,7 +1413,6 @@ BOOL YateH323Connection::startExternalRTP(const char* remoteIP, WORD remotePort,
if (!m_externalRtp)
return FALSE;
Message m("chan.rtp");
m.userData(m_chan);
if (m_rtpid)
m.setParam("rtpid",m_rtpid);
if (sdir)
@ -1372,6 +1427,11 @@ BOOL YateH323Connection::startExternalRTP(const char* remoteIP, WORD remotePort,
if ((payload >= 0) && (payload < 127))
m.addParam("payload",String(payload));
TelEngine::Lock lock(m_mutex);
if (!(m_chan && m_chan->alive() && m_chan->driver()))
return FALSE;
m.userData(m_chan);
lock.drop();
if (Engine::dispatch(m)) {
m_rtpid = m.getValue("rtpid");
return TRUE;
@ -1381,8 +1441,9 @@ BOOL YateH323Connection::startExternalRTP(const char* remoteIP, WORD remotePort,
void YateH323Connection::stoppedExternal(H323Channel::Directions dir)
{
Debug(m_chan,DebugInfo,"YateH323Connection::stoppedExternal(%s) chan=%p [%p]",
Debug(this,DebugInfo,"YateH323Connection::stoppedExternal(%s) chan=%p [%p]",
lookup(dir,dict_h323_dir),m_chan,this);
TelEngine::Lock lock(m_mutex);
if (!m_chan)
return;
switch (dir) {
@ -1421,14 +1482,14 @@ YateH323_ExternalRTPChannel::YateH323_ExternalRTPChannel(
: H323_ExternalRTPChannel(connection, capability, direction, sessionID, ip, dataPort),
m_conn(&connection)
{
Debug(&hplugin,DebugAll,"YateH323_ExternalRTPChannel::YateH323_ExternalRTPChannel %s addr=%s:%u [%p]",
Debug(m_conn,DebugAll,"YateH323_ExternalRTPChannel::YateH323_ExternalRTPChannel %s addr=%s:%u [%p]",
lookup(GetDirection(),dict_h323_dir), (const char *)ip.AsString(), dataPort,this);
SetExternalAddress(H323TransportAddress(ip, dataPort), H323TransportAddress(ip, dataPort+1));
}
YateH323_ExternalRTPChannel::~YateH323_ExternalRTPChannel()
{
Debug(&hplugin,DebugInfo,"YateH323_ExternalRTPChannel::~YateH323_ExternalRTPChannel %s%s [%p]",
Debug(m_conn,DebugInfo,"YateH323_ExternalRTPChannel::~YateH323_ExternalRTPChannel %s%s [%p]",
lookup(GetDirection(),dict_h323_dir),(isRunning ? " running" : ""),this);
if (isRunning) {
isRunning = FALSE;
@ -1439,7 +1500,7 @@ YateH323_ExternalRTPChannel::~YateH323_ExternalRTPChannel()
BOOL YateH323_ExternalRTPChannel::Start()
{
Debug(&hplugin,DebugAll,"YateH323_ExternalRTPChannel::Start() [%p]",this);
Debug(m_conn,DebugAll,"YateH323_ExternalRTPChannel::Start() [%p]",this);
if (!(m_conn && H323_ExternalRTPChannel::Start()))
return FALSE;
@ -1455,7 +1516,7 @@ BOOL YateH323_ExternalRTPChannel::OnReceivedPDU(
const H245_H2250LogicalChannelParameters& param,
unsigned& errorCode)
{
Debug(&hplugin,DebugAll,"YateH323_ExternalRTPChannel::OnReceivedPDU [%p]",this);
Debug(m_conn,DebugAll,"YateH323_ExternalRTPChannel::OnReceivedPDU [%p]",this);
if (!H323_ExternalRTPChannel::OnReceivedPDU(param,errorCode))
return FALSE;
if (!m_conn || m_conn->hasRemoteAddress())
@ -1470,19 +1531,19 @@ BOOL YateH323_ExternalRTPChannel::OnReceivedPDU(
BOOL YateH323_ExternalRTPChannel::OnSendingPDU(H245_H2250LogicalChannelParameters& param)
{
Debug(&hplugin,DebugAll,"YateH323_ExternalRTPChannel::OnSendingPDU [%p]",this);
Debug(m_conn,DebugAll,"YateH323_ExternalRTPChannel::OnSendingPDU [%p]",this);
return H323_ExternalRTPChannel::OnSendingPDU(param);
}
BOOL YateH323_ExternalRTPChannel::OnReceivedAckPDU(const H245_H2250LogicalChannelAckParameters& param)
{
Debug(&hplugin,DebugAll,"YateH323_ExternalRTPChannel::OnReceivedAckPDU [%p]",this);
Debug(m_conn,DebugAll,"YateH323_ExternalRTPChannel::OnReceivedAckPDU [%p]",this);
return H323_ExternalRTPChannel::OnReceivedAckPDU(param);
}
void YateH323_ExternalRTPChannel::OnSendOpenAck(H245_H2250LogicalChannelAckParameters& param)
{
Debug(&hplugin,DebugAll,"YateH323_ExternalRTPChannel::OnSendOpenAck [%p]",this);
Debug(m_conn,DebugAll,"YateH323_ExternalRTPChannel::OnSendOpenAck [%p]",this);
H323_ExternalRTPChannel::OnSendOpenAck(param);
}
@ -1740,7 +1801,7 @@ void YateH323Connection::setCallerID(const char* number, const char* name)
display << number << " [" << name << "]";
else
display = name;
Debug(m_chan,DebugInfo,"Setting H.323 caller: number='%s' name='%s'",number,display.c_str());
Debug(this,DebugInfo,"Setting H.323 caller: number='%s' name='%s'",number,display.c_str());
SetLocalPartyName(number);
localAliasNames.AppendString(display.c_str());
}
@ -1752,7 +1813,7 @@ void YateH323Connection::setCallerID(const char* number, const char* name)
display = number;
else
display = name;
Debug(m_chan,DebugInfo,"Setting H.323 caller: name='%s'",display.c_str());
Debug(this,DebugInfo,"Setting H.323 caller: name='%s'",display.c_str());
SetLocalPartyName(display.c_str());
}
}
@ -1779,6 +1840,7 @@ YateH323Chan::~YateH323Chan()
{
Debug(this,DebugAll,"YateH323Chan::~YateH323Chan() %s %s [%p]",
m_status.c_str(),id().c_str(),this);
dropChan();
stopDataLinks();
if (m_conn)
m_conn->cleanups();
@ -1790,11 +1852,11 @@ YateH323Chan::~YateH323Chan()
void YateH323Chan::zeroRefs()
{
DDebug(this,DebugAll,"YateH323Chan::zeroRefs() conn=%p [%p]",m_conn,this);
stopDataLinks();
if (m_conn) {
if (m_conn && m_conn->nativeRtp()) {
// let the OpenH323 cleaner thread to do the cleanups so we don't have
// to block until the native data threads terminate
dropChan();
stopDataLinks();
hangup(false);
cleanup();
return;
@ -1814,7 +1876,7 @@ void YateH323Chan::finish()
}
}
void YateH323Chan::hangup(bool dropChan)
void YateH323Chan::hangup(bool dropChan, bool clearCall)
{
DDebug(this,DebugAll,"YateH323Chan::hangup() [%p]",this);
if (m_hungup)
@ -1823,7 +1885,7 @@ void YateH323Chan::hangup(bool dropChan)
Message *m = message("chan.hangup");
YateH323Connection* tmp = m_conn;
m_conn = 0;
if (tmp) {
if (clearCall && tmp) {
const char* err = 0;
const char* txt = "Normal cleanup";
int reason = tmp->GetCallEndReason();
@ -1856,6 +1918,7 @@ void YateH323Chan::disconnected(bool final, const char *reason)
// Shut down the data transfers so OpenH323 can stop its related threads
bool YateH323Chan::stopDataLinks()
{
Lock lock(m_mutex);
bool pending = false;
YateH323AudioSource* s = YOBJECT(YateH323AudioSource,getSource());
if (s) {
@ -1870,27 +1933,27 @@ bool YateH323Chan::stopDataLinks()
return pending;
}
BOOL YateH323Chan::openAudioChannel(BOOL isEncoding, H323AudioCodec &codec)
PChannel* YateH323Chan::openAudioChannel(BOOL isEncoding)
{
if (isEncoding) {
if (!getConsumer()) {
setConsumer(new YateH323AudioConsumer);
getConsumer()->deref();
}
// data going TO h.323
if (getConsumer())
return codec.AttachChannel(static_cast<YateH323AudioConsumer *>(getConsumer()),false);
YateH323AudioConsumer* cons = static_cast<YateH323AudioConsumer*>(getConsumer());
if (!cons) {
setConsumer(cons = new YateH323AudioConsumer);
cons->deref();
}
return cons;
}
else {
if (!getSource()) {
setSource(new YateH323AudioSource);
getSource()->deref();
}
// data coming FROM h.323
if (getSource())
return codec.AttachChannel(static_cast<YateH323AudioSource *>(getSource()),false);
YateH323AudioSource* src = static_cast<YateH323AudioSource*>(getSource());
if (!src) {
setSource(src = new YateH323AudioSource);
src->deref();
}
return src;
}
return FALSE;
return 0;
}
bool YateH323Chan::callRouted(Message& msg)
@ -1898,9 +1961,10 @@ bool YateH323Chan::callRouted(Message& msg)
Channel::callRouted(msg);
if (m_conn) {
String s(msg.retValue());
if (s.startSkip("h323/",false) && s && msg.getBoolValue("redirect")) {
if (s.startSkip("h323/",false) && s && msg.getBoolValue("redirect") && m_conn->Lock()) {
Debug(this,DebugAll,"YateH323Chan redirecting to '%s' [%p]",s.c_str(),this);
m_conn->TransferCall(s.safe());
m_conn->Unlock();
return false;
}
return true;
@ -1968,6 +2032,17 @@ bool YateH323Chan::msgText(Message& msg, const char* text)
return false;
}
bool YateH323Chan::setDebug(Message& msg)
{
if (!Channel::setDebug(msg))
return false;
Lock lock(m_mutex);
if (m_conn)
m_conn->debugCopy(this);
return true;
}
bool UserHandler::received(Message &msg)
{
String tmp(msg.getValue("protocol"));
@ -1984,6 +2059,7 @@ bool UserHandler::received(Message &msg)
return true;
}
H323Driver::H323Driver()
: Driver("h323","varchans")
{
@ -2060,6 +2136,7 @@ void H323Driver::initialize()
setup();
s_externalRtp = s_cfg.getBoolValue("general","external_rtp",false);
s_passtrough = s_cfg.getBoolValue("general","forward_rtp",false);
s_fallbackRtp = s_cfg.getBoolValue("general","fallback_rtp",true);
// mantain compatibility with old config files
s_passtrough = s_cfg.getBoolValue("general","passtrough_rtp",s_passtrough);
maxRoute(s_cfg.getIntValue("incoming","maxqueue",5));