blob: b609a3cf5a41aa32facfef3689177b73c0f61eea [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.flowext.impl;
17
18import com.google.common.cache.Cache;
19import com.google.common.cache.CacheBuilder;
20import com.google.common.util.concurrent.Futures;
21import com.google.common.util.concurrent.ListenableFuture;
22import com.google.common.util.concurrent.SettableFuture;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070023
Hongtao Yin142b7582015-01-21 14:41:30 -080024import org.apache.felix.scr.annotations.Activate;
25import org.apache.felix.scr.annotations.Component;
26import org.apache.felix.scr.annotations.Deactivate;
27import org.apache.felix.scr.annotations.Reference;
28import org.apache.felix.scr.annotations.ReferenceCardinality;
29import org.apache.felix.scr.annotations.Service;
30import org.onlab.util.KryoNamespace;
31import org.onosproject.cluster.ClusterService;
32import org.onosproject.net.DeviceId;
33import org.onosproject.net.device.DeviceService;
34import org.onosproject.net.flow.CompletedBatchOperation;
35import org.onosproject.net.flow.FlowRuleBatchEntry;
36import org.onosproject.net.flow.FlowRuleBatchEvent;
37import org.onosproject.net.flow.FlowRuleBatchRequest;
38import org.onosproject.net.flowext.DefaultFlowRuleExt;
39import org.onosproject.net.flowext.DownStreamFlowEntry;
40import org.onosproject.net.flowext.FlowExtCompletedOperation;
41import org.onosproject.net.flowext.FlowRuleExtRouter;
42import org.onosproject.net.flowext.FlowRuleExtRouterListener;
43import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
44import org.onosproject.store.cluster.messaging.ClusterMessage;
45import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
46import org.onosproject.store.flow.ReplicaInfo;
47import org.onosproject.store.flow.ReplicaInfoEventListener;
48import org.onosproject.store.flow.ReplicaInfoService;
Hongtao Yin142b7582015-01-21 14:41:30 -080049import org.onosproject.store.serializers.KryoSerializer;
50import org.onosproject.store.serializers.StoreSerializer;
51import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
52import org.slf4j.Logger;
53
54import java.io.IOException;
55import java.util.Collection;
56import java.util.Collections;
57import java.util.HashSet;
58import java.util.Iterator;
59import java.util.Set;
60import java.util.concurrent.ExecutorService;
61import java.util.concurrent.Executors;
62import java.util.concurrent.Future;
63import java.util.concurrent.TimeUnit;
64
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080065import static org.onlab.util.Tools.groupedThreads;
Hongtao Yin142b7582015-01-21 14:41:30 -080066import static org.onosproject.store.flowext.impl.FlowExtRouterMessageSubjects.APPLY_EXTEND_FLOWS;
67import static org.slf4j.LoggerFactory.getLogger;
68
69/**
70 * Experimental extension to the flow rule subsystem; still under development.
71 * Implement a simple routing-like mechanism to directly send service data to its master and push to device.
72 * This Router does not save any flow rule extension data in cache, it focus on routing mechanism.
73 */
Brian O'Connor916de342015-02-17 18:07:32 -080074@Component(immediate = true, enabled = false)
Hongtao Yin142b7582015-01-21 14:41:30 -080075@Service
76public class DefaultFlowRuleExtRouter
77 implements FlowRuleExtRouter {
78
79 private final Logger log = getLogger(getClass());
80
Madan Jampani2af244a2015-02-22 13:12:01 -080081 // TODO: Make configurable.
82 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
83
Hongtao Yin142b7582015-01-21 14:41:30 -080084 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85 protected ReplicaInfoService replicaInfoManager;
86
87 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
88 protected ClusterCommunicationService clusterCommunicator;
89
90 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
91 protected ClusterService clusterService;
92
93 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
94 protected DeviceService deviceService;
95
96 private int pendingFutureTimeoutMinutes = 5;
97
98 protected Set<FlowRuleExtRouterListener> routerListener = new HashSet<>();
99 private Cache<Long, SettableFuture<FlowExtCompletedOperation>> pendingExtendFutures = CacheBuilder
100 .newBuilder()
101 .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
102 // .removalListener(new TimeoutFuture())
103 .build();
104
105 private final ExecutorService futureListeners = Executors
Thomas Vachuska6f94ded2015-02-21 14:02:38 -0800106 .newCachedThreadPool(groupedThreads("onos/flow", "store-peer-responders"));
Hongtao Yin142b7582015-01-21 14:41:30 -0800107
Madan Jampani2af244a2015-02-22 13:12:01 -0800108 private ExecutorService messageHandlingExecutor;
109
Hongtao Yin142b7582015-01-21 14:41:30 -0800110 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
111 @Override
112 protected void setupKryoPool() {
113 serializerPool = KryoNamespace.newBuilder()
114 .register(DistributedStoreSerializers.STORE_COMMON)
115 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
116 .register(FlowExtCompletedOperation.class)
117 .register(FlowRuleBatchRequest.class)
118 .register(DownStreamFlowEntry.class)
119 .register(DefaultFlowRuleExt.class)
120 .build();
121 }
122 };
123
124 private ReplicaInfoEventListener replicaInfoEventListener;
125
126 @Activate
127 public void activate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800128
129 messageHandlingExecutor = Executors.newFixedThreadPool(
130 MESSAGE_HANDLER_THREAD_POOL_SIZE,
131 groupedThreads("onos/flow", "message-handlers"));
132
Hongtao Yin142b7582015-01-21 14:41:30 -0800133 clusterCommunicator.addSubscriber(APPLY_EXTEND_FLOWS,
134 new ClusterMessageHandler() {
135
136 @Override
137 public void handle(ClusterMessage message) {
138 // decode the extended flow entry and store them in memory.
139 FlowRuleBatchRequest operation = SERIALIZER.decode(message.payload());
140 log.info("received batch request {}", operation);
141 final ListenableFuture<FlowExtCompletedOperation> f = applyBatchInternal(operation);
142 f.addListener(new Runnable() {
143 @Override
144 public void run() {
145 FlowExtCompletedOperation result = Futures.getUnchecked(f);
146 try {
147 message.respond(SERIALIZER.encode(result));
148 } catch (IOException e) {
149 log.error("Failed to respond back", e);
150 }
151 }
152 }, futureListeners);
153 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800154 }, messageHandlingExecutor);
Hongtao Yin142b7582015-01-21 14:41:30 -0800155
156 replicaInfoManager.addListener(replicaInfoEventListener);
157
158 log.info("Started");
159 }
160
161 @Deactivate
162 public void deactivate() {
163 clusterCommunicator.removeSubscriber(APPLY_EXTEND_FLOWS);
Madan Jampani2af244a2015-02-22 13:12:01 -0800164 messageHandlingExecutor.shutdown();
Hongtao Yin142b7582015-01-21 14:41:30 -0800165 replicaInfoManager.removeListener(replicaInfoEventListener);
166 log.info("Stopped");
167 }
168
169 /**
170 * apply the sub batch of flow extension rules.
171 *
172 * @param batchOperation batch of flow rules.
173 * A batch can contain flow rules for a single device only.
174 * @return Future response indicating success/failure of the batch operation
175 * all the way down to the device.
176 */
177 @Override
178 public Future<FlowExtCompletedOperation> applySubBatch(FlowRuleBatchRequest batchOperation) {
179 // TODO Auto-generated method stub
180 if (batchOperation.ops().isEmpty()) {
181 return Futures.immediateFuture(new FlowExtCompletedOperation(
182 batchOperation.batchId(), true, Collections.emptySet()));
183 }
184 // get the deviceId all the collection belongs to
185 DeviceId deviceId = getBatchDeviceId(batchOperation.ops());
186
187 if (deviceId == null) {
188 log.error("This Batch exists more than two deviceId");
189 return null;
190 }
191 ReplicaInfo replicaInfo = replicaInfoManager
192 .getReplicaInfoFor(deviceId);
193
194 if (replicaInfo.master().get()
195 .equals(clusterService.getLocalNode().id())) {
196 return applyBatchInternal(batchOperation);
197 }
198
199 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
200 replicaInfo.master().orNull(), deviceId);
201
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700202 return clusterCommunicator.sendAndReceive(
203 batchOperation,
204 APPLY_EXTEND_FLOWS,
205 SERIALIZER::encode,
206 SERIALIZER::decode,
207 replicaInfo.master().get());
Hongtao Yin142b7582015-01-21 14:41:30 -0800208 }
209
210 /**
211 * apply the batch in local node.
212 * It means this instance is master of the device the flow entry belongs to.
213 *
214 * @param batchOperation a collection of flow entry, all they should send down to one device
215 * @return Future response indicating success/failure of the batch operation
216 * all the way down to the device.
217 */
218 private ListenableFuture<FlowExtCompletedOperation> applyBatchInternal(FlowRuleBatchRequest batchOperation) {
219 SettableFuture<FlowExtCompletedOperation> r = SettableFuture.create();
220 pendingExtendFutures.put(batchOperation.batchId(), r);
221 // here should notify manager to complete
222 notify(batchOperation);
223 return r;
224 }
225
226 /**
227 * Get the deviceId of this batch.
228 * The whole Batch should belong to one deviceId.
229 *
230 * @param batchOperation a collection of flow entry, all they should send down to one device
231 * @return the deviceId the whole batch belongs to
232 */
233 private DeviceId getBatchDeviceId(Collection<FlowRuleBatchEntry> batchOperation) {
234 Iterator<FlowRuleBatchEntry> head = batchOperation.iterator();
235 FlowRuleBatchEntry headOp = head.next();
236 boolean sameId = true;
237 for (FlowRuleBatchEntry operation : batchOperation) {
238 if (operation.target().deviceId() != headOp.target().deviceId()) {
239 log.warn("this batch does not apply on one device Id ");
240 sameId = false;
241 break;
242 }
243 }
244 return sameId ? headOp.target().deviceId() : null;
245 }
246
247 /**
248 * Notify the listener of Router to do some reaction.
249 *
250 * @param request the requested operation to do
251 */
252 public void notify(FlowRuleBatchRequest request) {
253 for (FlowRuleExtRouterListener listener : routerListener) {
254 listener.notify(FlowRuleBatchEvent
255 // TODO fill in the deviceId
256 .requested(request, null));
257 }
258 }
259
260 /**
261 * Invoked on the completion of a storeBatch operation.
262 *
263 * @param event flow rule batch event
264 */
265 @Override
266 public void batchOperationComplete(FlowRuleBatchEvent event) {
267 // TODO Auto-generated method stub
268 final Long batchId = event.subject().batchId();
269 SettableFuture<FlowExtCompletedOperation> future = pendingExtendFutures
270 .getIfPresent(batchId);
271 if (future != null) {
272 FlowRuleBatchRequest request = event.subject();
273 CompletedBatchOperation result = event.result();
274 FlowExtCompletedOperation completed =
275 new FlowExtCompletedOperation(request.batchId(), result.isSuccess(), result.failedItems());
276 future.set(completed);
277 pendingExtendFutures.invalidate(batchId);
278 }
279 }
280
281 /**
282 * Register the listener to monitor Router,
283 * The Router find master to send downStream.
284 *
285 * @param listener the listener to register
286 */
287 @Override
288 public void addListener(FlowRuleExtRouterListener listener) {
289 routerListener.add(listener);
290 }
291
292 /**
293 * Remove the listener of Router.
294 *
295 * @param listener the listener to remove
296 */
297 @Override
298 public void removeListener(FlowRuleExtRouterListener listener) {
299 routerListener.remove(listener);
300 }
301}