Code Duplication    Length = 23-23 lines in 3 locations

src/Adapter/SqsAdapter.php 3 locations

@@ 86-108 (lines=23) @@
83
    /**
84
     * @param MessageInterface[] $messages
85
     */
86
    public function acknowledge(array $messages)
87
    {
88
        $url = $this->getQueueUrl();
89
        $failed = [];
90
        $batches = array_chunk($this->createDeleteEntries($messages), self::BATCHSIZE_DELETE);
91
92
        foreach ($batches as $batch) {
93
            $results = $this->client->deleteMessageBatch([
94
                'QueueUrl' => $url,
95
                'Entries'  => $batch,
96
            ]);
97
98
            $map = function ($result) use ($messages) {
99
                return $messages[$result['Id']];
100
            };
101
102
            $failed = array_merge($failed, array_map($map, $results->get('Failed') ?: []));
103
        }
104
105
        if (!empty($failed)) {
106
            throw new FailedAcknowledgementException($this, $failed);
107
        }
108
    }
109
110
    /**
111
     * @param MessageInterface[] $messages
@@ 115-137 (lines=23) @@
112
     *
113
     * @throws FailedAcknowledgementException|void
114
     */
115
    public function reject(array $messages)
116
    {
117
        $url = $this->getQueueUrl();
118
        $failed = [];
119
        $batches = array_chunk($this->createRejectEntries($messages), self::BATCHSIZE_DELETE);
120
121
        foreach ($batches as $batch) {
122
            $results = $this->client->changeMessageVisibilityBatch([
123
                'QueueUrl' => $url,
124
                'Entries'  => $batch,
125
            ]);
126
127
            $map = function ($result) use ($messages) {
128
                return $messages[$result['Id']];
129
            };
130
131
            $failed = array_merge($failed, array_map($map, $results->get('Failed') ?: []));
132
        }
133
134
        if (!empty($failed)) {
135
            throw new FailedAcknowledgementException($this, $failed);
136
        }
137
    }
138
139
    /**
140
     * @param MessageFactoryInterface $factory
@@ 193-215 (lines=23) @@
190
    /**
191
     * @param MessageInterface[] $messages
192
     */
193
    public function enqueue(array $messages)
194
    {
195
        $url = $this->getQueueUrl();
196
        $failed = [];
197
        $batches = array_chunk($this->createEnqueueEntries($messages), self::BATCHSIZE_SEND);
198
199
        foreach ($batches as $batch) {
200
            $results = $this->client->sendMessageBatch([
201
                'QueueUrl' => $url,
202
                'Entries'  => $batch,
203
            ]);
204
205
            $map = function ($result) use ($messages) {
206
                return $messages[$result['Id']];
207
            };
208
209
            $failed = array_merge($failed, array_map($map, $results->get('Failed') ?: []));
210
        }
211
212
        if (!empty($failed)) {
213
            throw new FailedEnqueueException($this, $failed);
214
        }
215
    }
216
217
    /**
218
     * {@inheritdoc}