Passed
Push — master ( 3fbce7...871c09 )
by Daniel
01:14
created

sources.db_extractor.DatabaseTalker   A

Complexity

Total Complexity 16

Size/Duplication

Total Lines 119
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 98
dl 0
loc 119
rs 10
c 0
b 0
f 0
wmc 16

6 Methods

Rating   Name   Duplication   Size   Complexity  
A DatabaseTalker.append_additional_columns_to_data_frame() 0 11 4
A DatabaseTalker.result_set_to_data_frame() 0 7 1
A DatabaseTalker.get_column_names() 0 12 3
A DatabaseTalker.fetch_executed_query() 0 8 1
A DatabaseTalker.execute_query() 0 16 3
B DatabaseTalker.connect_to_database() 0 40 4
1
"""
2
DatabaseTalker - library to connect to database servers and extract data from them
3
4
This library allows connecting to SAP HANA or Oracle MySQL servers, executing queries
5
and loading the fetched result-sets into Pandas Data Frames
6
"""
7
# package helping out to work with SAP HANA
8
from hdbcli import dbapi
9
# package helping out to work with Oracle MySQL
10
import mysql.connector
11
# package to facilitate time operations
12
from datetime import datetime, timedelta
13
# package facilitating Data Frames manipulation
14
import pandas as pd
15
16
17
class DatabaseTalker:
    """Helpers to connect to a database server, run queries and build Pandas DataFrames.

    Every method receives a ``local_logger`` (object exposing ``info``/``error``)
    and a ``timered`` object (exposing ``start``/``stop``); the timer is always
    stopped, even when the wrapped operation raises.
    """

    # connection object returned by the database driver;
    # stays None until connect_to_database() succeeds
    conn = None

    @staticmethod
    def append_additional_columns_to_data_frame(local_logger, timered, data_frame, session_details):
        """Add the constant columns listed in ``session_details`` to ``data_frame`` in place.

        :param local_logger: logger used for the informational message
        :param timered: timer started/stopped around the operation
        :param data_frame: Pandas DataFrame to be enriched (modified in place)
        :param session_details: dictionary; when it holds an 'additional-columns' key,
            each entry must provide 'name' and 'value'; the special value 'utcnow'
            is replaced by the current (time-zone naive) UTC timestamp
        :return: None
        """
        if 'additional-columns' not in session_details:
            return
        # imported locally as the module only imports datetime and timedelta
        from datetime import timezone
        timered.start()
        try:
            for crt_column in session_details['additional-columns']:
                if crt_column['value'] == 'utcnow':  # special case
                    # same naive UTC value datetime.utcnow() gave, without the deprecated call
                    crt_value = datetime.now(timezone.utc).replace(tzinfo=None)
                else:
                    crt_value = crt_column['value']
                data_frame[crt_column['name']] = crt_value
            local_logger.info('Additional column(s) have been added to Pandas DataFrame')
        finally:
            timered.stop()

    def connect_to_database(self, local_logger, timered, connection_details):
        """Open a connection to the configured server and store it in ``self.conn``.

        Connection failures are logged and swallowed (best-effort), in which case
        ``self.conn`` is left unchanged.

        :param local_logger: logger used for informational and error messages
        :param timered: timer started/stopped around the operation
        :param connection_details: dictionary with keys 'server-vendor-and-type'
            ('SAP HANA' or 'Oracle MySQL'), 'server-layer', 'ServerName',
            'ServerPort', 'Username', 'Password' and 'Name'
        :return: None
        """
        timered.start()
        server_type = connection_details['server-vendor-and-type']
        local_logger.info('I will attempt to connect to '
                          + server_type + ' server, layer '
                          + connection_details['server-layer'] + ' which means ('
                          + 'server ' + connection_details['ServerName']
                          + ', port ' + str(connection_details['ServerPort'])
                          + ') using the username ' + connection_details['Username']
                          + ' (' + connection_details['Name'] + ')')
        try:
            # create actual connection
            if server_type == 'SAP HANA':
                self.conn = dbapi.connect(
                    address=connection_details['ServerName'],
                    port=connection_details['ServerPort'],
                    user=connection_details['Username'],
                    password=connection_details['Password'],
                    compress='TRUE',
                )
            elif server_type == 'Oracle MySQL':
                self.conn = mysql.connector.connect(
                    host=connection_details['ServerName'],
                    port=connection_details['ServerPort'],
                    user=connection_details['Username'],
                    password=connection_details['Password'],
                    database='mysql',
                    compress=True,
                    autocommit=True,
                    use_unicode=True,
                    charset='utf8mb4',
                    get_warnings=True,
                )
                self.conn.set_charset_collation('utf8mb4', 'utf8mb4_0900_ai_ci')
            else:
                # no connection was attempted, so do not report a success
                local_logger.error('Unsupported server vendor and type: ' + str(server_type))
                return
            local_logger.info('Connecting to  ' + server_type + ' server completed')
        except Exception as e:
            local_logger.error('Error in Connection with details: ')
            local_logger.error(e)
        finally:
            timered.stop()

    @staticmethod
    def execute_query(local_logger, timered, given_cursor, given_query):
        """Execute ``given_query`` on ``given_cursor``.

        :param local_logger: logger used for informational and error messages
        :param timered: timer started/stopped around the operation
        :param given_cursor: cursor exposing ``execute``; when it also exposes
            ``server_processing_time`` the reported value is added to the log
        :param given_query: query to be executed
        :return: the very same cursor on success, None when a TypeError was
            raised (it is logged); any other exception is propagated
        """
        try:
            timered.start()
            given_cursor.execute(given_query)
            try:
                pt = timedelta(microseconds=(given_cursor.server_processing_time() / 1000))
                local_logger.info('Query executed successfully ' + format(pt))
            except AttributeError:
                # cursor does not report server processing time
                local_logger.info('Query executed successfully')
            return given_cursor
        except TypeError as e:
            local_logger.error('Error running the query: ')
            local_logger.error(e)
            return None
        finally:
            # also reached when an unexpected driver error propagates
            timered.stop()

    @staticmethod
    def fetch_executed_query(local_logger, timered, given_cursor):
        """Fetch all rows of an already executed cursor.

        :param local_logger: logger used for the informational message
        :param timered: timer started/stopped around the operation
        :param given_cursor: cursor exposing ``fetchall``
        :return: whatever ``given_cursor.fetchall()`` returns
        """
        timered.start()
        try:
            local_result_set = given_cursor.fetchall()
            local_logger.info('Result-set has been completely fetched and contains '
                              + str(len(local_result_set)) + ' rows')
        finally:
            timered.stop()
        return local_result_set

    @staticmethod
    def get_column_names(local_logger, timered, given_cursor):
        """Determine the column names of the result-set held by ``given_cursor``.

        :param local_logger: logger used for the informational message
        :param timered: timer started/stopped around the operation
        :param given_cursor: cursor exposing either ``column_names`` or ``description``
        :return: ``given_cursor.column_names`` when available, otherwise a list
            built from the first item of every ``given_cursor.description`` entry
            (empty list when ``description`` is None)
        """
        timered.start()
        try:
            try:
                column_names = given_cursor.column_names
            except AttributeError:
                # only the first item (the name) matters, regardless of how many
                # items the driver puts in each description entry
                column_names = [crt_col[0] for crt_col in (given_cursor.description or ())]
            local_logger.info('Result-set column name determination completed')
        finally:
            timered.stop()
        return column_names

    @staticmethod
    def result_set_to_data_frame(local_logger, timered, given_columns_name, given_result_set):
        """Load a fetched result-set into a Pandas DataFrame.

        :param local_logger: logger used for the informational message
        :param timered: timer started/stopped around the operation
        :param given_columns_name: column labels for the DataFrame
        :param given_result_set: rows to be loaded
        :return: Pandas DataFrame built from the given rows and column labels
        """
        timered.start()
        try:
            df = pd.DataFrame(data=given_result_set, index=None, columns=given_columns_name)
            local_logger.info('Result-set has been loaded into Pandas DataFrame')
        finally:
            timered.stop()
        return df
119