Source code for bw2analyzer.contribution

import numpy as np
from bw2data import get_activity


[docs]class ContributionAnalysis:
[docs] def sort_array(self, data, limit=25, limit_type="number", total=None): """ Common sorting function for all ``top`` methods. Sorts by highest value first. Operates in either ``number`` or ``percent`` mode. In ``number`` mode, return ``limit`` values. In ``percent`` mode, return all values >= (total * limit); where ``0 < limit <= 1``. Returns 2-d numpy array of sorted values and row indices, e.g.: .. code-block:: python ContributionAnalysis().sort_array((1., 3., 2.)) returns .. code-block:: python ( (3, 1), (2, 2), (1, 0) ) Args: * *data* (numpy array): A 1-d array of values to sort. * *limit* (number, default=25): Number of values to return, or percentage cutoff. * *limit_type* (str, default=``number``): Either ``number`` or ``percent``. * *total* (number, default=None): Optional specification of summed data total. Returns: 2-d numpy array of values and row indices. """ total = total or np.abs(data).sum() if limit_type not in ("number", "percent"): raise ValueError("limit_type must be either 'percent' or 'index'.") if limit_type == "percent": if not 0 < limit <= 1: raise ValueError("Percentage limits > 0 and <= 1.") limit = (np.abs(data) >= (total * limit)).sum() results = np.hstack( (data.reshape((-1, 1)), np.arange(data.shape[0]).reshape((-1, 1))) ) return results[np.argsort(np.abs(data))[::-1]][:limit, :]
[docs] def top_matrix(self, matrix, rows=5, cols=5): """ Find most important (i.e. highest summed) rows and columns in a matrix, as well as the most corresponding non-zero individual elements in the top rows and columns. Only returns matrix values which are in the top rows and columns. Element values are returned as a tuple: ``(row, col, row index in top rows, col index in top cols, value)``. Example: .. code-block:: python matrix = [ [0, 0, 1, 0], [2, 0, 4, 0], [3, 0, 1, 1], [0, 7, 0, 1], ] In this matrix, the row sums are ``(1, 6, 5, 8)``, and the columns sums are ``(5, 7, 6, 2)``. Therefore, the top rows are ``(3, 1)`` and the top columns are ``(1, 2)``. The result would therefore be: .. code-block:: python ( ( (3, 1, 0, 0, 7), (3, 2, 0, 1, 1), (1, 2, 1, 1, 4) ), (3, 1), (1, 2) ) Args: * *matrix* (array or matrix): Any Python object that supports the ``.sum(axis=)`` syntax. * *rows* (int): Number of rows to select. * *cols* (int): Number of columns to select. Returns: (elements, top rows, top columns) """ top_rows = np.argsort(np.abs(np.array(matrix.sum(axis=1)).ravel()))[ : -rows - 1 : -1 ] top_cols = np.argsort(np.abs(np.array(matrix.sum(axis=0)).ravel()))[ : -cols - 1 : -1 ] elements = [] for row, x in enumerate(top_rows): for col, y in enumerate(top_cols): if matrix[x, y] != 0: elements.append((x, y, row, col, float(matrix[x, y]))) return elements, top_rows.astype(int), top_cols.astype(int)
[docs] def hinton_matrix(self, lca, rows=5, cols=5): coo, b, t = self.top_matrix(lca.characterized_inventory, rows=rows, cols=cols) coo = [row[2:] for row in coo] # Don't need matrix indices flows = [self.get_name(lca.dicts.biosphere.reversed[x]) for x in b] activities = [self.get_name(lca.dicts.activity.reversed[x]) for x in t] return { "results": coo, "total": lca.score, "xlabels": activities, "ylabels": flows, }
[docs] def annotate(self, sorted_data, rev_mapping): """Reverse the mapping from database ids to array indices""" return [(row[0], rev_mapping[row[1]]) for row in sorted_data]
[docs] def top_processes(self, matrix, **kwargs): """Return an array of [value, index] technosphere processes.""" return self.sort_array(np.array(matrix.sum(axis=0)).ravel(), **kwargs)
[docs] def top_emissions(self, matrix, **kwargs): """Return an array of [value, index] biosphere emissions.""" return self.sort_array(np.array(matrix.sum(axis=1)).ravel(), **kwargs)
[docs] def annotated_top_processes(self, lca, names=True, **kwargs): """Get list of most damaging processes in an LCA, sorted by ``abs(direct impact)``. Returns a list of tuples: ``(lca score, supply, activity)``. If ``names`` is False, they returns the process key as the last element. """ results = [ ( score, lca.supply_array[int(index)], lca.dicts.activity.reversed[int(index)], ) for score, index in self.top_processes( lca.characterized_inventory, **kwargs ) ] if names: results = [(x[0], x[1], get_activity(x[2])) for x in results] return results
[docs] def annotated_top_emissions(self, lca, names=True, **kwargs): """Get list of most damaging biosphere flows in an LCA, sorted by ``abs(direct impact)``. Returns a list of tuples: ``(lca score, inventory amount, activity)``. If ``names`` is False, they returns the process key as the last element. """ results = [ (score, lca.inventory[int(index), :].sum(), lca.dicts.biosphere.reversed[int(index)]) for score, index in self.top_emissions( lca.characterized_inventory, **kwargs ) ] if names: results = [(x[0], x[1], get_activity(x[2])) for x in results] return results
[docs] def get_name(self, key): return get_activity(key).get("name", "Unknown")
[docs] def d3_treemap( self, matrix, rev_bio, rev_techno, limit=0.025, limit_type="percent" ): """ Construct treemap input data structure for LCA result. Output like: .. code-block:: python { "name": "LCA result", "children": [{ "name": process 1, "children": [ {"name": emission 1, "size": score}, {"name": emission 2, "size": score}, ], }] } """ total = np.abs(matrix).sum() processes = self.top_processes(matrix, limit=limit, limit_type=limit_type) data = {"name": "LCA result", "children": [], "size": total} for _, tech_index in processes: name = self.get_name(rev_techno[tech_index]) this_score = np.abs(matrix[:, int(tech_index)].toarray().ravel()).sum() children = [] for score, bio_index in self.sort_array( matrix[:, int(tech_index)].toarray().ravel(), limit=limit, limit_type=limit_type, total=total, ): children.append( { "name": self.get_name(rev_bio[bio_index]), "size": float(abs(matrix[int(bio_index), int(tech_index)])), } ) children_score = sum([x["size"] for x in children]) if children_score < (0.95 * this_score): children.append({"name": "Others", "size": this_score - children_score}) data["children"].append( { "name": name, "size": this_score, # "children": children } ) return data