Issues (35)

pkg/database/postgres/postgres.go (5 issues)

Severity
1
package postgres
2
3
import (
4
	"context"
5
	"fmt"
6
	"log/slog"
7
	"strings"
8
	"time"
9
10
	"github.com/cenkalti/backoff/v4"
11
12
	"github.com/exaring/otelpgx"
13
14
	"github.com/jackc/pgx/v5"
15
16
	"github.com/jackc/pgx/v5/pgxpool"
17
18
	"github.com/Masterminds/squirrel"
19
)
20
21
// Postgres - Structure for Postresql instance
22
type Postgres struct {
23
	ReadPool  *pgxpool.Pool
24
	WritePool *pgxpool.Pool
25
26
	Builder squirrel.StatementBuilderType
27
	// options
28
	maxDataPerWrite       int
29
	maxRetries            int
30
	watchBufferSize       int
31
	maxConnectionLifeTime time.Duration
32
	maxConnectionIdleTime time.Duration
33
	maxOpenConnections    int
34
	maxIdleConnections    int
35
}
36
37
// New -
38
func New(uri string, opts ...Option) (*Postgres, error) {
39
	return newDB(uri, uri, opts...)
40
}
41
42
// NewWithSeparateURIs -
43
func NewWithSeparateURIs(writerUri, readerUri string, opts ...Option) (*Postgres, error) {
44
	return newDB(writerUri, readerUri, opts...)
45
}
46
47
// new - Creates new postgresql db instance
48
func newDB(writerUri, readerUri string, opts ...Option) (*Postgres, error) {
49
	pg := &Postgres{
50
		maxOpenConnections: _defaultMaxOpenConnections,
51
		maxIdleConnections: _defaultMaxIdleConnections,
52
		maxDataPerWrite:    _defaultMaxDataPerWrite,
53
		maxRetries:         _defaultMaxRetries,
54
		watchBufferSize:    _defaultWatchBufferSize,
55
	}
56
57
	// Custom options
58
	for _, opt := range opts {
59
		opt(pg)
60
	}
61
62
	pg.Builder = squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar)
63
64
	writeConfig, err := pgxpool.ParseConfig(writerUri)
65
	if err != nil {
66
		return nil, err
67
	}
68
69
	readConfig, err := pgxpool.ParseConfig(readerUri)
70
	if err != nil {
71
		return nil, err
72
	}
73
74
	// Set the default execution mode for queries using the write and read configurations.
75
	setDefaultQueryExecMode(writeConfig.ConnConfig)
76
	setDefaultQueryExecMode(readConfig.ConnConfig)
77
78
	// Set the plan cache mode for both write and read configurations to optimize query planning.
79
	setPlanCacheMode(writeConfig.ConnConfig)
80
	setPlanCacheMode(readConfig.ConnConfig)
81
82
	// Set the minimum number of idle connections in the pool for both write and read configurations.
83
	writeConfig.MinConns = int32(pg.maxIdleConnections)
84
	readConfig.MinConns = int32(pg.maxIdleConnections)
85
86
	// Set the maximum number of active connections in the pool for both write and read configurations.
87
	writeConfig.MaxConns = int32(pg.maxOpenConnections)
88
	readConfig.MaxConns = int32(pg.maxOpenConnections)
89
90
	// Set the maximum amount of time a connection may be idle before being closed for both configurations.
91
	writeConfig.MaxConnIdleTime = pg.maxConnectionIdleTime
92
	readConfig.MaxConnIdleTime = pg.maxConnectionIdleTime
93
94
	// Set the maximum lifetime of a connection in the pool for both configurations.
95
	writeConfig.MaxConnLifetime = pg.maxConnectionLifeTime
96
	readConfig.MaxConnLifetime = pg.maxConnectionLifeTime
97
98
	// Set a jitter to the maximum connection lifetime to prevent all connections from expiring at the same time.
99
	writeConfig.MaxConnLifetimeJitter = time.Duration(0.2 * float64(pg.maxConnectionLifeTime))
100
	readConfig.MaxConnLifetimeJitter = time.Duration(0.2 * float64(pg.maxConnectionLifeTime))
101
102
	writeConfig.ConnConfig.Tracer = otelpgx.NewTracer()
103
	readConfig.ConnConfig.Tracer = otelpgx.NewTracer()
104
105
	// Create connection pools for both writing and reading operations using the configured settings.
106
	pg.WritePool, pg.ReadPool, err = createPools(
107
		context.Background(), // Context used to control the lifecycle of the pools.
108
		writeConfig,          // Configuration settings for the write pool.
109
		readConfig,           // Configuration settings for the read pool.
110
	)
111
	// Handle errors during the creation of the connection pools.
112
	if err != nil {
113
		return nil, err
114
	}
115
116
	return pg, nil
117
}
118
119
// GetMaxDataPerWrite returns the configured maxDataPerWrite option.
func (p *Postgres) GetMaxDataPerWrite() int {
	return p.maxDataPerWrite
}
122
123
// GetMaxRetries returns the configured maxRetries option.
func (p *Postgres) GetMaxRetries() int {
	return p.maxRetries
}
126
127
// GetWatchBufferSize returns the configured watchBufferSize option.
func (p *Postgres) GetWatchBufferSize() int {
	return p.watchBufferSize
}
130
131
// GetEngineType - Get the engine type which is postgresql in string
132
func (p *Postgres) GetEngineType() string {
133
	return "postgres"
134
}
135
136
// Close - Close postgresql instance
137
func (p *Postgres) Close() error {
138
	p.ReadPool.Close()
139
	p.WritePool.Close()
140
	return nil
141
}
142
143
// IsReady - Check if database is ready
144
func (p *Postgres) IsReady(ctx context.Context) (bool, error) {
145
	ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
146
	defer cancel()
147
	if err := p.ReadPool.Ping(ctx); err != nil {
148
		return false, err
149
	}
150
	return true, nil
151
}
152
153
var queryExecModes = map[string]pgx.QueryExecMode{
154
	"cache_statement": pgx.QueryExecModeCacheStatement,
155
	"cache_describe":  pgx.QueryExecModeCacheDescribe,
156
	"describe_exec":   pgx.QueryExecModeDescribeExec,
157
	"mode_exec":       pgx.QueryExecModeExec,
158
	"simple_protocol": pgx.QueryExecModeSimpleProtocol,
159
}
160
161
// setDefaultQueryExecMode sets config.DefaultQueryExecMode from the
// default_query_exec_mode parameter of the connection string.
//
// The connection string is searched for "default_query_exec_mode=<name>" for
// each name in queryExecModes. The first match is applied and logged at info
// level. When nothing matches, "cache_statement" is applied and logged at
// warn level.
func setDefaultQueryExecMode(config *pgx.ConnConfig) {
	const fallback = "cache_statement"

	dsn := config.ConnString()
	for name, mode := range queryExecModes {
		if !strings.Contains(dsn, "default_query_exec_mode="+name) {
			continue
		}
		config.DefaultQueryExecMode = mode
		slog.Info("setDefaultQueryExecMode", slog.String("mode", name))
		return
	}

	// No known mode was requested: use the fallback.
	config.DefaultQueryExecMode = queryExecModes[fallback]
	slog.Warn("setDefaultQueryExecMode", slog.String("mode", fallback))
}
178
179
// planCacheModes lists the values recognised for the plan_cache_mode
// connection-string parameter and the value sent to the server for each.
//
// "auto", "force_custom_plan" and "force_generic_plan" are the values
// PostgreSQL accepts for plan_cache_mode. "disable" is a local keyword that
// makes setPlanCacheMode remove the parameter instead of sending it.
var planCacheModes = map[string]string{
	"auto":               "auto",
	"force_custom_plan":  "force_custom_plan",
	"force_generic_plan": "force_generic_plan",
	"disable":            "disable",
}
184
185
// setPlanCacheMode sets the plan_cache_mode runtime parameter of config from
// the plan_cache_mode parameter of the connection string.
//
// The connection string is searched for "plan_cache_mode=<name>" for each
// name in planCacheModes. A match on "disable" removes the runtime parameter
// and returns. Any other match stores the mapped value. When nothing matches,
// "auto" is stored. Whenever the stored value is "auto", a warn-level log
// line is emitted in addition to any info-level line for the match.
func setPlanCacheMode(config *pgx.ConnConfig) {
	const defaultMode = "auto"

	dsn := config.ConnString()
	selected := defaultMode

	for name, mode := range planCacheModes {
		if !strings.Contains(dsn, "plan_cache_mode="+name) {
			continue
		}

		if name == "disable" {
			// Do not send plan_cache_mode to the server at all.
			delete(config.Config.RuntimeParams, "plan_cache_mode")
			slog.Info("setPlanCacheMode", slog.String("mode", "disabled"))
			return
		}

		selected = mode
		slog.Info("setPlanCacheMode", slog.String("mode", name))
		break
	}

	config.Config.RuntimeParams["plan_cache_mode"] = selected
	if selected == defaultMode {
		slog.Warn("setPlanCacheMode", slog.String("mode", defaultMode))
	}
}
213
214
// createPools initializes read and write connection pools with appropriate configurations and error handling.
215
func createPools(ctx context.Context, wConfig, rConfig *pgxpool.Config) (*pgxpool.Pool, *pgxpool.Pool, error) {
216
	// Context with timeout for creating the pools
217
	initCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
218
	defer cancel()
219
220
	// Create write pool
221
	writePool, err := pgxpool.NewWithConfig(initCtx, wConfig)
222
	if err != nil {
223
		return nil, nil, fmt.Errorf("failed to create write pool: %w", err)
0 ignored issues
show
unrecognized printf verb 'w'
Loading history...
224
	}
225
226
	// Create read pool using the same configuration
227
	readPool, err := pgxpool.NewWithConfig(initCtx, rConfig)
228
	if err != nil {
229
		writePool.Close() // Ensure write pool is closed on failure
230
		return nil, nil, fmt.Errorf("failed to create read pool: %w", err)
0 ignored issues
show
unrecognized printf verb 'w'
Loading history...
231
	}
232
233
	// Set up retry policy for pinging pools
234
	retryPolicy := backoff.NewExponentialBackOff()
235
	retryPolicy.MaxElapsedTime = 1 * time.Minute
236
237
	// Attempt to ping both pools to confirm connectivity
238
	err = backoff.Retry(func() error {
239
		pingCtx, pingCancel := context.WithTimeout(context.Background(), 2*time.Second)
240
		defer pingCancel()
241
242
		if err := writePool.Ping(pingCtx); err != nil {
243
			return fmt.Errorf("write pool ping failed: %w", err)
0 ignored issues
show
unrecognized printf verb 'w'
Loading history...
244
		}
245
		if err := readPool.Ping(pingCtx); err != nil {
246
			return fmt.Errorf("read pool ping failed: %w", err)
0 ignored issues
show
unrecognized printf verb 'w'
Loading history...
247
		}
248
		return nil
249
	}, retryPolicy)
250
	// Handle errors from pinging
251
	if err != nil {
252
		writePool.Close()
253
		readPool.Close()
254
		return nil, nil, fmt.Errorf("pinging pools failed: %w", err)
0 ignored issues
show
unrecognized printf verb 'w'
Loading history...
255
	}
256
257
	return writePool, readPool, nil
258
}
259