Passed
Pull Request — master (#2447)
by Tolga
05:40 queued 02:21
created

pkg/database/postgres/repair.go   A

Size/Duplication

Total Lines 182
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
cc 21
eloc 102
dl 0
loc 182
rs 10
c 0
b 0
f 0

6 Methods

Rating   Name   Duplication   Size   Complexity  
A postgres.*Postgres.getCurrentPostgreSQLXID 0 5 1
A postgres.*Postgres.getMaxReferencedXID 0 6 1
B postgres.*Postgres.advanceXIDCounterByDelta 0 41 7
C postgres.*Postgres.Repair 0 65 10
A postgres.DefaultRepairConfig 0 7 1
A postgres.queryLoopXactID 0 7 1
1
package postgres
2
3
import (
4
	"context"
5
	"fmt"
6
	"log/slog"
7
)
8
9
// ActiveRecordTxnID is the XID8 value stored for records that are still
// active. The largest signed 64-bit integer is used instead of 0 to avoid
// XID wraparound issues.
const ActiveRecordTxnID uint64 = 1<<63 - 1

// MaxXID8Value is the SQL literal for the same number as ActiveRecordTxnID,
// cast to the xid8 type, for embedding directly in queries.
const MaxXID8Value = "'9223372036854775807'::xid8"
15
16
// RepairConfig controls how the XID counter repair operation runs.
type RepairConfig struct {
	// BatchSize is the number of XIDs consumed per server-side loop
	// (one round trip per batch).
	BatchSize int
	// MaxRetries is the maximum number of retries.
	// NOTE(review): not read by the repair code in this file.
	MaxRetries int
	// RetryDelay is the delay between retries, in milliseconds.
	// NOTE(review): not read by the repair code in this file.
	RetryDelay int
	// DryRun logs and reports the planned advancement without executing it.
	DryRun bool
	// Verbose enables additional progress logging.
	Verbose bool
}

// DefaultRepairConfig returns a newly allocated RepairConfig holding the
// default settings: batches of 1000 XIDs, 3 retries 100 ms apart, a real
// (non-dry) run, and verbose logging.
func DefaultRepairConfig() *RepairConfig {
	cfg := new(RepairConfig)
	cfg.BatchSize = 1000
	cfg.MaxRetries = 3
	cfg.RetryDelay = 100
	cfg.Verbose = true
	// DryRun intentionally stays at its zero value (false).
	return cfg
}
35
36
// RepairResult summarizes the outcome of an XID counter repair.
type RepairResult struct {
	// CreatedTxIdFixed is the number of XIDs the counter was advanced by
	// (or, in a dry run, would have been advanced by).
	CreatedTxIdFixed int
	// Errors collects errors that did not abort the repair outright.
	Errors []error
	// Duration is a textual elapsed-time field.
	// NOTE(review): Repair in this file never populates it.
	Duration string
}
42
43
// Repair performs XID counter repair to prevent XID wraparound issues
44
// This function uses a safe approach: only advance XID counter, don't modify existing data
45
func (p *Postgres) Repair(ctx context.Context, config *RepairConfig) (*RepairResult, error) {
46
	if config == nil {
47
		config = DefaultRepairConfig()
48
	}
49
50
	// Validate BatchSize - don't accept 0 or negative values
51
	if config.BatchSize <= 0 {
52
		config.BatchSize = 1000 // Use default value
53
		slog.InfoContext(ctx, "Invalid BatchSize provided, using default", slog.Int("default_batch_size", 1000))
54
	}
55
56
	result := &RepairResult{
57
		Errors: make([]error, 0),
58
	}
59
60
	slog.InfoContext(ctx, "Starting PostgreSQL transaction ID counter repair",
61
		slog.Bool("dry_run", config.DryRun),
62
		slog.Int("batch_size", config.BatchSize))
63
64
	// Step 1: Get current PostgreSQL XID
65
	currentXID, err := p.getCurrentPostgreSQLXID(ctx)
66
	if err != nil {
67
		return result, fmt.Errorf("failed to get current PostgreSQL XID: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
68
	}
69
70
	if config.Verbose {
71
		slog.InfoContext(ctx, "Current PostgreSQL transaction ID", slog.Uint64("current_xid", currentXID))
72
	}
73
74
	// Step 2: Get maximum referenced XID from transactions table
75
	maxReferencedXID, err := p.getMaxReferencedXID(ctx)
76
	if err != nil {
77
		result.Errors = append(result.Errors, fmt.Errorf("failed to get max referenced XID: %w", err))
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
78
		return result, nil
79
	}
80
81
	if config.Verbose {
82
		slog.InfoContext(ctx, "Maximum referenced transaction ID", slog.Uint64("max_referenced_xid", maxReferencedXID))
83
	}
84
85
	// Step 3: Advance XID counter if needed
86
	if maxReferencedXID > currentXID {
87
		counterDelta := int(maxReferencedXID - currentXID + 1000) // Add safety buffer
88
89
		if config.DryRun {
90
			slog.InfoContext(ctx, "Would advance XID counter",
91
				slog.Int("counter_delta", counterDelta),
92
				slog.Uint64("target_xid", maxReferencedXID+1000))
93
			result.CreatedTxIdFixed = counterDelta
94
		} else {
95
			if err := p.advanceXIDCounterByDelta(ctx, counterDelta, config); err != nil {
96
				result.Errors = append(result.Errors, fmt.Errorf("failed to advance XID counter: %w", err))
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
97
			} else {
98
				result.CreatedTxIdFixed = counterDelta
99
			}
100
		}
101
	} else {
102
		slog.InfoContext(ctx, "No XID counter advancement needed")
103
	}
104
105
	slog.InfoContext(ctx, "PostgreSQL transaction ID counter repair completed",
106
		slog.Int("transactions_advanced", result.CreatedTxIdFixed),
107
		slog.Int("errors", len(result.Errors)))
108
109
	return result, nil
110
}
111
112
113
// getCurrentPostgreSQLXID gets the current PostgreSQL transaction ID
114
func (p *Postgres) getCurrentPostgreSQLXID(ctx context.Context) (uint64, error) {
115
	var currentXID uint64
116
	query := "SELECT pg_current_xact_id()::text::integer"
117
	err := p.ReadPool.QueryRow(ctx, query).Scan(&currentXID)
118
	return currentXID, err
119
}
120
121
// getMaxReferencedXID gets the maximum transaction ID referenced in the transactions table
122
func (p *Postgres) getMaxReferencedXID(ctx context.Context) (uint64, error) {
123
	query := "SELECT MAX(id)::text::integer FROM transactions"
124
125
	var maxReferencedXID uint64
126
	err := p.ReadPool.QueryRow(ctx, query).Scan(&maxReferencedXID)
127
	return maxReferencedXID, err
128
}
129
130
// queryLoopXactID builds an anonymous PL/pgSQL (DO) block that runs
// batchSize iterations, each calling pg_current_xact_id() and then
// ROLLBACK, so that every iteration consumes a transaction ID on the server.
// The only variable part of the SQL is the loop bound.
func queryLoopXactID(batchSize int) string {
	const (
		prologue = "DO $$\nBEGIN\n"
		epilogue = "    PERFORM pg_current_xact_id(); ROLLBACK;\n" +
			"  END LOOP;\n" +
			"END $$;"
	)
	loopHead := fmt.Sprintf("  FOR i IN 1..%d LOOP\n", batchSize)
	return prologue + loopHead + epilogue
}
140
141
// advanceXIDCounterByDelta advances the PostgreSQL XID counter by specified delta
142
func (p *Postgres) advanceXIDCounterByDelta(ctx context.Context, counterDelta int, config *RepairConfig) error {
143
	if counterDelta <= 0 {
144
		return nil
145
	}
146
147
	slog.InfoContext(ctx, "Advancing transaction ID counter by delta",
148
		slog.Int("counter_delta", counterDelta))
149
150
	// Use batch size for XID advancement
151
	batchSize := config.BatchSize
152
	if batchSize <= 0 {
153
		batchSize = 1000 // Default batch size if not specified or invalid
154
	}
155
156
	// Process in batches for performance
157
	remaining := counterDelta
158
	for remaining > 0 {
159
		currentBatchSize := remaining
160
		if currentBatchSize > batchSize {
161
			currentBatchSize = batchSize
162
		}
163
164
		query := queryLoopXactID(currentBatchSize)
165
		_, err := p.WritePool.Exec(ctx, query)
166
		if err != nil {
167
			return fmt.Errorf("failed to advance XID counter (batch %d): %w", currentBatchSize, err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
168
		}
169
170
		remaining -= currentBatchSize
171
172
		if config.Verbose {
173
			slog.InfoContext(ctx, "Advanced XID counter batch",
174
				slog.Int("batch_size", currentBatchSize),
175
				slog.Int("remaining", remaining))
176
		}
177
	}
178
179
	slog.InfoContext(ctx, "Transaction ID counter advancement completed",
180
		slog.Int("total_advanced", counterDelta))
181
182
	return nil
183
}
184