Passed
Push — master ( c4cb0d...f71bbb )
by Daniel
01:58
created

amd.io.CifReader._folder_generator()   A

Complexity

Conditions 3

Size

Total Lines 6
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 3
nop 4
1
"""Contains I/O tools, including a .CIF reader and CSD reader
2
(``csd-python-api`` only) to extract periodic set representations
3
of crystals which can be passed to :func:`.calculate.AMD` and :func:`.calculate.PDD`.
4
5
These intermediate :class:`.periodicset.PeriodicSet` representations can be written
6
to a .hdf5 file with :class:`SetWriter`, which can be read back with :class:`SetReader`.
7
This is much faster than rereading a .CIF and recomputing invariants.
8
"""
9
10
import os
11
import warnings
12
13
import numpy as np
14
15
from .periodicset import PeriodicSet
16
from .utils import _extend_signature, cellpar_to_cell
17
from ._reader import _Reader
18
19
try:
20
    import ccdc.io       # EntryReader
21
    import ccdc.search   # TextNumericSearch
22
    _CCDC_ENABLED = True
23
except (ImportError, RuntimeError) as exception:
24
    _CCDC_ENABLED = False
25
26
27
def _warning(message, category, filename, lineno, *args, **kwargs):
0 ignored issues
show
Unused Code introduced by
The argument args seems to be unused.
Loading history...
Unused Code introduced by
The argument kwargs seems to be unused.
Loading history...
28
    return f'{filename}:{lineno}: {category.__name__}: {message}\n'
29
30
warnings.formatwarning = _warning
31
32
33
class CifReader(_Reader):
34
    """Read all structures in a .CIF with ``ase`` or ``ccdc``
35
    (``csd-python-api`` only), yielding  :class:`.periodicset.PeriodicSet`
36
    objects which can be passed to :func:`.calculate.AMD` or
37
    :func:`.calculate.PDD`.
38
39
    Examples:
40
41
        ::
42
43
            # Put all crystals in a .CIF in a list
44
            structures = list(amd.CifReader('mycif.cif'))
45
46
            # Reads just one if the .CIF has just one crystal
47
            periodic_set = amd.CifReader('mycif.cif').read_one()
48
49
            # If a folder has several .CIFs each with one crystal, use
50
            structures = list(amd.CifReader('path/to/folder', folder=True))
51
52
            # Make list of AMDs (with k=100) of crystals in a .CIF
53
            amds = [amd.AMD(periodic_set, 100) for periodic_set in amd.CifReader('mycif.cif')]
54
    """
55
56
    @_extend_signature(_Reader.__init__)
57
    def __init__(
58
            self,
59
            path,
60
            reader='ase',
61
            folder=False,
62
            **kwargs):
63
64
        super().__init__(**kwargs)
65
66
        if reader not in ('ase', 'ccdc'):
67
            raise ValueError(f'Invalid reader {reader}; must be ase or ccdc.')
68
69
        if reader == 'ase' and self.heaviest_component:
70
            raise NotImplementedError('Parameter heaviest_component not implimented for ase, only ccdc.')
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (105/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
71
72
        if reader == 'ase':
73
            extensions = {'cif'}
74
            file_parser = ase.io.cif.parse_cif
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ase does not seem to be defined.
Loading history...
Comprehensibility Best Practice introduced by
Undefined variable 'ase'
Loading history...
75
            pset_converter = self._CIFBlock_to_PeriodicSet
76
77
        elif reader == 'ccdc':
78
            if not _CCDC_ENABLED:
79
                raise ImportError("Failed to import csd-python-api; check it is installed and licensed.")
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (105/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
80
            extensions = ccdc.io.EntryReader.known_suffixes
81
            file_parser = ccdc.io.EntryReader
82
            pset_converter = self._Entry_to_PeriodicSet
83
84
        if folder:
85
            generator = self._folder_generator(path, file_parser, extensions)
0 ignored issues
show
introduced by
The variable file_parser does not seem to be defined for all execution paths.
Loading history...
introduced by
The variable extensions does not seem to be defined for all execution paths.
Loading history...
86
        else:
87
            generator = file_parser(path)
88
89
        self._generator = self._map(pset_converter, generator)
0 ignored issues
show
introduced by
The variable pset_converter does not seem to be defined for all execution paths.
Loading history...
90
91
    def _folder_generator(self, path, file_parser, extensions):
92
        for file in os.listdir(path):
93
            suff = os.path.splitext(file)[1][1:]
94
            if suff.lower() in extensions:
95
                self.current_filename = file
96
                yield from file_parser(os.path.join(path, file))
97
98
99
class CSDReader(_Reader):
100
    """Read Entries from the CSD, yielding :class:`.periodicset.PeriodicSet` objects.
101
102
    The CSDReader returns :class:`.periodicset.PeriodicSet` objects which can be passed
103
    to :func:`.calculate.AMD` or :func:`.calculate.PDD`.
104
105
    Examples:
106
107
        Get crystals with refcodes in a list::
108
109
            refcodes = ['DEBXIT01', 'DEBXIT05', 'HXACAN01']
110
            structures = list(amd.CSDReader(refcodes))
111
112
        Read refcode families (any whose refcode starts with strings in the list)::
113
114
            refcodes = ['ACSALA', 'HXACAN']
115
            structures = list(amd.CSDReader(refcodes, families=True))
116
117
        Create a generic reader, read crystals by name with :meth:`CSDReader.entry()`::
118
119
            reader = amd.CSDReader()
120
            debxit01 = reader.entry('DEBXIT01')
121
122
            # looping over this generic reader will yield all CSD entries
123
            for periodic_set in reader:
124
                ...
125
126
        Make list of AMD (with k=100) for crystals in these families::
127
128
            refcodes = ['ACSALA', 'HXACAN']
129
            amds = []
130
            for periodic_set in amd.CSDReader(refcodes, families=True):
131
                amds.append(amd.AMD(periodic_set, 100))
132
    """
133
134
    @_extend_signature(_Reader.__init__)
135
    def __init__(
136
            self,
137
            refcodes=None,
138
            families=False,
139
            **kwargs):
140
141
        if not _CCDC_ENABLED:
142
            raise ImportError("Failed to import csd-python-api; check it is installed and licensed.")
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (101/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
143
144
        super().__init__(**kwargs)
145
146
        if isinstance(refcodes, str) and refcodes.lower() == 'csd':
147
            refcodes = None
148
149
        if refcodes is None:
150
            families = False
151
        else:
152
            refcodes = [refcodes] if isinstance(refcodes, str) else list(refcodes)
153
154
        # families parameter reads all crystals with ids starting with passed refcodes
155
        if families:
156
            all_refcodes = []
157
            for refcode in refcodes:
158
                query = ccdc.search.TextNumericSearch()
159
                query.add_identifier(refcode)
160
                all_refcodes.extend((hit.identifier for hit in query.search()))
0 ignored issues
show
introduced by
The variable hit does not seem to be defined in case the for loop on line 157 is not entered. Are you sure this can never be the case?
Loading history...
161
162
            # filter to unique refcodes
163
            seen = set()
164
            seen_add = seen.add
165
            refcodes = [
166
                refcode for refcode in all_refcodes
167
                if not (refcode in seen or seen_add(refcode))]
168
169
        self._entry_reader = ccdc.io.EntryReader('CSD')
170
        self._generator = self._map(
171
            self._Entry_to_PeriodicSet,
172
            self._ccdc_generator(refcodes))
173
174
    def _ccdc_generator(self, refcodes):
175
        """Generates ccdc Entries from CSD refcodes"""
176
177
        if refcodes is None:
178
            for entry in self._entry_reader:
179
                yield entry
180
        else:
181
            for refcode in refcodes:
182
                try:
183
                    entry = self._entry_reader.entry(refcode)
184
                    yield entry
185
                except RuntimeError:
186
                    warnings.warn(
187
                        f'Identifier {refcode} not found in database')
188
189
    def entry(self, refcode: str) -> PeriodicSet:
190
        """Read a PeriodicSet given any CSD refcode."""
191
192
        entry = self._entry_reader.entry(refcode)
193
        periodic_set = self._Entry_to_PeriodicSet(entry)
194
        return periodic_set
195
196
197
def crystal_to_periodicset(crystal):
198
    """ccdc.crystal.Crystal --> amd.periodicset.PeriodicSet.
199
    Ignores disorder, missing sites/coords, checks & no options.
200
    Is a stripped-down version of the function used in CifReader."""
201
202
    cell = cellpar_to_cell(*crystal.cell_lengths, *crystal.cell_angles)
203
204
    # asymmetric unit fractional coordinates
205
    asym_frac_motif = np.array([tuple(a.fractional_coordinates)
206
                                for a in crystal.asymmetric_unit_molecule.atoms])
207
    asym_frac_motif = np.mod(asym_frac_motif, 1)
208
209
    # if the above removed everything, skip this structure
210
    if asym_frac_motif.shape[0] == 0:
211
        raise ValueError(f'{crystal.identifier} has no coordinates')
212
213
    sitesym = crystal.symmetry_operators
214
    if not sitesym:
215
        sitesym = ('x,y,z', )
216
    r = _Reader()
217
    r.current_identifier = crystal.identifier
218
    frac_motif, asym_unit, multiplicities, _ = r.expand(asym_frac_motif, sitesym)
219
    motif = frac_motif @ cell
220
221
    kwargs = {
222
        'name': crystal.identifier,
223
        'asymmetric_unit': asym_unit,
224
        'wyckoff_multiplicities': multiplicities,
225
    }
226
227
    periodic_set = PeriodicSet(motif, cell, **kwargs)
228
    return periodic_set
229
230
231
def cifblock_to_periodicset(block):
232
    """ase.io.cif.CIFBlock --> amd.periodicset.PeriodicSet.
233
    Ignores disorder, missing sites/coords, checks & no options.
234
    Is a stripped-down version of the function used in CifReader."""
235
236
    cell = block.get_cell().array
237
    asym_frac_motif = [block.get(name) for name in _Reader.atom_site_fract_tags]
238
239
    if None in asym_frac_motif:
240
        asym_motif = [block.get(name) for name in _Reader.atom_site_cartn_tags]
241
        if None in asym_motif:
242
            warnings.warn(
243
                f'Skipping {block.name} as coordinates were not found')
244
            return None
245
246
        asym_frac_motif = np.array(asym_motif) @ np.linalg.inv(cell)
247
248
    asym_frac_motif = np.mod(np.array(asym_frac_motif).T, 1)
249
250
    if asym_frac_motif.shape[0] == 0:
251
        raise ValueError(f'{block.name} has no coordinates')
252
253
    sitesym = ('x,y,z', )
254
    for tag in _Reader.symop_tags:
255
        if tag in block:
256
            sitesym = block[tag]
257
            break
258
259
    if isinstance(sitesym, str):
260
        sitesym = [sitesym]
261
262
    dummy_reader = _Reader()
263
    dummy_reader.current_identifier = block.name
264
    frac_motif, asym_unit, multiplicities, _ = dummy_reader.expand(asym_frac_motif, sitesym)
265
    motif = frac_motif @ cell
266
267
    kwargs = {
268
        'name': block.name,
269
        'asymmetric_unit': asym_unit,
270
        'wyckoff_multiplicities': multiplicities
271
    }
272
273
    periodic_set = PeriodicSet(motif, cell, **kwargs)
274
    return periodic_set
275