Allow a source's wait to drop interval to be set from message.

git-svn-id: http://yate.null.ro/svn/yate/trunk@5495 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
marian 2013-04-26 11:47:28 +00:00
parent 6afb29da41
commit a52c828885
1 changed files with 42 additions and 27 deletions

View File

@ -48,9 +48,9 @@ class FileDriver; // The driver
class FileHolder
{
public:
inline FileHolder(const String& name)
inline FileHolder(const String& name, const String& dropChan)
: m_fileName(name), m_fileTime(0), m_fileSize(-1), m_transferred(0),
m_params("")
m_params(""), m_dropChan(dropChan), m_waitOnDropMs(0)
{}
// Get file name
inline const String& fileName() const
@ -71,6 +71,18 @@ public:
}
return m_fileTime;
}
// Set drop chan id
inline void setDropChan(const String& id)
{ m_dropChan = id; }
// Build drop message. Reset drop chan
inline Message* dropMessage() {
if (!m_dropChan)
return 0;
Message* m = new Message("call.drop");
m->addParam("id",m_dropChan);
m_dropChan = "";
return m;
}
// Add MD5 and/or file info parameters
void addFileInfo(NamedList& params, bool md5, bool extra);
// Add saved params to another list
@ -83,6 +95,9 @@ protected:
int64_t m_transferred; // Transferred bytes
String m_md5HexDigest; // MD5 digest of the file
NamedList m_params; // Parameters to copy in notifications
String m_dropChan; // Channel to drop on termination
unsigned int m_waitOnDropMs; // Time to wait to drop channel
};
// A file data source
@ -94,9 +109,6 @@ public:
// Create the data source, and init it
FileSource(const String& file, NamedList* params = 0, const char* chan = 0,
const char* format = 0);
// Set drop chan id
inline void setDropChan(const String& id)
{ m_dropChan = id; }
// Check if this data source is connected
inline bool connected() {
Lock mylock(this);
@ -112,7 +124,6 @@ private:
virtual void destroyed();
String m_notify; // Target id to notify
String m_dropChan; // Channel to drop on termination
bool m_notifyProgress; // Notify file transfer progress
bool m_notifyPercent; // Notify percent changes only
int m_percent; // Notify current percent
@ -133,9 +144,6 @@ public:
// Check if file should be overwritten
inline bool overWrite() const
{ return m_overWrite; }
// Set drop chan id
inline void setDropChan(const String& id)
{ m_dropChan = id; }
// Check if this data consumer is connected
inline bool connected() const
{ return 0 != getConnSource(); }
@ -155,7 +163,6 @@ protected:
private:
String m_notify; // Target id to notify
String m_tmpFileName;
String m_dropChan; // Channel to drop on termination
bool m_notifyProgress; // Notify file transfer progress
bool m_notifyPercent; // Notify percent changes only
int m_percent; // Notify current percent
@ -358,8 +365,7 @@ void FileHolder::addFileInfo(NamedList& params, bool md5, bool extra)
FileSource::FileSource(const String& file, NamedList* params, const char* chan,
const char* format)
: DataSource(!null(format) ? format : "data"),
FileHolder(file),
m_dropChan(chan),
FileHolder(file,chan),
m_notifyProgress(s_notifyProgress),
m_notifyPercent(s_notifyPercent),
m_percent(0),
@ -371,8 +377,11 @@ FileSource::FileSource(const String& file, NamedList* params, const char* chan,
m_notifyProgress = params->getBoolValue("notify_progress",m_notifyProgress);
m_buflen = getIntValue(*params,"send_chunk_size",s_sendChunk,SEND_CHUNK_MIN,true);
m_sleepMs = getIntValue(*params,"send_interval",s_sendIntervalMs,SEND_SLEEP_MIN,false);
m_waitOnDropMs = params->getIntValue("wait_on_drop",0,0);
__plugin.copyParams(m_params,*params);
}
if (!m_sleepMs)
m_sleepMs = SEND_SLEEP_DEF;
Debug(&__plugin,DebugAll,"FileSource('%s') [%p]",file.c_str(),this);
}
@ -501,10 +510,7 @@ void FileSource::run()
break;
}
tStamp += m_sleepMs;
if (m_sleepMs)
Thread::msleep(m_sleepMs,false);
else
Thread::yield(false);
Thread::msleep(m_sleepMs,false);
}
break;
}
@ -525,14 +531,25 @@ void FileSource::run()
FileDriver::notifyStatus(true,m_notify,"terminated",m_fileName,
m_transferred,m_fileSize,error,&m_params);
if (m_dropChan) {
// Wait a while to give some time to the remote party to receive the data
unsigned int n = !error ? s_srcLingerIntervals : 0;
Message* m = dropMessage();
if (m) {
// Wait for a while to give some time to the remote party to receive the data
unsigned int n = 0;
if (!error) {
if (m_waitOnDropMs) {
n = m_waitOnDropMs / m_sleepMs;
if (!n)
n = 1;
}
else
n = s_srcLingerIntervals;
}
XDebug(&__plugin,DebugAll,
"FileSource(%s) dropping chan '%s' waiting %u intervals of %ums [%p]",
m_fileName.c_str(),m->getValue("id"),n,m_sleepMs,this);
for (; n && !Thread::check(false); n--)
Thread::msleep(m_sleepMs ? m_sleepMs : SEND_SLEEP_DEF,false);
Thread::msleep(m_sleepMs,false);
// Drop channel
Message* m = new Message("call.drop");
m->addParam("id",m_dropChan);
if (error) {
if (error == "cancelled")
m->addParam("reason","cancelled");
@ -575,8 +592,7 @@ void FileSource::destroyed()
FileConsumer::FileConsumer(const String& file, NamedList* params, const char* chan,
const char* format)
: DataConsumer(!null(format) ? format : "data"),
FileHolder(file),
m_dropChan(chan),
FileHolder(file,chan),
m_notifyProgress(s_notifyProgress),
m_notifyPercent(s_notifyPercent),
m_percent(0),
@ -742,9 +758,8 @@ void FileConsumer::terminate(const char* error)
// Notify and terminate drop the channel
FileDriver::notifyStatus(false,m_notify,"terminated",m_fileName,
m_transferred,m_fileSize,err,&m_params);
if (m_dropChan) {
Message* m = new Message("call.drop");
m->addParam("id",m_dropChan);
Message* m = dropMessage();
if (m) {
if (err) {
m->addParam("reason","failure");
m->addParam("error",err);