1
|
|
|
package balancer |
2
|
|
|
|
3
|
|
|
import ( |
4
|
|
|
"encoding/json" |
5
|
|
|
"fmt" |
6
|
|
|
"sync" |
7
|
|
|
|
8
|
|
|
"golang.org/x/exp/slog" |
9
|
|
|
"google.golang.org/grpc/balancer" |
10
|
|
|
"google.golang.org/grpc/balancer/base" |
11
|
|
|
"google.golang.org/grpc/connectivity" |
12
|
|
|
"google.golang.org/grpc/resolver" |
13
|
|
|
"google.golang.org/grpc/serviceconfig" |
14
|
|
|
|
15
|
|
|
"github.com/Permify/permify/pkg/consistent" |
16
|
|
|
) |
17
|
|
|
|
18
|
|
|
// Package-level constants for the balancer name and the consistent hash key.
const (
	// Name is the name of the balancer.
	Name = "consistenthashing"

	// Key is the key for the consistent hash.
	Key = "consistenthashkey"
)
23
|
|
|
|
24
|
|
|
// Config represents the configuration for the consistent hashing balancer. |
25
|
|
|
type Config struct { |
26
|
|
|
serviceconfig.LoadBalancingConfig `json:"-"` // Embedding the base load balancing config. |
27
|
|
|
PartitionCount int `json:"partitionCount,omitempty"` // Number of partitions in the consistent hash ring. |
28
|
|
|
ReplicationFactor int `json:"replicationFactor,omitempty"` // Number of replicas for each member. |
29
|
|
|
Load float64 `json:"load,omitempty"` // Load factor for balancing traffic. |
30
|
|
|
PickerWidth int `json:"pickerWidth,omitempty"` // Number of closest members to consider in the picker. |
31
|
|
|
} |
32
|
|
|
|
33
|
|
|
// ServiceConfigJSON generates the JSON representation of the load balancer configuration. |
34
|
|
|
func (c *Config) ServiceConfigJSON() (string, error) { |
35
|
|
|
// Define the JSON wrapper structure for the load balancing config. |
36
|
|
|
type Wrapper struct { |
37
|
|
|
LoadBalancingConfig []map[string]*Config `json:"loadBalancingConfig"` |
38
|
|
|
} |
39
|
|
|
|
40
|
|
|
// Apply default values for zero fields. |
41
|
|
|
if c.PartitionCount <= 0 { |
42
|
|
|
c.PartitionCount = consistent.DefaultPartitionCount |
43
|
|
|
} |
44
|
|
|
if c.ReplicationFactor <= 0 { |
45
|
|
|
c.ReplicationFactor = consistent.DefaultReplicationFactor |
46
|
|
|
} |
47
|
|
|
if c.Load <= 1.0 { |
48
|
|
|
c.Load = consistent.DefaultLoad |
49
|
|
|
} |
50
|
|
|
if c.PickerWidth < 1 { |
51
|
|
|
c.PickerWidth = consistent.DefaultPickerWidth |
52
|
|
|
} |
53
|
|
|
|
54
|
|
|
// Create the wrapper with the current configuration. |
55
|
|
|
wrapper := Wrapper{ |
56
|
|
|
LoadBalancingConfig: []map[string]*Config{ |
57
|
|
|
{Name: c}, |
58
|
|
|
}, |
59
|
|
|
} |
60
|
|
|
|
61
|
|
|
// Marshal the wrapped configuration to JSON. |
62
|
|
|
jsonData, err := json.Marshal(wrapper) |
63
|
|
|
if err != nil { |
64
|
|
|
return "", fmt.Errorf("failed to marshal service config: %w", err) |
|
|
|
|
65
|
|
|
} |
66
|
|
|
|
67
|
|
|
return string(jsonData), nil |
68
|
|
|
} |
69
|
|
|
|
70
|
|
|
// NewBuilder initializes a new builder with the given hashing function. |
71
|
|
|
func NewBuilder(fn consistent.Hasher) Builder { |
72
|
|
|
return &builder{hasher: fn} |
73
|
|
|
} |
74
|
|
|
|
75
|
|
|
// ConsistentMember represents a member in the consistent hashing ring. |
76
|
|
|
type ConsistentMember struct { |
77
|
|
|
balancer.SubConn // Embedded SubConn for the gRPC connection. |
78
|
|
|
name string // Unique identifier for the member. |
79
|
|
|
} |
80
|
|
|
|
81
|
|
|
// String returns the name of the ConsistentMember. |
82
|
|
|
func (s ConsistentMember) String() string { return s.name } |
83
|
|
|
|
84
|
|
|
// builder is responsible for creating and configuring the consistent hashing balancer. |
85
|
|
|
type builder struct { |
86
|
|
|
sync.Mutex // Mutex for thread-safe updates to the builder. |
87
|
|
|
hasher consistent.Hasher // Hashing function for the consistent hash ring. |
88
|
|
|
config Config // Current balancer configuration. |
89
|
|
|
} |
90
|
|
|
|
91
|
|
|
// Builder defines the interface for the consistent hashing balancer builder. |
92
|
|
|
type Builder interface { |
93
|
|
|
balancer.Builder // Interface for building balancers. |
94
|
|
|
balancer.ConfigParser // Interface for parsing balancer configurations. |
95
|
|
|
} |
96
|
|
|
|
97
|
|
|
// Name returns the name of the balancer. |
98
|
|
|
func (b *builder) Name() string { return Name } |
99
|
|
|
|
100
|
|
|
// Build creates a new instance of the consistent hashing balancer. |
101
|
|
|
func (b *builder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer { |
102
|
|
|
// Initialize a new balancer with default values. |
103
|
|
|
bal := &Balancer{ |
104
|
|
|
clientConn: cc, |
105
|
|
|
addressSubConns: resolver.NewAddressMap(), |
106
|
|
|
subConnStates: make(map[balancer.SubConn]connectivity.State), |
107
|
|
|
connectivityEvaluator: &balancer.ConnectivityStateEvaluator{}, |
108
|
|
|
state: connectivity.Connecting, // Initial state. |
109
|
|
|
hasher: b.hasher, |
110
|
|
|
picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable), // Default picker with no SubConns available. |
111
|
|
|
} |
112
|
|
|
|
113
|
|
|
return bal |
114
|
|
|
} |
115
|
|
|
|
116
|
|
|
// ParseConfig parses the balancer configuration from the provided JSON. |
117
|
|
|
func (b *builder) ParseConfig(rm json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { |
118
|
|
|
var cfg Config |
119
|
|
|
// Unmarshal the JSON configuration into the Config struct. |
120
|
|
|
if err := json.Unmarshal(rm, &cfg); err != nil { |
121
|
|
|
return nil, fmt.Errorf("consistenthash: unable to unmarshal LB policy config: %s, error: %w", string(rm), err) |
|
|
|
|
122
|
|
|
} |
123
|
|
|
|
124
|
|
|
// Log the parsed configuration using structured logging. |
125
|
|
|
slog.Info("Parsed balancer configuration", |
126
|
|
|
slog.String("raw_json", string(rm)), // Log the raw JSON string. |
127
|
|
|
slog.Any("config", cfg), // Log the unmarshaled Config struct. |
128
|
|
|
) |
129
|
|
|
|
130
|
|
|
// Set default values for configuration if not provided. |
131
|
|
|
if cfg.PartitionCount <= 0 { |
132
|
|
|
cfg.PartitionCount = consistent.DefaultPartitionCount |
133
|
|
|
} |
134
|
|
|
if cfg.ReplicationFactor <= 0 { |
135
|
|
|
cfg.ReplicationFactor = consistent.DefaultReplicationFactor |
136
|
|
|
} |
137
|
|
|
if cfg.Load <= 1.0 { |
138
|
|
|
cfg.Load = consistent.DefaultLoad |
139
|
|
|
} |
140
|
|
|
if cfg.PickerWidth < 1 { |
141
|
|
|
cfg.PickerWidth = consistent.DefaultPickerWidth |
142
|
|
|
} |
143
|
|
|
|
144
|
|
|
// Update the builder's configuration with thread safety. |
145
|
|
|
b.Lock() |
146
|
|
|
b.config = cfg |
147
|
|
|
b.Unlock() |
148
|
|
|
|
149
|
|
|
return &cfg, nil |
150
|
|
|
} |
151
|
|
|
|