Skip to content

Commit 074da24

Browse files
committed
Complete JRC converter
1 parent cf4def9 commit 074da24

File tree

1 file changed

+216
-46
lines changed

1 file changed

+216
-46
lines changed

transport_data/jrc/__init__.py

+216-46
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,19 @@
99
1010
"""
1111
import re
12+
from collections import defaultdict
1213
from functools import partial
13-
from itertools import chain
14+
from itertools import chain, count
1415
from operator import add
1516
from pathlib import Path
1617

1718
import numpy as np
1819
import pandas as pd
1920
import sdmx.model.v21 as m
2021

22+
from transport_data import registry
2123
from transport_data.util.pooch import Pooch
24+
from transport_data.util.sdmx import anno_generated
2225

2326

2427
def get_agency() -> m.Agency:
@@ -219,15 +222,27 @@ def _fill_unit_measure(df: pd.DataFrame, measure: str) -> pd.DataFrame:
219222
}
220223

221224
UNPACK = {
222-
"ALL": dict(MODE="ALL"),
223-
"Road transport": dict(MODE="ROAD", VEHICLE_TYPE="ALL"),
225+
"ALL": dict(),
226+
"Road transport": dict(MODE="ROAD", VEHICLE_TYPE="_T"),
224227
"Powered 2-wheelers": dict(MODE="ROAD", VEHICLE_TYPE="2W"),
228+
"Powered 2-wheelers (Gasoline)": dict(
229+
MODE="ROAD", VEHICLE_TYPE="2W", FUEL="Gasoline"
230+
),
231+
"of which biofuels": dict(FUEL="Biofuels"),
225232
"Passenger cars": dict(MODE="ROAD", VEHICLE_TYPE="LDV"),
226233
"Gasoline engine": dict(MODE="ROAD", VEHICLE_TYPE="LDV", POWERTRAIN="GAS"),
227234
"Diesel oil engine": dict(MODE="ROAD", VEHICLE_TYPE="LDV", POWERTRAIN="DIES"),
228235
"LPG engine": dict(MODE="ROAD", VEHICLE_TYPE="LDV", POWERTRAIN="LPG"),
229236
"Natural gas engine": dict(MODE="ROAD", VEHICLE_TYPE="LDV", POWERTRAIN="NG"),
237+
"of which biogas": dict(FUEL="Biogas"),
230238
"Plug-in hybrid electric": dict(MODE="ROAD", VEHICLE_TYPE="LDV", POWERTRAIN="PHEV"),
239+
"Plug-in hybrid electric (Gasoline and electricity)": dict(
240+
MODE="ROAD",
241+
VEHICLE_TYPE="LDV",
242+
POWERTRAIN="PHEV",
243+
FUEL="ALL",
244+
),
245+
"of which electricity": dict(FUEL="ELE"),
231246
"Battery electric vehicles": dict(
232247
MODE="ROAD", VEHICLE_TYPE="LDV", POWERTRAIN="BEV"
233248
),
@@ -243,25 +258,73 @@ def _fill_unit_measure(df: pd.DataFrame, measure: str) -> pd.DataFrame:
243258
# Freight
244259
"Light duty vehicles": dict(MODE="ROAD", VEHICLE_TYPE="LDV"),
245260
"Heavy duty vehicles": dict(MODE="ROAD", VEHICLE_TYPE="HDV"),
246-
"Rail transport": dict(MODE="RAIL", VEHICLE_TYPE="ALL"),
261+
"Heavy duty vehicles (Diesel oil incl. biofuels)": dict(
262+
MODE="ROAD",
263+
VEHICLE_TYPE="HDV",
264+
FUEL="Diesel oil incl. biofuels",
265+
),
266+
"Rail transport": dict(MODE="RAIL", VEHICLE_TYPE="_T"),
247267
"Domestic and International - Intra-EU": dict(MODE="RAIL", SEGMENT="DOM_IN_EU"),
248268
"Coastal shipping and inland waterways": dict(MODE="WATER", SEGMENT="ALL"),
249269
"Domestic coastal shipping": dict(MODE="WATER", SEGMENT="DOM"),
250270
"Inland waterways": dict(MODE="WATER", SEGMENT="IWW"),
271+
# Sheet TrRoad_ene, table "Total energy consumption (ktoe)"
272+
# "Domestic": dict(MODE="ROAD", VEHICLE_TYPE="TRUCK", SEGMENT="DOM"),
273+
"International": dict(SEGMENT="INTL"),
274+
# Sheet TrRoad_ene
275+
"by fuel": dict(FUEL="ALL"),
276+
"by fuel (EUROSTAT DATA)": dict(FUEL="ALL"),
277+
"Liquids": dict(FUEL="Liquids"),
278+
"Liquids (Petroleum products)": dict(FUEL="Liquids (without biofuels)"),
279+
"Liquified petroleum gas (LPG)": dict(FUEL="LPG"),
280+
"LPG": dict(FUEL="LPG"),
281+
"Gasoline (without biofuels)": dict(FUEL="Gasoline (without biofuels)"),
282+
"Gasoline (incl. biofuels)": dict(FUEL="Gasoline (incl. biofuels)"),
283+
"Gas/Diesel oil (without biofuels)": dict(FUEL="Gas/Diesel oil (without biofuels)"),
284+
"Diesel": dict(FUEL="Diesel"),
285+
"Diesel oil": dict(FUEL="Diesel"),
286+
"Diesel oil (incl. biofuels)": dict(FUEL="Diesel oil (incl. biofuels)"),
287+
"Kerosene": dict(FUEL="Kerosene"),
288+
"Residual fuel oil": dict(FUEL="Residual fuel oil"),
289+
"Other petroleum products": dict(FUEL="Other petroleum products"),
290+
"Natural gas": dict(FUEL="NG"),
291+
"Natural gas (incl. biogas)": dict(FUEL="Natural gas (incl. biogas)"),
292+
"Renewable energies and wastes": dict(FUEL="Renewable energies and wastes"),
293+
"Biogas": dict(FUEL="Biogas"),
294+
"Biogasoline": dict(FUEL="Biogasoline"),
295+
"Biodiesel": dict(FUEL="Biodiesel"),
296+
"Biomass and wastes": dict(FUEL="Biomass and wastes"),
297+
"Other biofuels": dict(FUEL="Other biofuels"),
298+
"Electricity": dict(FUEL="ELE"),
299+
"Electric": dict(FUEL="ELE"),
300+
"Solids": dict(FUEL="Solids"),
251301
}
252302

253303

254304
def _unpack_info(df: pd.DataFrame) -> pd.DataFrame:
255-
print(
256-
df["INFO"]
257-
.drop_duplicates()
258-
.apply(lambda v: pd.Series(UNPACK[v]))
259-
.fillna("_Z")
260-
.to_string()
305+
"""Unpack values from the INFO column."""
306+
info = df["INFO"].drop_duplicates()
307+
try:
308+
unpacked = pd.concat([info, info.apply(lambda v: pd.Series(UNPACK[v]))], axis=1)
309+
except KeyError as e:
310+
print(f"Failed to unpack INFO for {e.args[0]!r}:")
311+
print(info.to_string())
312+
assert False
313+
314+
def _merge_mode(df: pd.DataFrame) -> pd.DataFrame:
315+
cols = ["MODE_x", "MODE_y"]
316+
try:
317+
return df.assign(MODE=df[cols].ffill(axis=1)[cols[-1]]).drop(cols, axis=1)
318+
except KeyError:
319+
return df
320+
321+
return (
322+
df.merge(unpacked, on="INFO")
323+
.drop("INFO", axis=1)
324+
.pipe(_merge_mode)
325+
.fillna("_X")
261326
)
262327

263-
raise NotImplementedError
264-
265328

266329
def read(geo=None):
267330
"""Read data from a single file.
@@ -275,22 +338,34 @@ def read(geo=None):
275338
path = path_for(geo, "Transport")
276339

277340
# Metadata across all blocks
278-
ALL_INFO = set()
279341
CS_MEASURE = m.ConceptScheme(id="MEASURE")
280-
CL_UNIT_MEASURE = m.Codelist(id="UNIT_MEASURE")
342+
_measure_id = dict()
343+
# Data per measure
344+
data = defaultdict(list)
345+
346+
print(f"Read data for {geo = }")
281347

282348
# Iterate over blocks of data
283-
for i, block in enumerate(iter_blocks(path, geo)):
349+
i = count(start=1)
350+
for block in iter_blocks(path, geo):
284351
# Identify the measure (INFO column on first row of the block)
285352
measure_unit = block.loc[0, "INFO"]
286353

287-
# Unpack a string that combines MEASURE and UNIT_MEASURE
354+
# Unpack a string that combines MEASURE and, possibly, UNIT_MEASURE
288355
match = re.fullmatch(r"(?P<measure>[^\(]+) \((?P<unit>.*)\)\*?", measure_unit)
289356
try:
290357
measure, unit = match.group("measure", "unit")
291358
except AttributeError:
292359
measure, unit = measure_unit, None
293360

361+
# The measure concept
362+
try:
363+
mc_id = _measure_id[measure]
364+
except KeyError:
365+
mc_id = f"{next(i):02d}"
366+
CS_MEASURE.append(m.Concept(id=mc_id, name=measure))
367+
_measure_id[measure] = mc_id
368+
294369
# - Assign `unit` from above as U0, one candidate from UNIT_MEASURE.
295370
# - Replace the embedded measure/unit expression with "ALL transport".
296371
# - Unpack and fill forward SERVICE values from "Passenger transport" or
@@ -310,44 +385,139 @@ def read(geo=None):
310385
.pipe(_unpack_info)
311386
)
312387

388+
data[mc_id].append(block)
389+
390+
print(f"{sum(map(len, data.values()))} data sets for {len(CS_MEASURE)} measures")
391+
392+
# Join multiple data frames
393+
return {
394+
CS_MEASURE[mc_id]: pd.concat(dfs).fillna("_X") for mc_id, dfs in data.items()
395+
}
396+
397+
398+
def convert(geo):
399+
data = read(geo)
400+
401+
# TODO create SDMX DSD, Dataflow, and DataSets from the resulting pd.DataFrame
402+
403+
# Code lists
404+
CL = dict()
405+
406+
# Retrieve the parent concept scheme
407+
CS_MEASURE = list(data.keys())[0].parent
408+
409+
# Iterate over measure_concepts
410+
for measure_concept, df in data.items():
313411
# Update the structure information
412+
for concept, values in df.items():
413+
if concept in ("GEO", "OBS_VALUE", "TIME_PERIOD"):
414+
continue
314415

315-
# Units of measure
316-
for unit in block["UNIT_MEASURE"].unique():
317-
try:
318-
CL_UNIT_MEASURE.append(m.Code(id=unit, name=unit))
319-
except ValueError:
320-
pass # Already exists
416+
cl = CL.setdefault(concept, m.Codelist(id=concept))
321417

322-
# The measure concept
323-
measure_concept = m.Concept(id=str(i), name=measure)
324-
CS_MEASURE.append(measure_concept)
418+
for value in values.unique():
419+
try:
420+
cl.append(m.Code(id=value, name=value))
421+
except ValueError:
422+
pass # Already exists
325423

326-
# Remaining labels appearing in the "INFO" dimension
327-
ALL_INFO.update(block["INFO"].unique())
424+
# Prepare an empty data set, associated structures, and a helper function
425+
dims = []
426+
for c in df.columns:
427+
if c in ("OBS_VALUE",):
428+
continue
429+
dims.append(c)
430+
ds, _make_obs = prepare(measure_concept, dims)
431+
432+
# Convert rows of `data` into SDMX Observation objects
433+
ds.obs.extend(_make_obs(row) for _, row in df.iterrows())
434+
assert len(ds) == len(df)
435+
436+
# Write the data set, DSD, and DFD to file
437+
for obj in (ds, ds.described_by, ds.structured_by):
438+
obj.annotations.append(
439+
m.Annotation(
440+
id="tdc-comment", text=f"Primary measure is {measure_concept!r}"
441+
)
442+
)
443+
registry.write(obj, force=True, _show_status=False)
328444

329-
# First two rows, transposed
330-
print("\n", repr(measure_concept))
331-
print(block.head(2).transpose().to_string())
445+
# Write code lists, measure concept scheme to file
446+
a = get_agency()
447+
for obj in chain(CL.values(), [CS_MEASURE]):
448+
obj.maintainer = a
449+
obj.version = "0.1.0"
450+
registry.write(obj, force=True, _show_status=False)
332451

333-
# print("\n", repr(measure_concept))
334-
# print(block.to_string())
452+
raise NotImplementedError("Merging data for multiple GEO")
335453

336-
print(
337-
"---",
338-
f"{len(ALL_INFO)} distinct INFO labels",
339-
"\n".join(sorted(ALL_INFO)),
340-
"---",
341-
sep="\n",
342-
)
343-
print("---", repr(CL_UNIT_MEASURE), CL_UNIT_MEASURE.items, sep="\n")
344454

345-
# TODO extract MODE and VEHICLE from INFO
346-
# TODO create SDMX DSD, Dataflow, and DataSets from the resulting pd.DataFrame
455+
def prepare(measure_concept, dims):
456+
# TODO merge with the similar function in .adb.__init__
347457

348-
# raise NotImplementedError
458+
measure_id = measure_concept.id
459+
c = measure_concept
460+
aa = measure_concept
349461

462+
# Data structure definition with an ID matching the measure
463+
# NB here we set JRC as the maintainer. Precisely, JRC establishes the data
464+
# structure, but TDCI is maintaining the SDMX representation of it.
465+
dsd = m.DataStructureDefinition(
466+
id=measure_id, maintainer=get_agency(), version="0.0.0"
467+
)
468+
anno_generated(dsd)
350469

351-
def convert(geo):
352-
read(geo)
353-
raise NotImplementedError
470+
dfd = m.DataflowDefinition(
471+
id=measure_id, maintainer=get_agency(), version="0.0.0", structure=dsd
472+
)
473+
474+
pm = m.PrimaryMeasure(id="OBS_VALUE", concept_identity=c)
475+
dsd.measures.append(pm)
476+
477+
# Dimensions
478+
dsd.dimensions.extend(m.Dimension(id=d) for d in dims)
479+
480+
# Convert annotations to DataAttributes. "NoSpecifiedRelationship" means that the
481+
# attribute is attached to an entire data set (not a series, individual obs, etc.).
482+
da = {} # Store references for use below
483+
for a in filter(lambda a: a.id != "remark-cols", aa.annotations):
484+
da[a.id] = m.DataAttribute(id=a.id, related_to=m.NoSpecifiedRelationship)
485+
dsd.attributes.append(da[a.id])
486+
487+
_PMR = m.PrimaryMeasureRelationship # Shorthand
488+
489+
# Convert remark column labels to DataAttributes. "PrimaryMeasureRelationship" means
490+
# that the attribute is attached to individual observations.
491+
for name in aa.eval_annotation("remark-cols") or []:
492+
dsd.attributes.append(m.DataAttribute(id=name, related_to=_PMR))
493+
494+
# Empty data set structured by this DSD
495+
496+
ds = m.StructureSpecificDataSet(described_by=dfd, structured_by=dsd)
497+
try:
498+
ds.annotations.append(aa.get_annotation(id="remark-cols"))
499+
except KeyError:
500+
pass
501+
anno_generated(ds)
502+
503+
# Convert temporary annotation values to attributes attached to the data set
504+
for a in filter(lambda a: a.id != "remark-cols", aa.annotations):
505+
ds.attrib[a.id] = m.AttributeValue(value=str(a.text), value_for=da[a.id])
506+
507+
def _make_obs(row):
508+
"""Helper function for making :class:`sdmx.model.Observation` objects."""
509+
key = dsd.make_key(m.Key, row[[d.id for d in dsd.dimensions]])
510+
511+
# Attributes
512+
attrs = {}
513+
for a in filter(lambda a: a.related_to is _PMR, dsd.attributes):
514+
# Only store an AttributeValue if there is some text
515+
value = row[a.id]
516+
if not pd.isna(value):
517+
attrs[a.id] = m.AttributeValue(value_for=a, value=value)
518+
519+
return m.Observation(
520+
dimension=key, attached_attribute=attrs, value_for=pm, value=row[pm.id]
521+
)
522+
523+
return ds, _make_obs

0 commit comments

Comments
 (0)