@@ 1,4 1,5 @@
from json import dumps
+import os
from sqlalchemy import (
create_engine,
@@ 13,6 14,7 @@ from dbcache import (
from tshistory import __version__
from tshistory.tsio import timeseries as tshclass
+from tshistory import util
VERSIONS = {}
@@ 164,13 166,53 @@ def migrate_series_versions(engine, name
migrate_add_diffstart_diffend(engine, f'{namespace}.group', interactive)
+def populatedata(pid, cn, tsh, namespace, name, tablename):
+ diffsb = []
+ delete = []
+ for csid, idate, diff in util.diffs(cn, tsh, name, tablename, None, None):
+ if len(diff):
+ diffsb.append(
+ {
+ 'csid': csid,
+ 'diffstart': diff.index[0],
+ 'diffend': diff.index[-1]
+ }
+ )
+ else:
+ delete.append(
+ {
+ 'csid': csid,
+ 'idate': idate
+ }
+ )
+
+ if diffsb:
+ sql = (
+ f'update "{namespace}.revision"."{tablename}" '
+ f'set diffstart=%(diffstart)s, '
+ f' diffend=%(diffend)s '
+ f'where id=%(csid)s'
+ )
+ cn.execute(
+ sql, diffsb
+ )
+
+ if delete:
+ print(f'{pid}: revs to delete:', ','.join(x['idate'].isoformat()
+ for x in delete))
+ sql = (
+ f'delete from "{namespace}.revision"."{tablename}" '
+ f'where id = %(csid)s'
+ )
+ cn.execute(sql, [{'csid': x['csid']} for x in delete])
+
+
def migrate_add_diffstart_diffend(engine, namespace, interactive, onlydata=False, cpus=1):
- import os
import signal
import sys
import multiprocessing
from sqlalchemy import create_engine
- from tshistory import util
+
if onlydata:
print(f'data migration for columns `diffstart` and `diffend` to {namespace}.revision')
else:
@@ 223,56 265,12 @@ def migrate_add_diffstart_diffend(engine
f'alter column tsend drop not null'
)
- def partition(alist, size):
- for i in range(0, len(alist), size):
- yield alist[i:i+size]
-
def listchunks(alist, n):
import numpy as np
return list(
np.array_split(np.array(alist), n)
)
- def populatedata(pid, cn, name, tablename):
- diffsb = []
- delete = []
- for csid, idate, diff in util.diffs(cn, tsh, name, tablename, None, None):
- if len(diff):
- diffsb.append(
- {
- 'csid': csid,
- 'diffstart': diff.index[0],
- 'diffend': diff.index[-1]
- }
- )
- else:
- delete.append(
- {
- 'csid': csid,
- 'idate': idate
- }
- )
-
- if diffsb:
- sql = (
- f'update "{namespace}.revision"."{tablename}" '
- f'set diffstart=%(diffstart)s, '
- f' diffend=%(diffend)s '
- f'where id=%(csid)s'
- )
- cn.execute(
- sql, diffsb
- )
-
- if delete:
- print(f'{pid}: revs to delete:', ','.join(x['idate'].isoformat()
- for x in delete))
- sql = (
- f'delete from "{namespace}.revision"."{tablename}" '
- f'where id = %(csid)s'
- )
- cn.execute(sql, [{'csid': x['csid']} for x in delete])
-
# main
tsh = tshclass(namespace)
with engine.begin() as cn:
@@ 309,7 307,7 @@ def migrate_add_diffstart_diffend(engine
if not onlydata:
addattributes(cn, tablename)
if migdata or onlydata:
- populatedata(pid, cn, name, tablename)
+ populatedata(pid, cn, tsh, namespace, name, tablename)
if not onlydata:
finalizeattributes(cn, tablename)
@@ 342,6 340,18 @@ def migrate_add_diffstart_diffend(engine
)
+def migrate_seriesdata_diffstart_diffend(engine, namespace, name):
+ tsh = tshclass(namespace)
+ pid = os.getpid()
+
+ with engine.begin() as cn:
+ cn.cache = {'series_tablename': {}}
+ tablename = tsh._series_to_tablename(cn, name)
+ if tablename:
+ print(f'{pid}: migrating `{name}` (table: {tablename})')
+ populatedata(pid, cn, tsh, namespace, name, tablename)
+
+
def migrate_metadata(engine, namespace, interactive):
ns = namespace