/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| 16 | package org.onosproject.store.flowext.impl; |
| 17 | |
| 18 | import com.google.common.cache.Cache; |
| 19 | import com.google.common.cache.CacheBuilder; |
| 20 | import com.google.common.util.concurrent.Futures; |
| 21 | import com.google.common.util.concurrent.ListenableFuture; |
| 22 | import com.google.common.util.concurrent.SettableFuture; |
| 23 | import org.apache.felix.scr.annotations.Activate; |
| 24 | import org.apache.felix.scr.annotations.Component; |
| 25 | import org.apache.felix.scr.annotations.Deactivate; |
| 26 | import org.apache.felix.scr.annotations.Reference; |
| 27 | import org.apache.felix.scr.annotations.ReferenceCardinality; |
| 28 | import org.apache.felix.scr.annotations.Service; |
| 29 | import org.onlab.util.KryoNamespace; |
| 30 | import org.onosproject.cluster.ClusterService; |
| 31 | import org.onosproject.net.DeviceId; |
| 32 | import org.onosproject.net.device.DeviceService; |
| 33 | import org.onosproject.net.flow.CompletedBatchOperation; |
| 34 | import org.onosproject.net.flow.FlowRuleBatchEntry; |
| 35 | import org.onosproject.net.flow.FlowRuleBatchEvent; |
| 36 | import org.onosproject.net.flow.FlowRuleBatchRequest; |
| 37 | import org.onosproject.net.flowext.DefaultFlowRuleExt; |
| 38 | import org.onosproject.net.flowext.DownStreamFlowEntry; |
| 39 | import org.onosproject.net.flowext.FlowExtCompletedOperation; |
| 40 | import org.onosproject.net.flowext.FlowRuleExtRouter; |
| 41 | import org.onosproject.net.flowext.FlowRuleExtRouterListener; |
| 42 | import org.onosproject.store.cluster.messaging.ClusterCommunicationService; |
| 43 | import org.onosproject.store.cluster.messaging.ClusterMessage; |
| 44 | import org.onosproject.store.cluster.messaging.ClusterMessageHandler; |
| 45 | import org.onosproject.store.flow.ReplicaInfo; |
| 46 | import org.onosproject.store.flow.ReplicaInfoEventListener; |
| 47 | import org.onosproject.store.flow.ReplicaInfoService; |
| 48 | import org.onosproject.store.serializers.DecodeTo; |
| 49 | import org.onosproject.store.serializers.KryoSerializer; |
| 50 | import org.onosproject.store.serializers.StoreSerializer; |
| 51 | import org.onosproject.store.serializers.impl.DistributedStoreSerializers; |
| 52 | import org.slf4j.Logger; |
| 53 | |
| 54 | import java.io.IOException; |
| 55 | import java.util.Collection; |
| 56 | import java.util.Collections; |
| 57 | import java.util.HashSet; |
| 58 | import java.util.Iterator; |
| 59 | import java.util.Set; |
| 60 | import java.util.concurrent.ExecutorService; |
| 61 | import java.util.concurrent.Executors; |
| 62 | import java.util.concurrent.Future; |
| 63 | import java.util.concurrent.TimeUnit; |
| 64 | |
| 65 | import static org.onlab.util.Tools.namedThreads; |
| 66 | import static org.onosproject.store.flowext.impl.FlowExtRouterMessageSubjects.APPLY_EXTEND_FLOWS; |
| 67 | import static org.slf4j.LoggerFactory.getLogger; |
| 68 | |
| 69 | /** |
| 70 | * Experimental extension to the flow rule subsystem; still under development. |
| 71 | * Implement a simple routing-like mechanism to directly send service data to its master and push to device. |
| 72 | * This Router does not save any flow rule extension data in cache, it focus on routing mechanism. |
| 73 | */ |
alshabib | a66a056 | 2015-02-17 15:50:54 -0800 | [diff] [blame^] | 74 | @Component(immediate = false) |
Hongtao Yin | 142b758 | 2015-01-21 14:41:30 -0800 | [diff] [blame] | 75 | @Service |
| 76 | public class DefaultFlowRuleExtRouter |
| 77 | implements FlowRuleExtRouter { |
| 78 | |
| 79 | private final Logger log = getLogger(getClass()); |
| 80 | |
| 81 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
| 82 | protected ReplicaInfoService replicaInfoManager; |
| 83 | |
| 84 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
| 85 | protected ClusterCommunicationService clusterCommunicator; |
| 86 | |
| 87 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
| 88 | protected ClusterService clusterService; |
| 89 | |
| 90 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
| 91 | protected DeviceService deviceService; |
| 92 | |
| 93 | private int pendingFutureTimeoutMinutes = 5; |
| 94 | |
| 95 | protected Set<FlowRuleExtRouterListener> routerListener = new HashSet<>(); |
| 96 | private Cache<Long, SettableFuture<FlowExtCompletedOperation>> pendingExtendFutures = CacheBuilder |
| 97 | .newBuilder() |
| 98 | .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES) |
| 99 | // .removalListener(new TimeoutFuture()) |
| 100 | .build(); |
| 101 | |
| 102 | private final ExecutorService futureListeners = Executors |
| 103 | .newCachedThreadPool(namedThreads("flowstore-peer-responders")); |
| 104 | |
| 105 | protected static final StoreSerializer SERIALIZER = new KryoSerializer() { |
| 106 | @Override |
| 107 | protected void setupKryoPool() { |
| 108 | serializerPool = KryoNamespace.newBuilder() |
| 109 | .register(DistributedStoreSerializers.STORE_COMMON) |
| 110 | .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN) |
| 111 | .register(FlowExtCompletedOperation.class) |
| 112 | .register(FlowRuleBatchRequest.class) |
| 113 | .register(DownStreamFlowEntry.class) |
| 114 | .register(DefaultFlowRuleExt.class) |
| 115 | .build(); |
| 116 | } |
| 117 | }; |
| 118 | |
| 119 | private ReplicaInfoEventListener replicaInfoEventListener; |
| 120 | |
| 121 | @Activate |
| 122 | public void activate() { |
| 123 | clusterCommunicator.addSubscriber(APPLY_EXTEND_FLOWS, |
| 124 | new ClusterMessageHandler() { |
| 125 | |
| 126 | @Override |
| 127 | public void handle(ClusterMessage message) { |
| 128 | // decode the extended flow entry and store them in memory. |
| 129 | FlowRuleBatchRequest operation = SERIALIZER.decode(message.payload()); |
| 130 | log.info("received batch request {}", operation); |
| 131 | final ListenableFuture<FlowExtCompletedOperation> f = applyBatchInternal(operation); |
| 132 | f.addListener(new Runnable() { |
| 133 | @Override |
| 134 | public void run() { |
| 135 | FlowExtCompletedOperation result = Futures.getUnchecked(f); |
| 136 | try { |
| 137 | message.respond(SERIALIZER.encode(result)); |
| 138 | } catch (IOException e) { |
| 139 | log.error("Failed to respond back", e); |
| 140 | } |
| 141 | } |
| 142 | }, futureListeners); |
| 143 | } |
| 144 | }); |
| 145 | |
| 146 | replicaInfoManager.addListener(replicaInfoEventListener); |
| 147 | |
| 148 | log.info("Started"); |
| 149 | } |
| 150 | |
| 151 | @Deactivate |
| 152 | public void deactivate() { |
| 153 | clusterCommunicator.removeSubscriber(APPLY_EXTEND_FLOWS); |
| 154 | replicaInfoManager.removeListener(replicaInfoEventListener); |
| 155 | log.info("Stopped"); |
| 156 | } |
| 157 | |
| 158 | /** |
| 159 | * apply the sub batch of flow extension rules. |
| 160 | * |
| 161 | * @param batchOperation batch of flow rules. |
| 162 | * A batch can contain flow rules for a single device only. |
| 163 | * @return Future response indicating success/failure of the batch operation |
| 164 | * all the way down to the device. |
| 165 | */ |
| 166 | @Override |
| 167 | public Future<FlowExtCompletedOperation> applySubBatch(FlowRuleBatchRequest batchOperation) { |
| 168 | // TODO Auto-generated method stub |
| 169 | if (batchOperation.ops().isEmpty()) { |
| 170 | return Futures.immediateFuture(new FlowExtCompletedOperation( |
| 171 | batchOperation.batchId(), true, Collections.emptySet())); |
| 172 | } |
| 173 | // get the deviceId all the collection belongs to |
| 174 | DeviceId deviceId = getBatchDeviceId(batchOperation.ops()); |
| 175 | |
| 176 | if (deviceId == null) { |
| 177 | log.error("This Batch exists more than two deviceId"); |
| 178 | return null; |
| 179 | } |
| 180 | ReplicaInfo replicaInfo = replicaInfoManager |
| 181 | .getReplicaInfoFor(deviceId); |
| 182 | |
| 183 | if (replicaInfo.master().get() |
| 184 | .equals(clusterService.getLocalNode().id())) { |
| 185 | return applyBatchInternal(batchOperation); |
| 186 | } |
| 187 | |
| 188 | log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}", |
| 189 | replicaInfo.master().orNull(), deviceId); |
| 190 | |
| 191 | ClusterMessage message = new ClusterMessage(clusterService |
| 192 | .getLocalNode().id(), APPLY_EXTEND_FLOWS, SERIALIZER.encode(batchOperation)); |
| 193 | |
| 194 | try { |
| 195 | ListenableFuture<byte[]> responseFuture = clusterCommunicator |
| 196 | .sendAndReceive(message, replicaInfo.master().get()); |
| 197 | // here should add another decode process |
| 198 | return Futures.transform(responseFuture, |
| 199 | new DecodeTo<FlowExtCompletedOperation>(SERIALIZER)); |
| 200 | } catch (IOException e) { |
| 201 | return Futures.immediateFailedFuture(e); |
| 202 | } |
| 203 | } |
| 204 | |
| 205 | /** |
| 206 | * apply the batch in local node. |
| 207 | * It means this instance is master of the device the flow entry belongs to. |
| 208 | * |
| 209 | * @param batchOperation a collection of flow entry, all they should send down to one device |
| 210 | * @return Future response indicating success/failure of the batch operation |
| 211 | * all the way down to the device. |
| 212 | */ |
| 213 | private ListenableFuture<FlowExtCompletedOperation> applyBatchInternal(FlowRuleBatchRequest batchOperation) { |
| 214 | SettableFuture<FlowExtCompletedOperation> r = SettableFuture.create(); |
| 215 | pendingExtendFutures.put(batchOperation.batchId(), r); |
| 216 | // here should notify manager to complete |
| 217 | notify(batchOperation); |
| 218 | return r; |
| 219 | } |
| 220 | |
| 221 | /** |
| 222 | * Get the deviceId of this batch. |
| 223 | * The whole Batch should belong to one deviceId. |
| 224 | * |
| 225 | * @param batchOperation a collection of flow entry, all they should send down to one device |
| 226 | * @return the deviceId the whole batch belongs to |
| 227 | */ |
| 228 | private DeviceId getBatchDeviceId(Collection<FlowRuleBatchEntry> batchOperation) { |
| 229 | Iterator<FlowRuleBatchEntry> head = batchOperation.iterator(); |
| 230 | FlowRuleBatchEntry headOp = head.next(); |
| 231 | boolean sameId = true; |
| 232 | for (FlowRuleBatchEntry operation : batchOperation) { |
| 233 | if (operation.target().deviceId() != headOp.target().deviceId()) { |
| 234 | log.warn("this batch does not apply on one device Id "); |
| 235 | sameId = false; |
| 236 | break; |
| 237 | } |
| 238 | } |
| 239 | return sameId ? headOp.target().deviceId() : null; |
| 240 | } |
| 241 | |
| 242 | /** |
| 243 | * Notify the listener of Router to do some reaction. |
| 244 | * |
| 245 | * @param request the requested operation to do |
| 246 | */ |
| 247 | public void notify(FlowRuleBatchRequest request) { |
| 248 | for (FlowRuleExtRouterListener listener : routerListener) { |
| 249 | listener.notify(FlowRuleBatchEvent |
| 250 | // TODO fill in the deviceId |
| 251 | .requested(request, null)); |
| 252 | } |
| 253 | } |
| 254 | |
| 255 | /** |
| 256 | * Invoked on the completion of a storeBatch operation. |
| 257 | * |
| 258 | * @param event flow rule batch event |
| 259 | */ |
| 260 | @Override |
| 261 | public void batchOperationComplete(FlowRuleBatchEvent event) { |
| 262 | // TODO Auto-generated method stub |
| 263 | final Long batchId = event.subject().batchId(); |
| 264 | SettableFuture<FlowExtCompletedOperation> future = pendingExtendFutures |
| 265 | .getIfPresent(batchId); |
| 266 | if (future != null) { |
| 267 | FlowRuleBatchRequest request = event.subject(); |
| 268 | CompletedBatchOperation result = event.result(); |
| 269 | FlowExtCompletedOperation completed = |
| 270 | new FlowExtCompletedOperation(request.batchId(), result.isSuccess(), result.failedItems()); |
| 271 | future.set(completed); |
| 272 | pendingExtendFutures.invalidate(batchId); |
| 273 | } |
| 274 | } |
| 275 | |
| 276 | /** |
| 277 | * Register the listener to monitor Router, |
| 278 | * The Router find master to send downStream. |
| 279 | * |
| 280 | * @param listener the listener to register |
| 281 | */ |
| 282 | @Override |
| 283 | public void addListener(FlowRuleExtRouterListener listener) { |
| 284 | routerListener.add(listener); |
| 285 | } |
| 286 | |
| 287 | /** |
| 288 | * Remove the listener of Router. |
| 289 | * |
| 290 | * @param listener the listener to remove |
| 291 | */ |
| 292 | @Override |
| 293 | public void removeListener(FlowRuleExtRouterListener listener) { |
| 294 | routerListener.remove(listener); |
| 295 | } |
| 296 | } |