ECDeviceStore: DeviceStore built using ONOS distributed primitives: ECMap and DistributedSet (disabled right now)

Change-Id: I36fdcd635f982f2b8dac291c52be4662601ef9f0
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceKey.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceKey.java
new file mode 100644
index 0000000..0896bf1
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceKey.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2014-2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.device.impl;
+
+import java.util.Objects;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.provider.ProviderId;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Key for DeviceDescriptions in ECDeviceStore.
+ */
+public class DeviceKey {
+    private final ProviderId providerId;
+    private final DeviceId deviceId;
+
+    public DeviceKey(ProviderId providerId, DeviceId deviceId) {
+        this.providerId = providerId;
+        this.deviceId = deviceId;
+    }
+
+    public ProviderId providerId() {
+        return providerId;
+    }
+
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(providerId, deviceId);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof DeviceKey)) {
+            return false;
+        }
+        DeviceKey that = (DeviceKey) obj;
+        return Objects.equals(this.deviceId, that.deviceId) &&
+               Objects.equals(this.providerId, that.providerId);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("providerId", providerId)
+                .add("deviceId", deviceId)
+                .toString();
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java
new file mode 100644
index 0000000..f913f4d
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java
@@ -0,0 +1,784 @@
+/*
+ * Copyright 2014-2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.device.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Verify.verify;
+import static org.onosproject.net.DefaultAnnotations.merge;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.packet.ChassisId;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.SharedExecutors;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.mastership.MastershipTermService;
+import org.onosproject.net.Annotations;
+import org.onosproject.net.AnnotationsUtil;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.DefaultDevice;
+import org.onosproject.net.DefaultPort;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
+import org.onosproject.net.OchPort;
+import org.onosproject.net.OduCltPort;
+import org.onosproject.net.OmsPort;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.Device.Type;
+import org.onosproject.net.device.DefaultPortStatistics;
+import org.onosproject.net.device.DeviceClockService;
+import org.onosproject.net.device.DeviceDescription;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceStore;
+import org.onosproject.net.device.DeviceStoreDelegate;
+import org.onosproject.net.device.OchPortDescription;
+import org.onosproject.net.device.OduCltPortDescription;
+import org.onosproject.net.device.OmsPortDescription;
+import org.onosproject.net.device.PortDescription;
+import org.onosproject.net.device.PortStatistics;
+import org.onosproject.net.provider.ProviderId;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.impl.MastershipBasedTimestamp;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.KryoSerializer;
+import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
+import org.onosproject.store.service.DistributedSet;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.SetEvent;
+import org.onosproject.store.service.SetEventListener;
+import org.onosproject.store.service.WallClockTimestamp;
+
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
+
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+
+import static org.onosproject.net.device.DeviceEvent.Type.*;
+import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_INJECTED;
+import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ;
+import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.PORT_INJECTED;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+
+/**
+ * Manages the inventory of devices using a {@code EventuallyConsistentMap}.
+ */
+@Component(immediate = true, enabled = false)
+@Service
+public class ECDeviceStore
+    extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
+    implements DeviceStore {
+
+    private final Logger log = getLogger(getClass());
+
+    private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
+
+    private final Map<DeviceId, Device> devices = Maps.newConcurrentMap();
+    private final Map<DeviceId, Map<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
+    Set<DeviceId> pendingAvailableChangeUpdates = Sets.newConcurrentHashSet();
+
+    private EventuallyConsistentMap<DeviceKey, DeviceDescription> deviceDescriptions;
+    private EventuallyConsistentMap<PortKey, PortDescription> portDescriptions;
+    private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats;
+    private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats;
+
+    private DistributedSet<DeviceId> availableDevices;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MastershipService mastershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MastershipTermService mastershipTermService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceClockService deviceClockService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterCommunicationService clusterCommunicator;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    private NodeId localNodeId;
+    private EventuallyConsistentMapListener<DeviceKey, DeviceDescription> deviceUpdateListener =
+            new InternalDeviceChangeEventListener();
+    private EventuallyConsistentMapListener<PortKey, PortDescription> portUpdateListener =
+            new InternalPortChangeEventListener();
+    private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> portStatsListener =
+            new InternalPortStatsListener();
+    private final SetEventListener<DeviceId> deviceStatusTracker =
+            new InternalDeviceStatusTracker();
+
+    protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        @Override
+        protected void setupKryoPool() {
+            serializerPool = KryoNamespace.newBuilder()
+                    .register(DistributedStoreSerializers.STORE_COMMON)
+                    .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
+                    .register(DeviceInjectedEvent.class)
+                    .register(PortInjectedEvent.class)
+                    .build();
+        }
+    };
+
+    protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder()
+            .register(KryoNamespaces.API)
+            .register(DeviceKey.class)
+            .register(PortKey.class)
+            .register(DeviceKey.class)
+            .register(PortKey.class)
+            .register(MastershipBasedTimestamp.class);
+
+    @Activate
+    public void activate() {
+        localNodeId = clusterService.getLocalNode().id();
+
+        deviceDescriptions = storageService.<DeviceKey, DeviceDescription>eventuallyConsistentMapBuilder()
+                .withName("onos-device-descriptions")
+                .withSerializer(SERIALIZER_BUILDER)
+                .withTimestampProvider((k, v) -> {
+                    try {
+                        return deviceClockService.getTimestamp(k.deviceId());
+                    } catch (IllegalStateException e) {
+                        return null;
+                    }
+                }).build();
+
+        portDescriptions = storageService.<PortKey, PortDescription>eventuallyConsistentMapBuilder()
+                .withName("onos-port-descriptions")
+                .withSerializer(SERIALIZER_BUILDER)
+                .withTimestampProvider((k, v) -> {
+                    try {
+                        return deviceClockService.getTimestamp(k.deviceId());
+                    } catch (IllegalStateException e) {
+                        return null;
+                    }
+                }).build();
+
+        devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder()
+                .withName("onos-port-stats")
+                .withSerializer(SERIALIZER_BUILDER)
+                .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
+                .withTimestampProvider((k, v) -> new WallClockTimestamp())
+                .withTombstonesDisabled()
+                .build();
+
+        devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>
+                eventuallyConsistentMapBuilder()
+                .withName("onos-port-stats-delta")
+                .withSerializer(SERIALIZER_BUILDER)
+                .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
+                .withTimestampProvider((k, v) -> new WallClockTimestamp())
+                .withTombstonesDisabled()
+                .build();
+
+        clusterCommunicator.addSubscriber(DEVICE_INJECTED,
+                SERIALIZER::decode,
+                this::injectDevice,
+                SERIALIZER::encode,
+                SharedExecutors.getPoolThreadExecutor());
+
+        clusterCommunicator.addSubscriber(PORT_INJECTED,
+                SERIALIZER::decode,
+                this::injectPort,
+                SERIALIZER::encode,
+                SharedExecutors.getPoolThreadExecutor());
+
+        availableDevices = storageService.<DeviceId>setBuilder()
+                .withName("onos-online-devices")
+                .withSerializer(Serializer.using(KryoNamespaces.API))
+                .withPartitionsDisabled()
+                .withRelaxedReadConsistency()
+                .build();
+
+        deviceDescriptions.addListener(deviceUpdateListener);
+        portDescriptions.addListener(portUpdateListener);
+        devicePortStats.addListener(portStatsListener);
+        availableDevices.addListener(deviceStatusTracker);
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        devicePortStats.removeListener(portStatsListener);
+        deviceDescriptions.removeListener(deviceUpdateListener);
+        portDescriptions.removeListener(portUpdateListener);
+        availableDevices.removeListener(deviceStatusTracker);
+        devicePortStats.destroy();
+        devicePortDeltaStats.destroy();
+        deviceDescriptions.destroy();
+        portDescriptions.destroy();
+        devices.clear();
+        devicePorts.clear();
+        clusterCommunicator.removeSubscriber(DEVICE_INJECTED);
+        clusterCommunicator.removeSubscriber(PORT_INJECTED);
+        log.info("Stopped");
+    }
+
+    @Override
+    public Iterable<Device> getDevices() {
+        return devices.values();
+    }
+
+    @Override
+    public int getDeviceCount() {
+        return devices.size();
+    }
+
+    @Override
+    public Device getDevice(DeviceId deviceId) {
+        return devices.get(deviceId);
+    }
+
+    @Override
+    public DeviceEvent createOrUpdateDevice(ProviderId providerId,
+            DeviceId deviceId,
+            DeviceDescription deviceDescription) {
+        NodeId master = mastershipService.getMasterFor(deviceId);
+        if (localNodeId.equals(master)) {
+            deviceDescriptions.put(new DeviceKey(providerId, deviceId), deviceDescription);
+            return refreshDeviceCache(providerId, deviceId);
+        } else {
+            DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(providerId, deviceId, deviceDescription);
+            return Futures.getUnchecked(
+                    clusterCommunicator.sendAndReceive(deviceInjectedEvent,
+                            DEVICE_INJECTED,
+                            SERIALIZER::encode,
+                            SERIALIZER::decode,
+                            master));
+        }
+    }
+
+    private DeviceEvent refreshDeviceCache(ProviderId providerId, DeviceId deviceId) {
+        AtomicReference<DeviceEvent.Type> eventType = new AtomicReference<>();
+        Device device = devices.compute(deviceId, (k, existingDevice) -> {
+            Device newDevice = composeDevice(deviceId);
+            if (existingDevice == null) {
+                eventType.set(DEVICE_ADDED);
+            } else {
+                // We allow only certain attributes to trigger update
+                boolean propertiesChanged =
+                        !Objects.equals(existingDevice.hwVersion(), newDevice.hwVersion()) ||
+                                !Objects.equals(existingDevice.swVersion(), newDevice.swVersion()) ||
+                                !Objects.equals(existingDevice.providerId(), newDevice.providerId());
+                boolean annotationsChanged =
+                        !AnnotationsUtil.isEqual(existingDevice.annotations(), newDevice.annotations());
+
+                // Primary providers can respond to all changes, but ancillary ones
+                // should respond only to annotation changes.
+                if ((providerId.isAncillary() && annotationsChanged) ||
+                        (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
+                    boolean replaced = devices.replace(deviceId, existingDevice, newDevice);
+                    verify(replaced, "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
+                            providerId, existingDevice, devices.get(deviceId), newDevice);
+                    eventType.set(DEVICE_UPDATED);
+                }
+            }
+            return newDevice;
+        });
+        if (eventType.get() != null && !providerId.isAncillary()) {
+            markOnline(deviceId);
+        }
+        return eventType.get() != null ? new DeviceEvent(eventType.get(), device) : null;
+    }
+
+    /**
+     * Returns the primary providerId for a device.
+     * @param deviceId device identifier
+     * @return primary providerId
+     */
+    private Set<ProviderId> getAllProviders(DeviceId deviceId) {
+        return deviceDescriptions.keySet()
+                                 .stream()
+                                 .filter(deviceKey -> deviceKey.deviceId().equals(deviceId))
+                                 .map(deviceKey -> deviceKey.providerId())
+                                 .collect(Collectors.toSet());
+    }
+
+    /**
+     * Returns the identifier for all providers for a device.
+     * @param deviceId device identifier
+     * @return set of provider identifiers
+     */
+    private ProviderId getPrimaryProviderId(DeviceId deviceId) {
+        Set<ProviderId> allProviderIds = getAllProviders(deviceId);
+        return allProviderIds.stream()
+                             .filter(p -> !p.isAncillary())
+                             .findFirst()
+                             .orElse(Iterables.getFirst(allProviderIds, null));
+    }
+
+    /**
+     * Returns a Device, merging descriptions from multiple Providers.
+     *
+     * @param deviceId      device identifier
+     * @return Device instance
+     */
+    private Device composeDevice(DeviceId deviceId) {
+
+        ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
+        DeviceDescription primaryDeviceDescription =
+                deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));
+
+        Type type = primaryDeviceDescription.type();
+        String manufacturer = primaryDeviceDescription.manufacturer();
+        String hwVersion = primaryDeviceDescription.hwVersion();
+        String swVersion = primaryDeviceDescription.swVersion();
+        String serialNumber = primaryDeviceDescription.serialNumber();
+        ChassisId chassisId = primaryDeviceDescription.chassisId();
+        DefaultAnnotations annotations = mergeAnnotations(deviceId);
+
+        return new DefaultDevice(primaryProviderId, deviceId, type, manufacturer,
+                                 hwVersion, swVersion, serialNumber,
+                                 chassisId, annotations);
+    }
+
+    private DeviceEvent purgeDeviceCache(DeviceId deviceId) {
+        Device removedDevice = devices.remove(deviceId);
+        if (removedDevice != null) {
+            getAllProviders(deviceId).forEach(p -> deviceDescriptions.remove(new DeviceKey(p, deviceId)));
+            return new DeviceEvent(DEVICE_REMOVED, removedDevice);
+        }
+        return null;
+    }
+
+    private boolean markOnline(DeviceId deviceId) {
+        return availableDevices.add(deviceId);
+    }
+
+    @Override
+    public DeviceEvent markOffline(DeviceId deviceId) {
+        availableDevices.remove(deviceId);
+        // set update listener will raise the event.
+        return null;
+    }
+
+    @Override
+    public List<DeviceEvent> updatePorts(ProviderId providerId,
+            DeviceId deviceId,
+            List<PortDescription> descriptions) {
+        NodeId master = mastershipService.getMasterFor(deviceId);
+        List<DeviceEvent> deviceEvents = null;
+        if (localNodeId.equals(master)) {
+            descriptions.forEach(description -> {
+                PortKey portKey = new PortKey(providerId, deviceId, description.portNumber());
+                portDescriptions.put(portKey, description);
+            });
+            deviceEvents = refreshDevicePortCache(providerId, deviceId, Optional.empty());
+        } else {
+            if (master == null) {
+                return Collections.emptyList();
+            }
+            PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, descriptions);
+            deviceEvents = Futures.getUnchecked(
+                    clusterCommunicator.sendAndReceive(portInjectedEvent,
+                                    PORT_INJECTED,
+                                    SERIALIZER::encode,
+                                    SERIALIZER::decode,
+                                    master));
+        }
+        return deviceEvents == null ? Collections.emptyList() : deviceEvents;
+    }
+
+    private List<DeviceEvent> refreshDevicePortCache(ProviderId providerId,
+            DeviceId deviceId,
+            Optional<PortNumber> portNumber) {
+        Device device = devices.get(deviceId);
+        checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
+        List<DeviceEvent> events = Lists.newArrayList();
+
+        Map<PortNumber, Port> ports = devicePorts.computeIfAbsent(deviceId, key -> Maps.newConcurrentMap());
+        List<PortDescription> descriptions = Lists.newArrayList();
+        portDescriptions.entrySet().forEach(e -> {
+            PortKey key = e.getKey();
+            PortDescription value = e.getValue();
+            if (key.deviceId().equals(deviceId) && key.providerId().equals(providerId)) {
+                if (portNumber.isPresent()) {
+                    if (portNumber.get().equals(key.portNumber())) {
+                        descriptions.add(value);
+                    }
+                } else {
+                    descriptions.add(value);
+                }
+            }
+        });
+
+        for (PortDescription description : descriptions) {
+            final PortNumber number = description.portNumber();
+            ports.compute(number, (k, existingPort) -> {
+                Port newPort = composePort(device, number);
+                if (existingPort == null) {
+                    events.add(new DeviceEvent(PORT_ADDED, device, newPort));
+                } else {
+                    if (existingPort.isEnabled() != newPort.isEnabled() ||
+                            existingPort.type() != newPort.type() ||
+                            existingPort.portSpeed() != newPort.portSpeed() ||
+                            !AnnotationsUtil.isEqual(existingPort.annotations(), newPort.annotations())) {
+                        events.add(new DeviceEvent(PORT_UPDATED, device, newPort));
+                    }
+                }
+                return newPort;
+            });
+        }
+
+        return events;
+    }
+
+    /**
+     * Returns a Port, merging descriptions from multiple Providers.
+     *
+     * @param device   device the port is on
+     * @param number   port number
+     * @return Port instance
+     */
+    private Port composePort(Device device, PortNumber number) {
+
+        Map<ProviderId, PortDescription> descriptions = Maps.newHashMap();
+        portDescriptions.entrySet().forEach(entry -> {
+            PortKey portKey = entry.getKey();
+            if (portKey.deviceId().equals(device.id()) && portKey.portNumber().equals(number)) {
+                descriptions.put(portKey.providerId(), entry.getValue());
+            }
+        });
+        ProviderId primary = getPrimaryProviderId(device.id());
+        PortDescription primaryDescription = descriptions.get(primary);
+
+        // if no primary, assume not enabled
+        boolean isEnabled = false;
+        DefaultAnnotations annotations = DefaultAnnotations.builder().build();
+        if (primaryDescription != null) {
+            isEnabled = primaryDescription.isEnabled();
+            annotations = merge(annotations, primaryDescription.annotations());
+        }
+        Port updated = null;
+        for (Entry<ProviderId, PortDescription> e : descriptions.entrySet()) {
+            if (e.getKey().equals(primary)) {
+                continue;
+            }
+            annotations = merge(annotations, e.getValue().annotations());
+            updated = buildTypedPort(device, number, isEnabled, e.getValue(), annotations);
+        }
+        if (primaryDescription == null) {
+            return updated == null ? new DefaultPort(device, number, false, annotations) : updated;
+        }
+        return updated == null
+                ? buildTypedPort(device, number, isEnabled, primaryDescription, annotations)
+                : updated;
+    }
+
+    private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled,
+            PortDescription description, Annotations annotations) {
+        switch (description.type()) {
+        case OMS:
+            OmsPortDescription omsDesc = (OmsPortDescription) description;
+            return new OmsPort(device, number, isEnabled, omsDesc.minFrequency(),
+                    omsDesc.maxFrequency(), omsDesc.grid(), annotations);
+        case OCH:
+            OchPortDescription ochDesc = (OchPortDescription) description;
+            return new OchPort(device, number, isEnabled, ochDesc.signalType(),
+                    ochDesc.isTunable(), ochDesc.lambda(), annotations);
+        case ODUCLT:
+            OduCltPortDescription oduDesc = (OduCltPortDescription) description;
+            return new OduCltPort(device, number, isEnabled, oduDesc.signalType(), annotations);
+        default:
+            return new DefaultPort(device, number, isEnabled, description.type(),
+                    description.portSpeed(), annotations);
+        }
+    }
+
+    @Override
+    public DeviceEvent updatePortStatus(ProviderId providerId,
+            DeviceId deviceId,
+            PortDescription portDescription) {
+        portDescriptions.put(new PortKey(providerId, deviceId, portDescription.portNumber()), portDescription);
+        List<DeviceEvent> events =
+                refreshDevicePortCache(providerId, deviceId, Optional.of(portDescription.portNumber()));
+        return Iterables.getFirst(events, null);
+    }
+
+    @Override
+    public List<Port> getPorts(DeviceId deviceId) {
+        return ImmutableList.copyOf(devicePorts.getOrDefault(deviceId, Maps.newHashMap()).values());
+    }
+
+    @Override
+    public Port getPort(DeviceId deviceId, PortNumber portNumber) {
+        return devicePorts.getOrDefault(deviceId, Maps.newHashMap()).get(portNumber);
+    }
+
+    @Override
+    public DeviceEvent updatePortStatistics(ProviderId providerId,
+            DeviceId deviceId,
+            Collection<PortStatistics> newStatsCollection) {
+
+        Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId);
+        Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap();
+        Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap();
+
+        if (prvStatsMap != null) {
+            for (PortStatistics newStats : newStatsCollection) {
+                PortNumber port = PortNumber.portNumber(newStats.port());
+                PortStatistics prvStats = prvStatsMap.get(port);
+                DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
+                PortStatistics deltaStats = builder.build();
+                if (prvStats != null) {
+                    deltaStats = calcDeltaStats(deviceId, prvStats, newStats);
+                }
+                deltaStatsMap.put(port, deltaStats);
+                newStatsMap.put(port, newStats);
+            }
+        } else {
+            for (PortStatistics newStats : newStatsCollection) {
+                PortNumber port = PortNumber.portNumber(newStats.port());
+                newStatsMap.put(port, newStats);
+            }
+        }
+        devicePortDeltaStats.put(deviceId, deltaStatsMap);
+        devicePortStats.put(deviceId, newStatsMap);
+        // DeviceEvent returns null because of InternalPortStatsListener usage
+        return null;
+    }
+
+    /**
+     * Calculate delta statistics by subtracting previous from new statistics.
+     *
+     * @param deviceId
+     * @param prvStats
+     * @param newStats
+     * @return PortStatistics
+     */
+    public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) {
+        // calculate time difference
+        long deltaStatsSec, deltaStatsNano;
+        if (newStats.durationNano() < prvStats.durationNano()) {
+            deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1);
+            deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L;
+        } else {
+            deltaStatsNano = newStats.durationNano() - prvStats.durationNano();
+            deltaStatsSec = newStats.durationSec() - prvStats.durationSec();
+        }
+        DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
+        DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId)
+                .setPort(newStats.port())
+                .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived())
+                .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent())
+                .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived())
+                .setBytesSent(newStats.bytesSent() - prvStats.bytesSent())
+                .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped())
+                .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped())
+                .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors())
+                .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors())
+                .setDurationSec(deltaStatsSec)
+                .setDurationNano(deltaStatsNano)
+                .build();
+        return deltaStats;
+    }
+
+    @Override
+    public List<PortStatistics> getPortStatistics(DeviceId deviceId) {
+        Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId);
+        if (portStats == null) {
+            return Collections.emptyList();
+        }
+        return ImmutableList.copyOf(portStats.values());
+    }
+
+    @Override
+    public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) {
+        Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId);
+        if (portStats == null) {
+            return Collections.emptyList();
+        }
+        return ImmutableList.copyOf(portStats.values());
+    }
+
+    @Override
+    public boolean isAvailable(DeviceId deviceId) {
+        return availableDevices.contains(deviceId);
+    }
+
+    @Override
+    public Iterable<Device> getAvailableDevices() {
+        return Iterables.filter(Iterables.transform(availableDevices, devices::get), d -> d != null);
+    }
+
+    @Override
+    public DeviceEvent removeDevice(DeviceId deviceId) {
+        NodeId master = mastershipService.getMasterFor(deviceId);
+        // if there exist a master, forward
+        // if there is no master, try to become one and process
+        boolean relinquishAtEnd = false;
+        if (master == null) {
+            final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
+            if (myRole != MastershipRole.NONE) {
+                relinquishAtEnd = true;
+            }
+            log.debug("Temporarily requesting role for {} to remove", deviceId);
+            MastershipRole role = Futures.getUnchecked(mastershipService.requestRoleFor(deviceId));
+            if (role == MastershipRole.MASTER) {
+                master = localNodeId;
+            }
+        }
+
+        if (!localNodeId.equals(master)) {
+            log.debug("{} has control of {}, forwarding remove request",
+                      master, deviceId);
+
+            clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master)
+                               .whenComplete((r, e) -> {
+                                   if (e != null) {
+                                       log.error("Failed to forward {} remove request to its master", deviceId, e);
+                                   }
+                               });
+            return null;
+        }
+
+        // I have control..
+        DeviceEvent event = null;
+        final DeviceKey deviceKey = new DeviceKey(getPrimaryProviderId(deviceId), deviceId);
+        DeviceDescription removedDeviceDescription =
+                deviceDescriptions.remove(deviceKey);
+        if (removedDeviceDescription != null) {
+            event = purgeDeviceCache(deviceId);
+        }
+
+        if (relinquishAtEnd) {
+            log.debug("Relinquishing temporary role acquired for {}", deviceId);
+            mastershipService.relinquishMastership(deviceId);
+        }
+        return event;
+    }
+
+    private DeviceEvent injectDevice(DeviceInjectedEvent event) {
+        return createOrUpdateDevice(event.providerId(), event.deviceId(), event.deviceDescription());
+    }
+
+    private List<DeviceEvent> injectPort(PortInjectedEvent event) {
+        return updatePorts(event.providerId(), event.deviceId(), event.portDescriptions());
+    }
+
+    private DefaultAnnotations mergeAnnotations(DeviceId deviceId) {
+        ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
+        DeviceDescription primaryDeviceDescription =
+                deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));
+        DefaultAnnotations annotations = DefaultAnnotations.builder().build();
+        annotations = merge(annotations, primaryDeviceDescription.annotations());
+        for (ProviderId providerId : getAllProviders(deviceId)) {
+            if (!providerId.equals(primaryProviderId)) {
+                annotations = merge(annotations,
+                                    deviceDescriptions.get(new DeviceKey(providerId, deviceId)).annotations());
+            }
+        }
+        return annotations;
+    }
+
+    private class InternalDeviceStatusTracker implements SetEventListener<DeviceId> {
+        @Override
+        public void event(SetEvent<DeviceId> event) {
+            final DeviceId deviceId = event.entry();
+            final Device device = devices.get(deviceId);
+            if (device != null) {
+                notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device));
+            } else {
+                pendingAvailableChangeUpdates.add(deviceId);
+            }
+        }
+    }
+
+    private class InternalDeviceChangeEventListener
+        implements EventuallyConsistentMapListener<DeviceKey, DeviceDescription> {
+        @Override
+        public void event(EventuallyConsistentMapEvent<DeviceKey, DeviceDescription> event) {
+            DeviceId deviceId = event.key().deviceId();
+            ProviderId providerId = event.key().providerId();
+            if (event.type() == PUT) {
+                notifyDelegate(refreshDeviceCache(providerId, deviceId));
+                if (pendingAvailableChangeUpdates.remove(deviceId)) {
+                    notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, devices.get(deviceId)));
+                }
+            } else if (event.type() == REMOVE) {
+                notifyDelegate(purgeDeviceCache(deviceId));
+            }
+        }
+    }
+
+    private class InternalPortChangeEventListener
+        implements EventuallyConsistentMapListener<PortKey, PortDescription> {
+        @Override
+        public void event(EventuallyConsistentMapEvent<PortKey, PortDescription> event) {
+            DeviceId deviceId = event.key().deviceId();
+            ProviderId providerId = event.key().providerId();
+            PortNumber portNumber = event.key().portNumber();
+            if (event.type() == PUT) {
+                if (devices.containsKey(deviceId)) {
+                    List<DeviceEvent> events = refreshDevicePortCache(providerId, deviceId, Optional.of(portNumber));
+                    for (DeviceEvent deviceEvent : events) {
+                        notifyDelegate(deviceEvent);
+                    }
+                }
+            } else if (event.type() == REMOVE) {
+                log.warn("Unexpected port removed event");
+            }
+        }
+    }
+
+    private class InternalPortStatsListener
+        implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> {
+        @Override
+        public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) {
+            if (event.type() == PUT) {
+                Device device = devices.get(event.key());
+                if (device != null) {
+                    delegate.notify(new DeviceEvent(PORT_STATS_UPDATED, device));
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/PortKey.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/PortKey.java
new file mode 100644
index 0000000..62b0995
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/PortKey.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2014-2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.device.impl;
+
+import java.util.Objects;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.provider.ProviderId;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Key for PortDescriptions in ECDeviceStore.
+ */
+public class PortKey {
+    private final ProviderId providerId;
+    private final DeviceId deviceId;
+    private final PortNumber portNumber;
+
+    public PortKey(ProviderId providerId, DeviceId deviceId, PortNumber portNumber) {
+        this.providerId = providerId;
+        this.deviceId = deviceId;
+        this.portNumber = portNumber;
+    }
+
+    public ProviderId providerId() {
+        return providerId;
+    }
+
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+
+    public PortNumber portNumber() {
+        return portNumber;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(providerId, deviceId, portNumber);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof PortKey)) {
+            return false;
+        }
+        PortKey that = (PortKey) obj;
+        return Objects.equals(this.deviceId, that.deviceId) &&
+               Objects.equals(this.providerId, that.providerId) &&
+               Objects.equals(this.portNumber, that.portNumber);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("providerId", providerId)
+                .add("deviceId", deviceId)
+                .add("portNumber", portNumber)
+                .toString();
+    }
+}