import functools
import pandas as pd
from dateutil.relativedelta import relativedelta
from fireant.dataset.fields import DataType
from fireant.dataset.filters import RangeFilter
from fireant.dataset.intervals import DatetimeInterval
from fireant.dataset.operations import RollingOperation
[docs]def adjust_daterange_filter_for_rolling_window(dimensions, operations, filters):
"""
This function adjusts date filters for a rolling operation in order to select enough date to compute the values for
within the original range.
It only applies when using a date dimension in the first position and a RangeFilter is used on that dimension. It
is meant to be applied to a slicer query.
:param dimensions:
The dimensions applied to a slicer query
:param operations:
The dimensions used in widgets in a slicer query
:param filters:
The filters applied to a slicer query
:return:
"""
has_datetime_dimension_in_first_dimension_pos = not len(dimensions) \
or not dimensions[0].data_type == DataType.date
if has_datetime_dimension_in_first_dimension_pos:
return filters
has_rolling = any([isinstance(operation, RollingOperation)
for operation in operations])
if not has_rolling:
return filters
dim0 = dimensions[0]
filters_on_dim0 = [filter_
for filter_ in filters
if isinstance(filter_, RangeFilter)
and str(filter_.definition.term) == str(dim0.definition)]
if not 0 < len(filters_on_dim0):
return filters
max_rolling_period = max(operation.window
for operation in operations
if isinstance(operation, RollingOperation))
for filter_ in filters_on_dim0:
# Monkey patch the update start date on the date filter
print('stop')
args = {dim0.interval_key + 's': max_rolling_period} \
if isinstance(dim0, DatetimeInterval) \
and 'quarter' != dim0.interval_key \
else {'months': max_rolling_period * 3}
filter_.definition.start.value -= relativedelta(**args)
return filters
[docs]def adjust_dataframe_for_rolling_window(operations, data_frame):
"""
This function adjusts the resulting data frame after executing a slicer query with a rolling operation. If there is
a date dimension in the first level of the data frame's index and a rolling operation is applied, it will slice the
dates following the max window to remove it. This way, the adjustment of date filters applied in
#adjust_daterange_filter_for_rolling_window are removed from the data frame but also in case there are no filters,
the first few date data points will be removed where the rolling window cannot be calculated.
:param operations:
:param data_frame:
:return:
"""
has_rolling = any([isinstance(operation, RollingOperation)
for operation in operations])
if not has_rolling:
return data_frame
max_rolling_period = max(operation.window
for operation in operations
if isinstance(operation, RollingOperation))
if isinstance(data_frame.index, pd.DatetimeIndex):
return data_frame.iloc[max_rolling_period - 1:]
if isinstance(data_frame.index, pd.MultiIndex) \
and isinstance(data_frame.index.levels[0], pd.DatetimeIndex):
num_levels = len(data_frame.index.levels)
return data_frame.groupby(level=list(range(1, num_levels))) \
.apply(lambda df: df.iloc[max_rolling_period - 1:]) \
.reset_index(level=list(range(num_levels - 1)), drop=True)
return data_frame
[docs]def apply_to_query_args(database, table, joins, dimensions, metrics, operations, filters, references, orders):
filters = adjust_daterange_filter_for_rolling_window(dimensions, operations, filters)
return (database, table, joins, dimensions, metrics, operations, filters, references, orders)
[docs]def apply_special_cases(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
return f(*apply_to_query_args(*args), **kwargs)
return wrapper
[docs]def apply_operations_to_data_frame(operations, data_frame):
data_frame = adjust_dataframe_for_rolling_window(operations, data_frame)
return data_frame