Restructured to separate stores and managers into different bundles. Reactive forwarding does not seem to work; will investigate.
diff --git a/core/net/src/main/java/org/onlab/onos/event/impl/SimpleEventDispatcher.java b/core/net/src/main/java/org/onlab/onos/event/impl/SimpleEventDispatcher.java
new file mode 100644
index 0000000..3834676
--- /dev/null
+++ b/core/net/src/main/java/org/onlab/onos/event/impl/SimpleEventDispatcher.java
@@ -0,0 +1,93 @@
+package org.onlab.onos.event.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.event.AbstractEvent;
+import org.onlab.onos.event.DefaultEventSinkRegistry;
+import org.onlab.onos.event.Event;
+import org.onlab.onos.event.EventDeliveryService;
+import org.onlab.onos.event.EventSink;
+import org.slf4j.Logger;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.namedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Simple implementation of an event dispatching service.
+ */
+@Component(immediate = true)
+@Service
+public class SimpleEventDispatcher extends DefaultEventSinkRegistry
+        implements EventDeliveryService {
+
+    private final Logger log = getLogger(getClass());
+
+    // Sentinel event posted on deactivation to wake up and end the dispatch loop.
+    @SuppressWarnings("unchecked")
+    private static final Event KILL_PILL = new AbstractEvent(null, 0) { };
+
+    private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
+
+    // Created on activation and shut down on deactivation so its thread is not leaked.
+    private ExecutorService executor;
+    private volatile boolean stopped = false;
+
+    @Override
+    public void post(Event event) {
+        events.add(event);
+    }
+
+    @Activate
+    public void activate() {
+        stopped = false;
+        executor = newSingleThreadExecutor(namedThreads("event-dispatch-%d"));
+        executor.execute(new DispatchLoop());
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        stopped = true;
+        post(KILL_PILL);
+        executor.shutdown();
+        log.info("Stopped");
+    }
+
+    // Auxiliary event dispatching loop that feeds off the events queue.
+    private class DispatchLoop implements Runnable {
+        @Override
+        @SuppressWarnings("unchecked")
+        public void run() {
+            log.info("Dispatch loop initiated");
+            while (!stopped) {
+                try {
+                    // Fetch the next event and if it is the kill-pill, bail
+                    Event event = events.take();
+                    if (event == KILL_PILL) {
+                        break;
+                    }
+                    // Locate the sink for the event class and let it process the event
+                    EventSink sink = getSink(event.getClass());
+                    if (sink != null) {
+                        sink.process(event);
+                    } else {
+                        log.warn("No sink registered for event class {}", event.getClass());
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt(); // restore the flag, then stop dispatching
+                    break;
+                } catch (Exception e) {
+                    log.warn("Error encountered while dispatching event:", e);
+                }
+            }
+            log.info("Dispatch loop terminated");
+        }
+    }
+}
diff --git a/core/net/src/main/java/org/onlab/onos/event/impl/package-info.java b/core/net/src/main/java/org/onlab/onos/event/impl/package-info.java
new file mode 100644
index 0000000..46722ad
--- /dev/null
+++ b/core/net/src/main/java/org/onlab/onos/event/impl/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Local event dispatching mechanism.
+ */
+package org.onlab.onos.event.impl;
diff --git a/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
new file mode 100644
index 0000000..5964c08
--- /dev/null
+++ b/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
@@ -0,0 +1,223 @@
+package org.onlab.onos.net.device.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.event.AbstractListenerRegistry;
+import org.onlab.onos.event.EventDeliveryService;
+import org.onlab.onos.net.Device;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.MastershipRole;
+import org.onlab.onos.net.Port;
+import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.device.DeviceAdminService;
+import org.onlab.onos.net.device.DeviceDescription;
+import org.onlab.onos.net.device.DeviceEvent;
+import org.onlab.onos.net.device.DeviceListener;
+import org.onlab.onos.net.device.DeviceProvider;
+import org.onlab.onos.net.device.DeviceProviderRegistry;
+import org.onlab.onos.net.device.DeviceProviderService;
+import org.onlab.onos.net.device.DeviceService;
+import org.onlab.onos.net.device.DeviceStore;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.AbstractProviderRegistry;
+import org.onlab.onos.net.provider.AbstractProviderService;
+import org.slf4j.Logger;
+
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Provides basic implementation of the device SB & NB APIs.
+ */
+@Component(immediate = true)
+@Service
+public class DeviceManager
+ extends AbstractProviderRegistry<DeviceProvider, DeviceProviderService>
+ implements DeviceService, DeviceAdminService, DeviceProviderRegistry {
+
+ private static final String DEVICE_ID_NULL = "Device ID cannot be null";
+ private static final String PORT_NUMBER_NULL = "Port number cannot be null";
+ private static final String DEVICE_DESCRIPTION_NULL = "Device description cannot be null";
+ private static final String PORT_DESCRIPTION_NULL = "Port description cannot be null";
+ private static final String ROLE_NULL = "Role cannot be null";
+
+ private final Logger log = getLogger(getClass());
+
+ protected final AbstractListenerRegistry<DeviceEvent, DeviceListener>
+ listenerRegistry = new AbstractListenerRegistry<>();
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceStore store;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected EventDeliveryService eventDispatcher;
+
+ @Activate
+ public void activate() {
+ eventDispatcher.addSink(DeviceEvent.class, listenerRegistry);
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ eventDispatcher.removeSink(DeviceEvent.class);
+ log.info("Stopped");
+ }
+
+ @Override
+ public int getDeviceCount() {
+ return store.getDeviceCount();
+ }
+
+ @Override
+ public Iterable<Device> getDevices() {
+ return store.getDevices();
+ }
+
+ @Override
+ public Device getDevice(DeviceId deviceId) {
+ checkNotNull(deviceId, DEVICE_ID_NULL);
+ return store.getDevice(deviceId);
+ }
+
+ @Override
+ public MastershipRole getRole(DeviceId deviceId) {
+ checkNotNull(deviceId, DEVICE_ID_NULL);
+ return store.getRole(deviceId);
+ }
+
+ @Override
+ public List<Port> getPorts(DeviceId deviceId) {
+ checkNotNull(deviceId, DEVICE_ID_NULL);
+ return store.getPorts(deviceId);
+ }
+
+ @Override
+ public Port getPort(DeviceId deviceId, PortNumber portNumber) {
+ checkNotNull(deviceId, DEVICE_ID_NULL);
+ checkNotNull(portNumber, PORT_NUMBER_NULL);
+ return store.getPort(deviceId, portNumber);
+ }
+
+ @Override
+ public boolean isAvailable(DeviceId deviceId) {
+ checkNotNull(deviceId, DEVICE_ID_NULL);
+ return store.isAvailable(deviceId);
+ }
+
+ @Override
+ public void setRole(DeviceId deviceId, MastershipRole newRole) {
+ checkNotNull(deviceId, DEVICE_ID_NULL);
+ checkNotNull(newRole, ROLE_NULL);
+ DeviceEvent event = store.setRole(deviceId, newRole);
+ if (event != null) {
+ Device device = event.subject();
+ DeviceProvider provider = getProvider(device.providerId());
+ if (provider != null) {
+ provider.roleChanged(device, newRole);
+ }
+ post(event);
+ }
+ }
+
+ @Override
+ public void removeDevice(DeviceId deviceId) {
+ checkNotNull(deviceId, DEVICE_ID_NULL);
+ DeviceEvent event = store.removeDevice(deviceId);
+ if (event != null) {
+ log.info("Device {} administratively removed", deviceId);
+ post(event);
+ }
+ }
+
+ @Override
+ public void addListener(DeviceListener listener) {
+ listenerRegistry.addListener(listener);
+ }
+
+ @Override
+ public void removeListener(DeviceListener listener) {
+ listenerRegistry.removeListener(listener);
+ }
+
+ @Override
+ protected DeviceProviderService createProviderService(DeviceProvider provider) {
+ return new InternalDeviceProviderService(provider);
+ }
+
+ // Personalized device provider service issued to the supplied provider.
+ private class InternalDeviceProviderService
+ extends AbstractProviderService<DeviceProvider>
+ implements DeviceProviderService {
+
+ InternalDeviceProviderService(DeviceProvider provider) {
+ super(provider);
+ }
+
+ @Override
+ public void deviceConnected(DeviceId deviceId, DeviceDescription deviceDescription) {
+ checkNotNull(deviceId, DEVICE_ID_NULL);
+ checkNotNull(deviceDescription, DEVICE_DESCRIPTION_NULL);
+ checkValidity();
+ DeviceEvent event = store.createOrUpdateDevice(provider().id(),
+ deviceId, deviceDescription);
+
+ // If there was a change of any kind, trigger role selection process.
+ if (event != null) {
+ log.info("Device {} connected", deviceId);
+ Device device = event.subject();
+ provider().roleChanged(device, store.getRole(device.id()));
+ post(event);
+ }
+ }
+
+ @Override
+ public void deviceDisconnected(DeviceId deviceId) {
+ checkNotNull(deviceId, DEVICE_ID_NULL);
+ checkValidity();
+ DeviceEvent event = store.markOffline(deviceId);
+ if (event != null) {
+ log.info("Device {} disconnected", deviceId);
+ post(event);
+ }
+ }
+
+        @Override
+        public void updatePorts(DeviceId deviceId, List<PortDescription> portDescriptions) {
+            checkNotNull(deviceId, DEVICE_ID_NULL);
+            checkNotNull(portDescriptions, "Port descriptions list cannot be null");
+            checkValidity();
+            // Relay every event produced by the store update to the event dispatcher.
+            for (DeviceEvent portEvent : store.updatePorts(deviceId, portDescriptions)) {
+                post(portEvent);
+            }
+        }
+
+ @Override
+ public void portStatusChanged(DeviceId deviceId, PortDescription portDescription) {
+ checkNotNull(deviceId, DEVICE_ID_NULL);
+ checkNotNull(portDescription, PORT_DESCRIPTION_NULL);
+ checkValidity();
+ DeviceEvent event = store.updatePortStatus(deviceId, portDescription);
+ if (event != null) {
+ log.info("Device {} port {} status changed", deviceId,
+ event.port().number());
+ post(event);
+ }
+ }
+ }
+
+ // Posts the specified event to the local event dispatcher.
+ private void post(DeviceEvent event) {
+ if (event != null && eventDispatcher != null) {
+ eventDispatcher.post(event);
+ }
+ }
+
+}
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/SimpleFlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/SimpleFlowRuleManager.java
new file mode 100644
index 0000000..4a1cd3b
--- /dev/null
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/SimpleFlowRuleManager.java
@@ -0,0 +1,169 @@
+package org.onlab.onos.net.flow.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.event.AbstractListenerRegistry;
+import org.onlab.onos.event.EventDeliveryService;
+import org.onlab.onos.net.Device;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.DeviceService;
+import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.FlowRuleEvent;
+import org.onlab.onos.net.flow.FlowRuleListener;
+import org.onlab.onos.net.flow.FlowRuleProvider;
+import org.onlab.onos.net.flow.FlowRuleProviderRegistry;
+import org.onlab.onos.net.flow.FlowRuleProviderService;
+import org.onlab.onos.net.flow.FlowRuleService;
+import org.onlab.onos.net.flow.FlowRuleStore;
+import org.onlab.onos.net.provider.AbstractProviderRegistry;
+import org.onlab.onos.net.provider.AbstractProviderService;
+import org.slf4j.Logger;
+
+@Component(immediate = true)
+@Service
+public class SimpleFlowRuleManager
+ extends AbstractProviderRegistry<FlowRuleProvider, FlowRuleProviderService>
+ implements FlowRuleService, FlowRuleProviderRegistry {
+
+ public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
+ private final Logger log = getLogger(getClass());
+
+ private final AbstractListenerRegistry<FlowRuleEvent, FlowRuleListener>
+ listenerRegistry = new AbstractListenerRegistry<>();
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected FlowRuleStore store;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected EventDeliveryService eventDispatcher;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceService deviceService;
+
+ @Activate
+ public void activate() {
+ eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ eventDispatcher.removeSink(FlowRuleEvent.class);
+ log.info("Stopped");
+ }
+
+ @Override
+ public Iterable<FlowRule> getFlowEntries(DeviceId deviceId) {
+ return store.getFlowEntries(deviceId);
+ }
+
+    // Stores each rule and hands it to the provider of the rule's device;
+    // returns the entries produced by the store, in argument order.
+    @Override
+    public List<FlowRule> applyFlowRules(FlowRule... flowRules) {
+        List<FlowRule> entries = new ArrayList<>(flowRules.length);
+        for (FlowRule f : flowRules) {
+            // Fail fast, before storing, if the device or its provider is unknown.
+            Device device = checkNotNull(deviceService.getDevice(f.deviceId()), "Device not found");
+            FlowRuleProvider frp = checkNotNull(getProvider(device.providerId()), "Provider not found");
+            entries.add(store.storeFlowRule(f));
+            frp.applyFlowRule(f);
+        }
+        return entries;
+    }
+
+    @Override
+    public void removeFlowRules(FlowRule... flowRules) {
+        for (FlowRule f : flowRules) {
+            // Resolve the device and its provider before touching the store, so that
+            // an unknown device or provider fails fast without a partial removal.
+            Device device = checkNotNull(deviceService.getDevice(f.deviceId()), "Device not found");
+            FlowRuleProvider frp = checkNotNull(getProvider(device.providerId()), "Provider not found");
+            store.removeFlowRule(f);
+            frp.removeFlowRule(f);
+        }
+    }
+
+ @Override
+ public void addListener(FlowRuleListener listener) {
+ listenerRegistry.addListener(listener);
+ }
+
+ @Override
+ public void removeListener(FlowRuleListener listener) {
+ listenerRegistry.removeListener(listener);
+ }
+
+ @Override
+ protected FlowRuleProviderService createProviderService(
+ FlowRuleProvider provider) {
+ return new InternalFlowRuleProviderService(provider);
+ }
+
+ private class InternalFlowRuleProviderService
+ extends AbstractProviderService<FlowRuleProvider>
+ implements FlowRuleProviderService {
+
+ protected InternalFlowRuleProviderService(FlowRuleProvider provider) {
+ super(provider);
+ }
+
+ @Override
+ public void flowRemoved(FlowRule flowRule) {
+ checkNotNull(flowRule, FLOW_RULE_NULL);
+ checkValidity();
+ FlowRuleEvent event = store.removeFlowRule(flowRule);
+
+ if (event != null) {
+ log.debug("Flow {} removed", flowRule);
+ post(event);
+ }
+ }
+
+ @Override
+ public void flowMissing(FlowRule flowRule) {
+ checkNotNull(flowRule, FLOW_RULE_NULL);
+ checkValidity();
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void flowAdded(FlowRule flowRule) {
+ checkNotNull(flowRule, FLOW_RULE_NULL);
+ checkValidity();
+
+ FlowRuleEvent event = store.addOrUpdateFlowRule(flowRule);
+ if (event == null) {
+ log.debug("Flow {} updated", flowRule);
+ } else {
+ log.debug("Flow {} added", flowRule);
+ post(event);
+ }
+ }
+
+ // Posts the specified event to the local event dispatcher.
+ private void post(FlowRuleEvent event) {
+ if (event != null) {
+ eventDispatcher.post(event);
+ }
+ }
+
+ @Override
+ public void pushFlowMetrics(Iterable<FlowRule> flowEntries) {
+ // TODO Auto-generated method stub
+
+ }
+ }
+
+}
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/package-info.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/package-info.java
new file mode 100644
index 0000000..0d05cb2
--- /dev/null
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Core subsystem for tracking and manipulating global flow state.
+ */
+package org.onlab.onos.net.flow.impl;
diff --git a/core/net/src/main/java/org/onlab/onos/net/host/impl/SimpleHostManager.java b/core/net/src/main/java/org/onlab/onos/net/host/impl/SimpleHostManager.java
new file mode 100644
index 0000000..bc9312d
--- /dev/null
+++ b/core/net/src/main/java/org/onlab/onos/net/host/impl/SimpleHostManager.java
@@ -0,0 +1,190 @@
+package org.onlab.onos.net.host.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Set;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.event.AbstractListenerRegistry;
+import org.onlab.onos.event.EventDeliveryService;
+import org.onlab.onos.net.ConnectPoint;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.Host;
+import org.onlab.onos.net.HostId;
+import org.onlab.onos.net.host.HostAdminService;
+import org.onlab.onos.net.host.HostDescription;
+import org.onlab.onos.net.host.HostEvent;
+import org.onlab.onos.net.host.HostListener;
+import org.onlab.onos.net.host.HostProvider;
+import org.onlab.onos.net.host.HostProviderRegistry;
+import org.onlab.onos.net.host.HostProviderService;
+import org.onlab.onos.net.host.HostService;
+import org.onlab.onos.net.host.HostStore;
+import org.onlab.onos.net.provider.AbstractProviderRegistry;
+import org.onlab.onos.net.provider.AbstractProviderService;
+import org.onlab.packet.IpPrefix;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.VlanId;
+import org.slf4j.Logger;
+
+/**
+ * Provides basic implementation of the host SB & NB APIs.
+ */
+@Component(immediate = true)
+@Service
+public class SimpleHostManager
+ extends AbstractProviderRegistry<HostProvider, HostProviderService>
+ implements HostService, HostAdminService, HostProviderRegistry {
+
+ public static final String HOST_ID_NULL = "Host ID cannot be null";
+ private final Logger log = getLogger(getClass());
+
+ private final AbstractListenerRegistry<HostEvent, HostListener>
+ listenerRegistry = new AbstractListenerRegistry<>();
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected HostStore store;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected EventDeliveryService eventDispatcher;
+
+
+ @Activate
+ public void activate() {
+ eventDispatcher.addSink(HostEvent.class, listenerRegistry);
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ eventDispatcher.removeSink(HostEvent.class);
+ log.info("Stopped");
+ }
+
+ @Override
+ protected HostProviderService createProviderService(HostProvider provider) {
+ return new InternalHostProviderService(provider);
+ }
+
+ @Override
+ public int getHostCount() {
+ return store.getHostCount();
+ }
+
+ @Override
+ public Iterable<Host> getHosts() {
+ return store.getHosts();
+ }
+
+ @Override
+ public Host getHost(HostId hostId) {
+ checkNotNull(hostId, HOST_ID_NULL);
+ return store.getHost(hostId);
+ }
+
+ @Override
+ public Set<Host> getHostsByVlan(VlanId vlanId) {
+ return store.getHosts(vlanId);
+ }
+
+ @Override
+ public Set<Host> getHostsByMac(MacAddress mac) {
+ checkNotNull(mac, "MAC address cannot be null");
+ return store.getHosts(mac);
+ }
+
+ @Override
+ public Set<Host> getHostsByIp(IpPrefix ip) {
+ checkNotNull(ip, "IP address cannot be null");
+ return store.getHosts(ip);
+ }
+
+ @Override
+ public Set<Host> getConnectedHosts(ConnectPoint connectPoint) {
+ checkNotNull(connectPoint, "Connection point cannot be null");
+ return store.getConnectedHosts(connectPoint);
+ }
+
+ @Override
+ public Set<Host> getConnectedHosts(DeviceId deviceId) {
+ checkNotNull(deviceId, "Device ID cannot be null");
+ return store.getConnectedHosts(deviceId);
+ }
+
+ @Override
+ public void monitorIp(IpPrefix ip) {
+ // TODO pass through to SimpleHostMonitor
+ }
+
+ @Override
+ public void stopMonitoringIp(IpPrefix ip) {
+ // TODO pass through to SimpleHostMonitor
+ }
+
+ @Override
+ public void addListener(HostListener listener) {
+ listenerRegistry.addListener(listener);
+ }
+
+ @Override
+ public void removeListener(HostListener listener) {
+ listenerRegistry.removeListener(listener);
+ }
+
+ @Override
+ public void removeHost(HostId hostId) {
+ checkNotNull(hostId, HOST_ID_NULL);
+ HostEvent event = store.removeHost(hostId);
+ if (event != null) {
+ log.info("Host {} administratively removed", hostId);
+ post(event);
+ }
+ }
+
+ // Personalized host provider service issued to the supplied provider.
+ private class InternalHostProviderService
+ extends AbstractProviderService<HostProvider>
+ implements HostProviderService {
+
+ InternalHostProviderService(HostProvider provider) {
+ super(provider);
+ }
+
+        @Override
+        public void hostDetected(HostId hostId, HostDescription hostDescription) {
+            checkNotNull(hostId, HOST_ID_NULL);
+            checkNotNull(hostDescription, "Host description cannot be null"); // reject before reaching the store
+            checkValidity();
+            HostEvent event = store.createOrUpdateHost(provider().id(), hostId, hostDescription);
+            if (event != null) {
+                log.debug("Host {} detected", hostId);
+                post(event);
+            }
+        }
+
+ @Override
+ public void hostVanished(HostId hostId) {
+ checkNotNull(hostId, HOST_ID_NULL);
+ checkValidity();
+ HostEvent event = store.removeHost(hostId);
+ if (event != null) {
+ log.debug("Host {} vanished", hostId);
+ post(event);
+ }
+ }
+ }
+
+ // Posts the specified event to the local event dispatcher.
+ private void post(HostEvent event) {
+ if (event != null) {
+ eventDispatcher.post(event);
+ }
+ }
+
+}
diff --git a/core/net/src/main/java/org/onlab/onos/net/host/impl/SimpleHostMonitor.java b/core/net/src/main/java/org/onlab/onos/net/host/impl/SimpleHostMonitor.java
new file mode 100644
index 0000000..4849466
--- /dev/null
+++ b/core/net/src/main/java/org/onlab/onos/net/host/impl/SimpleHostMonitor.java
@@ -0,0 +1,108 @@
+package org.onlab.onos.net.host.impl;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.TimerTask;
+import org.onlab.onos.net.Device;
+import org.onlab.onos.net.Host;
+import org.onlab.onos.net.Port;
+import org.onlab.onos.net.device.DeviceService;
+import org.onlab.onos.net.host.HostProvider;
+import org.onlab.onos.net.host.HostService;
+import org.onlab.onos.net.packet.PacketProvider;
+import org.onlab.onos.net.topology.TopologyService;
+import org.onlab.packet.IpPrefix;
+import org.onlab.util.Timer;
+
+public class SimpleHostMonitor implements TimerTask {
+
+    private final HostService hostService;
+    private final TopologyService topologyService;
+    private final DeviceService deviceService;
+    private final HostProvider hostProvider;
+    private final PacketProvider packetProvider;
+
+    private final Set<IpPrefix> monitoredAddresses;
+
+    private final long probeRate;
+
+    // Handle of the next scheduled round; written by run(), read by shutdown().
+    private volatile Timeout timeout;
+
+    /** Creates the monitor and schedules its first probing round immediately. */
+    public SimpleHostMonitor(HostService hostService, TopologyService topologyService,
+                             DeviceService deviceService,
+                             HostProvider hostProvider, PacketProvider packetProvider) {
+        this.hostService = hostService;
+        this.topologyService = topologyService;
+        this.deviceService = deviceService;
+        this.hostProvider = hostProvider;
+        this.packetProvider = packetProvider;
+
+        monitoredAddresses = new HashSet<>();
+        probeRate = 30000; // milliseconds
+
+        timeout = Timer.getTimer().newTimeout(this, 0, TimeUnit.MILLISECONDS);
+    }
+
+    /** Adds the given address to the set probed on every round. */
+    public void addMonitoringFor(IpPrefix ip) {
+        monitoredAddresses.add(ip);
+    }
+
+    /** Removes the given address from the set probed on every round. */
+    public void stopMonitoring(IpPrefix ip) {
+        monitoredAddresses.remove(ip);
+    }
+
+    /** Cancels the currently scheduled probing round. */
+    public void shutdown() {
+        timeout.cancel();
+    }
+
+    /** Probes every monitored address and schedules the next round. */
+    @Override
+    public void run(Timeout timeout) throws Exception {
+        for (IpPrefix ip : monitoredAddresses) {
+            Set<Host> hosts = hostService.getHostsByIp(ip);
+            if (hosts.isEmpty()) {
+                sendArpRequest(ip);
+            } else {
+                for (Host host : hosts) {
+                    hostProvider.triggerProbe(host);
+                }
+            }
+        }
+
+        // The parameter shadows the field; assign the field so that shutdown()
+        // cancels the newly scheduled round rather than the one that just fired.
+        this.timeout = Timer.getTimer().newTimeout(this, probeRate, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Sends an ARP request for the given IP address.
+     *
+     * @param targetIp IP address to ARP for
+     */
+    private void sendArpRequest(IpPrefix targetIp) {
+        // Emit the request out of every port configured with a subnet containing the target.
+        // TODO: when no configured subnet matches, send out all non-external edge ports.
+        for (Device device : deviceService.getDevices()) {
+            for (Port port : deviceService.getPorts(device.id())) {
+                for (IpPrefix ip : port.ipAddresses()) {
+                    if (ip.contains(targetIp)) {
+                        sendProbe(port, targetIp);
+                        break; // one probe per port is enough
+                    }
+                }
+            }
+        }
+    }
+
+    private void sendProbe(Port port, IpPrefix targetIp) {
+        // TODO: not implemented yet
+    }
+}
diff --git a/core/net/src/main/java/org/onlab/onos/net/host/impl/package-info.java b/core/net/src/main/java/org/onlab/onos/net/host/impl/package-info.java
new file mode 100644
index 0000000..55370d3
--- /dev/null
+++ b/core/net/src/main/java/org/onlab/onos/net/host/impl/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Core subsystem for tracking global inventory of end-station hosts.
+ */
+package org.onlab.onos.net.host.impl;
diff --git a/core/net/src/main/java/org/onlab/onos/net/link/impl/LinkManager.java b/core/net/src/main/java/org/onlab/onos/net/link/impl/LinkManager.java
new file mode 100644
index 0000000..9ac5e80
--- /dev/null
+++ b/core/net/src/main/java/org/onlab/onos/net/link/impl/LinkManager.java
@@ -0,0 +1,239 @@
+package org.onlab.onos.net.link.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Set;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.event.AbstractListenerRegistry;
+import org.onlab.onos.event.EventDeliveryService;
+import org.onlab.onos.net.ConnectPoint;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.Link;
+import org.onlab.onos.net.device.DeviceEvent;
+import org.onlab.onos.net.device.DeviceListener;
+import org.onlab.onos.net.device.DeviceService;
+import org.onlab.onos.net.link.LinkAdminService;
+import org.onlab.onos.net.link.LinkDescription;
+import org.onlab.onos.net.link.LinkEvent;
+import org.onlab.onos.net.link.LinkListener;
+import org.onlab.onos.net.link.LinkProvider;
+import org.onlab.onos.net.link.LinkProviderRegistry;
+import org.onlab.onos.net.link.LinkProviderService;
+import org.onlab.onos.net.link.LinkService;
+import org.onlab.onos.net.link.LinkStore;
+import org.onlab.onos.net.provider.AbstractProviderRegistry;
+import org.onlab.onos.net.provider.AbstractProviderService;
+import org.slf4j.Logger;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Provides basic implementation of the link SB & NB APIs.
+ */
+@Component(immediate = true)
+@Service
+public class LinkManager
+ extends AbstractProviderRegistry<LinkProvider, LinkProviderService>
+ implements LinkService, LinkAdminService, LinkProviderRegistry {
+
+ private static final String DEVICE_ID_NULL = "Device ID cannot be null";
+ private static final String LINK_DESC_NULL = "Link description cannot be null";
+ private static final String CONNECT_POINT_NULL = "Connection point cannot be null";
+
+ private final Logger log = getLogger(getClass());
+
+ protected final AbstractListenerRegistry<LinkEvent, LinkListener>
+ listenerRegistry = new AbstractListenerRegistry<>();
+
+ private final DeviceListener deviceListener = new InnerDeviceListener();
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected LinkStore store;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected EventDeliveryService eventDispatcher;
+
+ @Activate
+ public void activate() {
+ eventDispatcher.addSink(LinkEvent.class, listenerRegistry);
+ deviceService.addListener(deviceListener);
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ eventDispatcher.removeSink(LinkEvent.class);
+ deviceService.removeListener(deviceListener);
+ log.info("Stopped");
+ }
+
+ @Override
+ public int getLinkCount() {
+ return store.getLinkCount();
+ }
+
+ @Override
+ public Iterable<Link> getLinks() {
+ return store.getLinks();
+ }
+
+ @Override
+ public Set<Link> getDeviceLinks(DeviceId deviceId) {
+ checkNotNull(deviceId, DEVICE_ID_NULL);
+ return Sets.union(store.getDeviceEgressLinks(deviceId),
+ store.getDeviceIngressLinks(deviceId));
+ }
+
+ @Override
+ public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
+ checkNotNull(deviceId, DEVICE_ID_NULL);
+ return store.getDeviceEgressLinks(deviceId);
+ }
+
+ @Override
+ public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
+ checkNotNull(deviceId, DEVICE_ID_NULL);
+ return store.getDeviceIngressLinks(deviceId);
+ }
+
+ @Override
+ public Set<Link> getLinks(ConnectPoint connectPoint) {
+ checkNotNull(connectPoint, CONNECT_POINT_NULL);
+ return Sets.union(store.getEgressLinks(connectPoint),
+ store.getIngressLinks(connectPoint));
+ }
+
+ @Override
+ public Set<Link> getEgressLinks(ConnectPoint connectPoint) {
+ checkNotNull(connectPoint, CONNECT_POINT_NULL);
+ return store.getEgressLinks(connectPoint);
+ }
+
+ @Override
+ public Set<Link> getIngressLinks(ConnectPoint connectPoint) {
+ checkNotNull(connectPoint, CONNECT_POINT_NULL);
+ return store.getIngressLinks(connectPoint);
+ }
+
+ @Override
+ public Link getLink(ConnectPoint src, ConnectPoint dst) {
+ checkNotNull(src, CONNECT_POINT_NULL);
+ checkNotNull(dst, CONNECT_POINT_NULL);
+ return store.getLink(src, dst);
+ }
+
+ @Override
+ public void removeLinks(ConnectPoint connectPoint) {
+ removeLinks(getLinks(connectPoint));
+ }
+
+ @Override
+ public void removeLinks(DeviceId deviceId) {
+ removeLinks(getDeviceLinks(deviceId));
+ }
+
+ @Override
+ public void addListener(LinkListener listener) {
+ listenerRegistry.addListener(listener);
+ }
+
+ @Override
+ public void removeListener(LinkListener listener) {
+ listenerRegistry.removeListener(listener);
+ }
+
+ // Auxiliary interceptor for device remove events to prune links that
+ // are associated with the removed device or its port.
+ private class InnerDeviceListener implements DeviceListener {
+ @Override
+ public void event(DeviceEvent event) {
+ if (event.type() == DeviceEvent.Type.DEVICE_REMOVED) {
+ removeLinks(event.subject().id());
+ } else if (event.type() == DeviceEvent.Type.PORT_REMOVED) {
+ removeLinks(new ConnectPoint(event.subject().id(),
+ event.port().number()));
+ }
+ }
+ }
+
+ @Override
+ protected LinkProviderService createProviderService(LinkProvider provider) {
+ return new InternalLinkProviderService(provider);
+ }
+
+ // Personalized link provider service issued to the supplied provider.
+ private class InternalLinkProviderService
+ extends AbstractProviderService<LinkProvider>
+ implements LinkProviderService {
+
+ InternalLinkProviderService(LinkProvider provider) {
+ super(provider);
+ }
+
+ @Override
+ public void linkDetected(LinkDescription linkDescription) {
+ checkNotNull(linkDescription, LINK_DESC_NULL);
+ checkValidity();
+ LinkEvent event = store.createOrUpdateLink(provider().id(),
+ linkDescription);
+ if (event != null) {
+ log.debug("Link {} detected", linkDescription);
+ post(event);
+ }
+ }
+
+ @Override
+ public void linkVanished(LinkDescription linkDescription) {
+ checkNotNull(linkDescription, LINK_DESC_NULL);
+ checkValidity();
+ LinkEvent event = store.removeLink(linkDescription.src(),
+ linkDescription.dst());
+ if (event != null) {
+ log.info("Link {} vanished", linkDescription);
+ post(event);
+ }
+ }
+
+ @Override
+ public void linksVanished(ConnectPoint connectPoint) {
+ checkNotNull(connectPoint, "Connect point cannot be null");
+ checkValidity();
+ log.info("Links for connection point {} vanished", connectPoint);
+ removeLinks(getLinks(connectPoint));
+ }
+
+ @Override
+ public void linksVanished(DeviceId deviceId) {
+ checkNotNull(deviceId, DEVICE_ID_NULL);
+ checkValidity();
+ log.info("Links for device {} vanished", deviceId);
+ removeLinks(getDeviceLinks(deviceId));
+ }
+ }
+
+    // Removes all links in the specified set and emits appropriate events.
+    private void removeLinks(Set<Link> links) {
+        // Iterate over a snapshot: callers pass Sets.union views, which may be backed by store state.
+        for (Link link : Sets.newHashSet(links)) {
+            post(store.removeLink(link.src(), link.dst()));
+        }
+    }
+
+ // Posts the specified event to the local event dispatcher.
+ private void post(LinkEvent event) {
+ if (event != null) {
+ eventDispatcher.post(event);
+ }
+ }
+
+}
diff --git a/core/net/src/main/java/org/onlab/onos/net/packet/impl/SimplePacketManager.java b/core/net/src/main/java/org/onlab/onos/net/packet/impl/SimplePacketManager.java
new file mode 100644
index 0000000..ee1b025
--- /dev/null
+++ b/core/net/src/main/java/org/onlab/onos/net/packet/impl/SimplePacketManager.java
@@ -0,0 +1,132 @@
+package org.onlab.onos.net.packet.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.net.Device;
+import org.onlab.onos.net.device.DeviceService;
+import org.onlab.onos.net.packet.OutboundPacket;
+import org.onlab.onos.net.packet.PacketContext;
+import org.onlab.onos.net.packet.PacketProcessor;
+import org.onlab.onos.net.packet.PacketProvider;
+import org.onlab.onos.net.packet.PacketProviderRegistry;
+import org.onlab.onos.net.packet.PacketProviderService;
+import org.onlab.onos.net.packet.PacketService;
+import org.onlab.onos.net.provider.AbstractProviderRegistry;
+import org.onlab.onos.net.provider.AbstractProviderService;
+import org.slf4j.Logger;
+
+/**
+ * Provides a basic implementation of the packet southbound and northbound APIs.
+ * <p>
+ * Inbound packets handed in by providers are offered to every registered
+ * {@link PacketProcessor} in ascending priority order; outbound packets are
+ * relayed to the provider of the device they are sent through.
+ */
+@Component(immediate = true)
+@Service
+public class SimplePacketManager
+        extends AbstractProviderRegistry<PacketProvider, PacketProviderService>
+        implements PacketService, PacketProviderRegistry {
+
+    private final Logger log = getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private DeviceService deviceService;
+
+    // Processors keyed by priority; TreeMap iteration yields ascending key order.
+    private final Map<Integer, PacketProcessor> processors = new TreeMap<>();
+
+    @Activate
+    public void activate() {
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        log.info("Stopped");
+    }
+
+    /**
+     * Registers a packet processor under the given priority.
+     * <p>
+     * Priorities are map keys, so registering under a priority that is
+     * already taken replaces the processor previously stored there.
+     *
+     * @param processor processor to register; must not be null
+     * @param priority  key that determines the processor's place in the chain
+     */
+    @Override
+    public void addProcessor(PacketProcessor processor, int priority) {
+        checkNotNull(processor, "Processor cannot be null");
+        processors.put(priority, processor);
+    }
+
+    /**
+     * Unregisters a packet processor; does nothing if it is not registered.
+     *
+     * @param processor processor to remove; must not be null
+     */
+    @Override
+    public void removeProcessor(PacketProcessor processor) {
+        checkNotNull(processor, "Processor cannot be null");
+        processors.values().remove(processor);
+    }
+
+    /**
+     * Sends the packet out through the provider of its target device.
+     * <p>
+     * The packet is dropped, with a warning, when either the device or its
+     * provider cannot be resolved, rather than failing with a null dereference.
+     *
+     * @param packet outbound packet; must not be null
+     */
+    @Override
+    public void emit(OutboundPacket packet) {
+        checkNotNull(packet, "Packet cannot be null");
+        final Device device = deviceService.getDevice(packet.sendThrough());
+        if (device == null) {
+            log.warn("Unable to emit packet; device {} not found", packet.sendThrough());
+            return;
+        }
+        final PacketProvider packetProvider = getProvider(device.providerId());
+        if (packetProvider == null) {
+            log.warn("Unable to emit packet; no provider for device {}", packet.sendThrough());
+            return;
+        }
+        packetProvider.emit(packet);
+    }
+
+    @Override
+    protected PacketProviderService createProviderService(PacketProvider provider) {
+        return new InternalPacketProviderService(provider);
+    }
+
+    // Personalized packet provider service issued to the supplied provider.
+    private class InternalPacketProviderService
+            extends AbstractProviderService<PacketProvider>
+            implements PacketProviderService {
+
+        protected InternalPacketProviderService(PacketProvider provider) {
+            super(provider);
+        }
+
+        // Offers the inbound packet context to every registered processor in turn.
+        @Override
+        public void processPacket(PacketContext context) {
+            for (PacketProcessor processor : processors.values()) {
+                processor.process(context);
+            }
+        }
+
+    }
+}
diff --git a/core/net/src/main/java/org/onlab/onos/net/packet/impl/package-info.java b/core/net/src/main/java/org/onlab/onos/net/packet/impl/package-info.java
new file mode 100644
index 0000000..78dfeaa
--- /dev/null
+++ b/core/net/src/main/java/org/onlab/onos/net/packet/impl/package-info.java
@@ -0,0 +1,6 @@
+/**
+ * Core subsystem for processing inbound packets and emitting outbound packets.
+ * Processing of inbound packets is always in the local context only, but
+ * emitting outbound packets allows for cluster-wide operation.
+ */
+package org.onlab.onos.net.packet.impl;
diff --git a/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java b/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java
new file mode 100644
index 0000000..770f368
--- /dev/null
+++ b/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java
@@ -0,0 +1,192 @@
+package org.onlab.onos.net.topology.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onlab.onos.event.AbstractEventAccumulator;
+import org.onlab.onos.event.Event;
+import org.onlab.onos.event.EventAccumulator;
+import org.onlab.onos.net.device.DeviceEvent;
+import org.onlab.onos.net.device.DeviceListener;
+import org.onlab.onos.net.device.DeviceService;
+import org.onlab.onos.net.link.LinkEvent;
+import org.onlab.onos.net.link.LinkListener;
+import org.onlab.onos.net.link.LinkService;
+import org.onlab.onos.net.provider.AbstractProvider;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.net.topology.DefaultGraphDescription;
+import org.onlab.onos.net.topology.GraphDescription;
+import org.onlab.onos.net.topology.TopologyProvider;
+import org.onlab.onos.net.topology.TopologyProviderRegistry;
+import org.onlab.onos.net.topology.TopologyProviderService;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Timer;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.Executors.newFixedThreadPool;
+import static org.onlab.onos.net.device.DeviceEvent.Type.*;
+import static org.onlab.util.Tools.namedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Default implementation of a network topology provider that feeds off
+ * device and link subsystem events to trigger assembly and computation of
+ * new topology snapshots.
+ * <p>
+ * Relevant events are batched by an accumulator and each batch schedules a
+ * topology build on a dedicated thread pool.
+ */
+@Component(immediate = true)
+public class DefaultTopologyProvider extends AbstractProvider
+        implements TopologyProvider {
+
+    // TODO: make these configurable
+    private static final int MAX_EVENTS = 100;
+    private static final int MAX_IDLE_MS = 50;
+    private static final int MAX_BATCH_MS = 200;
+    private static final int MAX_THREADS = 8;
+
+    // FIXME: Replace with a system-wide timer instance;
+    // TODO: Convert to use HashedWheelTimer or produce a variant of that; then decide which we want to adopt
+    private static final Timer TIMER = new Timer();
+
+    private final Logger log = getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected TopologyProviderRegistry providerRegistry;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LinkService linkService;
+
+    // Checked by builder tasks so builds queued around deactivation are skipped.
+    private volatile boolean isStarted = false;
+
+    // Volatile because builder threads read it while deactivate() clears it.
+    private volatile TopologyProviderService providerService;
+    private DeviceListener deviceListener = new InnerDeviceListener();
+    private LinkListener linkListener = new InnerLinkListener();
+
+    private EventAccumulator accumulator;
+    private ExecutorService executor;
+
+    /**
+     * Creates a topology provider that identifies itself with the core provider id.
+     */
+    public DefaultTopologyProvider() {
+        super(new ProviderId("core", "org.onlab.onos.provider.topology"));
+    }
+
+    @Activate
+    public synchronized void activate() {
+        executor = newFixedThreadPool(MAX_THREADS, namedThreads("topo-build-%d"));
+        accumulator = new TopologyChangeAccumulator();
+
+        providerService = providerRegistry.register(this);
+        deviceService.addListener(deviceListener);
+        linkService.addListener(linkListener);
+
+        isStarted = true;
+        // Build an initial snapshot; no events caused it, hence null reasons.
+        triggerTopologyBuild(null);
+        log.info("Started");
+    }
+
+    @Deactivate
+    public synchronized void deactivate() {
+        isStarted = false;
+
+        deviceService.removeListener(deviceListener);
+        linkService.removeListener(linkListener);
+        providerRegistry.unregister(this);
+        providerService = null;
+
+        executor.shutdownNow();
+        executor = null;
+
+        log.info("Stopped");
+    }
+
+    /**
+     * Triggers assembly of topology data citing the specified events as the
+     * reason. Does nothing when no executor is available, i.e. while inactive.
+     *
+     * @param reasons events which triggered the topology change
+     */
+    private synchronized void triggerTopologyBuild(List<Event> reasons) {
+        if (executor != null) {
+            executor.execute(new TopologyBuilderTask(reasons));
+        }
+    }
+
+    // Builds the topology using the latest device and link information
+    // and citing the specified events as reasons for the change.
+    private void buildTopology(List<Event> reasons) {
+        // Snapshot the field: deactivate() may null it while this task runs.
+        TopologyProviderService service = providerService;
+        if (isStarted && service != null) {
+            GraphDescription desc =
+                    new DefaultGraphDescription(System.nanoTime(),
+                                                deviceService.getDevices(),
+                                                linkService.getLinks());
+            service.topologyChanged(desc, reasons);
+        }
+    }
+
+    // Callback for device events
+    private class InnerDeviceListener implements DeviceListener {
+        @Override
+        public void event(DeviceEvent event) {
+            // Only these device event types lead to a topology rebuild.
+            DeviceEvent.Type type = event.type();
+            if (type == DEVICE_ADDED || type == DEVICE_REMOVED ||
+                    type == DEVICE_AVAILABILITY_CHANGED) {
+                accumulator.add(event);
+            }
+        }
+    }
+
+    // Callback for link events
+    private class InnerLinkListener implements LinkListener {
+        @Override
+        public void event(LinkEvent event) {
+            accumulator.add(event);
+        }
+    }
+
+    // Event accumulator for paced triggering of topology assembly.
+    private class TopologyChangeAccumulator
+            extends AbstractEventAccumulator implements EventAccumulator {
+
+        TopologyChangeAccumulator() {
+            super(TIMER, MAX_EVENTS, MAX_BATCH_MS, MAX_IDLE_MS);
+        }
+
+        @Override
+        public void processEvents(List<Event> events) {
+            triggerTopologyBuild(events);
+        }
+
+    }
+
+    // Task for building topology data in a separate thread.
+    private class TopologyBuilderTask implements Runnable {
+        private final List<Event> reasons;
+
+        public TopologyBuilderTask(List<Event> reasons) {
+            this.reasons = reasons;
+        }
+
+        @Override
+        public void run() {
+            buildTopology(reasons);
+        }
+    }
+
+}
diff --git a/core/net/src/main/java/org/onlab/onos/net/topology/impl/PathManager.java b/core/net/src/main/java/org/onlab/onos/net/topology/impl/PathManager.java
new file mode 100644
index 0000000..47cb376
--- /dev/null
+++ b/core/net/src/main/java/org/onlab/onos/net/topology/impl/PathManager.java
@@ -0,0 +1,190 @@
+package org.onlab.onos.net.topology.impl;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.net.ConnectPoint;
+import org.onlab.onos.net.DefaultEdgeLink;
+import org.onlab.onos.net.DefaultPath;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.EdgeLink;
+import org.onlab.onos.net.ElementId;
+import org.onlab.onos.net.Host;
+import org.onlab.onos.net.HostId;
+import org.onlab.onos.net.HostLocation;
+import org.onlab.onos.net.Link;
+import org.onlab.onos.net.Path;
+import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.host.HostService;
+import org.onlab.onos.net.topology.PathService;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.net.topology.LinkWeight;
+import org.onlab.onos.net.topology.Topology;
+import org.onlab.onos.net.topology.TopologyService;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onlab.onos.net.DeviceId.deviceId;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Provides implementation of a path selection service atop the current
+ * topology and host services.
+ */
+@Component(immediate = true)
+@Service
+public class PathManager implements PathService {
+
+    private static final String ELEMENT_ID_NULL = "Element ID cannot be null";
+
+    private static final ProviderId PID = new ProviderId("core", "org.onlab.onos.core");
+    private static final PortNumber P0 = PortNumber.portNumber(0);
+
+    private static final EdgeLink NOT_HOST = new NotHost();
+
+    private final Logger log = getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected TopologyService topologyService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected HostService hostService;
+
+    @Activate
+    public void setUp() {
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void tearDown() {
+        log.info("Stopped");
+    }
+
+    @Override
+    public Set<Path> getPaths(ElementId src, ElementId dst) {
+        return getPaths(src, dst, null);
+    }
+
+    /**
+     * Computes end-to-end paths between two elements, each being a host or
+     * an infrastructure device.
+     *
+     * @param src    source element; must not be null
+     * @param dst    destination element; must not be null
+     * @param weight link weight to apply, or null to query the topology service without one
+     * @return set of paths; empty if either end is a host that does not exist
+     */
+    @Override
+    public Set<Path> getPaths(ElementId src, ElementId dst, LinkWeight weight) {
+        checkNotNull(src, ELEMENT_ID_NULL);
+        checkNotNull(dst, ELEMENT_ID_NULL);
+
+        // Get the source and destination edge locations
+        EdgeLink srcEdge = getEdgeLink(src, true);
+        EdgeLink dstEdge = getEdgeLink(dst, false);
+
+        // A null edge means the host ID did not resolve; no path can exist.
+        if (srcEdge == null || dstEdge == null) {
+            return Sets.newHashSet();
+        }
+
+        // Non-host elements are treated as infrastructure device IDs.
+        DeviceId srcDevice = srcEdge != NOT_HOST ? srcEdge.dst().deviceId() : (DeviceId) src;
+        DeviceId dstDevice = dstEdge != NOT_HOST ? dstEdge.src().deviceId() : (DeviceId) dst;
+
+        // If the source and destination are on the same edge device, there
+        // is just one path, so build it and return it.
+        if (srcDevice.equals(dstDevice)) {
+            return edgeToEdgePaths(srcEdge, dstEdge);
+        }
+
+        // Otherwise get all paths between the source and destination edge
+        // devices.
+        Topology topology = topologyService.currentTopology();
+        Set<Path> paths = weight == null ?
+                topologyService.getPaths(topology, srcDevice, dstDevice) :
+                topologyService.getPaths(topology, srcDevice, dstDevice, weight);
+
+        return edgeToEdgePaths(srcEdge, dstEdge, paths);
+    }
+
+    // Finds the host edge link if the element ID is a host id of an existing
+    // host. Otherwise, if the host does not exist, it returns null and if
+    // the element ID is not a host ID, returns NOT_HOST edge link.
+    private EdgeLink getEdgeLink(ElementId elementId, boolean isIngress) {
+        if (elementId instanceof HostId) {
+            // Resolve the host; bail out with null if it is unknown.
+            Host host = hostService.getHost((HostId) elementId);
+            if (host == null) {
+                return null;
+            }
+            return new DefaultEdgeLink(PID, new ConnectPoint(elementId, P0),
+                                       host.location(), isIngress);
+        }
+        return NOT_HOST;
+    }
+
+    // Produces a set of direct edge-to-edge paths.
+    private Set<Path> edgeToEdgePaths(EdgeLink srcLink, EdgeLink dstLink) {
+        Set<Path> endToEndPaths = Sets.newHashSetWithExpectedSize(1);
+        if (srcLink != NOT_HOST || dstLink != NOT_HOST) {
+            endToEndPaths.add(edgeToEdgePath(srcLink, dstLink));
+        }
+        return endToEndPaths;
+    }
+
+    // Produces a direct edge-to-edge path.
+    private Path edgeToEdgePath(EdgeLink srcLink, EdgeLink dstLink) {
+        List<Link> links = Lists.newArrayListWithCapacity(2);
+        // Add source and destination edge links only if they are real.
+        if (srcLink != NOT_HOST) {
+            links.add(srcLink);
+        }
+        if (dstLink != NOT_HOST) {
+            links.add(dstLink);
+        }
+        return new DefaultPath(PID, links, 2);
+    }
+
+    // Produces a set of edge-to-edge paths using the set of infrastructure
+    // paths and the given edge links.
+    private Set<Path> edgeToEdgePaths(EdgeLink srcLink, EdgeLink dstLink, Set<Path> paths) {
+        Set<Path> endToEndPaths = Sets.newHashSetWithExpectedSize(paths.size());
+        for (Path path : paths) {
+            endToEndPaths.add(edgeToEdgePath(srcLink, dstLink, path));
+        }
+        return endToEndPaths;
+    }
+
+    // Produces an edge-to-edge path using the specified infrastructure path
+    // and edge links.
+    private Path edgeToEdgePath(EdgeLink srcLink, EdgeLink dstLink, Path path) {
+        List<Link> links = Lists.newArrayListWithCapacity(path.links().size() + 2);
+        // Add edge links only if they are real; NOT_HOST is just a marker.
+        if (srcLink != NOT_HOST) {
+            links.add(srcLink);
+        }
+        links.addAll(path.links());
+        if (dstLink != NOT_HOST) {
+            links.add(dstLink);
+        }
+        return new DefaultPath(path.providerId(), links, path.cost() + 2);
+    }
+
+    // Special value for edge link to represent that this is really not an
+    // edge link since the src or dst are really an infrastructure device.
+    private static class NotHost extends DefaultEdgeLink implements EdgeLink {
+        NotHost() {
+            super(PID, new ConnectPoint(HostId.hostId("nic:none"), P0),
+                  new HostLocation(deviceId("none:none"), P0, 0L), false);
+        }
+    }
+}
diff --git a/core/net/src/main/java/org/onlab/onos/net/topology/impl/TopologyManager.java b/core/net/src/main/java/org/onlab/onos/net/topology/impl/TopologyManager.java
new file mode 100644
index 0000000..57e9fb7
--- /dev/null
+++ b/core/net/src/main/java/org/onlab/onos/net/topology/impl/TopologyManager.java
@@ -0,0 +1,195 @@
+package org.onlab.onos.net.topology.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.event.AbstractListenerRegistry;
+import org.onlab.onos.event.Event;
+import org.onlab.onos.event.EventDeliveryService;
+import org.onlab.onos.net.ConnectPoint;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.Link;
+import org.onlab.onos.net.Path;
+import org.onlab.onos.net.provider.AbstractProviderRegistry;
+import org.onlab.onos.net.provider.AbstractProviderService;
+import org.onlab.onos.net.topology.ClusterId;
+import org.onlab.onos.net.topology.GraphDescription;
+import org.onlab.onos.net.topology.LinkWeight;
+import org.onlab.onos.net.topology.Topology;
+import org.onlab.onos.net.topology.TopologyCluster;
+import org.onlab.onos.net.topology.TopologyEvent;
+import org.onlab.onos.net.topology.TopologyGraph;
+import org.onlab.onos.net.topology.TopologyListener;
+import org.onlab.onos.net.topology.TopologyProvider;
+import org.onlab.onos.net.topology.TopologyProviderRegistry;
+import org.onlab.onos.net.topology.TopologyProviderService;
+import org.onlab.onos.net.topology.TopologyService;
+import org.onlab.onos.net.topology.TopologyStore;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Provides basic implementation of the topology southbound and northbound APIs.
+ * <p>
+ * Queries are validated for null arguments and delegated to the topology
+ * store; topology updates supplied by providers are written to the store and
+ * any resulting event is posted to the event dispatcher.
+ */
+@Component(immediate = true)
+@Service
+public class TopologyManager
+        extends AbstractProviderRegistry<TopologyProvider, TopologyProviderService>
+        implements TopologyService, TopologyProviderRegistry {
+
+    public static final String TOPOLOGY_NULL = "Topology cannot be null";
+    private static final String DEVICE_ID_NULL = "Device ID cannot be null";
+    private static final String CLUSTER_ID_NULL = "Cluster ID cannot be null";
+    private static final String CLUSTER_NULL = "Topology cluster cannot be null";
+    public static final String CONNECTION_POINT_NULL = "Connection point cannot be null";
+
+    private final Logger log = getLogger(getClass());
+
+    private final AbstractListenerRegistry<TopologyEvent, TopologyListener>
+            listenerRegistry = new AbstractListenerRegistry<>();
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected TopologyStore store;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected EventDeliveryService eventDispatcher;
+
+    @Activate
+    public void activate() {
+        eventDispatcher.addSink(TopologyEvent.class, listenerRegistry);
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        eventDispatcher.removeSink(TopologyEvent.class);
+        log.info("Stopped");
+    }
+
+    @Override
+    public Topology currentTopology() {
+        return store.currentTopology();
+    }
+
+    @Override
+    public boolean isLatest(Topology topology) {
+        checkNotNull(topology, TOPOLOGY_NULL);
+        return store.isLatest(topology);
+    }
+
+    @Override
+    public Set<TopologyCluster> getClusters(Topology topology) {
+        checkNotNull(topology, TOPOLOGY_NULL);
+        return store.getClusters(topology);
+    }
+
+    @Override
+    public TopologyCluster getCluster(Topology topology, ClusterId clusterId) {
+        checkNotNull(topology, TOPOLOGY_NULL);
+        checkNotNull(clusterId, CLUSTER_ID_NULL);
+        return store.getCluster(topology, clusterId);
+    }
+
+    @Override
+    public Set<DeviceId> getClusterDevices(Topology topology, TopologyCluster cluster) {
+        checkNotNull(topology, TOPOLOGY_NULL);
+        checkNotNull(cluster, CLUSTER_NULL);
+        return store.getClusterDevices(topology, cluster);
+    }
+
+    @Override
+    public Set<Link> getClusterLinks(Topology topology, TopologyCluster cluster) {
+        checkNotNull(topology, TOPOLOGY_NULL);
+        checkNotNull(cluster, CLUSTER_NULL);
+        return store.getClusterLinks(topology, cluster);
+    }
+
+    @Override
+    public TopologyGraph getGraph(Topology topology) {
+        checkNotNull(topology, TOPOLOGY_NULL);
+        return store.getGraph(topology);
+    }
+
+    @Override
+    public Set<Path> getPaths(Topology topology, DeviceId src, DeviceId dst) {
+        checkNotNull(topology, TOPOLOGY_NULL);
+        checkNotNull(src, DEVICE_ID_NULL);
+        checkNotNull(dst, DEVICE_ID_NULL);
+        return store.getPaths(topology, src, dst);
+    }
+
+    @Override
+    public Set<Path> getPaths(Topology topology, DeviceId src, DeviceId dst, LinkWeight weight) {
+        checkNotNull(topology, TOPOLOGY_NULL);
+        checkNotNull(src, DEVICE_ID_NULL);
+        checkNotNull(dst, DEVICE_ID_NULL);
+        checkNotNull(weight, "Link weight cannot be null");
+        return store.getPaths(topology, src, dst, weight);
+    }
+
+    @Override
+    public boolean isInfrastructure(Topology topology, ConnectPoint connectPoint) {
+        checkNotNull(topology, TOPOLOGY_NULL);
+        checkNotNull(connectPoint, CONNECTION_POINT_NULL);
+        return store.isInfrastructure(topology, connectPoint);
+    }
+
+    @Override
+    public boolean isBroadcastPoint(Topology topology, ConnectPoint connectPoint) {
+        checkNotNull(topology, TOPOLOGY_NULL);
+        checkNotNull(connectPoint, CONNECTION_POINT_NULL);
+        return store.isBroadcastPoint(topology, connectPoint);
+    }
+
+    @Override
+    public void addListener(TopologyListener listener) {
+        listenerRegistry.addListener(listener);
+    }
+
+    @Override
+    public void removeListener(TopologyListener listener) {
+        listenerRegistry.removeListener(listener);
+    }
+
+    @Override
+    protected TopologyProviderService createProviderService(TopologyProvider provider) {
+        return new InternalTopologyProviderService(provider);
+    }
+
+    // Personalized topology provider service issued to the supplied provider.
+    private class InternalTopologyProviderService
+            extends AbstractProviderService<TopologyProvider>
+            implements TopologyProviderService {
+
+        InternalTopologyProviderService(TopologyProvider provider) {
+            super(provider);
+        }
+
+        @Override
+        public void topologyChanged(GraphDescription topoDescription,
+                                    List<Event> reasons) {
+            checkNotNull(topoDescription, "Topology description cannot be null");
+
+            // Post only when the store hands back an event; null is skipped.
+            TopologyEvent event = store.updateTopology(provider().id(),
+                                                       topoDescription, reasons);
+            if (event != null) {
+                log.info("Topology {} changed", event.subject());
+                eventDispatcher.post(event);
+            }
+        }
+    }
+
+}
diff --git a/core/net/src/main/java/org/onlab/onos/net/topology/impl/package-info.java b/core/net/src/main/java/org/onlab/onos/net/topology/impl/package-info.java
new file mode 100644
index 0000000..17c3644
--- /dev/null
+++ b/core/net/src/main/java/org/onlab/onos/net/topology/impl/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Core subsystem for tracking global & consistent topology graph views.
+ */
+package org.onlab.onos.net.topology.impl;