Passed
Pull Request — master (#1466)
by
unknown
03:03
created

postgres.*DataReader.QueryRelationships   C

Complexity

Conditions 11

Size

Total Lines 82
Code Lines 48

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 11
eloc 48
nop 5
dl 0
loc 82
rs 5.4
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like postgres.*DataReader.QueryRelationships often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
package postgres
2
3
import (
4
	"context"
5
	"errors"
6
	"fmt"
7
	"log/slog"
8
	"strconv"
9
	"strings"
10
11
	"github.com/jackc/pgx/v5"
12
13
	"github.com/Masterminds/squirrel"
14
	"github.com/golang/protobuf/jsonpb"
15
	"google.golang.org/protobuf/types/known/anypb"
16
17
	"github.com/Permify/permify/internal/storage"
18
	"github.com/Permify/permify/internal/storage/postgres/snapshot"
19
	"github.com/Permify/permify/internal/storage/postgres/types"
20
	"github.com/Permify/permify/internal/storage/postgres/utils"
21
	"github.com/Permify/permify/pkg/database"
22
	db "github.com/Permify/permify/pkg/database/postgres"
23
	base "github.com/Permify/permify/pkg/pb/base/v1"
24
	"github.com/Permify/permify/pkg/token"
25
)
26
27
// DataReader is a struct which holds a reference to the database, transaction options and a logger.
28
// It is responsible for reading data from the database.
29
type DataReader struct {
30
	database  *db.Postgres  // database is an instance of the PostgreSQL database
31
	txOptions pgx.TxOptions // txOptions specifies the isolation level for database transaction and sets it as read only
32
}
33
34
// NewDataReader is a constructor function for DataReader.
35
// It initializes a new DataReader with a given database, a logger, and sets transaction options to be read-only with Repeatable Read isolation level.
36
func NewDataReader(database *db.Postgres) *DataReader {
37
	return &DataReader{
38
		database:  database,                                                             // Set the database to the passed in PostgreSQL instance
39
		txOptions: pgx.TxOptions{IsoLevel: pgx.ReadCommitted, AccessMode: pgx.ReadOnly}, // Set the transaction options
40
	}
41
}
42
43
// QueryRelationships reads relation tuples from the storage based on the given filter.
44
func (r *DataReader) QueryRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, snap string, pagination database.Pagination) (it *database.TupleIterator, ct database.EncodedContinuousToken, err error) {
45
	// Start a new trace span and end it when the function exits.
46
	ctx, span := tracer.Start(ctx, "data-reader.query-relationships")
47
	defer span.End()
48
49
	slog.DebugContext(ctx, "querying relationships for tenant_id", slog.String("tenant_id", tenantID))
50
51
	// Decode the snapshot value.
52
	var st token.SnapToken
53
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
54
	if err != nil {
55
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
56
	}
57
58
	// Build the relationships query based on the provided filter and snapshot value.
59
	var args []interface{}
60
	builder := r.database.Builder.Select("id, entity_type, entity_id, relation, subject_type, subject_id, subject_relation").From(RelationTuplesTable).Where(squirrel.Eq{"tenant_id": tenantID})
61
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, filter)
62
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
63
64
	// Apply the pagination token and limit to the query.
65
	if pagination.Token() != "" {
66
		var t database.ContinuousToken
67
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
68
		if err != nil {
69
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
70
		}
71
72
		var v uint64
73
		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
74
		if err != nil {
75
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
76
		}
77
78
		builder = builder.Where(squirrel.GtOrEq{"id": v})
79
	}
80
81
	builder = builder.OrderBy("id").Limit(uint64(pagination.PageSize() + 1))
82
83
	// Generate the SQL query and arguments.
84
	var query string
85
	query, args, err = builder.ToSql()
86
	if err != nil {
87
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
88
	}
89
90
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
91
92
	// Execute the SQL query and retrieve the result rows.
93
	var rows pgx.Rows
94
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
95
	if err != nil {
96
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
97
	}
98
	defer rows.Close()
99
100
	var lastID uint64
101
102
	// Iterate through the rows and scan the result into a RelationTuple struct.
103
	tuples := make([]*base.Tuple, 0, pagination.PageSize()+1)
104
	for rows.Next() {
105
		rt := storage.RelationTuple{}
106
		err = rows.Scan(&rt.ID, &rt.EntityType, &rt.EntityID, &rt.Relation, &rt.SubjectType, &rt.SubjectID, &rt.SubjectRelation)
107
		if err != nil {
108
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
109
		}
110
		lastID = rt.ID
111
		tuples = append(tuples, rt.ToTuple())
112
	}
113
	// Check for any errors during iteration.
114
	if err = rows.Err(); err != nil {
115
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
116
	}
117
118
	slog.DebugContext(ctx, "successfully retrieved relation tuples from the database")
119
120
	// Return a TupleIterator created from the TupleCollection.
121
	if len(tuples) > int(pagination.PageSize()) {
122
		return database.NewTupleCollection(tuples[:pagination.PageSize()]...).CreateTupleIterator(), utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
123
	}
124
125
	return database.NewTupleCollection(tuples...).CreateTupleIterator(), database.NewNoopContinuousToken().Encode(), nil
126
}
127
128
// ReadRelationships reads relation tuples from the storage based on the given filter and pagination.
129
func (r *DataReader) ReadRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, snap string, pagination database.Pagination) (collection *database.TupleCollection, ct database.EncodedContinuousToken, err error) {
130
	// Start a new trace span and end it when the function exits.
131
	ctx, span := tracer.Start(ctx, "data-reader.read-relationships")
132
	defer span.End()
133
134
	slog.DebugContext(ctx, "reading relationships for tenant_id", slog.String("tenant_id", tenantID))
135
136
	// Decode the snapshot value.
137
	var st token.SnapToken
138
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
139
	if err != nil {
140
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
141
	}
142
143
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
144
	builder := r.database.Builder.Select("id, entity_type, entity_id, relation, subject_type, subject_id, subject_relation").From(RelationTuplesTable).Where(squirrel.Eq{"tenant_id": tenantID})
145
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, filter)
146
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
147
148
	// Apply the pagination token and limit to the query.
149
	if pagination.Token() != "" {
150
		var t database.ContinuousToken
151
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
152
		if err != nil {
153
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
154
		}
155
		var v uint64
156
		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
157
		if err != nil {
158
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
159
		}
160
		builder = builder.Where(squirrel.GtOrEq{"id": v})
161
	}
162
163
	builder = builder.OrderBy("id").Limit(uint64(pagination.PageSize() + 1))
164
165
	// Generate the SQL query and arguments.
166
	var query string
167
	var args []interface{}
168
	query, args, err = builder.ToSql()
169
	if err != nil {
170
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
171
	}
172
173
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
174
175
	// Execute the query and retrieve the rows.
176
	var rows pgx.Rows
177
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
178
	if err != nil {
179
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
180
	}
181
	defer rows.Close()
182
183
	var lastID uint64
184
185
	// Iterate through the rows and scan the result into a RelationTuple struct.
186
	tuples := make([]*base.Tuple, 0, pagination.PageSize()+1)
187
	for rows.Next() {
188
		rt := storage.RelationTuple{}
189
		err = rows.Scan(&rt.ID, &rt.EntityType, &rt.EntityID, &rt.Relation, &rt.SubjectType, &rt.SubjectID, &rt.SubjectRelation)
190
		if err != nil {
191
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
192
		}
193
		lastID = rt.ID
194
		tuples = append(tuples, rt.ToTuple())
195
	}
196
	// Check for any errors during iteration.
197
	if err = rows.Err(); err != nil {
198
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
199
	}
200
201
	slog.DebugContext(ctx, "successfully read relation tuples from database")
202
203
	// Return the results and encoded continuous token for pagination.
204
	if len(tuples) > int(pagination.PageSize()) {
205
		return database.NewTupleCollection(tuples[:pagination.PageSize()]...), utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
206
	}
207
208
	return database.NewTupleCollection(tuples...), database.NewNoopContinuousToken().Encode(), nil
209
}
210
211
// QuerySingleAttribute retrieves a single attribute from the storage based on the given filter.
212
func (r *DataReader) QuerySingleAttribute(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string) (attribute *base.Attribute, err error) {
213
	// Start a new trace span and end it when the function exits.
214
	ctx, span := tracer.Start(ctx, "data-reader.query-single-attribute")
215
	defer span.End()
216
217
	slog.DebugContext(ctx, "querying single attribute for tenant_id", slog.String("tenant_id", tenantID))
218
219
	// Decode the snapshot value.
220
	var st token.SnapToken
221
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
222
	if err != nil {
223
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
224
	}
225
226
	// Build the relationships query based on the provided filter and snapshot value.
227
	var args []interface{}
228
	builder := r.database.Builder.Select("entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
229
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
230
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
231
232
	// Generate the SQL query and arguments.
233
	var query string
234
	query, args, err = builder.ToSql()
235
	if err != nil {
236
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
237
	}
238
239
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
240
241
	row := r.database.ReadPool.QueryRow(ctx, query, args...)
242
243
	rt := storage.Attribute{}
244
245
	// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
246
	var valueStr string
247
248
	// Scan the row from the database into the fields of `rt` and `valueStr`.
249
	err = row.Scan(&rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
250
	if err != nil {
251
		if errors.Is(err, pgx.ErrNoRows) {
252
			return nil, nil
253
		} else {
254
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
255
		}
256
	}
257
258
	// Unmarshal the JSON data from `valueStr` into `rt.Value`.
259
	rt.Value = &anypb.Any{}
260
	unmarshaler := &jsonpb.Unmarshaler{}
261
	err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value)
262
	if err != nil {
263
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
264
	}
265
266
	slog.DebugContext(ctx, "successfully retrieved Single attribute from the database")
267
268
	return rt.ToAttribute(), nil
269
}
270
271
// QueryAttributes reads multiple attributes from the storage based on the given filter.
272
func (r *DataReader) QueryAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string) (it *database.AttributeIterator, err error) {
273
	// Start a new trace span and end it when the function exits.
274
	ctx, span := tracer.Start(ctx, "data-reader.query-attributes")
275
	defer span.End()
276
277
	slog.DebugContext(ctx, "querying Attributes for tenant_id", slog.String("tenant_id", tenantID))
278
279
	// Decode the snapshot value.
280
	var st token.SnapToken
281
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
282
	if err != nil {
283
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
284
	}
285
286
	// Build the relationships query based on the provided filter and snapshot value.
287
	var args []interface{}
288
	builder := r.database.Builder.Select("entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
289
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
290
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
291
292
	// Generate the SQL query and arguments.
293
	var query string
294
	query, args, err = builder.ToSql()
295
	if err != nil {
296
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
297
	}
298
299
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
300
301
	// Execute the SQL query and retrieve the result rows.
302
	var rows pgx.Rows
303
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
304
	if err != nil {
305
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
306
	}
307
	defer rows.Close()
308
309
	// Process the result rows and store the relationships in a TupleCollection.
310
	collection := database.NewAttributeCollection()
311
	for rows.Next() {
312
		rt := storage.Attribute{}
313
314
		// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
315
		var valueStr string
316
317
		// Scan the row from the database into the fields of `rt` and `valueStr`.
318
		err := rows.Scan(&rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
319
		if err != nil {
320
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
321
		}
322
323
		// Unmarshal the JSON data from `valueStr` into `rt.Value`.
324
		rt.Value = &anypb.Any{}
325
		unmarshaler := &jsonpb.Unmarshaler{}
326
		err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value)
327
		if err != nil {
328
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
329
		}
330
331
		collection.Add(rt.ToAttribute())
332
	}
333
	if err = rows.Err(); err != nil {
334
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
335
	}
336
337
	slog.DebugContext(ctx, "successfully retrieved attributes tuples from the database")
338
339
	// Return a TupleIterator created from the TupleCollection.
340
	return collection.CreateAttributeIterator(), nil
341
}
342
343
// ReadAttributes reads multiple attributes from the storage based on the given filter and pagination.
344
func (r *DataReader) ReadAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string, pagination database.Pagination) (collection *database.AttributeCollection, ct database.EncodedContinuousToken, err error) {
345
	// Start a new trace span and end it when the function exits.
346
	ctx, span := tracer.Start(ctx, "data-reader.read-attributes")
347
	defer span.End()
348
349
	slog.DebugContext(ctx, "reading attributes for tenant_id", slog.String("tenant_id", tenantID))
350
351
	// Decode the snapshot value.
352
	var st token.SnapToken
353
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
354
	if err != nil {
355
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
356
	}
357
358
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
359
	builder := r.database.Builder.Select("id, entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
360
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
361
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
362
363
	// Apply the pagination token and limit to the query.
364
	if pagination.Token() != "" {
365
		var t database.ContinuousToken
366
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
367
		if err != nil {
368
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
369
		}
370
		var v uint64
371
		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
372
		if err != nil {
373
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
374
		}
375
		builder = builder.Where(squirrel.GtOrEq{"id": v})
376
	}
377
378
	builder = builder.OrderBy("id").Limit(uint64(pagination.PageSize() + 1))
379
380
	// Generate the SQL query and arguments.
381
	var query string
382
	var args []interface{}
383
	query, args, err = builder.ToSql()
384
	if err != nil {
385
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
386
	}
387
388
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
389
390
	// Execute the query and retrieve the rows.
391
	var rows pgx.Rows
392
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
393
	if err != nil {
394
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
395
	}
396
	defer rows.Close()
397
398
	var lastID uint64
399
400
	// Iterate through the rows and scan the result into a RelationTuple struct.
401
	attributes := make([]*base.Attribute, 0, pagination.PageSize()+1)
402
	for rows.Next() {
403
		rt := storage.Attribute{}
404
405
		// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
406
		var valueStr string
407
408
		// Scan the row from the database into the fields of `rt` and `valueStr`.
409
		err := rows.Scan(&rt.ID, &rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
410
		if err != nil {
411
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
412
		}
413
		lastID = rt.ID
414
415
		// Unmarshal the JSON data from `valueStr` into `rt.Value`.
416
		rt.Value = &anypb.Any{}
417
		unmarshaler := &jsonpb.Unmarshaler{}
418
		err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value)
419
		if err != nil {
420
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
421
		}
422
423
		attributes = append(attributes, rt.ToAttribute())
424
	}
425
	// Check for any errors during iteration.
426
	if err = rows.Err(); err != nil {
427
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
428
	}
429
430
	slog.DebugContext(ctx, "successfully read attributes from the database")
431
432
	// Return the results and encoded continuous token for pagination.
433
	if len(attributes) > int(pagination.PageSize()) {
434
		return database.NewAttributeCollection(attributes[:pagination.PageSize()]...), utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
435
	}
436
437
	return database.NewAttributeCollection(attributes...), database.NewNoopContinuousToken().Encode(), nil
438
}
439
440
// QueryUniqueEntities reads unique entities from the storage based on the given filter and pagination.
441
func (r *DataReader) QueryUniqueEntities(ctx context.Context, tenantID, name, snap string, pagination database.Pagination) (ids []string, ct database.EncodedContinuousToken, err error) {
442
	// Start a new trace span and end it when the function exits.
443
	ctx, span := tracer.Start(ctx, "data-reader.query-unique-entities")
444
	defer span.End()
445
446
	slog.DebugContext(ctx, "querying unique entities for tenant_id", slog.String("tenant_id", tenantID))
447
448
	// Decode the snapshot value.
449
	var st token.SnapToken
450
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
451
	if err != nil {
452
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
453
	}
454
455
	query := utils.BulkEntityFilterQuery(tenantID, name, st.(snapshot.Token).Value.Uint)
456
457
	// Apply the pagination token and limit to the subQuery.
458
	if pagination.Token() != "" {
459
		var t database.ContinuousToken
460
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
461
		if err != nil {
462
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
463
		}
464
		var v uint64
465
		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
466
		if err != nil {
467
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
468
		}
469
470
		query = fmt.Sprintf("%s WHERE id >= %s", query, strconv.FormatUint(v, 10))
471
	}
472
473
	// Append ORDER BY and LIMIT clauses.
474
	query = fmt.Sprintf("%s ORDER BY id LIMIT %d", query, pagination.PageSize()+1)
475
476
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query))
477
478
	// Execute the query and retrieve the rows.
479
	var rows pgx.Rows
480
	rows, err = r.database.ReadPool.Query(ctx, query)
481
	if err != nil {
482
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
483
	}
484
	defer rows.Close()
485
486
	var lastID uint64
487
488
	// Iterate through the rows and scan the result into a RelationTuple struct.
489
	entityIDs := make([]string, 0, pagination.PageSize()+1)
490
	for rows.Next() {
491
		var entityId string
492
		err = rows.Scan(&lastID, &entityId)
493
		if err != nil {
494
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
495
		}
496
497
		entityIDs = append(entityIDs, entityId)
498
	}
499
500
	// Check for any errors during iteration.
501
	if err = rows.Err(); err != nil {
502
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
503
	}
504
505
	slog.DebugContext(ctx, "successfully retrieved unique entities from the database")
506
507
	// Return the results and encoded continuous token for pagination.
508
	if len(entityIDs) > int(pagination.PageSize()) {
509
		return entityIDs[:pagination.PageSize()], utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
510
	}
511
512
	return entityIDs, database.NewNoopContinuousToken().Encode(), nil
513
}
514
515
// QueryUniqueSubjectReferences reads unique subject references from the storage based on the given filter and pagination.
516
func (r *DataReader) QueryUniqueSubjectReferences(ctx context.Context, tenantID string, subjectReference *base.RelationReference, snap string, pagination database.Pagination) (ids []string, ct database.EncodedContinuousToken, err error) {
517
	// Start a new trace span and end it when the function exits.
518
	ctx, span := tracer.Start(ctx, "data-reader.query-unique-subject-reference")
519
	defer span.End()
520
521
	slog.DebugContext(ctx, "querying unique subject references for tenant_id", slog.String("tenant_id", tenantID))
522
523
	// Decode the snapshot value.
524
	var st token.SnapToken
525
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
526
	if err != nil {
527
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
528
	}
529
530
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
531
	builder := r.database.Builder.
532
		Select("MIN(id) as id, subject_id"). // This will pick the smallest `id` for each unique `subject_id`.
533
		From(RelationTuplesTable).
534
		Where(squirrel.Eq{"tenant_id": tenantID}).
535
		GroupBy("subject_id")
536
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, &base.TupleFilter{Subject: &base.SubjectFilter{Type: subjectReference.GetType(), Relation: subjectReference.GetRelation()}})
537
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
538
539
	// Apply the pagination token and limit to the query.
540
	if pagination.Token() != "" {
541
		var t database.ContinuousToken
542
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
543
		if err != nil {
544
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
545
		}
546
		var v uint64
547
		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
548
		if err != nil {
549
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
550
		}
551
		builder = builder.Where(squirrel.GtOrEq{"id": v})
552
	}
553
554
	builder = builder.OrderBy("id").Limit(uint64(pagination.PageSize() + 1))
555
556
	// Generate the SQL query and arguments.
557
	var query string
558
	var args []interface{}
559
	query, args, err = builder.ToSql()
560
	if err != nil {
561
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
562
	}
563
564
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
565
566
	// Execute the query and retrieve the rows.
567
	var rows pgx.Rows
568
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
569
	if err != nil {
570
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
571
	}
572
	defer rows.Close()
573
574
	var lastID uint64
575
576
	// Iterate through the rows and scan the result into a RelationTuple struct.
577
	subjectIDs := make([]string, 0, pagination.PageSize()+1)
578
	for rows.Next() {
579
		var subjectID string
580
		err = rows.Scan(&lastID, &subjectID)
581
		if err != nil {
582
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
583
		}
584
		subjectIDs = append(subjectIDs, subjectID)
585
	}
586
	// Check for any errors during iteration.
587
	if err = rows.Err(); err != nil {
588
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
589
	}
590
591
	slog.DebugContext(ctx, "successfully retrieved unique subject references from the database")
592
593
	// Return the results and encoded continuous token for pagination.
594
	if len(subjectIDs) > int(pagination.PageSize()) {
595
		return subjectIDs[:pagination.PageSize()], utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
596
	}
597
598
	return subjectIDs, database.NewNoopContinuousToken().Encode(), nil
599
}
600
601
// HeadSnapshot retrieves the latest snapshot token associated with the tenant.
602
func (r *DataReader) HeadSnapshot(ctx context.Context, tenantID string) (token.SnapToken, error) {
603
	// Start a new trace span and end it when the function exits.
604
	ctx, span := tracer.Start(ctx, "data-reader.head-snapshot")
605
	defer span.End()
606
607
	slog.DebugContext(ctx, "getting head snapshot for tenant_id", slog.String("tenant_id", tenantID))
608
609
	var xid types.XID8
610
611
	// Build the query to find the highest transaction ID associated with the tenant.
612
	builder := r.database.Builder.Select("id").From(TransactionsTable).Where(squirrel.Eq{"tenant_id": tenantID}).OrderBy("id DESC").Limit(1)
613
	query, args, err := builder.ToSql()
614
	if err != nil {
615
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
616
	}
617
618
	// Execute the query and retrieve the highest transaction ID.
619
	err = r.database.ReadPool.QueryRow(ctx, query, args...).Scan(&xid)
620
	if err != nil {
621
		// If no rows are found, return a snapshot token with a value of 0.
622
		if errors.Is(err, pgx.ErrNoRows) {
623
			return snapshot.Token{Value: types.XID8{Uint: 0}}, nil
624
		}
625
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
626
	}
627
628
	slog.DebugContext(ctx, "successfully retrieved latest snapshot token")
629
630
	// Return the latest snapshot token associated with the tenant.
631
	return snapshot.Token{Value: xid}, nil
632
}
633