Passed
Push — master ( a74778...1f869b )
by Tolga
02:36 queued 13s
created

internal/storage/postgres/utils/common.go   A

Size/Duplication

Total Lines 177
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
cc 19
eloc 76
dl 0
loc 177
rs 10
c 0
b 0
f 0

8 Methods

Rating   Name   Duplication   Size   Complexity  
B utils.IsContextRelatedError 0 10 6
A utils.GenerateGCQuery 0 15 1
A utils.IsSerializationRelatedError 0 6 3
A utils.snapshotQuery 0 25 1
A utils.BulkEntityFilterQuery 0 3 1
A utils.WaitWithBackoff 0 8 3
A utils.SnapshotQuery 0 25 1
A utils.HandleError 0 19 3
1
package utils
2
3
import (
4
	"context"
5
	"errors"
6
	"fmt"
7
	"log/slog"
8
	"math"
9
	"strings"
10
	"time"
11
12
	"go.opentelemetry.io/otel/codes"
13
	"golang.org/x/exp/rand"
14
15
	"go.opentelemetry.io/otel/trace"
16
17
	"github.com/Masterminds/squirrel"
18
19
	base "github.com/Permify/permify/pkg/pb/base/v1"
20
)
21
22
// BulkEntityFilterTemplate is a fmt-style SQL template that selects distinct
// (id, entity_id) pairs from the union of the relation_tuples and attributes
// tables.
//
// It takes four %s arguments, in order:
//  1. the tenant id (wrapped in single quotes by the template),
//  2. the entity type (wrapped in single quotes by the template),
//  3. a raw SQL condition inserted verbatim after AND,
//  4. a second raw SQL condition inserted verbatim after AND.
//
// The template does no escaping of its own; callers are responsible for
// passing values that are safe inside a quoted SQL literal.
const BulkEntityFilterTemplate = `
    WITH entities AS (
        (SELECT id, entity_id, entity_type, tenant_id, created_tx_id, expired_tx_id FROM relation_tuples)
        UNION ALL
        (SELECT id, entity_id, entity_type, tenant_id, created_tx_id, expired_tx_id FROM attributes)
    ), filtered_entities AS (
        SELECT DISTINCT ON (entity_id) id, entity_id
        FROM entities
        WHERE tenant_id = '%s'
        AND entity_type = '%s'
        AND %s
        AND %s
    )
    SELECT id, entity_id
    FROM filtered_entities`
39
40
// SnapshotQuery adds conditions to a SELECT query for checking transaction visibility based on created and expired transaction IDs.
41
// The query checks if transactions are visible in a snapshot associated with the provided value.
42
func SnapshotQuery(sl squirrel.SelectBuilder, value uint64) squirrel.SelectBuilder {
43
	// Convert the value to a string once to reduce redundant calls to fmt.Sprintf.
44
	valStr := fmt.Sprintf("'%v'::xid8", value)
45
46
	// Create a subquery for the snapshot associated with the provided value.
47
	snapshotQuery := fmt.Sprintf("(select snapshot from transactions where id = %s)", valStr)
48
49
	// Create an expression to check if a transaction with a specific created_tx_id is visible in the snapshot.
50
	visibilityExpr := squirrel.Expr(fmt.Sprintf("pg_visible_in_snapshot(created_tx_id, %s) = true", snapshotQuery))
51
	// Create an expression to check if the created_tx_id is equal to the provided value.
52
	createdExpr := squirrel.Expr(fmt.Sprintf("created_tx_id = %s", valStr))
53
	// Use OR condition for the created expressions.
54
	createdWhere := squirrel.Or{visibilityExpr, createdExpr}
55
56
	// Create an expression to check if a transaction with a specific expired_tx_id is not visible in the snapshot.
57
	expiredVisibilityExpr := squirrel.Expr(fmt.Sprintf("pg_visible_in_snapshot(expired_tx_id, %s) = false", snapshotQuery))
58
	// Create an expression to check if the expired_tx_id is equal to zero.
59
	expiredZeroExpr := squirrel.Expr("expired_tx_id = '0'::xid8")
60
	// Create an expression to check if the expired_tx_id is not equal to the provided value.
61
	expiredNotExpr := squirrel.Expr(fmt.Sprintf("expired_tx_id <> %s", valStr))
62
	// Use AND condition for the expired expressions, checking both visibility and non-equality with value.
63
	expiredWhere := squirrel.And{squirrel.Or{expiredVisibilityExpr, expiredZeroExpr}, expiredNotExpr}
64
65
	// Add the created and expired conditions to the SELECT query.
66
	return sl.Where(createdWhere).Where(expiredWhere)
67
}
68
69
// snapshotQuery returns two parenthesised raw SQL predicates for the
// transaction id value, intended to be AND-ed into a WHERE clause.
//
// The first result matches rows whose created_tx_id is visible in the snapshot
// stored for value in the transactions table, or equals value.
//
// The second result matches rows whose expired_tx_id is either not visible in
// that snapshot or zero, and is also different from value.
func snapshotQuery(value uint64) (string, string) {
	// Quoted xid8 literal for the transaction id, shared by every fragment.
	xid := fmt.Sprintf("'%d'::xid8", value)
	// Scalar subquery that yields the snapshot recorded for that transaction.
	snapshot := "(SELECT snapshot FROM transactions WHERE id = " + xid + ")"

	created := "(" +
		"pg_visible_in_snapshot(created_tx_id, " + snapshot + ") = true" +
		" OR " +
		"created_tx_id = " + xid +
		")"

	expired := "(" +
		"(pg_visible_in_snapshot(expired_tx_id, " + snapshot + ") = false OR expired_tx_id = '0'::xid8)" +
		" AND " +
		"expired_tx_id <> " + xid +
		")"

	return created, expired
}
96
97
// BulkEntityFilterQuery -
98
func BulkEntityFilterQuery(tenantID, entityType string, snap uint64) string {
99
	createdWhere, expiredWhere := snapshotQuery(snap)
100
	return fmt.Sprintf(BulkEntityFilterTemplate, tenantID, entityType, createdWhere, expiredWhere)
101
}
102
103
// GenerateGCQuery generates a Squirrel DELETE query builder for garbage collection.
104
// It constructs a query to delete expired records from the specified table
105
// based on the provided value, which represents a transaction ID.
106
func GenerateGCQuery(table string, value uint64) squirrel.DeleteBuilder {
107
	// Convert the provided value into a string format suitable for our SQL query, formatted as a transaction ID.
108
	valStr := fmt.Sprintf("'%v'::xid8", value)
109
110
	// Create a Squirrel DELETE builder for the specified table.
111
	deleteBuilder := squirrel.Delete(table)
112
113
	// Create an expression to check if 'expired_tx_id' is not equal to '0' (not expired).
114
	expiredZeroExpr := squirrel.Expr("expired_tx_id <> '0'::xid8")
115
116
	// Create an expression to check if 'expired_tx_id' is less than the provided value (before the cutoff).
117
	beforeExpr := squirrel.Expr(fmt.Sprintf("expired_tx_id < %s", valStr))
118
119
	// Add the WHERE clauses to the DELETE query builder to filter and delete expired data.
120
	return deleteBuilder.Where(expiredZeroExpr).Where(beforeExpr)
121
}
122
123
// HandleError records an error in the given span, logs the error, and returns a standardized error.
124
// This function is used for consistent error handling across different parts of the application.
125
func HandleError(ctx context.Context, span trace.Span, err error, errorCode base.ErrorCode) error {
126
	// Record the error on the span
127
	span.RecordError(err)
128
129
	// Check if the error is context-related
130
	if IsContextRelatedError(ctx, err) || IsSerializationRelatedError(err) {
131
		// Set the status of the span
132
		span.SetStatus(codes.Unset, err.Error())
133
		// Use debug level logging for context or serialization-related errors
134
		slog.Debug("an error related to context or serialization was encountered during the operation", slog.String("error", err.Error()))
135
	} else {
136
		// Set the status of the span
137
		span.SetStatus(codes.Error, err.Error())
138
		// Use error level logging for all other errors
139
		slog.Error("error encountered", slog.Any("error", err))
140
	}
141
142
	// Return a new standardized error with the provided error code
143
	return errors.New(errorCode.String())
144
}
145
146
// IsContextRelatedError reports whether a failure should be attributed to
// context cancellation, a context deadline, or a closed connection.
//
// It returns true when:
//   - ctx itself has ended with context.Canceled or context.DeadlineExceeded
//     (regardless of err), or
//   - err wraps context.Canceled or context.DeadlineExceeded, or
//   - err's message contains the text "conn closed".
//
// A nil err is safe to pass: it can only yield true through the ctx check and
// otherwise returns false instead of panicking on err.Error().
func IsContextRelatedError(ctx context.Context, err error) bool {
	if ctxErr := ctx.Err(); errors.Is(ctxErr, context.Canceled) || errors.Is(ctxErr, context.DeadlineExceeded) {
		return true
	}

	// Nothing more to inspect; also guards the err.Error() call below.
	if err == nil {
		return false
	}

	return errors.Is(err, context.Canceled) ||
		errors.Is(err, context.DeadlineExceeded) ||
		strings.Contains(err.Error(), "conn closed")
}
158
159
// IsSerializationRelatedError reports whether err's message contains
// "could not serialize" or "duplicate key value".
//
// The check is purely textual: it inspects err.Error() and does not look at
// any structured database error code. A nil err returns false instead of
// panicking on err.Error().
func IsSerializationRelatedError(err error) bool {
	if err == nil {
		return false
	}

	msg := err.Error()
	return strings.Contains(msg, "could not serialize") ||
		strings.Contains(msg, "duplicate key value")
}
167
168
// WaitWithBackoff blocks for an exponentially growing, jittered delay, or
// until ctx is done, whichever happens first.
//
// The base delay is 20ms * 2^retries, capped at 1s. A random jitter of up to
// 50% of that base delay is added on top, so the longest possible wait is
// 1.5s. Before waiting, a warning is logged with tenantID and the chosen
// delay in milliseconds.
//
// The function returns nothing; callers that need to distinguish a timeout
// from cancellation must inspect ctx themselves afterwards.
func WaitWithBackoff(ctx context.Context, tenantID string, retries int) {
	backoff := time.Duration(math.Min(float64(20*time.Millisecond)*math.Pow(2, float64(retries)), float64(1*time.Second)))
	jitter := time.Duration(rand.Float64() * float64(backoff) * 0.5)
	nextBackoff := backoff + jitter
	slog.Warn("waiting before retry", slog.String("tenant_id", tenantID), slog.Int64("backoff_duration", nextBackoff.Milliseconds()))

	// Use an explicit timer so it is released as soon as we return; with
	// time.After the timer would stay pending until it fired when ctx is
	// cancelled first.
	timer := time.NewTimer(nextBackoff)
	defer timer.Stop()

	select {
	case <-timer.C:
	case <-ctx.Done():
	}
}
180