"""Build the combined COVID-19 time-series data package with dataflows.

The script loads the confirmed, recovered and deaths time-series CSV files
from ``BASE_URL``, unpivots their per-date columns into rows, merges the three
series into a single resource and then derives three more resources from it:
a worldwide aggregate, a per-country aggregate and a table with one column
per key country. The pipeline runs when the module is executed/imported and
the result is written with ``dump_to_path()``.
"""
from dataflows import (
    Flow,
    load,
    unpivot,
    find_replace,
    set_type,
    dump_to_path,
    update_package,
    update_resource,
    update_schema,
    join,
    join_with_self,
    add_computed_field,
    delete_fields,
    checkpoint,
    duplicate,
    filter_rows,
)

BASE_URL = (
    'https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/'
    'csse_covid_19_data/csse_covid_19_time_series/'
)
CONFIRMED = 'time_series_covid19_confirmed_global.csv'
DEATH = 'time_series_covid19_deaths_global.csv'
RECOVERED = 'time_series_19-covid-Recovered.csv'

# Countries that get their own column in the 'key-countries-pivoted' resource.
# Names use '_' instead of ' ' because they are matched against the country
# name after spaces have been replaced (see ``pivot_key_countries``).
KEY_COUNTRIES = (
    'China', 'US', 'United_Kingdom', 'Italy',
    'France', 'Germany', 'Spain', 'Iran',
)

# Columns that identify one observation across the three source series.
_SERIES_KEY = ('Province/State', 'Country/Region', 'Date')


def to_normal_date(row):
    """Rewrite ``row['Date']`` from ``M-D-Y`` to zero-padded ``DD-MM-Y``.

    The row is modified in place; nothing is returned. The year component is
    passed through untouched.
    """
    month, day, year = row['Date'].split('-')
    day = f'0{day}' if len(day) == 1 else day
    month = f'0{month}' if len(month) == 1 else month
    row['Date'] = '-'.join([day, month, year])


# Column names that look like ``<digits>/<digits>/<digits>`` are unpivoted
# into a 'Date' key. Raw string: '\/' is not a valid escape in a normal
# literal (the pattern text itself is unchanged).
unpivoting_fields = [
    {'name': r'([0-9]+\/[0-9]+\/[0-9]+)', 'keys': {'Date': r'\1'}}
]

extra_keys = [{'name': 'Date', 'type': 'string'}]
extra_value = {'name': 'Case', 'type': 'number'}


def _blank_pivot_row():
    """Return a fresh pivot row with 'Date' and every key country unset."""
    return dict.fromkeys(('Date',) + tuple(KEY_COUNTRIES))


def pivot_key_countries(package):
    """Turn the second resource into one row per date with a column per country.

    Appends one integer field per entry of ``KEY_COUNTRIES`` to the schema of
    the resource at index 1 of the package descriptor, then yields the
    package followed by its four resources in order. The first, third and
    fourth resources are passed through unchanged; the second one is replaced
    by the pivoted rows.

    NOTE(review): index 1 / "second resource" is expected to be
    'key-countries-pivoted'; this relies on the resource order produced by
    the preceding ``duplicate`` steps - confirm if the pipeline is reordered.
    """
    pivot_schema_fields = (
        package.pkg.descriptor['resources'][1]['schema']['fields']
    )
    for country in KEY_COUNTRIES:
        pivot_schema_fields.append(dict(
            name=country,
            type='integer',
            title='Cumulative total confirmed cases to date.'
        ))
    yield package.pkg

    resources = iter(package)
    data_by_province = next(resources)
    yield data_by_province

    data_by_key_countries = next(resources)

    def process_rows(rows):
        # Accumulate values until every key country (and the date) is
        # filled in, emit the row, then start over with a blank one.
        # NOTE(review): this appears to assume rows arrive grouped by date;
        # an incomplete trailing row is never emitted.
        new_row = _blank_pivot_row()
        for row in rows:
            country = row['Country'].replace(' ', '_')
            if country not in KEY_COUNTRIES:
                continue
            new_row['Date'] = row['Date']
            new_row[country] = row['Confirmed']
            if None not in new_row.values():
                yield new_row
                new_row = _blank_pivot_row()

    yield process_rows(data_by_key_countries)

    data_by_country = next(resources)
    yield data_by_country

    worldwide = next(resources)
    yield worldwide


def _fold_into_deaths(source_name, field_name):
    """Build the join that copies ``Case`` from a series into the deaths resource.

    The source resource is deleted after the join and its ``Case`` value is
    stored in the target under ``field_name``.
    """
    return join(
        source_name=source_name,
        source_key=list(_SERIES_KEY),
        source_delete=True,
        target_name='time_series_covid19_deaths_global',
        target_key=list(_SERIES_KEY),
        fields={field_name: {'name': 'Case', 'aggregate': 'first'}},
    )


def _date_field():
    """Schema descriptor for the ISO-formatted 'Date' column."""
    return {"format": "%Y-%m-%d", "name": "Date", "type": "date"}


def _string_field(name):
    """Schema descriptor for a plain string column called ``name``."""
    return {"format": "default", "name": name, "type": "string"}


def _coordinate_field(name):
    """Schema descriptor for a numeric coordinate column called ``name``."""
    return {
        "decimalChar": ".",
        "format": "default",
        "groupChar": "",
        "name": name,
        "type": "number",
    }


def _count_fields():
    """Schema descriptors for the Confirmed / Recovered / Deaths columns."""
    titles = (
        ("Confirmed", "Cumulative total confirmed cases to date"),
        ("Recovered", "Cumulative total recovered cases to date"),
        ("Deaths", "Cumulative total deaths to date"),
    )
    return [
        {
            "format": "default",
            "groupChar": "",
            "name": name,
            "title": title,
            "type": "integer",
        }
        for name, title in titles
    ]


def _summed_counts():
    """Aggregation spec that sums the three count columns."""
    return {
        name: {'name': name, 'aggregate': 'sum'}
        for name in ('Confirmed', 'Recovered', 'Deaths')
    }


Flow(
    load(f'{BASE_URL}{CONFIRMED}'),
    load(f'{BASE_URL}{RECOVERED}'),
    load(f'{BASE_URL}{DEATH}'),
    checkpoint('load_data'),
    unpivot(unpivoting_fields, extra_keys, extra_value),
    # '/' -> '-' so that to_normal_date can split the date on '-'.
    find_replace([
        {'name': 'Date', 'patterns': [{'find': '/', 'replace': '-'}]}
    ]),
    to_normal_date,
    set_type('Date', type='date', format='%d-%m-%y', resources=None),
    set_type('Case', type='number', resources=None),
    # Merge the confirmed and recovered series into the deaths resource.
    _fold_into_deaths('time_series_covid19_confirmed_global', 'Confirmed'),
    _fold_into_deaths('time_series_19-covid-Recovered', 'Recovered'),
    # What is left in 'Case' is the deaths series itself: copy it to
    # 'Deaths' and drop the generic column.
    add_computed_field(
        target={'name': 'Deaths', 'type': 'number'},
        operation='format',
        with_='{Case}'
    ),
    delete_fields(['Case']),
    update_resource(
        'time_series_covid19_deaths_global',
        name='time-series-19-covid-combined',
        path='data/time-series-19-covid-combined.csv'
    ),
    update_schema(
        'time-series-19-covid-combined',
        missingValues=['None', ''],
        fields=[
            _date_field(),
            _string_field("Country/Region"),
            _string_field("Province/State"),
            _coordinate_field("Lat"),
            _coordinate_field("Long"),
            *_count_fields(),
        ]
    ),
    checkpoint('processed_data'),
    # Duplicate the stream to create aggregated data
    duplicate(
        source='time-series-19-covid-combined',
        target_name='worldwide-aggregated',
        target_path='data/worldwide-aggregated.csv'
    ),
    join_with_self(
        resource_name='worldwide-aggregated',
        join_key=['Date'],
        fields={
            'Date': {'name': 'Date'},
            **_summed_counts(),
        }
    ),
    update_schema(
        'worldwide-aggregated',
        missingValues=['None', ''],
        fields=[
            _date_field(),
            *_count_fields(),
        ]
    ),
    checkpoint('processed_worldwide_data'),
    # Create another resource with key countries pivoted
    duplicate(
        source='time-series-19-covid-combined',
        target_name='key-countries-pivoted',
        target_path='data/key-countries-pivoted.csv'
    ),
    join_with_self(
        resource_name='key-countries-pivoted',
        join_key=['Date', 'Country/Region'],
        fields={
            'Date': {'name': 'Date'},
            'Country': {'name': 'Country/Region'},
            **_summed_counts(),
        }
    ),
    update_schema(
        'key-countries-pivoted',
        missingValues=['None', ''],
        fields=[
            _date_field(),
            _string_field("Country"),
            *_count_fields(),
        ]
    ),
    checkpoint('processed_country_data'),
    # All countries aggregated
    duplicate(
        source='key-countries-pivoted',
        target_name='countries-aggregated',
        target_path='data/countries-aggregated.csv'
    ),
    pivot_key_countries,
    # The pivoted resource keeps only 'Date' plus the per-country columns.
    delete_fields(
        ['Country', 'Confirmed', 'Recovered', 'Deaths'],
        resources='key-countries-pivoted'
    ),
    # Prepare data package (name, title) and add views
    update_package(
        name='covid-19',
        title='Novel Coronavirus 2019',
        views=[
            {
                "title": "Total world to date",
                "resources": ["worldwide-aggregated"],
                "specType": "simple",
                "spec": {
                    "group": "Date",
                    "series": ["Confirmed", "Recovered", "Deaths"],
                    "type": "line"
                }
            },
            {
                "title": "Number of confirmed cases in key countries",
                "resources": ["key-countries-pivoted"],
                "specType": "simple",
                "spec": {
                    "group": "Date",
                    "series": list(KEY_COUNTRIES),
                    "type": "line"
                }
            }
        ]
    ),
    dump_to_path()
).results()[0]