1 | package balancer |
||
2 | |||
3 | import ( |
||
4 | "encoding/json" |
||
5 | "fmt" |
||
6 | "sync" |
||
7 | |||
8 | "golang.org/x/exp/slog" |
||
9 | "google.golang.org/grpc/balancer" |
||
10 | "google.golang.org/grpc/balancer/base" |
||
11 | "google.golang.org/grpc/connectivity" |
||
12 | "google.golang.org/grpc/resolver" |
||
13 | "google.golang.org/grpc/serviceconfig" |
||
14 | |||
15 | "github.com/Permify/permify/pkg/consistent" |
||
16 | ) |
||
17 | |||
18 | // Package-level constants for the balancer name and consistent hash key. |
||
19 | const ( |
||
20 | Name = "consistenthashing" // Name of the balancer. |
||
21 | Key = "consistenthashkey" // Key for the consistent hash. |
||
22 | ) |
||
23 | |||
24 | // Config represents the configuration for the consistent hashing balancer. |
||
25 | type Config struct { |
||
26 | serviceconfig.LoadBalancingConfig `json:"-"` // Embedding the base load balancing config. |
||
27 | PartitionCount int `json:"partitionCount,omitempty"` // Number of partitions in the consistent hash ring. |
||
28 | ReplicationFactor int `json:"replicationFactor,omitempty"` // Number of replicas for each member. |
||
29 | Load float64 `json:"load,omitempty"` // Load factor for balancing traffic. |
||
30 | PickerWidth int `json:"pickerWidth,omitempty"` // Number of closest members to consider in the picker. |
||
31 | } |
||
32 | |||
33 | // ServiceConfigJSON generates the JSON representation of the load balancer configuration. |
||
34 | func (c *Config) ServiceConfigJSON() (string, error) { |
||
35 | // Define the JSON wrapper structure for the load balancing config. |
||
36 | type Wrapper struct { |
||
37 | LoadBalancingConfig []map[string]*Config `json:"loadBalancingConfig"` |
||
38 | } |
||
39 | |||
40 | // Apply default values for zero fields. |
||
41 | if c.PartitionCount <= 0 { |
||
42 | c.PartitionCount = consistent.DefaultPartitionCount |
||
43 | } |
||
44 | if c.ReplicationFactor <= 0 { |
||
45 | c.ReplicationFactor = consistent.DefaultReplicationFactor |
||
46 | } |
||
47 | if c.Load <= 1.0 { |
||
48 | c.Load = consistent.DefaultLoad |
||
49 | } |
||
50 | if c.PickerWidth < 1 { |
||
51 | c.PickerWidth = consistent.DefaultPickerWidth |
||
52 | } |
||
53 | |||
54 | // Create the wrapper with the current configuration. |
||
55 | wrapper := Wrapper{ |
||
56 | LoadBalancingConfig: []map[string]*Config{ |
||
57 | {Name: c}, |
||
58 | }, |
||
59 | } |
||
60 | |||
61 | // Marshal the wrapped configuration to JSON. |
||
62 | jsonData, err := json.Marshal(wrapper) |
||
63 | if err != nil { |
||
64 | return "", fmt.Errorf("failed to marshal service config: %w", err) |
||
0 ignored issues
–
show
introduced
by
![]() |
|||
65 | } |
||
66 | |||
67 | return string(jsonData), nil |
||
68 | } |
||
69 | |||
70 | // NewBuilder initializes a new builder with the given hashing function. |
||
71 | func NewBuilder(fn consistent.Hasher) Builder { |
||
72 | return &builder{hasher: fn} |
||
73 | } |
||
74 | |||
75 | // ConsistentMember represents a member in the consistent hashing ring. |
||
76 | type ConsistentMember struct { |
||
77 | balancer.SubConn // Embedded SubConn for the gRPC connection. |
||
78 | name string // Unique identifier for the member. |
||
79 | } |
||
80 | |||
81 | // String returns the name of the ConsistentMember. |
||
82 | func (s ConsistentMember) String() string { return s.name } |
||
83 | |||
84 | // builder is responsible for creating and configuring the consistent hashing balancer. |
||
85 | type builder struct { |
||
86 | sync.Mutex // Mutex for thread-safe updates to the builder. |
||
87 | hasher consistent.Hasher // Hashing function for the consistent hash ring. |
||
88 | config Config // Current balancer configuration. |
||
89 | } |
||
90 | |||
91 | // Builder defines the interface for the consistent hashing balancer builder. |
||
92 | type Builder interface { |
||
93 | balancer.Builder // Interface for building balancers. |
||
94 | balancer.ConfigParser // Interface for parsing balancer configurations. |
||
95 | } |
||
96 | |||
97 | // Name returns the name of the balancer. |
||
98 | func (b *builder) Name() string { return Name } |
||
99 | |||
100 | // Build creates a new instance of the consistent hashing balancer. |
||
101 | func (b *builder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer { |
||
102 | // Initialize a new balancer with default values. |
||
103 | bal := &Balancer{ |
||
104 | clientConn: cc, |
||
105 | addressSubConns: resolver.NewAddressMap(), |
||
106 | subConnStates: make(map[balancer.SubConn]connectivity.State), |
||
107 | connectivityEvaluator: &balancer.ConnectivityStateEvaluator{}, |
||
108 | state: connectivity.Connecting, // Initial state. |
||
109 | hasher: b.hasher, |
||
110 | picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable), // Default picker with no SubConns available. |
||
111 | } |
||
112 | |||
113 | return bal |
||
114 | } |
||
115 | |||
116 | // ParseConfig parses the balancer configuration from the provided JSON. |
||
117 | func (b *builder) ParseConfig(rm json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { |
||
118 | var cfg Config |
||
119 | // Unmarshal the JSON configuration into the Config struct. |
||
120 | if err := json.Unmarshal(rm, &cfg); err != nil { |
||
121 | return nil, fmt.Errorf("consistenthash: unable to unmarshal LB policy config: %s, error: %w", string(rm), err) |
||
0 ignored issues
–
show
|
|||
122 | } |
||
123 | |||
124 | // Log the parsed configuration using structured logging. |
||
125 | slog.Info("Parsed balancer configuration", |
||
126 | slog.String("raw_json", string(rm)), // Log the raw JSON string. |
||
127 | slog.Any("config", cfg), // Log the unmarshaled Config struct. |
||
128 | ) |
||
129 | |||
130 | // Set default values for configuration if not provided. |
||
131 | if cfg.PartitionCount <= 0 { |
||
132 | cfg.PartitionCount = consistent.DefaultPartitionCount |
||
133 | } |
||
134 | if cfg.ReplicationFactor <= 0 { |
||
135 | cfg.ReplicationFactor = consistent.DefaultReplicationFactor |
||
136 | } |
||
137 | if cfg.Load <= 1.0 { |
||
138 | cfg.Load = consistent.DefaultLoad |
||
139 | } |
||
140 | if cfg.PickerWidth < 1 { |
||
141 | cfg.PickerWidth = consistent.DefaultPickerWidth |
||
142 | } |
||
143 | |||
144 | // Update the builder's configuration with thread safety. |
||
145 | b.Lock() |
||
146 | b.config = cfg |
||
147 | b.Unlock() |
||
148 | |||
149 | return &cfg, nil |
||
150 | } |
||
151 |