pstlab /
KOaLaR
| 1 | package it.cnr.istc.pst.cognition.koala.network.mqtt; |
||
| 2 | |||
| 3 | import java.io.IOException; |
||
| 4 | import java.net.InetAddress; |
||
| 5 | import java.util.Date; |
||
| 6 | import java.util.logging.Level; |
||
| 7 | import java.util.logging.Logger; |
||
| 8 | |||
| 9 | import javax.ws.rs.core.UriBuilder; |
||
| 10 | |||
| 11 | import org.apache.activemq.broker.Broker; |
||
| 12 | import org.apache.activemq.broker.BrokerPlugin; |
||
| 13 | import org.apache.activemq.broker.BrokerService; |
||
| 14 | import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; |
||
| 15 | import org.eclipse.paho.client.mqttv3.MqttCallback; |
||
| 16 | import org.eclipse.paho.client.mqttv3.MqttClient; |
||
| 17 | import org.eclipse.paho.client.mqttv3.MqttConnectOptions; |
||
| 18 | import org.eclipse.paho.client.mqttv3.MqttException; |
||
| 19 | import org.eclipse.paho.client.mqttv3.MqttMessage; |
||
| 20 | import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; |
||
| 21 | |||
| 22 | import it.cnr.istc.pst.cognition.koala.lang.dictionary.KoalaPropertyDictionary; |
||
| 23 | import it.cnr.istc.pst.cognition.koala.reasoner.environment.EnvironmentReasoner; |
||
| 24 | import it.cnr.istc.pst.cognition.koala.reasoner.owl.jena.OWLKoalaEnvironmentReasoner; |
||
| 25 | import it.cnr.istc.pst.cognition.koala.reasoner.owl.jena.OWLKoalaObservationReasoner; |
||
| 26 | |||
| 27 | /** |
||
| 28 | * |
||
| 29 | * @author anacleto |
||
| 30 | * |
||
| 31 | */ |
||
| 32 | public class MQTTSensorNetworkManager implements MqttCallback |
||
| 33 | { |
||
| 34 | private Process mqtt = null; |
||
| 35 | private String clientId = ""+new Date().getTime(); |
||
|
0 ignored issues
–
show
Performance
introduced
by
Loading history...
|
|||
| 36 | private static final String TASKLIST = "tasklist"; |
||
|
0 ignored issues
–
show
|
|||
| 37 | private static final String KILL = "taskkill /F /IM "; |
||
|
0 ignored issues
–
show
|
|||
| 38 | private final String broker2 = "tcp://192.168.67.182:1883"; //150.146.65.143:1883"; //LUCA "tcp://150.146.65.68:1883"; |
||
|
0 ignored issues
–
show
Hardcoding an IP address (192.168.67.182) into your code is a bad practice as well as a security problem. Consider reading the address from another source like a property file.
Loading history...
|
|||
| 39 | |||
| 40 | public static final String TOPIC_ASK_HOUSE_CONFIG = "house-config-file"; |
||
| 41 | public static final String TOPIC_GET_HOUSE_CONFIG = "get-house-config-file"; |
||
| 42 | |||
| 43 | public static final String TOPIC_ASK_SENSOR_VALUE = "house/0x0184e8a0/sensor/"; |
||
| 44 | public static final String TOPIC_GET_SENSOR_VALUE = "/0x0184e8a0/sensor/"; |
||
| 45 | public static final String NO_UNIT = "NOUNIT"; |
||
| 46 | |||
| 47 | private static MQTTSensorNetworkManager _instance = null; |
||
| 48 | private MqttClient client; |
||
| 49 | private String config = null; |
||
| 50 | |||
| 51 | private OWLKoalaObservationReasoner reasoner; |
||
| 52 | |||
| 53 | |||
| 54 | /** |
||
| 55 | * |
||
| 56 | * @param envConfigFile |
||
| 57 | * @param ontoFile |
||
| 58 | * @param envRules |
||
| 59 | * @param obsRules |
||
| 60 | */ |
||
| 61 | public MQTTSensorNetworkManager(String envConfigFile, String ontoFile, String envRules, String obsRules) |
||
| 62 | { |
||
| 63 | super(); |
||
| 64 | |||
| 65 | // create environment reasoner |
||
| 66 | EnvironmentReasoner environment = new OWLKoalaEnvironmentReasoner(ontoFile, envRules); |
||
| 67 | environment.init(envConfigFile); |
||
| 68 | |||
| 69 | // create observation reasoner and setup with the environment |
||
| 70 | this.reasoner = new OWLKoalaObservationReasoner(ontoFile, obsRules); |
||
| 71 | this.reasoner.init(environment); |
||
| 72 | } |
||
| 73 | |||
| 74 | /** |
||
| 75 | * |
||
| 76 | * @return |
||
| 77 | */ |
||
| 78 | public String getConfig() { |
||
| 79 | return config; |
||
| 80 | } |
||
| 81 | |||
| 82 | /** |
||
| 83 | * |
||
| 84 | * @param config |
||
| 85 | */ |
||
| 86 | public void setConfig(String config) { |
||
| 87 | this.config = config; |
||
| 88 | } |
||
| 89 | |||
| 90 | /** |
||
| 91 | * |
||
| 92 | */ |
||
| 93 | public void connect() |
||
| 94 | { |
||
| 95 | try |
||
| 96 | { |
||
| 97 | InetAddress localIp = InetAddress.getLocalHost(); |
||
| 98 | System.out.println("IP of my system is := " + localIp.getHostAddress()); |
||
| 99 | |||
| 100 | System.out.println("MQTT connected"); |
||
| 101 | System.out.println("subscribing all"); |
||
| 102 | MemoryPersistence persistence = new MemoryPersistence(); |
||
|
0 ignored issues
–
show
Use try-with-resources or close this "MemoryPersistence" in a "finally" clause.
You may want to use Loading history...
|
|||
| 103 | try |
||
| 104 | { |
||
| 105 | // client = new MqttClient(broker2, clientId, persistence); |
||
| 106 | client = new MqttClient("tcp://localhost:1883", clientId, persistence); |
||
| 107 | MqttConnectOptions connOpts = new MqttConnectOptions(); |
||
| 108 | connOpts.setCleanSession(false); |
||
| 109 | // connOpts.setKeepAliveInterval(Integer.MAX_VALUE); |
||
| 110 | |||
| 111 | System.out.println("Connecting to broker: " + broker2); |
||
| 112 | client.connect(connOpts); |
||
| 113 | System.out.println("Connected"); |
||
| 114 | client.setCallback(_instance); |
||
| 115 | } |
||
| 116 | catch (MqttException me) { |
||
| 117 | System.out.println("reason " + me.getReasonCode()); |
||
| 118 | System.out.println("msg " + me.getMessage()); |
||
| 119 | System.out.println("loc " + me.getLocalizedMessage()); |
||
| 120 | System.out.println("cause " + me.getCause()); |
||
| 121 | System.out.println("excep " + me); |
||
| 122 | me.printStackTrace(); |
||
|
0 ignored issues
–
show
|
|||
| 123 | } |
||
| 124 | |||
| 125 | } catch (IOException ex) { |
||
| 126 | Logger.getLogger(getClass().getName()).log(Level.SEVERE, null, ex); |
||
| 127 | } catch (Exception ex) { |
||
| 128 | Logger.getLogger(getClass().getName()).log(Level.SEVERE, null, ex); |
||
| 129 | } |
||
| 130 | } |
||
| 131 | |||
| 132 | |||
| 133 | // public void disconnect() { |
||
| 134 | // try { |
||
| 135 | // if (mqtt != null && isProcessRunning("mosquitto.exe")) { |
||
| 136 | // killProcess("mosquitto.exe"); |
||
| 137 | // System.out.println("MQTT disconnected"); |
||
| 138 | // } |
||
| 139 | // } catch (Exception ex) { |
||
| 140 | // Logger.getLogger(getClass().getName()).log(Level.SEVERE, null, ex); |
||
| 141 | // } |
||
| 142 | // } |
||
| 143 | // |
||
| 144 | // public static boolean isProcessRunning(String serviceName) throws Exception { |
||
| 145 | // |
||
| 146 | // Process p = Runtime.getRuntime().exec(TASKLIST); |
||
| 147 | // BufferedReader reader = new BufferedReader(new InputStreamReader( |
||
| 148 | // p.getInputStream())); |
||
| 149 | // String line; |
||
| 150 | // while ((line = reader.readLine()) != null) { |
||
| 151 | // |
||
| 152 | // System.out.println(line); |
||
| 153 | // if (line.contains(serviceName)) { |
||
| 154 | // System.out.println("TROVATO !"); |
||
| 155 | // return true; |
||
| 156 | // } |
||
| 157 | // } |
||
| 158 | // |
||
| 159 | // return false; |
||
| 160 | // |
||
| 161 | // } |
||
| 162 | // |
||
| 163 | // /** |
||
| 164 | // * |
||
| 165 | // * @param serviceName |
||
| 166 | // * @throws Exception |
||
| 167 | // */ |
||
| 168 | // public static void killProcess(String serviceName) throws Exception { |
||
| 169 | // System.out.println("service name : " + serviceName); |
||
| 170 | // |
||
| 171 | // Runtime.getRuntime().exec(KILL + serviceName); |
||
| 172 | // |
||
| 173 | // } |
||
| 174 | |||
| 175 | /** |
||
| 176 | * |
||
| 177 | */ |
||
| 178 | public void connectionLost(Throwable thrwbl) { |
||
| 179 | System.out.println("we have a problem!"); |
||
| 180 | thrwbl.printStackTrace(); |
||
|
0 ignored issues
–
show
|
|||
| 181 | |||
| 182 | } |
||
| 183 | |||
| 184 | /** |
||
| 185 | * |
||
| 186 | */ |
||
| 187 | public void deliveryComplete(IMqttDeliveryToken arg0) { |
||
| 188 | // TODO Auto-generated method stub |
||
| 189 | |||
| 190 | } |
||
| 191 | |||
| 192 | /** |
||
| 193 | * |
||
| 194 | */ |
||
| 195 | public void messageArrived(String topic, MqttMessage mm) |
||
| 196 | { |
||
| 197 | // try |
||
| 198 | // { |
||
| 199 | System.out.println("MESSAGE ARRIVED: "); |
||
| 200 | System.out.println("\tTOPIC: " + topic); |
||
| 201 | String message = ""; |
||
| 202 | message = new String(mm.getPayload()); |
||
| 203 | |||
| 204 | System.out.println("\tMESSAGE: " + message); |
||
| 205 | |||
| 206 | if (topic.startsWith("/house/0x0184e8a0/sensorValue")) |
||
| 207 | { |
||
| 208 | System.out.println("SENSOR REQUEST IS ARRIVED"); |
||
| 209 | System.out.println("topic -> " + topic); |
||
| 210 | System.out.println("data -> " + message); |
||
| 211 | |||
| 212 | String[] split = message.split("/"); |
||
| 213 | String sid = split[0]; |
||
| 214 | String time = split[1]; |
||
|
0 ignored issues
–
show
|
|||
| 215 | String value = split[2]; |
||
| 216 | |||
| 217 | // sensor id |
||
| 218 | String[] sidSplit = sid.split("-"); |
||
| 219 | String sensorId = sidSplit[0]; |
||
| 220 | // property label |
||
| 221 | String propLabel = sidSplit[1] + "-" + sidSplit[2]; |
||
| 222 | |||
| 223 | // read temperature |
||
| 224 | if (propLabel.equals("49-1")) |
||
| 225 | { |
||
| 226 | try |
||
| 227 | { |
||
| 228 | double data = Double.parseDouble(value); |
||
| 229 | |||
| 230 | System.out.println("\nADDING OBSERVATION: SENSOR-DI= " + sensorId + ", OBSERVED-VALUE= " + value + ", PROPERTY= " + KoalaPropertyDictionary.KOALA_TEMPERATURE + "\n"); |
||
| 231 | // add observation |
||
| 232 | this.reasoner.observation( |
||
| 233 | sensorId, |
||
| 234 | new Long(Math.round(data)).toString(), |
||
|
0 ignored issues
–
show
|
|||
| 235 | KoalaPropertyDictionary.KOALA_TEMPERATURE.getUri()); |
||
| 236 | } |
||
| 237 | catch(Exception ex) { |
||
| 238 | System.err.println(ex.getMessage()); |
||
| 239 | } |
||
| 240 | } |
||
| 241 | |||
| 242 | /* |
||
| 243 | * TODO |
||
| 244 | */ |
||
| 245 | |||
| 246 | } |
||
| 247 | |||
| 248 | |||
| 249 | |||
| 250 | // } //Sharing sensor informations |
||
| 251 | // catch (MqttException ex) { |
||
| 252 | // Logger.getLogger(getClass().getName()).log(Level.SEVERE, null, ex); |
||
| 253 | // } |
||
| 254 | } |
||
| 255 | |||
| 256 | /** |
||
| 257 | * |
||
| 258 | * @param topic |
||
| 259 | * @param message |
||
| 260 | * @throws MqttException |
||
| 261 | */ |
||
| 262 | public void publish(String topic, String message) throws MqttException { |
||
| 263 | System.out.println("sending message:"); |
||
| 264 | System.out.println("\tTopic: " + topic); |
||
| 265 | System.out.println("\tMessage: " + message); |
||
| 266 | System.out.println("sample client is null ? "+(client == null)); |
||
| 267 | System.out.println("----------------------------------------------"); |
||
| 268 | client.publish(topic, new MqttMessage(message.getBytes())); |
||
| 269 | } |
||
| 270 | |||
| 271 | |||
| 272 | public void startBroker() |
||
| 273 | { |
||
| 274 | BrokerService broker = new BrokerService(); |
||
| 275 | try |
||
| 276 | { |
||
| 277 | broker.addConnector(UriBuilder.fromUri("mqtt://192.168.67.182:1883").build()); //150.146.65.143:1883").build());//("mqtt://150.146.65.68:1883").build()); |
||
|
0 ignored issues
–
show
Hardcoding an IP address (192.168.67.182) into your code is a bad practice as well as a security problem. Consider reading the address from another source like a property file.
Loading history...
|
|||
| 278 | broker.setPlugins(new BrokerPlugin[]{new BrokerPlugin() { |
||
| 279 | public Broker installPlugin(Broker broker) throws Exception { |
||
| 280 | return new MyBroker(broker); |
||
| 281 | } |
||
| 282 | }}); |
||
| 283 | broker.start(); |
||
| 284 | System.out.println("broker started [OK]"); |
||
| 285 | } |
||
| 286 | catch (Exception ex) { |
||
| 287 | ex.printStackTrace(); |
||
|
0 ignored issues
–
show
|
|||
| 288 | } |
||
| 289 | } |
||
| 290 | |||
| 291 | |||
| 292 | public void subscribe(String topic) { |
||
| 293 | try { |
||
| 294 | System.out.println("[MQTT][SUBSCRIBE] topic: " + topic); |
||
| 295 | client.subscribe(topic); |
||
| 296 | |||
| 297 | } catch (MqttException ex) { |
||
| 298 | Logger.getLogger(getClass() |
||
| 299 | .getName()).log(Level.SEVERE, null, ex); |
||
| 300 | } |
||
| 301 | } |
||
| 302 | |||
| 303 | /** |
||
| 304 | * |
||
| 305 | * @param args |
||
| 306 | */ |
||
| 307 | public static void main(String[] args) { |
||
| 308 | |||
| 309 | MQTTSensorNetworkManager manager = new MQTTSensorNetworkManager( |
||
| 310 | "etc/environment/house_config.xml", |
||
| 311 | "etc/ontology/koala_v1.0.owl", |
||
| 312 | "etc/ontology/feature_extraction_v1.0.rules", |
||
| 313 | "etc/ontology/situation_detection_v1.0.rules"); |
||
| 314 | |||
| 315 | manager.startBroker(); |
||
| 316 | manager.connect(); |
||
| 317 | manager.subscribe("/house/0x0184e8a0/sensorValue/#"); |
||
| 318 | |||
| 319 | } |
||
| 320 | } |
||
| 321 |