diff --git a/conf.d/sigtransport.conf.sample b/conf.d/sigtransport.conf.sample new file mode 100644 index 00000000..67b17a2a --- /dev/null +++ b/conf.d/sigtransport.conf.sample @@ -0,0 +1,40 @@ +; Each section in this file describes a SIGTRAN connection +; Connections are referenced from other configurations describing the upper layer + +;[name-of-connection] +; The name of the section identifies the connection + +; type: keyword: Socket type - sctp, tcp, udp, unix +;type=sctp + +; stream: bool: Socket connection type. +; Designed for SCTP sockets to create a stream socket or a sequenced packet socket +; NOTE: for M2PA if stream is false the M2PA autostart should be on true on both ends, +; if stream is false the M2PA autostart should be true only at one end +;stream=true + +; local: string: Primary local address +; Format is ipv4:port like: 1.1.1.1:3566 +;local= + +; localN: string: Additional local addresses, SCTP only +; Multiple addresses can be specified by incrementing the 1-based index at the end of 'local' +; The address format is ip:port like: +; local1=1.2.3.4:3566 +; local2=2.3.4.5:3566 +;local1= + +; remote: string: Primary remote address +; Format is ipv4:port like: 2.2.2.2:3566 +;remote= + +; remoteN: string: Additional remote addresses, SCTP only +; Multiple addresses can be specified by incrementing the 1-based index at the end of 'remote' +; The address format is ip:port like: +; remote1=5.6.7.8:3566 +; remote2=6.7.8.9:3566 +;remote1= + +; endpoint: bool: Set to true if this is an endpoint that actively tries to connect, +; false to listen for remote connections +;endpoint=false diff --git a/libs/ysig/engine.cpp b/libs/ysig/engine.cpp index d47648cb..67a5ad09 100644 --- a/libs/ysig/engine.cpp +++ b/libs/ysig/engine.cpp @@ -94,6 +94,8 @@ SignallingComponent* SignallingFactory::build(const String& type, const NamedLis // now build some objects we know about if (type == "SS7MTP2") return new SS7MTP2(*name); + else if (type == "SS7M2PA") + return new SS7M2PA(*name); else if (type == "SS7MTP3") return new SS7MTP3(*name); else if (type == "SS7Router") diff --git a/libs/ysig/sigtran.cpp b/libs/ysig/sigtran.cpp index 7589e1a4..871f01f9 100644 --- a/libs/ysig/sigtran.cpp +++ b/libs/ysig/sigtran.cpp @@ -76,7 +76,7 @@ void SIGTRAN::attach(SIGTransport* trans) Lock lock(m_transMutex); if (trans == m_trans) return; - if (trans && trans->ref()) + if (!(trans && trans->ref())) trans = 0; SIGTransport* tmp = m_trans; m_trans = trans; @@ -85,8 +85,10 @@ void SIGTRAN::attach(SIGTransport* trans) tmp->attach(0); tmp->destruct(); } - if (trans) + if (trans) { trans->attach(this); + trans->deref(); + } } // Transmit a SIGTRAN message over the attached transport @@ -116,6 +118,11 @@ bool SIGTransport::processMSG(unsigned char msgVersion, unsigned char msgClass, return m_sigtran && m_sigtran->processMSG(msgVersion,msgClass,msgType,msg,streamId); } +void SIGTransport::notifyLayer(SignallingInterface::Notification event) +{ + if (m_sigtran) + m_sigtran->notifyLayer(event); +} // Build the common header and transmit a message to the network bool SIGTransport::transmitMSG(unsigned char msgVersion, unsigned char msgClass, unsigned char msgType, const DataBlock& msg, int streamId) @@ -140,4 +147,569 @@ bool SIGTransport::transmitMSG(unsigned char msgVersion, unsigned char msgClass, return ok; } +/** + * Class SS7M2PA + */ + +static TokenDict s_state[] = { + {"Alignment", SS7M2PA::Alignment}, + {"ProvingNormal", SS7M2PA::ProvingNormal}, + {"ProvingEmergency", SS7M2PA::ProvingEmergency}, + {"Ready", SS7M2PA::Ready}, + {"ProcessorOutage", SS7M2PA::ProcessorOutage}, + {"ProcessorRecovered", SS7M2PA::ProcessorRecovered}, + {"Busy", SS7M2PA::Busy}, + {"BusyEnded", SS7M2PA::BusyEnded}, + {"OutOfService", SS7M2PA::OutOfService}, + {0,0} +}; + +static TokenDict s_messageType[] = { + {"UserData", SS7M2PA::UserData}, + {"LinkStatus", SS7M2PA::LinkStatus}, + {0,0} +}; + +SS7M2PA::SS7M2PA(const NamedList& params) + : SignallingComponent(params.safe("SS7M2PA"),¶ms), m_seqNr(0), + m_needToAck(0), m_lastAck(0), m_localStatus(OutOfService), m_state(OutOfService), + m_remoteStatus(OutOfService), m_transportState(Idle), m_mutex(String("Mutex:") + debugName()), m_t1(0), + m_t2(0), m_t3(0), m_t4(0), m_ackTimer(0), m_confTimer(0), m_dumpMsg(false) + +{ + // Alignemnt ready timer ~45s + m_t1.interval(params,"t1",45000,50000,false); + // Not Aligned timer ~5s + m_t2.interval(params,"t2",5000,5500,false); + // Aligned timer ~1s + m_t3.interval(params,"t3",1000,1500,false); + // Prouving timer Normal ~8s, Emergency ~0.5s + m_t4.interval(params,"t4",1000,1000,false); + // Acknowledge timer ~1s + m_ackTimer.interval(params,"ack_timer",1000,1100,false); + // Confirmation timer 1/2 t4 + m_confTimer.interval(params,"conf_timer",500,600,false); + DDebug(this,DebugAll,"Creating SS7M2PA [%p]",this); +} + +SS7M2PA::~SS7M2PA() +{ + Lock lock(m_mutex); + m_ackList.clear(); + m_bufMsg.clear(); + DDebug(this,DebugAll,"Destroing SS7M2PA [%p]",this); +} + +bool SS7M2PA::initialize(const NamedList* config) +{ +#ifdef DEBUG + String tmp; + if (config && debugAt(DebugAll)) + config->dump(tmp,"\r\n ",'\'',true); + Debug(this,DebugInfo,"SS7M2PA::initialize(%p) [%p]%s",config,this,tmp.c_str()); +#endif + m_dumpMsg = config->getBoolValue("dumpMsg",false); + m_autostart = config->getBoolValue("autostart",true); + if (config && !transport()) { + NamedString* name = config->getParam("sig"); + if (!name) + name = config->getParam("basename"); + if (name) { + NamedPointer* ptr = YOBJECT(NamedPointer,name); + NamedList* trConfig = ptr ? YOBJECT(NamedList,ptr->userData()) : 0; + NamedList params(name->c_str()); + params.addParam("basename",*name); + params.addParam("protocol","ss7"); + if (trConfig) + params.copyParams(*trConfig); + else { + params.copySubParams(*config,params + "."); + trConfig = ¶ms; + } + SIGTransport* tr = YSIGCREATE(SIGTransport,¶ms); + if (!tr) + return false; + SIGTRAN::attach(tr); + if (!tr->initialize(trConfig)) + SIGTRAN::attach(0); + } + } + if (transport()) + m_reliable = transport()->reliable(); + return transport() && (control(Resume,const_cast(config))); +} + +void SS7M2PA::dumpMsg(u_int8_t version, u_int8_t mClass, u_int8_t type, + const DataBlock& data, int stream, bool send) +{ + String dump = "SS7M2PA "; + dump << (send ? "Sending:" : "Received:"); + dump << "\n-----"; + String indent = "\n "; + dump << indent << "Version: " << version; + dump << " " << "Message class: " << mClass; + dump << " " << "Message type: " << lookup(type,s_messageType,"Unknown"); + dump << indent << "Stream: " << stream; + u_int32_t fsn = (data[1] << 16) | (data[2] << 8) | data[3]; + u_int32_t bsn = (data[5] << 16) | (data[6] << 8) | data[7]; + dump << indent << "FSN : " << fsn << " BSN: " << bsn; + if (type == LinkStatus) { + u_int32_t status = (data[8] << 24) | (data[9] << 16) | (data[10] << 8) | data[11]; + dump << indent << "Status: " << lookup(status,s_state); + } + else { + String hex; + hex.hexify((u_int8_t*)data.data() + 8,data.length() - 8,' '); + dump << indent << "Data: " << hex; + } + dump << "\n-----"; + Debug(this,DebugInfo,"%s",dump.c_str()); +} + +bool SS7M2PA::processMSG(unsigned char msgVersion, unsigned char msgClass, + unsigned char msgType, const DataBlock& msg, int streamId) +{ + if (msgClass != M2PA) { + Debug(this,DebugWarn,"M2PA received non M2PA message class %d",msgClass); + dumpMsg(msgVersion,msgClass,msgType,msg,streamId,false); + return false; + } + if (m_dumpMsg) + dumpMsg(msgVersion,msgClass,msgType,msg,streamId,false); + Lock lock(m_mutex); + if (!operational() && msgType == UserData) { + if (m_remoteStatus != ProcessorOutage || m_remoteStatus != Busy) + return false; + // If we are not operational buffer the received messages and ack them when we are op + m_bufMsg.append(new DataBlock(msg)); + DDebug(this,DebugAll,"Buffering data message while non operational, %d messages buffered", + m_bufMsg.count()); + return true; + } + if (!operational() && msgType == UserData) + return false; + if (!decodeSeq(msg,(u_int8_t)msgType)) + return false; + DataBlock data(msg); + data.cut(-8); + if (!data.length()) + return true; + if (msgType == LinkStatus) + return processLinkStatus(data,streamId); + if (streamId != 1) + DDebug(this,DebugNote,"Received data message on Link status stream"); + // We should return false only if transport is sctp!!!!!!!!!! + lock.drop(); + SS7MSU msu(data); + return receivedMSU(msu); +} + +bool SS7M2PA::decodeSeq(const DataBlock& data,u_int8_t msgType) +{ + if (data.length() < 8) + return false; + u_int32_t fsn = (data[1] << 16) | (data[2] << 8) | data[3]; + u_int32_t bsn = (data[5] << 16) | (data[6] << 8) | data[7]; + if (msgType == LinkStatus) { + if (fsn != m_needToAck) { + DDebug(this,DebugNote,"Received LinkStatus message with wrong sequence number %d expected %d", + fsn,m_needToAck); + abortAlignment("Wrong Sequence number"); + transmitLS(); + return false; + } + if (bsn == getNext(m_lastAck)) + removeFrame(bsn); + if (bsn == m_lastAck) + return true; + // If we are here meens that something went wrong + abortAlignment("msgType == LinkStatus"); + transmitLS(); + return false; + } + if (fsn != getNext(m_needToAck) && fsn != m_needToAck) { + abortAlignment("Received Out of sequence frame"); + transmitLS(); + return false; + } + else { + if (fsn == getNext(m_needToAck) && operational()) { + if (m_confTimer.started()) { + sendAck(); + m_confTimer.stop(); + } + m_needToAck = fsn; + m_confTimer.start(); + } + else if (!operational()) + m_bufMsg.append(new DataBlock(data)); + } + if (bsn == getNext(m_lastAck)) + removeFrame(bsn); + if (bsn != m_lastAck) { + abortAlignment(String("Received unexpected bsn: ") << bsn); + transmitLS(); + return false; + } + return true; +} + +void SS7M2PA::timerTick(const Time& when) +{ + Lock lock(m_mutex); + if (m_confTimer.started() && m_confTimer.timeout(when.msec())) { + sendAck(); // Acknowledge last received message before endpoint drops down the link + m_confTimer.stop(); + } + if (m_ackTimer.started() && m_ackTimer.timeout(when.msec())) { + m_ackTimer.stop(); + if (m_reliable) { + lock.drop(); + abortAlignment("Ack timer timeout"); + } else + retransData(); + } + if (m_t2.started() && m_t2.timeout(when.msec())) { + m_t2.stop(); + abortAlignment("T2 timeout"); + return; + } + if (m_t3.started() && m_t3.timeout(when.msec())) { + m_t3.stop(); + abortAlignment("T3 timeout"); + return; + } + if (m_t4.started() && m_t4.timeout(when.msec())) { + m_t4.stop(); + setLocalStatus(Ready); + transmitLS(); + m_t1.start(); + return; + } + if (m_t1.started() && m_t1.timeout(when.msec())) { + m_t1.stop(); + abortAlignment("T1 timeout."); + } +} + +void SS7M2PA::removeFrame(u_int32_t bsn) +{ + Lock lock(m_mutex); + for (ObjList* o = m_ackList.skipNull();o;o = o->skipNext()) { + DataBlock* d = static_cast(o->get()); + u_int32_t seq = (d->at(1) << 16) | (d->at(2) << 8) | d->at(3); + if (bsn != seq) + continue; + m_lastAck = bsn; + m_ackList.remove(d); + m_ackTimer.stop(); + break; + } +} + +void SS7M2PA::setLocalStatus(unsigned int status) +{ + if (status == m_localStatus) + return; + m_localStatus = status; +} + +void SS7M2PA::setRemoteStatus(unsigned int status) +{ + if (status == m_remoteStatus) + return; + m_remoteStatus = status; +} + +bool SS7M2PA::aligned() const +{ + return ((m_localStatus == ProvingNormal) || (m_localStatus == ProvingEmergency)) && + ((m_remoteStatus == ProvingNormal) || (m_remoteStatus == ProvingEmergency)); +} + +bool SS7M2PA::operational() const +{ + return m_localStatus == Ready && m_remoteStatus == Ready; +} + +void SS7M2PA::sendAck() +{ + DataBlock data; + setHeader(data); + dumpMsg(1,M2PA,UserData,data,1,true); + transmitMSG(1,M2PA,UserData,data,1); +} + +bool SS7M2PA::control(Operation oper, NamedList* params) +{ + switch (oper) { + case Pause: + m_state = OutOfService; + abortAlignment("Control request pause."); + transmitLS(); + return true; + case Resume: + if (aligned()) + return true; + case Align: + { + bool em = params && params->getBoolValue("emergency"); + m_state = em ? ProvingNormal : ProvingEmergency; + if (m_autostart) + startAlignment(); + return true; + } + case Status: + return operational(); + default: + return false; + } +} + +void SS7M2PA::startAlignment(bool emergency) +{ + setLocalStatus(OutOfService); + transmitLS(); + setLocalStatus(Alignment); + SS7Layer2::notify(); +} + +void SS7M2PA::transmitLS(int streamId) +{ + if (m_transportState != Established) + return; + DataBlock data; + setHeader(data); + u_int8_t ms[4]; + ms[1] = ms[2] = ms[3] = ms[0] = 0; + ms[3] = m_localStatus; + data.append(ms,4); + if (m_dumpMsg) + dumpMsg(1,M2PA, 2,data,streamId,true); + transmitMSG(1,M2PA, 2, data,streamId); + XDebug(this,DebugInfo,"Sending LinkStatus %s",lookup(m_localStatus,s_state)); +} + +void SS7M2PA::setHeader(DataBlock& data) +{ + u_int8_t head[8]; + head[0] = head[4] = 0; + head[1] = (m_seqNr >> 16) & 0xff; + head[2] = (m_seqNr >> 8) & 0xff; + head[3] = m_seqNr & 0xff ; + head[5] = (m_needToAck >> 16) & 0xff; + head[6] = (m_needToAck >> 8) & 0xff; + head[7] = m_needToAck & 0xff ; + data.append(head,8); +} + +void SS7M2PA::abortAlignment(String from) +{ + DDebug(this,DebugNote,"Aborting alignment: %s",from.c_str()); + setLocalStatus(OutOfService); + setRemoteStatus(OutOfService); + m_needToAck = m_lastAck = m_seqNr = 0; + if (m_confTimer.started()) + m_confTimer.stop(); + if (m_ackTimer.started()) + m_ackTimer.stop(); + if (m_t2.started()) + m_t2.stop(); + if (m_t3.started()) + m_t3.stop(); + if (m_t4.started()) + m_t4.stop(); + if (m_t1.started()) + m_t1.stop(); + if (m_state == ProvingNormal || m_state == ProvingEmergency) + startAlignment(); + SS7Layer2::notify(); +} + +bool SS7M2PA::processLinkStatus(DataBlock& data,int streamId) +{ + if (data.length() < 4) + return false; + u_int32_t status = (data[0] << 24) | (data[1] << 16) | (data[2] << 8) | data[3]; + if (m_remoteStatus == status && status != OutOfService) + return true; + + XDebug(this,DebugAll,"Received link status: %s , local status : %s , requested status %s", + lookup(status,s_state),lookup(m_localStatus,s_state),lookup(m_state,s_state)); + switch (status) { + case Alignment: + if (m_t2.started()) { + m_t2.stop(); + setLocalStatus(m_state); + m_t3.start(); + transmitLS(); + } + else if (m_state == ProvingNormal || m_state == ProvingEmergency) + transmitLS(); + else + return false; + setRemoteStatus(status); + break; + case ProvingNormal: + case ProvingEmergency: + if (m_localStatus != ProvingNormal && m_localStatus != ProvingEmergency && + (m_localStatus == Alignment && m_t3.started())) + return false; + if (m_t3.started()) { + m_t3.stop(); + m_t4.start(); + } + else if (m_state == ProvingNormal || m_state == ProvingEmergency) { + setLocalStatus(status); + transmitLS(); + m_t4.start(); + } + setRemoteStatus(status); + break; + case Ready: + if (m_localStatus != Ready) { + setLocalStatus(Ready); + transmitLS(); + } + setRemoteStatus(status); + SS7Layer2::notify(); + if (m_t4.started()) + m_t4.stop(); + if (m_t1.started()) + m_t1.stop(); + if (m_bufMsg.count()) + dequeueMsg(); + break; + case ProcessorRecovered: + transmitLS(); + setRemoteStatus(status); + break; + case BusyEnded: + setRemoteStatus(Ready); + SS7Layer2::notify(); + break; + case ProcessorOutage: + case Busy: + setRemoteStatus(status); + SS7Layer2::notify(); + break; + case OutOfService: + if (m_localStatus == Ready) { + abortAlignment("Received : LinkStatus Out of service, local status Ready"); + SS7Layer2::notify(); + } + if ((m_state == ProvingNormal || m_state == ProvingEmergency)) { + if (m_localStatus == Alignment) { + transmitLS(); + m_t2.start(); + } else if (m_localStatus == OutOfService) + startAlignment(); + else + return false; + } + setRemoteStatus(status); + break; + default: + Debug(this,DebugNote,"Received unknown link status message %d",status); + return false; + } + return true; +} + +ObjList* SS7M2PA::recoverMSU() +{ + Lock lock(m_mutex); + ObjList* lst = 0; + for (;;) { + DataBlock* pkt = static_cast(m_ackList.remove(false)); + if (!pkt) + break; + if (pkt->length() > 8) { + SS7MSU* msu = new SS7MSU(8 + (char*)pkt->data(),pkt->length() - 8); + if (!lst) + lst = new ObjList; + lst->append(msu); + } + TelEngine::destruct(pkt); + } + return lst; +} + +void SS7M2PA::retransData() +{ + for (ObjList* o = m_ackList.skipNull();o;o = o->skipNext()) { + DataBlock* msg = static_cast(o->get()); + u_int8_t* head = (u_int8_t*)msg->data(); + head[5] = (m_needToAck >> 16) & 0xff; + head[6] = (m_needToAck >> 8) & 0xff; + head[7] = m_needToAck & 0xff ; + if (m_confTimer.started()) + m_confTimer.stop(); + transmitMSG(1,M2PA, 1, *msg,1); + if (!m_ackTimer.started()) + m_ackTimer.start(); + } +} + +void SS7M2PA::dequeueMsg() +{ + for (ObjList* o = m_bufMsg.skipNull();o;o = o->skipNext()) { + DataBlock* msg = static_cast(o->get()); + if (!decodeSeq(*msg,UserData)) + return; + msg->cut(-8); // Remove M2PA Header + SS7MSU msu(*msg); + receivedMSU(msu); + sendAck(); + } +} + +bool SS7M2PA::transmitMSU(const SS7MSU& msu) +{ + if (msu.length() < 3) { + Debug(this,DebugWarn,"Asked to send too short MSU of length %u [%p]", + msu.length(),this); + return false; + } + // If we don't have an attached interface don't bother + if (!transport()) + return false; + Lock lock(m_mutex); + DataBlock packet; + increment(m_seqNr); + setHeader(packet); + if (m_confTimer.started()) + m_confTimer.stop(); + packet += msu; + m_ackList.append(new DataBlock(packet)); + if (m_dumpMsg) + dumpMsg(1,M2PA,1,packet,1,true); + bool ok = transmitMSG(1,M2PA,1,packet,1); + lock.drop(); + if (!m_ackTimer.started()) + m_ackTimer.start(); + return ok; +} + +void SS7M2PA::notifyLayer(SignallingInterface::Notification event) +{ + switch (event) { + case SignallingInterface::LinkDown: + m_transportState = Idle; + m_seqNr = m_needToAck = m_lastAck = 0; + abortAlignment("LinkDown"); + SS7Layer2::notify(); + break; + case SignallingInterface::LinkUp: + m_transportState = Established; + Debug(this,DebugInfo,"Interface is up [%p]",this); + if (m_autostart) + startAlignment(); + SS7Layer2::notify(); + break; + default: + return; + } +} + /* vi: set ts=8 sw=4 sts=4 noet: */ diff --git a/libs/ysig/yatesig.h b/libs/ysig/yatesig.h index e32eea49..3b0951bd 100644 --- a/libs/ysig/yatesig.h +++ b/libs/ysig/yatesig.h @@ -3788,9 +3788,25 @@ public: * Get the SIGTRAN component attached to this transport * @return Pointer to adaptation layer or NULL */ - inline SIGTRAN* sigtran() const + inline SIGTRAN* sigtran() const { return m_sigtran; } + /** + * Check if transport layer is reliable + * @return true if transport is reliable + */ + virtual bool reliable() const = 0; + + void notifyLayer(SignallingInterface::Notification status); + + /** + * Configure and initialize the component and any subcomponents it may have + * @param config Optional configuration parameters override + * @return True if the component was initialized properly + */ + virtual bool initialize(const NamedList* config) + { return false;} + /** * Check if the network transport layer is connected * @param streamId Identifier of the stream to check if applicable @@ -3798,27 +3814,12 @@ public: */ virtual bool connected(int streamId) const = 0; -protected: - /** - * Constructor - * @param name Default empty component name - */ - inline SIGTransport(const char* name = 0) - : SignallingComponent(name), m_sigtran(0) - { } - /** * Attach an user adaptation layer * @param sigtran SIGTRAN component to attach, can be NULL */ void attach(SIGTRAN* sigtran); - /** - * Notification if the attached state changed - * @param hasUAL True if an User Adaptation Layer is now attached - */ - virtual void attached(bool hasUAL) = 0; - /** * Send a complete message to the adaptation layer for processing * @param msgVersion Version of the protocol @@ -3831,6 +3832,21 @@ protected: bool processMSG(unsigned char msgVersion, unsigned char msgClass, unsigned char msgType, const DataBlock& msg, int streamId) const; +protected: + /** + * Constructor + * @param name Default empty component name + */ + inline SIGTransport(const char* name = 0) + : SignallingComponent(name), m_sigtran(0) + { } + + /** + * Notification if the attached state changed + * @param hasUAL True if an User Adaptation Layer is now attached + */ + virtual void attached(bool hasUAL) = 0; + /** * Transmit a message to the network * @param msgVersion Version of the protocol @@ -3924,6 +3940,9 @@ public: */ bool connected(int streamId = 0) const; + virtual void notifyLayer(SignallingInterface::Notification status) + {} + /** * Message class names dictionary * @return Pointer to dictionary of message classes @@ -4676,6 +4695,211 @@ protected: */ class YSIG_API SS7M2PA : public SS7Layer2, public SIGTRAN { +public: + enum m2paState { + Alignment = 1, + ProvingNormal = 2, + ProvingEmergency = 3, + Ready = 4, + ProcessorOutage = 5, + ProcessorRecovered = 6, + Busy = 7, + BusyEnded = 8, + OutOfService = 9, + }; + + enum msgType { + UserData = 1, + LinkStatus = 2 + }; + + enum sctpState { + Idle, + Associating, + Established + }; + + /** + * Constructor + */ + SS7M2PA(const NamedList& params); + + /** + * Destructor + */ + ~SS7M2PA(); + + /** + * Configure and initialize MTP2 and its interface + * @param config Optional configuration parameters override + * @return True if MTP2 and the interface were initialized properly + */ + virtual bool initialize(const NamedList* config); + + /** + * Execute a control operation. Operations can change the link status or + * can query the aligned status. + * @param oper Operation to execute + * @param params Optional parameters for the operation + * @return True if the command completed successfully, for query operations + * also indicates the data link is aligned and operational + */ + virtual bool control(Operation oper, NamedList* params = 0); + + /** + * Push a Message Signal Unit down the protocol stack + * @param msu MSU data to transmit + * @return True if message was successfully queued + */ + virtual bool transmitMSU(const SS7MSU& msu); + + /** + * Method called when the transport status has been changed + * @param status Up or down + */ + virtual void notifyLayer(SignallingInterface::Notification status); + + /** + * Remove the MSUs waiting in the transmit queue and return them + * @return List of MSUs taken from the queue + */ + virtual ObjList* recoverMSU(); + + /** + * Decode sequence numbers from message and process them + * @param data The message + * @param msgType The message type + * @return True if sequence numbers ar as we expected to be + */ + bool decodeSeq(const DataBlock& data, u_int8_t msgType); + + /** + * Helper method called when an error was detected + * Change state to OutOfService and notifys upper layer + * @param from Debuging purpose, Information about detected error + */ + void abortAlignment(String from); + + /** + * Send link status message to inform the peer about ouer curent state + * @param streamId The id of the stream who should send the message + */ + void transmitLS(int streamId = 0); + + /** + * Create M2PA header (sequence numbers) + * @param data The data where the header will be stored + */ + void setHeader(DataBlock& data); + + /** + * Decode and process link status message + * @param data The message + * @param streamId The stream id witch received the message + * @return True if the message was procesed + */ + bool processLinkStatus(DataBlock& data, int streamId); + + /** + * Helper method used to acknowledge the last received message + * when no data are to transmit + */ + void sendAck(); + + /** + * Remove a frame from acknowledgement list + * @param bsn The sequence number of the frame to be removed + */ + void removeFrame(u_int32_t bsn); + + /** + * Increment the given sequence number + * @param nr The number to increment + * @return The incremented number + */ + inline u_int32_t increment(u_int32_t &nr) + { return (nr == 0xffffff) ?(nr = 0) : nr++; } + + /** + * Obtain next sequence number + * @param nr The number + * @return The next number in sequence + */ + inline u_int32_t getNext(u_int32_t nr) + { return (nr == 0xffffff) ? 0 : nr + 1; } + +protected: + + /** + * Periodical timer tick used to perform alignment and housekeeping + * @param when Time to use as computing base for events and timeouts + */ + virtual void timerTick(const Time& when); + + /** + * Check if the link is aligned. + * The link may not be operational, the other side may be still proving. + * @return True if the link is aligned + */ + virtual bool aligned() const; + + /** + * Check if the link is aligned and operational + * @return True if the link is operational + */ + virtual bool operational() const; + + /** + * Process a complete message + * @param msgVersion Version of the protocol + * @param msgClass Class of the message + * @param msgType Type of the message, depends on the class + * @param msg Message data, may be empty + * @param streamId Identifier of the stream the message was received on + * @return True if the message was handled + */ + virtual bool processMSG(unsigned char msgVersion, unsigned char msgClass, + unsigned char msgType, const DataBlock& msg, int streamId); + + /** + * Initiates alignment and proving procedure + * @param emergency True if emergency alignment is desired + */ + void startAlignment(bool emergency = false); + + /** + * Retransmit unacknowledged data + */ + void retransData(); + + /** + * Transmit the buffered messages to mtp3 and acknowledge them + */ + void dequeueMsg(); +private: + void dumpMsg(u_int8_t version, u_int8_t mClass, u_int8_t type, + const DataBlock& data, int stream, bool send); + void setLocalStatus(unsigned int status); + void setRemoteStatus(unsigned int status); + u_int32_t m_seqNr; + u_int32_t m_needToAck; + u_int32_t m_lastAck; + unsigned int m_localStatus; + unsigned int m_state; + unsigned int m_remoteStatus; + unsigned int m_transportState; + Mutex m_mutex; + bool m_reliable; + bool m_autostart; + ObjList m_ackList; + ObjList m_bufMsg; + SignallingTimer m_t1; + SignallingTimer m_t2; + SignallingTimer m_t3; + SignallingTimer m_t4; + SignallingTimer m_ackTimer; + SignallingTimer m_confTimer; + bool m_dumpMsg; }; /** diff --git a/modules/Makefile.in b/modules/Makefile.in index 52a74d90..a5f53bef 100644 --- a/modules/Makefile.in +++ b/modules/Makefile.in @@ -43,7 +43,7 @@ PROGS := cdrbuild.yate cdrfile.yate regexroute.yate \ server/mgcpgw.yate server/mgcpca.yate \ server/mrcpspeech.yate \ server/ysigchan.yate \ - server/ciscosm.yate \ + server/ciscosm.yate server/sigtransport.yate \ server/presence.yate server/subscription.yate \ server/users.yate \ server/analog.yate server/analogdetect.yate \ @@ -91,6 +91,10 @@ endif endif endif +ifneq (@HAVE_SCTP_NETINET@,no) +PROGS := $(PROGS) server/lksctp.yate +endif + ifneq (@HAVE_SPANDSP@,no) PROGS := $(PROGS) faxchan.yate endif @@ -219,11 +223,11 @@ qt4/%.moc: @srcdir@/qt4/%.h $(MKDEPS) $(INCFILES) # Take special care of the modules that depend on optional libs -server/ysigchan.yate server/wpcard.yate server/tdmcard.yate server/zapcard.yate server/analog.yate server/ciscosm.yate: ../libyatesig.so -server/ysigchan.yate server/analog.yate server/ciscosm.yate: LOCALFLAGS = -I@top_srcdir@/libs/ysig +server/ysigchan.yate server/wpcard.yate server/tdmcard.yate server/zapcard.yate server/analog.yate server/ciscosm.yate server/sigtransport.yate: ../libyatesig.so +server/ysigchan.yate server/analog.yate server/ciscosm.yate server/sigtransport.yate: LOCALFLAGS = -I@top_srcdir@/libs/ysig server/wpcard.yate server/tdmcard.yate: LOCALFLAGS = -I@top_srcdir@/libs/ysig @WANPIPE_FLAGS@ server/zapcard.yate: LOCALFLAGS = -I@top_srcdir@/libs/ysig @ZAP_FLAGS@ -server/ysigchan.yate server/wpcard.yate server/tdmcard.yate server/zapcard.yate server/analog.yate server/ciscosm.yate: LOCALLIBS = -lyatesig +server/ysigchan.yate server/wpcard.yate server/tdmcard.yate server/zapcard.yate server/analog.yate server/ciscosm.yate server/sigtransport.yate: LOCALLIBS = -lyatesig server/analogdetect.yate: ../libs/ymodem/libyatemodem.a server/analogdetect.yate: LOCALFLAGS = -I@top_srcdir@/libs/ymodem @@ -262,6 +266,9 @@ server/mgcpgw.yate: ../libyatemgcp.so server/mgcpgw.yate: LOCALFLAGS = -I@top_srcdir@/libs/ymgcp server/mgcpgw.yate: LOCALLIBS = -lyatemgcp +server/lksctp.yate: LOCALFLAGS = @SCTP_FLAGS@ +server/lksctp.yate: LOCALLIBS = -lsctp + ilbccodec.yate: ../libs/ilbc/libilbc.a ilbccodec.yate: LOCALLIBS = -L../libs/ilbc -lilbc ilbccodec.yate: LOCALFLAGS = @ILBC_INC@ diff --git a/modules/server/lksctp.cpp b/modules/server/lksctp.cpp new file mode 100644 index 00000000..6b05034b --- /dev/null +++ b/modules/server/lksctp.cpp @@ -0,0 +1,274 @@ +/** + * lksctp.cpp + * This file is part of the YATE Project http://YATE.null.ro + * + * SCTP sockets provider based on Linux Kernel SCTP + * + * Yet Another Telephony Engine - a fully featured software PBX and IVR + * Copyright (C) 2009-2010 Null Team + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +#include +#include +#include + +using namespace TelEngine; +namespace { // anonymous + +class LKSocket; +class LKModule; +class LKHandler; + +class LKSocket : public SctpSocket +{ +public: + LKSocket(); + LKSocket(SOCKET fd); + virtual ~LKSocket(); + virtual bool bindx(ObjList& addrs); + virtual bool connectx(ObjList& addrs); + virtual int sendMsg(const void* buf, int length, int stream, int& flags); + virtual int recvMsg(void* buf, int length, SocketAddr& addr, int& stream, int& flags); + virtual Socket* accept(SocketAddr& addr); + virtual bool setStreams(int inbound, int outbound); + virtual bool getStreams(int& in, int& out); + virtual bool subscribeEvents(); + virtual int sendTo(void* buf, int buflen, int stream, SocketAddr& addr, int flags); + bool sctpDown(void* buf); + bool sctpUp(void* buf); +private: + int m_inbound; + int m_outbound; +}; + +class LKHandler : public MessageHandler +{ +public: + LKHandler() : MessageHandler("socket.sctp") { } + virtual bool received(Message &msg); +}; + +class LKModule : public Module +{ +public: + LKModule(); + ~LKModule(); + virtual void initialize(); +private: + bool m_init; +}; + +static LKModule plugin; + +/** + * class LKSocket + */ + +LKSocket::LKSocket() +{ + XDebug(&plugin,DebugAll,"Creating LKSocket [%p]",this); +} + +LKSocket::LKSocket(SOCKET fd) + : SctpSocket(fd) +{ + XDebug(&plugin,DebugAll,"Creating LKSocket [%p]",this); +} + +LKSocket::~LKSocket() +{ + XDebug(&plugin,DebugAl,"Destroying LKSocket [%p]",this); +} + +bool LKSocket::bindx(ObjList& addresses) +{ + struct sockaddr addr[addresses.count()]; + int i = 0; + for (ObjList* o = addresses.skipNull();o;o = o->skipNext()) { + SocketAddr* a = static_cast(o->get()); + addr[i++] = *(a->address()); + } + int error = sctp_bindx(handle(),addr,addresses.count(),SCTP_BINDX_ADD_ADDR); + return (error >= 0); +} + +bool LKSocket::connectx(ObjList& addresses) +{ + struct sockaddr addr[addresses.count()]; + int i = 0; + for (ObjList* o = addresses.skipNull();o;o = o->skipNext()) { + SocketAddr* a = static_cast(o->get()); + addr[i++] = *(a->address()); + } + int error = sctp_connectx(handle(),addr,addresses.count(),NULL); + return (error >= 0); +} + +Socket* LKSocket::accept(SocketAddr& addr) +{ + struct sockaddr address; + socklen_t len = 0; + SOCKET sock = acceptHandle(&address,&len); + LKSocket* ret = (sock == invalidHandle()) ? 0 : new LKSocket(sock); + if (ret) + addr.assign(&address,len); + return ret; +} + +int LKSocket::recvMsg(void* buf, int length, SocketAddr& addr, int& stream, int& flags) +{ + sctp_sndrcvinfo sri; + memset(&sri,0,sizeof(sri)); + struct sockaddr address; + socklen_t len = 0; + int flag = 0; + int r = sctp_recvmsg(handle(),buf,length,&address,&len,&sri,&flag); + addr.assign(&address,len); + if (flag & MSG_NOTIFICATION) { + if (sctpDown(buf)) { + flags = 1; + } + else if (sctpUp(buf)) + flags = 2; + else + flags = 0; + r = 0; + } + stream = sri.sinfo_stream; + return r; +} + +int LKSocket::sendMsg(const void* buf, int length, int stream, int& flags) +{ + sctp_sndrcvinfo sri; + memset(&sri,0,sizeof(sri)); + sri.sinfo_stream = stream; + int r = sctp_send(handle(),buf,length,&sri,flags); + return r; +} + +int LKSocket::sendTo(void* buf, int buflen, int stream, SocketAddr& addr, int flags) +{ + return sctp_sendmsg(handle(),buf,buflen,addr.address(),addr.length(),0,flags,stream,0,0); +} + +bool LKSocket::setStreams(int inbound, int outbound) +{ + sctp_initmsg initMsg; + memset(&initMsg,0,sizeof(initMsg)); + initMsg.sinit_max_instreams = inbound; + initMsg.sinit_num_ostreams = outbound; + if (setsockopt(handle(),IPPROTO_SCTP,SCTP_INITMSG,&initMsg,sizeof(initMsg)) < 0) { + DDebug(&plugin,DebugNote,"Unable to set streams number. Error: %s",strerror(errno)); + return false; + } + return true; +} + +bool LKSocket::subscribeEvents() +{ + struct sctp_event_subscribe events; + bzero(&events, sizeof(events)); + events.sctp_data_io_event = 1; + events.sctp_send_failure_event = 1; + events.sctp_peer_error_event = 1; + events.sctp_shutdown_event = 1; + events.sctp_association_event = 1; + int ret = setsockopt(handle(),IPPROTO_SCTP,SCTP_EVENTS, &events, sizeof(events)); + return (ret != -1); +} + +bool LKSocket::getStreams(int& in, int& out) +{ + sctp_status status; + memset(&status,0,sizeof(status)); + socklen_t len; + if (getsockopt(handle(),IPPROTO_SCTP,SCTP_STATUS, &status,&len) < 0) { + DDebug(&plugin,DebugNote,"Unable to find the number of negotiated streams: %s", + strerror(errno)); + return false; + } + XDebug(&plugin,DebugAll,"Sctp streams inbound = %u , outbound = %u", + status.sstat_instrms,status.sstat_outstrms); + m_inbound = status.sstat_instrms; + m_outbound = status.sstat_outstrms; + return true; +} + +bool LKSocket::sctpDown(void* buf) +{ + union sctp_notification *sn = (union sctp_notification *)buf; + switch (sn->sn_assoc_change.sac_state) { + case SCTP_COMM_LOST: + case SCTP_SHUTDOWN_COMP: + case SCTP_CANT_STR_ASSOC: + case SCTP_RESTART: + return true; + } + return false; +} + +bool LKSocket::sctpUp(void* buf) +{ + union sctp_notification *sn = (union sctp_notification *)buf; + switch (sn->sn_assoc_change.sac_state) { + case SCTP_COMM_UP: + return true; + } + return false; +} + +/** + * class LKHandler + */ + +bool LKHandler::received(Message &msg) +{ + Socket** ppSock = static_cast(msg.userObject("Socket*")); + int fd = msg.getIntValue("handle",-1); + *ppSock = new LKSocket(fd); + return true; +} + +/** + * class LKModule + */ + +LKModule::LKModule() + : Module("lksctp","misc",true), + m_init(false) +{ + Output("Loading module LKSCTP"); +} + +LKModule::~LKModule() +{ + Output("Unloading module LKSCTP"); +} + +void LKModule::initialize() +{ + if (!m_init) { + Output("Initialize module LKSCTP"); + m_init = true; + Engine::install(new LKHandler()); + } +} + +}; // anonymous namespace + +/* vi: set ts=8 sw=4 sts=4 noet: */ diff --git a/modules/server/sigtransport.cpp b/modules/server/sigtransport.cpp new file mode 100644 index 00000000..9e1376f0 --- /dev/null +++ b/modules/server/sigtransport.cpp @@ -0,0 +1,1164 @@ +/** + * sigtransport.cpp + * This file is part of the YATE Project http://YATE.null.ro + * + * SIGTRAN transports provider, supports SCTP, TCP, UDP, UNIX sockets + * + * Yet Another Telephony Engine - a fully featured software PBX and IVR + * Copyright (C) 2009-2010 Null Team + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +#include +#include +#include +#include +#include + +#define MAX_BUF_SIZE 48500 + +using namespace TelEngine; +namespace { // anonymous + +class Transport; +class TransportWorker; +class TransportThread; +class SockRef; +class MessageReader; +class TReader; +class StreamReader; +class ListenThread; +class TransportModule; + +class TransportWorker +{ + friend class TransportThread; +public: + inline TransportWorker() + : m_thread(0) + { } + virtual ~TransportWorker() { stop(); } + virtual bool readData() = 0; + virtual bool connectSocket() = 0; + virtual bool needConnect() = 0; + bool running(); + bool start(Thread::Priority prio = Thread::Normal); + inline void resetThread() + { m_thread = 0; } +protected: + void stop(); +private: + TransportThread* m_thread; +}; + +class SockRef : public RefObject +{ +public: + inline SockRef(Socket** sock) + : m_sock(sock) + { } + void* getObject(const String& name) const + { + if (name == "Socket*") + return m_sock; + return RefObject::getObject(name); + } +private: + Socket** m_sock; +}; + +class TransportThread : public Thread +{ + friend class TransportWorker; +public: + inline TransportThread(TransportWorker* worker, Priority prio = Normal) + : Thread("SignallingTransporter",prio), m_worker(worker) + { } + virtual ~TransportThread(); + virtual void run(); +private: + TransportWorker* m_worker; +}; + +class ListenerThread : public Thread +{ +public: + bool init(const NamedList& param); + ListenerThread(Transport* trans); + ~ListenerThread(); + inline void terminate() + { cancel(); } + virtual void run(); + bool addAddress(const NamedList ¶m); +private: + Socket* m_socket; + Transport* m_transport; + bool m_stream; +}; + +class TReader : public TransportWorker, public Mutex +{ +public: + inline TReader() + : Mutex(true,"TReader"), m_canSend(true), m_isSending(false) + { } + virtual ~TReader(); + virtual void listen(int maxConn) = 0; + virtual bool sendMSG(const DataBlock& header, const DataBlock& msg, int streamId = 0) = 0; + virtual void setSocket(Socket* s) = 0; + bool m_canSend; + bool m_isSending; +}; + +class Transport : public SIGTransport +{ +public: + enum TransportType { + None = 0, + Sctp, + // All the following transports are not standard + Tcp, + Udp, + Unix, + }; + enum State { + Up, + Initiating, + Down + }; + virtual bool initialize(const NamedList* config); + Transport(const NamedList& params); + ~Transport(); + inline unsigned char getVersion(unsigned char* buf) + { return buf[0]; } + inline unsigned char getType(unsigned char* buf) + { return buf[3]; } + inline unsigned char getClass(unsigned char* buf) + { return buf[2]; } + inline int transType() + { return m_type; } + inline bool listen() + { return m_listener != 0; } + void setStatus(int status); + inline int status() + { return m_state; } + inline void resetListener() + { m_listener = 0; } + bool addSocket(Socket* socket,SocketAddr& adress); + virtual bool reliable() const + { return m_type == Sctp || m_type == Tcp; } + virtual bool control(const NamedList ¶m); + virtual bool connected(int id) const + { return true;} + virtual void attached(bool ual) + { } + bool connectSocket(); + u_int32_t getMsgLen(unsigned char* buf); + bool addAddress(const NamedList ¶m, Socket* socket); + bool bindSocket(); + static SignallingComponent* create(const String& type,const NamedList& params); + virtual bool transmitMSG(const DataBlock& header, const DataBlock& msg, int streamId = 0); +private: + TReader* m_reader; + bool m_streamer; + int m_type; + int m_state; + ListenerThread* m_listener; + const NamedList m_config; + bool m_endpoint; +}; + +class StreamReader : public TReader +{ +public: + StreamReader(Transport* transport, Socket* sock); + ~StreamReader(); + virtual bool readData(); + virtual bool connectSocket() + { return m_transport->connectSocket(); } + virtual bool needConnect() + { return m_transport && m_transport->status() == Transport::Down && !m_transport->listen(); } + virtual bool sendMSG(const DataBlock& header, const DataBlock& msg, int streamId = 0); + virtual void setSocket(Socket* s); + virtual void listen(int maxConn) + { } +private: + Transport* m_transport; + Socket* m_socket; + DataBlock m_sendBuffer; + DataBlock m_recvBuffer; + DataBlock m_headerBuffer; + int m_headerLen; + DataBlock m_readBuffer; + u_int32_t m_totalPacketLen; +}; + +class MessageReader : public TReader +{ +public: + MessageReader(Transport* transport, Socket* sock, SocketAddr& remote); + ~MessageReader(); + virtual bool readData(); + virtual bool connectSocket() + { return m_transport->connectSocket(); } + virtual bool needConnect() + { return m_transport && m_transport->status() == Transport::Down && !m_transport->listen(); } + virtual bool sendMSG(const DataBlock& header, const DataBlock& msg, int streamId = 0); + virtual void setSocket(Socket* s); + virtual void listen(int maxConn) + { m_socket->listen(maxConn); } +private: + Socket* m_socket; + Transport* m_transport; + SocketAddr m_remote; +}; + +class TransportModule : public Module +{ +public: + TransportModule(); + ~TransportModule(); + virtual void initialize(); +private: + bool m_init; +}; + +static TransportModule plugin; +YSIGFACTORY2(Transport); + +static const TokenDict s_transType[] = { + { "none", Transport::None }, + { "sctp", Transport::Sctp }, + { "tcp", Transport::Tcp }, + { "udp", Transport::Udp }, + { "unix", Transport::Unix }, + { 0, 0 } +}; + +static void resolveAddress(const String& addr, String& ip, int& port) +{ + ObjList* o = addr.split(':'); + if (o && o->count() < 2) { + ip = "0.0.0.0"; + port = 3565; + return; + } + String* host = static_cast(o->get()); + if (host) + ip = *host; + o = o->skipNext(); + String* p = static_cast(o->get()); + if (p) + port = p->toInteger(); +} + +/** ListenerThread class */ + +ListenerThread::ListenerThread(Transport* trans) + : Thread("Listener Thread"), + m_socket(0), m_transport(trans), m_stream(true) +{ + DDebug("Transport Listener:",DebugAll,"Creating ListenerThread (%p)",this); +} + +ListenerThread::~ListenerThread() +{ + DDebug("Transport Listener",DebugAll,"Destroing ListenerThread (%p)",this); + if (m_transport) { + Debug("Transport Listener",DebugWarn,"Unusual exit"); + m_transport->resetListener(); + } + m_transport = 0; + m_socket->terminate(); + delete m_socket; +} + +bool ListenerThread::init(const NamedList& param) +{ + m_socket = new Socket(); + bool multi = param.getParam("local1") != 0; + m_stream = param.getBoolValue("stream",true); + if (multi && m_transport->transType() != Transport::Sctp) { + Debug("ListenerThread",DebugWarn,"Socket %s does not suport multihomed", + lookup(m_transport->transType(),s_transType)); + return false; + } + switch (m_transport->transType()) { + case Transport::Sctp: + { + Socket* soc = 0; + Message m("socket.sctp"); + SockRef* s = new SockRef(&soc); + m.userData(s); + if (!(Engine::dispatch(m) && soc)) { + DDebug("ListenerThread",DebugWarn,"Could not obtain SctpSocket"); + return false; + } + m_socket = soc; + m_socket->create(AF_INET,m_stream ? SOCK_STREAM : SOCK_SEQPACKET,IPPROTO_SCTP); + break; + } + case Transport::Tcp: + m_socket->create(AF_INET,SOCK_STREAM); + break; + case Transport::Udp: + m_socket->create(AF_INET,SOCK_DGRAM); + break; + case Transport::Unix: + m_socket->create(AF_UNIX,SOCK_STREAM); + break; + default: + Debug("ListenerThread",DebugWarn,"Unknown type of socket"); + } + if (!m_socket->valid()) { + Debug("ListenerThread",DebugWarn,"Unable to create listener socket: %s", + strerror(m_socket->error())); + return false; + } + if (!m_socket->setBlocking(false)) { + DDebug("ListenerThread",DebugWarn,"Unable to set listener to nonblocking mode"); + return false; + } + SocketAddr addr(AF_INET); + String address, adr = param.getValue("local"); + int port; + resolveAddress(adr,address,port); + addr.host(address); + addr.port(port); + if (!m_socket->bind(addr)) { + Debug(DebugWarn,"Unable to bind to %s:%u %s",addr.host().c_str(),addr.port(),strerror(errno)); + return false; + } else + DDebug("ListenerThread",DebugAll,"Socket bind to %s:%u", + addr.host().c_str(),addr.port()); + if (multi && !addAddress(param)) + return false; + if (!m_socket->listen(3)) { + DDebug("ListenerThread",DebugWarn,"Unable to listen on socket: %s", strerror(m_socket->error())); + return false; + } + return true; +} + +void ListenerThread::run() +{ + if (!m_stream) + return; + for (;;) + { + Thread::msleep(50,false); + if (check(false) || Engine::exiting()) + break; + SocketAddr address; + Socket* newSoc = m_socket->accept(address); + if (!newSoc) { + if (!m_socket->canRetry()) + DDebug("ListenerThread",DebugNote, "Accept error: %s", strerror(m_socket->error())); + continue; + } else { + if (!m_transport->addSocket(newSoc,address)) + DDebug("ListenerThread",DebugNote,"Connection rejected for %s", + address.host().c_str()); + Debug(DebugStub,"See if should be done peeloff for new incomming connections"); + } + } + if (m_transport) { + m_transport->resetListener(); + m_transport = 0; + } +} + +bool ListenerThread::addAddress(const NamedList ¶m) +{ + ObjList o; + for (int i = 1; ; i++) { + SocketAddr* addr = new SocketAddr(AF_INET); + String temp = "local"; + temp += i; + const String* adr = param.getParam(temp); + if (!adr) + break; + String address; + int port; + resolveAddress(*adr,address,port); + addr->host(address); + addr->port(port); + o.append(addr); + } + SctpSocket* s = static_cast(m_socket); + if (!s) { + Debug("ListenerThread",DebugGoOn,"Failed to cast socket"); + return false; + } + if (!s->bindx(o)) { + DDebug("ListenerThread",DebugNote,"Failed to bindx sctp socket [%p] %s",s,strerror(errno)); + return false; + } else + Debug(DebugNote,"Socket binded to %d auxiliar addresses",o.count()); + return true; +} + + +/** TransportThread class */ + +TransportThread::~TransportThread() +{ + if (m_worker) + m_worker->resetThread(); + DDebug(DebugAll,"Destroing Transport Thread [%p]",this); +} + +void TransportThread::run() +{ + if (!m_worker) + return; + while (true) { + bool ret = false; + if (m_worker->needConnect()) + ret = m_worker->connectSocket(); + else + ret = m_worker->readData(); + if (ret) + Thread::check(true); + else + Thread::msleep(5,true); + } +} + +TReader::~TReader() +{ + Debug(DebugAll,"Destroing TReader [%p]",this); +} + +/** TransportWorker class */ + +bool TransportWorker::running() +{ + return m_thread && m_thread->running(); +} + +bool TransportWorker::start(Thread::Priority prio) +{ + if (!m_thread) + m_thread = new TransportThread(this,prio); + if (m_thread->running()) + return true; + if (m_thread->startup()) + return true; + m_thread->cancel(true); + m_thread = 0; + return false; +} + +void TransportWorker::stop() +{ + if (!m_thread) + return; + if (m_thread->running()) + m_thread->cancel(); +} + +/** + * Transport calss + */ + +SignallingComponent* Transport::create(const String& type, const NamedList& name) +{ + if (type != "SIGTransport") + return 0; + Configuration cfg(Engine::configFile("sigtransport")); + cfg.load(); + + const char* sectName = name.getValue("basename"); + NamedList* config = cfg.getSection(sectName); + if (!config) { + DDebug(&plugin,DebugWarn,"No section '%s' in configuration",c_safe(sectName)); + return 0; + } + return new Transport(*config); +} + +Transport::Transport(const NamedList ¶m) + : m_reader(0), m_state(Down), m_listener(0), m_config(param), m_endpoint(true) +{ + setName("Transport"); + Debug(this,DebugAll,"Transport created (%p)",this); +} + +Transport::~Transport() +{ + if (m_listener) + m_listener->terminate(); + while (m_listener) + Thread::yield(); + DDebug(this,DebugGoOn,"Destroing Transport [%p]",this); + delete m_reader; +} + +bool Transport::control(const NamedList ¶m) +{ + String oper = param.getValue("operation","init"); + if (oper == "init") + return initialize(¶m); + else if (oper == "add_addr") { + if (!m_listener) { + Debug(this,DebugWarn,"Unable to listen on another address ,listener is missing"); + return false; + } + return m_listener->addAddress(param); + } + return false; +} + +bool Transport::initialize(const NamedList* params) +{ + Configuration cfg(Engine::configFile("sigtransport")); + cfg.load(); + const char* sectName = params->getValue("basename"); + NamedList* config = cfg.getSection(sectName); + if (!config) { + DDebug(&plugin,DebugWarn,"No section '%s' in configuration",c_safe(sectName)); + return false; + } + m_type = lookup(config->getValue("type","sctp"),s_transType); + m_streamer = config->getBoolValue("stream",true); + m_endpoint = config->getBoolValue("endpoint",false); + if (!m_endpoint && m_streamer) { + m_listener = new ListenerThread(this); + if (!m_listener->init(*config)) { + DDebug(this,DebugNote,"Unable to start listener"); + return false; + } + m_listener->startup(); + return true; + } + if (m_streamer) + m_reader = new StreamReader(this,0); + else { + SocketAddr addr(AF_INET); + String address, adr = m_config.getValue("remote"); + int port; + resolveAddress(adr,address,port); + addr.host(address); + addr.port(port); + m_reader = new MessageReader(this,0,addr); + bindSocket(); + if (m_type == Sctp) { + // Send an empty message to create the connection + DataBlock data; + data.append("0"); + if(m_reader->sendMSG(data,data,1)) + m_reader->listen(1); + setStatus(Up); + } + } + m_reader->start(); + return true; +} + +bool Transport::bindSocket() +{ + Socket* socket = new Socket(); + bool multi = m_config.getParam("local1") != 0; + if (multi && transType() != Transport::Sctp) { + Debug(this,DebugWarn,"Sockets type %s do not suport multihomed", + lookup(transType(),s_transType)); + return false; + } + switch (m_type) { + case Transport::Sctp: + { + Socket* soc = 0; + Message m("socket.sctp"); + SockRef* s = new SockRef(&soc); + m.userData(s); + if (!(Engine::dispatch(m) && soc)) { + DDebug("ListenerThread",DebugWarn,"Could not obtain SctpSocket"); + return false; + } + socket = soc; + socket->create(AF_INET,SOCK_SEQPACKET,IPPROTO_SCTP); + SctpSocket* sctp = static_cast(socket); + if (!sctp->setStreams(2,2)) + DDebug(this,DebugInfo,"Failed to set sctp stream number"); + if (!sctp->subscribeEvents()) + DDebug(this,DebugInfo,"Unable to subscribe to Sctp events"); + break; + } + case Transport::Udp: + socket->create(AF_INET,SOCK_DGRAM); + break; + default: + DDebug(this,DebugWarn,"Unknown/unwanted type of socket %s", + lookup(transType(),s_transType,"Unknown")); + } + if (!socket->valid()) { + Debug(this,DebugWarn,"Unable to create listener socket: %s", + strerror(socket->error())); + return false; + } + if (!socket->setBlocking(false)) { + DDebug(this,DebugWarn,"Unable to set listener to nonblocking mode"); + return false; + } + SocketAddr addr(AF_INET); + String address, adr = m_config.getValue("local"); + int port; + resolveAddress(adr,address,port); + addr.host(address); + addr.port(port); + if (!socket->bind(addr)) { + Debug(DebugNote,"Unable to bind to %s:%u %s",addr.host().c_str(),addr.port(),strerror(errno)); + return false; + } else + DDebug(this,DebugAll,"Socket bind to %s:%u", + addr.host().c_str(),addr.port()); + if (multi && !addAddress(m_config,socket)) + return false; + m_reader->setSocket(socket); + if (m_type == Transport::Udp) + setStatus(Up); + return true; +} + +bool Transport::addAddress(const NamedList ¶m, Socket* socket) +{ + ObjList o; + for (int i = 1; ; i++) { + SocketAddr* addr = new SocketAddr(AF_INET); + String temp = "local"; + temp << i; + const String* adr = param.getParam(temp); + if (!adr) + break; + String address; + int port; + resolveAddress(*adr,address,port); + addr->host(address); + addr->port(port); + o.append(addr); + } + SctpSocket* s = static_cast(socket); + if (!s) { + Debug(this,DebugGoOn,"Failed to cast socket"); + return false; + } + if (!s->bindx(o)) { + DDebug(this,DebugNote,"Failed to bindx sctp socket [%p] %s",s,strerror(errno)); + return false; + } else + Debug(DebugNote,"Socket binded to %d auxiliar addresses",o.count()); + return true; +} + + +bool Transport::connectSocket() +{ + if (!m_streamer && !m_endpoint) + return false; + Socket* sock = 0; + switch (m_type){ + case Sctp : + { + Message m("socket.sctp"); + SockRef* s = new SockRef(&sock); + m.userData(s); + if (!(Engine::dispatch(m) && sock)) { + DDebug(this,DebugNote,"Could not obtain SctpSocket"); + return false; + } + sock->create(AF_INET,m_streamer ? SOCK_STREAM : SOCK_SEQPACKET,IPPROTO_SCTP); + SctpSocket* socket = static_cast(sock); + if (!socket->setStreams(2,2)) + DDebug(this,DebugInfo,"Failed to set sctp stream number"); + if (!socket->subscribeEvents()) + DDebug(this,DebugInfo,"Unable to subscribe to Sctp events"); + break; + } + case Tcp : + sock = new Socket(); + sock->create(AF_INET,SOCK_STREAM); + break; + case Udp : + sock = new Socket(); + m_streamer = false; + sock->create(AF_INET,SOCK_DGRAM); + break; + case Unix : + sock = new Socket(); + sock->create(AF_UNIX,SOCK_STREAM); + break; + default: + DDebug(this,DebugWarn,"Unknown type of socket %s",lookup(m_type,s_transType,"Unknown")); + return false; + } + if (!m_streamer) { + SocketAddr addr(AF_INET); + String address, adr = m_config.getValue("local"); + int port; + resolveAddress(adr,address,port); + addr.host(address); + addr.port(port); + if (!sock->bind(addr)) + Debug(DebugNote,"Failed to bind Socket. [%p]",sock); + } + if (!m_config.getParam("remote1")) { + SocketAddr addr(AF_INET); + String address, adr = m_config.getValue("remote"); + int port; + resolveAddress(adr,address,port); + addr.host(address); + addr.port(port); + if (m_endpoint && !sock->connect(addr)) { + DDebug(this,DebugNote,"Unable to connect to %s:%u. %s", + addr.host().c_str(),addr.port(),strerror(errno)); + sock->terminate(); + delete sock; + return false; + } + } + else { + ObjList o; + for (unsigned int i = 0; ; i++) { + SocketAddr* addr = new SocketAddr(AF_INET); + String aux = "remote"; + if (i) + aux << i; + String* adr = m_config.getParam(aux); + if (!adr) + break; + String address; + int port; + resolveAddress(*adr,address,port); + addr->host(address); + addr->port(port); + o.append(addr); + } + SctpSocket* s = static_cast(sock); + if (!s) { + Debug(this,DebugGoOn,"Failed to cast socket"); + return false; + } + if (!s->connectx(o)) { + DDebug(this,DebugNote,"Failed to connectx sctp socket [%p]",s); + s->terminate(); + delete s; + return false; + } else + Debug(DebugNote,"Socket conected to %d addresses",o.count()); + } + m_reader->setSocket(sock); + setStatus(Up); + return true; +} + +void Transport::setStatus(int status) +{ + if (m_state == status) + return; + m_state = status; + m_reader->m_canSend = true; + notifyLayer((status == Up) ? SignallingInterface::LinkUp : SignallingInterface::LinkDown); +} + +u_int32_t Transport::getMsgLen(unsigned char* buf) +{ + return ((unsigned int)buf[4] << 24) | ((unsigned int)buf[5] << 16) | + ((unsigned int)buf[6] << 8) | (unsigned int)buf[7]; +} + + +bool Transport::transmitMSG(const DataBlock& header, const DataBlock& msg, int streamId) +{ + if (!m_reader) + return false; + bool ret = m_reader->sendMSG(header,msg,streamId); + return ret; +} + +bool Transport::addSocket(Socket* socket,SocketAddr& adress) +{ + if (transType() == Up) + return false; + SocketAddr addr(AF_INET); + String address, adr = m_config.getValue("remote"); + int port; + resolveAddress(adr,address,port); + addr.host(address); + addr.port(port); + switch (m_type) { + case Sctp : + { + Socket* sock = 0; + Message m("socket.sctp"); + m.addParam("handle",String(socket->handle())); + SockRef* s = new SockRef(&sock); + m.userData(s); + if (!(Engine::dispatch(m) && sock)) { + DDebug(this,DebugNote,"Could not obtain SctpSocket"); + return false; + } + SctpSocket* soc = static_cast(sock); + if (!soc->setStreams(2,2)) + DDebug(this,DebugInfo,"Sctp set Streams failed"); + if (!soc->subscribeEvents()) + DDebug(this,DebugInfo,"Sctp subscribe events failed"); + if (m_streamer) + m_reader = new StreamReader(this,soc); + else + m_reader = new MessageReader(this,soc,addr); + break; + } + case Unix : + case Tcp : + m_reader = new StreamReader(this,socket); + break; + case Udp : + m_reader = new MessageReader(this,socket,addr); + break; + default: + Debug(this,DebugNote,"Unknown socket type "); + return false; + } + m_reader->start(); + setStatus(Up); + return true; +} + +/** + * class StreamReader + */ + +StreamReader::StreamReader(Transport* transport,Socket* sock) + : m_transport(transport), m_socket(sock), m_headerLen(8), m_totalPacketLen(0) +{ + DDebug(transport,DebugAll,"Creating StreamReader (%p,%p) [%p]",transport,sock,this); +} + +StreamReader::~StreamReader() +{ + stop(); + if (m_socket) { + m_socket->terminate(); + delete m_socket; + } + DDebug(m_transport,DebugAll,"Destroing StreamReader [%p]",this); +} + +void StreamReader::setSocket(Socket* s) +{ + if (!s || s == m_socket) + return; + Socket* temp = m_socket; + m_socket = s; + if (temp) { + temp->terminate(); + delete temp; + } +} + +bool StreamReader::sendMSG(const DataBlock& header, const DataBlock& msg, int streamId) +{ + if (!m_canSend) + return false; + m_isSending = true; + if (((m_sendBuffer.length() + msg.length()) + header.length()) < MAX_BUF_SIZE) { + m_sendBuffer += header; + m_sendBuffer += msg; + } + else + Debug(m_transport,DebugAll,"Buffer Overrun"); + while (m_sendBuffer.length()) { + bool sendOk = false, error = false; + int flags = 0; + if (m_socket->select(0,&sendOk,&error,5000)) { + if (error) { + DDebug(m_transport,DebugAll,"Error detected. %s",strerror(errno)); + break; + } + if (!sendOk) + break; + int len = 0; + if (m_transport->transType() == Transport::Sctp) { + SctpSocket* s = static_cast(m_socket); + if (!s) { + DDebug(m_transport,DebugGoOn,"Sctp conversion failed"); + break; + } + len = s->sendMsg(m_sendBuffer.data(),m_sendBuffer.length(),streamId,flags); + } + else + len = m_socket->send(m_sendBuffer.data(),m_sendBuffer.length()); + if (len <= 0) + break; + m_sendBuffer.cut(-len); + } + break; + } + m_isSending = false; + return (m_sendBuffer.length() == 0); +} + +bool StreamReader::readData() +{ + if (!m_socket) + return false; + bool readOk = false, error = false; + if (!m_socket->select(&readOk,0,&error,5000)) + return false; + if (!readOk || error) + return false; + int stream = 0, len = 0; + SocketAddr addr; + unsigned char buf[MAX_BUF_SIZE]; + if (m_headerBuffer.length() < 8) { + int flags = 0; + if (m_transport->transType() == Transport::Sctp) { + SctpSocket* s = static_cast(m_socket); + if (!s) { + DDebug(m_transport,DebugGoOn,"Sctp conversion failed"); + return false; + } + len = s->recvMsg((void*)buf,m_headerLen,addr,stream,flags); + if (flags) { + if (flags == 2) { + Debug(DebugInfo,"Sctp commUp"); + m_transport->setStatus(Transport::Up); + return true; + } + DDebug(m_transport,DebugWarn,"Connection down [%p] %d",m_socket, flags); + m_transport->setStatus(Transport::Down); + while (m_isSending) + Thread::yield(); + m_canSend = false; + m_socket->terminate(); + if (m_transport->listen()) + stop(); + delete m_socket; + return false; + } + } + else { + SocketAddr addr; + len = m_socket->recv((void*)buf,m_headerLen); + } + if (len <= 0) + return false; + m_headerLen -= len; + m_headerBuffer.append(buf,len); + if (m_headerLen > 0) + return true; + unsigned char* auxBuf = (unsigned char*)m_headerBuffer.data(); + m_totalPacketLen = m_transport->getMsgLen(auxBuf); + if (m_totalPacketLen >= 8 && m_totalPacketLen < MAX_BUF_SIZE) + m_totalPacketLen -= 8; + else { + DDebug(m_transport,DebugWarn,"Protocol error - unsupported length of packet %d!", + m_totalPacketLen); + return false; + } + } + unsigned char buf1[MAX_BUF_SIZE]; + if (m_totalPacketLen > 0) { + len = 0; + int flags = 0; + if (m_transport->transType() == Transport::Sctp) { + SctpSocket* s = static_cast(m_socket); + if (!s) { + DDebug(m_transport,DebugGoOn,"Sctp conversion failed"); + return false; + } + len = s->recvMsg((void*)buf1,m_totalPacketLen,addr,stream,flags); + if (flags) { + DDebug(m_transport,DebugWarn,"Connection down [%p]",m_socket); + m_transport->notifyLayer(SignallingInterface::LinkDown); + } + } + else { + SocketAddr addr; + len = m_socket->recv((void*)buf1,m_totalPacketLen); + } + if (len <= 0) + return false; + m_totalPacketLen -= len; + m_readBuffer.append(buf1,len); + if (m_totalPacketLen > 0) + return true; + m_transport->processMSG(m_transport->getVersion((unsigned char*)m_headerBuffer.data()), + m_transport->getClass((unsigned char*)m_headerBuffer.data()),m_transport->getType((unsigned char*) + m_headerBuffer.data()), m_readBuffer,stream); + m_totalPacketLen = 0; + m_readBuffer.clear(true); + m_headerLen = 8; + m_headerBuffer.clear(true); + return true; + } + return false; +} + +/** + * class MessageReader + */ + +MessageReader::MessageReader(Transport* transport, Socket* sock, SocketAddr& addr) + : m_socket(sock), m_transport(transport), m_remote(addr) +{ + DDebug(DebugAll,"Creating MessageReader [%p]",this); +} + +MessageReader::~MessageReader() +{ + stop(); + if (m_socket) { + m_socket->terminate(); + delete m_socket; + } + DDebug(DebugAll,"Destroing MessageReader [%p]",this); +} + +void MessageReader::setSocket(Socket* s) +{ + if (!s || s == m_socket) + return; + if (m_socket) { + m_socket->terminate(); + delete m_socket; + } + m_socket = s; +} + +bool MessageReader::sendMSG(const DataBlock& header, const DataBlock& msg, int streamId) +{ + if (!m_canSend) + return false; + bool sendOk = false, error = false; + bool ret = false; + m_isSending = true; + while (m_socket->select(0,&sendOk,&error,5000)) { + if (error) { + DDebug(m_transport,DebugAll,"Send error detected. %s",strerror(errno)); + m_transport->setStatus(Transport::Down); + break; + } + if (!sendOk) + break; + int totalLen = header.length() + msg.length(); + DataBlock buf(header); + buf += msg; + String aux; + aux.hexify(buf.data(),buf.length(),' '); + XDebug(m_transport,DebugInfo,"Sending :%s",aux.c_str()); + int len = 0; + int flags = 0; + if (m_transport->transType() == Transport::Sctp) { + SctpSocket* s = static_cast(m_socket); + if (!s) { + DDebug(m_transport,DebugGoOn,"Sctp conversion failed"); + break; + } + len = s->sendTo(buf.data(),totalLen,streamId,m_remote,flags); + } + else + len = m_socket->sendTo(buf.data(),totalLen,m_remote); + if (len == totalLen) { + ret = true; + break; + } + DDebug(m_transport,DebugAll,"Error sending message %d %d %s",len,totalLen,strerror(errno)); + break; + } + m_isSending = false; + return ret; +} + +bool MessageReader::readData() +{ + bool readOk = false,error = false; + if (!m_socket->select(&readOk,0,&error,5000)) + return false; + if (!readOk || error) + return false; + + unsigned char buffer[MAX_BUF_SIZE]; + int stream = 0; + int r = 0; + SocketAddr addr; + if (m_transport->transType() == Transport::Sctp) { + int flags = 0; + SctpSocket* s = static_cast(m_socket); + if (!s) { + DDebug(m_transport,DebugGoOn,"Sctp conversion failed"); + return false; + } + SocketAddr addr; + r = s->recvMsg((void*)buffer,MAX_BUF_SIZE,addr,stream,flags); + if (flags) { + if (flags == 2) { + DDebug(DebugAll,"Sctp connection is Up"); + m_transport->setStatus(Transport::Up); + return true; + } + DDebug(m_transport,DebugNote,"Connection down [%p] %d",m_socket,flags); + m_transport->setStatus(Transport::Initiating); + return false; + } + } + else + r = m_socket->recvFrom((void*)buffer,MAX_BUF_SIZE,addr); + + if (r <= 0) + return false; + + u_int32_t len = m_transport->getMsgLen(buffer); + if ((unsigned int)r != len) { + Debug(m_transport,DebugNote,"Protocol read error read: %d, expected %d",r,len); + return false; + } + DataBlock packet(buffer,r); + packet.cut(-8); + m_transport->processMSG(m_transport->getVersion((unsigned char*)buffer), + m_transport->getClass((unsigned char*)buffer), m_transport->getType((unsigned char*)buffer),packet,stream); + return true; +} + +/** + * class TransportModule + */ + +TransportModule::TransportModule() + : Module("sigtransport","misc",true), + m_init(false) +{ + Output("Loaded module SigTransport"); +} + +TransportModule::~TransportModule() +{ + Output("Unloading module SigTransport"); +} + +void TransportModule::initialize() +{ + if (!m_init) { + Output("Initializing module SigTransport"); + m_init = true; + setup(); + } +} + +}; // anonymous namespace + +/* vi: set ts=8 sw=4 sts=4 noet: */ diff --git a/modules/server/ysigchan.cpp b/modules/server/ysigchan.cpp index 4f7db472..5e23d894 100644 --- a/modules/server/ysigchan.cpp +++ b/modules/server/ysigchan.cpp @@ -419,6 +419,7 @@ public: SigSS7Router = 0x05 | SigDefaults, SigSS7Management = 0x06 | SigDefaults, SigSS7Maintenance = 0x07 | SigDefaults, + SigSS7M2PA = 0x08 | SigOnDemand, SigSS7Isup = SigTrunk::SS7Isup | SigIsTrunk | SigTopMost, SigSS7Bicc = SigTrunk::SS7Bicc | SigIsTrunk | SigTopMost, SigISDNPN = SigTrunk::IsdnPriNet | SigIsTrunk | SigTopMost, @@ -592,6 +593,7 @@ const TokenDict SigFactory::s_compNames[] = { { "ss7-mtp3", SigSS7Layer3 }, { "ss7-snm", SigSS7Management }, { "ss7-mtn", SigSS7Maintenance }, + { "ss7-m2pa", SigSS7M2PA }, { "ss7-isup", SigSS7Isup }, { "ss7-bicc", SigSS7Bicc }, { "isdn-pri-net", SigISDNPN }, @@ -617,6 +619,7 @@ const TokenDict SigFactory::s_compClass[] = { MAKE_CLASS(SS7Layer3), MAKE_CLASS(SS7Management), MAKE_CLASS(SS7Maintenance), + MAKE_CLASS(SS7M2PA), MAKE_CLASS(SS7Isup), MAKE_CLASS(SS7Bicc), MAKE_CLASS(ISDNPN), @@ -656,8 +659,14 @@ SignallingComponent* SigFactory::create(const String& type, const NamedList& nam return new ISDNQ921Management(*config,name,name.getBoolValue("network",true)); case SigISDNLayer3: return new ISDNQ931(*config,name); - case SigSS7Layer2: + case SigSS7Layer2: { + String* ty = config->getParam("type"); + if (ty && *ty == "ss7-m2pa") + return new SS7M2PA(*config); return new SS7MTP2(*config); + } + case SigSS7M2PA: + return new SS7M2PA(*config); case SigSS7Layer3: return new SS7MTP3(*config); case SigSS7Router: @@ -1705,6 +1714,10 @@ void SigDriver::copySigMsgParams(SignallingEvent* event, const NamedList& params prefix = params.getValue("message-oprefix",prefix); if (prefix.null()) return; + event->message()->params().copySubParams(params,prefix + "."); + +return; + prefix << "."; unsigned int n = params.length(); for (unsigned int i = 0; i < n; i++) { NamedString* param = params.getParam(i);