Passed
Push — master ( f67228...ea1bad )
by Matthew
08:44
created

JobManagerTest::testSetupChannel()   B

Complexity

Conditions 5
Paths 16

Size

Total Lines 43
Code Lines 34

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 43
rs 8.439
c 0
b 0
f 0
cc 5
eloc 34
nc 16
nop 0
1
<?php
2
3
namespace Dtc\QueueBundle\Tests\RabbitMQ;
4
5
use Dtc\QueueBundle\Exception\UnsupportedException;
6
use Dtc\QueueBundle\Model\Job;
7
use Dtc\QueueBundle\Model\JobTiming;
8
use Dtc\QueueBundle\Manager\JobTimingManager;
9
use Dtc\QueueBundle\Model\Run;
10
use Dtc\QueueBundle\Manager\RunManager;
11
use Dtc\QueueBundle\RabbitMQ\JobManager;
12
use Dtc\QueueBundle\Tests\FibonacciWorker;
13
use Dtc\QueueBundle\Tests\Manager\AutoRetryTrait;
14
use Dtc\QueueBundle\Tests\Manager\BaseJobManagerTest;
15
use Dtc\QueueBundle\Tests\Manager\PriorityTestTrait;
16
use Dtc\QueueBundle\Tests\Manager\RetryableTrait;
17
use Dtc\QueueBundle\Tests\Manager\SaveJobTrait;
18
use PhpAmqpLib\Connection\AMQPStreamConnection;
19
20
/**
 * @author David
 *
 * This test requires a local RabbitMQ server running. The host is read from the
 * RABBIT_MQ_HOST environment variable; port 5672, user/password "guest" and
 * vhost "/" are hard-coded in setUpBeforeClass().
 */
class JobManagerTest extends BaseJobManagerTest
{
    use PriorityTestTrait;
    use AutoRetryTrait;
    use RetryableTrait;
    use SaveJobTrait;

    /**
     * AMQP connection opened in setUpBeforeClass() and reused by the tests.
     *
     * @var AMQPStreamConnection
     */
    public static $connection;

    /**
     * Connects to RabbitMQ, builds the managers and worker used by the tests,
     * and makes sure the 'dtc_queue' queue starts out empty.
     */
    public static function setUpBeforeClass()
    {
        $host = getenv('RABBIT_MQ_HOST');
        $port = 5672;
        $user = 'guest';
        $pass = 'guest';
        $vhost = '/';
        $jobTimingClass = JobTiming::class;
        $runClass = Run::class;
        self::$connection = new AMQPStreamConnection($host, $port, $user, $pass, $vhost);

        self::$jobTimingManager = new JobTimingManager($jobTimingClass, false);
        self::$runManager = new RunManager($runClass);
        self::$jobManager = new JobManager(self::$runManager, self::$jobTimingManager, \Dtc\QueueBundle\RabbitMQ\Job::class);
        self::$jobManager->setAMQPConnection(self::$connection);
        self::$jobManager->setMaxPriority(255);
        self::$jobManager->setQueueArgs('dtc_queue', false, true, false, false);
        self::$jobManager->setExchangeArgs('dtc_queue_exchange', 'direct', false, true, false);

        // Delete any pre-existing queue on a throw-away channel before the
        // manager sets up its own channel.
        $channel = self::$connection->channel();
        $channel->queue_delete('dtc_queue');
        $channel->close();

        self::$worker = new FibonacciWorker();
        self::$jobManager->setupChannel();

        // Remove whatever is still sitting on the queue and report how much there was.
        $channel = self::$jobManager->getChannel();
        $drained = self::drainQueue($channel);

        if ($drained) {
            echo "\nRabbitMQ - drained $drained prior to start of test";
        }
        parent::setUpBeforeClass();
    }

    /**
     * Runs $operation and asserts that it throws an instance of $exceptionClass.
     *
     * The assertion is made after the try/catch, so a failure raised here can
     * never be swallowed by the catch block that detects the expected exception.
     *
     * @param callable $operation      code that is expected to throw
     * @param string   $exceptionClass class (or parent class) of the expected exception
     * @param string   $message        failure message used when nothing is thrown
     *
     * @throws \Exception any exception from $operation that is not an instance of $exceptionClass
     */
    private static function assertOperationThrows(callable $operation, $exceptionClass = \Exception::class, $message = 'Expected an exception to be thrown')
    {
        $thrown = false;
        try {
            call_user_func($operation);
        } catch (\Exception $exception) {
            if (!$exception instanceof $exceptionClass) {
                // Not the failure we are looking for: let it surface as a test error.
                throw $exception;
            }
            $thrown = true;
        }

        self::assertTrue($thrown, $message);
    }

    /**
     * Constructing a JobManager must not throw.
     */
    public function testConstructor()
    {
        $test = null;
        try {
            $test = new JobManager(self::$runManager, self::$jobTimingManager, Job::class);
        } catch (\Exception $exception) {
            self::fail("shouldn't get here");
        }
        self::assertNotNull($test);
    }

    /**
     * Walks a fresh JobManager through its configuration steps and checks that
     * setupChannel() / setQueueArgs() throw until the required settings are in place.
     */
    public function testSetupChannel()
    {
        $jobManager = new JobManager(self::$runManager, self::$jobTimingManager, Job::class);

        $setupChannel = function () use ($jobManager) {
            $jobManager->setupChannel();
        };
        $setQueueArgs = function () use ($jobManager) {
            $jobManager->setQueueArgs('dtc_queue', false, true, false, false);
        };

        // Nothing has been configured on the manager yet.
        self::assertOperationThrows($setupChannel, \Exception::class, 'setupChannel() should throw on an unconfigured manager');

        // setMaxPriority() has not been called yet.
        self::assertOperationThrows($setQueueArgs, \Exception::class, 'setQueueArgs() should throw before setMaxPriority() is called');

        // A non-numeric max priority must be rejected as well.
        $jobManager->setMaxPriority('asdf');
        self::assertOperationThrows($setQueueArgs, \Exception::class, "setQueueArgs() should throw with max priority 'asdf'");

        // With a valid max priority the queue args are accepted ...
        $jobManager->setMaxPriority(255);
        $jobManager->setQueueArgs('dtc_queue', false, true, false, false);

        // ... but exchange args and connection have not been set at this point.
        self::assertOperationThrows($setupChannel, \Exception::class, 'setupChannel() should throw before exchange args and connection are set');

        // Fully configured: setupChannel() must now succeed.
        $jobManager->setExchangeArgs('dtc_queue_exchange', 'direct', false, true, false);
        $jobManager->setAMQPConnection(self::$connection);
        $jobManager->setupChannel();
        self::assertTrue(true);
    }

    /**
     * RabbitMQ has no ability to delete specific messages off the queue,
     * so deleteJob() is expected to throw.
     */
    public function testDeleteJob()
    {
        $job = new self::$jobClass(self::$worker, false, null);
        $job->fibonacci(1);
        self::assertNotNull($job->getId(), 'Job id should be generated');

        self::assertOperationThrows(function () use ($job) {
            self::$jobManager->deleteJob($job);
        }, \Exception::class, 'expected exception');

        // NOTE(review): presumably fetches the job created above back off the
        // queue so it does not leak into later tests - confirm.
        self::$jobManager->getJob();
    }

    /**
     * getJob() with a worker name is expected to throw for this manager.
     */
    public function testGetJobByWorker()
    {
        self::assertOperationThrows(function () {
            self::$jobManager->getJob(self::$worker->getName());
        }, \Exception::class, 'getJob() with a worker name should throw');
    }

    /**
     * Jobs whose expiry time is already in the past must not be returned by
     * getJob(); the next non-expired job must be returned instead.
     */
    public function testExpiredJob()
    {
        // A single expired job: getJob() must come back empty.
        $job = new self::$jobClass(self::$worker, false, null);
        $time = time() - 1;
        $job->setExpiresAt(new \DateTime("@$time"))->fibonacci(1);
        self::assertNotNull($job->getId(), 'Job id should be generated');

        $jobInQueue = self::$jobManager->getJob();
        self::assertNull($jobInQueue, 'The job should have been dropped...');

        // Several expired jobs followed by one job without an expiry.
        foreach ([1, 5, 2] as $fibonacciArgument) {
            $job = new self::$jobClass(self::$worker, false, null);
            $time = time() - 1;
            $job->setExpiresAt(new \DateTime("@$time"))->fibonacci($fibonacciArgument);
        }

        $job = new self::$jobClass(self::$worker, false, null);
        $job->fibonacci(1);
        $jobInQueue = self::$jobManager->getJob();
        self::assertNotNull($jobInQueue, 'There should be a job.');
        self::assertEquals(
            $job->getId(),
            $jobInQueue->getId(),
            'Job id returned by manager should be the same'
        );
    }

    /**
     * Fetches and acknowledges messages from 'dtc_queue' until basic_get()
     * returns nothing.
     *
     * @param object $channel channel object providing basic_get() and basic_ack()
     *
     * @return int number of messages that were acknowledged
     */
    protected static function drainQueue($channel)
    {
        $drained = 0;
        while ($message = $channel->basic_get('dtc_queue')) {
            $channel->basic_ack($message->delivery_info['delivery_tag']);
            ++$drained;
        }

        return $drained;
    }

    /**
     * The waiting-job count must follow the number of queued jobs, and
     * filtering the count by the extra arguments must raise UnsupportedException.
     */
    public function testGetWaitingJobCount()
    {
        /** @var JobManager $jobManager */
        $jobManager = self::$jobManager;
        self::drainQueue($jobManager->getChannel());

        $job = new self::$jobClass(self::$worker, false, null);
        $job->fibonacci(1);
        self::assertNotNull($job->getId(), 'Job id should be generated');

        self::assertEquals(1, self::getWaitingJobCount(2));

        // Each argument list is passed through unchanged (same arity as a direct call).
        $unsupportedArgumentLists = [
            ['fibonacci'],
            [null, 'fibonacci'],
            ['fibonacci', 'fibonacci'],
        ];
        foreach ($unsupportedArgumentLists as $arguments) {
            self::assertOperationThrows(function () use ($jobManager, $arguments) {
                call_user_func_array([$jobManager, 'getWaitingJobCount'], $arguments);
            }, UnsupportedException::class, 'getWaitingJobCount() with filter arguments should throw UnsupportedException');
        }

        $job = new self::$jobClass(self::$worker, false, null);
        $job->fibonacci(1);
        self::assertNotNull($job->getId(), 'Job id should be generated');

        self::assertEquals(2, self::getWaitingJobCount(2));
    }
}
243