Fixed RTP dejitter code, use it automatically in client mode.

git-svn-id: http://yate.null.ro/svn/yate/trunk@4961 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2012-03-13 09:43:47 +00:00
parent 6d3767365f
commit fa6153dd82
6 changed files with 177 additions and 114 deletions

View File

@ -36,6 +36,14 @@
; drillhole: bool: Attempt to drill a hole through a firewall or NAT
;drillhole=disable in server mode, enable in client mode
; minjitter: int: Amount to attempt to keep in the dejitter buffer in msec
; Valid values 5 to maxjitter-30, negative disables dejitter buffer
;minjitter=50
; maxjitter: int: Maximum dejitter buffer size in msec
; Valid values 50 to 1000, 0 disables dejitter buffer
;maxjitter=120 in client mode, 0 in server mode
; thread: keyword: Default priority of the data service threads
; Can be one of: lowest, low, normal, high, highest
; It is a bad idea to set a low priority for anything but testing

View File

@ -25,112 +25,132 @@
using namespace TelEngine;
namespace { // anonymous
class RTPDelayedData : public DataBlock
{
public:
inline RTPDelayedData(u_int64_t when, bool mark, unsigned int tstamp, const void* data, int len)
: DataBlock(const_cast<void*>(data),len), m_scheduled(when), m_marker(mark), m_timestamp(tstamp)
inline RTPDelayedData(u_int64_t when, bool mark, int payload,
unsigned int tstamp, const void* data, int len)
: DataBlock(const_cast<void*>(data),len), m_scheduled(when),
m_marker(mark), m_payload(payload), m_timestamp(tstamp)
{ }
inline u_int64_t scheduled() const
{ return m_scheduled; }
inline bool marker() const
{ return m_marker; }
int payload() const
{ return m_payload; }
inline unsigned int timestamp() const
{ return m_timestamp; }
inline void schedule(u_int64_t when)
{ m_scheduled = when; }
private:
u_int64_t m_scheduled;
bool m_marker;
int m_payload;
unsigned int m_timestamp;
};
}; // anonymous namespace
RTPDejitter::RTPDejitter(RTPReceiver* receiver, unsigned int mindelay, unsigned int maxdelay)
: m_receiver(receiver), m_mindelay(mindelay), m_maxdelay(maxdelay),
m_headStamp(0), m_tailStamp(0), m_headTime(0), m_tailTime(0)
: m_receiver(receiver), m_minDelay(mindelay), m_maxDelay(maxdelay),
m_headStamp(0), m_tailStamp(0), m_headTime(0), m_sampRate(125000), m_fastRate(10)
{
if (m_maxdelay > 2000000)
m_maxdelay = 2000000;
if (m_maxdelay < 50000)
m_maxdelay = 50000;
if (m_mindelay < 5000)
m_mindelay = 5000;
if (m_mindelay > m_maxdelay - 20000)
m_mindelay = m_maxdelay - 20000;
if (m_maxDelay > 1000000)
m_maxDelay = 1000000;
if (m_maxDelay < 50000)
m_maxDelay = 50000;
if (m_minDelay < 5000)
m_minDelay = 5000;
if (m_minDelay > m_maxDelay - 30000)
m_minDelay = m_maxDelay - 30000;
}
RTPDejitter::~RTPDejitter()
{
DDebug(DebugMild,"Dejitter destroyed with %u packets [%p]",m_packets.count(),this);
DDebug(DebugInfo,"Dejitter destroyed with %u packets [%p]",m_packets.count(),this);
}
bool RTPDejitter::rtpRecvData(bool marker, unsigned int timestamp, const void* data, int len)
void RTPDejitter::clear()
{
m_packets.clear();
m_headStamp = m_tailStamp = 0;
}
bool RTPDejitter::rtpRecv(bool marker, int payload, unsigned int timestamp, const void* data, int len)
{
u_int64_t when = 0;
bool insert = false;
if (m_headStamp && (m_tailStamp != m_headStamp)) {
// at least one packet got out of the queue and another is waiting
if (m_headStamp) {
// at least one packet got out of the queue
int dTs = timestamp - m_headStamp;
if (dTs < 0) {
DDebug(DebugMild,"Dejitter got TS %u while last delivered was %u [%p]",timestamp,m_headStamp,this);
if (dTs == 0)
return true;
else if (dTs < 0) {
DDebug(DebugNote,"Dejitter dropping TS %u, last delivered was %u [%p]",
timestamp,m_headStamp,this);
return false;
}
u_int64_t bufTime = m_tailTime - m_headTime;
int bufStamp = m_tailStamp - m_headStamp;
if (bufStamp <= 0)
Debug(DebugWarn,"Oops! %d [%p]",bufStamp,this);
// interpolate or extrapolate the delivery time for the packet
// rounding down is ok - the buffer will slowly shrink as expected
when = dTs * bufTime / bufStamp;
DDebug(DebugMild,"Dejitter when=" FMT64U " dTs=%d bufTime=" FMT64U " bufSTamp=%d [%p]",
when,dTs,bufTime,bufStamp,this);
when += m_headTime;
if (dTs > bufStamp) {
bufTime = when - m_headTime;
if (bufTime > m_maxdelay) {
// buffer has lagged behind so we must drop some old packets
// and also reschedule the others
DDebug(DebugMild,"Dejitter grew to " FMT64U " [%p]",bufTime,this);
// when = m_headTime -
u_int64_t now = Time::now();
int64_t rate = 1000 * (now - m_headTime) / dTs;
if (rate > 0) {
if (m_sampRate) {
if (m_fastRate) {
m_fastRate--;
rate = (7 * m_sampRate + rate) >> 3;
}
else
rate = (31 * m_sampRate + rate) >> 5;
}
if (rate > 150000)
rate = 150000; // 6.67 kHz
else if (rate < 20000)
rate = 20000; // 50 kHz
m_sampRate = rate;
XDebug(DebugAll,"Time per sample " FMT64, rate);
}
else
// timestamp falls inside buffer so we must insert the packet
// between the already scheduled ones
insert = true;
}
else {
rate = m_sampRate;
if (rate > 0)
when = m_headTime + (dTs * rate / 1000) + m_minDelay;
else
when = now + m_minDelay;
if (m_tailStamp) {
int dTs = timestamp - m_tailStamp;
if (dTs < 0) {
// until we get some statistics don't attempt to reorder packets
DDebug(DebugMild,"Dejitter got TS %u while last queued was %u [%p]",timestamp,m_tailStamp,this);
if (timestamp == m_tailStamp)
return true;
if (((int)(timestamp - m_tailStamp)) < 0)
insert = true;
else if (when > now + m_maxDelay) {
DDebug(DebugNote,"Packet with TS %u falls after max buffer [%p]",timestamp,this);
return false;
}
}
}
else {
if (m_tailStamp && ((int)(timestamp - m_tailStamp)) < 0) {
// until we get some statistics don't attempt to reorder packets
DDebug(DebugNote,"Dejitter got TS %u while last queued was %u [%p]",timestamp,m_tailStamp,this);
return false;
}
// we got no packets out yet so use a fixed interval
when = Time::now() + m_mindelay;
when = Time::now() + m_minDelay;
}
if (when > m_tailTime) {
// remember the latest in the queue
m_tailStamp = timestamp;
m_tailTime = when;
}
RTPDelayedData* packet = new RTPDelayedData(when,marker,timestamp,data,len);
if (insert) {
for (ObjList* l = m_packets.skipNull();l;l = l->skipNext()) {
for (ObjList* l = m_packets.skipNull(); l; l = l->skipNext()) {
RTPDelayedData* pkt = static_cast<RTPDelayedData*>(l->get());
if (pkt->scheduled() > when) {
DDebug(DebugMild,"Dejitter inserting packet %p before %p [%p]",packet,pkt,this);
l->insert(packet);
if (pkt->timestamp() == timestamp)
return true;
if (pkt->timestamp() > timestamp && pkt->scheduled() > when) {
l->insert(new RTPDelayedData(when,marker,payload,timestamp,data,len));
return true;
}
}
}
m_packets.append(packet);
m_tailStamp = timestamp;
m_packets.append(new RTPDelayedData(when,marker,payload,timestamp,data,len));
return true;
}
@ -138,18 +158,33 @@ void RTPDejitter::timerTick(const Time& when)
{
RTPDelayedData* packet = static_cast<RTPDelayedData*>(m_packets.get());
if (!packet) {
// queue is empty - reset timestamps
m_headStamp = m_tailStamp = 0;
m_tailStamp = 0;
if (m_headStamp && (m_headTime + m_maxDelay < when))
m_headStamp = 0;
return;
}
if (packet->scheduled() <= when) {
// remember the last delivered
m_headStamp = packet->timestamp();
m_headTime = when;
if (m_receiver)
m_receiver->rtpRecvData(packet->marker(),packet->timestamp(),packet->data(),packet->length());
m_packets.remove(packet);
if (packet->scheduled() > when)
return;
m_packets.remove(packet,false);
// remember the last delivered
m_headStamp = packet->timestamp();
m_headTime = packet->scheduled();
if (m_receiver)
m_receiver->rtpRecv(packet->marker(),packet->payload(),
packet->timestamp(),packet->data(),packet->length());
TelEngine::destruct(packet);
unsigned int count = 0;
while ((packet = static_cast<RTPDelayedData*>(m_packets.get()))) {
long delayed = when - packet->scheduled();
if (delayed <= 0 || delayed <= m_minDelay)
break;
// we are too delayed - probably rtpRecv() took too long to complete...
m_packets.remove(packet,true);
count++;
}
if (count)
Debug((count > 1) ? DebugMild : DebugNote,
"Dropped %u delayed packet%s from buffer [%p]",count,((count > 1) ? "s" : ""),this);
}
/* vi: set ts=8 sw=4 sts=4 noet: */

View File

@ -244,9 +244,9 @@ bool RTPSecure::deriveKey(Cipher& cipher, DataBlock& key, unsigned int len, unsi
bool RTPSecure::rtpDecipher(unsigned char* data, int len, const void* secData, u_int32_t ssrc, u_int64_t seq)
{
if (!m_rtpEncrypted)
if (!(m_rtpEncrypted && data))
return true;
if (!(len && data && m_rtpCipher))
if (!(len && m_rtpCipher))
return false;
DataBlock iv(m_cipherSalt);
int i;

View File

@ -164,6 +164,8 @@ void RTPReceiver::rtpData(const void* data, int len)
}
if (len < 0)
return;
if (!len)
pc = 0;
// grab some data at the first packet received or resync
if (m_ssrcInit) {
@ -173,6 +175,8 @@ void RTPReceiver::rtpData(const void* data, int len)
m_seq = seq-1;
m_seqCount = 0;
m_warn = true;
if (m_dejitter)
m_dejitter->clear();
}
if (ss != m_ssrc) {
@ -191,19 +195,35 @@ void RTPReceiver::rtpData(const void* data, int len)
m_seq = seq;
m_ts = ts - m_tsLast;
m_seqCount = 0;
if (m_dejitter)
m_dejitter->clear();
// drop this packet, next packet will come in correctly
return;
}
// substraction with overflow
u_int32_t rollover = m_rollover;
// compare unsigned to detect rollovers
if (seq < m_seq)
rollover++;
u_int64_t seq48 = rollover;
seq48 = (seq48 << 16) | seq;
// if some security data is present authenticate the packet now
if (secPtr && !rtpCheckIntegrity((const unsigned char*)data,len + padding + 12,secPtr + m_mkiLen,ss,seq48))
return;
// substraction with overflow to compute sequence difference
int16_t ds = seq - m_seq;
if (ds != 1)
m_seqLost++;
// check if we received duplicate or delayed packet
if (ds == 0)
return;
// check if we received a packet too much out of sequence
// be much more tolerant when authenticating as we cannot resync
if ((ds <= 0) || ((ds > SEQ_DESYNC_COUNT) && !secPtr)) {
if ((ds <= -SEQ_DESYNC_COUNT) || ((ds > SEQ_DESYNC_COUNT) && !secPtr)) {
m_ioLostPkt++;
if (ds && !secPtr) {
if (!secPtr) {
// try to resync sequence unless we need to authenticate
if (m_seqCount++) {
if (seq == ++m_seqSync) {
@ -216,6 +236,8 @@ void RTPReceiver::rtpData(const void* data, int len)
m_seqCount = 0;
m_warn = true;
m_syncLost++;
if (m_dejitter)
m_dejitter->clear();
// drop this packet, next packet will come in correctly
return;
}
@ -226,38 +248,32 @@ void RTPReceiver::rtpData(const void* data, int len)
else
m_seqSync = seq;
}
if (m_warn && ds) {
if (m_warn) {
m_warn = false;
Debug(DebugWarn,"RTP received SEQ %u while current is %u [%p]",seq,m_seq,this);
}
return;
}
if (ds > 1)
m_ioLostPkt += (ds - 1);
u_int32_t rollover = m_rollover;
// this time compare unsigned to detect rollovers
if (seq < m_seq)
rollover++;
u_int64_t seq48 = rollover;
seq48 = (seq48 << 16) | seq;
// if some security data is present authenticate the packet now
if (secPtr && !rtpCheckIntegrity((const unsigned char*)data,len + padding + 12,secPtr + m_mkiLen,ss,seq48))
if (!rtpDecipher(const_cast<unsigned char*>(pc),len + padding,secPtr,ss,seq48))
return;
// keep track of the last valid sequence number and timestamp we have seen
m_seq = seq;
m_rollover = rollover;
m_tsLast = ts - m_ts;
m_seqCount = 0;
m_ioPackets++;
m_ioOctets += len;
// keep track of the last valid sequence number and timestamp we have seen
m_seq = seq;
m_rollover = rollover;
if (!len)
pc = 0;
if (rtpDecipher(const_cast<unsigned char*>(pc),len + padding,secPtr,ss,seq48))
if (m_dejitter) {
if (!m_dejitter->rtpRecv(marker,typ,m_tsLast,pc,len))
m_ioLostPkt++;
return;
}
if (ds > 1)
m_ioLostPkt += (ds - 1);
if (ds >= 1)
rtpRecv(marker,typ,m_tsLast,pc,len);
}
@ -274,15 +290,8 @@ bool RTPReceiver::rtpRecv(bool marker, int payload, unsigned int timestamp, cons
if (payload == silencePayload())
return decodeSilence(marker,timestamp,data,len);
finishEvent(timestamp);
if (payload == dataPayload()) {
#if 0
// dejitter is broken - don't use it
if (m_dejitter)
return m_dejitter->rtpRecvData(marker,timestamp,data,len);
else
#endif
return rtpRecvData(marker,timestamp,data,len);
}
if (payload == dataPayload())
return rtpRecvData(marker,timestamp,data,len);
return false;
}

View File

@ -360,14 +360,20 @@ public:
/**
* Process and store one RTP data packet
* @param marker True if the marker bit is set in data packet
* @param payload Payload number
* @param timestamp Sampling instant of the packet data
* @param data Pointer to data block to process
* @param len Length of the data block in bytes
* @return True if data was handled
* @return True if the data packet was queued
*/
virtual bool rtpRecvData(bool marker, unsigned int timestamp,
virtual bool rtpRecv(bool marker, int payload, unsigned int timestamp,
const void* data, int len);
/**
* Clear the delayed packets queue and all variables
*/
void clear();
protected:
/**
* Method called periodically to keep the data flowing
@ -378,12 +384,13 @@ protected:
private:
ObjList m_packets;
RTPReceiver* m_receiver;
unsigned int m_mindelay;
unsigned int m_maxdelay;
unsigned int m_minDelay;
unsigned int m_maxDelay;
unsigned int m_headStamp;
unsigned int m_tailStamp;
u_int64_t m_headTime;
u_int64_t m_tailTime;
u_int64_t m_sampRate;
unsigned char m_fastRate;
};
/**
@ -586,6 +593,7 @@ private:
class YRTP_API RTPReceiver : public RTPBaseIO
{
friend class RTPSession;
friend class RTPDejitter;
public:
/**
* Constructor

View File

@ -106,8 +106,8 @@ static int s_interval= 0;
static int s_timeout = 0;
static int s_udptlTimeout = 0;
static int s_minjitter = 0;
static int s_maxjitter = 0;
static int s_minJitter = 0;
static int s_maxJitter = 0;
class YRTPSource;
class YRTPConsumer;
@ -659,8 +659,6 @@ bool YRTPWrapper::startRTP(const char* raddr, unsigned int rport, Message& msg)
}
Debug(&splugin,DebugInfo,"RTP starting format '%s' payload %d [%p]",format,payload,this);
int minJitter = msg.getIntValue(YSTRING("minjitter"),s_minjitter);
int maxJitter = msg.getIntValue(YSTRING("maxjitter"),s_maxjitter);
if (!setRemote(raddr,rport,msg))
return false;
@ -726,8 +724,13 @@ bool YRTPWrapper::startRTP(const char* raddr, unsigned int rport, Message& msg)
}
setTimeout(msg,s_timeout);
m_rtp->setReports(msg.getIntValue(YSTRING("rtcp_interval"),s_interval));
// if (maxJitter > 0)
// m_rtp->setDejitter(minJitter*1000,maxJitter*1000);
// dejittering is only meaningful for audio
if (isAudio()){
int minJitter = msg.getIntValue(YSTRING("minjitter"),s_minJitter);
int maxJitter = msg.getIntValue(YSTRING("maxjitter"),s_maxJitter);
if (minJitter >= 0 && maxJitter > 0)
m_rtp->setDejitter(minJitter*1000,maxJitter*1000);
}
m_bufsize = s_bufsize;
return true;
}
@ -1830,8 +1833,8 @@ void YRTPPlugin::initialize()
s_minport = cfg.getIntValue("general","minport",MIN_PORT);
s_maxport = cfg.getIntValue("general","maxport",MAX_PORT);
s_bufsize = cfg.getIntValue("general","buffer",BUF_SIZE);
s_minjitter = cfg.getIntValue("general","minjitter");
s_maxjitter = cfg.getIntValue("general","maxjitter");
s_minJitter = cfg.getIntValue("general","minjitter",50);
s_maxJitter = cfg.getIntValue("general","maxjitter",Engine::clientMode() ? 120 : 0);
s_tos = cfg.getIntValue("general","tos",dict_tos);
s_localip = cfg.getValue("general","localip");
s_autoaddr = cfg.getBoolValue("general","autoaddr",true);