Test Failed
Pull Request — master (#36)
by Frank
03:43 queued 02:03
created

vendor/github.com/lib/pq/notify.go   F

Size/Duplication

Total Lines 857
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
cc 118
eloc 385
dl 0
loc 857
rs 2
c 0
b 0
f 0

35 Methods

Rating   Name   Duplication   Size   Complexity  
F pq.*ListenerConn.listenerConnLoop 0 51 15
A pq.NewDialListener 0 22 1
A pq.*ListenerConn.UnlistenAll 0 2 1
A pq.*ListenerConn.sendSimpleQuery 0 18 2
A pq.*Listener.listenerMain 0 3 1
A pq.*ListenerConn.listenerConnMain 0 26 2
A pq.*Listener.closed 0 5 1
A pq.*NotificationHandlerConnector.Connect 0 6 2
A pq.*Listener.NotificationChannel 0 2 1
C pq.*ListenerConn.ExecSimpleQuery 0 51 11
A pq.*ListenerConn.Listen 0 2 1
A pq.*ListenerConn.Err 0 2 1
A pq.*ListenerConn.acquireSenderLock 0 12 2
C pq.*Listener.resync 0 39 11
A pq.*ListenerConn.setState 0 15 5
C pq.*Listener.listenerConnLoop 0 49 10
A pq.NewListener 0 5 1
A pq.SetNotificationHandler 0 2 1
A pq.*ListenerConn.Unlisten 0 2 1
A pq.*ListenerConn.Ping 0 10 3
A pq.*ListenerConn.Close 0 11 2
A pq.recvNotification 0 6 1
A pq.*Listener.Close 0 17 3
B pq.*Listener.Listen 0 41 8
A pq.*Listener.Ping 0 12 3
A pq.*Listener.UnlistenAll 0 21 5
A pq.newDialListenerConn 0 16 2
A pq.*Listener.emitEvent 0 3 2
A pq.NewListenerConn 0 2 1
A pq.*ListenerConn.releaseSenderLock 0 2 1
A pq.*Listener.connect 0 21 3
A pq.ConnectorWithNotificationHandler 0 6 2
A pq.ConnectorNotificationHandler 0 5 2
A pq.*Listener.disconnectCleanup 0 18 4
B pq.*Listener.Unlisten 0 28 6
1
package pq
2
3
// Package pq is a pure Go Postgres driver for the database/sql package.
4
// This module contains support for Postgres LISTEN/NOTIFY.
5
6
import (
7
	"context"
8
	"database/sql/driver"
9
	"errors"
10
	"fmt"
11
	"sync"
12
	"sync/atomic"
13
	"time"
14
)
15
16
// Notification represents a single notification from the database.
17
type Notification struct {
18
	// Process ID (PID) of the notifying postgres backend.
19
	BePid int
20
	// Name of the channel the notification was sent on.
21
	Channel string
22
	// Payload, or the empty string if unspecified.
23
	Extra string
24
}
25
26
func recvNotification(r *readBuf) *Notification {
27
	bePid := r.int32()
28
	channel := r.string()
29
	extra := r.string()
30
31
	return &Notification{bePid, channel, extra}
32
}
33
34
// SetNotificationHandler sets the given notification handler on the given
35
// connection. A runtime panic occurs if c is not a pq connection. A nil handler
36
// may be used to unset it.
37
//
38
// Note: Notification handlers are executed synchronously by pq meaning commands
39
// won't continue to be processed until the handler returns.
40
func SetNotificationHandler(c driver.Conn, handler func(*Notification)) {
41
	c.(*conn).notificationHandler = handler
42
}
43
44
// NotificationHandlerConnector wraps a regular connector and sets a notification handler
45
// on it.
46
type NotificationHandlerConnector struct {
47
	driver.Connector
48
	notificationHandler func(*Notification)
49
}
50
51
// Connect calls the underlying connector's connect method and then sets the
52
// notification handler.
53
func (n *NotificationHandlerConnector) Connect(ctx context.Context) (driver.Conn, error) {
54
	c, err := n.Connector.Connect(ctx)
55
	if err == nil {
56
		SetNotificationHandler(c, n.notificationHandler)
57
	}
58
	return c, err
59
}
60
61
// ConnectorNotificationHandler returns the currently set notification handler, if any. If
62
// the given connector is not a result of ConnectorWithNotificationHandler, nil is
63
// returned.
64
func ConnectorNotificationHandler(c driver.Connector) func(*Notification) {
65
	if c, ok := c.(*NotificationHandlerConnector); ok {
66
		return c.notificationHandler
67
	}
68
	return nil
69
}
70
71
// ConnectorWithNotificationHandler creates or sets the given handler for the given
72
// connector. If the given connector is a result of calling this function
73
// previously, it is simply set on the given connector and returned. Otherwise,
74
// this returns a new connector wrapping the given one and setting the notification
75
// handler. A nil notification handler may be used to unset it.
76
//
77
// The returned connector is intended to be used with database/sql.OpenDB.
78
//
79
// Note: Notification handlers are executed synchronously by pq meaning commands
80
// won't continue to be processed until the handler returns.
81
func ConnectorWithNotificationHandler(c driver.Connector, handler func(*Notification)) *NotificationHandlerConnector {
82
	if c, ok := c.(*NotificationHandlerConnector); ok {
83
		c.notificationHandler = handler
84
		return c
85
	}
86
	return &NotificationHandlerConnector{Connector: c, notificationHandler: handler}
87
}
88
89
const (
90
	connStateIdle int32 = iota
91
	connStateExpectResponse
92
	connStateExpectReadyForQuery
93
)
94
95
type message struct {
96
	typ byte
97
	err error
98
}
99
100
var errListenerConnClosed = errors.New("pq: ListenerConn has been closed")
101
102
// ListenerConn is a low-level interface for waiting for notifications.  You
103
// should use Listener instead.
104
type ListenerConn struct {
105
	// guards cn and err
106
	connectionLock sync.Mutex
107
	cn             *conn
108
	err            error
109
110
	connState int32
111
112
	// the sending goroutine will be holding this lock
113
	senderLock sync.Mutex
114
115
	notificationChan chan<- *Notification
116
117
	replyChan chan message
118
}
119
120
// NewListenerConn creates a new ListenerConn. Use NewListener instead.
121
func NewListenerConn(name string, notificationChan chan<- *Notification) (*ListenerConn, error) {
122
	return newDialListenerConn(defaultDialer{}, name, notificationChan)
123
}
124
125
func newDialListenerConn(d Dialer, name string, c chan<- *Notification) (*ListenerConn, error) {
126
	cn, err := DialOpen(d, name)
127
	if err != nil {
128
		return nil, err
129
	}
130
131
	l := &ListenerConn{
132
		cn:               cn.(*conn),
133
		notificationChan: c,
134
		connState:        connStateIdle,
135
		replyChan:        make(chan message, 2),
136
	}
137
138
	go l.listenerConnMain()
139
140
	return l, nil
141
}
142
143
// We can only allow one goroutine at a time to be running a query on the
144
// connection for various reasons, so the goroutine sending on the connection
145
// must be holding senderLock.
146
//
147
// Returns an error if an unrecoverable error has occurred and the ListenerConn
148
// should be abandoned.
149
func (l *ListenerConn) acquireSenderLock() error {
150
	// we must acquire senderLock first to avoid deadlocks; see ExecSimpleQuery
151
	l.senderLock.Lock()
152
153
	l.connectionLock.Lock()
154
	err := l.err
155
	l.connectionLock.Unlock()
156
	if err != nil {
157
		l.senderLock.Unlock()
158
		return err
159
	}
160
	return nil
161
}
162
163
func (l *ListenerConn) releaseSenderLock() {
164
	l.senderLock.Unlock()
165
}
166
167
// setState advances the protocol state to newState.  Returns false if moving
168
// to that state from the current state is not allowed.
169
func (l *ListenerConn) setState(newState int32) bool {
170
	var expectedState int32
171
172
	switch newState {
173
	case connStateIdle:
174
		expectedState = connStateExpectReadyForQuery
175
	case connStateExpectResponse:
176
		expectedState = connStateIdle
177
	case connStateExpectReadyForQuery:
178
		expectedState = connStateExpectResponse
179
	default:
180
		panic(fmt.Sprintf("unexpected listenerConnState %d", newState))
181
	}
182
183
	return atomic.CompareAndSwapInt32(&l.connState, expectedState, newState)
184
}
185
186
// Main logic is here: receive messages from the postgres backend, forward
187
// notifications and query replies and keep the internal state in sync with the
188
// protocol state.  Returns when the connection has been lost, is about to go
189
// away or should be discarded because we couldn't agree on the state with the
190
// server backend.
191
func (l *ListenerConn) listenerConnLoop() (err error) {
192
	defer errRecoverNoErrBadConn(&err)
193
194
	r := &readBuf{}
195
	for {
196
		t, err := l.cn.recvMessage(r)
197
		if err != nil {
198
			return err
199
		}
200
201
		switch t {
202
		case 'A':
203
			// recvNotification copies all the data so we don't need to worry
204
			// about the scratch buffer being overwritten.
205
			l.notificationChan <- recvNotification(r)
206
207
		case 'T', 'D':
208
			// only used by tests; ignore
209
210
		case 'E':
211
			// We might receive an ErrorResponse even when not in a query; it
212
			// is expected that the server will close the connection after
213
			// that, but we should make sure that the error we display is the
214
			// one from the stray ErrorResponse, not io.ErrUnexpectedEOF.
215
			if !l.setState(connStateExpectReadyForQuery) {
216
				return parseError(r)
217
			}
218
			l.replyChan <- message{t, parseError(r)}
219
220
		case 'C', 'I':
221
			if !l.setState(connStateExpectReadyForQuery) {
222
				// protocol out of sync
223
				return fmt.Errorf("unexpected CommandComplete")
224
			}
225
			// ExecSimpleQuery doesn't need to know about this message
226
227
		case 'Z':
228
			if !l.setState(connStateIdle) {
229
				// protocol out of sync
230
				return fmt.Errorf("unexpected ReadyForQuery")
231
			}
232
			l.replyChan <- message{t, nil}
233
234
		case 'S':
235
			// ignore
236
		case 'N':
237
			if n := l.cn.noticeHandler; n != nil {
238
				n(parseError(r))
239
			}
240
		default:
241
			return fmt.Errorf("unexpected message %q from server in listenerConnLoop", t)
242
		}
243
	}
244
}
245
246
// This is the main routine for the goroutine receiving on the database
247
// connection.  Most of the main logic is in listenerConnLoop.
248
func (l *ListenerConn) listenerConnMain() {
249
	err := l.listenerConnLoop()
250
251
	// listenerConnLoop terminated; we're done, but we still have to clean up.
252
	// Make sure nobody tries to start any new queries by making sure the err
253
	// pointer is set.  It is important that we do not overwrite its value; a
254
	// connection could be closed by either this goroutine or one sending on
255
	// the connection -- whoever closes the connection is assumed to have the
256
	// more meaningful error message (as the other one will probably get
257
	// net.errClosed), so that goroutine sets the error we expose while the
258
	// other error is discarded.  If the connection is lost while two
259
	// goroutines are operating on the socket, it probably doesn't matter which
260
	// error we expose so we don't try to do anything more complex.
261
	l.connectionLock.Lock()
262
	if l.err == nil {
263
		l.err = err
264
	}
265
	l.cn.Close()
266
	l.connectionLock.Unlock()
267
268
	// There might be a query in-flight; make sure nobody's waiting for a
269
	// response to it, since there's not going to be one.
270
	close(l.replyChan)
271
272
	// let the listener know we're done
273
	close(l.notificationChan)
274
275
	// this ListenerConn is done
276
}
277
278
// Listen sends a LISTEN query to the server. See ExecSimpleQuery.
279
func (l *ListenerConn) Listen(channel string) (bool, error) {
280
	return l.ExecSimpleQuery("LISTEN " + QuoteIdentifier(channel))
281
}
282
283
// Unlisten sends an UNLISTEN query to the server. See ExecSimpleQuery.
284
func (l *ListenerConn) Unlisten(channel string) (bool, error) {
285
	return l.ExecSimpleQuery("UNLISTEN " + QuoteIdentifier(channel))
286
}
287
288
// UnlistenAll sends an `UNLISTEN *` query to the server. See ExecSimpleQuery.
289
func (l *ListenerConn) UnlistenAll() (bool, error) {
290
	return l.ExecSimpleQuery("UNLISTEN *")
291
}
292
293
// Ping the remote server to make sure it's alive.  Non-nil error means the
294
// connection has failed and should be abandoned.
295
func (l *ListenerConn) Ping() error {
296
	sent, err := l.ExecSimpleQuery("")
297
	if !sent {
298
		return err
299
	}
300
	if err != nil {
301
		// shouldn't happen
302
		panic(err)
303
	}
304
	return nil
305
}
306
307
// Attempt to send a query on the connection.  Returns an error if sending the
308
// query failed, and the caller should initiate closure of this connection.
309
// The caller must be holding senderLock (see acquireSenderLock and
310
// releaseSenderLock).
311
func (l *ListenerConn) sendSimpleQuery(q string) (err error) {
312
	defer errRecoverNoErrBadConn(&err)
313
314
	// must set connection state before sending the query
315
	if !l.setState(connStateExpectResponse) {
316
		panic("two queries running at the same time")
317
	}
318
319
	// Can't use l.cn.writeBuf here because it uses the scratch buffer which
320
	// might get overwritten by listenerConnLoop.
321
	b := &writeBuf{
322
		buf: []byte("Q\x00\x00\x00\x00"),
323
		pos: 1,
324
	}
325
	b.string(q)
326
	l.cn.send(b)
327
328
	return nil
329
}
330
331
// ExecSimpleQuery executes a "simple query" (i.e. one with no bindable
332
// parameters) on the connection. The possible return values are:
333
//   1) "executed" is true; the query was executed to completion on the
334
//      database server.  If the query failed, err will be set to the error
335
//      returned by the database, otherwise err will be nil.
336
//   2) If "executed" is false, the query could not be executed on the remote
337
//      server.  err will be non-nil.
338
//
339
// After a call to ExecSimpleQuery has returned an executed=false value, the
340
// connection has either been closed or will be closed shortly thereafter, and
341
// all subsequently executed queries will return an error.
342
func (l *ListenerConn) ExecSimpleQuery(q string) (executed bool, err error) {
343
	if err = l.acquireSenderLock(); err != nil {
344
		return false, err
345
	}
346
	defer l.releaseSenderLock()
347
348
	err = l.sendSimpleQuery(q)
349
	if err != nil {
350
		// We can't know what state the protocol is in, so we need to abandon
351
		// this connection.
352
		l.connectionLock.Lock()
353
		// Set the error pointer if it hasn't been set already; see
354
		// listenerConnMain.
355
		if l.err == nil {
356
			l.err = err
357
		}
358
		l.connectionLock.Unlock()
359
		l.cn.c.Close()
360
		return false, err
361
	}
362
363
	// now we just wait for a reply..
364
	for {
365
		m, ok := <-l.replyChan
366
		if !ok {
367
			// We lost the connection to server, don't bother waiting for a
368
			// a response.  err should have been set already.
369
			l.connectionLock.Lock()
370
			err := l.err
371
			l.connectionLock.Unlock()
372
			return false, err
373
		}
374
		switch m.typ {
375
		case 'Z':
376
			// sanity check
377
			if m.err != nil {
378
				panic("m.err != nil")
379
			}
380
			// done; err might or might not be set
381
			return true, err
382
383
		case 'E':
384
			// sanity check
385
			if m.err == nil {
386
				panic("m.err == nil")
387
			}
388
			// server responded with an error; ReadyForQuery to follow
389
			err = m.err
390
391
		default:
392
			return false, fmt.Errorf("unknown response for simple query: %q", m.typ)
393
		}
394
	}
395
}
396
397
// Close closes the connection.
398
func (l *ListenerConn) Close() error {
399
	l.connectionLock.Lock()
400
	if l.err != nil {
401
		l.connectionLock.Unlock()
402
		return errListenerConnClosed
403
	}
404
	l.err = errListenerConnClosed
405
	l.connectionLock.Unlock()
406
	// We can't send anything on the connection without holding senderLock.
407
	// Simply close the net.Conn to wake up everyone operating on it.
408
	return l.cn.c.Close()
409
}
410
411
// Err returns the reason the connection was closed. It is not safe to call
412
// this function until l.Notify has been closed.
413
func (l *ListenerConn) Err() error {
414
	return l.err
415
}
416
417
var errListenerClosed = errors.New("pq: Listener has been closed")
418
419
// ErrChannelAlreadyOpen is returned from Listen when a channel is already
420
// open.
421
var ErrChannelAlreadyOpen = errors.New("pq: channel is already open")
422
423
// ErrChannelNotOpen is returned from Unlisten when a channel is not open.
424
var ErrChannelNotOpen = errors.New("pq: channel is not open")
425
426
// ListenerEventType is an enumeration of listener event types.
427
type ListenerEventType int
428
429
const (
430
	// ListenerEventConnected is emitted only when the database connection
431
	// has been initially initialized. The err argument of the callback
432
	// will always be nil.
433
	ListenerEventConnected ListenerEventType = iota
434
435
	// ListenerEventDisconnected is emitted after a database connection has
436
	// been lost, either because of an error or because Close has been
437
	// called. The err argument will be set to the reason the database
438
	// connection was lost.
439
	ListenerEventDisconnected
440
441
	// ListenerEventReconnected is emitted after a database connection has
442
	// been re-established after connection loss. The err argument of the
443
	// callback will always be nil. After this event has been emitted, a
444
	// nil pq.Notification is sent on the Listener.Notify channel.
445
	ListenerEventReconnected
446
447
	// ListenerEventConnectionAttemptFailed is emitted after a connection
448
	// to the database was attempted, but failed. The err argument will be
449
	// set to an error describing why the connection attempt did not
450
	// succeed.
451
	ListenerEventConnectionAttemptFailed
452
)
453
454
// EventCallbackType is the event callback type. See also ListenerEventType
455
// constants' documentation.
456
type EventCallbackType func(event ListenerEventType, err error)
457
458
// Listener provides an interface for listening to notifications from a
459
// PostgreSQL database.  For general usage information, see section
460
// "Notifications".
461
//
462
// Listener can safely be used from concurrently running goroutines.
463
type Listener struct {
464
	// Channel for receiving notifications from the database.  In some cases a
465
	// nil value will be sent.  See section "Notifications" above.
466
	Notify chan *Notification
467
468
	name                 string
469
	minReconnectInterval time.Duration
470
	maxReconnectInterval time.Duration
471
	dialer               Dialer
472
	eventCallback        EventCallbackType
473
474
	lock                 sync.Mutex
475
	isClosed             bool
476
	reconnectCond        *sync.Cond
477
	cn                   *ListenerConn
478
	connNotificationChan <-chan *Notification
479
	channels             map[string]struct{}
480
}
481
482
// NewListener creates a new database connection dedicated to LISTEN / NOTIFY.
483
//
484
// name should be set to a connection string to be used to establish the
485
// database connection (see section "Connection String Parameters" above).
486
//
487
// minReconnectInterval controls the duration to wait before trying to
488
// re-establish the database connection after connection loss.  After each
489
// consecutive failure this interval is doubled, until maxReconnectInterval is
490
// reached.  Successfully completing the connection establishment procedure
491
// resets the interval back to minReconnectInterval.
492
//
493
// The last parameter eventCallback can be set to a function which will be
494
// called by the Listener when the state of the underlying database connection
495
// changes.  This callback will be called by the goroutine which dispatches the
496
// notifications over the Notify channel, so you should try to avoid doing
497
// potentially time-consuming operations from the callback.
498
func NewListener(name string,
499
	minReconnectInterval time.Duration,
500
	maxReconnectInterval time.Duration,
501
	eventCallback EventCallbackType) *Listener {
502
	return NewDialListener(defaultDialer{}, name, minReconnectInterval, maxReconnectInterval, eventCallback)
503
}
504
505
// NewDialListener is like NewListener but it takes a Dialer.
506
func NewDialListener(d Dialer,
507
	name string,
508
	minReconnectInterval time.Duration,
509
	maxReconnectInterval time.Duration,
510
	eventCallback EventCallbackType) *Listener {
511
512
	l := &Listener{
513
		name:                 name,
514
		minReconnectInterval: minReconnectInterval,
515
		maxReconnectInterval: maxReconnectInterval,
516
		dialer:               d,
517
		eventCallback:        eventCallback,
518
519
		channels: make(map[string]struct{}),
520
521
		Notify: make(chan *Notification, 32),
522
	}
523
	l.reconnectCond = sync.NewCond(&l.lock)
524
525
	go l.listenerMain()
526
527
	return l
528
}
529
530
// NotificationChannel returns the notification channel for this listener.
531
// This is the same channel as Notify, and will not be recreated during the
532
// life time of the Listener.
533
func (l *Listener) NotificationChannel() <-chan *Notification {
534
	return l.Notify
535
}
536
537
// Listen starts listening for notifications on a channel.  Calls to this
538
// function will block until an acknowledgement has been received from the
539
// server.  Note that Listener automatically re-establishes the connection
540
// after connection loss, so this function may block indefinitely if the
541
// connection can not be re-established.
542
//
543
// Listen will only fail in three conditions:
544
//   1) The channel is already open.  The returned error will be
545
//      ErrChannelAlreadyOpen.
546
//   2) The query was executed on the remote server, but PostgreSQL returned an
547
//      error message in response to the query.  The returned error will be a
548
//      pq.Error containing the information the server supplied.
549
//   3) Close is called on the Listener before the request could be completed.
550
//
551
// The channel name is case-sensitive.
552
func (l *Listener) Listen(channel string) error {
553
	l.lock.Lock()
554
	defer l.lock.Unlock()
555
556
	if l.isClosed {
557
		return errListenerClosed
558
	}
559
560
	// The server allows you to issue a LISTEN on a channel which is already
561
	// open, but it seems useful to be able to detect this case to spot for
562
	// mistakes in application logic.  If the application genuinely does't
563
	// care, it can check the exported error and ignore it.
564
	_, exists := l.channels[channel]
565
	if exists {
566
		return ErrChannelAlreadyOpen
567
	}
568
569
	if l.cn != nil {
570
		// If gotResponse is true but error is set, the query was executed on
571
		// the remote server, but resulted in an error.  This should be
572
		// relatively rare, so it's fine if we just pass the error to our
573
		// caller.  However, if gotResponse is false, we could not complete the
574
		// query on the remote server and our underlying connection is about
575
		// to go away, so we only add relname to l.channels, and wait for
576
		// resync() to take care of the rest.
577
		gotResponse, err := l.cn.Listen(channel)
578
		if gotResponse && err != nil {
579
			return err
580
		}
581
	}
582
583
	l.channels[channel] = struct{}{}
584
	for l.cn == nil {
585
		l.reconnectCond.Wait()
586
		// we let go of the mutex for a while
587
		if l.isClosed {
588
			return errListenerClosed
589
		}
590
	}
591
592
	return nil
593
}
594
595
// Unlisten removes a channel from the Listener's channel list.  Returns
596
// ErrChannelNotOpen if the Listener is not listening on the specified channel.
597
// Returns immediately with no error if there is no connection.  Note that you
598
// might still get notifications for this channel even after Unlisten has
599
// returned.
600
//
601
// The channel name is case-sensitive.
602
func (l *Listener) Unlisten(channel string) error {
603
	l.lock.Lock()
604
	defer l.lock.Unlock()
605
606
	if l.isClosed {
607
		return errListenerClosed
608
	}
609
610
	// Similarly to LISTEN, this is not an error in Postgres, but it seems
611
	// useful to distinguish from the normal conditions.
612
	_, exists := l.channels[channel]
613
	if !exists {
614
		return ErrChannelNotOpen
615
	}
616
617
	if l.cn != nil {
618
		// Similarly to Listen (see comment in that function), the caller
619
		// should only be bothered with an error if it came from the backend as
620
		// a response to our query.
621
		gotResponse, err := l.cn.Unlisten(channel)
622
		if gotResponse && err != nil {
623
			return err
624
		}
625
	}
626
627
	// Don't bother waiting for resync if there's no connection.
628
	delete(l.channels, channel)
629
	return nil
630
}
631
632
// UnlistenAll removes all channels from the Listener's channel list.  Returns
633
// immediately with no error if there is no connection.  Note that you might
634
// still get notifications for any of the deleted channels even after
635
// UnlistenAll has returned.
636
func (l *Listener) UnlistenAll() error {
637
	l.lock.Lock()
638
	defer l.lock.Unlock()
639
640
	if l.isClosed {
641
		return errListenerClosed
642
	}
643
644
	if l.cn != nil {
645
		// Similarly to Listen (see comment in that function), the caller
646
		// should only be bothered with an error if it came from the backend as
647
		// a response to our query.
648
		gotResponse, err := l.cn.UnlistenAll()
649
		if gotResponse && err != nil {
650
			return err
651
		}
652
	}
653
654
	// Don't bother waiting for resync if there's no connection.
655
	l.channels = make(map[string]struct{})
656
	return nil
657
}
658
659
// Ping the remote server to make sure it's alive.  Non-nil return value means
660
// that there is no active connection.
661
func (l *Listener) Ping() error {
662
	l.lock.Lock()
663
	defer l.lock.Unlock()
664
665
	if l.isClosed {
666
		return errListenerClosed
667
	}
668
	if l.cn == nil {
669
		return errors.New("no connection")
670
	}
671
672
	return l.cn.Ping()
673
}
674
675
// Clean up after losing the server connection.  Returns l.cn.Err(), which
676
// should have the reason the connection was lost.
677
func (l *Listener) disconnectCleanup() error {
678
	l.lock.Lock()
679
	defer l.lock.Unlock()
680
681
	// sanity check; can't look at Err() until the channel has been closed
682
	select {
683
	case _, ok := <-l.connNotificationChan:
684
		if ok {
685
			panic("connNotificationChan not closed")
686
		}
687
	default:
688
		panic("connNotificationChan not closed")
689
	}
690
691
	err := l.cn.Err()
692
	l.cn.Close()
693
	l.cn = nil
694
	return err
695
}
696
697
// Synchronize the list of channels we want to be listening on with the server
698
// after the connection has been established.
699
func (l *Listener) resync(cn *ListenerConn, notificationChan <-chan *Notification) error {
700
	doneChan := make(chan error)
701
	go func(notificationChan <-chan *Notification) {
702
		for channel := range l.channels {
703
			// If we got a response, return that error to our caller as it's
704
			// going to be more descriptive than cn.Err().
705
			gotResponse, err := cn.Listen(channel)
706
			if gotResponse && err != nil {
707
				doneChan <- err
708
				return
709
			}
710
711
			// If we couldn't reach the server, wait for notificationChan to
712
			// close and then return the error message from the connection, as
713
			// per ListenerConn's interface.
714
			if err != nil {
715
				for range notificationChan {
716
				}
717
				doneChan <- cn.Err()
718
				return
719
			}
720
		}
721
		doneChan <- nil
722
	}(notificationChan)
723
724
	// Ignore notifications while synchronization is going on to avoid
725
	// deadlocks.  We have to send a nil notification over Notify anyway as
726
	// we can't possibly know which notifications (if any) were lost while
727
	// the connection was down, so there's no reason to try and process
728
	// these messages at all.
729
	for {
730
		select {
731
		case _, ok := <-notificationChan:
732
			if !ok {
733
				notificationChan = nil
734
			}
735
736
		case err := <-doneChan:
737
			return err
738
		}
739
	}
740
}
741
742
// caller should NOT be holding l.lock
743
func (l *Listener) closed() bool {
744
	l.lock.Lock()
745
	defer l.lock.Unlock()
746
747
	return l.isClosed
748
}
749
750
func (l *Listener) connect() error {
751
	notificationChan := make(chan *Notification, 32)
752
	cn, err := newDialListenerConn(l.dialer, l.name, notificationChan)
753
	if err != nil {
754
		return err
755
	}
756
757
	l.lock.Lock()
758
	defer l.lock.Unlock()
759
760
	err = l.resync(cn, notificationChan)
761
	if err != nil {
762
		cn.Close()
763
		return err
764
	}
765
766
	l.cn = cn
767
	l.connNotificationChan = notificationChan
768
	l.reconnectCond.Broadcast()
769
770
	return nil
771
}
772
773
// Close disconnects the Listener from the database and shuts it down.
774
// Subsequent calls to its methods will return an error.  Close returns an
775
// error if the connection has already been closed.
776
func (l *Listener) Close() error {
777
	l.lock.Lock()
778
	defer l.lock.Unlock()
779
780
	if l.isClosed {
781
		return errListenerClosed
782
	}
783
784
	if l.cn != nil {
785
		l.cn.Close()
786
	}
787
	l.isClosed = true
788
789
	// Unblock calls to Listen()
790
	l.reconnectCond.Broadcast()
791
792
	return nil
793
}
794
795
func (l *Listener) emitEvent(event ListenerEventType, err error) {
796
	if l.eventCallback != nil {
797
		l.eventCallback(event, err)
798
	}
799
}
800
801
// Main logic here: maintain a connection to the server when possible, wait
802
// for notifications and emit events.
803
func (l *Listener) listenerConnLoop() {
804
	var nextReconnect time.Time
805
806
	reconnectInterval := l.minReconnectInterval
807
	for {
808
		for {
809
			err := l.connect()
810
			if err == nil {
811
				break
812
			}
813
814
			if l.closed() {
815
				return
816
			}
817
			l.emitEvent(ListenerEventConnectionAttemptFailed, err)
818
819
			time.Sleep(reconnectInterval)
820
			reconnectInterval *= 2
821
			if reconnectInterval > l.maxReconnectInterval {
822
				reconnectInterval = l.maxReconnectInterval
823
			}
824
		}
825
826
		if nextReconnect.IsZero() {
827
			l.emitEvent(ListenerEventConnected, nil)
828
		} else {
829
			l.emitEvent(ListenerEventReconnected, nil)
830
			l.Notify <- nil
831
		}
832
833
		reconnectInterval = l.minReconnectInterval
834
		nextReconnect = time.Now().Add(reconnectInterval)
835
836
		for {
837
			notification, ok := <-l.connNotificationChan
838
			if !ok {
839
				// lost connection, loop again
840
				break
841
			}
842
			l.Notify <- notification
843
		}
844
845
		err := l.disconnectCleanup()
846
		if l.closed() {
847
			return
848
		}
849
		l.emitEvent(ListenerEventDisconnected, err)
850
851
		time.Sleep(time.Until(nextReconnect))
852
	}
853
}
854
855
func (l *Listener) listenerMain() {
856
	l.listenerConnLoop()
857
	close(l.Notify)
858
}
859