PdoQueueStoreAdapter::ack()   A
last analyzed

Complexity

Conditions 3
Paths 3

Size

Total Lines 18
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 3

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
eloc 12
c 1
b 0
f 0
nc 3
nop 1
dl 0
loc 18
ccs 4
cts 4
cp 1
crap 3
rs 9.8666
1
<?php
2
3
namespace Da\Mailer\Queue\Backend\Pdo;
4
5
use Da\Mailer\Exception\InvalidCallException;
6
use Da\Mailer\Queue\Backend\MailJobInterface;
7
use Da\Mailer\Queue\Backend\QueueStoreAdapterInterface;
8
use PDO;
9
10
class PdoQueueStoreAdapter implements QueueStoreAdapterInterface
11
{
12
    /**
13
     * @var string the name of the table to store the messages. If not created, please use
14
     */
15
    private $tableName;
16
/**
17
     * @var PdoQueueStoreConnection
18
     */
19
    protected $connection;
20
/**
21
     * PdoQueueStoreAdapter constructor.
22
     *
23
     * @param PdoQueueStoreConnection $connection
24
     * @param string $tableName the name of the table in the database where the mail jobs are stored
25
     */
26 5
    public function __construct(PdoQueueStoreConnection $connection, $tableName = 'mail_queue')
27
    {
28 5
        $this->connection = $connection;
29 5
        $this->tableName = $tableName;
30 5
        $this->init();
31 5
    }
32
33
    /**
34
     * @return PdoQueueStoreAdapter|QueueStoreAdapterInterface
35
     */
36 5
    public function init()
37
    {
38 5
        $this->getConnection()->connect();
39
        return $this;
40 5
    }
41
42
    /**
43
     * @return PdoQueueStoreConnection
44
     */
45
    public function getConnection()
46 5
    {
47
        return $this->connection;
48 5
    }
49
50
    /**
51
     * Adds a MailJob to the queue.
52
     *
53
     * @param MailJobInterface|PdoMailJob $mailJob
54
     *
55
     * @return bool whether it has been successfully inserted or not
56
     */
57
    public function enqueue(MailJobInterface $mailJob)
58 4
    {
59
        $sql = sprintf('INSERT INTO `%s` (`message`, `timeToSend`) VALUES (:message, :timeToSend)', $this->tableName);
60 4
        $query = $this->getConnection()->getInstance()->prepare($sql);
61 4
        $query->bindValue(':message', $mailJob->getMessage());
62 4
        $query->bindValue(':timeToSend', $mailJob->getTimeToSend());
0 ignored issues
show
Bug introduced by
The method getTimeToSend() does not exist on Da\Mailer\Queue\Backend\MailJobInterface. It seems like you code against a sub-type of Da\Mailer\Queue\Backend\MailJobInterface such as Da\Mailer\Queue\Backend\Pdo\PdoMailJob or Da\Mailer\Queue\Backend\...talkd\BeanstalkdMailJob or Da\Mailer\Queue\Backend\Redis\RedisMailJob. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

62
        $query->bindValue(':timeToSend', $mailJob->/** @scrutinizer ignore-call */ getTimeToSend());
Loading history...
63 4
        return $query->execute();
64 4
    }
65 4
66 4
    /**
67
     * Returns a MailJob extracted from the database. The row at the database is marked as 'A'ctive or in process.
68 4
     *
69
     * @return MailJobInterface|PdoMailJob
70
     */
71
    public function dequeue()
72
    {
73
        $this->getConnection()->getInstance()->beginTransaction();
74
        $mailJob = null;
75
        $sqlText = 'SELECT `id`, `message`, `attempt`
76 4
            FROM `%s` WHERE `timeToSend` <= :timeToSend AND `state`=:state
77
            ORDER BY id ASC LIMIT 1 FOR UPDATE';
78 4
        $sql = sprintf($sqlText, $this->tableName);
79
        $query = $this->getConnection()->getInstance()->prepare($sql);
80 4
        $query->bindValue(':state', PdoMailJob::STATE_NEW);
81
        $query->bindValue(':timeToSend', date('Y-m-d H:i:s'), time());
82
        $query->execute();
83 4
        $queryResult = $query->fetch(PDO::FETCH_ASSOC);
84 4
        if ($queryResult) {
85 4
        //
86
            $sqlText = 'UPDATE `%s` SET `state`=:state WHERE `id`=:id';
87 4
            $sql = sprintf($sqlText, $this->tableName);
88 4
            $query = $this->getConnection()->getInstance()->prepare($sql);
89 4
            $query->bindValue(':state', PdoMailJob::STATE_ACTIVE);
90
            $query->bindValue(':id', $queryResult['id'], PDO::PARAM_INT);
91 4
            $query->execute();
92
            $mailJob = new PdoMailJob($queryResult);
93 4
        }
94 4
95 4
        $this->getConnection()->getInstance()->commit();
96 4
        return $mailJob;
97 4
    }
98 4
99
    /**
100 4
     * 'Ack'knowledge the MailJob. Once a MailJob as been processed it could be:.
101 4
     *
102
     * - Updated its status to 'C'ompleted
103 4
     * - Updated its status to 'N'ew and set its `timeToSend` attribute to a future date
104
     *
105 4
     * @param MailJobInterface|PdoMailJob $mailJob
106
     *
107
     * @return bool
108
     */
109
    public function ack(MailJobInterface $mailJob)
110
    {
111
        if ($mailJob->isNewRecord()) {
112
            throw new InvalidCallException('PdoMailJob cannot be a new object to be acknowledged');
113
        }
114
115
        $sqlText = 'UPDATE `%s`
116
                SET `attempt`=:attempt, `state`=:state, `timeToSend`=:timeToSend, `sentTime`=:sentTime
117
                WHERE `id`=:id';
118 4
        $sql = sprintf($sqlText, $this->tableName);
119
        $sentTime = $mailJob->isCompleted() ? date('Y-m-d H:i:s', time()) : null;
120 4
        $query = $this->getConnection()->getInstance()->prepare($sql);
121 1
        $query->bindValue(':id', $mailJob->getId(), PDO::PARAM_INT);
0 ignored issues
show
Bug introduced by
The method getId() does not exist on Da\Mailer\Queue\Backend\MailJobInterface. Since it exists in all sub-types, consider adding an abstract or default implementation to Da\Mailer\Queue\Backend\MailJobInterface. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

121
        $query->bindValue(':id', $mailJob->/** @scrutinizer ignore-call */ getId(), PDO::PARAM_INT);
Loading history...
122
        $query->bindValue(':attempt', $mailJob->getAttempt(), PDO::PARAM_INT);
0 ignored issues
show
Bug introduced by
The method getAttempt() does not exist on Da\Mailer\Queue\Backend\MailJobInterface. Since it exists in all sub-types, consider adding an abstract or default implementation to Da\Mailer\Queue\Backend\MailJobInterface. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

122
        $query->bindValue(':attempt', $mailJob->/** @scrutinizer ignore-call */ getAttempt(), PDO::PARAM_INT);
Loading history...
123
        $query->bindValue(':state', $mailJob->getState());
0 ignored issues
show
Bug introduced by
The method getState() does not exist on Da\Mailer\Queue\Backend\MailJobInterface. It seems like you code against a sub-type of Da\Mailer\Queue\Backend\MailJobInterface such as Da\Mailer\Queue\Backend\Pdo\PdoMailJob. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

123
        $query->bindValue(':state', $mailJob->/** @scrutinizer ignore-call */ getState());
Loading history...
124
        $query->bindValue(':timeToSend', $mailJob->getTimeToSend());
125
        $query->bindValue(':sentTime', $sentTime);
126 3
        return $query->execute();
127 3
    }
128 3
129 3
    /**
130
     * {@inheritdoc}
131 3
     */
132 3
    public function isEmpty()
133 3
    {
134 3
        $sql = sprintf('SELECT COUNT(`id`) FROM `%s` WHERE `timeToSend` <= :timeToSend AND `state`=:state ORDER BY id ASC LIMIT 1', $this->tableName);
135 3
        $query = $this->getConnection()->getInstance()->prepare($sql);
136
        $query->bindValue(':state', PdoMailJob::STATE_NEW);
137 3
        $query->bindValue(':timeToSend', date('Y-m-d H:i:s'), time());
138
        $query->execute();
139
        return intval($query->fetchColumn(0)) === 0;
140
    }
141
}
142