1
|
|
|
<?php |
2
|
|
|
/** |
3
|
|
|
* |
4
|
|
|
* @copyright 2015 LibreWorks contributors |
5
|
|
|
* @license http://opensource.org/licenses/MIT MIT License |
6
|
|
|
*/ |
7
|
|
|
|
8
|
|
|
/** |
9
|
|
|
* A Swift spool that uses new MongoDB Drivers (http://docs.php.net/set.mongodb) |
10
|
|
|
* |
11
|
|
|
* @copyright 2015 LibreWorks contributors |
12
|
|
|
* @license http://opensource.org/licenses/MIT MIT License |
13
|
|
|
*/ |
14
|
|
|
class Swift_MongoDbSpool extends Swift_ConfigurableSpool
{
    /**
     * Driver manager used for every query and bulk write.
     *
     * @var \MongoDB\Driver\Manager
     */
    private $manager;

    /**
     * Trimmed, non-blank collection namespace (e.g. "mydb.emails").
     *
     * @var string
     */
    private $collection;

    /**
     * Read preference handed to executeQuery(); null when none was supplied.
     *
     * @var \MongoDB\Driver\ReadPreference|null
     */
    private $rp;

    /**
     * Write concern handed to executeBulkWrite(); null when none was supplied.
     *
     * @var \MongoDB\Driver\WriteConcern|null
     */
    private $wc;

    /**
     * Delete options restricting a delete to a single document.
     *
     * @var array
     */
    private static $limit1 = array('limit' => 1);

    /**
     * Creates a new MongoDB spool.
     *
     * @param \MongoDB\Driver\Manager $manager The manager
     * @param string $collection The collection name (e.g. "mydb.emails")
     * @param \MongoDB\Driver\ReadPreference $rp Optional read preference
     * @param \MongoDB\Driver\WriteConcern $wc Optional write concern
     * @throws \InvalidArgumentException if the collection name is blank
     */
    public function __construct(\MongoDB\Driver\Manager $manager, $collection, \MongoDB\Driver\ReadPreference $rp = null, \MongoDB\Driver\WriteConcern $wc = null)
    {
        $this->manager = $manager;
        $this->collection = $this->checkBlank($collection);
        $this->rp = $rp;
        $this->wc = $wc;
    }

    /**
     * Trims a value and rejects it if nothing is left.
     *
     * @param string $value The value to check
     * @return string The trimmed value
     * @throws \InvalidArgumentException if the trimmed value is empty
     */
    private function checkBlank($value)
    {
        $value = trim($value);
        if (strlen($value) === 0) {
            throw new \InvalidArgumentException("This parameter cannot be blank");
        }
        return $value;
    }

    /**
     * Tests if this Spool mechanism has started.
     *
     * This spool has no start-up phase, so it always reports true.
     *
     * @return bool
     */
    public function isStarted()
    {
        return true;
    }

    /**
     * Starts this Spool mechanism (nothing to do for this spool).
     */
    public function start()
    {
    }

    /**
     * Stops this Spool mechanism (nothing to do for this spool).
     */
    public function stop()
    {
    }

    /**
     * Queues a message.
     *
     * The message is serialized with serialize() and inserted as a generic
     * BSON binary under the "message" field of a new document.
     *
     * @param Swift_Mime_SimpleMessage $message The message to store
     * @throws Swift_IoException if the insert fails
     * @return bool Always true when no exception is thrown
     */
    public function queueMessage(Swift_Mime_SimpleMessage $message)
    {
        $bulk = new \MongoDB\Driver\BulkWrite();
        $bulk->insert(array(
            'message' => new \MongoDB\BSON\Binary(serialize($message), \MongoDB\BSON\Binary::TYPE_GENERIC)
        ));
        $this->write($bulk);
        return true;
    }

    /**
     * Execute a recovery if for any reason a process is sending for too long.
     *
     * Every document whose "sentOn" is at least $timeout seconds old has its
     * "sentOn" reset to null, so flushQueue() selects it again.
     *
     * @param int $timeout Age in seconds after which a claimed message is released
     * @throws Swift_IoException if the update fails
     */
    public function recover($timeout = 900)
    {
        // now() takes a millisecond offset, hence the conversion from seconds.
        $cutoff = $this->now(0 - ($timeout * 1000));

        $bulk = new \MongoDB\Driver\BulkWrite();
        $bulk->update(
            array('sentOn' => array('$lte' => $cutoff)),
            array('$set' => array('sentOn' => null)),
            array('multi' => true)
        );
        $this->write($bulk);
    }

    /**
     * Sends messages using the given transport instance.
     *
     * Documents whose "sentOn" is null are fetched (bounded by the configured
     * message limit when it is positive). Each one is stamped with the current
     * time, sent, and then deleted. Documents without a usable payload are
     * skipped and left untouched.
     *
     * @param Swift_Transport $transport A transport instance
     * @param string[] $failedRecipients An array of failures by-reference
     * @throws Swift_IoException if a query or write fails
     * @return int The number of sent e-mails
     */
    public function flushQueue(Swift_Transport $transport, &$failedRecipients = null)
    {
        if (!$transport->isStarted()) {
            $transport->start();
        }

        $limit = $this->getMessageLimit();
        $results = $this->find(array('sentOn' => null), $limit > 0 ? array('limit' => $limit) : array());
        $failedRecipients = (array) $failedRecipients;
        $count = 0;
        $time = time();
        $timeLimit = $this->getTimeLimit();

        foreach ($results as $result) {
            if (!isset($result->message) || !($result->message instanceof \MongoDB\BSON\Binary)) {
                continue;
            }

            // NOTE(review): unserialize() instantiates whatever classes the stored
            // payload names, so the collection must only be writable by trusted code.
            $message = unserialize($result->message->getData());
            if (!($message instanceof Swift_Mime_SimpleMessage)) {
                // A corrupt payload is treated like a missing one. It is checked
                // before the document is claimed, so nothing is left half-processed.
                continue;
            }

            $id = $result->_id;
            // Stamp the document before sending; recover() releases it again if
            // this process never gets as far as the delete below.
            $this->setSentOn($id, $this->now());
            $count += $transport->send($message, $failedRecipients);
            $this->delete($id);

            if ($timeLimit && (time() - $time) >= $timeLimit) {
                break;
            }
        }

        return $count;
    }

    /**
     * Performs a query.
     *
     * @param array $query The query to perform
     * @param array $options Optional array of query options
     * @return \MongoDB\Driver\Cursor The results cursor
     * @throws Swift_IoException wrapping any exception raised while querying
     */
    protected function find(array $query, array $options = array())
    {
        try {
            $q = new \MongoDB\Driver\Query($query, $options);
            return $this->manager->executeQuery($this->collection, $q, $this->rp);
        } catch (\Exception $e) {
            throw new Swift_IoException("Could not query for emails", 0, $e);
        }
    }

    /**
     * Deletes a single record.
     *
     * @param \MongoDB\BSON\ObjectID $id The ID of the message to delete
     * @throws Swift_IoException if the delete fails
     */
    protected function delete(\MongoDB\BSON\ObjectID $id)
    {
        // Fully qualified like every other driver reference in this class, so the
        // name keeps resolving if the class is ever moved into a namespace.
        $bulk = new \MongoDB\Driver\BulkWrite();
        $bulk->delete(array('_id' => $id), self::$limit1);
        $this->write($bulk);
    }

    /**
     * Sets a single record's sentOn field.
     *
     * @param \MongoDB\BSON\ObjectID $id The ID of the message to update
     * @param \MongoDB\BSON\UTCDateTime $time The time (or null) to set
     * @return \MongoDB\Driver\WriteResult the write result
     * @throws Swift_IoException if the update fails
     */
    protected function setSentOn(\MongoDB\BSON\ObjectID $id, \MongoDB\BSON\UTCDateTime $time = null)
    {
        $bulk = new \MongoDB\Driver\BulkWrite();
        $bulk->update(array('_id' => $id), array('$set' => array('sentOn' => $time)));
        return $this->write($bulk);
    }

    /**
     * Executes a bulk write to MongoDB, wrapping exceptions.
     *
     * Used for inserts, updates and deletes alike, so the failure message is
     * deliberately not specific to one operation.
     *
     * @param \MongoDB\Driver\BulkWrite $bulk
     * @return \MongoDB\Driver\WriteResult the write result
     * @throws Swift_IoException if things go wrong
     */
    protected function write(\MongoDB\Driver\BulkWrite $bulk)
    {
        try {
            return $this->manager->executeBulkWrite($this->collection, $bulk, $this->wc);
        } catch (\Exception $e) {
            throw new Swift_IoException("Could not write to the email spool", 0, $e);
        }
    }

    /**
     * Gets the current date.
     *
     * @param int $offset Milliseconds offset added to the current time (may be negative)
     * @return \MongoDB\BSON\UTCDateTime the current date
     */
    protected function now($offset = 0)
    {
        // microtime(true) is in seconds; UTCDateTime is built from milliseconds.
        return new \MongoDB\BSON\UTCDateTime(microtime(true) * 1000 + $offset);
    }
}
225
|
|
|
|
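You can fix this by adding a namespace to your class, for example by placing `namespace YourVendor\YourPackage;` directly after the opening `<?php` tag.
When choosing a vendor namespace, try to pick something that is not too generic, to avoid conflicts with other libraries.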