blob: 8b7be9a9dc8a33bbb49572941f1d7c528e2a71b0 [file] [log] [blame]
Madan Jampanib0a3dd62015-03-17 13:41:27 -07001 /*
Ray Milkey34c95902015-04-15 09:47:53 -07002 * Copyright 2014-2015 Open Networking Laboratory
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.flow.impl;
alshabib339a3d92014-09-26 17:54:32 -070017
Madan Jampanib0a3dd62015-03-17 13:41:27 -070018import com.google.common.cache.CacheBuilder;
19import com.google.common.cache.CacheLoader;
20import com.google.common.cache.LoadingCache;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080021import com.google.common.collect.ImmutableList;
22import com.google.common.collect.Iterables;
23import com.google.common.collect.Maps;
24import com.google.common.collect.Sets;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070025import com.google.common.util.concurrent.Futures;
Madan Jampanib0a3dd62015-03-17 13:41:27 -070026import com.hazelcast.core.IMap;
27
alshabib339a3d92014-09-26 17:54:32 -070028import org.apache.felix.scr.annotations.Activate;
29import org.apache.felix.scr.annotations.Component;
30import org.apache.felix.scr.annotations.Deactivate;
Brian O'Connorc4f351d2015-03-10 20:53:11 -070031import org.apache.felix.scr.annotations.Modified;
32import org.apache.felix.scr.annotations.Property;
Madan Jampani38b250d2014-10-17 11:02:38 -070033import org.apache.felix.scr.annotations.Reference;
34import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib339a3d92014-09-26 17:54:32 -070035import org.apache.felix.scr.annotations.Service;
Madan Jampanib0a3dd62015-03-17 13:41:27 -070036import org.onlab.util.BoundedThreadPool;
Jonathan Hart4fb5cde2014-12-22 12:09:07 -080037import org.onlab.util.KryoNamespace;
Madan Jampanib0a3dd62015-03-17 13:41:27 -070038import org.onlab.util.NewConcurrentHashMap;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070039import org.onlab.util.Tools;
Brian O'Connorc4f351d2015-03-10 20:53:11 -070040import org.onosproject.cfg.ComponentConfigService;
Brian O'Connorabafb502014-12-02 22:26:20 -080041import org.onosproject.cluster.ClusterService;
42import org.onosproject.cluster.NodeId;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080043import org.onosproject.core.CoreService;
44import org.onosproject.core.IdGenerator;
Brian O'Connorabafb502014-12-02 22:26:20 -080045import org.onosproject.net.Device;
46import org.onosproject.net.DeviceId;
47import org.onosproject.net.device.DeviceService;
48import org.onosproject.net.flow.CompletedBatchOperation;
49import org.onosproject.net.flow.DefaultFlowEntry;
50import org.onosproject.net.flow.FlowEntry;
51import org.onosproject.net.flow.FlowEntry.FlowEntryState;
52import org.onosproject.net.flow.FlowId;
53import org.onosproject.net.flow.FlowRule;
54import org.onosproject.net.flow.FlowRuleBatchEntry;
Jonathan Hart4fb5cde2014-12-22 12:09:07 -080055import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
Brian O'Connorabafb502014-12-02 22:26:20 -080056import org.onosproject.net.flow.FlowRuleBatchEvent;
57import org.onosproject.net.flow.FlowRuleBatchOperation;
58import org.onosproject.net.flow.FlowRuleBatchRequest;
59import org.onosproject.net.flow.FlowRuleEvent;
Brian O'Connorabafb502014-12-02 22:26:20 -080060import org.onosproject.net.flow.FlowRuleEvent.Type;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080061import org.onosproject.net.flow.FlowRuleService;
Brian O'Connorabafb502014-12-02 22:26:20 -080062import org.onosproject.net.flow.FlowRuleStore;
63import org.onosproject.net.flow.FlowRuleStoreDelegate;
64import org.onosproject.net.flow.StoredFlowEntry;
65import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
66import org.onosproject.store.cluster.messaging.ClusterMessage;
67import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
68import org.onosproject.store.flow.ReplicaInfo;
69import org.onosproject.store.flow.ReplicaInfoEvent;
70import org.onosproject.store.flow.ReplicaInfoEventListener;
71import org.onosproject.store.flow.ReplicaInfoService;
Madan Jampanib0a3dd62015-03-17 13:41:27 -070072import org.onosproject.store.hz.AbstractHazelcastStore;
73import org.onosproject.store.hz.SMap;
Brian O'Connorabafb502014-12-02 22:26:20 -080074import org.onosproject.store.serializers.KryoSerializer;
75import org.onosproject.store.serializers.StoreSerializer;
76import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
Brian O'Connorc4f351d2015-03-10 20:53:11 -070077import org.osgi.service.component.ComponentContext;
alshabib339a3d92014-09-26 17:54:32 -070078import org.slf4j.Logger;
79
Madan Jampanib0a3dd62015-03-17 13:41:27 -070080import java.util.ArrayList;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080081import java.util.Collections;
Brian O'Connorc4f351d2015-03-10 20:53:11 -070082import java.util.Dictionary;
Madan Jampanib0a3dd62015-03-17 13:41:27 -070083import java.util.HashSet;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080084import java.util.List;
85import java.util.Map;
Madan Jampanib0a3dd62015-03-17 13:41:27 -070086import java.util.Map.Entry;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080087import java.util.Set;
88import java.util.concurrent.ConcurrentHashMap;
89import java.util.concurrent.ConcurrentMap;
90import java.util.concurrent.CopyOnWriteArraySet;
91import java.util.concurrent.ExecutionException;
92import java.util.concurrent.ExecutorService;
93import java.util.concurrent.Executors;
94import java.util.concurrent.Future;
95import java.util.concurrent.TimeUnit;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080096import java.util.stream.Collectors;
97
Madan Jampanib0a3dd62015-03-17 13:41:27 -070098import static com.google.common.base.Preconditions.checkNotNull;
99import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700100import static com.google.common.base.Strings.isNullOrEmpty;
Thomas Vachuska6519e6f2015-03-11 02:29:31 -0700101import static org.onlab.util.Tools.get;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -0800102import static org.onlab.util.Tools.groupedThreads;
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800103import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
104import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
105import static org.slf4j.LoggerFactory.getLogger;
alshabib339a3d92014-09-26 17:54:32 -0700106
107/**
Madan Jampani38b250d2014-10-17 11:02:38 -0700108 * Manages inventory of flow rules using a distributed state management protocol.
alshabib339a3d92014-09-26 17:54:32 -0700109 */
Madan Jampani86940d92015-05-06 11:47:57 -0700110@Component(immediate = false, enabled = false)
alshabib339a3d92014-09-26 17:54:32 -0700111@Service
112public class DistributedFlowRuleStore
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700113 extends AbstractHazelcastStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
alshabib1c319ff2014-10-04 20:29:09 -0700114 implements FlowRuleStore {
alshabib339a3d92014-09-26 17:54:32 -0700115
116 private final Logger log = getLogger(getClass());
117
Madan Jampani2af244a2015-02-22 13:12:01 -0800118 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700119 private static final boolean DEFAULT_BACKUP_ENABLED = true;
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700120 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
121
122 @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
123 label = "Number of threads in the message handler pool")
124 private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
125
126 @Property(name = "backupEnabled", boolValue = DEFAULT_BACKUP_ENABLED,
127 label = "Indicates whether backups are enabled or not")
128 private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
Madan Jampani2af244a2015-02-22 13:12:01 -0800129
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700130 private InternalFlowTable flowTable = new InternalFlowTable();
131
132 /*private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
133 flowEntries = new ConcurrentHashMap<>();*/
alshabib339a3d92014-09-26 17:54:32 -0700134
Madan Jampani38b250d2014-10-17 11:02:38 -0700135 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700136 protected ReplicaInfoService replicaInfoManager;
Madan Jampani38b250d2014-10-17 11:02:38 -0700137
138 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700139 protected ClusterCommunicationService clusterCommunicator;
Madan Jampani38b250d2014-10-17 11:02:38 -0700140
141 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700142 protected ClusterService clusterService;
143
144 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
145 protected DeviceService deviceService;
146
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800147 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
148 protected CoreService coreService;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700149
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700150 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700151 protected ComponentConfigService configService;
152
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800153 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700154
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700155 // Cache of SMaps used for backup data. each SMap contain device flow table
156 private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
157
Madan Jampani2af244a2015-02-22 13:12:01 -0800158 private ExecutorService messageHandlingExecutor;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700159
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700160 private final ExecutorService backupExecutors =
161 BoundedThreadPool.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
162 //Executors.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
163
164 private boolean syncBackup = false;
165
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700166 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
Madan Jampani38b250d2014-10-17 11:02:38 -0700167 @Override
168 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700169 serializerPool = KryoNamespace.newBuilder()
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800170 .register(DistributedStoreSerializers.STORE_COMMON)
171 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
Yuta HIGUCHI2f158332014-11-25 13:32:06 -0800172 .register(FlowRuleEvent.class)
Jonathan Hart4fb5cde2014-12-22 12:09:07 -0800173 .register(FlowRuleEvent.Type.class)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800174 .build();
Madan Jampani38b250d2014-10-17 11:02:38 -0700175 }
176 };
177
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700178 private ReplicaInfoEventListener replicaInfoEventListener;
179
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800180 private IdGenerator idGenerator;
181
Madan Jampanicd48bfd2015-03-25 18:06:22 -0700182 private NodeId local;
183
alshabib339a3d92014-09-26 17:54:32 -0700184 @Activate
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700185 public void activate(ComponentContext context) {
186 configService.registerProperties(getClass());
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700187 super.serializer = SERIALIZER;
188 super.theInstance = storeService.getHazelcastInstance();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700189
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800190 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
191
Madan Jampanicd48bfd2015-03-25 18:06:22 -0700192 local = clusterService.getLocalNode().id();
193
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700194 // Cache to create SMap on demand
195 smaps = CacheBuilder.newBuilder()
196 .softValues()
197 .build(new SMapLoader());
198
Madan Jampani2af244a2015-02-22 13:12:01 -0800199 messageHandlingExecutor = Executors.newFixedThreadPool(
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700200 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
Madan Jampani2af244a2015-02-22 13:12:01 -0800201
Madan Jampanicd48bfd2015-03-25 18:06:22 -0700202 registerMessageHandlers(messageHandlingExecutor);
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800203
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700204 replicaInfoEventListener = new InternalReplicaInfoEventListener();
205
206 replicaInfoManager.addListener(replicaInfoEventListener);
207
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700208 logConfig("Started");
alshabib339a3d92014-09-26 17:54:32 -0700209 }
210
211 @Deactivate
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700212 public void deactivate(ComponentContext context) {
213 configService.unregisterProperties(getClass(), false);
Madan Jampanicd48bfd2015-03-25 18:06:22 -0700214 unregisterMessageHandlers();
215 messageHandlingExecutor.shutdownNow();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700216 replicaInfoManager.removeListener(replicaInfoEventListener);
alshabib339a3d92014-09-26 17:54:32 -0700217 log.info("Stopped");
218 }
219
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700220 @Modified
221 public void modified(ComponentContext context) {
222 if (context == null) {
223 backupEnabled = DEFAULT_BACKUP_ENABLED;
224 logConfig("Default config");
225 return;
226 }
227
228 Dictionary properties = context.getProperties();
229 int newPoolSize;
230 boolean newBackupEnabled;
231 try {
Thomas Vachuska6519e6f2015-03-11 02:29:31 -0700232 String s = get(properties, "msgHandlerPoolSize");
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700233 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
234
Thomas Vachuska6519e6f2015-03-11 02:29:31 -0700235 s = get(properties, "backupEnabled");
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700236 newBackupEnabled = isNullOrEmpty(s) ? backupEnabled : Boolean.parseBoolean(s.trim());
237
238 } catch (NumberFormatException | ClassCastException e) {
239 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
240 newBackupEnabled = DEFAULT_BACKUP_ENABLED;
241 }
242
Madan Jampanicd48bfd2015-03-25 18:06:22 -0700243 if (newBackupEnabled != backupEnabled) {
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700244 backupEnabled = newBackupEnabled;
Madan Jampanicd48bfd2015-03-25 18:06:22 -0700245 }
246 if (newPoolSize != msgHandlerPoolSize) {
247 msgHandlerPoolSize = newPoolSize;
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700248 ExecutorService oldMsgHandler = messageHandlingExecutor;
249 messageHandlingExecutor = Executors.newFixedThreadPool(
250 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
Madan Jampanicd48bfd2015-03-25 18:06:22 -0700251
252 // replace previously registered handlers.
253 registerMessageHandlers(messageHandlingExecutor);
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700254 oldMsgHandler.shutdown();
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700255 }
Madan Jampanicd48bfd2015-03-25 18:06:22 -0700256 logConfig("Reconfigured");
257 }
258
259 private void registerMessageHandlers(ExecutorService executor) {
260
261 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local), executor);
262
263 clusterCommunicator.addSubscriber(REMOTE_APPLY_COMPLETED, new ClusterMessageHandler() {
264 @Override
265 public void handle(ClusterMessage message) {
266 FlowRuleBatchEvent event = SERIALIZER.decode(message.payload());
267 log.trace("received completed notification for {}", event);
268 notifyDelegate(event);
269 }
270 }, executor);
271
272 clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
273
274 @Override
275 public void handle(ClusterMessage message) {
276 FlowRule rule = SERIALIZER.decode(message.payload());
277 log.trace("received get flow entry request for {}", rule);
278 FlowEntry flowEntry = flowTable.getFlowEntry(rule); //getFlowEntryInternal(rule);
Madan Jampanic26eede2015-04-16 11:42:16 -0700279 message.respond(SERIALIZER.encode(flowEntry));
Madan Jampanicd48bfd2015-03-25 18:06:22 -0700280 }
281 }, executor);
282
283 clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
284
285 @Override
286 public void handle(ClusterMessage message) {
287 DeviceId deviceId = SERIALIZER.decode(message.payload());
288 log.trace("Received get flow entries request for {} from {}", deviceId, message.sender());
289 Set<FlowEntry> flowEntries = flowTable.getFlowEntries(deviceId);
Madan Jampanic26eede2015-04-16 11:42:16 -0700290 message.respond(SERIALIZER.encode(flowEntries));
Madan Jampanicd48bfd2015-03-25 18:06:22 -0700291 }
292 }, executor);
293
294 clusterCommunicator.addSubscriber(REMOVE_FLOW_ENTRY, new ClusterMessageHandler() {
295
296 @Override
297 public void handle(ClusterMessage message) {
298 FlowEntry rule = SERIALIZER.decode(message.payload());
299 log.trace("received get flow entry request for {}", rule);
300 FlowRuleEvent event = removeFlowRuleInternal(rule);
Madan Jampanic26eede2015-04-16 11:42:16 -0700301 message.respond(SERIALIZER.encode(event));
Madan Jampanicd48bfd2015-03-25 18:06:22 -0700302 }
303 }, executor);
304 }
305
306 private void unregisterMessageHandlers() {
307 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
308 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
309 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
310 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
311 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700312 }
313
314 private void logConfig(String prefix) {
315 log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}",
316 prefix, msgHandlerPoolSize, backupEnabled);
317 }
alshabib339a3d92014-09-26 17:54:32 -0700318
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700319
Brian O'Connor44008532014-12-04 16:41:36 -0800320 // This is not a efficient operation on a distributed sharded
Madan Jampani117aaae2014-10-23 10:04:05 -0700321 // flow store. We need to revisit the need for this operation or at least
322 // make it device specific.
alshabib339a3d92014-09-26 17:54:32 -0700323 @Override
tom9b4030d2014-10-06 10:39:03 -0700324 public int getFlowRuleCount() {
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700325 // implementing in-efficient operation for debugging purpose.
326 int sum = 0;
327 for (Device device : deviceService.getDevices()) {
328 final DeviceId did = device.id();
329 sum += Iterables.size(getFlowEntries(did));
330 }
331 return sum;
tom9b4030d2014-10-06 10:39:03 -0700332 }
333
334 @Override
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800335 public FlowEntry getFlowEntry(FlowRule rule) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700336 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700337
338 if (!replicaInfo.master().isPresent()) {
Yuta HIGUCHI6b98ab62014-12-03 16:08:08 -0800339 log.warn("Failed to getFlowEntry: No master for {}", rule.deviceId());
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800340 return null;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700341 }
342
Madan Jampani117aaae2014-10-23 10:04:05 -0700343 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800344 return flowTable.getFlowEntry(rule);
Madan Jampani117aaae2014-10-23 10:04:05 -0700345 }
346
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800347 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800348 replicaInfo.master().orNull(), rule.deviceId());
Madan Jampani117aaae2014-10-23 10:04:05 -0700349
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700350 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
351 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
352 SERIALIZER::encode,
353 SERIALIZER::decode,
354 replicaInfo.master().get()),
355 FLOW_RULE_STORE_TIMEOUT_MILLIS,
356 TimeUnit.MILLISECONDS,
357 null);
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700358 }
359
alshabib339a3d92014-09-26 17:54:32 -0700360 @Override
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800361 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700362
363 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700364
365 if (!replicaInfo.master().isPresent()) {
Yuta HIGUCHI6b98ab62014-12-03 16:08:08 -0800366 log.warn("Failed to getFlowEntries: No master for {}", deviceId);
Yuta HIGUCHI2c1d8472014-10-31 14:13:38 -0700367 return Collections.emptyList();
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700368 }
369
Madan Jampanif5fdef02014-10-23 21:58:10 -0700370 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700371 return flowTable.getFlowEntries(deviceId);
Madan Jampanif5fdef02014-10-23 21:58:10 -0700372 }
373
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800374 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800375 replicaInfo.master().orNull(), deviceId);
Madan Jampanif5fdef02014-10-23 21:58:10 -0700376
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700377 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
378 FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
379 SERIALIZER::encode,
380 SERIALIZER::decode,
381 replicaInfo.master().get()),
382 FLOW_RULE_STORE_TIMEOUT_MILLIS,
383 TimeUnit.MILLISECONDS,
384 Collections.emptyList());
Madan Jampanif5fdef02014-10-23 21:58:10 -0700385 }
386
alshabib339a3d92014-09-26 17:54:32 -0700387 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700388 public void storeFlowRule(FlowRule rule) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800389 storeBatch(new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700390 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800391 rule.deviceId(), idGenerator.getNewId()));
Madan Jampani117aaae2014-10-23 10:04:05 -0700392 }
393
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700394 @Override
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800395 public void storeBatch(FlowRuleBatchOperation operation) {
396
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700397
Madan Jampani117aaae2014-10-23 10:04:05 -0700398 if (operation.getOperations().isEmpty()) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800399
400 notifyDelegate(FlowRuleBatchEvent.completed(
401 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
402 new CompletedBatchOperation(true, Collections.emptySet(),
403 operation.deviceId())));
404 return;
alshabib339a3d92014-09-26 17:54:32 -0700405 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700406
alshabib371abe82015-02-13 10:44:17 -0800407 DeviceId deviceId = operation.deviceId();
Madan Jampani117aaae2014-10-23 10:04:05 -0700408
409 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
410
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700411 if (!replicaInfo.master().isPresent()) {
alshabib371abe82015-02-13 10:44:17 -0800412 log.warn("No master for {} : flows will be marked for removal", deviceId);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800413
alshabib371abe82015-02-13 10:44:17 -0800414 updateStoreInternal(operation);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800415
416 notifyDelegate(FlowRuleBatchEvent.completed(
417 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
alshabib371abe82015-02-13 10:44:17 -0800418 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800419 return;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700420 }
421
422 final NodeId local = clusterService.getLocalNode().id();
423 if (replicaInfo.master().get().equals(local)) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800424 storeBatchInternal(operation);
425 return;
Madan Jampani117aaae2014-10-23 10:04:05 -0700426 }
427
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800428 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800429 replicaInfo.master().orNull(), deviceId);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700430
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700431 if (!clusterCommunicator.unicast(operation,
432 APPLY_BATCH_FLOWS, SERIALIZER::encode,
433 replicaInfo.master().get())) {
434 log.warn("Failed to storeBatch: {} to {}", operation, replicaInfo.master());
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800435
436 Set<FlowRule> allFailures = operation.getOperations().stream()
Ray Milkeyf7329c72015-02-17 11:37:01 -0800437 .map(op -> op.target())
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800438 .collect(Collectors.toSet());
439
440 notifyDelegate(FlowRuleBatchEvent.completed(
441 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
442 new CompletedBatchOperation(false, allFailures, deviceId)));
443 return;
Madan Jampani38b250d2014-10-17 11:02:38 -0700444 }
445 }
446
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800447 private void storeBatchInternal(FlowRuleBatchOperation operation) {
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800448
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800449 final DeviceId did = operation.deviceId();
450 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
alshabib371abe82015-02-13 10:44:17 -0800451 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
452 if (currentOps.isEmpty()) {
453 batchOperationComplete(FlowRuleBatchEvent.completed(
454 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
455 new CompletedBatchOperation(true, Collections.emptySet(), did)));
456 return;
457 }
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700458 updateBackup(did, currentOps);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700459
alshabib371abe82015-02-13 10:44:17 -0800460 notifyDelegate(FlowRuleBatchEvent.requested(new
461 FlowRuleBatchRequest(operation.id(),
462 currentOps), operation.deviceId()));
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700463
alshabib371abe82015-02-13 10:44:17 -0800464 }
465
466 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
467 return operation.getOperations().stream().map(
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800468 op -> {
469 StoredFlowEntry entry;
Ray Milkeyf7329c72015-02-17 11:37:01 -0800470 switch (op.operator()) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800471 case ADD:
Ray Milkeyf7329c72015-02-17 11:37:01 -0800472 entry = new DefaultFlowEntry(op.target());
Madan Jampani2af244a2015-02-22 13:12:01 -0800473 // always add requested FlowRule
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800474 // Note: 2 equal FlowEntry may have different treatment
475 flowTable.remove(entry.deviceId(), entry);
476 flowTable.add(entry);
477
478 return op;
479 case REMOVE:
480 entry = flowTable.getFlowEntry(op.target());
481 if (entry != null) {
482 entry.setState(FlowEntryState.PENDING_REMOVE);
483 return op;
484 }
485 break;
486 case MODIFY:
487 //TODO: figure this out at some point
488 break;
489 default:
Ray Milkeyf7329c72015-02-17 11:37:01 -0800490 log.warn("Unknown flow operation operator: {}", op.operator());
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800491 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800492 return null;
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800493 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800494 ).filter(op -> op != null).collect(Collectors.toSet());
alshabib339a3d92014-09-26 17:54:32 -0700495 }
496
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700497 private void updateBackup(DeviceId deviceId, final Set<FlowRuleBatchEntry> entries) {
498 if (!backupEnabled) {
499 return;
500 }
501
502 Future<?> backup = backupExecutors.submit(new UpdateBackup(deviceId, entries));
503
504 if (syncBackup) {
505 // wait for backup to complete
506 try {
507 backup.get();
508 } catch (InterruptedException | ExecutionException e) {
509 log.error("Failed to create backups", e);
510 }
511 }
512 }
513
alshabib339a3d92014-09-26 17:54:32 -0700514 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700515 public void deleteFlowRule(FlowRule rule) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800516 storeBatch(
517 new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700518 Collections.singletonList(
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800519 new FlowRuleBatchEntry(
520 FlowRuleOperation.REMOVE,
521 rule)), rule.deviceId(), idGenerator.getNewId()));
alshabib339a3d92014-09-26 17:54:32 -0700522 }
523
524 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700525 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
526 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700527 final NodeId localId = clusterService.getLocalNode().id();
528 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700529 return addOrUpdateFlowRuleInternal(rule);
Madan Jampani38b250d2014-10-17 11:02:38 -0700530 }
531
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800532 log.warn("Tried to update FlowRule {} state,"
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800533 + " while the Node was not the master.", rule);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700534 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700535 }
536
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700537 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
538 final DeviceId did = rule.deviceId();
539
540
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800541 // check if this new rule is an update to an existing entry
542 StoredFlowEntry stored = flowTable.getFlowEntry(rule);
543 if (stored != null) {
544 stored.setBytes(rule.bytes());
545 stored.setLife(rule.life());
546 stored.setPackets(rule.packets());
547 if (stored.state() == FlowEntryState.PENDING_ADD) {
548 stored.setState(FlowEntryState.ADDED);
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700549 FlowRuleBatchEntry entry =
550 new FlowRuleBatchEntry(FlowRuleOperation.ADD, stored);
551 updateBackup(did, Sets.newHashSet(entry));
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800552 return new FlowRuleEvent(Type.RULE_ADDED, rule);
553 }
554 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800555 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800556
557 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700558 // TODO: also update backup if the behavior is correct.
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800559 flowTable.add(rule);
560
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700561
alshabib1c319ff2014-10-04 20:29:09 -0700562 return null;
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700563
alshabib339a3d92014-09-26 17:54:32 -0700564 }
565
566 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700567 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
Yuta HIGUCHIb6eb9142014-11-26 16:18:13 -0800568 final DeviceId deviceId = rule.deviceId();
569 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700570
571 final NodeId localId = clusterService.getLocalNode().id();
572 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700573 // bypass and handle it locally
574 return removeFlowRuleInternal(rule);
575 }
576
Yuta HIGUCHIb6eb9142014-11-26 16:18:13 -0800577 if (!replicaInfo.master().isPresent()) {
Yuta HIGUCHI6b98ab62014-12-03 16:08:08 -0800578 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
Yuta HIGUCHIb6eb9142014-11-26 16:18:13 -0800579 // TODO: revisit if this should be null (="no-op") or Exception
580 return null;
581 }
582
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800583 log.trace("Forwarding removeFlowRule to {}, which is the primary (master) for device {}",
Yuta HIGUCHIb6eb9142014-11-26 16:18:13 -0800584 replicaInfo.master().orNull(), deviceId);
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800585
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700586 return Futures.get(clusterCommunicator.sendAndReceive(
587 rule,
588 REMOVE_FLOW_ENTRY,
589 SERIALIZER::encode,
590 SERIALIZER::decode,
591 replicaInfo.master().get()),
592 FLOW_RULE_STORE_TIMEOUT_MILLIS,
593 TimeUnit.MILLISECONDS,
594 RuntimeException.class);
Madan Jampani38b250d2014-10-17 11:02:38 -0700595 }
596
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800597 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700598 final DeviceId deviceId = rule.deviceId();
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800599 // This is where one could mark a rule as removed and still keep it in the store.
600 final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700601 FlowRuleBatchEntry entry =
602 new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule);
603 updateBackup(deviceId, Sets.newHashSet(entry));
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800604 if (removed) {
605 return new FlowRuleEvent(RULE_REMOVED, rule);
606 } else {
607 return null;
alshabib339a3d92014-09-26 17:54:32 -0700608 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800609
alshabib339a3d92014-09-26 17:54:32 -0700610 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700611
612 @Override
613 public void batchOperationComplete(FlowRuleBatchEvent event) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800614 //FIXME: need a per device pending response
615
616 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
617 if (nodeId == null) {
618 notifyDelegate(event);
619 } else {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800620 // TODO check unicast return value
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700621 clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, SERIALIZER::encode, nodeId);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800622 //error log: log.warn("Failed to respond to peer for batch operation result");
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700623 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700624 }
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700625
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700626 private void loadFromBackup(final DeviceId did) {
627 if (!backupEnabled) {
628 return;
629 }
630 log.info("We are now the master for {}. Will load flow rules from backup", did);
631 try {
632 log.debug("Loading FlowRules for {} from backups", did);
633 SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(did);
634 for (Entry<FlowId, ImmutableList<StoredFlowEntry>> e
635 : backupFlowTable.entrySet()) {
636
637 log.trace("loading {}", e.getValue());
638 for (StoredFlowEntry entry : e.getValue()) {
639 flowTable.getFlowEntriesById(entry).remove(entry);
640 flowTable.getFlowEntriesById(entry).add(entry);
641
642
643 }
644 }
645 } catch (ExecutionException e) {
646 log.error("Failed to load backup flowtable for {}", did, e);
647 }
648 }
649
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800650 private void removeFromPrimary(final DeviceId did) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800651 flowTable.clearDevice(did);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700652 }
653
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700654
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700655 private final class OnStoreBatch implements ClusterMessageHandler {
656 private final NodeId local;
657
658 private OnStoreBatch(NodeId local) {
659 this.local = local;
660 }
661
662 @Override
663 public void handle(final ClusterMessage message) {
664 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800665 log.debug("received batch request {}", operation);
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700666
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800667 final DeviceId deviceId = operation.deviceId();
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700668 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
669 if (!local.equals(replicaInfo.master().orNull())) {
670
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700671 Set<FlowRule> failures = new HashSet<>(operation.size());
672 for (FlowRuleBatchEntry op : operation.getOperations()) {
673 failures.add(op.target());
674 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800675 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700676 // This node is no longer the master, respond as all failed.
677 // TODO: we might want to wrap response in envelope
678 // to distinguish sw programming failure and hand over
679 // it make sense in the latter case to retry immediately.
Madan Jampanic26eede2015-04-16 11:42:16 -0700680 message.respond(SERIALIZER.encode(allFailed));
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700681 return;
682 }
683
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700684
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800685 pendingResponses.put(operation.id(), message.sender());
686 storeBatchInternal(operation);
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700687
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700688 }
689 }
690
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700691 private final class SMapLoader
692 extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
693
694 @Override
695 public SMap<FlowId, ImmutableList<StoredFlowEntry>> load(DeviceId id)
696 throws Exception {
697 IMap<byte[], byte[]> map = theInstance.getMap("flowtable_" + id.toString());
698 return new SMap<FlowId, ImmutableList<StoredFlowEntry>>(map, SERIALIZER);
699 }
700 }
701
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700702 private final class InternalReplicaInfoEventListener
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800703 implements ReplicaInfoEventListener {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700704
705 @Override
706 public void event(ReplicaInfoEvent event) {
707 final NodeId local = clusterService.getLocalNode().id();
708 final DeviceId did = event.subject();
709 final ReplicaInfo rInfo = event.replicaInfo();
710
711 switch (event.type()) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800712 case MASTER_CHANGED:
713 if (local.equals(rInfo.master().orNull())) {
714 // This node is the new master, populate local structure
715 // from backup
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700716 loadFromBackup(did);
alshabib93cb57f2015-02-12 17:43:26 -0800717 }
718 //else {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800719 // This node is no longer the master holder,
720 // clean local structure
alshabib93cb57f2015-02-12 17:43:26 -0800721 //removeFromPrimary(did);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800722 // TODO: probably should stop pending backup activities in
723 // executors to avoid overwriting with old value
alshabib93cb57f2015-02-12 17:43:26 -0800724 //}
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800725 break;
726 default:
727 break;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700728
729 }
730 }
731 }
732
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700733 // Task to update FlowEntries in backup HZ store
734 private final class UpdateBackup implements Runnable {
735
736 private final DeviceId deviceId;
737 private final Set<FlowRuleBatchEntry> ops;
738
739
740 public UpdateBackup(DeviceId deviceId,
741 Set<FlowRuleBatchEntry> ops) {
742 this.deviceId = checkNotNull(deviceId);
743 this.ops = checkNotNull(ops);
744
745 }
746
747 @Override
748 public void run() {
749 try {
750 log.trace("update backup {} {}", deviceId, ops
751 );
752 final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
753
754
755 ops.stream().forEach(
756 op -> {
757 final FlowRule entry = op.target();
758 final FlowId id = entry.id();
759 ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
760 List<StoredFlowEntry> list = new ArrayList<>();
761 if (original != null) {
762 list.addAll(original);
763 }
764 list.remove(op.target());
765 if (op.operator() == FlowRuleOperation.ADD) {
766 list.add((StoredFlowEntry) entry);
767 }
768
769 ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
770 boolean success;
771 if (original == null) {
772 success = (backupFlowTable.putIfAbsent(id, newValue) == null);
773 } else {
774 success = backupFlowTable.replace(id, original, newValue);
775 }
776 if (!success) {
777 log.error("Updating backup failed.");
778 }
779
780 }
781 );
782 } catch (ExecutionException e) {
783 log.error("Failed to write to backups", e);
784 }
785
786 }
787 }
788
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800789 private class InternalFlowTable {
790
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700791 /*
792 TODO: This needs to be cleaned up. Perhaps using the eventually consistent
793 map when it supports distributed to a sequence of instances.
794 */
795
796
797 private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
798 flowEntries = new ConcurrentHashMap<>();
799
800
801 private NewConcurrentHashMap<FlowId, Set<StoredFlowEntry>> lazyEmptyFlowTable() {
802 return NewConcurrentHashMap.<FlowId, Set<StoredFlowEntry>>ifNeeded();
803 }
Madan Jampanie1356282015-03-10 19:05:36 -0700804
805 /**
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700806 * Returns the flow table for specified device.
807 *
808 * @param deviceId identifier of the device
809 * @return Map representing Flow Table of given device.
Madan Jampanie1356282015-03-10 19:05:36 -0700810 */
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700811 private ConcurrentMap<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
812 return createIfAbsentUnchecked(flowEntries,
813 deviceId, lazyEmptyFlowTable());
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800814 }
815
816 private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700817 final ConcurrentMap<FlowId, Set<StoredFlowEntry>> flowTable = getFlowTable(deviceId);
818 Set<StoredFlowEntry> r = flowTable.get(flowId);
819 if (r == null) {
820 final Set<StoredFlowEntry> concurrentlyAdded;
821 r = new CopyOnWriteArraySet<>();
822 concurrentlyAdded = flowTable.putIfAbsent(flowId, r);
823 if (concurrentlyAdded != null) {
824 return concurrentlyAdded;
825 }
826 }
827 return r;
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800828 }
829
830 private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700831 for (StoredFlowEntry f : getFlowEntriesInternal(rule.deviceId(), rule.id())) {
832 if (f.equals(rule)) {
833 return f;
834 }
835 }
836 return null;
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800837 }
838
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700839 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
840 return getFlowTable(deviceId).values().stream()
841 .flatMap((list -> list.stream())).collect(Collectors.toSet());
842
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800843 }
844
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700845
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800846 public StoredFlowEntry getFlowEntry(FlowRule rule) {
847 return getFlowEntryInternal(rule);
848 }
849
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700850 public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800851 return getFlowEntriesInternal(deviceId);
852 }
853
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700854 public Set<StoredFlowEntry> getFlowEntriesById(FlowEntry entry) {
855 return getFlowEntriesInternal(entry.deviceId(), entry.id());
856 }
857
858 public void add(FlowEntry rule) {
859 ((CopyOnWriteArraySet)
860 getFlowEntriesInternal(rule.deviceId(), rule.id())).add(rule);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800861 }
862
863 public boolean remove(DeviceId deviceId, FlowEntry rule) {
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700864 return ((CopyOnWriteArraySet)
865 getFlowEntriesInternal(deviceId, rule.id())).remove(rule);
866 //return flowEntries.remove(deviceId, rule);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800867 }
868
869 public void clearDevice(DeviceId did) {
870 flowEntries.remove(did);
871 }
872 }
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700873
874
Madan Jampanie1356282015-03-10 19:05:36 -0700875}