|
|
@ -1,4 +1,4 @@ |
|
|
|
from dataflows import Flow, load, unpivot, find_replace, set_type, dump_to_path, update_package, update_resource, update_schema, join, join_with_self, add_computed_field, delete_fields, checkpoint, duplicate |
|
|
|
from dataflows import Flow, load, unpivot, find_replace, set_type, dump_to_path, update_package, update_resource, update_schema, join, join_with_self, add_computed_field, delete_fields, checkpoint, duplicate, filter_rows |
|
|
|
|
|
|
|
BASE_URL = 'https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/' |
|
|
|
CONFIRMED = 'time_series_19-covid-Confirmed.csv' |
|
|
@ -19,6 +19,10 @@ unpivoting_fields = [ |
|
|
|
extra_keys = [{'name': 'Date', 'type': 'string'} ] |
|
|
|
extra_value = {'name': 'Case', 'type': 'number'} |
|
|
|
|
|
|
|
def is_key_country(row): |
|
|
|
key_countries = ['Chine', 'US', 'United Kingdom', 'Italy', 'France', 'Germany'] |
|
|
|
return row['Country'] in key_countries |
|
|
|
|
|
|
|
Flow( |
|
|
|
load(f'{BASE_URL}{CONFIRMED}'), |
|
|
|
load(f'{BASE_URL}{RECOVERED}'), |
|
|
@ -115,7 +119,7 @@ Flow( |
|
|
|
duplicate( |
|
|
|
source='time-series-19-covid-combined', |
|
|
|
target_name='worldwide-aggregated', |
|
|
|
target_path='worldwide-aggregated.csv' |
|
|
|
target_path='data/worldwide-aggregated.csv' |
|
|
|
), |
|
|
|
join_with_self( |
|
|
|
resource_name='worldwide-aggregated', |
|
|
@ -166,6 +170,72 @@ Flow( |
|
|
|
"type": "integer" |
|
|
|
} |
|
|
|
]), |
|
|
|
checkpoint('processed_worldwide_data'), |
|
|
|
# Create another resource with countries aggregated |
|
|
|
duplicate( |
|
|
|
source='time-series-19-covid-combined', |
|
|
|
target_name='countries-aggregated', |
|
|
|
target_path='data/countries-aggregated.csv' |
|
|
|
), |
|
|
|
join_with_self( |
|
|
|
resource_name='countries-aggregated', |
|
|
|
join_key=['Date', 'Country/Region'], |
|
|
|
fields=dict( |
|
|
|
Date={ |
|
|
|
'name': 'Date' |
|
|
|
}, |
|
|
|
Country={ |
|
|
|
'name': 'Country/Region' |
|
|
|
}, |
|
|
|
Confirmed={ |
|
|
|
'name': 'Confirmed', |
|
|
|
'aggregate': 'sum' |
|
|
|
}, |
|
|
|
Recovered={ |
|
|
|
'name': 'Recovered', |
|
|
|
'aggregate': 'sum' |
|
|
|
}, |
|
|
|
Deaths={ |
|
|
|
'name': 'Deaths', |
|
|
|
'aggregate': 'sum' |
|
|
|
} |
|
|
|
) |
|
|
|
), |
|
|
|
update_schema('countries-aggregated', fields=[ |
|
|
|
{ |
|
|
|
"format": "%Y-%m-%d", |
|
|
|
"name": "Date", |
|
|
|
"type": "date" |
|
|
|
}, |
|
|
|
{ |
|
|
|
"format": "default", |
|
|
|
"name": "Country", |
|
|
|
"type": "string" |
|
|
|
}, |
|
|
|
{ |
|
|
|
"format": "default", |
|
|
|
"groupChar": "", |
|
|
|
"name": "Confirmed", |
|
|
|
"title": "Cumulative total confirmed cases to date", |
|
|
|
"type": "integer" |
|
|
|
}, |
|
|
|
{ |
|
|
|
"format": "default", |
|
|
|
"groupChar": "", |
|
|
|
"name": "Recovered", |
|
|
|
"title": "Cumulative total recovered cases to date", |
|
|
|
"type": "integer" |
|
|
|
}, |
|
|
|
{ |
|
|
|
"format": "default", |
|
|
|
"groupChar": "", |
|
|
|
"name": "Deaths", |
|
|
|
"title": "Cumulative total deaths to date", |
|
|
|
"type": "integer" |
|
|
|
} |
|
|
|
]), |
|
|
|
checkpoint('processed_country_data'), |
|
|
|
# Prepare data package (name, title) and add views |
|
|
|
update_package( |
|
|
|
name='covid-19', |
|
|
|
title='Novel Coronavirus 2019', |
|
|
|