Added generic support for SS7 SIGTRAN protocol stack.

Added SCTP sockets support based on Linux Kernel SCTP.
Added SS7 Sigtran M2PA protocol support.


git-svn-id: http://yate.null.ro/svn/yate/trunk@3058 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
andrei 2010-02-02 14:38:12 +00:00
parent e85bb9b262
commit 91ec0a8770
8 changed files with 2319 additions and 23 deletions

View File

@ -0,0 +1,40 @@
; Each section in this file describes a SIGTRAN connection
; Connections are referenced from other configurations describing the upper layer
;[name-of-connection]
; The name of the section identifies the connection
; type: keyword: Socket type - sctp, tcp, udp, unix
;type=sctp
; stream: bool: Socket connection type.
; Designed for SCTP sockets to create a stream socket or a sequenced packet socket
; NOTE: for M2PA if stream is false the M2PA autostart should be on true on both ends,
; if stream is false the M2PA autostart should be true only at one end
;stream=true
; local: string: Primary local address
; Format is ipv4:port like: 1.1.1.1:3566
;local=
; localN: string: Additional local addresses, SCTP only
; Multiple addresses can be specified by incrementing the 1-based index at the end of 'local'
; The address format is ip:port like:
; local1=1.2.3.4:3566
; local2=2.3.4.5:3566
;local1=
; remote: string: Primary remote address
; Format is ipv4:port like: 2.2.2.2:3566
;remote=
; remoteN: string: Additional remote addresses, SCTP only
; Multiple addresses can be specified by incrementing the 1-based index at the end of 'remote'
; The address format is ip:port like:
; remote1=5.6.7.8:3566
; remote2=6.7.8.9:3566
;remote1=
; endpoint: bool: Set to true if this is an endpoint that actively tries to connect,
; false to listen for remote connections
;endpoint=false

View File

@ -94,6 +94,8 @@ SignallingComponent* SignallingFactory::build(const String& type, const NamedLis
// now build some objects we know about
if (type == "SS7MTP2")
return new SS7MTP2(*name);
else if (type == "SS7M2PA")
return new SS7M2PA(*name);
else if (type == "SS7MTP3")
return new SS7MTP3(*name);
else if (type == "SS7Router")

View File

@ -76,7 +76,7 @@ void SIGTRAN::attach(SIGTransport* trans)
Lock lock(m_transMutex);
if (trans == m_trans)
return;
if (trans && trans->ref())
if (!(trans && trans->ref()))
trans = 0;
SIGTransport* tmp = m_trans;
m_trans = trans;
@ -85,8 +85,10 @@ void SIGTRAN::attach(SIGTransport* trans)
tmp->attach(0);
tmp->destruct();
}
if (trans)
if (trans) {
trans->attach(this);
trans->deref();
}
}
// Transmit a SIGTRAN message over the attached transport
@ -116,6 +118,11 @@ bool SIGTransport::processMSG(unsigned char msgVersion, unsigned char msgClass,
return m_sigtran && m_sigtran->processMSG(msgVersion,msgClass,msgType,msg,streamId);
}
void SIGTransport::notifyLayer(SignallingInterface::Notification event)
{
if (m_sigtran)
m_sigtran->notifyLayer(event);
}
// Build the common header and transmit a message to the network
bool SIGTransport::transmitMSG(unsigned char msgVersion, unsigned char msgClass,
unsigned char msgType, const DataBlock& msg, int streamId)
@ -140,4 +147,569 @@ bool SIGTransport::transmitMSG(unsigned char msgVersion, unsigned char msgClass,
return ok;
}
/**
* Class SS7M2PA
*/
static TokenDict s_state[] = {
{"Alignment", SS7M2PA::Alignment},
{"ProvingNormal", SS7M2PA::ProvingNormal},
{"ProvingEmergency", SS7M2PA::ProvingEmergency},
{"Ready", SS7M2PA::Ready},
{"ProcessorOutage", SS7M2PA::ProcessorOutage},
{"ProcessorRecovered", SS7M2PA::ProcessorRecovered},
{"Busy", SS7M2PA::Busy},
{"BusyEnded", SS7M2PA::BusyEnded},
{"OutOfService", SS7M2PA::OutOfService},
{0,0}
};
static TokenDict s_messageType[] = {
{"UserData", SS7M2PA::UserData},
{"LinkStatus", SS7M2PA::LinkStatus},
{0,0}
};
SS7M2PA::SS7M2PA(const NamedList& params)
: SignallingComponent(params.safe("SS7M2PA"),&params), m_seqNr(0),
m_needToAck(0), m_lastAck(0), m_localStatus(OutOfService), m_state(OutOfService),
m_remoteStatus(OutOfService), m_transportState(Idle), m_mutex(String("Mutex:") + debugName()), m_t1(0),
m_t2(0), m_t3(0), m_t4(0), m_ackTimer(0), m_confTimer(0), m_dumpMsg(false)
{
// Alignemnt ready timer ~45s
m_t1.interval(params,"t1",45000,50000,false);
// Not Aligned timer ~5s
m_t2.interval(params,"t2",5000,5500,false);
// Aligned timer ~1s
m_t3.interval(params,"t3",1000,1500,false);
// Prouving timer Normal ~8s, Emergency ~0.5s
m_t4.interval(params,"t4",1000,1000,false);
// Acknowledge timer ~1s
m_ackTimer.interval(params,"ack_timer",1000,1100,false);
// Confirmation timer 1/2 t4
m_confTimer.interval(params,"conf_timer",500,600,false);
DDebug(this,DebugAll,"Creating SS7M2PA [%p]",this);
}
SS7M2PA::~SS7M2PA()
{
Lock lock(m_mutex);
m_ackList.clear();
m_bufMsg.clear();
DDebug(this,DebugAll,"Destroing SS7M2PA [%p]",this);
}
bool SS7M2PA::initialize(const NamedList* config)
{
#ifdef DEBUG
String tmp;
if (config && debugAt(DebugAll))
config->dump(tmp,"\r\n ",'\'',true);
Debug(this,DebugInfo,"SS7M2PA::initialize(%p) [%p]%s",config,this,tmp.c_str());
#endif
m_dumpMsg = config->getBoolValue("dumpMsg",false);
m_autostart = config->getBoolValue("autostart",true);
if (config && !transport()) {
NamedString* name = config->getParam("sig");
if (!name)
name = config->getParam("basename");
if (name) {
NamedPointer* ptr = YOBJECT(NamedPointer,name);
NamedList* trConfig = ptr ? YOBJECT(NamedList,ptr->userData()) : 0;
NamedList params(name->c_str());
params.addParam("basename",*name);
params.addParam("protocol","ss7");
if (trConfig)
params.copyParams(*trConfig);
else {
params.copySubParams(*config,params + ".");
trConfig = &params;
}
SIGTransport* tr = YSIGCREATE(SIGTransport,&params);
if (!tr)
return false;
SIGTRAN::attach(tr);
if (!tr->initialize(trConfig))
SIGTRAN::attach(0);
}
}
if (transport())
m_reliable = transport()->reliable();
return transport() && (control(Resume,const_cast<NamedList*>(config)));
}
void SS7M2PA::dumpMsg(u_int8_t version, u_int8_t mClass, u_int8_t type,
const DataBlock& data, int stream, bool send)
{
String dump = "SS7M2PA ";
dump << (send ? "Sending:" : "Received:");
dump << "\n-----";
String indent = "\n ";
dump << indent << "Version: " << version;
dump << " " << "Message class: " << mClass;
dump << " " << "Message type: " << lookup(type,s_messageType,"Unknown");
dump << indent << "Stream: " << stream;
u_int32_t fsn = (data[1] << 16) | (data[2] << 8) | data[3];
u_int32_t bsn = (data[5] << 16) | (data[6] << 8) | data[7];
dump << indent << "FSN : " << fsn << " BSN: " << bsn;
if (type == LinkStatus) {
u_int32_t status = (data[8] << 24) | (data[9] << 16) | (data[10] << 8) | data[11];
dump << indent << "Status: " << lookup(status,s_state);
}
else {
String hex;
hex.hexify((u_int8_t*)data.data() + 8,data.length() - 8,' ');
dump << indent << "Data: " << hex;
}
dump << "\n-----";
Debug(this,DebugInfo,"%s",dump.c_str());
}
bool SS7M2PA::processMSG(unsigned char msgVersion, unsigned char msgClass,
unsigned char msgType, const DataBlock& msg, int streamId)
{
if (msgClass != M2PA) {
Debug(this,DebugWarn,"M2PA received non M2PA message class %d",msgClass);
dumpMsg(msgVersion,msgClass,msgType,msg,streamId,false);
return false;
}
if (m_dumpMsg)
dumpMsg(msgVersion,msgClass,msgType,msg,streamId,false);
Lock lock(m_mutex);
if (!operational() && msgType == UserData) {
if (m_remoteStatus != ProcessorOutage || m_remoteStatus != Busy)
return false;
// If we are not operational buffer the received messages and ack them when we are op
m_bufMsg.append(new DataBlock(msg));
DDebug(this,DebugAll,"Buffering data message while non operational, %d messages buffered",
m_bufMsg.count());
return true;
}
if (!operational() && msgType == UserData)
return false;
if (!decodeSeq(msg,(u_int8_t)msgType))
return false;
DataBlock data(msg);
data.cut(-8);
if (!data.length())
return true;
if (msgType == LinkStatus)
return processLinkStatus(data,streamId);
if (streamId != 1)
DDebug(this,DebugNote,"Received data message on Link status stream");
// We should return false only if transport is sctp!!!!!!!!!!
lock.drop();
SS7MSU msu(data);
return receivedMSU(msu);
}
bool SS7M2PA::decodeSeq(const DataBlock& data,u_int8_t msgType)
{
if (data.length() < 8)
return false;
u_int32_t fsn = (data[1] << 16) | (data[2] << 8) | data[3];
u_int32_t bsn = (data[5] << 16) | (data[6] << 8) | data[7];
if (msgType == LinkStatus) {
if (fsn != m_needToAck) {
DDebug(this,DebugNote,"Received LinkStatus message with wrong sequence number %d expected %d",
fsn,m_needToAck);
abortAlignment("Wrong Sequence number");
transmitLS();
return false;
}
if (bsn == getNext(m_lastAck))
removeFrame(bsn);
if (bsn == m_lastAck)
return true;
// If we are here meens that something went wrong
abortAlignment("msgType == LinkStatus");
transmitLS();
return false;
}
if (fsn != getNext(m_needToAck) && fsn != m_needToAck) {
abortAlignment("Received Out of sequence frame");
transmitLS();
return false;
}
else {
if (fsn == getNext(m_needToAck) && operational()) {
if (m_confTimer.started()) {
sendAck();
m_confTimer.stop();
}
m_needToAck = fsn;
m_confTimer.start();
}
else if (!operational())
m_bufMsg.append(new DataBlock(data));
}
if (bsn == getNext(m_lastAck))
removeFrame(bsn);
if (bsn != m_lastAck) {
abortAlignment(String("Received unexpected bsn: ") << bsn);
transmitLS();
return false;
}
return true;
}
void SS7M2PA::timerTick(const Time& when)
{
Lock lock(m_mutex);
if (m_confTimer.started() && m_confTimer.timeout(when.msec())) {
sendAck(); // Acknowledge last received message before endpoint drops down the link
m_confTimer.stop();
}
if (m_ackTimer.started() && m_ackTimer.timeout(when.msec())) {
m_ackTimer.stop();
if (m_reliable) {
lock.drop();
abortAlignment("Ack timer timeout");
} else
retransData();
}
if (m_t2.started() && m_t2.timeout(when.msec())) {
m_t2.stop();
abortAlignment("T2 timeout");
return;
}
if (m_t3.started() && m_t3.timeout(when.msec())) {
m_t3.stop();
abortAlignment("T3 timeout");
return;
}
if (m_t4.started() && m_t4.timeout(when.msec())) {
m_t4.stop();
setLocalStatus(Ready);
transmitLS();
m_t1.start();
return;
}
if (m_t1.started() && m_t1.timeout(when.msec())) {
m_t1.stop();
abortAlignment("T1 timeout.");
}
}
void SS7M2PA::removeFrame(u_int32_t bsn)
{
Lock lock(m_mutex);
for (ObjList* o = m_ackList.skipNull();o;o = o->skipNext()) {
DataBlock* d = static_cast<DataBlock*>(o->get());
u_int32_t seq = (d->at(1) << 16) | (d->at(2) << 8) | d->at(3);
if (bsn != seq)
continue;
m_lastAck = bsn;
m_ackList.remove(d);
m_ackTimer.stop();
break;
}
}
void SS7M2PA::setLocalStatus(unsigned int status)
{
if (status == m_localStatus)
return;
m_localStatus = status;
}
void SS7M2PA::setRemoteStatus(unsigned int status)
{
if (status == m_remoteStatus)
return;
m_remoteStatus = status;
}
bool SS7M2PA::aligned() const
{
return ((m_localStatus == ProvingNormal) || (m_localStatus == ProvingEmergency)) &&
((m_remoteStatus == ProvingNormal) || (m_remoteStatus == ProvingEmergency));
}
bool SS7M2PA::operational() const
{
return m_localStatus == Ready && m_remoteStatus == Ready;
}
void SS7M2PA::sendAck()
{
DataBlock data;
setHeader(data);
dumpMsg(1,M2PA,UserData,data,1,true);
transmitMSG(1,M2PA,UserData,data,1);
}
bool SS7M2PA::control(Operation oper, NamedList* params)
{
switch (oper) {
case Pause:
m_state = OutOfService;
abortAlignment("Control request pause.");
transmitLS();
return true;
case Resume:
if (aligned())
return true;
case Align:
{
bool em = params && params->getBoolValue("emergency");
m_state = em ? ProvingNormal : ProvingEmergency;
if (m_autostart)
startAlignment();
return true;
}
case Status:
return operational();
default:
return false;
}
}
void SS7M2PA::startAlignment(bool emergency)
{
setLocalStatus(OutOfService);
transmitLS();
setLocalStatus(Alignment);
SS7Layer2::notify();
}
void SS7M2PA::transmitLS(int streamId)
{
if (m_transportState != Established)
return;
DataBlock data;
setHeader(data);
u_int8_t ms[4];
ms[1] = ms[2] = ms[3] = ms[0] = 0;
ms[3] = m_localStatus;
data.append(ms,4);
if (m_dumpMsg)
dumpMsg(1,M2PA, 2,data,streamId,true);
transmitMSG(1,M2PA, 2, data,streamId);
XDebug(this,DebugInfo,"Sending LinkStatus %s",lookup(m_localStatus,s_state));
}
void SS7M2PA::setHeader(DataBlock& data)
{
u_int8_t head[8];
head[0] = head[4] = 0;
head[1] = (m_seqNr >> 16) & 0xff;
head[2] = (m_seqNr >> 8) & 0xff;
head[3] = m_seqNr & 0xff ;
head[5] = (m_needToAck >> 16) & 0xff;
head[6] = (m_needToAck >> 8) & 0xff;
head[7] = m_needToAck & 0xff ;
data.append(head,8);
}
void SS7M2PA::abortAlignment(String from)
{
DDebug(this,DebugNote,"Aborting alignment: %s",from.c_str());
setLocalStatus(OutOfService);
setRemoteStatus(OutOfService);
m_needToAck = m_lastAck = m_seqNr = 0;
if (m_confTimer.started())
m_confTimer.stop();
if (m_ackTimer.started())
m_ackTimer.stop();
if (m_t2.started())
m_t2.stop();
if (m_t3.started())
m_t3.stop();
if (m_t4.started())
m_t4.stop();
if (m_t1.started())
m_t1.stop();
if (m_state == ProvingNormal || m_state == ProvingEmergency)
startAlignment();
SS7Layer2::notify();
}
bool SS7M2PA::processLinkStatus(DataBlock& data,int streamId)
{
if (data.length() < 4)
return false;
u_int32_t status = (data[0] << 24) | (data[1] << 16) | (data[2] << 8) | data[3];
if (m_remoteStatus == status && status != OutOfService)
return true;
XDebug(this,DebugAll,"Received link status: %s , local status : %s , requested status %s",
lookup(status,s_state),lookup(m_localStatus,s_state),lookup(m_state,s_state));
switch (status) {
case Alignment:
if (m_t2.started()) {
m_t2.stop();
setLocalStatus(m_state);
m_t3.start();
transmitLS();
}
else if (m_state == ProvingNormal || m_state == ProvingEmergency)
transmitLS();
else
return false;
setRemoteStatus(status);
break;
case ProvingNormal:
case ProvingEmergency:
if (m_localStatus != ProvingNormal && m_localStatus != ProvingEmergency &&
(m_localStatus == Alignment && m_t3.started()))
return false;
if (m_t3.started()) {
m_t3.stop();
m_t4.start();
}
else if (m_state == ProvingNormal || m_state == ProvingEmergency) {
setLocalStatus(status);
transmitLS();
m_t4.start();
}
setRemoteStatus(status);
break;
case Ready:
if (m_localStatus != Ready) {
setLocalStatus(Ready);
transmitLS();
}
setRemoteStatus(status);
SS7Layer2::notify();
if (m_t4.started())
m_t4.stop();
if (m_t1.started())
m_t1.stop();
if (m_bufMsg.count())
dequeueMsg();
break;
case ProcessorRecovered:
transmitLS();
setRemoteStatus(status);
break;
case BusyEnded:
setRemoteStatus(Ready);
SS7Layer2::notify();
break;
case ProcessorOutage:
case Busy:
setRemoteStatus(status);
SS7Layer2::notify();
break;
case OutOfService:
if (m_localStatus == Ready) {
abortAlignment("Received : LinkStatus Out of service, local status Ready");
SS7Layer2::notify();
}
if ((m_state == ProvingNormal || m_state == ProvingEmergency)) {
if (m_localStatus == Alignment) {
transmitLS();
m_t2.start();
} else if (m_localStatus == OutOfService)
startAlignment();
else
return false;
}
setRemoteStatus(status);
break;
default:
Debug(this,DebugNote,"Received unknown link status message %d",status);
return false;
}
return true;
}
ObjList* SS7M2PA::recoverMSU()
{
Lock lock(m_mutex);
ObjList* lst = 0;
for (;;) {
DataBlock* pkt = static_cast<DataBlock*>(m_ackList.remove(false));
if (!pkt)
break;
if (pkt->length() > 8) {
SS7MSU* msu = new SS7MSU(8 + (char*)pkt->data(),pkt->length() - 8);
if (!lst)
lst = new ObjList;
lst->append(msu);
}
TelEngine::destruct(pkt);
}
return lst;
}
void SS7M2PA::retransData()
{
for (ObjList* o = m_ackList.skipNull();o;o = o->skipNext()) {
DataBlock* msg = static_cast<DataBlock*>(o->get());
u_int8_t* head = (u_int8_t*)msg->data();
head[5] = (m_needToAck >> 16) & 0xff;
head[6] = (m_needToAck >> 8) & 0xff;
head[7] = m_needToAck & 0xff ;
if (m_confTimer.started())
m_confTimer.stop();
transmitMSG(1,M2PA, 1, *msg,1);
if (!m_ackTimer.started())
m_ackTimer.start();
}
}
void SS7M2PA::dequeueMsg()
{
for (ObjList* o = m_bufMsg.skipNull();o;o = o->skipNext()) {
DataBlock* msg = static_cast<DataBlock*>(o->get());
if (!decodeSeq(*msg,UserData))
return;
msg->cut(-8); // Remove M2PA Header
SS7MSU msu(*msg);
receivedMSU(msu);
sendAck();
}
}
bool SS7M2PA::transmitMSU(const SS7MSU& msu)
{
if (msu.length() < 3) {
Debug(this,DebugWarn,"Asked to send too short MSU of length %u [%p]",
msu.length(),this);
return false;
}
// If we don't have an attached interface don't bother
if (!transport())
return false;
Lock lock(m_mutex);
DataBlock packet;
increment(m_seqNr);
setHeader(packet);
if (m_confTimer.started())
m_confTimer.stop();
packet += msu;
m_ackList.append(new DataBlock(packet));
if (m_dumpMsg)
dumpMsg(1,M2PA,1,packet,1,true);
bool ok = transmitMSG(1,M2PA,1,packet,1);
lock.drop();
if (!m_ackTimer.started())
m_ackTimer.start();
return ok;
}
void SS7M2PA::notifyLayer(SignallingInterface::Notification event)
{
switch (event) {
case SignallingInterface::LinkDown:
m_transportState = Idle;
m_seqNr = m_needToAck = m_lastAck = 0;
abortAlignment("LinkDown");
SS7Layer2::notify();
break;
case SignallingInterface::LinkUp:
m_transportState = Established;
Debug(this,DebugInfo,"Interface is up [%p]",this);
if (m_autostart)
startAlignment();
SS7Layer2::notify();
break;
default:
return;
}
}
/* vi: set ts=8 sw=4 sts=4 noet: */

View File

@ -3788,9 +3788,25 @@ public:
* Get the SIGTRAN component attached to this transport
* @return Pointer to adaptation layer or NULL
*/
inline SIGTRAN* sigtran() const
inline SIGTRAN* sigtran() const
{ return m_sigtran; }
/**
* Check if transport layer is reliable
* @return true if transport is reliable
*/
virtual bool reliable() const = 0;
void notifyLayer(SignallingInterface::Notification status);
/**
* Configure and initialize the component and any subcomponents it may have
* @param config Optional configuration parameters override
* @return True if the component was initialized properly
*/
virtual bool initialize(const NamedList* config)
{ return false;}
/**
* Check if the network transport layer is connected
* @param streamId Identifier of the stream to check if applicable
@ -3798,27 +3814,12 @@ public:
*/
virtual bool connected(int streamId) const = 0;
protected:
/**
* Constructor
* @param name Default empty component name
*/
inline SIGTransport(const char* name = 0)
: SignallingComponent(name), m_sigtran(0)
{ }
/**
* Attach an user adaptation layer
* @param sigtran SIGTRAN component to attach, can be NULL
*/
void attach(SIGTRAN* sigtran);
/**
* Notification if the attached state changed
* @param hasUAL True if an User Adaptation Layer is now attached
*/
virtual void attached(bool hasUAL) = 0;
/**
* Send a complete message to the adaptation layer for processing
* @param msgVersion Version of the protocol
@ -3831,6 +3832,21 @@ protected:
bool processMSG(unsigned char msgVersion, unsigned char msgClass,
unsigned char msgType, const DataBlock& msg, int streamId) const;
protected:
/**
* Constructor
* @param name Default empty component name
*/
inline SIGTransport(const char* name = 0)
: SignallingComponent(name), m_sigtran(0)
{ }
/**
* Notification if the attached state changed
* @param hasUAL True if an User Adaptation Layer is now attached
*/
virtual void attached(bool hasUAL) = 0;
/**
* Transmit a message to the network
* @param msgVersion Version of the protocol
@ -3924,6 +3940,9 @@ public:
*/
bool connected(int streamId = 0) const;
virtual void notifyLayer(SignallingInterface::Notification status)
{}
/**
* Message class names dictionary
* @return Pointer to dictionary of message classes
@ -4676,6 +4695,211 @@ protected:
*/
class YSIG_API SS7M2PA : public SS7Layer2, public SIGTRAN
{
public:
enum m2paState {
Alignment = 1,
ProvingNormal = 2,
ProvingEmergency = 3,
Ready = 4,
ProcessorOutage = 5,
ProcessorRecovered = 6,
Busy = 7,
BusyEnded = 8,
OutOfService = 9,
};
enum msgType {
UserData = 1,
LinkStatus = 2
};
enum sctpState {
Idle,
Associating,
Established
};
/**
* Constructor
*/
SS7M2PA(const NamedList& params);
/**
* Destructor
*/
~SS7M2PA();
/**
* Configure and initialize MTP2 and its interface
* @param config Optional configuration parameters override
* @return True if MTP2 and the interface were initialized properly
*/
virtual bool initialize(const NamedList* config);
/**
* Execute a control operation. Operations can change the link status or
* can query the aligned status.
* @param oper Operation to execute
* @param params Optional parameters for the operation
* @return True if the command completed successfully, for query operations
* also indicates the data link is aligned and operational
*/
virtual bool control(Operation oper, NamedList* params = 0);
/**
* Push a Message Signal Unit down the protocol stack
* @param msu MSU data to transmit
* @return True if message was successfully queued
*/
virtual bool transmitMSU(const SS7MSU& msu);
/**
* Method called when the transport status has been changed
* @param status Up or down
*/
virtual void notifyLayer(SignallingInterface::Notification status);
/**
* Remove the MSUs waiting in the transmit queue and return them
* @return List of MSUs taken from the queue
*/
virtual ObjList* recoverMSU();
/**
* Decode sequence numbers from message and process them
* @param data The message
* @param msgType The message type
* @return True if sequence numbers ar as we expected to be
*/
bool decodeSeq(const DataBlock& data, u_int8_t msgType);
/**
* Helper method called when an error was detected
* Change state to OutOfService and notifys upper layer
* @param from Debuging purpose, Information about detected error
*/
void abortAlignment(String from);
/**
* Send link status message to inform the peer about ouer curent state
* @param streamId The id of the stream who should send the message
*/
void transmitLS(int streamId = 0);
/**
* Create M2PA header (sequence numbers)
* @param data The data where the header will be stored
*/
void setHeader(DataBlock& data);
/**
* Decode and process link status message
* @param data The message
* @param streamId The stream id witch received the message
* @return True if the message was procesed
*/
bool processLinkStatus(DataBlock& data, int streamId);
/**
* Helper method used to acknowledge the last received message
* when no data are to transmit
*/
void sendAck();
/**
* Remove a frame from acknowledgement list
* @param bsn The sequence number of the frame to be removed
*/
void removeFrame(u_int32_t bsn);
/**
* Increment the given sequence number
* @param nr The number to increment
* @return The incremented number
*/
inline u_int32_t increment(u_int32_t &nr)
{ return (nr == 0xffffff) ?(nr = 0) : nr++; }
/**
* Obtain next sequence number
* @param nr The number
* @return The next number in sequence
*/
inline u_int32_t getNext(u_int32_t nr)
{ return (nr == 0xffffff) ? 0 : nr + 1; }
protected:
/**
* Periodical timer tick used to perform alignment and housekeeping
* @param when Time to use as computing base for events and timeouts
*/
virtual void timerTick(const Time& when);
/**
* Check if the link is aligned.
* The link may not be operational, the other side may be still proving.
* @return True if the link is aligned
*/
virtual bool aligned() const;
/**
* Check if the link is aligned and operational
* @return True if the link is operational
*/
virtual bool operational() const;
/**
* Process a complete message
* @param msgVersion Version of the protocol
* @param msgClass Class of the message
* @param msgType Type of the message, depends on the class
* @param msg Message data, may be empty
* @param streamId Identifier of the stream the message was received on
* @return True if the message was handled
*/
virtual bool processMSG(unsigned char msgVersion, unsigned char msgClass,
unsigned char msgType, const DataBlock& msg, int streamId);
/**
* Initiates alignment and proving procedure
* @param emergency True if emergency alignment is desired
*/
void startAlignment(bool emergency = false);
/**
* Retransmit unacknowledged data
*/
void retransData();
/**
* Transmit the buffered messages to mtp3 and acknowledge them
*/
void dequeueMsg();
private:
void dumpMsg(u_int8_t version, u_int8_t mClass, u_int8_t type,
const DataBlock& data, int stream, bool send);
void setLocalStatus(unsigned int status);
void setRemoteStatus(unsigned int status);
u_int32_t m_seqNr;
u_int32_t m_needToAck;
u_int32_t m_lastAck;
unsigned int m_localStatus;
unsigned int m_state;
unsigned int m_remoteStatus;
unsigned int m_transportState;
Mutex m_mutex;
bool m_reliable;
bool m_autostart;
ObjList m_ackList;
ObjList m_bufMsg;
SignallingTimer m_t1;
SignallingTimer m_t2;
SignallingTimer m_t3;
SignallingTimer m_t4;
SignallingTimer m_ackTimer;
SignallingTimer m_confTimer;
bool m_dumpMsg;
};
/**

View File

@ -43,7 +43,7 @@ PROGS := cdrbuild.yate cdrfile.yate regexroute.yate \
server/mgcpgw.yate server/mgcpca.yate \
server/mrcpspeech.yate \
server/ysigchan.yate \
server/ciscosm.yate \
server/ciscosm.yate server/sigtransport.yate \
server/presence.yate server/subscription.yate \
server/users.yate \
server/analog.yate server/analogdetect.yate \
@ -91,6 +91,10 @@ endif
endif
endif
ifneq (@HAVE_SCTP_NETINET@,no)
PROGS := $(PROGS) server/lksctp.yate
endif
ifneq (@HAVE_SPANDSP@,no)
PROGS := $(PROGS) faxchan.yate
endif
@ -219,11 +223,11 @@ qt4/%.moc: @srcdir@/qt4/%.h $(MKDEPS) $(INCFILES)
# Take special care of the modules that depend on optional libs
server/ysigchan.yate server/wpcard.yate server/tdmcard.yate server/zapcard.yate server/analog.yate server/ciscosm.yate: ../libyatesig.so
server/ysigchan.yate server/analog.yate server/ciscosm.yate: LOCALFLAGS = -I@top_srcdir@/libs/ysig
server/ysigchan.yate server/wpcard.yate server/tdmcard.yate server/zapcard.yate server/analog.yate server/ciscosm.yate server/sigtransport.yate: ../libyatesig.so
server/ysigchan.yate server/analog.yate server/ciscosm.yate server/sigtransport.yate: LOCALFLAGS = -I@top_srcdir@/libs/ysig
server/wpcard.yate server/tdmcard.yate: LOCALFLAGS = -I@top_srcdir@/libs/ysig @WANPIPE_FLAGS@
server/zapcard.yate: LOCALFLAGS = -I@top_srcdir@/libs/ysig @ZAP_FLAGS@
server/ysigchan.yate server/wpcard.yate server/tdmcard.yate server/zapcard.yate server/analog.yate server/ciscosm.yate: LOCALLIBS = -lyatesig
server/ysigchan.yate server/wpcard.yate server/tdmcard.yate server/zapcard.yate server/analog.yate server/ciscosm.yate server/sigtransport.yate: LOCALLIBS = -lyatesig
server/analogdetect.yate: ../libs/ymodem/libyatemodem.a
server/analogdetect.yate: LOCALFLAGS = -I@top_srcdir@/libs/ymodem
@ -262,6 +266,9 @@ server/mgcpgw.yate: ../libyatemgcp.so
server/mgcpgw.yate: LOCALFLAGS = -I@top_srcdir@/libs/ymgcp
server/mgcpgw.yate: LOCALLIBS = -lyatemgcp
server/lksctp.yate: LOCALFLAGS = @SCTP_FLAGS@
server/lksctp.yate: LOCALLIBS = -lsctp
ilbccodec.yate: ../libs/ilbc/libilbc.a
ilbccodec.yate: LOCALLIBS = -L../libs/ilbc -lilbc
ilbccodec.yate: LOCALFLAGS = @ILBC_INC@

274
modules/server/lksctp.cpp Normal file
View File

@ -0,0 +1,274 @@
/**
* lksctp.cpp
* This file is part of the YATE Project http://YATE.null.ro
*
* SCTP sockets provider based on Linux Kernel SCTP
*
* Yet Another Telephony Engine - a fully featured software PBX and IVR
* Copyright (C) 2009-2010 Null Team
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include <yatephone.h>
#include <string.h>
#include <netinet/sctp.h>
using namespace TelEngine;
namespace { // anonymous
class LKSocket;
class LKModule;
class LKHandler;
class LKSocket : public SctpSocket
{
public:
LKSocket();
LKSocket(SOCKET fd);
virtual ~LKSocket();
virtual bool bindx(ObjList& addrs);
virtual bool connectx(ObjList& addrs);
virtual int sendMsg(const void* buf, int length, int stream, int& flags);
virtual int recvMsg(void* buf, int length, SocketAddr& addr, int& stream, int& flags);
virtual Socket* accept(SocketAddr& addr);
virtual bool setStreams(int inbound, int outbound);
virtual bool getStreams(int& in, int& out);
virtual bool subscribeEvents();
virtual int sendTo(void* buf, int buflen, int stream, SocketAddr& addr, int flags);
bool sctpDown(void* buf);
bool sctpUp(void* buf);
private:
int m_inbound;
int m_outbound;
};
class LKHandler : public MessageHandler
{
public:
LKHandler() : MessageHandler("socket.sctp") { }
virtual bool received(Message &msg);
};
class LKModule : public Module
{
public:
LKModule();
~LKModule();
virtual void initialize();
private:
bool m_init;
};
static LKModule plugin;
/**
* class LKSocket
*/
LKSocket::LKSocket()
{
XDebug(&plugin,DebugAll,"Creating LKSocket [%p]",this);
}
LKSocket::LKSocket(SOCKET fd)
: SctpSocket(fd)
{
XDebug(&plugin,DebugAll,"Creating LKSocket [%p]",this);
}
LKSocket::~LKSocket()
{
XDebug(&plugin,DebugAl,"Destroying LKSocket [%p]",this);
}
bool LKSocket::bindx(ObjList& addresses)
{
struct sockaddr addr[addresses.count()];
int i = 0;
for (ObjList* o = addresses.skipNull();o;o = o->skipNext()) {
SocketAddr* a = static_cast<SocketAddr*>(o->get());
addr[i++] = *(a->address());
}
int error = sctp_bindx(handle(),addr,addresses.count(),SCTP_BINDX_ADD_ADDR);
return (error >= 0);
}
bool LKSocket::connectx(ObjList& addresses)
{
struct sockaddr addr[addresses.count()];
int i = 0;
for (ObjList* o = addresses.skipNull();o;o = o->skipNext()) {
SocketAddr* a = static_cast<SocketAddr*>(o->get());
addr[i++] = *(a->address());
}
int error = sctp_connectx(handle(),addr,addresses.count(),NULL);
return (error >= 0);
}
Socket* LKSocket::accept(SocketAddr& addr)
{
struct sockaddr address;
socklen_t len = 0;
SOCKET sock = acceptHandle(&address,&len);
LKSocket* ret = (sock == invalidHandle()) ? 0 : new LKSocket(sock);
if (ret)
addr.assign(&address,len);
return ret;
}
int LKSocket::recvMsg(void* buf, int length, SocketAddr& addr, int& stream, int& flags)
{
sctp_sndrcvinfo sri;
memset(&sri,0,sizeof(sri));
struct sockaddr address;
socklen_t len = 0;
int flag = 0;
int r = sctp_recvmsg(handle(),buf,length,&address,&len,&sri,&flag);
addr.assign(&address,len);
if (flag & MSG_NOTIFICATION) {
if (sctpDown(buf)) {
flags = 1;
}
else if (sctpUp(buf))
flags = 2;
else
flags = 0;
r = 0;
}
stream = sri.sinfo_stream;
return r;
}
int LKSocket::sendMsg(const void* buf, int length, int stream, int& flags)
{
sctp_sndrcvinfo sri;
memset(&sri,0,sizeof(sri));
sri.sinfo_stream = stream;
int r = sctp_send(handle(),buf,length,&sri,flags);
return r;
}
int LKSocket::sendTo(void* buf, int buflen, int stream, SocketAddr& addr, int flags)
{
return sctp_sendmsg(handle(),buf,buflen,addr.address(),addr.length(),0,flags,stream,0,0);
}
bool LKSocket::setStreams(int inbound, int outbound)
{
sctp_initmsg initMsg;
memset(&initMsg,0,sizeof(initMsg));
initMsg.sinit_max_instreams = inbound;
initMsg.sinit_num_ostreams = outbound;
if (setsockopt(handle(),IPPROTO_SCTP,SCTP_INITMSG,&initMsg,sizeof(initMsg)) < 0) {
DDebug(&plugin,DebugNote,"Unable to set streams number. Error: %s",strerror(errno));
return false;
}
return true;
}
bool LKSocket::subscribeEvents()
{
struct sctp_event_subscribe events;
bzero(&events, sizeof(events));
events.sctp_data_io_event = 1;
events.sctp_send_failure_event = 1;
events.sctp_peer_error_event = 1;
events.sctp_shutdown_event = 1;
events.sctp_association_event = 1;
int ret = setsockopt(handle(),IPPROTO_SCTP,SCTP_EVENTS, &events, sizeof(events));
return (ret != -1);
}
bool LKSocket::getStreams(int& in, int& out)
{
sctp_status status;
memset(&status,0,sizeof(status));
socklen_t len;
if (getsockopt(handle(),IPPROTO_SCTP,SCTP_STATUS, &status,&len) < 0) {
DDebug(&plugin,DebugNote,"Unable to find the number of negotiated streams: %s",
strerror(errno));
return false;
}
XDebug(&plugin,DebugAll,"Sctp streams inbound = %u , outbound = %u",
status.sstat_instrms,status.sstat_outstrms);
m_inbound = status.sstat_instrms;
m_outbound = status.sstat_outstrms;
return true;
}
bool LKSocket::sctpDown(void* buf)
{
union sctp_notification *sn = (union sctp_notification *)buf;
switch (sn->sn_assoc_change.sac_state) {
case SCTP_COMM_LOST:
case SCTP_SHUTDOWN_COMP:
case SCTP_CANT_STR_ASSOC:
case SCTP_RESTART:
return true;
}
return false;
}
bool LKSocket::sctpUp(void* buf)
{
union sctp_notification *sn = (union sctp_notification *)buf;
switch (sn->sn_assoc_change.sac_state) {
case SCTP_COMM_UP:
return true;
}
return false;
}
/**
* class LKHandler
*/
bool LKHandler::received(Message &msg)
{
Socket** ppSock = static_cast<Socket**>(msg.userObject("Socket*"));
int fd = msg.getIntValue("handle",-1);
*ppSock = new LKSocket(fd);
return true;
}
/**
* class LKModule
*/
LKModule::LKModule()
: Module("lksctp","misc",true),
m_init(false)
{
Output("Loading module LKSCTP");
}
LKModule::~LKModule()
{
Output("Unloading module LKSCTP");
}
void LKModule::initialize()
{
if (!m_init) {
Output("Initialize module LKSCTP");
m_init = true;
Engine::install(new LKHandler());
}
}
}; // anonymous namespace
/* vi: set ts=8 sw=4 sts=4 noet: */

File diff suppressed because it is too large Load Diff

View File

@ -419,6 +419,7 @@ public:
SigSS7Router = 0x05 | SigDefaults,
SigSS7Management = 0x06 | SigDefaults,
SigSS7Maintenance = 0x07 | SigDefaults,
SigSS7M2PA = 0x08 | SigOnDemand,
SigSS7Isup = SigTrunk::SS7Isup | SigIsTrunk | SigTopMost,
SigSS7Bicc = SigTrunk::SS7Bicc | SigIsTrunk | SigTopMost,
SigISDNPN = SigTrunk::IsdnPriNet | SigIsTrunk | SigTopMost,
@ -592,6 +593,7 @@ const TokenDict SigFactory::s_compNames[] = {
{ "ss7-mtp3", SigSS7Layer3 },
{ "ss7-snm", SigSS7Management },
{ "ss7-mtn", SigSS7Maintenance },
{ "ss7-m2pa", SigSS7M2PA },
{ "ss7-isup", SigSS7Isup },
{ "ss7-bicc", SigSS7Bicc },
{ "isdn-pri-net", SigISDNPN },
@ -617,6 +619,7 @@ const TokenDict SigFactory::s_compClass[] = {
MAKE_CLASS(SS7Layer3),
MAKE_CLASS(SS7Management),
MAKE_CLASS(SS7Maintenance),
MAKE_CLASS(SS7M2PA),
MAKE_CLASS(SS7Isup),
MAKE_CLASS(SS7Bicc),
MAKE_CLASS(ISDNPN),
@ -656,8 +659,14 @@ SignallingComponent* SigFactory::create(const String& type, const NamedList& nam
return new ISDNQ921Management(*config,name,name.getBoolValue("network",true));
case SigISDNLayer3:
return new ISDNQ931(*config,name);
case SigSS7Layer2:
case SigSS7Layer2: {
String* ty = config->getParam("type");
if (ty && *ty == "ss7-m2pa")
return new SS7M2PA(*config);
return new SS7MTP2(*config);
}
case SigSS7M2PA:
return new SS7M2PA(*config);
case SigSS7Layer3:
return new SS7MTP3(*config);
case SigSS7Router:
@ -1705,6 +1714,10 @@ void SigDriver::copySigMsgParams(SignallingEvent* event, const NamedList& params
prefix = params.getValue("message-oprefix",prefix);
if (prefix.null())
return;
event->message()->params().copySubParams(params,prefix + ".");
return;
prefix << ".";
unsigned int n = params.length();
for (unsigned int i = 0; i < n; i++) {
NamedString* param = params.getParam(i);