import time
from functools import (
    reduce,
    wraps,
)
from typing import (
    Iterable,
    Sized,
    Union,
)
import numpy as np
import pandas as pd
from fireant.database import Database
from fireant.dataset.fields import Field
from fireant.dataset.totals import get_totals_marker_for_dtype
from fireant.utils import (
    alias_selector,
    chunks,
)
from .finders import find_totals_dimensions
from .slow_query_logger import (
    query_logger,
    slow_query_logger,
)


def fetch_data(database: Database,
               queries: Union[Sized, Iterable],
               dimensions: Iterable[Field],
               share_dimensions: Iterable[Field] = (),
               reference_groups=()):
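    # Clamp each query's row limit to the smaller of its own limit (if any) and the database's configured
    # max_result_set_size before rendering it to SQL.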
    queries = [
        str(query.limit(min(query._limit or float("inf"), database.max_result_set_size)))
        for query in queries
    ]

    results = database.concurrency_middleware.fetch_queries_as_dataframe(queries, database)

    return reduce_result_set(results, reference_groups, dimensions, share_dimensions)


def db_cache(func):
    @wraps(func)
    def wrapper(query, database, *args):
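        # Route the call through the database's cache middleware when one is configured;
        # otherwise execute the wrapped function directly.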
        if database.cache_middleware is not None:
            return database.cache_middleware(func)(query, database, *args)
        return func(query, database, *args)

    return wrapper


def log(func):
    @wraps(func)
    def wrapper(query, database, *args):
        start_time = time.time()
        query_logger.debug(query)

        result = func(query, database, *args)

        duration = round(time.time() - start_time, 4)
        query_log_msg = '[{duration} seconds]: {query}'.format(duration=duration,
                                                               query=query)
        query_logger.info(query_log_msg)
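        # Escalate to the slow query logger when the run time crosses the configured threshold.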
        if database.slow_query_log_min_seconds is not None and duration >= database.slow_query_log_min_seconds:
            slow_query_logger.warning(query_log_msg)

        return result

    return wrapper


@db_cache
@log
def fetch_as_dataframe(query: str, database: Database):
"""
Executes a query to fetch data from database middleware and builds/cleans the data as a data frame. The query
execution is logged with its duration.
:param database:
instance of `fireant.Database`, database middleware
:param query: Query string
:return: `pd.DataFrame` constructed from the result of the query
"""
    with database.connect() as connection:
        return pd.read_sql(query, connection, coerce_float=True, parse_dates=True)


def reduce_result_set(results: Iterable[pd.DataFrame],
                      reference_groups,
                      dimensions: Iterable[Field],
                      share_dimensions: Iterable[Field]):
"""
Reduces the result sets from individual queries into a single data frame. This effectively joins sets of references
and concatenates the sets of totals.
:param results: A list of data frame
:param reference_groups: A list of groups of references (grouped by interval such as WoW, etc)
:param dimensions: A list of dimensions, used for setting the index on the result data frame.
:param share_dimensions: A list of dimensions from which the totals are used for calculating share operations.
:return:
"""

    # One result group for each rolled up dimension. Groups contain one member plus one for each reference type used.
    result_groups = chunks(results, 1 + len(reference_groups))

    dimension_keys = [alias_selector(d.alias)
                      for d in dimensions]
    totals_dimension_keys = [alias_selector(d.alias)
                             for d in find_totals_dimensions(dimensions, share_dimensions)]
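    # Capture the dtypes of the dimension columns from the base result so that rollup (totals) markers of a matching
    # type can be filled in later.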
    dimension_dtypes = result_groups[0][0][dimension_keys].dtypes

    # Reduce each group to one data frame per rolled up dimension
    group_data_frames = []
    for i, result_group in enumerate(result_groups):
        if dimension_keys:
            result_group = [result.set_index(dimension_keys)
                            for result in result_group]

        base_df = result_group[0]
        reference_dfs = [_make_reference_data_frame(base_df, result, reference)
                         for result, reference_group in zip(result_group[1:], reference_groups)
                         for reference in reference_group]
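        # Outer-join the base data frame with every reference data frame on the shared dimension index so rows
        # missing from either side are retained.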
        reduced = reduce(lambda left, right: pd.merge(left, right, how='outer', left_index=True, right_index=True),
                         [base_df] + reference_dfs)

        # If there are rolled up dimensions in this result set then replace the NaNs for that dimension value with a
        # marker to indicate totals.
        # The data frames will be ordered so that the first group will contain the data without any rolled up
        # dimensions, then followed by the groups with them, ordered by the last rollup dimension first.
        if totals_dimension_keys[:i]:
            reduced = _replace_nans_for_totals_values(reduced, dimension_dtypes)

        group_data_frames.append(reduced)

    return pd.concat(group_data_frames, sort=False) \
        .sort_index(na_position='first')


def _replace_nans_for_totals_values(data_frame, dtypes):
    # Some things are just easier to do without an index. Reset it temporarily to replace NaN values with the rollup
    # marker values.
    index_names = data_frame.index.names
    data_frame.reset_index(inplace=True)

    for dimension_key, dtype in dtypes.items():
        data_frame[dimension_key] = data_frame[dimension_key].fillna(get_totals_marker_for_dtype(dtype))

    return data_frame.set_index(index_names)


def _make_reference_data_frame(base_df, ref_df, reference):
    """
    This applies the reference metrics to the data frame given the base data frame and the reference data frame.

    When a reference is selected as a delta or a delta percentage, the calculation is performed here. Otherwise, the
    reference data frame is returned.

    :param base_df: The base result data frame.
    :param ref_df: The reference result data frame.
    :param reference: The reference, including its delta/delta-percent options.
    :return: A data frame containing the reference metric columns.
    """
    metric_column_indices = [i
                             for i, column in enumerate(ref_df.columns)
                             if column not in base_df.columns]
    ref_columns = [ref_df.columns[i] for i in metric_column_indices]

    if not (reference.delta or reference.delta_percent):
        return ref_df[ref_columns]

    base_columns = [base_df.columns[i] for i in metric_column_indices]

    # Select just the metric columns from the DF and rename them with the reference key as a suffix
    base_df, ref_df = base_df[base_columns].copy(), ref_df[ref_columns].copy()

    # Both data frame columns are renamed in order to perform the calculation below.
    base_df.columns = ref_df.columns = [column.replace(reference.reference_type.alias, reference.alias)
                                        for column in ref_columns]
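    # Delta is the difference between the base and reference values; delta percent expresses that difference as a
    # percentage of the reference value.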
    ref_delta_df = base_df - ref_df

    if reference.delta_percent:
        return 100. * ref_delta_df / ref_df.replace(0, np.nan)  # pandas raises an exception when dividing by zero

    return ref_delta_df