connect()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 36
Code Lines 29

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 29
dl 0
loc 36
c 0
b 0
f 0
cc 4
rs 9.184
1
package it.cnr.istc.pst.cognition.koala.network.mqtt;
2
3
import java.io.IOException;
4
import java.net.InetAddress;
5
import java.util.Date;
6
import java.util.logging.Level;
7
import java.util.logging.Logger;
8
9
import javax.ws.rs.core.UriBuilder;
10
11
import org.apache.activemq.broker.Broker;
12
import org.apache.activemq.broker.BrokerPlugin;
13
import org.apache.activemq.broker.BrokerService;
14
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
15
import org.eclipse.paho.client.mqttv3.MqttCallback;
16
import org.eclipse.paho.client.mqttv3.MqttClient;
17
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
18
import org.eclipse.paho.client.mqttv3.MqttException;
19
import org.eclipse.paho.client.mqttv3.MqttMessage;
20
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
21
22
import it.cnr.istc.pst.cognition.koala.lang.dictionary.KoalaPropertyDictionary;
23
import it.cnr.istc.pst.cognition.koala.reasoner.environment.EnvironmentReasoner;
24
import it.cnr.istc.pst.cognition.koala.reasoner.owl.jena.OWLKoalaEnvironmentReasoner;
25
import it.cnr.istc.pst.cognition.koala.reasoner.owl.jena.OWLKoalaObservationReasoner;
26
27
/**
28
 * 
29
 * @author anacleto
30
 *
31
 */
32
public class MQTTSensorNetworkManager implements MqttCallback 
33
{
34
	private Process mqtt = null;
35
	private String clientId = ""+new Date().getTime();
0 ignored issues
show
Performance introduced by
Boxing a primitive just to be able to call toString() on it is inefficient. Consider using the static method Long.toString() instead.
Loading history...
36
	private static final String TASKLIST = "tasklist";
0 ignored issues
show
Unused Code introduced by
Consider removing the unused private field TASKLIST.
Loading history...
37
    private static final String KILL = "taskkill /F /IM ";
0 ignored issues
show
Unused Code introduced by
Consider removing the unused private field KILL.
Loading history...
38
	private final String broker2 = "tcp://192.168.67.182:1883"; //150.146.65.143:1883"; //LUCA "tcp://150.146.65.68:1883";
0 ignored issues
show
Security introduced by
Hardcoding an IP address (192.168.67.182) into your code is a bad practice as well as a security problem. Consider reading the address from another source like a property file.

Cert has something to say on the matter.

Loading history...
Coding Style introduced by
This final field may as well be static. It helps the compiler optimize your code.
Loading history...
39
	
40
	public static final String TOPIC_ASK_HOUSE_CONFIG = "house-config-file";
41
    public static final String TOPIC_GET_HOUSE_CONFIG = "get-house-config-file";
42
43
    public static final String TOPIC_ASK_SENSOR_VALUE = "house/0x0184e8a0/sensor/";
44
    public static final String TOPIC_GET_SENSOR_VALUE = "/0x0184e8a0/sensor/";
45
    public static final String NO_UNIT = "NOUNIT";
46
    
47
    private static MQTTSensorNetworkManager _instance = null;
48
	private MqttClient client;
49
	private String config = null;
50
	
51
	private OWLKoalaObservationReasoner reasoner;
52
	
53
	
54
	/**
55
	 * 
56
	 * @param envConfigFile
57
	 * @param ontoFile
58
	 * @param envRules
59
	 * @param obsRules
60
	 */
61
    public MQTTSensorNetworkManager(String envConfigFile, String ontoFile, String envRules, String obsRules) 
62
    {
63
        super();
64
        
65
        // create environment reasoner
66
        EnvironmentReasoner environment = new OWLKoalaEnvironmentReasoner(ontoFile, envRules);
67
        environment.init(envConfigFile);
68
        
69
        // create observation reasoner and setup with the environment
70
        this.reasoner = new OWLKoalaObservationReasoner(ontoFile, obsRules);
71
        this.reasoner.init(environment);
72
    }
73
74
    /**
75
     * 
76
     * @return
77
     */
78
    public String getConfig() {
79
        return config;
80
    }
81
82
    /**
83
     * 
84
     * @param config
85
     */
86
    public void setConfig(String config) {
87
        this.config = config;
88
    }
89
	
90
	/**
91
	 * 
92
	 */
93
	public void connect() 
94
	{
95
        try 
96
        {
97
            InetAddress localIp = InetAddress.getLocalHost();
98
            System.out.println("IP of my system is := " + localIp.getHostAddress());
99
100
            System.out.println("MQTT connected");
101
            System.out.println("subscribing all");
102
            MemoryPersistence persistence = new MemoryPersistence();
0 ignored issues
show
Security Bug introduced by
Use try-with-resources or close this "MemoryPersistence" in a "finally" clause.

You may want to use try {} ... finally {} to close the resource or use the (relatively) new try-with-resources capability.

Loading history...
103
            try 
104
            {
105
//                client = new MqttClient(broker2, clientId, persistence);
106
            	client = new MqttClient("tcp://localhost:1883", clientId, persistence);
107
                MqttConnectOptions connOpts = new MqttConnectOptions();
108
                connOpts.setCleanSession(false);
109
//                connOpts.setKeepAliveInterval(Integer.MAX_VALUE);
110
                
111
                System.out.println("Connecting to broker: " + broker2);
112
                client.connect(connOpts);
113
                System.out.println("Connected");
114
                client.setCallback(_instance);
115
            } 
116
            catch (MqttException me) {
117
                System.out.println("reason " + me.getReasonCode());
118
                System.out.println("msg " + me.getMessage());
119
                System.out.println("loc " + me.getLocalizedMessage());
120
                System.out.println("cause " + me.getCause());
121
                System.out.println("excep " + me);
122
                me.printStackTrace();
0 ignored issues
show
Best Practice introduced by
Throwable.printStackTrace writes to the console which might not be available at runtime. Using a logger is preferred.
Loading history...
123
            }
124
125
        } catch (IOException ex) {
126
            Logger.getLogger(getClass().getName()).log(Level.SEVERE, null, ex);
127
        } catch (Exception ex) {
128
            Logger.getLogger(getClass().getName()).log(Level.SEVERE, null, ex);
129
        }
130
    }
131
132
	
133
//	public void disconnect() {
134
//        try {
135
//            if (mqtt != null && isProcessRunning("mosquitto.exe")) {
136
//                killProcess("mosquitto.exe");
137
//                System.out.println("MQTT disconnected");
138
//            }
139
//        } catch (Exception ex) {
140
//            Logger.getLogger(getClass().getName()).log(Level.SEVERE, null, ex);
141
//        }
142
//    }
143
//	
144
//	public static boolean isProcessRunning(String serviceName) throws Exception {
145
//
146
//        Process p = Runtime.getRuntime().exec(TASKLIST);
147
//        BufferedReader reader = new BufferedReader(new InputStreamReader(
148
//                p.getInputStream()));
149
//        String line;
150
//        while ((line = reader.readLine()) != null) {
151
//
152
//            System.out.println(line);
153
//            if (line.contains(serviceName)) {
154
//                System.out.println("TROVATO !");
155
//                return true;
156
//            }
157
//        }
158
//
159
//        return false;
160
//
161
//    }
162
//	
163
//	/**
164
//	 * 
165
//	 * @param serviceName
166
//	 * @throws Exception
167
//	 */
168
//	public static void killProcess(String serviceName) throws Exception {
169
//        System.out.println("service name : " + serviceName);
170
//
171
//        Runtime.getRuntime().exec(KILL + serviceName);
172
//
173
//    }
174
	
175
	/**
176
	 * 
177
	 */
178
	public void connectionLost(Throwable thrwbl) {
179
		System.out.println("we have a problem!");
180
        thrwbl.printStackTrace();
0 ignored issues
show
Best Practice introduced by
Throwable.printStackTrace writes to the console which might not be available at runtime. Using a logger is preferred.
Loading history...
181
		
182
	}
183
184
	/**
185
	 * 
186
	 */
187
	public void deliveryComplete(IMqttDeliveryToken arg0) {
188
		// TODO Auto-generated method stub
189
		
190
	}
191
192
	/**
193
	 * 
194
	 */
195
	public void messageArrived(String topic, MqttMessage mm) 
196
	{
197
//		try 
198
//		{
199
	        System.out.println("MESSAGE ARRIVED: ");
200
	        System.out.println("\tTOPIC: " + topic);
201
	        String message = "";
202
	        message = new String(mm.getPayload());
203
	        
204
	        System.out.println("\tMESSAGE: " + message);
205
	        
206
	        if (topic.startsWith("/house/0x0184e8a0/sensorValue")) 
207
	        {
208
	            System.out.println("SENSOR REQUEST IS ARRIVED");
209
	            System.out.println("topic -> " + topic);
210
	            System.out.println("data -> " + message);
211
212
	            String[] split = message.split("/");
213
	            String sid = split[0];
214
	            String time = split[1];
0 ignored issues
show
Unused Code introduced by
Consider removing the unused local variable time.
Loading history...
Unused Code Code Smell introduced by
Remove this useless assignment to local variable "time".
Loading history...
215
	            String value = split[2];
216
	            
217
	            // sensor id 
218
	            String[] sidSplit = sid.split("-");
219
	            String sensorId = sidSplit[0];
220
	            // property label
221
	            String propLabel = sidSplit[1] + "-" + sidSplit[2];
222
	            
223
	            // read temperature
224
	            if (propLabel.equals("49-1")) 
225
	            {
226
	            	try
227
	            	{
228
	            		double data = Double.parseDouble(value);
229
	            		
230
	            		System.out.println("\nADDING OBSERVATION: SENSOR-DI= " + sensorId + ", OBSERVED-VALUE= " + value + ", PROPERTY= " + KoalaPropertyDictionary.KOALA_TEMPERATURE + "\n");
231
			            // add observation
232
			            this.reasoner.observation(
233
			            		sensorId, 
234
			            		new Long(Math.round(data)).toString(),
0 ignored issues
show
Performance introduced by
Boxing a primitive just to be able to call toString() on it is inefficient. Consider using the static method Long.toString() instead.
Loading history...
Best Practice introduced by
Instantiating a temporary object just to perform a string conversion is inefficient. Consider using Long.toString(...) instead.
Loading history...
introduced by
Remove this "Long" constructor
Loading history...
235
			            		KoalaPropertyDictionary.KOALA_TEMPERATURE.getUri());
236
	            	}
237
	            	catch(Exception ex) {
238
	            		System.err.println(ex.getMessage());
239
	            	}
240
	            }
241
	            
242
	            /*
243
	             * TODO 
244
	             */
245
	            
246
	        }
247
	        
248
	        
249
	        
250
//        } //Sharing sensor informations
251
//        catch (MqttException ex) {
252
//            Logger.getLogger(getClass().getName()).log(Level.SEVERE, null, ex);
253
//        }
254
	}	
255
	
256
	/**
257
	 * 
258
	 * @param topic
259
	 * @param message
260
	 * @throws MqttException
261
	 */
262
	public void publish(String topic, String message) throws MqttException {
263
        System.out.println("sending message:");
264
        System.out.println("\tTopic: " + topic);
265
        System.out.println("\tMessage: " + message);
266
        System.out.println("sample client is null ? "+(client == null));
267
        System.out.println("----------------------------------------------");
268
        client.publish(topic, new MqttMessage(message.getBytes()));
269
    }
270
	
271
	
272
    public void startBroker() 
273
    {
274
        BrokerService broker = new BrokerService();
275
        try 
276
        {
277
            broker.addConnector(UriBuilder.fromUri("mqtt://192.168.67.182:1883").build()); //150.146.65.143:1883").build());//("mqtt://150.146.65.68:1883").build());
0 ignored issues
show
Security introduced by
Hardcoding an IP address (192.168.67.182) into your code is a bad practice as well as a security problem. Consider reading the address from another source like a property file.

Cert has something to say on the matter.

Loading history...
278
            broker.setPlugins(new BrokerPlugin[]{new BrokerPlugin() {
279
                public Broker installPlugin(Broker broker) throws Exception {
280
                    return new MyBroker(broker);
281
                }
282
            }});
283
            broker.start();
284
            System.out.println("broker started [OK]");
285
        } 
286
        catch (Exception ex) {
287
            ex.printStackTrace();
0 ignored issues
show
Best Practice introduced by
Throwable.printStackTrace writes to the console which might not be available at runtime. Using a logger is preferred.
Loading history...
288
        }
289
    }
290
    
291
    
292
    public void subscribe(String topic) {
293
        try {
294
            System.out.println("[MQTT][SUBSCRIBE] topic: " + topic);
295
            client.subscribe(topic);
296
297
        } catch (MqttException ex) {
298
            Logger.getLogger(getClass()
299
                    .getName()).log(Level.SEVERE, null, ex);
300
        }
301
    }
302
    
303
    /**
304
     * 
305
     * @param args
306
     */
307
    public static void main(String[] args) {
308
		
309
		MQTTSensorNetworkManager manager = new MQTTSensorNetworkManager(
310
				"etc/environment/house_config.xml",
311
				"etc/ontology/koala_v1.0.owl",
312
				"etc/ontology/feature_extraction_v1.0.rules",
313
				"etc/ontology/situation_detection_v1.0.rules");
314
		
315
		manager.startBroker();
316
		manager.connect();
317
		manager.subscribe("/house/0x0184e8a0/sensorValue/#");
318
		
319
	}
320
}
321