Merge remote-tracking branch 'origin/master' into dev/murrelet
Change-Id: I6aa7a9bfc0c05c2e2eef1eeedb98639305c6537b
diff --git a/apps/dhcprelay/src/main/java/org/onosproject/dhcprelay/Dhcp4HandlerImpl.java b/apps/dhcprelay/src/main/java/org/onosproject/dhcprelay/Dhcp4HandlerImpl.java
index deeaa36..0df73e4 100644
--- a/apps/dhcprelay/src/main/java/org/onosproject/dhcprelay/Dhcp4HandlerImpl.java
+++ b/apps/dhcprelay/src/main/java/org/onosproject/dhcprelay/Dhcp4HandlerImpl.java
@@ -117,6 +117,7 @@
private MacAddress dhcpConnectMac = null;
private VlanId dhcpConnectVlan = null;
private Ip4Address dhcpGatewayIp = null;
+ private Ip4Address relayAgentIp = null;
@Activate
protected void activate() {
@@ -128,6 +129,12 @@
hostService.removeListener(hostListener);
this.dhcpConnectMac = null;
this.dhcpConnectVlan = null;
+
+ if (dhcpGatewayIp != null) {
+ hostService.stopMonitoringIp(dhcpGatewayIp);
+ } else if (dhcpServerIp != null) {
+ hostService.stopMonitoringIp(dhcpServerIp);
+ }
}
@Override
@@ -231,6 +238,8 @@
this.dhcpConnectVlan = host.vlan();
this.dhcpConnectMac = host.mac();
}
+
+ this.relayAgentIp = serverConfig.getRelayAgentIp4().orElse(null);
}
@Override
@@ -238,6 +247,7 @@
log.warn("Indirect config feature for DHCPv4 handler not implement yet");
}
+ @Override
public void processDhcpPacket(PacketContext context, BasePacket payload) {
checkNotNull(payload, "DHCP payload can't be null");
checkState(payload instanceof DHCP, "Payload is not a DHCP");
@@ -340,7 +350,7 @@
* @return the first interface IP; null if not exists an IP address in
* these interfaces
*/
- private Ip4Address getRelayAgentIPv4Address(Interface iface) {
+ private Ip4Address getFirstIpFromInterface(Interface iface) {
checkNotNull(iface, "Interface can't be null");
return iface.ipAddressesList().stream()
.map(InterfaceIpAddress::ipAddress)
@@ -398,9 +408,9 @@
log.warn("Can't get server interface, ignore");
return null;
}
- Ip4Address relayAgentIp = getRelayAgentIPv4Address(serverInterface);
- MacAddress relayAgentMac = serverInterface.mac();
- if (relayAgentIp == null || relayAgentMac == null) {
+ Ip4Address ipFacingServer = getFirstIpFromInterface(serverInterface);
+ MacAddress macFacingServer = serverInterface.mac();
+ if (ipFacingServer == null || macFacingServer == null) {
log.warn("No IP address for server Interface {}", serverInterface);
return null;
}
@@ -414,11 +424,11 @@
}
// get dhcp header.
Ethernet etherReply = (Ethernet) ethernetPacket.clone();
- etherReply.setSourceMACAddress(relayAgentMac);
+ etherReply.setSourceMACAddress(macFacingServer);
etherReply.setDestinationMACAddress(dhcpConnectMac);
etherReply.setVlanID(dhcpConnectVlan.toShort());
IPv4 ipv4Packet = (IPv4) etherReply.getPayload();
- ipv4Packet.setSourceAddress(relayAgentIp.toInt());
+ ipv4Packet.setSourceAddress(ipFacingServer.toInt());
ipv4Packet.setDestinationAddress(dhcpServerIp.toInt());
UDP udpPacket = (UDP) ipv4Packet.getPayload();
DHCP dhcpPacket = (DHCP) udpPacket.getPayload();
@@ -459,6 +469,12 @@
dhcpPacket.setGatewayIPAddress(clientInterfaceIp.toInt());
}
+ // replace giaddr if relay agent IP is set
+ // FIXME for both direct and indirect case now, should be separated
+ if (relayAgentIp != null) {
+ dhcpPacket.setGatewayIPAddress(relayAgentIp.toInt());
+ }
+
udpPacket.setPayload(dhcpPacket);
// As a DHCP relay, the source port should be server port(67) instead
// of client port(68)
@@ -590,8 +606,8 @@
// we leave the srcMac from the original packet
// figure out the relay agent IP corresponding to the original request
- Ip4Address relayAgentIP = getRelayAgentIPv4Address(clientInterface);
- if (relayAgentIP == null) {
+ Ip4Address ipFacingClient = getFirstIpFromInterface(clientInterface);
+ if (ipFacingClient == null) {
log.warn("Cannot determine relay agent interface Ipv4 addr for host {}/{}. "
+ "Aborting relay for dhcp packet from server {}",
etherReply.getDestinationMAC(), clientInterface.vlan(),
@@ -600,7 +616,7 @@
}
// SRC_IP: relay agent IP
// DST_IP: offered IP
- ipv4Packet.setSourceAddress(relayAgentIP.toInt());
+ ipv4Packet.setSourceAddress(ipFacingClient.toInt());
ipv4Packet.setDestinationAddress(dhcpPayload.getYourIPAddress());
udpPacket.setSourcePort(UDP.DHCP_SERVER_PORT);
if (directlyConnected(dhcpPayload)) {
@@ -675,23 +691,24 @@
* @return true if the host is directly connected to the network; false otherwise
*/
private boolean directlyConnected(DHCP dhcpPayload) {
- DhcpOption relayAgentOption = dhcpPayload.getOption(OptionCode_CircuitID);
+ DhcpRelayAgentOption relayAgentOption =
+ (DhcpRelayAgentOption) dhcpPayload.getOption(OptionCode_CircuitID);
// Doesn't contains relay option
if (relayAgentOption == null) {
return true;
}
- IpAddress gatewayIp = IpAddress.valueOf(dhcpPayload.getGatewayIPAddress());
- Set<Interface> gatewayInterfaces = interfaceService.getInterfacesByIp(gatewayIp);
+ // check circuit id, if circuit id is invalid, we say it is an indirect host
+ DhcpOption circuitIdOpt = relayAgentOption.getSubOption(CIRCUIT_ID.getValue());
- // Contains relay option, and added by ONOS
- if (!gatewayInterfaces.isEmpty()) {
+ try {
+ CircuitId.deserialize(circuitIdOpt.getData());
return true;
+ } catch (Exception e) {
+ // invalid circuit id
+ return false;
}
-
- // Relay option added by other relay agent
- return false;
}
diff --git a/apps/dhcprelay/src/main/java/org/onosproject/dhcprelay/DhcpRelayManager.java b/apps/dhcprelay/src/main/java/org/onosproject/dhcprelay/DhcpRelayManager.java
index ba19fa7..e5c9973 100644
--- a/apps/dhcprelay/src/main/java/org/onosproject/dhcprelay/DhcpRelayManager.java
+++ b/apps/dhcprelay/src/main/java/org/onosproject/dhcprelay/DhcpRelayManager.java
@@ -181,9 +181,6 @@
packetService.removeProcessor(dhcpRelayPacketProcessor);
cancelDhcpPackets();
cancelArpPackets();
- v4Handler.getDhcpGatewayIp().ifPresent(hostService::stopMonitoringIp);
- v4Handler.getDhcpServerIp().ifPresent(hostService::stopMonitoringIp);
- // TODO: DHCPv6 Handler
compCfgService.unregisterProperties(getClass(), false);
log.info("DHCP-RELAY Stopped");
@@ -236,15 +233,15 @@
// Ignore if config is not present
return;
}
- if (config instanceof DefaultDhcpRelayConfig) {
- DefaultDhcpRelayConfig defaultConfig = (DefaultDhcpRelayConfig) config;
- v4Handler.setDefaultDhcpServerConfigs(defaultConfig.dhcpServerConfigs());
- v6Handler.setDefaultDhcpServerConfigs(defaultConfig.dhcpServerConfigs());
- }
+
if (config instanceof IndirectDhcpRelayConfig) {
IndirectDhcpRelayConfig indirectConfig = (IndirectDhcpRelayConfig) config;
v4Handler.setIndirectDhcpServerConfigs(indirectConfig.dhcpServerConfigs());
v6Handler.setIndirectDhcpServerConfigs(indirectConfig.dhcpServerConfigs());
+ } else if (config instanceof DefaultDhcpRelayConfig) {
+ DefaultDhcpRelayConfig defaultConfig = (DefaultDhcpRelayConfig) config;
+ v4Handler.setDefaultDhcpServerConfigs(defaultConfig.dhcpServerConfigs());
+ v6Handler.setDefaultDhcpServerConfigs(defaultConfig.dhcpServerConfigs());
}
}
diff --git a/apps/dhcprelay/src/main/java/org/onosproject/dhcprelay/config/DefaultDhcpRelayConfig.java b/apps/dhcprelay/src/main/java/org/onosproject/dhcprelay/config/DefaultDhcpRelayConfig.java
index 959c01b..daf97bf 100644
--- a/apps/dhcprelay/src/main/java/org/onosproject/dhcprelay/config/DefaultDhcpRelayConfig.java
+++ b/apps/dhcprelay/src/main/java/org/onosproject/dhcprelay/config/DefaultDhcpRelayConfig.java
@@ -29,8 +29,6 @@
public class DefaultDhcpRelayConfig extends Config<ApplicationId> {
public static final String KEY = "default";
-
-
@Override
public boolean isValid() {
// check if all configs are valid
diff --git a/apps/dhcprelay/src/main/java/org/onosproject/dhcprelay/config/DhcpServerConfig.java b/apps/dhcprelay/src/main/java/org/onosproject/dhcprelay/config/DhcpServerConfig.java
index 2451a7a..a5c304b 100644
--- a/apps/dhcprelay/src/main/java/org/onosproject/dhcprelay/config/DhcpServerConfig.java
+++ b/apps/dhcprelay/src/main/java/org/onosproject/dhcprelay/config/DhcpServerConfig.java
@@ -33,12 +33,15 @@
private static final String DHCP_CONNECT_POINT = "dhcpServerConnectPoint";
private static final String DHCP_SERVER_IP = "serverIps";
private static final String DHCP_GATEWAY_IP = "gatewayIps";
+ private static final String RELAY_AGENT_IP = "relayAgentIps";
private ConnectPoint connectPoint;
private Ip4Address serverIp4Addr;
private Ip4Address gatewayIp4Addr;
+ private Ip4Address relayAgentIp4Addr;
private Ip6Address serverIp6Addr;
private Ip6Address gatewayIp6Addr;
+ private Ip6Address relayAgentIp6Addr;
protected DhcpServerConfig() {
// empty config not allowed here
@@ -68,22 +71,34 @@
}
});
- if (!config.has(DHCP_GATEWAY_IP)) {
- // gateway ip doesn't exist, ignore the gateway
- return;
+ if (config.has(DHCP_GATEWAY_IP)) {
+ ArrayNode gatewayIps = (ArrayNode) config.path(DHCP_GATEWAY_IP);
+ gatewayIps.forEach(node -> {
+ if (node.isTextual()) {
+ IpAddress ip = IpAddress.valueOf(node.asText());
+ if (ip.isIp4() && gatewayIp4Addr == null) {
+ gatewayIp4Addr = ip.getIp4Address();
+ }
+ if (ip.isIp6() && gatewayIp6Addr == null) {
+ gatewayIp6Addr = ip.getIp6Address();
+ }
+ }
+ });
}
- ArrayNode gatewayIps = (ArrayNode) config.path(DHCP_GATEWAY_IP);
- gatewayIps.forEach(node -> {
- if (node.isTextual()) {
- IpAddress ip = IpAddress.valueOf(node.asText());
- if (ip.isIp4() && gatewayIp4Addr == null) {
- gatewayIp4Addr = ip.getIp4Address();
+ if (config.has(RELAY_AGENT_IP)) {
+ ArrayNode relayAgentIps = (ArrayNode) config.path(RELAY_AGENT_IP);
+ relayAgentIps.forEach(node -> {
+ if (node.isTextual()) {
+ IpAddress ip = IpAddress.valueOf(node.asText());
+ if (ip.isIp4() && relayAgentIp4Addr == null) {
+ relayAgentIp4Addr = ip.getIp4Address();
+ }
+ if (ip.isIp6() && relayAgentIp6Addr == null) {
+ relayAgentIp6Addr = ip.getIp6Address();
+ }
}
- if (ip.isIp6() && gatewayIp6Addr == null) {
- gatewayIp6Addr = ip.getIp6Address();
- }
- }
- });
+ });
+ }
}
/**
@@ -146,4 +161,26 @@
public Optional<Ip6Address> getDhcpGatewayIp6() {
return Optional.ofNullable(gatewayIp6Addr);
}
+
+ /**
+ * Returns the optional IPv4 address for relay agent, if configured.
+ * This option is used if we want to replace the giaddr field in DHCPv4
+ * payload.
+ *
+ * @return the giaddr; empty value if not set
+ */
+ public Optional<Ip4Address> getRelayAgentIp4() {
+ return Optional.ofNullable(relayAgentIp4Addr);
+ }
+
+ /**
+ * Returns the optional IPv6 address for relay agent, if configured.
+ * This option is used if we want to replace the link-address field in DHCPv6
+ * payload.
+ *
+ * @return the giaddr; empty value if not set
+ */
+ public Optional<Ip6Address> getRelayAgentIp6() {
+ return Optional.ofNullable(relayAgentIp6Addr);
+ }
}
diff --git a/apps/dhcprelay/src/test/java/org/onosproject/dhcprelay/DhcpRelayManagerTest.java b/apps/dhcprelay/src/test/java/org/onosproject/dhcprelay/DhcpRelayManagerTest.java
index 8a47cf8..45cfa23 100644
--- a/apps/dhcprelay/src/test/java/org/onosproject/dhcprelay/DhcpRelayManagerTest.java
+++ b/apps/dhcprelay/src/test/java/org/onosproject/dhcprelay/DhcpRelayManagerTest.java
@@ -20,12 +20,14 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import org.apache.commons.io.Charsets;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onlab.packet.ARP;
import org.onlab.packet.DHCP;
+import org.onlab.packet.DeserializationException;
import org.onlab.packet.Ethernet;
import org.onlab.packet.IPv4;
import org.onlab.packet.Ip4Address;
@@ -151,6 +153,9 @@
SERVER_IFACE_MAC,
SERVER_VLAN);
+ // Relay agent config
+ private static final Ip4Address RELAY_AGENT_IP = Ip4Address.valueOf("10.0.4.254");
+
// Components
private static final ApplicationId APP_ID = TestApplicationId.create(DhcpRelayManager.DHCP_RELAY_APP);
private static final DefaultDhcpRelayConfig CONFIG = new MockDefaultDhcpRelayConfig();
@@ -159,6 +164,7 @@
CLIENT2_INTERFACE,
SERVER_INTERFACE
);
+ private static final String NON_ONOS_CID = "Non-ONOS circuit ID";
private DhcpRelayManager manager;
private MockPacketService packetService;
@@ -185,8 +191,8 @@
.andReturn(APP_ID).anyTimes();
manager.hostService = createNiceMock(HostService.class);
- expect(manager.hostService.getHostsByIp(anyObject())).andReturn(ImmutableSet.of(SERVER_HOST));
- expect(manager.hostService.getHost(OUTER_RELAY_HOST_ID)).andReturn(OUTER_RELAY_HOST);
+ expect(manager.hostService.getHostsByIp(anyObject())).andReturn(ImmutableSet.of(SERVER_HOST)).anyTimes();
+ expect(manager.hostService.getHost(OUTER_RELAY_HOST_ID)).andReturn(OUTER_RELAY_HOST).anyTimes();
packetService = new MockPacketService();
manager.packetService = packetService;
@@ -298,6 +304,24 @@
}
@Test
+ public void testWithRelayAgentConfig() throws DeserializationException {
+ manager.v4Handler
+ .setDefaultDhcpServerConfigs(ImmutableList.of(new MockDhcpServerConfig(RELAY_AGENT_IP)));
+ packetService.processPacket(new TestDhcpRequestPacketContext(CLIENT2_MAC,
+ CLIENT2_VLAN,
+ CLIENT2_CP,
+ INTERFACE_IP.ipAddress().getIp4Address(),
+ true));
+ OutboundPacket outPacket = packetService.emittedPacket;
+ byte[] outData = outPacket.data().array();
+ Ethernet eth = Ethernet.deserializer().deserialize(outData, 0, outData.length);
+ IPv4 ip = (IPv4) eth.getPayload();
+ UDP udp = (UDP) ip.getPayload();
+ DHCP dhcp = (DHCP) udp.getPayload();
+ assertEquals(RELAY_AGENT_IP.toInt(), dhcp.getGatewayIPAddress());
+ }
+
+ @Test
public void testArpRequest() throws Exception {
packetService.processPacket(new TestArpRequestPacketContext(CLIENT_INTERFACE));
OutboundPacket outboundPacket = packetService.emittedPacket;
@@ -317,11 +341,27 @@
@Override
public List<DhcpServerConfig> dhcpServerConfigs() {
- return ImmutableList.of(new MockDhcpServerConfig());
+ return ImmutableList.of(new MockDhcpServerConfig(null));
}
}
private static class MockDhcpServerConfig extends DhcpServerConfig {
+ Ip4Address relayAgentIp;
+
+ /**
+ * Create mocked version DHCP server config.
+ *
+ * @param relayAgentIp the relay agent Ip config; null if we don't need it
+ */
+ public MockDhcpServerConfig(Ip4Address relayAgentIp) {
+ this.relayAgentIp = relayAgentIp;
+ }
+
+ @Override
+ public Optional<Ip4Address> getRelayAgentIp4() {
+ return Optional.ofNullable(relayAgentIp);
+ }
+
@Override
public Optional<ConnectPoint> getDhcpServerConnectPoint() {
return Optional.of(SERVER_CONNECT_POINT);
@@ -576,8 +616,8 @@
if (withNonOnosRelayInfo) {
DhcpRelayAgentOption relayOption = new DhcpRelayAgentOption();
DhcpOption circuitIdOption = new DhcpOption();
- CircuitId circuitId = new CircuitId("Custom cid", VlanId.NONE);
- byte[] cid = circuitId.serialize();
+ String circuitId = NON_ONOS_CID;
+ byte[] cid = circuitId.getBytes(Charsets.US_ASCII);
circuitIdOption.setCode(DhcpRelayAgentOption.RelayAgentInfoOptions.CIRCUIT_ID.getValue());
circuitIdOption.setLength((byte) cid.length);
circuitIdOption.setData(cid);
diff --git a/apps/dhcprelay/src/test/resources/dhcp-relay.json b/apps/dhcprelay/src/test/resources/dhcp-relay.json
index bd90b7a..dc724ee 100644
--- a/apps/dhcprelay/src/test/resources/dhcp-relay.json
+++ b/apps/dhcprelay/src/test/resources/dhcp-relay.json
@@ -5,13 +5,15 @@
{
"dhcpServerConnectPoint": "of:0000000000000002/2",
"serverIps": ["172.168.10.2", "2000::200:1"],
- "gatewayIps": ["192.168.10.254", "1000::100:1"]
+ "gatewayIps": ["192.168.10.254", "1000::100:1"],
+ "relayAgentIps": ["10.0.0.1", "1000:100::100:1"]
}
],
"indirect": [
{
"dhcpServerConnectPoint": "of:0000000000000002/3",
- "serverIps": ["172.168.10.3"]
+ "serverIps": ["172.168.10.3"],
+ "relayAgentIps": ["10.0.1.1"]
}
]
}
diff --git a/apps/p4runtime-test/src/test/java/org/onosproject/p4runtime/test/P4RuntimeTest.java b/apps/p4runtime-test/src/test/java/org/onosproject/p4runtime/test/P4RuntimeTest.java
index 73dd43f..18b6bc6 100644
--- a/apps/p4runtime-test/src/test/java/org/onosproject/p4runtime/test/P4RuntimeTest.java
+++ b/apps/p4runtime-test/src/test/java/org/onosproject/p4runtime/test/P4RuntimeTest.java
@@ -23,13 +23,10 @@
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.util.ImmutableByteSequence;
-import org.onosproject.bmv2.model.Bmv2PipelineModelParser;
-import org.onosproject.drivers.bmv2.Bmv2DefaultInterpreter;
+import org.onosproject.drivers.bmv2.Bmv2DefaultPipeconfFactory;
import org.onosproject.grpc.ctl.GrpcControllerImpl;
import org.onosproject.net.DeviceId;
-import org.onosproject.net.pi.model.DefaultPiPipeconf;
import org.onosproject.net.pi.model.PiPipeconf;
-import org.onosproject.net.pi.model.PiPipeconfId;
import org.onosproject.net.pi.model.PiPipelineInterpreter;
import org.onosproject.net.pi.runtime.PiAction;
import org.onosproject.net.pi.runtime.PiActionId;
@@ -56,7 +53,6 @@
import static org.onlab.util.ImmutableByteSequence.fit;
import static org.onlab.util.ImmutableByteSequence.ofZeros;
import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType.BMV2_JSON;
-import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType.P4_INFO_TEXT;
import static org.onosproject.net.pi.runtime.PiPacketOperation.Type.PACKET_OUT;
import static p4.P4RuntimeOuterClass.ActionProfileGroup.Type.SELECT;
import static p4.P4RuntimeOuterClass.Update.Type.INSERT;
@@ -83,13 +79,7 @@
private final URL p4InfoUrl = this.getClass().getResource("/bmv2/default.p4info");
private final URL jsonUrl = this.getClass().getResource("/bmv2/default.json");
- private final PiPipeconf bmv2DefaultPipeconf = DefaultPiPipeconf.builder()
- .withId(new PiPipeconfId("mock-p4runtime"))
- .withPipelineModel(Bmv2PipelineModelParser.parse(jsonUrl))
- .addBehaviour(PiPipelineInterpreter.class, Bmv2DefaultInterpreter.class)
- .addExtension(P4_INFO_TEXT, p4InfoUrl)
- .addExtension(BMV2_JSON, jsonUrl)
- .build();
+ private final PiPipeconf bmv2DefaultPipeconf = Bmv2DefaultPipeconfFactory.get();
private final P4RuntimeControllerImpl controller = new P4RuntimeControllerImpl();
private final GrpcControllerImpl grpcController = new GrpcControllerImpl();
private final DeviceId deviceId = DeviceId.deviceId("dummy:1");
diff --git a/apps/pi-demo/ecmp/BUCK b/apps/pi-demo/ecmp/BUCK
index ebe97de..7c6a9af 100644
--- a/apps/pi-demo/ecmp/BUCK
+++ b/apps/pi-demo/ecmp/BUCK
@@ -4,6 +4,7 @@
'//incubator/bmv2/model:onos-incubator-bmv2-model',
'//apps/pi-demo/common:onos-apps-pi-demo-common',
'//drivers/default:onos-drivers-default',
+ '//drivers/p4runtime:onos-drivers-p4runtime',
]
osgi_jar (
diff --git a/apps/pi-demo/ecmp/src/main/java/org/onosproject/pi/demo/app/ecmp/EcmpInterpreter.java b/apps/pi-demo/ecmp/src/main/java/org/onosproject/pi/demo/app/ecmp/EcmpInterpreter.java
index 94896aa..0781f52 100644
--- a/apps/pi-demo/ecmp/src/main/java/org/onosproject/pi/demo/app/ecmp/EcmpInterpreter.java
+++ b/apps/pi-demo/ecmp/src/main/java/org/onosproject/pi/demo/app/ecmp/EcmpInterpreter.java
@@ -17,83 +17,26 @@
package org.onosproject.pi.demo.app.ecmp;
import com.google.common.collect.ImmutableBiMap;
-import com.google.common.collect.ImmutableList;
-import org.onlab.packet.Ethernet;
-import org.onlab.util.ImmutableByteSequence;
-import org.onosproject.net.ConnectPoint;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.Port;
-import org.onosproject.net.PortNumber;
-import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.driver.AbstractHandlerBehaviour;
-import org.onosproject.net.flow.TrafficTreatment;
-import org.onosproject.net.flow.criteria.Criterion;
-import org.onosproject.net.flow.instructions.Instruction;
-import org.onosproject.net.flow.instructions.Instructions;
-import org.onosproject.net.packet.DefaultInboundPacket;
-import org.onosproject.net.packet.InboundPacket;
-import org.onosproject.net.packet.OutboundPacket;
-import org.onosproject.net.pi.model.PiPipelineInterpreter;
-import org.onosproject.net.pi.runtime.PiAction;
-import org.onosproject.net.pi.runtime.PiActionId;
-import org.onosproject.net.pi.runtime.PiActionParam;
-import org.onosproject.net.pi.runtime.PiActionParamId;
-import org.onosproject.net.pi.runtime.PiHeaderFieldId;
-import org.onosproject.net.pi.runtime.PiPacketMetadata;
-import org.onosproject.net.pi.runtime.PiPacketMetadataId;
-import org.onosproject.net.pi.runtime.PiPacketOperation;
+import org.onosproject.drivers.p4runtime.DefaultP4Interpreter;
import org.onosproject.net.pi.runtime.PiTableId;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
import java.util.Optional;
-import static java.util.stream.Collectors.toList;
-import static org.onosproject.net.PortNumber.CONTROLLER;
-import static org.onosproject.net.PortNumber.FLOOD;
-import static org.onosproject.net.flow.instructions.Instruction.Type.OUTPUT;
-import static org.onosproject.net.pi.runtime.PiPacketOperation.Type.PACKET_OUT;
-
/**
* Implementation of a PiPipeline interpreter for the ecmp.json configuration.
*/
-public class EcmpInterpreter extends AbstractHandlerBehaviour implements PiPipelineInterpreter {
-
- // standard_metadata for Bmv2, ig_intr_md for Tofino
- // TODO: extract header name from model (first column of table0)
- private static final String STD_METADATA_HEADER_NAME = "ig_intr_md";
+public class EcmpInterpreter extends DefaultP4Interpreter {
protected static final String ECMP_METADATA_HEADER_NAME = "ecmp_metadata_t";
protected static final String ECMP_GROUP_ACTION_NAME = "ecmp_group";
protected static final String GROUP_ID = "group_id";
protected static final String SELECTOR = "selector";
protected static final String ECMP_GROUP_TABLE = "ecmp_group_table";
- protected static final String TABLE0 = "table0";
- private static final String SEND_TO_CPU = "send_to_cpu";
- private static final String PORT = "port";
- private static final String DROP = "_drop";
- private static final String SET_EGRESS_PORT = "set_egress_port";
- private static final String EGRESS_PORT = "egress_port";
- private static final String INGRESS_PORT = "ingress_port";
- private static final int PORT_NUMBER_BIT_WIDTH = 9;
- private static final PiHeaderFieldId IN_PORT_ID = PiHeaderFieldId.of(STD_METADATA_HEADER_NAME, "ingress_port");
- private static final PiHeaderFieldId ETH_DST_ID = PiHeaderFieldId.of("ethernet", "dstAddr");
- private static final PiHeaderFieldId ETH_SRC_ID = PiHeaderFieldId.of("ethernet", "srcAddr");
- private static final PiHeaderFieldId ETH_TYPE_ID = PiHeaderFieldId.of("ethernet", "etherType");
-
- private static final ImmutableBiMap<Criterion.Type, PiHeaderFieldId> CRITERION_MAP =
- new ImmutableBiMap.Builder<Criterion.Type, PiHeaderFieldId>()
- .put(Criterion.Type.IN_PORT, IN_PORT_ID)
- .put(Criterion.Type.ETH_DST, ETH_DST_ID)
- .put(Criterion.Type.ETH_SRC, ETH_SRC_ID)
- .put(Criterion.Type.ETH_TYPE, ETH_TYPE_ID)
- .build();
-
- private static final ImmutableBiMap<Integer, PiTableId> TABLE_MAP = ImmutableBiMap.of(
- 0, PiTableId.of(TABLE0),
- 1, PiTableId.of(ECMP_GROUP_TABLE));
+ private static final ImmutableBiMap<Integer, PiTableId> TABLE_MAP = new ImmutableBiMap.Builder<Integer, PiTableId>()
+ .put(0, PiTableId.of(TABLE0))
+ .put(1, PiTableId.of(ECMP_GROUP_TABLE))
+ .build();
@Override
public Optional<Integer> mapPiTableId(PiTableId piTableId) {
@@ -101,151 +44,7 @@
}
@Override
- public PiAction mapTreatment(TrafficTreatment treatment, PiTableId piTableId)
- throws PiInterpreterException {
-
- if (treatment.allInstructions().size() == 0) {
- // No instructions means drop for us.
- return actionWithName(DROP);
- } else if (treatment.allInstructions().size() > 1) {
- // Otherwise, we understand treatments with only 1 instruction.
- throw new PiPipelineInterpreter.PiInterpreterException("Treatment has multiple instructions");
- }
-
- Instruction instruction = treatment.allInstructions().get(0);
-
- switch (instruction.type()) {
- case OUTPUT:
- Instructions.OutputInstruction outInstruction = (Instructions.OutputInstruction) instruction;
- PortNumber port = outInstruction.port();
- if (!port.isLogical()) {
- return PiAction.builder()
- .withId(PiActionId.of(SET_EGRESS_PORT))
- .withParameter(new PiActionParam(PiActionParamId.of(PORT),
- ImmutableByteSequence.copyFrom(port.toLong())))
- .build();
- } else if (port.equals(CONTROLLER)) {
- return actionWithName(SEND_TO_CPU);
- } else {
- throw new PiInterpreterException("Egress on logical port not supported: " + port);
- }
- case NOACTION:
- return actionWithName(DROP);
- default:
- throw new PiInterpreterException("Instruction type not supported: " + instruction.type().name());
- }
- }
-
- private static PiAction actionWithName(String name) {
- return PiAction.builder().withId(PiActionId.of(name)).build();
- }
-
- @Override
- public Optional<PiHeaderFieldId> mapCriterionType(Criterion.Type type) {
- return Optional.ofNullable(CRITERION_MAP.get(type));
- }
-
- @Override
- public Optional<Criterion.Type> mapPiHeaderFieldId(PiHeaderFieldId headerFieldId) {
- return Optional.ofNullable(CRITERION_MAP.inverse().get(headerFieldId));
- }
-
- @Override
public Optional<PiTableId> mapFlowRuleTableId(int flowRuleTableId) {
return Optional.ofNullable(TABLE_MAP.get(flowRuleTableId));
}
-
- @Override
- public Collection<PiPacketOperation> mapOutboundPacket(OutboundPacket packet)
- throws PiInterpreterException {
- TrafficTreatment treatment = packet.treatment();
-
- // ecmp.p4 supports only OUTPUT instructions.
- List<Instructions.OutputInstruction> outInstructions = treatment.allInstructions()
- .stream()
- .filter(i -> i.type().equals(OUTPUT))
- .map(i -> (Instructions.OutputInstruction) i)
- .collect(toList());
-
- if (treatment.allInstructions().size() != outInstructions.size()) {
- // There are other instructions that are not of type OUTPUT
- throw new PiInterpreterException("Treatment not supported: " + treatment);
- }
-
- ImmutableList.Builder<PiPacketOperation> builder = ImmutableList.builder();
- for (Instructions.OutputInstruction outInst : outInstructions) {
- if (outInst.port().isLogical() && !outInst.port().equals(FLOOD)) {
- throw new PiInterpreterException("Logical port not supported: " +
- outInst.port());
- } else if (outInst.port().equals(FLOOD)) {
- // Since ecmp.p4 does not support flood for each port of the device
- // create a packet operation to send the packet out of that specific port
- for (Port port : handler().get(DeviceService.class).getPorts(packet.sendThrough())) {
- builder.add(createPiPacketOperation(packet.data(), port.number().toLong()));
- }
- } else {
- builder.add(createPiPacketOperation(packet.data(), outInst.port().toLong()));
- }
- }
- return builder.build();
- }
-
- private PiPacketOperation createPiPacketOperation(ByteBuffer data, long portNumber) throws PiInterpreterException {
- //create the metadata
- PiPacketMetadata metadata = createPacketMetadata(portNumber);
-
- //Create the Packet operation
- return PiPacketOperation.builder()
- .withType(PACKET_OUT)
- .withData(ImmutableByteSequence.copyFrom(data))
- .withMetadatas(ImmutableList.of(metadata))
- .build();
- }
-
- private PiPacketMetadata createPacketMetadata(long portNumber) throws PiInterpreterException {
- ImmutableByteSequence portValue = ImmutableByteSequence.copyFrom(portNumber);
- //FIXME remove hardcoded bitWidth and retrieve it from pipelineModel
- try {
- portValue = ImmutableByteSequence.fit(portValue, PORT_NUMBER_BIT_WIDTH);
- } catch (ImmutableByteSequence.ByteSequenceTrimException e) {
- throw new PiInterpreterException("Port number too big:" +
- portNumber + " causes " + e.getMessage());
- }
- return PiPacketMetadata.builder()
- .withId(PiPacketMetadataId.of(EGRESS_PORT))
- .withValue(portValue)
- .build();
- }
-
- @Override
- public InboundPacket mapInboundPacket(DeviceId deviceId, PiPacketOperation packetInOperation)
- throws PiInterpreterException {
- //We are assuming that the packet is ethernet type
- Ethernet ethPkt = new Ethernet();
-
- ethPkt.deserialize(packetInOperation.data().asArray(), 0, packetInOperation.data().size());
-
- //Returns the ingress port packet metadata
- Optional<PiPacketMetadata> packetMetadata = packetInOperation.metadatas()
- .stream().filter(metadata -> metadata.id().name().equals(INGRESS_PORT))
- .findFirst();
-
- if (packetMetadata.isPresent()) {
-
- // Obtaining the ingress port as an immutable byte sequence.
- ImmutableByteSequence portByteSequence = packetMetadata.get().value();
-
- // Converting immutableByteSequence to short.
- short s = portByteSequence.asReadOnlyBuffer().getShort();
-
- ConnectPoint receivedFrom = new ConnectPoint(deviceId, PortNumber.portNumber(s));
-
- //FIXME should be optimizable with .asReadOnlyBytebuffer
- ByteBuffer rawData = ByteBuffer.wrap(packetInOperation.data().asArray());
- return new DefaultInboundPacket(receivedFrom, ethPkt, rawData);
-
- } else {
- throw new PiInterpreterException("Can't get packet metadata for" + INGRESS_PORT);
- }
- }
}
\ No newline at end of file
diff --git a/apps/route-service/api/src/main/java/org/onosproject/routeservice/ResolvedRoute.java b/apps/route-service/api/src/main/java/org/onosproject/routeservice/ResolvedRoute.java
index 828827d..1795e1e 100644
--- a/apps/route-service/api/src/main/java/org/onosproject/routeservice/ResolvedRoute.java
+++ b/apps/route-service/api/src/main/java/org/onosproject/routeservice/ResolvedRoute.java
@@ -29,6 +29,7 @@
/**
* Represents a route with the next hop MAC address resolved.
*/
+// TODO Remove location from ResolvedRoute
public class ResolvedRoute {
private final Route route;
@@ -42,7 +43,9 @@
* @param route input route
* @param nextHopMac next hop MAC address
* @param location connect point where the next hop connects to
+ * @deprecated in 1.11 ("Loon")
*/
+ @Deprecated
public ResolvedRoute(Route route, MacAddress nextHopMac, ConnectPoint location) {
this(route, nextHopMac, VlanId.NONE, location);
}
@@ -54,7 +57,9 @@
* @param nextHopMac next hop MAC address
* @param nextHopVlan next hop VLAN ID
* @param location connect point where the next hop connects to
+ * @deprecated in 1.11 ("Loon")
*/
+ @Deprecated
public ResolvedRoute(Route route, MacAddress nextHopMac, VlanId nextHopVlan,
ConnectPoint location) {
this.route = route;
@@ -63,6 +68,31 @@
this.location = location;
}
+
+ /**
+ * Creates a new resolved route.
+ *
+ * @param route input route
+ * @param nextHopMac next hop MAC address
+ */
+ public ResolvedRoute(Route route, MacAddress nextHopMac) {
+ this(route, nextHopMac, VlanId.NONE);
+ }
+
+ /**
+ * Creates a new resolved route.
+ *
+ * @param route input route
+ * @param nextHopMac next hop MAC address
+ * @param nextHopVlan next hop VLAN ID
+ */
+ public ResolvedRoute(Route route, MacAddress nextHopMac, VlanId nextHopVlan) {
+ this.route = route;
+ this.nextHopMac = nextHopMac;
+ this.nextHopVlan = nextHopVlan;
+ this.location = null;
+ }
+
/**
* Returns the original route.
*
@@ -112,7 +142,9 @@
* Returns the next hop location.
*
* @return connect point where the next hop attaches to
+ * @deprecated in 1.11 ("Loon")
*/
+ @Deprecated
public ConnectPoint location() {
return location;
}
diff --git a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteManager.java b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteManager.java
index e5db364..8914a1e 100644
--- a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteManager.java
+++ b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteManager.java
@@ -197,7 +197,7 @@
private ResolvedRoute tryResolve(Route route) {
ResolvedRoute resolvedRoute = resolve(route);
if (resolvedRoute == null) {
- resolvedRoute = new ResolvedRoute(route, null, null);
+ resolvedRoute = new ResolvedRoute(route, null, null, null);
}
return resolvedRoute;
}
@@ -238,13 +238,9 @@
hostService.startMonitoringIp(route.nextHop());
Set<Host> hosts = hostService.getHostsByIp(route.nextHop());
- Optional<Host> host = hosts.stream().findFirst();
- if (host.isPresent()) {
- return new ResolvedRoute(route, host.get().mac(), host.get().vlan(),
- host.get().location());
- } else {
- return null;
- }
+ return hosts.stream().findFirst()
+ .map(host -> new ResolvedRoute(route, host.mac(), host.vlan(), host.location()))
+ .orElse(null);
}
private ResolvedRoute decide(ResolvedRoute route1, ResolvedRoute route2) {
diff --git a/apps/segmentrouting/BUCK b/apps/segmentrouting/BUCK
index 5e7a9d9..ae72b22 100644
--- a/apps/segmentrouting/BUCK
+++ b/apps/segmentrouting/BUCK
@@ -26,6 +26,12 @@
osgi_jar_with_tests (
deps = COMPILE_DEPS,
test_deps = TEST_DEPS,
+ # TODO Uncomment here when policy/tunnel are supported
+ #web_context = '/onos/segmentrouting',
+ #api_title = 'Segment Routing',
+ #api_version = '1.0',
+ #api_description = 'REST API for Segment Routing',
+ #api_package = 'org.onosproject.segmentrouting',
)
onos_app (
diff --git a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/DefaultRoutingHandler.java b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/DefaultRoutingHandler.java
index e9517f1..50b0cd2 100644
--- a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/DefaultRoutingHandler.java
+++ b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/DefaultRoutingHandler.java
@@ -64,6 +64,7 @@
private static final int RETRY_INTERVAL_MS = 250;
private static final int RETRY_INTERVAL_SCALE = 1;
private static final long STABLITY_THRESHOLD = 10; //secs
+ private static final int UPDATE_INTERVAL = 5; //secs
private static Logger log = LoggerFactory.getLogger(DefaultRoutingHandler.class);
private SegmentRoutingManager srManager;
@@ -148,7 +149,7 @@
public boolean isRoutingStable() {
long last = (long) (lastRoutingChange.getMillis() / 1000.0);
long now = (long) (DateTime.now().getMillis() / 1000.0);
- log.debug("Routing stable since {}s", now - last);
+ log.trace("Routing stable since {}s", now - last);
return (now - last) > STABLITY_THRESHOLD;
}
@@ -364,6 +365,8 @@
return;
}
lastRoutingChange = DateTime.now();
+ executorService.schedule(new UpdateMaps(), UPDATE_INTERVAL,
+ TimeUnit.SECONDS);
statusLock.lock();
try {
@@ -402,34 +405,23 @@
// comparing all routes of existing ECMP SPG to new ECMP SPG
routeChanges = computeRouteChange();
- if (routeChanges != null) {
- // deal with linkUp of a seen-before link
- if (linkUp != null && srManager.isSeenLink(linkUp)) {
- if (!srManager.isBidirectional(linkUp)) {
- log.warn("Not a bidirectional link yet .. not "
- + "processing link {}", linkUp);
- srManager.updateSeenLink(linkUp, true);
- populationStatus = Status.ABORTED;
- return;
- }
- // link previously seen before
- // do hash-bucket changes instead of a re-route
- processHashGroupChange(routeChanges, false, null);
- // clear out routesChanges so a re-route is not attempted
- routeChanges = ImmutableSet.of();
+ // deal with linkUp of a seen-before link
+ if (linkUp != null && srManager.isSeenLink(linkUp)) {
+ if (!srManager.isBidirectional(linkUp)) {
+ log.warn("Not a bidirectional link yet .. not "
+ + "processing link {}", linkUp);
+ srManager.updateSeenLink(linkUp, true);
+ populationStatus = Status.ABORTED;
+ return;
}
-
- //deal with switchDown
- if (switchDown != null) {
- processHashGroupChange(routeChanges, true, switchDown);
- // clear out routesChanges so a re-route is not attempted
- routeChanges = ImmutableSet.of();
- }
-
- // for a linkUp of a never-seen-before link
- // let it fall through to a reroute of the routeChanges
-
+ // link previously seen before
+ // do hash-bucket changes instead of a re-route
+ processHashGroupChange(routeChanges, false, null);
+ // clear out routesChanges so a re-route is not attempted
+ routeChanges = ImmutableSet.of();
}
+ // for a linkUp of a never-seen-before link
+ // let it fall through to a reroute of the routeChanges
// now that we are past the check for a previously seen link
// it is safe to update the store for the linkUp
@@ -437,6 +429,12 @@
srManager.updateSeenLink(linkUp, true);
}
+ //deal with switchDown
+ if (switchDown != null) {
+ processHashGroupChange(routeChanges, true, switchDown);
+ // clear out routesChanges so a re-route is not attempted
+ routeChanges = ImmutableSet.of();
+ }
} else {
// link has gone down
// Compare existing ECMP SPG only with the link that went down
@@ -452,7 +450,6 @@
if (routeChanges == null) {
log.info("Optimized routing failed... opting for full reroute");
populationStatus = Status.ABORTED;
- statusLock.unlock();
populateAllRoutingRules();
return;
}
@@ -482,6 +479,7 @@
}
}
+
/**
* Processes a set a route-path changes by reprogramming routing rules and
* creating new hash-groups or editing them if necessary. This method also
@@ -533,8 +531,6 @@
return false; //abort routing and fail fast
}
- //XXX should we do hashgroupchanges here?
-
// update ecmpSPG for all edge-pairs
for (EdgePair ep : edgePairs) {
currentEcmpSpgMap.put(ep.dev1, updatedEcmpSpgMap.get(ep.dev1));
@@ -847,41 +843,51 @@
private void processHashGroupChange(Set<ArrayList<DeviceId>> routeChanges,
boolean linkOrSwitchFailed,
DeviceId failedSwitch) {
+ Set<ArrayList<DeviceId>> changedRoutes = new HashSet<>();
+ // first, ensure each routeChanges entry has two elements
for (ArrayList<DeviceId> route : routeChanges) {
- DeviceId targetSw = route.get(0);
- boolean success;
- DeviceId dstSw = null;
- if (route.size() > 1) {
- dstSw = route.get(1);
+ if (route.size() == 1) {
+ // route-path changes are from everyone else to this switch
+ DeviceId dstSw = route.get(0);
+ srManager.deviceService.getAvailableDevices().forEach(sw -> {
+ if (!sw.id().equals(dstSw)) {
+ changedRoutes.add(Lists.newArrayList(sw.id(), dstSw));
+ }
+ });
+ } else {
+ changedRoutes.add(route);
}
+ }
+ for (ArrayList<DeviceId> route : changedRoutes) {
+ DeviceId targetSw = route.get(0);
+ DeviceId dstSw = route.get(1);
if (linkOrSwitchFailed) {
- fixHashGroupsForRoute(route, true);
+ boolean success = fixHashGroupsForRoute(route, true);
// it's possible that we cannot fix hash groups for a route
// if the target switch has failed. Nevertheless the ecmp graph
// for the impacted switch must still be updated.
- if (failedSwitch != null && targetSw.equals(failedSwitch)
- && dstSw != null) {
+ if (!success && failedSwitch != null && targetSw.equals(failedSwitch)) {
currentEcmpSpgMap.put(dstSw, updatedEcmpSpgMap.get(dstSw));
currentEcmpSpgMap.remove(targetSw);
- log.debug("Updating ECMPspg for dst:{} removing failed "
+ log.debug("Updating ECMPspg for dst:{} removing failed switch "
+ "target:{}", dstSw, targetSw);
- return;
+ continue;
}
//linkfailed - update both sides
- currentEcmpSpgMap.put(targetSw, updatedEcmpSpgMap.get(targetSw));
- if (dstSw != null) {
- currentEcmpSpgMap.put(dstSw, updatedEcmpSpgMap.get(dstSw));
- }
- log.debug("Updating ECMPspg for dst:{} and target:{}", dstSw, targetSw);
- } else {
- success = fixHashGroupsForRoute(route, false);
if (success) {
currentEcmpSpgMap.put(targetSw, updatedEcmpSpgMap.get(targetSw));
- if (dstSw != null) {
- currentEcmpSpgMap.put(dstSw, updatedEcmpSpgMap.get(dstSw));
- }
- log.debug("Updating ECMPspg for target:{} and dst:{}",
+ currentEcmpSpgMap.put(dstSw, updatedEcmpSpgMap.get(dstSw));
+ log.debug("Updating ECMPspg for dst:{} and target:{} for linkdown",
+ dstSw, targetSw);
+ }
+ } else {
+ //linkup of seen before link
+ boolean success = fixHashGroupsForRoute(route, false);
+ if (success) {
+ currentEcmpSpgMap.put(targetSw, updatedEcmpSpgMap.get(targetSw));
+ currentEcmpSpgMap.put(dstSw, updatedEcmpSpgMap.get(dstSw));
+ log.debug("Updating ECMPspg for target:{} and dst:{} for linkup",
targetSw, dstSw);
}
}
@@ -908,48 +914,10 @@
return false;
}
DeviceId destSw = route.get(1);
- log.debug("Processing fixHashGroupsForRoute: Target {} -> Dest {}",
+ log.debug("* processing fixHashGroupsForRoute: Target {} -> Dest {}",
targetSw, destSw);
- boolean targetIsEdge = false;
- try {
- targetIsEdge = srManager.deviceConfiguration.isEdgeDevice(targetSw);
- } catch (DeviceConfigNotFoundException e) {
- log.warn(e.getMessage() + "Cannot determine if targetIsEdge {}.. "
- + "continuing fixHash", targetSw);
- }
-
// figure out the new next hops at the targetSw towards the destSw
- Set<DeviceId> nextHops = new HashSet<>();
- EcmpShortestPathGraph ecmpSpg = updatedEcmpSpgMap.get(destSw);
- HashMap<Integer, HashMap<DeviceId, ArrayList<ArrayList<DeviceId>>>> switchVia =
- ecmpSpg.getAllLearnedSwitchesAndVia();
- for (Integer itrIdx : switchVia.keySet()) {
- HashMap<DeviceId, ArrayList<ArrayList<DeviceId>>> swViaMap =
- switchVia.get(itrIdx);
- for (DeviceId target : swViaMap.keySet()) {
- if (target.equals(targetSw)) {
- // found the iteration where targetSw is reached- get nextHops
- if (!targetIsEdge && itrIdx > 1) {
- // optimization for spines to not use other leaves to get
- // to a leaf to avoid loops
- log.debug("Avoiding {} hop path for non-edge targetSw:{}"
- + " --> dstSw:{}", itrIdx, targetSw, destSw);
- break;
- }
- for (ArrayList<DeviceId> via : swViaMap.get(target)) {
- if (via.isEmpty()) {
- // the dstSw is the next-hop from the targetSw
- nextHops.add(destSw);
- } else {
- // first elem is next-hop in each ECMP path
- nextHops.add(via.get(0));
- }
- }
- break;
- }
- }
- }
-
+ Set<DeviceId> nextHops = getNextHops(targetSw, destSw);
// call group handler to change hash group at targetSw
DefaultGroupHandler grpHandler = srManager.getGroupHandler(targetSw);
if (grpHandler == null) {
@@ -1010,7 +978,7 @@
* @param deviceId the device for which graphs need to be purged
*/
protected void purgeEcmpGraph(DeviceId deviceId) {
- currentEcmpSpgMap.remove(deviceId);
+ currentEcmpSpgMap.remove(deviceId); // XXX reconsider
if (updatedEcmpSpgMap != null) {
updatedEcmpSpgMap.remove(deviceId);
}
@@ -1030,64 +998,64 @@
* affected, or null if no previous ecmp spg was found for comparison
*/
private Set<ArrayList<DeviceId>> computeDamagedRoutes(Link linkFail) {
-
Set<ArrayList<DeviceId>> routes = new HashSet<>();
for (Device sw : srManager.deviceService.getDevices()) {
log.debug("Computing the impacted routes for device {} due to link fail",
sw.id());
- if (!srManager.mastershipService.isLocalMaster(sw.id())) {
- log.debug("No mastership for {} .. skipping route optimization",
- sw.id());
+ DeviceId retId = shouldHandleRouting(sw.id());
+ if (retId == null) {
continue;
}
- EcmpShortestPathGraph ecmpSpg = currentEcmpSpgMap.get(sw.id());
- if (ecmpSpg == null) {
- log.warn("No existing ECMP graph for switch {}. Aborting optimized"
- + " rerouting and opting for full-reroute", sw.id());
- return null;
- }
- if (log.isDebugEnabled()) {
- log.debug("Root switch: {}", sw.id());
- log.debug(" Current/Existing SPG: {}", ecmpSpg);
- log.debug(" New/Updated SPG: {}", updatedEcmpSpgMap.get(sw.id()));
- }
- HashMap<Integer, HashMap<DeviceId, ArrayList<ArrayList<DeviceId>>>> switchVia =
- ecmpSpg.getAllLearnedSwitchesAndVia();
- for (Integer itrIdx : switchVia.keySet()) {
- log.trace("Current/Exiting SPG Iterindex# {}", itrIdx);
- HashMap<DeviceId, ArrayList<ArrayList<DeviceId>>> swViaMap =
- switchVia.get(itrIdx);
- for (DeviceId targetSw : swViaMap.keySet()) {
- DeviceId rootSw = sw.id();
- if (log.isTraceEnabled()) {
- log.trace("TargetSwitch {} --> RootSwitch {}", targetSw, rootSw);
+ Set<DeviceId> devicesToProcess = Sets.newHashSet(retId, sw.id());
+ for (DeviceId rootSw : devicesToProcess) {
+ EcmpShortestPathGraph ecmpSpg = currentEcmpSpgMap.get(rootSw);
+ if (ecmpSpg == null) {
+ log.warn("No existing ECMP graph for switch {}. Aborting optimized"
+ + " rerouting and opting for full-reroute", rootSw);
+ return null;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Root switch: {}", rootSw);
+ log.debug(" Current/Existing SPG: {}", ecmpSpg);
+ log.debug(" New/Updated SPG: {}", updatedEcmpSpgMap.get(rootSw));
+ }
+ HashMap<Integer, HashMap<DeviceId, ArrayList<ArrayList<DeviceId>>>>
+ switchVia = ecmpSpg.getAllLearnedSwitchesAndVia();
+ // figure out if the broken link affected any route-paths in this graph
+ for (Integer itrIdx : switchVia.keySet()) {
+ log.trace("Current/Exiting SPG Iterindex# {}", itrIdx);
+ HashMap<DeviceId, ArrayList<ArrayList<DeviceId>>> swViaMap =
+ switchVia.get(itrIdx);
+ for (DeviceId targetSw : swViaMap.keySet()) {
+ log.trace("TargetSwitch {} --> RootSwitch {}",
+ targetSw, rootSw);
for (ArrayList<DeviceId> via : swViaMap.get(targetSw)) {
log.trace(" Via:");
via.forEach(e -> log.trace(" {}", e));
}
- }
- Set<ArrayList<DeviceId>> subLinks =
- computeLinks(targetSw, rootSw, swViaMap);
- for (ArrayList<DeviceId> alink: subLinks) {
- if ((alink.get(0).equals(linkFail.src().deviceId()) &&
- alink.get(1).equals(linkFail.dst().deviceId()))
- ||
- (alink.get(0).equals(linkFail.dst().deviceId()) &&
- alink.get(1).equals(linkFail.src().deviceId()))) {
- log.debug("Impacted route:{}->{}", targetSw, rootSw);
- ArrayList<DeviceId> aRoute = new ArrayList<>();
- aRoute.add(targetSw); // switch with rules to populate
- aRoute.add(rootSw); // towards this destination
- routes.add(aRoute);
- break;
+ Set<ArrayList<DeviceId>> subLinks =
+ computeLinks(targetSw, rootSw, swViaMap);
+ for (ArrayList<DeviceId> alink: subLinks) {
+ if ((alink.get(0).equals(linkFail.src().deviceId()) &&
+ alink.get(1).equals(linkFail.dst().deviceId()))
+ ||
+ (alink.get(0).equals(linkFail.dst().deviceId()) &&
+ alink.get(1).equals(linkFail.src().deviceId()))) {
+ log.debug("Impacted route:{}->{}", targetSw, rootSw);
+ ArrayList<DeviceId> aRoute = new ArrayList<>();
+ aRoute.add(targetSw); // switch with rules to populate
+ aRoute.add(rootSw); // towards this destination
+ routes.add(aRoute);
+ break;
+ }
}
}
}
+
}
}
-
return routes;
}
@@ -1407,6 +1375,35 @@
}
}
+ /**
+ * Updates the currentEcmpSpgGraph for all devices.
+ */
+ private void updateEcmpSpgMaps() {
+ for (Device sw : srManager.deviceService.getDevices()) {
+ EcmpShortestPathGraph ecmpSpgUpdated =
+ new EcmpShortestPathGraph(sw.id(), srManager);
+ currentEcmpSpgMap.put(sw.id(), ecmpSpgUpdated);
+ }
+ }
+
+ /**
+ * Ensures routing is stable before updating all ECMP SPG graphs.
+ *
+ * TODO: CORD-1843 will ensure maps are updated faster, potentially while
+ * topology/routing is still unstable
+ */
+ private final class UpdateMaps implements Runnable {
+ @Override
+ public void run() {
+ if (isRoutingStable()) {
+ updateEcmpSpgMaps();
+ } else {
+ executorService.schedule(new UpdateMaps(), UPDATE_INTERVAL,
+ TimeUnit.SECONDS);
+ }
+ }
+ }
+
//////////////////////////////////////
// Filtering rule creation
//////////////////////////////////////
diff --git a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/IcmpHandler.java b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/IcmpHandler.java
index 2113e71..23bbf07 100644
--- a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/IcmpHandler.java
+++ b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/IcmpHandler.java
@@ -29,7 +29,6 @@
import org.onlab.packet.ndp.NeighborSolicitation;
import org.onosproject.net.neighbour.NeighbourMessageContext;
import org.onosproject.net.neighbour.NeighbourMessageType;
-import org.onosproject.routeservice.ResolvedRoute;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
import org.onosproject.net.flow.DefaultTrafficTreatment;
@@ -165,12 +164,15 @@
// The source might be an indirectly attached host (e.g. behind a router)
// Lookup the route store for the nexthop instead.
if (destRouterAddress == null) {
- Optional<ResolvedRoute> nexthop = srManager.routeService.longestPrefixLookup(destIpAddress);
- if (nexthop.isPresent()) {
+ Optional<DeviceId> deviceId = srManager.routeService
+ .longestPrefixLookup(destIpAddress).map(srManager::nextHopLocations)
+ .flatMap(locations -> locations.stream().findFirst())
+ .map(ConnectPoint::deviceId);
+ if (deviceId.isPresent()) {
try {
- destRouterAddress = config.getRouterIpv4(nexthop.get().location().deviceId());
+ destRouterAddress = config.getRouterIpv4(deviceId.get());
} catch (DeviceConfigNotFoundException e) {
- log.warn("Device config not found. Abort ICMP processing");
+ log.warn("Device config for {} not found. Abort ICMP processing", deviceId);
return;
}
}
@@ -240,12 +242,15 @@
// The source might be an indirect host behind a router.
// Lookup the route store for the nexthop instead.
if (destRouterAddress == null) {
- Optional<ResolvedRoute> nexthop = srManager.routeService.longestPrefixLookup(destIpAddress);
- if (nexthop.isPresent()) {
+ Optional<DeviceId> deviceId = srManager.routeService
+ .longestPrefixLookup(destIpAddress).map(srManager::nextHopLocations)
+ .flatMap(locations -> locations.stream().findFirst())
+ .map(ConnectPoint::deviceId);
+ if (deviceId.isPresent()) {
try {
- destRouterAddress = config.getRouterIpv6(nexthop.get().location().deviceId());
+ destRouterAddress = config.getRouterIpv6(deviceId.get());
} catch (DeviceConfigNotFoundException e) {
- log.warn("Device config not found. Abort ICMPv6 processing");
+ log.warn("Device config for {} not found. Abort ICMPv6 processing", deviceId);
return;
}
}
diff --git a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/RouteHandler.java b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/RouteHandler.java
index c82b39b..214ba6b 100644
--- a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/RouteHandler.java
+++ b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/RouteHandler.java
@@ -22,9 +22,9 @@
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
+import org.onosproject.net.ConnectPoint;
import org.onosproject.routeservice.ResolvedRoute;
import org.onosproject.routeservice.RouteEvent;
-import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,22 +38,24 @@
private static final Logger log = LoggerFactory.getLogger(RouteHandler.class);
private final SegmentRoutingManager srManager;
- public RouteHandler(SegmentRoutingManager srManager) {
+ RouteHandler(SegmentRoutingManager srManager) {
this.srManager = srManager;
}
protected void init(DeviceId deviceId) {
- srManager.routeService.getRouteTables().forEach(routeTableId -> {
- srManager.routeService.getRoutes(routeTableId).forEach(routeInfo -> {
- routeInfo.allRoutes().stream()
- .filter(resolvedRoute -> resolvedRoute.location() != null &&
- resolvedRoute.location().deviceId().equals(deviceId))
- .forEach(this::processRouteAddedInternal);
- });
- });
+ srManager.routeService.getRouteTables().forEach(routeTableId ->
+ srManager.routeService.getRoutes(routeTableId).forEach(routeInfo ->
+ routeInfo.allRoutes().forEach(resolvedRoute ->
+ srManager.nextHopLocations(resolvedRoute).stream()
+ .filter(location -> deviceId.equals(location.deviceId()))
+ .forEach(location -> processRouteAddedInternal(resolvedRoute)
+ )
+ )
+ )
+ );
}
- protected void processRouteAdded(RouteEvent event) {
+ void processRouteAdded(RouteEvent event) {
log.info("processRouteAdded {}", event);
processRouteAddedInternal(event.subject());
}
@@ -67,7 +69,12 @@
IpPrefix prefix = route.prefix();
MacAddress nextHopMac = route.nextHopMac();
VlanId nextHopVlan = route.nextHopVlan();
- ConnectPoint location = route.location();
+ ConnectPoint location = srManager.nextHopLocations(route).stream().findFirst().orElse(null);
+
+ if (location == null) {
+ log.info("{} ignored. Cannot find nexthop location", prefix);
+ return;
+ }
srManager.deviceConfiguration.addSubnet(location, prefix);
// XXX need to handle the case where there are two connectpoints
@@ -77,13 +84,13 @@
nextHopMac, nextHopVlan, location.port());
}
- protected void processRouteUpdated(RouteEvent event) {
+ void processRouteUpdated(RouteEvent event) {
log.info("processRouteUpdated {}", event);
processRouteRemovedInternal(event.prevSubject());
processRouteAddedInternal(event.subject());
}
- protected void processRouteRemoved(RouteEvent event) {
+ void processRouteRemoved(RouteEvent event) {
log.info("processRouteRemoved {}", event);
processRouteRemovedInternal(event.subject());
}
@@ -97,7 +104,12 @@
IpPrefix prefix = route.prefix();
MacAddress nextHopMac = route.nextHopMac();
VlanId nextHopVlan = route.nextHopVlan();
- ConnectPoint location = route.location();
+ ConnectPoint location = srManager.nextHopLocations(route).stream().findFirst().orElse(null);
+
+ if (location == null) {
+ log.info("{} ignored. Cannot find nexthop location", prefix);
+ return;
+ }
srManager.deviceConfiguration.removeSubnet(location, prefix);
srManager.defaultRoutingHandler.revokeSubnet(ImmutableSet.of(prefix));
diff --git a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index e00d865..6be675a 100644
--- a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -29,6 +29,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
+import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -50,6 +51,8 @@
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.Host;
+import org.onosproject.net.HostId;
import org.onosproject.net.Link;
import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
@@ -84,6 +87,7 @@
import org.onosproject.net.packet.PacketService;
import org.onosproject.net.topology.PathService;
import org.onosproject.net.topology.TopologyService;
+import org.onosproject.routeservice.ResolvedRoute;
import org.onosproject.routeservice.RouteEvent;
import org.onosproject.routeservice.RouteListener;
import org.onosproject.routeservice.RouteService;
@@ -696,6 +700,19 @@
}
/**
+ * Returns locations of given resolved route.
+ *
+ * @param resolvedRoute resolved route
+ * @return locations of nexthop. Might be empty if next hop is not found
+ */
+ Set<ConnectPoint> nextHopLocations(ResolvedRoute resolvedRoute) {
+ HostId hostId = HostId.hostId(resolvedRoute.nextHopMac(), resolvedRoute.nextHopVlan());
+ return Optional.ofNullable(hostService.getHost(hostId))
+ .map(Host::locations).orElse(Sets.newHashSet())
+ .stream().map(l -> (ConnectPoint) l).collect(Collectors.toSet());
+ }
+
+ /**
* Returns vlan port map of given device.
*
* @param deviceId device id
@@ -1106,10 +1123,6 @@
private void processLinkAdded(Link link) {
log.info("** LINK ADDED {}", link.toString());
- if (!deviceConfiguration.isConfigured(link.src().deviceId())) {
- log.warn("Source device of this link is not configured..not processing");
- return;
- }
if (link.type() != Link.Type.DIRECT) {
// NOTE: A DIRECT link might be transiently marked as INDIRECT
// if BDDP is received before LLDP. We can safely ignore that
@@ -1118,6 +1131,13 @@
link.src(), link.dst(), link.type());
return;
}
+ if (!deviceConfiguration.isConfigured(link.src().deviceId())) {
+ updateSeenLink(link, true);
+ // XXX revisit - what about devicePortMap
+ log.warn("Source device of this link is not configured.. "
+ + "not processing further");
+ return;
+ }
//Irrespective of whether the local is a MASTER or not for this device,
//create group handler instance and push default TTP flow rules if needed,
@@ -1127,6 +1147,7 @@
if (groupHandler != null) {
groupHandler.portUpForLink(link);
} else {
+ // XXX revisit/cleanup
Device device = deviceService.getDevice(link.src().deviceId());
if (device != null) {
log.warn("processLinkAdded: Link Added "
@@ -1189,7 +1210,12 @@
if (groupHandler != null) {
if (mastershipService.isLocalMaster(link.src().deviceId()) &&
isParallelLink(link)) {
+ log.debug("* retrying hash for parallel link removed:{}", link);
groupHandler.retryHash(link, true, false);
+ } else {
+ log.debug("Not attempting retry-hash for link removed: {} .. {}", link,
+ (mastershipService.isLocalMaster(link.src().deviceId()))
+ ? "not parallel" : "not master");
}
// ensure local stores are updated
groupHandler.portDown(link.src().port());
diff --git a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/config/DeviceConfiguration.java b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/config/DeviceConfiguration.java
index d0dfa5e..11a8425 100644
--- a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/config/DeviceConfiguration.java
+++ b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/config/DeviceConfiguration.java
@@ -267,7 +267,6 @@
public MacAddress getDeviceMac(DeviceId deviceId) throws DeviceConfigNotFoundException {
SegmentRouterInfo srinfo = deviceConfigMap.get(deviceId);
if (srinfo != null) {
- log.trace("getDeviceMac for device{} is {}", deviceId, srinfo.mac);
return srinfo.mac;
} else {
String message = "getDeviceMac fails for device: " + deviceId + ".";
diff --git a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/grouphandler/DefaultGroupHandler.java b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/grouphandler/DefaultGroupHandler.java
index 6f06d60..1a9a80e 100644
--- a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/grouphandler/DefaultGroupHandler.java
+++ b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/grouphandler/DefaultGroupHandler.java
@@ -15,9 +15,9 @@
*/
package org.onosproject.segmentrouting.grouphandler;
-
-import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.RandomUtils;
@@ -236,9 +236,9 @@
* not seen-before
*/
public void retryHash(Link link, boolean linkDown, boolean firstTime) {
- MacAddress dstMac;
+ MacAddress neighborMac;
try {
- dstMac = deviceConfig.getDeviceMac(link.dst().deviceId());
+ neighborMac = deviceConfig.getDeviceMac(link.dst().deviceId());
} catch (DeviceConfigNotFoundException e) {
log.warn(e.getMessage() + " Aborting retryHash.");
return;
@@ -264,124 +264,153 @@
int nextId = nextHops.nextId();
Set<DeviceId> dstSet = nextHops.getDstForNextHop(link.dst().deviceId());
if (!linkDown) {
- dstSet.forEach(dst -> {
- int edgeLabel = dsKey.destinationSet().getEdgeLabel(dst);
- addToHashedNextObjective(link.src().port(), dstMac,
- edgeLabel, nextId);
- });
-
+ List<PortLabel> pl = Lists.newArrayList();
if (firstTime) {
// some links may have come up before the next-objective was created
// we take this opportunity to ensure other ports to same next-hop-dst
// are part of the hash group (see CORD-1180). Duplicate additions
// to the same hash group are avoided by the driver.
for (PortNumber p : devicePortMap.get(link.dst().deviceId())) {
- if (p.equals(link.src().port())) {
- continue;
- }
dstSet.forEach(dst -> {
int edgeLabel = dsKey.destinationSet().getEdgeLabel(dst);
- addToHashedNextObjective(p, dstMac, edgeLabel, nextId);
+ pl.add(new PortLabel(p, edgeLabel));
});
}
+ addToHashedNextObjective(pl, neighborMac, nextId);
+ } else {
+ // handle only the port that came up
+ dstSet.forEach(dst -> {
+ int edgeLabel = dsKey.destinationSet().getEdgeLabel(dst);
+ pl.add(new PortLabel(link.src().port(), edgeLabel));
+ });
+ addToHashedNextObjective(pl, neighborMac, nextId);
}
} else {
+ // linkdown
+ List<PortLabel> pl = Lists.newArrayList();
dstSet.forEach(dst -> {
int edgeLabel = dsKey.destinationSet().getEdgeLabel(dst);
- removeFromHashedNextObjective(link.src().port(), dstMac,
- edgeLabel, nextId);
+ pl.add(new PortLabel(link.src().port(), edgeLabel));
});
+ removeFromHashedNextObjective(pl, neighborMac, nextId);
}
}
}
/**
- * Makes a call to the FlowObjective service to add a single bucket to
- * a hashed group.
+ * Utility class for associating output ports and the corresponding MPLS
+ * labels to push. In dual-homing, there are different labels to push
+ * corresponding to the destination switches in an edge-pair. If both
+ * destinations are reachable via the same spine, then the output-port to
+ * the spine will be associated with two labels i.e. there will be two
+ * PortLabel objects for the same port but with different labels.
+ */
+ private class PortLabel {
+ PortNumber port;
+ int edgeLabel;
+
+ PortLabel(PortNumber port, int edgeLabel) {
+ this.port = port;
+ this.edgeLabel = edgeLabel;
+ }
+
+ @Override
+ public String toString() {
+ return port.toString() + "/" + String.valueOf(edgeLabel);
+ }
+ }
+
+ /**
+ * Makes a call to the FlowObjective service to add buckets to
+ * a hashed group. User must ensure that all the ports & labels are meant
+ * same neighbor (ie. dstMac).
*
- * @param outport port to add to hash group
+ * @param portLables a collection of port & label combinations to add
+ * to the hash group identified by the nextId
* @param dstMac destination mac address of next-hop
- * @param edgeLabel the label to use in the bucket
- * @param nextId id for next-objective to which the bucket will be added
+ * @param nextId id for next-objective to which buckets will be added
*
*/
- private void addToHashedNextObjective(PortNumber outport, MacAddress dstMac,
- int edgeLabel, Integer nextId) {
- // Create the new bucket to be updated
- TrafficTreatment.Builder tBuilder =
- DefaultTrafficTreatment.builder();
- tBuilder.setOutput(outport)
- .setEthDst(dstMac)
- .setEthSrc(nodeMacAddr);
- if (edgeLabel != DestinationSet.NO_EDGE_LABEL) {
- tBuilder.pushMpls()
- .copyTtlOut()
- .setMpls(MplsLabel.mplsLabel(edgeLabel));
- }
+ private void addToHashedNextObjective(Collection<PortLabel> portLabels,
+ MacAddress dstMac, Integer nextId) {
// setup metadata to pass to nextObjective - indicate the vlan on egress
// if needed by the switch pipeline. Since hashed next-hops are always to
// other neighboring routers, there is no subnet assigned on those ports.
TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
metabuilder.matchVlanId(INTERNAL_VLAN);
-
NextObjective.Builder nextObjBuilder = DefaultNextObjective.builder()
.withId(nextId)
.withType(NextObjective.Type.HASHED)
- .addTreatment(tBuilder.build())
.withMeta(metabuilder.build())
.fromApp(appId);
- log.debug("addToHash in device {}: Adding Bucket with port/label {}/{} "
- + "to nextId {}", deviceId, outport, edgeLabel, nextId);
+ // Create the new buckets to be updated
+ portLabels.forEach(pl -> {
+ TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
+ tBuilder.setOutput(pl.port)
+ .setEthDst(dstMac)
+ .setEthSrc(nodeMacAddr);
+ if (pl.edgeLabel != DestinationSet.NO_EDGE_LABEL) {
+ tBuilder.pushMpls()
+ .copyTtlOut()
+ .setMpls(MplsLabel.mplsLabel(pl.edgeLabel));
+ }
+ nextObjBuilder.addTreatment(tBuilder.build());
+ });
+
+ log.debug("addToHash in device {}: Adding Bucket with port/label {} "
+ + "to nextId {}", deviceId, portLabels, nextId);
ObjectiveContext context = new DefaultObjectiveContext(
- (objective) -> log.debug("addToHash addedTo NextObj {} on {}",
- nextId, deviceId),
+ (objective) -> log.debug("addToHash port/label {} addedTo "
+ + "NextObj {} on {}", portLabels, nextId, deviceId),
(objective, error) ->
- log.warn("addToHash failed to addTo NextObj {} on {}: {}",
+ log.warn("addToHash failed to add port/label {} to"
+ + " NextObj {} on {}: {}", portLabels,
nextId, deviceId, error));
NextObjective nextObjective = nextObjBuilder.addToExisting(context);
flowObjectiveService.next(deviceId, nextObjective);
}
/**
- * Makes a call to the FlowObjective service to remove a single bucket from
- * a hashed group.
+ * Makes a call to the FlowObjective service to remove buckets from
+ * a hash group. User must ensure that all the ports & labels are meant
+ * same neighbor (ie. dstMac).
*
- * @param port port to remove from hash group
+ * @param portLables a collection of port & label combinations to remove
+ * from the hash group identified by the nextId
* @param dstMac destination mac address of next-hop
- * @param edgeLabel the label to use in the bucket
- * @param nextId id for next-objective from which the bucket will be removed
+ * @param nextId id for next-objective from which buckets will be removed
*/
- private void removeFromHashedNextObjective(PortNumber port, MacAddress dstMac,
- int edgeLabel, Integer nextId) {
- // Create the bucket to be removed
- TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment
- .builder();
- tBuilder.setOutput(port)
- .setEthDst(dstMac)
- .setEthSrc(nodeMacAddr);
- if (edgeLabel != DestinationSet.NO_EDGE_LABEL) {
- tBuilder.pushMpls()
- .copyTtlOut()
- .setMpls(MplsLabel.mplsLabel(edgeLabel));
- }
- log.info("{} in device {}: Removing Bucket with Port {} to next object id {}",
- "removeFromHash", deviceId, port, nextId);
+ private void removeFromHashedNextObjective(Collection<PortLabel> portLabels,
+ MacAddress dstMac, Integer nextId) {
NextObjective.Builder nextObjBuilder = DefaultNextObjective
.builder()
.withType(NextObjective.Type.HASHED) //same as original
.withId(nextId)
- .fromApp(appId)
- .addTreatment(tBuilder.build());
- ObjectiveContext context = new DefaultObjectiveContext(
- (objective) -> log.debug("port {} removedFrom NextObj {} on {}",
- port, nextId, deviceId),
- (objective, error) ->
- log.warn("port {} failed to removeFrom NextObj {} on {}: {}",
- port, nextId, deviceId, error));
- NextObjective nextObjective = nextObjBuilder.
- removeFromExisting(context);
+ .fromApp(appId);
+ // Create the buckets to be removed
+ portLabels.forEach(pl -> {
+ TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
+ tBuilder.setOutput(pl.port)
+ .setEthDst(dstMac)
+ .setEthSrc(nodeMacAddr);
+ if (pl.edgeLabel != DestinationSet.NO_EDGE_LABEL) {
+ tBuilder.pushMpls()
+ .copyTtlOut()
+ .setMpls(MplsLabel.mplsLabel(pl.edgeLabel));
+ }
+ nextObjBuilder.addTreatment(tBuilder.build());
+ });
+ log.debug("removeFromHash in device {}: Removing Bucket with port/label"
+ + " {} from nextId {}", deviceId, portLabels, nextId);
+ ObjectiveContext context = new DefaultObjectiveContext(
+ (objective) -> log.debug("port/label {} removedFrom NextObj"
+ + " {} on {}", portLabels, nextId, deviceId),
+ (objective, error) ->
+ log.warn("port/label {} failed to removeFrom NextObj {} on "
+ + "{}: {}", portLabels, nextId, deviceId, error));
+ NextObjective nextObjective = nextObjBuilder.removeFromExisting(context);
flowObjectiveService.next(deviceId, nextObjective);
}
@@ -405,7 +434,7 @@
// temporary storage of keys to be updated
Map<DestinationSetNextObjectiveStoreKey, Set<DeviceId>> tempStore =
new HashMap<>();
- boolean foundNextObjective = false;
+ boolean foundNextObjective = false, success = true;
// retrieve hash-groups meant for destSw, which have destinationSets
// with different neighbors than the given next-hops
@@ -432,44 +461,17 @@
+ "hops:{} ..adding {}", targetSw, destSw, nextId,
currNeighbors, diff);
}
- for (DeviceId neighbor : diff) {
- MacAddress dstMac;
- try {
- dstMac = deviceConfig.getDeviceMac(neighbor);
- } catch (DeviceConfigNotFoundException e) {
- log.warn(e.getMessage() + " Aborting fixHashGroup for nextId:"
- + nextId);
- return false;
- }
- if (devicePortMap.get(neighbor) == null ||
- devicePortMap.get(neighbor).isEmpty()) {
- log.warn("No ports found in dev:{} for neighbor:{} .. cannot "
- + "fix hash group for nextId: {}",
- deviceId, neighbor, nextId);
- return false;
- }
+ boolean suc = updateAllPortsToNextHop(diff, edgeLabel, nextId,
+ revoke);
+ if (suc) {
+ // to update neighbor set with changes made
if (revoke) {
- for (PortNumber port : devicePortMap.get(neighbor)) {
- log.info("fixHashGroup in device {}: Removing Bucket "
- + "with Port {} to next object id {}",
- deviceId, port, nextId);
- removeFromHashedNextObjective(port, dstMac,
- edgeLabel,
- nextId);
- }
- // to update neighbor set with changes made
tempStore.put(dskey, Sets.difference(currNeighbors, diff));
} else {
- for (PortNumber port : devicePortMap.get(neighbor)) {
- log.info("fixHashGroup in device {}: Adding Bucket "
- + "with Port {} to next object id {}",
- deviceId, port, nextId);
- addToHashedNextObjective(port, dstMac, edgeLabel, nextId);
- }
- // to update neighbor set with changes made
tempStore.put(dskey, Sets.union(currNeighbors, diff));
}
}
+ success &= suc;
}
if (!foundNextObjective) {
@@ -480,18 +482,29 @@
// update the dsNextObjectiveStore with new destinationSet to nextId mappings
for (DestinationSetNextObjectiveStoreKey key : tempStore.keySet()) {
- NextNeighbors oldHops = dsNextObjStore.get(key);
- if (oldHops == null) {
+ NextNeighbors currentNextHops = dsNextObjStore.get(key);
+ if (currentNextHops == null) {
+ log.warn("fixHashGroups could not update global store in "
+ + "device {} .. missing nextNeighbors for key {}",
+ deviceId, key);
continue;
}
- Set<DeviceId> newNeighbors = tempStore.get(key);
- Set<DeviceId> oldNeighbors = ImmutableSet.copyOf(oldHops.nextHops(destSw));
- oldHops.dstNextHops().put(destSw, newNeighbors);
- log.debug("Updating nsNextObjStore: oldHops:{} -> newHops:{} :: nextId:{}",
- oldNeighbors, newNeighbors, oldHops.nextId());
+ Set<DeviceId> newNeighbors = new HashSet<>();
+ newNeighbors.addAll(tempStore.get(key));
+ Map<DeviceId, Set<DeviceId>> oldDstNextHops =
+ ImmutableMap.copyOf(currentNextHops.dstNextHops());
+ currentNextHops.dstNextHops().put(destSw, newNeighbors); //local change
+ log.debug("Updating nsNextObjStore target:{} -> dst:{} in key:{} nextId:{}",
+ targetSw, destSw, key, currentNextHops.nextId());
+ log.debug("Old dstNextHops: {}", oldDstNextHops);
+ log.debug("New dstNextHops: {}", currentNextHops.dstNextHops());
+ // update global store
+ dsNextObjStore.put(key,
+ new NextNeighbors(currentNextHops.dstNextHops(),
+ currentNextHops.nextId()));
}
-
- return true;
+ // even if one fails and others succeed, return false so ECMPspg not updated
+ return success;
}
/**
@@ -543,7 +556,9 @@
}
/**
- * Adds or removes buckets for all ports to a set of neighbor devices.
+ * Adds or removes buckets for all ports to a set of neighbor devices. Caller
+ * needs to ensure that the given neighbors are all next hops towards the
+ * same destination (represented by the given edgeLabel).
*
* @param neighbors set of neighbor device ids
* @param edgeLabel MPLS label to use in buckets
@@ -556,37 +571,33 @@
private boolean updateAllPortsToNextHop(Set<DeviceId> neighbors, int edgeLabel,
int nextId, boolean revoke) {
for (DeviceId neighbor : neighbors) {
- MacAddress dstMac;
+ MacAddress neighborMac;
try {
- dstMac = deviceConfig.getDeviceMac(neighbor);
+ neighborMac = deviceConfig.getDeviceMac(neighbor);
} catch (DeviceConfigNotFoundException e) {
- log.warn(e.getMessage() + " Aborting fixHashGroup for nextId:"
- + nextId);
+ log.warn(e.getMessage() + " Aborting updateAllPortsToNextHop"
+ + " for nextId:" + nextId);
return false;
}
- if (devicePortMap.get(neighbor) == null ||
- devicePortMap.get(neighbor).isEmpty()) {
+ Collection<PortNumber> portsToNeighbor = devicePortMap.get(neighbor);
+ if (portsToNeighbor == null || portsToNeighbor.isEmpty()) {
log.warn("No ports found in dev:{} for neighbor:{} .. cannot "
- + "fix hash group for nextId: {}",
+ + "updateAllPortsToNextHop for nextId: {}",
deviceId, neighbor, nextId);
return false;
}
+ List<PortLabel> pl = Lists.newArrayList();
+ portsToNeighbor.forEach(p -> pl.add(new PortLabel(p, edgeLabel)));
if (revoke) {
- for (PortNumber port : devicePortMap.get(neighbor)) {
- log.debug("fixHashGroup in device {}: Removing Bucket "
- + "with Port {} edgeLabel:{} to next object id {}",
- deviceId, port, edgeLabel, nextId);
- removeFromHashedNextObjective(port, dstMac,
- edgeLabel,
- nextId);
- }
+ log.debug("updateAllPortsToNextHops in device {}: Removing Bucket(s) "
+ + "with Port/Label:{} to next object id {}",
+ deviceId, pl, nextId);
+ removeFromHashedNextObjective(pl, neighborMac, nextId);
} else {
- for (PortNumber port : devicePortMap.get(neighbor)) {
- log.debug("fixHashGroup in device {}: Adding Bucket "
- + "with Port {} edgeLabel: {} to next object id {}",
- deviceId, port, edgeLabel, nextId);
- addToHashedNextObjective(port, dstMac, edgeLabel, nextId);
- }
+ log.debug("fixHashGroup in device {}: Adding Bucket(s) "
+ + "with Port/Label: {} to next object id {}",
+ deviceId, pl, nextId);
+ addToHashedNextObjective(pl, neighborMac, nextId);
}
}
return true;
@@ -1124,8 +1135,13 @@
/**
- *
- *
+ * Performs bucket verification operation for all hash groups in this device.
+ * Checks RouteHandler to ensure that routing is stable before attempting
+ * verification. Verification involves creating a nextObjective with
+ * operation VERIFY for existing next objectives in the store, and passing
+ * it to the driver. It is the driver that actually performs the verification
+ * by adding or removing buckets to match the verification next objective
+ * created here.
*/
protected final class BucketCorrector implements Runnable {
Integer nextId;
@@ -1152,7 +1168,7 @@
}
rh.acquireRoutingLock();
try {
- log.debug("running bucket corrector for dev: {}", deviceId);
+ log.trace("running bucket corrector for dev: {}", deviceId);
Set<DestinationSetNextObjectiveStoreKey> dsKeySet = dsNextObjStore.entrySet()
.stream()
.filter(entry -> entry.getKey().deviceId().equals(deviceId))
@@ -1167,7 +1183,7 @@
if (nextId != null && nextId != nid) {
continue;
}
- log.debug("bkt-corr: dsNextObjStore for device {}: {}",
+ log.trace("bkt-corr: dsNextObjStore for device {}: {}",
deviceId, dsKey, next);
TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
metabuilder.matchVlanId(INTERNAL_VLAN);
@@ -1189,7 +1205,7 @@
return;
}
devicePortMap.get(neighbor).forEach(port -> {
- log.debug("verify in device {} nextId {}: bucket with"
+ log.trace("verify in device {} nextId {}: bucket with"
+ " port/label {}/{} to dst {} via {}",
deviceId, nid, port, edgeLabel,
dstDev, neighbor);
diff --git a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/grouphandler/NextNeighbors.java b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/grouphandler/NextNeighbors.java
index 1a0507b..eaca1aa 100644
--- a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/grouphandler/NextNeighbors.java
+++ b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/grouphandler/NextNeighbors.java
@@ -25,27 +25,60 @@
import org.onosproject.net.DeviceId;
+/**
+ * Represents the nexthop information associated with a route-path towards a
+ * set of destinations.
+ */
public class NextNeighbors {
private final Map<DeviceId, Set<DeviceId>> dstNextHops;
private final int nextId;
+ /**
+ * Constructor.
+ *
+ * @param dstNextHops map of destinations and the next-hops towards each dest
+ * @param nextId id of nextObjective that manifests the next-hop info
+ */
public NextNeighbors(Map<DeviceId, Set<DeviceId>> dstNextHops, int nextId) {
this.dstNextHops = dstNextHops;
this.nextId = nextId;
}
+ /**
+ * Returns a map of destinations and the next-hops towards them.
+ *
+ * @return map of destinations and the next-hops towards them
+ */
public Map<DeviceId, Set<DeviceId>> dstNextHops() {
return dstNextHops;
}
+ /**
+ * Set of next-hops towards the given destination.
+ *
+ * @param deviceId the destination
+ * @return set of nexthops towards the destination
+ */
public Set<DeviceId> nextHops(DeviceId deviceId) {
return dstNextHops.get(deviceId);
}
+ /**
+ * Return the nextId representing the nextObjective towards the next-hops.
+ *
+ * @return nextId representing the nextObjective towards the next-hops
+ */
public int nextId() {
return nextId;
}
+ /**
+ * Checks if the given nextHopId is a valid next hop to any one of the
+ * destinations.
+ *
+ * @param nextHopId the deviceId for the next hop
+ * @return true if given next
+ */
public boolean containsNextHop(DeviceId nextHopId) {
for (Set<DeviceId> nextHops : dstNextHops.values()) {
if (nextHops != null && nextHops.contains(nextHopId)) {
@@ -55,6 +88,14 @@
return false;
}
+ /**
+ * Returns a set of destinations which have the given nextHopId as one
+ * of the next-hops to that destination.
+ *
+ * @param nextHopId the deviceId for the next hop
+ * @return set of deviceIds that have the given nextHopId as a next-hop
+ * which could be empty if no destinations were found
+ */
public Set<DeviceId> getDstForNextHop(DeviceId nextHopId) {
Set<DeviceId> dstSet = new HashSet<>();
for (DeviceId dstKey : dstNextHops.keySet()) {
diff --git a/apps/segmentrouting/src/test/java/org/onosproject/segmentrouting/MockHostService.java b/apps/segmentrouting/src/test/java/org/onosproject/segmentrouting/MockHostService.java
index 1bd593c..e8c4701 100644
--- a/apps/segmentrouting/src/test/java/org/onosproject/segmentrouting/MockHostService.java
+++ b/apps/segmentrouting/src/test/java/org/onosproject/segmentrouting/MockHostService.java
@@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableSet;
import org.onosproject.net.Host;
+import org.onosproject.net.HostId;
import org.onosproject.net.host.HostServiceAdapter;
import java.util.Set;
@@ -36,4 +37,9 @@
public Set<Host> getHosts() {
return hosts;
}
+
+ @Override
+ public Host getHost(HostId hostId) {
+ return hosts.stream().filter(host -> hostId.equals(host.id())).findFirst().orElse(null);
+ }
}
diff --git a/apps/segmentrouting/src/test/java/org/onosproject/segmentrouting/RouteHandlerTest.java b/apps/segmentrouting/src/test/java/org/onosproject/segmentrouting/RouteHandlerTest.java
index fc8c294..383f800 100644
--- a/apps/segmentrouting/src/test/java/org/onosproject/segmentrouting/RouteHandlerTest.java
+++ b/apps/segmentrouting/src/test/java/org/onosproject/segmentrouting/RouteHandlerTest.java
@@ -25,11 +25,15 @@
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DefaultHost;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Host;
+import org.onosproject.net.HostId;
+import org.onosproject.net.HostLocation;
import org.onosproject.net.config.NetworkConfigRegistryAdapter;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.intf.Interface;
+import org.onosproject.net.provider.ProviderId;
import org.onosproject.routeservice.ResolvedRoute;
import org.onosproject.routeservice.Route;
import org.onosproject.routeservice.RouteEvent;
@@ -62,19 +66,24 @@
private static final VlanId V1 = VlanId.vlanId((short) 1);
private static final ConnectPoint CP1 = ConnectPoint.deviceConnectPoint("of:0000000000000001/1");
private static final Route R1 = new Route(Route.Source.STATIC, P1, N1);
- private static final ResolvedRoute RR1 = new ResolvedRoute(R1, M1, V1, CP1);
+ private static final ResolvedRoute RR1 = new ResolvedRoute(R1, M1, V1);
private static final IpAddress N2 = IpAddress.valueOf("10.0.2.254");
private static final MacAddress M2 = MacAddress.valueOf("00:00:00:00:00:02");
private static final VlanId V2 = VlanId.vlanId((short) 2);
private static final ConnectPoint CP2 = ConnectPoint.deviceConnectPoint("of:0000000000000001/2");
private static final Route R2 = new Route(Route.Source.STATIC, P1, N2);
- private static final ResolvedRoute RR2 = new ResolvedRoute(R2, M2, V2, CP2);
+ private static final ResolvedRoute RR2 = new ResolvedRoute(R2, M2, V2);
private static final RouteInfo RI1 = new RouteInfo(P1, RR1, Sets.newHashSet(RR1));
+ private static final Host H1 = new DefaultHost(ProviderId.NONE, HostId.hostId(M1, V1), M1, V1,
+ Sets.newHashSet(new HostLocation(CP1, 0)), Sets.newHashSet(N1), false);
+ private static final Host H2 = new DefaultHost(ProviderId.NONE, HostId.hostId(M2, V2), M2, V2,
+ Sets.newHashSet(new HostLocation(CP2, 0)), Sets.newHashSet(N2), false);
+
// A set of hosts
- private static final Set<Host> HOSTS = Sets.newHashSet();
+ private static final Set<Host> HOSTS = Sets.newHashSet(H1, H2);
// A set of devices of which we have mastership
private static final Set<DeviceId> LOCAL_DEVICES = Sets.newHashSet();
// A set of interfaces
@@ -127,7 +136,11 @@
routeHandler.init(CP1.deviceId());
assertEquals(1, ROUTING_TABLE.size());
- assertNotNull(ROUTING_TABLE.get(new MockRoutingTableKey(CP1.deviceId(), P1)));
+ MockRoutingTableValue rtv1 = ROUTING_TABLE.get(new MockRoutingTableKey(CP1.deviceId(), P1));
+ assertEquals(M1, rtv1.macAddress);
+ assertEquals(V1, rtv1.vlanId);
+ assertEquals(CP1.port(), rtv1.portNumber);
+
assertEquals(1, SUBNET_TABLE.size());
assertTrue(SUBNET_TABLE.get(CP1).contains(P1));
}
@@ -138,7 +151,11 @@
routeHandler.processRouteAdded(re);
assertEquals(1, ROUTING_TABLE.size());
- assertNotNull(ROUTING_TABLE.get(new MockRoutingTableKey(CP1.deviceId(), P1)));
+ MockRoutingTableValue rtv1 = ROUTING_TABLE.get(new MockRoutingTableKey(CP1.deviceId(), P1));
+ assertEquals(M1, rtv1.macAddress);
+ assertEquals(V1, rtv1.vlanId);
+ assertEquals(CP1.port(), rtv1.portNumber);
+
assertEquals(1, SUBNET_TABLE.size());
assertTrue(SUBNET_TABLE.get(CP1).contains(P1));
}
@@ -151,7 +168,11 @@
routeHandler.processRouteUpdated(re);
assertEquals(1, ROUTING_TABLE.size());
- assertNotNull(ROUTING_TABLE.get(new MockRoutingTableKey(CP2.deviceId(), P1)));
+ MockRoutingTableValue rtv2 = ROUTING_TABLE.get(new MockRoutingTableKey(CP1.deviceId(), P1));
+ assertEquals(M2, rtv2.macAddress);
+ assertEquals(V2, rtv2.vlanId);
+ assertEquals(CP2.port(), rtv2.portNumber);
+
assertEquals(1, SUBNET_TABLE.size());
assertTrue(SUBNET_TABLE.get(CP2).contains(P1));
}
@@ -166,5 +187,4 @@
assertEquals(0, ROUTING_TABLE.size());
assertEquals(0, SUBNET_TABLE.size());
}
-
}
\ No newline at end of file
diff --git a/core/api/src/test/java/org/onosproject/store/service/TestAsyncDocumentTree.java b/core/api/src/test/java/org/onosproject/store/service/TestAsyncDocumentTree.java
new file mode 100644
index 0000000..f1d4fec
--- /dev/null
+++ b/core/api/src/test/java/org/onosproject/store/service/TestAsyncDocumentTree.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.onosproject.store.primitives.NodeUpdate;
+import org.onosproject.store.primitives.TransactionId;
+
+/**
+ * Test asynchronous document tree.
+ */
+public class TestAsyncDocumentTree<V> implements AsyncDocumentTree<V> {
+ private final DocumentTree<V> tree;
+
+ public TestAsyncDocumentTree(String name) {
+ this.tree = new TestDocumentTree<>(name);
+ }
+
+ @Override
+ public String name() {
+ return tree.name();
+ }
+
+ @Override
+ public DocumentPath root() {
+ return tree.root();
+ }
+
+ @Override
+ public CompletableFuture<Map<String, Versioned<V>>> getChildren(DocumentPath path) {
+ return CompletableFuture.completedFuture(tree.getChildren(path));
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> get(DocumentPath path) {
+ return CompletableFuture.completedFuture(tree.get(path));
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> set(DocumentPath path, V value) {
+ return CompletableFuture.completedFuture(tree.set(path, value));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> create(DocumentPath path, V value) {
+ return CompletableFuture.completedFuture(tree.create(path, value));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> createRecursive(DocumentPath path, V value) {
+ return CompletableFuture.completedFuture(tree.createRecursive(path, value));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, long version) {
+ return CompletableFuture.completedFuture(tree.replace(path, newValue, version));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, V currentValue) {
+ return CompletableFuture.completedFuture(tree.replace(path, newValue, currentValue));
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> removeNode(DocumentPath path) {
+ return CompletableFuture.completedFuture(tree.removeNode(path));
+ }
+
+ @Override
+ public CompletableFuture<Void> addListener(DocumentPath path, DocumentTreeListener<V> listener) {
+ tree.addListener(path, listener);
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> removeListener(DocumentTreeListener<V> listener) {
+ tree.removeListener(listener);
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Version> begin(TransactionId transactionId) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepare(TransactionLog<NodeUpdate<V>> transactionLog) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<NodeUpdate<V>> transactionLog) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> commit(TransactionId transactionId) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> rollback(TransactionId transactionId) {
+ return null;
+ }
+
+ @Override
+ public DocumentTree<V> asDocumentTree() {
+ return tree;
+ }
+}
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/ConfigFileBasedClusterMetadataProvider.java b/core/net/src/main/java/org/onosproject/cluster/impl/ConfigFileBasedClusterMetadataProvider.java
index 7bfab0f..b097e2a 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/ConfigFileBasedClusterMetadataProvider.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/ConfigFileBasedClusterMetadataProvider.java
@@ -21,7 +21,6 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Set;
@@ -169,13 +168,9 @@
if ("file".equals(url.getProtocol())) {
File file = new File(metadataUrl.replaceFirst("file://", ""));
return file.exists();
- } else if ("http".equals(url.getProtocol())) {
- try (InputStream file = url.openStream()) {
- return true;
- }
} else {
- // Unsupported protocol
- return false;
+ // Return true for HTTP URLs since we allow blocking until HTTP servers come up
+ return "http".equals(url.getProtocol());
}
} catch (Exception e) {
log.warn("Exception accessing metadata file at {}:", metadataUrl, e);
@@ -184,6 +179,7 @@
}
private Versioned<ClusterMetadata> blockForMetadata(String metadataUrl) {
+ int iterations = 0;
for (;;) {
Versioned<ClusterMetadata> metadata = fetchMetadata(metadataUrl);
if (metadata != null) {
@@ -191,7 +187,7 @@
}
try {
- Thread.sleep(10);
+ Thread.sleep(Math.min((int) Math.pow(2, ++iterations) * 10, 1000));
} catch (InterruptedException e) {
throw Throwables.propagate(e);
}
@@ -209,6 +205,10 @@
metadata = mapper.readValue(new FileInputStream(file), ClusterMetadata.class);
} else if ("http".equals(url.getProtocol())) {
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ if (conn.getResponseCode() == HttpURLConnection.HTTP_NOT_FOUND) {
+ log.warn("Could not reach metadata URL {}. Retrying...", url);
+ return null;
+ }
if (conn.getResponseCode() == HttpURLConnection.HTTP_NO_CONTENT) {
return null;
}
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiFlowRuleTranslationServiceImpl.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiFlowRuleTranslationServiceImpl.java
index b999197..c55a57c 100644
--- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiFlowRuleTranslationServiceImpl.java
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiFlowRuleTranslationServiceImpl.java
@@ -61,7 +61,7 @@
public PiTableEntry translate(FlowRule rule, PiPipeconf pipeconf)
throws PiFlowRuleTranslationException {
- Device device = deviceService.getDevice(rule.deviceId());
+ final Device device = deviceService.getDevice(rule.deviceId());
if (device == null) {
throw new PiFlowRuleTranslationException("Unable to get device " + rule.deviceId());
}
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiFlowRuleTranslator.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiFlowRuleTranslator.java
index 8d8d3c1..dc6d1c5 100644
--- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiFlowRuleTranslator.java
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiFlowRuleTranslator.java
@@ -81,18 +81,24 @@
PiPipelineModel pipelineModel = pipeconf.pipelineModel();
// Retrieve interpreter, if any.
- // FIXME: get interpreter via driver once implemented.
- // final PiPipelineInterpreter interpreter = device.is(PiPipelineInterpreter.class)
- // ? device.as(PiPipelineInterpreter.class) : null;
-
final PiPipelineInterpreter interpreter;
- try {
- interpreter = (PiPipelineInterpreter) pipeconf.implementation(PiPipelineInterpreter.class)
- .orElse(null)
- .newInstance();
- } catch (InstantiationException | IllegalAccessException e) {
- throw new PiFlowRuleTranslationException(format(
- "Unable to instantiate interpreter of pipeconf %s", pipeconf.id()));
+
+ if (device != null) {
+ interpreter = device.is(PiPipelineInterpreter.class) ? device.as(PiPipelineInterpreter.class) : null;
+ } else {
+ // The case of device == null should be admitted only during unit testing.
+ // In any other case, the interpreter should be constructed using the device.as() method to make sure that
+ // behaviour's handler/data attributes are correctly populated.
+ // FIXME: modify test class PiFlowRuleTranslatorTest to avoid passing null device
+ // I.e. we need to create a device object that supports is/as method for obtaining the interpreter.
+ log.warn("translateFlowRule() called with device == null, is this a unit test?");
+ try {
+ interpreter = (PiPipelineInterpreter) pipeconf.implementation(PiPipelineInterpreter.class)
+ .orElse(null)
+ .newInstance();
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new RuntimeException(format("Unable to instantiate interpreter of pipeconf %s", pipeconf.id()));
+ }
}
PiTableId piTableId;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 7b6ae55..3382abe 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -19,9 +19,12 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Random;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -69,6 +72,7 @@
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.MastershipBasedTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.DocumentPath;
import org.onosproject.store.service.DocumentTree;
import org.onosproject.store.service.EventuallyConsistentMap;
@@ -108,12 +112,13 @@
RETRY.setStackTrace(new StackTraceElement[0]);
}
+ private static final int SCHEDULED_THREAD_POOL_SIZE = 8;
private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
private static final int MAX_RETRY_DELAY_MILLIS = 50;
private static final String FLOW_TABLE = "onos-flow-table";
- private static final MessageSubject APPLY_FLOWS = new MessageSubject("onos-flow-apply");
+ private static final MessageSubject APPLY_BATCH_FLOWS = new MessageSubject("onos-flow-apply");
private static final MessageSubject COMPLETE_BATCH = new MessageSubject("onos-flow-batch-complete");
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -145,8 +150,11 @@
new InternalTableStatsListener();
private Set<Long> pendingBatches = Sets.newConcurrentHashSet();
+ private ScheduledExecutorService scheduledExecutor;
private ExecutorService messageHandlingExecutor;
+ private final Random random = new Random();
+ private AsyncDocumentTree<Map<StoredFlowEntry, StoredFlowEntry>> asyncFlows;
private DocumentTree<Map<StoredFlowEntry, StoredFlowEntry>> flows;
private IdGenerator idGenerator;
private NodeId local;
@@ -157,6 +165,10 @@
local = clusterService.getLocalNode().id();
+ scheduledExecutor = Executors.newScheduledThreadPool(
+ SCHEDULED_THREAD_POOL_SIZE,
+ groupedThreads("onos/store/flow", "schedulers", log));
+
messageHandlingExecutor = Executors.newFixedThreadPool(
MESSAGE_HANDLER_THREAD_POOL_SIZE,
groupedThreads("onos/store/flow", "message-handlers", log));
@@ -170,16 +182,16 @@
.build();
deviceTableStats.addListener(tableStatsListener);
- flows = storageService.<Map<StoredFlowEntry, StoredFlowEntry>>documentTreeBuilder()
+ asyncFlows = storageService.<Map<StoredFlowEntry, StoredFlowEntry>>documentTreeBuilder()
.withName(FLOW_TABLE)
.withSerializer(serializer)
- .buildDocumentTree()
- .asDocumentTree();
+ .buildDocumentTree();
+ flows = asyncFlows.asDocumentTree();
clusterCommunicator.addSubscriber(
- APPLY_FLOWS,
+ APPLY_BATCH_FLOWS,
serializer::decode,
- this::applyFlows,
+ this::applyBatchFlows,
messageHandlingExecutor);
clusterCommunicator.addSubscriber(
COMPLETE_BATCH,
@@ -194,9 +206,10 @@
public void deactivate() {
deviceTableStats.removeListener(tableStatsListener);
deviceTableStats.destroy();
- clusterCommunicator.removeSubscriber(APPLY_FLOWS);
+ clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
clusterCommunicator.removeSubscriber(COMPLETE_BATCH);
messageHandlingExecutor.shutdownNow();
+ scheduledExecutor.shutdownNow();
log.info("Stopped");
}
@@ -220,6 +233,52 @@
}
/**
+ * Retries the given asynchronous supplier until successful.
+ * <p>
+ * This method retries the given supplier until no {@code ConcurrentModification} exceptions are thrown. In
+ * between retries, it waits a semi-random interval to attempt to avoid transaction conflicts with other processes.
+ *
+ * @param supplier the supplier to retry
+ * @param <T> the return type
+ * @return the return value of the given supplier once it runs successfully
+ */
+ private <T> CompletableFuture<T> retryAsyncUntilSuccess(Supplier<CompletableFuture<T>> supplier) {
+ return retryAsyncUntilSuccess(supplier, new CompletableFuture<>());
+ }
+
+ /**
+ * Retries the given asynchronous supplier until successful.
+ * <p>
+ * This method retries the given supplier until no {@code ConcurrentModification} exceptions are thrown. In
+ * between retries, it waits a semi-random interval to attempt to avoid transaction conflicts with other processes.
+ *
+ * @param supplier the supplier to retry
+ * @param future future to be completed once the operation has been successful
+ * @param <T> the return type
+ * @return the return value of the given supplier once it runs successfully
+ */
+ private <T> CompletableFuture<T> retryAsyncUntilSuccess(
+ Supplier<CompletableFuture<T>> supplier,
+ CompletableFuture<T> future) {
+ supplier.get().whenComplete((result, error) -> {
+ if (error == null) {
+ future.complete(result);
+ } else {
+ Throwable cause = error.getCause() != null ? error.getCause() : error;
+ if (cause instanceof StorageException.ConcurrentModification) {
+ scheduledExecutor.schedule(
+ () -> retryAsyncUntilSuccess(supplier, future),
+ random.nextInt(50),
+ TimeUnit.MILLISECONDS);
+ } else {
+ future.completeExceptionally(error);
+ }
+ }
+ });
+ return future;
+ }
+
+ /**
* Return method for {@link #retryUntilSuccess(Supplier)} callbacks to indicate that the callback needs to be
* retried after a randomized delay.
*
@@ -232,19 +291,6 @@
}
/**
- * Handles a flow rule batch event forwarded to the master node.
- * <p>
- * If this node is the master for the associated device, notifies event listeners to install flow rules.
- *
- * @param event the event to handle
- */
- private void applyFlows(FlowRuleBatchEvent event) {
- if (mastershipService.isLocalMaster(event.deviceId())) {
- notifyDelegate(event);
- }
- }
-
- /**
* Handles a completed batch event received from the master node.
* <p>
* If this node is the source of the batch, notifies event listeners to complete the operations.
@@ -338,106 +384,159 @@
if (master == null) {
log.warn("No master for {} ", deviceId);
- updateStoreInternal(operation);
-
- notifyDelegate(FlowRuleBatchEvent.completed(
- new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
- new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
+ updateStoreInternal(operation).whenComplete((result, error) -> {
+ notifyDelegate(FlowRuleBatchEvent.completed(
+ new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+ new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
+ });
return;
}
pendingBatches.add(operation.id());
- Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
- if (currentOps.isEmpty()) {
- batchOperationComplete(FlowRuleBatchEvent.completed(
- new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
- new CompletedBatchOperation(true, Collections.emptySet(), deviceId)));
- } else if (Objects.equals(local, master)) {
- notifyDelegate(FlowRuleBatchEvent.requested(
- new FlowRuleBatchRequest(operation.id(), currentOps),
- operation.deviceId()));
+ // If the local node is the master, apply the flows. Otherwise, send them to the master.
+ if (Objects.equals(local, master)) {
+ applyBatchFlows(operation);
} else {
- clusterCommunicator.unicast(FlowRuleBatchEvent.requested(
- new FlowRuleBatchRequest(operation.id(), currentOps),
- operation.deviceId()),
- APPLY_FLOWS,
+ log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}", master, deviceId);
+ clusterCommunicator.unicast(
+ operation,
+ APPLY_BATCH_FLOWS,
serializer::encode,
master);
}
}
- private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
- return operation.getOperations().stream().map(
- op -> {
- switch (op.operator()) {
- case ADD:
- addBatchEntry(op);
- return op;
- case REMOVE:
- if (removeBatchEntry(op)) {
- return op;
- }
- return null;
- case MODIFY:
- //TODO: figure this out at some point
- break;
- default:
- log.warn("Unknown flow operation operator: {}", op.operator());
- }
- return null;
- }
- ).filter(Objects::nonNull).collect(Collectors.toSet());
- }
-
- @SuppressWarnings("unchecked")
- private void addBatchEntry(FlowRuleBatchEntry batchEntry) {
- StoredFlowEntry entry = new DefaultFlowEntry(batchEntry.target());
- DocumentPath path = getPathFor(entry.deviceId(), entry.id());
- retryUntilSuccess(() -> {
- Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
- if (value != null) {
- Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
- entries.put(entry, entry);
- if (flows.replace(path, entries, value.version())) {
- log.trace("Stored new flow rule: {}", entry);
- return null;
+ /**
+ * Asynchronously applies a batch of flows to the store.
+ * <p>
+ * This operation is performed on the master node to ensure that events occur <em>after</em> flows have been stored
+ * and are visible to the master node. If a non-master node stores flows and then triggers events on the master,
+ * the flows may not yet be visible to the master node due to the nature of sequentially consistent reads on the
+ * underlying {@code DocumentTree} primitive.
+ */
+ private void applyBatchFlows(FlowRuleBatchOperation operation) {
+ updateStoreInternal(operation).whenComplete((operations, error) -> {
+ if (error == null) {
+ if (operations.isEmpty()) {
+ batchOperationComplete(FlowRuleBatchEvent.completed(
+ new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+ new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
} else {
- log.trace("Failed to store new flow rule: {}", entry);
- return retry();
+ notifyDelegate(FlowRuleBatchEvent.requested(
+ new FlowRuleBatchRequest(operation.id(), operations),
+ operation.deviceId()));
}
- } else {
- // If there are no entries stored for the device, initialize the device's flows.
- flows.createRecursive(path, Maps.newHashMap());
- return retry();
}
});
}
+ private CompletableFuture<Set<FlowRuleBatchEntry>> updateStoreInternal(FlowRuleBatchOperation operation) {
+ return Tools.allOf(operation.getOperations().stream().map(op -> {
+ switch (op.operator()) {
+ case ADD:
+ return addBatchEntry(op).thenApply(succeeded -> succeeded ? op : null);
+ case REMOVE:
+ return removeBatchEntry(op).thenApply(succeeded -> succeeded ? op : null);
+ case MODIFY:
+ return CompletableFuture.<FlowRuleBatchEntry>completedFuture(null);
+ default:
+ log.warn("Unknown flow operation operator: {}", op.operator());
+ return CompletableFuture.<FlowRuleBatchEntry>completedFuture(null);
+ }
+ }).collect(Collectors.toList()))
+ .thenApply(results -> results.stream()
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet()));
+ }
+
@SuppressWarnings("unchecked")
- private boolean removeBatchEntry(FlowRuleBatchEntry batchEntry) {
- FlowRule rule = batchEntry.target();
- DocumentPath path = getPathFor(rule.deviceId(), rule.id());
- return retryUntilSuccess(() -> {
- Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
- if (value != null) {
- Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
- StoredFlowEntry entry = entries.get(rule);
- if (entry != null) {
- entry.setState(FlowEntryState.PENDING_REMOVE);
- if (flows.replace(path, entries, value.version())) {
- log.trace("Updated flow rule state to PENDING_REMOVE: {}", entry);
- return true;
+ private CompletableFuture<Boolean> addBatchEntry(FlowRuleBatchEntry batchEntry) {
+ StoredFlowEntry entry = new DefaultFlowEntry(batchEntry.target());
+ DocumentPath path = getPathFor(entry.deviceId(), entry.id());
+ return retryAsyncUntilSuccess(() -> {
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+ asyncFlows.get(path).whenComplete((value, getError) -> {
+ if (getError == null) {
+ if (value != null) {
+ Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
+ entries.put(entry, entry);
+ asyncFlows.replace(path, entries, value.version()).whenComplete((succeeded, replaceError) -> {
+ if (replaceError == null) {
+ if (succeeded) {
+ log.trace("Stored new flow rule: {}", entry);
+ future.complete(true);
+ } else {
+ log.trace("Failed to store new flow rule: {}", entry);
+ future.completeExceptionally(RETRY);
+ }
+ } else {
+ future.completeExceptionally(replaceError);
+ }
+ });
} else {
- log.trace("Failed to update flow rule state to PENDING_REMOVE: {}", entry);
- return retry();
+ // If there are no entries stored for the device, initialize the device's flows.
+ Map<StoredFlowEntry, StoredFlowEntry> map = Maps.newHashMap();
+ map.put(entry, entry);
+ asyncFlows.createRecursive(path, map).whenComplete((succeeded, createError) -> {
+ if (createError == null) {
+ if (succeeded) {
+ log.trace("Stored new flow rule: {}", entry);
+ future.complete(true);
+ } else {
+ log.trace("Failed to store new flow rule: {}", entry);
+ future.completeExceptionally(RETRY);
+ }
+ } else {
+ future.completeExceptionally(createError);
+ }
+ });
}
} else {
- return false;
+ future.completeExceptionally(getError);
}
- } else {
- return false;
- }
+ });
+ return future;
+ });
+ }
+
+ @SuppressWarnings("unchecked")
+ private CompletableFuture<Boolean> removeBatchEntry(FlowRuleBatchEntry batchEntry) {
+ FlowRule rule = batchEntry.target();
+ DocumentPath path = getPathFor(rule.deviceId(), rule.id());
+ return retryAsyncUntilSuccess(() -> {
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+ asyncFlows.get(path).whenComplete((value, getError) -> {
+ if (getError == null) {
+ if (value != null) {
+ Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
+ StoredFlowEntry entry = entries.get(rule);
+ if (entry != null) {
+ entry.setState(FlowEntryState.PENDING_REMOVE);
+ asyncFlows.replace(path, entries, value.version()).whenComplete((succeeded, error) -> {
+ if (error == null) {
+ if (succeeded) {
+ log.trace("Updated flow rule state to PENDING_REMOVE: {}", entry);
+ future.complete(true);
+ } else {
+ log.trace("Failed to update flow rule state to PENDING_REMOVE: {}", entry);
+ future.completeExceptionally(RETRY);
+ }
+ } else {
+ future.completeExceptionally(error);
+ }
+ });
+ } else {
+ future.complete(false);
+ }
+ } else {
+ future.complete(false);
+ }
+ } else {
+ future.completeExceptionally(getError);
+ }
+ });
+ return future;
});
}
@@ -528,9 +627,7 @@
return null;
}
} else {
- // If there are no entries stored for the device, initialize the device's flows.
- flows.createRecursive(path, Maps.newHashMap());
- return retry();
+ return null;
}
});
}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/DistributedFlowRuleStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/DistributedFlowRuleStoreTest.java
index d49f1cc..8144d22 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/DistributedFlowRuleStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/DistributedFlowRuleStoreTest.java
@@ -42,11 +42,9 @@
import org.onosproject.net.intent.IntentTestsMocks;
import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
import org.onosproject.store.service.AsyncDocumentTree;
-import org.onosproject.store.service.AsyncDocumentTreeAdapter;
-import org.onosproject.store.service.DocumentTree;
import org.onosproject.store.service.DocumentTreeBuilder;
import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.TestDocumentTree;
+import org.onosproject.store.service.TestAsyncDocumentTree;
import org.onosproject.store.service.TestStorageService;
import org.onosproject.store.service.TestTopic;
import org.onosproject.store.service.Topic;
@@ -145,13 +143,7 @@
@Override
@SuppressWarnings("unchecked")
public AsyncDocumentTree<V> build() {
- String name = name();
- return new AsyncDocumentTreeAdapter() {
- @Override
- public DocumentTree asDocumentTree() {
- return new TestDocumentTree(name);
- }
- };
+ return new TestAsyncDocumentTree<>(name());
}
};
}
diff --git a/core/store/primitives/pom.xml b/core/store/primitives/pom.xml
index a08afb2..544be38 100644
--- a/core/store/primitives/pom.xml
+++ b/core/store/primitives/pom.xml
@@ -70,7 +70,7 @@
<dependency>
<groupId>io.atomix</groupId>
<artifactId>atomix</artifactId>
- <version>2.0.0-raft-final</version>
+ <version>2.0.0</version>
</dependency>
<dependency>
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index ed46a96..8f1ffa3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -247,8 +247,8 @@
AtomixDocumentTree atomixDocumentTree = new AtomixDocumentTree(client.newProxyBuilder()
.withName(name)
.withServiceType(String.format("%s-%s", DistributedPrimitive.Type.DOCUMENT_TREE.name(), ordering))
- .withReadConsistency(ReadConsistency.LINEARIZABLE)
- .withCommunicationStrategy(CommunicationStrategy.LEADER)
+ .withReadConsistency(ReadConsistency.SEQUENTIAL)
+ .withCommunicationStrategy(CommunicationStrategy.ANY)
.withTimeout(Duration.ofSeconds(30))
.withMaxRetries(5)
.build()
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
index fcf6dc7..477b466 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
@@ -20,6 +20,8 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
import org.onlab.packet.ChassisId;
import org.onlab.packet.EthType;
import org.onlab.packet.Ip4Address;
@@ -272,6 +274,7 @@
LinkedHashSet.class
)
.register(HashMultiset.class)
+ .register(Sets.class)
.register(Maps.immutableEntry("a", "b").getClass())
.register(new ArraysAsListSerializer(), Arrays.asList().getClass())
.register(Collections.singletonList(1).getClass())
diff --git a/drivers/bmv2/BUCK b/drivers/bmv2/BUCK
index 8938d7a..b13a673 100644
--- a/drivers/bmv2/BUCK
+++ b/drivers/bmv2/BUCK
@@ -6,6 +6,7 @@
'//protocols/p4runtime/api:onos-protocols-p4runtime-api',
'//incubator/bmv2/model:onos-incubator-bmv2-model',
'//drivers/default:onos-drivers-default',
+ '//drivers/p4runtime:onos-drivers-p4runtime',
'//incubator/grpc-dependencies:grpc-core-repkg-' + GRPC_VER,
'//lib:grpc-netty-' + GRPC_VER,
]
diff --git a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultInterpreter.java b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultInterpreter.java
deleted file mode 100644
index 3fc7287..0000000
--- a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultInterpreter.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.drivers.bmv2;
-
-import com.google.common.collect.ImmutableBiMap;
-import com.google.common.collect.ImmutableList;
-import org.onlab.packet.Ethernet;
-import org.onlab.util.ImmutableByteSequence;
-import org.onosproject.net.ConnectPoint;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.Port;
-import org.onosproject.net.PortNumber;
-import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.driver.AbstractHandlerBehaviour;
-import org.onosproject.net.flow.TrafficTreatment;
-import org.onosproject.net.flow.criteria.Criterion;
-import org.onosproject.net.flow.instructions.Instruction;
-import org.onosproject.net.flow.instructions.Instructions;
-import org.onosproject.net.packet.DefaultInboundPacket;
-import org.onosproject.net.packet.InboundPacket;
-import org.onosproject.net.packet.OutboundPacket;
-import org.onosproject.net.pi.model.PiPipelineInterpreter;
-import org.onosproject.net.pi.runtime.PiAction;
-import org.onosproject.net.pi.runtime.PiActionId;
-import org.onosproject.net.pi.runtime.PiActionParam;
-import org.onosproject.net.pi.runtime.PiActionParamId;
-import org.onosproject.net.pi.runtime.PiHeaderFieldId;
-import org.onosproject.net.pi.runtime.PiPacketMetadata;
-import org.onosproject.net.pi.runtime.PiPacketMetadataId;
-import org.onosproject.net.pi.runtime.PiPacketOperation;
-import org.onosproject.net.pi.runtime.PiTableId;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-
-import static java.util.stream.Collectors.toList;
-import static org.onosproject.net.PortNumber.CONTROLLER;
-import static org.onosproject.net.PortNumber.FLOOD;
-import static org.onosproject.net.flow.instructions.Instruction.Type.OUTPUT;
-import static org.onosproject.net.pi.runtime.PiPacketOperation.Type.PACKET_OUT;
-
-/**
- * Interpreter implementation for the default pipeconf.
- */
-public class Bmv2DefaultInterpreter extends AbstractHandlerBehaviour implements PiPipelineInterpreter {
- private static final String TABLE0 = "table0";
- private static final String SEND_TO_CPU = "send_to_cpu";
- private static final String PORT = "port";
- private static final String DROP = "_drop";
- private static final String SET_EGRESS_PORT = "set_egress_port";
- private static final String EGRESS_PORT = "egress_port";
- private static final int PORT_NUMBER_BIT_WIDTH = 9;
-
- private static final PiHeaderFieldId IN_PORT_ID = PiHeaderFieldId.of("standard_metadata", "ingress_port");
- private static final PiHeaderFieldId ETH_DST_ID = PiHeaderFieldId.of("ethernet", "dstAddr");
- private static final PiHeaderFieldId ETH_SRC_ID = PiHeaderFieldId.of("ethernet", "srcAddr");
- private static final PiHeaderFieldId ETH_TYPE_ID = PiHeaderFieldId.of("ethernet", "etherType");
-
- private static final ImmutableBiMap<Criterion.Type, PiHeaderFieldId> CRITERION_MAP =
- new ImmutableBiMap.Builder<Criterion.Type, PiHeaderFieldId>()
- .put(Criterion.Type.IN_PORT, IN_PORT_ID)
- .put(Criterion.Type.ETH_DST, ETH_DST_ID)
- .put(Criterion.Type.ETH_SRC, ETH_SRC_ID)
- .put(Criterion.Type.ETH_TYPE, ETH_TYPE_ID)
- .build();
-
- private static final ImmutableBiMap<Integer, PiTableId> TABLE_MAP = ImmutableBiMap.of(
- 0, PiTableId.of(TABLE0));
- public static final String INGRESS_PORT = "ingress_port";
-
-
- @Override
- public PiAction mapTreatment(TrafficTreatment treatment, PiTableId piTableId) throws PiInterpreterException {
-
- if (treatment.allInstructions().size() == 0) {
- // No instructions means drop for us.
- return actionWithName(DROP);
- } else if (treatment.allInstructions().size() > 1) {
- // Otherwise, we understand treatments with only 1 instruction.
- throw new PiPipelineInterpreter.PiInterpreterException("Treatment has multiple instructions");
- }
-
- Instruction instruction = treatment.allInstructions().get(0);
-
- switch (instruction.type()) {
- case OUTPUT:
- Instructions.OutputInstruction outInstruction = (Instructions.OutputInstruction) instruction;
- PortNumber port = outInstruction.port();
- if (!port.isLogical()) {
- return PiAction.builder()
- .withId(PiActionId.of(SET_EGRESS_PORT))
- .withParameter(new PiActionParam(PiActionParamId.of(PORT),
- ImmutableByteSequence.copyFrom(port.toLong())))
- .build();
- } else if (port.equals(CONTROLLER)) {
- return actionWithName(SEND_TO_CPU);
- } else {
- throw new PiInterpreterException("Egress on logical port not supported: " + port);
- }
- case NOACTION:
- return actionWithName(DROP);
- default:
- throw new PiInterpreterException("Instruction type not supported: " + instruction.type().name());
- }
- }
-
- @Override
- public Collection<PiPacketOperation> mapOutboundPacket(OutboundPacket packet)
- throws PiInterpreterException {
- TrafficTreatment treatment = packet.treatment();
-
- // default.p4 supports only OUTPUT instructions.
- List<Instructions.OutputInstruction> outInstructions = treatment.allInstructions()
- .stream()
- .filter(i -> i.type().equals(OUTPUT))
- .map(i -> (Instructions.OutputInstruction) i)
- .collect(toList());
-
- if (treatment.allInstructions().size() != outInstructions.size()) {
- // There are other instructions that are not of type OUTPUT
- throw new PiInterpreterException("Treatment not supported: " + treatment);
- }
-
- ImmutableList.Builder<PiPacketOperation> builder = ImmutableList.builder();
- for (Instructions.OutputInstruction outInst : outInstructions) {
- if (outInst.port().isLogical() && !outInst.port().equals(FLOOD)) {
- throw new PiInterpreterException("Logical port not supported: " +
- outInst.port());
- } else if (outInst.port().equals(FLOOD)) {
- //Since default.p4 does not support flood for each port of the device
- // create a packet operation to send the packet out of that specific port
- for (Port port : handler().get(DeviceService.class).getPorts(packet.sendThrough())) {
- builder.add(createPiPacketOperation(packet.data(), port.number().toLong()));
- }
- } else {
- builder.add(createPiPacketOperation(packet.data(), outInst.port().toLong()));
- }
- }
- return builder.build();
- }
-
- @Override
- public InboundPacket mapInboundPacket(DeviceId deviceId, PiPacketOperation packetIn)
- throws PiInterpreterException {
-
- //We are assuming that the packet is ethernet type
- Ethernet ethPkt = new Ethernet();
-
- ethPkt.deserialize(packetIn.data().asArray(), 0, packetIn.data().size());
-
- //Returns the ingress port packet metadata
- Optional<PiPacketMetadata> packetMetadata = packetIn.metadatas()
- .stream().filter(metadata -> metadata.id().name().equals(INGRESS_PORT))
- .findFirst();
-
- if (packetMetadata.isPresent()) {
-
- //Obtaining the ingress port as an immutable byte sequence
- ImmutableByteSequence portByteSequence = packetMetadata.get().value();
-
- //Converting immutableByteSequence to short
- short s = portByteSequence.asReadOnlyBuffer().getShort();
-
- ConnectPoint receivedFrom = new ConnectPoint(deviceId, PortNumber.portNumber(s));
-
- //FIXME should be optimizable with .asReadOnlyBytebuffer
- ByteBuffer rawData = ByteBuffer.wrap(packetIn.data().asArray());
- return new DefaultInboundPacket(receivedFrom, ethPkt, rawData);
-
- } else {
- throw new PiInterpreterException("Can't get packet metadata for" + INGRESS_PORT);
- }
- }
-
- private PiPacketOperation createPiPacketOperation(ByteBuffer data, long portNumber) throws PiInterpreterException {
- //create the metadata
- PiPacketMetadata metadata = createPacketMetadata(portNumber);
-
- //Create the Packet operation
- return PiPacketOperation.builder()
- .withType(PACKET_OUT)
- .withData(ImmutableByteSequence.copyFrom(data))
- .withMetadatas(ImmutableList.of(metadata))
- .build();
- }
-
- private PiPacketMetadata createPacketMetadata(long portNumber) throws PiInterpreterException {
- ImmutableByteSequence portValue = ImmutableByteSequence.copyFrom(portNumber);
- //FIXME remove hardcoded bitWidth and retrieve it from pipelineModel
- try {
- portValue = ImmutableByteSequence.fit(portValue, PORT_NUMBER_BIT_WIDTH);
- } catch (ImmutableByteSequence.ByteSequenceTrimException e) {
- throw new PiInterpreterException("Port number too big: {}" +
- portNumber + " causes " + e.getMessage());
- }
- return PiPacketMetadata.builder()
- .withId(PiPacketMetadataId.of(EGRESS_PORT))
- .withValue(portValue)
- .build();
- }
-
- /**
- * Returns an action instance with no runtime parameters.
- */
- private PiAction actionWithName(String name) {
- return PiAction.builder().withId(PiActionId.of(name)).build();
- }
-
- @Override
- public Optional<PiHeaderFieldId> mapCriterionType(Criterion.Type type) {
- return Optional.ofNullable(CRITERION_MAP.get(type));
- }
-
- @Override
- public Optional<Criterion.Type> mapPiHeaderFieldId(PiHeaderFieldId headerFieldId) {
- return Optional.ofNullable(CRITERION_MAP.inverse().get(headerFieldId));
- }
-
- @Override
- public Optional<PiTableId> mapFlowRuleTableId(int flowRuleTableId) {
- return Optional.ofNullable(TABLE_MAP.get(flowRuleTableId));
- }
-
- @Override
- public Optional<Integer> mapPiTableId(PiTableId piTableId) {
- return Optional.ofNullable(TABLE_MAP.inverse().get(piTableId));
- }
-}
diff --git a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultPipeconfFactory.java b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultPipeconfFactory.java
index e04e8b3..3dba8b6 100644
--- a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultPipeconfFactory.java
+++ b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultPipeconfFactory.java
@@ -18,6 +18,7 @@
import org.onosproject.bmv2.model.Bmv2PipelineModelParser;
import org.onosproject.driver.pipeline.DefaultSingleTablePipeline;
+import org.onosproject.drivers.p4runtime.DefaultP4Interpreter;
import org.onosproject.net.behaviour.Pipeliner;
import org.onosproject.net.device.PortStatisticsDiscovery;
import org.onosproject.net.pi.model.DefaultPiPipeconf;
@@ -33,7 +34,7 @@
/**
* Factory of pipeconf implementation for the default.p4 program on BMv2.
*/
-final class Bmv2DefaultPipeconfFactory {
+public final class Bmv2DefaultPipeconfFactory {
private static final String PIPECONF_ID = "bmv2-default-pipeconf";
private static final String JSON_PATH = "/default.json";
@@ -45,7 +46,7 @@
// Hides constructor.
}
- static PiPipeconf get() {
+ public static PiPipeconf get() {
return PIPECONF;
}
@@ -57,7 +58,7 @@
return DefaultPiPipeconf.builder()
.withId(new PiPipeconfId(PIPECONF_ID))
.withPipelineModel(Bmv2PipelineModelParser.parse(jsonUrl))
- .addBehaviour(PiPipelineInterpreter.class, Bmv2DefaultInterpreter.class)
+ .addBehaviour(PiPipelineInterpreter.class, DefaultP4Interpreter.class)
.addBehaviour(Pipeliner.class, DefaultSingleTablePipeline.class)
.addBehaviour(PortStatisticsDiscovery.class, Bmv2DefaultPortStatisticsDiscovery.class)
.addExtension(P4_INFO_TEXT, p4InfoUrl)
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java
index 4f16cff..24ababc 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java
@@ -42,6 +42,7 @@
import org.onosproject.net.flow.criteria.VlanIdCriterion;
import org.onosproject.net.flow.instructions.Instruction;
import org.onosproject.net.flow.instructions.Instructions;
+import org.onosproject.net.flow.instructions.Instructions.GroupInstruction;
import org.onosproject.net.flow.instructions.L2ModificationInstruction;
import org.onosproject.net.flowobjective.DefaultNextObjective;
import org.onosproject.net.flowobjective.FlowObjectiveStore;
@@ -1497,7 +1498,6 @@
List<Deque<GroupKey>> allActiveKeys = appKryo.deserialize(next.data());
List<TrafficTreatment> bucketsToCreate = Lists.newArrayList();
List<Integer> indicesToRemove = Lists.newArrayList();
- // XXX verify empty group
for (TrafficTreatment bkt : nextObjective.next()) {
PortNumber portNumber = readOutPortFromTreatment(bkt);
int label = readLabelFromTreatment(bkt);
@@ -1544,6 +1544,100 @@
removeBucket(chainsToRemove, nextObjective);
}
+ if (bucketsToCreate.isEmpty() && indicesToRemove.isEmpty()) {
+ // flowObjective store record is in-sync with nextObjective passed-in
+ // Nevertheless groupStore may not be in sync due to bug in the store
+ // - see CORD-1844. XXX When this bug is fixed, the rest of this verify
+ // method will not be required.
+ GroupKey hashGroupKey = allActiveKeys.get(0).peekFirst();
+ Group hashGroup = groupService.getGroup(deviceId, hashGroupKey);
+ int actualGroupSize = hashGroup.buckets().buckets().size();
+ int objGroupSize = nextObjective.next().size();
+ if (actualGroupSize != objGroupSize) {
+ log.warn("Mismatch detected in device:{}, nextId:{}, nextObjective-size"
+ + ":{} group-size:{} .. correcting", deviceId, nextObjective.id(),
+ objGroupSize, actualGroupSize);
+ }
+ if (actualGroupSize > objGroupSize) {
+ List<GroupBucket> bucketsToRemove = Lists.newArrayList();
+ //check every bucket in the actual group
+ for (GroupBucket bucket : hashGroup.buckets().buckets()) {
+ GroupInstruction g = (GroupInstruction) bucket.treatment()
+ .allInstructions().iterator().next();
+ GroupId gidToCheck = g.groupId(); // the group pointed to
+ boolean matches = false;
+ for (Deque<GroupKey> validChain : allActiveKeys) {
+ if (validChain.size() < 2) {
+ continue;
+ }
+ GroupKey pointedGroupKey = validChain.stream()
+ .collect(Collectors.toList()).get(1);
+ Group pointedGroup = groupService.getGroup(deviceId, pointedGroupKey);
+ if (pointedGroup != null && gidToCheck.equals(pointedGroup.id())) {
+ matches = true;
+ break;
+ }
+ }
+ if (!matches) {
+ log.warn("Removing bucket pointing to groupId:{}", gidToCheck);
+ bucketsToRemove.add(bucket);
+ }
+ }
+ // remove buckets for which there was no record in the obj store
+ if (bucketsToRemove.isEmpty()) {
+ log.warn("Mismatch detected but could not determine which"
+ + "buckets to remove");
+ } else {
+ GroupBuckets removeBuckets = new GroupBuckets(bucketsToRemove);
+ groupService.removeBucketsFromGroup(deviceId, hashGroupKey,
+ removeBuckets, hashGroupKey,
+ nextObjective.appId());
+ }
+ } else if (actualGroupSize < objGroupSize) {
+ // should also add buckets not in group-store but in obj-store
+ List<GroupBucket> bucketsToAdd = Lists.newArrayList();
+ //check every bucket in the obj
+ for (Deque<GroupKey> validChain : allActiveKeys) {
+ if (validChain.size() < 2) {
+ continue;
+ }
+ GroupKey pointedGroupKey = validChain.stream()
+ .collect(Collectors.toList()).get(1);
+ Group pointedGroup = groupService.getGroup(deviceId, pointedGroupKey);
+ if (pointedGroup == null) {
+ // group should exist, otherwise cannot be added as bucket
+ continue;
+ }
+ boolean matches = false;
+ for (GroupBucket bucket : hashGroup.buckets().buckets()) {
+ GroupInstruction g = (GroupInstruction) bucket.treatment()
+ .allInstructions().iterator().next();
+ GroupId gidToCheck = g.groupId(); // the group pointed to
+ if (pointedGroup.id().equals(gidToCheck)) {
+ matches = true;
+ break;
+ }
+ }
+ if (!matches) {
+ log.warn("Adding bucket pointing to groupId:{}", pointedGroup);
+ TrafficTreatment t = DefaultTrafficTreatment.builder()
+ .group(pointedGroup.id())
+ .build();
+ bucketsToAdd.add(DefaultGroupBucket.createSelectGroupBucket(t));
+ }
+ }
+ if (bucketsToAdd.isEmpty()) {
+ log.warn("Mismatch detected but could not determine which "
+ + "buckets to add");
+ } else {
+ GroupBuckets addBuckets = new GroupBuckets(bucketsToAdd);
+ groupService.addBucketsToGroup(deviceId, hashGroupKey,
+ addBuckets, hashGroupKey,
+ nextObjective.appId());
+ }
+ }
+ }
+
pass(nextObjective);
}
@@ -1733,7 +1827,6 @@
private class InnerGroupListener implements GroupListener {
@Override
public void event(GroupEvent event) {
- log.trace("received group event of type {}", event.type());
switch (event.type()) {
case GROUP_ADDED:
processPendingAddGroupsOrNextObjs(event.subject().appCookie(), true);
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/DefaultP4Interpreter.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/DefaultP4Interpreter.java
new file mode 100644
index 0000000..85167eb
--- /dev/null
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/DefaultP4Interpreter.java
@@ -0,0 +1,298 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.drivers.p4runtime;
+
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableList;
+import org.onlab.packet.Ethernet;
+import org.onlab.util.ImmutableByteSequence;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.driver.AbstractHandlerBehaviour;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.criteria.Criterion;
+import org.onosproject.net.flow.instructions.Instruction;
+import org.onosproject.net.flow.instructions.Instructions;
+import org.onosproject.net.packet.DefaultInboundPacket;
+import org.onosproject.net.packet.InboundPacket;
+import org.onosproject.net.packet.OutboundPacket;
+import org.onosproject.net.pi.model.PiHeaderFieldModel;
+import org.onosproject.net.pi.model.PiPipeconf;
+import org.onosproject.net.pi.model.PiPipeconfId;
+import org.onosproject.net.pi.model.PiPipelineInterpreter;
+import org.onosproject.net.pi.model.PiPipelineModel;
+import org.onosproject.net.pi.model.PiTableModel;
+import org.onosproject.net.pi.runtime.PiAction;
+import org.onosproject.net.pi.runtime.PiActionId;
+import org.onosproject.net.pi.runtime.PiActionParam;
+import org.onosproject.net.pi.runtime.PiActionParamId;
+import org.onosproject.net.pi.runtime.PiHeaderFieldId;
+import org.onosproject.net.pi.runtime.PiPacketMetadata;
+import org.onosproject.net.pi.runtime.PiPacketMetadataId;
+import org.onosproject.net.pi.runtime.PiPacketOperation;
+import org.onosproject.net.pi.runtime.PiPipeconfService;
+import org.onosproject.net.pi.runtime.PiTableId;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+
+import static java.lang.String.format;
+import static java.util.stream.Collectors.toList;
+import static org.onlab.util.ImmutableByteSequence.copyFrom;
+import static org.onlab.util.ImmutableByteSequence.fit;
+import static org.onosproject.net.PortNumber.CONTROLLER;
+import static org.onosproject.net.PortNumber.FLOOD;
+import static org.onosproject.net.flow.instructions.Instruction.Type.OUTPUT;
+import static org.onosproject.net.pi.runtime.PiPacketOperation.Type.PACKET_OUT;
+
+/**
+ * Implementation of an interpreter that can be used for any P4 program based on default.p4 (i.e. those under
+ * onos/tools/test/p4src).
+ */
+public class DefaultP4Interpreter extends AbstractHandlerBehaviour implements PiPipelineInterpreter {
+
+ // FIXME: Should move this class out of the p4runtime drivers.
+ // e.g. in a dedicated onos/pipeconf directory, along with any related P4 source code.
+
+ public static final String TABLE0 = "table0";
+ public static final String SEND_TO_CPU = "send_to_cpu";
+ public static final String PORT = "port";
+ public static final String DROP = "drop";
+ public static final String SET_EGRESS_PORT = "set_egress_port";
+ public static final String EGRESS_PORT = "egress_port";
+ public static final String INGRESS_PORT = "ingress_port";
+
+ protected static final PiHeaderFieldId ETH_DST_ID = PiHeaderFieldId.of("ethernet", "dstAddr");
+ protected static final PiHeaderFieldId ETH_SRC_ID = PiHeaderFieldId.of("ethernet", "srcAddr");
+ protected static final PiHeaderFieldId ETH_TYPE_ID = PiHeaderFieldId.of("ethernet", "etherType");
+
+ private static final ImmutableBiMap<Integer, PiTableId> TABLE_MAP = ImmutableBiMap.of(
+ 0, PiTableId.of(TABLE0));
+
+ private boolean targetAttributesInitialized = false;
+
+ /*
+ The following attributes are target-specific, i.e. they might change from one target to another.
+ */
+ private ImmutableBiMap<Criterion.Type, PiHeaderFieldId> criterionMap;
+ private int portFieldBitWidth;
+
+ /**
+ * Populates target-specific attributes based on this device's pipeline model.
+ */
+ private synchronized void initTargetSpecificAttributes() {
+ if (targetAttributesInitialized) {
+ return;
+ }
+
+ DeviceId deviceId = this.handler().data().deviceId();
+ PiPipeconfService pipeconfService = this.handler().get(PiPipeconfService.class);
+ PiPipeconfId pipeconfId = pipeconfService.ofDevice(deviceId)
+ .orElseThrow(() -> new RuntimeException(format(
+ "Unable to get current pipeconf for device %s", this.data().deviceId())));
+ PiPipeconf pipeconf = pipeconfService.getPipeconf(pipeconfId)
+ .orElseThrow(() -> new RuntimeException(format(
+ "Pipeconf %s is not registered", pipeconfId)));
+ PiPipelineModel model = pipeconf.pipelineModel();
+
+ this.portFieldBitWidth = extractPortFieldBitWidth(model);
+ this.criterionMap = new ImmutableBiMap.Builder<Criterion.Type, PiHeaderFieldId>()
+ .put(Criterion.Type.IN_PORT, extractInPortFieldId(model))
+ .put(Criterion.Type.ETH_DST, ETH_DST_ID)
+ .put(Criterion.Type.ETH_SRC, ETH_SRC_ID)
+ .put(Criterion.Type.ETH_TYPE, ETH_TYPE_ID)
+ .build();
+
+ this.targetAttributesInitialized = true;
+ }
+
+ private static PiHeaderFieldId extractInPortFieldId(PiPipelineModel model) {
+ /*
+ For the targets we currently support, the field name is "ingress_port", but we miss the header name, which is
+ target-specific. We know table0 defines that field as a match key, we look for it and we get the header name.
+ */
+ PiTableModel tableModel = model.table(TABLE0).orElseThrow(() -> new RuntimeException(format(
+ "No such table '%s' in pipeline model", TABLE0)));
+ PiHeaderFieldModel fieldModel = tableModel.matchFields().stream()
+ .filter(m -> m.field().type().name().equals(INGRESS_PORT))
+ .findFirst()
+ .orElseThrow(() -> new RuntimeException(format(
+ "No such match field in table '%s' with name '%s'", TABLE0, INGRESS_PORT)))
+ .field();
+ return PiHeaderFieldId.of(fieldModel.header().name(), INGRESS_PORT);
+ }
+
+ private static int extractPortFieldBitWidth(PiPipelineModel model) {
+ /*
+ Get it form the set_egress_port action parameters.
+ */
+ return model
+ .action(SET_EGRESS_PORT).orElseThrow(() -> new RuntimeException(format(
+ "No such action '%s' in pipeline model", SET_EGRESS_PORT)))
+ .param(PORT).orElseThrow(() -> new RuntimeException(format(
+ "No such parameter '%s' of action '%s' in pipeline model", PORT, SET_EGRESS_PORT)))
+ .bitWidth();
+ }
+
+
+ @Override
+ public PiAction mapTreatment(TrafficTreatment treatment, PiTableId piTableId) throws PiInterpreterException {
+
+ if (treatment.allInstructions().size() == 0) {
+ // No instructions means drop for us.
+ return actionWithName(DROP);
+ } else if (treatment.allInstructions().size() > 1) {
+ // Otherwise, we understand treatments with only 1 instruction.
+ throw new PiPipelineInterpreter.PiInterpreterException("Treatment has multiple instructions");
+ }
+
+ Instruction instruction = treatment.allInstructions().get(0);
+
+ switch (instruction.type()) {
+ case OUTPUT:
+ Instructions.OutputInstruction outInstruction = (Instructions.OutputInstruction) instruction;
+ PortNumber port = outInstruction.port();
+ if (!port.isLogical()) {
+ return PiAction.builder()
+ .withId(PiActionId.of(SET_EGRESS_PORT))
+ .withParameter(new PiActionParam(PiActionParamId.of(PORT), copyFrom(port.toLong())))
+ .build();
+ } else if (port.equals(CONTROLLER)) {
+ return actionWithName(SEND_TO_CPU);
+ } else {
+ throw new PiInterpreterException(format("Egress on logical port '%s' not supported", port));
+ }
+ case NOACTION:
+ return actionWithName(DROP);
+ default:
+ throw new PiInterpreterException(format("Instruction type '%s' not supported", instruction.type()));
+ }
+ }
+
+ @Override
+ public Collection<PiPacketOperation> mapOutboundPacket(OutboundPacket packet)
+ throws PiInterpreterException {
+ TrafficTreatment treatment = packet.treatment();
+
+ // default.p4 supports only OUTPUT instructions.
+ List<Instructions.OutputInstruction> outInstructions = treatment.allInstructions()
+ .stream()
+ .filter(i -> i.type().equals(OUTPUT))
+ .map(i -> (Instructions.OutputInstruction) i)
+ .collect(toList());
+
+ if (treatment.allInstructions().size() != outInstructions.size()) {
+ // There are other instructions that are not of type OUTPUT.
+ throw new PiInterpreterException("Treatment not supported: " + treatment);
+ }
+
+ ImmutableList.Builder<PiPacketOperation> builder = ImmutableList.builder();
+ for (Instructions.OutputInstruction outInst : outInstructions) {
+ if (outInst.port().isLogical() && !outInst.port().equals(FLOOD)) {
+ throw new PiInterpreterException(format("Output on logical port '%s' not supported", outInst.port()));
+ } else if (outInst.port().equals(FLOOD)) {
+ // Since default.p4 does not support flooding, we create a packet operation for each switch port.
+ for (Port port : handler().get(DeviceService.class).getPorts(packet.sendThrough())) {
+ builder.add(createPiPacketOperation(packet.data(), port.number().toLong()));
+ }
+ } else {
+ builder.add(createPiPacketOperation(packet.data(), outInst.port().toLong()));
+ }
+ }
+ return builder.build();
+ }
+
+ @Override
+ public InboundPacket mapInboundPacket(DeviceId deviceId, PiPacketOperation packetIn)
+ throws PiInterpreterException {
+ // Assuming that the packet is ethernet, which is fine since default.p4 can deparse only ethernet packets.
+ Ethernet ethPkt = new Ethernet();
+
+ ethPkt.deserialize(packetIn.data().asArray(), 0, packetIn.data().size());
+
+ // Returns the ingress port packet metadata.
+ Optional<PiPacketMetadata> packetMetadata = packetIn.metadatas()
+ .stream().filter(metadata -> metadata.id().name().equals(INGRESS_PORT))
+ .findFirst();
+
+ if (packetMetadata.isPresent()) {
+ ImmutableByteSequence portByteSequence = packetMetadata.get().value();
+ short s = portByteSequence.asReadOnlyBuffer().getShort();
+ ConnectPoint receivedFrom = new ConnectPoint(deviceId, PortNumber.portNumber(s));
+ ByteBuffer rawData = ByteBuffer.wrap(packetIn.data().asArray());
+ return new DefaultInboundPacket(receivedFrom, ethPkt, rawData);
+ } else {
+ throw new PiInterpreterException(format(
+ "Missing metadata '%s' in packet-in received from '%s': %s", INGRESS_PORT, deviceId, packetIn));
+ }
+ }
+
+ private PiPacketOperation createPiPacketOperation(ByteBuffer data, long portNumber) throws PiInterpreterException {
+ PiPacketMetadata metadata = createPacketMetadata(portNumber);
+ return PiPacketOperation.builder()
+ .withType(PACKET_OUT)
+ .withData(copyFrom(data))
+ .withMetadatas(ImmutableList.of(metadata))
+ .build();
+ }
+
+ private PiPacketMetadata createPacketMetadata(long portNumber) throws PiInterpreterException {
+ initTargetSpecificAttributes();
+ try {
+ return PiPacketMetadata.builder()
+ .withId(PiPacketMetadataId.of(EGRESS_PORT))
+ .withValue(fit(copyFrom(portNumber), portFieldBitWidth))
+ .build();
+ } catch (ImmutableByteSequence.ByteSequenceTrimException e) {
+ throw new PiInterpreterException(format("Port number %d too big, %s", portNumber, e.getMessage()));
+ }
+ }
+
+ /**
+ * Returns an action instance with no runtime parameters.
+ */
+ private PiAction actionWithName(String name) {
+ return PiAction.builder().withId(PiActionId.of(name)).build();
+ }
+
+ @Override
+ public Optional<PiHeaderFieldId> mapCriterionType(Criterion.Type type) {
+ initTargetSpecificAttributes();
+ return Optional.ofNullable(criterionMap.get(type));
+ }
+
+ @Override
+ public Optional<Criterion.Type> mapPiHeaderFieldId(PiHeaderFieldId headerFieldId) {
+ initTargetSpecificAttributes();
+ return Optional.ofNullable(criterionMap.inverse().get(headerFieldId));
+ }
+
+ @Override
+ public Optional<PiTableId> mapFlowRuleTableId(int flowRuleTableId) {
+ return Optional.ofNullable(TABLE_MAP.get(flowRuleTableId));
+ }
+
+ @Override
+ public Optional<Integer> mapPiTableId(PiTableId piTableId) {
+ return Optional.ofNullable(TABLE_MAP.inverse().get(piTableId));
+ }
+}
diff --git a/features/features.xml b/features/features.xml
index bd36ee4..c95b3a9 100644
--- a/features/features.xml
+++ b/features/features.xml
@@ -59,7 +59,7 @@
<bundle>mvn:com.typesafe/config/1.2.1</bundle>
<bundle>mvn:com.googlecode.concurrent-trees/concurrent-trees/2.6.0</bundle>
<bundle>mvn:commons-io/commons-io/2.4</bundle>
- <bundle>mvn:io.atomix/atomix/2.0.0-raft-final</bundle>
+ <bundle>mvn:io.atomix/atomix/2.0.0</bundle>
<bundle>mvn:org.glassfish.jersey.core/jersey-client/2.25.1</bundle>
diff --git a/lib/BUCK b/lib/BUCK
index ad18962..87add7d 100644
--- a/lib/BUCK
+++ b/lib/BUCK
@@ -1,4 +1,4 @@
-# ***** This file was auto-generated at Wed, 30 Aug 2017 00:20:31 GMT. Do not edit this file manually. *****
+# ***** This file was auto-generated at Thu, 31 Aug 2017 21:26:06 GMT. Do not edit this file manually. *****
# ***** Use onos-lib-gen *****
pass_thru_pom(
@@ -208,10 +208,10 @@
remote_jar (
name = 'atomix',
- out = 'atomix-2.0.0-raft-final.jar',
- url = 'mvn:io.atomix:atomix:jar:2.0.0-raft-final',
- sha1 = '75ded9852e3d45ca4cbb3976a9ce39062e13fc0a',
- maven_coords = 'io.atomix:atomix:2.0.0-raft-final',
+ out = 'atomix-2.0.0.jar',
+ url = 'mvn:io.atomix:atomix:jar:2.0.0',
+ sha1 = '44b1271a4a77d9831b000f2eedf52587969ae9fb',
+ maven_coords = 'io.atomix:atomix:2.0.0',
visibility = [ 'PUBLIC' ],
)
diff --git a/lib/deps.json b/lib/deps.json
index 82992bd..35282c4 100644
--- a/lib/deps.json
+++ b/lib/deps.json
@@ -118,7 +118,7 @@
"aopalliance-repackaged": "mvn:org.glassfish.hk2.external:aopalliance-repackaged:2.5.0-b32",
"amqp-client": "mvn:com.rabbitmq:amqp-client:jar:3.6.1",
"asm": "mvn:org.ow2.asm:asm:5.0.4",
- "atomix": "mvn:io.atomix:atomix:2.0.0-raft-final",
+ "atomix": "mvn:io.atomix:atomix:2.0.0",
"commons-codec": "mvn:commons-codec:commons-codec:1.10",
"commons-collections": "mvn:commons-collections:commons-collections:3.2.2",
"commons-configuration": "mvn:commons-configuration:commons-configuration:1.10",
@@ -237,9 +237,11 @@
"openstack4j-core": "mvn:org.pacesys:openstack4j-core:2.11",
"openstack4j-http-connector": "mvn:org.pacesys.openstack4j.connectors:openstack4j-http-connector:2.11",
"openstack4j-httpclient": "mvn:org.pacesys.openstack4j.connectors:openstack4j-httpclient:2.11",
+ // old version of YANG tools for YMS
"onos-yang-datamodel": "mvn:org.onosproject:onos-yang-datamodel:1.11",
"onos-yang-maven-plugin": "mvn:org.onosproject:onos-yang-maven-plugin:1.11",
"onos-yang-utils-generator": "mvn:org.onosproject:onos-yang-utils-generator:1.11",
+ // Note: update BVER in tools/dev/bin/patch-yang-libs
"onos-yang-model":"mvn:org.onosproject:onos-yang-model:2.2.0-b6",
"onos-yang-compiler-api":"mvn:org.onosproject:onos-yang-compiler-api:2.2.0-b6",
"onos-yang-runtime":"mvn:org.onosproject:onos-yang-runtime:2.2.0-b6",
diff --git a/tools/gui/package.json b/tools/gui/package.json
index 1b3f4d7..1e9e030 100644
--- a/tools/gui/package.json
+++ b/tools/gui/package.json
@@ -6,7 +6,8 @@
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"build": "./node_modules/gulp/bin/gulp.js build",
- "dev": "./node_modules/gulp/bin/gulp.js --development"
+ "dev": "./node_modules/gulp/bin/gulp.js --development",
+ "postinstall": "ppid=$(ps -p ${1:-$$} -o ppid=;); ppid=$(echo ${ppid}|tr -d '[[:space:]]'); if [ -z ${npm_config_tmp} ]; then npm_config_tmp=/tmp; fi; rm -rf \"${npm_config_tmp}\"/npm-${ppid}*"
},
"author": "",
"license": "ISC",
diff --git a/utils/misc/src/main/java/org/onlab/packet/dhcp/CircuitId.java b/utils/misc/src/main/java/org/onlab/packet/dhcp/CircuitId.java
index 3b819e1..1b53a5b 100644
--- a/utils/misc/src/main/java/org/onlab/packet/dhcp/CircuitId.java
+++ b/utils/misc/src/main/java/org/onlab/packet/dhcp/CircuitId.java
@@ -17,6 +17,7 @@
package org.onlab.packet.dhcp;
import com.google.common.collect.Lists;
+import com.google.common.primitives.UnsignedLongs;
import org.onlab.packet.VlanId;
import java.nio.charset.StandardCharsets;
@@ -31,6 +32,7 @@
public class CircuitId {
private static final String SEPARATOR = ":";
private static final String CIRCUIT_ID_FORMAT = "%s" + SEPARATOR + "%s";
+ private static final String DEVICE_PORT_SEPARATOR = "/";
private String connectPoint;
private VlanId vlanId;
@@ -65,11 +67,22 @@
*/
public static CircuitId deserialize(byte[] circuitId) {
String cIdString = new String(circuitId, StandardCharsets.US_ASCII);
- List<String> split = Lists.newArrayList(cIdString.split(SEPARATOR));
- checkArgument(split.size() > 1, "Illegal circuit id.");
+ List<String> splittedCircuitId = Lists.newArrayList(cIdString.split(SEPARATOR));
+ checkArgument(splittedCircuitId.size() > 1, "Illegal circuit id.");
// remove last element (vlan id)
- String vlanId = split.remove(split.size() - 1);
- String connectPoint = String.join(SEPARATOR, split);
+ String vlanId = splittedCircuitId.remove(splittedCircuitId.size() - 1);
+
+ // Reconstruct device Id
+ String connectPoint = String.join(SEPARATOR, splittedCircuitId);
+
+ String[] splittedConnectPoint = connectPoint.split(DEVICE_PORT_SEPARATOR);
+ // Check connect point is valid or not
+ checkArgument(splittedConnectPoint.length == 2,
+ "Connect point must be in \"deviceUri/portNumber\" format");
+
+ // Check the port number is a number or not
+ UnsignedLongs.decode(splittedConnectPoint[1]);
+
return new CircuitId(connectPoint, VlanId.vlanId(vlanId));
}