[CORD-1623] Fix DistributedMcastStore
Change-Id: I5db88a65aeed7ee46e69701d189c1c27c9eb4df0
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/DistributedMcastStore.java b/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/DistributedMcastStore.java
index 8a3db91..6b1eff5 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/DistributedMcastStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/DistributedMcastStore.java
@@ -15,6 +15,9 @@
*/
package org.onosproject.store.mcast.impl;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -22,6 +25,7 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
+
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.mcast.McastEvent;
import org.onosproject.net.mcast.McastRoute;
@@ -30,14 +34,20 @@
import org.onosproject.store.AbstractStore;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
+import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.net.mcast.McastRouteInfo.mcastRouteInfo;
import static org.slf4j.LoggerFactory.getLogger;
@@ -47,12 +57,9 @@
*/
@Component(immediate = true)
@Service
-public class DistributedMcastStore extends AbstractStore<McastEvent, McastStoreDelegate>
- implements McastStore {
- //FIXME the number of events that will potentially be generated here is
- // not sustainable, consider changing this to an eventually consistent
- // map and not emitting events but rather use a provider-like mechanism
- // to program the dataplane.
+public class DistributedMcastStore
+ extends AbstractStore<McastEvent, McastStoreDelegate>
+ implements McastStore {
private static final String MCASTRIB = "onos-mcast-rib-table";
private Logger log = getLogger(getClass());
@@ -60,13 +67,16 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
- protected ConsistentMap<McastRoute, MulticastData> mcastRib;
- protected Map<McastRoute, MulticastData> mcastRoutes;
+ private Map<McastRoute, MulticastData> mcastRoutes;
+ private ConsistentMap<McastRoute, MulticastData> mcastRib;
+ private MapEventListener<McastRoute, MulticastData> mcastRouteListener =
+ new McastRouteListener();
+
+ private ScheduledExecutorService executor;
@Activate
public void activate() {
-
mcastRib = storageService.<McastRoute, MulticastData>consistentMapBuilder()
.withName(MCASTRIB)
.withSerializer(Serializer.using(KryoNamespace.newBuilder()
@@ -77,34 +87,93 @@
McastRoute.class,
McastRoute.Type.class
).build()))
- //.withRelaxedReadConsistency()
- .build();
+ .build();
mcastRoutes = mcastRib.asJavaMap();
-
+ mcastRib.addListener(mcastRouteListener);
log.info("Started");
}
@Deactivate
public void deactivate() {
+ mcastRib.removeListener(mcastRouteListener);
log.info("Stopped");
}
+
+ private class McastRouteListener implements MapEventListener<McastRoute, MulticastData> {
+ @Override
+ public void event(MapEvent<McastRoute, MulticastData> event) {
+ final McastRoute route = event.key();
+ final MulticastData newData = Optional.ofNullable(event.newValue()).map(Versioned::value).orElse(null);
+ final MulticastData oldData = Optional.ofNullable(event.oldValue()).map(Versioned::value).orElse(null);
+
+ switch (event.type()) {
+ case INSERT:
+ checkNotNull(newData);
+
+ if (newData.source() != null) {
+ notifyDelegate(new McastEvent(McastEvent.Type.SOURCE_ADDED,
+ mcastRouteInfo(route,
+ newData.sinks(),
+ newData.source())));
+ } else if (!newData.sinks().isEmpty()) {
+ newData.sinks().forEach(sink ->
+ notifyDelegate(new McastEvent(McastEvent.Type.SINK_ADDED,
+ mcastRouteInfo(route,
+ sink,
+ newData.source())))
+ );
+ } else {
+ notifyDelegate(new McastEvent(McastEvent.Type.ROUTE_ADDED,
+ mcastRouteInfo(route)));
+ }
+ break;
+ case UPDATE:
+ checkNotNull(newData);
+ checkNotNull(oldData);
+
+ if (!Objects.equal(oldData.source(), newData.source())) {
+ notifyDelegate(new McastEvent(McastEvent.Type.SOURCE_ADDED,
+ mcastRouteInfo(route,
+ newData.sinks(),
+ newData.source())));
+ } else {
+ Sets.difference(newData.sinks(), oldData.sinks()).forEach(sink ->
+ notifyDelegate(new McastEvent(McastEvent.Type.SINK_ADDED,
+ mcastRouteInfo(route,
+ sink,
+ newData.source())))
+ );
+
+ Sets.difference(oldData.sinks(), newData.sinks()).forEach(sink ->
+ notifyDelegate(new McastEvent(McastEvent.Type.SINK_REMOVED,
+ mcastRouteInfo(route,
+ sink,
+ newData.source())))
+ );
+ }
+ break;
+ case REMOVE:
+ notifyDelegate(new McastEvent(McastEvent.Type.ROUTE_REMOVED,
+ mcastRouteInfo(route)));
+ break;
+ default:
+ log.warn("Unknown mcast operation type: {}", event.type());
+ }
+ }
+ }
+
@Override
public void storeRoute(McastRoute route, Type operation) {
switch (operation) {
case ADD:
- if (mcastRoutes.putIfAbsent(route, MulticastData.empty()) == null) {
- notifyDelegate(new McastEvent(McastEvent.Type.ROUTE_ADDED,
- mcastRouteInfo(route)));
- }
+ mcastRoutes.putIfAbsent(route, MulticastData.empty());
break;
case REMOVE:
- if (mcastRoutes.remove(route) != null) {
- notifyDelegate(new McastEvent(McastEvent.Type.ROUTE_REMOVED,
- mcastRouteInfo(route)));
- }
+ // before remove the route should check that source and sinks are removed?
+ mcastRoutes.remove(route);
break;
default:
log.warn("Unknown mcast operation type: {}", operation);
@@ -121,15 +190,6 @@
}
return v;
});
-
-
- if (data != null) {
- notifyDelegate(new McastEvent(McastEvent.Type.SOURCE_ADDED,
- mcastRouteInfo(route,
- data.sinks(),
- source)));
- }
-
}
@Override
@@ -152,28 +212,6 @@
}
return v;
});
-
-
- if (data != null) {
- switch (operation) {
- case ADD:
- notifyDelegate(new McastEvent(McastEvent.Type.SINK_ADDED,
- mcastRouteInfo(route, sink,
- data.source())));
- break;
- case REMOVE:
- if (data != null) {
- notifyDelegate(new McastEvent(McastEvent.Type.SINK_REMOVED,
- mcastRouteInfo(route,
- sink,
- data.source())));
- }
- break;
- default:
- log.warn("Unknown mcast operation type: {}", operation);
- }
- }
-
}
@Override
@@ -190,5 +228,4 @@
public Set<McastRoute> getRoutes() {
return mcastRoutes.keySet();
}
-
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/MulticastData.java b/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/MulticastData.java
index 3dd8297..b1214830 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/MulticastData.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/MulticastData.java
@@ -20,6 +20,7 @@
import org.onosproject.net.ConnectPoint;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -80,4 +81,24 @@
return new MulticastData();
}
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), source, sinks, isEmpty);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof MulticastData)) {
+ return false;
+ }
+ final MulticastData other = (MulticastData) obj;
+
+ return super.equals(obj) &&
+ Objects.equals(source(), other.source()) &&
+ Objects.equals(sinks(), other.sinks()) &&
+ Objects.equals(isEmpty, other.isEmpty);
+ }
}