Test Failed
Push — master ( 9ea4cd...0ffab9 )
by Koen
03:34 queued 13s
created

validate_connection_string()   C

Complexity

Conditions 10

Size

Total Lines 18
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 13
dl 0
loc 18
rs 5.9999
c 0
b 0
f 0
cc 10
nop 1

How to fix   Complexity   

Complexity

Complex units of code like atramhasis.scripts.import_file.validate_connection_string() often do a lot of different things. To break such a unit down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields/methods (or, inside a single function, groups of related statements and variables) that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import sys
2
import os
3
import argparse
4
5
from skosprovider_rdf.providers import RDFProvider
6
from rdflib import Graph
7
from rdflib.util import SUFFIX_FORMAT_MAP, guess_format
8
9
from skosprovider.providers import SimpleCsvProvider
10
from skosprovider.uri import UriPatternGenerator
11
import csv
12
13
from skosprovider.providers import DictionaryProvider
14
import json
15
16
from skosprovider_sqlalchemy.utils import import_provider
17
from skosprovider_sqlalchemy.models import (
18
    ConceptScheme,
19
    Label,
20
    conceptscheme_label,
21
    Note,
22
    Source
23
)
24
25
from sqlalchemy import create_engine
26
from sqlalchemy.orm import sessionmaker
27
from sqlalchemy.engine import url
28
29
30
def file_to_rdf_provider(**kwargs):
    """
    Build an RDF provider from the file passed as ``input_file``.

    The provider id is the upper-cased file name (without extension); the
    file extension is handed to ``guess_format`` to pick the RDF parser.
    """
    path = kwargs.get('input_file')
    stem, extension = os.path.splitext(os.path.basename(path))
    rdf_graph = Graph()
    rdf_graph.parse(path, format=guess_format(extension))
    metadata = {'id': stem.upper()}
    return RDFProvider(metadata, rdf_graph)
42
43
44
def file_to_csv_provider(**kwargs):
    """
    Create CSV provider from the input file

    :param input_file: (keyword) path of the CSV file to read
    :param uri_pattern: (keyword, optional) pattern handed to
        ``UriPatternGenerator``; when empty or missing no URI generator is
        passed to the provider
    :return: a ``SimpleCsvProvider`` whose id is the upper-cased file name
        without its extension
    """
    input_file = kwargs.get('input_file')
    input_name = os.path.splitext(os.path.basename(input_file))[0]
    uri_pattern = kwargs.get('uri_pattern')
    provider_kwargs = {'uri_generator': UriPatternGenerator(uri_pattern)} if uri_pattern else {}
    # The csv module requires files to be opened with newline='' so that
    # newlines embedded in quoted fields are interpreted correctly.
    with open(input_file, newline='') as ifile:
        # As in the original, the provider is built while the file is still
        # open, so the reader can be consumed during construction.
        return SimpleCsvProvider(
            {'id': input_name.upper()},
            csv.reader(ifile),
            **provider_kwargs
        )
59
60
61
def file_to_json_provider(**kwargs):
    """
    Build a Dictionary provider from the JSON file passed as ``input_file``.

    The provider id is the upper-cased file name (without extension). When a
    ``uri_pattern`` keyword is given, a ``UriPatternGenerator`` built from it
    is passed to the provider as ``uri_generator``.
    """
    path = kwargs.get('input_file')
    stem = os.path.splitext(os.path.basename(path))[0]
    with open(path) as handle:
        concepts = json.load(handle)

    pattern = kwargs.get('uri_pattern')
    extra = {}
    if pattern:
        extra['uri_generator'] = UriPatternGenerator(pattern)

    return DictionaryProvider({'id': stem.upper()}, concepts, **extra)
76
77
78
# Registry of importable file types: for every type the file extensions that
# select it and the factory that turns such a file into a provider.
# The insertion order (JSON, RDF, CSV) is kept because lookups elsewhere in
# this module take the first matching entry.
supported_types = {
    'JSON': {
        'extensions': ['.json'],
        'file_to_provider': file_to_json_provider,
    },
    'RDF': {
        'extensions': [f'.{suffix}' for suffix in SUFFIX_FORMAT_MAP],
        'file_to_provider': file_to_rdf_provider,
    },
    'CSV': {
        'extensions': ['.csv'],
        'file_to_provider': file_to_csv_provider,
    },
}
92
93
# Flat list of every extension accepted by any supported file type, in the
# order the types are declared in ``supported_types``.
supported_ext = [
    extension
    for type_spec in supported_types.values()
    for extension in type_spec['extensions']
]
95
96
97
def parse_argv_for_import(argv):
    """
    Parse parameters and validate

    :param argv: full argument vector; ``argv[0]`` is the command name and
        ``argv[1:]`` are the options to parse
    :return: the parsed ``argparse.Namespace`` with the attributes
        ``input_file``, ``to``, ``cs_label``, ``cs_uri`` and ``uri_pattern``
    :raises SystemExit: with code 1 when the input file or the connection
        string is rejected by the validators; argparse itself also exits on
        ``-h`` or on malformed options
    """
    cmd = os.path.basename(argv[0])
    parser = argparse.ArgumentParser(
        prog=cmd,
        description='Import file to a database',
        # argparse %-formats the usage string, so the program name is
        # inserted with %(prog)s and the literal "%s" of the example URI
        # pattern has to be escaped as "%%s".
        usage='%(prog)s [--from path_input_file] [--to conn_string] [--conceptscheme_label cs_label] [--conceptscheme_uri cs_uri] [--uri_pattern uri_pattern]\n '
              '(example: "%(prog)s --from atramhasis/scripts/my_file --to sqlite:///atramhasis.sqlite --conceptscheme_label Labels --conceptscheme_uri urn:x-skosprovider:trees --uri_pattern urn:x-skosprovider:trees:%%s")'
    )
    parser.add_argument('--from',
                        dest='input_file',
                        type=str,
                        help='local path to the input file',
                        required=True
                        )
    parser.add_argument('--to',
                        dest='to',
                        type=str,
                        help='Connection string of the output database',
                        required=False,
                        default='sqlite:///atramhasis.sqlite'
                        )
    parser.add_argument('--conceptscheme_label',
                        dest='cs_label',
                        type=str,
                        help='Label of the conceptscheme',
                        required=False,
                        default=None
                        )
    parser.add_argument('--conceptscheme_uri',
                        dest='cs_uri',
                        type=str,
                        help='URI of the conceptscheme',
                        required=False,
                        default=None
                        )
    parser.add_argument('--uri_pattern',
                        dest='uri_pattern',
                        type=str,
                        help='URI pattern input for the URIGenerator',
                        required=False,
                        default=None
                        )
    # Parse the argument vector that was handed in rather than silently
    # falling back to sys.argv, so callers can supply their own arguments.
    args = parser.parse_args(argv[1:])
    if not validate_file(args.input_file) or not validate_connection_string(args.to):
        sys.exit(1)
    return args
146
147
148
def validate_file(input_file):
    """
    Check that the input file exists and has a supported extension.

    A message is printed for every rejected file.

    :param input_file: path of the file to import
    :return: Boolean True if the file can be imported
    """
    if not os.path.exists(input_file):
        print(f'The input file {input_file} does not exists')
        return False

    extension = os.path.splitext(input_file)[1]
    if extension in supported_ext:
        return True

    print(f'the input file {input_file} is not supported. Allowed extensions are: {supported_ext}')
    return False
157
158
159
def validate_connection_string(connection_string):
    """
    Validate the connection string

    Only ``postgresql`` URLs with username, password, host, port and
    database, and ``sqlite`` URLs with a database path are accepted. For
    everything else an explanatory message is printed.

    :param connection_string: database URL to check
    :return: Boolean True if correct connection string
    """
    # Local import: only needed here, and sqlalchemy is already a dependency
    # of this module.
    from sqlalchemy.exc import ArgumentError

    try:
        u = url.make_url(connection_string)
    except ArgumentError:
        # A string that cannot be parsed as a URL at all is reported the
        # same way as a parsable URL with a wrong structure, instead of
        # letting the parser error escape to the caller.
        u = None

    if u is not None:
        if u.drivername == 'postgresql':
            if all((u.username, u.password, u.host, u.port, u.database)):
                return True
        elif u.drivername == 'sqlite':
            if u.database:
                return True
        elif u.drivername:
            print('The database driver ' + u.drivername + ' is not supported.')

    print('Wrong structure of connection string "' + connection_string + '"')
    print('Structure: postgresql://username:password@host:port/db_name OR sqlite:///path/db_name.sqlite')
    return False
177
178
179
def conn_str_to_session(conn_str):
    """
    Open a database session for the given connection string.

    An engine is created with statement echoing switched on and a new
    session bound to that engine is returned.
    """
    engine = create_engine(conn_str, echo=True)
    session_factory = sessionmaker(bind=engine)
    return session_factory()
188
189
190
def create_conceptscheme(conceptscheme_label, conceptscheme_uri):
    """
    Build the output conceptscheme from the command line values.

    The scheme gets the given URI and a single ``prefLabel`` with language
    ``und`` carrying the given label text.
    """
    scheme = ConceptScheme(uri=conceptscheme_uri)
    scheme.labels.append(Label(conceptscheme_label, 'prefLabel', 'und'))
    return scheme
198
199
def create_conceptscheme_from_skos(conceptscheme):
    """
    Build the output conceptscheme from a `skosprovider.skos.ConceptScheme`.

    URI, labels, notes, sources and languages of the given scheme are copied
    into a new database ``ConceptScheme``.
    """
    scheme_uri = conceptscheme.uri
    copied_labels = [
        Label(label.label, label.type, label.language)
        for label in conceptscheme.labels
    ]
    copied_notes = [
        Note(note.note, note.type, note.language, note.markup)
        for note in conceptscheme.notes
    ]
    copied_sources = [
        Source(source.citation, source.markup)
        for source in conceptscheme.sources
    ]
    copied_languages = list(conceptscheme.languages)
    return ConceptScheme(
        uri=scheme_uri,
        labels=copied_labels,
        notes=copied_notes,
        sources=copied_sources,
        languages=copied_languages,
    )
221
222
223
224
def provider_to_db(provider, conceptscheme, session):
    """
    Store the conceptscheme and the provider's data in the database.

    The conceptscheme is added to the session, the provider content is
    imported into it and the session is committed.
    """
    session.add(conceptscheme)
    import_provider(provider, conceptscheme, session)
    session.commit()
231
232
233
def main(argv=sys.argv):
    """
    Import a file into the database and print configuration hints.

    Documentation: import -h
    Run: import --from <path_input_file> --to <conn_string> --conceptscheme_label <cs_label> --conceptscheme_uri <cs_uri> --uri_pattern <uri_pattern>

    example path_input_file:
     atramhasis/scripts/my_file

    structure conn_string:
     postgresql://username:password@host:port/db_name
     sqlite:///path/db_name.sqlite
    default conn_string:
     sqlite:///atramhasis.sqlite

    example conceptscheme_label
     My Conceptscheme
    When a URI is specified, the default conceptscheme_label is the name of
    the file. When no URI is specified, the conceptscheme is imported from
    the input file. This only works for RDF files. For other file types
    (JSON and CSV) conceptscheme_uri is mandatory and conceptscheme_label is
    recommended.
    """

    # Import the data
    args = parse_argv_for_import(argv)
    input_name, input_ext = os.path.splitext(os.path.basename(args.input_file))
    session = conn_str_to_session(args.to)

    # Pick the provider factory of the first file type that lists the
    # extension of the input file.
    matching_factories = [
        type_spec['file_to_provider']
        for type_spec in supported_types.values()
        if input_ext in type_spec['extensions']
    ]
    build_provider = matching_factories[0]
    provider = build_provider(**vars(args))

    if not args.cs_uri:
        # No URI given: take the conceptscheme from the provider itself.
        cs = create_conceptscheme_from_skos(provider.concept_scheme)
    else:
        label = args.cs_label or input_name.capitalize()
        cs = create_conceptscheme(label, args.cs_uri)
    provider_to_db(provider, cs, session)

    # Get info to return to the user
    prov_id = input_name.upper()
    registry_name = prov_id.replace(' ', '_')
    scheme_id = cs.id
    print("\n\n*** The import of conceptscheme {0} from the {1} file to {2} was succesful. ***\
          \n\nTo use the data in Atramhasis, you must edit the file my_thesaurus/skos/__init__.py.\
          \nAdd a configuration similar to:\
            \n\ndef create_registry(request):\
            \n\t# create the SKOS registry\
            \n\tregistry = Registry(instance_scope='threaded_thread')\
            \n\t{3} = SQLAlchemyProvider(\
            \n\t\t{{'id': '{4}', 'conceptscheme_id': {5}}},\
            \n\t\trequest.db\
            \n\t)\
            \n\tregistry.register_provider({6})\
            \n\treturn registry\
            \n\n".
          format(prov_id, args.input_file, args.to,
                 registry_name, prov_id, scheme_id, registry_name))
288