Add sample forwarding application by using groups
Change-Id: I955342847b6c6ba5693e00952ba0886e2c41d1a9
diff --git a/group-fwd/pom.xml b/group-fwd/pom.xml
new file mode 100644
index 0000000..d7410b1
--- /dev/null
+++ b/group-fwd/pom.xml
@@ -0,0 +1,72 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2017 Open Networking Laboratory
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-dependencies</artifactId>
+ <version>1.8.0</version>
+ <relativePath/>
+ </parent>
+
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-app-group-fwd</artifactId>
+ <version>1.9.0</version>
+ <packaging>bundle</packaging>
+
+ <description>Forwarding application using groups.</description>
+
+ <properties>
+ <onos.version>1.8.0</onos.version>
+ <onos.app.name>org.onosproject.groupfwd</onos.app.name>
+ <onos.app.origin>ON.Lab</onos.app.origin>
+ <onos.app.title>Forwarding App (Group)</onos.app.title>
+ <onos.app.category>Traffic Steering</onos.app.category>
+ <onos.app.url>http://onosproject.org</onos.app.url>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-api</artifactId>
+ <version>${onos.version}</version>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-scr-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/group-fwd/src/main/java/org/onosproject/groupfwd/GroupForwarding.java b/group-fwd/src/main/java/org/onosproject/groupfwd/GroupForwarding.java
new file mode 100644
index 0000000..61375d7
--- /dev/null
+++ b/group-fwd/src/main/java/org/onosproject/groupfwd/GroupForwarding.java
@@ -0,0 +1,516 @@
+/*
+ * Copyright 2017 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.groupfwd;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.core.GroupId;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Host;
+import org.onosproject.net.Link;
+import org.onosproject.net.Path;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.DefaultFlowRule;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleOperations;
+import org.onosproject.net.flow.FlowRuleOperationsContext;
+import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.instructions.Instruction;
+import org.onosproject.net.flow.instructions.Instructions;
+import org.onosproject.net.group.DefaultGroupBucket;
+import org.onosproject.net.group.DefaultGroupDescription;
+import org.onosproject.net.group.DefaultGroupKey;
+import org.onosproject.net.group.Group;
+import org.onosproject.net.group.GroupBucket;
+import org.onosproject.net.group.GroupBuckets;
+import org.onosproject.net.group.GroupDescription;
+import org.onosproject.net.group.GroupKey;
+import org.onosproject.net.group.GroupService;
+import org.onosproject.net.host.HostEvent;
+import org.onosproject.net.host.HostListener;
+import org.onosproject.net.host.HostService;
+import org.onosproject.net.topology.PathService;
+import org.slf4j.Logger;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+@Component(immediate = true)
+public class GroupForwarding {
+
+ private static final int DEAFULT_PRIORITY = 500;
+ private final Logger log = getLogger(getClass());
+
+ protected static KryoNamespace appKryo = new KryoNamespace.Builder()
+ .register(Integer.class)
+ .register(DeviceId.class)
+ .build("group-fwd-app");
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ HostService hostService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ GroupService groupService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ PathService pathService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ FlowRuleService flowRuleService;
+
+ private ApplicationId appId;
+
+ private final SetMultimap<GroupKey, FlowRule> pendingFlows =
+ HashMultimap.create();
+
+ private ScheduledExecutorService groupChecker =
+ Executors.newScheduledThreadPool(1, groupedThreads("group-fwd", "group-fwd-%d", log));
+
+ private Runnable groupCheckerTask = new InternalGroupCheckerTask();
+
+ private final HostListener hostListener = new InternalHostListener();
+ private final DeviceListener deviceListener = new InternalDeviceListener();
+
+ @Activate
+ public void activate() {
+ appId = coreService.registerApplication("org.onosproject.groupfwd");
+ setupGroups();
+ setupFlows();
+ hostService.addListener(hostListener);
+ deviceService.addListener(deviceListener);
+ groupChecker.scheduleAtFixedRate(groupCheckerTask, 0, 5, TimeUnit.SECONDS);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ groupChecker.shutdown();
+ hostService.removeListener(hostListener);
+ deviceService.removeListener(deviceListener);
+ cleanUpFlows();
+ log.info("Stopped");
+ }
+
+ private void cleanUpFlows() {
+ Set<FlowRule> flowRules = Sets.newHashSet(flowRuleService.getFlowRulesById(appId));
+ Set<Device> devices = Sets.newHashSet(deviceService.getAvailableDevices());
+ Map<DeviceId, Set<Group>> deviceGroups = Maps.newHashMap();
+
+ devices.forEach(device -> {
+ Set<Group> groups = Sets.newHashSet(groupService.getGroups(device.id(), appId));
+ deviceGroups.put(device.id(), groups);
+ });
+
+ FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
+
+ FlowRuleOperationsContext ctx = new FlowRuleOperationsContext() {
+ @Override
+ public void onSuccess(FlowRuleOperations ops) {
+ cleanUpGroups(devices, deviceGroups);
+ }
+ };
+
+ flowRules.forEach(opsBuilder::remove);
+
+ flowRuleService.apply(opsBuilder.build(ctx));
+ }
+
+ private void cleanUpGroups(Set<Device> devices, Map<DeviceId, Set<Group>> deviceGroups) {
+ devices.forEach(device -> {
+ Set<Group> groups = deviceGroups.get(device.id());
+ groups.forEach(group -> groupService.removeGroup(device.id(), group.appCookie(), appId));
+ });
+ }
+
+ private synchronized void setupGroups() {
+ Set<Device> devices = Sets.newHashSet(deviceService.getAvailableDevices());
+
+ // create groups for each devices in each devices
+ devices.forEach(this::processDeviceGroups);
+
+ // creates groups for each host in each devices
+ devices.forEach(this::processHostGroups);
+
+ }
+
+ private synchronized void setupFlows() {
+ Set<Device> devices = Sets.newHashSet(deviceService.getAvailableDevices());
+
+ devices.forEach(this::processHostFlows);
+ }
+
+ private void processHostFlows(Device device) {
+ Set<Host> hosts = Sets.newHashSet(hostService.getHosts());
+
+ hosts.forEach(host -> {
+ TrafficSelector.Builder selectorBuilder = DefaultTrafficSelector.builder();
+ TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
+
+ selectorBuilder.matchEthDst(host.mac());
+
+ Integer groupId;
+ GroupKey groupKey;
+ if (host.location().deviceId().equals(device.id())) {
+ // host connect to device, use host group
+ groupId = generateHostGroupId(host);
+
+ } else {
+ // host not in this device, use target device group
+ groupId = generateDeviceGroupId(host.location().deviceId());
+ }
+ groupKey = generateGroupKey(device.id(), groupId);
+
+ treatmentBuilder.deferred();
+ treatmentBuilder.group(new GroupId(groupId));
+
+ FlowRule flowRule = DefaultFlowRule.builder()
+ .fromApp(appId)
+ .makePermanent()
+ .withSelector(selectorBuilder.build())
+ .withTreatment(treatmentBuilder.build())
+ .forDevice(device.id())
+ .withPriority(DEAFULT_PRIORITY)
+ .build();
+
+ addPendingFlow(groupKey, flowRule);
+
+ });
+ }
+
+ private void addPendingFlow(GroupKey groupkey, FlowRule flowRule) {
+ synchronized (pendingFlows) {
+ pendingFlows.put(groupkey, flowRule);
+ }
+ }
+
+ private Set<FlowRule> fetchPendingFlows(GroupKey groupKey) {
+ Set<FlowRule> flowRules;
+
+ synchronized (pendingFlows) {
+ flowRules = pendingFlows.removeAll(groupKey);
+ }
+
+ return flowRules;
+ }
+
+ private void processDeviceGroups(Device device) {
+ Set<Device> devices = Sets.newHashSet(deviceService.getAvailableDevices());
+
+ devices.forEach(targetDevice -> {
+ if (device.equals(targetDevice)) {
+ // no need to process if device is equals to target device
+ return;
+ }
+ Integer targetDeviceGroupId = generateDeviceGroupId(targetDevice.id());
+ GroupKey targetDeviceGroupKey = generateGroupKey(device.id(), targetDeviceGroupId);
+
+ if (!groupExist(device, targetDeviceGroupKey)) {
+ GroupBucket deviceBucket = createBucketForDevice(device, targetDevice);
+
+ if (deviceBucket == null) {
+ // no connection between devices
+ return;
+ }
+ GroupDescription groupDescription = new DefaultGroupDescription(
+ device.id(),
+ GroupDescription.Type.INDIRECT,
+ new GroupBuckets(ImmutableList.of(deviceBucket)),
+ targetDeviceGroupKey,
+ targetDeviceGroupId,
+ appId
+ );
+
+ groupService.addGroup(groupDescription);
+
+ }
+
+ });
+
+ }
+
+ private GroupBucket createBucketForDevice(Device sourceDevice, Device targetDevice) {
+
+ PortNumber outputPort = getOutPortForDeviceLink(sourceDevice, targetDevice);
+
+ if (outputPort == null) {
+ // no path between two devices
+ return null;
+ }
+
+ TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
+ treatmentBuilder.setOutput(outputPort);
+ return DefaultGroupBucket.createIndirectGroupBucket(treatmentBuilder.build());
+
+ }
+
+ private PortNumber getOutPortForDeviceLink(Device sourceDevice, Device targetDevice) {
+ Set<Path> paths = pathService.getPaths(sourceDevice.id(), targetDevice.id());
+
+ if (paths == null || paths.isEmpty()) {
+ return null;
+ }
+
+ Path path = paths.iterator().next(); // use first path
+
+ if (path.links().isEmpty()) {
+ // XXX: will this happened ?
+ return null;
+ }
+ // first link should contains devive+port -> device+port
+
+ Link link = path.links().get(0);
+ return link.src().port();
+ }
+
+ private Integer generateDeviceGroupId(DeviceId deviceId) {
+ // make sure first bit is 1 for device group id
+ return deviceId.hashCode() | 0x80000000;
+ }
+
+ private void processHostGroups(Device device) {
+ Set<Host> hosts = Sets.newHashSet(hostService.getHosts());
+
+ // generates group id for each host
+ hosts.forEach(host -> {
+
+ if (!hostService.getConnectedHosts(device.id()).contains(host)) {
+ // host not connected to this device, skip
+ return;
+ }
+ Integer hostGroupId = generateHostGroupId(host);
+ GroupKey hostGroupKey = generateGroupKey(device.id(), hostGroupId);
+
+ if (!groupExist(device, hostGroupKey)) {
+ GroupBucket hostBucket = createBucketsForHost(host);
+
+ GroupDescription groupDescription = new DefaultGroupDescription(
+ device.id(),
+ GroupDescription.Type.INDIRECT,
+ new GroupBuckets(ImmutableList.of(hostBucket)),
+ hostGroupKey,
+ hostGroupId,
+ appId
+ );
+ groupService.addGroup(groupDescription);
+ }
+ });
+
+ }
+
+ private GroupBucket createBucketsForHost(Host host) {
+ TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
+ treatmentBuilder.setOutput(host.location().port());
+ return DefaultGroupBucket.createIndirectGroupBucket(treatmentBuilder.build());
+ }
+
+ private GroupKey generateGroupKey(DeviceId deviceId, Integer groupId) {
+ int hashed = Objects.hash(deviceId, groupId);
+ return new DefaultGroupKey(appKryo.serialize(hashed));
+ }
+
+ private boolean groupExist(Device device, GroupKey groupKey) {
+ return groupService.getGroup(device.id(), groupKey) != null;
+ }
+
+ private Integer generateHostGroupId(Host host) {
+ // first 1 bit should be 0 for host group id
+ return host.mac().hashCode() & 0x7FFFFFFF;
+ }
+
+ class InternalGroupCheckerTask implements Runnable {
+
+ @Override
+ public void run() {
+ Set<Device> devices = Sets.newHashSet(deviceService.getAvailableDevices());
+
+ devices.forEach(device -> {
+ Set<Group> groups = Sets.newHashSet(groupService.getGroups(device.id()));
+ groups.forEach(group -> {
+ Set<FlowRule> flowRulesToInstall = fetchPendingFlows(group.appCookie());
+
+ if (flowRulesToInstall != null && !flowRulesToInstall.isEmpty()) {
+ flowRulesToInstall.forEach(flowRuleService::applyFlowRules);
+ }
+ });
+ });
+ }
+ }
+
+ private class InternalHostListener implements HostListener {
+
+ @Override
+ public void event(HostEvent hostEvent) {
+ switch (hostEvent.type()) {
+ case HOST_ADDED:
+ case HOST_UPDATED:
+ setupGroups();
+ setupFlows();
+ break;
+ case HOST_REMOVED:
+ removeHostFlows(hostEvent.subject());
+ // group removal operation will operate after flow removed.
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ private void removeHostFlows(Host host) {
+ Set<Device> devices = Sets.newHashSet(deviceService.getAvailableDevices());
+ devices.forEach(device -> {
+ Integer groupId;
+
+ if (host.location().deviceId().equals(device.id())) {
+ // host connect to device, use host group
+ groupId = generateHostGroupId(host);
+
+ } else {
+ // host not in this device, use target device group
+ groupId = generateDeviceGroupId(host.location().deviceId());
+ }
+
+ Set<FlowRule> flowRules = getRelatedFlows(groupId);
+
+ FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
+ flowRules.forEach(opsBuilder::remove);
+ FlowRuleOperationsContext ctx = new FlowRuleOperationsContext() {
+
+ @Override
+ public void onSuccess(FlowRuleOperations ops) {
+ // group removal operation will operate after flow removed.
+ removeHostGroup(device, host);
+ }
+ };
+ FlowRuleOperations ops = opsBuilder.build(ctx);
+ flowRuleService.apply(ops);
+ });
+ }
+
+ private void removeHostGroup(Device device, Host host) {
+ Integer hostGroupId = generateHostGroupId(host);
+ GroupKey hostGroupKey = generateGroupKey(device.id(), hostGroupId);
+
+ if (groupExist(device, hostGroupKey)) {
+ groupService.removeGroup(device.id(), hostGroupKey, appId);
+ }
+ }
+
+ private Set<FlowRule> getRelatedFlows(Integer groupId) {
+ Set<FlowRule> flowRules = Sets.newHashSet(flowRuleService.getFlowRulesById(appId));
+
+ return flowRules.stream()
+ .filter(flowRule ->
+ flowRule.treatment().allInstructions().stream()
+ .filter(inst -> inst.type() == Instruction.Type.GROUP)
+ .map(inst -> (Instructions.GroupInstruction) inst)
+ .anyMatch(inst -> inst.groupId().equals(new GroupId(groupId))))
+ .collect(Collectors.toSet());
+
+ }
+
+ private class InternalDeviceListener implements DeviceListener {
+
+ @Override
+ public void event(DeviceEvent deviceEvent) {
+ switch (deviceEvent.type()) {
+ case DEVICE_ADDED:
+ case DEVICE_UPDATED:
+ setupGroups();
+ setupFlows();
+ break;
+ case DEVICE_REMOVED:
+ case DEVICE_SUSPENDED:
+ removeFlowsRelatedToDevice(deviceEvent.subject());
+ break;
+ default:
+ break;
+ }
+
+ }
+ }
+
+ private void removeFlowsRelatedToDevice(Device targetDevice) {
+ Set<Device> devices = Sets.newHashSet(deviceService.getAvailableDevices());
+
+ devices.forEach(device -> {
+
+ if (device.equals(targetDevice)) {
+ return;
+ }
+
+ Integer groupId = generateDeviceGroupId(targetDevice.id());
+
+ Set<FlowRule> flowRules = getRelatedFlows(groupId);
+
+ FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
+ flowRules.forEach(opsBuilder::remove);
+ FlowRuleOperationsContext ctx = new FlowRuleOperationsContext() {
+
+ @Override
+ public void onSuccess(FlowRuleOperations ops) {
+ // group removal operation will operate after flow removed.
+ removeDeviceGroups(device, targetDevice);
+ }
+ };
+ FlowRuleOperations ops = opsBuilder.build(ctx);
+ flowRuleService.apply(ops);
+ });
+ }
+
+ private void removeDeviceGroups(Device device, Device targetDevice) {
+ Integer groupId = generateDeviceGroupId(targetDevice.id());
+ GroupKey deviceGroupeKey = generateGroupKey(device.id(), groupId);
+
+ if (groupExist(device, deviceGroupeKey)) {
+ groupService.removeGroup(device.id(), deviceGroupeKey, appId);
+ }
+ }
+}
diff --git a/group-fwd/src/main/java/org/onosproject/groupfwd/package-info.java b/group-fwd/src/main/java/org/onosproject/groupfwd/package-info.java
new file mode 100644
index 0000000..6874894
--- /dev/null
+++ b/group-fwd/src/main/java/org/onosproject/groupfwd/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2017 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Sample application that provides simple form of reactive forwarding
+ * using the intent service.
+ */
+package org.onosproject.groupfwd;
diff --git a/pom.xml b/pom.xml
index accaeaf..8ff24a0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,6 +48,7 @@
<module>icona</module>
<module>patchpanel</module>
<module>ovsdb-rest</module>
+ <module>group-fwd</module>
</modules>
</project>