Passed
Pull Request — master (#140)
by Matěj
02:17
created

org_fedora_oscap.cpioarchive.CpioEntry.seek()   A

Complexity

Conditions 5

Size

Total Lines 20
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 10
dl 0
loc 20
rs 9.3333
c 0
b 0
f 0
cc 5
nop 3
1
from __future__ import absolute_import
2
3
""" cpioarchive: Support for cpio archives
4
Copyright (C) 2006 Ignacio Vazquez-Abrams """
5
6
""" This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
7
8
This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
9
10
You should have received a copy of the GNU Lesser General Public License along with this library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA """
11
12
import atexit
13
14
def version():
15
  """Returns the version number of the module."""
16
  return '0.1'
17
18
class CpioError(Exception):
19
  """Exception class for cpioarchive exceptions"""
20
  pass
21
22
class CpioEntry(object):
23
  """Information about a single file in a cpio archive. Provides a file-like
24
interface for interacting with the entry."""
25
  def __init__(self, hdr, cpio, offset):
26
    """Create a new CpioEntry instance. Internal use only."""
27
    if len(hdr)<110:
28
      raise CpioError('cpio header too short')
29
    if not hdr.startswith(b'070701'):
30
      raise CpioError('cpio header invalid')
31
    self.inode=int(hdr[6:14], 16)
32
    self.mode=int(hdr[14:22], 16)
33
    self.uid=int(hdr[22:30], 16)
34
    self.gid=int(hdr[30:38], 16)
35
    self.nlinks=int(hdr[38:46], 16)
36
    self.mtime=int(hdr[46:54], 16)
37
    self.size=int(hdr[54:62], 16)
38
    """Size of the file stored in the entry."""
39
    self.devmajor=int(hdr[62:70], 16)
40
    self.devminor=int(hdr[70:78], 16)
41
    self.rdevmajor=int(hdr[78:86], 16)
42
    self.rdevminor=int(hdr[86:94], 16)
43
    namesize=int(hdr[94:102], 16)
44
    self.checksum=int(hdr[102:110], 16)
45
    if len(hdr)<110+namesize:
46
      raise CpioError('cpio header too short')
47
    self.name=hdr[110:110+namesize-1].decode("utf-8")
48
    """Name of the file stored in the entry."""
49
    self.datastart=offset+110+namesize
50
    self.datastart+=(4-(self.datastart%4))%4
51
    self.curoffset=0
52
    self.cpio=cpio
53
    self.closed=False
54
55
  def close(self):
56
    """Close this cpio entry. Further calls to methods will raise an
57
exception."""
58
    self.closed=True
59
60
  def flush(self):
61
    """Flush the entry (no-op)."""
62
    pass
63
64
  def read(self, size=None):
65
    """Read data from the entry.
66
67
Keyword arguments:
68
size -- Number of bytes to read (default: whole entry)
69
"""
70
    if self.closed:
71
      raise ValueError('read operation on closed file')
72
    self.cpio.file.seek(self.datastart+self.curoffset, 0)
73
    if size and size<self.size-self.curoffset:
74
      ret=self.cpio.file.read(size)
75
    else:
76
      ret=self.cpio.file.read(self.size-self.curoffset)
77
    self.curoffset+=len(ret)
78
    return ret
79
80
  def seek(self, offset, whence=0):
81
    """Move to new position within an entry.
82
83
Keyword arguments:
84
offset -- Byte count
85
whence -- Describes how offset is used.
86
  0: From beginning of file
87
  1: Forwards from current position
88
  2: Backwards from current position
89
  Other values are ignored.
90
"""
91
    if self.closed:
92
      raise ValueError('seek operation on closed file')
93
    if whence==0:
94
      self.curoffset=offset
95
    elif whence==1:
96
      self.curoffset+=offset
97
    elif whence==2:
98
      self.curoffset-=offset
99
    self.curoffset=min(max(0, self.curoffset), self.size)
100
101
  def tell(self):
102
    """Get current position within an entry"""
103
    if self.closed:
104
      raise ValueError('tell operation on closed file')
105
    return self.curoffset
106
107
class CpioArchive(object):
108
  @classmethod
109
  def open(name=None, mode='r', fileobj=None):
110
    """Open a cpio archive. Defers to CpioArchive.__init__()."""
111
    return CpioArchive(name, mode, fileobj)
112
113
  def __init__(self, name=None, mode='r', fileobj=None):
114
    """Open a cpio archive.
115
116
Keyword arguments:
117
name -- Filename to open (default: open a file object instead)
118
mode -- Filemode to open the archive in (default: read-only)
119
fileobj -- File object to use (default: open by filename instead)
120
"""
121
    if not mode=='r':
122
      raise NotImplementedError()
123
    self._infos=[]
124
    if name:
125
      self._readfile(name)
126
      self.external=False
127
    elif fileobj:
128
      self._readobj(fileobj)
129
      self.external=True
130
    else:
131
      raise CpioError('Oh come on! Pass me something to work with...')
132
    self._ptr=0
133
    self.closed=False
134
    atexit.register(self.close)
135
136
  def close(self):
137
    """Close the CpioArchive. Also closes all associated entries."""
138
    if self.closed:
139
      return
140
    [x.close() for x in self._infos]
141
    self.closed=True
142
    if not self.external:
143
      self.file.close()
144
145
  def __next__(self):
146
    return self.next()
147
148
  def next(self):
149
    """Return the next entry in the archive."""
150
    if self.closed:
151
      raise ValueError('next operation on closed file')
152
    if self._ptr>len(self._infos):
153
      raise StopIteration()
154
    ret=self._infos[self._ptr]
155
    self._ptr+=1
156
    return ret
157
158
  def __iter__(self):
159
    return iter(self._infos)
160
161
  def _readfile(self, name):
162
    self._readobj(open(name, 'rb'))
163
164
  def _readobj(self, fileobj):
165
    self.file=fileobj
166
    start=self.file.tell()
167
    istart=self.file.tell()
168
    text=self.file.read(110)
169
    while text:
170
      namelen=int(text[94:102], 16)
171
      text+=self.file.read(namelen)
172
      ce=CpioEntry(text, self, istart)
173
      if not ce.name=="TRAILER!!!":
174
        self._infos.append(ce)
175
      else:
176
        return
177
      self.file.seek((4-(self.file.tell()-istart)%4)%4, 1)
178
      self.file.seek(self._infos[-1].size, 1)
179
      self.file.seek((4-(self.file.tell()-istart)%4)%4, 1)
180
181
      istart=self.file.tell()
182
      text=self.file.read(110)
183
    else:
184
      raise CpioError('premature end of headers')
185