Completed
Push — master ( 3041fb...0ec14e )
by Dmitry
03:31
created

Executor::processQueue()   A

Complexity

Conditions 4
Paths 4

Size

Total Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 5.024

Importance

Changes 0
Metric Value
dl 0
loc 19
ccs 6
cts 10
cp 0.6
rs 9.6333
c 0
b 0
f 0
cc 4
nc 4
nop 0
crap 5.024
1
<?php
2
3
namespace Basis;
4
5
use Basis\Procedure\JobQueue\Cleanup;
6
use Basis\Procedure\JobQueue\Take;
7
use Basis\Procedure\JobResult\Foreign;
8
use Exception;
9
use Tarantool\Mapper\Entity;
10
use Tarantool\Mapper\Plugin\Procedure;
11
use Tarantool\Mapper\Procedure\FindOrCreate;
12
use Tarantool\Mapper\Repository;
13
14
class Executor
15
{
16
    use Toolkit;
17
18 3
    public function normalize(array $request): array
19
    {
20 3
        if (!array_key_exists('job', $request) || !$request['job']) {
21
            throw new Exception("No job defined");
22
        }
23
24 3
        if (!array_key_exists('service', $request) || !$request['service']) {
25 3
            if ($this->get(Runner::class)->hasJob($request['job'])) {
26 3
                $request['service'] = $this->getServiceName();
27
            } else {
28
                $request['service'] = explode('.', $request['job'])[0];
29
            }
30
        }
31
32 3
        if (!array_key_exists('params', $request)) {
33
            $request['params'] = [];
34
        } else {
35 3
            $request['params'] = $this->get(Converter::class)->toArray($request['params']);
36
        }
37
38 3
        $context = $this->get(Context::class)->toArray();
39 3
        $jobContext = $this->findOrCreate('job_context', [
40 3
            'hash' => md5(json_encode($context))
41
        ], [
42 3
            'hash' => md5(json_encode($context)),
43 3
            'context' => $context,
44
        ]);
45
46 3
        $request['context'] = $jobContext->id;
47
48 3
        $request['hash'] = md5(json_encode([
49 3
            $request['service'],
50 3
            $request['job'],
51 3
            $request['params'],
52
        ]));
53
54 3
        return $request;
55
    }
56
57 3
    public function initRequest($request)
58
    {
59 3
        $request = $this->normalize($request);
60 3
        $request['status'] = 'new';
61
62
        $params = [
63 3
            'status' => $request['status'],
64 3
            'hash' => $request['hash'],
65
        ];
66
67 3
        return $this->findOrCreate('job_queue', $params, $request);
68
    }
69
70 2
    public function send(string $job, array $params = [], string $service = null)
71
    {
72 2
        $this->initRequest(compact('job', 'params', 'service'));
73 2
    }
74
75 1
    public function dispatch(string $job, array $params = [], string $service = null)
76
    {
77 1
        $recipient = $this->getServiceName();
78 1
        $request = compact('job', 'params', 'service', 'recipient');
79 1
        $request = $this->normalize($request);
80
81 1
        $result = $this->findOne('job_result', [
82 1
            'service' => $this->getServiceName(),
83 1
            'hash' => $request['hash'],
84
        ]);
85 1
        if ($result) {
86
            if ($result->expire && $result->expire < time()) {
87
                $this->getMapper()->remove($result);
88
            } else {
89
                return $result->data;
90
            }
91
        }
92
93 1
        $this->initRequest($request);
94 1
        return $this->getResult($request['hash']);
95
    }
96
97 2
    public function process()
98
    {
99 2
        $this->transferResult();
100 2
        return $this->processQueue();
101
    }
102
103 3
    public function processQueue()
104
    {
105 3
        $tuple = $this->get(Take::class)();
106 3
        if (!$tuple) {
107
            return;
108
        }
109
110 3
        $request = $this->getRepository('job_queue')->getInstance($tuple);
111
112 3
        if ($request->service != $this->getServiceName()) {
113
            try {
114
                return $this->transferRequest($request);
115
            } catch (Exception $e) {
116
                return $this->processRequest($request);
117
            }
118
        } 
119
120 3
        return $this->processRequest($request);
121
    }
122
123
    protected function transferRequest(Entity $request)
124
    {
125
        $context = $request->getContext();
126
        $remoteContext = $this->findOrCreate("$request->service.job_context", [
0 ignored issues
show
Bug introduced by
The property service does not seem to exist in Tarantool\Mapper\Entity.

An attempt at access to an undefined property has been detected. This may either be a typographical error or the property has been renamed but there are still references to its old name.

If you really want to allow access to undefined properties, you can define magic methods to allow access. See the php core documentation on Overloading.

Loading history...
127
            'hash' => $context->hash
128
        ], [
129
            'hash' => $context->hash,
130
            'context' => $context->context,
131
        ]);
132
133
        $template = $this->get(Converter::class)->toArray($request);
134
        $template['context'] = $remoteContext->id;
135
        $template['status'] = 'new';
136
        unset($template['id']);
137
138
        $params = [
139
            'hash' => $template['hash'],
140
            'status' => $template['status'],
141
        ];
142
143
        $this->findOrCreate("$request->service.job_queue", $params, $template);
144
145
        $request->status = 'transfered';
0 ignored issues
show
Bug introduced by
The property status does not seem to exist in Tarantool\Mapper\Entity.

An attempt at access to an undefined property has been detected. This may either be a typographical error or the property has been renamed but there are still references to its old name.

If you really want to allow access to undefined properties, you can define magic methods to allow access. See the php core documentation on Overloading.

Loading history...
146
        $request->save();
147
148
        return $request;
149
    }
150
151 3
    public function processRequest($request)
152
    {
153 3
        $context = $this->get(Context::class);
154 3
        $backup = $context->toArray();
155 3
        $context->reset($request->getContext()->context);
156
157 3
        $runner = $this->get(Runner::class);
158 3
        if ($request->service != $this->getServiceName()) {
159
            $runner = $this->get(Dispatcher::class);
160
        }
161 3
        $result = $runner->dispatch($request->job, $request->params, $request->service);
162
163 3
        $context->reset($backup);
164
165 3
        if ($request->recipient) {
166 2
            $this->findOrCreate('job_result', [
167 2
                'service' => $request->recipient,
168 2
                'hash' => $request->hash,
169
            ], [
170 2
                'service' => $request->recipient,
171 2
                'hash' => $request->hash,
172 2
                'data' => $this->get(Converter::class)->toArray($result),
173 2
                'expire' => property_exists($result, 'expire') ? $result->expire : 0,
174
            ]);
175
        }
176
177 3
        return $this->getMapper()->remove($request);
178
    }
179
180 2
    protected function transferResult()
181
    {
182 2
        $remote = $this->get(Foreign::class)($this->getServiceName());
183 2
        if (count($remote)) {
184
            $group = [];
185
            foreach ($remote as $tuple) {
186
                $result = $this->getRepository('job_result')->getInstance($tuple);
187
                if (!array_key_exists($result->service, $group)) {
188
                    $group[$result->service] = [];
189
                }
190
                $group[$result->service][] = $result;
191
            }
192
            foreach ($group as $service => $results) {
193
                foreach ($results as $result) {
194
                    $this->findOrCreate("$service.job_result", [
195
                        'service' => $result->service,
196
                        'hash' => $result->hash,
197
                    ], [
198
                        'service' => $result->service,
199
                        'hash' => $result->hash,
200
                        'data' => $result->data,
201
                        'expire' => $result->expire,
202
                    ]);
203
                    $this->getMapper()->remove($result);
204
                }
205
206
                $this->getRepository("$result->service.job_queue")
0 ignored issues
show
Bug introduced by
The variable $result does not seem to be defined for all execution paths leading up to this point.

If you define a variable conditionally, it can happen that it is not defined for all execution paths.

Let’s take a look at an example:

function myFunction($a) {
    switch ($a) {
        case 'foo':
            $x = 1;
            break;

        case 'bar':
            $x = 2;
            break;
    }

    // $x is potentially undefined here.
    echo $x;
}

In the above example, the variable $x is defined if you pass “foo” or “bar” as argument for $a. However, since the switch statement has no default case statement, if you pass any other value, the variable $x would be undefined.

Available Fixes

  1. Check for existence of the variable explicitly:

    function myFunction($a) {
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
        }
    
        if (isset($x)) { // Make sure it's always set.
            echo $x;
        }
    }
    
  2. Define a default value for the variable:

    function myFunction($a) {
        $x = ''; // Set a default which gets overridden for certain paths.
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
        }
    
        echo $x;
    }
    
  3. Add a value for the missing path:

    function myFunction($a) {
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
    
            // We add support for the missing case.
            default:
                $x = '';
                break;
        }
    
        echo $x;
    }
    
Loading history...
207
                    ->getMapper()
208
                    ->getPlugin(Procedure::class)
209
                    ->get(Cleanup::class)($result->service);
210
            }
211
        }
212 2
    }
213
214 2
    public function getResult($hash)
215
    {
216 2
        $result = $this->findOne('job_result', [
217 2
            'service' => $this->getServiceName(),
218 2
            'hash' => $hash,
219
        ]);
220
221 2
        if (!$result) {
222 1
            if (!$this->processQueue()) {
223
                $request = $this->findOne('job_queue', [
224
                    'status' => 'transfered',
225
                    'hash' => $hash,
226
                ]);
227
                if ($request && $request->service) {
228
                    $this->get(Dispatcher::class)
229
                        ->dispatch('module.execute', [], $request->service);
230
                } else {
231
                    usleep(50000); // 50 milliseconds sleep
232
                }
233
            }
234 1
            $this->getRepository('job_result')->flushCache();
235 1
            return $this->getResult($hash);
236
        }
237
238 2
        return $this->get(Converter::class)->toObject($result->data);
239
    }
240
241 3
    public function getServiceName()
242
    {
243 3
        return $this->get(Service::class)->getName();
244
    }
245
}
246