Asynchronously connect a SOCKS client if possible.

git-svn-id: http://yate.null.ro/svn/yate/trunk@5490 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
marian 2013-04-25 14:39:43 +00:00
parent 7e2d02111f
commit e86838284e
2 changed files with 385 additions and 116 deletions

View File

@ -16,6 +16,13 @@
; Defaults to 30000
;reply-timeout=30000
; connect_timeout: integer: The time (in milliseconds) to wait to connect
; Set it to 0 to wait for system default, minimum allowed value is 1000,
; maximum allowed value is 120000
; This parameter can be overridden in chan.socks messages
; Defaults to 10000
;connect_timeout=10000
; buflen: integer: The number of bytes in receive buffer
; Can't be less then 1024
; Defaults to 4096 (4KB)

View File

@ -34,7 +34,6 @@ class SOCKSConn; // SOCKS TCP connection
class SOCKSListener; // A socket listener
class SOCKSEngine; // The SOCKS engine
class YSocksEngine;
class YSocksWrapper; // A link between a data source and/or
// consumer and a SOCKS connection
@ -43,6 +42,8 @@ class YSocksSource; // A data source
class YSocksConsumer; // A data consumer
class YSocksListenerThread; // A socket listener thread
class YSocksProcessThread; // A connection processor thread
class YSocksConnectThread; // A connect thread
class YSocksPlugin;
/*
@ -84,7 +85,7 @@ public:
* @param proxy True if this is a proxy, false if it is a server
* @param address The address used by the endpoint
* @param port The port used by the endpoint
* @param external Exetrnal (public) address of the endpoint
* @param external External (public) address of the endpoint
* @param uname Username used to authenticate
* @param pwd Username used to authenticate
*/
@ -590,11 +591,17 @@ public:
}
/**
* Build socket and connect it (outgoing only)
* Set connecting state (outgoing only)
*/
void setConnecting();
/**
* Set socket (outgoing only)
* @param sock The socket. It will be consumed
* @param sendAuthMeth True to send auth methods after succesfully connected
* @return True on success
*/
bool connect(bool sendAuthMeth = true);
bool setSocket(Socket* sock, bool sendAuthMeth = true);
/**
* Terminate and delete the socket
@ -628,6 +635,19 @@ public:
static inline const char* statusName(int stat, const char* def = "Unknown")
{ return TelEngine::lookup(stat,s_statusName,def); }
/**
* Connect a socket
* @param engine The engine owning the object requesting connect (used for debug)
* @param address Address to connect to
* @param port Port to connect to
* @param connToutMs Connect timeout in milliseconds
* @param error Error code on failure
* @param timeout Connection timeout flag
* @return Connected Socket pointer, 0 on failure
*/
static Socket* connect(SOCKSEngine* engine, const String& address, int port,
unsigned int connToutMs, int& error, bool& timeout);
/**
* Status names
*/
@ -904,6 +924,13 @@ public:
inline u_int64_t waitMsgReplyInterval() const
{ return m_waitMsgReplyInterval; }
/**
* Retrieve the connect timeout interval
* @return Connect timeout interval in milliseconds
*/
inline unsigned int connectTimeout() const
{ return m_connectToutMs; }
/**
* Initialize engine's parameters
* @param params The engine's parameter list
@ -917,9 +944,7 @@ public:
/**
* Connect a connection, increase its reference counter, add it to the
* list and start negotisting SOCKS
* Note that this method should be called from its own thread, since
* connect() might take some time
* list and start negotisting SOCKS when connected
* @param conn The connection
* @return True on success
*/
@ -1025,6 +1050,20 @@ public:
*/
static void destroySocket(Socket*& sock);
/**
* Retrieve connect timeout from parameters
* @param params Parameter list
* @param defVal Default value to return if missing/invalid
* @return Connect timeout value
*/
static inline unsigned int getConnectTimeout(const NamedList& params,
unsigned int defVal) {
unsigned int val = params.getIntValue(YSTRING("connect_timeout"),defVal,0,120000);
if (!val || val >= 1000)
return val;
return 1000;
}
protected:
/**
* Process a SOCKS request
@ -1063,6 +1102,7 @@ protected:
bool m_exiting;
u_int64_t m_waitMsgAuthInterval;
u_int64_t m_waitMsgReplyInterval;
unsigned int m_connectToutMs; // Connect timeout in milliseconds
bool m_showMsg; // Print message on output
bool m_dumpExtended; // Dump names and binary msg if printed
ObjList m_epDef; // The endpoint definition list
@ -1081,7 +1121,10 @@ public:
// Find a wrapper with a given DST ADDR/PORT
// Return a referenced object if found
YSocksWrapper* findWrapper(bool client, const String& dstAddr, int dstPort);
// Find a wrapper. Return a referenced object if found
YSocksWrapper* findWrapper(const String& wID);
// Find a wrapper with a given connection
// Return a referenced object if found
YSocksWrapper* findWrapper(SOCKSConn* conn);
// Remove a wrapper from list
void removeWrapper(YSocksWrapper* w, bool delObj);
@ -1106,7 +1149,9 @@ class YSocksWrapper : public RefObject, public Mutex, public DebugEnabler
public:
enum State {
Pending,
Connecting,
WaitStart,
Established,
Running,
Terminated
};
@ -1137,8 +1182,14 @@ public:
{ return m_srvAddr; }
inline int srvPort() const
{ return m_srvPort; }
inline unsigned int connectTimeoutInterval() const
{ return m_connectToutMs; }
inline YSocksEngine* engine() const
{ return m_engine; }
// Connect socket if client
bool connect();
void connectTerminated(YSocksConnectThread* th, Socket* sock, int error,
bool timeout);
// Client connection got reply
void connRecvReply();
// Connection error while negotiating the protocol
@ -1150,7 +1201,7 @@ public:
YSocksConsumer* getConsumer();
// Build and start or stop worker thread
bool startWorker();
void stopWorker();
void stopWorker(bool wait);
// Read data from conn and forward it
bool recvData();
// Enable data transfer. Change state, set source/consumer format
@ -1158,7 +1209,7 @@ public:
// Get the wrapper id
virtual const String& toString() const;
// Notify status in chan.notify
void notify(const char* status) const;
void notify(int stat);
protected:
// Release memory
virtual void destroyed();
@ -1182,6 +1233,9 @@ private:
YSocksConsumer* m_consumer;
SOCKSConn* m_conn;
YSocksWrapperWorker* m_thread;
// Client connect
unsigned int m_connectToutMs; // Connect timeout interval
YSocksConnectThread* m_connect; // Connect thread
};
// Worker thread for a wrapper
@ -1244,7 +1298,7 @@ public:
inline YSocksListenerThread(SOCKSEngine* engine, SOCKSEndpointDef* proxy,
unsigned int backlog, Thread::Priority prio = Thread::Normal)
: SOCKSListener(engine,proxy,backlog),
Thread("SOCKS Listener",prio)
Thread("SOCKSListen",prio)
{}
// Add the listener to engine and start it
@ -1267,11 +1321,29 @@ class YSocksProcessThread : public Thread
{
public:
inline YSocksProcessThread(Thread::Priority prio = Thread::Normal)
: Thread("SOCKS Processor",prio)
: Thread("SOCKSProcess",prio)
{}
void run();
};
// A connect thread
class YSocksConnectThread : public Thread
{
public:
YSocksConnectThread(YSocksWrapper* w, Thread::Priority prio = Thread::Normal);
virtual void cleanup()
{ notify(); }
virtual void run();
protected:
void notify(Socket* sock = 0, int error = 0, bool timeout = false);
YSocksEngine* m_engine;
String m_wrapperId;
String m_address;
int m_port;
unsigned int m_toutIntervalMs;
};
// The plugin
class YSocksPlugin : public Module
{
@ -1933,63 +2005,38 @@ bool SOCKSConn::enableDataTransfer()
return true;
}
// Build socket and connect it (outgoing only)
bool SOCKSConn::connect(bool sendAuthMeth)
// Set connecting state (outgoing only)
void SOCKSConn::setConnecting()
{
Lock lock(this);
if (!outgoing() || m_status == Connecting || m_status == Terminated)
return false;
if (m_socket && m_socket->valid())
return true;
if (!outgoing())
return;
if (m_socket)
terminate();
if (!m_epDef)
return false;
changeStatus(Connecting);
String address = m_epDef->address();
int port = m_epDef->port();
m_socket = new Socket;
bool ok = m_socket->create(PF_INET,SOCK_STREAM);
const char* result = "connect";
// Avoid resolving address and connect with mutex locked
lock.drop();
if (ok) {
SocketAddr addr(PF_INET);
addr.host(address);
addr.port(port);
Debug(m_engine,DebugAll,"SOCKSConn(%s) connecting to '%s:%d' [%p]",
m_id.c_str(),addr.host().c_str(),addr.port(),this);
ok = m_socket->connect(addr);
}
// Set socket (outgoing only)
bool SOCKSConn::setSocket(Socket* sock, bool sendAuthMeth)
{
Lock lck(this);
if (!outgoing() || m_status != Connecting) {
SOCKSEngine::destroySocket(sock);
return false;
}
else
result = "create";
Lock lock2(this);
if (m_socket)
terminate();
changeStatus(Idle);
m_socket = sock;
buildId();
if (ok) {
if (!valid())
return false;
Debug(m_engine,DebugAll,"SOCKSConn(%s) connected to '%s:%d' [%p]",
m_id.c_str(),address.c_str(),port,this);
Debug(m_engine,DebugAll,"SOCKSConn(%s)::setSocket(%p) [%p]",m_id.c_str(),m_socket,this);
if (m_socket) {
m_socket->setBlocking(false);
changeStatus(Idle);
if (sendAuthMeth)
sendAuthMethods();
return true;
}
else {
if (valid()) {
String s;
Thread::errorString(s,m_socket->error());
Debug(m_engine,DebugWarn,
"SOCKSConn(%s) failed to %s socket '%s:%d'. %d: '%s' [%p]",
m_id.c_str(),result,address.c_str(),port,
m_socket->error(),s.c_str(),this);
}
terminate();
}
return ok;
terminate();
return false;
}
// Terminate and delete the socket
@ -2133,7 +2180,7 @@ bool SOCKSConn::recv(void* buf, unsigned int& len)
int read = m_socket->readData(buf,len);
if (read != Socket::socketError()) {
#ifdef XDEBUG
if (len) {
if (read) {
String s;
s.hexify(buf,read,' ');
Debug(m_engine,DebugAll,"SOCKSConn(%s) recv %d bytes '%s' [%p]",
@ -2154,6 +2201,61 @@ bool SOCKSConn::recv(void* buf, unsigned int& len)
return false;
}
// Connect a socket
Socket* SOCKSConn::connect(SOCKSEngine* engine, const String& address, int port,
unsigned int connToutMs, int& error, bool& timeout)
{
SocketAddr addr(PF_INET);
addr.host(address);
if (!addr.host()) {
Debug(engine,DebugNote,"Failed to resolve '%s'",address.c_str());
error = Thread::lastError();
return 0;
}
addr.port(port);
String sa;
if (!engine || engine->debugAt(DebugNote)) {
sa << addr.host().c_str() << ":" << addr.port();
if (addr.host() != address)
sa << " (" << address << ")";
}
Debug(engine,DebugAll,"Connecting to '%s'",sa.safe());
Socket* sock = new Socket;
bool ok = false;
error = 0;
timeout = false;
if (sock->create(PF_INET,SOCK_STREAM)) {
if (connToutMs && sock->canSelect() && sock->setBlocking(false))
ok = sock->connectAsync(addr,connToutMs * 1000,&timeout);
else
ok = sock->connect(addr);
if (Thread::check(false)) {
SOCKSEngine::destroySocket(sock);
XDebug(engine,DebugAll,"Connect to %s cancelled",sa.c_str());
return 0;
}
}
if (ok) {
Debug(engine,DebugAll,"Connected to '%s'",sa.safe());
return sock;
}
if (!timeout)
error = sock->error();
SOCKSEngine::destroySocket(sock);
if (!engine || engine->debugAt(DebugNote)) {
String s;
if (timeout)
s = "Timeout";
else {
String tmp;
Thread::errorString(tmp,error);
s << error << " " << tmp;
}
Debug(engine,DebugNote,"Failed to connect to %s: %s",sa.c_str(),s.c_str());
}
return 0;
}
// Changed connection status
bool SOCKSConn::changeStatus(Status stat)
{
@ -2509,6 +2611,7 @@ SOCKSEngine::SOCKSEngine(NamedList& params)
: Mutex(true,"SOCKSEngine"),
m_exiting(false),
m_waitMsgAuthInterval(10000), m_waitMsgReplyInterval(15000),
m_connectToutMs(0),
m_showMsg(false), m_dumpExtended(false)
{
debugName(params.getValue("debugname","socks"));
@ -2518,20 +2621,11 @@ SOCKSEngine::SOCKSEngine(NamedList& params)
// Initialize engine's parameters
void SOCKSEngine::initialize(NamedList& params)
{
m_showMsg = params.getBoolValue("print-msg",false);
m_dumpExtended = params.getBoolValue("print-extended",false);
int tmp = params.getIntValue("auth-timeout",10000);
if (tmp < 3000)
tmp = 3000;
else if (tmp > 30000)
tmp = 30000;
m_waitMsgAuthInterval = tmp;
tmp = params.getIntValue("reply-timeout",30000);
if (tmp < 5000)
tmp = 5000;
else if (tmp > 120000)
tmp = 120000;
m_waitMsgReplyInterval = tmp;
m_showMsg = params.getBoolValue(YSTRING("print-msg"),false);
m_dumpExtended = params.getBoolValue(YSTRING("print-extended"),false);
m_waitMsgAuthInterval = params.getIntValue(YSTRING("auth-timeout"),10000,3000,30000);
m_waitMsgReplyInterval = params.getIntValue(YSTRING("reply-timeout"),30000,5000,120000);
m_connectToutMs = getConnectTimeout(params,10000);
}
// Cleanup the engine. Stop listeners
@ -2546,13 +2640,13 @@ void SOCKSEngine::cleanup()
// list and start negotisting SOCKS
bool SOCKSEngine::addConnection(SOCKSConn* conn)
{
if (!(conn && conn->connect()))
if (!conn)
return false;
if (!conn->ref()) {
conn->terminate();
return false;
}
Lock lock(this);
Lock lck(this);
m_socksConn.append(conn);
Debug(this,DebugAll,"Added outgoing connection (%p,'%s')",
conn,conn->toString().c_str());
@ -2631,7 +2725,6 @@ bool SOCKSEngine::processSocksConnection(SOCKSConn* conn, const Time& now)
bool error = false;
bool timeout = false;
SOCKSPacket* packet = conn->processSocks(now,error,timeout);
const char* reason = 0;
if (packet) {
if (packet->type() == SOCKSPacket::Request) {
SOCKSPacket::Error err = processSOCKSRequest(*packet,conn);
@ -2646,12 +2739,10 @@ bool SOCKSEngine::processSocksConnection(SOCKSConn* conn, const Time& now)
}
else if (!error)
return false;
else
reason = timeout ? "timeout" : "received invalid packet";
if (error) {
lock.drop();
socksConnError(conn,timeout);
removeSocksConn(conn,reason);
removeSocksConn(conn,timeout ? "timeout" : "received invalid packet");
}
return true;
}
@ -2904,6 +2995,19 @@ YSocksWrapper* YSocksEngine::findWrapper(bool client, const String& dstAddr, int
return 0;
}
// Find a wrapper
YSocksWrapper* YSocksEngine::findWrapper(const String& wID)
{
if (!wID)
return false;
Lock lock(this);
ObjList* o = m_wrappers.find(wID);
if (!o)
return 0;
YSocksWrapper* w = static_cast<YSocksWrapper*>(o->get());
return w->ref() ? w : 0;
}
// Find a wrapper with a given connection
YSocksWrapper* YSocksEngine::findWrapper(SOCKSConn* conn)
{
@ -2922,11 +3026,13 @@ void YSocksEngine::removeWrapper(YSocksWrapper* w, bool delObj)
if (!w)
return;
Lock lock(this);
ObjList* o = m_wrappers.find(w);
if (!o)
GenObject* gen = m_wrappers.remove(w,false);
if (!(gen && gen->alive()))
return;
Debug(this,DebugAll,"Removing wrapper (%p,'%s')",w,w->toString().c_str());
o->remove(delObj);
Debug(this,DebugAll,"Removed wrapper (%p,'%s') delObj=%s",
w,w->toString().c_str(),String::boolText(delObj));
if (delObj)
TelEngine::destruct(gen);
}
// Add a wrapper
@ -3017,7 +3123,8 @@ YSocksWrapper::YSocksWrapper(const char* id, YSocksEngine* engine, CallEndpoint*
m_state(Pending), m_client(epDef != 0), m_dir(0), m_autoStart(true),
m_id(id), m_notify(notify), m_callEp(cp), m_dstPort(0), m_srvPort(-1),
m_engine(engine), m_source(0), m_consumer(0), m_conn(0),
m_thread(0)
m_thread(0),
m_connectToutMs(0), m_connect(0)
{
debugName(m_id);
debugChain(&__plugin);
@ -3026,9 +3133,11 @@ YSocksWrapper::YSocksWrapper(const char* id, YSocksEngine* engine, CallEndpoint*
m_dstPort = params.getIntValue("dst_port",0);
m_dir = lookup(params.getValue("direction"),dict_conn_dir,SOCKSConn::Both);
m_autoStart = params.getBoolValue("autostart",false);
if (m_client)
if (m_client) {
m_connectToutMs = SOCKSEngine::getConnectTimeout(params,engine->connectTimeout());
m_conn = new SOCKSConn(engine,epDef,SOCKSPacket::Connect,
SOCKSPacket::Domain,m_dstAddr,m_dstPort);
}
else if (m_engine) {
SOCKSEndpointDef* srv = m_engine->findEpDef("server");
if (!srv) {
@ -3076,12 +3185,87 @@ YSocksWrapper::YSocksWrapper(const char* id, YSocksEngine* engine, CallEndpoint*
// Connect socket if client
bool YSocksWrapper::connect()
{
Lock lck(this);
if (!(m_engine && m_client && m_conn))
return false;
bool ok = m_engine->addConnection(m_conn);
if (!ok)
Debug(this,DebugMild,"Failed to connect [%p]",this);
return ok;
if (m_connect)
m_connect->cancel();
m_connect = new YSocksConnectThread(this);
if (!m_connect->startup()) {
Debug(this,DebugWarn,"Failed to start connect thread [%p]",this);
return false;
}
XDebug(this,DebugAll,"Started connect thread (%p) [%p]",m_connect,this);
m_conn->setConnecting();
u_int64_t tout = 0;
if (m_connectToutMs)
tout = Time::now() + m_connectToutMs * 1000 + 500000;
lck.drop();
// Wait for connect to complete
bool timeout = false;
while (m_connect && !timeout) {
Thread::idle();
if (Thread::check(false))
break;
if (tout)
timeout = tout < Time::now();
}
lck.acquire(this);
if (m_connect) {
m_connect->cancel();
m_connect = 0;
if (!m_conn)
return false;
m_conn->setSocket(0);
if (timeout)
Debug(this,DebugNote,"Connect timed out [%p]",this);
else
XDebug(this,DebugAll,"Worker cancelled while connecting [%p]",this);
return false;
}
if (m_conn && m_conn->valid() && !Thread::check(false))
return m_engine->addConnection(m_conn);
return false;
}
void YSocksWrapper::connectTerminated(YSocksConnectThread* th, Socket* sock, int error,
bool timeout)
{
XDebug(this,DebugAll,"connectTerminated(%p,%p,%d,%d) [%p]",th,sock,error,timeout,this);
if (!(th && m_connect)) {
if (sock)
SOCKSEngine::destroySocket(sock);
return;
}
Lock lck(this);
if (m_connect != th || !m_conn) {
if (sock)
SOCKSEngine::destroySocket(sock);
return;
}
m_connect = 0;
m_conn->setSocket(sock);
if (sock)
return;
if (!debugAt(DebugMild))
return;
String err;
if (timeout)
Debug(this,DebugMild,"Connect to '%s:%d' timeout [%p]",
m_conn->epDef()->address().c_str(),m_conn->epDef()->port(),this);
else {
String s;
if (error) {
String tmp;
Thread::errorString(tmp,error);
s << ": " << error << " " << tmp;
}
String addr;
if (m_conn->epDef())
addr << m_conn->epDef()->address() << ":" << m_conn->epDef()->port();
Debug(this,DebugMild,"Failed to connect to '%s'%s [%p]",
addr.c_str(),s.safe(),this);
}
}
// Client connection got reply
@ -3114,11 +3298,11 @@ void YSocksWrapper::connError(bool timeout)
{
Debug(this,DebugNote,"Connection got error while negotiating timeout=%s [%p]",
String::boolText(timeout),this);
notify(Terminated);
stopWorker(false);
Lock lock(this);
m_state = Terminated;
m_conn->terminate();
// TODO:
Debug(this,DebugStub,"Possible incomplete YSocksWrapper::connError() [%p]",this);
}
// Set connection with valid request for server wrapper
@ -3246,25 +3430,42 @@ bool YSocksWrapper::startWorker()
return true;
lock.drop();
m_thread = new YSocksWrapperWorker(this);
bool ok = m_thread->startup();
if (!ok)
Debug(this,DebugGoOn,"Failed to start worker thread [%p]",this);
return ok;
if (m_thread->startup())
return true;
m_thread = 0;
Debug(this,DebugGoOn,"Failed to start worker thread [%p]",this);
return false;
}
// Build and start worker thread
void YSocksWrapper::stopWorker()
void YSocksWrapper::stopWorker(bool wait)
{
Lock lock(this);
if (!m_thread)
return;
if (m_connect) {
m_connect->cancel();
m_connect = 0;
if (m_conn)
m_conn->setSocket(0);
}
bool hard = (m_conn && m_conn->status() == SOCKSConn::Connecting);
DDebug(this,DebugAll,"Stopping worker thread hard=%s [%p]",
String::boolText(hard),this);
DDebug(this,DebugAll,"Stopping worker thread hard=%s wait=%s [%p]",
String::boolText(hard),String::boolText(wait),this);
m_thread->cancel(hard);
if (hard) {
m_thread = 0;
return;
}
if (!wait)
return;
lock.drop();
while (!hard && m_thread)
Thread::yield(true);
#ifdef XDEBUG
Debugger debug("YSocksWrapper::stopWorker"," %p crt=%p,'%s' [%p]",
m_thread,Thread::current(),Thread::currentName(),this);
#endif
while (m_thread)
Thread::idle(true);
}
// Get the wrapper id
@ -3274,21 +3475,39 @@ const String& YSocksWrapper::toString() const
}
// Notify status in chan.notify
void YSocksWrapper::notify(const char* status) const
void YSocksWrapper::notify(int stat)
{
Lock lck(this);
if (m_state == Terminated)
return;
if (m_notify.null())
return;
XDebug(this,DebugAll,"Notifying %s notifier=%s [%p]",status,m_notify.c_str(),this);
const char* what = 0;
switch (stat) {
case Established:
what = "established";
break;
case Running:
what = "running";
break;
case Terminated:
what = "terminated";
break;
default:
return;
}
XDebug(this,DebugAll,"Notifying %s notifier=%s [%p]",what,m_notify.c_str(),this);
Message* m = new Message("chan.notify");
m->addParam("module",__plugin.name());
m->addParam("id",m_id);
m->addParam("notify",m_notify);
m->addParam("status",status);
m->addParam("status",what);
SocketAddr remote;
if (!client() && m_conn && m_conn->getAddr(false,remote)) {
m->addParam("remoteip",remote.host());
m->addParam("remoteport",String(remote.port()));
}
lck.drop();
Engine::enqueue(m);
}
@ -3301,16 +3520,21 @@ void YSocksWrapper::destroyed()
if (!m_client)
m_engine->removeEpDef(m_id);
}
stopWorker();
stopWorker(true);
lock();
if (m_source && m_source->alive())
TelEngine::destruct(m_source);
if (m_consumer && m_consumer->alive())
TelEngine::destruct(m_consumer);
if (m_engine && m_conn)
m_engine->removeSocksConn(m_conn,"terminated");
SOCKSConn* tmp = m_conn;
TelEngine::destruct(m_conn);
if (m_connect) {
m_connect->cancel();
m_connect = 0;
}
unlock();
if (m_engine && tmp)
m_engine->removeSocksConn(tmp,"terminated");
Debug(this,DebugAll,"Destroyed [%p]",this);
RefObject::destroyed();
}
@ -3336,29 +3560,27 @@ void YSocksWrapperWorker::run()
// NOTE: The SOCKS protocol is negotiated by the engine
bool waitStart = !m_wrapper->autoStart();
while (!invalid() && m_wrapper->state() != YSocksWrapper::Running) {
Thread::msleep(20);
Thread::idle();
if (waitStart && m_wrapper->state() == YSocksWrapper::WaitStart) {
waitStart = false;
m_wrapper->notify("established");
m_wrapper->notify(YSocksWrapper::Established);
}
}
if (invalid())
break;
m_wrapper->notify("running");
m_wrapper->notify(YSocksWrapper::Running);
// Read data
while (!invalid()) {
if (!m_wrapper->canRecv()) {
Thread::idle();
continue;
}
if (m_wrapper->recvData())
Thread::yield();
else
Thread::idle();
m_wrapper->recvData();
Thread::idle();
}
break;
}
m_wrapper->notify("terminated");
m_wrapper->notify(YSocksWrapper::Terminated);
Debug(&__plugin,DebugAll,"Worker terminated for (%p) '%s' [%p]",
m_wrapper,m_wrapper->toString().c_str(),this);
m_wrapper->m_thread = 0;
@ -3386,8 +3608,6 @@ void YSocksSource::destroyed()
{
Debug(m_wrapper,DebugAll,"YSocksSource(%s) destroyed [%p]",
m_wrapper ? m_wrapper->toString().c_str() : "",this);
TelEngine::destruct(m_wrapper);
if (m_wrapper) {
s_srcMutex.lock();
YSocksWrapper* tmp = m_wrapper;
@ -3460,6 +3680,48 @@ void YSocksProcessThread::run()
}
/*
* YSocksConnectThread
*/
YSocksConnectThread::YSocksConnectThread(YSocksWrapper* w, Thread::Priority prio)
: Thread("SOCKSConnect",prio),
m_engine(0), m_port(0), m_toutIntervalMs(0)
{
if (!(w && w->engine()))
return;
m_engine = w->engine();
m_wrapperId = w->toString();
m_toutIntervalMs = w->connectTimeoutInterval();
if (w->conn() && w->conn()->epDef()) {
m_address = w->conn()->epDef()->address();
m_port = w->conn()->epDef()->port();
}
}
void YSocksConnectThread::run()
{
Socket* sock = 0;
int error = 0;
bool tout = false;
if (m_address)
sock = SOCKSConn::connect(static_cast<SOCKSEngine*>(m_engine),m_address,m_port,
m_toutIntervalMs,error,tout);
notify(sock,error,tout);
}
void YSocksConnectThread::notify(Socket* sock, int error, bool timeout)
{
YSocksWrapper* w = m_engine ? m_engine->findWrapper(m_wrapperId) : 0;
m_engine = 0;
if (w) {
w->connectTerminated(this,sock,error,timeout);
TelEngine::destruct(w);
}
else if (sock)
SOCKSEngine::destroySocket(sock);
}
/*
* YSocksPlugin
*/