from dataflows import (
    Flow, load, unpivot, find_replace, set_type, dump_to_path,
    update_package, update_resource, update_schema, join, join_with_self,
    add_computed_field, delete_fields, checkpoint, duplicate, filter_rows,
)

BASE_URL = 'https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/'
CONFIRMED = 'time_series_covid19_confirmed_global.csv'
DEATH = 'time_series_covid19_deaths_global.csv'
RECOVERED = 'time_series_19-covid-Recovered.csv'


def to_normal_date(row):
    # Convert the source M-D-YY dates (slashes already replaced by dashes)
    # into zero-padded DD-MM-YY so they can be parsed with one format string.
    old_date = row['Date']
    month, day, year = row['Date'].split('-')
    day = f'0{day}' if len(day) == 1 else day
    month = f'0{month}' if len(month) == 1 else month
    row['Date'] = '-'.join([day, month, year])


# Unpivot the wide per-date columns (e.g. "1/22/20") into long Date/Case rows.
unpivoting_fields = [
    {'name': r'([0-9]+\/[0-9]+\/[0-9]+)', 'keys': {'Date': r'\1'}}
]

extra_keys = [{'name': 'Date', 'type': 'string'}]
extra_value = {'name': 'Case', 'type': 'number'}


def is_key_country(row):
    # Helper for filtering rows down to a set of key countries
    # (not applied in the steps below).
    key_countries = ['China', 'US', 'United Kingdom', 'Italy', 'France', 'Germany']
    return row['Country'] in key_countries


Flow(
    load(f'{BASE_URL}{CONFIRMED}'),
    load(f'{BASE_URL}{RECOVERED}'),
    load(f'{BASE_URL}{DEATH}'),
    checkpoint('load_data'),
    unpivot(unpivoting_fields, extra_keys, extra_value),
    find_replace([{'name': 'Date', 'patterns': [{'find': '/', 'replace': '-'}]}]),
    to_normal_date,
    set_type('Date', type='date', format='%d-%m-%y', resources=None),
    set_type('Case', type='number', resources=None),
    # Merge the confirmed and recovered counts into the deaths resource,
    # matching rows on province, country and date.
    join(
        source_name='time_series_covid19_confirmed_global',
        source_key=['Province/State', 'Country/Region', 'Date'],
        source_delete=True,
        target_name='time_series_covid19_deaths_global',
        target_key=['Province/State', 'Country/Region', 'Date'],
        fields=dict(Confirmed={'name': 'Case', 'aggregate': 'first'})
    ),
    join(
        source_name='time_series_19-covid-Recovered',
        source_key=['Province/State', 'Country/Region', 'Date'],
        source_delete=True,
        target_name='time_series_covid19_deaths_global',
        target_key=['Province/State', 'Country/Region', 'Date'],
        fields=dict(Recovered={'name': 'Case', 'aggregate': 'first'})
    ),
    # The remaining 'Case' column now holds the deaths counts; rename it to 'Deaths'.
    add_computed_field(
        target={'name': 'Deaths', 'type': 'number'},
        operation='format',
        with_='{Case}'
    ),
    delete_fields(['Case']),
    update_resource('time_series_covid19_deaths_global',
                    name='time-series-19-covid-combined',
                    path='data/time-series-19-covid-combined.csv'),
    update_schema('time-series-19-covid-combined', missingValues=['None', ''], fields=[
        {"format": "%Y-%m-%d", "name": "Date", "type": "date"},
        {"format": "default", "name": "Country/Region", "type": "string"},
        {"format": "default", "name": "Province/State", "type": "string"},
        {"decimalChar": ".", "format": "default", "groupChar": "", "name": "Lat", "type": "number"},
        {"decimalChar": ".", "format": "default", "groupChar": "", "name": "Long", "type": "number"},
        {"format": "default", "groupChar": "", "name": "Confirmed",
         "title": "Cumulative total confirmed cases to date", "type": "integer"},
        {"format": "default", "groupChar": "", "name": "Recovered",
         "title": "Cumulative total recovered cases to date", "type": "integer"},
        {"format": "default", "groupChar": "", "name": "Deaths",
         "title": "Cumulative total deaths to date", "type": "integer"}
    ]),
    checkpoint('processed_data'),
    # Duplicate the stream to create aggregated data
    duplicate(
        source='time-series-19-covid-combined',
        target_name='worldwide-aggregated',
        target_path='data/worldwide-aggregated.csv'
    ),
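    # Collapse the per-province rows into a single worldwide row per date by
    # summing the cumulative Confirmed / Recovered / Deaths counters.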
    join_with_self(
        resource_name='worldwide-aggregated',
        join_key=['Date'],
        fields=dict(
            Date={'name': 'Date'},
            Confirmed={'name': 'Confirmed', 'aggregate': 'sum'},
            Recovered={'name': 'Recovered', 'aggregate': 'sum'},
            Deaths={'name': 'Deaths', 'aggregate': 'sum'}
        )
    ),
    update_schema('worldwide-aggregated', missingValues=['None', ''], fields=[
        {"format": "%Y-%m-%d", "name": "Date", "type": "date"},
        {"format": "default", "groupChar": "", "name": "Confirmed",
         "title": "Cumulative total confirmed cases to date", "type": "integer"},
        {"format": "default", "groupChar": "", "name": "Recovered",
         "title": "Cumulative total recovered cases to date", "type": "integer"},
        {"format": "default", "groupChar": "", "name": "Deaths",
         "title": "Cumulative total deaths to date", "type": "integer"}
    ]),
    checkpoint('processed_worldwide_data'),
    # Create another resource with countries aggregated
    duplicate(
        source='time-series-19-covid-combined',
        target_name='countries-aggregated',
        target_path='data/countries-aggregated.csv'
    ),
    join_with_self(
        resource_name='countries-aggregated',
        join_key=['Date', 'Country/Region'],
        fields=dict(
            Date={'name': 'Date'},
            Country={'name': 'Country/Region'},
            Confirmed={'name': 'Confirmed', 'aggregate': 'sum'},
            Recovered={'name': 'Recovered', 'aggregate': 'sum'},
            Deaths={'name': 'Deaths', 'aggregate': 'sum'}
        )
    ),
    update_schema('countries-aggregated', missingValues=['None', ''], fields=[
        {"format": "%Y-%m-%d", "name": "Date", "type": "date"},
        {"format": "default", "name": "Country", "type": "string"},
        {"format": "default", "groupChar": "", "name": "Confirmed",
         "title": "Cumulative total confirmed cases to date", "type": "integer"},
        {"format": "default", "groupChar": "", "name": "Recovered",
         "title": "Cumulative total recovered cases to date", "type": "integer"},
        {"format": "default", "groupChar": "", "name": "Deaths",
         "title": "Cumulative total deaths to date", "type": "integer"}
    ]),
    checkpoint('processed_country_data'),
    # Prepare data package (name, title) and add views
    update_package(
        name='covid-19',
        title='Novel Coronavirus 2019',
        views=[
            {
                "title": "Total world to date",
                "resources": ["worldwide-aggregated"],
                "specType": "simple",
                "spec": {
                    "group": "Date",
                    "series": ["Confirmed", "Recovered", "Deaths"],
                    "type": "line"
                }
            }
        ]
    ),
    dump_to_path()
).results()[0]
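
# Note: Flow(...).results() executes the flow and returns a (rows, datapackage, stats)
# tuple, so the trailing [0] gives the processed rows in memory; dump_to_path() also
# saves the three resources as CSV files at the paths set above, together with a
# datapackage.json describing them.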