Passed
Pull Request — master (#23)
by Frank
06:05 queued 03:00
created

pq.*Listener.NotificationChannel   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 0
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
package pq
2
3
// Package pq is a pure Go Postgres driver for the database/sql package.
4
// This module contains support for Postgres LISTEN/NOTIFY.
5
6
import (
7
	"errors"
8
	"fmt"
9
	"sync"
10
	"sync/atomic"
11
	"time"
12
)
13
14
// Notification is one notification message delivered by the database.
type Notification struct {
	// BePid is the process ID (PID) of the postgres backend that sent the
	// notification.
	BePid int
	// Channel is the name of the channel the notification arrived on.
	Channel string
	// Extra is the payload; it is the empty string when none was specified.
	Extra string
}
23
24
// recvNotification builds a Notification from r by reading, in this order,
// the backend PID, the channel name and the payload.
func recvNotification(r *readBuf) *Notification {
	n := new(Notification)
	n.BePid = r.int32()
	n.Channel = r.string()
	n.Extra = r.string()
	return n
}
31
32
// Protocol states stored in ListenerConn.connState.
const (
	connStateIdle                int32 = 0
	connStateExpectResponse      int32 = 1
	connStateExpectReadyForQuery int32 = 2
)
37
38
// message is a reply forwarded over ListenerConn.replyChan: the type byte of
// the protocol message and, when applicable, the error parsed from it.
type message struct {
	typ byte
	err error
}
42
43
// errListenerConnClosed is recorded as the connection's error by
// ListenerConn.Close, and returned by Close when an error was already set.
var errListenerConnClosed = errors.New("pq: ListenerConn has been closed")
44
45
// ListenerConn is a low-level interface for waiting for notifications.  You
46
// should use Listener instead.
47
type ListenerConn struct {
48
	// guards cn and err
49
	connectionLock sync.Mutex
50
	cn             *conn
51
	err            error
52
53
	connState int32
54
55
	// the sending goroutine will be holding this lock
56
	senderLock sync.Mutex
57
58
	notificationChan chan<- *Notification
59
60
	replyChan chan message
61
}
62
63
// NewListenerConn creates a new ListenerConn. Use NewListener instead.
64
func NewListenerConn(name string, notificationChan chan<- *Notification) (*ListenerConn, error) {
65
	return newDialListenerConn(defaultDialer{}, name, notificationChan)
66
}
67
68
func newDialListenerConn(d Dialer, name string, c chan<- *Notification) (*ListenerConn, error) {
69
	cn, err := DialOpen(d, name)
70
	if err != nil {
71
		return nil, err
72
	}
73
74
	l := &ListenerConn{
75
		cn:               cn.(*conn),
76
		notificationChan: c,
77
		connState:        connStateIdle,
78
		replyChan:        make(chan message, 2),
79
	}
80
81
	go l.listenerConnMain()
82
83
	return l, nil
84
}
85
86
// We can only allow one goroutine at a time to be running a query on the
87
// connection for various reasons, so the goroutine sending on the connection
88
// must be holding senderLock.
89
//
90
// Returns an error if an unrecoverable error has occurred and the ListenerConn
91
// should be abandoned.
92
func (l *ListenerConn) acquireSenderLock() error {
93
	// we must acquire senderLock first to avoid deadlocks; see ExecSimpleQuery
94
	l.senderLock.Lock()
95
96
	l.connectionLock.Lock()
97
	err := l.err
98
	l.connectionLock.Unlock()
99
	if err != nil {
100
		l.senderLock.Unlock()
101
		return err
102
	}
103
	return nil
104
}
105
106
// releaseSenderLock gives up senderLock; it is the counterpart of a
// successful acquireSenderLock.
func (l *ListenerConn) releaseSenderLock() {
	l.senderLock.Unlock()
}
109
110
// setState advances the protocol state to newState.  Returns false if moving
111
// to that state from the current state is not allowed.
112
func (l *ListenerConn) setState(newState int32) bool {
113
	var expectedState int32
114
115
	switch newState {
116
	case connStateIdle:
117
		expectedState = connStateExpectReadyForQuery
118
	case connStateExpectResponse:
119
		expectedState = connStateIdle
120
	case connStateExpectReadyForQuery:
121
		expectedState = connStateExpectResponse
122
	default:
123
		panic(fmt.Sprintf("unexpected listenerConnState %d", newState))
124
	}
125
126
	return atomic.CompareAndSwapInt32(&l.connState, expectedState, newState)
127
}
128
129
// Main logic is here: receive messages from the postgres backend, forward
130
// notifications and query replies and keep the internal state in sync with the
131
// protocol state.  Returns when the connection has been lost, is about to go
132
// away or should be discarded because we couldn't agree on the state with the
133
// server backend.
134
func (l *ListenerConn) listenerConnLoop() (err error) {
135
	defer errRecoverNoErrBadConn(&err)
136
137
	r := &readBuf{}
138
	for {
139
		t, err := l.cn.recvMessage(r)
140
		if err != nil {
141
			return err
142
		}
143
144
		switch t {
145
		case 'A':
146
			// recvNotification copies all the data so we don't need to worry
147
			// about the scratch buffer being overwritten.
148
			l.notificationChan <- recvNotification(r)
149
150
		case 'T', 'D':
151
			// only used by tests; ignore
152
153
		case 'E':
154
			// We might receive an ErrorResponse even when not in a query; it
155
			// is expected that the server will close the connection after
156
			// that, but we should make sure that the error we display is the
157
			// one from the stray ErrorResponse, not io.ErrUnexpectedEOF.
158
			if !l.setState(connStateExpectReadyForQuery) {
159
				return parseError(r)
160
			}
161
			l.replyChan <- message{t, parseError(r)}
162
163
		case 'C', 'I':
164
			if !l.setState(connStateExpectReadyForQuery) {
165
				// protocol out of sync
166
				return fmt.Errorf("unexpected CommandComplete")
167
			}
168
			// ExecSimpleQuery doesn't need to know about this message
169
170
		case 'Z':
171
			if !l.setState(connStateIdle) {
172
				// protocol out of sync
173
				return fmt.Errorf("unexpected ReadyForQuery")
174
			}
175
			l.replyChan <- message{t, nil}
176
177
		case 'N', 'S':
178
			// ignore
179
		default:
180
			return fmt.Errorf("unexpected message %q from server in listenerConnLoop", t)
181
		}
182
	}
183
}
184
185
// This is the main routine for the goroutine receiving on the database
186
// connection.  Most of the main logic is in listenerConnLoop.
187
func (l *ListenerConn) listenerConnMain() {
188
	err := l.listenerConnLoop()
189
190
	// listenerConnLoop terminated; we're done, but we still have to clean up.
191
	// Make sure nobody tries to start any new queries by making sure the err
192
	// pointer is set.  It is important that we do not overwrite its value; a
193
	// connection could be closed by either this goroutine or one sending on
194
	// the connection -- whoever closes the connection is assumed to have the
195
	// more meaningful error message (as the other one will probably get
196
	// net.errClosed), so that goroutine sets the error we expose while the
197
	// other error is discarded.  If the connection is lost while two
198
	// goroutines are operating on the socket, it probably doesn't matter which
199
	// error we expose so we don't try to do anything more complex.
200
	l.connectionLock.Lock()
201
	if l.err == nil {
202
		l.err = err
203
	}
204
	l.cn.Close()
205
	l.connectionLock.Unlock()
206
207
	// There might be a query in-flight; make sure nobody's waiting for a
208
	// response to it, since there's not going to be one.
209
	close(l.replyChan)
210
211
	// let the listener know we're done
212
	close(l.notificationChan)
213
214
	// this ListenerConn is done
215
}
216
217
// Listen sends a LISTEN query to the server. See ExecSimpleQuery.
218
func (l *ListenerConn) Listen(channel string) (bool, error) {
219
	return l.ExecSimpleQuery("LISTEN " + QuoteIdentifier(channel))
220
}
221
222
// Unlisten sends an UNLISTEN query to the server. See ExecSimpleQuery.
223
func (l *ListenerConn) Unlisten(channel string) (bool, error) {
224
	return l.ExecSimpleQuery("UNLISTEN " + QuoteIdentifier(channel))
225
}
226
227
// UnlistenAll sends an `UNLISTEN *` query to the server. See ExecSimpleQuery.
228
func (l *ListenerConn) UnlistenAll() (bool, error) {
229
	return l.ExecSimpleQuery("UNLISTEN *")
230
}
231
232
// Ping the remote server to make sure it's alive.  Non-nil error means the
233
// connection has failed and should be abandoned.
234
func (l *ListenerConn) Ping() error {
235
	sent, err := l.ExecSimpleQuery("")
236
	if !sent {
237
		return err
238
	}
239
	if err != nil {
240
		// shouldn't happen
241
		panic(err)
242
	}
243
	return nil
244
}
245
246
// Attempt to send a query on the connection.  Returns an error if sending the
247
// query failed, and the caller should initiate closure of this connection.
248
// The caller must be holding senderLock (see acquireSenderLock and
249
// releaseSenderLock).
250
func (l *ListenerConn) sendSimpleQuery(q string) (err error) {
251
	defer errRecoverNoErrBadConn(&err)
252
253
	// must set connection state before sending the query
254
	if !l.setState(connStateExpectResponse) {
255
		panic("two queries running at the same time")
256
	}
257
258
	// Can't use l.cn.writeBuf here because it uses the scratch buffer which
259
	// might get overwritten by listenerConnLoop.
260
	b := &writeBuf{
261
		buf: []byte("Q\x00\x00\x00\x00"),
262
		pos: 1,
263
	}
264
	b.string(q)
265
	l.cn.send(b)
266
267
	return nil
268
}
269
270
// ExecSimpleQuery executes a "simple query" (i.e. one with no bindable
271
// parameters) on the connection. The possible return values are:
272
//   1) "executed" is true; the query was executed to completion on the
273
//      database server.  If the query failed, err will be set to the error
274
//      returned by the database, otherwise err will be nil.
275
//   2) If "executed" is false, the query could not be executed on the remote
276
//      server.  err will be non-nil.
277
//
278
// After a call to ExecSimpleQuery has returned an executed=false value, the
279
// connection has either been closed or will be closed shortly thereafter, and
280
// all subsequently executed queries will return an error.
281
func (l *ListenerConn) ExecSimpleQuery(q string) (executed bool, err error) {
282
	if err = l.acquireSenderLock(); err != nil {
283
		return false, err
284
	}
285
	defer l.releaseSenderLock()
286
287
	err = l.sendSimpleQuery(q)
288
	if err != nil {
289
		// We can't know what state the protocol is in, so we need to abandon
290
		// this connection.
291
		l.connectionLock.Lock()
292
		// Set the error pointer if it hasn't been set already; see
293
		// listenerConnMain.
294
		if l.err == nil {
295
			l.err = err
296
		}
297
		l.connectionLock.Unlock()
298
		l.cn.c.Close()
299
		return false, err
300
	}
301
302
	// now we just wait for a reply..
303
	for {
304
		m, ok := <-l.replyChan
305
		if !ok {
306
			// We lost the connection to server, don't bother waiting for a
307
			// a response.  err should have been set already.
308
			l.connectionLock.Lock()
309
			err := l.err
310
			l.connectionLock.Unlock()
311
			return false, err
312
		}
313
		switch m.typ {
314
		case 'Z':
315
			// sanity check
316
			if m.err != nil {
317
				panic("m.err != nil")
318
			}
319
			// done; err might or might not be set
320
			return true, err
321
322
		case 'E':
323
			// sanity check
324
			if m.err == nil {
325
				panic("m.err == nil")
326
			}
327
			// server responded with an error; ReadyForQuery to follow
328
			err = m.err
329
330
		default:
331
			return false, fmt.Errorf("unknown response for simple query: %q", m.typ)
332
		}
333
	}
334
}
335
336
// Close closes the connection.
337
func (l *ListenerConn) Close() error {
338
	l.connectionLock.Lock()
339
	if l.err != nil {
340
		l.connectionLock.Unlock()
341
		return errListenerConnClosed
342
	}
343
	l.err = errListenerConnClosed
344
	l.connectionLock.Unlock()
345
	// We can't send anything on the connection without holding senderLock.
346
	// Simply close the net.Conn to wake up everyone operating on it.
347
	return l.cn.c.Close()
348
}
349
350
// Err returns the reason the connection was closed. It is not safe to call
351
// this function until l.Notify has been closed.
352
func (l *ListenerConn) Err() error {
353
	return l.err
354
}
355
356
var (
	// errListenerClosed is returned by Listener methods once the Listener
	// has been closed.
	errListenerClosed = errors.New("pq: Listener has been closed")

	// ErrChannelAlreadyOpen is returned from Listen when a channel is
	// already open.
	ErrChannelAlreadyOpen = errors.New("pq: channel is already open")

	// ErrChannelNotOpen is returned from Unlisten when a channel is not
	// open.
	ErrChannelNotOpen = errors.New("pq: channel is not open")
)
364
365
// ListenerEventType enumerates the kinds of events a Listener reports through
// its event callback.
type ListenerEventType int

const (
	// ListenerEventConnected is emitted only when the database connection
	// has been established for the first time. The err argument of the
	// callback will always be nil.
	ListenerEventConnected ListenerEventType = iota

	// ListenerEventDisconnected is emitted after a database connection has
	// been lost, either because of an error or because Close has been
	// called. The err argument will be set to the reason the database
	// connection was lost.
	ListenerEventDisconnected

	// ListenerEventReconnected is emitted after a database connection has
	// been re-established after connection loss. The err argument of the
	// callback will always be nil. After this event has been emitted, a
	// nil pq.Notification is sent on the Listener.Notify channel.
	ListenerEventReconnected

	// ListenerEventConnectionAttemptFailed is emitted after a connection
	// to the database was attempted, but failed. The err argument will be
	// set to an error describing why the connection attempt did not
	// succeed.
	ListenerEventConnectionAttemptFailed
)

// EventCallbackType is the signature of the event callback: it receives the
// event and the associated error. See the documentation of the
// ListenerEventType constants for the meaning of err per event.
type EventCallbackType func(event ListenerEventType, err error)
396
397
// Listener provides an interface for listening to notifications from a
398
// PostgreSQL database.  For general usage information, see section
399
// "Notifications".
400
//
401
// Listener can safely be used from concurrently running goroutines.
402
type Listener struct {
403
	// Channel for receiving notifications from the database.  In some cases a
404
	// nil value will be sent.  See section "Notifications" above.
405
	Notify chan *Notification
406
407
	name                 string
408
	minReconnectInterval time.Duration
409
	maxReconnectInterval time.Duration
410
	dialer               Dialer
411
	eventCallback        EventCallbackType
412
413
	lock                 sync.Mutex
414
	isClosed             bool
415
	reconnectCond        *sync.Cond
416
	cn                   *ListenerConn
417
	connNotificationChan <-chan *Notification
418
	channels             map[string]struct{}
419
}
420
421
// NewListener creates a new database connection dedicated to LISTEN / NOTIFY.
422
//
423
// name should be set to a connection string to be used to establish the
424
// database connection (see section "Connection String Parameters" above).
425
//
426
// minReconnectInterval controls the duration to wait before trying to
427
// re-establish the database connection after connection loss.  After each
428
// consecutive failure this interval is doubled, until maxReconnectInterval is
429
// reached.  Successfully completing the connection establishment procedure
430
// resets the interval back to minReconnectInterval.
431
//
432
// The last parameter eventCallback can be set to a function which will be
433
// called by the Listener when the state of the underlying database connection
434
// changes.  This callback will be called by the goroutine which dispatches the
435
// notifications over the Notify channel, so you should try to avoid doing
436
// potentially time-consuming operations from the callback.
437
func NewListener(name string,
438
	minReconnectInterval time.Duration,
439
	maxReconnectInterval time.Duration,
440
	eventCallback EventCallbackType) *Listener {
441
	return NewDialListener(defaultDialer{}, name, minReconnectInterval, maxReconnectInterval, eventCallback)
442
}
443
444
// NewDialListener is like NewListener but it takes a Dialer.
445
func NewDialListener(d Dialer,
446
	name string,
447
	minReconnectInterval time.Duration,
448
	maxReconnectInterval time.Duration,
449
	eventCallback EventCallbackType) *Listener {
450
451
	l := &Listener{
452
		name:                 name,
453
		minReconnectInterval: minReconnectInterval,
454
		maxReconnectInterval: maxReconnectInterval,
455
		dialer:               d,
456
		eventCallback:        eventCallback,
457
458
		channels: make(map[string]struct{}),
459
460
		Notify: make(chan *Notification, 32),
461
	}
462
	l.reconnectCond = sync.NewCond(&l.lock)
463
464
	go l.listenerMain()
465
466
	return l
467
}
468
469
// NotificationChannel returns the notification channel for this listener.
470
// This is the same channel as Notify, and will not be recreated during the
471
// life time of the Listener.
472
func (l *Listener) NotificationChannel() <-chan *Notification {
473
	return l.Notify
474
}
475
476
// Listen starts listening for notifications on a channel.  Calls to this
477
// function will block until an acknowledgement has been received from the
478
// server.  Note that Listener automatically re-establishes the connection
479
// after connection loss, so this function may block indefinitely if the
480
// connection can not be re-established.
481
//
482
// Listen will only fail in three conditions:
483
//   1) The channel is already open.  The returned error will be
484
//      ErrChannelAlreadyOpen.
485
//   2) The query was executed on the remote server, but PostgreSQL returned an
486
//      error message in response to the query.  The returned error will be a
487
//      pq.Error containing the information the server supplied.
488
//   3) Close is called on the Listener before the request could be completed.
489
//
490
// The channel name is case-sensitive.
491
func (l *Listener) Listen(channel string) error {
492
	l.lock.Lock()
493
	defer l.lock.Unlock()
494
495
	if l.isClosed {
496
		return errListenerClosed
497
	}
498
499
	// The server allows you to issue a LISTEN on a channel which is already
500
	// open, but it seems useful to be able to detect this case to spot for
501
	// mistakes in application logic.  If the application genuinely does't
502
	// care, it can check the exported error and ignore it.
503
	_, exists := l.channels[channel]
504
	if exists {
505
		return ErrChannelAlreadyOpen
506
	}
507
508
	if l.cn != nil {
509
		// If gotResponse is true but error is set, the query was executed on
510
		// the remote server, but resulted in an error.  This should be
511
		// relatively rare, so it's fine if we just pass the error to our
512
		// caller.  However, if gotResponse is false, we could not complete the
513
		// query on the remote server and our underlying connection is about
514
		// to go away, so we only add relname to l.channels, and wait for
515
		// resync() to take care of the rest.
516
		gotResponse, err := l.cn.Listen(channel)
517
		if gotResponse && err != nil {
518
			return err
519
		}
520
	}
521
522
	l.channels[channel] = struct{}{}
523
	for l.cn == nil {
524
		l.reconnectCond.Wait()
525
		// we let go of the mutex for a while
526
		if l.isClosed {
527
			return errListenerClosed
528
		}
529
	}
530
531
	return nil
532
}
533
534
// Unlisten removes a channel from the Listener's channel list.  Returns
535
// ErrChannelNotOpen if the Listener is not listening on the specified channel.
536
// Returns immediately with no error if there is no connection.  Note that you
537
// might still get notifications for this channel even after Unlisten has
538
// returned.
539
//
540
// The channel name is case-sensitive.
541
func (l *Listener) Unlisten(channel string) error {
542
	l.lock.Lock()
543
	defer l.lock.Unlock()
544
545
	if l.isClosed {
546
		return errListenerClosed
547
	}
548
549
	// Similarly to LISTEN, this is not an error in Postgres, but it seems
550
	// useful to distinguish from the normal conditions.
551
	_, exists := l.channels[channel]
552
	if !exists {
553
		return ErrChannelNotOpen
554
	}
555
556
	if l.cn != nil {
557
		// Similarly to Listen (see comment in that function), the caller
558
		// should only be bothered with an error if it came from the backend as
559
		// a response to our query.
560
		gotResponse, err := l.cn.Unlisten(channel)
561
		if gotResponse && err != nil {
562
			return err
563
		}
564
	}
565
566
	// Don't bother waiting for resync if there's no connection.
567
	delete(l.channels, channel)
568
	return nil
569
}
570
571
// UnlistenAll removes all channels from the Listener's channel list.  Returns
572
// immediately with no error if there is no connection.  Note that you might
573
// still get notifications for any of the deleted channels even after
574
// UnlistenAll has returned.
575
func (l *Listener) UnlistenAll() error {
576
	l.lock.Lock()
577
	defer l.lock.Unlock()
578
579
	if l.isClosed {
580
		return errListenerClosed
581
	}
582
583
	if l.cn != nil {
584
		// Similarly to Listen (see comment in that function), the caller
585
		// should only be bothered with an error if it came from the backend as
586
		// a response to our query.
587
		gotResponse, err := l.cn.UnlistenAll()
588
		if gotResponse && err != nil {
589
			return err
590
		}
591
	}
592
593
	// Don't bother waiting for resync if there's no connection.
594
	l.channels = make(map[string]struct{})
595
	return nil
596
}
597
598
// Ping the remote server to make sure it's alive.  Non-nil return value means
599
// that there is no active connection.
600
func (l *Listener) Ping() error {
601
	l.lock.Lock()
602
	defer l.lock.Unlock()
603
604
	if l.isClosed {
605
		return errListenerClosed
606
	}
607
	if l.cn == nil {
608
		return errors.New("no connection")
609
	}
610
611
	return l.cn.Ping()
612
}
613
614
// Clean up after losing the server connection.  Returns l.cn.Err(), which
615
// should have the reason the connection was lost.
616
func (l *Listener) disconnectCleanup() error {
617
	l.lock.Lock()
618
	defer l.lock.Unlock()
619
620
	// sanity check; can't look at Err() until the channel has been closed
621
	select {
622
	case _, ok := <-l.connNotificationChan:
623
		if ok {
624
			panic("connNotificationChan not closed")
625
		}
626
	default:
627
		panic("connNotificationChan not closed")
628
	}
629
630
	err := l.cn.Err()
631
	l.cn.Close()
632
	l.cn = nil
633
	return err
634
}
635
636
// Synchronize the list of channels we want to be listening on with the server
637
// after the connection has been established.
638
func (l *Listener) resync(cn *ListenerConn, notificationChan <-chan *Notification) error {
639
	doneChan := make(chan error)
640
	go func(notificationChan <-chan *Notification) {
641
		for channel := range l.channels {
642
			// If we got a response, return that error to our caller as it's
643
			// going to be more descriptive than cn.Err().
644
			gotResponse, err := cn.Listen(channel)
645
			if gotResponse && err != nil {
646
				doneChan <- err
647
				return
648
			}
649
650
			// If we couldn't reach the server, wait for notificationChan to
651
			// close and then return the error message from the connection, as
652
			// per ListenerConn's interface.
653
			if err != nil {
654
				for range notificationChan {
655
				}
656
				doneChan <- cn.Err()
657
				return
658
			}
659
		}
660
		doneChan <- nil
661
	}(notificationChan)
662
663
	// Ignore notifications while synchronization is going on to avoid
664
	// deadlocks.  We have to send a nil notification over Notify anyway as
665
	// we can't possibly know which notifications (if any) were lost while
666
	// the connection was down, so there's no reason to try and process
667
	// these messages at all.
668
	for {
669
		select {
670
		case _, ok := <-notificationChan:
671
			if !ok {
672
				notificationChan = nil
673
			}
674
675
		case err := <-doneChan:
676
			return err
677
		}
678
	}
679
}
680
681
// caller should NOT be holding l.lock
682
func (l *Listener) closed() bool {
683
	l.lock.Lock()
684
	defer l.lock.Unlock()
685
686
	return l.isClosed
687
}
688
689
// connect dials a new ListenerConn, re-issues LISTEN for every channel in
// l.channels on it and, on success, installs it as the Listener's current
// connection and wakes up any Listen calls waiting for one.
//
// It returns the dial error, the resync error, or errListenerClosed when the
// Listener was closed while the connection was being established.  In the
// latter two cases the new connection is closed before returning.
func (l *Listener) connect() error {
	// The dial happens without holding l.lock.
	notificationChan := make(chan *Notification, 32)
	cn, err := newDialListenerConn(l.dialer, l.name, notificationChan)
	if err != nil {
		return err
	}

	l.lock.Lock()
	defer l.lock.Unlock()

	// Close may have run while we were dialing.  At that point l.cn was
	// still nil, so Close had nothing to shut down; installing the new
	// connection now would leak it and keep the Listener running after
	// Close.  Discard the connection instead.
	if l.isClosed {
		cn.Close()
		return errListenerClosed
	}

	err = l.resync(cn, notificationChan)
	if err != nil {
		cn.Close()
		return err
	}

	l.cn = cn
	l.connNotificationChan = notificationChan
	// Unblock Listen calls waiting for a connection.
	l.reconnectCond.Broadcast()

	return nil
}
711
712
// Close disconnects the Listener from the database and shuts it down.
713
// Subsequent calls to its methods will return an error.  Close returns an
714
// error if the connection has already been closed.
715
func (l *Listener) Close() error {
716
	l.lock.Lock()
717
	defer l.lock.Unlock()
718
719
	if l.isClosed {
720
		return errListenerClosed
721
	}
722
723
	if l.cn != nil {
724
		l.cn.Close()
725
	}
726
	l.isClosed = true
727
728
	// Unblock calls to Listen()
729
	l.reconnectCond.Broadcast()
730
731
	return nil
732
}
733
734
// emitEvent passes event and err to the event callback, if one was
// configured; without a callback it does nothing.
func (l *Listener) emitEvent(event ListenerEventType, err error) {
	callback := l.eventCallback
	if callback == nil {
		return
	}
	callback(event, err)
}
739
740
// Main logic here: maintain a connection to the server when possible, wait
741
// for notifications and emit events.
742
func (l *Listener) listenerConnLoop() {
743
	var nextReconnect time.Time
744
745
	reconnectInterval := l.minReconnectInterval
746
	for {
747
		for {
748
			err := l.connect()
749
			if err == nil {
750
				break
751
			}
752
753
			if l.closed() {
754
				return
755
			}
756
			l.emitEvent(ListenerEventConnectionAttemptFailed, err)
757
758
			time.Sleep(reconnectInterval)
759
			reconnectInterval *= 2
760
			if reconnectInterval > l.maxReconnectInterval {
761
				reconnectInterval = l.maxReconnectInterval
762
			}
763
		}
764
765
		if nextReconnect.IsZero() {
766
			l.emitEvent(ListenerEventConnected, nil)
767
		} else {
768
			l.emitEvent(ListenerEventReconnected, nil)
769
			l.Notify <- nil
770
		}
771
772
		reconnectInterval = l.minReconnectInterval
773
		nextReconnect = time.Now().Add(reconnectInterval)
774
775
		for {
776
			notification, ok := <-l.connNotificationChan
777
			if !ok {
778
				// lost connection, loop again
779
				break
780
			}
781
			l.Notify <- notification
782
		}
783
784
		err := l.disconnectCleanup()
785
		if l.closed() {
786
			return
787
		}
788
		l.emitEvent(ListenerEventDisconnected, err)
789
790
		time.Sleep(time.Until(nextReconnect))
791
	}
792
}
793
794
// listenerMain is the body of the Listener's background goroutine: it runs
// listenerConnLoop and, once that returns, closes the Notify channel.
func (l *Listener) listenerMain() {
	l.listenerConnLoop()

	// No more notifications will be sent.
	close(l.Notify)
}
798