Use async connect if available. The stream connect timeout value is no longer global: it's used for each connect attempt (configured address, srv record, domain). Remember connect status and re-connect starting from old status. Added srv query timeout configurable option.
git-svn-id: http://yate.null.ro/svn/yate/trunk@4150 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
parent
11e1b27e83
commit
6a337d5482
|
@ -33,12 +33,16 @@
|
|||
;stream_setuptimeout=120000
|
||||
|
||||
; stream_connecttimeout: integer: The interval, in milliseconds, allowed for an
|
||||
; outgoing stream to make a TCP connection to a remote host, including SRV request
|
||||
; and resolving domain(s)
|
||||
; outgoing stream to make a TCP connection to a remote host
|
||||
; Defaults to 60000 if missing or invalid
|
||||
; Minimum allowed value is 1000, maximum allowed value is 120000
|
||||
;stream_connecttimeout=60000
|
||||
|
||||
; stream_srvtimeout: integer: The timeout interval, in milliseconds, for SRV query
|
||||
; Defaults to 30000 if missing or invalid
|
||||
; Minimum allowed value is 10000, maximum allowed value is 120000
|
||||
;stream_srvtimeout=30000
|
||||
|
||||
; entitycaps: boolean: Enable entity capabilities cache.
|
||||
; If enabled entity capabilities will be requested and cached each time a presence
|
||||
; stanza is received
|
||||
|
|
|
@ -70,12 +70,16 @@
|
|||
;stream_setuptimeout=120000
|
||||
|
||||
; stream_connecttimeout: integer: The interval, in milliseconds, allowed for an
|
||||
; outgoing stream to make a TCP connection to a remote host, including SRV request
|
||||
; and resolving domain(s)
|
||||
; outgoing stream to make a TCP connection to a remote host
|
||||
; Defaults to 60000 if missing or invalid
|
||||
; Minimum allowed value is 1000, maximum allowed value is 120000
|
||||
;stream_connecttimeout=60000
|
||||
|
||||
; stream_srvtimeout: integer: The timeout interval, in milliseconds, for SRV query
|
||||
; Defaults to 30000 if missing or invalid
|
||||
; Minimum allowed value is 10000, maximum allowed value is 120000
|
||||
;stream_srvtimeout=30000
|
||||
|
||||
; stream_idletimeout: integer: The interval, in milliseconds, allowed for a
|
||||
; server to server stream to be idle
|
||||
; Defaults to 3600000 (1h) if missing or invalid
|
||||
|
|
|
@ -61,6 +61,14 @@ const TokenDict JBEvent::s_type[] = {
|
|||
{0,0}
|
||||
};
|
||||
|
||||
const TokenDict JBConnect::s_statusName[] = {
|
||||
{"Start", Start},
|
||||
{"Address", Address},
|
||||
{"Srv", Srv},
|
||||
{"Domain", Domain},
|
||||
{0,0}
|
||||
};
|
||||
|
||||
// Entity caps item tag in document
|
||||
static const String s_entityCapsItem = "item";
|
||||
// Node values used by entity caps
|
||||
|
@ -89,6 +97,10 @@ static const String s_googleMailNode = "http://mail.google.com/xmpp/client/caps"
|
|||
#define JB_CONNECT_INTERVAL 60000
|
||||
#define JB_CONNECT_INTERVAL_MIN 1000
|
||||
#define JB_CONNECT_INTERVAL_MAX 120000
|
||||
// Stream SRV query timer
|
||||
#define JB_SRV_INTERVAL 30000
|
||||
#define JB_SRV_INTERVAL_MIN 10000
|
||||
#define JB_SRV_INTERVAL_MAX 120000
|
||||
// Ping
|
||||
#define JB_PING_INTERVAL 120000
|
||||
#define JB_PING_INTERVAL_MIN 60000
|
||||
|
@ -550,11 +562,11 @@ void SASL::buildMD5Digest(String& dest, const NamedList& params,
|
|||
*/
|
||||
// Constructor. Add itself to the stream's engine
|
||||
JBConnect::JBConnect(const JBStream& stream)
|
||||
: m_domain(stream.remote().domain()), m_port(0),
|
||||
: m_status(Start), m_domain(stream.remote().domain()), m_port(0),
|
||||
m_engine(stream.engine()), m_stream(stream.toString()),
|
||||
m_streamType((JBStream::Type)stream.type())
|
||||
{
|
||||
stream.connectAddr(m_address,m_port,m_localIp);
|
||||
stream.connectAddr(m_address,m_port,m_localIp,m_status,m_srvs);
|
||||
if (m_engine)
|
||||
m_engine->connectStatus(this,true);
|
||||
}
|
||||
|
@ -582,8 +594,90 @@ void JBConnect::connect()
|
|||
{
|
||||
if (!m_engine)
|
||||
return;
|
||||
Debug(m_engine,DebugAll,"JBConnect(%s) starting [%p]",m_stream.c_str(),this);
|
||||
Debug(m_engine,DebugAll,"JBConnect(%s) starting stat=%s [%p]",
|
||||
m_stream.c_str(),lookup(m_status,s_statusName),this);
|
||||
int port = m_port;
|
||||
if (!port) {
|
||||
if (m_streamType == JBStream::c2s)
|
||||
port = XMPP_C2S_PORT;
|
||||
else if (m_streamType == JBStream::s2s)
|
||||
port = XMPP_S2S_PORT;
|
||||
else {
|
||||
Debug(m_engine,DebugNote,"JBConnect(%s) no port for %s stream [%p]",
|
||||
m_stream.c_str(),lookup(m_streamType,JBStream::s_typeName),this);
|
||||
return;
|
||||
}
|
||||
}
|
||||
Socket* sock = 0;
|
||||
bool stop = false;
|
||||
advanceStatus();
|
||||
// Try to use ip/port
|
||||
if (m_status == Address) {
|
||||
if (m_address && port) {
|
||||
sock = connect(m_address,port,stop);
|
||||
if (sock || stop || exiting(sock)) {
|
||||
terminated(sock,false);
|
||||
return;
|
||||
}
|
||||
}
|
||||
advanceStatus();
|
||||
}
|
||||
if (m_status == Srv) {
|
||||
if (!m_srvs.skipNull()) {
|
||||
// Get SRV records from remote party
|
||||
String query;
|
||||
if (m_streamType == JBStream::c2s)
|
||||
query = "_xmpp-client._tcp.";
|
||||
else
|
||||
query = "_xmpp-server._tcp.";
|
||||
query << m_domain;
|
||||
String error;
|
||||
// Start connecting timeout
|
||||
if (!notifyConnecting(true,true))
|
||||
return;
|
||||
int code = Resolver::srvQuery(query,m_srvs,&error);
|
||||
// Stop the timeout if not exiting
|
||||
if (exiting(sock) || !notifyConnecting(false,true)) {
|
||||
terminated(0,false);
|
||||
return;
|
||||
}
|
||||
if (!code)
|
||||
DDebug(m_engine,DebugAll,"JBConnect(%s) SRV query for '%s' got %u records [%p]",
|
||||
m_stream.c_str(),query.c_str(),m_srvs.count(),this);
|
||||
else
|
||||
Debug(m_engine,DebugNote,"JBConnect(%s) SRV query for '%s' failed: %d '%s' [%p]",
|
||||
m_stream.c_str(),query.c_str(),code,error.c_str(),this);
|
||||
}
|
||||
else
|
||||
// Remove the first entry: we already used it
|
||||
m_srvs.remove();
|
||||
ObjList* o = 0;
|
||||
while (0 != (o = m_srvs.skipNull())) {
|
||||
SrvRecord* rec = static_cast<SrvRecord*>(o->get());
|
||||
sock = connect(*rec,rec->m_port,stop);
|
||||
o->remove();
|
||||
if (sock || stop || exiting(sock)) {
|
||||
terminated(sock,false);
|
||||
return;
|
||||
}
|
||||
}
|
||||
advanceStatus();
|
||||
}
|
||||
if (m_status == Domain) {
|
||||
// Try to resolve the domain
|
||||
if (port)
|
||||
sock = connect(m_domain,port,stop);
|
||||
advanceStatus();
|
||||
}
|
||||
terminated(sock,false);
|
||||
}
|
||||
|
||||
// Create and try to connect a socket. Return it on success
|
||||
// Set stop on fatal failure and return 0
|
||||
Socket* JBConnect::connect(const char* addr, int port, bool& stop)
|
||||
{
|
||||
Socket* sock = new Socket(PF_INET,SOCK_STREAM);
|
||||
// Bind to local ip
|
||||
if (m_localIp) {
|
||||
SocketAddr lip(PF_INET);
|
||||
lip.host(m_localIp);
|
||||
|
@ -594,7 +688,7 @@ void JBConnect::connect()
|
|||
String tmp;
|
||||
Thread::errorString(tmp,sock->error());
|
||||
Debug(m_engine,DebugNote,
|
||||
"JBConnect(%s) failed to bind to '%s' (%s). %d %s [%p]",
|
||||
"JBConnect(%s) failed to bind to '%s' (%s). %d '%s' [%p]",
|
||||
m_stream.c_str(),lip.host().c_str(),m_localIp.c_str(),
|
||||
sock->error(),tmp.c_str(),this);
|
||||
}
|
||||
|
@ -602,120 +696,108 @@ void JBConnect::connect()
|
|||
else
|
||||
Debug(m_engine,DebugNote,"JBConnect(%s) invalid local ip '%s' [%p]",
|
||||
m_stream.c_str(),m_localIp.c_str(),this);
|
||||
if (!ok) {
|
||||
delete sock;
|
||||
terminated(0,false);
|
||||
return;
|
||||
stop = !ok || exiting(sock);
|
||||
if (stop) {
|
||||
deleteSocket(sock);
|
||||
return 0;
|
||||
}
|
||||
if (exiting(sock))
|
||||
return;
|
||||
DDebug(m_engine,DebugAll,"JBConnect(%s) bound to '%s' (%s) [%p]",
|
||||
m_stream.c_str(),lip.host().c_str(),m_localIp.c_str(),this);
|
||||
}
|
||||
// Try to use ip/port
|
||||
int port = m_port;
|
||||
if (!port) {
|
||||
if (m_streamType == JBStream::c2s)
|
||||
port = XMPP_C2S_PORT;
|
||||
else if (m_streamType == JBStream::s2s)
|
||||
port = XMPP_S2S_PORT;
|
||||
else {
|
||||
Debug(m_engine,DebugNote,"JBConnect(%s) no port for cluster stream [%p]",
|
||||
// Use async connect
|
||||
u_int64_t tout = 0;
|
||||
if (m_engine)
|
||||
tout = m_engine->m_connectTimeout * 1000;
|
||||
if (tout && !(sock->canSelect() && sock->setBlocking(false))) {
|
||||
tout = 0;
|
||||
if (sock->canSelect()) {
|
||||
String tmp;
|
||||
Thread::errorString(tmp,sock->error());
|
||||
Debug(m_engine,DebugInfo,
|
||||
"JBConnect(%s) using sync connect (async set failed). %d '%s' [%p]",
|
||||
m_stream.c_str(),sock->error(),tmp.c_str(),this);
|
||||
}
|
||||
else
|
||||
Debug(m_engine,DebugInfo,
|
||||
"JBConnect(%s) using sync connect (select() not available) [%p]",
|
||||
m_stream.c_str(),this);
|
||||
delete sock;
|
||||
terminated(0,false);
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (m_address && connect(sock,m_address,port)) {
|
||||
terminated(sock,false);
|
||||
return;
|
||||
if (!notifyConnecting(tout == 0)) {
|
||||
stop = true;
|
||||
deleteSocket(sock);
|
||||
return 0;
|
||||
}
|
||||
if (exiting(sock))
|
||||
return;
|
||||
// Try to use the domain
|
||||
if (!m_domain) {
|
||||
delete sock;
|
||||
terminated(0,false);
|
||||
return;
|
||||
}
|
||||
if (!m_port && (m_streamType == JBStream::c2s || m_streamType == JBStream::s2s)) {
|
||||
// Get SRV records from remote party
|
||||
String query;
|
||||
if (m_streamType == JBStream::c2s)
|
||||
query = "_xmpp-client._tcp.";
|
||||
else
|
||||
query = "_xmpp-server._tcp.";
|
||||
query << m_domain;
|
||||
ObjList srv;
|
||||
String error;
|
||||
int code = Resolver::srvQuery(query,srv,&error);
|
||||
if (exiting(sock))
|
||||
return;
|
||||
if (!code) {
|
||||
DDebug(m_engine,DebugAll,"JBConnect(%s) SRV query for '%s' got %u records [%p]",
|
||||
m_stream.c_str(),query.c_str(),srv.count(),this);
|
||||
for (ObjList* o = srv.skipNull(); o; o = o->skipNext()) {
|
||||
SrvRecord* rec = static_cast<SrvRecord*>(o->get());
|
||||
if (connect(sock,*rec,rec->m_port)) {
|
||||
terminated(sock,false);
|
||||
return;
|
||||
}
|
||||
if (exiting(sock))
|
||||
return;
|
||||
}
|
||||
}
|
||||
else
|
||||
Debug(m_engine,DebugNote,"JBConnect(%s) SRV query for '%s' failed: %d '%s' [%p]",
|
||||
m_stream.c_str(),query.c_str(),code,error.c_str(),this);
|
||||
}
|
||||
// Try to resolve the domain
|
||||
if (connect(sock,m_domain,port)) {
|
||||
terminated(sock,false);
|
||||
return;
|
||||
}
|
||||
delete sock;
|
||||
terminated(0,false);
|
||||
}
|
||||
|
||||
// Connect a socket
|
||||
bool JBConnect::connect(Socket*& sock, const char* addr, int port)
|
||||
{
|
||||
u_int64_t start = tout ? Time::now() : 0;
|
||||
SocketAddr a(PF_INET);
|
||||
a.host(addr);
|
||||
a.port(port);
|
||||
// Check exiting: it may take some time to resolve the domain
|
||||
if (exiting(sock))
|
||||
return false;
|
||||
stop = exiting(sock);
|
||||
if (stop)
|
||||
return 0;
|
||||
if (!a.host()) {
|
||||
Debug(m_engine,DebugNote,"JBConnect(%s) failed to resolve '%s' [%p]",
|
||||
m_stream.c_str(),addr,this);
|
||||
return false;
|
||||
deleteSocket(sock);
|
||||
return 0;
|
||||
}
|
||||
DDebug(m_engine,DebugAll,"JBConnect(%s) attempt to connect to '%s:%d' (%s) [%p]",
|
||||
m_stream.c_str(),a.host().c_str(),a.port(),addr,this);
|
||||
unsigned int intervals = 0;
|
||||
if (start) {
|
||||
start = Time::now() - start;
|
||||
if (tout > start)
|
||||
intervals = (unsigned int)(tout - start) / Thread::idleUsec();
|
||||
// Make sure we wait for at least 1 timeout interval
|
||||
if (!intervals)
|
||||
intervals = 1;
|
||||
}
|
||||
String domain;
|
||||
if (a.host() != addr)
|
||||
domain << " (" << addr << ")";
|
||||
Debug(m_engine,DebugAll,"JBConnect(%s) attempt to connect to '%s:%d'%s [%p]",
|
||||
m_stream.c_str(),a.host().c_str(),a.port(),domain.safe(),this);
|
||||
bool ok = (0 != sock->connect(a));
|
||||
if (ok)
|
||||
DDebug(m_engine,DebugAll,"JBConnect(%s) connected to '%s:%d' (%s) [%p]",
|
||||
m_stream.c_str(),a.host().c_str(),a.port(),addr,this);
|
||||
else {
|
||||
String tmp;
|
||||
Thread::errorString(tmp,sock->error());
|
||||
Debug(m_engine,DebugNote,
|
||||
"JBConnect(%s) failed to connect to '%s:%d' (%s). %d '%s' [%p]",
|
||||
m_stream.c_str(),a.host().c_str(),a.port(),addr,sock->error(),tmp.c_str(),this);
|
||||
bool timeout = false;
|
||||
// Async connect in progress
|
||||
if (!ok && sock->inProgress()) {
|
||||
bool done = false;
|
||||
bool event = false;
|
||||
while (intervals && !(done || event || stop)) {
|
||||
if (!sock->select(0,&done,&event,Thread::idleUsec()))
|
||||
break;
|
||||
intervals--;
|
||||
stop = exiting(sock);
|
||||
}
|
||||
timeout = !intervals && !(done || event);
|
||||
if (sock && !sock->error() && (done || event) && sock->updateError())
|
||||
ok = !sock->error();
|
||||
}
|
||||
return ok;
|
||||
if (ok) {
|
||||
Debug(m_engine,DebugAll,"JBConnect(%s) connected to '%s:%d'%s [%p]",
|
||||
m_stream.c_str(),a.host().c_str(),a.port(),domain.safe(),this);
|
||||
return sock;
|
||||
}
|
||||
if (sock) {
|
||||
String reason;
|
||||
if (timeout)
|
||||
reason = "Timeout";
|
||||
else {
|
||||
String tmp;
|
||||
Thread::errorString(tmp,sock->error());
|
||||
reason << sock->error() << " '" << tmp << "'";
|
||||
}
|
||||
Debug(m_engine,DebugNote,"JBConnect(%s) failed to connect to '%s:%d'%s. %s [%p]",
|
||||
m_stream.c_str(),a.host().c_str(),a.port(),domain.safe(),reason.safe(),this);
|
||||
deleteSocket(sock);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Check if exiting. Release socket
|
||||
bool JBConnect::exiting(Socket*& sock)
|
||||
{
|
||||
bool done = Thread::check(false) || !m_engine || m_engine->exiting();
|
||||
if (done && sock) {
|
||||
delete sock;
|
||||
sock = 0;
|
||||
}
|
||||
if (done && sock)
|
||||
deleteSocket(sock);
|
||||
return done;
|
||||
}
|
||||
|
||||
|
@ -728,28 +810,85 @@ void JBConnect::terminated(Socket* sock, bool final)
|
|||
// Remove from engine
|
||||
if (engine)
|
||||
engine->connectStatus(this,false);
|
||||
if (done)
|
||||
if (done) {
|
||||
if (!final && Thread::check(false))
|
||||
Debug(m_engine,DebugAll,"JBConnect(%s) cancelled [%p]",m_stream.c_str(),this);
|
||||
return;
|
||||
if (!final)
|
||||
Debug(engine,DebugAll,"JBConnect stream='%s' terminated [%p]",
|
||||
m_stream.c_str(),this);
|
||||
else
|
||||
Debug(engine,DebugWarn,"JBConnect stream='%s' abnormally terminated! [%p]",
|
||||
m_stream.c_str(),this);
|
||||
// Find the stream and notify it
|
||||
}
|
||||
JBStream* stream = engine->findStream(m_stream,m_streamType);
|
||||
if (!final)
|
||||
Debug(engine,DebugAll,"JBConnect(%s) terminated [%p]",m_stream.c_str(),this);
|
||||
else if (stream)
|
||||
Debug(engine,DebugWarn,"JBConnect(%s) abnormally terminated! [%p]",
|
||||
m_stream.c_str(),this);
|
||||
// Notify stream
|
||||
if (stream) {
|
||||
stream->connectTerminated(sock);
|
||||
TelEngine::destruct(stream);
|
||||
}
|
||||
else {
|
||||
if (sock)
|
||||
delete sock;
|
||||
DDebug(engine,DebugInfo,"JBConnect stream='%s' vanished while connecting [%p]",
|
||||
deleteSocket(sock);
|
||||
DDebug(engine,DebugInfo,"JBConnect(%s) stream vanished while connecting [%p]",
|
||||
m_stream.c_str(),this);
|
||||
}
|
||||
}
|
||||
|
||||
// Notify connecting to the stream. Return false if stream vanished
|
||||
bool JBConnect::notifyConnecting(bool sync, bool useCurrentStat)
|
||||
{
|
||||
JBStream* stream = m_engine ? m_engine->findStream(m_stream,m_streamType) : 0;
|
||||
if (!stream)
|
||||
return false;
|
||||
int stat = m_status;
|
||||
if (!useCurrentStat) {
|
||||
// Advertised state:
|
||||
// Srv --> Address: we'll advance the state on retry
|
||||
// Domain --> Start to re-start on retry
|
||||
if (m_status == Srv)
|
||||
stat = Address;
|
||||
else if (m_status == Domain)
|
||||
stat = Start;
|
||||
}
|
||||
bool ok = stream->connecting(sync,stat,m_srvs);
|
||||
TelEngine::destruct(stream);
|
||||
return ok;
|
||||
}
|
||||
|
||||
// Delete a socket
|
||||
void JBConnect::deleteSocket(Socket*& sock)
|
||||
{
|
||||
if (!sock)
|
||||
return;
|
||||
sock->setReuse();
|
||||
sock->setLinger(0);
|
||||
delete sock;
|
||||
sock = 0;
|
||||
}
|
||||
|
||||
// Advance the status
|
||||
void JBConnect::advanceStatus()
|
||||
{
|
||||
if (m_status == Start)
|
||||
m_status = Address;
|
||||
else if (m_status == Address) {
|
||||
if (m_domain) {
|
||||
if (!m_port &&
|
||||
(m_streamType == JBStream::c2s || m_streamType == JBStream::s2s))
|
||||
m_status = Srv;
|
||||
else
|
||||
m_status = Domain;
|
||||
}
|
||||
else
|
||||
m_status = Start;
|
||||
}
|
||||
else if (m_status == Srv)
|
||||
m_status = Domain;
|
||||
else if (m_status == Domain)
|
||||
m_status = Start;
|
||||
else
|
||||
m_status = Address;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* JBEngine
|
||||
|
@ -759,7 +898,7 @@ JBEngine::JBEngine(const char* name)
|
|||
m_exiting(false),
|
||||
m_restartMax(JB_RESTART_COUNT), m_restartUpdInterval(JB_RESTART_UPDATE),
|
||||
m_setupTimeout(JB_SETUP_INTERVAL), m_startTimeout(JB_START_INTERVAL),
|
||||
m_connectTimeout(JB_CONNECT_INTERVAL),
|
||||
m_connectTimeout(JB_CONNECT_INTERVAL), m_srvTimeout(JB_SRV_INTERVAL),
|
||||
m_pingInterval(JB_PING_INTERVAL), m_pingTimeout(JB_PING_TIMEOUT),
|
||||
m_idleTimeout(0),
|
||||
m_streamReadBuffer(JB_STREAMBUF), m_maxIncompleteXml(XMPP_MAX_INCOMPLETEXML),
|
||||
|
@ -807,6 +946,8 @@ void JBEngine::initialize(const NamedList& params)
|
|||
JB_START_INTERVAL,JB_START_INTERVAL_MIN,JB_START_INTERVAL_MAX);
|
||||
m_connectTimeout = fixValue(params,"stream_connecttimeout",
|
||||
JB_CONNECT_INTERVAL,JB_CONNECT_INTERVAL_MIN,JB_CONNECT_INTERVAL_MAX);
|
||||
m_srvTimeout = fixValue(params,"stream_srvtimeout",
|
||||
JB_SRV_INTERVAL,JB_SRV_INTERVAL_MIN,JB_SRV_INTERVAL_MAX);
|
||||
m_pingInterval = fixValue(params,"stream_pinginterval",
|
||||
JB_PING_INTERVAL,JB_PING_INTERVAL_MIN,JB_PING_INTERVAL_MAX);
|
||||
m_pingTimeout = fixValue(params,"stream_pingtimeout",
|
||||
|
@ -1154,15 +1295,7 @@ void JBEngine::removeStream(JBStream* stream, bool delObj)
|
|||
{
|
||||
if (!stream)
|
||||
return;
|
||||
lock();
|
||||
ObjList* o = m_connect.find(stream->toString());
|
||||
if (o) {
|
||||
JBConnect* conn = static_cast<JBConnect*>(o->get());
|
||||
Debug(this,DebugAll,"Stopping stream connect thread (%p,%s)",
|
||||
conn,conn->toString().c_str());
|
||||
conn->stopConnect();
|
||||
}
|
||||
unlock();
|
||||
stopConnect(stream->toString());
|
||||
}
|
||||
|
||||
// Add/remove a connect stream thread when started/stopped
|
||||
|
@ -1172,6 +1305,8 @@ void JBEngine::connectStatus(JBConnect* conn, bool started)
|
|||
return;
|
||||
Lock lock(this);
|
||||
if (started) {
|
||||
// Make sure we remove any existing connect stream with the same name
|
||||
stopConnect(conn->toString());
|
||||
m_connect.append(conn)->setDelete(false);
|
||||
DDebug(this,DebugAll,"Added stream connect thread (%p)",conn);
|
||||
}
|
||||
|
@ -1182,6 +1317,19 @@ void JBEngine::connectStatus(JBConnect* conn, bool started)
|
|||
}
|
||||
}
|
||||
|
||||
// Stop a connect stream
|
||||
void JBEngine::stopConnect(const String& name)
|
||||
{
|
||||
Lock lock(this);
|
||||
ObjList* o = m_connect.find(name);
|
||||
if (!o)
|
||||
return;
|
||||
JBConnect* conn = static_cast<JBConnect*>(o->get());
|
||||
Debug(this,DebugAll,"Stopping stream connect thread (%p,%s)",conn,name.c_str());
|
||||
conn->stopConnect();
|
||||
o->remove(false);
|
||||
}
|
||||
|
||||
// Find a stream by its name in a given set list
|
||||
JBStream* JBEngine::findStream(const String& id, JBStreamSetList* list)
|
||||
{
|
||||
|
|
|
@ -155,7 +155,7 @@ JBStream::JBStream(JBEngine* engine, Socket* socket, Type t, bool ssl)
|
|||
m_engine(engine), m_type(t),
|
||||
m_incoming(true), m_terminateEvent(0),
|
||||
m_xmlDom(0), m_socket(0), m_socketFlags(0), m_socketMutex(true,"JBStream::Socket"),
|
||||
m_connectPort(0), m_compress(0)
|
||||
m_connectPort(0), m_compress(0), m_connectStatus(JBConnect::Start)
|
||||
{
|
||||
if (ssl)
|
||||
setFlags(StreamSecured | StreamTls);
|
||||
|
@ -186,7 +186,7 @@ JBStream::JBStream(JBEngine* engine, Type t, const JabberID& local, const Jabber
|
|||
m_incoming(false), m_name(name),
|
||||
m_terminateEvent(0),
|
||||
m_xmlDom(0), m_socket(0), m_socketFlags(0), m_socketMutex(true,"JBStream::Socket"),
|
||||
m_connectPort(0), m_compress(0)
|
||||
m_connectPort(0), m_compress(0), m_connectStatus(JBConnect::Start)
|
||||
{
|
||||
if (!m_name)
|
||||
m_engine->buildStreamName(m_name,this);
|
||||
|
@ -231,6 +231,7 @@ void JBStream::connectTerminated(Socket*& sock)
|
|||
}
|
||||
else {
|
||||
DDebug(this,DebugNote,"Connect failed [%p]",this);
|
||||
resetConnectStatus();
|
||||
terminate(0,false,0,XMPPError::NoRemote);
|
||||
}
|
||||
return;
|
||||
|
@ -243,6 +244,29 @@ void JBStream::connectTerminated(Socket*& sock)
|
|||
}
|
||||
}
|
||||
|
||||
// Connecting notification. Start connect timer for synchronous connect
|
||||
bool JBStream::connecting(bool sync, int stat, ObjList& srvs)
|
||||
{
|
||||
if (incoming() || !m_engine || state() != Connecting)
|
||||
return false;
|
||||
Lock lock(this);
|
||||
if (state() != Connecting)
|
||||
return false;
|
||||
m_connectStatus = stat;
|
||||
SrvRecord::copy(m_connectSrvs,srvs);
|
||||
if (sync) {
|
||||
if (stat != JBConnect::Srv)
|
||||
m_connectTimeout = Time::msecNow() + m_engine->m_connectTimeout;
|
||||
else
|
||||
m_connectTimeout = Time::msecNow() + m_engine->m_srvTimeout;
|
||||
}
|
||||
else
|
||||
m_connectTimeout = 0;
|
||||
DDebug(this,DebugAll,"Connecting sync=%u stat=%s [%p]",
|
||||
sync,lookup(m_connectStatus,JBConnect::s_statusName),this);
|
||||
return true;
|
||||
}
|
||||
|
||||
// Get an object from this stream
|
||||
void* JBStream::getObject(const String& name) const
|
||||
{
|
||||
|
@ -261,6 +285,17 @@ const String& JBStream::toString() const
|
|||
return m_name;
|
||||
}
|
||||
|
||||
// Retrieve connection address(es), port and status
|
||||
void JBStream::connectAddr(String& addr, int& port, String& localip, int& stat,
|
||||
ObjList& srvs) const
|
||||
{
|
||||
addr = m_connectAddr;
|
||||
port = m_connectPort;
|
||||
localip = m_localIp;
|
||||
stat = m_connectStatus;
|
||||
SrvRecord::copy(srvs,m_connectSrvs);
|
||||
}
|
||||
|
||||
// Set/reset RosterRequested flag
|
||||
void JBStream::setRosterRequested(bool ok)
|
||||
{
|
||||
|
@ -821,14 +856,19 @@ bool JBStream::canProcess(u_int64_t time)
|
|||
}
|
||||
if (state() == Idle) {
|
||||
// Re-connect
|
||||
// Don't connect non client if we are in error and have nothing to send
|
||||
if (m_restart) {
|
||||
bool conn = (m_connectStatus > JBConnect::Start);
|
||||
if (!conn && m_restart) {
|
||||
// Don't connect non client or cluster if we are in error and
|
||||
// have nothing to send
|
||||
if (m_type != c2s && m_type != cluster &&
|
||||
flag(InError) && !m_pending.skipNull())
|
||||
return false;
|
||||
conn = true;
|
||||
m_restart--;
|
||||
}
|
||||
if (conn) {
|
||||
resetFlags(InError);
|
||||
changeState(Connecting);
|
||||
m_restart--;
|
||||
m_engine->connectStream(this);
|
||||
return false;
|
||||
}
|
||||
|
@ -1048,7 +1088,15 @@ void JBStream::checkTimeouts(u_int64_t time)
|
|||
}
|
||||
// Stream connect timer
|
||||
if (m_connectTimeout && m_connectTimeout < time) {
|
||||
terminate(0,m_incoming,0,XMPPError::ConnTimeout,"Stream connect timeout");
|
||||
DDebug(this,DebugNote,"Connect timed out stat=%s [%p]",
|
||||
lookup(m_connectStatus,JBConnect::s_statusName),this);
|
||||
// Don't terminate if there are more connect options
|
||||
if (state() == Connecting && m_connectStatus > JBConnect::Start) {
|
||||
m_engine->stopConnect(toString());
|
||||
m_engine->connectStream(this);
|
||||
}
|
||||
else
|
||||
terminate(0,m_incoming,0,XMPPError::ConnTimeout,"Stream connect timeout");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -1474,6 +1522,9 @@ void JBStream::changeState(State newState, u_int64_t time)
|
|||
// Set/reset state depending data
|
||||
switch (m_state) {
|
||||
case WaitStart:
|
||||
// Reset connect status if not timeout
|
||||
if (m_startTimeout && m_startTimeout > time)
|
||||
resetConnectStatus();
|
||||
m_startTimeout = 0;
|
||||
break;
|
||||
case Securing:
|
||||
|
@ -1482,6 +1533,7 @@ void JBStream::changeState(State newState, u_int64_t time)
|
|||
break;
|
||||
case Connecting:
|
||||
m_connectTimeout = 0;
|
||||
m_engine->stopConnect(toString());
|
||||
break;
|
||||
case Register:
|
||||
if (type() == c2s)
|
||||
|
@ -1518,6 +1570,7 @@ void JBStream::changeState(State newState, u_int64_t time)
|
|||
clientStream()->m_registerReq = 0;
|
||||
break;
|
||||
case Running:
|
||||
resetConnectStatus();
|
||||
setFlags(StreamSecured | StreamAuthenticated);
|
||||
resetFlags(InError);
|
||||
m_setupTimeout = 0;
|
||||
|
@ -1525,13 +1578,6 @@ void JBStream::changeState(State newState, u_int64_t time)
|
|||
if (m_state != Running)
|
||||
m_events.append(new JBEvent(JBEvent::Running,this,0));
|
||||
break;
|
||||
case Connecting:
|
||||
if (m_engine->m_connectTimeout)
|
||||
m_connectTimeout = time + m_engine->m_connectTimeout;
|
||||
else
|
||||
m_connectTimeout = 0;
|
||||
DDebug(this,DebugAll,"Set connect timeout " FMT64 " [%p]",m_connectTimeout,this);
|
||||
break;
|
||||
case Securing:
|
||||
socketSetCanRead(false);
|
||||
break;
|
||||
|
@ -2335,6 +2381,14 @@ bool JBStream::compress(XmlElementOut* xml)
|
|||
return false;
|
||||
}
|
||||
|
||||
// Reset connect status data
|
||||
void JBStream::resetConnectStatus()
|
||||
{
|
||||
DDebug(this,DebugAll,"resetConnectStatus() [%p]",this);
|
||||
m_connectStatus = JBConnect::Start;
|
||||
m_connectSrvs.clear();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* JBClientStream
|
||||
|
|
|
@ -67,6 +67,17 @@ void SrvRecord::insert(ObjList& list, SrvRecord* rec)
|
|||
list.append(rec);
|
||||
}
|
||||
|
||||
// Copy a SrvRecord list into another one
|
||||
void SrvRecord::copy(ObjList& dest, const ObjList& src)
|
||||
{
|
||||
dest.clear();
|
||||
for (ObjList* o = src.skipNull(); o; o = o->skipNext()) {
|
||||
SrvRecord* rec = static_cast<SrvRecord*>(o->get());
|
||||
dest.append(new SrvRecord(*rec,rec->m_port,rec->m_priority,rec->m_weight));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Make a SRV query
|
||||
int Resolver::srvQuery(const char* query, ObjList& result, String* error)
|
||||
{
|
||||
|
|
|
@ -85,6 +85,13 @@ public:
|
|||
*/
|
||||
static void insert(ObjList& list, SrvRecord* rec);
|
||||
|
||||
/**
|
||||
* Copy a SrvRecord list into another one
|
||||
* @param dest Destination list
|
||||
* @param src Source list
|
||||
*/
|
||||
static void copy(ObjList& dest, const ObjList& src);
|
||||
|
||||
int m_port;
|
||||
int m_priority;
|
||||
int m_weight;
|
||||
|
|
|
@ -712,17 +712,16 @@ public:
|
|||
}
|
||||
|
||||
/**
|
||||
* Retrieve connection address(es) and port
|
||||
* Retrieve connection address(es), port and status
|
||||
* This method is not thread safe
|
||||
* @param addr The remote ip
|
||||
* @param port The remote port
|
||||
* @param localip Local ip to bind
|
||||
* @param stat Current connect status
|
||||
* @param srvs List to copy stream SRV records
|
||||
*/
|
||||
inline void connectAddr(String& addr, int& port, String& localip) const {
|
||||
addr = m_connectAddr;
|
||||
port = m_connectPort;
|
||||
localip = m_localIp;
|
||||
}
|
||||
void connectAddr(String& addr, int& port, String& localip, int& stat,
|
||||
ObjList& srvs) const;
|
||||
|
||||
/**
|
||||
* Set/reset RosterRequested flag
|
||||
|
@ -853,6 +852,16 @@ public:
|
|||
*/
|
||||
virtual void connectTerminated(Socket*& sock);
|
||||
|
||||
/**
|
||||
* Connecting notification. Start connect timer for synchronous connect
|
||||
* This method is thread safe
|
||||
* @param sync True if the connection is synchronous
|
||||
* @param stat Current status of the connect thread
|
||||
* @param srvs Current list of SRV records in the connect thread
|
||||
* @return True if accepted
|
||||
*/
|
||||
virtual bool connecting(bool sync, int stat, ObjList& srvs);
|
||||
|
||||
/**
|
||||
* Get an object from this stream
|
||||
* @param name The name of the object to get
|
||||
|
@ -1232,6 +1241,8 @@ private:
|
|||
// Compress data to be sent (the pending stream xml buffer or pending stanza)
|
||||
// Return false on failure
|
||||
bool compress(XmlElementOut* xml = 0);
|
||||
// Reset connect status data
|
||||
void resetConnectStatus();
|
||||
|
||||
enum {
|
||||
SocketCanRead = 0x01,
|
||||
|
@ -1285,6 +1296,8 @@ private:
|
|||
int m_connectPort; // Remote port to connect to
|
||||
String m_localIp; // Local ip to bind when connecting
|
||||
Compressor* m_compress;
|
||||
int m_connectStatus; // Current connect stream status
|
||||
ObjList m_connectSrvs; // Current connect stream SRV records
|
||||
};
|
||||
|
||||
|
||||
|
@ -1737,6 +1750,13 @@ class YJABBER_API JBConnect : public GenObject
|
|||
{
|
||||
YCLASS(JBConnect,GenObject)
|
||||
public:
|
||||
enum Status {
|
||||
Start = 0,
|
||||
Address, // Use configured address
|
||||
Srv, // Use SRV records
|
||||
Domain // Use stream remote domain
|
||||
};
|
||||
|
||||
/**
|
||||
* Constructor. Add itself to the stream's engine
|
||||
* @param stream The stream to connect
|
||||
|
@ -1759,6 +1779,11 @@ public:
|
|||
*/
|
||||
virtual const String& toString() const;
|
||||
|
||||
/**
|
||||
* Status name dictionary
|
||||
*/
|
||||
static const TokenDict s_statusName[];
|
||||
|
||||
protected:
|
||||
/**
|
||||
* Connect the socket.
|
||||
|
@ -1775,11 +1800,19 @@ private:
|
|||
{}
|
||||
// Check if exiting. Release socket if exiting
|
||||
bool exiting(Socket*& sock);
|
||||
// Connect a socket
|
||||
bool connect(Socket*& sock, const char* addr, int port);
|
||||
// Create and try to connect a socket. Return it on success
|
||||
// Set stop on fatal failure and return 0
|
||||
Socket* connect(const char* addr, int port, bool& stop);
|
||||
// Notify termination, remove from engine
|
||||
void terminated(Socket* sock, bool final);
|
||||
// Notify connecting to the stream. Return false if stream vanished
|
||||
bool notifyConnecting(bool sync, bool useCurrentStat = false);
|
||||
// Delete a socket and zero the pointer
|
||||
void deleteSocket(Socket*& sock);
|
||||
// Advance connect status
|
||||
void advanceStatus();
|
||||
|
||||
int m_status; // Current status
|
||||
String m_domain; // Remote domain
|
||||
String m_address; // Remote ip address
|
||||
int m_port; // Port to connect to
|
||||
|
@ -1787,6 +1820,7 @@ private:
|
|||
String m_stream; // Stream name
|
||||
JBStream::Type m_streamType; // Stream type
|
||||
String m_localIp; // Local ip to bind when connecting
|
||||
ObjList m_srvs; // SRV records list
|
||||
};
|
||||
|
||||
|
||||
|
@ -2091,6 +2125,7 @@ protected:
|
|||
unsigned int m_setupTimeout; // Overall stream setup timeout
|
||||
unsigned int m_startTimeout; // Wait stream start period
|
||||
unsigned int m_connectTimeout; // Outgoing: socket connect timeout
|
||||
unsigned int m_srvTimeout; // SRV query timeout
|
||||
unsigned int m_pingInterval; // Stream idle interval (no data received)
|
||||
unsigned int m_pingTimeout; // Sent ping timeout
|
||||
unsigned int m_idleTimeout; // Stream idle timeout (nothing sent or received)
|
||||
|
@ -2103,6 +2138,8 @@ protected:
|
|||
private:
|
||||
// Add/remove a connect stream thread when started/stopped
|
||||
void connectStatus(JBConnect* conn, bool started);
|
||||
// Stop a connect stream
|
||||
void stopConnect(const String& name);
|
||||
|
||||
ObjList m_connect; // Connecting streams
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue