Initial implementation: The init extended flow rule and store interface APIs
The APIs are for supporting service data to install on network devices.
This is related to JIRA ticket ID ONOS-869.
Updated API code and added implementation code files.
Modified API for supporting payload abstruction, and added routing mechanism for pushing flow rules to devices.
Added more javadoc, and fixed some minor issues.
Updated javadoc, removed unnecessary method, and test code.
Change-Id: I105defc92a9e01b30601fcb56a9dafa086d4adc0
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java b/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
new file mode 100644
index 0000000..4ae14dd
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
@@ -0,0 +1,296 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flowext.impl;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.CompletedBatchOperation;
+import org.onosproject.net.flow.FlowRuleBatchEntry;
+import org.onosproject.net.flow.FlowRuleBatchEvent;
+import org.onosproject.net.flow.FlowRuleBatchRequest;
+import org.onosproject.net.flowext.DefaultFlowRuleExt;
+import org.onosproject.net.flowext.DownStreamFlowEntry;
+import org.onosproject.net.flowext.FlowExtCompletedOperation;
+import org.onosproject.net.flowext.FlowRuleExtRouter;
+import org.onosproject.net.flowext.FlowRuleExtRouterListener;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.flow.ReplicaInfo;
+import org.onosproject.store.flow.ReplicaInfoEventListener;
+import org.onosproject.store.flow.ReplicaInfoService;
+import org.onosproject.store.serializers.DecodeTo;
+import org.onosproject.store.serializers.KryoSerializer;
+import org.onosproject.store.serializers.StoreSerializer;
+import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.onlab.util.Tools.namedThreads;
+import static org.onosproject.store.flowext.impl.FlowExtRouterMessageSubjects.APPLY_EXTEND_FLOWS;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Experimental extension to the flow rule subsystem; still under development.
+ * Implement a simple routing-like mechanism to directly send service data to its master and push to device.
+ * This Router does not save any flow rule extension data in cache, it focus on routing mechanism.
+ */
+@Component(immediate = true)
+@Service
+public class DefaultFlowRuleExtRouter
+ implements FlowRuleExtRouter {
+
+ private final Logger log = getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ReplicaInfoService replicaInfoManager;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterCommunicationService clusterCommunicator;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceService deviceService;
+
+ private int pendingFutureTimeoutMinutes = 5;
+
+ protected Set<FlowRuleExtRouterListener> routerListener = new HashSet<>();
+ private Cache<Long, SettableFuture<FlowExtCompletedOperation>> pendingExtendFutures = CacheBuilder
+ .newBuilder()
+ .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
+ // .removalListener(new TimeoutFuture())
+ .build();
+
+ private final ExecutorService futureListeners = Executors
+ .newCachedThreadPool(namedThreads("flowstore-peer-responders"));
+
+ protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
+ @Override
+ protected void setupKryoPool() {
+ serializerPool = KryoNamespace.newBuilder()
+ .register(DistributedStoreSerializers.STORE_COMMON)
+ .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
+ .register(FlowExtCompletedOperation.class)
+ .register(FlowRuleBatchRequest.class)
+ .register(DownStreamFlowEntry.class)
+ .register(DefaultFlowRuleExt.class)
+ .build();
+ }
+ };
+
+ private ReplicaInfoEventListener replicaInfoEventListener;
+
+ @Activate
+ public void activate() {
+ clusterCommunicator.addSubscriber(APPLY_EXTEND_FLOWS,
+ new ClusterMessageHandler() {
+
+ @Override
+ public void handle(ClusterMessage message) {
+ // decode the extended flow entry and store them in memory.
+ FlowRuleBatchRequest operation = SERIALIZER.decode(message.payload());
+ log.info("received batch request {}", operation);
+ final ListenableFuture<FlowExtCompletedOperation> f = applyBatchInternal(operation);
+ f.addListener(new Runnable() {
+ @Override
+ public void run() {
+ FlowExtCompletedOperation result = Futures.getUnchecked(f);
+ try {
+ message.respond(SERIALIZER.encode(result));
+ } catch (IOException e) {
+ log.error("Failed to respond back", e);
+ }
+ }
+ }, futureListeners);
+ }
+ });
+
+ replicaInfoManager.addListener(replicaInfoEventListener);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ clusterCommunicator.removeSubscriber(APPLY_EXTEND_FLOWS);
+ replicaInfoManager.removeListener(replicaInfoEventListener);
+ log.info("Stopped");
+ }
+
+ /**
+ * apply the sub batch of flow extension rules.
+ *
+ * @param batchOperation batch of flow rules.
+ * A batch can contain flow rules for a single device only.
+ * @return Future response indicating success/failure of the batch operation
+ * all the way down to the device.
+ */
+ @Override
+ public Future<FlowExtCompletedOperation> applySubBatch(FlowRuleBatchRequest batchOperation) {
+ // TODO Auto-generated method stub
+ if (batchOperation.ops().isEmpty()) {
+ return Futures.immediateFuture(new FlowExtCompletedOperation(
+ batchOperation.batchId(), true, Collections.emptySet()));
+ }
+ // get the deviceId all the collection belongs to
+ DeviceId deviceId = getBatchDeviceId(batchOperation.ops());
+
+ if (deviceId == null) {
+ log.error("This Batch exists more than two deviceId");
+ return null;
+ }
+ ReplicaInfo replicaInfo = replicaInfoManager
+ .getReplicaInfoFor(deviceId);
+
+ if (replicaInfo.master().get()
+ .equals(clusterService.getLocalNode().id())) {
+ return applyBatchInternal(batchOperation);
+ }
+
+ log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
+ replicaInfo.master().orNull(), deviceId);
+
+ ClusterMessage message = new ClusterMessage(clusterService
+ .getLocalNode().id(), APPLY_EXTEND_FLOWS, SERIALIZER.encode(batchOperation));
+
+ try {
+ ListenableFuture<byte[]> responseFuture = clusterCommunicator
+ .sendAndReceive(message, replicaInfo.master().get());
+ // here should add another decode process
+ return Futures.transform(responseFuture,
+ new DecodeTo<FlowExtCompletedOperation>(SERIALIZER));
+ } catch (IOException e) {
+ return Futures.immediateFailedFuture(e);
+ }
+ }
+
+ /**
+ * apply the batch in local node.
+ * It means this instance is master of the device the flow entry belongs to.
+ *
+ * @param batchOperation a collection of flow entry, all they should send down to one device
+ * @return Future response indicating success/failure of the batch operation
+ * all the way down to the device.
+ */
+ private ListenableFuture<FlowExtCompletedOperation> applyBatchInternal(FlowRuleBatchRequest batchOperation) {
+ SettableFuture<FlowExtCompletedOperation> r = SettableFuture.create();
+ pendingExtendFutures.put(batchOperation.batchId(), r);
+ // here should notify manager to complete
+ notify(batchOperation);
+ return r;
+ }
+
+ /**
+ * Get the deviceId of this batch.
+ * The whole Batch should belong to one deviceId.
+ *
+ * @param batchOperation a collection of flow entry, all they should send down to one device
+ * @return the deviceId the whole batch belongs to
+ */
+ private DeviceId getBatchDeviceId(Collection<FlowRuleBatchEntry> batchOperation) {
+ Iterator<FlowRuleBatchEntry> head = batchOperation.iterator();
+ FlowRuleBatchEntry headOp = head.next();
+ boolean sameId = true;
+ for (FlowRuleBatchEntry operation : batchOperation) {
+ if (operation.target().deviceId() != headOp.target().deviceId()) {
+ log.warn("this batch does not apply on one device Id ");
+ sameId = false;
+ break;
+ }
+ }
+ return sameId ? headOp.target().deviceId() : null;
+ }
+
+ /**
+ * Notify the listener of Router to do some reaction.
+ *
+ * @param request the requested operation to do
+ */
+ public void notify(FlowRuleBatchRequest request) {
+ for (FlowRuleExtRouterListener listener : routerListener) {
+ listener.notify(FlowRuleBatchEvent
+ // TODO fill in the deviceId
+ .requested(request, null));
+ }
+ }
+
+ /**
+ * Invoked on the completion of a storeBatch operation.
+ *
+ * @param event flow rule batch event
+ */
+ @Override
+ public void batchOperationComplete(FlowRuleBatchEvent event) {
+ // TODO Auto-generated method stub
+ final Long batchId = event.subject().batchId();
+ SettableFuture<FlowExtCompletedOperation> future = pendingExtendFutures
+ .getIfPresent(batchId);
+ if (future != null) {
+ FlowRuleBatchRequest request = event.subject();
+ CompletedBatchOperation result = event.result();
+ FlowExtCompletedOperation completed =
+ new FlowExtCompletedOperation(request.batchId(), result.isSuccess(), result.failedItems());
+ future.set(completed);
+ pendingExtendFutures.invalidate(batchId);
+ }
+ }
+
+ /**
+ * Register the listener to monitor Router,
+ * The Router find master to send downStream.
+ *
+ * @param listener the listener to register
+ */
+ @Override
+ public void addListener(FlowRuleExtRouterListener listener) {
+ routerListener.add(listener);
+ }
+
+ /**
+ * Remove the listener of Router.
+ *
+ * @param listener the listener to remove
+ */
+ @Override
+ public void removeListener(FlowRuleExtRouterListener listener) {
+ routerListener.remove(listener);
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/FlowExtRouterMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/FlowExtRouterMessageSubjects.java
new file mode 100644
index 0000000..05cb9d7
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/FlowExtRouterMessageSubjects.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flowext.impl;
+
+import org.onosproject.store.cluster.messaging.MessageSubject;
+
+/**
+ * Experimental extension to the flow rule subsystem; still under development.
+ * MessageSubjects used by DefaultFlowRuleExtRouter peer-peer communication.
+ */
+public final class FlowExtRouterMessageSubjects {
+ private FlowExtRouterMessageSubjects() {
+ }
+
+ /**
+ * The subject of routing extended flow to specified device.
+ */
+ public static final MessageSubject APPLY_EXTEND_FLOWS
+ = new MessageSubject("peer-forward-apply-batch-extension");
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/package-info.java b/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/package-info.java
new file mode 100644
index 0000000..23a28f7
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Experimental extension to the flow rule subsystem; still under development.
+ * <p>
+ * Implementation of the distributed flow extension rule router using p2p synchronization
+ * protocol. The Router is the core component of routing flow rules to specified device.
+ * This package is still experimental at this point in time.
+ * </p>
+ */
+package org.onosproject.store.flowext.impl;