Test Failed
Pull Request — master (#593)
by
unknown
13:02
created

tabpy.tabpy_server.handlers.arrow_client.main()   D

Complexity

Conditions 11

Size

Total Lines 70
Code Lines 63

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 63
dl 0
loc 70
rs 4.9909
c 0
b 0
f 0
cc 11
nop 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

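Commonly applied refactorings include Extract Method: move a cohesive group of statements into a new, well-named function and call that function from the original one.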

Complexity

Complex functions like tabpy.tabpy_server.handlers.arrow_client.main() often do a lot of different things. To break such a function down, identify a cohesive piece of work within it (for example, argument parsing, connection setup, or waiting for the server) and extract it into a well-named helper. When the code being split is a class, a common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Licensed to the Apache Software Foundation (ASF) under one
2
# or more contributor license agreements.  See the NOTICE file
3
# distributed with this work for additional information
4
# regarding copyright ownership.  The ASF licenses this file
5
# to you under the Apache License, Version 2.0 (the
6
# "License"); you may not use this file except in compliance
7
# with the License.  You may obtain a copy of the License at
8
#
9
#   http://www.apache.org/licenses/LICENSE-2.0
10
#
11
# Unless required by applicable law or agreed to in writing,
12
# software distributed under the License is distributed on an
13
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
# KIND, either express or implied.  See the License for the
15
# specific language governing permissions and limitations
16
# under the License.
17
18
"""An example Flight CLI client."""
19
20
import argparse
21
import sys
22
23
import pyarrow
24
import pyarrow.flight
25
import pyarrow.csv as csv
26
27
28
def list_flights(args, client, connection_args={}):
    """Print every flight, then every action, that ``client`` advertises.

    ``args`` and ``connection_args`` are not read here; they are accepted
    so that every subcommand handler can be called the same way.
    """
    print('Flights\n=======')
    for info in client.list_flights():
        desc = info.descriptor
        kind = desc.descriptor_type
        if kind == pyarrow.flight.DescriptorType.PATH:
            print("Path:", desc.path)
        elif kind == pyarrow.flight.DescriptorType.CMD:
            print("Command:", desc.command)
        else:
            print("Unknown descriptor type")

        # A negative count is shown as "Unknown".
        for label, count in (("Total records:", info.total_records),
                             ("Total bytes:", info.total_bytes)):
            print(label, end=" ")
            print(count if count >= 0 else "Unknown")

        print("Number of endpoints:", len(info.endpoints))
        print("Schema:")
        print(info.schema)
        print('---')

    print('\nActions\n=======')
    for act in client.list_actions():
        print("Type:", act.type)
        print("Description:", act.description)
        print('---')
62
63
def do_action(args, client, connection_args={}):
    """Handle the ``do`` subcommand: run the action ``args.action_type``.

    The action is sent with an empty body, and each result body is printed
    as bytes. If the RPC fails, the error is printed instead of being
    propagated. ``connection_args`` is not read here; it is accepted so
    that every subcommand handler can be called the same way.
    """
    buf = pyarrow.allocate_buffer(0)
    action = pyarrow.flight.Action(args.action_type, buf)
    print('Running action', args.action_type)
    try:
        for result in client.do_action(action):
            print("Got result", result.body.to_pybytes())
    except (pyarrow.lib.ArrowIOError, pyarrow.flight.FlightError) as e:
        # pyarrow.flight reports failed RPCs as FlightError subclasses,
        # which are not ArrowIOError, so both must be caught to report them.
        print("Error calling action:", e)
73
74
def push_data(args, client, connection_args={}):
    """Handle the ``put`` subcommand: upload the CSV file ``args.file``.

    The file is parsed with ``pyarrow.csv.read_csv``. Its row count and
    first rows are printed, and the table is uploaded with ``do_put`` under
    a path descriptor equal to the file name. ``connection_args`` is not
    read here; it is accepted so that every subcommand handler can be
    called the same way.
    """
    print('File Name:', args.file)
    my_table = csv.read_csv(args.file)
    print('Table rows=', str(len(my_table)))
    df = my_table.to_pandas()
    print(df.head())
    writer, _ = client.do_put(
        pyarrow.flight.FlightDescriptor.for_path(args.file), my_table.schema)
    try:
        writer.write_table(my_table)
    finally:
        # Close the writer even when the write fails so the put stream is
        # not left open.
        writer.close()
85
86
def upload_data(client, data, filename):
    """Upload ``data`` to the Flight server under the path ``filename``.

    ``data`` is converted with ``pyarrow.table`` and must also provide
    ``.head()``; presumably it is a pandas DataFrame (TODO confirm with
    callers). The row count and a preview are printed before the upload.
    """
    my_table = pyarrow.table(data)
    print('Table rows=', str(len(my_table)))
    print("Uploading", data.head())
    writer, _ = client.do_put(
        pyarrow.flight.FlightDescriptor.for_path(filename), my_table.schema)
    try:
        writer.write_table(my_table)
    finally:
        # Close the writer even when the write fails so the put stream is
        # not left open.
        writer.close()
95
96
def get_flight_by_path(path, client, connection_args=None):
    """Fetch the flight stored under ``path`` and return it as a DataFrame.

    Only the first endpoint is read: its ticket is redeemed at the
    endpoint's first location, or on ``client`` itself when the endpoint
    lists no locations. The Flight protocol defines an empty location list
    to mean "redeem on the service that issued the ticket".

    :param path: path passed to ``FlightDescriptor.for_path``.
    :param client: ``FlightClient`` used to look up the flight info.
    :param connection_args: keyword arguments forwarded to ``FlightClient``
        when connecting to an endpoint location (default: none).
    :returns: the data read with ``read_pandas()``, or the empty string
        ``''`` when the flight info has no endpoints.
    """
    if connection_args is None:
        connection_args = {}
    descriptor = pyarrow.flight.FlightDescriptor.for_path(path)

    info = client.get_flight_info(descriptor)
    for endpoint in info.endpoints:
        print('Ticket:', endpoint.ticket)
        if endpoint.locations:
            location = endpoint.locations[0]
            print(location)
            get_client = pyarrow.flight.FlightClient(location,
                                                     **connection_args)
        else:
            get_client = client
        reader = get_client.do_get(endpoint.ticket)
        df = reader.read_pandas()
        print(df)
        return df
    print("no data found for get")
    # Kept as '' (not None) for compatibility with existing callers.
    return ''
112
113
def _add_common_arguments(parser):
114
    parser.add_argument('--tls', action='store_true',
115
                        help='Enable transport-level security')
116
    parser.add_argument('--tls-roots', default=None,
117
                        help='Path to trusted TLS certificate(s)')
118
    parser.add_argument("--mtls", nargs=2, default=None,
119
                        metavar=('CERTFILE', 'KEYFILE'),
120
                        help="Enable transport-level security")
121
    parser.add_argument('host', type=str,
122
                        help="Address or hostname to connect to")
123
124
125
def _read_endpoint(client, endpoint, connection_args):
    """Redeem one endpoint's ticket and return the data as a DataFrame."""
    # The Flight protocol defines an empty location list to mean the ticket
    # is redeemed on the service that issued it, i.e. ``client`` itself.
    if endpoint.locations:
        location = endpoint.locations[0]
        print(location)
        client = pyarrow.flight.FlightClient(location, **connection_args)
    return client.do_get(endpoint.ticket).read_pandas()


def get_flight(args, client, connection_args=None):
    """Handle the ``get`` subcommand: print the data behind a descriptor.

    The descriptor is built from the ``-p/--path`` components (which may be
    repeated) or from ``-c/--command``. Each endpoint's data is printed.
    """
    if connection_args is None:
        connection_args = {}
    if args.path:
        descriptor = pyarrow.flight.FlightDescriptor.for_path(*args.path)
    else:
        descriptor = pyarrow.flight.FlightDescriptor.for_command(args.command)

    info = client.get_flight_info(descriptor)
    for endpoint in info.endpoints:
        print('Ticket:', endpoint.ticket)
        print(_read_endpoint(client, endpoint, connection_args))


def _build_parser():
    """Build the CLI parser with the list/do/put/get subcommands.

    Each subcommand stores its own name in ``args.action`` so ``main`` can
    dispatch on it.
    """
    parser = argparse.ArgumentParser()
    subcommands = parser.add_subparsers()

    cmd_list = subcommands.add_parser('list')
    cmd_list.set_defaults(action='list')
    _add_common_arguments(cmd_list)
    # NOTE(review): --list is parsed but not read by any handler here.
    cmd_list.add_argument('-l', '--list', action='store_true',
                          help="Print more details.")

    cmd_do = subcommands.add_parser('do')
    cmd_do.set_defaults(action='do')
    _add_common_arguments(cmd_do)
    cmd_do.add_argument('action_type', type=str,
                        help="The action type to run.")

    cmd_put = subcommands.add_parser('put')
    cmd_put.set_defaults(action='put')
    _add_common_arguments(cmd_put)
    cmd_put.add_argument('file', type=str,
                         help="CSV file to upload.")

    cmd_get = subcommands.add_parser('get')
    cmd_get.set_defaults(action='get')
    _add_common_arguments(cmd_get)
    cmd_get_descriptor = cmd_get.add_mutually_exclusive_group(required=True)
    cmd_get_descriptor.add_argument('-p', '--path', type=str, action='append',
                                    help="The path for the descriptor.")
    cmd_get_descriptor.add_argument('-c', '--command', type=str,
                                    help="The command for the descriptor.")
    return parser


def _parse_host_port(parser, address):
    """Split ``HOST:PORT`` into ``(host, int_port)``.

    Splits on the last colon, so bracketed IPv6 literals such as
    ``[::1]:8815`` are accepted. Malformed input is reported through
    ``parser.error``, which exits the program.
    """
    host, sep, port = address.rpartition(':')
    if not sep or not host:
        parser.error(f"expected HOST:PORT, got {address!r}")
    try:
        return host, int(port)
    except ValueError:
        parser.error(f"invalid port in {address!r}")


def _read_bytes(path):
    """Return the raw contents of the file at ``path``."""
    with open(path, "rb") as f:
        return f.read()


def _load_connection_args(args):
    """Derive the URI scheme and ``FlightClient`` keyword arguments.

    :returns: ``(scheme, connection_args)``.
    """
    scheme = "grpc+tcp"
    connection_args = {}
    if args.tls:
        scheme = "grpc+tls"
        if args.tls_roots:
            connection_args["tls_root_certs"] = _read_bytes(args.tls_roots)
    if args.mtls:
        # NOTE(review): --mtls alone does not switch the scheme to grpc+tls;
        # confirm whether --tls is meant to be required alongside it.
        cert_path, key_path = args.mtls
        connection_args["cert_chain"] = _read_bytes(cert_path)
        connection_args["private_key"] = _read_bytes(key_path)
    return scheme, connection_args


def _wait_for_server(client, timeout=1, retry_delay=1):
    """Block until ``client`` answers a ``healthcheck`` action.

    Each attempt uses a call timeout of ``timeout`` seconds. A timed-out
    attempt is retried at once, because the call itself already waited.
    Any other failure to reach the server is retried after ``retry_delay``
    seconds, so the loop does not spin the CPU. Errors of any other kind
    propagate.
    """
    import time

    retryable = (pyarrow.ArrowIOError,
                 pyarrow.flight.FlightTimedOutError,
                 pyarrow.flight.FlightUnavailableError)
    action = pyarrow.flight.Action("healthcheck", b"")
    options = pyarrow.flight.FlightCallOptions(timeout=timeout)
    while True:
        try:
            # Drain the result stream so the call runs to completion.
            list(client.do_action(action, options=options))
            return
        except retryable as e:
            print("Server is not ready, waiting...")
            timed_out = (isinstance(e, pyarrow.flight.FlightTimedOutError)
                         or "Deadline" in str(e))
            if not timed_out:
                time.sleep(retry_delay)


def main():
    """Run the Flight CLI: parse arguments, connect, wait, and dispatch.

    Prints help and exits with status 1 when no subcommand is given.
    Before dispatching, blocks until the server answers a healthcheck.
    """
    parser = _build_parser()
    args = parser.parse_args()
    if not hasattr(args, 'action'):
        # No subcommand was given, so no set_defaults(action=...) ran.
        parser.print_help()
        sys.exit(1)

    commands = {
        'list': list_flights,
        'do': do_action,
        'get': get_flight,
        'put': push_data,
    }
    host, port = _parse_host_port(parser, args.host)
    scheme, connection_args = _load_connection_args(args)
    client = pyarrow.flight.FlightClient(f"{scheme}://{host}:{port}",
                                         **connection_args)
    _wait_for_server(client)
    commands[args.action](args, client, connection_args)
195
196
197
198
if __name__ == "__main__":
    # Run the CLI only when this file is executed as a script, not when it
    # is imported as a module.
    main()
200