balancer.*builder.Build   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 13
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 10
nop 2
dl 0
loc 13
rs 9.9
c 0
b 0
f 0
1
package balancer
2
3
import (
4
	"encoding/json"
5
	"fmt"
6
	"sync"
7
8
	"golang.org/x/exp/slog"
9
	"google.golang.org/grpc/balancer"
10
	"google.golang.org/grpc/balancer/base"
11
	"google.golang.org/grpc/connectivity"
12
	"google.golang.org/grpc/resolver"
13
	"google.golang.org/grpc/serviceconfig"
14
15
	"github.com/Permify/permify/pkg/consistent"
16
)
17
18
// contextKey is a custom type for context keys to avoid collisions
19
type contextKey string
20
21
// Package-level constants for the balancer name and consistent hash key.
22
const (
23
	Name = "consistenthashing"             // Name of the balancer.
24
	Key  = contextKey("consistenthashkey") // Key for the consistent hash.
25
)
26
27
// Config represents the configuration for the consistent hashing balancer.
28
type Config struct {
29
	serviceconfig.LoadBalancingConfig `json:"-"` // Embedding the base load balancing config.
30
	PartitionCount                    int        `json:"partitionCount,omitempty"`    // Number of partitions in the consistent hash ring.
31
	ReplicationFactor                 int        `json:"replicationFactor,omitempty"` // Number of replicas for each member.
32
	Load                              float64    `json:"load,omitempty"`              // Load factor for balancing traffic.
33
	PickerWidth                       int        `json:"pickerWidth,omitempty"`       // Number of closest members to consider in the picker.
34
}
35
36
// ServiceConfigJSON generates the JSON representation of the load balancer configuration.
37
func (c *Config) ServiceConfigJSON() (string, error) {
38
	// Define the JSON wrapper structure for the load balancing config.
39
	type Wrapper struct {
40
		LoadBalancingConfig []map[string]*Config `json:"loadBalancingConfig"`
41
	}
42
43
	// Apply default values for zero fields.
44
	if c.PartitionCount <= 0 {
45
		c.PartitionCount = consistent.DefaultPartitionCount
46
	}
47
	if c.ReplicationFactor <= 0 {
48
		c.ReplicationFactor = consistent.DefaultReplicationFactor
49
	}
50
	if c.Load <= 1.0 {
51
		c.Load = consistent.DefaultLoad
52
	}
53
	if c.PickerWidth < 1 {
54
		c.PickerWidth = consistent.DefaultPickerWidth
55
	}
56
57
	// Create the wrapper with the current configuration.
58
	wrapper := Wrapper{
59
		LoadBalancingConfig: []map[string]*Config{
60
			{Name: c},
61
		},
62
	}
63
64
	// Marshal the wrapped configuration to JSON.
65
	jsonData, err := json.Marshal(wrapper)
66
	if err != nil {
67
		return "", fmt.Errorf("failed to marshal service config: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
68
	}
69
70
	return string(jsonData), nil
71
}
72
73
// NewBuilder initializes a new builder with the given hashing function.
74
func NewBuilder(fn consistent.Hasher) Builder {
75
	return &builder{hasher: fn}
76
}
77
78
// ConsistentMember represents a member in the consistent hashing ring.
79
type ConsistentMember struct {
80
	balancer.SubConn        // Embedded SubConn for the gRPC connection.
81
	name             string // Unique identifier for the member.
82
}
83
84
// String returns the name of the ConsistentMember.
85
func (s ConsistentMember) String() string { return s.name }
86
87
// builder is responsible for creating and configuring the consistent hashing balancer.
88
type builder struct {
89
	sync.Mutex                   // Mutex for thread-safe updates to the builder.
90
	hasher     consistent.Hasher // Hashing function for the consistent hash ring.
91
	config     Config            // Current balancer configuration.
92
}
93
94
// Builder defines the interface for the consistent hashing balancer builder.
95
type Builder interface {
96
	balancer.Builder      // Interface for building balancers.
97
	balancer.ConfigParser // Interface for parsing balancer configurations.
98
}
99
100
// Name returns the name of the balancer.
101
func (b *builder) Name() string { return Name }
102
103
// Build creates a new instance of the consistent hashing balancer.
104
func (b *builder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
105
	// Initialize a new balancer with default values.
106
	bal := &Balancer{
107
		clientConn:            cc,
108
		addressSubConns:       resolver.NewAddressMap(),
109
		subConnStates:         make(map[balancer.SubConn]connectivity.State),
110
		connectivityEvaluator: &balancer.ConnectivityStateEvaluator{},
111
		state:                 connectivity.Connecting, // Initial state.
112
		hasher:                b.hasher,
113
		picker:                base.NewErrPicker(balancer.ErrNoSubConnAvailable), // Default picker with no SubConns available.
114
	}
115
116
	return bal
117
}
118
119
// ParseConfig parses the balancer configuration from the provided JSON.
120
func (b *builder) ParseConfig(rm json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
121
	var cfg Config
122
	// Unmarshal the JSON configuration into the Config struct.
123
	if err := json.Unmarshal(rm, &cfg); err != nil {
124
		return nil, fmt.Errorf("consistenthash: unable to unmarshal LB policy config: %s, error: %w", string(rm), err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
125
	}
126
127
	// Log the parsed configuration using structured logging.
128
	slog.Info("Parsed balancer configuration",
129
		slog.String("raw_json", string(rm)), // Log the raw JSON string.
130
		slog.Any("config", cfg),             // Log the unmarshaled Config struct.
131
	)
132
133
	// Set default values for configuration if not provided.
134
	if cfg.PartitionCount <= 0 {
135
		cfg.PartitionCount = consistent.DefaultPartitionCount
136
	}
137
	if cfg.ReplicationFactor <= 0 {
138
		cfg.ReplicationFactor = consistent.DefaultReplicationFactor
139
	}
140
	if cfg.Load <= 1.0 {
141
		cfg.Load = consistent.DefaultLoad
142
	}
143
	if cfg.PickerWidth < 1 {
144
		cfg.PickerWidth = consistent.DefaultPickerWidth
145
	}
146
147
	// Update the builder's configuration with thread safety.
148
	b.Lock()
149
	b.config = cfg
150
	b.Unlock()
151
152
	return &cfg, nil
153
}
154