diff --git a/darkice/trunk/src/DarkIce.h b/darkice/trunk/src/DarkIce.h index 49ae2a5..528037d 100644 --- a/darkice/trunk/src/DarkIce.h +++ b/darkice/trunk/src/DarkIce.h @@ -235,8 +235,8 @@ class DarkIce : public virtual Referable, public virtual Reporter * @return if shouting was successful. * @exception Exception */ - bool - shout ( unsigned int ) throw ( Exception ); + //bool + //shout ( unsigned int ) throw ( Exception ); protected: diff --git a/darkice/trunk/src/MultiThreadedConnector.cpp b/darkice/trunk/src/MultiThreadedConnector.cpp index c426c93..df7fccd 100644 --- a/darkice/trunk/src/MultiThreadedConnector.cpp +++ b/darkice/trunk/src/MultiThreadedConnector.cpp @@ -68,10 +68,11 @@ void MultiThreadedConnector :: init ( bool reconnect ) throw ( Exception ) { this->reconnect = reconnect; - pthread_mutex_init( &mutex_start, 0); - pthread_cond_init( &cond_start, 0); - pthread_mutex_init( &mutex_done, 0); - pthread_cond_init( &cond_done, 0); + pthread_mutex_init(&mutex_number_not_listening_yet, 0); + pthread_mutex_init(&mutex_start, 0); + pthread_cond_init(&cond_start, 0); + pthread_mutex_init(&mutex_done, 0); + pthread_cond_init(&cond_done, 0); threads = 0; } @@ -87,10 +88,11 @@ MultiThreadedConnector :: strip ( void ) throw ( Exception ) threads = 0; } - pthread_cond_destroy( &cond_start); - pthread_mutex_destroy( &mutex_start); - pthread_cond_destroy( &cond_done); - pthread_mutex_destroy( &mutex_done); + pthread_cond_destroy(&cond_done); + pthread_mutex_destroy(&mutex_done); + pthread_cond_destroy(&cond_start); + pthread_mutex_destroy(&mutex_start); + pthread_mutex_destroy(&mutex_number_not_listening_yet); } @@ -104,6 +106,9 @@ MultiThreadedConnector :: MultiThreadedConnector ( { reconnect = connector.reconnect; mutex_start = connector.mutex_start; + mutex_number_not_listening_yet = connector.mutex_number_not_listening_yet; + number_not_listening_yet = connector.number_not_listening_yet; + cond_start = connector.cond_start; mutex_done = connector.mutex_done; cond_done = connector.cond_done; @@ -129,6 +134,8 @@ MultiThreadedConnector :: operator= ( const MultiThreadedConnector & connector ) reconnect = connector.reconnect; mutex_start = connector.mutex_start; + mutex_number_not_listening_yet = connector.mutex_number_not_listening_yet; + number_not_listening_yet = connector.number_not_listening_yet; cond_start = connector.cond_start; mutex_done = connector.mutex_done; cond_done = connector.cond_done; @@ -161,7 +168,7 @@ MultiThreadedConnector :: open ( void ) throw ( Exception ) running = true; - pthread_attr_init( &threadAttr); + pthread_attr_init(&threadAttr); pthread_attr_getstacksize(&threadAttr, &st); if (st < 128 * 1024) { reportEvent( 5, "MultiThreadedConnector :: open, stack size ", @@ -169,8 +176,12 @@ MultiThreadedConnector :: open ( void ) throw ( Exception ) st = 128 * 1024; pthread_attr_setstacksize(&threadAttr, st); } - pthread_attr_setdetachstate( &threadAttr, PTHREAD_CREATE_JOINABLE); - + pthread_attr_setdetachstate(&threadAttr, PTHREAD_CREATE_JOINABLE); + + pthread_mutex_lock(&mutex_number_not_listening_yet); + number_not_listening_yet = numSinks; + pthread_mutex_unlock(&mutex_number_not_listening_yet); + threads = new ThreadData[numSinks]; for ( i = 0; i < numSinks; ++i ) { ThreadData * threadData = threads + i; @@ -207,6 +218,21 @@ MultiThreadedConnector :: open ( void ) throw ( Exception ) return false; } + // we have created all threads, make sure they are waiting for + // command from the producer + while (1) { + pthread_mutex_lock(&mutex_number_not_listening_yet); + if (0 == number_not_listening_yet) { + reportEvent( 6, "MultiThreadedConnector::open() all consumers standing by"); + break; + } else { + pthread_mutex_unlock(&mutex_number_not_listening_yet); + pthread_yield(); // give space to let the consumers running + reportEvent( 6, "MultiThreadedConnector::open() waiting for consumers standing by"); + usleep(10); + } + } + return true; } @@ -240,7 +266,7 @@ MultiThreadedConnector :: transfer ( unsigned long bytes, reportEvent( 6, "MultiThreadedConnector::transfer count:", bytes); byteCounter = 0; // init, no data bytes sent yet - + while (running && (bytes == 0 || byteCounter < bytes)) { if (source->canRead(sec, usec)) { @@ -252,14 +278,15 @@ MultiThreadedConnector :: transfer ( unsigned long bytes, if (dataSize == 0) { reportEvent(3, "MultiThreadedConnector :: transfer, EOF"); break; + } else { + // reportEvent(9, "MultiThreadedConnector::transfer ",dataSize); } - + pthread_mutex_lock(&mutex_start); for (i = 0; i < numSinks; ++i) { - if (threads[i].accepting) - threads[i].isDone = 0; // consumers => RUN + threads[i].isDone = 0; // ALL consumers => RUN } - pthread_cond_broadcast(&cond_start); // kick the waiting consumers to look again + pthread_cond_broadcast(&cond_start); // kick ALL the waiting consumers to look again // wait for all sink threads to get done with this data // we do not spin here, we just wait for an event from the consumers @@ -281,22 +308,28 @@ MultiThreadedConnector :: transfer ( unsigned long bytes, for (i = 0; i < numSinks; ++i) { if (threads[i].accepting) { acceptor_count++; // number of accepting threads - if (threads[i].isDone == 1) + if (threads[i].isDone == 1) stopped_count++; // number of accepting threads which have STOP } } + // if no thread is accepting and reconnect is not set stop the application + if (acceptor_count == 0 && reconnect == false) { + running=false; + break; + } + // break when all accepting threads are done if (acceptor_count == stopped_count) { break; } - // at least one thread has not set the STOP flag yet + // at least one thread has not set the isDone flag yet and is still accepting } pthread_mutex_unlock(&mutex_done); - // at this point all consumers are done with the block + // at this point all consumers are done with the block } else { reportEvent(3,"MultiThreadedConnector :: transfer, can't read"); break; - } + } } delete[] dataBuffer; @@ -312,20 +345,26 @@ void MultiThreadedConnector::sinkThread(int ixSink) { ThreadData * threadData = &threads[ixSink]; - Sink * sink = sinks[ixSink].get( ); - - while ( running ) - { + Sink * sink = sinks[ixSink].get(); + + pthread_mutex_lock( &mutex_start ); // LOCK mutex for cond_start + // we now tell the producer we are listening + pthread_mutex_lock(&mutex_number_not_listening_yet); + number_not_listening_yet--; + pthread_mutex_unlock(&mutex_number_not_listening_yet); + + while (1) { // wait for some data to become available // producer sets isDone==0 when consumer can continue // producer sets isDone==2 or running==0 to request termination - pthread_mutex_lock( &mutex_start ); // LOCK + int rc=0; while ( (rc==0) && running && (threadData->isDone==1) ) { // wait for condition, releases lock rc = pthread_cond_wait( &cond_start, &mutex_start ); // we hold the lock again + // we check flags under protection of the lock } pthread_mutex_unlock( &mutex_start ); // UNLOCK @@ -350,20 +389,27 @@ MultiThreadedConnector::sinkThread(int ixSink) } catch ( Exception & e ) { // something wrong. don't accept more data, try to - // reopen the sink next time around + // reopen the sink NEXT time around, for now just report done threadData->accepting = false; + reportEvent( 4, + "MultiThreadedConnector :: sinkThread can't write X", ixSink ); } } else { reportEvent( 4, - "MultiThreadedConnector :: sinkThread can't write ", - ixSink ); + "MultiThreadedConnector :: sinkThread can't write ", ixSink ); // don't care if we can't write } - } + } - if ( !threadData->accepting ) { + pthread_mutex_lock( &mutex_done ); + threadData->isDone = 1; // producer will check this flag + pthread_cond_signal( &cond_done ); // signal producer + pthread_mutex_unlock( &mutex_done ); + + if ( ! threadData->accepting) { + // not accepting if ( reconnect ) { reportEvent( 4, "MultiThreadedConnector :: sinkThread reconnecting ", @@ -377,21 +423,17 @@ MultiThreadedConnector::sinkThread(int ixSink) threadData->accepting = sink->isOpen( ); } catch ( Exception & e ) { // don't care, just try and try again + reportEvent( 4, + "MultiThreadedConnector::sinkThread Reconnect failed", ixSink ); } - } - else { + } else { // if !reconnect, just stop the connector - // running = false; /* kill the whole application */ - // tell that we used the databuffer, do not wait for us anymore - pthread_mutex_lock( &mutex_done ); - threadData->isDone = 1; // 1==STOP - pthread_mutex_unlock( &mutex_done ); reportEvent( 4, "MultiThreadedConnector :: sinkThread no reconnect? ", - ixSink ); + ixSink); try { - threadData->accepting = false; + threadData->accepting = false; // no more data for us sink->close( ); } catch ( Exception & e ) { diff --git a/darkice/trunk/src/MultiThreadedConnector.h b/darkice/trunk/src/MultiThreadedConnector.h index 1add1c9..00ed04f 100644 --- a/darkice/trunk/src/MultiThreadedConnector.h +++ b/darkice/trunk/src/MultiThreadedConnector.h @@ -152,6 +152,20 @@ class MultiThreadedConnector : public virtual Connector pthread_mutex_t mutex_done; pthread_cond_t cond_done; // consumer sets this + /* mutex on number of consumers not listening yet to the producer + * this is to prevent a race during startup + * The producer should only signal the consumers when it knows + * that all consumers are waiting on the condition var to change + * not before, because a consumer might mis the signal and not start + * which would also mean that it will not finish, thereby blocking + * the producer + */ + + pthread_mutex_t mutex_number_not_listening_yet; + // when this is 0 all consumers are + // ready to take commands from the producer + int number_not_listening_yet; + /** * The thread attributes. */ diff --git a/darkice/trunk/src/TcpSocket.cpp b/darkice/trunk/src/TcpSocket.cpp index f065f36..2812138 100644 --- a/darkice/trunk/src/TcpSocket.cpp +++ b/darkice/trunk/src/TcpSocket.cpp @@ -83,6 +83,7 @@ #ifdef HAVE_SIGNAL_H #include +#include #else #error need signal.h #endif @@ -193,6 +194,7 @@ bool TcpSocket :: open ( void ) throw ( Exception ) { int optval; + struct timeval optval2 = {5L, 0L}; socklen_t optlen; #ifdef HAVE_ADDRINFO struct addrinfo hints @@ -221,10 +223,13 @@ TcpSocket :: open ( void ) throw ( Exception ) memcpy ( addr, ptr->ai_addr, ptr->ai_addrlen); freeaddrinfo(ptr); #else + reportEvent(9, "Gonna do gethostbyname()"); if ( !(pHostEntry = gethostbyname( host)) ) { sockfd = 0; + reportEvent(9, "Fail in gethostbyname()"); throw Exception( __FILE__, __LINE__, "gethostbyname error", errno); } + reportEvent(9, "done gethostbyname()"); memset( &addr, 0, sizeof(addr)); addr.sin_family = AF_INET; @@ -241,9 +246,40 @@ TcpSocket :: open ( void ) throw ( Exception ) optval = 1; optlen = sizeof(optval); if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) == -1) { - reportEvent(5, "can't set TCP socket keep-alive mode", errno); + reportEvent(5, "can't set TCP socket SO_KEEPALIVE mode", errno); + } + // set keep alive to some short value, this is a streaming server + // a long value will not work and lead to delay in reconnection + optval=5; + if (setsockopt(sockfd, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) == -1) { + reportEvent(5, "can't set TCP socket keep-alive TCP_KEEPIDLE value", errno); + } + + optval=2; + if (setsockopt(sockfd, SOL_TCP, TCP_KEEPCNT, &optval, optlen) == -1) { + reportEvent(5, "can't set TCP socket keep-alive TCP_KEEPCNT value", errno); + } + + optval=5; + if (setsockopt(sockfd, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) == -1) { + reportEvent(5, "can't set TCP socket keep-alive TCP_KEEPCNT value", errno); + } + + if (-1 == setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, (const char *) &optval2, sizeof (optval2))) { + reportEvent(5,"could not set socket option SO_SNDTIMEO"); } + if (-1 == setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (const char *) &optval2, sizeof (optval2))) { + reportEvent(5,"could not set socket option SO_RCVTIMEO"); + } + + #ifdef TCP_CORK + // send larger network segments, limit buffer upto 0.2 sec before actual sending + if (-1 == setsockopt(sockfd, IPPROTO_TCP, TCP_CORK, (const char *) &optval, sizeof (optval))) { + reportEvent(5,"could not set socket option TCP_CORK"); + } + #endif + // connect if ( connect( sockfd, (struct sockaddr*)&addr, sizeof(addr)) == -1 ) { ::close( sockfd); @@ -329,7 +365,7 @@ TcpSocket :: read ( void * buf, /*------------------------------------------------------------------------------ - * Check wether read() would return anything + * Check if write() would block *----------------------------------------------------------------------------*/ bool TcpSocket :: canWrite ( unsigned int sec, @@ -370,7 +406,7 @@ TcpSocket :: canWrite ( unsigned int sec, * Write to the socket *----------------------------------------------------------------------------*/ unsigned int -TcpSocket :: write ( const void * buf, +TcpSocket :: write ( const void * buf, unsigned int len ) throw ( Exception ) { int ret; @@ -378,27 +414,42 @@ TcpSocket :: write ( const void * buf, if ( !isOpen() ) { return 0; } - -#ifdef HAVE_MSG_NOSIGNAL - ret = send( sockfd, buf, len, MSG_NOSIGNAL); -#else - ret = send( sockfd, buf, len, 0); -#endif - - if ( ret == -1 ) { - if ( errno == EAGAIN ) { - ret = 0; + // let us try to write stuff to this socket + // we can not take forever to do it, so the open() call set up + // a send timeout, of 5 seconds + // we give it 2 retries and then give up, the stream has + // been blocked for 10+ seconds and we need to take action anyway + unsigned int bytesleft = len; + int retries = 2; + errno = 0; + while (bytesleft && (retries)) { + reportEvent(9,"before write\n", retries); + #ifdef HAVE_MSG_NOSIGNAL + ret = send( sockfd, buf, bytesleft, MSG_NOSIGNAL); // no SIGPIPE + #else + ret = send( sockfd, buf, bytesleft, 0); + #endif + if ((ret < 0) && ( errno == EAGAIN )) { + // problem happened, but try again + // try again + retries--; } else { - ::close( sockfd); - sockfd = 0; - throw Exception( __FILE__, __LINE__, "send error", errno); + // some data was written + bytesleft -= ret; // we + buf = (char*)buf + ret; // move pointer to unsent portion } + reportEvent(9,"after write\n",ret); + } + if (bytesleft) { + // data not send after this time means serious problem + ::close(sockfd); + sockfd = 0; + throw Exception( __FILE__, __LINE__, "send error", errno); + } else { + return len; // all bytes sent } - - return ret; } - /*------------------------------------------------------------------------------ * Close the socket *----------------------------------------------------------------------------*/ diff --git a/darkice/trunk/src/TcpSocket.h b/darkice/trunk/src/TcpSocket.h index 4a43971..24690f8 100644 --- a/darkice/trunk/src/TcpSocket.h +++ b/darkice/trunk/src/TcpSocket.h @@ -245,7 +245,7 @@ class TcpSocket : public Source, public Sink, public virtual Reporter * @exception Exception */ virtual unsigned int - write ( const void * buf, + write ( const void * buf, unsigned int len ) throw ( Exception ); /**