Fixed buffer fragmenting algorithm. RTCP can be disabled by chan.rtp message.

git-svn-id: http://yate.null.ro/svn/yate/trunk@1029 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2006-09-13 18:51:10 +00:00
parent 2b90733b21
commit 880b3c1458
1 changed files with 25 additions and 20 deletions

View File

@ -32,6 +32,7 @@
#define MIN_PORT 16384
#define MAX_PORT 32768
#define BUF_SIZE 240
#define BUF_PREF 160
using namespace TelEngine;
namespace { // anonymous
@ -91,9 +92,9 @@ class YRTPWrapper : public RefObject
friend class YRTPConsumer;
friend class YRTPSession;
public:
YRTPWrapper(const char *localip, CallEndpoint* conn = 0, const char* media = "audio", RTPSession::Direction direction = RTPSession::SendRecv);
YRTPWrapper(const char *localip, CallEndpoint* conn = 0, const char* media = "audio", RTPSession::Direction direction = RTPSession::SendRecv, bool rtcp = true);
~YRTPWrapper();
void setupRTP(const char* localip);
void setupRTP(const char* localip, bool rtcp);
bool startRTP(const char* raddr, unsigned int rport, const Message& msg);
bool sendDTMF(char dtmf, int duration = 0);
void gotDTMF(char tone);
@ -175,8 +176,11 @@ public:
YRTPConsumer(YRTPWrapper* wrap);
~YRTPConsumer();
virtual void Consume(const DataBlock &data, unsigned long tStamp);
inline void setSplitable()
{ m_splitable = (m_format == "alaw") || (m_format = "mulaw"); }
private:
YRTPWrapper* m_wrap;
bool m_splitable;
};
class AttachHandler : public MessageHandler
@ -216,13 +220,13 @@ static ObjList s_calls;
static Mutex s_mutex;
static Mutex s_srcMutex;
YRTPWrapper::YRTPWrapper(const char* localip, CallEndpoint* conn, const char* media, RTPSession::Direction direction)
YRTPWrapper::YRTPWrapper(const char* localip, CallEndpoint* conn, const char* media, RTPSession::Direction direction, bool rtcp)
: m_rtp(0), m_dir(direction), m_conn(conn),
m_source(0), m_consumer(0), m_media(media),
m_bufsize(0), m_port(0)
{
Debug(&splugin,DebugAll,"YRTPWrapper::YRTPWrapper('%s',%p,'%s',%s) [%p]",
localip,conn,media,lookup(direction,dict_yrtp_dir),this);
Debug(&splugin,DebugAll,"YRTPWrapper::YRTPWrapper('%s',%p,'%s',%s,%s) [%p]",
localip,conn,media,lookup(direction,dict_yrtp_dir),String::boolText(rtcp),this);
m_id = "yrtp/";
m_id << (unsigned int)::random();
if (conn)
@ -230,7 +234,7 @@ YRTPWrapper::YRTPWrapper(const char* localip, CallEndpoint* conn, const char* me
m_audio = (m_media == "audio");
s_mutex.lock();
s_calls.append(this);
setupRTP(localip);
setupRTP(localip,rtcp);
s_mutex.unlock();
}
@ -283,7 +287,7 @@ YRTPWrapper* YRTPWrapper::find(const String& id)
return 0;
}
void YRTPWrapper::setupRTP(const char* localip)
void YRTPWrapper::setupRTP(const char* localip, bool rtcp)
{
Debug(&splugin,DebugAll,"YRTPWrapper::setupRTP(\"%s\") [%p]",localip,this);
m_rtp = new YRTPSession(this);
@ -308,7 +312,7 @@ void YRTPWrapper::setupRTP(const char* localip)
for (; attempt; attempt--) {
int lport = (minport + (::random() % (maxport - minport))) & 0xfffe;
addr.port(lport);
if (m_rtp->localAddr(addr)) {
if (m_rtp->localAddr(addr,rtcp)) {
m_port = lport;
Debug(&splugin,DebugAll,"Session %p bound to %s:%u [%p]",m_rtp,localip,m_port,this);
return;
@ -391,6 +395,7 @@ bool YRTPWrapper::startRTP(const char* raddr, unsigned int rport, const Message&
m_conn->setConsumer(0,m_media);
}
m_consumer->m_format = format;
m_consumer->setSplitable();
if (m_conn) {
m_conn->setConsumer(m_consumer,m_media);
m_consumer->deref();
@ -538,7 +543,7 @@ YRTPSource::~YRTPSource()
}
YRTPConsumer::YRTPConsumer(YRTPWrapper *wrap)
: m_wrap(wrap)
: m_wrap(wrap), m_splitable(false)
{
Debug(&splugin,DebugAll,"YRTPConsumer::YRTPConsumer(%p) [%p]",wrap,this);
m_format.clear();
@ -563,24 +568,24 @@ void YRTPConsumer::Consume(const DataBlock &data, unsigned long tStamp)
{
if (!(m_wrap && m_wrap->bufSize() && m_wrap->rtp()))
return;
unsigned long delta = (tStamp && m_timestamp) ? (tStamp - m_timestamp) : 0;
XDebug(&splugin,DebugAll,"YRTPConsumer writing %d bytes, delta=%lu ts=%lu [%p]",
data.length(),delta,tStamp,this);
XDebug(&splugin,DebugAll,"YRTPConsumer writing %d bytes, ts=%lu [%p]",
data.length(),tStamp,this);
unsigned int buf = m_wrap->bufSize();
const char* ptr = (const char*)data.data();
unsigned int len = data.length();
// make it safe to break a long octet buffer
if (len == delta)
delta = 0;
while (len && m_wrap && m_wrap->rtp()) {
unsigned int sz = len;
if (m_wrap->isAudio() && (sz > buf) && !delta) {
DDebug(&splugin,DebugAll,"Creating %u bytes fragment of %u bytes buffer",buf,len);
sz = buf;
if (m_splitable && m_wrap->isAudio() && (sz > buf)) {
// divide evenly a buffer that is multiple of preferred size
if ((buf > BUF_PREF) && ((len % BUF_PREF) == 0))
sz = BUF_PREF;
else
sz = buf;
DDebug(&splugin,DebugAll,"Creating %u bytes fragment of %u bytes buffer",sz,len);
}
m_wrap->rtp()->rtpSendData(false,tStamp,ptr,sz);
// if timestamp increment is not provided we have to guess...
tStamp += delta ? delta : sz;
tStamp += sz;
len -= sz;
ptr += sz;
}
@ -716,7 +721,7 @@ bool RtpHandler::received(Message &msg)
}
msg.setParam("localip",lip);
w = new YRTPWrapper(lip,ch,media,direction);
w = new YRTPWrapper(lip,ch,media,direction,msg.getBoolValue("rtcp",true));
w->setMaster(msg.getValue("id"));
}
else {