Trying again locking patch
This commit is contained in:
		
							parent
							
								
									94dd6d7a80
								
							
						
					
					
						commit
						95fe8f1ab9
					
				| 
						 | 
				
			
			@ -252,7 +252,7 @@ Connector :: open ( void )                          throw ( Exception )
 | 
			
		|||
/*------------------------------------------------------------------------------
 | 
			
		||||
 *  Transfer some data from the source to the sink
 | 
			
		||||
 *----------------------------------------------------------------------------*/
 | 
			
		||||
unsigned int
 | 
			
		||||
unsigned long
 | 
			
		||||
Connector :: transfer ( unsigned long       bytes,
 | 
			
		||||
                        unsigned int        bufSize,
 | 
			
		||||
                        unsigned int        sec,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -228,7 +228,7 @@ class Connector : public virtual Referable, public virtual Reporter
 | 
			
		|||
         *  @return the number of bytes read from the Source.
 | 
			
		||||
         *  @exception Exception
 | 
			
		||||
         */
 | 
			
		||||
        virtual unsigned int
 | 
			
		||||
        virtual unsigned long
 | 
			
		||||
        transfer (  unsigned long       bytes,
 | 
			
		||||
                    unsigned int        bufSize,
 | 
			
		||||
                    unsigned int        sec,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -68,9 +68,10 @@ void
 | 
			
		|||
MultiThreadedConnector :: init ( bool    reconnect )    throw ( Exception )
 | 
			
		||||
{
 | 
			
		||||
    this->reconnect = reconnect;
 | 
			
		||||
 | 
			
		||||
    pthread_mutex_init( &mutexProduce, 0);
 | 
			
		||||
    pthread_cond_init( &condProduce, 0);
 | 
			
		||||
    pthread_mutex_init( &mutex_start, 0);
 | 
			
		||||
    pthread_cond_init( &cond_start, 0);
 | 
			
		||||
    pthread_mutex_init( &mutex_done, 0);
 | 
			
		||||
    pthread_cond_init( &cond_done, 0);
 | 
			
		||||
    threads = 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -86,8 +87,10 @@ MultiThreadedConnector :: strip ( void )                throw ( Exception )
 | 
			
		|||
        threads = 0;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pthread_cond_destroy( &condProduce);
 | 
			
		||||
    pthread_mutex_destroy( &mutexProduce);
 | 
			
		||||
    pthread_cond_destroy( &cond_start);
 | 
			
		||||
    pthread_mutex_destroy( &mutex_start);
 | 
			
		||||
    pthread_cond_destroy( &cond_done);
 | 
			
		||||
    pthread_mutex_destroy( &mutex_done);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -99,9 +102,11 @@ MultiThreadedConnector :: MultiThreadedConnector (
 | 
			
		|||
                                                            throw ( Exception )
 | 
			
		||||
            : Connector( connector)
 | 
			
		||||
{
 | 
			
		||||
    reconnect       = connector.reconnect;
 | 
			
		||||
    mutexProduce    = connector.mutexProduce;
 | 
			
		||||
    condProduce     = connector.condProduce;
 | 
			
		||||
    reconnect      = connector.reconnect;
 | 
			
		||||
    mutex_start    = connector.mutex_start;
 | 
			
		||||
    cond_start     = connector.cond_start;
 | 
			
		||||
    mutex_done     = connector.mutex_done;
 | 
			
		||||
    cond_done      = connector.cond_done;
 | 
			
		||||
 | 
			
		||||
    if ( threads ) {
 | 
			
		||||
        delete[] threads;
 | 
			
		||||
| 
						 | 
				
			
			@ -112,7 +117,6 @@ MultiThreadedConnector :: MultiThreadedConnector (
 | 
			
		|||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
/*------------------------------------------------------------------------------
 | 
			
		||||
 *  Assignment operator
 | 
			
		||||
 *----------------------------------------------------------------------------*/
 | 
			
		||||
| 
						 | 
				
			
			@ -123,9 +127,11 @@ MultiThreadedConnector :: operator= ( const MultiThreadedConnector & connector )
 | 
			
		|||
    if ( this != &connector ) {
 | 
			
		||||
        Connector::operator=( connector);
 | 
			
		||||
 | 
			
		||||
        reconnect       = connector.reconnect;
 | 
			
		||||
        mutexProduce    = connector.mutexProduce;
 | 
			
		||||
        condProduce     = connector.condProduce;
 | 
			
		||||
        reconnect      = connector.reconnect;
 | 
			
		||||
        mutex_start    = connector.mutex_start;
 | 
			
		||||
        cond_start     = connector.cond_start;
 | 
			
		||||
        mutex_done     = connector.mutex_done;
 | 
			
		||||
        cond_done      = connector.cond_done;
 | 
			
		||||
 | 
			
		||||
        if ( threads ) {
 | 
			
		||||
            delete[] threads;
 | 
			
		||||
| 
						 | 
				
			
			@ -139,7 +145,6 @@ MultiThreadedConnector :: operator= ( const MultiThreadedConnector & connector )
 | 
			
		|||
    return *this;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
/*------------------------------------------------------------------------------
 | 
			
		||||
 *  Open the source and all the sinks if needed
 | 
			
		||||
 *  Create the sink threads
 | 
			
		||||
| 
						 | 
				
			
			@ -173,7 +178,7 @@ MultiThreadedConnector :: open ( void )                     throw ( Exception )
 | 
			
		|||
        threadData->connector = this;
 | 
			
		||||
        threadData->ixSink    = i;
 | 
			
		||||
        threadData->accepting = true;
 | 
			
		||||
        threadData->isDone    = true;
 | 
			
		||||
        threadData->isDone    = 1; // 1==STOP, activate thread in transfer()
 | 
			
		||||
        if ( pthread_create( &(threadData->thread),
 | 
			
		||||
                             &threadAttr,
 | 
			
		||||
                             ThreadData::threadFunction,
 | 
			
		||||
| 
						 | 
				
			
			@ -187,10 +192,10 @@ MultiThreadedConnector :: open ( void )                     throw ( Exception )
 | 
			
		|||
        unsigned int    j;
 | 
			
		||||
 | 
			
		||||
        // signal to stop for all running threads
 | 
			
		||||
        pthread_mutex_lock( &mutexProduce);
 | 
			
		||||
        pthread_mutex_lock( &mutex_start);
 | 
			
		||||
        running = false;
 | 
			
		||||
        pthread_cond_broadcast( &condProduce);
 | 
			
		||||
        pthread_mutex_unlock( &mutexProduce);
 | 
			
		||||
        pthread_cond_broadcast( &cond_start);
 | 
			
		||||
        pthread_mutex_unlock( &mutex_start);
 | 
			
		||||
 | 
			
		||||
        for ( j = 0; j < i; ++j ) {
 | 
			
		||||
            pthread_join( threads[j].thread, 0);
 | 
			
		||||
| 
						 | 
				
			
			@ -209,14 +214,14 @@ MultiThreadedConnector :: open ( void )                     throw ( Exception )
 | 
			
		|||
/*------------------------------------------------------------------------------
 | 
			
		||||
 *  Transfer some data from the source to the sink
 | 
			
		||||
 *----------------------------------------------------------------------------*/
 | 
			
		||||
unsigned int
 | 
			
		||||
unsigned long
 | 
			
		||||
MultiThreadedConnector :: transfer ( unsigned long       bytes,
 | 
			
		||||
                                     unsigned int        bufSize,
 | 
			
		||||
                                     unsigned int        sec,
 | 
			
		||||
                                     unsigned int        usec )
 | 
			
		||||
                                                            throw ( Exception )
 | 
			
		||||
{   
 | 
			
		||||
    unsigned int        b;
 | 
			
		||||
    unsigned long byteCounter;	// when we reach byteCounter thread will end
 | 
			
		||||
 | 
			
		||||
    if ( numSinks == 0 ) {
 | 
			
		||||
        return 0;
 | 
			
		||||
| 
						 | 
				
			
			@ -229,47 +234,73 @@ MultiThreadedConnector :: transfer ( unsigned long       bytes,
 | 
			
		|||
    dataBuffer   = new unsigned char[bufSize];
 | 
			
		||||
    dataSize     = 0;
 | 
			
		||||
 | 
			
		||||
    reportEvent( 6, "MultiThreadedConnector :: tranfer, bytes", bytes);
 | 
			
		||||
 | 
			
		||||
    for ( b = 0; !bytes || b < bytes; ) {
 | 
			
		||||
        if ( source->canRead( sec, usec) ) {
 | 
			
		||||
            unsigned int        i;
 | 
			
		||||
 | 
			
		||||
            pthread_mutex_lock( &mutexProduce);
 | 
			
		||||
            dataSize = source->read( dataBuffer, bufSize);
 | 
			
		||||
            b       += dataSize;
 | 
			
		||||
    /* if bytes==0 transfer until end of program, 
 | 
			
		||||
     * if bytes>0 transfer upto number of bytes 
 | 
			
		||||
     */
 | 
			
		||||
    reportEvent( 6, "MultiThreadedConnector::transfer count:", bytes);
 | 
			
		||||
    byteCounter = 0;		// init, no data bytes sent yet
 | 
			
		||||
    
 | 
			
		||||
    
 | 
			
		||||
    while (running && (bytes == 0 || byteCounter < bytes)) {
 | 
			
		||||
 | 
			
		||||
        if (source->canRead(sec, usec)) {
 | 
			
		||||
	        unsigned int i;
 | 
			
		||||
            dataSize = source->read(dataBuffer, bufSize);
 | 
			
		||||
            byteCounter += dataSize;
 | 
			
		||||
            
 | 
			
		||||
            // check for EOF
 | 
			
		||||
            if ( dataSize == 0 ) {
 | 
			
		||||
                reportEvent( 3, "MultiThreadedConnector :: transfer, EOF");
 | 
			
		||||
                pthread_mutex_unlock( &mutexProduce);
 | 
			
		||||
            if (dataSize == 0) {
 | 
			
		||||
                reportEvent(3, "MultiThreadedConnector :: transfer, EOF");
 | 
			
		||||
                break;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            for ( i = 0; i < numSinks; ++i ) {
 | 
			
		||||
                threads[i].isDone = false;
 | 
			
		||||
            pthread_mutex_lock(&mutex_start);
 | 
			
		||||
            for (i = 0; i < numSinks; ++i) {
 | 
			
		||||
                if (threads[i].accepting)
 | 
			
		||||
                    threads[i].isDone = 0; // consumers => RUN
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            // tell sink threads that there is some data available
 | 
			
		||||
            pthread_cond_broadcast( &condProduce);
 | 
			
		||||
            pthread_cond_broadcast(&cond_start); // kick the waiting consumers to look again
 | 
			
		||||
 | 
			
		||||
            // wait for all sink threads to get done with this data
 | 
			
		||||
            while ( true ) {
 | 
			
		||||
                for ( i = 0; i < numSinks && threads[i].isDone; ++i );
 | 
			
		||||
                if ( i == numSinks ) {
 | 
			
		||||
            // we do not spin here, we just wait for an event from the consumers
 | 
			
		||||
            pthread_mutex_lock(&mutex_done);
 | 
			
		||||
            pthread_mutex_unlock(&mutex_start); // UNLOCK, release the consumers' cond variable
 | 
			
		||||
            
 | 
			
		||||
            while ( 1 ) {
 | 
			
		||||
                int rc = 0;
 | 
			
		||||
                // wait for condition : releases mutex so other thread can change condition
 | 
			
		||||
                rc = pthread_cond_wait(&cond_done, &mutex_done);
 | 
			
		||||
                // mutex is locked again
 | 
			
		||||
                if (rc != 0) {
 | 
			
		||||
                    reportEvent(1, "MultiThreadedConnector pthread_cond_wait() fail");
 | 
			
		||||
                    break;
 | 
			
		||||
                }
 | 
			
		||||
                pthread_cond_wait( &condProduce, &mutexProduce);
 | 
			
		||||
 | 
			
		||||
                int acceptor_count=0;
 | 
			
		||||
                int stopped_count=0;		
 | 
			
		||||
                for (i = 0; i < numSinks; ++i) {
 | 
			
		||||
                    if (threads[i].accepting) { 
 | 
			
		||||
                        acceptor_count++; // number of accepting threads
 | 
			
		||||
			if (threads[i].isDone == 1) 
 | 
			
		||||
                            stopped_count++; // number of accepting threads which have STOP
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
                // break when all accepting threads are done                
 | 
			
		||||
                if (acceptor_count == stopped_count) {
 | 
			
		||||
                    break;
 | 
			
		||||
                }
 | 
			
		||||
		        // at least one thread has not set the STOP flag yet
 | 
			
		||||
            }
 | 
			
		||||
            pthread_mutex_unlock( &mutexProduce);
 | 
			
		||||
            pthread_mutex_unlock(&mutex_done);
 | 
			
		||||
	    // at this point all consumers are done with the block
 | 
			
		||||
        } else {
 | 
			
		||||
            reportEvent( 3, "MultiThreadedConnector :: transfer, can't read");
 | 
			
		||||
            reportEvent(3,"MultiThreadedConnector :: transfer, can't read");
 | 
			
		||||
            break;
 | 
			
		||||
        }
 | 
			
		||||
	}  
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    delete[] dataBuffer;
 | 
			
		||||
    return b;
 | 
			
		||||
    return byteCounter;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -278,70 +309,114 @@ MultiThreadedConnector :: transfer ( unsigned long       bytes,
 | 
			
		|||
 *  Read the presented data
 | 
			
		||||
 *----------------------------------------------------------------------------*/
 | 
			
		||||
void
 | 
			
		||||
MultiThreadedConnector :: sinkThread( int       ixSink )
 | 
			
		||||
MultiThreadedConnector::sinkThread(int ixSink)
 | 
			
		||||
{
 | 
			
		||||
    ThreadData    * threadData = &threads[ixSink];
 | 
			
		||||
    Sink          * sink       = sinks[ixSink].get();
 | 
			
		||||
    ThreadData * threadData = &threads[ixSink];
 | 
			
		||||
    Sink * sink = sinks[ixSink].get( );
 | 
			
		||||
 | 
			
		||||
    while ( running ) {
 | 
			
		||||
    while ( running )
 | 
			
		||||
    {
 | 
			
		||||
        // wait for some data to become available
 | 
			
		||||
        pthread_mutex_lock( &mutexProduce);
 | 
			
		||||
        while ( running && threadData->isDone ) {
 | 
			
		||||
            pthread_cond_wait( &condProduce, &mutexProduce);
 | 
			
		||||
        // producer sets isDone==0 when consumer can continue
 | 
			
		||||
        // producer sets isDone==2 or running==0 to request termination
 | 
			
		||||
        pthread_mutex_lock( &mutex_start ); // LOCK
 | 
			
		||||
        int rc=0;
 | 
			
		||||
        while ( (rc==0) && running && (threadData->isDone==1) )
 | 
			
		||||
        {
 | 
			
		||||
            // wait for condition, releases lock
 | 
			
		||||
            rc = pthread_cond_wait( &cond_start, &mutex_start );
 | 
			
		||||
            // we hold the lock again 
 | 
			
		||||
        }
 | 
			
		||||
        if ( !running ) {
 | 
			
		||||
            pthread_mutex_unlock( &mutexProduce);
 | 
			
		||||
        pthread_mutex_unlock( &mutex_start ); // UNLOCK
 | 
			
		||||
        
 | 
			
		||||
        // something wrong or signal to quit detected
 | 
			
		||||
        // break out of this loop, will end the thread
 | 
			
		||||
        if ( running==false || threadData->isDone==2 || rc != 0 ) 
 | 
			
		||||
            break;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if ( threadData->cut) {
 | 
			
		||||
            sink->cut();
 | 
			
		||||
        if ( threadData->cut )
 | 
			
		||||
        {
 | 
			
		||||
            sink->cut( );
 | 
			
		||||
            threadData->cut = false;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if ( threadData->accepting ) {
 | 
			
		||||
            if ( sink->canWrite( 0, 0) ) {
 | 
			
		||||
                try {
 | 
			
		||||
                    sink->write( dataBuffer, dataSize);
 | 
			
		||||
                } catch ( Exception     & e ) {
 | 
			
		||||
        if ( threadData->accepting )
 | 
			
		||||
        {
 | 
			
		||||
            if ( sink->canWrite( 0, 0 ) )
 | 
			
		||||
            {
 | 
			
		||||
                try
 | 
			
		||||
                {
 | 
			
		||||
                    sink->write( dataBuffer, dataSize );
 | 
			
		||||
                } catch ( Exception & e )
 | 
			
		||||
                {
 | 
			
		||||
                    // something wrong. don't accept more data, try to
 | 
			
		||||
                    // reopen the sink next time around
 | 
			
		||||
                    threadData->accepting = false;
 | 
			
		||||
                }
 | 
			
		||||
            } else {
 | 
			
		||||
            }
 | 
			
		||||
            else
 | 
			
		||||
            {
 | 
			
		||||
                reportEvent( 4,
 | 
			
		||||
                            "MultiThreadedConnector :: sinkThread can't write ",
 | 
			
		||||
                             ixSink);
 | 
			
		||||
                             "MultiThreadedConnector :: sinkThread can't write ",
 | 
			
		||||
                             ixSink );
 | 
			
		||||
                // don't care if we can't write
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        threadData->isDone = true;
 | 
			
		||||
        pthread_cond_broadcast( &condProduce);
 | 
			
		||||
        pthread_mutex_unlock( &mutexProduce);
 | 
			
		||||
 | 
			
		||||
        if ( !threadData->accepting ) {
 | 
			
		||||
            if ( reconnect ) {
 | 
			
		||||
                reportEvent( 4,
 | 
			
		||||
                           "MultiThreadedConnector :: sinkThread reconnecting ",
 | 
			
		||||
                            ixSink);
 | 
			
		||||
                             "MultiThreadedConnector :: sinkThread reconnecting ",
 | 
			
		||||
                             ixSink );
 | 
			
		||||
                // if we're not accepting, try to reopen the sink
 | 
			
		||||
                try {
 | 
			
		||||
                    sink->close();
 | 
			
		||||
                    Util::sleep(1L, 0L);
 | 
			
		||||
                    sink->open();
 | 
			
		||||
                    sched_yield();
 | 
			
		||||
                    threadData->accepting = sink->isOpen();
 | 
			
		||||
                } catch ( Exception   & e ) {
 | 
			
		||||
                    sink->close( );
 | 
			
		||||
                    Util::sleep( 1L, 0L );
 | 
			
		||||
                    sink->open( );
 | 
			
		||||
                    sched_yield( );
 | 
			
		||||
                    threadData->accepting = sink->isOpen( );
 | 
			
		||||
                } catch ( Exception & e ) {
 | 
			
		||||
                    // don't care, just try and try again
 | 
			
		||||
                }
 | 
			
		||||
            } else {
 | 
			
		||||
                // if !reconnect, just stop the connector
 | 
			
		||||
                running = false;
 | 
			
		||||
            }
 | 
			
		||||
            else {
 | 
			
		||||
                // if !reconnect, just stop the connector
 | 
			
		||||
                // running = false; /* kill the whole application */	
 | 
			
		||||
                // tell that we used the databuffer, do not wait for us anymore
 | 
			
		||||
                pthread_mutex_lock( &mutex_done );
 | 
			
		||||
                threadData->isDone = 1; // 1==STOP
 | 
			
		||||
                pthread_mutex_unlock( &mutex_done );
 | 
			
		||||
                reportEvent( 4,
 | 
			
		||||
                             "MultiThreadedConnector :: sinkThread no reconnect? ",
 | 
			
		||||
                             ixSink );
 | 
			
		||||
                try
 | 
			
		||||
                {
 | 
			
		||||
                    threadData->accepting = false;
 | 
			
		||||
                    sink->close( );
 | 
			
		||||
                } catch ( Exception & e )
 | 
			
		||||
                {
 | 
			
		||||
                    // don't care, just try and try again
 | 
			
		||||
                    reportEvent( 9,
 | 
			
		||||
                                 "MultiThreadedConnector :: sinkThread do not care2 ",
 | 
			
		||||
                                 ixSink );
 | 
			
		||||
                }
 | 
			
		||||
            } 
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
        
 | 
			
		||||
        pthread_mutex_lock( &mutex_done );
 | 
			
		||||
        threadData->isDone = 1; // producer will check this flag
 | 
			
		||||
        pthread_cond_signal( &cond_done ); // signal producer
 | 
			
		||||
        pthread_mutex_unlock( &mutex_done );
 | 
			
		||||
    
 | 
			
		||||
    } /* is running */
 | 
			
		||||
 | 
			
		||||
    /* just make sure nobody will be waiting for us when we terminate */
 | 
			
		||||
    pthread_mutex_lock( &mutex_done );
 | 
			
		||||
    threadData->isDone = 1; // STOP
 | 
			
		||||
    pthread_cond_signal( &cond_done ); // signal producer
 | 
			
		||||
    pthread_mutex_unlock( &mutex_done );
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/*------------------------------------------------------------------------------
 | 
			
		||||
 *  Signal to each sink to cut what they've done so far, and start anew.
 | 
			
		||||
| 
						 | 
				
			
			@ -369,16 +444,15 @@ MultiThreadedConnector :: close ( void )                    throw ( Exception )
 | 
			
		|||
    unsigned int    i;
 | 
			
		||||
 | 
			
		||||
    // signal to stop for all threads
 | 
			
		||||
    pthread_mutex_lock( &mutexProduce);
 | 
			
		||||
    pthread_mutex_lock( &mutex_start );
 | 
			
		||||
    running = false;
 | 
			
		||||
    pthread_cond_broadcast( &condProduce);
 | 
			
		||||
    pthread_mutex_unlock( &mutexProduce);
 | 
			
		||||
    pthread_cond_broadcast( &cond_start );
 | 
			
		||||
    pthread_mutex_unlock( &mutex_start );
 | 
			
		||||
 | 
			
		||||
    // wait for all the threads to finish
 | 
			
		||||
    for ( i = 0; i < numSinks; ++i ) {
 | 
			
		||||
        pthread_join( threads[i].thread, 0);
 | 
			
		||||
    }
 | 
			
		||||
    pthread_attr_destroy( &threadAttr);
 | 
			
		||||
 | 
			
		||||
    Connector::close();
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -105,7 +105,7 @@ class MultiThreadedConnector : public virtual Connector
 | 
			
		|||
                 *  Marks if the thread has processed the last batch
 | 
			
		||||
                 *  of data.
 | 
			
		||||
                 */
 | 
			
		||||
                bool                        isDone;
 | 
			
		||||
                int                        isDone;
 | 
			
		||||
 | 
			
		||||
                /**
 | 
			
		||||
                 *  A flag to show that the sink should be made to cut in the
 | 
			
		||||
| 
						 | 
				
			
			@ -123,7 +123,7 @@ class MultiThreadedConnector : public virtual Connector
 | 
			
		|||
                    this->ixSink    = 0;
 | 
			
		||||
                    this->thread    = 0;
 | 
			
		||||
                    this->accepting = false;
 | 
			
		||||
                    this->isDone    = false;
 | 
			
		||||
                    this->isDone    = 1;        // 0==RUN 1=STOP 2=TERMINATE        
 | 
			
		||||
                    this->cut       = false;
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -138,16 +138,20 @@ class MultiThreadedConnector : public virtual Connector
 | 
			
		|||
                threadFunction( void      * param );
 | 
			
		||||
        };
 | 
			
		||||
        
 | 
			
		||||
        /**
 | 
			
		||||
         *  The mutex of this object.
 | 
			
		||||
        /* mutex and cond variable for signaling new data 
 | 
			
		||||
         * the consumers wait for this 
 | 
			
		||||
         */
 | 
			
		||||
        pthread_mutex_t         mutexProduce;
 | 
			
		||||
 | 
			
		||||
        /**
 | 
			
		||||
         *  The conditional variable for presenting new data.
 | 
			
		||||
        pthread_mutex_t         mutex_start;
 | 
			
		||||
        pthread_cond_t          cond_start; // producer sets this
 | 
			
		||||
        
 | 
			
		||||
        /* mutex and cond variable for signaling that a single thread has 
 | 
			
		||||
         * finished working on a block of data 
 | 
			
		||||
         * the producer waits for this, then checks if everyone is done
 | 
			
		||||
         * and maybe waits again until everyone has finished on the data
 | 
			
		||||
         */
 | 
			
		||||
        pthread_cond_t          condProduce;
 | 
			
		||||
 | 
			
		||||
        pthread_mutex_t mutex_done;
 | 
			
		||||
        pthread_cond_t  cond_done; // consumer sets this  
 | 
			
		||||
        
 | 
			
		||||
        /**
 | 
			
		||||
         *  The thread attributes.
 | 
			
		||||
         */
 | 
			
		||||
| 
						 | 
				
			
			@ -159,7 +163,7 @@ class MultiThreadedConnector : public virtual Connector
 | 
			
		|||
        ThreadData            * threads;
 | 
			
		||||
 | 
			
		||||
        /**
 | 
			
		||||
         *  Signal if we're running or not, so the threads no if to stop.
 | 
			
		||||
         *  Signal if we're running or not running
 | 
			
		||||
         */
 | 
			
		||||
        bool                    running;
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -282,7 +286,7 @@ class MultiThreadedConnector : public virtual Connector
 | 
			
		|||
        virtual MultiThreadedConnector &
 | 
			
		||||
        operator= ( const MultiThreadedConnector &   connector )
 | 
			
		||||
                                                            throw ( Exception );
 | 
			
		||||
 | 
			
		||||
        
 | 
			
		||||
        /**
 | 
			
		||||
         *  Open the connector. Opens the Source and the Sinks if necessary.
 | 
			
		||||
         *
 | 
			
		||||
| 
						 | 
				
			
			@ -314,7 +318,7 @@ class MultiThreadedConnector : public virtual Connector
 | 
			
		|||
         *  @return the number of bytes read from the Source.
 | 
			
		||||
         *  @exception Exception
 | 
			
		||||
         */
 | 
			
		||||
        virtual unsigned int
 | 
			
		||||
        virtual unsigned long
 | 
			
		||||
        transfer (  unsigned long       bytes,
 | 
			
		||||
                    unsigned int        bufSize,
 | 
			
		||||
                    unsigned int        sec,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue