From 710d2c26c4f9518d2f382673bf8b1829b9bdac3d Mon Sep 17 00:00:00 2001 From: "rafael@riseup.net" Date: Wed, 26 Feb 2014 19:48:19 +0000 Subject: [PATCH] Server connection related fixes. --- darkice/trunk/ChangeLog | 4 ++ darkice/trunk/src/AlsaDspSource.cpp | 2 +- darkice/trunk/src/BufferedSink.cpp | 85 ++++++++++++++++++++++---- darkice/trunk/src/BufferedSink.h | 30 +++++++-- darkice/trunk/src/DarkIce.cpp | 56 ++++++++--------- darkice/trunk/src/OpusLibEncoder.h | 6 +- darkice/trunk/src/TcpSocket.cpp | 21 ++++--- darkice/trunk/src/VorbisLibEncoder.cpp | 7 ++- darkice/trunk/src/VorbisLibEncoder.h | 6 +- darkice/trunk/src/aacPlusEncoder.cpp | 27 +++++--- 10 files changed, 169 insertions(+), 75 deletions(-) diff --git a/darkice/trunk/ChangeLog b/darkice/trunk/ChangeLog index 7b2bbc1..b7b7d40 100644 --- a/darkice/trunk/ChangeLog +++ b/darkice/trunk/ChangeLog @@ -1,3 +1,7 @@ +trunk: + o Bugs related to streaming to remote servers fixed. Patch by Kalle Kulonen + and Mark Turner . + 15-07-2013 Darkice 1.2 released o Issue #75: Added Ogg/Opus support. Patch by Doug Kelly dougk.ff7@gmail.com diff --git a/darkice/trunk/src/AlsaDspSource.cpp b/darkice/trunk/src/AlsaDspSource.cpp index bb5407f..30bef73 100644 --- a/darkice/trunk/src/AlsaDspSource.cpp +++ b/darkice/trunk/src/AlsaDspSource.cpp @@ -255,7 +255,7 @@ AlsaDspSource :: read ( void * buf, // Check for buffer overrun if (ret == -EPIPE) { - reportEvent(1, "Buffer overrun!"); + reportEvent(1, "AlsaDspSource :: Buffer overrun!"); snd_pcm_prepare(captureHandle); ret = -EAGAIN; } diff --git a/darkice/trunk/src/BufferedSink.cpp b/darkice/trunk/src/BufferedSink.cpp index 1794189..460baed 100644 --- a/darkice/trunk/src/BufferedSink.cpp +++ b/darkice/trunk/src/BufferedSink.cpp @@ -99,6 +99,8 @@ BufferedSink :: init ( Sink * sink, this->bufferEnd = buffer + bufferSize; this->inp = buffer; this->outp = buffer; + this->bOpen = true; + this->openAttempts = 0; } @@ -112,6 +114,8 @@ BufferedSink :: BufferedSink ( const BufferedSink & buffer ) this->peak = buffer.peak; this->misalignment = buffer.misalignment; + this->bOpen = buffer.bOpen; + this->openAttempts = buffer.openAttempts; memcpy( this->buffer, buffer.buffer, this->bufferSize); } @@ -145,6 +149,8 @@ BufferedSink :: operator= ( const BufferedSink & buffer ) this->peak = buffer.peak; this->misalignment = buffer.misalignment; + this->bOpen = buffer.bOpen; + this->openAttempts = buffer.openAttempts; memcpy( this->buffer, buffer.buffer, this->bufferSize); } @@ -177,6 +183,16 @@ BufferedSink :: store ( const void * buffer, return 0; } + unsigned int remaining = this->bufferSize - ( outp <= inp ? inp - outp : + (bufferEnd - outp) + (inp - this->buffer) ); + + // react only to the first overrun whenever there is a series of overruns + if ( remaining + chunkSize <= bufferSize && remaining > chunkSize ) { + reportEvent(3,"BufferedSink :: store, buffer overrun"); + throw Exception( __FILE__, __LINE__, + "buffer overrun"); + } + oldInp = inp; buf = (const unsigned char *) buffer; @@ -257,8 +273,8 @@ unsigned int BufferedSink :: write ( const void * buf, unsigned int len ) throw ( Exception ) { - unsigned int length; - unsigned int soFar; + unsigned int length = 0; + unsigned int soFar = 0; unsigned char * b = (unsigned char *) buf; if ( !buf ) { @@ -272,6 +288,28 @@ BufferedSink :: write ( const void * buf, if ( !align() ) { return 0; } + + if ( !sink->isOpen() && openAttempts < 10 ) { + // try to reopen underlying sink, because it has closed on its own + openAttempts++; + try { + if( sink->open() ) { + // if reopening succeeds, reset open attempts + openAttempts = 0; + } + } catch ( Exception &e ) { + reportEvent( 4,"BufferedSink :: write,", + "couldn't reopen underlying sink, attempt", + openAttempts, "/ 10" ); + } + + if( openAttempts == 10 ) { + // all the attempts have been used, give up + close(); + throw Exception( __FILE__, __LINE__, + "reopen failed"); + } + } // make it a multiple of chunkSize len -= len % chunkSize; @@ -286,12 +324,24 @@ BufferedSink :: write ( const void * buf, // try to write the outp -> bufferEnd // the rest will be written in the next if - size = bufferEnd - outp - 1; + size = bufferEnd - outp; size -= size % chunkSize; + if( size > len * 2 ) { + // do not try to send the content of the entire buffer at once, + // but limit sending to a multiple of len + // this prevents a surge of data to underlying buffer + // which is important especially during a lot of packet loss + size = len * 2; + } soFar = 0; while ( outp > inp && soFar < size && sink->canWrite( 0, 0) ) { - length = sink->write( outp + soFar, size - soFar); + try { + length = sink->write( outp + soFar, size - soFar); + } catch (Exception &e) { + length = 0; + reportEvent(3,"Exception caught in BufferedSink :: write1"); + } outp = slidePointer( outp, length); soFar += length; } @@ -305,10 +355,19 @@ BufferedSink :: write ( const void * buf, // this part will write the rest size = inp - outp; + if( size > len * 2 ) { + // prevent a surge of data to underlying buffer + size = len * 2; + } soFar = 0; while ( soFar < size && sink->canWrite( 0, 0) ) { - length = sink->write( outp + soFar, size - soFar); + try { + length = sink->write( outp + soFar, size - soFar); + } catch (Exception &e) { + length = 0; + reportEvent(3,"Exception caught in BufferedSink :: write2" ); + } outp = slidePointer( outp, length); soFar += length; } @@ -332,13 +391,12 @@ BufferedSink :: write ( const void * buf, soFar = 0; if ( inp == outp ) { while ( soFar < len && sink->canWrite( 0, 0) ) { - try { - soFar += sink->write( b + soFar, len - soFar); - } catch (Exception &e) { - reportEvent(3,"Exception caught in BufferedSink :: write3\n"); - throw; /* up a level */ - } - } + try { + soFar += sink->write( b + soFar, len - soFar); + } catch (Exception &e) { + reportEvent(3,"Exception caught in BufferedSink :: write3"); + } + } } length = soFar; @@ -351,6 +409,8 @@ BufferedSink :: write ( const void * buf, store( b + length, len - length); } + updatePeak(); + // tell them we ate everything up to chunkSize alignment return len; } @@ -369,5 +429,6 @@ BufferedSink :: close ( void ) throw ( Exception ) flush(); sink->close(); inp = outp = buffer; + bOpen = false; } diff --git a/darkice/trunk/src/BufferedSink.h b/darkice/trunk/src/BufferedSink.h index 53ff4a0..2445c38 100644 --- a/darkice/trunk/src/BufferedSink.h +++ b/darkice/trunk/src/BufferedSink.h @@ -109,6 +109,17 @@ class BufferedSink : public Sink, public virtual Reporter * The underlying Sink. */ Ref sink; + + /** + * Is BufferedSink open. + */ + bool bOpen; + + /** + * Number of attempts so far to open underlying sink after it has + * closed on its own. + */ + unsigned int openAttempts; /** * Initialize the object. @@ -163,10 +174,17 @@ class BufferedSink : public Sink, public virtual Reporter unsigned int u; u = outp <= inp ? inp - outp : (bufferEnd - outp) + (inp - buffer); - if ( peak < u ) { + + // report new peaks if it is either significantly more severe than + // the previously reported peak + if ( peak * 2 < u ) { peak = u; - reportEvent( 4, "BufferedSink, new peak:", peak); - reportEvent( 4, "BufferedSink, remaining:", bufferSize - peak); + reportEvent( 4, "BufferedSink, new peak:", peak, " / ", bufferSize); + } + + if ( peak > 0 && u == 0 ) { + peak = 0; + reportEvent( 4, "BufferedSink, healed:", peak, " / ", bufferSize); } } @@ -306,7 +324,9 @@ class BufferedSink : public Sink, public virtual Reporter inline virtual bool open ( void ) throw ( Exception ) { - return sink->open(); + bOpen = sink->open(); + openAttempts = 0; + return bOpen; } /** @@ -317,7 +337,7 @@ class BufferedSink : public Sink, public virtual Reporter inline virtual bool isOpen ( void ) const throw () { - return sink->isOpen(); + return bOpen; } /** diff --git a/darkice/trunk/src/DarkIce.cpp b/darkice/trunk/src/DarkIce.cpp index 9b09a98..04cb145 100644 --- a/darkice/trunk/src/DarkIce.cpp +++ b/darkice/trunk/src/DarkIce.cpp @@ -262,7 +262,7 @@ DarkIce :: configIceCast ( const Config & config, FileSink * localDumpFile = 0; bool fileAddDate = false; const char * fileDateFormat = 0; - AudioEncoder * encoder = 0; + BufferedSink * audioOut = 0; int bufferSize = 0; str = cs->get( "sampleRate"); @@ -384,10 +384,14 @@ DarkIce :: configIceCast ( const Config & config, "unsupported stream format: ", str); } + + // augment audio outs with a buffer when used from encoder + audioOut = new BufferedSink( audioOuts[u].server.get(), + bufferSize, 1); #ifdef HAVE_LAME_LIB if ( Util::strEq( str, "mp3") ) { - encoder = new LameLibEncoder( audioOuts[u].server.get(), + audioOuts[u].encoder = new LameLibEncoder( audioOut, dsp.get(), bitrateMode, bitrate, @@ -400,8 +404,8 @@ DarkIce :: configIceCast ( const Config & config, #endif #ifdef HAVE_TWOLAME_LIB if ( Util::strEq( str, "mp2") ) { - encoder = new TwoLameLibEncoder( - audioOuts[u].server.get(), + audioOuts[u].encoder = new TwoLameLibEncoder( + audioOut, dsp.get(), bitrateMode, bitrate, @@ -410,7 +414,6 @@ DarkIce :: configIceCast ( const Config & config, } #endif - audioOuts[u].encoder = new BufferedSink(encoder, bufferSize, dsp->getSampleSize()); encConnector->attach( audioOuts[u].encoder.get()); #endif // HAVE_LAME_LIB || HAVE_TWOLAME_LIB } @@ -467,7 +470,7 @@ DarkIce :: configIceCast2 ( const Config & config, FileSink * localDumpFile = 0; bool fileAddDate = false; const char * fileDateFormat = 0; - AudioEncoder * encoder = 0; + BufferedSink * audioOut = 0; int bufferSize = 0; str = cs->getForSure( "format", " missing in section ", stream); @@ -597,6 +600,9 @@ DarkIce :: configIceCast2 ( const Config & config, isPublic, localDumpFile); + audioOut = new BufferedSink( audioOuts[u].server.get(), + bufferSize, 1); + switch ( format ) { case IceCast2::mp3: #ifndef HAVE_LAME_LIB @@ -605,8 +611,8 @@ DarkIce :: configIceCast2 ( const Config & config, "thus can't create mp3 stream: ", stream); #else - encoder = new LameLibEncoder( - audioOuts[u].server.get(), + audioOuts[u].encoder = new LameLibEncoder( + audioOut, dsp.get(), bitrateMode, bitrate, @@ -616,8 +622,6 @@ DarkIce :: configIceCast2 ( const Config & config, lowpass, highpass ); - audioOuts[u].encoder = new BufferedSink(encoder, bufferSize, dsp->getSampleSize()); - #endif // HAVE_LAME_LIB break; @@ -630,8 +634,8 @@ DarkIce :: configIceCast2 ( const Config & config, stream); #else - encoder = new VorbisLibEncoder( - audioOuts[u].server.get(), + audioOuts[u].encoder = new VorbisLibEncoder( + audioOut, dsp.get(), bitrateMode, bitrate, @@ -640,7 +644,6 @@ DarkIce :: configIceCast2 ( const Config & config, dsp->getChannel(), maxBitrate); - audioOuts[u].encoder = new BufferedSink(encoder, bufferSize, dsp->getSampleSize()); #endif // HAVE_VORBIS_LIB break; @@ -652,8 +655,8 @@ DarkIce :: configIceCast2 ( const Config & config, stream); #else - encoder = new OpusLibEncoder( - audioOuts[u].server.get(), + audioOuts[u].encoder = new OpusLibEncoder( + audioOut, dsp.get(), bitrateMode, bitrate, @@ -662,7 +665,6 @@ DarkIce :: configIceCast2 ( const Config & config, dsp->getChannel(), maxBitrate); - audioOuts[u].encoder = new BufferedSink(encoder, bufferSize, dsp->getBitsPerSample() / 8); #endif // HAVE_OPUS_LIB break; @@ -673,15 +675,14 @@ DarkIce :: configIceCast2 ( const Config & config, "thus can't create mp2 stream: ", stream); #else - encoder = new TwoLameLibEncoder( - audioOuts[u].server.get(), + audioOuts[u].encoder = new TwoLameLibEncoder( + audioOut, dsp.get(), bitrateMode, bitrate, sampleRate, channel ); - audioOuts[u].encoder = new BufferedSink(encoder, bufferSize, dsp->getSampleSize()); #endif // HAVE_TWOLAME_LIB break; @@ -693,8 +694,8 @@ DarkIce :: configIceCast2 ( const Config & config, "thus can't aac stream: ", stream); #else - encoder = new FaacEncoder( - audioOuts[u].server.get(), + audioOuts[u].encoder = new FaacEncoder( + audioOut, dsp.get(), bitrateMode, bitrate, @@ -702,7 +703,6 @@ DarkIce :: configIceCast2 ( const Config & config, sampleRate, dsp->getChannel()); - audioOuts[u].encoder = new BufferedSink(encoder, bufferSize, dsp->getSampleSize()); #endif // HAVE_FAAC_LIB break; @@ -713,8 +713,8 @@ DarkIce :: configIceCast2 ( const Config & config, "thus can't aacp stream: ", stream); #else - encoder = new aacPlusEncoder( - audioOuts[u].server.get(), + audioOuts[u].encoder = new aacPlusEncoder( + audioOut, dsp.get(), bitrateMode, bitrate, @@ -722,7 +722,6 @@ DarkIce :: configIceCast2 ( const Config & config, sampleRate, channel ); - audioOuts[u].encoder = new BufferedSink(encoder, bufferSize, dsp->getSampleSize()); #endif // HAVE_AACPLUS_LIB break; @@ -793,7 +792,7 @@ DarkIce :: configShoutCast ( const Config & config, FileSink * localDumpFile = 0; bool fileAddDate = false; const char * fileDateFormat = 0; - AudioEncoder * encoder = 0; + BufferedSink * audioOut = 0; int bufferSize = 0; str = cs->get( "sampleRate"); @@ -908,7 +907,9 @@ DarkIce :: configShoutCast ( const Config & config, localDumpFile); - encoder = new LameLibEncoder( audioOuts[u].server.get(), + audioOut = new BufferedSink(audioOuts[u].socket.get(), bufferSize, 1); + audioOuts[u].encoder = new LameLibEncoder( + audioOut, dsp.get(), bitrateMode, bitrate, @@ -917,7 +918,6 @@ DarkIce :: configShoutCast ( const Config & config, channel, lowpass, highpass ); - audioOuts[u].encoder = new BufferedSink(encoder, bufferSize, dsp->getSampleSize()); encConnector->attach( audioOuts[u].encoder.get()); #endif // HAVE_LAME_LIB diff --git a/darkice/trunk/src/OpusLibEncoder.h b/darkice/trunk/src/OpusLibEncoder.h index b2a6904..276bce0 100644 --- a/darkice/trunk/src/OpusLibEncoder.h +++ b/darkice/trunk/src/OpusLibEncoder.h @@ -54,7 +54,7 @@ #include "Exception.h" #include "Reporter.h" #include "AudioEncoder.h" -#include "CastSink.h" +#include "Sink.h" #ifdef HAVE_SRC_LIB #include #else @@ -313,7 +313,7 @@ class OpusLibEncoder : public AudioEncoder, public virtual Reporter * @exception Exception */ inline - OpusLibEncoder ( CastSink * sink, + OpusLibEncoder ( Sink * sink, unsigned int inSampleRate, unsigned int inBitsPerSample, unsigned int inChannel, @@ -358,7 +358,7 @@ class OpusLibEncoder : public AudioEncoder, public virtual Reporter * @exception Exception */ inline - OpusLibEncoder ( CastSink * sink, + OpusLibEncoder ( Sink * sink, const AudioSource * as, BitrateMode outBitrateMode, unsigned int outBitrate, diff --git a/darkice/trunk/src/TcpSocket.cpp b/darkice/trunk/src/TcpSocket.cpp index f065f36..7c15176 100644 --- a/darkice/trunk/src/TcpSocket.cpp +++ b/darkice/trunk/src/TcpSocket.cpp @@ -318,9 +318,9 @@ TcpSocket :: read ( void * buf, break; default: - ::close( sockfd); - sockfd = 0; - throw Exception( __FILE__, __LINE__, "recv error", errno); + ::close( sockfd); + sockfd = 0; + throw Exception( __FILE__, __LINE__, "recv error", errno); } } @@ -329,7 +329,7 @@ TcpSocket :: read ( void * buf, /*------------------------------------------------------------------------------ - * Check wether read() would return anything + * Check wether write() would send anything *----------------------------------------------------------------------------*/ bool TcpSocket :: canWrite ( unsigned int sec, @@ -357,11 +357,11 @@ TcpSocket :: canWrite ( unsigned int sec, ret = pselect( sockfd + 1, NULL, &fdset, NULL, ×pec, &sigset); if ( ret == -1 ) { - ::close( sockfd); - sockfd = 0; - throw Exception( __FILE__, __LINE__, "select error"); + ::close( sockfd); + sockfd = 0; + reportEvent(4,"TcpSocket :: canWrite, connection lost", errno); } - + return ret > 0; } @@ -389,8 +389,9 @@ TcpSocket :: write ( const void * buf, if ( errno == EAGAIN ) { ret = 0; } else { - ::close( sockfd); - sockfd = 0; + ::close( sockfd); + sockfd = 0; + reportEvent(4,"TcpSocket :: write, send error", errno); throw Exception( __FILE__, __LINE__, "send error", errno); } } diff --git a/darkice/trunk/src/VorbisLibEncoder.cpp b/darkice/trunk/src/VorbisLibEncoder.cpp index e4a7456..3b6b851 100644 --- a/darkice/trunk/src/VorbisLibEncoder.cpp +++ b/darkice/trunk/src/VorbisLibEncoder.cpp @@ -300,13 +300,13 @@ VorbisLibEncoder :: write ( const void * buf, if ( bitsPerSample == 8 ) { char * buf8 = (char *) buf; unsigned int ix = sampleSize * i; - unsigned int iix = ix; + unsigned int iix = ix; buf8[i] = (buf8[ix] + buf8[++iix]) / 2; } if ( bitsPerSample == 16 ) { short * buf16 = (short *) buf; unsigned int ix = (bitsPerSample >> 3) * i; - unsigned int iix = ix; + unsigned int iix = ix; buf16[i] = (buf16[ix] + buf16[++iix]) / 2; } } @@ -370,6 +370,7 @@ VorbisLibEncoder :: write ( const void * buf, } delete[] shortBuffer; + vorbisBlocksOut(); return processed; @@ -411,7 +412,7 @@ VorbisLibEncoder :: vorbisBlocksOut ( void ) throw ( Exception ) ogg_stream_packetin( &oggStreamState, &oggPacket); while ( ogg_stream_pageout( &oggStreamState, &oggPage) ) { - int written; + int written = 0; written = getSink()->write(oggPage.header, oggPage.header_len); written += getSink()->write( oggPage.body, oggPage.body_len); diff --git a/darkice/trunk/src/VorbisLibEncoder.h b/darkice/trunk/src/VorbisLibEncoder.h index bf18ee8..05e90b1 100644 --- a/darkice/trunk/src/VorbisLibEncoder.h +++ b/darkice/trunk/src/VorbisLibEncoder.h @@ -51,7 +51,7 @@ #include "Exception.h" #include "Reporter.h" #include "AudioEncoder.h" -#include "CastSink.h" +#include "Sink.h" #ifdef HAVE_SRC_LIB #include #else @@ -199,7 +199,7 @@ class VorbisLibEncoder : public AudioEncoder, public virtual Reporter * @exception Exception */ inline - VorbisLibEncoder ( CastSink * sink, + VorbisLibEncoder ( Sink * sink, unsigned int inSampleRate, unsigned int inBitsPerSample, unsigned int inChannel, @@ -244,7 +244,7 @@ class VorbisLibEncoder : public AudioEncoder, public virtual Reporter * @exception Exception */ inline - VorbisLibEncoder ( CastSink * sink, + VorbisLibEncoder ( Sink * sink, const AudioSource * as, BitrateMode outBitrateMode, unsigned int outBitrate, diff --git a/darkice/trunk/src/aacPlusEncoder.cpp b/darkice/trunk/src/aacPlusEncoder.cpp index d4e7a5b..b3c36cd 100644 --- a/darkice/trunk/src/aacPlusEncoder.cpp +++ b/darkice/trunk/src/aacPlusEncoder.cpp @@ -114,7 +114,7 @@ aacPlusEncoder :: open ( void ) #else converter->initialize( resampleRatio, getInChannel()); //needed 2x(converted input samples) to handle offsets - int outCount = 2 * getInChannel() * (inputSamples + 1); + int outCount = 2 * getInChannel() * (inputSamples + 1); if (resampleRatio > 1) outCount = (int) (outCount * resampleRatio); resampledOffset = new short int[outCount]; @@ -152,7 +152,6 @@ aacPlusEncoder :: write ( const void * buf, int processedSamples = 0; - if ( converter ) { unsigned int converted; #ifdef HAVE_SRC_LIB @@ -178,25 +177,29 @@ aacPlusEncoder :: write ( const void * buf, // encode samples (if enough) while(resampledOffsetSize - processedSamples >= inputSamples/channels) { - int outputBytes; #ifdef HAVE_SRC_LIB short *shortData = new short[inputSamples]; src_float_to_short_array(resampledOffset + (processedSamples * channels), shortData, inputSamples) ; - outputBytes = aacplusEncEncode(encoderHandle, + int outputBytes = aacplusEncEncode(encoderHandle, (int32_t*) shortData, inputSamples, aacplusBuf, maxOutputBytes); delete [] shortData; #else - outputBytes = aacplusEncEncode(encoderHandle, + int outputBytes = aacplusEncEncode(encoderHandle, (int32_t*) &resampledOffset[processedSamples*channels], inputSamples, aacplusBuf, maxOutputBytes); #endif - getSink()->write(aacplusBuf, outputBytes); + unsigned int wrote = getSink()->write(aacplusBuf, outputBytes); + + if (wrote < outputBytes) { + reportEvent(3, "aacPlusEncoder :: write, couldn't write full data to underlying sink"); + } + processedSamples+=inputSamples/channels; } @@ -214,18 +217,22 @@ aacPlusEncoder :: write ( const void * buf, } } else { while (processedSamples < samples) { - int outputBytes; int inSamples = samples - processedSamples < (int) inputSamples ? samples - processedSamples : inputSamples; - outputBytes = aacplusEncEncode(encoderHandle, + int outputBytes = aacplusEncEncode(encoderHandle, (int32_t*) (b + processedSamples/sampleSize), inSamples, aacplusBuf, maxOutputBytes); - getSink()->write(aacplusBuf, outputBytes); - + + unsigned int wrote = getSink()->write(aacplusBuf, outputBytes); + + if (wrote < outputBytes) { + reportEvent(3, "aacPlusEncoder :: write, couldn't write full data to underlying sink"); + } + processedSamples += inSamples; } }