Passed
Pull Request — master (#140)
by Matěj
01:23
created

CpioArchive.__next__()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
from __future__ import absolute_import
2
3
import atexit
4
5
""" cpioarchive: Support for cpio archives
6
Copyright (C) 2006 Ignacio Vazquez-Abrams """
7
8
""" This library is free software; you can redistribute it and/or modify it under the terms of the
9
GNU Lesser General Public License as published by the Free Software Foundation;
10
either version 2.1 of the License, or (at your option) any later version.
11
12
This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13
without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
14
See the GNU Lesser General Public License for more details.
15
16
You should have received a copy of the GNU Lesser General Public License along with this library;
17
if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
"""
19
20
21
def version():
    """Return the version number of the module as a string."""
    return '0.1'
24
25
26
class CpioError(Exception):
    """Exception class for cpioarchive exceptions."""
29
30
31
class CpioEntry(object):
    """Information about a single file in a cpio archive.

    Provides a file-like interface (read/seek/tell/close) for interacting
    with the entry. Data is read lazily from the owning archive's file
    object, so the archive must stay open while the entry is in use.
    """

    def __init__(self, hdr, cpio, offset):
        """Create a new CpioEntry instance. Internal use only.

        Args:
            hdr -- Raw header bytes: the 110-byte fixed header immediately
                followed by the file name field
            cpio -- Owning archive object; its ``file`` attribute is used
                for all reads
            offset -- Position of the start of ``hdr`` within the archive file

        Raises:
            CpioError -- if ``hdr`` is shorter than the fixed header, does not
                start with the ``070701`` magic, or is shorter than the fixed
                header plus the declared name length
        """
        if len(hdr) < 110:
            raise CpioError('cpio header too short')
        if not hdr.startswith(b'070701'):
            raise CpioError('cpio header invalid')
        # Every field after the 6-byte magic is 8 hexadecimal ASCII digits.
        self.inode = int(hdr[6:14], 16)
        self.mode = int(hdr[14:22], 16)
        self.uid = int(hdr[22:30], 16)
        self.gid = int(hdr[30:38], 16)
        self.nlinks = int(hdr[38:46], 16)
        self.mtime = int(hdr[46:54], 16)
        # Size of the file stored in the entry.
        self.size = int(hdr[54:62], 16)
        self.devmajor = int(hdr[62:70], 16)
        self.devminor = int(hdr[70:78], 16)
        self.rdevmajor = int(hdr[78:86], 16)
        self.rdevminor = int(hdr[86:94], 16)
        namesize = int(hdr[94:102], 16)
        self.checksum = int(hdr[102:110], 16)
        if len(hdr) < 110 + namesize:
            raise CpioError('cpio header too short')
        # Name of the file stored in the entry. The last byte of the name
        # field (presumably the NUL terminator) is dropped before decoding.
        self.name = hdr[110:110 + namesize - 1].decode("utf-8")
        # File data begins after header + name, rounded up to a multiple of 4.
        self.datastart = offset + 110 + namesize
        self.datastart += (4 - (self.datastart % 4)) % 4
        self.curoffset = 0
        self.cpio = cpio
        self.closed = False

    def close(self):
        """Close this cpio entry. Further calls to methods will raise an exception."""
        self.closed = True

    def flush(self):
        """Flush the entry (no-op)."""
        pass

    def read(self, size=None):
        """Read data from the entry.

        Args:
            size -- Number of bytes to read. ``None`` (the default) or a
                negative value reads everything up to the end of the entry;
                ``0`` returns an empty result.

        Returns:
            The bytes read; never extends past the end of the entry.

        Raises:
            ValueError -- if the entry has been closed
        """
        if self.closed:
            raise ValueError('read operation on closed file')
        remaining = self.size - self.curoffset
        # Clamp the request to this entry. A negative size must not be handed
        # to the underlying file, where it would mean "read to the end of the
        # archive" and leak the following entries' bytes.
        if size is None or size < 0 or size > remaining:
            size = remaining
        self.cpio.file.seek(self.datastart + self.curoffset, 0)
        ret = self.cpio.file.read(size)
        self.curoffset += len(ret)
        return ret

    def seek(self, offset, whence=0):
        """Move to new position within an entry.

        The resulting position is clamped to the range [0, size].

        Args:
            offset -- Byte count
            whence -- Describes how offset is used.
                0: From beginning of file
                1: Forwards from current position
                2: Backwards from current position
                   (note: this is not the standard "relative to end of file"
                   meaning of whence=2)
                Other values are ignored.

        Raises:
            ValueError -- if the entry has been closed
        """
        if self.closed:
            raise ValueError('seek operation on closed file')
        if whence == 0:
            self.curoffset = offset
        elif whence == 1:
            self.curoffset += offset
        elif whence == 2:
            self.curoffset -= offset
        self.curoffset = min(max(0, self.curoffset), self.size)

    def tell(self):
        """Get current position within an entry.

        Raises:
            ValueError -- if the entry has been closed
        """
        if self.closed:
            raise ValueError('tell operation on closed file')
        return self.curoffset
114
115
116
class CpioArchive(object):
    """Read-only access to the entries of a cpio archive.

    All entry headers are parsed when the archive is opened; entry data is
    read on demand through the CpioEntry objects.
    """

    @classmethod
    def open(cls, name=None, mode='r', fileobj=None):
        """Open a cpio archive. Defers to CpioArchive.__init__().

        Takes the same arguments as the constructor and returns a new
        instance of the class it is called on.
        """
        return cls(name, mode, fileobj)

    def __init__(self, name=None, mode='r', fileobj=None):
        """Open a cpio archive.

        Args:
            name -- Filename to open (default: open a file object instead)
            mode -- Filemode to open the archive in (default: read-only)
            fileobj -- File object to use (default: open by filename instead)

        Raises:
            NotImplementedError -- if ``mode`` is anything other than 'r'
            CpioError -- if neither ``name`` nor ``fileobj`` is given, or the
                archive headers cannot be parsed
        """
        if mode != 'r':
            raise NotImplementedError()
        self._infos = []
        if name:
            self._readfile(name)
            # The file was opened here, so close() is responsible for it.
            self.external = False
        elif fileobj:
            self._readobj(fileobj)
            # Caller-supplied file object: close() leaves it open.
            self.external = True
        else:
            raise CpioError('Oh come on! Pass me something to work with...')
        self._ptr = 0
        self.closed = False
        # Ensure the archive is closed at interpreter exit if the caller
        # never closes it explicitly.
        atexit.register(self.close)

    def close(self):
        """Close the CpioArchive. Also closes all associated entries.

        Calling close() more than once is harmless. The underlying file is
        closed only when it was opened by this object (by filename).
        """
        if self.closed:
            return
        for entry in self._infos:
            entry.close()
        self.closed = True
        if not self.external:
            self.file.close()

    def __next__(self):
        """Iterator-protocol alias for next()."""
        return self.next()

    def next(self):
        """Return the next entry in the archive.

        Raises:
            ValueError -- if the archive has been closed
            StopIteration -- when every entry has already been returned
        """
        if self.closed:
            raise ValueError('next operation on closed file')
        # >= (not >): _ptr == len(_infos) is already one past the last entry.
        if self._ptr >= len(self._infos):
            raise StopIteration()
        ret = self._infos[self._ptr]
        self._ptr += 1
        return ret

    def __iter__(self):
        """Return a fresh iterator over all entries (independent of next())."""
        return iter(self._infos)

    def _readfile(self, name):
        """Open ``name`` for binary reading and parse its headers."""
        handle = open(name, 'rb')
        try:
            self._readobj(handle)
        except BaseException:
            # Do not leak the file handle when the archive is unreadable.
            handle.close()
            raise

    def _readobj(self, fileobj):
        """Parse every entry header from ``fileobj`` into self._infos.

        Reading starts at the file object's current position and stops at the
        entry named "TRAILER!!!", which is not stored.

        Raises:
            CpioError -- if a header is truncated or invalid, or the data ends
                before the trailer entry is found
        """
        self.file = fileobj
        istart = self.file.tell()
        text = self.file.read(110)
        while text:
            if len(text) < 110:
                # A partial fixed header cannot be parsed; report it as an
                # archive error rather than letting int() fail on a bad slice.
                raise CpioError('cpio header too short')
            namelen = int(text[94:102], 16)
            text += self.file.read(namelen)
            ce = CpioEntry(text, self, istart)
            if ce.name == "TRAILER!!!":
                return
            self._infos.append(ce)
            # Skip padding after header + name (to a multiple of 4, measured
            # from this entry's header start), then the file data, then the
            # padding after the data.
            self.file.seek((4 - (self.file.tell() - istart) % 4) % 4, 1)
            self.file.seek(ce.size, 1)
            self.file.seek((4 - (self.file.tell() - istart) % 4) % 4, 1)

            istart = self.file.tell()
            text = self.file.read(110)
        # Ran out of data without ever seeing the trailer entry.
        raise CpioError('premature end of headers')
194