blob: b7d69b28c9abcd4fcd3f7a6a6603fb51d79cf556 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.flowext.impl;
17
18import com.google.common.cache.Cache;
19import com.google.common.cache.CacheBuilder;
20import com.google.common.util.concurrent.Futures;
21import com.google.common.util.concurrent.ListenableFuture;
22import com.google.common.util.concurrent.SettableFuture;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070023
Hongtao Yin142b7582015-01-21 14:41:30 -080024import org.apache.felix.scr.annotations.Activate;
25import org.apache.felix.scr.annotations.Component;
26import org.apache.felix.scr.annotations.Deactivate;
27import org.apache.felix.scr.annotations.Reference;
28import org.apache.felix.scr.annotations.ReferenceCardinality;
29import org.apache.felix.scr.annotations.Service;
30import org.onlab.util.KryoNamespace;
31import org.onosproject.cluster.ClusterService;
32import org.onosproject.net.DeviceId;
33import org.onosproject.net.device.DeviceService;
34import org.onosproject.net.flow.CompletedBatchOperation;
35import org.onosproject.net.flow.FlowRuleBatchEntry;
36import org.onosproject.net.flow.FlowRuleBatchEvent;
37import org.onosproject.net.flow.FlowRuleBatchRequest;
38import org.onosproject.net.flowext.DefaultFlowRuleExt;
39import org.onosproject.net.flowext.DownStreamFlowEntry;
40import org.onosproject.net.flowext.FlowExtCompletedOperation;
41import org.onosproject.net.flowext.FlowRuleExtRouter;
42import org.onosproject.net.flowext.FlowRuleExtRouterListener;
43import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
44import org.onosproject.store.cluster.messaging.ClusterMessage;
45import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
46import org.onosproject.store.flow.ReplicaInfo;
47import org.onosproject.store.flow.ReplicaInfoEventListener;
48import org.onosproject.store.flow.ReplicaInfoService;
Hongtao Yin142b7582015-01-21 14:41:30 -080049import org.onosproject.store.serializers.KryoSerializer;
50import org.onosproject.store.serializers.StoreSerializer;
51import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
52import org.slf4j.Logger;
53
Hongtao Yin142b7582015-01-21 14:41:30 -080054import java.util.Collection;
55import java.util.Collections;
56import java.util.HashSet;
57import java.util.Iterator;
58import java.util.Set;
59import java.util.concurrent.ExecutorService;
60import java.util.concurrent.Executors;
61import java.util.concurrent.Future;
62import java.util.concurrent.TimeUnit;
63
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080064import static org.onlab.util.Tools.groupedThreads;
Hongtao Yin142b7582015-01-21 14:41:30 -080065import static org.onosproject.store.flowext.impl.FlowExtRouterMessageSubjects.APPLY_EXTEND_FLOWS;
66import static org.slf4j.LoggerFactory.getLogger;
67
68/**
69 * Experimental extension to the flow rule subsystem; still under development.
70 * Implement a simple routing-like mechanism to directly send service data to its master and push to device.
71 * This Router does not save any flow rule extension data in cache, it focus on routing mechanism.
72 */
Brian O'Connor916de342015-02-17 18:07:32 -080073@Component(immediate = true, enabled = false)
Hongtao Yin142b7582015-01-21 14:41:30 -080074@Service
75public class DefaultFlowRuleExtRouter
76 implements FlowRuleExtRouter {
77
78 private final Logger log = getLogger(getClass());
79
Madan Jampani2af244a2015-02-22 13:12:01 -080080 // TODO: Make configurable.
81 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
82
Hongtao Yin142b7582015-01-21 14:41:30 -080083 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
84 protected ReplicaInfoService replicaInfoManager;
85
86 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
87 protected ClusterCommunicationService clusterCommunicator;
88
89 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
90 protected ClusterService clusterService;
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
93 protected DeviceService deviceService;
94
95 private int pendingFutureTimeoutMinutes = 5;
96
97 protected Set<FlowRuleExtRouterListener> routerListener = new HashSet<>();
98 private Cache<Long, SettableFuture<FlowExtCompletedOperation>> pendingExtendFutures = CacheBuilder
99 .newBuilder()
100 .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
101 // .removalListener(new TimeoutFuture())
102 .build();
103
104 private final ExecutorService futureListeners = Executors
Thomas Vachuska6f94ded2015-02-21 14:02:38 -0800105 .newCachedThreadPool(groupedThreads("onos/flow", "store-peer-responders"));
Hongtao Yin142b7582015-01-21 14:41:30 -0800106
Madan Jampani2af244a2015-02-22 13:12:01 -0800107 private ExecutorService messageHandlingExecutor;
108
Hongtao Yin142b7582015-01-21 14:41:30 -0800109 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
110 @Override
111 protected void setupKryoPool() {
112 serializerPool = KryoNamespace.newBuilder()
113 .register(DistributedStoreSerializers.STORE_COMMON)
114 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
115 .register(FlowExtCompletedOperation.class)
116 .register(FlowRuleBatchRequest.class)
117 .register(DownStreamFlowEntry.class)
118 .register(DefaultFlowRuleExt.class)
119 .build();
120 }
121 };
122
123 private ReplicaInfoEventListener replicaInfoEventListener;
124
125 @Activate
126 public void activate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800127
128 messageHandlingExecutor = Executors.newFixedThreadPool(
129 MESSAGE_HANDLER_THREAD_POOL_SIZE,
130 groupedThreads("onos/flow", "message-handlers"));
131
Hongtao Yin142b7582015-01-21 14:41:30 -0800132 clusterCommunicator.addSubscriber(APPLY_EXTEND_FLOWS,
133 new ClusterMessageHandler() {
134
135 @Override
136 public void handle(ClusterMessage message) {
137 // decode the extended flow entry and store them in memory.
138 FlowRuleBatchRequest operation = SERIALIZER.decode(message.payload());
139 log.info("received batch request {}", operation);
140 final ListenableFuture<FlowExtCompletedOperation> f = applyBatchInternal(operation);
141 f.addListener(new Runnable() {
142 @Override
143 public void run() {
144 FlowExtCompletedOperation result = Futures.getUnchecked(f);
Madan Jampanic26eede2015-04-16 11:42:16 -0700145 message.respond(SERIALIZER.encode(result));
Hongtao Yin142b7582015-01-21 14:41:30 -0800146 }
147 }, futureListeners);
148 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800149 }, messageHandlingExecutor);
Hongtao Yin142b7582015-01-21 14:41:30 -0800150
151 replicaInfoManager.addListener(replicaInfoEventListener);
152
153 log.info("Started");
154 }
155
156 @Deactivate
157 public void deactivate() {
158 clusterCommunicator.removeSubscriber(APPLY_EXTEND_FLOWS);
Madan Jampani2af244a2015-02-22 13:12:01 -0800159 messageHandlingExecutor.shutdown();
Hongtao Yin142b7582015-01-21 14:41:30 -0800160 replicaInfoManager.removeListener(replicaInfoEventListener);
161 log.info("Stopped");
162 }
163
164 /**
165 * apply the sub batch of flow extension rules.
166 *
167 * @param batchOperation batch of flow rules.
168 * A batch can contain flow rules for a single device only.
169 * @return Future response indicating success/failure of the batch operation
170 * all the way down to the device.
171 */
172 @Override
173 public Future<FlowExtCompletedOperation> applySubBatch(FlowRuleBatchRequest batchOperation) {
174 // TODO Auto-generated method stub
175 if (batchOperation.ops().isEmpty()) {
176 return Futures.immediateFuture(new FlowExtCompletedOperation(
177 batchOperation.batchId(), true, Collections.emptySet()));
178 }
179 // get the deviceId all the collection belongs to
180 DeviceId deviceId = getBatchDeviceId(batchOperation.ops());
181
182 if (deviceId == null) {
183 log.error("This Batch exists more than two deviceId");
184 return null;
185 }
186 ReplicaInfo replicaInfo = replicaInfoManager
187 .getReplicaInfoFor(deviceId);
188
189 if (replicaInfo.master().get()
190 .equals(clusterService.getLocalNode().id())) {
191 return applyBatchInternal(batchOperation);
192 }
193
194 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
195 replicaInfo.master().orNull(), deviceId);
196
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700197 return clusterCommunicator.sendAndReceive(
198 batchOperation,
199 APPLY_EXTEND_FLOWS,
200 SERIALIZER::encode,
201 SERIALIZER::decode,
202 replicaInfo.master().get());
Hongtao Yin142b7582015-01-21 14:41:30 -0800203 }
204
205 /**
206 * apply the batch in local node.
207 * It means this instance is master of the device the flow entry belongs to.
208 *
209 * @param batchOperation a collection of flow entry, all they should send down to one device
210 * @return Future response indicating success/failure of the batch operation
211 * all the way down to the device.
212 */
213 private ListenableFuture<FlowExtCompletedOperation> applyBatchInternal(FlowRuleBatchRequest batchOperation) {
214 SettableFuture<FlowExtCompletedOperation> r = SettableFuture.create();
215 pendingExtendFutures.put(batchOperation.batchId(), r);
216 // here should notify manager to complete
217 notify(batchOperation);
218 return r;
219 }
220
221 /**
222 * Get the deviceId of this batch.
223 * The whole Batch should belong to one deviceId.
224 *
225 * @param batchOperation a collection of flow entry, all they should send down to one device
226 * @return the deviceId the whole batch belongs to
227 */
228 private DeviceId getBatchDeviceId(Collection<FlowRuleBatchEntry> batchOperation) {
229 Iterator<FlowRuleBatchEntry> head = batchOperation.iterator();
230 FlowRuleBatchEntry headOp = head.next();
231 boolean sameId = true;
232 for (FlowRuleBatchEntry operation : batchOperation) {
233 if (operation.target().deviceId() != headOp.target().deviceId()) {
234 log.warn("this batch does not apply on one device Id ");
235 sameId = false;
236 break;
237 }
238 }
239 return sameId ? headOp.target().deviceId() : null;
240 }
241
242 /**
243 * Notify the listener of Router to do some reaction.
244 *
245 * @param request the requested operation to do
246 */
247 public void notify(FlowRuleBatchRequest request) {
248 for (FlowRuleExtRouterListener listener : routerListener) {
249 listener.notify(FlowRuleBatchEvent
250 // TODO fill in the deviceId
251 .requested(request, null));
252 }
253 }
254
255 /**
256 * Invoked on the completion of a storeBatch operation.
257 *
258 * @param event flow rule batch event
259 */
260 @Override
261 public void batchOperationComplete(FlowRuleBatchEvent event) {
262 // TODO Auto-generated method stub
263 final Long batchId = event.subject().batchId();
264 SettableFuture<FlowExtCompletedOperation> future = pendingExtendFutures
265 .getIfPresent(batchId);
266 if (future != null) {
267 FlowRuleBatchRequest request = event.subject();
268 CompletedBatchOperation result = event.result();
269 FlowExtCompletedOperation completed =
270 new FlowExtCompletedOperation(request.batchId(), result.isSuccess(), result.failedItems());
271 future.set(completed);
272 pendingExtendFutures.invalidate(batchId);
273 }
274 }
275
276 /**
277 * Register the listener to monitor Router,
278 * The Router find master to send downStream.
279 *
280 * @param listener the listener to register
281 */
282 @Override
283 public void addListener(FlowRuleExtRouterListener listener) {
284 routerListener.add(listener);
285 }
286
287 /**
288 * Remove the listener of Router.
289 *
290 * @param listener the listener to remove
291 */
292 @Override
293 public void removeListener(FlowRuleExtRouterListener listener) {
294 routerListener.remove(listener);
295 }
296}