1 | package postgres |
||
2 | |||
3 | import ( |
||
4 | "context" |
||
5 | "fmt" |
||
6 | "log/slog" |
||
7 | "strings" |
||
8 | "time" |
||
9 | |||
10 | "github.com/cenkalti/backoff/v4" |
||
11 | |||
12 | "github.com/exaring/otelpgx" |
||
13 | |||
14 | "github.com/jackc/pgx/v5" |
||
15 | |||
16 | "github.com/jackc/pgx/v5/pgxpool" |
||
17 | |||
18 | "github.com/Masterminds/squirrel" |
||
19 | ) |
||
20 | |||
21 | // Postgres - Structure for Postresql instance |
||
22 | type Postgres struct { |
||
23 | ReadPool *pgxpool.Pool |
||
24 | WritePool *pgxpool.Pool |
||
25 | |||
26 | Builder squirrel.StatementBuilderType |
||
27 | // options |
||
28 | maxDataPerWrite int |
||
29 | maxRetries int |
||
30 | watchBufferSize int |
||
31 | maxConnectionLifeTime time.Duration |
||
32 | maxConnectionIdleTime time.Duration |
||
33 | maxOpenConnections int |
||
34 | maxIdleConnections int |
||
35 | } |
||
36 | |||
37 | // New - |
||
38 | func New(uri string, opts ...Option) (*Postgres, error) { |
||
39 | return newDB(uri, uri, opts...) |
||
40 | } |
||
41 | |||
42 | // NewWithSeparateURIs - |
||
43 | func NewWithSeparateURIs(writerUri, readerUri string, opts ...Option) (*Postgres, error) { |
||
44 | return newDB(writerUri, readerUri, opts...) |
||
45 | } |
||
46 | |||
47 | // new - Creates new postgresql db instance |
||
48 | func newDB(writerUri, readerUri string, opts ...Option) (*Postgres, error) { |
||
49 | pg := &Postgres{ |
||
50 | maxOpenConnections: _defaultMaxOpenConnections, |
||
51 | maxIdleConnections: _defaultMaxIdleConnections, |
||
52 | maxDataPerWrite: _defaultMaxDataPerWrite, |
||
53 | maxRetries: _defaultMaxRetries, |
||
54 | watchBufferSize: _defaultWatchBufferSize, |
||
55 | } |
||
56 | |||
57 | // Custom options |
||
58 | for _, opt := range opts { |
||
59 | opt(pg) |
||
60 | } |
||
61 | |||
62 | pg.Builder = squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar) |
||
63 | |||
64 | writeConfig, err := pgxpool.ParseConfig(writerUri) |
||
65 | if err != nil { |
||
66 | return nil, err |
||
67 | } |
||
68 | |||
69 | readConfig, err := pgxpool.ParseConfig(readerUri) |
||
70 | if err != nil { |
||
71 | return nil, err |
||
72 | } |
||
73 | |||
74 | // Set the default execution mode for queries using the write and read configurations. |
||
75 | setDefaultQueryExecMode(writeConfig.ConnConfig) |
||
76 | setDefaultQueryExecMode(readConfig.ConnConfig) |
||
77 | |||
78 | // Set the plan cache mode for both write and read configurations to optimize query planning. |
||
79 | setPlanCacheMode(writeConfig.ConnConfig) |
||
80 | setPlanCacheMode(readConfig.ConnConfig) |
||
81 | |||
82 | // Set the minimum number of idle connections in the pool for both write and read configurations. |
||
83 | writeConfig.MinConns = int32(pg.maxIdleConnections) |
||
84 | readConfig.MinConns = int32(pg.maxIdleConnections) |
||
85 | |||
86 | // Set the maximum number of active connections in the pool for both write and read configurations. |
||
87 | writeConfig.MaxConns = int32(pg.maxOpenConnections) |
||
88 | readConfig.MaxConns = int32(pg.maxOpenConnections) |
||
89 | |||
90 | // Set the maximum amount of time a connection may be idle before being closed for both configurations. |
||
91 | writeConfig.MaxConnIdleTime = pg.maxConnectionIdleTime |
||
92 | readConfig.MaxConnIdleTime = pg.maxConnectionIdleTime |
||
93 | |||
94 | // Set the maximum lifetime of a connection in the pool for both configurations. |
||
95 | writeConfig.MaxConnLifetime = pg.maxConnectionLifeTime |
||
96 | readConfig.MaxConnLifetime = pg.maxConnectionLifeTime |
||
97 | |||
98 | // Set a jitter to the maximum connection lifetime to prevent all connections from expiring at the same time. |
||
99 | writeConfig.MaxConnLifetimeJitter = time.Duration(0.2 * float64(pg.maxConnectionLifeTime)) |
||
100 | readConfig.MaxConnLifetimeJitter = time.Duration(0.2 * float64(pg.maxConnectionLifeTime)) |
||
101 | |||
102 | writeConfig.ConnConfig.Tracer = otelpgx.NewTracer() |
||
103 | readConfig.ConnConfig.Tracer = otelpgx.NewTracer() |
||
104 | |||
105 | // Create connection pools for both writing and reading operations using the configured settings. |
||
106 | pg.WritePool, pg.ReadPool, err = createPools( |
||
107 | context.Background(), // Context used to control the lifecycle of the pools. |
||
108 | writeConfig, // Configuration settings for the write pool. |
||
109 | readConfig, // Configuration settings for the read pool. |
||
110 | ) |
||
111 | // Handle errors during the creation of the connection pools. |
||
112 | if err != nil { |
||
113 | return nil, err |
||
114 | } |
||
115 | |||
116 | return pg, nil |
||
117 | } |
||
118 | |||
119 | func (p *Postgres) GetMaxDataPerWrite() int { |
||
120 | return p.maxDataPerWrite |
||
121 | } |
||
122 | |||
123 | func (p *Postgres) GetMaxRetries() int { |
||
124 | return p.maxRetries |
||
125 | } |
||
126 | |||
127 | func (p *Postgres) GetWatchBufferSize() int { |
||
128 | return p.watchBufferSize |
||
129 | } |
||
130 | |||
131 | // GetEngineType - Get the engine type which is postgresql in string |
||
132 | func (p *Postgres) GetEngineType() string { |
||
133 | return "postgres" |
||
134 | } |
||
135 | |||
136 | // Close - Close postgresql instance |
||
137 | func (p *Postgres) Close() error { |
||
138 | p.ReadPool.Close() |
||
139 | p.WritePool.Close() |
||
140 | return nil |
||
141 | } |
||
142 | |||
143 | // IsReady - Check if database is ready |
||
144 | func (p *Postgres) IsReady(ctx context.Context) (bool, error) { |
||
145 | ctx, cancel := context.WithTimeout(ctx, 2*time.Second) |
||
146 | defer cancel() |
||
147 | if err := p.ReadPool.Ping(ctx); err != nil { |
||
148 | return false, err |
||
149 | } |
||
150 | return true, nil |
||
151 | } |
||
152 | |||
153 | var queryExecModes = map[string]pgx.QueryExecMode{ |
||
154 | "cache_statement": pgx.QueryExecModeCacheStatement, |
||
155 | "cache_describe": pgx.QueryExecModeCacheDescribe, |
||
156 | "describe_exec": pgx.QueryExecModeDescribeExec, |
||
157 | "mode_exec": pgx.QueryExecModeExec, |
||
158 | "simple_protocol": pgx.QueryExecModeSimpleProtocol, |
||
159 | } |
||
160 | |||
161 | func setDefaultQueryExecMode(config *pgx.ConnConfig) { |
||
162 | // Default mode if no specific mode is found in the connection string |
||
163 | defaultMode := "cache_statement" |
||
164 | |||
165 | // Iterate through the map keys to check if any are mentioned in the connection string |
||
166 | for key := range queryExecModes { |
||
167 | if strings.Contains(config.ConnString(), "default_query_exec_mode="+key) { |
||
168 | config.DefaultQueryExecMode = queryExecModes[key] |
||
169 | slog.Info("setDefaultQueryExecMode", slog.String("mode", key)) |
||
170 | return |
||
171 | } |
||
172 | } |
||
173 | |||
174 | // Set to default mode if no matching mode is found |
||
175 | config.DefaultQueryExecMode = queryExecModes[defaultMode] |
||
176 | slog.Warn("setDefaultQueryExecMode", slog.String("mode", defaultMode)) |
||
177 | } |
||
178 | |||
179 | var planCacheModes = map[string]string{ |
||
180 | "auto": "auto", |
||
181 | "force_custom_plan": "force_custom_plan", |
||
182 | "disable": "disable", |
||
183 | } |
||
184 | |||
185 | func setPlanCacheMode(config *pgx.ConnConfig) { |
||
186 | // Default plan cache mode |
||
187 | const defaultMode = "auto" |
||
188 | |||
189 | // Extract connection string |
||
190 | connStr := config.ConnString() |
||
191 | planCacheMode := defaultMode |
||
192 | |||
193 | // Check for specific plan cache modes in the connection string |
||
194 | for key, value := range planCacheModes { |
||
195 | if strings.Contains(connStr, "plan_cache_mode="+key) { |
||
196 | if key == "disable" { |
||
197 | delete(config.Config.RuntimeParams, "plan_cache_mode") |
||
198 | slog.Info("setPlanCacheMode", slog.String("mode", "disabled")) |
||
199 | return |
||
200 | } |
||
201 | planCacheMode = value |
||
202 | slog.Info("setPlanCacheMode", slog.String("mode", key)) |
||
203 | break |
||
204 | } |
||
205 | } |
||
206 | |||
207 | // Set the plan cache mode |
||
208 | config.Config.RuntimeParams["plan_cache_mode"] = planCacheMode |
||
209 | if planCacheMode == defaultMode { |
||
210 | slog.Warn("setPlanCacheMode", slog.String("mode", defaultMode)) |
||
211 | } |
||
212 | } |
||
213 | |||
214 | // createPools initializes read and write connection pools with appropriate configurations and error handling. |
||
215 | func createPools(ctx context.Context, wConfig, rConfig *pgxpool.Config) (*pgxpool.Pool, *pgxpool.Pool, error) { |
||
216 | // Context with timeout for creating the pools |
||
217 | initCtx, cancel := context.WithTimeout(ctx, 10*time.Second) |
||
218 | defer cancel() |
||
219 | |||
220 | // Create write pool |
||
221 | writePool, err := pgxpool.NewWithConfig(initCtx, wConfig) |
||
222 | if err != nil { |
||
223 | return nil, nil, fmt.Errorf("failed to create write pool: %w", err) |
||
0 ignored issues
–
show
introduced
by
![]() |
|||
224 | } |
||
225 | |||
226 | // Create read pool using the same configuration |
||
227 | readPool, err := pgxpool.NewWithConfig(initCtx, rConfig) |
||
228 | if err != nil { |
||
229 | writePool.Close() // Ensure write pool is closed on failure |
||
230 | return nil, nil, fmt.Errorf("failed to create read pool: %w", err) |
||
0 ignored issues
–
show
|
|||
231 | } |
||
232 | |||
233 | // Set up retry policy for pinging pools |
||
234 | retryPolicy := backoff.NewExponentialBackOff() |
||
235 | retryPolicy.MaxElapsedTime = 1 * time.Minute |
||
236 | |||
237 | // Attempt to ping both pools to confirm connectivity |
||
238 | err = backoff.Retry(func() error { |
||
239 | pingCtx, pingCancel := context.WithTimeout(context.Background(), 2*time.Second) |
||
240 | defer pingCancel() |
||
241 | |||
242 | if err := writePool.Ping(pingCtx); err != nil { |
||
243 | return fmt.Errorf("write pool ping failed: %w", err) |
||
0 ignored issues
–
show
|
|||
244 | } |
||
245 | if err := readPool.Ping(pingCtx); err != nil { |
||
246 | return fmt.Errorf("read pool ping failed: %w", err) |
||
0 ignored issues
–
show
|
|||
247 | } |
||
248 | return nil |
||
249 | }, retryPolicy) |
||
250 | // Handle errors from pinging |
||
251 | if err != nil { |
||
252 | writePool.Close() |
||
253 | readPool.Close() |
||
254 | return nil, nil, fmt.Errorf("pinging pools failed: %w", err) |
||
0 ignored issues
–
show
|
|||
255 | } |
||
256 | |||
257 | return writePool, readPool, nil |
||
258 | } |
||
259 |