allow flow to be marked for removal from flow store when device is disconnected
Change-Id: I0f60ff4f010d0d149be31272b9e592c5d812bef9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 3d66386..d6e5d40 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -22,7 +22,6 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ListenableFuture;
import com.hazelcast.core.IMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -242,6 +241,7 @@
clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
+ clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
replicaInfoManager.removeListener(replicaInfoEventListener);
log.info("Stopped");
}
@@ -346,20 +346,18 @@
return;
}
- DeviceId deviceId = operation.getOperations().get(0).target().deviceId();
+ DeviceId deviceId = operation.deviceId();
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
if (!replicaInfo.master().isPresent()) {
- log.warn("Failed to storeBatch: No master for {}", deviceId);
+ log.warn("No master for {} : flows will be marked for removal", deviceId);
- Set<FlowRule> allFailures = operation.getOperations().stream()
- .map(op -> op.getTarget())
- .collect(Collectors.toSet());
+ updateStoreInternal(operation);
notifyDelegate(FlowRuleBatchEvent.completed(
new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
- new CompletedBatchOperation(false, allFailures, operation.deviceId())));
+ new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
return;
}
@@ -377,19 +375,12 @@
APPLY_BATCH_FLOWS,
SERIALIZER.encode(operation));
- //CompletedBatchOperation response;
+
try {
- ListenableFuture<byte[]> responseFuture =
- clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- /*response =
- Futures.transform(responseFuture,
- new DecodeTo<CompletedBatchOperation>(SERIALIZER))
- .get(500 * operation.size(), TimeUnit.MILLISECONDS);
- notifyDelegate(FlowRuleBatchEvent.completed(
- new FlowRuleBatchRequest(operation.id(), Collections.emptySet()), response));*/
+ clusterCommunicator.unicast(message, replicaInfo.master().get());
- } catch (IOException /*| InterruptedException | ExecutionException | TimeoutException*/ e) {
+ } catch (IOException e) {
log.warn("Failed to storeBatch: {}", e.getMessage());
Set<FlowRule> allFailures = operation.getOperations().stream()
@@ -408,10 +399,23 @@
final DeviceId did = operation.deviceId();
//final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
- Set<FlowRuleBatchEntry> currentOps;
+ Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
+ if (currentOps.isEmpty()) {
+ batchOperationComplete(FlowRuleBatchEvent.completed(
+ new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+ new CompletedBatchOperation(true, Collections.emptySet(), did)));
+ return;
+ }
+ updateBackup(did, currentOps);
+ notifyDelegate(FlowRuleBatchEvent.requested(new
+ FlowRuleBatchRequest(operation.id(),
+ currentOps), operation.deviceId()));
- currentOps = operation.getOperations().stream().map(
+ }
+
+ private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
+ return operation.getOperations().stream().map(
op -> {
StoredFlowEntry entry;
switch (op.getOperator()) {
@@ -439,19 +443,6 @@
return null;
}
).filter(op -> op != null).collect(Collectors.toSet());
- if (currentOps.isEmpty()) {
- batchOperationComplete(FlowRuleBatchEvent.completed(
- new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
- new CompletedBatchOperation(true, Collections.emptySet(), did)));
- return;
- }
- updateBackup(did, currentOps);
-
-
- notifyDelegate(FlowRuleBatchEvent.requested(new
- FlowRuleBatchRequest(operation.id(), currentOps), operation.deviceId()));
-
-
}
private void updateBackup(DeviceId deviceId, final Set<FlowRuleBatchEntry> entries) {
@@ -581,7 +572,7 @@
clusterService.getLocalNode().id(),
REMOTE_APPLY_COMPLETED,
SERIALIZER.encode(event));
- clusterCommunicator.sendAndReceive(message, nodeId);
+ clusterCommunicator.unicast(message, nodeId);
} catch (IOException e) {
log.warn("Failed to respond to peer for batch operation result");
}