071d5320c747 — Chris Cannam 7 months ago
Formalise, fix, and test the read-while-writing
M bqaudiostream/AudioReadStream.h +27 -0
@@ 150,6 150,31 @@ public:
      * empty string otherwise.
      */
     virtual std::string getArtistName() const = 0;
+
+    /**
+     * Return true if this reader has explicit support for synchronous
+     * incremental reading of its file (i.e. reading while the audio
+     * file is still being written without returning early on
+     * EOF). Few readers do, and it may depend on the file as well as
+     * the reader.
+     *
+     * If so, then setIncrementalTimeouts can be used to tell it that
+     * when the end of file is reached, if the file has apparently not
+     * been finalised, it should wait a certain amount of time for
+     * more data rather than giving up directly.
+     */
+    virtual bool hasIncrementalSupport() const;
+
+    /**
+     * Set timeouts for incremental reading. If hasIncrementalSupport
+     * returns true and retryTimeoutMs is greater than zero, then if
+     * EOF is reached during a read and the file has not yet
+     * detectably been finalised by its writer, the reader will wait
+     * retryTimeoutMs milliseconds and try again. The totalTimeoutMs
+     * value, which will usually be larger, places a limit on the
+     * total duration of retries. Both are zero by default.
+     */
+    void setIncrementalTimeouts(int retryTimeoutMs, int totalTimeoutMs);
     
 protected:
     AudioReadStream();

          
@@ 159,6 184,8 @@ protected:
     size_t m_sampleRate;
     size_t m_estimatedFrameCount;
     bool m_seekable;
+    int m_retryTimeoutMs;
+    int m_totalTimeoutMs;
 
 private:
     int getResampledChunk(int count, float *frames);

          
M src/AudioReadStream.cpp +15 -0
@@ 48,6 48,8 @@ AudioReadStream::AudioReadStream() :
     m_sampleRate(0),
     m_estimatedFrameCount(0),
     m_seekable(false),
+    m_retryTimeoutMs(0),
+    m_totalTimeoutMs(0),
     m_retrievalRate(0),
     m_totalFileFrames(0),
     m_totalRetrievedFrames(0),

          
@@ 114,6 116,19 @@ AudioReadStream::seek(size_t frame)
     return performSeek(frame);
 }
 
+bool
+AudioReadStream::hasIncrementalSupport() const
+{
+    return false;
+}
+
+void
+AudioReadStream::setIncrementalTimeouts(int retryTimeoutMs, int totalTimeoutMs)
+{
+    m_retryTimeoutMs = retryTimeoutMs;
+    m_totalTimeoutMs = totalTimeoutMs;
+}
+
 size_t
 AudioReadStream::getInterleavedFrames(size_t count, float *frames)
 {

          
M src/AudioReadStreamFactory.cpp +9 -6
@@ 134,15 134,18 @@ AudioReadStreamFactory::getFileFilter()
 // builders are guaranteed to be registered in lexical order. So we
 // should put the desirable readers first and the iffy ones after.
 
+// SimpleWavFileReadStream reads most WAV files. It's much more
+// limited than the libsndfile-based WavFileReadStream, but it doesn't
+// lack any features we actually use, and it includes optional
+// incremental reading (read-during-write) which the other doesn't, so
+// we have it first. One of these two must also come before the other
+// general platform frameworks because we don't currently have seek
+// support in those
+#include "SimpleWavFileReadStream.cpp"
+
 // WavFileReadStream uses libsndfile, which is mostly trustworthy
 #include "WavFileReadStream.cpp"
 
-// SimpleWavFileReadStream reads most WAV files. The dedicated
-// WavFileReadStream using libsndfile is better and goes first, but
-// this must come before the other general platform libraries because
-// we don't currently have seek support in those
-#include "SimpleWavFileReadStream.cpp"
-
 // OggVorbisReadStream uses the official libraries, which ought to be good
 #include "OggVorbisReadStream.cpp"
 

          
M src/SimpleWavFileReadStream.cpp +27 -13
@@ 39,7 39,7 @@ 
 #include <chrono>
 #include <thread>
 
-#define DEBUG_SIMPLE_WAV_FILE_READ_STREAM 1
+//#define DEBUG_SIMPLE_WAV_FILE_READ_STREAM 1
 
 namespace breakfastquay
 {

          
@@ 155,7 155,6 @@ SimpleWavFileReadStream::readHeader()
     m_channelCount = channels;
     m_sampleRate = sampleRate;
     m_bitDepth = bitsPerSample;
-    m_seekable = true;
 
     // we don't use
     (void)byteRate;

          
@@ 173,6 172,16 @@ SimpleWavFileReadStream::readHeader()
     } else {
         m_estimatedFrameCount = 0;
     }
+
+    // Mark as seekable only if we have a known duration. This is
+    // largely to honour the guarantee in getEstimatedFrameCount that
+    // the returned value will be a true value if the stream is
+    // seekable. But it's also a bit iffy with files that are still
+    // being written, to have seek success depend on whether the
+    // target frame has been written yet. (Nonetheless we do support
+    // that case in the actual seek implementation)
+    m_seekable = (m_estimatedFrameCount > 0);
+
     m_dataReadOffset = 0;
     m_dataReadStart = m_file->tellg();
 }

          
@@ 291,7 300,7 @@ SimpleWavFileReadStream::performSeek(siz
         return false;
     }
     
-    std::ifstream::pos_type actual = m_file->tellg();
+    std::streampos actual = m_file->tellg();
     // (In fact I think tellg() always reports whatever you passed to seekg())
     if (actual != std::ifstream::pos_type(target)) {
 #ifdef DEBUG_SIMPLE_WAV_FILE_READ_STREAM

          
@@ 328,12 337,18 @@ SimpleWavFileReadStream::shouldRetry(int
 #ifdef DEBUG_SIMPLE_WAV_FILE_READ_STREAM
     std::cerr << "SimpleWavFileReadStream::shouldRetry: m_dataReadOffset = "
               << m_dataReadOffset << ", m_dataChunkSize = " << m_dataChunkSize
-              << ", justReadBytes = " << justReadBytes << std::endl;
+              << ", justReadBytes = " << justReadBytes
+              << ", m_retryTimeoutMs = " << m_retryTimeoutMs
+              << ", m_totalTimeoutMs = " << m_totalTimeoutMs
+              << std::endl;
 #endif
 
     if (m_dataChunkSize > 0) {
         return false;
     }
+    if (m_retryTimeoutMs == 0 || m_totalTimeoutMs == 0) {
+        return false;
+    }
     if (m_file->bad()) {
 #ifdef DEBUG_SIMPLE_WAV_FILE_READ_STREAM
         std::cerr << "SimpleWavFileReadStream::shouldRetry: file is bad"

          
@@ 342,11 357,7 @@ SimpleWavFileReadStream::shouldRetry(int
         return false;
     }
 
-    int retryTimeMs = 50;
-    int totalTimeoutMs = 1000;
-    int permittedRetryCount = totalTimeoutMs / retryTimeMs;
-    
-    uint32_t location = m_file->tellg();
+    int permittedRetryCount = m_totalTimeoutMs / m_retryTimeoutMs;
 
     if (m_file->eof()) {
         if (m_retryCount > permittedRetryCount) {

          
@@ 355,20 366,23 @@ SimpleWavFileReadStream::shouldRetry(int
 #endif
             return false;
         }
-        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        std::this_thread::sleep_for(std::chrono::milliseconds(m_retryTimeoutMs));
         m_file->clear();
+        std::streampos location = m_file->tellg();
         m_file->seekg(m_dataChunkOffset, std::ios::beg);
         m_dataChunkSize = readExpectedChunkSize("data");
-        m_file->seekg(location - justReadBytes, std::ios::beg);
+        std::streamoff target = location - std::streamoff(justReadBytes);
+        m_file->seekg(target, std::ios::beg);
         if (m_file->fail()) {
 #ifdef DEBUG_SIMPLE_WAV_FILE_READ_STREAM
             std::cerr << "SimpleWavFileReadStream::shouldRetry: seek to "
-                      << location - justReadBytes << " failed" << std::endl;
+                      << target << " failed" << std::endl;
 #endif
             return false;
         }
 #ifdef DEBUG_SIMPLE_WAV_FILE_READ_STREAM
-        std::cerr << "SimpleWavFileReadStream::shouldRetry: returning true" << std::endl;
+        std::cerr << "SimpleWavFileReadStream::shouldRetry: re-seek to "
+                  << target << " succeeded, returning true" << std::endl;
 #endif
         ++m_retryCount;
         return true;

          
M src/SimpleWavFileReadStream.h +2 -0
@@ 60,6 60,8 @@ public:
 
     virtual std::string getError() const { return m_error; }
 
+    virtual bool hasIncrementalSupport() const { return true; }
+    
 protected:
     virtual size_t getFrames(size_t count, float *frames);
     virtual bool performSeek(size_t frame);

          
M src/WavFileWriteStream.cpp +2 -0
@@ 104,6 104,8 @@ WavFileWriteStream::putInterleavedFrames
     if (written != sf_count_t(count)) {
         throw FileOperationFailed(getPath(), "write sf data");
     }
+
+    sf_write_sync(m_file);
 }
 
 }

          
M test/TestWavReadWhileWriting.h +61 -28
@@ 7,8 7,13 @@ 
 #include <QObject>
 #include <QtTest>
 
-#include "bqaudiostream/../src/SimpleWavFileReadStream.h"
-#include "bqaudiostream/../src/SimpleWavFileWriteStream.h"
+#include "bqaudiostream/AudioReadStreamFactory.h"
+#include "bqaudiostream/AudioReadStream.h"
+#include "bqaudiostream/AudioWriteStreamFactory.h"
+#include "bqaudiostream/AudioWriteStream.h"
+
+#include <thread>
+#include <chrono>
 
 namespace breakfastquay {
 

          
@@ 47,42 52,24 @@ class TestWavReadWhileWriting : public Q
     }
     
 private slots:
-    void readWhileWriting() {
+    void readWhileWritingNoWait() {
 
         int bs = 1024;
         int channels = 2;
+        int rate = 44100;
         std::vector<float> buffer(bs * channels, 0.f);
+        std::string file = "test-audiostream-readwhilewriting.wav";
         
-        AudioWriteStream::Target target("test-audiostream-readwhilewriting.wav",
-                                        channels, 44100);
-
-        SimpleWavFileWriteStream *ws = nullptr;
-        SimpleWavFileReadStream *rs = nullptr;
-
-        try {
-            ws = new SimpleWavFileWriteStream(target);
-        } catch (const std::exception &e) {
-            std::cerr << "readWhileWriting: exception caught when creating write stream: " << e.what() << std::endl;
-            throw;
-        }
+        auto ws = AudioWriteStreamFactory::createWriteStream(file, channels, rate);
         QVERIFY(ws->getError() == std::string());
 
-        try {
-            rs = new SimpleWavFileReadStream(target.getPath());
-        } catch (const std::exception &e) {
-            std::cerr << "readWhileWriting: exception caught when creating read stream: " << e.what() << std::endl;
-            delete ws;
-            throw;
-        }
+        auto rs = AudioReadStreamFactory::createReadStream(file);
         QVERIFY(rs->getError() == std::string());
 
         QCOMPARE(rs->getChannelCount(), size_t(channels));
         QCOMPARE(rs->getSampleRate(), size_t(44100));
-        QVERIFY(rs->isSeekable());
-
-        //!!! Contradicts the documentation, which says "For seekable
-        //!!! streams this is guaranteed to return a true frame count"
-        //!!! - probably this stream should not be seekable?
+        QVERIFY(rs->hasIncrementalSupport());
+        QVERIFY(!rs->isSeekable());
         QCOMPARE(rs->getEstimatedFrameCount(), size_t(0));
 
         QCOMPARE(rs->getInterleavedFrames(bs, buffer.data()), size_t(0));

          
@@ 95,7 82,53 @@ private slots:
 
         QCOMPARE(rs->getInterleavedFrames(bs, buffer.data()), size_t(bs));
 
-        checkBuf(buffer, 0, bs * channels);
+        QVERIFY(checkBuf(buffer, 0, bs * channels));
+        
+        delete rs;
+        delete ws;
+    }
+
+    void readWhileWritingWithWait() {
+
+        int bs = 1024;
+        int channels = 2;
+        int rate = 44100;
+        std::vector<float> readbuf(bs * channels, 0.f);
+        std::vector<float> writebuf(bs * channels, 0.f);
+        std::string file = "test-audiostream-readwhilewriting.wav";
+        
+        auto ws = AudioWriteStreamFactory::createWriteStream(file, channels, rate);
+        QVERIFY(ws->getError() == std::string());
+
+        auto rs = AudioReadStreamFactory::createReadStream(file);
+        QVERIFY(rs->getError() == std::string());
+
+        QCOMPARE(rs->getChannelCount(), size_t(channels));
+        QCOMPARE(rs->getSampleRate(), size_t(44100));
+        QVERIFY(rs->hasIncrementalSupport());
+
+        rs->setIncrementalTimeouts(20, 200);
+
+        initBuf(writebuf, 0, bs * channels);
+        
+        auto writer = [&]() {
+            std::this_thread::sleep_for(std::chrono::milliseconds(150));
+            ws->putInterleavedFrames(bs/2, writebuf.data());
+            std::this_thread::sleep_for(std::chrono::milliseconds(150));
+            ws->putInterleavedFrames(bs/2, writebuf.data() + (bs/2) * channels);
+        };
+        
+        QCOMPARE(rs->getInterleavedFrames(bs, readbuf.data()), size_t(0));
+
+        rs->setIncrementalTimeouts(20, 1000);
+        
+        std::thread writeThread(writer);
+        
+        QCOMPARE(rs->getInterleavedFrames(bs, readbuf.data()), size_t(bs));
+
+        QVERIFY(checkBuf(readbuf, 0, bs * channels));
+
+        writeThread.join();
         
         delete rs;
         delete ws;