@@ 12,22 12,25 @@
*/
package org.openhab.binding.lutronmqtt.handler;
-import com.google.common.collect.ImmutableList;
import com.google.gson.Gson;
+import jersey.repackaged.com.google.common.collect.ImmutableList;
import org.eclipse.jdt.annotation.Nullable;
-import org.eclipse.paho.client.mqttv3.*;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.smarthome.config.core.Configuration;
import org.eclipse.smarthome.core.thing.*;
import org.eclipse.smarthome.core.thing.binding.BaseBridgeHandler;
import org.eclipse.smarthome.core.types.Command;
+import org.eclipse.smarthome.io.transport.mqtt.*;
+import org.eclipse.smarthome.io.transport.mqtt.reconnect.PeriodicReconnectStrategy;
import org.openhab.binding.lutronmqtt.internal.LutronMQTTConfiguration;
import org.openhab.binding.lutronmqtt.model.LutronDevice;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.*;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ 41,8 44,7 @@ import static org.openhab.binding.lutron
*
* @author William Welliver - Initial contribution
*/
-public class LutronMQTTHubHandler extends BaseBridgeHandler implements MqttCallback {
-
+public class LutronMQTTHubHandler extends BaseBridgeHandler implements MqttMessageSubscriber, MqttConnectionObserver {
private final Logger logger = LoggerFactory.getLogger(LutronMQTTHubHandler.class);
private Collection<DeviceStatusListener> deviceStatusListeners = new HashSet<>();
@@ 52,15 54,15 @@ public class LutronMQTTHubHandler extend
@Nullable
private LutronMQTTConfiguration config;
private String token;
- private MqttClient mqttClient;
+ private MqttBrokerConnection mqttClient;
private Gson gson = new Gson();
private Map<Integer, LutronDevice> devicesByLinkAddress = new HashMap<>();
- private ScheduledFuture<?> reconnectJob;
private ScheduledFuture<?> onlineTimeout;
private ScheduledFuture<?> allItemsJob;
public LutronMQTTHubHandler(Bridge thing) {
super(thing);
+ logger.warn("LutronMQTTHubHandler create.");
}
@Override
@@ 70,72 72,86 @@ public class LutronMQTTHubHandler extend
Map<String, String> properties = getThing().getProperties();
token = (String) config.get(CONFIG_TOKEN);
- final String uuid = properties.get(PROPERTY_UUID);
final String broker = properties.get(PROPERTY_URL);
- updateStatus(ThingStatus.UNKNOWN, ThingStatusDetail.NONE);
- MemoryPersistence persistence = new MemoryPersistence();
String clientId = "openhab-lutron-mqtt-" + (System.currentTimeMillis()/1000);
- try {
- if(mqttClient == null || !mqttClient.isConnected()) {
- logger.info("Attempting to connect to MQTT Broker.");
- MqttClient client = new MqttClient(broker, clientId, persistence);
- MqttConnectOptions connOpts = new MqttConnectOptions();
-// connOpts.setCleanSession(true);
-// connOpts.setKeepAliveInterval(30);
-// connOpts.setConnectionTimeout(90);
- client.connect(connOpts);
- mqttClient = client;
- setupSubscriptions();
+ if(mqttClient == null || mqttClient.connectionState() == MqttConnectionState.DISCONNECTED) {
+ logger.warn("Attempting to connect to MQTT Broker.");
+ URI brokerUri = null;
+ try {
+ brokerUri = new URI(broker);
+ } catch (URISyntaxException e) {
+ logger.error("Lutron-MQTT broker url was invalid: " + broker);
+ goOffline(ThingStatusDetail.CONFIGURATION_ERROR, "Lutron-MQTT broker url was invalid: " + broker);
+ return;
+ }
+
+ MqttBrokerConnection brokerConnection = new MqttBrokerConnection(brokerUri.getHost(), brokerUri.getPort(), false, clientId);
+ brokerConnection.addConnectionObserver(this);
+ brokerConnection.setReconnectStrategy(new PeriodicReconnectStrategy());
+ updateStatus(ThingStatus.OFFLINE, "Preparing to connect to " + brokerUri.toASCIIString());
+
+ Boolean bool = null;
+ try {
+ mqttClient = brokerConnection;
+ bool = brokerConnection.start().get();
+ } catch (InterruptedException|ExecutionException e) {
+ logger.warn("MQTT startup failed.", e);
+ }
+ logger.warn("MQTT startup returned " + bool);
}
- } catch (MqttException e) {
- logger.error("Unable to connect to MQTT broker at " + broker, e);
- goOffline(ThingStatusDetail.COMMUNICATION_ERROR, "Unable to connect to MQTT broker at broker");
- }
+
+ // logger.error("Unable to connect to MQTT broker at " + broker, e);
+ // goOffline(ThingStatusDetail.COMMUNICATION_ERROR, "Unable to connect to MQTT broker at broker");
+
}
-
+ protected void updateStatus(ThingStatus status, String statusDetail) {
+ this.updateStatus(status, ThingStatusDetail.NONE, (String)statusDetail);
+ }
@Override
public void dispose() {
logger.info("Disposing of handler " + this);
super.dispose();
- try {
- MqttClient cl = mqttClient;
- cl.setCallback(null);
- cl.unsubscribe(new String[] {"lutron/status", "lutron/events", "lutron/remote"});
- cl.disconnectForcibly(3000);
- cl.close();
- mqttClient = null;
+
+ mqttClient.stop();
cancelJobs();
- } catch (MqttException e) {
- logger.warn("Error while disconnecting", e);
- }
+ mqttClient = null;
}
private void cancelJobs() {
- if(reconnectJob != null && !reconnectJob.isCancelled())
- reconnectJob.cancel(true);
if(allItemsJob != null && !allItemsJob.isCancelled())
allItemsJob.cancel(true);
- if(onlineTimeout != null && !allItemsJob.isCancelled())
+ if(onlineTimeout != null && !onlineTimeout.isCancelled())
onlineTimeout.cancel(true);
}
private void setupSubscriptions() {
- mqttClient.setCallback(this);
try {
- mqttClient.subscribe(new String[] {"lutron/status", "lutron/events", "lutron/remote"});
- } catch (MqttException e) {
- logger.error("subscribe failed.", e);
- goOffline(ThingStatusDetail.COMMUNICATION_ERROR, "Unable to subscribe to events");
+ mqttClient.unsubscribeAll().get();
+ } catch (InterruptedException|ExecutionException e) {
+ logger.warn("An error occurred while unsubscribing.", e);
}
-
+ logger.info("Subscribing.");
+ for(String topic : new String[] {"lutron/status", "lutron/events", "lutron/remote"}) {
+ try {
+ logger.info("subscribing to " + topic);
+ if(!mqttClient.subscribe(topic, this).get()) {
+ logger.error("subscribe failed.");
+ goOffline(ThingStatusDetail.COMMUNICATION_ERROR, "Unable to subscribe to events");
+ }
+ } catch (InterruptedException|ExecutionException e) {
+ logger.warn("An error occurred while subscribing.", e);
+ }
+ }
+ logger.info("Subscribed.");
cancelJobs();
+ logger.info("Scheduling item request.");
allItemsJob = scheduler.schedule(new Runnable() {
@Override
public void run() {
@@ 145,13 161,18 @@ public class LutronMQTTHubHandler extend
}
private void requestAllItems() {
+ logger.warn("Requesting all items");
+ byte[] b = new byte[0];
try {
- byte[] b = ("{\"cmd\": \"GetDevices\", \"args\": {}}").getBytes("UTF-8");
- mqttClient.publish("lutron/commands", (b), 0, false);
- } catch (MqttException e) {
- logger.warn("Error Sending message.", e);
+ b = ("{\"cmd\": \"GetDevices\", \"args\": {}}").getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
- logger.warn("Error encoding message.", e);
+ logger.warn("Unable to encode JSON.", e);
+ }
+ try {
+ Boolean res = mqttClient.publish("lutron/commands", (b), 0, false).get();
+ logger.warn("Publish returned " + res);
+ } catch(Exception e) {
+ logger.warn("Exception while publishing.", e);
}
allItemsJob = scheduler.schedule(new Runnable() {
@@ 190,44 211,8 @@ public class LutronMQTTHubHandler extend
}
@Override
- public void connectionLost(Throwable throwable) {
- goOffline(ThingStatusDetail.BRIDGE_OFFLINE, throwable.getMessage());
- logger.warn("Lost connection to MQTT server. Will rety in 20 seconds.", throwable);
- if(mqttClient != null) scheduleReconnect(20);
- }
-
- private void scheduleReconnect(int seconds) {
- if(reconnectJob == null || reconnectJob.isDone()) {
- reconnectJob = scheduler.schedule(new Runnable() {
- @Override
- public void run() {
- reconnect();
- }
- }, seconds, TimeUnit.SECONDS);
- }
- }
-
- private void reconnect() {
- if(mqttClient == null) {
- logger.info("Skipping reconnect because MQTT client is null.");
- return;
- }
- logger.info("Attempting to reconnect to MQTT server");
- initialize();
- ThingStatusInfo info = getThing().getStatusInfo();
- if(info.getStatus() != ThingStatus.ONLINE &&
- (info.getStatusDetail() == ThingStatusDetail.BRIDGE_OFFLINE ||
- info.getStatusDetail() == ThingStatusDetail.COMMUNICATION_ERROR
- )) // we're not online but have a retryable status
- scheduleReconnect(20);
- else {
- reconnectJob = null;
- }
- }
-
- @Override
- public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
- logger.debug("messageArrived: " + s + " " + mqttMessage.toString());
+ public void processMessage(String s, byte[] mqttMessage) {
+ logger.warn("messageArrived: " + s + " " + mqttMessage.toString());
switch(s) {
case "lutron/status":
handleStatusMessage(mqttMessage);
@@ 241,9 226,9 @@ public class LutronMQTTHubHandler extend
}
}
- private void handleGatewayEvent(MqttMessage mqttMessage) {
+ private void handleGatewayEvent(byte[] mqttMessage) {
//logger.info("gateway event: " + new String(mqttMessage.getPayload()));
- Map <String,Object> msg = gson.fromJson(new String(mqttMessage.getPayload()), HashMap.class);
+ Map <String,Object> msg = gson.fromJson(new String(mqttMessage), HashMap.class);
if(msg.containsKey("cmd")) {
String key = (String)msg.get("cmd");
@@ 297,13 282,8 @@ public class LutronMQTTHubHandler extend
}
protected void requestUpdateForDevice(int linkAddress) {
- try {
byte[] b = ("{\"cmd\":\"RuntimePropertyQuery\", \"args\":{\"Params\":[[" + linkAddress + ",15,[1]]]}}").getBytes();
- MqttMessage message = new MqttMessage(b);
- mqttClient.publish("lutron/commands", message);
- } catch (MqttException e) {
- logger.error("Error publishing message", e);
- }
+ mqttClient.publish("lutron/commands", b);
}
protected void informDeviceListeners(LutronDevice device) {
@@ 318,17 298,16 @@ public class LutronMQTTHubHandler extend
return devicesByLinkAddress.get(linkAddress);
}
- private void handleRemoteEvent(MqttMessage mqttMessage) {
- logger.info("remote event: " + new String(mqttMessage.getPayload()));
+ private void handleRemoteEvent(byte[] mqttMessage) {
+ logger.info("remote event: " + new String(mqttMessage));
- Map <String,Object> msg = gson.fromJson(new String(mqttMessage.getPayload()), HashMap.class);
+ Map <String,Object> msg = gson.fromJson(new String(mqttMessage), HashMap.class);
// {"serial" : "C726CA", "action": "down", "button": "select"}
-
}
- private void handleStatusMessage(MqttMessage mqttMessage) {
- logger.info("status message: " + new String(mqttMessage.getPayload()));
- Map <String,Object> msg = gson.fromJson(new String(mqttMessage.getPayload()), HashMap.class);
+ private void handleStatusMessage(byte[] mqttMessage) {
+ logger.info("status message: " + new String(mqttMessage));
+ Map <String,Object> msg = gson.fromJson(new String(mqttMessage), HashMap.class);
if(msg.containsKey("state") && "running".equals(msg.get("state"))) {
if(onlineTimeout != null)
onlineTimeout.cancel(true);
@@ 344,21 323,54 @@ public class LutronMQTTHubHandler extend
protected void onlineTimeoutOccurred() {
goOffline(ThingStatusDetail.BRIDGE_OFFLINE, "Too long between status announcements.");
- }
-
- @Override
- public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
- // We really don't care too much about this status.
+ try {
+ mqttClient.stop().get();
+ Thread.sleep(5000);
+ initialize();
+ } catch(Exception e) {
+ logger.warn("An error occurred while restarting the service.", e);
+ }
}
public void setDesiredState(int deviceId, Map<String, Object> lightState) {
+ byte[] bytes = new byte[0];
try {
- byte[] bytes = (gson.toJson(lightState)).getBytes("UTF-8");
- // MqttMessage message = new MqttMessage(bytes);
- mqttClient.publish("lutron/commands", bytes, 0, false);
- logger.info("Sent message.");
- } catch (MqttException|UnsupportedEncodingException e) {
- e.printStackTrace();
+ bytes = (gson.toJson(lightState)).getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ logger.warn("Error encoding JSON.", e);
+ }
+ // MqttMessage message = new MqttMessage(bytes);
+ Boolean res = null;
+ try {
+ res = mqttClient.publish("lutron/commands", bytes, 0, false).get();
+ if(res)
+ logger.info("Sent message.");
+ else logger.warn("Failed to send message: " + new String(bytes));
+ } catch (InterruptedException|ExecutionException e) {
+ logger.warn("Error sending message.", e);
+ }
+ }
+
+ @Override
+ public void connectionStateChanged(MqttConnectionState mqttConnectionState, @Nullable Throwable throwable) {
+ if(mqttConnectionState == MqttConnectionState.CONNECTED) {
+ logger.info("MQTT connection state changed to CONNECTED.");
+ goOnline();
+ logger.info("Online");
+ scheduler.schedule(new Runnable() {
+ @Override
+ public void run() {
+ setupSubscriptions();
+ logger.info("Subscribed.");
+
+ }
+ },1, TimeUnit.SECONDS);
+ } else if (mqttConnectionState == MqttConnectionState.CONNECTING) {
+ goOffline(ThingStatusDetail.BRIDGE_OFFLINE, "MQTT Reconnecting");
+ } else if (mqttConnectionState == MqttConnectionState.DISCONNECTED) {
+ goOffline(ThingStatusDetail.BRIDGE_OFFLINE, "MQTT Disconnected");
+ logger.warn("Lost connection to MQTT server.");
+ cancelJobs();
}
}
}