Passed
Push — master ( 7c2ecd...a3f554 )
by Tolga
01:25 queued 30s
created

postgres.setDefaultQueryExecMode   A

Complexity

Conditions 3

Size

Total Lines 14
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 7
nop 1
dl 0
loc 14
rs 10
c 0
b 0
f 0
1
package postgres
2
3
import (
4
	"context"
5
	"strings"
6
	"time"
7
8
	"github.com/jackc/pgx/v5"
9
10
	"github.com/jackc/pgx/v5/pgxpool"
11
12
	"github.com/Masterminds/squirrel"
13
)
14
15
// Postgres - Structure for Postresql instance
16
type Postgres struct {
17
	ReadPool  *pgxpool.Pool
18
	WritePool *pgxpool.Pool
19
20
	Builder squirrel.StatementBuilderType
21
	// options
22
	maxDataPerWrite       int
23
	maxRetries            int
24
	watchBufferSize       int
25
	maxConnectionLifeTime time.Duration
26
	maxConnectionIdleTime time.Duration
27
	maxOpenConnections    int
28
	maxIdleConnections    int
29
}
30
31
// New - Creates new postgresql db instance
32
func New(uri string, opts ...Option) (*Postgres, error) {
33
	pg := &Postgres{
34
		maxOpenConnections: _defaultMaxOpenConnections,
35
		maxIdleConnections: _defaultMaxIdleConnections,
36
		maxDataPerWrite:    _defaultMaxDataPerWrite,
37
		maxRetries:         _defaultMaxRetries,
38
		watchBufferSize:    _defaultWatchBufferSize,
39
	}
40
41
	// Custom options
42
	for _, opt := range opts {
43
		opt(pg)
44
	}
45
46
	pg.Builder = squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar)
47
48
	writeConfig, err := pgxpool.ParseConfig(uri)
49
	if err != nil {
50
		return nil, err
51
	}
52
53
	readConfig, err := pgxpool.ParseConfig(uri)
54
	if err != nil {
55
		return nil, err
56
	}
57
58
	setDefaultQueryExecMode(writeConfig.ConnConfig)
59
	setDefaultQueryExecMode(readConfig.ConnConfig)
60
61
	writeConfig.MinConns = int32(pg.maxIdleConnections)
62
	readConfig.MinConns = int32(pg.maxIdleConnections)
63
64
	writeConfig.MaxConns = int32(pg.maxOpenConnections)
65
	readConfig.MaxConns = int32(pg.maxOpenConnections)
66
67
	writeConfig.MaxConnIdleTime = pg.maxConnectionIdleTime
68
	readConfig.MaxConnIdleTime = pg.maxConnectionIdleTime
69
70
	writeConfig.MaxConnLifetime = pg.maxConnectionLifeTime
71
	readConfig.MaxConnLifetime = pg.maxConnectionLifeTime
72
73
	writeConfig.MaxConnLifetimeJitter = time.Duration(0.2 * float64(pg.maxConnectionLifeTime))
74
	readConfig.MaxConnLifetimeJitter = time.Duration(0.2 * float64(pg.maxConnectionLifeTime))
75
76
	initialContext, cancelInit := context.WithTimeout(context.Background(), 5*time.Second)
77
	defer cancelInit()
78
79
	pg.WritePool, err = pgxpool.NewWithConfig(initialContext, writeConfig)
80
	if err != nil {
81
		return nil, err
82
	}
83
	pg.ReadPool, err = pgxpool.NewWithConfig(initialContext, readConfig)
84
	if err != nil {
85
		return nil, err
86
	}
87
88
	return pg, nil
89
}
90
91
// GetMaxDataPerWrite - Returns the maxDataPerWrite option of this instance
func (p *Postgres) GetMaxDataPerWrite() int {
	limit := p.maxDataPerWrite
	return limit
}
94
95
// GetMaxRetries - Returns the maxRetries option of this instance
func (p *Postgres) GetMaxRetries() int {
	retries := p.maxRetries
	return retries
}
98
99
// GetWatchBufferSize - Returns the watchBufferSize option of this instance
func (p *Postgres) GetWatchBufferSize() int {
	size := p.watchBufferSize
	return size
}
102
103
// GetEngineType - Get the engine type which is postgresql in string
104
func (p *Postgres) GetEngineType() string {
105
	return "postgres"
106
}
107
108
// Close - Close postgresql instance
109
func (p *Postgres) Close() error {
110
	p.ReadPool.Close()
111
	p.WritePool.Close()
112
	return nil
113
}
114
115
// IsReady - Check if database is ready
116
func (p *Postgres) IsReady(ctx context.Context) (bool, error) {
117
	ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
118
	defer cancel()
119
	if err := p.ReadPool.Ping(ctx); err != nil {
120
		return false, err
121
	}
122
	return true, nil
123
}
124
125
var queryExecModes = map[string]pgx.QueryExecMode{
126
	"cache_statement": pgx.QueryExecModeCacheStatement,
127
	"cache_describe":  pgx.QueryExecModeCacheDescribe,
128
	"describe_exec":   pgx.QueryExecModeDescribeExec,
129
	"mode_exec":       pgx.QueryExecModeExec,
130
	"simple_protocol": pgx.QueryExecModeSimpleProtocol,
131
}
132
133
// setDefaultQueryExecMode sets config.DefaultQueryExecMode from the
// default_query_exec_mode parameter of the connection string the config was
// created from.
//
// The parameter value is extracted once and looked up in queryExecModes, so
// the result does not depend on map iteration order. The value "exec" (the
// spelling documented by pgx) is also accepted and selects
// pgx.QueryExecModeExec. When the parameter is absent or its value is not
// recognised, the mode falls back to "cache_statement".
func setDefaultQueryExecMode(config *pgx.ConnConfig) {
	const (
		param       = "default_query_exec_mode="
		defaultMode = "cache_statement"
	)

	// Default mode if no specific mode is found in the connection string
	mode := queryExecModes[defaultMode]

	connString := config.ConnString()
	if idx := strings.Index(connString, param); idx >= 0 {
		value := connString[idx+len(param):]
		// The value ends at the next URL parameter ('&') or, for keyword/value
		// connection strings, at the next whitespace.
		if end := strings.IndexAny(value, "& \t\r\n"); end >= 0 {
			value = value[:end]
		}
		// Keyword/value connection strings may quote the value.
		value = strings.Trim(value, `'"`)

		if found, ok := queryExecModes[value]; ok {
			mode = found
		} else if value == "exec" {
			mode = pgx.QueryExecModeExec
		}
	}

	config.DefaultQueryExecMode = mode
}
148