# HG changeset patch
# User Aurelien Campeas
# Date 1719848481 -7200
#      Mon Jul 01 17:41:21 2024 +0200
# Node ID 65495cc4f3f4e99f5c4607881c99feceeaf12f1b
# Parent  fa4e4742572a89e1a1c754346ca2ddd738c75d7e
util/diffs: factor a common implementation for diffs

It is now used by tsio.diffs and the 0.20 migration.

diff --git a/test/test_tsio.py b/test/test_tsio.py
--- a/test/test_tsio.py
+++ b/test/test_tsio.py
@@ -954,6 +954,42 @@
 2024-01-02 00:00:00+00:00  2024-01-02    1.0
 """, diffs)
 
+    diffs = tsh.diffs(
+        engine,
+        'test-diffs',
+        from_insertion_date=pd.Timestamp('2023-12-31', tz='utc'),
+        to_insertion_date=pd.Timestamp('2024-1-1', tz='utc')
+    )
+    assert_hist("""
+insertion_date             value_date
+2024-01-01 00:00:00+00:00  2024-01-01    0.0
+""", diffs)
+
+    # now, write 2 points in the past (overwrite the first two)
+    ts = pd.Series(
+        [-1, -1],
+        index=pd.date_range(
+            pd.Timestamp('2024-1-1'),
+            freq='D',
+            periods=2
+        )
+    )
+    tsh.update(
+        engine,
+        ts,
+        'test-diffs',
+        'Babar',
+        insertion_date=pd.Timestamp('2024-1-4', tz='utc')
+    )
+    assert_hist("""
+insertion_date             value_date
+2024-01-01 00:00:00+00:00  2024-01-01    0.0
+2024-01-02 00:00:00+00:00  2024-01-02    1.0
+2024-01-03 00:00:00+00:00  2024-01-03    2.0
+2024-01-04 00:00:00+00:00  2024-01-01   -1.0
+                           2024-01-02   -1.0
+""", tsh.diffs(engine, 'test-diffs'))
+
 
 def test_infer_freq(engine, tsh):
     ts = pd.Series(
diff --git a/tshistory/migrate.py b/tshistory/migrate.py
--- a/tshistory/migrate.py
+++ b/tshistory/migrate.py
@@ -236,47 +236,9 @@
     )
     def populatedata(pid, cn, name, tablename):
-        sto = tsh.storageclass(cn, tsh, name)
-        tzaware = tsh.tzaware(cn, name)
-        ts = util.empty_series(tzaware)
-
-        # revs
-        revsql = (
-            f'select id, snapshot, insertion_date '
-            f'from "{namespace}.revision"."{tablename}" '
-            f'order by id asc'
-        )
-        allrevs = [
-            (csid, snapshot, idate)
-            for csid, snapshot, idate in cn.execute(revsql)
-        ]
-        chunksql = (
-            f'select id, parent, chunk '
-            f'from "{namespace}.snapshot"."{tablename}" '
-        )
-
-        def buildseries(chunks, parent):
-            items = []
-            while parent in chunks:
-                item = chunks[parent]
-                parent = item[0]  # deref parent
-                items.append(item[1])  # bytes
-            items.reverse()
-            return sto._chunks_to_ts(items)
-
-        chunks = {
-            c.id: (c.parent, c.chunk)
-            for c in cn.execute(
-                chunksql
-            ).fetchall()
-        }
-
         # rebuild the versions
         diffsb = []
         delete = []
-
-        for csid, snapid, idate in allrevs:
-            current = buildseries(chunks, snapid)
-            diff = util.diff(ts, current)
+        for csid, idate, diff in util.diffs(cn, tsh, name, tablename, None, None):
             if len(diff):
                 diffsb.append(
                     {
@@ -293,8 +255,6 @@
                     }
                 )
 
-            ts = current
-
         if diffsb:
             sql = (
                 f'update "{namespace}.revision"."{tablename}" '
diff --git a/tshistory/storage.py b/tshistory/storage.py
--- a/tshistory/storage.py
+++ b/tshistory/storage.py
@@ -305,35 +305,6 @@
                   for cid, parent, rawchunk in res.fetchall()}
         return chunks
 
-    def bychunksrange(self, startid, lastid):
-        sql = f"""
-        with recursive allchunks as (
-            select chunks.id as cid,
-                   chunks.parent as parent,
-                   chunks.chunk as chunk
-            from "{self.tsh.namespace}.snapshot"."{self.tablename}" as chunks
-            where chunks.id = %(head)s
-          union
-            select chunks.id as cid,
-                   chunks.parent as parent,
-                   chunks.chunk as chunk
-            from "{self.tsh.namespace}.snapshot"."{self.tablename}" as chunks
-            join allchunks on chunks.id = allchunks.parent
-            where chunks.id > %(startid)s
-        )
-        select chunk from allchunks
-        order by cid
-        """
-        chunks = [
-            c.chunk
-            for c in self.cn.execute(
-                sql,
-                startid=startid,
-                head=lastid
-            ).fetchall()
-        ]
-        return self._chunks_to_ts(chunks)
-
     def garbage(self):
         """ inefficient but simple garbage list builder
         garbage chunks are created on strip operations
diff --git a/tshistory/tsio.py b/tshistory/tsio.py
--- a/tshistory/tsio.py
+++ b/tshistory/tsio.py
@@ -15,6 +15,7 @@
     closed_overlaps,
     compatible_date,
     diff,
+    diffs,
     empty_series,
     ensuretz,
     guard_insert,
@@ -394,36 +395,16 @@
     def diffs(self, cn, name,
               from_insertion_date=None,
               to_insertion_date=None):
-        sto = self.storageclass(cn, self, name)
         tablename = self._series_to_tablename(cn, name)
-
-        lastid = self.changeset_at(cn, name, from_insertion_date) if from_insertion_date else -1
+        out = {}
+        iterable = diffs(
+            cn, self, name, tablename,
+            from_insertion_date, to_insertion_date
+        )
+        for csid, idate, diff in iterable:
+            out[idate] = diff
 
-        revsql = select(
-            'id', 'snapshot', 'insertion_date'
-        ).table(f'"{self.namespace}.revision"."{tablename}"'
-        ).order('id', direction='asc')
-        if from_insertion_date:
-            revsql.where(
-                'insertion_date >= %(fromdate)s',
-                fromdate=from_insertion_date
-            )
-        if to_insertion_date:
-            revsql.where(
-                'insertion_date <= %(todate)s',
-                todate=to_insertion_date
-            )
-
-        diffs = {}
-        ts = empty_series(self.tzaware(cn, name))
-        res = revsql.do(cn)
-        for csid, snapid, idate in res.fetchall():
-            current = sto.bychunksrange(lastid, snapid)
-            diffs[idate] = diff(ts, current)
-            ts = current
-            lastid = snapid
-
-        return diffs
+        return out
 
     @tx
     def staircase(self, cn, name, delta,
diff --git a/tshistory/util.py b/tshistory/util.py
--- a/tshistory/util.py
+++ b/tshistory/util.py
@@ -11,7 +11,10 @@
 import tempfile
 import shutil
 import zlib
-from datetime import datetime
+from datetime import (
+    datetime,
+    timedelta
+)
 from importlib_metadata import entry_points
 from functools import reduce
 from contextlib import contextmanager
@@ -24,6 +27,7 @@
 from sqlalchemy.engine import url
 from sqlalchemy.engine.base import Engine
 from sqlalchemy import exc
+from sqlhelp import select
 from inireader import reader
 from dbcache.api import kvstore
 
@@ -1029,6 +1033,93 @@
     return pd.concat([diff_overlap, diff_new]).sort_index()
 
 
+# diffs reconstruction, shared by tsio.diffs and the 0.20 migration
+
+def diffs(cn, tsh, name, tablename, from_idate, to_idate):
+    sto = tsh.storageclass(cn, tsh, name)
+    tzaware = tsh.tzaware(cn, name)
+    if from_idate:
+        ts = tsh.get(
+            cn,
+            name,
+            revision_date=from_idate - timedelta(milliseconds=1)
+        )
+    else:
+        ts = empty_series(tzaware)
+
+    # revs
+    revsql = select(
+        'id', 'snapshot', 'insertion_date'
+    ).table(f'"{tsh.namespace}.revision"."{tablename}"'
+    ).order('id', direction='asc')
+    if from_idate:
+        revsql.where(
+            'insertion_date >= %(fromdate)s',
+            fromdate=from_idate
+        )
+    if to_idate:
+        revsql.where(
+            'insertion_date <= %(to_idate)s',
+            to_idate=to_idate
+        )
+
+    allrevs = [
+        (csid, snapshot, idate)
+        for csid, snapshot, idate in revsql.do(cn).fetchall()
+    ]
+
+    chunksql = (
+        f'select id, parent, chunk '
+        f'from "{tsh.namespace}.snapshot"."{tablename}" '
+    )
+
+    # very greedy, let's try to limit this using
+    # to_idate later ...
+    chunks = {
+        c.id: (c.parent, c.chunk)
+        for c in cn.execute(
+            chunksql
+        ).fetchall()
+    }
+
+    _cache = {}
+
+    def patched(top, prev, items):
+        if not items:
+            return prev
+        items.reverse()
+        # non-appends will cause costly recomputations
+        # we might want to find a middle ground
+        _cache.clear()
+        out = _cache[top] = patch(
+            prev,
+            sto._chunks_to_ts(items)
+        )
+        return out
+
+    def buildseries(parent):
+        top = parent
+        items = []
+
+        while parent in chunks:
+            grandpa, chunk = chunks[parent]
+            items.append(chunk)  # bytes
+            if grandpa in _cache:
+                # found a known parent: let's patch and remember
+                cached = _cache.pop(grandpa)
+                return patched(top, cached, items)
+            parent = grandpa
+
+        # initial revision or full new series
+        return patched(top, empty_series(tzaware), items)
+
+    for csid, snapid, idate in allrevs:
+        current = buildseries(snapid)
+        tsdiff = diff(ts, current)
+        yield csid, idate, tsdiff
+        ts = current
+
+
 # //ism helper
 
 def threadpool(maxthreads):
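
A minimal usage sketch of the refactored machinery, mirroring the new test
above: tsio.diffs now delegates to the shared util.diffs generator, consuming
its (changeset id, insertion date, diff) triples and repackaging them as an
{insertion_date: diff} mapping. The sketch assumes a PostgreSQL database with
an initialized tshistory schema; the dsn and the 'demo-diffs' series name are
placeholders, and the timeseries protocol object is driven directly, as the
test fixture does.

    import pandas as pd
    from sqlalchemy import create_engine
    from tshistory.tsio import timeseries

    # placeholder dsn: any database holding an initialized tshistory schema
    engine = create_engine('postgresql://localhost:5432/tsdemo')
    tsh = timeseries()

    # three insertions, one new daily point each time
    for day in (1, 2, 3):
        ts = pd.Series(
            [float(day - 1)],
            index=[pd.Timestamp(f'2024-1-{day}')]
        )
        tsh.update(
            engine, ts, 'demo-diffs', 'Babar',
            insertion_date=pd.Timestamp(f'2024-1-{day}', tz='utc')
        )

    # one diff per revision, keyed by insertion date
    for idate, tsdiff in tsh.diffs(engine, 'demo-diffs').items():
        print(idate, tsdiff.to_dict())

    # insertion date bounds narrow the revision range, as in the test
    bounded = tsh.diffs(
        engine, 'demo-diffs',
        from_insertion_date=pd.Timestamp('2024-1-2', tz='utc'),
        to_insertion_date=pd.Timestamp('2024-1-3', tz='utc')
    )
    print(sorted(bounded))  # the two bounded insertion dates

As in the test above, a later insertion that rewrites already known points
shows up as a diff carrying only the overwritten values, not the whole series.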