reverting locking changes
This commit is contained in:
parent
02ca1a2f59
commit
5ff26ff48b
|
@ -252,7 +252,7 @@ Connector :: open ( void ) throw ( Exception )
|
|||
/*------------------------------------------------------------------------------
|
||||
* Transfer some data from the source to the sink
|
||||
*----------------------------------------------------------------------------*/
|
||||
unsigned long
|
||||
unsigned int
|
||||
Connector :: transfer ( unsigned long bytes,
|
||||
unsigned int bufSize,
|
||||
unsigned int sec,
|
||||
|
|
|
@ -228,7 +228,7 @@ class Connector : public virtual Referable, public virtual Reporter
|
|||
* @return the number of bytes read from the Source.
|
||||
* @exception Exception
|
||||
*/
|
||||
virtual unsigned long
|
||||
virtual unsigned int
|
||||
transfer ( unsigned long bytes,
|
||||
unsigned int bufSize,
|
||||
unsigned int sec,
|
||||
|
|
|
@ -68,11 +68,9 @@ void
|
|||
MultiThreadedConnector :: init ( bool reconnect ) throw ( Exception )
|
||||
{
|
||||
this->reconnect = reconnect;
|
||||
pthread_mutex_init(&mutex_number_not_listening_yet, 0);
|
||||
pthread_mutex_init(&mutex_start, 0);
|
||||
pthread_cond_init(&cond_start, 0);
|
||||
pthread_mutex_init(&mutex_done, 0);
|
||||
pthread_cond_init(&cond_done, 0);
|
||||
|
||||
pthread_mutex_init( &mutexProduce, 0);
|
||||
pthread_cond_init( &condProduce, 0);
|
||||
threads = 0;
|
||||
}
|
||||
|
||||
|
@ -88,11 +86,8 @@ MultiThreadedConnector :: strip ( void ) throw ( Exception )
|
|||
threads = 0;
|
||||
}
|
||||
|
||||
pthread_cond_destroy(&cond_done);
|
||||
pthread_mutex_destroy(&mutex_done);
|
||||
pthread_cond_destroy(&cond_start);
|
||||
pthread_mutex_destroy(&mutex_start);
|
||||
pthread_mutex_destroy(&mutex_number_not_listening_yet);
|
||||
pthread_cond_destroy( &condProduce);
|
||||
pthread_mutex_destroy( &mutexProduce);
|
||||
}
|
||||
|
||||
|
||||
|
@ -104,14 +99,9 @@ MultiThreadedConnector :: MultiThreadedConnector (
|
|||
throw ( Exception )
|
||||
: Connector( connector)
|
||||
{
|
||||
reconnect = connector.reconnect;
|
||||
mutex_start = connector.mutex_start;
|
||||
mutex_number_not_listening_yet = connector.mutex_number_not_listening_yet;
|
||||
number_not_listening_yet = connector.number_not_listening_yet;
|
||||
|
||||
cond_start = connector.cond_start;
|
||||
mutex_done = connector.mutex_done;
|
||||
cond_done = connector.cond_done;
|
||||
reconnect = connector.reconnect;
|
||||
mutexProduce = connector.mutexProduce;
|
||||
condProduce = connector.condProduce;
|
||||
|
||||
if ( threads ) {
|
||||
delete[] threads;
|
||||
|
@ -122,6 +112,7 @@ MultiThreadedConnector :: MultiThreadedConnector (
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/*------------------------------------------------------------------------------
|
||||
* Assignment operator
|
||||
*----------------------------------------------------------------------------*/
|
||||
|
@ -132,13 +123,9 @@ MultiThreadedConnector :: operator= ( const MultiThreadedConnector & connector )
|
|||
if ( this != &connector ) {
|
||||
Connector::operator=( connector);
|
||||
|
||||
reconnect = connector.reconnect;
|
||||
mutex_start = connector.mutex_start;
|
||||
mutex_number_not_listening_yet = connector.mutex_number_not_listening_yet;
|
||||
number_not_listening_yet = connector.number_not_listening_yet;
|
||||
cond_start = connector.cond_start;
|
||||
mutex_done = connector.mutex_done;
|
||||
cond_done = connector.cond_done;
|
||||
reconnect = connector.reconnect;
|
||||
mutexProduce = connector.mutexProduce;
|
||||
condProduce = connector.condProduce;
|
||||
|
||||
if ( threads ) {
|
||||
delete[] threads;
|
||||
|
@ -152,6 +139,7 @@ MultiThreadedConnector :: operator= ( const MultiThreadedConnector & connector )
|
|||
return *this;
|
||||
}
|
||||
|
||||
|
||||
/*------------------------------------------------------------------------------
|
||||
* Open the source and all the sinks if needed
|
||||
* Create the sink threads
|
||||
|
@ -168,7 +156,7 @@ MultiThreadedConnector :: open ( void ) throw ( Exception )
|
|||
|
||||
running = true;
|
||||
|
||||
pthread_attr_init(&threadAttr);
|
||||
pthread_attr_init( &threadAttr);
|
||||
pthread_attr_getstacksize(&threadAttr, &st);
|
||||
if (st < 128 * 1024) {
|
||||
reportEvent( 5, "MultiThreadedConnector :: open, stack size ",
|
||||
|
@ -176,11 +164,7 @@ MultiThreadedConnector :: open ( void ) throw ( Exception )
|
|||
st = 128 * 1024;
|
||||
pthread_attr_setstacksize(&threadAttr, st);
|
||||
}
|
||||
pthread_attr_setdetachstate(&threadAttr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
pthread_mutex_lock(&mutex_number_not_listening_yet);
|
||||
number_not_listening_yet = numSinks;
|
||||
pthread_mutex_unlock(&mutex_number_not_listening_yet);
|
||||
pthread_attr_setdetachstate( &threadAttr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
threads = new ThreadData[numSinks];
|
||||
for ( i = 0; i < numSinks; ++i ) {
|
||||
|
@ -189,7 +173,7 @@ MultiThreadedConnector :: open ( void ) throw ( Exception )
|
|||
threadData->connector = this;
|
||||
threadData->ixSink = i;
|
||||
threadData->accepting = true;
|
||||
threadData->isDone = 1; // 1==STOP, activate thread in transfer()
|
||||
threadData->isDone = true;
|
||||
if ( pthread_create( &(threadData->thread),
|
||||
&threadAttr,
|
||||
ThreadData::threadFunction,
|
||||
|
@ -203,10 +187,10 @@ MultiThreadedConnector :: open ( void ) throw ( Exception )
|
|||
unsigned int j;
|
||||
|
||||
// signal to stop for all running threads
|
||||
pthread_mutex_lock( &mutex_start);
|
||||
pthread_mutex_lock( &mutexProduce);
|
||||
running = false;
|
||||
pthread_cond_broadcast( &cond_start);
|
||||
pthread_mutex_unlock( &mutex_start);
|
||||
pthread_cond_broadcast( &condProduce);
|
||||
pthread_mutex_unlock( &mutexProduce);
|
||||
|
||||
for ( j = 0; j < i; ++j ) {
|
||||
pthread_join( threads[j].thread, 0);
|
||||
|
@ -218,21 +202,6 @@ MultiThreadedConnector :: open ( void ) throw ( Exception )
|
|||
return false;
|
||||
}
|
||||
|
||||
// we have created all threads, make sure they are waiting for
|
||||
// command from the producer
|
||||
while (1) {
|
||||
pthread_mutex_lock(&mutex_number_not_listening_yet);
|
||||
if (0 == number_not_listening_yet) {
|
||||
reportEvent( 6, "MultiThreadedConnector::open() all consumers standing by");
|
||||
break;
|
||||
} else {
|
||||
pthread_mutex_unlock(&mutex_number_not_listening_yet);
|
||||
pthread_yield(); // give space to let the consumers running
|
||||
reportEvent( 6, "MultiThreadedConnector::open() waiting for consumers standing by");
|
||||
usleep(10);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -240,14 +209,14 @@ MultiThreadedConnector :: open ( void ) throw ( Exception )
|
|||
/*------------------------------------------------------------------------------
|
||||
* Transfer some data from the source to the sink
|
||||
*----------------------------------------------------------------------------*/
|
||||
unsigned long
|
||||
unsigned int
|
||||
MultiThreadedConnector :: transfer ( unsigned long bytes,
|
||||
unsigned int bufSize,
|
||||
unsigned int sec,
|
||||
unsigned int usec )
|
||||
throw ( Exception )
|
||||
{
|
||||
unsigned long byteCounter; // when we reach byteCounter thread will end
|
||||
unsigned int b;
|
||||
|
||||
if ( numSinks == 0 ) {
|
||||
return 0;
|
||||
|
@ -260,80 +229,47 @@ MultiThreadedConnector :: transfer ( unsigned long bytes,
|
|||
dataBuffer = new unsigned char[bufSize];
|
||||
dataSize = 0;
|
||||
|
||||
/* if bytes==0 transfer until end of program,
|
||||
* if bytes>0 transfer upto number of bytes
|
||||
*/
|
||||
reportEvent( 6, "MultiThreadedConnector::transfer count:", bytes);
|
||||
byteCounter = 0; // init, no data bytes sent yet
|
||||
reportEvent( 6, "MultiThreadedConnector :: tranfer, bytes", bytes);
|
||||
|
||||
for ( b = 0; !bytes || b < bytes; ) {
|
||||
if ( source->canRead( sec, usec) ) {
|
||||
unsigned int i;
|
||||
|
||||
while (running && (bytes == 0 || byteCounter < bytes)) {
|
||||
|
||||
if (source->canRead(sec, usec)) {
|
||||
unsigned int i;
|
||||
dataSize = source->read(dataBuffer, bufSize);
|
||||
byteCounter += dataSize;
|
||||
pthread_mutex_lock( &mutexProduce);
|
||||
dataSize = source->read( dataBuffer, bufSize);
|
||||
b += dataSize;
|
||||
|
||||
// check for EOF
|
||||
if (dataSize == 0) {
|
||||
reportEvent(3, "MultiThreadedConnector :: transfer, EOF");
|
||||
if ( dataSize == 0 ) {
|
||||
reportEvent( 3, "MultiThreadedConnector :: transfer, EOF");
|
||||
pthread_mutex_unlock( &mutexProduce);
|
||||
break;
|
||||
} else {
|
||||
// reportEvent(9, "MultiThreadedConnector::transfer ",dataSize);
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&mutex_start);
|
||||
for (i = 0; i < numSinks; ++i) {
|
||||
threads[i].isDone = 0; // ALL consumers => RUN
|
||||
for ( i = 0; i < numSinks; ++i ) {
|
||||
threads[i].isDone = false;
|
||||
}
|
||||
pthread_cond_broadcast(&cond_start); // kick ALL the waiting consumers to look again
|
||||
|
||||
// tell sink threads that there is some data available
|
||||
pthread_cond_broadcast( &condProduce);
|
||||
|
||||
// wait for all sink threads to get done with this data
|
||||
// we do not spin here, we just wait for an event from the consumers
|
||||
pthread_mutex_lock(&mutex_done);
|
||||
pthread_mutex_unlock(&mutex_start); // UNLOCK, release the consumers' cond variable
|
||||
|
||||
while ( 1 ) {
|
||||
int rc = 0;
|
||||
// wait for condition : releases mutex so other thread can change condition
|
||||
rc = pthread_cond_wait(&cond_done, &mutex_done);
|
||||
// mutex is locked again
|
||||
if (rc != 0) {
|
||||
reportEvent(1, "MultiThreadedConnector pthread_cond_wait() fail");
|
||||
while ( true ) {
|
||||
for ( i = 0; i < numSinks && threads[i].isDone; ++i );
|
||||
if ( i == numSinks ) {
|
||||
break;
|
||||
}
|
||||
|
||||
int acceptor_count=0;
|
||||
int stopped_count=0;
|
||||
for (i = 0; i < numSinks; ++i) {
|
||||
if (threads[i].accepting) {
|
||||
acceptor_count++; // number of accepting threads
|
||||
if (threads[i].isDone == 1)
|
||||
stopped_count++; // number of accepting threads which have STOP
|
||||
}
|
||||
}
|
||||
// if no thread is accepting and reconnect is not set stop the application
|
||||
if (acceptor_count == 0 && reconnect == false) {
|
||||
running=false;
|
||||
break;
|
||||
}
|
||||
|
||||
// break when all accepting threads are done
|
||||
if (acceptor_count == stopped_count) {
|
||||
break;
|
||||
}
|
||||
// at least one thread has not set the isDone flag yet and is still accepting
|
||||
pthread_cond_wait( &condProduce, &mutexProduce);
|
||||
}
|
||||
pthread_mutex_unlock(&mutex_done);
|
||||
// at this point all consumers are done with the block
|
||||
pthread_mutex_unlock( &mutexProduce);
|
||||
} else {
|
||||
reportEvent(3,"MultiThreadedConnector :: transfer, can't read");
|
||||
reportEvent( 3, "MultiThreadedConnector :: transfer, can't read");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
delete[] dataBuffer;
|
||||
return byteCounter;
|
||||
return b;
|
||||
}
|
||||
|
||||
|
||||
|
@ -342,124 +278,71 @@ MultiThreadedConnector :: transfer ( unsigned long bytes,
|
|||
* Read the presented data
|
||||
*----------------------------------------------------------------------------*/
|
||||
void
|
||||
MultiThreadedConnector::sinkThread(int ixSink)
|
||||
MultiThreadedConnector :: sinkThread( int ixSink )
|
||||
{
|
||||
ThreadData * threadData = &threads[ixSink];
|
||||
Sink * sink = sinks[ixSink].get();
|
||||
ThreadData * threadData = &threads[ixSink];
|
||||
Sink * sink = sinks[ixSink].get();
|
||||
|
||||
pthread_mutex_lock( &mutex_start ); // LOCK mutex for cond_start
|
||||
// we now tell the producer we are listening
|
||||
pthread_mutex_lock(&mutex_number_not_listening_yet);
|
||||
number_not_listening_yet--;
|
||||
pthread_mutex_unlock(&mutex_number_not_listening_yet);
|
||||
|
||||
while (1) {
|
||||
while ( running ) {
|
||||
// wait for some data to become available
|
||||
// producer sets isDone==0 when consumer can continue
|
||||
// producer sets isDone==2 or running==0 to request termination
|
||||
|
||||
int rc=0;
|
||||
while ( (rc==0) && running && (threadData->isDone==1) )
|
||||
{
|
||||
// wait for condition, releases lock
|
||||
rc = pthread_cond_wait( &cond_start, &mutex_start );
|
||||
// we hold the lock again
|
||||
// we check flags under protection of the lock
|
||||
pthread_mutex_lock( &mutexProduce);
|
||||
while ( running && threadData->isDone ) {
|
||||
pthread_cond_wait( &condProduce, &mutexProduce);
|
||||
}
|
||||
pthread_mutex_unlock( &mutex_start ); // UNLOCK
|
||||
|
||||
// something wrong or signal to quit detected
|
||||
// break out of this loop, will end the thread
|
||||
if ( running==false || threadData->isDone==2 || rc != 0 )
|
||||
if ( !running ) {
|
||||
pthread_mutex_unlock( &mutexProduce);
|
||||
break;
|
||||
}
|
||||
|
||||
if ( threadData->cut )
|
||||
{
|
||||
sink->cut( );
|
||||
if ( threadData->cut) {
|
||||
sink->cut();
|
||||
threadData->cut = false;
|
||||
}
|
||||
|
||||
if ( threadData->accepting )
|
||||
{
|
||||
if ( sink->canWrite( 0, 0 ) )
|
||||
{
|
||||
try
|
||||
{
|
||||
sink->write( dataBuffer, dataSize );
|
||||
} catch ( Exception & e )
|
||||
{
|
||||
if ( threadData->accepting ) {
|
||||
if ( sink->canWrite( 0, 0) ) {
|
||||
try {
|
||||
sink->write( dataBuffer, dataSize);
|
||||
} catch ( Exception & e ) {
|
||||
// something wrong. don't accept more data, try to
|
||||
// reopen the sink NEXT time around, for now just report done
|
||||
// reopen the sink next time around
|
||||
threadData->accepting = false;
|
||||
reportEvent( 4,
|
||||
"MultiThreadedConnector :: sinkThread can't write X", ixSink );
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
reportEvent( 4,
|
||||
"MultiThreadedConnector :: sinkThread can't write ", ixSink );
|
||||
"MultiThreadedConnector :: sinkThread can't write ",
|
||||
ixSink);
|
||||
// don't care if we can't write
|
||||
}
|
||||
}
|
||||
threadData->isDone = true;
|
||||
pthread_cond_broadcast( &condProduce);
|
||||
pthread_mutex_unlock( &mutexProduce);
|
||||
|
||||
pthread_mutex_lock( &mutex_done );
|
||||
threadData->isDone = 1; // producer will check this flag
|
||||
pthread_cond_signal( &cond_done ); // signal producer
|
||||
pthread_mutex_unlock( &mutex_done );
|
||||
|
||||
if ( ! threadData->accepting) {
|
||||
// not accepting
|
||||
if ( !threadData->accepting ) {
|
||||
if ( reconnect ) {
|
||||
reportEvent( 4,
|
||||
"MultiThreadedConnector :: sinkThread reconnecting ",
|
||||
ixSink );
|
||||
"MultiThreadedConnector :: sinkThread reconnecting ",
|
||||
ixSink);
|
||||
// if we're not accepting, try to reopen the sink
|
||||
try {
|
||||
sink->close( );
|
||||
Util::sleep( 1L, 0L );
|
||||
sink->open( );
|
||||
sched_yield( );
|
||||
threadData->accepting = sink->isOpen( );
|
||||
} catch ( Exception & e ) {
|
||||
sink->close();
|
||||
Util::sleep(1L, 0L);
|
||||
sink->open();
|
||||
sched_yield();
|
||||
threadData->accepting = sink->isOpen();
|
||||
} catch ( Exception & e ) {
|
||||
// don't care, just try and try again
|
||||
reportEvent( 4,
|
||||
"MultiThreadedConnector::sinkThread Reconnect failed", ixSink );
|
||||
}
|
||||
} else {
|
||||
// if !reconnect, just stop the connector
|
||||
reportEvent( 4,
|
||||
"MultiThreadedConnector :: sinkThread no reconnect? ",
|
||||
ixSink);
|
||||
try
|
||||
{
|
||||
threadData->accepting = false; // no more data for us
|
||||
sink->close( );
|
||||
} catch ( Exception & e )
|
||||
{
|
||||
// don't care, just try and try again
|
||||
reportEvent( 9,
|
||||
"MultiThreadedConnector :: sinkThread do not care2 ",
|
||||
ixSink );
|
||||
}
|
||||
running = false;
|
||||
}
|
||||
}
|
||||
|
||||
pthread_mutex_lock( &mutex_done );
|
||||
threadData->isDone = 1; // producer will check this flag
|
||||
pthread_cond_signal( &cond_done ); // signal producer
|
||||
pthread_mutex_unlock( &mutex_done );
|
||||
|
||||
} /* is running */
|
||||
|
||||
/* just make sure nobody will be waiting for us when we terminate */
|
||||
pthread_mutex_lock( &mutex_done );
|
||||
threadData->isDone = 1; // STOP
|
||||
pthread_cond_signal( &cond_done ); // signal producer
|
||||
pthread_mutex_unlock( &mutex_done );
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*------------------------------------------------------------------------------
|
||||
* Signal to each sink to cut what they've done so far, and start anew.
|
||||
*----------------------------------------------------------------------------*/
|
||||
|
@ -486,15 +369,16 @@ MultiThreadedConnector :: close ( void ) throw ( Exception )
|
|||
unsigned int i;
|
||||
|
||||
// signal to stop for all threads
|
||||
pthread_mutex_lock( &mutex_start );
|
||||
pthread_mutex_lock( &mutexProduce);
|
||||
running = false;
|
||||
pthread_cond_broadcast( &cond_start );
|
||||
pthread_mutex_unlock( &mutex_start );
|
||||
pthread_cond_broadcast( &condProduce);
|
||||
pthread_mutex_unlock( &mutexProduce);
|
||||
|
||||
// wait for all the threads to finish
|
||||
for ( i = 0; i < numSinks; ++i ) {
|
||||
pthread_join( threads[i].thread, 0);
|
||||
}
|
||||
pthread_attr_destroy( &threadAttr);
|
||||
|
||||
Connector::close();
|
||||
}
|
||||
|
|
|
@ -105,7 +105,7 @@ class MultiThreadedConnector : public virtual Connector
|
|||
* Marks if the thread has processed the last batch
|
||||
* of data.
|
||||
*/
|
||||
int isDone;
|
||||
bool isDone;
|
||||
|
||||
/**
|
||||
* A flag to show that the sink should be made to cut in the
|
||||
|
@ -123,7 +123,7 @@ class MultiThreadedConnector : public virtual Connector
|
|||
this->ixSink = 0;
|
||||
this->thread = 0;
|
||||
this->accepting = false;
|
||||
this->isDone = 1; // 0==RUN 1=STOP 2=TERMINATE
|
||||
this->isDone = false;
|
||||
this->cut = false;
|
||||
}
|
||||
|
||||
|
@ -138,33 +138,15 @@ class MultiThreadedConnector : public virtual Connector
|
|||
threadFunction( void * param );
|
||||
};
|
||||
|
||||
/* mutex and cond variable for signaling new data
|
||||
* the consumers wait for this
|
||||
/**
|
||||
* The mutex of this object.
|
||||
*/
|
||||
pthread_mutex_t mutex_start;
|
||||
pthread_cond_t cond_start; // producer sets this
|
||||
pthread_mutex_t mutexProduce;
|
||||
|
||||
/* mutex and cond variable for signaling that a single thread has
|
||||
* finished working on a block of data
|
||||
* the producer waits for this, then checks if everyone is done
|
||||
* and maybe waits again until everyone has finished on the data
|
||||
/**
|
||||
* The conditional variable for presenting new data.
|
||||
*/
|
||||
pthread_mutex_t mutex_done;
|
||||
pthread_cond_t cond_done; // consumer sets this
|
||||
|
||||
/* mutex on number of consumers not listening yet to the producer
|
||||
* this is to prevent a race during startup
|
||||
* The producer should only signal the consumers when it knows
|
||||
* that all consumers are waiting on the condition var to change
|
||||
* not before, because a consumer might mis the signal and not start
|
||||
* which would also mean that it will not finish, thereby blocking
|
||||
* the producer
|
||||
*/
|
||||
|
||||
pthread_mutex_t mutex_number_not_listening_yet;
|
||||
// when this is 0 all consumers are
|
||||
// ready to take commands from the producer
|
||||
int number_not_listening_yet;
|
||||
pthread_cond_t condProduce;
|
||||
|
||||
/**
|
||||
* The thread attributes.
|
||||
|
@ -177,7 +159,7 @@ class MultiThreadedConnector : public virtual Connector
|
|||
ThreadData * threads;
|
||||
|
||||
/**
|
||||
* Signal if we're running or not running
|
||||
* Signal if we're running or not, so the threads no if to stop.
|
||||
*/
|
||||
bool running;
|
||||
|
||||
|
@ -332,7 +314,7 @@ class MultiThreadedConnector : public virtual Connector
|
|||
* @return the number of bytes read from the Source.
|
||||
* @exception Exception
|
||||
*/
|
||||
virtual unsigned long
|
||||
virtual unsigned int
|
||||
transfer ( unsigned long bytes,
|
||||
unsigned int bufSize,
|
||||
unsigned int sec,
|
||||
|
|
Loading…
Reference in New Issue