SubscriptionManager::convertNodes()   A
last analyzed

Complexity

Conditions 2
Paths 1

Size

Total Lines 7
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 5
c 1
b 0
f 0
dl 0
loc 7
ccs 0
cts 7
cp 0
rs 10
cc 2
nc 1
nop 1
crap 6
1
<?php
2
/*******************************************************************************
3
 *  This file is part of the GraphQL Bundle package.
4
 *
5
 *  (c) YnloUltratech <[email protected]>
6
 *
7
 *  For the full copyright and license information, please view the LICENSE
8
 *  file that was distributed with this source code.
9
 ******************************************************************************/
10
11
namespace Ynlo\GraphQLBundle\Subscription;
12
13
use Lcobucci\JWT\Builder;
14
use Lcobucci\JWT\Signer\Hmac\Sha256;
15
use Lcobucci\JWT\Signer\Key;
16
use Symfony\Component\Console\Output\OutputInterface;
17
use Symfony\Component\HttpFoundation\Request;
18
use Ynlo\GraphQLBundle\Definition\Registry\DefinitionRegistry;
19
use Ynlo\GraphQLBundle\Model\NodeInterface;
20
use Ynlo\GraphQLBundle\Subscription\PubSub\PubSubHandlerInterface;
21
use Ynlo\GraphQLBundle\Subscription\PubSub\SubscriptionMessage;
22
use Ynlo\GraphQLBundle\Util\IDEncoder;
23
24
class SubscriptionManager
25
{
26
    /**
27
     * @var DefinitionRegistry
28
     */
29
    protected $registry;
30
31
    /**
32
     * @var PubSubHandlerInterface
33
     */
34
    protected $pubSubHandler;
35
36
    /**
37
     * @var string
38
     */
39
    protected $secret;
40
41
    /**
42
     * SubscriptionManager constructor.
43
     *
44
     * @param DefinitionRegistry     $definitionRegistry
45
     * @param PubSubHandlerInterface $pubSubHandler
46
     * @param string                 $secret
47
     */
48
    public function __construct(DefinitionRegistry $definitionRegistry, PubSubHandlerInterface $pubSubHandler, string $secret)
49
    {
50
        $this->registry = $definitionRegistry;
51
        $this->pubSubHandler = $pubSubHandler;
52
        $this->secret = $secret;
53
    }
54
55
    /**
56
     * Get subscription handler
57
     */
58
    public function handler(): PubSubHandlerInterface
59
    {
60
        return $this->pubSubHandler;
61
    }
62
63
    /**
64
     * Subscribe given request to given subscription,
65
     * when this subscription is dispatched this request will be executed
66
     *
67
     * @param string         $id
68
     * @param string         $channel
69
     * @param array          $args
70
     * @param Request        $request
71
     * @param \DateTime|null $expireAt
72
     */
73
    public function subscribe($id, string $channel, $args, Request $request, \DateTime $expireAt = null): void
74
    {
75
        $this->convertNodes($args);
76
        $this->pubSubHandler->sub(
77
            $channel,
78
            $id,
79
            [
80
                'channel' => $channel,
81
                'arguments' => $args,
82
                'request' => $request,
83
            ],
84
            $expireAt
85
        );
86
    }
87
88
    /**
89
     * @param string $name    subscription name or class
90
     * @param array  $filters array of filters to compare with subscriptions
91
     * @param array  $data    data to submit to the subscription
92
     */
93
    public function publish(string $name, array $filters = [], array $data = []): void
94
    {
95
        $resolvers = array_flip($this->registry->getEndpoint()->getSubscriptionsResolvers());
96
        if (isset($resolvers[$name])) {
97
            $name = $resolvers[$name];
98
        }
99
        $this->convertNodes($filters);
100
101
        array_walk_recursive(
102
            $data,
103
            function (&$value, $key) {
104
                if (is_object($value)) {
105
                    throw new \RuntimeException(
106
                        sprintf('The object "%s" in key "%s" can\'t be part of publish data, only scalar values can be sent.', get_class($value), $key)
107
                    );
108
                }
109
            }
110
        );
111
112
        $this->pubSubHandler->pub($name, $filters, $data);
113
    }
114
115
    /**
116
     * @param OutputInterface $output
117
     * @param bool            $debug
118
     */
119
    public function consume(OutputInterface $output, $debug = false): void
120
    {
121
        $channels = array_keys($this->registry->getEndpoint()->getSubscriptionsResolvers());
122
        $this->pubSubHandler->consume(
123
            $channels,
124
            function (SubscriptionMessage $message) use ($output, $debug) {
125
                /** @var Request $request */
126
                $request = $message->getMeta()['request'] ?? null;
127
                $subscribedFilters = $message->getMeta()['arguments'] ?? [];
128
                $subscribedChannel = $message->getMeta()['channel'] ?? null;
129
                if ($request && ($subscribedChannel === $message->getChannel())
130
                    && $this->matchFilters($subscribedFilters, $message->getFilters())) {
131
                    $output->writeln(sprintf('[INFO] Process subscription "%s" of channel "%s"', $message->getId(), $message->getChannel()));
132
                    $this->sendRequest($request, $message, $output, $debug);
133
                }
134
            }
135
        );
136
    }
137
138
    /**
139
     * @param array $subscribed
140
     * @param array $filters
141
     *
142
     * @return bool
143
     */
144
    private function matchFilters(array $subscribed, array $filters): bool
145
    {
146
        foreach ($subscribed as $subProperty => $subValue) {
147
            if (isset($filters[$subProperty])) {
148
                $filterValue = $filters[$subProperty];
149
150
                if (is_array($subValue) && in_array($filterValue, $subValue, true)) {
151
                    continue;
152
                }
153
154
                if ($subValue !== $filterValue) {
155
                    return false;
156
                }
157
            }
158
        }
159
160
        return true;
161
    }
162
163
    /**
164
     * Convert nodes to ID
165
     *
166
     * @param array $data
167
     */
168
    private function convertNodes(array &$data): void
169
    {
170
        array_walk_recursive(
171
            $data,
172
            function (&$value) {
173
                if ($value instanceof NodeInterface) {
174
                    $value = IDEncoder::encode($value);
175
                }
176
            }
177
        );
178
    }
179
180
    /**
181
     * Send a subscription request
182
     *
183
     * @param Request             $originRequest
184
     * @param SubscriptionMessage $message
185
     * @param OutputInterface     $output
186
     * @param boolean             $debug
187
     */
188
    private function sendRequest(Request $originRequest, SubscriptionMessage $message, OutputInterface $output, bool $debug = false): void
189
    {
190
        // TODO: execute this code ASYNC in order to send multiple request at the same time
191
        // call fsockopen and not wait for response does not works as expected and sometimes does not trigger the subscription
192
193
        $host = $originRequest->getHost();
194
        $port = $originRequest->getPort();
195
        $path = $originRequest->getPathInfo();
196
197
        $handle = fsockopen($originRequest->isSecure() ? 'ssl://'.$host : $host, $port, $errno, $errstr, 10);
0 ignored issues
show
Bug introduced by
It seems like $port can also be of type string; however, parameter $port of fsockopen() does only seem to accept integer, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

197
        $handle = fsockopen($originRequest->isSecure() ? 'ssl://'.$host : $host, /** @scrutinizer ignore-type */ $port, $errno, $errstr, 10);
Loading history...
198
199
        $signer = new Sha256();
200
        $subscriptionToken = (new Builder())->identifiedBy($message->getId())
201
                                            ->withClaim('data', serialize($message->getData()))
202
                                            ->issuedAt(time())
203
                                            ->canOnlyBeUsedAfter(time() + 60)
204
                                            ->expiresAt(time() + 60)
205
                                            ->getToken($signer, new Key($this->secret));
206
207
        $body = $originRequest->getContent();
208
        $length = strlen($body);
209
        $out = "POST $path HTTP/1.1\r\n";
210
        $out .= "Host: $host\r\n";
211
        $auth = $originRequest->headers->get('Authorization');
212
        $out .= "Authorization: $auth\r\n";
213
        $out .= "Subscription: $subscriptionToken\r\n";
214
        $out .= "Content-Length: $length\r\n";
215
        $out .= "Content-Type: application/json\r\n";
216
        $out .= "Connection: Close\r\n\r\n";
217
        $out .= $body;
218
        fwrite($handle, $out);
0 ignored issues
show
Bug introduced by
It seems like $handle can also be of type false; however, parameter $handle of fwrite() does only seem to accept resource, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

218
        fwrite(/** @scrutinizer ignore-type */ $handle, $out);
Loading history...
219
220
        $emptyResponse = true;
221
        while (true) {
222
            $buffer = fgets($handle);
0 ignored issues
show
Bug introduced by
It seems like $handle can also be of type false; however, parameter $handle of fgets() does only seem to accept resource, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

222
            $buffer = fgets(/** @scrutinizer ignore-type */ $handle);
Loading history...
223
            if (!$buffer) {
224
                break;
225
            }
226
            $emptyResponse = false;
227
            if ($debug) {
228
                $output->write($buffer);
229
            }
230
        }
231
        if ($emptyResponse) {
232
            $output->writeln(sprintf('[INFO] Empty response for subscription %s', $message->getId()));
233
        } else {
234
            $output->writeln(sprintf('[INFO] Response received successfully for subscription %s', $message->getId()));
235
        }
236
        fclose($handle);
0 ignored issues
show
Bug introduced by
It seems like $handle can also be of type false; however, parameter $handle of fclose() does only seem to accept resource, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

236
        fclose(/** @scrutinizer ignore-type */ $handle);
Loading history...
237
    }
238
}
239