Issues (1008)

sqs/client_xapi_message.go (1 issue)

Severity
1
package sqs
2
3
import (
4
	"context"
5
	"strconv"
6
)
7
8
// XDeleteMessage deletes a message.
9
func (svc *SQS) XDeleteMessage(ctx context.Context, queueURL, receiptHandle string) error {
10
	_, err := svc.DeleteMessage(ctx, DeleteMessageRequest{
11
		QueueURL:      queueURL,
12
		ReceiptHandle: receiptHandle,
13
	})
14
	return err
15
}
16
17
// XReceiveMessage receives messages.
18
func (svc *SQS) XReceiveMessage(ctx context.Context, queueURL string) ([]Message, error) {
19
	resp, err := svc.ReceiveMessage(ctx, ReceiveMessageRequest{
20
		QueueURL: queueURL,
21
	})
22
	if err != nil {
23
		return nil, err
24
	}
25
	return resp.Messages, nil
26
}
27
28
// XReceiveMessageLongPolling receives messages with long polling.
29
func (svc *SQS) XReceiveMessageLongPolling(ctx context.Context, queueURL string, waitSec int64) ([]Message, error) {
30
	resp, err := svc.ReceiveMessage(ctx, ReceiveMessageRequest{
31
		QueueURL:        queueURL,
32
		WaitTimeSeconds: waitSec,
33
	})
34
	if err != nil {
35
		return nil, err
36
	}
37
	return resp.Messages, nil
38
}
39
40
// XSendMessage sends a message.
41
func (svc *SQS) XSendMessage(ctx context.Context, queueURL, message string) error {
42
	_, err := svc.SendMessage(ctx, SendMessageRequest{
43
		QueueURL:    queueURL,
44
		MessageBody: message,
45
	})
46
	return err
47
}
48
49
// XSendMessageBatch sends messages.
50
func (svc *SQS) XSendMessageBatch(ctx context.Context, queueURL string, messages []string) (XSendMessageBatchResult, error) {
51
	const chunkSize = 10
52
	var chunkList [][]SendMessageBatchRequestEntry
53
	idNum := 0
54
	for i, max := 0, len(messages); i < max; i += chunkSize {
55
		end := i + chunkSize
56
		if end > max {
57
			end = max
58
		}
59
60
		chunk := make([]SendMessageBatchRequestEntry, 0, chunkSize)
61
		for _, v := range messages[i:end] {
62
			chunk = append(chunk, SendMessageBatchRequestEntry{
63
				ID:          strconv.Itoa(idNum),
64
				MessageBody: v,
65
			})
66
			idNum++
67
		}
68
		chunkList = append(chunkList, chunk)
69
	}
70
71
	result := XSendMessageBatchResult{}
72
	for _, v := range chunkList {
73
		resp, err := svc.SendMessageBatch(ctx, SendMessageBatchRequest{
74
			QueueURL: queueURL,
75
			Entries:  v,
76
		})
77
		if err != nil {
78
			return result, err
79
		}
80
81
		for _, v := range resp.Successful {
82
			idNum, _ := strconv.Atoi(v.ID)
83
			result.Success = append(result.Success, idNum)
84
		}
85
		for _, v := range resp.Failed {
86
			idNum, _ := strconv.Atoi(v.ID)
87
			result.Failed = append(result.Failed, idNum)
88
		}
89
		result.FailEntries = append(result.FailEntries, resp.Failed...)
90
	}
91
92
	return result, nil
93
}
94
95
type XSendMessageBatchResult struct {
0 ignored issues
show
exported type XSendMessageBatchResult should have comment or be unexported
Loading history...
96
	// `[]int` contains slice index of success/fail messages
97
	Success []int
98
	Failed  []int
99
100
	FailEntries []BatchResultErrorEntry
101
}
102