Completed
Push — master ( 3f30e7...37b632 )
by Dmitry
08:21
created

Executor::process()   A

Complexity

Conditions 5
Paths 4

Size

Total Lines 36

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 19
CRAP Score 5.0214

Importance

Changes 0
Metric Value
dl 0
loc 36
ccs 19
cts 21
cp 0.9048
rs 9.0328
c 0
b 0
f 0
cc 5
nc 4
nop 0
crap 5.0214
1
<?php
2
3
namespace Basis;
4
5
use Basis\Procedure\JobQueue\Cast;
6
use Basis\Procedure\JobQueue\Take;
7
use Exception;
8
9
class Executor
10
{
11
    use Toolkit;
12
13 3
    public function request($request)
14
    {
15 3
        if (!array_key_exists('job', $request) || !$request['job']) {
16
            throw new Exception("No job defined");
17
        }
18
19 3
        if (!array_key_exists('service', $request) || !$request['service']) {
20 3
            if ($this->get(Runner::class)->hasJob($request['job'])) {
21 3
                $request['service'] = $this->getServiceName();
22
            } else {
23
                $request['service'] = explode('.', $request['job'])[0];
24
            }
25
        }
26
27 3
        if (!array_key_exists('params', $request)) {
28
            $request['params'] = [];
29
        } else {
30 3
            $request['params'] = $this->get(Converter::class)->toArray($request['params']);
31
        }
32
33 3
        $context = $this->get(Context::class)->toArray();
34 3
        $jobContext = $this->findOrCreate('job_context', [
35 3
            'hash' => md5(json_encode($context))
36
        ]);
37 3
        if (!$jobContext->context) {
38 3
            $jobContext->context = $context;
39 3
            $jobContext->save();
40
        }
41
42 3
        $request['context'] = $jobContext->id;
43
44 3
        $request['hash'] = md5(json_encode([
45 3
            $request['service'],
46 3
            $request['job'],
47 3
            $request['params'],
48
        ]));
49
50
        // ready
51 3
        $request['status'] = 'r';
52
53 3
        $tuple = [];
54 3
        foreach ($this->getRepository('job_queue')->getSpace()->getTupleMap() as $key => $index) {
55 3
            if (array_key_exists($key, $request)) {
56 3
                $tuple[$index] = $request[$key];
57
            }
58
        }
59
60 3
        $casting = $this->get(Cast::class)($request['hash'], $tuple);
61
62 3
        $this->getRepository('job_queue')->flushCache();
63 3
        return $this->getRepository('job_queue')->getInstance($casting['tuple']);
64
    }
65
66 2
    public function send(string $job, array $params = [], string $service = null)
67
    {
68 2
        $this->request(compact('job', 'params', 'service'));
69 2
    }
70
71 1
    public function dispatch(string $job, array $params = [], string $service = null)
72
    {
73 1
        $recipient = $this->getServiceName();
74 1
        $request = $this->request(compact('job', 'params', 'service', 'recipient'));
75
76 1
        return $this->result($request->hash);
77
    }
78
79 3
    public function process()
80
    {
81 3
        $tuple = $this->get(Take::class)();
82 3
        if (!$tuple) {
83
            return;
84
        }
85
86 3
        $request = $this->getRepository('job_queue')->getInstance($tuple);
87
88 3
        if ($request->service != $this->getServiceName()) {
89
            // move to service queue
90
            throw new Exception("Error Processing Request", 1);
91
92
        } else {
93 3
            $context = $this->get(Context::class);
94 3
            $runner = $this->get(Runner::class);
95
96 3
            $contextBackup = $context->toArray();
97 3
            $jobContext = $this->findOrFail('job_context', $request->context);
98 3
            $context->reset($jobContext->context);
99 3
            $result = $runner->dispatch($request->job, $request->params);
100
101 3
            $context->reset($contextBackup);
102
103 3
            if ($request->recipient) {
104 2
                $this->create('job_result', [
105 2
                    'service' => $request->recipient,
106 2
                    'hash' => $request->hash,
107 2
                    'data' => $this->get(Converter::class)->toArray($result),
108 2
                    'expire' => property_exists($result, 'expire') ? $result->expire : 0,
109
                ]);
110
            }
111
        }
112
113 3
        return $this->getMapper()->remove($request);
114
    }
115
116 2
    public function result($hash)
117
    {
118 2
        $result = $this->findOne('job_result', [
119 2
            'service' => $this->getServiceName(),
120 2
            'hash' => $hash,
121
        ]);
122
123 2
        if (!$result) {
124 1
            if (!$this->process()) {
125
                usleep(50000); // 50 milliseconds sleep
126
            }
127 1
            return $this->result($hash);
128
        }
129
130 2
        return $this->get(Converter::class)->toObject($result->data);
131
    }
132
133 3
    public function getServiceName()
134
    {
135 3
        return $this->get(Service::class)->getName();
136
    }
137
}
138