Test Setup Failed
Push — master ( 739146...0c30c8 )
by Oliver
92:40 queued 90:54
created

main.main   F

Complexity

Conditions 33

Size

Total Lines 206
Code Lines 136

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 33
eloc 136
nop 0
dl 0
loc 206
rs 0
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like main.main often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
package main
2
3
import (
4
    "log"
5
    "flag"
6
    "os"
7
    "fmt"
8
    "os/signal"
9
    "strings"
10
    "strconv"
11
    "github.com/streadway/amqp"
12
)
13
14
type arrayFlags []string
15
16
func (i *arrayFlags) String() string {
17
	return ""
18
}
19
20
func (i *arrayFlags) Set(value string) error {
21
	*i = append(*i, value)
22
	return nil
23
}
24
25
var arguments arrayFlags
26
var fromArguments arrayFlags
27
var toArguments arrayFlags
28
var terminate bool = false
29
30
var (
31
    username         = flag.String("username", "guest", "Username")
32
    password         = flag.String("password", "guest", "Password")
33
    hostname         = flag.String("hostname", "localhost", "Hostname")
34
    port             = flag.String("port", "5672", "Port")
35
    fromQueueName    = flag.String("from", "", "The queue name to consume messages from")
36
    toQueueName      = flag.String("to", "", "The queue name to deliver messages to")
37
    fromDurable      = flag.Bool("from-durable", false, "Define the from queue deceleration to be durable")
38
    toDurable        = flag.Bool("to-durable", false, "Define the to queue deceleration to be durable")
39
    exchange         = flag.String("exchange", "", "The exchange name to deliver messages through")
40
    messageCount     = flag.Int("count", 1, "The number of messages to move between queues")
41
    verbose          = flag.Bool("v", false, "Turn on verbose mode")
42
    veryVerbose      = flag.Bool("vv", false, "Turn on very verbose mode")
43
    extremelyVerbose = flag.Bool("vvv", false, "Turn on extremely verbose mode")
44
)
45
46
func init() {
47
    flag.Var(&arguments, "arg", "Argument(s) to pass to the queue decelerations")
48
    flag.Var(&fromArguments, "from-arg", "Argument(s) to pass the queue deceleration which consumes messages")
49
    flag.Var(&toArguments, "to-arg", "Argument(s) to pass to the queue deceleration which delivers messages")
50
    flag.Parse()
51
}
52
53
func main() {
54
    if *extremelyVerbose {
55
        log.Printf("Extremely verbose mode enabled")
56
    } else if *veryVerbose {
57
        log.Printf("Very verbose mode enabled")
58
    } else if *verbose {
59
        log.Printf("Verbose mode enabled")
60
    }
61
62
    // Set the verbose modes accordingly
63
    if *extremelyVerbose {
64
        *veryVerbose = true
65
        *verbose = true
66
    } else if *veryVerbose {
67
       *verbose = true
68
    }
69
70
    if *fromQueueName == "" {
71
        log.Printf("The from argument must be defined")
72
        os.Exit(2)
73
    }
74
75
    if *toQueueName == "" {
76
        log.Printf("The to argument must be defined")
77
        os.Exit(2)
78
    }
79
80
    if *fromQueueName == *toQueueName {
81
        log.Printf("The from queue name matches the to queue name")
82
        os.Exit(2)
83
    }
84
85
    fromArgs := make(amqp.Table)
86
    toArgs := make(amqp.Table)
87
88
    for _, argument := range arguments {
89
        fromArgs = mapQueueArguments(fromArgs, argument)
90
        toArgs = mapQueueArguments(toArgs, argument)
91
    }
92
    for _, fromArgument := range fromArguments {
93
        fromArgs = mapQueueArguments(fromArgs, fromArgument)
94
    }
95
    for _, toArgument := range toArguments {
96
        toArgs = mapQueueArguments(toArgs, toArgument)
97
    }
98
99
    c := make(chan os.Signal, 1)
100
    signal.Notify(c, os.Interrupt)
101
    go func() {
102
        for sig := range c {
103
            terminate = true
104
            log.Printf("Finishing up ... (%v detected)", sig)
105
        }
106
    }()
107
108
    // It's advisable to use separate connections for Channel.Publish and Channel.Consume so not to have TCP pushback
109
    // on publishing affect the ability to consume messages: https://godoc.org/github.com/streadway/amqp#Channel.Consume
110
111
    consumerConnection, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/", *username, *password, *hostname, *port))
112
    failOnError(err, "Failed to create the consumer connection to RabbitMQ")
113
    defer consumerConnection.Close()
114
115
    publisherConnection, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/", *username, *password, *hostname, *port))
116
    failOnError(err, "Failed to create the publisher connection to RabbitMQ")
117
    defer publisherConnection.Close()
118
119
    consumerChannel, err := consumerConnection.Channel()
120
    failOnError(err, "Failed to open the consumer channel")
121
    defer consumerChannel.Close()
122
123
    publisherChannel, err := publisherConnection.Channel()
124
    failOnError(err, "Failed to open the publisher channel")
125
    defer publisherChannel.Close()
126
127
    if *verbose {
128
        log.Printf("Moving %d messages from queue %s to %s", *messageCount, *fromQueueName, *toQueueName)
129
    }
130
131
    // Check if the queue exists, otherwise, fail
132
    // To do this, add the additional argument of passive to true. See:
133
    // https://github.com/streadway/amqp/blob/master/channel.go#L758
134
    // so if the queue does exist, the command fails (but we need the latest code for that)
135
136
    if *extremelyVerbose && len(fromArgs) != 0 {
137
        log.Printf("Declaring from queue with the following arguments:")
138
        for key, argument := range fromArgs {
139
            log.Println("-", key, ":", argument);
140
        }
141
    }
142
143
    fromQueue, err := consumerChannel.QueueDeclare(
144
        *fromQueueName, // name
145
        *fromDurable,   // durable
146
        false,          // delete when unused
147
        false,          // exclusive
148
        false,          // no-wait
149
        fromArgs,       // arguments
150
    )
151
    failOnError(err, "Failed to declare a queue")
152
153
    if *veryVerbose {
154
        log.Printf("There are %d messages in queue %s", fromQueue.Messages, fromQueue.Name)
155
    }
156
157
    if *extremelyVerbose && len(toArgs) != 0 {
158
        log.Printf("Declaring to queue with the following arguments:")
159
        for key, argument := range toArgs {
160
            log.Println("-", key, ":", argument);
161
        }
162
    }
163
164
    toQueue, err := publisherChannel.QueueDeclare(
165
        *toQueueName, // name
166
        *toDurable,   // durable
167
        false,        // delete when unused
168
        false,        // exclusive
169
        false,        // no-wait
170
        toArgs,       // arguments
171
    )
172
    failOnError(err, "Failed to declare a queue")
173
174
    if *veryVerbose {
175
        log.Printf("There are %d messages in queue %s", toQueue.Messages, toQueue.Name)
176
    }
177
178
    messages, err := consumerChannel.Consume(
179
        fromQueue.Name, // queue
180
        "",             // consumer
181
        false,          // auto-ack (it's very important this stays false)
182
        false,          // exclusive
183
        false,          // no-local
184
        false,          // no-wait
185
        nil,            // args
186
    )
187
    failOnError(err, "Failed to register the scoop consumer")
188
189
    log.Printf("Running scoop consumer... (press Ctl-C to cancel)")
190
191
    confirms := publisherChannel.NotifyPublish(make(chan amqp.Confirmation, 1))
192
193
    if err := publisherChannel.Confirm(false); err != nil {
194
        log.Fatalf("Unable to put publisher channel into confirm mode: %s", err)
195
    }
196
197
    i := 1
198
199
    for {
200
        message, ok := <-messages
201
        if !ok {
202
            log.Printf("The consumer channel was unexpectedly closed")
203
            break
204
        }
205
206
        if terminate {
207
            break
208
        }
209
210
        if i > *messageCount {
211
            break
212
        }
213
214
        err = publisherChannel.Publish(
215
            *exchange,    // exchange
216
            toQueue.Name, // routing key
217
            false,        // mandatory
218
            false,        // immediate
219
            amqp.Publishing{
220
                ContentType:     message.ContentType,
221
                ContentEncoding: message.ContentEncoding,
222
                DeliveryMode:    message.DeliveryMode,
223
                Priority:        message.Priority,
224
                CorrelationId:   message.CorrelationId,
225
                ReplyTo:         message.ReplyTo,
226
                Expiration:      message.Expiration,
227
                MessageId:       message.MessageId,
228
                Timestamp:       message.Timestamp,
229
                Type:            message.Type,
230
                UserId:          message.UserId,
231
                AppId:           message.AppId,
232
                Headers:         message.Headers,
233
                Body:            message.Body,
234
            })
235
236
        if err != nil {
237
            message.Nack(false, false)
238
            log.Printf("Failed to deliver message")
239
            break
240
        }
241
242
        if confirmed := <-confirms; confirmed.Ack {
243
            message.Ack(false)
244
            if *extremelyVerbose {
245
                log.Printf("Successfully delivered message (%d/%d)", i, *messageCount)
246
            }
247
            i++
248
            continue
249
        }
250
251
        message.Nack(false, false)
252
        if *extremelyVerbose {
253
            log.Printf("Failed to deliver message ... refused to acknowledge delivery")
254
        }
255
    }
256
257
    if *verbose {
258
        log.Printf("Complete")
259
    }
260
}
261
262
func mapQueueArguments(arguments amqp.Table, argument string) amqp.Table {
263
    parts := strings.Split(argument, ":")
264
    key, value := strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1])
265
266
    // Keep values as strings where the keys expect it
267
    if key == "x-overflow" || key == "x-queue-mode" || key == "x-queue-master-locator" || key == "x-dead-letter-exchange" || key == "x-dead-letter-routing-key" {
268
        arguments[key] = value
269
        return arguments
270
    }
271
272
    // Cast values to integers where the keys expect it
273
    i, err := strconv.Atoi(value)
274
    if err != nil {
275
        log.Fatalf("Argument with key \"%s\" does not have a valid integer value. Received: \"%s\"", key, value)
276
    }
277
    arguments[key] = i
278
    return arguments
279
}
280
281
func failOnError(err error, message string) {
282
    if err != nil {
283
        log.Fatalf("%s: %s", message, err)
284
    }
285
}
286