diff --git a/conf.d/yrtpchan.conf.sample b/conf.d/yrtpchan.conf.sample index f36ce70f..9a8a95a2 100644 --- a/conf.d/yrtpchan.conf.sample +++ b/conf.d/yrtpchan.conf.sample @@ -36,6 +36,14 @@ ; drillhole: bool: Attempt to drill a hole through a firewall or NAT ;drillhole=disable in server mode, enable in client mode +; minjitter: int: Amount to attempt to keep in the dejitter buffer in msec +; Valid values 5 to maxjitter-30, negative disables dejitter buffer +;minjitter=50 + +; maxjitter: int: Maximum dejitter buffer size in msec +; Valid values 50 to 1000, 0 disables dejitter buffer +;maxjitter=120 in client mode, 0 in server mode + ; thread: keyword: Default priority of the data service threads ; Can be one of: lowest, low, normal, high, highest ; It is a bad idea to set a low priority for anything but testing diff --git a/libs/yrtp/dejitter.cpp b/libs/yrtp/dejitter.cpp index 0235edc4..e1857847 100644 --- a/libs/yrtp/dejitter.cpp +++ b/libs/yrtp/dejitter.cpp @@ -25,112 +25,132 @@ using namespace TelEngine; +namespace { // anonymous + class RTPDelayedData : public DataBlock { public: - inline RTPDelayedData(u_int64_t when, bool mark, unsigned int tstamp, const void* data, int len) - : DataBlock(const_cast(data),len), m_scheduled(when), m_marker(mark), m_timestamp(tstamp) + inline RTPDelayedData(u_int64_t when, bool mark, int payload, + unsigned int tstamp, const void* data, int len) + : DataBlock(const_cast(data),len), m_scheduled(when), + m_marker(mark), m_payload(payload), m_timestamp(tstamp) { } inline u_int64_t scheduled() const { return m_scheduled; } inline bool marker() const { return m_marker; } + int payload() const + { return m_payload; } inline unsigned int timestamp() const { return m_timestamp; } - inline void schedule(u_int64_t when) - { m_scheduled = when; } private: u_int64_t m_scheduled; bool m_marker; + int m_payload; unsigned int m_timestamp; }; +}; // anonymous namespace + RTPDejitter::RTPDejitter(RTPReceiver* receiver, unsigned int mindelay, unsigned int maxdelay) - : m_receiver(receiver), m_mindelay(mindelay), m_maxdelay(maxdelay), - m_headStamp(0), m_tailStamp(0), m_headTime(0), m_tailTime(0) + : m_receiver(receiver), m_minDelay(mindelay), m_maxDelay(maxdelay), + m_headStamp(0), m_tailStamp(0), m_headTime(0), m_sampRate(125000), m_fastRate(10) { - if (m_maxdelay > 2000000) - m_maxdelay = 2000000; - if (m_maxdelay < 50000) - m_maxdelay = 50000; - if (m_mindelay < 5000) - m_mindelay = 5000; - if (m_mindelay > m_maxdelay - 20000) - m_mindelay = m_maxdelay - 20000; + if (m_maxDelay > 1000000) + m_maxDelay = 1000000; + if (m_maxDelay < 50000) + m_maxDelay = 50000; + if (m_minDelay < 5000) + m_minDelay = 5000; + if (m_minDelay > m_maxDelay - 30000) + m_minDelay = m_maxDelay - 30000; } RTPDejitter::~RTPDejitter() { - DDebug(DebugMild,"Dejitter destroyed with %u packets [%p]",m_packets.count(),this); + DDebug(DebugInfo,"Dejitter destroyed with %u packets [%p]",m_packets.count(),this); } -bool RTPDejitter::rtpRecvData(bool marker, unsigned int timestamp, const void* data, int len) +void RTPDejitter::clear() +{ + m_packets.clear(); + m_headStamp = m_tailStamp = 0; +} + +bool RTPDejitter::rtpRecv(bool marker, int payload, unsigned int timestamp, const void* data, int len) { u_int64_t when = 0; bool insert = false; - if (m_headStamp && (m_tailStamp != m_headStamp)) { - // at least one packet got out of the queue and another is waiting + if (m_headStamp) { + // at least one packet got out of the queue int dTs = timestamp - m_headStamp; - if (dTs < 0) { - DDebug(DebugMild,"Dejitter got TS %u while last delivered was %u [%p]",timestamp,m_headStamp,this); + if (dTs == 0) + return true; + else if (dTs < 0) { + DDebug(DebugNote,"Dejitter dropping TS %u, last delivered was %u [%p]", + timestamp,m_headStamp,this); return false; } - u_int64_t bufTime = m_tailTime - m_headTime; - int bufStamp = m_tailStamp - m_headStamp; - if (bufStamp <= 0) - Debug(DebugWarn,"Oops! %d [%p]",bufStamp,this); - // interpolate or extrapolate the delivery time for the packet - // rounding down is ok - the buffer will slowly shrink as expected - when = dTs * bufTime / bufStamp; -DDebug(DebugMild,"Dejitter when=" FMT64U " dTs=%d bufTime=" FMT64U " bufSTamp=%d [%p]", - when,dTs,bufTime,bufStamp,this); - when += m_headTime; - if (dTs > bufStamp) { - bufTime = when - m_headTime; - if (bufTime > m_maxdelay) { - // buffer has lagged behind so we must drop some old packets - // and also reschedule the others - DDebug(DebugMild,"Dejitter grew to " FMT64U " [%p]",bufTime,this); -// when = m_headTime - + u_int64_t now = Time::now(); + int64_t rate = 1000 * (now - m_headTime) / dTs; + if (rate > 0) { + if (m_sampRate) { + if (m_fastRate) { + m_fastRate--; + rate = (7 * m_sampRate + rate) >> 3; + } + else + rate = (31 * m_sampRate + rate) >> 5; } + if (rate > 150000) + rate = 150000; // 6.67 kHz + else if (rate < 20000) + rate = 20000; // 50 kHz + m_sampRate = rate; + XDebug(DebugAll,"Time per sample " FMT64, rate); } else - // timestamp falls inside buffer so we must insert the packet - // between the already scheduled ones - insert = true; - } - else { + rate = m_sampRate; + if (rate > 0) + when = m_headTime + (dTs * rate / 1000) + m_minDelay; + else + when = now + m_minDelay; if (m_tailStamp) { - int dTs = timestamp - m_tailStamp; - if (dTs < 0) { - // until we get some statistics don't attempt to reorder packets - DDebug(DebugMild,"Dejitter got TS %u while last queued was %u [%p]",timestamp,m_tailStamp,this); + if (timestamp == m_tailStamp) + return true; + if (((int)(timestamp - m_tailStamp)) < 0) + insert = true; + else if (when > now + m_maxDelay) { + DDebug(DebugNote,"Packet with TS %u falls after max buffer [%p]",timestamp,this); return false; } } + } + else { + if (m_tailStamp && ((int)(timestamp - m_tailStamp)) < 0) { + // until we get some statistics don't attempt to reorder packets + DDebug(DebugNote,"Dejitter got TS %u while last queued was %u [%p]",timestamp,m_tailStamp,this); + return false; + } // we got no packets out yet so use a fixed interval - when = Time::now() + m_mindelay; + when = Time::now() + m_minDelay; } - if (when > m_tailTime) { - // remember the latest in the queue - m_tailStamp = timestamp; - m_tailTime = when; - } - RTPDelayedData* packet = new RTPDelayedData(when,marker,timestamp,data,len); if (insert) { - for (ObjList* l = m_packets.skipNull();l;l = l->skipNext()) { + for (ObjList* l = m_packets.skipNull(); l; l = l->skipNext()) { RTPDelayedData* pkt = static_cast(l->get()); - if (pkt->scheduled() > when) { - DDebug(DebugMild,"Dejitter inserting packet %p before %p [%p]",packet,pkt,this); - l->insert(packet); + if (pkt->timestamp() == timestamp) + return true; + if (pkt->timestamp() > timestamp && pkt->scheduled() > when) { + l->insert(new RTPDelayedData(when,marker,payload,timestamp,data,len)); return true; } } } - m_packets.append(packet); + m_tailStamp = timestamp; + m_packets.append(new RTPDelayedData(when,marker,payload,timestamp,data,len)); return true; } @@ -138,18 +158,33 @@ void RTPDejitter::timerTick(const Time& when) { RTPDelayedData* packet = static_cast(m_packets.get()); if (!packet) { - // queue is empty - reset timestamps - m_headStamp = m_tailStamp = 0; + m_tailStamp = 0; + if (m_headStamp && (m_headTime + m_maxDelay < when)) + m_headStamp = 0; return; } - if (packet->scheduled() <= when) { - // remember the last delivered - m_headStamp = packet->timestamp(); - m_headTime = when; - if (m_receiver) - m_receiver->rtpRecvData(packet->marker(),packet->timestamp(),packet->data(),packet->length()); - m_packets.remove(packet); + if (packet->scheduled() > when) + return; + m_packets.remove(packet,false); + // remember the last delivered + m_headStamp = packet->timestamp(); + m_headTime = packet->scheduled(); + if (m_receiver) + m_receiver->rtpRecv(packet->marker(),packet->payload(), + packet->timestamp(),packet->data(),packet->length()); + TelEngine::destruct(packet); + unsigned int count = 0; + while ((packet = static_cast(m_packets.get()))) { + long delayed = when - packet->scheduled(); + if (delayed <= 0 || delayed <= m_minDelay) + break; + // we are too delayed - probably rtpRecv() took too long to complete... + m_packets.remove(packet,true); + count++; } + if (count) + Debug((count > 1) ? DebugMild : DebugNote, + "Dropped %u delayed packet%s from buffer [%p]",count,((count > 1) ? "s" : ""),this); } /* vi: set ts=8 sw=4 sts=4 noet: */ diff --git a/libs/yrtp/secure.cpp b/libs/yrtp/secure.cpp index bc488450..8ed69506 100644 --- a/libs/yrtp/secure.cpp +++ b/libs/yrtp/secure.cpp @@ -244,9 +244,9 @@ bool RTPSecure::deriveKey(Cipher& cipher, DataBlock& key, unsigned int len, unsi bool RTPSecure::rtpDecipher(unsigned char* data, int len, const void* secData, u_int32_t ssrc, u_int64_t seq) { - if (!m_rtpEncrypted) + if (!(m_rtpEncrypted && data)) return true; - if (!(len && data && m_rtpCipher)) + if (!(len && m_rtpCipher)) return false; DataBlock iv(m_cipherSalt); int i; diff --git a/libs/yrtp/session.cpp b/libs/yrtp/session.cpp index 26bb1b19..9f2ac758 100644 --- a/libs/yrtp/session.cpp +++ b/libs/yrtp/session.cpp @@ -164,6 +164,8 @@ void RTPReceiver::rtpData(const void* data, int len) } if (len < 0) return; + if (!len) + pc = 0; // grab some data at the first packet received or resync if (m_ssrcInit) { @@ -173,6 +175,8 @@ void RTPReceiver::rtpData(const void* data, int len) m_seq = seq-1; m_seqCount = 0; m_warn = true; + if (m_dejitter) + m_dejitter->clear(); } if (ss != m_ssrc) { @@ -191,19 +195,35 @@ void RTPReceiver::rtpData(const void* data, int len) m_seq = seq; m_ts = ts - m_tsLast; m_seqCount = 0; + if (m_dejitter) + m_dejitter->clear(); // drop this packet, next packet will come in correctly return; } - // substraction with overflow + u_int32_t rollover = m_rollover; + // compare unsigned to detect rollovers + if (seq < m_seq) + rollover++; + u_int64_t seq48 = rollover; + seq48 = (seq48 << 16) | seq; + + // if some security data is present authenticate the packet now + if (secPtr && !rtpCheckIntegrity((const unsigned char*)data,len + padding + 12,secPtr + m_mkiLen,ss,seq48)) + return; + + // substraction with overflow to compute sequence difference int16_t ds = seq - m_seq; if (ds != 1) m_seqLost++; - // check if we received duplicate or delayed packet + if (ds == 0) + return; + + // check if we received a packet too much out of sequence // be much more tolerant when authenticating as we cannot resync - if ((ds <= 0) || ((ds > SEQ_DESYNC_COUNT) && !secPtr)) { + if ((ds <= -SEQ_DESYNC_COUNT) || ((ds > SEQ_DESYNC_COUNT) && !secPtr)) { m_ioLostPkt++; - if (ds && !secPtr) { + if (!secPtr) { // try to resync sequence unless we need to authenticate if (m_seqCount++) { if (seq == ++m_seqSync) { @@ -216,6 +236,8 @@ void RTPReceiver::rtpData(const void* data, int len) m_seqCount = 0; m_warn = true; m_syncLost++; + if (m_dejitter) + m_dejitter->clear(); // drop this packet, next packet will come in correctly return; } @@ -226,38 +248,32 @@ void RTPReceiver::rtpData(const void* data, int len) else m_seqSync = seq; } - if (m_warn && ds) { + if (m_warn) { m_warn = false; Debug(DebugWarn,"RTP received SEQ %u while current is %u [%p]",seq,m_seq,this); } return; } - if (ds > 1) - m_ioLostPkt += (ds - 1); - - u_int32_t rollover = m_rollover; - // this time compare unsigned to detect rollovers - if (seq < m_seq) - rollover++; - u_int64_t seq48 = rollover; - seq48 = (seq48 << 16) | seq; - - // if some security data is present authenticate the packet now - if (secPtr && !rtpCheckIntegrity((const unsigned char*)data,len + padding + 12,secPtr + m_mkiLen,ss,seq48)) + if (!rtpDecipher(const_cast(pc),len + padding,secPtr,ss,seq48)) return; - // keep track of the last valid sequence number and timestamp we have seen - m_seq = seq; - m_rollover = rollover; m_tsLast = ts - m_ts; m_seqCount = 0; m_ioPackets++; m_ioOctets += len; + // keep track of the last valid sequence number and timestamp we have seen + m_seq = seq; + m_rollover = rollover; - if (!len) - pc = 0; - if (rtpDecipher(const_cast(pc),len + padding,secPtr,ss,seq48)) + if (m_dejitter) { + if (!m_dejitter->rtpRecv(marker,typ,m_tsLast,pc,len)) + m_ioLostPkt++; + return; + } + if (ds > 1) + m_ioLostPkt += (ds - 1); + if (ds >= 1) rtpRecv(marker,typ,m_tsLast,pc,len); } @@ -274,15 +290,8 @@ bool RTPReceiver::rtpRecv(bool marker, int payload, unsigned int timestamp, cons if (payload == silencePayload()) return decodeSilence(marker,timestamp,data,len); finishEvent(timestamp); - if (payload == dataPayload()) { -#if 0 -// dejitter is broken - don't use it - if (m_dejitter) - return m_dejitter->rtpRecvData(marker,timestamp,data,len); - else -#endif - return rtpRecvData(marker,timestamp,data,len); - } + if (payload == dataPayload()) + return rtpRecvData(marker,timestamp,data,len); return false; } diff --git a/libs/yrtp/yatertp.h b/libs/yrtp/yatertp.h index ba4a8224..e37d0fbf 100644 --- a/libs/yrtp/yatertp.h +++ b/libs/yrtp/yatertp.h @@ -360,14 +360,20 @@ public: /** * Process and store one RTP data packet * @param marker True if the marker bit is set in data packet + * @param payload Payload number * @param timestamp Sampling instant of the packet data * @param data Pointer to data block to process * @param len Length of the data block in bytes - * @return True if data was handled + * @return True if the data packet was queued */ - virtual bool rtpRecvData(bool marker, unsigned int timestamp, + virtual bool rtpRecv(bool marker, int payload, unsigned int timestamp, const void* data, int len); + /** + * Clear the delayed packets queue and all variables + */ + void clear(); + protected: /** * Method called periodically to keep the data flowing @@ -378,12 +384,13 @@ protected: private: ObjList m_packets; RTPReceiver* m_receiver; - unsigned int m_mindelay; - unsigned int m_maxdelay; + unsigned int m_minDelay; + unsigned int m_maxDelay; unsigned int m_headStamp; unsigned int m_tailStamp; u_int64_t m_headTime; - u_int64_t m_tailTime; + u_int64_t m_sampRate; + unsigned char m_fastRate; }; /** @@ -586,6 +593,7 @@ private: class YRTP_API RTPReceiver : public RTPBaseIO { friend class RTPSession; + friend class RTPDejitter; public: /** * Constructor diff --git a/modules/yrtpchan.cpp b/modules/yrtpchan.cpp index 51b2523c..e397a812 100644 --- a/modules/yrtpchan.cpp +++ b/modules/yrtpchan.cpp @@ -106,8 +106,8 @@ static int s_interval= 0; static int s_timeout = 0; static int s_udptlTimeout = 0; -static int s_minjitter = 0; -static int s_maxjitter = 0; +static int s_minJitter = 0; +static int s_maxJitter = 0; class YRTPSource; class YRTPConsumer; @@ -659,8 +659,6 @@ bool YRTPWrapper::startRTP(const char* raddr, unsigned int rport, Message& msg) } Debug(&splugin,DebugInfo,"RTP starting format '%s' payload %d [%p]",format,payload,this); - int minJitter = msg.getIntValue(YSTRING("minjitter"),s_minjitter); - int maxJitter = msg.getIntValue(YSTRING("maxjitter"),s_maxjitter); if (!setRemote(raddr,rport,msg)) return false; @@ -726,8 +724,13 @@ bool YRTPWrapper::startRTP(const char* raddr, unsigned int rport, Message& msg) } setTimeout(msg,s_timeout); m_rtp->setReports(msg.getIntValue(YSTRING("rtcp_interval"),s_interval)); -// if (maxJitter > 0) -// m_rtp->setDejitter(minJitter*1000,maxJitter*1000); + // dejittering is only meaningful for audio + if (isAudio()){ + int minJitter = msg.getIntValue(YSTRING("minjitter"),s_minJitter); + int maxJitter = msg.getIntValue(YSTRING("maxjitter"),s_maxJitter); + if (minJitter >= 0 && maxJitter > 0) + m_rtp->setDejitter(minJitter*1000,maxJitter*1000); + } m_bufsize = s_bufsize; return true; } @@ -1830,8 +1833,8 @@ void YRTPPlugin::initialize() s_minport = cfg.getIntValue("general","minport",MIN_PORT); s_maxport = cfg.getIntValue("general","maxport",MAX_PORT); s_bufsize = cfg.getIntValue("general","buffer",BUF_SIZE); - s_minjitter = cfg.getIntValue("general","minjitter"); - s_maxjitter = cfg.getIntValue("general","maxjitter"); + s_minJitter = cfg.getIntValue("general","minjitter",50); + s_maxJitter = cfg.getIntValue("general","maxjitter",Engine::clientMode() ? 120 : 0); s_tos = cfg.getIntValue("general","tos",dict_tos); s_localip = cfg.getValue("general","localip"); s_autoaddr = cfg.getBoolValue("general","autoaddr",true);