diff --git a/darkice/trunk/ChangeLog b/darkice/trunk/ChangeLog index b5fce78..d7944e7 100644 --- a/darkice/trunk/ChangeLog +++ b/darkice/trunk/ChangeLog @@ -4,6 +4,7 @@ DarkIce next release o bug fix: maximum bitrate setting fixed for Ogg Vorbis streams o changed internals so that now each encoding/server connection is a separate thread + o when a connection is dropped, DarkIce tries to reconnect, indefinitely 20-08-2002: DarkIce 0.11 released diff --git a/darkice/trunk/TODO b/darkice/trunk/TODO index e32875b..1da3104 100644 --- a/darkice/trunk/TODO +++ b/darkice/trunk/TODO @@ -1,6 +1,5 @@ o change Ref to follow inheritance o make a master config file, and a small one ? -o reconnect to server if connection is dropped o add support for multiple servers for one stream ? o libtoolize ? o revisit real-time scheduling diff --git a/darkice/trunk/src/MultiThreadedConnector.cpp b/darkice/trunk/src/MultiThreadedConnector.cpp index cf9f4fd..39ad727 100644 --- a/darkice/trunk/src/MultiThreadedConnector.cpp +++ b/darkice/trunk/src/MultiThreadedConnector.cpp @@ -57,8 +57,6 @@ MultiThreadedConnector :: init ( void ) throw ( Exception ) { pthread_mutex_init( &mutexProduce, 0); pthread_cond_init( &condProduce, 0); - pthread_mutex_init( &mutexConsume, 0); - pthread_cond_init( &condConsume, 0); threads = 0; } @@ -74,8 +72,6 @@ MultiThreadedConnector :: strip ( void ) throw ( Exception ) threads = 0; } - pthread_cond_destroy( &condConsume); - pthread_mutex_destroy( &mutexConsume); pthread_cond_destroy( &condProduce); pthread_mutex_destroy( &mutexProduce); } @@ -91,8 +87,6 @@ MultiThreadedConnector :: MultiThreadedConnector ( { mutexProduce = connector.mutexProduce; condProduce = connector.condProduce; - mutexConsume = connector.mutexConsume; - condConsume = connector.condConsume; if ( threads ) { delete[] threads; @@ -116,8 +110,6 @@ MultiThreadedConnector :: operator= ( const MultiThreadedConnector & connector ) mutexProduce = connector.mutexProduce; condProduce = connector.condProduce; - mutexConsume = connector.mutexConsume; - condConsume = connector.condConsume; if ( threads ) { delete[] threads; @@ -152,17 +144,16 @@ MultiThreadedConnector :: open ( void ) throw ( Exception ) threads = new ThreadData[numSinks]; for ( i = 0; i < numSinks; ++i ) { - reportEvent( 8, "Connector :: open starting thread ", i); ThreadData * threadData = threads + i; threadData->connector = this; threadData->ixSink = i; + threadData->accepting = true; threadData->isDone = true; if ( pthread_create( &(threadData->thread), &threadAttr, ThreadData::threadFunction, threadData ) ) { - reportEvent( 8, "Connector :: open couldn't start thread ", i); break; } } @@ -220,11 +211,8 @@ MultiThreadedConnector :: transfer ( unsigned long bytes, if ( source->canRead( sec, usec) ) { unsigned int i; -reportEvent( 8, "Connector :: transfer, mutexProduce, enter lock"); pthread_mutex_lock( &mutexProduce); -reportEvent( 8, "Connector :: transfer, mutexProduce, lock"); dataSize = source->read( dataBuffer, bufSize); -reportEvent( 8, "Connector :: transfer, read", dataSize); b += dataSize; /* check for EOF */ @@ -234,46 +222,27 @@ reportEvent( 8, "Connector :: transfer, read", dataSize); } for ( i = 0; i < numSinks; ++i ) { - threads[i].isDone = false; + if ( threads[i].accepting ) { + threads[i].isDone = false; + } } /* tell sink threads that there is some data available */ -reportEvent( 8, "Connector :: transfer, mutexProduce, broadcast"); pthread_cond_broadcast( &condProduce); -reportEvent( 8, "Connector :: transfer, mutexProduce, broadcasted"); /* wait for all sink threads to get done with this data */ while ( true ) { - for ( i = 0; i < numSinks && threads[i].isDone; ++i ); + for ( i = 0; i < numSinks; ++i ) { + if ( threads[i].accepting && !threads[i].isDone ) { + break; + } + } if ( i == numSinks ) { break; } -reportEvent( 8, "Connector :: transfer, mutexConsume, waiting"); pthread_cond_wait( &condProduce, &mutexProduce); -reportEvent( 8, "Connector :: transfer, mutexConsume, waking"); } pthread_mutex_unlock( &mutexProduce); -reportEvent( 8, "Connector :: transfer, mutexProduce, exit lock"); - - /* wait for all sink threads to process the presented data */ -/* -reportEvent( 8, "Connector :: transfer, mutexConsume, enter lock"); - pthread_mutex_lock( &mutexConsume); -reportEvent( 8, "Connector :: transfer, mutexConsume, lock"); - while ( true ) { - for ( i = 0; i < numSinks && threads[i].isDone; ++i ); - if ( i == numSinks ) { - break; - } -reportEvent( 8, "Connector :: transfer, mutexConsume, waiting"); - pthread_cond_wait( &condConsume, &mutexConsume); -reportEvent( 8, "Connector :: transfer, mutexConsume, waking"); - } - consumeCount = 0; -reportEvent( 8, "Connector :: transfer, mutexConsume, unlock"); - pthread_mutex_unlock( &mutexConsume); -reportEvent( 8, "Connector :: transfer, mutexConsume, exit lock"); -*/ } else { reportEvent( 3, "Connector :: transfer, can't read"); break; @@ -297,49 +266,48 @@ MultiThreadedConnector :: sinkThread( int ixSink ) while ( running ) { /* wait for some data to become available */ -reportEvent( 8, "Connector :: sinkThread, mutexProduce, enter lock ", ixSink); pthread_mutex_lock( &mutexProduce); -reportEvent( 8, "Connector :: sinkThread, mutexProduce, lock ", ixSink); while ( running && threadData->isDone ) { -reportEvent( 8, "Connector :: sinkThread, mutexProduce, wait ", ixSink); pthread_cond_wait( &condProduce, &mutexProduce); -reportEvent( 8, "Connector :: sinkThread, mutexProduce, wake ", ixSink); } if ( !running ) { break; } if ( sink->canWrite( 0, 0) ) { - unsigned int written; - written = sink->write( dataBuffer, dataSize); -/* - unsigned int written; - written = sink->write( dataBuffer, dataSize); -reportEvent( 8, "Connector :: sinkThread, written ", written); -*/ + try { + sink->write( dataBuffer, dataSize); + } catch ( Exception & e ) { + // something wrong. don't accept more data, try to + // reopen the sink + threadData->accepting = false; + pthread_mutex_unlock( &mutexProduce); + + do { + try { + sink->close(); + sink->open(); + } catch ( Exception & e ) { + // don't care, just try and try again + } + // TODO: wait some time every cycle, so as not to + // overload the system + } while ( !sink->isOpen() ); + + // all OK, accept data, act as if we're done + pthread_mutex_lock( &mutexProduce); + threadData->accepting = true; + threadData->isDone = true; + } } else { -reportEvent( 3, "Connector :: sinkThread MAYDAY MAYDAY can't write ", ixSink); + reportEvent( 4, + "MultiThreadedConnector :: sinkThread can't write ", + ixSink); /* don't care if we can't write */ } threadData->isDone = true; -reportEvent( 8, "Connector :: sinkThread, condProduce, broadcast ", ixSink); pthread_cond_broadcast( &condProduce); -reportEvent( 8, "Connector :: sinkThread, condProduce, broadcasted ", ixSink); -reportEvent( 8, "Connector :: sinkThread, mutexProduce, unlock", ixSink); pthread_mutex_unlock( &mutexProduce); -reportEvent( 8, "Connector :: sinkThread, mutexProduce, exit lock", ixSink); - - /* signal that we have read the data */ -/* -reportEvent( 8, "Connector :: sinkThread, mutexConsume, enter lock"); - pthread_mutex_lock( &mutexConsume); -reportEvent( 8, "Connector :: sinkThread, mutexConsume, lock"); -reportEvent( 8, "Connector :: sinkThread, mutexConsume, signal"); - pthread_cond_signal( &condConsume); -reportEvent( 8, "Connector :: sinkThread, mutexConsume, unlock"); - pthread_mutex_unlock( &mutexConsume); -reportEvent( 8, "Connector :: sinkThread, mutexConsume, exit lock"); -*/ } } @@ -375,17 +343,6 @@ void * MultiThreadedConnector :: ThreadData :: threadFunction( void * param ) { ThreadData * threadData = (ThreadData*) param; -threadData->connector->reportEvent( 8, - "ThreadData :: threadFunction thread starts ", - (threadData->ixSink) ); -threadData->connector->reportEvent( 8, - "ThreadData :: threadFunction numSinks ", - threadData->connector->numSinks); -for ( unsigned int i = 0; i < threadData->connector->numSinks; ++i ) { -threadData->connector->reportEvent( 8, - "ThreadData :: threadFunction thread ", i, - " ixSink ", threadData->connector->threads[i].ixSink); -} threadData->connector->sinkThread( threadData->ixSink); return 0; @@ -397,6 +354,10 @@ threadData->connector->reportEvent( 8, $Source$ $Log$ + Revision 1.2 2002/10/19 13:35:21 darkeye + when a connection is dropped, DarkIce tries to reconnect, indefinitely + removed extreme event reporting for thread-related events + Revision 1.1 2002/10/19 12:25:47 darkeye changed internals so that now each encoding/server connection is a separate thread diff --git a/darkice/trunk/src/MultiThreadedConnector.h b/darkice/trunk/src/MultiThreadedConnector.h index b234406..55da43c 100644 --- a/darkice/trunk/src/MultiThreadedConnector.h +++ b/darkice/trunk/src/MultiThreadedConnector.h @@ -94,6 +94,11 @@ class MultiThreadedConnector : public virtual Connector */ pthread_t thread; + /** + * Marks if the thread is accepting data. + */ + bool accepting; + /** * Marks if the thread has processed the last batch * of data. @@ -109,6 +114,7 @@ class MultiThreadedConnector : public virtual Connector this->connector = 0; this->ixSink = 0; this->thread = 0; + this->accepting = false; this->isDone = false; } @@ -133,16 +139,6 @@ class MultiThreadedConnector : public virtual Connector */ pthread_cond_t condProduce; - /** - * The mutex of this object. - */ - pthread_mutex_t mutexConsume; - - /** - * The conditional variable for consuming data. - */ - pthread_cond_t condConsume; - /** * The thread attributes. */ @@ -332,6 +328,10 @@ class MultiThreadedConnector : public virtual Connector $Source$ $Log$ + Revision 1.2 2002/10/19 13:35:21 darkeye + when a connection is dropped, DarkIce tries to reconnect, indefinitely + removed extreme event reporting for thread-related events + Revision 1.1 2002/10/19 12:25:47 darkeye changed internals so that now each encoding/server connection is a separate thread