M test/test_tsio.py +36 -0
@@ 954,6 954,42 @@ insertion_date value_date
2024-01-02 00:00:00+00:00 2024-01-02 1.0
""", diffs)
+ diffs = tsh.diffs(
+ engine,
+ 'test-diffs',
+ from_insertion_date=pd.Timestamp('2023-12-31', tz='utc'),
+ to_insertion_date=pd.Timestamp('2024-1-1', tz='utc')
+ )
+ assert_hist("""
+insertion_date value_date
+2024-01-01 00:00:00+00:00 2024-01-01 0.0
+""", diffs)
+
+ # now, write 2 points in the past (overwrite the first two)
+ ts = pd.Series(
+ [-1, -1],
+ index=pd.date_range(
+            pd.Timestamp('2024-1-1'),
+ freq='D',
+ periods=2
+ )
+ )
+ tsh.update(
+ engine,
+ ts,
+ 'test-diffs',
+ 'Babar',
+        insertion_date=pd.Timestamp('2024-1-4', tz='utc')
+ )
+ assert_hist("""
+insertion_date value_date
+2024-01-01 00:00:00+00:00 2024-01-01 0.0
+2024-01-02 00:00:00+00:00 2024-01-02 1.0
+2024-01-03 00:00:00+00:00 2024-01-03 2.0
+2024-01-04 00:00:00+00:00 2024-01-01 -1.0
+ 2024-01-02 -1.0
+""", tsh.diffs(engine, 'test-diffs'))
+
def test_infer_freq(engine, tsh):
ts = pd.Series(
M tshistory/migrate.py +1 -41
@@ 236,47 236,9 @@ def migrate_add_diffstart_diffend(engine
)
def populatedata(pid, cn, name, tablename):
- sto = tsh.storageclass(cn, tsh, name)
- tzaware = tsh.tzaware(cn, name)
- ts = util.empty_series(tzaware)
-
- # revs
- revsql = (
- f'select id, snapshot, insertion_date '
- f'from "{namespace}.revision"."{tablename}" '
- f'order by id asc'
- )
- allrevs = [
- (csid, snapshot, idate)
- for csid, snapshot, idate in cn.execute(revsql)
- ]
- chunksql = (
- f'select id, parent, chunk '
- f'from "{namespace}.snapshot"."{tablename}" '
- )
-
- def buildseries(chunks, parent):
- items = []
- while parent in chunks:
- item = chunks[parent]
- parent = item[0] # deref parent
- items.append(item[1]) # bytes
- items.reverse()
- return sto._chunks_to_ts(items)
-
- chunks = {
- c.id: (c.parent, c.chunk)
- for c in cn.execute(
- chunksql
- ).fetchall()
- }
- # rebuild the versions
diffsb = []
delete = []
-
- for csid, snapid, idate in allrevs:
- current = buildseries(chunks, snapid)
- diff = util.diff(ts, current)
+ for csid, idate, diff in util.diffs(cn, tsh, name, tablename, None, None):
if len(diff):
diffsb.append(
{
@@ 293,8 255,6 @@ def migrate_add_diffstart_diffend(engine
}
)
- ts = current
-
if diffsb:
sql = (
f'update "{namespace}.revision"."{tablename}" '
M tshistory/storage.py +0 -29
@@ 305,35 305,6 @@ class Postgres:
for cid, parent, rawchunk in res.fetchall()}
return chunks
- def bychunksrange(self, startid, lastid):
- sql = f"""
- with recursive allchunks as (
- select chunks.id as cid,
- chunks.parent as parent,
- chunks.chunk as chunk
- from "{self.tsh.namespace}.snapshot"."{self.tablename}" as chunks
- where chunks.id = %(head)s
- union
- select chunks.id as cid,
- chunks.parent as parent,
- chunks.chunk as chunk
- from "{self.tsh.namespace}.snapshot"."{self.tablename}" as chunks
- join allchunks on chunks.id = allchunks.parent
- where chunks.id > %(startid)s
- )
- select chunk from allchunks
- order by cid
- """
- chunks = [
- c.chunk
- for c in self.cn.execute(
- sql,
- startid=startid,
- head=lastid
- ).fetchall()
- ]
- return self._chunks_to_ts(chunks)
-
def garbage(self):
""" inefficient but simple garbage list builder
garbage chunks are created on strip operations
M tshistory/tsio.py +9 -28
@@ 15,6 15,7 @@ from tshistory.util import (
closed_overlaps,
compatible_date,
diff,
+ diffs,
empty_series,
ensuretz,
guard_insert,
@@ 394,36 395,16 @@ class timeseries:
def diffs(self, cn, name,
from_insertion_date=None,
to_insertion_date=None):
- sto = self.storageclass(cn, self, name)
tablename = self._series_to_tablename(cn, name)
-
- lastid = self.changeset_at(cn, name, from_insertion_date) if from_insertion_date else -1
+ out = {}
+ iterable = diffs(
+ cn, self, name, tablename,
+ from_insertion_date, to_insertion_date
+ )
+ for csid, idate, diff in iterable:
+ out[idate] = diff
- revsql = select(
- 'id', 'snapshot', 'insertion_date'
- ).table(f'"{self.namespace}.revision"."{tablename}"'
- ).order('id', direction='asc')
- if from_insertion_date:
- revsql.where(
- 'insertion_date >= %(fromdate)s',
- fromdate=from_insertion_date
- )
- if to_insertion_date:
- revsql.where(
- 'insertion_date <= %(todate)s',
- todate=to_insertion_date
- )
-
- diffs = {}
- ts = empty_series(self.tzaware(cn, name))
- res = revsql.do(cn)
- for csid, snapid, idate in res.fetchall():
- current = sto.bychunksrange(lastid, snapid)
- diffs[idate] = diff(ts, current)
- ts = current
- lastid = snapid
-
- return diffs
+ return out
@tx
def staircase(self, cn, name, delta,
M tshistory/util.py +92 -1
@@ 11,7 11,10 @@ import threading
import tempfile
import shutil
import zlib
-from datetime import datetime
+from datetime import (
+ datetime,
+ timedelta
+)
from importlib_metadata import entry_points
from functools import reduce
from contextlib import contextmanager
@@ 24,6 27,7 @@ import pandas as pd
from sqlalchemy.engine import url
from sqlalchemy.engine.base import Engine
from sqlalchemy import exc
+from sqlhelp import select
from inireader import reader
from dbcache.api import kvstore
@@ 1029,6 1033,93 @@ def diff(base, other, _precision=1e-14):
return pd.concat([diff_overlap, diff_new]).sort_index()
+# successive-diffs reconstruction helper (shared by tsio.diffs and the migration)
+
+def diffs(cn, tsh, name, tablename, from_idate, to_idate):
+ sto = tsh.storageclass(cn, tsh, name)
+ tzaware = tsh.tzaware(cn, name)
+ if from_idate:
+ ts = tsh.get(
+ cn,
+ name,
+ revision_date=from_idate - timedelta(milliseconds=1)
+ )
+ else:
+ ts = empty_series(tzaware)
+
+ # revs
+ revsql = select(
+ 'id', 'snapshot', 'insertion_date'
+ ).table(f'"{tsh.namespace}.revision"."{tablename}"'
+ ).order('id', direction='asc')
+ if from_idate:
+ revsql.where(
+ 'insertion_date >= %(fromdate)s',
+ fromdate=from_idate
+ )
+ if to_idate:
+ revsql.where(
+ 'insertion_date <= %(to_idate)s',
+ to_idate=to_idate
+ )
+
+ allrevs = [
+ (csid, snapshot, idate)
+ for csid, snapshot, idate in revsql.do(cn).fetchall()
+ ]
+
+ chunksql = (
+ f'select id, parent, chunk '
+ f'from "{tsh.namespace}.snapshot"."{tablename}" '
+ )
+
+ # very greedy, let's try to limit this using
+ # to_idate later ...
+ chunks = {
+ c.id: (c.parent, c.chunk)
+ for c in cn.execute(
+ chunksql
+ ).fetchall()
+ }
+
+ _cache = {}
+
+ def patched(top, prev, items):
+ if not items:
+ return prev
+ items.reverse()
+ # non-appends will cause costly recomputations
+ # we might want to find a middle ground
+ _cache.clear()
+ out = _cache[top] = patch(
+ prev,
+ sto._chunks_to_ts(items)
+ )
+ return out
+
+ def buildseries(parent):
+ top = parent
+ items = []
+
+ while parent in chunks:
+ grandpa, chunk = chunks[parent]
+ items.append(chunk) # bytes
+ if grandpa in _cache:
+ # found a known parent: let's patch and remember
+ cached = _cache.pop(grandpa)
+ return patched(top, cached, items)
+ parent = grandpa
+
+ # initial revision or full new series
+ return patched(top, empty_series(tzaware), items)
+
+ for csid, snapid, idate in allrevs:
+ current = buildseries(snapid)
+ tsdiff = diff(ts, current)
+ yield csid, idate, tsdiff
+ ts = current
+
+
# //ism helper
def threadpool(maxthreads):