Implementation of new Flow Subsystem:
The subsystem no longer returns futures for tracking completion of work.
Notifications are explicitly returned via a callback mechanism. Also, the
subsystem is now asynchronous.
Change-Id: I1a4cef931c24820f9ae9ed9a5398f163f05dfbc9
more flowservice improvements
Change-Id: I5c9c1b6be4b2ebfa523b64f6f52e7634b7d3e05f
more flowservice impl
Change-Id: I05f6774460effb53ced8c36844bcda2f8f6c096f
Manager to store functional (at least I believe it)
Change-Id: I09b04989bd1004c98fe0bafed4c76714b9155d53
flow subsystem functional: need to fix unit tests
Change-Id: I1667f25b91320f625a03e5e1d5e92823184d9de0
flow subsystem functional
Change-Id: I429b3335c16d4fc16f5d55f233dd37c4d1d6111d
finished refactor of flow subsystem
Change-Id: I1899abc6ff6a974a2018d936cc555049c70a6804
fix for null flow provider to use new api
Change-Id: If2fd9bd5baf74d9c61c5c8085cef8bc2d204cbdc
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 09aa401..3d66386 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -15,33 +15,15 @@
*/
package org.onosproject.store.flow.impl;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onlab.util.Tools.namedThreads;
-import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
-import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.APPLY_BATCH_FLOWS;
-import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES;
-import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.GET_FLOW_ENTRY;
-import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.REMOVE_FLOW_ENTRY;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.hazelcast.core.IMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -49,8 +31,11 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
+import org.onlab.util.NewConcurrentHashMap;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
+import org.onosproject.core.CoreService;
+import org.onosproject.core.IdGenerator;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceService;
@@ -67,6 +52,7 @@
import org.onosproject.net.flow.FlowRuleBatchRequest;
import org.onosproject.net.flow.FlowRuleEvent;
import org.onosproject.net.flow.FlowRuleEvent.Type;
+import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.FlowRuleStore;
import org.onosproject.net.flow.FlowRuleStoreDelegate;
import org.onosproject.net.flow.StoredFlowEntry;
@@ -79,27 +65,37 @@
import org.onosproject.store.flow.ReplicaInfoService;
import org.onosproject.store.hz.AbstractHazelcastStore;
import org.onosproject.store.hz.SMap;
-import org.onosproject.store.serializers.DecodeTo;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
import org.slf4j.Logger;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import com.hazelcast.core.IMap;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
+import static org.onlab.util.Tools.namedThreads;
+import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
+import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
+import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages inventory of flow rules using a distributed state management protocol.
@@ -112,12 +108,10 @@
private final Logger log = getLogger(getClass());
- // primary data:
- // read/write needs to be locked
- private final ReentrantReadWriteLock flowEntriesLock = new ReentrantReadWriteLock();
- // store entries as a pile of rules, no info about device tables
- private final Multimap<DeviceId, StoredFlowEntry> flowEntries
- = ArrayListMultimap.<DeviceId, StoredFlowEntry>create();
+ private InternalFlowTable flowTable = new InternalFlowTable();
+
+ /*private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
+ flowEntries = new ConcurrentHashMap<>();*/
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ReplicaInfoService replicaInfoManager;
@@ -131,23 +125,15 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
- private final AtomicInteger localBatchIdGen = new AtomicInteger();
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoreService coreService;
- private int pendingFutureTimeoutMinutes = 5;
-
- private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
- CacheBuilder.newBuilder()
- .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
- .removalListener(new TimeoutFuture())
- .build();
+ private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
// Cache of SMaps used for backup data. each SMap contain device flow table
private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
- private final ExecutorService futureListeners =
- Executors.newCachedThreadPool(namedThreads("onos-flowstore-peer-responders"));
-
private final ExecutorService backupExecutors =
Executors.newSingleThreadExecutor(namedThreads("onos-async-backups"));
@@ -169,6 +155,8 @@
private ReplicaInfoEventListener replicaInfoEventListener;
+ private IdGenerator idGenerator;
+
@Override
@Activate
public void activate() {
@@ -176,22 +164,33 @@
super.serializer = SERIALIZER;
super.theInstance = storeService.getHazelcastInstance();
+ idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
+
// Cache to create SMap on demand
smaps = CacheBuilder.newBuilder()
- .softValues()
- .build(new SMapLoader());
+ .softValues()
+ .build(new SMapLoader());
final NodeId local = clusterService.getLocalNode().id();
clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local));
+ clusterCommunicator.addSubscriber(REMOTE_APPLY_COMPLETED, new ClusterMessageHandler() {
+ @Override
+ public void handle(ClusterMessage message) {
+ FlowRuleBatchEvent event = SERIALIZER.decode(message.payload());
+ log.trace("received completed notification for {}", event);
+ notifyDelegate(event);
+ }
+ });
+
clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
@Override
public void handle(ClusterMessage message) {
FlowRule rule = SERIALIZER.decode(message.payload());
log.trace("received get flow entry request for {}", rule);
- FlowEntry flowEntry = getFlowEntryInternal(rule);
+ FlowEntry flowEntry = flowTable.getFlowEntry(rule); //getFlowEntryInternal(rule);
try {
message.respond(SERIALIZER.encode(flowEntry));
} catch (IOException e) {
@@ -206,7 +205,7 @@
public void handle(ClusterMessage message) {
DeviceId deviceId = SERIALIZER.decode(message.payload());
log.trace("Received get flow entries request for {} from {}", deviceId, message.sender());
- Set<FlowEntry> flowEntries = getFlowEntriesInternal(deviceId);
+ Set<FlowEntry> flowEntries = flowTable.getFlowEntries(deviceId);
try {
message.respond(SERIALIZER.encode(flowEntries));
} catch (IOException e) {
@@ -272,11 +271,11 @@
}
if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
- return getFlowEntryInternal(rule);
+ return flowTable.getFlowEntry(rule);
}
log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
- replicaInfo.master().orNull(), rule.deviceId());
+ replicaInfo.master().orNull(), rule.deviceId());
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
@@ -292,19 +291,7 @@
return null;
}
- private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
- flowEntriesLock.readLock().lock();
- try {
- for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) {
- if (f.equals(rule)) {
- return f;
- }
- }
- } finally {
- flowEntriesLock.readLock().unlock();
- }
- return null;
- }
+
@Override
public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
@@ -317,11 +304,11 @@
}
if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
- return getFlowEntriesInternal(deviceId);
+ return flowTable.getFlowEntries(deviceId);
}
log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
- replicaInfo.master().orNull(), deviceId);
+ replicaInfo.master().orNull(), deviceId);
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
@@ -337,30 +324,26 @@
return Collections.emptyList();
}
- private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
- flowEntriesLock.readLock().lock();
- try {
- Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
- if (rules == null) {
- return Collections.emptySet();
- }
- return ImmutableSet.copyOf(rules);
- } finally {
- flowEntriesLock.readLock().unlock();
- }
- }
+
@Override
public void storeFlowRule(FlowRule rule) {
- storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
+ storeBatch(new FlowRuleBatchOperation(
+ Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
+ rule.deviceId(), idGenerator.getNewId()));
}
@Override
- public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
+ public void storeBatch(FlowRuleBatchOperation operation) {
+
if (operation.getOperations().isEmpty()) {
- return Futures.immediateFuture(new CompletedBatchOperation(true,
- Collections.<FlowRule>emptySet()));
+
+ notifyDelegate(FlowRuleBatchEvent.completed(
+ new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+ new CompletedBatchOperation(true, Collections.emptySet(),
+ operation.deviceId())));
+ return;
}
DeviceId deviceId = operation.getOperations().get(0).target().deviceId();
@@ -369,110 +352,129 @@
if (!replicaInfo.master().isPresent()) {
log.warn("Failed to storeBatch: No master for {}", deviceId);
- // TODO: revisit if this should be "success" from Future point of view
- // with every FlowEntry failed
- return Futures.immediateFailedFuture(new IOException("Failed to storeBatch: No master for " + deviceId));
+
+ Set<FlowRule> allFailures = operation.getOperations().stream()
+ .map(op -> op.getTarget())
+ .collect(Collectors.toSet());
+
+ notifyDelegate(FlowRuleBatchEvent.completed(
+ new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+ new CompletedBatchOperation(false, allFailures, operation.deviceId())));
+ return;
}
final NodeId local = clusterService.getLocalNode().id();
if (replicaInfo.master().get().equals(local)) {
- return storeBatchInternal(operation);
+ storeBatchInternal(operation);
+ return;
}
log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
- replicaInfo.master().orNull(), deviceId);
+ replicaInfo.master().orNull(), deviceId);
ClusterMessage message = new ClusterMessage(
local,
APPLY_BATCH_FLOWS,
SERIALIZER.encode(operation));
+ //CompletedBatchOperation response;
try {
ListenableFuture<byte[]> responseFuture =
clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- return Futures.transform(responseFuture, new DecodeTo<CompletedBatchOperation>(SERIALIZER));
- } catch (IOException e) {
- return Futures.immediateFailedFuture(e);
+ /*response =
+ Futures.transform(responseFuture,
+ new DecodeTo<CompletedBatchOperation>(SERIALIZER))
+ .get(500 * operation.size(), TimeUnit.MILLISECONDS);
+
+ notifyDelegate(FlowRuleBatchEvent.completed(
+ new FlowRuleBatchRequest(operation.id(), Collections.emptySet()), response));*/
+
+ } catch (IOException /*| InterruptedException | ExecutionException | TimeoutException*/ e) {
+ log.warn("Failed to storeBatch: {}", e.getMessage());
+
+ Set<FlowRule> allFailures = operation.getOperations().stream()
+ .map(op -> op.getTarget())
+ .collect(Collectors.toSet());
+
+ notifyDelegate(FlowRuleBatchEvent.completed(
+ new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+ new CompletedBatchOperation(false, allFailures, deviceId)));
+ return;
}
+
}
- private ListenableFuture<CompletedBatchOperation>
- storeBatchInternal(FlowRuleBatchOperation operation) {
+ private void storeBatchInternal(FlowRuleBatchOperation operation) {
- final List<FlowRuleBatchEntry> toRemove = new ArrayList<>();
- final List<FlowRuleBatchEntry> toAdd = new ArrayList<>();
- DeviceId did = null;
+ final DeviceId did = operation.deviceId();
+ //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
+ Set<FlowRuleBatchEntry> currentOps;
- flowEntriesLock.writeLock().lock();
- try {
- for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
- FlowRule flowRule = batchEntry.target();
- FlowRuleOperation op = batchEntry.operator();
- if (did == null) {
- did = flowRule.deviceId();
- }
- if (op.equals(FlowRuleOperation.REMOVE)) {
- StoredFlowEntry entry = getFlowEntryInternal(flowRule);
- if (entry != null) {
- entry.setState(FlowEntryState.PENDING_REMOVE);
- toRemove.add(batchEntry);
+ currentOps = operation.getOperations().stream().map(
+ op -> {
+ StoredFlowEntry entry;
+ switch (op.getOperator()) {
+ case ADD:
+ entry = new DefaultFlowEntry(op.getTarget());
+ // always add requested FlowRule

+ // Note: 2 equal FlowEntry may have different treatment
+ flowTable.remove(entry.deviceId(), entry);
+ flowTable.add(entry);
+
+ return op;
+ case REMOVE:
+ entry = flowTable.getFlowEntry(op.target());
+ if (entry != null) {
+ entry.setState(FlowEntryState.PENDING_REMOVE);
+ return op;
+ }
+ break;
+ case MODIFY:
+ //TODO: figure this out at some point
+ break;
+ default:
+ log.warn("Unknown flow operation operator: {}", op.getOperator());
}
- } else if (op.equals(FlowRuleOperation.ADD)) {
- StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
- DeviceId deviceId = flowRule.deviceId();
- Collection<StoredFlowEntry> ft = flowEntries.get(deviceId);
-
- // always add requested FlowRule
- // Note: 2 equal FlowEntry may have different treatment
- ft.remove(flowEntry);
- ft.add(flowEntry);
- toAdd.add(batchEntry);
+ return null;
}
- }
- if (toAdd.isEmpty() && toRemove.isEmpty()) {
- return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
- }
-
- // create remote backup copies
- updateBackup(did, toAdd, toRemove);
- } finally {
- flowEntriesLock.writeLock().unlock();
+ ).filter(op -> op != null).collect(Collectors.toSet());
+ if (currentOps.isEmpty()) {
+ batchOperationComplete(FlowRuleBatchEvent.completed(
+ new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+ new CompletedBatchOperation(true, Collections.emptySet(), did)));
+ return;
}
+ updateBackup(did, currentOps);
- SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
- final int batchId = localBatchIdGen.incrementAndGet();
- pendingFutures.put(batchId, r);
- notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
+ notifyDelegate(FlowRuleBatchEvent.requested(new
+ FlowRuleBatchRequest(operation.id(), currentOps), operation.deviceId()));
- return r;
+
}
- private void updateBackup(final DeviceId deviceId,
- final List<FlowRuleBatchEntry> toAdd,
- final List<FlowRuleBatchEntry> list) {
-
- Future<?> submit = backupExecutors.submit(new UpdateBackup(deviceId, toAdd, list));
+ private void updateBackup(DeviceId deviceId, final Set<FlowRuleBatchEntry> entries) {
+ Future<?> backup = backupExecutors.submit(new UpdateBackup(deviceId, entries));
if (syncBackup) {
// wait for backup to complete
try {
- submit.get();
+ backup.get();
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to create backups", e);
}
}
}
- private void updateBackup(DeviceId deviceId, List<FlowRuleBatchEntry> toAdd) {
-
- updateBackup(deviceId, toAdd, Collections.<FlowRuleBatchEntry>emptyList());
- }
-
@Override
public void deleteFlowRule(FlowRule rule) {
- storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule))));
+ storeBatch(
+ new FlowRuleBatchOperation(
+ Arrays.asList(
+ new FlowRuleBatchEntry(
+ FlowRuleOperation.REMOVE,
+ rule)), rule.deviceId(), idGenerator.getNewId()));
}
@Override
@@ -484,37 +486,35 @@
}
log.warn("Tried to update FlowRule {} state,"
- + " while the Node was not the master.", rule);
+ + " while the Node was not the master.", rule);
return null;
}
private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
final DeviceId did = rule.deviceId();
- flowEntriesLock.writeLock().lock();
- try {
- // check if this new rule is an update to an existing entry
- StoredFlowEntry stored = getFlowEntryInternal(rule);
- if (stored != null) {
- stored.setBytes(rule.bytes());
- stored.setLife(rule.life());
- stored.setPackets(rule.packets());
- if (stored.state() == FlowEntryState.PENDING_ADD) {
- stored.setState(FlowEntryState.ADDED);
- FlowRuleBatchEntry entry =
- new FlowRuleBatchEntry(FlowRuleOperation.ADD, stored);
- updateBackup(did, Arrays.asList(entry));
- return new FlowRuleEvent(Type.RULE_ADDED, rule);
- }
- return new FlowRuleEvent(Type.RULE_UPDATED, rule);
- }
- // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
- // TODO: also update backup if the behavior is correct.
- flowEntries.put(did, new DefaultFlowEntry(rule));
- } finally {
- flowEntriesLock.writeLock().unlock();
+ // check if this new rule is an update to an existing entry
+ StoredFlowEntry stored = flowTable.getFlowEntry(rule);
+ if (stored != null) {
+ stored.setBytes(rule.bytes());
+ stored.setLife(rule.life());
+ stored.setPackets(rule.packets());
+ if (stored.state() == FlowEntryState.PENDING_ADD) {
+ stored.setState(FlowEntryState.ADDED);
+ FlowRuleBatchEntry entry =
+ new FlowRuleBatchEntry(FlowRuleOperation.ADD, stored);
+ updateBackup(did, Sets.newHashSet(entry));
+ return new FlowRuleEvent(Type.RULE_ADDED, rule);
+ }
+ return new FlowRuleEvent(Type.RULE_UPDATED, rule);
}
+
+ // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
+ // TODO: also update backup if the behavior is correct.
+ flowTable.add(rule);
+
+
return null;
}
@@ -540,9 +540,9 @@
replicaInfo.master().orNull(), deviceId);
ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- REMOVE_FLOW_ENTRY,
- SERIALIZER.encode(rule));
+ clusterService.getLocalNode().id(),
+ REMOVE_FLOW_ENTRY,
+ SERIALIZER.encode(rule));
try {
Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
@@ -555,38 +555,42 @@
private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
final DeviceId deviceId = rule.deviceId();
- flowEntriesLock.writeLock().lock();
- try {
- // This is where one could mark a rule as removed and still keep it in the store.
- final boolean removed = flowEntries.remove(deviceId, rule);
- FlowRuleBatchEntry entry =
- new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule);
- updateBackup(deviceId, Collections.<FlowRuleBatchEntry>emptyList(), Arrays.asList(entry));
- if (removed) {
- return new FlowRuleEvent(RULE_REMOVED, rule);
- } else {
- return null;
- }
- } finally {
- flowEntriesLock.writeLock().unlock();
+ // This is where one could mark a rule as removed and still keep it in the store.
+ final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
+ FlowRuleBatchEntry entry =
+ new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule);
+ updateBackup(deviceId, Sets.newHashSet(entry));
+ if (removed) {
+ return new FlowRuleEvent(RULE_REMOVED, rule);
+ } else {
+ return null;
}
+
}
@Override
public void batchOperationComplete(FlowRuleBatchEvent event) {
- final Integer batchId = event.subject().batchId();
- SettableFuture<CompletedBatchOperation> future
- = pendingFutures.getIfPresent(batchId);
- if (future != null) {
- future.set(event.result());
- pendingFutures.invalidate(batchId);
+ //FIXME: need a per device pending response
+
+ NodeId nodeId = pendingResponses.remove(event.subject().batchId());
+ if (nodeId == null) {
+ notifyDelegate(event);
+ } else {
+ try {
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ REMOTE_APPLY_COMPLETED,
+ SERIALIZER.encode(event));
+ clusterCommunicator.sendAndReceive(message, nodeId);
+ } catch (IOException e) {
+ log.warn("Failed to respond to peer for batch operation result");
+ }
}
- notifyDelegate(event);
}
private void loadFromBackup(final DeviceId did) {
- flowEntriesLock.writeLock().lock();
+
try {
log.debug("Loading FlowRules for {} from backups", did);
SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(did);
@@ -595,38 +599,21 @@
log.trace("loading {}", e.getValue());
for (StoredFlowEntry entry : e.getValue()) {
- flowEntries.remove(did, entry);
- flowEntries.put(did, entry);
+ flowTable.getFlowEntriesById(entry).remove(entry);
+ flowTable.getFlowEntriesById(entry).add(entry);
+
+
}
}
} catch (ExecutionException e) {
log.error("Failed to load backup flowtable for {}", did, e);
- } finally {
- flowEntriesLock.writeLock().unlock();
}
}
private void removeFromPrimary(final DeviceId did) {
- Collection<StoredFlowEntry> removed = null;
- flowEntriesLock.writeLock().lock();
- try {
- removed = flowEntries.removeAll(did);
- } finally {
- flowEntriesLock.writeLock().unlock();
- }
- log.trace("removedFromPrimary {}", removed);
+ flowTable.clearDevice(did);
}
- private static final class TimeoutFuture
- implements RemovalListener<Integer, SettableFuture<CompletedBatchOperation>> {
- @Override
- public void onRemoval(RemovalNotification<Integer, SettableFuture<CompletedBatchOperation>> notification) {
- // wrapping in ExecutionException to support Future.get
- notification.getValue()
- .setException(new ExecutionException("Timed out",
- new TimeoutException()));
- }
- }
private final class OnStoreBatch implements ClusterMessageHandler {
private final NodeId local;
@@ -640,7 +627,7 @@
FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
log.debug("received batch request {}", operation);
- final DeviceId deviceId = operation.getOperations().get(0).target().deviceId();
+ final DeviceId deviceId = operation.deviceId();
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
if (!local.equals(replicaInfo.master().orNull())) {
@@ -648,7 +635,7 @@
for (FlowRuleBatchEntry op : operation.getOperations()) {
failures.add(op.target());
}
- CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures);
+ CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
// This node is no longer the master, respond as all failed.
// TODO: we might want to wrap response in envelope
// to distinguish sw programming failure and hand over
@@ -661,36 +648,15 @@
return;
}
- final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
- f.addListener(new Runnable() {
+ pendingResponses.put(operation.id(), message.sender());
+ storeBatchInternal(operation);
- @Override
- public void run() {
- CompletedBatchOperation result;
- try {
- result = f.get();
- } catch (InterruptedException | ExecutionException e) {
- log.error("Batch operation failed", e);
- // create everything failed response
- Set<FlowRule> failures = new HashSet<>(operation.size());
- for (FlowRuleBatchEntry op : operation.getOperations()) {
- failures.add(op.target());
- }
- result = new CompletedBatchOperation(false, failures);
- }
- try {
- message.respond(SERIALIZER.encode(result));
- } catch (IOException e) {
- log.error("Failed to respond back", e);
- }
- }
- }, futureListeners);
}
}
private final class SMapLoader
- extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
+ extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
@Override
public SMap<FlowId, ImmutableList<StoredFlowEntry>> load(DeviceId id)
@@ -701,7 +667,7 @@
}
private final class InternalReplicaInfoEventListener
- implements ReplicaInfoEventListener {
+ implements ReplicaInfoEventListener {
@Override
public void event(ReplicaInfoEvent event) {
@@ -710,98 +676,166 @@
final ReplicaInfo rInfo = event.replicaInfo();
switch (event.type()) {
- case MASTER_CHANGED:
- if (local.equals(rInfo.master().orNull())) {
- // This node is the new master, populate local structure
- // from backup
- loadFromBackup(did);
- } else {
- // This node is no longer the master holder,
- // clean local structure
- removeFromPrimary(did);
- // TODO: probably should stop pending backup activities in
- // executors to avoid overwriting with old value
- }
- break;
- default:
- break;
+ case MASTER_CHANGED:
+ if (local.equals(rInfo.master().orNull())) {
+ // This node is the new master, populate local structure
+ // from backup
+ loadFromBackup(did);
+ } else {
+ // This node is no longer the master holder,
+ // clean local structure
+ removeFromPrimary(did);
+ // TODO: probably should stop pending backup activities in
+ // executors to avoid overwriting with old value
+ }
+ break;
+ default:
+ break;
}
}
}
// Task to update FlowEntries in backup HZ store
- // TODO: Should be refactored to contain only one list and not
- // toAdd and toRemove
private final class UpdateBackup implements Runnable {
private final DeviceId deviceId;
- private final List<FlowRuleBatchEntry> toAdd;
- private final List<FlowRuleBatchEntry> toRemove;
+ private final Set<FlowRuleBatchEntry> ops;
+
public UpdateBackup(DeviceId deviceId,
- List<FlowRuleBatchEntry> toAdd,
- List<FlowRuleBatchEntry> list) {
+ Set<FlowRuleBatchEntry> ops) {
this.deviceId = checkNotNull(deviceId);
- this.toAdd = checkNotNull(toAdd);
- this.toRemove = checkNotNull(list);
+ this.ops = checkNotNull(ops);
+
}
@Override
public void run() {
try {
- log.trace("update backup {} +{} -{}", deviceId, toAdd, toRemove);
+ log.trace("update backup {} {}", deviceId, ops
+ );
final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
- // Following should be rewritten using async APIs
- for (FlowRuleBatchEntry bEntry : toAdd) {
- final FlowRule entry = bEntry.target();
- final FlowId id = entry.id();
- ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
- List<StoredFlowEntry> list = new ArrayList<>();
- if (original != null) {
- list.addAll(original);
- }
- list.remove(bEntry.target());
- list.add((StoredFlowEntry) entry);
- ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
- boolean success;
- if (original == null) {
- success = (backupFlowTable.putIfAbsent(id, newValue) == null);
- } else {
- success = backupFlowTable.replace(id, original, newValue);
- }
- if (!success) {
- log.error("Updating backup failed.");
- }
- }
- for (FlowRuleBatchEntry bEntry : toRemove) {
- final FlowRule entry = bEntry.target();
- final FlowId id = entry.id();
- ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
- List<StoredFlowEntry> list = new ArrayList<>();
- if (original != null) {
- list.addAll(original);
- }
+ ops.stream().forEach(
+ op -> {
+ final FlowRule entry = op.getTarget();
+ final FlowId id = entry.id();
+ ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
+ List<StoredFlowEntry> list = new ArrayList<>();
+ if (original != null) {
+ list.addAll(original);
+ }
+ list.remove(op.getTarget());
+ if (op.getOperator() == FlowRuleOperation.ADD) {
+ list.add((StoredFlowEntry) entry);
+ }
- list.remove(bEntry.target());
+ ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
+ boolean success;
+ if (original == null) {
+ success = (backupFlowTable.putIfAbsent(id, newValue) == null);
+ } else {
+ success = backupFlowTable.replace(id, original, newValue);
+ }
+ if (!success) {
+ log.error("Updating backup failed.");
+ }
- ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
- boolean success;
- if (original == null) {
- success = (backupFlowTable.putIfAbsent(id, newValue) == null);
- } else {
- success = backupFlowTable.replace(id, original, newValue);
- }
- if (!success) {
- log.error("Updating backup failed.");
- }
- }
+ }
+ );
} catch (ExecutionException e) {
log.error("Failed to write to backups", e);
}
}
}
+
+ private class InternalFlowTable {
+
+ /*
+ TODO: This needs to be cleaned up. Perhaps using the eventually consistent
+ map when it supports distributed to a sequence of instances.
+ */
+
+
+ private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
+ flowEntries = new ConcurrentHashMap<>();
+
+
+ private NewConcurrentHashMap<FlowId, Set<StoredFlowEntry>> lazyEmptyFlowTable() {
+ return NewConcurrentHashMap.<FlowId, Set<StoredFlowEntry>>ifNeeded();
+ }
+
+ /**
+ * Returns the flow table for specified device.
+ *
+ * @param deviceId identifier of the device
+ * @return Map representing Flow Table of given device.
+ */
+ private ConcurrentMap<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
+ return createIfAbsentUnchecked(flowEntries,
+ deviceId, lazyEmptyFlowTable());
+ }
+
+ private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
+ final ConcurrentMap<FlowId, Set<StoredFlowEntry>> flowTable = getFlowTable(deviceId);
+ Set<StoredFlowEntry> r = flowTable.get(flowId);
+ if (r == null) {
+ final Set<StoredFlowEntry> concurrentlyAdded;
+ r = new CopyOnWriteArraySet<>();
+ concurrentlyAdded = flowTable.putIfAbsent(flowId, r);
+ if (concurrentlyAdded != null) {
+ return concurrentlyAdded;
+ }
+ }
+ return r;
+ }
+
+ private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
+ for (StoredFlowEntry f : getFlowEntriesInternal(rule.deviceId(), rule.id())) {
+ if (f.equals(rule)) {
+ return f;
+ }
+ }
+ return null;
+ }
+
+ private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
+ return getFlowTable(deviceId).values().stream()
+ .flatMap((list -> list.stream())).collect(Collectors.toSet());
+
+ }
+
+
+ public StoredFlowEntry getFlowEntry(FlowRule rule) {
+ return getFlowEntryInternal(rule);
+ }
+
+ public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
+ return getFlowEntriesInternal(deviceId);
+ }
+
+ public Set<StoredFlowEntry> getFlowEntriesById(FlowEntry entry) {
+ return getFlowEntriesInternal(entry.deviceId(), entry.id());
+ }
+
+ public void add(FlowEntry rule) {
+ ((CopyOnWriteArraySet)
+ getFlowEntriesInternal(rule.deviceId(), rule.id())).add(rule);
+ }
+
+ public boolean remove(DeviceId deviceId, FlowEntry rule) {
+ return ((CopyOnWriteArraySet)
+ getFlowEntriesInternal(deviceId, rule.id())).remove(rule);
+ //return flowEntries.remove(deviceId, rule);
+ }
+
+ public void clearDevice(DeviceId did) {
+ flowEntries.remove(did);
+ }
+ }
+
+
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowStoreMessageSubjects.java
index a21d73a..79df272 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowStoreMessageSubjects.java
@@ -34,4 +34,7 @@
public static final MessageSubject REMOVE_FLOW_ENTRY
= new MessageSubject("peer-forward-remove-flow-entry");
+
+ public static final MessageSubject REMOTE_APPLY_COMPLETED
+ = new MessageSubject("peer-apply-completed");
}