Issues (58)

pkg/database/postgres/postgres.go (5 issues)

Severity
1
package postgres
2
3
import (
4
	"context"
5
	"fmt"
6
	"log/slog" // Structured logging
7
	"strings"
8
	"time"
9
10
	"github.com/cenkalti/backoff/v4"
11
12
	"github.com/exaring/otelpgx"
13
14
	"github.com/jackc/pgx/v5"
15
16
	"github.com/jackc/pgx/v5/pgxpool"
17
18
	"github.com/Masterminds/squirrel"
19
)
20
21
// Postgres - Structure for Postresql instance
22
type Postgres struct {
23
	ReadPool  *pgxpool.Pool
24
	WritePool *pgxpool.Pool
25
26
	Builder squirrel.StatementBuilderType
27
	// options
28
	// maxDataPerWrite sets the maximum amount of data per write operation to the database
29
	maxDataPerWrite int
30
	// maxRetries defines the maximum number of retries for database operations in case of failure
31
	maxRetries int
32
	// watchBufferSize specifies the buffer size for database watch operations, impacting how many changes can be queued
33
	watchBufferSize int
34
	// maxConnectionLifeTime determines the maximum lifetime of a connection in the pool
35
	maxConnectionLifeTime time.Duration
36
	// maxConnectionIdleTime determines the maximum time a connection can remain idle before being closed
37
	maxConnectionIdleTime time.Duration
38
	// maxConnections is the maximum number of connections in the pool (maps to pgxpool MaxConns)
39
	maxConnections int
40
	// maxIdleConnections is deprecated: Use MinConnections instead. Kept for backward compatibility (maps to MinConnections if MinConnections is not set).
41
	maxIdleConnections int
42
	// minConnections is the minimum number of connections in the pool (maps to pgxpool MinConns)
43
	minConnections int
44
	// minIdleConnections is the minimum number of idle connections in the pool (maps to pgxpool MinIdleConns)
45
	minIdleConnections int
46
	// healthCheckPeriod is the period between health checks on idle connections
47
	healthCheckPeriod time.Duration
48
	// maxConnectionLifetimeJitter is jitter added to MaxConnLifetime to prevent all connections from expiring at once
49
	maxConnectionLifetimeJitter time.Duration
50
	// connectTimeout is the maximum time to wait when establishing a new connection
51
	connectTimeout time.Duration
52
}
53
54
// New -
55
func New(uri string, opts ...Option) (*Postgres, error) {
56
	return newDB(uri, uri, opts...)
57
}
58
59
// NewWithSeparateURIs -
60
func NewWithSeparateURIs(writerUri, readerUri string, opts ...Option) (*Postgres, error) {
61
	return newDB(writerUri, readerUri, opts...)
62
}
63
64
// new - Creates new postgresql db instance
65
func newDB(writerUri, readerUri string, opts ...Option) (*Postgres, error) {
66
	pg := &Postgres{
67
		maxConnections:              _defaultMaxConnections,
68
		maxIdleConnections:          _defaultMaxIdleConnections,
69
		minConnections:              _defaultMinConnections,
70
		minIdleConnections:          _defaultMinIdleConnections,
71
		maxDataPerWrite:             _defaultMaxDataPerWrite,
72
		maxRetries:                  _defaultMaxRetries,
73
		watchBufferSize:             _defaultWatchBufferSize,
74
		healthCheckPeriod:           _defaultHealthCheckPeriod,
75
		maxConnectionLifetimeJitter: _defaultMaxConnectionLifetimeJitter,
76
		connectTimeout:              _defaultConnectTimeout,
77
	}
78
79
	// Custom options
80
	for _, opt := range opts {
81
		opt(pg)
82
	}
83
84
	pg.Builder = squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar)
85
86
	writeConfig, err := pgxpool.ParseConfig(writerUri)
87
	if err != nil {
88
		return nil, err
89
	}
90
91
	readConfig, err := pgxpool.ParseConfig(readerUri)
92
	if err != nil {
93
		return nil, err
94
	}
95
96
	// Set the default execution mode for queries using the write and read configurations.
97
	setDefaultQueryExecMode(writeConfig.ConnConfig)
98
	setDefaultQueryExecMode(readConfig.ConnConfig)
99
100
	// Set the plan cache mode for both write and read configurations to optimize query planning.
101
	setPlanCacheMode(writeConfig.ConnConfig)
102
	setPlanCacheMode(readConfig.ConnConfig)
103
104
	// Set the minimum number of connections in the pool for both write and read configurations.
105
	// For backward compatibility: if MinConnections is not set (0) but MaxIdleConnections is set, use MaxIdleConnections (old behavior).
106
	minConns := pg.minConnections
107
	if minConns == 0 && pg.maxIdleConnections > 0 {
108
		minConns = pg.maxIdleConnections
109
	}
110
	if minConns > 0 {
111
		writeConfig.MinConns = int32(minConns)
112
		readConfig.MinConns = int32(minConns)
113
	}
114
115
	// Set the minimum number of idle connections in the pool.
116
	// Note: MinIdleConnections was not set in the old code, so we only set it if explicitly configured.
117
	if pg.minIdleConnections > 0 {
118
		writeConfig.MinIdleConns = int32(pg.minIdleConnections)
119
		readConfig.MinIdleConns = int32(pg.minIdleConnections)
120
	}
121
122
	// Set the maximum number of connections in the pool for both write and read configurations.
123
	// pgxpool default is 0 (unlimited), so only set if explicitly configured.
124
	// Note: MaxOpenConnections is already mapped to MaxConnections via options.go, so no backward compatibility needed here.
125
	if pg.maxConnections > 0 {
126
		writeConfig.MaxConns = int32(pg.maxConnections)
127
		readConfig.MaxConns = int32(pg.maxConnections)
128
	}
129
130
	// Set the maximum amount of time a connection may be idle before being closed for both configurations.
131
	writeConfig.MaxConnIdleTime = pg.maxConnectionIdleTime
132
	readConfig.MaxConnIdleTime = pg.maxConnectionIdleTime
133
134
	// Set the maximum lifetime of a connection in the pool for both configurations.
135
	writeConfig.MaxConnLifetime = pg.maxConnectionLifeTime
136
	readConfig.MaxConnLifetime = pg.maxConnectionLifeTime
137
138
	// Set a jitter to the maximum connection lifetime to prevent all connections from expiring at the same time.
139
	if pg.maxConnectionLifetimeJitter > 0 {
140
		writeConfig.MaxConnLifetimeJitter = pg.maxConnectionLifetimeJitter
141
		readConfig.MaxConnLifetimeJitter = pg.maxConnectionLifetimeJitter
142
	} else {
143
		// Default to 20% of MaxConnLifetime if not explicitly set
144
		writeConfig.MaxConnLifetimeJitter = time.Duration(0.2 * float64(pg.maxConnectionLifeTime))
145
		readConfig.MaxConnLifetimeJitter = time.Duration(0.2 * float64(pg.maxConnectionLifeTime))
146
	}
147
148
	// Set the health check period for both configurations.
149
	if pg.healthCheckPeriod > 0 {
150
		writeConfig.HealthCheckPeriod = pg.healthCheckPeriod
151
		readConfig.HealthCheckPeriod = pg.healthCheckPeriod
152
	}
153
154
	// Set the connect timeout for both configurations.
155
	if pg.connectTimeout > 0 {
156
		writeConfig.ConnConfig.ConnectTimeout = pg.connectTimeout
157
		readConfig.ConnConfig.ConnectTimeout = pg.connectTimeout
158
	}
159
160
	writeConfig.ConnConfig.Tracer = otelpgx.NewTracer()
161
	readConfig.ConnConfig.Tracer = otelpgx.NewTracer()
162
163
	// Create connection pools for both writing and reading operations using the configured settings.
164
	pg.WritePool, pg.ReadPool, err = createPools(
165
		context.Background(), // Context used to control the lifecycle of the pools.
166
		writeConfig,          // Configuration settings for the write pool.
167
		readConfig,           // Configuration settings for the read pool.
168
	)
169
	// Handle errors during the creation of the connection pools.
170
	if err != nil {
171
		return nil, err
172
	}
173
174
	return pg, nil
175
}
176
177
func (p *Postgres) GetMaxDataPerWrite() int {
178
	return p.maxDataPerWrite
179
}
180
181
func (p *Postgres) GetMaxRetries() int {
182
	return p.maxRetries
183
}
184
185
func (p *Postgres) GetWatchBufferSize() int {
186
	return p.watchBufferSize
187
}
188
189
// GetEngineType - Get the engine type which is postgresql in string
190
func (p *Postgres) GetEngineType() string {
191
	return "postgres"
192
}
193
194
// Close - Close postgresql instance
195
func (p *Postgres) Close() error {
196
	p.ReadPool.Close()
197
	p.WritePool.Close()
198
	return nil
199
}
200
201
// IsReady - Check if database is ready
202
func (p *Postgres) IsReady(ctx context.Context) (bool, error) {
203
	ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
204
	defer cancel()
205
	if err := p.ReadPool.Ping(ctx); err != nil {
206
		return false, err
207
	}
208
	return true, nil
209
}
210
211
var queryExecModes = map[string]pgx.QueryExecMode{
212
	"cache_statement": pgx.QueryExecModeCacheStatement,
213
	"cache_describe":  pgx.QueryExecModeCacheDescribe,
214
	"describe_exec":   pgx.QueryExecModeDescribeExec,
215
	"mode_exec":       pgx.QueryExecModeExec,
216
	"simple_protocol": pgx.QueryExecModeSimpleProtocol,
217
}
218
219
func setDefaultQueryExecMode(config *pgx.ConnConfig) {
220
	// Default mode if no specific mode is found in the connection string
221
	defaultMode := "cache_statement"
222
223
	// Iterate through the map keys to check if any are mentioned in the connection string
224
	for key := range queryExecModes {
225
		if strings.Contains(config.ConnString(), "default_query_exec_mode="+key) {
226
			config.DefaultQueryExecMode = queryExecModes[key]
227
			slog.Info("setDefaultQueryExecMode", slog.String("mode", key))
228
			return
229
		}
230
	}
231
232
	// Set to default mode if no matching mode is found
233
	config.DefaultQueryExecMode = queryExecModes[defaultMode]
234
	slog.Warn("setDefaultQueryExecMode", slog.String("mode", defaultMode))
235
}
236
237
var planCacheModes = map[string]string{
238
	"auto":              "auto",
239
	"force_custom_plan": "force_custom_plan",
240
	"disable":           "disable",
241
}
242
243
func setPlanCacheMode(config *pgx.ConnConfig) {
244
	// Default plan cache mode
245
	const defaultMode = "auto"
246
247
	// Extract connection string
248
	connStr := config.ConnString()
249
	planCacheMode := defaultMode
250
251
	// Check for specific plan cache modes in the connection string
252
	for key, value := range planCacheModes {
253
		if strings.Contains(connStr, "plan_cache_mode="+key) {
254
			if key == "disable" {
255
				delete(config.RuntimeParams, "plan_cache_mode")
256
				slog.Info("setPlanCacheMode", slog.String("mode", "disabled"))
257
				return
258
			}
259
			planCacheMode = value
260
			slog.Info("setPlanCacheMode", slog.String("mode", key))
261
			break
262
		}
263
	}
264
265
	// Set the plan cache mode
266
	config.RuntimeParams["plan_cache_mode"] = planCacheMode
267
	if planCacheMode == defaultMode {
268
		slog.Warn("setPlanCacheMode", slog.String("mode", defaultMode))
269
	}
270
}
271
272
// createPools initializes read and write connection pools with appropriate configurations and error handling.
273
func createPools(ctx context.Context, wConfig, rConfig *pgxpool.Config) (*pgxpool.Pool, *pgxpool.Pool, error) {
274
	// Context with timeout for creating the pools
275
	initCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
276
	defer cancel()
277
278
	// Create write pool
279
	writePool, err := pgxpool.NewWithConfig(initCtx, wConfig)
280
	if err != nil {
281
		return nil, nil, fmt.Errorf("failed to create write pool: %w", err)
0 ignored issues
show
unrecognized printf verb 'w'
Loading history...
282
	}
283
284
	// Create read pool using the same configuration
285
	readPool, err := pgxpool.NewWithConfig(initCtx, rConfig)
286
	if err != nil {
287
		writePool.Close() // Ensure write pool is closed on failure
288
		return nil, nil, fmt.Errorf("failed to create read pool: %w", err)
0 ignored issues
show
unrecognized printf verb 'w'
Loading history...
289
	}
290
291
	// Set up retry policy for pinging pools
292
	retryPolicy := backoff.NewExponentialBackOff()
293
	retryPolicy.MaxElapsedTime = 1 * time.Minute
294
295
	// Attempt to ping both pools to confirm connectivity
296
	err = backoff.Retry(func() error {
297
		pingCtx, pingCancel := context.WithTimeout(context.Background(), 2*time.Second)
298
		defer pingCancel()
299
300
		if err := writePool.Ping(pingCtx); err != nil {
301
			return fmt.Errorf("write pool ping failed: %w", err)
0 ignored issues
show
unrecognized printf verb 'w'
Loading history...
302
		}
303
		if err := readPool.Ping(pingCtx); err != nil {
304
			return fmt.Errorf("read pool ping failed: %w", err)
0 ignored issues
show
unrecognized printf verb 'w'
Loading history...
305
		}
306
		return nil
307
	}, retryPolicy)
308
	// Handle errors from pinging
309
	if err != nil {
310
		writePool.Close()
311
		readPool.Close()
312
		return nil, nil, fmt.Errorf("pinging pools failed: %w", err)
0 ignored issues
show
unrecognized printf verb 'w'
Loading history...
313
	}
314
315
	return writePool, readPool, nil
316
}
317