blob: 708faf614b556e82d22a31fb193e028478ce165a [file] [log] [blame]
Hongtao Yin142b7582015-01-21 14:41:30 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.flowext.impl;
17
18import com.google.common.cache.Cache;
19import com.google.common.cache.CacheBuilder;
20import com.google.common.util.concurrent.Futures;
21import com.google.common.util.concurrent.ListenableFuture;
22import com.google.common.util.concurrent.SettableFuture;
23import org.apache.felix.scr.annotations.Activate;
24import org.apache.felix.scr.annotations.Component;
25import org.apache.felix.scr.annotations.Deactivate;
26import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
28import org.apache.felix.scr.annotations.Service;
29import org.onlab.util.KryoNamespace;
30import org.onosproject.cluster.ClusterService;
31import org.onosproject.net.DeviceId;
32import org.onosproject.net.device.DeviceService;
33import org.onosproject.net.flow.CompletedBatchOperation;
34import org.onosproject.net.flow.FlowRuleBatchEntry;
35import org.onosproject.net.flow.FlowRuleBatchEvent;
36import org.onosproject.net.flow.FlowRuleBatchRequest;
37import org.onosproject.net.flowext.DefaultFlowRuleExt;
38import org.onosproject.net.flowext.DownStreamFlowEntry;
39import org.onosproject.net.flowext.FlowExtCompletedOperation;
40import org.onosproject.net.flowext.FlowRuleExtRouter;
41import org.onosproject.net.flowext.FlowRuleExtRouterListener;
42import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
43import org.onosproject.store.cluster.messaging.ClusterMessage;
44import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
45import org.onosproject.store.flow.ReplicaInfo;
46import org.onosproject.store.flow.ReplicaInfoEventListener;
47import org.onosproject.store.flow.ReplicaInfoService;
48import org.onosproject.store.serializers.DecodeTo;
49import org.onosproject.store.serializers.KryoSerializer;
50import org.onosproject.store.serializers.StoreSerializer;
51import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
52import org.slf4j.Logger;
53
54import java.io.IOException;
55import java.util.Collection;
56import java.util.Collections;
57import java.util.HashSet;
58import java.util.Iterator;
59import java.util.Set;
60import java.util.concurrent.ExecutorService;
61import java.util.concurrent.Executors;
62import java.util.concurrent.Future;
63import java.util.concurrent.TimeUnit;
64
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080065import static org.onlab.util.Tools.groupedThreads;
Hongtao Yin142b7582015-01-21 14:41:30 -080066import static org.onosproject.store.flowext.impl.FlowExtRouterMessageSubjects.APPLY_EXTEND_FLOWS;
67import static org.slf4j.LoggerFactory.getLogger;
68
69/**
70 * Experimental extension to the flow rule subsystem; still under development.
71 * Implement a simple routing-like mechanism to directly send service data to its master and push to device.
72 * This Router does not save any flow rule extension data in cache, it focus on routing mechanism.
73 */
Brian O'Connor916de342015-02-17 18:07:32 -080074@Component(immediate = true, enabled = false)
Hongtao Yin142b7582015-01-21 14:41:30 -080075@Service
76public class DefaultFlowRuleExtRouter
77 implements FlowRuleExtRouter {
78
79 private final Logger log = getLogger(getClass());
80
Madan Jampani2af244a2015-02-22 13:12:01 -080081 // TODO: Make configurable.
82 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
83
Hongtao Yin142b7582015-01-21 14:41:30 -080084 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85 protected ReplicaInfoService replicaInfoManager;
86
87 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
88 protected ClusterCommunicationService clusterCommunicator;
89
90 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
91 protected ClusterService clusterService;
92
93 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
94 protected DeviceService deviceService;
95
96 private int pendingFutureTimeoutMinutes = 5;
97
98 protected Set<FlowRuleExtRouterListener> routerListener = new HashSet<>();
99 private Cache<Long, SettableFuture<FlowExtCompletedOperation>> pendingExtendFutures = CacheBuilder
100 .newBuilder()
101 .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
102 // .removalListener(new TimeoutFuture())
103 .build();
104
105 private final ExecutorService futureListeners = Executors
Thomas Vachuska6f94ded2015-02-21 14:02:38 -0800106 .newCachedThreadPool(groupedThreads("onos/flow", "store-peer-responders"));
Hongtao Yin142b7582015-01-21 14:41:30 -0800107
Madan Jampani2af244a2015-02-22 13:12:01 -0800108 private ExecutorService messageHandlingExecutor;
109
Hongtao Yin142b7582015-01-21 14:41:30 -0800110 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
111 @Override
112 protected void setupKryoPool() {
113 serializerPool = KryoNamespace.newBuilder()
114 .register(DistributedStoreSerializers.STORE_COMMON)
115 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
116 .register(FlowExtCompletedOperation.class)
117 .register(FlowRuleBatchRequest.class)
118 .register(DownStreamFlowEntry.class)
119 .register(DefaultFlowRuleExt.class)
120 .build();
121 }
122 };
123
124 private ReplicaInfoEventListener replicaInfoEventListener;
125
126 @Activate
127 public void activate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800128
129 messageHandlingExecutor = Executors.newFixedThreadPool(
130 MESSAGE_HANDLER_THREAD_POOL_SIZE,
131 groupedThreads("onos/flow", "message-handlers"));
132
Hongtao Yin142b7582015-01-21 14:41:30 -0800133 clusterCommunicator.addSubscriber(APPLY_EXTEND_FLOWS,
134 new ClusterMessageHandler() {
135
136 @Override
137 public void handle(ClusterMessage message) {
138 // decode the extended flow entry and store them in memory.
139 FlowRuleBatchRequest operation = SERIALIZER.decode(message.payload());
140 log.info("received batch request {}", operation);
141 final ListenableFuture<FlowExtCompletedOperation> f = applyBatchInternal(operation);
142 f.addListener(new Runnable() {
143 @Override
144 public void run() {
145 FlowExtCompletedOperation result = Futures.getUnchecked(f);
146 try {
147 message.respond(SERIALIZER.encode(result));
148 } catch (IOException e) {
149 log.error("Failed to respond back", e);
150 }
151 }
152 }, futureListeners);
153 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800154 }, messageHandlingExecutor);
Hongtao Yin142b7582015-01-21 14:41:30 -0800155
156 replicaInfoManager.addListener(replicaInfoEventListener);
157
158 log.info("Started");
159 }
160
161 @Deactivate
162 public void deactivate() {
163 clusterCommunicator.removeSubscriber(APPLY_EXTEND_FLOWS);
Madan Jampani2af244a2015-02-22 13:12:01 -0800164 messageHandlingExecutor.shutdown();
Hongtao Yin142b7582015-01-21 14:41:30 -0800165 replicaInfoManager.removeListener(replicaInfoEventListener);
166 log.info("Stopped");
167 }
168
169 /**
170 * apply the sub batch of flow extension rules.
171 *
172 * @param batchOperation batch of flow rules.
173 * A batch can contain flow rules for a single device only.
174 * @return Future response indicating success/failure of the batch operation
175 * all the way down to the device.
176 */
177 @Override
178 public Future<FlowExtCompletedOperation> applySubBatch(FlowRuleBatchRequest batchOperation) {
179 // TODO Auto-generated method stub
180 if (batchOperation.ops().isEmpty()) {
181 return Futures.immediateFuture(new FlowExtCompletedOperation(
182 batchOperation.batchId(), true, Collections.emptySet()));
183 }
184 // get the deviceId all the collection belongs to
185 DeviceId deviceId = getBatchDeviceId(batchOperation.ops());
186
187 if (deviceId == null) {
188 log.error("This Batch exists more than two deviceId");
189 return null;
190 }
191 ReplicaInfo replicaInfo = replicaInfoManager
192 .getReplicaInfoFor(deviceId);
193
194 if (replicaInfo.master().get()
195 .equals(clusterService.getLocalNode().id())) {
196 return applyBatchInternal(batchOperation);
197 }
198
199 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
200 replicaInfo.master().orNull(), deviceId);
201
202 ClusterMessage message = new ClusterMessage(clusterService
203 .getLocalNode().id(), APPLY_EXTEND_FLOWS, SERIALIZER.encode(batchOperation));
204
205 try {
206 ListenableFuture<byte[]> responseFuture = clusterCommunicator
207 .sendAndReceive(message, replicaInfo.master().get());
208 // here should add another decode process
209 return Futures.transform(responseFuture,
210 new DecodeTo<FlowExtCompletedOperation>(SERIALIZER));
211 } catch (IOException e) {
212 return Futures.immediateFailedFuture(e);
213 }
214 }
215
216 /**
217 * apply the batch in local node.
218 * It means this instance is master of the device the flow entry belongs to.
219 *
220 * @param batchOperation a collection of flow entry, all they should send down to one device
221 * @return Future response indicating success/failure of the batch operation
222 * all the way down to the device.
223 */
224 private ListenableFuture<FlowExtCompletedOperation> applyBatchInternal(FlowRuleBatchRequest batchOperation) {
225 SettableFuture<FlowExtCompletedOperation> r = SettableFuture.create();
226 pendingExtendFutures.put(batchOperation.batchId(), r);
227 // here should notify manager to complete
228 notify(batchOperation);
229 return r;
230 }
231
232 /**
233 * Get the deviceId of this batch.
234 * The whole Batch should belong to one deviceId.
235 *
236 * @param batchOperation a collection of flow entry, all they should send down to one device
237 * @return the deviceId the whole batch belongs to
238 */
239 private DeviceId getBatchDeviceId(Collection<FlowRuleBatchEntry> batchOperation) {
240 Iterator<FlowRuleBatchEntry> head = batchOperation.iterator();
241 FlowRuleBatchEntry headOp = head.next();
242 boolean sameId = true;
243 for (FlowRuleBatchEntry operation : batchOperation) {
244 if (operation.target().deviceId() != headOp.target().deviceId()) {
245 log.warn("this batch does not apply on one device Id ");
246 sameId = false;
247 break;
248 }
249 }
250 return sameId ? headOp.target().deviceId() : null;
251 }
252
253 /**
254 * Notify the listener of Router to do some reaction.
255 *
256 * @param request the requested operation to do
257 */
258 public void notify(FlowRuleBatchRequest request) {
259 for (FlowRuleExtRouterListener listener : routerListener) {
260 listener.notify(FlowRuleBatchEvent
261 // TODO fill in the deviceId
262 .requested(request, null));
263 }
264 }
265
266 /**
267 * Invoked on the completion of a storeBatch operation.
268 *
269 * @param event flow rule batch event
270 */
271 @Override
272 public void batchOperationComplete(FlowRuleBatchEvent event) {
273 // TODO Auto-generated method stub
274 final Long batchId = event.subject().batchId();
275 SettableFuture<FlowExtCompletedOperation> future = pendingExtendFutures
276 .getIfPresent(batchId);
277 if (future != null) {
278 FlowRuleBatchRequest request = event.subject();
279 CompletedBatchOperation result = event.result();
280 FlowExtCompletedOperation completed =
281 new FlowExtCompletedOperation(request.batchId(), result.isSuccess(), result.failedItems());
282 future.set(completed);
283 pendingExtendFutures.invalidate(batchId);
284 }
285 }
286
287 /**
288 * Register the listener to monitor Router,
289 * The Router find master to send downStream.
290 *
291 * @param listener the listener to register
292 */
293 @Override
294 public void addListener(FlowRuleExtRouterListener listener) {
295 routerListener.add(listener);
296 }
297
298 /**
299 * Remove the listener of Router.
300 *
301 * @param listener the listener to remove
302 */
303 @Override
304 public void removeListener(FlowRuleExtRouterListener listener) {
305 routerListener.remove(listener);
306 }
307}