import os
from logging import getLogger
from pathlib import Path
import pandas as pd
import yaml
from pandas._libs.parsers import STR_NA_VALUES
from ._utils import (
CustomDataFrame,
activitytype_path,
dataquality_path,
flow_path,
flowobject_path,
location_path,
time_path,
uncertainty_path,
unit_magnitude_path,
unit_monetary_path,
unit_physical_path,
)
logger = getLogger("root")
ROOT_PATH = Path(os.path.dirname(__file__))
accepted_na_values = STR_NA_VALUES - {"NA"}
class DataPackage:
    def to_dict(self):
        """Return all pandas DataFrame attributes of this package, keyed by attribute name."""
        result = {}
        for attr_name in dir(self):
            attr_value = getattr(self, attr_name)
            if isinstance(attr_value, pd.DataFrame):
                result[attr_name] = attr_value
        return result
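    # Usage sketch: attributes are attached dynamically (see Resources below),
    # so any DataFrame set on an instance shows up in to_dict().
    #
    #     pkg = DataPackage()
    #     pkg.tree_bonsai = pd.DataFrame({"code": ["ai"]})
    #     pkg.to_dict()  # {'tree_bonsai': <DataFrame>}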
def disaggregate_bonsai(self, codes_or_yaml):
"""Disaggregates the code in the bonsai tables.
Currently only for activitytype and flowobject.
Argument
--------
codes_or_yaml: dict or str
A dictionary containing the disaggregation mappings or a string path to a YAML file.
Example::
disaggregations:
- old_code: "ai"
new_codes:
- code: "ai_0"
description: "foo"
mappings: {"nace_rev1": ["0.11", "0.12"]}
- code: "ai_1"
description: ""
mappings: {}

        Returns
        -------
        dict
            dict with updated bonsai tables
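
        Examples
        --------
        A minimal sketch (codes and the instance name are illustrative),
        passing the mapping as a dict rather than a YAML path::

            mapping = {
                "disaggregations": [
                    {
                        "old_code": "ai",
                        "new_codes": [
                            {
                                "code": "ai_0",
                                "description": "foo",
                                "mappings": {"nace_rev1": ["0.11", "0.12"]},
                            },
                            {"code": "ai_1", "description": "", "mappings": {}},
                        ],
                    }
                ]
            }
            updated_tables = some_datapackage.disaggregate_bonsai(mapping)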
"""
# Determine if the input is a dictionary or a YAML file path
if isinstance(codes_or_yaml, dict):
disaggregations = codes_or_yaml.get("disaggregations", [])
        elif isinstance(codes_or_yaml, (str, Path)):
# Load the YAML file
with open(codes_or_yaml, "r") as file:
yaml_content = yaml.safe_load(file)
disaggregations = yaml_content.get("disaggregations", [])
else:
raise ValueError("Input must be a dictionary or a path to a YAML file")
# Validate the structure of the YAML content
if not isinstance(disaggregations, list):
raise ValueError("Disaggregations should be a list of mappings")
if not "old_code" and "new_codes" in disaggregations[0].keys():
raise ValueError("Make sure that you provide 'old_code' and 'new_codes'.")
        result = self.to_dict()
        for attr_name, attr_value in result.items():
            if attr_name.startswith("tree_bonsai"):
# For each disaggregation in the YAML content
for disaggregation in disaggregations:
old_code = disaggregation["old_code"]
new_codes = disaggregation["new_codes"]
                    if len(new_codes) < 2:
                        raise ValueError(
                            f"Only {len(new_codes)} new code(s) provided. To disaggregate {old_code}, provide at least 2 new codes!"
                        )
                    # Replace the old code with one set of child rows per new code
                    old_rows = attr_value[attr_value["code"] == old_code]
                    if old_rows.empty:
                        logger.info(f"Code {old_code} not found in {attr_name}")
                        continue
                    child_level = int(old_rows["level"].iloc[0]) + 1
                    for new_code in new_codes:
                        new_rows = old_rows.copy()
                        new_rows["code"] = new_code["code"]
                        new_rows["name"] = new_code["description"]
                        new_rows["parent_code"] = old_code
                        new_rows["level"] = child_level
                        new_rows["comment"] = ""
                        # Append the new rows to the DataFrame
                        attr_value = pd.concat(
                            [attr_value, new_rows], ignore_index=True
                        )
# Update the DataFrame in the instance with the modified one
result[attr_name] = attr_value
            if attr_name.startswith("conc_") and "bonsai" in attr_name:
# For each disaggregation in the YAML content
for disaggregation in disaggregations:
old_code = disaggregation["old_code"]
new_codes = disaggregation["new_codes"]
                    # Add one set of concordance rows per new code that maps
                    # to the classification schema of this table
                    for new_code in new_codes:
                        other_classifications = new_code.get("mappings", {})
                        df2 = pd.DataFrame(data=None, columns=attr_value.columns)
                        for schema, codes_list in other_classifications.items():
                            if schema in attr_name:
                                if "activitytype_to" in df2.columns:
                                    df2["activitytype_to"] = codes_list
                                if "flowobject_to" in df2.columns:
                                    df2["flowobject_to"] = codes_list
                        if df2.empty:
                            # Nothing mapped for this concordance; skip the append
                            continue
                        if "activitytype_from" in df2.columns:
                            df2["activitytype_from"] = new_code["code"]
                        if "flowobject_from" in df2.columns:
                            df2["flowobject_from"] = new_code["code"]
                        df2["comment"] = ""
                        df2["skos_uri"] = ""
                        # Append the new rows to the DataFrame
                        attr_value = pd.concat([attr_value, df2], ignore_index=True)
                    # Remove the old parent code from the concordance table
                    # (only the most detailed categories need a mapping)
                    if "activitytype_from" in attr_value.columns:
                        attr_value = attr_value[
                            attr_value["activitytype_from"] != old_code
                        ]
                    if "flowobject_from" in attr_value.columns:
                        attr_value = attr_value[
                            attr_value["flowobject_from"] != old_code
                        ]
attr_value.reset_index(drop=True, inplace=True)
# Update the DataFrame in the instance with the modified one
result[attr_name] = attr_value
return result
class BonsaiTreeMaster:
def __init__(self, code, parent_code, name, level, alias_code):
self.code = code
self.parent_code = parent_code
self.name = name
self.level = level
self.alias_code = alias_code
class TreeMasterFlowobject(BonsaiTreeMaster):
def __init__(
self,
code,
parent_code,
name,
level,
compartment,
chemical_compound,
default_unit,
alias_code,
):
super().__init__(code, parent_code, name, level, alias_code)
self.compartment = compartment
self.chemical_compound = chemical_compound
self.default_unit = default_unit
class BonsaiDimMaster:
def __init__(self, code, name, description, comment):
self.code = code
self.name = name
self.description = description
self.comment = comment
def list_csv_files_excluding(directory_path, exclude_file):
# List all CSV files in the directory, excluding the specified file
csv_files = [
file for file in directory_path.glob("*.csv") if file.name != exclude_file
]
return csv_files
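# Usage sketch (hypothetical paths): every *.csv in the folder except the
# excluded file is returned as a pathlib.Path.
#
#     list_csv_files_excluding(Path("data/flowobject"), "resource.csv")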
def read_csv_files_as_dataframes(directory_path, exclude_file, prefix_dtype_mapping):
csv_files = list_csv_files_excluding(directory_path, exclude_file)
dataframes = {}
for csv_file in csv_files:
try:
# Get the dtype dictionary based on the column name prefixes
dtype_dict = get_dtype_dict_by_prefixes(csv_file, prefix_dtype_mapping)
# Read each CSV file into a DataFrame with the specified dtypes
file_stem = csv_file.stem # Get the filename without the extension
dataframes[file_stem] = pd.read_csv(
csv_file, dtype=dtype_dict, na_values=[], keep_default_na=False
)
        except Exception as e:
            logger.error(f"Error reading {csv_file.name}: {e}")
return dataframes
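# Usage sketch (hypothetical folder): load all CSVs in a resource folder,
# keyed by file stem, with code-like columns forced to string dtype.
#
#     dfs = read_csv_files_as_dataframes(
#         directory_path=Path("data/flowobject"),
#         exclude_file="resource.csv",
#         prefix_dtype_mapping={"code": "str"},
#     )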
def get_dtype_dict_by_prefixes(file_path, prefix_dtype_mapping):
# Read the first row to get column names
temp_df = pd.read_csv(file_path, nrows=0)
column_names = temp_df.columns.tolist()
# Create a dictionary mapping column names to dtypes based on prefix_dtype_mapping
dtype_dict = {}
for prefix, dtype in prefix_dtype_mapping.items():
dtype_dict.update(
{col: dtype for col in column_names if col.startswith(prefix)}
)
return dtype_dict
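# Usage sketch (hypothetical file name): with the mapping defined below, a CSV
# whose header row contains "code", "parent_code" and "level" resolves to
# {"code": "str", "parent_code": "str"}; unmatched columns keep inferred dtypes.
#
#     get_dtype_dict_by_prefixes(Path("tree_bonsai.csv"), dtype_dict)

# Default prefix-to-dtype mapping applied when loading the packaged CSV files.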
dtype_dict = {
"code": "str",
"parent_code": "str",
"tree_": "str",
"dim_": "str",
"flowobject_from": "str",
"flowobject_to": "str",
"activitytype_from": "str",
"activitytype_to": "str",
}
class ObjectNames:
def __init__(self, code):
self.code = code
class EmptyClass:
def __init__(self) -> None:
pass
class Resources:
    def __init__(self, path_to_resource):
        """Load every CSV under ``path_to_resource`` and expose the tables as attributes."""
        self.datapackage = DataPackage()
        self.bonsai = DataPackage()
        self.classification_name = EmptyClass()
dfs = read_csv_files_as_dataframes(
directory_path=path_to_resource,
            exclude_file=None,
prefix_dtype_mapping=dtype_dict,
)
for file_name, df in dfs.items():
            if isinstance(df, pd.DataFrame):
setattr(self.datapackage, file_name, df)
self._create_attributes()
self._create_attributes_names()
# Inject the CustomDataFrame class into the DataFrame objects in datapackage
# Iterate over DataFrame attributes in datapackage
for attr_name in dir(self.datapackage):
attr = getattr(self.datapackage, attr_name)
if isinstance(attr, pd.DataFrame):
# Replace the DataFrame with an instance of CustomDataFrame
setattr(self.datapackage, attr_name, CustomDataFrame(attr))
def _create_attributes(self):
if hasattr(self.datapackage, "tree_bonsai"):
for index, row in self.datapackage.tree_bonsai.iterrows():
if "compartment" in self.datapackage.tree_bonsai.columns:
obj = TreeMasterFlowobject(
code=row["code"],
parent_code=row["parent_code"],
name=row["name"],
level=row["level"],
default_unit=row["default_unit"],
compartment=row["compartment"],
chemical_compound=row["chemical_compound"],
alias_code=row["alias_code"],
)
else:
obj = BonsaiTreeMaster(
code=row["code"],
parent_code=row["parent_code"],
name=row["name"],
level=row["level"],
alias_code=row["alias_code"],
)
setattr(self.bonsai, row["code"], obj)
elif hasattr(self.datapackage, "dim_bonsai"):
for index, row in self.datapackage.dim_bonsai.iterrows():
obj = BonsaiDimMaster(
code=row["code"],
name=row["name"],
description=row["description"],
comment=row["comment"],
)
setattr(self.bonsai, row["code"], obj)
def _create_attributes_names(self):
def _remove_keywords(text, keywords):
for keyword in keywords:
text = text.replace(keyword, "")
return text
if hasattr(self.datapackage, "resources"):
exclude_strings = [
"level",
"compartment",
"chemical_compound",
"lcia",
"urban_rural",
"distribution",
"calendar",
"year",
"unit",
"unit_conversion",
]
for index, row in self.datapackage.resources.iterrows():
_name = row["name"]
if ("tree_" in _name or "dim_" in _name) and not any(
excl in _name for excl in exclude_strings
):
class_name = _remove_keywords(_name, ["tree_", "dim_"])
obj = ObjectNames(code=class_name)
setattr(self.classification_name, class_name, obj)
def get_classification_names(self):
"""
        Return considered classification names.
Returns
-------
list of strings
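
        Examples
        --------
        A sketch; the returned names depend on the loaded resource::

            names = flowobject.get_classification_names()
            # e.g. ['bonsai', ...]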
"""
attribute_list = [
attr for attr in dir(self.classification_name) if not attr.startswith("__")
]
return attribute_list
def get_bonsai_classification():
"""
Returns a dictionary of all the default bonsai classification names
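
    Examples
    --------
    >>> get_bonsai_classification()["activitytype"]
    'bonsai'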
"""
bonsai_classifications = {
"location": "bonsai",
"activitytype": "bonsai",
"flowobject": "bonsai",
"flow": "bonsai",
}
return bonsai_classifications
def get_bonsai_schemas_mapping():
"""
    Returns a dictionary that maps bonsai schemas to bonsai codes.
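
    Examples
    --------
    >>> get_bonsai_schemas_mapping()["Trade"]["activitytype"]
    ['oa_imp', 'oa_exp']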
"""
mapping = {
"ProductionVolumes": {"activitytype": ["ai"], "flowobject": ["fi"]},
"Trade": {"activitytype": ["oa_imp", "oa_exp"], "flowobject": ["fi"]},
"ConsumptionVolumes": {"activitytype": ["ai"], "flowobject": ["fi"]},
"Use": {"activitytype": ["ai", "am", "aa_comp"], "flowobject": ["fi", "fm"]},
"WasteSupply": {"activitytype": ["at"], "flowobject": ["ft"]},
"Supply": {"activitytype": ["ai", "am", "aa_comp"], "flowobject": ["fi", "fm"]},
"Imports": {"activitytype": ["oa_imp"], "flowobject": ["fi"]},
"FinalUses": {"activitytype": ["oa_FU"], "flowobject": ["fi"]},
"ValueAdded": {"activitytype": ["ai"], "flowobject": ["fec_VA"]},
"SocialSatellite": {"activitytype": ["ai"], "flowobject": ["fs"]},
"Valuation": {"activitytype": ["oa_VALU"], "flowobject": ["fi"]},
}
return mapping
activitytype = Resources(ROOT_PATH.joinpath(activitytype_path))
location = Resources(ROOT_PATH.joinpath(location_path))
dataquality = Resources(ROOT_PATH.joinpath(dataquality_path))
uncertainty = Resources(ROOT_PATH.joinpath(uncertainty_path))
time = Resources(ROOT_PATH.joinpath(time_path))
flowobject = Resources(ROOT_PATH.joinpath(flowobject_path))
flow = Resources(ROOT_PATH.joinpath(flow_path))
unit_monetary = Resources(ROOT_PATH.joinpath(unit_monetary_path))
unit_physical = Resources(ROOT_PATH.joinpath(unit_physical_path))
unit_magnitude = Resources(ROOT_PATH.joinpath(unit_magnitude_path))
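# Usage sketch (the import path is an assumption; adjust it to the installed
# package name):
#
#     from classifications import flowobject, get_bonsai_classification
#     flowobject.datapackage.tree_bonsai.head()
#     flowobject.get_classification_names()
#     get_bonsai_classification()["flow"]  # 'bonsai'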