regression for commit 510

This commit is contained in:
rafael@riseup.net 2013-05-14 15:12:25 +00:00
parent 7c796baa9b
commit 8227296d5c
4 changed files with 100 additions and 177 deletions

View File

@ -252,7 +252,7 @@ Connector :: open ( void ) throw ( Exception )
/*------------------------------------------------------------------------------ /*------------------------------------------------------------------------------
* Transfer some data from the source to the sink * Transfer some data from the source to the sink
*----------------------------------------------------------------------------*/ *----------------------------------------------------------------------------*/
unsigned long unsigned int
Connector :: transfer ( unsigned long bytes, Connector :: transfer ( unsigned long bytes,
unsigned int bufSize, unsigned int bufSize,
unsigned int sec, unsigned int sec,

View File

@ -228,7 +228,7 @@ class Connector : public virtual Referable, public virtual Reporter
* @return the number of bytes read from the Source. * @return the number of bytes read from the Source.
* @exception Exception * @exception Exception
*/ */
virtual unsigned long virtual unsigned int
transfer ( unsigned long bytes, transfer ( unsigned long bytes,
unsigned int bufSize, unsigned int bufSize,
unsigned int sec, unsigned int sec,

View File

@ -68,10 +68,9 @@ void
MultiThreadedConnector :: init ( bool reconnect ) throw ( Exception ) MultiThreadedConnector :: init ( bool reconnect ) throw ( Exception )
{ {
this->reconnect = reconnect; this->reconnect = reconnect;
pthread_mutex_init( &mutex_start, 0);
pthread_cond_init( &cond_start, 0); pthread_mutex_init( &mutexProduce, 0);
pthread_mutex_init( &mutex_done, 0); pthread_cond_init( &condProduce, 0);
pthread_cond_init( &cond_done, 0);
threads = 0; threads = 0;
} }
@ -87,10 +86,8 @@ MultiThreadedConnector :: strip ( void ) throw ( Exception )
threads = 0; threads = 0;
} }
pthread_cond_destroy( &cond_start); pthread_cond_destroy( &condProduce);
pthread_mutex_destroy( &mutex_start); pthread_mutex_destroy( &mutexProduce);
pthread_cond_destroy( &cond_done);
pthread_mutex_destroy( &mutex_done);
} }
@ -102,11 +99,9 @@ MultiThreadedConnector :: MultiThreadedConnector (
throw ( Exception ) throw ( Exception )
: Connector( connector) : Connector( connector)
{ {
reconnect = connector.reconnect; reconnect = connector.reconnect;
mutex_start = connector.mutex_start; mutexProduce = connector.mutexProduce;
cond_start = connector.cond_start; condProduce = connector.condProduce;
mutex_done = connector.mutex_done;
cond_done = connector.cond_done;
if ( threads ) { if ( threads ) {
delete[] threads; delete[] threads;
@ -117,6 +112,7 @@ MultiThreadedConnector :: MultiThreadedConnector (
} }
} }
/*------------------------------------------------------------------------------ /*------------------------------------------------------------------------------
* Assignment operator * Assignment operator
*----------------------------------------------------------------------------*/ *----------------------------------------------------------------------------*/
@ -127,11 +123,9 @@ MultiThreadedConnector :: operator= ( const MultiThreadedConnector & connector )
if ( this != &connector ) { if ( this != &connector ) {
Connector::operator=( connector); Connector::operator=( connector);
reconnect = connector.reconnect; reconnect = connector.reconnect;
mutex_start = connector.mutex_start; mutexProduce = connector.mutexProduce;
cond_start = connector.cond_start; condProduce = connector.condProduce;
mutex_done = connector.mutex_done;
cond_done = connector.cond_done;
if ( threads ) { if ( threads ) {
delete[] threads; delete[] threads;
@ -145,6 +139,7 @@ MultiThreadedConnector :: operator= ( const MultiThreadedConnector & connector )
return *this; return *this;
} }
/*------------------------------------------------------------------------------ /*------------------------------------------------------------------------------
* Open the source and all the sinks if needed * Open the source and all the sinks if needed
* Create the sink threads * Create the sink threads
@ -178,7 +173,7 @@ MultiThreadedConnector :: open ( void ) throw ( Exception )
threadData->connector = this; threadData->connector = this;
threadData->ixSink = i; threadData->ixSink = i;
threadData->accepting = true; threadData->accepting = true;
threadData->isDone = 1; // 1==STOP, activate thread in transfer() threadData->isDone = true;
if ( pthread_create( &(threadData->thread), if ( pthread_create( &(threadData->thread),
&threadAttr, &threadAttr,
ThreadData::threadFunction, ThreadData::threadFunction,
@ -192,10 +187,10 @@ MultiThreadedConnector :: open ( void ) throw ( Exception )
unsigned int j; unsigned int j;
// signal to stop for all running threads // signal to stop for all running threads
pthread_mutex_lock( &mutex_start); pthread_mutex_lock( &mutexProduce);
running = false; running = false;
pthread_cond_broadcast( &cond_start); pthread_cond_broadcast( &condProduce);
pthread_mutex_unlock( &mutex_start); pthread_mutex_unlock( &mutexProduce);
for ( j = 0; j < i; ++j ) { for ( j = 0; j < i; ++j ) {
pthread_join( threads[j].thread, 0); pthread_join( threads[j].thread, 0);
@ -214,14 +209,14 @@ MultiThreadedConnector :: open ( void ) throw ( Exception )
/*------------------------------------------------------------------------------ /*------------------------------------------------------------------------------
* Transfer some data from the source to the sink * Transfer some data from the source to the sink
*----------------------------------------------------------------------------*/ *----------------------------------------------------------------------------*/
unsigned long unsigned int
MultiThreadedConnector :: transfer ( unsigned long bytes, MultiThreadedConnector :: transfer ( unsigned long bytes,
unsigned int bufSize, unsigned int bufSize,
unsigned int sec, unsigned int sec,
unsigned int usec ) unsigned int usec )
throw ( Exception ) throw ( Exception )
{ {
unsigned long byteCounter; // when we reach byteCounter thread will end unsigned int b;
if ( numSinks == 0 ) { if ( numSinks == 0 ) {
return 0; return 0;
@ -234,72 +229,47 @@ MultiThreadedConnector :: transfer ( unsigned long bytes,
dataBuffer = new unsigned char[bufSize]; dataBuffer = new unsigned char[bufSize];
dataSize = 0; dataSize = 0;
/* if bytes==0 transfer until end of program, reportEvent( 6, "MultiThreadedConnector :: tranfer, bytes", bytes);
* if bytes>0 transfer upto number of bytes
*/
reportEvent( 6, "MultiThreadedConnector::transfer count:", bytes);
byteCounter = 0; // init, no data bytes sent yet
for ( b = 0; !bytes || b < bytes; ) {
if ( source->canRead( sec, usec) ) {
unsigned int i;
while (running && (bytes == 0 || byteCounter < bytes)) { pthread_mutex_lock( &mutexProduce);
dataSize = source->read( dataBuffer, bufSize);
if (source->canRead(sec, usec)) { b += dataSize;
unsigned int i;
dataSize = source->read(dataBuffer, bufSize);
byteCounter += dataSize;
// check for EOF // check for EOF
if (dataSize == 0) { if ( dataSize == 0 ) {
reportEvent(3, "MultiThreadedConnector :: transfer, EOF"); reportEvent( 3, "MultiThreadedConnector :: transfer, EOF");
pthread_mutex_unlock( &mutexProduce);
break; break;
} }
pthread_mutex_lock(&mutex_start); for ( i = 0; i < numSinks; ++i ) {
for (i = 0; i < numSinks; ++i) { threads[i].isDone = false;
if (threads[i].accepting)
threads[i].isDone = 0; // consumers => RUN
} }
pthread_cond_broadcast(&cond_start); // kick the waiting consumers to look again
pthread_mutex_unlock(&mutex_start); // UNLOCK, release the consumers' cond variable // tell sink threads that there is some data available
pthread_cond_broadcast( &condProduce);
// wait for all sink threads to get done with this data // wait for all sink threads to get done with this data
// we do not spin here, we just wait for an event from the consumers while ( true ) {
pthread_mutex_lock(&mutex_done); for ( i = 0; i < numSinks && threads[i].isDone; ++i );
while ( 1 ) { if ( i == numSinks ) {
int rc = 0;
// wait for condition : releases mutex so other thread can change condition
rc = pthread_cond_wait(&cond_done, &mutex_done);
// mutex is locked again
if (rc != 0) {
reportEvent(1, "MultiThreadedConnector pthread_cond_wait() fail");
break; break;
} }
pthread_cond_wait( &condProduce, &mutexProduce);
int acceptor_count=0;
int stopped_count=0;
for (i = 0; i < numSinks; ++i) {
if (threads[i].accepting) {
acceptor_count++; // number of accepting threads
if (threads[i].isDone == 1)
stopped_count++; // number of accepting threads which have STOP
}
}
// break when all accepting threads are done
if (acceptor_count == stopped_count) {
break;
}
// at least one thread has not set the STOP flag yet
} }
pthread_mutex_unlock(&mutex_done); pthread_mutex_unlock( &mutexProduce);
// at this point all consumers are done with the block
} else { } else {
reportEvent(3,"MultiThreadedConnector :: transfer, can't read"); reportEvent( 3, "MultiThreadedConnector :: transfer, can't read");
break; break;
} }
} }
delete[] dataBuffer; delete[] dataBuffer;
return byteCounter; return b;
} }
@ -308,115 +278,71 @@ MultiThreadedConnector :: transfer ( unsigned long bytes,
* Read the presented data * Read the presented data
*----------------------------------------------------------------------------*/ *----------------------------------------------------------------------------*/
void void
MultiThreadedConnector::sinkThread(int ixSink) MultiThreadedConnector :: sinkThread( int ixSink )
{ {
ThreadData * threadData = &threads[ixSink]; ThreadData * threadData = &threads[ixSink];
Sink * sink = sinks[ixSink].get( ); Sink * sink = sinks[ixSink].get();
while ( running ) while ( running ) {
{
// wait for some data to become available // wait for some data to become available
// producer sets isDone==0 when consumer can continue pthread_mutex_lock( &mutexProduce);
// producer sets isDone==2 or running==0 to request termination while ( running && threadData->isDone ) {
pthread_mutex_lock( &mutex_start ); // LOCK pthread_cond_wait( &condProduce, &mutexProduce);
int rc=0;
while ( (rc==0) && running && (threadData->isDone==1) )
{
// wait for condition, releases lock
rc = pthread_cond_wait( &cond_start, &mutex_start );
// we hold the lock again
} }
pthread_mutex_unlock( &mutex_start ); // UNLOCK if ( !running ) {
pthread_mutex_unlock( &mutexProduce);
// something wrong or signal to quit detected
// break out of this loop, will end the thread
if ( running==false || threadData->isDone==2 || rc != 0 )
break; break;
}
if ( threadData->cut ) if ( threadData->cut) {
{ sink->cut();
sink->cut( );
threadData->cut = false; threadData->cut = false;
} }
if ( threadData->accepting ) if ( threadData->accepting ) {
{ if ( sink->canWrite( 0, 0) ) {
if ( sink->canWrite( 0, 0 ) ) try {
{ sink->write( dataBuffer, dataSize);
try } catch ( Exception & e ) {
{
sink->write( dataBuffer, dataSize );
} catch ( Exception & e )
{
// something wrong. don't accept more data, try to // something wrong. don't accept more data, try to
// reopen the sink next time around // reopen the sink next time around
threadData->accepting = false; threadData->accepting = false;
} }
} } else {
else
{
reportEvent( 4, reportEvent( 4,
"MultiThreadedConnector :: sinkThread can't write ", "MultiThreadedConnector :: sinkThread can't write ",
ixSink ); ixSink);
// don't care if we can't write // don't care if we can't write
} }
} }
threadData->isDone = true;
pthread_cond_broadcast( &condProduce);
pthread_mutex_unlock( &mutexProduce);
if ( !threadData->accepting ) { if ( !threadData->accepting ) {
if ( reconnect ) { if ( reconnect ) {
reportEvent( 4, reportEvent( 4,
"MultiThreadedConnector :: sinkThread reconnecting ", "MultiThreadedConnector :: sinkThread reconnecting ",
ixSink ); ixSink);
// if we're not accepting, try to reopen the sink // if we're not accepting, try to reopen the sink
try { try {
sink->close( ); sink->close();
Util::sleep( 1L, 0L ); Util::sleep(1L, 0L);
sink->open( ); sink->open();
sched_yield( ); sched_yield();
threadData->accepting = sink->isOpen( ); threadData->accepting = sink->isOpen();
} catch ( Exception & e ) { } catch ( Exception & e ) {
// don't care, just try and try again // don't care, just try and try again
} }
} } else {
else {
// if !reconnect, just stop the connector // if !reconnect, just stop the connector
// running = false; /* kill the whole application */ running = false;
// tell that we used the databuffer, do not wait for us anymore
pthread_mutex_lock( &mutex_done );
threadData->isDone = 1; // 1==STOP
pthread_mutex_unlock( &mutex_done );
reportEvent( 4,
"MultiThreadedConnector :: sinkThread no reconnect? ",
ixSink );
try
{
threadData->accepting = false;
sink->close( );
} catch ( Exception & e )
{
// don't care, just try and try again
reportEvent( 9,
"MultiThreadedConnector :: sinkThread do not care2 ",
ixSink );
}
} }
} }
}
pthread_mutex_lock( &mutex_done );
threadData->isDone = 1; // producer will check this flag
pthread_cond_signal( &cond_done ); // signal producer
pthread_mutex_unlock( &mutex_done );
} /* is running */
/* just make sure nobody will be waiting for us when we terminate */
pthread_mutex_lock( &mutex_done );
threadData->isDone = 1; // STOP
pthread_cond_signal( &cond_done ); // signal producer
pthread_mutex_unlock( &mutex_done );
} }
/*------------------------------------------------------------------------------ /*------------------------------------------------------------------------------
* Signal to each sink to cut what they've done so far, and start anew. * Signal to each sink to cut what they've done so far, and start anew.
*----------------------------------------------------------------------------*/ *----------------------------------------------------------------------------*/
@ -443,15 +369,16 @@ MultiThreadedConnector :: close ( void ) throw ( Exception )
unsigned int i; unsigned int i;
// signal to stop for all threads // signal to stop for all threads
pthread_mutex_lock( &mutex_start ); pthread_mutex_lock( &mutexProduce);
running = false; running = false;
pthread_cond_broadcast( &cond_start ); pthread_cond_broadcast( &condProduce);
pthread_mutex_unlock( &mutex_start ); pthread_mutex_unlock( &mutexProduce);
// wait for all the threads to finish // wait for all the threads to finish
for ( i = 0; i < numSinks; ++i ) { for ( i = 0; i < numSinks; ++i ) {
pthread_join( threads[i].thread, 0); pthread_join( threads[i].thread, 0);
} }
pthread_attr_destroy( &threadAttr);
Connector::close(); Connector::close();
} }

View File

@ -105,7 +105,7 @@ class MultiThreadedConnector : public virtual Connector
* Marks if the thread has processed the last batch * Marks if the thread has processed the last batch
* of data. * of data.
*/ */
int isDone; bool isDone;
/** /**
* A flag to show that the sink should be made to cut in the * A flag to show that the sink should be made to cut in the
@ -123,7 +123,7 @@ class MultiThreadedConnector : public virtual Connector
this->ixSink = 0; this->ixSink = 0;
this->thread = 0; this->thread = 0;
this->accepting = false; this->accepting = false;
this->isDone = 1; // 0==RUN 1=STOP 2=TERMINATE this->isDone = false;
this->cut = false; this->cut = false;
} }
@ -138,19 +138,15 @@ class MultiThreadedConnector : public virtual Connector
threadFunction( void * param ); threadFunction( void * param );
}; };
/* mutex and cond variable for signaling new data /**
* the consumers wait for this * The mutex of this object.
*/ */
pthread_mutex_t mutex_start; pthread_mutex_t mutexProduce;
pthread_cond_t cond_start; // producer sets this
/* mutex and cond variable for signaling that a single thread has /**
* finished working on a block of data * The conditional variable for presenting new data.
* the producer waits for this, then checks if everyone is done
* and maybe waits again until everyone has finished on the data
*/ */
pthread_mutex_t mutex_done; pthread_cond_t condProduce;
pthread_cond_t cond_done; // consumer sets this
/** /**
* The thread attributes. * The thread attributes.
@ -163,7 +159,7 @@ class MultiThreadedConnector : public virtual Connector
ThreadData * threads; ThreadData * threads;
/** /**
* Signal if we're running or not running * Signal if we're running or not, so the threads no if to stop.
*/ */
bool running; bool running;
@ -318,7 +314,7 @@ class MultiThreadedConnector : public virtual Connector
* @return the number of bytes read from the Source. * @return the number of bytes read from the Source.
* @exception Exception * @exception Exception
*/ */
virtual unsigned long virtual unsigned int
transfer ( unsigned long bytes, transfer ( unsigned long bytes,
unsigned int bufSize, unsigned int bufSize,
unsigned int sec, unsigned int sec,