Passed
Pull Request — master (#1258)
by Tolga
02:36
created

postgres.NewWithSeparateURIs   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 3
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
package postgres
2
3
import (
4
	"context"
5
	"fmt"
6
	"strings"
7
	"time"
8
9
	"github.com/exaring/otelpgx"
10
11
	"github.com/cenkalti/backoff/v4"
12
13
	"golang.org/x/exp/slog"
14
15
	"github.com/jackc/pgx/v5"
16
17
	"github.com/jackc/pgx/v5/pgxpool"
18
19
	"github.com/Masterminds/squirrel"
20
)
21
22
// Postgres - Structure for Postresql instance
23
type Postgres struct {
24
	ReadPool  *pgxpool.Pool
25
	WritePool *pgxpool.Pool
26
27
	Builder squirrel.StatementBuilderType
28
	// options
29
	maxDataPerWrite       int
30
	maxRetries            int
31
	watchBufferSize       int
32
	maxConnectionLifeTime time.Duration
33
	maxConnectionIdleTime time.Duration
34
	maxOpenConnections    int
35
	maxIdleConnections    int
36
}
37
38
// New -
39
func New(uri string, opts ...Option) (*Postgres, error) {
40
	return newDB(uri, uri, opts...)
41
}
42
43
// NewWithSeparateURIs -
44
func NewWithSeparateURIs(writerUri, readerUri string, opts ...Option) (*Postgres, error) {
45
	return newDB(writerUri, readerUri, opts...)
46
}
47
48
// new - Creates new postgresql db instance
49
func newDB(writerUri, readerUri string, opts ...Option) (*Postgres, error) {
50
	pg := &Postgres{
51
		maxOpenConnections: _defaultMaxOpenConnections,
52
		maxIdleConnections: _defaultMaxIdleConnections,
53
		maxDataPerWrite:    _defaultMaxDataPerWrite,
54
		maxRetries:         _defaultMaxRetries,
55
		watchBufferSize:    _defaultWatchBufferSize,
56
	}
57
58
	// Custom options
59
	for _, opt := range opts {
60
		opt(pg)
61
	}
62
63
	pg.Builder = squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar)
64
65
	writeConfig, err := pgxpool.ParseConfig(writerUri)
66
	if err != nil {
67
		return nil, err
68
	}
69
70
	readConfig, err := pgxpool.ParseConfig(readerUri)
71
	if err != nil {
72
		return nil, err
73
	}
74
75
	// Set the default execution mode for queries using the write and read configurations.
76
	setDefaultQueryExecMode(writeConfig.ConnConfig)
77
	setDefaultQueryExecMode(readConfig.ConnConfig)
78
79
	// Set the plan cache mode for both write and read configurations to optimize query planning.
80
	setPlanCacheMode(writeConfig.ConnConfig)
81
	setPlanCacheMode(readConfig.ConnConfig)
82
83
	// Set the minimum number of idle connections in the pool for both write and read configurations.
84
	writeConfig.MinConns = int32(pg.maxIdleConnections)
85
	readConfig.MinConns = int32(pg.maxIdleConnections)
86
87
	// Set the maximum number of active connections in the pool for both write and read configurations.
88
	writeConfig.MaxConns = int32(pg.maxOpenConnections)
89
	readConfig.MaxConns = int32(pg.maxOpenConnections)
90
91
	// Set the maximum amount of time a connection may be idle before being closed for both configurations.
92
	writeConfig.MaxConnIdleTime = pg.maxConnectionIdleTime
93
	readConfig.MaxConnIdleTime = pg.maxConnectionIdleTime
94
95
	// Set the maximum lifetime of a connection in the pool for both configurations.
96
	writeConfig.MaxConnLifetime = pg.maxConnectionLifeTime
97
	readConfig.MaxConnLifetime = pg.maxConnectionLifeTime
98
99
	// Set a jitter to the maximum connection lifetime to prevent all connections from expiring at the same time.
100
	writeConfig.MaxConnLifetimeJitter = time.Duration(0.2 * float64(pg.maxConnectionLifeTime))
101
	readConfig.MaxConnLifetimeJitter = time.Duration(0.2 * float64(pg.maxConnectionLifeTime))
102
103
	writeConfig.ConnConfig.Tracer = otelpgx.NewTracer()
104
	readConfig.ConnConfig.Tracer = otelpgx.NewTracer()
105
106
	// Create connection pools for both writing and reading operations using the configured settings.
107
	pg.WritePool, pg.ReadPool, err = createPools(
108
		context.Background(), // Context used to control the lifecycle of the pools.
109
		writeConfig,          // Configuration settings for the write pool.
110
		readConfig,           // Configuration settings for the read pool.
111
	)
112
	// Handle errors during the creation of the connection pools.
113
	if err != nil {
114
		return nil, err
115
	}
116
117
	return pg, nil
118
}
119
120
// GetMaxDataPerWrite returns the value of the maxDataPerWrite option.
func (p *Postgres) GetMaxDataPerWrite() int {
	limit := p.maxDataPerWrite
	return limit
}
123
124
// GetMaxRetries returns the value of the maxRetries option.
func (p *Postgres) GetMaxRetries() int {
	retries := p.maxRetries
	return retries
}
127
128
// GetWatchBufferSize returns the value of the watchBufferSize option.
func (p *Postgres) GetWatchBufferSize() int {
	size := p.watchBufferSize
	return size
}
131
132
// GetEngineType - Get the engine type which is postgresql in string
133
func (p *Postgres) GetEngineType() string {
134
	return "postgres"
135
}
136
137
// Close - Close postgresql instance
138
func (p *Postgres) Close() error {
139
	p.ReadPool.Close()
140
	p.WritePool.Close()
141
	return nil
142
}
143
144
// IsReady - Check if database is ready
145
func (p *Postgres) IsReady(ctx context.Context) (bool, error) {
146
	ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
147
	defer cancel()
148
	if err := p.ReadPool.Ping(ctx); err != nil {
149
		return false, err
150
	}
151
	return true, nil
152
}
153
154
var queryExecModes = map[string]pgx.QueryExecMode{
155
	"cache_statement": pgx.QueryExecModeCacheStatement,
156
	"cache_describe":  pgx.QueryExecModeCacheDescribe,
157
	"describe_exec":   pgx.QueryExecModeDescribeExec,
158
	"mode_exec":       pgx.QueryExecModeExec,
159
	"simple_protocol": pgx.QueryExecModeSimpleProtocol,
160
}
161
162
// setDefaultQueryExecMode sets config.DefaultQueryExecMode from the connection
// string. If the string contains "default_query_exec_mode=<key>" for a key of
// queryExecModes, that mode is used and logged at info level; otherwise the
// "cache_statement" mode is used and logged at warn level.
func setDefaultQueryExecMode(config *pgx.ConnConfig) {
	const (
		paramPrefix = "default_query_exec_mode="
		fallback    = "cache_statement"
	)

	connString := config.ConnString()

	// Look for any known mode name mentioned in the connection string.
	for name, mode := range queryExecModes {
		if !strings.Contains(connString, paramPrefix+name) {
			continue
		}
		config.DefaultQueryExecMode = mode
		slog.Info("setDefaultQueryExecMode", slog.String("mode", name))
		return
	}

	// Nothing matched: fall back to the default mode.
	config.DefaultQueryExecMode = queryExecModes[fallback]
	slog.Warn("setDefaultQueryExecMode", slog.String("mode", fallback))
}
179
180
// planCacheModes lists the plan_cache_mode values recognized in a connection
// string, mapped to the value written into the connection's runtime
// parameters.
var planCacheModes = map[string]string{
	"auto":              "auto",
	"force_custom_plan": "force_custom_plan",
}
184
185
// setPlanCacheMode sets the "plan_cache_mode" runtime parameter on config. If
// the connection string contains "plan_cache_mode=<key>" for a key of
// planCacheModes, the mapped value is used and logged at info level; otherwise
// "auto" is used and logged at warn level.
func setPlanCacheMode(config *pgx.ConnConfig) {
	const (
		paramName = "plan_cache_mode"
		fallback  = "auto"
	)

	connString := config.ConnString()

	// Look for any known plan cache mode mentioned in the connection string.
	for name, value := range planCacheModes {
		if !strings.Contains(connString, paramName+"="+name) {
			continue
		}
		config.Config.RuntimeParams[paramName] = value
		slog.Info("setPlanCacheMode", slog.String("mode", name))
		return
	}

	// Nothing matched: fall back to the default mode.
	config.Config.RuntimeParams[paramName] = planCacheModes[fallback]
	slog.Warn("setPlanCacheMode", slog.String("mode", fallback))
}
202
203
// createPools initializes read and write connection pools with appropriate configurations and error handling.
204
func createPools(ctx context.Context, wConfig, rConfig *pgxpool.Config) (*pgxpool.Pool, *pgxpool.Pool, error) {
205
	// Context with timeout for creating the pools
206
	initCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
207
	defer cancel()
208
209
	// Create write pool
210
	writePool, err := pgxpool.NewWithConfig(initCtx, wConfig)
211
	if err != nil {
212
		return nil, nil, fmt.Errorf("failed to create write pool: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
213
	}
214
215
	// Create read pool using the same configuration
216
	readPool, err := pgxpool.NewWithConfig(initCtx, rConfig)
217
	if err != nil {
218
		writePool.Close() // Ensure write pool is closed on failure
219
		return nil, nil, fmt.Errorf("failed to create read pool: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
220
	}
221
222
	// Set up retry policy for pinging pools
223
	retryPolicy := backoff.NewExponentialBackOff()
224
	retryPolicy.MaxElapsedTime = 1 * time.Minute
225
226
	// Attempt to ping both pools to confirm connectivity
227
	err = backoff.Retry(func() error {
228
		pingCtx, pingCancel := context.WithTimeout(context.Background(), 2*time.Second)
229
		defer pingCancel()
230
231
		if err := writePool.Ping(pingCtx); err != nil {
232
			return fmt.Errorf("write pool ping failed: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
233
		}
234
		if err := readPool.Ping(pingCtx); err != nil {
235
			return fmt.Errorf("read pool ping failed: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
236
		}
237
		return nil
238
	}, retryPolicy)
239
240
	// Handle errors from pinging
241
	if err != nil {
242
		writePool.Close()
243
		readPool.Close()
244
		return nil, nil, fmt.Errorf("pinging pools failed: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
245
	}
246
247
	return writePool, readPool, nil
248
}
249