Completed
Pull Request — master (#868)
by
unknown
01:15
created

SYMBOLIC_LINK_REPARSE_BUFFER.PrintName()   A

Complexity

Conditions 1

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 5
rs 9.4285
1
# Functions for identifying and following symlinks in Windows
2
# from user `eryksun` http://stackoverflow.com/a/27979160
3
# 16 Jan 2015
4
# See http://stackoverflow.com/a/15259028 for more information
5
6
import os
from shutil import copystat, copy2, Error

from ctypes import (
    POINTER, Structure, Union, WinDLL, WinError,
    addressof, byref, c_buffer, c_ubyte, c_uint32, c_wchar_p,
)
from ctypes.wintypes import (
    BOOL, DWORD, HANDLE, LPCWSTR, LPVOID, ULONG, USHORT, WCHAR,
)
16
17
# Handle to kernel32.dll; the Win32 entry points used in this module are
# looked up on it.
kernel32 = WinDLL('kernel32')
# Pointer-to-DWORD type, used for DeviceIoControl's lpBytesReturned argument.
LPDWORD = POINTER(DWORD)
# Unsigned byte alias used by GENERIC_REPARSE_BUFFER.
UCHAR = c_ubyte
20
21
# Prototype for kernel32!GetFileAttributesW, called by islink().
GetFileAttributesW = kernel32.GetFileAttributesW
GetFileAttributesW.restype = DWORD
GetFileAttributesW.argtypes = (LPCWSTR,)  # lpFileName In

# Result value that islink() treats as "the call failed".
INVALID_FILE_ATTRIBUTES = 0xFFFFFFFF
# Attribute bit islink() tests to decide whether a path is a link.
FILE_ATTRIBUTE_REPARSE_POINT = 0x00400
27
28
# Prototype for kernel32!CreateFileW; readlink() uses it to open the
# reparse point itself.
CreateFileW = kernel32.CreateFileW
CreateFileW.restype = HANDLE
CreateFileW.argtypes = (LPCWSTR,  # lpFileName In
                        DWORD,    # dwDesiredAccess In
                        DWORD,    # dwShareMode In
                        LPVOID,   # lpSecurityAttributes In_opt
                        DWORD,    # dwCreationDisposition In
                        DWORD,    # dwFlagsAndAttributes In
                        HANDLE)   # hTemplateFile In_opt
37
38
# Prototype for kernel32!CloseHandle; releases the handle opened by
# readlink().
CloseHandle = kernel32.CloseHandle
CloseHandle.restype = BOOL
CloseHandle.argtypes = (HANDLE,)  # hObject In

# Value readlink() compares CreateFileW's result against to detect failure.
INVALID_HANDLE_VALUE = HANDLE(-1).value
# dwCreationDisposition passed to CreateFileW by readlink().
OPEN_EXISTING = 3
# dwFlagsAndAttributes bits that readlink() ORs together for CreateFileW.
FILE_FLAG_BACKUP_SEMANTICS = 0x02000000
FILE_FLAG_OPEN_REPARSE_POINT = 0x00200000
46
47
# Prototype for kernel32!DeviceIoControl; readlink() uses it to fetch the
# reparse data of an opened link.
DeviceIoControl = kernel32.DeviceIoControl
DeviceIoControl.restype = BOOL
DeviceIoControl.argtypes = (HANDLE,   # hDevice In
                            DWORD,    # dwIoControlCode In
                            LPVOID,   # lpInBuffer In_opt
                            DWORD,    # nInBufferSize In
                            LPVOID,   # lpOutBuffer Out_opt
                            DWORD,    # nOutBufferSize In
                            LPDWORD,  # lpBytesReturned Out_opt
                            LPVOID)   # lpOverlapped Inout_opt

# Control code readlink() passes as dwIoControlCode.
FSCTL_GET_REPARSE_POINT = 0x000900A8
# ReparseTag values that readlink() knows how to decode.
IO_REPARSE_TAG_MOUNT_POINT = 0xA0000003
IO_REPARSE_TAG_SYMLINK = 0xA000000C
# Size in bytes of the output buffer readlink() allocates.
MAXIMUM_REPARSE_DATA_BUFFER_SIZE = 0x4000
62
63
64
class GENERIC_REPARSE_BUFFER(Structure):
    """Untyped view of the reparse payload: a one-element byte array."""

    _fields_ = (('DataBuffer', UCHAR * 1),)
66
67
68
class SYMBOLIC_LINK_REPARSE_BUFFER(Structure):
    """Reparse payload layout used when the tag is IO_REPARSE_TAG_SYMLINK.

    PathBuffer is declared with a single element; the actual name data
    extends past the end of the structure, so instances must be mapped
    onto a buffer that is large enough to hold it.
    """

    _fields_ = (('SubstituteNameOffset', USHORT),
                ('SubstituteNameLength', USHORT),
                ('PrintNameOffset', USHORT),
                ('PrintNameLength', USHORT),
                ('Flags', ULONG),
                ('PathBuffer', WCHAR * 1))

    @property
    def PrintName(self):
        """Return the print name stored in PathBuffer as a string.

        PrintNameLength is treated as a byte count (two bytes per
        character) and PrintNameOffset as a byte offset from the start
        of PathBuffer.
        """
        arrayt = WCHAR * (self.PrintNameLength // 2)
        offset = type(self).PathBuffer.offset + self.PrintNameOffset
        # Read straight from memory: the name lives beyond the declared
        # one-element PathBuffer field.
        return arrayt.from_address(addressof(self) + offset).value
81
82
83
class MOUNT_POINT_REPARSE_BUFFER(Structure):
    """Reparse payload layout used when the tag is IO_REPARSE_TAG_MOUNT_POINT.

    Same fields as the symbolic-link layout minus Flags.  PathBuffer is
    declared with a single element; the actual name data extends past the
    end of the structure, so instances must be mapped onto a buffer that
    is large enough to hold it.
    """

    _fields_ = (('SubstituteNameOffset', USHORT),
                ('SubstituteNameLength', USHORT),
                ('PrintNameOffset', USHORT),
                ('PrintNameLength', USHORT),
                ('PathBuffer', WCHAR * 1))

    @property
    def PrintName(self):
        """Return the print name stored in PathBuffer as a string.

        PrintNameLength is treated as a byte count (two bytes per
        character) and PrintNameOffset as a byte offset from the start
        of PathBuffer.
        """
        arrayt = WCHAR * (self.PrintNameLength // 2)
        offset = type(self).PathBuffer.offset + self.PrintNameOffset
        # Read straight from memory: the name lives beyond the declared
        # one-element PathBuffer field.
        return arrayt.from_address(addressof(self) + offset).value
95
96
97
class REPARSE_DATA_BUFFER(Structure):
    """Header plus tag-specific payload of the data readlink() retrieves.

    The payload is a union of the three buffer layouts; because the union
    is listed in ``_anonymous_``, its members are reachable directly on
    the instance (e.g. ``rdb.SymbolicLinkReparseBuffer``).
    """

    class REPARSE_BUFFER(Union):
        # Which member is meaningful is decided by the caller from
        # ReparseTag (see readlink()).
        _fields_ = (('SymbolicLinkReparseBuffer',
                     SYMBOLIC_LINK_REPARSE_BUFFER),
                    ('MountPointReparseBuffer',
                     MOUNT_POINT_REPARSE_BUFFER),
                    ('GenericReparseBuffer',
                     GENERIC_REPARSE_BUFFER))

    _fields_ = (('ReparseTag', ULONG),
                ('ReparseDataLength', USHORT),
                ('Reserved', USHORT),
                ('ReparseBuffer', REPARSE_BUFFER))
    _anonymous_ = ('ReparseBuffer',)
110
111
112
def islink(path):
    """Return True if *path* has the reparse-point attribute set.

    Only FILE_ATTRIBUTE_REPARSE_POINT is tested, so any kind of reparse
    point counts as a link here, not just the tags readlink() can decode.

    Raises:
        WinError: if GetFileAttributesW reports failure for *path*.
    """
    attributes = GetFileAttributesW(path)
    if attributes == INVALID_FILE_ATTRIBUTES:
        raise WinError()
    return bool(attributes & FILE_ATTRIBUTE_REPARSE_POINT)
117
118
119
def readlink(path):
    """Return the print name (target) of the link at *path*.

    Opens the reparse point itself, fetches its reparse data with
    FSCTL_GET_REPARSE_POINT and decodes it according to the reparse tag.

    Raises:
        WinError: if the path cannot be opened or the reparse data cannot
            be retrieved.
        ValueError: if the reparse tag is neither a symlink nor a mount
            point.
    """
    reparse_point_handle = CreateFileW(path,
                                       0,
                                       0,
                                       None,
                                       OPEN_EXISTING,
                                       FILE_FLAG_OPEN_REPARSE_POINT |
                                       FILE_FLAG_BACKUP_SEMANTICS,
                                       None)
    if reparse_point_handle == INVALID_HANDLE_VALUE:
        raise WinError()
    try:
        target_buffer = c_buffer(MAXIMUM_REPARSE_DATA_BUFFER_SIZE)
        n_bytes_returned = DWORD()
        io_result = DeviceIoControl(reparse_point_handle,
                                    FSCTL_GET_REPARSE_POINT,
                                    None, 0,
                                    target_buffer, len(target_buffer),
                                    byref(n_bytes_returned),
                                    None)
        if not io_result:
            # Build the error while the failing call's last-error value is
            # still current, i.e. before CloseHandle runs in the finally.
            raise WinError()
    finally:
        # Always release the handle, including when an exception is raised.
        CloseHandle(reparse_point_handle)
    rdb = REPARSE_DATA_BUFFER.from_buffer(target_buffer)
    if rdb.ReparseTag == IO_REPARSE_TAG_SYMLINK:
        return rdb.SymbolicLinkReparseBuffer.PrintName
    elif rdb.ReparseTag == IO_REPARSE_TAG_MOUNT_POINT:
        return rdb.MountPointReparseBuffer.PrintName
    raise ValueError("not a link")
147
148
149
# symlink function from Stackoverflow user Gian Marco Gherardi, 23 Feb 2013
150
# http://stackoverflow.com/a/15043806
151
def symlink(source, link_name):
    """Create a symbolic link at *link_name* pointing to *source*.

    The directory flag is set when *source* names an existing directory.
    A relative *source* is checked relative to the directory that will
    contain the link, not relative to the current working directory.

    Raises:
        WinError: if CreateSymbolicLinkW reports failure.
    """
    csl = kernel32.CreateSymbolicLinkW
    csl.argtypes = (c_wchar_p, c_wchar_p, c_uint32)
    csl.restype = c_ubyte

    # os.path.join returns `source` unchanged when it is absolute, so this
    # only affects relative targets.
    resolved_target = os.path.join(os.path.dirname(link_name), source)
    flags = 1 if os.path.isdir(resolved_target) else 0

    if csl(link_name, source, flags) == 0:
        raise WinError()
160
161
162
# Reimplement copytree, but using the Windows-specific symlink functions
163
def copytree(src, dst, symlinks=False):
    """Recursively copy the directory tree at *src* to a new directory *dst*.

    Args:
        src: existing directory to copy.
        dst: destination directory; it is created with os.makedirs.
        symlinks: when true, entries for which islink() is true are
            recreated as links (via readlink()/symlink()) instead of
            being copied.

    Raises:
        Error: after the whole tree has been processed, if any entry
            failed.  ``args[0]`` is a list of ``(srcname, dstname, reason)``
            tuples, including failures from nested directories.
    """
    names = os.listdir(src)
    os.makedirs(dst)
    errors = []
    for name in names:
        srcname = os.path.join(src, name)
        dstname = os.path.join(dst, name)
        try:
            if symlinks and islink(srcname):
                linkto = readlink(srcname)
                symlink(linkto, dstname)
            elif os.path.isdir(srcname):
                copytree(srcname, dstname, symlinks)
            else:
                copy2(srcname, dstname)
        # Catch the Error from the recursive copytree so that we can
        # continue with other files.  This clause must come before the
        # OSError one: on Python 3 shutil.Error is a subclass of OSError.
        except Error as err:
            errors.extend(err.args[0])
        except OSError as why:
            errors.append((srcname, dstname, str(why)))
    try:
        copystat(src, dst)
    except OSError as why:
        # Can't copy file access times on Windows, so errors carrying a
        # winerror code are ignored.  The attribute is not present on
        # every platform/exception, hence getattr.
        if getattr(why, 'winerror', None) is None:
            errors.append((src, dst, str(why)))
    if errors:
        raise Error(errors)
192