Permify /
permify
| 1 | package postgres |
||
| 2 | |||
| 3 | import ( |
||
| 4 | "context" |
||
| 5 | "fmt" |
||
| 6 | "log/slog" // Structured logging |
||
| 7 | "strings" |
||
| 8 | "time" |
||
| 9 | |||
| 10 | "github.com/cenkalti/backoff/v4" |
||
| 11 | |||
| 12 | "github.com/exaring/otelpgx" |
||
| 13 | |||
| 14 | "github.com/jackc/pgx/v5" |
||
| 15 | |||
| 16 | "github.com/jackc/pgx/v5/pgxpool" |
||
| 17 | |||
| 18 | "github.com/Masterminds/squirrel" |
||
| 19 | ) |
||
| 20 | |||
| 21 | // Postgres - Structure for Postresql instance |
||
| 22 | type Postgres struct { |
||
| 23 | ReadPool *pgxpool.Pool |
||
| 24 | WritePool *pgxpool.Pool |
||
| 25 | |||
| 26 | Builder squirrel.StatementBuilderType |
||
| 27 | // options |
||
| 28 | // maxDataPerWrite sets the maximum amount of data per write operation to the database |
||
| 29 | maxDataPerWrite int |
||
| 30 | // maxRetries defines the maximum number of retries for database operations in case of failure |
||
| 31 | maxRetries int |
||
| 32 | // watchBufferSize specifies the buffer size for database watch operations, impacting how many changes can be queued |
||
| 33 | watchBufferSize int |
||
| 34 | // maxConnectionLifeTime determines the maximum lifetime of a connection in the pool |
||
| 35 | maxConnectionLifeTime time.Duration |
||
| 36 | // maxConnectionIdleTime determines the maximum time a connection can remain idle before being closed |
||
| 37 | maxConnectionIdleTime time.Duration |
||
| 38 | // maxConnections is the maximum number of connections in the pool (maps to pgxpool MaxConns) |
||
| 39 | maxConnections int |
||
| 40 | // maxIdleConnections is deprecated: Use MinConnections instead. Kept for backward compatibility (maps to MinConnections if MinConnections is not set). |
||
| 41 | maxIdleConnections int |
||
| 42 | // minConnections is the minimum number of connections in the pool (maps to pgxpool MinConns) |
||
| 43 | minConnections int |
||
| 44 | // minIdleConnections is the minimum number of idle connections in the pool (maps to pgxpool MinIdleConns) |
||
| 45 | minIdleConnections int |
||
| 46 | // healthCheckPeriod is the period between health checks on idle connections |
||
| 47 | healthCheckPeriod time.Duration |
||
| 48 | // maxConnectionLifetimeJitter is jitter added to MaxConnLifetime to prevent all connections from expiring at once |
||
| 49 | maxConnectionLifetimeJitter time.Duration |
||
| 50 | // connectTimeout is the maximum time to wait when establishing a new connection |
||
| 51 | connectTimeout time.Duration |
||
| 52 | } |
||
| 53 | |||
| 54 | // New - |
||
| 55 | func New(uri string, opts ...Option) (*Postgres, error) { |
||
| 56 | return newDB(uri, uri, opts...) |
||
| 57 | } |
||
| 58 | |||
| 59 | // NewWithSeparateURIs - |
||
| 60 | func NewWithSeparateURIs(writerUri, readerUri string, opts ...Option) (*Postgres, error) { |
||
| 61 | return newDB(writerUri, readerUri, opts...) |
||
| 62 | } |
||
| 63 | |||
| 64 | // new - Creates new postgresql db instance |
||
| 65 | func newDB(writerUri, readerUri string, opts ...Option) (*Postgres, error) { |
||
| 66 | pg := &Postgres{ |
||
| 67 | maxConnections: _defaultMaxConnections, |
||
| 68 | maxIdleConnections: _defaultMaxIdleConnections, |
||
| 69 | minConnections: _defaultMinConnections, |
||
| 70 | minIdleConnections: _defaultMinIdleConnections, |
||
| 71 | maxDataPerWrite: _defaultMaxDataPerWrite, |
||
| 72 | maxRetries: _defaultMaxRetries, |
||
| 73 | watchBufferSize: _defaultWatchBufferSize, |
||
| 74 | healthCheckPeriod: _defaultHealthCheckPeriod, |
||
| 75 | maxConnectionLifetimeJitter: _defaultMaxConnectionLifetimeJitter, |
||
| 76 | connectTimeout: _defaultConnectTimeout, |
||
| 77 | } |
||
| 78 | |||
| 79 | // Custom options |
||
| 80 | for _, opt := range opts { |
||
| 81 | opt(pg) |
||
| 82 | } |
||
| 83 | |||
| 84 | pg.Builder = squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar) |
||
| 85 | |||
| 86 | writeConfig, err := pgxpool.ParseConfig(writerUri) |
||
| 87 | if err != nil { |
||
| 88 | return nil, err |
||
| 89 | } |
||
| 90 | |||
| 91 | readConfig, err := pgxpool.ParseConfig(readerUri) |
||
| 92 | if err != nil { |
||
| 93 | return nil, err |
||
| 94 | } |
||
| 95 | |||
| 96 | // Set the default execution mode for queries using the write and read configurations. |
||
| 97 | setDefaultQueryExecMode(writeConfig.ConnConfig) |
||
| 98 | setDefaultQueryExecMode(readConfig.ConnConfig) |
||
| 99 | |||
| 100 | // Set the plan cache mode for both write and read configurations to optimize query planning. |
||
| 101 | setPlanCacheMode(writeConfig.ConnConfig) |
||
| 102 | setPlanCacheMode(readConfig.ConnConfig) |
||
| 103 | |||
| 104 | // Set the minimum number of connections in the pool for both write and read configurations. |
||
| 105 | // For backward compatibility: if MinConnections is not set (0) but MaxIdleConnections is set, use MaxIdleConnections (old behavior). |
||
| 106 | minConns := pg.minConnections |
||
| 107 | if minConns == 0 && pg.maxIdleConnections > 0 { |
||
| 108 | minConns = pg.maxIdleConnections |
||
| 109 | } |
||
| 110 | if minConns > 0 { |
||
| 111 | writeConfig.MinConns = int32(minConns) |
||
| 112 | readConfig.MinConns = int32(minConns) |
||
| 113 | } |
||
| 114 | |||
| 115 | // Set the minimum number of idle connections in the pool. |
||
| 116 | // Note: MinIdleConnections was not set in the old code, so we only set it if explicitly configured. |
||
| 117 | if pg.minIdleConnections > 0 { |
||
| 118 | writeConfig.MinIdleConns = int32(pg.minIdleConnections) |
||
| 119 | readConfig.MinIdleConns = int32(pg.minIdleConnections) |
||
| 120 | } |
||
| 121 | |||
| 122 | // Set the maximum number of connections in the pool for both write and read configurations. |
||
| 123 | // pgxpool default is 0 (unlimited), so only set if explicitly configured. |
||
| 124 | // Note: MaxOpenConnections is already mapped to MaxConnections via options.go, so no backward compatibility needed here. |
||
| 125 | if pg.maxConnections > 0 { |
||
| 126 | writeConfig.MaxConns = int32(pg.maxConnections) |
||
| 127 | readConfig.MaxConns = int32(pg.maxConnections) |
||
| 128 | } |
||
| 129 | |||
| 130 | // Set the maximum amount of time a connection may be idle before being closed for both configurations. |
||
| 131 | writeConfig.MaxConnIdleTime = pg.maxConnectionIdleTime |
||
| 132 | readConfig.MaxConnIdleTime = pg.maxConnectionIdleTime |
||
| 133 | |||
| 134 | // Set the maximum lifetime of a connection in the pool for both configurations. |
||
| 135 | writeConfig.MaxConnLifetime = pg.maxConnectionLifeTime |
||
| 136 | readConfig.MaxConnLifetime = pg.maxConnectionLifeTime |
||
| 137 | |||
| 138 | // Set a jitter to the maximum connection lifetime to prevent all connections from expiring at the same time. |
||
| 139 | if pg.maxConnectionLifetimeJitter > 0 { |
||
| 140 | writeConfig.MaxConnLifetimeJitter = pg.maxConnectionLifetimeJitter |
||
| 141 | readConfig.MaxConnLifetimeJitter = pg.maxConnectionLifetimeJitter |
||
| 142 | } else { |
||
| 143 | // Default to 20% of MaxConnLifetime if not explicitly set |
||
| 144 | writeConfig.MaxConnLifetimeJitter = time.Duration(0.2 * float64(pg.maxConnectionLifeTime)) |
||
| 145 | readConfig.MaxConnLifetimeJitter = time.Duration(0.2 * float64(pg.maxConnectionLifeTime)) |
||
| 146 | } |
||
| 147 | |||
| 148 | // Set the health check period for both configurations. |
||
| 149 | if pg.healthCheckPeriod > 0 { |
||
| 150 | writeConfig.HealthCheckPeriod = pg.healthCheckPeriod |
||
| 151 | readConfig.HealthCheckPeriod = pg.healthCheckPeriod |
||
| 152 | } |
||
| 153 | |||
| 154 | // Set the connect timeout for both configurations. |
||
| 155 | if pg.connectTimeout > 0 { |
||
| 156 | writeConfig.ConnConfig.ConnectTimeout = pg.connectTimeout |
||
| 157 | readConfig.ConnConfig.ConnectTimeout = pg.connectTimeout |
||
| 158 | } |
||
| 159 | |||
| 160 | writeConfig.ConnConfig.Tracer = otelpgx.NewTracer() |
||
| 161 | readConfig.ConnConfig.Tracer = otelpgx.NewTracer() |
||
| 162 | |||
| 163 | // Create connection pools for both writing and reading operations using the configured settings. |
||
| 164 | pg.WritePool, pg.ReadPool, err = createPools( |
||
| 165 | context.Background(), // Context used to control the lifecycle of the pools. |
||
| 166 | writeConfig, // Configuration settings for the write pool. |
||
| 167 | readConfig, // Configuration settings for the read pool. |
||
| 168 | ) |
||
| 169 | // Handle errors during the creation of the connection pools. |
||
| 170 | if err != nil { |
||
| 171 | return nil, err |
||
| 172 | } |
||
| 173 | |||
| 174 | return pg, nil |
||
| 175 | } |
||
| 176 | |||
| 177 | func (p *Postgres) GetMaxDataPerWrite() int { |
||
| 178 | return p.maxDataPerWrite |
||
| 179 | } |
||
| 180 | |||
| 181 | func (p *Postgres) GetMaxRetries() int { |
||
| 182 | return p.maxRetries |
||
| 183 | } |
||
| 184 | |||
| 185 | func (p *Postgres) GetWatchBufferSize() int { |
||
| 186 | return p.watchBufferSize |
||
| 187 | } |
||
| 188 | |||
| 189 | // GetEngineType - Get the engine type which is postgresql in string |
||
| 190 | func (p *Postgres) GetEngineType() string { |
||
| 191 | return "postgres" |
||
| 192 | } |
||
| 193 | |||
| 194 | // Close - Close postgresql instance |
||
| 195 | func (p *Postgres) Close() error { |
||
| 196 | p.ReadPool.Close() |
||
| 197 | p.WritePool.Close() |
||
| 198 | return nil |
||
| 199 | } |
||
| 200 | |||
| 201 | // IsReady - Check if database is ready |
||
| 202 | func (p *Postgres) IsReady(ctx context.Context) (bool, error) { |
||
| 203 | ctx, cancel := context.WithTimeout(ctx, 2*time.Second) |
||
| 204 | defer cancel() |
||
| 205 | if err := p.ReadPool.Ping(ctx); err != nil { |
||
| 206 | return false, err |
||
| 207 | } |
||
| 208 | return true, nil |
||
| 209 | } |
||
| 210 | |||
| 211 | var queryExecModes = map[string]pgx.QueryExecMode{ |
||
| 212 | "cache_statement": pgx.QueryExecModeCacheStatement, |
||
| 213 | "cache_describe": pgx.QueryExecModeCacheDescribe, |
||
| 214 | "describe_exec": pgx.QueryExecModeDescribeExec, |
||
| 215 | "mode_exec": pgx.QueryExecModeExec, |
||
| 216 | "simple_protocol": pgx.QueryExecModeSimpleProtocol, |
||
| 217 | } |
||
| 218 | |||
| 219 | func setDefaultQueryExecMode(config *pgx.ConnConfig) { |
||
| 220 | // Default mode if no specific mode is found in the connection string |
||
| 221 | defaultMode := "cache_statement" |
||
| 222 | |||
| 223 | // Iterate through the map keys to check if any are mentioned in the connection string |
||
| 224 | for key := range queryExecModes { |
||
| 225 | if strings.Contains(config.ConnString(), "default_query_exec_mode="+key) { |
||
| 226 | config.DefaultQueryExecMode = queryExecModes[key] |
||
| 227 | slog.Info("setDefaultQueryExecMode", slog.String("mode", key)) |
||
| 228 | return |
||
| 229 | } |
||
| 230 | } |
||
| 231 | |||
| 232 | // Set to default mode if no matching mode is found |
||
| 233 | config.DefaultQueryExecMode = queryExecModes[defaultMode] |
||
| 234 | slog.Warn("setDefaultQueryExecMode", slog.String("mode", defaultMode)) |
||
| 235 | } |
||
| 236 | |||
| 237 | var planCacheModes = map[string]string{ |
||
| 238 | "auto": "auto", |
||
| 239 | "force_custom_plan": "force_custom_plan", |
||
| 240 | "disable": "disable", |
||
| 241 | } |
||
| 242 | |||
| 243 | func setPlanCacheMode(config *pgx.ConnConfig) { |
||
| 244 | // Default plan cache mode |
||
| 245 | const defaultMode = "auto" |
||
| 246 | |||
| 247 | // Extract connection string |
||
| 248 | connStr := config.ConnString() |
||
| 249 | planCacheMode := defaultMode |
||
| 250 | |||
| 251 | // Check for specific plan cache modes in the connection string |
||
| 252 | for key, value := range planCacheModes { |
||
| 253 | if strings.Contains(connStr, "plan_cache_mode="+key) { |
||
| 254 | if key == "disable" { |
||
| 255 | delete(config.RuntimeParams, "plan_cache_mode") |
||
| 256 | slog.Info("setPlanCacheMode", slog.String("mode", "disabled")) |
||
| 257 | return |
||
| 258 | } |
||
| 259 | planCacheMode = value |
||
| 260 | slog.Info("setPlanCacheMode", slog.String("mode", key)) |
||
| 261 | break |
||
| 262 | } |
||
| 263 | } |
||
| 264 | |||
| 265 | // Set the plan cache mode |
||
| 266 | config.RuntimeParams["plan_cache_mode"] = planCacheMode |
||
| 267 | if planCacheMode == defaultMode { |
||
| 268 | slog.Warn("setPlanCacheMode", slog.String("mode", defaultMode)) |
||
| 269 | } |
||
| 270 | } |
||
| 271 | |||
| 272 | // createPools initializes read and write connection pools with appropriate configurations and error handling. |
||
| 273 | func createPools(ctx context.Context, wConfig, rConfig *pgxpool.Config) (*pgxpool.Pool, *pgxpool.Pool, error) { |
||
| 274 | // Context with timeout for creating the pools |
||
| 275 | initCtx, cancel := context.WithTimeout(ctx, 10*time.Second) |
||
| 276 | defer cancel() |
||
| 277 | |||
| 278 | // Create write pool |
||
| 279 | writePool, err := pgxpool.NewWithConfig(initCtx, wConfig) |
||
| 280 | if err != nil { |
||
| 281 | return nil, nil, fmt.Errorf("failed to create write pool: %w", err) |
||
|
0 ignored issues
–
show
introduced
by
Loading history...
|
|||
| 282 | } |
||
| 283 | |||
| 284 | // Create read pool using the same configuration |
||
| 285 | readPool, err := pgxpool.NewWithConfig(initCtx, rConfig) |
||
| 286 | if err != nil { |
||
| 287 | writePool.Close() // Ensure write pool is closed on failure |
||
| 288 | return nil, nil, fmt.Errorf("failed to create read pool: %w", err) |
||
|
0 ignored issues
–
show
|
|||
| 289 | } |
||
| 290 | |||
| 291 | // Set up retry policy for pinging pools |
||
| 292 | retryPolicy := backoff.NewExponentialBackOff() |
||
| 293 | retryPolicy.MaxElapsedTime = 1 * time.Minute |
||
| 294 | |||
| 295 | // Attempt to ping both pools to confirm connectivity |
||
| 296 | err = backoff.Retry(func() error { |
||
| 297 | pingCtx, pingCancel := context.WithTimeout(context.Background(), 2*time.Second) |
||
| 298 | defer pingCancel() |
||
| 299 | |||
| 300 | if err := writePool.Ping(pingCtx); err != nil { |
||
| 301 | return fmt.Errorf("write pool ping failed: %w", err) |
||
|
0 ignored issues
–
show
|
|||
| 302 | } |
||
| 303 | if err := readPool.Ping(pingCtx); err != nil { |
||
| 304 | return fmt.Errorf("read pool ping failed: %w", err) |
||
|
0 ignored issues
–
show
|
|||
| 305 | } |
||
| 306 | return nil |
||
| 307 | }, retryPolicy) |
||
| 308 | // Handle errors from pinging |
||
| 309 | if err != nil { |
||
| 310 | writePool.Close() |
||
| 311 | readPool.Close() |
||
| 312 | return nil, nil, fmt.Errorf("pinging pools failed: %w", err) |
||
|
0 ignored issues
–
show
|
|||
| 313 | } |
||
| 314 | |||
| 315 | return writePool, readPool, nil |
||
| 316 | } |
||
| 317 |