blob: 80f29142875c07f966ccae04da9ae92249c2fe6c [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.flow.impl;
alshabib339a3d92014-09-26 17:54:32 -070017
Brian O'Connor72cb19a2015-01-16 16:14:41 -080018import com.google.common.cache.CacheBuilder;
19import com.google.common.cache.CacheLoader;
20import com.google.common.cache.LoadingCache;
21import com.google.common.collect.ImmutableList;
22import com.google.common.collect.Iterables;
23import com.google.common.collect.Maps;
24import com.google.common.collect.Sets;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080025import com.hazelcast.core.IMap;
alshabib339a3d92014-09-26 17:54:32 -070026import org.apache.felix.scr.annotations.Activate;
27import org.apache.felix.scr.annotations.Component;
28import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani38b250d2014-10-17 11:02:38 -070029import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib339a3d92014-09-26 17:54:32 -070031import org.apache.felix.scr.annotations.Service;
Jonathan Hart4fb5cde2014-12-22 12:09:07 -080032import org.onlab.util.KryoNamespace;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080033import org.onlab.util.NewConcurrentHashMap;
Brian O'Connorabafb502014-12-02 22:26:20 -080034import org.onosproject.cluster.ClusterService;
35import org.onosproject.cluster.NodeId;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080036import org.onosproject.core.CoreService;
37import org.onosproject.core.IdGenerator;
Brian O'Connorabafb502014-12-02 22:26:20 -080038import org.onosproject.net.Device;
39import org.onosproject.net.DeviceId;
40import org.onosproject.net.device.DeviceService;
41import org.onosproject.net.flow.CompletedBatchOperation;
42import org.onosproject.net.flow.DefaultFlowEntry;
43import org.onosproject.net.flow.FlowEntry;
44import org.onosproject.net.flow.FlowEntry.FlowEntryState;
45import org.onosproject.net.flow.FlowId;
46import org.onosproject.net.flow.FlowRule;
47import org.onosproject.net.flow.FlowRuleBatchEntry;
Jonathan Hart4fb5cde2014-12-22 12:09:07 -080048import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
Brian O'Connorabafb502014-12-02 22:26:20 -080049import org.onosproject.net.flow.FlowRuleBatchEvent;
50import org.onosproject.net.flow.FlowRuleBatchOperation;
51import org.onosproject.net.flow.FlowRuleBatchRequest;
52import org.onosproject.net.flow.FlowRuleEvent;
Brian O'Connorabafb502014-12-02 22:26:20 -080053import org.onosproject.net.flow.FlowRuleEvent.Type;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080054import org.onosproject.net.flow.FlowRuleService;
Brian O'Connorabafb502014-12-02 22:26:20 -080055import org.onosproject.net.flow.FlowRuleStore;
56import org.onosproject.net.flow.FlowRuleStoreDelegate;
57import org.onosproject.net.flow.StoredFlowEntry;
58import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
59import org.onosproject.store.cluster.messaging.ClusterMessage;
60import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
61import org.onosproject.store.flow.ReplicaInfo;
62import org.onosproject.store.flow.ReplicaInfoEvent;
63import org.onosproject.store.flow.ReplicaInfoEventListener;
64import org.onosproject.store.flow.ReplicaInfoService;
65import org.onosproject.store.hz.AbstractHazelcastStore;
66import org.onosproject.store.hz.SMap;
Brian O'Connorabafb502014-12-02 22:26:20 -080067import org.onosproject.store.serializers.KryoSerializer;
68import org.onosproject.store.serializers.StoreSerializer;
69import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
alshabib339a3d92014-09-26 17:54:32 -070070import org.slf4j.Logger;
71
Brian O'Connor72cb19a2015-01-16 16:14:41 -080072import java.io.IOException;
73import java.util.ArrayList;
74import java.util.Arrays;
75import java.util.Collections;
76import java.util.HashSet;
77import java.util.List;
78import java.util.Map;
79import java.util.Map.Entry;
80import java.util.Set;
81import java.util.concurrent.ConcurrentHashMap;
82import java.util.concurrent.ConcurrentMap;
83import java.util.concurrent.CopyOnWriteArraySet;
84import java.util.concurrent.ExecutionException;
85import java.util.concurrent.ExecutorService;
86import java.util.concurrent.Executors;
87import java.util.concurrent.Future;
88import java.util.concurrent.TimeUnit;
89import java.util.concurrent.TimeoutException;
90import java.util.stream.Collectors;
91
92import static com.google.common.base.Preconditions.checkNotNull;
93import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080094import static org.onlab.util.Tools.groupedThreads;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080095import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
96import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
97import static org.slf4j.LoggerFactory.getLogger;
alshabib339a3d92014-09-26 17:54:32 -070098
99/**
Madan Jampani38b250d2014-10-17 11:02:38 -0700100 * Manages inventory of flow rules using a distributed state management protocol.
alshabib339a3d92014-09-26 17:54:32 -0700101 */
alshabib339a3d92014-09-26 17:54:32 -0700102@Component(immediate = true)
103@Service
104public class DistributedFlowRuleStore
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700105 extends AbstractHazelcastStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
alshabib1c319ff2014-10-04 20:29:09 -0700106 implements FlowRuleStore {
alshabib339a3d92014-09-26 17:54:32 -0700107
108 private final Logger log = getLogger(getClass());
109
Madan Jampani2af244a2015-02-22 13:12:01 -0800110 // TODO: Make configurable.
111 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
112
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800113 private InternalFlowTable flowTable = new InternalFlowTable();
114
115 /*private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
116 flowEntries = new ConcurrentHashMap<>();*/
alshabib339a3d92014-09-26 17:54:32 -0700117
Madan Jampani38b250d2014-10-17 11:02:38 -0700118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700119 protected ReplicaInfoService replicaInfoManager;
Madan Jampani38b250d2014-10-17 11:02:38 -0700120
121 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700122 protected ClusterCommunicationService clusterCommunicator;
Madan Jampani38b250d2014-10-17 11:02:38 -0700123
124 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700125 protected ClusterService clusterService;
126
127 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
128 protected DeviceService deviceService;
129
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800130 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
131 protected CoreService coreService;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700132
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800133 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700134
// Cache of SMaps used for backup data. Each SMap contains the flow table of one device.
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700136 private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
137
Madan Jampani2af244a2015-02-22 13:12:01 -0800138 private ExecutorService messageHandlingExecutor;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700139
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700140 private final ExecutorService backupExecutors =
Thomas Vachuska6f94ded2015-02-21 14:02:38 -0800141 Executors.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700142
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700143 private boolean syncBackup = false;
Madan Jampani38b250d2014-10-17 11:02:38 -0700144
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700145 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
Madan Jampani38b250d2014-10-17 11:02:38 -0700146 @Override
147 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700148 serializerPool = KryoNamespace.newBuilder()
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800149 .register(DistributedStoreSerializers.STORE_COMMON)
150 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
Yuta HIGUCHI2f158332014-11-25 13:32:06 -0800151 .register(FlowRuleEvent.class)
Jonathan Hart4fb5cde2014-12-22 12:09:07 -0800152 .register(FlowRuleEvent.Type.class)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800153 .build();
Madan Jampani38b250d2014-10-17 11:02:38 -0700154 }
155 };
156
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700157 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Madan Jampani38b250d2014-10-17 11:02:38 -0700158
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700159 private ReplicaInfoEventListener replicaInfoEventListener;
160
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800161 private IdGenerator idGenerator;
162
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700163 @Override
alshabib339a3d92014-09-26 17:54:32 -0700164 @Activate
165 public void activate() {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700166
167 super.serializer = SERIALIZER;
168 super.theInstance = storeService.getHazelcastInstance();
169
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800170 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
171
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700172 // Cache to create SMap on demand
173 smaps = CacheBuilder.newBuilder()
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800174 .softValues()
175 .build(new SMapLoader());
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700176
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700177 final NodeId local = clusterService.getLocalNode().id();
178
Madan Jampani2af244a2015-02-22 13:12:01 -0800179 messageHandlingExecutor = Executors.newFixedThreadPool(
180 MESSAGE_HANDLER_THREAD_POOL_SIZE,
Madan Jampani6b5b7172015-02-23 13:02:26 -0800181 groupedThreads("onos/store/flow", "message-handlers"));
Madan Jampani2af244a2015-02-22 13:12:01 -0800182
183 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local), messageHandlingExecutor);
Madan Jampani117aaae2014-10-23 10:04:05 -0700184
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800185 clusterCommunicator.addSubscriber(REMOTE_APPLY_COMPLETED, new ClusterMessageHandler() {
186 @Override
187 public void handle(ClusterMessage message) {
188 FlowRuleBatchEvent event = SERIALIZER.decode(message.payload());
189 log.trace("received completed notification for {}", event);
190 notifyDelegate(event);
191 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800192 }, messageHandlingExecutor);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800193
Madan Jampani117aaae2014-10-23 10:04:05 -0700194 clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
195
196 @Override
197 public void handle(ClusterMessage message) {
198 FlowRule rule = SERIALIZER.decode(message.payload());
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800199 log.trace("received get flow entry request for {}", rule);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800200 FlowEntry flowEntry = flowTable.getFlowEntry(rule); //getFlowEntryInternal(rule);
Madan Jampani117aaae2014-10-23 10:04:05 -0700201 try {
202 message.respond(SERIALIZER.encode(flowEntry));
203 } catch (IOException e) {
204 log.error("Failed to respond back", e);
205 }
206 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800207 }, messageHandlingExecutor);
Madan Jampani117aaae2014-10-23 10:04:05 -0700208
Madan Jampanif5fdef02014-10-23 21:58:10 -0700209 clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
210
211 @Override
212 public void handle(ClusterMessage message) {
213 DeviceId deviceId = SERIALIZER.decode(message.payload());
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800214 log.trace("Received get flow entries request for {} from {}", deviceId, message.sender());
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800215 Set<FlowEntry> flowEntries = flowTable.getFlowEntries(deviceId);
Madan Jampanif5fdef02014-10-23 21:58:10 -0700216 try {
217 message.respond(SERIALIZER.encode(flowEntries));
218 } catch (IOException e) {
219 log.error("Failed to respond to peer's getFlowEntries request", e);
220 }
221 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800222 }, messageHandlingExecutor);
Madan Jampanif5fdef02014-10-23 21:58:10 -0700223
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800224 clusterCommunicator.addSubscriber(REMOVE_FLOW_ENTRY, new ClusterMessageHandler() {
225
226 @Override
227 public void handle(ClusterMessage message) {
228 FlowEntry rule = SERIALIZER.decode(message.payload());
229 log.trace("received get flow entry request for {}", rule);
230 FlowRuleEvent event = removeFlowRuleInternal(rule);
231 try {
232 message.respond(SERIALIZER.encode(event));
233 } catch (IOException e) {
234 log.error("Failed to respond back", e);
235 }
236 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800237 }, messageHandlingExecutor);
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800238
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700239 replicaInfoEventListener = new InternalReplicaInfoEventListener();
240
241 replicaInfoManager.addListener(replicaInfoEventListener);
242
alshabib339a3d92014-09-26 17:54:32 -0700243 log.info("Started");
244 }
245
246 @Deactivate
247 public void deactivate() {
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800248 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
249 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
250 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
251 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
alshabib371abe82015-02-13 10:44:17 -0800252 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
Madan Jampani2af244a2015-02-22 13:12:01 -0800253 messageHandlingExecutor.shutdown();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700254 replicaInfoManager.removeListener(replicaInfoEventListener);
alshabib339a3d92014-09-26 17:54:32 -0700255 log.info("Stopped");
256 }
257
258
// This is not an efficient operation on a distributed, sharded
// flow store. We need to revisit the need for this operation or at least
// make it device-specific.
alshabib339a3d92014-09-26 17:54:32 -0700262 @Override
tom9b4030d2014-10-06 10:39:03 -0700263 public int getFlowRuleCount() {
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700264 // implementing in-efficient operation for debugging purpose.
265 int sum = 0;
266 for (Device device : deviceService.getDevices()) {
267 final DeviceId did = device.id();
268 sum += Iterables.size(getFlowEntries(did));
269 }
270 return sum;
tom9b4030d2014-10-06 10:39:03 -0700271 }
272
273 @Override
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800274 public FlowEntry getFlowEntry(FlowRule rule) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700275 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700276
277 if (!replicaInfo.master().isPresent()) {
Yuta HIGUCHI6b98ab62014-12-03 16:08:08 -0800278 log.warn("Failed to getFlowEntry: No master for {}", rule.deviceId());
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800279 return null;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700280 }
281
Madan Jampani117aaae2014-10-23 10:04:05 -0700282 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800283 return flowTable.getFlowEntry(rule);
Madan Jampani117aaae2014-10-23 10:04:05 -0700284 }
285
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800286 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800287 replicaInfo.master().orNull(), rule.deviceId());
Madan Jampani117aaae2014-10-23 10:04:05 -0700288
289 ClusterMessage message = new ClusterMessage(
290 clusterService.getLocalNode().id(),
291 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
292 SERIALIZER.encode(rule));
293
294 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700295 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
296 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
297 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Brian O'Connor44008532014-12-04 16:41:36 -0800298 log.warn("Unable to fetch flow store contents from {}", replicaInfo.master().get());
Madan Jampani117aaae2014-10-23 10:04:05 -0700299 }
Brian O'Connor44008532014-12-04 16:41:36 -0800300 return null;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700301 }
302
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800303
alshabib339a3d92014-09-26 17:54:32 -0700304
305 @Override
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800306 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700307
308 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700309
310 if (!replicaInfo.master().isPresent()) {
Yuta HIGUCHI6b98ab62014-12-03 16:08:08 -0800311 log.warn("Failed to getFlowEntries: No master for {}", deviceId);
Yuta HIGUCHI2c1d8472014-10-31 14:13:38 -0700312 return Collections.emptyList();
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700313 }
314
Madan Jampanif5fdef02014-10-23 21:58:10 -0700315 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800316 return flowTable.getFlowEntries(deviceId);
Madan Jampanif5fdef02014-10-23 21:58:10 -0700317 }
318
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800319 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800320 replicaInfo.master().orNull(), deviceId);
Madan Jampanif5fdef02014-10-23 21:58:10 -0700321
322 ClusterMessage message = new ClusterMessage(
323 clusterService.getLocalNode().id(),
324 GET_DEVICE_FLOW_ENTRIES,
325 SERIALIZER.encode(deviceId));
326
327 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700328 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
329 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
330 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Brian O'Connor44008532014-12-04 16:41:36 -0800331 log.warn("Unable to fetch flow store contents from {}", replicaInfo.master().get());
Madan Jampanif5fdef02014-10-23 21:58:10 -0700332 }
Yuta HIGUCHI24f79eb2014-12-12 15:46:43 -0800333 return Collections.emptyList();
Madan Jampanif5fdef02014-10-23 21:58:10 -0700334 }
335
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800336
alshabib339a3d92014-09-26 17:54:32 -0700337
338 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700339 public void storeFlowRule(FlowRule rule) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800340 storeBatch(new FlowRuleBatchOperation(
341 Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
342 rule.deviceId(), idGenerator.getNewId()));
Madan Jampani117aaae2014-10-23 10:04:05 -0700343 }
344
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700345 @Override
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800346 public void storeBatch(FlowRuleBatchOperation operation) {
347
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700348
Madan Jampani117aaae2014-10-23 10:04:05 -0700349 if (operation.getOperations().isEmpty()) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800350
351 notifyDelegate(FlowRuleBatchEvent.completed(
352 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
353 new CompletedBatchOperation(true, Collections.emptySet(),
354 operation.deviceId())));
355 return;
alshabib339a3d92014-09-26 17:54:32 -0700356 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700357
alshabib371abe82015-02-13 10:44:17 -0800358 DeviceId deviceId = operation.deviceId();
Madan Jampani117aaae2014-10-23 10:04:05 -0700359
360 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
361
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700362 if (!replicaInfo.master().isPresent()) {
alshabib371abe82015-02-13 10:44:17 -0800363 log.warn("No master for {} : flows will be marked for removal", deviceId);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800364
alshabib371abe82015-02-13 10:44:17 -0800365 updateStoreInternal(operation);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800366
367 notifyDelegate(FlowRuleBatchEvent.completed(
368 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
alshabib371abe82015-02-13 10:44:17 -0800369 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800370 return;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700371 }
372
373 final NodeId local = clusterService.getLocalNode().id();
374 if (replicaInfo.master().get().equals(local)) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800375 storeBatchInternal(operation);
376 return;
Madan Jampani117aaae2014-10-23 10:04:05 -0700377 }
378
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800379 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800380 replicaInfo.master().orNull(), deviceId);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700381
Madan Jampani38b250d2014-10-17 11:02:38 -0700382 ClusterMessage message = new ClusterMessage(
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700383 local,
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700384 APPLY_BATCH_FLOWS,
Madan Jampani117aaae2014-10-23 10:04:05 -0700385 SERIALIZER.encode(operation));
Madan Jampani38b250d2014-10-17 11:02:38 -0700386
alshabib371abe82015-02-13 10:44:17 -0800387
Madan Jampani38b250d2014-10-17 11:02:38 -0700388 try {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800389
alshabib371abe82015-02-13 10:44:17 -0800390 clusterCommunicator.unicast(message, replicaInfo.master().get());
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800391
alshabib371abe82015-02-13 10:44:17 -0800392 } catch (IOException e) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800393 log.warn("Failed to storeBatch: {}", e.getMessage());
394
395 Set<FlowRule> allFailures = operation.getOperations().stream()
Ray Milkeyf7329c72015-02-17 11:37:01 -0800396 .map(op -> op.target())
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800397 .collect(Collectors.toSet());
398
399 notifyDelegate(FlowRuleBatchEvent.completed(
400 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
401 new CompletedBatchOperation(false, allFailures, deviceId)));
402 return;
Madan Jampani38b250d2014-10-17 11:02:38 -0700403 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800404
Madan Jampani38b250d2014-10-17 11:02:38 -0700405 }
406
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800407 private void storeBatchInternal(FlowRuleBatchOperation operation) {
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800408
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800409 final DeviceId did = operation.deviceId();
410 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
alshabib371abe82015-02-13 10:44:17 -0800411 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
412 if (currentOps.isEmpty()) {
413 batchOperationComplete(FlowRuleBatchEvent.completed(
414 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
415 new CompletedBatchOperation(true, Collections.emptySet(), did)));
416 return;
417 }
418 updateBackup(did, currentOps);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700419
alshabib371abe82015-02-13 10:44:17 -0800420 notifyDelegate(FlowRuleBatchEvent.requested(new
421 FlowRuleBatchRequest(operation.id(),
422 currentOps), operation.deviceId()));
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700423
alshabib371abe82015-02-13 10:44:17 -0800424 }
425
426 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
427 return operation.getOperations().stream().map(
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800428 op -> {
429 StoredFlowEntry entry;
Ray Milkeyf7329c72015-02-17 11:37:01 -0800430 switch (op.operator()) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800431 case ADD:
Ray Milkeyf7329c72015-02-17 11:37:01 -0800432 entry = new DefaultFlowEntry(op.target());
Madan Jampani2af244a2015-02-22 13:12:01 -0800433 // always add requested FlowRule
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800434 // Note: 2 equal FlowEntry may have different treatment
435 flowTable.remove(entry.deviceId(), entry);
436 flowTable.add(entry);
437
438 return op;
439 case REMOVE:
440 entry = flowTable.getFlowEntry(op.target());
441 if (entry != null) {
442 entry.setState(FlowEntryState.PENDING_REMOVE);
443 return op;
444 }
445 break;
446 case MODIFY:
447 //TODO: figure this out at some point
448 break;
449 default:
Ray Milkeyf7329c72015-02-17 11:37:01 -0800450 log.warn("Unknown flow operation operator: {}", op.operator());
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800451 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800452 return null;
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800453 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800454 ).filter(op -> op != null).collect(Collectors.toSet());
alshabib339a3d92014-09-26 17:54:32 -0700455 }
456
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800457 private void updateBackup(DeviceId deviceId, final Set<FlowRuleBatchEntry> entries) {
458 Future<?> backup = backupExecutors.submit(new UpdateBackup(deviceId, entries));
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700459
460 if (syncBackup) {
461 // wait for backup to complete
462 try {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800463 backup.get();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700464 } catch (InterruptedException | ExecutionException e) {
465 log.error("Failed to create backups", e);
466 }
467 }
468 }
469
alshabib339a3d92014-09-26 17:54:32 -0700470 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700471 public void deleteFlowRule(FlowRule rule) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800472 storeBatch(
473 new FlowRuleBatchOperation(
474 Arrays.asList(
475 new FlowRuleBatchEntry(
476 FlowRuleOperation.REMOVE,
477 rule)), rule.deviceId(), idGenerator.getNewId()));
alshabib339a3d92014-09-26 17:54:32 -0700478 }
479
480 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700481 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
482 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700483 final NodeId localId = clusterService.getLocalNode().id();
484 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700485 return addOrUpdateFlowRuleInternal(rule);
486 }
487
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800488 log.warn("Tried to update FlowRule {} state,"
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800489 + " while the Node was not the master.", rule);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700490 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700491 }
492
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800493 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700494 final DeviceId did = rule.deviceId();
alshabib339a3d92014-09-26 17:54:32 -0700495
alshabib339a3d92014-09-26 17:54:32 -0700496
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800497 // check if this new rule is an update to an existing entry
498 StoredFlowEntry stored = flowTable.getFlowEntry(rule);
499 if (stored != null) {
500 stored.setBytes(rule.bytes());
501 stored.setLife(rule.life());
502 stored.setPackets(rule.packets());
503 if (stored.state() == FlowEntryState.PENDING_ADD) {
504 stored.setState(FlowEntryState.ADDED);
505 FlowRuleBatchEntry entry =
506 new FlowRuleBatchEntry(FlowRuleOperation.ADD, stored);
507 updateBackup(did, Sets.newHashSet(entry));
508 return new FlowRuleEvent(Type.RULE_ADDED, rule);
509 }
510 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800511 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800512
513 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
514 // TODO: also update backup if the behavior is correct.
515 flowTable.add(rule);
516
517
alshabib1c319ff2014-10-04 20:29:09 -0700518 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700519
alshabib339a3d92014-09-26 17:54:32 -0700520 }
521
522 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700523 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
Yuta HIGUCHIb6eb9142014-11-26 16:18:13 -0800524 final DeviceId deviceId = rule.deviceId();
525 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700526
527 final NodeId localId = clusterService.getLocalNode().id();
528 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700529 // bypass and handle it locally
530 return removeFlowRuleInternal(rule);
531 }
532
Yuta HIGUCHIb6eb9142014-11-26 16:18:13 -0800533 if (!replicaInfo.master().isPresent()) {
Yuta HIGUCHI6b98ab62014-12-03 16:08:08 -0800534 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
Yuta HIGUCHIb6eb9142014-11-26 16:18:13 -0800535 // TODO: revisit if this should be null (="no-op") or Exception
536 return null;
537 }
538
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800539 log.trace("Forwarding removeFlowRule to {}, which is the primary (master) for device {}",
Yuta HIGUCHIb6eb9142014-11-26 16:18:13 -0800540 replicaInfo.master().orNull(), deviceId);
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800541
542 ClusterMessage message = new ClusterMessage(
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800543 clusterService.getLocalNode().id(),
544 REMOVE_FLOW_ENTRY,
545 SERIALIZER.encode(rule));
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800546
547 try {
548 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
549 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
550 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Yuta HIGUCHI65934892014-12-04 17:47:44 -0800551 // TODO: Retry against latest master or throw a FlowStoreException
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800552 throw new RuntimeException(e);
553 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700554 }
555
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800556 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700557 final DeviceId deviceId = rule.deviceId();
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800558 // This is where one could mark a rule as removed and still keep it in the store.
559 final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
560 FlowRuleBatchEntry entry =
561 new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule);
562 updateBackup(deviceId, Sets.newHashSet(entry));
563 if (removed) {
564 return new FlowRuleEvent(RULE_REMOVED, rule);
565 } else {
566 return null;
alshabib339a3d92014-09-26 17:54:32 -0700567 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800568
alshabib339a3d92014-09-26 17:54:32 -0700569 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700570
571 @Override
572 public void batchOperationComplete(FlowRuleBatchEvent event) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800573 //FIXME: need a per device pending response
574
575 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
576 if (nodeId == null) {
577 notifyDelegate(event);
578 } else {
579 try {
580 ClusterMessage message = new ClusterMessage(
581 clusterService.getLocalNode().id(),
582 REMOTE_APPLY_COMPLETED,
583 SERIALIZER.encode(event));
alshabib371abe82015-02-13 10:44:17 -0800584 clusterCommunicator.unicast(message, nodeId);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800585 } catch (IOException e) {
586 log.warn("Failed to respond to peer for batch operation result");
587 }
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700588 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700589 }
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700590
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800591 private void loadFromBackup(final DeviceId did) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700592
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800593
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700594 try {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800595 log.debug("Loading FlowRules for {} from backups", did);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700596 SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(did);
597 for (Entry<FlowId, ImmutableList<StoredFlowEntry>> e
598 : backupFlowTable.entrySet()) {
599
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800600 log.trace("loading {}", e.getValue());
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700601 for (StoredFlowEntry entry : e.getValue()) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800602 flowTable.getFlowEntriesById(entry).remove(entry);
603 flowTable.getFlowEntriesById(entry).add(entry);
604
605
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700606 }
607 }
608 } catch (ExecutionException e) {
609 log.error("Failed to load backup flowtable for {}", did, e);
610 }
611 }
612
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800613 private void removeFromPrimary(final DeviceId did) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800614 flowTable.clearDevice(did);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700615 }
616
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800617
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700618 private final class OnStoreBatch implements ClusterMessageHandler {
619 private final NodeId local;
620
621 private OnStoreBatch(NodeId local) {
622 this.local = local;
623 }
624
625 @Override
626 public void handle(final ClusterMessage message) {
627 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800628 log.debug("received batch request {}", operation);
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700629
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800630 final DeviceId deviceId = operation.deviceId();
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700631 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
632 if (!local.equals(replicaInfo.master().orNull())) {
633
634 Set<FlowRule> failures = new HashSet<>(operation.size());
635 for (FlowRuleBatchEntry op : operation.getOperations()) {
Sho SHIMIZUaba9d002015-01-29 14:51:04 -0800636 failures.add(op.target());
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700637 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800638 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700639 // This node is no longer the master, respond as all failed.
640 // TODO: we might want to wrap response in envelope
641 // to distinguish sw programming failure and hand over
642 // it make sense in the latter case to retry immediately.
643 try {
644 message.respond(SERIALIZER.encode(allFailed));
645 } catch (IOException e) {
646 log.error("Failed to respond back", e);
647 }
648 return;
649 }
650
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700651
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800652 pendingResponses.put(operation.id(), message.sender());
653 storeBatchInternal(operation);
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700654
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700655 }
656 }
657
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700658 private final class SMapLoader
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800659 extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700660
661 @Override
662 public SMap<FlowId, ImmutableList<StoredFlowEntry>> load(DeviceId id)
663 throws Exception {
664 IMap<byte[], byte[]> map = theInstance.getMap("flowtable_" + id.toString());
665 return new SMap<FlowId, ImmutableList<StoredFlowEntry>>(map, SERIALIZER);
666 }
667 }
668
669 private final class InternalReplicaInfoEventListener
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800670 implements ReplicaInfoEventListener {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700671
672 @Override
673 public void event(ReplicaInfoEvent event) {
674 final NodeId local = clusterService.getLocalNode().id();
675 final DeviceId did = event.subject();
676 final ReplicaInfo rInfo = event.replicaInfo();
677
678 switch (event.type()) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800679 case MASTER_CHANGED:
680 if (local.equals(rInfo.master().orNull())) {
681 // This node is the new master, populate local structure
682 // from backup
683 loadFromBackup(did);
alshabib93cb57f2015-02-12 17:43:26 -0800684 }
685 //else {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800686 // This node is no longer the master holder,
687 // clean local structure
alshabib93cb57f2015-02-12 17:43:26 -0800688 //removeFromPrimary(did);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800689 // TODO: probably should stop pending backup activities in
690 // executors to avoid overwriting with old value
alshabib93cb57f2015-02-12 17:43:26 -0800691 //}
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800692 break;
693 default:
694 break;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700695
696 }
697 }
698 }
699
700 // Task to update FlowEntries in backup HZ store
701 private final class UpdateBackup implements Runnable {
702
703 private final DeviceId deviceId;
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800704 private final Set<FlowRuleBatchEntry> ops;
705
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700706
707 public UpdateBackup(DeviceId deviceId,
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800708 Set<FlowRuleBatchEntry> ops) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700709 this.deviceId = checkNotNull(deviceId);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800710 this.ops = checkNotNull(ops);
711
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700712 }
713
714 @Override
715 public void run() {
716 try {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800717 log.trace("update backup {} {}", deviceId, ops
718 );
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700719 final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700720
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700721
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800722 ops.stream().forEach(
723 op -> {
Ray Milkeyf7329c72015-02-17 11:37:01 -0800724 final FlowRule entry = op.target();
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800725 final FlowId id = entry.id();
726 ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
727 List<StoredFlowEntry> list = new ArrayList<>();
728 if (original != null) {
729 list.addAll(original);
730 }
Ray Milkeyf7329c72015-02-17 11:37:01 -0800731 list.remove(op.target());
732 if (op.operator() == FlowRuleOperation.ADD) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800733 list.add((StoredFlowEntry) entry);
734 }
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700735
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800736 ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
737 boolean success;
738 if (original == null) {
739 success = (backupFlowTable.putIfAbsent(id, newValue) == null);
740 } else {
741 success = backupFlowTable.replace(id, original, newValue);
742 }
743 if (!success) {
744 log.error("Updating backup failed.");
745 }
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700746
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800747 }
748 );
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700749 } catch (ExecutionException e) {
750 log.error("Failed to write to backups", e);
751 }
752
753 }
754 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800755
756 private class InternalFlowTable {
757
758 /*
759 TODO: This needs to be cleaned up. Perhaps using the eventually consistent
760 map when it supports distributed to a sequence of instances.
761 */
762
763
764 private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
765 flowEntries = new ConcurrentHashMap<>();
766
767
768 private NewConcurrentHashMap<FlowId, Set<StoredFlowEntry>> lazyEmptyFlowTable() {
769 return NewConcurrentHashMap.<FlowId, Set<StoredFlowEntry>>ifNeeded();
770 }
771
772 /**
773 * Returns the flow table for specified device.
774 *
775 * @param deviceId identifier of the device
776 * @return Map representing Flow Table of given device.
777 */
778 private ConcurrentMap<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
779 return createIfAbsentUnchecked(flowEntries,
780 deviceId, lazyEmptyFlowTable());
781 }
782
783 private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
784 final ConcurrentMap<FlowId, Set<StoredFlowEntry>> flowTable = getFlowTable(deviceId);
785 Set<StoredFlowEntry> r = flowTable.get(flowId);
786 if (r == null) {
787 final Set<StoredFlowEntry> concurrentlyAdded;
788 r = new CopyOnWriteArraySet<>();
789 concurrentlyAdded = flowTable.putIfAbsent(flowId, r);
790 if (concurrentlyAdded != null) {
791 return concurrentlyAdded;
792 }
793 }
794 return r;
795 }
796
797 private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
798 for (StoredFlowEntry f : getFlowEntriesInternal(rule.deviceId(), rule.id())) {
799 if (f.equals(rule)) {
800 return f;
801 }
802 }
803 return null;
804 }
805
806 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
807 return getFlowTable(deviceId).values().stream()
808 .flatMap((list -> list.stream())).collect(Collectors.toSet());
809
810 }
811
812
813 public StoredFlowEntry getFlowEntry(FlowRule rule) {
814 return getFlowEntryInternal(rule);
815 }
816
817 public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
818 return getFlowEntriesInternal(deviceId);
819 }
820
821 public Set<StoredFlowEntry> getFlowEntriesById(FlowEntry entry) {
822 return getFlowEntriesInternal(entry.deviceId(), entry.id());
823 }
824
825 public void add(FlowEntry rule) {
826 ((CopyOnWriteArraySet)
827 getFlowEntriesInternal(rule.deviceId(), rule.id())).add(rule);
828 }
829
830 public boolean remove(DeviceId deviceId, FlowEntry rule) {
831 return ((CopyOnWriteArraySet)
832 getFlowEntriesInternal(deviceId, rule.id())).remove(rule);
833 //return flowEntries.remove(deviceId, rule);
834 }
835
836 public void clearDevice(DeviceId did) {
837 flowEntries.remove(did);
838 }
839 }
840
841
alshabib339a3d92014-09-26 17:54:32 -0700842}