|
1
|
|
|
package main |
|
2
|
|
|
|
|
3
|
|
|
import ( |
|
4
|
|
|
"log" |
|
5
|
|
|
"flag" |
|
6
|
|
|
"os" |
|
7
|
|
|
"fmt" |
|
8
|
|
|
"os/signal" |
|
9
|
|
|
"strings" |
|
10
|
|
|
"strconv" |
|
11
|
|
|
"github.com/streadway/amqp" |
|
12
|
|
|
) |
|
13
|
|
|
|
|
14
|
|
|
// arrayFlags is a flag.Value that collects every occurrence of a repeatable
// command-line flag, in the order the values were supplied.
type arrayFlags []string

// String returns the collected values joined by commas.
//
// The flag package may call String on a zero-valued receiver (including a nil
// pointer), so that case yields the empty string instead of dereferencing nil.
func (i *arrayFlags) String() string {
	if i == nil {
		return ""
	}
	return strings.Join(*i, ",")
}

// Set appends value to the collection. It always returns nil.
func (i *arrayFlags) Set(value string) error {
	*i = append(*i, value)
	return nil
}
|
24
|
|
|
|
|
25
|
|
|
var arguments arrayFlags |
|
26
|
|
|
var fromArguments arrayFlags |
|
27
|
|
|
var toArguments arrayFlags |
|
28
|
|
|
var terminate bool = false |
|
29
|
|
|
|
|
30
|
|
|
// Command-line options. Each variable points at the flag's value, which holds
// the default shown here until flag.Parse has processed the command line.
var (
	username         = flag.String("username", "guest", "Username")
	password         = flag.String("password", "guest", "Password")
	hostname         = flag.String("hostname", "localhost", "Hostname")
	port             = flag.String("port", "5672", "Port")
	fromQueueName    = flag.String("from", "", "The queue name to consume messages from")
	toQueueName      = flag.String("to", "", "The queue name to deliver messages to")
	fromDurable      = flag.Bool("from-durable", false, "Define the from queue declaration to be durable")
	toDurable        = flag.Bool("to-durable", false, "Define the to queue declaration to be durable")
	exchange         = flag.String("exchange", "", "The exchange name to deliver messages through")
	messageCount     = flag.Int("count", 1, "The number of messages to move between queues")
	verbose          = flag.Bool("v", false, "Turn on verbose mode")
	veryVerbose      = flag.Bool("vv", false, "Turn on very verbose mode")
	extremelyVerbose = flag.Bool("vvv", false, "Turn on extremely verbose mode")
)
|
45
|
|
|
|
|
46
|
|
|
func init() { |
|
47
|
|
|
flag.Var(&arguments, "arg", "Argument(s) to pass to the queue decelerations") |
|
48
|
|
|
flag.Var(&fromArguments, "from-arg", "Argument(s) to pass the queue deceleration which consumes messages") |
|
49
|
|
|
flag.Var(&toArguments, "to-arg", "Argument(s) to pass to the queue deceleration which delivers messages") |
|
50
|
|
|
flag.Parse() |
|
51
|
|
|
} |
|
52
|
|
|
|
|
53
|
|
|
func main() { |
|
54
|
|
|
if *extremelyVerbose { |
|
55
|
|
|
log.Printf("Extremely verbose mode enabled") |
|
56
|
|
|
} else if *veryVerbose { |
|
57
|
|
|
log.Printf("Very verbose mode enabled") |
|
58
|
|
|
} else if *verbose { |
|
59
|
|
|
log.Printf("Verbose mode enabled") |
|
60
|
|
|
} |
|
61
|
|
|
|
|
62
|
|
|
// Set the verbose modes accordingly |
|
63
|
|
|
if *extremelyVerbose { |
|
64
|
|
|
*veryVerbose = true |
|
65
|
|
|
*verbose = true |
|
66
|
|
|
} else if *veryVerbose { |
|
67
|
|
|
*verbose = true |
|
68
|
|
|
} |
|
69
|
|
|
|
|
70
|
|
|
if *fromQueueName == "" { |
|
71
|
|
|
log.Printf("The from argument must be defined") |
|
72
|
|
|
os.Exit(2) |
|
73
|
|
|
} |
|
74
|
|
|
|
|
75
|
|
|
if *toQueueName == "" { |
|
76
|
|
|
log.Printf("The to argument must be defined") |
|
77
|
|
|
os.Exit(2) |
|
78
|
|
|
} |
|
79
|
|
|
|
|
80
|
|
|
if *fromQueueName == *toQueueName { |
|
81
|
|
|
log.Printf("The from queue name matches the to queue name") |
|
82
|
|
|
os.Exit(2) |
|
83
|
|
|
} |
|
84
|
|
|
|
|
85
|
|
|
fromArgs := make(amqp.Table) |
|
86
|
|
|
toArgs := make(amqp.Table) |
|
87
|
|
|
|
|
88
|
|
|
for _, argument := range arguments { |
|
89
|
|
|
fromArgs = mapQueueArguments(fromArgs, argument) |
|
90
|
|
|
toArgs = mapQueueArguments(toArgs, argument) |
|
91
|
|
|
} |
|
92
|
|
|
for _, fromArgument := range fromArguments { |
|
93
|
|
|
fromArgs = mapQueueArguments(fromArgs, fromArgument) |
|
94
|
|
|
} |
|
95
|
|
|
for _, toArgument := range toArguments { |
|
96
|
|
|
toArgs = mapQueueArguments(toArgs, toArgument) |
|
97
|
|
|
} |
|
98
|
|
|
|
|
99
|
|
|
c := make(chan os.Signal, 1) |
|
100
|
|
|
signal.Notify(c, os.Interrupt) |
|
101
|
|
|
go func() { |
|
102
|
|
|
for sig := range c { |
|
103
|
|
|
terminate = true |
|
104
|
|
|
log.Printf("Finishing up ... (%v detected)", sig) |
|
105
|
|
|
} |
|
106
|
|
|
}() |
|
107
|
|
|
|
|
108
|
|
|
// It's advisable to use separate connections for Channel.Publish and Channel.Consume so not to have TCP pushback |
|
109
|
|
|
// on publishing affect the ability to consume messages: https://godoc.org/github.com/streadway/amqp#Channel.Consume |
|
110
|
|
|
|
|
111
|
|
|
consumerConnection, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/", *username, *password, *hostname, *port)) |
|
112
|
|
|
failOnError(err, "Failed to create the consumer connection to RabbitMQ") |
|
113
|
|
|
defer consumerConnection.Close() |
|
114
|
|
|
|
|
115
|
|
|
publisherConnection, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/", *username, *password, *hostname, *port)) |
|
116
|
|
|
failOnError(err, "Failed to create the publisher connection to RabbitMQ") |
|
117
|
|
|
defer publisherConnection.Close() |
|
118
|
|
|
|
|
119
|
|
|
consumerChannel, err := consumerConnection.Channel() |
|
120
|
|
|
failOnError(err, "Failed to open the consumer channel") |
|
121
|
|
|
defer consumerChannel.Close() |
|
122
|
|
|
|
|
123
|
|
|
publisherChannel, err := publisherConnection.Channel() |
|
124
|
|
|
failOnError(err, "Failed to open the publisher channel") |
|
125
|
|
|
defer publisherChannel.Close() |
|
126
|
|
|
|
|
127
|
|
|
if *verbose { |
|
128
|
|
|
log.Printf("Moving %d messages from queue %s to %s", *messageCount, *fromQueueName, *toQueueName) |
|
129
|
|
|
} |
|
130
|
|
|
|
|
131
|
|
|
// Check if the queue exists, otherwise, fail |
|
132
|
|
|
// To do this, add the additional argument of passive to true. See: |
|
133
|
|
|
// https://github.com/streadway/amqp/blob/master/channel.go#L758 |
|
134
|
|
|
// so if the queue does exist, the command fails (but we need the latest code for that) |
|
135
|
|
|
|
|
136
|
|
|
if *extremelyVerbose && len(fromArgs) != 0 { |
|
137
|
|
|
log.Printf("Declaring from queue with the following arguments:") |
|
138
|
|
|
for key, argument := range fromArgs { |
|
139
|
|
|
log.Println("-", key, ":", argument); |
|
140
|
|
|
} |
|
141
|
|
|
} |
|
142
|
|
|
|
|
143
|
|
|
fromQueue, err := consumerChannel.QueueDeclare( |
|
144
|
|
|
*fromQueueName, // name |
|
145
|
|
|
*fromDurable, // durable |
|
146
|
|
|
false, // delete when unused |
|
147
|
|
|
false, // exclusive |
|
148
|
|
|
false, // no-wait |
|
149
|
|
|
fromArgs, // arguments |
|
150
|
|
|
) |
|
151
|
|
|
failOnError(err, "Failed to declare a queue") |
|
152
|
|
|
|
|
153
|
|
|
if *veryVerbose { |
|
154
|
|
|
log.Printf("There are %d messages in queue %s", fromQueue.Messages, fromQueue.Name) |
|
155
|
|
|
} |
|
156
|
|
|
|
|
157
|
|
|
if *extremelyVerbose && len(toArgs) != 0 { |
|
158
|
|
|
log.Printf("Declaring to queue with the following arguments:") |
|
159
|
|
|
for key, argument := range toArgs { |
|
160
|
|
|
log.Println("-", key, ":", argument); |
|
161
|
|
|
} |
|
162
|
|
|
} |
|
163
|
|
|
|
|
164
|
|
|
toQueue, err := publisherChannel.QueueDeclare( |
|
165
|
|
|
*toQueueName, // name |
|
166
|
|
|
*toDurable, // durable |
|
167
|
|
|
false, // delete when unused |
|
168
|
|
|
false, // exclusive |
|
169
|
|
|
false, // no-wait |
|
170
|
|
|
toArgs, // arguments |
|
171
|
|
|
) |
|
172
|
|
|
failOnError(err, "Failed to declare a queue") |
|
173
|
|
|
|
|
174
|
|
|
if *veryVerbose { |
|
175
|
|
|
log.Printf("There are %d messages in queue %s", toQueue.Messages, toQueue.Name) |
|
176
|
|
|
} |
|
177
|
|
|
|
|
178
|
|
|
messages, err := consumerChannel.Consume( |
|
179
|
|
|
fromQueue.Name, // queue |
|
180
|
|
|
"", // consumer |
|
181
|
|
|
false, // auto-ack (it's very important this stays false) |
|
182
|
|
|
false, // exclusive |
|
183
|
|
|
false, // no-local |
|
184
|
|
|
false, // no-wait |
|
185
|
|
|
nil, // args |
|
186
|
|
|
) |
|
187
|
|
|
failOnError(err, "Failed to register the scoop consumer") |
|
188
|
|
|
|
|
189
|
|
|
log.Printf("Running scoop consumer... (press Ctl-C to cancel)") |
|
190
|
|
|
|
|
191
|
|
|
confirms := publisherChannel.NotifyPublish(make(chan amqp.Confirmation, 1)) |
|
192
|
|
|
|
|
193
|
|
|
if err := publisherChannel.Confirm(false); err != nil { |
|
194
|
|
|
log.Fatalf("Unable to put publisher channel into confirm mode: %s", err) |
|
195
|
|
|
} |
|
196
|
|
|
|
|
197
|
|
|
i := 1 |
|
198
|
|
|
|
|
199
|
|
|
for { |
|
200
|
|
|
message, ok := <-messages |
|
201
|
|
|
if !ok { |
|
202
|
|
|
log.Printf("The consumer channel was unexpectedly closed") |
|
203
|
|
|
break |
|
204
|
|
|
} |
|
205
|
|
|
|
|
206
|
|
|
if terminate { |
|
207
|
|
|
break |
|
208
|
|
|
} |
|
209
|
|
|
|
|
210
|
|
|
if i > *messageCount { |
|
211
|
|
|
break |
|
212
|
|
|
} |
|
213
|
|
|
|
|
214
|
|
|
err = publisherChannel.Publish( |
|
215
|
|
|
*exchange, // exchange |
|
216
|
|
|
toQueue.Name, // routing key |
|
217
|
|
|
false, // mandatory |
|
218
|
|
|
false, // immediate |
|
219
|
|
|
amqp.Publishing{ |
|
220
|
|
|
ContentType: message.ContentType, |
|
221
|
|
|
ContentEncoding: message.ContentEncoding, |
|
222
|
|
|
DeliveryMode: message.DeliveryMode, |
|
223
|
|
|
Priority: message.Priority, |
|
224
|
|
|
CorrelationId: message.CorrelationId, |
|
225
|
|
|
ReplyTo: message.ReplyTo, |
|
226
|
|
|
Expiration: message.Expiration, |
|
227
|
|
|
MessageId: message.MessageId, |
|
228
|
|
|
Timestamp: message.Timestamp, |
|
229
|
|
|
Type: message.Type, |
|
230
|
|
|
UserId: message.UserId, |
|
231
|
|
|
AppId: message.AppId, |
|
232
|
|
|
Headers: message.Headers, |
|
233
|
|
|
Body: message.Body, |
|
234
|
|
|
}) |
|
235
|
|
|
|
|
236
|
|
|
if err != nil { |
|
237
|
|
|
message.Nack(false, false) |
|
238
|
|
|
log.Printf("Failed to deliver message") |
|
239
|
|
|
break |
|
240
|
|
|
} |
|
241
|
|
|
|
|
242
|
|
|
if confirmed := <-confirms; confirmed.Ack { |
|
243
|
|
|
message.Ack(false) |
|
244
|
|
|
if *extremelyVerbose { |
|
245
|
|
|
log.Printf("Successfully delivered message (%d/%d)", i, *messageCount) |
|
246
|
|
|
} |
|
247
|
|
|
i++ |
|
248
|
|
|
continue |
|
249
|
|
|
} |
|
250
|
|
|
|
|
251
|
|
|
message.Nack(false, false) |
|
252
|
|
|
if *extremelyVerbose { |
|
253
|
|
|
log.Printf("Failed to deliver message ... refused to acknowledge delivery") |
|
254
|
|
|
} |
|
255
|
|
|
} |
|
256
|
|
|
|
|
257
|
|
|
if *verbose { |
|
258
|
|
|
log.Printf("Complete") |
|
259
|
|
|
} |
|
260
|
|
|
} |
|
261
|
|
|
|
|
262
|
|
|
func mapQueueArguments(arguments amqp.Table, argument string) amqp.Table { |
|
263
|
|
|
parts := strings.Split(argument, ":") |
|
264
|
|
|
key, value := strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1]) |
|
265
|
|
|
|
|
266
|
|
|
// Keep values as strings where the keys expect it |
|
267
|
|
|
if key == "x-overflow" || key == "x-queue-mode" || key == "x-queue-master-locator" || key == "x-dead-letter-exchange" || key == "x-dead-letter-routing-key" { |
|
268
|
|
|
arguments[key] = value |
|
269
|
|
|
return arguments |
|
270
|
|
|
} |
|
271
|
|
|
|
|
272
|
|
|
// Cast values to integers where the keys expect it |
|
273
|
|
|
i, err := strconv.Atoi(value) |
|
274
|
|
|
if err != nil { |
|
275
|
|
|
log.Fatalf("Argument with key \"%s\" does not have a valid integer value. Received: \"%s\"", key, value) |
|
276
|
|
|
} |
|
277
|
|
|
arguments[key] = i |
|
278
|
|
|
return arguments |
|
279
|
|
|
} |
|
280
|
|
|
|
|
281
|
|
|
// failOnError terminates the program through log.Fatalf when err is non-nil,
// logging message followed by the error. It returns normally when err is nil.
func failOnError(err error, message string) {
	if err == nil {
		return
	}
	log.Fatalf("%s: %s", message, err)
}
|
286
|
|
|
|