M meteo_scraper/cli.py +17 -8
@@ 36,25 36,34 @@ def setup_tasks():
engine,
'scrap_ecmwf_hres',
domain='meteo',
- rule='0 3 0,6,8,10,12,18 * * *'
+ rule='0 3 0,6,8,10,12,18 * * *',
+ inputdata={'filename': 'ecmwf_hres.grib'}
)
api.prepare(
engine,
'scrap_ecmwf_ens',
domain='meteo',
- rule='0 13 0,6,8,10,12,18 * * *'
+ rule='0 13 0,6,8,10,12,18 * * *',
+ inputdata={'filename': 'ecmwf_ens.grib'}
)
+
@meteo.command(name='scrap-ecmwf-hres')
-def scrap_ecmwf_hres_fcst():
+@click.option(
+    "--filename", default='ecmwf_hres.grib'
+)
+def scrap_ecmwf_hres_fcst(filename):
tsa = timeseries(CONFIG['uri']['dburi'])
- get_last_hres_data()
- hres_ecmwfdata_to_refinery(tsa)
+ get_last_hres_data(filename)
+ hres_ecmwfdata_to_refinery(tsa, filename)
@meteo.command(name='scrap-ecmwf-ens')
-def scrap_ecmwf_ens_fcst():
+@click.option(
+    "--filename", default='ecmwf_ens.grib'
+)
+def scrap_ecmwf_ens_fcst(filename):
tsa = timeseries(CONFIG['uri']['dburi'])
- get_last_ens_data()
- ens_ecmwfdata_to_refinery(tsa)
\ No newline at end of file
+ get_last_ens_data(filename)
+ ens_ecmwfdata_to_refinery(tsa, filename)
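
With this change, both CLI commands accept a --filename option (defaulting to the usual grib names) and pass it through to the download and ingestion helpers. A minimal usage sketch via click's test runner; the import path of the `meteo` group is an assumption, not shown in the diff:

    from click.testing import CliRunner
    from meteo_scraper.cli import meteo  # assumed location of the click group

    runner = CliRunner()
    # download the latest HRES run into an explicit grib target, then ingest it
    result = runner.invoke(
        meteo, ['scrap-ecmwf-hres', '--filename', '/tmp/ecmwf_hres.grib']
    )
    print(result.output)
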
M meteo_scraper/ecmwf_scraper.py +20 -21
@@ 12,9 12,7 @@ from meteo_scraper.utils import (
client = Client('ecmwf', beta=True)
-HERE = Path(__file__).parent
-FILENAME_HRES = HERE / 'data' / 'ecmwf_hres.grib'
-FILENAME_ENS = HERE / 'data' / 'ecmwf_ens.grib'
+DATADIR = Path(__file__).parent / 'data'
# irregular timesteps of weather model outputs
STEPS = {
@@ 49,7 47,6 @@ ZONES = [
'PL', 'PT', 'RO', 'RS', 'SE_1', 'SE_2', 'SE_3', 'SE_4', 'SI', 'SK'
]
-DATADIR = Path(__file__).parent / 'data'
CITIES = pd.read_csv(DATADIR / 'cities_scraper_meteo.csv', index_col=0)
@@ 60,7 57,7 @@ def find_last_run(model):
)
-def get_ens_data(date, time):
+def get_ens_data(filename_ens, date, time):
client.retrieve(
date=date,
time=time,
@@ 69,12 66,12 @@ def get_ens_data(date, time):
type=['cf', 'pf'],
levtype="sfc",
param=['2t', '10u', '10v', 'tp'],
- target=FILENAME_ENS
+ target=filename_ens
)
- print(f'{FILENAME_ENS} file is saved')
+ print(f'{filename_ens} file is saved')
-def get_hres_data(date, time):
+def get_hres_data(filename_hres, date, time):
client.retrieve(
date=date,
time=time,
@@ 83,24 80,24 @@ def get_hres_data(date, time):
type=['fc'],
levtype="sfc",
param=['2t', '10u', '10v', 'tp'],
- target=FILENAME_HRES
+ target=filename_hres
)
- print(f'{FILENAME_HRES} file is saved')
+ print(f'{filename_hres} file is saved')
-def get_last_hres_data():
+def get_last_hres_data(filename_hres):
last_datetime_hres = find_last_run('HRES')
- get_hres_data(last_datetime_hres.date(), last_datetime_hres.hour)
+ get_hres_data(filename_hres, last_datetime_hres.date(), last_datetime_hres.hour)
-def get_last_ens_data():
+def get_last_ens_data(filename_ens):
last_datetime_ens = find_last_run('ENS')
- get_ens_data(last_datetime_ens.date(), last_datetime_ens.hour)
+ get_ens_data(filename_ens, last_datetime_ens.date(), last_datetime_ens.hour)
-def get_specific_date_data(date, time):
- get_hres_data(date, time)
- get_ens_data(date, time)
+def get_specific_date_data(filename_hres, filename_ens, date, time):
+ get_hres_data(filename_hres, date, time)
+ get_ens_data(filename_ens, date, time)
def exploit_grib_file_hres(tsa, gribfile):
@@ 132,7 129,9 @@ def exploit_grib_file_ens(tsa, gribfile)
for param, unit in PARAMETERS.values():
print(f'retrieving {param} for zones...')
gt = gribfile[param]
+ print(f'subset for param {param} ready')
values = gt.values()
+ print(f'values for param {param} ready')
index = np.unique(mv.valid_date(gt))
for zone in ZONES:
@@ 164,11 163,11 @@ def exploit_grib_file_ens(tsa, gribfile)
# tsa.group_replace(group_name, df, author='scraper-ecmwf')
-def hres_ecmwfdata_to_refinery(tsa):
- gribfile_hres = mv.read(str(FILENAME_HRES))
+def hres_ecmwfdata_to_refinery(tsa, filename_hres):
+ gribfile_hres = mv.read(str(filename_hres))
exploit_grib_file_hres(tsa, gribfile_hres)
-def ens_ecmwfdata_to_refinery(tsa):
- gribfile_ens = mv.read(str(FILENAME_ENS))
+def ens_ecmwfdata_to_refinery(tsa, filename_ens):
+ gribfile_ens = mv.read(str(filename_ens))
exploit_grib_file_ens(tsa, gribfile_ens)
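
The module-level FILENAME_HRES and FILENAME_ENS constants are gone: every download and ingestion helper now takes the grib path as an argument, so callers decide where the files land. A minimal sketch of driving the parametrised helpers directly for a specific run; the date and target paths are illustrative:

    from datetime import date
    from meteo_scraper.ecmwf_scraper import DATADIR, get_specific_date_data

    # fetch both the HRES and ENS output for the 00Z run of an arbitrary day,
    # writing the grib files under the package data directory
    hres_path = DATADIR / 'ecmwf_hres.grib'
    ens_path = DATADIR / 'ecmwf_ens.grib'
    get_specific_date_data(hres_path, ens_path, date(2024, 1, 15), 0)
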
M meteo_scraper/task.py +82 -8
@@ 1,6 1,8 @@
from inireader import reader
+from rework import api
from rework.api import task
+import rework.io as rio
from tshistory_refinery.helper import apimaker
from meteo_scraper.ecmwf_scraper import (
@@ 12,25 14,97 @@ from meteo_scraper.ecmwf_scraper import
CONFIG = reader('meteo.cfg')
-@task(domain='meteo')
-def scrap_ecmwf_hres(task):
+@task(
+ domain='meteo',
+ inputs=(rio.string('filename', required=True),)
+)
+def download_ecmwf_hres(task):
+ with task.capturelogs(std=True):
+ inputs = task.input
+ get_last_hres_data(inputs['filename'])
+
+
+@task(
+ domain='meteo',
+ inputs=(rio.string('filename', required=True),)
+)
+def ingest_ecmwf_hres(task):
tsa = apimaker(
{'db': {'uri': str(task.engine.url)},
'sources': {}
}
)
with task.capturelogs(std=True):
- get_last_hres_data()
- hres_ecmwfdata_to_refinery(tsa)
+ inputs = task.input
+ hres_ecmwfdata_to_refinery(tsa, inputs['filename'])
-@task(domain='meteo')
-def scrap_ecmwf_ens(task):
+@task(
+ domain='meteo',
+ inputs=(rio.string('filename', required=True),)
+)
+def scrap_ecmwf_hres(task):
+ with task.capturelogs(std=True):
+ inputs = task.input
+ filename = inputs['filename']
+ t = api.schedule(
+ task.engine,
+ 'download_ecmwf_hres',
+ domain='meteo',
+ inputdata={'filename': filename}
+ )
+ t.join()
+ t = api.schedule(
+ task.engine,
+ 'ingest_ecmwf_hres',
+ domain='meteo',
+ inputdata={'filename': filename}
+ )
+
+
+@task(
+ domain='meteo',
+ inputs=(rio.string('filename', required=True),)
+)
+def download_ecmwf_ens(task):
+ with task.capturelogs(std=True):
+ inputs = task.input
+ get_last_ens_data(inputs['filename'])
+
+
+@task(
+ domain='meteo',
+ inputs=(rio.string('filename', required=True),)
+)
+def ingest_ecmwf_ens(task):
tsa = apimaker(
{'db': {'uri': str(task.engine.url)},
'sources': {}
}
)
with task.capturelogs(std=True):
- get_last_ens_data()
- ens_ecmwfdata_to_refinery(tsa)
\ No newline at end of file
+ inputs = task.input
+ ens_ecmwfdata_to_refinery(tsa, inputs['filename'])
+
+
+@task(
+ domain='meteo',
+ inputs=(rio.string('filename', required=True),)
+)
+def scrap_ecmwf_ens(task):
+ with task.capturelogs(std=True):
+ inputs = task.input
+ filename = inputs['filename']
+ t = api.schedule(
+ task.engine,
+ 'download_ecmwf_ens',
+ domain='meteo',
+ inputdata={'filename': filename}
+ )
+ t.join()
+ t = api.schedule(
+ task.engine,
+ 'ingest_ecmwf_ens',
+ domain='meteo',
+ inputdata={'filename': filename}
+ )
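
scrap_ecmwf_hres and scrap_ecmwf_ens are now thin orchestrators: each schedules a download task, waits for it with join(), then schedules the matching ingest task with the same filename input. A minimal sketch of firing one orchestrator by hand, assuming a SQLAlchemy engine pointed at the refinery database (the URI is illustrative):

    from sqlalchemy import create_engine
    from rework import api

    engine = create_engine('postgresql://localhost:5432/refinery')

    t = api.schedule(
        engine,
        'scrap_ecmwf_hres',
        domain='meteo',
        inputdata={'filename': 'ecmwf_hres.grib'}
    )
    # join returns once the orchestrator finishes, i.e. the download task has
    # completed and the ingest task has been scheduled
    t.join()
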