util/diffs: factor a common implementation for diffs

It is now used by tsio.diffs and the 0.20 migration.
5 files changed, 138 insertions(+), 99 deletions(-)

M test/test_tsio.py
M tshistory/migrate.py
M tshistory/storage.py
M tshistory/tsio.py
M tshistory/util.py
M test/test_tsio.py +36 -0
@@ 954,6 954,42 @@ insertion_date             value_date
 2024-01-02 00:00:00+00:00  2024-01-02    1.0
 """, diffs)
 
+    diffs = tsh.diffs(
+        engine,
+        'test-diffs',
+        from_insertion_date=pd.Timestamp('2023-12-31', tz='utc'),
+        to_insertion_date=pd.Timestamp('2024-1-1', tz='utc')
+    )
+    assert_hist("""
+insertion_date             value_date
+2024-01-01 00:00:00+00:00  2024-01-01    0.0
+""", diffs)
+
+    # now, write 2 points in the past (overwrite the first two)
+    ts = pd.Series(
+        [-1, -1],
+        index=pd.date_range(
+            pd.Timestamp(f'2024-1-1'),
+            freq='D',
+            periods=2
+        )
+    )
+    tsh.update(
+        engine,
+        ts,
+        'test-diffs',
+        'Babar',
+        insertion_date=pd.Timestamp(f'2024-1-4', tz='utc')
+    )
+    assert_hist("""
+insertion_date             value_date
+2024-01-01 00:00:00+00:00  2024-01-01    0.0
+2024-01-02 00:00:00+00:00  2024-01-02    1.0
+2024-01-03 00:00:00+00:00  2024-01-03    2.0
+2024-01-04 00:00:00+00:00  2024-01-01   -1.0
+                           2024-01-02   -1.0
+""", tsh.diffs(engine, 'test-diffs'))
+
 
 def test_infer_freq(engine, tsh):
     ts = pd.Series(

          
M tshistory/migrate.py +1 -41
@@ 236,47 236,9 @@ def migrate_add_diffstart_diffend(engine
         )
 
     def populatedata(pid, cn, name, tablename):
-        sto = tsh.storageclass(cn, tsh, name)
-        tzaware = tsh.tzaware(cn, name)
-        ts = util.empty_series(tzaware)
-
-        # revs
-        revsql = (
-            f'select id, snapshot, insertion_date '
-            f'from "{namespace}.revision"."{tablename}" '
-            f'order by id asc'
-        )
-        allrevs = [
-            (csid, snapshot, idate)
-            for csid, snapshot, idate in cn.execute(revsql)
-        ]
-        chunksql = (
-            f'select id, parent, chunk '
-            f'from "{namespace}.snapshot"."{tablename}" '
-        )
-
-        def buildseries(chunks, parent):
-            items = []
-            while parent in chunks:
-                item = chunks[parent]
-                parent = item[0]  # deref parent
-                items.append(item[1]) # bytes
-            items.reverse()
-            return sto._chunks_to_ts(items)
-
-        chunks = {
-            c.id: (c.parent, c.chunk)
-            for c in cn.execute(
-                    chunksql
-            ).fetchall()
-        }
-        # rebuild the versions
         diffsb = []
         delete = []
-
-        for csid, snapid, idate in allrevs:
-            current = buildseries(chunks, snapid)
-            diff = util.diff(ts, current)
+        for csid, idate, diff in util.diffs(cn, tsh, name, tablename, None, None):
             if len(diff):
                 diffsb.append(
                     {

          
@@ 293,8 255,6 @@ def migrate_add_diffstart_diffend(engine
                     }
                 )
 
-            ts = current
-
         if diffsb:
             sql = (
                 f'update "{namespace}.revision"."{tablename}" '

          
M tshistory/storage.py +0 -29
@@ 305,35 305,6 @@ class Postgres:
                   for cid, parent, rawchunk in res.fetchall()}
         return chunks
 
-    def bychunksrange(self, startid, lastid):
-        sql = f"""
-        with recursive allchunks as (
-          select chunks.id as cid,
-                 chunks.parent as parent,
-                 chunks.chunk as chunk
-          from "{self.tsh.namespace}.snapshot"."{self.tablename}" as chunks
-          where chunks.id = %(head)s
-        union
-          select chunks.id as cid,
-                 chunks.parent as parent,
-                 chunks.chunk as chunk
-          from "{self.tsh.namespace}.snapshot"."{self.tablename}" as chunks
-          join allchunks on chunks.id = allchunks.parent
-          where chunks.id > %(startid)s
-        )
-        select chunk from allchunks
-        order by cid
-        """
-        chunks = [
-            c.chunk
-            for c in self.cn.execute(
-                    sql,
-                    startid=startid,
-                    head=lastid
-            ).fetchall()
-        ]
-        return self._chunks_to_ts(chunks)
-
     def garbage(self):
         """ inefficient but simple garbage list builder
         garbage chunks are created on strip operations

          
M tshistory/tsio.py +9 -28
@@ 15,6 15,7 @@ from tshistory.util import (
     closed_overlaps,
     compatible_date,
     diff,
+    diffs,
     empty_series,
     ensuretz,
     guard_insert,

          
@@ 394,36 395,16 @@ class timeseries:
     def diffs(self, cn, name,
               from_insertion_date=None,
               to_insertion_date=None):
-        sto = self.storageclass(cn, self, name)
         tablename = self._series_to_tablename(cn, name)
-
-        lastid = self.changeset_at(cn, name, from_insertion_date) if from_insertion_date else -1
+        out = {}
+        iterable = diffs(
+            cn, self, name, tablename,
+            from_insertion_date, to_insertion_date
+        )
+        for csid, idate, diff in iterable:
+            out[idate] = diff
 
-        revsql = select(
-            'id', 'snapshot', 'insertion_date'
-        ).table(f'"{self.namespace}.revision"."{tablename}"'
-        ).order('id', direction='asc')
-        if from_insertion_date:
-            revsql.where(
-                'insertion_date >= %(fromdate)s',
-                fromdate=from_insertion_date
-            )
-        if to_insertion_date:
-            revsql.where(
-                'insertion_date <= %(todate)s',
-                todate=to_insertion_date
-            )
-
-        diffs = {}
-        ts = empty_series(self.tzaware(cn, name))
-        res = revsql.do(cn)
-        for csid, snapid, idate in res.fetchall():
-            current = sto.bychunksrange(lastid, snapid)
-            diffs[idate] = diff(ts, current)
-            ts = current
-            lastid = snapid
-
-        return diffs
+        return out
 
     @tx
     def staircase(self, cn, name, delta,

          
M tshistory/util.py +92 -1
@@ 11,7 11,10 @@ import threading
 import tempfile
 import shutil
 import zlib
-from datetime import datetime
+from datetime import (
+    datetime,
+    timedelta
+)
 from importlib_metadata import entry_points
 from functools import reduce
 from contextlib import contextmanager

          
@@ 24,6 27,7 @@ import pandas as pd
 from sqlalchemy.engine import url
 from sqlalchemy.engine.base import Engine
 from sqlalchemy import exc
+from sqlhelp import select
 from inireader import reader
 from dbcache.api import kvstore
 

          
@@ 1029,6 1033,93 @@ def diff(base, other, _precision=1e-14):
     return pd.concat([diff_overlap, diff_new]).sort_index()
 
 
+# diffs: common implementation shared by tsio.diffs and the 0.20 migration
+
+def diffs(cn, tsh, name, tablename, from_idate, to_idate):
+    sto = tsh.storageclass(cn, tsh, name)
+    tzaware = tsh.tzaware(cn, name)
+    if from_idate:
+        ts = tsh.get(
+            cn,
+            name,
+            revision_date=from_idate - timedelta(milliseconds=1)
+        )
+    else:
+        ts = empty_series(tzaware)
+
+    # collect revisions (id, snapshot head, insertion date) in ascending id order
+    revsql = select(
+        'id', 'snapshot', 'insertion_date'
+    ).table(f'"{tsh.namespace}.revision"."{tablename}"'
+    ).order('id', direction='asc')
+    if from_idate:
+        revsql.where(
+            'insertion_date >= %(fromdate)s',
+            fromdate=from_idate
+        )
+    if to_idate:
+        revsql.where(
+            'insertion_date <= %(to_idate)s',
+            to_idate=to_idate
+        )
+
+    allrevs = [
+        (csid, snapshot, idate)
+        for csid, snapshot, idate in revsql.do(cn).fetchall()
+    ]
+
+    chunksql = (
+        f'select id, parent, chunk '
+        f'from "{tsh.namespace}.snapshot"."{tablename}" '
+    )
+
+    # very greedy, let's try to limit this using
+    # to_idate later ...
+    chunks = {
+        c.id: (c.parent, c.chunk)
+        for c in cn.execute(
+                chunksql
+        ).fetchall()
+    }
+
+    _cache = {}
+
+    def patched(top, prev, items):
+        if not items:
+            return prev
+        items.reverse()
+        # non-appends will cause costly recomputations
+        # we might want to find a middle ground
+        _cache.clear()
+        out = _cache[top] = patch(
+            prev,
+            sto._chunks_to_ts(items)
+        )
+        return out
+
+    def buildseries(parent):
+        top = parent
+        items = []
+
+        while parent in chunks:
+            grandpa, chunk = chunks[parent]
+            items.append(chunk)  # bytes
+            if grandpa in _cache:
+                # found a known parent: let's patch and remember
+                cached = _cache.pop(grandpa)
+                return patched(top, cached, items)
+            parent = grandpa
+
+        # initial revision or full new series
+        return patched(top, empty_series(tzaware), items)
+
+    for csid, snapid, idate in allrevs:
+        current = buildseries(snapid)
+        tsdiff = diff(ts, current)
+        yield csid, idate, tsdiff
+        ts = current
+
+
 # //ism helper
 
 def threadpool(maxthreads):