Issues (50)

src/MQueue/Queue/Standard.php (1 issue)

Labels
Severity
1
<?php
2
3
/**
4
 * @license LGPLv3, https://opensource.org/licenses/LGPL-3.0
5
 * @copyright Aimeos (aimeos.org), 2016-2025
6
 * @package Base
7
 * @subpackage MQueue
8
 */
9
10
11
namespace Aimeos\Base\MQueue\Queue;
12
13
14
/**
 * Default queue implementation
 *
 * Adds, reserves, fetches and deletes messages of one named queue by running
 * the SQL statements handed to the constructor on the given database connection.
 *
 * @package Base
 * @subpackage MQueue
 */
class Standard implements Iface
{
	/** Database connection all statements are created on */
	private \Aimeos\Base\DB\Connection\Iface $conn;

	/** Name of this queue client, bound into the insert, reserve and get statements */
	private string $cname;

	/** Name of the message queue the statements are restricted to */
	private string $queue;

	/** SQL statements indexed by "insert", "reserve", "get" and "delete" */
	private array $sql;

	/** Number of seconds added to the current time when reserving a message */
	private int $rtime;


	/**
	 * Initializes the queue object
	 *
	 * @param \Aimeos\Base\DB\Connection\Iface $conn Database connection
	 * @param string $queue Message queue name
	 * @param string[] $sql Associative list of SQL statements as key/value pairs for insert/reserve/get/delete
	 * @param int $rtime Time before the job is released again in seconds
	 */
	public function __construct( \Aimeos\Base\DB\Connection\Iface $conn, string $queue, array $sql, int $rtime )
	{
		// 32 hex characters like the former md5() based name, but taken from the
		// CSPRNG so that workers with equal process IDs started at the same moment
		// (e.g. in containers) don't end up with the same client name
		$this->cname = bin2hex( random_bytes( 16 ) );
		$this->conn = $conn;
		$this->queue = $queue;
		$this->sql = $sql;
		$this->rtime = $rtime;
	}


	/**
	 * Adds a new message to the message queue
	 *
	 * @param string $msg Message, e.g. JSON encoded data
	 * @return \Aimeos\Base\MQueue\Queue\Iface MQueue queue instance for method chaining
	 * @throws \Aimeos\Base\MQueue\Exception If the message can't be stored, the cause is available via getPrevious()
	 */
	public function add( string $msg ) : \Aimeos\Base\MQueue\Queue\Iface
	{
		try
		{
			$stmt = $this->conn->create( $this->sql['insert'] );

			$stmt->bind( 1, $this->queue );
			$stmt->bind( 2, $this->cname );
			// NOTE(review): presumably the initial release time, chosen far in the past
			// so the message is available immediately - confirm against the SQL statement
			$stmt->bind( 3, '0001-01-01 00:00:00' );
			$stmt->bind( 4, $msg );

			$stmt->execute()->finish();
		}
		catch( \Exception $e )
		{
			// Keep the original exception attached so its type and trace aren't lost
			throw new \Aimeos\Base\MQueue\Exception( $e->getMessage(), 0, $e );
		}

		return $this;
	}


	/**
	 * Removes the message from the queue
	 *
	 * NOTE(review): getId() is called on the message although the parameter is
	 * only typed against the message interface. Static analysis reports that the
	 * method isn't declared there, so every implementation passed in must provide
	 * it - consider declaring it in the interface.
	 *
	 * @param \Aimeos\Base\MQueue\Message\Iface $msg Message object
	 * @return \Aimeos\Base\MQueue\Queue\Iface MQueue queue instance for method chaining
	 * @throws \Aimeos\Base\MQueue\Exception If the message can't be removed, the cause is available via getPrevious()
	 */
	public function del( \Aimeos\Base\MQueue\Message\Iface $msg ) : \Aimeos\Base\MQueue\Queue\Iface
	{
		try
		{
			$stmt = $this->conn->create( $this->sql['delete'] );

			$stmt->bind( 1, $msg->getId(), \Aimeos\Base\DB\Statement\Base::PARAM_INT );
			$stmt->bind( 2, $this->queue );

			$stmt->execute()->finish();
		}
		catch( \Exception $e )
		{
			// Keep the original exception attached so its type and trace aren't lost
			throw new \Aimeos\Base\MQueue\Exception( $e->getMessage(), 0, $e );
		}

		return $this;
	}


	/**
	 * Returns the next message from the queue
	 *
	 * Runs the "reserve" statement first and the "get" statement afterwards, both
	 * with this client's name and the same release time (now plus the configured
	 * number of seconds).
	 *
	 * @return \Aimeos\Base\MQueue\Message\Iface|null Message object or null if none is available
	 * @throws \Aimeos\Base\MQueue\Exception If reserving or fetching fails, the cause is available via getPrevious()
	 */
	public function get() : ?\Aimeos\Base\MQueue\Message\Iface
	{
		$msg = null;

		try
		{
			// Computed once so the reserve and the get statement use the identical value
			$rtime = date( 'Y-m-d H:i:s', time() + $this->rtime );
			$stmt = $this->conn->create( $this->sql['reserve'] );

			$stmt->bind( 1, $this->cname );
			$stmt->bind( 2, $rtime );
			$stmt->bind( 3, $this->queue );
			$stmt->bind( 4, $rtime );

			$stmt->execute()->finish();


			$stmt = $this->conn->create( $this->sql['get'] );

			$stmt->bind( 1, $this->queue );
			$stmt->bind( 2, $this->cname );
			$stmt->bind( 3, $rtime );

			$result = $stmt->execute();

			try
			{
				if( ( $row = $result->fetch() ) !== null ) {
					$msg = new \Aimeos\Base\MQueue\Message\Standard( $row );
				}
			}
			finally
			{
				// Release the result set even if fetching or creating the message failed
				$result->finish();
			}
		}
		catch( \Exception $e )
		{
			// Keep the original exception attached so its type and trace aren't lost
			throw new \Aimeos\Base\MQueue\Exception( $e->getMessage(), 0, $e );
		}

		return $msg;
	}
}
145