Source code for bw2analyzer.utils

import itertools
import string
import sys
from warnings import warn

import bw2calc as bc
import numpy as np
import pandas as pd
import pyprind
from bw2data import Database, databases, get_activity, methods


[docs]def contribution_for_all_datasets_one_method(database, method, progress=True): """Calculate contribution analysis (for technosphere processes) for all inventory datasets in one database for one LCIA method. Args: *database* (str): Name of database *method* (tuple): Method tuple Returns: NumPy array of relative contributions. Each column sums to one. Lookup dictionary, dataset keys to row/column indices """ def get_normalized_scores(lca, kind): if kind == "activities": data = lca.characterized_inventory.sum(axis=0) elif kind == "flows": data = lca.characterized_inventory.sum(axis=1) elif kind == "all": data = lca.characterized_inventory.data scores = np.abs(np.array(data).ravel()) summed = scores.sum() if summed == 0: return np.zeros(scores.shape) else: return scores / summed assert database in databases, f"Can't find database {database}" assert method in methods, f"Can't find method {method}" db = Database(database) assert len(db), f"Database {database} appears to have no datasets" # Array to store results results = np.zeros((len(db), len(db)), dtype=np.float32) # Instantiate LCA object lca = bc.LCA({db.random(): 1}, method=method) lca.lci() lca.lcia() rows = lca.characterized_inventory.shape[0] cols = lca.characterized_inventory.shape[1] all_cutoff = cols * 4 results = { "activities": np.zeros((cols, cols), dtype=np.float32), "flows": np.zeros((rows, cols), dtype=np.float32), "all": np.zeros((all_cutoff, cols), dtype=np.float32), } pbar = pyprind.ProgBar(len(db), title="Activities:") # Actual calculations for ds in db: lca.redo_lcia({ds.id: 1}) if not lca.score: continue col = lca.dicts.activity[ds.id] results["activities"][:, col] = get_normalized_scores(lca, "activities") results["flows"][:, col] = get_normalized_scores(lca, "flows") results_all = get_normalized_scores(lca, "all") results_all.sort() results_all = results_all[::-1] fill_number = results_all.shape[0] assert fill_number < all_cutoff, "Too many values in 'all'" results["all"][:fill_number, col] = results_all pbar.update() print(pbar) return results
[docs]def infinite_alphabet(): """Return generator with values a-z, then aa-az, ba-bz, then aaa-aaz, aba-abz, etc.""" for value in itertools.chain.from_iterable( itertools.product(string.ascii_lowercase, repeat=i) for i in itertools.count(1) ): yield "".join(value)
[docs]def recursive_calculation_to_object( activity, lcia_method, amount=1, max_level=3, cutoff=1e-2, as_dataframe=False, root_label="root", use_matrix_values=False, _lca_obj=None, _total_score=None, __result_list=None, __level=0, __label="", __parent=None, ): """Traverse a supply chain graph, and calculate the LCA scores of each component. Adds a dictionary to ``result_list`` of the form: { 'label': Label of this branch. Starts with nothing, then A, AA, AB, AAA, AAB, etc. 'score': Absolute score of this activity 'fraction': Fraction of total score of this activity 'amount': Input amount of the reference product of this activity 'name': Name of this activity 'key': Activity key 'root_label': Starting label of root element for recursion. } Args: activity: ``Activity``. The starting point of the supply chain graph. lcia_method: tuple. LCIA method to use when traversing supply chain graph. amount: int. Amount of ``activity`` to assess. max_level: int. Maximum depth to traverse. cutoff: float. Fraction of total score to use as cutoff when deciding whether to traverse deeper. as_dataframe: Return results as a list (default) or a pandas ``DataFrame`` use_matrix_values: bool. Take exchange values from the matrix instead of the exchange instance ``amount``. Useful for Monte Carlo, but can be incorrect if there is more than one exchange from the same pair of nodes. Internal args (used during recursion, do not touch): __result_list: list. __level: int. __label: str. __parent: str. Returns: List of dicts """ activity = get_activity(activity) if __result_list is None: __result_list = [] __label = root_label if _lca_obj is None: _lca_obj = bc.LCA({activity: amount}, lcia_method) _lca_obj.lci() _lca_obj.lcia() _total_score = _lca_obj.score elif _total_score is None: raise ValueError else: _lca_obj.redo_lcia({activity.id: amount}) if abs(_lca_obj.score) <= abs(_total_score * cutoff): return __result_list.append( { "label": __label, "parent": __parent, "score": _lca_obj.score, "fraction": _lca_obj.score / _total_score, "amount": float(amount), "name": activity.get("name", "(Unknown name)"), "key": activity.key, } ) if __level < max_level: prod_exchanges = list(activity.production()) if not prod_exchanges: prod_amount = 1 elif len(prod_exchanges) > 1: warn( "Hit multiple production exchanges for {}; aborting in this branch".format( activity ) ) return else: prod_amount = _lca_obj.technosphere_matrix[ _lca_obj.dicts.product[prod_exchanges[0].input.id], _lca_obj.dicts.activity[prod_exchanges[0].output.id], ] for child_label, exc in zip(infinite_alphabet(), activity.technosphere()): if exc.input.id == exc.output.id: continue if use_matrix_values: sign = ( -1 if exc.get("type") in ("technosphere", "generic technosphere") else 1 ) tm_amount = ( _lca_obj.technosphere_matrix[ _lca_obj.dicts.product[exc.input.id], _lca_obj.dicts.activity[exc.output.id], ] * sign ) else: tm_amount = exc["amount"] recursive_calculation_to_object( activity=exc.input, lcia_method=lcia_method, amount=amount * tm_amount / prod_amount, max_level=max_level, cutoff=cutoff, as_dataframe=as_dataframe, __result_list=__result_list, __parent=__label, __label=__label + "_" + child_label if __label else child_label, _lca_obj=_lca_obj, _total_score=_total_score, __level=__level + 1, ) if as_dataframe and __label == root_label: return pd.DataFrame(__result_list) else: return __result_list