Issues (49)

pkg/database/postgres/repair.go (7 issues)

Severity
1
package postgres
2
3
import (
4
	"context"
5
	"fmt"
6
	"log/slog"
7
)
8
9
const (
	// ActiveRecordTxnID is the sentinel transaction ID used for active records
	// instead of 0, to avoid XID wraparound issues. Its value is the largest
	// signed 64-bit integer (9223372036854775807).
	ActiveRecordTxnID = uint64(9223372036854775807)

	// MaxXID8Value is the same sentinel value written as a PostgreSQL xid8
	// literal, ready to be embedded in SQL text.
	MaxXID8Value = "'9223372036854775807'::xid8"
)
15
16
// RepairConfig holds configuration for the XID counter repair operation.
type RepairConfig struct {
	// BatchSize is the number of transaction IDs consumed per batch while
	// advancing the counter. Non-positive values are replaced by 1000.
	BatchSize int

	// MaxRetries is the maximum number of retries.
	// NOTE(review): not read by the repair code visible in this file.
	MaxRetries int

	// RetryDelay is the delay between retries, in milliseconds.
	// NOTE(review): not read by the repair code visible in this file.
	RetryDelay int

	// DryRun reports what would be done without advancing the counter.
	DryRun bool

	// Verbose enables additional progress logging.
	Verbose bool
}
24
25
// DefaultRepairConfig returns default configuration for XID counter repair
26
func DefaultRepairConfig() *RepairConfig {
27
	return &RepairConfig{
28
		BatchSize:  1000, // default batch size for XID advancement
29
		MaxRetries: 3,
30
		RetryDelay: 100,
31
		DryRun:     false,
32
		Verbose:    true,
33
	}
34
}
35
36
// RepairResult holds the results of the XID counter repair operation.
type RepairResult struct {
	// CreatedTxIdFixed is the number of transaction IDs the counter was
	// advanced by. In a dry run it is the number that would have been consumed.
	CreatedTxIdFixed int

	// Errors collects non-fatal failures encountered during the repair.
	Errors []error

	// Duration is a textual duration for the operation.
	// NOTE(review): not populated by the repair code visible in this file.
	Duration string
}
42
43
// Repair performs XID counter repair to prevent XID wraparound issues
44
// This function uses a safe approach: only advance XID counter, don't modify existing data
45
func (p *Postgres) Repair(ctx context.Context, config *RepairConfig) (*RepairResult, error) {
46
	if config == nil {
47
		config = DefaultRepairConfig()
48
	}
49
50
	// Validate BatchSize - don't accept 0 or negative values
51
	if config.BatchSize <= 0 {
52
		config.BatchSize = 1000 // Use default value
53
		slog.InfoContext(ctx, "Invalid BatchSize provided, using default", slog.Int("default_batch_size", 1000))
54
	}
55
56
	result := &RepairResult{
57
		Errors: make([]error, 0),
58
	}
59
60
	slog.InfoContext(ctx, "Starting PostgreSQL transaction ID counter repair",
61
		slog.Bool("dry_run", config.DryRun),
62
		slog.Int("batch_size", config.BatchSize))
63
64
	// Step 1: Get current PostgreSQL XID
65
	currentXID, err := p.getCurrentPostgreXID(ctx)
66
	if err != nil {
67
		return result, fmt.Errorf("failed to get current PostgreSQL XID: %w", err)
0 ignored issues
show
unrecognized printf verb 'w'
Loading history...
68
	}
69
70
	if config.Verbose {
71
		slog.InfoContext(ctx, "Current PostgreSQL transaction ID", slog.Uint64("current_xid", currentXID))
72
	}
73
74
	// Step 2: Get maximum referenced XID from transactions table
75
	maxReferencedXID, err := p.getMaxReferencedXID(ctx)
76
	if err != nil {
77
		result.Errors = append(result.Errors, fmt.Errorf("failed to get max referenced XID: %w", err))
0 ignored issues
show
unrecognized printf verb 'w'
Loading history...
78
		return result, nil
79
	}
80
81
	if config.Verbose {
82
		slog.InfoContext(ctx, "Maximum referenced transaction ID", slog.Uint64("max_referenced_xid", maxReferencedXID))
83
	}
84
85
	// Step 3: Advance XID counter if needed
86
	if maxReferencedXID > currentXID {
87
		counterDelta := int(maxReferencedXID - currentXID + 1000) // Add safety buffer
88
89
		if config.DryRun {
90
			slog.InfoContext(ctx, "Would advance XID counter",
91
				slog.Int("counter_delta", counterDelta),
92
				slog.Uint64("target_xid", maxReferencedXID+1000))
93
			result.CreatedTxIdFixed = counterDelta
94
		} else {
95
			if err := p.advanceXIDCounterByDelta(ctx, counterDelta, config); err != nil {
96
				result.Errors = append(result.Errors, fmt.Errorf("failed to advance XID counter: %w", err))
0 ignored issues
show
unrecognized printf verb 'w'
Loading history...
97
			} else {
98
				result.CreatedTxIdFixed = counterDelta
99
			}
100
		}
101
	} else {
102
		slog.InfoContext(ctx, "No XID counter advancement needed")
103
	}
104
105
	slog.InfoContext(ctx, "PostgreSQL transaction ID counter repair completed",
106
		slog.Int("transactions_advanced", result.CreatedTxIdFixed),
107
		slog.Int("errors", len(result.Errors)))
108
109
	return result, nil
110
}
111
112
// getCurrentPostgreSQLXID gets the current PostgreSQL transaction ID
113
func (p *Postgres) getCurrentPostgreXID(ctx context.Context) (uint64, error) {
114
	var x int64
115
	query := "SELECT COALESCE(pg_current_xact_id()::text, '0')::bigint"
116
	if err := p.ReadPool.QueryRow(ctx, query).Scan(&x); err != nil {
117
		return 0, err
118
	}
119
	return uint64(x), nil
120
}
121
122
// getMaxReferencedXID gets the maximum transaction ID referenced in the transactions table
123
func (p *Postgres) getMaxReferencedXID(ctx context.Context) (uint64, error) {
124
	query := "SELECT COALESCE(MAX(id)::text, '0')::bigint FROM transactions"
125
	var x int64
126
	if err := p.ReadPool.QueryRow(ctx, query).Scan(&x); err != nil {
127
		return 0, err
128
	}
129
	return uint64(x), nil
130
}
131
132
// advanceXIDCounterByDelta advances the PostgreSQL XID counter by specified delta
133
func (p *Postgres) advanceXIDCounterByDelta(ctx context.Context, counterDelta int, config *RepairConfig) error {
134
	if counterDelta <= 0 {
135
		return nil
136
	}
137
	slog.InfoContext(ctx, "Advancing transaction ID counter by delta", slog.Int("counter_delta", counterDelta))
138
139
	batchSize := config.BatchSize
140
	if batchSize <= 0 {
141
		batchSize = 1000
142
	}
143
144
	conn, err := p.WritePool.Acquire(ctx)
145
	if err != nil {
146
		return fmt.Errorf("acquire connection: %w", err)
0 ignored issues
show
unrecognized printf verb 'w'
Loading history...
147
	}
148
	defer conn.Release()
149
150
	remaining := counterDelta
151
	for remaining > 0 {
152
		if err := ctx.Err(); err != nil {
153
			return err
154
		}
155
		currentBatch := min(remaining, batchSize)
156
157
		for i := 0; i < currentBatch; i++ {
158
			if err := ctx.Err(); err != nil {
159
				return err
160
			}
161
			tx, err := conn.Begin(ctx)
162
			if err != nil {
163
				return fmt.Errorf("begin tx: %w", err)
0 ignored issues
show
unrecognized printf verb 'w'
Loading history...
164
			}
165
			if _, err := tx.Exec(ctx, "SELECT pg_current_xact_id()"); err != nil {
166
				_ = tx.Rollback(ctx)
167
				return fmt.Errorf("advance xid (iter %d): %w", i+1, err)
0 ignored issues
show
unrecognized printf verb 'w'
Loading history...
168
			}
169
			// Rolling back is fine — XID assignment happens on first reference.
170
			if err := tx.Rollback(ctx); err != nil {
171
				return fmt.Errorf("rollback tx: %w", err)
0 ignored issues
show
unrecognized printf verb 'w'
Loading history...
172
			}
173
		}
174
175
		remaining -= currentBatch
176
		if config.Verbose {
177
			slog.InfoContext(ctx, "Advanced XID counter batch",
178
				slog.Int("batch_size", currentBatch),
179
				slog.Int("remaining", remaining))
180
		}
181
	}
182
	slog.InfoContext(ctx, "Transaction ID counter advancement completed", slog.Int("total_advanced", counterDelta))
183
	return nil
184
}
185