Passed
Push — master ( ad1f56...e8e600 )
by Daniel
01:39
created

TableauHyperApiExtraLogic.fn_string_to_date()   A

Complexity

Conditions 3

Size

Total Lines 10
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 9
nop 3
dl 0
loc 10
rs 9.95
c 0
b 0
f 0
"""
TableauHyperApiExtraLogic - a Hyper client library.

This library allows packaging CSV content into HYPER format with data type checks
"""
# package regular expression
import re
# package to handle numerical structures
import numpy
# package to handle Data Frames (in this file)
import pandas as pd
# Custom classes from Tableau Hyper package
from tableauhyperapi import HyperProcess, Telemetry, \
    Connection, CreateMode, \
    NOT_NULLABLE, NULLABLE, SqlType, TableDefinition, \
    Inserter, \
    TableName, \
    HyperException


class TableauHyperApiExtraLogic:
    """Helper logic for packaging CSV content into a Tableau HYPER file.

    Builds Hyper column definitions from a detected CSV structure,
    normalizes a pandas DataFrame for insertion (NaN handling, integer and
    date/time coercion), and drives the Hyper process/connection that
    creates the output .hyper file and inserts the rows.
    """

    def fn_build_hyper_columns_for_csv(self, logger, detected_csv_structure):
        """Translate the detected CSV structure into Hyper column definitions.

        :param logger: logger used for per-column debug output
        :param detected_csv_structure: iterable of dicts, each with the keys
            'order', 'name', 'type' and 'nulls' (assumed ordered by 'order'
            starting at 0 — the original index-assignment relied on the same)
        :return: list of TableDefinition.Column, positioned by field order
        """
        list_to_return = []
        for current_field_structure in detected_csv_structure:
            current_column_type = self.fn_convert_to_hyper_types(current_field_structure['type'])
            logger.debug('Column ' + str(current_field_structure['order']) + ' having name "'
                         + current_field_structure['name'] + '" and type "'
                         + current_field_structure['type'] + '" will become "'
                         + str(current_column_type) + '"')
            # a column in which no NULL was observed can be declared NOT NULLABLE
            nullability_value = NULLABLE
            if current_field_structure['nulls'] == 0:
                nullability_value = NOT_NULLABLE
            # append directly: fields arrive ordered, so the list index equals
            # the field order (the previous placeholder-append-then-overwrite
            # pattern worked only under this very same assumption)
            list_to_return.append(TableDefinition.Column(
                name=current_field_structure['name'],
                type=current_column_type,
                nullability=nullability_value
            ))
        return list_to_return

    @staticmethod
    def fn_convert_to_hyper_types(given_type):
        """Map a detected CSV field type name to a Hyper SqlType.

        :param given_type: type label produced by the CSV structure detection
        :return: matching SqlType; unknown labels fall back to SqlType.text()
        """
        switcher = {
            'empty': SqlType.text(),
            'bool': SqlType.bool(),
            'int': SqlType.big_int(),
            'float-dot': SqlType.double(),
            'date-YMD': SqlType.date(),
            'date-MDY': SqlType.date(),
            'date-DMY': SqlType.date(),
            'time-24': SqlType.time(),
            'time-12': SqlType.time(),
            'datetime-24-YMD': SqlType.timestamp(),
            'datetime-12-MDY': SqlType.timestamp(),
            'datetime-24-DMY': SqlType.timestamp(),
            'str': SqlType.text()
        }
        # anything unrecognized is stored as plain text
        return switcher.get(given_type, SqlType.text())

    def fn_create_hyper_file_from_csv(self, local_logger, timmer, input_csv_data_frame,
                                      in_data_type, given_parameters):
        """Create a .hyper file from a CSV-derived DataFrame.

        Builds the column definitions, rewrites the DataFrame content for
        Hyper compatibility, then creates schema/table "Extract"."Extract"
        in the file given by ``given_parameters.output_file`` and inserts
        all rows.

        :param local_logger: logger for progress messages
        :param timmer: timer object with start()/stop() used around each phase
        :param input_csv_data_frame: pandas DataFrame with the CSV content
        :param in_data_type: detected CSV structure (see
            fn_build_hyper_columns_for_csv)
        :param given_parameters: parsed parameters; only ``output_file`` is used
        """
        timmer.start()
        hyper_cols = self.fn_build_hyper_columns_for_csv(local_logger, in_data_type)
        local_logger.info('Building Hyper columns completed')
        timmer.stop()
        # The rows to insert into the <hyper_table> table.
        timmer.start()
        data_to_insert = self.fn_rebuild_csv_content_for_hyper(local_logger,
                                                               input_csv_data_frame,
                                                               in_data_type)
        local_logger.info('Re-building CSV content for maximum Hyper compatibility '
                          + 'has been completed')
        timmer.stop()
        # Starts the Hyper Process with telemetry enabled/disabled to send data to Tableau or not
        # To opt in, simply set telemetry=Telemetry.SEND_USAGE_DATA_TO_TABLEAU.
        # To opt out, simply set telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU.
        with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hyper:
            # Creates new Hyper file <output_hyper_file>
            # Replaces file with CreateMode.CREATE_AND_REPLACE if it already exists.
            timmer.start()
            with Connection(endpoint=hyper.endpoint,
                            database=given_parameters.output_file,
                            create_mode=CreateMode.CREATE_AND_REPLACE) as hyper_connection:
                local_logger.info('Connection to the Hyper engine '
                                  + f'file "{given_parameters.output_file}" has been created.')
                timmer.stop()
                timmer.start()
                hyper_connection.catalog.create_schema("Extract")
                local_logger.info('Hyper schema "Extract" has been created.')
                hyper_table = TableDefinition(
                    TableName("Extract", "Extract"),
                    columns=hyper_cols
                )
                hyper_connection.catalog.create_table(table_definition=hyper_table)
                local_logger.info('Hyper table "Extract" has been created.')
                timmer.stop()
                timmer.start()
                # Execute the actual insert
                with Inserter(hyper_connection, hyper_table) as hyper_insert:
                    hyper_insert.add_rows(rows=data_to_insert)
                    hyper_insert.execute()
                local_logger.info('Data has been inserted into Hyper table')
                timmer.stop()
                timmer.start()
                # Number of rows in the <hyper_table> table.
                # `execute_scalar_query` is for executing a query
                # that returns exactly one row with one column.
                query_to_run = f'SELECT COUNT(*) FROM {hyper_table.table_name}'
                row_count = hyper_connection.execute_scalar_query(query=query_to_run)
                local_logger.info(f'Table {hyper_table.table_name} has {row_count} rows')
                timmer.stop()
            local_logger.info('Connection to the Hyper engine file has been closed')
        local_logger.info('Hyper engine process has been shut down')

    def fn_rebuild_csv_content_for_hyper(self, logger, input_df, detected_fields_type):
        """Coerce DataFrame columns into Hyper-compatible values.

        NaN becomes None (Hyper NULL); float columns detected as ints are
        cast to int64 (NaN -> 0); date/time-typed columns are parsed to
        datetimes. NOTE: mutates ``input_df`` in place.

        :param logger: logger for per-column debug output
        :param input_df: pandas DataFrame holding the CSV content
        :param detected_fields_type: detected structure with 'name',
            'panda_type' and 'type' per field
        :return: numpy array of row values ready for Inserter.add_rows
        """
        input_df.replace(to_replace=[numpy.nan], value=[None], inplace=True)
        # Cycle through all found columns
        for current_field in detected_fields_type:
            fld_nm = current_field['name']
            logger.debug(f'Column {fld_nm} has panda_type = '
                         + str(current_field['panda_type'])
                         + ' and python type = ' + str(current_field['type']))
            if current_field['panda_type'] == 'float64' and current_field['type'] == 'int':
                # pandas promotes int columns containing NaN to float64;
                # restore int64 (NaN -> 0) so Hyper's big_int accepts them
                input_df[fld_nm] = input_df[fld_nm].fillna(0).astype('int64')
            elif current_field['type'][0:5] in ('date-', 'datet', 'time-'):
                input_df[fld_nm] = self.fn_string_to_date(fld_nm, input_df)
        return input_df.values

    def fn_run_hyper_creation(self, local_logger, timmer, input_data_frame, input_data_type,
                              given_parameters):
        """Run the Hyper file creation, logging and exiting on Hyper errors.

        :raises SystemExit: with code 1 when the Hyper engine reports an error
        """
        try:
            self.fn_create_hyper_file_from_csv(local_logger, timmer, input_data_frame,
                                               input_data_type, given_parameters)
        except HyperException as ex:
            # flatten the multi-line Hyper error into a single log line
            local_logger.error(str(ex).replace('\n', ' '))
            # raise SystemExit rather than the site-provided exit() builtin,
            # which is not guaranteed to exist (e.g. under python -S)
            raise SystemExit(1)

    def fn_string_to_date(self, column_name, input_data_frame):
        """Parse a string column to datetimes, honoring the name's date order.

        Column names carry a suffix describing the detected field order
        (e.g. 'date-YMD', 'datetime-24-DMY'); that suffix selects the
        yearfirst/dayfirst parsing hint.

        BUG FIX: the original used re.match('-YMD', column_name), which
        anchors at position 0 and therefore never matches a suffix, so every
        column fell through to the default (month-first) parser. re.search
        finds the suffix anywhere in the name.

        :param column_name: name of the column to convert
        :param input_data_frame: DataFrame containing that column (mutated)
        :return: the converted datetime Series
        """
        if re.search('-YMD', column_name):
            input_data_frame[column_name] = pd.to_datetime(input_data_frame[column_name],
                                                           yearfirst=True)
        elif re.search('-DMY', column_name):
            input_data_frame[column_name] = pd.to_datetime(input_data_frame[column_name],
                                                           dayfirst=True)
        else:
            input_data_frame[column_name] = pd.to_datetime(input_data_frame[column_name])
        return input_data_frame[column_name]
152