Source code for fireant.slicer.queries.builder

from typing import (
    Dict,
    Iterable,
)

import pandas as pd

from fireant.utils import (
    format_dimension_key,
    format_metric_key,
    immutable,
    repr_field_key,
)
from pypika import Order
from . import special_cases
from .execution import fetch_data
from .finders import (
    find_and_group_references_for_dimensions,
    find_and_replace_reference_dimensions,
    find_metrics_for_widgets,
    find_operations_for_widgets,
    find_share_dimensions,
)
from .pagination import paginate
from .sql_transformer import (
    make_latest_query,
    make_orders_for_dimensions,
    make_slicer_query,
    make_slicer_query_with_totals_and_references,
)
from .. import QueryException
from ..base import SlicerElement
from ..dimensions import Dimension
from ..references import reference_key
from ..totals import scrub_totals_from_share_results


[docs]def add_hints(queries, hint=None): return [query.hint(hint) if hint is not None and hasattr(query.__class__, 'hint') else query for query in queries]
[docs]class QueryBuilder(object): """ This is the base class for building slicer queries. This class provides an interface for building slicer queries via a set of functions which can be chained together. """ def __init__(self, slicer, table): self.slicer = slicer self.table = table self._dimensions = [] self._filters = [] self._apply_filter_to_totals = [] self._references = [] self._limit = None self._offset = None @immutable def filter(self, *filters, apply_to_totals=True): """ :param filters: :param apply_to_totals: :return: """ self._filters += [f for f in filters] self._apply_filter_to_totals += [apply_to_totals] * len(filters) @immutable def limit(self, limit): """ :param limit: A limit on the number of database rows returned. """ self._limit = limit @immutable def offset(self, offset): """ :param offset: A offset on the number of database rows returned. """ self._offset = offset @property def queries(self): """ Serialize this query builder object to a set of Pypika/SQL queries. This is the base implementation shared by two implementations: the query to fetch data for a slicer request and the query to fetch choices for dimensions. This function only handles dimensions (select+group by) and filtering (where/having), which is everything needed for the query to fetch choices for dimensions. The slicer query extends this with metrics, references, and totals. """ raise NotImplementedError()
[docs] def fetch(self, hint=None): """ Fetches the data for this query instance and returns it in an instance of `pd.DataFrame` :param hint: For database vendors that support it, add a query hint to collect analytics on the queries triggerd by fireant. """ queries = add_hints(self.queries, hint) return fetch_data(self.slicer.database, queries, self._dimensions)
[docs]class SlicerQueryBuilder(QueryBuilder): """ Slicer queries consist of widgets, dimensions, filters, and references. At least one or more widgets is required. All others are optional. """ def __init__(self, slicer): super(SlicerQueryBuilder, self).__init__(slicer, slicer.table) self._widgets = [] self._orders = [] @immutable def widget(self, *widgets): """ :param widgets: :return: """ self._widgets += widgets @immutable def dimension(self, *dimensions): """ :param dimensions: :return: """ self._dimensions += [dimension for dimension in dimensions if dimension not in self._dimensions] @immutable def reference(self, *references): """ Add a reference for a dimension when building a slicer query. :param references: References to add to the query :return: A copy of the dimension with the reference added. """ self._references += references @immutable def orderby(self, element: SlicerElement, orientation: Order = None): """ :param element: The element to order by, either a metric or dimension. :param orientation: The directionality to order by, either ascending or descending. :return: """ format_key = format_dimension_key \ if isinstance(element, Dimension) \ else format_metric_key self._orders += [(element.definition.as_(format_key(element.key)), orientation)] def _validate(self): for widget in self._widgets: if hasattr(widget, 'validate'): widget.validate(self._dimensions) @property def reference_groups(self): return list(find_and_group_references_for_dimensions(self._references).values()) @property def queries(self): """ Serialize this query builder to a list of Pypika/SQL queries. This function will return one query for every combination of reference and rolled up dimension (including null options). This collects all of the metrics in each widget, dimensions, and filters and builds a corresponding pypika query to fetch the data. When references are used, the base query normally produced is wrapped in an outer query and a query for each reference is joined based on the referenced dimension shifted. """ # First run validation for the query on all widgets self._validate() # Optionally select all metrics for slicer to better utilize caching metrics = list(self.slicer.metrics) \ if self.slicer.always_query_all_metrics \ else find_metrics_for_widgets(self._widgets) operations = find_operations_for_widgets(self._widgets) share_dimensions = find_share_dimensions(self._dimensions, operations) references = find_and_replace_reference_dimensions(self._references, self._dimensions) orders = (self._orders or make_orders_for_dimensions(self._dimensions)) return make_slicer_query_with_totals_and_references(self.slicer.database, self.table, self.slicer.joins, self._dimensions, metrics, operations, self._filters, references, orders, share_dimensions=share_dimensions, apply_filter_to_totals=self._apply_filter_to_totals)
[docs] def fetch(self, hint=None) -> Iterable[Dict]: """ Fetch the data for this query and transform it into the widgets. :param hint: A query hint label used with database vendors which support it. Adds a label comment to the query. :return: A list of dict (JSON) objects containing the widget configurations. """ queries = add_hints(self.queries, hint) operations = find_operations_for_widgets(self._widgets) share_dimensions = find_share_dimensions(self._dimensions, operations) data_frame = fetch_data(self.slicer.database, queries, self._dimensions, share_dimensions, self.reference_groups) # Apply operations for operation in operations: for reference in [None] + self._references: df_key = format_metric_key(reference_key(operation, reference)) data_frame[df_key] = operation.apply(data_frame, reference) data_frame = scrub_totals_from_share_results(data_frame, self._dimensions) data_frame = special_cases.apply_operations_to_data_frame(operations, data_frame) data_frame = paginate(data_frame, self._widgets, orders=self._orders, limit=self._limit, offset=self._offset) # Apply transformations return [widget.transform(data_frame, self.slicer, self._dimensions, self._references) for widget in self._widgets]
def __str__(self): return str(self.queries) def __repr__(self): return ".".join(["slicer", "data"] + ["widget({})".format(repr(widget)) for widget in self._widgets] + ["dimension({})".format(repr(dimension)) for dimension in self._dimensions] + ["filter({}{})".format(repr(f), ', apply_filter_to_totals=True' if apply_filter_to_totals else '') for f, apply_filter_to_totals in zip(self._filters, self._apply_filter_to_totals)] + ["reference({})".format(repr(reference)) for reference in self._references] + ["orderby({}, {})".format(repr_field_key(definition.alias), orientation) for (definition, orientation) in self._orders])
[docs]class DimensionChoicesQueryBuilder(QueryBuilder): """ This builder is used for building slicer queries for fetching the choices for a dimension given a set of filters. """ def __init__(self, slicer, dimension): super(DimensionChoicesQueryBuilder, self).__init__(slicer, slicer.hint_table or slicer.table) self._dimensions.append(dimension) @property def queries(self): """ Serializes this query builder as a set of SQL queries. This method will always return a list of one query since only one query is required to retrieve dimension choices. This function only handles dimensions (select+group by) and filtering (where/having), which is everything needed for the query to fetch choices for dimensions. The slicer query extends this with metrics, references, and totals. """ query = make_slicer_query(database=self.slicer.database, base_table=self.table, joins=self.slicer.joins, dimensions=self._dimensions, filters=self._filters) \ .limit(self._limit) \ .offset(self._offset) return [query]
[docs] def fetch(self, hint=None, force_include=()) -> pd.Series: """ Fetch the data for this query and transform it into the widgets. :param hint: For database vendors that support it, add a query hint to collect analytics on the queries triggerd by fireant. :param force_include: A list of dimension values to include in the result set. This can be used to avoid having necessary results cut off due to the pagination. These results will be returned at the head of the results. :return: A list of dict (JSON) objects containing the widget configurations. """ query = add_hints(self.queries, hint)[0] dimension = self._dimensions[0] definition = dimension.display_definition.as_(format_dimension_key(dimension.display_key)) \ if dimension.has_display_field \ else dimension.definition.as_(format_dimension_key(dimension.key)) if force_include: include = self.slicer.database.to_char(dimension.definition) \ .isin([str(x) for x in force_include]) # Ensure that these values are included query = query.orderby(include, order=Order.desc) # Order by the dimension definition that the choices are for query = query.orderby(definition) data = fetch_data(self.slicer.database, [query], self._dimensions) df_key = format_dimension_key(getattr(dimension, 'display_key', None)) if df_key is not None: return data[df_key] display_key = 'display' if hasattr(dimension, 'display_values'): # Include provided display values data[display_key] = pd.Series(dimension.display_values) else: data[display_key] = data.index.tolist() return data[display_key]
def __repr__(self): return ".".join(["slicer", self._dimensions[0].key, "choices"] + ["filter({})".format(repr(f)) for f in self._filters])
[docs]class DimensionLatestQueryBuilder(QueryBuilder): def __init__(self, slicer): super(DimensionLatestQueryBuilder, self).__init__(slicer, slicer.hint_table or slicer.table) @immutable def __call__(self, dimension: Dimension, *dimensions: Dimension): self._dimensions += [dimension] + list(dimensions) @property def queries(self): """ Serializes this query builder as a set of SQL queries. This method will always return a list of one query since only one query is required to retrieve dimension choices. This function only handles dimensions (select+group by) and filtering (where/having), which is everything needed for the query to fetch choices for dimensions. The slicer query extends this with metrics, references, and totals. """ if not self._dimensions: raise QueryException('Must select at least one dimension to query latest values') query = make_latest_query(database=self.slicer.database, base_table=self.table, joins=self.slicer.joins, dimensions=self._dimensions) return [query]
[docs] def fetch(self, hint=None): data = super().fetch(hint=hint).reset_index().iloc[0] # Remove the row index as the name and trim the special dimension key characters from the dimension key data.name = None data.index = [key[3:] for key in data.index] return data