diff --git a/darkice/trunk/src/MultiThreadedConnector.cpp b/darkice/trunk/src/MultiThreadedConnector.cpp new file mode 100644 index 0000000..cf9f4fd --- /dev/null +++ b/darkice/trunk/src/MultiThreadedConnector.cpp @@ -0,0 +1,407 @@ +/*------------------------------------------------------------------------------ + + Copyright (c) 2000 Tyrell Corporation. All rights reserved. + + Tyrell DarkIce + + File : MultiThreadedConnector.cpp + Version : $Revision$ + Author : $Author$ + Location : $Source$ + + Copyright notice: + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; either version 2 + of the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +------------------------------------------------------------------------------*/ + +/* ============================================================ include files */ + +#include "Exception.h" +#include "MultiThreadedConnector.h" + + +/* =================================================== local data structures */ + + +/* ================================================ local constants & macros */ + +/*------------------------------------------------------------------------------ + * File identity + *----------------------------------------------------------------------------*/ +static const char fileid[] = "$Id$"; + + +/* =============================================== local function prototypes */ + + +/* ============================================================= module code */ + +/*------------------------------------------------------------------------------ + * Initialize the object + *----------------------------------------------------------------------------*/ +void +MultiThreadedConnector :: init ( void ) throw ( Exception ) +{ + pthread_mutex_init( &mutexProduce, 0); + pthread_cond_init( &condProduce, 0); + pthread_mutex_init( &mutexConsume, 0); + pthread_cond_init( &condConsume, 0); + threads = 0; +} + + +/*------------------------------------------------------------------------------ + * De-initialize the object + *----------------------------------------------------------------------------*/ +void +MultiThreadedConnector :: strip ( void ) throw ( Exception ) +{ + if ( threads ) { + delete[] threads; + threads = 0; + } + + pthread_cond_destroy( &condConsume); + pthread_mutex_destroy( &mutexConsume); + pthread_cond_destroy( &condProduce); + pthread_mutex_destroy( &mutexProduce); +} + + +/*------------------------------------------------------------------------------ + * Constructor + *----------------------------------------------------------------------------*/ +MultiThreadedConnector :: MultiThreadedConnector ( + const MultiThreadedConnector & connector ) + throw ( Exception ) + : Connector( connector) +{ + mutexProduce = connector.mutexProduce; + condProduce = connector.condProduce; + mutexConsume = connector.mutexConsume; + condConsume = connector.condConsume; + + if ( threads ) { + delete[] threads; + } + threads = new ThreadData[numSinks]; + for ( unsigned int i = 0; i < numSinks; ++i ) { + threads[i] = connector.threads[i]; + } +} + + +/*------------------------------------------------------------------------------ + * Assignment operator + *----------------------------------------------------------------------------*/ +MultiThreadedConnector & +MultiThreadedConnector :: operator= ( const MultiThreadedConnector & connector ) + throw ( Exception ) +{ + if ( this != &connector ) { + Connector::operator=( connector); + + mutexProduce = connector.mutexProduce; + condProduce = connector.condProduce; + mutexConsume = connector.mutexConsume; + condConsume = connector.condConsume; + + if ( threads ) { + delete[] threads; + } + threads = new ThreadData[numSinks]; + for ( unsigned int i = 0; i < numSinks; ++i ) { + threads[i] = connector.threads[i]; + } + } + + return *this; +} + + +/*------------------------------------------------------------------------------ + * Open the source and all the sinks if needed + * Create the sink threads + *----------------------------------------------------------------------------*/ +bool +MultiThreadedConnector :: open ( void ) throw ( Exception ) +{ + unsigned int i; + + if ( !Connector::open() ) { + return false; + } + + running = true; + + pthread_attr_init( &threadAttr); + pthread_attr_setdetachstate( &threadAttr, PTHREAD_CREATE_JOINABLE); + + threads = new ThreadData[numSinks]; + for ( i = 0; i < numSinks; ++i ) { + reportEvent( 8, "Connector :: open starting thread ", i); + ThreadData * threadData = threads + i; + + threadData->connector = this; + threadData->ixSink = i; + threadData->isDone = true; + if ( pthread_create( &(threadData->thread), + &threadAttr, + ThreadData::threadFunction, + threadData ) ) { + reportEvent( 8, "Connector :: open couldn't start thread ", i); + break; + } + } + + /* if could not create all, delete the ones created */ + if ( i < numSinks ) { + unsigned int j; + + /* signal to stop for all running threads */ + pthread_mutex_lock( &mutexProduce); + running = false; + pthread_cond_broadcast( &condProduce); + pthread_mutex_unlock( &mutexProduce); + + for ( j = 0; j < i; ++j ) { + pthread_join( threads[j].thread, 0); + } + + delete[] threads; + threads = 0; + + return false; + } + + return true; +} + + +/*------------------------------------------------------------------------------ + * Transfer some data from the source to the sink + *----------------------------------------------------------------------------*/ +unsigned int +MultiThreadedConnector :: transfer ( unsigned long bytes, + unsigned int bufSize, + unsigned int sec, + unsigned int usec ) + throw ( Exception ) +{ + unsigned int b; + + if ( numSinks == 0 ) { + return 0; + } + + if ( bufSize == 0 ) { + return 0; + } + + dataBuffer = new unsigned char[bufSize]; + dataSize = 0; + + reportEvent( 6, "MultiThreadedConnector :: tranfer, bytes", bytes); + + for ( b = 0; !bytes || b < bytes; ) { + if ( source->canRead( sec, usec) ) { + unsigned int i; + +reportEvent( 8, "Connector :: transfer, mutexProduce, enter lock"); + pthread_mutex_lock( &mutexProduce); +reportEvent( 8, "Connector :: transfer, mutexProduce, lock"); + dataSize = source->read( dataBuffer, bufSize); +reportEvent( 8, "Connector :: transfer, read", dataSize); + b += dataSize; + + /* check for EOF */ + if ( dataSize == 0 ) { + reportEvent( 3, "Connector :: transfer, EOF"); + break; + } + + for ( i = 0; i < numSinks; ++i ) { + threads[i].isDone = false; + } + + /* tell sink threads that there is some data available */ +reportEvent( 8, "Connector :: transfer, mutexProduce, broadcast"); + pthread_cond_broadcast( &condProduce); +reportEvent( 8, "Connector :: transfer, mutexProduce, broadcasted"); + + /* wait for all sink threads to get done with this data */ + while ( true ) { + for ( i = 0; i < numSinks && threads[i].isDone; ++i ); + if ( i == numSinks ) { + break; + } +reportEvent( 8, "Connector :: transfer, mutexConsume, waiting"); + pthread_cond_wait( &condProduce, &mutexProduce); +reportEvent( 8, "Connector :: transfer, mutexConsume, waking"); + } + pthread_mutex_unlock( &mutexProduce); +reportEvent( 8, "Connector :: transfer, mutexProduce, exit lock"); + + /* wait for all sink threads to process the presented data */ +/* +reportEvent( 8, "Connector :: transfer, mutexConsume, enter lock"); + pthread_mutex_lock( &mutexConsume); +reportEvent( 8, "Connector :: transfer, mutexConsume, lock"); + while ( true ) { + for ( i = 0; i < numSinks && threads[i].isDone; ++i ); + if ( i == numSinks ) { + break; + } +reportEvent( 8, "Connector :: transfer, mutexConsume, waiting"); + pthread_cond_wait( &condConsume, &mutexConsume); +reportEvent( 8, "Connector :: transfer, mutexConsume, waking"); + } + consumeCount = 0; +reportEvent( 8, "Connector :: transfer, mutexConsume, unlock"); + pthread_mutex_unlock( &mutexConsume); +reportEvent( 8, "Connector :: transfer, mutexConsume, exit lock"); +*/ + } else { + reportEvent( 3, "Connector :: transfer, can't read"); + break; + } + } + + delete[] dataBuffer; + return b; +} + + +/*------------------------------------------------------------------------------ + * The function for each thread. + * Read the presented data + *----------------------------------------------------------------------------*/ +void +MultiThreadedConnector :: sinkThread( int ixSink ) +{ + ThreadData * threadData = &threads[ixSink]; + Sink * sink = sinks[ixSink].get(); + + while ( running ) { + /* wait for some data to become available */ +reportEvent( 8, "Connector :: sinkThread, mutexProduce, enter lock ", ixSink); + pthread_mutex_lock( &mutexProduce); +reportEvent( 8, "Connector :: sinkThread, mutexProduce, lock ", ixSink); + while ( running && threadData->isDone ) { +reportEvent( 8, "Connector :: sinkThread, mutexProduce, wait ", ixSink); + pthread_cond_wait( &condProduce, &mutexProduce); +reportEvent( 8, "Connector :: sinkThread, mutexProduce, wake ", ixSink); + } + if ( !running ) { + break; + } + + if ( sink->canWrite( 0, 0) ) { + unsigned int written; + written = sink->write( dataBuffer, dataSize); +/* + unsigned int written; + written = sink->write( dataBuffer, dataSize); +reportEvent( 8, "Connector :: sinkThread, written ", written); +*/ + } else { +reportEvent( 3, "Connector :: sinkThread MAYDAY MAYDAY can't write ", ixSink); + /* don't care if we can't write */ + } + threadData->isDone = true; +reportEvent( 8, "Connector :: sinkThread, condProduce, broadcast ", ixSink); + pthread_cond_broadcast( &condProduce); +reportEvent( 8, "Connector :: sinkThread, condProduce, broadcasted ", ixSink); +reportEvent( 8, "Connector :: sinkThread, mutexProduce, unlock", ixSink); + pthread_mutex_unlock( &mutexProduce); +reportEvent( 8, "Connector :: sinkThread, mutexProduce, exit lock", ixSink); + + /* signal that we have read the data */ +/* +reportEvent( 8, "Connector :: sinkThread, mutexConsume, enter lock"); + pthread_mutex_lock( &mutexConsume); +reportEvent( 8, "Connector :: sinkThread, mutexConsume, lock"); +reportEvent( 8, "Connector :: sinkThread, mutexConsume, signal"); + pthread_cond_signal( &condConsume); +reportEvent( 8, "Connector :: sinkThread, mutexConsume, unlock"); + pthread_mutex_unlock( &mutexConsume); +reportEvent( 8, "Connector :: sinkThread, mutexConsume, exit lock"); +*/ + } +} + + +/*------------------------------------------------------------------------------ + * Stop the treads + * Close the source and all the sinks if needed + *----------------------------------------------------------------------------*/ +void +MultiThreadedConnector :: close ( void ) throw ( Exception ) +{ + unsigned int i; + + /* signal to stop for all threads */ + pthread_mutex_lock( &mutexProduce); + running = false; + pthread_cond_broadcast( &condProduce); + pthread_mutex_unlock( &mutexProduce); + + for ( i = 0; i < numSinks; ++i ) { + pthread_join( threads[i].thread, 0); + } + pthread_attr_destroy( &threadAttr); + + Connector::close(); +} + + +/*------------------------------------------------------------------------------ + * The thread function + *----------------------------------------------------------------------------*/ +void * +MultiThreadedConnector :: ThreadData :: threadFunction( void * param ) +{ + ThreadData * threadData = (ThreadData*) param; +threadData->connector->reportEvent( 8, + "ThreadData :: threadFunction thread starts ", + (threadData->ixSink) ); +threadData->connector->reportEvent( 8, + "ThreadData :: threadFunction numSinks ", + threadData->connector->numSinks); +for ( unsigned int i = 0; i < threadData->connector->numSinks; ++i ) { +threadData->connector->reportEvent( 8, + "ThreadData :: threadFunction thread ", i, + " ixSink ", threadData->connector->threads[i].ixSink); +} + threadData->connector->sinkThread( threadData->ixSink); + + return 0; +} + + +/*------------------------------------------------------------------------------ + + $Source$ + + $Log$ + Revision 1.1 2002/10/19 12:25:47 darkeye + changed internals so that now each encoding/server connection is + a separate thread + + + +------------------------------------------------------------------------------*/ + diff --git a/darkice/trunk/src/MultiThreadedConnector.h b/darkice/trunk/src/MultiThreadedConnector.h new file mode 100644 index 0000000..b234406 --- /dev/null +++ b/darkice/trunk/src/MultiThreadedConnector.h @@ -0,0 +1,342 @@ +/*------------------------------------------------------------------------------ + + Copyright (c) 2000 Tyrell Corporation. All rights reserved. + + Tyrell DarkIce + + File : MultiThreadedConnector.h + Version : $Revision$ + Author : $Author$ + Location : $Source$ + + Copyright notice: + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; either version 2 + of the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +------------------------------------------------------------------------------*/ +#ifndef MULTI_THREADED_CONNECTOR_H +#define MULTI_THREADED_CONNECTOR_H + +#ifndef __cplusplus +#error This is a C++ include file +#endif + + +/* ============================================================ include files */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#ifdef HAVE_PTHREAD_H +#include +#else +#error need pthread.h +#endif + +#include "Referable.h" +#include "Ref.h" +#include "Reporter.h" +#include "Source.h" +#include "Sink.h" +#include "Connector.h" + + +/* ================================================================ constants */ + + +/* =================================================================== macros */ + + +/* =============================================================== data types */ + +/** + * Connects a source to one or more sinks, using a multi-threaded + * producer - consumer approach. + * + * @author $Author$ + * @version $Revision$ + */ +class MultiThreadedConnector : public virtual Connector +{ + private: + + /** + * Helper class to collect information for starting threads. + */ + class ThreadData + { + public: + /** + * The connector starting the thread + */ + MultiThreadedConnector * connector; + + /** + * The index of the sink this thread writes to. + */ + unsigned int ixSink; + + /** + * The POSIX thread itself. + */ + pthread_t thread; + + /** + * Marks if the thread has processed the last batch + * of data. + */ + bool isDone; + + /** + * Default constructor. + */ + inline + ThreadData() + { + this->connector = 0; + this->ixSink = 0; + this->thread = 0; + this->isDone = false; + } + + /** + * The thread function. + * + * @param param thread parameter, a pointer to a + * ThreadData + * @return nothing + */ + static void * + threadFunction( void * param ); + }; + + /** + * The mutex of this object. + */ + pthread_mutex_t mutexProduce; + + /** + * The conditional variable for presenting new data. + */ + pthread_cond_t condProduce; + + /** + * The mutex of this object. + */ + pthread_mutex_t mutexConsume; + + /** + * The conditional variable for consuming data. + */ + pthread_cond_t condConsume; + + /** + * The thread attributes. + */ + pthread_attr_t threadAttr; + + /** + * The threads for the sinks. + */ + ThreadData * threads; + + /** + * Signal if we're running or not, so the threads no if to stop. + */ + bool running; + + /** + * The buffer of information presented to each thread. + */ + unsigned char * dataBuffer; + + /** + * The amount of information presented to each thread. + */ + unsigned int dataSize; + + /** + * Initialize the object. + * + * @exception Exception + */ + void + init ( void ) throw ( Exception ); + + /** + * De-initialize the object. + * + * @exception Exception + */ + void + strip ( void ) throw ( Exception ); + + /** + * This is the function for each thread. + * This function has to return fast + * + * @param ixSink the index of the sink this thread works on. + */ + void + sinkThread( int ixSink ); + + protected: + + /** + * Default constructor. Always throws an Exception. + * + * @exception Exception + */ + inline + MultiThreadedConnector ( void ) throw ( Exception ) + { + throw Exception( __FILE__, __LINE__); + } + + + public: + + /** + * Constructor based on a Source. + * + * @param source the source to connect to the sinks. + * @exception Exception + */ + inline + MultiThreadedConnector ( Source * source ) + throw ( Exception ) + : Connector( source ) + { + init(); + } + + /** + * Constructor based on a Source and a Sink. + * + * @param source the source to connect to the sinks. + * @param sink a sink to connect to the source. + * @exception Exception + */ + inline + MultiThreadedConnector ( Source * source, + Sink * sink ) + throw ( Exception ) + : Connector( source, sink) + { + init(); + } + + /** + * Copy constructor. + * + * @param connector the object to copy. + * @exception Exception + */ + MultiThreadedConnector ( const MultiThreadedConnector & connector ) + throw ( Exception ); + + /** + * Destructor. + * + * @exception Exception + */ + inline virtual + ~MultiThreadedConnector( void ) throw ( Exception ) + { + strip(); + } + + /** + * Assignment operator. + * + * @param connector the object to assign to this one. + * @return a reference to this object. + * @exception Exception + */ + virtual MultiThreadedConnector & + operator= ( const MultiThreadedConnector & connector ) + throw ( Exception ); + + /** + * Open the connector. Opens the Source and the Sinks if necessary. + * + * @return true if opening was successful, false otherwise. + * @exception Exception + */ + virtual bool + open ( void ) throw ( Exception ); + + /** + * Transfer a given amount of data from the Source to all the + * Sinks attached. + * If an attached Sink closes or encounteres an error during the + * process, it is detached and the function carries on with the + * rest of the Sinks. If no Sinks remain, or an error is encountered + * with the Source, the function returns prematurely. + * + * @param bytes the amount of data to transfer, in bytes. + * If 0, transfer forever. + * @param bufSize the size of the buffer to use for transfering. + * This amount of data is read from the Source and + * written to each Sink on each turn. + * @param sec the number of seconds to wait for the Source to have + * data available in each turn, and the number of seconds + * to wait for the Sinks to accept data. + * @param usec the number of micros seconds to wait for the Source to + * have data available in each turn, and the number of + * micro seconds to wait for the Sinks to accept data. + * @return the number of bytes read from the Source. + * @exception Exception + */ + virtual unsigned int + transfer ( unsigned long bytes, + unsigned int bufSize, + unsigned int sec, + unsigned int usec ) throw ( Exception ); + + /** + * Close the Connector. The Source and all Sinks are closed. + * + * @exception Exception + */ + virtual void + close ( void ) throw ( Exception ); +}; + + +/* ================================================= external data structures */ + + +/* ====================================================== function prototypes */ + + + +#endif /* MULTI_THREADED_CONNECTOR_H */ + + +/*------------------------------------------------------------------------------ + + $Source$ + + $Log$ + Revision 1.1 2002/10/19 12:25:47 darkeye + changed internals so that now each encoding/server connection is + a separate thread + + + +------------------------------------------------------------------------------*/ +