Added a limit of how many messages can be queued to an external script.

git-svn-id: http://yate.null.ro/svn/yate/trunk@6400 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2020-04-15 15:39:14 +00:00
parent 7ac83d9218
commit 03985c7dbb
3 changed files with 38 additions and 11 deletions

View File

@ -9,6 +9,9 @@
; priority: int: Priority of the call.execute handler
;priority=100
; maxqueue: int: How many messages can be queued in a receiver, zero disables limit
;maxqueue=1000
; timeout: int: How many milliseconds to wait for a module to answer
;timeout=10000

View File

@ -204,6 +204,7 @@ disconnected (bool) - Enable or disable sending "chan.disconnected" me
trackparam (string) - Set the message handler tracking name, cannot be made empty<br />
reason (string) - Set the disconnect reason that gets received by the peer channel<br />
bufsize (int) - Communication buffer size in octets, initially 8192<br />
maxqueue (int) - Maximum number of queued messages, zero to disable check<br />
timeout (int) - Timeout in milliseconds for answering to messages<br />
timebomb (bool) - Terminate this module instance if a timeout occured<br />
settime (bool) - Force current time in generated messages ignoring protocol<br />

View File

@ -50,6 +50,11 @@ namespace { // anonymous
// Maximum length of the incoming line buffer
#define MAX_INCOMING_LINE 65536
// Default maximum messages queued in a receiver
#define DEF_MAXQUEUE 1000
// Maximum maximum messages queued in a receiver
#define MAX_MAXQUEUE 10000
// Default message timeout in milliseconds
#define MSG_TIMEOUT 10000
@ -63,6 +68,7 @@ static Mutex s_mutex(true,"ExtModule");
static Mutex s_uses(false,"ExtModUse");
static int s_waitFlush = WAIT_FLUSH;
static int s_timeout = MSG_TIMEOUT;
static int s_maxQueue = DEF_MAXQUEUE;
static bool s_settime = false;
static bool s_timebomb = false;
static bool s_pluginSafe = true;
@ -263,6 +269,7 @@ private:
bool m_dead;
bool m_quit;
int m_use;
int m_qLength;
pid_t m_pid;
Stream* m_in;
Stream* m_out;
@ -275,6 +282,7 @@ private:
bool m_setdata;
bool m_settime;
bool m_writing;
int m_maxQueue;
int m_timeout;
bool m_timebomb;
bool m_restart;
@ -788,11 +796,11 @@ bool ExtModReceiver::unuse()
ExtModReceiver::ExtModReceiver(const char* script, const char* args, File* ain, File* aout, ExtModChan* chan)
: Mutex(true,"ExtModReceiver"),
m_role(RoleUnknown), m_dead(false), m_quit(false), m_use(1), m_pid(-1),
m_role(RoleUnknown), m_dead(false), m_quit(false), m_use(1), m_qLength(0), m_pid(-1),
m_in(0), m_out(0), m_ain(ain), m_aout(aout),
m_chan(chan), m_watcher(0),
m_selfWatch(false), m_reenter(false), m_setdata(true), m_settime(s_settime), m_writing(false),
m_timeout(s_timeout), m_timebomb(s_timebomb), m_restart(false), m_scripted(false),
m_maxQueue(s_maxQueue), m_timeout(s_timeout), m_timebomb(s_timebomb), m_restart(false), m_scripted(false),
m_buffer(0,DEF_INCOMING_LINE), m_script(script), m_args(args), m_trackName(s_trackName)
{
Debug(DebugAll,"ExtModReceiver::ExtModReceiver(\"%s\",\"%s\") [%p]",script,args,this);
@ -806,11 +814,11 @@ ExtModReceiver::ExtModReceiver(const char* script, const char* args, File* ain,
ExtModReceiver::ExtModReceiver(const char* name, Stream* io, ExtModChan* chan, int role, const char* conn)
: Mutex(true,"ExtModReceiver"),
m_role(role), m_dead(false), m_quit(false), m_use(1), m_pid(-1),
m_role(role), m_dead(false), m_quit(false), m_use(1), m_qLength(0), m_pid(-1),
m_in(io), m_out(io), m_ain(0), m_aout(0),
m_chan(chan), m_watcher(0),
m_selfWatch(false), m_reenter(false), m_setdata(true), m_settime(s_settime), m_writing(false),
m_timeout(s_timeout), m_timebomb(s_timebomb), m_restart(false), m_scripted(false),
m_maxQueue(s_maxQueue), m_timeout(s_timeout), m_timebomb(s_timebomb), m_restart(false), m_scripted(false),
m_buffer(0,DEF_INCOMING_LINE), m_script(name), m_args(conn), m_trackName(s_trackName)
{
Debug(DebugAll,"ExtModReceiver::ExtModReceiver(\"%s\",%p,%p) [%p]",name,io,chan,this);
@ -913,9 +921,9 @@ bool ExtModReceiver::flush()
}
bool flushed = false;
if (m_waiting.get()) {
Debug(DebugInfo,"ExtModReceiver releasing %u pending messages [%p]",
m_waiting.count(),this);
Debug(DebugInfo,"ExtModReceiver releasing %u pending messages [%p]",m_qLength,this);
m_waiting.clear();
m_qLength = 0;
needWait = flushed = true;
}
unlock();
@ -942,8 +950,10 @@ void ExtModReceiver::die(bool clearChan)
if (m_dead)
return;
Lock mylock(this);
if (m_dead)
if (m_dead) {
DDebug(DebugInfo,"ExtModReceiver::die() pid=%d is already dead [%p]",m_pid,this);
return;
}
m_dead = true;
m_quit = true;
use();
@ -1004,6 +1014,10 @@ bool ExtModReceiver::received(Message &msg, int id)
if (m && m->belongsTo(this))
ok = false;
}
if (ok && m_maxQueue && (m_qLength >= m_maxQueue)) {
Debug(DebugWarn,"ExtMod already having %u queued messages [%p]",m_qLength,this);
ok = false;
}
if (!ok) {
unlock();
return false;
@ -1014,8 +1028,9 @@ bool ExtModReceiver::received(Message &msg, int id)
u_int64_t tout = (m_timeout > 0) ? Time::now() + 1000 * m_timeout : 0;
MsgHolder h(msg);
if (outputLine(msg.encode(h.m_id))) {
m_qLength++;
m_waiting.append(&h)->setDelete(false);
DDebug(DebugAll,"ExtMod queued message %p '%s' [%p]",&msg,msg.c_str(),this);
DDebug(DebugAll,"ExtMod queued message #%u %p '%s' [%p]",m_qLength,&msg,msg.c_str(),this);
}
else {
Debug(DebugWarn,"ExtMod could not queue message %p '%s' [%p]",&msg,msg.c_str(),this);
@ -1033,7 +1048,8 @@ bool ExtModReceiver::received(Message &msg, int id)
if (ok && tout && (Time::now() > tout)) {
Alarm("extmodule","performance",DebugWarn,"Message %p '%s' did not return in %d msec [%p]",
&msg,msg.c_str(),m_timeout,this);
m_waiting.remove(&h,false);
if (m_waiting.remove(&h,false) && (m_qLength > 0))
m_qLength--;
ok = false;
fail = true;
}
@ -1178,7 +1194,7 @@ void ExtModReceiver::run()
int bufspace = m_buffer.length() - posinbuf - 1;
int readsize = (m_in && bufspace) ? m_in->readData(buffer+posinbuf,bufspace) : 0;
unlock();
if (unuse())
if (unuse() || m_dead)
return;
if (!bufspace) {
Debug("ExtModule",DebugWarn,"Overflow reading in buffer of length %u, closing [%p]",
@ -1401,7 +1417,8 @@ bool ExtModReceiver::processLine(const char* line)
m_chan->waiting(true);
}
msg->unlock();
p->remove(false);
if (p->remove(false) && (m_qLength > 0))
m_qLength--;
return false;
}
}
@ -1530,6 +1547,11 @@ bool ExtModReceiver::processLine(const char* line)
val = m_timebomb;
ok = true;
}
else if (id == "maxqueue") {
m_maxQueue = val.toInteger(m_maxQueue,0,0,MAX_MAXQUEUE);
val = m_maxQueue;
ok = true;
}
else if (id == "bufsize") {
unsigned int len = val.toInteger(m_buffer.length(),0,
MIN_INCOMING_LINE,MAX_INCOMING_LINE);
@ -2043,6 +2065,7 @@ void ExtModulePlugin::initialize()
Output("Initializing module ExtModule");
s_cfg = Engine::configFile("extmodule");
s_cfg.load();
s_maxQueue = s_cfg.getIntValue("general","maxqueue",DEF_MAXQUEUE,0,MAX_MAXQUEUE);
s_timeout = s_cfg.getIntValue("general","timeout",MSG_TIMEOUT);
s_timebomb = s_cfg.getBoolValue("general","timebomb",false);
s_settime = s_cfg.getBoolValue("general","settime",false);