diff --git a/darkice/trunk/src/Connector.cpp b/darkice/trunk/src/Connector.cpp index f2b9139..7a34bc7 100644 --- a/darkice/trunk/src/Connector.cpp +++ b/darkice/trunk/src/Connector.cpp @@ -252,7 +252,7 @@ Connector :: open ( void ) throw ( Exception ) /*------------------------------------------------------------------------------ * Transfer some data from the source to the sink *----------------------------------------------------------------------------*/ -unsigned int +unsigned long Connector :: transfer ( unsigned long bytes, unsigned int bufSize, unsigned int sec, diff --git a/darkice/trunk/src/Connector.h b/darkice/trunk/src/Connector.h index 5183a38..2b77427 100644 --- a/darkice/trunk/src/Connector.h +++ b/darkice/trunk/src/Connector.h @@ -228,7 +228,7 @@ class Connector : public virtual Referable, public virtual Reporter * @return the number of bytes read from the Source. * @exception Exception */ - virtual unsigned int + virtual unsigned long transfer ( unsigned long bytes, unsigned int bufSize, unsigned int sec, diff --git a/darkice/trunk/src/MultiThreadedConnector.cpp b/darkice/trunk/src/MultiThreadedConnector.cpp index a177b89..c426c93 100644 --- a/darkice/trunk/src/MultiThreadedConnector.cpp +++ b/darkice/trunk/src/MultiThreadedConnector.cpp @@ -68,9 +68,10 @@ void MultiThreadedConnector :: init ( bool reconnect ) throw ( Exception ) { this->reconnect = reconnect; - - pthread_mutex_init( &mutexProduce, 0); - pthread_cond_init( &condProduce, 0); + pthread_mutex_init( &mutex_start, 0); + pthread_cond_init( &cond_start, 0); + pthread_mutex_init( &mutex_done, 0); + pthread_cond_init( &cond_done, 0); threads = 0; } @@ -86,8 +87,10 @@ MultiThreadedConnector :: strip ( void ) throw ( Exception ) threads = 0; } - pthread_cond_destroy( &condProduce); - pthread_mutex_destroy( &mutexProduce); + pthread_cond_destroy( &cond_start); + pthread_mutex_destroy( &mutex_start); + pthread_cond_destroy( &cond_done); + pthread_mutex_destroy( &mutex_done); } @@ -99,9 +102,11 @@ MultiThreadedConnector :: MultiThreadedConnector ( throw ( Exception ) : Connector( connector) { - reconnect = connector.reconnect; - mutexProduce = connector.mutexProduce; - condProduce = connector.condProduce; + reconnect = connector.reconnect; + mutex_start = connector.mutex_start; + cond_start = connector.cond_start; + mutex_done = connector.mutex_done; + cond_done = connector.cond_done; if ( threads ) { delete[] threads; @@ -112,7 +117,6 @@ MultiThreadedConnector :: MultiThreadedConnector ( } } - /*------------------------------------------------------------------------------ * Assignment operator *----------------------------------------------------------------------------*/ @@ -123,9 +127,11 @@ MultiThreadedConnector :: operator= ( const MultiThreadedConnector & connector ) if ( this != &connector ) { Connector::operator=( connector); - reconnect = connector.reconnect; - mutexProduce = connector.mutexProduce; - condProduce = connector.condProduce; + reconnect = connector.reconnect; + mutex_start = connector.mutex_start; + cond_start = connector.cond_start; + mutex_done = connector.mutex_done; + cond_done = connector.cond_done; if ( threads ) { delete[] threads; @@ -139,7 +145,6 @@ MultiThreadedConnector :: operator= ( const MultiThreadedConnector & connector ) return *this; } - /*------------------------------------------------------------------------------ * Open the source and all the sinks if needed * Create the sink threads @@ -173,7 +178,7 @@ MultiThreadedConnector :: open ( void ) throw ( Exception ) threadData->connector = this; threadData->ixSink = i; threadData->accepting = true; - threadData->isDone = true; + threadData->isDone = 1; // 1==STOP, activate thread in transfer() if ( pthread_create( &(threadData->thread), &threadAttr, ThreadData::threadFunction, @@ -187,10 +192,10 @@ MultiThreadedConnector :: open ( void ) throw ( Exception ) unsigned int j; // signal to stop for all running threads - pthread_mutex_lock( &mutexProduce); + pthread_mutex_lock( &mutex_start); running = false; - pthread_cond_broadcast( &condProduce); - pthread_mutex_unlock( &mutexProduce); + pthread_cond_broadcast( &cond_start); + pthread_mutex_unlock( &mutex_start); for ( j = 0; j < i; ++j ) { pthread_join( threads[j].thread, 0); @@ -209,14 +214,14 @@ MultiThreadedConnector :: open ( void ) throw ( Exception ) /*------------------------------------------------------------------------------ * Transfer some data from the source to the sink *----------------------------------------------------------------------------*/ -unsigned int +unsigned long MultiThreadedConnector :: transfer ( unsigned long bytes, unsigned int bufSize, unsigned int sec, unsigned int usec ) throw ( Exception ) { - unsigned int b; + unsigned long byteCounter; // when we reach byteCounter thread will end if ( numSinks == 0 ) { return 0; @@ -229,47 +234,73 @@ MultiThreadedConnector :: transfer ( unsigned long bytes, dataBuffer = new unsigned char[bufSize]; dataSize = 0; - reportEvent( 6, "MultiThreadedConnector :: tranfer, bytes", bytes); - - for ( b = 0; !bytes || b < bytes; ) { - if ( source->canRead( sec, usec) ) { - unsigned int i; - - pthread_mutex_lock( &mutexProduce); - dataSize = source->read( dataBuffer, bufSize); - b += dataSize; + /* if bytes==0 transfer until end of program, + * if bytes>0 transfer upto number of bytes + */ + reportEvent( 6, "MultiThreadedConnector::transfer count:", bytes); + byteCounter = 0; // init, no data bytes sent yet + + + while (running && (bytes == 0 || byteCounter < bytes)) { + if (source->canRead(sec, usec)) { + unsigned int i; + dataSize = source->read(dataBuffer, bufSize); + byteCounter += dataSize; + // check for EOF - if ( dataSize == 0 ) { - reportEvent( 3, "MultiThreadedConnector :: transfer, EOF"); - pthread_mutex_unlock( &mutexProduce); + if (dataSize == 0) { + reportEvent(3, "MultiThreadedConnector :: transfer, EOF"); break; } - for ( i = 0; i < numSinks; ++i ) { - threads[i].isDone = false; + pthread_mutex_lock(&mutex_start); + for (i = 0; i < numSinks; ++i) { + if (threads[i].accepting) + threads[i].isDone = 0; // consumers => RUN } - - // tell sink threads that there is some data available - pthread_cond_broadcast( &condProduce); + pthread_cond_broadcast(&cond_start); // kick the waiting consumers to look again // wait for all sink threads to get done with this data - while ( true ) { - for ( i = 0; i < numSinks && threads[i].isDone; ++i ); - if ( i == numSinks ) { + // we do not spin here, we just wait for an event from the consumers + pthread_mutex_lock(&mutex_done); + pthread_mutex_unlock(&mutex_start); // UNLOCK, release the consumers' cond variable + + while ( 1 ) { + int rc = 0; + // wait for condition : releases mutex so other thread can change condition + rc = pthread_cond_wait(&cond_done, &mutex_done); + // mutex is locked again + if (rc != 0) { + reportEvent(1, "MultiThreadedConnector pthread_cond_wait() fail"); break; } - pthread_cond_wait( &condProduce, &mutexProduce); + + int acceptor_count=0; + int stopped_count=0; + for (i = 0; i < numSinks; ++i) { + if (threads[i].accepting) { + acceptor_count++; // number of accepting threads + if (threads[i].isDone == 1) + stopped_count++; // number of accepting threads which have STOP + } + } + // break when all accepting threads are done + if (acceptor_count == stopped_count) { + break; + } + // at least one thread has not set the STOP flag yet } - pthread_mutex_unlock( &mutexProduce); + pthread_mutex_unlock(&mutex_done); + // at this point all consumers are done with the block } else { - reportEvent( 3, "MultiThreadedConnector :: transfer, can't read"); + reportEvent(3,"MultiThreadedConnector :: transfer, can't read"); break; - } + } } delete[] dataBuffer; - return b; + return byteCounter; } @@ -278,70 +309,114 @@ MultiThreadedConnector :: transfer ( unsigned long bytes, * Read the presented data *----------------------------------------------------------------------------*/ void -MultiThreadedConnector :: sinkThread( int ixSink ) +MultiThreadedConnector::sinkThread(int ixSink) { - ThreadData * threadData = &threads[ixSink]; - Sink * sink = sinks[ixSink].get(); + ThreadData * threadData = &threads[ixSink]; + Sink * sink = sinks[ixSink].get( ); - while ( running ) { + while ( running ) + { // wait for some data to become available - pthread_mutex_lock( &mutexProduce); - while ( running && threadData->isDone ) { - pthread_cond_wait( &condProduce, &mutexProduce); + // producer sets isDone==0 when consumer can continue + // producer sets isDone==2 or running==0 to request termination + pthread_mutex_lock( &mutex_start ); // LOCK + int rc=0; + while ( (rc==0) && running && (threadData->isDone==1) ) + { + // wait for condition, releases lock + rc = pthread_cond_wait( &cond_start, &mutex_start ); + // we hold the lock again } - if ( !running ) { - pthread_mutex_unlock( &mutexProduce); + pthread_mutex_unlock( &mutex_start ); // UNLOCK + + // something wrong or signal to quit detected + // break out of this loop, will end the thread + if ( running==false || threadData->isDone==2 || rc != 0 ) break; - } - if ( threadData->cut) { - sink->cut(); + if ( threadData->cut ) + { + sink->cut( ); threadData->cut = false; } - if ( threadData->accepting ) { - if ( sink->canWrite( 0, 0) ) { - try { - sink->write( dataBuffer, dataSize); - } catch ( Exception & e ) { + if ( threadData->accepting ) + { + if ( sink->canWrite( 0, 0 ) ) + { + try + { + sink->write( dataBuffer, dataSize ); + } catch ( Exception & e ) + { // something wrong. don't accept more data, try to // reopen the sink next time around threadData->accepting = false; } - } else { + } + else + { reportEvent( 4, - "MultiThreadedConnector :: sinkThread can't write ", - ixSink); + "MultiThreadedConnector :: sinkThread can't write ", + ixSink ); // don't care if we can't write } } - threadData->isDone = true; - pthread_cond_broadcast( &condProduce); - pthread_mutex_unlock( &mutexProduce); if ( !threadData->accepting ) { if ( reconnect ) { reportEvent( 4, - "MultiThreadedConnector :: sinkThread reconnecting ", - ixSink); + "MultiThreadedConnector :: sinkThread reconnecting ", + ixSink ); // if we're not accepting, try to reopen the sink try { - sink->close(); - Util::sleep(1L, 0L); - sink->open(); - sched_yield(); - threadData->accepting = sink->isOpen(); - } catch ( Exception & e ) { + sink->close( ); + Util::sleep( 1L, 0L ); + sink->open( ); + sched_yield( ); + threadData->accepting = sink->isOpen( ); + } catch ( Exception & e ) { // don't care, just try and try again } - } else { - // if !reconnect, just stop the connector - running = false; } + else { + // if !reconnect, just stop the connector + // running = false; /* kill the whole application */ + // tell that we used the databuffer, do not wait for us anymore + pthread_mutex_lock( &mutex_done ); + threadData->isDone = 1; // 1==STOP + pthread_mutex_unlock( &mutex_done ); + reportEvent( 4, + "MultiThreadedConnector :: sinkThread no reconnect? ", + ixSink ); + try + { + threadData->accepting = false; + sink->close( ); + } catch ( Exception & e ) + { + // don't care, just try and try again + reportEvent( 9, + "MultiThreadedConnector :: sinkThread do not care2 ", + ixSink ); + } + } } - } -} + + pthread_mutex_lock( &mutex_done ); + threadData->isDone = 1; // producer will check this flag + pthread_cond_signal( &cond_done ); // signal producer + pthread_mutex_unlock( &mutex_done ); + + } /* is running */ + /* just make sure nobody will be waiting for us when we terminate */ + pthread_mutex_lock( &mutex_done ); + threadData->isDone = 1; // STOP + pthread_cond_signal( &cond_done ); // signal producer + pthread_mutex_unlock( &mutex_done ); + +} /*------------------------------------------------------------------------------ * Signal to each sink to cut what they've done so far, and start anew. @@ -369,16 +444,15 @@ MultiThreadedConnector :: close ( void ) throw ( Exception ) unsigned int i; // signal to stop for all threads - pthread_mutex_lock( &mutexProduce); + pthread_mutex_lock( &mutex_start ); running = false; - pthread_cond_broadcast( &condProduce); - pthread_mutex_unlock( &mutexProduce); + pthread_cond_broadcast( &cond_start ); + pthread_mutex_unlock( &mutex_start ); // wait for all the threads to finish for ( i = 0; i < numSinks; ++i ) { pthread_join( threads[i].thread, 0); } - pthread_attr_destroy( &threadAttr); Connector::close(); } diff --git a/darkice/trunk/src/MultiThreadedConnector.h b/darkice/trunk/src/MultiThreadedConnector.h index eef1345..1add1c9 100644 --- a/darkice/trunk/src/MultiThreadedConnector.h +++ b/darkice/trunk/src/MultiThreadedConnector.h @@ -105,7 +105,7 @@ class MultiThreadedConnector : public virtual Connector * Marks if the thread has processed the last batch * of data. */ - bool isDone; + int isDone; /** * A flag to show that the sink should be made to cut in the @@ -123,7 +123,7 @@ class MultiThreadedConnector : public virtual Connector this->ixSink = 0; this->thread = 0; this->accepting = false; - this->isDone = false; + this->isDone = 1; // 0==RUN 1=STOP 2=TERMINATE this->cut = false; } @@ -138,16 +138,20 @@ class MultiThreadedConnector : public virtual Connector threadFunction( void * param ); }; - /** - * The mutex of this object. + /* mutex and cond variable for signaling new data + * the consumers wait for this */ - pthread_mutex_t mutexProduce; - - /** - * The conditional variable for presenting new data. + pthread_mutex_t mutex_start; + pthread_cond_t cond_start; // producer sets this + + /* mutex and cond variable for signaling that a single thread has + * finished working on a block of data + * the producer waits for this, then checks if everyone is done + * and maybe waits again until everyone has finished on the data */ - pthread_cond_t condProduce; - + pthread_mutex_t mutex_done; + pthread_cond_t cond_done; // consumer sets this + /** * The thread attributes. */ @@ -159,7 +163,7 @@ class MultiThreadedConnector : public virtual Connector ThreadData * threads; /** - * Signal if we're running or not, so the threads no if to stop. + * Signal if we're running or not running */ bool running; @@ -282,7 +286,7 @@ class MultiThreadedConnector : public virtual Connector virtual MultiThreadedConnector & operator= ( const MultiThreadedConnector & connector ) throw ( Exception ); - + /** * Open the connector. Opens the Source and the Sinks if necessary. * @@ -314,7 +318,7 @@ class MultiThreadedConnector : public virtual Connector * @return the number of bytes read from the Source. * @exception Exception */ - virtual unsigned int + virtual unsigned long transfer ( unsigned long bytes, unsigned int bufSize, unsigned int sec,