blob: f65da3faa6e7a2ced984f7d49af4e953883c613c [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
alshabib339a3d92014-09-26 17:54:32 -070016package org.onlab.onos.store.flow.impl;
17
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070018import static com.google.common.base.Preconditions.checkNotNull;
alshabib339a3d92014-09-26 17:54:32 -070019import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
20import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070021import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070022import static org.onlab.util.Tools.namedThreads;
alshabib339a3d92014-09-26 17:54:32 -070023
Madan Jampani38b250d2014-10-17 11:02:38 -070024import java.io.IOException;
Madan Jampani117aaae2014-10-23 10:04:05 -070025import java.util.ArrayList;
26import java.util.Arrays;
alshabib339a3d92014-09-26 17:54:32 -070027import java.util.Collection;
28import java.util.Collections;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -070029import java.util.HashSet;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070030import java.util.Map.Entry;
Madan Jampanif5fdef02014-10-23 21:58:10 -070031import java.util.Set;
Madan Jampani24f9efb2014-10-24 18:56:23 -070032import java.util.concurrent.ExecutionException;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070033import java.util.concurrent.ExecutorService;
34import java.util.concurrent.Executors;
Madan Jampani117aaae2014-10-23 10:04:05 -070035import java.util.concurrent.Future;
Madan Jampani38b250d2014-10-17 11:02:38 -070036import java.util.concurrent.TimeUnit;
37import java.util.concurrent.TimeoutException;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070038import java.util.concurrent.atomic.AtomicInteger;
Madan Jampani117aaae2014-10-23 10:04:05 -070039import java.util.List;
alshabib339a3d92014-09-26 17:54:32 -070040
41import org.apache.felix.scr.annotations.Activate;
42import org.apache.felix.scr.annotations.Component;
43import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani38b250d2014-10-17 11:02:38 -070044import org.apache.felix.scr.annotations.Reference;
45import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib339a3d92014-09-26 17:54:32 -070046import org.apache.felix.scr.annotations.Service;
Madan Jampani38b250d2014-10-17 11:02:38 -070047import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070048import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070049import org.onlab.onos.net.Device;
alshabib339a3d92014-09-26 17:54:32 -070050import org.onlab.onos.net.DeviceId;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070051import org.onlab.onos.net.device.DeviceService;
Madan Jampani117aaae2014-10-23 10:04:05 -070052import org.onlab.onos.net.flow.CompletedBatchOperation;
alshabib1c319ff2014-10-04 20:29:09 -070053import org.onlab.onos.net.flow.DefaultFlowEntry;
54import org.onlab.onos.net.flow.FlowEntry;
55import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070056import org.onlab.onos.net.flow.FlowId;
alshabib339a3d92014-09-26 17:54:32 -070057import org.onlab.onos.net.flow.FlowRule;
Madan Jampani117aaae2014-10-23 10:04:05 -070058import org.onlab.onos.net.flow.FlowRuleBatchEntry;
59import org.onlab.onos.net.flow.FlowRuleBatchEvent;
60import org.onlab.onos.net.flow.FlowRuleBatchOperation;
61import org.onlab.onos.net.flow.FlowRuleBatchRequest;
alshabib339a3d92014-09-26 17:54:32 -070062import org.onlab.onos.net.flow.FlowRuleEvent;
Madan Jampani117aaae2014-10-23 10:04:05 -070063import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
alshabib339a3d92014-09-26 17:54:32 -070064import org.onlab.onos.net.flow.FlowRuleEvent.Type;
65import org.onlab.onos.net.flow.FlowRuleStore;
66import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070067import org.onlab.onos.net.flow.StoredFlowEntry;
Madan Jampani38b250d2014-10-17 11:02:38 -070068import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
69import org.onlab.onos.store.cluster.messaging.ClusterMessage;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070070import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani38b250d2014-10-17 11:02:38 -070071import org.onlab.onos.store.flow.ReplicaInfo;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070072import org.onlab.onos.store.flow.ReplicaInfoEvent;
73import org.onlab.onos.store.flow.ReplicaInfoEventListener;
Yuta HIGUCHIfe280eb2014-10-17 12:10:43 -070074import org.onlab.onos.store.flow.ReplicaInfoService;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070075import org.onlab.onos.store.hz.AbstractHazelcastStore;
76import org.onlab.onos.store.hz.SMap;
Yuta HIGUCHIea150152014-10-28 22:55:14 -070077import org.onlab.onos.store.serializers.DecodeTo;
Madan Jampani38b250d2014-10-17 11:02:38 -070078import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHIea150152014-10-28 22:55:14 -070079import org.onlab.onos.store.serializers.StoreSerializer;
Yuta HIGUCHI60a190b2014-11-07 16:24:47 -080080import org.onlab.onos.store.serializers.impl.DistributedStoreSerializers;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070081import org.onlab.util.KryoNamespace;
alshabib339a3d92014-09-26 17:54:32 -070082import org.slf4j.Logger;
83
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -070084import com.google.common.cache.Cache;
85import com.google.common.cache.CacheBuilder;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070086import com.google.common.cache.CacheLoader;
87import com.google.common.cache.LoadingCache;
alshabib339a3d92014-09-26 17:54:32 -070088import com.google.common.collect.ArrayListMultimap;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070089import com.google.common.collect.ImmutableList;
alshabib339a3d92014-09-26 17:54:32 -070090import com.google.common.collect.ImmutableSet;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070091import com.google.common.collect.Iterables;
alshabib339a3d92014-09-26 17:54:32 -070092import com.google.common.collect.Multimap;
Madan Jampani117aaae2014-10-23 10:04:05 -070093import com.google.common.util.concurrent.Futures;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070094import com.google.common.util.concurrent.ListenableFuture;
95import com.google.common.util.concurrent.SettableFuture;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070096import com.hazelcast.core.IMap;
alshabib339a3d92014-09-26 17:54:32 -070097
98/**
Madan Jampani38b250d2014-10-17 11:02:38 -070099 * Manages inventory of flow rules using a distributed state management protocol.
alshabib339a3d92014-09-26 17:54:32 -0700100 */
alshabib339a3d92014-09-26 17:54:32 -0700101@Component(immediate = true)
102@Service
103public class DistributedFlowRuleStore
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700104 extends AbstractHazelcastStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
alshabib1c319ff2014-10-04 20:29:09 -0700105 implements FlowRuleStore {
alshabib339a3d92014-09-26 17:54:32 -0700106
107 private final Logger log = getLogger(getClass());
108
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800109 // primary data:
110 // read/write needs to be synchronized
alshabib339a3d92014-09-26 17:54:32 -0700111 // store entries as a pile of rules, no info about device tables
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800112 private final Multimap<DeviceId, StoredFlowEntry> flowEntries
113 = ArrayListMultimap.<DeviceId, StoredFlowEntry>create();
alshabib339a3d92014-09-26 17:54:32 -0700114
Madan Jampani38b250d2014-10-17 11:02:38 -0700115 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700116 protected ReplicaInfoService replicaInfoManager;
Madan Jampani38b250d2014-10-17 11:02:38 -0700117
118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700119 protected ClusterCommunicationService clusterCommunicator;
Madan Jampani38b250d2014-10-17 11:02:38 -0700120
121 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700122 protected ClusterService clusterService;
123
124 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
125 protected DeviceService deviceService;
126
127 private final AtomicInteger localBatchIdGen = new AtomicInteger();
128
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700129 // TODO: make this configurable
130 private int pendingFutureTimeoutMinutes = 5;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700131
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700132 private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
133 CacheBuilder.newBuilder()
134 .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
135 // TODO Explicitly fail the future if expired?
136 //.removalListener(listener)
137 .build();
138
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800139 // Cache of SMaps used for backup data. each SMap contain device flow table
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700140 private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
141
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700142
143 private final ExecutorService futureListeners =
144 Executors.newCachedThreadPool(namedThreads("flowstore-peer-responders"));
145
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700146 private final ExecutorService backupExecutors =
147 Executors.newSingleThreadExecutor(namedThreads("async-backups"));
148
149 // TODO make this configurable
150 private boolean syncBackup = false;
Madan Jampani38b250d2014-10-17 11:02:38 -0700151
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700152 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
Madan Jampani38b250d2014-10-17 11:02:38 -0700153 @Override
154 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700155 serializerPool = KryoNamespace.newBuilder()
Madan Jampani38b250d2014-10-17 11:02:38 -0700156 .register(DistributedStoreSerializers.COMMON)
157 .build()
158 .populate(1);
159 }
160 };
161
162 // TODO: make this configurable
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700163 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Madan Jampani38b250d2014-10-17 11:02:38 -0700164
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700165 private ReplicaInfoEventListener replicaInfoEventListener;
166
167 @Override
alshabib339a3d92014-09-26 17:54:32 -0700168 @Activate
169 public void activate() {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700170
171 super.serializer = SERIALIZER;
172 super.theInstance = storeService.getHazelcastInstance();
173
174 // Cache to create SMap on demand
175 smaps = CacheBuilder.newBuilder()
176 .softValues()
177 .build(new SMapLoader());
178
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700179 final NodeId local = clusterService.getLocalNode().id();
180
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700181 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local));
Madan Jampani117aaae2014-10-23 10:04:05 -0700182
183 clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
184
185 @Override
186 public void handle(ClusterMessage message) {
187 FlowRule rule = SERIALIZER.decode(message.payload());
188 log.info("received get flow entry request for {}", rule);
189 FlowEntry flowEntry = getFlowEntryInternal(rule);
190 try {
191 message.respond(SERIALIZER.encode(flowEntry));
192 } catch (IOException e) {
193 log.error("Failed to respond back", e);
194 }
195 }
196 });
197
Madan Jampanif5fdef02014-10-23 21:58:10 -0700198 clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
199
200 @Override
201 public void handle(ClusterMessage message) {
202 DeviceId deviceId = SERIALIZER.decode(message.payload());
203 log.info("Received get flow entries request for {} from {}", deviceId, message.sender());
204 Set<FlowEntry> flowEntries = getFlowEntriesInternal(deviceId);
205 try {
206 message.respond(SERIALIZER.encode(flowEntries));
207 } catch (IOException e) {
208 log.error("Failed to respond to peer's getFlowEntries request", e);
209 }
210 }
211 });
212
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700213 replicaInfoEventListener = new InternalReplicaInfoEventListener();
214
215 replicaInfoManager.addListener(replicaInfoEventListener);
216
alshabib339a3d92014-09-26 17:54:32 -0700217 log.info("Started");
218 }
219
220 @Deactivate
221 public void deactivate() {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700222 replicaInfoManager.removeListener(replicaInfoEventListener);
alshabib339a3d92014-09-26 17:54:32 -0700223 log.info("Stopped");
224 }
225
226
Madan Jampani117aaae2014-10-23 10:04:05 -0700227 // TODO: This is not a efficient operation on a distributed sharded
228 // flow store. We need to revisit the need for this operation or at least
229 // make it device specific.
alshabib339a3d92014-09-26 17:54:32 -0700230 @Override
tom9b4030d2014-10-06 10:39:03 -0700231 public int getFlowRuleCount() {
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700232 // implementing in-efficient operation for debugging purpose.
233 int sum = 0;
234 for (Device device : deviceService.getDevices()) {
235 final DeviceId did = device.id();
236 sum += Iterables.size(getFlowEntries(did));
237 }
238 return sum;
tom9b4030d2014-10-06 10:39:03 -0700239 }
240
241 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700242 public synchronized FlowEntry getFlowEntry(FlowRule rule) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700243 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700244
245 if (!replicaInfo.master().isPresent()) {
246 log.warn("No master for {}", rule);
247 // TODO: revisit if this should be returning null.
248 // FIXME: throw a FlowStoreException
249 throw new RuntimeException("No master for " + rule);
250 }
251
Madan Jampani117aaae2014-10-23 10:04:05 -0700252 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
253 return getFlowEntryInternal(rule);
254 }
255
256 log.info("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
257 replicaInfo.master().orNull(), rule.deviceId());
258
259 ClusterMessage message = new ClusterMessage(
260 clusterService.getLocalNode().id(),
261 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
262 SERIALIZER.encode(rule));
263
264 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700265 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
266 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
267 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700268 // FIXME: throw a FlowStoreException
269 throw new RuntimeException(e);
270 }
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700271 }
272
273 private synchronized StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
274 for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) {
alshabib339a3d92014-09-26 17:54:32 -0700275 if (f.equals(rule)) {
276 return f;
277 }
278 }
279 return null;
280 }
281
282 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700283 public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700284
285 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700286
287 if (!replicaInfo.master().isPresent()) {
288 log.warn("No master for {}", deviceId);
Yuta HIGUCHI2c1d8472014-10-31 14:13:38 -0700289 // TODO: revisit if this should be returning empty collection or throwing exception.
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700290 // FIXME: throw a FlowStoreException
Yuta HIGUCHI2c1d8472014-10-31 14:13:38 -0700291 //throw new RuntimeException("No master for " + deviceId);
292 return Collections.emptyList();
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700293 }
294
Madan Jampanif5fdef02014-10-23 21:58:10 -0700295 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
296 return getFlowEntriesInternal(deviceId);
297 }
298
299 log.info("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
300 replicaInfo.master().orNull(), deviceId);
301
302 ClusterMessage message = new ClusterMessage(
303 clusterService.getLocalNode().id(),
304 GET_DEVICE_FLOW_ENTRIES,
305 SERIALIZER.encode(deviceId));
306
307 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700308 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
309 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
310 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700311 // FIXME: throw a FlowStoreException
312 throw new RuntimeException(e);
313 }
314 }
315
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800316 private synchronized Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700317 Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
alshabib339a3d92014-09-26 17:54:32 -0700318 if (rules == null) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700319 return Collections.emptySet();
alshabib339a3d92014-09-26 17:54:32 -0700320 }
321 return ImmutableSet.copyOf(rules);
322 }
323
324 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700325 public void storeFlowRule(FlowRule rule) {
326 storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
327 }
328
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700329 // FIXME document that all of the FlowEntries must be about same device
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700330 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700331 public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700332
Madan Jampani117aaae2014-10-23 10:04:05 -0700333 if (operation.getOperations().isEmpty()) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700334 return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
alshabib339a3d92014-09-26 17:54:32 -0700335 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700336
Madan Jampani117aaae2014-10-23 10:04:05 -0700337 DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
338
339 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
340
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700341 if (!replicaInfo.master().isPresent()) {
342 log.warn("No master for {}", deviceId);
343 // TODO: revisit if this should be "success" from Future point of view
344 // with every FlowEntry failed
345 return Futures.immediateFailedFuture(new IOException("No master to forward to"));
346 }
347
348 final NodeId local = clusterService.getLocalNode().id();
349 if (replicaInfo.master().get().equals(local)) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700350 return storeBatchInternal(operation);
351 }
352
353 log.info("Forwarding storeBatch to {}, which is the primary (master) for device {}",
354 replicaInfo.master().orNull(), deviceId);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700355
Madan Jampani38b250d2014-10-17 11:02:38 -0700356 ClusterMessage message = new ClusterMessage(
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700357 local,
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700358 APPLY_BATCH_FLOWS,
Madan Jampani117aaae2014-10-23 10:04:05 -0700359 SERIALIZER.encode(operation));
Madan Jampani38b250d2014-10-17 11:02:38 -0700360
361 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700362 ListenableFuture<byte[]> responseFuture =
363 clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700364 return Futures.transform(responseFuture, new DecodeTo<CompletedBatchOperation>(SERIALIZER));
Madan Jampani24f9efb2014-10-24 18:56:23 -0700365 } catch (IOException e) {
366 return Futures.immediateFailedFuture(e);
Madan Jampani38b250d2014-10-17 11:02:38 -0700367 }
368 }
369
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800370 private synchronized ListenableFuture<CompletedBatchOperation>
371 storeBatchInternal(FlowRuleBatchOperation operation) {
372
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700373 final List<StoredFlowEntry> toRemove = new ArrayList<>();
374 final List<StoredFlowEntry> toAdd = new ArrayList<>();
375 DeviceId did = null;
376
Madan Jampani117aaae2014-10-23 10:04:05 -0700377 for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
378 FlowRule flowRule = batchEntry.getTarget();
379 FlowRuleOperation op = batchEntry.getOperator();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700380 if (did == null) {
381 did = flowRule.deviceId();
382 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700383 if (op.equals(FlowRuleOperation.REMOVE)) {
384 StoredFlowEntry entry = getFlowEntryInternal(flowRule);
385 if (entry != null) {
386 entry.setState(FlowEntryState.PENDING_REMOVE);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700387 toRemove.add(entry);
Madan Jampani117aaae2014-10-23 10:04:05 -0700388 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700389 } else if (op.equals(FlowRuleOperation.ADD)) {
390 StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
391 DeviceId deviceId = flowRule.deviceId();
392 if (!flowEntries.containsEntry(deviceId, flowEntry)) {
393 flowEntries.put(deviceId, flowEntry);
Madan Jampani117aaae2014-10-23 10:04:05 -0700394 toAdd.add(flowEntry);
395 }
396 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700397 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700398 if (toAdd.isEmpty() && toRemove.isEmpty()) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700399 return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
Madan Jampani117aaae2014-10-23 10:04:05 -0700400 }
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700401
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700402 // create remote backup copies
403 final DeviceId deviceId = did;
404 updateBackup(deviceId, toAdd, toRemove);
405
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700406 SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
407 final int batchId = localBatchIdGen.incrementAndGet();
408
409 pendingFutures.put(batchId, r);
410 notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700411
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700412 return r;
alshabib339a3d92014-09-26 17:54:32 -0700413 }
414
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700415 private void updateBackup(final DeviceId deviceId,
416 final List<StoredFlowEntry> toAdd,
417 final List<? extends FlowRule> list) {
418
419 Future<?> submit = backupExecutors.submit(new UpdateBackup(deviceId, toAdd, list));
420
421 if (syncBackup) {
422 // wait for backup to complete
423 try {
424 submit.get();
425 } catch (InterruptedException | ExecutionException e) {
426 log.error("Failed to create backups", e);
427 }
428 }
429 }
430
431 private void updateBackup(DeviceId deviceId, List<StoredFlowEntry> toAdd) {
432 updateBackup(deviceId, toAdd, Collections.<FlowEntry>emptyList());
433 }
434
alshabib339a3d92014-09-26 17:54:32 -0700435 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700436 public void deleteFlowRule(FlowRule rule) {
437 storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule))));
alshabib339a3d92014-09-26 17:54:32 -0700438 }
439
440 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700441 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
442 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700443 final NodeId localId = clusterService.getLocalNode().id();
444 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700445 return addOrUpdateFlowRuleInternal(rule);
446 }
447
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700448 log.error("Tried to update FlowRule {} state,"
449 + " while the Node was not the master.", rule);
450 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700451 }
452
453 private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700454 final DeviceId did = rule.deviceId();
alshabib339a3d92014-09-26 17:54:32 -0700455
456 // check if this new rule is an update to an existing entry
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700457 StoredFlowEntry stored = getFlowEntryInternal(rule);
alshabib1c319ff2014-10-04 20:29:09 -0700458 if (stored != null) {
459 stored.setBytes(rule.bytes());
460 stored.setLife(rule.life());
461 stored.setPackets(rule.packets());
462 if (stored.state() == FlowEntryState.PENDING_ADD) {
463 stored.setState(FlowEntryState.ADDED);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700464 // update backup.
465 updateBackup(did, Arrays.asList(stored));
alshabib1c319ff2014-10-04 20:29:09 -0700466 return new FlowRuleEvent(Type.RULE_ADDED, rule);
467 }
alshabib339a3d92014-09-26 17:54:32 -0700468 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
469 }
470
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700471 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700472 // TODO: also update backup.
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700473 flowEntries.put(did, new DefaultFlowEntry(rule));
alshabib1c319ff2014-10-04 20:29:09 -0700474 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700475
alshabib339a3d92014-09-26 17:54:32 -0700476 }
477
478 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700479 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
480 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700481
482 final NodeId localId = clusterService.getLocalNode().id();
483 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700484 // bypass and handle it locally
485 return removeFlowRuleInternal(rule);
486 }
487
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700488 log.error("Tried to remove FlowRule {},"
489 + " while the Node was not the master.", rule);
490 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700491 }
492
493 private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700494 final DeviceId deviceId = rule.deviceId();
alshabib1c319ff2014-10-04 20:29:09 -0700495 // This is where one could mark a rule as removed and still keep it in the store.
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700496 final boolean removed = flowEntries.remove(deviceId, rule);
497 updateBackup(deviceId, Collections.<StoredFlowEntry>emptyList(), Arrays.asList(rule));
498 if (removed) {
alshabib339a3d92014-09-26 17:54:32 -0700499 return new FlowRuleEvent(RULE_REMOVED, rule);
500 } else {
501 return null;
502 }
alshabib339a3d92014-09-26 17:54:32 -0700503 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700504
505 @Override
506 public void batchOperationComplete(FlowRuleBatchEvent event) {
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700507 final Integer batchId = event.subject().batchId();
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700508 SettableFuture<CompletedBatchOperation> future
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700509 = pendingFutures.getIfPresent(batchId);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700510 if (future != null) {
511 future.set(event.result());
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700512 pendingFutures.invalidate(batchId);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700513 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700514 notifyDelegate(event);
515 }
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700516
517 private synchronized void loadFromBackup(final DeviceId did) {
518 // should relax synchronized condition
519
520 try {
521 log.info("Loading FlowRules for {} from backups", did);
522 SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(did);
523 for (Entry<FlowId, ImmutableList<StoredFlowEntry>> e
524 : backupFlowTable.entrySet()) {
525
526 // TODO: should we be directly updating internal structure or
527 // should we be triggering event?
528 log.debug("loading {}", e.getValue());
529 for (StoredFlowEntry entry : e.getValue()) {
530 flowEntries.remove(did, entry);
531 flowEntries.put(did, entry);
532 }
533 }
534 } catch (ExecutionException e) {
535 log.error("Failed to load backup flowtable for {}", did, e);
536 }
537 }
538
539 private synchronized void removeFromPrimary(final DeviceId did) {
540 Collection<StoredFlowEntry> removed = flowEntries.removeAll(did);
541 log.debug("removedFromPrimary {}", removed);
542 }
543
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700544 private final class OnStoreBatch implements ClusterMessageHandler {
545 private final NodeId local;
546
547 private OnStoreBatch(NodeId local) {
548 this.local = local;
549 }
550
551 @Override
552 public void handle(final ClusterMessage message) {
553 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
554 log.info("received batch request {}", operation);
555
556 final DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
557 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
558 if (!local.equals(replicaInfo.master().orNull())) {
559
560 Set<FlowRule> failures = new HashSet<>(operation.size());
561 for (FlowRuleBatchEntry op : operation.getOperations()) {
562 failures.add(op.getTarget());
563 }
564 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures);
565 // This node is no longer the master, respond as all failed.
566 // TODO: we might want to wrap response in envelope
567 // to distinguish sw programming failure and hand over
568 // it make sense in the latter case to retry immediately.
569 try {
570 message.respond(SERIALIZER.encode(allFailed));
571 } catch (IOException e) {
572 log.error("Failed to respond back", e);
573 }
574 return;
575 }
576
577 final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
578
579 f.addListener(new Runnable() {
580
581 @Override
582 public void run() {
583 CompletedBatchOperation result = Futures.getUnchecked(f);
584 try {
585 message.respond(SERIALIZER.encode(result));
586 } catch (IOException e) {
587 log.error("Failed to respond back", e);
588 }
589 }
590 }, futureListeners);
591 }
592 }
593
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700594 private final class SMapLoader
595 extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
596
597 @Override
598 public SMap<FlowId, ImmutableList<StoredFlowEntry>> load(DeviceId id)
599 throws Exception {
600 IMap<byte[], byte[]> map = theInstance.getMap("flowtable_" + id.toString());
601 return new SMap<FlowId, ImmutableList<StoredFlowEntry>>(map, SERIALIZER);
602 }
603 }
604
605 private final class InternalReplicaInfoEventListener
606 implements ReplicaInfoEventListener {
607
608 @Override
609 public void event(ReplicaInfoEvent event) {
610 final NodeId local = clusterService.getLocalNode().id();
611 final DeviceId did = event.subject();
612 final ReplicaInfo rInfo = event.replicaInfo();
613
614 switch (event.type()) {
615 case MASTER_CHANGED:
616 if (local.equals(rInfo.master().orNull())) {
617 // This node is the new master, populate local structure
618 // from backup
619 loadFromBackup(did);
620 } else {
621 // This node is no longer the master holder,
622 // clean local structure
623 removeFromPrimary(did);
624 // FIXME: probably should stop pending backup activities in
625 // executors to avoid overwriting with old value
626 }
627 break;
628 default:
629 break;
630
631 }
632 }
633 }
634
635 // Task to update FlowEntries in backup HZ store
636 private final class UpdateBackup implements Runnable {
637
638 private final DeviceId deviceId;
639 private final List<StoredFlowEntry> toAdd;
640 private final List<? extends FlowRule> toRemove;
641
642 public UpdateBackup(DeviceId deviceId,
643 List<StoredFlowEntry> toAdd,
644 List<? extends FlowRule> list) {
645 this.deviceId = checkNotNull(deviceId);
646 this.toAdd = checkNotNull(toAdd);
647 this.toRemove = checkNotNull(list);
648 }
649
650 @Override
651 public void run() {
652 try {
653 log.debug("update backup {} +{} -{}", deviceId, toAdd, toRemove);
654 final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
655 // Following should be rewritten using async APIs
656 for (StoredFlowEntry entry : toAdd) {
657 final FlowId id = entry.id();
658 ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
659 List<StoredFlowEntry> list = new ArrayList<>();
660 if (original != null) {
661 list.addAll(original);
662 }
663
664 list.remove(entry);
665 list.add(entry);
666
667 ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
668 boolean success;
669 if (original == null) {
670 success = (backupFlowTable.putIfAbsent(id, newValue) == null);
671 } else {
672 success = backupFlowTable.replace(id, original, newValue);
673 }
674 // TODO retry?
675 if (!success) {
676 log.error("Updating backup failed.");
677 }
678 }
679 for (FlowRule entry : toRemove) {
680 final FlowId id = entry.id();
681 ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
682 List<StoredFlowEntry> list = new ArrayList<>();
683 if (original != null) {
684 list.addAll(original);
685 }
686
687 list.remove(entry);
688
689 ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
690 boolean success;
691 if (original == null) {
692 success = (backupFlowTable.putIfAbsent(id, newValue) == null);
693 } else {
694 success = backupFlowTable.replace(id, original, newValue);
695 }
696 // TODO retry?
697 if (!success) {
698 log.error("Updating backup failed.");
699 }
700 }
701 } catch (ExecutionException e) {
702 log.error("Failed to write to backups", e);
703 }
704
705 }
706 }
alshabib339a3d92014-09-26 17:54:32 -0700707}