|
1
|
|
|
package gossip |
|
2
|
|
|
|
|
3
|
|
|
import ( |
|
4
|
|
|
"errors" |
|
5
|
|
|
"fmt" |
|
6
|
|
|
hash "github.com/Permify/permify/pkg/consistent" |
|
7
|
|
|
"github.com/hashicorp/serf/serf" |
|
8
|
|
|
"io" |
|
9
|
|
|
"log" |
|
10
|
|
|
) |
|
11
|
|
|
|
|
12
|
|
|
type Serf struct { |
|
13
|
|
|
Enabled bool |
|
14
|
|
|
serf *serf.Serf |
|
15
|
|
|
EventCh chan serf.Event |
|
16
|
|
|
} |
|
17
|
|
|
|
|
18
|
|
|
func NewSerfGossip(node string) (*Serf, error) { |
|
19
|
|
|
config := serf.DefaultConfig() |
|
20
|
|
|
config.RejoinAfterLeave = true |
|
21
|
|
|
eventChannel := make(chan serf.Event, 256) |
|
22
|
|
|
// disable logger for serf |
|
23
|
|
|
config.LogOutput = io.Discard |
|
24
|
|
|
config.Logger = log.New(io.Discard, "", 0) |
|
25
|
|
|
config.MemberlistConfig.LogOutput = io.Discard |
|
26
|
|
|
config.EventCh = eventChannel |
|
27
|
|
|
|
|
28
|
|
|
// Create serf instance |
|
29
|
|
|
s, err := serf.Create(config) |
|
30
|
|
|
if err != nil { |
|
31
|
|
|
log.Fatalf("Failed to create serf instance: %v", err) |
|
32
|
|
|
return nil, err |
|
33
|
|
|
} |
|
34
|
|
|
|
|
35
|
|
|
_, err = s.Join([]string{node}, true) |
|
36
|
|
|
if err != nil { |
|
37
|
|
|
log.Fatalf("Failed to join cluster: %v", err) |
|
38
|
|
|
return nil, err |
|
39
|
|
|
} |
|
40
|
|
|
|
|
41
|
|
|
// Return a new Gossip instance with the initialized memberlist. |
|
42
|
|
|
return &Serf{ |
|
43
|
|
|
Enabled: true, |
|
44
|
|
|
serf: s, |
|
45
|
|
|
EventCh: eventChannel, |
|
46
|
|
|
}, nil |
|
47
|
|
|
} |
|
48
|
|
|
|
|
49
|
|
|
func (s *Serf) SyncNodes(consistent *hash.ConsistentHash, nodeName, port string) { |
|
50
|
|
|
for { |
|
51
|
|
|
select { |
|
52
|
|
|
case e := <-s.EventCh: |
|
53
|
|
|
switch e.EventType() { |
|
54
|
|
|
case serf.EventMemberJoin: |
|
55
|
|
|
me := e.(serf.MemberEvent) |
|
56
|
|
|
for _, m := range me.Members { |
|
57
|
|
|
if m.Name != nodeName { |
|
58
|
|
|
if _, exists := consistent.Nodes[fmt.Sprintf("%s:%d", m.Addr.String(), m.Port)]; !exists { |
|
59
|
|
|
fmt.Printf("Adding node %s:%s to the consistent hash\n", m.Addr.String(), port) |
|
60
|
|
|
consistent.AddWithWeight(fmt.Sprintf("%s:%s", m.Addr.String(), port), 100) |
|
61
|
|
|
} |
|
62
|
|
|
} |
|
63
|
|
|
} |
|
64
|
|
|
case serf.EventMemberLeave, serf.EventMemberFailed, serf.EventMemberReap: |
|
65
|
|
|
me := e.(serf.MemberEvent) |
|
66
|
|
|
for _, m := range me.Members { |
|
67
|
|
|
if m.Name != nodeName { |
|
68
|
|
|
fmt.Printf("Removing node %s:%d to the consistent hash\n", m.Addr.String(), m.Port) |
|
69
|
|
|
if _, exists := consistent.Nodes[fmt.Sprintf("%s:%d", m.Addr.String(), m.Port)]; exists { |
|
70
|
|
|
consistent.Remove(fmt.Sprintf("%s:%s", m.Addr.String(), port)) |
|
71
|
|
|
} |
|
72
|
|
|
} |
|
73
|
|
|
} |
|
74
|
|
|
} |
|
75
|
|
|
} |
|
76
|
|
|
} |
|
77
|
|
|
} |
|
78
|
|
|
|
|
79
|
|
|
func (s *Serf) Shutdown() error { |
|
80
|
|
|
return errors.Join(s.serf.Leave(), s.serf.Shutdown()) |
|
81
|
|
|
} |
|
82
|
|
|
|