| 1 |  |  | package postgres | 
            
                                                                                                            
                            
            
                                    
            
            
                | 2 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 3 |  |  | import ( | 
            
                                                                                                            
                            
            
                                    
            
            
                | 4 |  |  | 	"context" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 5 |  |  | 	"errors" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 6 |  |  | 	"log/slog" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 7 |  |  | 	"strconv" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 8 |  |  | 	"strings" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 9 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 10 |  |  | 	"github.com/jackc/pgx/v5" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 11 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 12 |  |  | 	"github.com/Masterminds/squirrel" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 13 |  |  | 	"github.com/golang/protobuf/jsonpb" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 14 |  |  | 	"google.golang.org/protobuf/types/known/anypb" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 15 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 16 |  |  | 	"github.com/Permify/permify/internal" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 17 |  |  | 	"github.com/Permify/permify/internal/storage" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 18 |  |  | 	"github.com/Permify/permify/internal/storage/postgres/snapshot" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 19 |  |  | 	"github.com/Permify/permify/internal/storage/postgres/types" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 20 |  |  | 	"github.com/Permify/permify/internal/storage/postgres/utils" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 21 |  |  | 	"github.com/Permify/permify/pkg/database" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 22 |  |  | 	db "github.com/Permify/permify/pkg/database/postgres" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 23 |  |  | 	base "github.com/Permify/permify/pkg/pb/base/v1" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 24 |  |  | 	"github.com/Permify/permify/pkg/token" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 25 |  |  | ) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 26 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 27 |  |  | // DataReader is a struct which holds a reference to the database, transaction options and a logger. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 28 |  |  | // It is responsible for reading data from the database. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 29 |  |  | type DataReader struct { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 30 |  |  | 	database  *db.Postgres  // database is an instance of the PostgreSQL database | 
            
                                                                                                            
                            
            
                                    
            
            
                | 31 |  |  | 	txOptions pgx.TxOptions // txOptions specifies the isolation level for database transaction and sets it as read only | 
            
                                                                                                            
                            
            
                                    
            
            
                | 32 |  |  | } | 
            
                                                                                                            
                            
            
                                    
            
            
                | 33 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 34 |  |  | // NewDataReader is a constructor function for DataReader. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 35 |  |  | // It initializes a new DataReader with a given database, a logger, and sets transaction options to be read-only with Repeatable Read isolation level. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 36 |  |  | func NewDataReader(database *db.Postgres) *DataReader { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 37 |  |  | 	return &DataReader{ | 
            
                                                                                                            
                            
            
                                    
            
            
                | 38 |  |  | 		database:  database,                                                             // Set the database to the passed in PostgreSQL instance | 
            
                                                                                                            
                            
            
                                    
            
            
                | 39 |  |  | 		txOptions: pgx.TxOptions{IsoLevel: pgx.ReadCommitted, AccessMode: pgx.ReadOnly}, // Set the transaction options | 
            
                                                                                                            
                            
            
                                    
            
            
                | 40 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 41 |  |  | } | 
            
                                                                                                            
                            
            
                                    
            
            
                | 42 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 43 |  |  | // QueryRelationships reads relation tuples from the storage based on the given filter. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 44 |  |  | func (r *DataReader) QueryRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, snap string, pagination database.CursorPagination) (it *database.TupleIterator, err error) { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 45 |  |  | 	// Start a new trace span and end it when the function exits. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 46 |  |  | 	ctx, span := internal.Tracer.Start(ctx, "data-reader.query-relationships") | 
            
                                                                                                            
                            
            
                                    
            
            
                | 47 |  |  | 	defer span.End() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 48 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 49 |  |  | 	slog.DebugContext(ctx, "querying relationships for tenant_id", slog.String("tenant_id", tenantID)) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 50 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 51 |  |  | 	// Decode the snapshot value. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 52 |  |  | 	var st token.SnapToken | 
            
                                                                                                            
                            
            
                                    
            
            
                | 53 |  |  | 	st, err = snapshot.EncodedToken{Value: snap}.Decode() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 54 |  |  | 	if err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 55 |  |  | 		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 56 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 57 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 58 |  |  | 	// Build the relationships query based on the provided filter and snapshot value. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 59 |  |  | 	var args []interface{} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 60 |  |  | 	builder := r.database.Builder.Select("entity_type, entity_id, relation, subject_type, subject_id, subject_relation").From(RelationTuplesTable).Where(squirrel.Eq{"tenant_id": tenantID}) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 61 |  |  | 	builder = utils.TuplesFilterQueryForSelectBuilder(builder, filter) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 62 |  |  | 	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 63 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 64 |  |  | 	if pagination.Cursor() != "" { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 65 |  |  | 		var t database.ContinuousToken | 
            
                                                                                                            
                            
            
                                    
            
            
                | 66 |  |  | 		t, err = utils.EncodedContinuousToken{Value: pagination.Cursor()}.Decode() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 67 |  |  | 		if err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 68 |  |  | 			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 69 |  |  | 		} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 70 |  |  | 		builder = builder.Where(squirrel.GtOrEq{pagination.Sort(): t.(utils.ContinuousToken).Value}) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 71 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 72 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 73 |  |  | 	if pagination.Sort() != "" { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 74 |  |  | 		builder = builder.OrderBy(pagination.Sort()) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 75 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 76 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 77 |  |  | 	// Apply limit if specified in pagination | 
            
                                                                                                            
                            
            
                                    
            
            
                | 78 |  |  | 	limit := pagination.Limit() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 79 |  |  | 	if limit > 0 { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 80 |  |  | 		builder = builder.Limit(uint64(limit)) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 81 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 82 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 83 |  |  | 	// Generate the SQL query and arguments. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 84 |  |  | 	var query string | 
            
                                                                                                            
                            
            
                                    
            
            
                | 85 |  |  | 	query, args, err = builder.ToSql() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 86 |  |  | 	if err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 87 |  |  | 		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 88 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 89 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 90 |  |  | 	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args)) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 91 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 92 |  |  | 	// Execute the SQL query and retrieve the result rows. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 93 |  |  | 	var rows pgx.Rows | 
            
                                                                                                            
                            
            
                                    
            
            
                | 94 |  |  | 	rows, err = r.database.ReadPool.Query(ctx, query, args...) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 95 |  |  | 	if err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 96 |  |  | 		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 97 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 98 |  |  | 	defer rows.Close() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 99 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 100 |  |  | 	// Process the result rows and store the relationships in a TupleCollection. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 101 |  |  | 	collection := database.NewTupleCollection() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 102 |  |  | 	for rows.Next() { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 103 |  |  | 		rt := storage.RelationTuple{} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 104 |  |  | 		err = rows.Scan(&rt.EntityType, &rt.EntityID, &rt.Relation, &rt.SubjectType, &rt.SubjectID, &rt.SubjectRelation) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 105 |  |  | 		if err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 106 |  |  | 			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 107 |  |  | 		} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 108 |  |  | 		collection.Add(rt.ToTuple()) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 109 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 110 |  |  | 	if err = rows.Err(); err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 111 |  |  | 		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 112 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 113 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 114 |  |  | 	slog.DebugContext(ctx, "successfully retrieved relation tuples from the database") | 
            
                                                                                                            
                            
            
                                    
            
            
                | 115 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 116 |  |  | 	// Return a TupleIterator created from the TupleCollection. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 117 |  |  | 	return collection.CreateTupleIterator(), nil | 
            
                                                                                                            
                            
            
                                    
            
            
                | 118 |  |  | } | 
            
                                                                                                            
                            
            
                                    
            
            
                | 119 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 120 |  |  | // ReadRelationships reads relation tuples from the storage based on the given filter and pagination. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 121 |  |  | func (r *DataReader) ReadRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, snap string, pagination database.Pagination) (collection *database.TupleCollection, ct database.EncodedContinuousToken, err error) { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 122 |  |  | 	// Start a new trace span and end it when the function exits. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 123 |  |  | 	ctx, span := internal.Tracer.Start(ctx, "data-reader.read-relationships") | 
            
                                                                                                            
                            
            
                                    
            
            
                | 124 |  |  | 	defer span.End() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 125 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 126 |  |  | 	slog.DebugContext(ctx, "reading relationships for tenant_id", slog.String("tenant_id", tenantID)) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 127 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 128 |  |  | 	// Decode the snapshot value. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 129 |  |  | 	var st token.SnapToken | 
            
                                                                                                            
                            
            
                                    
            
            
                | 130 |  |  | 	st, err = snapshot.EncodedToken{Value: snap}.Decode() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 131 |  |  | 	if err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 132 |  |  | 		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 133 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 134 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 135 |  |  | 	// Build the relationships query based on the provided filter, snapshot value, and pagination settings. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 136 |  |  | 	builder := r.database.Builder.Select("id, entity_type, entity_id, relation, subject_type, subject_id, subject_relation").From(RelationTuplesTable).Where(squirrel.Eq{"tenant_id": tenantID}) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 137 |  |  | 	builder = utils.TuplesFilterQueryForSelectBuilder(builder, filter) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 138 |  |  | 	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 139 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 140 |  |  | 	// Apply the pagination token and limit to the query. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 141 |  |  | 	if pagination.Token() != "" { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 142 |  |  | 		var t database.ContinuousToken | 
            
                                                                                                            
                            
            
                                    
            
            
                | 143 |  |  | 		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 144 |  |  | 		if err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 145 |  |  | 			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 146 |  |  | 		} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 147 |  |  | 		var v uint64 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 148 |  |  | 		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 149 |  |  | 		if err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 150 |  |  | 			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 151 |  |  | 		} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 152 |  |  | 		builder = builder.Where(squirrel.GtOrEq{"id": v}) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 153 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 154 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 155 |  |  | 	builder = builder.OrderBy("id") | 
            
                                                                                                            
                            
            
                                    
            
            
                | 156 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 157 |  |  | 	if pagination.PageSize() != 0 { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 158 |  |  | 		builder = builder.Limit(uint64(pagination.PageSize() + 1)) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 159 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 160 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 161 |  |  | 	// Generate the SQL query and arguments. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 162 |  |  | 	var query string | 
            
                                                                                                            
                            
            
                                    
            
            
                | 163 |  |  | 	var args []interface{} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 164 |  |  | 	query, args, err = builder.ToSql() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 165 |  |  | 	if err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 166 |  |  | 		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 167 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 168 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 169 |  |  | 	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args)) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 170 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 171 |  |  | 	// Execute the query and retrieve the rows. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 172 |  |  | 	var rows pgx.Rows | 
            
                                                                                                            
                            
            
                                    
            
            
                | 173 |  |  | 	rows, err = r.database.ReadPool.Query(ctx, query, args...) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 174 |  |  | 	if err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 175 |  |  | 		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 176 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 177 |  |  | 	defer rows.Close() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 178 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 179 |  |  | 	var lastID uint64 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 180 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 181 |  |  | 	// Iterate through the rows and scan the result into a RelationTuple struct. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 182 |  |  | 	tuples := make([]*base.Tuple, 0, pagination.PageSize()+1) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 183 |  |  | 	for rows.Next() { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 184 |  |  | 		rt := storage.RelationTuple{} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 185 |  |  | 		err = rows.Scan(&rt.ID, &rt.EntityType, &rt.EntityID, &rt.Relation, &rt.SubjectType, &rt.SubjectID, &rt.SubjectRelation) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 186 |  |  | 		if err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 187 |  |  | 			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 188 |  |  | 		} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 189 |  |  | 		lastID = rt.ID | 
            
                                                                                                            
                            
            
                                    
            
            
                | 190 |  |  | 		tuples = append(tuples, rt.ToTuple()) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 191 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 192 |  |  | 	// Check for any errors during iteration. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 193 |  |  | 	if err = rows.Err(); err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 194 |  |  | 		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 195 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 196 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 197 |  |  | 	slog.DebugContext(ctx, "successfully read relation tuples from database") | 
            
                                                                                                            
                            
            
                                    
            
            
                | 198 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 199 |  |  | 	// Return the results and encoded continuous token for pagination. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 200 |  |  | 	if pagination.PageSize() != 0 && len(tuples) > int(pagination.PageSize()) { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 201 |  |  | 		return database.NewTupleCollection(tuples[:pagination.PageSize()]...), utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil | 
            
                                                                                                            
                            
            
                                    
            
            
                | 202 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 203 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 204 |  |  | 	return database.NewTupleCollection(tuples...), database.NewNoopContinuousToken().Encode(), nil | 
            
                                                                                                            
                            
            
                                    
            
            
                | 205 |  |  | } | 
            
                                                                                                            
                            
            
                                    
            
            
                | 206 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 207 |  |  | // QuerySingleAttribute retrieves a single attribute from the storage based on the given filter. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 208 |  |  | func (r *DataReader) QuerySingleAttribute(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string) (attribute *base.Attribute, err error) { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 209 |  |  | 	// Start a new trace span and end it when the function exits. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 210 |  |  | 	ctx, span := internal.Tracer.Start(ctx, "data-reader.query-single-attribute") | 
            
                                                                                                            
                            
            
                                    
            
            
                | 211 |  |  | 	defer span.End() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 212 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 213 |  |  | 	slog.DebugContext(ctx, "querying single attribute for tenant_id", slog.String("tenant_id", tenantID)) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 214 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 215 |  |  | 	// Decode the snapshot value. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 216 |  |  | 	var st token.SnapToken | 
            
                                                                                                            
                            
            
                                    
            
            
                | 217 |  |  | 	st, err = snapshot.EncodedToken{Value: snap}.Decode() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 218 |  |  | 	if err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 219 |  |  | 		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 220 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 221 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 222 |  |  | 	// Build the relationships query based on the provided filter and snapshot value. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 223 |  |  | 	var args []interface{} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 224 |  |  | 	builder := r.database.Builder.Select("entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID}) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 225 |  |  | 	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 226 |  |  | 	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 227 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 228 |  |  | 	// Generate the SQL query and arguments. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 229 |  |  | 	var query string | 
            
                                                                                                            
                            
            
                                    
            
            
                | 230 |  |  | 	query, args, err = builder.ToSql() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 231 |  |  | 	if err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 232 |  |  | 		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 233 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 234 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 235 |  |  | 	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args)) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 236 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 237 |  |  | 	row := r.database.ReadPool.QueryRow(ctx, query, args...) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 238 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 239 |  |  | 	rt := storage.Attribute{} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 240 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 241 |  |  | 	// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 242 |  |  | 	var valueStr string | 
            
                                                                                                            
                            
            
                                    
            
            
                | 243 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 244 |  |  | 	// Scan the row from the database into the fields of `rt` and `valueStr`. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 245 |  |  | 	err = row.Scan(&rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 246 |  |  | 	if err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 247 |  |  | 		if errors.Is(err, pgx.ErrNoRows) { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 248 |  |  | 			return nil, nil | 
            
                                                                                                            
                            
            
                                    
            
            
                | 249 |  |  | 		} else { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 250 |  |  | 			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 251 |  |  | 		} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 252 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 253 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 254 |  |  | 	// Unmarshal the JSON data from `valueStr` into `rt.Value`. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 255 |  |  | 	rt.Value = &anypb.Any{} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 256 |  |  | 	unmarshaler := &jsonpb.Unmarshaler{} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 257 |  |  | 	err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 258 |  |  | 	if err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 259 |  |  | 		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 260 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 261 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 262 |  |  | 	slog.DebugContext(ctx, "successfully retrieved Single attribute from the database") | 
            
                                                                                                            
                            
            
                                    
            
            
                | 263 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 264 |  |  | 	return rt.ToAttribute(), nil | 
            
                                                                                                            
                            
            
                                    
            
            
                | 265 |  |  | } | 
            
                                                                                                            
                            
            
                                    
            
            
                | 266 |  |  |  | 
            
                                                                                                            
                                                                
            
                                    
            
            
                | 267 |  |  | // QueryAttributes reads multiple attributes from the storage based on the given filter. | 
            
                                                                        
                            
            
                                    
            
            
                | 268 |  |  | func (r *DataReader) QueryAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string, pagination database.CursorPagination) (it *database.AttributeIterator, err error) { | 
            
                                                                        
                            
            
                                    
            
            
                | 269 |  |  | 	// Start a new trace span and end it when the function exits. | 
            
                                                                        
                            
            
                                    
            
            
                | 270 |  |  | 	ctx, span := internal.Tracer.Start(ctx, "data-reader.query-attributes") | 
            
                                                                        
                            
            
                                    
            
            
                | 271 |  |  | 	defer span.End() | 
            
                                                                        
                            
            
                                    
            
            
                | 272 |  |  |  | 
            
                                                                        
                            
            
                                    
            
            
                | 273 |  |  | 	slog.DebugContext(ctx, "querying Attributes for tenant_id", slog.String("tenant_id", tenantID)) | 
            
                                                                        
                            
            
                                    
            
            
                | 274 |  |  |  | 
            
                                                                        
                            
            
                                    
            
            
                | 275 |  |  | 	// Decode the snapshot value. | 
            
                                                                        
                            
            
                                    
            
            
                | 276 |  |  | 	var st token.SnapToken | 
            
                                                                        
                            
            
                                    
            
            
                | 277 |  |  | 	st, err = snapshot.EncodedToken{Value: snap}.Decode() | 
            
                                                                        
                            
            
                                    
            
            
                | 278 |  |  | 	if err != nil { | 
            
                                                                        
                            
            
                                    
            
            
                | 279 |  |  | 		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) | 
            
                                                                        
                            
            
                                    
            
            
                | 280 |  |  | 	} | 
            
                                                                        
                            
            
                                    
            
            
                | 281 |  |  |  | 
            
                                                                        
                            
            
                                    
            
            
                | 282 |  |  | 	// Build the attributes query based on the provided filter and snapshot value. | 
            
                                                                        
                            
            
                                    
            
            
                | 283 |  |  | 	var args []interface{} | 
            
                                                                        
                            
            
                                    
            
            
                | 284 |  |  | 	builder := r.database.Builder.Select("entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID}) | 
            
                                                                        
                            
            
                                    
            
            
                | 285 |  |  | 	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter) | 
            
                                                                        
                            
            
                                    
            
            
                | 286 |  |  | 	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint) | 
            
                                                                        
                            
            
                                    
            
            
                | 287 |  |  |  | 
            
                                                                        
                            
            
                                    
            
            
                | 288 |  |  | 	if pagination.Cursor() != "" { | 
            
                                                                        
                            
            
                                    
            
            
                | 289 |  |  | 		var t database.ContinuousToken | 
            
                                                                        
                            
            
                                    
            
            
                | 290 |  |  | 		t, err = utils.EncodedContinuousToken{Value: pagination.Cursor()}.Decode() | 
            
                                                                        
                            
            
                                    
            
            
                | 291 |  |  | 		if err != nil { | 
            
                                                                        
                            
            
                                    
            
            
                | 292 |  |  | 			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN) | 
            
                                                                        
                            
            
                                    
            
            
                | 293 |  |  | 		} | 
            
                                                                        
                            
            
                                    
            
            
                | 294 |  |  | 		builder = builder.Where(squirrel.GtOrEq{pagination.Sort(): t.(utils.ContinuousToken).Value}) | 
            
                                                                        
                            
            
                                    
            
            
                | 295 |  |  | 	} | 
            
                                                                        
                            
            
                                    
            
            
                | 296 |  |  |  | 
            
                                                                        
                            
            
                                    
            
            
                | 297 |  |  | 	if pagination.Sort() != "" { | 
            
                                                                        
                            
            
                                    
            
            
                | 298 |  |  | 		builder = builder.OrderBy(pagination.Sort()) | 
            
                                                                        
                            
            
                                    
            
            
                | 299 |  |  | 	} | 
            
                                                                        
                            
            
                                    
            
            
                | 300 |  |  |  | 
            
                                                                        
                            
            
                                    
            
            
                | 301 |  |  | 	// Apply limit if specified in pagination | 
            
                                                                        
                            
            
                                    
            
            
                | 302 |  |  | 	limit := pagination.Limit() | 
            
                                                                        
                            
            
                                    
            
            
                | 303 |  |  | 	if limit > 0 { | 
            
                                                                        
                            
            
                                    
            
            
                | 304 |  |  | 		builder = builder.Limit(uint64(limit)) | 
            
                                                                        
                            
            
                                    
            
            
                | 305 |  |  | 	} | 
            
                                                                        
                            
            
                                    
            
            
                | 306 |  |  |  | 
            
                                                                        
                            
            
                                    
            
            
                | 307 |  |  | 	// Generate the SQL query and arguments. | 
            
                                                                        
                            
            
                                    
            
            
                | 308 |  |  | 	var query string | 
            
                                                                        
                            
            
                                    
            
            
                | 309 |  |  | 	query, args, err = builder.ToSql() | 
            
                                                                        
                            
            
                                    
            
            
                | 310 |  |  | 	if err != nil { | 
            
                                                                        
                            
            
                                    
            
            
                | 311 |  |  | 		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) | 
            
                                                                        
                            
            
                                    
            
            
                | 312 |  |  | 	} | 
            
                                                                        
                            
            
                                    
            
            
                | 313 |  |  |  | 
            
                                                                        
                            
            
                                    
            
            
                | 314 |  |  | 	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args)) | 
            
                                                                        
                            
            
                                    
            
            
                | 315 |  |  |  | 
            
                                                                        
                            
            
                                    
            
            
                | 316 |  |  | 	// Execute the SQL query and retrieve the result rows. | 
            
                                                                        
                            
            
                                    
            
            
                | 317 |  |  | 	var rows pgx.Rows | 
            
                                                                        
                            
            
                                    
            
            
                | 318 |  |  | 	rows, err = r.database.ReadPool.Query(ctx, query, args...) | 
            
                                                                        
                            
            
                                    
            
            
                | 319 |  |  | 	if err != nil { | 
            
                                                                        
                            
            
                                    
            
            
                | 320 |  |  | 		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION) | 
            
                                                                        
                            
            
                                    
            
            
                | 321 |  |  | 	} | 
            
                                                                        
                            
            
                                    
            
            
                | 322 |  |  | 	defer rows.Close() | 
            
                                                                        
                            
            
                                    
            
            
                | 323 |  |  |  | 
            
                                                                        
                            
            
                                    
            
            
                | 324 |  |  | 	// Process the result rows and store the attributes in an AttributeCollection. | 
            
                                                                        
                            
            
                                    
            
            
                | 325 |  |  | 	collection := database.NewAttributeCollection() | 
            
                                                                        
                            
            
                                    
            
            
                | 326 |  |  | 	for rows.Next() { | 
            
                                                                        
                            
            
                                    
            
            
                | 327 |  |  | 		rt := storage.Attribute{} | 
            
                                                                        
                            
            
                                    
            
            
                | 328 |  |  |  | 
            
                                                                        
                            
            
                                    
            
            
                | 329 |  |  | 		// `valueStr` temporarily holds the serialized value scanned from the row; it is decoded into `rt.Value` (an `*anypb.Any`) below. | 
            
                                                                        
                            
            
                                    
            
            
                | 330 |  |  | 		var valueStr string | 
            
                                                                        
                            
            
                                    
            
            
                | 331 |  |  |  | 
            
                                                                        
                            
            
                                    
            
            
                | 332 |  |  | 		// Scan the row from the database into the fields of `rt` and `valueStr`. | 
            
                                                                        
                            
            
                                    
            
            
                | 333 |  |  | 		err := rows.Scan(&rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr) | 
            
                                                                        
                            
            
                                    
            
            
                | 334 |  |  | 		if err != nil { | 
            
                                                                        
                            
            
                                    
            
            
                | 335 |  |  | 			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) | 
            
                                                                        
                            
            
                                    
            
            
                | 336 |  |  | 		} | 
            
                                                                        
                            
            
                                    
            
            
                | 337 |  |  |  | 
            
                                                                        
                            
            
                                    
            
            
                | 338 |  |  | 		// Unmarshal the JSON data from `valueStr` into `rt.Value`. | 
            
                                                                        
                            
            
                                    
            
            
                | 339 |  |  | 		rt.Value = &anypb.Any{} | 
            
                                                                        
                            
            
                                    
            
            
                | 340 |  |  | 		unmarshaler := &jsonpb.Unmarshaler{} | 
            
                                                                        
                            
            
                                    
            
            
                | 341 |  |  | 		err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value) | 
            
                                                                        
                            
            
                                    
            
            
                | 342 |  |  | 		if err != nil { | 
            
                                                                        
                            
            
                                    
            
            
                | 343 |  |  | 			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) | 
            
                                                                        
                            
            
                                    
            
            
                | 344 |  |  | 		} | 
            
                                                                        
                            
            
                                    
            
            
                | 345 |  |  |  | 
            
                                                                        
                            
            
                                    
            
            
                | 346 |  |  | 		collection.Add(rt.ToAttribute()) | 
            
                                                                        
                            
            
                                    
            
            
                | 347 |  |  | 	} | 
            
                                                                        
                            
            
                                    
            
            
                | 348 |  |  | 	if err = rows.Err(); err != nil { | 
            
                                                                        
                            
            
                                    
            
            
                | 349 |  |  | 		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) | 
            
                                                                        
                            
            
                                    
            
            
                | 350 |  |  | 	} | 
            
                                                                        
                            
            
                                    
            
            
                | 351 |  |  |  | 
            
                                                                        
                            
            
                                    
            
            
                | 352 |  |  | 	slog.DebugContext(ctx, "successfully retrieved attributes tuples from the database") | 
            
                                                                        
                            
            
                                    
            
            
                | 353 |  |  |  | 
            
                                                                        
                            
            
                                    
            
            
                | 354 |  |  | 	// Return an AttributeIterator created from the AttributeCollection. | 
            
                                                                        
                            
            
                                    
            
            
                | 355 |  |  | 	return collection.CreateAttributeIterator(), nil | 
            
                                                                                                            
                            
            
                                    
            
            
                | 356 |  |  | } | 
            
                                                                                                            
                            
            
                                    
            
            
                | 357 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 358 |  |  | // ReadAttributes reads multiple attributes from the storage based on the given filter and pagination. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 359 |  |  | func (r *DataReader) ReadAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string, pagination database.Pagination) (collection *database.AttributeCollection, ct database.EncodedContinuousToken, err error) { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 360 |  |  | 	// Start a new trace span and end it when the function exits. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 361 |  |  | 	ctx, span := internal.Tracer.Start(ctx, "data-reader.read-attributes") | 
            
                                                                                                            
                            
            
                                    
            
            
                | 362 |  |  | 	defer span.End() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 363 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 364 |  |  | 	slog.DebugContext(ctx, "reading attributes for tenant_id", slog.String("tenant_id", tenantID)) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 365 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 366 |  |  | 	// Decode the snapshot value. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 367 |  |  | 	var st token.SnapToken | 
            
                                                                                                            
                            
            
                                    
            
            
                | 368 |  |  | 	st, err = snapshot.EncodedToken{Value: snap}.Decode() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 369 |  |  | 	if err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 370 |  |  | 		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 371 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 372 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 373 |  |  | 	// Build the attributes query based on the provided filter, snapshot value, and pagination settings. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 374 |  |  | 	builder := r.database.Builder.Select("id, entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID}) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 375 |  |  | 	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 376 |  |  | 	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 377 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 378 |  |  | 	// Apply the pagination token and limit to the query. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 379 |  |  | 	if pagination.Token() != "" { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 380 |  |  | 		var t database.ContinuousToken | 
            
                                                                                                            
                            
            
                                    
            
            
                | 381 |  |  | 		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 382 |  |  | 		if err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 383 |  |  | 			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 384 |  |  | 		} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 385 |  |  | 		var v uint64 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 386 |  |  | 		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 387 |  |  | 		if err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 388 |  |  | 			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 389 |  |  | 		} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 390 |  |  | 		builder = builder.Where(squirrel.GtOrEq{"id": v}) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 391 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 392 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 393 |  |  | 	builder = builder.OrderBy("id") | 
            
                                                                                                            
                            
            
                                    
            
            
                | 394 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 395 |  |  | 	if pagination.PageSize() != 0 { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 396 |  |  | 		builder = builder.Limit(uint64(pagination.PageSize() + 1)) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 397 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 398 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 399 |  |  | 	// Generate the SQL query and arguments. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 400 |  |  | 	var query string | 
            
                                                                                                            
                            
            
                                    
            
            
                | 401 |  |  | 	var args []interface{} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 402 |  |  | 	query, args, err = builder.ToSql() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 403 |  |  | 	if err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 404 |  |  | 		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 405 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 406 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 407 |  |  | 	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args)) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 408 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 409 |  |  | 	// Execute the query and retrieve the rows. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 410 |  |  | 	var rows pgx.Rows | 
            
                                                                                                            
                            
            
                                    
            
            
                | 411 |  |  | 	rows, err = r.database.ReadPool.Query(ctx, query, args...) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 412 |  |  | 	if err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 413 |  |  | 		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 414 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 415 |  |  | 	defer rows.Close() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 416 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 417 |  |  | 	var lastID uint64 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 418 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 419 |  |  | 	// Iterate through the rows, scanning each one into a storage.Attribute and collecting the converted attributes. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 420 |  |  | 	attributes := make([]*base.Attribute, 0, pagination.PageSize()+1) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 421 |  |  | 	for rows.Next() { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 422 |  |  | 		rt := storage.Attribute{} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 423 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 424 |  |  | 		// `valueStr` temporarily holds the JSON text of the `value` column; it is decoded into `rt.Value` (an `*anypb.Any`) below. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 425 |  |  | 		var valueStr string | 
            
                                                                                                            
                            
            
                                    
            
            
                | 426 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 427 |  |  | 		// Scan the row from the database into the fields of `rt` and `valueStr`. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 428 |  |  | 		err := rows.Scan(&rt.ID, &rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 429 |  |  | 		if err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 430 |  |  | 			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 431 |  |  | 		} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 432 |  |  | 		lastID = rt.ID | 
            
                                                                                                            
                            
            
                                    
            
            
                | 433 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 434 |  |  | 		// Unmarshal the JSON data from `valueStr` into `rt.Value`. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 435 |  |  | 		rt.Value = &anypb.Any{} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 436 |  |  | 		unmarshaler := &jsonpb.Unmarshaler{} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 437 |  |  | 		err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 438 |  |  | 		if err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 439 |  |  | 			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 440 |  |  | 		} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 441 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 442 |  |  | 		attributes = append(attributes, rt.ToAttribute()) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 443 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 444 |  |  | 	// Check for any errors during iteration. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 445 |  |  | 	if err = rows.Err(); err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 446 |  |  | 		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 447 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 448 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 449 |  |  | 	slog.DebugContext(ctx, "successfully read attributes from the database") | 
            
                                                                                                            
                            
            
                                    
            
            
                | 450 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 451 |  |  | 	// Return the results and encoded continuous token for pagination. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 452 |  |  | 	if len(attributes) > int(pagination.PageSize()) { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 453 |  |  | 		return database.NewAttributeCollection(attributes[:pagination.PageSize()]...), utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil | 
            
                                                                                                            
                            
            
                                    
            
            
                | 454 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 455 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 456 |  |  | 	return database.NewAttributeCollection(attributes...), database.NewNoopContinuousToken().Encode(), nil | 
            
                                                                                                            
                            
            
                                    
            
            
                | 457 |  |  | } | 
            
                                                                                                            
                            
            
                                    
            
            
                | 458 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 459 |  |  | // QueryUniqueSubjectReferences reads unique subject references from the storage based on the given filter and pagination. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 460 |  |  | func (r *DataReader) QueryUniqueSubjectReferences(ctx context.Context, tenantID string, subjectReference *base.RelationReference, excluded []string, snap string, pagination database.Pagination) (ids []string, ct database.EncodedContinuousToken, err error) { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 461 |  |  | 	// Start a new trace span and end it when the function exits. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 462 |  |  | 	ctx, span := internal.Tracer.Start(ctx, "data-reader.query-unique-subject-reference") | 
            
                                                                                                            
                            
            
                                    
            
            
                | 463 |  |  | 	defer span.End() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 464 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 465 |  |  | 	slog.DebugContext(ctx, "querying unique subject references for tenant_id", slog.String("tenant_id", tenantID)) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 466 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 467 |  |  | 	// Decode the snapshot value. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 468 |  |  | 	var st token.SnapToken | 
            
                                                                                                            
                            
            
                                    
            
            
                | 469 |  |  | 	st, err = snapshot.EncodedToken{Value: snap}.Decode() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 470 |  |  | 	if err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 471 |  |  | 		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 472 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 473 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 474 |  |  | 	// Build the relationships query based on the provided filter, snapshot value, and pagination settings. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 475 |  |  | 	builder := r.database.Builder. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 476 |  |  | 		Select("subject_id"). // This will pick the smallest `id` for each unique `subject_id`. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 477 |  |  | 		From(RelationTuplesTable). | 
            
                                                                                                            
                            
            
                                    
            
            
                | 478 |  |  | 		Where(squirrel.Eq{"tenant_id": tenantID}). | 
            
                                                                                                            
                            
            
                                    
            
            
                | 479 |  |  | 		GroupBy("subject_id") | 
            
                                                                                                            
                            
            
                                    
            
            
                | 480 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 481 |  |  | 	// Apply subject filter | 
            
                                                                                                            
                            
            
                                    
            
            
                | 482 |  |  | 	builder = utils.TuplesFilterQueryForSelectBuilder(builder, &base.TupleFilter{ | 
            
                                                                                                            
                            
            
                                    
            
            
                | 483 |  |  | 		Subject: &base.SubjectFilter{ | 
            
                                                                                                            
                            
            
                                    
            
            
                | 484 |  |  | 			Type:     subjectReference.GetType(), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 485 |  |  | 			Relation: subjectReference.GetRelation(), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 486 |  |  | 		}, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 487 |  |  | 	}) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 488 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 489 |  |  | 	// Apply snapshot filter | 
            
                                                                                                            
                            
            
                                    
            
            
                | 490 |  |  | 	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 491 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 492 |  |  | 	// Apply exclusion if the list is not empty | 
            
                                                                                                            
                            
            
                                    
            
            
                | 493 |  |  | 	if len(excluded) > 0 { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 494 |  |  | 		builder = builder.Where(squirrel.NotEq{"subject_id": excluded}) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 495 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 496 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 497 |  |  | 	// Apply the pagination token and limit to the query. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 498 |  |  | 	if pagination.Token() != "" { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 499 |  |  | 		var t database.ContinuousToken | 
            
                                                                                                            
                            
            
                                    
            
            
                | 500 |  |  | 		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 501 |  |  | 		if err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 502 |  |  | 			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 503 |  |  | 		} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 504 |  |  | 		builder = builder.Where(squirrel.GtOrEq{"subject_id": t.(utils.ContinuousToken).Value}) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 505 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 506 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 507 |  |  | 	builder = builder.OrderBy("subject_id") | 
            
                                                                                                            
                            
            
                                    
            
            
                | 508 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 509 |  |  | 	if pagination.PageSize() != 0 { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 510 |  |  | 		builder = builder.Limit(uint64(pagination.PageSize() + 1)) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 511 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 512 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 513 |  |  | 	// Generate the SQL query and arguments. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 514 |  |  | 	var query string | 
            
                                                                                                            
                            
            
                                    
            
            
                | 515 |  |  | 	var args []interface{} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 516 |  |  | 	query, args, err = builder.ToSql() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 517 |  |  | 	if err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 518 |  |  | 		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 519 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 520 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 521 |  |  | 	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args)) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 522 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 523 |  |  | 	// Execute the query and retrieve the rows. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 524 |  |  | 	var rows pgx.Rows | 
            
                                                                                                            
                            
            
                                    
            
            
                | 525 |  |  | 	rows, err = r.database.ReadPool.Query(ctx, query, args...) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 526 |  |  | 	if err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 527 |  |  | 		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 528 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 529 |  |  | 	defer rows.Close() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 530 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 531 |  |  | 	var lastID string | 
            
                                                                                                            
                            
            
                                    
            
            
                | 532 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 533 |  |  | 	// Iterate through the rows and scan the result into a RelationTuple struct. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 534 |  |  | 	subjectIDs := make([]string, 0, pagination.PageSize()+1) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 535 |  |  | 	for rows.Next() { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 536 |  |  | 		var subjectID string | 
            
                                                                                                            
                            
            
                                    
            
            
                | 537 |  |  | 		err = rows.Scan(&subjectID) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 538 |  |  | 		if err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 539 |  |  | 			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 540 |  |  | 		} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 541 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 542 |  |  | 		subjectIDs = append(subjectIDs, subjectID) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 543 |  |  | 		lastID = subjectID | 
            
                                                                                                            
                            
            
                                    
            
            
                | 544 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 545 |  |  | 	// Check for any errors during iteration. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 546 |  |  | 	if err = rows.Err(); err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 547 |  |  | 		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 548 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 549 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 550 |  |  | 	slog.DebugContext(ctx, "successfully retrieved unique subject references from the database") | 
            
                                                                                                            
                            
            
                                    
            
            
                | 551 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 552 |  |  | 	// Return the results and encoded continuous token for pagination. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 553 |  |  | 	if pagination.PageSize() != 0 && len(subjectIDs) > int(pagination.PageSize()) { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 554 |  |  | 		return subjectIDs[:pagination.PageSize()], utils.NewContinuousToken(lastID).Encode(), nil | 
            
                                                                                                            
                            
            
                                    
            
            
                | 555 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 556 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 557 |  |  | 	return subjectIDs, database.NewNoopContinuousToken().Encode(), nil | 
            
                                                                                                            
                            
            
                                    
            
            
                | 558 |  |  | } | 
            
                                                                                                            
                            
            
                                    
            
            
                | 559 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 560 |  |  | // HeadSnapshot retrieves the latest snapshot token associated with the tenant. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 561 |  |  | func (r *DataReader) HeadSnapshot(ctx context.Context, tenantID string) (token.SnapToken, error) { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 562 |  |  | 	// Start a new trace span and end it when the function exits. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 563 |  |  | 	ctx, span := internal.Tracer.Start(ctx, "data-reader.head-snapshot") | 
            
                                                                                                            
                            
            
                                    
            
            
                | 564 |  |  | 	defer span.End() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 565 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 566 |  |  | 	slog.DebugContext(ctx, "getting head snapshot for tenant_id", slog.String("tenant_id", tenantID)) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 567 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 568 |  |  | 	var xid types.XID8 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 569 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 570 |  |  | 	// Build the query to find the highest transaction ID associated with the tenant. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 571 |  |  | 	builder := r.database.Builder.Select("id").From(TransactionsTable).Where(squirrel.Eq{"tenant_id": tenantID}).OrderBy("id DESC").Limit(1) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 572 |  |  | 	query, args, err := builder.ToSql() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 573 |  |  | 	if err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 574 |  |  | 		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 575 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 576 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 577 |  |  | 	// TODO: To optimize this query, create the following index concurrently to avoid table locks: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 578 |  |  | 	// CREATE INDEX CONCURRENTLY idx_transactions_tenant_id_id ON transactions(tenant_id, id DESC); | 
            
                                                                                                            
                            
            
                                    
            
            
                | 579 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 580 |  |  | 	// Execute the query and retrieve the highest transaction ID. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 581 |  |  | 	err = r.database.ReadPool.QueryRow(ctx, query, args...).Scan(&xid) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 582 |  |  | 	if err != nil { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 583 |  |  | 		// If no rows are found, return a snapshot token with a value of 0. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 584 |  |  | 		if errors.Is(err, pgx.ErrNoRows) { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 585 |  |  | 			return snapshot.Token{Value: types.XID8{Uint: 0}}, nil | 
            
                                                                                                            
                            
            
                                    
            
            
                | 586 |  |  | 		} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 587 |  |  | 		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 588 |  |  | 	} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 589 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 590 |  |  | 	slog.DebugContext(ctx, "successfully retrieved latest snapshot token") | 
            
                                                                                                            
                            
            
                                    
            
            
                | 591 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 592 |  |  | 	// Return the latest snapshot token associated with the tenant. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 593 |  |  | 	return snapshot.Token{Value: xid}, nil | 
            
                                                                                                            
                                                                
            
                                    
            
            
                | 594 |  |  | } | 
            
                                                        
            
                                    
            
            
                | 595 |  |  |  |