Source code for fireant.tests.slicer.mocks

from collections import (
    OrderedDict,
)
from unittest.mock import Mock

import pandas as pd
from datetime import (
    datetime,
)

from fireant import *
from fireant.slicer.references import ReferenceType
from fireant.slicer.totals import get_totals_marker_for_dtype
from fireant.utils import (
    format_dimension_key as fd,
    format_metric_key as fm,
)
from pypika import (
    JoinType,
    Table,
    functions as fn,
)


[docs]class TestDatabase(VerticaDatabase): # Vertica client that uses the vertica_python driver. connect = Mock() def __eq__(self, other): return isinstance(other, TestDatabase)
test_database = TestDatabase() politicians_table = Table('politician', schema='politics') voters_table = Table('voter', schema='politics') state_table = Table('state', schema='locations') district_table = Table('district', schema='locations') deep_join_table = Table('deep', schema='test') slicer = Slicer( table=politicians_table, database=test_database, joins=( Join(table=district_table, criterion=politicians_table.district_id == district_table.id, join_type=JoinType.outer), Join(table=district_table, criterion=politicians_table.district_id == district_table.id, join_type=JoinType.outer), Join(table=state_table, criterion=district_table.state_id == state_table.id), Join(table=voters_table, criterion=politicians_table.id == voters_table.politician_id), Join(table=deep_join_table, criterion=deep_join_table.id == state_table.ref_id), ), dimensions=( DatetimeDimension('timestamp', label='Timestamp', definition=politicians_table.timestamp), DatetimeDimension('timestamp2', label='Timestamp 2', definition=politicians_table.timestamp2), DatetimeDimension('join_timestamp', label='Join Timestamp', definition=voters_table.timestamp), CategoricalDimension('political_party', label='Party', definition=politicians_table.political_party, display_values=( ('d', 'Democrat'), ('r', 'Republican'), ('i', 'Independent'), ('l', 'Libertarian'), ('g', 'Green'), ('c', 'Constitution'))), UniqueDimension('candidate', label='Candidate', definition=politicians_table.candidate_id, display_definition=politicians_table.candidate_name), UniqueDimension('election', label='Election', definition=politicians_table.election_id, display_definition=politicians_table.election_year), UniqueDimension('district', label='District', definition=politicians_table.district_id, display_definition=district_table.district_name), UniqueDimension('state', label='State', definition=district_table.state_id, display_definition=state_table.state_name), BooleanDimension('winner', label='Winner', definition=politicians_table.is_winner), UniqueDimension('deepjoin', definition=deep_join_table.id), ), metrics=( Metric('votes', label='Votes', definition=fn.Sum(politicians_table.votes)), Metric('wins', label='Wins', definition=fn.Sum(politicians_table.is_winner)), Metric('voters', label='Voters', definition=fn.Count(voters_table.id)), Metric('turnout', label='Turnout', definition=fn.Sum(politicians_table.votes) / fn.Count(voters_table.id), suffix='%', precision=2), Metric('wins_with_suffix_and_prefix', label='Wins', definition=fn.Sum(politicians_table.is_winner), prefix='$', suffix='€'), ), ) political_parties = OrderedDict((('d', 'Democrat'), ('r', 'Republican'), ('i', 'Independent'), ('l', 'Libertarian'), ('g', 'Green'), ('c', 'Constitution'))) candidates = OrderedDict(((1, 'Bill Clinton'), (2, 'Bob Dole'), (3, 'Ross Perot'), (4, 'George Bush'), (5, 'Al Gore'), (6, 'John Kerry'), (7, 'Barrack Obama'), (8, 'John McCain'), (9, 'Mitt Romney'), (10, 'Donald Trump'), (11, 'Hillary Clinton'))) states = OrderedDict(((1, 'Texas'), (2, 'California'))) elections = OrderedDict(((1, '1996'), (2, '2000'), (3, '2004'), (4, '2008'), (5, '2012'), (6, '2016'))) election_candidates = { 1: {'candidates': [1, 2, 3], 'winner': 1}, 2: {'candidates': [4, 5], 'winner': 4}, 3: {'candidates': [4, 6], 'winner': 4}, 4: {'candidates': [7, 8], 'winner': 7}, 5: {'candidates': [7, 9], 'winner': 7}, 6: {'candidates': [10, 11], 'winner': 10}, } candidate_parties = { 1: 'd', 2: 'r', 3: 'i', 4: 'r', 5: 'd', 6: 'd', 7: 'd', 8: 'r', 9: 'r', 10: 'r', 11: 'd', } election_candidate_state_votes = { # Texas (1, 1, 1): 2459683, (1, 2, 1): 2736167, (1, 3, 1): 378537, (2, 4, 1): 3799639, (2, 5, 1): 2433746, (3, 4, 1): 4526917, (3, 6, 1): 2832704, (4, 7, 1): 3528633, (4, 8, 1): 4479328, (5, 7, 1): 4569843, (5, 9, 1): 3308124, (6, 10, 1): 4685047, (6, 11, 1): 387868, # California (1, 1, 2): 5119835, (1, 2, 2): 3828380, (1, 3, 2): 697847, (2, 4, 2): 4567429, (2, 5, 2): 5861203, (3, 4, 2): 5509826, (3, 6, 2): 6745485, (4, 7, 2): 8274473, (4, 8, 2): 5011781, (5, 7, 2): 7854285, (5, 9, 2): 4839958, (6, 10, 2): 8753788, (6, 11, 2): 4483810, } election_candidate_wins = { (1, 1): True, (1, 2): False, (1, 3): False, (2, 4): True, (2, 5): False, (3, 4): True, (3, 6): False, (4, 7): True, (4, 8): False, (5, 7): True, (5, 9): False, (6, 10): True, (6, 11): False, } df_columns = [fd('timestamp'), fd('candidate'), fd('candidate_display'), fd('political_party'), fd('election'), fd('election_display'), fd('state'), fd('state_display'), fd('winner'), fm('votes'), fm('wins')]
[docs]def PoliticsRow(timestamp, candidate, candidate_display, political_party, election, election_display, state, state_display, winner, votes, wins): return ( timestamp, candidate, candidate_display, political_party, election, election_display, state, state_display, winner, votes, wins )
records = [] for (election_id, candidate_id, state_id), votes in election_candidate_state_votes.items(): election_year = elections[election_id] winner = election_candidate_wins[(election_id, candidate_id)] records.append(PoliticsRow( timestamp=datetime(int(election_year), 1, 1), candidate=candidate_id, candidate_display=candidates[candidate_id], political_party=candidate_parties[candidate_id], election=election_id, election_display=elections[election_id], state=state_id, state_display=states[state_id], winner=winner, votes=votes, wins=(1 if winner else 0), )) mock_politics_database = pd.DataFrame.from_records(records, columns=df_columns) single_metric_df = pd.DataFrame(mock_politics_database[[fm('votes')]] .sum()).T multi_metric_df = pd.DataFrame(mock_politics_database[[fm('votes'), fm('wins')]] .sum()).T cont_dim_df = mock_politics_database[[fd('timestamp'), fm('votes'), fm('wins')]] \ .groupby(fd('timestamp')) \ .sum() no_index_df = pd.DataFrame(cont_dim_df.sum()).T cat_dim_df = mock_politics_database[[fd('political_party'), fm('votes'), fm('wins')]] \ .groupby(fd('political_party')) \ .sum() cat_uni_dim_df = mock_politics_database[[fd('political_party'), fd('candidate'), fd('candidate_display'), fm('votes'), fm('wins')]] \ .groupby([fd('political_party'), fd('candidate'), fd('candidate_display')]) \ .sum() \ .reset_index(fd('candidate_display')) uni_dim_df = mock_politics_database[[fd('candidate'), fd('candidate_display'), fm('votes'), fm('wins')]] \ .groupby([fd('candidate'), fd('candidate_display')]) \ .sum() \ .reset_index(fd('candidate_display')) cont_cat_dim_df = mock_politics_database[[fd('timestamp'), fd('political_party'), fm('votes'), fm('wins')]] \ .groupby([fd('timestamp'), fd('political_party')]) \ .sum() cont_uni_dim_df = mock_politics_database[[fd('timestamp'), fd('state'), fd('state_display'), fm('votes'), fm('wins')]] \ .groupby([fd('timestamp'), fd('state'), fd('state_display')]) \ .sum() \ .reset_index(fd('state_display')) cont_cat_uni_dim_df = mock_politics_database[[fd('timestamp'), fd('political_party'), fd('state'), fd('state_display'), fm('votes'), fm('wins')]] \ .groupby([fd('timestamp'), fd('political_party'), fd('state'), fd('state_display')]) \ .sum() \ .reset_index(fd('state_display')) cont_dim_operation_df = cont_dim_df.copy() operation_key = fm('cumsum(votes)') cont_dim_operation_df[operation_key] = cont_dim_df[fm('votes')].cumsum()
[docs]def split(list, i): return list[:i], list[i:]
[docs]def ref(data_frame, columns): ref_cols = {column: '%s_eoe' % column for column in columns} ref_df = data_frame \ .shift(2) \ .rename(columns=ref_cols)[list(ref_cols.values())] return (cont_uni_dim_df .copy() .join(ref_df) .iloc[2:])
[docs]def ref_delta(ref_data_frame, columns): ref_columns = ['%s_eoe' % column for column in columns] delta_data_frame = pd.DataFrame( data=ref_data_frame[ref_columns].values - ref_data_frame[columns].values, columns=['%s_eoe_delta' % column for column in columns], index=ref_data_frame.index ) return ref_data_frame.join(delta_data_frame)
_columns = [fm('votes'), fm('wins')] cont_uni_dim_ref_df = ref(cont_uni_dim_df, _columns) cont_uni_dim_ref_delta_df = ref_delta(cont_uni_dim_ref_df, _columns)
[docs]def totals(data_frame, dimensions, columns): """ Computes the totals across a dimension and adds the total as an extra row. """ if not isinstance(data_frame.index, pd.MultiIndex): totals_marker = get_totals_marker_for_dtype(data_frame.index.dtype) totals_df = pd.DataFrame([data_frame.sum()], index=pd.Index([totals_marker], name=data_frame.index.name)) return data_frame.append(totals_df) def _totals(df): if isinstance(df, pd.Series): return df.sum() totals_index_value = get_totals_marker_for_dtype(df.index.levels[-1].dtype) return pd.DataFrame( [df.sum()], columns=columns, index=pd.Index([totals_index_value], name=df.index.names[-1])) totals_df = None for i in range(-1, -1 - len(dimensions), -1): groupby_levels = data_frame.index.names[:i] if groupby_levels: level_totals_df = data_frame[columns].groupby(level=groupby_levels).apply(_totals) missing_dims = set(data_frame.index.names) - set(level_totals_df.index.names) if missing_dims: for dim in missing_dims: dtype = data_frame.index.levels[data_frame.index.names.index(dim)].dtype level_totals_df[dim] = get_totals_marker_for_dtype(dtype) level_totals_df.set_index(dim, append=True, inplace=True) level_totals_df = level_totals_df.reorder_levels(data_frame.index.names) else: totals_index_values = [get_totals_marker_for_dtype(level.dtype) for level in data_frame.index.levels] level_totals_df = pd.DataFrame([data_frame[columns].apply(_totals)], columns=columns, index=pd.MultiIndex.from_tuples([totals_index_values], names=data_frame.index.names)) totals_df = totals_df.append(level_totals_df) \ if totals_df is not None \ else level_totals_df return data_frame.append(totals_df).sort_index()
# Convert all index values to string for l in list(locals().values()): if not isinstance(l, pd.DataFrame): continue if hasattr(l.index, 'levels'): l.index = pd.MultiIndex(levels=[level.astype('str') if not isinstance(level, (pd.DatetimeIndex, pd.RangeIndex)) else level for level in l.index.levels], labels=l.index.labels) elif not isinstance(l.index, (pd.DatetimeIndex, pd.RangeIndex)): l.index = l.index.astype('str') cat_dim_totals_df = totals(cat_dim_df, [fd('political_party')], _columns) cont_cat_dim_totals_df = totals(cont_cat_dim_df, [fd('political_party')], _columns) cont_cat_dim_all_totals_df = totals(cont_cat_dim_df, [fd('timestamp'), fd('political_party')], _columns) cont_uni_dim_totals_df = totals(cont_uni_dim_df, [fd('state')], _columns) cont_uni_dim_all_totals_df = totals(cont_uni_dim_df, [fd('timestamp'), fd('state')], _columns) cont_cat_uni_dim_all_totals_df = totals(cont_cat_uni_dim_df, [fd('timestamp'), fd('political_party'), fd('state')], _columns) ElectionOverElection = ReferenceType('eoe', 'EoE', 'year', 4)