Passed
Push — main ( d517ed...ce71f4 )
by Rahn20
03:21
created

src.api.ApiData.get_scooter_data()   A

Complexity

Conditions 2

Size

Total Lines 37
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
cc 2
eloc 12
nop 2
dl 0
loc 37
ccs 0
cts 9
cp 0
crap 6
rs 9.8
c 0
b 0
f 0
1
#!/usr/bin/python3
2
# pylint: disable=broad-except
3
4
"""
5
url 1: https://spark.isal.live/api/v1/
6
url 2: http://localhost:1337/api/v1/
7
"""
8
9
from datetime import datetime
10
import requests
11
12
from src.scooter import Scooter
13
14
class ApiData(Scooter):
    """
    GraphQL client for the scooter API.

    Extends Scooter with methods that read and write scooter, log, city,
    station and user records by POSTing GraphQL documents to ``_URL``.
    Request failures are reported with ``print`` and never raised to the
    caller (best-effort behaviour).
    """

    ## API endpoint URL
    _URL = "http://localhost:1337/api/v1/graphql"

    ## Set the headers
    _HEADERS = {'Content-Type': 'application/json'}

    ## Seconds to wait for the API before giving up. Without a timeout,
    ## requests.post can block forever on an unresponsive server.
    _TIMEOUT = 10


    def __init__(self, user_id: int) -> None:
        """
        Initialize class.

        user_id is sent as ``customer_id`` when a log is created.
        """
        super().__init__()
        self._user_id = user_id
        ## Id of the current log; set by create_log(), None until then.
        self._log_id = None


    def _send_graphql(self, query: str, variables: dict):
        """
        POST one GraphQL document with its variables and return the response.

        Exceptions raised by requests (connection errors, timeouts, payload
        serialization errors) are not handled here; every public method
        decides how to report them.
        """
        payload = {
            'query': query,
            'variables': variables
        }

        return requests.post(
            self._URL,
            json=payload,
            headers=self._HEADERS,
            timeout=self._TIMEOUT
        )


    def get_scooter_data(self, scooter_id: int) -> dict:
        """
        Get scooter data from API.

        Returns the first item of ``getScooterById`` for the given id.
        On any failure the error is printed and ``0`` is returned; the
        ``0`` is kept for backward compatibility with existing callers
        even though the annotation says dict.
        """
        ## Create the GraphQL query
        query = ''' query getScooterById($id: String!) {
            getScooterById(id: $id) {
                id
                latitude
                longitude
                speed
                battery
                status {
                    id
                    status
                }
                station {
                    id
                }
            }
        } '''

        variables = {
            'id': str(scooter_id)
        }

        try:
            ## Send the POST request
            response = self._send_graphql(query, variables)

            return response.json()["data"]["getScooterById"][0]
        except Exception as error:
            print(error)
            return 0


    def update_scooter(self) -> None:
        """
        Update api with scooter's new position, speed, status and battery level.

        Values are read from ``self.data``; the response is not inspected.
        """
        mutation = ''' mutation updateScooterById(
            $id: String!,
            $battery: String!,
            $status_id: String!,
            $longitude: String!,
            $latitude: String!,
            $price_id: String!,
            $speed: String!,
            $station_id: String!) {
                updateScooterById(
                    id: $id,
                    battery: $battery,
                    status_id: $status_id,
                    longitude: $longitude,
                    latitude: $latitude,
                    price_id: $price_id,
                    speed: $speed,
                    station_id: $station_id) { id }
        } '''

        variables = {
            'id': self.data["id"],
            'status_id': str(self.data["status"]),
            'latitude': str(self.data["lat"]),
            'longitude': str(self.data["lon"]),
            'speed': str(self.data["speed"]),
            ## battery is truncated to a whole number before it is sent
            'battery': str(int(self.data["battery"])),
            'station_id': str(self.data["station"]),
            'price_id': "1"
        }

        try:
            self._send_graphql(mutation, variables)
        except Exception as error:
            print(error)


    def create_log(self) -> None:
        """
        Create log. Data to be added is scooter's position, start date/time and scooter/user id.

        On success the id of the created log is stored for update_log().
        On failure the error is printed and the stored id is left unchanged.
        """
        mutation = ''' mutation createLog(
            $scooter_id: String!,
            $customer_id: String!,
            $start_time: String!,
            $start_longitude: String!,
            $start_latitude: String!,
            $price_id: String!) {
                createLog(
                    scooter_id: $scooter_id,
                    customer_id: $customer_id,
                    start_time: $start_time,
                    start_longitude: $start_longitude,
                    start_latitude: $start_latitude,
                    price_id: $price_id) { id }
        } '''

        variables = {
            'scooter_id': self.data["id"],
            'customer_id': str(self._user_id),
            'start_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            'start_longitude': str(self.data["lon"]),
            'start_latitude': str(self.data["lat"]),
            'price_id': "1"
        }

        try:
            response = self._send_graphql(mutation, variables)
            data = response.json()["data"]["createLog"][0]

            ## save the log id
            self._log_id = data["id"]
        except Exception as error:
            print(error)


    def update_log(self) -> None:
        """
        Update log. Data to be updated is scooter's position and end date/time.

        Requires a log id stored by a successful create_log(); when none is
        available a message is printed and no request is sent.
        """
        if self._log_id is None:
            ## Nothing to update: sending a request without an id cannot succeed.
            print("No log id available; call create_log() before update_log().")
            return

        mutation = ''' mutation UpdateLogById(
            $id: String!,
            $end_time: String!,
            $end_longitude: String!,
            $end_latitude: String!) {
                UpdateLogById(
                    id: $id,
                    end_time: $end_time,
                    end_longitude: $end_longitude,
                    end_latitude: $end_latitude) { id }
        } '''

        variables = {
            'id': self._log_id,
            'end_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            'end_longitude': str(self.data["lon"]),
            'end_latitude': str(self.data["lat"]),
        }

        try:
            self._send_graphql(mutation, variables)
        except Exception as error:
            print(error)


    def get_city_data(self) -> None:
        """
        Get city's center position, id and area where the scooter is located.
        And adds it to city dictionary.

        All fields are converted before any of them is stored, so a bad
        response never leaves the city dictionary half-updated.
        """
        query = ''' query getCityByScooterId($id: String!) {
            getCityByScooterId(id: $id) {
                id
                latitude
                longitude
                area
            }
        } '''

        variables = {
            'id': self.data["id"]
        }

        try:
            response = self._send_graphql(query, variables)
            data = response.json()["data"]["getCityByScooterId"][0]

            city_fields = {
                "id": data["id"],
                "area": float(data["area"]),
                "lat": float(data["latitude"]),
                "lon": float(data["longitude"]),
            }

            ## Use self.city (as get_station does) instead of super().city:
            ## super() only resolves class-level attributes.
            self.city.update(city_fields)
        except Exception as error:
            print(error)


    def get_station(self, zone_id: str) -> dict:
        """
        return random charging/maintenance station data in the city where the scooter is located.
        Zone id: 1- Charging Station, 2- Parking Station, 3- Bike Station, 4- Maintenance Station.

        Returns the first item of ``getStationByCityIdAndZoneId``. On any
        failure the error is printed and ``0`` is returned (kept for
        backward compatibility even though the annotation says dict).
        """
        query = ''' query getStationByCityIdAndZoneId($cityId: String!, $zoneId: String!) {
            getStationByCityIdAndZoneId(cityId: $cityId, zoneId: $zoneId) {
                id
                latitude
                longitude
            }
        } '''

        variables = {
            'cityId': self.city["id"],
            'zoneId': zone_id
        }

        try:
            response = self._send_graphql(query, variables)

            return response.json()["data"]["getStationByCityIdAndZoneId"][0]
        except Exception as error:
            print(error)
            return 0


    def add_scooter(self) -> None:
        """ Add a new scooter with fixed position, battery, station and status values. """
        mutation = ''' mutation addScooter(
            $battery: String!,
            $longitude: String!,
            $latitude: String!,
            $price_id: String!,
            $speed: String!,
            $station_id: String!,
            $status_id:String!) {
                addScooter(
                    battery: $battery,
                    longitude: $longitude,
                    latitude: $latitude,
                    speed: $speed,
                    price_id: $price_id,
                    station_id: $station_id,
                    status_id: $status_id) { id }
        } '''

        variables = {
            'status_id': "1",
            'latitude': "59.32511720",
            'longitude': "18.07109350",
            'speed': "0",
            'battery': "70",
            'station_id': "2",
            'price_id': "1"
        }

        try:
            self._send_graphql(mutation, variables)
        except Exception as error:
            print(error)


    def create_user(self) -> None:
        """ Create a new user with fixed test credentials. """
        mutation = ''' mutation createUser(
            $first_name: String!,
            $last_name: String!,
            $password: String!,
            $email: String!,
            $phone: String!,
            $role_id: String!) {
                createUser(
                    first_name: $first_name,
                    last_name: $last_name,
                    password: $password,
                    email: $email,
                    phone: $phone,
                    role_id: $role_id) { id }
        } '''

        variables = {
            'first_name': "Test",
            'last_name': "Testing",
            'password': "1234567891",
            ## NOTE(review): this value looks like an obfuscated placeholder
            ## rather than a real address - confirm the intended test email.
            'email': "[email protected]",
            'phone': "0700000000",
            'role_id': "2",
        }

        try:
            self._send_graphql(mutation, variables)
        except Exception as error:
            print(error)
323