ecmwf: separate task into two steps and add filename parameter to function definitions
3 files changed, 119 insertions(+), 37 deletions(-)

M meteo_scraper/cli.py
M meteo_scraper/ecmwf_scraper.py
M meteo_scraper/task.py
M meteo_scraper/cli.py +17 -8
@@ 36,25 36,34 @@ def setup_tasks():
             engine,
             'scrap_ecmwf_hres',
             domain='meteo',
-            rule='0 3 0,6,8,10,12,18 * * *'
+            rule='0 3 0,6,8,10,12,18 * * *',
+            inputdata={'filename': 'ecmwf_hres.grib'}
         )
 
     api.prepare(
             engine,
             'scrap_ecmwf_ens',
             domain='meteo',
-            rule='0 13 0,6,8,10,12,18 * * *'
+            rule='0 13 0,6,8,10,12,18 * * *',
+            inputdata={'filename': 'ecmwf_ens.grib'}
         )
 
+
 @meteo.command(name='scrap-ecmwf-hres')
-def scrap_ecmwf_hres_fcst():
+@click.option(
+    "--filename"
+)
+def scrap_ecmwf_hres_fcst(filename='ecmwf_hres.grib'):
     tsa = timeseries(CONFIG['uri']['dburi'])
-    get_last_hres_data()
-    hres_ecmwfdata_to_refinery(tsa)
+    get_last_hres_data(filename)
+    hres_ecmwfdata_to_refinery(tsa, filename)
 
 
 @meteo.command(name='scrap-ecmwf-ens')
-def scrap_ecmwf_ens_fcst():
+@click.option(
+    "--filename"
+)
+def scrap_ecmwf_ens_fcst(filename='ecmwf_ens.grib'):
     tsa = timeseries(CONFIG['uri']['dburi'])
-    get_last_ens_data()
-    ens_ecmwfdata_to_refinery(tsa)
  No newline at end of file
+    get_last_ens_data(filename)
+    ens_ecmwfdata_to_refinery(tsa, filename)

          
M meteo_scraper/ecmwf_scraper.py +20 -21
@@ 12,9 12,7 @@ from meteo_scraper.utils import (
 
 client = Client('ecmwf', beta=True)
 
-HERE = Path(__file__).parent
-FILENAME_HRES = HERE / 'data' / 'ecmwf_hres.grib'
-FILENAME_ENS = HERE / 'data' / 'ecmwf_ens.grib'
+DATADIR = Path(__file__).parent / 'data'
 
 # irregular timesteps of weather model outputs
 STEPS = {

          
@@ 49,7 47,6 @@ ZONES = [
     'PL', 'PT', 'RO', 'RS', 'SE_1', 'SE_2', 'SE_3', 'SE_4', 'SI', 'SK'
 ]
 
-DATADIR = Path(__file__).parent / 'data'
 CITIES = pd.read_csv(DATADIR / 'cities_scraper_meteo.csv', index_col=0)
 
 

          
@@ 60,7 57,7 @@ def find_last_run(model):
     )
 
 
-def get_ens_data(date, time):
+def get_ens_data(filename_ens, date, time):
     client.retrieve(
         date=date,
         time=time,

          
@@ 69,12 66,12 @@ def get_ens_data(date, time):
         type=['cf', 'pf'],
         levtype="sfc",
         param=['2t', '10u', '10v', 'tp'],
-        target=FILENAME_ENS
+        target=filename_ens
     )
-    print(f'{FILENAME_ENS} file is saved')
+    print(f'{filename_ens} file is saved')
 
 
-def get_hres_data(date, time):
+def get_hres_data(filename_hres, date, time):
     client.retrieve(
         date=date,
         time=time,

          
@@ 83,24 80,24 @@ def get_hres_data(date, time):
         type=['fc'],
         levtype="sfc",
         param=['2t', '10u', '10v', 'tp'],
-        target=FILENAME_HRES
+        target=filename_hres
     )
-    print(f'{FILENAME_HRES} file is saved')
+    print(f'{filename_hres} file is saved')
 
 
-def get_last_hres_data():
+def get_last_hres_data(filename_hres):
     last_datetime_hres = find_last_run('HRES')
-    get_hres_data(last_datetime_hres.date(), last_datetime_hres.hour)
+    get_hres_data(filename_hres, last_datetime_hres.date(), last_datetime_hres.hour)
 
 
-def get_last_ens_data():
+def get_last_ens_data(filename_ens):
     last_datetime_ens = find_last_run('ENS')
-    get_ens_data(last_datetime_ens.date(), last_datetime_ens.hour)
+    get_ens_data(filename_ens, last_datetime_ens.date(), last_datetime_ens.hour)
 
 
-def get_specific_date_data(date, time):
-    get_hres_data(date, time)
-    get_ens_data(date, time)
+def get_specific_date_data(filename_hres, filename_ens, date, time):
+    get_hres_data(filename_hres, date, time)
+    get_ens_data(filename_ens, date, time)
 
 
 def exploit_grib_file_hres(tsa, gribfile):

          
@@ 132,7 129,9 @@ def exploit_grib_file_ens(tsa, gribfile)
     for param, unit in PARAMETERS.values():
         print(f'retrieving {param} for zones...')
         gt = gribfile[param]
+        print(f'subset for param {param} ready')
         values = gt.values()
+        print(f'values for param {param} ready')
         index = np.unique(mv.valid_date(gt))
 
         for zone in ZONES:

          
@@ 164,11 163,11 @@ def exploit_grib_file_ens(tsa, gribfile)
         #     tsa.group_replace(group_name, df, author='scraper-ecmwf')
 
 
-def hres_ecmwfdata_to_refinery(tsa):
-    gribfile_hres = mv.read(str(FILENAME_HRES))
+def hres_ecmwfdata_to_refinery(tsa, filename_hres):
+    gribfile_hres = mv.read(str(filename_hres))
     exploit_grib_file_hres(tsa, gribfile_hres)
 
 
-def ens_ecmwfdata_to_refinery(tsa):
-    gribfile_ens = mv.read(str(FILENAME_ENS))
+def ens_ecmwfdata_to_refinery(tsa, filename_ens):
+    gribfile_ens = mv.read(str(filename_ens))
     exploit_grib_file_ens(tsa, gribfile_ens)

          
M meteo_scraper/task.py +82 -8
@@ 1,6 1,8 @@ 
 from inireader import reader
 
+from rework import api
 from rework.api import task
+import rework.io as rio
 from tshistory_refinery.helper import apimaker
 
 from meteo_scraper.ecmwf_scraper import (

          
@@ 12,25 14,97 @@ from meteo_scraper.ecmwf_scraper import 
 
 CONFIG = reader('meteo.cfg')
 
-@task(domain='meteo')
-def scrap_ecmwf_hres(task):
+@task(
+    domain='meteo',
+    inputs=(rio.string('filename', required=True),)
+)
+def download_ecmwf_hres(task):
+    with task.capturelogs(std=True):
+        inputs = task.input
+        get_last_hres_data(inputs['filename'])
+
+
+@task(
+    domain='meteo',
+    inputs=(rio.string('filename', required=True),)
+)
+def ingest_ecmwf_hres(task):
     tsa = apimaker(
         {'db': {'uri': str(task.engine.url)},
          'sources': {}
          }
     )
     with task.capturelogs(std=True):
-        get_last_hres_data()
-        hres_ecmwfdata_to_refinery(tsa)
+        inputs = task.input
+        hres_ecmwfdata_to_refinery(tsa, inputs['filename'])
 
 
-@task(domain='meteo')
-def scrap_ecmwf_ens(task):
+@task(
+    domain='meteo',
+    inputs=(rio.string('filename', required=True),)
+)
+def scrap_ecmwf_hres(task):
+    with task.capturelogs(std=True):
+        inputs = task.input
+        filename = inputs['filename']
+        t = api.schedule(
+            task.engine,
+            'download_ecmwf_hres',
+            domain='meteo',
+            inputdata={'filename': filename}
+        )
+        t.join()
+        t = api.schedule(
+            task.engine,
+            'ingest_ecmwf_hres',
+            domain='meteo',
+            inputdata={'filename': filename}
+        )
+
+
+@task(
+    domain='meteo',
+    inputs=(rio.string('filename', required=True),)
+)
+def download_ecmwf_ens(task):
+    with task.capturelogs(std=True):
+        inputs = task.input
+        get_last_ens_data(inputs['filename'])
+
+
+@task(
+    domain='meteo',
+    inputs=(rio.string('filename', required=True),)
+)
+def ingest_ecmwf_ens(task):
     tsa = apimaker(
         {'db': {'uri': str(task.engine.url)},
          'sources': {}
          }
     )
     with task.capturelogs(std=True):
-        get_last_ens_data()
-        ens_ecmwfdata_to_refinery(tsa)
  No newline at end of file
+        inputs = task.input
+        ens_ecmwfdata_to_refinery(tsa, inputs['filename'])
+
+
+@task(
+    domain='meteo',
+    inputs=(rio.string('filename', required=True),)
+)
+def scrap_ecmwf_ens(task):
+    with task.capturelogs(std=True):
+        inputs = task.input
+        filename = inputs['filename']
+        t = api.schedule(
+            task.engine,
+            'download_ecmwf_ens',
+            domain='meteo',
+            inputdata={'filename': filename}
+        )
+        t.join()
+        t = api.schedule(
+            task.engine,
+            'ingest_ecmwf_ens',
+            domain='meteo',
+            inputdata={'filename': filename}
+        )