Passed
Push — master ( 529ae6...944870 )
by Tolga
06:00 queued 02:43
created

internal/storage/postgres/bundle_writer.go   A

Size/Duplication

Total Lines 98
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
cc 9
eloc 59
dl 0
loc 98
rs 10
c 0
b 0
f 0

3 Methods

Rating   Name   Duplication   Size   Complexity  
B postgres.*BundleWriter.Write 0 41 5
A postgres.NewBundleWriter 0 4 1
A postgres.*BundleWriter.Delete 0 24 3
1
package postgres
2
3
import (
4
	"context"
5
	"log/slog"
6
7
	"github.com/jackc/pgx/v5"
8
9
	"github.com/Masterminds/squirrel"
10
	"google.golang.org/protobuf/encoding/protojson"
11
12
	"github.com/Permify/permify/internal"
13
	"github.com/Permify/permify/internal/storage"
14
	"github.com/Permify/permify/internal/storage/postgres/utils"
15
	db "github.com/Permify/permify/pkg/database/postgres"
16
	base "github.com/Permify/permify/pkg/pb/base/v1"
17
)
18
19
type BundleWriter struct {
20
	database *db.Postgres
21
	// options
22
	txOptions pgx.TxOptions
23
}
24
25
func NewBundleWriter(database *db.Postgres) *BundleWriter {
26
	return &BundleWriter{
27
		database:  database,
28
		txOptions: pgx.TxOptions{IsoLevel: pgx.ReadCommitted, AccessMode: pgx.ReadWrite},
29
	}
30
}
31
32
func (b *BundleWriter) Write(ctx context.Context, bundles []storage.Bundle) (names []string, err error) {
33
	ctx, span := internal.Tracer.Start(ctx, "bundle-writer.write-bundle")
34
	defer span.End()
35
36
	slog.DebugContext(ctx, "writing bundles to the database", slog.Any("number_of_bundles", len(bundles)))
37
38
	insertBuilder := b.database.Builder.Insert(BundlesTable).
39
		Columns("name, payload, tenant_id").
40
		Suffix("ON CONFLICT (name, tenant_id) DO UPDATE SET payload = EXCLUDED.payload")
41
42
	for _, bundle := range bundles {
43
44
		names = append(names, bundle.Name)
45
46
		jsonBytes, err := protojson.Marshal(bundle.DataBundle)
47
		if err != nil {
48
			return names, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_ARGUMENT)
49
		}
50
		jsonStr := string(jsonBytes)
51
52
		insertBuilder = insertBuilder.Values(bundle.Name, jsonStr, bundle.TenantID)
53
	}
54
55
	var query string
56
	var args []interface{}
57
58
	query, args, err = insertBuilder.ToSql()
59
	if err != nil {
60
		return names, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
61
	}
62
63
	slog.DebugContext(ctx, "executing sql insert query", slog.Any("query", query), slog.Any("arguments", args))
64
65
	_, err = b.database.WritePool.Exec(ctx, query, args...)
66
	if err != nil {
67
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
68
	}
69
70
	slog.DebugContext(ctx, "successfully wrote bundles to the database", slog.Any("number_of_bundles", len(bundles)))
71
72
	return
73
}
74
75
func (b *BundleWriter) Delete(ctx context.Context, tenantID, name string) (err error) {
76
	ctx, span := internal.Tracer.Start(ctx, "bundle-writer.delete-bundle")
77
	defer span.End()
78
79
	slog.DebugContext(ctx, "deleting bundle", slog.Any("bundle", name))
80
81
	deleteBuilder := b.database.Builder.Delete(BundlesTable).Where(squirrel.Eq{"name": name, "tenant_id": tenantID})
82
83
	var query string
84
	var args []interface{}
85
86
	query, args, err = deleteBuilder.ToSql()
87
	if err != nil {
88
		return utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
89
	}
90
91
	_, err = b.database.WritePool.Exec(ctx, query, args...)
92
	if err != nil {
93
		return utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
94
	}
95
96
	slog.DebugContext(ctx, "bundle successfully deleted")
97
98
	return nil
99
}
100