/*
 * Copyright 2015-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.meter.impl;

import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang.math.RandomUtils;
import org.onlab.util.KryoNamespace;
import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.behaviour.MeterQuery;
import org.onosproject.net.driver.DriverHandler;
import org.onosproject.net.driver.DriverService;
import org.onosproject.net.meter.Band;
import org.onosproject.net.meter.DefaultBand;
import org.onosproject.net.meter.DefaultMeter;
import org.onosproject.net.meter.DefaultMeterFeatures;
import org.onosproject.net.meter.Meter;
import org.onosproject.net.meter.MeterCellId;
import org.onosproject.net.meter.MeterEvent;
import org.onosproject.net.meter.MeterFailReason;
import org.onosproject.net.meter.MeterFeatures;
import org.onosproject.net.meter.MeterFeaturesFlag;
import org.onosproject.net.meter.MeterId;
import org.onosproject.net.meter.MeterKey;
import org.onosproject.net.meter.MeterOperation;
import org.onosproject.net.meter.MeterScope;
import org.onosproject.net.meter.MeterState;
import org.onosproject.net.meter.MeterStore;
import org.onosproject.net.meter.MeterStoreDelegate;
import org.onosproject.net.meter.MeterStoreResult;
import org.onosproject.net.meter.MeterTableKey;
import org.onosproject.net.pi.model.PiMeterId;
import org.onosproject.net.pi.runtime.PiMeterCellId;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.primitives.DefaultDistributedSet;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AtomicCounterMap;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.DistributedPrimitive;
import org.onosproject.store.service.DistributedSet;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageException;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.onosproject.store.service.WallClockTimestamp;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;

import java.util.Collection;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkArgument;
import static org.onosproject.store.meter.impl.DistributedMeterStore.ReuseStrategy.FIRST_FIT;
import static org.onosproject.net.meter.MeterFailReason.TIMEOUT;
import static org.onosproject.net.meter.MeterCellId.MeterCellType.INDEX;
import static org.onosproject.net.meter.MeterCellId.MeterCellType.PIPELINE_INDEPENDENT;
import static org.onosproject.net.meter.MeterStoreResult.Type.FAIL;
import static org.slf4j.LoggerFactory.getLogger;

/**
 * A distributed meter store implementation. Meters are stored consistently
 * across the cluster.
 *
 * <p>Meters are kept in a {@link ConsistentMap}, meter features in an
 * {@link EventuallyConsistentMap}. Meter ids that can be reused are tracked,
 * per meter table, in {@link DistributedSet}s while fresh ids are generated
 * through an {@link AtomicCounterMap}.
 */
@Component(immediate = true, service = MeterStore.class)
public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
                    implements MeterStore {

    private Logger log = getLogger(getClass());

    // Placeholder logged when a storage exception carries no stack trace
    private static final String UNKNOWN_METHOD = "unknown";

    // Meters map related objects
    private static final String METERSTORE = "onos-meter-store";
    private ConsistentMap<MeterKey, MeterData> meters;
    private MapEventListener<MeterKey, MeterData> metersMapListener = new InternalMetersMapEventListener();
    // Java map view of the meters consistent map
    private Map<MeterKey, MeterData> metersMap;

    // Meters features related objects
    private static final String METERFEATURESSTORE = "onos-meter-features-store";
    private EventuallyConsistentMap<MeterTableKey, MeterFeatures> metersFeatures;
    private EventuallyConsistentMapListener<MeterTableKey, MeterFeatures> featuresMapListener =
        new InternalFeaturesMapEventListener();

    // Meters id related objects
    private static final String AVAILABLEMETERIDSTORE = "onos-meters-available-store";
    // Reusable (previously freed) meter keys, indexed by meter table
    protected ConcurrentMap<MeterTableKey, DistributedSet<MeterKey>> availableMeterIds;
    private static final String METERIDSTORE = "onos-meters-id-store";
    // Per meter table counters used to generate new meter indexes
    private AtomicCounterMap<MeterTableKey> meterIdGenerators;

    private static final KryoNamespace.Builder APP_KRYO_BUILDER = KryoNamespace.newBuilder()
            .register(KryoNamespaces.API)
            .register(MeterKey.class)
            .register(MeterData.class)
            .register(DefaultMeter.class)
            .register(DefaultBand.class)
            .register(Band.Type.class)
            .register(MeterState.class)
            .register(Meter.Unit.class)
            .register(MeterFailReason.class)
            .register(MeterTableKey.class)
            .register(MeterFeatures.class)
            .register(DefaultMeterFeatures.class)
            .register(MeterFeaturesFlag.class)
            .register(MeterScope.class);
    private Serializer serializer = Serializer.using(Lists.newArrayList(APP_KRYO_BUILDER.build()));

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    private StorageService storageService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected DriverService driverService;

    // Local cache to handle async ops through futures.
    private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
            Maps.newConcurrentMap();

    // Control the user defined index mode for the store.
    protected boolean userDefinedIndexMode = false;

    /**
     * Defines possible selection strategies to reuse meter ids.
     */
    enum ReuseStrategy {
        /**
         * Select randomly an available id.
         */
        RANDOM,
        /**
         * Select the first one.
         */
        FIRST_FIT
    }
    private ReuseStrategy reuseStrategy = FIRST_FIT;

    /**
     * Builds the distributed primitives backing the store and registers
     * the map listeners.
     */
    @Activate
    public void activate() {
        meters = storageService.<MeterKey, MeterData>consistentMapBuilder()
                    .withName(METERSTORE)
                    .withSerializer(serializer).build();
        meters.addListener(metersMapListener);
        metersMap = meters.asJavaMap();

        metersFeatures = storageService.<MeterTableKey, MeterFeatures>eventuallyConsistentMapBuilder()
                .withName(METERFEATURESSTORE)
                .withTimestampProvider((key, features) -> new WallClockTimestamp())
                .withSerializer(APP_KRYO_BUILDER).build();
        metersFeatures.addListener(featuresMapListener);

        availableMeterIds = new ConcurrentHashMap<>();

        meterIdGenerators = storageService.<MeterTableKey>atomicCounterMapBuilder()
                .withName(METERIDSTORE)
                .withSerializer(Serializer.using(KryoNamespaces.API,
                                                 MeterTableKey.class,
                                                 MeterScope.class)).build();

        log.info("Started");
    }

    /**
     * Unregisters the map listeners and destroys the meters map, the
     * features map and every available ids set.
     */
    @Deactivate
    public void deactivate() {
        meters.removeListener(metersMapListener);
        metersFeatures.removeListener(featuresMapListener);
        meters.destroy();
        metersFeatures.destroy();
        availableMeterIds.forEach((key, set) -> set.destroy());

        log.info("Stopped");
    }

    /**
     * Stores the given meter, replacing any value already present for its key.
     * The returned future is registered in the local cache and it is completed
     * by the meters map listener; on storage failure it is completed
     * exceptionally right away.
     *
     * @param meter the meter to add or update
     * @return a future carrying the result of the operation
     * @throws IllegalArgumentException if the meter index is not valid
     */
    @Override
    public CompletableFuture<MeterStoreResult> addOrUpdateMeter(Meter meter) {
        checkArgument(validIndex(meter), "Meter index is not valid");
        CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
        MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
        MeterData data = new MeterData(meter, null);
        futures.put(key, future);
        try {
            meters.compute(key, (k, v) -> data);
        } catch (StorageException e) {
            logStorageException(e);
            futures.remove(key);
            future.completeExceptionally(e);
        }
        return future;
    }

    /**
     * Moves the stored meter to the state carried by the given meter.
     * If the meter is not in the store the returned future completes
     * immediately with success; on storage failure it is completed
     * exceptionally.
     *
     * @param meter the meter to delete
     * @return a future carrying the result of the operation
     */
    @Override
    public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
        CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
        MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
        futures.put(key, future);
        // Update the state of the meter. It will be pruned by observing
        // that it has been removed from the dataplane.
        try {
            Versioned<MeterData> versionedData = meters.computeIfPresent(key, (k, v) -> {
                DefaultMeter m = (DefaultMeter) v.meter();
                // Removal already requested, leave the stored value untouched
                if (m.state() == MeterState.PENDING_REMOVE) {
                    return v;
                }
                m.setState(meter.state());
                // Carry over the previous failure reason, if any
                return new MeterData(m, v.reason().orElse(null));
            });
            // If it does not exist in the system, completes immediately
            if (versionedData == null) {
                futures.remove(key);
                future.complete(MeterStoreResult.success());
            }
        } catch (StorageException e) {
            logStorageException(e);
            futures.remove(key);
            future.completeExceptionally(e);
        }
        return future;
    }

    /**
     * Stores the given meter features using device id and scope as key.
     *
     * @param meterfeatures the meter features to store
     * @return success, or a {@code TIMEOUT} failure if the storage operation fails
     */
    @Override
    public MeterStoreResult storeMeterFeatures(MeterFeatures meterfeatures) {
        // Store meter features, this is done once for each features of every device
        MeterStoreResult result = MeterStoreResult.success();
        MeterTableKey key = MeterTableKey.key(meterfeatures.deviceId(), meterfeatures.scope());
        try {
            metersFeatures.put(key, meterfeatures);
        } catch (StorageException e) {
            logStorageException(e);
            result = MeterStoreResult.fail(TIMEOUT);
        }
        return result;
    }

    /**
     * Stores a collection of meter features.
     *
     * @param meterfeatures the meter features to store
     * @return success if every store operation succeeded, a {@code TIMEOUT}
     *         failure otherwise
     */
    @Override
    public MeterStoreResult storeMeterFeatures(Collection<MeterFeatures> meterfeatures) {
        // These store operations are treated as one single operation.
        // If one of them fails, a failure is returned,
        // but the failed operation does not block the remaining ones.
        MeterStoreResult result = MeterStoreResult.success();
        for (MeterFeatures mf : meterfeatures) {
            if (storeMeterFeatures(mf).type() == FAIL) {
                result = MeterStoreResult.fail(TIMEOUT);
            }
        }
        return result;
    }

    /**
     * Removes all the meter features stored for the given device.
     *
     * @param deviceId the device id
     * @return success, or a {@code TIMEOUT} failure if the storage operation fails
     */
    @Override
    public MeterStoreResult deleteMeterFeatures(DeviceId deviceId) {
        MeterStoreResult result = MeterStoreResult.success();
        try {
            // Collect the keys first, then remove them from the map
            Set<MeterTableKey> keys = metersFeatures.keySet().stream()
                                        .filter(key -> key.deviceId().equals(deviceId))
                                        .collect(Collectors.toUnmodifiableSet());
            keys.forEach(k -> metersFeatures.remove(k));
        } catch (StorageException e) {
            logStorageException(e);
            result = MeterStoreResult.fail(TIMEOUT);
        }
        return result;
    }

    /**
     * Removes the given meter features from the store.
     *
     * @param meterfeatures the meter features to remove
     * @return success if every remove operation succeeded, a {@code TIMEOUT}
     *         failure otherwise
     */
    @Override
    public MeterStoreResult deleteMeterFeatures(Collection<MeterFeatures> meterfeatures) {
        // Same logic of storeMeterFeatures: a failure is reported
        // but it does not block the remaining operations
        MeterStoreResult result = MeterStoreResult.success();
        for (MeterFeatures mf : meterfeatures) {
            try {
                MeterTableKey key = MeterTableKey.key(mf.deviceId(), mf.scope());
                metersFeatures.remove(key);
            } catch (StorageException e) {
                logStorageException(e);
                result = MeterStoreResult.fail(TIMEOUT);
            }
        }

        return result;
    }

    /**
     * Refreshes the stored meter with the counters, life and reference count
     * of the given meter. The state is updated only if the stored meter is
     * still {@code PENDING_ADD}. Any previous failure reason is dropped.
     *
     * @param meter the meter carrying the updated values
     * @return the updated meter, or null if the meter is not in the store
     */
    @Override
    public Meter updateMeterState(Meter meter) {
        // Update meter if present (stats workflow)
        MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
        Versioned<MeterData> value = meters.computeIfPresent(key, (k, v) -> {
            DefaultMeter m = (DefaultMeter) v.meter();
            if (m.state() == MeterState.PENDING_ADD) {
                m.setState(meter.state());
            }
            m.setProcessedPackets(meter.packetsSeen());
            m.setProcessedBytes(meter.bytesSeen());
            m.setLife(meter.life());
            // TODO: Prune if drops to zero.
            m.setReferenceCount(meter.referenceCount());
            return new MeterData(m, null);
        });
        return value != null ? value.value().meter() : null;
    }

    /**
     * Returns the meter stored under the given key.
     *
     * @param key the meter key
     * @return the meter, or null if there is no meter for the key
     */
    @Override
    public Meter getMeter(MeterKey key) {
        MeterData data = Versioned.valueOrElse(meters.get(key), null);
        return data == null ? null : data.meter();
    }

    /**
     * Returns all the meters in the store.
     *
     * @return a view over a snapshot of the stored meters
     */
    @Override
    public Collection<Meter> getAllMeters() {
        return Collections2.transform(ImmutableSet.copyOf(metersMap.values()),
                                      MeterData::meter);
    }

    /**
     * Returns all the meters stored for the given device.
     *
     * @param deviceId the device id
     * @return a view over a snapshot of the meters of the device
     */
    @Override
    public Collection<Meter> getAllMeters(DeviceId deviceId) {
        return Collections2.transform(
                Collections2.filter(ImmutableSet.copyOf(metersMap.values()),
                        (MeterData m) -> m.meter().deviceId().equals(deviceId)),
                MeterData::meter);
    }

    /**
     * Returns all the meters stored for the given device within the given scope.
     * Global scope selects the {@code INDEX} meters; any other scope selects
     * the {@code PIPELINE_INDEPENDENT} meters whose meter id matches the scope id.
     *
     * @param deviceId the device id
     * @param scope the meter scope
     * @return a view over a snapshot of the meters of the device in the scope
     */
    @Override
    public Collection<Meter> getAllMeters(DeviceId deviceId, MeterScope scope) {
        if (scope.equals(MeterScope.globalScope())) {
            return Collections2.transform(
                    Collections2.filter(ImmutableSet.copyOf(metersMap.values()),
                            (MeterData m) -> m.meter().deviceId().equals(deviceId) &&
                                    m.meter().meterCellId().type() == INDEX),
                    MeterData::meter);
        }
        return Collections2.transform(
                Collections2.filter(ImmutableSet.copyOf(metersMap.values()),
                        (MeterData m) -> m.meter().deviceId().equals(deviceId) &&
                                m.meter().meterCellId().type() == PIPELINE_INDEPENDENT &&
                                ((PiMeterCellId) m.meter().meterCellId()).meterId().id().equals(scope.id())),
                MeterData::meter);
    }

    /**
     * Records the failure reason on the stored meter targeted by the
     * given operation, if the meter is present.
     *
     * @param op the failed meter operation
     * @param reason the failure reason
     */
    @Override
    public void failedMeter(MeterOperation op, MeterFailReason reason) {
        // Meter ops failed (got notification from the sb)
        MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().meterCellId());
        meters.computeIfPresent(key, (k, v) -> new MeterData(v.meter(), reason));
    }

    /**
     * Removes the given meter from the store and, if it was present,
     * frees its id. Storage failures are logged and not propagated.
     *
     * @param m the meter to purge
     */
    @Override
    public void purgeMeter(Meter m) {
        // Once we receive the ack from the sb, create the key
        // remove definitely the meter and free the id
        MeterKey key = MeterKey.key(m.deviceId(), m.meterCellId());
        try {
            if (Versioned.valueOrNull(meters.remove(key)) != null) {
                MeterScope scope;
                if (m.meterCellId().type() == PIPELINE_INDEPENDENT) {
                    PiMeterCellId piMeterCellId = (PiMeterCellId) m.meterCellId();
                    scope = MeterScope.of(piMeterCellId.meterId().id());
                } else {
                    scope = MeterScope.globalScope();
                }
                MeterTableKey meterTableKey = MeterTableKey.key(m.deviceId(), scope);
                freeMeterId(meterTableKey, m.meterCellId());
            }
        } catch (StorageException e) {
            logStorageException(e);
        }
    }

    /**
     * Purges all the meters stored for the given device.
     *
     * @param deviceId the device id
     */
    @Override
    public void purgeMeters(DeviceId deviceId) {
        // Take a snapshot of the matching entries before removing them
        List<Versioned<MeterData>> metersPendingRemove = meters.stream()
                .filter(e -> Objects.equals(e.getKey().deviceId(), deviceId))
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
        metersPendingRemove.forEach(versionedData
                -> purgeMeter(versionedData.value().meter()));
    }

    /**
     * Purges all the meters stored for the given device and
     * owned by the given application.
     *
     * @param deviceId the device id
     * @param appId the application id
     */
    @Override
    public void purgeMeters(DeviceId deviceId, ApplicationId appId) {
        // Take a snapshot of the matching entries before removing them
        List<Versioned<MeterData>> metersPendingRemove = meters.stream()
                .filter(e -> Objects.equals(e.getKey().deviceId(), deviceId) &&
                        e.getValue().value().meter().appId().equals(appId))
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
        metersPendingRemove.forEach(versionedData
                -> purgeMeter(versionedData.value().meter()));
    }

    /**
     * Enables or disables the user defined index mode. The mode can be
     * changed only while both the meters map and the id generators are empty.
     *
     * @param enable true to enable the mode, false to disable it
     * @return the mode in use after the call
     */
    @Override
    public boolean userDefinedIndexMode(boolean enable) {
        if (meters.isEmpty() && meterIdGenerators.isEmpty()) {
            userDefinedIndexMode = enable;
        } else {
            log.warn("Unable to {} user defined index mode as store did " +
                    "already some allocations", enable ? "activate" : "deactivate");
        }
        return userDefinedIndexMode;
    }

    /**
     * Returns the max number of meters advertised by the features
     * stored for the given meter table.
     *
     * @param key the meter table key
     * @return the max number of meters, 0 if there are no features for the key
     */
    protected long getMaxMeters(MeterTableKey key) {
        MeterFeatures features = metersFeatures.get(key);
        return features == null ? 0L : features.maxMeter();
    }

    // Validate index using the meter features, useful mainly
    // when user defined index mode is enabled
    private boolean validIndex(Meter meter) {
        long index;
        MeterTableKey key;

        if (meter.meterCellId().type() == PIPELINE_INDEPENDENT) {
            PiMeterCellId piMeterCellId = (PiMeterCellId) meter.meterCellId();
            index = piMeterCellId.index();
            key = MeterTableKey.key(meter.deviceId(), MeterScope.of(piMeterCellId.meterId().id()));
        } else if (meter.meterCellId().type() == INDEX) {
            MeterId meterId = (MeterId) meter.meterCellId();
            index = meterId.id();
            key = MeterTableKey.key(meter.deviceId(), MeterScope.globalScope());
        } else {
            log.warn("Unable to validate index unsupported cell type {}", meter.meterCellId().type());
            return false;
        }

        // Without features both bounds are -1; the range check below then
        // fails for every non negative index
        MeterFeatures features = metersFeatures.get(key);
        long startIndex = features == null ? -1L : features.startIndex();
        long endIndex = features == null ? -1L : features.endIndex();
        return index >= startIndex && index <= endIndex;
    }

    // Returns the start index of the given meter table, -1 if there are no features
    private long getStartIndex(MeterTableKey key) {
        MeterFeatures features = metersFeatures.get(key);
        return features == null ? -1L : features.startIndex();
    }

    // Returns the end index of the given meter table, -1 if there are no features
    private long getEndIndex(MeterTableKey key) {
        MeterFeatures features = metersFeatures.get(key);
        return features == null ? -1L : features.endIndex();
    }

    // queryMaxMeters is implemented in MeterQuery behaviour implementations.
    // Returns 0 if the device has no handler or does not support MeterQuery.
    private long queryMaxMeters(DeviceId device) {
        DriverHandler handler = driverService.createHandler(device);
        if (handler == null || !handler.hasBehaviour(MeterQuery.class)) {
            return 0L;
        }

        // FIXME architecturally this is not right, we should fallback to this
        //  behavior in the providers. Once we do that we can remove this code.
        MeterQuery query = handler.behaviour(MeterQuery.class);
        // This results to be necessary because the available ids sets are created
        // in the meter features map listener if the device does not provide the meter
        // feature this is the only chance to create this set.
        String setName = AVAILABLEMETERIDSTORE + "-" + device + "global";
        MeterTableKey meterTableKey = MeterTableKey.key(device, MeterScope.globalScope());
        insertAvailableKeySet(meterTableKey, setName);

        return query.getMaxMeters();
    }

    // Adds (available = true) or removes (available = false) the meter key built
    // from the given id to/from the reusable set of the meter table. Returns
    // the outcome of the set operation, false if there is no set for the table.
    private boolean updateMeterIdAvailability(MeterTableKey meterTableKey, MeterCellId id,
                                              boolean available) {
        DistributedSet<MeterKey> keySet = availableMeterIds.get(meterTableKey);
        if (keySet == null) {
            log.warn("Reusable Key set for device: {} scope: {} not found",
                meterTableKey.deviceId(), meterTableKey.scope());
            return false;
        }

        // According to available, make available or unavailable a meter key
        DeviceId deviceId = meterTableKey.deviceId();
        return available ? keySet.add(MeterKey.key(deviceId, id)) :
                keySet.remove(MeterKey.key(deviceId, id));
    }

    // Picks a candidate among the available ids according to the
    // reuse strategy, null if there are no candidates
    private MeterCellId getNextAvailableId(Set<MeterCellId> availableIds) {
        if (availableIds.isEmpty()) {
            return null;
        }

        if (reuseStrategy == FIRST_FIT || availableIds.size() == 1) {
            return availableIds.iterator().next();
        }

        // If it is random, get the size and return a random element
        int size = availableIds.size();
        return Iterables.get(availableIds, RandomUtils.nextInt(size));
    }

    // Implements reuse strategy of the meter cell ids
    private MeterCellId firstReusableMeterId(MeterTableKey meterTableKey) {
        DistributedSet<MeterKey> keySet = availableMeterIds.get(meterTableKey);
        if (keySet == null) {
            log.warn("Reusable Key set for device: {} scope: {} not found",
                meterTableKey.deviceId(), meterTableKey.scope());
            return null;
        }

        // Work on a local copy of the candidates of this device
        Set<MeterCellId> localAvailableMeterIds = keySet.stream()
                .filter(meterKey ->
                    meterKey.deviceId().equals(meterTableKey.deviceId()))
                .map(MeterKey::meterCellId)
                .collect(Collectors.toSet());
        MeterCellId meterId = getNextAvailableId(localAvailableMeterIds);
        while (meterId != null) {
            // The id is ours only if we are the ones removing it from the shared set
            if (updateMeterIdAvailability(meterTableKey, meterId, false)) {
                return meterId;
            }
            localAvailableMeterIds.remove(meterId);
            meterId = getNextAvailableId(localAvailableMeterIds);
        }
        // there are no available ids that can be reused
        return null;
    }

    /**
     * Allocates a meter cell id for the given device and scope. Freed ids are
     * reused first; otherwise a new index is generated within the range given
     * by the meter features, falling back to the {@link MeterQuery} behaviour
     * of the device when there are no features.
     *
     * @param deviceId the device id
     * @param meterScope the meter scope
     * @return a {@link MeterId} for the global scope, a {@link PiMeterCellId}
     *         otherwise; null if user defined index mode is enabled or no id
     *         can be allocated
     */
    @Override
    public MeterCellId allocateMeterId(DeviceId deviceId, MeterScope meterScope) {
        if (userDefinedIndexMode) {
            log.warn("Unable to allocate meter id when user defined index mode is enabled");
            return null;
        }
        MeterTableKey meterTableKey = MeterTableKey.key(deviceId, meterScope);
        MeterCellId meterCellId;
        long id;
        // First, search for reusable key
        meterCellId = firstReusableMeterId(meterTableKey);
        if (meterCellId != null) {
            return meterCellId;
        }
        // If there was no reusable meter id we have to generate a new value
        // using start and end index as lower and upper bound respectively.
        long startIndex = getStartIndex(meterTableKey);
        long endIndex = getEndIndex(meterTableKey);
        // If the device does not give us MeterFeatures fallback to queryMeters
        if (startIndex == -1L || endIndex == -1L) {
            // Only meaningful for OpenFlow today
            long maxMeters = queryMaxMeters(deviceId);
            if (maxMeters == 0L) {
                return null;
            } else {
                // OpenFlow meter index starts from 1, ends with max
                startIndex = 1L;
                endIndex = maxMeters;
            }
        }

        // Skip the values below the lower bound
        do {
            id = meterIdGenerators.getAndIncrement(meterTableKey);
        } while (id < startIndex);
        if (id > endIndex) {
            return null;
        }

        // For backward compatibility if we are using global scope,
        // return a MeterId, otherwise we create a PiMeterCellId
        if (meterScope.isGlobal()) {
            return MeterId.meterId(id);
        } else {
            return PiMeterCellId.ofIndirect(PiMeterId.of(meterScope.id()), id);
        }

    }

    /**
     * Frees the given meter id in the global scope of the device.
     *
     * @param deviceId the device id
     * @param meterId the meter id to free
     */
    @Override
    public void freeMeterId(DeviceId deviceId, MeterId meterId) {
        MeterTableKey meterTableKey = MeterTableKey.key(deviceId, MeterScope.globalScope());
        freeMeterId(meterTableKey, meterId);
    }

    /**
     * Frees the given meter cell id, deriving the scope from the cell type.
     * Unsupported cell types are logged and ignored.
     *
     * @param deviceId the device id
     * @param meterCellId the meter cell id to free
     */
    protected void freeMeterId(DeviceId deviceId, MeterCellId meterCellId) {
        MeterTableKey meterTableKey;
        if (meterCellId.type() == PIPELINE_INDEPENDENT) {
            meterTableKey = MeterTableKey.key(deviceId,
                    MeterScope.of(((PiMeterCellId) meterCellId).meterId().id()));
        } else if (meterCellId.type() == INDEX) {
            meterTableKey = MeterTableKey.key(deviceId, MeterScope.globalScope());
        } else {
            log.warn("Unable to free meter id unsupported cell type {}", meterCellId.type());
            return;
        }
        freeMeterId(meterTableKey, meterCellId);
    }

    /**
     * Makes the given meter cell id reusable for the given meter table.
     * Nothing is done when user defined index mode is enabled, when the cell
     * type is not supported or when the index has not been generated yet.
     *
     * @param meterTableKey the meter table key
     * @param meterCellId the meter cell id to free
     */
    protected void freeMeterId(MeterTableKey meterTableKey, MeterCellId meterCellId) {
        if (userDefinedIndexMode) {
            log.debug("Unable to free meter id when user defined index mode is enabled");
            return;
        }
        long index;
        if (meterCellId.type() == PIPELINE_INDEPENDENT) {
            PiMeterCellId piMeterCellId = (PiMeterCellId) meterCellId;
            index = piMeterCellId.index();
        } else if (meterCellId.type() == INDEX) {
            MeterId meterId = (MeterId) meterCellId;
            index = meterId.id();
        } else {
            log.warn("Unable to free meter id unsupported cell type {}", meterCellId.type());
            return;
        }
        // Avoid to free meter not allocated
        if (meterIdGenerators.get(meterTableKey) <= index) {
            return;
        }
        updateMeterIdAvailability(meterTableKey, meterCellId, true);
    }

    // Logs a storage exception along with the name of the method on top of its
    // stack trace. A throwable may carry an empty stack trace, guard the access.
    private void logStorageException(StorageException e) {
        StackTraceElement[] stackTrace = e.getStackTrace();
        String methodName = stackTrace.length > 0 ? stackTrace[0].getMethodName() : UNKNOWN_METHOD;
        log.error("{} thrown a storage exception: {}", methodName, e.getMessage(), e);
    }

    // Completes with the given result the future cached for the key, if any,
    // and drops it from the cache (the remapping function returns null)
    private void completeFuture(MeterKey key, MeterStoreResult result) {
        futures.computeIfPresent(key, (k, v) -> {
            v.complete(result);
            return null;
        });
    }

    // Enabling the events distribution across the cluster
    private class InternalMetersMapEventListener implements MapEventListener<MeterKey, MeterData> {
        @Override
        public void event(MapEvent<MeterKey, MeterData> event) {
            MeterKey key = event.key();
            // On removal the meter is carried by the old value
            Versioned<MeterData> value = event.type() == MapEvent.Type.REMOVE ? event.oldValue() : event.newValue();
            MeterData data = value.value();
            MeterData oldData = Versioned.valueOrNull(event.oldValue());
            switch (event.type()) {
                case INSERT:
                case UPDATE:
                    switch (data.meter().state()) {
                        case PENDING_ADD:
                        case PENDING_REMOVE:
                            // Two cases. If there is a reason, the meter operation failed.
                            // Otherwise, we are ready to install/remove through the delegate.
                            if (data.reason().isEmpty()) {
                                notifyDelegate(new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
                                    MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ, data.meter()));
                            } else {
                                completeFuture(key, MeterStoreResult.fail(data.reason().get()));
                            }
                            break;
                        case ADDED:
                            // Transition from pending to installed
                            if (oldData != null && oldData.meter().state() == MeterState.PENDING_ADD) {
                                completeFuture(key, MeterStoreResult.success());
                                notifyDelegate(new MeterEvent(MeterEvent.Type.METER_ADDED, data.meter()));
                            // Update stats case - we report reference count zero only for INDEX based meters
                            } else if (data.meter().referenceCount() == 0 &&
                                    data.meter().meterCellId().type() == INDEX) {
                                notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REFERENCE_COUNT_ZERO,
                                        data.meter()));
                            }
                            break;
                        default:
                            log.warn("Unknown meter state type {}", data.meter().state());
                    }
                    break;
                case REMOVE:
                    completeFuture(key, MeterStoreResult.success());
                    notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REMOVED, data.meter()));
                    break;
                default:
                    log.warn("Unknown Map event type {}", event.type());
            }
        }
    }

    // Keeps the available ids sets aligned with the meter features map
    private class InternalFeaturesMapEventListener implements
        EventuallyConsistentMapListener<MeterTableKey, MeterFeatures> {
        @Override
        public void event(EventuallyConsistentMapEvent<MeterTableKey, MeterFeatures> event) {
            MeterTableKey meterTableKey = event.key();
            MeterFeatures meterFeatures = event.value();
            switch (event.type()) {
                case PUT:
                    // New features, create the set of the reusable ids for the table
                    String setName = AVAILABLEMETERIDSTORE + "-" +
                        meterFeatures.deviceId() + meterFeatures.scope().id();
                    insertAvailableKeySet(meterTableKey, setName);
                    break;
                case REMOVE:
                    // Features removed, drop and destroy the related set
                    DistributedSet<MeterKey> set = availableMeterIds.remove(meterTableKey);
                    if (set != null) {
                        set.destroy();
                    }
                    break;
                default:
                    break;
            }
        }
    }

    // Builds the distributed set with the given name and registers it as the
    // reusable ids set of the meter table, replacing any previous mapping
    private void insertAvailableKeySet(MeterTableKey meterTableKey, String setName) {
        DistributedSet<MeterKey> availableMeterIdSet =
            new DefaultDistributedSet<>(storageService.<MeterKey>setBuilder()
                .withName(setName)
                .withSerializer(Serializer.using(KryoNamespaces.API,
                                                MeterKey.class)).build(),
                DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS);
        availableMeterIds.put(meterTableKey, availableMeterIdSet);
    }
}
