Passed
Push — main ( 68ee7a...c0bc2e )
by Jochen
04:34
created

byceps.services.shop.order.service.cancel_order()   B

Complexity

Conditions 5

Size

Total Lines 65
Code Lines 41

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 21
CRAP Score 5.0023

Importance

Changes 0
Metric Value
eloc 41
dl 0
loc 65
rs 8.4293
c 0
b 0
f 0
cc 5
nop 3
ccs 21
cts 22
cp 0.9545
crap 5.0023

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

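Commonly applied refactorings include Extract Method: move a cohesive part of the method's body into a new, well-named method and call it from the original one.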

1
"""
2
byceps.services.shop.order.service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2021 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from __future__ import annotations
10 1
from datetime import datetime
11 1
from typing import Iterator, Mapping, Optional, Sequence
12
13 1
from flask import current_app
14 1
from flask_babel import lazy_gettext
15 1
from sqlalchemy.exc import IntegrityError
16
17 1
from ....database import db, paginate, Pagination
18 1
from ....events.shop import ShopOrderCanceled, ShopOrderPaid, ShopOrderPlaced
19 1
from ....typing import UserID
20
21 1
from ...user import service as user_service
22
23 1
from ..article import service as article_service
24 1
from ..cart.models import Cart
25 1
from ..shop.dbmodels import Shop as DbShop
26 1
from ..shop import service as shop_service
27 1
from ..shop.transfer.models import ShopID
28 1
from ..storefront import service as storefront_service
29 1
from ..storefront.transfer.models import StorefrontID
30
31 1
from .dbmodels.order import Order as DbOrder
32 1
from .dbmodels.order_event import OrderEvent as DbOrderEvent, OrderEventData
33 1
from .dbmodels.order_item import OrderItem as DbOrderItem
34 1
from .models.orderer import Orderer
35 1
from . import action_service, event_service, sequence_service
36 1
from .transfer.models import (
37
    Address,
38
    Order,
39
    OrderID,
40
    OrderItem,
41
    OrderNumber,
42
    PaymentMethod,
43
    PaymentState,
44
)
45
46
47 1
class OrderFailed(Exception):
    """Indicate that an order could not be placed."""
49
50
51 1
def place_order(
    storefront_id: StorefrontID,
    orderer: Orderer,
    cart: Cart,
    *,
    created_at: Optional[datetime] = None,
) -> tuple[Order, ShopOrderPlaced]:
    """Place an order for one or more articles.

    The order and its items are built from the cart, the stock of the
    ordered articles is reduced, and the session is committed.

    :param storefront_id: the storefront the order is placed through;
        it determines the shop and the order number sequence
    :param orderer: the ordering user's ID, name and address
    :param cart: the articles and quantities to order
    :param created_at: passed on to the order entity as its creation
        timestamp
    :returns: the placed order and the corresponding event
    :raises OrderFailed: if the commit fails with an integrity error;
        the database error is attached as the exception's cause
    """
    storefront = storefront_service.get_storefront(storefront_id)
    shop = shop_service.get_shop(storefront.shop_id)

    orderer_user = user_service.get_user(orderer.user_id)

    order_number_sequence = sequence_service.get_order_number_sequence(
        storefront.order_number_sequence_id
    )
    order_number = sequence_service.generate_order_number(
        order_number_sequence.id
    )

    order = _build_order(shop.id, order_number, orderer, created_at)
    order_items = list(_build_order_items(cart, order))
    order.total_amount = cart.calculate_total_amount()
    # The order as a whole needs shipping as soon as one item does.
    order.shipping_required = any(
        item.shipping_required for item in order_items
    )

    db.session.add(order)
    db.session.add_all(order_items)

    # Stock changes are not committed on their own (`commit=False`);
    # they become part of the commit below.
    _reduce_article_stock(cart)

    try:
        db.session.commit()
    except IntegrityError as e:
        current_app.logger.error('Order %s failed: %s', order_number, e)
        db.session.rollback()
        # Chain the database error so the root cause stays attached to
        # the domain exception.
        raise OrderFailed() from e

    order_dto = _order_to_transfer_object(order)

    # The orderer is also the initiator of the event.
    event = ShopOrderPlaced(
        occurred_at=order.created_at,
        initiator_id=orderer_user.id,
        initiator_screen_name=orderer_user.screen_name,
        order_id=order.id,
        order_number=order.order_number,
        orderer_id=orderer_user.id,
        orderer_screen_name=orderer_user.screen_name,
    )

    return order_dto, event
103
104
105 1
def _build_order(
    shop_id: ShopID,
    order_number: OrderNumber,
    orderer: Orderer,
    created_at: Optional[datetime],
) -> DbOrder:
    """Create the (not yet persisted) order entity for the orderer."""
    # Positional arguments expected by `DbOrder` after shop ID and
    # order number, in this exact order.
    orderer_details = (
        orderer.user_id,
        orderer.first_names,
        orderer.last_name,
        orderer.country,
        orderer.zip_code,
        orderer.city,
        orderer.street,
    )

    return DbOrder(
        shop_id, order_number, *orderer_details, created_at=created_at
    )
124
125
126 1
def _build_order_items(cart: Cart, order: DbOrder) -> Iterator[DbOrderItem]:
    """Yield one order item entity per item in the cart.

    Article data (number, description, price, tax rate, shipping
    requirement) is copied from the cart item's article into the order
    item.
    """
    for entry in cart.get_items():
        ordered_article = entry.article

        yield DbOrderItem(
            order,
            ordered_article.item_number,
            ordered_article.description,
            ordered_article.price,
            ordered_article.tax_rate,
            entry.quantity,
            entry.line_amount,
            ordered_article.shipping_required,
        )
143
144
145 1
def _reduce_article_stock(cart: Cart) -> None:
    """Decrease each article's stock by the quantity in the cart.

    The changes are not committed here (`commit=False`); committing is
    left to the caller.
    """
    for entry in cart.get_items():
        article_service.decrease_quantity(
            entry.article.id, entry.quantity, commit=False
        )
152
153
154 1
def add_note(order_id: OrderID, author_id: UserID, text: str) -> None:
    """Attach a note to the order.

    The note is stored as an `order-note-added` order event that holds
    the author's ID and the text.
    """
    order = get_order(order_id)
    author = user_service.get_user(author_id)

    note_data = {'author_id': str(author.id), 'text': text}

    event_service.create_event('order-note-added', order.id, note_data)
166
167
168 1
def set_invoiced_flag(order_id: OrderID, initiator_id: UserID) -> None:
    """Record that the invoice for that order has been (externally) created.

    Adds an `order-invoiced` event naming the initiator and sets the
    order's `invoice_created_at` to the event's timestamp.
    """
    order = _get_order_entity(order_id)
    initiator = user_service.get_user(initiator_id)

    invoiced_at = datetime.utcnow()

    db.session.add(
        DbOrderEvent(
            invoiced_at,
            'order-invoiced',
            order.id,
            {'initiator_id': str(initiator.id)},
        )
    )
    order.invoice_created_at = invoiced_at

    db.session.commit()
185
186
187 1
def unset_invoiced_flag(order_id: OrderID, initiator_id: UserID) -> None:
    """Withdraw record of the invoice for that order having been created.

    Adds an `order-invoiced-withdrawn` event naming the initiator and
    resets the order's `invoice_created_at` to `None`.
    """
    order = _get_order_entity(order_id)
    initiator = user_service.get_user(initiator_id)

    withdrawn_at = datetime.utcnow()

    db.session.add(
        DbOrderEvent(
            withdrawn_at,
            'order-invoiced-withdrawn',
            order.id,
            {'initiator_id': str(initiator.id)},
        )
    )
    order.invoice_created_at = None

    db.session.commit()
204
205
206 1 View Code Duplication
def set_shipped_flag(order_id: OrderID, initiator_id: UserID) -> None:
    """Mark the order as shipped.

    Adds an `order-shipped` event naming the initiator and sets the
    order's `shipped_at` to the event's timestamp.

    :raises ValueError: if the order ID is unknown or the order
        contains no items that require shipping
    """
    _update_shipped_state(
        order_id, initiator_id, 'order-shipped', shipped=True
    )


def unset_shipped_flag(order_id: OrderID, initiator_id: UserID) -> None:
    """Mark the order as not shipped.

    Adds an `order-shipped-withdrawn` event naming the initiator and
    resets the order's `shipped_at` to `None`.

    :raises ValueError: if the order ID is unknown or the order
        contains no items that require shipping
    """
    _update_shipped_state(
        order_id, initiator_id, 'order-shipped-withdrawn', shipped=False
    )


def _update_shipped_state(
    order_id: OrderID,
    initiator_id: UserID,
    event_type: str,
    *,
    shipped: bool,
) -> None:
    """Record a shipping state change and persist it with its event."""
    order = _get_order_entity(order_id)
    initiator = user_service.get_user(initiator_id)

    if not order.shipping_required:
        raise ValueError('Order contains no items that require shipping.')

    now = datetime.utcnow()
    data = {
        'initiator_id': str(initiator.id),
    }

    event = DbOrderEvent(now, event_type, order.id, data)
    db.session.add(event)

    # The event is always timestamped; the order only carries a
    # shipping date while it counts as shipped.
    order.shipped_at = now if shipped else None

    db.session.commit()
248
249
250 1
class OrderAlreadyCanceled(Exception):
    """Indicate that the order has already been canceled."""
252
253
254 1
class OrderAlreadyMarkedAsPaid(Exception):
    """Indicate that the order has already been marked as paid."""
256
257
258 1
def cancel_order(
    order_id: OrderID, initiator_id: UserID, reason: str
) -> ShopOrderCanceled:
    """Cancel the order.

    Reserved quantities of articles from that order are made available
    again.

    :param order_id: the order to cancel
    :param initiator_id: the user who cancels the order
    :param reason: stored on the order and in the cancelation event
    :returns: the event describing the cancelation
    :raises ValueError: if the order ID is unknown
    :raises OrderAlreadyCanceled: if the order is already canceled
    """
    order = _get_order_entity(order_id)

    if order.is_canceled:
        raise OrderAlreadyCanceled()

    initiator = user_service.get_user(initiator_id)
    orderer_user = user_service.get_user(order.placed_by_id)

    # Both values have to be read before the payment state is
    # overwritten further down.
    has_order_been_paid = order.is_paid
    payment_state_from = order.payment_state

    now = datetime.utcnow()

    payment_state_to = (
        PaymentState.canceled_after_paid
        if has_order_been_paid
        else PaymentState.canceled_before_paid
    )

    _update_payment_state(order, payment_state_to, now, initiator.id)
    order.cancelation_reason = reason

    event = _build_cancelation_event(
        order, now, initiator.id, payment_state_from, has_order_been_paid, reason
    )
    db.session.add(event)

    _release_reserved_articles(order)

    db.session.commit()

    action_service.execute_actions(
        _order_to_transfer_object(order), payment_state_to, initiator.id
    )

    return ShopOrderCanceled(
        occurred_at=now,
        initiator_id=initiator.id,
        initiator_screen_name=initiator.screen_name,
        order_id=order.id,
        order_number=order.order_number,
        orderer_id=orderer_user.id,
        orderer_screen_name=orderer_user.screen_name,
    )


def _build_cancelation_event(
    order: DbOrder,
    occurred_at: datetime,
    initiator_id: UserID,
    former_payment_state: PaymentState,
    has_order_been_paid: bool,
    reason: str,
) -> DbOrderEvent:
    """Build the order event that documents the cancelation."""
    event_type = (
        'order-canceled-after-paid'
        if has_order_been_paid
        else 'order-canceled-before-paid'
    )
    data = {
        'initiator_id': str(initiator_id),
        'former_payment_state': former_payment_state.name,
        'reason': reason,
    }

    return DbOrderEvent(occurred_at, event_type, order.id, data)


def _release_reserved_articles(order: DbOrder) -> None:
    """Make the reserved quantity of the order's articles available again."""
    for item in order.items:
        # Not committed here; the caller commits all changes at once.
        article_service.increase_quantity(
            item.article.id, item.quantity, commit=False
        )
324
325
326 1
def mark_order_as_paid(
    order_id: OrderID,
    payment_method: PaymentMethod,
    initiator_id: UserID,
    *,
    additional_event_data: Optional[Mapping[str, str]] = None,
) -> ShopOrderPaid:
    """Mark the order as paid.

    Sets payment method and payment state on the order, stores an
    `order-paid` event, commits, and then executes the actions
    registered for the new payment state.

    Raises `OrderAlreadyMarkedAsPaid` if the order is already paid.
    """
    order = _get_order_entity(order_id)
    if order.is_paid:
        raise OrderAlreadyMarkedAsPaid()

    initiator = user_service.get_user(initiator_id)
    orderer_user = user_service.get_user(order.placed_by_id)

    paid_at = datetime.utcnow()
    # Remember the previous state for the event before overwriting it.
    former_payment_state = order.payment_state

    order.payment_method = payment_method
    _update_payment_state(order, PaymentState.paid, paid_at, initiator.id)

    # Start with the additionally given data and set the required,
    # internal properties afterwards so that the latter always win.
    event_data: OrderEventData = (
        {} if additional_event_data is None else dict(additional_event_data)
    )
    event_data['initiator_id'] = str(initiator.id)
    event_data['former_payment_state'] = former_payment_state.name
    event_data['payment_method'] = payment_method.name

    db.session.add(DbOrderEvent(paid_at, 'order-paid', order.id, event_data))
    db.session.commit()

    action_service.execute_actions(
        _order_to_transfer_object(order), PaymentState.paid, initiator.id
    )

    return ShopOrderPaid(
        occurred_at=paid_at,
        initiator_id=initiator.id,
        initiator_screen_name=initiator.screen_name,
        order_id=order.id,
        order_number=order.order_number,
        orderer_id=orderer_user.id,
        orderer_screen_name=orderer_user.screen_name,
        payment_method=payment_method,
    )
384
385
386 1
def _update_payment_state(
    order: DbOrder,
    state: PaymentState,
    updated_at: datetime,
    initiator_id: UserID,
) -> None:
    """Set the order's payment state along with when and by whom it
    was changed.

    Only the entity's attributes are assigned; nothing is committed.
    """
    order.payment_state = state
    order.payment_state_updated_at = updated_at
    order.payment_state_updated_by_id = initiator_id
395
396
397 1
def delete_order(order_id: OrderID) -> None:
    """Delete an order together with its events and items."""
    order = get_order(order_id)

    # Remove dependent rows first, then the order itself.
    events_of_order = db.session.query(DbOrderEvent).filter_by(
        order_id=order.id
    )
    events_of_order.delete()

    items_of_order = db.session.query(DbOrderItem).filter_by(
        order_number=order.order_number
    )
    items_of_order.delete()

    order_rows = db.session.query(DbOrder).filter_by(id=order.id)
    order_rows.delete()

    db.session.commit()
414
415
416 1
def count_open_orders(shop_id: ShopID) -> int:
    """Return how many of the shop's orders are in the `open` payment
    state.
    """
    open_orders = DbOrder.query.for_shop(shop_id).filter_by(
        _payment_state=PaymentState.open.name
    )

    return open_orders.count()
422
423
424 1
def count_orders_per_payment_state(shop_id: ShopID) -> dict[PaymentState, int]:
    """Count orders for the shop, grouped by payment state.

    Every payment state is contained in the result; states without
    orders are mapped to zero.
    """
    state_names_and_totals = (
        db.session
        .query(DbOrder._payment_state, db.func.count(DbOrder.id))
        .filter(DbOrder.shop_id == shop_id)
        .group_by(DbOrder._payment_state)
        .all()
    )

    totals = {state: 0 for state in PaymentState}
    for state_name, total in state_names_and_totals:
        # The query yields the state's name, so map it back to the
        # enum member.
        totals[PaymentState[state_name]] = total

    return totals
442
443
444 1
def _find_order_entity(order_id: OrderID) -> Optional[DbOrder]:
    """Look up the order database entity by its ID.

    Return `None` if there is no such order.
    """
    order_query = db.session.query(DbOrder)
    return order_query.get(order_id)
449
450
451 1
def _get_order_entity(order_id: OrderID) -> DbOrder:
    """Look up the order database entity by its ID.

    Raise `ValueError` if there is no such order.
    """
    order = _find_order_entity(order_id)

    if order is not None:
        return order

    raise ValueError(f'Unknown order ID "{order_id}"')
461
462
463 1
def find_order(order_id: OrderID) -> Optional[Order]:
    """Look up the order by its ID and return it as transfer object.

    Return `None` if there is no such order.
    """
    order = _find_order_entity(order_id)

    return None if order is None else _order_to_transfer_object(order)
471
472
473 1
def get_order(order_id: OrderID) -> Order:
    """Look up the order by its ID and return it as transfer object.

    Raise `ValueError` if there is no such order.
    """
    return _order_to_transfer_object(_get_order_entity(order_id))
477
478
479 1
def find_order_with_details(order_id: OrderID) -> Optional[Order]:
    """Return the order with that id, or `None` if not found.

    The order's items are loaded eagerly as part of the query.
    """
    order = (
        db.session
        .query(DbOrder)
        .options(db.joinedload(DbOrder.items))
        .get(order_id)
    )

    if order is not None:
        return _order_to_transfer_object(order)

    return None
491
492
493 1
def find_order_by_order_number(order_number: OrderNumber) -> Optional[Order]:
    """Look up the order by its order number.

    Return `None` if there is no order with that number.
    """
    matching_orders = DbOrder.query.filter_by(order_number=order_number)
    order = matching_orders.one_or_none()

    return None if order is None else _order_to_transfer_object(order)
503
504
505 1
def find_orders_by_order_numbers(
    order_numbers: set[OrderNumber],
) -> Sequence[Order]:
    """Return the orders with those order numbers.

    An empty set of order numbers yields an empty list without
    querying the database.
    """
    if not order_numbers:
        return []

    matching_orders = DbOrder.query.filter(
        DbOrder.order_number.in_(order_numbers)
    ).all()

    return [_order_to_transfer_object(order) for order in matching_orders]
517
518
519 1
def get_order_count_by_shop_id() -> dict[ShopID, int]:
    """Return order count (including 0) per shop, indexed by shop ID."""
    # The outer join keeps shops without any orders in the result.
    counts_query = (
        db.session
        .query(DbShop.id, db.func.count(DbOrder.shop_id))
        .outerjoin(DbOrder)
        .group_by(DbShop.id)
    )

    return dict(counts_query.all())
531
532
533 1
def get_orders_for_shop_paginated(
    shop_id: ShopID,
    page: int,
    per_page: int,
    *,
    search_term: Optional[str] = None,
    only_payment_state: Optional[PaymentState] = None,
    only_shipped: Optional[bool] = None,
) -> Pagination:
    """Return all orders for that shop, ordered by creation date
    (newest first).

    :param shop_id: the shop whose orders are returned
    :param page: the page to return
    :param per_page: the number of orders per page
    :param search_term: if given and not empty, only orders whose order
        number contains the term (matched via `ILIKE`) are returned
    :param only_payment_state: if specified, only orders in that
        payment state are returned
    :param only_shipped: if specified, only orders that require
        shipping are returned; `True` further limits them to orders
        with a shipping date, `False` to orders without one
    :returns: a pagination object whose items are order transfer
        objects
    """
    query = DbOrder.query \
        .for_shop(shop_id) \
        .order_by(DbOrder.created_at.desc())

    if search_term:
        ilike_pattern = f'%{search_term}%'
        query = query \
            .filter(DbOrder.order_number.ilike(ilike_pattern))

    if only_payment_state is not None:
        query = query.filter_by(_payment_state=only_payment_state.name)

    if only_shipped is not None:
        # The comparison operators are intentional: SQLAlchemy
        # translates them into SQL expressions, `is`/`is not` would be
        # evaluated in Python instead.
        query = query.filter(DbOrder.shipping_required == True)

        if only_shipped:
            query = query.filter(DbOrder.shipped_at != None)
        else:
            query = query.filter(DbOrder.shipped_at == None)

    return paginate(
        query,
        page,
        per_page,
        item_mapper=_order_to_transfer_object,
    )
573
574
575 1
def get_orders_placed_by_user(user_id: UserID) -> Sequence[Order]:
    """Return the orders placed by the user, newest first.

    The orders' items are loaded eagerly as part of the query.
    """
    orders_query = (
        DbOrder.query
        .options(db.joinedload(DbOrder.items))
        .placed_by(user_id)
        .order_by(DbOrder.created_at.desc())
    )

    return [_order_to_transfer_object(order) for order in orders_query.all()]
586
587
588 1
def get_orders_placed_by_user_for_shop(
    user_id: UserID, shop_id: ShopID
) -> Sequence[Order]:
    """Return the orders placed by the user in that shop, newest first.

    The orders' items are loaded eagerly as part of the query.
    """
    orders_query = (
        DbOrder.query
        .options(db.joinedload(DbOrder.items))
        .for_shop(shop_id)
        .placed_by(user_id)
        .order_by(DbOrder.created_at.desc())
    )

    return [_order_to_transfer_object(order) for order in orders_query.all()]
602
603
604 1
def has_user_placed_orders(user_id: UserID, shop_id: ShopID) -> bool:
    """Tell whether the user has placed at least one order in that shop."""
    orders_of_user = DbOrder.query.for_shop(shop_id).placed_by(user_id)

    return orders_of_user.count() > 0
612
613
614 1
# Translatable labels for payment methods, looked up by
# `find_payment_method_label`.
_PAYMENT_METHOD_LABELS = {
    PaymentMethod.bank_transfer: lazy_gettext('bank transfer'),
    PaymentMethod.cash: lazy_gettext('cash'),
    PaymentMethod.direct_debit: lazy_gettext('direct debit'),
    PaymentMethod.free: lazy_gettext('free'),
}


def find_payment_method_label(payment_method: PaymentMethod) -> Optional[str]:
    """Return the label for the payment method, or `None` if no label
    is defined for it.
    """
    try:
        return _PAYMENT_METHOD_LABELS[payment_method]
    except KeyError:
        return None
625
626
627 1
def get_payment_date(order_id: OrderID) -> Optional[datetime]:
    """Return the date the order has been marked as paid, or `None` if
    it has not been paid.

    `None` is also returned if no order with that ID exists.
    """
    # `payment_state_updated_at` is written on every payment state
    # change (cancelation included), so it only denotes the payment
    # date while the order is in the `paid` state. Without the state
    # filter, the cancelation date would be returned for canceled
    # orders.
    return (
        db.session
        .query(DbOrder.payment_state_updated_at)
        .filter_by(id=order_id, _payment_state=PaymentState.paid.name)
        .scalar()
    )
635
636
637 1
def _order_to_transfer_object(order: DbOrder) -> Order:
    """Convert the order database entity, including its items and
    address, into a transfer object.
    """
    item_dtos = [order_item_to_transfer_object(item) for item in order.items]

    return Order(
        id=order.id,
        shop_id=order.shop_id,
        order_number=order.order_number,
        created_at=order.created_at,
        placed_by_id=order.placed_by_id,
        first_names=order.first_names,
        last_name=order.last_name,
        address=Address(
            country=order.country,
            zip_code=order.zip_code,
            city=order.city,
            street=order.street,
        ),
        total_amount=order.total_amount,
        items=item_dtos,
        payment_method=order.payment_method,
        payment_state=order.payment_state,
        is_open=order.is_open,
        is_canceled=order.is_canceled,
        is_paid=order.is_paid,
        is_invoiced=order.is_invoiced,
        is_shipping_required=order.is_shipping_required,
        is_shipped=order.is_shipped,
        cancelation_reason=order.cancelation_reason,
    )
669
670
671 1
def order_item_to_transfer_object(
    item: DbOrderItem,
) -> OrderItem:
    """Convert the order item database entity into a transfer object.

    The fields are copied one-to-one under the same names.
    """
    copied_fields = (
        'order_number',
        'article_number',
        'description',
        'unit_price',
        'tax_rate',
        'quantity',
        'line_amount',
    )

    return OrderItem(**{name: getattr(item, name) for name in copied_fields})
684