ConnectStage::processStage()   B
last analyzed

Complexity

Conditions 6
Paths 3

Size

Total Lines 22
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 16
CRAP Score 6

Importance

Changes 0
Metric Value
dl 0
loc 22
ccs 16
cts 16
cp 1
rs 8.6737
c 0
b 0
f 0
cc 6
eloc 14
nc 3
nop 1
crap 6
1
<?php
2
/**
3
 * Async sockets
4
 *
5
 * @copyright Copyright (c) 2015-2017, Efimov Evgenij <[email protected]>
6
 *
7
 * This source file is subject to the MIT license that is bundled
8
 * with this source code in the file LICENSE.
9
 */
10
namespace AsyncSockets\RequestExecutor\Pipeline;
11
12
use AsyncSockets\Event\EventType;
13
use AsyncSockets\Exception\SocketException;
14
use AsyncSockets\RequestExecutor\ExecutionContext;
15
use AsyncSockets\RequestExecutor\LimitationSolverInterface;
16
use AsyncSockets\RequestExecutor\Metadata\RequestDescriptor;
17
use AsyncSockets\RequestExecutor\RequestExecutorInterface;
18
19
/**
 * Class ConnectStage
 *
 * Pipeline stage that asks a limitation solver which of the given request
 * descriptors may be started now and initiates the connection for each
 * approved one.
 */
class ConnectStage extends AbstractTimeAwareStage
{
    /**
     * Solver consulted before a request is started
     *
     * @var LimitationSolverInterface
     */
    private $decider;

    /**
     * ConnectStage constructor.
     *
     * @param RequestExecutorInterface  $executor         Request executor
     * @param EventCaller               $eventCaller      Event caller
     * @param LimitationSolverInterface $decider          Limitation solver for running requests
     * @param ExecutionContext          $executionContext Execution context
     */
    public function __construct(
        RequestExecutorInterface $executor,
        EventCaller $eventCaller,
        LimitationSolverInterface $decider,
        ExecutionContext $executionContext
    ) {
        // Everything except the solver is handed over to the parent stage.
        parent::__construct($executor, $eventCaller, $executionContext);

        $this->decider = $decider;
    }

    /**
     * {@inheritdoc}
     *
     * Walks the descriptors in the given order. Iteration stops at the first
     * DECISION_PROCESS_SCHEDULED, descriptors answered with
     * DECISION_SKIP_CURRENT are passed over, and every descriptor answered
     * with DECISION_OK is handed to connectSocket().
     *
     * @param RequestDescriptor[] $requestDescriptors Descriptors to try to start
     *
     * @return RequestDescriptor[] Descriptors for which connectSocket() returned true
     * @throws \LogicException When the decision is not one of the three handled constants
     */
    public function processStage(array $requestDescriptors)
    {
        $pendingCount = count($requestDescriptors);
        $started      = [];

        foreach ($requestDescriptors as $item) {
            $verdict = $this->decide($item, $pendingCount);

            if ($verdict === LimitationSolverInterface::DECISION_PROCESS_SCHEDULED) {
                // Stop looking at the remaining descriptors altogether.
                break;
            }

            if ($verdict === LimitationSolverInterface::DECISION_SKIP_CURRENT) {
                continue;
            }

            if ($verdict !== LimitationSolverInterface::DECISION_OK) {
                throw new \LogicException('Unknown decision ' . $verdict . ' received.');
            }

            if (!$this->connectSocket($item)) {
                continue;
            }

            $started[] = $item;
        }

        return $started;
    }

    /**
     * Work out what should happen with a single descriptor
     *
     * A descriptor that is already running, or whose metadata carries a
     * non-null connection start time, is skipped without consulting the
     * solver. Otherwise the solver's answer is returned as is.
     *
     * @param RequestDescriptor $requestDescriptor Descriptor to decide about
     * @param int               $totalItems        Total amount of descriptors passed to this stage
     *
     * @return int Decision value; normally one of LimitationSolverInterface::DECISION_* consts
     */
    private function decide(RequestDescriptor $requestDescriptor, $totalItems)
    {
        // Metadata is fetched before the running check, same as the call order always was.
        $metadata = $requestDescriptor->getMetadata();

        if ($requestDescriptor->isRunning()) {
            return LimitationSolverInterface::DECISION_SKIP_CURRENT;
        }

        if ($metadata[RequestExecutorInterface::META_CONNECTION_START_TIME] !== null) {
            // A connection start time has already been recorded for this descriptor.
            return LimitationSolverInterface::DECISION_SKIP_CURRENT;
        }

        return $this->decider->decide(
            $this->executor,
            $requestDescriptor->getSocket(),
            $this->executionContext,
            $totalItems
        );
    }

    /**
     * Initialize the descriptor and start connecting its socket
     *
     * INITIALIZE subscribers are called first, then the connection start time
     * is recorded. A socket that is not yet connected is opened with the
     * address and stream context taken from the descriptor metadata; for an
     * already connected socket only the connection finish time is recorded.
     * On success the descriptor is flagged as running.
     *
     * A SocketException raised along the way marks the request as complete and
     * is forwarded to the exception subscribers.
     *
     * @param RequestDescriptor $descriptor Socket operation data
     *
     * @return bool True if no SocketException occurred, false otherwise
     */
    private function connectSocket(RequestDescriptor $descriptor)
    {
        $descriptor->initialize();

        $connection = $descriptor->getSocket();
        $initEvent  = $this->createEvent($descriptor, EventType::INITIALIZE);

        try {
            $this->callSocketSubscribers($descriptor, $initEvent);
            $this->setSocketOperationTime($descriptor, RequestExecutorInterface::META_CONNECTION_START_TIME);

            if ($connection->isConnected()) {
                $this->setSocketOperationTime($descriptor, RequestExecutorInterface::META_CONNECTION_FINISH_TIME);
            } else {
                $metadata = $descriptor->getMetadata();
                $connection->open(
                    $metadata[RequestExecutorInterface::META_ADDRESS],
                    $metadata[RequestExecutorInterface::META_SOCKET_STREAM_CONTEXT]
                );
            }

            $descriptor->setRunning(true);

            return true;
        } catch (SocketException $failure) {
            $descriptor->setMetadata(RequestExecutorInterface::META_REQUEST_COMPLETE, true);
            // Anything thrown by the exception subscribers propagates to the caller.
            $this->callExceptionSubscribers($descriptor, $failure);
        }

        return false;
    }
}
148