| 1 |  |  | <?php | 
            
                                                                                                            
                            
            
                                    
            
            
                | 2 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 3 |  |  | namespace BrainExe\Core\MessageQueue; | 
            
                                                                                                            
                            
            
                                    
            
            
                | 4 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 5 |  |  | use BrainExe\Annotations\Annotations\Service; | 
            
                                                                                                            
                            
            
                                    
            
            
                | 6 |  |  | use BrainExe\Core\EventDispatcher\AbstractEvent; | 
            
                                                                                                            
                            
            
                                    
            
            
                | 7 |  |  | use BrainExe\Core\Traits\IdGeneratorTrait; | 
            
                                                                                                            
                            
            
                                    
            
            
                | 8 |  |  | use BrainExe\Core\Traits\RedisTrait; | 
            
                                                                                                            
                            
            
                                    
            
            
                | 9 |  |  | use BrainExe\Core\Traits\TimeTrait; | 
            
                                                                                                            
                            
            
                                    
            
            
                | 10 |  |  | use Generator; | 
            
                                                                                                            
                            
            
                                    
            
            
                | 11 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 12 |  |  | /** | 
            
                                                                                                            
                            
            
                                    
            
            
                | 13 |  |  |  * @api | 
            
                                                                                                            
                            
            
                                    
            
            
                | 14 |  |  |  * @Service("MessageQueue.Gateway", public=false) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 15 |  |  |  */ | 
            
                                                                                                            
                            
            
                                    
            
            
                | 16 |  |  | class Gateway | 
            
                                                                                                            
                            
            
                                    
            
            
                | 17 |  |  | { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 18 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 19 |  |  |     use TimeTrait; | 
            
                                                                                                            
                            
            
                                    
            
            
                | 20 |  |  |     use RedisTrait; | 
            
                                                                                                            
                            
            
                                    
            
            
                | 21 |  |  |     use IdGeneratorTrait; | 
            
                                                                                                            
                            
            
                                    
            
            
                | 22 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 23 |  |  |     const QUEUE_DELAYED   = 'message_queue:delayed'; | 
            
                                                                                                            
                            
            
                                    
            
            
                | 24 |  |  |     const QUEUE_IMMEDIATE = 'message_queue:immediate'; | 
            
                                                                                                            
                            
            
                                    
            
            
                | 25 |  |  |     const META_DATA       = 'message_queue:meta_data'; | 
            
                                                                                                            
                            
            
                                    
            
            
                | 26 |  |  |     const RETRY_TIME      = 3600; // try again after 1 hour | 
            
                                                                                                            
                            
            
                                    
            
            
                | 27 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                /**
                 * Removes a queued job from the delayed or the immediate queue.
                 *
                 * When $eventType is non-empty, the lookup key becomes "<eventType>:<eventId>";
                 * otherwise $eventId is used as-is.
                 *
                 * Lookup order:
                 *  1. The delayed sorted set is tried first (exact member match). On a hit the
                 *     matching field of the meta-data hash is deleted as well.
                 *  2. Otherwise entries 0..100 of the immediate list are scanned, and the first
                 *     entry whose job id (the part before '#') starts with the lookup key is
                 *     removed. Entries beyond that window are not inspected.
                 *
                 * @param string $eventId
                 * @param string $eventType optional prefix, joined to $eventId with ':'
                 * @return bool success - true when an entry was removed from either queue
                 */
                public function deleteEvent(string $eventId, string $eventType = null) : bool
                {
                    if (!empty($eventType)) {
                        $eventId = sprintf('%s:%s', $eventType, $eventId);
                    }

                    // One connection handle for the whole method instead of mixing
                    // getRedis() calls with direct property access.
                    $redis = $this->getRedis();

                    if ($redis->zrem(self::QUEUE_DELAYED, $eventId)) {
                        $redis->hdel(self::META_DATA, [$eventId]);

                        return true;
                    }

                    // An empty key is a prefix of every job id (strpos() returns 0 for an
                    // empty needle on PHP 8), which would remove an arbitrary job.
                    if ($eventId === '') {
                        return false;
                    }

                    $immediate = $redis->lrange(self::QUEUE_IMMEDIATE, 0, 100);
                    foreach ($immediate as $rawJob) {
                        // Immediate entries are stored as "<jobId>#<serialized job>".
                        list($jobId) = explode('#', $rawJob, 2);
                        if (strpos($jobId, $eventId) === 0) {
                            return (bool)$redis->lrem(self::QUEUE_IMMEDIATE, 1, $rawJob);
                        }
                    }

                    return false;
                }
            
                                                                                                            
                            
            
                                    
            
            
                | 56 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                /**
                 * Wraps an event into a Job with a freshly generated id and enqueues it.
                 *
                 * The job id has the form "<eventName>:<uniqueId>", where the unique part comes
                 * from generateUniqueId('jobid'). The job is then handed to addJob().
                 *
                 * @param AbstractEvent $event
                 * @param int $timestamp passed through to the Job unchanged
                 * @return string the generated job id
                 */
                public function addEvent(AbstractEvent $event, int $timestamp = 0)
                {
                    $randomId = $this->generateUniqueId('jobid');
                    $jobId    = sprintf('%s:%s', $event->eventName, $randomId);

                    $job = new Job($event, $jobId, $timestamp);

                    $this->addJob($job);

                    // The docblock has always promised a string; hand the id back to the caller.
                    return $jobId;
                }
            
                                                                                                            
                            
            
                                    
            
            
                | 71 | 2 |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                /**
                 * Stores a job in Redis through a fire-and-forget pipeline.
                 *
                 * The job is serialized and base64-encoded. A job with an empty timestamp is
                 * pushed onto the immediate list as "<jobId>#<payload>". Any other job has its
                 * payload written to the meta-data hash under its id and its id added to the
                 * delayed sorted set, scored by the integer timestamp.
                 *
                 * @param Job $job
                 */
                public function addJob(Job $job)
                {
                    $payload = base64_encode(serialize($job));
                    $pipe    = $this->getRedis()->pipeline(['fire-and-forget' => true]);

                    $isDelayed = !empty($job->timestamp);
                    if ($isDelayed) {
                        // delayed execution: payload goes to the hash, schedule to the sorted set
                        $pipe->hset(self::META_DATA, $job->jobId, $payload);
                        $pipe->zadd(self::QUEUE_DELAYED, (int)$job->timestamp, $job->jobId);
                    } else {
                        // immediate execution in background
                        $pipe->lpush(self::QUEUE_IMMEDIATE, sprintf('%s#%s', $job->jobId, $payload));
                    }

                    $pipe->execute();
                }
            
                                                                                                            
                            
            
                                    
            
            
                | 91 | 3 |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                /**
                 * Eagerly collects everything getEventsByTypeGenerator() yields into an array.
                 *
                 * Both arguments are forwarded unchanged. Keys yielded by the generator are
                 * kept (iterator_to_array() preserves keys by default).
                 *
                 * @param string $eventType
                 * @param int $since
                 * @return Job[]
                 */
                public function getEventsByType(string $eventType = null, int $since = 0) : array
                {
                    $jobs = $this->getEventsByTypeGenerator($eventType, $since);

                    return iterator_to_array($jobs);
                }
            
                                                                                                            
                            
            
                                    
            
            
                | 101 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 102 |  |  |     /** | 
            
                                                                                                            
                            
            
                                    
            
            
                | 103 |  |  |      * @param string $eventType | 
            
                                                                                                            
                            
            
                                    
            
            
                | 104 |  |  |      * @param int $since | 
            
                                                                                                            
                            
            
                                    
            
            
                | 105 |  |  |      * @return Generator|Job[] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 106 |  |  |      */ | 
            
                                                                                                            
                            
            
                                    
            
            
                public function getEventsByTypeGenerator(string $eventType = null, int $since = 0) : Generator
                {
                    // Read every delayed-queue member whose score is >= $since, together
                    // with its score (handled below as the job's timestamp).
                    $delayed = $this->getRedis()->zrangebyscore(
                        self::QUEUE_DELAYED,
                        $since,
                        '+inf',
                        ['withscores' => true]
                    );

                    // A job id matches when it starts with "<eventType>:"; an empty
                    // type disables the filter and keeps every delayed job.
                    $prefix = $eventType . ':';
                    $keys   = [];
                    foreach ($delayed as $jobId => $timestamp) {
                        if (empty($eventType) || strpos($jobId, $prefix) === 0) {
                            $keys[$jobId] = $timestamp;
                        }
                    }

                    yield from $this->getFromTimesQueue($keys);

                    // Hand the requested type to the immediate-queue reader as well, so the
                    // type restriction is not limited to delayed jobs.
                    yield from $this->getFromImmediateQueue($eventType);
                }
            
                                                                                                            
                            
            
                                    
            
            
                | 128 | 2 |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                /**
                 * Puts a job back into the queue for another attempt.
                 *
                 * The job's timestamp is moved to "now + RETRY_TIME", its error
                 * counter is incremented and the job is handed to addJob().
                 *
                 * @param Job $job
                 */
                public function restoreJob(Job $job)
                {
                    $retryAt = $this->now() + self::RETRY_TIME;

                    $job->timestamp = $retryAt;
                    ++$job->errorCounter;

                    $this->addJob($job);
                }
            
                                                                                                            
                            
            
                                    
            
            
                | 140 | 1 |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                /**
                 * Counts the jobs held in both queues.
                 *
                 * @return int size of the delayed sorted set plus length of the immediate list
                 */
                public function countAllJobs() : int
                {
                    $delayedCount   = $this->getRedis()->zcard(self::QUEUE_DELAYED);
                    $immediateCount = $this->getRedis()->llen(self::QUEUE_IMMEDIATE);

                    $total = $delayedCount + $immediateCount;

                    return $total;
                }
            
                                                                                                            
                            
            
                                    
            
            
                | 151 | 1 |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                /**
                 * Loads the stored jobs that belong to the given delayed-queue entries.
                 *
                 * @param array $keys map of job id => score; only the ids (array keys) are used
                 * @return Generator yields job id => Job; ids without decodable metadata are skipped
                 */
                private function getFromTimesQueue(array $keys) : Generator
                {
                    if (empty($keys)) {
                        return;
                    }

                    $rawJobs = $this->getRedis()->hmget(self::META_DATA, array_keys($keys));
                    foreach ($rawJobs as $rawJob) {
                        // A missing hash field comes back as a non-string (null): there is
                        // nothing to decode, e.g. the job vanished between the two reads.
                        if (!is_string($rawJob)) {
                            continue;
                        }

                        $job = unserialize(base64_decode($rawJob));
                        if (!($job instanceof Job)) {
                            // Corrupt or foreign payload: never yield a non-Job value.
                            continue;
                        }

                        yield $job->jobId => $job;
                    }
                }
            
                                                                                                            
                            
            
                                    
            
            
                | 167 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                /**
                 * Yields jobs waiting in the immediate queue.
                 *
                 * Only list positions 0..100 are read, so at most 101 jobs are returned.
                 *
                 * @param string|null $eventType when non-empty, only jobs whose id starts with
                 *                               "<eventType>:" are yielded; otherwise all are
                 * @return Generator yields job id => Job
                 */
                private function getFromImmediateQueue(string $eventType = null) : Generator
                {
                    $immediate = $this->getRedis()->lrange(self::QUEUE_IMMEDIATE, 0, 100);
                    foreach ($immediate as $entry) {
                        // Each list entry has the form "<jobId>#<encoded job>".
                        list($jobId, $rawJob) = explode('#', $entry, 2);

                        if (!empty($eventType) && strpos($jobId, $eventType . ':') !== 0) {
                            continue;
                        }

                        /** @var Job $job */
                        $job = unserialize(base64_decode($rawJob));
                        yield $job->jobId => $job;
                    }
                }
            
                                                                                                            
                                                                
            
                                    
            
            
                | 183 |  |  | } | 
            
                                                        
            
                                    
            
            
                | 184 |  |  |  | 
            
                        
It is generally considered best practice to use concatenation rather than interpolation for variables inside strings, as it is often more readable.