From 2252918f8a3b97dd329b8d0fa08b34539ab04515 Mon Sep 17 00:00:00 2001 From: marian Date: Fri, 17 Nov 2006 14:12:04 +0000 Subject: [PATCH] Changed presence implementation. Added resource notify/subscribe messages. git-svn-id: http://voip.null.ro/svn/yate@1113 acf43c95-373e-0410-b603-e72c3f656dc1 --- modules/yjinglechan.cpp | 1523 +++++++++++++-------------------------- 1 file changed, 490 insertions(+), 1033 deletions(-) diff --git a/modules/yjinglechan.cpp b/modules/yjinglechan.cpp index 6d13cd05..3f3b6b3e 100644 --- a/modules/yjinglechan.cpp +++ b/modules/yjinglechan.cpp @@ -41,13 +41,11 @@ namespace { // anonymous class YJBEngine; // Jabber engine class YJBPresence; // Jabber presence engine class YJGEngine; // Jingle engine -class YUserPresence; // A local user's roster class YJGTransport; // Handle the transport for a connection class YJGConnection; // Jingle channel class YJGLibThread; // Library thread class ResNotifyHandler; // resource.notify handler class ResSubscribeHandler; // resource.subscribe handler -class ResUnsubscribeHandler; // resource.unsubscribe handler class YJGDriver; // The driver // Yate Payloads @@ -71,45 +69,32 @@ static TokenDict dict_payloads[] = { { 0, 0 }, }; -#define JINGLE_RESOURCE "Talk" // Default resource for local party -#define JINGLE_VOICE "voice-v1" // Voice capability for Google Talk -#define JINGLE_VERSION "1.0" // Version capability - -#define JINGLE_AUTHSTRINGLEN 16 // Username/Password length for transport - -#define JINGLE_CONN_TIMEOUT 10000 // Timeout value to override "maxcall" in call.execute - -#define JINGLE_ANONYMOUS_CALLER "anonymous" - -static const String MODULE_NAME("jingle"); -#define MODULE_PROTOCOL "xmpp" - -#define MODULE_MSG_NOTIFY "resource.notify" -#define MODULE_MSG_SUBSCRIBE "resource.subscribe" -#define MODULE_MSG_UNSUBSCRIBE "resource.unsubscribe" - -/* -resource.notify - module The sending module's name - protocol MODULE_PROTOCOL - subscription none/to/from/both - presence unknown/available/unavailable - from_user node@domain - from_resource resource - from_uri node@domain/resource - to_user node@domain - to_resource resource - to_uri node@domain/resource - -resource.subscribe -resource.unsubscribe - module The sending module's name - protocol MODULE_PROTOCOL - subscription none/to/from/both - presence unknown/available/unavailable (OPTIONAL) - from_user node@domain - to_user node@domain +// Default resource name for local party +#define JINGLE_RESOURCE "Talk" +// Username/Password length for transport +#define JINGLE_AUTHSTRINGLEN 16 +// Timeout value to override "maxcall" in call.execute +#define JINGLE_CONN_TIMEOUT 10000 +// Default caller if none for outgoing calls +#define JINGLE_ANONYMOUS_CALLER "unknown_caller" +// Messages +/* MODULE_MSG_NOTIFY + protocol MODULE_NAME + subscription true/false + status online/offline/subscribed/unsubscribed or any other string + from node@domain + to node@domain */ +#define MODULE_MSG_NOTIFY "resource.notify" +/* MODULE_MSG_NOTIFY + protocol MODULE_NAME + operation probe/subscribe/unsubscribe + from node@domain + to node@domain +*/ +#define MODULE_MSG_SUBSCRIBE "resource.subscribe" +// Module name +static const String MODULE_NAME("jingle"); /** * YJBEngine @@ -121,9 +106,15 @@ public: virtual ~YJBEngine() {} // Overloaded methods virtual bool connect(JBComponentStream* stream); + virtual bool exiting() + { return Engine::exiting(); } // Start thread members // @param read Reading socket thread count. void startThreads(u_int16_t read); + // Process a message event. + // @param event The event to process. Always a valid message event. + // @return True if the event was processed (kept). False to destroy it. + virtual bool processMessage(JBEvent* event); }; /** @@ -135,51 +126,25 @@ class YJBPresence : public JBPresence public: YJBPresence(JBEngine* engine, const NamedList& params); virtual ~YJBPresence(); - inline int autoSubscribe() const - { return m_autoSubscribe; } - inline bool delUnavailable() const - { return m_delUnavailable; } - void initialize(const NamedList& params); // Overloaded methods - virtual void processDisco(JBEvent* event); - virtual void processError(JBEvent* event); - virtual void processProbe(JBEvent* event); - virtual void processSubscribe(JBEvent* event); - virtual void processSubscribed(JBEvent* event); - virtual void processUnsubscribe(JBEvent* event); - virtual void processUnsubscribed(JBEvent* event); - virtual void processUnavailable(JBEvent* event); - virtual void processUnknown(JBEvent* event); + virtual bool notifyProbe(JBEvent* event, const JabberID& local, + const JabberID& remote); + virtual bool notifySubscribe(JBEvent* event, const JabberID& local, + const JabberID& remote, Presence presence); + virtual void notifySubscribe(XMPPUser* user, Presence presence); + virtual bool notifyPresence(JBEvent* event, const JabberID& local, + const JabberID& remote, bool available); + virtual void notifyPresence(XMPPUser* user, JIDResource* resource); + virtual void notifyNewUser(XMPPUser* user); // Start thread members // @param process Event processor thread count. - void startThreads(u_int16_t process); - // Find a user pair by JIDs (is remote has a resource the ). If none, create one - // @param newPresence Set to true on exit if a new element was created - // @param audio True to request an audio resource - // @return The presence state - bool get(const JabberID& local, JabberID& remote, bool& newPresence, bool audio = true); - // Enqueue message in the engine - void notify(const YUserPresence* yup, const char* error = 0); - void subscribe(const YUserPresence* yup, JBPresence::Presence type); - inline void cleanup() { - Lock lock(m_mutexUserpair); - m_userpair.clear(); - } - + // @param timeout Check user timeout. + void startThreads(u_int16_t process, u_int16_t timeout); protected: - void processBroadcast(JBEvent* event, bool available); - void processDirected(JBEvent* event, bool available); - void processSubscribe(JBEvent* event, JBPresence::Presence type); - int getSubscription(const JabberID& local, const JabberID& remote); - // Add/remove a user pair to the list. Used by YUserPresence constructor - void addPresence(YUserPresence* yup); - void removePresence(YUserPresence* yup); - -private: - ObjList m_userpair; // User pair list - Mutex m_mutexUserpair; // m_userpair lock - int m_autoSubscribe; // Auto subscribe state - bool m_delUnavailable; // Delete unavailable user or resource + // Create & enqueue a message from received presence parameter. + // Add status/operation/subscription parameters + bool message(Presence presence, const char* from, const char* to, + const char* subscription); }; /** @@ -204,91 +169,6 @@ private: bool m_requestSubscribe; }; -/** - * YUserPresence - */ -class YUserPresence : public RefObject, public Mutex -{ -public: - enum State { - Unknown, - Available, - Unavailable, - }; - enum Subscription { - SubNone = 0, - SubTo = 1, - SubFrom = 2, - SubBoth = 3, - }; - - YUserPresence(YJBPresence* engine, const char* local, const char* remote, - Subscription subscription, State state); - virtual ~YUserPresence(); - inline const JabberID& local() const - { return m_local; } - inline const JabberID& remote() const - { return m_remote; } - inline State localState() const - { return m_localState; } - inline State remoteState() const - { return m_remoteState; } - inline bool available() const - { return m_remoteState == Available; } - inline int subscription() const - { return m_subscription; } - inline bool subscribedTo() const - { return (m_subscription & SubTo); } - inline bool subscribedFrom() const - { return (m_subscription & SubFrom); } - inline bool audio() const - { return m_audio; } - // Send a presence element to the remote peer - // The caps parameter is used only for type None to send capabilities - bool send(JBPresence::Presence type = JBPresence::None, bool caps = true, - JBComponentStream* stream = 0); - // Request info - bool sendInfoRequest(bool info = true, JBComponentStream* stream = 0); - // Send query info/items - bool sendInfo(const char* id, JBComponentStream* stream = 0); - bool sendItems(const char* id, JBComponentStream* stream = 0); - // Process presence - void processError(JBEvent* event); - void processProbe(JBEvent* event); - void processUnavailable(JBEvent* event); - void processUnknown(JBEvent* event); - void updateSubscription(bool from, bool value); - static inline const char* stateText(int value) - { return lookup(value,s_state); } - static inline int stateType(const char* value) - { return lookup(value,s_state,Unknown); } - static inline const char* subscribeText(int value) - { return lookup(value,s_subscription); } - static inline int subscribeType(const char* value) - { return lookup(value,s_subscription,SubNone); } -protected: - void updateResource(XMLElement* element); - void updateState(bool available); - bool getStream(JBComponentStream*& stream, bool& release); - inline bool sendStanza(JBComponentStream* stream, XMLElement* xml) { - JBComponentStream::Error res = stream->sendStanza(xml); - if (res == JBComponentStream::ErrorContext || - res == JBComponentStream::ErrorNoSocket) - return false; - return true; - } - static TokenDict s_subscription[]; - static TokenDict s_state[]; -private: - JabberID m_local; // Local peer's JID - JabberID m_remote; // Remote peer's JID - State m_localState; // Remote peer's availability - State m_remoteState; // Remote peer's availability - int m_subscription; // Subscription state - bool m_audio; // Resource has audio capabilities - YJBPresence* m_engine; // The presence engine -}; - /** * YJGTransport */ @@ -356,6 +236,7 @@ public: virtual bool msgAnswered(Message& msg); virtual bool msgUpdate(Message& msg); virtual bool msgText(Message& msg, const char* text); + virtual bool msgTone(Message& msg, const char* tone); bool route(); void handleEvent(JGEvent* event); void hangup(bool reject, const char* reason = 0); @@ -375,6 +256,7 @@ public: protected: void handleJingle(JGEvent* event); + void handleError(JGEvent* event); void handleTransport(JGEvent* event); bool call(); private: @@ -406,6 +288,7 @@ public: JGReader, // m_jg->runReceive() JGProcess, // m_jg->runProcess() JBPresence, // m_presence->runProcess() + JBPresenceTimeout, // m_presence->runCheckTimeout() }; YJGLibThread(Action action, const char* name = 0, Priority prio = Normal); @@ -424,7 +307,9 @@ class ResNotifyHandler : public MessageHandler { public: ResNotifyHandler() : MessageHandler(MODULE_MSG_NOTIFY) {} - virtual bool received(Message &msg); + virtual bool received(Message& msg); + static void process(const JabberID& from, const JabberID& to, + const String& status, bool subFrom); }; /** @@ -434,17 +319,9 @@ class ResSubscribeHandler : public MessageHandler { public: ResSubscribeHandler() : MessageHandler(MODULE_MSG_SUBSCRIBE) {} - virtual bool received(Message &msg); -}; - -/** - * resource.unsubscribe message handler - */ -class ResUnsubscribeHandler : public MessageHandler -{ -public: - ResUnsubscribeHandler() : MessageHandler(MODULE_MSG_UNSUBSCRIBE) {} - virtual bool received(Message &msg); + virtual bool received(Message& msg); + static void process(const JabberID& from, const JabberID& to, + JBPresence::Presence presence); }; /** @@ -462,10 +339,20 @@ public: { return m_pendingTimeout; } inline const String& anonymousCaller() const { return m_anonymousCaller; } + // Split 'src' into parts separated by 'sep'. + // Puts the parts as names in dest if nameFirst is true + // Puts the parts as values in dest if nameFirst is false bool getParts(NamedList& dest, const char* src, const char sep, bool nameFirst); + // Assign param value to jid. + // @param checkDomain True to check if jid's domain is valid + // Return false if node or domain are 0 or domain is invalid + bool decodeJid(JabberID& jid, Message& msg, const char* param, + bool checkDomain = false); + // Create a random string of JINGLE_AUTHSTRINGLEN length void createAuthRandomString(String& dest); + // Process presence. Notify connections void processPresence(const JabberID& local, const JabberID& remote, - bool available, bool audio, const char* error); + bool available, bool audio); // Create a media string from a list void createMediaString(String& dest, ObjList& formats, char sep); @@ -513,527 +400,167 @@ void YJBEngine::startThreads(u_int16_t read) (new YJGLibThread(YJGLibThread::JBReader,"JBReader thread"))->startup(); } +bool YJBEngine::processMessage(JBEvent* event) +{ + const char* text = 0; + if (event->element()) { + XMLElement* body = event->element()->findFirstChild(XMLElement::Body); + if (body) + text = body->getText(); + } + DDebug(this,DebugInfo, + "Message '%s' from: '%s' to: '%s'.", + text,event->from().c_str(),event->to().c_str()); + return false; +} + /** * YJBPresence */ YJBPresence::YJBPresence(JBEngine* engine, const NamedList& params) - : JBPresence(engine), - m_mutexUserpair(true) + : JBPresence(engine,params) { - initialize(params); } YJBPresence::~YJBPresence() { - cleanup(); } -void YJBPresence::initialize(const NamedList& params) +bool YJBPresence::notifyProbe(JBEvent* event, const JabberID& local, + const JabberID& remote) { - m_autoSubscribe = YUserPresence::subscribeType(params.getValue("auto_subscribe")); - m_delUnavailable = params.getBoolValue("delete_unavailable",true); - if (debugAt(DebugAll)) { - String s; - s << "\r\nauto_subscribe=" << YUserPresence::subscribeText(m_autoSubscribe); - s << "\r\ndelete_unavailable=" << String::boolText(m_delUnavailable); - Debug(this,DebugAll,"Initialized:%s",s.c_str()); - } + XDebug(this,DebugAll, + "notifyProbe. Local: '%s'. Remote: '%s'.",local.c_str(),remote.c_str()); + // Enqueue message + return message(JBPresence::Probe,remote.bare(),local.bare(),0); } -void YJBPresence::processDisco(JBEvent* event) +bool YJBPresence::notifySubscribe(JBEvent* event, const JabberID& local, + const JabberID& remote, Presence presence) { - if (!event) - return; - if (event->type() == JBEvent::IqDiscoRes) - return; - // Set/Get info or items - if (!event->child()) - return; - XMPPNamespace::Type ns = XMPPNamespace::type(event->child()->getAttribute("xmlns")); - bool info = (ns == XMPPNamespace::DiscoInfo); - JabberID local(event->to()); - JabberID remote(event->from()); - Lock lock(m_mutexUserpair); - bool found = false; - // Try to match the local user. Remote user is searched by full jid - ObjList* obj = m_userpair.skipNull(); - for(; obj; obj = obj->skipNext()) { - YUserPresence* yup = static_cast(obj->get()); - if (!(yup->local().match(local) && remote == yup->remote())) - continue; - found = true; - if (info) - yup->sendInfo(event->id(),event->stream()); - else - yup->sendItems(event->id(),event->stream()); - } - if (found) - return; - // No local user: add one if it's in our domain - String identity; - if (!m_engine->getFullServerIdentity(identity) || identity != local.domain()) - return; - DDebug(this,DebugInfo,"Adding new user presence on info request."); - // Add just the bare jids: we know nothing about remote user's capabilities - YUserPresence* yup = new YUserPresence(this,local.bare(),remote.bare(), - (YUserPresence::Subscription)getSubscription(local,remote),YUserPresence::Unknown); - if (info) - yup->sendInfo(event->id(),event->stream()); - else - yup->sendItems(event->id(),event->stream()); + XDebug(this,DebugAll, + "notifySubscribe(%s). Local: '%s'. Remote: '%s'.", + presenceText(presence),local.c_str(),remote.c_str()); + // Enqueue message + return message(presence,remote.bare(),local.bare(),0); } -void YJBPresence::processError(JBEvent* event) +void YJBPresence::notifySubscribe(XMPPUser* user, Presence presence) { - if (!event) - return; - XDebug(this,DebugAll,"processError. Event: (%p).",event); - JabberID local(event->to()); - JabberID remote(event->from()); - Lock lock(m_mutexUserpair); - ObjList* obj = m_userpair.skipNull(); - for(; obj; obj = obj->skipNext()) { - YUserPresence* yup = static_cast(obj->get()); - if (local == yup->local() && remote == yup->remote()) - yup->processError(event); - } -} - -void YJBPresence::processProbe(JBEvent* event) -{ - if (!event) + if (!user) return; XDebug(this,DebugAll, - "processProbe. Event: (%p). From: '%s' To: '%s'.", - event,event->from().c_str(),event->to().c_str()); - JabberID local(event->to()); - if (!local.node()) { - Debug(this,DebugNote,"processProbe. Received probe without user."); - return; - } - JabberID remote(event->from()); - Lock lock(m_mutexUserpair); - bool found = false; - // Try to match the local user. Check the remote user's full jid - ObjList* obj = m_userpair.skipNull(); - for(; obj; obj = obj->skipNext()) { - YUserPresence* yup = static_cast(obj->get()); - if (!(yup->local().match(local) && yup->remote().match(remote))) - continue; - found = true; - XDebug(this,DebugAll,"processProbe. Sending presence from existing %p.",yup); - yup->send(); - } - if (found) - return; - // Remote has a resource: check if we have a pair whose remote user has no resource - if (remote.resource()) { - obj = m_userpair.skipNull(); - for(; obj; obj = obj->skipNext()) { - YUserPresence* yup = static_cast(obj->get()); - if (!(!yup->remote().resource() && - yup->local().match(local) && - remote.bare() == yup->remote().bare())) - continue; - found = true; - XDebug(this,DebugAll,"processProbe. Sending presence from existing %p.",yup); - yup->send(); - break; - } - } - if (found) - return; - // No local user: add one if it's in our domain - String identity; - if (!m_engine->getFullServerIdentity(identity) || identity != local.domain()) { - Debug(this,DebugMild, - "processProbe. Received probe for non-local domain: %s",local.c_str()); - return; - } - DDebug(this,DebugInfo,"Adding new local user on probe request."); - // Add just the bare jids: we know nothing about remote user's capabilities - new YUserPresence(this,local.bare(),remote.bare(), - (YUserPresence::Subscription)getSubscription(local,remote),YUserPresence::Unknown); + "notifySubscribe(%s). User: (%p).",presenceText(presence),user); + // Enqueue message + message(presence,user->jid().bare(),user->local()->jid().bare(),0); } -void YJBPresence::processSubscribe(JBEvent* event) +bool YJBPresence::notifyPresence(JBEvent* event, const JabberID& local, + const JabberID& remote, bool available) { - if (!event) - return; XDebug(this,DebugAll, - "processSubscribe. Event: (%p). From: '%s' To: '%s'.", - event,event->from().c_str(),event->to().c_str()); - processSubscribe(event,JBPresence::Subscribe); + "notifyPresence(%s). Local: '%s'. Remote: '%s'.", + available?"available":"unavailable",local.c_str(),remote.c_str()); + // Notify plugin only if unavailable + // Since it's a new presence, connections are interested only if unavailable + if (!available) + iplugin.processPresence(local,remote,false,false); + // Enqueue message + return message(available ? JBPresence::None : JBPresence::Unavailable, + remote.bare(),local.bare(),0); } -void YJBPresence::processSubscribed(JBEvent* event) +void YJBPresence::notifyPresence(XMPPUser* user, JIDResource* resource) { - if (!event) + if (!(user && resource)) return; - XDebug(this,DebugAll, - "processSubscribed. Event: (%p). From: '%s' To: '%s'.", - event,event->from().c_str(),event->to().c_str()); - processSubscribe(event,JBPresence::Subscribed); + XDebug(this,DebugAll,"notifyPresence. User: (%p). Resource: (%p).", + user,resource); + // Notify plugin + JabberID remote(user->jid().node(),user->jid().domain(),resource->name()); + iplugin.processPresence(user->local()->jid(),remote,resource->available(), + resource->hasCap(JIDResource::CapAudio)); + // Enqueue message + message(resource->available() ? JBPresence::None : JBPresence::Unavailable, + user->jid().bare(),user->local()->jid().bare(), + String::boolText(user->subscribedTo())); } -void YJBPresence::processUnsubscribe(JBEvent* event) +void YJBPresence::notifyNewUser(XMPPUser* user) { - if (!event) + if (!user) return; - XDebug(this,DebugAll, - "processUnsubscribe. Event: (%p). From: '%s' To: '%s'.", - event,event->from().c_str(),event->to().c_str()); - processSubscribe(event,JBPresence::Unsubscribe); + DDebug(this,DebugInfo,"Added new user '%s' for '%s'.", + user->jid().c_str(),user->local()->jid().c_str()); + // Add local resource + user->addLocalRes(new JIDResource(JINGLE_RESOURCE,JIDResource::Available, + JIDResource::CapAudio)); } -void YJBPresence::processUnsubscribed(JBEvent* event) -{ - if (!event) - return; - XDebug(this,DebugAll, - "processUnsubscribed. Event: (%p). From: '%s' To: '%s'.", - event,event->from().c_str(),event->to().c_str()); - processSubscribe(event,JBPresence::Unsubscribed); -} - -void YJBPresence::processUnavailable(JBEvent* event) -{ - if (!event) - return; - XDebug(this,DebugAll, - "processUnavailable. Event: (%p). From: '%s' To: '%s'.", - event,event->from().c_str(),event->to().c_str()); - if (!event->to()) - processBroadcast(event,false); - else - processDirected(event,false); -} - -void YJBPresence::processUnknown(JBEvent* event) -{ - if (!(event && event->element())) - return; - XDebug(this,DebugAll, - "processUnknown. Event: (%p). From: '%s' To: '%s'.", - event,event->from().c_str(),event->to().c_str()); - // The type attribute should not be present - const char* type = event->element()->getAttribute("type"); - if (type) { - DDebug(this,DebugInfo, - "processUnknown [%p]. Event: (%p). Unknown type: '%s'.", - this,event,type); - return; - } - if (!event->to()) - processBroadcast(event,true); - else - processDirected(event,true); -} - -void YJBPresence::processBroadcast(JBEvent* event, bool available) -{ - JabberID remote(event->from()); - if (available && !remote.resource()) { - Debug(this,DebugNote,"processBroadcast. Received presence without resource."); - return; - } - Lock lock(m_mutexUserpair); - ListIterator iter(m_userpair); - for(;;) { - YUserPresence* yup = static_cast(iter.get()); - if (!yup) - break; - if (!yup->remote().match(remote)) - continue; - if (available) - yup->processUnknown(event); - else { - yup->processUnavailable(event); - if (delUnavailable()) - yup->deref(); - } - } -} - -void YJBPresence::processDirected(JBEvent* event, bool available) -{ - JabberID local(event->to()); - if (!local.node()) { - Debug(this,DebugNote,"processDirected. Received presence without destination user."); - return; - } - JabberID remote(event->from()); - if (available && !remote.resource()) { - Debug(this,DebugNote,"processDirected. Received presence without resource."); - return; - } - XDebug(this,DebugAll, - "processDirected. Local: '%s' Remote: '%s'. Available: %s", - local.c_str(),remote.c_str(),available?"YES":"NO"); - Lock lock(m_mutexUserpair); - bool found = false; - // Try to match local and remote user - ListIterator iter(m_userpair); - for(;;) { - YUserPresence* yup = static_cast(iter.get()); - if (!yup) - break; - if (!(yup->local().match(local) && yup->remote().match(remote))) - continue; - found = true; - if (available) - yup->processUnknown(event); - else { - yup->processUnavailable(event); - if (delUnavailable()) - yup->deref(); - } - } - if (found) - return; - // Check if remote has a resource, is available and we have - // some pair without remote resource - if (remote.resource() && available) { - iter.reset(); - for(;;) { - YUserPresence* yup = static_cast(iter.get()); - if (!yup) - break; - if (!(!yup->remote().resource() && - yup->local().match(local) && - remote.bare() == yup->remote().bare())) - continue; - found = true; - yup->processUnknown(event); - break; - } - } - // Don't add if found or unavailable and we have to delete the unavailable users - if (found || (!available && delUnavailable())) - return; - // No local user: add one if it's in our domain - String identity; - if (!m_engine->getFullServerIdentity(identity) || identity != local.domain()) - return; - DDebug(this,DebugInfo,"Adding new local user on presence message."); - new YUserPresence(this,local,remote, - (YUserPresence::Subscription)getSubscription(local,remote), - YUserPresence::Unknown); -} - -void YJBPresence::processSubscribe(JBEvent* event, JBPresence::Presence type) -{ - JabberID local(event->to()); - JabberID remote(event->from()); - XDebug(m_engine,DebugAll,"processSubscribe. Local '%s'. Remote '%s'.", - local.c_str(),remote.c_str()); - Lock lock(m_mutexUserpair); - ObjList* obj = m_userpair.skipNull(); - YUserPresence* yup = 0; - // Check if we already have an user - for(; obj; obj = obj->skipNext()) { - yup = static_cast(obj->get()); - if (local.bare() == yup->local().bare() && - remote.bare() == yup->remote().bare()) - break; - yup = 0; - } - // Not found: add one if it's a subscribe request for an user in our domain - if (!yup && type == JBPresence::Subscribe) { - String identity; - if (!m_engine->getFullServerIdentity(identity) || - identity != local.domain()) - return; - DDebug(this,DebugAll,"Adding new local user on subscription request."); - yup = new YUserPresence(this,local.bare(),remote.bare(), - YUserPresence::SubNone,YUserPresence::Unknown); - } - if (!yup) - return; - // Confirm - bool from,value; - switch (type) { - case JBPresence::Subscribe: - // Already subscribed to us: Confirm subscription - if (yup->subscribedFrom()) { - yup->send(JBPresence::Subscribed); - return; - } - // Approve if auto subscribing - if ((autoSubscribe() & YUserPresence::SubFrom) && - !yup->send(JBPresence::Subscribed)) - return; - from = true; - value = true; - DDebug(m_engine,DebugAll,"processSubscribe. Subscribing."); - break; - case JBPresence::Subscribed: - // Already subscribed to remote user: do nothing - if (yup->subscribedTo()) - return; - from = false; - value = true; - break; - case JBPresence::Unsubscribe: - // Already unsubscribed from us: confirm it - if (!yup->subscribedFrom()) { - yup->send(JBPresence::Unsubscribed); - return; - } - // Approve if auto subscribing - if ((autoSubscribe() & YUserPresence::SubFrom) && - !yup->send(JBPresence::Unsubscribed)) - return; - from = true; - value = false; - DDebug(m_engine,DebugAll,"processSubscribe. Unsubscribing."); - break; - case JBPresence::Unsubscribed: - // If not subscribed to remote user ignore the unsubscribed confirmation - if (!yup->subscribedTo()) - return; - from = false; - value = false; - break; - default: - return; - } - // Update subscription - for (obj = m_userpair.skipNull(); obj; obj = obj->skipNext()) { - YUserPresence* tmp = static_cast(obj->get()); - if (local.bare() == tmp->local().bare() && - remote.bare() == tmp->remote().bare()) - tmp->updateSubscription(from,value); - } - // Notify engine - subscribe(yup,type); -} - -int YJBPresence::getSubscription(const JabberID& local, const JabberID& remote) -{ - Lock lock(m_mutexUserpair); - ObjList* obj = m_userpair.skipNull(); - for(; obj; obj = obj->skipNext()) { - YUserPresence* yup = static_cast(obj->get()); - if (local.bare() == yup->local().bare() && - remote.bare() == yup->remote().bare()) - return yup->subscription(); - } - return (int)(YUserPresence::SubFrom); -} - -void YJBPresence::startThreads(u_int16_t process) +void YJBPresence::startThreads(u_int16_t process, u_int16_t timeout) { // Process the received events if (!process) Debug(m_engine,DebugWarn,"No threads(s) to process events!"); for (; process; process--) (new YJGLibThread(YJGLibThread::JBPresence,"JBPresence thread"))->startup(); + // Process timeout + if (!timeout) + Debug(m_engine,DebugWarn,"No threads(s) to check user timeout!"); + for (; timeout; timeout--) + (new YJGLibThread(YJGLibThread::JBPresenceTimeout,"JBPresenceTimeout thread"))->startup(); } -bool YJBPresence::get(const JabberID& local, JabberID& remote, bool& newPresence, bool audio) +bool YJBPresence::message(Presence presence, const char* from, const char* to, + const char* subscription) { - Lock lock(m_mutexUserpair); - ObjList* obj = m_userpair.skipNull(); - YUserPresence* yup = 0; - for(; obj; obj = obj->skipNext()) { - yup = static_cast(obj->get()); - if (audio && !yup->audio()) { - yup = 0; - continue; - } - // Local user is searched by bare jid - // Remote user is searched by full or bare jid - if (local.bare() == yup->local().bare() && - yup->remote().match(remote)) { - // We found a remote user for the local one. Set the resource - remote.resource(yup->remote().resource()); - if (iplugin.m_jg->requestSubscribe()) { - bool avail = yup->available(); - yup->send(JBPresence::Subscribe); - // Simulate a new presence while we get an answer - newPresence = !avail; - return avail; - } + Message* m = 0; + const char* type = 0; + const char* status = 0; + const char* operation = 0; + switch (presence) { + case JBPresence::None: + type = MODULE_MSG_NOTIFY; + status = "online"; + break; + case JBPresence::Unavailable: + type = MODULE_MSG_NOTIFY; + status = "offline"; break; - } - yup = 0; - } - newPresence = (yup == 0); - if (newPresence) - yup = new YUserPresence(this,local,remote,YUserPresence::SubFrom, - YUserPresence::Unknown); - return yup->available(); -} - -void YJBPresence::notify(const YUserPresence* yup, const char* error) -{ - if (!yup) - return; - iplugin.processPresence(yup->local(),yup->remote(),yup->available(),yup->audio(),error); - if (error) - return; - // Enqueue message - XDebug(this,DebugInfo,"Enqueue '%s' for (%p).",MODULE_MSG_NOTIFY,yup); - Message* m = new Message(MODULE_MSG_NOTIFY); - m->addParam("module",MODULE_NAME); - m->addParam("protocol",MODULE_PROTOCOL); - m->addParam("subscription",YUserPresence::subscribeText(yup->subscription())); - m->addParam("presence",YUserPresence::stateText(yup->remoteState())); - m->addParam("from_user",yup->remote().bare()); - m->addParam("from_resource",yup->remote().resource()); - m->addParam("from_uri",yup->remote()); - m->addParam("to_user",yup->local().node()); - m->addParam("to_resource",yup->local().resource()); - m->addParam("to_uri",yup->local()); - Engine::enqueue(m); -} - -void YJBPresence::subscribe(const YUserPresence* yup, JBPresence::Presence type) -{ - if (!yup) - return; - // Check type - const char* message = 0; - bool request = true; - switch (type) { case JBPresence::Subscribed: - request = false; - case JBPresence::Subscribe: - message = MODULE_MSG_SUBSCRIBE; + type = MODULE_MSG_NOTIFY; + status = "subscribed"; break; case JBPresence::Unsubscribed: - request = false; + type = MODULE_MSG_NOTIFY; + status = "unsubscribed"; + break; + case JBPresence::Probe: + type = MODULE_MSG_SUBSCRIBE; + operation = "probe"; + break; + case JBPresence::Subscribe: + type = MODULE_MSG_SUBSCRIBE; + operation = "subscribe"; + break; case JBPresence::Unsubscribe: - message = MODULE_MSG_UNSUBSCRIBE; + type = MODULE_MSG_SUBSCRIBE; + operation = "unsubscribe"; break; default: - return; + return 0; } - XDebug(this,DebugInfo,"Enqueue '%s' for (%p).",message,yup); - Message* m = new Message(message); + m = new Message(type); m->addParam("module",MODULE_NAME); - m->addParam("protocol",MODULE_PROTOCOL); - m->addParam("subscription",YUserPresence::subscribeText(yup->subscription())); - m->addParam("from_user",yup->remote().bare()); - m->addParam("to_user",yup->local().node()); - Engine::enqueue(m); -} - -void YJBPresence::addPresence(YUserPresence* yup) -{ - if (!yup) - return; - Lock lock(m_mutexUserpair); - m_userpair.append(yup); -} - -void YJBPresence::removePresence(YUserPresence* yup) -{ - if (!yup) - return; - Lock lock(m_mutexUserpair); - m_userpair.remove(yup,false); + if (operation) + m->addParam("operation",operation); + if (subscription) + m->addParam("subscription",subscription); + if (status) + m->addParam("status",status); + m->addParam("from",from); + m->addParam("to",to); + return Engine::enqueue(m); } /** @@ -1063,12 +590,17 @@ void YJGEngine::processEvent(JGEvent* event) event->action() == JGSession::ActInitiate) { if (event->session()->ref()) { connection = new YJGConnection(this,event); - if (!connection->route()) + // Constructor failed ? + if (connection->state() == YJGConnection::Pending) + connection->deref(); + else if (!connection->route()) event->session()->jingleConn(0); } - else - DDebug(this,DebugWarn, + else { + Debug(this,DebugWarn, "processEvent. Session ref failed for new connection."); + event->session()->hangup(false,"failure"); + } } else DDebug(this,DebugAll, @@ -1092,295 +624,6 @@ void YJGEngine::startThreads(u_int16_t read, u_int16_t process) (new YJGLibThread(YJGLibThread::JGProcess,"JGProcess thread"))->startup(); } -/** - * YUserPresence - */ -TokenDict YUserPresence::s_subscription[] = { - {"none", SubNone}, - {"to", SubTo}, - {"from", SubFrom}, - {"both", SubBoth}, - {0,0}, - }; - -TokenDict YUserPresence::s_state[] = { - {"unknown", Unknown}, - {"available", Available}, - {"unavailable", Unavailable}, - {0,0}, - }; - -YUserPresence::YUserPresence(YJBPresence* engine, const char* local, - const char* remote, Subscription subscription, State state) - : Mutex(true), - m_local(local), - m_remote(remote), - m_localState(Unknown), - m_remoteState(Unknown), - m_subscription(SubNone), - m_audio(false), - m_engine(engine) -{ - if (!m_local.resource()) - m_local.resource(JINGLE_RESOURCE); - DDebug(m_engine,DebugNote, "YUserPresence. Local: %s. Remote: %s. [%p]", - m_local.c_str(),m_remote.c_str(),this); - m_engine->addPresence(this); - // Update state - if (state == Available || state == Unavailable) { - m_remoteState = state; - updateState((state == Available)); - } - // Update subscription - switch (subscription) { - case SubNone: - break; - case SubBoth: - updateSubscription(true,true); - updateSubscription(false,true); - break; - case SubFrom: - updateSubscription(true,true); - break; - case SubTo: - updateSubscription(false,true); - break; - } - // Subscribe to remote user if not already subscribed and auto subscribe is true - if (!subscribedTo() && (m_engine->autoSubscribe() & SubTo)) - send(JBPresence::Subscribe); - // Request remote's presence - if (remoteState() == Unknown) - send(JBPresence::Probe); -} - -YUserPresence::~YUserPresence() -{ - // Make us unavailable to remote peer - if (subscribedFrom() && available() && m_localState != Unavailable) - send(JBPresence::Unavailable); - m_engine->removePresence(this); - DDebug(m_engine,DebugNote, "~YUserPresence. Local: %s. Remote: %s. [%p]", - m_local.c_str(),m_remote.c_str(),this); -} - -bool YUserPresence::send(JBPresence::Presence type, bool caps, - JBComponentStream* stream) -{ - bool localStream; - XDebug(m_engine,DebugAll,"YUserPresence. Sending presence '%s'. [%p]", - JBPresence::presenceText(type),this); - if (!getStream(stream,localStream)) - return false; - // Create the element to send - XMLElement* xml = 0; - Lock lock(this); - switch (type) { - // Types that need only the bare jid - case JBPresence::Probe: - xml = JBPresence::createPresence(m_local,m_remote.bare(),type); - break; - case JBPresence::Unavailable: - xml = JBPresence::createPresence(m_local,m_remote,type); - break; - case JBPresence::Subscribe: - case JBPresence::Subscribed: - case JBPresence::Unsubscribe: - case JBPresence::Unsubscribed: - xml = JBPresence::createPresence(m_local.bare(),m_remote.bare(),type); - break; - // Types that need the full jid - case JBPresence::None: - xml = JBPresence::createPresence(m_local,m_remote.bare()); - if (caps) { - XMLElement* c = new XMLElement("c"); - c->setAttribute("xmlns","http://jabber.org/protocol/caps"); - c->setAttribute("node","http://www.google.com/xmpp/client/caps"); - c->setAttribute("ver",JINGLE_VERSION); - c->setAttribute("ext",JINGLE_VOICE); - xml->addChild(c); - } - break; - case JBPresence::Error: - return false; - } - lock.drop(); - bool result = sendStanza(stream,xml); - if (localStream) - stream->deref(); - // Update local state - if (result && - (type == JBPresence::None || type == JBPresence::Unavailable)) { - m_localState = (type == JBPresence::None ? Available : Unavailable); - return true; - } - // Set subscribe data. Not for subscribe/unsubscribe - if (type == JBPresence::Subscribed || type == JBPresence::Unsubscribed) - updateSubscription(true,type == JBPresence::Subscribed); - return result; -} - -bool YUserPresence::sendInfoRequest(bool info, JBComponentStream* stream) -{ - bool localStream; - if (!getStream(stream,localStream)) - return false; - XMLElement* xml = XMPPUtils::createIqDisco(m_local,m_remote.bare(), - String((int)random()),info); - bool result = sendStanza(stream,xml); - if (localStream) - stream->deref(); - return result; -} - -bool YUserPresence::sendInfo(const char* id, JBComponentStream* stream) -{ - bool localStream; - if (!getStream(stream,localStream)) - return false; - // Create response - XMLElement* query = XMPPUtils::createElement(XMLElement::Query, - XMPPNamespace::DiscoInfo); - // Set features - XMPPNamespace::Type ns[2] = {XMPPNamespace::Jingle, - XMPPNamespace::JingleAudio}; - JIDFeatures* f = new JIDFeatures(); - f->create(ns,2); - f->addTo(query); - f->deref(); - XMLElement* iq = XMPPUtils::createIq(XMPPUtils::IqResult,m_local,m_remote,id); - iq->addChild(query); - // Send - bool result = sendStanza(stream,iq); - if (localStream) - stream->deref(); - return result; -} - -bool YUserPresence::sendItems(const char* id, JBComponentStream* stream) -{ - bool localStream; - if (!getStream(stream,localStream)) - return false; - // Create response - XMLElement* query = XMPPUtils::createElement(XMLElement::Query, - XMPPNamespace::DiscoItems); - XMLElement* iq = XMPPUtils::createIq(XMPPUtils::IqResult,m_local,m_remote,id); - iq->addChild(query); - // Send - bool result = sendStanza(stream,iq); - if (localStream) - stream->deref(); - return result; -} - -void YUserPresence::processError(JBEvent* event) -{ - String code, type, error; - JBPresence::decodeError(event->element(),code,type,error); - DDebug(m_engine,DebugAll,"YUserPresence. Error. '%s'. Code: '%s'. [%p]", - error.c_str(),code.c_str(),this); - m_engine->notify(this,error); -} - -void YUserPresence::processUnavailable(JBEvent* event) -{ - Lock lock(this); - // Return if we already know that remote user is unavailable - if (m_remoteState == Unavailable) - return; - JabberID jid(event->from()); - // Remote has no resource: broadcast unavailable - if (jid.resource() && m_remote.resource() && m_remote.resource() != jid.resource()) - return; - XDebug(m_engine,DebugAll,"YUserPresence::processUnavailable. [%p]",this); - updateState(false); -} - -void YUserPresence::processUnknown(JBEvent* event) -{ - Lock lock(this); - // Get resource from presence - JabberID jid(event->from()); - // We already have a resource - if (m_remote.resource() && m_remote.resource() != jid.resource()) - return; - XDebug(m_engine,DebugAll, - "YUserPresence::processPresence. From '%s' to '%s'. [%p]", - event->from().c_str(),event->to().c_str(),this); - m_remote.resource(jid.resource()); - updateResource(event->element()); - // Success: Send our presence and capabilities if not already done - if (m_localState != Available) - send(JBPresence::None,true,event->stream()); - if (remoteState() != Available) - updateState(true); -} - -void YUserPresence::updateResource(XMLElement* element) -{ - if (!element) - return; - NamedList caps(""); - XMLElement* c = element->findFirstChild("c"); - if (c) - iplugin.getParts(caps,c->getAttribute("ext"),' ',true); - if (caps.getParam(JINGLE_VOICE)) - m_audio = true; - else - m_audio = false; -} - -void YUserPresence::updateSubscription(bool from, bool value) -{ - // Don't update if nothing changed - if ((from && value == subscribedFrom()) || - (!from && value == subscribedTo())) - return; - // Update - int s = (from ? SubFrom : SubTo); - if (value) - m_subscription |= s; - else - m_subscription &= ~s; - DDebug(m_engine,DebugNote, - "YUserPresence. Subscription updated. State '%s'. [%p]", - subscribeText(m_subscription),this); - // Send presence if remote user is subscribed to us - if (from && subscribedFrom()) { - send(JBPresence::Unavailable); - send(); - } - -} - -void YUserPresence::updateState(bool available) -{ - // Update - m_remoteState = (available ? Available : Unavailable); - DDebug(m_engine,DebugNote, - "YUserPresence. Remote presence updated. State: '%s'. Audio: %s. [%p]", - stateText(m_remoteState),m_audio?"YES":"NO",this); - // Notify on user presence - m_engine->notify(this); -} - -bool YUserPresence::getStream(JBComponentStream*& stream, bool& release) -{ - release = false; - if (stream) - return true; - if (!m_engine->engine()) - return false; - stream = m_engine->engine()->getStream(); - if (stream) { - release = true; - return true; - } - Debug(m_engine,DebugGoOn,"YUserPresence. No stream to send data. [%p]",this); - return false; -} - /** * YJGTransport */ @@ -1393,6 +636,7 @@ YJGTransport::YJGTransport(YJGConnection* connection, Message* msg) m_connection(connection), m_rtpData(0) { + Lock lock(this); if (!m_connection) return; // Set data members @@ -1432,9 +676,8 @@ YJGTransport::YJGTransport(YJGConnection* connection, Message* msg) // Not outgoing: Ready if (m_connection->isIncoming()) return; - //TODO: What if no format available ? // *** TRANSPORT - //TODO: Transport from message if forward + //TODO: Transport from message if RTP forward } YJGTransport::~YJGTransport() @@ -1480,22 +723,28 @@ bool YJGTransport::start() Lock lock(this); if (m_started || !(m_connection && m_mediaReady && m_transportReady)) return false; - Debug(m_connection,DebugCall,"Transport. Start. Local: '%s:%s'. Remote: '%s:%s'. [%p]", + Debug(m_connection,DebugCall, + "Transport. Start. Local: '%s:%s'. Remote: '%s:%s'. [%p]", m_address.c_str(),m_port.c_str(), m_remote->m_address.c_str(),m_remote->m_port.c_str(),m_connection); + // Start RTP Message* m = new Message("chan.rtp"); m->userData(static_cast(m_connection)); m_connection->complete(*m); m->addParam("direction","bidir"); m->addParam("media","audio"); - //TODO: Add real media #if 0 String formats; createMediaString(formats); m->addParam("formats",formats); - //TODO: Choose one format #endif - m->addParam("format","alaw"); + const char* format = 0; + ObjList* obj = m_formats.skipNull(); + if (obj) { + JGAudio* audio = static_cast(obj->get()); + format = lookup(audio->m_id.toInteger(),dict_payloads); + } + m->addParam("format",format); m->addParam("localip",m_address); m->addParam("localport",m_port); m->addParam("remoteip",m_remote->m_address); @@ -1509,7 +758,7 @@ bool YJGTransport::start() delete m; return false; } - // chan.stun + // Start STUN Message* msg = new Message("chan.stun"); msg->userData(m->userData()); msg->addParam("localusername",m_remote->m_username + m_username); @@ -1530,7 +779,7 @@ bool YJGTransport::updateMedia(ObjList& media) return false; // Check if we received any media if (0 == media.skipNull()) { - DDebug(m_connection,DebugWarn, + Debug(m_connection,DebugWarn, "Transport. The remote party has no media. [%p]",m_connection); m_connection->hangup(false,"nomedia"); return false; @@ -1539,27 +788,23 @@ bool YJGTransport::updateMedia(ObjList& media) for (GenObject* go; (go = iter_local.get());) { JGAudio* local = static_cast(go); // Check if incoming media contains local media (compare 'id' and 'name') - bool exists = false; ObjList* obj = media.skipNull(); for (; obj; obj = obj->skipNext()) { JGAudio* remote = static_cast(obj->get()); - if (local->m_id == remote->m_id && local->m_name == remote->m_name) { - exists = true; + if (local->m_id == remote->m_id && local->m_name == remote->m_name) break; - } } - // Remove from local if not exists - if (!exists) + // obj is 0. Current element from m_formats is not in received media. Remove it + if (!obj) m_formats.remove(local,true); - // Remove from remote - if (obj) - media.remove(obj,true); } // Check if both parties have common media if (0 == m_formats.skipNull()) { - DDebug(m_connection,DebugWarn, - "Transport. Unable to negotiate media (no common formats). [%p]", - m_connection); + String recvFormat; + iplugin.createMediaString(recvFormat,media,','); + Debug(m_connection,DebugWarn, + "Transport. Unable to negotiate media (no common formats). Received: '%s'. [%p]", + recvFormat.c_str(),m_connection); m_connection->hangup(false,"nomedia"); return false; } @@ -1587,10 +832,14 @@ bool YJGTransport::updateTransport(ObjList& transport) if (m_generation == remote->m_generation && m_name == remote->m_name && m_protocol == remote->m_protocol && - m_type == remote->m_type && - m_network == remote->m_network) + m_type == remote->m_type) break; // We hate it: reset and skip + DDebug(m_connection,DebugInfo, + "Transport. Unacceptable transport. Name: '%s'. Protocol: '%s. Type: '%s'. Generation: '%s'. [%p]", + m_remote->m_name.c_str(),m_remote->m_protocol.c_str(), + m_remote->m_type.c_str(),m_remote->m_generation.c_str(), + m_connection); remote = 0; } if (!remote) @@ -1625,20 +874,7 @@ XMLElement* YJGTransport::createDescription() void YJGTransport::createMediaString(String& dest) { Lock lock(this); - bool first = true; - ObjList* obj = m_formats.skipNull(); - for (; obj; obj = obj->skipNext()) { - JGAudio* a = static_cast(obj->get()); - const char* payload = lookup(a->m_id.toInteger(),dict_payloads); - if (!payload) - continue; - if (first) { - dest = payload; - first = false; - } - else - dest << "," << payload; - } + iplugin.createMediaString(dest,m_formats,','); } /** @@ -1657,7 +893,8 @@ YJGConnection::YJGConnection(YJGEngine* jgEngine, Message& msg, const char* call m_hangup(false), m_timeout(0) { - XDebug(this,DebugInfo,"YJGConnection. Outgoing. [%p]",this); + Debug(this,DebugCall,"Outgoing. Caller: '%s'. Called: '%s'. [%p]", + caller,called,this); m_callerPrompt = msg.getValue("callerprompt"); // Init transport m_transport = new YJGTransport(this,&msg); @@ -1674,7 +911,9 @@ YJGConnection::YJGConnection(YJGEngine* jgEngine, Message& msg, const char* call if (m_timeout) m_timeout += timenow - pendingTimeout; } - XDebug(this,DebugInfo,"YJGConnection. Time: " FMT64 ". Maxcall set to " FMT64 " us. [%p]",Time::now(),maxcall(),this); + XDebug(this,DebugInfo, + "YJGConnection. Time: " FMT64 ". Maxcall set to " FMT64 " us. [%p]", + Time::now(),maxcall(),this); // Startup Message* m = message("chan.startup"); m->setParam("direction",status()); @@ -1702,12 +941,14 @@ YJGConnection::YJGConnection(YJGEngine* jgEngine, JGEvent* event) m_hangup(false), m_timeout(0) { - XDebug(this,DebugInfo,"YJGConnection. Incoming. [%p]",this); + Debug(this,DebugCall,"Incoming. Caller: '%s'. Called: '%s'. [%p]", + m_remote.c_str(),m_local.c_str(),this); // Set session m_session->jingleConn(this); // Init transport m_transport = new YJGTransport(this); - m_transport->updateMedia(event->audio()); + if (!m_transport->updateMedia(event->audio())) + m_state = Pending; m_transport->updateTransport(event->transport()); // Startup Message* m = message("chan.startup"); @@ -1725,7 +966,7 @@ YJGConnection::~YJGConnection() m_session->deref(); if (m_transport) m_transport->deref(); - XDebug(this,DebugInfo,"~YJGConnection [%p].",this); + Debug(this,DebugCall,"Terminated. [%p]",this); } bool YJGConnection::route() @@ -1753,7 +994,6 @@ void YJGConnection::callAccept(Message& msg) m_session->accept(m_transport->createDescription()); m_session->acceptTransport(0); m_transport->send(m_session); - m_transport->start(); Channel::callAccept(msg); } @@ -1766,7 +1006,7 @@ void YJGConnection::callRejected(const char* error, const char* reason, else m_reason = reason; Debug(this,DebugCall,"callRejected. Reason: '%s'. [%p]",m_reason.c_str(),this); - hangup(true); + hangup(false); } bool YJGConnection::callRouted(Message& msg) @@ -1777,7 +1017,7 @@ bool YJGConnection::callRouted(Message& msg) void YJGConnection::disconnected(bool final, const char* reason) { - DDebug(this,DebugCall,"disconnected. Final: %u. Reason: '%s'. [%p]", + Debug(this,DebugCall,"disconnected. Final: %u. Reason: '%s'. [%p]", final,reason,this); if (!m_reason && reason) m_reason = reason; @@ -1798,7 +1038,7 @@ bool YJGConnection::msgUpdate(Message& msg) bool YJGConnection::msgText(Message& msg, const char* text) { - DDebug(this,DebugCall,"msgText. [%p]",this); + DDebug(this,DebugCall,"msgText. '%s'. [%p]",text,this); if (m_session) { m_session->sendMessage(text); return true; @@ -1806,6 +1046,14 @@ bool YJGConnection::msgText(Message& msg, const char* text) return false; } +bool YJGConnection::msgTone(Message& msg, const char* tone) +{ + DDebug(this,DebugCall,"msgTone. '%s'. [%p]",tone,this); + while (tone && *tone) + m_session->sendDtmf(*tone++); + return true; +} + void YJGConnection::hangup(bool reject, const char* reason) { if (m_hangup) // Already hung up @@ -1843,14 +1091,11 @@ void YJGConnection::handleEvent(JGEvent* event) event,event->type(),m_reason.c_str(),this); break; case JGEvent::Error: - DDebug(this,DebugCall, - "handleEvent((%p): %u). Error. Id: '%s'. Reason: '%s'. Text: '%s'. [%p]", - event,event->type(),event->id().c_str(),event->reason().c_str(), - event->text().c_str(),this); + handleError(event); break; case JGEvent::Message: { DDebug(this,DebugCall, - "handleEvent((%p): %u). Message text: '%s'. [%p]", + "handleEvent((%p): %u). Message: '%s'. [%p]", event,event->type(),event->text().c_str(),this); Message* m = message("chan.text"); m->addParam("text",event->text()); @@ -1858,7 +1103,7 @@ void YJGConnection::handleEvent(JGEvent* event) } break; default: - DDebug(this,DebugCall,"handleEvent((%p): %u). [%p]", + Debug(this,DebugNote,"handleEvent((%p): %u). Unexpected. [%p]", event,event->type(),event); } } @@ -1866,7 +1111,22 @@ void YJGConnection::handleEvent(JGEvent* event) void YJGConnection::handleJingle(JGEvent* event) { switch (event->action()) { - case JGSession::ActTransportInfo: + case JGSession::ActDtmf: + Debug(this,DebugInfo,"handleJingle. Dtmf(%s): '%s'. [%p]", + event->reason().c_str(),event->text().c_str(),this); + if (event->reason() == "button-up") { + Message* m = message("chan.dtmf"); + m->addParam("text",event->text()); + Engine::enqueue(m); + } + break; + case JGSession::ActDtmfMethod: + Debug(this,DebugInfo,"handleJingle. Dtmf method: '%s'. [%p]", + event->text().c_str(),this); + // TODO: method can be 'rtp' or 'xmpp' + m_session->sendResult(event->id()); + break; + case JGSession::ActTransport: { bool accept = !m_transport->transportReady() && m_transport->updateTransport(event->transport()); @@ -1915,6 +1175,14 @@ void YJGConnection::handleJingle(JGEvent* event) } } +void YJGConnection::handleError(JGEvent* event) +{ + DDebug(this,DebugCall, + "handleError. Error. Id: '%s'. Reason: '%s'. Text: '%s'. [%p]", + event->id().c_str(),event->reason().c_str(), + event->text().c_str(),this); +} + bool YJGConnection::processPresence(bool available, const char* error) { if (m_state == Terminated) { @@ -2006,48 +1274,190 @@ void YJGLibThread::run() DDebug(iplugin.m_jb,DebugAll,"%s started.",name()); iplugin.m_presence->runProcess(); break; + case JBPresenceTimeout: + DDebug(iplugin.m_jb,DebugAll,"%s started.",name()); + iplugin.m_presence->runCheckTimeout(); + break; } DDebug(iplugin.m_jb,DebugAll,"%s end of run.",name()); } - /** * resource.notify message handler */ -bool ResNotifyHandler::received(Message &msg) +bool ResNotifyHandler::received(Message& msg) { - XDebug(&iplugin,DebugAll,"Received '%s'.",MODULE_MSG_NOTIFY); - if (MODULE_NAME == msg.getValue("module")) { - XDebug(&iplugin,DebugAll,"'%s' loopback. Ignoring.",MODULE_MSG_NOTIFY); + JabberID from,to; + // Ignore message if we are not the destination + if (MODULE_NAME != msg.getValue("protocol")) return false; + // *** Check from/to + if (!iplugin.decodeJid(from,msg,"from",true)) + return true; + if (!iplugin.decodeJid(to,msg,"to")) + return true; + // *** Check status + String status = msg.getValue("status"); + if (status.null()) { + Debug(&iplugin,DebugNote, + "Received '%s' with missing 'status' parameter.",MODULE_MSG_NOTIFY); + return true; } - return false; + // *** Everything is OK. Process the message + XDebug(&iplugin,DebugAll,"Accepted '%s'.",MODULE_MSG_NOTIFY); + process(from,to,status,msg.getBoolValue("subscription",false)); + return true; +} + +void ResNotifyHandler::process(const JabberID& from, const JabberID& to, + const String& status, bool subFrom) +{ + XMPPUserRoster* roster = iplugin.m_presence->getRoster(from,true,0); + XMPPUser* user = roster->getUser(to,false,0); + // Add new user and local resource + if (!user) { + user = new XMPPUser(roster,to.node(),to.domain(), + subFrom ? XMPPUser::From : XMPPUser::None,false,false); + iplugin.m_presence->notifyNewUser(user); + if (!user->ref()) { + roster->deref(); + return; + } + } + roster->deref(); + user->lock(); + // Process + for (;;) { + if (status == "subscribed") { + // Send only if not already subscribed to us + if (!user->subscribedFrom()) + user->sendSubscribe(JBPresence::Subscribed,0); + break; + } + if (status == "unsubscribed") { + // Send only if not already unsubscribed from us + if (user->subscribedFrom()) + user->sendSubscribe(JBPresence::Unsubscribed,0); + break; + } + // Presence + JIDResource* res = user->getAudio(true,true); + if (!res) + break; + bool changed = false; + if (status == "offline") + changed = res->setPresence(false); + else { + changed = res->setPresence(true); + if (status == "online") { + if (!res->status().null()) { + res->status(""); + changed = true; + } + } + else { + if (status != res->status()) { + res->status(status); + changed = true; + } + } + } + if (changed) + user->sendPresence(res,0,true); + break; + } + user->unlock(); + user->deref(); } /** * resource.subscribe message handler */ -bool ResSubscribeHandler::received(Message &msg) +bool ResSubscribeHandler::received(Message& msg) { - XDebug(&iplugin,DebugAll,"Received '%s'.",MODULE_MSG_SUBSCRIBE); - if (MODULE_NAME == msg.getValue("module")) { - XDebug(&iplugin,DebugAll,"'%s' loopback. Ignoring.",MODULE_MSG_SUBSCRIBE); + JabberID from,to; + // Ignore message if we are not the destination + if (MODULE_NAME != msg.getValue("protocol")) return false; + // *** Check from/to + if (!iplugin.decodeJid(from,msg,"from",true)) + return true; + if (!iplugin.decodeJid(to,msg,"to")) + return true; + // *** Check operation + String tmpParam = msg.getValue("operation"); + JBPresence::Presence presence; + if (tmpParam == "subscribe") + presence = JBPresence::Subscribe; + else if (tmpParam == "probe") + presence = JBPresence::Probe; + else if (tmpParam == "unsubscribe") + presence = JBPresence::Unsubscribe; + else { + Debug(&iplugin,DebugNote, + "Received '%s' with missing or unknown parameter: operation=%s.", + MODULE_MSG_SUBSCRIBE,msg.getValue("operation")); + return true; } - return false; + // *** Everything is OK. Process the message + XDebug(&iplugin,DebugAll,"Accepted '%s'.",MODULE_MSG_SUBSCRIBE); + process(from,to,presence); + return true; } -/** - * resource.unsubscribe message handler - */ -bool ResUnsubscribeHandler::received(Message &msg) +void ResSubscribeHandler::process(const JabberID& from, const JabberID& to, + JBPresence::Presence presence) { - XDebug(&iplugin,DebugAll,"Received '%s'.",MODULE_MSG_UNSUBSCRIBE); - if (MODULE_NAME == msg.getValue("module")) { - XDebug(&iplugin,DebugAll,"'%s' loopback. Ignoring.",MODULE_MSG_UNSUBSCRIBE); - return false; + XMPPUserRoster* roster = iplugin.m_presence->getRoster(from,true,0); + XMPPUser* user = roster->getUser(to,false,0); + // Add new user and local resource + if (!user) { + user = new XMPPUser(roster,to.node(),to.domain(),XMPPUser::From, + false,false); + iplugin.m_presence->notifyNewUser(user); + if (!user->ref()) { + roster->deref(); + return; + } } - return false; + roster->deref(); + // Process + user->lock(); + for (;;) { + if (presence == JBPresence::Subscribe) { + // Already subscribed: notify. NO: send request + if (user->subscribedTo()) + iplugin.m_presence->notifySubscribe(user,JBPresence::Subscribed); + else { + user->sendSubscribe(JBPresence::Subscribe,0); + user->probe(0); + } + break; + } + if (presence == JBPresence::Unsubscribe) { + // Already unsubscribed: notify. NO: send request + if (!user->subscribedTo()) + iplugin.m_presence->notifySubscribe(user,JBPresence::Unsubscribed); + else { + user->sendSubscribe(JBPresence::Unsubscribe,0); + user->probe(0); + } + break; + } + // Respond if user has a resource with audio capabilities + JIDResource* res = user->getAudio(false,true); + if (res) { + user->notifyResource(true,res); + break; + } + // No audio resource for remote user: send probe + // Send probe fails: Assume remote user unavailable + if (!user->probe(0)) + iplugin.m_presence->notifyPresence(0,to,from,false); + break; + } + user->unlock(); + user->deref(); } /** @@ -2110,7 +1520,6 @@ void YJGDriver::initialize() installRelay(Halt); Engine::install(new ResNotifyHandler); Engine::install(new ResSubscribeHandler); - Engine::install(new ResUnsubscribeHandler); setup(); } @@ -2237,7 +1646,8 @@ void YJGDriver::initPresence(const NamedList& sect) m_presence->debugChain(this); // Init threads int process = 1; - m_presence->startThreads(process); + int timeout = 1; + m_presence->startThreads(process,timeout); } } @@ -2276,21 +1686,56 @@ bool YJGDriver::msgExecute(Message& msg, String& dest) msg.setParam("error","failure"); return false; } - const char* c = msg.getValue("caller"); + String c = msg.getValue("caller"); if (!c) c = iplugin.anonymousCaller(); - JabberID caller(c,identity,JINGLE_RESOURCE); - JabberID called(dest); - DDebug(this,DebugAll,"msgExecute. Caller: '%s'. Called: '%s'.", - caller.c_str(),called.c_str()); - bool newPresence = true; - bool available = iplugin.m_presence->get(caller,called,newPresence); - if (!(newPresence || available)) { - Debug(this,DebugNote,"Jingle call failed. Remote peer is unavailable."); - msg.setParam("error","offline"); + if (!(c && JabberID::valid(c))) { + Debug(this,DebugNote,"Jingle call failed. Missing or invalid caller name."); + msg.setParam("error","failure"); return false; } + JabberID caller(c,identity); + JabberID called(dest); + // Get remote user + bool newPresence = false; + XMPPUser* remote = iplugin.m_presence->getRemoteUser(caller,called,true,0, + true,&newPresence); + // Get a resource for the caller + JIDResource* res = remote->getAudio(true,true); + if (!res) { + iplugin.m_presence->notifyNewUser(remote); + res = remote->getAudio(true,true); + // This should never happen !!! + if (!res) { + remote->deref(); + Debug(this,DebugNote, + "Jingle call failed. Unable to get a resource for the caller."); + msg.setParam("error","failure"); + return false; + } + } + caller.resource(res->name()); + // Get a resource for the called + res = remote->getAudio(false,true); + bool available = (res != 0); + if (!(newPresence || available)) { + if (!iplugin.m_jg->requestSubscribe()) { + remote->deref(); + Debug(this,DebugNote,"Jingle call failed. Remote peer is unavailable."); + msg.setParam("error","offline"); + return false; + } + remote->sendSubscribe(JBPresence::Subscribe,0); + } + if (available) + called.resource(res->name()); + else + if (!newPresence) + remote->probe(0); + remote->deref(); // Parameters OK. Create connection and init channel + DDebug(this,DebugAll,"msgExecute. Caller: '%s'. Called: '%s'.", + caller.c_str(),called.c_str()); YJGConnection* conn = new YJGConnection(m_jg,msg,caller,called,available); Channel* ch = static_cast(msg.userData()); if (ch && conn->connect(ch,msg.getValue("reason"))) { @@ -2314,6 +1759,23 @@ bool YJGDriver::received(Message& msg, int id) return Driver::received(msg,id); } +bool YJGDriver::decodeJid(JabberID& jid, Message& msg, const char* param, + bool checkDomain) +{ + jid.set(msg.getValue(param)); + if (jid.node().null() || jid.domain().null()) { + Debug(this,DebugNote,"'%s'. Parameter '%s'='%s' is an invalid JID.", + msg.c_str(),param,jid.c_str()); + return false; + } + if (!m_presence->validDomain(jid.domain())) { + Debug(this,DebugNote,"'%s'. Parameter '%s'='%s' has invalid (unknown) domain.", + msg.c_str(),param,jid.c_str()); + return false; + } + return true; +} + void YJGDriver::createAuthRandomString(String& dest) { dest = ""; @@ -2323,7 +1785,7 @@ void YJGDriver::createAuthRandomString(String& dest) } void YJGDriver::processPresence(const JabberID& local, const JabberID& remote, - bool available, bool audio, const char* error) + bool available, bool audio) { // Check if it is a brodcast and remote user has a resource bool broadcast = local.null(); @@ -2360,7 +1822,7 @@ void YJGDriver::processPresence(const JabberID& local, const JabberID& remote, if ((!broadcast && local.bare() != conn->local().bare()) || !conn->remote().match(remote)) continue; - if (conn->processPresence(false,error)) + if (conn->processPresence(false)) conn->disconnect(); } unlock(); @@ -2369,19 +1831,14 @@ void YJGDriver::processPresence(const JabberID& local, const JabberID& remote, void YJGDriver::createMediaString(String& dest, ObjList& formats, char sep) { dest = ""; - bool first = true; + String s = sep; ObjList* obj = formats.skipNull(); for (; obj; obj = obj->skipNext()) { JGAudio* a = static_cast(obj->get()); const char* payload = lookup(a->m_id.toInteger(),dict_payloads); if (!payload) continue; - if (first) { - dest = payload; - first = false; - } - else - dest << sep << payload; + dest.append(payload,s); } }