1 | <?php |
||||
2 | /******************************************************************************* |
||||
3 | * This file is part of the GraphQL Bundle package. |
||||
4 | * |
||||
5 | * (c) YnloUltratech <[email protected]> |
||||
6 | * |
||||
7 | * For the full copyright and license information, please view the LICENSE |
||||
8 | * file that was distributed with this source code. |
||||
9 | ******************************************************************************/ |
||||
10 | |||||
11 | namespace Ynlo\GraphQLBundle\Subscription; |
||||
12 | |||||
13 | use Lcobucci\JWT\Builder; |
||||
14 | use Lcobucci\JWT\Signer\Hmac\Sha256; |
||||
15 | use Lcobucci\JWT\Signer\Key; |
||||
16 | use Symfony\Component\Console\Output\OutputInterface; |
||||
17 | use Symfony\Component\HttpFoundation\Request; |
||||
18 | use Ynlo\GraphQLBundle\Definition\Registry\DefinitionRegistry; |
||||
19 | use Ynlo\GraphQLBundle\Model\NodeInterface; |
||||
20 | use Ynlo\GraphQLBundle\Subscription\PubSub\PubSubHandlerInterface; |
||||
21 | use Ynlo\GraphQLBundle\Subscription\PubSub\SubscriptionMessage; |
||||
22 | use Ynlo\GraphQLBundle\Util\IDEncoder; |
||||
23 | |||||
/**
 * Entry point to register, publish and consume GraphQL subscriptions.
 *
 * Subscriptions are stored and delivered through a PubSubHandlerInterface; when a published
 * message matches a stored subscription, the HTTP request that created the subscription
 * is sent again with an extra signed "Subscription" header.
 */
class SubscriptionManager
{
    /**
     * Registry used to reach the endpoint holding the subscriptions resolvers map.
     *
     * @var DefinitionRegistry
     */
    protected $registry;

    /**
     * Backend used to store subscriptions and to publish/consume messages.
     *
     * @var PubSubHandlerInterface
     */
    protected $pubSubHandler;

    /**
     * Secret used as key to sign the subscription token.
     *
     * @var string
     */
    protected $secret;

    /**
     * SubscriptionManager constructor.
     *
     * @param DefinitionRegistry     $definitionRegistry
     * @param PubSubHandlerInterface $pubSubHandler
     * @param string                 $secret
     */
    public function __construct(DefinitionRegistry $definitionRegistry, PubSubHandlerInterface $pubSubHandler, string $secret)
    {
        $this->registry = $definitionRegistry;
        $this->pubSubHandler = $pubSubHandler;
        $this->secret = $secret;
    }

    /**
     * Get subscription handler
     *
     * @return PubSubHandlerInterface
     */
    public function handler(): PubSubHandlerInterface
    {
        return $this->pubSubHandler;
    }

    /**
     * Subscribe given request to given subscription,
     * when this subscription is dispatched this request will be executed
     *
     * Any NodeInterface instance found in the arguments is replaced by its encoded ID
     * before the subscription is handed to the pub/sub handler.
     *
     * @param string         $id       subscription identifier
     * @param string         $channel  channel to subscribe to
     * @param array          $args     arguments stored with the subscription and later compared with published filters
     * @param Request        $request  request stored with the subscription
     * @param \DateTime|null $expireAt optional expiration, forwarded as is to the handler
     */
    public function subscribe($id, string $channel, $args, Request $request, ?\DateTime $expireAt = null): void
    {
        $this->convertNodes($args);

        $meta = [
            'channel' => $channel,
            'arguments' => $args,
            'request' => $request,
        ];

        $this->pubSubHandler->sub($channel, $id, $meta, $expireAt);
    }

    /**
     * Publish a message to the given subscription.
     *
     * @param string $name    subscription name or class
     * @param array  $filters array of filters to compare with subscriptions
     * @param array  $data    data to submit to the subscription
     *
     * @throws \RuntimeException when $data contains an object at any depth
     */
    public function publish(string $name, array $filters = [], array $data = []): void
    {
        // the resolvers map is flipped in order to translate a resolver value into its subscription name
        $resolvers = array_flip($this->registry->getEndpoint()->getSubscriptionsResolvers());
        if (isset($resolvers[$name])) {
            $name = $resolvers[$name];
        }

        $this->convertNodes($filters);

        // objects are rejected up front; nested arrays are inspected recursively
        array_walk_recursive(
            $data,
            static function ($value, $key): void {
                if (is_object($value)) {
                    throw new \RuntimeException(
                        sprintf('The object "%s" in key "%s" can\'t be part of publish data, only scalar values can be sent.', get_class($value), $key)
                    );
                }
            }
        );

        $this->pubSubHandler->pub($name, $filters, $data);
    }

    /**
     * Consume messages of all registered subscription channels and
     * send the stored request of every subscription matching the message.
     *
     * @param OutputInterface $output output used to report progress
     * @param bool            $debug  when true the raw response of each request is written to the output
     */
    public function consume(OutputInterface $output, $debug = false): void
    {
        $channels = array_keys($this->registry->getEndpoint()->getSubscriptionsResolvers());

        $this->pubSubHandler->consume(
            $channels,
            function (SubscriptionMessage $message) use ($output, $debug) {
                $meta = $message->getMeta();

                /** @var Request|null $request */
                $request = $meta['request'] ?? null;
                $subscribedFilters = $meta['arguments'] ?? [];
                $subscribedChannel = $meta['channel'] ?? null;

                if (!$request || $subscribedChannel !== $message->getChannel()) {
                    return;
                }

                if (!$this->matchFilters($subscribedFilters, $message->getFilters())) {
                    return;
                }

                $output->writeln(sprintf('[INFO] Process subscription "%s" of channel "%s"', $message->getId(), $message->getChannel()));
                $this->sendRequest($request, $message, $output, $debug);
            }
        );
    }

    /**
     * Check if the published filters satisfy the subscribed arguments.
     *
     * Only properties present (and not null) in $filters are compared. A subscribed value matches
     * when it is identical to the filter value or, being an array, contains the filter value
     * (strict comparison in both cases).
     *
     * @param array $subscribed arguments stored with the subscription
     * @param array $filters    filters of the published message
     *
     * @return bool
     */
    private function matchFilters(array $subscribed, array $filters): bool
    {
        foreach ($subscribed as $subProperty => $subValue) {
            if (!isset($filters[$subProperty])) {
                continue;
            }

            $filterValue = $filters[$subProperty];

            if (is_array($subValue) && in_array($filterValue, $subValue, true)) {
                continue;
            }

            if ($subValue !== $filterValue) {
                return false;
            }
        }

        return true;
    }

    /**
     * Convert nodes to ID
     *
     * Every NodeInterface instance found at any depth is replaced in place by IDEncoder::encode().
     *
     * @param array $data array modified by reference
     */
    private function convertNodes(array &$data): void
    {
        array_walk_recursive(
            $data,
            static function (&$value): void {
                if ($value instanceof NodeInterface) {
                    $value = IDEncoder::encode($value);
                }
            }
        );
    }

    /**
     * Send a subscription request
     *
     * Sends again the body of the origin request as a POST to the same host and path, adding a
     * "Subscription" header with a signed token that carries the message id and serialized data.
     * If the connection can't be established an error is written to the output and nothing is sent.
     *
     * @param Request             $originRequest request stored when the subscription was created
     * @param SubscriptionMessage $message       message being delivered
     * @param OutputInterface     $output        output used to report the result
     * @param bool                $debug         when true every received line is written to the output
     */
    private function sendRequest(Request $originRequest, SubscriptionMessage $message, OutputInterface $output, bool $debug = false): void
    {
        // TODO: execute this code ASYNC in order to send multiple request at the same time
        // call fsockopen and not wait for response does not works as expected and sometimes does not trigger the subscription

        $host = $originRequest->getHost();
        $secure = $originRequest->isSecure();
        // getPort() may return null or a string, fallback to the default port of the scheme
        $port = (int) ($originRequest->getPort() ?: ($secure ? 443 : 80));
        $path = $originRequest->getPathInfo();

        // the payload is built before opening the socket, then a failure here can't leak the connection
        $now = time();
        $signer = new Sha256();
        // NOTE(review): "not before" and "expires at" are both set 60 seconds ahead, a receiver
        // validating the "nbf" claim would never accept this token - confirm this is intended
        $subscriptionToken = (new Builder())->identifiedBy($message->getId())
                                            ->withClaim('data', serialize($message->getData()))
                                            ->issuedAt($now)
                                            ->canOnlyBeUsedAfter($now + 60)
                                            ->expiresAt($now + 60)
                                            ->getToken($signer, new Key($this->secret));

        $body = $originRequest->getContent();
        $length = strlen($body);
        $auth = $originRequest->headers->get('Authorization');

        $out = "POST $path HTTP/1.1\r\n";
        $out .= "Host: $host\r\n";
        $out .= "Authorization: $auth\r\n";
        $out .= "Subscription: $subscriptionToken\r\n";
        $out .= "Content-Length: $length\r\n";
        $out .= "Content-Type: application/json\r\n";
        $out .= "Connection: Close\r\n\r\n";
        $out .= $body;

        $handle = fsockopen($secure ? 'ssl://'.$host : $host, $port, $errno, $errstr, 10);
        if (false === $handle) {
            // fsockopen returns false on failure, the handle can't be used with fwrite/fgets/fclose
            $output->writeln(
                sprintf(
                    '[ERROR] Unable to connect to "%s:%s" for subscription %s: %s (%s)',
                    $host,
                    $port,
                    $message->getId(),
                    $errstr,
                    $errno
                )
            );

            return;
        }

        try {
            fwrite($handle, $out);

            $emptyResponse = true;
            // compare with false explicitly, a line like "0" is a valid content
            while (false !== ($buffer = fgets($handle))) {
                $emptyResponse = false;
                if ($debug) {
                    $output->write($buffer);
                }
            }

            if ($emptyResponse) {
                $output->writeln(sprintf('[INFO] Empty response for subscription %s', $message->getId()));
            } else {
                $output->writeln(sprintf('[INFO] Response received successfully for subscription %s', $message->getId()));
            }
        } finally {
            // always release the connection, even when writing to the output fails
            fclose($handle);
        }
    }
}
||||
239 |