Added a limit of how many messages can be queued to an external script.

git-svn-id: http://voip.null.ro/svn/yate@6400 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2020-04-15 15:39:14 +00:00
parent f099430479
commit 1db8917b18
3 changed files with 38 additions and 11 deletions

View File

@ -9,6 +9,9 @@
; priority: int: Priority of the call.execute handler ; priority: int: Priority of the call.execute handler
;priority=100 ;priority=100
; maxqueue: int: How many messages can be queued in a receiver, zero disables limit
;maxqueue=1000
; timeout: int: How many milliseconds to wait for a module to answer ; timeout: int: How many milliseconds to wait for a module to answer
;timeout=10000 ;timeout=10000

View File

@ -204,6 +204,7 @@ disconnected (bool) - Enable or disable sending "chan.disconnected" me
trackparam (string) - Set the message handler tracking name, cannot be made empty<br /> trackparam (string) - Set the message handler tracking name, cannot be made empty<br />
reason (string) - Set the disconnect reason that gets received by the peer channel<br /> reason (string) - Set the disconnect reason that gets received by the peer channel<br />
bufsize (int) - Communication buffer size in octets, initially 8192<br /> bufsize (int) - Communication buffer size in octets, initially 8192<br />
maxqueue (int) - Maximum number of queued messages, zero to disable check<br />
timeout (int) - Timeout in milliseconds for answering to messages<br /> timeout (int) - Timeout in milliseconds for answering to messages<br />
timebomb (bool) - Terminate this module instance if a timeout occured<br /> timebomb (bool) - Terminate this module instance if a timeout occured<br />
settime (bool) - Force current time in generated messages ignoring protocol<br /> settime (bool) - Force current time in generated messages ignoring protocol<br />

View File

@ -50,6 +50,11 @@ namespace { // anonymous
// Maximum length of the incoming line buffer // Maximum length of the incoming line buffer
#define MAX_INCOMING_LINE 65536 #define MAX_INCOMING_LINE 65536
// Default maximum messages queued in a receiver
#define DEF_MAXQUEUE 1000
// Maximum maximum messages queued in a receiver
#define MAX_MAXQUEUE 10000
// Default message timeout in milliseconds // Default message timeout in milliseconds
#define MSG_TIMEOUT 10000 #define MSG_TIMEOUT 10000
@ -63,6 +68,7 @@ static Mutex s_mutex(true,"ExtModule");
static Mutex s_uses(false,"ExtModUse"); static Mutex s_uses(false,"ExtModUse");
static int s_waitFlush = WAIT_FLUSH; static int s_waitFlush = WAIT_FLUSH;
static int s_timeout = MSG_TIMEOUT; static int s_timeout = MSG_TIMEOUT;
static int s_maxQueue = DEF_MAXQUEUE;
static bool s_settime = false; static bool s_settime = false;
static bool s_timebomb = false; static bool s_timebomb = false;
static bool s_pluginSafe = true; static bool s_pluginSafe = true;
@ -263,6 +269,7 @@ private:
bool m_dead; bool m_dead;
bool m_quit; bool m_quit;
int m_use; int m_use;
int m_qLength;
pid_t m_pid; pid_t m_pid;
Stream* m_in; Stream* m_in;
Stream* m_out; Stream* m_out;
@ -275,6 +282,7 @@ private:
bool m_setdata; bool m_setdata;
bool m_settime; bool m_settime;
bool m_writing; bool m_writing;
int m_maxQueue;
int m_timeout; int m_timeout;
bool m_timebomb; bool m_timebomb;
bool m_restart; bool m_restart;
@ -788,11 +796,11 @@ bool ExtModReceiver::unuse()
ExtModReceiver::ExtModReceiver(const char* script, const char* args, File* ain, File* aout, ExtModChan* chan) ExtModReceiver::ExtModReceiver(const char* script, const char* args, File* ain, File* aout, ExtModChan* chan)
: Mutex(true,"ExtModReceiver"), : Mutex(true,"ExtModReceiver"),
m_role(RoleUnknown), m_dead(false), m_quit(false), m_use(1), m_pid(-1), m_role(RoleUnknown), m_dead(false), m_quit(false), m_use(1), m_qLength(0), m_pid(-1),
m_in(0), m_out(0), m_ain(ain), m_aout(aout), m_in(0), m_out(0), m_ain(ain), m_aout(aout),
m_chan(chan), m_watcher(0), m_chan(chan), m_watcher(0),
m_selfWatch(false), m_reenter(false), m_setdata(true), m_settime(s_settime), m_writing(false), m_selfWatch(false), m_reenter(false), m_setdata(true), m_settime(s_settime), m_writing(false),
m_timeout(s_timeout), m_timebomb(s_timebomb), m_restart(false), m_scripted(false), m_maxQueue(s_maxQueue), m_timeout(s_timeout), m_timebomb(s_timebomb), m_restart(false), m_scripted(false),
m_buffer(0,DEF_INCOMING_LINE), m_script(script), m_args(args), m_trackName(s_trackName) m_buffer(0,DEF_INCOMING_LINE), m_script(script), m_args(args), m_trackName(s_trackName)
{ {
Debug(DebugAll,"ExtModReceiver::ExtModReceiver(\"%s\",\"%s\") [%p]",script,args,this); Debug(DebugAll,"ExtModReceiver::ExtModReceiver(\"%s\",\"%s\") [%p]",script,args,this);
@ -806,11 +814,11 @@ ExtModReceiver::ExtModReceiver(const char* script, const char* args, File* ain,
ExtModReceiver::ExtModReceiver(const char* name, Stream* io, ExtModChan* chan, int role, const char* conn) ExtModReceiver::ExtModReceiver(const char* name, Stream* io, ExtModChan* chan, int role, const char* conn)
: Mutex(true,"ExtModReceiver"), : Mutex(true,"ExtModReceiver"),
m_role(role), m_dead(false), m_quit(false), m_use(1), m_pid(-1), m_role(role), m_dead(false), m_quit(false), m_use(1), m_qLength(0), m_pid(-1),
m_in(io), m_out(io), m_ain(0), m_aout(0), m_in(io), m_out(io), m_ain(0), m_aout(0),
m_chan(chan), m_watcher(0), m_chan(chan), m_watcher(0),
m_selfWatch(false), m_reenter(false), m_setdata(true), m_settime(s_settime), m_writing(false), m_selfWatch(false), m_reenter(false), m_setdata(true), m_settime(s_settime), m_writing(false),
m_timeout(s_timeout), m_timebomb(s_timebomb), m_restart(false), m_scripted(false), m_maxQueue(s_maxQueue), m_timeout(s_timeout), m_timebomb(s_timebomb), m_restart(false), m_scripted(false),
m_buffer(0,DEF_INCOMING_LINE), m_script(name), m_args(conn), m_trackName(s_trackName) m_buffer(0,DEF_INCOMING_LINE), m_script(name), m_args(conn), m_trackName(s_trackName)
{ {
Debug(DebugAll,"ExtModReceiver::ExtModReceiver(\"%s\",%p,%p) [%p]",name,io,chan,this); Debug(DebugAll,"ExtModReceiver::ExtModReceiver(\"%s\",%p,%p) [%p]",name,io,chan,this);
@ -913,9 +921,9 @@ bool ExtModReceiver::flush()
} }
bool flushed = false; bool flushed = false;
if (m_waiting.get()) { if (m_waiting.get()) {
Debug(DebugInfo,"ExtModReceiver releasing %u pending messages [%p]", Debug(DebugInfo,"ExtModReceiver releasing %u pending messages [%p]",m_qLength,this);
m_waiting.count(),this);
m_waiting.clear(); m_waiting.clear();
m_qLength = 0;
needWait = flushed = true; needWait = flushed = true;
} }
unlock(); unlock();
@ -942,8 +950,10 @@ void ExtModReceiver::die(bool clearChan)
if (m_dead) if (m_dead)
return; return;
Lock mylock(this); Lock mylock(this);
if (m_dead) if (m_dead) {
DDebug(DebugInfo,"ExtModReceiver::die() pid=%d is already dead [%p]",m_pid,this);
return; return;
}
m_dead = true; m_dead = true;
m_quit = true; m_quit = true;
use(); use();
@ -1004,6 +1014,10 @@ bool ExtModReceiver::received(Message &msg, int id)
if (m && m->belongsTo(this)) if (m && m->belongsTo(this))
ok = false; ok = false;
} }
if (ok && m_maxQueue && (m_qLength >= m_maxQueue)) {
Debug(DebugWarn,"ExtMod already having %u queued messages [%p]",m_qLength,this);
ok = false;
}
if (!ok) { if (!ok) {
unlock(); unlock();
return false; return false;
@ -1014,8 +1028,9 @@ bool ExtModReceiver::received(Message &msg, int id)
u_int64_t tout = (m_timeout > 0) ? Time::now() + 1000 * m_timeout : 0; u_int64_t tout = (m_timeout > 0) ? Time::now() + 1000 * m_timeout : 0;
MsgHolder h(msg); MsgHolder h(msg);
if (outputLine(msg.encode(h.m_id))) { if (outputLine(msg.encode(h.m_id))) {
m_qLength++;
m_waiting.append(&h)->setDelete(false); m_waiting.append(&h)->setDelete(false);
DDebug(DebugAll,"ExtMod queued message %p '%s' [%p]",&msg,msg.c_str(),this); DDebug(DebugAll,"ExtMod queued message #%u %p '%s' [%p]",m_qLength,&msg,msg.c_str(),this);
} }
else { else {
Debug(DebugWarn,"ExtMod could not queue message %p '%s' [%p]",&msg,msg.c_str(),this); Debug(DebugWarn,"ExtMod could not queue message %p '%s' [%p]",&msg,msg.c_str(),this);
@ -1033,7 +1048,8 @@ bool ExtModReceiver::received(Message &msg, int id)
if (ok && tout && (Time::now() > tout)) { if (ok && tout && (Time::now() > tout)) {
Alarm("extmodule","performance",DebugWarn,"Message %p '%s' did not return in %d msec [%p]", Alarm("extmodule","performance",DebugWarn,"Message %p '%s' did not return in %d msec [%p]",
&msg,msg.c_str(),m_timeout,this); &msg,msg.c_str(),m_timeout,this);
m_waiting.remove(&h,false); if (m_waiting.remove(&h,false) && (m_qLength > 0))
m_qLength--;
ok = false; ok = false;
fail = true; fail = true;
} }
@ -1178,7 +1194,7 @@ void ExtModReceiver::run()
int bufspace = m_buffer.length() - posinbuf - 1; int bufspace = m_buffer.length() - posinbuf - 1;
int readsize = (m_in && bufspace) ? m_in->readData(buffer+posinbuf,bufspace) : 0; int readsize = (m_in && bufspace) ? m_in->readData(buffer+posinbuf,bufspace) : 0;
unlock(); unlock();
if (unuse()) if (unuse() || m_dead)
return; return;
if (!bufspace) { if (!bufspace) {
Debug("ExtModule",DebugWarn,"Overflow reading in buffer of length %u, closing [%p]", Debug("ExtModule",DebugWarn,"Overflow reading in buffer of length %u, closing [%p]",
@ -1401,7 +1417,8 @@ bool ExtModReceiver::processLine(const char* line)
m_chan->waiting(true); m_chan->waiting(true);
} }
msg->unlock(); msg->unlock();
p->remove(false); if (p->remove(false) && (m_qLength > 0))
m_qLength--;
return false; return false;
} }
} }
@ -1530,6 +1547,11 @@ bool ExtModReceiver::processLine(const char* line)
val = m_timebomb; val = m_timebomb;
ok = true; ok = true;
} }
else if (id == "maxqueue") {
m_maxQueue = val.toInteger(m_maxQueue,0,0,MAX_MAXQUEUE);
val = m_maxQueue;
ok = true;
}
else if (id == "bufsize") { else if (id == "bufsize") {
unsigned int len = val.toInteger(m_buffer.length(),0, unsigned int len = val.toInteger(m_buffer.length(),0,
MIN_INCOMING_LINE,MAX_INCOMING_LINE); MIN_INCOMING_LINE,MAX_INCOMING_LINE);
@ -2043,6 +2065,7 @@ void ExtModulePlugin::initialize()
Output("Initializing module ExtModule"); Output("Initializing module ExtModule");
s_cfg = Engine::configFile("extmodule"); s_cfg = Engine::configFile("extmodule");
s_cfg.load(); s_cfg.load();
s_maxQueue = s_cfg.getIntValue("general","maxqueue",DEF_MAXQUEUE,0,MAX_MAXQUEUE);
s_timeout = s_cfg.getIntValue("general","timeout",MSG_TIMEOUT); s_timeout = s_cfg.getIntValue("general","timeout",MSG_TIMEOUT);
s_timebomb = s_cfg.getBoolValue("general","timebomb",false); s_timebomb = s_cfg.getBoolValue("general","timebomb",false);
s_settime = s_cfg.getBoolValue("general","settime",false); s_settime = s_cfg.getBoolValue("general","settime",false);