ONOS-5450 Initial implementation of OFAgent
- Refactored OFAgent immutable
- Added OFAgentStore and OFAgentEvent
- Implemented OFAgentManager and OFSwitchManager
- Added unit tests
Change-Id: Ie39ad2db9e6bd6259a062371b3ffe116b8c8cc52
diff --git a/apps/ofagent/BUCK b/apps/ofagent/BUCK
index eeb3aba..1c8ee1e 100644
--- a/apps/ofagent/BUCK
+++ b/apps/ofagent/BUCK
@@ -1,19 +1,30 @@
COMPILE_DEPS = [
'//lib:CORE_DEPS',
+ '//core/store/serializers:onos-core-serializers',
+ '//core/common:onos-core-common',
+ '//incubator/api:onos-incubator-api',
+ '//cli:onos-cli',
+ '//lib:org.apache.karaf.shell.console',
'//lib:netty-transport',
'//lib:netty-buffer',
'//lib:netty-codec',
'//lib:netty-handler',
- '//incubator/api:onos-incubator-api',
'//lib:openflowj-3.0',
]
+TEST_DEPS = [
+ '//lib:TEST_ADAPTERS',
+ '//core/api:onos-api-tests',
+ '//core/common:onos-core-common-tests',
+]
+
EXCLUDED_BUNDLES = [
'//lib:openflowj-3.0',
]
osgi_jar_with_tests (
deps = COMPILE_DEPS,
+ test_deps = TEST_DEPS,
)
onos_app (
diff --git a/apps/ofagent/pom.xml b/apps/ofagent/pom.xml
index 1416332..22e6996 100644
--- a/apps/ofagent/pom.xml
+++ b/apps/ofagent/pom.xml
@@ -52,33 +52,79 @@
<artifactId>org.osgi.compendium</artifactId>
</dependency>
<dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-core-serializers</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-cli</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.karaf.shell</groupId>
+ <artifactId>org.apache.karaf.shell.console</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.scr.annotations</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onlab-osgi</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onlab-misc</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-core-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-incubator-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onlab-junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-api</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-core-common</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava-testlib</artifactId>
+ <version>${guava.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.projectfloodlight</groupId>
<artifactId>openflowj</artifactId>
<version>3.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
- <artifactId>netty-transport</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-handler</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-buffer</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-codec</artifactId>
- </dependency>
- <dependency>
- <groupId>org.onosproject</groupId>
- <artifactId>onos-of-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.onosproject</groupId>
- <artifactId>onos-incubator-api</artifactId>
+ <artifactId>netty-all</artifactId>
+ <version>${netty4.version}</version>
</dependency>
</dependencies>
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgent.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgent.java
index b44f48e..0db4489 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgent.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgent.java
@@ -15,19 +15,29 @@
*/
package org.onosproject.ofagent.api;
-import io.netty.channel.nio.NioEventLoopGroup;
import org.onosproject.incubator.net.virtual.NetworkId;
-import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
/**
- * Representation of an OF agent, which brokers virtual devices and external
- * controllers by handling OpenFlow connections and messages between them.
+ * Representation of an OpenFlow agent, which holds the mapping between the virtual
+ * network and the external OpenFlow controllers.
*/
public interface OFAgent {
+ enum State {
+
+ /**
+ * Specifies that the ofagent state is started.
+ */
+ STARTED,
+
+ /**
+ * Specifies that the ofagent state is stopped.
+ */
+ STOPPED
+ }
+
/**
* Returns the identifier of the virtual network that this agent cares for.
*
@@ -43,14 +53,11 @@
Set<OFController> controllers();
/**
- * Starts the OpenFlow agent.
+ * Returns the admin state of the agent.
+ *
+ * @return state
*/
- void start();
-
- /**
- * Stops the OpenFlow agent.
- */
- void stop();
+ State state();
/**
* Builder of OF agent entities.
@@ -73,15 +80,6 @@
Builder networkId(NetworkId networkId);
/**
- * Returns OF agent builder with the supplied network services for the
- * virtual network.
- *
- * @param services network services for the virtual network
- * @return of agent builder
- */
- Builder services(Map<Class<?>, Object> services);
-
- /**
* Returns OF agent builder with the supplied controllers.
*
* @param controllers set of openflow controllers
@@ -90,19 +88,11 @@
Builder controllers(Set<OFController> controllers);
/**
- * Returns OF agent builder with the supplied event executor.
+ * Returns OF agent builder with the supplied state.
*
- * @param eventExecutor event executor
+ * @param state state of the agent
* @return of agent builder
*/
- Builder eventExecutor(ExecutorService eventExecutor);
-
- /**
- * Returns OF agent builder with the supplied IO work group.
- *
- * @param ioWorker io worker group
- * @return of agent builder
- */
- Builder ioWorker(NioEventLoopGroup ioWorker);
+ Builder state(State state);
}
}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentAdminService.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentAdminService.java
new file mode 100644
index 0000000..0c6667a
--- /dev/null
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentAdminService.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.ofagent.api;
+
+import org.onosproject.incubator.net.virtual.NetworkId;
+
+/**
+ * Service for administering the inventory of OpenFlow agents.
+ */
+public interface OFAgentAdminService {
+
+ /**
+ * Creates an OpenFlow agent for a given virtual network with given controllers.
+ *
+ * @param ofAgent the new ofagent
+ */
+ void createAgent(OFAgent ofAgent);
+
+ /**
+ * Updates the agent.
+ *
+ * @param ofAgent updated ofagent
+ */
+ void updateAgent(OFAgent ofAgent);
+
+ /**
+ * Removes the OpenFlow agent for the given virtual network.
+ *
+ * @param networkId virtual network identifier
+ */
+ void removeAgent(NetworkId networkId);
+
+ /**
+ * Starts the agent for the given network.
+ *
+ * @param networkId virtual network identifier
+ */
+ void startAgent(NetworkId networkId);
+
+ /**
+ * Stops the agent for the given network.
+ *
+ * @param networkId virtual network identifier
+ */
+ void stopAgent(NetworkId networkId);
+}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentEvent.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentEvent.java
new file mode 100644
index 0000000..7515a90
--- /dev/null
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentEvent.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.ofagent.api;
+
+import org.onosproject.event.AbstractEvent;
+
+/**
+ * Describes OFAgent event.
+ */
+public class OFAgentEvent extends AbstractEvent<OFAgentEvent.Type, OFAgent> {
+
+ private final OFController controller;
+
+ public enum Type {
+
+ /**
+ * Signifies that a new OFAgent is created.
+ */
+ OFAGENT_CREATED,
+
+ /**
+ * Signifies that the OFAgent is removed.
+ */
+ OFAGENT_REMOVED,
+
+ /**
+ * Signifies that the new external controller is added.
+ */
+ OFAGENT_CONTROLLER_ADDED,
+
+ /**
+ * Signifies that the external controller is removed.
+ */
+ OFAGENT_CONTROLLER_REMOVED,
+
+ /**
+ * Signifies that the OFAgent is started.
+ */
+ OFAGENT_STARTED,
+
+ /**
+ * Signifies that the OFAgent is stopped.
+ */
+ OFAGENT_STOPPED,
+ }
+
+ /**
+ * Creates an event of a given type for the specified ofagent and the current time.
+ *
+ * @param type ofagent event type
+ * @param ofAgent ofagent instance
+ */
+ public OFAgentEvent(OFAgentEvent.Type type, OFAgent ofAgent) {
+ super(type, ofAgent);
+ this.controller = null;
+ }
+
+ /**
+ * Creates an event of a given type for the specified ofagent and the updated controller.
+ *
+ * @param type ofagent event type
+ * @param ofAgent ofagent instance
+ * @param controller updated external controller
+ */
+ public OFAgentEvent(OFAgentEvent.Type type, OFAgent ofAgent, OFController controller) {
+ super(type, ofAgent);
+ this.controller = controller;
+ }
+
+ /**
+ * Returns the updated controller.
+ *
+ * @return updated controller; null if the event is not controller related
+ */
+ public OFController controller() {
+ return this.controller;
+ }
+}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentListener.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentListener.java
new file mode 100644
index 0000000..9cc7982
--- /dev/null
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentListener.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.ofagent.api;
+
+import org.onosproject.event.EventListener;
+
+/**
+ * Listener for OFAgent events.
+ */
+public interface OFAgentListener extends EventListener<OFAgentEvent> {
+}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentService.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentService.java
index c2b5e47..17422e1 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentService.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentService.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.ofagent.api;
+import org.onosproject.event.ListenerService;
import org.onosproject.incubator.net.virtual.NetworkId;
import java.util.Set;
@@ -22,7 +23,9 @@
/**
* Service for administering OF agents for a virtual network.
*/
-public interface OFAgentService {
+public interface OFAgentService extends ListenerService<OFAgentEvent, OFAgentListener> {
+
+ String APPLICATION_NAME = "org.onosproject.ofagent";
/**
* Returns the OpenFlow agent list.
@@ -32,39 +35,10 @@
Set<OFAgent> agents();
/**
- * Creates an OpenFlow agent for a given virtual network with given controllers.
- *
- * @param networkId id of the virtual network
- * @param controllers list of controllers
- */
- void createAgent(NetworkId networkId, OFController... controllers);
-
- /**
- * Removes the OpenFlow agent for the given virtual network.
- *
- * @param networkId virtual network identifier
- */
- void removeAgent(NetworkId networkId);
-
- /**
- * Starts the agent for the given network.
- *
- * @param networkId virtual network identifier
- */
- void startAgent(NetworkId networkId);
-
- /**
- * Stops the agent for the given network.
- *
- * @param networkId virtual network identifier
- */
- void stopAgent(NetworkId networkId);
-
- /**
- * Returns if the agent of the given network is active or not.
+ * Returns the agent for the given network.
*
* @param networkId network id
- * @return true if the agent is active
+ * @return ofagent; null if no ofagent exists for the network
*/
- boolean isActive(NetworkId networkId);
+ OFAgent agent(NetworkId networkId);
}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentStore.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentStore.java
new file mode 100644
index 0000000..aa42716
--- /dev/null
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentStore.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.ofagent.api;
+
+import org.onosproject.incubator.net.virtual.NetworkId;
+import org.onosproject.store.Store;
+
+import java.util.Set;
+
+/**
+ * Manages inventory of OpenFlow agent states; not intended for direct use.
+ */
+public interface OFAgentStore extends Store<OFAgentEvent, OFAgentStoreDelegate> {
+
+ /**
+ * Creates the new openflow agent.
+ *
+ * @param ofAgent the new ofagent
+ */
+ void createOfAgent(OFAgent ofAgent);
+
+ /**
+ * Updates the openflow agent.
+ *
+ * @param ofAgent the updated ofagent
+ */
+ void updateOfAgent(OFAgent ofAgent);
+
+ /**
+ * Removes the openflow agent for the supplied network ID.
+ *
+ * @param networkId virtual network identifier
+ * @return removed agent; null if remove failed
+ */
+ OFAgent removeOfAgent(NetworkId networkId);
+
+ /**
+ * Returns the openflow agent with the supplied network ID.
+ *
+ * @param networkId virtual network identifier
+ * @return ofagent; null if no ofagent exists for the network
+ */
+ OFAgent ofAgent(NetworkId networkId);
+
+ /**
+ * Returns all openflow agents.
+ *
+ * @return set of ofagents; empty set if no ofagents exist
+ */
+ Set<OFAgent> ofAgents();
+}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentStoreDelegate.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentStoreDelegate.java
new file mode 100644
index 0000000..16596fc
--- /dev/null
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentStoreDelegate.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.ofagent.api;
+
+import org.onosproject.store.StoreDelegate;
+
+/**
+ * OFAgent network store delegate abstraction.
+ */
+public interface OFAgentStoreDelegate extends StoreDelegate<OFAgentEvent> {
+}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitch.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitch.java
index 8ee6f64..b8cabf9 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitch.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitch.java
@@ -15,19 +15,19 @@
*/
package org.onosproject.ofagent.api;
-import org.onosproject.net.Device;
+import org.projectfloodlight.openflow.types.DatapathId;
/**
* Representation of virtual OpenFlow switch.
*/
-public interface OFSwitch extends OFSwitchService, OFControllerRoleService {
+public interface OFSwitch extends OFSwitchOperationService, OFControllerRoleService {
/**
* Returns the device information.
*
* @return virtual device
*/
- Device device();
+ DatapathId dpid();
/**
* Returns the capabilities of the switch.
@@ -35,11 +35,4 @@
* @return capabilities
*/
OFSwitchCapabilities capabilities();
-
- /**
- * Returns if the switch is connected to controllers or not.
- *
- * @return true if the switch is connected, false otherwise
- */
- boolean isConnected();
}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchOperationService.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchOperationService.java
new file mode 100644
index 0000000..6413cba
--- /dev/null
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchOperationService.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.ofagent.api;
+
+import io.netty.channel.Channel;
+import org.onosproject.net.Port;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.packet.InboundPacket;
+import org.projectfloodlight.openflow.protocol.OFMessage;
+
+/**
+ * Service for providing OpenFlow operations.
+ */
+public interface OFSwitchOperationService {
+
+ /**
+ * Processes a new port of the switch.
+ * It sends out FEATURE_REPLY message to the controllers.
+ *
+ * @param port virtual port
+ */
+ void processPortAdded(Port port);
+
+ /**
+ * Processes port link down.
+ * It sends out PORT_STATUS asynchronous message to the controllers.
+ *
+ * @param port virtual port
+ */
+ void processPortDown(Port port);
+
+ /**
+ * Processes port link down.
+ * It sends out PORT_STATUS asynchronous message to the controllers.
+ *
+ * @param port virtual port
+ */
+ void processPortUp(Port port);
+
+ /**
+ * Processes flow removed.
+ * It sends out FLOW_REMOVED asynchronous message to the controllers.
+ *
+ * @param flowRule removed flow rule
+ */
+ void processFlowRemoved(FlowRule flowRule);
+
+ /**
+ * Processes packet in.
+ * It sends out PACKET_IN asynchronous message to the controllers.
+ *
+ * @param packet inbound packet
+ */
+ void processPacketIn(InboundPacket packet);
+
+ /**
+ * Processes commands from the controllers that modify the state of the switch.
+ * Possible message types include PACKET_OUT, FLOW_MOD, GROUP_MOD,
+ * PORT_MOD, TABLE_MOD. These types of messages can be denied based on a
+ * role of the request controller.
+ *
+ * @param channel received channel
+ * @param msg command message received
+ */
+ void processControllerCommand(Channel channel, OFMessage msg);
+
+ /**
+ * Processes a stats request from the controllers.
+ * Targeted message type is MULTIPART_REQUEST with FLOW, PORT, GROUP,
+ * GROUP_DESC subtypes.
+ *
+ * @param channel received channel
+ * @param msg stats request message received
+ */
+ void processStatsRequest(Channel channel, OFMessage msg);
+
+ /**
+ * Processes a role request from the controllers.
+ * Targeted message type is ROLE_REQUEST.
+ *
+ * @param channel received channel
+ * @param msg role request message received
+ */
+ void processRoleRequest(Channel channel, OFMessage msg);
+
+ /**
+ * Processes a features request from the controllers.
+ *
+ * @param channel received channel
+ * @param msg received features request
+ */
+ void processFeaturesRequest(Channel channel, OFMessage msg);
+
+ /**
+ * Processes LLDP packets from the controller.
+ *
+ * @param channel received channel
+ * @param msg packet out message with lldp
+ */
+ void processLldp(Channel channel, OFMessage msg);
+
+ /**
+ * Sends hello to the controller.
+ *
+ * @param channel received channel
+ */
+ void sendOfHello(Channel channel);
+
+ /**
+ * Processes echo request from the controllers.
+ *
+ * @param channel received channel
+ * @param msg echo request message
+ */
+ void processEchoRequest(Channel channel, OFMessage msg);
+}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchService.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchService.java
index 4f33b51..898aede 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchService.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchService.java
@@ -15,125 +15,27 @@
*/
package org.onosproject.ofagent.api;
-import io.netty.channel.Channel;
-import org.onosproject.net.Port;
-import org.onosproject.net.flow.FlowRule;
-import org.onosproject.net.packet.InboundPacket;
-import org.projectfloodlight.openflow.protocol.OFMessage;
+import org.onosproject.incubator.net.virtual.NetworkId;
+
+import java.util.Set;
/**
- * Service providing OpenFlow switch operations.
+ * Service for providing virtual OpenFlow switch information.
*/
public interface OFSwitchService {
/**
- * Handles the switch starts.
- */
- void started();
-
- /**
- * Handles the switch stops.
- */
- void stopped();
-
- /**
- * Processes a new port of the switch.
- * It sends out FEATURE_REPLY message to the controllers.
+ * Returns all openflow switches that OF agent service manages.
*
- * @param port virtual port
+ * @return set of openflow switches; empty set if no openflow switches exist
*/
- void processPortAdded(Port port);
+ Set<OFSwitch> ofSwitches();
/**
- * Processes port link down.
- * It sends out PORT_STATUS asynchronous message to the controllers.
+ * Returns all openflow switches for the specified network.
*
- * @param port virtual port
+ * @param networkId network id
+ * @return set of openflow switches; empty set if no devices exist on the network
*/
- void processPortDown(Port port);
-
- /**
- * Processes port link down.
- * It sends out PORT_STATUS asynchronous message to the controllers.
- *
- * @param port virtual port
- */
- void processPortUp(Port port);
-
- /**
- * Processes flow removed.
- * It sends out FLOW_REMOVED asynchronous message to the controllers.
- *
- * @param flowRule removed flow rule
- */
- void processFlowRemoved(FlowRule flowRule);
-
- /**
- * Processes packet in.
- * It sends out PACKET_IN asynchronous message to the controllers.
- *
- * @param packet inbound packet
- */
- void processPacketIn(InboundPacket packet);
-
- /**
- * Processes commands from the controllers that modify the state of the switch.
- * Possible message types include PACKET_OUT, FLOW_MOD, GROUP_MOD,
- * PORT_MOD, TABLE_MOD. These types of messages can be denied based on a
- * role of the request controller.
- *
- * @param channel received channel
- * @param msg command message received
- */
- void processControllerCommand(Channel channel, OFMessage msg);
-
- /**
- * Processes a stats request from the controllers.
- * Targeted message type is MULTIPART_REQUEST with FLOW, PORT, GROUP,
- * GROUP_DESC subtypes.
- *
- * @param channel received channel
- * @param msg stats request message received
- */
- void processStatsRequest(Channel channel, OFMessage msg);
-
- /**
- * Processes a role request from the controllers.
- * Targeted message type is ROLE_REQUEST.
- *
- * @param channel received channel
- * @param msg role request message received
- */
- void processRoleRequest(Channel channel, OFMessage msg);
-
- /**
- * Processes a features request from the controllers.
- *
- * @param channel received channel
- * @param msg received features request
- */
- void processFeaturesRequest(Channel channel, OFMessage msg);
-
- /**
- * Processes LLDP packets from the controller.
- *
- * @param channel received channel
- * @param msg packet out message with lldp
- */
- void processLldp(Channel channel, OFMessage msg);
-
- /**
- * Sends hello to the controller.
- *
- * @param channel received channel
- */
- void sendOfHello(Channel channel);
-
- /**
- * Processes echo request from the controllers.
- *
- * @param channel received channel
- * @param msg echo request message
- */
- void processEchoRequest(Channel channel, OFMessage msg);
+ Set<OFSwitch> ofSwitches(NetworkId networkId);
}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFAgent.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFAgent.java
index 6a6ba06..cac1e03 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFAgent.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFAgent.java
@@ -15,111 +15,134 @@
*/
package org.onosproject.ofagent.impl;
-import io.netty.channel.nio.NioEventLoopGroup;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableSet;
import org.onosproject.incubator.net.virtual.NetworkId;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.DeviceEvent;
-import org.onosproject.net.device.DeviceListener;
-import org.onosproject.net.flow.FlowRuleEvent;
-import org.onosproject.net.flow.FlowRuleListener;
-import org.onosproject.net.packet.PacketContext;
-import org.onosproject.net.packet.PacketProcessor;
import org.onosproject.ofagent.api.OFAgent;
import org.onosproject.ofagent.api.OFController;
-import org.onosproject.ofagent.api.OFSwitch;
-import java.util.Map;
+import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
+
+import static com.google.common.base.Preconditions.checkNotNull;
/**
- * Implementation of OF agent.
+ * Implementation of OpenFlow agent.
*/
public final class DefaultOFAgent implements OFAgent {
private final NetworkId networkId;
- private final Map<Class<?>, Object> services;
private final Set<OFController> controllers;
- private final ExecutorService eventExecutor;
- private final NioEventLoopGroup ioWorker;
-
- private final ConcurrentHashMap<DeviceId, OFSwitch> switchMap = new ConcurrentHashMap<>();
- private final DeviceListener deviceListener = new InternalDeviceListener();
- private final FlowRuleListener flowRuleListener = new InternalFlowRuleListener();
- private final InternalPacketProcessor packetProcessor = new InternalPacketProcessor();
+ private final State state;
private DefaultOFAgent(NetworkId networkId,
- Map<Class<?>, Object> services,
Set<OFController> controllers,
- ExecutorService eventExecutor,
- NioEventLoopGroup ioWorker) {
+ State state) {
this.networkId = networkId;
- this.services = services;
this.controllers = controllers;
- this.eventExecutor = eventExecutor;
- this.ioWorker = ioWorker;
+ this.state = state;
}
@Override
public NetworkId networkId() {
- return null;
+ return networkId;
}
@Override
public Set<OFController> controllers() {
- return null;
+ return controllers;
}
@Override
- public void start() {
- // TODO add listeners to the services
- // TODO connect all virtual devices in this network to the controllers
+ public State state() {
+ return state;
}
@Override
- public void stop() {
- // TODO remove listeners from the services
- // TODO disconnect all active connections
+ public int hashCode() {
+ return Objects.hash(networkId);
}
- private void connect(OFSwitch ofSwitch, OFController controller) {
- // TODO connect the switch to the controller
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj instanceof DefaultOFAgent) {
+ DefaultOFAgent that = (DefaultOFAgent) obj;
+ if (Objects.equals(networkId, that.networkId)) {
+ return true;
+ }
+ }
+ return false;
}
- private void disconnect(OFSwitch ofSwitch, OFController controller) {
- // TODO disconnect the controller from the ofSwitch
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("networkId", this.networkId)
+ .add("controllers", this.controllers)
+ .add("state", this.state)
+ .toString();
}
- private class InternalFlowRuleListener implements FlowRuleListener {
+ /**
+ * Returns new builder instance.
+ *
+ * @return default ofagent builder
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Returns new builder instance from the existing agent.
+ *
+ * @param ofAgent the existing agent
+ * @return default ofagent builder
+ */
+ public static Builder builder(OFAgent ofAgent) {
+ return new Builder()
+ .networkId(ofAgent.networkId())
+ .controllers(ofAgent.controllers())
+ .state(ofAgent.state());
+ }
+
+ public static final class Builder implements OFAgent.Builder {
+
+ private NetworkId networkId;
+ private Set<OFController> controllers;
+ private State state;
+
+ private Builder() {
+ }
@Override
- public void event(FlowRuleEvent event) {
- // TODO handle flow rule event
- }
- }
+ public OFAgent build() {
+ checkNotNull(networkId, "Network ID cannot be null");
+ checkNotNull(state, "State cannot be null");
+ controllers = controllers == null ? ImmutableSet.of() : controllers;
- private class InternalDeviceListener implements DeviceListener {
+ return new DefaultOFAgent(networkId, controllers, state);
+ }
@Override
- public void event(DeviceEvent event) {
- // TODO handle device event
- // device detected: connect the device to controllers
- // device removed: disconnect and remove the switch from the map
- // device state available: connect the switch to the controllers
- // device state unavailable: disconnect the switch from the controllers
- // port added: send out features reply
- // port status change
+ public Builder networkId(NetworkId networkId) {
+ this.networkId = networkId;
+ return this;
}
- }
-
- private class InternalPacketProcessor implements PacketProcessor {
@Override
- public void process(PacketContext context) {
- // TODO handle packet-in
+ public Builder controllers(Set<OFController> controllers) {
+ this.controllers = controllers;
+ return this;
+ }
+
+ @Override
+ public Builder state(State state) {
+ this.state = state;
+ return this;
}
}
-
- // TODO implement builder
}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFController.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFController.java
index 32956c4..310c5a4 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFController.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFController.java
@@ -15,22 +15,41 @@
*/
package org.onosproject.ofagent.impl;
+import com.google.common.base.MoreObjects;
import org.onlab.packet.IpAddress;
import org.onlab.packet.TpPort;
import org.onosproject.ofagent.api.OFController;
-/**
- * Implementation of tenant openflow controller.
- */
-public class DefaultOFController implements OFController {
- private IpAddress ip;
- private TpPort port;
+import java.util.Objects;
- public DefaultOFController(IpAddress ip, TpPort port) {
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Implementation of the default OpenFlow controller.
+ */
+public final class DefaultOFController implements OFController {
+
+ private final IpAddress ip;
+ private final TpPort port;
+
+ private DefaultOFController(IpAddress ip, TpPort port) {
this.ip = ip;
this.port = port;
}
+ /**
+ * Returns new OpenFlow controller with the supplied IP and port.
+ *
+ * @param ip ip address
+ * @param port port number
+ * @return openflow controller
+ */
+ public static DefaultOFController of(IpAddress ip, TpPort port) {
+ checkNotNull(ip, "Controller IP address cannot be null");
+ checkNotNull(port, "Controller port address cannot be null");
+ return new DefaultOFController(ip, port);
+ }
+
@Override
public IpAddress ip() {
return ip;
@@ -40,4 +59,33 @@
public TpPort port() {
return port;
}
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(ip, port);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj instanceof DefaultOFController) {
+ DefaultOFController that = (DefaultOFController) obj;
+ if (Objects.equals(ip, that.ip) &&
+ Objects.equals(port, that.port)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("ip", this.ip)
+ .add("port", this.port)
+ .toString();
+ }
}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFSwitch.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFSwitch.java
index 4258321..878fae0 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFSwitch.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFSwitch.java
@@ -15,10 +15,8 @@
*/
package org.onosproject.ofagent.impl;
-import com.google.common.collect.Lists;
+import com.google.common.collect.ImmutableSet;
import io.netty.channel.Channel;
-import org.onosproject.net.Device;
-import org.onosproject.net.DeviceId;
import org.onosproject.net.Port;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.packet.InboundPacket;
@@ -35,7 +33,7 @@
import org.projectfloodlight.openflow.protocol.OFVersion;
import org.projectfloodlight.openflow.types.DatapathId;
-import java.util.List;
+import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -44,7 +42,7 @@
import static org.projectfloodlight.openflow.protocol.OFControllerRole.*;
/**
- * Implementation of OF switch.
+ * Implementation of the default OpenFlow switch.
*/
public final class DefaultOFSwitch implements OFSwitch {
@@ -53,49 +51,34 @@
private static final long NUM_BUFFERS = 1024;
private static final short NUM_TABLES = 3;
- private final Device device;
+ private final DatapathId dpId;
private final OFSwitchCapabilities capabilities;
- private final DatapathId datapathId;
private final ConcurrentHashMap<Channel, OFControllerRole> controllerRoleMap
= new ConcurrentHashMap<>();
+ private static final OFFactory FACTORY = OFFactories.getFactory(OFVersion.OF_13);
- protected static final OFFactory FACTORY = OFFactories.getFactory(OFVersion.OF_13);
- private int handshakeTransactionIds;
+ private int handshakeTransactionIds = -1;
- public DefaultOFSwitch(Device device, OFSwitchCapabilities capabilities) {
- this.device = device;
+ private DefaultOFSwitch(DatapathId dpid, OFSwitchCapabilities capabilities) {
+ this.dpId = dpid;
this.capabilities = capabilities;
- datapathId = getDpidFromDeviceId(device.id());
- handshakeTransactionIds = -1;
-
}
- // TODO add builder
+ public static DefaultOFSwitch of(DatapathId dpid, OFSwitchCapabilities capabilities) {
+ checkNotNull(dpid, "DPID cannot be null");
+ checkNotNull(capabilities, "OF capabilities cannot be null");
+ return new DefaultOFSwitch(dpid, capabilities);
+ }
@Override
- public Device device() {
- return device;
+ public DatapathId dpid() {
+ return this.dpId;
}
@Override
public OFSwitchCapabilities capabilities() {
- return capabilities;
- }
-
- @Override
- public boolean isConnected() {
- return !controllerChannels().isEmpty();
- }
-
- @Override
- public void started() {
- // TODO do some initial setups
- }
-
- @Override
- public void stopped() {
- // TODO implement
+ return this.capabilities;
}
@Override
@@ -136,7 +119,7 @@
@Override
public Set<Channel> controllerChannels() {
- return null;
+ return ImmutableSet.copyOf(controllerRoleMap.keySet());
}
@Override
@@ -181,19 +164,14 @@
@Override
public void processFeaturesRequest(Channel channel, OFMessage msg) {
- // TODO process features request and send reply
- List<OFMessage> ofMessageList = Lists.newArrayList();
-
- OFFeaturesReply.Builder frBuilder = FACTORY.buildFeaturesReply()
- .setDatapathId(datapathId)
+ OFFeaturesReply ofFeaturesReply = FACTORY.buildFeaturesReply()
+ .setDatapathId(dpId)
.setNBuffers(NUM_BUFFERS)
.setNTables(NUM_TABLES)
.setCapabilities(capabilities.ofSwitchCapabilities())
- .setXid(msg.getXid());
-
- ofMessageList.add(frBuilder.build());
- channel.write(ofMessageList);
-
+ .setXid(msg.getXid())
+ .build();
+ channel.writeAndFlush(Collections.singletonList(ofFeaturesReply));
}
@Override
@@ -203,38 +181,18 @@
@Override
public void sendOfHello(Channel channel) {
- List<OFMessage> ofMessageList = Lists.newArrayList();
- OFHello.Builder ofHello = FACTORY.buildHello()
- .setXid(this.handshakeTransactionIds--);
-
- ofMessageList.add(ofHello.build());
- channel.write(ofMessageList);
+ OFHello ofHello = FACTORY.buildHello()
+ .setXid(this.handshakeTransactionIds--)
+ .build();
+ channel.writeAndFlush(Collections.singletonList(ofHello));
}
@Override
public void processEchoRequest(Channel channel, OFMessage msg) {
- List<OFMessage> ofMessageList = Lists.newArrayList();
- OFEchoReply.Builder echoBuilder = FACTORY.buildEchoReply()
+ OFEchoReply ofEchoReply = FACTORY.buildEchoReply()
.setXid(msg.getXid())
- .setData(((OFEchoRequest) msg).getData());
-
- ofMessageList.add(echoBuilder.build());
- channel.write(ofMessageList);
- }
-
- private DatapathId getDpidFromDeviceId(DeviceId deviceId) {
- String deviceIdToString = deviceId.toString().split(":")[1];
-
- assert (deviceIdToString.length() == 16);
-
- String resultedHexString = new String();
- for (int i = 0; i < 8; i++) {
- resultedHexString = resultedHexString + deviceIdToString.charAt(2 * i)
- + deviceIdToString.charAt(2 * i + 1);
- if (i != 7) {
- resultedHexString += ":";
- }
- }
- return DatapathId.of(resultedHexString);
+ .setData(((OFEchoRequest) msg).getData())
+ .build();
+ channel.writeAndFlush(Collections.singletonList(ofEchoReply));
}
}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DistributedOFAgentStore.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DistributedOFAgentStore.java
new file mode 100644
index 0000000..9f1f88f
--- /dev/null
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DistributedOFAgentStore.java
@@ -0,0 +1,208 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.ofagent.impl;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.incubator.net.virtual.NetworkId;
+import org.onosproject.ofagent.api.OFAgent;
+import org.onosproject.ofagent.api.OFAgentEvent;
+import org.onosproject.ofagent.api.OFAgentEvent.Type;
+import org.onosproject.ofagent.api.OFAgentStore;
+import org.onosproject.ofagent.api.OFAgentStoreDelegate;
+import org.onosproject.ofagent.api.OFController;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.ofagent.api.OFAgent.State.STARTED;
+import static org.onosproject.ofagent.api.OFAgentEvent.Type.*;
+import static org.onosproject.ofagent.api.OFAgentService.APPLICATION_NAME;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Implementation of the {@link OFAgentStore} with consistent map.
+ */
+@Service
+@Component(immediate = true)
+public class DistributedOFAgentStore extends AbstractStore<OFAgentEvent, OFAgentStoreDelegate>
+ implements OFAgentStore {
+
+ private final Logger log = getLogger(getClass());
+
+ private static final String ERR_NOT_FOUND = " does not exist";
+ private static final String ERR_DUPLICATE = " already exists";
+
+ private static final KryoNamespace SERIALIZER_OFAGENT = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(OFAgent.class)
+ .register(OFAgent.State.class)
+ .register(NetworkId.class)
+ .register(DefaultOFAgent.class)
+ .register(OFController.class)
+ .register(DefaultOFController.class)
+ .build();
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ private final ExecutorService eventExecutor = newSingleThreadExecutor(
+ groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+ private final MapEventListener<NetworkId, OFAgent> ofAgentMapListener = new OFAgentMapListener();
+
+ private ConsistentMap<NetworkId, OFAgent> ofAgentStore;
+
+ @Activate
+ protected void activate() {
+ ApplicationId appId = coreService.registerApplication(APPLICATION_NAME);
+ ofAgentStore = storageService.<NetworkId, OFAgent>consistentMapBuilder()
+ .withSerializer(Serializer.using(SERIALIZER_OFAGENT))
+ .withName("ofagentstore")
+ .withApplicationId(appId)
+ .build();
+ ofAgentStore.addListener(ofAgentMapListener);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ ofAgentStore.removeListener(ofAgentMapListener);
+ eventExecutor.shutdown();
+
+ log.info("Stopped");
+ }
+
+ @Override
+ public void createOfAgent(OFAgent ofAgent) {
+ ofAgentStore.compute(ofAgent.networkId(), (id, existing) -> {
+ final String error = ofAgent.networkId() + ERR_DUPLICATE;
+ checkArgument(existing == null, error);
+ return ofAgent;
+ });
+ }
+
+ @Override
+ public void updateOfAgent(OFAgent ofAgent) {
+ ofAgentStore.compute(ofAgent.networkId(), (id, existing) -> {
+ final String error = ofAgent.networkId() + ERR_NOT_FOUND;
+ checkArgument(existing != null, error);
+ return ofAgent;
+ });
+ }
+
+ @Override
+ public OFAgent removeOfAgent(NetworkId networkId) {
+ Versioned<OFAgent> ofAgent = ofAgentStore.remove(networkId);
+ return ofAgent == null ? null : ofAgent.value();
+ }
+
+ @Override
+ public OFAgent ofAgent(NetworkId networkId) {
+ Versioned<OFAgent> ofAgent = ofAgentStore.get(networkId);
+ return ofAgent == null ? null : ofAgent.value();
+ }
+
+ @Override
+ public Set<OFAgent> ofAgents() {
+ Set<OFAgent> ofAgents = ofAgentStore.values().stream()
+ .map(Versioned::value)
+ .collect(Collectors.toSet());
+ return ImmutableSet.copyOf(ofAgents);
+ }
+
+ private class OFAgentMapListener implements MapEventListener<NetworkId, OFAgent> {
+
+ @Override
+ public void event(MapEvent<NetworkId, OFAgent> event) {
+ switch (event.type()) {
+ case INSERT:
+ eventExecutor.execute(() -> {
+ log.debug("OFAgent for network {} created", event.key());
+ notifyDelegate(new OFAgentEvent(
+ Type.OFAGENT_CREATED,
+ event.newValue().value()));
+ });
+ break;
+ case UPDATE:
+ eventExecutor.execute(() -> {
+ log.debug("OFAgent for network {} updated", event.key());
+ processUpdated(event.oldValue().value(), event.newValue().value());
+ });
+ break;
+ case REMOVE:
+ eventExecutor.execute(() -> {
+ log.debug("OFAgent for network {} removed", event.key());
+ notifyDelegate(new OFAgentEvent(
+ Type.OFAGENT_REMOVED,
+ event.oldValue().value()));
+ });
+ break;
+ default:
+ break;
+ }
+ }
+
+ private void processUpdated(OFAgent oldValue, OFAgent newValue) {
+ if (!oldValue.controllers().equals(newValue.controllers())) {
+ oldValue.controllers().stream()
+ .filter(controller -> !newValue.controllers().contains(controller))
+ .forEach(controller -> notifyDelegate(new OFAgentEvent(
+ OFAGENT_CONTROLLER_REMOVED,
+ newValue,
+ controller)
+ ));
+
+ newValue.controllers().stream()
+ .filter(controller -> !oldValue.controllers().contains(controller))
+ .forEach(controller -> notifyDelegate(new OFAgentEvent(
+ OFAGENT_CONTROLLER_ADDED,
+ newValue,
+ controller
+ )));
+ }
+
+ if (oldValue.state() != newValue.state()) {
+ Type eventType = newValue.state() == STARTED ? OFAGENT_STARTED : OFAGENT_STOPPED;
+ notifyDelegate(new OFAgentEvent(eventType, newValue));
+ }
+ }
+ }
+}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFAgentManager.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFAgentManager.java
index 3005f73..ce7c30f 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFAgentManager.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFAgentManager.java
@@ -15,99 +15,258 @@
*/
package org.onosproject.ofagent.impl;
-import io.netty.channel.nio.NioEventLoopGroup;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipEvent;
+import org.onosproject.cluster.LeadershipEventListener;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.event.ListenerRegistry;
import org.onosproject.incubator.net.virtual.NetworkId;
import org.onosproject.incubator.net.virtual.VirtualNetworkEvent;
import org.onosproject.incubator.net.virtual.VirtualNetworkListener;
+import org.onosproject.incubator.net.virtual.VirtualNetworkService;
import org.onosproject.ofagent.api.OFAgent;
+import org.onosproject.ofagent.api.OFAgentAdminService;
+import org.onosproject.ofagent.api.OFAgentEvent;
+import org.onosproject.ofagent.api.OFAgentListener;
import org.onosproject.ofagent.api.OFAgentService;
-import org.onosproject.ofagent.api.OFController;
+import org.onosproject.ofagent.api.OFAgentStore;
+import org.onosproject.ofagent.api.OFAgentStoreDelegate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
-import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onlab.util.BoundedThreadPool.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.ofagent.api.OFAgent.State.STARTED;
+import static org.onosproject.ofagent.api.OFAgent.State.STOPPED;
/**
* Implementation of OpenFlow agent service.
*/
@Component(immediate = true)
@Service
-public class OFAgentManager implements OFAgentService {
+public class OFAgentManager extends ListenerRegistry<OFAgentEvent, OFAgentListener>
+ implements OFAgentService, OFAgentAdminService {
private final Logger log = LoggerFactory.getLogger(getClass());
- // TODO make it to be configurable with component config
- private static final int NUM_OF_THREADS = 1;
- private final ExecutorService eventExecutor = newFixedThreadPool(
- NUM_OF_THREADS,
- groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+ private static final String MSG_OFAGENT = "OFAgent for network %s %s";
+ private static final String MSG_CREATED = "created";
+ private static final String MSG_UPDATED = "updated";
+ private static final String MSG_REMOVED = "removed";
+ private static final String MSG_IN_STARTED = "is already in active state, do nothing";
+ private static final String MSG_IN_STOPPED = "is already in inactive state, do nothing";
- // TODO change it to ConsistentMap and support multi-instance mode
- private ConcurrentHashMap<NetworkId, OFAgent> agentMap = new ConcurrentHashMap<>();
- private NioEventLoopGroup ioWorker;
+ private static final String ERR_NULL_OFAGENT = "OFAgent cannot be null";
+ private static final String ERR_NULL_NETID = "Network ID cannot be null";
+ private static final String ERR_NOT_EXIST = "does not exist";
+ private static final String ERR_IN_USE = "is in start state, stop the agent first";
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected VirtualNetworkService virtualNetService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected OFAgentStore ofAgentStore;
+
+ private final ExecutorService eventExecutor = newSingleThreadExecutor(
+ groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+ private final LeadershipEventListener leadershipListener = new InternalLeadershipListener();
+ private final VirtualNetworkListener virtualNetListener = new InternalVirtualNetworkListener();
+ private final OFAgentStoreDelegate delegate = new InternalOFAgentStoreDelegate();
+
+ private ApplicationId appId;
+ private NodeId localId;
@Activate
protected void activate() {
- // TODO listen to the virtual network event
- ioWorker = new NioEventLoopGroup();
+ appId = coreService.registerApplication(APPLICATION_NAME);
+ localId = clusterService.getLocalNode().id();
+ leadershipService.runForLeadership(appId.name());
+
+ ofAgentStore.setDelegate(delegate);
+ virtualNetService.addListener(virtualNetListener);
+ leadershipService.addListener(leadershipListener);
+
log.info("Started");
}
@Deactivate
protected void deactivate() {
- ioWorker.shutdownGracefully();
+ leadershipService.removeListener(leadershipListener);
+ virtualNetService.removeListener(virtualNetListener);
+ ofAgentStore.unsetDelegate(delegate);
+ ofAgentStore.ofAgents().forEach(ofAgent -> stopAgent(ofAgent.networkId()));
+
eventExecutor.shutdown();
+ leadershipService.withdraw(appId.name());
+
log.info("Stopped");
}
@Override
- public Set<OFAgent> agents() {
- // TODO return existing agents
- return null;
+ public void createAgent(OFAgent ofAgent) {
+ checkNotNull(ofAgent, ERR_NULL_OFAGENT);
+ if (ofAgent.state() == STARTED) {
+ log.warn(String.format(MSG_OFAGENT, ofAgent.networkId(), ERR_IN_USE));
+ return;
+ }
+ ofAgentStore.createOfAgent(ofAgent);
+ log.info(String.format(MSG_OFAGENT, ofAgent.networkId(), MSG_CREATED));
}
@Override
- public void createAgent(NetworkId networkId, OFController... controllers) {
- // TODO create OFAgent instance with the given network ID, controllers
- // TODO and device, flowRule, link, and packet service for the virtual network
- // TODO start the OFAgent only if the virtual network state is active
+ public void updateAgent(OFAgent ofAgent) {
+ checkNotNull(ofAgent, ERR_NULL_OFAGENT);
+ ofAgentStore.updateOfAgent(ofAgent);
+ log.info(String.format(MSG_OFAGENT, ofAgent.networkId(), MSG_UPDATED));
}
@Override
public void removeAgent(NetworkId networkId) {
- // TODO stop and remove the OFAgent for the network
+ checkNotNull(networkId, ERR_NULL_NETID);
+ synchronized (this) {
+ OFAgent existing = ofAgentStore.ofAgent(networkId);
+ if (existing == null) {
+ final String error = String.format(MSG_OFAGENT, networkId, ERR_NOT_EXIST);
+ throw new IllegalStateException(error);
+ }
+ if (existing.state() == STARTED) {
+ final String error = String.format(MSG_OFAGENT, networkId, ERR_IN_USE);
+ throw new IllegalStateException(error);
+ }
+ ofAgentStore.removeOfAgent(networkId);
+ log.info(String.format(MSG_OFAGENT, networkId, MSG_REMOVED));
+ }
}
@Override
public void startAgent(NetworkId networkId) {
- // TODO starts the agent for the network
+ checkNotNull(networkId, ERR_NULL_NETID);
+ synchronized (this) {
+ OFAgent existing = ofAgentStore.ofAgent(networkId);
+ if (existing == null) {
+ final String error = String.format(MSG_OFAGENT, networkId, ERR_NOT_EXIST);
+ throw new IllegalStateException(error);
+ }
+ if (existing.state() == STARTED) {
+ log.warn(String.format(MSG_OFAGENT, networkId, MSG_IN_STARTED));
+ return;
+ }
+ OFAgent updated = DefaultOFAgent.builder(existing).state(STARTED).build();
+ ofAgentStore.updateOfAgent(updated);
+ }
}
@Override
public void stopAgent(NetworkId networkId) {
- // TODO stops the agent for the network
+ checkNotNull(networkId, ERR_NULL_NETID);
+ synchronized (this) {
+ OFAgent existing = ofAgentStore.ofAgent(networkId);
+ if (existing == null) {
+ final String error = String.format(MSG_OFAGENT, networkId, ERR_NOT_EXIST);
+ throw new IllegalStateException(error);
+ }
+ if (existing.state() == STOPPED) {
+ log.warn(String.format(MSG_OFAGENT, networkId, MSG_IN_STOPPED));
+ return;
+ }
+ OFAgent updated = DefaultOFAgent.builder(existing).state(STOPPED).build();
+ ofAgentStore.updateOfAgent(updated);
+ }
}
@Override
- public boolean isActive(NetworkId networkId) {
- // TODO manage the OF agent status
- return false;
+ public Set<OFAgent> agents() {
+ return ofAgentStore.ofAgents();
+ }
+
+ @Override
+ public OFAgent agent(NetworkId networkId) {
+ checkNotNull(networkId, ERR_NULL_NETID);
+ return ofAgentStore.ofAgent(networkId);
+ }
+
+ private class InternalLeadershipListener implements LeadershipEventListener {
+
+ @Override
+ public boolean isRelevant(LeadershipEvent event) {
+ // TODO check if local node is relevant to the leadership change event
+ return false;
+ }
+
+ @Override
+ public void event(LeadershipEvent event) {
+ switch (event.type()) {
+ case LEADER_CHANGED:
+ case LEADER_AND_CANDIDATES_CHANGED:
+ // TODO handle leadership changed events -> restart agents?
+ default:
+ break;
+ }
+ }
}
private class InternalVirtualNetworkListener implements VirtualNetworkListener {
@Override
+ public boolean isRelevant(VirtualNetworkEvent event) {
+ // do not allow without leadership
+ return Objects.equals(localId, leadershipService.getLeader(appId.name()));
+ }
+
+ @Override
public void event(VirtualNetworkEvent event) {
- // TODO handle virtual network start and stop
+ switch (event.type()) {
+ case NETWORK_UPDATED:
+ // TODO handle virtual network stopped -> stop agent
+ break;
+ case NETWORK_REMOVED:
+ // TODO remove related OFAgent -> stop agent
+ break;
+ case NETWORK_ADDED:
+ case VIRTUAL_DEVICE_ADDED:
+ case VIRTUAL_DEVICE_UPDATED:
+ case VIRTUAL_DEVICE_REMOVED:
+ case VIRTUAL_PORT_ADDED:
+ case VIRTUAL_PORT_UPDATED:
+ case VIRTUAL_PORT_REMOVED:
+ default:
+ // do nothing
+ break;
+ }
+ }
+ }
+
+ private class InternalOFAgentStoreDelegate implements OFAgentStoreDelegate {
+
+ @Override
+ public void notify(OFAgentEvent event) {
+ if (event != null) {
+ log.trace("send ofagent event {}", event);
+ process(event);
+ }
}
}
}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelHandler.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelHandler.java
index 38530737..6514b3c 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelHandler.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelHandler.java
@@ -15,39 +15,35 @@
*/
package org.onosproject.ofagent.impl;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.ReadTimeoutException;
-import org.onlab.osgi.DefaultServiceDirectory;
-import org.onlab.osgi.ServiceDirectory;
-import org.onosproject.incubator.net.virtual.VirtualNetworkService;
import org.onosproject.ofagent.api.OFSwitch;
import org.projectfloodlight.openflow.protocol.OFErrorMsg;
-import org.projectfloodlight.openflow.protocol.OFFactories;
-import org.projectfloodlight.openflow.protocol.OFFactory;
import org.projectfloodlight.openflow.protocol.OFMessage;
-import org.projectfloodlight.openflow.protocol.OFVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
-import java.util.List;
import java.util.concurrent.RejectedExecutionException;
+import static org.onosproject.ofagent.impl.OFChannelHandler.ChannelState.INIT;
+
/**
* Implementation of OpenFlow channel handler.
* It processes OpenFlow message according to the channel state.
*/
public final class OFChannelHandler extends ChannelDuplexHandler {
+ private static final String MSG_CHANNEL_STATE = "Set channel(%s) state: %s";
+
private final Logger log = LoggerFactory.getLogger(getClass());
private final OFSwitch ofSwitch;
- private ChannelHandlerContext ctx;
+ private Channel channel;
private ChannelState state;
- protected static final OFFactory FACTORY = OFFactories.getFactory(OFVersion.OF_13);
- protected VirtualNetworkService vNetService;
enum ChannelState {
@@ -62,7 +58,6 @@
@Override
void processOFMessage(final OFChannelHandler handler,
final OFMessage msg) {
-
switch (msg.getType()) {
case HELLO:
handler.setState(ChannelState.WAIT_FEATURE_REQUEST);
@@ -77,17 +72,16 @@
@Override
void processOFMessage(final OFChannelHandler handler,
final OFMessage msg) {
-
switch (msg.getType()) {
case FEATURES_REQUEST:
- handler.ofSwitch.processFeaturesRequest(handler.ctx.channel(), msg);
+ handler.ofSwitch.processFeaturesRequest(handler.channel, msg);
handler.setState(ChannelState.ESTABLISHED);
break;
case ECHO_REQUEST:
- handler.ofSwitch.processEchoRequest(handler.ctx.channel(), msg);
+ handler.ofSwitch.processEchoRequest(handler.channel, msg);
break;
case ERROR:
- handler.logErrorClose(handler.ctx, (OFErrorMsg) msg);
+ handler.logErrorClose((OFErrorMsg) msg);
break;
default:
handler.illegalMessageReceived(msg);
@@ -117,10 +111,10 @@
//TODO implement
break;
case ECHO_REQUEST:
- handler.ofSwitch.processEchoRequest(handler.ctx.channel(), msg);
+ handler.ofSwitch.processEchoRequest(handler.channel, msg);
break;
case ERROR:
- handler.logErrorClose(handler.ctx, (OFErrorMsg) msg);
+ handler.logErrorClose((OFErrorMsg) msg);
break;
default:
handler.unhandledMessageReceived(msg);
@@ -128,8 +122,8 @@
}
}
};
- abstract void processOFMessage(final OFChannelHandler handler,
- final OFMessage msg);
+
+ abstract void processOFMessage(final OFChannelHandler handler, final OFMessage msg);
}
/**
@@ -140,47 +134,36 @@
public OFChannelHandler(OFSwitch ofSwitch) {
super();
this.ofSwitch = ofSwitch;
-
- setState(ChannelState.INIT);
-
- ServiceDirectory services = new DefaultServiceDirectory();
- vNetService = services.get(VirtualNetworkService.class);
+ setState(INIT);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
- this.ctx = ctx;
- log.debug("Channel Active. Send OF_13 Hello to {}", ctx.channel().remoteAddress());
-
+ this.channel = ctx.channel();
+ // FIXME move this to channel handler and add channel when OF handshake is done
+ ofSwitch.addControllerChannel(channel);
try {
- ofSwitch.sendOfHello(ctx.channel());
+ ofSwitch.sendOfHello(channel);
+ log.trace("Send OF_13 Hello to {}", channel.remoteAddress());
setState(ChannelState.WAIT_HELLO);
- } catch (Throwable cause) {
- log.error("Exception occured because of{}", cause.getMessage());
+ } catch (Exception ex) {
+ log.error("Failed sending OF_13 Hello to {} for {}", channel.remoteAddress(), ex.getMessage());
}
}
@Override
- public void channelRead(ChannelHandlerContext ctx, Object msg)
- throws Exception {
+ public void channelInactive(ChannelHandlerContext ctx) {
+ ofSwitch.deleteControllerChannel(channel);
+ log.info("Device {} disconnected from controller {}", ofSwitch.dpid(), channel.remoteAddress());
+ }
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
- if (msg instanceof List) {
- ((List) msg).forEach(ofm -> {
- state.processOFMessage(this, (OFMessage) ofm);
- });
- } else {
- state.processOFMessage(this, (OFMessage) msg);
- }
- } catch (Throwable cause) {
- log.error("Exception occured {}", cause.getMessage());
+ state.processOFMessage(this, (OFMessage) msg);
+ } catch (Exception ex) {
+ ctx.fireExceptionCaught(ex);
}
-
- }
-
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx)
- throws Exception {
}
@Override
@@ -188,41 +171,40 @@
if (cause instanceof ReadTimeoutException) {
log.error("Connection closed because of ReadTimeoutException {}", cause.getMessage());
} else if (cause instanceof ClosedChannelException) {
- log.error("ClosedChannelException occured");
+ log.error("ClosedChannelException occurred");
return;
} else if (cause instanceof RejectedExecutionException) {
log.error("Could not process message: queue full");
} else if (cause instanceof IOException) {
- log.error("IOException occured");
+ log.error("IOException occurred");
} else {
log.error("Error while processing message from switch {}", cause.getMessage());
}
- ctx.close();
+ channel.close();
}
private void setState(ChannelState state) {
this.state = state;
+ if (state != INIT) {
+ log.debug(String.format(MSG_CHANNEL_STATE, channel.remoteAddress(), state.name()));
+ }
}
- private void logErrorClose(ChannelHandlerContext ctx, OFErrorMsg errorMsg) {
+ private void logErrorClose(OFErrorMsg errorMsg) {
log.error("{} from switch {} in state {}",
errorMsg,
- ofSwitch.device().id().toString(),
+ ofSwitch.dpid(),
state);
-
- log.error("Disconnecting...");
- ctx.close();
+ channel.close();
}
private void illegalMessageReceived(OFMessage ofMessage) {
log.warn("Controller should never send this message {} in current state {}",
- ofMessage.getType().toString(),
- state);
+ ofMessage.getType(), state);
}
private void unhandledMessageReceived(OFMessage ofMessage) {
- log.warn("Unhandled message {} received in state {}. Ignored",
- ofMessage.getType().toString(),
- state);
+ log.warn("Unexpected message {} received in state {}",
+ ofMessage.getType(), state);
}
}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelInitializer.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelInitializer.java
index e93b0c9..58df71c 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelInitializer.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelInitializer.java
@@ -39,7 +39,6 @@
@Override
protected void initChannel(SocketChannel ch) throws Exception {
-
ch.pipeline().addLast(new OFMessageDecoder())
.addLast(new OFMessageEncoder())
.addLast(new ReadTimeoutHandler(READ_TIMEOUT))
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFConnectionHandler.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFConnectionHandler.java
index 7f59cf5..ed897ff 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFConnectionHandler.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFConnectionHandler.java
@@ -38,10 +38,17 @@
private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final String MSG_STATE = "Device %s %s to controller %s:%s";
+ private static final String MSG_CONNECTING = "connecting";
+ private static final String MSG_CONNECTED = "connected";
+ private static final String MSG_FAILED = "failed to connect";
+
private final AtomicInteger retryCount;
private final OFSwitch ofSwitch;
private final OFController controller;
private final EventLoopGroup workGroup;
+
+ // TODO make this value configurable
private static final int MAX_RETRY = 3;
/**
@@ -61,32 +68,40 @@
/**
* Creates a connection to the supplied controller.
- *
*/
public void connect() {
-
- SocketAddress remoteAddr = new InetSocketAddress(controller.ip().toInetAddress(), controller.port().toInt());
-
- log.debug("Connecting to controller {}:{}", controller.ip(), controller.port());
+ SocketAddress remoteAddr = new InetSocketAddress(
+ controller.ip().toInetAddress(), controller.port().toInt());
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new OFChannelInitializer(ofSwitch));
+ log.debug(String.format(MSG_STATE,
+ ofSwitch.dpid(),
+ MSG_CONNECTING,
+ controller.ip(),
+ controller.port()));
bootstrap.connect(remoteAddr).addListener(this);
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
-
if (future.isSuccess()) {
- ofSwitch.addControllerChannel(future.channel());
- log.debug("Connected to controller {}:{} for device {}",
- controller.ip(), controller.port(), ofSwitch.device().id());
+ log.info(String.format(MSG_STATE,
+ ofSwitch.dpid(),
+ MSG_CONNECTED,
+ controller.ip(),
+ controller.port()));
} else {
- log.info("Failed to connect controller {}:{}. Retry...", controller.ip(), controller.port());
- if (retryCount.getAndIncrement() < MAX_RETRY) {
+ if (retryCount.getAndIncrement() > MAX_RETRY) {
+ log.warn(String.format(MSG_STATE,
+ ofSwitch.dpid(),
+ MSG_FAILED,
+ controller.ip(),
+ controller.port()));
+ } else {
this.connect();
}
}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageDecoder.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageDecoder.java
index 7e3d1d4..0e07d41 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageDecoder.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageDecoder.java
@@ -32,22 +32,20 @@
public final class OFMessageDecoder extends ByteToMessageDecoder {
private final Logger log = LoggerFactory.getLogger(getClass());
+ private final OFMessageReader<OFMessage> reader = OFFactories.getGenericReader();
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
-
if (!ctx.channel().isActive()) {
return;
}
try {
- OFMessageReader<OFMessage> reader = OFFactories.getGenericReader();
OFMessage message = reader.readFrom(in);
out.add(message);
} catch (Throwable cause) {
- log.error("Exception occured while processing decoding because of {}", cause.getMessage());
+ log.error("Failed decode OF message for {}", cause.getMessage());
}
-
}
}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageEncoder.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageEncoder.java
index 3d9f8ee..67d0ccc 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageEncoder.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageEncoder.java
@@ -16,50 +16,24 @@
package org.onosproject.ofagent.impl;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.MessageToByteEncoder;
import org.projectfloodlight.openflow.protocol.OFMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Encodes OFMessage to a byte buffer.
*/
public final class OFMessageEncoder extends MessageToByteEncoder<Iterable<OFMessage>> {
- private final Logger log = LoggerFactory.getLogger(getClass());
@Override
protected void encode(ChannelHandlerContext ctx, Iterable<OFMessage> msgList, ByteBuf out)
throws Exception {
-
if (!ctx.channel().isActive()) {
return;
}
- if (msgList instanceof Iterable) {
- msgList.forEach(msg -> {
- try {
- ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer();
- msg.writeTo(byteBuf);
-
- ctx.writeAndFlush(byteBuf);
- } catch (Exception e) {
- log.error("error occured because of {}", e.getMessage());
- }
- });
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- if (cause instanceof EncoderException) {
- log.error("Connection closed because of EncoderException {}", cause.getMessage());
- ctx.close();
- } else {
- log.error("Exception occured while processing encoding because of {}", cause.getMessage());
- ctx.close();
+ for (OFMessage ofm : msgList) {
+ ofm.writeTo(out);
}
}
}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFSwitchManager.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFSwitchManager.java
new file mode 100644
index 0000000..f522d68
--- /dev/null
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFSwitchManager.java
@@ -0,0 +1,372 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.ofagent.impl;
+
+import com.google.common.collect.ImmutableSet;
+import io.netty.channel.ChannelOutboundInvoker;
+import io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.incubator.net.virtual.NetworkId;
+import org.onosproject.incubator.net.virtual.VirtualNetworkService;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.FlowRuleEvent;
+import org.onosproject.net.flow.FlowRuleListener;
+import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.packet.PacketContext;
+import org.onosproject.net.packet.PacketProcessor;
+import org.onosproject.net.packet.PacketService;
+import org.onosproject.ofagent.api.OFAgent;
+import org.onosproject.ofagent.api.OFAgentEvent;
+import org.onosproject.ofagent.api.OFAgentListener;
+import org.onosproject.ofagent.api.OFAgentService;
+import org.onosproject.ofagent.api.OFController;
+import org.onosproject.ofagent.api.OFSwitch;
+import org.onosproject.ofagent.api.OFSwitchCapabilities;
+import org.onosproject.ofagent.api.OFSwitchService;
+import org.projectfloodlight.openflow.types.DatapathId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.onlab.util.BoundedThreadPool.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.ofagent.api.OFAgentService.APPLICATION_NAME;
+
+/**
+ * Manages OF switches.
+ */
+@Component(immediate = true)
+@Service
+public class OFSwitchManager implements OFSwitchService {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private static final OFSwitchCapabilities DEFAULT_CAPABILITIES = DefaultOFSwitchCapabilities.builder()
+ .flowStats()
+ .tableStats()
+ .portStats()
+ .groupStats()
+ .queueStats()
+ .ipReasm()
+ .portBlocked()
+ .build();
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected VirtualNetworkService virtualNetService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected OFAgentService ofAgentService;
+
+ private final ConcurrentHashMap<DeviceId, OFSwitch> ofSwitchMap = new ConcurrentHashMap<>();
+ private final ExecutorService eventExecutor = newSingleThreadExecutor(
+ groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+ private final OFAgentListener ofAgentListener = new InternalOFAgentListener();
+ private final DeviceListener deviceListener = new InternalDeviceListener();
+ private final FlowRuleListener flowRuleListener = new InternalFlowRuleListener();
+ private final PacketProcessor packetProcessor = new InternalPacketProcessor();
+
+ private NioEventLoopGroup ioWorker;
+ private ApplicationId appId;
+ private NodeId localId;
+
+ @Activate
+ protected void activate() {
+ appId = coreService.registerApplication(APPLICATION_NAME);
+ localId = clusterService.getLocalNode().id();
+ ioWorker = new NioEventLoopGroup();
+ ofAgentService.addListener(ofAgentListener);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ ofAgentService.removeListener(ofAgentListener);
+ ofAgentService.agents().forEach(this::processOFAgentStopped);
+
+ ioWorker.shutdownGracefully();
+ eventExecutor.shutdown();
+
+ log.info("Stopped");
+ }
+
+ @Override
+ public Set<OFSwitch> ofSwitches() {
+ return ImmutableSet.copyOf(ofSwitchMap.values());
+ }
+
+ @Override
+ public Set<OFSwitch> ofSwitches(NetworkId networkId) {
+ Set<OFSwitch> ofSwitches = devices(networkId).stream()
+ .map(ofSwitchMap::get)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+ return ImmutableSet.copyOf(ofSwitches);
+ }
+
+ private void addOFSwitch(DeviceId deviceId) {
+ OFSwitch ofSwitch = DefaultOFSwitch.of(
+ dpidWithDeviceId(deviceId),
+ DEFAULT_CAPABILITIES);
+ ofSwitchMap.put(deviceId, ofSwitch);
+ log.debug("Added virtual OF switch for {}", deviceId);
+ // TODO connect controllers if the agent is in started state
+ }
+
+ private void deleteOFSwitch(DeviceId deviceId) {
+ // TODO disconnect switch if it has active connection
+ OFSwitch ofSwitch = ofSwitchMap.remove(deviceId);
+ if (ofSwitch != null) {
+ log.debug("Removed virtual OFSwitch for {}", deviceId);
+ }
+ }
+
+ private void connectController(OFSwitch ofSwitch, Set<OFController> controllers) {
+ controllers.forEach(controller -> {
+ OFConnectionHandler connectionHandler = new OFConnectionHandler(
+ ofSwitch,
+ controller,
+ ioWorker);
+ connectionHandler.connect();
+ });
+ log.debug("Connection requested for {}, controller:{}", ofSwitch.dpid(), controllers);
+ }
+
+ private void disconnectController(OFSwitch ofSwitch, Set<OFController> controllers) {
+ Set<SocketAddress> controllerAddrs = controllers.stream()
+ .map(ctrl -> new InetSocketAddress(ctrl.ip().toInetAddress(), ctrl.port().toInt()))
+ .collect(Collectors.toSet());
+
+ ofSwitch.controllerChannels().stream()
+ .filter(channel -> controllerAddrs.contains(channel.remoteAddress()))
+ .forEach(ChannelOutboundInvoker::disconnect);
+ log.debug("Disconnection requested for {}, controller:{}", ofSwitch.dpid(), controllers);
+ }
+
+ private Set<DeviceId> devices(NetworkId networkId) {
+ DeviceService deviceService = virtualNetService.get(
+ networkId,
+ DeviceService.class);
+ Set<DeviceId> deviceIds = Tools.stream(deviceService.getAvailableDevices())
+ .map(Device::id)
+ .collect(Collectors.toSet());
+ return ImmutableSet.copyOf(deviceIds);
+ }
+
+ private DatapathId dpidWithDeviceId(DeviceId deviceId) {
+ String strDeviceId = deviceId.toString().split(":")[1];
+ checkArgument(strDeviceId.length() == 16, "Invalid device ID " + strDeviceId);
+
+ String resultedHexString = "";
+ for (int i = 0; i < 8; i++) {
+ resultedHexString = resultedHexString + strDeviceId.charAt(2 * i)
+ + strDeviceId.charAt(2 * i + 1);
+ if (i != 7) {
+ resultedHexString += ":";
+ }
+ }
+ return DatapathId.of(resultedHexString);
+ }
+
+ private void processOFAgentCreated(OFAgent ofAgent) {
+ devices(ofAgent.networkId()).forEach(this::addOFSwitch);
+ DeviceService deviceService = virtualNetService.get(
+ ofAgent.networkId(),
+ DeviceService.class);
+ deviceService.addListener(deviceListener);
+ }
+
+ private void processOFAgentRemoved(OFAgent ofAgent) {
+ devices(ofAgent.networkId()).forEach(this::deleteOFSwitch);
+ DeviceService deviceService = virtualNetService.get(
+ ofAgent.networkId(),
+ DeviceService.class);
+ deviceService.removeListener(deviceListener);
+ }
+
+ private void processOFAgentStarted(OFAgent ofAgent) {
+ devices(ofAgent.networkId()).forEach(deviceId -> {
+ OFSwitch ofSwitch = ofSwitchMap.get(deviceId);
+ if (ofSwitch != null) {
+ connectController(ofSwitch, ofAgent.controllers());
+ }
+ });
+
+ PacketService packetService = virtualNetService.get(
+ ofAgent.networkId(),
+ PacketService.class);
+ packetService.addProcessor(packetProcessor, PacketProcessor.director(0));
+
+ FlowRuleService flowRuleService = virtualNetService.get(
+ ofAgent.networkId(),
+ FlowRuleService.class);
+ flowRuleService.addListener(flowRuleListener);
+ }
+
+ private void processOFAgentStopped(OFAgent ofAgent) {
+ devices(ofAgent.networkId()).forEach(deviceId -> {
+ OFSwitch ofSwitch = ofSwitchMap.get(deviceId);
+ if (ofSwitch != null) {
+ disconnectController(ofSwitch, ofAgent.controllers());
+ }
+ });
+
+ PacketService packetService = virtualNetService.get(
+ ofAgent.networkId(),
+ PacketService.class);
+ packetService.removeProcessor(packetProcessor);
+
+ FlowRuleService flowRuleService = virtualNetService.get(
+ ofAgent.networkId(),
+ FlowRuleService.class);
+ flowRuleService.removeListener(flowRuleListener);
+ }
+
+ private class InternalOFAgentListener implements OFAgentListener {
+
+ @Override
+ public boolean isRelevant(OFAgentEvent event) {
+ return Objects.equals(localId, leadershipService.getLeader(appId.name()));
+ }
+
+ @Override
+ public void event(OFAgentEvent event) {
+ switch (event.type()) {
+ case OFAGENT_CREATED:
+ eventExecutor.execute(() -> {
+ OFAgent ofAgent = event.subject();
+ log.debug("Processing OFAgent created: {}", ofAgent);
+ processOFAgentCreated(ofAgent);
+ });
+ break;
+ case OFAGENT_REMOVED:
+ eventExecutor.execute(() -> {
+ OFAgent ofAgent = event.subject();
+ log.debug("Processing OFAgent removed: {}", ofAgent);
+ processOFAgentRemoved(ofAgent);
+ });
+ break;
+ case OFAGENT_CONTROLLER_ADDED:
+ // TODO handle additional controller
+ break;
+ case OFAGENT_CONTROLLER_REMOVED:
+ // TODO handle removed controller
+ break;
+ case OFAGENT_STARTED:
+ eventExecutor.execute(() -> {
+ OFAgent ofAgent = event.subject();
+ log.debug("Processing OFAgent started: {}", ofAgent);
+ processOFAgentStarted(ofAgent);
+ });
+ break;
+ case OFAGENT_STOPPED:
+ eventExecutor.execute(() -> {
+ OFAgent ofAgent = event.subject();
+ log.debug("Processing OFAgent stopped: {}", ofAgent);
+ processOFAgentStopped(ofAgent);
+ });
+ break;
+ default:
+ // do nothing
+ break;
+ }
+ }
+ }
+
+ private class InternalDeviceListener implements DeviceListener {
+
+ @Override
+ public void event(DeviceEvent event) {
+ switch (event.type()) {
+ case DEVICE_ADDED:
+ eventExecutor.execute(() -> {
+ Device device = event.subject();
+ log.debug("Processing device added: {}", device);
+ addOFSwitch(device.id());
+ });
+ break;
+ case DEVICE_REMOVED:
+ eventExecutor.execute(() -> {
+ Device device = event.subject();
+ log.debug("Processing device added: {}", device);
+ deleteOFSwitch(device.id());
+ });
+ break;
+ case DEVICE_AVAILABILITY_CHANGED:
+ // TODO handle event
+ break;
+ case DEVICE_UPDATED:
+ case DEVICE_SUSPENDED:
+ case PORT_ADDED:
+ // TODO handle event
+ case PORT_REMOVED:
+ // TODO handle event
+ case PORT_STATS_UPDATED:
+ case PORT_UPDATED:
+ default:
+ break;
+ }
+ }
+ }
+
+ private class InternalPacketProcessor implements PacketProcessor {
+
+ @Override
+ public void process(PacketContext context) {
+ // TODO handle packet-in
+ }
+ }
+
+ private class InternalFlowRuleListener implements FlowRuleListener {
+
+ @Override
+ public void event(FlowRuleEvent event) {
+ // TODO handle flow rule event
+ }
+ }
+}
diff --git a/apps/ofagent/src/test/java/org/onosproject/ofagent/impl/DefaultOFAgentTest.java b/apps/ofagent/src/test/java/org/onosproject/ofagent/impl/DefaultOFAgentTest.java
new file mode 100644
index 0000000..52f7f53
--- /dev/null
+++ b/apps/ofagent/src/test/java/org/onosproject/ofagent/impl/DefaultOFAgentTest.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.ofagent.impl;
+
+import com.google.common.collect.Sets;
+import com.google.common.testing.EqualsTester;
+import org.junit.Test;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.TpPort;
+import org.onosproject.incubator.net.virtual.NetworkId;
+import org.onosproject.ofagent.api.OFAgent;
+import org.onosproject.ofagent.api.OFController;
+
+import java.util.Set;
+
+import static org.onlab.junit.ImmutableClassChecker.assertThatClassIsImmutable;
+import static org.onosproject.ofagent.api.OFAgent.State.STARTED;
+import static org.onosproject.ofagent.api.OFAgent.State.STOPPED;
+
+/**
+ * Unit test of DefaultOFAgent model entity.
+ */
+public class DefaultOFAgentTest {
+
+ private static final Set<OFController> CONTROLLER_1 = Sets.newHashSet(
+ DefaultOFController.of(
+ IpAddress.valueOf("192.168.0.3"),
+ TpPort.tpPort(6653)));
+
+ private static final Set<OFController> CONTROLLER_2 = Sets.newHashSet(
+ DefaultOFController.of(
+ IpAddress.valueOf("192.168.0.3"),
+ TpPort.tpPort(6653)),
+ DefaultOFController.of(
+ IpAddress.valueOf("192.168.0.4"),
+ TpPort.tpPort(6653)));
+
+ private static final NetworkId NETWORK_1 = NetworkId.networkId(1);
+ private static final NetworkId NETWORK_2 = NetworkId.networkId(2);
+
+ private static final OFAgent OFAGENT = DefaultOFAgent.builder()
+ .networkId(NETWORK_1)
+ .controllers(CONTROLLER_1)
+ .state(STOPPED)
+ .build();
+
+ private static final OFAgent SAME_AS_OFAGENT_1 = DefaultOFAgent.builder()
+ .networkId(NETWORK_1)
+ .controllers(CONTROLLER_2)
+ .state(STOPPED)
+ .build();
+
+ private static final OFAgent SAME_AS_OFAGENT_2 = DefaultOFAgent.builder()
+ .networkId(NETWORK_1)
+ .controllers(CONTROLLER_1)
+ .state(STARTED)
+ .build();
+
+ private static final OFAgent ANOTHER_OFAGENT = DefaultOFAgent.builder()
+ .networkId(NETWORK_2)
+ .controllers(CONTROLLER_1)
+ .state(STOPPED)
+ .build();
+
+ @Test
+ public void testImmutability() {
+ assertThatClassIsImmutable(DefaultOFAgent.class);
+ }
+
+ @Test
+ public void testEquality() {
+ new EqualsTester()
+ .addEqualityGroup(OFAGENT, SAME_AS_OFAGENT_1, SAME_AS_OFAGENT_2)
+ .addEqualityGroup(ANOTHER_OFAGENT)
+ .testEquals();
+ }
+}
diff --git a/apps/ofagent/src/test/java/org/onosproject/ofagent/impl/OFAgentManagerTest.java b/apps/ofagent/src/test/java/org/onosproject/ofagent/impl/OFAgentManagerTest.java
new file mode 100644
index 0000000..52b06f4
--- /dev/null
+++ b/apps/ofagent/src/test/java/org/onosproject/ofagent/impl/OFAgentManagerTest.java
@@ -0,0 +1,266 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.ofagent.impl;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.junit.TestTools;
+import org.onlab.junit.TestUtils;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.TpPort;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.DefaultControllerNode;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.core.DefaultApplicationId;
+import org.onosproject.event.Event;
+import org.onosproject.incubator.net.virtual.NetworkId;
+import org.onosproject.incubator.net.virtual.VirtualNetworkService;
+import org.onosproject.ofagent.api.OFAgent;
+import org.onosproject.ofagent.api.OFAgentEvent;
+import org.onosproject.ofagent.api.OFAgentListener;
+import org.onosproject.ofagent.api.OFController;
+import org.onosproject.store.service.TestStorageService;
+
+import java.util.List;
+import java.util.Set;
+
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.onosproject.ofagent.api.OFAgent.State.STARTED;
+import static org.onosproject.ofagent.api.OFAgent.State.STOPPED;
+import static org.onosproject.ofagent.api.OFAgentEvent.Type.*;
+
+/**
+ * Junit tests for OFAgent target.
+ */
+public class OFAgentManagerTest {
+
+ private static final ApplicationId APP_ID = new DefaultApplicationId(1, "test");
+ private static final ControllerNode LOCAL_NODE =
+ new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf("127.0.0.1"));
+
+ private static final Set<OFController> CONTROLLER_1 = Sets.newHashSet(
+ DefaultOFController.of(
+ IpAddress.valueOf("192.168.0.3"),
+ TpPort.tpPort(6653)));
+
+ private static final Set<OFController> CONTROLLER_2 = Sets.newHashSet(
+ DefaultOFController.of(
+ IpAddress.valueOf("192.168.0.3"),
+ TpPort.tpPort(6653)),
+ DefaultOFController.of(
+ IpAddress.valueOf("192.168.0.4"),
+ TpPort.tpPort(6653)));
+
+ private static final NetworkId NETWORK_1 = NetworkId.networkId(1);
+ private static final NetworkId NETWORK_2 = NetworkId.networkId(2);
+
+ private static final OFAgent OFAGENT_1 = DefaultOFAgent.builder()
+ .networkId(NETWORK_1)
+ .state(STOPPED)
+ .build();
+
+ private static final OFAgent OFAGENT_1_CTRL_1 = DefaultOFAgent.builder()
+ .networkId(NETWORK_1)
+ .controllers(CONTROLLER_1)
+ .state(STOPPED)
+ .build();
+
+ private static final OFAgent OFAGENT_1_CTRL_2 = DefaultOFAgent.builder()
+ .networkId(NETWORK_1)
+ .controllers(CONTROLLER_2)
+ .state(STOPPED)
+ .build();
+
+ private static final OFAgent OFAGENT_2 = DefaultOFAgent.builder()
+ .networkId(NETWORK_2)
+ .state(STOPPED)
+ .build();
+
+ private final TestOFAgentListener testListener = new TestOFAgentListener();
+ private final CoreService mockCoreService = createMock(CoreService.class);
+ private final LeadershipService mockLeadershipService = createMock(LeadershipService.class);
+ private final VirtualNetworkService mockVirtualNetService = createMock(VirtualNetworkService.class);
+ private final ClusterService mockClusterService = createMock(ClusterService.class);
+
+ private OFAgentManager target;
+ private DistributedOFAgentStore ofAgentStore;
+
+ @Before
+ public void setUp() throws Exception {
+ ofAgentStore = new DistributedOFAgentStore();
+ TestUtils.setField(ofAgentStore, "coreService", createMock(CoreService.class));
+ TestUtils.setField(ofAgentStore, "storageService", new TestStorageService());
+ ofAgentStore.activate();
+
+ expect(mockCoreService.registerApplication(anyObject()))
+ .andReturn(APP_ID)
+ .anyTimes();
+ replay(mockCoreService);
+
+ expect(mockClusterService.getLocalNode())
+ .andReturn(LOCAL_NODE)
+ .anyTimes();
+ replay(mockClusterService);
+
+ expect(mockLeadershipService.runForLeadership(anyObject()))
+ .andReturn(null)
+ .anyTimes();
+ mockLeadershipService.addListener(anyObject());
+ mockLeadershipService.removeListener(anyObject());
+ mockLeadershipService.withdraw(anyObject());
+ replay(mockLeadershipService);
+
+ target = new OFAgentManager();
+ target.coreService = mockCoreService;
+ target.leadershipService = mockLeadershipService;
+ target.virtualNetService = mockVirtualNetService;
+ target.clusterService = mockClusterService;
+ target.ofAgentStore = ofAgentStore;
+ target.addListener(testListener);
+ target.activate();
+ }
+
+ @After
+ public void tearDown() {
+ target.removeListener(testListener);
+ ofAgentStore.deactivate();
+ target.deactivate();
+ ofAgentStore = null;
+ target = null;
+ }
+
+ @Test
+ public void testCreateAndRemoveAgent() {
+ target.createAgent(OFAGENT_1);
+ Set<OFAgent> agents = target.agents();
+ assertEquals("OFAgent set size did not match", 1, agents.size());
+
+ target.createAgent(OFAGENT_2);
+ agents = target.agents();
+ assertEquals("OFAgent set size did not match", 2, agents.size());
+
+ target.removeAgent(NETWORK_1);
+ agents = target.agents();
+ assertEquals("OFAgent set size did not match", 1, agents.size());
+
+ target.removeAgent(NETWORK_2);
+ agents = target.agents();
+ assertEquals("OFAgent set size did not match", 0, agents.size());
+
+ validateEvents(OFAGENT_CREATED, OFAGENT_CREATED, OFAGENT_REMOVED, OFAGENT_REMOVED);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testCreateNullAgent() {
+ target.createAgent(null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testCreateDuplicateAgent() {
+ target.createAgent(OFAGENT_1);
+ target.createAgent(OFAGENT_1);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testRemoveNullAgent() {
+ target.removeAgent(null);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testRemoveNotFoundAgent() {
+ target.removeAgent(NETWORK_1);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testRemoveStartedAgent() {
+ target.createAgent(OFAGENT_1);
+ target.startAgent(NETWORK_1);
+ target.removeAgent(NETWORK_1);
+ }
+
+ @Test
+ public void testStartAndStopAgent() {
+ target.createAgent(OFAGENT_1);
+ target.startAgent(NETWORK_1);
+ OFAgent ofAgent = target.agent(NETWORK_1);
+ assertEquals("OFAgent state did not match", STARTED, ofAgent.state());
+
+ target.stopAgent(NETWORK_1);
+ ofAgent = target.agent(NETWORK_1);
+ assertEquals("OFAgent state did not match", STOPPED, ofAgent.state());
+
+ validateEvents(OFAGENT_CREATED, OFAGENT_STARTED, OFAGENT_STOPPED);
+ }
+
+ @Test
+ public void testAddController() {
+ target.createAgent(OFAGENT_1);
+ target.updateAgent(OFAGENT_1_CTRL_1);
+ OFAgent ofAgent = target.agent(NETWORK_1);
+ assertEquals("OFAgent controller did not match", CONTROLLER_1, ofAgent.controllers());
+
+ target.updateAgent(OFAGENT_1_CTRL_2);
+ ofAgent = target.agent(NETWORK_1);
+ assertEquals("OFAgent controller did not match", CONTROLLER_2, ofAgent.controllers());
+
+ validateEvents(OFAGENT_CREATED, OFAGENT_CONTROLLER_ADDED, OFAGENT_CONTROLLER_ADDED);
+ }
+
+ @Test
+ public void testRemoveController() {
+ target.createAgent(OFAGENT_1_CTRL_2);
+ target.updateAgent(OFAGENT_1_CTRL_1);
+ OFAgent ofAgent = target.agent(NETWORK_1);
+ assertEquals("OFAgent controller did not match", CONTROLLER_1, ofAgent.controllers());
+
+ target.updateAgent(OFAGENT_1);
+ ofAgent = target.agent(NETWORK_1);
+ assertTrue("OFAgent controller did not match", ofAgent.controllers().isEmpty());
+
+ validateEvents(OFAGENT_CREATED, OFAGENT_CONTROLLER_REMOVED, OFAGENT_CONTROLLER_REMOVED);
+ }
+
+ private void validateEvents(Enum... types) {
+ TestTools.assertAfter(100, () -> {
+ int i = 0;
+ assertEquals("Number of events does not match", types.length, testListener.events.size());
+ for (Event event : testListener.events) {
+ assertEquals("Incorrect event received", types[i], event.type());
+ i++;
+ }
+ testListener.events.clear();
+ });
+ }
+
+ private static class TestOFAgentListener implements OFAgentListener {
+
+ private List<OFAgentEvent> events = Lists.newArrayList();
+
+ @Override
+ public void event(OFAgentEvent event) {
+ events.add(event);
+ }
+ }
+}