Merge "Merge branch 'master' into merge" into dev-karaf-4.2.1
diff --git a/README.md b/README.md
index 65f63d2..287a375 100644
--- a/README.md
+++ b/README.md
@@ -66,7 +66,7 @@
3. Build ONOS with Buck
```bash
$ cd $ONOS_ROOT
-$ onos-buck build onos [--show-output]
+$ bazel build onos
```
ONOS currently uses a modified version of Buck (`onos-buck`), which has been packaged with ONOS. Please use this version until our changes have been upstreamed and released as part of an official Buck release.
@@ -79,10 +79,10 @@
To run ONOS locally on the development machine, simply run the following command:
```bash
-$ onos-buck run onos-local [-- [clean] [debug]]
+$ bazel run onos-local [-- [clean] [debug]]
```
-or simplier one:
+or simpler one:
```bash
$ ok [clean] [debug]
@@ -107,16 +107,16 @@
### Unit Tests
-To run ONOS unit tests, including code Checkstyle validation, run the following command:
+To run ONOS unit tests, run the following command:
```bash
-$ onos-buck test
+$ bazel query 'tests(//...)' | xargs bazel test
```
-Or more specific tests:
+Or better yet, to run code Checkstyle and all unit tests use the following convenience alias:
```bash
-$ onos-buck test [buck-test-rule]
+$ ot
```
## Contributing
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/DistributedSecurityGroupStore.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/DistributedSecurityGroupStore.java
index ec12967..f4fbf52 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/DistributedSecurityGroupStore.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/DistributedSecurityGroupStore.java
@@ -171,7 +171,7 @@
event.newValue().value()));
break;
case REMOVE:
- log.debug("OpenStack security group removed {}", event.newValue());
+ log.debug("OpenStack security group removed {}", event.oldValue());
eventExecutor.execute(() ->
notifyDelegate(new OpenstackSecurityGroupEvent(
OPENSTACK_SECURITY_GROUP_REMOVED,
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingArpHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingArpHandler.java
index a6f54bd..8c45d4c 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingArpHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingArpHandler.java
@@ -45,15 +45,18 @@
import org.onosproject.net.packet.PacketService;
import org.onosproject.openstacknetworking.api.Constants;
import org.onosproject.openstacknetworking.api.InstancePort;
+import org.onosproject.openstacknetworking.api.InstancePortAdminService;
import org.onosproject.openstacknetworking.api.InstancePortEvent;
import org.onosproject.openstacknetworking.api.InstancePortListener;
-import org.onosproject.openstacknetworking.api.InstancePortService;
import org.onosproject.openstacknetworking.api.OpenstackFlowRuleService;
import org.onosproject.openstacknetworking.api.OpenstackNetworkAdminService;
+import org.onosproject.openstacknetworking.api.OpenstackNetworkEvent;
+import org.onosproject.openstacknetworking.api.OpenstackNetworkListener;
import org.onosproject.openstacknetworking.api.OpenstackNetworkService;
import org.onosproject.openstacknetworking.api.OpenstackRouterEvent;
import org.onosproject.openstacknetworking.api.OpenstackRouterListener;
import org.onosproject.openstacknetworking.api.OpenstackRouterService;
+import org.onosproject.openstacknetworking.api.PreCommitPortService;
import org.onosproject.openstacknode.api.OpenstackNode;
import org.onosproject.openstacknode.api.OpenstackNodeEvent;
import org.onosproject.openstacknode.api.OpenstackNodeListener;
@@ -61,6 +64,7 @@
import org.openstack4j.model.network.ExternalGateway;
import org.openstack4j.model.network.IP;
import org.openstack4j.model.network.NetFloatingIP;
+import org.openstack4j.model.network.Port;
import org.openstack4j.model.network.Router;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
@@ -87,6 +91,7 @@
import static org.onosproject.openstacknetworking.api.Constants.OPENSTACK_NETWORKING_APP_ID;
import static org.onosproject.openstacknetworking.api.Constants.PRIORITY_ARP_CONTROL_RULE;
import static org.onosproject.openstacknetworking.api.Constants.PRIORITY_ARP_GATEWAY_RULE;
+import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_PORT_PRE_REMOVE;
import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.associatedFloatingIp;
import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.getGwByComputeDevId;
import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.getGwByInstancePort;
@@ -124,7 +129,7 @@
protected OpenstackNodeService osNodeService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected InstancePortService instancePortService;
+ protected InstancePortAdminService instancePortService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected ClusterService clusterService;
@@ -141,6 +146,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected ComponentConfigService configService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected PreCommitPortService preCommitPortService;
+
//@Property(name = ARP_MODE, value = DEFAULT_ARP_MODE_STR,
// label = "ARP processing mode, broadcast | proxy (default)")
protected String arpMode = DEFAULT_ARP_MODE_STR;
@@ -150,6 +158,7 @@
private final OpenstackRouterListener osRouterListener = new InternalRouterEventListener();
private final OpenstackNodeListener osNodeListener = new InternalNodeEventListener();
private final InstancePortListener instPortListener = new InternalInstancePortListener();
+ private final OpenstackNetworkListener osNetworkListener = new InternalNetworkEventListener();
private ApplicationId appId;
private NodeId localNodeId;
@@ -166,6 +175,7 @@
localNodeId = clusterService.getLocalNode().id();
osRouterService.addListener(osRouterListener);
osNodeService.addListener(osNodeListener);
+ osNetworkService.addListener(osNetworkListener);
instancePortService.addListener(instPortListener);
leadershipService.runForLeadership(appId.name());
packetService.addProcessor(packetProcessor, PacketProcessor.director(1));
@@ -178,6 +188,7 @@
instancePortService.removeListener(instPortListener);
osRouterService.removeListener(osRouterListener);
osNodeService.removeListener(osNodeListener);
+ osNetworkService.removeListener(osNetworkListener);
instancePortService.removeListener(instPortListener);
leadershipService.withdraw(appId.name());
eventExecutor.shutdown();
@@ -433,9 +444,9 @@
return;
}
- if (portId == null || fip.getPortId() == null) {
+ if (portId == null || (install && fip.getPortId() == null)) {
log.trace("Unknown target ARP request for {}, ignore it",
- fip.getFloatingIpAddress());
+ fip.getFloatingIpAddress());
return;
}
@@ -448,6 +459,16 @@
return;
}
+ if (install) {
+ preCommitPortService.subscribePreCommit(instPort.portId(),
+ OPENSTACK_PORT_PRE_REMOVE, this.getClass().getName());
+ log.info("Subscribed the port {} on listening pre-remove event", instPort.portId());
+ } else {
+ preCommitPortService.unsubscribePreCommit(instPort.portId(),
+ OPENSTACK_PORT_PRE_REMOVE, instancePortService, this.getClass().getName());
+ log.info("Unsubscribed the port {} on listening pre-remove event", instPort.portId());
+ }
+
setArpRule(fip, targetMac, gw, install);
}
}
@@ -486,8 +507,8 @@
}
}
- private void setFakeGatewayArpRule(Router router, boolean install) {
- setFakeGatewayArpRule(router.getExternalGatewayInfo(), install);
+ private void setFakeGatewayArpRuleByRouter(Router router, boolean install) {
+ setFakeGatewayArpRuleByGateway(router.getExternalGatewayInfo(), install);
}
private Set<IP> getExternalGatewaySnatIps(ExternalGateway extGw) {
@@ -500,47 +521,89 @@
.collect(Collectors.toSet());
}
- private void setFakeGatewayArpRule(ExternalGateway extGw, boolean install) {
+ private void setFakeGatewayArpRuleByGateway(ExternalGateway extGw, boolean install) {
if (ARP_BROADCAST_MODE.equals(getArpMode())) {
if (extGw == null) {
return;
}
- Set<IP> ips = getExternalGatewaySnatIps(extGw);
+ setFakeGatewayArpRuleByIps(getExternalGatewaySnatIps(extGw), install);
+ }
+ }
- ips.forEach(ip -> {
- TrafficSelector selector = DefaultTrafficSelector.builder()
- .matchEthType(EthType.EtherType.ARP.ethType().toShort())
- .matchArpOp(ARP.OP_REQUEST)
- .matchArpTpa(Ip4Address.valueOf(ip.getIpAddress()))
- .build();
+ private void setFakeGatewayArpRuleByIps(Set<IP> ips, boolean install) {
+ ips.forEach(ip -> {
+ TrafficSelector selector = DefaultTrafficSelector.builder()
+ .matchEthType(EthType.EtherType.ARP.ethType().toShort())
+ .matchArpOp(ARP.OP_REQUEST)
+ .matchArpTpa(Ip4Address.valueOf(ip.getIpAddress()))
+ .build();
- TrafficTreatment treatment = DefaultTrafficTreatment.builder()
- .setArpOp(ARP.OP_REPLY)
- .setArpSha(MacAddress.valueOf(gatewayMac))
- .setArpSpa(Ip4Address.valueOf(ip.getIpAddress()))
- .setOutput(PortNumber.IN_PORT)
- .build();
+ TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+ .setArpOp(ARP.OP_REPLY)
+ .setArpSha(MacAddress.valueOf(gatewayMac))
+ .setArpSpa(Ip4Address.valueOf(ip.getIpAddress()))
+ .setOutput(PortNumber.IN_PORT)
+ .build();
- osNodeService.completeNodes(GATEWAY).forEach(n ->
- osFlowRuleService.setRule(
- appId,
- n.intgBridge(),
- selector,
- treatment,
- PRIORITY_ARP_GATEWAY_RULE,
- GW_COMMON_TABLE,
- install
- )
- );
+ osNodeService.completeNodes(GATEWAY).forEach(n ->
+ osFlowRuleService.setRule(
+ appId,
+ n.intgBridge(),
+ selector,
+ treatment,
+ PRIORITY_ARP_GATEWAY_RULE,
+ GW_COMMON_TABLE,
+ install
+ )
+ );
- if (install) {
- log.info("Install ARP Rule for Gateway Snat {}", ip.getIpAddress());
- } else {
- log.info("Uninstall ARP Rule for Gateway Snat {}", ip.getIpAddress());
- }
- });
+ if (install) {
+ log.info("Install ARP Rule for Gateway Snat {}", ip.getIpAddress());
+ } else {
+ log.info("Uninstall ARP Rule for Gateway Snat {}", ip.getIpAddress());
+ }
+ });
+ }
+
+    /**
+     * An internal network event listener that keeps the fake gateway ARP
+     * rules in sync with router gateway ports: the rules are installed when
+     * such a port is created or updated, and uninstalled when it is removed.
+     */
+    private class InternalNetworkEventListener implements OpenstackNetworkListener {
+
+        /**
+         * Accepts only events that carry a router gateway port with fixed IPs,
+         * while ARP broadcast mode is active and this node holds leadership.
+         *
+         * @param event openstack network event
+         * @return true if the event has to be handled by this listener
+         */
+        @Override
+        public boolean isRelevant(OpenstackNetworkEvent event) {
+            Port osPort = event.port();
+            if (osPort == null || osPort.getFixedIps() == null) {
+                return false;
+            }
+
+            // fake gateway ARP rules are only meaningful in broadcast mode;
+            // setFakeGatewayArpRuleByGateway applies the same guard, whereas
+            // setFakeGatewayArpRuleByIps (called below) does not check it
+            if (!ARP_BROADCAST_MODE.equals(getArpMode())) {
+                return false;
+            }
+
+            // do not allow to proceed without leadership
+            NodeId leader = leadershipService.getLeader(appId.name());
+            return Objects.equals(localNodeId, leader) &&
+                    DEVICE_OWNER_ROUTER_GW.equals(osPort.getDeviceOwner());
+        }
+
+        /**
+         * Installs or removes the fake gateway ARP rules for the fixed IPs of
+         * the port carried by the event; the work runs on the event executor.
+         *
+         * @param event openstack network event
+         */
+        @Override
+        public void event(OpenstackNetworkEvent event) {
+            switch (event.type()) {
+                case OPENSTACK_PORT_CREATED:
+                case OPENSTACK_PORT_UPDATED:
+                    eventExecutor.execute(() ->
+                        setFakeGatewayArpRuleByIps((Set<IP>) event.port().getFixedIps(), true)
+                    );
+                    break;
+                case OPENSTACK_PORT_REMOVED:
+                    eventExecutor.execute(() ->
+                        setFakeGatewayArpRuleByIps((Set<IP>) event.port().getFixedIps(), false)
+                    );
+                    break;
+                default:
+                    // do nothing
+                    break;
+            }
}
}
@@ -566,25 +629,25 @@
case OPENSTACK_ROUTER_CREATED:
eventExecutor.execute(() ->
// add a router with external gateway
- setFakeGatewayArpRule(event.subject(), true)
+ setFakeGatewayArpRuleByRouter(event.subject(), true)
);
break;
case OPENSTACK_ROUTER_REMOVED:
eventExecutor.execute(() ->
// remove a router with external gateway
- setFakeGatewayArpRule(event.subject(), false)
+ setFakeGatewayArpRuleByRouter(event.subject(), false)
);
break;
case OPENSTACK_ROUTER_GATEWAY_ADDED:
eventExecutor.execute(() ->
// add a gateway manually after adding a router
- setFakeGatewayArpRule(event.externalGateway(), true)
+ setFakeGatewayArpRuleByGateway(event.externalGateway(), true)
);
break;
case OPENSTACK_ROUTER_GATEWAY_REMOVED:
eventExecutor.execute(() ->
// remove a gateway from an existing router
- setFakeGatewayArpRule(event.externalGateway(), false)
+ setFakeGatewayArpRuleByGateway(event.externalGateway(), false)
);
break;
case OPENSTACK_FLOATING_IP_ASSOCIATED:
@@ -807,7 +870,7 @@
osRouterService.routers().stream()
.filter(router -> router.getExternalGatewayInfo() != null)
- .forEach(router -> setFakeGatewayArpRule(router, install));
+ .forEach(router -> setFakeGatewayArpRuleByRouter(router, install));
}
}
}
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingFloatingIpHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingFloatingIpHandler.java
index 173b6bf..4423bd3 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingFloatingIpHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingFloatingIpHandler.java
@@ -27,6 +27,7 @@
import org.onlab.packet.IpAddress;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
+import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
@@ -85,6 +86,7 @@
import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_PORT_PRE_REMOVE;
import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.associatedFloatingIp;
import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.getGwByComputeDevId;
+import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.getPropertyValueAsBoolean;
import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.isAssociatedWithVM;
import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.swapStaleLocation;
import static org.onosproject.openstacknetworking.util.RulePopulatorUtil.buildExtension;
@@ -101,6 +103,8 @@
private static final String ERR_FLOW = "Failed set flows for floating IP %s: ";
private static final String ERR_UNSUPPORTED_NET_TYPE = "Unsupported network type %s";
+ private static final String USE_SECURITY_GROUP = "useSecurityGroup";
+
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected CoreService coreService;
@@ -114,6 +118,9 @@
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ComponentConfigService componentConfigService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected OpenstackNodeService osNodeService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -879,14 +886,22 @@
NetFloatingIP fip =
associatedFloatingIp(instPort, osRouterAdminService.floatingIps());
+ boolean sgFlag = getPropertyValueAsBoolean(
+ componentConfigService.getProperties(
+ OpenstackSecurityGroupHandler.class.getName()),
+ USE_SECURITY_GROUP);
+
if (fip != null) {
-
- instancePortService.updateInstancePort(instPort.updateState(REMOVE_PENDING));
-
+ instancePortService.updateInstancePort(
+ instPort.updateState(REMOVE_PENDING));
eventExecutor.execute(() ->
updateFipStore(instancePortService.instancePort(event.port().getId())));
} else {
- instancePortService.removeInstancePort(instPort.portId());
+ // FIXME: we have dependency with security group, need to
+ // find a better way to remove this dependency
+ if (!sgFlag) {
+ instancePortService.removeInstancePort(instPort.portId());
+ }
}
break;
default:
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSecurityGroupHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSecurityGroupHandler.java
index 114cc44..550fc37 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSecurityGroupHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSecurityGroupHandler.java
@@ -41,9 +41,9 @@
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.flow.criteria.ExtensionSelector;
import org.onosproject.openstacknetworking.api.InstancePort;
+import org.onosproject.openstacknetworking.api.InstancePortAdminService;
import org.onosproject.openstacknetworking.api.InstancePortEvent;
import org.onosproject.openstacknetworking.api.InstancePortListener;
-import org.onosproject.openstacknetworking.api.InstancePortService;
import org.onosproject.openstacknetworking.api.OpenstackFlowRuleService;
import org.onosproject.openstacknetworking.api.OpenstackNetworkEvent;
import org.onosproject.openstacknetworking.api.OpenstackNetworkListener;
@@ -51,6 +51,7 @@
import org.onosproject.openstacknetworking.api.OpenstackSecurityGroupEvent;
import org.onosproject.openstacknetworking.api.OpenstackSecurityGroupListener;
import org.onosproject.openstacknetworking.api.OpenstackSecurityGroupService;
+import org.onosproject.openstacknetworking.api.PreCommitPortService;
import org.onosproject.openstacknetworking.util.RulePopulatorUtil;
import org.onosproject.openstacknode.api.OpenstackNode;
import org.onosproject.openstacknode.api.OpenstackNodeEvent;
@@ -88,6 +89,8 @@
import static org.onosproject.openstacknetworking.api.Constants.PRIORITY_CT_DROP_RULE;
import static org.onosproject.openstacknetworking.api.Constants.PRIORITY_CT_HOOK_RULE;
import static org.onosproject.openstacknetworking.api.Constants.PRIORITY_CT_RULE;
+import static org.onosproject.openstacknetworking.api.InstancePort.State.REMOVE_PENDING;
+import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_PORT_PRE_REMOVE;
import static org.onosproject.openstacknetworking.util.RulePopulatorUtil.computeCtMaskFlag;
import static org.onosproject.openstacknetworking.util.RulePopulatorUtil.computeCtStateFlag;
import static org.onosproject.openstacknetworking.util.RulePopulatorUtil.niciraConnTrackTreatmentBuilder;
@@ -112,7 +115,7 @@
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected InstancePortService instancePortService;
+ protected InstancePortAdminService instancePortService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected MastershipService mastershipService;
@@ -141,6 +144,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected ClusterService clusterService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected PreCommitPortService preCommitPortService;
+
private final InstancePortListener instancePortListener =
new InternalInstancePortListener();
private final OpenstackNetworkListener osNetworkListener =
@@ -255,6 +261,16 @@
final String action = install ? "Installed " : "Removed ";
log.debug(action + "security group rule ID : " + sgId);
});
+
+ if (install) {
+ preCommitPortService.subscribePreCommit(instPort.portId(),
+ OPENSTACK_PORT_PRE_REMOVE, this.getClass().getName());
+ log.info("Subscribed the port {} on listening pre-remove event", instPort.portId());
+ } else {
+ preCommitPortService.unsubscribePreCommit(instPort.portId(),
+ OPENSTACK_PORT_PRE_REMOVE, instancePortService, this.getClass().getName());
+ log.info("Unsubscribed the port {} on listening pre-remove event", instPort.portId());
+ }
}
private void updateSecurityGroupRule(InstancePort instPort, Port port,
@@ -715,9 +731,11 @@
switch (event.type()) {
case OPENSTACK_PORT_PRE_REMOVE:
- eventExecutor.execute(() -> {
- setSecurityGroupRules(instPort, osPort, false);
- });
+ instancePortService.updateInstancePort(
+ instPort.updateState(REMOVE_PENDING));
+ eventExecutor.execute(() ->
+ setSecurityGroupRules(instPort, osPort, false)
+ );
break;
default:
// do nothing for the other events
@@ -803,8 +821,8 @@
securityGroupRuleToRemove.getId());
});
break;
- case OPENSTACK_SECURITY_GROUP_CREATED:
case OPENSTACK_SECURITY_GROUP_REMOVED:
+ case OPENSTACK_SECURITY_GROUP_CREATED:
default:
// do nothing
break;
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingArpHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingArpHandler.java
index daad9d1..008f484 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingArpHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingArpHandler.java
@@ -276,6 +276,21 @@
private void setFakeGatewayArpRule(Subnet osSubnet, boolean install, OpenstackNode osNode) {
if (ARP_BROADCAST_MODE.equals(getArpMode())) {
+
+ // do not remove fake gateway ARP rules, if there is another gateway
+ // which has the same subnet that to be removed
+ // this only occurs if we have duplicated subnets associated with
+ // different networks
+ if (!install) {
+ long numOfDupGws = osNetworkService.subnets().stream()
+ .filter(s -> !s.getId().equals(osSubnet.getId()))
+ .filter(s -> s.getGateway().equals(osSubnet.getGateway()))
+ .count();
+ if (numOfDupGws > 0) {
+ return;
+ }
+ }
+
String gateway = osSubnet.getGateway();
TrafficSelector selector = DefaultTrafficSelector.builder()
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/util/OpenstackNetworkingUtil.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/util/OpenstackNetworkingUtil.java
index 4f4e570..b755017 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/util/OpenstackNetworkingUtil.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/util/OpenstackNetworkingUtil.java
@@ -452,6 +452,20 @@
}
/**
+     * Obtains the boolean value of the property with the specified key name.
+     *
+     * @param properties a collection of properties; may be null, which is
+     *                   handled the same way as an empty collection
+     * @param name       key name
+     * @return the boolean value of the first property whose name equals the
+     *         given key, or false if no such property exists
+     */
+    public static boolean getPropertyValueAsBoolean(Set<ConfigProperty> properties, String name) {
+        // NOTE(review): callers pass the result of
+        // ComponentConfigService#getProperties directly; that call presumably
+        // yields null for a component that is not registered -- confirm.
+        // Treat it as "property absent" instead of failing on stream().
+        if (properties == null) {
+            return false;
+        }
+
+        return properties.stream()
+                .filter(p -> p.name().equals(name))
+                .findFirst()
+                .map(ConfigProperty::asBoolean)
+                .orElse(false);
+    }
+
+ /**
* Prints out the JSON string in pretty format.
*
* @param mapper Object mapper
diff --git a/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/impl/DefaultDpdkConfig.java b/apps/openstacknode/api/src/main/java/org/onosproject/openstacknode/api/DefaultDpdkConfig.java
similarity index 96%
rename from apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/impl/DefaultDpdkConfig.java
rename to apps/openstacknode/api/src/main/java/org/onosproject/openstacknode/api/DefaultDpdkConfig.java
index afd2d22..5cc333a 100644
--- a/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/impl/DefaultDpdkConfig.java
+++ b/apps/openstacknode/api/src/main/java/org/onosproject/openstacknode/api/DefaultDpdkConfig.java
@@ -13,12 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.openstacknode.impl;
+package org.onosproject.openstacknode.api;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
-import org.onosproject.openstacknode.api.DpdkConfig;
-import org.onosproject.openstacknode.api.DpdkInterface;
import java.util.ArrayList;
import java.util.Collection;
diff --git a/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/impl/DefaultDpdkInterface.java b/apps/openstacknode/api/src/main/java/org/onosproject/openstacknode/api/DefaultDpdkInterface.java
similarity index 97%
rename from apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/impl/DefaultDpdkInterface.java
rename to apps/openstacknode/api/src/main/java/org/onosproject/openstacknode/api/DefaultDpdkInterface.java
index c442876..aa06843 100644
--- a/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/impl/DefaultDpdkInterface.java
+++ b/apps/openstacknode/api/src/main/java/org/onosproject/openstacknode/api/DefaultDpdkInterface.java
@@ -13,10 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.openstacknode.impl;
+package org.onosproject.openstacknode.api;
import com.google.common.base.MoreObjects;
-import org.onosproject.openstacknode.api.DpdkInterface;
import java.util.Objects;
diff --git a/apps/openstacknode/api/src/main/java/org/onosproject/openstacknode/api/DefaultOpenstackNode.java b/apps/openstacknode/api/src/main/java/org/onosproject/openstacknode/api/DefaultOpenstackNode.java
index ec811b3..ca4734e 100644
--- a/apps/openstacknode/api/src/main/java/org/onosproject/openstacknode/api/DefaultOpenstackNode.java
+++ b/apps/openstacknode/api/src/main/java/org/onosproject/openstacknode/api/DefaultOpenstackNode.java
@@ -334,6 +334,7 @@
.dpdkConfig(dpdkConfig)
.keystoneConfig(keystoneConfig)
.neutronConfig(neutronConfig)
+ .controllers(controllers)
.build();
}
diff --git a/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/impl/DefaultOpenstackPhyInterface.java b/apps/openstacknode/api/src/main/java/org/onosproject/openstacknode/api/DefaultOpenstackPhyInterface.java
similarity index 96%
rename from apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/impl/DefaultOpenstackPhyInterface.java
rename to apps/openstacknode/api/src/main/java/org/onosproject/openstacknode/api/DefaultOpenstackPhyInterface.java
index 498ce5c..32df86e 100644
--- a/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/impl/DefaultOpenstackPhyInterface.java
+++ b/apps/openstacknode/api/src/main/java/org/onosproject/openstacknode/api/DefaultOpenstackPhyInterface.java
@@ -13,10 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.openstacknode.impl;
+package org.onosproject.openstacknode.api;
import com.google.common.base.MoreObjects;
-import org.onosproject.openstacknode.api.OpenstackPhyInterface;
import java.util.Objects;
diff --git a/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/impl/DefaultOpenstackSshAuth.java b/apps/openstacknode/api/src/main/java/org/onosproject/openstacknode/api/DefaultOpenstackSshAuth.java
similarity index 96%
rename from apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/impl/DefaultOpenstackSshAuth.java
rename to apps/openstacknode/api/src/main/java/org/onosproject/openstacknode/api/DefaultOpenstackSshAuth.java
index 882e028..7e6cc1e 100644
--- a/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/impl/DefaultOpenstackSshAuth.java
+++ b/apps/openstacknode/api/src/main/java/org/onosproject/openstacknode/api/DefaultOpenstackSshAuth.java
@@ -13,10 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.openstacknode.impl;
+package org.onosproject.openstacknode.api;
import com.google.common.base.MoreObjects;
-import org.onosproject.openstacknode.api.OpenstackSshAuth;
import java.util.Objects;
diff --git a/apps/openstacknode/api/src/test/java/org/onosproject/openstacknode/api/DefaultDpdkConfigTest.java b/apps/openstacknode/api/src/test/java/org/onosproject/openstacknode/api/DefaultDpdkConfigTest.java
new file mode 100644
index 0000000..d748e01
--- /dev/null
+++ b/apps/openstacknode/api/src/test/java/org/onosproject/openstacknode/api/DefaultDpdkConfigTest.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacknode.api;
+
+import com.google.common.collect.Lists;
+import com.google.common.testing.EqualsTester;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.onlab.junit.ImmutableClassChecker.assertThatClassIsImmutable;
+import static org.onosproject.openstacknode.api.DpdkConfig.DatapathType.NETDEV;
+import static org.onosproject.openstacknode.api.DpdkConfig.DatapathType.NORMAL;
+import static org.onosproject.openstacknode.api.DpdkInterface.Type.DPDK_VHOST_USER;
+import static org.onosproject.openstacknode.api.DpdkInterface.Type.DPDK_VHOST_USER_CLIENT;
+
+/**
+ * Unit tests for DefaultDpdkConfig.
+ */
+public class DefaultDpdkConfigTest {
+
+    private static final DpdkConfig.DatapathType DATAPATH_TYPE_1 = NETDEV;
+    private static final DpdkConfig.DatapathType DATAPATH_TYPE_2 = NORMAL;
+
+    private static final String SOCKET_DIR_1 = "/var/lib/libvirt/qemu";
+    private static final String SOCKET_DIR_2 = "/var/lib/libvirt/kvm";
+
+    private final List<DpdkInterface> dpdkIntfs1 = Lists.newArrayList();
+    private final List<DpdkInterface> dpdkIntfs2 = Lists.newArrayList();
+
+    private DpdkConfig config1;
+    private DpdkConfig sameAsConfig1;
+    private DpdkConfig config2;
+
+    /**
+     * Tests class immutability.
+     */
+    @Test
+    public void testImmutability() {
+        assertThatClassIsImmutable(DefaultDpdkConfig.class);
+    }
+
+    /**
+     * Initial setup; the interface lists are instance fields, so they do not accumulate across tests.
+     */
+    @Before
+    public void setUp() {
+        DpdkInterface dpdkIntf1 = DefaultDpdkInterface.builder()
+                .intf("dpdk1")
+                .deviceName("br-int")
+                .mtu(1500L)
+                .pciAddress("0000:85:00.0")
+                .type(DPDK_VHOST_USER)
+                .build();
+
+        DpdkInterface dpdkIntf2 = DefaultDpdkInterface.builder()
+                .intf("dpdk2")
+                .deviceName("br-int")
+                .mtu(1500L)
+                .pciAddress("0000:85:00.0")
+                .type(DPDK_VHOST_USER_CLIENT)
+                .build();
+
+        dpdkIntfs1.add(dpdkIntf1);
+        dpdkIntfs2.add(dpdkIntf2);
+
+        config1 = DefaultDpdkConfig.builder()
+                .datapathType(DATAPATH_TYPE_1)
+                .socketDir(SOCKET_DIR_1)
+                .dpdkIntfs(dpdkIntfs1)
+                .build();
+
+        sameAsConfig1 = DefaultDpdkConfig.builder()
+                .datapathType(DATAPATH_TYPE_1)
+                .socketDir(SOCKET_DIR_1)
+                .dpdkIntfs(dpdkIntfs1)
+                .build();
+
+        config2 = DefaultDpdkConfig.builder()
+                .datapathType(DATAPATH_TYPE_2)
+                .socketDir(SOCKET_DIR_2)
+                .dpdkIntfs(dpdkIntfs2)
+                .build();
+    }
+
+    /**
+     * Checks equals method works as expected.
+     */
+    @Test
+    public void testEquality() {
+        new EqualsTester().addEqualityGroup(config1, sameAsConfig1)
+                .addEqualityGroup(config2)
+                .testEquals();
+    }
+
+    /**
+     * Test object construction; assertions use the (expected, actual) argument order.
+     */
+    @Test
+    public void testConstruction() {
+        DpdkConfig config = config1;
+
+        assertEquals(DATAPATH_TYPE_1, config.datapathType());
+        assertEquals(SOCKET_DIR_1, config.socketDir());
+        assertEquals(dpdkIntfs1, config.dpdkIntfs());
+    }
+}
diff --git a/apps/openstacknode/api/src/test/java/org/onosproject/openstacknode/api/DefaultDpdkInterfaceTest.java b/apps/openstacknode/api/src/test/java/org/onosproject/openstacknode/api/DefaultDpdkInterfaceTest.java
new file mode 100644
index 0000000..f39139a
--- /dev/null
+++ b/apps/openstacknode/api/src/test/java/org/onosproject/openstacknode/api/DefaultDpdkInterfaceTest.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacknode.api;
+
+import com.google.common.testing.EqualsTester;
+import org.junit.Before;
+import org.junit.Test;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.onlab.junit.ImmutableClassChecker.assertThatClassIsImmutable;
+import static org.onosproject.openstacknode.api.DpdkInterface.Type.DPDK_VHOST_USER;
+import static org.onosproject.openstacknode.api.DpdkInterface.Type.DPDK_VHOST_USER_CLIENT;
+
+/**
+ * Unit tests for DefaultDpdkInterface.
+ */
+public class DefaultDpdkInterfaceTest {
+
+ private static final String DEVICE_NAME_1 = "br-int";
+ private static final String DEVICE_NAME_2 = "br-tun";
+
+ private static final String INTF_NAME_1 = "dpdk0";
+ private static final String INTF_NAME_2 = "dpdk1";
+
+ private static final String PCI_ADDRESS_1 = "0000:85:00.0";
+ private static final String PCI_ADDRESS_2 = "0000:85:00.1";
+
+ private static final DpdkInterface.Type TYPE_1 = DPDK_VHOST_USER;
+ private static final DpdkInterface.Type TYPE_2 = DPDK_VHOST_USER_CLIENT;
+
+ private static final Long MTU_1 = 1500L;
+ private static final Long MTU_2 = 1600L;
+
+ private DpdkInterface dpdkIntf1;
+ private DpdkInterface sameAsDpdkIntf1;
+ private DpdkInterface dpdkIntf2;
+
+ /**
+ * Tests class immutability.
+ */
+ @Test
+ public void testImmutability() {
+ assertThatClassIsImmutable(DefaultDpdkInterface.class);
+ }
+
+ /**
+ * Initial setup for this unit test.
+ */
+ @Before
+ public void setUp() {
+ dpdkIntf1 = DefaultDpdkInterface.builder()
+ .type(TYPE_1)
+ .pciAddress(PCI_ADDRESS_1)
+ .mtu(MTU_1)
+ .deviceName(DEVICE_NAME_1)
+ .intf(INTF_NAME_1)
+ .build();
+
+ sameAsDpdkIntf1 = DefaultDpdkInterface.builder()
+ .type(TYPE_1)
+ .pciAddress(PCI_ADDRESS_1)
+ .mtu(MTU_1)
+ .deviceName(DEVICE_NAME_1)
+ .intf(INTF_NAME_1)
+ .build();
+
+ dpdkIntf2 = DefaultDpdkInterface.builder()
+ .type(TYPE_2)
+ .pciAddress(PCI_ADDRESS_2)
+ .mtu(MTU_2)
+ .deviceName(DEVICE_NAME_2)
+ .intf(INTF_NAME_2)
+ .build();
+ }
+
+ /**
+ * Tests object equality.
+ */
+ @Test
+ public void testEquality() {
+ new EqualsTester().addEqualityGroup(dpdkIntf1, sameAsDpdkIntf1)
+ .addEqualityGroup(dpdkIntf2)
+ .testEquals();
+ }
+
+ /**
+ * Checks that the built interface returns the values passed to the builder.
+ */
+ @Test
+ public void testConstruction() {
+ DpdkInterface dpdkIntf = dpdkIntf1;
+
+ assertEquals(DEVICE_NAME_1, dpdkIntf.deviceName());
+ assertEquals(INTF_NAME_1, dpdkIntf.intf());
+ assertEquals(MTU_1, dpdkIntf.mtu());
+ assertEquals(PCI_ADDRESS_1, dpdkIntf.pciAddress());
+ assertEquals(TYPE_1, dpdkIntf.type());
+ }
+}
diff --git a/apps/openstacknode/app/src/test/java/org/onosproject/openstacknode/impl/DefaultOpenstackAuthTest.java b/apps/openstacknode/api/src/test/java/org/onosproject/openstacknode/api/DefaultOpenstackAuthTest.java
similarity index 94%
rename from apps/openstacknode/app/src/test/java/org/onosproject/openstacknode/impl/DefaultOpenstackAuthTest.java
rename to apps/openstacknode/api/src/test/java/org/onosproject/openstacknode/api/DefaultOpenstackAuthTest.java
index e602cf2..0a09248 100644
--- a/apps/openstacknode/app/src/test/java/org/onosproject/openstacknode/impl/DefaultOpenstackAuthTest.java
+++ b/apps/openstacknode/api/src/test/java/org/onosproject/openstacknode/api/DefaultOpenstackAuthTest.java
@@ -13,12 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.openstacknode.impl;
+package org.onosproject.openstacknode.api;
import com.google.common.testing.EqualsTester;
import org.junit.Test;
-import org.onosproject.openstacknode.api.DefaultOpenstackAuth;
-import org.onosproject.openstacknode.api.OpenstackAuth;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
diff --git a/apps/openstacknode/api/src/test/java/org/onosproject/openstacknode/api/DefaultOpenstackNodeTest.java b/apps/openstacknode/api/src/test/java/org/onosproject/openstacknode/api/DefaultOpenstackNodeTest.java
index 2fe0d2a..c8feb87 100644
--- a/apps/openstacknode/api/src/test/java/org/onosproject/openstacknode/api/DefaultOpenstackNodeTest.java
+++ b/apps/openstacknode/api/src/test/java/org/onosproject/openstacknode/api/DefaultOpenstackNodeTest.java
@@ -15,15 +15,29 @@
*/
package org.onosproject.openstacknode.api;
+import com.google.common.collect.ImmutableList;
import com.google.common.testing.EqualsTester;
+import org.junit.Before;
import org.junit.Test;
import org.onlab.packet.IpAddress;
import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.behaviour.ControllerInfo;
+
+import java.util.List;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.onosproject.openstacknode.api.DpdkConfig.DatapathType.NETDEV;
+import static org.onosproject.openstacknode.api.DpdkInterface.Type.DPDK_VHOST_USER;
+import static org.onosproject.openstacknode.api.NodeState.COMPLETE;
+import static org.onosproject.openstacknode.api.NodeState.DEVICE_CREATED;
+import static org.onosproject.openstacknode.api.OpenstackAuth.Perspective.PUBLIC;
+import static org.onosproject.openstacknode.api.OpenstackAuth.Protocol.HTTP;
/**
* Unit tests for DefaultOpenstackNode.
*/
-public class DefaultOpenstackNodeTest extends OpenstackNodeTest {
+public final class DefaultOpenstackNodeTest extends OpenstackNodeTest {
private static final IpAddress TEST_IP = IpAddress.valueOf("10.100.0.3");
@@ -31,6 +45,55 @@
private static final String HOSTNAME_2 = "hostname_2";
private static final Device DEVICE_1 = createDevice(1);
private static final Device DEVICE_2 = createDevice(2);
+
+ private static final IpAddress MANAGEMENT_IP = IpAddress.valueOf("10.10.10.10");
+ private static final IpAddress DATA_IP = IpAddress.valueOf("20.20.20.20");
+
+ private static final String VLAN_INTF = "eth0";
+ private static final String UPLINK_PORT = "eth1";
+
+ private static final String SSH_AUTH_ID = "admin";
+ private static final String SSH_AUTH_PASSWORD = "nova";
+
+ private static final String OS_AUTH_USERNAME = "admin";
+ private static final String OS_AUTH_PASSWORD = "nova";
+ private static final String OS_AUTH_PROJECT = "admin";
+ private static final String OS_AUTH_VERSION = "v2.0";
+ private static final OpenstackAuth.Protocol OS_AUTH_PROTOCOL = HTTP;
+ private static final OpenstackAuth.Perspective OS_AUTH_PERSPECTIVE = PUBLIC;
+ private static final String OS_ENDPOINT = "keystone:35357/v2.0";
+
+ private static final String META_PROXY_SECRET = "nova";
+ private static final boolean META_USE_SERVICE = true;
+ private static final String META_IP = "30.30.30.30";
+ private static final int META_PORT = 8775;
+
+ private static final String DPDK_INTF_NAME = "dpdk1";
+ private static final Long DPDK_INTF_MTU = 1500L;
+ private static final String DPDK_INTF_DEV_NAME = "br-int";
+ private static final String DPDK_INTF_PCI_ADDRESS = "0000:85:00.0";
+ private static final DpdkInterface.Type DPDK_INTF_TYPE = DPDK_VHOST_USER;
+
+ private static final DpdkConfig.DatapathType DPDK_DATAPATH_TYPE = NETDEV;
+ private static final String DPDK_SOCKET_DIR = "/var/lib/libvirt/qemu";
+
+ private static final String PHY_INTF_NETWORK = "mgmtnetwork";
+ private static final String PHY_INTF_NAME = "eth3";
+
+ private static final IpAddress CONTROLLER_IP = IpAddress.valueOf("40.40.40.40");
+ private static final int CONTROLLER_PORT = 6653;
+ private static final String TCP = "tcp";
+
+ private static final OpenstackSshAuth SSH_AUTH = initSshAuth();
+ private static final OpenstackAuth AUTH = initAuth();
+ private static final KeystoneConfig KEYSTONE_CONFIG = initKeystoneConfig();
+ private static final NeutronConfig NEUTRON_CONFIG = initNeutronConfig();
+ private static final DpdkConfig DPDK_CONFIG = initDpdkConfig();
+ private static final List<OpenstackPhyInterface> PHY_INTFS = initPhyIntfs();
+ private static final List<ControllerInfo> CONTROLLERS = initControllers();
+
+ private OpenstackNode refNode;
+
private static final OpenstackNode OS_NODE_1 = createNode(
HOSTNAME_1,
OpenstackNode.NodeType.COMPUTE,
@@ -51,6 +114,29 @@
NodeState.INIT);
/**
+ * Initial setup for this unit test.
+ */
+ @Before
+ public void setUp() {
+ refNode = DefaultOpenstackNode.builder()
+ .hostname(HOSTNAME_1)
+ .type(OpenstackNode.NodeType.COMPUTE)
+ .managementIp(MANAGEMENT_IP)
+ .dataIp(DATA_IP)
+ .intgBridge(DEVICE_1.id())
+ .vlanIntf(VLAN_INTF)
+ .uplinkPort(UPLINK_PORT)
+ .state(NodeState.COMPLETE)
+ .sshAuthInfo(SSH_AUTH)
+ .keystoneConfig(KEYSTONE_CONFIG)
+ .neutronConfig(NEUTRON_CONFIG)
+ .dpdkConfig(DPDK_CONFIG)
+ .phyIntfs(PHY_INTFS)
+ .controllers(CONTROLLERS)
+ .build();
+ }
+
+ /**
* Checks equals method works as expected.
*/
@Test
@@ -61,6 +147,48 @@
}
/**
+ * Test object construction.
+ */
+ @Test
+ public void testConstruction() {
+ checkCommonProperties(refNode);
+ assertEquals(COMPLETE, refNode.state());
+ assertEquals(DEVICE_1.id(), refNode.intgBridge());
+ }
+
+ /**
+ * Checks that updateState changes the state and keeps the common properties.
+ */
+ @Test
+ public void testUpdateState() {
+ OpenstackNode updatedNode = refNode.updateState(DEVICE_CREATED);
+
+ checkCommonProperties(updatedNode);
+ assertEquals(DEVICE_CREATED, updatedNode.state());
+ }
+
+ /**
+ * Checks that updateIntbridge changes the bridge and keeps the common properties.
+ */
+ @Test
+ public void testUpdateIntBridge() {
+ DeviceId newBridge = DeviceId.deviceId("br-tun");
+ OpenstackNode updatedNode = refNode.updateIntbridge(newBridge);
+ checkCommonProperties(updatedNode);
+ assertEquals(newBridge, updatedNode.intgBridge());
+ }
+
+ /**
+ * Checks that a node rebuilt via from() equals the node it was copied from.
+ */
+ @Test
+ public void testFrom() {
+ OpenstackNode copy = DefaultOpenstackNode.from(refNode).build();
+
+ assertEquals(refNode, copy);
+ }
+
+ /**
* Checks building a node without hostname fails with proper exception.
*/
@Test(expected = IllegalArgumentException.class)
@@ -116,4 +244,87 @@
.state(NodeState.INIT)
.build();
}
+
+
+ private static OpenstackSshAuth initSshAuth() {
+ return DefaultOpenstackSshAuth.builder()
+ .id(SSH_AUTH_ID)
+ .password(SSH_AUTH_PASSWORD)
+ .build();
+ }
+
+ private static OpenstackAuth initAuth() {
+ return DefaultOpenstackAuth.builder()
+ .username(OS_AUTH_USERNAME)
+ .password(OS_AUTH_PASSWORD)
+ .project(OS_AUTH_PROJECT)
+ .protocol(OS_AUTH_PROTOCOL)
+ .version(OS_AUTH_VERSION)
+ .perspective(OS_AUTH_PERSPECTIVE)
+ .build();
+ }
+
+ private static KeystoneConfig initKeystoneConfig() {
+ return DefaultKeystoneConfig.builder()
+ .authentication(AUTH)
+ .endpoint(OS_ENDPOINT)
+ .build();
+ }
+
+ private static NeutronConfig initNeutronConfig() {
+ return DefaultNeutronConfig.builder()
+ .metadataProxySecret(META_PROXY_SECRET)
+ .novaMetadataIp(META_IP)
+ .novaMetadataPort(META_PORT)
+ .useMetadataProxy(META_USE_SERVICE)
+ .build();
+ }
+
+ private static DpdkConfig initDpdkConfig() {
+ DpdkInterface dpdkIntf = DefaultDpdkInterface.builder()
+ .intf(DPDK_INTF_NAME)
+ .deviceName(DPDK_INTF_DEV_NAME)
+ .mtu(DPDK_INTF_MTU)
+ .pciAddress(DPDK_INTF_PCI_ADDRESS)
+ .type(DPDK_INTF_TYPE)
+ .build();
+ List<DpdkInterface> dpdkIntfs = ImmutableList.of(dpdkIntf);
+
+ return DefaultDpdkConfig.builder()
+ .dpdkIntfs(dpdkIntfs)
+ .datapathType(DPDK_DATAPATH_TYPE)
+ .socketDir(DPDK_SOCKET_DIR)
+ .build();
+ }
+
+ private static List<OpenstackPhyInterface> initPhyIntfs() {
+ OpenstackPhyInterface phyIntf = DefaultOpenstackPhyInterface.builder()
+ .intf(PHY_INTF_NAME)
+ .network(PHY_INTF_NETWORK)
+ .build();
+
+ return ImmutableList.of(phyIntf);
+ }
+
+ private static List<ControllerInfo> initControllers() {
+ ControllerInfo controller = new ControllerInfo(CONTROLLER_IP, CONTROLLER_PORT, TCP);
+
+ return ImmutableList.of(controller);
+ }
+
+ // Asserts every node property except state and integration bridge against the fixture values.
+ private void checkCommonProperties(OpenstackNode node) {
+ assertEquals(HOSTNAME_1, node.hostname());
+ assertEquals(OpenstackNode.NodeType.COMPUTE, node.type());
+ assertEquals(MANAGEMENT_IP, node.managementIp());
+ assertEquals(DATA_IP, node.dataIp());
+ assertEquals(VLAN_INTF, node.vlanIntf());
+ assertEquals(UPLINK_PORT, node.uplinkPort());
+ assertEquals(SSH_AUTH, node.sshAuthInfo());
+ assertEquals(KEYSTONE_CONFIG, node.keystoneConfig());
+ assertEquals(NEUTRON_CONFIG, node.neutronConfig());
+ assertEquals(DPDK_CONFIG, node.dpdkConfig());
+ assertEquals(PHY_INTFS, node.phyIntfs());
+ assertEquals(CONTROLLERS, node.controllers());
+ }
}
diff --git a/apps/openstacknode/app/src/test/java/org/onosproject/openstacknode/impl/DefaultOpenstackPhyInterfaceTest.java b/apps/openstacknode/api/src/test/java/org/onosproject/openstacknode/api/DefaultOpenstackPhyInterfaceTest.java
similarity index 94%
rename from apps/openstacknode/app/src/test/java/org/onosproject/openstacknode/impl/DefaultOpenstackPhyInterfaceTest.java
rename to apps/openstacknode/api/src/test/java/org/onosproject/openstacknode/api/DefaultOpenstackPhyInterfaceTest.java
index 55a7077..ea9e569 100644
--- a/apps/openstacknode/app/src/test/java/org/onosproject/openstacknode/impl/DefaultOpenstackPhyInterfaceTest.java
+++ b/apps/openstacknode/api/src/test/java/org/onosproject/openstacknode/api/DefaultOpenstackPhyInterfaceTest.java
@@ -13,11 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.openstacknode.impl;
+package org.onosproject.openstacknode.api;
import com.google.common.testing.EqualsTester;
import org.junit.Test;
-import org.onosproject.openstacknode.api.OpenstackPhyInterface;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
diff --git a/apps/openstacknode/app/src/test/java/org/onosproject/openstacknode/impl/DefaultOpenstackSshAuthTest.java b/apps/openstacknode/api/src/test/java/org/onosproject/openstacknode/api/DefaultOpenstackSshAuthTest.java
similarity index 94%
rename from apps/openstacknode/app/src/test/java/org/onosproject/openstacknode/impl/DefaultOpenstackSshAuthTest.java
rename to apps/openstacknode/api/src/test/java/org/onosproject/openstacknode/api/DefaultOpenstackSshAuthTest.java
index bdba45c..be923c0 100644
--- a/apps/openstacknode/app/src/test/java/org/onosproject/openstacknode/impl/DefaultOpenstackSshAuthTest.java
+++ b/apps/openstacknode/api/src/test/java/org/onosproject/openstacknode/api/DefaultOpenstackSshAuthTest.java
@@ -13,11 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.openstacknode.impl;
+package org.onosproject.openstacknode.api;
import com.google.common.testing.EqualsTester;
import org.junit.Test;
-import org.onosproject.openstacknode.api.OpenstackSshAuth;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
diff --git a/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/cli/OpenstackNodeCheckCommand.java b/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/cli/OpenstackNodeCheckCommand.java
index c67ff5b..7f1d630 100644
--- a/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/cli/OpenstackNodeCheckCommand.java
+++ b/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/cli/OpenstackNodeCheckCommand.java
@@ -24,12 +24,17 @@
import org.onosproject.net.Port;
import org.onosproject.net.Device;
import org.onosproject.net.device.DeviceService;
+import org.onosproject.openstacknode.api.NodeState;
import org.onosproject.openstacknode.api.OpenstackNode;
import org.onosproject.openstacknode.api.OpenstackNodeService;
+import org.openstack4j.api.OSClient;
import static org.onosproject.net.AnnotationKeys.PORT_NAME;
import static org.onosproject.openstacknode.api.Constants.DEFAULT_TUNNEL;
import static org.onosproject.openstacknode.api.Constants.INTEGRATION_BRIDGE;
+import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.CONTROLLER;
+import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.GATEWAY;
+import static org.onosproject.openstacknode.util.OpenstackNodeUtil.getConnectedClient;
/**
* Checks detailed node init state.
@@ -44,7 +49,7 @@
private String hostname = null;
private static final String MSG_OK = "OK";
- private static final String MSG_NO = "NO";
+ private static final String MSG_ERROR = "ERROR";
@Override
protected void doExecute() {
@@ -57,29 +62,49 @@
return;
}
- print("[Integration Bridge Status]");
- Device device = deviceService.getDevice(osNode.intgBridge());
- if (device != null) {
- print("%s %s=%s available=%s %s",
- deviceService.isAvailable(device.id()) ? MSG_OK : MSG_NO,
- INTEGRATION_BRIDGE,
- device.id(),
- deviceService.isAvailable(device.id()),
- device.annotations());
- if (osNode.dataIp() != null) {
- printPortState(deviceService, osNode.intgBridge(), DEFAULT_TUNNEL);
+ if (osNode.type() == CONTROLLER) {
+ print("[Openstack Controller Status]");
+
+ OSClient client = getConnectedClient(osNode);
+ if (client == null) {
+ error("The given keystone info is incorrect to get authorized to openstack");
+ print("keystoneConfig=%s", osNode.keystoneConfig());
}
- if (osNode.vlanIntf() != null) {
- printPortState(deviceService, osNode.intgBridge(), osNode.vlanIntf());
- }
- if (osNode.type() == OpenstackNode.NodeType.GATEWAY) {
- printPortState(deviceService, osNode.intgBridge(), osNode.uplinkPort());
+
+ if (osNode.keystoneConfig() != null) {
+ print("%s keystoneConfig=%s, neutronConfig=%s",
+ osNode.state() == NodeState.COMPLETE && client != null ?
+ MSG_OK : MSG_ERROR,
+ osNode.keystoneConfig(),
+ osNode.neutronConfig());
+ } else {
+ print("%s keystoneConfig is missing", MSG_ERROR);
}
} else {
- print("%s %s=%s is not available",
- MSG_NO,
- INTEGRATION_BRIDGE,
- osNode.intgBridge());
+ print("[Integration Bridge Status]");
+ Device device = deviceService.getDevice(osNode.intgBridge());
+ if (device != null) {
+ print("%s %s=%s available=%s %s",
+ deviceService.isAvailable(device.id()) ? MSG_OK : MSG_ERROR,
+ INTEGRATION_BRIDGE,
+ device.id(),
+ deviceService.isAvailable(device.id()),
+ device.annotations());
+ if (osNode.dataIp() != null) {
+ printPortState(deviceService, osNode.intgBridge(), DEFAULT_TUNNEL);
+ }
+ if (osNode.vlanIntf() != null) {
+ printPortState(deviceService, osNode.intgBridge(), osNode.vlanIntf());
+ }
+ if (osNode.type() == GATEWAY) {
+ printPortState(deviceService, osNode.intgBridge(), osNode.uplinkPort());
+ }
+ } else {
+ print("%s %s=%s is not available",
+ MSG_ERROR,
+ INTEGRATION_BRIDGE,
+ osNode.intgBridge());
+ }
}
}
@@ -91,13 +116,13 @@
if (port != null) {
print("%s %s portNum=%s enabled=%s %s",
- port.isEnabled() ? MSG_OK : MSG_NO,
+ port.isEnabled() ? MSG_OK : MSG_ERROR,
portName,
port.number(),
port.isEnabled() ? Boolean.TRUE : Boolean.FALSE,
port.annotations());
} else {
- print("%s %s does not exist", MSG_NO, portName);
+ print("%s %s does not exist", MSG_ERROR, portName);
}
}
}
diff --git a/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/codec/DpdkConfigCodec.java b/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/codec/DpdkConfigCodec.java
index 3e8156d..dcaed0f 100644
--- a/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/codec/DpdkConfigCodec.java
+++ b/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/codec/DpdkConfigCodec.java
@@ -22,7 +22,7 @@
import org.onosproject.codec.JsonCodec;
import org.onosproject.openstacknode.api.DpdkConfig;
import org.onosproject.openstacknode.api.DpdkInterface;
-import org.onosproject.openstacknode.impl.DefaultDpdkConfig;
+import org.onosproject.openstacknode.api.DefaultDpdkConfig;
import java.util.ArrayList;
import java.util.List;
diff --git a/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/codec/DpdkInterfaceCodec.java b/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/codec/DpdkInterfaceCodec.java
index 033bfc0..2d38221 100644
--- a/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/codec/DpdkInterfaceCodec.java
+++ b/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/codec/DpdkInterfaceCodec.java
@@ -21,7 +21,7 @@
import org.onosproject.codec.JsonCodec;
import org.onosproject.openstacknode.api.DpdkInterface;
import org.onosproject.openstacknode.api.DpdkInterface.Type;
-import org.onosproject.openstacknode.impl.DefaultDpdkInterface;
+import org.onosproject.openstacknode.api.DefaultDpdkInterface;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onlab.util.Tools.nullIsIllegal;
diff --git a/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/codec/OpenstackPhyInterfaceCodec.java b/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/codec/OpenstackPhyInterfaceCodec.java
index 340a76d..d84abf8 100644
--- a/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/codec/OpenstackPhyInterfaceCodec.java
+++ b/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/codec/OpenstackPhyInterfaceCodec.java
@@ -19,7 +19,7 @@
import org.onosproject.codec.CodecContext;
import org.onosproject.codec.JsonCodec;
import org.onosproject.openstacknode.api.OpenstackPhyInterface;
-import org.onosproject.openstacknode.impl.DefaultOpenstackPhyInterface;
+import org.onosproject.openstacknode.api.DefaultOpenstackPhyInterface;
import org.slf4j.Logger;
import static com.google.common.base.Preconditions.checkNotNull;
diff --git a/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/codec/OpenstackSshAuthCodec.java b/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/codec/OpenstackSshAuthCodec.java
index f64d664..56b860a 100644
--- a/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/codec/OpenstackSshAuthCodec.java
+++ b/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/codec/OpenstackSshAuthCodec.java
@@ -20,7 +20,7 @@
import org.onosproject.codec.CodecContext;
import org.onosproject.codec.JsonCodec;
import org.onosproject.openstacknode.api.OpenstackSshAuth;
-import org.onosproject.openstacknode.impl.DefaultOpenstackSshAuth;
+import org.onosproject.openstacknode.api.DefaultOpenstackSshAuth;
import org.slf4j.Logger;
import static com.google.common.base.Preconditions.checkNotNull;
diff --git a/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/impl/DistributedOpenstackNodeStore.java b/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/impl/DistributedOpenstackNodeStore.java
index 71f5ba5..188520f 100644
--- a/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/impl/DistributedOpenstackNodeStore.java
+++ b/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/impl/DistributedOpenstackNodeStore.java
@@ -20,10 +20,14 @@
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.net.behaviour.ControllerInfo;
+import org.onosproject.openstacknode.api.DefaultDpdkConfig;
+import org.onosproject.openstacknode.api.DefaultDpdkInterface;
import org.onosproject.openstacknode.api.DefaultKeystoneConfig;
import org.onosproject.openstacknode.api.DefaultNeutronConfig;
import org.onosproject.openstacknode.api.DefaultOpenstackAuth;
import org.onosproject.openstacknode.api.DefaultOpenstackNode;
+import org.onosproject.openstacknode.api.DefaultOpenstackPhyInterface;
+import org.onosproject.openstacknode.api.DefaultOpenstackSshAuth;
import org.onosproject.openstacknode.api.DpdkConfig;
import org.onosproject.openstacknode.api.DpdkInterface;
import org.onosproject.openstacknode.api.NodeState;
diff --git a/apps/openstacknode/app/src/test/java/org/onosproject/openstacknode/codec/OpenstackNodeCodecTest.java b/apps/openstacknode/app/src/test/java/org/onosproject/openstacknode/codec/OpenstackNodeCodecTest.java
index 3788d01..4798da1 100644
--- a/apps/openstacknode/app/src/test/java/org/onosproject/openstacknode/codec/OpenstackNodeCodecTest.java
+++ b/apps/openstacknode/app/src/test/java/org/onosproject/openstacknode/codec/OpenstackNodeCodecTest.java
@@ -40,12 +40,12 @@
import org.onosproject.openstacknode.api.OpenstackNode;
import org.onosproject.openstacknode.api.OpenstackPhyInterface;
import org.onosproject.openstacknode.api.OpenstackSshAuth;
-import org.onosproject.openstacknode.impl.DefaultDpdkConfig;
-import org.onosproject.openstacknode.impl.DefaultDpdkInterface;
+import org.onosproject.openstacknode.api.DefaultDpdkConfig;
+import org.onosproject.openstacknode.api.DefaultDpdkInterface;
import org.onosproject.openstacknode.api.DefaultKeystoneConfig;
import org.onosproject.openstacknode.api.DefaultNeutronConfig;
-import org.onosproject.openstacknode.impl.DefaultOpenstackPhyInterface;
-import org.onosproject.openstacknode.impl.DefaultOpenstackSshAuth;
+import org.onosproject.openstacknode.api.DefaultOpenstackPhyInterface;
+import org.onosproject.openstacknode.api.DefaultOpenstackSshAuth;
import java.io.IOException;
import java.io.InputStream;
diff --git a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryManagerTest.java b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryManagerTest.java
new file mode 100644
index 0000000..d4296be
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryManagerTest.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import org.junit.Test;
+import org.onlab.junit.TestUtils;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for prometheus telemetry manager.
+ */
+public final class PrometheusTelemetryManagerTest {
+
+ private PrometheusTelemetryManager manager;
+ private OpenstackTelemetryServiceAdapter telemetryService =
+ new OpenstackTelemetryServiceAdapter();
+
+ /**
+ * Checks that activate() adds the manager to the telemetry service and deactivate() removes it.
+ */
+ @Test
+ public void testActivateDeactivate() {
+ manager = new PrometheusTelemetryManager();
+
+ TestUtils.setField(manager, "openstackTelemetryService", telemetryService);
+
+ manager.activate();
+
+ assertTrue(telemetryService.services.contains(manager));
+
+ manager.deactivate();
+
+ assertFalse(telemetryService.services.contains(manager));
+ }
+}
diff --git a/cli/src/main/java/org/onosproject/cli/StorageNodesListCommand.java b/cli/src/main/java/org/onosproject/cli/StorageNodesListCommand.java
new file mode 100644
index 0000000..7cbf619
--- /dev/null
+++ b/cli/src/main/java/org/onosproject/cli/StorageNodesListCommand.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2014-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cli;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cluster.ClusterAdminService;
+import org.onosproject.cluster.Node;
+
+import static com.google.common.collect.Lists.newArrayList;
+
+/**
+ * Lists all storage (consensus) nodes sorted by node id, as plain text or JSON.
+ */
+@Service
+@Command(scope = "onos", name = "storage-nodes", description = "Lists all storage nodes")
+public class StorageNodesListCommand extends AbstractShellCommand {
+
+ private static final String FMT = "id=%s, address=%s:%s";
+
+ @Override
+ protected void doExecute() {
+ ClusterAdminService service = get(ClusterAdminService.class);
+ List<Node> nodes = newArrayList(service.getConsensusNodes());
+ Collections.sort(nodes, Comparator.comparing(Node::id));
+ if (outputJson()) {
+ print("%s", json(nodes));
+ } else {
+ for (Node node : nodes) {
+ print(FMT, node.id(), node.host(), node.tcpPort());
+ }
+ }
+ }
+
+ // Builds a JSON array holding one object (id, ip, host, tcpPort) per node.
+ private JsonNode json(List<Node> nodes) {
+ ObjectMapper mapper = new ObjectMapper();
+ ArrayNode result = mapper.createArrayNode();
+ for (Node node : nodes) {
+ ObjectNode newNode = mapper.createObjectNode()
+ .put("id", node.id().toString())
+ .put("ip", node.ip().toString())
+ .put("host", node.host())
+ .put("tcpPort", node.tcpPort());
+ result.add(newNode);
+ }
+ return result;
+ }
+
+}
diff --git a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
index 299fe09..99a458d 100644
--- a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
+++ b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
@@ -108,6 +108,9 @@
<command>
<action class="org.onosproject.cli.NodesListCommand"/>
</command>
+ <command>
+ <action class="org.onosproject.cli.StorageNodesListCommand"/>
+ </command>
<command>
<action class="org.onosproject.cli.RolesCommand"/>
diff --git a/core/api/src/main/java/org/onosproject/net/packet/PacketPriority.java b/core/api/src/main/java/org/onosproject/net/packet/PacketPriority.java
index 0253423..3633231 100644
--- a/core/api/src/main/java/org/onosproject/net/packet/PacketPriority.java
+++ b/core/api/src/main/java/org/onosproject/net/packet/PacketPriority.java
@@ -30,7 +30,20 @@
* Low priority for reactive applications. Packets are only sent to the
* controller if they fail to match any of the rules installed in the switch.
*/
- REACTIVE(5);
+ REACTIVE(5),
+
+ /**
+ * Additional general-purpose priority levels for applications, declared from highest to lowest.
+ */
+ MAX(65535),
+ HIGH5(65000),
+ HIGH4(64000),
+ HIGH3(63000),
+ HIGH2(62000),
+ HIGH1(61000),
+ HIGH(60000),
+ MEDIUM(30000),
+ LOWEST(1);
private final int priorityValue;
@@ -47,6 +60,7 @@
return priorityValue;
}
+ @Override
public String toString() {
return String.valueOf(priorityValue);
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
deleted file mode 100644
index 823bcba..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ /dev/null
@@ -1,498 +0,0 @@
-/*
- * Copyright 2014-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.impl;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import org.onlab.packet.IpAddress;
-import org.onlab.util.KryoNamespace;
-import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.cfg.ConfigProperty;
-import org.onosproject.cluster.ClusterEvent;
-import org.onosproject.cluster.ClusterMetadataService;
-import org.onosproject.cluster.ClusterStore;
-import org.onosproject.cluster.ClusterStoreDelegate;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.ControllerNode.State;
-import org.onosproject.cluster.DefaultControllerNode;
-import org.onosproject.cluster.Node;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.core.Version;
-import org.onosproject.core.VersionService;
-import org.onosproject.store.AbstractStore;
-import org.onosproject.store.cluster.messaging.Endpoint;
-import org.onosproject.store.cluster.messaging.MessagingService;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.Serializer;
-import org.osgi.service.component.ComponentContext;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Modified;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
-import org.osgi.service.component.annotations.ReferencePolicy;
-import org.slf4j.Logger;
-
-import java.time.Instant;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiConsumer;
-import java.util.stream.Collectors;
-
-import static com.google.common.base.MoreObjects.firstNonNull;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Strings.isNullOrEmpty;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.cluster.ClusterEvent.Type.INSTANCE_ACTIVATED;
-import static org.onosproject.cluster.ClusterEvent.Type.INSTANCE_DEACTIVATED;
-import static org.onosproject.cluster.ClusterEvent.Type.INSTANCE_READY;
-import static org.slf4j.LoggerFactory.getLogger;
-import static org.onosproject.store.OsgiPropertyConstants.*;
-
-/**
- * Distributed cluster nodes store that employs an accrual failure
- * detector to identify cluster member up/down status.
- */
-
-@Component(
- enabled = false,
- service = ClusterStore.class,
- property = {
- HEARTBEAT_INTERVAL + "=" + HEARTBEAT_INTERVAL_DEFAULT,
- PHI_FAILURE_THRESHOLD + "=" + PHI_FAILURE_THRESHOLD_DEFAULT,
- MIN_STANDARD_DEVIATION_MILLIS + "=" + MIN_STANDARD_DEVIATION_MILLIS_DEFAULT
- }
-)
-
-public class DistributedClusterStore
- extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
- implements ClusterStore {
-
- private static final Logger log = getLogger(DistributedClusterStore.class);
-
- public static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
-
- //@Property(name = "heartbeatInterval", intValue = DEFAULT_HEARTBEAT_INTERVAL,
- // label = "Interval time to send heartbeat to other controller nodes (millisecond)")
- private int heartbeatInterval = HEARTBEAT_INTERVAL_DEFAULT;
-
- //@Property(name = "phiFailureThreshold", intValue = DEFAULT_PHI_FAILURE_THRESHOLD,
- // label = "the value of Phi threshold to detect accrual failure")
- private int phiFailureThreshold = PHI_FAILURE_THRESHOLD_DEFAULT;
-
- //@Property(name = "minStandardDeviationMillis", longValue = DEFAULT_MIN_STANDARD_DEVIATION_MILLIS,
- // label = "The minimum standard deviation to take into account when computing the Phi value")
- private long minStandardDeviationMillis = MIN_STANDARD_DEVIATION_MILLIS_DEFAULT;
-
- private static final Serializer SERIALIZER = Serializer.using(
- KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
- .register(HeartbeatMessage.class)
- .build("ClusterStore"));
-
- private static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
-
- private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
- private final Map<NodeId, State> nodeStates = Maps.newConcurrentMap();
- private final Map<NodeId, Version> nodeVersions = Maps.newConcurrentMap();
- private final Map<NodeId, Instant> nodeLastUpdatedTimes = Maps.newConcurrentMap();
-
- private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
- groupedThreads("onos/cluster/membership", "heartbeat-sender", log));
- private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
- groupedThreads("onos/cluster/membership", "heartbeat-receiver", log));
-
- private PhiAccrualFailureDetector failureDetector;
-
- private ControllerNode localNode;
- private Version localVersion;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected VersionService versionService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected ClusterMetadataService clusterMetadataService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected MessagingService messagingService;
-
- // This must be optional to avoid a cyclic dependency
- @Reference(cardinality = ReferenceCardinality.OPTIONAL,
- bind = "bindComponentConfigService",
- unbind = "unbindComponentConfigService",
- policy = ReferencePolicy.DYNAMIC)
- protected ComponentConfigService cfgService;
-
- /**
- * Hook for wiring up optional reference to a service.
- *
- * @param service service being announced
- */
- protected void bindComponentConfigService(ComponentConfigService service) {
- if (cfgService == null) {
- cfgService = service;
- cfgService.registerProperties(getClass());
- readComponentConfiguration();
- }
- }
-
- /**
- * Hook for unwiring optional reference to a service.
- *
- * @param service service being withdrawn
- */
- protected void unbindComponentConfigService(ComponentConfigService service) {
- if (cfgService == service) {
- cfgService.unregisterProperties(getClass(), false);
- cfgService = null;
- }
- }
-
- @Activate
- public void activate() {
- localNode = clusterMetadataService.getLocalNode();
- localVersion = versionService.version();
- nodeVersions.put(localNode.id(), localVersion);
-
- messagingService.registerHandler(HEARTBEAT_MESSAGE,
- new HeartbeatMessageHandler(), heartBeatMessageHandler);
-
- failureDetector = new PhiAccrualFailureDetector(minStandardDeviationMillis);
-
- heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
- heartbeatInterval, TimeUnit.MILLISECONDS);
-
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- messagingService.unregisterHandler(HEARTBEAT_MESSAGE);
- heartBeatSender.shutdownNow();
- heartBeatMessageHandler.shutdownNow();
-
- log.info("Stopped");
- }
-
- @Modified
- public void modified(ComponentContext context) {
- readComponentConfiguration();
- }
-
- @Override
- public void setDelegate(ClusterStoreDelegate delegate) {
- checkNotNull(delegate, "Delegate cannot be null");
- this.delegate = delegate;
- }
-
- @Override
- public void unsetDelegate(ClusterStoreDelegate delegate) {
- this.delegate = null;
- }
-
- @Override
- public boolean hasDelegate() {
- return this.delegate != null;
- }
-
- @Override
- public ControllerNode getLocalNode() {
- return localNode;
- }
-
- @Override
- public Set<ControllerNode> getNodes() {
- return ImmutableSet.copyOf(allNodes.values());
- }
-
- @Override
- public Set<Node> getStorageNodes() {
- return ImmutableSet.of();
- }
-
- @Override
- public ControllerNode getNode(NodeId nodeId) {
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- return allNodes.get(nodeId);
- }
-
- @Override
- public State getState(NodeId nodeId) {
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- return firstNonNull(nodeStates.get(nodeId), State.INACTIVE);
- }
-
- @Override
- public Version getVersion(NodeId nodeId) {
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- return nodeVersions.get(nodeId);
- }
-
- @Override
- public void markFullyStarted(boolean started) {
- updateNode(localNode.id(), started ? State.READY : State.ACTIVE, null);
- }
-
- @Override
- public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
- addNode(node);
- return node;
- }
-
- @Override
- public void removeNode(NodeId nodeId) {
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- ControllerNode node = allNodes.remove(nodeId);
- if (node != null) {
- nodeStates.remove(nodeId);
- nodeVersions.remove(nodeId);
- notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_REMOVED, node));
- }
- }
-
- private void addNode(ControllerNode node) {
- allNodes.put(node.id(), node);
- updateNode(node.id(), node.equals(localNode) ? State.ACTIVE : State.INACTIVE, null);
- notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
- }
-
- private void updateNode(NodeId nodeId, State newState, Version newVersion) {
- State currentState = nodeStates.get(nodeId);
- Version currentVersion = nodeVersions.get(nodeId);
- if (!Objects.equals(currentState, newState)
- || (newVersion != null && !Objects.equals(currentVersion, newVersion))) {
- nodeStates.put(nodeId, newState);
- if (newVersion != null) {
- nodeVersions.put(nodeId, newVersion);
- }
- nodeLastUpdatedTimes.put(nodeId, Instant.now());
- notifyChange(nodeId, currentState, newState, currentVersion, newVersion);
- }
- }
-
- private void heartbeat() {
- try {
- Set<ControllerNode> peers = allNodes.values()
- .stream()
- .filter(node -> !(node.id().equals(localNode.id())))
- .collect(Collectors.toSet());
- State state = nodeStates.get(localNode.id());
- byte[] hbMessagePayload = SERIALIZER.encode(new HeartbeatMessage(localNode, state, localVersion));
- peers.forEach((node) -> {
- heartbeatToPeer(hbMessagePayload, node);
- State currentState = nodeStates.get(node.id());
- double phi = failureDetector.phi(node.id());
- if (phi >= phiFailureThreshold) {
- if (currentState.isActive()) {
- updateNode(node.id(), State.INACTIVE, null);
- failureDetector.reset(node.id());
- }
- } else {
- if (currentState == State.INACTIVE) {
- updateNode(node.id(), State.ACTIVE, null);
- }
- }
- });
- } catch (Exception e) {
- log.debug("Failed to send heartbeat", e);
- }
- }
-
- private void notifyChange(NodeId nodeId, State oldState, State newState, Version oldVersion, Version newVersion) {
- if (oldState != newState || !Objects.equals(oldVersion, newVersion)) {
- ControllerNode node = allNodes.get(nodeId);
- // Either this node or that node is no longer part of the same cluster
- if (node == null) {
- log.debug("Could not find node {} in the cluster, ignoring state change", nodeId);
- return;
- }
- ClusterEvent.Type type = newState == State.READY ? INSTANCE_READY :
- newState == State.ACTIVE ? INSTANCE_ACTIVATED :
- INSTANCE_DEACTIVATED;
- notifyDelegate(new ClusterEvent(type, node));
- }
- }
-
- private void heartbeatToPeer(byte[] messagePayload, ControllerNode peer) {
- Endpoint remoteEp = new Endpoint(peer.ip(), peer.tcpPort());
- messagingService.sendAsync(remoteEp, HEARTBEAT_MESSAGE, messagePayload).whenComplete((result, error) -> {
- if (error != null) {
- log.trace("Sending heartbeat to {} failed", remoteEp, error);
- }
- });
- }
-
- private class HeartbeatMessageHandler implements BiConsumer<Endpoint, byte[]> {
- @Override
- public void accept(Endpoint sender, byte[] message) {
- HeartbeatMessage hb = SERIALIZER.decode(message);
- if (clusterMetadataService.getClusterMetadata().getNodes().contains(hb.source())) {
- // Avoid reporting heartbeats that have been enqueued by setting a minimum interval.
- long heartbeatTime = System.currentTimeMillis();
- long lastHeartbeatTime = failureDetector.getLastHeartbeatTime(hb.source().id());
- if (heartbeatTime - lastHeartbeatTime > heartbeatInterval / 2) {
- failureDetector.report(hb.source().id(), heartbeatTime);
- }
- updateNode(hb.source().id(), hb.state, hb.version);
- }
- }
- }
-
- private static class HeartbeatMessage {
- private ControllerNode source;
- private State state;
- private Version version;
-
- public HeartbeatMessage(ControllerNode source, State state, Version version) {
- this.source = source;
- this.state = state != null ? state : State.ACTIVE;
- this.version = version;
- }
-
- public ControllerNode source() {
- return source;
- }
- }
-
- @Override
- public Instant getLastUpdatedInstant(NodeId nodeId) {
- return nodeLastUpdatedTimes.get(nodeId);
- }
-
- /**
- * Extracts properties from the component configuration.
- *
- */
- private void readComponentConfiguration() {
- Set<ConfigProperty> configProperties = cfgService.getProperties(getClass().getName());
- for (ConfigProperty property : configProperties) {
- if ("heartbeatInterval".equals(property.name())) {
- String s = property.value();
- if (s == null) {
- setHeartbeatInterval(HEARTBEAT_INTERVAL_DEFAULT);
- log.info("Heartbeat interval time is not configured, default value is {}",
- HEARTBEAT_INTERVAL_DEFAULT);
- } else {
- int newHeartbeatInterval = isNullOrEmpty(s) ? HEARTBEAT_INTERVAL_DEFAULT
- : Integer.parseInt(s.trim());
- if (newHeartbeatInterval > 0 && heartbeatInterval != newHeartbeatInterval) {
- heartbeatInterval = newHeartbeatInterval;
- restartHeartbeatSender();
- }
- log.info("Configured. Heartbeat interval time is configured to {}",
- heartbeatInterval);
- }
- }
- if ("phiFailureThreshold".equals(property.name())) {
- String s = property.value();
- if (s == null) {
- setPhiFailureThreshold(PHI_FAILURE_THRESHOLD_DEFAULT);
- log.info("Phi failure threshold is not configured, default value is {}",
- PHI_FAILURE_THRESHOLD_DEFAULT);
- } else {
- int newPhiFailureThreshold = isNullOrEmpty(s) ? HEARTBEAT_INTERVAL_DEFAULT
- : Integer.parseInt(s.trim());
- setPhiFailureThreshold(newPhiFailureThreshold);
- log.info("Configured. Phi failure threshold is configured to {}",
- phiFailureThreshold);
- }
- }
- if ("minStandardDeviationMillis".equals(property.name())) {
- String s = property.value();
- if (s == null) {
- setMinStandardDeviationMillis(MIN_STANDARD_DEVIATION_MILLIS_DEFAULT);
- log.info("Minimum standard deviation is not configured, default value is {}",
- MIN_STANDARD_DEVIATION_MILLIS_DEFAULT);
- } else {
- long newMinStandardDeviationMillis = isNullOrEmpty(s)
- ? MIN_STANDARD_DEVIATION_MILLIS_DEFAULT
- : Long.parseLong(s.trim());
- setMinStandardDeviationMillis(newMinStandardDeviationMillis);
- log.info("Configured. Minimum standard deviation is configured to {}",
- newMinStandardDeviationMillis);
- }
- }
- }
- }
-
- /**
- * Sets heartbeat interval between the termination of one execution of heartbeat
- * and the commencement of the next.
- *
- * @param interval term between each heartbeat
- */
- private void setHeartbeatInterval(int interval) {
- try {
- checkArgument(interval > 0, "Interval must be greater than zero");
- heartbeatInterval = interval;
- } catch (IllegalArgumentException e) {
- log.warn(e.getMessage());
- heartbeatInterval = HEARTBEAT_INTERVAL_DEFAULT;
- }
- }
-
- /**
- * Sets Phi failure threshold.
- * Phi is based on a paper titled: "The φ Accrual Failure Detector" by Hayashibara, et al.
- *
- * @param threshold
- */
- private void setPhiFailureThreshold(int threshold) {
- phiFailureThreshold = threshold;
- }
-
- /**
- * Sets the minimum standard deviation milliseconds.
- *
- * @param minStandardDeviationMillis the updated minimum standard deviation
- */
- private void setMinStandardDeviationMillis(long minStandardDeviationMillis) {
- this.minStandardDeviationMillis = minStandardDeviationMillis;
- try {
- failureDetector = new PhiAccrualFailureDetector(minStandardDeviationMillis);
- } catch (IllegalArgumentException e) {
- log.warn(e.getMessage());
- this.minStandardDeviationMillis = MIN_STANDARD_DEVIATION_MILLIS_DEFAULT;
- failureDetector = new PhiAccrualFailureDetector(this.minStandardDeviationMillis);
- }
- }
-
- /**
- * Restarts heartbeatSender executor.
- */
- private void restartHeartbeatSender() {
- try {
- ScheduledExecutorService prevSender = heartBeatSender;
- heartBeatSender = Executors.newSingleThreadScheduledExecutor(
- groupedThreads("onos/cluster/membership", "heartbeat-sender-%d", log));
- heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
- heartbeatInterval, TimeUnit.MILLISECONDS);
- prevSender.shutdown();
- } catch (Exception e) {
- log.warn(e.getMessage());
- }
- }
-}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java
deleted file mode 100644
index f59a3ed..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.messaging.impl;
-
-/**
- * State transitions a decoder goes through as it is decoding an incoming message.
- */
-public enum DecoderState {
- READ_TYPE,
- READ_PREAMBLE,
- READ_LOGICAL_TIME,
- READ_LOGICAL_COUNTER,
- READ_MESSAGE_ID,
- READ_SENDER_IP_VERSION,
- READ_SENDER_IP,
- READ_SENDER_PORT,
- READ_SUBJECT_LENGTH,
- READ_SUBJECT,
- READ_STATUS,
- READ_CONTENT_LENGTH,
- READ_CONTENT
-}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
deleted file mode 100644
index 14489d5..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.messaging.impl;
-
-import org.onosproject.core.HybridLogicalTime;
-
-/**
- * Base class for internal messages.
- */
-public abstract class InternalMessage {
-
- /**
- * Internal message type.
- */
- public enum Type {
- REQUEST(1),
- REPLY(2);
-
- private final int id;
-
- Type(int id) {
- this.id = id;
- }
-
- /**
- * Returns the unique message type ID.
- *
- * @return the unique message type ID.
- */
- public int id() {
- return id;
- }
-
- /**
- * Returns the message type enum associated with the given ID.
- *
- * @param id the type ID.
- * @return the type enum for the given ID.
- */
- public static Type forId(int id) {
- switch (id) {
- case 1:
- return REQUEST;
- case 2:
- return REPLY;
- default:
- throw new IllegalArgumentException("Unknown status ID " + id);
- }
- }
- }
-
- private final int preamble;
- private final HybridLogicalTime time;
- private final long id;
- private final byte[] payload;
-
- protected InternalMessage(int preamble,
- HybridLogicalTime time,
- long id,
- byte[] payload) {
- this.preamble = preamble;
- this.time = time;
- this.id = id;
- this.payload = payload;
- }
-
- public abstract Type type();
-
- public boolean isRequest() {
- return type() == Type.REQUEST;
- }
-
- public boolean isReply() {
- return type() == Type.REPLY;
- }
-
- public HybridLogicalTime time() {
- return time;
- }
-
- public int preamble() {
- return preamble;
- }
-
- public long id() {
- return id;
- }
-
- public byte[] payload() {
- return payload;
- }
-}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalReply.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalReply.java
deleted file mode 100644
index fc5706b..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalReply.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.messaging.impl;
-
-import com.google.common.base.MoreObjects;
-import org.onlab.util.ByteArraySizeHashPrinter;
-import org.onosproject.core.HybridLogicalTime;
-
-/**
- * Internal reply message.
- */
-public final class InternalReply extends InternalMessage {
-
- /**
- * Message status.
- */
- public enum Status {
-
- // NOTE: For backwards compatibility enum constant IDs should not be changed.
-
- /**
- * All ok.
- */
- OK(0),
-
- /**
- * Response status signifying no registered handler.
- */
- ERROR_NO_HANDLER(1),
-
- /**
- * Response status signifying an exception handling the message.
- */
- ERROR_HANDLER_EXCEPTION(2),
-
- /**
- * Response status signifying invalid message structure.
- */
- PROTOCOL_EXCEPTION(3);
-
- private final int id;
-
- Status(int id) {
- this.id = id;
- }
-
- /**
- * Returns the unique status ID.
- *
- * @return the unique status ID.
- */
- public int id() {
- return id;
- }
-
- /**
- * Returns the status enum associated with the given ID.
- *
- * @param id the status ID.
- * @return the status enum for the given ID.
- */
- public static Status forId(int id) {
- switch (id) {
- case 0:
- return OK;
- case 1:
- return ERROR_NO_HANDLER;
- case 2:
- return ERROR_HANDLER_EXCEPTION;
- case 3:
- return PROTOCOL_EXCEPTION;
- default:
- throw new IllegalArgumentException("Unknown status ID " + id);
- }
- }
- }
-
- private final Status status;
-
- public InternalReply(int preamble,
- HybridLogicalTime time,
- long id,
- Status status) {
- this(preamble, time, id, new byte[0], status);
- }
-
- public InternalReply(int preamble,
- HybridLogicalTime time,
- long id,
- byte[] payload,
- Status status) {
- super(preamble, time, id, payload);
- this.status = status;
- }
-
- @Override
- public Type type() {
- return Type.REPLY;
- }
-
- public Status status() {
- return status;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("time", time())
- .add("id", id())
- .add("status", status())
- .add("payload", ByteArraySizeHashPrinter.of(payload()))
- .toString();
- }
-}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalRequest.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalRequest.java
deleted file mode 100644
index a432fcb..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalRequest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.messaging.impl;
-
-import com.google.common.base.MoreObjects;
-
-import org.onlab.util.ByteArraySizeHashPrinter;
-import org.onosproject.core.HybridLogicalTime;
-import org.onosproject.store.cluster.messaging.Endpoint;
-
-/**
- * Internal request message.
- */
-public final class InternalRequest extends InternalMessage {
- private final Endpoint sender;
- private final String subject;
-
- public InternalRequest(int preamble,
- HybridLogicalTime time,
- long id,
- Endpoint sender,
- String subject,
- byte[] payload) {
- super(preamble, time, id, payload);
- this.sender = sender;
- this.subject = subject;
- }
-
- @Override
- public Type type() {
- return Type.REQUEST;
- }
-
- public String subject() {
- return subject;
- }
-
- public Endpoint sender() {
- return sender;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("time", time())
- .add("id", id())
- .add("subject", subject)
- .add("sender", sender)
- .add("payload", ByteArraySizeHashPrinter.of(payload()))
- .toString();
- }
-}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
deleted file mode 100644
index 4b9ef6c..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.messaging.impl;
-
-import com.google.common.base.Charsets;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ReplayingDecoder;
-
-import org.onlab.packet.IpAddress;
-import org.onlab.packet.IpAddress.Version;
-import org.onosproject.core.HybridLogicalTime;
-import org.onosproject.store.cluster.messaging.Endpoint;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * Decoder for inbound messages.
- */
-public class MessageDecoder extends ReplayingDecoder<DecoderState> {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private Version ipVersion;
- private IpAddress senderIp;
- private int senderPort;
-
- private InternalMessage.Type type;
- private int preamble;
- private long logicalTime;
- private long logicalCounter;
- private long messageId;
- private int contentLength;
- private byte[] content;
- private int subjectLength;
- private String subject;
- private InternalReply.Status status;
-
- public MessageDecoder() {
- super(DecoderState.READ_SENDER_IP_VERSION);
- }
-
- @Override
- @SuppressWarnings("squid:S128") // suppress switch fall through warning
- protected void decode(
- ChannelHandlerContext context,
- ByteBuf buffer,
- List<Object> out) throws Exception {
-
- switch (state()) {
- case READ_SENDER_IP_VERSION:
- ipVersion = buffer.readByte() == 0x0 ? Version.INET : Version.INET6;
- checkpoint(DecoderState.READ_SENDER_IP);
- // FALLTHROUGH
- case READ_SENDER_IP:
- byte[] octets = new byte[IpAddress.byteLength(ipVersion)];
- buffer.readBytes(octets);
- senderIp = IpAddress.valueOf(ipVersion, octets);
- checkpoint(DecoderState.READ_SENDER_PORT);
- // FALLTHROUGH
- case READ_SENDER_PORT:
- senderPort = buffer.readInt();
- checkpoint(DecoderState.READ_TYPE);
- // FALLTHROUGH
- case READ_TYPE:
- type = InternalMessage.Type.forId(buffer.readByte());
- checkpoint(DecoderState.READ_PREAMBLE);
- // FALLTHROUGH
- case READ_PREAMBLE:
- preamble = buffer.readInt();
- checkpoint(DecoderState.READ_LOGICAL_TIME);
- // FALLTHROUGH
- case READ_LOGICAL_TIME:
- logicalTime = buffer.readLong();
- checkpoint(DecoderState.READ_LOGICAL_COUNTER);
- // FALLTHROUGH
- case READ_LOGICAL_COUNTER:
- logicalCounter = buffer.readLong();
- checkpoint(DecoderState.READ_MESSAGE_ID);
- // FALLTHROUGH
- case READ_MESSAGE_ID:
- messageId = buffer.readLong();
- checkpoint(DecoderState.READ_CONTENT_LENGTH);
- // FALLTHROUGH
- case READ_CONTENT_LENGTH:
- contentLength = buffer.readInt();
- checkpoint(DecoderState.READ_CONTENT);
- // FALLTHROUGH
- case READ_CONTENT:
- if (contentLength > 0) {
- //TODO Perform a sanity check on the size before allocating
- content = new byte[contentLength];
- buffer.readBytes(content);
- } else {
- content = new byte[0];
- }
-
- switch (type) {
- case REQUEST:
- checkpoint(DecoderState.READ_SUBJECT_LENGTH);
- break;
- case REPLY:
- checkpoint(DecoderState.READ_STATUS);
- break;
- default:
- checkState(false, "Must not be here");
- }
- break;
- default:
- break;
- }
-
- switch (type) {
- case REQUEST:
- switch (state()) {
- case READ_SUBJECT_LENGTH:
- subjectLength = buffer.readShort();
- checkpoint(DecoderState.READ_SUBJECT);
- // FALLTHROUGH
- case READ_SUBJECT:
- byte[] messageTypeBytes = new byte[subjectLength];
- buffer.readBytes(messageTypeBytes);
- subject = new String(messageTypeBytes, Charsets.UTF_8);
- InternalRequest message = new InternalRequest(preamble,
- new HybridLogicalTime(logicalTime, logicalCounter),
- messageId,
- new Endpoint(senderIp, senderPort),
- subject,
- content);
- out.add(message);
- checkpoint(DecoderState.READ_TYPE);
- break;
- default:
- break;
- }
- break;
- case REPLY:
- switch (state()) {
- case READ_STATUS:
- status = InternalReply.Status.forId(buffer.readByte());
- InternalReply message = new InternalReply(preamble,
- new HybridLogicalTime(logicalTime, logicalCounter),
- messageId,
- content,
- status);
- out.add(message);
- checkpoint(DecoderState.READ_TYPE);
- break;
- default:
- break;
- }
- break;
- default:
- checkState(false, "Must not be here");
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
- log.error("Exception inside channel handling pipeline.", cause);
- context.close();
- }
-}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java
deleted file mode 100644
index 2542088..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.messaging.impl;
-
-import java.io.IOException;
-
-import com.google.common.base.Charsets;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.MessageToByteEncoder;
-import org.onlab.packet.IpAddress;
-import org.onlab.packet.IpAddress.Version;
-import org.onosproject.store.cluster.messaging.Endpoint;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Encode InternalMessage out into a byte buffer.
- */
-public class MessageEncoder extends MessageToByteEncoder<Object> {
-// Effectively MessageToByteEncoder<InternalMessage>,
-// had to specify <Object> to avoid Class Loader not being able to find some classes.
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private final Endpoint endpoint;
- private final int preamble;
- private boolean endpointWritten;
-
- public MessageEncoder(Endpoint endpoint, int preamble) {
- super();
- this.endpoint = endpoint;
- this.preamble = preamble;
- }
-
- @Override
- protected void encode(
- ChannelHandlerContext context,
- Object rawMessage,
- ByteBuf out) throws Exception {
- if (rawMessage instanceof InternalRequest) {
- encodeRequest((InternalRequest) rawMessage, out);
- } else if (rawMessage instanceof InternalReply) {
- encodeReply((InternalReply) rawMessage, out);
- }
- }
-
- private void encodeMessage(InternalMessage message, ByteBuf out) {
- // If the endpoint hasn't been written to the channel, write it.
- if (!endpointWritten) {
- IpAddress senderIp = endpoint.host();
- if (senderIp.version() == Version.INET) {
- out.writeByte(0);
- } else {
- out.writeByte(1);
- }
- out.writeBytes(senderIp.toOctets());
-
- // write sender port
- out.writeInt(endpoint.port());
-
- endpointWritten = true;
- }
-
- out.writeByte(message.type().id());
- out.writeInt(this.preamble);
-
- // write time
- out.writeLong(message.time().logicalTime());
- out.writeLong(message.time().logicalCounter());
-
- // write message id
- out.writeLong(message.id());
-
- byte[] payload = message.payload();
-
- // write payload length
- out.writeInt(payload.length);
-
- // write payload.
- out.writeBytes(payload);
- }
-
- private void encodeRequest(InternalRequest request, ByteBuf out) {
- encodeMessage(request, out);
-
- byte[] messageTypeBytes = request.subject().getBytes(Charsets.UTF_8);
-
- // write length of message type
- out.writeShort(messageTypeBytes.length);
-
- // write message type bytes
- out.writeBytes(messageTypeBytes);
-
- }
-
- private void encodeReply(InternalReply reply, ByteBuf out) {
- encodeMessage(reply, out);
-
- // write message status value
- out.writeByte(reply.status().id());
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
- if (cause instanceof IOException) {
- log.debug("IOException inside channel handling pipeline.", cause);
- } else {
- log.error("non-IOException inside channel handling pipeline.", cause);
- }
- context.close();
- }
-
- // Effectively same result as one generated by MessageToByteEncoder<InternalMessage>
- @Override
- public final boolean acceptOutboundMessage(Object msg) throws Exception {
- return msg instanceof InternalMessage;
- }
-}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
deleted file mode 100644
index 53a6cf2..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ /dev/null
@@ -1,1084 +0,0 @@
-/*
- * Copyright 2015-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.messaging.impl;
-
-import com.google.common.base.Throwables;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.MoreExecutors;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.ServerChannel;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.WriteBufferWaterMark;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollServerSocketChannel;
-import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
-import org.apache.commons.math3.stat.descriptive.SynchronizedDescriptiveStatistics;
-import org.onosproject.cluster.ClusterMetadataService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.core.HybridLogicalClockService;
-import org.onosproject.store.cluster.messaging.Endpoint;
-import org.onosproject.store.cluster.messaging.MessagingException;
-import org.onosproject.store.cluster.messaging.MessagingService;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.TrustManagerFactory;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.net.ConnectException;
-import java.security.KeyStore;
-import java.security.MessageDigest;
-import java.security.PublicKey;
-import java.security.cert.Certificate;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.StringJoiner;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.security.AppGuard.checkPermission;
-import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
-
-/**
- * Netty based MessagingService.
- */
-@Component(enabled = false, service = MessagingService.class)
-public class NettyMessagingManager implements MessagingService {
- private static final long HISTORY_EXPIRE_MILLIS = Duration.ofMinutes(1).toMillis();
- private static final long TIMEOUT_INTERVAL = 50;
- private static final int WINDOW_SIZE = 60;
- private static final int WINDOW_UPDATE_SAMPLE_SIZE = 100;
- private static final long WINDOW_UPDATE_MILLIS = 10000;
- private static final int MIN_SAMPLES = 25;
- private static final int MIN_STANDARD_DEVIATION = 100;
- private static final int PHI_FAILURE_THRESHOLD = 12;
- private static final long MIN_TIMEOUT_MILLIS = 100;
- private static final long MAX_TIMEOUT_MILLIS = 5000;
- private static final int CHANNEL_POOL_SIZE = 8;
-
- private static final byte[] EMPTY_PAYLOAD = new byte[0];
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private final LocalClientConnection localClientConnection = new LocalClientConnection();
- private final LocalServerConnection localServerConnection = new LocalServerConnection(null);
-
- //TODO CONFIG_DIR is duplicated from ConfigFileBasedClusterMetadataProvider
- private static final String CONFIG_DIR = "../config";
- private static final String KS_FILE_NAME = "onos.jks";
- private static final File DEFAULT_KS_FILE = new File(CONFIG_DIR, KS_FILE_NAME);
- private static final String DEFAULT_KS_PASSWORD = "changeit";
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected HybridLogicalClockService clockService;
-
- private Endpoint localEndpoint;
- private int preamble;
- private final AtomicBoolean started = new AtomicBoolean(false);
- private final Map<String, BiConsumer<InternalRequest, ServerConnection>> handlers = new ConcurrentHashMap<>();
- private final Map<Channel, RemoteClientConnection> clientConnections = Maps.newConcurrentMap();
- private final Map<Channel, RemoteServerConnection> serverConnections = Maps.newConcurrentMap();
- private final AtomicLong messageIdGenerator = new AtomicLong(0);
-
- private ScheduledFuture<?> timeoutFuture;
-
- private final Map<Endpoint, List<CompletableFuture<Channel>>> channels = Maps.newConcurrentMap();
-
- private EventLoopGroup serverGroup;
- private EventLoopGroup clientGroup;
- private Class<? extends ServerChannel> serverChannelClass;
- private Class<? extends Channel> clientChannelClass;
- private ScheduledExecutorService timeoutExecutor;
-
- protected static final boolean TLS_ENABLED = true;
- protected static final boolean TLS_DISABLED = false;
- protected boolean enableNettyTls = TLS_ENABLED;
-
- protected TrustManagerFactory trustManager;
- protected KeyManagerFactory keyManager;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected ClusterMetadataService clusterMetadataService;
-
- @Activate
- public void activate() throws InterruptedException {
- ControllerNode localNode = clusterMetadataService.getLocalNode();
- getTlsParameters();
-
- if (started.get()) {
- log.warn("Already running at local endpoint: {}", localEndpoint);
- return;
- }
- this.preamble = clusterMetadataService.getClusterMetadata().getName().hashCode();
- this.localEndpoint = new Endpoint(localNode.ip(), localNode.tcpPort());
- initEventLoopGroup();
- startAcceptingConnections();
- timeoutExecutor = Executors.newSingleThreadScheduledExecutor(
- groupedThreads("NettyMessagingEvt", "timeout", log));
- timeoutFuture = timeoutExecutor.scheduleAtFixedRate(
- this::timeoutAllCallbacks, TIMEOUT_INTERVAL, TIMEOUT_INTERVAL, TimeUnit.MILLISECONDS);
- started.set(true);
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- if (started.get()) {
- serverGroup.shutdownGracefully();
- clientGroup.shutdownGracefully();
- timeoutFuture.cancel(false);
- timeoutExecutor.shutdown();
- started.set(false);
- }
- log.info("Stopped");
- }
-
- private void getTlsParameters() {
- // default is TLS enabled unless key stores cannot be loaded
- enableNettyTls = Boolean.parseBoolean(System.getProperty("enableNettyTLS", Boolean.toString(TLS_ENABLED)));
-
- if (enableNettyTls) {
- enableNettyTls = loadKeyStores();
- }
- }
-
- private boolean loadKeyStores() {
- // Maintain a local copy of the trust and key managers in case anything goes wrong
- TrustManagerFactory tmf;
- KeyManagerFactory kmf;
- try {
- String ksLocation = System.getProperty("javax.net.ssl.keyStore", DEFAULT_KS_FILE.toString());
- String tsLocation = System.getProperty("javax.net.ssl.trustStore", DEFAULT_KS_FILE.toString());
- char[] ksPwd = System.getProperty("javax.net.ssl.keyStorePassword", DEFAULT_KS_PASSWORD).toCharArray();
- char[] tsPwd = System.getProperty("javax.net.ssl.trustStorePassword", DEFAULT_KS_PASSWORD).toCharArray();
-
- tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
- KeyStore ts = KeyStore.getInstance("JKS");
- ts.load(new FileInputStream(tsLocation), tsPwd);
- tmf.init(ts);
-
- kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
- KeyStore ks = KeyStore.getInstance("JKS");
- ks.load(new FileInputStream(ksLocation), ksPwd);
- kmf.init(ks, ksPwd);
- if (log.isInfoEnabled()) {
- logKeyStore(ks, ksLocation, ksPwd);
- }
- } catch (FileNotFoundException e) {
- log.warn("Disabling TLS for intra-cluster messaging; Could not load cluster key store: {}", e.getMessage());
- return TLS_DISABLED;
- } catch (Exception e) {
- //TODO we might want to catch exceptions more specifically
- log.error("Error loading key store; disabling TLS for intra-cluster messaging", e);
- return TLS_DISABLED;
- }
- this.trustManager = tmf;
- this.keyManager = kmf;
- return TLS_ENABLED;
- }
-
- private void logKeyStore(KeyStore ks, String ksLocation, char[] ksPwd) {
- if (log.isInfoEnabled()) {
- log.info("Loaded cluster key store from: {}", ksLocation);
- try {
- for (Enumeration<String> e = ks.aliases(); e.hasMoreElements();) {
- String alias = e.nextElement();
- Certificate cert = ks.getCertificate(alias);
- if (cert == null) {
- log.info("No certificate for alias {}", alias);
- continue;
- }
- PublicKey key = cert.getPublicKey();
- // Compute the certificate's fingerprint (use the key if certificate cannot be found)
- MessageDigest digest = MessageDigest.getInstance("SHA1");
- digest.update(key.getEncoded());
- StringJoiner fingerprint = new StringJoiner(":");
- for (byte b : digest.digest()) {
- fingerprint.add(String.format("%02X", b));
- }
- log.info("{} -> {}", alias, fingerprint);
- }
- } catch (Exception e) {
- log.warn("Unable to print contents of key store: {}", ksLocation, e);
- }
- }
- }
-
- private void initEventLoopGroup() {
- // try Epoll first and if that does work, use nio.
- try {
- clientGroup = new EpollEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "epollC-%d", log));
- serverGroup = new EpollEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "epollS-%d", log));
- serverChannelClass = EpollServerSocketChannel.class;
- clientChannelClass = EpollSocketChannel.class;
- return;
- } catch (Throwable e) {
- log.debug("Failed to initialize native (epoll) transport. "
- + "Reason: {}. Proceeding with nio.", e.getMessage());
- }
- clientGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioC-%d", log));
- serverGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioS-%d", log));
- serverChannelClass = NioServerSocketChannel.class;
- clientChannelClass = NioSocketChannel.class;
- }
-
- /**
- * Times out response callbacks.
- */
- private void timeoutAllCallbacks() {
- // Iterate through all connections and time out callbacks.
- localClientConnection.timeoutCallbacks();
- for (RemoteClientConnection connection : clientConnections.values()) {
- connection.timeoutCallbacks();
- }
- }
-
- @Override
- public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
- checkPermission(CLUSTER_WRITE);
- InternalRequest message = new InternalRequest(preamble,
- clockService.timeNow(),
- messageIdGenerator.incrementAndGet(),
- localEndpoint,
- type,
- payload);
- return executeOnPooledConnection(ep, type, c -> c.sendAsync(message), MoreExecutors.directExecutor());
- }
-
- @Override
- public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
- checkPermission(CLUSTER_WRITE);
- return sendAndReceive(ep, type, payload, MoreExecutors.directExecutor());
- }
-
- @Override
- public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
- checkPermission(CLUSTER_WRITE);
- long messageId = messageIdGenerator.incrementAndGet();
- InternalRequest message = new InternalRequest(preamble,
- clockService.timeNow(),
- messageId,
- localEndpoint,
- type,
- payload);
- return executeOnPooledConnection(ep, type, c -> c.sendAndReceive(message), executor);
- }
-
- private List<CompletableFuture<Channel>> getChannelPool(Endpoint endpoint) {
- return channels.computeIfAbsent(endpoint, e -> {
- List<CompletableFuture<Channel>> defaultList = new ArrayList<>(CHANNEL_POOL_SIZE);
- for (int i = 0; i < CHANNEL_POOL_SIZE; i++) {
- defaultList.add(null);
- }
- return Lists.newCopyOnWriteArrayList(defaultList);
- });
- }
-
- private int getChannelOffset(String messageType) {
- return Math.abs(messageType.hashCode() % CHANNEL_POOL_SIZE);
- }
-
- private CompletableFuture<Channel> getChannel(Endpoint endpoint, String messageType) {
- List<CompletableFuture<Channel>> channelPool = getChannelPool(endpoint);
- int offset = getChannelOffset(messageType);
-
- CompletableFuture<Channel> channelFuture = channelPool.get(offset);
- if (channelFuture == null || channelFuture.isCompletedExceptionally()) {
- synchronized (channelPool) {
- channelFuture = channelPool.get(offset);
- if (channelFuture == null || channelFuture.isCompletedExceptionally()) {
- channelFuture = openChannel(endpoint);
- channelPool.set(offset, channelFuture);
- }
- }
- }
-
- CompletableFuture<Channel> future = new CompletableFuture<>();
- final CompletableFuture<Channel> finalFuture = channelFuture;
- finalFuture.whenComplete((channel, error) -> {
- if (error == null) {
- if (!channel.isActive()) {
- CompletableFuture<Channel> currentFuture;
- synchronized (channelPool) {
- currentFuture = channelPool.get(offset);
- if (currentFuture == finalFuture) {
- channelPool.set(offset, null);
- }
- }
-
- ClientConnection connection = clientConnections.remove(channel);
- if (connection != null) {
- connection.close();
- }
-
- if (currentFuture == finalFuture) {
- getChannel(endpoint, messageType).whenComplete((recursiveResult, recursiveError) -> {
- if (recursiveError == null) {
- future.complete(recursiveResult);
- } else {
- future.completeExceptionally(recursiveError);
- }
- });
- } else {
- currentFuture.whenComplete((recursiveResult, recursiveError) -> {
- if (recursiveError == null) {
- future.complete(recursiveResult);
- } else {
- future.completeExceptionally(recursiveError);
- }
- });
- }
- } else {
- future.complete(channel);
- }
- } else {
- future.completeExceptionally(error);
- }
- });
- return future;
- }
-
- private <T> CompletableFuture<T> executeOnPooledConnection(
- Endpoint endpoint,
- String type,
- Function<ClientConnection, CompletableFuture<T>> callback,
- Executor executor) {
- CompletableFuture<T> future = new CompletableFuture<T>();
- executeOnPooledConnection(endpoint, type, callback, executor, future);
- return future;
- }
-
- private <T> void executeOnPooledConnection(
- Endpoint endpoint,
- String type,
- Function<ClientConnection, CompletableFuture<T>> callback,
- Executor executor,
- CompletableFuture<T> future) {
- if (endpoint.equals(localEndpoint)) {
- callback.apply(localClientConnection).whenComplete((result, error) -> {
- if (error == null) {
- executor.execute(() -> future.complete(result));
- } else {
- executor.execute(() -> future.completeExceptionally(error));
- }
- });
- return;
- }
-
- getChannel(endpoint, type).whenComplete((channel, channelError) -> {
- if (channelError == null) {
- ClientConnection connection = clientConnections.computeIfAbsent(channel, RemoteClientConnection::new);
- callback.apply(connection).whenComplete((result, sendError) -> {
- if (sendError == null) {
- executor.execute(() -> future.complete(result));
- } else {
- Throwable cause = Throwables.getRootCause(sendError);
- if (!(cause instanceof TimeoutException) && !(cause instanceof MessagingException)) {
- channel.close().addListener(f -> {
- connection.close();
- clientConnections.remove(channel);
- });
- }
- executor.execute(() -> future.completeExceptionally(sendError));
- }
- });
- } else {
- executor.execute(() -> future.completeExceptionally(channelError));
- }
- });
- }
-
- @Override
- public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
- checkPermission(CLUSTER_WRITE);
- handlers.put(type, (message, connection) -> executor.execute(() ->
- handler.accept(message.sender(), message.payload())));
- }
-
- @Override
- public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
- checkPermission(CLUSTER_WRITE);
- handlers.put(type, (message, connection) -> executor.execute(() -> {
- byte[] responsePayload = null;
- InternalReply.Status status = InternalReply.Status.OK;
- try {
- responsePayload = handler.apply(message.sender(), message.payload());
- } catch (Exception e) {
- log.debug("An error occurred in a message handler: {}", e);
- status = InternalReply.Status.ERROR_HANDLER_EXCEPTION;
- }
- connection.reply(message, status, Optional.ofNullable(responsePayload));
- }));
- }
-
- @Override
- public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
- checkPermission(CLUSTER_WRITE);
- handlers.put(type, (message, connection) -> {
- handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
- InternalReply.Status status;
- if (error == null) {
- status = InternalReply.Status.OK;
- } else {
- log.debug("An error occurred in a message handler: {}", error);
- status = InternalReply.Status.ERROR_HANDLER_EXCEPTION;
- }
- connection.reply(message, status, Optional.ofNullable(result));
- });
- });
- }
-
- @Override
- public void unregisterHandler(String type) {
- checkPermission(CLUSTER_WRITE);
- handlers.remove(type);
- }
-
- private Bootstrap bootstrapClient(Endpoint endpoint) {
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
- bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
- new WriteBufferWaterMark(10 * 32 * 1024, 10 * 64 * 1024));
- bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
- bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
- bootstrap.group(clientGroup);
- bootstrap.channel(clientChannelClass);
- bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
- bootstrap.remoteAddress(endpoint.host().toInetAddress(), endpoint.port());
- if (enableNettyTls) {
- bootstrap.handler(new SslClientCommunicationChannelInitializer());
- } else {
- bootstrap.handler(new BasicChannelInitializer());
- }
- return bootstrap;
- }
-
- private void startAcceptingConnections() throws InterruptedException {
- ServerBootstrap b = new ServerBootstrap();
- b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
- new WriteBufferWaterMark(8 * 1024, 32 * 1024));
- b.option(ChannelOption.SO_RCVBUF, 1048576);
- b.childOption(ChannelOption.SO_KEEPALIVE, true);
- b.childOption(ChannelOption.TCP_NODELAY, true);
- b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
- b.group(serverGroup, clientGroup);
- b.channel(serverChannelClass);
- if (enableNettyTls) {
- b.childHandler(new SslServerCommunicationChannelInitializer());
- } else {
- b.childHandler(new BasicChannelInitializer());
- }
- b.option(ChannelOption.SO_BACKLOG, 128);
- b.childOption(ChannelOption.SO_KEEPALIVE, true);
-
- // Bind and start to accept incoming connections.
- b.bind(localEndpoint.port()).sync().addListener(future -> {
- if (future.isSuccess()) {
- log.info("{} accepting incoming connections on port {}",
- localEndpoint.host(), localEndpoint.port());
- } else {
- log.warn("{} failed to bind to port {} due to {}",
- localEndpoint.host(), localEndpoint.port(), future.cause());
- }
- });
- }
-
- private CompletableFuture<Channel> openChannel(Endpoint ep) {
- Bootstrap bootstrap = bootstrapClient(ep);
- CompletableFuture<Channel> retFuture = new CompletableFuture<>();
- ChannelFuture f = bootstrap.connect();
-
- f.addListener(future -> {
- if (future.isSuccess()) {
- retFuture.complete(f.channel());
- } else {
- retFuture.completeExceptionally(future.cause());
- }
- });
- log.debug("Established a new connection to {}", ep);
- return retFuture;
- }
-
- /**
- * Channel initializer for TLS servers.
- */
- private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
- private final ChannelHandler dispatcher = new InboundMessageDispatcher();
-
- @Override
- protected void initChannel(SocketChannel channel) throws Exception {
- SSLContext serverContext = SSLContext.getInstance("TLS");
- serverContext.init(keyManager.getKeyManagers(), trustManager.getTrustManagers(), null);
-
- SSLEngine serverSslEngine = serverContext.createSSLEngine();
-
- serverSslEngine.setNeedClientAuth(true);
- serverSslEngine.setUseClientMode(false);
- serverSslEngine.setEnabledProtocols(serverSslEngine.getSupportedProtocols());
- serverSslEngine.setEnabledCipherSuites(serverSslEngine.getSupportedCipherSuites());
- serverSslEngine.setEnableSessionCreation(true);
-
- channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
- .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
- .addLast("decoder", new MessageDecoder())
- .addLast("handler", dispatcher);
- }
- }
-
- /**
- * Channel initializer for TLS clients.
- */
- private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
- private final ChannelHandler dispatcher = new InboundMessageDispatcher();
-
- @Override
- protected void initChannel(SocketChannel channel) throws Exception {
- SSLContext clientContext = SSLContext.getInstance("TLS");
- clientContext.init(keyManager.getKeyManagers(), trustManager.getTrustManagers(), null);
-
- SSLEngine clientSslEngine = clientContext.createSSLEngine();
-
- clientSslEngine.setUseClientMode(true);
- clientSslEngine.setEnabledProtocols(clientSslEngine.getSupportedProtocols());
- clientSslEngine.setEnabledCipherSuites(clientSslEngine.getSupportedCipherSuites());
- clientSslEngine.setEnableSessionCreation(true);
-
- channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
- .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
- .addLast("decoder", new MessageDecoder())
- .addLast("handler", dispatcher);
- }
- }
-
- /**
- * Channel initializer for basic connections.
- */
- private class BasicChannelInitializer extends ChannelInitializer<SocketChannel> {
- private final ChannelHandler dispatcher = new InboundMessageDispatcher();
-
- @Override
- protected void initChannel(SocketChannel channel) throws Exception {
- channel.pipeline()
- .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
- .addLast("decoder", new MessageDecoder())
- .addLast("handler", dispatcher);
- }
- }
-
- /**
- * Channel inbound handler that dispatches messages to the appropriate handler.
- */
- @ChannelHandler.Sharable
- private class InboundMessageDispatcher extends SimpleChannelInboundHandler<Object> {
- // Effectively SimpleChannelInboundHandler<InternalMessage>,
- // had to specify <Object> to avoid Class Loader not being able to find some classes.
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, Object rawMessage) throws Exception {
- InternalMessage message = (InternalMessage) rawMessage;
- try {
- if (message.isRequest()) {
- RemoteServerConnection connection =
- serverConnections.computeIfAbsent(ctx.channel(), RemoteServerConnection::new);
- connection.dispatch((InternalRequest) message);
- } else {
- RemoteClientConnection connection =
- clientConnections.computeIfAbsent(ctx.channel(), RemoteClientConnection::new);
- connection.dispatch((InternalReply) message);
- }
- } catch (RejectedExecutionException e) {
- log.warn("Unable to dispatch message due to {}", e.getMessage());
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
- log.error("Exception inside channel handling pipeline.", cause);
-
- RemoteClientConnection clientConnection = clientConnections.remove(context.channel());
- if (clientConnection != null) {
- clientConnection.close();
- }
-
- RemoteServerConnection serverConnection = serverConnections.remove(context.channel());
- if (serverConnection != null) {
- serverConnection.close();
- }
- context.close();
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext context) throws Exception {
- RemoteClientConnection clientConnection = clientConnections.remove(context.channel());
- if (clientConnection != null) {
- clientConnection.close();
- }
-
- RemoteServerConnection serverConnection = serverConnections.remove(context.channel());
- if (serverConnection != null) {
- serverConnection.close();
- }
- context.close();
- }
-
- /**
- * Returns true if the given message should be handled.
- *
- * @param msg inbound message
- * @return true if {@code msg} is {@link InternalMessage} instance.
- * @see SimpleChannelInboundHandler#acceptInboundMessage(Object)
- */
- @Override
- public final boolean acceptInboundMessage(Object msg) {
- return msg instanceof InternalMessage;
- }
- }
-
- /**
- * Wraps a {@link CompletableFuture} and tracks its type and creation time.
- */
- private final class Callback {
- private final String type;
- private final CompletableFuture<byte[]> future;
- private final long time = System.currentTimeMillis();
-
- Callback(String type, CompletableFuture<byte[]> future) {
- this.type = type;
- this.future = future;
- }
-
- public void complete(byte[] value) {
- future.complete(value);
- }
-
- public void completeExceptionally(Throwable error) {
- future.completeExceptionally(error);
- }
- }
-
- /**
- * Represents the client side of a connection to a local or remote server.
- */
- private interface ClientConnection {
-
- /**
- * Sends a message to the other side of the connection.
- *
- * @param message the message to send
- * @return a completable future to be completed once the message has been sent
- */
- CompletableFuture<Void> sendAsync(InternalRequest message);
-
- /**
- * Sends a message to the other side of the connection, awaiting a reply.
- *
- * @param message the message to send
- * @return a completable future to be completed once a reply is received or the request times out
- */
- CompletableFuture<byte[]> sendAndReceive(InternalRequest message);
-
- /**
- * Closes the connection.
- */
- default void close() {
- }
- }
-
- /**
- * Represents the server side of a connection.
- */
- private interface ServerConnection {
-
- /**
- * Sends a reply to the other side of the connection.
- *
- * @param message the message to which to reply
- * @param status the reply status
- * @param payload the response payload
- */
- void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload);
-
- /**
- * Closes the connection.
- */
- default void close() {
- }
- }
-
- /**
- * Remote connection implementation.
- */
- private abstract class AbstractClientConnection implements ClientConnection {
- private final Map<Long, Callback> futures = Maps.newConcurrentMap();
- private final AtomicBoolean closed = new AtomicBoolean(false);
- private final Cache<String, RequestMonitor> requestMonitors = CacheBuilder.newBuilder()
- .expireAfterAccess(HISTORY_EXPIRE_MILLIS, TimeUnit.MILLISECONDS)
- .build();
-
- /**
- * Times out callbacks for this connection.
- */
- void timeoutCallbacks() {
- // Store the current time.
- long currentTime = System.currentTimeMillis();
-
- // Iterate through future callbacks and time out callbacks that have been alive
- // longer than the current timeout according to the message type.
- Iterator<Map.Entry<Long, Callback>> iterator = futures.entrySet().iterator();
- while (iterator.hasNext()) {
- Callback callback = iterator.next().getValue();
- try {
- RequestMonitor requestMonitor = requestMonitors.get(callback.type, RequestMonitor::new);
- long elapsedTime = currentTime - callback.time;
- if (elapsedTime > MAX_TIMEOUT_MILLIS ||
- (elapsedTime > MIN_TIMEOUT_MILLIS && requestMonitor.isTimedOut(elapsedTime))) {
- iterator.remove();
- requestMonitor.addReplyTime(elapsedTime);
- callback.completeExceptionally(
- new TimeoutException("Request timed out in " + elapsedTime + " milliseconds"));
- }
- } catch (ExecutionException e) {
- throw new AssertionError();
- }
- }
- }
-
- protected void registerCallback(long id, String subject, CompletableFuture<byte[]> future) {
- futures.put(id, new Callback(subject, future));
- }
-
- protected Callback completeCallback(long id) {
- Callback callback = futures.remove(id);
- if (callback != null) {
- try {
- RequestMonitor requestMonitor = requestMonitors.get(callback.type, RequestMonitor::new);
- requestMonitor.addReplyTime(System.currentTimeMillis() - callback.time);
- } catch (ExecutionException e) {
- throw new AssertionError();
- }
- }
- return callback;
- }
-
- protected Callback failCallback(long id) {
- return futures.remove(id);
- }
-
- @Override
- public void close() {
- if (closed.compareAndSet(false, true)) {
- for (Callback callback : futures.values()) {
- callback.completeExceptionally(new ConnectException());
- }
- }
- }
- }
-
- /**
- * Local connection implementation.
- */
- private final class LocalClientConnection extends AbstractClientConnection {
- @Override
- public CompletableFuture<Void> sendAsync(InternalRequest message) {
- BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
- if (handler != null) {
- handler.accept(message, localServerConnection);
- } else {
- log.debug("No handler for message type {} from {}", message.type(), message.sender());
- }
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
- CompletableFuture<byte[]> future = new CompletableFuture<>();
- future.whenComplete((r, e) -> completeCallback(message.id()));
- registerCallback(message.id(), message.subject(), future);
- BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
- if (handler != null) {
- handler.accept(message, new LocalServerConnection(future));
- } else {
- log.debug("No handler for message type {} from {}", message.type(), message.sender());
- new LocalServerConnection(future)
- .reply(message, InternalReply.Status.ERROR_NO_HANDLER, Optional.empty());
- }
- return future;
- }
- }
-
- /**
- * Local server connection.
- */
- private final class LocalServerConnection implements ServerConnection {
- private final CompletableFuture<byte[]> future;
-
- LocalServerConnection(CompletableFuture<byte[]> future) {
- this.future = future;
- }
-
- @Override
- public void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload) {
- if (future != null) {
- if (status == InternalReply.Status.OK) {
- future.complete(payload.orElse(EMPTY_PAYLOAD));
- } else if (status == InternalReply.Status.ERROR_NO_HANDLER) {
- future.completeExceptionally(new MessagingException.NoRemoteHandler());
- } else if (status == InternalReply.Status.ERROR_HANDLER_EXCEPTION) {
- future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
- } else if (status == InternalReply.Status.PROTOCOL_EXCEPTION) {
- future.completeExceptionally(new MessagingException.ProtocolException());
- }
- }
- }
- }
-
- /**
- * Remote connection implementation.
- */
- private final class RemoteClientConnection extends AbstractClientConnection {
- private final Channel channel;
-
- RemoteClientConnection(Channel channel) {
- this.channel = channel;
- }
-
- @Override
- public CompletableFuture<Void> sendAsync(InternalRequest message) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- channel.writeAndFlush(message).addListener(channelFuture -> {
- if (!channelFuture.isSuccess()) {
- future.completeExceptionally(channelFuture.cause());
- } else {
- future.complete(null);
- }
- });
- return future;
- }
-
- @Override
- public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
- CompletableFuture<byte[]> future = new CompletableFuture<>();
- registerCallback(message.id(), message.subject(), future);
- channel.writeAndFlush(message).addListener(channelFuture -> {
- if (!channelFuture.isSuccess()) {
- Callback callback = failCallback(message.id());
- if (callback != null) {
- callback.completeExceptionally(channelFuture.cause());
- }
- }
- });
- return future;
- }
-
- /**
- * Dispatches a message to a local handler.
- *
- * @param message the message to dispatch
- */
- private void dispatch(InternalReply message) {
- if (message.preamble() != preamble) {
- log.debug("Received {} with invalid preamble", message.type());
- return;
- }
-
- clockService.recordEventTime(message.time());
-
- Callback callback = completeCallback(message.id());
- if (callback != null) {
- if (message.status() == InternalReply.Status.OK) {
- callback.complete(message.payload());
- } else if (message.status() == InternalReply.Status.ERROR_NO_HANDLER) {
- callback.completeExceptionally(new MessagingException.NoRemoteHandler());
- } else if (message.status() == InternalReply.Status.ERROR_HANDLER_EXCEPTION) {
- callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
- } else if (message.status() == InternalReply.Status.PROTOCOL_EXCEPTION) {
- callback.completeExceptionally(new MessagingException.ProtocolException());
- }
- } else {
- log.debug("Received a reply for message id:[{}] "
- + "but was unable to locate the"
- + " request handle", message.id());
- }
- }
- }
-
- /**
- * Remote server connection.
- */
- private final class RemoteServerConnection implements ServerConnection {
- private final Channel channel;
-
- RemoteServerConnection(Channel channel) {
- this.channel = channel;
- }
-
- /**
- * Dispatches a message to a local handler.
- *
- * @param message the message to dispatch
- */
- private void dispatch(InternalRequest message) {
- if (message.preamble() != preamble) {
- log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
- reply(message, InternalReply.Status.PROTOCOL_EXCEPTION, Optional.empty());
- return;
- }
-
- clockService.recordEventTime(message.time());
-
- BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
- if (handler != null) {
- handler.accept(message, this);
- } else {
- log.debug("No handler for message type {} from {}", message.type(), message.sender());
- reply(message, InternalReply.Status.ERROR_NO_HANDLER, Optional.empty());
- }
- }
-
- @Override
- public void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload) {
- InternalReply response = new InternalReply(preamble,
- clockService.timeNow(),
- message.id(),
- payload.orElse(EMPTY_PAYLOAD),
- status);
- channel.writeAndFlush(response);
- }
- }
-
- /**
- * Request-reply timeout history tracker.
- */
- private static final class RequestMonitor {
- private final DescriptiveStatistics samples = new SynchronizedDescriptiveStatistics(WINDOW_SIZE);
- private final AtomicLong max = new AtomicLong();
- private volatile int replyCount;
- private volatile long lastUpdate = System.currentTimeMillis();
-
- /**
- * Adds a reply time to the history.
- *
- * @param replyTime the reply time to add to the history
- */
- void addReplyTime(long replyTime) {
- max.accumulateAndGet(replyTime, Math::max);
-
- // If at least WINDOW_UPDATE_SAMPLE_SIZE response times have been recorded, and at least
- // WINDOW_UPDATE_MILLIS have passed since the last update, record the maximum response time in the samples.
- int replyCount = ++this.replyCount;
- if (replyCount >= WINDOW_UPDATE_SAMPLE_SIZE
- && System.currentTimeMillis() - lastUpdate > WINDOW_UPDATE_MILLIS) {
- synchronized (this) {
- if (System.currentTimeMillis() - lastUpdate > WINDOW_UPDATE_MILLIS) {
- long lastMax = max.get();
- if (lastMax > 0) {
- samples.addValue(lastMax);
- lastUpdate = System.currentTimeMillis();
- this.replyCount = 0;
- max.set(0);
- }
- }
- }
- }
- }
-
- /**
- * Returns a boolean indicating whether the given request should be timed out according to the elapsed time.
- *
- * @param elapsedTime the elapsed request time
- * @return indicates whether the request should be timed out
- */
- boolean isTimedOut(long elapsedTime) {
- return samples.getN() == WINDOW_SIZE && phi(elapsedTime) >= PHI_FAILURE_THRESHOLD;
- }
-
- /**
- * Compute phi for the specified node id.
- *
- * @param elapsedTime the duration since the request was sent
- * @return phi value
- */
- private double phi(long elapsedTime) {
- if (samples.getN() < MIN_SAMPLES) {
- return 0.0;
- }
- return computePhi(samples, elapsedTime);
- }
-
- /**
- * Computes the phi value from the given samples.
- *
- * @param samples the samples from which to compute phi
- * @param elapsedTime the duration since the request was sent
- * @return phi
- */
- private double computePhi(DescriptiveStatistics samples, long elapsedTime) {
- double meanMillis = samples.getMean();
- double y = (elapsedTime - meanMillis) / Math.max(samples.getStandardDeviation(), MIN_STANDARD_DEVIATION);
- double e = Math.exp(-y * (1.5976 + 0.070566 * y * y));
- if (elapsedTime > meanMillis) {
- return -Math.log10(e / (1.0 + e));
- } else {
- return -Math.log10(1.0 - 1.0 / (1.0 + e));
- }
- }
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
index ec2532b..4edd2f3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
@@ -831,6 +831,16 @@
}
/**
+ * Purges the flow table.
+ */
+ public void purge() {
+ flowTasks.clear();
+ flowBuckets.values().forEach(bucket -> bucket.purge());
+ lastBackupTimes.clear();
+ inFlightUpdates.clear();
+ }
+
+ /**
* Closes the device flow table.
*/
public void close() {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index 9084ede..89758c4 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -826,9 +826,18 @@
* @param deviceId the device for which to purge flow rules
*/
public void purgeFlowRule(DeviceId deviceId) {
- DeviceFlowTable flowTable = flowTables.remove(deviceId);
- if (flowTable != null) {
- flowTable.close();
+ // If the device is still present in the store, purge the underlying DeviceFlowTable.
+ // Otherwise, remove the DeviceFlowTable and unregister message handlers.
+ if (deviceService.getDevice(deviceId) != null) {
+ DeviceFlowTable flowTable = flowTables.get(deviceId);
+ if (flowTable != null) {
+ flowTable.purge();
+ }
+ } else {
+ DeviceFlowTable flowTable = flowTables.remove(deviceId);
+ if (flowTable != null) {
+ flowTable.close();
+ }
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
index 34d470c..ea1887d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
@@ -264,6 +264,13 @@
}
/**
+ * Purges the bucket.
+ */
+ public void purge() {
+ flowBucket.clear();
+ }
+
+ /**
* Clears the bucket.
*/
public void clear() {
diff --git a/core/store/dist/src/test/java/org/onosproject/store/cluster/impl/DistributedClusterStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/cluster/impl/DistributedClusterStoreTest.java
deleted file mode 100644
index e651288..0000000
--- a/core/store/dist/src/test/java/org/onosproject/store/cluster/impl/DistributedClusterStoreTest.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.impl;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.onlab.packet.IpAddress;
-import org.onosproject.cfg.ComponentConfigAdapter;
-import org.onosproject.cluster.ClusterEvent;
-import org.onosproject.cluster.ClusterMetadataServiceAdapter;
-import org.onosproject.cluster.ClusterStore;
-import org.onosproject.cluster.ClusterStoreDelegate;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.DefaultControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.core.Version;
-import org.onosproject.core.VersionServiceAdapter;
-import org.onosproject.store.cluster.messaging.impl.NettyMessagingManager;
-
-import java.util.Set;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.*;
-import static org.junit.Assert.assertFalse;
-
-/**
- * Unit test for DistributedClusterStore.
- */
-public class DistributedClusterStoreTest {
- DistributedClusterStore distributedClusterStore;
- ClusterStore clusterStore;
- NodeId nodeId;
- ControllerNode local;
- private static final NodeId NID1 = new NodeId("foo");
- private static final NodeId NID2 = new NodeId("bar");
- private static final NodeId NID3 = new NodeId("buz");
-
- private static final IpAddress IP1 = IpAddress.valueOf("127.0.0.1");
- private static final IpAddress IP2 = IpAddress.valueOf("127.0.0.2");
- private static final IpAddress IP3 = IpAddress.valueOf("127.0.0.3");
-
- private static final int PORT1 = 1;
- private static final int PORT2 = 2;
- private static Set<ControllerNode> nodes;
-
- private TestDelegate delegate = new TestDelegate();
- private class TestDelegate implements ClusterStoreDelegate {
- private ClusterEvent event;
- @Override
- public void notify(ClusterEvent event) {
- this.event = event;
- }
- }
-
- @Before
- public void setUp() throws Exception {
- distributedClusterStore = new DistributedClusterStore();
- distributedClusterStore.clusterMetadataService = new ClusterMetadataServiceAdapter() {
- @Override
- public ControllerNode getLocalNode() {
- return new DefaultControllerNode(NID1, IP1);
- }
- };
- distributedClusterStore.messagingService = new NettyMessagingManager();
- distributedClusterStore.cfgService = new ComponentConfigAdapter();
- distributedClusterStore.versionService = new VersionServiceAdapter() {
- @Override
- public Version version() {
- return Version.version("1.1.1");
- }
- };
- distributedClusterStore.activate();
- clusterStore = distributedClusterStore;
- }
-
- @After
- public void tearDown() throws Exception {
- distributedClusterStore.deactivate();
- }
-
- @Test
- public void testEmpty() {
- nodeId = new NodeId("newNode");
- assertThat(clusterStore.getNode((nodeId)), is(nullValue()));
- assertFalse(clusterStore.hasDelegate());
- assertThat(clusterStore.getState(nodeId), is(ControllerNode.State.INACTIVE));
- assertThat(clusterStore.getVersion(nodeId), is(nullValue()));
- }
-
- @Test
- public void addNodes() {
- clusterStore.setDelegate(delegate);
- assertThat(clusterStore.hasDelegate(), is(true));
- clusterStore.addNode(NID1, IP1, PORT1);
- clusterStore.addNode(NID2, IP2, PORT2);
- clusterStore.removeNode(NID1);
-
- assertThat(clusterStore.getNode(NID1), is(nullValue()));
- clusterStore.addNode(NID3, IP3, PORT2);
-
- clusterStore.markFullyStarted(true);
- assertThat(clusterStore.getState(clusterStore.getLocalNode().id()),
- is(ControllerNode.State.READY));
- clusterStore.markFullyStarted(false);
- assertThat(clusterStore.getState(clusterStore.getLocalNode().id()),
- is(ControllerNode.State.ACTIVE));
- nodes = clusterStore.getNodes();
- assertThat(nodes.size(), is(2));
- clusterStore.markFullyStarted(true);
-
- clusterStore.unsetDelegate(delegate);
- assertThat(clusterStore.hasDelegate(), is(false));
- }
-}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java
deleted file mode 100644
index 8cfa7ff..0000000
--- a/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.messaging.impl;
-
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.Uninterruptibles;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.onlab.packet.IpAddress;
-import org.onosproject.cluster.ClusterMetadata;
-import org.onosproject.cluster.ClusterMetadataEventListener;
-import org.onosproject.cluster.ClusterMetadataService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.core.HybridLogicalClockService;
-import org.onosproject.core.HybridLogicalTime;
-import org.onosproject.net.provider.ProviderId;
-import org.onosproject.store.cluster.messaging.Endpoint;
-
-import java.net.ConnectException;
-import java.util.Arrays;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiFunction;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.onlab.junit.TestTools.findAvailablePort;
-
-/**
- * Unit tests for NettyMessaging.
- */
-public class NettyMessagingManagerTest {
-
- HybridLogicalClockService testClockService = new HybridLogicalClockService() {
- AtomicLong counter = new AtomicLong();
- @Override
- public HybridLogicalTime timeNow() {
- return new HybridLogicalTime(counter.incrementAndGet(), 0);
- }
-
- @Override
- public void recordEventTime(HybridLogicalTime time) {
- }
- };
-
- NettyMessagingManager netty1;
- NettyMessagingManager netty2;
-
- private static final String DUMMY_NAME = "node";
- private static final String IP_STRING = "127.0.0.1";
-
- Endpoint ep1 = new Endpoint(IpAddress.valueOf(IP_STRING), 5001);
- Endpoint ep2 = new Endpoint(IpAddress.valueOf(IP_STRING), 5002);
- Endpoint invalidEndPoint = new Endpoint(IpAddress.valueOf(IP_STRING), 5003);
-
- @Before
- public void setUp() throws Exception {
- ep1 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5001));
- netty1 = new NettyMessagingManager();
- netty1.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep1);
- netty1.clockService = testClockService;
- netty1.activate();
-
- ep2 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5003));
- netty2 = new NettyMessagingManager();
- netty2.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep2);
- netty2.clockService = testClockService;
- netty2.activate();
- }
-
- /**
- * Returns a random String to be used as a test subject.
- * @return string
- */
- private String nextSubject() {
- return UUID.randomUUID().toString();
- }
-
- @After
- public void tearDown() throws Exception {
- if (netty1 != null) {
- netty1.deactivate();
- }
-
- if (netty2 != null) {
- netty2.deactivate();
- }
- }
-
- @Test
- public void testSendAsync() {
- String subject = nextSubject();
- CountDownLatch latch1 = new CountDownLatch(1);
- CompletableFuture<Void> response = netty1.sendAsync(ep2, subject, "hello world".getBytes());
- response.whenComplete((r, e) -> {
- assertNull(e);
- latch1.countDown();
- });
- Uninterruptibles.awaitUninterruptibly(latch1);
-
- CountDownLatch latch2 = new CountDownLatch(1);
- response = netty1.sendAsync(invalidEndPoint, subject, "hello world".getBytes());
- response.whenComplete((r, e) -> {
- assertNotNull(e);
- assertTrue(e instanceof ConnectException);
- latch2.countDown();
- });
- Uninterruptibles.awaitUninterruptibly(latch2);
- }
-
- @Test
- @Ignore // FIXME disabled on 9/29/16 due to random failures
- public void testSendAndReceive() {
- String subject = nextSubject();
- AtomicBoolean handlerInvoked = new AtomicBoolean(false);
- AtomicReference<byte[]> request = new AtomicReference<>();
- AtomicReference<Endpoint> sender = new AtomicReference<>();
-
- BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
- handlerInvoked.set(true);
- sender.set(ep);
- request.set(data);
- return "hello there".getBytes();
- };
- netty2.registerHandler(subject, handler, MoreExecutors.directExecutor());
-
- CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2, subject, "hello world".getBytes());
- assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
- assertTrue(handlerInvoked.get());
- assertTrue(Arrays.equals(request.get(), "hello world".getBytes()));
- assertEquals(ep1, sender.get());
- }
-
- @Test
- public void testDefaultTimeout() {
- String subject = nextSubject();
- BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler = (ep, payload) -> new CompletableFuture<>();
- netty2.registerHandler(subject, handler);
-
- try {
- netty1.sendAndReceive(ep2, subject, "hello world".getBytes()).join();
- fail();
- } catch (CompletionException e) {
- assertTrue(e.getCause() instanceof TimeoutException);
- }
- }
-
- @Test
- public void testDynamicTimeout() {
- String subject = nextSubject();
- AtomicInteger counter = new AtomicInteger();
- BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler = (ep, payload) -> {
- if (counter.incrementAndGet() <= 50) {
- return CompletableFuture.completedFuture(new byte[0]);
- } else {
- return new CompletableFuture<>();
- }
- };
- netty2.registerHandler(subject, handler);
-
- for (int i = 0; i < 50; i++) {
- netty1.sendAndReceive(ep2, subject, "hello world".getBytes()).join();
- }
- try {
- netty1.sendAndReceive(ep2, subject, "hello world".getBytes()).join();
- fail();
- } catch (CompletionException e) {
- assertTrue(e.getCause() instanceof TimeoutException);
- }
- }
-
- /*
- * Supplies executors when registering a handler and calling sendAndReceive and verifies the request handling
- * and response completion occurs on the expected thread.
- */
- @Test
- @Ignore
- public void testSendAndReceiveWithExecutor() {
- String subject = nextSubject();
- ExecutorService completionExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "completion-thread"));
- ExecutorService handlerExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "handler-thread"));
- AtomicReference<String> handlerThreadName = new AtomicReference<>();
- AtomicReference<String> completionThreadName = new AtomicReference<>();
-
- final CountDownLatch latch = new CountDownLatch(1);
-
- BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
- handlerThreadName.set(Thread.currentThread().getName());
- try {
- latch.await();
- } catch (InterruptedException e1) {
- Thread.currentThread().interrupt();
- fail("InterruptedException");
- }
- return "hello there".getBytes();
- };
- netty2.registerHandler(subject, handler, handlerExecutor);
-
- CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2,
- subject,
- "hello world".getBytes(),
- completionExecutor);
- response.whenComplete((r, e) -> {
- completionThreadName.set(Thread.currentThread().getName());
- });
- latch.countDown();
-
- // Verify that the message was request handling and response completion happens on the correct thread.
- assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
- assertEquals("completion-thread", completionThreadName.get());
- assertEquals("handler-thread", handlerThreadName.get());
- }
-
- private ClusterMetadataService dummyMetadataService(String name, String ipAddress, Endpoint ep) {
- return new ClusterMetadataService() {
- @Override
- public ClusterMetadata getClusterMetadata() {
- return new ClusterMetadata(new ProviderId(DUMMY_NAME, DUMMY_NAME),
- name, getLocalNode(), Sets.newHashSet(), Sets.newHashSet());
- }
-
- @Override
- public ControllerNode getLocalNode() {
- return new ControllerNode() {
- @Override
- public NodeId id() {
- return null;
- }
-
- @Override
- public String host() {
- return ipAddress;
- }
-
- @Override
- public IpAddress ip() {
- return IpAddress.valueOf(ipAddress);
- }
-
- @Override
- public IpAddress ip(boolean resolve) {
- return ip();
- }
-
- @Override
- public int tcpPort() {
- return ep.port();
- }
- };
- }
-
- @Override
- public void addListener(ClusterMetadataEventListener listener) {}
-
- @Override
- public void removeListener(ClusterMetadataEventListener listener) {}
- };
- }
-}
diff --git a/drivers/server/src/main/java/org/onosproject/drivers/server/devices/nic/DefaultDpdkNicFlowRule.java b/drivers/server/src/main/java/org/onosproject/drivers/server/devices/nic/DefaultDpdkNicFlowRule.java
index b7b6f05..4c2b7f0 100644
--- a/drivers/server/src/main/java/org/onosproject/drivers/server/devices/nic/DefaultDpdkNicFlowRule.java
+++ b/drivers/server/src/main/java/org/onosproject/drivers/server/devices/nic/DefaultDpdkNicFlowRule.java
@@ -75,8 +75,7 @@
rule += "ipv4 ";
if (this.ipv4Protocol() > 0) {
- rule += "proto spec " + Integer.toString(this.ipv4Protocol()) + " ";
- rule += "proto mask 0x0 ";
+ rule += "proto is " + Integer.toString(this.ipv4Protocol()) + " ";
}
if (this.ipv4SrcAddress() != null) {
@@ -129,15 +128,16 @@
// No subsequent field
if (action.actionField().isEmpty()) {
+ rule += "/ ";
continue;
}
// A subsequent field is associated with a value
rule += action.actionField() + " ";
- rule += Long.toString(action.actionValue()) + " ";
+ rule += Long.toString(action.actionValue()) + " / ";
}
- rule += "/ end";
+ rule += " end";
}
return rule;
diff --git a/drivers/server/src/main/java/org/onosproject/drivers/server/devices/nic/DefaultNicFlowRule.java b/drivers/server/src/main/java/org/onosproject/drivers/server/devices/nic/DefaultNicFlowRule.java
index abbf0b8..019b875 100644
--- a/drivers/server/src/main/java/org/onosproject/drivers/server/devices/nic/DefaultNicFlowRule.java
+++ b/drivers/server/src/main/java/org/onosproject/drivers/server/devices/nic/DefaultNicFlowRule.java
@@ -167,6 +167,9 @@
new NicRuleAction(NicRuleAction.Action.METER, meterInstruction.meterId().id()));
}
}
+
+ // This action provides basic rule match counters
+ this.actions.add(new NicRuleAction(NicRuleAction.Action.COUNT));
}
@Override
diff --git a/drivers/server/src/main/java/org/onosproject/drivers/server/devices/nic/NicRuleAction.java b/drivers/server/src/main/java/org/onosproject/drivers/server/devices/nic/NicRuleAction.java
index 3ec13c3..ff3afeb 100644
--- a/drivers/server/src/main/java/org/onosproject/drivers/server/devices/nic/NicRuleAction.java
+++ b/drivers/server/src/main/java/org/onosproject/drivers/server/devices/nic/NicRuleAction.java
@@ -196,7 +196,7 @@
ACTION_FIELD.put(Action.JUMP, "group");
ACTION_FIELD.put(Action.MARK, "id");
ACTION_FIELD.put(Action.FLAG, "");
- ACTION_FIELD.put(Action.COUNT, "id");
+ ACTION_FIELD.put(Action.COUNT, "");
ACTION_FIELD.put(Action.QUEUE, "index");
ACTION_FIELD.put(Action.RSS, "queue");
ACTION_FIELD.put(Action.PF, "");
diff --git a/protocols/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/TlsParams.java b/protocols/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/TlsParams.java
index 1d09e1d..dfe7fa9 100644
--- a/protocols/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/TlsParams.java
+++ b/protocols/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/TlsParams.java
@@ -143,6 +143,7 @@
while (dis.read(buffer) > 0) {
// nothing to do :)
}
+ is.close();
return dis.getMessageDigest().digest();
} catch (NoSuchAlgorithmException e) {
log.error("Algorithm SHA1 Not found");
diff --git a/providers/null/src/main/java/org/onosproject/provider/nil/CustomTopologySimulator.java b/providers/null/src/main/java/org/onosproject/provider/nil/CustomTopologySimulator.java
index e7a5788..8ed469e 100644
--- a/providers/null/src/main/java/org/onosproject/provider/nil/CustomTopologySimulator.java
+++ b/providers/null/src/main/java/org/onosproject/provider/nil/CustomTopologySimulator.java
@@ -85,6 +85,23 @@
}
/**
+ * Creates simulated device.
+ *
+ * @param id device identifier
+ * @param name device name
+ * @param type device type
+ * @param hw hardware revision
+ * @param sw software revision
+ * @param portCount number of device ports
+ */
+ public void createDevice(DeviceId id, String name, Device.Type type,
+ String hw, String sw, int portCount) {
+ int chassisId = Integer.parseInt(id.uri().getSchemeSpecificPart(), 16);
+ createDevice(id, chassisId, type, hw, sw, portCount);
+ nameToId.put(name, id);
+ }
+
+ /**
* Creates a simulated host.
*
* @param hostId host identifier
diff --git a/providers/null/src/main/java/org/onosproject/provider/nil/TopologySimulator.java b/providers/null/src/main/java/org/onosproject/provider/nil/TopologySimulator.java
index 62410a3..3a35971 100644
--- a/providers/null/src/main/java/org/onosproject/provider/nil/TopologySimulator.java
+++ b/providers/null/src/main/java/org/onosproject/provider/nil/TopologySimulator.java
@@ -204,9 +204,23 @@
* @param portCount number of device ports
*/
public void createDevice(DeviceId id, int chassisId, Device.Type type, int portCount) {
+ createDevice(id, chassisId, type, "0.1", "0.1.2", portCount);
+ }
+
+ /**
+ * Creates simulated device.
+ *
+ * @param id device identifier
+ * @param chassisId chassis identifier number
+ * @param type device type
+ * @param hw hardware revision
+ * @param sw software revision
+ * @param portCount number of device ports
+ */
+ public void createDevice(DeviceId id, int chassisId, Device.Type type,
+ String hw, String sw, int portCount) {
DeviceDescription desc =
- new DefaultDeviceDescription(id.uri(), type,
- "ONF", "0.1", "0.1", "1234",
+ new DefaultDeviceDescription(id.uri(), type, "ONF", hw, sw, "1234",
new ChassisId(chassisId));
deviceIds.add(id);
mastershipAdminService.setRoleSync(localNode, id, MASTER);
diff --git a/providers/null/src/main/java/org/onosproject/provider/nil/cli/CreateNullDevice.java b/providers/null/src/main/java/org/onosproject/provider/nil/cli/CreateNullDevice.java
index bbe1e131..752ef51 100644
--- a/providers/null/src/main/java/org/onosproject/provider/nil/cli/CreateNullDevice.java
+++ b/providers/null/src/main/java/org/onosproject/provider/nil/cli/CreateNullDevice.java
@@ -17,7 +17,9 @@
import org.apache.karaf.shell.api.action.Argument;
import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.Option;
import org.apache.karaf.shell.api.action.lifecycle.Service;
+
import org.onlab.util.Tools;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
@@ -61,6 +63,15 @@
required = false)
String locType = GEO;
+ @Option(name = "-I", aliases = "--id", description = "Device identifier")
+ String id = null;
+
+ @Option(name = "-H", aliases = "--hw", description = "Hardware version")
+ String hw = "0.1";
+
+ @Option(name = "-S", aliases = "--sw", description = "Software version")
+ String sw = "0.1.2";
+
@Override
protected void doExecute() {
NullProviders service = get(NullProviders.class);
@@ -72,13 +83,14 @@
}
CustomTopologySimulator sim = (CustomTopologySimulator) simulator;
- DeviceId deviceId = sim.nextDeviceId();
+ DeviceId deviceId = id == null ? sim.nextDeviceId() : DeviceId.deviceId(id);
BasicDeviceConfig cfg = cfgService.addConfig(deviceId, BasicDeviceConfig.class);
cfg.name(name);
setUiCoordinates(cfg, locType, latOrY, longOrX);
Tools.delay(10);
- sim.createDevice(deviceId, name, Device.Type.valueOf(type.toUpperCase()), portCount);
+ sim.createDevice(deviceId, name, Device.Type.valueOf(type.toUpperCase()),
+ hw, sw, portCount);
}
}
diff --git a/tools/build/libgen/src/main/java/org/onosproject/libgen/AetherResolver.java b/tools/build/libgen/src/main/java/org/onosproject/libgen/AetherResolver.java
old mode 100644
new mode 100755
index 4f5dcfa..e97bad5
--- a/tools/build/libgen/src/main/java/org/onosproject/libgen/AetherResolver.java
+++ b/tools/build/libgen/src/main/java/org/onosproject/libgen/AetherResolver.java
@@ -26,6 +26,8 @@
import org.eclipse.aether.repository.LocalRepository;
import org.eclipse.aether.repository.RemoteRepository;
import org.eclipse.aether.repository.RepositoryPolicy;
+import org.eclipse.aether.repository.Proxy;
+import org.eclipse.aether.util.repository.AuthenticationBuilder;
import org.eclipse.aether.resolution.ArtifactRequest;
import org.eclipse.aether.resolution.ArtifactResult;
import org.eclipse.aether.resolution.VersionRangeRequest;
@@ -47,6 +49,8 @@
import java.util.List;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
import static org.eclipse.aether.repository.RepositoryPolicy.CHECKSUM_POLICY_WARN;
import static org.eclipse.aether.repository.RepositoryPolicy.UPDATE_POLICY_ALWAYS;
@@ -59,8 +63,6 @@
private static RepositorySystem system;
private static RepositorySystemSession session;
- private static final RemoteRepository CENTRAL =
- new RemoteRepository.Builder("central", "default", CENTRAL_URL).build();
private final String repoUrl;
@@ -182,18 +184,69 @@
return newestVersion.toString();
}
- public List<RemoteRepository> repositories()
- {
- if (repoUrl != null && repoUrl.length() > 0) {
- RepositoryPolicy policy = new RepositoryPolicy(true,
- UPDATE_POLICY_ALWAYS,
- CHECKSUM_POLICY_WARN);
- RemoteRepository repository =
- new RemoteRepository.Builder("temp", "default", repoUrl)
- .setSnapshotPolicy(policy).build();
- return Arrays.asList(CENTRAL, repository);
+ public List<RemoteRepository> repositories() {
+ RemoteRepository.Builder central = new RemoteRepository.Builder("central", "default", CENTRAL_URL);
+
+ // set http_proxy
+ String env_http_proxy = System.getenv("HTTP_PROXY");
+ if (env_http_proxy != null) {
+ List<String> proxyHostInfo = getProxyHostInfo(env_http_proxy);
+
+ // set authentication
+ if ((proxyHostInfo.get(2) != null) && (proxyHostInfo.get(3) != null)) {
+ central.setProxy(
+ new Proxy(Proxy.TYPE_HTTP, proxyHostInfo.get(0), Integer.valueOf(proxyHostInfo.get(1)),
+ new AuthenticationBuilder()
+ .addUsername(proxyHostInfo.get(2)).addPassword(proxyHostInfo.get(3)).build()));
+ } else {
+ central.setProxy(
+ new Proxy(Proxy.TYPE_HTTP, proxyHostInfo.get(0), Integer.valueOf(proxyHostInfo.get(1))));
+ }
}
- return Collections.singletonList(CENTRAL);
+ if (repoUrl != null && repoUrl.length() > 0) {
+ RemoteRepository.Builder other =
+ new RemoteRepository.Builder("temp", "default", repoUrl)
+ .setSnapshotPolicy(new RepositoryPolicy(true, UPDATE_POLICY_ALWAYS, CHECKSUM_POLICY_WARN));
+
+ // set https_proxy
+ String env_https_proxy = System.getenv("HTTPS_PROXY");
+ if (env_https_proxy != null) {
+ List<String> proxyHostInfo = getProxyHostInfo(env_https_proxy);
+
+ // set authentication
+ if ((proxyHostInfo.get(2) != null) && (proxyHostInfo.get(3) != null)) {
+ other.setProxy(
+ new Proxy(Proxy.TYPE_HTTPS, proxyHostInfo.get(0), Integer.valueOf(proxyHostInfo.get(1)),
+ new AuthenticationBuilder()
+ .addUsername(proxyHostInfo.get(2)).addPassword(proxyHostInfo.get(3)).build()));
+ } else {
+ other.setProxy(
+ new Proxy(Proxy.TYPE_HTTPS, proxyHostInfo.get(0), Integer.valueOf(proxyHostInfo.get(1))));
+ }
+ }
+
+ return Arrays.asList(central.build(), other.build());
+ }
+
+ return Collections.singletonList(central.build());
+ }
+
+    /** Splits a proxy URL into [host, port, user, password]; user and password are null when absent. */
+    private static List<String> getProxyHostInfo(String proxyUrl) {
+        if (proxyUrl == null) {
+            return null;
+        }
+        // matching pattern
+        // http://(host):(port) or http://(user):(pass)@(host):(port)
+        // https://(host):(port) or https://(user):(pass)@(host):(port)
+        Pattern p = Pattern.compile("^(http|https):\\/\\/(([^:\\@]+):([^\\@]+)\\@)?([^:\\@\\/]+):([0-9]+)\\/?$");
+        Matcher m = p.matcher(proxyUrl);
+        if (!m.find()) {
+            // Callers index the result directly; the URL is not echoed since it may carry credentials.
+            throw new IllegalArgumentException("Malformed proxy URL, expected http[s]://[user:pass@]host:port");
+        }
+        // matcher group 3:user 4:pass 5:host 6:port (null if not set)
+        return Arrays.asList(m.group(5), m.group(6), m.group(3), m.group(4));
     }
}
diff --git a/tools/build/libgen/src/main/java/org/onosproject/libgen/BuckLibGenerator.java b/tools/build/libgen/src/main/java/org/onosproject/libgen/BuckLibGenerator.java
old mode 100644
new mode 100755
index 4c1099e..9be3ef7
--- a/tools/build/libgen/src/main/java/org/onosproject/libgen/BuckLibGenerator.java
+++ b/tools/build/libgen/src/main/java/org/onosproject/libgen/BuckLibGenerator.java
@@ -28,6 +28,10 @@
import java.io.PrintWriter;
import java.net.URL;
import java.net.URLConnection;
+import java.net.PasswordAuthentication;
+import java.net.Authenticator;
+import java.net.Proxy;
+import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
@@ -36,10 +40,13 @@
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
/**
* Generates a BUCK file from a JSON file containing third-party library
@@ -304,7 +311,31 @@
// fall back to regular download
}
}
- URLConnection connection = url.openConnection();
+
+ System.setProperty("jdk.http.auth.tunneling.disabledSchemes", "");
+
+ URLConnection connection;
+ String env_http_proxy = System.getenv("HTTP_PROXY");
+ if (env_http_proxy != null) {
+ List<String> proxyHostInfo = getProxyHostInfo(env_http_proxy);
+ Proxy http_proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHostInfo.get(0),
+ Integer.valueOf(proxyHostInfo.get(1))));
+
+ if ((proxyHostInfo.get(2) != null) && (proxyHostInfo.get(3) != null)) {
+ Authenticator authenticator = new Authenticator() {
+ public PasswordAuthentication getPasswordAuthentication() {
+ return (new PasswordAuthentication(proxyHostInfo.get(2), proxyHostInfo.get(3).toCharArray()));
+ }
+ };
+
+ Authenticator.setDefault(authenticator);
+ }
+
+ connection = url.openConnection(http_proxy);
+ } else {
+ connection = url.openConnection();
+ }
+
connection.connect();
InputStream stream = connection.getInputStream();
@@ -323,6 +354,24 @@
}
}
+    /** Splits a proxy URL into [host, port, user, password]; user and password are null when absent. */
+    private static List<String> getProxyHostInfo(String proxyUrl) {
+        if (proxyUrl == null) {
+            return null;
+        }
+        // matching pattern
+        // http://(host):(port) or http://(user):(pass)@(host):(port)
+        // https://(host):(port) or https://(user):(pass)@(host):(port)
+        Pattern p = Pattern.compile("^(http|https):\\/\\/(([^:\\@]+):([^\\@]+)\\@)?([^:\\@\\/]+):([0-9]+)\\/?$");
+        Matcher m = p.matcher(proxyUrl);
+        if (!m.find()) {
+            // Callers index the result directly; the URL is not echoed since it may carry credentials.
+            throw new IllegalArgumentException("Malformed proxy URL, expected http[s]://[user:pass@]host:port");
+        }
+        // matcher group 3:user 4:pass 5:host 6:port (null if not set)
+        return Arrays.asList(m.group(5), m.group(6), m.group(3), m.group(4));
+    }
+
private void error(String format, String... args) {
if (!format.endsWith("\n")) {
format += '\n';
diff --git a/utils/misc/src/main/java/org/onlab/packet/Ethernet.java b/utils/misc/src/main/java/org/onlab/packet/Ethernet.java
index 006c32e..5fd07bf 100644
--- a/utils/misc/src/main/java/org/onlab/packet/Ethernet.java
+++ b/utils/misc/src/main/java/org/onlab/packet/Ethernet.java
@@ -700,6 +700,8 @@
sb.append("\ndata packet");
} else if (pkt instanceof LLC) {
sb.append("\nllc packet");
+ } else if (pkt instanceof EAPOL) {
+ sb.append("\neapol");
} else {
sb.append("\nunknown packet");
}
diff --git a/web/api/src/main/java/org/onosproject/rest/resources/CoreWebApplication.java b/web/api/src/main/java/org/onosproject/rest/resources/CoreWebApplication.java
index 85997dc..67b6c96 100644
--- a/web/api/src/main/java/org/onosproject/rest/resources/CoreWebApplication.java
+++ b/web/api/src/main/java/org/onosproject/rest/resources/CoreWebApplication.java
@@ -55,7 +55,8 @@
DpisWebResource.class,
DiagnosticsWebResource.class,
UiPreferencesWebResource.class,
- SystemInfoWebResource.class
+ SystemInfoWebResource.class,
+ PacketProcessorsWebResource.class
);
}
}
diff --git a/web/api/src/main/java/org/onosproject/rest/resources/PacketProcessorsWebResource.java b/web/api/src/main/java/org/onosproject/rest/resources/PacketProcessorsWebResource.java
new file mode 100644
index 0000000..aefd84e
--- /dev/null
+++ b/web/api/src/main/java/org/onosproject/rest/resources/PacketProcessorsWebResource.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2015-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.rest.resources;
+
+import org.onosproject.rest.AbstractWebResource;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.onosproject.net.packet.PacketProcessorEntry;
+import org.onosproject.net.packet.PacketService;
+import org.slf4j.Logger;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import java.util.List;
+import static org.slf4j.LoggerFactory.getLogger;
+import static org.onosproject.net.packet.PacketProcessor.ADVISOR_MAX;
+import static org.onosproject.net.packet.PacketProcessor.DIRECTOR_MAX;
+
+/**
+ * Manage inventory of packet processors.
+ */
+@Path("packet/processors")
+public class PacketProcessorsWebResource extends AbstractWebResource {
+
+    private final Logger log = getLogger(getClass());
+    PacketService service = get(PacketService.class);
+
+    /**
+     * Gets packet processors. Returns array of all packet processors.
+     *
+     * @onos.rsModel PacketProcessorsGet
+     * @return 200 OK with array of all packet processors.
+     */
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getPacketProcessors() {
+        // Build the JSON tree per call: no response state lives on the resource instance.
+        ObjectNode root = mapper().createObjectNode();
+        ArrayNode pktProcNode = root.putArray("packet-processors");
+        List<PacketProcessorEntry> processors = service.getProcessors();
+        ObjectMapper mapper = new ObjectMapper();
+        for (PacketProcessorEntry p : processors) {
+            pktProcNode.add(mapper.createObjectNode()
+                    .put("priority", priorityFormat(p.priority()))
+                    .put("class", p.processor().getClass().getName())
+                    .put("packets", p.invocations())
+                    .put("avgNanos", p.averageNanos()));
+        }
+        return ok(root).build();
+    }
+
+    /** Formats a raw priority as advisor(n), director(n) or observer(n), n being the offset within its band. */
+    private String priorityFormat(int priority) {
+        if (priority > DIRECTOR_MAX) {
+            return "observer(" + (priority - DIRECTOR_MAX - 1) + ")";
+        } else if (priority > ADVISOR_MAX) {
+            return "director(" + (priority - ADVISOR_MAX - 1) + ")";
+        }
+        return "advisor(" + (priority - 1) + ")";
+    }
+}
diff --git a/web/api/src/main/resources/definitions/PacketProcessorsGet.json b/web/api/src/main/resources/definitions/PacketProcessorsGet.json
new file mode 100644
index 0000000..cc88dc6
--- /dev/null
+++ b/web/api/src/main/resources/definitions/PacketProcessorsGet.json
@@ -0,0 +1,44 @@
+{
+ "type": "object",
+ "title": "packet-processors",
+ "required": [
+ "packet-processors"
+ ],
+ "properties": {
+ "packet-processors": {
+ "type": "array",
+ "xml": {
+ "name": "packet-processors",
+ "wrapped": true
+ },
+ "items": {
+ "type": "object",
+ "title": "packet-processors",
+ "required": [
+ "priority",
+ "class",
+ "packets",
+ "avgNanos"
+ ],
+ "properties": {
+ "priority": {
+ "type": "string",
+ "example": "advisor(0)"
+ },
+ "class": {
+ "type": "string",
+ "example": "org.onosproject.provider.host.impl.HostLocationProvider$InternalHostProvider"
+ },
+ "packets": {
+ "type": "integer",
+ "example": 568376
+ },
+ "avgNanos": {
+ "type": "integer",
+ "example": 5683
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/web/gui/src/main/webapp/app/fw/nav/nav.css b/web/gui/src/main/webapp/app/fw/nav/nav.css
index 3d2f646..7413fdf 100644
--- a/web/gui/src/main/webapp/app/fw/nav/nav.css
+++ b/web/gui/src/main/webapp/app/fw/nav/nav.css
@@ -24,6 +24,8 @@
left: 0;
padding: 0;
z-index: 3000;
+ max-height: 90%;
+ overflow-y: auto;
visibility: hidden;
}
diff --git a/web/gui2/src/main/webapp/app/nav/nav.component.css b/web/gui2/src/main/webapp/app/nav/nav.component.css
index 5030be8..d7471c4 100644
--- a/web/gui2/src/main/webapp/app/nav/nav.component.css
+++ b/web/gui2/src/main/webapp/app/nav/nav.component.css
@@ -24,6 +24,8 @@
left: 0;
padding: 0;
z-index: 3000;
+ max-height: 90%;
+ overflow-y: auto;
}
html[data-platform='iPad'] #nav {