get_attachable_articles()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 22
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1.1865

Importance

Changes 0
Metric Value
cc 1
eloc 14
nop 1
dl 0
loc 22
ccs 3
cts 7
cp 0.4286
crap 1.1865
rs 9.7
c 0
b 0
f 0
1
"""
2
byceps.services.shop.article.article_service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2014-2024 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from collections import defaultdict
10
from datetime import datetime
11 1
from decimal import Decimal
12 1
13 1
from moneyed import Money
14 1
from sqlalchemy import delete, select, update
15
from sqlalchemy.sql import Select
16 1
17 1
from byceps.database import db, paginate, Pagination
18 1
from byceps.services.shop.order.dbmodels.line_item import DbLineItem
19
from byceps.services.shop.order.dbmodels.order import DbOrder
20 1
from byceps.services.shop.order.models.order import PaymentState
21 1
from byceps.services.shop.shop.models import ShopID
22 1
from byceps.services.ticketing.models.ticket import TicketCategoryID
23 1
from byceps.util.result import Err, Ok, Result
24 1
25 1
from .dbmodels.article import DbArticle
26 1
from .dbmodels.attached_article import DbAttachedArticle
27
from .errors import NoArticlesAvailableError
28 1
from .models import (
29 1
    Article,
30 1
    ArticleAttachment,
31
    ArticleCompilation,
32
    ArticleCompilationBuilder,
33
    ArticleID,
34
    ArticleNumber,
35
    ArticleType,
36
    ArticleTypeParams,
37
    AttachedArticleID,
38
)
39
40
41
class UnknownArticleIdError(ValueError):
    """Indicate that no article exists for a given article ID.

    Raised by the lookup functions in this module that must return an
    article; the offending ID is passed as the exception argument.
    """
43 1
44
45
def create_article(
    shop_id: ShopID,
    item_number: ArticleNumber,
    type_: ArticleType,
    name: str,
    price: Money,
    tax_rate: Decimal,
    total_quantity: int,
    max_quantity_per_order: int,
    processing_required: bool,
    *,
    type_params: ArticleTypeParams | None = None,
    available_from: datetime | None = None,
    available_until: datetime | None = None,
    not_directly_orderable: bool = False,
    separate_order_required: bool = False,
) -> Article:
    """Create an article, persist it, and return it as a domain object.

    The new database entity is added to the session and committed
    right away.
    """
    # Optional settings are handed to the entity as keyword arguments.
    optional_settings = {
        'type_params': type_params,
        'available_from': available_from,
        'available_until': available_until,
        'not_directly_orderable': not_directly_orderable,
        'separate_order_required': separate_order_required,
    }

    db_article = DbArticle(
        shop_id,
        item_number,
        type_,
        name,
        price,
        tax_rate,
        total_quantity,
        max_quantity_per_order,
        processing_required,
        **optional_settings,
    )

    session = db.session
    session.add(db_article)
    session.commit()

    return _db_entity_to_article(db_article)
84 1
85
86
def create_ticket_article(
    shop_id: ShopID,
    item_number: ArticleNumber,
    name: str,
    price: Money,
    tax_rate: Decimal,
    total_quantity: int,
    max_quantity_per_order: int,
    ticket_category_id: TicketCategoryID,
    *,
    available_from: datetime | None = None,
    available_until: datetime | None = None,
    not_directly_orderable: bool = False,
    separate_order_required: bool = False,
) -> Article:
    """Create an article of type "ticket".

    The ticket category ID is stored (as a string) in the article's
    type parameters, and the article is always marked as requiring
    processing.
    """
    ticket_type_params: ArticleTypeParams = {
        'ticket_category_id': str(ticket_category_id),
    }

    return create_article(
        shop_id,
        item_number,
        ArticleType.ticket,
        name,
        price,
        tax_rate,
        total_quantity,
        max_quantity_per_order,
        True,  # processing_required
        type_params=ticket_type_params,
        available_from=available_from,
        available_until=available_until,
        not_directly_orderable=not_directly_orderable,
        separate_order_required=separate_order_required,
    )
123
124
125
def create_ticket_bundle_article(
    shop_id: ShopID,
    item_number: ArticleNumber,
    name: str,
    price: Money,
    tax_rate: Decimal,
    total_quantity: int,
    max_quantity_per_order: int,
    ticket_category_id: TicketCategoryID,
    ticket_quantity: int,
    *,
    available_from: datetime | None = None,
    available_until: datetime | None = None,
    not_directly_orderable: bool = False,
    separate_order_required: bool = False,
) -> Article:
    """Create an article of type "ticket bundle".

    The ticket category ID (as a string) and the number of tickets in
    the bundle are stored in the article's type parameters, and the
    article is always marked as requiring processing.
    """
    bundle_type_params: ArticleTypeParams = {
        'ticket_category_id': str(ticket_category_id),
        'ticket_quantity': ticket_quantity,
    }

    return create_article(
        shop_id,
        item_number,
        ArticleType.ticket_bundle,
        name,
        price,
        tax_rate,
        total_quantity,
        max_quantity_per_order,
        True,  # processing_required
        type_params=bundle_type_params,
        available_from=available_from,
        available_until=available_until,
        not_directly_orderable=not_directly_orderable,
        separate_order_required=separate_order_required,
    )
164
165
166
def update_article(
    article_id: ArticleID,
    name: str,
    price: Money,
    tax_rate: Decimal,
    available_from: datetime | None,
    available_until: datetime | None,
    total_quantity: int,
    max_quantity_per_order: int,
    not_directly_orderable: bool,
    separate_order_required: bool,
) -> Article:
    """Overwrite the editable fields of an existing article and commit.

    Raise `UnknownArticleIdError` if no article has that ID.
    """
    entity = _get_db_article(article_id)

    # Naming and pricing; the price is stored as separate amount and
    # currency values.
    entity.name = name
    entity.price_amount = price.amount
    entity.price_currency = price.currency
    entity.tax_rate = tax_rate

    # Availability window.
    entity.available_from = available_from
    entity.available_until = available_until

    # Quantities and ordering restrictions.
    entity.total_quantity = total_quantity
    entity.max_quantity_per_order = max_quantity_per_order
    entity.not_directly_orderable = not_directly_orderable
    entity.separate_order_required = separate_order_required

    db.session.commit()

    return _db_entity_to_article(entity)
195
196
197
def attach_article(
    article_id_to_attach: ArticleID,
    quantity: int,
    article_id_to_attach_to: ArticleID,
) -> None:
    """Attach the given quantity of one article to another article.

    The attachment is persisted and committed immediately.
    """
    attachment = DbAttachedArticle(
        article_id_to_attach, quantity, article_id_to_attach_to
    )

    session = db.session
    session.add(attachment)
    session.commit()
209
210
211
def unattach_article(attached_article_id: AttachedArticleID) -> None:
    """Remove the attachment with that ID and commit."""
    removal = delete(DbAttachedArticle).filter_by(id=attached_article_id)

    db.session.execute(removal)
    db.session.commit()
217
218
219
def increase_quantity(
    article_id: ArticleID, quantity_to_increase_by: int, *, commit: bool = True
) -> None:
    """Add the given value to the article's quantity.

    The new quantity is computed in the UPDATE statement itself, based
    on the current column value. Pass `commit=False` to leave
    committing to the caller.
    """
    new_quantity = DbArticle.quantity + quantity_to_increase_by
    stmt = (
        update(DbArticle)
        .where(DbArticle.id == article_id)
        .values(quantity=new_quantity)
    )
    db.session.execute(stmt)

    if not commit:
        return

    db.session.commit()
231
232
233
def decrease_quantity(
    article_id: ArticleID, quantity_to_decrease_by: int, *, commit: bool = True
) -> None:
    """Subtract the given value from the article's quantity.

    The new quantity is computed in the UPDATE statement itself, based
    on the current column value. Pass `commit=False` to leave
    committing to the caller.
    """
    new_quantity = DbArticle.quantity - quantity_to_decrease_by
    stmt = (
        update(DbArticle)
        .where(DbArticle.id == article_id)
        .values(quantity=new_quantity)
    )
    db.session.execute(stmt)

    if not commit:
        return

    db.session.commit()
245
246
247
def delete_article(article_id: ArticleID) -> None:
    """Delete the article with that ID and commit."""
    removal = delete(DbArticle).filter_by(id=article_id)

    db.session.execute(removal)
    db.session.commit()
251
252
253
def find_article(article_id: ArticleID) -> Article | None:
    """Look up the article with that ID.

    Return `None` if there is no such article.
    """
    db_article = find_db_article(article_id)

    return None if db_article is None else _db_entity_to_article(db_article)
261 1
262
263
def get_article(article_id: ArticleID) -> Article:
    """Look up the article with that ID, which is expected to exist.

    Raise `UnknownArticleIdError` if there is no such article.
    """
    article = find_article(article_id)

    if article is not None:
        return article

    raise UnknownArticleIdError(article_id)
274 1
275
276
def find_db_article(article_id: ArticleID) -> DbArticle | None:
    """Fetch the article's database entity by its ID.

    Return `None` if there is no such article.
    """
    db_article = db.session.get(DbArticle, article_id)
    return db_article
281 1
282
283
def _get_db_article(article_id: ArticleID) -> DbArticle:
    """Fetch the article's database entity, which is expected to exist.

    Raise `UnknownArticleIdError` if there is no such article.
    """
    db_article = find_db_article(article_id)

    if db_article is not None:
        return db_article

    raise UnknownArticleIdError(article_id)
294 1
295
296
def find_article_with_details(article_id: ArticleID) -> DbArticle | None:
    """Fetch the article's database entity with attachments preloaded.

    Both attachment relationships (articles this one is attached to,
    and articles attached to this one) are joined-loaded together with
    the article each attachment refers to. Return `None` if there is
    no such article.
    """
    load_articles_attached_to = db.joinedload(
        DbArticle.articles_attached_to
    ).joinedload(DbAttachedArticle.article)
    load_attached_articles = db.joinedload(
        DbArticle.attached_articles
    ).joinedload(DbAttachedArticle.article)

    stmt = (
        select(DbArticle)
        .options(load_articles_attached_to, load_attached_articles)
        .filter_by(id=article_id)
    )

    result = db.session.execute(stmt)

    # Presumably `unique()` is needed because joined loading of
    # collections repeats the parent row once per related row.
    return result.unique().scalar_one_or_none()
314
315
316
def is_name_available(shop_id: ShopID, name: str) -> bool:
    """Tell whether no article in that shop uses the name yet.

    Names are compared case-insensitively (both sides lowercased).
    """
    name_is_taken = select(
        db.exists()
        .where(DbArticle.shop_id == shop_id)
        .where(db.func.lower(DbArticle.name) == name.lower())
    )

    return not db.session.scalar(name_is_taken)
325
326
327
def find_attached_article(
    attached_article_id: AttachedArticleID,
) -> DbAttachedArticle | None:
    """Fetch the attachment's database entity by its ID.

    Return `None` if there is no such attachment.
    """
    db_attached_article = db.session.get(
        DbAttachedArticle, attached_article_id
    )
    return db_attached_article
332
333
334
def get_attached_articles_for_articles(
    article_ids: set[ArticleID],
) -> dict[ArticleID, list[DbAttachedArticle]]:
    """Return the attachments of the given articles, grouped by the ID
    of the article they are attached to.

    The attached article of each attachment is loaded along with it.

    The result is a `defaultdict(list)`, so looking up an article ID
    without attachments yields an empty list. That also holds if no
    article IDs are given, in which case the database is not queried.
    """
    attached_articles_by_attached_to_article_id: defaultdict[
        ArticleID, list[DbAttachedArticle]
    ] = defaultdict(list)

    if not article_ids:
        # Nothing to look up; return the same kind of mapping as the
        # regular path so callers can treat both cases alike.
        return attached_articles_by_attached_to_article_id

    rows = db.session.execute(
        select(DbAttachedArticle.attached_to_article_id, DbAttachedArticle)
        .filter(DbAttachedArticle.attached_to_article_id.in_(article_ids))
        .options(db.joinedload(DbAttachedArticle.article))
    ).all()

    for attached_to_article_id, db_attached_article in rows:
        attached_articles_by_attached_to_article_id[
            attached_to_article_id
        ].append(db_attached_article)

    return attached_articles_by_attached_to_article_id
354
355 1
356
def get_article_by_number(article_number: ArticleNumber) -> Article:
    """Return the article with that item number.

    The lookup uses `scalar_one()`, so it raises unless exactly one
    article matches.
    """
    stmt = select(DbArticle).filter_by(item_number=article_number)
    db_article = db.session.execute(stmt).scalar_one()

    return _db_entity_to_article(db_article)
363
364
365
def get_articles(article_ids: set[ArticleID]) -> list[Article]:
    """Return the articles with those IDs.

    An empty set of IDs yields an empty list without querying the
    database. No ordering is applied to the result.
    """
    if not article_ids:
        return []

    stmt = select(DbArticle).filter(DbArticle.id.in_(article_ids))
    db_articles = db.session.scalars(stmt).all()

    return list(map(_db_entity_to_article, db_articles))
375
376
377
def get_articles_for_shop(shop_id: ShopID) -> list[Article]:
    """Return every article of that shop, sorted by item number."""
    stmt = (
        select(DbArticle)
        .filter_by(shop_id=shop_id)
        .order_by(DbArticle.item_number)
    )
    db_articles = db.session.scalars(stmt).all()

    return list(map(_db_entity_to_article, db_articles))
386
387
388
def get_articles_for_shop_paginated(
    shop_id: ShopID,
    page: int,
    per_page: int,
    *,
    search_term: str | None = None,
) -> Pagination:
    """Return the articles for that shop, paginated.

    Ordered by item number, descending.

    If a non-empty search term is given, only articles matching it
    (see `_filter_by_search_term`) are included.
    """
    stmt = (
        select(DbArticle)
        .filter_by(shop_id=shop_id)
        .order_by(DbArticle.item_number.desc())
    )

    # `None` and the empty string both mean "no search".
    if search_term:
        stmt = _filter_by_search_term(stmt, search_term)

    return paginate(stmt, page, per_page)
409 1
410
411 1
def _filter_by_search_term(stmt: Select, search_term: str) -> Select:
    """Restrict the statement to articles that match every search term.

    The search term is split on spaces into individual terms; an
    article has to match each of them (see
    `_generate_search_clauses_for_term`). If no non-empty term remains,
    the statement is returned unchanged.
    """
    # Splitting on single spaces produces empty strings for leading,
    # trailing or repeated spaces. An empty term would only add a
    # match-everything `%%` pattern, so drop those.
    terms = [term for term in search_term.split(' ') if term]

    if not terms:
        # Avoid building an `and_()` without any clauses.
        return stmt

    clauses = map(_generate_search_clauses_for_term, terms)

    return stmt.filter(db.and_(*clauses))
416
417 1
418
def _generate_search_clauses_for_term(search_term: str) -> Select:
    """Build the condition an article has to meet to match the term.

    An article matches if the term occurs, case-insensitively, anywhere
    in its item number or in its name.
    """
    # NOTE(review): the return annotation says `Select`, but `db.or_()`
    # builds a boolean clause rather than a statement - the annotation
    # looks inaccurate; confirm and adjust.
    pattern = f'%{search_term}%'

    matches_item_number = DbArticle.item_number.ilike(pattern)
    matches_name = DbArticle.name.ilike(pattern)

    return db.or_(matches_item_number, matches_name)
425
426 1
427
def get_article_compilation_for_orderable_articles(
    shop_id: ShopID,
) -> Result[ArticleCompilation, NoArticlesAvailableError]:
    """Return a compilation of the articles which can be ordered from
    that shop, less the ones that are only orderable in a dedicated
    order.

    Only articles that are directly orderable, do not require a
    separate order, and are currently inside their availability window
    (`available_from <= now < available_until`, each bound being
    optional) are included, ordered by name. Every article is followed
    by the articles attached to it, each with its attached quantity as
    fixed quantity.

    Return `Err(NoArticlesAvailableError())` if no article qualifies.
    """
    from datetime import timezone

    # Naive UTC timestamp, same value `datetime.utcnow()` (deprecated
    # since Python 3.12) would produce.
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    db_orderable_articles = db.session.scalars(
        select(DbArticle)
        .filter_by(shop_id=shop_id)
        .filter_by(not_directly_orderable=False)
        .filter_by(separate_order_required=False)
        # Select only articles that are available in between the
        # temporal boundaries for this article, if specified.
        .filter(
            db.or_(
                DbArticle.available_from.is_(None),
                now >= DbArticle.available_from,
            )
        )
        .filter(
            db.or_(
                DbArticle.available_until.is_(None),
                now < DbArticle.available_until,
            )
        )
        .order_by(DbArticle.name)
    ).all()

    if not db_orderable_articles:
        return Err(NoArticlesAvailableError())

    compilation_builder = ArticleCompilationBuilder()

    for db_article in db_orderable_articles:
        article = _db_entity_to_article(db_article)
        compilation_builder.append_article(article)

        article_attachments = _get_article_attachments(
            db_article.attached_articles
        )
        for article_attachment in article_attachments:
            compilation_builder.append_article(
                article_attachment.attached_article,
                fixed_quantity=article_attachment.attached_quantity,
            )

    compilation = compilation_builder.build()

    return Ok(compilation)
479
480
481
def get_article_compilation_for_single_article(
    article_id: ArticleID,
) -> ArticleCompilation:
    """Build a compilation for exactly one article.

    The compilation holds the article itself with a fixed quantity of
    one, followed by the articles attached to it (if any), each with
    its attached quantity as fixed quantity.

    Raise `UnknownArticleIdError` if no article has that ID.
    """
    db_article = _get_db_article(article_id)

    builder = ArticleCompilationBuilder()
    builder.append_article(_db_entity_to_article(db_article), fixed_quantity=1)

    for attachment in _get_article_attachments(db_article.attached_articles):
        builder.append_article(
            attachment.attached_article,
            fixed_quantity=attachment.attached_quantity,
        )

    return builder.build()
502
503
504
def get_article_compilations_for_single_articles(
    article_ids: set[ArticleID],
) -> dict[ArticleID, ArticleCompilation]:
    """Return one compilation per given article, keyed by article ID.

    Each compilation holds the article itself (with a fixed quantity of
    one) plus the articles attached to it (if any), each with its
    attached quantity as fixed quantity.

    Only articles returned by the query get an entry. An empty set of
    IDs yields an empty dict without querying the database.
    """
    if not article_ids:
        return {}

    compilations_by_article_id: dict[ArticleID, ArticleCompilation] = {}

    db_articles = db.session.scalars(
        select(DbArticle).filter(DbArticle.id.in_(article_ids))
    ).all()

    attached_articles_by_attached_to_article_id = (
        get_attached_articles_for_articles(article_ids)
    )

    for db_article in db_articles:
        compilation_builder = ArticleCompilationBuilder()

        article = _db_entity_to_article(db_article)
        compilation_builder.append_article(article, fixed_quantity=1)

        # Articles without attachments may have no entry in the
        # mapping; fall back to an empty list instead of relying on the
        # mapping to create missing entries on access.
        db_attached_articles = attached_articles_by_attached_to_article_id.get(
            db_article.id, []
        )
        article_attachments = _get_article_attachments(db_attached_articles)
        for article_attachment in article_attachments:
            compilation_builder.append_article(
                article_attachment.attached_article,
                fixed_quantity=article_attachment.attached_quantity,
            )

        compilation = compilation_builder.build()

        compilations_by_article_id[article.id] = compilation

    return compilations_by_article_id
544
545
546
def get_attachable_articles(article_id: ArticleID) -> list[Article]:
    """Return the articles that can be attached to that article.

    These are all articles of the same shop except the article itself
    and the articles already attached to it, sorted by item number.

    Raise `UnknownArticleIdError` if no article has that ID.
    """
    target = _get_db_article(article_id)

    # Neither the article itself nor anything already attached to it
    # qualifies.
    excluded_ids = {target.id}
    excluded_ids.update(
        db_attached.article.id for db_attached in target.attached_articles
    )

    stmt = (
        select(DbArticle)
        .filter_by(shop_id=target.shop_id)
        .filter(db.not_(DbArticle.id.in_(excluded_ids)))
        .order_by(DbArticle.item_number)
    )
    candidates = db.session.scalars(stmt).all()

    return [_db_entity_to_article(candidate) for candidate in candidates]
568 1
569
570 1
def sum_ordered_articles_by_payment_state(
    shop_ids: set[ShopID],
) -> list[tuple[ShopID, ArticleNumber, str, PaymentState, int]]:
    """Sum up ordered quantities per article and order payment state.

    For every article of the given shops, one tuple of shop ID, item
    number, name, payment state and quantity is returned for *each*
    payment state; combinations without ordered line items get a
    quantity of zero. Tuples are sorted by shop ID, item number and
    name, with payment states in their enum definition order.
    """
    # Ordered quantity per article and payment state.
    ordered_quantities = (
        select(
            DbLineItem.article_id,
            DbOrder._payment_state.label('payment_state'),
            db.func.sum(DbLineItem.quantity).label('quantity'),
        )
        .join(DbOrder)
        .group_by(DbLineItem.article_id, DbOrder._payment_state)
        .subquery()
    )

    # Outer join so that articles which were never ordered still show
    # up (with payment state and quantity being NULL).
    stmt = (
        select(
            DbArticle.shop_id,
            DbArticle.item_number,
            DbArticle.name,
            ordered_quantities.c.payment_state,
            ordered_quantities.c.quantity,
        )
        .outerjoin(
            ordered_quantities,
            db.and_(DbArticle.id == ordered_quantities.c.article_id),
        )
        .filter(DbArticle.shop_id.in_(shop_ids))
        .order_by(DbArticle.item_number, ordered_quantities.c.payment_state)
    )
    rows = db.session.execute(stmt).all()

    # Distinct articles; the rows hold one entry per payment state.
    article_keys = {(row[0], row[1], row[2]) for row in rows}

    # Rows without payment state belong to articles that were never
    # ordered and contribute no quantity.
    quantities = {
        (shop_id, article_number, name, PaymentState[state_name]): quantity
        for shop_id, article_number, name, state_name, quantity in rows
        if state_name is not None
    }

    return [
        (
            shop_id,
            article_number,
            name,
            payment_state,
            quantities.get((shop_id, article_number, name, payment_state), 0),
        )
        for shop_id, article_number, name in sorted(article_keys)
        for payment_state in PaymentState
    ]
639
640 1
641
def _db_entity_to_article(db_article: DbArticle) -> Article:
    """Convert an article database entity into an `Article` object.

    All fields are copied as they are, except for falsy type parameters
    (such as `None`), which become an empty dict.
    """
    type_params = db_article.type_params or {}

    return Article(
        # Identification.
        id=db_article.id,
        shop_id=db_article.shop_id,
        item_number=db_article.item_number,
        type_=db_article.type_,
        type_params=type_params,
        name=db_article.name,
        # Pricing.
        price=db_article.price,
        tax_rate=db_article.tax_rate,
        # Availability window.
        available_from=db_article.available_from,
        available_until=db_article.available_until,
        # Quantities.
        total_quantity=db_article.total_quantity,
        quantity=db_article.quantity,
        max_quantity_per_order=db_article.max_quantity_per_order,
        # Ordering and processing flags.
        not_directly_orderable=db_article.not_directly_orderable,
        separate_order_required=db_article.separate_order_required,
        processing_required=db_article.processing_required,
    )
660
661 1
662 1
def _get_article_attachments(
    db_attached_articles: list[DbAttachedArticle],
) -> list[ArticleAttachment]:
    """Convert attachment database entities into `ArticleAttachment`
    objects.

    Each result pairs the attached article (converted to an `Article`)
    with the quantity in which it is attached. The order of the input
    is kept.
    """
    return [
        ArticleAttachment(
            attached_article=_db_entity_to_article(db_attached_article.article),
            attached_quantity=db_attached_article.quantity,
        )
        for db_attached_article in db_attached_articles
    ]
672