blob: 033e0b8b9b244bc01333d029259c27436533151f [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.flowext.impl;
17
18import com.google.common.cache.Cache;
19import com.google.common.cache.CacheBuilder;
20import com.google.common.util.concurrent.Futures;
21import com.google.common.util.concurrent.ListenableFuture;
22import com.google.common.util.concurrent.SettableFuture;
23import org.apache.felix.scr.annotations.Activate;
24import org.apache.felix.scr.annotations.Component;
25import org.apache.felix.scr.annotations.Deactivate;
26import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
28import org.apache.felix.scr.annotations.Service;
29import org.onlab.util.KryoNamespace;
30import org.onosproject.cluster.ClusterService;
31import org.onosproject.net.DeviceId;
32import org.onosproject.net.device.DeviceService;
33import org.onosproject.net.flow.CompletedBatchOperation;
34import org.onosproject.net.flow.FlowRuleBatchEntry;
35import org.onosproject.net.flow.FlowRuleBatchEvent;
36import org.onosproject.net.flow.FlowRuleBatchRequest;
37import org.onosproject.net.flowext.DefaultFlowRuleExt;
38import org.onosproject.net.flowext.DownStreamFlowEntry;
39import org.onosproject.net.flowext.FlowExtCompletedOperation;
40import org.onosproject.net.flowext.FlowRuleExtRouter;
41import org.onosproject.net.flowext.FlowRuleExtRouterListener;
42import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
43import org.onosproject.store.cluster.messaging.ClusterMessage;
44import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
45import org.onosproject.store.flow.ReplicaInfo;
46import org.onosproject.store.flow.ReplicaInfoEventListener;
47import org.onosproject.store.flow.ReplicaInfoService;
48import org.onosproject.store.serializers.DecodeTo;
49import org.onosproject.store.serializers.KryoSerializer;
50import org.onosproject.store.serializers.StoreSerializer;
51import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
52import org.slf4j.Logger;
53
54import java.io.IOException;
55import java.util.Collection;
56import java.util.Collections;
57import java.util.HashSet;
58import java.util.Iterator;
59import java.util.Set;
60import java.util.concurrent.ExecutorService;
61import java.util.concurrent.Executors;
62import java.util.concurrent.Future;
63import java.util.concurrent.TimeUnit;
64
65import static org.onlab.util.Tools.namedThreads;
66import static org.onosproject.store.flowext.impl.FlowExtRouterMessageSubjects.APPLY_EXTEND_FLOWS;
67import static org.slf4j.LoggerFactory.getLogger;
68
69/**
70 * Experimental extension to the flow rule subsystem; still under development.
71 * Implement a simple routing-like mechanism to directly send service data to its master and push to device.
72 * This Router does not save any flow rule extension data in cache, it focus on routing mechanism.
73 */
alshabiba66a0562015-02-17 15:50:54 -080074@Component(immediate = false)
Hongtao Yin142b7582015-01-21 14:41:30 -080075@Service
76public class DefaultFlowRuleExtRouter
77 implements FlowRuleExtRouter {
78
79 private final Logger log = getLogger(getClass());
80
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 protected ReplicaInfoService replicaInfoManager;
83
84 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85 protected ClusterCommunicationService clusterCommunicator;
86
87 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
88 protected ClusterService clusterService;
89
90 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
91 protected DeviceService deviceService;
92
93 private int pendingFutureTimeoutMinutes = 5;
94
95 protected Set<FlowRuleExtRouterListener> routerListener = new HashSet<>();
96 private Cache<Long, SettableFuture<FlowExtCompletedOperation>> pendingExtendFutures = CacheBuilder
97 .newBuilder()
98 .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
99 // .removalListener(new TimeoutFuture())
100 .build();
101
102 private final ExecutorService futureListeners = Executors
103 .newCachedThreadPool(namedThreads("flowstore-peer-responders"));
104
105 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
106 @Override
107 protected void setupKryoPool() {
108 serializerPool = KryoNamespace.newBuilder()
109 .register(DistributedStoreSerializers.STORE_COMMON)
110 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
111 .register(FlowExtCompletedOperation.class)
112 .register(FlowRuleBatchRequest.class)
113 .register(DownStreamFlowEntry.class)
114 .register(DefaultFlowRuleExt.class)
115 .build();
116 }
117 };
118
119 private ReplicaInfoEventListener replicaInfoEventListener;
120
121 @Activate
122 public void activate() {
123 clusterCommunicator.addSubscriber(APPLY_EXTEND_FLOWS,
124 new ClusterMessageHandler() {
125
126 @Override
127 public void handle(ClusterMessage message) {
128 // decode the extended flow entry and store them in memory.
129 FlowRuleBatchRequest operation = SERIALIZER.decode(message.payload());
130 log.info("received batch request {}", operation);
131 final ListenableFuture<FlowExtCompletedOperation> f = applyBatchInternal(operation);
132 f.addListener(new Runnable() {
133 @Override
134 public void run() {
135 FlowExtCompletedOperation result = Futures.getUnchecked(f);
136 try {
137 message.respond(SERIALIZER.encode(result));
138 } catch (IOException e) {
139 log.error("Failed to respond back", e);
140 }
141 }
142 }, futureListeners);
143 }
144 });
145
146 replicaInfoManager.addListener(replicaInfoEventListener);
147
148 log.info("Started");
149 }
150
151 @Deactivate
152 public void deactivate() {
153 clusterCommunicator.removeSubscriber(APPLY_EXTEND_FLOWS);
154 replicaInfoManager.removeListener(replicaInfoEventListener);
155 log.info("Stopped");
156 }
157
158 /**
159 * apply the sub batch of flow extension rules.
160 *
161 * @param batchOperation batch of flow rules.
162 * A batch can contain flow rules for a single device only.
163 * @return Future response indicating success/failure of the batch operation
164 * all the way down to the device.
165 */
166 @Override
167 public Future<FlowExtCompletedOperation> applySubBatch(FlowRuleBatchRequest batchOperation) {
168 // TODO Auto-generated method stub
169 if (batchOperation.ops().isEmpty()) {
170 return Futures.immediateFuture(new FlowExtCompletedOperation(
171 batchOperation.batchId(), true, Collections.emptySet()));
172 }
173 // get the deviceId all the collection belongs to
174 DeviceId deviceId = getBatchDeviceId(batchOperation.ops());
175
176 if (deviceId == null) {
177 log.error("This Batch exists more than two deviceId");
178 return null;
179 }
180 ReplicaInfo replicaInfo = replicaInfoManager
181 .getReplicaInfoFor(deviceId);
182
183 if (replicaInfo.master().get()
184 .equals(clusterService.getLocalNode().id())) {
185 return applyBatchInternal(batchOperation);
186 }
187
188 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
189 replicaInfo.master().orNull(), deviceId);
190
191 ClusterMessage message = new ClusterMessage(clusterService
192 .getLocalNode().id(), APPLY_EXTEND_FLOWS, SERIALIZER.encode(batchOperation));
193
194 try {
195 ListenableFuture<byte[]> responseFuture = clusterCommunicator
196 .sendAndReceive(message, replicaInfo.master().get());
197 // here should add another decode process
198 return Futures.transform(responseFuture,
199 new DecodeTo<FlowExtCompletedOperation>(SERIALIZER));
200 } catch (IOException e) {
201 return Futures.immediateFailedFuture(e);
202 }
203 }
204
205 /**
206 * apply the batch in local node.
207 * It means this instance is master of the device the flow entry belongs to.
208 *
209 * @param batchOperation a collection of flow entry, all they should send down to one device
210 * @return Future response indicating success/failure of the batch operation
211 * all the way down to the device.
212 */
213 private ListenableFuture<FlowExtCompletedOperation> applyBatchInternal(FlowRuleBatchRequest batchOperation) {
214 SettableFuture<FlowExtCompletedOperation> r = SettableFuture.create();
215 pendingExtendFutures.put(batchOperation.batchId(), r);
216 // here should notify manager to complete
217 notify(batchOperation);
218 return r;
219 }
220
221 /**
222 * Get the deviceId of this batch.
223 * The whole Batch should belong to one deviceId.
224 *
225 * @param batchOperation a collection of flow entry, all they should send down to one device
226 * @return the deviceId the whole batch belongs to
227 */
228 private DeviceId getBatchDeviceId(Collection<FlowRuleBatchEntry> batchOperation) {
229 Iterator<FlowRuleBatchEntry> head = batchOperation.iterator();
230 FlowRuleBatchEntry headOp = head.next();
231 boolean sameId = true;
232 for (FlowRuleBatchEntry operation : batchOperation) {
233 if (operation.target().deviceId() != headOp.target().deviceId()) {
234 log.warn("this batch does not apply on one device Id ");
235 sameId = false;
236 break;
237 }
238 }
239 return sameId ? headOp.target().deviceId() : null;
240 }
241
242 /**
243 * Notify the listener of Router to do some reaction.
244 *
245 * @param request the requested operation to do
246 */
247 public void notify(FlowRuleBatchRequest request) {
248 for (FlowRuleExtRouterListener listener : routerListener) {
249 listener.notify(FlowRuleBatchEvent
250 // TODO fill in the deviceId
251 .requested(request, null));
252 }
253 }
254
255 /**
256 * Invoked on the completion of a storeBatch operation.
257 *
258 * @param event flow rule batch event
259 */
260 @Override
261 public void batchOperationComplete(FlowRuleBatchEvent event) {
262 // TODO Auto-generated method stub
263 final Long batchId = event.subject().batchId();
264 SettableFuture<FlowExtCompletedOperation> future = pendingExtendFutures
265 .getIfPresent(batchId);
266 if (future != null) {
267 FlowRuleBatchRequest request = event.subject();
268 CompletedBatchOperation result = event.result();
269 FlowExtCompletedOperation completed =
270 new FlowExtCompletedOperation(request.batchId(), result.isSuccess(), result.failedItems());
271 future.set(completed);
272 pendingExtendFutures.invalidate(batchId);
273 }
274 }
275
276 /**
277 * Register the listener to monitor Router,
278 * The Router find master to send downStream.
279 *
280 * @param listener the listener to register
281 */
282 @Override
283 public void addListener(FlowRuleExtRouterListener listener) {
284 routerListener.add(listener);
285 }
286
287 /**
288 * Remove the listener of Router.
289 *
290 * @param listener the listener to remove
291 */
292 @Override
293 public void removeListener(FlowRuleExtRouterListener listener) {
294 routerListener.remove(listener);
295 }
296}