Experimenting multi Provider support on SimpelDeviceStore.

Change-Id: I181db7704556768863624f072540d141e39d0904
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStore.java
index df20b2d..bc0a055 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStore.java
@@ -3,6 +3,8 @@
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 
+import org.apache.commons.lang3.concurrent.ConcurrentException;
+import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -10,6 +12,7 @@
 import org.onlab.onos.net.DefaultDevice;
 import org.onlab.onos.net.DefaultPort;
 import org.onlab.onos.net.Device;
+import org.onlab.onos.net.Device.Type;
 import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.net.Port;
 import org.onlab.onos.net.PortNumber;
@@ -23,21 +26,27 @@
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Predicates.notNull;
 import static org.onlab.onos.net.device.DeviceEvent.Type.*;
 import static org.slf4j.LoggerFactory.getLogger;
+import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
 
+// TODO: synchronization should be done in more fine-grained manner.
 /**
  * Manages inventory of infrastructure devices using trivial in-memory
  * structures implementation.
@@ -52,9 +61,18 @@
 
     public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
 
-    private final Map<DeviceId, DefaultDevice> devices = new ConcurrentHashMap<>();
+    // collection of Description given from various providers
+    private final ConcurrentMap<DeviceId,
+                            ConcurrentMap<ProviderId, DeviceDescriptions>>
+                                deviceDescs = new ConcurrentHashMap<>();
+
+    // cache of Device and Ports generated by compositing descriptions from providers
+    private final ConcurrentMap<DeviceId, Device> devices = new ConcurrentHashMap<>();
+    private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = new ConcurrentHashMap<>();
+
+    // available(=UP) devices
     private final Set<DeviceId> availableDevices = new HashSet<>();
-    private final Map<DeviceId, Map<PortNumber, Port>> devicePorts = new HashMap<>();
+
 
     @Activate
     public void activate() {
@@ -73,7 +91,7 @@
 
     @Override
     public Iterable<Device> getDevices() {
-        return Collections.unmodifiableSet(new HashSet<Device>(devices.values()));
+        return Collections.unmodifiableCollection(devices.values());
     }
 
     @Override
@@ -82,82 +100,115 @@
     }
 
     @Override
-    public DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
+    public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
                                      DeviceDescription deviceDescription) {
-        DefaultDevice device = devices.get(deviceId);
-        if (device == null) {
-            return createDevice(providerId, deviceId, deviceDescription);
+        ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
+            = createIfAbsentUnchecked(deviceDescs, deviceId,
+                    new InitConcurrentHashMap<ProviderId, DeviceDescriptions>());
+
+        Device oldDevice = devices.get(deviceId);
+
+        DeviceDescriptions descs
+            = createIfAbsentUnchecked(providerDescs, providerId,
+                    new InitDeviceDescs(deviceDescription));
+
+        descs.putDeviceDesc(deviceDescription);
+
+        Device newDevice = composeDevice(deviceId, providerDescs);
+
+        if (oldDevice == null) {
+            // ADD
+            return createDevice(providerId, newDevice);
+        } else {
+            // UPDATE or ignore (no change or stale)
+            return updateDevice(providerId, oldDevice, newDevice);
         }
-        return updateDevice(providerId, device, deviceDescription);
     }
 
     // Creates the device and returns the appropriate event if necessary.
-    private DeviceEvent createDevice(ProviderId providerId, DeviceId deviceId,
-                                     DeviceDescription desc) {
-        DefaultDevice device = new DefaultDevice(providerId, deviceId, desc.type(),
-                                                 desc.manufacturer(),
-                                                 desc.hwVersion(), desc.swVersion(),
-                                                 desc.serialNumber());
+    private DeviceEvent createDevice(ProviderId providerId, Device newDevice) {
+
+        // update composed device cache
         synchronized (this) {
-            devices.put(deviceId, device);
-            availableDevices.add(deviceId);
+            devices.putIfAbsent(newDevice.id(), newDevice);
+            if (!providerId.isAncillary()) {
+                availableDevices.add(newDevice.id());
+            }
         }
-        return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, device, null);
+
+        return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
     }
 
     // Updates the device and returns the appropriate event if necessary.
-    private DeviceEvent updateDevice(ProviderId providerId, DefaultDevice device,
-                                     DeviceDescription desc) {
+    private DeviceEvent updateDevice(ProviderId providerId, Device oldDevice, Device newDevice) {
+
         // We allow only certain attributes to trigger update
-        if (!Objects.equals(device.hwVersion(), desc.hwVersion()) ||
-                !Objects.equals(device.swVersion(), desc.swVersion())) {
-            DefaultDevice updated = new DefaultDevice(providerId, device.id(),
-                                                      desc.type(),
-                                                      desc.manufacturer(),
-                                                      desc.hwVersion(),
-                                                      desc.swVersion(),
-                                                      desc.serialNumber());
+        if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
+            !Objects.equals(oldDevice.swVersion(), newDevice.swVersion())) {
+
             synchronized (this) {
-                devices.put(device.id(), updated);
-                availableDevices.add(device.id());
+                devices.replace(newDevice.id(), oldDevice, newDevice);
+                if (!providerId.isAncillary()) {
+                    availableDevices.add(newDevice.id());
+                }
             }
-            return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, updated, null);
+            return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
         }
 
-        // Otherwise merely attempt to change availability
-        synchronized (this) {
-            boolean added = availableDevices.add(device.id());
+        // Otherwise merely attempt to change availability if primary provider
+        if (!providerId.isAncillary()) {
+            synchronized (this) {
+            boolean added = availableDevices.add(newDevice.id());
             return !added ? null :
-                    new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
+                    new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
+            }
         }
+        return null;
     }
 
     @Override
     public DeviceEvent markOffline(DeviceId deviceId) {
         synchronized (this) {
             Device device = devices.get(deviceId);
-            boolean removed = device != null && availableDevices.remove(deviceId);
+            boolean removed = (device != null) && availableDevices.remove(deviceId);
             return !removed ? null :
                     new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
         }
     }
 
     @Override
-    public List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId,
+    public synchronized List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId,
                                   List<PortDescription> portDescriptions) {
+
+        // TODO: implement multi-provider
+        Device device = devices.get(deviceId);
+        checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
+
+        ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
+        checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
+
+        DeviceDescriptions descs = descsMap.get(providerId);
+        checkArgument(descs != null,
+                "Device description for Device ID %s from Provider %s was not found",
+                deviceId, providerId);
+
+
         List<DeviceEvent> events = new ArrayList<>();
         synchronized (this) {
-            Device device = devices.get(deviceId);
-            checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
-            Map<PortNumber, Port> ports = getPortMap(deviceId);
+            ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
 
             // Add new ports
             Set<PortNumber> processed = new HashSet<>();
             for (PortDescription portDescription : portDescriptions) {
-                Port port = ports.get(portDescription.portNumber());
-                events.add(port == null ?
-                                   createPort(device, portDescription, ports) :
-                                   updatePort(device, port, portDescription, ports));
+                PortNumber number = portDescription.portNumber();
+                Port oldPort = ports.get(number);
+                // update description
+                descs.putPortDesc(number, portDescription);
+                Port newPort = composePort(device, number, descsMap);
+
+                events.add(oldPort == null ?
+                                   createPort(device, newPort, ports) :
+                                   updatePort(device, oldPort, newPort, ports));
                 processed.add(portDescription.portNumber());
             }
 
@@ -168,25 +219,20 @@
 
     // Creates a new port based on the port description adds it to the map and
     // Returns corresponding event.
-    private DeviceEvent createPort(Device device, PortDescription portDescription,
-                                   Map<PortNumber, Port> ports) {
-        DefaultPort port = new DefaultPort(device, portDescription.portNumber(),
-                                           portDescription.isEnabled());
-        ports.put(port.number(), port);
-        return new DeviceEvent(PORT_ADDED, device, port);
+    private DeviceEvent createPort(Device device, Port newPort,
+                                   ConcurrentMap<PortNumber, Port> ports) {
+        ports.put(newPort.number(), newPort);
+        return new DeviceEvent(PORT_ADDED, device, newPort);
     }
 
     // CHecks if the specified port requires update and if so, it replaces the
     // existing entry in the map and returns corresponding event.
-    private DeviceEvent updatePort(Device device, Port port,
-                                   PortDescription portDescription,
-                                   Map<PortNumber, Port> ports) {
-        if (port.isEnabled() != portDescription.isEnabled()) {
-            DefaultPort updatedPort =
-                    new DefaultPort(device, portDescription.portNumber(),
-                                    portDescription.isEnabled());
-            ports.put(port.number(), updatedPort);
-            return new DeviceEvent(PORT_UPDATED, device, updatedPort);
+    private DeviceEvent updatePort(Device device, Port oldPort,
+                                   Port newPort,
+                                   ConcurrentMap<PortNumber, Port> ports) {
+        if (oldPort.isEnabled() != newPort.isEnabled()) {
+            ports.put(oldPort.number(), newPort);
+            return new DeviceEvent(PORT_UPDATED, device, newPort);
         }
         return null;
     }
@@ -211,31 +257,48 @@
 
     // Gets the map of ports for the specified device; if one does not already
     // exist, it creates and registers a new one.
-    private Map<PortNumber, Port> getPortMap(DeviceId deviceId) {
-        Map<PortNumber, Port> ports = devicePorts.get(deviceId);
-        if (ports == null) {
-            ports = new HashMap<>();
-            devicePorts.put(deviceId, ports);
-        }
-        return ports;
+    private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
+        return createIfAbsentUnchecked(devicePorts, deviceId,
+                new InitConcurrentHashMap<PortNumber, Port>());
     }
 
     @Override
-    public DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
+    public synchronized DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
                                  PortDescription portDescription) {
+        Device device = devices.get(deviceId);
+        checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
+
+        ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
+        checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
+
+        DeviceDescriptions descs = descsMap.get(providerId);
+        checkArgument(descs != null,
+                "Device description for Device ID %s from Provider %s was not found",
+                deviceId, providerId);
+
+        // TODO: implement multi-provider
         synchronized (this) {
-            Device device = devices.get(deviceId);
-            checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
-            Map<PortNumber, Port> ports = getPortMap(deviceId);
-            Port port = ports.get(portDescription.portNumber());
-            return updatePort(device, port, portDescription, ports);
+            ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
+            final PortNumber number = portDescription.portNumber();
+            Port oldPort = ports.get(number);
+            // update description
+            descs.putPortDesc(number, portDescription);
+            Port newPort = composePort(device, number, descsMap);
+            if (oldPort == null) {
+                return createPort(device, newPort, ports);
+            } else {
+                return updatePort(device, oldPort, newPort, ports);
+            }
         }
     }
 
     @Override
     public List<Port> getPorts(DeviceId deviceId) {
         Map<PortNumber, Port> ports = devicePorts.get(deviceId);
-        return ports == null ? new ArrayList<Port>() : ImmutableList.copyOf(ports.values());
+        if (ports == null) {
+            return Collections.emptyList();
+        }
+        return ImmutableList.copyOf(ports.values());
     }
 
     @Override
@@ -257,4 +320,136 @@
                     new DeviceEvent(DEVICE_REMOVED, device, null);
         }
     }
+
+    /**
+     * Returns a Device, merging description given from multiple Providers.
+     *
+     * @param deviceId device identifier
+     * @param providerDescs Collection of Descriptions from multiple providers
+     * @return Device instance
+     */
+    private Device composeDevice(DeviceId deviceId,
+            ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
+
+        checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
+
+        ProviderId primary = pickPrimaryPID(providerDescs);
+
+        DeviceDescriptions desc = providerDescs.get(primary);
+        Type type = desc.getDeviceDesc().type();
+        String manufacturer = desc.getDeviceDesc().manufacturer();
+        String hwVersion = desc.getDeviceDesc().hwVersion();
+        String swVersion = desc.getDeviceDesc().swVersion();
+        String serialNumber = desc.getDeviceDesc().serialNumber();
+
+        for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
+            if (e.getKey().equals(primary)) {
+                continue;
+            }
+            // FIXME: implement attribute merging once we have K-V attributes
+        }
+
+        return new DefaultDevice(primary, deviceId , type, manufacturer, hwVersion, swVersion, serialNumber);
+    }
+
+    // probably want composePorts
+    private Port composePort(Device device, PortNumber number,
+                ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
+
+        ProviderId primary = pickPrimaryPID(providerDescs);
+        DeviceDescriptions primDescs = providerDescs.get(primary);
+        final PortDescription portDesc = primDescs.getPortDesc(number);
+        boolean isEnabled;
+        if (portDesc != null) {
+            isEnabled = portDesc.isEnabled();
+        } else {
+            // if no primary, assume not enabled
+            // TODO: revisit this port enabled/disabled behavior
+            isEnabled = false;
+        }
+
+        for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
+            if (e.getKey().equals(primary)) {
+                continue;
+            }
+            // FIXME: implement attribute merging once we have K-V attributes
+        }
+
+        return new DefaultPort(device, number, isEnabled);
+    }
+
+    /**
+     * @return primary ProviderID, or randomly chosen one if none exists
+     */
+    private ProviderId pickPrimaryPID(
+            ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
+        ProviderId fallBackPrimary = null;
+        for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
+            if (!e.getKey().isAncillary()) {
+                return e.getKey();
+            } else if (fallBackPrimary == null) {
+                // pick randomly as a fallback in case there is no primary
+                fallBackPrimary = e.getKey();
+            }
+        }
+        return fallBackPrimary;
+    }
+
+    // TODO: can be made generic
+    private static final class InitConcurrentHashMap<K, V> implements
+            ConcurrentInitializer<ConcurrentMap<K, V>> {
+        @Override
+        public ConcurrentMap<K, V> get() throws ConcurrentException {
+            return new ConcurrentHashMap<>();
+        }
+    }
+
+    public static final class InitDeviceDescs
+        implements ConcurrentInitializer<DeviceDescriptions> {
+        private final DeviceDescription deviceDesc;
+        public InitDeviceDescs(DeviceDescription deviceDesc) {
+            this.deviceDesc = checkNotNull(deviceDesc);
+        }
+        @Override
+        public DeviceDescriptions get() throws ConcurrentException {
+            return new DeviceDescriptions(deviceDesc);
+        }
+    }
+
+
+    /**
+     * Collection of Description of a Device and it's Ports given from a Provider.
+     */
+    private static class DeviceDescriptions {
+        //        private final DeviceId id;
+        //        private final ProviderId pid;
+
+        private final AtomicReference<DeviceDescription> deviceDesc;
+        private final ConcurrentMap<PortNumber, PortDescription> portDescs;
+
+        public DeviceDescriptions(DeviceDescription desc) {
+            this.deviceDesc = new AtomicReference<>(desc);
+            this.portDescs = new ConcurrentHashMap<>();
+        }
+
+        public DeviceDescription getDeviceDesc() {
+            return deviceDesc.get();
+        }
+
+        public PortDescription getPortDesc(PortNumber number) {
+            return portDescs.get(number);
+        }
+
+        public Collection<PortDescription> getPortDescs() {
+            return Collections.unmodifiableCollection(portDescs.values());
+        }
+
+        public DeviceDescription putDeviceDesc(DeviceDescription newDesc) {
+            return deviceDesc.getAndSet(newDesc);
+        }
+
+        public PortDescription putPortDesc(PortNumber number, PortDescription newDesc) {
+            return portDescs.put(number, newDesc);
+        }
+    }
 }