Completed
Pull Request — master (#13)
by Rafael
06:46
created

SubscriptionManager::subscribe()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 12
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
eloc 8
dl 0
loc 12
ccs 0
cts 12
cp 0
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 5
crap 2
1
<?php
2
/*******************************************************************************
3
 *  This file is part of the GraphQL Bundle package.
4
 *
5
 *  (c) YnloUltratech <[email protected]>
6
 *
7
 *  For the full copyright and license information, please view the LICENSE
8
 *  file that was distributed with this source code.
9
 ******************************************************************************/
10
11
namespace Ynlo\GraphQLBundle\Subscription;
12
13
use Lcobucci\JWT\Builder;
14
use Lcobucci\JWT\Signer\Hmac\Sha256;
15
use Symfony\Component\Console\Output\OutputInterface;
16
use Symfony\Component\HttpFoundation\Request;
17
use Ynlo\GraphQLBundle\Definition\Registry\DefinitionRegistry;
18
use Ynlo\GraphQLBundle\Model\NodeInterface;
19
use Ynlo\GraphQLBundle\Subscription\PubSub\PubSubHandlerInterface;
20
use Ynlo\GraphQLBundle\Subscription\PubSub\SubscriptionMessage;
21
use Ynlo\GraphQLBundle\Util\IDEncoder;
22
23
class SubscriptionManager
24
{
25
    /**
26
     * @var DefinitionRegistry
27
     */
28
    protected $registry;
29
30
    /**
31
     * @var PubSubHandlerInterface
32
     */
33
    protected $pubSubHandler;
34
35
    /**
36
     * @var string
37
     */
38
    protected $secret;
39
40
    /**
41
     * SubscriptionManager constructor.
42
     *
43
     * @param DefinitionRegistry     $definitionRegistry
44
     * @param PubSubHandlerInterface $pubSubHandler
45
     * @param string                 $secret
46
     */
47
    public function __construct(DefinitionRegistry $definitionRegistry, PubSubHandlerInterface $pubSubHandler, string $secret)
48
    {
49
        $this->registry = $definitionRegistry;
50
        $this->pubSubHandler = $pubSubHandler;
51
        $this->secret = $secret;
52
    }
53
54
    /**
55
     * Subscribe given request to given subscription,
56
     * when this subscription is dispatched this request will be executed
57
     *
58
     * @param string         $id
59
     * @param string         $channel
60
     * @param array          $args
61
     * @param Request        $request
62
     * @param \DateTime|null $expireAt
63
     */
64
    public function subscribe($id, string $channel, $args, Request $request, \DateTime $expireAt = null): void
65
    {
66
        $this->convertNodes($args);
67
        $this->pubSubHandler->sub(
68
            $channel,
69
            $id,
70
            [
71
                'channel' => $channel,
72
                'arguments' => $args,
73
                'request' => $request,
74
            ],
75
            $expireAt
76
        );
77
    }
78
79
    /**
80
     * @param string $name    subscription name or class
81
     * @param array  $filters array of filters to compare with subscriptions
82
     * @param array  $data    data to submit to the subscription
83
     */
84
    public function publish(string $name, array $filters = [], array $data = []): void
85
    {
86
        $resolvers = array_flip($this->registry->getEndpoint()->getSubscriptionsResolvers());
87
        if (isset($resolvers[$name])) {
88
            $name = $resolvers[$name];
89
        }
90
        $this->convertNodes($filters);
91
92
        array_walk_recursive(
93
            $data,
94
            function (&$value, $key) {
95
                if (is_object($value)) {
96
                    throw new \RuntimeException(
97
                        sprintf('The object "%s" in key "%s" can\'t be part of publish data, only scalar values can be sent.', get_class($value), $key)
98
                    );
99
                }
100
            }
101
        );
102
103
        $this->pubSubHandler->pub($name, $filters, $data);
104
    }
105
106
    /**
107
     * @param OutputInterface $output
108
     * @param bool            $debug
109
     */
110
    public function consume(OutputInterface $output, $debug = false): void
111
    {
112
        $channels = array_keys($this->registry->getEndpoint()->getSubscriptionsResolvers());
113
        $this->pubSubHandler->consume(
114
            $channels,
115
            function (SubscriptionMessage $message) use ($output, $debug) {
116
                /** @var Request $request */
117
                $request = $message->getMeta()['request'];
118
                $subscribedFilters = $message->getMeta()['arguments'] ?? [];
119
                $subscribedChannel = $message->getMeta()['channel'] ?? null;
120
                if (($subscribedChannel === $message->getChannel())
121
                    && $this->matchFilters($subscribedFilters, $message->getFilters())) {
122
                    $output->writeln(sprintf('[INFO] Process subscription "%s" of channel "%s"', $message->getId(), $message->getChannel()));
123
                    $this->sendRequest($request, $message, $output, $debug);
124
                }
125
            }
126
        );
127
    }
128
129
    /**
130
     * @param array $subscribed
131
     * @param array $filters
132
     *
133
     * @return bool
134
     */
135
    private function matchFilters(array $subscribed, array $filters): bool
136
    {
137
        foreach ($subscribed as $subProperty => $subValue) {
138
            if (isset($filters[$subProperty])) {
139
                $filterValue = $filters[$subProperty];
140
141
                if (is_array($subValue) && in_array($filterValue, $subValue, true)) {
142
                    continue;
143
                }
144
145
                if ($subValue !== $filterValue) {
146
                    return false;
147
                }
148
            }
149
        }
150
151
        return true;
152
    }
153
154
    /**
155
     * Convert nodes to ID
156
     *
157
     * @param array $data
158
     */
159
    private function convertNodes(array &$data): void
160
    {
161
        array_walk_recursive(
162
            $data,
163
            function (&$value) {
164
                if ($value instanceof NodeInterface) {
165
                    $value = IDEncoder::encode($value);
166
                }
167
            }
168
        );
169
    }
170
171
    /**
172
     * Send a subscription request
173
     *
174
     * @param Request             $originRequest
175
     * @param SubscriptionMessage $message
176
     * @param OutputInterface     $output
177
     * @param boolean             $debug
178
     */
179
    private function sendRequest(Request $originRequest, SubscriptionMessage $message, OutputInterface $output, bool $debug = false): void
180
    {
181
        $host = $originRequest->getHost();
182
        $port = $originRequest->getPort();
183
        $path = $originRequest->getPathInfo();
184
185
        $handle = fsockopen($host, $port, $errno, $errstr, 10);
0 ignored issues
show
Bug introduced by
It seems like $port can also be of type string; however, parameter $port of fsockopen() does only seem to accept integer, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

185
        $handle = fsockopen($host, /** @scrutinizer ignore-type */ $port, $errno, $errstr, 10);
Loading history...
186
187
        $signer = new Sha256();
188
        $subscriptionToken = (new Builder())->setId($message->getId())
189
                                            ->set('data', serialize($message->getData()))
190
                                            ->setIssuedAt(time())
191
                                            ->setNotBefore(time() + 60)
192
                                            ->setExpiration(time() + 60)
193
                                            ->sign($signer, $this->secret)
194
                                            ->getToken();
195
196
        $body = $originRequest->getContent();
197
        $length = strlen($body);
198
        $out = "POST $path HTTP/1.1\r\n";
199
        $out .= "Host: $host\r\n";
200
        $auth = $originRequest->headers->get('Authorization');
201
        $out .= "Authorization: $auth\r\n";
202
        $out .= "Subscription: $subscriptionToken\r\n";
203
        $out .= "Content-Length: $length\r\n";
204
        $out .= "Content-Type: application/json\r\n";
205
        $out .= "Connection: Close\r\n\r\n";
206
        $out .= $body;
207
        fwrite($handle, $out);
0 ignored issues
show
Bug introduced by
It seems like $handle can also be of type false; however, parameter $handle of fwrite() does only seem to accept resource, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

207
        fwrite(/** @scrutinizer ignore-type */ $handle, $out);
Loading history...
208
209
        if ($debug) {
210
            /// in debug mode wait for response
211
            $output->writeln(sprintf('[DEBUG] Getting response for subscription %s', $message->getId()));
212
            while (true) {
213
                $buffer = fgets($handle);
0 ignored issues
show
Bug introduced by
It seems like $handle can also be of type false; however, parameter $handle of fgets() does only seem to accept resource, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

213
                $buffer = fgets(/** @scrutinizer ignore-type */ $handle);
Loading history...
214
                if (!$buffer) {
215
                    break;
216
                }
217
218
                $output->write($buffer);
219
            }
220
        }
221
        fclose($handle);
0 ignored issues
show
Bug introduced by
It seems like $handle can also be of type false; however, parameter $handle of fclose() does only seem to accept resource, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

221
        fclose(/** @scrutinizer ignore-type */ $handle);
Loading history...
222
    }
223
}
224