From 4055ebf9ef22c64c4f6836f2bde7f57a0acdb0d6 Mon Sep 17 00:00:00 2001 From: "rafael@riseup.net" Date: Sat, 18 May 2013 16:33:31 +0000 Subject: [PATCH] pthread changes don't work. lets stick with older code for 1.2 release --- darkice/trunk/src/Connector.cpp | 2 +- darkice/trunk/src/Connector.h | 2 +- darkice/trunk/src/MultiThreadedConnector.cpp | 304 ++++++------------- darkice/trunk/src/MultiThreadedConnector.h | 44 +-- 4 files changed, 109 insertions(+), 243 deletions(-) diff --git a/darkice/trunk/src/Connector.cpp b/darkice/trunk/src/Connector.cpp index 7a34bc7..f2b9139 100644 --- a/darkice/trunk/src/Connector.cpp +++ b/darkice/trunk/src/Connector.cpp @@ -252,7 +252,7 @@ Connector :: open ( void ) throw ( Exception ) /*------------------------------------------------------------------------------ * Transfer some data from the source to the sink *----------------------------------------------------------------------------*/ -unsigned long +unsigned int Connector :: transfer ( unsigned long bytes, unsigned int bufSize, unsigned int sec, diff --git a/darkice/trunk/src/Connector.h b/darkice/trunk/src/Connector.h index 2b77427..5183a38 100644 --- a/darkice/trunk/src/Connector.h +++ b/darkice/trunk/src/Connector.h @@ -228,7 +228,7 @@ class Connector : public virtual Referable, public virtual Reporter * @return the number of bytes read from the Source. * @exception Exception */ - virtual unsigned long + virtual unsigned int transfer ( unsigned long bytes, unsigned int bufSize, unsigned int sec, diff --git a/darkice/trunk/src/MultiThreadedConnector.cpp b/darkice/trunk/src/MultiThreadedConnector.cpp index df7fccd..a177b89 100644 --- a/darkice/trunk/src/MultiThreadedConnector.cpp +++ b/darkice/trunk/src/MultiThreadedConnector.cpp @@ -68,11 +68,9 @@ void MultiThreadedConnector :: init ( bool reconnect ) throw ( Exception ) { this->reconnect = reconnect; - pthread_mutex_init(&mutex_number_not_listening_yet, 0); - pthread_mutex_init(&mutex_start, 0); - pthread_cond_init(&cond_start, 0); - pthread_mutex_init(&mutex_done, 0); - pthread_cond_init(&cond_done, 0); + + pthread_mutex_init( &mutexProduce, 0); + pthread_cond_init( &condProduce, 0); threads = 0; } @@ -88,11 +86,8 @@ MultiThreadedConnector :: strip ( void ) throw ( Exception ) threads = 0; } - pthread_cond_destroy(&cond_done); - pthread_mutex_destroy(&mutex_done); - pthread_cond_destroy(&cond_start); - pthread_mutex_destroy(&mutex_start); - pthread_mutex_destroy(&mutex_number_not_listening_yet); + pthread_cond_destroy( &condProduce); + pthread_mutex_destroy( &mutexProduce); } @@ -104,14 +99,9 @@ MultiThreadedConnector :: MultiThreadedConnector ( throw ( Exception ) : Connector( connector) { - reconnect = connector.reconnect; - mutex_start = connector.mutex_start; - mutex_number_not_listening_yet = connector.mutex_number_not_listening_yet; - number_not_listening_yet = connector.number_not_listening_yet; - - cond_start = connector.cond_start; - mutex_done = connector.mutex_done; - cond_done = connector.cond_done; + reconnect = connector.reconnect; + mutexProduce = connector.mutexProduce; + condProduce = connector.condProduce; if ( threads ) { delete[] threads; @@ -122,6 +112,7 @@ MultiThreadedConnector :: MultiThreadedConnector ( } } + /*------------------------------------------------------------------------------ * Assignment operator *----------------------------------------------------------------------------*/ @@ -132,13 +123,9 @@ MultiThreadedConnector :: operator= ( const MultiThreadedConnector & connector ) if ( this != &connector ) { Connector::operator=( connector); - reconnect = connector.reconnect; - mutex_start = connector.mutex_start; - mutex_number_not_listening_yet = connector.mutex_number_not_listening_yet; - number_not_listening_yet = connector.number_not_listening_yet; - cond_start = connector.cond_start; - mutex_done = connector.mutex_done; - cond_done = connector.cond_done; + reconnect = connector.reconnect; + mutexProduce = connector.mutexProduce; + condProduce = connector.condProduce; if ( threads ) { delete[] threads; @@ -152,6 +139,7 @@ MultiThreadedConnector :: operator= ( const MultiThreadedConnector & connector ) return *this; } + /*------------------------------------------------------------------------------ * Open the source and all the sinks if needed * Create the sink threads @@ -168,7 +156,7 @@ MultiThreadedConnector :: open ( void ) throw ( Exception ) running = true; - pthread_attr_init(&threadAttr); + pthread_attr_init( &threadAttr); pthread_attr_getstacksize(&threadAttr, &st); if (st < 128 * 1024) { reportEvent( 5, "MultiThreadedConnector :: open, stack size ", @@ -176,12 +164,8 @@ MultiThreadedConnector :: open ( void ) throw ( Exception ) st = 128 * 1024; pthread_attr_setstacksize(&threadAttr, st); } - pthread_attr_setdetachstate(&threadAttr, PTHREAD_CREATE_JOINABLE); - - pthread_mutex_lock(&mutex_number_not_listening_yet); - number_not_listening_yet = numSinks; - pthread_mutex_unlock(&mutex_number_not_listening_yet); - + pthread_attr_setdetachstate( &threadAttr, PTHREAD_CREATE_JOINABLE); + threads = new ThreadData[numSinks]; for ( i = 0; i < numSinks; ++i ) { ThreadData * threadData = threads + i; @@ -189,7 +173,7 @@ MultiThreadedConnector :: open ( void ) throw ( Exception ) threadData->connector = this; threadData->ixSink = i; threadData->accepting = true; - threadData->isDone = 1; // 1==STOP, activate thread in transfer() + threadData->isDone = true; if ( pthread_create( &(threadData->thread), &threadAttr, ThreadData::threadFunction, @@ -203,10 +187,10 @@ MultiThreadedConnector :: open ( void ) throw ( Exception ) unsigned int j; // signal to stop for all running threads - pthread_mutex_lock( &mutex_start); + pthread_mutex_lock( &mutexProduce); running = false; - pthread_cond_broadcast( &cond_start); - pthread_mutex_unlock( &mutex_start); + pthread_cond_broadcast( &condProduce); + pthread_mutex_unlock( &mutexProduce); for ( j = 0; j < i; ++j ) { pthread_join( threads[j].thread, 0); @@ -218,21 +202,6 @@ MultiThreadedConnector :: open ( void ) throw ( Exception ) return false; } - // we have created all threads, make sure they are waiting for - // command from the producer - while (1) { - pthread_mutex_lock(&mutex_number_not_listening_yet); - if (0 == number_not_listening_yet) { - reportEvent( 6, "MultiThreadedConnector::open() all consumers standing by"); - break; - } else { - pthread_mutex_unlock(&mutex_number_not_listening_yet); - pthread_yield(); // give space to let the consumers running - reportEvent( 6, "MultiThreadedConnector::open() waiting for consumers standing by"); - usleep(10); - } - } - return true; } @@ -240,14 +209,14 @@ MultiThreadedConnector :: open ( void ) throw ( Exception ) /*------------------------------------------------------------------------------ * Transfer some data from the source to the sink *----------------------------------------------------------------------------*/ -unsigned long +unsigned int MultiThreadedConnector :: transfer ( unsigned long bytes, unsigned int bufSize, unsigned int sec, unsigned int usec ) throw ( Exception ) { - unsigned long byteCounter; // when we reach byteCounter thread will end + unsigned int b; if ( numSinks == 0 ) { return 0; @@ -260,80 +229,47 @@ MultiThreadedConnector :: transfer ( unsigned long bytes, dataBuffer = new unsigned char[bufSize]; dataSize = 0; - /* if bytes==0 transfer until end of program, - * if bytes>0 transfer upto number of bytes - */ - reportEvent( 6, "MultiThreadedConnector::transfer count:", bytes); - byteCounter = 0; // init, no data bytes sent yet - - - while (running && (bytes == 0 || byteCounter < bytes)) { + reportEvent( 6, "MultiThreadedConnector :: tranfer, bytes", bytes); + + for ( b = 0; !bytes || b < bytes; ) { + if ( source->canRead( sec, usec) ) { + unsigned int i; + + pthread_mutex_lock( &mutexProduce); + dataSize = source->read( dataBuffer, bufSize); + b += dataSize; - if (source->canRead(sec, usec)) { - unsigned int i; - dataSize = source->read(dataBuffer, bufSize); - byteCounter += dataSize; - // check for EOF - if (dataSize == 0) { - reportEvent(3, "MultiThreadedConnector :: transfer, EOF"); + if ( dataSize == 0 ) { + reportEvent( 3, "MultiThreadedConnector :: transfer, EOF"); + pthread_mutex_unlock( &mutexProduce); break; - } else { - // reportEvent(9, "MultiThreadedConnector::transfer ",dataSize); } - - pthread_mutex_lock(&mutex_start); - for (i = 0; i < numSinks; ++i) { - threads[i].isDone = 0; // ALL consumers => RUN + + for ( i = 0; i < numSinks; ++i ) { + threads[i].isDone = false; } - pthread_cond_broadcast(&cond_start); // kick ALL the waiting consumers to look again + + // tell sink threads that there is some data available + pthread_cond_broadcast( &condProduce); // wait for all sink threads to get done with this data - // we do not spin here, we just wait for an event from the consumers - pthread_mutex_lock(&mutex_done); - pthread_mutex_unlock(&mutex_start); // UNLOCK, release the consumers' cond variable - - while ( 1 ) { - int rc = 0; - // wait for condition : releases mutex so other thread can change condition - rc = pthread_cond_wait(&cond_done, &mutex_done); - // mutex is locked again - if (rc != 0) { - reportEvent(1, "MultiThreadedConnector pthread_cond_wait() fail"); + while ( true ) { + for ( i = 0; i < numSinks && threads[i].isDone; ++i ); + if ( i == numSinks ) { break; } - - int acceptor_count=0; - int stopped_count=0; - for (i = 0; i < numSinks; ++i) { - if (threads[i].accepting) { - acceptor_count++; // number of accepting threads - if (threads[i].isDone == 1) - stopped_count++; // number of accepting threads which have STOP - } - } - // if no thread is accepting and reconnect is not set stop the application - if (acceptor_count == 0 && reconnect == false) { - running=false; - break; - } - - // break when all accepting threads are done - if (acceptor_count == stopped_count) { - break; - } - // at least one thread has not set the isDone flag yet and is still accepting + pthread_cond_wait( &condProduce, &mutexProduce); } - pthread_mutex_unlock(&mutex_done); - // at this point all consumers are done with the block + pthread_mutex_unlock( &mutexProduce); } else { - reportEvent(3,"MultiThreadedConnector :: transfer, can't read"); + reportEvent( 3, "MultiThreadedConnector :: transfer, can't read"); break; - } + } } delete[] dataBuffer; - return byteCounter; + return b; } @@ -342,124 +278,71 @@ MultiThreadedConnector :: transfer ( unsigned long bytes, * Read the presented data *----------------------------------------------------------------------------*/ void -MultiThreadedConnector::sinkThread(int ixSink) +MultiThreadedConnector :: sinkThread( int ixSink ) { - ThreadData * threadData = &threads[ixSink]; - Sink * sink = sinks[ixSink].get(); - - pthread_mutex_lock( &mutex_start ); // LOCK mutex for cond_start - // we now tell the producer we are listening - pthread_mutex_lock(&mutex_number_not_listening_yet); - number_not_listening_yet--; - pthread_mutex_unlock(&mutex_number_not_listening_yet); - - while (1) { - // wait for some data to become available - // producer sets isDone==0 when consumer can continue - // producer sets isDone==2 or running==0 to request termination - - int rc=0; - while ( (rc==0) && running && (threadData->isDone==1) ) - { - // wait for condition, releases lock - rc = pthread_cond_wait( &cond_start, &mutex_start ); - // we hold the lock again - // we check flags under protection of the lock - } - pthread_mutex_unlock( &mutex_start ); // UNLOCK - - // something wrong or signal to quit detected - // break out of this loop, will end the thread - if ( running==false || threadData->isDone==2 || rc != 0 ) - break; + ThreadData * threadData = &threads[ixSink]; + Sink * sink = sinks[ixSink].get(); - if ( threadData->cut ) - { - sink->cut( ); + while ( running ) { + // wait for some data to become available + pthread_mutex_lock( &mutexProduce); + while ( running && threadData->isDone ) { + pthread_cond_wait( &condProduce, &mutexProduce); + } + if ( !running ) { + pthread_mutex_unlock( &mutexProduce); + break; + } + + if ( threadData->cut) { + sink->cut(); threadData->cut = false; } - if ( threadData->accepting ) - { - if ( sink->canWrite( 0, 0 ) ) - { - try - { - sink->write( dataBuffer, dataSize ); - } catch ( Exception & e ) - { + if ( threadData->accepting ) { + if ( sink->canWrite( 0, 0) ) { + try { + sink->write( dataBuffer, dataSize); + } catch ( Exception & e ) { // something wrong. don't accept more data, try to - // reopen the sink NEXT time around, for now just report done + // reopen the sink next time around threadData->accepting = false; - reportEvent( 4, - "MultiThreadedConnector :: sinkThread can't write X", ixSink ); } - } - else - { + } else { reportEvent( 4, - "MultiThreadedConnector :: sinkThread can't write ", ixSink ); + "MultiThreadedConnector :: sinkThread can't write ", + ixSink); // don't care if we can't write } - } + } + threadData->isDone = true; + pthread_cond_broadcast( &condProduce); + pthread_mutex_unlock( &mutexProduce); - pthread_mutex_lock( &mutex_done ); - threadData->isDone = 1; // producer will check this flag - pthread_cond_signal( &cond_done ); // signal producer - pthread_mutex_unlock( &mutex_done ); - - if ( ! threadData->accepting) { - // not accepting + if ( !threadData->accepting ) { if ( reconnect ) { reportEvent( 4, - "MultiThreadedConnector :: sinkThread reconnecting ", - ixSink ); + "MultiThreadedConnector :: sinkThread reconnecting ", + ixSink); // if we're not accepting, try to reopen the sink try { - sink->close( ); - Util::sleep( 1L, 0L ); - sink->open( ); - sched_yield( ); - threadData->accepting = sink->isOpen( ); - } catch ( Exception & e ) { + sink->close(); + Util::sleep(1L, 0L); + sink->open(); + sched_yield(); + threadData->accepting = sink->isOpen(); + } catch ( Exception & e ) { // don't care, just try and try again - reportEvent( 4, - "MultiThreadedConnector::sinkThread Reconnect failed", ixSink ); } } else { // if !reconnect, just stop the connector - reportEvent( 4, - "MultiThreadedConnector :: sinkThread no reconnect? ", - ixSink); - try - { - threadData->accepting = false; // no more data for us - sink->close( ); - } catch ( Exception & e ) - { - // don't care, just try and try again - reportEvent( 9, - "MultiThreadedConnector :: sinkThread do not care2 ", - ixSink ); - } - } + running = false; + } } - - pthread_mutex_lock( &mutex_done ); - threadData->isDone = 1; // producer will check this flag - pthread_cond_signal( &cond_done ); // signal producer - pthread_mutex_unlock( &mutex_done ); - - } /* is running */ - - /* just make sure nobody will be waiting for us when we terminate */ - pthread_mutex_lock( &mutex_done ); - threadData->isDone = 1; // STOP - pthread_cond_signal( &cond_done ); // signal producer - pthread_mutex_unlock( &mutex_done ); - + } } + /*------------------------------------------------------------------------------ * Signal to each sink to cut what they've done so far, and start anew. *----------------------------------------------------------------------------*/ @@ -486,15 +369,16 @@ MultiThreadedConnector :: close ( void ) throw ( Exception ) unsigned int i; // signal to stop for all threads - pthread_mutex_lock( &mutex_start ); + pthread_mutex_lock( &mutexProduce); running = false; - pthread_cond_broadcast( &cond_start ); - pthread_mutex_unlock( &mutex_start ); + pthread_cond_broadcast( &condProduce); + pthread_mutex_unlock( &mutexProduce); // wait for all the threads to finish for ( i = 0; i < numSinks; ++i ) { pthread_join( threads[i].thread, 0); } + pthread_attr_destroy( &threadAttr); Connector::close(); } diff --git a/darkice/trunk/src/MultiThreadedConnector.h b/darkice/trunk/src/MultiThreadedConnector.h index 00ed04f..eef1345 100644 --- a/darkice/trunk/src/MultiThreadedConnector.h +++ b/darkice/trunk/src/MultiThreadedConnector.h @@ -105,7 +105,7 @@ class MultiThreadedConnector : public virtual Connector * Marks if the thread has processed the last batch * of data. */ - int isDone; + bool isDone; /** * A flag to show that the sink should be made to cut in the @@ -123,7 +123,7 @@ class MultiThreadedConnector : public virtual Connector this->ixSink = 0; this->thread = 0; this->accepting = false; - this->isDone = 1; // 0==RUN 1=STOP 2=TERMINATE + this->isDone = false; this->cut = false; } @@ -138,34 +138,16 @@ class MultiThreadedConnector : public virtual Connector threadFunction( void * param ); }; - /* mutex and cond variable for signaling new data - * the consumers wait for this + /** + * The mutex of this object. */ - pthread_mutex_t mutex_start; - pthread_cond_t cond_start; // producer sets this - - /* mutex and cond variable for signaling that a single thread has - * finished working on a block of data - * the producer waits for this, then checks if everyone is done - * and maybe waits again until everyone has finished on the data + pthread_mutex_t mutexProduce; + + /** + * The conditional variable for presenting new data. */ - pthread_mutex_t mutex_done; - pthread_cond_t cond_done; // consumer sets this - - /* mutex on number of consumers not listening yet to the producer - * this is to prevent a race during startup - * The producer should only signal the consumers when it knows - * that all consumers are waiting on the condition var to change - * not before, because a consumer might mis the signal and not start - * which would also mean that it will not finish, thereby blocking - * the producer - */ - - pthread_mutex_t mutex_number_not_listening_yet; - // when this is 0 all consumers are - // ready to take commands from the producer - int number_not_listening_yet; - + pthread_cond_t condProduce; + /** * The thread attributes. */ @@ -177,7 +159,7 @@ class MultiThreadedConnector : public virtual Connector ThreadData * threads; /** - * Signal if we're running or not running + * Signal if we're running or not, so the threads no if to stop. */ bool running; @@ -300,7 +282,7 @@ class MultiThreadedConnector : public virtual Connector virtual MultiThreadedConnector & operator= ( const MultiThreadedConnector & connector ) throw ( Exception ); - + /** * Open the connector. Opens the Source and the Sinks if necessary. * @@ -332,7 +314,7 @@ class MultiThreadedConnector : public virtual Connector * @return the number of bytes read from the Source. * @exception Exception */ - virtual unsigned long + virtual unsigned int transfer ( unsigned long bytes, unsigned int bufSize, unsigned int sec,