diff --git a/darkice/trunk/src/Connector.cpp b/darkice/trunk/src/Connector.cpp index f2b9139..7a34bc7 100644 --- a/darkice/trunk/src/Connector.cpp +++ b/darkice/trunk/src/Connector.cpp @@ -252,7 +252,7 @@ Connector :: open ( void ) throw ( Exception ) /*------------------------------------------------------------------------------ * Transfer some data from the source to the sink *----------------------------------------------------------------------------*/ -unsigned int +unsigned long Connector :: transfer ( unsigned long bytes, unsigned int bufSize, unsigned int sec, diff --git a/darkice/trunk/src/Connector.h b/darkice/trunk/src/Connector.h index 5183a38..2b77427 100644 --- a/darkice/trunk/src/Connector.h +++ b/darkice/trunk/src/Connector.h @@ -228,7 +228,7 @@ class Connector : public virtual Referable, public virtual Reporter * @return the number of bytes read from the Source. * @exception Exception */ - virtual unsigned int + virtual unsigned long transfer ( unsigned long bytes, unsigned int bufSize, unsigned int sec, diff --git a/darkice/trunk/src/MultiThreadedConnector.cpp b/darkice/trunk/src/MultiThreadedConnector.cpp index a177b89..df7fccd 100644 --- a/darkice/trunk/src/MultiThreadedConnector.cpp +++ b/darkice/trunk/src/MultiThreadedConnector.cpp @@ -68,9 +68,11 @@ void MultiThreadedConnector :: init ( bool reconnect ) throw ( Exception ) { this->reconnect = reconnect; - - pthread_mutex_init( &mutexProduce, 0); - pthread_cond_init( &condProduce, 0); + pthread_mutex_init(&mutex_number_not_listening_yet, 0); + pthread_mutex_init(&mutex_start, 0); + pthread_cond_init(&cond_start, 0); + pthread_mutex_init(&mutex_done, 0); + pthread_cond_init(&cond_done, 0); threads = 0; } @@ -86,8 +88,11 @@ MultiThreadedConnector :: strip ( void ) throw ( Exception ) threads = 0; } - pthread_cond_destroy( &condProduce); - pthread_mutex_destroy( &mutexProduce); + pthread_cond_destroy(&cond_done); + pthread_mutex_destroy(&mutex_done); + pthread_cond_destroy(&cond_start); + pthread_mutex_destroy(&mutex_start); + pthread_mutex_destroy(&mutex_number_not_listening_yet); } @@ -99,9 +104,14 @@ MultiThreadedConnector :: MultiThreadedConnector ( throw ( Exception ) : Connector( connector) { - reconnect = connector.reconnect; - mutexProduce = connector.mutexProduce; - condProduce = connector.condProduce; + reconnect = connector.reconnect; + mutex_start = connector.mutex_start; + mutex_number_not_listening_yet = connector.mutex_number_not_listening_yet; + number_not_listening_yet = connector.number_not_listening_yet; + + cond_start = connector.cond_start; + mutex_done = connector.mutex_done; + cond_done = connector.cond_done; if ( threads ) { delete[] threads; @@ -112,7 +122,6 @@ MultiThreadedConnector :: MultiThreadedConnector ( } } - /*------------------------------------------------------------------------------ * Assignment operator *----------------------------------------------------------------------------*/ @@ -123,9 +132,13 @@ MultiThreadedConnector :: operator= ( const MultiThreadedConnector & connector ) if ( this != &connector ) { Connector::operator=( connector); - reconnect = connector.reconnect; - mutexProduce = connector.mutexProduce; - condProduce = connector.condProduce; + reconnect = connector.reconnect; + mutex_start = connector.mutex_start; + mutex_number_not_listening_yet = connector.mutex_number_not_listening_yet; + number_not_listening_yet = connector.number_not_listening_yet; + cond_start = connector.cond_start; + mutex_done = connector.mutex_done; + cond_done = connector.cond_done; if ( threads ) { delete[] threads; @@ -139,7 +152,6 @@ MultiThreadedConnector :: operator= ( const MultiThreadedConnector & connector ) return *this; } - /*------------------------------------------------------------------------------ * Open the source and all the sinks if needed * Create the sink threads @@ -156,7 +168,7 @@ MultiThreadedConnector :: open ( void ) throw ( Exception ) running = true; - pthread_attr_init( &threadAttr); + pthread_attr_init(&threadAttr); pthread_attr_getstacksize(&threadAttr, &st); if (st < 128 * 1024) { reportEvent( 5, "MultiThreadedConnector :: open, stack size ", @@ -164,8 +176,12 @@ MultiThreadedConnector :: open ( void ) throw ( Exception ) st = 128 * 1024; pthread_attr_setstacksize(&threadAttr, st); } - pthread_attr_setdetachstate( &threadAttr, PTHREAD_CREATE_JOINABLE); - + pthread_attr_setdetachstate(&threadAttr, PTHREAD_CREATE_JOINABLE); + + pthread_mutex_lock(&mutex_number_not_listening_yet); + number_not_listening_yet = numSinks; + pthread_mutex_unlock(&mutex_number_not_listening_yet); + threads = new ThreadData[numSinks]; for ( i = 0; i < numSinks; ++i ) { ThreadData * threadData = threads + i; @@ -173,7 +189,7 @@ MultiThreadedConnector :: open ( void ) throw ( Exception ) threadData->connector = this; threadData->ixSink = i; threadData->accepting = true; - threadData->isDone = true; + threadData->isDone = 1; // 1==STOP, activate thread in transfer() if ( pthread_create( &(threadData->thread), &threadAttr, ThreadData::threadFunction, @@ -187,10 +203,10 @@ MultiThreadedConnector :: open ( void ) throw ( Exception ) unsigned int j; // signal to stop for all running threads - pthread_mutex_lock( &mutexProduce); + pthread_mutex_lock( &mutex_start); running = false; - pthread_cond_broadcast( &condProduce); - pthread_mutex_unlock( &mutexProduce); + pthread_cond_broadcast( &cond_start); + pthread_mutex_unlock( &mutex_start); for ( j = 0; j < i; ++j ) { pthread_join( threads[j].thread, 0); @@ -202,6 +218,21 @@ MultiThreadedConnector :: open ( void ) throw ( Exception ) return false; } + // we have created all threads, make sure they are waiting for + // command from the producer + while (1) { + pthread_mutex_lock(&mutex_number_not_listening_yet); + if (0 == number_not_listening_yet) { + reportEvent( 6, "MultiThreadedConnector::open() all consumers standing by"); + break; + } else { + pthread_mutex_unlock(&mutex_number_not_listening_yet); + pthread_yield(); // give space to let the consumers running + reportEvent( 6, "MultiThreadedConnector::open() waiting for consumers standing by"); + usleep(10); + } + } + return true; } @@ -209,14 +240,14 @@ MultiThreadedConnector :: open ( void ) throw ( Exception ) /*------------------------------------------------------------------------------ * Transfer some data from the source to the sink *----------------------------------------------------------------------------*/ -unsigned int +unsigned long MultiThreadedConnector :: transfer ( unsigned long bytes, unsigned int bufSize, unsigned int sec, unsigned int usec ) throw ( Exception ) { - unsigned int b; + unsigned long byteCounter; // when we reach byteCounter thread will end if ( numSinks == 0 ) { return 0; @@ -229,47 +260,80 @@ MultiThreadedConnector :: transfer ( unsigned long bytes, dataBuffer = new unsigned char[bufSize]; dataSize = 0; - reportEvent( 6, "MultiThreadedConnector :: tranfer, bytes", bytes); - - for ( b = 0; !bytes || b < bytes; ) { - if ( source->canRead( sec, usec) ) { - unsigned int i; - - pthread_mutex_lock( &mutexProduce); - dataSize = source->read( dataBuffer, bufSize); - b += dataSize; + /* if bytes==0 transfer until end of program, + * if bytes>0 transfer upto number of bytes + */ + reportEvent( 6, "MultiThreadedConnector::transfer count:", bytes); + byteCounter = 0; // init, no data bytes sent yet + + + while (running && (bytes == 0 || byteCounter < bytes)) { + if (source->canRead(sec, usec)) { + unsigned int i; + dataSize = source->read(dataBuffer, bufSize); + byteCounter += dataSize; + // check for EOF - if ( dataSize == 0 ) { - reportEvent( 3, "MultiThreadedConnector :: transfer, EOF"); - pthread_mutex_unlock( &mutexProduce); + if (dataSize == 0) { + reportEvent(3, "MultiThreadedConnector :: transfer, EOF"); break; + } else { + // reportEvent(9, "MultiThreadedConnector::transfer ",dataSize); } - - for ( i = 0; i < numSinks; ++i ) { - threads[i].isDone = false; + + pthread_mutex_lock(&mutex_start); + for (i = 0; i < numSinks; ++i) { + threads[i].isDone = 0; // ALL consumers => RUN } - - // tell sink threads that there is some data available - pthread_cond_broadcast( &condProduce); + pthread_cond_broadcast(&cond_start); // kick ALL the waiting consumers to look again // wait for all sink threads to get done with this data - while ( true ) { - for ( i = 0; i < numSinks && threads[i].isDone; ++i ); - if ( i == numSinks ) { + // we do not spin here, we just wait for an event from the consumers + pthread_mutex_lock(&mutex_done); + pthread_mutex_unlock(&mutex_start); // UNLOCK, release the consumers' cond variable + + while ( 1 ) { + int rc = 0; + // wait for condition : releases mutex so other thread can change condition + rc = pthread_cond_wait(&cond_done, &mutex_done); + // mutex is locked again + if (rc != 0) { + reportEvent(1, "MultiThreadedConnector pthread_cond_wait() fail"); break; } - pthread_cond_wait( &condProduce, &mutexProduce); + + int acceptor_count=0; + int stopped_count=0; + for (i = 0; i < numSinks; ++i) { + if (threads[i].accepting) { + acceptor_count++; // number of accepting threads + if (threads[i].isDone == 1) + stopped_count++; // number of accepting threads which have STOP + } + } + // if no thread is accepting and reconnect is not set stop the application + if (acceptor_count == 0 && reconnect == false) { + running=false; + break; + } + + // break when all accepting threads are done + if (acceptor_count == stopped_count) { + break; + } + // at least one thread has not set the isDone flag yet and is still accepting } - pthread_mutex_unlock( &mutexProduce); + pthread_mutex_unlock(&mutex_done); + // at this point all consumers are done with the block } else { - reportEvent( 3, "MultiThreadedConnector :: transfer, can't read"); + reportEvent(3,"MultiThreadedConnector :: transfer, can't read"); break; - } + } } delete[] dataBuffer; - return b; + return byteCounter; } @@ -278,70 +342,123 @@ MultiThreadedConnector :: transfer ( unsigned long bytes, * Read the presented data *----------------------------------------------------------------------------*/ void -MultiThreadedConnector :: sinkThread( int ixSink ) +MultiThreadedConnector::sinkThread(int ixSink) { - ThreadData * threadData = &threads[ixSink]; - Sink * sink = sinks[ixSink].get(); - - while ( running ) { + ThreadData * threadData = &threads[ixSink]; + Sink * sink = sinks[ixSink].get(); + + pthread_mutex_lock( &mutex_start ); // LOCK mutex for cond_start + // we now tell the producer we are listening + pthread_mutex_lock(&mutex_number_not_listening_yet); + number_not_listening_yet--; + pthread_mutex_unlock(&mutex_number_not_listening_yet); + + while (1) { // wait for some data to become available - pthread_mutex_lock( &mutexProduce); - while ( running && threadData->isDone ) { - pthread_cond_wait( &condProduce, &mutexProduce); + // producer sets isDone==0 when consumer can continue + // producer sets isDone==2 or running==0 to request termination + + int rc=0; + while ( (rc==0) && running && (threadData->isDone==1) ) + { + // wait for condition, releases lock + rc = pthread_cond_wait( &cond_start, &mutex_start ); + // we hold the lock again + // we check flags under protection of the lock } - if ( !running ) { - pthread_mutex_unlock( &mutexProduce); + pthread_mutex_unlock( &mutex_start ); // UNLOCK + + // something wrong or signal to quit detected + // break out of this loop, will end the thread + if ( running==false || threadData->isDone==2 || rc != 0 ) break; - } - if ( threadData->cut) { - sink->cut(); + if ( threadData->cut ) + { + sink->cut( ); threadData->cut = false; } - if ( threadData->accepting ) { - if ( sink->canWrite( 0, 0) ) { - try { - sink->write( dataBuffer, dataSize); - } catch ( Exception & e ) { + if ( threadData->accepting ) + { + if ( sink->canWrite( 0, 0 ) ) + { + try + { + sink->write( dataBuffer, dataSize ); + } catch ( Exception & e ) + { // something wrong. don't accept more data, try to - // reopen the sink next time around + // reopen the sink NEXT time around, for now just report done threadData->accepting = false; + reportEvent( 4, + "MultiThreadedConnector :: sinkThread can't write X", ixSink ); } - } else { + } + else + { reportEvent( 4, - "MultiThreadedConnector :: sinkThread can't write ", - ixSink); + "MultiThreadedConnector :: sinkThread can't write ", ixSink ); // don't care if we can't write } - } - threadData->isDone = true; - pthread_cond_broadcast( &condProduce); - pthread_mutex_unlock( &mutexProduce); + } - if ( !threadData->accepting ) { + pthread_mutex_lock( &mutex_done ); + threadData->isDone = 1; // producer will check this flag + pthread_cond_signal( &cond_done ); // signal producer + pthread_mutex_unlock( &mutex_done ); + + if ( ! threadData->accepting) { + // not accepting if ( reconnect ) { reportEvent( 4, - "MultiThreadedConnector :: sinkThread reconnecting ", - ixSink); + "MultiThreadedConnector :: sinkThread reconnecting ", + ixSink ); // if we're not accepting, try to reopen the sink try { - sink->close(); - Util::sleep(1L, 0L); - sink->open(); - sched_yield(); - threadData->accepting = sink->isOpen(); - } catch ( Exception & e ) { + sink->close( ); + Util::sleep( 1L, 0L ); + sink->open( ); + sched_yield( ); + threadData->accepting = sink->isOpen( ); + } catch ( Exception & e ) { // don't care, just try and try again + reportEvent( 4, + "MultiThreadedConnector::sinkThread Reconnect failed", ixSink ); } } else { // if !reconnect, just stop the connector - running = false; - } + reportEvent( 4, + "MultiThreadedConnector :: sinkThread no reconnect? ", + ixSink); + try + { + threadData->accepting = false; // no more data for us + sink->close( ); + } catch ( Exception & e ) + { + // don't care, just try and try again + reportEvent( 9, + "MultiThreadedConnector :: sinkThread do not care2 ", + ixSink ); + } + } } - } -} + + pthread_mutex_lock( &mutex_done ); + threadData->isDone = 1; // producer will check this flag + pthread_cond_signal( &cond_done ); // signal producer + pthread_mutex_unlock( &mutex_done ); + + } /* is running */ + /* just make sure nobody will be waiting for us when we terminate */ + pthread_mutex_lock( &mutex_done ); + threadData->isDone = 1; // STOP + pthread_cond_signal( &cond_done ); // signal producer + pthread_mutex_unlock( &mutex_done ); + +} /*------------------------------------------------------------------------------ * Signal to each sink to cut what they've done so far, and start anew. @@ -369,16 +486,15 @@ MultiThreadedConnector :: close ( void ) throw ( Exception ) unsigned int i; // signal to stop for all threads - pthread_mutex_lock( &mutexProduce); + pthread_mutex_lock( &mutex_start ); running = false; - pthread_cond_broadcast( &condProduce); - pthread_mutex_unlock( &mutexProduce); + pthread_cond_broadcast( &cond_start ); + pthread_mutex_unlock( &mutex_start ); // wait for all the threads to finish for ( i = 0; i < numSinks; ++i ) { pthread_join( threads[i].thread, 0); } - pthread_attr_destroy( &threadAttr); Connector::close(); } diff --git a/darkice/trunk/src/MultiThreadedConnector.h b/darkice/trunk/src/MultiThreadedConnector.h index eef1345..00ed04f 100644 --- a/darkice/trunk/src/MultiThreadedConnector.h +++ b/darkice/trunk/src/MultiThreadedConnector.h @@ -105,7 +105,7 @@ class MultiThreadedConnector : public virtual Connector * Marks if the thread has processed the last batch * of data. */ - bool isDone; + int isDone; /** * A flag to show that the sink should be made to cut in the @@ -123,7 +123,7 @@ class MultiThreadedConnector : public virtual Connector this->ixSink = 0; this->thread = 0; this->accepting = false; - this->isDone = false; + this->isDone = 1; // 0==RUN 1=STOP 2=TERMINATE this->cut = false; } @@ -138,16 +138,34 @@ class MultiThreadedConnector : public virtual Connector threadFunction( void * param ); }; - /** - * The mutex of this object. + /* mutex and cond variable for signaling new data + * the consumers wait for this */ - pthread_mutex_t mutexProduce; - - /** - * The conditional variable for presenting new data. + pthread_mutex_t mutex_start; + pthread_cond_t cond_start; // producer sets this + + /* mutex and cond variable for signaling that a single thread has + * finished working on a block of data + * the producer waits for this, then checks if everyone is done + * and maybe waits again until everyone has finished on the data */ - pthread_cond_t condProduce; - + pthread_mutex_t mutex_done; + pthread_cond_t cond_done; // consumer sets this + + /* mutex on number of consumers not listening yet to the producer + * this is to prevent a race during startup + * The producer should only signal the consumers when it knows + * that all consumers are waiting on the condition var to change + * not before, because a consumer might mis the signal and not start + * which would also mean that it will not finish, thereby blocking + * the producer + */ + + pthread_mutex_t mutex_number_not_listening_yet; + // when this is 0 all consumers are + // ready to take commands from the producer + int number_not_listening_yet; + /** * The thread attributes. */ @@ -159,7 +177,7 @@ class MultiThreadedConnector : public virtual Connector ThreadData * threads; /** - * Signal if we're running or not, so the threads no if to stop. + * Signal if we're running or not running */ bool running; @@ -282,7 +300,7 @@ class MultiThreadedConnector : public virtual Connector virtual MultiThreadedConnector & operator= ( const MultiThreadedConnector & connector ) throw ( Exception ); - + /** * Open the connector. Opens the Source and the Sinks if necessary. * @@ -314,7 +332,7 @@ class MultiThreadedConnector : public virtual Connector * @return the number of bytes read from the Source. * @exception Exception */ - virtual unsigned int + virtual unsigned long transfer ( unsigned long bytes, unsigned int bufSize, unsigned int sec,