first real tests with multiple streaming

This commit is contained in:
darkeye 2000-11-10 20:16:21 +00:00
parent 19bdccd768
commit 69024b4738
4 changed files with 274 additions and 107 deletions

View File

@ -89,18 +89,24 @@ static const char fileid[] = "$Id$";
*----------------------------------------------------------------------------*/ *----------------------------------------------------------------------------*/
void void
BufferedSink :: init ( Sink * sink, BufferedSink :: init ( Sink * sink,
unsigned int size ) throw ( Exception ) unsigned int size,
unsigned int chunkSize ) throw ( Exception )
{ {
if ( !sink ) { if ( !sink ) {
throw Exception( __FILE__, __LINE__, "no sink"); throw Exception( __FILE__, __LINE__, "no sink");
} }
this->sink = sink; // create a reference this->sink = sink; // create a reference
this->bufferSize = size; this->chunkSize = chunkSize ? chunkSize : 1;
this->buffer = new unsigned char[bufferSize]; this->bufferSize = size;
this->bufferEnd = buffer + bufferSize; // make bufferSize a multiple of chunkSize
this->inp = buffer; this->bufferSize -= this->bufferSize % this->chunkSize;
this->outp = buffer; this->peak = 0;
this->misalignment = 0;
this->buffer = new unsigned char[bufferSize];
this->bufferEnd = buffer + bufferSize;
this->inp = buffer;
this->outp = buffer;
} }
@ -110,8 +116,10 @@ BufferedSink :: init ( Sink * sink,
BufferedSink :: BufferedSink ( const BufferedSink & buffer ) BufferedSink :: BufferedSink ( const BufferedSink & buffer )
throw ( Exception ) throw ( Exception )
{ {
init( buffer.sink.get(), buffer.bufferSize); init( buffer.sink.get(), buffer.bufferSize, buffer.chunkSize);
this->peak = buffer.peak;
this->misalignment = buffer.misalignment;
memcpy( this->buffer, buffer.buffer, this->bufferSize); memcpy( this->buffer, buffer.buffer, this->bufferSize);
} }
@ -141,8 +149,10 @@ BufferedSink :: operator= ( const BufferedSink & buffer )
if ( this != &buffer ) { if ( this != &buffer ) {
strip(); strip();
Sink::operator=( buffer ); Sink::operator=( buffer );
init( buffer.sink.get(), buffer.bufferSize); init( buffer.sink.get(), buffer.bufferSize, buffer.chunkSize);
this->peak = buffer.peak;
this->misalignment = buffer.misalignment;
memcpy( this->buffer, buffer.buffer, this->bufferSize); memcpy( this->buffer, buffer.buffer, this->bufferSize);
} }
@ -155,6 +165,8 @@ BufferedSink :: operator= ( const BufferedSink & buffer )
* All data is consumed. The return value is less then bufferSize only * All data is consumed. The return value is less then bufferSize only
* if the BufferedSink's internal buffer is smaller than bufferSize, * if the BufferedSink's internal buffer is smaller than bufferSize,
* thus can't hold that much * thus can't hold that much
* The data to be stored is treated as parts with chunkSize size
* Only full chunkSize sized parts are stored
*----------------------------------------------------------------------------*/ *----------------------------------------------------------------------------*/
unsigned int unsigned int
BufferedSink :: store ( const void * buffer, BufferedSink :: store ( const void * buffer,
@ -176,49 +188,71 @@ BufferedSink :: store ( const void * buffer,
oldInp = inp; oldInp = inp;
buf = (const unsigned char *) buffer; buf = (const unsigned char *) buffer;
/* cut the front of the supplied buffer if it wouldn't fit */ // adjust so it is a multiple of chunkSize
bufferSize -= bufferSize % chunkSize;
// cut the front of the supplied buffer if it wouldn't fit
if ( bufferSize > this->bufferSize ) { if ( bufferSize > this->bufferSize ) {
size = this->bufferSize - 1; size = this->bufferSize - 1;
buf += bufferSize - size; size -= size % chunkSize; // keep it a multiple of chunkSize
buf += bufferSize - size;
} else { } else {
size = bufferSize; size = bufferSize;
} }
/* copy the data into the buffer */ // copy the data into the buffer
i = bufferEnd - inp; i = bufferEnd - inp;
if ( (i % chunkSize) != 0 ) {
throw Exception( __FILE__, __LINE__, "copy quantity not aligned", i);
}
if ( size <= i ) { if ( size <= i ) {
/* the place between inp and bufferEnd is // the place between inp and bufferEnd is
* big enough to hold the data */ // big enough to hold the data
memcpy( inp, buf, size); memcpy( inp, buf, size);
inp = slidePointer( inp, size); inp = slidePointer( inp, size);
/* adjust outp, lose the data that was overwritten, if any */ // adjust outp, lose the data that was overwritten, if any
if ( outp > oldInp && outp <= inp ) { if ( outp > oldInp && outp <= inp ) {
outp = slidePointer( inp, 1); outp = slidePointer( inp, chunkSize);
} }
} else { } else {
/* the place between inp and bufferEnd is not // the place between inp and bufferEnd is not
* big enough to hold the data // big enough to hold the data
* writing will take place in two turns, once from // writing will take place in two turns, once from
* inp -> bufferEnd, then from buffer -> */ // inp -> bufferEnd, then from buffer ->
memcpy( inp, buf, i); memcpy( inp, buf, i);
i = size - i; i = size - i;
if ( (i % chunkSize) != 0 ) {
throw Exception(__FILE__, __LINE__, "copy quantity not aligned", i);
}
memcpy( this->buffer, buf, i); memcpy( this->buffer, buf, i);
inp = slidePointer( this->buffer, i); inp = slidePointer( this->buffer, i);
/* adjust outp, lose the data that was overwritten, if any */ // adjust outp, lose the data that was overwritten, if any
if ( outp <= oldInp ) { if ( outp <= oldInp ) {
if ( outp < inp ) { if ( outp < inp ) {
outp = slidePointer( inp, 1); outp = slidePointer( inp, chunkSize);
} }
} else { } else {
outp = slidePointer( inp, 1); outp = slidePointer( inp, chunkSize);
} }
} }
updatePeak();
if ( ((inp - this->buffer) % chunkSize) != 0 ) {
throw Exception( __FILE__, __LINE__,
"inp not aligned", inp - this->buffer);
}
if ( ((outp - this->buffer) % chunkSize) != 0 ) {
throw Exception( __FILE__, __LINE__,
"outp not aligned", outp - this->buffer);
}
return size; return size;
} }
@ -232,6 +266,8 @@ BufferedSink :: write ( const void * buf,
unsigned int len ) throw ( Exception ) unsigned int len ) throw ( Exception )
{ {
unsigned int length; unsigned int length;
unsigned int soFar;
unsigned char * b = (unsigned char *) buf;
if ( !buf ) { if ( !buf ) {
throw Exception( __FILE__, __LINE__, "buf is null"); throw Exception( __FILE__, __LINE__, "buf is null");
@ -241,43 +277,84 @@ BufferedSink :: write ( const void * buf,
return 0; return 0;
} }
/* try to write data from the buffer first, if any */ if ( !align() ) {
return 0;
}
// make it a multiple of chunkSize
len -= len % chunkSize;
// try to write data from the buffer first, if any
if ( inp != outp ) { if ( inp != outp ) {
unsigned int size; unsigned int size = 0;
unsigned int total = 0;
if ( outp > inp ) { if ( outp > inp ) {
/* valuable data is between outp -> bufferEnd and buffer -> inp // valuable data is between outp -> bufferEnd and buffer -> inp
* try to write the outp -> bufferEnd // try to write the outp -> bufferEnd
* the rest will be written in the next if */ // the rest will be written in the next if
size = bufferEnd - outp; size = bufferEnd - outp - 1;
length = sink->write( outp, size); size -= size % chunkSize;
outp = slidePointer( outp, length); soFar = 0;
while ( outp > inp && soFar < size && sink->canWrite( 0, 0) ) {
length = sink->write( outp + soFar, size - soFar);
outp = slidePointer( outp, length);
soFar += length;
}
total += soFar;
} }
if ( outp < inp ) { if ( outp < inp ) {
/* valuable data is between outp and inp // valuable data is between outp and inp
* if the previous if wrote all data from the end // if the previous if wrote all data from the end
* this part will write the rest */ // this part will write the rest
size = inp - outp; size = inp - outp;
length = sink->write( outp, size); soFar = 0;
outp = slidePointer( outp, length);
while ( soFar < size && sink->canWrite( 0, 0) ) {
length = sink->write( outp + soFar, size - soFar);
outp = slidePointer( outp, length);
soFar += length;
}
total += soFar;
} }
while ( (outp - buffer) % chunkSize ) {
slidePointer( outp, 1);
}
// calulate the misalignment to chunkSize boundaries
misalignment = (chunkSize - (total % chunkSize)) % chunkSize;
} }
/* the internal buffer is empty, try to write the fresh data */ if ( !align() ) {
length = inp == outp ? sink->write( buf, len) : 0; return 0;
}
// the internal buffer is empty, try to write the fresh data
soFar = 0;
if ( inp != outp ) {
while ( soFar < len && sink->canWrite( 0, 0) ) {
soFar += sink->write( b + soFar, len - soFar);
}
}
length = soFar;
// calulate the misalignment to chunkSize boundaries
misalignment = (chunkSize - (length % chunkSize)) % chunkSize;
if ( length < len ) { if ( length < len ) {
/* if not all fresh could be written, store the remains */ // if not all fresh could be written, store the remains
unsigned char * b = (unsigned char *) buf;
store( b + length, len - length); store( b + length, len - length);
} }
/* tell them we ate everything */ // tell them we ate everything up to chunkSize alignment
return len; return len;
} }
@ -303,6 +380,9 @@ BufferedSink :: close ( void ) throw ( Exception )
$Source$ $Source$
$Log$ $Log$
Revision 1.3 2000/11/10 20:16:21 darkeye
first real tests with multiple streaming
Revision 1.2 2000/11/05 14:08:27 darkeye Revision 1.2 2000/11/05 14:08:27 darkeye
changed builting to an automake / autoconf environment changed builting to an automake / autoconf environment

View File

@ -66,6 +66,10 @@ class BufferedSink : public Sink
unsigned char * buffer; unsigned char * buffer;
unsigned char * bufferEnd; unsigned char * bufferEnd;
unsigned int bufferSize; unsigned int bufferSize;
unsigned int peak;
unsigned int chunkSize;
unsigned int misalignment;
unsigned char * inp; unsigned char * inp;
unsigned char * outp; unsigned char * outp;
@ -76,11 +80,12 @@ class BufferedSink : public Sink
void void
init ( Sink * sink, init ( Sink * sink,
unsigned int size ) throw ( Exception ); unsigned int size,
unsigned int chunkSize ) throw ( Exception );
void void
strip ( void ) throw ( Exception ); strip ( void ) throw ( Exception );
inline unsigned char * inline unsigned char *
@ -97,6 +102,41 @@ class BufferedSink : public Sink
} }
inline void
updatePeak ( void )
{
unsigned int u;
u = outp <= inp ? inp - outp : (bufferEnd - outp) + (inp - buffer);
if ( peak < u ) {
peak = u;
}
}
inline bool
align ( void )
{
char b[] = { 0 };
while ( misalignment ) {
if ( sink->canWrite( 0, 0) ) {
unsigned int ret;
if ( !(ret = sink->write( b, 1)) ) {
return false;
}
--misalignment;
} else {
return false;
}
}
return true;
}
protected: protected:
inline inline
@ -122,9 +162,10 @@ class BufferedSink : public Sink
inline inline
BufferedSink ( Sink * sink, BufferedSink ( Sink * sink,
unsigned int size ) throw ( Exception ) unsigned int size,
unsigned int chunkSize = 1 ) throw ( Exception )
{ {
init( sink, size); init( sink, size, chunkSize);
} }
@ -142,6 +183,13 @@ class BufferedSink : public Sink
operator= ( const BufferedSink & bs ) throw ( Exception ); operator= ( const BufferedSink & bs ) throw ( Exception );
inline unsigned int
getPeak ( void ) const throw ()
{
return peak;
}
inline virtual bool inline virtual bool
open ( void ) throw ( Exception ) open ( void ) throw ( Exception )
{ {
@ -198,6 +246,9 @@ class BufferedSink : public Sink
$Source$ $Source$
$Log$ $Log$
Revision 1.3 2000/11/10 20:16:21 darkeye
first real tests with multiple streaming
Revision 1.2 2000/11/05 17:37:24 darkeye Revision 1.2 2000/11/05 17:37:24 darkeye
removed clone() functions removed clone() functions

View File

@ -38,12 +38,6 @@
#include "configure.h" #include "configure.h"
#endif #endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#else
#error need unistd.h
#endif
#ifdef HAVE_STDLIB_H #ifdef HAVE_STDLIB_H
#include <stdlib.h> #include <stdlib.h>
#else #else
@ -153,9 +147,13 @@ DarkIce :: DarkIce ( int argc,
void void
DarkIce :: init ( const Config & config ) throw ( Exception ) DarkIce :: init ( const Config & config ) throw ( Exception )
{ {
unsigned int bufferSecs;
const ConfigSection * cs; const ConfigSection * cs;
const char * str; const char * str;
unsigned int u, v, w; unsigned int sampleRate;
unsigned int bitsPerSample;
unsigned int channel;
const char * device;
int i; int i;
// the [general] section // the [general] section
@ -164,6 +162,8 @@ DarkIce :: init ( const Config & config ) throw ( Exception )
} }
str = cs->getForSure( "duration", " missing in section [general]"); str = cs->getForSure( "duration", " missing in section [general]");
duration = Util::strToL( str); duration = Util::strToL( str);
str = cs->getForSure( "bufferSecs", " missing in section [general]");
bufferSecs = Util::strToL( str);
// the [input] section // the [input] section
@ -172,17 +172,17 @@ DarkIce :: init ( const Config & config ) throw ( Exception )
} }
str = cs->getForSure( "sampleRate", " missing in section [input]"); str = cs->getForSure( "sampleRate", " missing in section [input]");
u = Util::strToL( str); sampleRate = Util::strToL( str);
str = cs->getForSure( "bitsPerSample", " missing in section [input]"); str = cs->getForSure( "bitsPerSample", " missing in section [input]");
v = Util::strToL( str); bitsPerSample = Util::strToL( str);
str = cs->getForSure( "channel", " missing in section [input]"); str = cs->getForSure( "channel", " missing in section [input]");
w = Util::strToL( str); channel = Util::strToL( str);
device = cs->getForSure( "device", " missing in section [input]");
str = cs->getForSure( "device", " missing in section [input]"); dsp = new OssDspSource( device,
sampleRate,
dsp = new OssDspSource( str, u, v, w); bitsPerSample,
channel );
encConnector = new Connector( dsp.get()); encConnector = new Connector( dsp.get());
@ -202,17 +202,20 @@ DarkIce :: init ( const Config & config ) throw ( Exception )
break; break;
} }
const char * encoder = 0; const char * encoder = 0;
unsigned int bitrate = 0; unsigned int bitrate = 0;
const char * server = 0; const char * server = 0;
unsigned int port = 0; unsigned int port = 0;
const char * password = 0; const char * password = 0;
const char * mountPoint = 0; const char * mountPoint = 0;
const char * name = 0; const char * remoteDumpFile = 0;
const char * description = 0; const char * name = 0;
const char * url = 0; const char * description = 0;
const char * genre = 0; const char * url = 0;
bool isPublic = false; const char * genre = 0;
bool isPublic = false;
unsigned int lowpass = 0;
unsigned int highpass = 0;
encoder = cs->getForSure( "encoder", " missing in section ", lame); encoder = cs->getForSure( "encoder", " missing in section ", lame);
str = cs->getForSure( "bitrate", " missing in section ", lame); str = cs->getForSure( "bitrate", " missing in section ", lame);
@ -222,12 +225,17 @@ DarkIce :: init ( const Config & config ) throw ( Exception )
port = Util::strToL( str); port = Util::strToL( str);
password = cs->getForSure( "password", " missing in section ", lame); password = cs->getForSure( "password", " missing in section ", lame);
mountPoint = cs->getForSure( "mountPoint"," missing in section ",lame); mountPoint = cs->getForSure( "mountPoint"," missing in section ",lame);
remoteDumpFile = cs->get( "remoteDumpFile");
name = cs->getForSure( "name", " missing in section ", lame); name = cs->getForSure( "name", " missing in section ", lame);
description = cs->getForSure("description"," missing in section ",lame); description = cs->getForSure("description"," missing in section ",lame);
url = cs->getForSure( "url", " missing in section ", lame); url = cs->getForSure( "url", " missing in section ", lame);
genre = cs->getForSure( "genre", " missing in section ", lame); genre = cs->getForSure( "genre", " missing in section ", lame);
str = cs->getForSure( "public", " missing in section ", lame); str = cs->getForSure( "public", " missing in section ", lame);
isPublic = Util::strEq( str, "yes") ? true : false; isPublic = Util::strEq( str, "yes") ? true : false;
str = cs->get( "lowpass");
lowpass = str ? Util::strToL( str) : 0;
str = cs->get( "highpass");
highpass = str ? Util::strToL( str) : 0;
// generate the pipe names // generate the pipe names
char pipeOutName[lameLen + pipeOutExtLen + 1]; char pipeOutName[lameLen + pipeOutExtLen + 1];
@ -240,6 +248,8 @@ DarkIce :: init ( const Config & config ) throw ( Exception )
// go on and create the things // go on and create the things
outputs[i].pid = 0;
// the pipes // the pipes
outputs[i].encOutPipe = new PipeSource( pipeOutName); outputs[i].encOutPipe = new PipeSource( pipeOutName);
outputs[i].encInPipe = new PipeSink( pipeInName); outputs[i].encInPipe = new PipeSink( pipeInName);
@ -262,26 +272,32 @@ DarkIce :: init ( const Config & config ) throw ( Exception )
// encoder related stuff // encoder related stuff
outputs[i].encIn = new BufferedSink( outputs[i].encInPipe.get(), outputs[i].encIn = new BufferedSink( outputs[i].encInPipe.get(),
64 * 1024); bufferSecs * (bitsPerSample / 8) * channel * sampleRate,
(bitsPerSample / 8) * channel );
encConnector->attach( outputs[i].encIn.get()); encConnector->attach( outputs[i].encIn.get());
outputs[i].encoder = new LameEncoder( encoder, outputs[i].encoder = new LameEncoder( encoder,
outputs[i].encInPipe->getFileName(), outputs[i].encInPipe->getFileName(),
dsp.get(), dsp.get(),
outputs[i].encOutPipe->getFileName(), outputs[i].encOutPipe->getFileName(),
bitrate ); bitrate,
sampleRate,
channel,
lowpass,
highpass );
// streaming related stuff // streaming related stuff
outputs[i].socket = new TcpSocket( server, port); outputs[i].socket = new TcpSocket( server, port);
outputs[i].ice = new IceCast( outputs[i].socket.get(), outputs[i].ice = new IceCast( outputs[i].socket.get(),
password, password,
mountPoint, mountPoint,
name, remoteDumpFile,
description, name,
url, description,
genre, url,
bitrate, genre,
isPublic ); bitrate,
isPublic );
outputs[i].shoutConnector = new Connector( outputs[i].encOutPipe.get(), outputs[i].shoutConnector = new Connector( outputs[i].encOutPipe.get(),
outputs[i].ice.get()); outputs[i].ice.get());
} }
@ -350,7 +366,7 @@ DarkIce :: shout ( unsigned int ix ) throw ( Exception )
* (1024 / 8) * (1024 / 8)
* duration, * duration,
4096, 4096,
1, 10,
0 ); 0 );
cout << len << " bytes transfered" << endl; cout << len << " bytes transfered" << endl;
@ -367,37 +383,40 @@ DarkIce :: shout ( unsigned int ix ) throw ( Exception )
int int
DarkIce :: run ( void ) throw ( Exception ) DarkIce :: run ( void ) throw ( Exception )
{ {
pid_t pid; int i;
cout << "DarkIce" << endl << endl << flush; cout << "DarkIce" << endl << endl << flush;
pid = fork(); for ( i = 0; i < noOutputs; ++i ) {
outputs[i].pid = fork();
if ( pid == -1 ) { if ( outputs[i].pid == -1 ) {
throw Exception( __FILE__, __LINE__, "fork error", errno); throw Exception( __FILE__, __LINE__, "fork error", errno);
} else if ( pid == 0 ) { } else if ( outputs[i].pid == 0 ) {
// this is the child // this is the child
int i;
sleep ( 2 ); sleep ( 1 );
cout << "shouting" << endl << flush; cout << "shouting " << i << endl << flush;
for ( i = 0; i < noOutputs; ++i ) {
shout( i); shout( i);
} cout << "shouting ends " << i << endl << flush;
cout << "shouting ends" << endl << flush;
exit(0); exit(0);
} else { }
// this is the parent }
// this is the parent
cout << "encoding" << endl << flush;
encode();
cout << "encoding ends" << endl << flush;
for ( i = 0; i < noOutputs; ++i ) {
int status; int status;
cout << "encoding" << endl << flush; waitpid( outputs[i].pid, &status, 0);
encode();
cout << "encoding ends" << endl << flush;
waitpid( pid, &status, 0);
if ( !WIFEXITED(status) ) { if ( !WIFEXITED(status) ) {
throw Exception( __FILE__, __LINE__, throw Exception( __FILE__, __LINE__,
"child exited abnormally", WEXITSTATUS(status)); "child exited abnormally", WEXITSTATUS(status));
@ -413,6 +432,9 @@ DarkIce :: run ( void ) throw ( Exception )
$Source$ $Source$
$Log$ $Log$
Revision 1.5 2000/11/10 20:16:21 darkeye
first real tests with multiple streaming
Revision 1.4 2000/11/09 22:09:46 darkeye Revision 1.4 2000/11/09 22:09:46 darkeye
added multiple outputs added multiple outputs
added configuration reading added configuration reading

View File

@ -41,6 +41,16 @@
/* ============================================================ include files */ /* ============================================================ include files */
#ifdef HAVE_CONFIG_H
#include "configure.h"
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#else
#error need unistd.h
#endif
#include <iostream.h> #include <iostream.h>
#include "Referable.h" #include "Referable.h"
@ -82,6 +92,7 @@ class DarkIce : public virtual Referable
Ref<TcpSocket> socket; Ref<TcpSocket> socket;
Ref<IceCast> ice; Ref<IceCast> ice;
Ref<Connector> shoutConnector; Ref<Connector> shoutConnector;
pid_t pid;
} Output; } Output;
@ -164,6 +175,9 @@ class DarkIce : public virtual Referable
$Source$ $Source$
$Log$ $Log$
Revision 1.3 2000/11/10 20:16:21 darkeye
first real tests with multiple streaming
Revision 1.2 2000/11/09 22:09:46 darkeye Revision 1.2 2000/11/09 22:09:46 darkeye
added multiple outputs added multiple outputs
added configuration reading added configuration reading