Passed
Pull Request — master (#2609)
by Tolga
02:58
created

postgres.newDB   D

Complexity

Conditions 13

Size

Total Lines 110
Code Lines 62

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 13
eloc 62
nop 3
dl 0
loc 110
rs 4.1836
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like postgres.newDB often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
package postgres
2
3
import (
4
	"context"
5
	"fmt"
6
	"log/slog" // Structured logging
7
	"strings"
8
	"time"
9
10
	"github.com/cenkalti/backoff/v4"
11
12
	"github.com/exaring/otelpgx"
13
14
	"github.com/jackc/pgx/v5"
15
16
	"github.com/jackc/pgx/v5/pgxpool"
17
18
	"github.com/Masterminds/squirrel"
19
)
20
21
// Postgres - Structure for Postresql instance
22
type Postgres struct {
23
	ReadPool  *pgxpool.Pool
24
	WritePool *pgxpool.Pool
25
26
	Builder squirrel.StatementBuilderType
27
	// options
28
	maxDataPerWrite       int
29
	maxRetries            int
30
	watchBufferSize       int
31
	maxConnectionLifeTime time.Duration
32
	maxConnectionIdleTime time.Duration
33
	maxConns              int // Maximum number of connections in the pool (maps to pgxpool MaxConns)
34
	maxIdleConnections    int // Deprecated: Use MinConns instead. Kept for backward compatibility (maps to MinConns if MinConns is not set).
35
	minConns              int // Minimum number of connections in the pool (maps to pgxpool MinConns)
36
	minIdleConns          int // Minimum number of idle connections in the pool (maps to pgxpool MinIdleConns)
37
	healthCheckPeriod     time.Duration
38
	maxConnLifetimeJitter time.Duration
39
	connectTimeout        time.Duration
40
}
41
42
// New -
43
func New(uri string, opts ...Option) (*Postgres, error) {
44
	return newDB(uri, uri, opts...)
45
}
46
47
// NewWithSeparateURIs -
48
func NewWithSeparateURIs(writerUri, readerUri string, opts ...Option) (*Postgres, error) {
49
	return newDB(writerUri, readerUri, opts...)
50
}
51
52
// new - Creates new postgresql db instance
53
func newDB(writerUri, readerUri string, opts ...Option) (*Postgres, error) {
54
	pg := &Postgres{
55
		maxConns:              _defaultMaxConns,
56
		maxIdleConnections:    _defaultMaxIdleConnections,
57
		minConns:              _defaultMinConns,
58
		minIdleConns:          _defaultMinIdleConns,
59
		maxDataPerWrite:       _defaultMaxDataPerWrite,
60
		maxRetries:            _defaultMaxRetries,
61
		watchBufferSize:       _defaultWatchBufferSize,
62
		healthCheckPeriod:     _defaultHealthCheckPeriod,
63
		maxConnLifetimeJitter: _defaultMaxConnLifetimeJitter,
64
		connectTimeout:        _defaultConnectTimeout,
65
	}
66
67
	// Custom options
68
	for _, opt := range opts {
69
		opt(pg)
70
	}
71
72
	pg.Builder = squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar)
73
74
	writeConfig, err := pgxpool.ParseConfig(writerUri)
75
	if err != nil {
76
		return nil, err
77
	}
78
79
	readConfig, err := pgxpool.ParseConfig(readerUri)
80
	if err != nil {
81
		return nil, err
82
	}
83
84
	// Set the default execution mode for queries using the write and read configurations.
85
	setDefaultQueryExecMode(writeConfig.ConnConfig)
86
	setDefaultQueryExecMode(readConfig.ConnConfig)
87
88
	// Set the plan cache mode for both write and read configurations to optimize query planning.
89
	setPlanCacheMode(writeConfig.ConnConfig)
90
	setPlanCacheMode(readConfig.ConnConfig)
91
92
	// Set the minimum number of connections in the pool for both write and read configurations.
93
	// For backward compatibility: if MinConns is not set (0) but MaxIdleConnections is set, use MaxIdleConnections (old behavior).
94
	minConns := pg.minConns
95
	if minConns == 0 && pg.maxIdleConnections > 0 {
96
		minConns = pg.maxIdleConnections
97
	}
98
	if minConns > 0 {
99
		writeConfig.MinConns = int32(minConns)
100
		readConfig.MinConns = int32(minConns)
101
	}
102
103
	// Set the minimum number of idle connections in the pool.
104
	// Note: MinIdleConns was not set in the old code, so we only set it if explicitly configured.
105
	if pg.minIdleConns > 0 {
106
		writeConfig.MinIdleConns = int32(pg.minIdleConns)
107
		readConfig.MinIdleConns = int32(pg.minIdleConns)
108
	}
109
110
	// Set the maximum number of connections in the pool for both write and read configurations.
111
	// pgxpool default is 0 (unlimited), so only set if explicitly configured.
112
	// Note: MaxOpenConnections is already mapped to MaxConns via options.go, so no backward compatibility needed here.
113
	if pg.maxConns > 0 {
114
		writeConfig.MaxConns = int32(pg.maxConns)
115
		readConfig.MaxConns = int32(pg.maxConns)
116
	}
117
118
	// Set the maximum amount of time a connection may be idle before being closed for both configurations.
119
	writeConfig.MaxConnIdleTime = pg.maxConnectionIdleTime
120
	readConfig.MaxConnIdleTime = pg.maxConnectionIdleTime
121
122
	// Set the maximum lifetime of a connection in the pool for both configurations.
123
	writeConfig.MaxConnLifetime = pg.maxConnectionLifeTime
124
	readConfig.MaxConnLifetime = pg.maxConnectionLifeTime
125
126
	// Set a jitter to the maximum connection lifetime to prevent all connections from expiring at the same time.
127
	if pg.maxConnLifetimeJitter > 0 {
128
		writeConfig.MaxConnLifetimeJitter = pg.maxConnLifetimeJitter
129
		readConfig.MaxConnLifetimeJitter = pg.maxConnLifetimeJitter
130
	} else {
131
		// Default to 20% of MaxConnLifetime if not explicitly set
132
		writeConfig.MaxConnLifetimeJitter = time.Duration(0.2 * float64(pg.maxConnectionLifeTime))
133
		readConfig.MaxConnLifetimeJitter = time.Duration(0.2 * float64(pg.maxConnectionLifeTime))
134
	}
135
136
	// Set the health check period for both configurations.
137
	if pg.healthCheckPeriod > 0 {
138
		writeConfig.HealthCheckPeriod = pg.healthCheckPeriod
139
		readConfig.HealthCheckPeriod = pg.healthCheckPeriod
140
	}
141
142
	// Set the connect timeout for both configurations.
143
	if pg.connectTimeout > 0 {
144
		writeConfig.ConnConfig.ConnectTimeout = pg.connectTimeout
145
		readConfig.ConnConfig.ConnectTimeout = pg.connectTimeout
146
	}
147
148
	writeConfig.ConnConfig.Tracer = otelpgx.NewTracer()
149
	readConfig.ConnConfig.Tracer = otelpgx.NewTracer()
150
151
	// Create connection pools for both writing and reading operations using the configured settings.
152
	pg.WritePool, pg.ReadPool, err = createPools(
153
		context.Background(), // Context used to control the lifecycle of the pools.
154
		writeConfig,          // Configuration settings for the write pool.
155
		readConfig,           // Configuration settings for the read pool.
156
	)
157
	// Handle errors during the creation of the connection pools.
158
	if err != nil {
159
		return nil, err
160
	}
161
162
	return pg, nil
163
}
164
165
// GetMaxDataPerWrite returns the configured maxDataPerWrite option.
func (p *Postgres) GetMaxDataPerWrite() int {
	return p.maxDataPerWrite
}
168
169
// GetMaxRetries returns the configured maxRetries option.
func (p *Postgres) GetMaxRetries() int {
	return p.maxRetries
}
172
173
// GetWatchBufferSize returns the configured watchBufferSize option.
func (p *Postgres) GetWatchBufferSize() int {
	return p.watchBufferSize
}
176
177
// GetEngineType - Get the engine type which is postgresql in string
178
func (p *Postgres) GetEngineType() string {
179
	return "postgres"
180
}
181
182
// Close - Close postgresql instance
183
func (p *Postgres) Close() error {
184
	p.ReadPool.Close()
185
	p.WritePool.Close()
186
	return nil
187
}
188
189
// IsReady - Check if database is ready
190
func (p *Postgres) IsReady(ctx context.Context) (bool, error) {
191
	ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
192
	defer cancel()
193
	if err := p.ReadPool.Ping(ctx); err != nil {
194
		return false, err
195
	}
196
	return true, nil
197
}
198
199
var queryExecModes = map[string]pgx.QueryExecMode{
200
	"cache_statement": pgx.QueryExecModeCacheStatement,
201
	"cache_describe":  pgx.QueryExecModeCacheDescribe,
202
	"describe_exec":   pgx.QueryExecModeDescribeExec,
203
	"mode_exec":       pgx.QueryExecModeExec,
204
	"simple_protocol": pgx.QueryExecModeSimpleProtocol,
205
}
206
207
func setDefaultQueryExecMode(config *pgx.ConnConfig) {
208
	// Default mode if no specific mode is found in the connection string
209
	defaultMode := "cache_statement"
210
211
	// Iterate through the map keys to check if any are mentioned in the connection string
212
	for key := range queryExecModes {
213
		if strings.Contains(config.ConnString(), "default_query_exec_mode="+key) {
214
			config.DefaultQueryExecMode = queryExecModes[key]
215
			slog.Info("setDefaultQueryExecMode", slog.String("mode", key))
216
			return
217
		}
218
	}
219
220
	// Set to default mode if no matching mode is found
221
	config.DefaultQueryExecMode = queryExecModes[defaultMode]
222
	slog.Warn("setDefaultQueryExecMode", slog.String("mode", defaultMode))
223
}
224
225
var planCacheModes = map[string]string{
226
	"auto":              "auto",
227
	"force_custom_plan": "force_custom_plan",
228
	"disable":           "disable",
229
}
230
231
func setPlanCacheMode(config *pgx.ConnConfig) {
232
	// Default plan cache mode
233
	const defaultMode = "auto"
234
235
	// Extract connection string
236
	connStr := config.ConnString()
237
	planCacheMode := defaultMode
238
239
	// Check for specific plan cache modes in the connection string
240
	for key, value := range planCacheModes {
241
		if strings.Contains(connStr, "plan_cache_mode="+key) {
242
			if key == "disable" {
243
				delete(config.RuntimeParams, "plan_cache_mode")
244
				slog.Info("setPlanCacheMode", slog.String("mode", "disabled"))
245
				return
246
			}
247
			planCacheMode = value
248
			slog.Info("setPlanCacheMode", slog.String("mode", key))
249
			break
250
		}
251
	}
252
253
	// Set the plan cache mode
254
	config.RuntimeParams["plan_cache_mode"] = planCacheMode
255
	if planCacheMode == defaultMode {
256
		slog.Warn("setPlanCacheMode", slog.String("mode", defaultMode))
257
	}
258
}
259
260
// createPools initializes read and write connection pools with appropriate configurations and error handling.
261
func createPools(ctx context.Context, wConfig, rConfig *pgxpool.Config) (*pgxpool.Pool, *pgxpool.Pool, error) {
262
	// Context with timeout for creating the pools
263
	initCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
264
	defer cancel()
265
266
	// Create write pool
267
	writePool, err := pgxpool.NewWithConfig(initCtx, wConfig)
268
	if err != nil {
269
		return nil, nil, fmt.Errorf("failed to create write pool: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
270
	}
271
272
	// Create read pool using the same configuration
273
	readPool, err := pgxpool.NewWithConfig(initCtx, rConfig)
274
	if err != nil {
275
		writePool.Close() // Ensure write pool is closed on failure
276
		return nil, nil, fmt.Errorf("failed to create read pool: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
277
	}
278
279
	// Set up retry policy for pinging pools
280
	retryPolicy := backoff.NewExponentialBackOff()
281
	retryPolicy.MaxElapsedTime = 1 * time.Minute
282
283
	// Attempt to ping both pools to confirm connectivity
284
	err = backoff.Retry(func() error {
285
		pingCtx, pingCancel := context.WithTimeout(context.Background(), 2*time.Second)
286
		defer pingCancel()
287
288
		if err := writePool.Ping(pingCtx); err != nil {
289
			return fmt.Errorf("write pool ping failed: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
290
		}
291
		if err := readPool.Ping(pingCtx); err != nil {
292
			return fmt.Errorf("read pool ping failed: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
293
		}
294
		return nil
295
	}, retryPolicy)
296
	// Handle errors from pinging
297
	if err != nil {
298
		writePool.Close()
299
		readPool.Close()
300
		return nil, nil, fmt.Errorf("pinging pools failed: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
301
	}
302
303
	return writePool, readPool, nil
304
}
305