1
|
|
|
<?php |
2
|
|
|
namespace Elgg\Queue; |
3
|
|
|
|
4
|
|
|
/**
 * FIFO queue that uses the database for persistence
 *
 * Every queued item is stored as one row of the "{prefix}queue" table holding
 * the queue name, the serialized item and the time it was queued. A row is
 * handed out by first stamping it with this instance's worker identifier and
 * then reading and deleting it.
 *
 * WARNING: API IN FLUX. DO NOT USE DIRECTLY.
 *
 * @access private
 *
 * @package    Elgg.Core
 * @subpackage Queue
 * @since      1.9.0
 */
class DatabaseQueue implements \Elgg\Queue\Queue {

	/** @var string Name of the queue */
	protected $name;

	/** @var \Elgg\Database Database adapter */
	protected $db;

	/** @var string The identifier of the worker pulling from the queue */
	protected $workerId;

	/**
	 * Create a queue
	 *
	 * The worker identifier is derived from the current microtime and the
	 * process id, so it is generated anew for every instance.
	 *
	 * @param string         $name Name of the queue. Must be less than 256 characters.
	 * @param \Elgg\Database $db   Database adapter
	 */
	public function __construct($name, \Elgg\Database $db) {
		$this->db = $db;
		$this->name = $name;
		$this->workerId = md5(microtime() . getmypid());
	}

	/**
	 * Add an item to the end of the queue
	 *
	 * The item is serialized and inserted as a new row together with the queue
	 * name and the current Unix timestamp.
	 *
	 * NOTE(review): values are embedded in the SQL text after going through
	 * the adapter's sanitizeString(); if \Elgg\Database supports bound
	 * parameters, prefer them - confirm against the adapter.
	 *
	 * @param mixed $item Item to store; it must be serializable
	 *
	 * @return bool False only when the adapter's insertData() returns false
	 */
	public function enqueue($item) {
		$prefix = $this->db->prefix;
		$name = $this->db->sanitizeString($this->name);
		$blob = $this->db->sanitizeString(serialize($item));
		$time = time();

		$query = "INSERT INTO {$prefix}queue"
			. " SET name = '$name', data = '$blob', timestamp = $time";

		return $this->db->insertData($query) !== false;
	}

	/**
	 * Remove and return the next item of the queue
	 *
	 * Works in three steps: claim the unclaimed row with the lowest id by
	 * writing this instance's worker identifier into it, read the claimed
	 * row, then delete it.
	 *
	 * @return mixed The unserialized item, or null when no row could be
	 *               claimed or the claimed row could not be read
	 */
	public function dequeue() {
		$prefix = $this->db->prefix;
		$name = $this->db->sanitizeString($this->name);
		$worker_id = $this->db->sanitizeString($this->workerId);

		// Claim exactly one row that no worker has taken yet. Doing this in a
		// single UPDATE statement is what decides which worker gets the row.
		$update = "UPDATE {$prefix}queue"
			. " SET worker = '$worker_id'"
			. " WHERE name = '$name' AND worker IS NULL"
			. " ORDER BY id ASC LIMIT 1";

		// The second argument asks the adapter for the number of updated rows.
		$num = $this->db->updateData($update, true);
		if ($num !== 1) {
			return null;
		}

		// Scope the lookup by queue name as well as worker, exactly like the
		// UPDATE above and the DELETE below: the worker identifier alone is
		// not guaranteed to be unique across queue instances.
		$select = "SELECT data FROM {$prefix}queue"
			. " WHERE name = '$name' AND worker = '$worker_id'";
		$obj = $this->db->getDataRow($select);
		if (!$obj) {
			return null;
		}

		// NOTE(review): unserialize() is only safe on trusted input; this
		// assumes rows are written exclusively by enqueue() - confirm.
		$data = unserialize($obj->data);

		$delete = "DELETE FROM {$prefix}queue"
			. " WHERE name = '$name' AND worker = '$worker_id'";
		$this->db->deleteData($delete);

		return $data;
	}

	/**
	 * Remove every item of this queue
	 *
	 * Deletes all rows stored under the queue name, whether or not a worker
	 * has claimed them.
	 *
	 * @return void
	 */
	public function clear() {
		$prefix = $this->db->prefix;
		$name = $this->db->sanitizeString($this->name);

		$this->db->deleteData("DELETE FROM {$prefix}queue WHERE name = '$name'");
	}

	/**
	 * Get the number of items in this queue
	 *
	 * Counts all rows stored under the queue name; the query does not filter
	 * on the worker column, so claimed rows are counted too.
	 *
	 * @return int Number of rows, or 0 when the count could not be read
	 */
	public function size() {
		$prefix = $this->db->prefix;
		$name = $this->db->sanitizeString($this->name);

		$result = $this->db->getDataRow("SELECT COUNT(id) AS total FROM {$prefix}queue WHERE name = '$name'");

		// isset() is false for a missing row (false/empty result) as well as
		// for a missing column, so a failed lookup reports an empty queue
		// instead of reading a property of a non-object.
		return isset($result->total) ? (int) $result->total : 0;
	}
}
102
|
|
|
|
103
|
|
|
|
This function has been deprecated. The supplier of the function has supplied an explanatory message.
The explanatory message should give you some clue as to whether and when the function will be removed and what other function to use instead.