RTP library and channel started to actually work.

git-svn-id: http://yate.null.ro/svn/yate/trunk@341 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2005-05-09 22:55:47 +00:00
parent 554da5ba98
commit 34b6db3a40
4 changed files with 148 additions and 59 deletions

View File

@ -109,6 +109,10 @@ void RTPReceiver::rtpData(const void* data, int len)
rtpRecv(marker,typ,ts-m_ts,pc,len);
}
void RTPReceiver::rtcpData(const void* data, int len)
{
}
bool RTPReceiver::rtpRecv(bool marker, int payload, unsigned int timestamp, const void* data, int len)
{
if ((payload != dataPayload()) && (payload != eventPayload()))
@ -263,10 +267,12 @@ void RTPSender::timerTick(const Time& when)
RTPSession::RTPSession()
: m_transport(0), m_direction(FullStop), m_send(0), m_recv(0)
{
XDebug(DebugInfo,"RTPSession::RTPSession() [%p]",this);
}
RTPSession::~RTPSession()
{
XDebug(DebugInfo,"RTPSession::~RTPSession() [%p]",this);
direction(FullStop);
sender(0);
receiver(0);
@ -312,8 +318,38 @@ RTPReceiver* RTPSession::createReceiver()
return new RTPReceiver(this);
}
RTPTransport* RTPSession::createTransport()
{
return new RTPTransport(group());
}
bool RTPSession::initGroup()
{
if (m_group)
return true;
// try to pick the grop from the transport if it has one
if (m_transport)
group(m_transport->group());
if (!m_group)
group(new RTPGroup());
if (!m_group)
return false;
if (m_transport)
m_transport->group(m_group);
return true;
}
bool RTPSession::initTransport()
{
if (m_transport)
return true;
transport(createTransport());
return (m_transport != 0);
}
void RTPSession::transport(RTPTransport* trans)
{
XDebug(DebugInfo,"RTPSession::transport(%p) old=%p [%p]",trans,m_transport,this);
if (trans == m_transport)
return;
if (m_transport)
@ -327,6 +363,7 @@ void RTPSession::transport(RTPTransport* trans)
void RTPSession::sender(RTPSender* send)
{
XDebug(DebugInfo,"RTPSession::sender(%p) old=%p [%p]",send,m_send,this);
if (send == m_send)
return;
RTPSender* tmp = m_send;
@ -337,6 +374,7 @@ void RTPSession::sender(RTPSender* send)
void RTPSession::receiver(RTPReceiver* recv)
{
XDebug(DebugInfo,"RTPSession::receiver(%p) old=%p [%p]",recv,m_recv,this);
if (recv == m_recv)
return;
RTPReceiver* tmp = m_recv;
@ -347,6 +385,7 @@ void RTPSession::receiver(RTPReceiver* recv)
bool RTPSession::direction(Direction dir)
{
XDebug(DebugInfo,"RTPSession::direction(%d) old=%d [%p]",dir,m_direction,this);
if ((dir != FullStop) && !m_transport)
return false;
// make sure we have sender and/or receiver for our direction

View File

@ -30,14 +30,17 @@ using namespace TelEngine;
RTPGroup::RTPGroup(Priority prio)
: Mutex(true), Thread("RTP Group",prio)
{
XDebug(DebugInfo,"RTPGroup::RTPGroup() [%p]",this);
}
RTPGroup::~RTPGroup()
{
XDebug(DebugInfo,"RTPGroup::~RTPGroup() [%p]",this);
}
void RTPGroup::cleanup()
{
XDebug(DebugInfo,"RTPGroup::cleanup() [%p]",this);
lock();
ObjList* l = &m_processors;
for (;l;l = l->next()) {
@ -70,6 +73,7 @@ void RTPGroup::run()
void RTPGroup::join(RTPProcessor* proc)
{
XDebug(DebugAll,"RTPGroup::join(%p) [%p]",proc,this);
lock();
m_processors.append(proc)->setDelete(false);
startup();
@ -78,6 +82,7 @@ void RTPGroup::join(RTPProcessor* proc)
void RTPGroup::part(RTPProcessor* proc)
{
XDebug(DebugAll,"RTPGroup::part(%p) [%p]",proc,this);
lock();
m_processors.remove(proc,false);
unlock();
@ -86,16 +91,19 @@ void RTPGroup::part(RTPProcessor* proc)
RTPProcessor::RTPProcessor(RTPGroup* grp)
: m_group(0)
{
XDebug(DebugAll,"RTPProcessor::RTPProcessor(%p) [%p]",grp,this);
group(grp);
}
RTPProcessor::~RTPProcessor()
{
XDebug(DebugAll,"RTPProcessor::~RTPProcessor() [%p]",this);
group(0);
}
void RTPProcessor::group(RTPGroup* newgrp)
{
XDebug(DebugAll,"RTPProcessor::group(%p) old=%p [%p]",newgrp,m_group,this);
if (newgrp == m_group)
return;
if (m_group)
@ -189,12 +197,39 @@ void RTPTransport::setMonitor(RTPProcessor* monitor)
bool RTPTransport::localAddr(SocketAddr& addr)
{
// check if sockets are already created and bound
if (m_rtpSock.valid())
return false;
int p = addr.port();
// make sure we don't have a port or it's an even one
if ((p & 1) == 0) {
m_localAddr = addr;
return true;
if ((p & 1))
return false;
if (m_rtpSock.create(addr.family(),SOCK_DGRAM) && m_rtpSock.bind(addr)) {
if (!p) {
m_rtpSock.getSockName(addr);
p = addr.port();
if (p & 1) {
// allocated odd port - have to swap sockets
m_rtcpSock.attach(m_rtpSock.detach());
addr.port(p-1);
if (m_rtpSock.create(addr.family(),SOCK_DGRAM) && m_rtpSock.bind(addr)) {
m_localAddr = addr;
return true;
}
m_rtpSock.terminate();
m_rtcpSock.terminate();
return false;
}
}
addr.port(p+1);
if (m_rtcpSock.create(addr.family(),SOCK_DGRAM) && m_rtcpSock.bind(addr)) {
addr.port(p);
m_localAddr = addr;
return true;
}
}
m_rtpSock.terminate();
m_rtcpSock.terminate();
return false;
}

View File

@ -531,6 +531,25 @@ public:
*/
virtual RTPReceiver* createReceiver();
/**
* Create a new RTP transport for this session.
* Override this method to create objects derived from RTPTransport.
* @return Pointer to the new transport or NULL on failure
*/
virtual RTPTransport* createTransport();
/**
* Initialize the RTP session, attach a transport if there is none
* @return True if initialized, false on some failure
*/
bool initTransport();
/**
* Initialize the RTP session, attach a group if none is present
* @return True if initialized, false on some failure
*/
bool initGroup();
/**
* Send one RTP payload packet
* @param marker Set to true if the marker bit must be set

View File

@ -66,14 +66,14 @@ public:
bool startRTP(const char* raddr, unsigned int rport, int payload, const char* format);
bool sendDTMF(char dtmf);
void gotDTMF(char tone);
inline CallEndpoint* conn() const
{ return m_conn; }
inline const String& id() const
{ return m_id; }
inline RTPSession* rtp() const
{ return m_rtp; }
inline RTPSession::Direction dir() const
{ return m_dir; }
inline CallEndpoint* conn() const
{ return m_conn; }
inline const String& id() const
{ return m_id; }
inline unsigned int bufSize() const
{ return m_bufsize; }
inline unsigned int port() const
@ -82,7 +82,7 @@ public:
{ if (master) m_master = master; }
static YRTPWrapper* find(const CallEndpoint* conn, RTPSession::Direction direction = RTPSession::SendRecv);
static YRTPWrapper* find(const String& id);
static String guessLocal(const char* remoteip);
static void guessLocal(const char* remoteip, String& localip);
private:
RTPSession* m_rtp;
RTPSession::Direction m_dir;
@ -95,17 +95,14 @@ private:
unsigned int m_port;
};
class YRTPAudioSource : public ThreadedSource
class YRTPAudioSource : public DataSource
{
friend class YRTPWrapper;
public:
YRTPAudioSource(YRTPWrapper* wrap);
~YRTPAudioSource();
virtual void run(void);
private:
YRTPWrapper* m_wrap;
gchar* m_buffer;
DataBlock m_data;
};
class YRTPAudioConsumer : public DataConsumer
@ -157,19 +154,6 @@ static YRTPPlugin splugin;
static ObjList s_calls;
static Mutex s_mutex;
static void tel_event_cb(RtpSession* session,gint type,gpointer user_data)
{
DDebug(DebugAll,"tel_event_cb(%p,%d,%p)",session,type,user_data);
static const char dtmf[] = "0123456789*#ABCDF";
if (user_data && (type >= 0) && (type <= 16))
static_cast<YRTPWrapper*>(user_data)->gotDTMF(dtmf[type]);
}
static void tel_packet_cb(RtpSession* session,mblk_t* blk,gpointer user_data)
{
DDebug(DebugAll,"tel_packet_cb(%p,%p,%p)",session,blk,user_data);
}
YRTPWrapper::YRTPWrapper(const char* localip, CallEndpoint* conn, RTPSession::Direction direction)
: m_rtp(0), m_dir(direction), m_conn(conn),
m_source(0), m_consumer(0), m_bufsize(0), m_port(0)
@ -237,6 +221,7 @@ void YRTPWrapper::setupRTP(const char* localip)
{
Debug(DebugAll,"YRTPWrapper::setupRTP(\"%s\") [%p]",localip,this);
m_rtp = new RTPSession;
m_rtp->initTransport();
int minport = s_cfg.getIntValue("rtp","minport",16384);
int maxport = s_cfg.getIntValue("rtp","maxport",32768);
int attempt = 10;
@ -249,9 +234,15 @@ void YRTPWrapper::setupRTP(const char* localip)
maxport++;
attempt = 1;
}
SocketAddr addr(AF_INET);
if (!addr.host(localip)) {
Debug(DebugWarn,"YRTPWrapper [%p] could not parse address '%s'",this,localip);
return;
}
for (; attempt; attempt--) {
int lport = (minport + (::random() % (maxport - minport))) & 0xfffe;
if (::rtp_session_set_local_addr(m_rtp, (gchar *)localip, lport) == 0) {
addr.port(lport);
if (m_rtp->localAddr(addr)) {
m_port = lport;
Debug(DebugAll,"YRTPWrapper [%p] RTP %p bound to %s:%u",this,m_rtp,localip,m_port);
return;
@ -294,6 +285,12 @@ bool YRTPWrapper::startRTP(const char* raddr, unsigned int rport, int payload, c
Debug(DebugAll,"RTP format '%s' payload %d",format,payload);
SocketAddr addr(AF_INET);
if (!(addr.host(raddr) && addr.port(rport) && m_rtp->remoteAddr(addr))) {
Debug(DebugWarn,"RTP failed to set remote address %s:%d [%p]",raddr,rport,this);
return false;
}
#if 0
::rtp_session_set_scheduling_mode(m_rtp, s_cfg.getBoolValue("rtp","scheduled",true)); /* yes */
if (::rtp_session_set_remote_addr(m_rtp, (gchar *)raddr, rport)) {
Debug(DebugWarn,"RTP failed to set remote address %s:%d [%p]",raddr,rport,this);
@ -306,6 +303,7 @@ bool YRTPWrapper::startRTP(const char* raddr, unsigned int rport, int payload, c
::rtp_session_signal_connect(m_rtp,"telephone-event",(RtpCallback)tel_event_cb,this);
::rtp_session_signal_connect(m_rtp,"telephone-event_packet",(RtpCallback)tel_packet_cb,this);
::rtp_session_set_jitter_compensation(m_rtp, s_cfg.getIntValue("rtp","jitter",50));
#endif
// Change format of source and/or consumer,
// reinstall them to rebuild codec chains
if (m_source) {
@ -318,7 +316,9 @@ bool YRTPWrapper::startRTP(const char* raddr, unsigned int rport, int payload, c
m_conn->setSource(m_source);
m_source->deref();
}
m_source->start("RTP Source");
#if 0
m_source->start("YRTP Source");
#endif
}
if (m_consumer) {
if (m_conn) {
@ -331,6 +331,12 @@ bool YRTPWrapper::startRTP(const char* raddr, unsigned int rport, int payload, c
m_consumer->deref();
}
}
if (!(m_rtp->initGroup() && m_rtp->direction(m_dir)))
return false;
if (m_rtp->receiver())
m_rtp->receiver()->dataPayload(payload);
if (m_rtp->sender())
m_rtp->sender()->dataPayload(payload);
m_bufsize = s_cfg.getIntValue("rtp","buffer",160);
return true;
}
@ -338,7 +344,7 @@ bool YRTPWrapper::startRTP(const char* raddr, unsigned int rport, int payload, c
bool YRTPWrapper::sendDTMF(char dtmf)
{
if (m_rtp && m_consumer) {
::rtp_session_send_dtmf(m_rtp, dtmf, m_consumer->timestamp());
m_rtp->rtpSendKey(dtmf,0);
return true;
}
return false;
@ -359,29 +365,25 @@ void YRTPWrapper::gotDTMF(char tone)
Engine::enqueue(m);
}
String YRTPWrapper::guessLocal(const char* remoteip)
void YRTPWrapper::guessLocal(const char* remoteip, String& localip)
{
String ret;
struct sockaddr_in addr;
if (remoteip && *remoteip && ::inet_aton(remoteip, &addr.sin_addr)) {
addr.sin_family = AF_INET;
addr.sin_port = htons(6666); // whatever
int s = ::socket(PF_INET,SOCK_DGRAM,IPPROTO_UDP);
if (s != -1) {
if (0 == ::connect(s, (const sockaddr *)&addr, sizeof(addr))) {
socklen_t len = sizeof(addr);
if (0 == ::getsockname(s, (sockaddr *)&addr, &len))
ret = ::inet_ntoa(addr.sin_addr);
}
::close(s);
}
localip.clear();
SocketAddr r(AF_INET);
if (!r.host(remoteip)) {
Debug(DebugInfo,"Guess - Could not parse remote '%s'",remoteip);
return;
}
Debug(DebugInfo,"Guessed local IP '%s' for remote '%s'",ret.c_str(),remoteip);
return ret;
SocketAddr l;
if (!l.local(r)) {
Debug(DebugInfo,"Guess - Could not guess local for remote '%s'",remoteip);
return;
}
localip = l.host();
Debug(DebugInfo,"Guessed local IP '%s' for remote '%s'",localip.c_str(),remoteip);
}
YRTPAudioSource::YRTPAudioSource(YRTPWrapper* wrap)
: m_wrap(wrap), m_buffer(0)
: m_wrap(wrap)
{
Debug(DebugAll,"YRTPAudioSource::YRTPAudioSource(%p) [%p]",wrap,this);
m_format.clear();
@ -395,7 +397,6 @@ YRTPAudioSource::~YRTPAudioSource()
{
Debug(DebugAll,"YRTPAudioSource::~YRTPAudioSource() [%p] wrapper=%p",this,m_wrap);
m_mutex.lock();
stop();
if (m_wrap) {
YRTPWrapper* tmp = m_wrap;
m_wrap = 0;
@ -403,14 +404,10 @@ YRTPAudioSource::~YRTPAudioSource()
tmp->deref();
Thread::yield();
}
m_data.clear(false);
if (m_buffer) {
::free(m_buffer);
m_buffer = 0;
}
m_mutex.unlock();
}
#if 0
void YRTPAudioSource::run(void)
{
// wait until the rtp is fully initialized
@ -419,7 +416,7 @@ void YRTPAudioSource::run(void)
int bufsize = m_wrap->bufSize();
int timestamp = 0;
int have_more;
m_buffer = (gchar*)::malloc(bufsize);
m_buffer = (char*)::malloc(bufsize);
::memset(m_buffer, 0, bufsize);
Debug(DebugAll,"YRTPAudioSource::run() entering loop [%p]",this);
for (;;) {
@ -445,6 +442,7 @@ void YRTPAudioSource::run(void)
}
Debug(DebugAll,"YRTPAudioSource::run() exiting loop [%p]",this);
}
#endif
YRTPAudioConsumer::YRTPAudioConsumer(YRTPWrapper *wrap)
: m_wrap(wrap), m_timestamp(0)
@ -474,7 +472,7 @@ void YRTPAudioConsumer::Consume(const DataBlock &data, unsigned long timeDelta)
return;
XDebug(DebugAll,"YRTPAudioConsumer writing %d bytes, ts=%d [%p]",
data.length(),m_timestamp,this);
::rtp_session_send_with_ts(m_wrap->rtp(),(gchar *)data.data(),data.length(),m_timestamp);
m_wrap->rtp()->rtpSendData(false,m_timestamp,data.data(),data.length());
// if timestamp increment is not provided we have to guess...
m_timestamp += timeDelta ? timeDelta : data.length();
}
@ -514,7 +512,7 @@ bool AttachHandler::received(Message &msg)
String rip(msg.getValue("remoteip"));
String rport(msg.getValue("remoteport"));
if (lip.null())
lip = YRTPWrapper::guessLocal(rip);
YRTPWrapper::guessLocal(rip,lip);
CallEndpoint *ch = static_cast<CallEndpoint*>(msg.userData());
if (!ch) {
if (!src.null())
@ -604,7 +602,7 @@ bool RtpHandler::received(Message &msg)
if (!w) {
String lip(msg.getValue("localip"));
if (lip.null())
lip = YRTPWrapper::guessLocal(rip);
YRTPWrapper::guessLocal(rip,lip);
if (lip.null()) {
Debug(DebugWarn,"RTP request with no local address!");
return false;
@ -651,7 +649,7 @@ bool DTMFHandler::received(Message &msg)
if (text.null())
return false;
YRTPWrapper* wrap = YRTPWrapper::find(targetid);
if (wrap && wrap->rtp() && (::rtp_session_telephone_events_supported(wrap->rtp()) > 0)) {
if (wrap && wrap->rtp()) {
Debug(DebugInfo,"RTP DTMF '%s' targetid '%s'",text.c_str(),targetid.c_str());
for (unsigned int i=0;i<text.length();i++)
wrap->sendDTMF(text[i]);
@ -679,8 +677,6 @@ void YRTPPlugin::initialize()
s_cfg.load();
if (m_first) {
m_first = false;
// rtp_profile_set_payload(&av_profile,96,&telephone_event);
rtp_profile_set_payload(&av_profile,101,&telephone_event);
Engine::install(new AttachHandler);
Engine::install(new RtpHandler);
Engine::install(new DTMFHandler);