/*
 * Copyright 2015-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.device.impl;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.ChassisId;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.mastership.MastershipService;
import org.onosproject.mastership.MastershipTermService;
import org.onosproject.net.Annotations;
import org.onosproject.net.AnnotationsUtil;
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.DefaultDevice;
import org.onosproject.net.DefaultPort;
import org.onosproject.net.Device;
import org.onosproject.net.Device.Type;
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DefaultPortStatistics;
import org.onosproject.net.device.DeviceClockService;
import org.onosproject.net.device.DeviceDescription;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceStore;
import org.onosproject.net.device.DeviceStoreDelegate;
import org.onosproject.net.device.PortDescription;
import org.onosproject.net.device.PortStatistics;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.impl.MastershipBasedTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
import org.onosproject.store.service.DistributedSet;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.SetEvent;
import org.onosproject.store.service.SetEventListener;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.WallClockTimestamp;
import org.slf4j.Logger;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static org.onosproject.net.DefaultAnnotations.merge;
import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_ADDED;
import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED;
import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_REMOVED;
import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_UPDATED;
import static org.onosproject.net.device.DeviceEvent.Type.PORT_ADDED;
import static org.onosproject.net.device.DeviceEvent.Type.PORT_STATS_UPDATED;
import static org.onosproject.net.device.DeviceEvent.Type.PORT_UPDATED;
import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
import static org.slf4j.LoggerFactory.getLogger;

/**
 * Manages the inventory of devices using a {@code EventuallyConsistentMap}.
 */
//@Component(immediate = true, enabled = false)
@Service
public class ECDeviceStore
    extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
    implements DeviceStore {

    private final Logger log = getLogger(getClass());

    private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";

    private final Map<DeviceId, Device> devices = Maps.newConcurrentMap();
    private final Map<DeviceId, Map<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
    Set<DeviceId> pendingAvailableChangeUpdates = Sets.newConcurrentHashSet();

    private EventuallyConsistentMap<DeviceKey, DeviceDescription> deviceDescriptions;
    private EventuallyConsistentMap<PortKey, PortDescription> portDescriptions;
    private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats;
    private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats;

    private DistributedSet<DeviceId> availableDevices;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected StorageService storageService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected MastershipService mastershipService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected MastershipTermService mastershipTermService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected DeviceClockService deviceClockService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterCommunicationService clusterCommunicator;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;

    private NodeId localNodeId;
    private EventuallyConsistentMapListener<DeviceKey, DeviceDescription> deviceUpdateListener =
            new InternalDeviceChangeEventListener();
    private EventuallyConsistentMapListener<PortKey, PortDescription> portUpdateListener =
            new InternalPortChangeEventListener();
    private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> portStatsListener =
            new InternalPortStatsListener();
    private final SetEventListener<DeviceId> deviceStatusTracker =
            new InternalDeviceStatusTracker();

    protected static final Serializer SERIALIZER = Serializer.using(
                  KryoNamespace.newBuilder()
                    .register(DistributedStoreSerializers.STORE_COMMON)
                    .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
                    .build("ECDevice"));

    protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder()
            .register(KryoNamespaces.API)
            .register(DeviceKey.class)
            .register(PortKey.class)
            .register(DeviceKey.class)
            .register(PortKey.class)
            .register(MastershipBasedTimestamp.class);

    @Activate
    public void activate() {
        localNodeId = clusterService.getLocalNode().id();

        deviceDescriptions = storageService.<DeviceKey, DeviceDescription>eventuallyConsistentMapBuilder()
                .withName("onos-device-descriptions")
                .withSerializer(SERIALIZER_BUILDER)
                .withTimestampProvider((k, v) -> {
                    try {
                        return deviceClockService.getTimestamp(k.deviceId());
                    } catch (IllegalStateException e) {
                        return null;
                    }
                }).build();

        portDescriptions = storageService.<PortKey, PortDescription>eventuallyConsistentMapBuilder()
                .withName("onos-port-descriptions")
                .withSerializer(SERIALIZER_BUILDER)
                .withTimestampProvider((k, v) -> {
                    try {
                        return deviceClockService.getTimestamp(k.deviceId());
                    } catch (IllegalStateException e) {
                        return null;
                    }
                }).build();

        devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder()
                .withName("onos-port-stats")
                .withSerializer(SERIALIZER_BUILDER)
                .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
                .withTimestampProvider((k, v) -> new WallClockTimestamp())
                .withTombstonesDisabled()
                .build();

        devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>
                eventuallyConsistentMapBuilder()
                .withName("onos-port-stats-delta")
                .withSerializer(SERIALIZER_BUILDER)
                .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
                .withTimestampProvider((k, v) -> new WallClockTimestamp())
                .withTombstonesDisabled()
                .build();

        availableDevices = storageService.<DeviceId>setBuilder()
                .withName("onos-online-devices")
                .withSerializer(Serializer.using(KryoNamespaces.API))
                .withRelaxedReadConsistency()
                .build()
                .asDistributedSet();

        deviceDescriptions.addListener(deviceUpdateListener);
        portDescriptions.addListener(portUpdateListener);
        devicePortStats.addListener(portStatsListener);
        availableDevices.addListener(deviceStatusTracker);
        log.info("Started");
    }

    @Deactivate
    public void deactivate() {
        devicePortStats.removeListener(portStatsListener);
        deviceDescriptions.removeListener(deviceUpdateListener);
        portDescriptions.removeListener(portUpdateListener);
        availableDevices.removeListener(deviceStatusTracker);
        devicePortStats.destroy();
        devicePortDeltaStats.destroy();
        deviceDescriptions.destroy();
        portDescriptions.destroy();
        devices.clear();
        devicePorts.clear();
        log.info("Stopped");
    }

    @Override
    public Iterable<Device> getDevices() {
        return devices.values();
    }

    @Override
    public int getDeviceCount() {
        return devices.size();
    }

    @Override
    public int getAvailableDeviceCount() {
        return availableDevices.size();
    }

    @Override
    public Device getDevice(DeviceId deviceId) {
        return devices.get(deviceId);
    }

    // FIXME handle deviceDescription.isDefaultAvailable()=false case properly.
    @Override
    public DeviceEvent createOrUpdateDevice(ProviderId providerId,
            DeviceId deviceId,
            DeviceDescription deviceDescription) {
        NodeId master = mastershipService.getMasterFor(deviceId);
        if (localNodeId.equals(master)) {
            deviceDescriptions.put(new DeviceKey(providerId, deviceId), deviceDescription);
            return refreshDeviceCache(providerId, deviceId);
        } else {
                return null;
        }
    }

    private DeviceEvent refreshDeviceCache(ProviderId providerId, DeviceId deviceId) {
        AtomicReference<DeviceEvent.Type> eventType = new AtomicReference<>();
        Device device = devices.compute(deviceId, (k, existingDevice) -> {
            Device newDevice = composeDevice(deviceId);
            if (existingDevice == null) {
                eventType.set(DEVICE_ADDED);
            } else {
                // We allow only certain attributes to trigger update
                boolean propertiesChanged =
                        !Objects.equals(existingDevice.hwVersion(), newDevice.hwVersion()) ||
                                !Objects.equals(existingDevice.swVersion(), newDevice.swVersion()) ||
                                !Objects.equals(existingDevice.providerId(), newDevice.providerId());
                boolean annotationsChanged =
                        !AnnotationsUtil.isEqual(existingDevice.annotations(), newDevice.annotations());

                // Primary providers can respond to all changes, but ancillary ones
                // should respond only to annotation changes.
                if ((providerId.isAncillary() && annotationsChanged) ||
                        (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
                    boolean replaced = devices.replace(deviceId, existingDevice, newDevice);
                    verify(replaced, "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
                            providerId, existingDevice, devices.get(deviceId), newDevice);
                    eventType.set(DEVICE_UPDATED);
                }
            }
            return newDevice;
        });
        if (eventType.get() != null && !providerId.isAncillary()) {
            markOnline(deviceId);
        }
        return eventType.get() != null ? new DeviceEvent(eventType.get(), device) : null;
    }

    /**
     * Returns the primary providerId for a device.
     * @param deviceId device identifier
     * @return primary providerId
     */
    private Set<ProviderId> getAllProviders(DeviceId deviceId) {
        return deviceDescriptions.keySet()
                                 .stream()
                                 .filter(deviceKey -> deviceKey.deviceId().equals(deviceId))
                                 .map(deviceKey -> deviceKey.providerId())
                                 .collect(Collectors.toSet());
    }

    /**
     * Returns the identifier for all providers for a device.
     * @param deviceId device identifier
     * @return set of provider identifiers
     */
    private ProviderId getPrimaryProviderId(DeviceId deviceId) {
        Set<ProviderId> allProviderIds = getAllProviders(deviceId);
        return allProviderIds.stream()
                             .filter(p -> !p.isAncillary())
                             .findFirst()
                             .orElse(Iterables.getFirst(allProviderIds, null));
    }

    /**
     * Returns a Device, merging descriptions from multiple Providers.
     *
     * @param deviceId      device identifier
     * @return Device instance
     */
    private Device composeDevice(DeviceId deviceId) {

        ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
        DeviceDescription primaryDeviceDescription =
                deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));

        Type type = primaryDeviceDescription.type();
        String manufacturer = primaryDeviceDescription.manufacturer();
        String hwVersion = primaryDeviceDescription.hwVersion();
        String swVersion = primaryDeviceDescription.swVersion();
        String serialNumber = primaryDeviceDescription.serialNumber();
        ChassisId chassisId = primaryDeviceDescription.chassisId();
        DefaultAnnotations annotations = mergeAnnotations(deviceId);

        return new DefaultDevice(primaryProviderId, deviceId, type, manufacturer,
                                 hwVersion, swVersion, serialNumber,
                                 chassisId, annotations);
    }

    private DeviceEvent purgeDeviceCache(DeviceId deviceId) {
        Device removedDevice = devices.remove(deviceId);
        if (removedDevice != null) {
            getAllProviders(deviceId).forEach(p -> deviceDescriptions.remove(new DeviceKey(p, deviceId)));
            return new DeviceEvent(DEVICE_REMOVED, removedDevice);
        }
        return null;
    }

    // FIXME publicization of markOnline -- trigger some action independently?
    public DeviceEvent markOnline(DeviceId deviceId) {
        if (devices.containsKey(deviceId)) {
            if (availableDevices.add(deviceId)) {
                return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, devices.get(deviceId), null);
            }
        }
        log.warn("Device {} does not exist in store", deviceId);
        return null;
    }

    @Override
    public DeviceEvent markOffline(DeviceId deviceId) {
        availableDevices.remove(deviceId);
        // set update listener will raise the event.
        return null;
    }

    @Override
    public List<DeviceEvent> updatePorts(ProviderId providerId,
            DeviceId deviceId,
            List<PortDescription> descriptions) {
        NodeId master = mastershipService.getMasterFor(deviceId);
        List<DeviceEvent> deviceEvents = null;
        if (localNodeId.equals(master)) {
            descriptions.forEach(description -> {
                PortKey portKey = new PortKey(providerId, deviceId, description.portNumber());
                portDescriptions.put(portKey, description);
            });
            deviceEvents = refreshDevicePortCache(providerId, deviceId, Optional.empty());
        } else {
            return Collections.emptyList();
        }
        return deviceEvents == null ? Collections.emptyList() : deviceEvents;
    }

    private List<DeviceEvent> refreshDevicePortCache(ProviderId providerId,
            DeviceId deviceId,
            Optional<PortNumber> portNumber) {
        Device device = devices.get(deviceId);
        checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
        List<DeviceEvent> events = Lists.newArrayList();

        Map<PortNumber, Port> ports = devicePorts.computeIfAbsent(deviceId, key -> Maps.newConcurrentMap());
        List<PortDescription> descriptions = Lists.newArrayList();
        portDescriptions.entrySet().forEach(e -> {
            PortKey key = e.getKey();
            PortDescription value = e.getValue();
            if (key.deviceId().equals(deviceId) && key.providerId().equals(providerId)) {
                if (portNumber.isPresent()) {
                    if (portNumber.get().equals(key.portNumber())) {
                        descriptions.add(value);
                    }
                } else {
                    descriptions.add(value);
                }
            }
        });

        for (PortDescription description : descriptions) {
            final PortNumber number = description.portNumber();
            ports.compute(number, (k, existingPort) -> {
                Port newPort = composePort(device, number);
                if (existingPort == null) {
                    events.add(new DeviceEvent(PORT_ADDED, device, newPort));
                } else {
                    if (existingPort.isEnabled() != newPort.isEnabled() ||
                            existingPort.type() != newPort.type() ||
                            existingPort.portSpeed() != newPort.portSpeed() ||
                            !AnnotationsUtil.isEqual(existingPort.annotations(), newPort.annotations())) {
                        events.add(new DeviceEvent(PORT_UPDATED, device, newPort));
                    }
                }
                return newPort;
            });
        }

        return events;
    }

    /**
     * Returns a Port, merging descriptions from multiple Providers.
     *
     * @param device   device the port is on
     * @param number   port number
     * @return Port instance
     */
    private Port composePort(Device device, PortNumber number) {

        Map<ProviderId, PortDescription> descriptions = Maps.newHashMap();
        portDescriptions.entrySet().forEach(entry -> {
            PortKey portKey = entry.getKey();
            if (portKey.deviceId().equals(device.id()) && portKey.portNumber().equals(number)) {
                descriptions.put(portKey.providerId(), entry.getValue());
            }
        });
        ProviderId primary = getPrimaryProviderId(device.id());
        PortDescription primaryDescription = descriptions.get(primary);

        // if no primary, assume not enabled
        boolean isEnabled = false;
        DefaultAnnotations annotations = DefaultAnnotations.builder().build();
        if (primaryDescription != null) {
            isEnabled = primaryDescription.isEnabled();
            annotations = merge(annotations, primaryDescription.annotations());
        }
        Port updated = null;
        for (Entry<ProviderId, PortDescription> e : descriptions.entrySet()) {
            if (e.getKey().equals(primary)) {
                continue;
            }
            annotations = merge(annotations, e.getValue().annotations());
            updated = buildTypedPort(device, number, isEnabled, e.getValue(), annotations);
        }
        if (primaryDescription == null) {
            return updated == null ? new DefaultPort(device, number, false, annotations) : updated;
        }
        return updated == null
                ? buildTypedPort(device, number, isEnabled, primaryDescription, annotations)
                : updated;
    }

    private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled,
            PortDescription description, Annotations annotations) {
            return new DefaultPort(device, number, isEnabled, description.type(),
                    description.portSpeed(), annotations);
    }

    @Override
    public DeviceEvent updatePortStatus(ProviderId providerId,
            DeviceId deviceId,
            PortDescription portDescription) {
        portDescriptions.put(new PortKey(providerId, deviceId, portDescription.portNumber()), portDescription);
        List<DeviceEvent> events =
                refreshDevicePortCache(providerId, deviceId, Optional.of(portDescription.portNumber()));
        return Iterables.getFirst(events, null);
    }

    @Override
    public List<Port> getPorts(DeviceId deviceId) {
        return ImmutableList.copyOf(devicePorts.getOrDefault(deviceId, Maps.newHashMap()).values());
    }

    @Override
    public Stream<PortDescription> getPortDescriptions(ProviderId pid,
                                                       DeviceId deviceId) {

        return portDescriptions.entrySet().stream()
                .filter(e -> e.getKey().providerId().equals(pid))
                .map(Map.Entry::getValue);
    }

    @Override
    public Port getPort(DeviceId deviceId, PortNumber portNumber) {
        return devicePorts.getOrDefault(deviceId, Maps.newHashMap()).get(portNumber);
    }

    @Override
    public PortDescription getPortDescription(ProviderId pid,
                                              DeviceId deviceId,
                                              PortNumber portNumber) {
        return portDescriptions.get(new PortKey(pid, deviceId, portNumber));
    }

    @Override
    public DeviceEvent updatePortStatistics(ProviderId providerId,
            DeviceId deviceId,
            Collection<PortStatistics> newStatsCollection) {

        Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId);
        Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap();
        Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap();

        if (prvStatsMap != null) {
            for (PortStatistics newStats : newStatsCollection) {
                PortNumber port = PortNumber.portNumber(newStats.port());
                PortStatistics prvStats = prvStatsMap.get(port);
                DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
                PortStatistics deltaStats = builder.build();
                if (prvStats != null) {
                    deltaStats = calcDeltaStats(deviceId, prvStats, newStats);
                }
                deltaStatsMap.put(port, deltaStats);
                newStatsMap.put(port, newStats);
            }
        } else {
            for (PortStatistics newStats : newStatsCollection) {
                PortNumber port = PortNumber.portNumber(newStats.port());
                newStatsMap.put(port, newStats);
            }
        }
        devicePortDeltaStats.put(deviceId, deltaStatsMap);
        devicePortStats.put(deviceId, newStatsMap);
        // DeviceEvent returns null because of InternalPortStatsListener usage
        return null;
    }

    /**
     * Calculate delta statistics by subtracting previous from new statistics.
     *
     * @param deviceId device indentifier
     * @param prvStats previous port statistics
     * @param newStats new port statistics
     * @return PortStatistics
     */
    public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) {
        // calculate time difference
        long deltaStatsSec, deltaStatsNano;
        if (newStats.durationNano() < prvStats.durationNano()) {
            deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1);
            deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L;
        } else {
            deltaStatsNano = newStats.durationNano() - prvStats.durationNano();
            deltaStatsSec = newStats.durationSec() - prvStats.durationSec();
        }
        DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
        DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId)
                .setPort(newStats.port())
                .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived())
                .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent())
                .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived())
                .setBytesSent(newStats.bytesSent() - prvStats.bytesSent())
                .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped())
                .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped())
                .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors())
                .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors())
                .setDurationSec(deltaStatsSec)
                .setDurationNano(deltaStatsNano)
                .build();
        return deltaStats;
    }

    @Override
    public List<PortStatistics> getPortStatistics(DeviceId deviceId) {
        Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId);
        if (portStats == null) {
            return Collections.emptyList();
        }
        return ImmutableList.copyOf(portStats.values());
    }

    @Override
    public PortStatistics getStatisticsForPort(DeviceId deviceId, PortNumber portNumber) {
        Map<PortNumber, PortStatistics> portStatsMap = devicePortStats.get(deviceId);
        if (portStatsMap == null) {
            return null;
        }
        PortStatistics portStats = portStatsMap.get(portNumber);
        return portStats;
    }

    @Override
    public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) {
        Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId);
        if (portStats == null) {
            return Collections.emptyList();
        }
        return ImmutableList.copyOf(portStats.values());
    }

    @Override
    public PortStatistics getDeltaStatisticsForPort(DeviceId deviceId, PortNumber portNumber) {
        Map<PortNumber, PortStatistics> portStatsMap = devicePortDeltaStats.get(deviceId);
        if (portStatsMap == null) {
            return null;
        }
        PortStatistics portStats = portStatsMap.get(portNumber);
        return portStats;
    }

    @Override
    public boolean isAvailable(DeviceId deviceId) {
        return availableDevices.contains(deviceId);
    }

    @Override
    public Iterable<Device> getAvailableDevices() {
        return Iterables.filter(Iterables.transform(availableDevices, devices::get), d -> d != null);
    }

    @Override
    public DeviceEvent removeDevice(DeviceId deviceId) {
        NodeId master = mastershipService.getMasterFor(deviceId);
        // if there exist a master, forward
        // if there is no master, try to become one and process
        boolean relinquishAtEnd = false;
        if (master == null) {
            final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
            if (myRole != MastershipRole.NONE) {
                relinquishAtEnd = true;
            }
            log.debug("Temporarily requesting role for {} to remove", deviceId);
            MastershipRole role = Futures.getUnchecked(mastershipService.requestRoleFor(deviceId));
            if (role == MastershipRole.MASTER) {
                master = localNodeId;
            }
        }

        if (!localNodeId.equals(master)) {
            log.debug("{} has control of {}, forwarding remove request",
                      master, deviceId);

            clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master)
                               .whenComplete((r, e) -> {
                                   if (e != null) {
                                       log.error("Failed to forward {} remove request to its master", deviceId, e);
                                   }
                               });
            return null;
        }

        // I have control..
        DeviceEvent event = null;
        final DeviceKey deviceKey = new DeviceKey(getPrimaryProviderId(deviceId), deviceId);
        DeviceDescription removedDeviceDescription =
                deviceDescriptions.remove(deviceKey);
        if (removedDeviceDescription != null) {
            event = purgeDeviceCache(deviceId);
        }

        if (relinquishAtEnd) {
            log.debug("Relinquishing temporary role acquired for {}", deviceId);
            mastershipService.relinquishMastership(deviceId);
        }
        return event;
    }

    private DefaultAnnotations mergeAnnotations(DeviceId deviceId) {
        ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
        DeviceDescription primaryDeviceDescription =
                deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));
        DefaultAnnotations annotations = DefaultAnnotations.builder().build();
        annotations = merge(annotations, primaryDeviceDescription.annotations());
        for (ProviderId providerId : getAllProviders(deviceId)) {
            if (!providerId.equals(primaryProviderId)) {
                annotations = merge(annotations,
                                    deviceDescriptions.get(new DeviceKey(providerId, deviceId)).annotations());
            }
        }
        return annotations;
    }

    private class InternalDeviceStatusTracker implements SetEventListener<DeviceId> {
        @Override
        public void event(SetEvent<DeviceId> event) {
            final DeviceId deviceId = event.entry();
            final Device device = devices.get(deviceId);
            if (device != null) {
                notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device));
            } else {
                pendingAvailableChangeUpdates.add(deviceId);
            }
        }
    }

    private class InternalDeviceChangeEventListener
        implements EventuallyConsistentMapListener<DeviceKey, DeviceDescription> {
        @Override
        public void event(EventuallyConsistentMapEvent<DeviceKey, DeviceDescription> event) {
            DeviceId deviceId = event.key().deviceId();
            ProviderId providerId = event.key().providerId();
            if (event.type() == PUT) {
                notifyDelegate(refreshDeviceCache(providerId, deviceId));
                if (pendingAvailableChangeUpdates.remove(deviceId)) {
                    notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, devices.get(deviceId)));
                }
            } else if (event.type() == REMOVE) {
                notifyDelegate(purgeDeviceCache(deviceId));
            }
        }
    }

    private class InternalPortChangeEventListener
        implements EventuallyConsistentMapListener<PortKey, PortDescription> {
        @Override
        public void event(EventuallyConsistentMapEvent<PortKey, PortDescription> event) {
            DeviceId deviceId = event.key().deviceId();
            ProviderId providerId = event.key().providerId();
            PortNumber portNumber = event.key().portNumber();
            if (event.type() == PUT) {
                if (devices.containsKey(deviceId)) {
                    List<DeviceEvent> events = refreshDevicePortCache(providerId, deviceId, Optional.of(portNumber));
                    for (DeviceEvent deviceEvent : events) {
                        notifyDelegate(deviceEvent);
                    }
                }
            } else if (event.type() == REMOVE) {
                log.warn("Unexpected port removed event");
            }
        }
    }

    private class InternalPortStatsListener
        implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> {
        @Override
        public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) {
            if (event.type() == PUT) {
                Device device = devices.get(event.key());
                if (device != null) {
                    notifyDelegate(new DeviceEvent(PORT_STATS_UPDATED, device));
                }
            }
        }
    }
}
