Completed
Pull Request — master (#868)
by
unknown
56s
created

readlink()   B

Complexity

Conditions 5

Size

Total Lines 28

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 5
c 1
b 0
f 0
dl 0
loc 28
rs 8.0894
1
# Functions for identifying and following symlinks in Windows
2
# from user `eryksun` http://stackoverflow.com/a/27979160
3
# 16 Jan 2015
4
# See http://stackoverflow.com/a/15259028 for more information
5
6
import os
7
from shutil import copystat, copy2
8
9
from ctypes import *
10
from ctypes.wintypes import *
11
12
# ---------------------------------------------------------------------------
# Win32 constants
# ---------------------------------------------------------------------------

# Failure sentinel compared against the GetFileAttributesW result in islink().
INVALID_FILE_ATTRIBUTES = 0xFFFFFFFF
# Attribute bit tested by islink().
FILE_ATTRIBUTE_REPARSE_POINT = 0x00400

# Failure sentinel compared against the CreateFileW result in readlink().
INVALID_HANDLE_VALUE = HANDLE(-1).value
# dwCreationDisposition passed to CreateFileW by readlink().
OPEN_EXISTING = 3
# dwFlagsAndAttributes bits OR-ed together by readlink().
FILE_FLAG_BACKUP_SEMANTICS = 0x02000000
FILE_FLAG_OPEN_REPARSE_POINT = 0x00200000

# dwIoControlCode passed to DeviceIoControl by readlink().
FSCTL_GET_REPARSE_POINT = 0x000900A8
# ReparseTag values recognised by readlink().
IO_REPARSE_TAG_MOUNT_POINT = 0xA0000003
IO_REPARSE_TAG_SYMLINK = 0xA000000C
# Size in bytes of the output buffer readlink() hands to DeviceIoControl.
MAXIMUM_REPARSE_DATA_BUFFER_SIZE = 0x4000

# ---------------------------------------------------------------------------
# ctypes helper types
# ---------------------------------------------------------------------------

LPDWORD = POINTER(DWORD)
UCHAR = c_ubyte

# ---------------------------------------------------------------------------
# kernel32 function prototypes
# ---------------------------------------------------------------------------

kernel32 = WinDLL('kernel32')

GetFileAttributesW = kernel32.GetFileAttributesW
GetFileAttributesW.restype = DWORD
GetFileAttributesW.argtypes = (
    LPCWSTR,  # lpFileName (In)
)

CreateFileW = kernel32.CreateFileW
CreateFileW.restype = HANDLE
CreateFileW.argtypes = (
    LPCWSTR,  # lpFileName (In)
    DWORD,    # dwDesiredAccess (In)
    DWORD,    # dwShareMode (In)
    LPVOID,   # lpSecurityAttributes (In_opt)
    DWORD,    # dwCreationDisposition (In)
    DWORD,    # dwFlagsAndAttributes (In)
    HANDLE,   # hTemplateFile (In_opt)
)

CloseHandle = kernel32.CloseHandle
CloseHandle.restype = BOOL
CloseHandle.argtypes = (
    HANDLE,  # hObject (In)
)

DeviceIoControl = kernel32.DeviceIoControl
DeviceIoControl.restype = BOOL
DeviceIoControl.argtypes = (
    HANDLE,   # hDevice (In)
    DWORD,    # dwIoControlCode (In)
    LPVOID,   # lpInBuffer (In_opt)
    DWORD,    # nInBufferSize (In)
    LPVOID,   # lpOutBuffer (Out_opt)
    DWORD,    # nOutBufferSize (In)
    LPDWORD,  # lpBytesReturned (Out_opt)
    LPVOID,   # lpOverlapped (Inout_opt)
)
57
58
class GENERIC_REPARSE_BUFFER(Structure):
    """Generic reparse payload: a one-element ``UCHAR`` array placeholder."""

    _fields_ = (
        ('DataBuffer', UCHAR * 1),
    )
60
61
62
class SYMBOLIC_LINK_REPARSE_BUFFER(Structure):
    """Symbolic-link layout of the reparse buffer.

    The name offsets and lengths are byte counts; offsets are applied
    relative to the start of ``PathBuffer``.
    """

    _fields_ = (
        ('SubstituteNameOffset', USHORT),
        ('SubstituteNameLength', USHORT),
        ('PrintNameOffset', USHORT),
        ('PrintNameLength', USHORT),
        ('Flags', ULONG),
        ('PathBuffer', WCHAR * 1),
    )

    @property
    def PrintName(self):
        """Return the print name decoded from the memory behind ``PathBuffer``."""
        # PrintNameLength is halved to obtain the number of WCHAR elements.
        char_count = self.PrintNameLength // 2
        # The text lives PrintNameOffset bytes past the PathBuffer field.
        start = (addressof(self)
                 + type(self).PathBuffer.offset
                 + self.PrintNameOffset)
        return (WCHAR * char_count).from_address(start).value
74
75
76
class MOUNT_POINT_REPARSE_BUFFER(Structure):
    """Mount-point layout of the reparse buffer.

    The name offsets and lengths are byte counts; offsets are applied
    relative to the start of ``PathBuffer``.
    """

    _fields_ = (
        ('SubstituteNameOffset', USHORT),
        ('SubstituteNameLength', USHORT),
        ('PrintNameOffset', USHORT),
        ('PrintNameLength', USHORT),
        ('PathBuffer', WCHAR * 1),
    )

    @property
    def PrintName(self):
        """Return the print name decoded from the memory behind ``PathBuffer``."""
        # PrintNameLength is halved to obtain the number of WCHAR elements.
        char_count = self.PrintNameLength // 2
        # The text lives PrintNameOffset bytes past the PathBuffer field.
        start = (addressof(self)
                 + type(self).PathBuffer.offset
                 + self.PrintNameOffset)
        return (WCHAR * char_count).from_address(start).value
87
88
89
class REPARSE_DATA_BUFFER(Structure):
    """Header plus tag-specific payload of a reparse point.

    The payload union is anonymous, so its members
    (``SymbolicLinkReparseBuffer``, ``MountPointReparseBuffer``,
    ``GenericReparseBuffer``) are reachable directly on an instance.
    """

    class REPARSE_BUFFER(Union):
        """Overlay of the payload layouts defined in this module."""

        _fields_ = (
            ('SymbolicLinkReparseBuffer', SYMBOLIC_LINK_REPARSE_BUFFER),
            ('MountPointReparseBuffer', MOUNT_POINT_REPARSE_BUFFER),
            ('GenericReparseBuffer', GENERIC_REPARSE_BUFFER),
        )

    # Expose the union members as attributes of the outer structure.
    _anonymous_ = ('ReparseBuffer',)
    _fields_ = (
        ('ReparseTag', ULONG),
        ('ReparseDataLength', USHORT),
        ('Reserved', USHORT),
        ('ReparseBuffer', REPARSE_BUFFER),
    )
102
103
104
def islink(path):
    """Report whether *path* carries the reparse-point file attribute.

    Raises the exception built by ``WinError()`` when
    ``GetFileAttributesW`` returns ``INVALID_FILE_ATTRIBUTES``.
    """
    attributes = GetFileAttributesW(path)
    if attributes == INVALID_FILE_ATTRIBUTES:
        raise WinError()
    return (attributes & FILE_ATTRIBUTE_REPARSE_POINT) != 0
109
110
def readlink(path):
    """Return the print name stored in the reparse point at *path*.

    The reparse point itself is opened (``FILE_FLAG_OPEN_REPARSE_POINT``)
    and its data is fetched with ``FSCTL_GET_REPARSE_POINT``.

    Raises:
        OSError: built by ``WinError()`` when ``CreateFileW`` or
            ``DeviceIoControl`` fails.
        ValueError: when the reparse tag is neither a symlink nor a
            mount point.
    """
    handle = CreateFileW(path,
                         0,
                         0,
                         None,
                         OPEN_EXISTING,
                         FILE_FLAG_OPEN_REPARSE_POINT |
                         FILE_FLAG_BACKUP_SEMANTICS,
                         None)
    if handle == INVALID_HANDLE_VALUE:
        raise WinError()
    try:
        target_buffer = c_buffer(MAXIMUM_REPARSE_DATA_BUFFER_SIZE)
        n_bytes_returned = DWORD()
        io_ok = DeviceIoControl(handle,
                                FSCTL_GET_REPARSE_POINT,
                                None, 0,
                                target_buffer, len(target_buffer),
                                byref(n_bytes_returned),
                                None)
        if not io_ok:
            # Build the exception here, before CloseHandle runs in the
            # finally clause, so the last-error value still belongs to the
            # failed DeviceIoControl call.
            raise WinError()
    finally:
        # Always release the handle, including when an exception is raised.
        CloseHandle(handle)
    rdb = REPARSE_DATA_BUFFER.from_buffer(target_buffer)
    if rdb.ReparseTag == IO_REPARSE_TAG_SYMLINK:
        return rdb.SymbolicLinkReparseBuffer.PrintName
    if rdb.ReparseTag == IO_REPARSE_TAG_MOUNT_POINT:
        return rdb.MountPointReparseBuffer.PrintName
    raise ValueError("not a link")
138
139
# symlink function from Stackoverflow user Gian Marco Gherardi, 23 Feb 2013
140
# http://stackoverflow.com/a/15043806
141
def symlink(source, link_name):
    """Create a symbolic link at *link_name* that points to *source*.

    The flags argument is 1 when ``os.path.isdir(source)`` is true and 0
    otherwise. Raises the exception built by ``WinError()`` when
    ``CreateSymbolicLinkW`` returns 0.
    """
    create_symbolic_link = kernel32.CreateSymbolicLinkW
    create_symbolic_link.restype = c_ubyte
    create_symbolic_link.argtypes = (c_wchar_p, c_wchar_p, c_uint32)

    # 1 marks a directory target, 0 a file target.
    directory_flag = int(os.path.isdir(source))

    # Note the argument order: the link path comes first, then its target.
    if not create_symbolic_link(link_name, source, directory_flag):
        raise WinError()
150
151
# Reimplement copytree, but using the Windows-specific symlink functions
152
def copytree(src, dst, symlinks=False):
    """Recursively copy the directory tree at *src* into a new directory *dst*.

    *dst* is created with ``os.makedirs`` and therefore must not already
    exist. When *symlinks* is true, entries for which ``islink()`` is true
    are recreated with ``symlink()`` using the target from ``readlink()``;
    otherwise directories are copied recursively and everything else is
    copied with ``copy2``.

    Copying continues past per-entry failures. They are collected as
    ``(srcname, dstname, message)`` triples and reported together.

    Raises:
        shutil.Error: if any entry failed; ``args[0]`` is the list of
            triples.
    """
    # Imported locally: the module-level import only brings in copystat and
    # copy2, so Error would otherwise be an undefined name.
    from shutil import Error

    names = os.listdir(src)
    os.makedirs(dst)
    errors = []
    for name in names:
        srcname = os.path.join(src, name)
        dstname = os.path.join(dst, name)
        try:
            if symlinks and islink(srcname):
                linkto = readlink(srcname)
                symlink(linkto, dstname)
            elif os.path.isdir(srcname):
                copytree(srcname, dstname, symlinks)
            else:
                copy2(srcname, dstname)
        # Catch the Error from the recursive copytree first so its triples
        # are merged and we can continue with other files. On Python 3,
        # shutil.Error subclasses OSError, so this clause must precede the
        # OSError one.
        except Error as err:
            errors.extend(err.args[0])
        except OSError as why:
            errors.append((srcname, dstname, str(why)))
    try:
        copystat(src, dst)
    except OSError as why:
        # Can't copy file access times on Windows: ignore errors that carry
        # a winerror code. getattr covers OSError instances without it.
        if getattr(why, 'winerror', None) is None:
            # append, not extend: keep the triple as one entry.
            errors.append((src, dst, str(why)))
    if errors:
        raise Error(errors)
181