Passed
Push — master ( 5ef4cb...89c1da )
by Daniel
01:13
created

DatabaseTalker.connect_to_database()   A

Complexity

Conditions 4

Size

Total Lines 39
Code Lines 36

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 36
nop 4
dl 0
loc 39
rs 9.016
c 0
b 0
f 0
1
"""
2
CommandLineArgumentManagement - library to manage input parameters from command line
3
4
This library allows handling pre-configured arguments to be received from command line and use them
5
to call the main package functions
6
"""
7
# package helping out to work with SAP HANA
8
from hdbcli import dbapi
9
# package helping out to work with Oracle MySQL
10
import mysql.connector
11
# package to facilitate time operations
12
from datetime import datetime, timedelta
13
# package facilitating Data Frames manipulation
14
import pandas as pd
15
16
17
class DatabaseTalker:
18
    conn = None
19
20
    @staticmethod
21
    def append_additional_columns_to_data_frame(local_logger, timered, data_frame, session_details):
22
        if 'additional-columns' in session_details:
23
            timered.start()
24
            for crt_column in session_details['additional-columns']:
25
                if crt_column['value'] == 'utcnow': # special case
26
                    data_frame[crt_column['name']] = datetime.utcnow()
27
                else:
28
                    data_frame[crt_column['name']] = crt_column['value']
29
            local_logger.info('Additional column(s) have been added to Pandas DataFrame')
30
            timered.stop()
31
32
    def connect_to_database(self, local_logger, timered, connection_details):
33
        timered.start()
34
        local_logger.info('I will attempt to connect to '
35
                          + connection_details['server-vendor-and-type'] + ' server, layer '
36
                          + connection_details['server-layer'] + ' which means ('
37
                          + 'server ' + connection_details['ServerName']
38
                          + ', port ' + str(connection_details['ServerPort'])
39
                          + ') using the username ' + connection_details['Username']
40
                          + ' (' + connection_details['Name'] + ')')
41
        try:
42
            # create actual connection
43
            if connection_details['server-vendor-and-type'] == 'SAP HANA':
44
                self.conn = dbapi.connect(
45
                    address=connection_details['ServerName'],
46
                    port=connection_details['ServerPort'],
47
                    user=connection_details['Username'],
48
                    password=connection_details['Password'],
49
                )
50
            elif connection_details['server-vendor-and-type'] == 'Oracle MySQL':
51
                self.conn = mysql.connector.connect(
52
                    host=connection_details['ServerName'],
53
                    port=connection_details['ServerPort'],
54
                    user=connection_details['Username'],
55
                    password=connection_details['Password'],
56
                    database='mysql',
57
                    compress=True,
58
                    autocommit=True,
59
                    use_unicode=True,
60
                    charset='utf8mb4',
61
                    get_warnings=True,
62
                )
63
                self.conn.set_charset_collation('utf8mb4', 'utf8mb4_0900_ai_ci')
64
            local_logger.info('Connecting to  ' + connection_details['server-vendor-and-type']
65
                              + ' server completed')
66
            timered.stop()
67
        except Exception as e:
68
            local_logger.error('Error in Connection with details: ')
69
            local_logger.error(e)
70
            timered.stop()
71
72
    @staticmethod
73
    def execute_query(local_logger, timered, given_cursor, given_query):
74
        try:
75
            timered.start()
76
            given_cursor.execute(given_query)
77
            try:
78
                pt = timedelta(microseconds=(given_cursor.server_processing_time() / 1000))
79
                local_logger.info('Query executed successfully ' + format(pt))
80
            except AttributeError:
81
                local_logger.info('Query executed successfully')
82
            timered.stop()
83
            return given_cursor
84
        except TypeError as e:
85
            local_logger.error('Error running the query: ')
86
            local_logger.error(e)
87
            timered.stop()
88
89
    @staticmethod
90
    def fetch_executed_query(local_logger, timered, given_cursor):
91
        timered.start()
92
        local_result_set = given_cursor.fetchall()
93
        local_logger.info('Result-set has been completely fetched and contains '
94
                          + str(len(local_result_set)) + ' rows')
95
        timered.stop()
96
        return local_result_set
97
98
    @staticmethod
99
    def result_set_to_data_frame(local_logger, timered, given_columns_name, given_result_set):
100
        timered.start()
101
        df = pd.DataFrame(data=given_result_set, index=None, columns=given_columns_name)
102
        local_logger.info('Result-set has been loaded into Pandas DataFrame')
103
        timered.stop()
104
        return df
105