blob: 8ae7d2a9939d2ec1a92271b80b61aabd43c896af [file] [log] [blame]
Madan Jampanib0a3dd62015-03-17 13:41:27 -07001 /*
Ray Milkey34c95902015-04-15 09:47:53 -07002 * Copyright 2014-2015 Open Networking Laboratory
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.flow.impl;
alshabib339a3d92014-09-26 17:54:32 -070017
Madan Jampanib0a3dd62015-03-17 13:41:27 -070018import com.google.common.cache.CacheBuilder;
19import com.google.common.cache.CacheLoader;
20import com.google.common.cache.LoadingCache;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080021import com.google.common.collect.ImmutableList;
22import com.google.common.collect.Iterables;
23import com.google.common.collect.Maps;
24import com.google.common.collect.Sets;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070025import com.google.common.util.concurrent.Futures;
Madan Jampanib0a3dd62015-03-17 13:41:27 -070026import com.hazelcast.core.IMap;
27
alshabib339a3d92014-09-26 17:54:32 -070028import org.apache.felix.scr.annotations.Activate;
29import org.apache.felix.scr.annotations.Component;
30import org.apache.felix.scr.annotations.Deactivate;
Brian O'Connorc4f351d2015-03-10 20:53:11 -070031import org.apache.felix.scr.annotations.Modified;
32import org.apache.felix.scr.annotations.Property;
Madan Jampani38b250d2014-10-17 11:02:38 -070033import org.apache.felix.scr.annotations.Reference;
34import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib339a3d92014-09-26 17:54:32 -070035import org.apache.felix.scr.annotations.Service;
Madan Jampanib0a3dd62015-03-17 13:41:27 -070036import org.onlab.util.BoundedThreadPool;
Jonathan Hart4fb5cde2014-12-22 12:09:07 -080037import org.onlab.util.KryoNamespace;
Madan Jampanib0a3dd62015-03-17 13:41:27 -070038import org.onlab.util.NewConcurrentHashMap;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070039import org.onlab.util.Tools;
Brian O'Connorc4f351d2015-03-10 20:53:11 -070040import org.onosproject.cfg.ComponentConfigService;
Brian O'Connorabafb502014-12-02 22:26:20 -080041import org.onosproject.cluster.ClusterService;
42import org.onosproject.cluster.NodeId;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080043import org.onosproject.core.CoreService;
44import org.onosproject.core.IdGenerator;
Brian O'Connorabafb502014-12-02 22:26:20 -080045import org.onosproject.net.Device;
46import org.onosproject.net.DeviceId;
47import org.onosproject.net.device.DeviceService;
48import org.onosproject.net.flow.CompletedBatchOperation;
49import org.onosproject.net.flow.DefaultFlowEntry;
50import org.onosproject.net.flow.FlowEntry;
51import org.onosproject.net.flow.FlowEntry.FlowEntryState;
52import org.onosproject.net.flow.FlowId;
53import org.onosproject.net.flow.FlowRule;
54import org.onosproject.net.flow.FlowRuleBatchEntry;
Jonathan Hart4fb5cde2014-12-22 12:09:07 -080055import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
Brian O'Connorabafb502014-12-02 22:26:20 -080056import org.onosproject.net.flow.FlowRuleBatchEvent;
57import org.onosproject.net.flow.FlowRuleBatchOperation;
58import org.onosproject.net.flow.FlowRuleBatchRequest;
59import org.onosproject.net.flow.FlowRuleEvent;
Brian O'Connorabafb502014-12-02 22:26:20 -080060import org.onosproject.net.flow.FlowRuleEvent.Type;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080061import org.onosproject.net.flow.FlowRuleService;
Brian O'Connorabafb502014-12-02 22:26:20 -080062import org.onosproject.net.flow.FlowRuleStore;
63import org.onosproject.net.flow.FlowRuleStoreDelegate;
64import org.onosproject.net.flow.StoredFlowEntry;
65import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
66import org.onosproject.store.cluster.messaging.ClusterMessage;
67import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
68import org.onosproject.store.flow.ReplicaInfo;
69import org.onosproject.store.flow.ReplicaInfoEvent;
70import org.onosproject.store.flow.ReplicaInfoEventListener;
71import org.onosproject.store.flow.ReplicaInfoService;
Madan Jampanib0a3dd62015-03-17 13:41:27 -070072import org.onosproject.store.hz.AbstractHazelcastStore;
73import org.onosproject.store.hz.SMap;
Brian O'Connorabafb502014-12-02 22:26:20 -080074import org.onosproject.store.serializers.KryoSerializer;
75import org.onosproject.store.serializers.StoreSerializer;
76import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
Brian O'Connorc4f351d2015-03-10 20:53:11 -070077import org.osgi.service.component.ComponentContext;
alshabib339a3d92014-09-26 17:54:32 -070078import org.slf4j.Logger;
79
Brian O'Connor72cb19a2015-01-16 16:14:41 -080080import java.io.IOException;
Madan Jampanib0a3dd62015-03-17 13:41:27 -070081import java.util.ArrayList;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080082import java.util.Arrays;
83import java.util.Collections;
Brian O'Connorc4f351d2015-03-10 20:53:11 -070084import java.util.Dictionary;
Madan Jampanib0a3dd62015-03-17 13:41:27 -070085import java.util.HashSet;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080086import java.util.List;
87import java.util.Map;
Madan Jampanib0a3dd62015-03-17 13:41:27 -070088import java.util.Map.Entry;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080089import java.util.Set;
90import java.util.concurrent.ConcurrentHashMap;
91import java.util.concurrent.ConcurrentMap;
92import java.util.concurrent.CopyOnWriteArraySet;
93import java.util.concurrent.ExecutionException;
94import java.util.concurrent.ExecutorService;
95import java.util.concurrent.Executors;
96import java.util.concurrent.Future;
97import java.util.concurrent.TimeUnit;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080098import java.util.stream.Collectors;
99
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700100import static com.google.common.base.Preconditions.checkNotNull;
101import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700102import static com.google.common.base.Strings.isNullOrEmpty;
Thomas Vachuska6519e6f2015-03-11 02:29:31 -0700103import static org.onlab.util.Tools.get;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -0800104import static org.onlab.util.Tools.groupedThreads;
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800105import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
106import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
107import static org.slf4j.LoggerFactory.getLogger;
alshabib339a3d92014-09-26 17:54:32 -0700108
109/**
Madan Jampani38b250d2014-10-17 11:02:38 -0700110 * Manages inventory of flow rules using a distributed state management protocol.
alshabib339a3d92014-09-26 17:54:32 -0700111 */
alshabib339a3d92014-09-26 17:54:32 -0700112@Component(immediate = true)
113@Service
114public class DistributedFlowRuleStore
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700115 extends AbstractHazelcastStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
alshabib1c319ff2014-10-04 20:29:09 -0700116 implements FlowRuleStore {
alshabib339a3d92014-09-26 17:54:32 -0700117
118 private final Logger log = getLogger(getClass());
119
Madan Jampani2af244a2015-02-22 13:12:01 -0800120 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700121 private static final boolean DEFAULT_BACKUP_ENABLED = true;
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700122 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
123
124 @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
125 label = "Number of threads in the message handler pool")
126 private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
127
128 @Property(name = "backupEnabled", boolValue = DEFAULT_BACKUP_ENABLED,
129 label = "Indicates whether backups are enabled or not")
130 private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
Madan Jampani2af244a2015-02-22 13:12:01 -0800131
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700132 private InternalFlowTable flowTable = new InternalFlowTable();
133
134 /*private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
135 flowEntries = new ConcurrentHashMap<>();*/
alshabib339a3d92014-09-26 17:54:32 -0700136
Madan Jampani38b250d2014-10-17 11:02:38 -0700137 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700138 protected ReplicaInfoService replicaInfoManager;
Madan Jampani38b250d2014-10-17 11:02:38 -0700139
140 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700141 protected ClusterCommunicationService clusterCommunicator;
Madan Jampani38b250d2014-10-17 11:02:38 -0700142
143 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700144 protected ClusterService clusterService;
145
146 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
147 protected DeviceService deviceService;
148
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800149 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
150 protected CoreService coreService;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700151
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700152 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700153 protected ComponentConfigService configService;
154
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800155 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700156
// Cache of SMaps used for backup data; each SMap contains one device's flow table.
158 private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
159
Madan Jampani2af244a2015-02-22 13:12:01 -0800160 private ExecutorService messageHandlingExecutor;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700161
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700162 private final ExecutorService backupExecutors =
163 BoundedThreadPool.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
164 //Executors.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
165
166 private boolean syncBackup = false;
167
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700168 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
Madan Jampani38b250d2014-10-17 11:02:38 -0700169 @Override
170 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700171 serializerPool = KryoNamespace.newBuilder()
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800172 .register(DistributedStoreSerializers.STORE_COMMON)
173 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
Yuta HIGUCHI2f158332014-11-25 13:32:06 -0800174 .register(FlowRuleEvent.class)
Jonathan Hart4fb5cde2014-12-22 12:09:07 -0800175 .register(FlowRuleEvent.Type.class)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800176 .build();
Madan Jampani38b250d2014-10-17 11:02:38 -0700177 }
178 };
179
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700180 private ReplicaInfoEventListener replicaInfoEventListener;
181
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800182 private IdGenerator idGenerator;
183
Madan Jampanicd48bfd2015-03-25 18:06:22 -0700184 private NodeId local;
185
alshabib339a3d92014-09-26 17:54:32 -0700186 @Activate
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700187 public void activate(ComponentContext context) {
188 configService.registerProperties(getClass());
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700189 super.serializer = SERIALIZER;
190 super.theInstance = storeService.getHazelcastInstance();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700191
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800192 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
193
Madan Jampanicd48bfd2015-03-25 18:06:22 -0700194 local = clusterService.getLocalNode().id();
195
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700196 // Cache to create SMap on demand
197 smaps = CacheBuilder.newBuilder()
198 .softValues()
199 .build(new SMapLoader());
200
Madan Jampani2af244a2015-02-22 13:12:01 -0800201 messageHandlingExecutor = Executors.newFixedThreadPool(
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700202 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
Madan Jampani2af244a2015-02-22 13:12:01 -0800203
Madan Jampanicd48bfd2015-03-25 18:06:22 -0700204 registerMessageHandlers(messageHandlingExecutor);
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800205
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700206 replicaInfoEventListener = new InternalReplicaInfoEventListener();
207
208 replicaInfoManager.addListener(replicaInfoEventListener);
209
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700210 logConfig("Started");
alshabib339a3d92014-09-26 17:54:32 -0700211 }
212
213 @Deactivate
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700214 public void deactivate(ComponentContext context) {
215 configService.unregisterProperties(getClass(), false);
Madan Jampanicd48bfd2015-03-25 18:06:22 -0700216 unregisterMessageHandlers();
217 messageHandlingExecutor.shutdownNow();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700218 replicaInfoManager.removeListener(replicaInfoEventListener);
alshabib339a3d92014-09-26 17:54:32 -0700219 log.info("Stopped");
220 }
221
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700222 @Modified
223 public void modified(ComponentContext context) {
224 if (context == null) {
225 backupEnabled = DEFAULT_BACKUP_ENABLED;
226 logConfig("Default config");
227 return;
228 }
229
230 Dictionary properties = context.getProperties();
231 int newPoolSize;
232 boolean newBackupEnabled;
233 try {
Thomas Vachuska6519e6f2015-03-11 02:29:31 -0700234 String s = get(properties, "msgHandlerPoolSize");
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700235 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
236
Thomas Vachuska6519e6f2015-03-11 02:29:31 -0700237 s = get(properties, "backupEnabled");
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700238 newBackupEnabled = isNullOrEmpty(s) ? backupEnabled : Boolean.parseBoolean(s.trim());
239
240 } catch (NumberFormatException | ClassCastException e) {
241 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
242 newBackupEnabled = DEFAULT_BACKUP_ENABLED;
243 }
244
Madan Jampanicd48bfd2015-03-25 18:06:22 -0700245 if (newBackupEnabled != backupEnabled) {
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700246 backupEnabled = newBackupEnabled;
Madan Jampanicd48bfd2015-03-25 18:06:22 -0700247 }
248 if (newPoolSize != msgHandlerPoolSize) {
249 msgHandlerPoolSize = newPoolSize;
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700250 ExecutorService oldMsgHandler = messageHandlingExecutor;
251 messageHandlingExecutor = Executors.newFixedThreadPool(
252 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
Madan Jampanicd48bfd2015-03-25 18:06:22 -0700253
254 // replace previously registered handlers.
255 registerMessageHandlers(messageHandlingExecutor);
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700256 oldMsgHandler.shutdown();
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700257 }
Madan Jampanicd48bfd2015-03-25 18:06:22 -0700258 logConfig("Reconfigured");
259 }
260
261 private void registerMessageHandlers(ExecutorService executor) {
262
263 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local), executor);
264
265 clusterCommunicator.addSubscriber(REMOTE_APPLY_COMPLETED, new ClusterMessageHandler() {
266 @Override
267 public void handle(ClusterMessage message) {
268 FlowRuleBatchEvent event = SERIALIZER.decode(message.payload());
269 log.trace("received completed notification for {}", event);
270 notifyDelegate(event);
271 }
272 }, executor);
273
274 clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
275
276 @Override
277 public void handle(ClusterMessage message) {
278 FlowRule rule = SERIALIZER.decode(message.payload());
279 log.trace("received get flow entry request for {}", rule);
280 FlowEntry flowEntry = flowTable.getFlowEntry(rule); //getFlowEntryInternal(rule);
281 try {
282 message.respond(SERIALIZER.encode(flowEntry));
283 } catch (IOException e) {
284 log.error("Failed to respond back", e);
285 }
286 }
287 }, executor);
288
289 clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
290
291 @Override
292 public void handle(ClusterMessage message) {
293 DeviceId deviceId = SERIALIZER.decode(message.payload());
294 log.trace("Received get flow entries request for {} from {}", deviceId, message.sender());
295 Set<FlowEntry> flowEntries = flowTable.getFlowEntries(deviceId);
296 try {
297 message.respond(SERIALIZER.encode(flowEntries));
298 } catch (IOException e) {
299 log.error("Failed to respond to peer's getFlowEntries request", e);
300 }
301 }
302 }, executor);
303
304 clusterCommunicator.addSubscriber(REMOVE_FLOW_ENTRY, new ClusterMessageHandler() {
305
306 @Override
307 public void handle(ClusterMessage message) {
308 FlowEntry rule = SERIALIZER.decode(message.payload());
309 log.trace("received get flow entry request for {}", rule);
310 FlowRuleEvent event = removeFlowRuleInternal(rule);
311 try {
312 message.respond(SERIALIZER.encode(event));
313 } catch (IOException e) {
314 log.error("Failed to respond back", e);
315 }
316 }
317 }, executor);
318 }
319
320 private void unregisterMessageHandlers() {
321 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
322 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
323 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
324 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
325 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700326 }
327
328 private void logConfig(String prefix) {
329 log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}",
330 prefix, msgHandlerPoolSize, backupEnabled);
331 }
alshabib339a3d92014-09-26 17:54:32 -0700332
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700333
Brian O'Connor44008532014-12-04 16:41:36 -0800334 // This is not a efficient operation on a distributed sharded
Madan Jampani117aaae2014-10-23 10:04:05 -0700335 // flow store. We need to revisit the need for this operation or at least
336 // make it device specific.
alshabib339a3d92014-09-26 17:54:32 -0700337 @Override
tom9b4030d2014-10-06 10:39:03 -0700338 public int getFlowRuleCount() {
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700339 // implementing in-efficient operation for debugging purpose.
340 int sum = 0;
341 for (Device device : deviceService.getDevices()) {
342 final DeviceId did = device.id();
343 sum += Iterables.size(getFlowEntries(did));
344 }
345 return sum;
tom9b4030d2014-10-06 10:39:03 -0700346 }
347
348 @Override
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800349 public FlowEntry getFlowEntry(FlowRule rule) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700350 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700351
352 if (!replicaInfo.master().isPresent()) {
Yuta HIGUCHI6b98ab62014-12-03 16:08:08 -0800353 log.warn("Failed to getFlowEntry: No master for {}", rule.deviceId());
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800354 return null;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700355 }
356
Madan Jampani117aaae2014-10-23 10:04:05 -0700357 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800358 return flowTable.getFlowEntry(rule);
Madan Jampani117aaae2014-10-23 10:04:05 -0700359 }
360
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800361 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800362 replicaInfo.master().orNull(), rule.deviceId());
Madan Jampani117aaae2014-10-23 10:04:05 -0700363
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700364 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
365 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
366 SERIALIZER::encode,
367 SERIALIZER::decode,
368 replicaInfo.master().get()),
369 FLOW_RULE_STORE_TIMEOUT_MILLIS,
370 TimeUnit.MILLISECONDS,
371 null);
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700372 }
373
alshabib339a3d92014-09-26 17:54:32 -0700374 @Override
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800375 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700376
377 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700378
379 if (!replicaInfo.master().isPresent()) {
Yuta HIGUCHI6b98ab62014-12-03 16:08:08 -0800380 log.warn("Failed to getFlowEntries: No master for {}", deviceId);
Yuta HIGUCHI2c1d8472014-10-31 14:13:38 -0700381 return Collections.emptyList();
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700382 }
383
Madan Jampanif5fdef02014-10-23 21:58:10 -0700384 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700385 return flowTable.getFlowEntries(deviceId);
Madan Jampanif5fdef02014-10-23 21:58:10 -0700386 }
387
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800388 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800389 replicaInfo.master().orNull(), deviceId);
Madan Jampanif5fdef02014-10-23 21:58:10 -0700390
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700391 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
392 FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
393 SERIALIZER::encode,
394 SERIALIZER::decode,
395 replicaInfo.master().get()),
396 FLOW_RULE_STORE_TIMEOUT_MILLIS,
397 TimeUnit.MILLISECONDS,
398 Collections.emptyList());
Madan Jampanif5fdef02014-10-23 21:58:10 -0700399 }
400
alshabib339a3d92014-09-26 17:54:32 -0700401 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700402 public void storeFlowRule(FlowRule rule) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800403 storeBatch(new FlowRuleBatchOperation(
404 Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
405 rule.deviceId(), idGenerator.getNewId()));
Madan Jampani117aaae2014-10-23 10:04:05 -0700406 }
407
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700408 @Override
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800409 public void storeBatch(FlowRuleBatchOperation operation) {
410
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700411
Madan Jampani117aaae2014-10-23 10:04:05 -0700412 if (operation.getOperations().isEmpty()) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800413
414 notifyDelegate(FlowRuleBatchEvent.completed(
415 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
416 new CompletedBatchOperation(true, Collections.emptySet(),
417 operation.deviceId())));
418 return;
alshabib339a3d92014-09-26 17:54:32 -0700419 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700420
alshabib371abe82015-02-13 10:44:17 -0800421 DeviceId deviceId = operation.deviceId();
Madan Jampani117aaae2014-10-23 10:04:05 -0700422
423 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
424
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700425 if (!replicaInfo.master().isPresent()) {
alshabib371abe82015-02-13 10:44:17 -0800426 log.warn("No master for {} : flows will be marked for removal", deviceId);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800427
alshabib371abe82015-02-13 10:44:17 -0800428 updateStoreInternal(operation);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800429
430 notifyDelegate(FlowRuleBatchEvent.completed(
431 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
alshabib371abe82015-02-13 10:44:17 -0800432 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800433 return;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700434 }
435
436 final NodeId local = clusterService.getLocalNode().id();
437 if (replicaInfo.master().get().equals(local)) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800438 storeBatchInternal(operation);
439 return;
Madan Jampani117aaae2014-10-23 10:04:05 -0700440 }
441
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800442 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800443 replicaInfo.master().orNull(), deviceId);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700444
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700445 if (!clusterCommunicator.unicast(operation,
446 APPLY_BATCH_FLOWS, SERIALIZER::encode,
447 replicaInfo.master().get())) {
448 log.warn("Failed to storeBatch: {} to {}", operation, replicaInfo.master());
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800449
450 Set<FlowRule> allFailures = operation.getOperations().stream()
Ray Milkeyf7329c72015-02-17 11:37:01 -0800451 .map(op -> op.target())
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800452 .collect(Collectors.toSet());
453
454 notifyDelegate(FlowRuleBatchEvent.completed(
455 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
456 new CompletedBatchOperation(false, allFailures, deviceId)));
457 return;
Madan Jampani38b250d2014-10-17 11:02:38 -0700458 }
459 }
460
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800461 private void storeBatchInternal(FlowRuleBatchOperation operation) {
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800462
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800463 final DeviceId did = operation.deviceId();
464 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
alshabib371abe82015-02-13 10:44:17 -0800465 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
466 if (currentOps.isEmpty()) {
467 batchOperationComplete(FlowRuleBatchEvent.completed(
468 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
469 new CompletedBatchOperation(true, Collections.emptySet(), did)));
470 return;
471 }
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700472 updateBackup(did, currentOps);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700473
alshabib371abe82015-02-13 10:44:17 -0800474 notifyDelegate(FlowRuleBatchEvent.requested(new
475 FlowRuleBatchRequest(operation.id(),
476 currentOps), operation.deviceId()));
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700477
alshabib371abe82015-02-13 10:44:17 -0800478 }
479
480 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
481 return operation.getOperations().stream().map(
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800482 op -> {
483 StoredFlowEntry entry;
Ray Milkeyf7329c72015-02-17 11:37:01 -0800484 switch (op.operator()) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800485 case ADD:
Ray Milkeyf7329c72015-02-17 11:37:01 -0800486 entry = new DefaultFlowEntry(op.target());
Madan Jampani2af244a2015-02-22 13:12:01 -0800487 // always add requested FlowRule
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800488 // Note: 2 equal FlowEntry may have different treatment
489 flowTable.remove(entry.deviceId(), entry);
490 flowTable.add(entry);
491
492 return op;
493 case REMOVE:
494 entry = flowTable.getFlowEntry(op.target());
495 if (entry != null) {
496 entry.setState(FlowEntryState.PENDING_REMOVE);
497 return op;
498 }
499 break;
500 case MODIFY:
501 //TODO: figure this out at some point
502 break;
503 default:
Ray Milkeyf7329c72015-02-17 11:37:01 -0800504 log.warn("Unknown flow operation operator: {}", op.operator());
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800505 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800506 return null;
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800507 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800508 ).filter(op -> op != null).collect(Collectors.toSet());
alshabib339a3d92014-09-26 17:54:32 -0700509 }
510
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700511 private void updateBackup(DeviceId deviceId, final Set<FlowRuleBatchEntry> entries) {
512 if (!backupEnabled) {
513 return;
514 }
515
516 Future<?> backup = backupExecutors.submit(new UpdateBackup(deviceId, entries));
517
518 if (syncBackup) {
519 // wait for backup to complete
520 try {
521 backup.get();
522 } catch (InterruptedException | ExecutionException e) {
523 log.error("Failed to create backups", e);
524 }
525 }
526 }
527
alshabib339a3d92014-09-26 17:54:32 -0700528 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700529 public void deleteFlowRule(FlowRule rule) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800530 storeBatch(
531 new FlowRuleBatchOperation(
532 Arrays.asList(
533 new FlowRuleBatchEntry(
534 FlowRuleOperation.REMOVE,
535 rule)), rule.deviceId(), idGenerator.getNewId()));
alshabib339a3d92014-09-26 17:54:32 -0700536 }
537
538 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700539 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
540 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700541 final NodeId localId = clusterService.getLocalNode().id();
542 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700543 return addOrUpdateFlowRuleInternal(rule);
Madan Jampani38b250d2014-10-17 11:02:38 -0700544 }
545
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800546 log.warn("Tried to update FlowRule {} state,"
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800547 + " while the Node was not the master.", rule);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700548 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700549 }
550
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700551 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
552 final DeviceId did = rule.deviceId();
553
554
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800555 // check if this new rule is an update to an existing entry
556 StoredFlowEntry stored = flowTable.getFlowEntry(rule);
557 if (stored != null) {
558 stored.setBytes(rule.bytes());
559 stored.setLife(rule.life());
560 stored.setPackets(rule.packets());
561 if (stored.state() == FlowEntryState.PENDING_ADD) {
562 stored.setState(FlowEntryState.ADDED);
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700563 FlowRuleBatchEntry entry =
564 new FlowRuleBatchEntry(FlowRuleOperation.ADD, stored);
565 updateBackup(did, Sets.newHashSet(entry));
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800566 return new FlowRuleEvent(Type.RULE_ADDED, rule);
567 }
568 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800569 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800570
571 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700572 // TODO: also update backup if the behavior is correct.
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800573 flowTable.add(rule);
574
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700575
alshabib1c319ff2014-10-04 20:29:09 -0700576 return null;
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700577
alshabib339a3d92014-09-26 17:54:32 -0700578 }
579
580 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700581 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
Yuta HIGUCHIb6eb9142014-11-26 16:18:13 -0800582 final DeviceId deviceId = rule.deviceId();
583 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700584
585 final NodeId localId = clusterService.getLocalNode().id();
586 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700587 // bypass and handle it locally
588 return removeFlowRuleInternal(rule);
589 }
590
Yuta HIGUCHIb6eb9142014-11-26 16:18:13 -0800591 if (!replicaInfo.master().isPresent()) {
Yuta HIGUCHI6b98ab62014-12-03 16:08:08 -0800592 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
Yuta HIGUCHIb6eb9142014-11-26 16:18:13 -0800593 // TODO: revisit if this should be null (="no-op") or Exception
594 return null;
595 }
596
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800597 log.trace("Forwarding removeFlowRule to {}, which is the primary (master) for device {}",
Yuta HIGUCHIb6eb9142014-11-26 16:18:13 -0800598 replicaInfo.master().orNull(), deviceId);
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800599
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700600 return Futures.get(clusterCommunicator.sendAndReceive(
601 rule,
602 REMOVE_FLOW_ENTRY,
603 SERIALIZER::encode,
604 SERIALIZER::decode,
605 replicaInfo.master().get()),
606 FLOW_RULE_STORE_TIMEOUT_MILLIS,
607 TimeUnit.MILLISECONDS,
608 RuntimeException.class);
Madan Jampani38b250d2014-10-17 11:02:38 -0700609 }
610
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800611 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700612 final DeviceId deviceId = rule.deviceId();
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800613 // This is where one could mark a rule as removed and still keep it in the store.
614 final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700615 FlowRuleBatchEntry entry =
616 new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule);
617 updateBackup(deviceId, Sets.newHashSet(entry));
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800618 if (removed) {
619 return new FlowRuleEvent(RULE_REMOVED, rule);
620 } else {
621 return null;
alshabib339a3d92014-09-26 17:54:32 -0700622 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800623
alshabib339a3d92014-09-26 17:54:32 -0700624 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700625
626 @Override
627 public void batchOperationComplete(FlowRuleBatchEvent event) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800628 //FIXME: need a per device pending response
629
630 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
631 if (nodeId == null) {
632 notifyDelegate(event);
633 } else {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800634 // TODO check unicast return value
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700635 clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, SERIALIZER::encode, nodeId);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800636 //error log: log.warn("Failed to respond to peer for batch operation result");
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700637 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700638 }
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700639
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700640 private void loadFromBackup(final DeviceId did) {
641 if (!backupEnabled) {
642 return;
643 }
644 log.info("We are now the master for {}. Will load flow rules from backup", did);
645 try {
646 log.debug("Loading FlowRules for {} from backups", did);
647 SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(did);
648 for (Entry<FlowId, ImmutableList<StoredFlowEntry>> e
649 : backupFlowTable.entrySet()) {
650
651 log.trace("loading {}", e.getValue());
652 for (StoredFlowEntry entry : e.getValue()) {
653 flowTable.getFlowEntriesById(entry).remove(entry);
654 flowTable.getFlowEntriesById(entry).add(entry);
655
656
657 }
658 }
659 } catch (ExecutionException e) {
660 log.error("Failed to load backup flowtable for {}", did, e);
661 }
662 }
663
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800664 private void removeFromPrimary(final DeviceId did) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800665 flowTable.clearDevice(did);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700666 }
667
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700668
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700669 private final class OnStoreBatch implements ClusterMessageHandler {
670 private final NodeId local;
671
672 private OnStoreBatch(NodeId local) {
673 this.local = local;
674 }
675
676 @Override
677 public void handle(final ClusterMessage message) {
678 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800679 log.debug("received batch request {}", operation);
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700680
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800681 final DeviceId deviceId = operation.deviceId();
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700682 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
683 if (!local.equals(replicaInfo.master().orNull())) {
684
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700685 Set<FlowRule> failures = new HashSet<>(operation.size());
686 for (FlowRuleBatchEntry op : operation.getOperations()) {
687 failures.add(op.target());
688 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800689 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700690 // This node is no longer the master, respond as all failed.
691 // TODO: we might want to wrap response in envelope
692 // to distinguish sw programming failure and hand over
693 // it make sense in the latter case to retry immediately.
694 try {
695 message.respond(SERIALIZER.encode(allFailed));
696 } catch (IOException e) {
697 log.error("Failed to respond back", e);
698 }
699 return;
700 }
701
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700702
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800703 pendingResponses.put(operation.id(), message.sender());
704 storeBatchInternal(operation);
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700705
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700706 }
707 }
708
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700709 private final class SMapLoader
710 extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
711
712 @Override
713 public SMap<FlowId, ImmutableList<StoredFlowEntry>> load(DeviceId id)
714 throws Exception {
715 IMap<byte[], byte[]> map = theInstance.getMap("flowtable_" + id.toString());
716 return new SMap<FlowId, ImmutableList<StoredFlowEntry>>(map, SERIALIZER);
717 }
718 }
719
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700720 private final class InternalReplicaInfoEventListener
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800721 implements ReplicaInfoEventListener {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700722
723 @Override
724 public void event(ReplicaInfoEvent event) {
725 final NodeId local = clusterService.getLocalNode().id();
726 final DeviceId did = event.subject();
727 final ReplicaInfo rInfo = event.replicaInfo();
728
729 switch (event.type()) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800730 case MASTER_CHANGED:
731 if (local.equals(rInfo.master().orNull())) {
732 // This node is the new master, populate local structure
733 // from backup
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700734 loadFromBackup(did);
alshabib93cb57f2015-02-12 17:43:26 -0800735 }
736 //else {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800737 // This node is no longer the master holder,
738 // clean local structure
alshabib93cb57f2015-02-12 17:43:26 -0800739 //removeFromPrimary(did);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800740 // TODO: probably should stop pending backup activities in
741 // executors to avoid overwriting with old value
alshabib93cb57f2015-02-12 17:43:26 -0800742 //}
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800743 break;
744 default:
745 break;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700746
747 }
748 }
749 }
750
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700751 // Task to update FlowEntries in backup HZ store
752 private final class UpdateBackup implements Runnable {
753
754 private final DeviceId deviceId;
755 private final Set<FlowRuleBatchEntry> ops;
756
757
758 public UpdateBackup(DeviceId deviceId,
759 Set<FlowRuleBatchEntry> ops) {
760 this.deviceId = checkNotNull(deviceId);
761 this.ops = checkNotNull(ops);
762
763 }
764
765 @Override
766 public void run() {
767 try {
768 log.trace("update backup {} {}", deviceId, ops
769 );
770 final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
771
772
773 ops.stream().forEach(
774 op -> {
775 final FlowRule entry = op.target();
776 final FlowId id = entry.id();
777 ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
778 List<StoredFlowEntry> list = new ArrayList<>();
779 if (original != null) {
780 list.addAll(original);
781 }
782 list.remove(op.target());
783 if (op.operator() == FlowRuleOperation.ADD) {
784 list.add((StoredFlowEntry) entry);
785 }
786
787 ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
788 boolean success;
789 if (original == null) {
790 success = (backupFlowTable.putIfAbsent(id, newValue) == null);
791 } else {
792 success = backupFlowTable.replace(id, original, newValue);
793 }
794 if (!success) {
795 log.error("Updating backup failed.");
796 }
797
798 }
799 );
800 } catch (ExecutionException e) {
801 log.error("Failed to write to backups", e);
802 }
803
804 }
805 }
806
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800807 private class InternalFlowTable {
808
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700809 /*
810 TODO: This needs to be cleaned up. Perhaps using the eventually consistent
811 map when it supports distributed to a sequence of instances.
812 */
813
814
815 private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
816 flowEntries = new ConcurrentHashMap<>();
817
818
819 private NewConcurrentHashMap<FlowId, Set<StoredFlowEntry>> lazyEmptyFlowTable() {
820 return NewConcurrentHashMap.<FlowId, Set<StoredFlowEntry>>ifNeeded();
821 }
Madan Jampanie1356282015-03-10 19:05:36 -0700822
823 /**
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700824 * Returns the flow table for specified device.
825 *
826 * @param deviceId identifier of the device
827 * @return Map representing Flow Table of given device.
Madan Jampanie1356282015-03-10 19:05:36 -0700828 */
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700829 private ConcurrentMap<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
830 return createIfAbsentUnchecked(flowEntries,
831 deviceId, lazyEmptyFlowTable());
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800832 }
833
834 private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700835 final ConcurrentMap<FlowId, Set<StoredFlowEntry>> flowTable = getFlowTable(deviceId);
836 Set<StoredFlowEntry> r = flowTable.get(flowId);
837 if (r == null) {
838 final Set<StoredFlowEntry> concurrentlyAdded;
839 r = new CopyOnWriteArraySet<>();
840 concurrentlyAdded = flowTable.putIfAbsent(flowId, r);
841 if (concurrentlyAdded != null) {
842 return concurrentlyAdded;
843 }
844 }
845 return r;
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800846 }
847
848 private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700849 for (StoredFlowEntry f : getFlowEntriesInternal(rule.deviceId(), rule.id())) {
850 if (f.equals(rule)) {
851 return f;
852 }
853 }
854 return null;
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800855 }
856
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700857 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
858 return getFlowTable(deviceId).values().stream()
859 .flatMap((list -> list.stream())).collect(Collectors.toSet());
860
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800861 }
862
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700863
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800864 public StoredFlowEntry getFlowEntry(FlowRule rule) {
865 return getFlowEntryInternal(rule);
866 }
867
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700868 public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800869 return getFlowEntriesInternal(deviceId);
870 }
871
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700872 public Set<StoredFlowEntry> getFlowEntriesById(FlowEntry entry) {
873 return getFlowEntriesInternal(entry.deviceId(), entry.id());
874 }
875
876 public void add(FlowEntry rule) {
877 ((CopyOnWriteArraySet)
878 getFlowEntriesInternal(rule.deviceId(), rule.id())).add(rule);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800879 }
880
881 public boolean remove(DeviceId deviceId, FlowEntry rule) {
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700882 return ((CopyOnWriteArraySet)
883 getFlowEntriesInternal(deviceId, rule.id())).remove(rule);
884 //return flowEntries.remove(deviceId, rule);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800885 }
886
887 public void clearDevice(DeviceId did) {
888 flowEntries.remove(did);
889 }
890 }
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700891
892
Madan Jampanie1356282015-03-10 19:05:36 -0700893}