org.apereo.cas.logging.CloudWatchAppender.start()   C
last analyzed

Complexity

Conditions 7

Size

Total Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 26
c 0
b 0
f 0
cc 7
rs 5.5
1
package org.apereo.cas.logging;
2
3
import com.amazonaws.auth.AWSStaticCredentialsProvider;
4
import com.amazonaws.auth.BasicAWSCredentials;
5
import com.amazonaws.services.logs.AWSLogs;
6
import com.amazonaws.services.logs.AWSLogsClient;
7
import com.amazonaws.services.logs.AWSLogsClientBuilder;
8
import com.amazonaws.services.logs.model.CreateLogGroupRequest;
9
import com.amazonaws.services.logs.model.CreateLogStreamRequest;
10
import com.amazonaws.services.logs.model.DataAlreadyAcceptedException;
11
import com.amazonaws.services.logs.model.DescribeLogGroupsRequest;
12
import com.amazonaws.services.logs.model.DescribeLogGroupsResult;
13
import com.amazonaws.services.logs.model.DescribeLogStreamsRequest;
14
import com.amazonaws.services.logs.model.DescribeLogStreamsResult;
15
import com.amazonaws.services.logs.model.InputLogEvent;
16
import com.amazonaws.services.logs.model.InvalidSequenceTokenException;
17
import com.amazonaws.services.logs.model.LogStream;
18
import com.amazonaws.services.logs.model.PutLogEventsRequest;
19
import com.amazonaws.services.logs.model.PutLogEventsResult;
20
import org.apache.commons.lang3.StringUtils;
21
import org.apache.logging.log4j.core.Layout;
22
import org.apache.logging.log4j.core.LogEvent;
23
import org.apache.logging.log4j.core.appender.AbstractAppender;
24
import org.apache.logging.log4j.core.config.plugins.Plugin;
25
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
26
import org.apache.logging.log4j.core.config.plugins.PluginElement;
27
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
28
import org.apache.logging.log4j.core.layout.PatternLayout;
29
30
import java.io.Serializable;
31
import java.nio.charset.StandardCharsets;
32
import java.util.ArrayList;
33
import java.util.Comparator;
34
import java.util.List;
35
import java.util.concurrent.BlockingQueue;
36
import java.util.concurrent.LinkedBlockingQueue;
37
38
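/**
 * This is {@link CloudWatchAppender}, a Log4j2 appender that queues formatted
 * log events and delivers them in batches to AWS CloudWatch Logs from a
 * background delivery thread.
 *
 * @author Misagh Moayyed
 * @since 5.1.0
 */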
44
@Plugin(name = "CloudWatchAppender", category = "Core", elementType = "appender", printObject = true)
45
public class CloudWatchAppender extends AbstractAppender {
46
    private static final long serialVersionUID = 1044758913028847477L;
47
48
    private static final int AWS_DRAIN_LIMIT = 256;
49
    private static final int AWS_LOG_STREAM_MAX_QUEUE_DEPTH = 10000;
50
    private static final int SHUTDOWN_TIMEOUT_MILLIS = 10000;
51
    private static final int AWS_LOG_STREAM_FLUSH_PERIOD_IN_SECONDS = 5;
52
53
    private final BlockingQueue<InputLogEvent> queue = new LinkedBlockingQueue<>(AWS_LOG_STREAM_MAX_QUEUE_DEPTH);
54
    private volatile boolean shutdown;
55
    private int flushPeriodMillis;
56
    private Thread deliveryThread;
57
    private final Object monitor = new Object();
58
59
    /**
60
     * Every PutLogEvents request must include the sequenceToken obtained from the response of the previous request.
61
     */
62
    private String sequenceTokenCache;
63
    private long lastReportedTimestamp = -1;
64
65
    private String logGroupName;
66
    private String logStreamName;
67
    private AWSLogs awsLogsClient;
68
    private volatile boolean queueFull;
69
70
    /**
     * Creates the appender, connects to AWS CloudWatch Logs and makes sure the
     * configured log group and log stream exist.
     * <p>
     * Failures while connecting to AWS are logged and not rethrown, so a broken
     * configuration never prevents the logging system from starting.
     *
     * @param name                             the appender name
     * @param awsLogGroupName                  the CloudWatch log group name
     * @param awsLogStreamName                 the CloudWatch log stream name
     * @param awsLogStreamFlushPeriodInSeconds the flush period in seconds; blank, non-numeric or
     *                                         non-positive values fall back to the default period
     * @param credentialAccessKey              the AWS access key
     * @param credentialSecretKey              the AWS secret key
     * @param awsLogRegionName                 the AWS region name
     * @param layout                           the layout; the default pattern layout is used when {@code null}
     */
    public CloudWatchAppender(final String name,
                              final String awsLogGroupName,
                              final String awsLogStreamName,
                              final String awsLogStreamFlushPeriodInSeconds,
                              final String credentialAccessKey,
                              final String credentialSecretKey,
                              final String awsLogRegionName,
                              final Layout<Serializable> layout) {
        super(name, null, layout == null ? PatternLayout.createDefaultLayout() : layout, false);

        // Resolve the flush period on its own: a bad value must neither abort the AWS
        // setup below nor leave the period at 0 (Object.wait(0) blocks indefinitely).
        int flushPeriod = AWS_LOG_STREAM_FLUSH_PERIOD_IN_SECONDS;
        if (StringUtils.isNotBlank(awsLogStreamFlushPeriodInSeconds)) {
            try {
                final int configured = Integer.parseInt(awsLogStreamFlushPeriodInSeconds.trim());
                if (configured > 0) {
                    flushPeriod = configured;
                } else {
                    LOGGER.warn("Flush period [{}] must be positive; using default of [{}] seconds",
                            awsLogStreamFlushPeriodInSeconds, AWS_LOG_STREAM_FLUSH_PERIOD_IN_SECONDS);
                }
            } catch (final NumberFormatException e) {
                LOGGER.warn("Flush period [{}] is not a valid number; using default of [{}] seconds",
                        awsLogStreamFlushPeriodInSeconds, AWS_LOG_STREAM_FLUSH_PERIOD_IN_SECONDS);
            }
        }
        // Multiply as long and clamp so very large periods cannot overflow into a negative int.
        flushPeriodMillis = (int) Math.min(flushPeriod * 1_000L, Integer.MAX_VALUE);

        this.logGroupName = awsLogGroupName;
        this.logStreamName = awsLogStreamName;

        try {
            LOGGER.debug("Connecting to AWS CloudWatch...");
            final AWSLogsClientBuilder builder = AWSLogsClient.builder();
            final BasicAWSCredentials credentials = new BasicAWSCredentials(credentialAccessKey, credentialSecretKey);
            builder.setCredentials(new AWSStaticCredentialsProvider(credentials));
            builder.setRegion(awsLogRegionName);
            this.awsLogsClient = builder.build();
            this.sequenceTokenCache = createLogGroupAndLogStreamIfNeeded();
        } catch (final Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
    }
99
100
    private void flush() {
101
        int drained;
102
        final List<InputLogEvent> logEvents = new ArrayList<>(AWS_DRAIN_LIMIT);
103
        do {
104
            drained = queue.drainTo(logEvents, AWS_DRAIN_LIMIT);
105
            if (logEvents.isEmpty()) {
106
                break;
107
            }
108
            logEvents.sort(Comparator.comparing(InputLogEvent::getTimestamp));
109
            if (lastReportedTimestamp > 0) {
110
                for (final InputLogEvent event : logEvents) {
111
                    if (event.getTimestamp() < lastReportedTimestamp) {
112
                        event.setTimestamp(lastReportedTimestamp);
113
                    }
114
                }
115
            }
116
117
            lastReportedTimestamp = logEvents.get(logEvents.size() - 1).getTimestamp();
118
            final PutLogEventsRequest putLogEventsRequest = new PutLogEventsRequest(logGroupName, logStreamName, logEvents);
119
            putLogEventsRequest.setSequenceToken(sequenceTokenCache);
120
            try {
121
                final PutLogEventsResult putLogEventsResult = awsLogsClient.putLogEvents(putLogEventsRequest);
122
                sequenceTokenCache = putLogEventsResult.getNextSequenceToken();
123
            } catch (final DataAlreadyAcceptedException daae) {
124
                sequenceTokenCache = daae.getExpectedSequenceToken();
125
            } catch (final InvalidSequenceTokenException iste) {
126
                sequenceTokenCache = iste.getExpectedSequenceToken();
127
            } catch (final Exception e) {
128
                LOGGER.error(e.getMessage(), e);
129
            }
130
            logEvents.clear();
131
        } while (drained >= AWS_DRAIN_LIMIT);
132
    }
133
134
    @Override
135
    public void append(final LogEvent logEvent) {
136
        final LogEvent event = LoggingUtils.prepareLogEvent(logEvent);
137
        final InputLogEvent awsLogEvent = new InputLogEvent();
138
        final long timestamp = event.getTimeMillis();
139
        final String message = new String(getLayout().toByteArray(event), StandardCharsets.UTF_8);
140
        awsLogEvent.setTimestamp(timestamp);
141
        awsLogEvent.setMessage(message);
142
        if (!queue.offer(awsLogEvent) && !queueFull) {
143
            queueFull = true;
144
        } else if (queueFull) {
145
            queueFull = false;
146
        }
147
    }
148
149
    private String createLogGroupAndLogStreamIfNeeded() {
150
        LOGGER.debug("Attempting to locate the log group [{}]", logGroupName);
151
        final DescribeLogGroupsResult describeLogGroupsResult =
152
                awsLogsClient.describeLogGroups(new DescribeLogGroupsRequest().withLogGroupNamePrefix(logGroupName));
153
        boolean createLogGroup = true;
154
        if (describeLogGroupsResult != null && describeLogGroupsResult.getLogGroups() != null && !describeLogGroupsResult.getLogGroups().isEmpty()) {
155
            createLogGroup = !describeLogGroupsResult.getLogGroups().stream().anyMatch(g -> g.getLogGroupName().equals(logGroupName));
156
        }
157
        if (createLogGroup) {
158
            LOGGER.debug("Creating log group [{}]", logGroupName);
159
            final CreateLogGroupRequest createLogGroupRequest = new CreateLogGroupRequest(logGroupName);
160
            awsLogsClient.createLogGroup(createLogGroupRequest);
161
        }
162
        String logSequenceToken = null;
163
        boolean createLogStream = true;
164
        LOGGER.debug("Attempting to locate the log stream [{}] for group [{}]", logStreamName, logGroupName);
165
        final DescribeLogStreamsRequest describeLogStreamsRequest = new DescribeLogStreamsRequest(logGroupName).withLogStreamNamePrefix(logStreamName);
166
        final DescribeLogStreamsResult describeLogStreamsResult = awsLogsClient.describeLogStreams(describeLogStreamsRequest);
167
        if (describeLogStreamsResult != null && describeLogStreamsResult.getLogStreams() != null && !describeLogStreamsResult.getLogStreams().isEmpty()) {
168
            for (final LogStream ls : describeLogStreamsResult.getLogStreams()) {
169
                if (logStreamName.equals(ls.getLogStreamName())) {
170
                    createLogStream = false;
171
                    logSequenceToken = ls.getUploadSequenceToken();
172
                    LOGGER.debug("Found log stream [{}] with sequence token [{}]", logStreamName, logSequenceToken);
173
                    break;
174
                }
175
            }
176
        }
177
178
        if (createLogStream) {
179
            LOGGER.debug("Creating log stream [{}] for group [{}]", logStreamName, logGroupName);
180
            final CreateLogStreamRequest createLogStreamRequest = new CreateLogStreamRequest(logGroupName, logStreamName);
181
            awsLogsClient.createLogStream(createLogStreamRequest);
182
        }
183
        return logSequenceToken;
184
    }
185
186
    @Override
187
    public void start() {
188
        super.start();
189
        this.deliveryThread = new Thread(() -> {
190
            while (!shutdown) {
191
                try {
192
                    flush();
193
                } catch (final Exception e) {
194
                    LOGGER.error(e.getMessage(), e);
195
                }
196
                if (!shutdown && queue.size() < AWS_DRAIN_LIMIT) {
197
                    try {
198
                        synchronized (monitor) {
199
                            monitor.wait(flushPeriodMillis);
200
                        }
201
                    } catch (final InterruptedException e) {
202
                        LOGGER.error(e.getMessage(), e);
203
                    }
204
                }
205
            }
206
207
            while (!queue.isEmpty()) {
208
                flush();
209
            }
210
        }, "CloudWatchAppenderDeliveryThread");
211
        deliveryThread.start();
212
    }
213
214
    @Override
215
    public void stop() {
216
        super.stop();
217
        shutdown = true;
218
        if (deliveryThread != null) {
219
            synchronized (monitor) {
220
                monitor.notify();
221
            }
222
            try {
223
                deliveryThread.join(SHUTDOWN_TIMEOUT_MILLIS);
224
            } catch (final InterruptedException e) {
225
                LOGGER.error(e.getMessage(), e);
226
            }
227
        }
228
        if (queue.size() > 0) {
229
            flush();
230
        }
231
    }
232
233
    /**
234
     * Create appender cloud watch appender.
235
     *
236
     * @param name                             the name
237
     * @param awsLogStreamName                 the aws log stream name
238
     * @param awsLogGroupName                  the aws log group name
239
     * @param awsLogStreamFlushPeriodInSeconds the aws log stream flush period in seconds
240
     * @param credentialAccessKey              the credential access key
241
     * @param credentialSecretKey              the credential secret key
242
     * @param awsLogRegionName                 the aws log region name
243
     * @param layout                           the layout
244
     * @return the cloud watch appender
245
     */
246
    @PluginFactory
247
    public static CloudWatchAppender createAppender(@PluginAttribute("name") final String name,
248
                                                    @PluginAttribute("awsLogStreamName") final String awsLogStreamName,
249
                                                    @PluginAttribute("awsLogGroupName") final String awsLogGroupName,
250
                                                    @PluginAttribute("awsLogStreamFlushPeriodInSeconds") final String awsLogStreamFlushPeriodInSeconds,
251
                                                    @PluginAttribute("credentialAccessKey") final String credentialAccessKey,
252
                                                    @PluginAttribute("credentialSecretKey") final String credentialSecretKey,
253
                                                    @PluginAttribute("awsLogRegionName") final String awsLogRegionName,
254
                                                    @PluginElement("Layout") final Layout<Serializable> layout) {
255
        return new CloudWatchAppender(
256
                name,
257
                awsLogGroupName,
258
                awsLogStreamName,
259
                awsLogStreamFlushPeriodInSeconds,
260
                StringUtils.defaultIfBlank(credentialAccessKey, System.getProperty("AWS_ACCESS_KEY")),
261
                StringUtils.defaultIfBlank(credentialSecretKey, System.getProperty("AWS_SECRET_KEY")),
262
                StringUtils.defaultIfBlank(awsLogRegionName, System.getProperty("AWS_REGION_NAME")),
263
                layout);
264
    }
265
}
266