TableauHyperApiExtraLogic.fn_run_hyper_creation()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 8
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 8
nop 6
dl 0
loc 8
rs 10
c 0
b 0
f 0
1
"""
2
TableauHyperApiExtraLogic - a Hyper client library.
3
4
This library allows packaging CSV content into HYPER format with data type checks
5
"""
6
# package to handle regular expressions
7
import re
8
# package to handle numerical structures
9
import numpy
10
# package to handle Data Frames (in this file)
11
import pandas as pd
12
# Custom classes from Tableau Hyper package
13
from tableauhyperapi import HyperProcess, Telemetry, \
14
    Connection, CreateMode, \
15
    NOT_NULLABLE, NULLABLE, SqlType, TableDefinition, \
16
    Inserter, \
17
    TableName, \
18
    HyperException
19
20
21
class TableauHyperApiExtraLogic:
    """Helpers that package CSV content held in a pandas DataFrame into a Tableau Hyper file.

    Field descriptors handled by this class are dictionaries; the keys read here are
    'order', 'name', 'type', 'nulls' and 'panda_type'.
    """

    def fn_build_hyper_columns_for_csv(self, logger, timmer, detected_csv_structure):
        """Build the Hyper column definitions for the detected CSV structure.

        Args:
            logger: logger-like object; `debug` and `info` are called on it.
            timmer: timer-like object; `start()` and `stop()` are called on it.
            detected_csv_structure: iterable of field descriptors providing the
                'order', 'name', 'type' and 'nulls' keys.

        Returns:
            A list of `TableDefinition.Column`, arranged by ascending 'order'.
        """
        timmer.start()
        list_to_return = []
        # Sort by 'order' so the resulting column sequence does not depend on the sequence
        # in which the descriptors happen to be supplied.
        ordered_fields = sorted(detected_csv_structure, key=lambda field: field['order'])
        for current_field_structure in ordered_fields:
            current_column_type = self.fn_convert_to_hyper_types(current_field_structure['type'])
            logger.debug('Column ' + str(current_field_structure['order']) + ' having name "'
                         + current_field_structure['name'] + '" and type "'
                         + current_field_structure['type'] + '" will become "'
                         + str(current_column_type) + '"')
            # A column is declared NOT NULL only when zero nulls were counted for it
            nullability_value = NULLABLE
            if current_field_structure['nulls'] == 0:
                nullability_value = NOT_NULLABLE
            list_to_return.append(TableDefinition.Column(
                name=current_field_structure['name'],
                type=current_column_type,
                nullability=nullability_value
            ))
        logger.info('Building Hyper columns completed')
        timmer.stop()
        return list_to_return

    @staticmethod
    def fn_convert_to_hyper_types(given_type):
        """Translate a detected type label into the corresponding Hyper `SqlType`.

        Args:
            given_type: type label such as 'int', 'float-dot', 'date-YMD' or 'str'.

        Returns:
            The mapped `SqlType`; labels missing from the mapping become `SqlType.text()`.
        """
        switcher = {
            'empty': SqlType.text(),
            'bool': SqlType.bool(),
            'int': SqlType.big_int(),
            'float-dot': SqlType.double(),
            'date-YMD': SqlType.date(),
            'date-MDY': SqlType.date(),
            'date-DMY': SqlType.date(),
            'time-24': SqlType.time(),
            'time-12': SqlType.time(),
            'datetime-24-YMD': SqlType.timestamp(),
            'datetime-12-MDY': SqlType.timestamp(),
            'datetime-24-DMY': SqlType.timestamp(),
            'str': SqlType.text()
        }
        return switcher.get(given_type, SqlType.text())

    def fn_create_hyper_file_from_csv(self, local_logger, timmer, input_csv_data_frame,
                                      in_data_type, given_parameters):
        """Create the Hyper file and load the DataFrame content into table "Extract"."Extract".

        Args:
            local_logger: logger-like object; `info` is called on it here.
            timmer: timer-like object; `start()` and `stop()` are called around each phase.
            input_csv_data_frame: pandas DataFrame with the CSV content; it is passed to
                `fn_rebuild_csv_content_for_hyper`, which modifies it in place.
            in_data_type: iterable of field descriptors for the DataFrame columns.
            given_parameters: object whose `output_file` attribute is used as the
                Hyper database path.
        """
        hyper_cols = self.fn_build_hyper_columns_for_csv(local_logger, timmer, in_data_type)
        # The rows to insert into the <hyper_table> table.
        data_to_insert = self.fn_rebuild_csv_content_for_hyper(local_logger, timmer,
                                                               input_csv_data_frame,
                                                               in_data_type)
        # Starts the Hyper Process with telemetry enabled/disabled to send data to Tableau or not
        # To opt in, simply set telemetry=Telemetry.SEND_USAGE_DATA_TO_TABLEAU.
        # To opt out, simply set telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU.
        with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hyper:
            # Creates new Hyper file <output_hyper_file>
            # Replaces file with CreateMode.CREATE_AND_REPLACE if it already exists.
            timmer.start()
            with Connection(endpoint=hyper.endpoint,
                            database=given_parameters.output_file,
                            create_mode=CreateMode.CREATE_AND_REPLACE) as hyper_connection:
                local_logger.info('Connection to the Hyper engine '
                                  + f'file "{given_parameters.output_file}" has been created.')
                timmer.stop()
                timmer.start()
                hyper_connection.catalog.create_schema("Extract")
                local_logger.info('Hyper schema "Extract" has been created.')
                hyper_table = TableDefinition(
                    TableName("Extract", "Extract"),
                    columns=hyper_cols
                )
                hyper_connection.catalog.create_table(table_definition=hyper_table)
                local_logger.info('Hyper table "Extract" has been created.')
                timmer.stop()
                timmer.start()
                # Execute the actual insert
                with Inserter(hyper_connection, hyper_table) as hyper_insert:
                    hyper_insert.add_rows(rows=data_to_insert)
                    hyper_insert.execute()
                local_logger.info('Data has been inserted into Hyper table')
                timmer.stop()
                timmer.start()
                # Number of rows in the <hyper_table> table.
                # `execute_scalar_query` is for executing a query
                # that returns exactly one row with one column.
                query_to_run = f'SELECT COUNT(*) FROM {hyper_table.table_name}'
                row_count = hyper_connection.execute_scalar_query(query=query_to_run)
                local_logger.info(f'Table {hyper_table.table_name} has {row_count} rows')
                timmer.stop()
            local_logger.info('Connection to the Hyper engine file has been closed')
        local_logger.info('Hyper engine process has been shut down')

    def fn_rebuild_csv_content_for_hyper(self, logger, timmer, input_df, detected_fields_type):
        """Convert every described DataFrame column to values suited to its detected type.

        Note: `input_df` is modified in place (NaN replacement and column conversion).

        Args:
            logger: logger-like object; `debug` and `info` are called on it.
            timmer: timer-like object; `start()` and `stop()` are called on it.
            input_df: pandas DataFrame holding the CSV content.
            detected_fields_type: iterable of field descriptors providing the
                'name', 'type' and 'panda_type' keys.

        Returns:
            `input_df.values` after the conversion.
        """
        timmer.start()
        input_df.replace(to_replace=[numpy.nan], value=[None], inplace=True)
        # Cycle through all found columns
        for current_field in detected_fields_type:
            fld_nm = current_field['name']
            logger.debug(f'Column {fld_nm} has panda_type = ' + str(current_field['panda_type'])
                         + ' and python type = ' + str(current_field['type']))
            input_df[fld_nm] = self.reevaluate_single_column(input_df, fld_nm, current_field)
        logger.info('Re-building CSV content for maximum Hyper compatibility has been completed')
        timmer.stop()
        return input_df.values

    def reevaluate_single_column(self, given_df, given_field_name, current_field_details):
        """Convert one DataFrame column according to its detected type and return it.

        The column is also overwritten inside `given_df`.

        Args:
            given_df: pandas DataFrame containing the column.
            given_field_name: name of the column to convert.
            current_field_details: field descriptor providing 'type' and 'panda_type'.

        Returns:
            The converted column, as read back from `given_df`.
        """
        detected_type = current_field_details['type']
        if detected_type == 'str':
            column = given_df[given_field_name]
            # Only real values are turned into text; a plain `.astype(str)` would store
            # missing entries as the literal text "None"/"nan" instead of keeping them null.
            given_df[given_field_name] = pd.Series(
                [None if is_missing else str(value)
                 for value, is_missing in zip(column, column.isna())],
                index=column.index,
                dtype=object
            )
        elif current_field_details['panda_type'] == 'float64' and detected_type == 'int':
            # Missing entries are filled with 0 before the cast to a 64-bit integer
            given_df[given_field_name] = given_df[given_field_name].fillna(0).astype('int64')
        elif detected_type.startswith(('date-', 'datet', 'time-')):
            # The detected type carries the YMD/DMY/MDY marker needed to parse correctly
            given_df[given_field_name] = self.fn_string_to_date(given_field_name, given_df,
                                                                detected_type)
        return given_df[given_field_name]

    def fn_run_hyper_creation(self, local_logger, timmer, input_data_frame, input_data_type,
                              given_parameters):
        """Run the Hyper file creation; log any HyperException and terminate with status 1.

        Args:
            local_logger: logger-like object; `error` is called on it on failure.
            timmer: timer-like object forwarded to the creation logic.
            input_data_frame: pandas DataFrame holding the CSV content.
            input_data_type: iterable of field descriptors for the DataFrame columns.
            given_parameters: object exposing the `output_file` attribute.

        Raises:
            SystemExit: with code 1 when a `HyperException` is raised.
        """
        try:
            self.fn_create_hyper_file_from_csv(local_logger, timmer, input_data_frame,
                                               input_data_type, given_parameters)
        except HyperException as ex:
            # Line feeds are flattened so the error is logged as a single line
            local_logger.error(str(ex).replace(chr(10), ' '))
            # Same exception as `exit(1)`, without relying on the helper the `site`
            # module injects (which is not guaranteed to be available).
            raise SystemExit(1)

    @staticmethod
    def fn_string_to_date(in_col_name, in_data_frame, in_field_type=None):
        """Parse a column into pandas datetimes, honouring the detected component order.

        The column is also overwritten inside `in_data_frame`.

        Args:
            in_col_name: name of the column to parse.
            in_data_frame: pandas DataFrame containing the column.
            in_field_type: detected type label (e.g. 'date-DMY', 'datetime-24-YMD').
                A label containing '-YMD' is parsed year-first, one containing '-DMY'
                day-first, anything else with the pandas defaults. When omitted, the
                marker is looked for at the start of the column name, which is what
                the previous two-argument form of this method did.

        Returns:
            The parsed column, as read back from `in_data_frame`.
        """
        if in_field_type is None:
            year_first = in_col_name.startswith('-YMD')
            day_first = in_col_name.startswith('-DMY')
        else:
            year_first = '-YMD' in in_field_type
            day_first = '-DMY' in in_field_type
        if year_first:
            in_data_frame[in_col_name] = pd.to_datetime(in_data_frame[in_col_name], yearfirst=True)
        elif day_first:
            in_data_frame[in_col_name] = pd.to_datetime(in_data_frame[in_col_name], dayfirst=True)
        else:
            in_data_frame[in_col_name] = pd.to_datetime(in_data_frame[in_col_name])
        return in_data_frame[in_col_name]
156