blob: bcb34eafb71d210f8a5770829d2da509afa82846 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.flow.impl;
alshabib339a3d92014-09-26 17:54:32 -070017
Brian O'Connor72cb19a2015-01-16 16:14:41 -080018import com.google.common.collect.ImmutableList;
19import com.google.common.collect.Iterables;
20import com.google.common.collect.Maps;
21import com.google.common.collect.Sets;
Brian O'Connorb9a91c12015-03-10 11:19:50 -070022import org.apache.commons.lang.math.RandomUtils;
alshabib339a3d92014-09-26 17:54:32 -070023import org.apache.felix.scr.annotations.Activate;
24import org.apache.felix.scr.annotations.Component;
25import org.apache.felix.scr.annotations.Deactivate;
Brian O'Connorc4f351d2015-03-10 20:53:11 -070026import org.apache.felix.scr.annotations.Modified;
27import org.apache.felix.scr.annotations.Property;
Madan Jampani38b250d2014-10-17 11:02:38 -070028import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib339a3d92014-09-26 17:54:32 -070030import org.apache.felix.scr.annotations.Service;
Jonathan Hart4fb5cde2014-12-22 12:09:07 -080031import org.onlab.util.KryoNamespace;
Brian O'Connorc4f351d2015-03-10 20:53:11 -070032import org.onosproject.cfg.ComponentConfigService;
Brian O'Connorabafb502014-12-02 22:26:20 -080033import org.onosproject.cluster.ClusterService;
34import org.onosproject.cluster.NodeId;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080035import org.onosproject.core.CoreService;
36import org.onosproject.core.IdGenerator;
Brian O'Connorabafb502014-12-02 22:26:20 -080037import org.onosproject.net.Device;
38import org.onosproject.net.DeviceId;
Madan Jampani54d34992015-03-06 17:27:52 -080039import org.onosproject.net.device.DeviceClockService;
Brian O'Connorabafb502014-12-02 22:26:20 -080040import org.onosproject.net.device.DeviceService;
41import org.onosproject.net.flow.CompletedBatchOperation;
42import org.onosproject.net.flow.DefaultFlowEntry;
43import org.onosproject.net.flow.FlowEntry;
44import org.onosproject.net.flow.FlowEntry.FlowEntryState;
45import org.onosproject.net.flow.FlowId;
46import org.onosproject.net.flow.FlowRule;
47import org.onosproject.net.flow.FlowRuleBatchEntry;
Jonathan Hart4fb5cde2014-12-22 12:09:07 -080048import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
Brian O'Connorabafb502014-12-02 22:26:20 -080049import org.onosproject.net.flow.FlowRuleBatchEvent;
50import org.onosproject.net.flow.FlowRuleBatchOperation;
51import org.onosproject.net.flow.FlowRuleBatchRequest;
52import org.onosproject.net.flow.FlowRuleEvent;
Brian O'Connorabafb502014-12-02 22:26:20 -080053import org.onosproject.net.flow.FlowRuleEvent.Type;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080054import org.onosproject.net.flow.FlowRuleService;
Brian O'Connorabafb502014-12-02 22:26:20 -080055import org.onosproject.net.flow.FlowRuleStore;
56import org.onosproject.net.flow.FlowRuleStoreDelegate;
57import org.onosproject.net.flow.StoredFlowEntry;
Madan Jampani54d34992015-03-06 17:27:52 -080058import org.onosproject.store.AbstractStore;
Brian O'Connorabafb502014-12-02 22:26:20 -080059import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
60import org.onosproject.store.cluster.messaging.ClusterMessage;
61import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani54d34992015-03-06 17:27:52 -080062import org.onosproject.store.ecmap.EventuallyConsistentMap;
63import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
Brian O'Connorabafb502014-12-02 22:26:20 -080064import org.onosproject.store.flow.ReplicaInfo;
65import org.onosproject.store.flow.ReplicaInfoEvent;
66import org.onosproject.store.flow.ReplicaInfoEventListener;
67import org.onosproject.store.flow.ReplicaInfoService;
Madan Jampani54d34992015-03-06 17:27:52 -080068import org.onosproject.store.impl.ClockService;
69import org.onosproject.store.impl.MastershipBasedTimestamp;
70import org.onosproject.store.serializers.KryoNamespaces;
Brian O'Connorabafb502014-12-02 22:26:20 -080071import org.onosproject.store.serializers.KryoSerializer;
72import org.onosproject.store.serializers.StoreSerializer;
73import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
Brian O'Connorc4f351d2015-03-10 20:53:11 -070074import org.osgi.service.component.ComponentContext;
alshabib339a3d92014-09-26 17:54:32 -070075import org.slf4j.Logger;
76
Brian O'Connor72cb19a2015-01-16 16:14:41 -080077import java.io.IOException;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080078import java.util.Arrays;
Madan Jampani54d34992015-03-06 17:27:52 -080079import java.util.Collection;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080080import java.util.Collections;
Brian O'Connorc4f351d2015-03-10 20:53:11 -070081import java.util.Dictionary;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080082import java.util.List;
83import java.util.Map;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080084import java.util.Set;
85import java.util.concurrent.ConcurrentHashMap;
86import java.util.concurrent.ConcurrentMap;
87import java.util.concurrent.CopyOnWriteArraySet;
88import java.util.concurrent.ExecutionException;
89import java.util.concurrent.ExecutorService;
90import java.util.concurrent.Executors;
91import java.util.concurrent.Future;
92import java.util.concurrent.TimeUnit;
93import java.util.concurrent.TimeoutException;
94import java.util.stream.Collectors;
95
Brian O'Connorc4f351d2015-03-10 20:53:11 -070096import static com.google.common.base.Strings.isNullOrEmpty;
97import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
Thomas Vachuska6519e6f2015-03-11 02:29:31 -070098import static org.onlab.util.Tools.get;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080099import static org.onlab.util.Tools.groupedThreads;
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800100import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
101import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
102import static org.slf4j.LoggerFactory.getLogger;
alshabib339a3d92014-09-26 17:54:32 -0700103
104/**
Madan Jampani38b250d2014-10-17 11:02:38 -0700105 * Manages inventory of flow rules using a distributed state management protocol.
alshabib339a3d92014-09-26 17:54:32 -0700106 */
alshabib339a3d92014-09-26 17:54:32 -0700107@Component(immediate = true)
108@Service
109public class DistributedFlowRuleStore
Madan Jampani54d34992015-03-06 17:27:52 -0800110 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
alshabib1c319ff2014-10-04 20:29:09 -0700111 implements FlowRuleStore {
alshabib339a3d92014-09-26 17:54:32 -0700112
113 private final Logger log = getLogger(getClass());
114
Madan Jampani2af244a2015-02-22 13:12:01 -0800115 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700116 private static final boolean DEFAULT_BACKUP_ENABLED = false;
117 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
118
119 @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
120 label = "Number of threads in the message handler pool")
121 private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
122
123 @Property(name = "backupEnabled", boolValue = DEFAULT_BACKUP_ENABLED,
124 label = "Indicates whether backups are enabled or not")
125 private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
Madan Jampani2af244a2015-02-22 13:12:01 -0800126
Madan Jampani54d34992015-03-06 17:27:52 -0800127 private InternalFlowTable flowTable;
alshabib339a3d92014-09-26 17:54:32 -0700128
Madan Jampani38b250d2014-10-17 11:02:38 -0700129 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700130 protected ReplicaInfoService replicaInfoManager;
Madan Jampani38b250d2014-10-17 11:02:38 -0700131
132 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700133 protected ClusterCommunicationService clusterCommunicator;
Madan Jampani38b250d2014-10-17 11:02:38 -0700134
135 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700136 protected ClusterService clusterService;
137
138 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
139 protected DeviceService deviceService;
140
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800141 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani54d34992015-03-06 17:27:52 -0800142 protected DeviceClockService deviceClockService;
143
144 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800145 protected CoreService coreService;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700146
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700147 @Reference(cardinality = MANDATORY_UNARY)
148 protected ComponentConfigService configService;
149
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800150 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700151
Madan Jampani2af244a2015-02-22 13:12:01 -0800152 private ExecutorService messageHandlingExecutor;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700153
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700154 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
Madan Jampani38b250d2014-10-17 11:02:38 -0700155 @Override
156 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700157 serializerPool = KryoNamespace.newBuilder()
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800158 .register(DistributedStoreSerializers.STORE_COMMON)
159 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
Yuta HIGUCHI2f158332014-11-25 13:32:06 -0800160 .register(FlowRuleEvent.class)
Jonathan Hart4fb5cde2014-12-22 12:09:07 -0800161 .register(FlowRuleEvent.Type.class)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800162 .build();
Madan Jampani38b250d2014-10-17 11:02:38 -0700163 }
164 };
165
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700166 private ReplicaInfoEventListener replicaInfoEventListener;
167
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800168 private IdGenerator idGenerator;
169
alshabib339a3d92014-09-26 17:54:32 -0700170 @Activate
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700171 public void activate(ComponentContext context) {
172 configService.registerProperties(getClass());
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700173
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700174 flowTable = new InternalFlowTable().withBackupsEnabled(backupEnabled);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700175
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800176 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
177
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700178 final NodeId local = clusterService.getLocalNode().id();
179
Madan Jampani2af244a2015-02-22 13:12:01 -0800180 messageHandlingExecutor = Executors.newFixedThreadPool(
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700181 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
Madan Jampani2af244a2015-02-22 13:12:01 -0800182
183 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local), messageHandlingExecutor);
Madan Jampani117aaae2014-10-23 10:04:05 -0700184
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800185 clusterCommunicator.addSubscriber(REMOTE_APPLY_COMPLETED, new ClusterMessageHandler() {
186 @Override
187 public void handle(ClusterMessage message) {
188 FlowRuleBatchEvent event = SERIALIZER.decode(message.payload());
189 log.trace("received completed notification for {}", event);
190 notifyDelegate(event);
191 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800192 }, messageHandlingExecutor);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800193
Madan Jampani117aaae2014-10-23 10:04:05 -0700194 clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
195
196 @Override
197 public void handle(ClusterMessage message) {
198 FlowRule rule = SERIALIZER.decode(message.payload());
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800199 log.trace("received get flow entry request for {}", rule);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800200 FlowEntry flowEntry = flowTable.getFlowEntry(rule); //getFlowEntryInternal(rule);
Madan Jampani117aaae2014-10-23 10:04:05 -0700201 try {
202 message.respond(SERIALIZER.encode(flowEntry));
203 } catch (IOException e) {
204 log.error("Failed to respond back", e);
205 }
206 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800207 }, messageHandlingExecutor);
Madan Jampani117aaae2014-10-23 10:04:05 -0700208
Madan Jampanif5fdef02014-10-23 21:58:10 -0700209 clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
210
211 @Override
212 public void handle(ClusterMessage message) {
213 DeviceId deviceId = SERIALIZER.decode(message.payload());
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800214 log.trace("Received get flow entries request for {} from {}", deviceId, message.sender());
Madan Jampani54d34992015-03-06 17:27:52 -0800215 Set<StoredFlowEntry> flowEntries = flowTable.getFlowEntries(deviceId);
Madan Jampanif5fdef02014-10-23 21:58:10 -0700216 try {
217 message.respond(SERIALIZER.encode(flowEntries));
218 } catch (IOException e) {
219 log.error("Failed to respond to peer's getFlowEntries request", e);
220 }
221 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800222 }, messageHandlingExecutor);
Madan Jampanif5fdef02014-10-23 21:58:10 -0700223
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800224 clusterCommunicator.addSubscriber(REMOVE_FLOW_ENTRY, new ClusterMessageHandler() {
225
226 @Override
227 public void handle(ClusterMessage message) {
228 FlowEntry rule = SERIALIZER.decode(message.payload());
229 log.trace("received get flow entry request for {}", rule);
230 FlowRuleEvent event = removeFlowRuleInternal(rule);
231 try {
232 message.respond(SERIALIZER.encode(event));
233 } catch (IOException e) {
234 log.error("Failed to respond back", e);
235 }
236 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800237 }, messageHandlingExecutor);
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800238
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700239 replicaInfoEventListener = new InternalReplicaInfoEventListener();
240
241 replicaInfoManager.addListener(replicaInfoEventListener);
242
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700243 logConfig("Started");
alshabib339a3d92014-09-26 17:54:32 -0700244 }
245
246 @Deactivate
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700247 public void deactivate(ComponentContext context) {
248 configService.unregisterProperties(getClass(), false);
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800249 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
250 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
251 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
252 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
alshabib371abe82015-02-13 10:44:17 -0800253 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
Madan Jampani2af244a2015-02-22 13:12:01 -0800254 messageHandlingExecutor.shutdown();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700255 replicaInfoManager.removeListener(replicaInfoEventListener);
alshabib339a3d92014-09-26 17:54:32 -0700256 log.info("Stopped");
257 }
258
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700259 @Modified
260 public void modified(ComponentContext context) {
261 if (context == null) {
262 backupEnabled = DEFAULT_BACKUP_ENABLED;
263 logConfig("Default config");
264 return;
265 }
266
267 Dictionary properties = context.getProperties();
268 int newPoolSize;
269 boolean newBackupEnabled;
270 try {
Thomas Vachuska6519e6f2015-03-11 02:29:31 -0700271 String s = get(properties, "msgHandlerPoolSize");
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700272 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
273
Thomas Vachuska6519e6f2015-03-11 02:29:31 -0700274 s = get(properties, "backupEnabled");
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700275 newBackupEnabled = isNullOrEmpty(s) ? backupEnabled : Boolean.parseBoolean(s.trim());
276
277 } catch (NumberFormatException | ClassCastException e) {
278 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
279 newBackupEnabled = DEFAULT_BACKUP_ENABLED;
280 }
281
282 if (newPoolSize != msgHandlerPoolSize || newBackupEnabled != backupEnabled) {
283 msgHandlerPoolSize = newPoolSize;
284 backupEnabled = newBackupEnabled;
285 // reconfigure the store
286 flowTable.withBackupsEnabled(backupEnabled);
287 ExecutorService oldMsgHandler = messageHandlingExecutor;
288 messageHandlingExecutor = Executors.newFixedThreadPool(
289 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
290 oldMsgHandler.shutdown();
291 logConfig("Reconfigured");
292 }
293 }
294
295 private void logConfig(String prefix) {
296 log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}",
297 prefix, msgHandlerPoolSize, backupEnabled);
298 }
alshabib339a3d92014-09-26 17:54:32 -0700299
Brian O'Connor44008532014-12-04 16:41:36 -0800300 // This is not a efficient operation on a distributed sharded
Madan Jampani117aaae2014-10-23 10:04:05 -0700301 // flow store. We need to revisit the need for this operation or at least
302 // make it device specific.
alshabib339a3d92014-09-26 17:54:32 -0700303 @Override
tom9b4030d2014-10-06 10:39:03 -0700304 public int getFlowRuleCount() {
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700305 // implementing in-efficient operation for debugging purpose.
306 int sum = 0;
307 for (Device device : deviceService.getDevices()) {
308 final DeviceId did = device.id();
309 sum += Iterables.size(getFlowEntries(did));
310 }
311 return sum;
tom9b4030d2014-10-06 10:39:03 -0700312 }
313
314 @Override
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800315 public FlowEntry getFlowEntry(FlowRule rule) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700316 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700317
318 if (!replicaInfo.master().isPresent()) {
Yuta HIGUCHI6b98ab62014-12-03 16:08:08 -0800319 log.warn("Failed to getFlowEntry: No master for {}", rule.deviceId());
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800320 return null;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700321 }
322
Madan Jampani117aaae2014-10-23 10:04:05 -0700323 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800324 return flowTable.getFlowEntry(rule);
Madan Jampani117aaae2014-10-23 10:04:05 -0700325 }
326
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800327 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800328 replicaInfo.master().orNull(), rule.deviceId());
Madan Jampani117aaae2014-10-23 10:04:05 -0700329
330 ClusterMessage message = new ClusterMessage(
331 clusterService.getLocalNode().id(),
332 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
333 SERIALIZER.encode(rule));
334
335 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700336 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
337 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
338 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Brian O'Connor44008532014-12-04 16:41:36 -0800339 log.warn("Unable to fetch flow store contents from {}", replicaInfo.master().get());
Madan Jampani117aaae2014-10-23 10:04:05 -0700340 }
Brian O'Connor44008532014-12-04 16:41:36 -0800341 return null;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700342 }
343
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800344
alshabib339a3d92014-09-26 17:54:32 -0700345
346 @Override
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800347 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700348
349 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700350
351 if (!replicaInfo.master().isPresent()) {
Yuta HIGUCHI6b98ab62014-12-03 16:08:08 -0800352 log.warn("Failed to getFlowEntries: No master for {}", deviceId);
Yuta HIGUCHI2c1d8472014-10-31 14:13:38 -0700353 return Collections.emptyList();
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700354 }
355
Madan Jampanif5fdef02014-10-23 21:58:10 -0700356 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Madan Jampani54d34992015-03-06 17:27:52 -0800357 return flowTable.getFlowEntries(deviceId).stream().collect(Collectors.toSet());
Madan Jampanif5fdef02014-10-23 21:58:10 -0700358 }
359
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800360 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800361 replicaInfo.master().orNull(), deviceId);
Madan Jampanif5fdef02014-10-23 21:58:10 -0700362
363 ClusterMessage message = new ClusterMessage(
364 clusterService.getLocalNode().id(),
365 GET_DEVICE_FLOW_ENTRIES,
366 SERIALIZER.encode(deviceId));
367
368 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700369 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
370 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
371 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Brian O'Connor44008532014-12-04 16:41:36 -0800372 log.warn("Unable to fetch flow store contents from {}", replicaInfo.master().get());
Madan Jampanif5fdef02014-10-23 21:58:10 -0700373 }
Yuta HIGUCHI24f79eb2014-12-12 15:46:43 -0800374 return Collections.emptyList();
Madan Jampanif5fdef02014-10-23 21:58:10 -0700375 }
376
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800377
alshabib339a3d92014-09-26 17:54:32 -0700378
379 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700380 public void storeFlowRule(FlowRule rule) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800381 storeBatch(new FlowRuleBatchOperation(
382 Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
383 rule.deviceId(), idGenerator.getNewId()));
Madan Jampani117aaae2014-10-23 10:04:05 -0700384 }
385
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700386 @Override
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800387 public void storeBatch(FlowRuleBatchOperation operation) {
388
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700389
Madan Jampani117aaae2014-10-23 10:04:05 -0700390 if (operation.getOperations().isEmpty()) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800391
392 notifyDelegate(FlowRuleBatchEvent.completed(
393 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
394 new CompletedBatchOperation(true, Collections.emptySet(),
395 operation.deviceId())));
396 return;
alshabib339a3d92014-09-26 17:54:32 -0700397 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700398
alshabib371abe82015-02-13 10:44:17 -0800399 DeviceId deviceId = operation.deviceId();
Madan Jampani117aaae2014-10-23 10:04:05 -0700400
401 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
402
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700403 if (!replicaInfo.master().isPresent()) {
alshabib371abe82015-02-13 10:44:17 -0800404 log.warn("No master for {} : flows will be marked for removal", deviceId);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800405
alshabib371abe82015-02-13 10:44:17 -0800406 updateStoreInternal(operation);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800407
408 notifyDelegate(FlowRuleBatchEvent.completed(
409 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
alshabib371abe82015-02-13 10:44:17 -0800410 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800411 return;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700412 }
413
414 final NodeId local = clusterService.getLocalNode().id();
415 if (replicaInfo.master().get().equals(local)) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800416 storeBatchInternal(operation);
417 return;
Madan Jampani117aaae2014-10-23 10:04:05 -0700418 }
419
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800420 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800421 replicaInfo.master().orNull(), deviceId);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700422
Madan Jampani38b250d2014-10-17 11:02:38 -0700423 ClusterMessage message = new ClusterMessage(
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700424 local,
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700425 APPLY_BATCH_FLOWS,
Madan Jampani117aaae2014-10-23 10:04:05 -0700426 SERIALIZER.encode(operation));
Madan Jampani38b250d2014-10-17 11:02:38 -0700427
alshabib371abe82015-02-13 10:44:17 -0800428
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800429 if (!clusterCommunicator.unicast(message, replicaInfo.master().get())) {
430 log.warn("Failed to storeBatch: {} to {}", message, replicaInfo.master());
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800431
432 Set<FlowRule> allFailures = operation.getOperations().stream()
Ray Milkeyf7329c72015-02-17 11:37:01 -0800433 .map(op -> op.target())
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800434 .collect(Collectors.toSet());
435
436 notifyDelegate(FlowRuleBatchEvent.completed(
437 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
438 new CompletedBatchOperation(false, allFailures, deviceId)));
439 return;
Madan Jampani38b250d2014-10-17 11:02:38 -0700440 }
441 }
442
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800443 private void storeBatchInternal(FlowRuleBatchOperation operation) {
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800444
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800445 final DeviceId did = operation.deviceId();
446 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
alshabib371abe82015-02-13 10:44:17 -0800447 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
448 if (currentOps.isEmpty()) {
449 batchOperationComplete(FlowRuleBatchEvent.completed(
450 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
451 new CompletedBatchOperation(true, Collections.emptySet(), did)));
452 return;
453 }
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700454
alshabib371abe82015-02-13 10:44:17 -0800455 notifyDelegate(FlowRuleBatchEvent.requested(new
456 FlowRuleBatchRequest(operation.id(),
457 currentOps), operation.deviceId()));
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700458
alshabib371abe82015-02-13 10:44:17 -0800459 }
460
461 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
462 return operation.getOperations().stream().map(
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800463 op -> {
464 StoredFlowEntry entry;
Ray Milkeyf7329c72015-02-17 11:37:01 -0800465 switch (op.operator()) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800466 case ADD:
Ray Milkeyf7329c72015-02-17 11:37:01 -0800467 entry = new DefaultFlowEntry(op.target());
Madan Jampani2af244a2015-02-22 13:12:01 -0800468 // always add requested FlowRule
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800469 // Note: 2 equal FlowEntry may have different treatment
470 flowTable.remove(entry.deviceId(), entry);
471 flowTable.add(entry);
472
473 return op;
474 case REMOVE:
475 entry = flowTable.getFlowEntry(op.target());
476 if (entry != null) {
477 entry.setState(FlowEntryState.PENDING_REMOVE);
478 return op;
479 }
480 break;
481 case MODIFY:
482 //TODO: figure this out at some point
483 break;
484 default:
Ray Milkeyf7329c72015-02-17 11:37:01 -0800485 log.warn("Unknown flow operation operator: {}", op.operator());
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800486 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800487 return null;
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800488 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800489 ).filter(op -> op != null).collect(Collectors.toSet());
alshabib339a3d92014-09-26 17:54:32 -0700490 }
491
492 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700493 public void deleteFlowRule(FlowRule rule) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800494 storeBatch(
495 new FlowRuleBatchOperation(
496 Arrays.asList(
497 new FlowRuleBatchEntry(
498 FlowRuleOperation.REMOVE,
499 rule)), rule.deviceId(), idGenerator.getNewId()));
alshabib339a3d92014-09-26 17:54:32 -0700500 }
501
502 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700503 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
504 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700505 final NodeId localId = clusterService.getLocalNode().id();
506 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampani54d34992015-03-06 17:27:52 -0800507 return addOrUpdateFlowRuleInternal((StoredFlowEntry) rule);
Madan Jampani38b250d2014-10-17 11:02:38 -0700508 }
509
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800510 log.warn("Tried to update FlowRule {} state,"
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800511 + " while the Node was not the master.", rule);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700512 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700513 }
514
Madan Jampani54d34992015-03-06 17:27:52 -0800515 private FlowRuleEvent addOrUpdateFlowRuleInternal(StoredFlowEntry rule) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800516 // check if this new rule is an update to an existing entry
517 StoredFlowEntry stored = flowTable.getFlowEntry(rule);
518 if (stored != null) {
519 stored.setBytes(rule.bytes());
520 stored.setLife(rule.life());
521 stored.setPackets(rule.packets());
522 if (stored.state() == FlowEntryState.PENDING_ADD) {
523 stored.setState(FlowEntryState.ADDED);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800524 return new FlowRuleEvent(Type.RULE_ADDED, rule);
525 }
526 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800527 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800528
529 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800530 flowTable.add(rule);
531
alshabib1c319ff2014-10-04 20:29:09 -0700532 return null;
alshabib339a3d92014-09-26 17:54:32 -0700533 }
534
535 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700536 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
Yuta HIGUCHIb6eb9142014-11-26 16:18:13 -0800537 final DeviceId deviceId = rule.deviceId();
538 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700539
540 final NodeId localId = clusterService.getLocalNode().id();
541 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700542 // bypass and handle it locally
543 return removeFlowRuleInternal(rule);
544 }
545
Yuta HIGUCHIb6eb9142014-11-26 16:18:13 -0800546 if (!replicaInfo.master().isPresent()) {
Yuta HIGUCHI6b98ab62014-12-03 16:08:08 -0800547 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
Yuta HIGUCHIb6eb9142014-11-26 16:18:13 -0800548 // TODO: revisit if this should be null (="no-op") or Exception
549 return null;
550 }
551
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800552 log.trace("Forwarding removeFlowRule to {}, which is the primary (master) for device {}",
Yuta HIGUCHIb6eb9142014-11-26 16:18:13 -0800553 replicaInfo.master().orNull(), deviceId);
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800554
555 ClusterMessage message = new ClusterMessage(
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800556 clusterService.getLocalNode().id(),
557 REMOVE_FLOW_ENTRY,
558 SERIALIZER.encode(rule));
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800559
560 try {
561 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
562 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
563 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Yuta HIGUCHI65934892014-12-04 17:47:44 -0800564 // TODO: Retry against latest master or throw a FlowStoreException
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800565 throw new RuntimeException(e);
566 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700567 }
568
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800569 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700570 final DeviceId deviceId = rule.deviceId();
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800571 // This is where one could mark a rule as removed and still keep it in the store.
572 final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800573 if (removed) {
574 return new FlowRuleEvent(RULE_REMOVED, rule);
575 } else {
576 return null;
alshabib339a3d92014-09-26 17:54:32 -0700577 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800578
alshabib339a3d92014-09-26 17:54:32 -0700579 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700580
581 @Override
582 public void batchOperationComplete(FlowRuleBatchEvent event) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800583 //FIXME: need a per device pending response
584
585 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
586 if (nodeId == null) {
587 notifyDelegate(event);
588 } else {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800589 ClusterMessage message = new ClusterMessage(
590 clusterService.getLocalNode().id(),
591 REMOTE_APPLY_COMPLETED,
592 SERIALIZER.encode(event));
593 // TODO check unicast return value
594 clusterCommunicator.unicast(message, nodeId);
595 //error log: log.warn("Failed to respond to peer for batch operation result");
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700596 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700597 }
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700598
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800599 private void removeFromPrimary(final DeviceId did) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800600 flowTable.clearDevice(did);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700601 }
602
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700603 private final class OnStoreBatch implements ClusterMessageHandler {
604 private final NodeId local;
605
606 private OnStoreBatch(NodeId local) {
607 this.local = local;
608 }
609
610 @Override
611 public void handle(final ClusterMessage message) {
612 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800613 log.debug("received batch request {}", operation);
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700614
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800615 final DeviceId deviceId = operation.deviceId();
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700616 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
617 if (!local.equals(replicaInfo.master().orNull())) {
618
Madan Jampani54d34992015-03-06 17:27:52 -0800619 Set<FlowRule> failures = operation.getOperations()
620 .stream()
621 .map(FlowRuleBatchEntry::target)
622 .collect(Collectors.toSet());
623
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800624 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700625 // This node is no longer the master, respond as all failed.
626 // TODO: we might want to wrap response in envelope
627 // to distinguish sw programming failure and hand over
628 // it make sense in the latter case to retry immediately.
629 try {
630 message.respond(SERIALIZER.encode(allFailed));
631 } catch (IOException e) {
632 log.error("Failed to respond back", e);
633 }
634 return;
635 }
636
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700637
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800638 pendingResponses.put(operation.id(), message.sender());
639 storeBatchInternal(operation);
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700640
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700641 }
642 }
643
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700644 private final class InternalReplicaInfoEventListener
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800645 implements ReplicaInfoEventListener {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700646
647 @Override
648 public void event(ReplicaInfoEvent event) {
649 final NodeId local = clusterService.getLocalNode().id();
650 final DeviceId did = event.subject();
651 final ReplicaInfo rInfo = event.replicaInfo();
652
653 switch (event.type()) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800654 case MASTER_CHANGED:
655 if (local.equals(rInfo.master().orNull())) {
Madan Jampani54d34992015-03-06 17:27:52 -0800656 log.info("{} is now the master for {}. Will load flow rules from backup", local, did);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800657 // This node is the new master, populate local structure
658 // from backup
Madan Jampani54d34992015-03-06 17:27:52 -0800659 flowTable.loadFromBackup(did);
alshabib93cb57f2015-02-12 17:43:26 -0800660 }
661 //else {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800662 // This node is no longer the master holder,
663 // clean local structure
alshabib93cb57f2015-02-12 17:43:26 -0800664 //removeFromPrimary(did);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800665 // TODO: probably should stop pending backup activities in
666 // executors to avoid overwriting with old value
alshabib93cb57f2015-02-12 17:43:26 -0800667 //}
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800668 break;
669 default:
670 break;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700671
672 }
673 }
674 }
675
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800676 private class InternalFlowTable {
677
Madan Jampanie1356282015-03-10 19:05:36 -0700678 private boolean backupsEnabled = true;
679
680 /**
681 * Turns backups on or off.
682 * @param backupsEnabled whether backups should be enabled or not
683 * @return this instance
684 */
685 public InternalFlowTable withBackupsEnabled(boolean backupsEnabled) {
686 this.backupsEnabled = backupsEnabled;
687 return this;
688 }
689
Madan Jampani54d34992015-03-06 17:27:52 -0800690 private final Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>>
691 flowEntries = Maps.newConcurrentMap();
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800692
Madan Jampani54d34992015-03-06 17:27:52 -0800693 private final KryoNamespace.Builder flowSerializer = KryoNamespace.newBuilder()
694 .register(KryoNamespaces.API)
695 .register(MastershipBasedTimestamp.class);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800696
Madan Jampani54d34992015-03-06 17:27:52 -0800697 private final ClockService<FlowId, StoredFlowEntry> clockService =
Brian O'Connorb9a91c12015-03-10 11:19:50 -0700698 (flowId, flowEntry) ->
699 (flowEntry == null) ? null : deviceClockService.getTimestamp(flowEntry.deviceId());
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800700
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700701 private final EventuallyConsistentMap<FlowId, StoredFlowEntry> backupMap =
Madan Jampani54d34992015-03-06 17:27:52 -0800702 new EventuallyConsistentMapImpl<>("flow-backup",
703 clusterService,
704 clusterCommunicator,
705 flowSerializer,
706 clockService,
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700707 (key, flowEntry) -> getPeerNodes()).withTombstonesDisabled(true);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800708
Madan Jampani54d34992015-03-06 17:27:52 -0800709 private Collection<NodeId> getPeerNodes() {
710 List<NodeId> nodes = clusterService.getNodes()
711 .stream()
712 .map(node -> node.id())
713 .filter(id -> !id.equals(clusterService.getLocalNode().id()))
714 .collect(Collectors.toList());
715
716 if (nodes.isEmpty()) {
717 return ImmutableList.of();
718 } else {
Brian O'Connorb9a91c12015-03-10 11:19:50 -0700719 // get a random peer
720 return ImmutableList.of(nodes.get(RandomUtils.nextInt(nodes.size())));
Madan Jampani54d34992015-03-06 17:27:52 -0800721 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800722 }
723
Madan Jampani54d34992015-03-06 17:27:52 -0800724 public void loadFromBackup(DeviceId deviceId) {
Madan Jampanie1356282015-03-10 19:05:36 -0700725 if (!backupsEnabled) {
726 return;
727 }
728
Madan Jampani54d34992015-03-06 17:27:52 -0800729 ConcurrentMap<FlowId, Set<StoredFlowEntry>> flowTable = new ConcurrentHashMap<>();
730
731 backupMap.values()
732 .stream()
733 .filter(entry -> entry.deviceId().equals(deviceId))
734 .forEach(entry -> flowTable.computeIfPresent(entry.id(), (k, v) -> {
735 if (v == null) {
736 return Sets.newHashSet(entry);
737 } else {
738 v.add(entry);
739 }
740 return v;
741 }));
742 flowEntries.putIfAbsent(deviceId, flowTable);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800743 }
744
745 private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
Madan Jampani54d34992015-03-06 17:27:52 -0800746 return flowEntries
747 .computeIfAbsent(deviceId, key -> Maps.newConcurrentMap())
748 .computeIfAbsent(flowId, k -> new CopyOnWriteArraySet<>());
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800749 }
750
751 private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
Madan Jampani54d34992015-03-06 17:27:52 -0800752 return getFlowEntriesInternal(rule.deviceId(), rule.id())
753 .stream()
754 .filter(element -> element.equals(rule))
755 .findFirst()
756 .orElse(null);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800757 }
758
Madan Jampani54d34992015-03-06 17:27:52 -0800759 private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
760 Set<StoredFlowEntry> entries = Sets.newHashSet();
761 flowEntries.computeIfAbsent(deviceId, key -> Maps.newConcurrentMap())
762 .values()
763 .forEach(entries::addAll);
764 return entries;
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800765 }
766
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800767 public StoredFlowEntry getFlowEntry(FlowRule rule) {
768 return getFlowEntryInternal(rule);
769 }
770
Madan Jampani54d34992015-03-06 17:27:52 -0800771 public Set<StoredFlowEntry> getFlowEntries(DeviceId deviceId) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800772 return getFlowEntriesInternal(deviceId);
773 }
774
Madan Jampani54d34992015-03-06 17:27:52 -0800775 public void add(StoredFlowEntry rule) {
776 getFlowEntriesInternal(rule.deviceId(), rule.id()).add(rule);
Madan Jampanie1356282015-03-10 19:05:36 -0700777 if (backupsEnabled) {
778 try {
779 backupMap.put(rule.id(), rule);
780 } catch (Exception e) {
781 log.warn("Failed to backup flow rule", e);
782 }
Madan Jampani54d34992015-03-06 17:27:52 -0800783 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800784 }
785
786 public boolean remove(DeviceId deviceId, FlowEntry rule) {
Madan Jampani54d34992015-03-06 17:27:52 -0800787 boolean status =
788 getFlowEntriesInternal(deviceId, rule.id()).remove(rule);
Madan Jampanie1356282015-03-10 19:05:36 -0700789 if (backupsEnabled && status) {
Madan Jampani54d34992015-03-06 17:27:52 -0800790 try {
791 backupMap.remove(rule.id(), (DefaultFlowEntry) rule);
792 } catch (Exception e) {
793 log.warn("Failed to remove backup of flow rule", e);
794 }
795 }
796 return status;
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800797 }
798
799 public void clearDevice(DeviceId did) {
800 flowEntries.remove(did);
Madan Jampani54d34992015-03-06 17:27:52 -0800801 // Flow entries should continue to remain in backup map.
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800802 }
803 }
Madan Jampanie1356282015-03-10 19:05:36 -0700804}