Topic::publish_message()   A
last analyzed

Complexity

Conditions 6
Paths 4

Size

Total Lines 20
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 42

Importance

Changes 2
Bugs 1 Features 0
Metric Value
cc 6
eloc 13
c 2
b 1
f 0
nc 4
nop 3
dl 0
loc 20
ccs 0
cts 19
cp 0
crap 42
rs 9.2222
1
<?php
2
3
namespace Freyo\LaravelQueueCMQ\Queue\Driver;
4
5
/**
 * Handle for a single CMQ topic.
 *
 * Every public method builds a request parameter array keyed for the
 * underlying client (always including 'topicName') and delegates the call
 * to the injected CMQClient.
 */
class Topic
{
    /**
     * Name of the topic; sent as 'topicName' with every request.
     *
     * @var string
     */
    private $topic_name;

    /**
     * Client that performs the actual requests.
     *
     * @var CMQClient
     */
    private $cmq_client;

    /**
     * When truthy, message bodies sent through batch_publish_message() are
     * base64-encoded before being handed to the client.
     *
     * @var bool
     */
    private $encoding;

    /**
     * @param string    $topic_name Name of the topic this object operates on.
     * @param CMQClient $cmq_client Client used to perform the requests.
     * @param bool      $encoding   Whether batch message bodies are base64-encoded.
     */
    public function __construct($topic_name, CMQClient $cmq_client, $encoding = false)
    {
        $this->topic_name = $topic_name;
        $this->cmq_client = $cmq_client;
        $this->encoding = $encoding;
    }

    /**
     * Change the base64-encoding flag after construction.
     *
     * @param bool $encoding Whether batch message bodies are base64-encoded.
     *
     * @return void
     */
    public function set_encoding($encoding)
    {
        $this->encoding = $encoding;
    }

    /**
     * Create the topic.
     *
     * 'filterType' is always sent; 'maxMsgSize' is only sent when the meta
     * object carries a positive value.
     *
     * @param TopicMeta $topic_meta Source of the filterType and maxMsgSize values.
     *
     * @return void
     */
    public function create($topic_meta)
    {
        $params = [
            'topicName'  => $this->topic_name,
            'filterType' => $topic_meta->filterType,
        ];

        if ($topic_meta->maxMsgSize > 0) {
            $params['maxMsgSize'] = $topic_meta->maxMsgSize;
        }

        $this->cmq_client->create_topic($params);
    }

    /**
     * Fetch the topic attributes and return them as a fresh TopicMeta.
     *
     * @return TopicMeta
     */
    public function get_attributes()
    {
        $resp = $this->cmq_client->get_topic_attributes([
            'topicName' => $this->topic_name,
        ]);

        $topic_meta = new TopicMeta();
        $this->__resp2meta($topic_meta, $resp);

        return $topic_meta;
    }

    /**
     * Copy the known attribute fields from a client response onto a meta object.
     *
     * Only keys that are present (and non-null) in the response are copied;
     * other properties of $topic_meta are left untouched.
     *
     * @param TopicMeta $topic_meta Object to populate (modified in place).
     * @param array     $resp       Response returned by the client.
     *
     * @return void
     */
    protected function __resp2meta($topic_meta, $resp)
    {
        $fields = [
            'maxMsgSize',
            'msgRetentionSeconds',
            'createTime',
            'lastModifyTime',
            'filterType',
        ];

        foreach ($fields as $field) {
            if (isset($resp[$field])) {
                $topic_meta->$field = $resp[$field];
            }
        }
    }

    /**
     * Update the topic attributes.
     *
     * Only maxMsgSize is sent, converted to a string.
     *
     * @param TopicMeta $topic_meta Source of the maxMsgSize value.
     *
     * @return void
     */
    public function set_attributes($topic_meta)
    {
        $params = [
            'topicName'  => $this->topic_name,
            'maxMsgSize' => strval($topic_meta->maxMsgSize),
        ];

        $this->cmq_client->set_topic_attributes($params);
    }

    /**
     * Delete the topic.
     *
     * @return void
     */
    public function delete()
    {
        $this->cmq_client->delete_topic([
            'topicName' => $this->topic_name,
        ]);
    }

    /**
     * Publish a single message (non-batch).
     *
     * NOTE(review): unlike batch_publish_message(), the body is sent as-is
     * even when the encoding flag is set. Confirm whether that asymmetry is
     * intended before changing it, since it would alter what callers send.
     *
     * @param string     $message    Message body.
     * @param array|null $vTagList   Tags; sent as 'msgTag.1', 'msgTag.2', ...
     * @param mixed      $routingKey Routing key; omitted when loosely equal to null.
     *
     * @return mixed Whatever the client's publish_message() returns
     *               (assigned to $msgId in the original code).
     */
    public function publish_message($message, $vTagList = null, $routingKey = null)
    {
        $params = [
            'topicName' => $this->topic_name,
            'msgBody'   => $message,
        ];

        // Loose comparison kept on purpose: '', 0 and false are treated like null.
        if ($routingKey != null) {
            $params['routingKey'] = $routingKey;
        }

        $params = $this->add_tag_params($params, $vTagList);

        return $this->cmq_client->publish_message($params);
    }

    /**
     * Publish several messages in one request.
     *
     * Bodies are sent as 'msgBody.1', 'msgBody.2', ... and are base64-encoded
     * first when the encoding flag is set.
     *
     * @param array      $vmessageList Message bodies.
     * @param array|null $vtagList     Tags; sent as 'msgTag.1', 'msgTag.2', ...
     * @param mixed      $routingKey   Routing key; omitted when loosely equal to null.
     *
     * @return array The 'msgId' of every response entry that carries one.
     */
    public function batch_publish_message($vmessageList, $vtagList = null, $routingKey = null)
    {
        $params = [
            'topicName' => $this->topic_name,
        ];

        // Loose comparison kept on purpose: '', 0 and false are treated like null.
        if ($routingKey != null) {
            $params['routingKey'] = $routingKey;
        }

        if (is_array($vmessageList) && !empty($vmessageList)) {
            $n = 1;
            foreach ($vmessageList as $msg) {
                $params['msgBody.'.$n] = $this->encoding ? base64_encode($msg) : $msg;
                $n += 1;
            }
        }

        $params = $this->add_tag_params($params, $vtagList);

        $msgList = $this->cmq_client->batch_publish_message($params);

        $retMessageList = [];
        // Guard against a non-iterable response so foreach cannot emit a warning.
        if (is_array($msgList) || $msgList instanceof \Traversable) {
            foreach ($msgList as $msg) {
                if (isset($msg['msgId'])) {
                    $retMessageList[] = $msg['msgId'];
                }
            }
        }

        return $retMessageList;
    }

    /**
     * List the subscriptions of this topic.
     *
     * @param string     $searchWord Subscription keyword to filter by; omitted when empty.
     * @param int        $limit      Maximum number of subscriptions to return; omitted when -1.
     * @param string|int $offset     Start position: the 'next_offset' returned by the
     *                               previous call; omitted when empty.
     *
     * @return array Array with the keys:
     *               - 'totalCount':       total reported by the client;
     *               - 'totalCoult':       same value under the historical misspelled
     *                                     key, kept for backward compatibility;
     *               - 'subscriptionList': the subscriptions of this page;
     *               - 'next_offset':      start position for the next call, or ''
     *                                     once every subscription has been listed.
     */
    public function list_subscription($searchWord = '', $limit = -1, $offset = '')
    {
        $params = ['topicName' => $this->topic_name];

        if ($searchWord != '') {
            $params['searchWord'] = $searchWord;
        }

        if ($limit != -1) {
            $params['limit'] = $limit;
        }

        if ($offset != '') {
            $params['offset'] = $offset;
        }

        $resp = $this->cmq_client->list_subscription($params);

        // Tolerate a response without these keys instead of failing in count().
        $subscriptions = isset($resp['subscriptionList']) ? $resp['subscriptionList'] : [];
        $totalCount = isset($resp['totalCount']) ? $resp['totalCount'] : 0;

        // An empty offset casts to 0, so one expression covers the first page too.
        $next_offset = (int) $offset + count($subscriptions);

        if ($next_offset >= $totalCount) {
            $next_offset = '';
        }

        return [
            'totalCount'       => $totalCount,
            'totalCoult'       => $totalCount,
            'subscriptionList' => $subscriptions,
            'next_offset'      => $next_offset,
        ];
    }

    /**
     * Append the numbered 'msgTag.N' entries (N starting at 1) to a parameter array.
     *
     * @param array $params Request parameters built so far.
     * @param mixed $tags   Tag list; anything other than a non-empty array is ignored.
     *
     * @return array The parameters with the tag entries appended.
     */
    private function add_tag_params(array $params, $tags)
    {
        if (is_array($tags) && !empty($tags)) {
            $n = 1;
            foreach ($tags as $tag) {
                $params['msgTag.'.$n] = $tag;
                $n += 1;
            }
        }

        return $params;
    }
}
245