Added configuration file to set default values for sctp parameters.

Show debug message if the sctp parameters values are not reliable for m2pa.


git-svn-id: http://yate.null.ro/svn/yate/trunk@5164 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
andrei 2012-06-29 14:00:52 +00:00
parent 16d4b8b3c0
commit 36ad5bea74
6 changed files with 331 additions and 33 deletions

44
conf.d/lksctp.conf.sample Normal file
View File

@ -0,0 +1,44 @@
; SCTP sockets default parameters
; This parameters are applied for all sctp sockets created by lksctp module!
; This values can be overridden from sigtransport.conf for each socket.
; Note! On some platforms this parameters may not be supported
;[general]
; Set initial retransmission interval in milliseconds
; Ex: rto_initial = 400
;rto_initial=
; Set maximum retransmission interval in milliseconds
; Ex: rto_max = 400
;rto_max=
; Set minimum retransmission interval in milliseconds
; Ex: rto_min = 200
;rto_min=
; Set heartbeat interval in milliseconds
; Ex: hb_interval = 5000
;hb_interval=
; Set maximum number of retransmissions before the SCTP connection is considered down
; Ex: max_retrans = 5
;max_retrans=
; Enable / disable SCTP heartbeat. Default is enabled
; Ex: hb_enabled = true
;hb_enabled=
; Set sctp delayed ack parameters
; Note! Both sack_delay and sack_freq must be present and have values different than 0 to be successfully applied
; Set ack time interval for sccp to cumulative acknowledge the received packets
; Max value 500 in milliseconds
; Ex: sack_delay = 50
;sack_delay=
; Set the maximum number of unacknowledged packets
; Ex: sack_freq = 2
; Note! If you set this to 1 you will disable the sctp acknowledge algorithm
;sack_freq=

View File

@ -26,6 +26,7 @@
#include <yatephone.h>
#define MAX_UNACK 256
#define AVG_DELAY 100
using namespace TelEngine;
@ -207,6 +208,17 @@ bool SIGTRAN::restart(bool force)
return true;
}
bool SIGTRAN::getSocketParams(const String& params, NamedList& result)
{
m_transMutex.lock();
RefPointer<SIGTransport> trans = m_trans;
m_transMutex.unlock();
if (!trans)
return false;
trans->getSocketParams(params,result);
return true;
}
// Attach or detach an user adaptation layer
void SIGTransport::attach(SIGTRAN* sigtran)
{
@ -1660,12 +1672,28 @@ void SS7M2PA::notifyLayer(SignallingInterface::Notification event)
SS7Layer2::notify();
break;
case SignallingInterface::LinkUp:
{
m_transportState = Established;
Debug(this,DebugInfo,"Interface is up [%p]",this);
String params = "rto_max";
NamedList result("sctp_params");
if (getSocketParams(params,result)) {
int rtoMax = result.getIntValue(YSTRING("rto_max"));
unsigned int maxRetrans = rtoMax + (int)m_confTimer.interval() + AVG_DELAY;
if (maxRetrans > m_ackTimer.interval()) {
Debug(this,DebugConf,
"%s (%d) is greater than ack timer (%d)! Max RTO: %d, conf timer %d, avg delay: %d",
"The maximum time interval to retransmit a packet",
maxRetrans,(int)m_ackTimer.interval(),
rtoMax,(int)m_confTimer.interval(),AVG_DELAY);
}
} else
Debug(this,DebugNote,"Failed to obtain socket params");
if (m_autostart)
startAlignment();
SS7Layer2::notify();
break;
}
case SignallingInterface::HardwareError:
abortAlignment("HardwareError");
if (m_autostart && (m_transportState == Established))

View File

@ -4336,6 +4336,15 @@ public:
virtual void reconnect(bool force = false)
{ }
/**
* Get sctp socket parameters.
* @param params List of parameters to obtain
* @param result List of parameters to fill
* @return True if operation was successful, false if an error occurred
*/
virtual bool getSocketParams(const String& params, NamedList& result)
{ return false; }
protected:
/**
* Constructor
@ -4572,6 +4581,13 @@ public:
*/
bool restart(bool force);
/**
* Get sctp socket parameters.
* @param params List of parameters to obtain
* @param result List of parameters to fill
* @return True if operation was successful, false if an error occurred
*/
bool getSocketParams(const String& params, NamedList& result);
protected:
/**
* Process a complete message

View File

@ -5,7 +5,7 @@
* SCTP sockets provider based on Linux Kernel SCTP
*
* Yet Another Telephony Engine - a fully featured software PBX and IVR
* Copyright (C) 2009-2010 Null Team
* Copyright (C) 2009-2012 Null Team
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@ -51,10 +51,15 @@ public:
{ m_payload = payload; return true; }
virtual int sendTo(void* buf, int buflen, int stream, SocketAddr& addr, int flags);
virtual bool setParams(const NamedList& params);
virtual bool getParams(const String& params, NamedList& result);
virtual bool valid() const;
bool sctpDown(void* buf);
bool sctpUp(void* buf);
bool alive() const;
bool fillRTO(bool min, bool max, bool initial, NamedList& result);
bool fillAddrParams(bool hbEnabled, bool hbInterval, bool maxRetrans,
NamedList& result);
bool fillSackParams(bool sackDelay, bool sackFreq, NamedList& result);
private:
int m_inbound;
int m_outbound;
@ -69,8 +74,34 @@ public:
~LKModule();
virtual void initialize();
virtual void statusParams(String& str);
inline int getRTOMax()
{ return m_rtoMax; }
inline int getRTOMin()
{ return m_rtoMin; }
inline int getRTOInitial()
{ return m_rtoInitial; }
inline int getHBInterval()
{ return m_hbInterval; }
inline bool isHBEnabled()
{ return m_hbEnabled; }
inline int getSACKDelay()
{ return m_sackDelay; }
inline int getSACKFreq()
{ return m_sackFreq; }
inline int getMaxRetrans()
{ return m_maxRetrans; }
private:
bool m_init;
NamedList m_cfg;
unsigned int m_rtoMax;
unsigned int m_rtoMin;
unsigned int m_rtoInitial;
unsigned int m_hbInterval;
bool m_hbEnabled;
unsigned int m_sackDelay;
unsigned int m_sackFreq;
unsigned int m_maxRetrans;
};
static LKModule plugin;
@ -224,25 +255,26 @@ bool LKSocket::setParams(const NamedList& params)
{
bool ret = false;
bool aux = false;
if (params.getParam(YSTRING("rto_initial")) || params.getParam(YSTRING("rto_max")) ||
params.getParam(YSTRING("rto_min"))) {
struct sctp_rtoinfo rto;
bzero(&rto, sizeof(rto));
rto.srto_initial = params.getIntValue("rto_initial",0);
rto.srto_max = params.getIntValue("rto_max",0);
rto.srto_min = params.getIntValue("rto_min",0);
aux = setOption(IPPROTO_SCTP,SCTP_RTOINFO, &rto, sizeof(rto));
if (!aux)
Debug(&plugin,DebugNote,"Failed to set SCTP RTO params! Reason: %s",strerror(errno));
ret |= aux;
}
struct sctp_rtoinfo rto;
bzero(&rto, sizeof(rto));
rto.srto_initial = params.getIntValue("rto_initial",plugin.getRTOInitial());
rto.srto_max = params.getIntValue("rto_max",plugin.getRTOMax());
rto.srto_min = params.getIntValue("rto_min",plugin.getRTOMin());
aux = setOption(IPPROTO_SCTP,SCTP_RTOINFO, &rto, sizeof(rto));
if (!aux)
Debug(&plugin,DebugNote,"Failed to set SCTP RTO params! Reason: %s",strerror(errno));
ret |= aux;
struct sctp_paddrparams paddr_params;
bzero(&paddr_params, sizeof(paddr_params));
if (params.getParam(YSTRING("hb_interval")))
paddr_params.spp_hbinterval = params.getIntValue(YSTRING("hb_interval"),0);
if (params.getParam(YSTRING("max_retrans")))
paddr_params.spp_pathmaxrxt = params.getIntValue(YSTRING("max_retrans"),0);
bool hbEnabled = params.getBoolValue(YSTRING("hb_enabled"),true);
paddr_params.spp_hbinterval = params.getIntValue(YSTRING("hb_interval"),
plugin.getHBInterval());
paddr_params.spp_pathmaxrxt = params.getIntValue(YSTRING("max_retrans"),
plugin.getMaxRetrans());
bool hbEnabled = params.getBoolValue(YSTRING("hb_enabled"),
plugin.isHBEnabled());
paddr_params.spp_flags |= hbEnabled ? SPP_HB_ENABLE : SPP_HB_DISABLE;
if (params.getParam(YSTRING("hb_0")))
#ifdef SPP_HB_TIME_IS_ZERO
@ -256,6 +288,7 @@ bool LKSocket::setParams(const NamedList& params)
#else
Debug(&plugin,DebugNote,"HeartBeat demand is not available");
#endif
aux = setOption(IPPROTO_SCTP,SCTP_PEER_ADDR_PARAMS, &paddr_params, sizeof(paddr_params));
ret |= aux;
if (!aux)
@ -264,13 +297,14 @@ bool LKSocket::setParams(const NamedList& params)
#ifdef HAVE_SACK_INFO_STRUCT
struct sctp_sack_info sack_info;
bzero(&sack_info, sizeof(sack_info));
if (params.getParam(YSTRING("sack_delay"))) {
sack_info.sack_delay = params.getIntValue(YSTRING("sack_delay"));
if (sack_info.sack_delay > 500)
sack_info.sack_delay = 500;
}
if (params.getParam(YSTRING("sack_freq")))
sack_info.sack_freq = params.getIntValue(YSTRING("sack_freq"));
sack_info.sack_delay = params.getIntValue(YSTRING("sack_delay"),
plugin.getSACKDelay());
if (sack_info.sack_delay > 500)
sack_info.sack_delay = 500;
sack_info.sack_freq = params.getIntValue(YSTRING("sack_freq"),
plugin.getSACKFreq());
aux = setOption(IPPROTO_SCTP,SCTP_DELAYED_ACK_TIME, &sack_info, sizeof(sack_info));
ret |= aux;
if (!aux)
@ -278,13 +312,14 @@ bool LKSocket::setParams(const NamedList& params)
#elif HAVE_ASSOC_VALUE_STRUCT
struct sctp_assoc_value sassoc_value;
bzero(&sassoc_value, sizeof(sassoc_value));
if (params.getParam(YSTRING("sack_delay"))) {
sassoc_value.assoc_value = params.getIntValue(YSTRING("sack_delay"));
if (sassoc_value.assoc_value > 500)
sassoc_value.assoc_value = 500;
}
sassoc_value.assoc_value = params.getIntValue(YSTRING("sack_delay"),
plugin.getSACKDelay());
if (sassoc_value.assoc_value > 500)
sassoc_value.assoc_value = 500;
if (params.getParam(YSTRING("sack_freq")))
Debug(&plugin,DebugConf,"Unable to set sack_freq param! sack_info struct is missing!");
aux = setOption(IPPROTO_SCTP,SCTP_DELAYED_ACK_TIME, &sassoc_value, sizeof(sassoc_value));
ret |= aux;
if (!aux)
@ -299,6 +334,127 @@ bool LKSocket::setParams(const NamedList& params)
return ret || aux;
}
bool LKSocket::getParams(const String& params, NamedList& result)
{
ObjList* list = params.split(',',false);
bool ret = fillRTO(list->find(YSTRING("rto_min")) !=0 , list->find(YSTRING("rto_max")) != 0,
list->find(YSTRING("rto_initial")) != 0, result);
ret = fillAddrParams(list->find(YSTRING("hb_enabled")),
list->find(YSTRING("max_retrans")),list->find(YSTRING("hb_interval")),
result) || ret;
ret = fillSackParams(list->find(YSTRING("sack_delay")),
list->find(YSTRING("sack_freq")), result) || ret;
TelEngine::destruct(list);
return Socket::getParams(params,result) || ret;
}
bool LKSocket::fillRTO(bool min, bool max, bool initial, NamedList& result)
{
if (!(min || max || initial))
return false;
struct sctp_rtoinfo rto;
bzero(&rto, sizeof(rto));
socklen_t length = sizeof(rto);
if (!getOption(IPPROTO_SCTP,SCTP_RTOINFO, &rto, &length)) {
Debug(&plugin,DebugNote,"Failed to get SCTP RTO params! Reason: %s",strerror(errno));
return false;
}
if (min) {
result.addParam("rto_min", String(rto.srto_min));
}
if (max) {
result.addParam("rto_max",String(rto.srto_max));
}
if (initial) {
result.addParam("rto_initial",String(rto.srto_initial));
}
return true;
}
bool LKSocket::fillAddrParams(bool hbEnabled, bool hbInterval, bool maxRetrans,
NamedList& result)
{
if (!(hbEnabled || hbInterval || maxRetrans))
return false;
struct sctp_paddrparams paddr_params;
bzero(&paddr_params, sizeof(paddr_params));
socklen_t length = sizeof(paddr_params);
if (!getOption(IPPROTO_SCTP,SCTP_PEER_ADDR_PARAMS, &paddr_params, &length)) {
Debug(&plugin,DebugNote,"Failed to get SCTP paddr params! Reason: %s",
strerror(errno));
return false;
}
if (hbInterval)
result.addParam("hb_interval",String(paddr_params.spp_hbinterval));
if (maxRetrans)
result.addParam("max_retrans",String(paddr_params.spp_pathmaxrxt));
if (!hbEnabled)
return true;
if (paddr_params.spp_flags && SPP_HB_ENABLE)
result.addParam("hb_enabled","true");
else
result.addParam("hb_enabled","false");
return true;
}
bool LKSocket::fillSackParams(bool sackDelay, bool sackFreq, NamedList& result)
{
if (!(sackDelay || sackFreq))
return false;
socklen_t length = 0;
#ifdef SCTP_DELAYED_ACK_TIME
#ifdef HAVE_SACK_INFO_STRUCT
struct sctp_sack_info sack_info;
bzero(&sack_info, sizeof(sack_info));
length = sizeof(sack_info);
if(!getOption(IPPROTO_SCTP,SCTP_DELAYED_ACK_TIME, &sack_info, &length)) {
Debug(&plugin,DebugNote,"Failed to get SCTP sack params! Reason: %s",
strerror(errno));
return false;
}
if (sackDelay)
result.addParam("sack_delay",String(sack_info.sack_delay));
if (sackFreq)
result.addParam("sack_freq",String(sack_info.sack_freq));
#elif HAVE_ASSOC_VALUE_STRUCT
struct sctp_assoc_value sassoc_value;
bzero(&sassoc_value, sizeof(sassoc_value));
length = sizeof(sassoc_value);
if (!getOption(IPPROTO_SCTP,SCTP_DELAYED_ACK_TIME, &sassoc_value, length)) {
Debug(&plugin,DebugNote,"Failed to get SCTP sack params! Reason: %s",
strerror(errno));
return false;
}
if (sackDelay)
result.addParam("sack_delay",String(sassoc_value.assoc_value));
if (sackFreq)
Debug(&plugin,DebugConf,"Unable to set sack_freq param! sack_info struct is missing!");
#else // HAVE_SACK_INFO_STRUCT
Debug(&plugin,DebugConf,"SCTP delayed ack time is unavailable no struct present!!");
#endif
#else // SCTP_DELAYED_ACK_TIME
Debug(&plugin,DebugConf,"SCTP delayed ack time is unavailable");
#endif
return true;
}
bool LKSocket::valid() const
{
if (!Socket::valid())
@ -422,7 +578,9 @@ bool LKHandler::received(Message &msg)
LKModule::LKModule()
: Module("lksctp","misc",true),
m_init(false)
m_init(false), m_cfg("lksctp"), m_rtoMax(400),
m_rtoMin(200), m_rtoInitial(400), m_hbInterval(0),m_hbEnabled(true),
m_sackDelay(50), m_sackFreq(0), m_maxRetrans(0)
{
Output("Loading module LKSCTP");
}
@ -434,12 +592,26 @@ LKModule::~LKModule()
void LKModule::initialize()
{
Output("Initialize module LKSCTP");
if (!m_init) {
Output("Initialize module LKSCTP");
m_init = true;
Engine::install(new LKHandler());
setup();
}
setup();
Configuration cfg(Engine::configFile(name()));
cfg.load();
NamedList* sect = cfg.getSection(YSTRING("general"));
if (!sect)
return;
m_rtoMax = sect->getIntValue(YSTRING("rto_max"),400);
m_rtoMin = sect->getIntValue(YSTRING("rto_min"),200);
m_rtoInitial = sect->getIntValue(YSTRING("rto_initial"),400);
m_hbInterval = sect->getIntValue(YSTRING("hb_interval"),0);
m_hbEnabled = sect->getBoolValue(YSTRING("hb_enabled"),true);
m_sackDelay = sect->getIntValue(YSTRING("sack_delay"),50);
m_sackFreq = sect->getIntValue(YSTRING("sack_freq"),0);
m_maxRetrans = sect->getIntValue(YSTRING("max_retrans"),0);
}
void LKModule::statusParams(String& str)

View File

@ -128,6 +128,7 @@ public:
virtual void listen(int maxConn) = 0;
virtual bool sendMSG(const DataBlock& header, const DataBlock& msg, int streamId = 0) = 0;
virtual void setSocket(Socket* s) = 0;
virtual bool getSocketParams(const String& params, NamedList& result) = 0;
virtual void reset() = 0;
void reconnect()
{ m_reconnect = true; }
@ -194,6 +195,7 @@ public:
bool bindSocket();
static SignallingComponent* create(const String& type,const NamedList& params);
virtual bool transmitMSG(const DataBlock& header, const DataBlock& msg, int streamId = 0);
virtual bool getSocketParams(const String& params, NamedList& result);
private:
TReader* m_reader;
Mutex m_readerMutex;
@ -220,6 +222,7 @@ public:
virtual void listen(int maxConn)
{ }
virtual bool sendBuffer(int streamId = 0);
virtual bool getSocketParams(const String& params, NamedList& result);
virtual void reset()
{ m_transport->resetReader(this); }
void connectionDown(bool stop = true);
@ -246,6 +249,7 @@ public:
virtual bool needConnect()
{ return m_transport && m_transport->status() == Transport::Down && !m_transport->listen(); }
virtual bool sendMSG(const DataBlock& header, const DataBlock& msg, int streamId = 0);
virtual bool getSocketParams(const String& params, NamedList& result);
virtual void setSocket(Socket* s);
virtual void listen(int maxConn)
{ m_socket->listen(maxConn); }
@ -611,6 +615,13 @@ void Transport::reconnect(bool force)
m_reader->reconnect();
}
bool Transport::getSocketParams(const String& params, NamedList& result)
{
if (m_reader)
return m_reader->getSocketParams(params,result);
return false;
}
bool Transport::initialize(const NamedList* params)
{
Configuration cfg(Engine::configFile("sigtransport"));
@ -1274,6 +1285,15 @@ void StreamReader::connectionDown(bool stopTh) {
}
}
bool StreamReader::getSocketParams(const String& params, NamedList& result)
{
Lock reconLock(m_sending,SignallingEngine::maxLockWait());
if (!(reconLock.locked() && m_socket))
return false;
m_socket->getParams(params,result);
return true;
}
void StreamReader::stopThread()
{
m_transport->setStatus(Transport::Down);
@ -1496,6 +1516,15 @@ bool MessageReader::readData()
return true;
}
bool MessageReader::getSocketParams(const String& params, NamedList& result)
{
Lock reconLock(m_sending,SignallingEngine::maxLockWait());
if (!(reconLock.locked() && m_socket))
return false;
m_socket->getParams(params,result);
return true;
}
void MessageReader::updateTransportStatus(int status)
{
if (status == Transport::Up)

View File

@ -5667,6 +5667,15 @@ public:
virtual bool setParams(const NamedList& params)
{ return false; }
/**
* Get specific socket parameters.
* @param params Coma separated list of parameters to obtain
* @param result List of parameters to fill
* @return True if operation was successful, false if an error occurred
*/
virtual bool getParams(const String& params, NamedList& result)
{ return false; }
/**
* Set the Type of Service on the IP level of this socket
* @param tos New TOS bits to set