[Falcon] Refactored mcast store implementation.

Change-Id: Ie3fbc675d02c5abe5f5a419d2fc12dbe8fb4ec35

Refactored mcast store implementation.

Change-Id: I67d70d678813184c522c78e0771f6b8f8f9c25f8
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/mcast/impl/MulticastData.java b/incubator/net/src/main/java/org/onosproject/incubator/net/mcast/impl/MulticastData.java
deleted file mode 100644
index 946d8c6..0000000
--- a/incubator/net/src/main/java/org/onosproject/incubator/net/mcast/impl/MulticastData.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.incubator.net.mcast.impl;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.onosproject.net.ConnectPoint;
-
-import java.util.Collections;
-import java.util.List;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Simple entity maintaining a mapping between a source and a collection of sink
- * connect points.
- */
-public final class MulticastData {
-
-    private final ConnectPoint source;
-    private final List<ConnectPoint> sinks;
-    private final boolean isEmpty;
-
-    private MulticastData() {
-        this.source = null;
-        this.sinks = Collections.EMPTY_LIST;
-        isEmpty = true;
-    }
-
-    public MulticastData(ConnectPoint source, List<ConnectPoint> sinks) {
-        this.source = checkNotNull(source, "Multicast source cannot be null.");
-        this.sinks = checkNotNull(sinks, "List of sinks cannot be null.");
-        isEmpty = false;
-    }
-
-    public MulticastData(ConnectPoint source, ConnectPoint sink) {
-        this.source = checkNotNull(source, "Multicast source cannot be null.");
-        this.sinks = Lists.newArrayList(checkNotNull(sink, "Sink cannot be null."));
-        isEmpty = false;
-    }
-
-    public MulticastData(ConnectPoint source) {
-        this.source = checkNotNull(source, "Multicast source cannot be null.");
-        this.sinks = Lists.newArrayList();
-        isEmpty = false;
-    }
-
-    public ConnectPoint source() {
-        return source;
-    }
-
-    public List<ConnectPoint> sinks() {
-        return ImmutableList.copyOf(sinks);
-    }
-
-    public void appendSink(ConnectPoint sink) {
-        sinks.add(sink);
-    }
-
-    public boolean removeSink(ConnectPoint sink) {
-        return sinks.remove(sink);
-    }
-
-    public boolean isEmpty() {
-        return isEmpty;
-    }
-
-    public static MulticastData empty() {
-        return new MulticastData();
-    }
-
-}
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/mcast/impl/MulticastRouteManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/mcast/impl/MulticastRouteManager.java
index f73dfe4..fe23505 100644
--- a/incubator/net/src/main/java/org/onosproject/incubator/net/mcast/impl/MulticastRouteManager.java
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/mcast/impl/MulticastRouteManager.java
@@ -21,25 +21,19 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.onlab.packet.IpPrefix;
-import org.onlab.util.KryoNamespace;
-import org.onosproject.core.ApplicationId;
-import org.onosproject.core.CoreService;
 import org.onosproject.event.AbstractListenerManager;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.mcast.McastEvent;
 import org.onosproject.net.mcast.McastListener;
 import org.onosproject.net.mcast.McastRoute;
+import org.onosproject.net.mcast.McastStore;
+import org.onosproject.net.mcast.McastStoreDelegate;
 import org.onosproject.net.mcast.MulticastRouteService;
-import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageService;
-import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 
-import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.Set;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -52,38 +46,18 @@
         implements MulticastRouteService {
     //TODO: add MulticastRouteAdminService
 
-    private static final String MCASTRIB = "mcast-rib-table";
-
     private Logger log = getLogger(getClass());
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private StorageService storageService;
+    private final McastStoreDelegate delegate = new InternalMcastStoreDelegate();
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private CoreService coreService;
-
-
-    protected ApplicationId appId;
-    protected ConsistentMap<McastRoute, MulticastData> mcastRoutes;
+    protected McastStore store;
 
     @Activate
     public void activate() {
 
         eventDispatcher.addSink(McastEvent.class, listenerRegistry);
-
-        appId = coreService.registerApplication("org.onosproject.mcastrib");
-
-        mcastRoutes = storageService.<McastRoute, MulticastData>consistentMapBuilder()
-                .withApplicationId(appId)
-                .withName(MCASTRIB)
-                .withSerializer(Serializer.using(KryoNamespace.newBuilder().register(
-                        MulticastData.class,
-                        McastRoute.class,
-                        McastRoute.Type.class,
-                        IpPrefix.class,
-                        List.class,
-                        ConnectPoint.class
-                ).build())).build();
+        store.setDelegate(delegate);
 
         log.info("Started");
     }
@@ -95,80 +69,55 @@
 
     @Override
     public void add(McastRoute route) {
-        mcastRoutes.put(route, MulticastData.empty());
-        post(new McastEvent(McastEvent.Type.ROUTE_ADDED, route, null, null));
+        checkNotNull(route, "Route cannot be null");
+        store.storeRoute(route, McastStore.Type.ADD);
     }
 
     @Override
     public void remove(McastRoute route) {
-        mcastRoutes.remove(route);
-        post(new McastEvent(McastEvent.Type.ROUTE_REMOVED, route, null, null));
+        checkNotNull(route, "Route cannot be null");
+        store.storeRoute(route, McastStore.Type.REMOVE);
     }
 
     @Override
     public void addSource(McastRoute route, ConnectPoint connectPoint) {
-        Versioned<MulticastData> d = mcastRoutes.compute(route, (k, v) -> {
-            if (v.isEmpty()) {
-                return new MulticastData(connectPoint);
-            } else {
-                log.warn("Route {} is already in use.", route);
-                return v;
-            }
-        });
-
-        if (d != null) {
-            post(new McastEvent(McastEvent.Type.SOURCE_ADDED,
-                                route, null, connectPoint));
-        }
+        checkNotNull(route, "Route cannot be null");
+        checkNotNull(connectPoint, "Source cannot be null");
+        store.storeSource(route, connectPoint);
     }
 
     @Override
     public void addSink(McastRoute route, ConnectPoint connectPoint) {
-        AtomicReference<ConnectPoint> source = new AtomicReference<>();
-        mcastRoutes.compute(route, (k, v) -> {
-            if (!v.isEmpty()) {
-                v.appendSink(connectPoint);
-                source.set(v.source());
-            } else {
-                log.warn("Route {} does not exist");
-            }
-            return v;
-        });
+        checkNotNull(route, "Route cannot be null");
+        checkNotNull(connectPoint, "Sink cannot be null");
+        store.storeSink(route, connectPoint, McastStore.Type.ADD);
 
-        if (source.get() != null) {
-            post(new McastEvent(McastEvent.Type.SINK_ADDED, route,
-                                connectPoint, source.get()));
-        }
     }
 
 
     @Override
     public void removeSink(McastRoute route, ConnectPoint connectPoint) {
-        AtomicReference<ConnectPoint> source = new AtomicReference<>();
-        mcastRoutes.compute(route, (k, v) -> {
-            if (v.removeSink(connectPoint)) {
-                source.set(v.source());
-            }
-            return v;
-        });
 
-        if (source.get() != null) {
-            post(new McastEvent(McastEvent.Type.SINK_REMOVED, route,
-                                connectPoint, source.get()));
-        }
+        checkNotNull(route, "Route cannot be null");
+        checkNotNull(connectPoint, "Sink cannot be null");
+
+        store.storeSink(route, connectPoint, McastStore.Type.REMOVE);
     }
 
     @Override
     public ConnectPoint fetchSource(McastRoute route) {
-        MulticastData d = mcastRoutes.asJavaMap().getOrDefault(route,
-                                                               MulticastData.empty());
-        return d.source();
+        return store.sourceFor(route);
     }
 
     @Override
-    public List<ConnectPoint> fetchSinks(McastRoute route) {
-        MulticastData d = mcastRoutes.asJavaMap().getOrDefault(route,
-                                                               MulticastData.empty());
-        return d.sinks();
+    public Set<ConnectPoint> fetchSinks(McastRoute route) {
+        return store.sinksFor(route);
+    }
+
+    private class InternalMcastStoreDelegate implements McastStoreDelegate {
+        @Override
+        public void notify(McastEvent event) {
+            post(event);
+        }
     }
 }
diff --git a/incubator/net/src/test/java/org/onosproject/incubator/net/mcast/impl/MulticastRouteManagerTest.java b/incubator/net/src/test/java/org/onosproject/incubator/net/mcast/impl/MulticastRouteManagerTest.java
index bec9cde..d0f4cf2 100644
--- a/incubator/net/src/test/java/org/onosproject/incubator/net/mcast/impl/MulticastRouteManagerTest.java
+++ b/incubator/net/src/test/java/org/onosproject/incubator/net/mcast/impl/MulticastRouteManagerTest.java
@@ -16,15 +16,17 @@
 package org.onosproject.incubator.net.mcast.impl;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.onlab.junit.TestUtils;
-import org.onlab.packet.IpPrefix;
+import org.onlab.packet.IpAddress;
 import org.onosproject.common.event.impl.TestEventDispatcher;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreServiceAdapter;
 import org.onosproject.core.DefaultApplicationId;
+import org.onosproject.incubator.store.mcast.impl.DistributedMcastStore;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.mcast.McastEvent;
@@ -44,16 +46,16 @@
  */
 public class MulticastRouteManagerTest {
 
-    McastRoute r1 = new McastRoute(IpPrefix.valueOf("1.1.1.1/8"),
-                                   IpPrefix.valueOf("1.1.1.2/8"),
+    McastRoute r1 = new McastRoute(IpAddress.valueOf("1.1.1.1"),
+                                   IpAddress.valueOf("1.1.1.2"),
                                    McastRoute.Type.IGMP);
 
-    McastRoute r11 = new McastRoute(IpPrefix.valueOf("1.1.1.1/8"),
-                                    IpPrefix.valueOf("1.1.1.2/8"),
+    McastRoute r11 = new McastRoute(IpAddress.valueOf("1.1.1.1"),
+                                    IpAddress.valueOf("1.1.1.2"),
                                     McastRoute.Type.STATIC);
 
-    McastRoute r2 = new McastRoute(IpPrefix.valueOf("2.2.2.1/8"),
-                                   IpPrefix.valueOf("2.2.2.2/8"),
+    McastRoute r2 = new McastRoute(IpAddress.valueOf("2.2.2.1"),
+                                   IpAddress.valueOf("2.2.2.2"),
                                    McastRoute.Type.PIM);
 
     ConnectPoint cp1 = new ConnectPoint(did("1"), PortNumber.portNumber(1));
@@ -66,13 +68,17 @@
 
     private List<McastEvent> events;
 
+    private DistributedMcastStore mcastStore;
+
     @Before
     public void setUp() throws Exception {
         manager = new MulticastRouteManager();
+        mcastStore = new DistributedMcastStore();
+        TestUtils.setField(mcastStore, "storageService", new TestStorageService());
         injectEventDispatcher(manager, new TestEventDispatcher());
-        TestUtils.setField(manager, "storageService", new TestStorageService());
-        TestUtils.setField(manager, "coreService", new TestCoreService());
         events = Lists.newArrayList();
+        manager.store = mcastStore;
+        mcastStore.activate();
         manager.activate();
         manager.addListener(listener);
     }
@@ -81,13 +87,13 @@
     public void tearDown() {
         manager.removeListener(listener);
         manager.deactivate();
+        mcastStore.deactivate();
     }
 
     @Test
     public void testAdd() {
         manager.add(r1);
 
-        assertEquals("Add failed", manager.mcastRoutes.size(), 1);
         validateEvents(McastEvent.Type.ROUTE_ADDED);
     }
 
@@ -97,48 +103,39 @@
 
         manager.remove(r1);
 
-        assertEquals("Remove failed", manager.mcastRoutes.size(), 0);
+
         validateEvents(McastEvent.Type.ROUTE_ADDED, McastEvent.Type.ROUTE_REMOVED);
     }
 
     @Test
     public void testAddSource() {
-        manager.add(r1);
-
         manager.addSource(r1, cp1);
 
-        validateEvents(McastEvent.Type.ROUTE_ADDED, McastEvent.Type.SOURCE_ADDED);
+        validateEvents(McastEvent.Type.SOURCE_ADDED);
         assertEquals("Route is not equal", cp1, manager.fetchSource(r1));
     }
 
     @Test
     public void testAddSink() {
-        manager.add(r1);
-
-        manager.addSource(r1, cp1);
         manager.addSink(r1, cp1);
 
-        validateEvents(McastEvent.Type.ROUTE_ADDED,
-                       McastEvent.Type.SOURCE_ADDED,
-                       McastEvent.Type.SINK_ADDED);
-        assertEquals("Route is not equal", Lists.newArrayList(cp1), manager.fetchSinks(r1));
+        validateEvents(McastEvent.Type.SINK_ADDED);
+        assertEquals("Route is not equal", Sets.newHashSet(cp1), manager.fetchSinks(r1));
     }
 
     @Test
     public void testRemoveSink() {
-        manager.add(r1);
 
         manager.addSource(r1, cp1);
         manager.addSink(r1, cp1);
         manager.addSink(r1, cp2);
         manager.removeSink(r1, cp2);
 
-        validateEvents(McastEvent.Type.ROUTE_ADDED,
-                       McastEvent.Type.SOURCE_ADDED,
+        validateEvents(McastEvent.Type.SOURCE_ADDED,
                        McastEvent.Type.SINK_ADDED,
                        McastEvent.Type.SINK_ADDED,
                        McastEvent.Type.SINK_REMOVED);
-        assertEquals("Route is not equal", Lists.newArrayList(cp1), manager.fetchSinks(r1));
+        assertEquals("Route is not equal", Sets.newHashSet(cp1), manager.fetchSinks(r1));
     }
 
     private void validateEvents(McastEvent.Type... evs) {