Modified GossipIntentStore to use EventuallyConsistentMaps.
All IntentStore operations are now implemented.
ONOS-858
Change-Id: I5081805b61c7e25e28707b90093cae12b5a4374b
diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMap.java
index bbddd1d..5417505 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMap.java
@@ -151,7 +151,7 @@
*
* @param listener listener to register for events
*/
- public void addListener(EventuallyConsistentMapListener listener);
+ public void addListener(EventuallyConsistentMapListener<K, V> listener);
/**
* Removes the specified listener from the map such that it will no longer
@@ -159,7 +159,7 @@
*
* @param listener listener to deregister for events
*/
- public void removeListener(EventuallyConsistentMapListener listener);
+ public void removeListener(EventuallyConsistentMapListener<K, V> listener);
/**
* Shuts down the map and breaks communication between different instances.
diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java
index 19786f8..0acef1a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java
@@ -78,7 +78,7 @@
private final ScheduledExecutorService backgroundExecutor;
private volatile boolean destroyed = false;
- private static final String ERROR_DESTROYED = " is already destroyed";
+ private static final String ERROR_DESTROYED = " map is already destroyed";
// TODO: Make these anti-entropy params configurable
private long initialDelaySec = 5;
@@ -154,6 +154,7 @@
serializerPool = builder
.register(WallClockTimestamp.class)
.register(PutEntry.class)
+ .register(RemoveEntry.class)
.register(ArrayList.class)
.register(InternalPutEvent.class)
.register(InternalRemoveEvent.class)
@@ -166,25 +167,25 @@
@Override
public int size() {
- checkState(destroyed, mapName + ERROR_DESTROYED);
+ checkState(!destroyed, mapName + ERROR_DESTROYED);
return items.size();
}
@Override
public boolean isEmpty() {
- checkState(destroyed, mapName + ERROR_DESTROYED);
+ checkState(!destroyed, mapName + ERROR_DESTROYED);
return items.isEmpty();
}
@Override
public boolean containsKey(K key) {
- checkState(destroyed, mapName + ERROR_DESTROYED);
+ checkState(!destroyed, mapName + ERROR_DESTROYED);
return items.containsKey(key);
}
@Override
public boolean containsValue(V value) {
- checkState(destroyed, mapName + ERROR_DESTROYED);
+ checkState(!destroyed, mapName + ERROR_DESTROYED);
return items.values().stream()
.anyMatch(timestamped -> timestamped.value().equals(value));
@@ -192,7 +193,7 @@
@Override
public V get(K key) {
- checkState(destroyed, mapName + ERROR_DESTROYED);
+ checkState(!destroyed, mapName + ERROR_DESTROYED);
Timestamped<V> value = items.get(key);
if (value != null) {
@@ -203,7 +204,7 @@
@Override
public void put(K key, V value) {
- checkState(destroyed, mapName + ERROR_DESTROYED);
+ checkState(!destroyed, mapName + ERROR_DESTROYED);
Timestamp timestamp = clockService.getTimestamp(key);
if (putInternal(key, value, timestamp)) {
@@ -235,7 +236,7 @@
@Override
public void remove(K key) {
- checkState(destroyed, mapName + ERROR_DESTROYED);
+ checkState(!destroyed, mapName + ERROR_DESTROYED);
Timestamp timestamp = clockService.getTimestamp(key);
if (removeInternal(key, timestamp)) {
@@ -261,7 +262,7 @@
@Override
public void putAll(Map<? extends K, ? extends V> m) {
- checkState(destroyed, mapName + ERROR_DESTROYED);
+ checkState(!destroyed, mapName + ERROR_DESTROYED);
List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
@@ -287,7 +288,7 @@
@Override
public void clear() {
- checkState(destroyed, mapName + ERROR_DESTROYED);
+ checkState(!destroyed, mapName + ERROR_DESTROYED);
List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
@@ -311,14 +312,14 @@
@Override
public Set<K> keySet() {
- checkState(destroyed, mapName + ERROR_DESTROYED);
+ checkState(!destroyed, mapName + ERROR_DESTROYED);
return items.keySet();
}
@Override
public Collection<V> values() {
- checkState(destroyed, mapName + ERROR_DESTROYED);
+ checkState(!destroyed, mapName + ERROR_DESTROYED);
return items.values().stream()
.map(Timestamped::value)
@@ -327,7 +328,7 @@
@Override
public Set<Map.Entry<K, V>> entrySet() {
- checkState(destroyed, mapName + ERROR_DESTROYED);
+ checkState(!destroyed, mapName + ERROR_DESTROYED);
return items.entrySet().stream()
.map(e -> new Entry(e.getKey(), e.getValue().value()))
@@ -335,15 +336,15 @@
}
@Override
- public void addListener(EventuallyConsistentMapListener listener) {
- checkState(destroyed, mapName + ERROR_DESTROYED);
+ public void addListener(EventuallyConsistentMapListener<K, V> listener) {
+ checkState(!destroyed, mapName + ERROR_DESTROYED);
listeners.add(checkNotNull(listener));
}
@Override
- public void removeListener(EventuallyConsistentMapListener listener) {
- checkState(destroyed, mapName + ERROR_DESTROYED);
+ public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
+ checkState(!destroyed, mapName + ERROR_DESTROYED);
listeners.remove(checkNotNull(listener));
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapListener.java b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapListener.java
index 289f46c..d77445a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapListener.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapListener.java
@@ -19,12 +19,12 @@
* Listener interested in receiving modification events for an
* EventuallyConsistentMap.
*/
-public interface EventuallyConsistentMapListener {
+public interface EventuallyConsistentMapListener<K, V> {
/**
* Reacts to the specified event.
*
* @param event the event
*/
- public void event(EventuallyConsistentMapEvent event);
+ public void event(EventuallyConsistentMapEvent<K, V> event);
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
index 4ffd4bf..157d66c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
@@ -16,8 +16,6 @@
package org.onosproject.store.intent.impl;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -26,46 +24,29 @@
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.NodeId;
import org.onosproject.net.intent.BatchWrite;
import org.onosproject.net.intent.Intent;
-import org.onosproject.net.intent.IntentClockService;
import org.onosproject.net.intent.IntentEvent;
import org.onosproject.net.intent.IntentId;
import org.onosproject.net.intent.IntentState;
import org.onosproject.net.intent.IntentStore;
import org.onosproject.net.intent.IntentStoreDelegate;
import org.onosproject.store.AbstractStore;
-import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.impl.Timestamped;
-import org.onosproject.store.serializers.KryoSerializer;
-import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
+import org.onosproject.store.impl.EventuallyConsistentMap;
+import org.onosproject.store.impl.EventuallyConsistentMapEvent;
+import org.onosproject.store.impl.EventuallyConsistentMapImpl;
+import org.onosproject.store.impl.EventuallyConsistentMapListener;
+import org.onosproject.store.impl.WallclockClockManager;
+import org.onosproject.store.serializers.KryoNamespaces;
import org.slf4j.Logger;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
-import static org.onlab.util.Tools.minPriority;
-import static org.onlab.util.Tools.namedThreads;
import static org.onosproject.net.intent.IntentState.INSTALL_REQ;
-import static org.onosproject.store.intent.impl.GossipIntentStoreMessageSubjects.INTENT_ANTI_ENTROPY_ADVERTISEMENT;
-import static org.onosproject.store.intent.impl.GossipIntentStoreMessageSubjects.INTENT_SET_INSTALLABLES_MSG;
-import static org.onosproject.store.intent.impl.GossipIntentStoreMessageSubjects.INTENT_UPDATED_MSG;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -80,20 +61,11 @@
private final Logger log = getLogger(getClass());
- private final ConcurrentMap<IntentId, Intent> intents =
- new ConcurrentHashMap<>();
+ private EventuallyConsistentMap<IntentId, Intent> intents;
- private final ConcurrentMap<IntentId, Timestamped<IntentState>> intentStates
- = new ConcurrentHashMap<>();
+ private EventuallyConsistentMap<IntentId, IntentState> intentStates;
- private final Set<IntentId> withdrawRequestedIntents
- = Sets.newConcurrentHashSet();
-
- private ConcurrentMap<IntentId, Timestamped<List<Intent>>> installables
- = new ConcurrentHashMap<>();
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected IntentClockService intentClockService;
+ private EventuallyConsistentMap<IntentId, List<Intent>> installables;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
@@ -101,64 +73,39 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
- private static final KryoSerializer SERIALIZER = new KryoSerializer() {
- @Override
- protected void setupKryoPool() {
- serializerPool = KryoNamespace.newBuilder()
- .register(DistributedStoreSerializers.STORE_COMMON)
- .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
- .register(InternalIntentEvent.class)
- .register(InternalSetInstallablesEvent.class)
- .register(Collections.emptyList().getClass())
- //.register(InternalIntentAntiEntropyEvent.class)
- //.register(IntentAntiEntropyAdvertisement.class)
- .build();
- }
- };
-
- private ExecutorService executor;
-
- private ScheduledExecutorService backgroundExecutor;
-
- // TODO: Make these anti-entropy params configurable
- private long initialDelaySec = 5;
- private long periodSec = 5;
-
@Activate
public void activate() {
- clusterCommunicator.addSubscriber(INTENT_UPDATED_MSG,
- new InternalIntentCreateOrUpdateEventListener());
- clusterCommunicator.addSubscriber(INTENT_SET_INSTALLABLES_MSG,
- new InternalIntentSetInstallablesListener());
- clusterCommunicator.addSubscriber(
- INTENT_ANTI_ENTROPY_ADVERTISEMENT,
- new InternalIntentAntiEntropyAdvertisementListener());
+ KryoNamespace.Builder intentSerializer = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ // TODO this should be in BASIC namespace
+ .register(Collections.emptyList().getClass());
+ intents = new EventuallyConsistentMapImpl<>("intents", clusterService,
+ clusterCommunicator,
+ intentSerializer,
+ new WallclockClockManager<>());
- executor = Executors.newCachedThreadPool(namedThreads("onos-intent-fg-%d"));
+ intentStates = new EventuallyConsistentMapImpl<>("intent-states",
+ clusterService,
+ clusterCommunicator,
+ intentSerializer,
+ new WallclockClockManager<>());
- backgroundExecutor =
- newSingleThreadScheduledExecutor(minPriority(namedThreads("onos-intent-bg-%d")));
+ installables = new EventuallyConsistentMapImpl<>("intent-installables",
+ clusterService,
+ clusterCommunicator,
+ intentSerializer,
+ new WallclockClockManager<>());
- // start anti-entropy thread
- //backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
- //initialDelaySec, periodSec, TimeUnit.SECONDS);
+ intentStates.addListener(new InternalIntentStatesListener());
log.info("Started");
}
@Deactivate
public void deactivate() {
- executor.shutdownNow();
- backgroundExecutor.shutdownNow();
- try {
- if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
- log.error("Timeout during executor shutdown");
- }
- } catch (InterruptedException e) {
- log.error("Error during executor shutdown", e);
- }
-
- intents.clear();
+ intents.destroy();
+ intentStates.destroy();
+ installables.destroy();
log.info("Stopped");
}
@@ -181,76 +128,19 @@
@Override
public IntentState getIntentState(IntentId intentId) {
- Timestamped<IntentState> state = intentStates.get(intentId);
- if (state != null) {
- return state.value();
- }
- return null;
- }
-
- private IntentEvent setStateInternal(IntentId intentId, IntentState newState, Timestamp timestamp) {
- switch (newState) {
- case WITHDRAW_REQ:
- withdrawRequestedIntents.add(intentId);
- break;
- case INSTALL_REQ:
- case COMPILING:
- case INSTALLING:
- case INSTALLED:
- case RECOMPILING:
- case WITHDRAWING:
- case WITHDRAWN:
- case FAILED:
- synchronized (intentStates) {
- Timestamped<IntentState> existing = intentStates.get(intentId);
- if (existing == null || !existing.isNewer(timestamp)) {
- intentStates.put(intentId, new Timestamped<>(newState, timestamp));
- }
- }
- break;
- default:
- log.warn("Unknown intent state {}", newState);
- break;
- }
-
- try {
- // TODO make sure it's OK if the intent is null
- return IntentEvent.getEvent(newState, intents.get(intentId));
- } catch (IllegalArgumentException e) {
- // Transient states can't be used for events, so don't send one
- return null;
- }
- }
-
- private void setInstallableIntentsInternal(IntentId intentId,
- List<Intent> installableIntents,
- Timestamp timestamp) {
- synchronized (installables) {
- Timestamped<List<Intent>> existing = installables.get(intentId);
- if (existing == null || !existing.isNewer(timestamp)) {
- installables.put(intentId,
- new Timestamped<>(installableIntents, timestamp));
- }
- }
+ return intentStates.get(intentId);
}
@Override
public List<Intent> getInstallableIntents(IntentId intentId) {
- Timestamped<List<Intent>> tInstallables = installables.get(intentId);
- if (tInstallables != null) {
- return tInstallables.value();
- }
- return null;
+ return installables.get(intentId);
}
@Override
public List<BatchWrite.Operation> batchWrite(BatchWrite batch) {
- List<IntentEvent> events = Lists.newArrayList();
List<BatchWrite.Operation> failed = new ArrayList<>();
- Timestamp timestamp = null;
-
for (BatchWrite.Operation op : batch.operations()) {
switch (op.type()) {
case CREATE_INTENT:
@@ -258,19 +148,18 @@
"CREATE_INTENT takes 1 argument. %s", op);
Intent intent = op.arg(0);
- timestamp = intentClockService.getTimestamp(intent.id());
- if (createIntentInternal(intent)) {
- events.add(setStateInternal(intent.id(), INSTALL_REQ, timestamp));
- notifyPeers(new InternalIntentEvent(intent.id(), intent,
- INSTALL_REQ, timestamp));
- }
+ intents.put(intent.id(), intent);
+ intentStates.put(intent.id(), INSTALL_REQ);
break;
case REMOVE_INTENT:
checkArgument(op.args().size() == 1,
"REMOVE_INTENT takes 1 argument. %s", op);
- IntentId intentId = (IntentId) op.arg(0);
- // TODO implement
+ IntentId intentId = op.arg(0);
+
+ intents.remove(intentId);
+ intentStates.remove(intentId);
+ installables.remove(intentId);
break;
case SET_STATE:
@@ -279,10 +168,7 @@
intent = op.arg(0);
IntentState newState = op.arg(1);
- timestamp = intentClockService.getTimestamp(intent.id());
- IntentEvent externalEvent = setStateInternal(intent.id(), newState, timestamp);
- events.add(externalEvent);
- notifyPeers(new InternalIntentEvent(intent.id(), null, newState, timestamp));
+ intentStates.put(intent.id(), newState);
break;
case SET_INSTALLABLE:
@@ -291,18 +177,14 @@
intentId = op.arg(0);
List<Intent> installableIntents = op.arg(1);
- timestamp = intentClockService.getTimestamp(intentId);
- setInstallableIntentsInternal(
- intentId, installableIntents, timestamp);
-
- notifyPeers(new InternalSetInstallablesEvent(intentId, installableIntents, timestamp));
+ installables.put(intentId, installableIntents);
break;
case REMOVE_INSTALLED:
checkArgument(op.args().size() == 1,
"REMOVE_INSTALLED takes 1 argument. %s", op);
intentId = op.arg(0);
- // TODO implement
+ installables.remove(intentId);
break;
default:
log.warn("Unknown Operation encountered: {}", op);
@@ -311,121 +193,34 @@
}
}
- notifyDelegate(events);
return failed;
}
- private boolean createIntentInternal(Intent intent) {
- Intent oldValue = intents.putIfAbsent(intent.id(), intent);
- if (oldValue == null) {
- return true;
- }
-
- log.warn("Intent ID {} already in store, throwing new update away",
- intent.id());
- return false;
- }
-
- private void notifyPeers(InternalIntentEvent event) {
- broadcastMessage(INTENT_UPDATED_MSG, event);
- }
-
- private void notifyPeers(InternalSetInstallablesEvent event) {
- broadcastMessage(INTENT_SET_INSTALLABLES_MSG, event);
- }
-
- private void broadcastMessage(MessageSubject subject, Object event) {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- SERIALIZER.encode(event));
- clusterCommunicator.broadcast(message);
- }
-
- private void unicastMessage(NodeId peer,
- MessageSubject subject,
- Object event) throws IOException {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- SERIALIZER.encode(event));
- clusterCommunicator.unicast(message, peer);
- }
-
private void notifyDelegateIfNotNull(IntentEvent event) {
if (event != null) {
notifyDelegate(event);
}
}
- private final class InternalIntentCreateOrUpdateEventListener
- implements ClusterMessageHandler {
+ private final class InternalIntentStatesListener implements
+ EventuallyConsistentMapListener<IntentId, IntentState> {
@Override
- public void handle(ClusterMessage message) {
+ public void event(
+ EventuallyConsistentMapEvent<IntentId, IntentState> event) {
+ if (event.type() == EventuallyConsistentMapEvent.Type.PUT) {
+ IntentEvent externalEvent;
+ Intent intent = intents.get(event.key()); // TODO OK if this is null?
- log.debug("Received intent update event from peer: {}", message.sender());
- InternalIntentEvent event = SERIALIZER.decode(message.payload());
-
- IntentId intentId = event.intentId();
- Intent intent = event.intent();
- IntentState state = event.state();
- Timestamp timestamp = event.timestamp();
-
- executor.submit(() -> {
try {
- switch (state) {
- case INSTALL_REQ:
- createIntentInternal(intent);
- // Fallthrough to setStateInternal for INSTALL_REQ
- default:
- notifyDelegateIfNotNull(setStateInternal(intentId, state, timestamp));
- break;
- }
- } catch (Exception e) {
- log.warn("Exception thrown handling intent create or update", e);
+ externalEvent = IntentEvent.getEvent(event.value(), intent);
+ } catch (IllegalArgumentException e) {
+ externalEvent = null;
}
- });
+
+ notifyDelegateIfNotNull(externalEvent);
+ }
}
}
- private final class InternalIntentSetInstallablesListener
- implements ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
- log.debug("Received intent set installables event from peer: {}", message.sender());
- InternalSetInstallablesEvent event = SERIALIZER.decode(message.payload());
-
- IntentId intentId = event.intentId();
- List<Intent> installables = event.installables();
- Timestamp timestamp = event.timestamp();
-
- executor.submit(() -> {
- try {
- setInstallableIntentsInternal(intentId, installables, timestamp);
- } catch (Exception e) {
- log.warn("Exception thrown handling intent set installables", e);
- }
- });
- }
- }
-
- private final class InternalIntentAntiEntropyAdvertisementListener
- implements ClusterMessageHandler {
-
- @Override
- public void handle(ClusterMessage message) {
- log.trace("Received intent Anti-Entropy advertisement from peer: {}", message.sender());
- // TODO implement
- //IntentAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
- backgroundExecutor.submit(() -> {
- try {
- log.debug("something");
- //handleAntiEntropyAdvertisement(advertisement);
- } catch (Exception e) {
- log.warn("Exception thrown handling intent advertisements", e);
- }
- });
- }
- }
}