Source code for fireant.database.vertica

from pypika import (
    Table,
    VerticaQuery,
    functions as fn,
    terms,
)

from .base import Database


class Trunc(terms.Function):
    """
    Vertica ``TRUNC`` function call, used for truncating dates.

    :param field: the term that is passed to TRUNC as its first argument.
    :param date_format: the format argument passed to TRUNC (for example ``'DD'``).
    :param alias: alias forwarded to the base function term; defaults to ``None``.
    """

    def __init__(self, field, date_format, alias=None):
        super(Trunc, self).__init__('TRUNC', field, date_format, alias=alias)

        # Keep the TRUNC arguments as named attributes so they can be read back by name
        # instead of by position in the function's argument list.
        self.alias = alias
        self.date_format = date_format
        self.field = field
class VerticaDatabase(Database):
    """
    Database client for Vertica, connecting through the vertica_python driver.
    """

    # pypika query class used when building queries for this database
    query_cls = VerticaQuery

    # Maps an interval name to the format string handed to Vertica's TRUNC function
    DATETIME_INTERVALS = {
        'hour': 'HH',
        'day': 'DD',
        'week': 'IW',
        'month': 'MM',
        'quarter': 'Q',
        'year': 'Y'
    }

    def __init__(self, host='localhost', port=5433, database='vertica',
                 user='vertica', password=None,
                 read_timeout=None, **kwags):
        """
        Store the connection settings for a Vertica server.

        :param host: server host name; forwarded to the base class.
        :param port: server port; forwarded to the base class.
        :param database: database name; forwarded to the base class.
        :param user: user name used when connecting.
        :param password: password used when connecting.
        :param read_timeout: read timeout handed to the driver when connecting.
        :param kwags: any additional keyword arguments, forwarded to the base class.
        """
        super(VerticaDatabase, self).__init__(host, port, database, **kwags)

        # Settings that are only needed by connect()
        self.read_timeout = read_timeout
        self.password = password
        self.user = user

    def connect(self):
        """
        Open a connection using the stored settings.

        :return: whatever ``vertica_python.connect`` returns for these settings.
        """
        # Imported here rather than at module level, so the driver is only
        # required once a connection is actually requested.
        import vertica_python

        connection_options = {
            'host': self.host,
            'port': self.port,
            'database': self.database,
            'user': self.user,
            'password': self.password,
            'read_timeout': self.read_timeout,
            'unicode_error': 'replace',
        }
        return vertica_python.connect(**connection_options)

    def trunc_date(self, field, interval):
        """
        Build a TRUNC term that truncates ``field`` to the given interval.

        :param field: the term to truncate.
        :param interval: interval whose string form is looked up in ``DATETIME_INTERVALS``;
            anything not found there falls back to the day format ``'DD'``.
        :return: a ``Trunc`` term.
        """
        interval_key = str(interval)
        vertica_format = self.DATETIME_INTERVALS.get(interval_key, 'DD')
        return Trunc(field, vertica_format)

    def date_add(self, field, date_part, interval):
        """
        Build a ``TimestampAdd`` term for ``field``.

        :param field: the term the interval is added to.
        :param date_part: the date part to add; it is converted with ``str`` first.
        :param interval: the number of date parts to add.
        :return: a pypika ``TimestampAdd`` function term.
        """
        part_name = str(date_part)
        return fn.TimestampAdd(part_name, interval, field)

    def get_column_definitions(self, schema, table):
        """
        Fetch the column names and data types of a table.

        Selects the distinct ``column_name`` / ``data_type`` rows from the ``columns``
        table whose ``table_schema`` and ``table_name`` equal the given values.

        :param schema: the name of the schema to match (String)
        :param table: the name of the table to match (String)
        :return: the result of ``self.fetch`` for the query; expected to be
            column name, column data type pairs
        """
        columns = Table('columns')

        in_schema = columns.table_schema == schema
        # 'table_name' is looked up through field() rather than attribute access
        in_table = columns.field('table_name') == table

        column_query = (
            VerticaQuery.from_(columns)
            .select(columns.column_name, columns.data_type)
            .where(in_schema & in_table)
            .distinct()
        )

        return self.fetch(str(column_query))