- extra check in castsink prevent null deref
- fix typos - change bytecounter (size limit) to long instead of int - fix some string/buffer strangeness in icecast2.cpp - increase ringbuffer size from 2 to 5 seconds in JackDspSource.cpp - prevent jack killing us on buffer overflow, we just report and continue - new producer/consumer scheme in MultiThreadedConnector.cpp it now runs parallel really - some compiler warnings fixed
This commit is contained in:
		
							parent
							
								
									8ac1639800
								
							
						
					
					
						commit
						43d89e2796
					
				| 
						 | 
				
			
			@ -1,4 +1,17 @@
 | 
			
		|||
next version
 | 
			
		||||
    o Fix 'Ring Ruffer' reports. 
 | 
			
		||||
      - Increased buffer for jack to 5 seconds
 | 
			
		||||
      - prevent darkice termination by jack, report no fatal problem when we
 | 
			
		||||
        have a ringbuffer overflow, can happen during startup
 | 
			
		||||
        If we can not handle input audio fast enough we just ignore the buffer
 | 
			
		||||
        and skip it, and just report it.
 | 
			
		||||
      - new multithreaded connector code, now handles encoders in parallel
 | 
			
		||||
        and does not spin waiting, cpu load will be very much lower now
 | 
			
		||||
        Codes uses 2 condition variables to report data availability and
 | 
			
		||||
        consumer thread availability
 | 
			
		||||
      - Hopes are that glitching reports will be a thing of the past
 | 
			
		||||
      - minor compiler warnings fixed
 | 
			
		||||
        (Fix by Edwin van den Oetelaar)
 | 
			
		||||
    o Issue #56: Wrong icecast2 password isn't properly reported, fixed.
 | 
			
		||||
	  thanks to Filipe Roque <flip.roque@gmail.com>
 | 
			
		||||
    o Issue #57: BufferedSink makes streams invalid, fixed.
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -290,7 +290,11 @@ class CastSink : public Sink, public virtual Reporter
 | 
			
		|||
        inline virtual bool
 | 
			
		||||
        isOpen ( void ) const                       throw ()
 | 
			
		||||
        {
 | 
			
		||||
          return getSink()->isOpen();
 | 
			
		||||
            Sink *s = getSink();
 | 
			
		||||
            if (s) 
 | 
			
		||||
                return getSink()->isOpen();
 | 
			
		||||
            else 
 | 
			
		||||
                return false;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        /**
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -252,7 +252,7 @@ Connector :: open ( void )                          throw ( Exception )
 | 
			
		|||
/*------------------------------------------------------------------------------
 | 
			
		||||
 *  Transfer some data from the source to the sink
 | 
			
		||||
 *----------------------------------------------------------------------------*/
 | 
			
		||||
unsigned int
 | 
			
		||||
unsigned long
 | 
			
		||||
Connector :: transfer ( unsigned long       bytes,
 | 
			
		||||
                        unsigned int        bufSize,
 | 
			
		||||
                        unsigned int        sec,
 | 
			
		||||
| 
						 | 
				
			
			@ -271,7 +271,7 @@ Connector :: transfer ( unsigned long       bytes,
 | 
			
		|||
 | 
			
		||||
    unsigned char * buf = new unsigned char[bufSize];
 | 
			
		||||
 | 
			
		||||
    reportEvent( 6, "Connector :: tranfer, bytes", bytes);
 | 
			
		||||
    reportEvent( 6, "Connector :: transfer, bytes", bytes);
 | 
			
		||||
    
 | 
			
		||||
    for ( b = 0; !bytes || b < bytes; ) {
 | 
			
		||||
        unsigned int    d = 0;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -228,7 +228,7 @@ class Connector : public virtual Referable, public virtual Reporter
 | 
			
		|||
         *  @return the number of bytes read from the Source.
 | 
			
		||||
         *  @exception Exception
 | 
			
		||||
         */
 | 
			
		||||
        virtual unsigned int
 | 
			
		||||
        virtual unsigned long
 | 
			
		||||
        transfer (  unsigned long       bytes,
 | 
			
		||||
                    unsigned int        bufSize,
 | 
			
		||||
                    unsigned int        sec,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -70,12 +70,6 @@
 | 
			
		|||
static const char fileid[] = "$Id$";
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
/*------------------------------------------------------------------------------
 | 
			
		||||
 *  Size of string conversion buffer
 | 
			
		||||
 *----------------------------------------------------------------------------*/
 | 
			
		||||
#define STRBUF_SIZE         32
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
/*------------------------------------------------------------------------------
 | 
			
		||||
 *  Expected positive response from server begins like this.
 | 
			
		||||
 *----------------------------------------------------------------------------*/
 | 
			
		||||
| 
						 | 
				
			
			@ -125,7 +119,8 @@ IceCast2 :: sendLogin ( void )                           throw ( Exception )
 | 
			
		|||
    Sink          * sink   = getSink();
 | 
			
		||||
    Source        * source = getSocket();
 | 
			
		||||
    const char    * str;
 | 
			
		||||
    char            resp[STRBUF_SIZE];
 | 
			
		||||
    const int       buflen = 1024;  // some small buffer size
 | 
			
		||||
    char            resp[buflen];   // a little buffer
 | 
			
		||||
    unsigned int    len;
 | 
			
		||||
    unsigned int    lenExpected;
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -178,7 +173,7 @@ IceCast2 :: sendLogin ( void )                           throw ( Exception )
 | 
			
		|||
    sink->write( str, strlen(str));
 | 
			
		||||
    {
 | 
			
		||||
        // send source:<password> encoded as base64
 | 
			
		||||
        char        * source = "source:";
 | 
			
		||||
        const char  * source = "source:";
 | 
			
		||||
        const char  * pwd    = getPassword();
 | 
			
		||||
        char        * tmp    = new char[Util::strLen(source) +
 | 
			
		||||
                                        Util::strLen(pwd) + 1];
 | 
			
		||||
| 
						 | 
				
			
			@ -197,10 +192,7 @@ IceCast2 :: sendLogin ( void )                           throw ( Exception )
 | 
			
		|||
    // send the ice- headers
 | 
			
		||||
    str = "\nice-bitrate: ";
 | 
			
		||||
    sink->write( str, strlen( str));
 | 
			
		||||
    if ( log10(getBitRate()) >= (STRBUF_SIZE-2) ) {
 | 
			
		||||
        throw Exception( __FILE__, __LINE__,
 | 
			
		||||
                         "bitrate does not fit string buffer", getBitRate());
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
    sprintf( resp, "%d", getBitRate());
 | 
			
		||||
    sink->write( resp, strlen( resp));
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -243,10 +235,10 @@ IceCast2 :: sendLogin ( void )                           throw ( Exception )
 | 
			
		|||
 | 
			
		||||
    // read the response, expected response begins with responseOK
 | 
			
		||||
    lenExpected = Util::strLen( responseOK);
 | 
			
		||||
    if ( (len = source->read( resp, STRBUF_SIZE-1)) < lenExpected ) {
 | 
			
		||||
        return false;
 | 
			
		||||
    if ( (len = source->read( resp, buflen )) < lenExpected ) {
 | 
			
		||||
        return false; // short read, no need to continue
 | 
			
		||||
    }
 | 
			
		||||
    resp[lenExpected] = 0;
 | 
			
		||||
    resp[lenExpected] = '\x00'; // end string, truncate to expected length
 | 
			
		||||
 | 
			
		||||
    reportEvent(5,resp);
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -261,13 +253,16 @@ IceCast2 :: sendLogin ( void )                           throw ( Exception )
 | 
			
		|||
    }
 | 
			
		||||
 | 
			
		||||
    if ( !Util::strEq( resp, responseOK) ) {
 | 
			
		||||
        return false;
 | 
			
		||||
        // some unexpected response from server
 | 
			
		||||
        throw Exception( __FILE__, __LINE__,
 | 
			
		||||
                         "Icecast2 - Unexpected response from server");
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
    // suck anything that the other side has to say
 | 
			
		||||
    while ( source->canRead( 0, 0) && 
 | 
			
		||||
           (len = source->read( resp, STRBUF_SIZE-1)) );
 | 
			
		||||
 | 
			
		||||
           (len = source->read( resp, buflen )));
 | 
			
		||||
    
 | 
			
		||||
    // all is well, we are connected
 | 
			
		||||
    return true;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -259,10 +259,12 @@ JackDspSource :: open ( void )                       throw ( Exception )
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
    // Create a ring buffer for each channel
 | 
			
		||||
    rb_size = 2
 | 
			
		||||
            * jack_get_sample_rate(client)
 | 
			
		||||
            * sizeof (jack_default_audio_sample_t);
 | 
			
		||||
    for (c=0; c<getChannel(); c++) {
 | 
			
		||||
    /* will take about 1 MB buffer for each channel */
 | 
			
		||||
    rb_size = 5 /* number of seconds */
 | 
			
		||||
            * jack_get_sample_rate(client) /* eg 48000 */
 | 
			
		||||
            * sizeof (jack_default_audio_sample_t); /* eg 4 bytes */
 | 
			
		||||
            
 | 
			
		||||
    for (c=0; c < getChannel(); c++) {
 | 
			
		||||
        rb[c] = jack_ringbuffer_create(rb_size);
 | 
			
		||||
        if (!rb[c]) {
 | 
			
		||||
            throw Exception( __FILE__, __LINE__,
 | 
			
		||||
| 
						 | 
				
			
			@ -344,8 +346,8 @@ JackDspSource :: read (   void          * buf,
 | 
			
		|||
                          unsigned int    len )     throw ( Exception )
 | 
			
		||||
{
 | 
			
		||||
    jack_nframes_t samples         = len / 2 / getChannel();
 | 
			
		||||
    jack_nframes_t samples_read[2] = {0,0};
 | 
			
		||||
    short        * output          = (short*)buf;
 | 
			
		||||
    jack_nframes_t samples_read[2] = { 0, 0 };
 | 
			
		||||
    short        * output          = (short*) buf;
 | 
			
		||||
    unsigned int c, n;
 | 
			
		||||
 | 
			
		||||
    if ( !isOpen() ) {
 | 
			
		||||
| 
						 | 
				
			
			@ -363,14 +365,14 @@ JackDspSource :: read (   void          * buf,
 | 
			
		|||
    // We must be sure to fetch as many data on both channels
 | 
			
		||||
    int minBytesAvailable = samples * sizeof( jack_default_audio_sample_t );
 | 
			
		||||
 | 
			
		||||
    for (c=0; c<getChannel(); c++) {
 | 
			
		||||
    for (c=0; c < getChannel(); c++) {
 | 
			
		||||
        int readable = jack_ringbuffer_read_space(rb[c]);
 | 
			
		||||
        if (readable < minBytesAvailable) {
 | 
			
		||||
            minBytesAvailable = readable;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    for (c=0; c<getChannel(); c++) {    
 | 
			
		||||
    for (c=0; c < getChannel(); c++) {    
 | 
			
		||||
        // Copy frames from ring buffer to temporary buffer
 | 
			
		||||
        // and then convert samples to output buffer
 | 
			
		||||
        int bytes_read = jack_ringbuffer_read(rb[c],
 | 
			
		||||
| 
						 | 
				
			
			@ -456,15 +458,26 @@ JackDspSource :: process_callback( jack_nframes_t nframes, void *arg )
 | 
			
		|||
    if (self->client == NULL) {
 | 
			
		||||
        return 0;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    
 | 
			
		||||
    /* copy data to ringbuffer; one per channel */
 | 
			
		||||
    for (c=0; c < self->getChannel(); c++) {    
 | 
			
		||||
        char *buf  = (char*)jack_port_get_buffer(self->ports[c], nframes);
 | 
			
		||||
        size_t len = jack_ringbuffer_write(self->rb[c], buf, to_write);
 | 
			
		||||
        if (len < to_write) {
 | 
			
		||||
            Reporter::reportEvent( 1, "failed to write to ring ruffer");
 | 
			
		||||
            return 1;
 | 
			
		||||
         }
 | 
			
		||||
    for (c=0; c < self->getChannel(); c++) {
 | 
			
		||||
      /* check space */
 | 
			
		||||
        size_t len;
 | 
			
		||||
        if (jack_ringbuffer_write_space(self->rb[c]) < to_write) {
 | 
			
		||||
            /* buffer is overflowing, skip the incoming data */
 | 
			
		||||
            jack_ringbuffer_write_advance(self->rb[c], to_write); 
 | 
			
		||||
            /* prevent blocking the ring buffer by updating internal pointers 
 | 
			
		||||
             * jack will now not terminate on xruns
 | 
			
		||||
             */
 | 
			
		||||
            Reporter::reportEvent( 1, "ring buffer full, skipping data");
 | 
			
		||||
            /* We do not return error to jack callback handler and keep going */
 | 
			
		||||
        } else {
 | 
			
		||||
            /* buffer has space, put data into ringbuffer */
 | 
			
		||||
            len = jack_ringbuffer_write(self->rb[c], (char *) jack_port_get_buffer(
 | 
			
		||||
                    self->ports[c], nframes), to_write);
 | 
			
		||||
            if (len != to_write) 
 | 
			
		||||
                Reporter::reportEvent( 1, "failed to write to ring buffer (can not happen)");
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // Success
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -68,9 +68,10 @@ void
 | 
			
		|||
MultiThreadedConnector :: init ( bool    reconnect )    throw ( Exception )
 | 
			
		||||
{
 | 
			
		||||
    this->reconnect = reconnect;
 | 
			
		||||
 | 
			
		||||
    pthread_mutex_init( &mutexProduce, 0);
 | 
			
		||||
    pthread_cond_init( &condProduce, 0);
 | 
			
		||||
    pthread_mutex_init( &mutex_start, 0);
 | 
			
		||||
    pthread_cond_init( &cond_start, 0);
 | 
			
		||||
    pthread_mutex_init( &mutex_done, 0);
 | 
			
		||||
    pthread_cond_init( &cond_done, 0);
 | 
			
		||||
    threads = 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -86,8 +87,10 @@ MultiThreadedConnector :: strip ( void )                throw ( Exception )
 | 
			
		|||
        threads = 0;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pthread_cond_destroy( &condProduce);
 | 
			
		||||
    pthread_mutex_destroy( &mutexProduce);
 | 
			
		||||
    pthread_cond_destroy( &cond_start);
 | 
			
		||||
    pthread_mutex_destroy( &mutex_start);
 | 
			
		||||
    pthread_cond_destroy( &cond_done);
 | 
			
		||||
    pthread_mutex_destroy( &mutex_done);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -99,9 +102,11 @@ MultiThreadedConnector :: MultiThreadedConnector (
 | 
			
		|||
                                                            throw ( Exception )
 | 
			
		||||
            : Connector( connector)
 | 
			
		||||
{
 | 
			
		||||
    reconnect       = connector.reconnect;
 | 
			
		||||
    mutexProduce    = connector.mutexProduce;
 | 
			
		||||
    condProduce     = connector.condProduce;
 | 
			
		||||
    reconnect      = connector.reconnect;
 | 
			
		||||
    mutex_start    = connector.mutex_start;
 | 
			
		||||
    cond_start     = connector.cond_start;
 | 
			
		||||
    mutex_done     = connector.mutex_done;
 | 
			
		||||
    cond_done      = connector.cond_done;
 | 
			
		||||
 | 
			
		||||
    if ( threads ) {
 | 
			
		||||
        delete[] threads;
 | 
			
		||||
| 
						 | 
				
			
			@ -112,7 +117,6 @@ MultiThreadedConnector :: MultiThreadedConnector (
 | 
			
		|||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
/*------------------------------------------------------------------------------
 | 
			
		||||
 *  Assignment operator
 | 
			
		||||
 *----------------------------------------------------------------------------*/
 | 
			
		||||
| 
						 | 
				
			
			@ -123,9 +127,11 @@ MultiThreadedConnector :: operator= ( const MultiThreadedConnector & connector )
 | 
			
		|||
    if ( this != &connector ) {
 | 
			
		||||
        Connector::operator=( connector);
 | 
			
		||||
 | 
			
		||||
        reconnect       = connector.reconnect;
 | 
			
		||||
        mutexProduce    = connector.mutexProduce;
 | 
			
		||||
        condProduce     = connector.condProduce;
 | 
			
		||||
        reconnect      = connector.reconnect;
 | 
			
		||||
        mutex_start    = connector.mutex_start;
 | 
			
		||||
        cond_start     = connector.cond_start;
 | 
			
		||||
        mutex_done     = connector.mutex_done;
 | 
			
		||||
        cond_done      = connector.cond_done;
 | 
			
		||||
 | 
			
		||||
        if ( threads ) {
 | 
			
		||||
            delete[] threads;
 | 
			
		||||
| 
						 | 
				
			
			@ -139,7 +145,6 @@ MultiThreadedConnector :: operator= ( const MultiThreadedConnector & connector )
 | 
			
		|||
    return *this;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
/*------------------------------------------------------------------------------
 | 
			
		||||
 *  Open the source and all the sinks if needed
 | 
			
		||||
 *  Create the sink threads
 | 
			
		||||
| 
						 | 
				
			
			@ -173,7 +178,7 @@ MultiThreadedConnector :: open ( void )                     throw ( Exception )
 | 
			
		|||
        threadData->connector = this;
 | 
			
		||||
        threadData->ixSink    = i;
 | 
			
		||||
        threadData->accepting = true;
 | 
			
		||||
        threadData->isDone    = true;
 | 
			
		||||
        threadData->isDone    = 1; // 1==STOP, activate thread in transfer()
 | 
			
		||||
        if ( pthread_create( &(threadData->thread),
 | 
			
		||||
                             &threadAttr,
 | 
			
		||||
                             ThreadData::threadFunction,
 | 
			
		||||
| 
						 | 
				
			
			@ -187,10 +192,10 @@ MultiThreadedConnector :: open ( void )                     throw ( Exception )
 | 
			
		|||
        unsigned int    j;
 | 
			
		||||
 | 
			
		||||
        // signal to stop for all running threads
 | 
			
		||||
        pthread_mutex_lock( &mutexProduce);
 | 
			
		||||
        pthread_mutex_lock( &mutex_start);
 | 
			
		||||
        running = false;
 | 
			
		||||
        pthread_cond_broadcast( &condProduce);
 | 
			
		||||
        pthread_mutex_unlock( &mutexProduce);
 | 
			
		||||
        pthread_cond_broadcast( &cond_start);
 | 
			
		||||
        pthread_mutex_unlock( &mutex_start);
 | 
			
		||||
 | 
			
		||||
        for ( j = 0; j < i; ++j ) {
 | 
			
		||||
            pthread_join( threads[j].thread, 0);
 | 
			
		||||
| 
						 | 
				
			
			@ -209,14 +214,14 @@ MultiThreadedConnector :: open ( void )                     throw ( Exception )
 | 
			
		|||
/*------------------------------------------------------------------------------
 | 
			
		||||
 *  Transfer some data from the source to the sink
 | 
			
		||||
 *----------------------------------------------------------------------------*/
 | 
			
		||||
unsigned int
 | 
			
		||||
unsigned long
 | 
			
		||||
MultiThreadedConnector :: transfer ( unsigned long       bytes,
 | 
			
		||||
                                     unsigned int        bufSize,
 | 
			
		||||
                                     unsigned int        sec,
 | 
			
		||||
                                     unsigned int        usec )
 | 
			
		||||
                                                            throw ( Exception )
 | 
			
		||||
{   
 | 
			
		||||
    unsigned int        b;
 | 
			
		||||
    unsigned long byteCounter;	// when we reach byteCounter thread will end
 | 
			
		||||
 | 
			
		||||
    if ( numSinks == 0 ) {
 | 
			
		||||
        return 0;
 | 
			
		||||
| 
						 | 
				
			
			@ -229,47 +234,72 @@ MultiThreadedConnector :: transfer ( unsigned long       bytes,
 | 
			
		|||
    dataBuffer   = new unsigned char[bufSize];
 | 
			
		||||
    dataSize     = 0;
 | 
			
		||||
 | 
			
		||||
    reportEvent( 6, "MultiThreadedConnector :: tranfer, bytes", bytes);
 | 
			
		||||
 | 
			
		||||
    for ( b = 0; !bytes || b < bytes; ) {
 | 
			
		||||
        if ( source->canRead( sec, usec) ) {
 | 
			
		||||
            unsigned int        i;
 | 
			
		||||
 | 
			
		||||
            pthread_mutex_lock( &mutexProduce);
 | 
			
		||||
            dataSize = source->read( dataBuffer, bufSize);
 | 
			
		||||
            b       += dataSize;
 | 
			
		||||
    /* if bytes==0 transfer until end of program, 
 | 
			
		||||
     * if bytes>0 transfer upto number of bytes 
 | 
			
		||||
     */
 | 
			
		||||
    reportEvent( 6, "MultiThreadedConnector::transfer count:", bytes);
 | 
			
		||||
    byteCounter = 0;		// init, no data bytes sent yet
 | 
			
		||||
    
 | 
			
		||||
    
 | 
			
		||||
    while (running && (bytes == 0 || byteCounter < bytes)) {
 | 
			
		||||
 | 
			
		||||
        if (source->canRead(sec, usec)) {
 | 
			
		||||
	        unsigned int i;
 | 
			
		||||
            dataSize = source->read(dataBuffer, bufSize);
 | 
			
		||||
            byteCounter += dataSize;
 | 
			
		||||
            
 | 
			
		||||
            // check for EOF
 | 
			
		||||
            if ( dataSize == 0 ) {
 | 
			
		||||
                reportEvent( 3, "MultiThreadedConnector :: transfer, EOF");
 | 
			
		||||
                pthread_mutex_unlock( &mutexProduce);
 | 
			
		||||
            if (dataSize == 0) {
 | 
			
		||||
                reportEvent(3, "MultiThreadedConnector :: transfer, EOF");
 | 
			
		||||
                break;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            for ( i = 0; i < numSinks; ++i ) {
 | 
			
		||||
                threads[i].isDone = false;
 | 
			
		||||
            pthread_mutex_lock(&mutex_start);
 | 
			
		||||
            for (i = 0; i < numSinks; ++i) {
 | 
			
		||||
                if (threads[i].accepting)
 | 
			
		||||
                    threads[i].isDone = 0; // consumers => RUN
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            // tell sink threads that there is some data available
 | 
			
		||||
            pthread_cond_broadcast( &condProduce);
 | 
			
		||||
 | 
			
		||||
            pthread_cond_broadcast(&cond_start); // kick the waiting consumers to look again
 | 
			
		||||
            pthread_mutex_unlock(&mutex_start); // UNLOCK, release the consumers' cond variable
 | 
			
		||||
            
 | 
			
		||||
            // wait for all sink threads to get done with this data
 | 
			
		||||
            while ( true ) {
 | 
			
		||||
                for ( i = 0; i < numSinks && threads[i].isDone; ++i );
 | 
			
		||||
                if ( i == numSinks ) {
 | 
			
		||||
            // we do not spin here, we just wait for an event from the consumers
 | 
			
		||||
            pthread_mutex_lock(&mutex_done);
 | 
			
		||||
            while ( 1 ) {
 | 
			
		||||
                int rc = 0;
 | 
			
		||||
                // wait for condition : releases mutex so other thread can change condition
 | 
			
		||||
                rc = pthread_cond_wait(&cond_done, &mutex_done);
 | 
			
		||||
                // mutex is locked again
 | 
			
		||||
                if (rc != 0) {
 | 
			
		||||
                    reportEvent(1, "MultiThreadedConnector pthread_cond_wait() fail");
 | 
			
		||||
                    break;
 | 
			
		||||
                }
 | 
			
		||||
                pthread_cond_wait( &condProduce, &mutexProduce);
 | 
			
		||||
 | 
			
		||||
                int acceptor_count=0;
 | 
			
		||||
                int stopped_count=0;		
 | 
			
		||||
                for (i = 0; i < numSinks; ++i) {
 | 
			
		||||
                    if (threads[i].accepting) { 
 | 
			
		||||
                        acceptor_count++; // number of accepting threads
 | 
			
		||||
			if (threads[i].isDone == 1) 
 | 
			
		||||
                            stopped_count++; // number of accepting threads which have STOP
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
                // break when all accepting threads are done                
 | 
			
		||||
                if (acceptor_count == stopped_count) {
 | 
			
		||||
                    break;
 | 
			
		||||
                }
 | 
			
		||||
		        // at least one thread has not set the STOP flag yet
 | 
			
		||||
            }
 | 
			
		||||
            pthread_mutex_unlock( &mutexProduce);
 | 
			
		||||
            pthread_mutex_unlock(&mutex_done);
 | 
			
		||||
	    // at this point all consumers are done with the block
 | 
			
		||||
        } else {
 | 
			
		||||
            reportEvent( 3, "MultiThreadedConnector :: transfer, can't read");
 | 
			
		||||
            reportEvent(3,"MultiThreadedConnector :: transfer, can't read");
 | 
			
		||||
            break;
 | 
			
		||||
        }
 | 
			
		||||
	}  
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    delete[] dataBuffer;
 | 
			
		||||
    return b;
 | 
			
		||||
    return byteCounter;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -278,70 +308,114 @@ MultiThreadedConnector :: transfer ( unsigned long       bytes,
 | 
			
		|||
 *  Read the presented data
 | 
			
		||||
 *----------------------------------------------------------------------------*/
 | 
			
		||||
void
 | 
			
		||||
MultiThreadedConnector :: sinkThread( int       ixSink )
 | 
			
		||||
MultiThreadedConnector::sinkThread(int ixSink)
 | 
			
		||||
{
 | 
			
		||||
    ThreadData    * threadData = &threads[ixSink];
 | 
			
		||||
    Sink          * sink       = sinks[ixSink].get();
 | 
			
		||||
    ThreadData * threadData = &threads[ixSink];
 | 
			
		||||
    Sink * sink = sinks[ixSink].get( );
 | 
			
		||||
 | 
			
		||||
    while ( running ) {
 | 
			
		||||
    while ( running )
 | 
			
		||||
    {
 | 
			
		||||
        // wait for some data to become available
 | 
			
		||||
        pthread_mutex_lock( &mutexProduce);
 | 
			
		||||
        while ( running && threadData->isDone ) {
 | 
			
		||||
            pthread_cond_wait( &condProduce, &mutexProduce);
 | 
			
		||||
        // producer sets isDone==0 when consumer can continue
 | 
			
		||||
        // producer sets isDone==2 or running==0 to request termination
 | 
			
		||||
        pthread_mutex_lock( &mutex_start ); // LOCK
 | 
			
		||||
        int rc=0;
 | 
			
		||||
        while ( (rc==0) && running && (threadData->isDone==1) )
 | 
			
		||||
        {
 | 
			
		||||
            // wait for condition, releases lock
 | 
			
		||||
            rc = pthread_cond_wait( &cond_start, &mutex_start );
 | 
			
		||||
            // we hold the lock again 
 | 
			
		||||
        }
 | 
			
		||||
        if ( !running ) {
 | 
			
		||||
            pthread_mutex_unlock( &mutexProduce);
 | 
			
		||||
        pthread_mutex_unlock( &mutex_start ); // UNLOCK
 | 
			
		||||
        
 | 
			
		||||
        // something wrong or signal to quit detected
 | 
			
		||||
        // break out of this loop, will end the thread
 | 
			
		||||
        if ( running==false || threadData->isDone==2 || rc != 0 ) 
 | 
			
		||||
            break;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if ( threadData->cut) {
 | 
			
		||||
            sink->cut();
 | 
			
		||||
        if ( threadData->cut )
 | 
			
		||||
        {
 | 
			
		||||
            sink->cut( );
 | 
			
		||||
            threadData->cut = false;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if ( threadData->accepting ) {
 | 
			
		||||
            if ( sink->canWrite( 0, 0) ) {
 | 
			
		||||
                try {
 | 
			
		||||
                    sink->write( dataBuffer, dataSize);
 | 
			
		||||
                } catch ( Exception     & e ) {
 | 
			
		||||
        if ( threadData->accepting )
 | 
			
		||||
        {
 | 
			
		||||
            if ( sink->canWrite( 0, 0 ) )
 | 
			
		||||
            {
 | 
			
		||||
                try
 | 
			
		||||
                {
 | 
			
		||||
                    sink->write( dataBuffer, dataSize );
 | 
			
		||||
                } catch ( Exception & e )
 | 
			
		||||
                {
 | 
			
		||||
                    // something wrong. don't accept more data, try to
 | 
			
		||||
                    // reopen the sink next time around
 | 
			
		||||
                    threadData->accepting = false;
 | 
			
		||||
                }
 | 
			
		||||
            } else {
 | 
			
		||||
            }
 | 
			
		||||
            else
 | 
			
		||||
            {
 | 
			
		||||
                reportEvent( 4,
 | 
			
		||||
                            "MultiThreadedConnector :: sinkThread can't write ",
 | 
			
		||||
                             ixSink);
 | 
			
		||||
                             "MultiThreadedConnector :: sinkThread can't write ",
 | 
			
		||||
                             ixSink );
 | 
			
		||||
                // don't care if we can't write
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        threadData->isDone = true;
 | 
			
		||||
        pthread_cond_broadcast( &condProduce);
 | 
			
		||||
        pthread_mutex_unlock( &mutexProduce);
 | 
			
		||||
 | 
			
		||||
        if ( !threadData->accepting ) {
 | 
			
		||||
            if ( reconnect ) {
 | 
			
		||||
                reportEvent( 4,
 | 
			
		||||
                           "MultiThreadedConnector :: sinkThread reconnecting ",
 | 
			
		||||
                            ixSink);
 | 
			
		||||
                             "MultiThreadedConnector :: sinkThread reconnecting ",
 | 
			
		||||
                             ixSink );
 | 
			
		||||
                // if we're not accepting, try to reopen the sink
 | 
			
		||||
                try {
 | 
			
		||||
                    sink->close();
 | 
			
		||||
                    Util::sleep(1L, 0L);
 | 
			
		||||
                    sink->open();
 | 
			
		||||
                    sched_yield();
 | 
			
		||||
                    threadData->accepting = sink->isOpen();
 | 
			
		||||
                } catch ( Exception   & e ) {
 | 
			
		||||
                    sink->close( );
 | 
			
		||||
                    Util::sleep( 1L, 0L );
 | 
			
		||||
                    sink->open( );
 | 
			
		||||
                    sched_yield( );
 | 
			
		||||
                    threadData->accepting = sink->isOpen( );
 | 
			
		||||
                } catch ( Exception & e ) {
 | 
			
		||||
                    // don't care, just try and try again
 | 
			
		||||
                }
 | 
			
		||||
            } else {
 | 
			
		||||
                // if !reconnect, just stop the connector
 | 
			
		||||
                running = false;
 | 
			
		||||
            }
 | 
			
		||||
            else {
 | 
			
		||||
                // if !reconnect, just stop the connector
 | 
			
		||||
                // running = false; /* kill the whole application */	
 | 
			
		||||
                // tell that we used the databuffer, do not wait for us anymore
 | 
			
		||||
                pthread_mutex_lock( &mutex_done );
 | 
			
		||||
                threadData->isDone = 1; // 1==STOP
 | 
			
		||||
                pthread_mutex_unlock( &mutex_done );
 | 
			
		||||
                reportEvent( 4,
 | 
			
		||||
                             "MultiThreadedConnector :: sinkThread no reconnect? ",
 | 
			
		||||
                             ixSink );
 | 
			
		||||
                try
 | 
			
		||||
                {
 | 
			
		||||
                    threadData->accepting = false;
 | 
			
		||||
                    sink->close( );
 | 
			
		||||
                } catch ( Exception & e )
 | 
			
		||||
                {
 | 
			
		||||
                    // don't care, just try and try again
 | 
			
		||||
                    reportEvent( 9,
 | 
			
		||||
                                 "MultiThreadedConnector :: sinkThread do not care2 ",
 | 
			
		||||
                                 ixSink );
 | 
			
		||||
                }
 | 
			
		||||
            } 
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
        
 | 
			
		||||
        pthread_mutex_lock( &mutex_done );
 | 
			
		||||
        threadData->isDone = 1; // producer will check this flag
 | 
			
		||||
        pthread_cond_signal( &cond_done ); // signal producer
 | 
			
		||||
        pthread_mutex_unlock( &mutex_done );
 | 
			
		||||
    
 | 
			
		||||
    } /* is running */
 | 
			
		||||
 | 
			
		||||
    /* just make sure nobody will be waiting for us when we terminate */
 | 
			
		||||
    pthread_mutex_lock( &mutex_done );
 | 
			
		||||
    threadData->isDone = 1; // STOP
 | 
			
		||||
    pthread_cond_signal( &cond_done ); // signal producer
 | 
			
		||||
    pthread_mutex_unlock( &mutex_done );
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/*------------------------------------------------------------------------------
 | 
			
		||||
 *  Signal to each sink to cut what they've done so far, and start anew.
 | 
			
		||||
| 
						 | 
				
			
			@ -369,16 +443,15 @@ MultiThreadedConnector :: close ( void )                    throw ( Exception )
 | 
			
		|||
    unsigned int    i;
 | 
			
		||||
 | 
			
		||||
    // signal to stop for all threads
 | 
			
		||||
    pthread_mutex_lock( &mutexProduce);
 | 
			
		||||
    pthread_mutex_lock( &mutex_start );
 | 
			
		||||
    running = false;
 | 
			
		||||
    pthread_cond_broadcast( &condProduce);
 | 
			
		||||
    pthread_mutex_unlock( &mutexProduce);
 | 
			
		||||
    pthread_cond_broadcast( &cond_start );
 | 
			
		||||
    pthread_mutex_unlock( &mutex_start );
 | 
			
		||||
 | 
			
		||||
    // wait for all the threads to finish
 | 
			
		||||
    for ( i = 0; i < numSinks; ++i ) {
 | 
			
		||||
        pthread_join( threads[i].thread, 0);
 | 
			
		||||
    }
 | 
			
		||||
    pthread_attr_destroy( &threadAttr);
 | 
			
		||||
 | 
			
		||||
    Connector::close();
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -105,7 +105,7 @@ class MultiThreadedConnector : public virtual Connector
 | 
			
		|||
                 *  Marks if the thread has processed the last batch
 | 
			
		||||
                 *  of data.
 | 
			
		||||
                 */
 | 
			
		||||
                bool                        isDone;
 | 
			
		||||
                int                        isDone;
 | 
			
		||||
 | 
			
		||||
                /**
 | 
			
		||||
                 *  A flag to show that the sink should be made to cut in the
 | 
			
		||||
| 
						 | 
				
			
			@ -123,7 +123,7 @@ class MultiThreadedConnector : public virtual Connector
 | 
			
		|||
                    this->ixSink    = 0;
 | 
			
		||||
                    this->thread    = 0;
 | 
			
		||||
                    this->accepting = false;
 | 
			
		||||
                    this->isDone    = false;
 | 
			
		||||
                    this->isDone    = 1;        // 0==RUN 1=STOP 2=TERMINATE        
 | 
			
		||||
                    this->cut       = false;
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -138,16 +138,20 @@ class MultiThreadedConnector : public virtual Connector
 | 
			
		|||
                threadFunction( void      * param );
 | 
			
		||||
        };
 | 
			
		||||
        
 | 
			
		||||
        /**
 | 
			
		||||
         *  The mutex of this object.
 | 
			
		||||
        /* mutex and cond variable for signaling new data 
 | 
			
		||||
         * the consumers wait for this 
 | 
			
		||||
         */
 | 
			
		||||
        pthread_mutex_t         mutexProduce;
 | 
			
		||||
 | 
			
		||||
        /**
 | 
			
		||||
         *  The conditional variable for presenting new data.
 | 
			
		||||
        pthread_mutex_t         mutex_start;
 | 
			
		||||
        pthread_cond_t          cond_start; // producer sets this
 | 
			
		||||
        
 | 
			
		||||
        /* mutex and cond variable for signaling that a single thread has 
 | 
			
		||||
         * finished working on a block of data 
 | 
			
		||||
         * the producer waits for this, then checks if everyone is done
 | 
			
		||||
         * and maybe waits again until everyone has finished on the data
 | 
			
		||||
         */
 | 
			
		||||
        pthread_cond_t          condProduce;
 | 
			
		||||
 | 
			
		||||
        pthread_mutex_t mutex_done;
 | 
			
		||||
        pthread_cond_t  cond_done; // consumer sets this  
 | 
			
		||||
        
 | 
			
		||||
        /**
 | 
			
		||||
         *  The thread attributes.
 | 
			
		||||
         */
 | 
			
		||||
| 
						 | 
				
			
			@ -159,7 +163,7 @@ class MultiThreadedConnector : public virtual Connector
 | 
			
		|||
        ThreadData            * threads;
 | 
			
		||||
 | 
			
		||||
        /**
 | 
			
		||||
         *  Signal if we're running or not, so the threads no if to stop.
 | 
			
		||||
         *  Signal if we're running or not running
 | 
			
		||||
         */
 | 
			
		||||
        bool                    running;
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -282,7 +286,7 @@ class MultiThreadedConnector : public virtual Connector
 | 
			
		|||
        virtual MultiThreadedConnector &
 | 
			
		||||
        operator= ( const MultiThreadedConnector &   connector )
 | 
			
		||||
                                                            throw ( Exception );
 | 
			
		||||
 | 
			
		||||
        
 | 
			
		||||
        /**
 | 
			
		||||
         *  Open the connector. Opens the Source and the Sinks if necessary.
 | 
			
		||||
         *
 | 
			
		||||
| 
						 | 
				
			
			@ -314,7 +318,7 @@ class MultiThreadedConnector : public virtual Connector
 | 
			
		|||
         *  @return the number of bytes read from the Source.
 | 
			
		||||
         *  @exception Exception
 | 
			
		||||
         */
 | 
			
		||||
        virtual unsigned int
 | 
			
		||||
        virtual unsigned long
 | 
			
		||||
        transfer (  unsigned long       bytes,
 | 
			
		||||
                    unsigned int        bufSize,
 | 
			
		||||
                    unsigned int        sec,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -226,7 +226,7 @@ aflibConverter::resample(       /* number of output samples returned */
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
int
 | 
			
		||||
aflibConverter::err_ret(char *s)
 | 
			
		||||
aflibConverter::err_ret(const char *s)
 | 
			
		||||
{
 | 
			
		||||
    aflib_debug("resample: %s \n\n",s); /* Display error message  */
 | 
			
		||||
    return -1;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -110,7 +110,7 @@ private:
 | 
			
		|||
   operator=(const aflibConverter& op);
 | 
			
		||||
 | 
			
		||||
   int
 | 
			
		||||
   err_ret(char *s);
 | 
			
		||||
   err_ret(const char *s);
 | 
			
		||||
 | 
			
		||||
   void
 | 
			
		||||
   deleteMemory();
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -97,7 +97,8 @@ void output_message(::aflibDebug::Level level, const char *msg) {
 | 
			
		|||
	  default:
 | 
			
		||||
		  break; // avoid compile warning
 | 
			
		||||
	}
 | 
			
		||||
	system(buff);
 | 
			
		||||
	int r = system(buff);
 | 
			
		||||
        if (r<0) fprintf(stderr, "aflibDebug, system() failed\n");
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue