Continue updating GossipIntentStore to the new API.
Also includes changes to ECMap (EventuallyConsistentMap) to support generating timestamps based on values.
Change-Id: Ide55979aaa4f7757e67a6b3efed6e51d45ee318c
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
index 051a01c..2fc6fd8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
@@ -15,7 +15,6 @@
*/
package org.onosproject.store.intent.impl;
-import com.google.common.collect.ImmutableList;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -28,8 +27,6 @@
import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.IntentData;
import org.onosproject.net.intent.IntentEvent;
-import org.onosproject.net.intent.IntentId;
-import org.onosproject.net.intent.IntentOperation;
import org.onosproject.net.intent.IntentState;
import org.onosproject.net.intent.IntentStore;
import org.onosproject.net.intent.IntentStoreDelegate;
@@ -44,11 +41,9 @@
import org.onosproject.store.serializers.KryoNamespaces;
import org.slf4j.Logger;
-import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.onosproject.net.intent.IntentState.INSTALL_REQ;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -63,14 +58,17 @@
private final Logger log = getLogger(getClass());
- private EventuallyConsistentMap<IntentId, Intent> intents;
+ /*private EventuallyConsistentMap<IntentId, Intent> intents;
private EventuallyConsistentMap<IntentId, IntentState> intentStates;
- private EventuallyConsistentMap<IntentId, List<Intent>> installables;
+ private EventuallyConsistentMap<IntentId, List<Intent>> installables;*/
+
+ // Map of intent key => current intent state
+ private EventuallyConsistentMap<Key, IntentData> currentState;
// Map of intent key => pending intent operation
- private EventuallyConsistentMap<String, IntentOperation> pending;
+ private EventuallyConsistentMap<Key, IntentData> pending;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
@@ -85,7 +83,7 @@
public void activate() {
KryoNamespace.Builder intentSerializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API);
- intents = new EventuallyConsistentMapImpl<>("intents", clusterService,
+ /*intents = new EventuallyConsistentMapImpl<>("intents", clusterService,
clusterCommunicator,
intentSerializer,
new WallclockClockManager<>());
@@ -101,6 +99,13 @@
clusterCommunicator,
intentSerializer,
new WallclockClockManager<>());
+ */
+
+ currentState = new EventuallyConsistentMapImpl<>("intent-current",
+ clusterService,
+ clusterCommunicator,
+ intentSerializer,
+ new WallclockClockManager<>());
pending = new EventuallyConsistentMapImpl<>("intent-pending",
clusterService,
@@ -108,7 +113,7 @@
intentSerializer, // TODO
new WallclockClockManager<>());
- intentStates.addListener(new InternalIntentStatesListener());
+ currentState.addListener(new InternalIntentStatesListener());
pending.addListener(new InternalPendingListener());
log.info("Started");
@@ -117,9 +122,10 @@
@Deactivate
public void deactivate() {
- intents.destroy();
+ /*intents.destroy();
intentStates.destroy();
- installables.destroy();
+ installables.destroy();*/
+ currentState.destroy();
pending.destroy();
log.info("Stopped");
@@ -127,24 +133,15 @@
@Override
public long getIntentCount() {
- return intents.size();
+ //return intents.size();
+ return currentState.size();
}
@Override
public Iterable<Intent> getIntents() {
- // TODO don't actually need to copy intents, they are immutable
- return ImmutableList.copyOf(intents.values());
- }
-
- @Override
- public Intent getIntent(Key intentKey) {
- // TODO: Implement this
- return null;
- }
-
-
- public Intent getIntent(IntentId intentId) {
- return intents.get(intentId);
+ return currentState.values().stream()
+ .map(IntentData::intent)
+ .collect(Collectors.toList());
}
@Override
@@ -164,7 +161,7 @@
@Override
public List<BatchWrite.Operation> batchWrite(BatchWrite batch) {
-
+ /*
List<BatchWrite.Operation> failed = new ArrayList<>();
for (BatchWrite.Operation op : batch.operations()) {
@@ -223,29 +220,49 @@
}
return failed;
+ */
+ return null;
}
@Override
public void write(IntentData newData) {
- // TODO
+ // Only the master is modifying the current state. Therefore assume
+ // this always succeeds
+ currentState.put(newData.key(), newData);
+
+ // if current.put succeeded
+ //pending.remove(newData.key(), newData);
+
+ try {
+ notifyDelegate(IntentEvent.getEvent(newData));
+ } catch (IllegalArgumentException e) {
+ //no-op
+ log.trace("ignore this exception: {}", e);
+ }
}
@Override
public void batchWrite(Iterable<IntentData> updates) {
- // TODO
+ updates.forEach(this::write);
+ }
+
+ @Override
+ public Intent getIntent(Key key) {
+ IntentData data = currentState.get(key);
+ if (data != null) {
+ return data.intent();
+ }
+ return null;
}
@Override
public IntentData getIntentData(Key key) {
- return null; // TODO
+ return currentState.get(key);
}
@Override
public void addPending(IntentData data) {
- // TODO implement
-
- // Check the intent versions
- //pending.put(op.key(), op);
+ pending.put(data.key(), data);
}
@Override
@@ -262,37 +279,40 @@
}
private final class InternalIntentStatesListener implements
- EventuallyConsistentMapListener<IntentId, IntentState> {
+ EventuallyConsistentMapListener<Key, IntentData> {
@Override
public void event(
- EventuallyConsistentMapEvent<IntentId, IntentState> event) {
+ EventuallyConsistentMapEvent<Key, IntentData> event) {
if (event.type() == EventuallyConsistentMapEvent.Type.PUT) {
+ // TODO check event send logic
IntentEvent externalEvent;
- Intent intent = intents.get(event.key()); // TODO OK if this is null?
+ IntentData intentData = currentState.get(event.key()); // TODO OK if this is null?
+ /*
try {
externalEvent = IntentEvent.getEvent(event.value(), intent);
} catch (IllegalArgumentException e) {
externalEvent = null;
}
- notifyDelegateIfNotNull(externalEvent);
+ notifyDelegateIfNotNull(externalEvent);*/
}
}
}
private final class InternalPendingListener implements
- EventuallyConsistentMapListener<String, IntentOperation> {
+ EventuallyConsistentMapListener<Key, IntentData> {
@Override
public void event(
- EventuallyConsistentMapEvent<String, IntentOperation> event) {
+ EventuallyConsistentMapEvent<Key, IntentData> event) {
if (event.type() == EventuallyConsistentMapEvent.Type.PUT) {
// The pending intents map has been updated. If we are master for
- // this intent's partition, notify the Manager that they should do
+ // this intent's partition, notify the Manager that it should do
// some work.
if (isMaster(event.value().intent())) {
- // TODO delegate.process(event.value());
- log.debug("implement this");
+ if (delegate != null) {
+ delegate.process(event.value());
+ }
}
}
}