[CORD-1616] Supports PD by DHCP relay App
Change-Id: I9a23534023ca2847bd3f77a3f9ee2b468c5bb422
diff --git a/apps/routing/fpm/BUCK b/apps/routing/fpm/BUCK
index 002481f..c11818b 100644
--- a/apps/routing/fpm/BUCK
+++ b/apps/routing/fpm/BUCK
@@ -1,29 +1,8 @@
-COMPILE_DEPS = [
- '//lib:CORE_DEPS',
- '//lib:NETTY',
- '//lib:KRYO',
- '//lib:org.apache.karaf.shell.console',
- '//cli:onos-cli',
- '//incubator/api:onos-incubator-api',
- '//apps/routing-api:onos-apps-routing-api',
- '//apps/route-service/api:onos-apps-route-service-api',
- '//core/store/serializers:onos-core-serializers',
- '//lib:netty',
-]
-
-TEST_DEPS = [
- '//lib:TEST_ADAPTERS',
-]
-
-osgi_jar_with_tests (
- deps = COMPILE_DEPS,
- test_deps = TEST_DEPS,
-)
-
BUNDLES = [
'//apps/routing/common:onos-apps-routing-common',
- '//apps/routing/fpm:onos-apps-routing-fpm',
'//apps/routing-api:onos-apps-routing-api',
+ '//apps/routing/fpm/api:onos-apps-routing-fpm-api',
+ '//apps/routing/fpm/app:onos-apps-routing-fpm-app',
]
onos_app (
@@ -31,7 +10,7 @@
title = 'FIB Push Manager (FPM) Route Receiver',
category = 'Utility',
url = 'http://onosproject.org',
- description = 'Receives routes from external routing daemon over FPM protocol',
+ description = 'Receives/Transmits routes from external routing daemon over FPM protocol',
included_bundles = BUNDLES,
required_apps = [ 'org.onosproject.route-service' ],
-)
+)
\ No newline at end of file
diff --git a/apps/routing/fpm/api/BUCK b/apps/routing/fpm/api/BUCK
new file mode 100644
index 0000000..f5a98fc
--- /dev/null
+++ b/apps/routing/fpm/api/BUCK
@@ -0,0 +1,7 @@
+COMPILE_DEPS = [
+ '//lib:CORE_DEPS',
+]
+
+osgi_jar (
+ deps = COMPILE_DEPS,
+)
diff --git a/apps/routing/fpm/api/pom.xml b/apps/routing/fpm/api/pom.xml
new file mode 100644
index 0000000..1b8d310
--- /dev/null
+++ b/apps/routing/fpm/api/pom.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2017-present Open Networking Foundation
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>onos-apps-fpm</artifactId>
+ <groupId>org.onosproject</groupId>
+ <version>1.11.2-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>onos-apps-routing-fpm-api</artifactId>
+ <packaging>bundle</packaging>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-cli</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.karaf.shell</groupId>
+ <artifactId>org.apache.karaf.shell.console</artifactId>
+ </dependency>
+
+ </dependencies>
+
+
+</project>
diff --git a/apps/routing/fpm/api/src/main/java/org/onosproject/routing/fpm/api/FpmPrefixStore.java b/apps/routing/fpm/api/src/main/java/org/onosproject/routing/fpm/api/FpmPrefixStore.java
new file mode 100644
index 0000000..c2ed203
--- /dev/null
+++ b/apps/routing/fpm/api/src/main/java/org/onosproject/routing/fpm/api/FpmPrefixStore.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.routing.fpm.api;
+
+import org.onosproject.store.Store;
+import org.onosproject.store.StoreDelegate;
+import org.onlab.packet.IpPrefix;
+import java.util.Collection;
+import java.util.Optional;
+
+/**
+ * Interface to store Fpm records.
+ */
+public interface FpmPrefixStore extends Store<FpmPrefixStoreEvent, StoreDelegate<FpmPrefixStoreEvent>> {
+
+    /**
+     * Gets Fpm record for a prefix.
+     *
+     * @param prefix is the key
+     * @return the Fpm record; empty if record does not exist
+     */
+    Optional<FpmRecord> getFpmRecord(IpPrefix prefix);
+
+    /**
+     * Gets all Fpm records from the data store.
+     *
+     * @return all FPM records
+     */
+    Collection<FpmRecord> getFpmRecords();
+
+    /**
+     * Sets a delegate on the data store to be notified of events.
+     *
+     * @param delegate is the delegate to be added
+     */
+    void setDelegate(StoreDelegate<FpmPrefixStoreEvent> delegate);
+
+    /**
+     * Unsets the delegate on the data store.
+     *
+     * @param delegate is the delegate to be removed
+     */
+    void unsetDelegate(StoreDelegate<FpmPrefixStoreEvent> delegate);
+}
diff --git a/apps/routing/fpm/api/src/main/java/org/onosproject/routing/fpm/api/FpmPrefixStoreEvent.java b/apps/routing/fpm/api/src/main/java/org/onosproject/routing/fpm/api/FpmPrefixStoreEvent.java
new file mode 100644
index 0000000..1d3cf28
--- /dev/null
+++ b/apps/routing/fpm/api/src/main/java/org/onosproject/routing/fpm/api/FpmPrefixStoreEvent.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.routing.fpm.api;
+
+import org.onosproject.event.AbstractEvent;
+
+/**
+ * Event class for FPM prefix store.
+ */
+public class FpmPrefixStoreEvent extends AbstractEvent<FpmPrefixStoreEvent.Type, FpmRecord> {
+
+ /**
+ * Kinds of change an event can report.
+ */
+ public enum Type {
+ /**
+ * An FPM record has been added.
+ */
+ ADD,
+
+ /**
+ * An FPM record has been removed.
+ */
+ REMOVE
+ }
+
+ /**
+ * Creates an FPM prefix store event; both arguments are passed unchanged to the superclass.
+ *
+ * @param type is the type of event
+ * @param subject is the FPM record this event is about
+ */
+ public FpmPrefixStoreEvent(Type type, FpmRecord subject) {
+ super(type, subject);
+ }
+}
diff --git a/apps/routing/fpm/api/src/main/java/org/onosproject/routing/fpm/api/FpmRecord.java b/apps/routing/fpm/api/src/main/java/org/onosproject/routing/fpm/api/FpmRecord.java
new file mode 100644
index 0000000..de63e44
--- /dev/null
+++ b/apps/routing/fpm/api/src/main/java/org/onosproject/routing/fpm/api/FpmRecord.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.routing.fpm.api;
+
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.IpPrefix;
+import java.util.Objects;
+import com.google.common.base.MoreObjects;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A class to define a Fpm record.
+ */
+public class FpmRecord {
+    /** Identifies which source produced a record. */
+    public enum Type {
+        /**
+         * Signifies that record came from Dhcp Relay.
+         */
+        DHCP_RELAY,
+
+        /**
+         * Signifies that record came from RIP.
+         */
+        RIP
+    }
+
+    private final IpPrefix prefix;
+    private final IpAddress nextHop;
+    private final Type type;
+
+    /**
+     * Creates a record; prefix and nextHop must not be null (type is not checked).
+     */
+    public FpmRecord(IpPrefix prefix, IpAddress nextHop, Type type) {
+        this.prefix = checkNotNull(prefix, "prefix cannot be null");
+        this.nextHop = checkNotNull(nextHop, "nextHop cannot be null");
+        this.type = type;
+    }
+
+    /**
+     * Gets IP prefix of record.
+     *
+     * @return the IP prefix
+     */
+    public IpPrefix ipPrefix() {
+        return prefix;
+    }
+
+    /**
+     * Gets next-hop IP address of record.
+     *
+     * @return the next-hop IP address
+     */
+    public IpAddress nextHop() {
+        return nextHop;
+    }
+
+    /**
+     * Gets type of record.
+     *
+     * @return the type
+     */
+    public Type type() {
+        return type;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(prefix, nextHop, type);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof FpmRecord)) {
+            return false;
+        }
+        FpmRecord that = (FpmRecord) obj;
+        return Objects.equals(prefix, that.prefix) &&
+                Objects.equals(nextHop, that.nextHop) &&
+                Objects.equals(type, that.type);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("prefix", prefix)
+                .add("ipAddress", nextHop)
+                .add("type", type)
+                .toString();
+    }
+}
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java b/apps/routing/fpm/api/src/main/java/org/onosproject/routing/fpm/api/package-info.java
similarity index 64%
copy from apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java
copy to apps/routing/fpm/api/src/main/java/org/onosproject/routing/fpm/api/package-info.java
index e92b489..a45fb74 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java
+++ b/apps/routing/fpm/api/src/main/java/org/onosproject/routing/fpm/api/package-info.java
@@ -14,19 +14,7 @@
* limitations under the License.
*/
-package org.onosproject.routing.fpm;
-
-import java.util.Map;
-
/**
- * Provides information about the FPM route receiver module.
+ * Forwarding Plane Manager (FPM) API.
*/
-public interface FpmInfoService {
-
- /**
- * Returns the FPM peers that are currently connected.
- *
- * @return a map of FPM peer with related information
- */
- Map<FpmPeer, FpmPeerInfo> peers();
-}
+package org.onosproject.routing.fpm.api;
diff --git a/apps/routing/fpm/app/BUCK b/apps/routing/fpm/app/BUCK
new file mode 100644
index 0000000..da7b8e5
--- /dev/null
+++ b/apps/routing/fpm/app/BUCK
@@ -0,0 +1,22 @@
+COMPILE_DEPS = [
+ '//lib:CORE_DEPS',
+ '//lib:NETTY',
+ '//lib:KRYO',
+ '//lib:org.apache.karaf.shell.console',
+ '//cli:onos-cli',
+ '//incubator/api:onos-incubator-api',
+ '//apps/routing-api:onos-apps-routing-api',
+ '//apps/route-service/api:onos-apps-route-service-api',
+ '//core/store/serializers:onos-core-serializers',
+ '//apps/routing/fpm/api:onos-apps-routing-fpm-api',
+ '//lib:netty',
+]
+
+TEST_DEPS = [
+ '//lib:TEST_ADAPTERS',
+]
+
+osgi_jar_with_tests (
+ deps = COMPILE_DEPS,
+ test_deps = TEST_DEPS,
+)
diff --git a/apps/routing/fpm/app/pom.xml b/apps/routing/fpm/app/pom.xml
new file mode 100644
index 0000000..39df6d7
--- /dev/null
+++ b/apps/routing/fpm/app/pom.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2017-present Open Networking Foundation
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>onos-apps-fpm</artifactId>
+ <groupId>org.onosproject</groupId>
+ <version>1.11.2-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>onos-apps-routing-fpm-app</artifactId>
+ <packaging>bundle</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-cli</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-apps-routing-fpm-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.karaf.shell</groupId>
+ <artifactId>org.apache.karaf.shell.console</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-core-serializers</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-apps-route-service-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmConnectionInfo.java b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmConnectionInfo.java
similarity index 100%
rename from apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmConnectionInfo.java
rename to apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmConnectionInfo.java
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmFrameDecoder.java b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmFrameDecoder.java
similarity index 100%
rename from apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmFrameDecoder.java
rename to apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmFrameDecoder.java
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java
similarity index 86%
rename from apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java
rename to apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java
index e92b489..a160fc0 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java
+++ b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java
@@ -29,4 +29,11 @@
* @return a map of FPM peer with related information
*/
Map<FpmPeer, FpmPeerInfo> peers();
+
+ /**
+ * Returns true if pushing routes to Quagga is enabled.
+ *
+ * @return true or false
+ */
+ boolean isPdPushEnabled();
}
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmListener.java b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmListener.java
similarity index 100%
rename from apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmListener.java
rename to apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmListener.java
diff --git a/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmManager.java b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmManager.java
new file mode 100644
index 0000000..dbe58fe
--- /dev/null
+++ b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmManager.java
@@ -0,0 +1,707 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.routing.fpm;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.ReferencePolicy;
+import org.apache.felix.scr.annotations.Service;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.timeout.IdleStateHandler;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.Ip4Address;
+import org.onlab.packet.Ip6Address;
+import org.onlab.packet.IpPrefix;
+import org.onosproject.net.intf.Interface;
+import org.onosproject.net.host.InterfaceIpAddress;
+import org.onosproject.net.intf.InterfaceService;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.CoreService;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.routeservice.Route;
+import org.onosproject.routeservice.RouteAdminService;
+import org.onosproject.routing.fpm.protocol.FpmHeader;
+import org.onosproject.routing.fpm.protocol.Netlink;
+import org.onosproject.routing.fpm.protocol.NetlinkMessageType;
+import org.onosproject.routing.fpm.protocol.RouteAttribute;
+import org.onosproject.routing.fpm.protocol.RouteAttributeDst;
+import org.onosproject.routing.fpm.protocol.RouteAttributeGateway;
+import org.onosproject.routing.fpm.protocol.RtNetlink;
+import org.onosproject.routing.fpm.protocol.RtProtocol;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.StoreDelegate;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.HashSet;
+import java.util.LinkedList;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static java.util.concurrent.Executors.newCachedThreadPool;
+import static org.onlab.util.Tools.groupedThreads;
+import org.onosproject.routing.fpm.api.FpmPrefixStoreEvent;
+import org.onosproject.routing.fpm.api.FpmPrefixStore;
+import org.onosproject.routing.fpm.api.FpmRecord;
+
+/**
+ * Forwarding Plane Manager (FPM) route source.
+ */
+@Service
+@Component(immediate = true)
+public class FpmManager implements FpmInfoService {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private static final int FPM_PORT = 2620;
+ private static final String APP_NAME = "org.onosproject.fpm";
+ private static final int IDLE_TIMEOUT_SECS = 5;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ComponentConfigService componentConfigService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected RouteAdminService routeService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected InterfaceService interfaceService;
+
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
+ bind = "bindRipStore",
+ unbind = "unbindRipStore",
+ policy = ReferencePolicy.DYNAMIC,
+ target = "(fpm_type=RIP)")
+ protected volatile FpmPrefixStore ripStore;
+
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
+ bind = "bindDhcpStore",
+ unbind = "unbindDhcpStore",
+ policy = ReferencePolicy.DYNAMIC,
+ target = "(fpm_type=DHCP)")
+ protected volatile FpmPrefixStore dhcpStore;
+
+ private final StoreDelegate<FpmPrefixStoreEvent> fpmPrefixStoreDelegate
+ = new FpmPrefixStoreDelegate();
+
+ private ApplicationId appId;
+ private ServerBootstrap serverBootstrap;
+ private Channel serverChannel;
+ private ChannelGroup allChannels = new DefaultChannelGroup();
+
+ private ConsistentMap<FpmPeer, Set<FpmConnectionInfo>> peers;
+
+ private Map<FpmPeer, Map<IpPrefix, Route>> fpmRoutes = new ConcurrentHashMap<>();
+
+ @Property(name = "clearRoutes", boolValue = true,
+ label = "Whether to clear routes when the FPM connection goes down")
+ private boolean clearRoutes = true;
+
+ @Property(name = "pdPushEnabled", boolValue = false,
+ label = "Whether to push prefixes to Quagga over fpm connection")
+ private boolean pdPushEnabled = false;
+
+ @Property(name = "pdPushNextHopIPv4", value = "",
+ label = "IPv4 next-hop address for PD Pushing.")
+ private Ip4Address pdPushNextHopIPv4 = null;
+
+ @Property(name = "pdPushNextHopIPv6", value = "",
+ label = "IPv6 next-hop address for PD Pushing.")
+ private Ip6Address pdPushNextHopIPv6 = null;
+
+    protected void bindRipStore(FpmPrefixStore store) {
+        // Keep the store that is already bound; ignore a null or additional store.
+        if ((ripStore != null) || (store == null)) {
+            return;
+        }
+        ripStore = store;
+        ripStore.setDelegate(fpmPrefixStoreDelegate);
+        allChannels.forEach(this::processRipStaticRoutes);
+    }
+
+    protected void unbindRipStore(FpmPrefixStore store) { // releases only the currently bound store
+        if ((store != null) && (ripStore == store)) { // null guard: no NPE when nothing is bound
+            store.unsetDelegate(fpmPrefixStoreDelegate);
+            ripStore = null;
+        }
+    }
+
+    protected void bindDhcpStore(FpmPrefixStore store) {
+        // Keep the store that is already bound; ignore a null or additional store.
+        if ((dhcpStore != null) || (store == null)) {
+            return;
+        }
+        dhcpStore = store;
+        dhcpStore.setDelegate(fpmPrefixStoreDelegate);
+        allChannels.forEach(this::processDhcpStaticRoutes);
+    }
+
+    protected void unbindDhcpStore(FpmPrefixStore store) { // releases only the currently bound store
+        if ((store != null) && (dhcpStore == store)) { // null guard: no NPE when nothing is bound
+            store.unsetDelegate(fpmPrefixStoreDelegate);
+            dhcpStore = null;
+        }
+    }
+
+ @Activate
+ protected void activate(ComponentContext context) { // component activation: config, peer map, server
+ componentConfigService.preSetProperty(
+ "org.onosproject.incubator.store.routing.impl.RouteStoreImpl",
+ "distributed", "true");
+ // Register this component's configurable properties.
+ componentConfigService.registerProperties(getClass());
+ // Serializer for the peer map: the API namespace plus the two FPM-specific classes.
+ KryoNamespace serializer = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(FpmPeer.class)
+ .register(FpmConnectionInfo.class)
+ .build();
+ peers = storageService.<FpmPeer, Set<FpmConnectionInfo>>consistentMapBuilder()
+ .withName("fpm-connections")
+ .withSerializer(Serializer.using(serializer))
+ .build();
+ // Apply the initial configuration before the server is started.
+ modified(context);
+ startServer();
+ // peers::destroy is passed as a callback (presumably run on app deactivation - TODO confirm).
+ appId = coreService.registerApplication(APP_NAME, peers::destroy);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ componentConfigService.preSetProperty(
+ "org.onosproject.incubator.store.routing.impl.RouteStoreImpl",
+ "distributed", "false");
+ // Close all channels (withdrawing peer routes if clearRoutes is set), then drop local state.
+ stopServer();
+ fpmRoutes.clear();
+ componentConfigService.unregisterProperties(getClass(), false);
+ log.info("Stopped");
+ }
+
+ @Modified
+ protected void modified(ComponentContext context) { // re-reads clearRoutes and the PD push settings
+ Dictionary<?, ?> properties = context.getProperties();
+ if (properties == null) {
+ return;
+ }
+ String strClearRoutes = Tools.get(properties, "clearRoutes");
+ if (strClearRoutes != null) {
+ clearRoutes = Boolean.parseBoolean(strClearRoutes);
+ log.info("clearRoutes is {}", clearRoutes);
+ }
+ // The next-hop properties are only read when pdPushEnabled is present and parses to true.
+ String strPdPushEnabled = Tools.get(properties, "pdPushEnabled");
+ if (strPdPushEnabled != null) {
+ boolean oldValue = pdPushEnabled;
+ pdPushEnabled = Boolean.parseBoolean(strPdPushEnabled);
+ if (pdPushEnabled) {
+ // Reset both next hops so values from a previous configuration are not kept.
+ pdPushNextHopIPv4 = null;
+ pdPushNextHopIPv6 = null;
+ // NOTE(review): valueOf presumably throws on a malformed address, aborting this call - confirm.
+ String strPdPushNextHopIPv4 = Tools.get(properties, "pdPushNextHopIPv4");
+ if (strPdPushNextHopIPv4 != null) {
+ pdPushNextHopIPv4 = Ip4Address.valueOf(strPdPushNextHopIPv4);
+ }
+ String strPdPushNextHopIPv6 = Tools.get(properties, "pdPushNextHopIPv6");
+ if (strPdPushNextHopIPv6 != null) {
+ pdPushNextHopIPv6 = Ip6Address.valueOf(strPdPushNextHopIPv6);
+ }
+ // Not configured: fall back to the first IPv4 address on an interface whose name contains "RUR".
+ if (pdPushNextHopIPv4 == null) {
+ pdPushNextHopIPv4 = interfaceService.getInterfaces()
+ .stream()
+ .filter(iface -> iface.name().contains("RUR"))
+ .map(Interface::ipAddressesList)
+ .flatMap(Collection::stream)
+ .map(InterfaceIpAddress::ipAddress)
+ .filter(IpAddress::isIp4)
+ .map(IpAddress::getIp4Address)
+ .findFirst()
+ .orElse(null);
+ }
+ // Same fallback for IPv6.
+ if (pdPushNextHopIPv6 == null) {
+ pdPushNextHopIPv6 = interfaceService.getInterfaces()
+ .stream()
+ .filter(iface -> iface.name().contains("RUR"))
+ .map(Interface::ipAddressesList)
+ .flatMap(Collection::stream)
+ .map(InterfaceIpAddress::ipAddress)
+ .filter(IpAddress::isIp6)
+ .map(IpAddress::getIp6Address)
+ .findFirst()
+ .orElse(null);
+ }
+
+ log.info("PD pushing is enabled.");
+ if (pdPushNextHopIPv4 != null) {
+ log.info("ipv4 next-hop {}", pdPushNextHopIPv4.toString());
+ } else {
+ log.info("ipv4 next-hop is null");
+ }
+ if (pdPushNextHopIPv6 != null) {
+ log.info("ipv6 next-hop={}", pdPushNextHopIPv6.toString());
+ } else {
+ log.info("ipv6 next-hop is null");
+ }
+ if (!oldValue) { // push stored records only on a disabled -> enabled transition
+ processStaticRoutes();
+ }
+ } else {
+ log.info("PD pushing is disabled.");
+ }
+ }
+ }
+
+    private void startServer() {
+        // Builds the Netty server pipeline and binds it to FPM_PORT.
+        HashedWheelTimer timer = new HashedWheelTimer(
+                groupedThreads("onos/fpm", "fpm-timer-%d", log));
+        ChannelFactory channelFactory = new NioServerSocketChannelFactory(
+                newCachedThreadPool(groupedThreads("onos/fpm", "sm-boss-%d", log)),
+                newCachedThreadPool(groupedThreads("onos/fpm", "sm-worker-%d", log)));
+        ChannelPipelineFactory pipelineFactory = () -> {
+            // Allocate a new session per connection
+            IdleStateHandler idleHandler =
+                    new IdleStateHandler(timer, IDLE_TIMEOUT_SECS, 0, 0);
+            FpmSessionHandler fpmSessionHandler =
+                    new FpmSessionHandler(this, new InternalFpmListener());
+            FpmFrameDecoder fpmFrameDecoder = new FpmFrameDecoder();
+
+            // Setup the processing pipeline
+            ChannelPipeline pipeline = Channels.pipeline();
+            pipeline.addLast("FpmFrameDecoder", fpmFrameDecoder);
+            pipeline.addLast("idle", idleHandler);
+            pipeline.addLast("FpmSession", fpmSessionHandler);
+            return pipeline;
+        };
+        // NOTE(review): the timer is a local and is not stopped in the visible code - confirm it is released.
+        InetSocketAddress listenAddress = new InetSocketAddress(FPM_PORT);
+
+        serverBootstrap = new ServerBootstrap(channelFactory);
+        serverBootstrap.setOption("child.reuseAddr", true);
+        serverBootstrap.setOption("child.keepAlive", true);
+        serverBootstrap.setOption("child.tcpNoDelay", true);
+        serverBootstrap.setPipelineFactory(pipelineFactory);
+        try {
+            serverChannel = serverBootstrap.bind(listenAddress);
+            allChannels.add(serverChannel);
+        } catch (ChannelException e) {
+            // Without the listening socket no peer can connect, so report this at error level.
+            log.error("Exception binding to FPM port {}: ", listenAddress.getPort(), e);
+            stopServer();
+        }
+    }
+
+    private void stopServer() {
+        // Close every tracked channel and wait for completion before releasing resources.
+        allChannels.close().awaitUninterruptibly();
+        allChannels.clear();
+        if (serverBootstrap != null) {
+            serverBootstrap.releaseExternalResources();
+        }
+        if (clearRoutes) {
+            peers.keySet().forEach(knownPeer -> clearRoutes(knownPeer));
+        }
+    }
+
+ private void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) { // turns one FPM message into route changes
+ if (fpmMessage.type() == FpmHeader.FPM_TYPE_KEEPALIVE) { // keepalive messages are ignored
+ return;
+ }
+
+ Netlink netlink = fpmMessage.netlink();
+ RtNetlink rtNetlink = netlink.rtNetlink();
+
+ if (log.isTraceEnabled()) {
+ log.trace("Received FPM message: {}", fpmMessage);
+ }
+ // Only routes whose protocol is ZEBRA or UNSPEC are processed.
+ if (!(rtNetlink.protocol() == RtProtocol.ZEBRA ||
+ rtNetlink.protocol() == RtProtocol.UNSPEC)) {
+ log.trace("Ignoring non-zebra route");
+ return;
+ }
+
+ IpAddress dstAddress = null;
+ IpAddress gateway = null;
+ // Pull destination and gateway out of the route attributes (the last one of each type wins).
+ for (RouteAttribute attribute : rtNetlink.attributes()) {
+ if (attribute.type() == RouteAttribute.RTA_DST) {
+ RouteAttributeDst raDst = (RouteAttributeDst) attribute;
+ dstAddress = raDst.dstAddress();
+ } else if (attribute.type() == RouteAttribute.RTA_GATEWAY) {
+ RouteAttributeGateway raGateway = (RouteAttributeGateway) attribute;
+ gateway = raGateway.gateway();
+ }
+ }
+
+ if (dstAddress == null) {
+ log.error("Dst address missing!");
+ return;
+ }
+
+ IpPrefix prefix = IpPrefix.valueOf(dstAddress, rtNetlink.dstLength());
+
+ List<Route> updates = new LinkedList<>();
+ List<Route> withdraws = new LinkedList<>();
+
+ Route route;
+ switch (netlink.type()) {
+ case RTM_NEWROUTE:
+ if (gateway == null) {
+ // We ignore interface routes with no gateway for now.
+ return;
+ }
+ route = new Route(Route.Source.FPM, prefix, gateway, clusterService.getLocalNode().id());
+ // Record the route for this peer; a previous route for the same prefix is withdrawn.
+ // NOTE(review): assumes fpmRoutes already holds an entry for this peer - confirm it is created on connect.
+ Route oldRoute = fpmRoutes.get(peer).put(prefix, route);
+
+ if (oldRoute != null) {
+ log.trace("Swapping {} with {}", oldRoute, route);
+ withdraws.add(oldRoute);
+ }
+ updates.add(route);
+ break;
+ case RTM_DELROUTE:
+ Route existing = fpmRoutes.get(peer).remove(prefix);
+ if (existing == null) {
+ log.warn("Got delete for non-existent prefix");
+ return;
+ }
+ // The withdrawal reuses the next hop of the route recorded earlier for this prefix.
+ route = new Route(Route.Source.FPM, prefix, existing.nextHop(), clusterService.getLocalNode().id());
+
+ withdraws.add(route);
+ break;
+ case RTM_GETROUTE:
+ default:
+ break;
+ }
+ // Withdrawals are submitted before updates.
+ routeService.withdraw(withdraws);
+ routeService.update(updates);
+ }
+
+    private void clearRoutes(FpmPeer peer) { // forgets the peer's recorded routes and withdraws them
+        log.info("Clearing all routes for peer {}", peer);
+        Map<IpPrefix, Route> forgotten = fpmRoutes.remove(peer);
+        if (forgotten != null) { // null when no routes were recorded for this peer
+            routeService.withdraw(forgotten.values());
+        }
+    }
+
+    public void processStaticRoutes() {
+        // Fan out to the per-channel overload for every channel in the group
+        // (the group also contains the listening channel added in startServer).
+        allChannels.forEach(this::processStaticRoutes);
+    }
+
+ public void processStaticRoutes(Channel ch) { // pushes stored records to one channel
+ processRipStaticRoutes(ch); // RIP store records first
+ processDhcpStaticRoutes(ch); // then DHCP store records
+ }
+
+    private void processRipStaticRoutes(Channel ch) {
+        // Read the volatile, dynamically bound reference once so that a concurrent
+        // unbind cannot null it between the check and the use.
+        FpmPrefixStore store = ripStore;
+        if (store == null) {
+            return;
+        }
+        Collection<FpmRecord> ripRecords = store.getFpmRecords();
+        log.info("RIP store size is {}", ripRecords.size());
+        ripRecords.forEach(record -> sendRouteUpdateToChannel(true, record.ipPrefix(), ch));
+    }
+
+    private void processDhcpStaticRoutes(Channel ch) {
+        // Read the volatile, dynamically bound reference once so that a concurrent
+        // unbind cannot null it between the check and the use.
+        FpmPrefixStore store = dhcpStore;
+        if (store == null) {
+            return;
+        }
+        Collection<FpmRecord> dhcpRecords = store.getFpmRecords();
+        log.info("Dhcp store size is {}", dhcpRecords.size());
+        dhcpRecords.forEach(record -> sendRouteUpdateToChannel(true, record.ipPrefix(), ch));
+    }
+
+ private void sendRouteUpdateToChannel(boolean isAdd, IpPrefix prefix, Channel ch) {
+     // Builds an FPM-framed netlink route add/delete for prefix and writes it to ch; no-op unless pdPushEnabled.
+     int netLinkLength;
+     short addrFamily;
+     IpAddress pdPushNextHop;
+
+     if (!pdPushEnabled) {
+         return;
+     }
+
+     try {
+         // Construct list of route attributes.
+         List<RouteAttribute> attributes = new ArrayList<>();
+         if (prefix.isIp4()) {
+             if (pdPushNextHopIPv4 == null) {
+                 log.info("Prefix not pushed because ipv4 next-hop is null.");
+                 return;
+             }
+             pdPushNextHop = pdPushNextHopIPv4;
+             netLinkLength = Ip4Address.BYTE_LENGTH + RouteAttribute.ROUTE_ATTRIBUTE_HEADER_LENGTH;
+             addrFamily = RtNetlink.RT_ADDRESS_FAMILY_INET;
+         } else {
+             if (pdPushNextHopIPv6 == null) {
+                 log.info("Prefix not pushed because ipv6 next-hop is null.");
+                 return;
+             }
+             pdPushNextHop = pdPushNextHopIPv6;
+             netLinkLength = Ip6Address.BYTE_LENGTH + RouteAttribute.ROUTE_ATTRIBUTE_HEADER_LENGTH;
+             addrFamily = RtNetlink.RT_ADDRESS_FAMILY_INET6;
+         }
+
+         RouteAttributeDst raDst = new RouteAttributeDst(
+                 netLinkLength,
+                 RouteAttribute.RTA_DST,
+                 prefix.address());
+         attributes.add(raDst);
+
+         RouteAttributeGateway raGateway = new RouteAttributeGateway(
+                 netLinkLength,
+                 RouteAttribute.RTA_GATEWAY,
+                 pdPushNextHop);
+         attributes.add(raGateway);
+
+         // Add RtNetlink header.
+         int srcLength = 0;
+         short tos = 0;
+         short table = 0;
+         short scope = 0;
+         long rtFlags = 0;
+         int messageLength = raDst.length() + raGateway.length() +
+                 RtNetlink.RT_NETLINK_LENGTH;
+
+         RtNetlink rtNetlink = new RtNetlink(
+                 addrFamily,
+                 prefix.prefixLength(),
+                 srcLength,
+                 tos,
+                 table,
+                 RtProtocol.ZEBRA,
+                 scope,
+                 FpmHeader.FPM_TYPE_NETLINK,
+                 rtFlags,
+                 attributes);
+
+         // Add Netlink header.
+         NetlinkMessageType nlMsgType;
+         if (isAdd) {
+             nlMsgType = NetlinkMessageType.RTM_NEWROUTE;
+         } else {
+             nlMsgType = NetlinkMessageType.RTM_DELROUTE;
+         }
+         int flags = Netlink.NETLINK_REQUEST | Netlink.NETLINK_CREATE;
+         long sequence = 0;
+         long processPortId = 0;
+         messageLength += Netlink.NETLINK_HEADER_LENGTH;
+
+         Netlink netLink = new Netlink(messageLength,
+                 nlMsgType,
+                 flags,
+                 sequence,
+                 processPortId,
+                 rtNetlink);
+
+         messageLength += FpmHeader.FPM_HEADER_LENGTH;
+
+         // Add FpmHeader.
+         FpmHeader fpmMessage = new FpmHeader(
+                 FpmHeader.FPM_VERSION_1,
+                 FpmHeader.FPM_TYPE_NETLINK,
+                 messageLength,
+                 netLink);
+
+         // Encode message in a channel buffer and transmit.
+         ch.write(fpmMessage.encode());
+
+     } catch (RuntimeException e) {
+         log.warn("Route not sent over fpm connection.", e);
+     }
+ }
+
+ private void sendRouteUpdate(boolean isAdd, IpPrefix prefix) {
+     // Fan the update out to every session channel currently in allChannels.
+     allChannels.forEach(
+             session -> sendRouteUpdateToChannel(isAdd, prefix, session)
+     );
+ }
+
+ public boolean isPdPushEnabled() {
+ return pdPushEnabled;
+ }
+
+ private FpmPeerInfo toFpmInfo(FpmPeer peer, Collection<FpmConnectionInfo> connections) {
+ return new FpmPeerInfo(connections,
+ fpmRoutes.getOrDefault(peer, Collections.emptyMap()).size());
+ }
+
+ @Override
+ public Map<FpmPeer, FpmPeerInfo> peers() {
+     // Summarize every known peer: its connection records plus its route count.
+     return peers.asJavaMap().entrySet().stream()
+             .collect(Collectors.toMap(Map.Entry::getKey,
+                     entry -> toFpmInfo(entry.getKey(), entry.getValue())));
+ }
+
+ private class InternalFpmListener implements FpmListener {
+     @Override
+     public void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) {
+         FpmManager.this.fpmMessage(peer, fpmMessage);
+     }
+
+     @Override
+     public boolean peerConnected(FpmPeer peer) {
+         // A peer that already has an entry in the peers map is refused.
+         boolean alreadyKnown = peers.keySet().contains(peer);
+         if (alreadyKnown) {
+             return false;
+         }
+
+         // Register a connection record stamped with this node and the current time.
+         NodeId localNode = clusterService.getLocalNode().id();
+         peers.compute(peer, (key, connections) -> {
+             if (connections == null) {
+                 connections = new HashSet<>();
+             }
+             connections.add(new FpmConnectionInfo(localNode, peer, System.currentTimeMillis()));
+             return connections;
+         });
+
+         // Ensure the peer has a route table in fpmRoutes.
+         fpmRoutes.computeIfAbsent(peer, key -> new ConcurrentHashMap<>());
+         return true;
+     }
+
+     @Override
+     public void peerDisconnected(FpmPeer peer) {
+         log.info("FPM connection to {} went down", peer);
+
+         // Routes are only withdrawn when the clearRoutes setting is on.
+         if (clearRoutes) {
+             clearRoutes(peer);
+         }
+
+         // Drop one connection record owned by this node; a null result presumably
+         // removes the peer's entry, as with Map.compute - confirm for this map type.
+         peers.compute(peer, (key, connections) -> {
+             if (connections == null) {
+                 return null;
+             }
+             connections.stream()
+                     .filter(info -> info.connectedTo().equals(clusterService.getLocalNode().id()))
+                     .findAny()
+                     .ifPresent(connections::remove);
+             return connections.isEmpty() ? null : connections;
+         });
+     }
+ }
+
+ /**
+ * Adds a channel to the channel group.
+ *
+ * @param channel the channel to add
+ */
+ public void addSessionChannel(Channel channel) {
+ allChannels.add(channel);
+ }
+
+ /**
+ * Removes a channel from the channel group.
+ *
+ * @param channel the channel to remove
+ */
+ public void removeSessionChannel(Channel channel) {
+ allChannels.remove(channel);
+ }
+
+ /**
+  * Store delegate for the FPM prefix store: ADD events are sent out as
+  * route adds and REMOVE events as route deletes via sendRouteUpdate.
+  */
+ class FpmPrefixStoreDelegate implements StoreDelegate<FpmPrefixStoreEvent> {
+
+     @Override
+     public void notify(FpmPrefixStoreEvent e) {
+
+         log.trace("FpmPrefixStoreEvent notify");
+
+         FpmRecord record = e.subject();
+         switch (e.type()) {
+             case ADD:
+                 sendRouteUpdate(true, record.ipPrefix());
+                 break;
+             case REMOVE:
+                 sendRouteUpdate(false, record.ipPrefix());
+                 break;
+             default:
+                 log.warn("unsupported store event type {}", e.type());
+                 return;
+         }
+     }
+ }
+}
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmPeer.java b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmPeer.java
similarity index 100%
rename from apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmPeer.java
rename to apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmPeer.java
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmPeerInfo.java b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmPeerInfo.java
similarity index 100%
rename from apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmPeerInfo.java
rename to apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmPeerInfo.java
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmSessionHandler.java b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmSessionHandler.java
similarity index 92%
rename from apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmSessionHandler.java
rename to apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmSessionHandler.java
index 3482bb7..658c058 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmSessionHandler.java
+++ b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmSessionHandler.java
@@ -42,6 +42,7 @@
private final FpmListener fpmListener;
+ private final FpmManager fpmManager;
private Channel channel;
private FpmPeer us;
@@ -51,9 +52,11 @@
/**
* Class constructor.
*
+ * @param fpmMgr manager
* @param fpmListener listener for FPM messages
*/
- public FpmSessionHandler(FpmListener fpmListener) {
+ public FpmSessionHandler(FpmManager fpmMgr, FpmListener fpmListener) {
+ this.fpmManager = fpmMgr;
this.fpmListener = checkNotNull(fpmListener);
}
@@ -114,6 +117,8 @@
}
channel = ctx.getChannel();
+ fpmManager.addSessionChannel(e.getChannel());
+ fpmManager.processStaticRoutes(e.getChannel());
}
@Override
@@ -130,6 +135,7 @@
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
+ fpmManager.removeSessionChannel(e.getChannel());
}
private void handleDisconnect() {
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/cli/FpmConnectionsList.java b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/cli/FpmConnectionsList.java
similarity index 95%
rename from apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/cli/FpmConnectionsList.java
rename to apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/cli/FpmConnectionsList.java
index 2706e56..33ff5d6 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/cli/FpmConnectionsList.java
+++ b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/cli/FpmConnectionsList.java
@@ -41,6 +41,9 @@
protected void execute() {
FpmInfoService fpmInfo = get(FpmInfoService.class);
+ // Always report the PD push state, whichever way it is set.
+ print("PD Pushing is %s.",
+         fpmInfo.isPdPushEnabled() ? "enabled" : "disabled");
fpmInfo.peers().entrySet().stream()
.sorted(Comparator.<Map.Entry<FpmPeer, FpmPeerInfo>, IpAddress>comparing(e -> e.getKey().address())
.thenComparing(e -> e.getKey().port()))
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/cli/package-info.java b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/cli/package-info.java
similarity index 100%
rename from apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/cli/package-info.java
rename to apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/cli/package-info.java
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/package-info.java b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/package-info.java
similarity index 100%
rename from apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/package-info.java
rename to apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/package-info.java
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/FpmHeader.java b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/FpmHeader.java
similarity index 87%
rename from apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/FpmHeader.java
rename to apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/FpmHeader.java
index e5b841e..607c9e1 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/FpmHeader.java
+++ b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/FpmHeader.java
@@ -20,6 +20,8 @@
import com.google.common.collect.ImmutableSet;
import org.onlab.packet.DeserializationException;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
import java.nio.ByteBuffer;
import static org.onlab.packet.PacketUtils.checkInput;
@@ -29,6 +31,7 @@
*/
public final class FpmHeader {
public static final int FPM_HEADER_LENGTH = 4;
+ public static final int FPM_MESSAGE_MAX_LENGTH = 4096;
public static final short FPM_VERSION_1 = 1;
public static final short FPM_VERSION_ONOS_EXT = 32;
@@ -60,7 +63,7 @@
* @param length length
* @param netlink netlink header
*/
- private FpmHeader(short version, short type, int length, Netlink netlink) {
+ public FpmHeader(short version, short type, int length, Netlink netlink) {
this.version = version;
this.type = type;
this.length = length;
@@ -149,4 +152,21 @@
return new FpmHeader(version, type, messageLength, netlink);
}
+
+ /**
+ * Encode the FpmHeader contents into a ChannelBuffer.
+ *
+ * @return filled in ChannelBuffer
+ */
+ public ChannelBuffer encode() {
+
+ ChannelBuffer cb = ChannelBuffers.buffer(FPM_MESSAGE_MAX_LENGTH);
+
+ cb.writeByte(version);
+ cb.writeByte(type);
+ cb.writeShort(length);
+
+ netlink.encode(cb);
+ return cb;
+ }
}
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/Netlink.java b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/Netlink.java
similarity index 85%
rename from apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/Netlink.java
rename to apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/Netlink.java
index 6c3f578..df84ed6 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/Netlink.java
+++ b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/Netlink.java
@@ -19,6 +19,7 @@
import com.google.common.base.MoreObjects;
import org.onlab.packet.DeserializationException;
+import org.jboss.netty.buffer.ChannelBuffer;
import java.nio.ByteBuffer;
import static org.onlab.packet.PacketUtils.checkInput;
@@ -33,6 +34,9 @@
public static final int NETLINK_HEADER_LENGTH = 16;
+ public static final int NETLINK_REQUEST = 0x01;
+ public static final int NETLINK_CREATE = 0x400;
+
private final long length;
private final NetlinkMessageType type;
private final int flags;
@@ -51,7 +55,7 @@
* @param processPortId port ID
* @param rtNetlink netlink routing message
*/
- private Netlink(long length, NetlinkMessageType type, int flags, long sequence,
+ public Netlink(long length, NetlinkMessageType type, int flags, long sequence,
long processPortId, RtNetlink rtNetlink) {
this.length = length;
this.type = type;
@@ -168,4 +172,20 @@
rtNetlink);
}
+ /**
+ * Encode the Netlink contents into the ChannelBuffer.
+ *
+ * @param cb channelbuffer to be filled in
+ */
+ public void encode(ChannelBuffer cb) {
+
+ cb.writeInt(Integer.reverseBytes((int) length));
+ cb.writeShort(Short.reverseBytes((short) type.type()));
+ cb.writeShort(Short.reverseBytes((short) flags));
+ cb.writeInt(Integer.reverseBytes((int) sequence));
+ cb.writeInt(Integer.reverseBytes((int) processPortId));
+
+ rtNetlink.encode(cb);
+ }
+
}
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/NetlinkMessageType.java b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/NetlinkMessageType.java
similarity index 100%
rename from apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/NetlinkMessageType.java
rename to apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/NetlinkMessageType.java
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttribute.java b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttribute.java
similarity index 93%
rename from apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttribute.java
rename to apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttribute.java
index 095e516..43a21c0 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttribute.java
+++ b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttribute.java
@@ -19,6 +19,7 @@
import com.google.common.collect.ImmutableMap;
import org.onlab.packet.DeserializationException;
+import org.jboss.netty.buffer.ChannelBuffer;
import java.nio.ByteBuffer;
import java.util.Map;
@@ -114,4 +115,12 @@
return decoder.decodeAttribute(tlvLength, type, value);
}
+
+ /**
+ * Encode the RouteAttribute into the ChannelBuffer.
+ *
+ * @param cb channelbuffer to be filled in
+ */
+ public abstract void encode(ChannelBuffer cb);
+
}
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttributeDecoder.java b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttributeDecoder.java
similarity index 100%
rename from apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttributeDecoder.java
rename to apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttributeDecoder.java
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttributeDst.java b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttributeDst.java
similarity index 70%
rename from apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttributeDst.java
rename to apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttributeDst.java
index 1590e90..1535100 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttributeDst.java
+++ b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttributeDst.java
@@ -21,6 +21,8 @@
import org.onlab.packet.Ip4Address;
import org.onlab.packet.Ip6Address;
import org.onlab.packet.IpAddress;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
/**
* Destination address route attribute.
@@ -36,7 +38,7 @@
* @param type type
* @param dstAddress destination address
*/
- private RouteAttributeDst(int length, int type, IpAddress dstAddress) {
+ public RouteAttributeDst(int length, int type, IpAddress dstAddress) {
super(length, type);
this.dstAddress = dstAddress;
@@ -80,4 +82,27 @@
return new RouteAttributeDst(length, type, dstAddress);
};
}
+
+ /**
+  * Writes this attribute to the buffer: byte-swapped 16-bit length and type
+  * followed by the destination address octets.
+  *
+  * @param cb channelbuffer to be filled in
+  */
+ @Override
+ public void encode(ChannelBuffer cb) {
+     cb.writeShort(Short.reverseBytes((short) length()));
+     cb.writeShort(Short.reverseBytes((short) type()));
+
+     ChannelBuffer octets = ChannelBuffers.copiedBuffer(dstAddress.toOctets());
+
+     // Only an IPv6-sized or IPv4-sized payload is accepted; the declared
+     // length, not the address object, decides how many octets are copied.
+     int payloadLength = length() - RouteAttribute.ROUTE_ATTRIBUTE_HEADER_LENGTH;
+     if (payloadLength != Ip6Address.BYTE_LENGTH
+             && payloadLength != Ip4Address.BYTE_LENGTH) {
+         throw new RuntimeException("Dst address length incorrect!");
+     }
+     cb.writeBytes(octets, payloadLength);
+ }
}
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttributeGateway.java b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttributeGateway.java
similarity index 70%
rename from apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttributeGateway.java
rename to apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttributeGateway.java
index c22d556..886f52e 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttributeGateway.java
+++ b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttributeGateway.java
@@ -21,6 +21,8 @@
import org.onlab.packet.Ip4Address;
import org.onlab.packet.Ip6Address;
import org.onlab.packet.IpAddress;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
/**
* Gateway route attribute.
@@ -38,7 +40,7 @@
* @param type type
* @param gateway gateway address
*/
- private RouteAttributeGateway(int length, int type, IpAddress gateway) {
+ public RouteAttributeGateway(int length, int type, IpAddress gateway) {
super(length, type);
this.gateway = gateway;
@@ -83,4 +85,26 @@
};
}
+ /**
+  * Writes this attribute to the buffer: byte-swapped 16-bit length and type
+  * followed by the gateway address octets.
+  *
+  * @param cb channelbuffer to be filled in
+  */
+ @Override
+ public void encode(ChannelBuffer cb) {
+     cb.writeShort(Short.reverseBytes((short) length()));
+     cb.writeShort(Short.reverseBytes((short) type()));
+
+     ChannelBuffer octets = ChannelBuffers.copiedBuffer(gateway.toOctets());
+
+     // Only an IPv6-sized or IPv4-sized payload is accepted; the declared
+     // length, not the address object, decides how many octets are copied.
+     int payloadLength = length() - RouteAttribute.ROUTE_ATTRIBUTE_HEADER_LENGTH;
+     if (payloadLength != Ip6Address.BYTE_LENGTH
+             && payloadLength != Ip4Address.BYTE_LENGTH) {
+         throw new RuntimeException("Gateway address length incorrect!");
+     }
+     cb.writeBytes(octets, payloadLength);
+ }
}
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttributeOif.java b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttributeOif.java
similarity index 81%
rename from apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttributeOif.java
rename to apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttributeOif.java
index f6a84d8..49a12e8 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttributeOif.java
+++ b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttributeOif.java
@@ -19,6 +19,7 @@
import com.google.common.base.MoreObjects;
import org.onlab.packet.DeserializationException;
+import org.jboss.netty.buffer.ChannelBuffer;
import java.nio.ByteBuffer;
/**
@@ -37,7 +38,7 @@
* @param type type
* @param outputInterface output interface
*/
- private RouteAttributeOif(int length, int type, long outputInterface) {
+ public RouteAttributeOif(int length, int type, long outputInterface) {
super(length, type);
this.outputInterface = outputInterface;
@@ -77,4 +78,17 @@
return new RouteAttributeOif(length, type, outputInterface);
};
}
+
+ /**
+ * Encode the RouteAttributeOif contents into the ChannelBuffer.
+ *
+ * @param cb channelbuffer to be filled in
+ */
+ @Override
+ public void encode(ChannelBuffer cb) {
+
+ cb.writeShort(Short.reverseBytes((short) length()));
+ cb.writeShort(Short.reverseBytes((short) type()));
+ cb.writeInt(Integer.reverseBytes((int) outputInterface));
+ }
}
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttributePriority.java b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttributePriority.java
similarity index 80%
rename from apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttributePriority.java
rename to apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttributePriority.java
index a1abe97..2c45db0 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttributePriority.java
+++ b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/RouteAttributePriority.java
@@ -19,6 +19,7 @@
import com.google.common.base.MoreObjects;
import org.onlab.packet.DeserializationException;
+import org.jboss.netty.buffer.ChannelBuffer;
import java.nio.ByteBuffer;
/**
@@ -37,7 +38,7 @@
* @param type type
* @param priority priority
*/
- private RouteAttributePriority(int length, int type, long priority) {
+ public RouteAttributePriority(int length, int type, long priority) {
super(length, type);
this.priority = priority;
@@ -78,4 +79,16 @@
};
}
+ /**
+ * Encode the RouteAttributePriority contents into the ChannelBuffer.
+ *
+ * @param cb channelbuffer to be filled in
+ */
+ @Override
+ public void encode(ChannelBuffer cb) {
+
+ cb.writeShort(Short.reverseBytes((short) length()));
+ cb.writeShort(Short.reverseBytes((short) type()));
+ cb.writeInt(Integer.reverseBytes((int) priority));
+ }
}
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/RtNetlink.java b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/RtNetlink.java
similarity index 88%
rename from apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/RtNetlink.java
rename to apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/RtNetlink.java
index 71d9572..adaaf62 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/RtNetlink.java
+++ b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/RtNetlink.java
@@ -23,6 +23,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import org.jboss.netty.buffer.ChannelBuffer;
import static org.onlab.packet.PacketUtils.checkInput;
@@ -34,7 +35,9 @@
*/
public final class RtNetlink {
- private static final int RT_NETLINK_LENGTH = 12;
+ public static final int RT_ADDRESS_FAMILY_INET = 2;
+ public static final int RT_ADDRESS_FAMILY_INET6 = 10;
+ public static final int RT_NETLINK_LENGTH = 12;
private static final int MASK = 0xff;
@@ -64,7 +67,7 @@
* @param flags flags
* @param attributes list of attributes
*/
- private RtNetlink(short addressFamily,
+ public RtNetlink(short addressFamily,
int dstLength,
int srcLength,
short tos,
@@ -244,4 +247,27 @@
attributes);
}
+
+ /**
+ * Encode the RtNetlink contents into the ChannelBuffer.
+ *
+ * @param cb channelbuffer to be filled in
+ */
+ public void encode(ChannelBuffer cb) {
+
+ cb.writeByte(addressFamily);
+ cb.writeByte(dstLength);
+ cb.writeByte(srcLength);
+ cb.writeByte(tos);
+ cb.writeByte(table);
+ cb.writeByte(protocol.value());
+ cb.writeByte(scope);
+ cb.writeByte(type);
+ cb.writeInt(Integer.reverseBytes((int) flags));
+
+ for (RouteAttribute attribute : attributes()) {
+ attribute.encode(cb);
+ }
+ }
+
}
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/RtProtocol.java b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/RtProtocol.java
similarity index 100%
rename from apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/RtProtocol.java
rename to apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/RtProtocol.java
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/package-info.java b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/package-info.java
similarity index 100%
rename from apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/package-info.java
rename to apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/protocol/package-info.java
diff --git a/apps/routing/fpm/src/main/resources/OSGI-INF/blueprint/shell-config.xml b/apps/routing/fpm/app/src/main/resources/OSGI-INF/blueprint/shell-config.xml
similarity index 100%
rename from apps/routing/fpm/src/main/resources/OSGI-INF/blueprint/shell-config.xml
rename to apps/routing/fpm/app/src/main/resources/OSGI-INF/blueprint/shell-config.xml
diff --git a/apps/routing/fpm/pom.xml b/apps/routing/fpm/pom.xml
index 95313a7..97dbb3f 100644
--- a/apps/routing/fpm/pom.xml
+++ b/apps/routing/fpm/pom.xml
@@ -20,40 +20,16 @@
<parent>
<artifactId>onos-app-routing-parent</artifactId>
<groupId>org.onosproject</groupId>
- <version>1.12.0-SNAPSHOT</version>
+ <version>1.11.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>onos-apps-fpm</artifactId>
- <packaging>bundle</packaging>
+ <packaging>pom</packaging>
- <dependencies>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.onosproject</groupId>
- <artifactId>onos-cli</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.karaf.shell</groupId>
- <artifactId>org.apache.karaf.shell.console</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.onosproject</groupId>
- <artifactId>onos-core-serializers</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.onosproject</groupId>
- <artifactId>onos-apps-route-service-api</artifactId>
- <version>${project.version}</version>
- </dependency>
- </dependencies>
+ <modules>
+ <module>api</module>
+ <module>app</module>
+ </modules>
</project>
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java
deleted file mode 100644
index 178b489..0000000
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java
+++ /dev/null
@@ -1,374 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.routing.fpm;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Modified;
-import org.apache.felix.scr.annotations.Property;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelException;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.timeout.IdleStateHandler;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.onlab.packet.IpAddress;
-import org.onlab.packet.IpPrefix;
-import org.onlab.util.KryoNamespace;
-import org.onlab.util.Tools;
-import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.core.CoreService;
-import org.onosproject.routeservice.Route;
-import org.onosproject.routeservice.RouteAdminService;
-import org.onosproject.routing.fpm.protocol.FpmHeader;
-import org.onosproject.routing.fpm.protocol.Netlink;
-import org.onosproject.routing.fpm.protocol.RouteAttribute;
-import org.onosproject.routing.fpm.protocol.RouteAttributeDst;
-import org.onosproject.routing.fpm.protocol.RouteAttributeGateway;
-import org.onosproject.routing.fpm.protocol.RtNetlink;
-import org.onosproject.routing.fpm.protocol.RtProtocol;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageService;
-import org.osgi.service.component.ComponentContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Dictionary;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-
-import static java.util.concurrent.Executors.newCachedThreadPool;
-import static org.onlab.util.Tools.groupedThreads;
-
-/**
- * Forwarding Plane Manager (FPM) route source.
- */
-@Service
-@Component(immediate = true)
-public class FpmManager implements FpmInfoService {
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private static final int FPM_PORT = 2620;
- private static final String APP_NAME = "org.onosproject.fpm";
- private static final int IDLE_TIMEOUT_SECS = 5;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected CoreService coreService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ComponentConfigService componentConfigService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected RouteAdminService routeService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected StorageService storageService;
-
- private ServerBootstrap serverBootstrap;
- private Channel serverChannel;
- private ChannelGroup allChannels = new DefaultChannelGroup();
-
- private ConsistentMap<FpmPeer, Set<FpmConnectionInfo>> peers;
-
- private Map<FpmPeer, Map<IpPrefix, Route>> fpmRoutes = new ConcurrentHashMap<>();
-
- @Property(name = "clearRoutes", boolValue = true,
- label = "Whether to clear routes when the FPM connection goes down")
- private boolean clearRoutes = true;
-
- @Activate
- protected void activate(ComponentContext context) {
- componentConfigService.preSetProperty(
- "org.onosproject.incubator.store.routing.impl.RouteStoreImpl",
- "distributed", "true");
-
- componentConfigService.registerProperties(getClass());
-
- KryoNamespace serializer = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .register(FpmPeer.class)
- .register(FpmConnectionInfo.class)
- .build();
- peers = storageService.<FpmPeer, Set<FpmConnectionInfo>>consistentMapBuilder()
- .withName("fpm-connections")
- .withSerializer(Serializer.using(serializer))
- .build();
-
- modified(context);
- startServer();
-
- coreService.registerApplication(APP_NAME, peers::destroy);
-
- log.info("Started");
- }
-
- @Deactivate
- protected void deactivate() {
- componentConfigService.preSetProperty(
- "org.onosproject.incubator.store.routing.impl.RouteStoreImpl",
- "distributed", "false");
-
- stopServer();
- fpmRoutes.clear();
- componentConfigService.unregisterProperties(getClass(), false);
- log.info("Stopped");
- }
-
- @Modified
- protected void modified(ComponentContext context) {
- Dictionary<?, ?> properties = context.getProperties();
- if (properties == null) {
- return;
- }
- String strClearRoutes = Tools.get(properties, "clearRoutes");
- clearRoutes = Boolean.parseBoolean(strClearRoutes);
-
- log.info("clearRoutes set to {}", clearRoutes);
- }
-
- private void startServer() {
- HashedWheelTimer timer = new HashedWheelTimer(
- groupedThreads("onos/fpm", "fpm-timer-%d", log));
-
- ChannelFactory channelFactory = new NioServerSocketChannelFactory(
- newCachedThreadPool(groupedThreads("onos/fpm", "sm-boss-%d", log)),
- newCachedThreadPool(groupedThreads("onos/fpm", "sm-worker-%d", log)));
- ChannelPipelineFactory pipelineFactory = () -> {
- // Allocate a new session per connection
- IdleStateHandler idleHandler =
- new IdleStateHandler(timer, IDLE_TIMEOUT_SECS, 0, 0);
- FpmSessionHandler fpmSessionHandler =
- new FpmSessionHandler(new InternalFpmListener());
- FpmFrameDecoder fpmFrameDecoder = new FpmFrameDecoder();
-
- // Setup the processing pipeline
- ChannelPipeline pipeline = Channels.pipeline();
- pipeline.addLast("FpmFrameDecoder", fpmFrameDecoder);
- pipeline.addLast("idle", idleHandler);
- pipeline.addLast("FpmSession", fpmSessionHandler);
- return pipeline;
- };
-
- InetSocketAddress listenAddress = new InetSocketAddress(FPM_PORT);
-
- serverBootstrap = new ServerBootstrap(channelFactory);
- serverBootstrap.setOption("child.reuseAddr", true);
- serverBootstrap.setOption("child.keepAlive", true);
- serverBootstrap.setOption("child.tcpNoDelay", true);
- serverBootstrap.setPipelineFactory(pipelineFactory);
- try {
- serverChannel = serverBootstrap.bind(listenAddress);
- allChannels.add(serverChannel);
- } catch (ChannelException e) {
- log.debug("Exception binding to FPM port {}: ",
- listenAddress.getPort(), e);
- stopServer();
- }
- }
-
- private void stopServer() {
- allChannels.close().awaitUninterruptibly();
- allChannels.clear();
- if (serverBootstrap != null) {
- serverBootstrap.releaseExternalResources();
- }
-
- if (clearRoutes) {
- peers.keySet().forEach(this::clearRoutes);
- }
- }
-
- private void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) {
- if (fpmMessage.type() == FpmHeader.FPM_TYPE_KEEPALIVE) {
- return;
- }
-
- Netlink netlink = fpmMessage.netlink();
- RtNetlink rtNetlink = netlink.rtNetlink();
-
- if (log.isTraceEnabled()) {
- log.trace("Received FPM message: {}", fpmMessage);
- }
-
- if (!(rtNetlink.protocol() == RtProtocol.ZEBRA ||
- rtNetlink.protocol() == RtProtocol.UNSPEC)) {
- log.trace("Ignoring non-zebra route");
- return;
- }
-
- IpAddress dstAddress = null;
- IpAddress gateway = null;
-
- for (RouteAttribute attribute : rtNetlink.attributes()) {
- if (attribute.type() == RouteAttribute.RTA_DST) {
- RouteAttributeDst raDst = (RouteAttributeDst) attribute;
- dstAddress = raDst.dstAddress();
- } else if (attribute.type() == RouteAttribute.RTA_GATEWAY) {
- RouteAttributeGateway raGateway = (RouteAttributeGateway) attribute;
- gateway = raGateway.gateway();
- }
- }
-
- if (dstAddress == null) {
- log.error("Dst address missing!");
- return;
- }
-
- IpPrefix prefix = IpPrefix.valueOf(dstAddress, rtNetlink.dstLength());
-
- List<Route> updates = new LinkedList<>();
- List<Route> withdraws = new LinkedList<>();
-
- Route route;
- switch (netlink.type()) {
- case RTM_NEWROUTE:
- if (gateway == null) {
- // We ignore interface routes with no gateway for now.
- return;
- }
- route = new Route(Route.Source.FPM, prefix, gateway, clusterService.getLocalNode().id());
-
-
- Route oldRoute = fpmRoutes.get(peer).put(prefix, route);
-
- if (oldRoute != null) {
- log.trace("Swapping {} with {}", oldRoute, route);
- withdraws.add(oldRoute);
- }
- updates.add(route);
- break;
- case RTM_DELROUTE:
- Route existing = fpmRoutes.get(peer).remove(prefix);
- if (existing == null) {
- log.warn("Got delete for non-existent prefix");
- return;
- }
-
- route = new Route(Route.Source.FPM, prefix, existing.nextHop(), clusterService.getLocalNode().id());
-
- withdraws.add(route);
- break;
- case RTM_GETROUTE:
- default:
- break;
- }
-
- routeService.withdraw(withdraws);
- routeService.update(updates);
- }
-
-
- private void clearRoutes(FpmPeer peer) {
- log.info("Clearing all routes for peer {}", peer);
- Map<IpPrefix, Route> routes = fpmRoutes.remove(peer);
- if (routes != null) {
- routeService.withdraw(routes.values());
- }
- }
-
- private FpmPeerInfo toFpmInfo(FpmPeer peer, Collection<FpmConnectionInfo> connections) {
- return new FpmPeerInfo(connections,
- fpmRoutes.getOrDefault(peer, Collections.emptyMap()).size());
- }
-
- @Override
- public Map<FpmPeer, FpmPeerInfo> peers() {
- return peers.asJavaMap().entrySet().stream()
- .collect(Collectors.toMap(
- e -> e.getKey(),
- e -> toFpmInfo(e.getKey(), e.getValue())));
- }
-
- private class InternalFpmListener implements FpmListener {
- @Override
- public void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) {
- FpmManager.this.fpmMessage(peer, fpmMessage);
- }
-
- @Override
- public boolean peerConnected(FpmPeer peer) {
- if (peers.keySet().contains(peer)) {
- return false;
- }
-
- NodeId localNode = clusterService.getLocalNode().id();
- peers.compute(peer, (p, infos) -> {
- if (infos == null) {
- infos = new HashSet<>();
- }
-
- infos.add(new FpmConnectionInfo(localNode, peer, System.currentTimeMillis()));
- return infos;
- });
-
- fpmRoutes.computeIfAbsent(peer, p -> new ConcurrentHashMap<>());
- return true;
- }
-
- @Override
- public void peerDisconnected(FpmPeer peer) {
- log.info("FPM connection to {} went down", peer);
-
- if (clearRoutes) {
- clearRoutes(peer);
- }
-
- peers.compute(peer, (p, infos) -> {
- if (infos == null) {
- return null;
- }
-
- infos.stream()
- .filter(i -> i.connectedTo().equals(clusterService.getLocalNode().id()))
- .findAny()
- .ifPresent(i -> infos.remove(i));
-
- if (infos.isEmpty()) {
- return null;
- }
-
- return infos;
- });
- }
- }
-
-}