blob: 85015c9c02ea09c0f3ff1503bad6849ddbfe1d47 [file] [log] [blame]
Madan Jampanib0a3dd62015-03-17 13:41:27 -07001 /*
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07002 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.flow.impl;
alshabib339a3d92014-09-26 17:54:32 -070017
Madan Jampanib0a3dd62015-03-17 13:41:27 -070018import com.google.common.cache.CacheBuilder;
19import com.google.common.cache.CacheLoader;
20import com.google.common.cache.LoadingCache;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080021import com.google.common.collect.ImmutableList;
22import com.google.common.collect.Iterables;
23import com.google.common.collect.Maps;
24import com.google.common.collect.Sets;
Madan Jampanib0a3dd62015-03-17 13:41:27 -070025import com.hazelcast.core.IMap;
26
alshabib339a3d92014-09-26 17:54:32 -070027import org.apache.felix.scr.annotations.Activate;
28import org.apache.felix.scr.annotations.Component;
29import org.apache.felix.scr.annotations.Deactivate;
Brian O'Connorc4f351d2015-03-10 20:53:11 -070030import org.apache.felix.scr.annotations.Modified;
31import org.apache.felix.scr.annotations.Property;
Madan Jampani38b250d2014-10-17 11:02:38 -070032import org.apache.felix.scr.annotations.Reference;
33import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib339a3d92014-09-26 17:54:32 -070034import org.apache.felix.scr.annotations.Service;
Madan Jampanib0a3dd62015-03-17 13:41:27 -070035import org.onlab.util.BoundedThreadPool;
Jonathan Hart4fb5cde2014-12-22 12:09:07 -080036import org.onlab.util.KryoNamespace;
Madan Jampanib0a3dd62015-03-17 13:41:27 -070037import org.onlab.util.NewConcurrentHashMap;
Brian O'Connorc4f351d2015-03-10 20:53:11 -070038import org.onosproject.cfg.ComponentConfigService;
Brian O'Connorabafb502014-12-02 22:26:20 -080039import org.onosproject.cluster.ClusterService;
40import org.onosproject.cluster.NodeId;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080041import org.onosproject.core.CoreService;
42import org.onosproject.core.IdGenerator;
Brian O'Connorabafb502014-12-02 22:26:20 -080043import org.onosproject.net.Device;
44import org.onosproject.net.DeviceId;
45import org.onosproject.net.device.DeviceService;
46import org.onosproject.net.flow.CompletedBatchOperation;
47import org.onosproject.net.flow.DefaultFlowEntry;
48import org.onosproject.net.flow.FlowEntry;
49import org.onosproject.net.flow.FlowEntry.FlowEntryState;
50import org.onosproject.net.flow.FlowId;
51import org.onosproject.net.flow.FlowRule;
52import org.onosproject.net.flow.FlowRuleBatchEntry;
Jonathan Hart4fb5cde2014-12-22 12:09:07 -080053import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
Brian O'Connorabafb502014-12-02 22:26:20 -080054import org.onosproject.net.flow.FlowRuleBatchEvent;
55import org.onosproject.net.flow.FlowRuleBatchOperation;
56import org.onosproject.net.flow.FlowRuleBatchRequest;
57import org.onosproject.net.flow.FlowRuleEvent;
Brian O'Connorabafb502014-12-02 22:26:20 -080058import org.onosproject.net.flow.FlowRuleEvent.Type;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080059import org.onosproject.net.flow.FlowRuleService;
Brian O'Connorabafb502014-12-02 22:26:20 -080060import org.onosproject.net.flow.FlowRuleStore;
61import org.onosproject.net.flow.FlowRuleStoreDelegate;
62import org.onosproject.net.flow.StoredFlowEntry;
63import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
64import org.onosproject.store.cluster.messaging.ClusterMessage;
65import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
66import org.onosproject.store.flow.ReplicaInfo;
67import org.onosproject.store.flow.ReplicaInfoEvent;
68import org.onosproject.store.flow.ReplicaInfoEventListener;
69import org.onosproject.store.flow.ReplicaInfoService;
Madan Jampanib0a3dd62015-03-17 13:41:27 -070070import org.onosproject.store.hz.AbstractHazelcastStore;
71import org.onosproject.store.hz.SMap;
Brian O'Connorabafb502014-12-02 22:26:20 -080072import org.onosproject.store.serializers.KryoSerializer;
73import org.onosproject.store.serializers.StoreSerializer;
74import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
Brian O'Connorc4f351d2015-03-10 20:53:11 -070075import org.osgi.service.component.ComponentContext;
alshabib339a3d92014-09-26 17:54:32 -070076import org.slf4j.Logger;
77
Brian O'Connor72cb19a2015-01-16 16:14:41 -080078import java.io.IOException;
Madan Jampanib0a3dd62015-03-17 13:41:27 -070079import java.util.ArrayList;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080080import java.util.Arrays;
81import java.util.Collections;
Brian O'Connorc4f351d2015-03-10 20:53:11 -070082import java.util.Dictionary;
Madan Jampanib0a3dd62015-03-17 13:41:27 -070083import java.util.HashSet;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080084import java.util.List;
85import java.util.Map;
Madan Jampanib0a3dd62015-03-17 13:41:27 -070086import java.util.Map.Entry;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080087import java.util.Set;
88import java.util.concurrent.ConcurrentHashMap;
89import java.util.concurrent.ConcurrentMap;
90import java.util.concurrent.CopyOnWriteArraySet;
91import java.util.concurrent.ExecutionException;
92import java.util.concurrent.ExecutorService;
93import java.util.concurrent.Executors;
94import java.util.concurrent.Future;
95import java.util.concurrent.TimeUnit;
96import java.util.concurrent.TimeoutException;
97import java.util.stream.Collectors;
98
Madan Jampanib0a3dd62015-03-17 13:41:27 -070099import static com.google.common.base.Preconditions.checkNotNull;
100import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700101import static com.google.common.base.Strings.isNullOrEmpty;
Thomas Vachuska6519e6f2015-03-11 02:29:31 -0700102import static org.onlab.util.Tools.get;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -0800103import static org.onlab.util.Tools.groupedThreads;
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800104import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
105import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
106import static org.slf4j.LoggerFactory.getLogger;
alshabib339a3d92014-09-26 17:54:32 -0700107
108/**
Madan Jampani38b250d2014-10-17 11:02:38 -0700109 * Manages inventory of flow rules using a distributed state management protocol.
alshabib339a3d92014-09-26 17:54:32 -0700110 */
alshabib339a3d92014-09-26 17:54:32 -0700111@Component(immediate = true)
112@Service
113public class DistributedFlowRuleStore
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700114 extends AbstractHazelcastStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
alshabib1c319ff2014-10-04 20:29:09 -0700115 implements FlowRuleStore {
alshabib339a3d92014-09-26 17:54:32 -0700116
117 private final Logger log = getLogger(getClass());
118
Madan Jampani2af244a2015-02-22 13:12:01 -0800119 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700120 private static final boolean DEFAULT_BACKUP_ENABLED = true;
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700121 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
122
123 @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
124 label = "Number of threads in the message handler pool")
125 private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
126
127 @Property(name = "backupEnabled", boolValue = DEFAULT_BACKUP_ENABLED,
128 label = "Indicates whether backups are enabled or not")
129 private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
Madan Jampani2af244a2015-02-22 13:12:01 -0800130
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700131 private InternalFlowTable flowTable = new InternalFlowTable();
132
133 /*private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
134 flowEntries = new ConcurrentHashMap<>();*/
alshabib339a3d92014-09-26 17:54:32 -0700135
Madan Jampani38b250d2014-10-17 11:02:38 -0700136 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700137 protected ReplicaInfoService replicaInfoManager;
Madan Jampani38b250d2014-10-17 11:02:38 -0700138
139 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700140 protected ClusterCommunicationService clusterCommunicator;
Madan Jampani38b250d2014-10-17 11:02:38 -0700141
142 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700143 protected ClusterService clusterService;
144
145 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
146 protected DeviceService deviceService;
147
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800148 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
149 protected CoreService coreService;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700150
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700151 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700152 protected ComponentConfigService configService;
153
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800154 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700155
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700156 // Cache of SMaps used for backup data. each SMap contain device flow table
157 private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
158
Madan Jampani2af244a2015-02-22 13:12:01 -0800159 private ExecutorService messageHandlingExecutor;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700160
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700161 private final ExecutorService backupExecutors =
162 BoundedThreadPool.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
163 //Executors.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
164
165 private boolean syncBackup = false;
166
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700167 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
Madan Jampani38b250d2014-10-17 11:02:38 -0700168 @Override
169 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700170 serializerPool = KryoNamespace.newBuilder()
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800171 .register(DistributedStoreSerializers.STORE_COMMON)
172 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
Yuta HIGUCHI2f158332014-11-25 13:32:06 -0800173 .register(FlowRuleEvent.class)
Jonathan Hart4fb5cde2014-12-22 12:09:07 -0800174 .register(FlowRuleEvent.Type.class)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800175 .build();
Madan Jampani38b250d2014-10-17 11:02:38 -0700176 }
177 };
178
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700179 private ReplicaInfoEventListener replicaInfoEventListener;
180
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800181 private IdGenerator idGenerator;
182
alshabib339a3d92014-09-26 17:54:32 -0700183 @Activate
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700184 public void activate(ComponentContext context) {
185 configService.registerProperties(getClass());
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700186 super.serializer = SERIALIZER;
187 super.theInstance = storeService.getHazelcastInstance();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700188
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800189 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
190
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700191 // Cache to create SMap on demand
192 smaps = CacheBuilder.newBuilder()
193 .softValues()
194 .build(new SMapLoader());
195
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700196 final NodeId local = clusterService.getLocalNode().id();
197
Madan Jampani2af244a2015-02-22 13:12:01 -0800198 messageHandlingExecutor = Executors.newFixedThreadPool(
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700199 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
Madan Jampani2af244a2015-02-22 13:12:01 -0800200
201 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local), messageHandlingExecutor);
Madan Jampani117aaae2014-10-23 10:04:05 -0700202
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800203 clusterCommunicator.addSubscriber(REMOTE_APPLY_COMPLETED, new ClusterMessageHandler() {
204 @Override
205 public void handle(ClusterMessage message) {
206 FlowRuleBatchEvent event = SERIALIZER.decode(message.payload());
207 log.trace("received completed notification for {}", event);
208 notifyDelegate(event);
209 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800210 }, messageHandlingExecutor);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800211
Madan Jampani117aaae2014-10-23 10:04:05 -0700212 clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
213
214 @Override
215 public void handle(ClusterMessage message) {
216 FlowRule rule = SERIALIZER.decode(message.payload());
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800217 log.trace("received get flow entry request for {}", rule);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800218 FlowEntry flowEntry = flowTable.getFlowEntry(rule); //getFlowEntryInternal(rule);
Madan Jampani117aaae2014-10-23 10:04:05 -0700219 try {
220 message.respond(SERIALIZER.encode(flowEntry));
221 } catch (IOException e) {
222 log.error("Failed to respond back", e);
223 }
224 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800225 }, messageHandlingExecutor);
Madan Jampani117aaae2014-10-23 10:04:05 -0700226
Madan Jampanif5fdef02014-10-23 21:58:10 -0700227 clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
228
229 @Override
230 public void handle(ClusterMessage message) {
231 DeviceId deviceId = SERIALIZER.decode(message.payload());
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800232 log.trace("Received get flow entries request for {} from {}", deviceId, message.sender());
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700233 Set<FlowEntry> flowEntries = flowTable.getFlowEntries(deviceId);
Madan Jampanif5fdef02014-10-23 21:58:10 -0700234 try {
235 message.respond(SERIALIZER.encode(flowEntries));
236 } catch (IOException e) {
237 log.error("Failed to respond to peer's getFlowEntries request", e);
238 }
239 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800240 }, messageHandlingExecutor);
Madan Jampanif5fdef02014-10-23 21:58:10 -0700241
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800242 clusterCommunicator.addSubscriber(REMOVE_FLOW_ENTRY, new ClusterMessageHandler() {
243
244 @Override
245 public void handle(ClusterMessage message) {
246 FlowEntry rule = SERIALIZER.decode(message.payload());
247 log.trace("received get flow entry request for {}", rule);
248 FlowRuleEvent event = removeFlowRuleInternal(rule);
249 try {
250 message.respond(SERIALIZER.encode(event));
251 } catch (IOException e) {
252 log.error("Failed to respond back", e);
253 }
254 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800255 }, messageHandlingExecutor);
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800256
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700257 replicaInfoEventListener = new InternalReplicaInfoEventListener();
258
259 replicaInfoManager.addListener(replicaInfoEventListener);
260
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700261 logConfig("Started");
alshabib339a3d92014-09-26 17:54:32 -0700262 }
263
264 @Deactivate
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700265 public void deactivate(ComponentContext context) {
266 configService.unregisterProperties(getClass(), false);
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800267 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
268 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
269 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
270 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
alshabib371abe82015-02-13 10:44:17 -0800271 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
Madan Jampani2af244a2015-02-22 13:12:01 -0800272 messageHandlingExecutor.shutdown();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700273 replicaInfoManager.removeListener(replicaInfoEventListener);
alshabib339a3d92014-09-26 17:54:32 -0700274 log.info("Stopped");
275 }
276
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700277 @Modified
278 public void modified(ComponentContext context) {
279 if (context == null) {
280 backupEnabled = DEFAULT_BACKUP_ENABLED;
281 logConfig("Default config");
282 return;
283 }
284
285 Dictionary properties = context.getProperties();
286 int newPoolSize;
287 boolean newBackupEnabled;
288 try {
Thomas Vachuska6519e6f2015-03-11 02:29:31 -0700289 String s = get(properties, "msgHandlerPoolSize");
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700290 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
291
Thomas Vachuska6519e6f2015-03-11 02:29:31 -0700292 s = get(properties, "backupEnabled");
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700293 newBackupEnabled = isNullOrEmpty(s) ? backupEnabled : Boolean.parseBoolean(s.trim());
294
295 } catch (NumberFormatException | ClassCastException e) {
296 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
297 newBackupEnabled = DEFAULT_BACKUP_ENABLED;
298 }
299
300 if (newPoolSize != msgHandlerPoolSize || newBackupEnabled != backupEnabled) {
301 msgHandlerPoolSize = newPoolSize;
302 backupEnabled = newBackupEnabled;
Brian O'Connorc4f351d2015-03-10 20:53:11 -0700303 ExecutorService oldMsgHandler = messageHandlingExecutor;
304 messageHandlingExecutor = Executors.newFixedThreadPool(
305 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
306 oldMsgHandler.shutdown();
307 logConfig("Reconfigured");
308 }
309 }
310
311 private void logConfig(String prefix) {
312 log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}",
313 prefix, msgHandlerPoolSize, backupEnabled);
314 }
alshabib339a3d92014-09-26 17:54:32 -0700315
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700316
Brian O'Connor44008532014-12-04 16:41:36 -0800317 // This is not a efficient operation on a distributed sharded
Madan Jampani117aaae2014-10-23 10:04:05 -0700318 // flow store. We need to revisit the need for this operation or at least
319 // make it device specific.
alshabib339a3d92014-09-26 17:54:32 -0700320 @Override
tom9b4030d2014-10-06 10:39:03 -0700321 public int getFlowRuleCount() {
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700322 // implementing in-efficient operation for debugging purpose.
323 int sum = 0;
324 for (Device device : deviceService.getDevices()) {
325 final DeviceId did = device.id();
326 sum += Iterables.size(getFlowEntries(did));
327 }
328 return sum;
tom9b4030d2014-10-06 10:39:03 -0700329 }
330
331 @Override
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800332 public FlowEntry getFlowEntry(FlowRule rule) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700333 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700334
335 if (!replicaInfo.master().isPresent()) {
Yuta HIGUCHI6b98ab62014-12-03 16:08:08 -0800336 log.warn("Failed to getFlowEntry: No master for {}", rule.deviceId());
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800337 return null;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700338 }
339
Madan Jampani117aaae2014-10-23 10:04:05 -0700340 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800341 return flowTable.getFlowEntry(rule);
Madan Jampani117aaae2014-10-23 10:04:05 -0700342 }
343
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800344 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800345 replicaInfo.master().orNull(), rule.deviceId());
Madan Jampani117aaae2014-10-23 10:04:05 -0700346
347 ClusterMessage message = new ClusterMessage(
348 clusterService.getLocalNode().id(),
349 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
350 SERIALIZER.encode(rule));
351
352 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700353 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
354 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
355 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Brian O'Connor44008532014-12-04 16:41:36 -0800356 log.warn("Unable to fetch flow store contents from {}", replicaInfo.master().get());
Madan Jampani117aaae2014-10-23 10:04:05 -0700357 }
Brian O'Connor44008532014-12-04 16:41:36 -0800358 return null;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700359 }
360
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800361
alshabib339a3d92014-09-26 17:54:32 -0700362
363 @Override
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800364 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700365
366 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700367
368 if (!replicaInfo.master().isPresent()) {
Yuta HIGUCHI6b98ab62014-12-03 16:08:08 -0800369 log.warn("Failed to getFlowEntries: No master for {}", deviceId);
Yuta HIGUCHI2c1d8472014-10-31 14:13:38 -0700370 return Collections.emptyList();
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700371 }
372
Madan Jampanif5fdef02014-10-23 21:58:10 -0700373 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700374 return flowTable.getFlowEntries(deviceId);
Madan Jampanif5fdef02014-10-23 21:58:10 -0700375 }
376
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800377 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800378 replicaInfo.master().orNull(), deviceId);
Madan Jampanif5fdef02014-10-23 21:58:10 -0700379
380 ClusterMessage message = new ClusterMessage(
381 clusterService.getLocalNode().id(),
382 GET_DEVICE_FLOW_ENTRIES,
383 SERIALIZER.encode(deviceId));
384
385 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700386 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
387 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
388 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Brian O'Connor44008532014-12-04 16:41:36 -0800389 log.warn("Unable to fetch flow store contents from {}", replicaInfo.master().get());
Madan Jampanif5fdef02014-10-23 21:58:10 -0700390 }
Yuta HIGUCHI24f79eb2014-12-12 15:46:43 -0800391 return Collections.emptyList();
Madan Jampanif5fdef02014-10-23 21:58:10 -0700392 }
393
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800394
alshabib339a3d92014-09-26 17:54:32 -0700395
396 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700397 public void storeFlowRule(FlowRule rule) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800398 storeBatch(new FlowRuleBatchOperation(
399 Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
400 rule.deviceId(), idGenerator.getNewId()));
Madan Jampani117aaae2014-10-23 10:04:05 -0700401 }
402
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700403 @Override
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800404 public void storeBatch(FlowRuleBatchOperation operation) {
405
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700406
Madan Jampani117aaae2014-10-23 10:04:05 -0700407 if (operation.getOperations().isEmpty()) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800408
409 notifyDelegate(FlowRuleBatchEvent.completed(
410 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
411 new CompletedBatchOperation(true, Collections.emptySet(),
412 operation.deviceId())));
413 return;
alshabib339a3d92014-09-26 17:54:32 -0700414 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700415
alshabib371abe82015-02-13 10:44:17 -0800416 DeviceId deviceId = operation.deviceId();
Madan Jampani117aaae2014-10-23 10:04:05 -0700417
418 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
419
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700420 if (!replicaInfo.master().isPresent()) {
alshabib371abe82015-02-13 10:44:17 -0800421 log.warn("No master for {} : flows will be marked for removal", deviceId);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800422
alshabib371abe82015-02-13 10:44:17 -0800423 updateStoreInternal(operation);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800424
425 notifyDelegate(FlowRuleBatchEvent.completed(
426 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
alshabib371abe82015-02-13 10:44:17 -0800427 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800428 return;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700429 }
430
431 final NodeId local = clusterService.getLocalNode().id();
432 if (replicaInfo.master().get().equals(local)) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800433 storeBatchInternal(operation);
434 return;
Madan Jampani117aaae2014-10-23 10:04:05 -0700435 }
436
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800437 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800438 replicaInfo.master().orNull(), deviceId);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700439
Madan Jampani38b250d2014-10-17 11:02:38 -0700440 ClusterMessage message = new ClusterMessage(
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700441 local,
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700442 APPLY_BATCH_FLOWS,
Madan Jampani117aaae2014-10-23 10:04:05 -0700443 SERIALIZER.encode(operation));
Madan Jampani38b250d2014-10-17 11:02:38 -0700444
alshabib371abe82015-02-13 10:44:17 -0800445
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800446 if (!clusterCommunicator.unicast(message, replicaInfo.master().get())) {
447 log.warn("Failed to storeBatch: {} to {}", message, replicaInfo.master());
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800448
449 Set<FlowRule> allFailures = operation.getOperations().stream()
Ray Milkeyf7329c72015-02-17 11:37:01 -0800450 .map(op -> op.target())
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800451 .collect(Collectors.toSet());
452
453 notifyDelegate(FlowRuleBatchEvent.completed(
454 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
455 new CompletedBatchOperation(false, allFailures, deviceId)));
456 return;
Madan Jampani38b250d2014-10-17 11:02:38 -0700457 }
458 }
459
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800460 private void storeBatchInternal(FlowRuleBatchOperation operation) {
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800461
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800462 final DeviceId did = operation.deviceId();
463 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
alshabib371abe82015-02-13 10:44:17 -0800464 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
465 if (currentOps.isEmpty()) {
466 batchOperationComplete(FlowRuleBatchEvent.completed(
467 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
468 new CompletedBatchOperation(true, Collections.emptySet(), did)));
469 return;
470 }
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700471 updateBackup(did, currentOps);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700472
alshabib371abe82015-02-13 10:44:17 -0800473 notifyDelegate(FlowRuleBatchEvent.requested(new
474 FlowRuleBatchRequest(operation.id(),
475 currentOps), operation.deviceId()));
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700476
alshabib371abe82015-02-13 10:44:17 -0800477 }
478
479 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
480 return operation.getOperations().stream().map(
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800481 op -> {
482 StoredFlowEntry entry;
Ray Milkeyf7329c72015-02-17 11:37:01 -0800483 switch (op.operator()) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800484 case ADD:
Ray Milkeyf7329c72015-02-17 11:37:01 -0800485 entry = new DefaultFlowEntry(op.target());
Madan Jampani2af244a2015-02-22 13:12:01 -0800486 // always add requested FlowRule
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800487 // Note: 2 equal FlowEntry may have different treatment
488 flowTable.remove(entry.deviceId(), entry);
489 flowTable.add(entry);
490
491 return op;
492 case REMOVE:
493 entry = flowTable.getFlowEntry(op.target());
494 if (entry != null) {
495 entry.setState(FlowEntryState.PENDING_REMOVE);
496 return op;
497 }
498 break;
499 case MODIFY:
500 //TODO: figure this out at some point
501 break;
502 default:
Ray Milkeyf7329c72015-02-17 11:37:01 -0800503 log.warn("Unknown flow operation operator: {}", op.operator());
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800504 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800505 return null;
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800506 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800507 ).filter(op -> op != null).collect(Collectors.toSet());
alshabib339a3d92014-09-26 17:54:32 -0700508 }
509
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700510 private void updateBackup(DeviceId deviceId, final Set<FlowRuleBatchEntry> entries) {
511 if (!backupEnabled) {
512 return;
513 }
514
515 Future<?> backup = backupExecutors.submit(new UpdateBackup(deviceId, entries));
516
517 if (syncBackup) {
518 // wait for backup to complete
519 try {
520 backup.get();
521 } catch (InterruptedException | ExecutionException e) {
522 log.error("Failed to create backups", e);
523 }
524 }
525 }
526
alshabib339a3d92014-09-26 17:54:32 -0700527 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700528 public void deleteFlowRule(FlowRule rule) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800529 storeBatch(
530 new FlowRuleBatchOperation(
531 Arrays.asList(
532 new FlowRuleBatchEntry(
533 FlowRuleOperation.REMOVE,
534 rule)), rule.deviceId(), idGenerator.getNewId()));
alshabib339a3d92014-09-26 17:54:32 -0700535 }
536
537 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700538 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
539 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700540 final NodeId localId = clusterService.getLocalNode().id();
541 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700542 return addOrUpdateFlowRuleInternal(rule);
Madan Jampani38b250d2014-10-17 11:02:38 -0700543 }
544
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800545 log.warn("Tried to update FlowRule {} state,"
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800546 + " while the Node was not the master.", rule);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700547 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700548 }
549
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700550 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
551 final DeviceId did = rule.deviceId();
552
553
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800554 // check if this new rule is an update to an existing entry
555 StoredFlowEntry stored = flowTable.getFlowEntry(rule);
556 if (stored != null) {
557 stored.setBytes(rule.bytes());
558 stored.setLife(rule.life());
559 stored.setPackets(rule.packets());
560 if (stored.state() == FlowEntryState.PENDING_ADD) {
561 stored.setState(FlowEntryState.ADDED);
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700562 FlowRuleBatchEntry entry =
563 new FlowRuleBatchEntry(FlowRuleOperation.ADD, stored);
564 updateBackup(did, Sets.newHashSet(entry));
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800565 return new FlowRuleEvent(Type.RULE_ADDED, rule);
566 }
567 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800568 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800569
570 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700571 // TODO: also update backup if the behavior is correct.
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800572 flowTable.add(rule);
573
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700574
alshabib1c319ff2014-10-04 20:29:09 -0700575 return null;
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700576
alshabib339a3d92014-09-26 17:54:32 -0700577 }
578
579 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700580 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
Yuta HIGUCHIb6eb9142014-11-26 16:18:13 -0800581 final DeviceId deviceId = rule.deviceId();
582 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700583
584 final NodeId localId = clusterService.getLocalNode().id();
585 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700586 // bypass and handle it locally
587 return removeFlowRuleInternal(rule);
588 }
589
Yuta HIGUCHIb6eb9142014-11-26 16:18:13 -0800590 if (!replicaInfo.master().isPresent()) {
Yuta HIGUCHI6b98ab62014-12-03 16:08:08 -0800591 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
Yuta HIGUCHIb6eb9142014-11-26 16:18:13 -0800592 // TODO: revisit if this should be null (="no-op") or Exception
593 return null;
594 }
595
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800596 log.trace("Forwarding removeFlowRule to {}, which is the primary (master) for device {}",
Yuta HIGUCHIb6eb9142014-11-26 16:18:13 -0800597 replicaInfo.master().orNull(), deviceId);
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800598
599 ClusterMessage message = new ClusterMessage(
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800600 clusterService.getLocalNode().id(),
601 REMOVE_FLOW_ENTRY,
602 SERIALIZER.encode(rule));
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800603
604 try {
605 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
606 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
607 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Yuta HIGUCHI65934892014-12-04 17:47:44 -0800608 // TODO: Retry against latest master or throw a FlowStoreException
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800609 throw new RuntimeException(e);
610 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700611 }
612
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800613 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700614 final DeviceId deviceId = rule.deviceId();
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800615 // This is where one could mark a rule as removed and still keep it in the store.
616 final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700617 FlowRuleBatchEntry entry =
618 new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule);
619 updateBackup(deviceId, Sets.newHashSet(entry));
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800620 if (removed) {
621 return new FlowRuleEvent(RULE_REMOVED, rule);
622 } else {
623 return null;
alshabib339a3d92014-09-26 17:54:32 -0700624 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800625
alshabib339a3d92014-09-26 17:54:32 -0700626 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700627
628 @Override
629 public void batchOperationComplete(FlowRuleBatchEvent event) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800630 //FIXME: need a per device pending response
631
632 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
633 if (nodeId == null) {
634 notifyDelegate(event);
635 } else {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800636 ClusterMessage message = new ClusterMessage(
637 clusterService.getLocalNode().id(),
638 REMOTE_APPLY_COMPLETED,
639 SERIALIZER.encode(event));
640 // TODO check unicast return value
641 clusterCommunicator.unicast(message, nodeId);
642 //error log: log.warn("Failed to respond to peer for batch operation result");
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700643 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700644 }
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700645
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700646 private void loadFromBackup(final DeviceId did) {
647 if (!backupEnabled) {
648 return;
649 }
650 log.info("We are now the master for {}. Will load flow rules from backup", did);
651 try {
652 log.debug("Loading FlowRules for {} from backups", did);
653 SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(did);
654 for (Entry<FlowId, ImmutableList<StoredFlowEntry>> e
655 : backupFlowTable.entrySet()) {
656
657 log.trace("loading {}", e.getValue());
658 for (StoredFlowEntry entry : e.getValue()) {
659 flowTable.getFlowEntriesById(entry).remove(entry);
660 flowTable.getFlowEntriesById(entry).add(entry);
661
662
663 }
664 }
665 } catch (ExecutionException e) {
666 log.error("Failed to load backup flowtable for {}", did, e);
667 }
668 }
669
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800670 private void removeFromPrimary(final DeviceId did) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800671 flowTable.clearDevice(did);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700672 }
673
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700674
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700675 private final class OnStoreBatch implements ClusterMessageHandler {
676 private final NodeId local;
677
678 private OnStoreBatch(NodeId local) {
679 this.local = local;
680 }
681
682 @Override
683 public void handle(final ClusterMessage message) {
684 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800685 log.debug("received batch request {}", operation);
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700686
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800687 final DeviceId deviceId = operation.deviceId();
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700688 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
689 if (!local.equals(replicaInfo.master().orNull())) {
690
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700691 Set<FlowRule> failures = new HashSet<>(operation.size());
692 for (FlowRuleBatchEntry op : operation.getOperations()) {
693 failures.add(op.target());
694 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800695 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700696 // This node is no longer the master, respond as all failed.
697 // TODO: we might want to wrap response in envelope
698 // to distinguish sw programming failure and hand over
699 // it make sense in the latter case to retry immediately.
700 try {
701 message.respond(SERIALIZER.encode(allFailed));
702 } catch (IOException e) {
703 log.error("Failed to respond back", e);
704 }
705 return;
706 }
707
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700708
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800709 pendingResponses.put(operation.id(), message.sender());
710 storeBatchInternal(operation);
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700711
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700712 }
713 }
714
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700715 private final class SMapLoader
716 extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
717
718 @Override
719 public SMap<FlowId, ImmutableList<StoredFlowEntry>> load(DeviceId id)
720 throws Exception {
721 IMap<byte[], byte[]> map = theInstance.getMap("flowtable_" + id.toString());
722 return new SMap<FlowId, ImmutableList<StoredFlowEntry>>(map, SERIALIZER);
723 }
724 }
725
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700726 private final class InternalReplicaInfoEventListener
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800727 implements ReplicaInfoEventListener {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700728
729 @Override
730 public void event(ReplicaInfoEvent event) {
731 final NodeId local = clusterService.getLocalNode().id();
732 final DeviceId did = event.subject();
733 final ReplicaInfo rInfo = event.replicaInfo();
734
735 switch (event.type()) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800736 case MASTER_CHANGED:
737 if (local.equals(rInfo.master().orNull())) {
738 // This node is the new master, populate local structure
739 // from backup
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700740 loadFromBackup(did);
alshabib93cb57f2015-02-12 17:43:26 -0800741 }
742 //else {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800743 // This node is no longer the master holder,
744 // clean local structure
alshabib93cb57f2015-02-12 17:43:26 -0800745 //removeFromPrimary(did);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800746 // TODO: probably should stop pending backup activities in
747 // executors to avoid overwriting with old value
alshabib93cb57f2015-02-12 17:43:26 -0800748 //}
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800749 break;
750 default:
751 break;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700752
753 }
754 }
755 }
756
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700757 // Task to update FlowEntries in backup HZ store
758 private final class UpdateBackup implements Runnable {
759
760 private final DeviceId deviceId;
761 private final Set<FlowRuleBatchEntry> ops;
762
763
764 public UpdateBackup(DeviceId deviceId,
765 Set<FlowRuleBatchEntry> ops) {
766 this.deviceId = checkNotNull(deviceId);
767 this.ops = checkNotNull(ops);
768
769 }
770
771 @Override
772 public void run() {
773 try {
774 log.trace("update backup {} {}", deviceId, ops
775 );
776 final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
777
778
779 ops.stream().forEach(
780 op -> {
781 final FlowRule entry = op.target();
782 final FlowId id = entry.id();
783 ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
784 List<StoredFlowEntry> list = new ArrayList<>();
785 if (original != null) {
786 list.addAll(original);
787 }
788 list.remove(op.target());
789 if (op.operator() == FlowRuleOperation.ADD) {
790 list.add((StoredFlowEntry) entry);
791 }
792
793 ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
794 boolean success;
795 if (original == null) {
796 success = (backupFlowTable.putIfAbsent(id, newValue) == null);
797 } else {
798 success = backupFlowTable.replace(id, original, newValue);
799 }
800 if (!success) {
801 log.error("Updating backup failed.");
802 }
803
804 }
805 );
806 } catch (ExecutionException e) {
807 log.error("Failed to write to backups", e);
808 }
809
810 }
811 }
812
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800813 private class InternalFlowTable {
814
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700815 /*
816 TODO: This needs to be cleaned up. Perhaps using the eventually consistent
817 map when it supports distributed to a sequence of instances.
818 */
819
820
821 private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
822 flowEntries = new ConcurrentHashMap<>();
823
824
825 private NewConcurrentHashMap<FlowId, Set<StoredFlowEntry>> lazyEmptyFlowTable() {
826 return NewConcurrentHashMap.<FlowId, Set<StoredFlowEntry>>ifNeeded();
827 }
Madan Jampanie1356282015-03-10 19:05:36 -0700828
829 /**
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700830 * Returns the flow table for specified device.
831 *
832 * @param deviceId identifier of the device
833 * @return Map representing Flow Table of given device.
Madan Jampanie1356282015-03-10 19:05:36 -0700834 */
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700835 private ConcurrentMap<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
836 return createIfAbsentUnchecked(flowEntries,
837 deviceId, lazyEmptyFlowTable());
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800838 }
839
840 private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700841 final ConcurrentMap<FlowId, Set<StoredFlowEntry>> flowTable = getFlowTable(deviceId);
842 Set<StoredFlowEntry> r = flowTable.get(flowId);
843 if (r == null) {
844 final Set<StoredFlowEntry> concurrentlyAdded;
845 r = new CopyOnWriteArraySet<>();
846 concurrentlyAdded = flowTable.putIfAbsent(flowId, r);
847 if (concurrentlyAdded != null) {
848 return concurrentlyAdded;
849 }
850 }
851 return r;
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800852 }
853
854 private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700855 for (StoredFlowEntry f : getFlowEntriesInternal(rule.deviceId(), rule.id())) {
856 if (f.equals(rule)) {
857 return f;
858 }
859 }
860 return null;
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800861 }
862
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700863 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
864 return getFlowTable(deviceId).values().stream()
865 .flatMap((list -> list.stream())).collect(Collectors.toSet());
866
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800867 }
868
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700869
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800870 public StoredFlowEntry getFlowEntry(FlowRule rule) {
871 return getFlowEntryInternal(rule);
872 }
873
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700874 public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800875 return getFlowEntriesInternal(deviceId);
876 }
877
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700878 public Set<StoredFlowEntry> getFlowEntriesById(FlowEntry entry) {
879 return getFlowEntriesInternal(entry.deviceId(), entry.id());
880 }
881
882 public void add(FlowEntry rule) {
883 ((CopyOnWriteArraySet)
884 getFlowEntriesInternal(rule.deviceId(), rule.id())).add(rule);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800885 }
886
887 public boolean remove(DeviceId deviceId, FlowEntry rule) {
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700888 return ((CopyOnWriteArraySet)
889 getFlowEntriesInternal(deviceId, rule.id())).remove(rule);
890 //return flowEntries.remove(deviceId, rule);
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800891 }
892
893 public void clearDevice(DeviceId did) {
894 flowEntries.remove(did);
895 }
896 }
Madan Jampanib0a3dd62015-03-17 13:41:27 -0700897
898
Madan Jampanie1356282015-03-10 19:05:36 -0700899}