Completed
Push — master ( 8229ad...0db8a8 )
by Jan
17s queued 13s
created

org_fedora_oscap.cpioarchive.version()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 3
rs 10
c 0
b 0
f 0
cc 1
nop 0
1
from __future__ import absolute_import
2
3
import atexit
4
""" cpioarchive: Support for cpio archives
Copyright (C) 2006 Ignacio Vazquez-Abrams """
7
""" This library is free software; you can redistribute it and/or modify it under the terms of the
GNU Lesser General Public License as published by the Free Software Foundation;
either version 2.1 of the License, or (at your option) any later version.

This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public License along with this library;
if not, write to the Free Software Foundation, Inc.,
59 Temple Place, Suite 330,
Boston, MA 02111-1307 USA
"""
21
22
23
def version():
    """Return the version number of the module as a string."""
    return '0.1'
26
27
28
class CpioError(Exception):
    """Exception class for cpioarchive errors.

    Raised by this module for truncated or invalid entry headers, for an
    archive that ends before its trailer, and when an archive is opened
    without a file name or file object.
    """
31
32
33
class CpioEntry(object):
    """Information about a single file in a cpio archive.

    Provides a file-like interface (read/seek/tell/flush/close) for
    interacting with the entry. Data is not copied at construction time;
    every read goes through the owning archive's ``file`` object.
    """

    # Size in bytes of the fixed part of an entry header: a 6-byte magic
    # followed by thirteen 8-digit hexadecimal fields.
    _FIXED_HEADER_SIZE = 110

    def __init__(self, hdr, cpio, offset):
        """Create a new CpioEntry instance. Internal use only.

        Args:
            hdr -- Header bytes: the 110-byte fixed part followed by the
                   file name field
            cpio -- Owning archive; its ``file`` attribute is used by read()
            offset -- Position in the archive file at which ``hdr`` starts

        Raises:
            CpioError -- if the header is too short or does not start with
                         the ``070701`` magic
        """
        fixed = self._FIXED_HEADER_SIZE
        if len(hdr) < fixed:
            raise CpioError('cpio header too short')
        if not hdr.startswith(b'070701'):
            raise CpioError('cpio header invalid')

        # Thirteen consecutive 8-character hex fields follow the magic.
        fields = [int(hdr[pos:pos + 8], 16) for pos in range(6, fixed, 8)]
        (self.inode, self.mode, self.uid, self.gid, self.nlinks, self.mtime,
         self.size, self.devmajor, self.devminor, self.rdevmajor,
         self.rdevminor, namesize, self.checksum) = fields
        # self.size: size of the file stored in the entry.

        if len(hdr) < fixed + namesize:
            raise CpioError('cpio header too short')
        # self.name: name of the file stored in the entry. The last byte of
        # the name field (the terminator) is not part of the name.
        self.name = hdr[fixed:fixed + namesize - 1].decode("utf-8")

        # Data starts after the header and name, padded to a multiple of 4.
        # The padding is computed relative to the start of this entry, which
        # is how the archive reader skips it, so both agree even when the
        # archive itself does not start at a 4-byte aligned file position.
        header_len = fixed + namesize
        header_len += (4 - header_len % 4) % 4
        self.datastart = offset + header_len

        self.curoffset = 0
        self.cpio = cpio
        self.closed = False

    def close(self):
        """Close this cpio entry. Further calls to methods will raise an exception."""
        self.closed = True

    def flush(self):
        """Flush the entry (no-op)."""
        pass

    def read(self, size=None):
        """Read data from the entry.

        Args:
            size -- Number of bytes to read. None (the default) or a
                    negative value reads up to the end of the entry; 0
                    returns an empty result.

        Returns:
            The data read from the archive's file object; never extends
            past the end of this entry.

        Raises:
            ValueError -- if the entry has been closed
        """
        if self.closed:
            raise ValueError('read operation on closed file')
        remaining = self.size - self.curoffset
        # Never hand a negative or oversized count to the underlying file:
        # that would read past this entry into the rest of the archive.
        if size is None or size < 0 or size > remaining:
            size = remaining
        self.cpio.file.seek(self.datastart + self.curoffset, 0)
        ret = self.cpio.file.read(size)
        self.curoffset += len(ret)
        return ret

    def seek(self, offset, whence=0):
        """Move to new position within an entry.

        The resulting position is clamped to the range [0, size].

        Args:
            offset -- Byte count
            whence -- Describes how offset is used.
                0: From beginning of file
                1: Forwards from current position
                2: Backwards from current position (note: this is not the
                   "relative to end" meaning used by regular file objects)
                Other values are ignored.

        Raises:
            ValueError -- if the entry has been closed
        """
        if self.closed:
            raise ValueError('seek operation on closed file')
        if whence == 0:
            self.curoffset = offset
        elif whence == 1:
            self.curoffset += offset
        elif whence == 2:
            self.curoffset -= offset
        self.curoffset = min(max(0, self.curoffset), self.size)

    def tell(self):
        """Get current position within an entry.

        Raises:
            ValueError -- if the entry has been closed
        """
        if self.closed:
            raise ValueError('tell operation on closed file')
        return self.curoffset
116
117
118
class CpioArchive(object):
    """Read-only access to the entries of a cpio archive.

    All entry headers are parsed when the archive is opened; entry data is
    read on demand through the CpioEntry objects. The archive can be walked
    with next() or iterated directly, and can be used as a context manager.
    """

    @classmethod
    def open(cls, name=None, mode='r', fileobj=None):
        """Open a cpio archive. Defers to the class constructor.

        Takes the same arguments and raises the same exceptions as
        __init__().
        """
        return cls(name, mode, fileobj)

    def __init__(self, name=None, mode='r', fileobj=None):
        """Open a cpio archive.

        Args:
            name -- Filename to open (default: open a file object instead)
            mode -- Filemode to open the archive in (default: read-only)
            fileobj -- File object to use (default: open by filename instead)

        Raises:
            NotImplementedError -- if mode is anything other than 'r'
            CpioError -- if neither name nor fileobj is given, or the
                         archive headers cannot be parsed
        """
        if mode != 'r':
            raise NotImplementedError()
        self._infos = []
        if name:
            self._readfile(name)
            # The file was opened here, so close() must close it.
            self.external = False
        elif fileobj is not None:
            self._readobj(fileobj)
            # The caller owns the file object; close() leaves it open.
            self.external = True
        else:
            raise CpioError('Oh come on! Pass me something to work with...')
        self._ptr = 0
        self.closed = False
        # Make sure the archive gets closed at interpreter exit.
        atexit.register(self.close)

    def __enter__(self):
        """Return the archive itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the archive when leaving a ``with`` block."""
        self.close()

    def close(self):
        """Close the CpioArchive. Also closes all associated entries.

        Calling close() more than once is harmless. The underlying file is
        closed only when it was opened by this object (by file name).
        """
        if self.closed:
            return
        for entry in self._infos:
            entry.close()
        self.closed = True
        if not self.external:
            self.file.close()

    def __next__(self):
        # pylint: disable = E1102
        return self.next()

    def next(self):
        """Return the next entry in the archive.

        Raises:
            ValueError -- if the archive has been closed
            StopIteration -- when every entry has been returned
        """
        if self.closed:
            raise ValueError('next operation on closed file')
        # ">=": an index equal to the list length is already out of range.
        if self._ptr >= len(self._infos):
            raise StopIteration()
        ret = self._infos[self._ptr]
        self._ptr += 1
        return ret

    def __iter__(self):
        """Iterate over all entries, independently of the next() cursor."""
        return iter(self._infos)

    def _readfile(self, name):
        """Open the file called ``name`` for binary reading and parse it."""
        fileobj = open(name, 'rb')
        try:
            self._readobj(fileobj)
        except BaseException:
            # Do not leak the file handle when the archive is unreadable.
            fileobj.close()
            raise

    def _readobj(self, fileobj):
        """Parse every entry header found in ``fileobj``.

        Reading starts at the file object's current position and stops at
        the entry named ``TRAILER!!!``, which is not recorded.

        Raises:
            CpioError -- if a header is truncated or invalid, or the data
                         ends before the trailer entry is found
        """
        self.file = fileobj
        istart = self.file.tell()
        text = self.file.read(110)
        while text:
            if len(text) < 110:
                # Checked here so a truncated header is reported as such
                # instead of failing while parsing the name length below.
                raise CpioError('cpio header too short')
            namelen = int(text[94:102], 16)
            text += self.file.read(namelen)
            ce = CpioEntry(text, self, istart)
            if ce.name == "TRAILER!!!":
                return
            self._infos.append(ce)
            # Skip the padding after the header, the file data, and the
            # padding after the data; padding is to a multiple of 4 bytes
            # counted from the start of this entry.
            self.file.seek((4 - (self.file.tell() - istart) % 4) % 4, 1)
            self.file.seek(ce.size, 1)
            self.file.seek((4 - (self.file.tell() - istart) % 4) % 4, 1)

            istart = self.file.tell()
            text = self.file.read(110)
        # Ran out of data without ever seeing the trailer entry.
        raise CpioError('premature end of headers')
197