# Source code for fireant.slicer.operations

import numpy as np
import pandas as pd

from fireant.slicer.references import reference_key
from fireant.slicer.totals import get_totals_marker_for_dtype
from fireant.utils import (
    format_dimension_key,
    format_metric_key,
    reduce_data_frame_levels,
)
from .dimensions import Dimension
from .metrics import Metric


def _extract_key_or_arg(data_frame, key):
    return data_frame[key] \
        if key in data_frame \
        else key


class Operation(object):
    """
    The `Operation` class represents an operation in the `Slicer` API.

    This base class only fixes the interface: `apply` and `metrics` must be provided by subclasses, while
    `operations` defaults to an empty list.
    """

    def apply(self, data_frame, reference):
        """
        Apply the operation to a data frame. Must be overridden.

        :param data_frame: The data frame to operate on.
        :param reference: The reference to apply the operation for.
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    @property
    def metrics(self):
        """
        The metrics this operation depends on. Must be overridden.

        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    @property
    def operations(self):
        """
        The nested operations this operation depends on.

        :return: A new empty list; the base class has no nested operations.
        """
        return list()
class _BaseOperation(Operation):
    """
    Common state shared by the concrete operations in this module.

    Stores the display attributes passed to the constructor and leaves `apply`, `metrics` and `operations` for
    subclasses to implement.
    """

    def __init__(self, key, label, prefix=None, suffix=None, precision=None):
        """
        :param key: Identifier of the operation.
        :param label: Display label of the operation.
        :param prefix: Optional prefix stored on the operation.
        :param suffix: Optional suffix stored on the operation.
        :param precision: Optional precision stored on the operation.
        """
        self.key = key
        self.label = label
        self.prefix = prefix
        self.suffix = suffix
        self.precision = precision

    def apply(self, data_frame, reference):
        """
        :raises NotImplementedError: Always; subclasses provide the implementation.
        """
        raise NotImplementedError()

    @property
    def metrics(self):
        """
        :raises NotImplementedError: Always; subclasses provide the implementation.
        """
        raise NotImplementedError()

    @property
    def operations(self):
        """
        :raises NotImplementedError: Always; subclasses provide the implementation.
        """
        raise NotImplementedError()

    def _group_levels(self, index):
        """
        Get the index levels that need to be grouped.

        Grouping by these levels avoids applying a cumulative function across separate dimensions: only the first
        dimension should be accumulated across.

        :param index: An index exposing `names` (a multi-level index).
        :return: The names of every index level except the first.
        """
        return index.names[1:]


class _Cumulative(_BaseOperation):
    """
    Base class for cumulative operations over a single argument.

    The key and label are built from the concrete class name and the argument's `key`/`label` (or the argument
    itself when it has no such attribute). Prefix, suffix and precision are copied from the argument when present.
    """

    def __init__(self, arg):
        """
        :param arg: The metric or operation to accumulate. Objects without `key`/`label` are formatted as-is.
        """
        class_name = self.__class__.__name__
        super(_Cumulative, self).__init__(
            key='{}({})'.format(class_name.lower(), getattr(arg, 'key', arg)),
            label='{}({})'.format(class_name, getattr(arg, 'label', arg)),
            # Default to None so that arguments without display attributes are accepted, consistent with the
            # fallbacks used for key/label above and with the defaults of _BaseOperation.__init__.
            prefix=getattr(arg, 'prefix', None),
            suffix=getattr(arg, 'suffix', None),
            precision=getattr(arg, 'precision', None),
        )

        self.arg = arg

    def apply(self, data_frame, reference):
        """
        :raises NotImplementedError: Always; subclasses provide the implementation.
        """
        raise NotImplementedError()

    @property
    def metrics(self):
        """
        :return: A list containing the argument if it is a `Metric`, otherwise an empty list.
        """
        return [self.arg] if isinstance(self.arg, Metric) else []

    @property
    def operations(self):
        """
        :return:
            A list with the argument followed by its own nested operations if the argument is an `Operation`,
            otherwise an empty list.
        """
        if not isinstance(self.arg, Operation):
            return []
        return [self.arg] + self.arg.operations

    def __repr__(self):
        return self.key
class CumSum(_Cumulative):
    """
    Cumulative sum of the wrapped argument.
    """

    def apply(self, data_frame, reference):
        """
        Compute the cumulative sum of the argument's column.

        :param data_frame: The data frame holding the column to accumulate.
        :param reference: The reference used to resolve the column key.
        :return:
            The cumulative sum of the column. With a multi-level index the sum is computed separately within each
            group of the levels returned by `_group_levels`.
        """
        column = data_frame[format_metric_key(reference_key(self.arg, reference))]

        if not isinstance(data_frame.index, pd.MultiIndex):
            return column.cumsum()

        grouping = self._group_levels(data_frame.index)
        return column.groupby(level=grouping).cumsum()
class CumProd(_Cumulative):
    """
    Cumulative product of the wrapped argument.
    """

    def apply(self, data_frame, reference):
        """
        Compute the cumulative product of the argument's column.

        :param data_frame: The data frame holding the column to accumulate.
        :param reference: The reference used to resolve the column key.
        :return:
            The cumulative product of the column. With a multi-level index the product is computed separately
            within each group of the levels returned by `_group_levels`.
        """
        column = data_frame[format_metric_key(reference_key(self.arg, reference))]

        if not isinstance(data_frame.index, pd.MultiIndex):
            return column.cumprod()

        grouping = self._group_levels(data_frame.index)
        return column.groupby(level=grouping).cumprod()
class CumMean(_Cumulative):
    """
    Cumulative mean of the wrapped argument.
    """

    @staticmethod
    def cummean(x):
        """
        Compute the running mean of `x`.

        :param x: A series-like object supporting `cumsum()` and `len()`.
        :return: The cumulative sum of `x` divided element-wise by the 1-based position of each element.
        """
        return x.cumsum() / np.arange(1, len(x) + 1)

    def apply(self, data_frame, reference):
        """
        Compute the cumulative mean of the argument's column.

        :param data_frame: The data frame holding the column to accumulate.
        :param reference: The reference used to resolve the column key.
        :return:
            The cumulative mean of the column. With a multi-level index the mean is computed separately within
            each group of the levels returned by `_group_levels`.
        """
        df_key = format_metric_key(reference_key(self.arg, reference))

        if isinstance(data_frame.index, pd.MultiIndex):
            levels = self._group_levels(data_frame.index)

            # `cummean` returns a like-indexed series. group_keys=False keeps the original index instead of
            # prepending the group levels, which became the default behaviour of `apply` in pandas 2.0.
            return data_frame[df_key] \
                .groupby(level=levels, group_keys=False) \
                .apply(self.cummean)

        return self.cummean(data_frame[df_key])
class RollingOperation(_BaseOperation):
    """
    Base class for operations computed over a rolling window of a single argument.

    The key and label are built from the concrete class name and the argument's `key`/`label` (or the argument
    itself when it has no such attribute). Prefix, suffix and precision are copied from the argument when present.
    """

    def __init__(self, arg, window, min_periods=None):
        """
        :param arg: The metric or operation to roll over. Objects without `key`/`label` are formatted as-is.
        :param window: The rolling window, stored as `self.window`.
        :param min_periods: Optional minimum number of periods, stored as `self.min_periods`.
        """
        class_name = self.__class__.__name__
        super(RollingOperation, self).__init__(
            key='{}({})'.format(class_name.lower(), getattr(arg, 'key', arg)),
            label='{}({})'.format(class_name, getattr(arg, 'label', arg)),
            # Default to None so that arguments without display attributes are accepted, consistent with the
            # fallbacks used for key/label above and with the defaults of _BaseOperation.__init__.
            prefix=getattr(arg, 'prefix', None),
            suffix=getattr(arg, 'suffix', None),
            precision=getattr(arg, 'precision', None),
        )

        self.arg = arg
        self.window = window
        self.min_periods = min_periods

    def _should_adjust(self, other_operations):
        """
        Tell whether this rolling operation has the largest window, and is the first such operation when several
        operations share the largest window.

        :param other_operations: An iterable of operations exposing a `window` attribute.
        :return:
            True if this instance is the first operation with the largest window in `other_operations`, otherwise
            False (including when `other_operations` is empty).
        """
        # `max` returns the first maximal item encountered, which is the tie-break wanted when several operations
        # share the largest window. Sorting ascending and taking element 0 would select the *smallest* window.
        first_max_rolling = max(other_operations, key=lambda operation: operation.window, default=None)

        return first_max_rolling is self

    def apply(self, data_frame, reference):
        """
        :raises NotImplementedError: Always; subclasses provide the implementation.
        """
        raise NotImplementedError()

    @property
    def metrics(self):
        """
        :return: A list containing the argument if it is a `Metric`, otherwise an empty list.
        """
        return [self.arg] if isinstance(self.arg, Metric) else []

    @property
    def operations(self):
        """
        :return:
            A list with the argument followed by its own nested operations if the argument is an `Operation`,
            otherwise an empty list.
        """
        if not isinstance(self.arg, Operation):
            return []
        return [self.arg] + self.arg.operations
class RollingMean(RollingOperation):
    """
    Rolling mean of the wrapped argument over `self.window`.
    """

    def rolling_mean(self, x):
        """
        Compute the rolling mean of `x`.

        :param x: A pandas object supporting `rolling`.
        :return: The mean over a rolling window of `self.window`, requiring `self.min_periods` observations.
        """
        return x.rolling(self.window, min_periods=self.min_periods).mean()

    def apply(self, data_frame, reference):
        """
        Compute the rolling mean of the argument's column.

        :param data_frame: The data frame holding the column to roll over.
        :param reference: The reference used to resolve the column key.
        :return:
            The rolling mean of the column. With a multi-level index the mean is computed separately within each
            group of the levels returned by `_group_levels`.
        """
        df_key = format_metric_key(reference_key(self.arg, reference))

        if isinstance(data_frame.index, pd.MultiIndex):
            levels = self._group_levels(data_frame.index)

            # `rolling_mean` returns a like-indexed series. group_keys=False keeps the original index instead of
            # prepending the group levels, which became the default behaviour of `apply` in pandas 2.0.
            return data_frame[df_key] \
                .groupby(level=levels, group_keys=False) \
                .apply(self.rolling_mean)

        return self.rolling_mean(data_frame[df_key])
class Share(_BaseOperation):
    """
    Share of a metric, expressed as a percentage of the totals over a dimension.
    """

    def __init__(self, metric: Metric, over: Dimension = None, precision=2):
        """
        :param metric: The metric (or operation) to compute the share of.
        :param over: The dimension whose totals the share is computed against, or None.
        :param precision: Precision stored on the operation.
        """
        metric_key = getattr(metric, 'key', metric)
        over_key = getattr(over, 'key', over)
        metric_label = getattr(metric, 'label', metric)
        over_label = getattr(over, 'label', over)

        super(Share, self).__init__(
            key='share({},{})'.format(metric_key, over_key),
            label='Share of {} over {}'.format(metric_label, over_label),
            prefix=None,
            suffix='%',
            precision=precision,
        )

        self.metric = metric
        self.over = over

    @property
    def metrics(self):
        """
        :return: A list containing the wrapped metric if it is a `Metric`, otherwise an empty list.
        """
        return [self.metric] if isinstance(self.metric, Metric) else []

    @property
    def operations(self):
        """
        :return:
            A list with the wrapped object followed by its own nested operations if it is an `Operation`,
            otherwise an empty list.
        """
        if not isinstance(self.metric, Operation):
            return []
        return [self.metric] + self.metric.operations

    def apply(self, data_frame, reference):
        """
        Compute the share of the metric column as a percentage.

        :param data_frame: The data frame holding the metric column, including its totals rows.
        :param reference: The reference used to resolve the column key.
        :return:
            A series of percentages. Without an `over` dimension every value is divided by itself; with a
            single-level index the values are divided by the row at the totals marker; with a multi-level index
            the values are divided by the totals selected at the `over` level.
        """
        f_metric_key = format_metric_key(reference_key(self.metric, reference))

        # No dimension to share over: each value relative to itself.
        if self.over is None:
            values = data_frame[f_metric_key]
            return 100 * values / values

        # Single-level index: the totals are the one row labelled with the totals marker.
        if not isinstance(data_frame.index, pd.MultiIndex):
            totals_marker = get_totals_marker_for_dtype(data_frame.index.dtype)
            totals = data_frame.loc[totals_marker, f_metric_key]
            return 100 * data_frame[f_metric_key] / totals

        index_names = data_frame.index.names
        over_position = index_names.index(format_dimension_key(self.over.key))
        group_levels = index_names[over_position:]

        # Select every value of the levels before `over`, and only the totals marker at the `over` level.
        over_marker = get_totals_marker_for_dtype(data_frame.index.levels[over_position].dtype)
        totals_key = (slice(None),) * over_position + (slice(over_marker, over_marker),)
        totals = reduce_data_frame_levels(data_frame.loc[totals_key, f_metric_key], group_levels)

        def apply_totals(group_df):
            # Totals that are not a series are used directly as the divisor.
            if not isinstance(totals, pd.Series):
                return 100 * group_df / totals

            # Drop the index levels the totals do not have so the group can be aligned against them.
            n_total_levels = len(totals.index.names)
            surplus_levels = group_df.index.names[n_total_levels:]
            group_df = group_df.reset_index(surplus_levels, drop=True)
            share = 100 * group_df / totals[group_df.index]
            return pd.Series(share.values, index=group_df.index)

        grouped = data_frame[f_metric_key].groupby(level=group_levels)
        shares = grouped.apply(apply_totals)
        return shares.reorder_levels(order=data_frame.index.names).sort_index()