Passed
Push — master ( 08d10d...0ed7ba )
by Tolga
01:16 queued 18s
created

postgres.*Postgres.GetEngineType   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 0
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
package postgres
2
3
import (
4
	"context"
5
	"fmt"
6
	"strings"
7
	"time"
8
9
	"github.com/cenkalti/backoff/v4"
10
11
	"golang.org/x/exp/slog"
12
13
	"github.com/jackc/pgx/v5"
14
15
	"github.com/jackc/pgx/v5/pgxpool"
16
17
	"github.com/Masterminds/squirrel"
18
)
19
20
// Postgres - Structure for Postresql instance
21
type Postgres struct {
22
	ReadPool  *pgxpool.Pool
23
	WritePool *pgxpool.Pool
24
25
	Builder squirrel.StatementBuilderType
26
	// options
27
	maxDataPerWrite       int
28
	maxRetries            int
29
	watchBufferSize       int
30
	maxConnectionLifeTime time.Duration
31
	maxConnectionIdleTime time.Duration
32
	maxOpenConnections    int
33
	maxIdleConnections    int
34
}
35
36
// New - Creates new postgresql db instance
37
func New(uri string, opts ...Option) (*Postgres, error) {
38
	pg := &Postgres{
39
		maxOpenConnections: _defaultMaxOpenConnections,
40
		maxIdleConnections: _defaultMaxIdleConnections,
41
		maxDataPerWrite:    _defaultMaxDataPerWrite,
42
		maxRetries:         _defaultMaxRetries,
43
		watchBufferSize:    _defaultWatchBufferSize,
44
	}
45
46
	// Custom options
47
	for _, opt := range opts {
48
		opt(pg)
49
	}
50
51
	pg.Builder = squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar)
52
53
	writeConfig, err := pgxpool.ParseConfig(uri)
54
	if err != nil {
55
		return nil, err
56
	}
57
58
	readConfig, err := pgxpool.ParseConfig(uri)
59
	if err != nil {
60
		return nil, err
61
	}
62
63
	// Set the default execution mode for queries using the write and read configurations.
64
	setDefaultQueryExecMode(writeConfig.ConnConfig)
65
	setDefaultQueryExecMode(readConfig.ConnConfig)
66
67
	// Set the plan cache mode for both write and read configurations to optimize query planning.
68
	setPlanCacheMode(writeConfig.ConnConfig)
69
	setPlanCacheMode(readConfig.ConnConfig)
70
71
	// Set the minimum number of idle connections in the pool for both write and read configurations.
72
	writeConfig.MinConns = int32(pg.maxIdleConnections)
73
	readConfig.MinConns = int32(pg.maxIdleConnections)
74
75
	// Set the maximum number of active connections in the pool for both write and read configurations.
76
	writeConfig.MaxConns = int32(pg.maxOpenConnections)
77
	readConfig.MaxConns = int32(pg.maxOpenConnections)
78
79
	// Set the maximum amount of time a connection may be idle before being closed for both configurations.
80
	writeConfig.MaxConnIdleTime = pg.maxConnectionIdleTime
81
	readConfig.MaxConnIdleTime = pg.maxConnectionIdleTime
82
83
	// Set the maximum lifetime of a connection in the pool for both configurations.
84
	writeConfig.MaxConnLifetime = pg.maxConnectionLifeTime
85
	readConfig.MaxConnLifetime = pg.maxConnectionLifeTime
86
87
	// Set a jitter to the maximum connection lifetime to prevent all connections from expiring at the same time.
88
	writeConfig.MaxConnLifetimeJitter = time.Duration(0.2 * float64(pg.maxConnectionLifeTime))
89
	readConfig.MaxConnLifetimeJitter = time.Duration(0.2 * float64(pg.maxConnectionLifeTime))
90
91
	// Create connection pools for both writing and reading operations using the configured settings.
92
	pg.WritePool, pg.ReadPool, err = createPools(
93
		context.Background(), // Context used to control the lifecycle of the pools.
94
		writeConfig,          // Configuration settings for the write pool.
95
		readConfig,           // Configuration settings for the read pool.
96
	)
97
	// Handle errors during the creation of the connection pools.
98
	if err != nil {
99
		return nil, err
100
	}
101
102
	return pg, nil
103
}
104
105
// GetMaxDataPerWrite returns the configured maxDataPerWrite option.
func (p *Postgres) GetMaxDataPerWrite() int {
	return p.maxDataPerWrite
}
108
109
// GetMaxRetries returns the configured maxRetries option.
func (p *Postgres) GetMaxRetries() int {
	return p.maxRetries
}
112
113
// GetWatchBufferSize returns the configured watchBufferSize option.
func (p *Postgres) GetWatchBufferSize() int {
	return p.watchBufferSize
}
116
117
// GetEngineType - Get the engine type which is postgresql in string
118
func (p *Postgres) GetEngineType() string {
119
	return "postgres"
120
}
121
122
// Close - Close postgresql instance
123
func (p *Postgres) Close() error {
124
	p.ReadPool.Close()
125
	p.WritePool.Close()
126
	return nil
127
}
128
129
// IsReady - Check if database is ready
130
func (p *Postgres) IsReady(ctx context.Context) (bool, error) {
131
	ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
132
	defer cancel()
133
	if err := p.ReadPool.Ping(ctx); err != nil {
134
		return false, err
135
	}
136
	return true, nil
137
}
138
139
var queryExecModes = map[string]pgx.QueryExecMode{
140
	"cache_statement": pgx.QueryExecModeCacheStatement,
141
	"cache_describe":  pgx.QueryExecModeCacheDescribe,
142
	"describe_exec":   pgx.QueryExecModeDescribeExec,
143
	"mode_exec":       pgx.QueryExecModeExec,
144
	"simple_protocol": pgx.QueryExecModeSimpleProtocol,
145
}
146
147
// setDefaultQueryExecMode chooses config.DefaultQueryExecMode from the
// connection string.
//
// The connection string is searched for "default_query_exec_mode=<name>" for
// each name in queryExecModes. The first name found is applied and logged at
// info level. When none is present, "cache_statement" is applied and logged at
// warn level.
func setDefaultQueryExecMode(config *pgx.ConnConfig) {
	const fallback = "cache_statement"

	connString := config.ConnString()
	for name, mode := range queryExecModes {
		if !strings.Contains(connString, "default_query_exec_mode="+name) {
			continue
		}
		config.DefaultQueryExecMode = mode
		slog.Info("setDefaultQueryExecMode", slog.String("mode", name))
		return
	}

	// Nothing in the connection string matched a known mode.
	config.DefaultQueryExecMode = queryExecModes[fallback]
	slog.Warn("setDefaultQueryExecMode", slog.String("mode", fallback))
}
164
165
// planCacheModes lists the "plan_cache_mode" values that setPlanCacheMode
// recognises in a connection string, mapped to the runtime parameter value
// sent for each of them.
var planCacheModes = map[string]string{
	"auto":              "auto",
	"force_custom_plan": "force_custom_plan",
}
169
170
// setPlanCacheMode sets the "plan_cache_mode" runtime parameter on config.
//
// The connection string is searched for "plan_cache_mode=<name>" for each name
// in planCacheModes. The first name found is applied and logged at info level.
// When none is present, "auto" is applied and logged at warn level.
func setPlanCacheMode(config *pgx.ConnConfig) {
	const fallback = "auto"

	connString := config.ConnString()
	for name, value := range planCacheModes {
		if !strings.Contains(connString, "plan_cache_mode="+name) {
			continue
		}
		config.Config.RuntimeParams["plan_cache_mode"] = value
		slog.Info("setPlanCacheMode", slog.String("mode", name))
		return
	}

	// Nothing in the connection string matched a known mode.
	config.Config.RuntimeParams["plan_cache_mode"] = planCacheModes[fallback]
	slog.Warn("setPlanCacheMode", slog.String("mode", fallback))
}
187
188
// createPools initializes read and write connection pools with appropriate configurations and error handling.
189
func createPools(ctx context.Context, wConfig, rConfig *pgxpool.Config) (*pgxpool.Pool, *pgxpool.Pool, error) {
190
	// Context with timeout for creating the pools
191
	initCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
192
	defer cancel()
193
194
	// Create write pool
195
	writePool, err := pgxpool.NewWithConfig(initCtx, wConfig)
196
	if err != nil {
197
		return nil, nil, fmt.Errorf("failed to create write pool: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
198
	}
199
200
	// Create read pool using the same configuration
201
	readPool, err := pgxpool.NewWithConfig(initCtx, rConfig)
202
	if err != nil {
203
		writePool.Close() // Ensure write pool is closed on failure
204
		return nil, nil, fmt.Errorf("failed to create read pool: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
205
	}
206
207
	// Set up retry policy for pinging pools
208
	retryPolicy := backoff.NewExponentialBackOff()
209
	retryPolicy.MaxElapsedTime = 1 * time.Minute
210
211
	// Attempt to ping both pools to confirm connectivity
212
	err = backoff.Retry(func() error {
213
		pingCtx, pingCancel := context.WithTimeout(context.Background(), 2*time.Second)
214
		defer pingCancel()
215
216
		if err := writePool.Ping(pingCtx); err != nil {
217
			return fmt.Errorf("write pool ping failed: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
218
		}
219
		if err := readPool.Ping(pingCtx); err != nil {
220
			return fmt.Errorf("read pool ping failed: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
221
		}
222
		return nil
223
	}, retryPolicy)
224
225
	// Handle errors from pinging
226
	if err != nil {
227
		writePool.Close()
228
		readPool.Close()
229
		return nil, nil, fmt.Errorf("pinging pools failed: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
230
	}
231
232
	return writePool, readPool, nil
233
}
234