fuzzer.rest_fuzzer.fuzz.schema_to_object_builder()   B
last analyzed

Complexity

Conditions 7

Size

Total Lines 21
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 15
dl 0
loc 21
rs 8
c 0
b 0
f 0
cc 7
nop 2
1
import random
2
import requests
3
from fuzzer.primitive_fuzzer.fuzz import random_integers, random_ascii_chars, random_float
4
5
6
MUTATION_LIMIT = 0.1
7
8
9
def build_one_of_type(item_type, p=1.0):
10
    empty_values_for_types = {
11
        "str": random_ascii_chars,
12
        "int": random_integers,
13
        "float": random_float,
14
        "list": lambda: [],
15
        "dict": lambda: {},
16
        "null": lambda: None
17
    }
18
    basic_types = ['str', 'int', 'float', 'list', 'dict', 'null']
19
    return empty_values_for_types[item_type] \
20
        if MUTATION_LIMIT < p \
21
        else empty_values_for_types[random.choice(basic_types)]
22
23
24
def schema_to_object_builder(schema_obj, p=1.0):
25
    mutations, schema = schema_obj
26
    mutation = mutations if isinstance(mutations, float) else mutations[0]
27
28
    type_of_object = schema['type'] if isinstance(schema, dict) else "null"
29
    root_object = build_one_of_type(type_of_object, p)()
30
31
    if not isinstance(schema, dict):
32
        return root_object
33
34
    if isinstance(root_object, list):
35
        root_object.append(schema_to_object_builder(
36
            schema.get('inner', {}),
37
            p=schema.get('inner', {})[0]
38
        ))
39
40
    elif isinstance(root_object, dict):
41
        for prop in schema['inner'][1]:
42
            root_object[prop['name']] = schema_to_object_builder((schema['inner'][0], prop), p=mutation)
43
44
    return root_object
45
46
47
def api(host, port, api_object, body_schema):
48
    """
49
    api_object = {
50
        "url": "/some/path",
51
        "method": "POST",
52
        "tests": 100,
53
        "body": {}
54
    }
55
    """
56
    url = '{host}:{port}{url}'.format(host=host, port=port, url=api_object['url'])
57
58
    if api_object['method'] in ['POST', 'PUT', 'PATCH']:
59
        request_body = schema_to_object_builder(body_schema)
60
        return request_body, requests.request(
61
            method=api_object['method'],
62
            url=url,
63
            json=request_body
64
        )
65
66
    elif api_object['method'] in ['GET', 'DELETE']:
67
        return None, requests.request(
68
            method=api_object['method'],
69
            url=url
70
        )
71
72
73
def api_nx(req_spec):
74
    host = req_spec.get('host')
75
    port = req_spec.get('port')
76
    api_object = req_spec.get('req_body')
77
    body_schema = req_spec.get('req_body_schema')
78
    tests = api_object.get('tests', 1000)
79
80
    for _ in range(tests):
81
        with open('fuzz.log', 'a+') as f:
82
            request_body, r = api(host, port, api_object, body_schema)
83
            f.write(
84
                "url: {}\nrequest_body: {}\nresponse: {}\nstatus_code: {}\n\n".format(
85
                    api_object['url'], request_body,  r.text, r.status_code
86
                )
87
            )
88