Passed
Push — master ( b9a458...e79a13 )
by Tolga
01:29 queued 13s
created

balancer.*builder.Build   A

Complexity

Conditions 1

Size

Total Lines 13
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 10
nop 2
dl 0
loc 13
rs 9.9
c 0
b 0
f 0
1
package balancer
2
3
import (
4
	"encoding/json"
5
	"fmt"
6
	"sync"
7
8
	"golang.org/x/exp/slog"
9
	"google.golang.org/grpc/balancer"
10
	"google.golang.org/grpc/balancer/base"
11
	"google.golang.org/grpc/connectivity"
12
	"google.golang.org/grpc/resolver"
13
	"google.golang.org/grpc/serviceconfig"
14
15
	"github.com/Permify/permify/pkg/consistent"
16
)
17
18
// Identifiers shared by the consistent hashing balancer.
const (
	// Name is the name of the balancer.
	Name = "consistenthashing"
	// Key is the key for the consistent hash.
	Key = "consistenthashkey"
)
23
24
// Config represents the configuration for the consistent hashing balancer.
25
type Config struct {
26
	serviceconfig.LoadBalancingConfig `json:"-"` // Embedding the base load balancing config.
27
	PartitionCount                    int        `json:"partitionCount,omitempty"`    // Number of partitions in the consistent hash ring.
28
	ReplicationFactor                 int        `json:"replicationFactor,omitempty"` // Number of replicas for each member.
29
	Load                              float64    `json:"load,omitempty"`              // Load factor for balancing traffic.
30
	PickerWidth                       int        `json:"pickerWidth,omitempty"`       // Number of closest members to consider in the picker.
31
}
32
33
// ServiceConfigJSON generates the JSON representation of the load balancer configuration.
34
func (c *Config) ServiceConfigJSON() (string, error) {
35
	// Define the JSON wrapper structure for the load balancing config.
36
	type Wrapper struct {
37
		LoadBalancingConfig []map[string]*Config `json:"loadBalancingConfig"`
38
	}
39
40
	// Apply default values for zero fields.
41
	if c.PartitionCount <= 0 {
42
		c.PartitionCount = consistent.DefaultPartitionCount
43
	}
44
	if c.ReplicationFactor <= 0 {
45
		c.ReplicationFactor = consistent.DefaultReplicationFactor
46
	}
47
	if c.Load <= 1.0 {
48
		c.Load = consistent.DefaultLoad
49
	}
50
	if c.PickerWidth < 1 {
51
		c.PickerWidth = consistent.DefaultPickerWidth
52
	}
53
54
	// Create the wrapper with the current configuration.
55
	wrapper := Wrapper{
56
		LoadBalancingConfig: []map[string]*Config{
57
			{Name: c},
58
		},
59
	}
60
61
	// Marshal the wrapped configuration to JSON.
62
	jsonData, err := json.Marshal(wrapper)
63
	if err != nil {
64
		return "", fmt.Errorf("failed to marshal service config: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
65
	}
66
67
	return string(jsonData), nil
68
}
69
70
// NewBuilder initializes a new builder with the given hashing function.
71
func NewBuilder(fn consistent.Hasher) Builder {
72
	return &builder{hasher: fn}
73
}
74
75
// ConsistentMember represents a member in the consistent hashing ring.
76
type ConsistentMember struct {
77
	balancer.SubConn        // Embedded SubConn for the gRPC connection.
78
	name             string // Unique identifier for the member.
79
}
80
81
// String returns the name of the ConsistentMember.
82
func (s ConsistentMember) String() string { return s.name }
83
84
// builder is responsible for creating and configuring the consistent hashing balancer.
85
type builder struct {
86
	sync.Mutex                   // Mutex for thread-safe updates to the builder.
87
	hasher     consistent.Hasher // Hashing function for the consistent hash ring.
88
	config     Config            // Current balancer configuration.
89
}
90
91
// Builder defines the interface for the consistent hashing balancer builder.
92
type Builder interface {
93
	balancer.Builder      // Interface for building balancers.
94
	balancer.ConfigParser // Interface for parsing balancer configurations.
95
}
96
97
// Name returns the name of the balancer.
98
func (b *builder) Name() string { return Name }
99
100
// Build creates a new instance of the consistent hashing balancer.
101
func (b *builder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
102
	// Initialize a new balancer with default values.
103
	bal := &Balancer{
104
		clientConn:            cc,
105
		addressSubConns:       resolver.NewAddressMap(),
106
		subConnStates:         make(map[balancer.SubConn]connectivity.State),
107
		connectivityEvaluator: &balancer.ConnectivityStateEvaluator{},
108
		state:                 connectivity.Connecting, // Initial state.
109
		hasher:                b.hasher,
110
		picker:                base.NewErrPicker(balancer.ErrNoSubConnAvailable), // Default picker with no SubConns available.
111
	}
112
113
	return bal
114
}
115
116
// ParseConfig parses the balancer configuration from the provided JSON.
117
func (b *builder) ParseConfig(rm json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
118
	var cfg Config
119
	// Unmarshal the JSON configuration into the Config struct.
120
	if err := json.Unmarshal(rm, &cfg); err != nil {
121
		return nil, fmt.Errorf("consistenthash: unable to unmarshal LB policy config: %s, error: %w", string(rm), err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
122
	}
123
124
	// Log the parsed configuration using structured logging.
125
	slog.Info("Parsed balancer configuration",
126
		slog.String("raw_json", string(rm)), // Log the raw JSON string.
127
		slog.Any("config", cfg),             // Log the unmarshaled Config struct.
128
	)
129
130
	// Set default values for configuration if not provided.
131
	if cfg.PartitionCount <= 0 {
132
		cfg.PartitionCount = consistent.DefaultPartitionCount
133
	}
134
	if cfg.ReplicationFactor <= 0 {
135
		cfg.ReplicationFactor = consistent.DefaultReplicationFactor
136
	}
137
	if cfg.Load <= 1.0 {
138
		cfg.Load = consistent.DefaultLoad
139
	}
140
	if cfg.PickerWidth < 1 {
141
		cfg.PickerWidth = consistent.DefaultPickerWidth
142
	}
143
144
	// Update the builder's configuration with thread safety.
145
	b.Lock()
146
	b.config = cfg
147
	b.Unlock()
148
149
	return &cfg, nil
150
}
151