From 43d89e279627ca131249a2eb6e0b1c454a3ac097 Mon Sep 17 00:00:00 2001 From: "oetelaar.automatisering@gmail.com" Date: Thu, 11 Apr 2013 18:52:57 +0000 Subject: [PATCH] - extra check in castsink prevent null deref - fix typos - change bytecounter (size limit) to long instead of int - fix some string/buffer strangeness in icecast2.cpp - increase ringbuffer size from 2 to 5 seconds in JackDspSource.cpp - prevent jack killing us on buffer overflow, we just report and continue - new producer/consumer scheme in MultiThreadedConnector.cpp it now runs parallel really - some compiler warnings fixed --- darkice/trunk/ChangeLog | 13 + darkice/trunk/src/CastSink.h | 6 +- darkice/trunk/src/Connector.cpp | 4 +- darkice/trunk/src/Connector.h | 2 +- darkice/trunk/src/IceCast2.cpp | 31 +-- darkice/trunk/src/JackDspSource.cpp | 45 ++-- darkice/trunk/src/MultiThreadedConnector.cpp | 243 ++++++++++++------- darkice/trunk/src/MultiThreadedConnector.h | 30 ++- darkice/trunk/src/aflibConverter.cc | 2 +- darkice/trunk/src/aflibConverter.h | 2 +- darkice/trunk/src/aflibDebug.cc | 3 +- 11 files changed, 242 insertions(+), 139 deletions(-) diff --git a/darkice/trunk/ChangeLog b/darkice/trunk/ChangeLog index 87aeb2a..6254a00 100644 --- a/darkice/trunk/ChangeLog +++ b/darkice/trunk/ChangeLog @@ -1,4 +1,17 @@ next version + o Fix 'Ring Ruffer' reports. + - Increased buffer for jack to 5 seconds + - prevent darkice termination by jack, report no fatal problem when we + have a ringbuffer overflow, can happen during startup + If we can not handle input audio fast enough we just ignore the buffer + and skip it, and just report it. + - new multithreaded connector code, now handles encoders in parallel + and does not spin waiting, cpu load will be very much lower now + Codes uses 2 condition variables to report data availability and + consumer thread availability + - Hopes are that glitching reports will be a thing of the past + - minor compiler warnings fixed + (Fix by Edwin van den Oetelaar) o Issue #56: Wrong icecast2 password isn't properly reported, fixed. thanks to Filipe Roque o Issue #57: BufferedSink makes streams invalid, fixed. diff --git a/darkice/trunk/src/CastSink.h b/darkice/trunk/src/CastSink.h index e070955..366e7bc 100644 --- a/darkice/trunk/src/CastSink.h +++ b/darkice/trunk/src/CastSink.h @@ -290,7 +290,11 @@ class CastSink : public Sink, public virtual Reporter inline virtual bool isOpen ( void ) const throw () { - return getSink()->isOpen(); + Sink *s = getSink(); + if (s) + return getSink()->isOpen(); + else + return false; } /** diff --git a/darkice/trunk/src/Connector.cpp b/darkice/trunk/src/Connector.cpp index ebb3403..7a34bc7 100644 --- a/darkice/trunk/src/Connector.cpp +++ b/darkice/trunk/src/Connector.cpp @@ -252,7 +252,7 @@ Connector :: open ( void ) throw ( Exception ) /*------------------------------------------------------------------------------ * Transfer some data from the source to the sink *----------------------------------------------------------------------------*/ -unsigned int +unsigned long Connector :: transfer ( unsigned long bytes, unsigned int bufSize, unsigned int sec, @@ -271,7 +271,7 @@ Connector :: transfer ( unsigned long bytes, unsigned char * buf = new unsigned char[bufSize]; - reportEvent( 6, "Connector :: tranfer, bytes", bytes); + reportEvent( 6, "Connector :: transfer, bytes", bytes); for ( b = 0; !bytes || b < bytes; ) { unsigned int d = 0; diff --git a/darkice/trunk/src/Connector.h b/darkice/trunk/src/Connector.h index 5183a38..2b77427 100644 --- a/darkice/trunk/src/Connector.h +++ b/darkice/trunk/src/Connector.h @@ -228,7 +228,7 @@ class Connector : public virtual Referable, public virtual Reporter * @return the number of bytes read from the Source. * @exception Exception */ - virtual unsigned int + virtual unsigned long transfer ( unsigned long bytes, unsigned int bufSize, unsigned int sec, diff --git a/darkice/trunk/src/IceCast2.cpp b/darkice/trunk/src/IceCast2.cpp index 4759760..24b5f21 100644 --- a/darkice/trunk/src/IceCast2.cpp +++ b/darkice/trunk/src/IceCast2.cpp @@ -70,12 +70,6 @@ static const char fileid[] = "$Id$"; -/*------------------------------------------------------------------------------ - * Size of string conversion buffer - *----------------------------------------------------------------------------*/ -#define STRBUF_SIZE 32 - - /*------------------------------------------------------------------------------ * Expected positive response from server begins like this. *----------------------------------------------------------------------------*/ @@ -125,7 +119,8 @@ IceCast2 :: sendLogin ( void ) throw ( Exception ) Sink * sink = getSink(); Source * source = getSocket(); const char * str; - char resp[STRBUF_SIZE]; + const int buflen = 1024; // some small buffer size + char resp[buflen]; // a little buffer unsigned int len; unsigned int lenExpected; @@ -178,7 +173,7 @@ IceCast2 :: sendLogin ( void ) throw ( Exception ) sink->write( str, strlen(str)); { // send source: encoded as base64 - char * source = "source:"; + const char * source = "source:"; const char * pwd = getPassword(); char * tmp = new char[Util::strLen(source) + Util::strLen(pwd) + 1]; @@ -197,10 +192,7 @@ IceCast2 :: sendLogin ( void ) throw ( Exception ) // send the ice- headers str = "\nice-bitrate: "; sink->write( str, strlen( str)); - if ( log10(getBitRate()) >= (STRBUF_SIZE-2) ) { - throw Exception( __FILE__, __LINE__, - "bitrate does not fit string buffer", getBitRate()); - } + sprintf( resp, "%d", getBitRate()); sink->write( resp, strlen( resp)); @@ -243,10 +235,10 @@ IceCast2 :: sendLogin ( void ) throw ( Exception ) // read the response, expected response begins with responseOK lenExpected = Util::strLen( responseOK); - if ( (len = source->read( resp, STRBUF_SIZE-1)) < lenExpected ) { - return false; + if ( (len = source->read( resp, buflen )) < lenExpected ) { + return false; // short read, no need to continue } - resp[lenExpected] = 0; + resp[lenExpected] = '\x00'; // end string, truncate to expected length reportEvent(5,resp); @@ -261,13 +253,16 @@ IceCast2 :: sendLogin ( void ) throw ( Exception ) } if ( !Util::strEq( resp, responseOK) ) { - return false; + // some unexpected response from server + throw Exception( __FILE__, __LINE__, + "Icecast2 - Unexpected response from server"); } // suck anything that the other side has to say while ( source->canRead( 0, 0) && - (len = source->read( resp, STRBUF_SIZE-1)) ); - + (len = source->read( resp, buflen ))); + + // all is well, we are connected return true; } diff --git a/darkice/trunk/src/JackDspSource.cpp b/darkice/trunk/src/JackDspSource.cpp index adfb83e..7ff1d02 100644 --- a/darkice/trunk/src/JackDspSource.cpp +++ b/darkice/trunk/src/JackDspSource.cpp @@ -259,10 +259,12 @@ JackDspSource :: open ( void ) throw ( Exception ) // Create a ring buffer for each channel - rb_size = 2 - * jack_get_sample_rate(client) - * sizeof (jack_default_audio_sample_t); - for (c=0; cclient == NULL) { return 0; } - + /* copy data to ringbuffer; one per channel */ - for (c=0; c < self->getChannel(); c++) { - char *buf = (char*)jack_port_get_buffer(self->ports[c], nframes); - size_t len = jack_ringbuffer_write(self->rb[c], buf, to_write); - if (len < to_write) { - Reporter::reportEvent( 1, "failed to write to ring ruffer"); - return 1; - } + for (c=0; c < self->getChannel(); c++) { + /* check space */ + size_t len; + if (jack_ringbuffer_write_space(self->rb[c]) < to_write) { + /* buffer is overflowing, skip the incoming data */ + jack_ringbuffer_write_advance(self->rb[c], to_write); + /* prevent blocking the ring buffer by updating internal pointers + * jack will now not terminate on xruns + */ + Reporter::reportEvent( 1, "ring buffer full, skipping data"); + /* We do not return error to jack callback handler and keep going */ + } else { + /* buffer has space, put data into ringbuffer */ + len = jack_ringbuffer_write(self->rb[c], (char *) jack_port_get_buffer( + self->ports[c], nframes), to_write); + if (len != to_write) + Reporter::reportEvent( 1, "failed to write to ring buffer (can not happen)"); + } } // Success diff --git a/darkice/trunk/src/MultiThreadedConnector.cpp b/darkice/trunk/src/MultiThreadedConnector.cpp index a177b89..3e2ef41 100644 --- a/darkice/trunk/src/MultiThreadedConnector.cpp +++ b/darkice/trunk/src/MultiThreadedConnector.cpp @@ -68,9 +68,10 @@ void MultiThreadedConnector :: init ( bool reconnect ) throw ( Exception ) { this->reconnect = reconnect; - - pthread_mutex_init( &mutexProduce, 0); - pthread_cond_init( &condProduce, 0); + pthread_mutex_init( &mutex_start, 0); + pthread_cond_init( &cond_start, 0); + pthread_mutex_init( &mutex_done, 0); + pthread_cond_init( &cond_done, 0); threads = 0; } @@ -86,8 +87,10 @@ MultiThreadedConnector :: strip ( void ) throw ( Exception ) threads = 0; } - pthread_cond_destroy( &condProduce); - pthread_mutex_destroy( &mutexProduce); + pthread_cond_destroy( &cond_start); + pthread_mutex_destroy( &mutex_start); + pthread_cond_destroy( &cond_done); + pthread_mutex_destroy( &mutex_done); } @@ -99,9 +102,11 @@ MultiThreadedConnector :: MultiThreadedConnector ( throw ( Exception ) : Connector( connector) { - reconnect = connector.reconnect; - mutexProduce = connector.mutexProduce; - condProduce = connector.condProduce; + reconnect = connector.reconnect; + mutex_start = connector.mutex_start; + cond_start = connector.cond_start; + mutex_done = connector.mutex_done; + cond_done = connector.cond_done; if ( threads ) { delete[] threads; @@ -112,7 +117,6 @@ MultiThreadedConnector :: MultiThreadedConnector ( } } - /*------------------------------------------------------------------------------ * Assignment operator *----------------------------------------------------------------------------*/ @@ -123,9 +127,11 @@ MultiThreadedConnector :: operator= ( const MultiThreadedConnector & connector ) if ( this != &connector ) { Connector::operator=( connector); - reconnect = connector.reconnect; - mutexProduce = connector.mutexProduce; - condProduce = connector.condProduce; + reconnect = connector.reconnect; + mutex_start = connector.mutex_start; + cond_start = connector.cond_start; + mutex_done = connector.mutex_done; + cond_done = connector.cond_done; if ( threads ) { delete[] threads; @@ -139,7 +145,6 @@ MultiThreadedConnector :: operator= ( const MultiThreadedConnector & connector ) return *this; } - /*------------------------------------------------------------------------------ * Open the source and all the sinks if needed * Create the sink threads @@ -173,7 +178,7 @@ MultiThreadedConnector :: open ( void ) throw ( Exception ) threadData->connector = this; threadData->ixSink = i; threadData->accepting = true; - threadData->isDone = true; + threadData->isDone = 1; // 1==STOP, activate thread in transfer() if ( pthread_create( &(threadData->thread), &threadAttr, ThreadData::threadFunction, @@ -187,10 +192,10 @@ MultiThreadedConnector :: open ( void ) throw ( Exception ) unsigned int j; // signal to stop for all running threads - pthread_mutex_lock( &mutexProduce); + pthread_mutex_lock( &mutex_start); running = false; - pthread_cond_broadcast( &condProduce); - pthread_mutex_unlock( &mutexProduce); + pthread_cond_broadcast( &cond_start); + pthread_mutex_unlock( &mutex_start); for ( j = 0; j < i; ++j ) { pthread_join( threads[j].thread, 0); @@ -209,14 +214,14 @@ MultiThreadedConnector :: open ( void ) throw ( Exception ) /*------------------------------------------------------------------------------ * Transfer some data from the source to the sink *----------------------------------------------------------------------------*/ -unsigned int +unsigned long MultiThreadedConnector :: transfer ( unsigned long bytes, unsigned int bufSize, unsigned int sec, unsigned int usec ) throw ( Exception ) { - unsigned int b; + unsigned long byteCounter; // when we reach byteCounter thread will end if ( numSinks == 0 ) { return 0; @@ -229,47 +234,72 @@ MultiThreadedConnector :: transfer ( unsigned long bytes, dataBuffer = new unsigned char[bufSize]; dataSize = 0; - reportEvent( 6, "MultiThreadedConnector :: tranfer, bytes", bytes); - - for ( b = 0; !bytes || b < bytes; ) { - if ( source->canRead( sec, usec) ) { - unsigned int i; - - pthread_mutex_lock( &mutexProduce); - dataSize = source->read( dataBuffer, bufSize); - b += dataSize; + /* if bytes==0 transfer until end of program, + * if bytes>0 transfer upto number of bytes + */ + reportEvent( 6, "MultiThreadedConnector::transfer count:", bytes); + byteCounter = 0; // init, no data bytes sent yet + + + while (running && (bytes == 0 || byteCounter < bytes)) { + if (source->canRead(sec, usec)) { + unsigned int i; + dataSize = source->read(dataBuffer, bufSize); + byteCounter += dataSize; + // check for EOF - if ( dataSize == 0 ) { - reportEvent( 3, "MultiThreadedConnector :: transfer, EOF"); - pthread_mutex_unlock( &mutexProduce); + if (dataSize == 0) { + reportEvent(3, "MultiThreadedConnector :: transfer, EOF"); break; } - for ( i = 0; i < numSinks; ++i ) { - threads[i].isDone = false; + pthread_mutex_lock(&mutex_start); + for (i = 0; i < numSinks; ++i) { + if (threads[i].accepting) + threads[i].isDone = 0; // consumers => RUN } - - // tell sink threads that there is some data available - pthread_cond_broadcast( &condProduce); - + pthread_cond_broadcast(&cond_start); // kick the waiting consumers to look again + pthread_mutex_unlock(&mutex_start); // UNLOCK, release the consumers' cond variable + // wait for all sink threads to get done with this data - while ( true ) { - for ( i = 0; i < numSinks && threads[i].isDone; ++i ); - if ( i == numSinks ) { + // we do not spin here, we just wait for an event from the consumers + pthread_mutex_lock(&mutex_done); + while ( 1 ) { + int rc = 0; + // wait for condition : releases mutex so other thread can change condition + rc = pthread_cond_wait(&cond_done, &mutex_done); + // mutex is locked again + if (rc != 0) { + reportEvent(1, "MultiThreadedConnector pthread_cond_wait() fail"); break; } - pthread_cond_wait( &condProduce, &mutexProduce); + + int acceptor_count=0; + int stopped_count=0; + for (i = 0; i < numSinks; ++i) { + if (threads[i].accepting) { + acceptor_count++; // number of accepting threads + if (threads[i].isDone == 1) + stopped_count++; // number of accepting threads which have STOP + } + } + // break when all accepting threads are done + if (acceptor_count == stopped_count) { + break; + } + // at least one thread has not set the STOP flag yet } - pthread_mutex_unlock( &mutexProduce); + pthread_mutex_unlock(&mutex_done); + // at this point all consumers are done with the block } else { - reportEvent( 3, "MultiThreadedConnector :: transfer, can't read"); + reportEvent(3,"MultiThreadedConnector :: transfer, can't read"); break; - } + } } delete[] dataBuffer; - return b; + return byteCounter; } @@ -278,70 +308,114 @@ MultiThreadedConnector :: transfer ( unsigned long bytes, * Read the presented data *----------------------------------------------------------------------------*/ void -MultiThreadedConnector :: sinkThread( int ixSink ) +MultiThreadedConnector::sinkThread(int ixSink) { - ThreadData * threadData = &threads[ixSink]; - Sink * sink = sinks[ixSink].get(); + ThreadData * threadData = &threads[ixSink]; + Sink * sink = sinks[ixSink].get( ); - while ( running ) { + while ( running ) + { // wait for some data to become available - pthread_mutex_lock( &mutexProduce); - while ( running && threadData->isDone ) { - pthread_cond_wait( &condProduce, &mutexProduce); + // producer sets isDone==0 when consumer can continue + // producer sets isDone==2 or running==0 to request termination + pthread_mutex_lock( &mutex_start ); // LOCK + int rc=0; + while ( (rc==0) && running && (threadData->isDone==1) ) + { + // wait for condition, releases lock + rc = pthread_cond_wait( &cond_start, &mutex_start ); + // we hold the lock again } - if ( !running ) { - pthread_mutex_unlock( &mutexProduce); + pthread_mutex_unlock( &mutex_start ); // UNLOCK + + // something wrong or signal to quit detected + // break out of this loop, will end the thread + if ( running==false || threadData->isDone==2 || rc != 0 ) break; - } - if ( threadData->cut) { - sink->cut(); + if ( threadData->cut ) + { + sink->cut( ); threadData->cut = false; } - if ( threadData->accepting ) { - if ( sink->canWrite( 0, 0) ) { - try { - sink->write( dataBuffer, dataSize); - } catch ( Exception & e ) { + if ( threadData->accepting ) + { + if ( sink->canWrite( 0, 0 ) ) + { + try + { + sink->write( dataBuffer, dataSize ); + } catch ( Exception & e ) + { // something wrong. don't accept more data, try to // reopen the sink next time around threadData->accepting = false; } - } else { + } + else + { reportEvent( 4, - "MultiThreadedConnector :: sinkThread can't write ", - ixSink); + "MultiThreadedConnector :: sinkThread can't write ", + ixSink ); // don't care if we can't write } } - threadData->isDone = true; - pthread_cond_broadcast( &condProduce); - pthread_mutex_unlock( &mutexProduce); if ( !threadData->accepting ) { if ( reconnect ) { reportEvent( 4, - "MultiThreadedConnector :: sinkThread reconnecting ", - ixSink); + "MultiThreadedConnector :: sinkThread reconnecting ", + ixSink ); // if we're not accepting, try to reopen the sink try { - sink->close(); - Util::sleep(1L, 0L); - sink->open(); - sched_yield(); - threadData->accepting = sink->isOpen(); - } catch ( Exception & e ) { + sink->close( ); + Util::sleep( 1L, 0L ); + sink->open( ); + sched_yield( ); + threadData->accepting = sink->isOpen( ); + } catch ( Exception & e ) { // don't care, just try and try again } - } else { - // if !reconnect, just stop the connector - running = false; } + else { + // if !reconnect, just stop the connector + // running = false; /* kill the whole application */ + // tell that we used the databuffer, do not wait for us anymore + pthread_mutex_lock( &mutex_done ); + threadData->isDone = 1; // 1==STOP + pthread_mutex_unlock( &mutex_done ); + reportEvent( 4, + "MultiThreadedConnector :: sinkThread no reconnect? ", + ixSink ); + try + { + threadData->accepting = false; + sink->close( ); + } catch ( Exception & e ) + { + // don't care, just try and try again + reportEvent( 9, + "MultiThreadedConnector :: sinkThread do not care2 ", + ixSink ); + } + } } - } -} + + pthread_mutex_lock( &mutex_done ); + threadData->isDone = 1; // producer will check this flag + pthread_cond_signal( &cond_done ); // signal producer + pthread_mutex_unlock( &mutex_done ); + + } /* is running */ + /* just make sure nobody will be waiting for us when we terminate */ + pthread_mutex_lock( &mutex_done ); + threadData->isDone = 1; // STOP + pthread_cond_signal( &cond_done ); // signal producer + pthread_mutex_unlock( &mutex_done ); + +} /*------------------------------------------------------------------------------ * Signal to each sink to cut what they've done so far, and start anew. @@ -369,16 +443,15 @@ MultiThreadedConnector :: close ( void ) throw ( Exception ) unsigned int i; // signal to stop for all threads - pthread_mutex_lock( &mutexProduce); + pthread_mutex_lock( &mutex_start ); running = false; - pthread_cond_broadcast( &condProduce); - pthread_mutex_unlock( &mutexProduce); + pthread_cond_broadcast( &cond_start ); + pthread_mutex_unlock( &mutex_start ); // wait for all the threads to finish for ( i = 0; i < numSinks; ++i ) { pthread_join( threads[i].thread, 0); } - pthread_attr_destroy( &threadAttr); Connector::close(); } diff --git a/darkice/trunk/src/MultiThreadedConnector.h b/darkice/trunk/src/MultiThreadedConnector.h index eef1345..1add1c9 100644 --- a/darkice/trunk/src/MultiThreadedConnector.h +++ b/darkice/trunk/src/MultiThreadedConnector.h @@ -105,7 +105,7 @@ class MultiThreadedConnector : public virtual Connector * Marks if the thread has processed the last batch * of data. */ - bool isDone; + int isDone; /** * A flag to show that the sink should be made to cut in the @@ -123,7 +123,7 @@ class MultiThreadedConnector : public virtual Connector this->ixSink = 0; this->thread = 0; this->accepting = false; - this->isDone = false; + this->isDone = 1; // 0==RUN 1=STOP 2=TERMINATE this->cut = false; } @@ -138,16 +138,20 @@ class MultiThreadedConnector : public virtual Connector threadFunction( void * param ); }; - /** - * The mutex of this object. + /* mutex and cond variable for signaling new data + * the consumers wait for this */ - pthread_mutex_t mutexProduce; - - /** - * The conditional variable for presenting new data. + pthread_mutex_t mutex_start; + pthread_cond_t cond_start; // producer sets this + + /* mutex and cond variable for signaling that a single thread has + * finished working on a block of data + * the producer waits for this, then checks if everyone is done + * and maybe waits again until everyone has finished on the data */ - pthread_cond_t condProduce; - + pthread_mutex_t mutex_done; + pthread_cond_t cond_done; // consumer sets this + /** * The thread attributes. */ @@ -159,7 +163,7 @@ class MultiThreadedConnector : public virtual Connector ThreadData * threads; /** - * Signal if we're running or not, so the threads no if to stop. + * Signal if we're running or not running */ bool running; @@ -282,7 +286,7 @@ class MultiThreadedConnector : public virtual Connector virtual MultiThreadedConnector & operator= ( const MultiThreadedConnector & connector ) throw ( Exception ); - + /** * Open the connector. Opens the Source and the Sinks if necessary. * @@ -314,7 +318,7 @@ class MultiThreadedConnector : public virtual Connector * @return the number of bytes read from the Source. * @exception Exception */ - virtual unsigned int + virtual unsigned long transfer ( unsigned long bytes, unsigned int bufSize, unsigned int sec, diff --git a/darkice/trunk/src/aflibConverter.cc b/darkice/trunk/src/aflibConverter.cc index 99bdffe..67a2cda 100644 --- a/darkice/trunk/src/aflibConverter.cc +++ b/darkice/trunk/src/aflibConverter.cc @@ -226,7 +226,7 @@ aflibConverter::resample( /* number of output samples returned */ int -aflibConverter::err_ret(char *s) +aflibConverter::err_ret(const char *s) { aflib_debug("resample: %s \n\n",s); /* Display error message */ return -1; diff --git a/darkice/trunk/src/aflibConverter.h b/darkice/trunk/src/aflibConverter.h index 32573eb..816bee6 100644 --- a/darkice/trunk/src/aflibConverter.h +++ b/darkice/trunk/src/aflibConverter.h @@ -110,7 +110,7 @@ private: operator=(const aflibConverter& op); int - err_ret(char *s); + err_ret(const char *s); void deleteMemory(); diff --git a/darkice/trunk/src/aflibDebug.cc b/darkice/trunk/src/aflibDebug.cc index d78aa01..9546c73 100644 --- a/darkice/trunk/src/aflibDebug.cc +++ b/darkice/trunk/src/aflibDebug.cc @@ -97,7 +97,8 @@ void output_message(::aflibDebug::Level level, const char *msg) { default: break; // avoid compile warning } - system(buff); + int r = system(buff); + if (r<0) fprintf(stderr, "aflibDebug, system() failed\n"); } /*