Test Failed
Push — master (33d579...1dd004)
by Daniel
03:50 queued 12s
created

TableauHyperApiExtraLogic.fn_rebuild_csv_content_for_hyper() — rated A

Complexity

Conditions 5

Size

Total Lines 16
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric   Value
cc       5
eloc     14
nop      4
dl       0
loc      16
rs       9.2333
c        0
b        0
f        0
import pandas as pd

from BasicNeeds import BasicNeeds as _cls_bn
from TypeDetermination import TypeDetermination as _cls_td

from datetime import datetime, time
from tableauhyperapi import HyperProcess, Telemetry, \
    Connection, CreateMode, \
    NOT_NULLABLE, NULLABLE, SqlType, TableDefinition, \
    Inserter, \
    escape_name, escape_string_literal, \
    TableName, \
    HyperException, \
    Timestamp


class TableauHyperApiExtraLogic:

    def fn_build_hyper_columns_for_csv(self, detected_csv_structure, verbose):
        list_hyper_table_columns_to_return = []
        for current_field_structure in detected_csv_structure:
            list_hyper_table_columns_to_return.append(current_field_structure['order'])
            current_column_type = self.fn_convert_to_hyper_types(current_field_structure['type'])
            _cls_bn.fn_optional_print(_cls_bn, verbose, 'Column '
                                      + str(current_field_structure['order']) + ' having name "'
                                      + current_field_structure['name'] + '" and type "'
                                      + current_field_structure['type'] + '" will become "'
                                      + str(current_column_type) + '"')
            if current_field_structure['nulls'] == 0:
                list_hyper_table_columns_to_return[current_field_structure['order']] = TableDefinition.Column(
                    name = current_field_structure['name'],
                    type = current_column_type,
                    nullability = NOT_NULLABLE
                )
            else:
                list_hyper_table_columns_to_return[current_field_structure['order']] = TableDefinition.Column(
                    name = current_field_structure['name'],
                    type = current_column_type,
                    nullability = NULLABLE
                )
        return list_hyper_table_columns_to_return

    '''
    def fn_convert_and_validate_content(crt_value, crt_type):
        if crt_value == '':
            return None
        else:
            if crt_type == 'int':
                return int(crt_value)
            elif crt_type == 'float-USA':
                return float(crt_value)
            elif crt_type == 'date-iso8601':
                tm = datetime.strptime(crt_value, '%Y-%m-%d')
                return datetime(tm.year, tm.month, tm.day)
            elif crt_type == 'date-USA':
                tm = datetime.strptime(crt_value, '%m/%d/%Y')
                return datetime(tm.year, tm.month, tm.day)
            elif crt_type == 'time-24':
                tm = datetime.strptime(crt_value, '%H:%M:%S')
                return time(tm.hour, tm.minute, tm.second)
            elif crt_type == 'time-24-us':
                tm = datetime.strptime(crt_value, '%H:%M:%S.%f')
                return time(tm.hour, tm.minute, tm.second, tm.microsecond)
            elif crt_type == 'time-USA':
                tm = datetime.strptime(crt_value, '%I:%M:%S')
                return time(tm.hour, tm.minute, tm.second)
            elif crt_type == 'datetime-iso8601':
                tm = datetime.fromisoformat(crt_value)
                return Timestamp(tm.year, tm.month, tm.day, tm.hour, tm.minute, tm.second)
            elif crt_type == 'datetime-iso8601-us':
                tm = datetime.fromisoformat(crt_value)
                return Timestamp(tm.year, tm.month, tm.day, tm.hour, tm.minute, tm.second, tm.microsecond)
            else:
                return crt_value.replace('"', '\\"')
    '''

    @staticmethod
    def fn_convert_to_hyper_types(given_type):
        switcher = {
            'empty': SqlType.text(),
            'int': SqlType.big_int(),
            'float-USA': SqlType.double(),
            'date-iso8601': SqlType.date(),
            'date-USA': SqlType.date(),
            'time-24': SqlType.time(),
            'time-24-us': SqlType.time(),
            'time-USA': SqlType.time(),
            'datetime-iso8601': SqlType.timestamp(),
            'str': SqlType.text()
        }
        identified_type = switcher.get(given_type)
        if identified_type is None:
            identified_type = SqlType.text()
        return identified_type

    def fn_create_hyper_file_from_csv(self, input_csv_data_frame, output_hyper_file, verbose):
        detected_csv_structure = _cls_td.fn_detect_csv_structure(_cls_td,
                                                                 input_csv_data_frame,
                                                                 verbose)
        hyper_table_columns = self.fn_build_hyper_columns_for_csv(self, detected_csv_structure, verbose)
        # Starts the Hyper Process with telemetry enabled/disabled to send data to Tableau or not
        # To opt in, simply set telemetry=Telemetry.SEND_USAGE_DATA_TO_TABLEAU.
        # To opt out, simply set telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU.
        with HyperProcess(telemetry = Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hyper:
            # Creates new Hyper file <output_hyper_file>
            # Replaces file with CreateMode.CREATE_AND_REPLACE if it already exists.
            with Connection(endpoint = hyper.endpoint,
                            database = output_hyper_file,
                            create_mode = CreateMode.CREATE_AND_REPLACE) as hyper_connection:
                print("The connection to the Hyper engine file has been created.")
                hyper_connection.catalog.create_schema("Extract")
                print("Hyper schema Extract has been created.")
                hyper_table = TableDefinition(
                    TableName("Extract", "Extract"),
                    columns = hyper_table_columns
                )
                hyper_connection.catalog.create_table(table_definition = hyper_table)
                print("Hyper table Extract has been created.")
                # The rows to insert into the <hyper_table> table.
                data_to_insert = self.fn_rebuild_csv_content_for_hyper(self,
                                                                       input_csv_data_frame,
                                                                       detected_csv_structure,
                                                                       verbose)
                # Execute the actual insert
                with Inserter(hyper_connection, hyper_table) as hyper_inserter:
                    hyper_inserter.add_rows(rows = data_to_insert)
                    hyper_inserter.execute()
                # Number of rows in the <hyper_table> table.
                # `execute_scalar_query` is for executing a query that returns exactly one row with one column.
                row_count = hyper_connection.\
                    execute_scalar_query(query = f'SELECT COUNT(*) FROM {hyper_table.table_name}')
                print(f'The number of rows in table {hyper_table.table_name} is {row_count}.')
            print('The connection to the Hyper file has been closed.')
        print('The Hyper process has been shut down.')

    def fn_rebuild_csv_content_for_hyper(self, input_csv_data_frame, detected_fields_type, verbose):
        input_csv_data_frame.replace(to_replace = [pd.np.nan], value = [None], inplace = True)
        # Cycle through all found columns
        for current_field in detected_fields_type:
            fld_nm = current_field['name']
            if current_field['panda_type'] == 'float64' and current_field['type'] == 'int':
                #input_csv_data_frame[fld_nm] = input_csv_data_frame[fld_nm].apply(lambda x: None if x is None else round(x, 0))
                input_csv_data_frame[fld_nm] = input_csv_data_frame[fld_nm].replace(to_replace = [pd.np.nan, '.0'],
                                                                                    value = [None, ''],
                                                                                    inplace = True)
            elif current_field['type'] == 'datetime-iso8601':
                input_csv_data_frame[fld_nm] = pd.to_datetime(input_csv_data_frame[fld_nm])
        _cls_bn.fn_optional_print(_cls_bn, verbose, 'Column ' + fld_nm + ' '
                                  + 'has panda_type = ' + str(current_field['panda_type']) + ' '
                                  + 'and ' + str(current_field['type']))
        return input_csv_data_frame.values

Issues introduced by this push (2, both on the fn_optional_print call above): the variables fld_nm and current_field are assigned only inside the for loop over detected_fields_type, so if that loop is never entered the call raises a NameError. A possible guard is sketched after the listing.

    def fn_run_hyper_creation(self, input_csv_data_frame, output_hyper_file, verbose):
        try:
            self.fn_create_hyper_file_from_csv(self, input_csv_data_frame, output_hyper_file, verbose)
        except HyperException as ex:
            print(ex)
            exit(1)
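
For reference on the flagged issue, a minimal reworked sketch of fn_rebuild_csv_content_for_hyper follows. It is an illustration under stated assumptions, not the project's actual fix: it keeps the module-level imports from the listing, additionally assumes import numpy as np, avoids the pd.np alias (deprecated and later removed in newer pandas releases), assigns the result of Series.replace rather than combining assignment with inplace=True (which returns None), and moves the optional logging inside the loop so fld_nm and current_field are always bound when they are printed.

    def fn_rebuild_csv_content_for_hyper(self, input_csv_data_frame, detected_fields_type, verbose):
        # np is assumed to be imported at module level (import numpy as np)
        input_csv_data_frame.replace(to_replace = [np.nan], value = [None], inplace = True)
        # Cycle through all found columns
        for current_field in detected_fields_type:
            fld_nm = current_field['name']
            if current_field['panda_type'] == 'float64' and current_field['type'] == 'int':
                # assign the replaced series back; Series.replace(..., inplace=True) would return None
                input_csv_data_frame[fld_nm] = input_csv_data_frame[fld_nm].replace(to_replace = [np.nan, '.0'],
                                                                                    value = [None, ''])
            elif current_field['type'] == 'datetime-iso8601':
                input_csv_data_frame[fld_nm] = pd.to_datetime(input_csv_data_frame[fld_nm])
            # logging inside the loop keeps fld_nm and current_field defined whenever they are used
            _cls_bn.fn_optional_print(_cls_bn, verbose, 'Column ' + fld_nm + ' '
                                      + 'has panda_type = ' + str(current_field['panda_type']) + ' '
                                      + 'and ' + str(current_field['type']))
        return input_csv_data_frame.values

Given the calling convention used throughout the listing (the class object is passed explicitly in place of self), driving code would presumably look roughly like the snippet below; the module path, file names and the verbose flag are placeholders, not taken from the project:

    import pandas as pd
    from TableauHyperApiExtraLogic import TableauHyperApiExtraLogic as _cls_thael

    csv_data_frame = pd.read_csv('input.csv')
    _cls_thael.fn_run_hyper_creation(_cls_thael, csv_data_frame, 'output.hyper', True)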