You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

210 lines
6.2 KiB

  1. from dataflows import Flow, load, unpivot, find_replace, set_type, dump_to_path, update_package, update_resource, update_schema, join, join_with_self, add_computed_field, delete_fields, checkpoint, duplicate
  2. BASE_URL = 'https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/'
  3. CONFIRMED = 'time_series_19-covid-Confirmed.csv'
  4. DEATH = 'time_series_19-covid-Deaths.csv'
  5. RECOVERED = 'time_series_19-covid-Recovered.csv'
  6. def to_normal_date(row):
  7. old_date = row['Date']
  8. month, day, year = row['Date'].split('-')
  9. day = f'0{day}' if len(day) == 1 else day
  10. month = f'0{month}' if len(month) == 1 else month
  11. row['Date'] = '-'.join([day, month, year])
  12. unpivoting_fields = [
  13. { 'name': '([0-9]+\/[0-9]+\/[0-9]+)', 'keys': {'Date': r'\1'} }
  14. ]
  15. extra_keys = [{'name': 'Date', 'type': 'string'} ]
  16. extra_value = {'name': 'Case', 'type': 'number'}
  17. Flow(
  18. load(f'{BASE_URL}{CONFIRMED}'),
  19. load(f'{BASE_URL}{RECOVERED}'),
  20. load(f'{BASE_URL}{DEATH}'),
  21. checkpoint('load_data'),
  22. unpivot(unpivoting_fields, extra_keys, extra_value),
  23. find_replace([{'name': 'Date', 'patterns': [{'find': '/', 'replace': '-'}]}]),
  24. to_normal_date,
  25. set_type('Date', type='date', format='%d-%m-%y', resources=None),
  26. set_type('Case', type='number', resources=None),
  27. join(
  28. source_name='time_series_19-covid-Confirmed',
  29. source_key=['Province/State', 'Country/Region', 'Date'],
  30. source_delete=True,
  31. target_name='time_series_19-covid-Deaths',
  32. target_key=['Province/State', 'Country/Region', 'Date'],
  33. fields=dict(Confirmed={
  34. 'name': 'Case',
  35. 'aggregate': 'first'
  36. })
  37. ),
  38. join(
  39. source_name='time_series_19-covid-Recovered',
  40. source_key=['Province/State', 'Country/Region', 'Date'],
  41. source_delete=True,
  42. target_name='time_series_19-covid-Deaths',
  43. target_key=['Province/State', 'Country/Region', 'Date'],
  44. fields=dict(Recovered={
  45. 'name': 'Case',
  46. 'aggregate': 'first'
  47. })
  48. ),
  49. add_computed_field(
  50. target={'name': 'Deaths', 'type': 'number'},
  51. operation='format',
  52. with_='{Case}'
  53. ),
  54. delete_fields(['Case']),
  55. update_resource('time_series_19-covid-Deaths', name='time-series-19-covid-combined', path='data/time-series-19-covid-combined.csv'),
  56. update_schema('worldwide-aggregated', fields=[
  57. {
  58. "format": "%Y-%m-%d",
  59. "name": "Date",
  60. "type": "date"
  61. },
  62. {
  63. "format": "default",
  64. "name": "Country/Region",
  65. "type": "string"
  66. },
  67. {
  68. "format": "default",
  69. "name": "Province/State",
  70. "type": "string"
  71. },
  72. {
  73. "decimalChar": ".",
  74. "format": "default",
  75. "groupChar": "",
  76. "name": "Lat",
  77. "type": "number"
  78. },
  79. {
  80. "decimalChar": ".",
  81. "format": "default",
  82. "groupChar": "",
  83. "name": "Long",
  84. "type": "number"
  85. },
  86. {
  87. "format": "default",
  88. "groupChar": "",
  89. "name": "Confirmed",
  90. "title": "Cumulative total confirmed cases to date",
  91. "type": "integer"
  92. },
  93. {
  94. "format": "default",
  95. "groupChar": "",
  96. "name": "Recovered",
  97. "title": "Cumulative total recovered cases to date",
  98. "type": "integer"
  99. },
  100. {
  101. "format": "default",
  102. "groupChar": "",
  103. "name": "Deaths",
  104. "title": "Cumulative total deaths to date",
  105. "type": "integer"
  106. }
  107. ]),
  108. checkpoint('processed_data'),
  109. # Duplicate the stream to create aggregated data
  110. duplicate(
  111. source='time-series-19-covid-combined',
  112. target_name='worldwide-aggregated',
  113. target_path='worldwide-aggregated.csv'
  114. ),
  115. join_with_self(
  116. resource_name='worldwide-aggregated',
  117. join_key=['Date'],
  118. fields=dict(
  119. Date={
  120. 'name': 'Date'
  121. },
  122. Confirmed={
  123. 'name': 'Confirmed',
  124. 'aggregate': 'sum'
  125. },
  126. Recovered={
  127. 'name': 'Recovered',
  128. 'aggregate': 'sum'
  129. },
  130. Deaths={
  131. 'name': 'Deaths',
  132. 'aggregate': 'sum'
  133. }
  134. )
  135. ),
  136. update_schema('worldwide-aggregated', fields=[
  137. {
  138. "format": "default",
  139. "name": "Province/State",
  140. "type": "string"
  141. },
  142. {
  143. "format": "default",
  144. "name": "Country/Region",
  145. "type": "string"
  146. },
  147. {
  148. "decimalChar": ".",
  149. "format": "default",
  150. "groupChar": "",
  151. "name": "Lat",
  152. "type": "number"
  153. },
  154. {
  155. "decimalChar": ".",
  156. "format": "default",
  157. "groupChar": "",
  158. "name": "Long",
  159. "type": "number"
  160. },
  161. {
  162. "format": "%Y-%m-%d",
  163. "name": "Date",
  164. "type": "date"
  165. },
  166. {
  167. "format": "default",
  168. "groupChar": "",
  169. "name": "Confirmed",
  170. "title": "Cumulative total confirmed cases to date",
  171. "type": "integer"
  172. },
  173. {
  174. "format": "default",
  175. "groupChar": "",
  176. "name": "Recovered",
  177. "title": "Cumulative total recovered cases to date",
  178. "type": "integer"
  179. },
  180. {
  181. "format": "default",
  182. "groupChar": "",
  183. "name": "Deaths",
  184. "title": "Cumulative total deaths to date",
  185. "type": "integer"
  186. }
  187. ]),
  188. update_package(
  189. name='covid-19',
  190. title='Novel Coronavirus 2019',
  191. views=[
  192. {
  193. "title": "Total world to date",
  194. "resources": ["worldwide-aggregated"],
  195. "specType": "simple",
  196. "spec": {
  197. "group": "Date",
  198. "series": ["Confirmed", "Recovered", "Deaths"],
  199. "type": "line"
  200. }
  201. }
  202. ]
  203. ),
  204. dump_to_path()
  205. ).results()[0]