more graceful reconnect

This commit is contained in:
darkeye 2002-10-20 20:43:17 +00:00
parent 45ae8c1049
commit 16b5017fa5
1 changed files with 42 additions and 46 deletions

View File

@ -158,11 +158,11 @@ MultiThreadedConnector :: open ( void ) throw ( Exception )
} }
} }
/* if could not create all, delete the ones created */ // if could not create all, delete the ones created
if ( i < numSinks ) { if ( i < numSinks ) {
unsigned int j; unsigned int j;
/* signal to stop for all running threads */ // signal to stop for all running threads
pthread_mutex_lock( &mutexProduce); pthread_mutex_lock( &mutexProduce);
running = false; running = false;
pthread_cond_broadcast( &condProduce); pthread_cond_broadcast( &condProduce);
@ -215,28 +215,23 @@ MultiThreadedConnector :: transfer ( unsigned long bytes,
dataSize = source->read( dataBuffer, bufSize); dataSize = source->read( dataBuffer, bufSize);
b += dataSize; b += dataSize;
/* check for EOF */ // check for EOF
if ( dataSize == 0 ) { if ( dataSize == 0 ) {
reportEvent( 3, "Connector :: transfer, EOF"); reportEvent( 3, "MultiThreadedConnector :: transfer, EOF");
pthread_mutex_unlock( &mutexProduce);
break; break;
} }
for ( i = 0; i < numSinks; ++i ) { for ( i = 0; i < numSinks; ++i ) {
if ( threads[i].accepting ) { threads[i].isDone = false;
threads[i].isDone = false;
}
} }
/* tell sink threads that there is some data available */ // tell sink threads that there is some data available
pthread_cond_broadcast( &condProduce); pthread_cond_broadcast( &condProduce);
/* wait for all sink threads to get done with this data */ // wait for all sink threads to get done with this data
while ( true ) { while ( true ) {
for ( i = 0; i < numSinks; ++i ) { for ( i = 0; i < numSinks && threads[i].isDone; ++i );
if ( threads[i].accepting && !threads[i].isDone ) {
break;
}
}
if ( i == numSinks ) { if ( i == numSinks ) {
break; break;
} }
@ -244,7 +239,7 @@ MultiThreadedConnector :: transfer ( unsigned long bytes,
} }
pthread_mutex_unlock( &mutexProduce); pthread_mutex_unlock( &mutexProduce);
} else { } else {
reportEvent( 3, "Connector :: transfer, can't read"); reportEvent( 3, "MultiThreadedConnector :: transfer, can't read");
break; break;
} }
} }
@ -265,49 +260,46 @@ MultiThreadedConnector :: sinkThread( int ixSink )
Sink * sink = sinks[ixSink].get(); Sink * sink = sinks[ixSink].get();
while ( running ) { while ( running ) {
/* wait for some data to become available */ // wait for some data to become available
pthread_mutex_lock( &mutexProduce); pthread_mutex_lock( &mutexProduce);
while ( running && threadData->isDone ) { while ( running && threadData->isDone ) {
pthread_cond_wait( &condProduce, &mutexProduce); pthread_cond_wait( &condProduce, &mutexProduce);
} }
if ( !running ) { if ( !running ) {
pthread_mutex_unlock( &mutexProduce);
break; break;
} }
if ( sink->canWrite( 0, 0) ) { if ( threadData->accepting ) {
try { if ( sink->canWrite( 0, 0) ) {
sink->write( dataBuffer, dataSize); try {
} catch ( Exception & e ) { sink->write( dataBuffer, dataSize);
// something wrong. don't accept more data, try to } catch ( Exception & e ) {
// reopen the sink // something wrong. don't accept more data, try to
threadData->accepting = false; // reopen the sink next time around
pthread_mutex_unlock( &mutexProduce); threadData->accepting = false;
}
do { } else {
try { reportEvent( 4,
sink->close(); "MultiThreadedConnector :: sinkThread can't write ",
sink->open(); ixSink);
} catch ( Exception & e ) { // don't care if we can't write
// don't care, just try and try again
}
// TODO: wait some time every cycle, so as not to
// overload the system
} while ( !sink->isOpen() );
// all OK, accept data, act as if we're done
pthread_mutex_lock( &mutexProduce);
threadData->accepting = true;
threadData->isDone = true;
} }
} else {
reportEvent( 4,
"MultiThreadedConnector :: sinkThread can't write ",
ixSink);
/* don't care if we can't write */
} }
threadData->isDone = true; threadData->isDone = true;
pthread_cond_broadcast( &condProduce); pthread_cond_broadcast( &condProduce);
pthread_mutex_unlock( &mutexProduce); pthread_mutex_unlock( &mutexProduce);
if ( !threadData->accepting ) {
// if we're not accepting, try to reopen the sink
try {
sink->close();
sink->open();
threadData->accepting = sink->isOpen();
} catch ( Exception & e ) {
// don't care, just try and try again
}
}
} }
} }
@ -321,12 +313,13 @@ MultiThreadedConnector :: close ( void ) throw ( Exception )
{ {
unsigned int i; unsigned int i;
/* signal to stop for all threads */ // signal to stop for all threads
pthread_mutex_lock( &mutexProduce); pthread_mutex_lock( &mutexProduce);
running = false; running = false;
pthread_cond_broadcast( &condProduce); pthread_cond_broadcast( &condProduce);
pthread_mutex_unlock( &mutexProduce); pthread_mutex_unlock( &mutexProduce);
// wait for all the threads to finish
for ( i = 0; i < numSinks; ++i ) { for ( i = 0; i < numSinks; ++i ) {
pthread_join( threads[i].thread, 0); pthread_join( threads[i].thread, 0);
} }
@ -354,6 +347,9 @@ MultiThreadedConnector :: ThreadData :: threadFunction( void * param )
$Source$ $Source$
$Log$ $Log$
Revision 1.3 2002/10/20 20:43:17 darkeye
more graceful reconnect
Revision 1.2 2002/10/19 13:35:21 darkeye Revision 1.2 2002/10/19 13:35:21 darkeye
when a connection is dropped, DarkIce tries to reconnect, indefinitely when a connection is dropped, DarkIce tries to reconnect, indefinitely
removed extreme event reporting for thread-related events removed extreme event reporting for thread-related events