consume(Consumer)   B
last analyzed

Complexity

Conditions 7

Size

Total Lines 43
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 27
CRAP Score 7.0022

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 28
c 1
b 0
f 0
dl 0
loc 43
ccs 27
cts 28
cp 0.9643
rs 7.808
cc 7
crap 7.0022
1
/*
2
 * This file is part of SingleInstance.
3
 *
4
 * SingleInstance is free software: you can redistribute it and/or modify
5
 * it under the terms of the GNU Lesser General Public License as published by
6
 * the Free Software Foundation, either version 3 of the License, or
7
 * (at your option) any later version.
8
 *
9
 * SingleInstance is distributed in the hope that it will be useful,
10
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
 * GNU Lesser General Public License for more details.
13
 *
14
 * You should have received a copy of the GNU Lesser General Public License
15
 * along with SingleInstance.  If not, see <https://www.gnu.org/licenses/>.
16
 *
17
 * Copyright (c) 2020 Vincent Quatrevieux
18
 */
19
20
package fr.quatrevieux.singleinstance.ipc;
21
22
import java.io.Closeable;
23
import java.io.IOException;
24
import java.net.InetSocketAddress;
25
import java.nio.ByteBuffer;
26
import java.nio.channels.*;
27
import java.util.Iterator;
28
import java.util.Set;
29
import java.util.function.Consumer;
30
31
/**
32
 * IPC server for the first running instance
33
 *
34
 * Usage:
35
 * <pre>{@code
36
 *     try (InstanceServer server = new InstanceServer()) {
37
 *         server.open();
38
 *
39
 *         server.consume(message -> {
40
 *             // process received message
41
 *             if (message.name().equals(xxx)) {
42
 *                 //...
43
 *             }
44
 *         });
45
 *     }
46
 * }</pre>
47
 */
48 1
final public class InstanceServer implements Closeable {
49
    private ServerSocketChannel serverSocket;
50
    private Selector selector;
51
52
    /**
53
     * Open the server on a random port number
54
     *
55
     * @throws IOException When cannot start the server
56
     */
57
    public void open() throws IOException {
58 1
        open(new InetSocketAddress("localhost", 0));
59 1
    }
60
61
    /**
62
     * Open the server on the given address
63
     *
64
     * @param address The bind address
65
     *
66
     * @throws IOException When cannot start the server
67
     */
68
    public void open(InetSocketAddress address) throws IOException {
69 1
        serverSocket = ServerSocketChannel.open();
70 1
        serverSocket.bind(address);
71 1
        serverSocket.configureBlocking(false);
72 1
    }
73
74
    /**
75
     * Consume incoming messages
76
     * Note: This method is blocking : a thread is not created for consuming messages
77
     *
78
     * @param action The message consumer
79
     *
80
     * @throws IOException When an error occurs during reading message
81
     * @throws IllegalStateException If the server is not opened
82
     */
83
    public void consume(Consumer<Message> action) throws IOException {
84 1
        if (serverSocket == null) {
85
            throw new IllegalStateException("Server must be opened");
86
        }
87
88 1
        selector = Selector.open();
89 1
        serverSocket.register(selector, SelectionKey.OP_ACCEPT);
90
91 1
        final ByteBuffer buffer = ByteBuffer.allocate(256);
92
93
        try {
94 1
            while (running()) {
95 1
                selector.select();
96 1
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
97 1
                Iterator<SelectionKey> iter = selectedKeys.iterator();
98
99 1
                while (iter.hasNext()) {
100 1
                    SelectionKey key = iter.next();
101
102 1
                    if (key.isAcceptable()) {
103 1
                        SocketChannel client = serverSocket.accept();
104 1
                        client.configureBlocking(false);
105 1
                        client.register(selector, SelectionKey.OP_READ);
106
                    }
107
108 1
                    if (key.isReadable()) {
109 1
                        SocketChannel client = (SocketChannel) key.channel();
110 1
                        client.read(buffer);
111 1
                        buffer.flip();
112
113 1
                        final ProtocolParser parser = ProtocolParser.forKey(key);
114
115 1
                        ProtocolParser.forKey(key).parse(buffer);
116
117 1
                        parser.packets().forEach(action);
118 1
                        parser.clear();
119 1
                        buffer.clear();
120
                    }
121
122 1
                    iter.remove();
123 1
                }
124 1
            }
125 1
        } catch (ClosedSelectorException|CancelledKeyException e) {
126
            // Ignore
127 1
        }
128 1
    }
129
130
    /**
131
     * Get the bind port number
132
     *
133
     * @return The port number as int
134
     * @throws IllegalStateException If the server is not opened
135
     */
136
    public int port() {
137 1
        if (serverSocket == null) {
138 1
            throw new IllegalStateException("Server must be opened");
139
        }
140
141 1
        return serverSocket.socket().getLocalPort();
142
    }
143
144
    /**
145
     * Check if the server is running
146
     *
147
     * @return true if the server is running
148
     */
149
    public boolean running() {
150 1
        return selector != null && selector.isOpen() && serverSocket != null && serverSocket.isOpen();
151
    }
152
153
    @Override
154
    public void close() throws IOException {
155 1
        if (selector != null) {
156 1
            selector.close();
157 1
            selector = null;
158
        }
159
160 1
        if (serverSocket != null) {
161 1
            serverSocket.close();
162 1
            serverSocket = null;
163
        }
164 1
    }
165
}
166