Total Complexity | 60 |
Total Lines | 729 |
Duplicated Lines | 5.49 % |
Coverage | 69.17% |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like byceps.services.shop.order.service often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | """ |
||
2 | byceps.services.shop.order.service |
||
3 | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
||
4 | |||
5 | :Copyright: 2006-2021 Jochen Kupperschmidt |
||
6 | :License: Revised BSD (see `LICENSE` file for details) |
||
7 | """ |
||
8 | |||
9 | 1 | from __future__ import annotations |
|
10 | 1 | from datetime import datetime |
|
11 | 1 | from typing import Iterator, Mapping, Optional, Sequence |
|
12 | |||
13 | 1 | from flask import current_app |
|
14 | 1 | from flask_babel import lazy_gettext |
|
15 | 1 | from sqlalchemy.exc import IntegrityError |
|
16 | |||
17 | 1 | from ....database import db, paginate, Pagination |
|
18 | 1 | from ....events.shop import ShopOrderCanceled, ShopOrderPaid, ShopOrderPlaced |
|
19 | 1 | from ....typing import UserID |
|
20 | |||
21 | 1 | from ...user import service as user_service |
|
22 | |||
23 | 1 | from ..article import service as article_service |
|
24 | 1 | from ..cart.models import Cart, CartItem |
|
25 | 1 | from ..shop.dbmodels import Shop as DbShop |
|
26 | 1 | from ..shop import service as shop_service |
|
27 | 1 | from ..shop.transfer.models import ShopID |
|
28 | 1 | from ..storefront import service as storefront_service |
|
29 | 1 | from ..storefront.transfer.models import StorefrontID |
|
30 | |||
31 | 1 | from .dbmodels.line_item import LineItem as DbLineItem |
|
32 | 1 | from .dbmodels.order import Order as DbOrder |
|
33 | 1 | from .dbmodels.order_event import OrderEvent as DbOrderEvent, OrderEventData |
|
34 | 1 | from .models.orderer import Orderer |
|
35 | 1 | from . import action_service, event_service, sequence_service |
|
36 | 1 | from .transfer.models import ( |
|
37 | Address, |
||
38 | Order, |
||
39 | OrderID, |
||
40 | LineItem, |
||
41 | OrderNumber, |
||
42 | OrderState, |
||
43 | PaymentState, |
||
44 | ) |
||
45 | |||
46 | |||
class OrderFailed(Exception):
    """Raised by `place_order` when committing the order fails."""
49 | |||
50 | |||
def place_order(
    storefront_id: StorefrontID,
    orderer: Orderer,
    cart: Cart,
    *,
    created_at: Optional[datetime] = None,
) -> tuple[Order, ShopOrderPlaced]:
    """Place an order for one or more articles.

    The order and its line items are built from the cart's items, the
    stock of the ordered articles is reduced, and everything is
    committed together.

    :param storefront_id: storefront the order is placed through; its
        shop and order number sequence are used for the order.
    :param orderer: the person placing the order.
    :param cart: the cart whose items become the order's line items.
    :param created_at: handed to the order entity as-is.
    :returns: the placed order as transfer object, and the
        corresponding `ShopOrderPlaced` event.
    :raises OrderFailed: if the commit fails with an `IntegrityError`;
        the database error is attached as the exception's cause.
    """
    storefront = storefront_service.get_storefront(storefront_id)
    shop = shop_service.get_shop(storefront.shop_id)

    orderer_user = user_service.get_user(orderer.user_id)

    order_number_sequence = sequence_service.get_order_number_sequence(
        storefront.order_number_sequence_id
    )
    order_number = sequence_service.generate_order_number(
        order_number_sequence.id
    )

    cart_items = cart.get_items()

    order = _build_order(shop.id, order_number, orderer, created_at)
    line_items = list(_build_line_items(cart_items, order))
    order.total_amount = cart.calculate_total_amount()
    # The order needs processing as soon as a single line item does.
    order.processing_required = any(
        item.processing_required for item in line_items
    )

    db.session.add(order)
    db.session.add_all(line_items)

    # Stock changes are not committed on their own (`commit=False`) so
    # they become part of the commit below.
    _reduce_article_stock(cart_items)

    try:
        db.session.commit()
    except IntegrityError as e:
        current_app.logger.error('Order %s failed: %s', order_number, e)
        db.session.rollback()
        # Chain explicitly so the database error is reported as the
        # direct cause of the failure.
        raise OrderFailed() from e

    order_dto = _order_to_transfer_object(order)

    # The orderer is both the initiator and the orderer of this event.
    event = ShopOrderPlaced(
        occurred_at=order.created_at,
        initiator_id=orderer_user.id,
        initiator_screen_name=orderer_user.screen_name,
        order_id=order.id,
        order_number=order.order_number,
        orderer_id=orderer_user.id,
        orderer_screen_name=orderer_user.screen_name,
    )

    return order_dto, event
103 | |||
104 | |||
def _build_order(
    shop_id: ShopID,
    order_number: OrderNumber,
    orderer: Orderer,
    created_at: Optional[datetime],
) -> DbOrder:
    """Create the order entity from the orderer's name and address data."""
    order = DbOrder(
        shop_id,
        order_number,
        orderer.user_id,
        orderer.first_names,
        orderer.last_name,
        orderer.country,
        orderer.zip_code,
        orderer.city,
        orderer.street,
        created_at=created_at,
    )
    return order
124 | |||
125 | |||
def _build_line_items(
    cart_items: list[CartItem], order: DbOrder
) -> Iterator[DbLineItem]:
    """Yield one line item entity for the order per cart item."""
    for item in cart_items:
        ordered_article = item.article

        yield DbLineItem(
            order,
            ordered_article.item_number,
            ordered_article.type_,
            ordered_article.description,
            ordered_article.price,
            ordered_article.tax_rate,
            item.quantity,
            item.line_amount,
            ordered_article.processing_required,
        )
144 | |||
145 | |||
def _reduce_article_stock(cart_items: list[CartItem]) -> None:
    """Decrease each article's stock by the quantity in the cart.

    The changes are not committed here (`commit=False`).
    """
    for item in cart_items:
        article_service.decrease_quantity(
            item.article.id, item.quantity, commit=False
        )
153 | |||
154 | |||
def add_note(order_id: OrderID, author_id: UserID, text: str) -> None:
    """Attach a note to the order as an 'order-note-added' event."""
    # Look up both first so unknown IDs are rejected before anything
    # is written.
    order = get_order(order_id)
    author = user_service.get_user(author_id)

    note_data = {
        'author_id': str(author.id),
        'text': text,
    }

    event_service.create_event('order-note-added', order.id, note_data)
167 | |||
168 | |||
def _add_flag_event(
    occurred_at: datetime,
    event_type: str,
    order_id: OrderID,
    initiator_id: UserID,
) -> None:
    """Add an order event that names the initiator to the session.

    Nothing is committed here; that is left to the caller.
    """
    data = {
        'initiator_id': str(initiator_id),
    }

    event = DbOrderEvent(occurred_at, event_type, order_id, data)
    db.session.add(event)


def _ensure_processing_required(order: DbOrder) -> None:
    """Raise `ValueError` if the order is not flagged as requiring
    processing.
    """
    if not order.processing_required:
        raise ValueError('Order contains no items that require shipping.')


def set_invoiced_flag(order_id: OrderID, initiator_id: UserID) -> None:
    """Record that the invoice for that order has been (externally) created.

    Sets the order's `invoice_created_at` to the current UTC time and
    stores an 'order-invoiced' event.

    :raises ValueError: if the order ID is unknown.
    """
    order = _get_order_entity(order_id)
    initiator = user_service.get_user(initiator_id)

    now = datetime.utcnow()

    _add_flag_event(now, 'order-invoiced', order.id, initiator.id)
    order.invoice_created_at = now

    db.session.commit()


def unset_invoiced_flag(order_id: OrderID, initiator_id: UserID) -> None:
    """Withdraw record of the invoice for that order having been created.

    Clears the order's `invoice_created_at` and stores an
    'order-invoiced-withdrawn' event.

    :raises ValueError: if the order ID is unknown.
    """
    order = _get_order_entity(order_id)
    initiator = user_service.get_user(initiator_id)

    now = datetime.utcnow()

    _add_flag_event(now, 'order-invoiced-withdrawn', order.id, initiator.id)
    order.invoice_created_at = None

    db.session.commit()


def set_shipped_flag(order_id: OrderID, initiator_id: UserID) -> None:
    """Mark the order as shipped.

    Sets the order's `processed_at` to the current UTC time and stores
    an 'order-shipped' event.

    :raises ValueError: if the order ID is unknown, or if the order is
        not flagged as requiring processing.
    """
    order = _get_order_entity(order_id)
    initiator = user_service.get_user(initiator_id)

    _ensure_processing_required(order)

    now = datetime.utcnow()

    _add_flag_event(now, 'order-shipped', order.id, initiator.id)
    order.processed_at = now

    db.session.commit()


def unset_shipped_flag(order_id: OrderID, initiator_id: UserID) -> None:
    """Mark the order as not shipped.

    Clears the order's `processed_at` and stores an
    'order-shipped-withdrawn' event.

    :raises ValueError: if the order ID is unknown, or if the order is
        not flagged as requiring processing.
    """
    order = _get_order_entity(order_id)
    initiator = user_service.get_user(initiator_id)

    _ensure_processing_required(order)

    now = datetime.utcnow()

    _add_flag_event(now, 'order-shipped-withdrawn', order.id, initiator.id)
    order.processed_at = None

    db.session.commit()
249 | |||
250 | |||
class OrderAlreadyCanceled(Exception):
    """Raised on an attempt to cancel an order that is canceled already."""
253 | |||
254 | |||
class OrderAlreadyMarkedAsPaid(Exception):
    """Raised on an attempt to mark an order as paid that is paid already."""
257 | |||
258 | |||
def cancel_order(
    order_id: OrderID, initiator_id: UserID, reason: str
) -> ShopOrderCanceled:
    """Cancel the order.

    The quantities of the articles in that order are added to the
    articles' stock again, and the actions registered for the resulting
    payment state are executed after the commit.

    Raise `OrderAlreadyCanceled` if the order is canceled already.
    """
    order = _get_order_entity(order_id)
    if _is_canceled(order):
        raise OrderAlreadyCanceled()

    initiator = user_service.get_user(initiator_id)
    orderer_user = user_service.get_user(order.placed_by_id)

    # Target state and event type both depend on whether the order had
    # been paid before it got canceled.
    if _is_paid(order):
        new_state = PaymentState.canceled_after_paid
        event_type = 'order-canceled-after-paid'
    else:
        new_state = PaymentState.canceled_before_paid
        event_type = 'order-canceled-before-paid'

    canceled_at = datetime.utcnow()

    # Remember the current state before it is overwritten.
    previous_state = order.payment_state

    _update_payment_state(order, new_state, canceled_at, initiator.id)
    order.cancelation_reason = reason

    event_data = {
        'initiator_id': str(initiator.id),
        'former_payment_state': previous_state.name,
        'reason': reason,
    }
    db.session.add(
        DbOrderEvent(canceled_at, event_type, order.id, event_data)
    )

    # Make the reserved quantity of articles available again.
    for line_item in order.items:
        article_service.increase_quantity(
            line_item.article.id, line_item.quantity, commit=False
        )

    db.session.commit()

    order_dto = _order_to_transfer_object(order)
    action_service.execute_actions(order_dto, new_state, initiator.id)

    return ShopOrderCanceled(
        occurred_at=canceled_at,
        initiator_id=initiator.id,
        initiator_screen_name=initiator.screen_name,
        order_id=order.id,
        order_number=order.order_number,
        orderer_id=orderer_user.id,
        orderer_screen_name=orderer_user.screen_name,
    )
325 | |||
326 | |||
def mark_order_as_paid(
    order_id: OrderID,
    payment_method: str,
    initiator_id: UserID,
    *,
    additional_event_data: Optional[Mapping[str, str]] = None,
) -> ShopOrderPaid:
    """Mark the order as paid.

    An 'order-paid' event is stored, and the actions registered for the
    paid state are executed after the commit.

    Raise `OrderAlreadyMarkedAsPaid` if the order is paid already.
    """
    order = _get_order_entity(order_id)
    if _is_paid(order):
        raise OrderAlreadyMarkedAsPaid()

    initiator = user_service.get_user(initiator_id)
    orderer_user = user_service.get_user(order.placed_by_id)

    paid_at = datetime.utcnow()

    # Remember the current state before it is overwritten.
    previous_state = order.payment_state
    new_state = PaymentState.paid

    order.payment_method = payment_method
    _update_payment_state(order, new_state, paid_at, initiator.id)

    # Start from the caller-supplied data and set the required,
    # internally determined entries afterwards so that the latter
    # cannot be overridden by the former.
    event_data: OrderEventData = dict(additional_event_data or {})
    event_data['initiator_id'] = str(initiator.id)
    event_data['former_payment_state'] = previous_state.name
    event_data['payment_method'] = payment_method

    db.session.add(DbOrderEvent(paid_at, 'order-paid', order.id, event_data))

    db.session.commit()

    order_dto = _order_to_transfer_object(order)
    action_service.execute_actions(order_dto, new_state, initiator.id)

    return ShopOrderPaid(
        occurred_at=paid_at,
        initiator_id=initiator.id,
        initiator_screen_name=initiator.screen_name,
        order_id=order.id,
        order_number=order.order_number,
        orderer_id=orderer_user.id,
        orderer_screen_name=orderer_user.screen_name,
        payment_method=payment_method,
    )
385 | |||
386 | |||
387 | 1 | def _update_payment_state( |
|
388 | order: DbOrder, |
||
389 | state: PaymentState, |
||
390 | updated_at: datetime, |
||
391 | initiator_id: UserID, |
||
392 | ) -> None: |
||
393 | 1 | order.payment_state = state |
|
394 | 1 | order.payment_state_updated_at = updated_at |
|
395 | 1 | order.payment_state_updated_by_id = initiator_id |
|
396 | |||
397 | |||
def delete_order(order_id: OrderID) -> None:
    """Delete the order together with its events and line items."""
    order = get_order(order_id)

    session = db.session

    # Remove the rows that refer to the order before the order itself.
    session.query(DbOrderEvent).filter_by(order_id=order.id).delete()
    session.query(DbLineItem).filter_by(
        order_number=order.order_number
    ).delete()
    session.query(DbOrder).filter_by(id=order.id).delete()

    session.commit()
415 | |||
416 | |||
def count_open_orders(shop_id: ShopID) -> int:
    """Return how many orders of the shop are in the open payment state."""
    open_orders = (
        db.session.query(DbOrder)
        .filter_by(shop_id=shop_id)
        .filter_by(_payment_state=PaymentState.open.name)
    )
    return open_orders.count()
424 | |||
425 | |||
def count_orders_per_payment_state(shop_id: ShopID) -> dict[PaymentState, int]:
    """Return the shop's order count for every payment state.

    States without any order are included with a count of zero.
    """
    rows = (
        db.session.query(DbOrder._payment_state, db.func.count(DbOrder.id))
        .filter(DbOrder.shop_id == shop_id)
        .group_by(DbOrder._payment_state)
        .all()
    )

    # Start with zero for every state, then fill in the states the
    # query returned rows for.
    counts = {state: 0 for state in PaymentState}
    counts.update(
        (PaymentState[state_name], count) for state_name, count in rows
    )

    return counts
444 | |||
445 | |||
def _find_order_entity(order_id: OrderID) -> Optional[DbOrder]:
    """Look up the order database entity by its ID.

    Return `None` if there is no such order.
    """
    order_query = db.session.query(DbOrder)
    return order_query.get(order_id)
451 | |||
452 | |||
def _get_order_entity(order_id: OrderID) -> DbOrder:
    """Look up the order database entity by its ID.

    Raise `ValueError` if there is no such order.
    """
    entity = _find_order_entity(order_id)

    if entity is not None:
        return entity

    raise ValueError(f'Unknown order ID "{order_id}"')
463 | |||
464 | |||
def find_order(order_id: OrderID) -> Optional[Order]:
    """Return the order with that ID as transfer object, or `None` if
    there is no such order.
    """
    entity = _find_order_entity(order_id)
    return None if entity is None else _order_to_transfer_object(entity)
473 | |||
474 | |||
def get_order(order_id: OrderID) -> Order:
    """Return the order with that ID as transfer object.

    Raise `ValueError` if there is no such order.
    """
    return _order_to_transfer_object(_get_order_entity(order_id))
479 | |||
480 | |||
def find_order_with_details(order_id: OrderID) -> Optional[Order]:
    """Return the order with that ID, or `None` if not found.

    The order's items are fetched along with the order via a joined
    load.
    """
    query = db.session.query(DbOrder).options(db.joinedload(DbOrder.items))
    entity = query.get(order_id)

    if entity is not None:
        return _order_to_transfer_object(entity)

    return None
493 | |||
494 | |||
def find_order_by_order_number(order_number: OrderNumber) -> Optional[Order]:
    """Return the order that has the given order number, or `None` if
    there is no such order.
    """
    entity = (
        db.session.query(DbOrder)
        .filter_by(order_number=order_number)
        .one_or_none()
    )

    return None if entity is None else _order_to_transfer_object(entity)
506 | |||
507 | |||
def find_orders_by_order_numbers(
    order_numbers: set[OrderNumber],
) -> Sequence[Order]:
    """Return the orders whose order number is among the given ones.

    No query is issued if no order numbers are given.
    """
    if not order_numbers:
        return []

    matching = db.session.query(DbOrder).filter(
        DbOrder.order_number.in_(order_numbers)
    )

    return [_order_to_transfer_object(entity) for entity in matching.all()]
521 | |||
522 | |||
def get_order_count_by_shop_id() -> dict[ShopID, int]:
    """Return the number of orders for each shop, keyed by shop ID.

    Shops are outer-joined with orders, so shops without orders are
    included with a count of zero.
    """
    rows = (
        db.session.query(DbShop.id, db.func.count(DbOrder.shop_id))
        .outerjoin(DbOrder)
        .group_by(DbShop.id)
        .all()
    )

    return {shop_id: order_count for shop_id, order_count in rows}
535 | |||
536 | |||
def get_orders_for_shop_paginated(
    shop_id: ShopID,
    page: int,
    per_page: int,
    *,
    search_term: Optional[str] = None,
    only_payment_state: Optional[PaymentState] = None,
    only_processed: Optional[bool] = None,
) -> Pagination:
    """Return all orders for that shop, ordered by creation date
    (newest first).

    :param shop_id: the shop to return orders for.
    :param page: handed to `paginate` as-is.
    :param per_page: handed to `paginate` as-is.
    :param search_term: if given and not empty, only orders whose order
        number contains it (case-insensitively) are returned. The term
        is embedded into the `ILIKE` pattern unescaped, so `%` and `_`
        in it act as wildcards.
    :param only_payment_state: if given, only orders in that payment
        state are returned.
    :param only_processed: if given, only orders that require
        processing are returned: those already processed for `True`,
        those not yet processed for `False`.
    :returns: the requested page, with orders mapped to transfer
        objects.
    """
    query = db.session \
        .query(DbOrder) \
        .filter_by(shop_id=shop_id) \
        .order_by(DbOrder.created_at.desc())

    if search_term:
        ilike_pattern = f'%{search_term}%'
        query = query.filter(DbOrder.order_number.ilike(ilike_pattern))

    if only_payment_state is not None:
        query = query.filter_by(_payment_state=only_payment_state.name)

    if only_processed is not None:
        # The processed flag is only meaningful for orders that require
        # processing in the first place.
        query = query.filter(DbOrder.processing_required.is_(True))

        if only_processed:
            query = query.filter(DbOrder.processed_at.isnot(None))
        else:
            query = query.filter(DbOrder.processed_at.is_(None))

    return paginate(
        query,
        page,
        per_page,
        item_mapper=_order_to_transfer_object,
    )
578 | |||
579 | |||
def get_orders_placed_by_user(user_id: UserID) -> Sequence[Order]:
    """Return the orders the user has placed, newest first.

    The orders' items are fetched along with them via a joined load.
    """
    query = (
        db.session.query(DbOrder)
        .options(db.joinedload(DbOrder.items))
        .filter_by(placed_by_id=user_id)
        .order_by(DbOrder.created_at.desc())
    )

    return [_order_to_transfer_object(entity) for entity in query.all()]
592 | |||
593 | |||
def get_orders_placed_by_user_for_shop(
    user_id: UserID, shop_id: ShopID
) -> Sequence[Order]:
    """Return the orders the user has placed in that shop, newest first.

    The orders' items are fetched along with them via a joined load.
    """
    query = (
        db.session.query(DbOrder)
        .options(db.joinedload(DbOrder.items))
        .filter_by(shop_id=shop_id)
        .filter_by(placed_by_id=user_id)
        .order_by(DbOrder.created_at.desc())
    )

    return [_order_to_transfer_object(entity) for entity in query.all()]
609 | |||
610 | |||
def has_user_placed_orders(user_id: UserID, shop_id: ShopID) -> bool:
    """Tell whether the user has placed at least one order in that shop."""
    orders_by_user = (
        db.session.query(DbOrder)
        .filter_by(shop_id=shop_id)
        .filter_by(placed_by_id=user_id)
    )

    return orders_by_user.count() > 0
620 | |||
621 | |||
# Lazily translated labels, keyed by payment method identifier.
# The `lazy_gettext` calls are kept literal, one per message.
_PAYMENT_METHOD_LABELS = dict(
    bank_transfer=lazy_gettext('bank transfer'),
    cash=lazy_gettext('cash'),
    direct_debit=lazy_gettext('direct debit'),
    free=lazy_gettext('free'),
)
628 | |||
629 | |||
def find_payment_method_label(payment_method: str) -> Optional[str]:
    """Return the label for the payment method, or `None` if no label
    is registered for it.
    """
    try:
        return _PAYMENT_METHOD_LABELS[payment_method]
    except KeyError:
        return None
633 | |||
634 | |||
def get_payment_date(order_id: OrderID) -> Optional[datetime]:
    """Return the date the order has been marked as paid, or `None` if
    it has not been paid.

    NOTE(review): the query returns the order's
    `payment_state_updated_at` without looking at the payment state
    itself, so the value is the time of the latest payment state
    change, whatever that change was - confirm callers expect that.
    """
    updated_at_query = db.session \
        .query(DbOrder.payment_state_updated_at) \
        .filter_by(id=order_id)

    return updated_at_query.scalar()
643 | |||
644 | |||
def _order_to_transfer_object(order: DbOrder) -> Order:
    """Build the order transfer object, including address, line items
    and derived state flags, from the database entity.
    """
    return Order(
        id=order.id,
        shop_id=order.shop_id,
        order_number=order.order_number,
        created_at=order.created_at,
        placed_by_id=order.placed_by_id,
        first_names=order.first_names,
        last_name=order.last_name,
        address=Address(
            country=order.country,
            zip_code=order.zip_code,
            city=order.city,
            street=order.street,
        ),
        total_amount=order.total_amount,
        items=[line_item_to_transfer_object(item) for item in order.items],
        payment_method=order.payment_method,
        payment_state=order.payment_state,
        state=_get_order_state(order),
        is_open=order.payment_state == PaymentState.open,
        is_canceled=_is_canceled(order),
        is_paid=_is_paid(order),
        # Invoicing and processing are tracked by timestamps; a set
        # timestamp means "done".
        is_invoiced=order.invoice_created_at is not None,
        is_processing_required=order.processing_required,
        is_processed=order.processed_at is not None,
        cancelation_reason=order.cancelation_reason,
    )
686 | |||
687 | |||
def line_item_to_transfer_object(item: DbLineItem) -> LineItem:
    """Copy the line item entity's fields into a transfer object."""
    dto = LineItem(
        order_number=item.order_number,
        article_number=item.article_number,
        article_type=item.article_type,
        description=item.description,
        unit_price=item.unit_price,
        tax_rate=item.tax_rate,
        quantity=item.quantity,
        line_amount=item.line_amount,
    )
    return dto
702 | |||
703 | |||
def _get_order_state(order: DbOrder) -> OrderState:
    """Derive the overall order state from payment and processing.

    Canceled orders are canceled. Paid orders are complete once they
    either require no processing or have been processed. Everything
    else is open.
    """
    if _is_canceled(order):
        return OrderState.canceled

    processing_done = (
        not order.processing_required or order.processed_at is not None
    )
    if _is_paid(order) and processing_done:
        return OrderState.complete

    return OrderState.open
718 | |||
719 | |||
def _is_canceled(order: DbOrder) -> bool:
    """Tell whether the order is in one of the two canceled payment
    states.
    """
    canceled_states = (
        PaymentState.canceled_before_paid,
        PaymentState.canceled_after_paid,
    )
    return order.payment_state in canceled_states
725 | |||
726 | |||
def _is_paid(order: DbOrder) -> bool:
    """Tell whether the order is in the paid payment state."""
    current_state = order.payment_state
    return current_state == PaymentState.paid
729 |