Initial implementation of GossipIntentStore.

Create/update operations are implemented and working in 2-node cluster.
No remove operations yet.

Change-Id: Ief68c9d5c3bb483823b6f92d29fa83df0ab7b58f
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
new file mode 100644
index 0000000..4f65979
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
@@ -0,0 +1,452 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.intent.impl;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.net.intent.BatchWrite;
+import org.onosproject.net.intent.Intent;
+import org.onosproject.net.intent.IntentClockService;
+import org.onosproject.net.intent.IntentEvent;
+import org.onosproject.net.intent.IntentId;
+import org.onosproject.net.intent.IntentState;
+import org.onosproject.net.intent.IntentStore;
+import org.onosproject.net.intent.IntentStoreDelegate;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.Timestamp;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.impl.Timestamped;
+import org.onosproject.store.serializers.KryoSerializer;
+import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.minPriority;
+import static org.onlab.util.Tools.namedThreads;
+import static org.onosproject.net.intent.IntentState.INSTALL_REQ;
+import static org.onosproject.store.intent.impl.GossipIntentStoreMessageSubjects.INTENT_ANTI_ENTROPY_ADVERTISEMENT;
+import static org.onosproject.store.intent.impl.GossipIntentStoreMessageSubjects.INTENT_SET_INSTALLABLES_MSG;
+import static org.onosproject.store.intent.impl.GossipIntentStoreMessageSubjects.INTENT_UPDATED_MSG;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Manages inventory of Intents in a distributed data store that uses optimistic
+ * replication and gossip based techniques.
+ */
+@Component(immediate = true, enabled = false)
+@Service
+public class GossipIntentStore
+        extends AbstractStore<IntentEvent, IntentStoreDelegate>
+        implements IntentStore {
+
+    private final Logger log = getLogger(getClass());
+
+    private final ConcurrentMap<IntentId, Intent> intents =
+            new ConcurrentHashMap<>();
+
+    private final ConcurrentMap<IntentId, Timestamped<IntentState>> intentStates
+            = new ConcurrentHashMap<>();
+
+    private final Set<IntentId> withdrawRequestedIntents
+            = Sets.newConcurrentHashSet();
+
+    private ConcurrentMap<IntentId, Timestamped<List<Intent>>> installables
+            = new ConcurrentHashMap<>();
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected IntentClockService intentClockService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterCommunicationService clusterCommunicator;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        @Override
+        protected void setupKryoPool() {
+            serializerPool = KryoNamespace.newBuilder()
+                    .register(DistributedStoreSerializers.STORE_COMMON)
+                    .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
+                    .register(InternalIntentEvent.class)
+                    .register(InternalSetInstallablesEvent.class)
+                    //.register(InternalIntentAntiEntropyEvent.class)
+                    //.register(IntentAntiEntropyAdvertisement.class)
+                    .build();
+        }
+    };
+
+    private ExecutorService executor;
+
+    private ScheduledExecutorService backgroundExecutor;
+
+    // TODO: Make these anti-entropy params configurable
+    private long initialDelaySec = 5;
+    private long periodSec = 5;
+
+    @Activate
+    public void activate() {
+        clusterCommunicator.addSubscriber(INTENT_UPDATED_MSG,
+                new InternalIntentCreateOrUpdateEventListener());
+        clusterCommunicator.addSubscriber(INTENT_SET_INSTALLABLES_MSG,
+                                          new InternalIntentSetInstallablesListener());
+        clusterCommunicator.addSubscriber(
+                INTENT_ANTI_ENTROPY_ADVERTISEMENT,
+                new InternalIntentAntiEntropyAdvertisementListener());
+
+        executor = Executors.newCachedThreadPool(namedThreads("intent-fg-%d"));
+
+        backgroundExecutor =
+                newSingleThreadScheduledExecutor(minPriority(namedThreads("intent-bg-%d")));
+
+        // start anti-entropy thread
+        //backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
+                    //initialDelaySec, periodSec, TimeUnit.SECONDS);
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        executor.shutdownNow();
+        backgroundExecutor.shutdownNow();
+        try {
+            if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+                log.error("Timeout during executor shutdown");
+            }
+        } catch (InterruptedException e) {
+            log.error("Error during executor shutdown", e);
+        }
+
+        intents.clear();
+
+        log.info("Stopped");
+    }
+
+    @Override
+    public long getIntentCount() {
+        return intents.size();
+    }
+
+    @Override
+    public Iterable<Intent> getIntents() {
+        // TODO don't actually need to copy intents, they are immutable
+        return ImmutableList.copyOf(intents.values());
+    }
+
+    @Override
+    public Intent getIntent(IntentId intentId) {
+        return intents.get(intentId);
+    }
+
+    @Override
+    public IntentState getIntentState(IntentId intentId) {
+        Timestamped<IntentState> state = intentStates.get(intentId);
+        if (state != null) {
+            return state.value();
+        }
+        return null;
+    }
+
+    @Override
+    public void setState(Intent intent, IntentState newState) {
+        // TODO implement
+    }
+
+    private IntentEvent setStateInternal(IntentId intentId, IntentState newState, Timestamp timestamp) {
+        switch (newState) {
+        case WITHDRAW_REQ:
+            withdrawRequestedIntents.add(intentId);
+            break;
+        case INSTALL_REQ:
+        case COMPILING:
+        case INSTALLING:
+        case INSTALLED:
+        case RECOMPILING:
+        case WITHDRAWING:
+        case WITHDRAWN:
+        case FAILED:
+            synchronized (intentStates) {
+                Timestamped<IntentState> existing = intentStates.get(intentId);
+                if (existing == null || !existing.isNewer(timestamp)) {
+                    intentStates.put(intentId, new Timestamped<>(newState, timestamp));
+                }
+            }
+            break;
+        default:
+            log.warn("Unknown intent state {}", newState);
+            break;
+        }
+
+        try {
+            // TODO make sure it's OK if the intent is null
+            return IntentEvent.getEvent(newState, intents.get(intentId));
+        } catch (IllegalArgumentException e) {
+            // Transient states can't be used for events, so don't send one
+            return null;
+        }
+    }
+
+    @Override
+    public void setInstallableIntents(IntentId intentId,
+                                      List<Intent> installableIntents) {
+        // TODO implement
+    }
+
+    private void setInstallableIntentsInternal(IntentId intentId,
+                                               List<Intent> installableIntents,
+                                               Timestamp timestamp) {
+        synchronized (installables) {
+            Timestamped<List<Intent>> existing = installables.get(intentId);
+            if (existing == null || !existing.isNewer(timestamp)) {
+                installables.put(intentId,
+                                 new Timestamped<>(installableIntents, timestamp));
+            }
+        }
+    }
+
+    @Override
+    public List<Intent> getInstallableIntents(IntentId intentId) {
+        Timestamped<List<Intent>> tInstallables = installables.get(intentId);
+        if (tInstallables != null) {
+            return tInstallables.value();
+        }
+        return null;
+    }
+
+    @Override
+    public void removeInstalledIntents(IntentId intentId) {
+        // TODO implement
+    }
+
+    @Override
+    public List<BatchWrite.Operation> batchWrite(BatchWrite batch) {
+
+        List<IntentEvent> events = Lists.newArrayList();
+        List<BatchWrite.Operation> failed = new ArrayList<>();
+
+        for (BatchWrite.Operation op : batch.operations()) {
+            switch (op.type()) {
+            case CREATE_INTENT:
+                checkArgument(op.args().size() == 1,
+                              "CREATE_INTENT takes 1 argument. %s", op);
+                Intent intent = op.arg(0);
+
+                events.add(createIntentInternal(intent));
+                notifyPeers(new InternalIntentEvent(
+                        intent.id(), intent, INSTALL_REQ, null));
+
+                break;
+            case REMOVE_INTENT:
+                checkArgument(op.args().size() == 1,
+                              "REMOVE_INTENT takes 1 argument. %s", op);
+                IntentId intentId = (IntentId) op.arg(0);
+                // TODO implement
+
+                break;
+            case SET_STATE:
+                checkArgument(op.args().size() == 2,
+                              "SET_STATE takes 2 arguments. %s", op);
+                intent = op.arg(0);
+                IntentState newState = op.arg(1);
+
+                Timestamp timestamp = intentClockService.getTimestamp(
+                        intent.id());
+                IntentEvent externalEvent = setStateInternal(intent.id(), newState, timestamp);
+                events.add(externalEvent);
+                notifyPeers(new InternalIntentEvent(intent.id(), null, newState, timestamp));
+
+                break;
+            case SET_INSTALLABLE:
+                checkArgument(op.args().size() == 2,
+                              "SET_INSTALLABLE takes 2 arguments. %s", op);
+                intentId = op.arg(0);
+                List<Intent> installableIntents = op.arg(1);
+
+                Timestamp timestamp1 = intentClockService.getTimestamp(intentId);
+                setInstallableIntentsInternal(
+                        intentId, installableIntents, timestamp1);
+
+                notifyPeers(new InternalSetInstallablesEvent(intentId, installableIntents, timestamp1));
+
+                break;
+            case REMOVE_INSTALLED:
+                checkArgument(op.args().size() == 1,
+                              "REMOVE_INSTALLED takes 1 argument. %s", op);
+                intentId = op.arg(0);
+                // TODO implement
+                break;
+            default:
+                log.warn("Unknown Operation encountered: {}", op);
+                failed.add(op);
+                break;
+            }
+        }
+
+        notifyDelegate(events);
+        return failed;
+    }
+
+    private IntentEvent createIntentInternal(Intent intent) {
+        Intent oldValue = intents.putIfAbsent(intent.id(), intent);
+        if (oldValue == null) {
+            return IntentEvent.getEvent(INSTALL_REQ, intent);
+        }
+
+        log.warn("Intent ID {} already in store, throwing new update away",
+                 intent.id());
+        return null;
+    }
+
+    private void notifyPeers(InternalIntentEvent event) {
+        try {
+            broadcastMessage(INTENT_UPDATED_MSG, event);
+        } catch (IOException e) {
+            // TODO this won't happen; remove from API
+            log.debug("IOException broadcasting update", e);
+        }
+    }
+
+    private void notifyPeers(InternalSetInstallablesEvent event) {
+        try {
+            broadcastMessage(INTENT_SET_INSTALLABLES_MSG, event);
+        } catch (IOException e) {
+            // TODO this won't happen; remove from API
+            log.debug("IOException broadcasting update", e);
+        }
+    }
+
+    private void broadcastMessage(MessageSubject subject, Object event) throws
+            IOException {
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                subject,
+                SERIALIZER.encode(event));
+        clusterCommunicator.broadcast(message);
+    }
+
+    private void unicastMessage(NodeId peer,
+                                MessageSubject subject,
+                                Object event) throws IOException {
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                subject,
+                SERIALIZER.encode(event));
+        clusterCommunicator.unicast(message, peer);
+    }
+
+    private void notifyDelegateIfNotNull(IntentEvent event) {
+        if (event != null) {
+            notifyDelegate(event);
+        }
+    }
+
+    private final class InternalIntentCreateOrUpdateEventListener
+            implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+
+            log.debug("Received intent update event from peer: {}", message.sender());
+            InternalIntentEvent event = SERIALIZER.decode(message.payload());
+
+            IntentId intentId = event.intentId();
+            Intent intent = event.intent();
+            IntentState state = event.state();
+            Timestamp timestamp = event.timestamp();
+
+            executor.submit(() -> {
+                try {
+                    switch (state) {
+                    case INSTALL_REQ:
+                        notifyDelegateIfNotNull(createIntentInternal(intent));
+                        break;
+                    default:
+                        notifyDelegateIfNotNull(setStateInternal(intentId, state, timestamp));
+                        break;
+                    }
+                } catch (Exception e) {
+                    log.warn("Exception thrown handling intent create or update", e);
+                }
+            });
+        }
+    }
+
+    private final class InternalIntentSetInstallablesListener
+            implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            log.debug("Received intent set installables event from peer: {}", message.sender());
+            InternalSetInstallablesEvent event = SERIALIZER.decode(message.payload());
+
+            IntentId intentId = event.intentId();
+            List<Intent> installables = event.installables();
+            Timestamp timestamp = event.timestamp();
+
+            executor.submit(() -> {
+                try {
+                    setInstallableIntentsInternal(intentId, installables, timestamp);
+                } catch (Exception e) {
+                    log.warn("Exception thrown handling intent set installables", e);
+                }
+            });
+        }
+    }
+
+    private final class InternalIntentAntiEntropyAdvertisementListener
+            implements ClusterMessageHandler {
+
+        @Override
+        public void handle(ClusterMessage message) {
+            log.trace("Received intent Anti-Entropy advertisement from peer: {}", message.sender());
+            // TODO implement
+            //IntentAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
+            backgroundExecutor.submit(() -> {
+                try {
+                    log.debug("something");
+                    //handleAntiEntropyAdvertisement(advertisement);
+                } catch (Exception e) {
+                    log.warn("Exception thrown handling intent advertisements", e);
+                }
+            });
+        }
+    }
+}
+