Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next
diff --git a/apps/optical/pom.xml b/apps/optical/pom.xml
new file mode 100644
index 0000000..0264f8a
--- /dev/null
+++ b/apps/optical/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.onlab.onos</groupId>
+        <artifactId>onos-apps</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>onos-app-optical</artifactId>
+    <packaging>bundle</packaging>
+
+    <description>ONOS application for packet/optical deployments</description>
+
+    <dependencies>
+        <!-- ONOS CLI and Karaf shell console -->
+        <dependency>
+            <groupId>org.onlab.onos</groupId>
+            <artifactId>onos-cli</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.karaf.shell</groupId>
+            <artifactId>org.apache.karaf.shell.console</artifactId>
+        </dependency>
+        <!-- Jackson JSON libraries -->
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-core-asl</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-mapper-asl</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+    </dependencies>
+    
+</project>
diff --git a/apps/optical/src/main/java/org/onlab/onos/optical/cfg/OpticalConfigProvider.java b/apps/optical/src/main/java/org/onlab/onos/optical/cfg/OpticalConfigProvider.java
new file mode 100644
index 0000000..86c1c0b
--- /dev/null
+++ b/apps/optical/src/main/java/org/onlab/onos/optical/cfg/OpticalConfigProvider.java
@@ -0,0 +1,338 @@
+package org.onlab.onos.optical.cfg;
+
+import static org.onlab.onos.net.DeviceId.deviceId;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.onlab.onos.net.ConnectPoint;
+import org.onlab.onos.net.DefaultAnnotations;
+import org.onlab.onos.net.Device;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.Link;
+import org.onlab.onos.net.MastershipRole;
+import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.device.DefaultDeviceDescription;
+import org.onlab.onos.net.device.DeviceDescription;
+import org.onlab.onos.net.device.DeviceProvider;
+import org.onlab.onos.net.device.DeviceProviderRegistry;
+import org.onlab.onos.net.device.DeviceProviderService;
+import org.onlab.onos.net.link.DefaultLinkDescription;
+import org.onlab.onos.net.link.LinkProvider;
+import org.onlab.onos.net.link.LinkProviderRegistry;
+import org.onlab.onos.net.link.LinkProviderService;
+import org.onlab.onos.net.provider.AbstractProvider;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.packet.ChassisId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
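+/**
+ * OpticalConfigProvider emulates the southbound (SB) network provider for
+ * optical switches, optical links and any other state that needs to be
+ * configured for correct network operations. On activation it reads a JSON
+ * configuration file and publishes its contents as devices and links.
+ */
+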
+@JsonIgnoreProperties(ignoreUnknown = true)
+@Component(immediate = true)
+public class OpticalConfigProvider extends AbstractProvider implements DeviceProvider, LinkProvider {
+
+    protected static final Logger log = LoggerFactory
+            .getLogger(OpticalConfigProvider.class);
+
+    // TODO: fix hard coded file path later.
+    private static final String DEFAULT_CONFIG_FILE =
+            "/opt/onos/config/demo-3-roadm-2-ps.json";
+    private String configFileName = DEFAULT_CONFIG_FILE;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LinkProviderRegistry linkProviderRegistry;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceProviderRegistry deviceProviderRegistry;
+    // Prefix of every annotation key set in publishOpticalConfig().
+    private static final String OPTICAL_ANNOTATION = "optical.";
+    // Provider services: assigned in activate(), nulled in deactivate().
+    private LinkProviderService linkProviderService;
+    private DeviceProviderService deviceProviderService;
+    // Objects built by parseOpticalConfig(); static, cleared in deactivate().
+    private static final List<Roadm> RAW_ROADMS = new ArrayList<>();
+    private static final List<WdmLink> RAW_WDMLINKS = new ArrayList<>();
+    private static final List<PktOptLink> RAW_PKTOPTLINKS = new ArrayList<>();
+    // Values of the "type" field that parseOpticalConfig() acts on.
+    private static final String ROADM = "Roadm";
+    private static final String WDM_LINK = "wdmLink";
+    private static final String PKT_OPT_LINK = "pktOptLink";
+    // Set by loadOpticalConfig(); stays an empty config if loading fails.
+    protected OpticalNetworkConfig opticalNetworkConfig;
+    // The provider id uses "of", matching the "of:" device-URI prefix built in publishOpticalConfig().
+    public OpticalConfigProvider() {
+        super(new ProviderId("of", "org.onlab.onos.provider.opticalConfig", true));
+    }
+    /** Registers with both provider registries, then loads, parses and publishes the config file. */
+    @Activate
+    protected void activate() {
+        linkProviderService = linkProviderRegistry.register(this);
+        deviceProviderService = deviceProviderRegistry.register(this);
+        log.info("Starting optical network configuration process...");
+        log.info("Optical config file set to {}", configFileName);
+
+        loadOpticalConfig();
+        parseOpticalConfig();
+        publishOpticalConfig();
+    }
+    /** Unregisters from both provider registries and empties the parsed configuration lists. */
+    @Deactivate
+    protected void deactivate() {
+        linkProviderRegistry.unregister(this);
+        linkProviderService = null;
+        deviceProviderRegistry.unregister(this);
+        deviceProviderService = null;
+        RAW_ROADMS.clear();
+        RAW_WDMLINKS.clear();
+        RAW_PKTOPTLINKS.clear();
+        log.info("Stopped");
+    }
+
+    /**
+     * Reads configFileName into opticalNetworkConfig. If the file cannot be read or
+     * parsed (logged) or holds only a JSON null, an empty configuration is kept.
+     */
+    private void loadOpticalConfig() {
+        opticalNetworkConfig = new OpticalNetworkConfig();
+        try {
+            OpticalNetworkConfig loaded = new ObjectMapper()
+                    .readValue(new File(configFileName), OpticalNetworkConfig.class);
+            if (loaded != null) {
+                opticalNetworkConfig = loaded;
+            }
+        } catch (JsonParseException | JsonMappingException e) {
+            log.error("{} while loading network config from file: {}: {}",
+                    e.getClass().getSimpleName(), configFileName, e.getMessage(), e);
+        } catch (IOException e) {
+            log.error("IOException while loading network config from file: {}: {}",
+                    configFileName, e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Converts the loaded JSON model into raw ROADM, WDM-link and
+     * packet-optical-link objects. Only allowed "Roadm" switches are kept;
+     * links that are neither "wdmLink" nor "pktOptLink" are ignored.
+     */
+    private void parseOpticalConfig() {
+        List<OpticalSwitchDescription> swList = opticalNetworkConfig.getOpticalSwitches();
+        List<OpticalLinkDescription> lkList = opticalNetworkConfig.getOpticalLinks();
+
+        for (OpticalSwitchDescription sw : swList) {
+            // Constant-first equals tolerates an entry without a "type".
+            if (!ROADM.equals(sw.getType()) || !sw.isAllowed()) {
+                continue;
+            }
+            // "numRegen" is optional; the regenerator count defaults to 0.
+            JsonNode regen = sw.params == null ? null : sw.params.get("numRegen");
+            int regNum = regen == null ? 0 : regen.asInt();
+
+            Roadm newRoadm = new Roadm();
+            newRoadm.setName(sw.name);
+            newRoadm.setNodeId(sw.nodeDpid);
+            newRoadm.setLongtitude(sw.longitude);
+            newRoadm.setLatitude(sw.latitude);
+            newRoadm.setRegenNum(regNum);
+
+            RAW_ROADMS.add(newRoadm);
+            log.info(newRoadm.toString());
+        }
+
+        for (OpticalLinkDescription lk : lkList) {
+            String lktype = lk.getType();
+            if (lktype == null || lk.params == null) {
+                // switch on a null String and params.entrySet() would both throw.
+                log.warn("Skipping optical link without type or params: {} - {}",
+                        lk.getNodeDpid1(), lk.getNodeDpid2());
+                continue;
+            }
+            switch (lktype) {
+            case WDM_LINK:
+                WdmLink newWdmLink = new WdmLink();
+                newWdmLink.setSrcNodeId(lk.getNodeDpid1());
+                newWdmLink.setSnkNodeId(lk.getNodeDpid2());
+                newWdmLink.setAdminWeight(1000); // default weight for each WDM link.
+                Set<Map.Entry<String, JsonNode>> m = lk.params.entrySet();
+                for (Map.Entry<String, JsonNode> e : m) {
+                    String key = e.getKey();
+                    JsonNode j = e.getValue();
+                    if (key.equals("nodeName1")) {
+                        newWdmLink.setSrcNodeName(j.asText());
+                    } else if (key.equals("nodeName2")) {
+                        newWdmLink.setSnkNodeName(j.asText());
+                    } else if (key.equals("port1")) {
+                        newWdmLink.setSrcPort(j.asInt());
+                    } else if (key.equals("port2")) {
+                        newWdmLink.setSnkPort(j.asInt());
+                    } else if (key.equals("distKms")) {
+                        newWdmLink.setDistance(j.asDouble());
+                    } else if (key.equals("numWaves")) {
+                        newWdmLink.setWavelengthNumber(j.asInt());
+                    } else {
+                        log.error("Unknown {} parameter: {}", WDM_LINK, key);
+                    }
+                }
+                RAW_WDMLINKS.add(newWdmLink);
+                log.info(newWdmLink.toString());
+                break;
+            case PKT_OPT_LINK:
+                PktOptLink newPktOptLink = new PktOptLink();
+                newPktOptLink.setSrcNodeId(lk.getNodeDpid1());
+                newPktOptLink.setSnkNodeId(lk.getNodeDpid2());
+                newPktOptLink.setAdminWeight(10); // default weight for each packet-optical link.
+                Set<Map.Entry<String, JsonNode>> ptm = lk.params.entrySet();
+                for (Map.Entry<String, JsonNode> e : ptm) {
+                    String key = e.getKey();
+                    JsonNode j = e.getValue();
+                    if (key.equals("nodeName1")) {
+                        newPktOptLink.setSrcNodeName(j.asText());
+                    } else if (key.equals("nodeName2")) {
+                        newPktOptLink.setSnkNodeName(j.asText());
+                    } else if (key.equals("port1")) {
+                        newPktOptLink.setSrcPort(j.asInt());
+                    } else if (key.equals("port2")) {
+                        newPktOptLink.setSnkPort(j.asInt());
+                    } else if (key.equals("bandWidth")) {
+                        newPktOptLink.setBandwdith(j.asDouble());
+                    } else {
+                        log.error("Unknown {} parameter: {}", PKT_OPT_LINK, key);
+                    }
+                }
+                RAW_PKTOPTLINKS.add(newPktOptLink);
+                log.info(newPktOptLink.toString());
+                break;
+            default:
+            }
+        }
+    }
+    /** Publishes the parsed ROADMs and links; does nothing if either provider service is null. */
+    private void publishOpticalConfig() {
+        if (deviceProviderService == null || linkProviderService == null) {
+            return;
+        }
+
+        // Discover the optical ROADM objects
+        Iterator<Roadm> iterWdmNode = RAW_ROADMS.iterator();
+        while (iterWdmNode.hasNext()) {
+            Roadm value = iterWdmNode.next();
+            DeviceId did = deviceId("of:" + value.getNodeId().replace(":", ""));
+            ChassisId cid = new ChassisId(value.getNodeId());
+            DefaultAnnotations extendedAttributes = DefaultAnnotations.builder()
+                    .set(OPTICAL_ANNOTATION + "switchType", "ROADM")
+                    .set(OPTICAL_ANNOTATION + "switchName", value.getName())
+                    .set(OPTICAL_ANNOTATION + "latitude", Double.toString(value.getLatitude()))
+                    .set(OPTICAL_ANNOTATION + "longtitude", Double.toString(value.getLongtitude()))
+                    .set(OPTICAL_ANNOTATION + "regNum", Integer.toString(value.getRegenNum()))
+                    .build();
+
+            DeviceDescription description =
+                    new DefaultDeviceDescription(did.uri(),
+                                                 Device.Type.SWITCH,
+                                                 "",
+                                                 "",
+                                                 "",
+                                                 "",
+                                                 cid,
+                                                 extendedAttributes);
+            deviceProviderService.deviceConnected(did, description);
+        }
+
+        // Discover the optical WDM link objects
+        Iterator<WdmLink> iterWdmlink = RAW_WDMLINKS.iterator();
+        while (iterWdmlink.hasNext()) {
+            WdmLink value = iterWdmlink.next();
+            // Device ids are "of:" followed by the node id with its colons removed.
+            DeviceId srcNodeId = deviceId("of:" + value.getSrcNodeId().replace(":", ""));
+            DeviceId snkNodeId = deviceId("of:" + value.getSnkNodeId().replace(":", ""));
+
+            PortNumber srcPort = PortNumber.portNumber(value.getSrcPort());
+            PortNumber snkPort = PortNumber.portNumber(value.getSnkPort());
+
+            ConnectPoint srcPoint = new ConnectPoint(srcNodeId, srcPort);
+            ConnectPoint snkPoint = new ConnectPoint(snkNodeId, snkPort);
+            // NOTE(review): "cost" is filled from getDistance(), not getCost() - confirm this is intended.
+            DefaultAnnotations extendedAttributes = DefaultAnnotations.builder()
+                    .set(OPTICAL_ANNOTATION + "linkType", "WDM")
+                    .set(OPTICAL_ANNOTATION + "distance", Double.toString(value.getDistance()))
+                    .set(OPTICAL_ANNOTATION + "cost", Double.toString(value.getDistance()))
+                    .set(OPTICAL_ANNOTATION + "adminWeight", Double.toString(value.getAdminWeight()))
+                    .set(OPTICAL_ANNOTATION + "wavelengthNum", Integer.toString(value.getWavelengthNumber()))
+                    .build();
+
+            DefaultLinkDescription linkDescription =
+                    new DefaultLinkDescription(srcPoint,
+                                                 snkPoint,
+                                                 Link.Type.DIRECT,
+                                                 extendedAttributes);
+
+            linkProviderService.linkDetected(linkDescription);
+            log.info(String.format("WDM link: %s : %s",
+                    linkDescription.src().toString(), linkDescription.dst().toString()));
+        }
+
+        // Discover the packet optical link objects
+        Iterator<PktOptLink> iterPktOptlink = RAW_PKTOPTLINKS.iterator();
+        while (iterPktOptlink.hasNext()) {
+            PktOptLink value = iterPktOptlink.next();
+            DeviceId srcNodeId = deviceId("of:" + value.getSrcNodeId().replace(":", ""));
+            DeviceId snkNodeId = deviceId("of:" + value.getSnkNodeId().replace(":", ""));
+
+            PortNumber srcPort = PortNumber.portNumber(value.getSrcPort());
+            PortNumber snkPort = PortNumber.portNumber(value.getSnkPort());
+
+            ConnectPoint srcPoint = new ConnectPoint(srcNodeId, srcPort);
+            ConnectPoint snkPoint = new ConnectPoint(snkNodeId, snkPort);
+            // NOTE(review): "cost" is filled from getBandwidth(), not getCost() - confirm this is intended.
+            DefaultAnnotations extendedAttributes = DefaultAnnotations.builder()
+                    .set(OPTICAL_ANNOTATION + "linkType", "PktOptLink")
+                    .set(OPTICAL_ANNOTATION + "bandwidth", Double.toString(value.getBandwidth()))
+                    .set(OPTICAL_ANNOTATION + "cost", Double.toString(value.getBandwidth()))
+                    .set(OPTICAL_ANNOTATION + "adminWeight", Double.toString(value.getAdminWeight()))
+                    .build();
+
+            DefaultLinkDescription linkDescription =
+                    new DefaultLinkDescription(srcPoint,
+                                                 snkPoint,
+                                                 Link.Type.DIRECT,
+                                                 extendedAttributes);
+
+            linkProviderService.linkDetected(linkDescription);
+            log.info(String.format("Packet-optical link: %s : %s",
+                    linkDescription.src().toString(), linkDescription.dst().toString()));
+        }
+
+    }
+    /** Currently a no-op: the configuration file is only read in activate(). */
+    @Override
+    public void triggerProbe(Device device) {
+        // TODO We may want to consider re-reading config files and publishing them based on this event.
+    }
+    /** Currently a no-op: the new mastership role is ignored. */
+    @Override
+    public void roleChanged(Device device, MastershipRole newRole) {
+        // TODO Auto-generated method stub.
+    }
+
+}
diff --git a/apps/optical/src/main/java/org/onlab/onos/optical/cfg/OpticalLinkDescription.java b/apps/optical/src/main/java/org/onlab/onos/optical/cfg/OpticalLinkDescription.java
new file mode 100644
index 0000000..af616ef
--- /dev/null
+++ b/apps/optical/src/main/java/org/onlab/onos/optical/cfg/OpticalLinkDescription.java
@@ -0,0 +1,89 @@
+package org.onlab.onos.optical.cfg;
+
+import java.util.Map;
+import org.codehaus.jackson.JsonNode;
+import org.onlab.util.HexString;
+
+/**
+ * JSON-bound description of one optical link in the configuration file.
+ */
+public class OpticalLinkDescription {
+    protected String type;
+    protected Boolean allowed;
+    protected long dpid1;
+    protected long dpid2;
+    protected String nodeDpid1;
+    protected String nodeDpid2;
+    protected Map<String, JsonNode> params;
+    protected Map<String, String> publishAttributes;
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+    // Boxed Boolean: returns null when "allowed" was never set.
+    public Boolean isAllowed() {
+        return allowed;
+    }
+
+    public void setAllowed(Boolean allowed) {
+        this.allowed = allowed;
+    }
+
+    public String getNodeDpid1() {
+        return nodeDpid1;
+    }
+    // Also refreshes the numeric dpid1 via HexString.toLong.
+    public void setNodeDpid1(String nodeDpid1) {
+        this.nodeDpid1 = nodeDpid1;
+        this.dpid1 = HexString.toLong(nodeDpid1);
+    }
+
+    public String getNodeDpid2() {
+        return nodeDpid2;
+    }
+    // Also refreshes the numeric dpid2 via HexString.toLong.
+    public void setNodeDpid2(String nodeDpid2) {
+        this.nodeDpid2 = nodeDpid2;
+        this.dpid2 = HexString.toLong(nodeDpid2);
+    }
+
+    public long getDpid1() {
+        return dpid1;
+    }
+    // Also refreshes the string nodeDpid1 via HexString.toHexString.
+    public void setDpid1(long dpid1) {
+        this.dpid1 = dpid1;
+        this.nodeDpid1 = HexString.toHexString(dpid1);
+    }
+
+    public long getDpid2() {
+        return dpid2;
+    }
+    // Also refreshes the string nodeDpid2 via HexString.toHexString.
+    public void setDpid2(long dpid2) {
+        this.dpid2 = dpid2;
+        this.nodeDpid2 = HexString.toHexString(dpid2);
+    }
+
+    public Map<String, JsonNode> getParams() {
+        return params;
+    }
+
+    public void setParams(Map<String, JsonNode> params) {
+        this.params = params;
+    }
+
+    public Map<String, String> getPublishAttributes() {
+        return publishAttributes;
+    }
+
+    public void setPublishAttributes(Map<String, String> publishAttributes) {
+        this.publishAttributes = publishAttributes;
+    }
+
+}
+
diff --git a/apps/optical/src/main/java/org/onlab/onos/optical/cfg/OpticalNetworkConfig.java b/apps/optical/src/main/java/org/onlab/onos/optical/cfg/OpticalNetworkConfig.java
new file mode 100644
index 0000000..a34f843
--- /dev/null
+++ b/apps/optical/src/main/java/org/onlab/onos/optical/cfg/OpticalNetworkConfig.java
@@ -0,0 +1,40 @@
+package org.onlab.onos.optical.cfg;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Root of the JSON configuration model: the lists of optical switches and optical links.
+ */
+public class OpticalNetworkConfig {
+    protected static final Logger log = LoggerFactory.getLogger(OpticalNetworkConfig.class);
+
+    private List<OpticalSwitchDescription> opticalSwitches;
+    private List<OpticalLinkDescription> opticalLinks;
+    // Both lists start out empty rather than null.
+    public OpticalNetworkConfig() {
+        opticalSwitches = new ArrayList<OpticalSwitchDescription>();
+        opticalLinks = new ArrayList<OpticalLinkDescription>();
+    }
+
+    public List<OpticalSwitchDescription> getOpticalSwitches() {
+        return opticalSwitches;
+    }
+
+    public void setOpticalSwitches(List<OpticalSwitchDescription> switches) {
+        this.opticalSwitches = switches;
+    }
+
+    public List<OpticalLinkDescription> getOpticalLinks() {
+        return opticalLinks;
+    }
+
+    public void setOpticalLinks(List<OpticalLinkDescription> links) {
+        this.opticalLinks = links;
+    }
+
+}
+
diff --git a/apps/optical/src/main/java/org/onlab/onos/optical/cfg/OpticalSwitchDescription.java b/apps/optical/src/main/java/org/onlab/onos/optical/cfg/OpticalSwitchDescription.java
new file mode 100644
index 0000000..18a3982
--- /dev/null
+++ b/apps/optical/src/main/java/org/onlab/onos/optical/cfg/OpticalSwitchDescription.java
@@ -0,0 +1,100 @@
+package org.onlab.onos.optical.cfg;
+
+import java.util.Map;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.onlab.util.HexString;
+
+/**
+ * JSON-bound description of one optical switch in the configuration file.
+ */
+public class OpticalSwitchDescription {
+    protected String name;
+    protected long dpid;
+    protected String nodeDpid;
+    protected String type;
+    protected double latitude;
+    protected double longitude;
+    protected boolean allowed;
+    protected Map<String, JsonNode> params;
+    protected Map<String, String> publishAttributes;
+
+    public String getName() {
+        return name;
+    }
+    @JsonProperty("name")
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public long getDpid() {
+        return dpid;
+    }
+    @JsonProperty("dpid")
+    public void setDpid(long dpid) {
+        this.dpid = dpid;
+        this.nodeDpid = HexString.toHexString(dpid);
+    }
+
+    public String getNodeDpid() {
+        return nodeDpid;
+    }
+    // Returns the same value as getNodeDpid().
+    public String getHexDpid() {
+        return nodeDpid;
+    }
+    // Also refreshes the numeric dpid via HexString.toLong.
+    public void setNodeDpid(String nodeDpid) {
+        this.nodeDpid = nodeDpid;
+        this.dpid = HexString.toLong(nodeDpid);
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public double getLatitude() {
+        return latitude;
+    }
+
+    public void setLatitude(double latitude) {
+        this.latitude = latitude;
+    }
+
+    public double getLongitude() {
+        return longitude;
+    }
+
+    public void setLongitude(double longitude) {
+        this.longitude = longitude;
+    }
+
+    public boolean isAllowed() {
+        return allowed;
+    }
+
+    public void setAllowed(boolean allowed) {
+        this.allowed = allowed;
+    }
+
+    public Map<String, JsonNode> getParams() {
+        return params;
+    }
+
+    public void setParams(Map<String, JsonNode> params) {
+        this.params = params;
+    }
+
+    public Map<String, String> getPublishAttributes() {
+        return publishAttributes;
+    }
+
+    public void setPublishAttributes(Map<String, String> publishAttributes) {
+        this.publishAttributes = publishAttributes;
+    }
+
+}
diff --git a/apps/optical/src/main/java/org/onlab/onos/optical/cfg/PktOptLink.java b/apps/optical/src/main/java/org/onlab/onos/optical/cfg/PktOptLink.java
new file mode 100644
index 0000000..206109f
--- /dev/null
+++ b/apps/optical/src/main/java/org/onlab/onos/optical/cfg/PktOptLink.java
@@ -0,0 +1,110 @@
+package org.onlab.onos.optical.cfg;
+
+/**
+ * Packet-optical link data object that OpticalConfigProvider fills from the JSON configuration.
+ */
+class PktOptLink {
+    private String srcNodeName;
+    private String snkNodeName;
+    private String srcNodeId;
+    private String snkNodeId;
+    private int srcPort;
+    private int snkPort;
+    private double bandwidth;
+    private double cost;
+    private long adminWeight;
+
+    public PktOptLink(String srcName, String snkName) {
+        this.srcNodeName = srcName;
+        this.snkNodeName = snkName;
+    }
+
+    public PktOptLink() {
+        // Intentionally empty: callers populate the fields through the setters.
+    }
+
+    public void setSrcNodeName(String name) {
+        this.srcNodeName = name;
+    }
+
+    public String getSrcNodeName() {
+        return this.srcNodeName;
+    }
+
+    public void setSnkNodeName(String name) {
+        this.snkNodeName = name;
+    }
+
+    public String getSnkNodeName() {
+        return this.snkNodeName;
+    }
+
+    public void setSrcNodeId(String nodeId) {
+        this.srcNodeId = nodeId;
+    }
+
+    public String getSrcNodeId() {
+        return this.srcNodeId;
+    }
+
+    public void setSnkNodeId(String nodeId) {
+        this.snkNodeId = nodeId;
+    }
+
+    public String getSnkNodeId() {
+        return this.snkNodeId;
+    }
+
+    public void setSrcPort(int port) {
+        this.srcPort = port;
+    }
+
+    public int getSrcPort() {
+        return this.srcPort;
+    }
+
+    public void setSnkPort(int port) {
+        this.snkPort = port;
+    }
+
+    public int getSnkPort() {
+        return this.snkPort;
+    }
+    // NOTE: "Bandwdith" is misspelled; the name is kept because OpticalConfigProvider calls it.
+    public void setBandwdith(double x) {
+        this.bandwidth = x;
+    }
+
+    public double getBandwidth() {
+        return this.bandwidth;
+    }
+
+    public void setCost(double x) {
+        this.cost = x;
+    }
+
+    public double getCost() {
+        return this.cost;
+    }
+    // OpticalConfigProvider presets this to 10 for every packet-optical link.
+    public void setAdminWeight(long x) {
+        this.adminWeight = x;
+    }
+
+    public long getAdminWeight() {
+        return this.adminWeight;
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder(" srcNodeName: ").append(this.srcNodeName)
+                .append(" snkNodeName: ").append(this.snkNodeName)
+                .append(" srcNodeId: ").append(this.srcNodeId)
+                .append(" snkNodeId: ").append(this.snkNodeId)
+                .append(" srcPort: ").append(this.srcPort)
+                .append(" snkPort: ").append(this.snkPort)
+                .append(" bandwidth: ").append(this.bandwidth)
+                .append(" cost: ").append(this.cost)
+                .append(" adminWeight: ").append(this.adminWeight).toString();
+    }
+}
diff --git a/apps/optical/src/main/java/org/onlab/onos/optical/cfg/Roadm.java b/apps/optical/src/main/java/org/onlab/onos/optical/cfg/Roadm.java
new file mode 100644
index 0000000..beca5af
--- /dev/null
+++ b/apps/optical/src/main/java/org/onlab/onos/optical/cfg/Roadm.java
@@ -0,0 +1,106 @@
+package org.onlab.onos.optical.cfg;
+
+/**
+ * ROADM data object that OpticalConfigProvider fills from the JSON configuration.
+ */
+class Roadm {
+    private String name;
+    private String nodeID;
+    private double longtitude;
+    private double latitude;
+    private int regenNum;
+
+    //TODO use the following attributes when needed for configurations
+    private int tPort10G;
+    private int tPort40G;
+    private int tPort100G;
+    private int wPort;
+
+    public Roadm() {
+    }
+
+    public Roadm(String name) {
+        this.name = name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getName() {
+        return this.name;
+    }
+
+    public void setNodeId(String nameId) {
+        this.nodeID = nameId;
+    }
+
+    public String getNodeId() {
+        return this.nodeID;
+    }
+    // NOTE: "Longtitude" is misspelled; the name is kept because OpticalConfigProvider calls it.
+    public void setLongtitude(double x) {
+        this.longtitude = x;
+    }
+
+    public double getLongtitude() {
+        return this.longtitude;
+    }
+
+    public void setLatitude(double y) {
+        this.latitude = y;
+    }
+
+    public double getLatitude() {
+        return this.latitude;
+    }
+    // OpticalConfigProvider fills this from the "numRegen" switch parameter.
+    public void setRegenNum(int num) {
+        this.regenNum = num;
+    }
+    public int getRegenNum() {
+        return this.regenNum;
+    }
+
+    public void setTport10GNum(int num) {
+        this.tPort10G = num;
+    }
+    public int getTport10GNum() {
+        return this.tPort10G;
+    }
+
+    public void setTport40GNum(int num) {
+        this.tPort40G = num;
+    }
+    public int getTport40GNum() {
+        return this.tPort40G;
+    }
+
+    public void setTport100GNum(int num) {
+        this.tPort100G = num;
+    }
+    public int getTport100GNum() {
+        return this.tPort100G;
+    }
+
+    public void setWportNum(int num) {
+        this.wPort = num;
+    }
+    public int getWportNum() {
+        return this.wPort;
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder(" ROADM Name: ").append(this.name)
+                .append(" nodeID: ").append(this.nodeID)
+                .append(" longtitude: ").append(this.longtitude)
+                .append(" latitude: ").append(this.latitude)
+                .append(" regenNum: ").append(this.regenNum)
+                .append(" 10GTportNum: ").append(this.tPort10G)
+                .append(" 40GTportNum: ").append(this.tPort40G)
+                .append(" 100GTportNum: ").append(this.tPort100G)
+                .append(" WportNum: ").append(this.wPort).toString();
+    }
+}
+
diff --git a/apps/optical/src/main/java/org/onlab/onos/optical/cfg/WdmLink.java b/apps/optical/src/main/java/org/onlab/onos/optical/cfg/WdmLink.java
new file mode 100644
index 0000000..5e7b468
--- /dev/null
+++ b/apps/optical/src/main/java/org/onlab/onos/optical/cfg/WdmLink.java
@@ -0,0 +1,121 @@
+package org.onlab.onos.optical.cfg;
+
+/**
+ * WDM link data object that OpticalConfigProvider fills from the JSON configuration.
+ */
+class WdmLink {
+    private String srcNodeName;
+    private String snkNodeName;
+    private String srcNodeId;
+    private String snkNodeId;
+    private int srcPort;
+    private int snkPort;
+    private double distance;
+    private double cost;
+    private int wavelengthNumber;
+    private long adminWeight;
+
+    public WdmLink(String name1, String name2) {
+        this.srcNodeName = name1;
+        this.snkNodeName = name2;
+    }
+
+    public WdmLink() {
+        // Intentionally empty: callers populate the fields through the setters.
+    }
+
+    public void setSrcNodeName(String name) {
+        this.srcNodeName = name;
+    }
+
+    public String getSrcNodeName() {
+        return this.srcNodeName;
+    }
+
+    public void setSnkNodeName(String name) {
+        this.snkNodeName = name;
+    }
+
+    public String getSnkNodeName() {
+        return this.snkNodeName;
+    }
+
+    public void setSrcNodeId(String nodeId) {
+        this.srcNodeId = nodeId;
+    }
+
+    public String getSrcNodeId() {
+        return this.srcNodeId;
+    }
+
+    public void setSnkNodeId(String nodeId) {
+        this.snkNodeId = nodeId;
+    }
+
+    public String getSnkNodeId() {
+        return this.snkNodeId;
+    }
+
+    public void setSrcPort(int port) {
+        this.srcPort = port;
+    }
+
+    public int getSrcPort() {
+        return this.srcPort;
+    }
+
+    public void setSnkPort(int port) {
+        this.snkPort = port;
+    }
+
+    public int getSnkPort() {
+        return this.snkPort;
+    }
+    // OpticalConfigProvider fills this from the "distKms" link parameter.
+    public void setDistance(double x) {
+        this.distance = x;
+    }
+
+    public double getDistance() {
+        return this.distance;
+    }
+
+    public void setCost(double x) {
+        this.cost = x;
+    }
+
+    public double getCost() {
+        return this.cost;
+    }
+    // OpticalConfigProvider fills this from the "numWaves" link parameter.
+    public void setWavelengthNumber(int x) {
+        this.wavelengthNumber = x;
+    }
+
+    public int getWavelengthNumber() {
+        return this.wavelengthNumber;
+    }
+    // OpticalConfigProvider presets this to 1000 for every WDM link.
+    public void setAdminWeight(long x) {
+        this.adminWeight = x;
+    }
+
+    public long getAdminWeight() {
+        return this.adminWeight;
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder(" srcNodeName: ").append(this.srcNodeName)
+                .append(" snkNodeName: ").append(this.snkNodeName)
+                .append(" srcNodeId: ").append(this.srcNodeId)
+                .append(" snkNodeId: ").append(this.snkNodeId)
+                .append(" srcPort: ").append(this.srcPort)
+                .append(" snkPort: ").append(this.snkPort)
+                .append(" distance: ").append(this.distance)
+                .append(" cost: ").append(this.cost)
+                .append(" wavelengthNumber: ").append(this.wavelengthNumber)
+                .append(" adminWeight: ").append(this.adminWeight).toString();
+    }
+}
+
diff --git a/apps/optical/src/main/resources/demo-10-roadm-6-ps.json b/apps/optical/src/main/resources/demo-10-roadm-6-ps.json
new file mode 100644
index 0000000..e4e1122
--- /dev/null
+++ b/apps/optical/src/main/resources/demo-10-roadm-6-ps.json
@@ -0,0 +1,391 @@
+{
+        "opticalSwitches": [
+        {
+            "allowed": true,
+            "latitude": 37.6,
+            "longitude": 122.3,
+            "name": "SFO-W10",
+            "nodeDpid": "00:00:ff:ff:ff:ff:ff:01",
+            "params": {
+                "numRegen": 0
+            },
+            "type": "Roadm"
+        },
+        
+	{
+            "allowed": true,
+            "latitude": 37.3,
+            "longitude": 121.9,
+            "name": "SJC-W10",
+            "nodeDpid": "00:00:ff:ff:ff:ff:ff:02",
+            "params": {
+                "numRegen": 0
+            },
+            "type": "Roadm"
+         },
+
+ 	{
+            "allowed": true,
+            "latitude": 33.9,
+            "longitude": 118.4,
+            "name": "LAX-W10",
+            "nodeDpid": "00:00:ff:ff:ff:ff:ff:03",
+            "params": {
+                "numRegen": 0
+            },
+            "type": "Roadm"
+         },
+
+	{
+            "allowed": true,
+            "latitude": 32.8,
+            "longitude": 117.1,
+            "name": "SDG-W10",
+            "nodeDpid": "00:00:ff:ff:ff:ff:ff:04",
+            "params": {
+                "numRegen": 3
+            },
+            "type": "Roadm"
+        },
+
+	{
+            "allowed": true,
+            "latitude": 44.8,
+            "longitude": 93.1,
+            "name": "MSP-M10",
+            "nodeDpid": "00:00:ff:ff:ff:ff:ff:05",
+            "params": {
+                "numRegen": 3
+            },
+            "type": "Roadm"
+         },
+
+	{
+            "allowed": true,
+            "latitude": 32.8,
+            "longitude": 97.1,
+            "name": "DFW-M10",
+            "nodeDpid": "00:00:ff:ff:ff:ff:ff:06",
+            "params": {
+                "numRegen": 3
+            },
+            "type": "Roadm"
+         },
+	
+	{
+            "allowed": true,
+            "latitude": 41.8,
+            "longitude": 120.1,
+            "name": "CHG-N10",
+            "nodeDpid": "00:00:ff:ff:ff:ff:ff:07",
+            "params": {
+                "numRegen": 3
+            },
+            "type": "Roadm"
+         },
+
+	{
+            "allowed": true,
+            "latitude": 38.8,
+            "longitude": 77.1,
+            "name": "IAD-M10",
+            "nodeDpid": "00:00:ff:ff:ff:ff:ff:08",
+            "params": {
+                "numRegen": 3
+            },
+            "type": "Roadm"
+        },
+
+	{
+            "allowed": true,
+            "latitude": 40.8,
+            "longitude": 73.1,
+            "name": "JFK-E10",
+            "nodeDpid": "00:00:ff:ff:ff:ff:ff:09",
+            "params": {
+                "numRegen": 0
+            },
+            "type": "Roadm"
+
+        },
+
+	{
+            "allowed": true,
+            "latitude": 33.8,
+            "longitude": 84.1,
+            "name": "ATL-S10",
+            "nodeDpid": "00:00:ff:ff:ff:ff:ff:0A",
+            "params": {
+                "numRegen": 0
+            },
+            "type": "Roadm"
+        }
+	
+    ],
+
+        "opticalLinks": [
+        {
+            "allowed": true,
+            "nodeDpid1": "00:00:ff:ff:ff:ff:ff:01",
+            "nodeDpid2": "00:00:ff:ff:ff:ff:ff:02",
+            "params": {
+                "distKms": 1000,
+                "nodeName1": "SFO-W10",
+                "nodeName2": "SJC-W10",
+                "numWaves": 80,
+                "port1": 10,
+                "port2": 10
+            },
+            "type": "wdmLink"
+         },
+       
+       {
+	"allowed": true,
+            "nodeDpid1": "00:00:ff:ff:ff:ff:ff:02",
+            "nodeDpid2": "00:00:ff:ff:ff:ff:ff:03",
+            "params": {
+                "distKms": 1000,
+                "nodeName1": "SJC-W10",
+                "nodeName2": "LAX-W10",
+                "numWaves": 80,
+                "port1": 20,
+                "port2": 10
+            },
+            "type": "wdmLink"
+       },
+
+       {
+	"allowed": true,
+            "nodeDpid1": "00:00:ff:ff:ff:ff:ff:03",
+            "nodeDpid2": "00:00:ff:ff:ff:ff:ff:04",
+            "params": {
+                "distKms": 1000,
+                "nodeName1": "LAX-W10",
+                "nodeName2": "SDG-W10",
+                "numWaves": 80,
+                "port1": 30,
+                "port2": 10
+            },
+            "type": "wdmLink"
+        },
+
+        {
+	"allowed": true,
+            "nodeDpid1": "00:00:ff:ff:ff:ff:ff:02",
+            "nodeDpid2": "00:00:ff:ff:ff:ff:ff:05",
+            "params": {
+                "distKms": 4000,
+                "nodeName1": "SJC-W10",
+                "nodeName2": "MSP-M10",
+                "numWaves": 80,
+                "port1": 20,
+                "port2": 10
+            },
+            "type": "wdmLink"
+        },
+
+        {
+
+	"allowed": true,
+            "nodeDpid1": "00:00:ff:ff:ff:ff:ff:03",
+            "nodeDpid2": "00:00:ff:ff:ff:ff:ff:06",
+            "params": {
+                "distKms": 5000,
+                "nodeName1": "LAX-W10",
+                "nodeName2": "DFW-M10",
+                "numWaves": 80,
+                "port1": 20,
+                "port2": 10
+            },
+            "type": "wdmLink"
+         },
+
+       {
+	"allowed": true,
+            "nodeDpid1": "00:00:ff:ff:ff:ff:ff:05",
+            "nodeDpid2": "00:00:ff:ff:ff:ff:ff:06",
+            "params": {
+                "distKms": 3000,
+                "nodeName1": "MSP-M10",
+                "nodeName2": "DFW-M10",
+                "numWaves": 80,
+                "port1": 30,
+                "port2": 20
+            },
+            "type": "wdmLink"
+        },
+
+        {
+	"allowed": true,
+            "nodeDpid1": "00:00:ff:ff:ff:ff:ff:05",
+            "nodeDpid2": "00:00:ff:ff:ff:ff:ff:07",
+            "params": {
+                "distKms": 3000,
+                "nodeName1": "MSP-M10",
+                "nodeName2": "CHG-N10",
+                "numWaves": 80,
+                "port1": 20,
+                "port2": 21
+            },
+            "type": "wdmLink"
+         },
+ 
+        {
+
+	"allowed": true,
+            "nodeDpid1": "00:00:ff:ff:ff:ff:ff:06",
+            "nodeDpid2": "00:00:ff:ff:ff:ff:ff:08",
+            "params": {
+                "distKms": 4000,
+                "nodeName1": "DFW-M10",
+                "nodeName2": "IAD-M10",
+                "numWaves": 80,
+                "port1": 30,
+                "port2": 10
+            },
+            "type": "wdmLink"
+          },
+
+        {
+
+	"allowed": true,
+            "nodeDpid1": "00:00:ff:ff:ff:ff:ff:07",
+            "nodeDpid2": "00:00:ff:ff:ff:ff:ff:08",
+            "params": {
+                "distKms": 4000,
+                "nodeName1": "CHG-N10",
+                "nodeName2": "IAD-M10",
+                "numWaves": 80,
+                "port1": 30,
+                "port2": 20
+            },
+            "type": "wdmLink"
+          },
+
+        {
+	"allowed": true,
+            "nodeDpid1": "00:00:ff:ff:ff:ff:ff:07",
+            "nodeDpid2": "00:00:ff:ff:ff:ff:ff:09",
+            "params": {
+                "distKms": 5000,
+                "nodeName1": "CHG-N10",
+                "nodeName2": "JFK-E10",
+                "numWaves": 80,
+                "port1": 20,
+                "port2": 10
+            },
+            "type": "wdmLink"
+         },
+
+      {
+	"allowed": true,
+            "nodeDpid1": "00:00:ff:ff:ff:ff:ff:08",
+            "nodeDpid2": "00:00:ff:ff:ff:ff:ff:0A",
+            "params": {
+                "distKms": 3000,
+                "nodeName1": "IAD-M10",
+                "nodeName2": "ATL-S10",
+                "numWaves": 80,
+                "port1": 30,
+                "port2": 10
+            },
+            "type": "wdmLink"
+      },
+
+      {
+
+	"allowed": true,
+            "nodeDpid1": "00:00:ff:ff:ff:ff:ff:09",
+            "nodeDpid2": "00:00:ff:ff:ff:ff:ff:0A",
+            "params": {
+                "distKms": 4000,
+                "nodeName1": "JFK-E10",
+                "nodeName2": "ATL-S10",
+                "numWaves": 80,
+                "port1": 20,
+                "port2": 20
+            },
+            "type": "wdmLink"
+        },
+
+
+        {
+            "allowed": true,
+            "nodeDpid1": "00:00:ff:ff:ff:ff:00:01",
+            "nodeDpid2": "00:00:ff:ff:ff:ff:ff:01",
+            "params": {
+                "nodeName1": "SFO-R10",
+                "nodeName2": "SFO-W10",
+                "port1": 10,
+                "port2": 1
+            },
+            "type": "pktOptLink"
+        },
+
+       {
+            "allowed": true,
+            "nodeDpid1": "00:00:ff:ff:ff:ff:00:03",
+            "nodeDpid2": "00:00:ff:ff:ff:ff:ff:03",
+            "params": {
+                "nodeName1": "LAX-R10",
+                "nodeName2": "LAX-W10",
+                "port1": 10,
+                "port2": 1
+            },
+            "type": "pktOptLink"
+        },
+
+         {
+            "allowed": true,
+            "nodeDpid1": "00:00:ff:ff:ff:ff:00:04",
+            "nodeDpid2": "00:00:ff:ff:ff:ff:ff:04",
+            "params": {
+                "nodeName1": "SDG-R10",
+                "nodeName2": "SDG-W10",
+                "port1": 10,
+                "port2": 1
+            },
+            "type": "pktOptLink"
+        },
+
+       {
+            "allowed": true,
+            "nodeDpid1": "00:00:ff:ff:ff:ff:00:07",
+            "nodeDpid2": "00:00:ff:ff:ff:ff:ff:07",
+            "params": {
+                "nodeName1": "CHG-R10",
+                "nodeName2": "CHG-N10",
+                "port1": 10,
+                "port2": 1
+            },
+            "type": "pktOptLink"
+        },
+
+    {
+            "allowed": true,
+            "nodeDpid1": "00:00:ff:ff:ff:ff:00:09",
+            "nodeDpid2": "00:00:ff:ff:ff:ff:ff:09",
+            "params": {
+                "nodeName1": "JFK-R10",
+                "nodeName2": "JFK-E10",
+                "port1": 10,
+                "port2": 1
+            },
+            "type": "pktOptLink"
+        },
+
+       {
+            "allowed": true,
+            "nodeDpid1": "00:00:ff:ff:ff:ff:00:0A",
+            "nodeDpid2": "00:00:ff:ff:ff:ff:ff:0A",
+            "params": {
+                "nodeName1": "ATL-R10",
+                "nodeName2": "ATL-S10",
+                "port1": 10,
+                "port2": 1
+            },
+            "type": "pktOptLink"
+        }
+
+    ]
+}
diff --git a/apps/optical/src/main/resources/demo-3-roadm-2-ps.json b/apps/optical/src/main/resources/demo-3-roadm-2-ps.json
new file mode 100644
index 0000000..6f2c2f5
--- /dev/null
+++ b/apps/optical/src/main/resources/demo-3-roadm-2-ps.json
@@ -0,0 +1,101 @@
+{
+        "opticalSwitches": [
+        {
+            "allowed": true,
+            "latitude": 37.6,
+            "longitude": 122.3,
+            "name": "ROADM1",
+            "nodeDpid": "00:00:ff:ff:ff:ff:ff:01",
+            "params": {
+                "numRegen": 0
+            },
+            "type": "Roadm"
+        },
+
+	{
+            "allowed": true,
+            "latitude": 37.3,
+            "longitude": 121.9,
+            "name": "ROADM2",
+            "nodeDpid": "00:00:ff:ff:ff:ff:ff:02",
+            "params": {
+                "numRegen": 0
+            },
+            "type": "Roadm"
+         },
+
+ 	{
+            "allowed": true,
+            "latitude": 33.9,
+            "longitude": 118.4,
+            "name": "ROADM3",
+            "nodeDpid": "00:00:ff:ff:ff:ff:ff:03",
+            "params": {
+                "numRegen": 2
+            },
+            "type": "Roadm"
+         }
+    ],
+
+        "opticalLinks": [
+        {
+            "allowed": true,
+            "nodeDpid1": "00:00:ff:ff:ff:ff:ff:01",
+            "nodeDpid2": "00:00:ff:ff:ff:ff:ff:03",
+            "params": {
+                "distKms": 1000,
+                "nodeName1": "ROADM1",
+                "nodeName2": "ROADM3",
+                "numWaves": 80,
+                "port1": 10,
+                "port2": 30
+            },
+            "type": "wdmLink"
+         },
+       
+       {
+	"allowed": true,
+            "nodeDpid1": "00:00:ff:ff:ff:ff:ff:03",
+            "nodeDpid2": "00:00:ff:ff:ff:ff:ff:02",
+            "params": {
+                "distKms": 2000,
+                "nodeName1": "ROADM3",
+                "nodeName2": "ROADM2",
+                "numWaves": 80,
+                "port1": 31,
+                "port2": 20
+            },
+            "type": "wdmLink"
+       },
+
+   
+      {
+            "allowed": true,
+            "nodeDpid1": "00:00:ff:ff:ff:ff:00:01",
+            "nodeDpid2": "00:00:ff:ff:ff:ff:ff:01",
+            "params": {
+                "nodeName1": "ROUTER1",
+                "nodeName2": "ROADM1",
+                "bandWidth": 100000,
+                "port1": 10,
+                "port2": 11
+            },
+            "type": "pktOptLink"
+        },
+
+       {
+            "allowed": true,
+            "nodeDpid1": "00:00:ff:ff:ff:ff:00:02",
+            "nodeDpid2": "00:00:ff:ff:ff:ff:ff:02",
+            "params": {
+                "nodeName1": "ROUTER2",
+                "nodeName2": "ROADM2",
+                "bandWidth": 100000,
+                "port1": 10,
+                "port2": 21
+            },
+            "type": "pktOptLink"
+        } 
+
+    ]
+}
diff --git a/apps/pom.xml b/apps/pom.xml
index e9870ea..e812c47 100644
--- a/apps/pom.xml
+++ b/apps/pom.xml
@@ -26,6 +26,7 @@
         <module>config</module>
         <module>sdnip</module>
         <module>calendar</module>
+        <module>optical</module>
     </modules>
 
     <properties>
diff --git a/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
index 8b966ed..6fc150c 100644
--- a/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
+++ b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
@@ -37,6 +37,15 @@
     boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) throws IOException;
 
     /**
+     * Sends a message synchronously.
+     * @param message message to send
+     * @param toNodeId recipient node identifier
+     * @return response handle from which the reply payload can be retrieved
+     * @throws IOException if the interaction with the remote node fails
+     */
+    ClusterMessageResponse sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException;
+
+    /**
      * Adds a new subscriber for the specified message subject.
      *
      * @param subject    message subject
diff --git a/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageResponse.java b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageResponse.java
new file mode 100644
index 0000000..ae2089d
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageResponse.java
@@ -0,0 +1,12 @@
+package org.onlab.onos.store.cluster.messaging;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.onlab.onos.cluster.NodeId;
+
+public interface ClusterMessageResponse {
+    public NodeId sender();
+    public byte[] get(long timeout, TimeUnit timeunit) throws TimeoutException;
+    public byte[] get(long timeout) throws InterruptedException;
+}
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/PointToPointIntentCompiler.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/PointToPointIntentCompiler.java
index 0bd1703..0556df0 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/PointToPointIntentCompiler.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/PointToPointIntentCompiler.java
@@ -43,7 +43,7 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected HostService hostService;
 
-    private IdGenerator<IntentId> intentIdGenerator;
+    protected IdGenerator<IntentId> intentIdGenerator;
 
     @Activate
     public void activate() {
diff --git a/core/net/src/main/java/org/onlab/onos/net/proxyarp/impl/ProxyArpManager.java b/core/net/src/main/java/org/onlab/onos/net/proxyarp/impl/ProxyArpManager.java
index cd84dce..81f42c8 100644
--- a/core/net/src/main/java/org/onlab/onos/net/proxyarp/impl/ProxyArpManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/proxyarp/impl/ProxyArpManager.java
@@ -167,6 +167,7 @@
             return;
         }
 
+        // TODO find the correct IP address
         Ethernet arpReply = buildArpReply(dst.ipAddresses().iterator().next(),
                 dst.mac(), eth);
         // TODO: check send status with host service.
diff --git a/core/net/src/test/java/org/onlab/onos/net/intent/TestPointToPointIntent.java b/core/net/src/test/java/org/onlab/onos/net/intent/TestPointToPointIntent.java
new file mode 100644
index 0000000..41769c6
--- /dev/null
+++ b/core/net/src/test/java/org/onlab/onos/net/intent/TestPointToPointIntent.java
@@ -0,0 +1,96 @@
+package org.onlab.onos.net.intent;
+
+import org.junit.Test;
+import org.onlab.onos.net.ConnectPoint;
+import org.onlab.onos.net.flow.TrafficSelector;
+import org.onlab.onos.net.flow.TrafficTreatment;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.onlab.onos.net.NetTestTools.connectPoint;
+
+/**
+ * Unit tests for the PointToPointIntent class.
+ */
+public class TestPointToPointIntent {
+
+    private TrafficSelector selector = new IntentTestsMocks.MockSelector();
+    private TrafficTreatment treatment = new IntentTestsMocks.MockTreatment();
+
+    private ConnectPoint point1 = connectPoint("dev1", 1);
+    private ConnectPoint point2 = connectPoint("dev2", 1);
+
+    private PointToPointIntent makePointToPoint(long id,
+                                                ConnectPoint ingress,
+                                                ConnectPoint egress) {
+        return new PointToPointIntent(new IntentId(id),
+                                      selector,
+                                      treatment,
+                                      ingress,
+                                      egress);
+    }
+
+    /**
+     * Tests the equals() method where two PointToPointIntents have references
+     * to the same ingress and egress points. These should compare equal.
+     */
+    @Test
+    public void testSameEquals() {
+        PointToPointIntent i1 = makePointToPoint(12, point1, point2);
+        PointToPointIntent i2 = makePointToPoint(12, point1, point2);
+
+        assertThat(i1, is(equalTo(i2)));
+    }
+
+    /**
+     * Tests the equals() method where two PointToPointIntents have their
+     * ingress and egress points swapped. These should compare not equal.
+     */
+    @Test
+    public void testLinksDifferentEquals() {
+
+        PointToPointIntent i1 = makePointToPoint(12, point1, point2);
+        PointToPointIntent i2 = makePointToPoint(12, point2, point1);
+
+        assertThat(i1, is(not(equalTo(i2))));
+    }
+
+    /**
+     * Tests the equals() method where two PointToPointIntents have different
+     * ids. These should compare not equal.
+     */
+    @Test
+    public void testBaseDifferentEquals() {
+        PointToPointIntent i1 = makePointToPoint(12, point1, point2);
+        PointToPointIntent i2 = makePointToPoint(11, point1, point2);
+
+
+        assertThat(i1, is(not(equalTo(i2))));
+    }
+
+    /**
+     * Tests that the hashCode() values for two equivalent PointToPointIntent
+     * objects are the same.
+     */
+    @Test
+    public void testHashCodeEquals() {
+        PointToPointIntent i1 = makePointToPoint(12, point1, point2);
+        PointToPointIntent i2 = makePointToPoint(12, point1, point2);
+
+        assertThat(i1.hashCode(), is(equalTo(i2.hashCode())));
+    }
+
+    /**
+     * Tests that the hashCode() values for two distinct PointToPointIntent
+     * objects are different.
+     */
+    @Test
+    public void testHashCodeDifferent() {
+        PointToPointIntent i1 = makePointToPoint(12, point1, point2);
+        PointToPointIntent i2 = makePointToPoint(22, point1, point2);
+
+        assertThat(i1.hashCode(), is(not(equalTo(i2.hashCode()))));
+    }
+}
diff --git a/core/net/src/test/java/org/onlab/onos/net/intent/impl/TestPointToPointIntentCompiler.java b/core/net/src/test/java/org/onlab/onos/net/intent/impl/TestPointToPointIntentCompiler.java
new file mode 100644
index 0000000..e282347
--- /dev/null
+++ b/core/net/src/test/java/org/onlab/onos/net/intent/impl/TestPointToPointIntentCompiler.java
@@ -0,0 +1,127 @@
+package org.onlab.onos.net.intent.impl;
+
+import java.util.List;
+
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.onlab.onos.net.flow.TrafficSelector;
+import org.onlab.onos.net.flow.TrafficTreatment;
+import org.onlab.onos.net.intent.Intent;
+import org.onlab.onos.net.intent.IntentId;
+import org.onlab.onos.net.intent.IntentTestsMocks;
+import org.onlab.onos.net.intent.PathIntent;
+import org.onlab.onos.net.intent.PointToPointIntent;
+
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.onlab.onos.net.NetTestTools.connectPoint;
+import static org.onlab.onos.net.intent.LinksHaveEntryWithSourceDestinationPairMatcher.linksHasPath;
+
+/**
+ * Unit tests for the PointToPoint intent compiler.
+ */
+public class TestPointToPointIntentCompiler {
+
+    private TrafficSelector selector = new IntentTestsMocks.MockSelector();
+    private TrafficTreatment treatment = new IntentTestsMocks.MockTreatment();
+
+    /**
+     * Creates a PointToPoint intent based on ingress and egress device Ids.
+     *
+     * @param ingressIdString string for id of ingress device
+     * @param egressIdString string for id of egress device
+     * @return PointToPointIntent for the two devices
+     */
+    private PointToPointIntent makeIntent(String ingressIdString,
+                                          String egressIdString) {
+        return new PointToPointIntent(new IntentId(12),
+                                      selector,
+                                      treatment,
+                                      connectPoint(ingressIdString, 1),
+                                      connectPoint(egressIdString, 1));
+    }
+
+    /**
+     * Creates a compiler for PointToPoint intents.
+     *
+     * @param hops string array describing the path hops to use when compiling
+     * @return PointToPoint intent compiler
+     */
+    private PointToPointIntentCompiler makeCompiler(String[] hops) {
+        PointToPointIntentCompiler compiler =
+                new PointToPointIntentCompiler();
+        compiler.pathService = new IntentTestsMocks.MockPathService(hops);
+        IdBlockAllocator idBlockAllocator = new DummyIdBlockAllocator();
+        compiler.intentIdGenerator =
+                new IdBlockAllocatorBasedIntentIdGenerator(idBlockAllocator);
+        return compiler;
+    }
+
+
+    /**
+     * Tests a pair of devices in an 8 hop path, forward direction.
+     */
+    @Test
+    public void testForwardPathCompilation() {
+
+        PointToPointIntent intent = makeIntent("d1", "d8");
+        assertThat(intent, is(notNullValue()));
+
+        String[] hops = {"d1", "d2", "d3", "d4", "d5", "d6", "d7", "d8"};
+        PointToPointIntentCompiler compiler = makeCompiler(hops);
+        assertThat(compiler, is(notNullValue()));
+
+        List<Intent> result = compiler.compile(intent);
+        assertThat(result, is(Matchers.notNullValue()));
+        assertThat(result, hasSize(1));
+        Intent forwardResultIntent = result.get(0);
+        assertThat(forwardResultIntent instanceof PathIntent, is(true));
+
+        if (forwardResultIntent instanceof PathIntent) {
+            PathIntent forwardPathIntent = (PathIntent) forwardResultIntent;
+            // 7 links between the hops, plus one default link each on ingress and egress
+            assertThat(forwardPathIntent.path().links(), hasSize(hops.length + 1));
+            assertThat(forwardPathIntent.path().links(), linksHasPath("d1", "d2"));
+            assertThat(forwardPathIntent.path().links(), linksHasPath("d2", "d3"));
+            assertThat(forwardPathIntent.path().links(), linksHasPath("d3", "d4"));
+            assertThat(forwardPathIntent.path().links(), linksHasPath("d4", "d5"));
+            assertThat(forwardPathIntent.path().links(), linksHasPath("d5", "d6"));
+            assertThat(forwardPathIntent.path().links(), linksHasPath("d6", "d7"));
+            assertThat(forwardPathIntent.path().links(), linksHasPath("d7", "d8"));
+        }
+    }
+
+    /**
+     * Tests a pair of devices in an 8 hop path, reverse direction.
+     */
+    @Test
+    public void testReversePathCompilation() {
+
+        PointToPointIntent intent = makeIntent("d8", "d1");
+        assertThat(intent, is(notNullValue()));
+
+        String[] hops = {"d1", "d2", "d3", "d4", "d5", "d6", "d7", "d8"};
+        PointToPointIntentCompiler compiler = makeCompiler(hops);
+        assertThat(compiler, is(notNullValue()));
+
+        List<Intent> result = compiler.compile(intent);
+        assertThat(result, is(Matchers.notNullValue()));
+        assertThat(result, hasSize(1));
+        Intent reverseResultIntent = result.get(0);
+        assertThat(reverseResultIntent instanceof PathIntent, is(true));
+
+        if (reverseResultIntent instanceof PathIntent) {
+            PathIntent reversePathIntent = (PathIntent) reverseResultIntent;
+            assertThat(reversePathIntent.path().links(), hasSize(hops.length + 1));
+            assertThat(reversePathIntent.path().links(), linksHasPath("d2", "d1"));
+            assertThat(reversePathIntent.path().links(), linksHasPath("d3", "d2"));
+            assertThat(reversePathIntent.path().links(), linksHasPath("d4", "d3"));
+            assertThat(reversePathIntent.path().links(), linksHasPath("d5", "d4"));
+            assertThat(reversePathIntent.path().links(), linksHasPath("d6", "d5"));
+            assertThat(reversePathIntent.path().links(), linksHasPath("d7", "d6"));
+            assertThat(reversePathIntent.path().links(), linksHasPath("d8", "d7"));
+        }
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 30465ac..710d750 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -4,6 +4,9 @@
 
 import java.io.IOException;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -17,6 +20,7 @@
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
 import org.onlab.onos.store.cluster.messaging.MessageSubject;
 import org.onlab.onos.store.serializers.ClusterMessageSerializer;
 import org.onlab.onos.store.serializers.KryoPoolUtil;
@@ -28,6 +32,7 @@
 import org.onlab.netty.MessageHandler;
 import org.onlab.netty.MessagingService;
 import org.onlab.netty.NettyMessagingService;
+import org.onlab.netty.Response;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -120,6 +125,22 @@
     }
 
     @Override
+    public ClusterMessageResponse sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException {
+        ControllerNode node = clusterService.getNode(toNodeId);
+        checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
+        Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
+        try {
+            Response responseFuture =
+                    messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message));
+            return new InternalClusterMessageResponse(toNodeId, responseFuture);
+
+        } catch (IOException e) {
+            log.error("Failed interaction with remote nodeId: " + toNodeId, e);
+            throw e;
+        }
+    }
+
+    @Override
     public void addSubscriber(MessageSubject subject,
             ClusterMessageHandler subscriber) {
         messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
@@ -144,4 +165,30 @@
             }
         }
     }
+
+    private static final class InternalClusterMessageResponse implements ClusterMessageResponse {
+
+        private final NodeId sender;
+        private final Response responseFuture;
+
+        public InternalClusterMessageResponse(NodeId sender, Response responseFuture) {
+            this.sender = sender;
+            this.responseFuture = responseFuture;
+        }
+        @Override
+        public NodeId sender() {
+            return sender;
+        }
+
+        @Override
+        public byte[] get(long timeout, TimeUnit timeunit)
+                throws TimeoutException {
+            return responseFuture.get(timeout, timeunit);
+        }
+
+        @Override
+        public byte[] get(long timeout) throws InterruptedException {
+            return responseFuture.get(); // NOTE(review): 'timeout' is not passed on to this call - confirm intent
+        }
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index 084435f..1e7f73a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -3,14 +3,20 @@
 import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
 import static org.slf4j.LoggerFactory.getLogger;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.onos.ApplicationId;
+import org.onlab.onos.cluster.ClusterService;
 import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.net.flow.DefaultFlowEntry;
 import org.onlab.onos.net.flow.FlowEntry;
@@ -21,6 +27,13 @@
 import org.onlab.onos.net.flow.FlowRuleStore;
 import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
 import org.onlab.onos.store.AbstractStore;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
+import org.onlab.onos.store.flow.ReplicaInfo;
+import org.onlab.onos.store.serializers.DistributedStoreSerializers;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.util.KryoPool;
 import org.slf4j.Logger;
 
 import com.google.common.collect.ArrayListMultimap;
@@ -28,9 +41,8 @@
 import com.google.common.collect.Multimap;
 
 /**
- * Manages inventory of flow rules using trivial in-memory implementation.
+ * Manages inventory of flow rules using a distributed state management protocol.
  */
-//FIXME I LIE. I AIN'T DISTRIBUTED
 @Component(immediate = true)
 @Service
 public class DistributedFlowRuleStore
@@ -46,6 +58,28 @@
     private final Multimap<Short, FlowRule> flowEntriesById =
             ArrayListMultimap.<Short, FlowRule>create();
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private ReplicaInfoManager replicaInfoManager;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private ClusterCommunicationService clusterCommunicator;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private ClusterService clusterService;
+
+    protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        @Override
+        protected void setupKryoPool() {
+            serializerPool = KryoPool.newBuilder()
+                    .register(DistributedStoreSerializers.COMMON)
+                    .build()
+                    .populate(1);
+        }
+    };
+
+    // TODO: make this configurable
+    private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 1000;
+
     @Activate
     public void activate() {
         log.info("Started");
@@ -91,26 +125,141 @@
     }
 
+    /**
+     * Stores the flow rule at the master of the rule's device.
+     * <p>
+     * Written to the local store when this node is the master; otherwise
+     * forwarded to the master, waiting up to FLOW_RULE_STORE_TIMEOUT_MILLIS
+     * for its reply.
+     *
+     * @param rule flow rule to store
+     * @throws RuntimeException wrapping the IOException or TimeoutException
+     *         raised while forwarding to the master
+     */
     @Override
-    public synchronized void storeFlowRule(FlowRule rule) {
-        FlowEntry f = new DefaultFlowEntry(rule);
-        DeviceId did = f.deviceId();
-        if (!flowEntries.containsEntry(did, f)) {
-            flowEntries.put(did, f);
-            flowEntriesById.put(rule.appId(), f);
+    public void storeFlowRule(FlowRule rule) {
+        ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
+        if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+            storeFlowEntryInternal(rule);
+            return;
         }
+
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                FlowStoreMessageSubjects.STORE_FLOW_RULE,
+                SERIALIZER.encode(rule));
+
+        try {
+            ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+            response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+        } catch (IOException | TimeoutException e) {
+            // FIXME: throw a FlowStoreException
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Records the rule in the local maps unless an equal entry is already
+     * stored for its device. Runs under this store's monitor.
+     *
+     * @param flowRule flow rule to record
+     */
+    private synchronized void storeFlowEntryInternal(FlowRule flowRule) {
+        FlowEntry flowEntry = new DefaultFlowEntry(flowRule);
+        DeviceId deviceId = flowRule.deviceId();
+        // write to local copy.
+        if (!flowEntries.containsEntry(deviceId, flowEntry)) {
+            flowEntries.put(deviceId, flowEntry);
+            flowEntriesById.put(flowRule.appId(), flowEntry);
+        }
+        // write to backup.
+        // TODO: write to a hazelcast map.
     }
 
+    /**
+     * Requests deletion of the flow rule at the master of the rule's device.
+     * <p>
+     * Not synchronized, like the other forwarding methods, so the store's
+     * monitor is not held while waiting for the master's reply; the local
+     * update in deleteFlowRuleInternal takes the monitor itself.
+     *
+     * @param rule flow rule to delete
+     * @throws RuntimeException wrapping the IOException or TimeoutException
+     *         raised while forwarding to the master
+     */
     @Override
-    public synchronized void deleteFlowRule(FlowRule rule) {
-        FlowEntry entry = getFlowEntry(rule);
+    public void deleteFlowRule(FlowRule rule) {
+        ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
+        if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+            deleteFlowRuleInternal(rule);
+            return;
+        }
+
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                FlowStoreMessageSubjects.DELETE_FLOW_RULE,
+                SERIALIZER.encode(rule));
+
+        try {
+            ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+            response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+        } catch (IOException | TimeoutException e) {
+            // FIXME: throw a FlowStoreException
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Marks the locally stored entry for the rule as PENDING_REMOVE; does
+     * nothing when no such entry is stored.
+     *
+     * @param flowRule flow rule whose entry is to be marked
+     */
+    private synchronized void deleteFlowRuleInternal(FlowRule flowRule) {
+        FlowEntry entry = getFlowEntry(flowRule);
         if (entry == null) {
             return;
         }
         entry.setState(FlowEntryState.PENDING_REMOVE);
+        // TODO: also update backup.
     }
 
+    /**
+     * Adds or updates the flow entry at the master of the entry's device.
+     *
+     * @param rule flow entry to add or update
+     * @return event from the local update when this node is the master,
+     *         otherwise the event decoded from the master's reply
+     * @throws RuntimeException wrapping the IOException or TimeoutException
+     *         raised while forwarding to the master
+     */
     @Override
-    public synchronized FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
+    public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
+        ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
+        if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+            return addOrUpdateFlowRuleInternal(rule);
+        }
+
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                FlowStoreMessageSubjects.ADD_OR_UPDATE_FLOW_RULE,
+                SERIALIZER.encode(rule));
+
+        try {
+            ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+            return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+        } catch (IOException | TimeoutException e) {
+            // FIXME: throw a FlowStoreException
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Applies the add-or-update to the local store under this store's monitor.
+     *
+     * @param rule flow entry to add or update
+     * @return resulting event, or null when none is generated
+     */
+    private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
         DeviceId did = rule.deviceId();
 
         // check if this new rule is an update to an existing entry
@@ -128,15 +228,39 @@
 
         flowEntries.put(did, rule);
         return null;
+
+        // TODO: also update backup (needs to happen before the return above).
     }
 
     @Override
-    public synchronized FlowRuleEvent removeFlowRule(FlowEntry rule) {
+    public FlowRuleEvent removeFlowRule(FlowEntry rule) {
+        ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
+        if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+            // this node is the master for the device: skip messaging and apply locally
+            return removeFlowRuleInternal(rule);
+        }
+
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                FlowStoreMessageSubjects.REMOVE_FLOW_RULE,
+                SERIALIZER.encode(rule));
+
+        try {
+            ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+            return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+        } catch (IOException | TimeoutException e) {
+            // FIXME: throw a FlowStoreException instead of a bare RuntimeException
+            throw new RuntimeException(e);
+        }
+    }
+
+    private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
         // This is where one could mark a rule as removed and still keep it in the store.
         if (flowEntries.remove(rule.deviceId(), rule)) {
             return new FlowRuleEvent(RULE_REMOVED, rule);
         } else {
             return null;
         }
+        // TODO: also update backup (needs to happen before the returns above).
     }
 }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
new file mode 100644
index 0000000..a43dad6
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
@@ -0,0 +1,26 @@
+package org.onlab.onos.store.flow.impl;
+
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+
+/**
+ * Subjects of the cluster messages that DistributedFlowRuleStore instances
+ * exchange with their peers.
+ */
+public final class FlowStoreMessageSubjects {
+
+    public static final MessageSubject STORE_FLOW_RULE =
+        new MessageSubject("peer-forward-store-flow-rule");
+
+    public static final MessageSubject DELETE_FLOW_RULE =
+        new MessageSubject("peer-forward-delete-flow-rule");
+
+    public static final MessageSubject ADD_OR_UPDATE_FLOW_RULE =
+        new MessageSubject("peer-forward-add-or-update-flow-rule");
+
+    public static final MessageSubject REMOVE_FLOW_RULE =
+        new MessageSubject("peer-forward-remove-flow-rule");
+
+    // Constant holder; never instantiated.
+    private FlowStoreMessageSubjects() {
+    }
+}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
index b63f844..f17c268 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
@@ -26,6 +26,7 @@
 import org.onlab.onos.net.PortNumber;
 import org.onlab.onos.net.device.DefaultDeviceDescription;
 import org.onlab.onos.net.device.DefaultPortDescription;
+import org.onlab.onos.net.flow.DefaultFlowRule;
 import org.onlab.onos.net.host.DefaultHostDescription;
 import org.onlab.onos.net.host.HostDescription;
 import org.onlab.onos.net.link.DefaultLinkDescription;
@@ -86,7 +87,8 @@
                     Timestamp.class,
                     HostId.class,
                     HostDescription.class,
-                    DefaultHostDescription.class
+                    DefaultHostDescription.class,
+                    DefaultFlowRule.class
                     )
             .register(URI.class, new URISerializer())
             .register(NodeId.class, new NodeIdSerializer())
diff --git a/features/features.xml b/features/features.xml
index 03c1814..9636643 100644
--- a/features/features.xml
+++ b/features/features.xml
@@ -155,6 +155,13 @@
         <feature>onos-api</feature>
         <bundle>mvn:org.onlab.onos/onos-app-config/1.0.0-SNAPSHOT</bundle>
     </feature>
+   
+     <feature name="onos-app-optical" version="1.0.0"
+             description="ONOS optical network config">
+        <feature>onos-api</feature>
+        <bundle>mvn:org.onlab.onos/onos-app-optical/1.0.0-SNAPSHOT</bundle>
+     </feature>
+
 
     <feature name="onos-app-sdnip" version="1.0.0"
              description="SDN-IP peering application">
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowEntryBuilder.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowEntryBuilder.java
index 14c2c22..9f6c658 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowEntryBuilder.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowEntryBuilder.java
@@ -223,7 +223,7 @@
                 if (di.isCidrMask()) {
                     dip = IpPrefix.valueOf(di.getInt(), di.asCidrMaskLength());
                 } else {
-                    dip = IpPrefix.valueOf(di.getInt());
+                    dip = IpPrefix.valueOf(di.getInt(), IpPrefix.MAX_INET_MASK);
                 }
                 builder.matchIPDst(dip);
                 break;
@@ -233,7 +233,7 @@
                 if (si.isCidrMask()) {
                     sip = IpPrefix.valueOf(si.getInt(), si.asCidrMaskLength());
                 } else {
-                    sip = IpPrefix.valueOf(si.getInt());
+                    sip = IpPrefix.valueOf(si.getInt(), IpPrefix.MAX_INET_MASK);
                 }
                 builder.matchIPSrc(sip);
                 break;
@@ -249,6 +249,12 @@
                 VlanId vlanId = VlanId.vlanId(match.get(MatchField.VLAN_VID).getVlan());
                 builder.matchVlanId(vlanId);
                 break;
+            case TCP_DST:
+                builder.matchTcpDst((short) match.get(MatchField.TCP_DST).getPort());
+                break;
+            case TCP_SRC:
+                builder.matchTcpSrc((short) match.get(MatchField.TCP_SRC).getPort());
+                break;
             case ARP_OP:
             case ARP_SHA:
             case ARP_SPA:
@@ -272,8 +278,6 @@
             case MPLS_TC:
             case SCTP_DST:
             case SCTP_SRC:
-            case TCP_DST:
-            case TCP_SRC:
             case TUNNEL_ID:
             case UDP_DST:
             case UDP_SRC:
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
index 9568f1f..aa50833 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
@@ -15,6 +15,7 @@
 import org.onlab.onos.net.flow.criteria.Criteria.IPCriterion;
 import org.onlab.onos.net.flow.criteria.Criteria.IPProtocolCriterion;
 import org.onlab.onos.net.flow.criteria.Criteria.PortCriterion;
+import org.onlab.onos.net.flow.criteria.Criteria.TcpPortCriterion;
 import org.onlab.onos.net.flow.criteria.Criteria.VlanIdCriterion;
 import org.onlab.onos.net.flow.criteria.Criteria.VlanPcpCriterion;
 import org.onlab.onos.net.flow.criteria.Criterion;
@@ -42,6 +43,7 @@
 import org.projectfloodlight.openflow.types.OFBufferId;
 import org.projectfloodlight.openflow.types.OFPort;
 import org.projectfloodlight.openflow.types.OFVlanVidMatch;
+import org.projectfloodlight.openflow.types.TransportPort;
 import org.projectfloodlight.openflow.types.U64;
 import org.projectfloodlight.openflow.types.VlanPcp;
 import org.projectfloodlight.openflow.types.VlanVid;
@@ -199,6 +201,7 @@
         Match.Builder mBuilder = factory.buildMatch();
         EthCriterion eth;
         IPCriterion ip;
+        TcpPortCriterion tp;
         for (Criterion c : selector.criteria()) {
             switch (c.type()) {
             case IN_PORT:
@@ -250,6 +253,14 @@
                 mBuilder.setExact(MatchField.VLAN_VID,
                         OFVlanVidMatch.ofVlanVid(VlanVid.ofVlan(vid.vlanId().toShort())));
                 break;
+            case TCP_DST:
+                tp = (TcpPortCriterion) c; // ports > 32767 may be held as negative shorts
+                mBuilder.setExact(MatchField.TCP_DST, TransportPort.of(tp.tcpPort() & 0xffff));
+                break;
+            case TCP_SRC:
+                tp = (TcpPortCriterion) c; // ports > 32767 may be held as negative shorts
+                mBuilder.setExact(MatchField.TCP_SRC, TransportPort.of(tp.tcpPort() & 0xffff));
+                break;
             case ARP_OP:
             case ARP_SHA:
             case ARP_SPA:
@@ -276,8 +287,6 @@
             case PBB_ISID:
             case SCTP_DST:
             case SCTP_SRC:
-            case TCP_DST:
-            case TCP_SRC:
             case TUNNEL_ID:
             case UDP_DST:
             case UDP_SRC:
diff --git a/providers/openflow/host/src/main/java/org/onlab/onos/provider/of/host/impl/OpenFlowHostProvider.java b/providers/openflow/host/src/main/java/org/onlab/onos/provider/of/host/impl/OpenFlowHostProvider.java
index d0e44f7..845cc19 100644
--- a/providers/openflow/host/src/main/java/org/onlab/onos/provider/of/host/impl/OpenFlowHostProvider.java
+++ b/providers/openflow/host/src/main/java/org/onlab/onos/provider/of/host/impl/OpenFlowHostProvider.java
@@ -1,5 +1,9 @@
 package org.onlab.onos.provider.of.host.impl;
 
+import static org.onlab.onos.net.DeviceId.deviceId;
+import static org.onlab.onos.net.PortNumber.portNumber;
+import static org.slf4j.LoggerFactory.getLogger;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -29,10 +33,6 @@
 import org.onlab.packet.VlanId;
 import org.slf4j.Logger;
 
-import static org.onlab.onos.net.DeviceId.deviceId;
-import static org.onlab.onos.net.PortNumber.portNumber;
-import static org.slf4j.LoggerFactory.getLogger;
-
 /**
  * Provider which uses an OpenFlow controller to detect network
  * end-station hosts.
@@ -110,14 +110,16 @@
             // Potentially a new or moved host
             if (eth.getEtherType() == Ethernet.TYPE_ARP) {
                 ARP arp = (ARP) eth.getPayload();
-                IpPrefix ip = IpPrefix.valueOf(arp.getSenderProtocolAddress());
+                IpPrefix ip = IpPrefix.valueOf(arp.getSenderProtocolAddress(),
+                        IpPrefix.MAX_INET_MASK);
                 HostDescription hdescr =
                         new DefaultHostDescription(eth.getSourceMAC(), vlan, hloc, ip);
                 providerService.hostDetected(hid, hdescr);
 
             } else if (ipLearn && eth.getEtherType() == Ethernet.TYPE_IPV4) {
                 IPv4 pip = (IPv4) eth.getPayload();
-                IpPrefix ip = IpPrefix.valueOf(pip.getSourceAddress());
+                IpPrefix ip = IpPrefix.valueOf(pip.getSourceAddress(),
+                        IpPrefix.MAX_INET_MASK);
                 HostDescription hdescr =
                         new DefaultHostDescription(eth.getSourceMAC(), vlan, hloc, ip);
                 providerService.hostDetected(hid, hdescr);
diff --git a/tools/test/cells/single_optical b/tools/test/cells/single_optical
new file mode 100644
index 0000000..61b0d24
--- /dev/null
+++ b/tools/test/cells/single_optical
@@ -0,0 +1,7 @@
+# Local VirtualBox-based single ONOS instance & ONOS mininet box, with the optical app enabled
+
+export ONOS_NIC=192.168.56.*
+export OC1="192.168.56.101"
+export OCN="192.168.56.103"
+
+export ONOS_FEATURES=webconsole,onos-api,onos-core-trivial,onos-cli,onos-openflow,onos-app-fwd,onos-app-mobility,onos-app-tvue,onos-app-optical
diff --git a/utils/misc/src/main/java/org/onlab/util/HexString.java b/utils/misc/src/main/java/org/onlab/util/HexString.java
new file mode 100644
index 0000000..db12aa3
--- /dev/null
+++ b/utils/misc/src/main/java/org/onlab/util/HexString.java
@@ -0,0 +1,138 @@
+package org.onlab.util;
+
+/**
+ * Conversions between binary values and ':'-separated hexadecimal strings.
+ */
+public final class HexString {
+
+    private HexString() {
+    }
+
+    /**
+     * Converts an array of bytes to a ':'-separated hex string.
+     *
+     * @param bytes bytes to convert
+     * @return lower-case hex string such as "0f:ca:fe:de:ad:be:ef";
+     *         the empty string for an empty array
+     */
+    public static String toHexString(final byte[] bytes) {
+        StringBuilder ret = new StringBuilder(bytes.length * 3);
+        for (int i = 0; i < bytes.length; i++) {
+            if (i > 0) {
+                ret.append(':');
+            }
+            // mask to 0..255 so negative bytes do not sign-extend
+            int octet = bytes[i] & 0xff;
+            ret.append(Character.forDigit(octet >> 4, 16));
+            ret.append(Character.forDigit(octet & 0xf, 16));
+        }
+        return ret.toString();
+    }
+
+    /**
+     * Converts a long to a ':'-separated hex string of whole octets.
+     * <p>
+     * The value is left-padded with zeros to {@code padTo} octets. A value
+     * needing more digits than that is never truncated; it is only padded
+     * to an even number of digits so every group is a full octet and the
+     * result can be parsed back with {@link #toLong(String)}.
+     *
+     * @param val value to convert, treated as unsigned
+     * @param padTo minimum number of octets in the result
+     * @return hex string such as "00:00:00:23:20:2d:16:71"
+     */
+    public static String toHexString(final long val, final int padTo) {
+        String digits = Long.toHexString(val);
+        // whole octets only: round an odd digit count up by one
+        int width = Math.max(padTo * 2, digits.length() + digits.length() % 2);
+        int pad = width - digits.length();
+        StringBuilder ret = new StringBuilder(width + width / 2);
+        for (int i = 0; i < width; i++) {
+            if (i > 0 && i % 2 == 0) {
+                ret.append(':');
+            }
+            ret.append(i < pad ? '0' : digits.charAt(i - pad));
+        }
+        return ret.toString();
+    }
+
+    /**
+     * Converts a long to a ':'-separated hex string of eight octets.
+     *
+     * @param val value to convert, treated as unsigned
+     * @return hex string such as "00:00:00:23:20:2d:16:71"
+     */
+    public static String toHexString(final long val) {
+        return toHexString(val, 8);
+    }
+
+    /**
+     * Converts a ':'-separated hex string into an array of bytes.
+     *
+     * @param values string such as "0f:ca:fe:de:ad:be:ef"
+     * @return one byte per octet, e.g. [15, -54, -2, -34, -83, -66, -17]
+     * @throws NumberFormatException if an octet is empty, longer than two
+     *         characters, or contains a non-hex character (signs included)
+     */
+    public static byte[] fromHexString(final String values) {
+        String[] octets = values.split(":");
+        byte[] ret = new byte[octets.length];
+
+        for (int i = 0; i < octets.length; i++) {
+            if (octets[i].length() > 2) {
+                throw new NumberFormatException("Invalid octet length");
+            }
+            ret[i] = (byte) parseOctet(octets[i]);
+        }
+        return ret;
+    }
+
+    /**
+     * Converts a ':'-separated hex string of at most eight octets to a long.
+     *
+     * @param value string such as "00:00:00:23:20:2d:16:71"
+     * @return the octets combined big-endian; negative when the first of
+     *         eight octets has its top bit set
+     * @throws NumberFormatException if there are more than eight octets, or
+     *         an octet is empty, longer than two characters, or contains a
+     *         non-hex character (signs included)
+     */
+    public static long toLong(String value) {
+        String[] octets = value.split(":");
+        if (octets.length > 8) {
+            throw new NumberFormatException("Input string is too big to fit in long: " + value);
+        }
+        long l = 0;
+        for (String octet : octets) {
+            if (octet.length() > 2) {
+                throw new NumberFormatException(
+                        "Each colon-separated byte component must consist of 1 or 2 hex digits: " + value);
+            }
+            l = (l << 8) | parseOctet(octet);
+        }
+        return l;
+    }
+
+    /**
+     * Parses one octet made only of hex digits; unlike Integer/Short
+     * parsing, a leading '+' or '-' is rejected.
+     *
+     * @param octet one or two hex digits
+     * @return value of the digits
+     * @throws NumberFormatException if the octet is empty or has a non-hex character
+     */
+    private static int parseOctet(final String octet) {
+        if (octet.isEmpty()) {
+            throw new NumberFormatException("Empty octet");
+        }
+        int parsed = 0;
+        for (int i = 0; i < octet.length(); i++) {
+            int digit = Character.digit(octet.charAt(i), 16);
+            if (digit < 0) {
+                throw new NumberFormatException("Invalid hex digit in octet: " + octet);
+            }
+            parsed = (parsed << 4) | digit;
+        }
+        return parsed;
+    }
+}
diff --git a/utils/misc/src/test/java/org/onlab/util/HexStringTest.java b/utils/misc/src/test/java/org/onlab/util/HexStringTest.java
new file mode 100644
index 0000000..c20238f
--- /dev/null
+++ b/utils/misc/src/test/java/org/onlab/util/HexStringTest.java
@@ -0,0 +1,73 @@
+package org.onlab.util;
+
+import org.junit.Test;
+
+import com.esotericsoftware.minlog.Log;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests of the HexString conversions.
+ */
+public class HexStringTest extends TestCase {
+
+    /** A dpid string converted to a long and back yields the same string. */
+    @Test
+    public void testMarshalling() throws Exception {
+        String dpidStr = "00:00:00:23:20:2d:16:71";
+        long dpid = HexString.toLong(dpidStr);
+        String testStr = HexString.toHexString(dpid);
+        TestCase.assertEquals(dpidStr, testStr);
+    }
+
+    /** Eight octets with the top bit clear parse to the matching positive long. */
+    @Test
+    public void testToLong() {
+        String dpidStr = "3e:1f:01:fc:72:8c:63:31";
+        long valid = 0x3e1f01fc728c6331L;
+        long testLong = HexString.toLong(dpidStr);
+        TestCase.assertEquals(valid, testLong);
+    }
+
+    /** Eight octets with the top bit set parse to a negative long. */
+    @Test
+    public void testToLongMSB() {
+        String dpidStr = "ca:7c:5e:d1:64:7a:95:9b";
+        long valid = -3856102927509056101L;
+        long testLong = HexString.toLong(dpidStr);
+        TestCase.assertEquals(valid, testLong);
+    }
+
+    /** Nine octets do not fit in a long and must be rejected. */
+    @Test
+    public void testToLongError() {
+        String dpidStr = "09:08:07:06:05:04:03:02:01";
+        try {
+            HexString.toLong(dpidStr);
+            fail("HexString.toLong() should have thrown a NumberFormatException");
+        } catch (NumberFormatException expected) {
+            Log.info("HexString.toLong() have thrown a NumberFormatException");
+        }
+    }
+
+    /** A byte of -1 is rendered as "ff", not sign-extended. */
+    @Test
+    public void testToStringBytes() {
+        byte[] dpid = {0, 0, 0, 0, 0, 0, 0, -1 };
+        String valid = "00:00:00:00:00:00:00:ff";
+        String testString = HexString.toHexString(dpid);
+        TestCase.assertEquals(valid, testString);
+    }
+
+    /** An octet longer than two characters must be rejected. */
+    @Test
+    public void testFromHexStringError() {
+        String invalidStr = "00:00:00:00:00:00:ffff";
+        try {
+            HexString.fromHexString(invalidStr);
+            fail("HexString.fromHexString() should have thrown a NumberFormatException");
+        } catch (NumberFormatException expected) {
+            Log.info("HexString.fromHexString() have thrown a NumberFormatException");
+        }
+    }
+}