Test Failed
Push — master ( e380d0...f5671d )
by W
02:58
created

st2common/st2common/util/crypto.py (1 issue)

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
"""
17
Module for handling symmetric encryption and decryption of short text values (mostly used for
18
encrypted datastore values aka secrets).
19
20
NOTE: In the past, this module used and relied on keyczar, but since keyczar doesn't support
21
Python 3, we moved to cryptography library.
22
23
symmetric_encrypt and symmetric_decrypt functions accept values as returned by the AESKey.Encrypt()
24
and AESKey.Decrypt() methods in keyczar. Those functions follow the same approach (AES in CBC mode
25
with SHA1 HMAC signature) as keyczar methods, but they use and rely on primitives and methods from
26
the cryptography library.
27
28
This was done to make the keyczar -> cryptography migration fully backward compatible.
29
30
Eventually, we should move to Fernet (https://cryptography.io/en/latest/fernet/) recipe for
31
symmetric encryption / decryption, because it offers more robustness and safer defaults (SHA256
32
instead of SHA1, etc.).
33
"""
34
35
from __future__ import absolute_import
36
37
import os
38
import json
39
import binascii
40
import base64
41
42
from hashlib import sha1
43
44
import six
45
46
from cryptography.hazmat.primitives.ciphers import Cipher
47
from cryptography.hazmat.primitives.ciphers import algorithms
48
from cryptography.hazmat.primitives.ciphers import modes
49
from cryptography.hazmat.primitives import hashes
50
from cryptography.hazmat.primitives import hmac
51
from cryptography.hazmat.backends import default_backend
52
53
# Names exported by this module
__all__ = [
    # Keyczar message format constants
    'KEYCZAR_HEADER_SIZE',
    'KEYCZAR_AES_BLOCK_SIZE',
    'KEYCZAR_HLEN',

    # Key file handling
    'read_crypto_key',

    # Default encrypt / decrypt entry points
    'symmetric_encrypt',
    'symmetric_decrypt',

    # Implementations which rely on the cryptography library
    'cryptography_symmetric_encrypt',
    'cryptography_symmetric_decrypt',

    # NOTE: Keyczar functions are here for testing reasons - they are only used by tests
    'keyczar_symmetric_encrypt',
    'keyczar_symmetric_decrypt',

    # Key class
    'AESKey',
]
72
73
# Keyczar related constants (all sizes are in bytes)
KEYCZAR_HEADER_SIZE = 5
KEYCZAR_AES_BLOCK_SIZE = 16
# Length of the SHA1 digest which is used for the HMAC signature
KEYCZAR_HLEN = sha1().digest_size

# Smallest key size (in bits) which is still accepted for symmetric crypto
MINIMUM_AES_KEY_SIZE = 128

# Key size (in bits) which is used when the caller doesn't specify one
DEFAULT_AES_KEY_SIZE = 256

# The default must never be weaker than the minimum we enforce
assert DEFAULT_AES_KEY_SIZE >= MINIMUM_AES_KEY_SIZE
84
85
86
class AESKey(object):
    """
    Class representing AES key object.

    The object holds the AES key and the corresponding HMAC key, both as Base64 web safe strings
    (as stored in the keyczar JSON key file) and as decoded bytes (as needed by the encrypt and
    decrypt functions).
    """

    aes_key_string = None
    hmac_key_string = None
    hmac_key_size = None
    mode = None
    size = None

    def __init__(self, aes_key_string, hmac_key_string, hmac_key_size, mode='CBC',
                 size=DEFAULT_AES_KEY_SIZE):
        """
        :param aes_key_string: AES key as a Base64 web safe encoded string.
        :type aes_key_string: ``str``

        :param hmac_key_string: HMAC key as a Base64 web safe encoded string.
        :type hmac_key_string: ``str``

        :param hmac_key_size: Size of the HMAC key.
        :type hmac_key_size: ``int``

        :param mode: Cipher mode (case insensitive). Only CBC is supported.
        :type mode: ``str``

        :param size: Size of the AES key in bits.
        :type size: ``int``

        :raises ValueError: If the mode is not supported or the key size is smaller than
                            MINIMUM_AES_KEY_SIZE.
        """
        # Normalize the values before validating them so "cbc" and a numeric string size are
        # handled the same way as "CBC" and an integer size
        normalized_mode = mode.upper()

        if normalized_mode not in ['CBC']:
            raise ValueError('Unsupported mode: %s' % (mode))

        size = int(size)

        if size < MINIMUM_AES_KEY_SIZE:
            raise ValueError('Unsafe key size: %s' % (size))

        self.aes_key_string = aes_key_string
        self.hmac_key_string = hmac_key_string
        self.hmac_key_size = int(hmac_key_size)
        self.mode = normalized_mode
        self.size = size

        # We also store bytes version of the key since bytes are needed by encrypt and decrypt
        # methods
        self.hmac_key_bytes = Base64WSDecode(self.hmac_key_string)
        self.aes_key_bytes = Base64WSDecode(self.aes_key_string)

    @classmethod
    def generate(cls, key_size=DEFAULT_AES_KEY_SIZE):
        """
        Generate a new AES key with the corresponding HMAC key.

        Both keys are read from os.urandom and have the same length.

        :param key_size: Size of the generated keys in bits.
        :type key_size: ``int``

        :raises ValueError: If key_size is smaller than MINIMUM_AES_KEY_SIZE.

        :rtype: :class:`AESKey`
        """
        if key_size < MINIMUM_AES_KEY_SIZE:
            raise ValueError('Unsafe key size: %s' % (key_size))

        # key_size is in bits, os.urandom expects number of bytes
        key_length = int(key_size / 8)

        aes_key_string = Base64WSEncode(os.urandom(key_length))
        hmac_key_string = Base64WSEncode(os.urandom(key_length))

        # Instantiate via cls so subclasses get an instance of their own type back
        return cls(aes_key_string=aes_key_string, hmac_key_string=hmac_key_string,
                   hmac_key_size=key_size, mode='CBC', size=key_size)

    def to_json(self):
        """
        Return JSON representation of this key which is fully compatible with keyczar JSON key
        file format.

        :rtype: ``str``
        """
        data = {
            'hmacKey': {
                'hmacKeyString': self.hmac_key_string,
                'size': self.hmac_key_size
            },
            'aesKeyString': self.aes_key_string,
            'mode': self.mode.upper(),
            'size': int(self.size)
        }
        return json.dumps(data)

    def __repr__(self):
        # NOTE: Key material is intentionally not included in the representation
        return ('<AESKey hmac_key_size=%s,mode=%s,size=%s>' % (self.hmac_key_size, self.mode,
                                                               self.size))
156
157
158
def read_crypto_key(key_path):
    """
    Load a key stored in keyczar JSON key file format and return it as an AESKey object.

    :param key_path: Absolute path to file containing crypto key in Keyczar JSON format.
    :type key_path: ``str``

    :raises KeyError: If a required attribute is missing in the key file.

    :rtype: :class:`AESKey`
    """
    with open(key_path, 'r') as key_file:
        raw_content = key_file.read()

    key_data = json.loads(raw_content)

    try:
        # NOTE: Attributes are read in a fixed order so the first missing one is reported
        aes_key_string = key_data['aesKeyString']
        hmac_key_string = key_data['hmacKey']['hmacKeyString']
        hmac_key_size = key_data['hmacKey']['size']
        mode = key_data['mode'].upper()
        size = key_data['size']

        return AESKey(aes_key_string=aes_key_string,
                      hmac_key_string=hmac_key_string,
                      hmac_key_size=hmac_key_size,
                      mode=mode,
                      size=size)
    except KeyError as e:
        raise KeyError('Invalid or malformed key file "%s": %s' % (key_path, str(e)))
183
184
185
def symmetric_encrypt(encrypt_key, plaintext):
    """
    Encrypt the provided plaintext by delegating to cryptography_symmetric_encrypt().

    :param encrypt_key: Symmetric AES key to use for encryption.
    :type encrypt_key: :class:`AESKey`

    :param plaintext: Plaintext / message to be encrypted.
    """
    encrypted = cryptography_symmetric_encrypt(encrypt_key=encrypt_key, plaintext=plaintext)
    return encrypted
187
188
189
def symmetric_decrypt(decrypt_key, ciphertext):
    """
    Decrypt the provided ciphertext by delegating to cryptography_symmetric_decrypt().

    :param decrypt_key: Symmetric AES key to use for decryption.
    :type decrypt_key: :class:`AESKey`

    :param ciphertext: Ciphertext (in hex notation) to be decrypted.
    """
    decrypted = cryptography_symmetric_decrypt(decrypt_key=decrypt_key, ciphertext=ciphertext)
    return decrypted
191
192
193
def cryptography_symmetric_encrypt(encrypt_key, plaintext):
    """
    Encrypt the provided plaintext using AES encryption (CBC mode) and sign the result with a
    SHA1 HMAC.

    NOTE 1: This function returns a value which is fully compatible with Keyczar.Encrypt() method.

    NOTE 2: This function is loosely based on keyczar AESKey.Encrypt() (Apache 2.0 license).

    The final encrypted value consists of:

    [message bytes][HMAC signature bytes for the message] where message consists of
    [keyczar header plaintext][IV bytes][ciphertext bytes]

    NOTE: Header itself is unused, but it's added so the format is compatible with keyczar format.

    :param encrypt_key: Symmetric AES key to use for encryption.
    :type encrypt_key: :class:`AESKey`

    :param plaintext: Plaintext / message to be encrypted.

    :return: Encrypted message in upper case hex notation (as produced by binascii.hexlify).
    """
    assert isinstance(encrypt_key, AESKey), 'encrypt_key needs to be AESKey class instance'
    assert isinstance(plaintext, (six.text_type, six.string_types, six.binary_type)), \
        'plaintext needs to either be a string/unicode or bytes'

    aes_key_bytes = encrypt_key.aes_key_bytes
    hmac_key_bytes = encrypt_key.hmac_key_bytes

    assert isinstance(aes_key_bytes, six.binary_type)
    assert isinstance(hmac_key_bytes, six.binary_type)

    # Pad the plaintext so it fills whole AES blocks
    padded = pkcs5_pad(plaintext)

    # Every message is encrypted with a fresh random IV
    iv_bytes = os.urandom(KEYCZAR_AES_BLOCK_SIZE)

    crypto_backend = default_backend()
    encryptor = Cipher(algorithms.AES(aes_key_bytes), modes.CBC(iv_bytes),
                       backend=crypto_backend).encryptor()

    # NOTE: We don't care about actual Keyczar header value, we only care about the length (5
    # bytes) so we simply add 5 0's
    header_bytes = b'00000'

    if isinstance(padded, (six.text_type, six.string_types)):
        # Cipher operates on bytes
        padded = padded.encode('utf-8')

    encrypted_bytes = encryptor.update(padded) + encryptor.finalize()
    message = header_bytes + iv_bytes + encrypted_bytes

    # Sign the whole message (header + IV + ciphertext)
    signer = hmac.HMAC(hmac_key_bytes, hashes.SHA1(), backend=crypto_backend)
    signer.update(message)
    signature = signer.finalize()

    # Result is returned in upper case hex notation
    return binascii.hexlify(message + signature).upper()
251
252
253
def cryptography_symmetric_decrypt(decrypt_key, ciphertext):
    """
    Decrypt the provided ciphertext which has been encrypted using symmetric_encrypt() method (it
    assumes input is in hex notation as returned by binascii.hexlify).

    The HMAC signature is verified before the ciphertext is decrypted.

    NOTE 1: This function assumes ciphertext has been encrypted using symmetric AES crypto from
    keyczar library. Underneath it uses crypto primitives from cryptography library which is Python
    3 compatible.

    NOTE 2: This function is loosely based on keyczar AESKey.Decrypt() (Apache 2.0 license).

    :param decrypt_key: Symmetric AES key to use for decryption.
    :type decrypt_key: :class:`AESKey`

    :param ciphertext: Ciphertext in hex notation.

    :raises ValueError: If the ciphertext is too short to contain the IV and the signature.

    :return: Decrypted and unpadded plaintext.
    """
    assert isinstance(decrypt_key, AESKey), 'decrypt_key needs to be AESKey class instance'
    assert isinstance(ciphertext, (six.text_type, six.string_types, six.binary_type)), \
        'ciphertext needs to either be a string/unicode or bytes'

    aes_key_bytes = decrypt_key.aes_key_bytes
    hmac_key_bytes = decrypt_key.hmac_key_bytes

    assert isinstance(aes_key_bytes, six.binary_type)
    assert isinstance(hmac_key_bytes, six.binary_type)

    # Input is in hex notation, we need raw bytes
    raw_bytes = binascii.unhexlify(ciphertext)

    # Keyczar header is not used
    payload = raw_bytes[KEYCZAR_HEADER_SIZE:]

    # Payload needs to contain at least the IV and the HMAC signature
    minimum_length = KEYCZAR_AES_BLOCK_SIZE + KEYCZAR_HLEN

    if len(payload) < minimum_length:
        raise ValueError('Invalid or malformed ciphertext (too short)')

    # Layout of the payload: [IV][encrypted data][signature]
    iv_bytes = payload[:KEYCZAR_AES_BLOCK_SIZE]
    encrypted_bytes = payload[KEYCZAR_AES_BLOCK_SIZE:-KEYCZAR_HLEN]
    signature = payload[-KEYCZAR_HLEN:]

    # Signature covers everything before it (including the header)
    crypto_backend = default_backend()
    verifier = hmac.HMAC(hmac_key_bytes, hashes.SHA1(), backend=crypto_backend)
    verifier.update(raw_bytes[:-KEYCZAR_HLEN])
    verifier.verify(signature)

    # Signature is valid, decrypt the data
    decryptor = Cipher(algorithms.AES(aes_key_bytes), modes.CBC(iv_bytes),
                       backend=crypto_backend).decryptor()
    padded = decryptor.update(encrypted_bytes) + decryptor.finalize()

    # Remove the padding
    return pkcs5_unpad(padded)
302
303
###
304
# NOTE: Those methods below are deprecated and only used for testing purposes
305
##
306
307
308
def keyczar_symmetric_encrypt(encrypt_key, plaintext):
    """
    Encrypt the given message with the keyczar library using the provided key.

    The value returned by keyczar Encrypt() is converted to upper case hex notation so it can be
    safely stored in the database.

    Also, this method will not return the same output on multiple invocations
    of same method. The reason is that the Encrypt method uses a different
    'Initialization Vector' per run and the IV is part of the output.

    :param encrypt_key: Symmetric AES key to use for encryption.
    :type encrypt_key: :class:`AESKey`

    :param plaintext: Plaintext / message to be encrypted.
    :type plaintext: ``str``

    :return: Encrypted message in upper case hex notation.
    """
    # NOTE: keyczar is only needed by tests so it's imported lazily
    from keyczar.keys import AesKey as KeyczarAesKey
    from keyczar.keys import HmacKey as KeyczarHmacKey
    from keyczar.keyinfo import GetMode

    # Convert our key object to the native keyczar one
    keyczar_hmac_key = KeyczarHmacKey(encrypt_key.hmac_key_string, encrypt_key.hmac_key_size)
    keyczar_aes_key = KeyczarAesKey(encrypt_key.aes_key_string, keyczar_hmac_key,
                                    encrypt_key.size, GetMode(encrypt_key.mode))

    encrypted = keyczar_aes_key.Encrypt(plaintext)
    return binascii.hexlify(encrypted).upper()
337
338
339
def keyczar_symmetric_decrypt(decrypt_key, ciphertext):
    """
    Decrypt the given ciphertext with the keyczar library using the provided key.

    The ciphertext is first converted from hex notation back to bytes and then decrypted. This is
    reverse of the encrypt operation.

    :param decrypt_key: Symmetric AES key to use for decryption.
    :type decrypt_key: :class:`AESKey`

    :param ciphertext: Ciphertext in hex notation to be decrypted.
    :type ciphertext: ``str``

    :return: Value returned by keyczar Decrypt().
    """
    # NOTE: keyczar is only needed by tests so it's imported lazily
    from keyczar.keys import AesKey as KeyczarAesKey
    from keyczar.keys import HmacKey as KeyczarHmacKey
    from keyczar.keyinfo import GetMode

    # Convert our key object to the native keyczar one
    keyczar_hmac_key = KeyczarHmacKey(decrypt_key.hmac_key_string, decrypt_key.hmac_key_size)
    keyczar_aes_key = KeyczarAesKey(decrypt_key.aes_key_string, keyczar_hmac_key,
                                    decrypt_key.size, GetMode(decrypt_key.mode))

    raw_ciphertext = binascii.unhexlify(ciphertext)
    return keyczar_aes_key.Decrypt(raw_ciphertext)
364
365
366
def pkcs5_pad(data):
    """
    Pad data using PKCS5.

    Text input is encoded to UTF-8 first so the amount of padding is calculated from the number
    of bytes which will be encrypted and not from the number of characters (those differ for
    non-ASCII text). Bytes input is padded as is.

    :param data: Data to pad.
    :type data: ``str`` or ``bytes``

    :return: Padded data with a length which is a multiple of KEYCZAR_AES_BLOCK_SIZE.
    :rtype: ``bytes``
    """
    if not isinstance(data, six.binary_type):
        data = data.encode('utf-8')

    # NOTE: When data already fills whole blocks, a full block of padding is added
    pad = KEYCZAR_AES_BLOCK_SIZE - len(data) % KEYCZAR_AES_BLOCK_SIZE

    # bytes(bytearray(...)) produces a byte string under Python 2 and Python 3 (chr() would
    # produce text under Python 3 which can't be concatenated with bytes)
    return data + bytes(bytearray([pad] * pad))
373
374
375
def pkcs5_unpad(data):
    """
    Remove PKCS5 padding from the provided data.

    Bytes input is decoded as UTF-8 first. The last character holds the number of padding
    characters to strip.

    :param data: Padded data.
    :type data: ``str`` or ``bytes``

    :return: Data without the padding.
    :rtype: ``str``
    """
    # Make sure we are operating with a string type
    text = data.decode('utf-8') if isinstance(data, six.binary_type) else data

    pad_length = ord(text[-1])
    return text[:-pad_length]
386
387
388
def Base64WSEncode(s):
    """
    Return Base64 web safe encoding of s. Suppress padding characters (=).

    Uses URL-safe alphabet: - replaces +, _ replaces /. Input of type unicode is
    encoded to UTF-8 bytes first.

    @param s: value to encode as Base64
    @type s: string or bytes

    @return: Base64 representation of s without the padding characters.
    @rtype: string

    NOTE: Taken from keyczar (Apache 2.0 license)
    """
    # base64 module works with bytes
    raw = s.encode('utf-8') if isinstance(s, six.text_type) else s

    encoded = base64.urlsafe_b64encode(raw).decode('utf-8')
    return encoded.replace("=", "")
408
409
410
def Base64WSDecode(s):
    """
    Return decoded version of given Base64 string. Ignore whitespace.

    Uses URL-safe alphabet: - replaces +, _ replaces /. Padding characters (=) which were
    suppressed during encoding are added back before decoding.

    @param s: Base64 string to decode
    @type s: string or bytes

    @return: original value that was encoded as Base64
    @rtype: bytes

    @raise ValueError: If length of string (ignoring whitespace) is one
      more than a multiple of four or if the value can't be decoded.

    NOTE: Taken from keyczar (Apache 2.0 license)
    """
    if isinstance(s, bytes) and not isinstance(s, str):
        # Bytes input under Python 3 - Base64 alphabet is plain ASCII
        s = s.decode('ascii')

    # Kill all whitespace (spaces, tabs, line breaks), make string (not unicode)
    s = str(''.join(s.split()))
    remainder = len(s) % 4

    if remainder == 1:
        raise ValueError('Base64 decoding errors')

    # Add back the suppressed padding (2 characters for remainder 2, 1 for remainder 3)
    s += '=' * (-len(s) % 4)

    try:
        return base64.urlsafe_b64decode(s)
    except (TypeError, binascii.Error) as e:
        # Python 2 raises TypeError and Python 3 raises binascii.Error on invalid input
        raise ValueError('Base64 decoding error: %s' % (str(e)))
444