/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onlab.onos.store.device.impl;

import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import org.apache.commons.lang3.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.mastership.MastershipService;
import org.onlab.onos.net.AnnotationsUtil;
import org.onlab.onos.net.DefaultAnnotations;
import org.onlab.onos.net.DefaultDevice;
import org.onlab.onos.net.DefaultPort;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.Device.Type;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.device.DeviceClockService;
import org.onlab.onos.net.device.DeviceDescription;
import org.onlab.onos.net.device.DeviceEvent;
import org.onlab.onos.net.device.DeviceStore;
import org.onlab.onos.net.device.DeviceStoreDelegate;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.impl.Timestamped;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.onos.store.serializers.DistributedStoreSerializers;
import org.onlab.packet.ChassisId;
import org.onlab.util.KryoNamespace;
import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Predicates.notNull;
import static org.onlab.onos.cluster.ControllerNodeToNodeId.toNodeId;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
import static org.onlab.onos.net.DefaultAnnotations.merge;
import static com.google.common.base.Verify.verify;
import static org.onlab.util.Tools.namedThreads;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.onos.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;

// TODO: give me a better name
/**
 * Manages the inventory of infrastructure devices, using a gossip protocol to
 * distribute information.
 */
@Component(immediate = true)
@Service
public class GossipDeviceStore
        extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
        implements DeviceStore {

    // Logger obtained for the runtime class of this store instance.
    private final Logger log = getLogger(getClass());

    // Message template (formatted with the device ID) used by the argument
    // checks that fail when a device is unknown.
    private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";

    // Descriptions supplied by the various providers, per device.
    // The inner map is also used as the monitor that locks a Device,
    // thus its instance should never be replaced.
    private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>>
                                deviceDescs = Maps.newConcurrentMap();

    // Cache of the Device and Port instances generated by compositing the
    // descriptions from the providers.
    private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
    private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();

    // Timestamp of the latest accepted offline request / removal request per device.
    // An entry is updated while holding the lock of the device it belongs to,
    // but that lock is per device: updates for different devices can run
    // concurrently, so the maps themselves have to be thread-safe.
    private final Map<DeviceId, Timestamp> offline = Maps.newConcurrentMap();
    private final Map<DeviceId, Timestamp> removalRequest = Maps.newConcurrentMap();

    // available(=UP) devices; membership is what isAvailable() reports
    private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();

    // Source of the timestamps attached to device and port updates.
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected DeviceClockService deviceClockService;

    // Used to subscribe to the store's message subjects on activation.
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterCommunicationService clusterCommunicator;

    // Used to identify the local node.
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;

    // Used to look up the master node of a device.
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected MastershipService mastershipService;

    // Serializer shared by this store. Its pool is built from the COMMON
    // distributed-store namespace plus the store's internal event,
    // advertisement and fragment-id types; some of them are registered with
    // a dedicated serializer.
    protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
        @Override
        protected void setupKryoPool() {
            serializerPool = KryoNamespace.newBuilder()
                    .register(DistributedStoreSerializers.COMMON)

                    .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
                    .register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
                    .register(InternalDeviceRemovedEvent.class)
                    .register(InternalPortEvent.class, new InternalPortEventSerializer())
                    .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
                    .register(DeviceAntiEntropyAdvertisement.class)
                    .register(DeviceFragmentId.class)
                    .register(PortFragmentId.class)
                    .build()
                    // NOTE(review): presumably pre-creates one pooled instance - confirm in KryoNamespace
                    .populate(1);
        }
    };

    // Single-threaded scheduler running the periodic anti-entropy task;
    // created in activate() and shut down in deactivate().
    private ScheduledExecutorService executor;

    /**
     * Activates the store: subscribes a handler for each message subject of
     * this store and schedules the periodic anti-entropy advertisement task.
     */
    @Activate
    public void activate() {
        final ClusterCommunicationService comm = clusterCommunicator;

        // one subscriber per message subject
        comm.addSubscriber(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
                           new InternalDeviceEventListener());
        comm.addSubscriber(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
                           new InternalDeviceOfflineEventListener());
        comm.addSubscriber(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED,
                           new InternalDeviceRemovedEventListener());
        comm.addSubscriber(GossipDeviceStoreMessageSubjects.PORT_UPDATE,
                           new InternalPortEventListener());
        comm.addSubscriber(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
                           new InternalPortStatusEventListener());
        comm.addSubscriber(GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE,
                           new InternalDeviceAdvertisementListener());

        // anti-entropy thread
        executor = newSingleThreadScheduledExecutor(namedThreads("anti-entropy-%d"));

        // TODO: Make these configurable
        final long initialDelaySec = 5;
        final long periodSec = 5;
        final Runnable advertisementTask = new SendAdvertisementTask();
        executor.scheduleAtFixedRate(advertisementTask,
                                     initialDelaySec, periodSec, TimeUnit.SECONDS);

        log.info("Started");
    }

    /**
     * Deactivates the store: stops the anti-entropy executor, waiting up to
     * five seconds for it to terminate, and clears the device, port and
     * availability caches.
     */
    @Deactivate
    public void deactivate() {

        executor.shutdownNow();
        try {
            // awaitTermination() returns true when the executor terminated
            // and false when the timeout elapsed first.
            boolean terminated = executor.awaitTermination(5, TimeUnit.SECONDS);
            if (!terminated) {
                log.error("Timeout during executor shutdown");
            }
        } catch (InterruptedException e) {
            log.error("Error during executor shutdown", e);
            // keep the interruption visible to the caller of deactivate()
            Thread.currentThread().interrupt();
        }

        deviceDescs.clear();
        devices.clear();
        devicePorts.clear();
        availableDevices.clear();
        log.info("Stopped");
    }

    /**
     * Returns the number of devices currently held in the device cache.
     *
     * @return number of cached devices
     */
    @Override
    public int getDeviceCount() {
        final int cachedDevices = devices.size();
        return cachedDevices;
    }

    /**
     * Returns a read-only view of the cached devices.
     *
     * @return unmodifiable collection backed by the device cache
     */
    @Override
    public Iterable<Device> getDevices() {
        final Collection<Device> cached = devices.values();
        return Collections.unmodifiableCollection(cached);
    }

    /**
     * Looks up a device in the device cache.
     *
     * @param deviceId identifier of the device
     * @return cached device, or null if the cache holds no entry for the ID
     */
    @Override
    public Device getDevice(DeviceId deviceId) {
        final Device cached = devices.get(deviceId);
        return cached;
    }

    /**
     * Applies a device description from a provider to the local store and,
     * if that produced an event, sends the merged description to the peers.
     *
     * @param providerId provider supplying the description
     * @param deviceId identifier of the described device
     * @param deviceDescription description to apply
     * @return resulting event, or null if the description changed nothing or
     *         was rejected as outdated
     */
    @Override
    public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
                                     DeviceId deviceId,
                                     DeviceDescription deviceDescription) {
        final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
        final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
        final DeviceEvent event;
        final Timestamped<DeviceDescription> mergedDesc;
        // the per-device map instance is never replaced and serves as the device lock
        final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
        synchronized (device) {
            event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
            if (event == null) {
                // Nothing to tell the peers. When the update was rejected the
                // provider may have no entry at all, so the merged description
                // must not be looked up on this path.
                return null;
            }
            mergedDesc = device.get(providerId).getDeviceDesc();
        }
        log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
            providerId, deviceId);
        try {
            notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
        } catch (IOException e) {
            log.error("Failed to notify peers of a device update topology event for providerId: "
                    + providerId + " and deviceId: " + deviceId, e);
        }
        return event;
    }

    /**
     * Applies a timestamped device description to the local store only.
     *
     * @param providerId provider supplying the description
     * @param deviceId identifier of the described device
     * @param deltaDesc timestamped description to apply
     * @return event for the added or updated device, or null if the
     *         description is outdated or caused no reportable change
     */
    private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
                                    DeviceId deviceId,
                                    Timestamped<DeviceDescription> deltaDesc) {

        // descriptions of this device, keyed by provider; also the device lock
        final Map<ProviderId, DeviceDescriptions> perProvider
            = getOrCreateDeviceDescriptionsMap(deviceId);

        synchronized (perProvider) {
            final Timestamp eventTime = deltaDesc.timestamp();
            if (isDeviceRemoved(deviceId, eventTime)) {
                log.debug("Ignoring outdated event: {}", deltaDesc);
                return null;
            }

            final DeviceDescriptions providerEntry =
                    getOrCreateProviderDeviceDescriptions(perProvider, providerId, deltaDesc);

            final Device cached = devices.get(deviceId);

            // The entry holds deltaDesc itself when it was created just now;
            // otherwise the new description has to be newer than the stored one.
            final Timestamped<DeviceDescription> stored = providerEntry.getDeviceDesc();
            if (deltaDesc != stored && !deltaDesc.isNewer(stored)) {
                // outdated event, ignored.
                return null;
            }

            providerEntry.putDeviceDesc(deltaDesc);
            final Device composed = composeDevice(deviceId, perProvider);

            // no cached device: ADD; otherwise UPDATE or ignore (no change or stale)
            return cached == null
                    ? createDevice(providerId, composed, eventTime)
                    : updateDevice(providerId, cached, composed, eventTime);
        }
    }

    /**
     * Puts a newly composed device into the device cache and, for a
     * non-ancillary provider, attempts to mark it online.
     *
     * @param providerId provider that triggered the creation
     * @param newDevice device to cache
     * @param timestamp timestamp of the triggering event
     * @return DEVICE_ADDED event for the new device
     */
    // Guarded by deviceDescs value (=Device lock)
    private DeviceEvent createDevice(ProviderId providerId,
                                     Device newDevice, Timestamp timestamp) {

        final DeviceId id = newDevice.id();

        // the cache is not expected to hold this device yet
        final Device previous = devices.putIfAbsent(id, newDevice);
        verify(previous == null,
                "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
                providerId, previous, newDevice);

        final boolean primary = !providerId.isAncillary();
        if (primary) {
            markOnline(id, timestamp);
        }

        return new DeviceEvent(DEVICE_ADDED, newDevice, null);
    }

    /**
     * Replaces the cached device when a relevant attribute changed, and
     * otherwise only attempts to mark it online for a primary provider.
     *
     * @param providerId provider that triggered the update
     * @param oldDevice device currently in the cache
     * @param newDevice newly composed device
     * @param newTimestamp timestamp of the triggering event
     * @return DEVICE_UPDATED, DEVICE_AVAILABILITY_CHANGED, or null if
     *         nothing reportable changed
     */
    // Guarded by deviceDescs value (=Device lock)
    private DeviceEvent updateDevice(ProviderId providerId,
                                     Device oldDevice,
                                     Device newDevice, Timestamp newTimestamp) {
        // only hardware/software version and annotations can trigger an update
        final boolean sameProperties =
                Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) &&
                        Objects.equals(oldDevice.swVersion(), newDevice.swVersion());
        final boolean propertiesChanged = !sameProperties;
        final boolean annotationsChanged =
                !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations());
        final boolean ancillary = providerId.isAncillary();
        final DeviceId id = newDevice.id();

        // Annotation changes count for every provider; property changes
        // count for primary providers only.
        if (annotationsChanged || (!ancillary && propertiesChanged)) {
            final boolean replaced = devices.replace(id, oldDevice, newDevice);
            if (!replaced) {
                verify(replaced,
                        "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
                        providerId, oldDevice, devices.get(id), newDevice);
            }
            if (!ancillary) {
                markOnline(id, newTimestamp);
            }
            return new DeviceEvent(DEVICE_UPDATED, newDevice, null);
        }

        // no attribute change: ancillary providers have nothing more to do
        if (ancillary) {
            return null;
        }

        // a primary provider may still flip the availability
        if (markOnline(id, newTimestamp)) {
            return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
        }
        return null;
    }

    /**
     * Marks a device offline in the local store and, if that changed its
     * availability, sends the offline event to the peers.
     *
     * @param deviceId identifier of the device
     * @return availability-changed event, or null if nothing changed
     */
    @Override
    public DeviceEvent markOffline(DeviceId deviceId) {
        final Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
        final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
        if (event != null) {
            log.info("Notifying peers of a device offline topology event for deviceId: {} {}",
                    deviceId, timestamp);
            try {
                notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
            } catch (IOException e) {
                // the trailing throwable is logged with its stack trace
                log.error("Failed to notify peers of a device offline topology event for deviceId: {}",
                     deviceId, e);
            }
        }
        return event;
    }

    /**
     * Records an offline request for a device and drops the device from the
     * set of available devices.
     *
     * @param deviceId identifier of the device
     * @param timestamp timestamp of the offline request
     * @return availability-changed event if the device was cached and
     *         available, null otherwise (including outdated requests)
     */
    private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {

        final Map<ProviderId, DeviceDescriptions> perProvider
            = getOrCreateDeviceDescriptionsMap(deviceId);

        // locking device
        synchronized (perProvider) {

            // the request has to be newer than the latest timestamp known
            // from the primary provider
            final Timestamp latestPrimary =
                    getPrimaryDescriptions(perProvider).getLatestTimestamp();
            if (timestamp.compareTo(latestPrimary) <= 0) {
                // outdated event ignore
                return null;
            }

            offline.put(deviceId, timestamp);

            final Device device = devices.get(deviceId);
            if (device != null && availableDevices.remove(deviceId)) {
                // TODO: broadcast ... DOWN only?
                return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
            }
            return null;
        }
    }

    /**
     * Marks the device as available unless the given timestamp is outdated
     * with respect to the latest recorded offline request for the device.
     *
     * @param deviceId identifier of the device
     * @param timestamp of the event triggering this change.
     * @return true if availability change request was accepted and changed the state
     */
    // Guarded by deviceDescs value (=Device lock)
    private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
        final Timestamp wentOfflineAt = offline.get(deviceId);

        // an offline request that is not older than this event wins
        final boolean superseded =
                wentOfflineAt != null && wentOfflineAt.compareTo(timestamp) >= 0;
        if (superseded) {
            return false;
        }

        offline.remove(deviceId);
        return availableDevices.add(deviceId);
    }

    /**
     * Applies a full list of port descriptions from a provider to the local
     * store and, if that produced events, sends the merged descriptions to
     * the peers.
     *
     * @param providerId provider supplying the descriptions
     * @param deviceId identifier of the device owning the ports
     * @param portDescriptions descriptions of the ports of the device
     * @return resulting events; empty if no timestamp could be obtained, the
     *         update was rejected as outdated, or nothing changed
     */
    @Override
    public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
                                       DeviceId deviceId,
                                       List<PortDescription> portDescriptions) {

        final Timestamp newTimestamp;
        try {
            newTimestamp = deviceClockService.getTimestamp(deviceId);
        } catch (IllegalStateException e) {
            log.info("Timestamp was not available for device {}", deviceId);
            log.debug("  discarding {}", portDescriptions);
            // Failed to generate timestamp.

            // Possible situation:
            //  Device connected and became master for short period of time,
            // but lost mastership before this instance had the chance to
            // retrieve term information.

            // Information dropped here is expected to be recoverable by
            // device probing after mastership change

            return Collections.emptyList();
        }
        log.info("timestamp for {} {}", deviceId, newTimestamp);

        final Timestamped<List<PortDescription>> timestampedInput
                = new Timestamped<>(portDescriptions, newTimestamp);
        final List<DeviceEvent> events;
        final Timestamped<List<PortDescription>> merged;

        // the per-device map instance is never replaced and serves as the device lock
        final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
        synchronized (device) {
            events = updatePortsInternal(providerId, deviceId, timestampedInput);
            if (events == null || events.isEmpty()) {
                // Rejected or without effect: there is nothing to send, and
                // the provider entry needed for the merge may be missing.
                return Collections.emptyList();
            }
            final DeviceDescriptions descs = device.get(providerId);
            List<PortDescription> mergedList =
                    FluentIterable.from(portDescriptions)
                .transform(new Function<PortDescription, PortDescription>() {
                    @Override
                    public PortDescription apply(PortDescription input) {
                        // lookup merged port description
                        return descs.getPortDesc(input.portNumber()).value();
                    }
                }).toList();
            merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
        }

        log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
                providerId, deviceId);
        try {
            notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
        } catch (IOException e) {
            log.error("Failed to notify peers of a port update topology event or providerId: "
                + providerId + " and deviceId: " + deviceId, e);
        }
        return events;
    }

    /**
     * Applies a timestamped list of port descriptions to the local store
     * only: adds or updates the listed ports and prunes cached ports that
     * are not listed.
     *
     * @param providerId provider supplying the descriptions
     * @param deviceId identifier of the device owning the ports
     * @param portDescriptions timestamped descriptions of the device's ports
     * @return resulting events, never null; empty if the update is
     *         superseded by a removal request or changed nothing
     * @throws IllegalArgumentException if the device, or the provider's
     *         device description, is not known
     */
    private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
                                DeviceId deviceId,
                                Timestamped<List<PortDescription>> portDescriptions) {

        Device device = devices.get(deviceId);
        checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);

        Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
        checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);

        List<DeviceEvent> events = new ArrayList<>();
        synchronized (descsMap) {

            if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
                log.debug("Ignoring outdated events: {}", portDescriptions);
                // an empty list, not null: callers use the result without a null check
                return Collections.emptyList();
            }

            DeviceDescriptions descs = descsMap.get(providerId);
            // every provider must provide DeviceDescription.
            checkArgument(descs != null,
                    "Device description for Device ID %s from Provider %s was not found",
                    deviceId, providerId);

            Map<PortNumber, Port> ports = getPortMap(deviceId);

            final Timestamp newTimestamp = portDescriptions.timestamp();

            // Add new ports
            Set<PortNumber> processed = new HashSet<>();
            for (PortDescription portDescription : portDescriptions.value()) {
                final PortNumber number = portDescription.portNumber();
                processed.add(number);

                final Port oldPort = ports.get(number);
                final Port newPort;


                final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
                if (existingPortDesc == null ||
                    newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
                    // on new port or valid update
                    // update description
                    descs.putPortDesc(new Timestamped<>(portDescription,
                                            portDescriptions.timestamp()));
                    newPort = composePort(device, number, descsMap);
                } else {
                    // outdated event, ignored.
                    continue;
                }

                // updatePort() yields null for an unchanged port; filtered out below
                events.add(oldPort == null ?
                                   createPort(device, newPort, ports) :
                                   updatePort(device, oldPort, newPort, ports));
            }

            // cached ports missing from the description list are removed
            events.addAll(pruneOldPorts(device, ports, processed));
        }
        return FluentIterable.from(events).filter(notNull()).toList();
    }

    /**
     * Stores a new port in the given port map.
     *
     * @param device device owning the port
     * @param newPort port to store
     * @param ports port map of the device
     * @return PORT_ADDED event for the stored port
     */
    // Guarded by deviceDescs value (=Device lock)
    private DeviceEvent createPort(Device device, Port newPort,
                                   Map<PortNumber, Port> ports) {
        final PortNumber number = newPort.number();
        ports.put(number, newPort);
        final DeviceEvent added = new DeviceEvent(PORT_ADDED, device, newPort);
        return added;
    }

    /**
     * Replaces a port in the given port map if its enabled state or its
     * annotations differ from the new port.
     *
     * @param device device owning the port
     * @param oldPort port currently stored
     * @param newPort newly composed port
     * @param ports port map of the device
     * @return PORT_UPDATED event, or null if the port needs no update
     */
    // Guarded by deviceDescs value (=Device lock)
    private DeviceEvent updatePort(Device device, Port oldPort,
                                   Port newPort,
                                   Map<PortNumber, Port> ports) {
        final boolean unchanged =
                oldPort.isEnabled() == newPort.isEnabled() &&
                AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations());
        if (unchanged) {
            return null;
        }

        ports.put(oldPort.number(), newPort);
        return new DeviceEvent(PORT_UPDATED, device, newPort);
    }

    /**
     * Removes every port whose number is not in the processed set from the
     * given port map.
     *
     * @param device device owning the ports
     * @param ports port map of the device; modified in place
     * @param processed numbers of the ports to keep
     * @return one PORT_REMOVED event per removed port
     */
    // Guarded by deviceDescs value (=Device lock)
    private List<DeviceEvent> pruneOldPorts(Device device,
                                            Map<PortNumber, Port> ports,
                                            Set<PortNumber> processed) {
        final List<DeviceEvent> removals = new ArrayList<>();
        for (Iterator<Entry<PortNumber, Port>> it = ports.entrySet().iterator(); it.hasNext();) {
            final Entry<PortNumber, Port> entry = it.next();
            if (processed.contains(entry.getKey())) {
                continue;
            }
            removals.add(new DeviceEvent(PORT_REMOVED, device, entry.getValue()));
            // removal through the iterator, since the map is being iterated
            it.remove();
        }
        return removals;
    }

    /**
     * Gets the map of ports for the specified device; if one does not already
     * exist, it creates and registers a new one in devicePorts.
     *
     * @param deviceId identifier of the device
     * @return port map registered for the device
     */
    private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
        // NOTE(review): presumably ifNeeded() supplies a new empty concurrent
        // map only when the key is absent - confirm in NewConcurrentHashMap
        return createIfAbsentUnchecked(devicePorts, deviceId,
                NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
    }

    /**
     * Returns the per-provider description map of a device, registering a
     * new empty one in deviceDescs if the device has none yet.
     *
     * @param deviceId identifier of the device
     * @return map registered for the device
     */
    private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap(
            DeviceId deviceId) {
        final Map<ProviderId, DeviceDescriptions> registered = deviceDescs.get(deviceId);
        if (registered != null) {
            return registered;
        }

        final Map<ProviderId, DeviceDescriptions> fresh = new HashMap<>();
        // another thread may have registered a map in the meantime; that one wins
        final Map<ProviderId, DeviceDescriptions> raced = deviceDescs.putIfAbsent(deviceId, fresh);
        return raced == null ? fresh : raced;
    }

    /**
     * Returns the descriptions held for a provider, creating an entry from
     * the given device description if the provider has none yet.
     *
     * @param device per-provider description map of the device
     * @param providerId provider to look up
     * @param deltaDesc description used to create a missing entry
     * @return existing or newly created descriptions of the provider
     */
    // Guarded by deviceDescs value (=Device lock)
    private DeviceDescriptions getOrCreateProviderDeviceDescriptions(
            Map<ProviderId, DeviceDescriptions> device,
            ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) {

        synchronized (device) {
            final DeviceDescriptions known = device.get(providerId);
            if (known != null) {
                return known;
            }

            final DeviceDescriptions created = new DeviceDescriptions(deltaDesc);
            device.put(providerId, created);
            return created;
        }
    }

    /**
     * Applies the description of a single port from a provider to the local
     * store and, if that produced an event, sends the merged description to
     * the peers.
     *
     * @param providerId provider supplying the description
     * @param deviceId identifier of the device owning the port
     * @param portDescription description of the port
     * @return resulting event, or null if no timestamp could be obtained,
     *         the update was rejected as outdated, or nothing changed
     */
    @Override
    public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
                                                     DeviceId deviceId,
                                                     PortDescription portDescription) {

        final Timestamp newTimestamp;
        try {
            newTimestamp = deviceClockService.getTimestamp(deviceId);
        } catch (IllegalStateException e) {
            log.info("Timestamp was not available for device {}", deviceId);
            log.debug("  discarding {}", portDescription);
            // Failed to generate timestamp. Ignoring.
            // See updatePorts comment
            return null;
        }
        final Timestamped<PortDescription> deltaDesc
            = new Timestamped<>(portDescription, newTimestamp);
        final DeviceEvent event;
        final Timestamped<PortDescription> mergedDesc;
        // the per-device map instance is never replaced and serves as the device lock
        final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
        synchronized (device) {
            event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
            if (event == null) {
                // Nothing to tell the peers. When the update was rejected the
                // provider may have no entry at all, so the merged description
                // must not be looked up on this path.
                return null;
            }
            mergedDesc = device.get(providerId)
                            .getPortDesc(portDescription.portNumber());
        }
        log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
                    providerId, deviceId);
        try {
            notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
        } catch (IOException e) {
            log.error("Failed to notify peers of a port status update topology event or providerId: "
                    + providerId + " and deviceId: " + deviceId, e);
        }
        return event;
    }

    /**
     * Applies a timestamped description of a single port to the local store
     * only.
     *
     * @param providerId provider supplying the description
     * @param deviceId identifier of the device owning the port
     * @param deltaDesc timestamped port description to apply
     * @return event for the added or updated port, or null if the
     *         description is outdated or the port needs no update
     */
    private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
                Timestamped<PortDescription> deltaDesc) {

        final Device device = devices.get(deviceId);
        checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);

        final Map<ProviderId, DeviceDescriptions> perProvider = deviceDescs.get(deviceId);
        checkArgument(perProvider != null, DEVICE_NOT_FOUND, deviceId);

        // locking per device
        synchronized (perProvider) {

            if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
                log.debug("Ignoring outdated event: {}", deltaDesc);
                return null;
            }

            final DeviceDescriptions providerEntry = perProvider.get(providerId);
            // every provider is assumed to have supplied a DeviceDescription
            verify(providerEntry != null,
                    "Device description for Device ID %s from Provider %s was not found",
                    deviceId, providerId);

            final ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
            final PortNumber number = deltaDesc.value().portNumber();
            final Port oldPort = ports.get(number);

            final Timestamped<PortDescription> known = providerEntry.getPortDesc(number);
            if (known != null && !deltaDesc.isNewer(known)) {
                // same or outdated event, ignored.
                log.trace("ignore same or outdated {} >= {}", known, deltaDesc);
                return null;
            }

            // new port or valid update: store the description, recompose the port
            providerEntry.putPortDesc(deltaDesc);
            final Port newPort = composePort(device, number, perProvider);

            return oldPort == null
                    ? createPort(device, newPort, ports)
                    : updatePort(device, oldPort, newPort, ports);
        }
    }

    /**
     * Returns a snapshot of the cached ports of a device.
     *
     * @param deviceId identifier of the device
     * @return immutable copy of the device's ports; empty if no port map is
     *         registered for the device
     */
    @Override
    public List<Port> getPorts(DeviceId deviceId) {
        final Map<PortNumber, Port> known = devicePorts.get(deviceId);
        if (known != null) {
            return ImmutableList.copyOf(known.values());
        }
        return Collections.emptyList();
    }

    /**
     * Looks up a single cached port of a device.
     *
     * @param deviceId identifier of the device
     * @param portNumber number of the port
     * @return cached port, or null if the device has no port map or the map
     *         holds no such port
     */
    @Override
    public Port getPort(DeviceId deviceId, PortNumber portNumber) {
        final Map<PortNumber, Port> known = devicePorts.get(deviceId);
        if (known == null) {
            return null;
        }
        return known.get(portNumber);
    }

    /**
     * Tells whether a device is in the set of available devices.
     *
     * @param deviceId identifier of the device
     * @return true if the device is currently marked available
     */
    @Override
    public boolean isAvailable(DeviceId deviceId) {
        final boolean available = availableDevices.contains(deviceId);
        return available;
    }

    /**
     * Removes a device from the local store and, if that produced an event,
     * sends the removal to the peers. The request is only processed when
     * the local node is the master of the device.
     *
     * @param deviceId identifier of the device
     * @return DEVICE_REMOVED event, or null if the local node is not the
     *         master, the request is outdated, or the device was not cached
     */
    @Override
    public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
        final NodeId master = mastershipService.getMasterFor(deviceId);
        if (!clusterService.getLocalNode().id().equals(master)) {
            log.info("Removal of device {} requested on non master node", deviceId);
            // FIXME silently ignoring. Should be forwarding or broadcasting to
            // master.
            return null;
        }

        Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
        DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
        if (event != null) {
            log.info("Notifying peers of a device removed topology event for deviceId: {}",
                    deviceId);
            try {
                notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
            } catch (IOException e) {
                // the trailing throwable is logged with its stack trace
                log.error("Failed to notify peers of a device removed topology event for deviceId: {}",
                     deviceId, e);
            }
        }
        return event;
    }

    /**
     * Removes a device from the local store only: records the removal
     * request, drops the cached device and its ports, marks the device
     * offline and unavailable, and clears its provider descriptions.
     *
     * @param deviceId identifier of the device
     * @param timestamp timestamp of the removal request
     * @return DEVICE_REMOVED event, or null if the request is outdated or
     *         the device was not cached
     */
    private DeviceEvent removeDeviceInternal(DeviceId deviceId,
                                             Timestamp timestamp) {

        Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptionsMap(deviceId);
        synchronized (descs) {
            // accept removal request if given timestamp is newer than
            // the latest Timestamp from Primary provider
            DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
            Timestamp lastTimestamp = primDescs.getLatestTimestamp();
            if (timestamp.compareTo(lastTimestamp) <= 0) {
                // outdated event ignore
                return null;
            }
            removalRequest.put(deviceId, timestamp);

            Device device = devices.remove(deviceId);
            // should DEVICE_REMOVED carry removed ports?
            Map<PortNumber, Port> ports = devicePorts.get(deviceId);
            if (ports != null) {
                ports.clear();
            }
            // records the offline timestamp
            markOfflineInternal(deviceId, timestamp);
            // markOfflineInternal() returns before updating availability when
            // the device is no longer cached, which is the case here, so the
            // device has to be dropped from the available set explicitly.
            availableDevices.remove(deviceId);
            descs.clear();
            return device == null ? null :
                new DeviceEvent(DEVICE_REMOVED, device, null);
        }
    }

    /**
     * Tells whether an event with the given timestamp has been superseded by
     * a recorded removal request for the same device.
     *
     * @param deviceId identifier of a device
     * @param timestampToCheck timestamp of an event to check
     * @return true if a removal request exists whose timestamp is equal to
     *         or newer than the given one
     */
    private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
        final Timestamp removedAt = removalRequest.get(deviceId);
        // a removal with the same timestamp still wins over the event
        return removedAt != null && removedAt.compareTo(timestampToCheck) >= 0;
    }

    /**
     * Builds a Device out of the descriptions supplied by several providers.
     * <p>
     * All scalar attributes come from the primary provider's description;
     * annotations of every other provider are merged on top of the primary's.
     *
     * @param deviceId device identifier
     * @param providerDescs Collection of Descriptions from multiple providers
     * @return Device instance
     */
    private Device composeDevice(DeviceId deviceId,
            Map<ProviderId, DeviceDescriptions> providerDescs) {

        checkArgument(!providerDescs.isEmpty(), "No device descriptions supplied");

        final ProviderId primaryId = pickPrimaryPID(providerDescs);
        final DeviceDescription primaryDesc =
                providerDescs.get(primaryId).getDeviceDesc().value();

        // scalar attributes are taken from the primary provider only
        final Type deviceType = primaryDesc.type();
        final String vendor = primaryDesc.manufacturer();
        final String hardware = primaryDesc.hwVersion();
        final String software = primaryDesc.swVersion();
        final String serial = primaryDesc.serialNumber();
        final ChassisId chassis = primaryDesc.chassisId();

        DefaultAnnotations merged =
                merge(DefaultAnnotations.builder().build(), primaryDesc.annotations());

        // TODO: should keep track of Description timestamp
        // and only merge conflicting keys when timestamp is newer.
        // Currently assuming there will never be a key conflict between
        // providers
        for (Entry<ProviderId, DeviceDescriptions> other : providerDescs.entrySet()) {
            if (!other.getKey().equals(primaryId)) {
                // annotation merging. not so efficient, should revisit later
                merged = merge(merged,
                               other.getValue().getDeviceDesc().value().annotations());
            }
        }

        return new DefaultDevice(primaryId, deviceId, deviceType, vendor,
                                 hardware, software, serial,
                                 chassis, merged);
    }

    /**
     * Builds a Port out of the descriptions supplied by several providers.
     * <p>
     * The enabled flag comes from the primary provider's port description
     * (false when the primary has none); annotations from all providers that
     * describe the port are merged.
     *
     * @param device device the port is on
     * @param number port number
     * @param descsMap Collection of Descriptions from multiple providers
     * @return Port instance
     */
    private Port composePort(Device device, PortNumber number,
                Map<ProviderId, DeviceDescriptions> descsMap) {

        final ProviderId primaryId = pickPrimaryPID(descsMap);
        final DeviceDescriptions primaryDescs = descsMap.get(primaryId);

        // if the primary has no description of this port, assume not enabled
        // TODO: revisit this default port enabled/disabled behavior
        boolean enabled = false;
        DefaultAnnotations merged = DefaultAnnotations.builder().build();

        final Timestamped<PortDescription> primaryPort = primaryDescs.getPortDesc(number);
        if (primaryPort != null) {
            enabled = primaryPort.value().isEnabled();
            merged = merge(merged, primaryPort.value().annotations());
        }

        // TODO: should keep track of Description timestamp
        // and only merge conflicting keys when timestamp is newer.
        // Currently assuming there will never be a key conflict between
        // providers
        for (Entry<ProviderId, DeviceDescriptions> other : descsMap.entrySet()) {
            if (!other.getKey().equals(primaryId)) {
                // annotation merging. not so efficient, should revisit later
                final Timestamped<PortDescription> ancillaryPort =
                        other.getValue().getPortDesc(number);
                if (ancillaryPort != null) {
                    merged = merge(merged, ancillaryPort.value().annotations());
                }
            }
        }

        return new DefaultPort(device, number, enabled, merged);
    }

    /**
     * Selects the provider whose descriptions are treated as primary.
     *
     * @param providerDescs descriptions keyed by provider
     * @return the first non-ancillary ProviderId in iteration order; if there
     *         is none, the first ancillary one; null if the map is empty
     */
    private ProviderId pickPrimaryPID(
            Map<ProviderId, DeviceDescriptions> providerDescs) {
        ProviderId firstAncillary = null;
        for (ProviderId candidate : providerDescs.keySet()) {
            if (!candidate.isAncillary()) {
                return candidate;
            }
            if (firstAncillary == null) {
                // remember a fallback in case there is no primary
                firstAncillary = candidate;
            }
        }
        return firstAncillary;
    }

    // Returns the descriptions registered under the provider chosen by
    // pickPrimaryPID.
    private DeviceDescriptions getPrimaryDescriptions(
                            Map<ProviderId, DeviceDescriptions> providerDescs) {
        return providerDescs.get(pickPrimaryPID(providerDescs));
    }

    // Serializes the event and sends it, stamped with the local node id,
    // to a single recipient.
    // TODO: should we be throwing exception?
    private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
        final NodeId localId = clusterService.getLocalNode().id();
        clusterCommunicator.unicast(
                new ClusterMessage(localId, subject, SERIALIZER.encode(event)),
                recipient);
    }

    // Serializes the event and broadcasts it, stamped with the local node id.
    // TODO: should we be throwing exception?
    private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
        final NodeId localId = clusterService.getLocalNode().id();
        clusterCommunicator.broadcast(
                new ClusterMessage(localId, subject, SERIALIZER.encode(event)));
    }

    // Broadcasts a device update under the DEVICE_UPDATE subject.
    private void notifyPeers(InternalDeviceEvent event) throws IOException {
        final MessageSubject subject = GossipDeviceStoreMessageSubjects.DEVICE_UPDATE;
        broadcastMessage(subject, event);
    }

    // Broadcasts a device offline notice under the DEVICE_OFFLINE subject.
    private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
        final MessageSubject subject = GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE;
        broadcastMessage(subject, event);
    }

    // Broadcasts a device removal under the DEVICE_REMOVED subject.
    private void notifyPeers(InternalDeviceRemovedEvent event) throws IOException {
        final MessageSubject subject = GossipDeviceStoreMessageSubjects.DEVICE_REMOVED;
        broadcastMessage(subject, event);
    }

    // Broadcasts a port list update under the PORT_UPDATE subject.
    private void notifyPeers(InternalPortEvent event) throws IOException {
        final MessageSubject subject = GossipDeviceStoreMessageSubjects.PORT_UPDATE;
        broadcastMessage(subject, event);
    }

    // Broadcasts a single port status change under the PORT_STATUS_UPDATE subject.
    private void notifyPeers(InternalPortStatusEvent event) throws IOException {
        final MessageSubject subject = GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE;
        broadcastMessage(subject, event);
    }

    /**
     * Sends a device update to a single peer under the DEVICE_UPDATE subject.
     * A send failure is logged and not propagated to the caller.
     *
     * @param recipient node to send the event to
     * @param event device update to send
     */
    private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
        try {
            unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
        } catch (IOException e) {
            // note the space after "send": the message used to run into the event text
            log.error("Failed to send " + event + " to " + recipient, e);
        }
    }

    /**
     * Sends a device offline notice to a single peer under the DEVICE_OFFLINE
     * subject. A send failure is logged and not propagated to the caller.
     *
     * @param recipient node to send the event to
     * @param event device offline notice to send
     */
    private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
        try {
            unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
        } catch (IOException e) {
            // note the space after "send": the message used to run into the event text
            log.error("Failed to send " + event + " to " + recipient, e);
        }
    }

    /**
     * Sends a device removal to a single peer under the DEVICE_REMOVED
     * subject. A send failure is logged and not propagated to the caller.
     *
     * @param recipient node to send the event to
     * @param event device removal to send
     */
    private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
        try {
            unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
        } catch (IOException e) {
            // note the space after "send": the message used to run into the event text
            log.error("Failed to send " + event + " to " + recipient, e);
        }
    }

    /**
     * Sends a port list update to a single peer under the PORT_UPDATE
     * subject. A send failure is logged and not propagated to the caller.
     *
     * @param recipient node to send the event to
     * @param event port list update to send
     */
    private void notifyPeer(NodeId recipient, InternalPortEvent event) {
        try {
            unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
        } catch (IOException e) {
            // note the space after "send": the message used to run into the event text
            log.error("Failed to send " + event + " to " + recipient, e);
        }
    }

    /**
     * Sends a port status change to a single peer under the
     * PORT_STATUS_UPDATE subject. A send failure is logged and not
     * propagated to the caller.
     *
     * @param recipient node to send the event to
     * @param event port status change to send
     */
    private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
        try {
            unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
        } catch (IOException e) {
            // note the space after "send": the message used to run into the event text
            log.error("Failed to send " + event + " to " + recipient, e);
        }
    }

    /**
     * Builds an anti-entropy advertisement describing the local store.
     * <p>
     * For every device in deviceDescs it records, while holding that
     * device's description map as a lock: the offline timestamp (if any),
     * the device description timestamp of each provider, and the timestamp
     * of each port description of each provider.
     *
     * @return advertisement stamped with the local node id
     */
    private DeviceAntiEntropyAdvertisement createAdvertisement() {
        final NodeId self = clusterService.getLocalNode().id();

        final int deviceCount = deviceDescs.size();
        final int expectedPortsPerDevice = 8; // random factor to minimize reallocation
        final Map<DeviceFragmentId, Timestamp> deviceStamps = new HashMap<>(deviceCount);
        final Map<PortFragmentId, Timestamp> portStamps =
                new HashMap<>(deviceCount * expectedPortsPerDevice);
        final Map<DeviceId, Timestamp> offlineStamps = new HashMap<>(deviceCount);

        for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> deviceEntry
                : deviceDescs.entrySet()) {

            final DeviceId deviceId = deviceEntry.getKey();
            final Map<ProviderId, DeviceDescriptions> perProvider = deviceEntry.getValue();

            synchronized (perProvider) {
                // device offline timestamp, when there is one
                final Timestamp offlineAt = this.offline.get(deviceId);
                if (offlineAt != null) {
                    offlineStamps.put(deviceId, offlineAt);
                }

                for (Entry<ProviderId, DeviceDescriptions> providerEntry
                        : perProvider.entrySet()) {

                    final ProviderId providerId = providerEntry.getKey();
                    final DeviceDescriptions descriptions = providerEntry.getValue();

                    // one device fragment per (device, provider)
                    deviceStamps.put(new DeviceFragmentId(deviceId, providerId),
                                     descriptions.getDeviceDesc().timestamp());

                    // one port fragment per (device, provider, port)
                    for (Entry<PortNumber, Timestamped<PortDescription>> portEntry
                            : descriptions.getPortDescs().entrySet()) {
                        portStamps.put(
                                new PortFragmentId(deviceId, providerId, portEntry.getKey()),
                                portEntry.getValue().timestamp());
                    }
                }
            }
        }

        return new DeviceAntiEntropyAdvertisement(self, deviceStamps, portStamps, offlineStamps);
    }

    /**
     * Responds to anti-entropy advertisement message.
     * <P>
     * Walks every locally known device/provider/port fragment and compares its
     * timestamp with the one advertised by the sender:
     * fragments the sender lacks or holds an older copy of are pushed to the
     * sender with the regular replication messages; fragments for which the
     * local copy differs without being newer are collected as "to request".
     * Advertised fragments that are unknown locally are also collected.
     * If anything was collected, the local advertisement is sent back to the
     * sender (2-way anti-entropy) so that the sender can push what is missing.
     * <P>
     * Offline timestamps are reconciled per device as well.
     *
     * @param advertisement to respond to
     */
    private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {

        final NodeId sender = advertisement.sender();

        // Work on copies: entries are removed below as they are processed, so
        // whatever remains afterwards is unknown to the local store.
        Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
        Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
        Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());

        // Fragments to request
        Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
        Collection<PortFragmentId> reqPorts = new ArrayList<>();

        for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
            final DeviceId deviceId = de.getKey();
            final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();

            // the per-device description map doubles as the device lock
            synchronized (lDevice) {
                // latestTimestamp across provider
                // Note: can be null initially
                // (seeded with the local offline timestamp, if any)
                Timestamp localLatest = offline.get(deviceId);

                // handle device Ads
                for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
                    final ProviderId provId = prov.getKey();
                    final DeviceDescriptions lDeviceDescs = prov.getValue();

                    final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);


                    Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
                    Timestamp advDevTimestamp = devAds.get(devFragId);

                    if (advDevTimestamp == null || lProvDevice.isNewer(advDevTimestamp)) {
                        // remote does not have it or outdated, suggest
                        notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
                    } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
                        // local is outdated, request
                        reqDevices.add(devFragId);
                    }

                    // handle port Ads
                    for (Entry<PortNumber, Timestamped<PortDescription>>
                            pe : lDeviceDescs.getPortDescs().entrySet()) {

                        final PortNumber num = pe.getKey();
                        final Timestamped<PortDescription> lPort = pe.getValue();

                        final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);

                        Timestamp advPortTimestamp = portAds.get(portFragId);
                        if (advPortTimestamp == null || lPort.isNewer(advPortTimestamp)) {
                            // remote does not have it or outdated, suggest
                            notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
                        } else if (!lPort.timestamp().equals(advPortTimestamp)) {
                            // local is outdated, request
                            log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
                            reqPorts.add(portFragId);
                        }

                        // remove port Ad already processed
                        portAds.remove(portFragId);
                    } // end local port loop

                    // remove device Ad already processed
                    devAds.remove(devFragId);

                    // find latest and update
                    final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
                    if (localLatest == null ||
                        providerLatest.compareTo(localLatest) > 0) {
                        localLatest = providerLatest;
                    }
                } // end local provider loop

                // checking if remote timestamp is more recent.
                // NOTE(review): localLatest is still null here when the device
                // has neither a local offline timestamp nor any provider
                // entry; compareTo is then called with null - confirm that
                // Timestamp.compareTo tolerates this or that the state
                // cannot occur.
                Timestamp rOffline = offlineAds.get(deviceId);
                if (rOffline != null &&
                    rOffline.compareTo(localLatest) > 0) {
                    // remote offline timestamp suggests that the
                    // device is off-line
                    markOfflineInternal(deviceId, rOffline);
                }

                // re-read: the local offline timestamp is looked up again
                // after the markOfflineInternal call above
                Timestamp lOffline = offline.get(deviceId);
                if (lOffline != null && rOffline == null) {
                    // locally offline, but remote is online, suggest offline
                    notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
                }

                // remove device offline Ad already processed
                offlineAds.remove(deviceId);
            } // end device lock
        } // end local device loop

        // If there is any Ads left, request them
        log.trace("Ads left {}, {}", devAds, portAds);
        reqDevices.addAll(devAds.keySet());
        reqPorts.addAll(portAds.keySet());

        if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
            log.trace("Nothing to request to remote peer {}", sender);
            return;
        }

        log.info("Need to sync {} {}", reqDevices, reqPorts);

        // 2-way Anti-Entropy for now
        // (the collected request lists are only logged; the reply is a full
        // local advertisement, which lets the sender push what is missing)
        try {
            unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
        } catch (IOException e) {
            log.error("Failed to send response advertisement to " + sender, e);
        }

// Sketch of 3-way Anti-Entropy
//        DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
//        ClusterMessage message = new ClusterMessage(
//                clusterService.getLocalNode().id(),
//                GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
//                SERIALIZER.encode(request));
//
//        try {
//            clusterCommunicator.unicast(message, advertisement.sender());
//        } catch (IOException e) {
//            log.error("Failed to send advertisement reply to "
//                      + advertisement.sender(), e);
//        }
    }

    // Forwards the event to the delegate; a null event is silently skipped.
    private void notifyDelegateIfNotNull(DeviceEvent event) {
        if (event == null) {
            return;
        }
        notifyDelegate(event);
    }

    /**
     * Periodic task that sends the local anti-entropy advertisement to one
     * randomly chosen peer.
     * <p>
     * Every exception is caught and logged so that a failure does not cancel
     * the task's future executions.
     */
    private final class SendAdvertisementTask implements Runnable {

        @Override
        public void run() {
            if (Thread.currentThread().isInterrupted()) {
                log.info("Interrupted, quitting");
                return;
            }

            try {
                final NodeId self = clusterService.getLocalNode().id();
                Set<ControllerNode> nodes = clusterService.getNodes();

                ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
                        .transform(toNodeId())
                        .toList();

                // Candidate recipients: every known node except ourselves.
                // Choosing from this list (instead of re-drawing until the
                // draw is not "self") also covers an empty node list, which
                // previously ended in an index error.
                final List<NodeId> peers = new ArrayList<>(nodeIds.size());
                for (NodeId id : nodeIds) {
                    if (!id.equals(self)) {
                        peers.add(id);
                    }
                }

                if (peers.isEmpty()) {
                    log.debug("No other peers in the cluster.");
                    return;
                }

                // upper bound of nextInt is exclusive
                final NodeId peer = peers.get(RandomUtils.nextInt(0, peers.size()));

                DeviceAntiEntropyAdvertisement ad = createAdvertisement();

                // building the advertisement may take a while; re-check
                if (Thread.currentThread().isInterrupted()) {
                    log.info("Interrupted, quitting");
                    return;
                }

                try {
                    unicastMessage(peer, DEVICE_ADVERTISE, ad);
                } catch (IOException e) {
                    log.debug("Failed to send anti-entropy advertisement to {}", peer);
                    return;
                }
            } catch (Exception e) {
                // catch all Exception to avoid Scheduled task being suppressed.
                log.error("Exception thrown while sending advertisement", e);
            }
        }
    }

    // Handles device update events received from peers: applies the update
    // locally and notifies the delegate if that produced an event.
    private class InternalDeviceEventListener implements ClusterMessageHandler {
        @Override
        public void handle(ClusterMessage message) {

            log.info("Received device update event from peer: {}", message.sender());

            final InternalDeviceEvent update =
                    (InternalDeviceEvent) SERIALIZER.decode(message.payload());

            final DeviceEvent outcome = createOrUpdateDeviceInternal(
                    update.providerId(),
                    update.deviceId(),
                    update.deviceDescription());
            notifyDelegateIfNotNull(outcome);
        }
    }

    // Handles device offline events received from peers: marks the device
    // offline locally and notifies the delegate if that produced an event.
    private class InternalDeviceOfflineEventListener implements ClusterMessageHandler {
        @Override
        public void handle(ClusterMessage message) {

            log.info("Received device offline event from peer: {}", message.sender());

            final InternalDeviceOfflineEvent offlineNotice =
                    (InternalDeviceOfflineEvent) SERIALIZER.decode(message.payload());

            final DeviceEvent outcome = markOfflineInternal(
                    offlineNotice.deviceId(),
                    offlineNotice.timestamp());
            notifyDelegateIfNotNull(outcome);
        }
    }

    // Handles device removed events received from peers: removes the device
    // locally and notifies the delegate if that produced an event.
    private class InternalDeviceRemovedEventListener implements ClusterMessageHandler {
        @Override
        public void handle(ClusterMessage message) {

            log.info("Received device removed event from peer: {}", message.sender());

            final InternalDeviceRemovedEvent removal =
                    (InternalDeviceRemovedEvent) SERIALIZER.decode(message.payload());

            final DeviceEvent outcome = removeDeviceInternal(
                    removal.deviceId(),
                    removal.timestamp());
            notifyDelegateIfNotNull(outcome);
        }
    }

    // Handles port list update events received from peers: applies the
    // update locally and hands the result to the delegate.
    private class InternalPortEventListener implements ClusterMessageHandler {
        @Override
        public void handle(ClusterMessage message) {

            log.info("Received port update event from peer: {}", message.sender());

            final InternalPortEvent update =
                    (InternalPortEvent) SERIALIZER.decode(message.payload());

            notifyDelegate(updatePortsInternal(
                    update.providerId(),
                    update.deviceId(),
                    update.portDescriptions()));
        }
    }

    // Handles single port status events received from peers: applies the
    // change locally and notifies the delegate if that produced an event.
    private class InternalPortStatusEventListener implements ClusterMessageHandler {
        @Override
        public void handle(ClusterMessage message) {

            log.info("Received port status update event from peer: {}", message.sender());

            final InternalPortStatusEvent statusChange =
                    (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
            log.info("{}", statusChange);

            final DeviceEvent outcome = updatePortStatusInternal(
                    statusChange.providerId(),
                    statusChange.deviceId(),
                    statusChange.portDescription());
            notifyDelegateIfNotNull(outcome);
        }
    }

    // Handles anti-entropy advertisements received from peers by decoding
    // them and passing them to handleAdvertisement.
    private final class InternalDeviceAdvertisementListener
        implements ClusterMessageHandler {

        @Override
        public void handle(ClusterMessage message) {
            log.debug("Received Device Anti-Entropy advertisement from peer: {}", message.sender());

            // typed local kept so the decoded type is fixed by the assignment
            final DeviceAntiEntropyAdvertisement received = SERIALIZER.decode(message.payload());
            handleAdvertisement(received);
        }
    }
}
