byceps.services.shop.article.article_service   B
last analyzed

Complexity

Total Complexity 49

Size/Duplication

Total Lines 660
Duplicated Lines 0 %

Test Coverage

Coverage 62.77%

Importance

Changes 0
Metric Value
eloc 431
dl 0
loc 660
ccs 118
cts 188
cp 0.6277
rs 8.48
c 0
b 0
f 0
wmc 49

29 Functions

Rating   Name   Duplication   Size   Complexity  
A find_article() 0 8 2
A delete_article() 0 4 1
A _db_entity_to_article() 0 18 1
A get_article_compilation_for_orderable_articles() 0 52 4
A create_article() 0 39 1
A _filter_by_search_term() 0 5 1
A find_article_with_details() 0 17 1
A get_article_compilations_for_single_articles() 0 40 4
A attach_article() 0 12 1
A get_article_compilation_for_single_article() 0 21 2
A get_articles_for_shop() 0 9 1
A find_db_article() 0 5 1
A get_articles_for_shop_paginated() 0 21 2
A _generate_search_clauses_for_term() 0 6 1
A _get_article_attachments() 0 9 1
A get_attachable_articles() 0 22 1
A get_attached_articles_for_articles() 0 20 3
A increase_quantity() 0 12 2
A unattach_article() 0 6 1
A _get_db_article() 0 11 2
A find_attached_article() 0 5 1
A get_article_by_number() 0 7 1
A decrease_quantity() 0 12 2
B sum_ordered_articles_by_payment_state() 0 69 5
A get_articles() 0 10 2
A get_article() 0 11 2
A update_article() 0 29 1
A create_ticket_article() 0 36 1
A create_ticket_bundle_article() 0 38 1

How to fix   Complexity   

Complexity

Complex classes like byceps.services.shop.article.article_service often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
byceps.services.shop.article.article_service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2014-2024 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from collections import defaultdict
10
from datetime import datetime
11 1
from decimal import Decimal
12 1
13 1
from moneyed import Money
14 1
from sqlalchemy import delete, select, update
15
from sqlalchemy.sql import Select
16 1
17 1
from byceps.database import db, paginate, Pagination
18 1
from byceps.services.shop.order.dbmodels.line_item import DbLineItem
19
from byceps.services.shop.order.dbmodels.order import DbOrder
20 1
from byceps.services.shop.order.models.order import PaymentState
21 1
from byceps.services.shop.shop.models import ShopID
22 1
from byceps.services.ticketing.models.ticket import TicketCategoryID
23 1
from byceps.util.result import Err, Ok, Result
24 1
25 1
from .dbmodels.article import DbArticle
26 1
from .dbmodels.attached_article import DbAttachedArticle
27
from .errors import NoArticlesAvailableError
28 1
from .models import (
29 1
    Article,
30 1
    ArticleAttachment,
31
    ArticleCompilation,
32
    ArticleCompilationBuilder,
33
    ArticleID,
34
    ArticleNumber,
35
    ArticleType,
36
    ArticleTypeParams,
37
    AttachedArticleID,
38
)
39
40
41
class UnknownArticleIdError(ValueError):
42 1
    pass
43 1
44
45
def create_article(
46 1
    shop_id: ShopID,
47
    item_number: ArticleNumber,
48
    type_: ArticleType,
49
    name: str,
50
    price: Money,
51
    tax_rate: Decimal,
52
    total_quantity: int,
53
    max_quantity_per_order: int,
54
    processing_required: bool,
55
    *,
56
    type_params: ArticleTypeParams | None = None,
57
    available_from: datetime | None = None,
58
    available_until: datetime | None = None,
59
    not_directly_orderable: bool = False,
60
    separate_order_required: bool = False,
61
) -> Article:
62
    """Create an article."""
63
    db_article = DbArticle(
64 1
        shop_id,
65
        item_number,
66
        type_,
67
        name,
68
        price,
69
        tax_rate,
70
        total_quantity,
71
        max_quantity_per_order,
72
        processing_required,
73
        type_params=type_params,
74
        available_from=available_from,
75
        available_until=available_until,
76
        not_directly_orderable=not_directly_orderable,
77
        separate_order_required=separate_order_required,
78
    )
79
80
    db.session.add(db_article)
81 1
    db.session.commit()
82 1
83
    return _db_entity_to_article(db_article)
84 1
85
86
def create_ticket_article(
87 1
    shop_id: ShopID,
88
    item_number: ArticleNumber,
89
    name: str,
90
    price: Money,
91
    tax_rate: Decimal,
92
    total_quantity: int,
93
    max_quantity_per_order: int,
94
    ticket_category_id: TicketCategoryID,
95
    *,
96
    available_from: datetime | None = None,
97
    available_until: datetime | None = None,
98
    not_directly_orderable: bool = False,
99
    separate_order_required: bool = False,
100
) -> Article:
101
    """Create an article that represents a ticket."""
102
    type_params: ArticleTypeParams = {
103 1
        'ticket_category_id': str(ticket_category_id),
104
    }
105
    processing_required = True
106 1
107
    return create_article(
108 1
        shop_id,
109
        item_number,
110
        ArticleType.ticket,
111
        name,
112
        price,
113
        tax_rate,
114
        total_quantity,
115
        max_quantity_per_order,
116
        processing_required,
117
        type_params=type_params,
118
        available_from=available_from,
119
        available_until=available_until,
120
        not_directly_orderable=not_directly_orderable,
121
        separate_order_required=separate_order_required,
122
    )
123
124
125
def create_ticket_bundle_article(
126 1
    shop_id: ShopID,
127
    item_number: ArticleNumber,
128
    name: str,
129
    price: Money,
130
    tax_rate: Decimal,
131
    total_quantity: int,
132
    max_quantity_per_order: int,
133
    ticket_category_id: TicketCategoryID,
134
    ticket_quantity: int,
135
    *,
136
    available_from: datetime | None = None,
137
    available_until: datetime | None = None,
138
    not_directly_orderable: bool = False,
139
    separate_order_required: bool = False,
140
) -> Article:
141
    """Create an article that represents a ticket bundle."""
142
    type_params: ArticleTypeParams = {
143 1
        'ticket_category_id': str(ticket_category_id),
144
        'ticket_quantity': ticket_quantity,
145
    }
146
    processing_required = True
147 1
148
    return create_article(
149 1
        shop_id,
150
        item_number,
151
        ArticleType.ticket_bundle,
152
        name,
153
        price,
154
        tax_rate,
155
        total_quantity,
156
        max_quantity_per_order,
157
        processing_required,
158
        type_params=type_params,
159
        available_from=available_from,
160
        available_until=available_until,
161
        not_directly_orderable=not_directly_orderable,
162
        separate_order_required=separate_order_required,
163
    )
164
165
166
def update_article(
167 1
    article_id: ArticleID,
168
    name: str,
169
    price: Money,
170
    tax_rate: Decimal,
171
    available_from: datetime | None,
172
    available_until: datetime | None,
173
    total_quantity: int,
174
    max_quantity_per_order: int,
175
    not_directly_orderable: bool,
176
    separate_order_required: bool,
177
) -> Article:
178
    """Update the article."""
179
    db_article = _get_db_article(article_id)
180
181
    db_article.name = name
182
    db_article.price_amount = price.amount
183
    db_article.price_currency = price.currency
184
    db_article.tax_rate = tax_rate
185
    db_article.available_from = available_from
186
    db_article.available_until = available_until
187
    db_article.total_quantity = total_quantity
188
    db_article.max_quantity_per_order = max_quantity_per_order
189
    db_article.not_directly_orderable = not_directly_orderable
190
    db_article.separate_order_required = separate_order_required
191
192
    db.session.commit()
193
194
    return _db_entity_to_article(db_article)
195
196
197
def attach_article(
198 1
    article_id_to_attach: ArticleID,
199
    quantity: int,
200
    article_id_to_attach_to: ArticleID,
201
) -> None:
202
    """Attach an article to another article."""
203
    db_attached_article = DbAttachedArticle(
204
        article_id_to_attach, quantity, article_id_to_attach_to
205
    )
206
207
    db.session.add(db_attached_article)
208
    db.session.commit()
209
210
211
def unattach_article(attached_article_id: AttachedArticleID) -> None:
212 1
    """Unattach an article from another."""
213
    db.session.execute(
214
        delete(DbAttachedArticle).filter_by(id=attached_article_id)
215
    )
216
    db.session.commit()
217
218
219
def increase_quantity(
220 1
    article_id: ArticleID, quantity_to_increase_by: int, *, commit: bool = True
221
) -> None:
222
    """Increase article quantity by the given value."""
223
    db.session.execute(
224 1
        update(DbArticle)
225
        .where(DbArticle.id == article_id)
226
        .values(quantity=DbArticle.quantity + quantity_to_increase_by)
227
    )
228
229
    if commit:
230 1
        db.session.commit()
231
232
233
def decrease_quantity(
234 1
    article_id: ArticleID, quantity_to_decrease_by: int, *, commit: bool = True
235
) -> None:
236
    """Decrease article quantity by the given value."""
237
    db.session.execute(
238 1
        update(DbArticle)
239
        .where(DbArticle.id == article_id)
240
        .values(quantity=DbArticle.quantity - quantity_to_decrease_by)
241
    )
242
243
    if commit:
244 1
        db.session.commit()
245
246
247
def delete_article(article_id: ArticleID) -> None:
248 1
    """Delete an article."""
249
    db.session.execute(delete(DbArticle).filter_by(id=article_id))
250
    db.session.commit()
251
252
253
def find_article(article_id: ArticleID) -> Article | None:
254 1
    """Return the article with that ID, or `None` if not found."""
255
    db_article = find_db_article(article_id)
256 1
257
    if db_article is None:
258 1
        return None
259
260
    return _db_entity_to_article(db_article)
261 1
262
263
def get_article(article_id: ArticleID) -> Article:
264 1
    """Return the article with that ID.
265
266
    Raise an exception if not found.
267
    """
268
    article = find_article(article_id)
269 1
270
    if article is None:
271 1
        raise UnknownArticleIdError(article_id)
272
273
    return article
274 1
275
276
def find_db_article(article_id: ArticleID) -> DbArticle | None:
277 1
    """Return the database entity for the article with that ID, or
278
    `None` if not found.
279
    """
280
    return db.session.get(DbArticle, article_id)
281 1
282
283
def _get_db_article(article_id: ArticleID) -> DbArticle:
284 1
    """Return the database entity for the article with that id.
285
286
    Raise an exception if not found.
287
    """
288
    db_article = find_db_article(article_id)
289 1
290
    if db_article is None:
291 1
        raise UnknownArticleIdError(article_id)
292
293
    return db_article
294 1
295
296
def find_article_with_details(article_id: ArticleID) -> DbArticle | None:
297 1
    """Return the article with that ID, or `None` if not found."""
298
    return (
299
        db.session.execute(
300
            select(DbArticle)
301
            .options(
302
                db.joinedload(DbArticle.articles_attached_to).joinedload(
303
                    DbAttachedArticle.article
304
                ),
305
                db.joinedload(DbArticle.attached_articles).joinedload(
306
                    DbAttachedArticle.article
307
                ),
308
            )
309
            .filter_by(id=article_id)
310
        )
311
        .unique()
312
        .scalar_one_or_none()
313
    )
314
315
316
def find_attached_article(
317 1
    attached_article_id: AttachedArticleID,
318
) -> DbAttachedArticle | None:
319
    """Return the attached article with that ID, or `None` if not found."""
320
    return db.session.get(DbAttachedArticle, attached_article_id)
321
322
323
def get_attached_articles_for_articles(
324 1
    article_ids: set[ArticleID],
325
) -> dict[ArticleID, list[DbAttachedArticle]]:
326
    """Return the attached article with that ID, or `None` if not found."""
327
    if not article_ids:
328
        return {}
329
330
    rows = db.session.execute(
331
        select(DbAttachedArticle.attached_to_article_id, DbAttachedArticle)
332
        .filter(DbAttachedArticle.attached_to_article_id.in_(article_ids))
333
        .options(db.joinedload(DbAttachedArticle.article))
334
    ).all()
335
336
    attached_articles_by_attached_to_article_id = defaultdict(list)
337
    for attached_to_article_id, db_attached_article in rows:
338
        attached_articles_by_attached_to_article_id[
339
            attached_to_article_id
340
        ].append(db_attached_article)
341
342
    return attached_articles_by_attached_to_article_id
343
344
345
def get_article_by_number(article_number: ArticleNumber) -> Article:
346 1
    """Return the article with that item number."""
347
    db_article = db.session.execute(
348
        select(DbArticle).filter_by(item_number=article_number)
349
    ).scalar_one()
350
351
    return _db_entity_to_article(db_article)
352
353
354
def get_articles(article_ids: set[ArticleID]) -> list[Article]:
355 1
    """Return the articles with those IDs."""
356
    if not article_ids:
357
        return []
358
359
    db_articles = db.session.scalars(
360
        select(DbArticle).filter(DbArticle.id.in_(article_ids))
361
    ).all()
362
363
    return [_db_entity_to_article(db_article) for db_article in db_articles]
364
365
366
def get_articles_for_shop(shop_id: ShopID) -> list[Article]:
367 1
    """Return all articles for that shop, ordered by article number."""
368
    db_articles = db.session.scalars(
369
        select(DbArticle)
370
        .filter_by(shop_id=shop_id)
371
        .order_by(DbArticle.item_number)
372
    ).all()
373
374
    return [_db_entity_to_article(db_article) for db_article in db_articles]
375
376
377
def get_articles_for_shop_paginated(
378 1
    shop_id: ShopID,
379
    page: int,
380
    per_page: int,
381
    *,
382
    search_term=None,
383
) -> Pagination:
384
    """Return all articles for that shop, paginated.
385
386
    Ordered by article number, reversed.
387
    """
388
    stmt = (
389 1
        select(DbArticle)
390
        .filter_by(shop_id=shop_id)
391
        .order_by(DbArticle.item_number.desc())
392
    )
393
394
    if search_term:
395 1
        stmt = _filter_by_search_term(stmt, search_term)
396 1
397
    return paginate(stmt, page, per_page)
398 1
399
400
def _filter_by_search_term(stmt: Select, search_term: str) -> Select:
401 1
    terms = search_term.split(' ')
402 1
    clauses = map(_generate_search_clauses_for_term, terms)
403 1
404
    return stmt.filter(db.and_(*clauses))
405 1
406
407
def _generate_search_clauses_for_term(search_term: str) -> Select:
408 1
    ilike_pattern = f'%{search_term}%'
409 1
410
    return db.or_(
411 1
        DbArticle.item_number.ilike(ilike_pattern),
412
        DbArticle.name.ilike(ilike_pattern),
413
    )
414
415
416
def get_article_compilation_for_orderable_articles(
417 1
    shop_id: ShopID,
418
) -> Result[ArticleCompilation, NoArticlesAvailableError]:
419
    """Return a compilation of the articles which can be ordered from
420
    that shop, less the ones that are only orderable in a dedicated
421
    order.
422
    """
423
    now = datetime.utcnow()
424 1
425
    db_orderable_articles = db.session.scalars(
426 1
        select(DbArticle)
427
        .filter_by(shop_id=shop_id)
428
        .filter_by(not_directly_orderable=False)
429
        .filter_by(separate_order_required=False)
430
        # Select only articles that are available in between the
431
        # temporal boundaries for this article, if specified.
432
        .filter(
433
            db.or_(
434
                DbArticle.available_from.is_(None),
435
                now >= DbArticle.available_from,
436
            )
437
        )
438
        .filter(
439
            db.or_(
440
                DbArticle.available_until.is_(None),
441
                now < DbArticle.available_until,
442
            )
443
        )
444
        .order_by(DbArticle.name)
445
    ).all()
446
447
    if not db_orderable_articles:
448 1
        return Err(NoArticlesAvailableError())
449
450 1
    compilation_builder = ArticleCompilationBuilder()
451 1
452
    for db_article in db_orderable_articles:
453
        article = _db_entity_to_article(db_article)
454
        compilation_builder.append_article(article)
455 1
456
        article_attachments = _get_article_attachments(
457
            db_article.attached_articles
458
        )
459 1
        for article_attachment in article_attachments:
460
            compilation_builder.append_article(
461
                article_attachment.attached_article,
462 1
                fixed_quantity=article_attachment.attached_quantity,
463
            )
464
465
    compilation = compilation_builder.build()
466
467
    return Ok(compilation)
468 1
469
470 1
def get_article_compilation_for_single_article(
471
    article_id: ArticleID,
472 1
) -> ArticleCompilation:
473
    """Return a compilation built from just the given article (with a
474
    quantity of one) plus the articles attached to it (if any).
475
    """
476
    db_article = _get_db_article(article_id)
477
478 1
    compilation_builder = ArticleCompilationBuilder()
479
480
    article = _db_entity_to_article(db_article)
481
    compilation_builder.append_article(article, fixed_quantity=1)
482 1
483
    article_attachments = _get_article_attachments(db_article.attached_articles)
484
    for article_attachment in article_attachments:
485 1
        compilation_builder.append_article(
486
            article_attachment.attached_article,
487
            fixed_quantity=article_attachment.attached_quantity,
488
        )
489
490
    return compilation_builder.build()
491
492
493
def get_article_compilations_for_single_articles(
494
    article_ids: set[ArticleID],
495
) -> dict[ArticleID, ArticleCompilation]:
496
    """Return a compilation of the articles (with a quantity of one)
497
    plus the articles attached to it (if any).
498
    """
499
    if not article_ids:
500
        return {}
501
502
    compilations_by_article_id: dict[ArticleID, ArticleCompilation] = {}
503
504
    db_articles = db.session.scalars(
505
        select(DbArticle).filter(DbArticle.id.in_(article_ids))
506
    ).all()
507
508
    attached_articles_by_attached_to_article_id = (
509
        get_attached_articles_for_articles(article_ids)
510
    )
511
512
    for db_article in db_articles:
513
        compilation_builder = ArticleCompilationBuilder()
514
515
        article = _db_entity_to_article(db_article)
516
        compilation_builder.append_article(article, fixed_quantity=1)
517
518
        db_attached_articles = attached_articles_by_attached_to_article_id[
519
            db_article.id
520
        ]
521
        article_attachments = _get_article_attachments(db_attached_articles)
522
        for article_attachment in article_attachments:
523 1
            compilation_builder.append_article(
524
                article_attachment.attached_article,
525
                fixed_quantity=article_attachment.attached_quantity,
526
            )
527
528 1
        compilation = compilation_builder.build()
529
530
        compilations_by_article_id[article.id] = compilation
531
532
    return compilations_by_article_id
533
534
535
def get_attachable_articles(article_id: ArticleID) -> list[Article]:
536 1
    """Return the articles that can be attached to that article."""
537
    db_article = _get_db_article(article_id)
538
539 1
    db_attached_articles = {
540
        db_attached.article for db_attached in db_article.attached_articles
541
    }
542
543
    db_unattachable_articles = {db_article}.union(db_attached_articles)
544
545
    unattachable_article_ids = {
546
        db_article.id for db_article in db_unattachable_articles
547
    }
548
549
    db_articles = db.session.scalars(
550
        select(DbArticle)
551
        .filter_by(shop_id=db_article.shop_id)
552
        .filter(db.not_(DbArticle.id.in_(unattachable_article_ids)))
553
        .order_by(DbArticle.item_number)
554
    ).all()
555
556
    return [_db_entity_to_article(db_article) for db_article in db_articles]
557
558
559
def sum_ordered_articles_by_payment_state(
560
    shop_ids: set[ShopID],
561
) -> list[tuple[ShopID, ArticleNumber, str, PaymentState, int]]:
562
    """Sum ordered articles for those shops, grouped by order payment state."""
563 1
    subquery = (
564
        select(
565 1
            DbLineItem.article_id,
566 1
            DbOrder._payment_state.label('payment_state'),
567
            db.func.sum(DbLineItem.quantity).label('quantity'),
568 1
        )
569
        .join(DbOrder)
570 1
        .group_by(DbLineItem.article_id, DbOrder._payment_state)
571
        .subquery()
572
    )
573 1
574
    rows = db.session.execute(
575
        select(
576
            DbArticle.shop_id,
577
            DbArticle.item_number,
578
            DbArticle.name,
579
            subquery.c.payment_state,
580
            subquery.c.quantity,
581
        )
582
        .outerjoin(
583
            subquery,
584 1
            db.and_(DbArticle.id == subquery.c.article_id),
585
        )
586
        .filter(DbArticle.shop_id.in_(shop_ids))
587
        .order_by(DbArticle.item_number, subquery.c.payment_state)
588
    ).all()
589
590
    shop_ids_and_article_numbers_and_names = {
591
        (row[0], row[1], row[2]) for row in rows
592
    }  # Remove duplicates.
593
594
    quantities = {}
595
596 1
    for (
597
        shop_id,
598
        article_number,
599
        name,
600 1
        payment_state_name,
601
        quantity,
602
    ) in rows:
603
        if payment_state_name is None:
604
            continue
605
606
        payment_state = PaymentState[payment_state_name]
607
        key = (shop_id, article_number, name, payment_state)
608
609
        quantities[key] = quantity
610
611 1
    def generate():
612
        for shop_id, article_number, name in sorted(
613
            shop_ids_and_article_numbers_and_names
614
        ):
615
            for payment_state in PaymentState:
616
                key = (shop_id, article_number, name, payment_state)
617
                quantity = quantities.get(key, 0)
618
619
                yield (
620
                    shop_id,
621
                    article_number,
622
                    name,
623
                    payment_state,
624
                    quantity,
625
                )
626
627 1
    return list(generate())
628
629
630
def _db_entity_to_article(db_article: DbArticle) -> Article:
631 1
    return Article(
632
        id=db_article.id,
633 1
        shop_id=db_article.shop_id,
634
        item_number=db_article.item_number,
635
        type_=db_article.type_,
636
        type_params=db_article.type_params or {},
637
        name=db_article.name,
638
        price=db_article.price,
639
        tax_rate=db_article.tax_rate,
640 1
        available_from=db_article.available_from,
641
        available_until=db_article.available_until,
642
        total_quantity=db_article.total_quantity,
643 1
        quantity=db_article.quantity,
644 1
        max_quantity_per_order=db_article.max_quantity_per_order,
645
        not_directly_orderable=db_article.not_directly_orderable,
646 1
        separate_order_required=db_article.separate_order_required,
647
        processing_required=db_article.processing_required,
648 1
    )
649 1
650
651
def _get_article_attachments(
652 1
    db_attached_articles: list[DbArticle],
653 1
) -> list[ArticleAttachment]:
654 1
    return [
655
        ArticleAttachment(
656 1
            attached_article=_db_entity_to_article(db_attached_article.article),
657
            attached_quantity=db_attached_article.quantity,
658 1
        )
659
        for db_attached_article in db_attached_articles
660
    ]
661