Test Failed
Push — master ( 59c008...283ae7 )
by Daniel
01:26
created

TableauHyperApiExtraLogic   A

Complexity

Total Complexity 23

Size/Duplication

Total Lines 167
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 132
dl 0
loc 167
rs 10
c 0
b 0
f 0
wmc 23

5 Methods

Rating   Name   Duplication   Size   Complexity  
A TableauHyperApiExtraLogic.fn_convert_to_hyper_types() 0 17 2
C TableauHyperApiExtraLogic.fn_convert_and_validate_content() 0 31 11
A TableauHyperApiExtraLogic.fn_build_hyper_columns_for_csv() 0 16 2
B TableauHyperApiExtraLogic.fn_run_create_hyper_file_from_csv() 0 59 4
A TableauHyperApiExtraLogic.fn_rebuild_csv_content_for_hyper() 0 22 4
1
import csv
2
3
from BasicNeeds import BasicNeeds as cls_bn
4
from TypeDetermination import TypeDetermination
5
from datetime import datetime,time
6
from tableauhyperapi import HyperProcess, Telemetry, \
7
    Connection, CreateMode, \
8
    NOT_NULLABLE, NULLABLE, SqlType, TableDefinition, \
9
    Inserter, \
10
    escape_name, escape_string_literal, \
11
    TableName, \
12
    HyperException, \
13
    Timestamp
14
15
16
class TableauHyperApiExtraLogic:
    """Helpers that load the content of a CSV file into a Tableau Hyper file.

    The original code calls its own methods as ``self.method(self, ...)``,
    which implies the class object itself is handed in as ``self`` (the same
    way ``cls_bn`` and ``TypeDetermination`` are used here).  That calling
    style keeps working; internal calls are written so that a regular
    instance works as well.
    """

    def fn_build_hyper_columns_for_csv(self, given_file_name, csv_field_separator, detected_csv_structure, verbose):
        """Build the Hyper column definitions matching a detected CSV structure.

        Args:
            given_file_name: Not used here; kept for interface compatibility.
            csv_field_separator: Not used here; kept for interface compatibility.
            detected_csv_structure: Iterable of mappings providing the keys
                'order', 'name' and 'type'.  'order' is used as the list index
                of the resulting column, so entries are expected to arrive in
                ascending, 0-based order.
            verbose: Forwarded to ``cls_bn.fn_optional_print``.

        Returns:
            A list of ``TableDefinition.Column`` objects, all declared NULLABLE.
        """
        hyper_columns = []
        for field in detected_csv_structure:
            position = field['order']
            field_name = field['name']
            field_type = field['type']
            hyper_type = self.fn_convert_to_hyper_types(field_type)
            cls_bn.fn_optional_print(cls_bn, verbose, 'Column '
                                     + str(position) + ' having name "'
                                     + field_name + '" and type "'
                                     + field_type + '" will become "'
                                     + str(hyper_type) + '"')
            # Grow the list by one slot, then store the column at the index
            # given by 'order' (kept as in the original implementation).
            hyper_columns.append(position)
            hyper_columns[position] = TableDefinition.Column(
                name=field_name,
                type=hyper_type,
                nullability=NULLABLE
            )
        return hyper_columns

    @staticmethod
    def fn_convert_and_validate_content(crt_value, crt_type):
        """Convert one CSV cell (a string) into the Python value for its type.

        Args:
            crt_value: Raw text of the cell.
            crt_type: Detected type label, e.g. 'int', 'float-USA',
                'date-iso8601', 'time-24', 'datetime-iso8601-us'.

        Returns:
            ``None`` for an empty cell; otherwise an ``int``, ``float``,
            ``datetime`` (date types), ``time`` (time types), ``Timestamp``
            (datetime types) or, for any other type label, the text with
            double quotes backslash-escaped.

        Raises:
            ValueError: If the text does not match the expected format.
        """
        if crt_value == '':
            return None
        if crt_type == 'int':
            return int(crt_value)
        if crt_type == 'float-USA':
            return float(crt_value)

        date_formats = {
            'date-iso8601': '%Y-%m-%d',
            'date-USA': '%m/%d/%Y',
        }
        if crt_type in date_formats:
            # A date-only format parses to midnight of that day.
            return datetime.strptime(crt_value, date_formats[crt_type])

        time_formats = {
            'time-24': '%H:%M:%S',
            'time-24-us': '%H:%M:%S.%f',
            'time-USA': '%I:%M:%S',
        }
        if crt_type in time_formats:
            return datetime.strptime(crt_value, time_formats[crt_type]).time()

        if crt_type == 'datetime-iso8601':
            tm = datetime.fromisoformat(crt_value)
            # Microseconds are intentionally not carried over for this type.
            return Timestamp(tm.year, tm.month, tm.day, tm.hour, tm.minute, tm.second)
        if crt_type == 'datetime-iso8601-us':
            tm = datetime.fromisoformat(crt_value)
            return Timestamp(tm.year, tm.month, tm.day, tm.hour, tm.minute, tm.second, tm.microsecond)

        # NOTE(review): the escaped quotes presumably end up verbatim in the
        # stored text -- confirm this escaping is really wanted.
        return crt_value.replace('"', '\\"')

    @staticmethod
    def fn_convert_to_hyper_types(given_type):
        """Map a detected type label to the Hyper ``SqlType`` of its column.

        Args:
            given_type: Detected type label (see the keys below).

        Returns:
            The matching ``SqlType``; unknown labels fall back to text.
        """
        hyper_types = {
            'empty': SqlType.text(),
            'int': SqlType.big_int(),
            'float-USA': SqlType.double(),
            'date-iso8601': SqlType.date(),
            'date-USA': SqlType.date(),
            'time-24': SqlType.time(),
            'time-24-us': SqlType.time(),
            'time-USA': SqlType.time(),
            'datetime-iso8601': SqlType.timestamp(),
            # Values of this type are converted to Timestamp objects, so the
            # column has to be a timestamp column too.
            'datetime-iso8601-us': SqlType.timestamp(),
            'str': SqlType.text()
        }
        return hyper_types.get(given_type, SqlType.text())

    def fn_rebuild_csv_content_for_hyper(self, given_file_name, csv_field_separator, detected_fields_type, verbose):
        """Read the CSV file and convert every cell to its detected type.

        Args:
            given_file_name: Path of the CSV file; its first line is the header.
            csv_field_separator: Delimiter handed to ``csv.DictReader``.
            detected_fields_type: Sequence indexed by column position whose
                items provide the key 'type'.
            verbose: Forwarded to ``cls_bn.fn_optional_print``.

        Returns:
            A list with one list of converted values per data row, in the
            column order of the CSV header.
        """
        rows_for_hyper = []
        with open(given_file_name, newline='') as csv_file:
            csv_reader = csv.DictReader(csv_file, delimiter=csv_field_separator)
            for row_idx, row_content in enumerate(csv_reader):
                print_prefix = 'On the row ' + str(row_idx + 1)
                converted_row = []
                for col_idx, column_name in enumerate(csv_reader.fieldnames):
                    raw_value = row_content[column_name]
                    converted_value = self.fn_convert_and_validate_content(
                        raw_value, detected_fields_type[col_idx]['type'])
                    cls_bn.fn_optional_print(
                        cls_bn, verbose,
                        f'{print_prefix} column {col_idx}'
                        f' having the name [{column_name}] '
                        f' has the value <{raw_value}>'
                        f' which was interpreted as <<{converted_value!s}>>')
                    converted_row.append(converted_value)
                rows_for_hyper.append(converted_row)
        return rows_for_hyper

    def fn_run_create_hyper_file_from_csv(self,
                                          input_csv_file,
                                          csv_field_separator,
                                          output_hyper_file,
                                          verbose):
        """Create (or replace) a Hyper file holding the content of a CSV file.

        The CSV structure is detected first, an "Extract"."Extract" table is
        created with matching columns, then all rows are converted and
        inserted.  Progress messages are printed along the way.

        Args:
            input_csv_file: Path of the CSV file to load.
            csv_field_separator: Delimiter used inside the CSV file.
            output_hyper_file: Path of the Hyper file to create; an existing
                file is replaced.
            verbose: Forwarded to the detection and conversion helpers.
        """
        detected_csv_structure = TypeDetermination.fn_detect_csv_structure(TypeDetermination,
                                                                           input_csv_file,
                                                                           csv_field_separator,
                                                                           verbose)
        # Calling through the class with an explicit ``self`` works both when
        # ``self`` is the class object and when it is an instance.
        hyper_table_columns = TableauHyperApiExtraLogic.fn_build_hyper_columns_for_csv(
            self,
            input_csv_file,
            csv_field_separator,
            detected_csv_structure,
            verbose)
        # Starts the Hyper Process with telemetry enabled/disabled to send data to Tableau or not
        # To opt in, simply set telemetry=Telemetry.SEND_USAGE_DATA_TO_TABLEAU.
        # To opt out, simply set telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU.
        with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hyper:
            # Creates new Hyper file <output_hyper_file>
            # Replaces file with CreateMode.CREATE_AND_REPLACE if it already exists.
            with Connection(endpoint=hyper.endpoint,
                            database=output_hyper_file,
                            create_mode=CreateMode.CREATE_AND_REPLACE) as hyper_connection:
                hyper_connection.catalog.create_schema("Extract")
                hyper_table = TableDefinition(
                    name=TableName("Extract", "Extract"),
                    columns=hyper_table_columns
                )
                hyper_connection.catalog.create_table(table_definition=hyper_table)
                print("The connection to the Hyper engine file has been created.")
                # A direct SQL "COPY ... FROM <csv>" load was tried and dropped:
                # DOUBLE or INT columns do not accept empty values that way, so
                # the rows are converted in Python and pushed through an Inserter.
                data_to_insert = TableauHyperApiExtraLogic.fn_rebuild_csv_content_for_hyper(
                    self,
                    input_csv_file,
                    csv_field_separator,
                    detected_csv_structure,
                    verbose)
                # Execute the actual insert
                with Inserter(hyper_connection, hyper_table) as hyper_inserter:
                    hyper_inserter.add_rows(rows=data_to_insert)
                    hyper_inserter.execute()
                # Number of rows in the <hyper_table> table.
                # `execute_scalar_query` is for executing a query that returns exactly one row with one column.
                row_count = hyper_connection.execute_scalar_query(
                    query=f"SELECT COUNT(*) FROM {hyper_table.table_name}")
                print(f"The number of rows in table {hyper_table.table_name} is {row_count}.")
            print("The connection to the Hyper file has been closed.")
        print("The Hyper process has been shut down.")
167