Test Failed
Push — master ( 583804...f2aaff )
by Dominik
01:33
created

_import_osm_xml()   A

Complexity

Conditions 1

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 12
rs 9.4285
cc 1
1
# ~*~ coding: utf-8 ~*~
2
#-
3
# OSMAlchemy - OpenStreetMap to SQLAlchemy bridge
4
# Copyright (c) 2016 Dominik George <[email protected]>
5
#
6
# Permission is hereby granted, free of charge, to any person obtaining a copy
7
# of this software and associated documentation files (the "Software"), to deal
8
# in the Software without restriction, including without limitation the rights
9
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10
# copies of the Software, and to permit persons to whom the Software is
11
# furnished to do so, subject to the following conditions:
12
#
13
# The above copyright notice and this permission notice shall be included in all
14
# copies or substantial portions of the Software.
15
#
16
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22
# SOFTWARE.
23
#
24
# Alternatively, you are free to use OSMAlchemy under Simplified BSD, The
25
# MirOS Licence, GPL-2+, LGPL-2.1+, AGPL-3+ or the same terms as Python
26
# itself.
27
28
""" Utility code for OSMAlchemy. """
29
30
import dateutil.parser
31
import xml.dom.minidom as minidom
32
33
def _import_osm_dom(osma, session, dom):
34
    """ Import a DOM tree from OSM XML into an OSMAlchemy model.
35
36
    Not called directly; used by _import_osm_xml and _import_osm_file.
37
    """
38
39
    def _dom_attrs_to_any(e, element):
40
        if "version" in e.attributes.keys():
41
            element.version = int(e.attributes["version"].value)
42
        if "changeset" in e.attributes.keys():
43
            element.changeset = int(e.attributes["changeset"].value)
44
        if "user" in e.attributes.keys():
45
            element.user = e.attributes["user"].value
46
        if "uid" in e.attributes.keys():
47
            element.uid = int(e.attributes["uid"].value)
48
        if "visible" in e.attributes.keys():
49
            element.visible = True if e.attributes["visible"].value == "true" else False
50
        if "timestamp" in e.attributes.keys():
51
            element.timestamp = dateutil.parser.parse(e.attributes["timestamp"].value)
52
53
    def _dom_tags_to_any(e, element):
54
        # Target dictionary
55
        tags = {}
56
57
        # Iterate over all <tag /> nodes in the DOM element
58
        for t in e.getElementsByTagName("tag"):
59
            # Append data to tags
60
            tags[t.attributes["k"].value] = t.attributes["v"].value
61
62
        # Store tags dictionary in object
63
        element.tags = tags
64
65
    def _dom_to_node(e):
66
        with session.no_autoflush:
67
            # Get mandatory node id
68
            id = int(e.attributes["id"].value)
69
70
            # Find object in database and create if non-existent
71
            node = session.query(osma.Node).filter_by(id=id).scalar()
72
            if node is None:
73
                node = osma.Node(id=id)
74
75
            # Store mandatory latitude and longitude
76
            node.latitude = e.attributes["lat"].value
77
            node.longitude = e.attributes["lon"].value
78
79
            # Store other attributes and tags
80
            _dom_attrs_to_any(e, node)
81
            _dom_tags_to_any(e, node)
82
83
        # Add to session
84
        session.add(node)
85
        session.commit()
86
87
    def _dom_to_way(e):
88
        with session.no_autoflush:
89
            # Get mandatory way id
90
            id = int(e.attributes["id"].value)
91
92
            # Find object in database and create if non-existent
93
            way = session.query(osma.Way).filter_by(id=id).scalar()
94
            if way is None:
95
                way = osma.Way(id=id)
96
97
            # Find all related nodes
98
            for n in e.getElementsByTagName("nd"):
99
                # Get node id and find object
100
                ref = int(n.attributes["ref"].value)
101
                node = session.query(osma.Node).filter_by(id=ref).one()
102
                # Append to nodes in way
103
                way.nodes.append(node)
104
105
            # Store other attributes and tags
106
            _dom_attrs_to_any(e, way)
107
            _dom_tags_to_any(e, way)
108
109
        # Add to session
110
        session.add(way)
111
        session.commit()
112
113
    def _dom_to_relation(e):
114
        with session.no_autoflush:
115
            # Get mandatory way id
116
            id = int(e.attributes["id"].value)
117
118
            # Find object in database and create if non-existent
119
            relation = session.query(osma.Relation).filter_by(id=id).scalar()
120
            if relation is None:
121
                relation = osma.Relation(id=id)
122
123
            # Find all members
124
            for m in e.getElementsByTagName("member"):
125
                # Get member attributes
126
                ref = int(m.attributes["ref"].value)
127
                type = m.attributes["type"].value
128
129
                if "role" in m.attributes.keys():
130
                    role = m.attributes["role"].value
131
                else:
132
                    role = ""
133
                element = session.query(osma.Element).filter_by(id=ref, type=type).scalar()
134
                if element is None:
135
                    # We do not know the member yet, create a stub
136
                    if type == "node":
137
                        element = osma.Node(id=ref)
138
                    elif type == "way":
139
                        element = osma.Way(id=ref)
140
                    elif type == "relation":
141
                        element = osma.Relation(id=ref)
142
                    # We need to commit here because element could be repeated
143
                    session.add(element)
144
                    session.commit()
145
                # Append to members
146
                relation.members.append((element, role))
147
148
            # Store other attributes and tags
149
            _dom_attrs_to_any(e, relation)
150
            _dom_tags_to_any(e, relation)
151
152
        # Add to session
153
        session.add(relation)
154
        session.commit()
155
156
    # Get root element
157
    osm = dom.documentElement
158
159
    # Iterate over children to find nodes, ways and relations
160
    for e in osm.childNodes:
161
        # Determine element type
162
        if e.nodeName == "node":
163
            _dom_to_node(e)
164
        elif e.nodeName == "way":
165
            _dom_to_way(e)
166
        elif e.nodeName == "relation":
167
            _dom_to_relation(e)
168
169
def _import_osm_xml(osma, session, xml):
170
    """ Import a string in OSM XML format into an OSMAlchemy model.
171
172
      osma - reference to the OSMAlchemy model instance
173
      session - an SQLAlchemy session
174
      xml - string containing the XML data
175
    """
176
177
    # Parse string into DOM structure
178
    dom = minidom.parseString(xml)
179
180
    return _import_osm_dom(osma, session, dom)
181
182
def _import_osm_file(osma, session, file):
183
    """ Import a file in OSM XML format into an OSMAlchemy model.
184
185
      osma - reference to the OSMAlchemy model instance
186
      session - an SQLAlchemy session
187
      path - path to the file to import or open file object
188
    """
189
190
    # Parse document into DOM structure
191
    dom = minidom.parse(file)
192
193
    return _import_osm_dom(osma, session, dom)
194