blob: 6ab5e12f220c01a1a9a1ee48e093bbec67e1d9ef [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
alshabib339a3d92014-09-26 17:54:32 -070016package org.onlab.onos.store.flow.impl;
17
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070018import static com.google.common.base.Preconditions.checkNotNull;
alshabib339a3d92014-09-26 17:54:32 -070019import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
20import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070021import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070022import static org.onlab.util.Tools.namedThreads;
alshabib339a3d92014-09-26 17:54:32 -070023
Madan Jampani38b250d2014-10-17 11:02:38 -070024import java.io.IOException;
Madan Jampani117aaae2014-10-23 10:04:05 -070025import java.util.ArrayList;
26import java.util.Arrays;
alshabib339a3d92014-09-26 17:54:32 -070027import java.util.Collection;
28import java.util.Collections;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -070029import java.util.HashSet;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070030import java.util.Map.Entry;
Madan Jampanif5fdef02014-10-23 21:58:10 -070031import java.util.Set;
Madan Jampani24f9efb2014-10-24 18:56:23 -070032import java.util.concurrent.ExecutionException;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070033import java.util.concurrent.ExecutorService;
34import java.util.concurrent.Executors;
Madan Jampani117aaae2014-10-23 10:04:05 -070035import java.util.concurrent.Future;
Madan Jampani38b250d2014-10-17 11:02:38 -070036import java.util.concurrent.TimeUnit;
37import java.util.concurrent.TimeoutException;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070038import java.util.concurrent.atomic.AtomicInteger;
Yuta HIGUCHI885868f2014-11-13 19:12:07 -080039import java.util.concurrent.locks.ReentrantReadWriteLock;
Madan Jampani117aaae2014-10-23 10:04:05 -070040import java.util.List;
alshabib339a3d92014-09-26 17:54:32 -070041
42import org.apache.felix.scr.annotations.Activate;
43import org.apache.felix.scr.annotations.Component;
44import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani38b250d2014-10-17 11:02:38 -070045import org.apache.felix.scr.annotations.Reference;
46import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib339a3d92014-09-26 17:54:32 -070047import org.apache.felix.scr.annotations.Service;
Madan Jampani38b250d2014-10-17 11:02:38 -070048import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070049import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070050import org.onlab.onos.net.Device;
alshabib339a3d92014-09-26 17:54:32 -070051import org.onlab.onos.net.DeviceId;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070052import org.onlab.onos.net.device.DeviceService;
Madan Jampani117aaae2014-10-23 10:04:05 -070053import org.onlab.onos.net.flow.CompletedBatchOperation;
alshabib1c319ff2014-10-04 20:29:09 -070054import org.onlab.onos.net.flow.DefaultFlowEntry;
55import org.onlab.onos.net.flow.FlowEntry;
56import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070057import org.onlab.onos.net.flow.FlowId;
alshabib339a3d92014-09-26 17:54:32 -070058import org.onlab.onos.net.flow.FlowRule;
Madan Jampani117aaae2014-10-23 10:04:05 -070059import org.onlab.onos.net.flow.FlowRuleBatchEntry;
60import org.onlab.onos.net.flow.FlowRuleBatchEvent;
61import org.onlab.onos.net.flow.FlowRuleBatchOperation;
62import org.onlab.onos.net.flow.FlowRuleBatchRequest;
alshabib339a3d92014-09-26 17:54:32 -070063import org.onlab.onos.net.flow.FlowRuleEvent;
Madan Jampani117aaae2014-10-23 10:04:05 -070064import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
alshabib339a3d92014-09-26 17:54:32 -070065import org.onlab.onos.net.flow.FlowRuleEvent.Type;
66import org.onlab.onos.net.flow.FlowRuleStore;
67import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070068import org.onlab.onos.net.flow.StoredFlowEntry;
Madan Jampani38b250d2014-10-17 11:02:38 -070069import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
70import org.onlab.onos.store.cluster.messaging.ClusterMessage;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070071import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani38b250d2014-10-17 11:02:38 -070072import org.onlab.onos.store.flow.ReplicaInfo;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070073import org.onlab.onos.store.flow.ReplicaInfoEvent;
74import org.onlab.onos.store.flow.ReplicaInfoEventListener;
Yuta HIGUCHIfe280eb2014-10-17 12:10:43 -070075import org.onlab.onos.store.flow.ReplicaInfoService;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070076import org.onlab.onos.store.hz.AbstractHazelcastStore;
77import org.onlab.onos.store.hz.SMap;
Yuta HIGUCHIea150152014-10-28 22:55:14 -070078import org.onlab.onos.store.serializers.DecodeTo;
Madan Jampani38b250d2014-10-17 11:02:38 -070079import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHIea150152014-10-28 22:55:14 -070080import org.onlab.onos.store.serializers.StoreSerializer;
Yuta HIGUCHI60a190b2014-11-07 16:24:47 -080081import org.onlab.onos.store.serializers.impl.DistributedStoreSerializers;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070082import org.onlab.util.KryoNamespace;
alshabib339a3d92014-09-26 17:54:32 -070083import org.slf4j.Logger;
84
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -070085import com.google.common.cache.Cache;
86import com.google.common.cache.CacheBuilder;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070087import com.google.common.cache.CacheLoader;
88import com.google.common.cache.LoadingCache;
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -080089import com.google.common.cache.RemovalListener;
90import com.google.common.cache.RemovalNotification;
alshabib339a3d92014-09-26 17:54:32 -070091import com.google.common.collect.ArrayListMultimap;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070092import com.google.common.collect.ImmutableList;
alshabib339a3d92014-09-26 17:54:32 -070093import com.google.common.collect.ImmutableSet;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070094import com.google.common.collect.Iterables;
alshabib339a3d92014-09-26 17:54:32 -070095import com.google.common.collect.Multimap;
Madan Jampani117aaae2014-10-23 10:04:05 -070096import com.google.common.util.concurrent.Futures;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070097import com.google.common.util.concurrent.ListenableFuture;
98import com.google.common.util.concurrent.SettableFuture;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070099import com.hazelcast.core.IMap;
alshabib339a3d92014-09-26 17:54:32 -0700100
101/**
Madan Jampani38b250d2014-10-17 11:02:38 -0700102 * Manages inventory of flow rules using a distributed state management protocol.
alshabib339a3d92014-09-26 17:54:32 -0700103 */
alshabib339a3d92014-09-26 17:54:32 -0700104@Component(immediate = true)
105@Service
106public class DistributedFlowRuleStore
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700107 extends AbstractHazelcastStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
alshabib1c319ff2014-10-04 20:29:09 -0700108 implements FlowRuleStore {
alshabib339a3d92014-09-26 17:54:32 -0700109
110 private final Logger log = getLogger(getClass());
111
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800112 // primary data:
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800113 // read/write needs to be locked
114 private final ReentrantReadWriteLock flowEntriesLock = new ReentrantReadWriteLock();
alshabib339a3d92014-09-26 17:54:32 -0700115 // store entries as a pile of rules, no info about device tables
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800116 private final Multimap<DeviceId, StoredFlowEntry> flowEntries
117 = ArrayListMultimap.<DeviceId, StoredFlowEntry>create();
alshabib339a3d92014-09-26 17:54:32 -0700118
Madan Jampani38b250d2014-10-17 11:02:38 -0700119 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700120 protected ReplicaInfoService replicaInfoManager;
Madan Jampani38b250d2014-10-17 11:02:38 -0700121
122 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700123 protected ClusterCommunicationService clusterCommunicator;
Madan Jampani38b250d2014-10-17 11:02:38 -0700124
125 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700126 protected ClusterService clusterService;
127
128 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
129 protected DeviceService deviceService;
130
131 private final AtomicInteger localBatchIdGen = new AtomicInteger();
132
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700133 // TODO: make this configurable
134 private int pendingFutureTimeoutMinutes = 5;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700135
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700136 private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
137 CacheBuilder.newBuilder()
138 .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800139 .removalListener(new TimeoutFuture())
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700140 .build();
141
    // Cache of SMaps used for backup data; each SMap contains one device's flow table.
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700143 private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
144
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700145
146 private final ExecutorService futureListeners =
147 Executors.newCachedThreadPool(namedThreads("flowstore-peer-responders"));
148
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700149 private final ExecutorService backupExecutors =
150 Executors.newSingleThreadExecutor(namedThreads("async-backups"));
151
152 // TODO make this configurable
153 private boolean syncBackup = false;
Madan Jampani38b250d2014-10-17 11:02:38 -0700154
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700155 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
Madan Jampani38b250d2014-10-17 11:02:38 -0700156 @Override
157 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700158 serializerPool = KryoNamespace.newBuilder()
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800159 .register(DistributedStoreSerializers.STORE_COMMON)
160 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
161 .build();
Madan Jampani38b250d2014-10-17 11:02:38 -0700162 }
163 };
164
165 // TODO: make this configurable
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700166 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Madan Jampani38b250d2014-10-17 11:02:38 -0700167
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700168 private ReplicaInfoEventListener replicaInfoEventListener;
169
170 @Override
alshabib339a3d92014-09-26 17:54:32 -0700171 @Activate
172 public void activate() {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700173
174 super.serializer = SERIALIZER;
175 super.theInstance = storeService.getHazelcastInstance();
176
177 // Cache to create SMap on demand
178 smaps = CacheBuilder.newBuilder()
179 .softValues()
180 .build(new SMapLoader());
181
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700182 final NodeId local = clusterService.getLocalNode().id();
183
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700184 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local));
Madan Jampani117aaae2014-10-23 10:04:05 -0700185
186 clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
187
188 @Override
189 public void handle(ClusterMessage message) {
190 FlowRule rule = SERIALIZER.decode(message.payload());
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800191 log.trace("received get flow entry request for {}", rule);
Madan Jampani117aaae2014-10-23 10:04:05 -0700192 FlowEntry flowEntry = getFlowEntryInternal(rule);
193 try {
194 message.respond(SERIALIZER.encode(flowEntry));
195 } catch (IOException e) {
196 log.error("Failed to respond back", e);
197 }
198 }
199 });
200
Madan Jampanif5fdef02014-10-23 21:58:10 -0700201 clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
202
203 @Override
204 public void handle(ClusterMessage message) {
205 DeviceId deviceId = SERIALIZER.decode(message.payload());
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800206 log.trace("Received get flow entries request for {} from {}", deviceId, message.sender());
Madan Jampanif5fdef02014-10-23 21:58:10 -0700207 Set<FlowEntry> flowEntries = getFlowEntriesInternal(deviceId);
208 try {
209 message.respond(SERIALIZER.encode(flowEntries));
210 } catch (IOException e) {
211 log.error("Failed to respond to peer's getFlowEntries request", e);
212 }
213 }
214 });
215
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700216 replicaInfoEventListener = new InternalReplicaInfoEventListener();
217
218 replicaInfoManager.addListener(replicaInfoEventListener);
219
alshabib339a3d92014-09-26 17:54:32 -0700220 log.info("Started");
221 }
222
223 @Deactivate
224 public void deactivate() {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700225 replicaInfoManager.removeListener(replicaInfoEventListener);
alshabib339a3d92014-09-26 17:54:32 -0700226 log.info("Stopped");
227 }
228
229
Madan Jampani117aaae2014-10-23 10:04:05 -0700230 // TODO: This is not a efficient operation on a distributed sharded
231 // flow store. We need to revisit the need for this operation or at least
232 // make it device specific.
alshabib339a3d92014-09-26 17:54:32 -0700233 @Override
tom9b4030d2014-10-06 10:39:03 -0700234 public int getFlowRuleCount() {
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700235 // implementing in-efficient operation for debugging purpose.
236 int sum = 0;
237 for (Device device : deviceService.getDevices()) {
238 final DeviceId did = device.id();
239 sum += Iterables.size(getFlowEntries(did));
240 }
241 return sum;
tom9b4030d2014-10-06 10:39:03 -0700242 }
243
244 @Override
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800245 public FlowEntry getFlowEntry(FlowRule rule) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700246 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700247
248 if (!replicaInfo.master().isPresent()) {
249 log.warn("No master for {}", rule);
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800250 // TODO: should we try returning from backup?
251 return null;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700252 }
253
Madan Jampani117aaae2014-10-23 10:04:05 -0700254 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
255 return getFlowEntryInternal(rule);
256 }
257
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800258 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
Madan Jampani117aaae2014-10-23 10:04:05 -0700259 replicaInfo.master().orNull(), rule.deviceId());
260
261 ClusterMessage message = new ClusterMessage(
262 clusterService.getLocalNode().id(),
263 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
264 SERIALIZER.encode(rule));
265
266 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700267 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
268 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
269 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700270 // FIXME: throw a FlowStoreException
271 throw new RuntimeException(e);
272 }
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700273 }
274
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800275 private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
276 flowEntriesLock.readLock().lock();
277 try {
278 for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) {
279 if (f.equals(rule)) {
280 return f;
281 }
alshabib339a3d92014-09-26 17:54:32 -0700282 }
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800283 } finally {
284 flowEntriesLock.readLock().unlock();
alshabib339a3d92014-09-26 17:54:32 -0700285 }
286 return null;
287 }
288
289 @Override
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800290 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700291
292 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700293
294 if (!replicaInfo.master().isPresent()) {
295 log.warn("No master for {}", deviceId);
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800296 // TODO: should we try returning from backup?
Yuta HIGUCHI2c1d8472014-10-31 14:13:38 -0700297 return Collections.emptyList();
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700298 }
299
Madan Jampanif5fdef02014-10-23 21:58:10 -0700300 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
301 return getFlowEntriesInternal(deviceId);
302 }
303
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800304 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
Madan Jampanif5fdef02014-10-23 21:58:10 -0700305 replicaInfo.master().orNull(), deviceId);
306
307 ClusterMessage message = new ClusterMessage(
308 clusterService.getLocalNode().id(),
309 GET_DEVICE_FLOW_ENTRIES,
310 SERIALIZER.encode(deviceId));
311
312 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700313 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
314 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
315 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700316 // FIXME: throw a FlowStoreException
317 throw new RuntimeException(e);
318 }
319 }
320
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800321 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
322 flowEntriesLock.readLock().lock();
323 try {
324 Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
325 if (rules == null) {
326 return Collections.emptySet();
327 }
328 return ImmutableSet.copyOf(rules);
329 } finally {
330 flowEntriesLock.readLock().unlock();
alshabib339a3d92014-09-26 17:54:32 -0700331 }
alshabib339a3d92014-09-26 17:54:32 -0700332 }
333
334 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700335 public void storeFlowRule(FlowRule rule) {
336 storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
337 }
338
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700339 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700340 public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700341
Madan Jampani117aaae2014-10-23 10:04:05 -0700342 if (operation.getOperations().isEmpty()) {
Brian O'Connor427a1762014-11-19 18:40:32 -0800343 return Futures.immediateFuture(new CompletedBatchOperation(true,
344 Collections.<FlowRule>emptySet()));
alshabib339a3d92014-09-26 17:54:32 -0700345 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700346
Madan Jampani117aaae2014-10-23 10:04:05 -0700347 DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
348
349 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
350
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700351 if (!replicaInfo.master().isPresent()) {
352 log.warn("No master for {}", deviceId);
353 // TODO: revisit if this should be "success" from Future point of view
354 // with every FlowEntry failed
355 return Futures.immediateFailedFuture(new IOException("No master to forward to"));
356 }
357
358 final NodeId local = clusterService.getLocalNode().id();
359 if (replicaInfo.master().get().equals(local)) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700360 return storeBatchInternal(operation);
361 }
362
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800363 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
Madan Jampani117aaae2014-10-23 10:04:05 -0700364 replicaInfo.master().orNull(), deviceId);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700365
Madan Jampani38b250d2014-10-17 11:02:38 -0700366 ClusterMessage message = new ClusterMessage(
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700367 local,
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700368 APPLY_BATCH_FLOWS,
Madan Jampani117aaae2014-10-23 10:04:05 -0700369 SERIALIZER.encode(operation));
Madan Jampani38b250d2014-10-17 11:02:38 -0700370
371 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700372 ListenableFuture<byte[]> responseFuture =
373 clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700374 return Futures.transform(responseFuture, new DecodeTo<CompletedBatchOperation>(SERIALIZER));
Madan Jampani24f9efb2014-10-24 18:56:23 -0700375 } catch (IOException e) {
376 return Futures.immediateFailedFuture(e);
Madan Jampani38b250d2014-10-17 11:02:38 -0700377 }
378 }
379
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800380 private ListenableFuture<CompletedBatchOperation>
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800381 storeBatchInternal(FlowRuleBatchOperation operation) {
382
Brian O'Connor427a1762014-11-19 18:40:32 -0800383 final List<FlowRuleBatchEntry> toRemove = new ArrayList<>();
384 final List<FlowRuleBatchEntry> toAdd = new ArrayList<>();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700385 DeviceId did = null;
386
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700387
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800388 flowEntriesLock.writeLock().lock();
389 try {
390 for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
391 FlowRule flowRule = batchEntry.getTarget();
392 FlowRuleOperation op = batchEntry.getOperator();
393 if (did == null) {
394 did = flowRule.deviceId();
395 }
396 if (op.equals(FlowRuleOperation.REMOVE)) {
397 StoredFlowEntry entry = getFlowEntryInternal(flowRule);
398 if (entry != null) {
399 entry.setState(FlowEntryState.PENDING_REMOVE);
Brian O'Connor427a1762014-11-19 18:40:32 -0800400 toRemove.add(batchEntry);
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800401 }
402 } else if (op.equals(FlowRuleOperation.ADD)) {
403 StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
404 DeviceId deviceId = flowRule.deviceId();
405 if (!flowEntries.containsEntry(deviceId, flowEntry)) {
406 flowEntries.put(deviceId, flowEntry);
Brian O'Connor427a1762014-11-19 18:40:32 -0800407 toAdd.add(batchEntry);
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800408 }
409 }
410 }
411 if (toAdd.isEmpty() && toRemove.isEmpty()) {
412 return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
413 }
414
415 // create remote backup copies
416 updateBackup(did, toAdd, toRemove);
417 } finally {
418 flowEntriesLock.writeLock().unlock();
419 }
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700420
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700421 SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
422 final int batchId = localBatchIdGen.incrementAndGet();
423
424 pendingFutures.put(batchId, r);
425 notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700426
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700427 return r;
alshabib339a3d92014-09-26 17:54:32 -0700428 }
429
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700430 private void updateBackup(final DeviceId deviceId,
Brian O'Connor427a1762014-11-19 18:40:32 -0800431 final List<FlowRuleBatchEntry> toAdd,
432 final List<FlowRuleBatchEntry> list) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700433
434 Future<?> submit = backupExecutors.submit(new UpdateBackup(deviceId, toAdd, list));
435
436 if (syncBackup) {
437 // wait for backup to complete
438 try {
439 submit.get();
440 } catch (InterruptedException | ExecutionException e) {
441 log.error("Failed to create backups", e);
442 }
443 }
444 }
445
Brian O'Connor427a1762014-11-19 18:40:32 -0800446 private void updateBackup(DeviceId deviceId, List<FlowRuleBatchEntry> toAdd) {
447
448 updateBackup(deviceId, toAdd, Collections.<FlowRuleBatchEntry>emptyList());
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700449 }
450
alshabib339a3d92014-09-26 17:54:32 -0700451 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700452 public void deleteFlowRule(FlowRule rule) {
453 storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule))));
alshabib339a3d92014-09-26 17:54:32 -0700454 }
455
456 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700457 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
458 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700459 final NodeId localId = clusterService.getLocalNode().id();
460 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700461 return addOrUpdateFlowRuleInternal(rule);
462 }
463
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800464 log.warn("Tried to update FlowRule {} state,"
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700465 + " while the Node was not the master.", rule);
466 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700467 }
468
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800469 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700470 final DeviceId did = rule.deviceId();
alshabib339a3d92014-09-26 17:54:32 -0700471
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800472 flowEntriesLock.writeLock().lock();
473 try {
474 // check if this new rule is an update to an existing entry
475 StoredFlowEntry stored = getFlowEntryInternal(rule);
476 if (stored != null) {
477 stored.setBytes(rule.bytes());
478 stored.setLife(rule.life());
479 stored.setPackets(rule.packets());
480 if (stored.state() == FlowEntryState.PENDING_ADD) {
481 stored.setState(FlowEntryState.ADDED);
Brian O'Connor427a1762014-11-19 18:40:32 -0800482 FlowRuleBatchEntry entry =
483 new FlowRuleBatchEntry(FlowRuleOperation.ADD, stored);
484 updateBackup(did, Arrays.asList(entry));
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800485 return new FlowRuleEvent(Type.RULE_ADDED, rule);
486 }
487 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
alshabib1c319ff2014-10-04 20:29:09 -0700488 }
alshabib339a3d92014-09-26 17:54:32 -0700489
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800490 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
491 // TODO: also update backup.
492 flowEntries.put(did, new DefaultFlowEntry(rule));
493 } finally {
494 flowEntriesLock.writeLock().unlock();
495 }
alshabib1c319ff2014-10-04 20:29:09 -0700496 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700497
alshabib339a3d92014-09-26 17:54:32 -0700498 }
499
500 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700501 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
502 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700503
504 final NodeId localId = clusterService.getLocalNode().id();
505 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700506 // bypass and handle it locally
507 return removeFlowRuleInternal(rule);
508 }
509
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800510 log.warn("Tried to remove FlowRule {},"
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700511 + " while the Node was not the master.", rule);
512 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700513 }
514
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800515 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700516 final DeviceId deviceId = rule.deviceId();
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800517 flowEntriesLock.writeLock().lock();
518 try {
519 // This is where one could mark a rule as removed and still keep it in the store.
520 final boolean removed = flowEntries.remove(deviceId, rule);
Brian O'Connor427a1762014-11-19 18:40:32 -0800521 FlowRuleBatchEntry entry =
522 new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule);
523 updateBackup(deviceId, Collections.<FlowRuleBatchEntry>emptyList(), Arrays.asList(entry));
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800524 if (removed) {
525 return new FlowRuleEvent(RULE_REMOVED, rule);
526 } else {
527 return null;
528 }
529 } finally {
530 flowEntriesLock.writeLock().unlock();
alshabib339a3d92014-09-26 17:54:32 -0700531 }
alshabib339a3d92014-09-26 17:54:32 -0700532 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700533
534 @Override
535 public void batchOperationComplete(FlowRuleBatchEvent event) {
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700536 final Integer batchId = event.subject().batchId();
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700537 SettableFuture<CompletedBatchOperation> future
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700538 = pendingFutures.getIfPresent(batchId);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700539 if (future != null) {
540 future.set(event.result());
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700541 pendingFutures.invalidate(batchId);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700542 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700543 notifyDelegate(event);
544 }
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700545
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800546 private void loadFromBackup(final DeviceId did) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700547
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800548 flowEntriesLock.writeLock().lock();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700549 try {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800550 log.debug("Loading FlowRules for {} from backups", did);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700551 SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(did);
552 for (Entry<FlowId, ImmutableList<StoredFlowEntry>> e
553 : backupFlowTable.entrySet()) {
554
555 // TODO: should we be directly updating internal structure or
556 // should we be triggering event?
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800557 log.trace("loading {}", e.getValue());
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700558 for (StoredFlowEntry entry : e.getValue()) {
559 flowEntries.remove(did, entry);
560 flowEntries.put(did, entry);
561 }
562 }
563 } catch (ExecutionException e) {
564 log.error("Failed to load backup flowtable for {}", did, e);
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800565 } finally {
566 flowEntriesLock.writeLock().unlock();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700567 }
568 }
569
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800570 private void removeFromPrimary(final DeviceId did) {
571 Collection<StoredFlowEntry> removed = null;
572 flowEntriesLock.writeLock().lock();
573 try {
574 removed = flowEntries.removeAll(did);
575 } finally {
576 flowEntriesLock.writeLock().unlock();
577 }
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800578 log.trace("removedFromPrimary {}", removed);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700579 }
580
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800581 private static final class TimeoutFuture
582 implements RemovalListener<Integer, SettableFuture<CompletedBatchOperation>> {
583 @Override
584 public void onRemoval(RemovalNotification<Integer, SettableFuture<CompletedBatchOperation>> notification) {
585 // wrapping in ExecutionException to support Future.get
586 notification.getValue()
587 .setException(new ExecutionException("Timed out",
588 new TimeoutException()));
589 }
590 }
591
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700592 private final class OnStoreBatch implements ClusterMessageHandler {
593 private final NodeId local;
594
595 private OnStoreBatch(NodeId local) {
596 this.local = local;
597 }
598
599 @Override
600 public void handle(final ClusterMessage message) {
601 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800602 log.debug("received batch request {}", operation);
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700603
604 final DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
605 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
606 if (!local.equals(replicaInfo.master().orNull())) {
607
608 Set<FlowRule> failures = new HashSet<>(operation.size());
609 for (FlowRuleBatchEntry op : operation.getOperations()) {
610 failures.add(op.getTarget());
611 }
612 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures);
613 // This node is no longer the master, respond as all failed.
614 // TODO: we might want to wrap response in envelope
615 // to distinguish sw programming failure and hand over
616 // it make sense in the latter case to retry immediately.
617 try {
618 message.respond(SERIALIZER.encode(allFailed));
619 } catch (IOException e) {
620 log.error("Failed to respond back", e);
621 }
622 return;
623 }
624
625 final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
626
627 f.addListener(new Runnable() {
628
629 @Override
630 public void run() {
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800631 CompletedBatchOperation result;
632 try {
633 result = f.get();
634 } catch (InterruptedException | ExecutionException e) {
635 log.error("Batch operation failed", e);
636 // create everything failed response
637 Set<FlowRule> failures = new HashSet<>(operation.size());
638 for (FlowRuleBatchEntry op : operation.getOperations()) {
639 failures.add(op.getTarget());
640 }
641 result = new CompletedBatchOperation(false, failures);
642 }
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700643 try {
644 message.respond(SERIALIZER.encode(result));
645 } catch (IOException e) {
646 log.error("Failed to respond back", e);
647 }
648 }
649 }, futureListeners);
650 }
651 }
652
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700653 private final class SMapLoader
654 extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
655
656 @Override
657 public SMap<FlowId, ImmutableList<StoredFlowEntry>> load(DeviceId id)
658 throws Exception {
659 IMap<byte[], byte[]> map = theInstance.getMap("flowtable_" + id.toString());
660 return new SMap<FlowId, ImmutableList<StoredFlowEntry>>(map, SERIALIZER);
661 }
662 }
663
664 private final class InternalReplicaInfoEventListener
665 implements ReplicaInfoEventListener {
666
667 @Override
668 public void event(ReplicaInfoEvent event) {
669 final NodeId local = clusterService.getLocalNode().id();
670 final DeviceId did = event.subject();
671 final ReplicaInfo rInfo = event.replicaInfo();
672
673 switch (event.type()) {
674 case MASTER_CHANGED:
675 if (local.equals(rInfo.master().orNull())) {
676 // This node is the new master, populate local structure
677 // from backup
678 loadFromBackup(did);
679 } else {
680 // This node is no longer the master holder,
681 // clean local structure
682 removeFromPrimary(did);
683 // FIXME: probably should stop pending backup activities in
684 // executors to avoid overwriting with old value
685 }
686 break;
687 default:
688 break;
689
690 }
691 }
692 }
693
694 // Task to update FlowEntries in backup HZ store
Brian O'Connor427a1762014-11-19 18:40:32 -0800695 // TODO: Should be refactored to contain only one list and not
696 // toAdd and toRemove
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700697 private final class UpdateBackup implements Runnable {
698
699 private final DeviceId deviceId;
Brian O'Connor427a1762014-11-19 18:40:32 -0800700 private final List<FlowRuleBatchEntry> toAdd;
701 private final List<FlowRuleBatchEntry> toRemove;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700702
703 public UpdateBackup(DeviceId deviceId,
Brian O'Connor427a1762014-11-19 18:40:32 -0800704 List<FlowRuleBatchEntry> toAdd,
705 List<FlowRuleBatchEntry> list) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700706 this.deviceId = checkNotNull(deviceId);
707 this.toAdd = checkNotNull(toAdd);
708 this.toRemove = checkNotNull(list);
709 }
710
711 @Override
712 public void run() {
713 try {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800714 log.trace("update backup {} +{} -{}", deviceId, toAdd, toRemove);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700715 final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
716 // Following should be rewritten using async APIs
Brian O'Connor427a1762014-11-19 18:40:32 -0800717 for (FlowRuleBatchEntry bEntry : toAdd) {
718 final FlowRule entry = bEntry.getTarget();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700719 final FlowId id = entry.id();
720 ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
721 List<StoredFlowEntry> list = new ArrayList<>();
722 if (original != null) {
723 list.addAll(original);
724 }
725
Brian O'Connor427a1762014-11-19 18:40:32 -0800726 list.remove(bEntry.getTarget());
727 list.add((StoredFlowEntry) entry);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700728
729 ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
730 boolean success;
731 if (original == null) {
732 success = (backupFlowTable.putIfAbsent(id, newValue) == null);
733 } else {
734 success = backupFlowTable.replace(id, original, newValue);
735 }
736 // TODO retry?
737 if (!success) {
738 log.error("Updating backup failed.");
739 }
740 }
Brian O'Connor427a1762014-11-19 18:40:32 -0800741 for (FlowRuleBatchEntry bEntry : toRemove) {
742 final FlowRule entry = bEntry.getTarget();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700743 final FlowId id = entry.id();
744 ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
745 List<StoredFlowEntry> list = new ArrayList<>();
746 if (original != null) {
747 list.addAll(original);
748 }
749
Brian O'Connor427a1762014-11-19 18:40:32 -0800750 list.remove(bEntry.getTarget());
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700751
752 ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
753 boolean success;
754 if (original == null) {
755 success = (backupFlowTable.putIfAbsent(id, newValue) == null);
756 } else {
757 success = backupFlowTable.replace(id, original, newValue);
758 }
759 // TODO retry?
760 if (!success) {
761 log.error("Updating backup failed.");
762 }
763 }
764 } catch (ExecutionException e) {
765 log.error("Failed to write to backups", e);
766 }
767
768 }
769 }
alshabib339a3d92014-09-26 17:54:32 -0700770}