byceps.services.guest_server.guest_server_service   A
last analyzed

Complexity

Total Complexity 34

Size/Duplication

Total Lines 529
Duplicated Lines 11.53 %

Test Coverage

Coverage 34.21%

Importance

Changes 0
Metric Value
eloc 328
dl 61
loc 529
ccs 52
cts 152
cp 0.3421
rs 9.68
c 0
b 0
f 0
wmc 34

25 Functions

Rating   Name   Duplication   Size   Complexity  
A count_servers_for_owner_and_party() 0 11 1
A update_setting() 0 21 1
A _find_db_server() 0 4 1
A get_all_servers_for_party() 29 29 1
A is_hostname_registered() 0 13 1
A find_server() 0 11 2
A _get_db_server() 0 7 2
A _persist_server_registration() 0 24 2
A _db_entity_to_address() 0 9 1
A create_address() 0 36 1
A update_address() 0 23 2
A register_server() 0 24 1
A delete_server() 0 13 1
A approve_server() 0 17 2
A check_in_server() 0 20 2
A ensure_user_may_register_server() 0 25 1
A get_setting_for_party() 0 15 2
A find_address() 0 8 2
A _db_entity_to_server() 0 22 1
A check_out_server() 0 20 2
A _db_entity_to_setting() 0 8 1
A _find_db_address() 0 4 1
A update_server() 0 12 1
A get_servers_for_owner_and_party() 32 32 1
A _get_db_setting() 0 4 1

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
"""
2
byceps.services.guest_server.guest_server_service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2014-2024 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from datetime import datetime
10
11 1
from sqlalchemy import delete, select
12
13 1
from byceps.database import db
14
from byceps.events.guest_server import (
15 1
    GuestServerApprovedEvent,
16 1
    GuestServerCheckedInEvent,
17
    GuestServerCheckedOutEvent,
18
    GuestServerRegisteredEvent,
19
)
20
from byceps.services.orga_team import orga_team_service
21
from byceps.services.party.models import Party, PartyID
22 1
from byceps.services.ticketing import ticket_service
23 1
from byceps.services.user import user_service
24 1
from byceps.services.user.models.user import User, UserID
25 1
from byceps.util.result import Err, Ok, Result
26 1
27 1
from . import guest_server_domain_service
28
from .dbmodels import DbGuestServer, DbGuestServerAddress, DbGuestServerSetting
29 1
from .errors import (
30 1
    AlreadyApprovedError,
31 1
    AlreadyCheckedInError,
32
    AlreadyCheckedOutError,
33
    NotApprovedError,
34
    NotCheckedInError,
35
    PartyIsOverError,
36
    QuantityLimitReachedError,
37
    UserUsesNoTicketError,
38
)
39
from .models import (
40
    Address,
41 1
    AddressData,
42
    AddressID,
43
    IPAddress,
44
    Server,
45
    ServerID,
46
    Setting,
47
)
48
49
50
# -------------------------------------------------------------------- #
51
# setting
52
53
54
def get_setting_for_party(party_id: PartyID) -> Setting:
55
    """Return the setting for the party."""
56 1
    db_setting = _get_db_setting(party_id)
57
58
    if db_setting is None:
59
        return Setting(
60
            party_id=party_id,
61
            netmask=None,
62
            gateway=None,
63
            dns_server1=None,
64
            dns_server2=None,
65
            domain=None,
66
        )
67
68
    return _db_entity_to_setting(db_setting)
69
70
71
def update_setting(
72
    party: Party,
73 1
    netmask: IPAddress | None,
74
    gateway: IPAddress | None,
75
    dns_server1: IPAddress | None,
76
    dns_server2: IPAddress | None,
77
    domain: str | None,
78
) -> Setting:
79
    """Update the setting for the party."""
80
    db_setting = _get_db_setting(party.id) or DbGuestServerSetting(party.id)
81
82
    db_setting.netmask = netmask
83
    db_setting.gateway = gateway
84
    db_setting.dns_server1 = dns_server1
85
    db_setting.dns_server2 = dns_server2
86
    db_setting.domain = domain
87
88
    db.session.add(db_setting)
89
    db.session.commit()
90
91
    return _db_entity_to_setting(db_setting)
92
93
94
def _get_db_setting(party_id: PartyID) -> DbGuestServerSetting | None:
95
    return db.session.execute(
96 1
        select(DbGuestServerSetting).filter_by(party_id=party_id)
97
    ).scalar_one_or_none()
98
99
100
def _db_entity_to_setting(db_setting: DbGuestServerSetting) -> Setting:
101
    return Setting(
102 1
        party_id=db_setting.party_id,
103
        netmask=db_setting.netmask,
104
        gateway=db_setting.gateway,
105
        dns_server1=db_setting.dns_server1,
106
        dns_server2=db_setting.dns_server2,
107
        domain=db_setting.domain,
108
    )
109
110
111
# -------------------------------------------------------------------- #
112
# server
113
114
115
def ensure_user_may_register_server(
116
    party: Party, user: User
117 1
) -> Result[
118
    None, PartyIsOverError | QuantityLimitReachedError | UserUsesNoTicketError
119
]:
120
    """Return an error if the user is not allowed to register a(nother)
121
    guest server for a party.
122
    """
123
    user_uses_ticket_for_party = ticket_service.uses_any_ticket_for_party(
124
        user.id, party.id
125
    )
126
127
    user_is_orga_for_party = orga_team_service.is_orga_for_party(
128
        user.id, party.id
129
    )
130
131
    already_registered_server_quantity = count_servers_for_owner_and_party(
132
        user.id, party.id
133
    )
134
135
    return guest_server_domain_service.ensure_user_may_register_server(
136
        party,
137
        user_uses_ticket_for_party,
138
        user_is_orga_for_party,
139
        already_registered_server_quantity,
140
    )
141
142
143
def register_server(
144
    party: Party,
145 1
    creator: User,
146
    owner: User,
147
    description: str,
148
    address_datas: set[AddressData],
149
    *,
150
    notes_owner: str | None = None,
151
    notes_admin: str | None = None,
152
) -> tuple[Server, GuestServerRegisteredEvent]:
153
    """Register a server for a party."""
154
    server, event = guest_server_domain_service.register_server(
155
        party,
156
        creator,
157
        owner,
158
        description,
159
        address_datas,
160
        notes_owner=notes_owner,
161
        notes_admin=notes_admin,
162
    )
163
164
    _persist_server_registration(server)
165
166
    return server, event
167
168
169
def _persist_server_registration(server: Server) -> None:
170
    db_server = DbGuestServer(
171 1
        server.id,
172
        server.party_id,
173
        server.created_at,
174
        server.creator.id,
175
        server.owner.id,
176
        description=server.description,
177
        notes_owner=server.notes_owner,
178
        notes_admin=server.notes_admin,
179
    )
180
    db.session.add(db_server)
181
182
    for address in server.addresses:
183
        db_address = DbGuestServerAddress(
184
            address.id,
185
            server.id,
186
            server.created_at,
187
            ip_address=address.ip_address,
188
            hostname=address.hostname,
189
        )
190
        db.session.add(db_address)
191
192
    db.session.commit()
193
194
195
def update_server(server_id: ServerID, notes_admin: str | None) -> Server:
196
    """Update the server."""
197 1
    db_server = _get_db_server(server_id)
198
199
    db_server.notes_admin = notes_admin
200
201
    db.session.commit()
202
203
    creator = user_service.get_user(db_server.creator_id)
204
    owner = user_service.get_user(db_server.owner_id)
205
206
    return _db_entity_to_server(db_server, creator, owner)
207
208
209
def approve_server(
210
    pending_server: Server, initiator: User
211 1
) -> Result[tuple[Server, GuestServerApprovedEvent], AlreadyApprovedError]:
212
    """Approve a guest server."""
213
    result = guest_server_domain_service.approve_server(
214
        pending_server, initiator
215
    )
216
    if result.is_err():
217
        return result
218
219
    approved_server, event = result.unwrap()
220
221
    db_server = _get_db_server(approved_server.id)
222
    db_server.approved = True
223
    db.session.commit()
224
225
    return Ok((approved_server, event))
226
227
228
def check_in_server(
229
    approved_server: Server, initiator: User
230 1
) -> Result[
231
    tuple[Server, GuestServerCheckedInEvent],
232
    AlreadyCheckedInError | AlreadyCheckedOutError | NotApprovedError,
233
]:
234
    """Check in a guest server."""
235
    result = guest_server_domain_service.check_in_server(
236
        approved_server, initiator
237
    )
238
    if result.is_err():
239
        return result
240
241
    checked_in_server, event = result.unwrap()
242
243
    db_server = _get_db_server(checked_in_server.id)
244
    db_server.checked_in_at = checked_in_server.checked_in_at
245
    db.session.commit()
246
247
    return Ok((checked_in_server, event))
248
249
250
def check_out_server(
251
    checked_in_server: Server, initiator: User
252 1
) -> Result[
253
    tuple[Server, GuestServerCheckedOutEvent],
254
    AlreadyCheckedOutError | NotCheckedInError,
255
]:
256
    """Check out a guest server."""
257
    result = guest_server_domain_service.check_out_server(
258
        checked_in_server, initiator
259
    )
260
    if result.is_err():
261
        return result
262
263
    checked_out_server, event = result.unwrap()
264
265
    db_server = _get_db_server(checked_out_server.id)
266
    db_server.checked_out_at = checked_out_server.checked_out_at
267
    db.session.commit()
268
269
    return Ok((checked_out_server, event))
270
271
272
def find_server(server_id: ServerID) -> Server | None:
273
    """Return the server, if found."""
274 1
    db_server = _find_db_server(server_id)
275
276
    if db_server is None:
277
        return None
278
279
    creator = user_service.get_user(db_server.creator_id)
280
    owner = user_service.get_user(db_server.owner_id)
281
282
    return _db_entity_to_server(db_server, creator, owner)
283
284
285 View Code Duplication
def get_all_servers_for_party(party_id: PartyID) -> list[Server]:
286
    """Return all servers for the party."""
287 1
    db_servers = (
288
        db.session.scalars(
289 1
            select(DbGuestServer)
290
            .filter_by(party_id=party_id)
291
            .join(DbGuestServerAddress)
292
        )
293
        .unique()
294
        .all()
295
    )
296
297
    creator_ids = {db_server.creator_id for db_server in db_servers}
298
    creators_by_id = user_service.get_users_indexed_by_id(
299 1
        creator_ids, include_avatars=True
300 1
    )
301
302
    owner_ids = {db_server.owner_id for db_server in db_servers}
303
    owners_by_id = user_service.get_users_indexed_by_id(
304 1
        owner_ids, include_avatars=True
305 1
    )
306
307
    return [
308
        _db_entity_to_server(
309 1
            db_server,
310
            creators_by_id[db_server.creator_id],
311
            owners_by_id[db_server.owner_id],
312
        )
313
        for db_server in db_servers
314
    ]
315
316
317 View Code Duplication
def get_servers_for_owner_and_party(
318
    owner_id: UserID, party_id: PartyID
319 1
) -> list[Server]:
320
    """Return the servers owned by the user for the party."""
321
    db_servers = (
322
        db.session.scalars(
323 1
            select(DbGuestServer)
324
            .filter_by(owner_id=owner_id)
325
            .filter_by(party_id=party_id)
326
            .join(DbGuestServerAddress)
327
        )
328
        .unique()
329
        .all()
330
    )
331
332
    creator_ids = {db_server.creator_id for db_server in db_servers}
333
    creators_by_id = user_service.get_users_indexed_by_id(
334 1
        creator_ids, include_avatars=True
335 1
    )
336
337
    owner_ids = {db_server.owner_id for db_server in db_servers}
338
    owners_by_id = user_service.get_users_indexed_by_id(
339 1
        owner_ids, include_avatars=True
340 1
    )
341
342
    return [
343
        _db_entity_to_server(
344 1
            db_server,
345
            creators_by_id[db_server.creator_id],
346
            owners_by_id[db_server.owner_id],
347
        )
348
        for db_server in db_servers
349
    ]
350
351
352
def count_servers_for_owner_and_party(
353
    owner_id: UserID, party_id: PartyID
354 1
) -> int:
355
    """Return the number of servers owned by the user for the party."""
356
    return (
357
        db.session.scalar(
358
            select(db.func.count(DbGuestServer.id))
359
            .filter_by(owner_id=owner_id)
360
            .filter_by(party_id=party_id)
361
        )
362
        or 0
363
    )
364
365 1
366
def delete_server(server_id: ServerID) -> None:
367
    """Delete a server and its addresses."""
368
    db.session.execute(
369
        delete(DbGuestServerAddress).where(
370
            DbGuestServerAddress.server_id == server_id
371
        )
372
    )
373
374
    db.session.execute(
375
        delete(DbGuestServer).where(DbGuestServer.id == server_id)
376
    )
377
378
    db.session.commit()
379
380 1
381
def _find_db_server(server_id: ServerID) -> DbGuestServer | None:
382
    return db.session.execute(
383
        select(DbGuestServer).filter_by(id=server_id)
384
    ).scalar_one_or_none()
385
386 1
387
def _get_db_server(server_id: ServerID) -> DbGuestServer:
388
    db_server = _find_db_server(server_id)
389
390
    if db_server is None:
391
        raise ValueError(f'Unknown server ID "{server_id}"')
392
393
    return db_server
394
395 1
396
def _db_entity_to_server(
397
    db_server: DbGuestServer, creator: User, owner: User
398
) -> Server:
399
    addresses = {
400
        _db_entity_to_address(db_address) for db_address in db_server.addresses
401
    }
402
403
    return Server(
404
        id=db_server.id,
405
        party_id=db_server.party_id,
406
        created_at=db_server.created_at,
407
        creator=creator,
408
        owner=owner,
409
        description=db_server.description,
410
        notes_owner=db_server.notes_owner,
411
        notes_admin=db_server.notes_admin,
412
        approved=db_server.approved,
413
        checked_in=db_server.checked_in_at is not None,
414
        checked_in_at=db_server.checked_in_at,
415
        checked_out=db_server.checked_out_at is not None,
416
        checked_out_at=db_server.checked_out_at,
417
        addresses=addresses,
418
    )
419
420
421
# -------------------------------------------------------------------- #
422
# address
423
424 1
425
def find_address(address_id: AddressID) -> Address | None:
426
    """Return the address, if found."""
427
    db_address = _find_db_address(address_id)
428
429
    if db_address is None:
430
        return None
431
432
    return _db_entity_to_address(db_address)
433
434 1
435
def create_address(
436
    server_id: ServerID,
437
    ip_address: IPAddress | None = None,
438
    hostname: str | None = None,
439
    netmask: IPAddress | None = None,
440
    gateway: IPAddress | None = None,
441
) -> Address:
442
    """Append an address to a server."""
443
    db_server = _get_db_server(server_id)
444
445
    created_at = datetime.utcnow()
446
447
    address_data = AddressData(
448
        ip_address=ip_address,
449
        hostname=hostname,
450
        netmask=netmask,
451
        gateway=gateway,
452
    )
453
    address = guest_server_domain_service._build_address(
454
        db_server.id, created_at, address_data
455
    )
456
457
    db_address = DbGuestServerAddress(
458
        address.id,
459
        address.server_id,
460
        address.created_at,
461
        ip_address=address.ip_address,
462
        hostname=address.hostname,
463
        netmask=address.netmask,
464
        gateway=address.gateway,
465
    )
466
    db.session.add(db_address)
467
468
    db.session.commit()
469
470
    return _db_entity_to_address(db_address)
471
472 1
473
def update_address(
474
    address_id: AddressID,
475
    ip_address: IPAddress | None,
476
    hostname: str | None,
477
    netmask: IPAddress | None = None,
478
    gateway: IPAddress | None = None,
479
) -> Result[Address, str]:
480
    """Update the address."""
481
    db_address = _find_db_address(address_id)
482
483
    if db_address is None:
484
        return Err(f'Unknown address ID "{address_id}"')
485
486
    db_address.ip_address = ip_address
487
    db_address.hostname = hostname
488
    db_address.netmask = netmask
489
    db_address.gateway = gateway
490
491
    db.session.commit()
492
493
    address = _db_entity_to_address(db_address)
494
495
    return Ok(address)
496
497 1
498
def _find_db_address(address_id: AddressID) -> DbGuestServerAddress | None:
499
    return db.session.execute(
500
        select(DbGuestServerAddress).filter_by(id=address_id)
501
    ).scalar_one_or_none()
502
503 1
504
def _db_entity_to_address(db_address: DbGuestServerAddress) -> Address:
505
    return Address(
506
        id=db_address.id,
507
        server_id=db_address.server_id,
508
        created_at=db_address.created_at,
509
        ip_address=db_address.ip_address,
510
        hostname=db_address.hostname,
511
        netmask=db_address.netmask,
512
        gateway=db_address.gateway,
513
    )
514
515 1
516
def is_hostname_registered(party_id: PartyID, hostname: str) -> bool:
517
    """Check if the hostname is registered."""
518
    return (
519
        db.session.scalar(
520
            select(
521
                select(DbGuestServerAddress)
522
                .join(DbGuestServer)
523
                .filter(DbGuestServer.party_id == str(party_id))
524
                .filter(DbGuestServerAddress.hostname == hostname.lower())
525
                .exists()
526
            )
527
        )
528
        or False
529
    )
530