blob: c94b6c0257d0c1e6e86d3c0a98eaef23d8a2d279 [file] [log] [blame]
/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
alshabib339a3d92014-09-26 17:54:32 -070016package org.onlab.onos.store.flow.impl;
17
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070018import static com.google.common.base.Preconditions.checkNotNull;
alshabib339a3d92014-09-26 17:54:32 -070019import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
20import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070021import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070022import static org.onlab.util.Tools.namedThreads;
alshabib339a3d92014-09-26 17:54:32 -070023
Madan Jampani38b250d2014-10-17 11:02:38 -070024import java.io.IOException;
Madan Jampani117aaae2014-10-23 10:04:05 -070025import java.util.ArrayList;
26import java.util.Arrays;
alshabib339a3d92014-09-26 17:54:32 -070027import java.util.Collection;
28import java.util.Collections;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -070029import java.util.HashSet;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070030import java.util.Map.Entry;
Madan Jampanif5fdef02014-10-23 21:58:10 -070031import java.util.Set;
Madan Jampani24f9efb2014-10-24 18:56:23 -070032import java.util.concurrent.ExecutionException;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070033import java.util.concurrent.ExecutorService;
34import java.util.concurrent.Executors;
Madan Jampani117aaae2014-10-23 10:04:05 -070035import java.util.concurrent.Future;
Madan Jampani38b250d2014-10-17 11:02:38 -070036import java.util.concurrent.TimeUnit;
37import java.util.concurrent.TimeoutException;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070038import java.util.concurrent.atomic.AtomicInteger;
Madan Jampani117aaae2014-10-23 10:04:05 -070039import java.util.List;
alshabib339a3d92014-09-26 17:54:32 -070040
41import org.apache.felix.scr.annotations.Activate;
42import org.apache.felix.scr.annotations.Component;
43import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani38b250d2014-10-17 11:02:38 -070044import org.apache.felix.scr.annotations.Reference;
45import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib339a3d92014-09-26 17:54:32 -070046import org.apache.felix.scr.annotations.Service;
Madan Jampani38b250d2014-10-17 11:02:38 -070047import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070048import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070049import org.onlab.onos.net.Device;
alshabib339a3d92014-09-26 17:54:32 -070050import org.onlab.onos.net.DeviceId;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070051import org.onlab.onos.net.device.DeviceService;
Madan Jampani117aaae2014-10-23 10:04:05 -070052import org.onlab.onos.net.flow.CompletedBatchOperation;
alshabib1c319ff2014-10-04 20:29:09 -070053import org.onlab.onos.net.flow.DefaultFlowEntry;
54import org.onlab.onos.net.flow.FlowEntry;
55import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070056import org.onlab.onos.net.flow.FlowId;
alshabib339a3d92014-09-26 17:54:32 -070057import org.onlab.onos.net.flow.FlowRule;
Madan Jampani117aaae2014-10-23 10:04:05 -070058import org.onlab.onos.net.flow.FlowRuleBatchEntry;
59import org.onlab.onos.net.flow.FlowRuleBatchEvent;
60import org.onlab.onos.net.flow.FlowRuleBatchOperation;
61import org.onlab.onos.net.flow.FlowRuleBatchRequest;
alshabib339a3d92014-09-26 17:54:32 -070062import org.onlab.onos.net.flow.FlowRuleEvent;
Madan Jampani117aaae2014-10-23 10:04:05 -070063import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
alshabib339a3d92014-09-26 17:54:32 -070064import org.onlab.onos.net.flow.FlowRuleEvent.Type;
65import org.onlab.onos.net.flow.FlowRuleStore;
66import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070067import org.onlab.onos.net.flow.StoredFlowEntry;
Madan Jampani38b250d2014-10-17 11:02:38 -070068import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
69import org.onlab.onos.store.cluster.messaging.ClusterMessage;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070070import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani38b250d2014-10-17 11:02:38 -070071import org.onlab.onos.store.flow.ReplicaInfo;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070072import org.onlab.onos.store.flow.ReplicaInfoEvent;
73import org.onlab.onos.store.flow.ReplicaInfoEventListener;
Yuta HIGUCHIfe280eb2014-10-17 12:10:43 -070074import org.onlab.onos.store.flow.ReplicaInfoService;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070075import org.onlab.onos.store.hz.AbstractHazelcastStore;
76import org.onlab.onos.store.hz.SMap;
Yuta HIGUCHIea150152014-10-28 22:55:14 -070077import org.onlab.onos.store.serializers.DecodeTo;
Madan Jampani38b250d2014-10-17 11:02:38 -070078import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHIea150152014-10-28 22:55:14 -070079import org.onlab.onos.store.serializers.StoreSerializer;
Yuta HIGUCHI60a190b2014-11-07 16:24:47 -080080import org.onlab.onos.store.serializers.impl.DistributedStoreSerializers;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070081import org.onlab.util.KryoNamespace;
alshabib339a3d92014-09-26 17:54:32 -070082import org.slf4j.Logger;
83
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -070084import com.google.common.cache.Cache;
85import com.google.common.cache.CacheBuilder;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070086import com.google.common.cache.CacheLoader;
87import com.google.common.cache.LoadingCache;
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -080088import com.google.common.cache.RemovalListener;
89import com.google.common.cache.RemovalNotification;
alshabib339a3d92014-09-26 17:54:32 -070090import com.google.common.collect.ArrayListMultimap;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070091import com.google.common.collect.ImmutableList;
alshabib339a3d92014-09-26 17:54:32 -070092import com.google.common.collect.ImmutableSet;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070093import com.google.common.collect.Iterables;
alshabib339a3d92014-09-26 17:54:32 -070094import com.google.common.collect.Multimap;
Madan Jampani117aaae2014-10-23 10:04:05 -070095import com.google.common.util.concurrent.Futures;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070096import com.google.common.util.concurrent.ListenableFuture;
97import com.google.common.util.concurrent.SettableFuture;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070098import com.hazelcast.core.IMap;
alshabib339a3d92014-09-26 17:54:32 -070099
/**
 * Manages inventory of flow rules using a distributed state management protocol.
 */
alshabib339a3d92014-09-26 17:54:32 -0700103@Component(immediate = true)
104@Service
105public class DistributedFlowRuleStore
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700106 extends AbstractHazelcastStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
alshabib1c319ff2014-10-04 20:29:09 -0700107 implements FlowRuleStore {
alshabib339a3d92014-09-26 17:54:32 -0700108
109 private final Logger log = getLogger(getClass());
110
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800111 // primary data:
112 // read/write needs to be synchronized
alshabib339a3d92014-09-26 17:54:32 -0700113 // store entries as a pile of rules, no info about device tables
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800114 private final Multimap<DeviceId, StoredFlowEntry> flowEntries
115 = ArrayListMultimap.<DeviceId, StoredFlowEntry>create();
alshabib339a3d92014-09-26 17:54:32 -0700116
Madan Jampani38b250d2014-10-17 11:02:38 -0700117 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700118 protected ReplicaInfoService replicaInfoManager;
Madan Jampani38b250d2014-10-17 11:02:38 -0700119
120 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700121 protected ClusterCommunicationService clusterCommunicator;
Madan Jampani38b250d2014-10-17 11:02:38 -0700122
123 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700124 protected ClusterService clusterService;
125
126 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
127 protected DeviceService deviceService;
128
129 private final AtomicInteger localBatchIdGen = new AtomicInteger();
130
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700131 // TODO: make this configurable
132 private int pendingFutureTimeoutMinutes = 5;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700133
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700134 private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
135 CacheBuilder.newBuilder()
136 .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800137 .removalListener(new TimeoutFuture())
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700138 .build();
139
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800140 // Cache of SMaps used for backup data. each SMap contain device flow table
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700141 private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
142
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700143
144 private final ExecutorService futureListeners =
145 Executors.newCachedThreadPool(namedThreads("flowstore-peer-responders"));
146
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700147 private final ExecutorService backupExecutors =
148 Executors.newSingleThreadExecutor(namedThreads("async-backups"));
149
150 // TODO make this configurable
151 private boolean syncBackup = false;
Madan Jampani38b250d2014-10-17 11:02:38 -0700152
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700153 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
Madan Jampani38b250d2014-10-17 11:02:38 -0700154 @Override
155 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700156 serializerPool = KryoNamespace.newBuilder()
Madan Jampani38b250d2014-10-17 11:02:38 -0700157 .register(DistributedStoreSerializers.COMMON)
158 .build()
159 .populate(1);
160 }
161 };
162
163 // TODO: make this configurable
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700164 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Madan Jampani38b250d2014-10-17 11:02:38 -0700165
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700166 private ReplicaInfoEventListener replicaInfoEventListener;
167
168 @Override
alshabib339a3d92014-09-26 17:54:32 -0700169 @Activate
170 public void activate() {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700171
172 super.serializer = SERIALIZER;
173 super.theInstance = storeService.getHazelcastInstance();
174
175 // Cache to create SMap on demand
176 smaps = CacheBuilder.newBuilder()
177 .softValues()
178 .build(new SMapLoader());
179
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700180 final NodeId local = clusterService.getLocalNode().id();
181
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700182 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local));
Madan Jampani117aaae2014-10-23 10:04:05 -0700183
184 clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
185
186 @Override
187 public void handle(ClusterMessage message) {
188 FlowRule rule = SERIALIZER.decode(message.payload());
189 log.info("received get flow entry request for {}", rule);
190 FlowEntry flowEntry = getFlowEntryInternal(rule);
191 try {
192 message.respond(SERIALIZER.encode(flowEntry));
193 } catch (IOException e) {
194 log.error("Failed to respond back", e);
195 }
196 }
197 });
198
Madan Jampanif5fdef02014-10-23 21:58:10 -0700199 clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
200
201 @Override
202 public void handle(ClusterMessage message) {
203 DeviceId deviceId = SERIALIZER.decode(message.payload());
204 log.info("Received get flow entries request for {} from {}", deviceId, message.sender());
205 Set<FlowEntry> flowEntries = getFlowEntriesInternal(deviceId);
206 try {
207 message.respond(SERIALIZER.encode(flowEntries));
208 } catch (IOException e) {
209 log.error("Failed to respond to peer's getFlowEntries request", e);
210 }
211 }
212 });
213
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700214 replicaInfoEventListener = new InternalReplicaInfoEventListener();
215
216 replicaInfoManager.addListener(replicaInfoEventListener);
217
alshabib339a3d92014-09-26 17:54:32 -0700218 log.info("Started");
219 }
220
221 @Deactivate
222 public void deactivate() {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700223 replicaInfoManager.removeListener(replicaInfoEventListener);
alshabib339a3d92014-09-26 17:54:32 -0700224 log.info("Stopped");
225 }
226
227
Madan Jampani117aaae2014-10-23 10:04:05 -0700228 // TODO: This is not a efficient operation on a distributed sharded
229 // flow store. We need to revisit the need for this operation or at least
230 // make it device specific.
alshabib339a3d92014-09-26 17:54:32 -0700231 @Override
tom9b4030d2014-10-06 10:39:03 -0700232 public int getFlowRuleCount() {
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700233 // implementing in-efficient operation for debugging purpose.
234 int sum = 0;
235 for (Device device : deviceService.getDevices()) {
236 final DeviceId did = device.id();
237 sum += Iterables.size(getFlowEntries(did));
238 }
239 return sum;
tom9b4030d2014-10-06 10:39:03 -0700240 }
241
242 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700243 public synchronized FlowEntry getFlowEntry(FlowRule rule) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700244 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700245
246 if (!replicaInfo.master().isPresent()) {
247 log.warn("No master for {}", rule);
248 // TODO: revisit if this should be returning null.
249 // FIXME: throw a FlowStoreException
250 throw new RuntimeException("No master for " + rule);
251 }
252
Madan Jampani117aaae2014-10-23 10:04:05 -0700253 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
254 return getFlowEntryInternal(rule);
255 }
256
257 log.info("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
258 replicaInfo.master().orNull(), rule.deviceId());
259
260 ClusterMessage message = new ClusterMessage(
261 clusterService.getLocalNode().id(),
262 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
263 SERIALIZER.encode(rule));
264
265 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700266 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
267 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
268 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700269 // FIXME: throw a FlowStoreException
270 throw new RuntimeException(e);
271 }
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700272 }
273
274 private synchronized StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
275 for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) {
alshabib339a3d92014-09-26 17:54:32 -0700276 if (f.equals(rule)) {
277 return f;
278 }
279 }
280 return null;
281 }
282
283 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700284 public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700285
286 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700287
288 if (!replicaInfo.master().isPresent()) {
289 log.warn("No master for {}", deviceId);
Yuta HIGUCHI2c1d8472014-10-31 14:13:38 -0700290 // TODO: revisit if this should be returning empty collection or throwing exception.
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700291 // FIXME: throw a FlowStoreException
Yuta HIGUCHI2c1d8472014-10-31 14:13:38 -0700292 //throw new RuntimeException("No master for " + deviceId);
293 return Collections.emptyList();
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700294 }
295
Madan Jampanif5fdef02014-10-23 21:58:10 -0700296 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
297 return getFlowEntriesInternal(deviceId);
298 }
299
300 log.info("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
301 replicaInfo.master().orNull(), deviceId);
302
303 ClusterMessage message = new ClusterMessage(
304 clusterService.getLocalNode().id(),
305 GET_DEVICE_FLOW_ENTRIES,
306 SERIALIZER.encode(deviceId));
307
308 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700309 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
310 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
311 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700312 // FIXME: throw a FlowStoreException
313 throw new RuntimeException(e);
314 }
315 }
316
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800317 private synchronized Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700318 Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
alshabib339a3d92014-09-26 17:54:32 -0700319 if (rules == null) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700320 return Collections.emptySet();
alshabib339a3d92014-09-26 17:54:32 -0700321 }
322 return ImmutableSet.copyOf(rules);
323 }
324
325 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700326 public void storeFlowRule(FlowRule rule) {
327 storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
328 }
329
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700330 // FIXME document that all of the FlowEntries must be about same device
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700331 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700332 public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700333
Madan Jampani117aaae2014-10-23 10:04:05 -0700334 if (operation.getOperations().isEmpty()) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700335 return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
alshabib339a3d92014-09-26 17:54:32 -0700336 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700337
Madan Jampani117aaae2014-10-23 10:04:05 -0700338 DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
339
340 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
341
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700342 if (!replicaInfo.master().isPresent()) {
343 log.warn("No master for {}", deviceId);
344 // TODO: revisit if this should be "success" from Future point of view
345 // with every FlowEntry failed
346 return Futures.immediateFailedFuture(new IOException("No master to forward to"));
347 }
348
349 final NodeId local = clusterService.getLocalNode().id();
350 if (replicaInfo.master().get().equals(local)) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700351 return storeBatchInternal(operation);
352 }
353
354 log.info("Forwarding storeBatch to {}, which is the primary (master) for device {}",
355 replicaInfo.master().orNull(), deviceId);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700356
Madan Jampani38b250d2014-10-17 11:02:38 -0700357 ClusterMessage message = new ClusterMessage(
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700358 local,
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700359 APPLY_BATCH_FLOWS,
Madan Jampani117aaae2014-10-23 10:04:05 -0700360 SERIALIZER.encode(operation));
Madan Jampani38b250d2014-10-17 11:02:38 -0700361
362 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700363 ListenableFuture<byte[]> responseFuture =
364 clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700365 return Futures.transform(responseFuture, new DecodeTo<CompletedBatchOperation>(SERIALIZER));
Madan Jampani24f9efb2014-10-24 18:56:23 -0700366 } catch (IOException e) {
367 return Futures.immediateFailedFuture(e);
Madan Jampani38b250d2014-10-17 11:02:38 -0700368 }
369 }
370
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800371 private synchronized ListenableFuture<CompletedBatchOperation>
372 storeBatchInternal(FlowRuleBatchOperation operation) {
373
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700374 final List<StoredFlowEntry> toRemove = new ArrayList<>();
375 final List<StoredFlowEntry> toAdd = new ArrayList<>();
376 DeviceId did = null;
377
Madan Jampani117aaae2014-10-23 10:04:05 -0700378 for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
379 FlowRule flowRule = batchEntry.getTarget();
380 FlowRuleOperation op = batchEntry.getOperator();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700381 if (did == null) {
382 did = flowRule.deviceId();
383 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700384 if (op.equals(FlowRuleOperation.REMOVE)) {
385 StoredFlowEntry entry = getFlowEntryInternal(flowRule);
386 if (entry != null) {
387 entry.setState(FlowEntryState.PENDING_REMOVE);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700388 toRemove.add(entry);
Madan Jampani117aaae2014-10-23 10:04:05 -0700389 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700390 } else if (op.equals(FlowRuleOperation.ADD)) {
391 StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
392 DeviceId deviceId = flowRule.deviceId();
393 if (!flowEntries.containsEntry(deviceId, flowEntry)) {
394 flowEntries.put(deviceId, flowEntry);
Madan Jampani117aaae2014-10-23 10:04:05 -0700395 toAdd.add(flowEntry);
396 }
397 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700398 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700399 if (toAdd.isEmpty() && toRemove.isEmpty()) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700400 return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
Madan Jampani117aaae2014-10-23 10:04:05 -0700401 }
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700402
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700403 // create remote backup copies
404 final DeviceId deviceId = did;
405 updateBackup(deviceId, toAdd, toRemove);
406
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700407 SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
408 final int batchId = localBatchIdGen.incrementAndGet();
409
410 pendingFutures.put(batchId, r);
411 notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700412
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700413 return r;
alshabib339a3d92014-09-26 17:54:32 -0700414 }
415
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700416 private void updateBackup(final DeviceId deviceId,
417 final List<StoredFlowEntry> toAdd,
418 final List<? extends FlowRule> list) {
419
420 Future<?> submit = backupExecutors.submit(new UpdateBackup(deviceId, toAdd, list));
421
422 if (syncBackup) {
423 // wait for backup to complete
424 try {
425 submit.get();
426 } catch (InterruptedException | ExecutionException e) {
427 log.error("Failed to create backups", e);
428 }
429 }
430 }
431
432 private void updateBackup(DeviceId deviceId, List<StoredFlowEntry> toAdd) {
433 updateBackup(deviceId, toAdd, Collections.<FlowEntry>emptyList());
434 }
435
alshabib339a3d92014-09-26 17:54:32 -0700436 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700437 public void deleteFlowRule(FlowRule rule) {
438 storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule))));
alshabib339a3d92014-09-26 17:54:32 -0700439 }
440
441 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700442 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
443 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700444 final NodeId localId = clusterService.getLocalNode().id();
445 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700446 return addOrUpdateFlowRuleInternal(rule);
447 }
448
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700449 log.error("Tried to update FlowRule {} state,"
450 + " while the Node was not the master.", rule);
451 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700452 }
453
454 private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700455 final DeviceId did = rule.deviceId();
alshabib339a3d92014-09-26 17:54:32 -0700456
457 // check if this new rule is an update to an existing entry
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700458 StoredFlowEntry stored = getFlowEntryInternal(rule);
alshabib1c319ff2014-10-04 20:29:09 -0700459 if (stored != null) {
460 stored.setBytes(rule.bytes());
461 stored.setLife(rule.life());
462 stored.setPackets(rule.packets());
463 if (stored.state() == FlowEntryState.PENDING_ADD) {
464 stored.setState(FlowEntryState.ADDED);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700465 // update backup.
466 updateBackup(did, Arrays.asList(stored));
alshabib1c319ff2014-10-04 20:29:09 -0700467 return new FlowRuleEvent(Type.RULE_ADDED, rule);
468 }
alshabib339a3d92014-09-26 17:54:32 -0700469 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
470 }
471
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700472 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700473 // TODO: also update backup.
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700474 flowEntries.put(did, new DefaultFlowEntry(rule));
alshabib1c319ff2014-10-04 20:29:09 -0700475 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700476
alshabib339a3d92014-09-26 17:54:32 -0700477 }
478
479 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700480 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
481 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700482
483 final NodeId localId = clusterService.getLocalNode().id();
484 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700485 // bypass and handle it locally
486 return removeFlowRuleInternal(rule);
487 }
488
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700489 log.error("Tried to remove FlowRule {},"
490 + " while the Node was not the master.", rule);
491 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700492 }
493
494 private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700495 final DeviceId deviceId = rule.deviceId();
alshabib1c319ff2014-10-04 20:29:09 -0700496 // This is where one could mark a rule as removed and still keep it in the store.
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700497 final boolean removed = flowEntries.remove(deviceId, rule);
498 updateBackup(deviceId, Collections.<StoredFlowEntry>emptyList(), Arrays.asList(rule));
499 if (removed) {
alshabib339a3d92014-09-26 17:54:32 -0700500 return new FlowRuleEvent(RULE_REMOVED, rule);
501 } else {
502 return null;
503 }
alshabib339a3d92014-09-26 17:54:32 -0700504 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700505
506 @Override
507 public void batchOperationComplete(FlowRuleBatchEvent event) {
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700508 final Integer batchId = event.subject().batchId();
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700509 SettableFuture<CompletedBatchOperation> future
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700510 = pendingFutures.getIfPresent(batchId);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700511 if (future != null) {
512 future.set(event.result());
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700513 pendingFutures.invalidate(batchId);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700514 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700515 notifyDelegate(event);
516 }
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700517
518 private synchronized void loadFromBackup(final DeviceId did) {
519 // should relax synchronized condition
520
521 try {
522 log.info("Loading FlowRules for {} from backups", did);
523 SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(did);
524 for (Entry<FlowId, ImmutableList<StoredFlowEntry>> e
525 : backupFlowTable.entrySet()) {
526
527 // TODO: should we be directly updating internal structure or
528 // should we be triggering event?
529 log.debug("loading {}", e.getValue());
530 for (StoredFlowEntry entry : e.getValue()) {
531 flowEntries.remove(did, entry);
532 flowEntries.put(did, entry);
533 }
534 }
535 } catch (ExecutionException e) {
536 log.error("Failed to load backup flowtable for {}", did, e);
537 }
538 }
539
540 private synchronized void removeFromPrimary(final DeviceId did) {
541 Collection<StoredFlowEntry> removed = flowEntries.removeAll(did);
542 log.debug("removedFromPrimary {}", removed);
543 }
544
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800545 private static final class TimeoutFuture
546 implements RemovalListener<Integer, SettableFuture<CompletedBatchOperation>> {
547 @Override
548 public void onRemoval(RemovalNotification<Integer, SettableFuture<CompletedBatchOperation>> notification) {
549 // wrapping in ExecutionException to support Future.get
550 notification.getValue()
551 .setException(new ExecutionException("Timed out",
552 new TimeoutException()));
553 }
554 }
555
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700556 private final class OnStoreBatch implements ClusterMessageHandler {
557 private final NodeId local;
558
559 private OnStoreBatch(NodeId local) {
560 this.local = local;
561 }
562
563 @Override
564 public void handle(final ClusterMessage message) {
565 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
566 log.info("received batch request {}", operation);
567
568 final DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
569 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
570 if (!local.equals(replicaInfo.master().orNull())) {
571
572 Set<FlowRule> failures = new HashSet<>(operation.size());
573 for (FlowRuleBatchEntry op : operation.getOperations()) {
574 failures.add(op.getTarget());
575 }
576 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures);
577 // This node is no longer the master, respond as all failed.
578 // TODO: we might want to wrap response in envelope
579 // to distinguish sw programming failure and hand over
580 // it make sense in the latter case to retry immediately.
581 try {
582 message.respond(SERIALIZER.encode(allFailed));
583 } catch (IOException e) {
584 log.error("Failed to respond back", e);
585 }
586 return;
587 }
588
589 final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
590
591 f.addListener(new Runnable() {
592
593 @Override
594 public void run() {
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800595 CompletedBatchOperation result;
596 try {
597 result = f.get();
598 } catch (InterruptedException | ExecutionException e) {
599 log.error("Batch operation failed", e);
600 // create everything failed response
601 Set<FlowRule> failures = new HashSet<>(operation.size());
602 for (FlowRuleBatchEntry op : operation.getOperations()) {
603 failures.add(op.getTarget());
604 }
605 result = new CompletedBatchOperation(false, failures);
606 }
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700607 try {
608 message.respond(SERIALIZER.encode(result));
609 } catch (IOException e) {
610 log.error("Failed to respond back", e);
611 }
612 }
613 }, futureListeners);
614 }
615 }
616
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700617 private final class SMapLoader
618 extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
619
620 @Override
621 public SMap<FlowId, ImmutableList<StoredFlowEntry>> load(DeviceId id)
622 throws Exception {
623 IMap<byte[], byte[]> map = theInstance.getMap("flowtable_" + id.toString());
624 return new SMap<FlowId, ImmutableList<StoredFlowEntry>>(map, SERIALIZER);
625 }
626 }
627
628 private final class InternalReplicaInfoEventListener
629 implements ReplicaInfoEventListener {
630
631 @Override
632 public void event(ReplicaInfoEvent event) {
633 final NodeId local = clusterService.getLocalNode().id();
634 final DeviceId did = event.subject();
635 final ReplicaInfo rInfo = event.replicaInfo();
636
637 switch (event.type()) {
638 case MASTER_CHANGED:
639 if (local.equals(rInfo.master().orNull())) {
640 // This node is the new master, populate local structure
641 // from backup
642 loadFromBackup(did);
643 } else {
644 // This node is no longer the master holder,
645 // clean local structure
646 removeFromPrimary(did);
647 // FIXME: probably should stop pending backup activities in
648 // executors to avoid overwriting with old value
649 }
650 break;
651 default:
652 break;
653
654 }
655 }
656 }
657
658 // Task to update FlowEntries in backup HZ store
659 private final class UpdateBackup implements Runnable {
660
661 private final DeviceId deviceId;
662 private final List<StoredFlowEntry> toAdd;
663 private final List<? extends FlowRule> toRemove;
664
665 public UpdateBackup(DeviceId deviceId,
666 List<StoredFlowEntry> toAdd,
667 List<? extends FlowRule> list) {
668 this.deviceId = checkNotNull(deviceId);
669 this.toAdd = checkNotNull(toAdd);
670 this.toRemove = checkNotNull(list);
671 }
672
673 @Override
674 public void run() {
675 try {
676 log.debug("update backup {} +{} -{}", deviceId, toAdd, toRemove);
677 final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
678 // Following should be rewritten using async APIs
679 for (StoredFlowEntry entry : toAdd) {
680 final FlowId id = entry.id();
681 ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
682 List<StoredFlowEntry> list = new ArrayList<>();
683 if (original != null) {
684 list.addAll(original);
685 }
686
687 list.remove(entry);
688 list.add(entry);
689
690 ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
691 boolean success;
692 if (original == null) {
693 success = (backupFlowTable.putIfAbsent(id, newValue) == null);
694 } else {
695 success = backupFlowTable.replace(id, original, newValue);
696 }
697 // TODO retry?
698 if (!success) {
699 log.error("Updating backup failed.");
700 }
701 }
702 for (FlowRule entry : toRemove) {
703 final FlowId id = entry.id();
704 ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
705 List<StoredFlowEntry> list = new ArrayList<>();
706 if (original != null) {
707 list.addAll(original);
708 }
709
710 list.remove(entry);
711
712 ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
713 boolean success;
714 if (original == null) {
715 success = (backupFlowTable.putIfAbsent(id, newValue) == null);
716 } else {
717 success = backupFlowTable.replace(id, original, newValue);
718 }
719 // TODO retry?
720 if (!success) {
721 log.error("Updating backup failed.");
722 }
723 }
724 } catch (ExecutionException e) {
725 log.error("Failed to write to backups", e);
726 }
727
728 }
729 }
alshabib339a3d92014-09-26 17:54:32 -0700730}