Passed
Pull Request — master (#567)
by Murat
02:23
created

gossip.NewSerfGossip   A

Complexity

Conditions 3

Size

Total Lines 29
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 21
nop 1
dl 0
loc 29
rs 9.376
c 0
b 0
f 0
1
package gossip
2
3
import (
4
	"errors"
5
	"fmt"
6
	hash "github.com/Permify/permify/pkg/consistent"
7
	"github.com/hashicorp/serf/serf"
8
	"io"
9
	"log"
10
)
11
12
type Serf struct {
13
	Enabled bool
14
	serf    *serf.Serf
15
	EventCh chan serf.Event
16
}
17
18
func NewSerfGossip(node string) (*Serf, error) {
19
	config := serf.DefaultConfig()
20
	config.RejoinAfterLeave = true
21
	eventChannel := make(chan serf.Event, 256)
22
	// disable logger for serf
23
	config.LogOutput = io.Discard
24
	config.Logger = log.New(io.Discard, "", 0)
25
	config.MemberlistConfig.LogOutput = io.Discard
26
	config.EventCh = eventChannel
27
28
	// Create serf instance
29
	s, err := serf.Create(config)
30
	if err != nil {
31
		log.Fatalf("Failed to create serf instance: %v", err)
32
		return nil, err
33
	}
34
35
	_, err = s.Join([]string{node}, true)
36
	if err != nil {
37
		log.Fatalf("Failed to join cluster: %v", err)
38
		return nil, err
39
	}
40
41
	// Return a new Gossip instance with the initialized memberlist.
42
	return &Serf{
43
		Enabled: true,
44
		serf:    s,
45
		EventCh: eventChannel,
46
	}, nil
47
}
48
49
func (s *Serf) SyncNodes(consistent *hash.ConsistentHash, nodeName, port string) {
50
	for {
51
		select {
52
		case e := <-s.EventCh:
53
			switch e.EventType() {
54
			case serf.EventMemberJoin:
55
				me := e.(serf.MemberEvent)
56
				for _, m := range me.Members {
57
					if m.Name != nodeName {
58
						if _, exists := consistent.Nodes[fmt.Sprintf("%s:%d", m.Addr.String(), m.Port)]; !exists {
59
							fmt.Printf("Adding node %s:%s to the consistent hash\n", m.Addr.String(), port)
60
							consistent.AddWithWeight(fmt.Sprintf("%s:%s", m.Addr.String(), port), 100)
61
						}
62
					}
63
				}
64
			case serf.EventMemberLeave, serf.EventMemberFailed, serf.EventMemberReap:
65
				me := e.(serf.MemberEvent)
66
				for _, m := range me.Members {
67
					if m.Name != nodeName {
68
						fmt.Printf("Removing node %s:%d to the consistent hash\n", m.Addr.String(), m.Port)
69
						if _, exists := consistent.Nodes[fmt.Sprintf("%s:%d", m.Addr.String(), m.Port)]; exists {
70
							consistent.Remove(fmt.Sprintf("%s:%s", m.Addr.String(), port))
71
						}
72
					}
73
				}
74
			}
75
		}
76
	}
77
}
78
79
func (s *Serf) Shutdown() error {
80
	return errors.Join(s.serf.Leave(), s.serf.Shutdown())
81
}
82