Passed
Pull Request — master (#16)
by William
01:31
created

app.getemail.process_email_balances()   F

Complexity

Conditions 22

Size

Total Lines 124
Code Lines 85

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 22
eloc 85
nop 0
dl 0
loc 124
rs 0
c 0
b 0
f 0

How to fix: Long Method, Complexity

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

The refactoring most commonly applied here is Extract Method.
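As a rough illustration of turning a comment into a method name, the "# Decode subject" and "# Decode sender" blocks in the listing could collapse into one helper. The name decode_header_value and its default parameter are hypothetical and not part of the pull request; the fallback to UTF-8 when no charset is declared is also an assumption.

```python
from email.header import decode_header


def decode_header_value(raw, default=""):
    """Return the first decoded fragment of a header as text.

    Hypothetical helper: mirrors the 'Decode subject' / 'Decode sender'
    blocks, but catches only the errors a bad charset can raise.
    """
    if raw is None:
        # Header missing entirely (msg["Subject"] returns None in that case).
        return default
    value, encoding = decode_header(raw)[0]
    if isinstance(value, bytes):
        try:
            # Assumption: fall back to UTF-8 when no charset is declared.
            return value.decode(encoding or "utf-8")
        except (LookupError, UnicodeDecodeError):
            return default
    return value
```

With a helper like this, the loop body would read subject = decode_header_value(msg["Subject"], default="subject") in place of the nested try/except.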

Complexity

Complex units like app.getemail.process_email_balances() often do a lot of different things. To break such a unit down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
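In the listing below, subjectstr, startstr and endstr share a suffix and are only used together to locate the balance, which makes them a plausible candidate for Extract Class. The following is a minimal sketch under that assumption; BalancePattern and its extract method are hypothetical names. Unlike the original code, it searches for endstr only after startstr and returns None instead of raising when a marker is missing.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class BalancePattern:
    """Hypothetical grouping of the three *str settings from Email."""
    subjectstr: str
    startstr: str
    endstr: str

    def extract(self, body: str) -> Optional[float]:
        """Return the amount found between startstr and endstr, or None."""
        start = body.find(self.startstr)
        if start == -1:
            return None
        start += len(self.startstr)
        # Assumption: the end marker is expected after the start marker.
        end = body.find(self.endstr, start)
        if end == -1:
            return None
        raw = body[start:end].replace(",", "").replace("$", "").strip()
        try:
            return float(raw)
        except ValueError:
            return None
```

A caller could then build the pattern once per user from email_config and call pattern.extract(body) on the matching message.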

#!/usr/bin/env python3
"""
Email balance import script for pycashflow.
Processes IMAP email inboxes to extract and import balance information.

This script can be run standalone from cron or called from within the application.
"""
import os
import sys

# When run as standalone script, add parent directory to path for imports
# This allows 'from app import ...' to work when called from cron
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
if parent_dir not in sys.path:
    sys.path.insert(0, parent_dir)

import imaplib
import email
from email.header import decode_header
from datetime import datetime, timedelta
from app import db
from app.models import User, Email, Balance


def process_email_balances():
    """
    Process email inboxes for all users with email settings configured
    and update their balance information from bank emails.

    This function supports both SQLite and PostgreSQL through SQLAlchemy.
    """
    # OUTER LOOP: Get all users with email settings configured
    users_with_email = db.session.query(User, Email).join(
        Email, User.id == Email.user_id
    ).all()

    for user, email_config in users_with_email:
        user_id = user.id
        username = email_config.email
        password = email_config.password
        imap_server = email_config.server
        subjectstr = email_config.subjectstr
        startstr = email_config.startstr
        endstr = email_config.endstr

        try:
            # Create IMAP connection for THIS user
            imap = imaplib.IMAP4_SSL(imap_server)
            imap.login(username, password)

            status, messages = imap.select("INBOX", readonly=True)

            # Filter for emails from the past day to limit inbox processing
            # IMAP SINCE searches for messages with Date on or after the specified date
            yesterday = (datetime.now() - timedelta(days=1)).strftime("%d-%b-%Y")
            status, message_ids = imap.search(None, f'SINCE {yesterday}')

            # Parse message IDs from the search result
            if message_ids[0]:
                email_ids = message_ids[0].split()
            else:
                email_ids = []

            print(f"Found {len(email_ids)} email(s) from the past day for user {user_id}")

            email_content = {}

            # INNER LOOP: Process only emails from the past day
            for email_id in email_ids:
                try:
                    res, msg = imap.fetch(email_id, "(RFC822)")
                    for response in msg:
                        if isinstance(response, tuple):
                            msg = email.message_from_bytes(response[1])

                            # Decode subject
                            subject, encoding = decode_header(msg["Subject"])[0]
                            if isinstance(subject, bytes):
                                try:
                                    subject = subject.decode(encoding)
                                except:
                                    subject = "subject"

                            # Decode sender
                            From, encoding = decode_header(msg.get("From"))[0]
                            if isinstance(From, bytes):
                                try:
                                    From = From.decode(encoding)
                                except:
                                    pass

                            # Extract email body
                            if msg.is_multipart():
                                for part in msg.walk():
                                    content_type = part.get_content_type()
                                    content_disposition = str(part.get("Content-Disposition"))
                                    try:
                                        body = part.get_payload(decode=True).decode()
                                    except:
                                        pass
                                    if content_type == "text/plain" and "attachment" not in content_disposition:
                                        try:
                                            email_content[subject] = body
                                        except:
                                            pass
                            else:
                                content_type = msg.get_content_type()
                                body = msg.get_payload(decode=True).decode()
                                if content_type == "text/plain":
                                    try:
                                        email_content[subject] = body
                                    except:
                                        pass
                except:
                    pass  # Skip individual email errors

            # Extract balance from emails for THIS user
            try:
                start_index = email_content[subjectstr].find(startstr) + len(startstr)
                end_index = email_content[subjectstr].find(endstr)
                new_balance = email_content[subjectstr][start_index:end_index].replace(',', '')
                new_balance = new_balance.replace('$', '')
                new_balance = float(new_balance)

                # Insert balance WITH user_id using SQLAlchemy ORM
                balance = Balance(
                    amount=new_balance,
                    date=datetime.today().date(),
                    user_id=user_id
                )
                db.session.add(balance)
                db.session.commit()
                print(f"Successfully imported balance ${new_balance} for user {user_id}")
            except KeyError:
                # No email with the specified subject found
                pass
            except Exception as e:
                # No balance found in emails for this user
                print(f"Could not extract balance for user {user_id}: {e}")

            # Close IMAP connection for THIS user
            imap.close()
            imap.logout()

        except Exception as e:
            # Failed to connect to IMAP for this user - skip to next user
            print(f"Failed to process emails for user {user_id}: {e}")
            continue


# Allow script to be run standalone from cron
if __name__ == "__main__":
    from app import create_app

    print("Starting email balance import...")
    app = create_app()

    with app.app_context():
        try:
            process_email_balances()
            print("Email balance import completed successfully")
        except Exception as e:
            print(f"Email balance import failed: {e}")
            sys.exit(1)
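Much of the nesting in process_email_balances() comes from the "# Extract email body" branch, so that is a natural first extraction. The sketch below is one possible shape, not the author's code: extract_plain_text is a hypothetical name, it returns the first non-attachment text/plain part (the original keeps the last one it sees), and it decodes with the part's declared charset, assuming UTF-8 when none is given. Because walk() also yields a non-multipart message itself, the separate is_multipart() branch is no longer needed.

```python
import email
from email.message import Message
from typing import Optional


def extract_plain_text(msg: Message) -> Optional[str]:
    """Return the first non-attachment text/plain body, or None."""
    for part in msg.walk():
        if part.get_content_type() != "text/plain":
            continue
        if "attachment" in str(part.get("Content-Disposition", "")):
            continue
        payload = part.get_payload(decode=True)
        if payload is None:
            continue
        # Assumption: treat a missing or unknown charset as UTF-8.
        charset = part.get_content_charset() or "utf-8"
        try:
            return payload.decode(charset, errors="replace")
        except LookupError:
            return payload.decode("utf-8", errors="replace")
    return None


# Small demonstration with a hand-written message (sample data only).
raw = (
    b"Subject: Balance Alert\r\n"
    b"Content-Type: text/plain; charset=utf-8\r\n"
    b"\r\n"
    b"Your balance is $10.00 as of today\r\n"
)
demo_body = extract_plain_text(email.message_from_bytes(raw))
```

Inside the fetch loop this would reduce the body handling to a single call followed by a check for None.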
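The SINCE criterion in the listing is built with strftime("%d-%b-%Y"), and %b follows the process locale, while IMAP expects English month abbreviations. Under cron the locale is usually "C", so this may never surface, but a locale-independent variant is cheap. The helper name imap_since_criterion and its parameters are hypothetical.

```python
from datetime import date, timedelta

# English month abbreviations as used in IMAP date strings.
_IMAP_MONTHS = (
    "Jan", "Feb", "Mar", "Apr", "May", "Jun",
    "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
)


def imap_since_criterion(today: date, days_back: int = 1) -> str:
    """Build a 'SINCE dd-Mon-yyyy' search criterion without using the locale."""
    since = today - timedelta(days=days_back)
    return f"SINCE {since.day:02d}-{_IMAP_MONTHS[since.month - 1]}-{since.year}"
```

The result can be passed to imap.search(None, ...) in place of the f-string used in the listing.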