Fix ONOS-4976 - buffer underflow from flow backup serialization
Change-Id: I924e53cfd436c38a45d1c9a237553a56acf888ef
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 2d864f9..61764f5 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -15,91 +15,95 @@
*/
package org.onosproject.store.flow.impl;
- import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Futures;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Modified;
-import org.apache.felix.scr.annotations.Property;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.KryoNamespace;
-import org.onlab.util.Tools;
-import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.core.CoreService;
-import org.onosproject.core.IdGenerator;
-import org.onosproject.mastership.MastershipService;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.flow.CompletedBatchOperation;
-import org.onosproject.net.flow.DefaultFlowEntry;
-import org.onosproject.net.flow.FlowEntry;
-import org.onosproject.net.flow.FlowEntry.FlowEntryState;
-import org.onosproject.net.flow.FlowId;
-import org.onosproject.net.flow.FlowRule;
-import org.onosproject.net.flow.FlowRuleBatchEntry;
-import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
-import org.onosproject.net.flow.FlowRuleBatchEvent;
-import org.onosproject.net.flow.FlowRuleBatchOperation;
-import org.onosproject.net.flow.FlowRuleBatchRequest;
-import org.onosproject.net.flow.FlowRuleEvent;
-import org.onosproject.net.flow.FlowRuleEvent.Type;
-import org.onosproject.net.flow.FlowRuleService;
-import org.onosproject.net.flow.FlowRuleStore;
-import org.onosproject.net.flow.FlowRuleStoreDelegate;
-import org.onosproject.net.flow.StoredFlowEntry;
-import org.onosproject.net.flow.TableStatisticsEntry;
-import org.onosproject.persistence.PersistenceService;
-import org.onosproject.store.AbstractStore;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.flow.ReplicaInfoEvent;
-import org.onosproject.store.flow.ReplicaInfoEventListener;
-import org.onosproject.store.flow.ReplicaInfoService;
-import org.onosproject.store.impl.MastershipBasedTimestamp;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.EventuallyConsistentMap;
-import org.onosproject.store.service.EventuallyConsistentMapEvent;
-import org.onosproject.store.service.EventuallyConsistentMapListener;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageService;
-import org.onosproject.store.service.WallClockTimestamp;
-import org.osgi.service.component.ComponentContext;
-import org.slf4j.Logger;
-
import java.util.Collections;
-import java.util.Dictionary;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
+ import java.util.Dictionary;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Objects;
+ import java.util.Set;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.ScheduledExecutorService;
+ import java.util.concurrent.ScheduledFuture;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.stream.Collectors;
+
+ import org.apache.felix.scr.annotations.Activate;
+ import org.apache.felix.scr.annotations.Component;
+ import org.apache.felix.scr.annotations.Deactivate;
+ import org.apache.felix.scr.annotations.Modified;
+ import org.apache.felix.scr.annotations.Property;
+ import org.apache.felix.scr.annotations.Reference;
+ import org.apache.felix.scr.annotations.ReferenceCardinality;
+ import org.apache.felix.scr.annotations.Service;
+ import org.onlab.util.KryoNamespace;
+ import org.onlab.util.Tools;
+ import org.onosproject.cfg.ComponentConfigService;
+ import org.onosproject.cluster.ClusterService;
+ import org.onosproject.cluster.NodeId;
+ import org.onosproject.core.CoreService;
+ import org.onosproject.core.IdGenerator;
+ import org.onosproject.mastership.MastershipService;
+ import org.onosproject.net.DeviceId;
+ import org.onosproject.net.device.DeviceService;
+ import org.onosproject.net.flow.CompletedBatchOperation;
+ import org.onosproject.net.flow.DefaultFlowEntry;
+ import org.onosproject.net.flow.FlowEntry;
+ import org.onosproject.net.flow.FlowEntry.FlowEntryState;
+ import org.onosproject.net.flow.FlowId;
+ import org.onosproject.net.flow.FlowRule;
+ import org.onosproject.net.flow.FlowRuleBatchEntry;
+ import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
+ import org.onosproject.net.flow.FlowRuleBatchEvent;
+ import org.onosproject.net.flow.FlowRuleBatchOperation;
+ import org.onosproject.net.flow.FlowRuleBatchRequest;
+ import org.onosproject.net.flow.FlowRuleEvent;
+ import org.onosproject.net.flow.FlowRuleEvent.Type;
+ import org.onosproject.net.flow.FlowRuleService;
+ import org.onosproject.net.flow.FlowRuleStore;
+ import org.onosproject.net.flow.FlowRuleStoreDelegate;
+ import org.onosproject.net.flow.StoredFlowEntry;
+ import org.onosproject.net.flow.TableStatisticsEntry;
+ import org.onosproject.persistence.PersistenceService;
+ import org.onosproject.store.AbstractStore;
+ import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+ import org.onosproject.store.cluster.messaging.ClusterMessage;
+ import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+ import org.onosproject.store.flow.ReplicaInfoEvent;
+ import org.onosproject.store.flow.ReplicaInfoEventListener;
+ import org.onosproject.store.flow.ReplicaInfoService;
+ import org.onosproject.store.impl.MastershipBasedTimestamp;
+ import org.onosproject.store.serializers.KryoNamespaces;
+ import org.onosproject.store.service.EventuallyConsistentMap;
+ import org.onosproject.store.service.EventuallyConsistentMapEvent;
+ import org.onosproject.store.service.EventuallyConsistentMapListener;
+ import org.onosproject.store.service.Serializer;
+ import org.onosproject.store.service.StorageService;
+ import org.onosproject.store.service.WallClockTimestamp;
+ import org.osgi.service.component.ComponentContext;
+ import org.slf4j.Logger;
+
+ import com.google.common.collect.ImmutableList;
+ import com.google.common.collect.Iterables;
+ import com.google.common.collect.Maps;
+ import com.google.common.collect.Sets;
+ import com.google.common.util.concurrent.Futures;
import static com.google.common.base.Strings.isNullOrEmpty;
-import static org.onlab.util.Tools.get;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
-import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED;
-import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
-import static org.slf4j.LoggerFactory.getLogger;
+ import static org.onlab.util.Tools.get;
+ import static org.onlab.util.Tools.groupedThreads;
+ import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
+ import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED;
+ import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.APPLY_BATCH_FLOWS;
+ import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.FLOW_TABLE_BACKUP;
+ import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES;
+ import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.GET_FLOW_ENTRY;
+ import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
+ import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.REMOVE_FLOW_ENTRY;
+ import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages inventory of flow rules using a distributed state management protocol.
@@ -724,7 +728,7 @@
log.debug("Sending flowEntries for devices {} to {} for backup.", deviceIds, nodeId);
Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
deviceFlowEntries = Maps.newConcurrentMap();
- deviceIds.forEach(id -> deviceFlowEntries.put(id, ImmutableMap.copyOf(getFlowTable(id))));
+ deviceIds.forEach(id -> deviceFlowEntries.put(id, getFlowTableCopy(id)));
clusterCommunicator.<Map<DeviceId,
Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>,
Set<DeviceId>>
@@ -738,8 +742,9 @@
deviceFlowEntries.keySet() :
Sets.difference(deviceFlowEntries.keySet(), backedupDevices);
if (devicesNotBackedup.size() > 0) {
- log.warn("Failed to backup devices: {}. Reason: {}",
- devicesNotBackedup, error != null ? error.getMessage() : "none");
+ log.warn("Failed to backup devices: {}. Reason: {}, Node: {}",
+ devicesNotBackedup, error != null ? error.getMessage() : "none",
+ nodeId);
}
if (backedupDevices != null) {
backedupDevices.forEach(id -> {
@@ -777,6 +782,32 @@
}
}
+ private Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> getFlowTableCopy(DeviceId deviceId) {
+ Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> copy = Maps.newHashMap();
+ if (persistenceEnabled) {
+ return flowEntries.computeIfAbsent(deviceId, id -> persistenceService
+ .<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>persistentMapBuilder()
+ .withName("FlowTable:" + deviceId.toString())
+ .withSerializer(new Serializer() {
+ @Override
+ public <T> byte[] encode(T object) {
+ return serializer.encode(object);
+ }
+
+ @Override
+ public <T> T decode(byte[] bytes) {
+ return serializer.decode(bytes);
+ }
+ })
+ .build());
+ } else {
+ flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap()).forEach((k, v) -> {
+ copy.put(k, Maps.newHashMap(v));
+ });
+ return copy;
+ }
+ }
+
private Map<StoredFlowEntry, StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
return getFlowTable(deviceId).computeIfAbsent(flowId, id -> Maps.newConcurrentMap());
}