|
|
|
|
@@ -57,8 +57,6 @@ MultiThreadedConnector :: init ( void ) throw ( Exception )
|
|
|
|
|
{
|
|
|
|
|
pthread_mutex_init( &mutexProduce, 0);
|
|
|
|
|
pthread_cond_init( &condProduce, 0);
|
|
|
|
|
pthread_mutex_init( &mutexConsume, 0);
|
|
|
|
|
pthread_cond_init( &condConsume, 0);
|
|
|
|
|
threads = 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -74,8 +72,6 @@ MultiThreadedConnector :: strip ( void ) throw ( Exception )
|
|
|
|
|
threads = 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pthread_cond_destroy( &condConsume);
|
|
|
|
|
pthread_mutex_destroy( &mutexConsume);
|
|
|
|
|
pthread_cond_destroy( &condProduce);
|
|
|
|
|
pthread_mutex_destroy( &mutexProduce);
|
|
|
|
|
}
|
|
|
|
|
@@ -91,8 +87,6 @@ MultiThreadedConnector :: MultiThreadedConnector (
|
|
|
|
|
{
|
|
|
|
|
mutexProduce = connector.mutexProduce;
|
|
|
|
|
condProduce = connector.condProduce;
|
|
|
|
|
mutexConsume = connector.mutexConsume;
|
|
|
|
|
condConsume = connector.condConsume;
|
|
|
|
|
|
|
|
|
|
if ( threads ) {
|
|
|
|
|
delete[] threads;
|
|
|
|
|
@@ -116,8 +110,6 @@ MultiThreadedConnector :: operator= ( const MultiThreadedConnector & connector )
|
|
|
|
|
|
|
|
|
|
mutexProduce = connector.mutexProduce;
|
|
|
|
|
condProduce = connector.condProduce;
|
|
|
|
|
mutexConsume = connector.mutexConsume;
|
|
|
|
|
condConsume = connector.condConsume;
|
|
|
|
|
|
|
|
|
|
if ( threads ) {
|
|
|
|
|
delete[] threads;
|
|
|
|
|
@@ -152,17 +144,16 @@ MultiThreadedConnector :: open ( void ) throw ( Exception )
|
|
|
|
|
|
|
|
|
|
threads = new ThreadData[numSinks];
|
|
|
|
|
for ( i = 0; i < numSinks; ++i ) {
|
|
|
|
|
reportEvent( 8, "Connector :: open starting thread ", i);
|
|
|
|
|
ThreadData * threadData = threads + i;
|
|
|
|
|
|
|
|
|
|
threadData->connector = this;
|
|
|
|
|
threadData->ixSink = i;
|
|
|
|
|
threadData->accepting = true;
|
|
|
|
|
threadData->isDone = true;
|
|
|
|
|
if ( pthread_create( &(threadData->thread),
|
|
|
|
|
&threadAttr,
|
|
|
|
|
ThreadData::threadFunction,
|
|
|
|
|
threadData ) ) {
|
|
|
|
|
reportEvent( 8, "Connector :: open couldn't start thread ", i);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -220,11 +211,8 @@ MultiThreadedConnector :: transfer ( unsigned long bytes,
|
|
|
|
|
if ( source->canRead( sec, usec) ) {
|
|
|
|
|
unsigned int i;
|
|
|
|
|
|
|
|
|
|
reportEvent( 8, "Connector :: transfer, mutexProduce, enter lock");
|
|
|
|
|
pthread_mutex_lock( &mutexProduce);
|
|
|
|
|
reportEvent( 8, "Connector :: transfer, mutexProduce, lock");
|
|
|
|
|
dataSize = source->read( dataBuffer, bufSize);
|
|
|
|
|
reportEvent( 8, "Connector :: transfer, read", dataSize);
|
|
|
|
|
b += dataSize;
|
|
|
|
|
|
|
|
|
|
/* check for EOF */
|
|
|
|
|
@@ -234,46 +222,27 @@ reportEvent( 8, "Connector :: transfer, read", dataSize);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for ( i = 0; i < numSinks; ++i ) {
|
|
|
|
|
threads[i].isDone = false;
|
|
|
|
|
if ( threads[i].accepting ) {
|
|
|
|
|
threads[i].isDone = false;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* tell sink threads that there is some data available */
|
|
|
|
|
reportEvent( 8, "Connector :: transfer, mutexProduce, broadcast");
|
|
|
|
|
pthread_cond_broadcast( &condProduce);
|
|
|
|
|
reportEvent( 8, "Connector :: transfer, mutexProduce, broadcasted");
|
|
|
|
|
|
|
|
|
|
/* wait for all sink threads to get done with this data */
|
|
|
|
|
while ( true ) {
|
|
|
|
|
for ( i = 0; i < numSinks && threads[i].isDone; ++i );
|
|
|
|
|
for ( i = 0; i < numSinks; ++i ) {
|
|
|
|
|
if ( threads[i].accepting && !threads[i].isDone ) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if ( i == numSinks ) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
reportEvent( 8, "Connector :: transfer, mutexConsume, waiting");
|
|
|
|
|
pthread_cond_wait( &condProduce, &mutexProduce);
|
|
|
|
|
reportEvent( 8, "Connector :: transfer, mutexConsume, waking");
|
|
|
|
|
}
|
|
|
|
|
pthread_mutex_unlock( &mutexProduce);
|
|
|
|
|
reportEvent( 8, "Connector :: transfer, mutexProduce, exit lock");
|
|
|
|
|
|
|
|
|
|
/* wait for all sink threads to process the presented data */
|
|
|
|
|
/*
|
|
|
|
|
reportEvent( 8, "Connector :: transfer, mutexConsume, enter lock");
|
|
|
|
|
pthread_mutex_lock( &mutexConsume);
|
|
|
|
|
reportEvent( 8, "Connector :: transfer, mutexConsume, lock");
|
|
|
|
|
while ( true ) {
|
|
|
|
|
for ( i = 0; i < numSinks && threads[i].isDone; ++i );
|
|
|
|
|
if ( i == numSinks ) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
reportEvent( 8, "Connector :: transfer, mutexConsume, waiting");
|
|
|
|
|
pthread_cond_wait( &condConsume, &mutexConsume);
|
|
|
|
|
reportEvent( 8, "Connector :: transfer, mutexConsume, waking");
|
|
|
|
|
}
|
|
|
|
|
consumeCount = 0;
|
|
|
|
|
reportEvent( 8, "Connector :: transfer, mutexConsume, unlock");
|
|
|
|
|
pthread_mutex_unlock( &mutexConsume);
|
|
|
|
|
reportEvent( 8, "Connector :: transfer, mutexConsume, exit lock");
|
|
|
|
|
*/
|
|
|
|
|
} else {
|
|
|
|
|
reportEvent( 3, "Connector :: transfer, can't read");
|
|
|
|
|
break;
|
|
|
|
|
@@ -297,49 +266,48 @@ MultiThreadedConnector :: sinkThread( int ixSink )
|
|
|
|
|
|
|
|
|
|
while ( running ) {
|
|
|
|
|
/* wait for some data to become available */
|
|
|
|
|
reportEvent( 8, "Connector :: sinkThread, mutexProduce, enter lock ", ixSink);
|
|
|
|
|
pthread_mutex_lock( &mutexProduce);
|
|
|
|
|
reportEvent( 8, "Connector :: sinkThread, mutexProduce, lock ", ixSink);
|
|
|
|
|
while ( running && threadData->isDone ) {
|
|
|
|
|
reportEvent( 8, "Connector :: sinkThread, mutexProduce, wait ", ixSink);
|
|
|
|
|
pthread_cond_wait( &condProduce, &mutexProduce);
|
|
|
|
|
reportEvent( 8, "Connector :: sinkThread, mutexProduce, wake ", ixSink);
|
|
|
|
|
}
|
|
|
|
|
if ( !running ) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if ( sink->canWrite( 0, 0) ) {
|
|
|
|
|
unsigned int written;
|
|
|
|
|
written = sink->write( dataBuffer, dataSize);
|
|
|
|
|
/*
|
|
|
|
|
unsigned int written;
|
|
|
|
|
written = sink->write( dataBuffer, dataSize);
|
|
|
|
|
reportEvent( 8, "Connector :: sinkThread, written ", written);
|
|
|
|
|
*/
|
|
|
|
|
try {
|
|
|
|
|
sink->write( dataBuffer, dataSize);
|
|
|
|
|
} catch ( Exception & e ) {
|
|
|
|
|
// something wrong. don't accept more data, try to
|
|
|
|
|
// reopen the sink
|
|
|
|
|
threadData->accepting = false;
|
|
|
|
|
pthread_mutex_unlock( &mutexProduce);
|
|
|
|
|
|
|
|
|
|
do {
|
|
|
|
|
try {
|
|
|
|
|
sink->close();
|
|
|
|
|
sink->open();
|
|
|
|
|
} catch ( Exception & e ) {
|
|
|
|
|
// don't care, just try and try again
|
|
|
|
|
}
|
|
|
|
|
// TODO: wait some time every cycle, so as not to
|
|
|
|
|
// overload the system
|
|
|
|
|
} while ( !sink->isOpen() );
|
|
|
|
|
|
|
|
|
|
// all OK, accept data, act as if we're done
|
|
|
|
|
pthread_mutex_lock( &mutexProduce);
|
|
|
|
|
threadData->accepting = true;
|
|
|
|
|
threadData->isDone = true;
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
reportEvent( 3, "Connector :: sinkThread MAYDAY MAYDAY can't write ", ixSink);
|
|
|
|
|
reportEvent( 4,
|
|
|
|
|
"MultiThreadedConnector :: sinkThread can't write ",
|
|
|
|
|
ixSink);
|
|
|
|
|
/* don't care if we can't write */
|
|
|
|
|
}
|
|
|
|
|
threadData->isDone = true;
|
|
|
|
|
reportEvent( 8, "Connector :: sinkThread, condProduce, broadcast ", ixSink);
|
|
|
|
|
pthread_cond_broadcast( &condProduce);
|
|
|
|
|
reportEvent( 8, "Connector :: sinkThread, condProduce, broadcasted ", ixSink);
|
|
|
|
|
reportEvent( 8, "Connector :: sinkThread, mutexProduce, unlock", ixSink);
|
|
|
|
|
pthread_mutex_unlock( &mutexProduce);
|
|
|
|
|
reportEvent( 8, "Connector :: sinkThread, mutexProduce, exit lock", ixSink);
|
|
|
|
|
|
|
|
|
|
/* signal that we have read the data */
|
|
|
|
|
/*
|
|
|
|
|
reportEvent( 8, "Connector :: sinkThread, mutexConsume, enter lock");
|
|
|
|
|
pthread_mutex_lock( &mutexConsume);
|
|
|
|
|
reportEvent( 8, "Connector :: sinkThread, mutexConsume, lock");
|
|
|
|
|
reportEvent( 8, "Connector :: sinkThread, mutexConsume, signal");
|
|
|
|
|
pthread_cond_signal( &condConsume);
|
|
|
|
|
reportEvent( 8, "Connector :: sinkThread, mutexConsume, unlock");
|
|
|
|
|
pthread_mutex_unlock( &mutexConsume);
|
|
|
|
|
reportEvent( 8, "Connector :: sinkThread, mutexConsume, exit lock");
|
|
|
|
|
*/
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -375,17 +343,6 @@ void *
|
|
|
|
|
MultiThreadedConnector :: ThreadData :: threadFunction( void * param )
|
|
|
|
|
{
|
|
|
|
|
ThreadData * threadData = (ThreadData*) param;
|
|
|
|
|
threadData->connector->reportEvent( 8,
|
|
|
|
|
"ThreadData :: threadFunction thread starts ",
|
|
|
|
|
(threadData->ixSink) );
|
|
|
|
|
threadData->connector->reportEvent( 8,
|
|
|
|
|
"ThreadData :: threadFunction numSinks ",
|
|
|
|
|
threadData->connector->numSinks);
|
|
|
|
|
for ( unsigned int i = 0; i < threadData->connector->numSinks; ++i ) {
|
|
|
|
|
threadData->connector->reportEvent( 8,
|
|
|
|
|
"ThreadData :: threadFunction thread ", i,
|
|
|
|
|
" ixSink ", threadData->connector->threads[i].ixSink);
|
|
|
|
|
}
|
|
|
|
|
threadData->connector->sinkThread( threadData->ixSink);
|
|
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
|
@@ -397,6 +354,10 @@ threadData->connector->reportEvent( 8,
|
|
|
|
|
$Source$
|
|
|
|
|
|
|
|
|
|
$Log$
|
|
|
|
|
Revision 1.2 2002/10/19 13:35:21 darkeye
|
|
|
|
|
when a connection is dropped, DarkIce tries to reconnect, indefinitely
|
|
|
|
|
removed extreme event reporting for thread-related events
|
|
|
|
|
|
|
|
|
|
Revision 1.1 2002/10/19 12:25:47 darkeye
|
|
|
|
|
changed internals so that now each encoding/server connection is
|
|
|
|
|
a separate thread
|
|
|
|
|
|