blob: e854201df8137e71c9d36c98d561906c96713975 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.flow.impl;
alshabib339a3d92014-09-26 17:54:32 -070017
Brian O'Connor72cb19a2015-01-16 16:14:41 -080018import com.google.common.cache.CacheBuilder;
19import com.google.common.cache.CacheLoader;
20import com.google.common.cache.LoadingCache;
21import com.google.common.collect.ImmutableList;
22import com.google.common.collect.Iterables;
23import com.google.common.collect.Maps;
24import com.google.common.collect.Sets;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080025import com.hazelcast.core.IMap;
alshabib339a3d92014-09-26 17:54:32 -070026import org.apache.felix.scr.annotations.Activate;
27import org.apache.felix.scr.annotations.Component;
28import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani38b250d2014-10-17 11:02:38 -070029import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib339a3d92014-09-26 17:54:32 -070031import org.apache.felix.scr.annotations.Service;
Jonathan Hart4fb5cde2014-12-22 12:09:07 -080032import org.onlab.util.KryoNamespace;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080033import org.onlab.util.NewConcurrentHashMap;
Brian O'Connorabafb502014-12-02 22:26:20 -080034import org.onosproject.cluster.ClusterService;
35import org.onosproject.cluster.NodeId;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080036import org.onosproject.core.CoreService;
37import org.onosproject.core.IdGenerator;
Brian O'Connorabafb502014-12-02 22:26:20 -080038import org.onosproject.net.Device;
39import org.onosproject.net.DeviceId;
40import org.onosproject.net.device.DeviceService;
41import org.onosproject.net.flow.CompletedBatchOperation;
42import org.onosproject.net.flow.DefaultFlowEntry;
43import org.onosproject.net.flow.FlowEntry;
44import org.onosproject.net.flow.FlowEntry.FlowEntryState;
45import org.onosproject.net.flow.FlowId;
46import org.onosproject.net.flow.FlowRule;
47import org.onosproject.net.flow.FlowRuleBatchEntry;
Jonathan Hart4fb5cde2014-12-22 12:09:07 -080048import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
Brian O'Connorabafb502014-12-02 22:26:20 -080049import org.onosproject.net.flow.FlowRuleBatchEvent;
50import org.onosproject.net.flow.FlowRuleBatchOperation;
51import org.onosproject.net.flow.FlowRuleBatchRequest;
52import org.onosproject.net.flow.FlowRuleEvent;
Brian O'Connorabafb502014-12-02 22:26:20 -080053import org.onosproject.net.flow.FlowRuleEvent.Type;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080054import org.onosproject.net.flow.FlowRuleService;
Brian O'Connorabafb502014-12-02 22:26:20 -080055import org.onosproject.net.flow.FlowRuleStore;
56import org.onosproject.net.flow.FlowRuleStoreDelegate;
57import org.onosproject.net.flow.StoredFlowEntry;
58import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
59import org.onosproject.store.cluster.messaging.ClusterMessage;
60import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
61import org.onosproject.store.flow.ReplicaInfo;
62import org.onosproject.store.flow.ReplicaInfoEvent;
63import org.onosproject.store.flow.ReplicaInfoEventListener;
64import org.onosproject.store.flow.ReplicaInfoService;
65import org.onosproject.store.hz.AbstractHazelcastStore;
66import org.onosproject.store.hz.SMap;
Brian O'Connorabafb502014-12-02 22:26:20 -080067import org.onosproject.store.serializers.KryoSerializer;
68import org.onosproject.store.serializers.StoreSerializer;
69import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
alshabib339a3d92014-09-26 17:54:32 -070070import org.slf4j.Logger;
71
Brian O'Connor72cb19a2015-01-16 16:14:41 -080072import java.io.IOException;
73import java.util.ArrayList;
74import java.util.Arrays;
75import java.util.Collections;
76import java.util.HashSet;
77import java.util.List;
78import java.util.Map;
79import java.util.Map.Entry;
80import java.util.Set;
81import java.util.concurrent.ConcurrentHashMap;
82import java.util.concurrent.ConcurrentMap;
83import java.util.concurrent.CopyOnWriteArraySet;
84import java.util.concurrent.ExecutionException;
85import java.util.concurrent.ExecutorService;
86import java.util.concurrent.Executors;
87import java.util.concurrent.Future;
88import java.util.concurrent.TimeUnit;
89import java.util.concurrent.TimeoutException;
90import java.util.stream.Collectors;
91
92import static com.google.common.base.Preconditions.checkNotNull;
93import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
94import static org.onlab.util.Tools.namedThreads;
95import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
96import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
97import static org.slf4j.LoggerFactory.getLogger;
alshabib339a3d92014-09-26 17:54:32 -070098
99/**
Madan Jampani38b250d2014-10-17 11:02:38 -0700100 * Manages inventory of flow rules using a distributed state management protocol.
alshabib339a3d92014-09-26 17:54:32 -0700101 */
alshabib339a3d92014-09-26 17:54:32 -0700102@Component(immediate = true)
103@Service
104public class DistributedFlowRuleStore
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700105 extends AbstractHazelcastStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
alshabib1c319ff2014-10-04 20:29:09 -0700106 implements FlowRuleStore {
alshabib339a3d92014-09-26 17:54:32 -0700107
108 private final Logger log = getLogger(getClass());
109
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800110 private InternalFlowTable flowTable = new InternalFlowTable();
111
112 /*private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
113 flowEntries = new ConcurrentHashMap<>();*/
alshabib339a3d92014-09-26 17:54:32 -0700114
Madan Jampani38b250d2014-10-17 11:02:38 -0700115 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700116 protected ReplicaInfoService replicaInfoManager;
Madan Jampani38b250d2014-10-17 11:02:38 -0700117
118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700119 protected ClusterCommunicationService clusterCommunicator;
Madan Jampani38b250d2014-10-17 11:02:38 -0700120
121 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700122 protected ClusterService clusterService;
123
124 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
125 protected DeviceService deviceService;
126
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800127 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
128 protected CoreService coreService;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700129
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800130 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700131
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800132 // Cache of SMaps used for backup data. each SMap contain device flow table
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700133 private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
134
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700135
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700136 private final ExecutorService backupExecutors =
Thomas Vachuska9ea3e6f2015-01-23 16:34:22 -0800137 Executors.newSingleThreadExecutor(namedThreads("onos-async-backups"));
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700138
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700139 private boolean syncBackup = false;
Madan Jampani38b250d2014-10-17 11:02:38 -0700140
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700141 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
Madan Jampani38b250d2014-10-17 11:02:38 -0700142 @Override
143 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700144 serializerPool = KryoNamespace.newBuilder()
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800145 .register(DistributedStoreSerializers.STORE_COMMON)
146 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
Yuta HIGUCHI2f158332014-11-25 13:32:06 -0800147 .register(FlowRuleEvent.class)
Jonathan Hart4fb5cde2014-12-22 12:09:07 -0800148 .register(FlowRuleEvent.Type.class)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800149 .build();
Madan Jampani38b250d2014-10-17 11:02:38 -0700150 }
151 };
152
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700153 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Madan Jampani38b250d2014-10-17 11:02:38 -0700154
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700155 private ReplicaInfoEventListener replicaInfoEventListener;
156
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800157 private IdGenerator idGenerator;
158
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700159 @Override
alshabib339a3d92014-09-26 17:54:32 -0700160 @Activate
161 public void activate() {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700162
163 super.serializer = SERIALIZER;
164 super.theInstance = storeService.getHazelcastInstance();
165
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800166 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
167
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700168 // Cache to create SMap on demand
169 smaps = CacheBuilder.newBuilder()
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800170 .softValues()
171 .build(new SMapLoader());
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700172
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700173 final NodeId local = clusterService.getLocalNode().id();
174
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700175 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local));
Madan Jampani117aaae2014-10-23 10:04:05 -0700176
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800177 clusterCommunicator.addSubscriber(REMOTE_APPLY_COMPLETED, new ClusterMessageHandler() {
178 @Override
179 public void handle(ClusterMessage message) {
180 FlowRuleBatchEvent event = SERIALIZER.decode(message.payload());
181 log.trace("received completed notification for {}", event);
182 notifyDelegate(event);
183 }
184 });
185
Madan Jampani117aaae2014-10-23 10:04:05 -0700186 clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
187
188 @Override
189 public void handle(ClusterMessage message) {
190 FlowRule rule = SERIALIZER.decode(message.payload());
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800191 log.trace("received get flow entry request for {}", rule);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800192 FlowEntry flowEntry = flowTable.getFlowEntry(rule); //getFlowEntryInternal(rule);
Madan Jampani117aaae2014-10-23 10:04:05 -0700193 try {
194 message.respond(SERIALIZER.encode(flowEntry));
195 } catch (IOException e) {
196 log.error("Failed to respond back", e);
197 }
198 }
199 });
200
Madan Jampanif5fdef02014-10-23 21:58:10 -0700201 clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
202
203 @Override
204 public void handle(ClusterMessage message) {
205 DeviceId deviceId = SERIALIZER.decode(message.payload());
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800206 log.trace("Received get flow entries request for {} from {}", deviceId, message.sender());
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800207 Set<FlowEntry> flowEntries = flowTable.getFlowEntries(deviceId);
Madan Jampanif5fdef02014-10-23 21:58:10 -0700208 try {
209 message.respond(SERIALIZER.encode(flowEntries));
210 } catch (IOException e) {
211 log.error("Failed to respond to peer's getFlowEntries request", e);
212 }
213 }
214 });
215
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800216 clusterCommunicator.addSubscriber(REMOVE_FLOW_ENTRY, new ClusterMessageHandler() {
217
218 @Override
219 public void handle(ClusterMessage message) {
220 FlowEntry rule = SERIALIZER.decode(message.payload());
221 log.trace("received get flow entry request for {}", rule);
222 FlowRuleEvent event = removeFlowRuleInternal(rule);
223 try {
224 message.respond(SERIALIZER.encode(event));
225 } catch (IOException e) {
226 log.error("Failed to respond back", e);
227 }
228 }
229 });
230
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700231 replicaInfoEventListener = new InternalReplicaInfoEventListener();
232
233 replicaInfoManager.addListener(replicaInfoEventListener);
234
alshabib339a3d92014-09-26 17:54:32 -0700235 log.info("Started");
236 }
237
238 @Deactivate
239 public void deactivate() {
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800240 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
241 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
242 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
243 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
alshabib371abe82015-02-13 10:44:17 -0800244 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700245 replicaInfoManager.removeListener(replicaInfoEventListener);
alshabib339a3d92014-09-26 17:54:32 -0700246 log.info("Stopped");
247 }
248
249
Brian O'Connor44008532014-12-04 16:41:36 -0800250 // This is not a efficient operation on a distributed sharded
Madan Jampani117aaae2014-10-23 10:04:05 -0700251 // flow store. We need to revisit the need for this operation or at least
252 // make it device specific.
alshabib339a3d92014-09-26 17:54:32 -0700253 @Override
tom9b4030d2014-10-06 10:39:03 -0700254 public int getFlowRuleCount() {
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700255 // implementing in-efficient operation for debugging purpose.
256 int sum = 0;
257 for (Device device : deviceService.getDevices()) {
258 final DeviceId did = device.id();
259 sum += Iterables.size(getFlowEntries(did));
260 }
261 return sum;
tom9b4030d2014-10-06 10:39:03 -0700262 }
263
264 @Override
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800265 public FlowEntry getFlowEntry(FlowRule rule) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700266 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700267
268 if (!replicaInfo.master().isPresent()) {
Yuta HIGUCHI6b98ab62014-12-03 16:08:08 -0800269 log.warn("Failed to getFlowEntry: No master for {}", rule.deviceId());
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800270 return null;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700271 }
272
Madan Jampani117aaae2014-10-23 10:04:05 -0700273 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800274 return flowTable.getFlowEntry(rule);
Madan Jampani117aaae2014-10-23 10:04:05 -0700275 }
276
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800277 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800278 replicaInfo.master().orNull(), rule.deviceId());
Madan Jampani117aaae2014-10-23 10:04:05 -0700279
280 ClusterMessage message = new ClusterMessage(
281 clusterService.getLocalNode().id(),
282 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
283 SERIALIZER.encode(rule));
284
285 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700286 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
287 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
288 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Brian O'Connor44008532014-12-04 16:41:36 -0800289 log.warn("Unable to fetch flow store contents from {}", replicaInfo.master().get());
Madan Jampani117aaae2014-10-23 10:04:05 -0700290 }
Brian O'Connor44008532014-12-04 16:41:36 -0800291 return null;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700292 }
293
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800294
alshabib339a3d92014-09-26 17:54:32 -0700295
296 @Override
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800297 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700298
299 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700300
301 if (!replicaInfo.master().isPresent()) {
Yuta HIGUCHI6b98ab62014-12-03 16:08:08 -0800302 log.warn("Failed to getFlowEntries: No master for {}", deviceId);
Yuta HIGUCHI2c1d8472014-10-31 14:13:38 -0700303 return Collections.emptyList();
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700304 }
305
Madan Jampanif5fdef02014-10-23 21:58:10 -0700306 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800307 return flowTable.getFlowEntries(deviceId);
Madan Jampanif5fdef02014-10-23 21:58:10 -0700308 }
309
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800310 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800311 replicaInfo.master().orNull(), deviceId);
Madan Jampanif5fdef02014-10-23 21:58:10 -0700312
313 ClusterMessage message = new ClusterMessage(
314 clusterService.getLocalNode().id(),
315 GET_DEVICE_FLOW_ENTRIES,
316 SERIALIZER.encode(deviceId));
317
318 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700319 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
320 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
321 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Brian O'Connor44008532014-12-04 16:41:36 -0800322 log.warn("Unable to fetch flow store contents from {}", replicaInfo.master().get());
Madan Jampanif5fdef02014-10-23 21:58:10 -0700323 }
Yuta HIGUCHI24f79eb2014-12-12 15:46:43 -0800324 return Collections.emptyList();
Madan Jampanif5fdef02014-10-23 21:58:10 -0700325 }
326
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800327
alshabib339a3d92014-09-26 17:54:32 -0700328
329 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700330 public void storeFlowRule(FlowRule rule) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800331 storeBatch(new FlowRuleBatchOperation(
332 Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
333 rule.deviceId(), idGenerator.getNewId()));
Madan Jampani117aaae2014-10-23 10:04:05 -0700334 }
335
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700336 @Override
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800337 public void storeBatch(FlowRuleBatchOperation operation) {
338
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700339
Madan Jampani117aaae2014-10-23 10:04:05 -0700340 if (operation.getOperations().isEmpty()) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800341
342 notifyDelegate(FlowRuleBatchEvent.completed(
343 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
344 new CompletedBatchOperation(true, Collections.emptySet(),
345 operation.deviceId())));
346 return;
alshabib339a3d92014-09-26 17:54:32 -0700347 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700348
alshabib371abe82015-02-13 10:44:17 -0800349 DeviceId deviceId = operation.deviceId();
Madan Jampani117aaae2014-10-23 10:04:05 -0700350
351 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
352
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700353 if (!replicaInfo.master().isPresent()) {
alshabib371abe82015-02-13 10:44:17 -0800354 log.warn("No master for {} : flows will be marked for removal", deviceId);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800355
alshabib371abe82015-02-13 10:44:17 -0800356 updateStoreInternal(operation);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800357
358 notifyDelegate(FlowRuleBatchEvent.completed(
359 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
alshabib371abe82015-02-13 10:44:17 -0800360 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800361 return;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700362 }
363
364 final NodeId local = clusterService.getLocalNode().id();
365 if (replicaInfo.master().get().equals(local)) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800366 storeBatchInternal(operation);
367 return;
Madan Jampani117aaae2014-10-23 10:04:05 -0700368 }
369
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800370 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800371 replicaInfo.master().orNull(), deviceId);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700372
Madan Jampani38b250d2014-10-17 11:02:38 -0700373 ClusterMessage message = new ClusterMessage(
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700374 local,
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700375 APPLY_BATCH_FLOWS,
Madan Jampani117aaae2014-10-23 10:04:05 -0700376 SERIALIZER.encode(operation));
Madan Jampani38b250d2014-10-17 11:02:38 -0700377
alshabib371abe82015-02-13 10:44:17 -0800378
Madan Jampani38b250d2014-10-17 11:02:38 -0700379 try {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800380
alshabib371abe82015-02-13 10:44:17 -0800381 clusterCommunicator.unicast(message, replicaInfo.master().get());
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800382
alshabib371abe82015-02-13 10:44:17 -0800383 } catch (IOException e) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800384 log.warn("Failed to storeBatch: {}", e.getMessage());
385
386 Set<FlowRule> allFailures = operation.getOperations().stream()
Ray Milkeyf7329c72015-02-17 11:37:01 -0800387 .map(op -> op.target())
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800388 .collect(Collectors.toSet());
389
390 notifyDelegate(FlowRuleBatchEvent.completed(
391 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
392 new CompletedBatchOperation(false, allFailures, deviceId)));
393 return;
Madan Jampani38b250d2014-10-17 11:02:38 -0700394 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800395
Madan Jampani38b250d2014-10-17 11:02:38 -0700396 }
397
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800398 private void storeBatchInternal(FlowRuleBatchOperation operation) {
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800399
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800400 final DeviceId did = operation.deviceId();
401 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
alshabib371abe82015-02-13 10:44:17 -0800402 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
403 if (currentOps.isEmpty()) {
404 batchOperationComplete(FlowRuleBatchEvent.completed(
405 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
406 new CompletedBatchOperation(true, Collections.emptySet(), did)));
407 return;
408 }
409 updateBackup(did, currentOps);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700410
alshabib371abe82015-02-13 10:44:17 -0800411 notifyDelegate(FlowRuleBatchEvent.requested(new
412 FlowRuleBatchRequest(operation.id(),
413 currentOps), operation.deviceId()));
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700414
alshabib371abe82015-02-13 10:44:17 -0800415 }
416
417 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
418 return operation.getOperations().stream().map(
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800419 op -> {
420 StoredFlowEntry entry;
Ray Milkeyf7329c72015-02-17 11:37:01 -0800421 switch (op.operator()) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800422 case ADD:
Ray Milkeyf7329c72015-02-17 11:37:01 -0800423 entry = new DefaultFlowEntry(op.target());
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800424 // always add requested FlowRule

425 // Note: 2 equal FlowEntry may have different treatment
426 flowTable.remove(entry.deviceId(), entry);
427 flowTable.add(entry);
428
429 return op;
430 case REMOVE:
431 entry = flowTable.getFlowEntry(op.target());
432 if (entry != null) {
433 entry.setState(FlowEntryState.PENDING_REMOVE);
434 return op;
435 }
436 break;
437 case MODIFY:
438 //TODO: figure this out at some point
439 break;
440 default:
Ray Milkeyf7329c72015-02-17 11:37:01 -0800441 log.warn("Unknown flow operation operator: {}", op.operator());
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800442 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800443 return null;
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800444 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800445 ).filter(op -> op != null).collect(Collectors.toSet());
alshabib339a3d92014-09-26 17:54:32 -0700446 }
447
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800448 private void updateBackup(DeviceId deviceId, final Set<FlowRuleBatchEntry> entries) {
449 Future<?> backup = backupExecutors.submit(new UpdateBackup(deviceId, entries));
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700450
451 if (syncBackup) {
452 // wait for backup to complete
453 try {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800454 backup.get();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700455 } catch (InterruptedException | ExecutionException e) {
456 log.error("Failed to create backups", e);
457 }
458 }
459 }
460
alshabib339a3d92014-09-26 17:54:32 -0700461 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700462 public void deleteFlowRule(FlowRule rule) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800463 storeBatch(
464 new FlowRuleBatchOperation(
465 Arrays.asList(
466 new FlowRuleBatchEntry(
467 FlowRuleOperation.REMOVE,
468 rule)), rule.deviceId(), idGenerator.getNewId()));
alshabib339a3d92014-09-26 17:54:32 -0700469 }
470
471 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700472 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
473 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700474 final NodeId localId = clusterService.getLocalNode().id();
475 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700476 return addOrUpdateFlowRuleInternal(rule);
477 }
478
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800479 log.warn("Tried to update FlowRule {} state,"
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800480 + " while the Node was not the master.", rule);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700481 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700482 }
483
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800484 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700485 final DeviceId did = rule.deviceId();
alshabib339a3d92014-09-26 17:54:32 -0700486
alshabib339a3d92014-09-26 17:54:32 -0700487
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800488 // check if this new rule is an update to an existing entry
489 StoredFlowEntry stored = flowTable.getFlowEntry(rule);
490 if (stored != null) {
491 stored.setBytes(rule.bytes());
492 stored.setLife(rule.life());
493 stored.setPackets(rule.packets());
494 if (stored.state() == FlowEntryState.PENDING_ADD) {
495 stored.setState(FlowEntryState.ADDED);
496 FlowRuleBatchEntry entry =
497 new FlowRuleBatchEntry(FlowRuleOperation.ADD, stored);
498 updateBackup(did, Sets.newHashSet(entry));
499 return new FlowRuleEvent(Type.RULE_ADDED, rule);
500 }
501 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800502 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800503
504 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
505 // TODO: also update backup if the behavior is correct.
506 flowTable.add(rule);
507
508
alshabib1c319ff2014-10-04 20:29:09 -0700509 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700510
alshabib339a3d92014-09-26 17:54:32 -0700511 }
512
513 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700514 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
Yuta HIGUCHIb6eb9142014-11-26 16:18:13 -0800515 final DeviceId deviceId = rule.deviceId();
516 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700517
518 final NodeId localId = clusterService.getLocalNode().id();
519 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700520 // bypass and handle it locally
521 return removeFlowRuleInternal(rule);
522 }
523
Yuta HIGUCHIb6eb9142014-11-26 16:18:13 -0800524 if (!replicaInfo.master().isPresent()) {
Yuta HIGUCHI6b98ab62014-12-03 16:08:08 -0800525 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
Yuta HIGUCHIb6eb9142014-11-26 16:18:13 -0800526 // TODO: revisit if this should be null (="no-op") or Exception
527 return null;
528 }
529
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800530 log.trace("Forwarding removeFlowRule to {}, which is the primary (master) for device {}",
Yuta HIGUCHIb6eb9142014-11-26 16:18:13 -0800531 replicaInfo.master().orNull(), deviceId);
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800532
533 ClusterMessage message = new ClusterMessage(
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800534 clusterService.getLocalNode().id(),
535 REMOVE_FLOW_ENTRY,
536 SERIALIZER.encode(rule));
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800537
538 try {
539 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
540 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
541 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Yuta HIGUCHI65934892014-12-04 17:47:44 -0800542 // TODO: Retry against latest master or throw a FlowStoreException
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800543 throw new RuntimeException(e);
544 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700545 }
546
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800547 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700548 final DeviceId deviceId = rule.deviceId();
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800549 // This is where one could mark a rule as removed and still keep it in the store.
550 final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
551 FlowRuleBatchEntry entry =
552 new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule);
553 updateBackup(deviceId, Sets.newHashSet(entry));
554 if (removed) {
555 return new FlowRuleEvent(RULE_REMOVED, rule);
556 } else {
557 return null;
alshabib339a3d92014-09-26 17:54:32 -0700558 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800559
alshabib339a3d92014-09-26 17:54:32 -0700560 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700561
562 @Override
563 public void batchOperationComplete(FlowRuleBatchEvent event) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800564 //FIXME: need a per device pending response
565
566 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
567 if (nodeId == null) {
568 notifyDelegate(event);
569 } else {
570 try {
571 ClusterMessage message = new ClusterMessage(
572 clusterService.getLocalNode().id(),
573 REMOTE_APPLY_COMPLETED,
574 SERIALIZER.encode(event));
alshabib371abe82015-02-13 10:44:17 -0800575 clusterCommunicator.unicast(message, nodeId);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800576 } catch (IOException e) {
577 log.warn("Failed to respond to peer for batch operation result");
578 }
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700579 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700580 }
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700581
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800582 private void loadFromBackup(final DeviceId did) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700583
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800584
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700585 try {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800586 log.debug("Loading FlowRules for {} from backups", did);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700587 SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(did);
588 for (Entry<FlowId, ImmutableList<StoredFlowEntry>> e
589 : backupFlowTable.entrySet()) {
590
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800591 log.trace("loading {}", e.getValue());
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700592 for (StoredFlowEntry entry : e.getValue()) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800593 flowTable.getFlowEntriesById(entry).remove(entry);
594 flowTable.getFlowEntriesById(entry).add(entry);
595
596
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700597 }
598 }
599 } catch (ExecutionException e) {
600 log.error("Failed to load backup flowtable for {}", did, e);
601 }
602 }
603
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800604 private void removeFromPrimary(final DeviceId did) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800605 flowTable.clearDevice(did);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700606 }
607
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800608
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700609 private final class OnStoreBatch implements ClusterMessageHandler {
610 private final NodeId local;
611
612 private OnStoreBatch(NodeId local) {
613 this.local = local;
614 }
615
616 @Override
617 public void handle(final ClusterMessage message) {
618 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800619 log.debug("received batch request {}", operation);
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700620
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800621 final DeviceId deviceId = operation.deviceId();
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700622 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
623 if (!local.equals(replicaInfo.master().orNull())) {
624
625 Set<FlowRule> failures = new HashSet<>(operation.size());
626 for (FlowRuleBatchEntry op : operation.getOperations()) {
Sho SHIMIZUaba9d002015-01-29 14:51:04 -0800627 failures.add(op.target());
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700628 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800629 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700630 // This node is no longer the master, respond as all failed.
631 // TODO: we might want to wrap response in envelope
632 // to distinguish sw programming failure and hand over
633 // it make sense in the latter case to retry immediately.
634 try {
635 message.respond(SERIALIZER.encode(allFailed));
636 } catch (IOException e) {
637 log.error("Failed to respond back", e);
638 }
639 return;
640 }
641
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700642
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800643 pendingResponses.put(operation.id(), message.sender());
644 storeBatchInternal(operation);
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700645
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700646 }
647 }
648
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700649 private final class SMapLoader
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800650 extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700651
652 @Override
653 public SMap<FlowId, ImmutableList<StoredFlowEntry>> load(DeviceId id)
654 throws Exception {
655 IMap<byte[], byte[]> map = theInstance.getMap("flowtable_" + id.toString());
656 return new SMap<FlowId, ImmutableList<StoredFlowEntry>>(map, SERIALIZER);
657 }
658 }
659
660 private final class InternalReplicaInfoEventListener
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800661 implements ReplicaInfoEventListener {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700662
663 @Override
664 public void event(ReplicaInfoEvent event) {
665 final NodeId local = clusterService.getLocalNode().id();
666 final DeviceId did = event.subject();
667 final ReplicaInfo rInfo = event.replicaInfo();
668
669 switch (event.type()) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800670 case MASTER_CHANGED:
671 if (local.equals(rInfo.master().orNull())) {
672 // This node is the new master, populate local structure
673 // from backup
674 loadFromBackup(did);
alshabib93cb57f2015-02-12 17:43:26 -0800675 }
676 //else {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800677 // This node is no longer the master holder,
678 // clean local structure
alshabib93cb57f2015-02-12 17:43:26 -0800679 //removeFromPrimary(did);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800680 // TODO: probably should stop pending backup activities in
681 // executors to avoid overwriting with old value
alshabib93cb57f2015-02-12 17:43:26 -0800682 //}
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800683 break;
684 default:
685 break;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700686
687 }
688 }
689 }
690
691 // Task to update FlowEntries in backup HZ store
692 private final class UpdateBackup implements Runnable {
693
694 private final DeviceId deviceId;
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800695 private final Set<FlowRuleBatchEntry> ops;
696
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700697
698 public UpdateBackup(DeviceId deviceId,
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800699 Set<FlowRuleBatchEntry> ops) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700700 this.deviceId = checkNotNull(deviceId);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800701 this.ops = checkNotNull(ops);
702
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700703 }
704
705 @Override
706 public void run() {
707 try {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800708 log.trace("update backup {} {}", deviceId, ops
709 );
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700710 final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700711
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700712
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800713 ops.stream().forEach(
714 op -> {
Ray Milkeyf7329c72015-02-17 11:37:01 -0800715 final FlowRule entry = op.target();
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800716 final FlowId id = entry.id();
717 ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
718 List<StoredFlowEntry> list = new ArrayList<>();
719 if (original != null) {
720 list.addAll(original);
721 }
Ray Milkeyf7329c72015-02-17 11:37:01 -0800722 list.remove(op.target());
723 if (op.operator() == FlowRuleOperation.ADD) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800724 list.add((StoredFlowEntry) entry);
725 }
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700726
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800727 ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
728 boolean success;
729 if (original == null) {
730 success = (backupFlowTable.putIfAbsent(id, newValue) == null);
731 } else {
732 success = backupFlowTable.replace(id, original, newValue);
733 }
734 if (!success) {
735 log.error("Updating backup failed.");
736 }
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700737
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800738 }
739 );
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700740 } catch (ExecutionException e) {
741 log.error("Failed to write to backups", e);
742 }
743
744 }
745 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800746
747 private class InternalFlowTable {
748
749 /*
750 TODO: This needs to be cleaned up. Perhaps using the eventually consistent
751 map when it supports distributed to a sequence of instances.
752 */
753
754
755 private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
756 flowEntries = new ConcurrentHashMap<>();
757
758
759 private NewConcurrentHashMap<FlowId, Set<StoredFlowEntry>> lazyEmptyFlowTable() {
760 return NewConcurrentHashMap.<FlowId, Set<StoredFlowEntry>>ifNeeded();
761 }
762
763 /**
764 * Returns the flow table for specified device.
765 *
766 * @param deviceId identifier of the device
767 * @return Map representing Flow Table of given device.
768 */
769 private ConcurrentMap<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
770 return createIfAbsentUnchecked(flowEntries,
771 deviceId, lazyEmptyFlowTable());
772 }
773
774 private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
775 final ConcurrentMap<FlowId, Set<StoredFlowEntry>> flowTable = getFlowTable(deviceId);
776 Set<StoredFlowEntry> r = flowTable.get(flowId);
777 if (r == null) {
778 final Set<StoredFlowEntry> concurrentlyAdded;
779 r = new CopyOnWriteArraySet<>();
780 concurrentlyAdded = flowTable.putIfAbsent(flowId, r);
781 if (concurrentlyAdded != null) {
782 return concurrentlyAdded;
783 }
784 }
785 return r;
786 }
787
788 private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
789 for (StoredFlowEntry f : getFlowEntriesInternal(rule.deviceId(), rule.id())) {
790 if (f.equals(rule)) {
791 return f;
792 }
793 }
794 return null;
795 }
796
797 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
798 return getFlowTable(deviceId).values().stream()
799 .flatMap((list -> list.stream())).collect(Collectors.toSet());
800
801 }
802
803
804 public StoredFlowEntry getFlowEntry(FlowRule rule) {
805 return getFlowEntryInternal(rule);
806 }
807
808 public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
809 return getFlowEntriesInternal(deviceId);
810 }
811
812 public Set<StoredFlowEntry> getFlowEntriesById(FlowEntry entry) {
813 return getFlowEntriesInternal(entry.deviceId(), entry.id());
814 }
815
816 public void add(FlowEntry rule) {
817 ((CopyOnWriteArraySet)
818 getFlowEntriesInternal(rule.deviceId(), rule.id())).add(rule);
819 }
820
821 public boolean remove(DeviceId deviceId, FlowEntry rule) {
822 return ((CopyOnWriteArraySet)
823 getFlowEntriesInternal(deviceId, rule.id())).remove(rule);
824 //return flowEntries.remove(deviceId, rule);
825 }
826
827 public void clearDevice(DeviceId did) {
828 flowEntries.remove(did);
829 }
830 }
831
832
alshabib339a3d92014-09-26 17:54:32 -0700833}