diff --git a/darkice/trunk/src/MultiThreadedConnector.cpp b/darkice/trunk/src/MultiThreadedConnector.cpp index 39ad727..76f7eef 100644 --- a/darkice/trunk/src/MultiThreadedConnector.cpp +++ b/darkice/trunk/src/MultiThreadedConnector.cpp @@ -158,11 +158,11 @@ MultiThreadedConnector :: open ( void ) throw ( Exception ) } } - /* if could not create all, delete the ones created */ + // if could not create all, delete the ones created if ( i < numSinks ) { unsigned int j; - /* signal to stop for all running threads */ + // signal to stop for all running threads pthread_mutex_lock( &mutexProduce); running = false; pthread_cond_broadcast( &condProduce); @@ -215,28 +215,23 @@ MultiThreadedConnector :: transfer ( unsigned long bytes, dataSize = source->read( dataBuffer, bufSize); b += dataSize; - /* check for EOF */ + // check for EOF if ( dataSize == 0 ) { - reportEvent( 3, "Connector :: transfer, EOF"); + reportEvent( 3, "MultiThreadedConnector :: transfer, EOF"); + pthread_mutex_unlock( &mutexProduce); break; } for ( i = 0; i < numSinks; ++i ) { - if ( threads[i].accepting ) { - threads[i].isDone = false; - } + threads[i].isDone = false; } - /* tell sink threads that there is some data available */ + // tell sink threads that there is some data available pthread_cond_broadcast( &condProduce); - /* wait for all sink threads to get done with this data */ + // wait for all sink threads to get done with this data while ( true ) { - for ( i = 0; i < numSinks; ++i ) { - if ( threads[i].accepting && !threads[i].isDone ) { - break; - } - } + for ( i = 0; i < numSinks && threads[i].isDone; ++i ); if ( i == numSinks ) { break; } @@ -244,7 +239,7 @@ MultiThreadedConnector :: transfer ( unsigned long bytes, } pthread_mutex_unlock( &mutexProduce); } else { - reportEvent( 3, "Connector :: transfer, can't read"); + reportEvent( 3, "MultiThreadedConnector :: transfer, can't read"); break; } } @@ -265,49 +260,46 @@ MultiThreadedConnector :: sinkThread( int ixSink ) Sink * sink = sinks[ixSink].get(); while ( running ) { - /* wait for some data to become available */ + // wait for some data to become available pthread_mutex_lock( &mutexProduce); while ( running && threadData->isDone ) { pthread_cond_wait( &condProduce, &mutexProduce); } if ( !running ) { + pthread_mutex_unlock( &mutexProduce); break; } - if ( sink->canWrite( 0, 0) ) { - try { - sink->write( dataBuffer, dataSize); - } catch ( Exception & e ) { - // something wrong. don't accept more data, try to - // reopen the sink - threadData->accepting = false; - pthread_mutex_unlock( &mutexProduce); - - do { - try { - sink->close(); - sink->open(); - } catch ( Exception & e ) { - // don't care, just try and try again - } - // TODO: wait some time every cycle, so as not to - // overload the system - } while ( !sink->isOpen() ); - - // all OK, accept data, act as if we're done - pthread_mutex_lock( &mutexProduce); - threadData->accepting = true; - threadData->isDone = true; + if ( threadData->accepting ) { + if ( sink->canWrite( 0, 0) ) { + try { + sink->write( dataBuffer, dataSize); + } catch ( Exception & e ) { + // something wrong. don't accept more data, try to + // reopen the sink next time around + threadData->accepting = false; + } + } else { + reportEvent( 4, + "MultiThreadedConnector :: sinkThread can't write ", + ixSink); + // don't care if we can't write } - } else { - reportEvent( 4, - "MultiThreadedConnector :: sinkThread can't write ", - ixSink); - /* don't care if we can't write */ } threadData->isDone = true; pthread_cond_broadcast( &condProduce); pthread_mutex_unlock( &mutexProduce); + + if ( !threadData->accepting ) { + // if we're not accepting, try to reopen the sink + try { + sink->close(); + sink->open(); + threadData->accepting = sink->isOpen(); + } catch ( Exception & e ) { + // don't care, just try and try again + } + } } } @@ -321,12 +313,13 @@ MultiThreadedConnector :: close ( void ) throw ( Exception ) { unsigned int i; - /* signal to stop for all threads */ + // signal to stop for all threads pthread_mutex_lock( &mutexProduce); running = false; pthread_cond_broadcast( &condProduce); pthread_mutex_unlock( &mutexProduce); + // wait for all the threads to finish for ( i = 0; i < numSinks; ++i ) { pthread_join( threads[i].thread, 0); } @@ -354,6 +347,9 @@ MultiThreadedConnector :: ThreadData :: threadFunction( void * param ) $Source$ $Log$ + Revision 1.3 2002/10/20 20:43:17 darkeye + more graceful reconnect + Revision 1.2 2002/10/19 13:35:21 darkeye when a connection is dropped, DarkIce tries to reconnect, indefinitely removed extreme event reporting for thread-related events