changed internals so that now each encoding/server connection is
a separate thread
This commit is contained in:
		
							parent
							
								
									8fd167a179
								
							
						
					
					
						commit
						a83e8c8c7a
					
				|  | @ -0,0 +1,407 @@ | ||||||
|  | /*------------------------------------------------------------------------------
 | ||||||
|  | 
 | ||||||
|  |    Copyright (c) 2000 Tyrell Corporation. All rights reserved. | ||||||
|  | 
 | ||||||
|  |    Tyrell DarkIce | ||||||
|  | 
 | ||||||
|  |    File     : MultiThreadedConnector.cpp | ||||||
|  |    Version  : $Revision$ | ||||||
|  |    Author   : $Author$ | ||||||
|  |    Location : $Source$ | ||||||
|  |     | ||||||
|  |    Copyright notice: | ||||||
|  | 
 | ||||||
|  |     This program is free software; you can redistribute it and/or | ||||||
|  |     modify it under the terms of the GNU General Public License   | ||||||
|  |     as published by the Free Software Foundation; either version 2 | ||||||
|  |     of the License, or (at your option) any later version. | ||||||
|  |     | ||||||
|  |     This program is distributed in the hope that it will be useful, | ||||||
|  |     but WITHOUT ANY WARRANTY; without even the implied warranty of  | ||||||
|  |     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the  | ||||||
|  |     GNU General Public License for more details. | ||||||
|  |     | ||||||
|  |     You should have received a copy of the GNU General Public License | ||||||
|  |     along with this program; if not, write to the Free Software | ||||||
|  |     Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA. | ||||||
|  | 
 | ||||||
|  | ------------------------------------------------------------------------------*/ | ||||||
|  | 
 | ||||||
|  | /* ============================================================ include files */ | ||||||
|  | 
 | ||||||
|  | #include "Exception.h" | ||||||
|  | #include "MultiThreadedConnector.h" | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | /* ===================================================  local data structures */ | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | /* ================================================  local constants & macros */ | ||||||
|  | 
 | ||||||
|  | /*------------------------------------------------------------------------------
 | ||||||
|  |  *  File identity | ||||||
|  |  *----------------------------------------------------------------------------*/ | ||||||
|  | static const char fileid[] = "$Id$"; | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | /* ===============================================  local function prototypes */ | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | /* =============================================================  module code */ | ||||||
|  | 
 | ||||||
|  | /*------------------------------------------------------------------------------
 | ||||||
|  |  *  Initialize the object | ||||||
|  |  *----------------------------------------------------------------------------*/ | ||||||
|  | void | ||||||
|  | MultiThreadedConnector :: init ( void )                 throw ( Exception ) | ||||||
|  | { | ||||||
|  |     pthread_mutex_init( &mutexProduce, 0); | ||||||
|  |     pthread_cond_init( &condProduce, 0); | ||||||
|  |     pthread_mutex_init( &mutexConsume, 0); | ||||||
|  |     pthread_cond_init( &condConsume, 0); | ||||||
|  |     threads = 0; | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | /*------------------------------------------------------------------------------
 | ||||||
|  |  *  De-initialize the object | ||||||
|  |  *----------------------------------------------------------------------------*/ | ||||||
|  | void | ||||||
|  | MultiThreadedConnector :: strip ( void )                throw ( Exception ) | ||||||
|  | { | ||||||
|  |     if ( threads ) { | ||||||
|  |         delete[] threads; | ||||||
|  |         threads = 0; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     pthread_cond_destroy( &condConsume); | ||||||
|  |     pthread_mutex_destroy( &mutexConsume); | ||||||
|  |     pthread_cond_destroy( &condProduce); | ||||||
|  |     pthread_mutex_destroy( &mutexProduce); | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | /*------------------------------------------------------------------------------
 | ||||||
|  |  *  Constructor | ||||||
|  |  *----------------------------------------------------------------------------*/ | ||||||
|  | MultiThreadedConnector :: MultiThreadedConnector ( | ||||||
|  |                                 const MultiThreadedConnector &   connector ) | ||||||
|  |                                                             throw ( Exception ) | ||||||
|  |             : Connector( connector) | ||||||
|  | { | ||||||
|  |     mutexProduce    = connector.mutexProduce; | ||||||
|  |     condProduce     = connector.condProduce; | ||||||
|  |     mutexConsume    = connector.mutexConsume; | ||||||
|  |     condConsume     = connector.condConsume; | ||||||
|  | 
 | ||||||
|  |     if ( threads ) { | ||||||
|  |         delete[] threads; | ||||||
|  |     } | ||||||
|  |     threads = new ThreadData[numSinks]; | ||||||
|  |     for ( unsigned int  i = 0; i < numSinks; ++i ) { | ||||||
|  |         threads[i] = connector.threads[i]; | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | /*------------------------------------------------------------------------------
 | ||||||
|  |  *  Assignment operator | ||||||
|  |  *----------------------------------------------------------------------------*/ | ||||||
|  | MultiThreadedConnector & | ||||||
|  | MultiThreadedConnector :: operator= ( const MultiThreadedConnector & connector ) | ||||||
|  |                                                             throw ( Exception ) | ||||||
|  | { | ||||||
|  |     if ( this != &connector ) { | ||||||
|  |         Connector::operator=( connector); | ||||||
|  | 
 | ||||||
|  |         mutexProduce    = connector.mutexProduce; | ||||||
|  |         condProduce     = connector.condProduce; | ||||||
|  |         mutexConsume    = connector.mutexConsume; | ||||||
|  |         condConsume     = connector.condConsume; | ||||||
|  | 
 | ||||||
|  |         if ( threads ) { | ||||||
|  |             delete[] threads; | ||||||
|  |         } | ||||||
|  |         threads = new ThreadData[numSinks]; | ||||||
|  |         for ( unsigned int  i = 0; i < numSinks; ++i ) { | ||||||
|  |             threads[i] = connector.threads[i]; | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     return *this; | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | /*------------------------------------------------------------------------------
 | ||||||
|  |  *  Open the source and all the sinks if needed | ||||||
|  |  *  Create the sink threads | ||||||
|  |  *----------------------------------------------------------------------------*/ | ||||||
|  | bool | ||||||
|  | MultiThreadedConnector :: open ( void )                     throw ( Exception ) | ||||||
|  | { | ||||||
|  |     unsigned int        i; | ||||||
|  | 
 | ||||||
|  |     if ( !Connector::open() ) { | ||||||
|  |         return false; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     running = true; | ||||||
|  | 
 | ||||||
|  |     pthread_attr_init( &threadAttr); | ||||||
|  |     pthread_attr_setdetachstate( &threadAttr, PTHREAD_CREATE_JOINABLE); | ||||||
|  | 
 | ||||||
|  |     threads = new ThreadData[numSinks]; | ||||||
|  |     for ( i = 0; i < numSinks; ++i ) { | ||||||
|  |  reportEvent( 8, "Connector :: open starting thread ", i); | ||||||
|  |         ThreadData    * threadData = threads + i; | ||||||
|  | 
 | ||||||
|  |         threadData->connector = this; | ||||||
|  |         threadData->ixSink    = i; | ||||||
|  |         threadData->isDone    = true; | ||||||
|  |         if ( pthread_create( &(threadData->thread), | ||||||
|  |                              &threadAttr, | ||||||
|  |                              ThreadData::threadFunction, | ||||||
|  |                              threadData ) ) { | ||||||
|  |  reportEvent( 8, "Connector :: open couldn't start thread ", i); | ||||||
|  |             break; | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /* if could not create all, delete the ones created */ | ||||||
|  |     if ( i < numSinks ) { | ||||||
|  |         unsigned int    j; | ||||||
|  | 
 | ||||||
|  |         /* signal to stop for all running threads */ | ||||||
|  |         pthread_mutex_lock( &mutexProduce); | ||||||
|  |         running = false; | ||||||
|  |         pthread_cond_broadcast( &condProduce); | ||||||
|  |         pthread_mutex_unlock( &mutexProduce); | ||||||
|  | 
 | ||||||
|  |         for ( j = 0; j < i; ++j ) { | ||||||
|  |             pthread_join( threads[j].thread, 0); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         delete[] threads; | ||||||
|  |         threads = 0; | ||||||
|  | 
 | ||||||
|  |         return false; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     return true; | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | /*------------------------------------------------------------------------------
 | ||||||
|  |  *  Transfer some data from the source to the sink | ||||||
|  |  *----------------------------------------------------------------------------*/ | ||||||
|  | unsigned int | ||||||
|  | MultiThreadedConnector :: transfer ( unsigned long       bytes, | ||||||
|  |                                      unsigned int        bufSize, | ||||||
|  |                                      unsigned int        sec, | ||||||
|  |                                      unsigned int        usec ) | ||||||
|  |                                                             throw ( Exception ) | ||||||
|  | {    | ||||||
|  |     unsigned int        b; | ||||||
|  | 
 | ||||||
|  |     if ( numSinks == 0 ) { | ||||||
|  |         return 0; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     if ( bufSize == 0 ) { | ||||||
|  |         return 0; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     dataBuffer   = new unsigned char[bufSize]; | ||||||
|  |     dataSize     = 0; | ||||||
|  | 
 | ||||||
|  |     reportEvent( 6, "MultiThreadedConnector :: tranfer, bytes", bytes); | ||||||
|  | 
 | ||||||
|  |     for ( b = 0; !bytes || b < bytes; ) { | ||||||
|  |         if ( source->canRead( sec, usec) ) { | ||||||
|  |             unsigned int        i; | ||||||
|  | 
 | ||||||
|  | reportEvent( 8, "Connector :: transfer, mutexProduce, enter lock"); | ||||||
|  |             pthread_mutex_lock( &mutexProduce); | ||||||
|  | reportEvent( 8, "Connector :: transfer, mutexProduce, lock"); | ||||||
|  |             dataSize = source->read( dataBuffer, bufSize); | ||||||
|  | reportEvent( 8, "Connector :: transfer, read", dataSize); | ||||||
|  |             b       += dataSize; | ||||||
|  | 
 | ||||||
|  |             /* check for EOF */ | ||||||
|  |             if ( dataSize == 0 ) { | ||||||
|  |                 reportEvent( 3, "Connector :: transfer, EOF"); | ||||||
|  |                 break; | ||||||
|  |             } | ||||||
|  | 
 | ||||||
|  |             for ( i = 0; i < numSinks; ++i ) { | ||||||
|  |                 threads[i].isDone = false; | ||||||
|  |             } | ||||||
|  | 
 | ||||||
|  |             /* tell sink threads that there is some data available */ | ||||||
|  | reportEvent( 8, "Connector :: transfer, mutexProduce, broadcast"); | ||||||
|  |             pthread_cond_broadcast( &condProduce); | ||||||
|  | reportEvent( 8, "Connector :: transfer, mutexProduce, broadcasted"); | ||||||
|  | 
 | ||||||
|  |             /* wait for all sink threads to get done with this data */ | ||||||
|  |             while ( true ) { | ||||||
|  |                 for ( i = 0; i < numSinks && threads[i].isDone; ++i ); | ||||||
|  |                 if ( i == numSinks ) { | ||||||
|  |                     break; | ||||||
|  |                 } | ||||||
|  | reportEvent( 8, "Connector :: transfer, mutexConsume, waiting"); | ||||||
|  |                 pthread_cond_wait( &condProduce, &mutexProduce); | ||||||
|  | reportEvent( 8, "Connector :: transfer, mutexConsume, waking"); | ||||||
|  |             } | ||||||
|  |             pthread_mutex_unlock( &mutexProduce); | ||||||
|  | reportEvent( 8, "Connector :: transfer, mutexProduce, exit lock"); | ||||||
|  | 
 | ||||||
|  |             /* wait for all sink threads to process the presented data */ | ||||||
|  | /*
 | ||||||
|  | reportEvent( 8, "Connector :: transfer, mutexConsume, enter lock"); | ||||||
|  |             pthread_mutex_lock( &mutexConsume); | ||||||
|  | reportEvent( 8, "Connector :: transfer, mutexConsume, lock"); | ||||||
|  |             while ( true ) { | ||||||
|  |                 for ( i = 0; i < numSinks && threads[i].isDone; ++i ); | ||||||
|  |                 if ( i == numSinks ) { | ||||||
|  |                     break; | ||||||
|  |                 } | ||||||
|  | reportEvent( 8, "Connector :: transfer, mutexConsume, waiting"); | ||||||
|  |                 pthread_cond_wait( &condConsume, &mutexConsume); | ||||||
|  | reportEvent( 8, "Connector :: transfer, mutexConsume, waking"); | ||||||
|  |             } | ||||||
|  |             consumeCount = 0; | ||||||
|  | reportEvent( 8, "Connector :: transfer, mutexConsume, unlock"); | ||||||
|  |             pthread_mutex_unlock( &mutexConsume); | ||||||
|  | reportEvent( 8, "Connector :: transfer, mutexConsume, exit lock"); | ||||||
|  | */ | ||||||
|  |         } else { | ||||||
|  |             reportEvent( 3, "Connector :: transfer, can't read"); | ||||||
|  |             break; | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     delete[] dataBuffer; | ||||||
|  |     return b; | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | /*------------------------------------------------------------------------------
 | ||||||
|  |  *  The function for each thread. | ||||||
|  |  *  Read the presented data | ||||||
|  |  *----------------------------------------------------------------------------*/ | ||||||
|  | void | ||||||
|  | MultiThreadedConnector :: sinkThread( int       ixSink ) | ||||||
|  | { | ||||||
|  |     ThreadData    * threadData = &threads[ixSink]; | ||||||
|  |     Sink          * sink       = sinks[ixSink].get(); | ||||||
|  | 
 | ||||||
|  |     while ( running ) { | ||||||
|  |         /* wait for some data to become available */ | ||||||
|  | reportEvent( 8, "Connector :: sinkThread, mutexProduce, enter lock ", ixSink); | ||||||
|  |         pthread_mutex_lock( &mutexProduce); | ||||||
|  | reportEvent( 8, "Connector :: sinkThread, mutexProduce, lock ", ixSink); | ||||||
|  |         while ( running && threadData->isDone ) { | ||||||
|  | reportEvent( 8, "Connector :: sinkThread, mutexProduce, wait ", ixSink); | ||||||
|  |             pthread_cond_wait( &condProduce, &mutexProduce); | ||||||
|  | reportEvent( 8, "Connector :: sinkThread, mutexProduce, wake ", ixSink); | ||||||
|  |         } | ||||||
|  |         if ( !running ) { | ||||||
|  |             break; | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         if ( sink->canWrite( 0, 0) ) { | ||||||
|  |             unsigned int        written; | ||||||
|  |             written = sink->write( dataBuffer, dataSize); | ||||||
|  | /*            
 | ||||||
|  |             unsigned int        written; | ||||||
|  |             written = sink->write( dataBuffer, dataSize); | ||||||
|  | reportEvent( 8, "Connector :: sinkThread, written ", written); | ||||||
|  | */ | ||||||
|  |         } else { | ||||||
|  | reportEvent( 3, "Connector :: sinkThread MAYDAY MAYDAY can't write ", ixSink); | ||||||
|  |             /* don't care if we can't write */ | ||||||
|  |         } | ||||||
|  |         threadData->isDone = true; | ||||||
|  | reportEvent( 8, "Connector :: sinkThread, condProduce, broadcast ", ixSink); | ||||||
|  |         pthread_cond_broadcast( &condProduce); | ||||||
|  | reportEvent( 8, "Connector :: sinkThread, condProduce, broadcasted ", ixSink); | ||||||
|  | reportEvent( 8, "Connector :: sinkThread, mutexProduce, unlock", ixSink); | ||||||
|  |         pthread_mutex_unlock( &mutexProduce); | ||||||
|  | reportEvent( 8, "Connector :: sinkThread, mutexProduce, exit lock", ixSink); | ||||||
|  | 
 | ||||||
|  |         /* signal that we have read the data */ | ||||||
|  | /*
 | ||||||
|  | reportEvent( 8, "Connector :: sinkThread, mutexConsume, enter lock"); | ||||||
|  |         pthread_mutex_lock( &mutexConsume); | ||||||
|  | reportEvent( 8, "Connector :: sinkThread, mutexConsume, lock"); | ||||||
|  | reportEvent( 8, "Connector :: sinkThread, mutexConsume, signal"); | ||||||
|  |         pthread_cond_signal( &condConsume); | ||||||
|  | reportEvent( 8, "Connector :: sinkThread, mutexConsume, unlock"); | ||||||
|  |         pthread_mutex_unlock( &mutexConsume); | ||||||
|  | reportEvent( 8, "Connector :: sinkThread, mutexConsume, exit lock"); | ||||||
|  | */ | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | /*------------------------------------------------------------------------------
 | ||||||
|  |  *  Stop the treads | ||||||
|  |  *  Close the source and all the sinks if needed | ||||||
|  |  *----------------------------------------------------------------------------*/ | ||||||
|  | void | ||||||
|  | MultiThreadedConnector :: close ( void )                    throw ( Exception ) | ||||||
|  | { | ||||||
|  |     unsigned int    i; | ||||||
|  | 
 | ||||||
|  |     /* signal to stop for all threads */ | ||||||
|  |     pthread_mutex_lock( &mutexProduce); | ||||||
|  |     running = false; | ||||||
|  |     pthread_cond_broadcast( &condProduce); | ||||||
|  |     pthread_mutex_unlock( &mutexProduce); | ||||||
|  | 
 | ||||||
|  |     for ( i = 0; i < numSinks; ++i ) { | ||||||
|  |         pthread_join( threads[i].thread, 0); | ||||||
|  |     } | ||||||
|  |     pthread_attr_destroy( &threadAttr); | ||||||
|  | 
 | ||||||
|  |     Connector::close(); | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | /*------------------------------------------------------------------------------
 | ||||||
|  |  *  The thread function | ||||||
|  |  *----------------------------------------------------------------------------*/ | ||||||
|  | void * | ||||||
|  | MultiThreadedConnector :: ThreadData :: threadFunction( void  * param ) | ||||||
|  | { | ||||||
|  |     ThreadData     * threadData = (ThreadData*) param; | ||||||
|  | threadData->connector->reportEvent( 8, | ||||||
|  |                   "ThreadData :: threadFunction thread starts ", | ||||||
|  |                  (threadData->ixSink) ); | ||||||
|  | threadData->connector->reportEvent( 8, | ||||||
|  |                   "ThreadData :: threadFunction numSinks ", | ||||||
|  |                   threadData->connector->numSinks); | ||||||
|  | for ( unsigned int i = 0; i < threadData->connector->numSinks; ++i ) { | ||||||
|  | threadData->connector->reportEvent( 8, | ||||||
|  |                   "ThreadData :: threadFunction thread ", i, | ||||||
|  |                   " ixSink ", threadData->connector->threads[i].ixSink); | ||||||
|  | } | ||||||
|  |     threadData->connector->sinkThread( threadData->ixSink); | ||||||
|  | 
 | ||||||
|  |     return 0; | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | /*------------------------------------------------------------------------------
 | ||||||
|  |   | ||||||
|  |   $Source$ | ||||||
|  | 
 | ||||||
|  |   $Log$ | ||||||
|  |   Revision 1.1  2002/10/19 12:25:47  darkeye | ||||||
|  |   changed internals so that now each encoding/server connection is | ||||||
|  |   a separate thread | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  |    | ||||||
|  | ------------------------------------------------------------------------------*/ | ||||||
|  | 
 | ||||||
|  | @ -0,0 +1,342 @@ | ||||||
|  | /*------------------------------------------------------------------------------
 | ||||||
|  | 
 | ||||||
|  |    Copyright (c) 2000 Tyrell Corporation. All rights reserved. | ||||||
|  | 
 | ||||||
|  |    Tyrell DarkIce | ||||||
|  | 
 | ||||||
|  |    File     : MultiThreadedConnector.h | ||||||
|  |    Version  : $Revision$ | ||||||
|  |    Author   : $Author$ | ||||||
|  |    Location : $Source$ | ||||||
|  |     | ||||||
|  |    Copyright notice: | ||||||
|  | 
 | ||||||
|  |     This program is free software; you can redistribute it and/or | ||||||
|  |     modify it under the terms of the GNU General Public License   | ||||||
|  |     as published by the Free Software Foundation; either version 2 | ||||||
|  |     of the License, or (at your option) any later version. | ||||||
|  |     | ||||||
|  |     This program is distributed in the hope that it will be useful, | ||||||
|  |     but WITHOUT ANY WARRANTY; without even the implied warranty of  | ||||||
|  |     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the  | ||||||
|  |     GNU General Public License for more details. | ||||||
|  |     | ||||||
|  |     You should have received a copy of the GNU General Public License | ||||||
|  |     along with this program; if not, write to the Free Software | ||||||
|  |     Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA. | ||||||
|  | 
 | ||||||
|  | ------------------------------------------------------------------------------*/ | ||||||
|  | #ifndef MULTI_THREADED_CONNECTOR_H | ||||||
|  | #define MULTI_THREADED_CONNECTOR_H | ||||||
|  | 
 | ||||||
|  | #ifndef __cplusplus | ||||||
|  | #error This is a C++ include file | ||||||
|  | #endif | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | /* ============================================================ include files */ | ||||||
|  | 
 | ||||||
|  | #ifdef HAVE_CONFIG_H | ||||||
|  | #include "config.h" | ||||||
|  | #endif | ||||||
|  | 
 | ||||||
|  | #ifdef HAVE_PTHREAD_H | ||||||
|  | #include <pthread.h> | ||||||
|  | #else | ||||||
|  | #error need pthread.h | ||||||
|  | #endif | ||||||
|  | 
 | ||||||
|  | #include "Referable.h" | ||||||
|  | #include "Ref.h" | ||||||
|  | #include "Reporter.h" | ||||||
|  | #include "Source.h" | ||||||
|  | #include "Sink.h" | ||||||
|  | #include "Connector.h" | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | /* ================================================================ constants */ | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | /* =================================================================== macros */ | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | /* =============================================================== data types */ | ||||||
|  | 
 | ||||||
|  | /**
 | ||||||
|  |  *  Connects a source to one or more sinks, using a multi-threaded | ||||||
|  |  *  producer - consumer approach. | ||||||
|  |  * | ||||||
|  |  *  @author  $Author$ | ||||||
|  |  *  @version $Revision$ | ||||||
|  |  */ | ||||||
|  | class MultiThreadedConnector : public virtual Connector | ||||||
|  | { | ||||||
|  |     private: | ||||||
|  | 
 | ||||||
|  |         /**
 | ||||||
|  |          *  Helper class to collect information for starting threads. | ||||||
|  |          */ | ||||||
|  |         class ThreadData | ||||||
|  |         { | ||||||
|  |             public: | ||||||
|  |                 /**
 | ||||||
|  |                  *  The connector starting the thread | ||||||
|  |                  */ | ||||||
|  |                 MultiThreadedConnector    * connector; | ||||||
|  | 
 | ||||||
|  |                 /**
 | ||||||
|  |                  *  The index of the sink this thread writes to. | ||||||
|  |                  */ | ||||||
|  |                 unsigned int                ixSink; | ||||||
|  | 
 | ||||||
|  |                 /**
 | ||||||
|  |                  *  The POSIX thread itself. | ||||||
|  |                  */ | ||||||
|  |                 pthread_t                   thread; | ||||||
|  | 
 | ||||||
|  |                 /**
 | ||||||
|  |                  *  Marks if the thread has processed the last batch | ||||||
|  |                  *  of data. | ||||||
|  |                  */ | ||||||
|  |                 bool                        isDone; | ||||||
|  | 
 | ||||||
|  |                 /**
 | ||||||
|  |                  *  Default constructor. | ||||||
|  |                  */ | ||||||
|  |                 inline | ||||||
|  |                 ThreadData() | ||||||
|  |                 { | ||||||
|  |                     this->connector = 0; | ||||||
|  |                     this->ixSink    = 0; | ||||||
|  |                     this->thread    = 0; | ||||||
|  |                     this->isDone    = false; | ||||||
|  |                 } | ||||||
|  | 
 | ||||||
|  |                 /**
 | ||||||
|  |                  *  The thread function. | ||||||
|  |                  * | ||||||
|  |                  *  @param param thread parameter, a pointer to a | ||||||
|  |                  *               ThreadData | ||||||
|  |                  *  @return nothing | ||||||
|  |                  */ | ||||||
|  |                 static void * | ||||||
|  |                 threadFunction( void      * param ); | ||||||
|  |         }; | ||||||
|  |          | ||||||
|  |         /**
 | ||||||
|  |          *  The mutex of this object. | ||||||
|  |          */ | ||||||
|  |         pthread_mutex_t         mutexProduce; | ||||||
|  | 
 | ||||||
|  |         /**
 | ||||||
|  |          *  The conditional variable for presenting new data. | ||||||
|  |          */ | ||||||
|  |         pthread_cond_t          condProduce; | ||||||
|  | 
 | ||||||
|  |         /**
 | ||||||
|  |          *  The mutex of this object. | ||||||
|  |          */ | ||||||
|  |         pthread_mutex_t         mutexConsume; | ||||||
|  | 
 | ||||||
|  |         /**
 | ||||||
|  |          *  The conditional variable for consuming data. | ||||||
|  |          */ | ||||||
|  |         pthread_cond_t          condConsume; | ||||||
|  | 
 | ||||||
|  |         /**
 | ||||||
|  |          *  The thread attributes. | ||||||
|  |          */ | ||||||
|  |         pthread_attr_t          threadAttr; | ||||||
|  | 
 | ||||||
|  |         /**
 | ||||||
|  |          *  The threads for the sinks. | ||||||
|  |          */ | ||||||
|  |         ThreadData            * threads; | ||||||
|  | 
 | ||||||
|  |         /**
 | ||||||
|  |          *  Signal if we're running or not, so the threads no if to stop. | ||||||
|  |          */ | ||||||
|  |         bool                    running; | ||||||
|  | 
 | ||||||
|  |         /**
 | ||||||
|  |          *  The buffer of information presented to each thread. | ||||||
|  |          */ | ||||||
|  |         unsigned char         * dataBuffer; | ||||||
|  | 
 | ||||||
|  |         /**
 | ||||||
|  |          *  The amount of information presented to each thread. | ||||||
|  |          */ | ||||||
|  |         unsigned int            dataSize; | ||||||
|  | 
 | ||||||
|  |         /**
 | ||||||
|  |          *  Initialize the object. | ||||||
|  |          * | ||||||
|  |          *  @exception Exception | ||||||
|  |          */ | ||||||
|  |         void | ||||||
|  |         init ( void )                               throw ( Exception ); | ||||||
|  | 
 | ||||||
|  |         /**
 | ||||||
|  |          *  De-initialize the object. | ||||||
|  |          * | ||||||
|  |          *  @exception Exception | ||||||
|  |          */ | ||||||
|  |         void | ||||||
|  |         strip ( void )                              throw ( Exception ); | ||||||
|  | 
 | ||||||
|  |         /**
 | ||||||
|  |          *  This is the function for each thread. | ||||||
|  |          *  This function has to return fast | ||||||
|  |          * | ||||||
|  |          *  @param ixSink the index of the sink this thread works on. | ||||||
|  |          */ | ||||||
|  |         void | ||||||
|  |         sinkThread( int     ixSink ); | ||||||
|  | 
 | ||||||
|  |     protected: | ||||||
|  | 
 | ||||||
|  |         /**
 | ||||||
|  |          *  Default constructor. Always throws an Exception. | ||||||
|  |          * | ||||||
|  |          *  @exception Exception | ||||||
|  |          */ | ||||||
|  |         inline | ||||||
|  |         MultiThreadedConnector ( void )                 throw ( Exception ) | ||||||
|  |         { | ||||||
|  |             throw Exception( __FILE__, __LINE__); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  |     public: | ||||||
|  | 
 | ||||||
|  |         /**
 | ||||||
|  |          *  Constructor based on a Source. | ||||||
|  |          * | ||||||
|  |          *  @param source the source to connect to the sinks. | ||||||
|  |          *  @exception Exception | ||||||
|  |          */ | ||||||
|  |         inline | ||||||
|  |         MultiThreadedConnector (    Source        * source ) | ||||||
|  |                                                             throw ( Exception ) | ||||||
|  |                     : Connector( source ) | ||||||
|  |         { | ||||||
|  |             init(); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         /**
 | ||||||
|  |          *  Constructor based on a Source and a Sink. | ||||||
|  |          * | ||||||
|  |          *  @param source the source to connect to the sinks. | ||||||
|  |          *  @param sink a sink to connect to the source. | ||||||
|  |          *  @exception Exception | ||||||
|  |          */ | ||||||
|  |         inline | ||||||
|  |         MultiThreadedConnector ( Source            * source, | ||||||
|  |                                  Sink              * sink ) | ||||||
|  |                                                             throw ( Exception ) | ||||||
|  |                     : Connector( source, sink) | ||||||
|  |         { | ||||||
|  |             init(); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         /**
 | ||||||
|  |          *  Copy constructor. | ||||||
|  |          * | ||||||
|  |          *  @param connector the object to copy. | ||||||
|  |          *  @exception Exception | ||||||
|  |          */ | ||||||
|  |         MultiThreadedConnector ( const MultiThreadedConnector &   connector ) | ||||||
|  |                                                             throw ( Exception ); | ||||||
|  |          | ||||||
|  |         /**
 | ||||||
|  |          *  Destructor. | ||||||
|  |          * | ||||||
|  |          *  @exception Exception | ||||||
|  |          */ | ||||||
|  |         inline virtual | ||||||
|  |         ~MultiThreadedConnector( void )                     throw ( Exception ) | ||||||
|  |         { | ||||||
|  |             strip(); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         /**
 | ||||||
|  |          *  Assignment operator. | ||||||
|  |          * | ||||||
|  |          *  @param connector the object to assign to this one. | ||||||
|  |          *  @return a reference to this object. | ||||||
|  |          *  @exception Exception | ||||||
|  |          */ | ||||||
|  |         virtual MultiThreadedConnector & | ||||||
|  |         operator= ( const MultiThreadedConnector &   connector ) | ||||||
|  |                                                             throw ( Exception ); | ||||||
|  | 
 | ||||||
|  |         /**
 | ||||||
|  |          *  Open the connector. Opens the Source and the Sinks if necessary. | ||||||
|  |          * | ||||||
|  |          *  @return true if opening was successful, false otherwise. | ||||||
|  |          *  @exception Exception | ||||||
|  |          */ | ||||||
|  |         virtual bool | ||||||
|  |         open ( void )                                   throw ( Exception ); | ||||||
|  | 
 | ||||||
|  |         /**
 | ||||||
|  |          *  Transfer a given amount of data from the Source to all the | ||||||
|  |          *  Sinks attached. | ||||||
|  |          *  If an attached Sink closes or encounteres an error during the | ||||||
|  |          *  process, it is detached and the function carries on with the | ||||||
|  |          *  rest of the Sinks. If no Sinks remain, or an error is encountered | ||||||
|  |          *  with the Source, the function returns prematurely. | ||||||
|  |          * | ||||||
|  |          *  @param bytes the amount of data to transfer, in bytes. | ||||||
|  |          *               If 0, transfer forever. | ||||||
|  |          *  @param bufSize the size of the buffer to use for transfering. | ||||||
|  |          *                 This amount of data is read from the Source and | ||||||
|  |          *                 written to each Sink on each turn. | ||||||
|  |          *  @param sec the number of seconds to wait for the Source to have | ||||||
|  |          *             data available in each turn, and the number of seconds | ||||||
|  |          *             to wait for the Sinks to accept data. | ||||||
|  |          *  @param usec the number of micros seconds to wait for the Source to | ||||||
|  |          *             have data available in each turn, and the number of | ||||||
|  |          *             micro seconds to wait for the Sinks to accept data. | ||||||
|  |          *  @return the number of bytes read from the Source. | ||||||
|  |          *  @exception Exception | ||||||
|  |          */ | ||||||
|  |         virtual unsigned int | ||||||
|  |         transfer (  unsigned long       bytes, | ||||||
|  |                     unsigned int        bufSize, | ||||||
|  |                     unsigned int        sec, | ||||||
|  |                     unsigned int        usec )          throw ( Exception ); | ||||||
|  | 
 | ||||||
|  |         /**
 | ||||||
|  |          *  Close the Connector. The Source and all Sinks are closed. | ||||||
|  |          * | ||||||
|  |          *  @exception Exception | ||||||
|  |          */ | ||||||
|  |         virtual void | ||||||
|  |         close ( void )                                  throw ( Exception ); | ||||||
|  | }; | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | /* ================================================= external data structures */ | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | /* ====================================================== function prototypes */ | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | #endif  /* MULTI_THREADED_CONNECTOR_H */ | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | /*------------------------------------------------------------------------------
 | ||||||
|  |   | ||||||
|  |   $Source$ | ||||||
|  | 
 | ||||||
|  |   $Log$ | ||||||
|  |   Revision 1.1  2002/10/19 12:25:47  darkeye | ||||||
|  |   changed internals so that now each encoding/server connection is | ||||||
|  |   a separate thread | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  |    | ||||||
|  | ------------------------------------------------------------------------------*/ | ||||||
|  | 
 | ||||||
		Loading…
	
		Reference in New Issue