Passed
Pull Request — master (#2657)
by
unknown
03:44
created

servers.*PermissionServer.LookupEntityStream   A

Complexity

Conditions 3

Size

Total Lines 18
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 13
nop 2
dl 0
loc 18
rs 9.75
c 0
b 0
f 0
1
package servers
2
3
import (
4
	"context"
5
	"log/slog"
6
	"errors"
7
	"sync"
8
9
	otelCodes "go.opentelemetry.io/otel/codes"
10
	"google.golang.org/grpc/status"
11
12
	"github.com/Permify/permify/internal"
13
	"github.com/Permify/permify/internal/invoke"
14
	v1 "github.com/Permify/permify/pkg/pb/base/v1"
15
)
16
17
// PermissionServer - Structure for Permission Server
18
type PermissionServer struct {
19
	v1.UnimplementedPermissionServer
20
21
	invoker invoke.Invoker
22
}
23
24
// NewPermissionServer - Creates new Permission Server
25
func NewPermissionServer(i invoke.Invoker) *PermissionServer {
26
	return &PermissionServer{
27
		invoker: i,
28
	}
29
}
30
31
// Check - Performs Authorization Check
32
func (r *PermissionServer) Check(ctx context.Context, request *v1.PermissionCheckRequest) (*v1.PermissionCheckResponse, error) {
33
	ctx, span := internal.Tracer.Start(ctx, "permissions.check")
34
	defer span.End()
35
36
	v := request.Validate()
37
	if v != nil {
38
		return nil, status.Error(GetStatus(v), v.Error()) // Return validation error
39
	}
40
41
	response, err := r.invoker.Check(ctx, request)
42
	if err != nil {
43
		span.RecordError(err)
44
		span.SetStatus(otelCodes.Error, err.Error())
45
		slog.ErrorContext(ctx, err.Error())
46
		return nil, status.Error(GetStatus(err), err.Error())
47
	}
48
49
	return response, nil
50
}
51
52
// BulkCheck - Performs multiple authorization checks in a single request
53
func (r *PermissionServer) BulkCheck(ctx context.Context, request *v1.PermissionBulkCheckRequest) (*v1.PermissionBulkCheckResponse, error) {
54
	// emptyResp is a default, empty response that we will return in case of an error or when the context is cancelled.
55
	emptyResp := &v1.PermissionBulkCheckResponse{
56
		Results: make([]*v1.PermissionCheckResponse, 0),
57
	}
58
59
	ctx, span := internal.Tracer.Start(ctx, "permissions.bulk-check")
60
	defer span.End()
61
62
	// Validate tenant_id
63
	if request.GetTenantId() == "" {
64
		err := status.Error(GetStatus(nil), "tenant_id is required")
65
		span.RecordError(err)
66
		span.SetStatus(otelCodes.Error, err.Error())
67
		return nil, err
68
	}
69
70
	checkItems := request.GetItems()
71
72
	// Validate number of requests
73
	if len(checkItems) == 0 {
74
		err := status.Error(GetStatus(nil), "at least one item is required")
75
		span.RecordError(err)
76
		span.SetStatus(otelCodes.Error, err.Error())
77
		return nil, err
78
	}
79
80
	if len(checkItems) > 100 {
81
		err := status.Error(GetStatus(nil), "maximum 100 items allowed")
82
		span.RecordError(err)
83
		span.SetStatus(otelCodes.Error, err.Error())
84
		return nil, err
85
	}
86
87
	// Create a buffered channel for BulkPermissionCheckResponses.
88
	// The buffer size is equal to the number of references in the entity.
89
	type ResultChannel struct {int; *v1.PermissionCheckResponse}
90
	resultChannel := make(chan ResultChannel, len(checkItems))
91
92
	// The WaitGroup and Mutex are used for synchronization.
93
	var wg sync.WaitGroup
94
	var mutex sync.Mutex
95
96
	// Process each check request
97
	for i, checkRequestItem := range checkItems {
98
		wg.Add(1)
99
100
		go func(checkRequestItem *v1.PermissionBulkCheckRequestItem) {
101
			defer wg.Done()
102
103
			// Validate individual request
104
			v := checkRequestItem.Validate()
105
			if v != nil {
106
				// Return error response for this check
107
				resultChannel <- ResultChannel{
108
					i,
109
					&v1.PermissionCheckResponse{
110
						Can: v1.CheckResult_CHECK_RESULT_DENIED,
111
						Metadata: &v1.PermissionCheckResponseMetadata{
112
							CheckCount: 0,
113
						},
114
					},
115
				}
116
				return
117
			}
118
119
			// Perform the check using existing Check function
120
			checkRequest := &v1.PermissionCheckRequest{
121
				TenantId:      request.GetTenantId(),
122
				Subject:       checkRequestItem.GetSubject(),
123
				Entity:        checkRequestItem.GetEntity(),
124
				Permission:    checkRequestItem.GetPermission(),
125
				Metadata: 	   request.GetMetadata(),
126
				Context:       request.GetContext(),
127
				Arguments:     request.GetArguments(),
128
			}
129
			response, err := r.invoker.Check(ctx, checkRequest)
130
			if err != nil {
131
				// Log error but don't fail the entire bulk operation
132
				slog.ErrorContext(ctx, "check failed in bulk operation", "error", err.Error(), "index", i)
133
				resultChannel <- ResultChannel{
134
					i,
135
					&v1.PermissionCheckResponse{
136
						Can: v1.CheckResult_CHECK_RESULT_DENIED,
137
						Metadata: &v1.PermissionCheckResponseMetadata{
138
							CheckCount: 0,
139
						},
140
					},
141
				}
142
				return
143
			}
144
145
			resultChannel <- ResultChannel{i, response}
146
		}(checkRequestItem)
147
	}
148
149
	// Once the function returns, we wait for all goroutines to finish, then close the resultChannel.
150
	defer func() {
151
		wg.Wait()
152
		close(resultChannel)
153
	}()
154
155
	// We read the responses from the resultChannel.
156
	// We expect as many responses as there are references in the entity.
157
	results := make([]*v1.PermissionCheckResponse, len(request.GetItems()))
158
	for range checkItems {
159
		select {
160
		// If we receive a response from the resultChannel, we check for errors.
161
		case response := <-resultChannel:
162
			// If there's no error, we add the result to our response's Results map.
163
			// We use a mutex to safely update the map since multiple goroutines may be writing to it concurrently.
164
			mutex.Lock()
165
			results[response.int] = response.PermissionCheckResponse
166
			mutex.Unlock()
167
168
		// If the context is done (i.e., canceled or deadline exceeded), we return an empty response and an error.
169
		case <-ctx.Done():
170
			return emptyResp, errors.New(v1.ErrorCode_ERROR_CODE_CANCELLED.String())
171
		}
172
	}
173
174
	return &v1.PermissionBulkCheckResponse{
175
		Results: results,
176
	}, nil
177
}
178
179
// Expand - Get schema actions in a tree structure
180
func (r *PermissionServer) Expand(ctx context.Context, request *v1.PermissionExpandRequest) (*v1.PermissionExpandResponse, error) {
181
	ctx, span := internal.Tracer.Start(ctx, "permissions.expand")
182
	defer span.End()
183
184
	v := request.Validate()
185
	if v != nil {
186
		return nil, status.Error(GetStatus(v), v.Error()) // Return validation error
187
	}
188
189
	response, err := r.invoker.Expand(ctx, request)
190
	if err != nil {
191
		span.RecordError(err)
192
		span.SetStatus(otelCodes.Error, err.Error())
193
		slog.ErrorContext(ctx, err.Error())
194
		return nil, status.Error(GetStatus(err), err.Error())
195
	}
196
197
	return response, nil
198
}
199
200
// LookupEntity -
201
func (r *PermissionServer) LookupEntity(ctx context.Context, request *v1.PermissionLookupEntityRequest) (*v1.PermissionLookupEntityResponse, error) {
202
	ctx, span := internal.Tracer.Start(ctx, "permissions.lookup-entity")
203
	defer span.End()
204
205
	v := request.Validate()
206
	if v != nil {
207
		return nil, status.Error(GetStatus(v), v.Error()) // Return validation error
208
	}
209
210
	response, err := r.invoker.LookupEntity(ctx, request)
211
	if err != nil {
212
		span.RecordError(err)
213
		span.SetStatus(otelCodes.Error, err.Error())
214
		slog.ErrorContext(ctx, err.Error())
215
		return nil, status.Error(GetStatus(err), err.Error())
216
	}
217
218
	return response, nil
219
}
220
221
// LookupEntityStream -
222
func (r *PermissionServer) LookupEntityStream(request *v1.PermissionLookupEntityRequest, server v1.Permission_LookupEntityStreamServer) error {
223
	ctx, span := internal.Tracer.Start(server.Context(), "permissions.lookup-entity-stream")
224
	defer span.End()
225
226
	v := request.Validate()
227
	if v != nil {
228
		return v
229
	}
230
231
	err := r.invoker.LookupEntityStream(ctx, request, server)
232
	if err != nil {
233
		span.RecordError(err)
234
		span.SetStatus(otelCodes.Error, err.Error())
235
		slog.ErrorContext(ctx, err.Error())
236
		return status.Error(GetStatus(err), err.Error())
237
	}
238
239
	return nil
240
}
241
242
// LookupSubject -
243
func (r *PermissionServer) LookupSubject(ctx context.Context, request *v1.PermissionLookupSubjectRequest) (*v1.PermissionLookupSubjectResponse, error) {
244
	ctx, span := internal.Tracer.Start(ctx, "permissions.lookup-subject")
245
	defer span.End()
246
247
	v := request.Validate()
248
	if v != nil {
249
		return nil, status.Error(GetStatus(v), v.Error()) // Return validation error
250
	}
251
252
	response, err := r.invoker.LookupSubject(ctx, request)
253
	if err != nil {
254
		span.RecordError(err)
255
		span.SetStatus(otelCodes.Error, err.Error())
256
		slog.ErrorContext(ctx, err.Error())
257
		return nil, status.Error(GetStatus(err), err.Error())
258
	}
259
260
	return response, nil
261
}
262
263
// SubjectPermission -
264
func (r *PermissionServer) SubjectPermission(ctx context.Context, request *v1.PermissionSubjectPermissionRequest) (*v1.PermissionSubjectPermissionResponse, error) {
265
	ctx, span := internal.Tracer.Start(ctx, "permissions.subject-permission")
266
	defer span.End()
267
268
	v := request.Validate()
269
	if v != nil {
270
		return nil, status.Error(GetStatus(v), v.Error()) // Return validation error
271
	}
272
273
	response, err := r.invoker.SubjectPermission(ctx, request)
274
	if err != nil {
275
		span.RecordError(err)
276
		span.SetStatus(otelCodes.Error, err.Error())
277
		slog.ErrorContext(ctx, err.Error())
278
		return nil, status.Error(GetStatus(err), err.Error())
279
	}
280
281
	return response, nil
282
}
283