Source code for bw2analyzer.tagged

from collections import defaultdict
from warnings import warn

from bw2calc import LCA
from bw2data import Method, get_activity, Database


def traverse_tagged_databases(
    functional_unit,
    method,
    label="tag",
    default_tag="other",
    secondary_tags=[],
    fg_databases=None,
):
    """Walk a functional unit through its foreground and group impacts by tag.

    Ordinary contribution analysis attributes impacts to individual
    activities. This function instead attributes them to *tags*, so that
    several activities can be reported together, or a single biosphere
    exchange can be reported on its own.

    Consider this example system, where the letters are the tag labels and
    the numbers are exchange amounts. The functional unit is one unit of the
    tree root.

    .. image:: images/tagged-traversal.png
        :alt: Example tagged supply chain

    Tagging rules:

    * Tags can be set on activities and on biosphere exchanges.
    * A biosphere exchange without a tag inherits the tag of the activity
      that produces it.
    * Inputs from databases that are not traversed are scored with the usual
      LCA machinery, and that score is tagged like the consuming activity.
    * An activity without a tag gets ``default_tag``.

    The same system with the default tags made explicit:

    .. image:: images/tagged-traversal-2.png
        :alt: Example tagged supply chain

    Assuming every characterization factor is one, the result grouped by tag
    is:

    * **A**: :math:`6 + 27 = 33`
    * **B**: :math:`30 + 44 = 74`
    * **C**: :math:`5 + 16 + 48 = 69`
    * **D**: :math:`14`

    Each key of ``functional_unit`` is a separate starting node. If
    ``fg_databases`` is not given, each starting node is traversed within its
    own database only.

    Input arguments:

        * ``functional_unit``: A functional unit dictionary, e.g. ``{("foo", "bar"): 42}``.
        * ``method``: A method name, e.g. ``("foo", "bar")``
        * ``label``: The label of the tag classifier. Default is ``"tag"``
        * ``default_tag``: The tag classifier to use if none was given. Default is ``"other"``
        * ``secondary_tags``: List of tuples in the format (secondary_label, secondary_default_tag). Default is empty list.
        * ``fg_databases``: A list of foreground databases to be traversed, e.g. ``['foreground', 'biomass', 'machinery']``.
          Listing every database of a project (and especially ecoinvent) is not recommended.

    Returns:

        Aggregated tags dictionary from ``aggregate_tagged_graph``, and tagged
        supply chain graph from ``recurse_tagged_database``.

    """
    lca = LCA(functional_unit, method)
    lca.lci()
    lca.lcia()

    # Map each characterized flow to its characterization factor.
    characterization = {}
    for row in Method(method).load():
        characterization[row[0]] = row[1]

    # One tagged tree per starting node of the functional unit.
    graph = []
    for key, amount in functional_unit.items():
        tree = recurse_tagged_database(
            key,
            amount,
            characterization,
            lca,
            label,
            default_tag,
            secondary_tags,
            fg_databases,
        )
        graph.append(tree)

    return aggregate_tagged_graph(graph), graph
def aggregate_tagged_graph(graph):
    """Sum the impacts of a ``recurse_tagged_database`` graph per tag.

    Every node contributes its own ``impact`` under its ``tag``; each of its
    biosphere flows contributes its ``impact`` under the flow's ``tag``; the
    nodes under ``technosphere`` are then processed the same way.

    Outputs a dictionary with keys of tags and numeric values.

    .. code-block:: python

        {'a tag': summed LCIA scores}

    """
    totals = defaultdict(int)

    def _tally(node):
        # The node's own impact first, then its flows, then its children.
        totals[node["tag"]] += node["impact"]
        for flow in node["biosphere"]:
            totals[flow["tag"]] += flow["impact"]
        for child in node["technosphere"]:
            _tally(child)

    for root in graph:
        _tally(root)
    return totals
def recurse_tagged_database(
    activity,
    amount,
    method_dict,
    lca,
    label,
    default_tag,
    secondary_tags=[],
    fg_databases=None,
    warned=False,
):
    """Traverse a foreground database and assess activities and biosphere flows by tags.

    Technosphere inputs whose database is in ``fg_databases`` are followed
    recursively. All other technosphere inputs are scored together with
    ``lca.redo_lcia`` and that score is stored as this node's ``impact``.
    Several exchanges pointing at the same outside activity are summed
    before scoring.

    Exchange amounts are divided by a scale before being multiplied by
    ``amount``: the amount of the single production exchange minus the
    amounts of technosphere exchanges whose input equals the production
    exchange's input. If there is no production exchange the scale is 1.

    Input arguments:

        * ``activity``: Activity tuple or object
        * ``amount``: float
        * ``method_dict``: Dictionary of biosphere flow tuples to CFs, e.g. ``{("biosphere", "foo"): 3}``
        * ``lca``: An ``LCA`` object that is already initialized, i.e. has already calculated LCI and LCIA with same method as in ``method_dict``
        * ``label``: string
        * ``default_tag``: string
        * ``secondary_tags``: List of tuples in the format (secondary_label, secondary_default_tag). Default is empty list.
        * ``fg_databases``: a list of foreground databases to be traversed, e.g. ``['foreground', 'biomass', 'machinery']``.
          If ``None``, only the database of ``activity`` is traversed.
          It's not recommended to include all databases of a project in the list to be traversed, especially not ecoinvent itself
        * ``warned``: bool, whether the "traversal may be slow" warning was already emitted higher up in this branch

    Returns:

    ``None`` (after a warning) if ``activity`` has more than one production
    exchange. Otherwise:

    .. code-block:: python

        {
            'activity': activity object,
            'amount': float,
            'tag': string,
            'secondary_tags': [list of strings],
            'impact': float (impact of inputs from outside foreground database),
            'biosphere': [{
                'activity': biosphere flow object,
                'amount': float,
                'impact': float,
                'tag': string,
                'secondary_tags': [list of strings]
            }],
            'technosphere': [this data structure]
        }

    Children that were aborted because of multiple production exchanges are
    left out of ``'technosphere'``.

    """
    if isinstance(activity, tuple):
        activity = get_activity(activity)

    MESSAGE = (
        "Given databases include many activities, and traversal may be slow. \n"
        "Consider using `GraphTraversalLCA` from `bw2calc` instead."
    )

    if fg_databases is None:
        # Default to the database of the activity being traversed.
        fg_databases = [activity["database"]]
    elif not warned and sum(len(Database(name)) for name in fg_databases) > 2500:
        warn(MESSAGE)
        warned = True

    inputs = list(activity.technosphere())
    production = list(activity.production())
    if not production:
        scale = 1
    elif len(production) > 1:
        warn("Hit multiple production exchanges; aborting in this branch")
        return
    else:
        scale = production[0]["amount"]
        # Reuse the exchanges already loaded instead of querying them again.
        for other in inputs:
            if other.input == production[0].input:
                scale -= other["amount"]

    inside = [exc for exc in inputs if exc.input["database"] in fg_databases]

    # Demand on activities outside the traversed databases. Amounts are
    # summed per activity so that repeated exchanges to the same input do not
    # overwrite each other.
    outside = {}
    for exc in inputs:
        if exc["input"][0] in fg_databases:
            continue
        key = exc.input.id
        scaled = exc["amount"] / scale * amount
        if key in outside:
            outside[key] += scaled
        else:
            outside[key] = scaled

    if outside:
        lca.redo_lcia(outside)
        outside_score = lca.score
    else:
        outside_score = 0

    biosphere = [
        {
            "activity": exc.input,
            "amount": exc["amount"] / scale * amount,
            "impact": exc["amount"]
            / scale
            * amount
            * method_dict.get(exc["input"], 0),
            "tag": exc.get(label) or activity.get(label) or default_tag,
            "secondary_tags": [
                exc.get(t[0]) or activity.get(t[0]) or t[1] for t in secondary_tags
            ],
        }
        for exc in activity.biosphere()
    ]

    technosphere = []
    for exc in inside:
        child = recurse_tagged_database(
            activity=exc.input,
            amount=exc["amount"] / scale * amount,
            method_dict=method_dict,
            lca=lca,
            label=label,
            default_tag=default_tag,
            secondary_tags=secondary_tags,
            fg_databases=fg_databases,
            warned=warned,
        )
        # An aborted branch returns None; keeping it would break every
        # consumer that indexes the graph nodes.
        if child is not None:
            technosphere.append(child)

    return {
        "activity": activity,
        "amount": amount,
        "tag": activity.get(label) or default_tag,
        "secondary_tags": [activity.get(t[0]) or t[1] for t in secondary_tags],
        "impact": outside_score,
        "biosphere": biosphere,
        "technosphere": technosphere,
    }
## tagged graph functions using multiple methods
def multi_traverse_tagged_databases(
    functional_unit, methods, label="tag", default_tag="other", secondary_tags=[]
):
    """Walk a functional unit through its foreground and group impacts by tag, for several methods.

    The LCA object is created with the first entry of ``methods``; the
    characterization factors of every method are loaded up front and handed
    to ``multi_recurse_tagged_database`` for each starting node of the
    functional unit.

    Input arguments:

        * ``functional_unit``: A functional unit dictionary, e.g. ``{("foo", "bar"): 42}``.
        * ``methods``: A list of method names, e.g. ``[("foo", "bar"), ("baz", "qux"), ...]``
        * ``label``: The label of the tag classifier. Default is ``"tag"``
        * ``default_tag``: The tag classifier to use if none was given. Default is ``"other"``
        * ``secondary_tags``: List of tuples in the format (secondary_label, secondary_default_tag). Default is empty list.

    Returns:

        Aggregated tags dictionary from ``multi_aggregate_tagged_graph``, and
        tagged supply chain graph from ``multi_recurse_tagged_database``.

    """
    lca = LCA(functional_unit, methods[0])
    lca.lci()
    lca.lcia()

    # One {flow: CF} mapping per method, in the same order as ``methods``.
    method_dicts = []
    for method in methods:
        factors = {}
        for row in Method(method).load():
            factors[row[0]] = row[1]
        method_dicts.append(factors)

    graph = []
    for key, amount in functional_unit.items():
        tree = multi_recurse_tagged_database(
            key,
            amount,
            methods,
            method_dicts,
            lca,
            label,
            default_tag,
            secondary_tags,
        )
        graph.append(tree)

    return multi_aggregate_tagged_graph(graph), graph
def multi_aggregate_tagged_graph(graph):
    """Sum the per-method impacts of a ``multi_recurse_tagged_database`` graph per tag.

    Outputs a dictionary with keys of tags and, for each tag, a list of
    summed scores with one entry per method.

    Note: this only aggregates on the primary tag, secondary tags are not aggregated

    .. code-block:: python

        {'a tag': [list of summed LCIA scores with one sum per method]}

    """
    totals = defaultdict(int)

    def _merge(tag, impacts):
        # Start from a copy of the first impact list seen for a tag, then add
        # later lists element by element.
        current = totals.get(tag)
        if current:
            totals[tag] = list(map(sum, zip(current, impacts)))
        else:
            totals[tag] = list(impacts)

    def _visit(node):
        _merge(node["tag"], node["impact"])
        for flow in node["biosphere"]:
            _merge(flow["tag"], flow["impact"])
        for child in node["technosphere"]:
            _visit(child)

    for root in graph:
        _visit(root)
    return totals
def multi_recurse_tagged_database(
    activity, amount, methods, method_dicts, lca, label, default_tag, secondary_tags=[]
):
    """Traverse a foreground database and assess activities and biosphere flows by tags using multiple methods.

    Technosphere inputs from the same database as ``activity`` are followed
    recursively. All other technosphere inputs are scored together, once per
    method, by switching ``lca`` to that method and calling ``redo_lcia``.
    Several exchanges pointing at the same outside activity are summed
    before scoring.

    Unlike ``recurse_tagged_database``, exchange amounts are not divided by
    the production amount of ``activity``.

    When there are outside inputs, ``lca`` is left switched to the last entry
    of ``methods``.

    Input arguments:

        * ``activity``: Activity tuple or object
        * ``amount``: float
        * ``methods``: list of LCA methods (tuples)
        * ``method_dicts``: list of dictionaries of biosphere flow tuples to CFs, e.g. ``{("biosphere", "foo"): 3}`` corresponding to methods in ``methods``
        * ``lca``: An ``LCA`` object that is already initialized, i.e. has already calculated LCI
        * ``label``: string
        * ``default_tag``: string
        * ``secondary_tags``: list of tuples in the format (secondary_label, secondary_default_tag). Default is empty list.

    Returns:

    .. code-block:: python

        {
            'activity': activity object,
            'amount': float,
            'tag': string,
            'secondary_tags': [list of strings],
            'impact': [list of floats (impact of inputs from outside foreground database) with one element per method],
            'biosphere': [{
                'activity': biosphere flow object,
                'amount': float,
                'impact': [list of floats with one element per method],
                'tag': string,
                'secondary_tags': [list of strings]
            }],
            'technosphere': [this data structure]
        }

    """
    if isinstance(activity, tuple):
        activity = get_activity(activity)

    inputs = list(activity.technosphere())
    inside = [exc for exc in inputs if exc.input["database"] == activity["database"]]

    # Demand on activities outside the foreground database. Amounts are
    # summed per activity so that repeated exchanges to the same input do not
    # overwrite each other.
    outside = {}
    for exc in inputs:
        if exc["input"][0] == activity["database"]:
            continue
        key = exc.input.id
        scaled = exc["amount"] * amount
        if key in outside:
            outside[key] += scaled
        else:
            outside[key] = scaled

    if outside:
        outside_scores = []
        for method in methods:
            lca.switch_method(method)
            lca.redo_lcia(outside)
            outside_scores.append(lca.score)
    else:
        outside_scores = [0] * len(methods)

    return {
        "activity": activity,
        "amount": amount,
        "tag": activity.get(label) or default_tag,
        "secondary_tags": [activity.get(t[0]) or t[1] for t in secondary_tags],
        "impact": outside_scores,
        "biosphere": [
            {
                "activity": exc.input,
                "amount": exc["amount"] * amount,
                "impact": [
                    exc["amount"] * amount * method_dict.get(exc["input"], 0)
                    for method_dict in method_dicts
                ],
                "tag": exc.get(label) or activity.get(label) or default_tag,
                "secondary_tags": [
                    exc.get(t[0]) or activity.get(t[0]) or t[1]
                    for t in secondary_tags
                ],
            }
            for exc in activity.biosphere()
        ],
        "technosphere": [
            multi_recurse_tagged_database(
                exc.input,
                exc["amount"] * amount,
                methods,
                method_dicts,
                lca,
                label,
                default_tag,
                secondary_tags,
            )
            for exc in inside
        ],
    }
def get_cum_impact(graph, max_levels=100):
    """Annotate every node of a tagged graph with a ``cum_impact`` value.

    A node's ``cum_impact`` is the sum of the impacts of its own biosphere
    flows plus, for each node under ``technosphere``, that node's ``impact``
    and (once known) its ``cum_impact``. Each pass pushes the totals one
    level further up, so the pass is repeated until a pass changes nothing or
    ``max_levels`` passes have been made.

    Input arguments:

        * ``graph``: A tagged supply chain graph from ``recurse_tagged_database``.
        * ``max_levels``: maximum number of graph levels to check before giving up. Default is 100.

    Returns:

        Tagged supply chain graph with additional cumulative impact ``cum_impact`` key at each ``technosphere`` level.

    """

    def _annotate(node):
        # Build a copy of ``node`` carrying a freshly computed total.
        annotated = {}
        running = 0
        for key, value in node.items():
            if key == "technosphere":
                children = []
                for child in value:
                    running += child["impact"]
                    if "cum_impact" in child:
                        running += child["cum_impact"]
                    children.append(_annotate(child))
                annotated[key] = children
            elif key == "biosphere":
                annotated[key] = value
                for flow in value:
                    running += flow["impact"]
            else:
                annotated[key] = value
        annotated["cum_impact"] = running
        return annotated

    def _settle(node):
        # Repeat the pass until the result stops changing.
        current = node
        for _ in range(max_levels):
            updated = _annotate(current)
            stable = updated == current
            current = updated
            if stable:
                break
        return current

    return [_settle(subgraph) for subgraph in graph]
def get_multi_cum_impact(graph, max_levels=100):
    """Annotate every node of a multi-method tagged graph with a ``cum_impact`` list.

    Works like ``get_cum_impact`` but every impact is a list with one entry
    per method, and the lists are added element by element. The length of a
    node's ``cum_impact`` is taken from the length of its ``impact`` list.
    The pass is repeated until a pass changes nothing or ``max_levels``
    passes have been made.

    Input arguments:

        * ``graph``: A tagged supply chain graph from ``multi_recurse_tagged_database``.
        * ``max_levels``: maximum number of graph levels to check before giving up. Default is 100.

    Returns:

        Tagged supply chain graph with additional cumulative impact ``cum_impact`` key at each ``technosphere`` level.

    """

    def _add(left, right):
        # Element-wise sum of two per-method lists.
        return list(map(sum, zip(left, right)))

    def _annotate(node):
        # Build a copy of ``node`` carrying freshly computed totals.
        annotated = {}
        running = [0] * len(node["impact"])
        for key, value in node.items():
            if key == "technosphere":
                children = []
                for child in value:
                    running = _add(running, child["impact"])
                    if "cum_impact" in child:
                        running = _add(running, child["cum_impact"])
                    children.append(_annotate(child))
                annotated[key] = children
            elif key == "biosphere":
                annotated[key] = value
                for flow in value:
                    running = _add(running, flow["impact"])
            else:
                annotated[key] = value
        annotated["cum_impact"] = running
        return annotated

    def _settle(node):
        # Repeat the pass until the result stops changing.
        current = node
        for _ in range(max_levels):
            updated = _annotate(current)
            stable = updated == current
            current = updated
            if stable:
                break
        return current

    return [_settle(subgraph) for subgraph in graph]