[Falcon] Refactored mcast store implementation.

Change-Id: Ie3fbc675d02c5abe5f5a419d2fc12dbe8fb4ec35

refactored mcast store implementation

Change-Id: I67d70d678813184c522c78e0771f6b8f8f9c25f8
diff --git a/apps/igmp/src/main/java/org/onosproject/igmp/impl/IGMPProcessMembership.java b/apps/igmp/src/main/java/org/onosproject/igmp/IGMPProcessMembership.java
similarity index 96%
rename from apps/igmp/src/main/java/org/onosproject/igmp/impl/IGMPProcessMembership.java
rename to apps/igmp/src/main/java/org/onosproject/igmp/IGMPProcessMembership.java
index 3d7d603..0065af4 100644
--- a/apps/igmp/src/main/java/org/onosproject/igmp/impl/IGMPProcessMembership.java
+++ b/apps/igmp/src/main/java/org/onosproject/igmp/IGMPProcessMembership.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.onosproject.igmp.impl;
+package org.onosproject.igmp;
 
 import org.onlab.packet.IGMP;
 import org.onosproject.net.ConnectPoint;
diff --git a/apps/igmp/src/main/java/org/onosproject/igmp/impl/IGMPProcessQuery.java b/apps/igmp/src/main/java/org/onosproject/igmp/IGMPProcessQuery.java
similarity index 96%
rename from apps/igmp/src/main/java/org/onosproject/igmp/impl/IGMPProcessQuery.java
rename to apps/igmp/src/main/java/org/onosproject/igmp/IGMPProcessQuery.java
index eb25679..a5ddbf3 100644
--- a/apps/igmp/src/main/java/org/onosproject/igmp/impl/IGMPProcessQuery.java
+++ b/apps/igmp/src/main/java/org/onosproject/igmp/IGMPProcessQuery.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.onosproject.igmp.impl;
+package org.onosproject.igmp;
 
 import org.onlab.packet.IGMP;
 import org.onosproject.net.ConnectPoint;
diff --git a/apps/igmp/src/main/java/org/onosproject/igmp/impl/package-info.java b/apps/igmp/src/main/java/org/onosproject/igmp/IgmpDeviceConfig.java
similarity index 60%
copy from apps/igmp/src/main/java/org/onosproject/igmp/impl/package-info.java
copy to apps/igmp/src/main/java/org/onosproject/igmp/IgmpDeviceConfig.java
index 7d42019..309aca4 100644
--- a/apps/igmp/src/main/java/org/onosproject/igmp/impl/package-info.java
+++ b/apps/igmp/src/main/java/org/onosproject/igmp/IgmpDeviceConfig.java
@@ -14,7 +14,22 @@
  * limitations under the License.
  */
 
+package org.onosproject.igmp;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.config.Config;
+
 /**
- * IGMP implementation.
+ * Config object for access device data.
  */
-package org.onosproject.igmp.impl;
+public class IgmpDeviceConfig extends Config<DeviceId> {
+
+    /**
+     * Gets the device information.
+     *
+     * @return device information
+     */
+    public IgmpDeviceData getDevice() {
+        return new IgmpDeviceData(subject());
+    }
+}
diff --git a/apps/igmp/src/main/java/org/onosproject/igmp/IgmpDeviceData.java b/apps/igmp/src/main/java/org/onosproject/igmp/IgmpDeviceData.java
new file mode 100644
index 0000000..228494e
--- /dev/null
+++ b/apps/igmp/src/main/java/org/onosproject/igmp/IgmpDeviceData.java
@@ -0,0 +1,28 @@
+package org.onosproject.igmp;
+
+import org.onosproject.net.DeviceId;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Information about an igmp enabled device.
+ */
+public class IgmpDeviceData {
+
+    private static final String DEVICE_ID_MISSING = "Device ID cannot be null";
+
+    private final DeviceId deviceId;
+
+    public IgmpDeviceData(DeviceId deviceId) {
+        this.deviceId = checkNotNull(deviceId, DEVICE_ID_MISSING);
+    }
+
+    /**
+     * Retrieves the access device ID.
+     *
+     * @return device ID
+     */
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+}
diff --git a/apps/igmp/src/main/java/org/onosproject/igmp/impl/IGMPComponent.java b/apps/igmp/src/main/java/org/onosproject/igmp/IgmpSnoop.java
similarity index 65%
rename from apps/igmp/src/main/java/org/onosproject/igmp/impl/IGMPComponent.java
rename to apps/igmp/src/main/java/org/onosproject/igmp/IgmpSnoop.java
index b782740..cf5f8a0 100644
--- a/apps/igmp/src/main/java/org/onosproject/igmp/impl/IGMPComponent.java
+++ b/apps/igmp/src/main/java/org/onosproject/igmp/IgmpSnoop.java
@@ -13,13 +13,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.onosproject.igmp.impl;
+package org.onosproject.igmp;
 
 import static org.slf4j.LoggerFactory.getLogger;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.onlab.packet.Ethernet;
@@ -30,8 +31,13 @@
 import org.onlab.packet.IGMP;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.config.NetworkConfigRegistry;
 import org.onosproject.net.flow.DefaultTrafficSelector;
 import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.mcast.McastRoute;
+import org.onosproject.net.mcast.MulticastRouteService;
 import org.onosproject.net.packet.InboundPacket;
 import org.onosproject.net.packet.PacketContext;
 import org.onosproject.net.packet.PacketPriority;
@@ -39,20 +45,34 @@
 import org.onosproject.net.packet.PacketService;
 import org.slf4j.Logger;
 
+import java.util.Optional;
+
 /**
  * Internet Group Management Protocol.
  */
 @Component(immediate = true)
-public class IGMPComponent {
+public class IgmpSnoop {
     private final Logger log = getLogger(getClass());
 
+    private static final String DEFAULT_MCAST_ADDR = "224.0.0.0/4";
+
+    @Property(name = "multicastAddress",
+            label = "Define the multicast base raneg to listen to")
+    private String multicastAddress = DEFAULT_MCAST_ADDR;
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected PacketService packetService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected CoreService coreService;
 
-    private IGMPPacketProcessor processor = new IGMPPacketProcessor();
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected NetworkConfigRegistry networkConfig;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MulticastRouteService multicastService;
+
+    private IgmpPacketProcessor processor = new IgmpPacketProcessor();
     private static ApplicationId appId;
 
     @Activate
@@ -61,11 +81,16 @@
 
         packetService.addProcessor(processor, PacketProcessor.director(1));
 
-        // Build a traffic selector for all multicast traffic
-        TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
-        selector.matchEthType(Ethernet.TYPE_IPV4);
-        selector.matchIPProtocol(IPv4.PROTOCOL_IGMP);
-        packetService.requestPackets(selector.build(), PacketPriority.REACTIVE, appId);
+        networkConfig.getSubjects(DeviceId.class, IgmpDeviceConfig.class).forEach(
+                subject -> {
+                    IgmpDeviceConfig config = networkConfig.getConfig(subject,
+                                                                      IgmpDeviceConfig.class);
+                    if (config != null) {
+                        IgmpDeviceData data = config.getDevice();
+                        submitPacketRequests(data.deviceId());
+                    }
+                }
+        );
 
         log.info("Started");
     }
@@ -77,10 +102,21 @@
         log.info("Stopped");
     }
 
+    private void submitPacketRequests(DeviceId deviceId) {
+        TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
+        selector.matchEthType(Ethernet.TYPE_IPV4);
+        selector.matchIPProtocol(IPv4.PROTOCOL_IGMP);
+        packetService.requestPackets(selector.build(),
+                                     PacketPriority.REACTIVE,
+                                     appId,
+                                     Optional.of(deviceId));
+
+    }
+
     /**
      * Packet processor responsible for handling IGMP packets.
      */
-    private class IGMPPacketProcessor implements PacketProcessor {
+    private class IgmpPacketProcessor implements PacketProcessor {
 
         @Override
         public void process(PacketContext context) {
@@ -107,15 +143,16 @@
             IPv4 ip = (IPv4) ethPkt.getPayload();
             IpAddress gaddr = IpAddress.valueOf(ip.getDestinationAddress());
             IpAddress saddr = Ip4Address.valueOf(ip.getSourceAddress());
-            log.debug("Packet (" + saddr.toString() + ", " + gaddr.toString() +
-                    "\tingress port: " + context.inPacket().receivedFrom().toString());
+            log.debug("Packet ({}, {}) -> ingress port: {}", saddr, gaddr,
+                      context.inPacket().receivedFrom());
+
 
             if (ip.getProtocol() != IPv4.PROTOCOL_IGMP) {
                 log.debug("IGMP Picked up a non IGMP packet.");
                 return;
             }
 
-            IpPrefix mcast = IpPrefix.valueOf("224.0.0.0/4");
+            IpPrefix mcast = IpPrefix.valueOf(DEFAULT_MCAST_ADDR);
             if (!mcast.contains(gaddr)) {
                 log.debug("IGMP Picked up a non multicast packet.");
                 return;
@@ -125,8 +162,6 @@
                 log.debug("IGMP Picked up a packet with a multicast source address.");
                 return;
             }
-            IpPrefix spfx = IpPrefix.valueOf(saddr, 32);
-            IpPrefix gpfx = IpPrefix.valueOf(gaddr, 32);
 
             IGMP igmp = (IGMP) ip.getPayload();
             switch (igmp.getIgmpType()) {
@@ -136,14 +171,14 @@
                     break;
 
                 case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
-                    IGMPProcessQuery.processQuery(igmp, pkt.receivedFrom());
+                    processQuery(igmp, pkt.receivedFrom());
                     break;
 
                 case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
                 case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
                 case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
                     log.debug("IGMP version 1 & 2 message types are not currently supported. Message type: " +
-                            igmp.getIgmpType());
+                                      igmp.getIgmpType());
                     break;
 
                 default:
@@ -152,4 +187,16 @@
             }
         }
     }
+
+    private void processQuery(IGMP pkt, ConnectPoint location) {
+        pkt.getGroups().forEach(group -> group.getSources().forEach(src -> {
+
+            McastRoute route = new McastRoute(src,
+                                              group.getGaddr(),
+                                              McastRoute.Type.IGMP);
+            multicastService.add(route);
+            multicastService.addSink(route, location);
+
+        }));
+    }
 }
diff --git a/apps/igmp/src/main/java/org/onosproject/igmp/impl/package-info.java b/apps/igmp/src/main/java/org/onosproject/igmp/package-info.java
similarity index 94%
rename from apps/igmp/src/main/java/org/onosproject/igmp/impl/package-info.java
rename to apps/igmp/src/main/java/org/onosproject/igmp/package-info.java
index 7d42019..28dcf77 100644
--- a/apps/igmp/src/main/java/org/onosproject/igmp/impl/package-info.java
+++ b/apps/igmp/src/main/java/org/onosproject/igmp/package-info.java
@@ -17,4 +17,4 @@
 /**
  * IGMP implementation.
  */
-package org.onosproject.igmp.impl;
+package org.onosproject.igmp;
diff --git a/core/api/src/main/java/org/onosproject/net/mcast/McastEvent.java b/core/api/src/main/java/org/onosproject/net/mcast/McastEvent.java
index 979194c..ec45a8b 100644
--- a/core/api/src/main/java/org/onosproject/net/mcast/McastEvent.java
+++ b/core/api/src/main/java/org/onosproject/net/mcast/McastEvent.java
@@ -17,9 +17,6 @@
 
 import com.google.common.annotations.Beta;
 import org.onosproject.event.AbstractEvent;
-import org.onosproject.net.ConnectPoint;
-
-import java.util.Optional;
 
 import static com.google.common.base.MoreObjects.toStringHelper;
 
@@ -28,10 +25,8 @@
  * sinks or sources.
  */
 @Beta
-public class McastEvent extends AbstractEvent<McastEvent.Type, McastRoute> {
+public class McastEvent extends AbstractEvent<McastEvent.Type, McastRouteInfo> {
 
-    private final Optional<ConnectPoint> sink;
-    private final Optional<ConnectPoint> source;
 
     public enum Type {
         /**
@@ -60,59 +55,15 @@
         SINK_REMOVED
     }
 
-    private McastEvent(McastEvent.Type type, McastRoute subject) {
+    public McastEvent(McastEvent.Type type, McastRouteInfo subject) {
         super(type, subject);
-        sink = Optional.empty();
-        source = Optional.empty();
     }
 
-    private McastEvent(McastEvent.Type type, McastRoute subject, long time) {
-        super(type, subject, time);
-        sink = Optional.empty();
-        source = Optional.empty();
-    }
-
-    public McastEvent(McastEvent.Type type, McastRoute subject,
-                      ConnectPoint sink,
-                      ConnectPoint source) {
-        super(type, subject);
-        this.sink = Optional.ofNullable(sink);
-        this.source = Optional.ofNullable(source);
-    }
-
-    public McastEvent(McastEvent.Type type, McastRoute subject, long time,
-                       ConnectPoint sink,
-                       ConnectPoint source) {
-        super(type, subject, time);
-        this.sink = Optional.ofNullable(sink);
-        this.source = Optional.ofNullable(source);
-    }
-
-    /**
-     * The sink which has been removed or added. The field may not be set
-     * if the sink has not been detected yet or has been removed.
-     *
-     * @return an optional connect point
-     */
-    public Optional<ConnectPoint> sink() {
-        return sink;
-    }
-
-    /**
-     * The source which has been removed or added.
-
-     * @return an optional connect point
-     */
-    public Optional<ConnectPoint> source() {
-        return source;
-    }
 
     @Override
     public String toString() {
         return toStringHelper(this)
                 .add("type", type())
-                .add("route", subject())
-                .add("source", source)
-                .add("sinks", sink).toString();
+                .add("info", subject()).toString();
     }
 }
diff --git a/core/api/src/main/java/org/onosproject/net/mcast/McastRoute.java b/core/api/src/main/java/org/onosproject/net/mcast/McastRoute.java
index ff1292b..496b93b 100644
--- a/core/api/src/main/java/org/onosproject/net/mcast/McastRoute.java
+++ b/core/api/src/main/java/org/onosproject/net/mcast/McastRoute.java
@@ -17,7 +17,7 @@
 
 import com.google.common.annotations.Beta;
 import com.google.common.base.Objects;
-import org.onlab.packet.IpPrefix;
+import org.onlab.packet.IpAddress;
 
 import static com.google.common.base.MoreObjects.toStringHelper;
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -46,11 +46,11 @@
         STATIC
     }
 
-    private final IpPrefix source;
-    private final IpPrefix group;
+    private final IpAddress source;
+    private final IpAddress group;
     private final Type type;
 
-    public McastRoute(IpPrefix source, IpPrefix group, Type type) {
+    public McastRoute(IpAddress source, IpAddress group, Type type) {
         checkNotNull(source, "Multicast route must have a source");
         checkNotNull(group, "Multicast route must specify a group address");
         checkNotNull(type, "Must indicate what type of route");
@@ -64,7 +64,7 @@
      *
      * @return an ip address
      */
-    public IpPrefix source() {
+    public IpAddress source() {
         return source;
     }
 
@@ -73,7 +73,7 @@
      *
      * @return an ip address
      */
-    public IpPrefix group() {
+    public IpAddress group() {
         return group;
     }
 
diff --git a/core/api/src/main/java/org/onosproject/net/mcast/McastRouteInfo.java b/core/api/src/main/java/org/onosproject/net/mcast/McastRouteInfo.java
new file mode 100644
index 0000000..92087c1
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/mcast/McastRouteInfo.java
@@ -0,0 +1,90 @@
+package org.onosproject.net.mcast;
+
+import com.google.common.collect.ImmutableSet;
+import org.onosproject.net.ConnectPoint;
+
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Multicast information as stored in the store.
+ */
+public final class McastRouteInfo {
+
+    private static final String ROUTE_NOT_NULL = "Route cannot be null";
+
+    private final McastRoute route;
+    private final Optional<ConnectPoint> sink;
+    private final Optional<ConnectPoint> source;
+    private final Set<ConnectPoint> sinks;
+
+    private McastRouteInfo(McastRoute route, ConnectPoint sink,
+                           ConnectPoint source, Set<ConnectPoint> sinks) {
+        this.route = checkNotNull(route, ROUTE_NOT_NULL);
+        this.sink = Optional.ofNullable(sink);
+        this.source = Optional.ofNullable(source);
+        this.sinks = sinks;
+    }
+
+    public static McastRouteInfo mcastRouteInfo(McastRoute route) {
+        return new McastRouteInfo(route, null, null, Collections.EMPTY_SET);
+    }
+
+    public static McastRouteInfo mcastRouteInfo(McastRoute route,
+                                                ConnectPoint sink,
+                                                ConnectPoint source) {
+        return new McastRouteInfo(route, sink, source, Collections.EMPTY_SET);
+    }
+
+    public static McastRouteInfo mcastRouteInfo(McastRoute route,
+                                                Set<ConnectPoint> sinks,
+                                                ConnectPoint source) {
+        return new McastRouteInfo(route, null, source, ImmutableSet.copyOf(sinks));
+    }
+
+    public boolean isComplete() {
+        return ((sink.isPresent() || sinks.size() > 0) && source.isPresent());
+    }
+
+    /**
+     * The route associated with this multicast information.
+     *
+     * @return a mulicast route
+     */
+    public McastRoute route() {
+        return route;
+    }
+
+    /**
+     * The source which has been removed or added.
+
+     * @return an optional connect point
+     */
+    public Optional<ConnectPoint> source() {
+        return source;
+    }
+
+    /**
+     * The sink which has been removed or added. The field may not be set
+     * if the sink has not been detected yet or has been removed.
+     *
+     * @return an optional connect point
+     */
+    public Optional<ConnectPoint> sink() {
+        return sink;
+    }
+
+    /**
+     * Returns the set of sinks associated with this route. Only valid with
+     * SOURCE_ADDED events.
+     *
+     * @return a set of connect points
+     */
+    public Set<ConnectPoint> sinks() {
+        return sinks;
+    }
+
+}
diff --git a/core/api/src/main/java/org/onosproject/net/mcast/McastStore.java b/core/api/src/main/java/org/onosproject/net/mcast/McastStore.java
new file mode 100644
index 0000000..96b21f6
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/mcast/McastStore.java
@@ -0,0 +1,68 @@
+package org.onosproject.net.mcast;
+
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.store.Store;
+
+import java.util.Set;
+
+/**
+ * Entity responsible for storing multicast state information.
+ */
+public interface McastStore extends Store<McastEvent, McastStoreDelegate> {
+
+    enum Type {
+        /**
+         * Adding a route to the mcast rib.
+         */
+        ADD,
+
+        /**
+         * Removing a route from the mcast rib.
+         */
+        REMOVE
+    }
+
+    /**
+     * Updates the store with the route information.
+     *
+     * @param route a multicast route
+     * @param operation an operation
+     */
+    void storeRoute(McastRoute route, Type operation);
+
+    /**
+     * Updates the store with source information for the given route. Only one
+     * source is permitted. Submitting another source will replace the previous
+     * value.
+     *
+     * @param route a multicast route
+     * @param source a source
+     */
+    void storeSource(McastRoute route, ConnectPoint source);
+
+    /**
+     * Updates the store with sink information for a given route. There may be
+     * multiple sinks.
+     *
+     * @param route a multicast route
+     * @param sink a sink
+     * @param operation an operation
+     */
+    void storeSink(McastRoute route, ConnectPoint sink, Type operation);
+
+    /**
+     * Obtain the source for a multicast route.
+     *
+     * @param route a multicast route
+     * @return a connect point
+     */
+    ConnectPoint sourceFor(McastRoute route);
+
+    /**
+     * Obtain the sinks for a multicast route.
+     *
+     * @param route a multicast route
+     * @return a set of sinks
+     */
+    Set<ConnectPoint> sinksFor(McastRoute route);
+}
diff --git a/core/api/src/main/java/org/onosproject/net/mcast/McastStoreDelegate.java b/core/api/src/main/java/org/onosproject/net/mcast/McastStoreDelegate.java
new file mode 100644
index 0000000..bdedce8
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/mcast/McastStoreDelegate.java
@@ -0,0 +1,10 @@
+package org.onosproject.net.mcast;
+
+
+import org.onosproject.store.StoreDelegate;
+
+/**
+ * Mcast store delegate abstraction.
+ */
+public interface McastStoreDelegate extends StoreDelegate<McastEvent> {
+}
diff --git a/core/api/src/main/java/org/onosproject/net/mcast/MulticastRouteService.java b/core/api/src/main/java/org/onosproject/net/mcast/MulticastRouteService.java
index bf65033..05bd6cb 100644
--- a/core/api/src/main/java/org/onosproject/net/mcast/MulticastRouteService.java
+++ b/core/api/src/main/java/org/onosproject/net/mcast/MulticastRouteService.java
@@ -19,7 +19,7 @@
 import org.onosproject.event.ListenerService;
 import org.onosproject.net.ConnectPoint;
 
-import java.util.List;
+import java.util.Set;
 
 /**
  * A service interface for maintaining multicast information.
@@ -82,5 +82,5 @@
      * @param route a multicast route
      * @return a list of connect points
      */
-    List<ConnectPoint> fetchSinks(McastRoute route);
+    Set<ConnectPoint> fetchSinks(McastRoute route);
 }
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/mcast/impl/MulticastData.java b/incubator/net/src/main/java/org/onosproject/incubator/net/mcast/impl/MulticastData.java
deleted file mode 100644
index 946d8c6..0000000
--- a/incubator/net/src/main/java/org/onosproject/incubator/net/mcast/impl/MulticastData.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.incubator.net.mcast.impl;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.onosproject.net.ConnectPoint;
-
-import java.util.Collections;
-import java.util.List;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Simple entity maintaining a mapping between a source and a collection of sink
- * connect points.
- */
-public final class MulticastData {
-
-    private final ConnectPoint source;
-    private final List<ConnectPoint> sinks;
-    private final boolean isEmpty;
-
-    private MulticastData() {
-        this.source = null;
-        this.sinks = Collections.EMPTY_LIST;
-        isEmpty = true;
-    }
-
-    public MulticastData(ConnectPoint source, List<ConnectPoint> sinks) {
-        this.source = checkNotNull(source, "Multicast source cannot be null.");
-        this.sinks = checkNotNull(sinks, "List of sinks cannot be null.");
-        isEmpty = false;
-    }
-
-    public MulticastData(ConnectPoint source, ConnectPoint sink) {
-        this.source = checkNotNull(source, "Multicast source cannot be null.");
-        this.sinks = Lists.newArrayList(checkNotNull(sink, "Sink cannot be null."));
-        isEmpty = false;
-    }
-
-    public MulticastData(ConnectPoint source) {
-        this.source = checkNotNull(source, "Multicast source cannot be null.");
-        this.sinks = Lists.newArrayList();
-        isEmpty = false;
-    }
-
-    public ConnectPoint source() {
-        return source;
-    }
-
-    public List<ConnectPoint> sinks() {
-        return ImmutableList.copyOf(sinks);
-    }
-
-    public void appendSink(ConnectPoint sink) {
-        sinks.add(sink);
-    }
-
-    public boolean removeSink(ConnectPoint sink) {
-        return sinks.remove(sink);
-    }
-
-    public boolean isEmpty() {
-        return isEmpty;
-    }
-
-    public static MulticastData empty() {
-        return new MulticastData();
-    }
-
-}
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/mcast/impl/MulticastRouteManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/mcast/impl/MulticastRouteManager.java
index f73dfe4..fe23505 100644
--- a/incubator/net/src/main/java/org/onosproject/incubator/net/mcast/impl/MulticastRouteManager.java
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/mcast/impl/MulticastRouteManager.java
@@ -21,25 +21,19 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.onlab.packet.IpPrefix;
-import org.onlab.util.KryoNamespace;
-import org.onosproject.core.ApplicationId;
-import org.onosproject.core.CoreService;
 import org.onosproject.event.AbstractListenerManager;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.mcast.McastEvent;
 import org.onosproject.net.mcast.McastListener;
 import org.onosproject.net.mcast.McastRoute;
+import org.onosproject.net.mcast.McastStore;
+import org.onosproject.net.mcast.McastStoreDelegate;
 import org.onosproject.net.mcast.MulticastRouteService;
-import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageService;
-import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 
-import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.Set;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -52,38 +46,18 @@
         implements MulticastRouteService {
     //TODO: add MulticastRouteAdminService
 
-    private static final String MCASTRIB = "mcast-rib-table";
-
     private Logger log = getLogger(getClass());
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private StorageService storageService;
+    private final McastStoreDelegate delegate = new InternalMcastStoreDelegate();
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private CoreService coreService;
-
-
-    protected ApplicationId appId;
-    protected ConsistentMap<McastRoute, MulticastData> mcastRoutes;
+    protected McastStore store;
 
     @Activate
     public void activate() {
 
         eventDispatcher.addSink(McastEvent.class, listenerRegistry);
-
-        appId = coreService.registerApplication("org.onosproject.mcastrib");
-
-        mcastRoutes = storageService.<McastRoute, MulticastData>consistentMapBuilder()
-                .withApplicationId(appId)
-                .withName(MCASTRIB)
-                .withSerializer(Serializer.using(KryoNamespace.newBuilder().register(
-                        MulticastData.class,
-                        McastRoute.class,
-                        McastRoute.Type.class,
-                        IpPrefix.class,
-                        List.class,
-                        ConnectPoint.class
-                ).build())).build();
+        store.setDelegate(delegate);
 
         log.info("Started");
     }
@@ -95,80 +69,55 @@
 
     @Override
     public void add(McastRoute route) {
-        mcastRoutes.put(route, MulticastData.empty());
-        post(new McastEvent(McastEvent.Type.ROUTE_ADDED, route, null, null));
+        checkNotNull(route, "Route cannot be null");
+        store.storeRoute(route, McastStore.Type.ADD);
     }
 
     @Override
     public void remove(McastRoute route) {
-        mcastRoutes.remove(route);
-        post(new McastEvent(McastEvent.Type.ROUTE_REMOVED, route, null, null));
+        checkNotNull(route, "Route cannot be null");
+        store.storeRoute(route, McastStore.Type.REMOVE);
     }
 
     @Override
     public void addSource(McastRoute route, ConnectPoint connectPoint) {
-        Versioned<MulticastData> d = mcastRoutes.compute(route, (k, v) -> {
-            if (v.isEmpty()) {
-                return new MulticastData(connectPoint);
-            } else {
-                log.warn("Route {} is already in use.", route);
-                return v;
-            }
-        });
-
-        if (d != null) {
-            post(new McastEvent(McastEvent.Type.SOURCE_ADDED,
-                                route, null, connectPoint));
-        }
+        checkNotNull(route, "Route cannot be null");
+        checkNotNull(connectPoint, "Source cannot be null");
+        store.storeSource(route, connectPoint);
     }
 
     @Override
     public void addSink(McastRoute route, ConnectPoint connectPoint) {
-        AtomicReference<ConnectPoint> source = new AtomicReference<>();
-        mcastRoutes.compute(route, (k, v) -> {
-            if (!v.isEmpty()) {
-                v.appendSink(connectPoint);
-                source.set(v.source());
-            } else {
-                log.warn("Route {} does not exist");
-            }
-            return v;
-        });
+        checkNotNull(route, "Route cannot be null");
+        checkNotNull(connectPoint, "Sink cannot be null");
+        store.storeSink(route, connectPoint, McastStore.Type.ADD);
 
-        if (source.get() != null) {
-            post(new McastEvent(McastEvent.Type.SINK_ADDED, route,
-                                connectPoint, source.get()));
-        }
     }
 
 
     @Override
     public void removeSink(McastRoute route, ConnectPoint connectPoint) {
-        AtomicReference<ConnectPoint> source = new AtomicReference<>();
-        mcastRoutes.compute(route, (k, v) -> {
-            if (v.removeSink(connectPoint)) {
-                source.set(v.source());
-            }
-            return v;
-        });
 
-        if (source.get() != null) {
-            post(new McastEvent(McastEvent.Type.SINK_REMOVED, route,
-                                connectPoint, source.get()));
-        }
+        checkNotNull(route, "Route cannot be null");
+        checkNotNull(connectPoint, "Sink cannot be null");
+
+        store.storeSink(route, connectPoint, McastStore.Type.REMOVE);
     }
 
     @Override
     public ConnectPoint fetchSource(McastRoute route) {
-        MulticastData d = mcastRoutes.asJavaMap().getOrDefault(route,
-                                                               MulticastData.empty());
-        return d.source();
+        return store.sourceFor(route);
     }
 
     @Override
-    public List<ConnectPoint> fetchSinks(McastRoute route) {
-        MulticastData d = mcastRoutes.asJavaMap().getOrDefault(route,
-                                                               MulticastData.empty());
-        return d.sinks();
+    public Set<ConnectPoint> fetchSinks(McastRoute route) {
+        return store.sinksFor(route);
+    }
+
+    private class InternalMcastStoreDelegate implements McastStoreDelegate {
+        @Override
+        public void notify(McastEvent event) {
+            post(event);
+        }
     }
 }
diff --git a/incubator/net/src/test/java/org/onosproject/incubator/net/mcast/impl/MulticastRouteManagerTest.java b/incubator/net/src/test/java/org/onosproject/incubator/net/mcast/impl/MulticastRouteManagerTest.java
index bec9cde..d0f4cf2 100644
--- a/incubator/net/src/test/java/org/onosproject/incubator/net/mcast/impl/MulticastRouteManagerTest.java
+++ b/incubator/net/src/test/java/org/onosproject/incubator/net/mcast/impl/MulticastRouteManagerTest.java
@@ -16,15 +16,17 @@
 package org.onosproject.incubator.net.mcast.impl;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.onlab.junit.TestUtils;
-import org.onlab.packet.IpPrefix;
+import org.onlab.packet.IpAddress;
 import org.onosproject.common.event.impl.TestEventDispatcher;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreServiceAdapter;
 import org.onosproject.core.DefaultApplicationId;
+import org.onosproject.incubator.store.mcast.impl.DistributedMcastStore;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.mcast.McastEvent;
@@ -44,16 +46,16 @@
  */
 public class MulticastRouteManagerTest {
 
-    McastRoute r1 = new McastRoute(IpPrefix.valueOf("1.1.1.1/8"),
-                                   IpPrefix.valueOf("1.1.1.2/8"),
+    McastRoute r1 = new McastRoute(IpAddress.valueOf("1.1.1.1"),
+                                   IpAddress.valueOf("1.1.1.2"),
                                    McastRoute.Type.IGMP);
 
-    McastRoute r11 = new McastRoute(IpPrefix.valueOf("1.1.1.1/8"),
-                                    IpPrefix.valueOf("1.1.1.2/8"),
+    McastRoute r11 = new McastRoute(IpAddress.valueOf("1.1.1.1"),
+                                    IpAddress.valueOf("1.1.1.2"),
                                     McastRoute.Type.STATIC);
 
-    McastRoute r2 = new McastRoute(IpPrefix.valueOf("2.2.2.1/8"),
-                                   IpPrefix.valueOf("2.2.2.2/8"),
+    McastRoute r2 = new McastRoute(IpAddress.valueOf("2.2.2.1"),
+                                   IpAddress.valueOf("2.2.2.2"),
                                    McastRoute.Type.PIM);
 
     ConnectPoint cp1 = new ConnectPoint(did("1"), PortNumber.portNumber(1));
@@ -66,13 +68,17 @@
 
     private List<McastEvent> events;
 
+    private DistributedMcastStore mcastStore;
+
     @Before
     public void setUp() throws Exception {
         manager = new MulticastRouteManager();
+        mcastStore = new DistributedMcastStore();
+        TestUtils.setField(mcastStore, "storageService", new TestStorageService());
         injectEventDispatcher(manager, new TestEventDispatcher());
-        TestUtils.setField(manager, "storageService", new TestStorageService());
-        TestUtils.setField(manager, "coreService", new TestCoreService());
         events = Lists.newArrayList();
+        manager.store = mcastStore;
+        mcastStore.activate();
         manager.activate();
         manager.addListener(listener);
     }
@@ -81,13 +87,13 @@
     public void tearDown() {
         manager.removeListener(listener);
         manager.deactivate();
+        mcastStore.deactivate();
     }
 
     @Test
     public void testAdd() {
         manager.add(r1);
 
-        assertEquals("Add failed", manager.mcastRoutes.size(), 1);
         validateEvents(McastEvent.Type.ROUTE_ADDED);
     }
 
@@ -97,48 +103,39 @@
 
         manager.remove(r1);
 
-        assertEquals("Remove failed", manager.mcastRoutes.size(), 0);
+
         validateEvents(McastEvent.Type.ROUTE_ADDED, McastEvent.Type.ROUTE_REMOVED);
     }
 
     @Test
     public void testAddSource() {
-        manager.add(r1);
-
         manager.addSource(r1, cp1);
 
-        validateEvents(McastEvent.Type.ROUTE_ADDED, McastEvent.Type.SOURCE_ADDED);
+        validateEvents(McastEvent.Type.SOURCE_ADDED);
         assertEquals("Route is not equal", cp1, manager.fetchSource(r1));
     }
 
     @Test
     public void testAddSink() {
-        manager.add(r1);
-
-        manager.addSource(r1, cp1);
         manager.addSink(r1, cp1);
 
-        validateEvents(McastEvent.Type.ROUTE_ADDED,
-                       McastEvent.Type.SOURCE_ADDED,
-                       McastEvent.Type.SINK_ADDED);
-        assertEquals("Route is not equal", Lists.newArrayList(cp1), manager.fetchSinks(r1));
+        validateEvents(McastEvent.Type.SINK_ADDED);
+        assertEquals("Route is not equal", Sets.newHashSet(cp1), manager.fetchSinks(r1));
     }
 
     @Test
     public void testRemoveSink() {
-        manager.add(r1);
 
         manager.addSource(r1, cp1);
         manager.addSink(r1, cp1);
         manager.addSink(r1, cp2);
         manager.removeSink(r1, cp2);
 
-        validateEvents(McastEvent.Type.ROUTE_ADDED,
-                       McastEvent.Type.SOURCE_ADDED,
+        validateEvents(McastEvent.Type.SOURCE_ADDED,
                        McastEvent.Type.SINK_ADDED,
                        McastEvent.Type.SINK_ADDED,
                        McastEvent.Type.SINK_REMOVED);
-        assertEquals("Route is not equal", Lists.newArrayList(cp1), manager.fetchSinks(r1));
+        assertEquals("Route is not equal", Sets.newHashSet(cp1), manager.fetchSinks(r1));
     }
 
     private void validateEvents(McastEvent.Type... evs) {
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/DistributedMcastStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/DistributedMcastStore.java
new file mode 100644
index 0000000..2ad34d8
--- /dev/null
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/DistributedMcastStore.java
@@ -0,0 +1,177 @@
+package org.onosproject.incubator.store.mcast.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.packet.IpPrefix;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.mcast.McastEvent;
+import org.onosproject.net.mcast.McastRoute;
+import org.onosproject.net.mcast.McastRouteInfo;
+import org.onosproject.net.mcast.McastStore;
+import org.onosproject.net.mcast.McastStoreDelegate;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * A distributed mcast store implementation. Routes are stored consistently
+ * across the cluster.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedMcastStore extends AbstractStore<McastEvent, McastStoreDelegate>
+        implements McastStore {
+    //FIXME the number of events that will potentially be generated here is
+        // not sustainable, consider changing this to an eventually consistent
+        // map and not emitting events but rather use a provider-like mechanism
+        // to program the dataplane.
+
+    private static final String MCASTRIB = "mcast-rib-table";
+    private Logger log = getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private StorageService storageService;
+
+    protected ConsistentMap<McastRoute, MulticastData> mcastRIB;
+    protected Map<McastRoute, MulticastData> mcastRoutes;
+
+
+    @Activate
+    public void activate() {
+
+        mcastRIB = storageService.<McastRoute, MulticastData>consistentMapBuilder()
+                .withName(MCASTRIB)
+                .withSerializer(Serializer.using(KryoNamespace.newBuilder().register(
+                        MulticastData.class,
+                        McastRoute.class,
+                        McastRoute.Type.class,
+                        IpPrefix.class,
+                        List.class,
+                        ConnectPoint.class
+                ).build()))
+                .withRelaxedReadConsistency()
+                .build();
+
+        mcastRoutes = mcastRIB.asJavaMap();
+
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        log.info("Stopped");
+    }
+
+    @Override
+    public void storeRoute(McastRoute route, Type operation) {
+        switch (operation) {
+            case ADD:
+                if (mcastRoutes.putIfAbsent(route, MulticastData.empty()) == null) {
+                    delegate.notify(new McastEvent(McastEvent.Type.ROUTE_ADDED,
+                                                   McastRouteInfo.mcastRouteInfo(route)));
+                }
+                break;
+            case REMOVE:
+                if (mcastRoutes.remove(route) != null) {
+                    delegate.notify(new McastEvent(McastEvent.Type.ROUTE_REMOVED,
+                                                   McastRouteInfo.mcastRouteInfo(route)));
+                }
+                break;
+            default:
+                log.warn("Unknown mcast operation type: {}", operation);
+        }
+    }
+
+    @Override
+    public void storeSource(McastRoute route, ConnectPoint source) {
+        MulticastData data = mcastRoutes.compute(route, (k, v) -> {
+            if (v == null) {
+                return new MulticastData(source);
+            } else {
+                v.setSource(source);
+            }
+            return v;
+        });
+
+
+        if (data != null) {
+            delegate.notify(new McastEvent(McastEvent.Type.SOURCE_ADDED,
+                                           McastRouteInfo.mcastRouteInfo(route,
+                                                                         data.sinks(),
+                                                                         source)));
+        }
+
+    }
+
+    @Override
+    public void storeSink(McastRoute route, ConnectPoint sink, Type operation) {
+        MulticastData data = mcastRoutes.compute(route, (k, v) -> {
+            switch (operation) {
+                case ADD:
+                    if (v == null) {
+                        v = MulticastData.empty();
+                    }
+                    v.appendSink(sink);
+                    break;
+                case REMOVE:
+                    if (v != null) {
+                        v.removeSink(sink);
+                    }
+                    break;
+                default:
+                    log.warn("Unknown mcast operation type: {}", operation);
+            }
+            return v;
+        });
+
+
+        if (data != null) {
+            switch (operation) {
+                case ADD:
+                    delegate.notify(new McastEvent(
+                            McastEvent.Type.SINK_ADDED,
+                            McastRouteInfo.mcastRouteInfo(route,
+                                                          sink,
+                                                          data.source())));
+                    break;
+                case REMOVE:
+                    if (data != null) {
+                        delegate.notify(new McastEvent(
+                                McastEvent.Type.SINK_REMOVED,
+                                McastRouteInfo.mcastRouteInfo(route,
+                                                              sink,
+                                                              data.source())));
+                    }
+                    break;
+                default:
+                    log.warn("Unknown mcast operation type: {}", operation);
+            }
+        }
+
+    }
+
+    @Override
+    public ConnectPoint sourceFor(McastRoute route) {
+        return mcastRoutes.getOrDefault(route, MulticastData.empty()).source();
+    }
+
+    @Override
+    public Set<ConnectPoint> sinksFor(McastRoute route) {
+        return mcastRoutes.getOrDefault(route, MulticastData.empty()).sinks();
+    }
+
+}
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/MulticastData.java b/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/MulticastData.java
new file mode 100644
index 0000000..412d6ef
--- /dev/null
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/MulticastData.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.store.mcast.impl;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.onosproject.net.ConnectPoint;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Simple entity maintaining a mapping between a source and a collection of sink
+ * connect points.
+ */
+public final class MulticastData {
+
+    private final AtomicReference<ConnectPoint> source =
+            new AtomicReference<>();
+    private final Set<ConnectPoint> sinks;
+    private final AtomicBoolean isEmpty = new AtomicBoolean();
+
+    private MulticastData() {
+        this.sinks = Sets.newConcurrentHashSet();
+        isEmpty.set(true);
+    }
+
+    public MulticastData(ConnectPoint source) {
+        this.source.set(checkNotNull(source, "Multicast source cannot be null."));
+        this.sinks = Sets.newConcurrentHashSet();
+        isEmpty.set(false);
+    }
+
+    public ConnectPoint source() {
+        return source.get();
+    }
+
+    public Set<ConnectPoint> sinks() {
+        return ImmutableSet.copyOf(sinks);
+    }
+
+    public void setSource(ConnectPoint source) {
+        isEmpty.set(false);
+        this.source.set(source);
+    }
+
+    public void appendSink(ConnectPoint sink) {
+        sinks.add(sink);
+    }
+
+    public boolean removeSink(ConnectPoint sink) {
+        return sinks.remove(sink);
+    }
+
+    public boolean isEmpty() {
+        return isEmpty.get();
+    }
+
+    public static MulticastData empty() {
+        return new MulticastData();
+    }
+
+}
diff --git a/apps/igmp/src/main/java/org/onosproject/igmp/impl/package-info.java b/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/package-info.java
similarity index 78%
copy from apps/igmp/src/main/java/org/onosproject/igmp/impl/package-info.java
copy to incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/package-info.java
index 7d42019..d18b9e1 100644
--- a/apps/igmp/src/main/java/org/onosproject/igmp/impl/package-info.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/package-info.java
@@ -15,6 +15,7 @@
  */
 
 /**
- * IGMP implementation.
+ * A distributed multicast store implementation that stores multicast rib
+ * data consistently across the cluster.
  */
-package org.onosproject.igmp.impl;
+package org.onosproject.incubator.store.mcast.impl;