Passed
Push — master ( 6f99a2...0038b7 )
by Daniel
09:15 queued 06:11
created

amd.io._map()   B

Complexity

Conditions 7

Size

Total Lines 27
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 17
dl 0
loc 27
rs 8
c 0
b 0
f 0
cc 7
nop 3
1
"""Tools for reading crystals from files, or from the CSD with ``csd-python-api``. 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
2
The readers return :class:`.periodicset.PeriodicSet` objects representing the 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
3
crystal which can be passed to :func:`.calculate.AMD` and :func:`.calculate.PDD` 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
4
to get their invariants.
5
"""
6
7
import os
8
import functools
9
import warnings
10
from typing import Callable, Iterable, Sequence, Tuple
11
12
import numpy as np
13
import numba
14
import ase.io.cif
15
import ase.data
16
from ase.spacegroup.spacegroup import parse_sitesym
17
18
from .utils import cellpar_to_cell
19
from .periodicset import PeriodicSet
20
21
try:
22
    import ccdc.io
23
    import ccdc.search
24
    _CSD_PYTHON_API_ENABLED = True
25
except (ImportError, RuntimeError) as _:
26
    _CSD_PYTHON_API_ENABLED = False
27
28
def _custom_warning(message, category, filename, lineno, *args, **kwargs):
0 ignored issues
show
Unused Code introduced by
The argument filename seems to be unused.
Loading history...
Unused Code introduced by
The argument lineno seems to be unused.
Loading history...
Unused Code introduced by
The argument args seems to be unused.
Loading history...
Unused Code introduced by
The argument kwargs seems to be unused.
Loading history...
29
    return f'{category.__name__}: {message}\n'
30
31
warnings.formatwarning = _custom_warning
32
33
_EQUIV_SITE_TOL = 1e-3
34
_DISORDER_OPTIONS = {'skip', 'ordered_sites', 'all_sites'}
35
_ATOM_SITE_FRACT_TAGS = [
36
    '_atom_site_fract_x',
37
    '_atom_site_fract_y',
38
    '_atom_site_fract_z',
39
]
40
_ATOM_SITE_CARTN_TAGS = [
41
    '_atom_site_cartn_x',
42
    '_atom_site_cartn_y',
43
    '_atom_site_cartn_z',
44
]
45
_SYMOP_TAGS = [
46
    '_space_group_symop_operation_xyz',
47
    '_space_group_symop.operation_xyz',
48
    '_symmetry_equiv_pos_as_xyz',
49
]
50
51
52
class _Reader:
53
54
    def __init__(
55
            self,
56
            remove_hydrogens=False,
57
            disorder='skip',
58
            heaviest_component=False,
59
            show_warnings=True,
60
    ):
61
62
        if disorder not in _DISORDER_OPTIONS:
63
            raise ValueError(f'disorder parameter {disorder} must be one of {_DISORDER_OPTIONS}')
64
65
        self.remove_hydrogens = remove_hydrogens
66
        self.disorder = disorder
67
        self.heaviest_component = heaviest_component
68
        self.show_warnings = show_warnings
69
        self._generator = []
70
71
    def __iter__(self):
72
        yield from self._generator
73
74
    def read_one(self):
75
        """Read the first, and usually the only, item."""
76
        try:
77
            return next(iter(self._generator))
78
        except StopIteration:
79
            return None
80
81
82
class CifReader(_Reader):
83
    """Read all structures in a .cif file or all files in a folder 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
84
    with ase or csd-python-api (if installed), yielding 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
85
    :class:`.periodicset.PeriodicSet` s.
86
87
    Parameters
88
    ----------
89
    path : str
90
        Path to a .cif file or directory. (Other files are accepted when using
91
        ``reader='ccdc'``, if csd-python-api is installed.)
92
    reader : str, optional
93
        The backend package used for parsing. Default is :code:`ase`,
94
        to use csd-python-api change to :code:`ccdc`. The ccdc reader should 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
95
        be able to read any format accepted by :class:`ccdc.io.EntryReader`,
96
        though only cifs have been tested.
97
    remove_hydrogens : bool, optional
98
        Remove Hydrogens from the crystal.
99
    disorder : str, optional
100
        Controls how disordered structures are handled. Default is ``skip`` which skips any crystal 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
101
        with disorder, since disorder conflicts with the periodic set model. To read disordered 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
102
        structures anyway, choose either :code:`ordered_sites` to remove sites with disorder or 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
103
        :code:`all_sites` include all sites regardless.
104
    heaviest_component : bool, optional
105
        csd-python-api only. Removes all but the heaviest molecule in the asymmeric unit,
106
        intended for removing solvents. 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
107
    show_warnings : bool, optional
108
        Controls whether warnings that arise during reading are printed.
109
110
    Yields
111
    ------
112
    :class:`.periodicset.PeriodicSet`
113
        Represents the crystal as a periodic set, consisting of a finite set of points (motif)
114
        and lattice (unit cell). Contains other useful data, e.g. the crystal's name and
115
        information about the asymmetric unit for calculation.
116
117
    Examples
118
    --------
119
    
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
120
        ::
121
122
            # Put all crystals in a .CIF in a list
123
            structures = list(amd.CifReader('mycif.cif'))
124
125
            # Can also accept path to a directory, reading all files inside
126
            structures = list(amd.CifReader('path/to/folder'))
127
128
            # Reads just one if the .CIF has just one crystal
129
            periodic_set = amd.CifReader('mycif.cif').read_one()
130
131
            # List of AMDs (k=100) of crystals in a .CIF
132
            amds = [amd.AMD(periodic_set, 100) for periodic_set in amd.CifReader('mycif.cif')]
133
    """
134
135
    def __init__(
0 ignored issues
show
best-practice introduced by
Too many arguments (7/5)
Loading history...
136
            self,
137
            path,
138
            reader='ase',
139
            remove_hydrogens=False,
140
            disorder='skip',
141
            heaviest_component=False,
142
            show_warnings=True,
143
    ):
144
145
        super().__init__(
146
            remove_hydrogens=remove_hydrogens,
147
            disorder=disorder,
148
            heaviest_component=heaviest_component,
149
            show_warnings=show_warnings,
150
        )
151
152
        if reader in ('ase', 'pycodcif'):
153
            if heaviest_component:
154
                raise NotImplementedError('Parameter heaviest_component only implimented for reader="ccdc".')
155
156
            extensions = {'cif'}
157
            file_parser = functools.partial(ase.io.cif.parse_cif, reader=reader)
158
            converter = functools.partial(cifblock_to_periodicset,
159
                                          remove_hydrogens=remove_hydrogens,
160
                                          disorder=disorder)
161
162
        elif reader == 'ccdc':
163
            if not _CSD_PYTHON_API_ENABLED:
164
                raise ImportError("Failed to import csd-python-api; check it is installed and licensed.")
165
            extensions = ccdc.io.EntryReader.known_suffixes
166
            file_parser = ccdc.io.EntryReader
167
            converter = functools.partial(entry_to_periodicset,
168
                                          remove_hydrogens=remove_hydrogens,
169
                                          disorder=disorder,
170
                                          heaviest_component=heaviest_component)
171
172
        else:
173
            raise ValueError(f'Invalid reader {reader}.')
174
175
        if os.path.isfile(path):
176
            generator = file_parser(path)
177
        elif os.path.isdir(path):
178
            generator = self._generate_from_dir(path, file_parser, extensions)
179
        else:
180
            raise FileNotFoundError(f'No such file or directory: {path}')
181
182
        self._generator = _map(converter, generator, self.show_warnings)
183
184
    def _generate_from_dir(self, path, file_parser, extensions):
0 ignored issues
show
Coding Style introduced by
This method could be written as a function/class method.

If a method does not access any attributes of the class, it could also be implemented as a function or static method. This can help improve readability. For example

class Foo:
    def some_method(self, x, y):
        return x + y;

could be written as

class Foo:
    @classmethod
    def some_method(cls, x, y):
        return x + y;
Loading history...
185
        for file in os.listdir(path):
186
            suff = os.path.splitext(file)[1][1:]
187
            if suff.lower() in extensions:
188
                yield from file_parser(os.path.join(path, file))
189
190
191
class CSDReader(_Reader):
192
    """Read structures from the CSD with csd-python-api, yielding 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
193
    :class:`.periodicset.PeriodicSet` s.
194
195
    Parameters
196
    ----------
197
    refcodes : List[str], optional
198
        List of CSD refcodes to read. If None or 'CSD', iterates over the whole CSD.
199
    families : bool, optional
200
        Read all entries whose refcode starts with the given strings, or 'families' 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
201
        (e.g. giving 'DEBXIT' reads all entries starting with DEBXIT).
202
    remove_hydrogens : bool, optional
203
        Remove hydrogens from the crystal.
204
    disorder : str, optional
205
        Controls how disordered structures are handled. Default is ``skip`` which skips any crystal 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
206
        with disorder, since disorder conflicts with the periodic set model. To read disordered 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
207
        structures anyway, choose either :code:`ordered_sites` to remove sites with disorder or 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
208
        :code:`all_sites` include all sites regardless.
209
    heaviest_component : bool, optional
210
        csd-python-api only. Removes all but the heaviest molecule in the asymmeric unit,
211
        intended for removing solvents. 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
212
    show_warnings : bool, optional
213
        Controls whether warnings that arise during reading are printed.
214
215
    Yields
216
    ------
217
    :class:`.periodicset.PeriodicSet`
218
        Represents the crystal as a periodic set, consisting of a finite set of points (motif)
219
        and lattice (unit cell). Contains other useful data, e.g. the crystal's name and
220
        information about the asymmetric unit for calculation.
221
222
    Examples
223
    --------
224
    
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
225
        ::
226
227
            # Put these entries in a list
228
            refcodes = ['DEBXIT01', 'DEBXIT05', 'HXACAN01']
229
            structures = list(amd.CSDReader(refcodes))
230
231
            # Read refcode families (any whose refcode starts with strings in the list)
232
            refcode_families = ['ACSALA', 'HXACAN']
233
            structures = list(amd.CSDReader(refcode_families, families=True))
234
235
            # Get AMDs (k=100) for crystals in these families
236
            refcodes = ['ACSALA', 'HXACAN']
237
            amds = []
238
            for periodic_set in amd.CSDReader(refcodes, families=True):
239
                amds.append(amd.AMD(periodic_set, 100))
240
241
            # Giving the reader nothing reads from the whole CSD.
242
            reader = amd.CSDReader()
243
            
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
244
            # looping over this generic reader will yield all CSD entries
245
            for periodic_set in reader:
246
                ...
247
            
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
248
            # or, read structures by refcode on demand
249
            debxit01 = reader.entry('DEBXIT01')
250
    """
251
252
    def __init__(
0 ignored issues
show
best-practice introduced by
Too many arguments (7/5)
Loading history...
253
            self,
254
            refcodes=None,
255
            families=False,
256
            remove_hydrogens=False,
257
            disorder='skip',
258
            heaviest_component=False,
259
            show_warnings=True,
260
    ):
261
262
        super().__init__(
263
            remove_hydrogens=remove_hydrogens,
264
            disorder=disorder,
265
            heaviest_component=heaviest_component,
266
            show_warnings=show_warnings,
267
        )
268
269
        if not _CSD_PYTHON_API_ENABLED:
270
            raise ImportError('Failed to import csd-python-api; check it is installed and licensed.')
271
272
        if isinstance(refcodes, str) and refcodes.lower() == 'csd':
273
            refcodes = None
274
275
        if refcodes is None:
276
            families = False
277
        else:
278
            refcodes = [refcodes] if isinstance(refcodes, str) else list(refcodes)
279
280
        if families:
281
            refcodes = _refcodes_from_families(refcodes)
282
283
        self._entry_reader = ccdc.io.EntryReader('CSD')
284
        converter = functools.partial(entry_to_periodicset,
285
                                      remove_hydrogens=self.remove_hydrogens,
286
                                      disorder=self.disorder,
287
                                      heaviest_component=self.heaviest_component)
288
289
        generator = self._ccdc_generator(refcodes)
290
        self._generator = _map(converter, generator, self.show_warnings)
291
292
    def entry(self, refcode: str, **kwargs) -> PeriodicSet:
293
        """Read a crystal given a CSD refcode, returning a :class:`.periodicset.PeriodicSet`.
294
        If given kwargs, overrides the kwargs given to the Reader."""
295
296
        try:
297
            entry = self._entry_reader.entry(refcode)
298
        except RuntimeError:
299
            warnings.warn(f'{refcode} not found in database')
300
301
        kwargs_ = {
302
            'remove_hydrogens': self.remove_hydrogens,
303
            'disorder': self.disorder,
304
            'heaviest_component': self.heaviest_component
305
        }
306
        kwargs_.update(kwargs)
307
        converter = functools.partial(entry_to_periodicset, **kwargs_)
308
309
        if 'show_warnings' in kwargs:
0 ignored issues
show
Unused Code introduced by
Consider using dict.get for getting values from a dict if a key is present or a default if not
Loading history...
310
            show_warnings = kwargs['show_warnings']
311
        else:
312
            show_warnings = self.show_warnings
313
314
        try:
315
            periodic_set = next(iter(_map(converter, [entry], show_warnings)))
316
        except StopIteration:
317
            periodic_set = None
318
319
        return periodic_set
320
321
    def family(self, refcode_family: str, **kwargs):
0 ignored issues
show
introduced by
Missing function or method docstring
Loading history...
322
        
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
323
        kwargs_ = {
324
            'remove_hydrogens': self.remove_hydrogens,
325
            'disorder': self.disorder,
326
            'heaviest_component': self.heaviest_component
327
        }
328
        kwargs_.update(kwargs)
329
        converter = functools.partial(entry_to_periodicset, **kwargs_)
330
        refcodes = _refcodes_from_families([refcode_family])
331
        generator = self._ccdc_generator(refcodes)
332
333
        if 'show_warnings' in kwargs:
0 ignored issues
show
Unused Code introduced by
Consider using dict.get for getting values from a dict if a key is present or a default if not
Loading history...
334
            show_warnings = kwargs['show_warnings']
335
        else:
336
            show_warnings = self.show_warnings
337
338
        yield from _map(converter, generator, show_warnings)
339
340
    def _ccdc_generator(self, refcodes):
341
        """Generates ccdc Entries from CSD refcodes."""
342
343
        if refcodes is None:
344
            for entry in self._entry_reader:
345
                yield entry
346
        else:
347
            for refcode in refcodes:
348
                try:
349
                    entry = self._entry_reader.entry(refcode)
350
                    yield entry
351
                except RuntimeError:
352
                    warnings.warn(f'{refcode} not found in database')
353
354
355
class _ParseError(ValueError):
356
    """Raised when an item cannot be parsed into a periodic set."""
357
    pass
0 ignored issues
show
Unused Code introduced by
Unnecessary pass statement
Loading history...
358
359
360
def entry_to_periodicset(
361
        entry,
362
        remove_hydrogens=False,
363
        disorder='skip',
364
        heaviest_component=False
365
) -> PeriodicSet:
366
    """:class:`ccdc.entry.Entry` --> :class:`amd.periodicset.PeriodicSet`.
367
    Entry is the type returned by :class:`ccdc.io.EntryReader`.
368
369
    Parameters
370
    ----------
371
    entry : :class:`ccdc.entry.Entry`
372
        A ccdc Entry object representing a database entry.
373
    remove_hydrogens : bool, optional
374
        Remove Hydrogens from the crystal.
375
    disorder : str, optional
376
        Controls how disordered structures are handled. Default is ``skip`` which skips any crystal 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
377
        with disorder, since disorder conflicts with the periodic set model. To read disordered 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
378
        structures anyway, choose either :code:`ordered_sites` to remove sites with disorder or 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
379
        :code:`all_sites` include all sites regardless.
380
    heaviest_component : bool, optional
381
        Removes all but the heaviest molecule in the asymmeric unit,
382
        intended for removing solvents. 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
383
384
    Returns
385
    -------
386
    :class:`.periodicset.PeriodicSet`
387
        Represents the crystal as a periodic set, consisting of a finite set of points (motif)
388
        and lattice (unit cell). Contains other useful data, e.g. the crystal's name and
389
        information about the asymmetric unit for calculation.
390
391
    Raises
392
    ------
393
    _ParseError :
394
        Raised if the structure can/should not be parsed for the following reasons:
395
        1. entry.has_3d_structure is False,
396
        2. disorder == 'skip' and any of:
397
            (a) any disorder flag is True,
398
            (b) any atom has fractional occupancy,
399
            (c) any atom's label ends with '?',
400
        3. entry.crystal.molecule.all_atoms_have_sites is False,
401
        4. a.fractional_coordinates is None for any a in entry.crystal.disordered_molecule,
402
        5. motif is empty after removing H, disordered sites or solvents.
403
    """
404
405
    crystal = entry.crystal
406
407
    if not entry.has_3d_structure:
408
        raise _ParseError(f'{crystal.identifier} has no 3D structure')
409
410
    molecule = crystal.disordered_molecule
411
412
    if disorder == 'skip':
413
        if crystal.has_disorder or entry.has_disorder or \
414
            any(_atom_has_disorder(a.label, a.occupancy) for a in molecule.atoms):
415
            raise _ParseError(f'{crystal.identifier} has disorder')
416
417
    elif disorder == 'ordered_sites':
418
        molecule.remove_atoms(a for a in molecule.atoms
419
                              if _atom_has_disorder(a.label, a.occupancy))
420
421
    if remove_hydrogens:
422
        molecule.remove_atoms(a for a in molecule.atoms if a.atomic_symbol in 'HD')
423
424
    if heaviest_component and len(molecule.components) > 1:
425
        molecule = _heaviest_component(molecule)
426
427
    # nonsensical results are likely if not all atoms have sites, but attempt to read anyway
428
    if any(a.fractional_coordinates is None for a in molecule.atoms):
429
        warnings.warn(f'has atoms without sites')
0 ignored issues
show
introduced by
Using an f-string that does not have any interpolated variables
Loading history...
430
        molecule.remove_atoms(a for a in molecule.atoms if a.fractional_coordinates is None)
431
        # raise _ParseError(f'{crystal.identifier} has atoms without sites')
432
433
    if not molecule.all_atoms_have_sites:
434
        raise _ParseError(f'{crystal.identifier} has atoms without sites')
435
436
    crystal.molecule = molecule
437
    asym_atoms = crystal.asymmetric_unit_molecule.atoms
438
    asym_unit = np.array([tuple(a.fractional_coordinates) for a in asym_atoms])
439
    asym_unit = np.mod(asym_unit, 1)
440
    asym_types = [a.atomic_number for a in asym_atoms]
441
    cell = cellpar_to_cell(*crystal.cell_lengths, *crystal.cell_angles)
442
443
    sitesym = crystal.symmetry_operators
444
    if not sitesym:
445
        sitesym = ['x,y,z']
446
447
    if disorder != 'all_sites':
448
        keep_sites = _unique_sites(asym_unit)
449
        if not np.all(keep_sites):
450
            warnings.warn(f'may have overlapping sites; duplicates will be removed')
0 ignored issues
show
introduced by
Using an f-string that does not have any interpolated variables
Loading history...
451
        asym_unit = asym_unit[keep_sites]
452
        asym_types = [sym for sym, keep in zip(asym_types, keep_sites) if keep]
453
454
    if asym_unit.shape[0] == 0:
455
        raise _ParseError(f'{crystal.identifier} has no valid sites')
456
457
    frac_motif, asym_inds, multiplicities, inverses = _expand_asym_unit(asym_unit, sitesym)
458
    full_types = np.array([asym_types[i] for i in inverses])
459
    motif = frac_motif @ cell
460
461
    kwargs = {
462
        'name': crystal.identifier,
463
        'asymmetric_unit': asym_inds,
464
        'wyckoff_multiplicities': multiplicities,
465
        'types': full_types
466
    }
467
468
    return PeriodicSet(motif, cell, **kwargs)
469
470
471
def cifblock_to_periodicset(
472
        block,
473
        remove_hydrogens=False,
474
        disorder='skip'
475
) -> PeriodicSet:
476
    """:class:`ase.io.cif.CIFBlock` --> :class:`amd.periodicset.PeriodicSet`. 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
477
    CIFBlock is the type returned by :class:`ase.io.cif.parse_cif`.
478
479
    Parameters
480
    ----------
481
    block : :class:`ase.io.cif.CIFBlock`
482
        An ase CIFBlock object representing a crystal.
483
    remove_hydrogens : bool, optional
484
        Remove Hydrogens from the crystal.
485
    disorder : str, optional
486
        Controls how disordered structures are handled. Default is ``skip`` which skips any crystal 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
487
        with disorder, since disorder conflicts with the periodic set model. To read disordered 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
488
        structures anyway, choose either :code:`ordered_sites` to remove sites with disorder or 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
489
        :code:`all_sites` include all sites regardless.
490
491
    Returns
492
    -------
493
    :class:`.periodicset.PeriodicSet`
494
        Represents the crystal as a periodic set, consisting of a finite set of points (motif)
495
        and lattice (unit cell). Contains other useful data, e.g. the crystal's name and
496
        information about the asymmetric unit for calculation.
497
498
    Raises
499
    ------
500
    _ParseError
501
        Raised if the structure can/should not be parsed for the following reasons:
502
        1. no sites found or motif is empty after removing H or disordered sites,
503
        2. a site has missing coordinates,
504
        3. disorder == 'skip' and any of:
505
            (a) any atom has fractional occupancy,
506
            (b) any atom's label ends with '?'.
507
    """
508
509
    cell = block.get_cell().array
510
511
    # asymmetric unit fractional coords
512
    asym_unit = [block.get(name) for name in _ATOM_SITE_FRACT_TAGS]
513
    if None in asym_unit:
514
        asym_motif = [block.get(name) for name in _ATOM_SITE_CARTN_TAGS]
515
        if None in asym_motif:
516
            raise _ParseError(f'{block.name} has no sites')
517
        asym_unit = np.array(asym_motif) @ np.linalg.inv(cell)
518
519
    if any(None in coords for coords in asym_unit):
520
        raise _ParseError(f'{block.name} has atoms without sites')
521
522
    asym_unit = np.mod(np.array(asym_unit).T, 1)
523
524
    try:
525
        asym_types = [ase.data.atomic_numbers[s] for s in block.get_symbols()]
526
    except ase.io.cif.NoStructureData as _:
527
        asym_types = [0 for _ in range(len(asym_unit))]
528
529
    sitesym = ['x,y,z', ]
530
    for tag in _SYMOP_TAGS:
531
        if tag in block:
532
            sitesym = block[tag]
533
            break
534
    if isinstance(sitesym, str):
535
        sitesym = [sitesym]
536
537
    remove_sites = []
538
539
    occupancies = block.get('_atom_site_occupancy')
540
    labels = block.get('_atom_site_label')
541
    if occupancies is not None:
542
        if disorder == 'skip':
543
            if any(_atom_has_disorder(lab, occ) for lab, occ in zip(labels, occupancies)):
544
                raise _ParseError(f'{block.name} has disorder')
545
        elif disorder == 'ordered_sites':
546
            remove_sites.extend(
547
                (i for i, (lab, occ) in enumerate(zip(labels, occupancies))
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable i does not seem to be defined.
Loading history...
548
                 if _atom_has_disorder(lab, occ)))
549
550
    if remove_hydrogens:
551
        remove_sites.extend((i for i, num in enumerate(asym_types) if num == 1))
552
553
    asym_unit = np.delete(asym_unit, remove_sites, axis=0)
554
    asym_types = [s for i, s in enumerate(asym_types) if i not in remove_sites]
555
556
    if disorder != 'all_sites':
557
        keep_sites = _unique_sites(asym_unit)
558
        if not np.all(keep_sites):
559
            warnings.warn(f'may have overlapping sites; duplicates will be removed')
0 ignored issues
show
introduced by
Using an f-string that does not have any interpolated variables
Loading history...
560
        asym_unit = asym_unit[keep_sites]
561
        asym_types = [sym for sym, keep in zip(asym_types, keep_sites) if keep]
562
563
    if asym_unit.shape[0] == 0:
564
        raise _ParseError(f'{block.name} has no valid sites')
565
566
    frac_motif, asym_inds, multiplicities, inverses = _expand_asym_unit(asym_unit, sitesym)
567
    full_types = np.array([asym_types[i] for i in inverses])
568
    motif = frac_motif @ cell
569
570
    kwargs = {
571
        'name': block.name,
572
        'asymmetric_unit': asym_inds,
573
        'wyckoff_multiplicities': multiplicities,
574
        'types': full_types
575
    }
576
577
    return PeriodicSet(motif, cell, **kwargs)
578
579
580
def _expand_asym_unit(
581
        asym_unit: np.ndarray,
582
        sitesym: Sequence[str]
583
) -> Tuple[np.ndarray, ...]:
584
    """
585
    Asymmetric unit's fractional coords + site symmetries (as strings)
586
    -->
587
    fractional motif, asymmetric unit indices, multiplicities and inverses.
588
    """
589
590
    rotations, translations = parse_sitesym(sitesym)
591
    all_sites = []
592
    asym_inds = [0]
593
    multiplicities = []
594
    inverses = []
595
596
    for inv, site in enumerate(asym_unit):
597
        multiplicity = 0
598
599
        for rot, trans in zip(rotations, translations):
600
            site_ = np.mod(np.dot(rot, site) + trans, 1)
601
602
            if not all_sites:
603
                all_sites.append(site_)
604
                inverses.append(inv)
605
                multiplicity += 1
606
                continue
607
608
            # check if site_ overlaps with existing sites
609
            diffs1 = np.abs(site_ - all_sites)
610
            diffs2 = np.abs(diffs1 - 1)
611
            mask = np.all((diffs1 <= _EQUIV_SITE_TOL) | (diffs2 <= _EQUIV_SITE_TOL), axis=-1)
612
613
            if np.any(mask):
614
                where_equal = np.argwhere(mask).flatten()
615
                for ind in where_equal:
616
                    if inverses[ind] == inv:
617
                        pass
618
                    else:
619
                        warnings.warn(f'has equivalent sites at positions {inverses[ind]}, {inv}')
620
            else:
621
                all_sites.append(site_)
622
                inverses.append(inv)
623
                multiplicity += 1
624
625
        if multiplicity > 0:
626
            multiplicities.append(multiplicity)
627
            asym_inds.append(len(all_sites))
628
629
    frac_motif = np.array(all_sites)
630
    asym_inds = np.array(asym_inds[:-1])
631
    multiplicities = np.array(multiplicities)
632
    return frac_motif, asym_inds, multiplicities, inverses
633
634
635
def _atom_has_disorder(label, occupancy):
636
    """Return True if label ends with ? or occupancy < 1."""
637
    return label.endswith('?') or (np.isscalar(occupancy) and occupancy < 1)
638
639
640
@numba.njit()
641
def _unique_sites(asym_unit):
642
    site_diffs1 = np.abs(np.expand_dims(asym_unit, 1) - asym_unit)
643
    site_diffs2 = np.abs(site_diffs1 - 1)
644
    sites_neq_mask = np.logical_and((site_diffs1 > _EQUIV_SITE_TOL), 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
645
                                    (site_diffs2 > _EQUIV_SITE_TOL))
646
    overlapping = np.triu(sites_neq_mask.sum(axis=-1) == 0, 1)
647
    return overlapping.sum(axis=0) == 0
648
649
650
def _heaviest_component(molecule):
651
    """Heaviest component (removes all but the heaviest component of the asym unit).
652
    Intended for removing solvents. Probably doesn't play well with disorder"""
653
    component_weights = []
654
    for component in molecule.components:
655
        weight = 0
656
        for a in component.atoms:
657
            if isinstance(a.atomic_weight, (float, int)):
658
                if isinstance(a.occupancy, (float, int)):
659
                    weight += a.occupancy * a.atomic_weight
660
                else:
661
                    weight += a.atomic_weight
662
        component_weights.append(weight)
663
    largest_component_ind = np.argmax(np.array(component_weights))
664
    molecule = molecule.components[largest_component_ind]
665
    return molecule
666
667
668
def _refcodes_from_families(refcode_families):
669
    """List of strings --> all CSD refcodes starting with one of the strings.
670
    Intended to be passed a list of families and return all refcodes in them."""
671
    all_refcodes = []
672
    for refcode in refcode_families:
673
        query = ccdc.search.TextNumericSearch()
674
        query.add_identifier(refcode)
675
        hits = [hit.identifier for hit in query.search()]
676
        all_refcodes.extend(hits)
677
678
    # filter to unique refcodes
679
    seen = set()
680
    seen_add = seen.add
681
    refcodes = [
682
        refcode for refcode in all_refcodes
683
        if not (refcode in seen or seen_add(refcode))]
684
685
    return refcodes
686
687
688
def _map(func: Callable, iterable: Iterable, show_warnings: bool):
689
    """Iterates over iterable, passing items through func and yielding the result. 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
690
    Catches _ParseError and warnings, optionally printing them.
691
    """
692
693
    if not show_warnings:
694
        warnings.simplefilter('ignore')
695
696
    for item in iterable:
697
698
        with warnings.catch_warnings(record=True) as warning_msgs:
699
700
            parse_failed = False
701
            try:
702
                periodic_set = func(item)
703
            except _ParseError as err:
704
                parse_failed = str(err)
705
706
        if parse_failed:
707
            warnings.warn(parse_failed)
708
            continue
709
710
        for warning in warning_msgs:
711
            msg = f'{periodic_set.name} {warning.message}'
712
            warnings.warn(msg, category=warning.category)
713
714
        yield periodic_set
715