5331a22b7d3b — Chris Cannam 2 years ago
Merge from branch thunk-seek-test
4 files changed, 320 insertions(+), 54 deletions(-)

M blockstream-fn.sml
M duration-adapting-fn.sml
M resampling-fn.sml
M test.sml
M blockstream-fn.sml +155 -43
@@ 91,12 91,16 @@ functor BlockStreamFromStatefulFn (S: ST
 
     structure SampleMatrix = S.SampleMatrix
 
+    datatype action =
+             ACT_READ
+           | ACT_SEEK of SampleStreamTypes.seek_mode * RealTime.t
+                                 
     datatype block_stream =
-             BLOCK of RealTime.t * SampleMatrix.matrix * (unit -> block_stream)
-           | END of RealTime.t
+             BLOCK of RealTime.t * SampleMatrix.matrix * (action -> block_stream)
+           | END of RealTime.t * (action -> block_stream)
 
     datatype read_state =
-             READ of RealTime.t * SampleMatrix.matrix * (unit -> block_stream)
+             READ of RealTime.t * SampleMatrix.matrix * (action -> block_stream)
            | UNREAD of S.stream
            | EOS of RealTime.t
 

          
@@ 107,78 111,100 @@ functor BlockStreamFromStatefulFn (S: ST
         channels : int,
         blocksize : int,
         time : RealTime.t,
+        seekable : SampleStreamTypes.seekable,
         bstr : block_stream
     }
 
     type new_args = { blocksize : int } * S.new_args
                       
     fun thunk_for (upstream : S.stream, blocksize : int)
-        : (unit -> block_stream) =
+        : (action -> block_stream) =
         let val rs : read_state ref = ref (UNREAD upstream)
+                                          
+            fun read () =
+                case (! rs) of
+                    READ result => BLOCK result
+                  | UNREAD upstream =>
+                    (case S.read (upstream, blocksize) of
+                         NONE =>
+                         let val t = S.time upstream
+                         in
+                             rs := EOS t;
+                             END (t, thunk_for (upstream, blocksize))
+                         end
+                       | SOME matrix =>
+                         let val result =
+                                 (S.time upstream, matrix,
+                                  thunk_for (upstream, blocksize))
+                         in
+                             rs := READ result;
+                             BLOCK result
+                         end)
+                  | EOS t => END (t, thunk_for (upstream, blocksize))
+
+            fun seek (m, t) =
+                case S.seek (upstream, m, t) of
+                    false => END (t, thunk_for (upstream, blocksize))
+                  | true =>
+                    case thunk_for (upstream, blocksize) ACT_READ of
+                        BLOCK result => (rs := READ result;
+                                         BLOCK result) (* will be discarded *)
+                      | END (t, thunk) => END (t, thunk)
         in
-            fn () =>
-               case (!rs) of
-                   READ result => BLOCK result
-                 | UNREAD upstream =>
-                   (case S.read (upstream, blocksize) of
-                        NONE =>
-                        let val t = S.time upstream
-                        in
-                            rs := EOS t;
-                            END t
-                        end
-                      | SOME matrix =>
-                        let val result =
-                                (S.time upstream, matrix,
-                                 thunk_for (upstream, blocksize))
-                        in
-                            rs := READ result;
-                            BLOCK result
-                        end)
-                 | EOS t => END t
+            fn action => case action of
+                             ACT_READ => read ()
+                           | ACT_SEEK (m, t) => seek (m, t)
         end
 
     fun rate ({ rate, ... } : stream) = rate
     fun channels ({ channels, ... } : stream) = channels
     fun blocksize ({ blocksize, ... } : stream) = blocksize
     fun time ({ time, ... } : stream) = time
-
-    (* Seeking isn't possible - our read-thunk needs to be the only
-       thing that reads on the underlying mutable stream, and it can't
-       allow the stream to be relocated under it.
-
-       If seeking is essential, alternatives may include:
-         - Finding a way that does not use a stateful stream!
-         - Going via a random-access (which is seekable because it
-           caches the whole stream)
-         - Managing seeks at a higher level, discarding and rebuilding
-           the block stream entirely when the underlying stateful
-           stream needs to be relocated
-     *)
-    fun seekable s = SampleStreamTypes.NON_SEEKABLE
+    fun seekable ({ seekable, ... } : stream) = seekable
                                             
-    fun read (s as { rate, channels, blocksize, time, bstr } : stream) =
+    fun read (s as { rate, channels, blocksize, time, seekable, bstr }
+              : stream) =
         case bstr of
-            END t => NONE
+            END _ => NONE
           | BLOCK (time, matrix, thunk) =>
             SOME ({ rate = rate,
                     channels = channels,
                     blocksize = blocksize,
                     time = time,
-                    bstr = thunk ()
+                    seekable = seekable,
+                    bstr = thunk ACT_READ
                   },
                   matrix)
             
     fun foldl f = BlockStreamFolder.makeFoldl (read, f)
 
-    fun seek (s, m, t) = NONE
+    fun seek (s as { rate, channels, blocksize, time, seekable, bstr }
+              : stream, m, t) =
+        case seekable of
+            SampleStreamTypes.NON_SEEKABLE => NONE
+          | _ => 
+            let val thunk = case bstr of END (_, thunk) => thunk
+                                       | BLOCK (_, _, thunk) => thunk
+            in
+                case thunk (ACT_SEEK (m, t)) of
+                    END _ => NONE
+                  | BLOCK (time, matrix, thunk) => 
+                    SOME { rate = rate,
+                           channels = channels,
+                           blocksize = blocksize,
+                           time = time,
+                           seekable = seekable,
+                           bstr = thunk ACT_READ
+                         }
+            end
                                               
     fun wrap ({ blocksize : int }, upstream : S.stream) : stream = {
         rate = S.rate upstream,
         channels = S.channels upstream,
         blocksize = blocksize,
         time = S.time upstream,
-        bstr = thunk_for (upstream, blocksize) ()
+        seekable = S.seekable upstream,
+        bstr = thunk_for (upstream, blocksize) ACT_READ
     }
 
     fun new (args, theirArgs) : stream =

          
@@ 248,6 274,92 @@ end
 structure RealBlockStreamFromBlockReader = BlockStreamFromBlockReaderFn
                                                (RealMatrix)
 
+(** Accept a function that can read and optionally seek through block
+    data on demand, and adapt it into a block stream.
+ *)
+functor BlockStreamFromSeekableBlockReaderFn (M : ORDERED_MATRIX_WITH_ZERO) : sig
+            include BLOCK_STREAM
+            datatype action =
+                     ACT_READ
+                   | ACT_SEEK of SampleStreamTypes.seek_mode * RealTime.t
+            datatype reader =
+                     READER of (action -> reader) * M.matrix option
+        end = struct
+
+    structure SampleMatrix = M
+
+    datatype action =
+             ACT_READ
+           | ACT_SEEK of SampleStreamTypes.seek_mode * RealTime.t
+
+    datatype reader =
+             READER of (action -> reader) * M.matrix option
+
+    open SignalTypes
+                 
+    type stream = {
+        rate : rate,
+        channels : int,
+        blocksize : int,
+        time : RealTime.t,
+        seekable : SampleStreamTypes.seekable,
+        reader : reader
+    }
+
+    type new_args = stream
+
+    fun rate ({ rate, ... } : stream) = rate
+    fun channels ({ channels, ... } : stream) = channels
+    fun blocksize ({ blocksize, ... } : stream) = blocksize
+    fun time ({ time, ... } : stream) = time
+    fun seekable ({ seekable, ... } : stream) = seekable
+                      
+    fun read (s as { rate, channels, blocksize, time, seekable, reader }
+              : stream) =
+        case reader of
+            READER (f, NONE) => NONE
+          | READER (f, SOME m) => 
+            SOME ({ rate = rate,
+                    channels = channels,
+                    blocksize = blocksize,
+                    time = RealTime.+ (time,
+                                       RealTime.fromFrame
+                                           (FRAME (M.columns m), rate)),
+                    seekable = seekable,
+                    reader = f ACT_READ
+                  },
+                  m)
+
+    fun seek (s as { rate, channels, blocksize, time, seekable, reader }
+              : stream,
+              m, t)
+        : stream option =
+        case reader of
+            READER (f, _) =>
+            case f (ACT_SEEK (m, t)) of
+                READER (f, NONE) => NONE
+              | reader =>
+                SOME ({ rate = rate,
+                        channels = channels,
+                        blocksize = blocksize,
+                        time = t,
+                        seekable = seekable,
+                        reader = reader
+                     })
+
+    fun foldl f = BlockStreamFolder.makeFoldl (read, f)
+                                              
+    fun new stream = stream
+                                            
+end
+
+(** Accept a function that can read and optionally seek through
+    real-valued block data on demand, and adapt it into a block
+    stream.
+ *)
+structure RealBlockStreamFromSeekableBlockReader = BlockStreamFromSeekableBlockReaderFn
+                                                       (RealMatrix)
+
 (** Adapt a block stream into a stateless sample stream. 
  *)
 functor SampleStreamFromBlockStreamFn (S: BLOCK_STREAM) :

          
M duration-adapting-fn.sml +8 -3
@@ 38,7 38,8 @@ functor PairConcatenatingStatefulStreamF
         let fun performCutover n =
                 case (! status) of
                     IN_1 => (status := IN_2;
-                             cutover := SOME (FRAME (frameOf (! position) + n))
+                             cutover := SOME (FRAME (frameOf (! position) + n));
+                             SampleStreamLog.info (fn () => ["cutover = %1", SampleStreamLog.I (frameOf (! position) + n)])
                             )
                   | _ => raise Fail "Internal error: performCutover called when already cutover"
             val result = 

          
@@ 97,19 98,23 @@ functor PairConcatenatingStatefulStreamF
             case (! cutover) of
                 SOME (FRAME c) =>
                 if (fr >= c) then
-                    if S2.seek (s2, m, RealTime.fromFrame
-                                           (FRAME (fr - c), S2.rate s2))
+                    if S2.seek (s2, m, RealTime.fromFrame (FRAME (fr - c), rate))
                     then (status := IN_2;
+                          position := FRAME fr;
                           true)
                     else false
                 else
                     if S1.seek (s1, m, t)
                     then (status := IN_1;
+                          S2.seek (s2, SampleStreamTypes.SEEK_COMPLETE,
+                                   RealTime.zeroTime);
+                          position := FRAME fr;
                           true)
                     else false
               | NONE =>
                 if S1.seek (s1, m, t)
                 then (status := IN_1;
+                      position := FRAME fr;
                       true)
                 else let val c = findCutover (! position, s1)
                      in

          
M resampling-fn.sml +11 -8
@@ 29,15 29,14 @@ functor ResamplingStatefulStreamFn (S: R
 
     datatype simple_factor_status = UP of int | DOWN of int | NOT_SIMPLE
 
-    fun simpleFactorStatus ({ rate, upstream, ... } : stream)
+    fun simpleFactorStatus ({ ratio, ... } : stream)
         : simple_factor_status =
         let open SignalTypes
             val eps = 1.0e~8
-            val factor = rateOf rate / rateOf (S.rate upstream)
-            val inverse = 1.0 / factor
+            val inverse = 1.0 / ratio
         in
-            if Real.abs (factor - Real.realRound factor) < eps
-            then UP (round factor)
+            if Real.abs (ratio - Real.realRound ratio) < eps
+            then UP (round ratio)
             else if Real.abs (inverse - Real.realRound inverse) < eps
             then DOWN (round inverse)
             else NOT_SIMPLE

          
@@ 56,9 55,10 @@ functor ResamplingStatefulStreamFn (S: R
        bit early then read and discard some samples in order to
        populate the resampler filter state, so seeking is not
        completely without cost. *)
-    fun seekable (s as { upstream, ... } : stream) =
+    fun seekable (s as { upstream, ratio, ... } : stream) =
         case simpleFactorStatus s of
-            NOT_SIMPLE => SampleStreamTypes.NON_SEEKABLE
+            NOT_SIMPLE => (SampleStreamLog.debug (fn () => ["ResamplingStatefulStreamFn: not seekable with non-simple ratio of %1", SampleStreamLog.R ratio]);
+                           SampleStreamTypes.NON_SEEKABLE)
           | _ => S.seekable upstream
 
     fun readWithoutUsingSpare

          
@@ 178,7 178,10 @@ functor ResamplingStatefulStreamFn (S: R
             if target < 0
             then false
             else if not (Option.isSome resampler)
-            then S.seek (upstream, m, t)
+            then if S.seek (upstream, m, t)
+                 then (reset s;
+                       true)
+                 else false
             else case simpleFactorStatus s of
                      NOT_SIMPLE => false
                    | UP n =>

          
M test.sml +146 -0
@@ 96,6 96,151 @@ structure TestSyntheticStreams : TESTS =
                          
 end
 
+structure TestPadded : TESTS = struct
+
+    type test = string * (unit -> bool)
+
+    val name = "padded"
+
+    structure P = PaddedStatefulStreamFn
+                      (StatefulSampleStreamFromStatelessFn
+                           (StatelessSyntheticStream))
+
+    structure RM = RealMatrix
+    structure RT = RealTime
+
+    open TestSupport
+    open SignalTypes
+    open SampleStreamTestSupport
+
+    fun checkReadMono s expected =
+        let val t = P.time s
+            val v = RealVector.fromList expected
+            val n = RealVector.length v
+            val mopt = P.read (s, n)
+        in
+            checkSome mopt andalso
+            let val m = Option.valOf mopt in
+                intsEqual (RM.rows m, 1) andalso
+                realVectorsEqual (RM.row (m, 0), v) andalso
+                realTimesEqual (P.time s, RT.+ (t, RT.fromFrame
+                                                       (FRAME n, P.rate s)))
+            end
+        end
+
+    val mode = SampleStreamTypes.SEEK_COMPLETE
+            
+    fun tests () = [
+        ("no-padding",
+         fn () =>
+            let val p = P.new ({ padding = 0 },
+                               { rate = RATE 80.0,
+                                 sort = StatelessSyntheticStream.GENERATED
+                                            (fn i => SOME (real i))
+                               }
+                              )
+            in
+                equal (rateOf (P.rate p), 80.0) andalso
+                realTimesEqual (P.time p, RT.zeroTime) andalso
+                checkReadMono p [ 0.0, 1.0, 2.0, 3.0 ]
+            end
+        ),
+        ("no-padding-seek",
+         fn () =>
+            let val p = P.new ({ padding = 0 },
+                               { rate = RATE 80.0,
+                                 sort = StatelessSyntheticStream.GENERATED
+                                            (fn i => SOME (real i))
+                               }
+                              )
+            in
+                (case P.seekable p of
+                     SampleStreamTypes.SEEKABLE { startTime, endTime = NONE } =>
+                     true
+                   | _ => false) andalso
+                realTimesEqual (P.time p, RT.zeroTime) andalso
+                verify (P.seek (p, mode, RT.fromFrame (FRAME 5, P.rate p))) andalso
+                checkReadMono p [ 5.0, 6.0, 7.0, 8.0 ] andalso
+                verify (P.seek (p, mode, RT.fromFrame (FRAME 1, P.rate p))) andalso
+                checkReadMono p [ 1.0, 2.0, 3.0, 4.0 ]
+            end
+        ),
+        ("padding",
+         fn () =>
+            let val p = P.new ({ padding = 4 },
+                               { rate = RATE 80.0,
+                                 sort = StatelessSyntheticStream.GENERATED
+                                            (fn i => SOME (real i))
+                               }
+                              )
+            in
+                equal (rateOf (P.rate p), 80.0) andalso
+                realTimesEqual (P.time p, RT.zeroTime) andalso
+                checkReadMono p [ 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 2.0, 3.0 ]
+            end
+        ),
+        ("padding-seek", (* Seeking to beyond cutover before reading at all *)
+         fn () =>
+            let val p = P.new ({ padding = 4 },
+                               { rate = RATE 80.0,
+                                 sort = StatelessSyntheticStream.GENERATED
+                                            (fn i => SOME (real i))
+                               }
+                              )
+            in
+                (case P.seekable p of
+                     SampleStreamTypes.SEEKABLE { startTime, endTime = NONE } =>
+                     true
+                   | _ => false) andalso
+                realTimesEqual (P.time p, RT.zeroTime) andalso
+                verify (P.seek (p, mode, RT.fromFrame (FRAME 5, P.rate p))) andalso
+                checkReadMono p [ 1.0, 2.0, 3.0, 4.0 ] andalso
+                verify (P.seek (p, mode, RT.fromFrame (FRAME 1, P.rate p))) andalso
+                checkReadMono p [ 0.0, 0.0, 0.0, 0.0, 1.0, 2.0, 3.0 ] andalso
+                verify (P.seek (p, mode, RT.fromFrame (FRAME 4, P.rate p))) andalso
+                checkReadMono p [ 0.0, 1.0, 2.0, 3.0 ]
+            end
+        ),
+        ("padding-seek-2", (* Reading to precisely cutover before seeking at all *)
+         fn () =>
+            let val p = P.new ({ padding = 4 },
+                               { rate = RATE 80.0,
+                                 sort = StatelessSyntheticStream.GENERATED
+                                            (fn i => SOME (real i))
+                               }
+                              )
+            in
+                checkReadMono p [ 0.0, 0.0, 0.0, 0.0 ] andalso
+                verify (P.seek (p, mode, RT.fromFrame (FRAME 5, P.rate p))) andalso
+                checkReadMono p [ 1.0, 2.0, 3.0, 4.0 ] andalso
+                verify (P.seek (p, mode, RT.fromFrame (FRAME 1, P.rate p))) andalso
+                checkReadMono p [ 0.0, 0.0, 0.0, 0.0, 1.0, 2.0, 3.0 ] andalso
+                verify (P.seek (p, mode, RT.fromFrame (FRAME 4, P.rate p))) andalso
+                checkReadMono p [ 0.0, 1.0, 2.0, 3.0 ]
+            end
+        ),
+        ("padding-seek-3", (* Reading to beyond cutover before seeking at all *)
+         fn () =>
+            let val p = P.new ({ padding = 4 },
+                               { rate = RATE 80.0,
+                                 sort = StatelessSyntheticStream.GENERATED
+                                            (fn i => SOME (real i))
+                               }
+                              )
+            in
+                checkReadMono p [ 0.0, 0.0, 0.0, 0.0, 0.0 ] andalso
+                verify (P.seek (p, mode, RT.fromFrame (FRAME 5, P.rate p))) andalso
+                checkReadMono p [ 1.0, 2.0, 3.0, 4.0 ] andalso
+                verify (P.seek (p, mode, RT.fromFrame (FRAME 1, P.rate p))) andalso
+                checkReadMono p [ 0.0, 0.0, 0.0, 0.0, 1.0, 2.0, 3.0 ] andalso
+                verify (P.seek (p, mode, RT.fromFrame (FRAME 4, P.rate p))) andalso
+                checkReadMono p [ 0.0, 1.0, 2.0, 3.0 ]
+            end
+        )
+    ]
+
+end
+                                             
 structure TestFraming : TESTS = struct
 
     type test = string * (unit -> bool)

          
@@ 1266,6 1411,7 @@ end
                                         
 val samplestreams_tests = [
     (TestSyntheticStreams.name, TestSyntheticStreams.tests ()),
+    (TestPadded.name, TestPadded.tests ()),
     (TestFraming.name, TestFraming.tests ()),
     (TestFrequencyDomain.name, TestFrequencyDomain.tests ()),
     (TestSummarising.name, TestSummarising.tests ()),