api_weather   A
last analyzed

Complexity

Total Complexity 11

Size/Duplication

Total Lines 72
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 44
dl 0
loc 72
rs 10
c 0
b 0
f 0
wmc 11

3 Methods

Rating   Name   Duplication   Size   Complexity  
A Weather.__init__() 0 2 1
A Weather.main() 0 26 4
B Weather.city_map() 0 18 6
1
# -*- coding: utf-8 -*-
2
"""
3
cron: 30 7 * * *
4
new Env('天气预报');
5
"""
6
7
import json
8
from os.path import dirname, join
9
10
import requests
11
12
from notify_mtr import send
13
from utils import get_data
14
15
16
class Weather:
    """Build a plain-text weather report for a list of city names.

    City names are translated to city codes through ``city.json`` and the
    forecast for each code is fetched from the t.weather.itboy.net API.
    """

    # Where ``city.json`` is downloaded from when no usable local copy exists.
    CITY_MAP_URL = (
        "https://fastly.jsdelivr.net/gh/Oreomeow/checkinpanel@master/city.json"
    )
    # Code queried when a city name has no entry in the city map.
    DEFAULT_CITY_CODE = "101020100"
    # Seconds to wait for an HTTP response; without a timeout ``requests``
    # can block indefinitely on a stalled server.
    REQUEST_TIMEOUT = 10

    def __init__(self, check_items):
        """Store the city names to report on.

        Args:
            check_items: Iterable of city names (keys of ``city.json``).
        """
        self.check_items = check_items

    @staticmethod
    def city_map():
        """Return the city-name -> city-code mapping.

        The mapping is read from ``city.json`` next to this file.  When that
        file is missing, empty, or not valid JSON, it is downloaded from
        ``CITY_MAP_URL`` and written back to disk for later runs.

        Returns:
            The parsed content of ``city.json``.

        Raises:
            ConnectionError: The download answered with a non-200 status.
        """
        cache_path = join(dirname(__file__), "city.json")
        try:
            with open(cache_path, "r", encoding="utf-8") as city_file:
                city_map = json.load(city_file)
            if not city_map:
                # An empty cache is as useless as a missing one.
                raise FileNotFoundError(cache_path)
        except (FileNotFoundError, ValueError) as e:
            # ValueError covers json.JSONDecodeError: a corrupted cache is
            # refreshed instead of crashing every subsequent run.
            r = requests.get(Weather.CITY_MAP_URL, timeout=Weather.REQUEST_TIMEOUT)
            if r.status_code != 200:
                raise ConnectionError("下载 city.json 失败!") from e
            city_map = r.json()
            with open(cache_path, "w", encoding="utf-8") as city_file:
                json.dump(city_map, city_file, ensure_ascii=False)
        return city_map

    @staticmethod
    def _fetch_weather(city_code):
        """Return the decoded API payload for ``city_code``, or ``None`` on failure."""
        weather_url = f"http://t.weather.itboy.net/api/weather/city/{city_code}"
        try:
            r = requests.get(weather_url, timeout=Weather.REQUEST_TIMEOUT)
            if r.status_code != 200:
                return None
            payload = r.json()
        except (requests.RequestException, ValueError):
            # Transport error, timeout, or a body that is not JSON.
            return None
        if not isinstance(payload, dict) or payload.get("status") != 200:
            return None
        return payload

    @staticmethod
    def _format_report(d):
        """Render one API payload as the multi-line report text."""
        info = d["cityInfo"]
        data = d["data"]
        today = data["forecast"][0]
        return (
            f'城市:{info["parent"]} {info["city"]}\n'
            f'日期:{today["ymd"]} {today["week"]}\n'
            f'天气:{today["type"]}\n'
            f'温度:{today["high"]} {today["low"]}\n'
            f'湿度:{data["shidu"]}\n'
            f'空气质量:{data["quality"]}\n'
            f'PM2.5:{data["pm25"]}\n'
            f'PM10:{data["pm10"]}\n'
            f'风力风向 {today["fx"]} {today["fl"]}\n'
            f'感冒指数:{data["ganmao"]}\n'
            f'温馨提示:{today["notice"]}\n'
            f'更新时间:{d["time"]}'
        )

    def main(self):
        """Return the combined report for every city in ``check_items``.

        Each city contributes either its forecast or a failure line, followed
        by a blank line.  A failure for one city (network error, bad payload,
        missing field) does not prevent the remaining cities from being
        reported.

        Returns:
            The concatenated report text; ``""`` when there are no cities.
        """
        if not self.check_items:
            return ""

        # Load the mapping once instead of re-reading city.json per city.
        city_codes = self.city_map()

        parts = []
        for city_name in self.check_items:
            city_code = city_codes.get(city_name, self.DEFAULT_CITY_CODE)
            payload = self._fetch_weather(city_code)
            msg = None
            if payload is not None:
                try:
                    msg = self._format_report(payload)
                except (KeyError, IndexError, TypeError):
                    # Payload reported success but lacks an expected field.
                    msg = None
            if msg is None:
                msg = f"{city_name} 天气查询失败!"
            parts.append(msg + "\n\n")
        return "".join(parts)
65
66
67
if __name__ == "__main__":
    # Script entry point: read the configured city list (the "CITY" entry,
    # defaulting to no cities), build the report and push it as a
    # notification titled "天气预报".
    cities = get_data().get("CITY", [])
    report = Weather(check_items=cities).main()
    send("天气预报", report)
72