Completed
Push — master ( b632c8...1f9ce5 )
by Andrea
01:18
created

MDRepository   F

Complexity

Total Complexity 166

Size/Duplication

Total Lines 757
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
wmc 166
c 1
b 0
f 0
dl 0
loc 757
rs 1.263

34 Methods

Rating   Name   Duplication   Size   Complexity  
A extensions() 0 11 2
F search() 0 63 15
F entity_set() 0 37 11
B lookup() 0 31 2
A _match() 0 6 3
A _hash() 0 9 3
A producer() 0 7 2
B summary() 0 24 3
F fetch_metadata() 0 152 31
F consumer() 0 97 22
A _eattribute() 0 13 3
A _index_entity() 0 5 2
A is_idp() 0 6 1
A __getitem__() 0 2 1
A _entity_attributes() 0 8 2
A __delitem__() 0 2 1
A sane() 0 6 1
A sha1_id() 0 2 1
B display() 0 18 5
D import_metadata() 0 31 8
A keys() 0 2 1
A is_sp() 0 6 1
A __setitem__() 0 2 1
A __iter__() 0 4 4
A error_set() 0 6 1
C merge() 0 55 8
A set_entity_attributes() 0 20 3
A __init__() 0 10 1
F load_dir() 0 28 9
A entities() 0 12 3
F parse_metadata() 0 45 10
B annotate() 0 22 4
F _lookup() 0 100 31
A _strings() 0 8 4

How to fix   Complexity   

Complex Class

Complex classes like MDRepository often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
3
This is the implementation of the active repository of SAML metadata. The 'local' and 'remote' pipes operate on this.
4
5
"""
6
from StringIO import StringIO
7
from datetime import datetime
8
import hashlib
9
import urllib
10
from UserDict import DictMixin, UserDict
11
from lxml import etree
12
from lxml.builder import ElementMaker
13
from lxml.etree import DocumentInvalid
14
import os
15
import re
16
from copy import deepcopy
17
from pyff import merge_strategies
18
import pyff.index
19
from pyff.logs import log
20
from pyff.utils import schema, URLFetch, filter_lang, root, duration2timedelta, template
21
import xmlsec
22
from pyff.constants import NS, NF_URI, DIGESTS, EVENT_DROP_ENTITY, EVENT_IMPORTED_METADATA, EVENT_IMPORT_FAIL
23
import traceback
24
import threading
25
from Queue import Queue
26
27
28
__author__ = 'leifj'
29
30
31
def _is_self_signed_err(ebuf):
32
    for e in ebuf:
33
        if e['func'] == 'xmlSecOpenSSLX509StoreVerify' and re.match('err=18', e['message']):
34
            return True
35
    return False
36
37
38
# Disable external-entity resolution globally so parsing untrusted metadata
# cannot be abused for XXE-style attacks.
etree.set_default_parser(etree.XMLParser(resolve_entities=False))
39
40
41
def _e(error_log, m=None):
42
    def _f(x):
43
        if ":WARNING:" in x:
44
            return False
45
        if m is not None and not m in x:
46
            return False
47
        return True
48
49
    return "\n".join(filter(_f, ["%s" % e for e in error_log]))
50
51
52
class MetadataException(Exception):
    """Raised when metadata cannot be parsed, validated, fetched or resolved."""
    pass
54
55
56
class Event(UserDict):
    """Dict-like event record delivered to Observable subscribers; carries the
    keyword attributes given to fire() plus a 'time' timestamp."""
    pass
58
59
60
class Observable(object):
    """Minimal publish/subscribe mixin: callbacks registered via subscribe()
    receive an Event object for every call to fire()."""

    def __init__(self):
        # callbacks are invoked in subscription order
        self.callbacks = []

    def subscribe(self, callback):
        """Register *callback* to be invoked with an Event on each fire()."""
        self.callbacks.append(callback)

    def fire(self, **attrs):
        """Build an Event from *attrs*, stamp it with the current time and
        deliver it to every subscriber."""
        event = Event(attrs)
        event['time'] = datetime.now()
        for callback in self.callbacks:
            callback(event)
72
73
74
class MDRepository(DictMixin, Observable):
75
    """A class representing a set of SAML Metadata. Instances present as dict-like objects where
76
    the keys are URIs and values are EntitiesDescriptor elements containing sets of metadata.
77
    """
78
79
    def __init__(self, index=pyff.index.MemoryIndex(), metadata_cache_enabled=False, min_cache_ttl="PT5M"):
80
        self.md = {}
81
        self.index = index
82
        self.metadata_cache_enabled = metadata_cache_enabled
83
        self.min_cache_ttl = min_cache_ttl
84
        self.respect_cache_duration = True
85
        self.default_cache_duration = "PT10M"
86
        self.retry_limit = 5
87
88
        super(MDRepository, self).__init__()
89
90
    def is_idp(self, entity):
91
        """Returns True if the supplied EntityDescriptor has an IDPSSODescriptor Role
92
93
:param entity: An EntityDescriptor element
94
        """
95
        return bool(entity.find(".//{%s}IDPSSODescriptor" % NS['md']) is not None)
96
97
    def is_sp(self, entity):
98
        """Returns True if the supplied EntityDescriptor has an SPSSODescriptor Role
99
100
:param entity: An EntityDescriptor element
101
        """
102
        return bool(entity.find(".//{%s}SPSSODescriptor" % NS['md']) is not None)
103
104
    def display(self, entity):
105
        """Utility-method for computing a displayable string for a given entity.
106
107
:param entity: An EntityDescriptor element
108
        """
109
        for displayName in filter_lang(entity.findall(".//{%s}DisplayName" % NS['mdui'])):
110
            return displayName.text
111
112
        for serviceName in filter_lang(entity.findall(".//{%s}ServiceName" % NS['md'])):
113
            return serviceName.text
114
115
        for organizationDisplayName in filter_lang(entity.findall(".//{%s}OrganizationDisplayName" % NS['md'])):
116
            return organizationDisplayName.text
117
118
        for organizationName in filter_lang(entity.findall(".//{%s}OrganizationName" % NS['md'])):
119
            return organizationName.text
120
121
        return entity.get('entityID')
122
123
    def __iter__(self):
        """Yield every EntityDescriptor from every stored collection."""
        # snapshot the collections up front so iteration survives mutation
        for tree in list(self.md.values()):
            for entity in tree.findall(".//{%s}EntityDescriptor" % NS['md']):
                yield entity
127
128
    def sha1_id(self, e):
        # Shorthand for the sha1 entry of pyff.index.hash_id for entity e.
        return pyff.index.hash_id(e, 'sha1')
130
131
    def search(self, query, path=None, page=None, page_limit=10, entity_filter=None):
132
        """
133
:param query: A string to search for.
134
:param path: The repository collection (@Name) to search in - None for search in all collections
135
:param page:  When using paged search, the page index
136
:param page_limit: When using paged search, the maximum entry per page
137
:param entity_filter: A lookup expression used to filter the entries before search is done.
138
139
Returns a list of dict's for each EntityDescriptor present in the metadata store such
140
that any of the DisplayName, ServiceName, OrganizationName or OrganizationDisplayName
141
elements match the query (as in contains the query as a substring).
142
143
The dict in the list contains three items:
144
145
:param label: A displayable string, useful as a UI label
146
:param value: The entityID of the EntityDescriptor
147
:param id: A sha1-ID of the entityID - on the form {sha1}<sha1-hash-of-entityID>
148
        """
149
150
        def _strings(e):
151
            lst = [e.get('entityID')]
152
            for attr in ['.//{%s}DisplayName' % NS['mdui'],
153
                         './/{%s}ServiceName' % NS['md'],
154
                         './/{%s}OrganizationDisplayName' % NS['md'],
155
                         './/{%s}OrganizationName' % NS['md']]:
156
                lst.extend([x.text.lower() for x in e.findall(attr)])
157
            return filter(lambda s: s is not None, lst)
158
159
        def _match(query, e):
160
            #log.debug("looking for %s in %s" % (query,",".join(_strings(e))))
161
            for qstr in _strings(e):
162
                if query in qstr:
163
                    return True
164
            return False
165
166
        f = []
167
        if path is not None:
168
            f.append(path)
169
        if entity_filter is not None:
170
            f.append(entity_filter)
171
        mexpr = None
172
        if f:
173
            mexpr = "+".join(f)
174
175
        log.debug("mexpr: %s" % mexpr)
176
177
        res = [{'label': self.display(e),
178
                'value': e.get('entityID'),
179
                'id': pyff.index.hash_id(e, 'sha1')}
180
               for e in pyff.index.EntitySet(filter(lambda ent: _match(query, ent), self.lookup(mexpr)))]
181
182
        res.sort(key=lambda i: i['label'])
183
184
        log.debug(res)
185
186
        if page is not None:
187
            total = len(res)
188
            begin = (page - 1) * page_limit
189
            end = begin + page_limit
190
            more = (end < total)
191
            return res[begin:end], more, total
192
        else:
193
            return res
194
195
    def sane(self):
196
        """A very basic test for sanity. An empty metadata set is probably not a sane output of any process.
197
198
:return: True iff there is at least one EntityDescriptor in the active set.
199
        """
200
        return len(self.md) > 0
201
202
    def extensions(self, e):
203
        """Return a list of the Extensions elements in the EntityDescriptor
204
205
:param e: an EntityDescriptor
206
:return: a list
207
        """
208
        ext = e.find(".//{%s}Extensions" % NS['md'])
209
        if ext is None:
210
            ext = etree.Element("{%s}Extensions" % NS['md'])
211
            e.insert(0, ext)
212
        return ext
213
214
    def annotate(self, e, category, title, message, source=None):
215
        """Add an ATOM annotation to an EntityDescriptor or an EntitiesDescriptor. This is a simple way to
216
        add non-normative text annotations to metadata, eg for the purpuse of generating reports.
217
218
:param e: An EntityDescriptor or an EntitiesDescriptor element
219
:param category: The ATOM category
220
:param title: The ATOM title
221
:param message: The ATOM content
222
:param source: An optional source URL. It is added as a <link> element with @rel='saml-metadata-source'
223
        """
224
        if e.tag != "{%s}EntityDescriptor" % NS['md'] and e.tag != "{%s}EntitiesDescriptor" % NS['md']:
225
            raise MetadataException("I can only annotate EntityDescriptor or EntitiesDescriptor elements")
226
        subject = e.get('Name', e.get('entityID', None))
227
        atom = ElementMaker(nsmap={'atom': 'http://www.w3.org/2005/Atom'}, namespace='http://www.w3.org/2005/Atom')
228
        args = [atom.published("%s" % datetime.now().isoformat()),
229
                atom.link(href=subject, rel="saml-metadata-subject")]
230
        if source is not None:
231
            args.append(atom.link(href=source, rel="saml-metadata-source"))
232
        args.extend([atom.title(title),
233
                     atom.category(term=category),
234
                     atom.content(message, type="text/plain")])
235
        self.extensions(e).append(atom.entry(*args))
236
237
    def _entity_attributes(self, e):
        """Return the mdattr:EntityAttributes element under the Extensions of
        *e*, creating an empty one when missing."""
        ext = self.extensions(e)
        ea = ext.find(".//{%s}EntityAttributes" % NS['mdattr'])
        if ea is not None:
            return ea
        ea = etree.Element("{%s}EntityAttributes" % NS['mdattr'])
        ext.append(ea)
        return ea
245
246
    def _eattribute(self, e, attr, nf):
        """Return the saml:Attribute with the given Name/NameFormat inside the
        EntityAttributes of *e*, creating it when it does not exist."""
        ea = self._entity_attributes(e)
        existing = ea.xpath(".//saml:Attribute[@NameFormat='%s' and @Name='%s']" % (nf, attr), namespaces=NS)
        if existing:
            return existing[0]
        a = etree.Element("{%s}Attribute" % NS['saml'])
        a.set('NameFormat', nf)
        a.set('Name', attr)
        ea.append(a)
        return a
259
260
    def set_entity_attributes(self, e, d, nf=NF_URI):
261
262
        """Set an entity attribute on an EntityDescriptor
263
264
:param e: The EntityDescriptor element
265
:param d: A dict of attribute-value pairs that should be added as entity attributes
266
:param nf: The nameFormat (by default "urn:oasis:names:tc:SAML:2.0:attrname-format:uri") to use.
267
:raise: MetadataException unless e is an EntityDescriptor element
268
        """
269
        if e.tag != "{%s}EntityDescriptor" % NS['md']:
270
            raise MetadataException("I can only add EntityAttribute(s) to EntityDescriptor elements")
271
272
        #log.debug("set %s" % d)
273
        for attr, value in d.iteritems():
274
            #log.debug("set %s to %s" % (attr,value))
275
            a = self._eattribute(e, attr, nf)
276
            #log.debug(etree.tostring(a))
277
            velt = etree.Element("{%s}AttributeValue" % NS['saml'])
278
            velt.text = value
279
            a.append(velt)
280
            #log.debug(etree.tostring(a))
281
282
    def fetch_metadata(self, resources, qsize=5, timeout=120, stats=None, xrd=None):
283
        """Fetch a series of metadata URLs and optionally verify signatures.
284
285
:param resources: A list of triples (url,cert-or-fingerprint,id)
286
:param qsize: The number of parallell downloads to run
287
:param timeout: The number of seconds to wait (120 by default) for each download
288
:param stats: A dictionary used for storing statistics. Useful for cherrypy cpstats
289
290
The list of triples is processed by first downloading the URL. If a cert-or-fingerprint
291
is supplied it is used to validate the signature on the received XML. Two forms of XML
292
is supported: SAML Metadata and XRD.
293
294
SAML metadata is (if valid and contains a valid signature) stored under the 'id'
295
identifier (which defaults to the URL unless provided in the triple.
296
297
XRD elements are processed thus: for all <Link> elements that contain a ds;KeyInfo
298
elements with a X509Certificate and where the <Rel> element contains the string
299
'urn:oasis:names:tc:SAML:2.0:metadata', the corresponding <URL> element is download
300
and verified.
301
        """
302
        if stats is None:
303
            stats = {}
304
305
        def producer(q, resources, cache=self.metadata_cache_enabled):
306
            print resources
307
            for url, verify, id, tries in resources:
308
                log.debug("starting fetcher for '%s'" % url)
309
                thread = URLFetch(url, verify, id, enable_cache=cache, tries=tries)
310
                thread.start()
311
                q.put(thread, True)
312
313
        def consumer(q, njobs, stats, next_jobs=None, resolved=None):
314
            if next_jobs is None:
315
                next_jobs = []
316
            if resolved is None:
317
                resolved = set()
318
            nfinished = 0
319
320
            while nfinished < njobs:
321
                info = None
322
                try:
323
                    log.debug("waiting for next thread to finish...")
324
                    thread = q.get(True)
325
                    thread.join(timeout)
326
327
                    if thread.isAlive():
328
                        raise MetadataException("thread timeout fetching '%s'" % thread.url)
329
330
                    info = {
331
                        'Time Spent': thread.time()
332
                    }
333
334
                    if thread.ex is not None:
335
                        raise thread.ex
336
                    else:
337
                        if thread.result is not None:
338
                            info['Bytes'] = len(thread.result)
339
                        else:
340
                            raise MetadataException("empty response fetching '%s'" % thread.url)
341
                        info['Cached'] = thread.cached
342
                        info['Date'] = str(thread.date)
343
                        info['Last-Modified'] = str(thread.last_modified)
344
                        info['Tries'] = thread.tries
345
346
                    xml = thread.result.strip()
347
348
                    if thread.status is not None:
349
                        info['Status'] = thread.status
350
351
                    t = self.parse_metadata(StringIO(xml), key=thread.verify, base_url=thread.url)
352
                    if t is None:
353
                        self.fire(type=EVENT_IMPORT_FAIL, url=thread.url)
354
                        raise MetadataException("no valid metadata found at '%s'" % thread.url)
355
356
                    relt = root(t)
357
                    if relt.tag in ('{%s}XRD' % NS['xrd'], '{%s}XRDS' % NS['xrd']):
358
                        log.debug("%s looks like an xrd document" % thread.url)
359
                        for xrd in t.xpath("//xrd:XRD", namespaces=NS):
360
                            log.debug("xrd: %s" % xrd)
361
                            for link in xrd.findall(".//{%s}Link[@rel='%s']" % (NS['xrd'], NS['md'])):
362
                                url = link.get("href")
363
                                certs = xmlsec.CertDict(link)
364
                                fingerprints = certs.keys()
365
                                fp = None
366
                                if len(fingerprints) > 0:
367
                                    fp = fingerprints[0]
368
                                log.debug("fingerprint: %s" % fp)
369
                                next_jobs.append((url, fp, url, 0))
370
371
                    elif relt.tag in ('{%s}EntityDescriptor' % NS['md'], '{%s}EntitiesDescriptor' % NS['md']):
372
                        cacheDuration = self.default_cache_duration
373
                        if self.respect_cache_duration:
374
                            cacheDuration = root(t).get('cacheDuration', self.default_cache_duration)
375
                        offset = duration2timedelta(cacheDuration)
376
377
                        if thread.cached:
378
                            if thread.last_modified + offset < datetime.now() - duration2timedelta(self.min_cache_ttl):
379
                                raise MetadataException("cached metadata expired")
380
                            else:
381
                                log.debug("found cached metadata for '%s' (last-modified: %s)" % (thread.url, thread.last_modified))
382
                                ne = self.import_metadata(t, url=thread.id)
383
                                info['Number of Entities'] = ne
384
                        else:
385
                            log.debug("got fresh metadata for '%s' (date: %s)" % (thread.url, thread.date))
386
                            ne = self.import_metadata(t, url=thread.id)
387
                            info['Number of Entities'] = ne
388
                        info['Cache Expiration Time'] = str(thread.last_modified + offset)
389
                        certs = xmlsec.CertDict(relt)
390
                        cert = None
391
                        if certs.values():
392
                            cert = certs.values()[0].strip()
393
                        resolved.add((thread.url, cert))
394
                    else:
395
                        raise MetadataException("unknown metadata type for '%s' (%s)" % (thread.url, relt.tag))
396
                except Exception, ex:
397
                    #traceback.print_exc(ex)
398
                    log.warn("problem fetching '%s' (will retry): %s" % (thread.url, ex))
399
                    if info is not None:
400
                        info['Exception'] = ex
401
                    if thread.tries < self.retry_limit:
402
                        next_jobs.append((thread.url, thread.verify, thread.id, thread.tries + 1))
403
                    else:
404
                        #traceback.print_exc(ex)
405
                        log.error("retry limit exceeded for %s (last error was: %s)" % (thread.url, ex))
406
                finally:
407
                    nfinished += 1
408
                    if info is not None:
409
                        stats[thread.url] = info
410
411
        resources = [(url, verify, rid, 0) for url, verify, rid in resources]
412
        resolved = set()
413
        cache = True
414
        while len(resources) > 0:
415
            log.debug("fetching %d resources (%s)" % (len(resources), repr(resources)))
416
            next_jobs = []
417
            q = Queue(qsize)
418
            prod_thread = threading.Thread(target=producer, args=(q, resources, cache))
419
            cons_thread = threading.Thread(target=consumer, args=(q, len(resources), stats, next_jobs, resolved))
420
            prod_thread.start()
421
            cons_thread.start()
422
            prod_thread.join()
423
            cons_thread.join()
424
            log.debug("after fetch: %d jobs to retry" % len(next_jobs))
425
            if len(next_jobs) > 0:
426
                resources = next_jobs
427
                cache = False
428
            else:
429
                resources = []
430
431
        if xrd is not None:
432
            with open(xrd, "w") as fd:
433
                fd.write(template("trust.xrd").render(links=resolved))
434
435
    def parse_metadata(self, fn, key=None, base_url=None, fail_on_error=False, filter_invalid=True):
436
        """Parse a piece of XML and split it up into EntityDescriptor elements. Each such element
437
        is stored in the MDRepository instance.
438
439
:param fn: a file-like object containing SAML metadata
440
:param key: a certificate (file) or a SHA1 fingerprint to use for signature verification
441
:param base_url: use this base url to resolve relative URLs for XInclude processing
442
        """
443
        try:
444
            t = etree.parse(fn, base_url=base_url, parser=etree.XMLParser(resolve_entities=False))
445
            t.xinclude()
446
            if filter_invalid:
447
                for e in t.findall('{%s}EntityDescriptor' % NS['md']):
448
                    if not schema().validate(e):
449
                        error = _e(schema().error_log, m=base_url)
450
                        log.debug("removing '%s': schema validation failed (%s)" % (e.get('entityID'), error))
451
                        e.getparent().remove(e)
452
                        self.fire(type=EVENT_DROP_ENTITY, url=base_url, entityID=e.get('entityID'), error=error)
453
            else:
454
            # Having removed the invalid entities this should now never happen...
455
                schema().assertValid(t)
456
        except DocumentInvalid, ex:
457
            traceback.print_exc()
458
            log.debug("schema validation failed on '%s': %s" % (base_url, _e(ex.error_log, m=base_url)))
459
            raise MetadataException("schema validation failed")
460
        except Exception, ex:
461
            #log.debug(_e(schema().error_log))
462
            log.error(ex)
463
            if fail_on_error:
464
                raise ex
465
            return None
466
        if key is not None:
467
            try:
468
                log.debug("verifying signature using %s" % key)
469
                refs = xmlsec.verified(t, key)
470
                if len(refs) != 1:
471
                    raise MetadataException("XML metadata contains %d signatures - exactly 1 is required" % len(refs))
472
                t = refs[0] # prevent wrapping attacks
473
            except Exception, ex:
474
                tb = traceback.format_exc()
475
                print tb
476
                log.error(ex)
477
                return None
478
479
        return t
480
481
    def _index_entity(self, e):
        """Strip any @ID attribute from *e* and add it to the entity index."""
        e.attrib.pop('ID', None)
        self.index.add(e)
486
487
    def import_metadata(self, t, url=None):
488
        """
489
:param t: An EntitiesDescriptor element
490
:param url: An optional URL to used to identify the EntitiesDescriptor in the MDRepository
491
492
Import an EntitiesDescriptor element using the @Name attribute (or the supplied url parameter). All
493
EntityDescriptor elements are stripped of any @ID attribute and are then indexed before the collection
494
is stored in the MDRepository object.
495
        """
496
        if url is None:
497
            top = t.xpath("//md:EntitiesDescriptor", namespaces=NS)
498
            if top is not None and len(top) == 1:
499
                url = top[0].get("Name", None)
500
        if url is None:
501
            raise MetadataException("No collection name found")
502
        self[url] = t
503
        # we always clean incoming ID
504
        # add to the index
505
        ne = 0
506
507
        if t is not None:
508
            if root(t).tag == "{%s}EntityDescriptor" % NS['md']:
509
                self._index_entity(root(t))
510
                ne += 1
511
            else:
512
                for e in t.findall(".//{%s}EntityDescriptor" % NS['md']):
513
                    self._index_entity(e)
514
                    ne += 1
515
516
        self.fire(type=EVENT_IMPORTED_METADATA, size=ne, url=url)
517
        return ne
518
519
    def entities(self, t=None):
520
        """
521
:param t: An EntitiesDescriptor element
522
523
Returns the list of contained EntityDescriptor elements
524
        """
525
        if t is None:
526
            return []
527
        elif root(t).tag == "{%s}EntityDescriptor" % NS['md']:
528
            return [root(t)]
529
        else:
530
            return t.findall(".//{%s}EntityDescriptor" % NS['md'])
531
532
    def load_dir(self, directory, ext=".xml", url=None):
533
        """
534
:param directory: A directory to walk.
535
:param ext: Include files with this extension (default .xml)
536
537
Traverse a directory tree looking for metadata. Files ending in the specified extension are included. Directories
538
starting with '.' are excluded.
539
        """
540
        if url is None:
541
            url = directory
542
        log.debug("walking %s" % directory)
543
        if not directory in self.md:
544
            entities = []
545
            for top, dirs, files in os.walk(directory):
546
                for dn in dirs:
547
                    if dn.startswith("."):
548
                        dirs.remove(dn)
549
                for nm in files:
550
                    log.debug("found file %s" % nm)
551
                    if nm.endswith(ext):
552
                        fn = os.path.join(top, nm)
553
                        try:
554
                            t = self.parse_metadata(fn, fail_on_error=True)
555
                            entities.extend(self.entities(t))  # local metadata is assumed to be ok
556
                        except Exception, ex:
557
                            log.error(ex)
558
            self.import_metadata(self.entity_set(entities, url))
559
        return self.md[url]
560
561
    def _lookup(self, member, xp=None):
        """Resolve *member* into a list of EntityDescriptor elements.

:param member: Either an entity, URL or a filter expression.
:param xp: An optional xpath expression applied to matched entities
    (defaults to all EntityDescriptor elements).

Find a (set of) EntityDescriptor element(s) based on the specified 'member' expression.

NOTE(review): the iterable and remote-URL branches return nested lists while
the other branches return flat lists - confirm callers (lookup) handle both.
        """

        def _hash(hn, strv):
            # Hash *strv* with the named hashlib digest ('null' returns it
            # unchanged). NOTE(review): appears unused within _lookup -
            # confirm before removing.
            if hn == 'null':
                return strv
            if not hasattr(hashlib, hn):
                raise MetadataException("Unknown digest mechanism: '%s'" % hn)
            hash_m = getattr(hashlib, hn)
            h = hash_m()
            h.update(strv)
            return h.hexdigest()

        if xp is None:
            xp = "//md:EntityDescriptor"
        if member is None:
            # no selector: resolve every stored collection, filtered by xp
            lst = []
            for m in self.keys():
                log.debug("resolving %s filtered by %s" % (m, xp))
                lst.extend(self._lookup(m, xp))
            return lst
        elif hasattr(member, 'xpath'):
            # already an element(tree): just apply the xpath filter
            log.debug("xpath filter %s <- %s" % (xp, member))
            return member.xpath(xp, namespaces=NS)
        elif type(member) is str or type(member) is unicode:
            log.debug("string lookup %s" % member)

            if '+' in member:
                # 'a+b+c' - intersection of the individual lookups
                member = member.strip('+')
                log.debug("lookup intersection of '%s'" % ' and '.join(member.split('+')))
                hits = None
                for f in member.split("+"):
                    f = f.strip()
                    if hits is None:
                        hits = set(self._lookup(f, xp))
                    else:
                        other = self._lookup(f, xp)
                        hits.intersection_update(other)

                    if not hits:
                        # short-circuit: an empty intersection stays empty
                        log.debug("empty intersection")
                        return []

                if hits is not None and hits:
                    return list(hits)
                else:
                    return []

            if "!" in member:
                # '[source]!xpath' - select from source (all when empty) by xpath
                (src, xp) = member.split("!")
                if len(src) == 0:
                    src = None
                    log.debug("filtering using %s" % xp)
                else:
                    log.debug("selecting %s filtered by %s" % (src, xp))
                return self._lookup(src, xp)

            # '{attribute-name}value' - entity-attribute index lookup
            m = re.match("^\{(.+)\}(.+)$", member)
            if m is not None:
                log.debug("attribute-value match: %s='%s'" % (m.group(1), m.group(2)))
                return self.index.get(m.group(1), m.group(2).rstrip("/"))

            # 'attribute=value' - same index lookup, alternate syntax
            m = re.match("^(.+)=(.+)$", member)
            if m is not None:
                log.debug("attribute-value match: %s='%s'" % (m.group(1), m.group(2)))
                return self.index.get(m.group(1), m.group(2).rstrip("/"))

            # plain string: try each digest index in turn
            log.debug("basic lookup %s" % member)
            for idx in DIGESTS:
                e = self.index.get(idx, member)
                if e:
                    log.debug("found %s in %s index" % (e, idx))
                    return e

            # then try it as a stored collection name
            e = self.get(member, None)
            if e is not None:
                return self._lookup(e, xp)

            e = self.get("%s.xml" % member, None)  # hackish but helps save people from their mistakes
            if e is not None:
                if not "://" in member:  # not an absolute URL
                    log.warn("Found %s.xml as an alias - AVOID extensions in 'select as' statements" % member)
                return self._lookup(e, xp)

            if "://" in member:  # looks like a URL and wasn't an entity or collection - recurse away!
                log.debug("recursively fetching members from '%s'" % member)
                # note that this supports remote lists which may be more rope than is healthy
                # NOTE(review): urllib response objects have no 'iterlines'
                # method - this branch likely raises AttributeError; probably
                # meant plain iteration or readlines(). Confirm and fix.
                return [self._lookup(line, xp) for line in urllib.urlopen(member).iterlines()]

            return []
        elif hasattr(member, '__iter__') and type(member) is not dict:
            # a list/tuple of selectors - resolve each (empty means everything)
            if not len(member):
                member = self.keys()
            return [self._lookup(m, xp) for m in member]
        else:
            raise MetadataException("What about %s ??" % member)
661
662
    def lookup(self, member, xp=None):
        """
        Lookup elements in the working metadata repository.

        :param member: A selector (cf below)
        :type member: basestring
        :param xp: An optional xpath filter
        :type xp: basestring
        :return: An iterable of EntityDescriptor elements
        :rtype: etree.Element

        **Selector Syntax**

            - selector "+" selector
            - [sourceID] "!" xpath
            - attribute=value or {attribute}value
            - entityID
            - sourceID (@Name)
            - <URL containing one selector per line>

        The first form yields the intersection of the lookups of the individual
        selectors. The second form yields the EntityDescriptor elements from the
        source (default: all EntityDescriptors) matching the xpath expression.
        The attribute-value forms yield the EntityDescriptors carrying that
        entity attribute pair. Otherwise the lookup is done by sourceID
        (normally @Name of an EntitiesDescriptor) or by the entityID of a
        single EntityDescriptor. A URI not present in the repository is fetched
        and treated as a list of selectors (one per line). Failing all of the
        above, an empty list is returned.
        """
        # _lookup may return None placeholders and duplicates; drop both.
        hits = self._lookup(member, xp)
        return list(set(x for x in hits if x is not None))
    def entity_set(self, entities, name, cacheDuration=None, validUntil=None, validate=True):
        """
        Produce an EntitiesDescriptor from a list of entity selectors.

        :param entities: a set of entity specifiers (lookup is used to resolve each one)
        :param name: the @Name attribute of the resulting EntitiesDescriptor
        :param cacheDuration: an XML timedelta expression, eg PT1H for 1hr
        :param validUntil: a relative time eg 2w 4d 1h for 2 weeks, 4 days and 1 hour from now
        :param validate: if True the result is validated against the SAML metadata schema
        :return: an EntitiesDescriptor element, or None when no entity matched
        """
        attrs = dict(Name=name, nsmap=NS)
        if cacheDuration is not None:
            attrs['cacheDuration'] = cacheDuration
        if validUntil is not None:
            attrs['validUntil'] = validUntil
        t = etree.Element("{%s}EntitiesDescriptor" % NS['md'], **attrs)

        seen = {}  # TODO make better de-duplication
        nent = 0
        for member in entities:
            for ent in self.lookup(member):
                entityID = ent.get('entityID', None)
                # skip anything without an entityID and anything already copied
                if ent is None or entityID is None or seen.get(entityID, False):
                    continue
                t.append(deepcopy(ent))
                seen[entityID] = True
                nent += 1

        log.debug("selecting %d entities from %d entity set(s) before validation" % (nent, len(entities)))

        if not nent:
            return None

        if validate:
            try:
                schema().assertValid(t)
            except DocumentInvalid as ex:
                log.debug(_e(ex.error_log))
                raise MetadataException("XML schema validation failed: %s" % name)
        return t
    def error_set(self, url, title, ex):
        """
        Create an "error" EntitiesDescriptor - empty but for an annotation
        about the error that occurred.

        :param url: the source URL the error relates to (becomes @Name)
        :param title: a short human-readable error title
        :param ex: the exception (or message) describing the failure
        :return: the annotated, otherwise empty, EntitiesDescriptor element
        """
        t = etree.Element("{%s}EntitiesDescriptor" % NS['md'], Name=url, nsmap=NS)
        self.annotate(t, "error", title, ex, source=url)
        # BUG FIX: the element was previously built and then silently discarded;
        # return it so callers can actually use the error document.
        return t
    def keys(self):
        """Return the identifiers of all metadata documents held in the repository."""
        store = self.md
        return store.keys()
    def __getitem__(self, item):
        """Dict-style read access: return the metadata document stored under *item*."""
        store = self.md
        return store[item]
    def __setitem__(self, key, value):
        """Dict-style write access: store *value* as the metadata document for *key*."""
        store = self.md
        store[key] = value
    def __delitem__(self, key):
        """Dict-style removal of the metadata document for *key* (KeyError if absent)."""
        store = self.md
        del store[key]
    def summary(self, uri):
        """
        Return basic information about an EntitiesDescriptor in the repository.

        :param uri: An EntitiesDescriptor URI present in the MDRepository
        :return: a dict with Name, cacheDuration, validUntil, Size and Duplicates
        """
        t = root(self[uri])
        info = dict(Name=t.get('Name', uri),
                    cacheDuration=t.get('cacheDuration', None),
                    validUntil=t.get('validUntil', None),
                    Duplicates=[],
                    Size=0)
        seen = set()
        for e in self.entities(self[uri]):
            entityID = e.get('entityID')
            # any entityID encountered more than once is reported as a duplicate
            if entityID in seen:
                info['Duplicates'].append(entityID)
            else:
                seen.add(entityID)
            info['Size'] += 1
        return info
    def merge(self, t, nt, strategy=pyff.merge_strategies.replace_existing, strategy_name=None):
        """
        Merge the EntityDescriptor elements of one EntitiesDescriptor into another.

        :param t: The EntitiesDescriptor element to merge *into*
        :param nt: The EntitiesDescriptor element to merge *from*
        :param strategy: A callable implementing the merge strategy pattern
        :param strategy_name: The name of a strategy to import. Overrides the callable if present.
        :return: None

        Two EntitiesDescriptor elements are merged - the second into the first. For each
        element in the second collection that is present (using the @entityID attribute
        as key) in the first, the strategy callable is called with the old and new
        EntityDescriptor elements as parameters:

        :param old_e: The EntityDescriptor from t
        :param e: The EntityDescriptor from nt
        :return: A merged EntityDescriptor element

        Before each call to strategy, old_e is removed from the MDRepository index and
        after the merge the resulting EntityDescriptor is re-added, keeping the index
        consistent with t.
        """
        if strategy_name is not None:
            # Unqualified strategy names default to the pyff.merge_strategies module.
            if not '.' in strategy_name:
                strategy_name = "pyff.merge_strategies.%s" % strategy_name
            (mn, sep, fn) = strategy_name.rpartition('.')
            module = None
            if '.' in mn:
                (pn, sep, modn) = mn.rpartition('.')
                module = getattr(__import__(pn, globals(), locals(), [modn], -1), modn)
            else:
                module = __import__(mn, globals(), locals(), [], -1)
            strategy = getattr(module, fn)  # we might as well let this fail early if the strategy is wrongly named

        if strategy is None:
            raise MetadataException("No merge strategy - refusing to merge")

        for e in nt.findall(".//{%s}EntityDescriptor" % NS['md']):
            entityID = e.get("entityID")
            # we assume a de-duplicated tree, so at most one match in t
            old_e = t.find(".//{%s}EntityDescriptor[@entityID='%s']" % (NS['md'], entityID))

            try:
                self.index.remove(old_e)
                strategy(old_e, e)
                new_e = t.find(".//{%s}EntityDescriptor[@entityID='%s']" % (NS['md'], entityID))
                # BUG FIX: lxml Elements are falsy when they have no children, so
                # "if new_e:" could skip re-indexing a real element (and raises a
                # FutureWarning in modern lxml). Test explicitly against None.
                if new_e is not None:
                    self.index.add(new_e)  # we don't know which strategy was employed
            except Exception as ex:
                traceback.print_exc()
                # restore the index entry we removed, then propagate the failure
                self.index.add(old_e)
                raise ex