blob: 450bb3d4e659b0049d70f7a171917f54456c347d [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.flow.impl;
alshabib339a3d92014-09-26 17:54:32 -070017
Brian O'Connor72cb19a2015-01-16 16:14:41 -080018import com.google.common.cache.CacheBuilder;
19import com.google.common.cache.CacheLoader;
20import com.google.common.cache.LoadingCache;
21import com.google.common.collect.ImmutableList;
22import com.google.common.collect.Iterables;
23import com.google.common.collect.Maps;
24import com.google.common.collect.Sets;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080025import com.hazelcast.core.IMap;
alshabib339a3d92014-09-26 17:54:32 -070026import org.apache.felix.scr.annotations.Activate;
27import org.apache.felix.scr.annotations.Component;
28import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani38b250d2014-10-17 11:02:38 -070029import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib339a3d92014-09-26 17:54:32 -070031import org.apache.felix.scr.annotations.Service;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080032import org.onlab.util.BoundedThreadPool;
Jonathan Hart4fb5cde2014-12-22 12:09:07 -080033import org.onlab.util.KryoNamespace;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080034import org.onlab.util.NewConcurrentHashMap;
Brian O'Connorabafb502014-12-02 22:26:20 -080035import org.onosproject.cluster.ClusterService;
36import org.onosproject.cluster.NodeId;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080037import org.onosproject.core.CoreService;
38import org.onosproject.core.IdGenerator;
Brian O'Connorabafb502014-12-02 22:26:20 -080039import org.onosproject.net.Device;
40import org.onosproject.net.DeviceId;
41import org.onosproject.net.device.DeviceService;
42import org.onosproject.net.flow.CompletedBatchOperation;
43import org.onosproject.net.flow.DefaultFlowEntry;
44import org.onosproject.net.flow.FlowEntry;
45import org.onosproject.net.flow.FlowEntry.FlowEntryState;
46import org.onosproject.net.flow.FlowId;
47import org.onosproject.net.flow.FlowRule;
48import org.onosproject.net.flow.FlowRuleBatchEntry;
Jonathan Hart4fb5cde2014-12-22 12:09:07 -080049import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
Brian O'Connorabafb502014-12-02 22:26:20 -080050import org.onosproject.net.flow.FlowRuleBatchEvent;
51import org.onosproject.net.flow.FlowRuleBatchOperation;
52import org.onosproject.net.flow.FlowRuleBatchRequest;
53import org.onosproject.net.flow.FlowRuleEvent;
Brian O'Connorabafb502014-12-02 22:26:20 -080054import org.onosproject.net.flow.FlowRuleEvent.Type;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080055import org.onosproject.net.flow.FlowRuleService;
Brian O'Connorabafb502014-12-02 22:26:20 -080056import org.onosproject.net.flow.FlowRuleStore;
57import org.onosproject.net.flow.FlowRuleStoreDelegate;
58import org.onosproject.net.flow.StoredFlowEntry;
59import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
60import org.onosproject.store.cluster.messaging.ClusterMessage;
61import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
62import org.onosproject.store.flow.ReplicaInfo;
63import org.onosproject.store.flow.ReplicaInfoEvent;
64import org.onosproject.store.flow.ReplicaInfoEventListener;
65import org.onosproject.store.flow.ReplicaInfoService;
66import org.onosproject.store.hz.AbstractHazelcastStore;
67import org.onosproject.store.hz.SMap;
Brian O'Connorabafb502014-12-02 22:26:20 -080068import org.onosproject.store.serializers.KryoSerializer;
69import org.onosproject.store.serializers.StoreSerializer;
70import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
alshabib339a3d92014-09-26 17:54:32 -070071import org.slf4j.Logger;
72
Brian O'Connor72cb19a2015-01-16 16:14:41 -080073import java.io.IOException;
74import java.util.ArrayList;
75import java.util.Arrays;
76import java.util.Collections;
77import java.util.HashSet;
78import java.util.List;
79import java.util.Map;
80import java.util.Map.Entry;
81import java.util.Set;
82import java.util.concurrent.ConcurrentHashMap;
83import java.util.concurrent.ConcurrentMap;
84import java.util.concurrent.CopyOnWriteArraySet;
85import java.util.concurrent.ExecutionException;
86import java.util.concurrent.ExecutorService;
87import java.util.concurrent.Executors;
88import java.util.concurrent.Future;
89import java.util.concurrent.TimeUnit;
90import java.util.concurrent.TimeoutException;
91import java.util.stream.Collectors;
92
93import static com.google.common.base.Preconditions.checkNotNull;
94import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080095import static org.onlab.util.Tools.groupedThreads;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080096import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
97import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
98import static org.slf4j.LoggerFactory.getLogger;
alshabib339a3d92014-09-26 17:54:32 -070099
100/**
Madan Jampani38b250d2014-10-17 11:02:38 -0700101 * Manages inventory of flow rules using a distributed state management protocol.
alshabib339a3d92014-09-26 17:54:32 -0700102 */
alshabib339a3d92014-09-26 17:54:32 -0700103@Component(immediate = true)
104@Service
105public class DistributedFlowRuleStore
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700106 extends AbstractHazelcastStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
alshabib1c319ff2014-10-04 20:29:09 -0700107 implements FlowRuleStore {
alshabib339a3d92014-09-26 17:54:32 -0700108
109 private final Logger log = getLogger(getClass());
110
Madan Jampani2af244a2015-02-22 13:12:01 -0800111 // TODO: Make configurable.
112 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
113
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800114 private InternalFlowTable flowTable = new InternalFlowTable();
115
116 /*private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
117 flowEntries = new ConcurrentHashMap<>();*/
alshabib339a3d92014-09-26 17:54:32 -0700118
Madan Jampani38b250d2014-10-17 11:02:38 -0700119 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700120 protected ReplicaInfoService replicaInfoManager;
Madan Jampani38b250d2014-10-17 11:02:38 -0700121
122 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700123 protected ClusterCommunicationService clusterCommunicator;
Madan Jampani38b250d2014-10-17 11:02:38 -0700124
125 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700126 protected ClusterService clusterService;
127
128 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
129 protected DeviceService deviceService;
130
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800131 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
132 protected CoreService coreService;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700133
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800134 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700135
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800136 // Cache of SMaps used for backup data. each SMap contain device flow table
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700137 private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
138
Madan Jampani2af244a2015-02-22 13:12:01 -0800139 private ExecutorService messageHandlingExecutor;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700140
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700141 private final ExecutorService backupExecutors =
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800142 BoundedThreadPool.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
143 //Executors.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700144
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700145 private boolean syncBackup = false;
Madan Jampani38b250d2014-10-17 11:02:38 -0700146
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700147 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
Madan Jampani38b250d2014-10-17 11:02:38 -0700148 @Override
149 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700150 serializerPool = KryoNamespace.newBuilder()
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800151 .register(DistributedStoreSerializers.STORE_COMMON)
152 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
Yuta HIGUCHI2f158332014-11-25 13:32:06 -0800153 .register(FlowRuleEvent.class)
Jonathan Hart4fb5cde2014-12-22 12:09:07 -0800154 .register(FlowRuleEvent.Type.class)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800155 .build();
Madan Jampani38b250d2014-10-17 11:02:38 -0700156 }
157 };
158
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700159 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Madan Jampani38b250d2014-10-17 11:02:38 -0700160
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700161 private ReplicaInfoEventListener replicaInfoEventListener;
162
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800163 private IdGenerator idGenerator;
164
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700165 @Override
alshabib339a3d92014-09-26 17:54:32 -0700166 @Activate
167 public void activate() {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700168
169 super.serializer = SERIALIZER;
170 super.theInstance = storeService.getHazelcastInstance();
171
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800172 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
173
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700174 // Cache to create SMap on demand
175 smaps = CacheBuilder.newBuilder()
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800176 .softValues()
177 .build(new SMapLoader());
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700178
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700179 final NodeId local = clusterService.getLocalNode().id();
180
Madan Jampani2af244a2015-02-22 13:12:01 -0800181 messageHandlingExecutor = Executors.newFixedThreadPool(
182 MESSAGE_HANDLER_THREAD_POOL_SIZE,
Madan Jampani6b5b7172015-02-23 13:02:26 -0800183 groupedThreads("onos/store/flow", "message-handlers"));
Madan Jampani2af244a2015-02-22 13:12:01 -0800184
185 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local), messageHandlingExecutor);
Madan Jampani117aaae2014-10-23 10:04:05 -0700186
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800187 clusterCommunicator.addSubscriber(REMOTE_APPLY_COMPLETED, new ClusterMessageHandler() {
188 @Override
189 public void handle(ClusterMessage message) {
190 FlowRuleBatchEvent event = SERIALIZER.decode(message.payload());
191 log.trace("received completed notification for {}", event);
192 notifyDelegate(event);
193 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800194 }, messageHandlingExecutor);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800195
Madan Jampani117aaae2014-10-23 10:04:05 -0700196 clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
197
198 @Override
199 public void handle(ClusterMessage message) {
200 FlowRule rule = SERIALIZER.decode(message.payload());
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800201 log.trace("received get flow entry request for {}", rule);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800202 FlowEntry flowEntry = flowTable.getFlowEntry(rule); //getFlowEntryInternal(rule);
Madan Jampani117aaae2014-10-23 10:04:05 -0700203 try {
204 message.respond(SERIALIZER.encode(flowEntry));
205 } catch (IOException e) {
206 log.error("Failed to respond back", e);
207 }
208 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800209 }, messageHandlingExecutor);
Madan Jampani117aaae2014-10-23 10:04:05 -0700210
Madan Jampanif5fdef02014-10-23 21:58:10 -0700211 clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
212
213 @Override
214 public void handle(ClusterMessage message) {
215 DeviceId deviceId = SERIALIZER.decode(message.payload());
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800216 log.trace("Received get flow entries request for {} from {}", deviceId, message.sender());
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800217 Set<FlowEntry> flowEntries = flowTable.getFlowEntries(deviceId);
Madan Jampanif5fdef02014-10-23 21:58:10 -0700218 try {
219 message.respond(SERIALIZER.encode(flowEntries));
220 } catch (IOException e) {
221 log.error("Failed to respond to peer's getFlowEntries request", e);
222 }
223 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800224 }, messageHandlingExecutor);
Madan Jampanif5fdef02014-10-23 21:58:10 -0700225
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800226 clusterCommunicator.addSubscriber(REMOVE_FLOW_ENTRY, new ClusterMessageHandler() {
227
228 @Override
229 public void handle(ClusterMessage message) {
230 FlowEntry rule = SERIALIZER.decode(message.payload());
231 log.trace("received get flow entry request for {}", rule);
232 FlowRuleEvent event = removeFlowRuleInternal(rule);
233 try {
234 message.respond(SERIALIZER.encode(event));
235 } catch (IOException e) {
236 log.error("Failed to respond back", e);
237 }
238 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800239 }, messageHandlingExecutor);
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800240
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700241 replicaInfoEventListener = new InternalReplicaInfoEventListener();
242
243 replicaInfoManager.addListener(replicaInfoEventListener);
244
alshabib339a3d92014-09-26 17:54:32 -0700245 log.info("Started");
246 }
247
248 @Deactivate
249 public void deactivate() {
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800250 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
251 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
252 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
253 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
alshabib371abe82015-02-13 10:44:17 -0800254 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
Madan Jampani2af244a2015-02-22 13:12:01 -0800255 messageHandlingExecutor.shutdown();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700256 replicaInfoManager.removeListener(replicaInfoEventListener);
alshabib339a3d92014-09-26 17:54:32 -0700257 log.info("Stopped");
258 }
259
260
Brian O'Connor44008532014-12-04 16:41:36 -0800261 // This is not a efficient operation on a distributed sharded
Madan Jampani117aaae2014-10-23 10:04:05 -0700262 // flow store. We need to revisit the need for this operation or at least
263 // make it device specific.
alshabib339a3d92014-09-26 17:54:32 -0700264 @Override
tom9b4030d2014-10-06 10:39:03 -0700265 public int getFlowRuleCount() {
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700266 // implementing in-efficient operation for debugging purpose.
267 int sum = 0;
268 for (Device device : deviceService.getDevices()) {
269 final DeviceId did = device.id();
270 sum += Iterables.size(getFlowEntries(did));
271 }
272 return sum;
tom9b4030d2014-10-06 10:39:03 -0700273 }
274
275 @Override
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800276 public FlowEntry getFlowEntry(FlowRule rule) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700277 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700278
279 if (!replicaInfo.master().isPresent()) {
Yuta HIGUCHI6b98ab62014-12-03 16:08:08 -0800280 log.warn("Failed to getFlowEntry: No master for {}", rule.deviceId());
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800281 return null;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700282 }
283
Madan Jampani117aaae2014-10-23 10:04:05 -0700284 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800285 return flowTable.getFlowEntry(rule);
Madan Jampani117aaae2014-10-23 10:04:05 -0700286 }
287
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800288 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800289 replicaInfo.master().orNull(), rule.deviceId());
Madan Jampani117aaae2014-10-23 10:04:05 -0700290
291 ClusterMessage message = new ClusterMessage(
292 clusterService.getLocalNode().id(),
293 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
294 SERIALIZER.encode(rule));
295
296 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700297 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
298 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
299 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Brian O'Connor44008532014-12-04 16:41:36 -0800300 log.warn("Unable to fetch flow store contents from {}", replicaInfo.master().get());
Madan Jampani117aaae2014-10-23 10:04:05 -0700301 }
Brian O'Connor44008532014-12-04 16:41:36 -0800302 return null;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700303 }
304
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800305
alshabib339a3d92014-09-26 17:54:32 -0700306
307 @Override
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800308 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700309
310 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700311
312 if (!replicaInfo.master().isPresent()) {
Yuta HIGUCHI6b98ab62014-12-03 16:08:08 -0800313 log.warn("Failed to getFlowEntries: No master for {}", deviceId);
Yuta HIGUCHI2c1d8472014-10-31 14:13:38 -0700314 return Collections.emptyList();
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700315 }
316
Madan Jampanif5fdef02014-10-23 21:58:10 -0700317 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800318 return flowTable.getFlowEntries(deviceId);
Madan Jampanif5fdef02014-10-23 21:58:10 -0700319 }
320
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800321 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800322 replicaInfo.master().orNull(), deviceId);
Madan Jampanif5fdef02014-10-23 21:58:10 -0700323
324 ClusterMessage message = new ClusterMessage(
325 clusterService.getLocalNode().id(),
326 GET_DEVICE_FLOW_ENTRIES,
327 SERIALIZER.encode(deviceId));
328
329 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700330 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
331 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
332 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Brian O'Connor44008532014-12-04 16:41:36 -0800333 log.warn("Unable to fetch flow store contents from {}", replicaInfo.master().get());
Madan Jampanif5fdef02014-10-23 21:58:10 -0700334 }
Yuta HIGUCHI24f79eb2014-12-12 15:46:43 -0800335 return Collections.emptyList();
Madan Jampanif5fdef02014-10-23 21:58:10 -0700336 }
337
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800338
alshabib339a3d92014-09-26 17:54:32 -0700339
340 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700341 public void storeFlowRule(FlowRule rule) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800342 storeBatch(new FlowRuleBatchOperation(
343 Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
344 rule.deviceId(), idGenerator.getNewId()));
Madan Jampani117aaae2014-10-23 10:04:05 -0700345 }
346
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700347 @Override
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800348 public void storeBatch(FlowRuleBatchOperation operation) {
349
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700350
Madan Jampani117aaae2014-10-23 10:04:05 -0700351 if (operation.getOperations().isEmpty()) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800352
353 notifyDelegate(FlowRuleBatchEvent.completed(
354 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
355 new CompletedBatchOperation(true, Collections.emptySet(),
356 operation.deviceId())));
357 return;
alshabib339a3d92014-09-26 17:54:32 -0700358 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700359
alshabib371abe82015-02-13 10:44:17 -0800360 DeviceId deviceId = operation.deviceId();
Madan Jampani117aaae2014-10-23 10:04:05 -0700361
362 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
363
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700364 if (!replicaInfo.master().isPresent()) {
alshabib371abe82015-02-13 10:44:17 -0800365 log.warn("No master for {} : flows will be marked for removal", deviceId);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800366
alshabib371abe82015-02-13 10:44:17 -0800367 updateStoreInternal(operation);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800368
369 notifyDelegate(FlowRuleBatchEvent.completed(
370 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
alshabib371abe82015-02-13 10:44:17 -0800371 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800372 return;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700373 }
374
375 final NodeId local = clusterService.getLocalNode().id();
376 if (replicaInfo.master().get().equals(local)) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800377 storeBatchInternal(operation);
378 return;
Madan Jampani117aaae2014-10-23 10:04:05 -0700379 }
380
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800381 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800382 replicaInfo.master().orNull(), deviceId);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700383
Madan Jampani38b250d2014-10-17 11:02:38 -0700384 ClusterMessage message = new ClusterMessage(
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700385 local,
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700386 APPLY_BATCH_FLOWS,
Madan Jampani117aaae2014-10-23 10:04:05 -0700387 SERIALIZER.encode(operation));
Madan Jampani38b250d2014-10-17 11:02:38 -0700388
alshabib371abe82015-02-13 10:44:17 -0800389
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800390 if (!clusterCommunicator.unicast(message, replicaInfo.master().get())) {
391 log.warn("Failed to storeBatch: {} to {}", message, replicaInfo.master());
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800392
393 Set<FlowRule> allFailures = operation.getOperations().stream()
Ray Milkeyf7329c72015-02-17 11:37:01 -0800394 .map(op -> op.target())
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800395 .collect(Collectors.toSet());
396
397 notifyDelegate(FlowRuleBatchEvent.completed(
398 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
399 new CompletedBatchOperation(false, allFailures, deviceId)));
400 return;
Madan Jampani38b250d2014-10-17 11:02:38 -0700401 }
402 }
403
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800404 private void storeBatchInternal(FlowRuleBatchOperation operation) {
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800405
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800406 final DeviceId did = operation.deviceId();
407 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
alshabib371abe82015-02-13 10:44:17 -0800408 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
409 if (currentOps.isEmpty()) {
410 batchOperationComplete(FlowRuleBatchEvent.completed(
411 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
412 new CompletedBatchOperation(true, Collections.emptySet(), did)));
413 return;
414 }
415 updateBackup(did, currentOps);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700416
alshabib371abe82015-02-13 10:44:17 -0800417 notifyDelegate(FlowRuleBatchEvent.requested(new
418 FlowRuleBatchRequest(operation.id(),
419 currentOps), operation.deviceId()));
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700420
alshabib371abe82015-02-13 10:44:17 -0800421 }
422
423 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
424 return operation.getOperations().stream().map(
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800425 op -> {
426 StoredFlowEntry entry;
Ray Milkeyf7329c72015-02-17 11:37:01 -0800427 switch (op.operator()) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800428 case ADD:
Ray Milkeyf7329c72015-02-17 11:37:01 -0800429 entry = new DefaultFlowEntry(op.target());
Madan Jampani2af244a2015-02-22 13:12:01 -0800430 // always add requested FlowRule
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800431 // Note: 2 equal FlowEntry may have different treatment
432 flowTable.remove(entry.deviceId(), entry);
433 flowTable.add(entry);
434
435 return op;
436 case REMOVE:
437 entry = flowTable.getFlowEntry(op.target());
438 if (entry != null) {
439 entry.setState(FlowEntryState.PENDING_REMOVE);
440 return op;
441 }
442 break;
443 case MODIFY:
444 //TODO: figure this out at some point
445 break;
446 default:
Ray Milkeyf7329c72015-02-17 11:37:01 -0800447 log.warn("Unknown flow operation operator: {}", op.operator());
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800448 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800449 return null;
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800450 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800451 ).filter(op -> op != null).collect(Collectors.toSet());
alshabib339a3d92014-09-26 17:54:32 -0700452 }
453
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800454 private void updateBackup(DeviceId deviceId, final Set<FlowRuleBatchEntry> entries) {
455 Future<?> backup = backupExecutors.submit(new UpdateBackup(deviceId, entries));
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700456
457 if (syncBackup) {
458 // wait for backup to complete
459 try {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800460 backup.get();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700461 } catch (InterruptedException | ExecutionException e) {
462 log.error("Failed to create backups", e);
463 }
464 }
465 }
466
alshabib339a3d92014-09-26 17:54:32 -0700467 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700468 public void deleteFlowRule(FlowRule rule) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800469 storeBatch(
470 new FlowRuleBatchOperation(
471 Arrays.asList(
472 new FlowRuleBatchEntry(
473 FlowRuleOperation.REMOVE,
474 rule)), rule.deviceId(), idGenerator.getNewId()));
alshabib339a3d92014-09-26 17:54:32 -0700475 }
476
477 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700478 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
479 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700480 final NodeId localId = clusterService.getLocalNode().id();
481 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700482 return addOrUpdateFlowRuleInternal(rule);
483 }
484
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800485 log.warn("Tried to update FlowRule {} state,"
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800486 + " while the Node was not the master.", rule);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700487 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700488 }
489
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800490 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700491 final DeviceId did = rule.deviceId();
alshabib339a3d92014-09-26 17:54:32 -0700492
alshabib339a3d92014-09-26 17:54:32 -0700493
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800494 // check if this new rule is an update to an existing entry
495 StoredFlowEntry stored = flowTable.getFlowEntry(rule);
496 if (stored != null) {
497 stored.setBytes(rule.bytes());
498 stored.setLife(rule.life());
499 stored.setPackets(rule.packets());
500 if (stored.state() == FlowEntryState.PENDING_ADD) {
501 stored.setState(FlowEntryState.ADDED);
502 FlowRuleBatchEntry entry =
503 new FlowRuleBatchEntry(FlowRuleOperation.ADD, stored);
504 updateBackup(did, Sets.newHashSet(entry));
505 return new FlowRuleEvent(Type.RULE_ADDED, rule);
506 }
507 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800508 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800509
510 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
511 // TODO: also update backup if the behavior is correct.
512 flowTable.add(rule);
513
514
alshabib1c319ff2014-10-04 20:29:09 -0700515 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700516
alshabib339a3d92014-09-26 17:54:32 -0700517 }
518
519 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700520 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
Yuta HIGUCHIb6eb9142014-11-26 16:18:13 -0800521 final DeviceId deviceId = rule.deviceId();
522 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700523
524 final NodeId localId = clusterService.getLocalNode().id();
525 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700526 // bypass and handle it locally
527 return removeFlowRuleInternal(rule);
528 }
529
Yuta HIGUCHIb6eb9142014-11-26 16:18:13 -0800530 if (!replicaInfo.master().isPresent()) {
Yuta HIGUCHI6b98ab62014-12-03 16:08:08 -0800531 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
Yuta HIGUCHIb6eb9142014-11-26 16:18:13 -0800532 // TODO: revisit if this should be null (="no-op") or Exception
533 return null;
534 }
535
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800536 log.trace("Forwarding removeFlowRule to {}, which is the primary (master) for device {}",
Yuta HIGUCHIb6eb9142014-11-26 16:18:13 -0800537 replicaInfo.master().orNull(), deviceId);
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800538
539 ClusterMessage message = new ClusterMessage(
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800540 clusterService.getLocalNode().id(),
541 REMOVE_FLOW_ENTRY,
542 SERIALIZER.encode(rule));
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800543
544 try {
545 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
546 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
547 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Yuta HIGUCHI65934892014-12-04 17:47:44 -0800548 // TODO: Retry against latest master or throw a FlowStoreException
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800549 throw new RuntimeException(e);
550 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700551 }
552
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800553 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700554 final DeviceId deviceId = rule.deviceId();
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800555 // This is where one could mark a rule as removed and still keep it in the store.
556 final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
557 FlowRuleBatchEntry entry =
558 new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule);
559 updateBackup(deviceId, Sets.newHashSet(entry));
560 if (removed) {
561 return new FlowRuleEvent(RULE_REMOVED, rule);
562 } else {
563 return null;
alshabib339a3d92014-09-26 17:54:32 -0700564 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800565
alshabib339a3d92014-09-26 17:54:32 -0700566 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700567
568 @Override
569 public void batchOperationComplete(FlowRuleBatchEvent event) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800570 //FIXME: need a per device pending response
571
572 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
573 if (nodeId == null) {
574 notifyDelegate(event);
575 } else {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800576 ClusterMessage message = new ClusterMessage(
577 clusterService.getLocalNode().id(),
578 REMOTE_APPLY_COMPLETED,
579 SERIALIZER.encode(event));
580 // TODO check unicast return value
581 clusterCommunicator.unicast(message, nodeId);
582 //error log: log.warn("Failed to respond to peer for batch operation result");
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700583 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700584 }
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700585
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800586 private void loadFromBackup(final DeviceId did) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700587
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800588
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700589 try {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800590 log.debug("Loading FlowRules for {} from backups", did);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700591 SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(did);
592 for (Entry<FlowId, ImmutableList<StoredFlowEntry>> e
593 : backupFlowTable.entrySet()) {
594
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800595 log.trace("loading {}", e.getValue());
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700596 for (StoredFlowEntry entry : e.getValue()) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800597 flowTable.getFlowEntriesById(entry).remove(entry);
598 flowTable.getFlowEntriesById(entry).add(entry);
599
600
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700601 }
602 }
603 } catch (ExecutionException e) {
604 log.error("Failed to load backup flowtable for {}", did, e);
605 }
606 }
607
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800608 private void removeFromPrimary(final DeviceId did) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800609 flowTable.clearDevice(did);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700610 }
611
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800612
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700613 private final class OnStoreBatch implements ClusterMessageHandler {
614 private final NodeId local;
615
616 private OnStoreBatch(NodeId local) {
617 this.local = local;
618 }
619
620 @Override
621 public void handle(final ClusterMessage message) {
622 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800623 log.debug("received batch request {}", operation);
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700624
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800625 final DeviceId deviceId = operation.deviceId();
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700626 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
627 if (!local.equals(replicaInfo.master().orNull())) {
628
629 Set<FlowRule> failures = new HashSet<>(operation.size());
630 for (FlowRuleBatchEntry op : operation.getOperations()) {
Sho SHIMIZUaba9d002015-01-29 14:51:04 -0800631 failures.add(op.target());
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700632 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800633 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700634 // This node is no longer the master, respond as all failed.
635 // TODO: we might want to wrap response in envelope
636 // to distinguish sw programming failure and hand over
637 // it make sense in the latter case to retry immediately.
638 try {
639 message.respond(SERIALIZER.encode(allFailed));
640 } catch (IOException e) {
641 log.error("Failed to respond back", e);
642 }
643 return;
644 }
645
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700646
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800647 pendingResponses.put(operation.id(), message.sender());
648 storeBatchInternal(operation);
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700649
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700650 }
651 }
652
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700653 private final class SMapLoader
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800654 extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700655
656 @Override
657 public SMap<FlowId, ImmutableList<StoredFlowEntry>> load(DeviceId id)
658 throws Exception {
659 IMap<byte[], byte[]> map = theInstance.getMap("flowtable_" + id.toString());
660 return new SMap<FlowId, ImmutableList<StoredFlowEntry>>(map, SERIALIZER);
661 }
662 }
663
664 private final class InternalReplicaInfoEventListener
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800665 implements ReplicaInfoEventListener {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700666
667 @Override
668 public void event(ReplicaInfoEvent event) {
669 final NodeId local = clusterService.getLocalNode().id();
670 final DeviceId did = event.subject();
671 final ReplicaInfo rInfo = event.replicaInfo();
672
673 switch (event.type()) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800674 case MASTER_CHANGED:
675 if (local.equals(rInfo.master().orNull())) {
676 // This node is the new master, populate local structure
677 // from backup
678 loadFromBackup(did);
alshabib93cb57f2015-02-12 17:43:26 -0800679 }
680 //else {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800681 // This node is no longer the master holder,
682 // clean local structure
alshabib93cb57f2015-02-12 17:43:26 -0800683 //removeFromPrimary(did);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800684 // TODO: probably should stop pending backup activities in
685 // executors to avoid overwriting with old value
alshabib93cb57f2015-02-12 17:43:26 -0800686 //}
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800687 break;
688 default:
689 break;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700690
691 }
692 }
693 }
694
695 // Task to update FlowEntries in backup HZ store
696 private final class UpdateBackup implements Runnable {
697
698 private final DeviceId deviceId;
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800699 private final Set<FlowRuleBatchEntry> ops;
700
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700701
702 public UpdateBackup(DeviceId deviceId,
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800703 Set<FlowRuleBatchEntry> ops) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700704 this.deviceId = checkNotNull(deviceId);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800705 this.ops = checkNotNull(ops);
706
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700707 }
708
709 @Override
710 public void run() {
711 try {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800712 log.trace("update backup {} {}", deviceId, ops
713 );
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700714 final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700715
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700716
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800717 ops.stream().forEach(
718 op -> {
Ray Milkeyf7329c72015-02-17 11:37:01 -0800719 final FlowRule entry = op.target();
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800720 final FlowId id = entry.id();
721 ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
722 List<StoredFlowEntry> list = new ArrayList<>();
723 if (original != null) {
724 list.addAll(original);
725 }
Ray Milkeyf7329c72015-02-17 11:37:01 -0800726 list.remove(op.target());
727 if (op.operator() == FlowRuleOperation.ADD) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800728 list.add((StoredFlowEntry) entry);
729 }
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700730
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800731 ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
732 boolean success;
733 if (original == null) {
734 success = (backupFlowTable.putIfAbsent(id, newValue) == null);
735 } else {
736 success = backupFlowTable.replace(id, original, newValue);
737 }
738 if (!success) {
739 log.error("Updating backup failed.");
740 }
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700741
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800742 }
743 );
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700744 } catch (ExecutionException e) {
745 log.error("Failed to write to backups", e);
746 }
747
748 }
749 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800750
751 private class InternalFlowTable {
752
753 /*
754 TODO: This needs to be cleaned up. Perhaps using the eventually consistent
755 map when it supports distributed to a sequence of instances.
756 */
757
758
759 private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
760 flowEntries = new ConcurrentHashMap<>();
761
762
763 private NewConcurrentHashMap<FlowId, Set<StoredFlowEntry>> lazyEmptyFlowTable() {
764 return NewConcurrentHashMap.<FlowId, Set<StoredFlowEntry>>ifNeeded();
765 }
766
767 /**
768 * Returns the flow table for specified device.
769 *
770 * @param deviceId identifier of the device
771 * @return Map representing Flow Table of given device.
772 */
773 private ConcurrentMap<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
774 return createIfAbsentUnchecked(flowEntries,
775 deviceId, lazyEmptyFlowTable());
776 }
777
778 private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
779 final ConcurrentMap<FlowId, Set<StoredFlowEntry>> flowTable = getFlowTable(deviceId);
780 Set<StoredFlowEntry> r = flowTable.get(flowId);
781 if (r == null) {
782 final Set<StoredFlowEntry> concurrentlyAdded;
783 r = new CopyOnWriteArraySet<>();
784 concurrentlyAdded = flowTable.putIfAbsent(flowId, r);
785 if (concurrentlyAdded != null) {
786 return concurrentlyAdded;
787 }
788 }
789 return r;
790 }
791
792 private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
793 for (StoredFlowEntry f : getFlowEntriesInternal(rule.deviceId(), rule.id())) {
794 if (f.equals(rule)) {
795 return f;
796 }
797 }
798 return null;
799 }
800
801 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
802 return getFlowTable(deviceId).values().stream()
803 .flatMap((list -> list.stream())).collect(Collectors.toSet());
804
805 }
806
807
808 public StoredFlowEntry getFlowEntry(FlowRule rule) {
809 return getFlowEntryInternal(rule);
810 }
811
812 public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
813 return getFlowEntriesInternal(deviceId);
814 }
815
816 public Set<StoredFlowEntry> getFlowEntriesById(FlowEntry entry) {
817 return getFlowEntriesInternal(entry.deviceId(), entry.id());
818 }
819
820 public void add(FlowEntry rule) {
821 ((CopyOnWriteArraySet)
822 getFlowEntriesInternal(rule.deviceId(), rule.id())).add(rule);
823 }
824
825 public boolean remove(DeviceId deviceId, FlowEntry rule) {
826 return ((CopyOnWriteArraySet)
827 getFlowEntriesInternal(deviceId, rule.id())).remove(rule);
828 //return flowEntries.remove(deviceId, rule);
829 }
830
831 public void clearDevice(DeviceId did) {
832 flowEntries.remove(did);
833 }
834 }
835
836
alshabib339a3d92014-09-26 17:54:32 -0700837}