1 | package cloudwatchlogs |
||
2 | |||
3 | import ( |
||
4 | "context" |
||
5 | "fmt" |
||
6 | "math" |
||
7 | "math/rand" |
||
8 | "time" |
||
9 | ) |
||
10 | |||
// defaultMaxRetry is the number of polling attempts XQueryResults uses when
// the request leaves MaxRetry at its zero value.
const defaultMaxRetry = 5
||
14 | |||
// defaultWaitFunc is the waiting strategy used when a request does not supply
// its own WaitFunc. It blocks for a random duration in [0, 2^i) seconds
// (exponential backoff with full jitter, starting from one second), where i
// is the zero-based attempt number.
var defaultWaitFunc = func(i int) {
	time.Sleep(backoffWithJitter(i, rand.Float64())) // #nosec G404
}

// backoffWithJitter returns jitter * 2^i seconds as a time.Duration.
// jitter is expected to be in [0, 1).
//
// The product is computed in floating point and converted to a Duration only
// at the very end so that sub-second precision is kept; converting the number
// of seconds to an integer first would truncate every wait to whole seconds
// (and the first wait to zero). A result that does not fit into a Duration is
// clamped to the largest Duration instead of overflowing.
func backoffWithJitter(i int, jitter float64) time.Duration {
	const initialWaitSec = 1

	ns := jitter * math.Pow(2, float64(i)) * initialWaitSec * float64(time.Second)
	if ns >= float64(math.MaxInt64) {
		return time.Duration(math.MaxInt64)
	}
	return time.Duration(ns)
}
||
20 | |||
21 | // XQueryResults executes a query and waits for fetching complete results. |
||
22 | func (svc *CloudwatchLogs) XQueryResults(ctx context.Context, r XQueryResultsRequest) (*GetQueryResultsResult, error) { |
||
23 | out, err := svc.StartQuery(ctx, StartQueryRequest{ |
||
24 | QueryString: r.QueryString, |
||
25 | StartTime: r.StartTime, |
||
26 | StartTimeInt: r.StartTimeInt, |
||
27 | EndTime: r.EndTime, |
||
28 | EndTimeInt: r.EndTimeInt, |
||
29 | Limit: r.Limit, |
||
30 | LogGroupName: r.LogGroupName, |
||
31 | LogGroupNames: r.LogGroupNames, |
||
32 | }) |
||
33 | if err != nil { |
||
34 | return nil, err |
||
35 | } |
||
36 | |||
37 | maxRetry := r.MaxRetry |
||
38 | if maxRetry == 0 { |
||
39 | maxRetry = defaultMaxRetry |
||
40 | } |
||
41 | waitFn := r.WaitFunc |
||
42 | if waitFn == nil { |
||
43 | waitFn = defaultWaitFunc |
||
44 | } |
||
45 | |||
46 | return svc.waitQueryResult(ctx, out.QueryID, maxRetry, waitFn) |
||
47 | } |
||
48 | |||
// XQueryResultsRequest is the input for XQueryResults.
//
// The query fields are forwarded unchanged to StartQuery; the extension
// fields control how the result is polled afterwards.
type XQueryResultsRequest struct {
	QueryString string
	// NOTE(review): StartTime/StartTimeInt and EndTime/EndTimeInt look like
	// alternative representations of the same bound (time value vs. integer
	// timestamp) -- confirm against StartQueryRequest.
	StartTime    time.Time
	StartTimeInt int64
	EndTime      time.Time
	EndTimeInt   int64

	// optional
	Limit         int64
	LogGroupName  string
	LogGroupNames []string

	// extension
	MaxRetry int         // default=5
	WaitFunc func(i int) // waiting strategy, default=exponential backoff with full jitter from 1sec
}
||
65 | |||
66 | func (svc *CloudwatchLogs) waitQueryResult(ctx context.Context, queryID string, maxRetry int, waitFn func(int)) (*GetQueryResultsResult, error) { |
||
67 | for i := 0; i < maxRetry; i++ { |
||
68 | waitFn(i) // wait |
||
69 | |||
70 | res, err := svc.GetQueryResults(ctx, GetQueryResultsRequest{ |
||
71 | QueryID: queryID, |
||
72 | }) |
||
73 | |||
74 | switch { |
||
75 | case err != nil: |
||
76 | return nil, err |
||
77 | case res.Status.IsComplete(): |
||
78 | return res, nil |
||
79 | case res.Status.IsRunning(), |
||
80 | res.Status.IsScheduled(): |
||
81 | // do nothing; continue loop |
||
82 | default: |
||
83 | return nil, fmt.Errorf("Invalid query status: id:[%s], status:[%s]", queryID, string(res.Status)) |
||
84 | } |
||
85 | } |
||
86 | return nil, fmt.Errorf("Timeout error: id:[%s]", queryID) |
||
87 | } |
||
88 |