Server connection related fixes.

This commit is contained in:
rafael@riseup.net 2014-02-26 19:48:19 +00:00
parent 40fd8cc0cf
commit 710d2c26c4
10 changed files with 169 additions and 75 deletions

View File

@ -1,3 +1,7 @@
trunk:
o Bugs related to streaming to remote servers fixed. Patch by Kalle Kulonen
<kulonenk@gmail.com> and Mark Turner <jmarkturner@gmail.com>.
15-07-2013 Darkice 1.2 released
o Issue #75: Added Ogg/Opus support. Patch by Doug Kelly
dougk.ff7@gmail.com

View File

@ -255,7 +255,7 @@ AlsaDspSource :: read ( void * buf,
// Check for buffer overrun
if (ret == -EPIPE) {
reportEvent(1, "Buffer overrun!");
reportEvent(1, "AlsaDspSource :: Buffer overrun!");
snd_pcm_prepare(captureHandle);
ret = -EAGAIN;
}

View File

@ -99,6 +99,8 @@ BufferedSink :: init ( Sink * sink,
this->bufferEnd = buffer + bufferSize;
this->inp = buffer;
this->outp = buffer;
this->bOpen = true;
this->openAttempts = 0;
}
@ -112,6 +114,8 @@ BufferedSink :: BufferedSink ( const BufferedSink & buffer )
this->peak = buffer.peak;
this->misalignment = buffer.misalignment;
this->bOpen = buffer.bOpen;
this->openAttempts = buffer.openAttempts;
memcpy( this->buffer, buffer.buffer, this->bufferSize);
}
@ -145,6 +149,8 @@ BufferedSink :: operator= ( const BufferedSink & buffer )
this->peak = buffer.peak;
this->misalignment = buffer.misalignment;
this->bOpen = buffer.bOpen;
this->openAttempts = buffer.openAttempts;
memcpy( this->buffer, buffer.buffer, this->bufferSize);
}
@ -177,6 +183,16 @@ BufferedSink :: store ( const void * buffer,
return 0;
}
unsigned int remaining = this->bufferSize - ( outp <= inp ? inp - outp :
(bufferEnd - outp) + (inp - this->buffer) );
// react only to the first overrun whenever there is a series of overruns
if ( remaining + chunkSize <= bufferSize && remaining > chunkSize ) {
reportEvent(3,"BufferedSink :: store, buffer overrun");
throw Exception( __FILE__, __LINE__,
"buffer overrun");
}
oldInp = inp;
buf = (const unsigned char *) buffer;
@ -257,8 +273,8 @@ unsigned int
BufferedSink :: write ( const void * buf,
unsigned int len ) throw ( Exception )
{
unsigned int length;
unsigned int soFar;
unsigned int length = 0;
unsigned int soFar = 0;
unsigned char * b = (unsigned char *) buf;
if ( !buf ) {
@ -272,6 +288,28 @@ BufferedSink :: write ( const void * buf,
if ( !align() ) {
return 0;
}
if ( !sink->isOpen() && openAttempts < 10 ) {
// try to reopen underlying sink, because it has closed on its own
openAttempts++;
try {
if( sink->open() ) {
// if reopening succeeds, reset open attempts
openAttempts = 0;
}
} catch ( Exception &e ) {
reportEvent( 4,"BufferedSink :: write,",
"couldn't reopen underlying sink, attempt",
openAttempts, "/ 10" );
}
if( openAttempts == 10 ) {
// all the attempts have been used, give up
close();
throw Exception( __FILE__, __LINE__,
"reopen failed");
}
}
// make it a multiple of chunkSize
len -= len % chunkSize;
@ -286,12 +324,24 @@ BufferedSink :: write ( const void * buf,
// try to write the outp -> bufferEnd
// the rest will be written in the next if
size = bufferEnd - outp - 1;
size = bufferEnd - outp;
size -= size % chunkSize;
if( size > len * 2 ) {
// do not try to send the content of the entire buffer at once,
// but limit sending to a multiple of len
// this prevents a surge of data to underlying buffer
// which is important especially during a lot of packet loss
size = len * 2;
}
soFar = 0;
while ( outp > inp && soFar < size && sink->canWrite( 0, 0) ) {
length = sink->write( outp + soFar, size - soFar);
try {
length = sink->write( outp + soFar, size - soFar);
} catch (Exception &e) {
length = 0;
reportEvent(3,"Exception caught in BufferedSink :: write1");
}
outp = slidePointer( outp, length);
soFar += length;
}
@ -305,10 +355,19 @@ BufferedSink :: write ( const void * buf,
// this part will write the rest
size = inp - outp;
if( size > len * 2 ) {
// prevent a surge of data to underlying buffer
size = len * 2;
}
soFar = 0;
while ( soFar < size && sink->canWrite( 0, 0) ) {
length = sink->write( outp + soFar, size - soFar);
try {
length = sink->write( outp + soFar, size - soFar);
} catch (Exception &e) {
length = 0;
reportEvent(3,"Exception caught in BufferedSink :: write2" );
}
outp = slidePointer( outp, length);
soFar += length;
}
@ -332,13 +391,12 @@ BufferedSink :: write ( const void * buf,
soFar = 0;
if ( inp == outp ) {
while ( soFar < len && sink->canWrite( 0, 0) ) {
try {
soFar += sink->write( b + soFar, len - soFar);
} catch (Exception &e) {
reportEvent(3,"Exception caught in BufferedSink :: write3\n");
throw; /* up a level */
}
}
try {
soFar += sink->write( b + soFar, len - soFar);
} catch (Exception &e) {
reportEvent(3,"Exception caught in BufferedSink :: write3");
}
}
}
length = soFar;
@ -351,6 +409,8 @@ BufferedSink :: write ( const void * buf,
store( b + length, len - length);
}
updatePeak();
// tell them we ate everything up to chunkSize alignment
return len;
}
@ -369,5 +429,6 @@ BufferedSink :: close ( void ) throw ( Exception )
flush();
sink->close();
inp = outp = buffer;
bOpen = false;
}

View File

@ -109,6 +109,17 @@ class BufferedSink : public Sink, public virtual Reporter
* The underlying Sink.
*/
Ref<Sink> sink;
/**
* Is BufferedSink open.
*/
bool bOpen;
/**
* Number of attempts so far to open underlying sink after it has
* closed on its own.
*/
unsigned int openAttempts;
/**
* Initialize the object.
@ -163,10 +174,17 @@ class BufferedSink : public Sink, public virtual Reporter
unsigned int u;
u = outp <= inp ? inp - outp : (bufferEnd - outp) + (inp - buffer);
if ( peak < u ) {
// report new peaks if it is either significantly more severe than
// the previously reported peak
if ( peak * 2 < u ) {
peak = u;
reportEvent( 4, "BufferedSink, new peak:", peak);
reportEvent( 4, "BufferedSink, remaining:", bufferSize - peak);
reportEvent( 4, "BufferedSink, new peak:", peak, " / ", bufferSize);
}
if ( peak > 0 && u == 0 ) {
peak = 0;
reportEvent( 4, "BufferedSink, healed:", peak, " / ", bufferSize);
}
}
@ -306,7 +324,9 @@ class BufferedSink : public Sink, public virtual Reporter
inline virtual bool
open ( void ) throw ( Exception )
{
return sink->open();
bOpen = sink->open();
openAttempts = 0;
return bOpen;
}
/**
@ -317,7 +337,7 @@ class BufferedSink : public Sink, public virtual Reporter
inline virtual bool
isOpen ( void ) const throw ()
{
return sink->isOpen();
return bOpen;
}
/**

View File

@ -262,7 +262,7 @@ DarkIce :: configIceCast ( const Config & config,
FileSink * localDumpFile = 0;
bool fileAddDate = false;
const char * fileDateFormat = 0;
AudioEncoder * encoder = 0;
BufferedSink * audioOut = 0;
int bufferSize = 0;
str = cs->get( "sampleRate");
@ -384,10 +384,14 @@ DarkIce :: configIceCast ( const Config & config,
"unsupported stream format: ", str);
}
// augment audio outs with a buffer when used from encoder
audioOut = new BufferedSink( audioOuts[u].server.get(),
bufferSize, 1);
#ifdef HAVE_LAME_LIB
if ( Util::strEq( str, "mp3") ) {
encoder = new LameLibEncoder( audioOuts[u].server.get(),
audioOuts[u].encoder = new LameLibEncoder( audioOut,
dsp.get(),
bitrateMode,
bitrate,
@ -400,8 +404,8 @@ DarkIce :: configIceCast ( const Config & config,
#endif
#ifdef HAVE_TWOLAME_LIB
if ( Util::strEq( str, "mp2") ) {
encoder = new TwoLameLibEncoder(
audioOuts[u].server.get(),
audioOuts[u].encoder = new TwoLameLibEncoder(
audioOut,
dsp.get(),
bitrateMode,
bitrate,
@ -410,7 +414,6 @@ DarkIce :: configIceCast ( const Config & config,
}
#endif
audioOuts[u].encoder = new BufferedSink(encoder, bufferSize, dsp->getSampleSize());
encConnector->attach( audioOuts[u].encoder.get());
#endif // HAVE_LAME_LIB || HAVE_TWOLAME_LIB
}
@ -467,7 +470,7 @@ DarkIce :: configIceCast2 ( const Config & config,
FileSink * localDumpFile = 0;
bool fileAddDate = false;
const char * fileDateFormat = 0;
AudioEncoder * encoder = 0;
BufferedSink * audioOut = 0;
int bufferSize = 0;
str = cs->getForSure( "format", " missing in section ", stream);
@ -597,6 +600,9 @@ DarkIce :: configIceCast2 ( const Config & config,
isPublic,
localDumpFile);
audioOut = new BufferedSink( audioOuts[u].server.get(),
bufferSize, 1);
switch ( format ) {
case IceCast2::mp3:
#ifndef HAVE_LAME_LIB
@ -605,8 +611,8 @@ DarkIce :: configIceCast2 ( const Config & config,
"thus can't create mp3 stream: ",
stream);
#else
encoder = new LameLibEncoder(
audioOuts[u].server.get(),
audioOuts[u].encoder = new LameLibEncoder(
audioOut,
dsp.get(),
bitrateMode,
bitrate,
@ -616,8 +622,6 @@ DarkIce :: configIceCast2 ( const Config & config,
lowpass,
highpass );
audioOuts[u].encoder = new BufferedSink(encoder, bufferSize, dsp->getSampleSize());
#endif // HAVE_LAME_LIB
break;
@ -630,8 +634,8 @@ DarkIce :: configIceCast2 ( const Config & config,
stream);
#else
encoder = new VorbisLibEncoder(
audioOuts[u].server.get(),
audioOuts[u].encoder = new VorbisLibEncoder(
audioOut,
dsp.get(),
bitrateMode,
bitrate,
@ -640,7 +644,6 @@ DarkIce :: configIceCast2 ( const Config & config,
dsp->getChannel(),
maxBitrate);
audioOuts[u].encoder = new BufferedSink(encoder, bufferSize, dsp->getSampleSize());
#endif // HAVE_VORBIS_LIB
break;
@ -652,8 +655,8 @@ DarkIce :: configIceCast2 ( const Config & config,
stream);
#else
encoder = new OpusLibEncoder(
audioOuts[u].server.get(),
audioOuts[u].encoder = new OpusLibEncoder(
audioOut,
dsp.get(),
bitrateMode,
bitrate,
@ -662,7 +665,6 @@ DarkIce :: configIceCast2 ( const Config & config,
dsp->getChannel(),
maxBitrate);
audioOuts[u].encoder = new BufferedSink(encoder, bufferSize, dsp->getBitsPerSample() / 8);
#endif // HAVE_OPUS_LIB
break;
@ -673,15 +675,14 @@ DarkIce :: configIceCast2 ( const Config & config,
"thus can't create mp2 stream: ",
stream);
#else
encoder = new TwoLameLibEncoder(
audioOuts[u].server.get(),
audioOuts[u].encoder = new TwoLameLibEncoder(
audioOut,
dsp.get(),
bitrateMode,
bitrate,
sampleRate,
channel );
audioOuts[u].encoder = new BufferedSink(encoder, bufferSize, dsp->getSampleSize());
#endif // HAVE_TWOLAME_LIB
break;
@ -693,8 +694,8 @@ DarkIce :: configIceCast2 ( const Config & config,
"thus can't aac stream: ",
stream);
#else
encoder = new FaacEncoder(
audioOuts[u].server.get(),
audioOuts[u].encoder = new FaacEncoder(
audioOut,
dsp.get(),
bitrateMode,
bitrate,
@ -702,7 +703,6 @@ DarkIce :: configIceCast2 ( const Config & config,
sampleRate,
dsp->getChannel());
audioOuts[u].encoder = new BufferedSink(encoder, bufferSize, dsp->getSampleSize());
#endif // HAVE_FAAC_LIB
break;
@ -713,8 +713,8 @@ DarkIce :: configIceCast2 ( const Config & config,
"thus can't aacp stream: ",
stream);
#else
encoder = new aacPlusEncoder(
audioOuts[u].server.get(),
audioOuts[u].encoder = new aacPlusEncoder(
audioOut,
dsp.get(),
bitrateMode,
bitrate,
@ -722,7 +722,6 @@ DarkIce :: configIceCast2 ( const Config & config,
sampleRate,
channel );
audioOuts[u].encoder = new BufferedSink(encoder, bufferSize, dsp->getSampleSize());
#endif // HAVE_AACPLUS_LIB
break;
@ -793,7 +792,7 @@ DarkIce :: configShoutCast ( const Config & config,
FileSink * localDumpFile = 0;
bool fileAddDate = false;
const char * fileDateFormat = 0;
AudioEncoder * encoder = 0;
BufferedSink * audioOut = 0;
int bufferSize = 0;
str = cs->get( "sampleRate");
@ -908,7 +907,9 @@ DarkIce :: configShoutCast ( const Config & config,
localDumpFile);
encoder = new LameLibEncoder( audioOuts[u].server.get(),
audioOut = new BufferedSink(audioOuts[u].socket.get(), bufferSize, 1);
audioOuts[u].encoder = new LameLibEncoder(
audioOut,
dsp.get(),
bitrateMode,
bitrate,
@ -917,7 +918,6 @@ DarkIce :: configShoutCast ( const Config & config,
channel,
lowpass,
highpass );
audioOuts[u].encoder = new BufferedSink(encoder, bufferSize, dsp->getSampleSize());
encConnector->attach( audioOuts[u].encoder.get());
#endif // HAVE_LAME_LIB

View File

@ -54,7 +54,7 @@
#include "Exception.h"
#include "Reporter.h"
#include "AudioEncoder.h"
#include "CastSink.h"
#include "Sink.h"
#ifdef HAVE_SRC_LIB
#include <samplerate.h>
#else
@ -313,7 +313,7 @@ class OpusLibEncoder : public AudioEncoder, public virtual Reporter
* @exception Exception
*/
inline
OpusLibEncoder ( CastSink * sink,
OpusLibEncoder ( Sink * sink,
unsigned int inSampleRate,
unsigned int inBitsPerSample,
unsigned int inChannel,
@ -358,7 +358,7 @@ class OpusLibEncoder : public AudioEncoder, public virtual Reporter
* @exception Exception
*/
inline
OpusLibEncoder ( CastSink * sink,
OpusLibEncoder ( Sink * sink,
const AudioSource * as,
BitrateMode outBitrateMode,
unsigned int outBitrate,

View File

@ -318,9 +318,9 @@ TcpSocket :: read ( void * buf,
break;
default:
::close( sockfd);
sockfd = 0;
throw Exception( __FILE__, __LINE__, "recv error", errno);
::close( sockfd);
sockfd = 0;
throw Exception( __FILE__, __LINE__, "recv error", errno);
}
}
@ -329,7 +329,7 @@ TcpSocket :: read ( void * buf,
/*------------------------------------------------------------------------------
* Check wether read() would return anything
* Check wether write() would send anything
*----------------------------------------------------------------------------*/
bool
TcpSocket :: canWrite ( unsigned int sec,
@ -357,11 +357,11 @@ TcpSocket :: canWrite ( unsigned int sec,
ret = pselect( sockfd + 1, NULL, &fdset, NULL, &timespec, &sigset);
if ( ret == -1 ) {
::close( sockfd);
sockfd = 0;
throw Exception( __FILE__, __LINE__, "select error");
::close( sockfd);
sockfd = 0;
reportEvent(4,"TcpSocket :: canWrite, connection lost", errno);
}
return ret > 0;
}
@ -389,8 +389,9 @@ TcpSocket :: write ( const void * buf,
if ( errno == EAGAIN ) {
ret = 0;
} else {
::close( sockfd);
sockfd = 0;
::close( sockfd);
sockfd = 0;
reportEvent(4,"TcpSocket :: write, send error", errno);
throw Exception( __FILE__, __LINE__, "send error", errno);
}
}

View File

@ -300,13 +300,13 @@ VorbisLibEncoder :: write ( const void * buf,
if ( bitsPerSample == 8 ) {
char * buf8 = (char *) buf;
unsigned int ix = sampleSize * i;
unsigned int iix = ix;
unsigned int iix = ix;
buf8[i] = (buf8[ix] + buf8[++iix]) / 2;
}
if ( bitsPerSample == 16 ) {
short * buf16 = (short *) buf;
unsigned int ix = (bitsPerSample >> 3) * i;
unsigned int iix = ix;
unsigned int iix = ix;
buf16[i] = (buf16[ix] + buf16[++iix]) / 2;
}
}
@ -370,6 +370,7 @@ VorbisLibEncoder :: write ( const void * buf,
}
delete[] shortBuffer;
vorbisBlocksOut();
return processed;
@ -411,7 +412,7 @@ VorbisLibEncoder :: vorbisBlocksOut ( void ) throw ( Exception )
ogg_stream_packetin( &oggStreamState, &oggPacket);
while ( ogg_stream_pageout( &oggStreamState, &oggPage) ) {
int written;
int written = 0;
written = getSink()->write(oggPage.header, oggPage.header_len);
written += getSink()->write( oggPage.body, oggPage.body_len);

View File

@ -51,7 +51,7 @@
#include "Exception.h"
#include "Reporter.h"
#include "AudioEncoder.h"
#include "CastSink.h"
#include "Sink.h"
#ifdef HAVE_SRC_LIB
#include <samplerate.h>
#else
@ -199,7 +199,7 @@ class VorbisLibEncoder : public AudioEncoder, public virtual Reporter
* @exception Exception
*/
inline
VorbisLibEncoder ( CastSink * sink,
VorbisLibEncoder ( Sink * sink,
unsigned int inSampleRate,
unsigned int inBitsPerSample,
unsigned int inChannel,
@ -244,7 +244,7 @@ class VorbisLibEncoder : public AudioEncoder, public virtual Reporter
* @exception Exception
*/
inline
VorbisLibEncoder ( CastSink * sink,
VorbisLibEncoder ( Sink * sink,
const AudioSource * as,
BitrateMode outBitrateMode,
unsigned int outBitrate,

View File

@ -114,7 +114,7 @@ aacPlusEncoder :: open ( void )
#else
converter->initialize( resampleRatio, getInChannel());
//needed 2x(converted input samples) to handle offsets
int outCount = 2 * getInChannel() * (inputSamples + 1);
int outCount = 2 * getInChannel() * (inputSamples + 1);
if (resampleRatio > 1)
outCount = (int) (outCount * resampleRatio);
resampledOffset = new short int[outCount];
@ -152,7 +152,6 @@ aacPlusEncoder :: write ( const void * buf,
int processedSamples = 0;
if ( converter ) {
unsigned int converted;
#ifdef HAVE_SRC_LIB
@ -178,25 +177,29 @@ aacPlusEncoder :: write ( const void * buf,
// encode samples (if enough)
while(resampledOffsetSize - processedSamples >= inputSamples/channels) {
int outputBytes;
#ifdef HAVE_SRC_LIB
short *shortData = new short[inputSamples];
src_float_to_short_array(resampledOffset + (processedSamples * channels),
shortData, inputSamples) ;
outputBytes = aacplusEncEncode(encoderHandle,
int outputBytes = aacplusEncEncode(encoderHandle,
(int32_t*) shortData,
inputSamples,
aacplusBuf,
maxOutputBytes);
delete [] shortData;
#else
outputBytes = aacplusEncEncode(encoderHandle,
int outputBytes = aacplusEncEncode(encoderHandle,
(int32_t*) &resampledOffset[processedSamples*channels],
inputSamples,
aacplusBuf,
maxOutputBytes);
#endif
getSink()->write(aacplusBuf, outputBytes);
unsigned int wrote = getSink()->write(aacplusBuf, outputBytes);
if (wrote < outputBytes) {
reportEvent(3, "aacPlusEncoder :: write, couldn't write full data to underlying sink");
}
processedSamples+=inputSamples/channels;
}
@ -214,18 +217,22 @@ aacPlusEncoder :: write ( const void * buf,
}
} else {
while (processedSamples < samples) {
int outputBytes;
int inSamples = samples - processedSamples < (int) inputSamples
? samples - processedSamples
: inputSamples;
outputBytes = aacplusEncEncode(encoderHandle,
int outputBytes = aacplusEncEncode(encoderHandle,
(int32_t*) (b + processedSamples/sampleSize),
inSamples,
aacplusBuf,
maxOutputBytes);
getSink()->write(aacplusBuf, outputBytes);
unsigned int wrote = getSink()->write(aacplusBuf, outputBytes);
if (wrote < outputBytes) {
reportEvent(3, "aacPlusEncoder :: write, couldn't write full data to underlying sink");
}
processedSamples += inSamples;
}
}