Fix hangs on network; reconnect now works; timeout for network traffic added; extra TCP socket options set. Most likely fixes issues #84, #81 and #82.

This commit is contained in:
oetelaar.automatisering@gmail.com 2013-05-16 22:27:07 +00:00
parent 8bee03c4ab
commit 96a104d15e
5 changed files with 168 additions and 61 deletions

View File

@ -235,8 +235,8 @@ class DarkIce : public virtual Referable, public virtual Reporter
* @return if shouting was successful. * @return if shouting was successful.
* @exception Exception * @exception Exception
*/ */
bool //bool
shout ( unsigned int ) throw ( Exception ); //shout ( unsigned int ) throw ( Exception );
protected: protected:

View File

@ -68,10 +68,11 @@ void
MultiThreadedConnector :: init ( bool reconnect ) throw ( Exception ) MultiThreadedConnector :: init ( bool reconnect ) throw ( Exception )
{ {
this->reconnect = reconnect; this->reconnect = reconnect;
pthread_mutex_init( &mutex_start, 0); pthread_mutex_init(&mutex_number_not_listening_yet, 0);
pthread_cond_init( &cond_start, 0); pthread_mutex_init(&mutex_start, 0);
pthread_mutex_init( &mutex_done, 0); pthread_cond_init(&cond_start, 0);
pthread_cond_init( &cond_done, 0); pthread_mutex_init(&mutex_done, 0);
pthread_cond_init(&cond_done, 0);
threads = 0; threads = 0;
} }
@ -87,10 +88,11 @@ MultiThreadedConnector :: strip ( void ) throw ( Exception )
threads = 0; threads = 0;
} }
pthread_cond_destroy( &cond_start); pthread_cond_destroy(&cond_done);
pthread_mutex_destroy( &mutex_start); pthread_mutex_destroy(&mutex_done);
pthread_cond_destroy( &cond_done); pthread_cond_destroy(&cond_start);
pthread_mutex_destroy( &mutex_done); pthread_mutex_destroy(&mutex_start);
pthread_mutex_destroy(&mutex_number_not_listening_yet);
} }
@ -104,6 +106,9 @@ MultiThreadedConnector :: MultiThreadedConnector (
{ {
reconnect = connector.reconnect; reconnect = connector.reconnect;
mutex_start = connector.mutex_start; mutex_start = connector.mutex_start;
mutex_number_not_listening_yet = connector.mutex_number_not_listening_yet;
number_not_listening_yet = connector.number_not_listening_yet;
cond_start = connector.cond_start; cond_start = connector.cond_start;
mutex_done = connector.mutex_done; mutex_done = connector.mutex_done;
cond_done = connector.cond_done; cond_done = connector.cond_done;
@ -129,6 +134,8 @@ MultiThreadedConnector :: operator= ( const MultiThreadedConnector & connector )
reconnect = connector.reconnect; reconnect = connector.reconnect;
mutex_start = connector.mutex_start; mutex_start = connector.mutex_start;
mutex_number_not_listening_yet = connector.mutex_number_not_listening_yet;
number_not_listening_yet = connector.number_not_listening_yet;
cond_start = connector.cond_start; cond_start = connector.cond_start;
mutex_done = connector.mutex_done; mutex_done = connector.mutex_done;
cond_done = connector.cond_done; cond_done = connector.cond_done;
@ -161,7 +168,7 @@ MultiThreadedConnector :: open ( void ) throw ( Exception )
running = true; running = true;
pthread_attr_init( &threadAttr); pthread_attr_init(&threadAttr);
pthread_attr_getstacksize(&threadAttr, &st); pthread_attr_getstacksize(&threadAttr, &st);
if (st < 128 * 1024) { if (st < 128 * 1024) {
reportEvent( 5, "MultiThreadedConnector :: open, stack size ", reportEvent( 5, "MultiThreadedConnector :: open, stack size ",
@ -169,7 +176,11 @@ MultiThreadedConnector :: open ( void ) throw ( Exception )
st = 128 * 1024; st = 128 * 1024;
pthread_attr_setstacksize(&threadAttr, st); pthread_attr_setstacksize(&threadAttr, st);
} }
pthread_attr_setdetachstate( &threadAttr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&threadAttr, PTHREAD_CREATE_JOINABLE);
pthread_mutex_lock(&mutex_number_not_listening_yet);
number_not_listening_yet = numSinks;
pthread_mutex_unlock(&mutex_number_not_listening_yet);
threads = new ThreadData[numSinks]; threads = new ThreadData[numSinks];
for ( i = 0; i < numSinks; ++i ) { for ( i = 0; i < numSinks; ++i ) {
@ -207,6 +218,21 @@ MultiThreadedConnector :: open ( void ) throw ( Exception )
return false; return false;
} }
// we have created all threads, make sure they are waiting for
// command from the producer
while (1) {
pthread_mutex_lock(&mutex_number_not_listening_yet);
if (0 == number_not_listening_yet) {
reportEvent( 6, "MultiThreadedConnector::open() all consumers standing by");
break;
} else {
pthread_mutex_unlock(&mutex_number_not_listening_yet);
pthread_yield(); // give space to let the consumers running
reportEvent( 6, "MultiThreadedConnector::open() waiting for consumers standing by");
usleep(10);
}
}
return true; return true;
} }
@ -252,14 +278,15 @@ MultiThreadedConnector :: transfer ( unsigned long bytes,
if (dataSize == 0) { if (dataSize == 0) {
reportEvent(3, "MultiThreadedConnector :: transfer, EOF"); reportEvent(3, "MultiThreadedConnector :: transfer, EOF");
break; break;
} else {
// reportEvent(9, "MultiThreadedConnector::transfer ",dataSize);
} }
pthread_mutex_lock(&mutex_start); pthread_mutex_lock(&mutex_start);
for (i = 0; i < numSinks; ++i) { for (i = 0; i < numSinks; ++i) {
if (threads[i].accepting) threads[i].isDone = 0; // ALL consumers => RUN
threads[i].isDone = 0; // consumers => RUN
} }
pthread_cond_broadcast(&cond_start); // kick the waiting consumers to look again pthread_cond_broadcast(&cond_start); // kick ALL the waiting consumers to look again
// wait for all sink threads to get done with this data // wait for all sink threads to get done with this data
// we do not spin here, we just wait for an event from the consumers // we do not spin here, we just wait for an event from the consumers
@ -281,22 +308,28 @@ MultiThreadedConnector :: transfer ( unsigned long bytes,
for (i = 0; i < numSinks; ++i) { for (i = 0; i < numSinks; ++i) {
if (threads[i].accepting) { if (threads[i].accepting) {
acceptor_count++; // number of accepting threads acceptor_count++; // number of accepting threads
if (threads[i].isDone == 1) if (threads[i].isDone == 1)
stopped_count++; // number of accepting threads which have STOP stopped_count++; // number of accepting threads which have STOP
} }
} }
// if no thread is accepting and reconnect is not set stop the application
if (acceptor_count == 0 && reconnect == false) {
running=false;
break;
}
// break when all accepting threads are done // break when all accepting threads are done
if (acceptor_count == stopped_count) { if (acceptor_count == stopped_count) {
break; break;
} }
// at least one thread has not set the STOP flag yet // at least one thread has not set the isDone flag yet and is still accepting
} }
pthread_mutex_unlock(&mutex_done); pthread_mutex_unlock(&mutex_done);
// at this point all consumers are done with the block // at this point all consumers are done with the block
} else { } else {
reportEvent(3,"MultiThreadedConnector :: transfer, can't read"); reportEvent(3,"MultiThreadedConnector :: transfer, can't read");
break; break;
} }
} }
delete[] dataBuffer; delete[] dataBuffer;
@ -312,20 +345,26 @@ void
MultiThreadedConnector::sinkThread(int ixSink) MultiThreadedConnector::sinkThread(int ixSink)
{ {
ThreadData * threadData = &threads[ixSink]; ThreadData * threadData = &threads[ixSink];
Sink * sink = sinks[ixSink].get( ); Sink * sink = sinks[ixSink].get();
while ( running ) pthread_mutex_lock( &mutex_start ); // LOCK mutex for cond_start
{ // we now tell the producer we are listening
pthread_mutex_lock(&mutex_number_not_listening_yet);
number_not_listening_yet--;
pthread_mutex_unlock(&mutex_number_not_listening_yet);
while (1) {
// wait for some data to become available // wait for some data to become available
// producer sets isDone==0 when consumer can continue // producer sets isDone==0 when consumer can continue
// producer sets isDone==2 or running==0 to request termination // producer sets isDone==2 or running==0 to request termination
pthread_mutex_lock( &mutex_start ); // LOCK
int rc=0; int rc=0;
while ( (rc==0) && running && (threadData->isDone==1) ) while ( (rc==0) && running && (threadData->isDone==1) )
{ {
// wait for condition, releases lock // wait for condition, releases lock
rc = pthread_cond_wait( &cond_start, &mutex_start ); rc = pthread_cond_wait( &cond_start, &mutex_start );
// we hold the lock again // we hold the lock again
// we check flags under protection of the lock
} }
pthread_mutex_unlock( &mutex_start ); // UNLOCK pthread_mutex_unlock( &mutex_start ); // UNLOCK
@ -350,20 +389,27 @@ MultiThreadedConnector::sinkThread(int ixSink)
} catch ( Exception & e ) } catch ( Exception & e )
{ {
// something wrong. don't accept more data, try to // something wrong. don't accept more data, try to
// reopen the sink next time around // reopen the sink NEXT time around, for now just report done
threadData->accepting = false; threadData->accepting = false;
reportEvent( 4,
"MultiThreadedConnector :: sinkThread can't write X", ixSink );
} }
} }
else else
{ {
reportEvent( 4, reportEvent( 4,
"MultiThreadedConnector :: sinkThread can't write ", "MultiThreadedConnector :: sinkThread can't write ", ixSink );
ixSink );
// don't care if we can't write // don't care if we can't write
} }
} }
if ( !threadData->accepting ) { pthread_mutex_lock( &mutex_done );
threadData->isDone = 1; // producer will check this flag
pthread_cond_signal( &cond_done ); // signal producer
pthread_mutex_unlock( &mutex_done );
if ( ! threadData->accepting) {
// not accepting
if ( reconnect ) { if ( reconnect ) {
reportEvent( 4, reportEvent( 4,
"MultiThreadedConnector :: sinkThread reconnecting ", "MultiThreadedConnector :: sinkThread reconnecting ",
@ -377,21 +423,17 @@ MultiThreadedConnector::sinkThread(int ixSink)
threadData->accepting = sink->isOpen( ); threadData->accepting = sink->isOpen( );
} catch ( Exception & e ) { } catch ( Exception & e ) {
// don't care, just try and try again // don't care, just try and try again
reportEvent( 4,
"MultiThreadedConnector::sinkThread Reconnect failed", ixSink );
} }
} } else {
else {
// if !reconnect, just stop the connector // if !reconnect, just stop the connector
// running = false; /* kill the whole application */
// tell that we used the databuffer, do not wait for us anymore
pthread_mutex_lock( &mutex_done );
threadData->isDone = 1; // 1==STOP
pthread_mutex_unlock( &mutex_done );
reportEvent( 4, reportEvent( 4,
"MultiThreadedConnector :: sinkThread no reconnect? ", "MultiThreadedConnector :: sinkThread no reconnect? ",
ixSink ); ixSink);
try try
{ {
threadData->accepting = false; threadData->accepting = false; // no more data for us
sink->close( ); sink->close( );
} catch ( Exception & e ) } catch ( Exception & e )
{ {

View File

@ -152,6 +152,20 @@ class MultiThreadedConnector : public virtual Connector
pthread_mutex_t mutex_done; pthread_mutex_t mutex_done;
pthread_cond_t cond_done; // consumer sets this pthread_cond_t cond_done; // consumer sets this
/* Mutex guarding the count of consumers that are not yet listening
* to the producer. It prevents a race during startup:
* the producer must only signal the consumers once it knows that
* all of them are waiting on the condition variable, and not before.
* Otherwise a consumer might miss the signal and never start,
* which also means it would never finish, thereby blocking
* the producer.
*/
pthread_mutex_t mutex_number_not_listening_yet;
// when this is 0 all consumers are
// ready to take commands from the producer
int number_not_listening_yet;
/** /**
* The thread attributes. * The thread attributes.
*/ */

View File

@ -83,6 +83,7 @@
#ifdef HAVE_SIGNAL_H #ifdef HAVE_SIGNAL_H
#include <signal.h> #include <signal.h>
#include <netinet/tcp.h>
#else #else
#error need signal.h #error need signal.h
#endif #endif
@ -193,6 +194,7 @@ bool
TcpSocket :: open ( void ) throw ( Exception ) TcpSocket :: open ( void ) throw ( Exception )
{ {
int optval; int optval;
struct timeval optval2 = {5L, 0L};
socklen_t optlen; socklen_t optlen;
#ifdef HAVE_ADDRINFO #ifdef HAVE_ADDRINFO
struct addrinfo hints struct addrinfo hints
@ -221,10 +223,13 @@ TcpSocket :: open ( void ) throw ( Exception )
memcpy ( addr, ptr->ai_addr, ptr->ai_addrlen); memcpy ( addr, ptr->ai_addr, ptr->ai_addrlen);
freeaddrinfo(ptr); freeaddrinfo(ptr);
#else #else
reportEvent(9, "Gonna do gethostbyname()");
if ( !(pHostEntry = gethostbyname( host)) ) { if ( !(pHostEntry = gethostbyname( host)) ) {
sockfd = 0; sockfd = 0;
reportEvent(9, "Fail in gethostbyname()");
throw Exception( __FILE__, __LINE__, "gethostbyname error", errno); throw Exception( __FILE__, __LINE__, "gethostbyname error", errno);
} }
reportEvent(9, "done gethostbyname()");
memset( &addr, 0, sizeof(addr)); memset( &addr, 0, sizeof(addr));
addr.sin_family = AF_INET; addr.sin_family = AF_INET;
@ -241,8 +246,39 @@ TcpSocket :: open ( void ) throw ( Exception )
optval = 1; optval = 1;
optlen = sizeof(optval); optlen = sizeof(optval);
if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) == -1) { if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) == -1) {
reportEvent(5, "can't set TCP socket keep-alive mode", errno); reportEvent(5, "can't set TCP socket SO_KEEPALIVE mode", errno);
} }
// set keep alive to some short value, this is a streaming server
// a long value will not work and lead to delay in reconnection
optval=5;
if (setsockopt(sockfd, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) == -1) {
reportEvent(5, "can't set TCP socket keep-alive TCP_KEEPIDLE value", errno);
}
optval=2;
if (setsockopt(sockfd, SOL_TCP, TCP_KEEPCNT, &optval, optlen) == -1) {
reportEvent(5, "can't set TCP socket keep-alive TCP_KEEPCNT value", errno);
}
optval=5;
if (setsockopt(sockfd, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) == -1) {
reportEvent(5, "can't set TCP socket keep-alive TCP_KEEPCNT value", errno);
}
if (-1 == setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, (const char *) &optval2, sizeof (optval2))) {
reportEvent(5,"could not set socket option SO_SNDTIMEO");
}
if (-1 == setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (const char *) &optval2, sizeof (optval2))) {
reportEvent(5,"could not set socket option SO_RCVTIMEO");
}
#ifdef TCP_CORK
// send larger network segments; buffer data for up to 0.2 sec before actually sending
if (-1 == setsockopt(sockfd, IPPROTO_TCP, TCP_CORK, (const char *) &optval, sizeof (optval))) {
reportEvent(5,"could not set socket option TCP_CORK");
}
#endif
// connect // connect
if ( connect( sockfd, (struct sockaddr*)&addr, sizeof(addr)) == -1 ) { if ( connect( sockfd, (struct sockaddr*)&addr, sizeof(addr)) == -1 ) {
@ -329,7 +365,7 @@ TcpSocket :: read ( void * buf,
/*------------------------------------------------------------------------------ /*------------------------------------------------------------------------------
* Check wether read() would return anything * Check if write() would block
*----------------------------------------------------------------------------*/ *----------------------------------------------------------------------------*/
bool bool
TcpSocket :: canWrite ( unsigned int sec, TcpSocket :: canWrite ( unsigned int sec,
@ -370,7 +406,7 @@ TcpSocket :: canWrite ( unsigned int sec,
* Write to the socket * Write to the socket
*----------------------------------------------------------------------------*/ *----------------------------------------------------------------------------*/
unsigned int unsigned int
TcpSocket :: write ( const void * buf, TcpSocket :: write ( const void * buf,
unsigned int len ) throw ( Exception ) unsigned int len ) throw ( Exception )
{ {
int ret; int ret;
@ -378,27 +414,42 @@ TcpSocket :: write ( const void * buf,
if ( !isOpen() ) { if ( !isOpen() ) {
return 0; return 0;
} }
// let us try to write stuff to this socket
#ifdef HAVE_MSG_NOSIGNAL // we can not take forever to do it, so the open() call set up
ret = send( sockfd, buf, len, MSG_NOSIGNAL); // a send timeout, of 5 seconds
#else // we give it 2 retries and then give up, the stream has
ret = send( sockfd, buf, len, 0); // been blocked for 10+ seconds and we need to take action anyway
#endif unsigned int bytesleft = len;
int retries = 2;
if ( ret == -1 ) { errno = 0;
if ( errno == EAGAIN ) { while (bytesleft && (retries)) {
ret = 0; reportEvent(9,"before write\n", retries);
#ifdef HAVE_MSG_NOSIGNAL
ret = send( sockfd, buf, bytesleft, MSG_NOSIGNAL); // no SIGPIPE
#else
ret = send( sockfd, buf, bytesleft, 0);
#endif
if ((ret < 0) && ( errno == EAGAIN )) {
// problem happened, but try again
// try again
retries--;
} else { } else {
::close( sockfd); // some data was written
sockfd = 0; bytesleft -= ret; // we
throw Exception( __FILE__, __LINE__, "send error", errno); buf = (char*)buf + ret; // move pointer to unsent portion
} }
reportEvent(9,"after write\n",ret);
}
if (bytesleft) {
// data still not sent after this time means a serious problem
::close(sockfd);
sockfd = 0;
throw Exception( __FILE__, __LINE__, "send error", errno);
} else {
return len; // all bytes sent
} }
return ret;
} }
/*------------------------------------------------------------------------------ /*------------------------------------------------------------------------------
* Close the socket * Close the socket
*----------------------------------------------------------------------------*/ *----------------------------------------------------------------------------*/

View File

@ -245,7 +245,7 @@ class TcpSocket : public Source, public Sink, public virtual Reporter
* @exception Exception * @exception Exception
*/ */
virtual unsigned int virtual unsigned int
write ( const void * buf, write ( const void * buf,
unsigned int len ) throw ( Exception ); unsigned int len ) throw ( Exception );
/** /**