Implemented ping on client to server streams.

git-svn-id: http://voip.null.ro/svn/yate@5252 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
marian 2012-09-05 13:50:08 +00:00
parent 4682d3a3da
commit 18c9bdba66
6 changed files with 194 additions and 14 deletions

View File

@ -55,6 +55,22 @@
; A value outside min/max interval will be adjusted to nearest interval margin
;stream_redirectcount=2
; stream_pinginterval: integer: The interval in milliseconds to ping the server
; Ping is always re-scheduled when receiving data from the server
; This parameter is applied on reload
; Minimum allowed value is 60000, maximum allowed value is 3600000
; Defaults to 600000
; Set it to 0 to disable ping
;stream_pinginterval=600000
; stream_pingtimeout: integer: Sent ping timeout in milliseconds
; When a ping times out the stream is terminated
; This parameter is applied on reload
; Minimum allowed value is 10000, maximum allowed value is 60000
; Defaults to 30000
; Set it to 0 to disable ping
;stream_pingtimeout=30000
; entitycaps: boolean: Enable entity capabilities cache.
; If enabled entity capabilities will be requested and cached each time a presence
; stanza is received

View File

@ -31,7 +31,7 @@ using namespace TelEngine;
static unsigned int fixValue(const NamedList& p, const char* param,
unsigned int defVal, unsigned int min, unsigned int max, bool zero = false)
{
unsigned int val = p.getIntValue(param);
unsigned int val = p.getIntValue(param,defVal);
if (!val) {
if (!zero)
val = defVal;
@ -102,9 +102,9 @@ static const String s_googleMailNode = "http://mail.google.com/xmpp/client/caps"
#define JB_SRV_INTERVAL_MIN 10000
#define JB_SRV_INTERVAL_MAX 120000
// Ping
#define JB_PING_INTERVAL 120000
#define JB_PING_INTERVAL 600000
#define JB_PING_INTERVAL_MIN 60000
#define JB_PING_INTERVAL_MAX 600000
#define JB_PING_INTERVAL_MAX 3600000
#define JB_PING_TIMEOUT 30000
#define JB_PING_TIMEOUT_MIN 10000
#define JB_PING_TIMEOUT_MAX JB_PING_INTERVAL_MIN
@ -972,9 +972,11 @@ void JBEngine::initialize(const NamedList& params)
m_srvTimeout = fixValue(params,"stream_srvtimeout",
JB_SRV_INTERVAL,JB_SRV_INTERVAL_MIN,JB_SRV_INTERVAL_MAX);
m_pingInterval = fixValue(params,"stream_pinginterval",
JB_PING_INTERVAL,JB_PING_INTERVAL_MIN,JB_PING_INTERVAL_MAX);
client ? JB_PING_INTERVAL : 0,JB_PING_INTERVAL_MIN,JB_PING_INTERVAL_MAX,true);
m_pingTimeout = fixValue(params,"stream_pingtimeout",
JB_PING_TIMEOUT,JB_PING_TIMEOUT_MIN,JB_PING_TIMEOUT_MAX);
client ? JB_PING_TIMEOUT : 0,JB_PING_TIMEOUT_MIN,JB_PING_TIMEOUT_MAX,true);
if (!(m_pingInterval && m_pingTimeout))
m_pingInterval = m_pingTimeout = 0;
m_idleTimeout = fixValue(params,"stream_idletimeout",
JB_IDLE_INTERVAL,JB_IDLE_INTERVAL_MIN,JB_IDLE_INTERVAL_MAX);
int defVal = JB_REDIRECT_COUNT;

View File

@ -76,6 +76,26 @@ static bool decodeBase64(String& buf, const String& str, JBStream* stream)
return false;
}
#ifdef DEBUG
static bool checkPing(JBStream* stream, const XmlElement* xml, const String& pingId)
{
if (!(stream && xml && pingId))
return false;
if (pingId != xml->getAttribute(YSTRING("id")))
return false;
const char* it = xml->attribute(YSTRING("type"));
XMPPUtils::IqType iqType = XMPPUtils::iqType(it);
bool ok = (iqType == XMPPUtils::IqResult || iqType == XMPPUtils::IqError);
if (ok)
Debug(stream,DebugAll,"Ping with id=%s confirmed by '%s' [%p]",pingId.c_str(),it,stream);
return ok;
}
#else
static inline bool checkPing(JBStream* stream, const XmlElement* xml, const String& pingId)
{
return false;
}
#endif
static const TokenDict s_location[] = {
{"internal", 0},
@ -149,7 +169,7 @@ JBStream::JBStream(JBEngine* engine, Socket* socket, Type t, bool ssl)
m_sasl(0),
m_state(Idle), m_flags(0), m_xmlns(XMPPNamespace::Count), m_lastEvent(0),
m_setupTimeout(0), m_startTimeout(0),
m_pingTimeout(0), m_nextPing(0),
m_pingTimeout(0), m_pingInterval(0), m_nextPing(0),
m_idleTimeout(0), m_connectTimeout(0),
m_restart(0), m_timeToFillRestart(0),
m_engine(engine), m_type(t),
@ -178,7 +198,7 @@ JBStream::JBStream(JBEngine* engine, Type t, const JabberID& local, const Jabber
: Mutex(true,"JBStream"),
m_sasl(0),
m_state(Idle), m_local(local), m_remote(remote), m_serverHost(serverHost),
m_flags(0), m_xmlns(XMPPNamespace::Count), m_lastEvent(0),
m_flags(0), m_xmlns(XMPPNamespace::Count), m_lastEvent(0), m_stanzaIndex(0),
m_setupTimeout(0), m_startTimeout(0),
m_pingTimeout(0), m_nextPing(0),
m_idleTimeout(0), m_connectTimeout(0),
@ -1030,6 +1050,9 @@ void JBStream::process(u_int64_t time)
switch (m_state) {
case Running:
processRunning(xml,from,to);
// Reset ping
setNextPing(true);
m_pingId = "";
break;
case Features:
if (m_incoming)
@ -1087,6 +1110,7 @@ bool JBStream::processRunning(XmlElement* xml, const JabberID& from, const Jabbe
case XmlTag::Iq:
if (ns != m_xmlns)
break;
checkPing(this,xml,m_pingId);
m_events.append(new JBEvent(JBEvent::Iq,this,xml,from,to,xml->findFirstChild()));
return true;
default:
@ -1112,17 +1136,29 @@ void JBStream::checkTimeouts(u_int64_t time)
}
// Running: check ping and idle timers
if (m_state == Running) {
const char* reason = 0;
if (m_pingTimeout) {
if (m_pingTimeout < time)
terminate(0,false,0,XMPPError::ConnTimeout,"Ping timeout");
if (m_pingTimeout < time) {
Debug(this,DebugNote,"Ping stanza with id '%s' timed out [%p]",m_pingId.c_str(),this);
reason = "Ping timeout";
}
}
else if (m_nextPing && time >= m_nextPing) {
m_pingId = (unsigned int)time;
// TODO: Send it
Debug(this,DebugStub,"JBStream::checkTimeouts() sendPing() not implemented");
XmlElement* ping = setNextPing(false);
if (ping) {
DDebug(this,DebugAll,"Sending ping with id=%s [%p]",m_pingId.c_str(),this);
if (!sendStanza(ping))
m_pingId = "";
}
else {
resetPing();
m_pingId = "";
}
}
else if (m_idleTimeout && m_idleTimeout < time)
terminate(0,true,0,XMPPError::ConnTimeout,"Stream idle");
if (m_idleTimeout && m_idleTimeout < time && !reason)
reason = "Stream idle";
if (reason)
terminate(0,m_incoming,0,XMPPError::ConnTimeout,reason);
return;
}
// Stream setup timer
@ -1209,6 +1245,12 @@ void JBStream::resetConnection(Socket* sock)
}
}
// Build a ping iq stanza
XmlElement* JBStream::buildPing(const String& stanzaId)
{
return 0;
}
// Build a stream start XML element
XmlElement* JBStream::buildStreamStart()
{
@ -1611,6 +1653,10 @@ void JBStream::changeState(State newState, u_int64_t time)
stateName(),lookup(newState,s_stateName),this);
// Set/reset state depending data
switch (m_state) {
case Running:
resetPing();
m_pingId = "";
break;
case WaitStart:
// Reset connect status if not timeout
if (m_startTimeout && m_startTimeout > time)
@ -1663,6 +1709,8 @@ void JBStream::changeState(State newState, u_int64_t time)
resetConnectStatus();
setRedirect();
m_redirectCount = 0;
m_pingInterval = m_engine->m_pingInterval;
setNextPing(true);
setFlags(StreamSecured | StreamAuthenticated);
resetFlags(InError);
m_setupTimeout = 0;
@ -1977,6 +2025,57 @@ void JBStream::setIdleTimer(u_int64_t msecNow)
XDebug(this,DebugAll,"Idle timeout set to " FMT64 "ms [%p]",m_idleTimeout,this);
}
// Reset ping data
void JBStream::resetPing()
{
if (!(m_pingTimeout || m_nextPing))
return;
XDebug(this,DebugAll,"Reset ping data [%p]",this);
m_nextPing = 0;
m_pingTimeout = 0;
}
// Set the time of the next ping if there is any timeout and we don't have a ping in progress
// @return XmlElement containing the ping to send, 0 if no ping is going to be sent or 'force' is true
XmlElement* JBStream::setNextPing(bool force)
{
if (!m_pingInterval) {
resetPing();
return 0;
}
if (m_type != c2s && m_type != comp)
return 0;
if (force) {
m_nextPing = Time::msecNow() + m_pingInterval;
m_pingTimeout = 0;
XDebug(this,DebugAll,"Next ping " FMT64U " [%p]",m_nextPing,this);
return 0;
}
XmlElement* ping = 0;
if (m_nextPing) {
// Ping still active in engine ?
Time time;
if (m_nextPing > time.msec())
return 0;
if (m_engine->m_pingTimeout) {
generateIdIndex(m_pingId,"_ping_");
ping = buildPing(m_pingId);
if (ping)
m_pingTimeout = time.msec() + m_engine->m_pingTimeout;
else
m_pingTimeout = 0;
}
else
resetPing();
}
if (m_pingInterval)
m_nextPing = Time::msecNow() + m_pingInterval;
else
m_nextPing = 0;
XDebug(this,DebugAll,"Next ping " FMT64U " ping=%p [%p]",m_nextPing,ping,this);
return ping;
}
// Process incoming elements in Challenge state
// Return false if stream termination was initiated
bool JBStream::processChallenge(XmlElement* xml, const JabberID& from, const JabberID& to)
@ -2573,6 +2672,12 @@ JBClientStream::JBClientStream(JBEngine* engine, const JabberID& jid, const Stri
m_password = params.getValue("password");
}
// Build a ping iq stanza
XmlElement* JBClientStream::buildPing(const String& stanzaId)
{
return XMPPUtils::createPing(stanzaId);
}
// Bind a resource to an incoming stream
void JBClientStream::bind(const String& resource, const char* id, XMPPError::Type error)
{
@ -3457,6 +3562,7 @@ bool JBClusterStream::processRunning(XmlElement* xml, const JabberID& from, cons
XmlElement* child = 0;
switch (t) {
case XmlTag::Iq:
checkPing(this,xml,m_pingId);
evType = JBEvent::Iq;
child = xml->findFirstChild();
break;

View File

@ -775,6 +775,14 @@ XmlElement* XMPPUtils::createIqError(const char* from, const char* to, XmlElemen
return iq;
}
// Create an 'iq' element with a ping child
XmlElement* XMPPUtils::createPing(const char* id, const char* from, const char* to)
{
XmlElement* iq = XMPPUtils::createIq(XMPPUtils::IqGet,from,to,id);
iq->addChild(XMPPUtils::createElement(XmlTag::Ping,XMPPNamespace::Ping));
return iq;
}
// Create an 'iq' element of type 'get' with a 'vcard' child
XmlElement* XMPPUtils::createVCard(bool get, const char* from, const char* to, const char* id)
{

View File

@ -1219,6 +1219,16 @@ public:
static XmlElement* createIqError(const char* from, const char* to, XmlElement*& xml,
int type, int error, const char* text = 0);
/**
* Create an 'iq' element with a ping child
* @param id The 'id' attribute
* @param from The 'from' attribute
* @param to The 'to' attribute
* @return A valid XmlElement pointer
*/
static XmlElement* createPing(const char* id = 0, const char* from = 0,
const char* to = 0);
/**
* Create an 'iq' element with a 'vcard' child
* @param get True to set the iq's type to 'get', false to set it to 'set'

View File

@ -1016,6 +1016,13 @@ protected:
*/
virtual void resetConnection(Socket* sock = 0);
/**
* Build a ping iq stanza
* @param stanzaId Stanza id
* @return 0
*/
virtual XmlElement* buildPing(const String& stanzaId);
/**
* Build a stream start XML element
* @return XmlElement pointer
@ -1202,6 +1209,28 @@ protected:
*/
void setIdleTimer(u_int64_t msecNow = Time::msecNow());
/**
* Reset ping data
*/
void resetPing();
/**
* Set the time of the next ping if there is any timeout and we don't have a ping in progress.
* Set the ping timeout if an element is returned
* @param force True to set it even if already set
* @return XmlElement containing the ping to send, 0 if no ping is going to be sent or 'force' is true
*/
XmlElement* setNextPing(bool force);
/**
* Generate a stanza index from stream id and current stanza index
* Set the ping timeout if an element is returned
* @param buf Destination string
* @param extra Optional extra string
*/
inline void generateIdIndex(String& buf, const char* extra = 0)
{ buf = id() + extra + String(++m_stanzaIndex); }
State m_state; // Stream state
String m_id; // Stream id
JabberID m_local; // Local peer's jid
@ -1213,10 +1242,12 @@ protected:
JBEvent* m_lastEvent; // Last event generated by this stream
ObjList m_events; // Queued events
ObjList m_pending; // Pending outgoing elements
unsigned int m_stanzaIndex; // Index used to generate IDs for stanzas
// Timers
u_int64_t m_setupTimeout; // Overall stream setup timeout
u_int64_t m_startTimeout; // Incoming: wait stream start period
u_int64_t m_pingTimeout; // Sent ping timeout
u_int64_t m_pingInterval; // Ping interval
u_int64_t m_nextPing; // Next ping
u_int64_t m_idleTimeout; // Stream idle timeout
u_int64_t m_connectTimeout; // Stream connect timeout
@ -1417,6 +1448,13 @@ public:
virtual JBClientStream* clientStream()
{ return this; }
/**
* Build a ping iq stanza
* @param stanzaId Stanza id
* @return Valid XmlElement pointer
*/
virtual XmlElement* buildPing(const String& stanzaId);
/**
* Bind a resource to an incoming stream. This method should be called
* after processing a Bind event