Completed
Pull Request — master (#868)
by
unknown
01:04
created

readlink()   B

Complexity

Conditions 5

Size

Total Lines 28

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 5
c 1
b 0
f 0
dl 0
loc 28
rs 8.0894
1
## Functions for identifying and following symlinks in Windows
2
## from user `eryksun` http://stackoverflow.com/a/27979160
3
## 16 Jan 2015
4
## See http://stackoverflow.com/a/15259028 for more information
5
6
from ctypes import *
7
from ctypes.wintypes import *
8
9
kernel32 = WinDLL('kernel32')
10
LPDWORD = POINTER(DWORD)
11
UCHAR = c_ubyte
12
13
GetFileAttributesW = kernel32.GetFileAttributesW
14
GetFileAttributesW.restype = DWORD
15
GetFileAttributesW.argtypes = (LPCWSTR,) #lpFileName In
16
17
INVALID_FILE_ATTRIBUTES = 0xFFFFFFFF
18
FILE_ATTRIBUTE_REPARSE_POINT = 0x00400
19
20
CreateFileW = kernel32.CreateFileW
21
CreateFileW.restype = HANDLE
22
CreateFileW.argtypes = (LPCWSTR, #lpFileName In
23
                        DWORD,   #dwDesiredAccess In
24
                        DWORD,   #dwShareMode In
25
                        LPVOID,  #lpSecurityAttributes In_opt
26
                        DWORD,   #dwCreationDisposition In
27
                        DWORD,   #dwFlagsAndAttributes In
28
                        HANDLE)  #hTemplateFile In_opt
29
30
CloseHandle = kernel32.CloseHandle
31
CloseHandle.restype = BOOL
32
CloseHandle.argtypes = (HANDLE,) #hObject In
33
34
INVALID_HANDLE_VALUE = HANDLE(-1).value
35
OPEN_EXISTING = 3
36
FILE_FLAG_BACKUP_SEMANTICS = 0x02000000
37
FILE_FLAG_OPEN_REPARSE_POINT = 0x00200000
38
39
DeviceIoControl = kernel32.DeviceIoControl
40
DeviceIoControl.restype = BOOL
41
DeviceIoControl.argtypes = (HANDLE,  #hDevice In
42
                            DWORD,   #dwIoControlCode In
43
                            LPVOID,  #lpInBuffer In_opt
44
                            DWORD,   #nInBufferSize In
45
                            LPVOID,  #lpOutBuffer Out_opt
46
                            DWORD,   #nOutBufferSize In
47
                            LPDWORD, #lpBytesReturned Out_opt
48
                            LPVOID)  #lpOverlapped Inout_opt
49
50
FSCTL_GET_REPARSE_POINT = 0x000900A8
51
IO_REPARSE_TAG_MOUNT_POINT = 0xA0000003
52
IO_REPARSE_TAG_SYMLINK = 0xA000000C
53
MAXIMUM_REPARSE_DATA_BUFFER_SIZE = 0x4000
54
55
class GENERIC_REPARSE_BUFFER(Structure):
56
    _fields_ = (('DataBuffer', UCHAR * 1),)
57
58
59
class SYMBOLIC_LINK_REPARSE_BUFFER(Structure):
60
    _fields_ = (('SubstituteNameOffset', USHORT),
61
                ('SubstituteNameLength', USHORT),
62
                ('PrintNameOffset', USHORT),
63
                ('PrintNameLength', USHORT),
64
                ('Flags', ULONG),
65
                ('PathBuffer', WCHAR * 1))
66
    @property
67
    def PrintName(self):
68
        arrayt = WCHAR * (self.PrintNameLength // 2)
69
        offset = type(self).PathBuffer.offset + self.PrintNameOffset
70
        return arrayt.from_address(addressof(self) + offset).value
71
72
73
class MOUNT_POINT_REPARSE_BUFFER(Structure):
74
    _fields_ = (('SubstituteNameOffset', USHORT),
75
                ('SubstituteNameLength', USHORT),
76
                ('PrintNameOffset', USHORT),
77
                ('PrintNameLength', USHORT),
78
                ('PathBuffer', WCHAR * 1))
79
    @property
80
    def PrintName(self):
81
        arrayt = WCHAR * (self.PrintNameLength // 2)
82
        offset = type(self).PathBuffer.offset + self.PrintNameOffset
83
        return arrayt.from_address(addressof(self) + offset).value
84
85
86
class REPARSE_DATA_BUFFER(Structure):
87
    class REPARSE_BUFFER(Union):
88
        _fields_ = (('SymbolicLinkReparseBuffer',
89
                        SYMBOLIC_LINK_REPARSE_BUFFER),
90
                    ('MountPointReparseBuffer',
91
                        MOUNT_POINT_REPARSE_BUFFER),
92
                    ('GenericReparseBuffer',
93
                        GENERIC_REPARSE_BUFFER))
94
    _fields_ = (('ReparseTag', ULONG),
95
                ('ReparseDataLength', USHORT),
96
                ('Reserved', USHORT),
97
                ('ReparseBuffer', REPARSE_BUFFER))
98
    _anonymous_ = ('ReparseBuffer',)
99
100
101
def islink(path):
102
    result = GetFileAttributesW(path)
103
    if result == INVALID_FILE_ATTRIBUTES:
104
        raise WinError()
105
    return bool(result & FILE_ATTRIBUTE_REPARSE_POINT)
106
107
def readlink(path):
108
    reparse_point_handle = CreateFileW(path,
109
                                       0,
110
                                       0,
111
                                       None,
112
                                       OPEN_EXISTING,
113
                                       FILE_FLAG_OPEN_REPARSE_POINT |
114
                                       FILE_FLAG_BACKUP_SEMANTICS,
115
                                       None)
116
    if reparse_point_handle == INVALID_HANDLE_VALUE:
117
        raise WinError()
118
    target_buffer = c_buffer(MAXIMUM_REPARSE_DATA_BUFFER_SIZE)
119
    n_bytes_returned = DWORD()
120
    io_result = DeviceIoControl(reparse_point_handle,
121
                                FSCTL_GET_REPARSE_POINT,
122
                                None, 0,
123
                                target_buffer, len(target_buffer),
124
                                byref(n_bytes_returned),
125
                                None)
126
    CloseHandle(reparse_point_handle)
127
    if not io_result:
128
        raise WinError()
129
    rdb = REPARSE_DATA_BUFFER.from_buffer(target_buffer)
130
    if rdb.ReparseTag == IO_REPARSE_TAG_SYMLINK:
131
        return rdb.SymbolicLinkReparseBuffer.PrintName
132
    elif rdb.ReparseTag == IO_REPARSE_TAG_MOUNT_POINT:
133
        return rdb.MountPointReparseBuffer.PrintName
134
    raise ValueError("not a link")
135
136
# symlink function from Stackoverflow user Gian Marco Gherardi, 23 Feb 2013
137
# http://stackoverflow.com/a/15043806
138
def symlink(source, link_name):
139
    csl = ctypes.windll.kernel32.CreateSymbolicLinkW
140
    csl.argtypes = (ctypes.c_wchar_p, ctypes.c_wchar_p, ctypes.c_uint32)
141
    csl.restype = ctypes.c_ubyte
142
143
    flags = 1 if os.path.isdir(source) else 0
144
145
    if csl(link_name, source, flags) == 0:
146
        raise ctypes.WinError()
147