Safely allow IAX lines to be processed asynchronously (IAX line is now a Mutex and RefObject). Fixed IAX line (un)register transaction abort.

git-svn-id: http://voip.null.ro/svn/yate@3791 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
marian 2010-11-04 15:43:16 +00:00
parent f12e027b64
commit 8e21f8cd26
2 changed files with 250 additions and 175 deletions

View File

@ -662,8 +662,11 @@ bool IAXTransaction::abortReg()
if (!(type() == RegReq || type() == RegRel) ||
state() == Terminating || state() == Terminated)
return false;
lock();
m_userdata = 0;
sendReject();
m_outFrames.clear();
unlock();
sendReject("Aborted");
return true;
}

View File

@ -67,20 +67,15 @@ class IAXURI;
/*
* Keep a single registration line
*/
class YIAXLine : public String
class YIAXLine : public RefObject, public Mutex
{
friend class YIAXLineContainer;
friend class YIAXEngine;
public:
enum State {
Idle,
Registering,
Unregistering,
};
YIAXLine(const String& name);
virtual ~YIAXLine();
inline State state() const
{ return m_state; }
virtual const String& toString() const
{ return m_name; }
inline bool registered() const
{ return m_registered; }
inline const String& username() const
@ -91,7 +86,7 @@ public:
{ return m_callingNo; }
inline const String& callingName() const
{ return m_callingName; }
inline u_int16_t expire() const
inline int expire() const
{ return m_expire; }
inline const String& localAddr() const
{ return m_localAddr; }
@ -103,12 +98,12 @@ public:
{ return m_remotePort; }
private:
void setRegistered(bool registered, const char* reason = 0);
State m_state;
String m_name;
String m_username; // Username
String m_password; // Password
String m_callingNo; // Calling number
String m_callingName; // Calling name
u_int16_t m_expire; // Expire time
int m_expire; // Expire time
String m_localAddr;
String m_remoteAddr;
int m_localPort;
@ -189,12 +184,18 @@ public:
{ Lock lock(this); return m_lines.skipNull();}
protected:
// Clear a line's transaction
void clearTransaction(YIAXLine* line);
// Change a line String parameter. Unregister it if changed
bool changeLine(YIAXLine* line, String& dest, const String& src);
bool changeLine(YIAXLine* line, int& dest, int src);
// Update a line
bool updateLine(YIAXLine* line, Message &msg);
bool addLine(Message &msg);
YIAXLine* findLine(const String& name);
YIAXLine* findLine(YIAXLine* line);
void startRegisterLine(YIAXLine* line);
void startUnregisterLine(YIAXLine* line);
// (Un)register a line
void registerLine(YIAXLine* line, bool reg);
private:
ObjList m_lines;
};
@ -545,16 +546,19 @@ static String s_statusCmd = "status";
// Create an idle line
YIAXLine::YIAXLine(const String& name)
: String(name),
m_state(Idle), m_expire(60), m_localPort(4569), m_remotePort(4569),
: Mutex(true,"IAX:Line"), m_name(name),
m_expire(60), m_localPort(4569), m_remotePort(4569),
m_nextReg(Time::secNow() + 40),
m_registered(false),
m_register(true)
m_register(true),
m_transaction(0)
{
DDebug(&iplugin,DebugAll,"YIAXLine(%s) [%p]",m_name.c_str(),this);
}
YIAXLine::~YIAXLine()
{
DDebug(&iplugin,DebugAll,"~YIAXLine(%s) [%p]",m_name.c_str(),this);
}
// Set the registered status, emits user.notify messages if necessary
@ -563,9 +567,15 @@ void YIAXLine::setRegistered(bool registered, const char* reason)
if ((m_registered == registered) && !reason)
return;
m_registered = registered;
if (registered)
Debug(&iplugin,DebugInfo,"Line(%s) registered to %s:%d [%p]",
m_name.c_str(),m_remoteAddr.c_str(),m_remotePort,this);
else
Debug(&iplugin,DebugNote,"Line(%s) unregistered from %s:%d reason='%s' [%p]",
m_name.c_str(),m_remoteAddr.c_str(),m_remotePort,reason,this);
if (m_username) {
Message* m = new Message("user.notify");
m->addParam("account",*this);
m->addParam("account",toString());
m->addParam("protocol","iax");
m->addParam("username",m_username);
m->addParam("registered",String::boolText(registered));
@ -575,70 +585,180 @@ void YIAXLine::setRegistered(bool registered, const char* reason)
}
}
// Clear a line's transaction
void YIAXLineContainer::clearTransaction(YIAXLine* line)
{
if (!(line && line->m_transaction))
return;
IAXTransaction* trans = line->m_transaction;
line->m_transaction = 0;
trans->abortReg();
trans->setUserData(0);
}
// Change a line String parameter. Unregister it if changed
bool YIAXLineContainer::changeLine(YIAXLine* line, String& dest, const String& src)
{
if (!line || dest == src)
return false;
if (line->m_registered) {
line->m_registered = false;
registerLine(line,false);
}
dest = src;
return true;
}
// Change a line int parameter. Unregister it if changed
bool YIAXLineContainer::changeLine(YIAXLine* line, int& dest, int src)
{
if (!line || dest == src)
return false;
if (line->m_registered) {
line->m_registered = false;
registerLine(line,false);
}
dest = src;
return true;
}
// Find and update a line with parameters from message, create if needed
bool YIAXLineContainer::updateLine(Message& msg)
{
Lock lock(this);
String name = msg.getValue("account");
const String& op = msg["operation"];
bool login = (op != "logout" && op != "delete");
const String& name = msg["account"];
Debug(&iplugin,DebugAll,"updateLine %s",name.c_str());
Lock lck(this);
YIAXLine* line = findLine(name);
if (line)
return updateLine(line,msg);
return addLine(msg);
if (!line) {
if (!login)
return false;
line = new YIAXLine(name);
line->ref();
m_lines.append(line);
}
lck.drop();
line->lock();
if (login) {
bool changed = !line->m_register;
String addr = msg.getValue("server",msg.getValue("domain"));
int pos = addr.find(":");
int port = -1;
if (pos >= 0) {
port = addr.substr(pos + 1).toInteger();
addr = addr.substr(0,pos);
}
else
port = msg.getIntValue("port");
if (port > 0)
changed = changeLine(line,line->m_remotePort,port);
changed = changeLine(line,line->m_remoteAddr,addr) || changed;
changed = changeLine(line,line->m_username,msg.getValue("username")) || changed;
changed = changeLine(line,line->m_password,msg.getValue("password")) || changed;
int interval = msg.getIntValue("interval");
if (interval < 60)
interval = 60;
changed = changeLine(line,line->m_expire,interval) || changed;
if (changed || op == "login") {
line->m_nextReg = Time::secNow() + (line->m_expire * 3 / 4);
line->m_nextKeepAlive = 0;
registerLine(line,true);
}
}
else if (line->m_register)
registerLine(line,false);
line->unlock();
// The line might be removed in regTerminate()
Lock lock(this);
YIAXLine* l = findLine(line);
if (l) {
TelEngine::destruct(l);
TelEngine::destruct(line);
}
else
m_lines.append(line);
return true;
}
// Handle registration related transaction terminations
void YIAXLineContainer::regTerminate(IAXEvent* event)
{
Lock lock(this);
if (!event)
return;
IAXTransaction* trans = event->getTransaction();
if (!trans)
return;
YIAXLine* line = findLine(static_cast<YIAXLine*>(trans->getUserData()));
if (!line)
if (!line) {
trans->setUserData(0);
return;
}
line->lock();
if (line->m_transaction != trans) {
line->unlock();
trans->setUserData(0);
return;
}
const char* rsp = "accepted";
String reason;
bool ok = true;
switch (event->type()) {
case IAXEvent::Accept:
// re-register at 75% of the expire time
line->m_nextReg = Time::secNow() + (line->expire() * 3 / 4);
line->m_callingNo = trans->callingNo();
line->m_callingName = trans->callingName();
Debug(&iplugin,DebugAll,"YIAXLineContainer::regTerminate[%s] - ACK for '%s'. Next: %u",
line->c_str(),line->state() == YIAXLine::Registering?"Register":"Unregister",line->m_nextReg);
line->setRegistered(true);
if (line->m_register) {
// Honor server registration refresh. Assume default (60) if missing
unsigned int interval = 60;
if (event->getList().getNumeric(IAXInfoElement::REFRESH,interval) && !interval)
interval = 60;
if (line->m_expire != (int)interval) {
Debug(&iplugin,DebugNote,"Line(%s) changed expire interval from %d to %u [%p]",
line->toString().c_str(),line->m_expire,interval,line);
line->m_expire = interval;
}
// re-register at 75% of the expire time
line->m_nextReg = Time::secNow() + (line->expire() * 3 / 4);
line->m_callingNo = trans->callingNo();
line->m_callingName = trans->callingName();
}
ok = line->m_register;
break;
case IAXEvent::Reject:
// retry at 25% of the expire time
line->m_nextReg = Time::secNow() + (line->expire() / 2);
Debug(&iplugin,DebugAll,"YIAXLineContainer::regTerminate[%s] - REJECT for '%s'. Next: %u",
line->c_str(),line->state() == YIAXLine::Registering?"Register":"Unregister",line->m_nextReg);
line->setRegistered(false,"rejected");
line->m_nextReg = Time::secNow() + (line->expire() / 4);
ok = false;
rsp = "rejected";
event->getList().getString(IAXInfoElement::CAUSE,reason);
break;
case IAXEvent::Timeout:
// retry at 50% of the expire time
line->m_nextReg = Time::secNow() + (line->expire() / 2);
Debug(&iplugin,DebugAll,"YIAXLineContainer::regTerminate[%s] - Timeout for '%s'. Next: %u",
line->c_str(),line->state() == YIAXLine::Registering?"Register":"Unregister",line->m_nextReg);
line->setRegistered(false,"timeout");
ok = false;
rsp = "Timeout";
reason = "timeout";
break;
default:
return;
rsp = 0;
}
line->m_transaction = 0;
// Unregister operation. Remove line
if (line->state() == YIAXLine::Unregistering) {
line->setRegistered(false);
m_lines.remove(line,true);
return;
bool remove = false;
if (rsp) {
DDebug(&iplugin,event->type() == IAXEvent::Accept ? DebugAll : DebugNote,
"Line(%s) regTerminate register=%s result=%s reason=%s [%p]",
line->toString().c_str(),String::boolText(line->m_register),rsp,reason.safe(),line);
clearTransaction(line);
line->setRegistered(ok,reason);
remove = !line->m_register;
}
line->m_state = YIAXLine::Idle;
line->unlock();
if (remove) {
Lock lock(this);
m_lines.remove(line);
}
TelEngine::destruct(line);
}
// Handle registration related authentication
void YIAXLineContainer::regAuthReq(IAXEvent* event)
{
Lock lock(this);
if (!event || event->type() != IAXEvent::AuthReq)
return;
IAXTransaction* trans = event->getTransaction();
@ -646,15 +766,21 @@ void YIAXLineContainer::regAuthReq(IAXEvent* event)
return;
YIAXLine* line = findLine(static_cast<YIAXLine*>(trans->getUserData()));
if (!line) {
Debug(&iplugin,DebugAll,"Lines: NO LINE");
trans->setUserData(0);
return;
}
// Send auth reply
String response;
if (!iplugin.getEngine())
return;
iplugin.getEngine()->getMD5FromChallenge(response,trans->challenge(),line->password());
trans->sendAuthReply(response);
line->lock();
if (line->m_transaction == trans) {
String response;
iplugin.getEngine()->getMD5FromChallenge(response,trans->challenge(),line->password());
line->unlock();
trans->sendAuthReply(response);
}
else {
line->unlock();
trans->setUserData(0);
}
TelEngine::destruct(line);
}
// Handle registration related events
@ -678,117 +804,65 @@ void YIAXLineContainer::handleEvent(IAXEvent* event)
void YIAXLineContainer::evTimer(Time& time)
{
u_int32_t sec = time.sec();
Lock lock(this);
for (ObjList* l = m_lines.skipNull(); l; l = l->next()) {
YIAXLine* line = static_cast<YIAXLine*>(l->get());
// Line exists and is idle ?
if (!line || line->state() != YIAXLine::Idle)
continue;
// Time to keep alive
if (sec > line->m_nextKeepAlive) {
line->m_nextKeepAlive = sec + 25;
SocketAddr addr(AF_INET);
addr.host(line->remoteAddr());
addr.port(line->remotePort());
iplugin.getEngine()->keepAlive(addr);
}
// Time to reg/unreg
if (sec > line->m_nextReg) {
line->m_nextReg += line->expire();
if (line->m_register)
startRegisterLine(line);
else
startUnregisterLine(line);
lock();
ListIterator iter(m_lines);
GenObject* gen = 0;
while (0 != (gen = iter.get())) {
RefPointer<YIAXLine> line = static_cast<YIAXLine*>(gen);
unlock();
if (line) {
line->lock();
if (sec > line->m_nextReg) {
line->m_nextReg += line->expire();
registerLine(line,line->m_register);
}
else if (line->m_registered && sec > line->m_nextKeepAlive) {
line->m_nextKeepAlive = sec + 25;
SocketAddr addr(AF_INET);
addr.host(line->remoteAddr());
addr.port(line->remotePort());
iplugin.getEngine()->keepAlive(addr);
}
line->unlock();
line = 0;
}
lock();
}
unlock();
}
// Fill parameters with information taken from line
bool YIAXLineContainer::fillList(String& name, NamedList& dest, SocketAddr& addr, bool& registered)
{
Lock lock(this);
registered = false;
YIAXLine* line = findLine(name);
if (!line)
return false;
line->lock();
dest.setParam("username",line->username());
dest.setParam("password",line->password());
dest.setParam("caller",line->callingNo());
dest.setParam("callername",line->callingName());
addr.host(line->remoteAddr());
addr.port(line->remotePort());
String a = line->remoteAddr();
int p = line->remotePort();
registered = line->registered();
return true;
}
// Update a line with data from message, create or delete line if needed
bool YIAXLineContainer::updateLine(YIAXLine* line, Message& msg)
{
Debug(&iplugin,DebugAll,"YIAXLineContainer - updateLine: %s",line->c_str());
String op = msg.getValue("operation");
if (op == "logout") {
if (line->state() != YIAXLine::Unregistering)
return true;
if (line->state() != YIAXLine::Idle && line->m_transaction)
line->m_transaction->abortReg();
startUnregisterLine(line);
return true;
}
bool change = false;
if (line->m_remoteAddr != msg.getValue("server")) {
line->m_remoteAddr = msg.getValue("server");
change = true;
}
if (line->m_username != msg.getValue("username")) {
line->m_username = msg.getValue("username");
change = true;
}
if (line->m_password != msg.getValue("password")) {
line->m_password = msg.getValue("password");
change = true;
}
if (line->m_expire != String(msg.getValue("interval")).toInteger()) {
line->m_expire = String(msg.getValue("interval")).toInteger();
change = true;
}
line->m_nextReg = Time::secNow() + (line->m_expire * 3 / 4);
line->m_nextKeepAlive = Time::secNow() + 25;
if (change || op == "login")
startRegisterLine(line);
return change;
}
// Add a new line and start logging in to it if applicable
bool YIAXLineContainer::addLine(Message& msg)
{
Debug(&iplugin,DebugAll,"YIAXLineContainer - addLine: %s",msg.getValue("account"));
YIAXLine* line = new YIAXLine(msg.getValue("account"));
m_lines.append(line);
line->m_remoteAddr = msg.getValue("server");
line->m_username = msg.getValue("username");
line->m_password = msg.getValue("password");
line->m_expire = msg.getIntValue("interval",60);
line->m_nextReg = Time::secNow() + line->m_expire;
String op = msg.getValue("operation");
if (op.null() || (op == "login"))
startRegisterLine(line);
else
if (op == "logout")
startUnregisterLine(line);
else
line->m_register = true;
line->unlock();
TelEngine::destruct(line);
addr.host(a);
addr.port(p);
return true;
}
// Find a line by name
YIAXLine* YIAXLineContainer::findLine(const String& name)
{
for (ObjList* l = m_lines.skipNull(); l; l = l->next()) {
YIAXLine* line = static_cast<YIAXLine*>(l->get());
if (line && *line == name)
return line;
}
return 0;
Lock lock(this);
ObjList* o = m_lines.find(name);
if (!o)
return 0;
YIAXLine* line = static_cast<YIAXLine*>(o->get());
line->ref();
return line;
}
// Find a line by address, return same if found
@ -796,49 +870,47 @@ YIAXLine* YIAXLineContainer::findLine(YIAXLine* line)
{
if (!line)
return 0;
for (ObjList* l = m_lines.skipNull(); l; l = l->next()) {
YIAXLine* ln = static_cast<YIAXLine*>(l->get());
if (ln && ln == line)
return line;
}
return 0;
Lock lock(this);
if (!m_lines.find(line))
return 0;
line->ref();
return line;
}
// Initiate registration of line, only if idle
void YIAXLineContainer::startRegisterLine(YIAXLine* line)
// Initiate line (un)register
void YIAXLineContainer::registerLine(YIAXLine* line, bool reg)
{
Lock lock(this);
line->m_register = true;
if (line->state() != YIAXLine::Idle)
if (!line)
return;
if (iplugin.getEngine()->reg(line,true))
line->m_state = YIAXLine::Registering;
}
// Initiate deregistration of line, only if idle
void YIAXLineContainer::startUnregisterLine(YIAXLine* line)
{
Lock lock(this);
line->m_register = false;
if (line->state() != YIAXLine::Idle)
return;
if (iplugin.getEngine()->reg(line,false))
line->m_state = YIAXLine::Unregistering;
line->m_register = reg;
clearTransaction(line);
iplugin.getEngine()->reg(line,reg);
}
// Unregister all lines
void YIAXLineContainer::clear(bool forced)
{
Lock lock(this);
if (forced) {
Lock lock(this);
m_lines.clear();
return;
}
for (ObjList* l = m_lines.skipNull(); l; l = l->next()) {
YIAXLine* line = static_cast<YIAXLine*>(l->get());
if (line)
startUnregisterLine(line);
lock();
ListIterator iter(m_lines);
GenObject* gen = 0;
while (0 != (gen = iter.get())) {
RefPointer<YIAXLine> line = static_cast<YIAXLine*>(gen);
unlock();
if (line) {
line->lock();
if (line->m_register)
registerLine(line,false);
line->unlock();
line = 0;
}
lock();
}
unlock();
}
// Run the socket listening thread
@ -904,7 +976,7 @@ IAXTransaction* YIAXEngine::reg(YIAXLine* line, bool regreq)
addr.port(line->remotePort());
Debug(this,DebugAll,
"Outgoing Registration[%s]:\r\nUsername: %s\r\nHost: %s\r\nPort: %d\r\nTime(sec): %u",
line->c_str(),line->username().c_str(),addr.host().c_str(),addr.port(),Time::secNow());
line->toString().c_str(),line->username().c_str(),addr.host().c_str(),addr.port(),Time::secNow());
// Create IE list
IAXIEList ieList;
ieList.appendString(IAXInfoElement::USERNAME,line->username());
@ -1118,8 +1190,8 @@ YIAXDriver::~YIAXDriver()
Output("Unloading module YIAX");
lock();
channels().clear();
s_lines.clear(true);
unlock();
s_lines.clear(true);
delete m_iaxEngine;
}