AbstractQueueTransport::is_consumer()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 3
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 0
1
<?php
2
3
    namespace NokitaKaze\Queue;
4
5
    abstract class AbstractQueueTransport implements QueueTransport {
6
        /**
7
         * @var iMessage[]|object[]
8
         */
9
        protected $_pushed_for_save = [];
10
11
        protected $_same_time_consumer_and_producer = 0;
12
13
        /**
14
         * @var callable|null
15
         */
16
        protected $_callback_closure = null;
17
18
        /**
19
         * @var integer[] В keys список индексов полученных сообщений
20
         */
21
        protected $_consumed_keys = [];
22
23
        /**
24
         * Удалить сообщение и сразу же записать это в БД
25
         *
26
         * @param iMessage|object $message
27
         *
28
         * @return boolean
29
         */
30
        function delete_message($message) {
31
            return !empty($this->delete_messages([$message]));
32
        }
33
34
        /**
35
         * Загоняем сообщение в очередь и явно сохраняем
36
         *
37
         * @param mixed       $data
38
         * @param string|null $name Название сообщения. Если null, то можно дублировать
39
         * @param integer     $sort
40
         *
41
         * @throws QueueException
42
         */
43
        function produce_message($data, $name = null, $sort = 5) {
44
            $this->push(Queue::build_message($data, $name, $sort));
45
            $this->save();
46
        }
47
48
        /**
49
         * Загоняем сообщение в очередь необходимых для сохранения сообщений
50
         *
51
         * @param iMessage[]|iMessage|object[]|object $stream
52
         *
53
         * @throws QueueException
54
         */
55
        function push($stream) {
56
            $this->set_same_time_flag(1);
57
            if (is_array($stream)) {
58
                foreach ($stream as &$message) {
59
                    $this->_pushed_for_save[] = Queue::sanify_event_object($message);
60
                }
61
            } else {
62
                $this->_pushed_for_save[] = Queue::sanify_event_object($stream);
63
            }
64
        }
65
66
        /**
67
         * @param integer $flag
68
         *
69
         * @throws QueueException
70
         */
71
        protected function set_same_time_flag($flag) {
72
            $this->_same_time_consumer_and_producer |= $flag;
73
            if ($this->_same_time_consumer_and_producer === 3) {
74
                throw new QueueException('Consumer and producer at the same time', 18);
75
            }
76
        }
77
78
        /**
79
         * @param callable $closure
80
         */
81
        function set_callback_closure($closure) {
82
            $this->_callback_closure = $closure;
83
        }
84
85
        /**
86
         * @param double|integer $wait_time
87
         *
88
         * @throws QueueException
89
         */
90
        function listen($wait_time = -1) {
91
            $start = microtime(true);
92
            if (is_null($this->_callback_closure)) {
93
                throw new QueueException('Event listener has not been set', 3);
94
            }
95
            $closure = $this->_callback_closure;
96
            do {
97
                $message = $this->consume_next_message($wait_time);
98
                if (!is_null($message)) {
99
                    call_user_func($closure, $message);
100
                } else {
101
                    usleep(10000);
102
                }
103
            } while (($wait_time === -1) or ($start + $wait_time >= microtime(true)));
104
        }
105
106
        function __clone() {
107
            foreach ($this->_pushed_for_save as &$message) {
108
                $message = clone $message;
109
            }
110
        }
111
112
        function clear_consumed_keys() {
113
            $this->_consumed_keys = [];
114
        }
115
116
        /**
117
         * @return boolean
118
         */
119
        function is_producer() {
120
            return (($this->_same_time_consumer_and_producer & 1) == 1);
121
        }
122
123
        /**
124
         * @return boolean
125
         */
126
        function is_consumer() {
127
            return (($this->_same_time_consumer_and_producer & 2) == 2);
128
        }
129
130
        /**
131
         * @param iMessage|object $message
132
         *
133
         * @return string
134
         */
135
        static function get_real_key_for_message($message) {
136
            return Queue::get_real_key_for_message($message);
137
        }
138
139
        /**
140
         * @param iMessage[] $messages
141
         * @param iMessage   $message
142
         * @param string     $message_key
143
         *
144
         * @return boolean
145
         */
146
        protected function change_message_in_array(array &$messages, $message, $message_key) {
147
            $exists = false;
148
            $message->is_read = true;
0 ignored issues
show
Bug introduced by
Accessing is_read on the interface NokitaKaze\Queue\iMessage suggest that you code against a concrete implementation. How about adding an instanceof check?

If you access a property on an interface, you most likely code against a concrete implementation of the interface.

Available Fixes

  1. Adding an additional type check:

    interface SomeInterface { }
    class SomeClass implements SomeInterface {
        public $a;
    }
    
    function someFunction(SomeInterface $object) {
        if ($object instanceof SomeClass) {
            $a = $object->a;
        }
    }
    
  2. Changing the type hint:

    interface SomeInterface { }
    class SomeClass implements SomeInterface {
        public $a;
    }
    
    function someFunction(SomeClass $object) {
        $a = $object->a;
    }
    
Loading history...
149
            foreach ($messages as $inner_id => &$inner_message) {
150
                $inner_message_key = self::get_real_key_for_message($inner_message);
151
                if ($inner_message_key == $message_key) {
152
                    $message->time_last_update = microtime(true);
0 ignored issues
show
Bug introduced by
Accessing time_last_update on the interface NokitaKaze\Queue\iMessage suggest that you code against a concrete implementation. How about adding an instanceof check?

If you access a property on an interface, you most likely code against a concrete implementation of the interface.

Available Fixes

  1. Adding an additional type check:

    interface SomeInterface { }
    class SomeClass implements SomeInterface {
        public $a;
    }
    
    function someFunction(SomeInterface $object) {
        if ($object instanceof SomeClass) {
            $a = $object->a;
        }
    }
    
  2. Changing the type hint:

    interface SomeInterface { }
    class SomeClass implements SomeInterface {
        public $a;
    }
    
    function someFunction(SomeClass $object) {
        $a = $object->a;
    }
    
Loading history...
153
                    $messages[$inner_id] = $message;
154
                    $exists = true;
155
                } else {
156
                    $inner_message->is_read = true;
0 ignored issues
show
Bug introduced by
Accessing is_read on the interface NokitaKaze\Queue\iMessage suggest that you code against a concrete implementation. How about adding an instanceof check?

If you access a property on an interface, you most likely code against a concrete implementation of the interface.

Available Fixes

  1. Adding an additional type check:

    interface SomeInterface { }
    class SomeClass implements SomeInterface {
        public $a;
    }
    
    function someFunction(SomeInterface $object) {
        if ($object instanceof SomeClass) {
            $a = $object->a;
        }
    }
    
  2. Changing the type hint:

    interface SomeInterface { }
    class SomeClass implements SomeInterface {
        public $a;
    }
    
    function someFunction(SomeClass $object) {
        $a = $object->a;
    }
    
Loading history...
157
                }
158
            }
159
160
            return $exists;
161
        }
162
163
        /**
164
         * @param QueueTransport $queue
165
         *
166
         * @return boolean
167
         */
168
        function is_equal_to($queue) {
169
            return (get_class($this) == get_class($queue));
170
        }
171
172
        /**
173
         * @return string
174
         */
175
        static function generate_rnd_postfix() {
176
            return Queue::generate_rnd_postfix();
177
        }
178
179
        // @todo Удаление конкретных ключей из consumed. Причем туда передаётся кложур, в который передаётся название ключа
180
        // @todo Удаление конкретных ключей из индекса и очереди, с указанием max_create_timestamp,
181
        // чтобы не хранить в очереди те же сообщения, пришедшие ещё раз
182
    }
183
184
?>