[ONOS-7732] Automating switch workflow: api, app, and sample workflows

Change-Id: Iee87d4fe6cf61c1f8904d1d77df5f913a712b64a
diff --git a/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/DistributedWorkplaceStore.java b/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/DistributedWorkplaceStore.java
new file mode 100644
index 0000000..ca8a454
--- /dev/null
+++ b/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/DistributedWorkplaceStore.java
@@ -0,0 +1,394 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.workflow.impl;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.BaseJsonNode;
+import com.fasterxml.jackson.databind.node.BigIntegerNode;
+import com.fasterxml.jackson.databind.node.BinaryNode;
+import com.fasterxml.jackson.databind.node.BooleanNode;
+import com.fasterxml.jackson.databind.node.ContainerNode;
+import com.fasterxml.jackson.databind.node.DecimalNode;
+import com.fasterxml.jackson.databind.node.DoubleNode;
+import com.fasterxml.jackson.databind.node.FloatNode;
+import com.fasterxml.jackson.databind.node.IntNode;
+import com.fasterxml.jackson.databind.node.JsonNodeCreator;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.JsonNodeType;
+import com.fasterxml.jackson.databind.node.LongNode;
+import com.fasterxml.jackson.databind.node.MissingNode;
+import com.fasterxml.jackson.databind.node.NullNode;
+import com.fasterxml.jackson.databind.node.NumericNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.POJONode;
+import com.fasterxml.jackson.databind.node.ShortNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.fasterxml.jackson.databind.node.ValueNode;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.workflow.api.DataModelTree;
+import org.onosproject.workflow.api.DefaultWorkplace;
+import org.onosproject.workflow.api.DefaultWorkflowContext;
+import org.onosproject.workflow.api.JsonDataModelTree;
+import org.onosproject.workflow.api.SystemWorkflowContext;
+import org.onosproject.workflow.api.WorkflowContext;
+import org.onosproject.workflow.api.WorkflowData;
+import org.onosproject.workflow.api.WorkflowState;
+import org.onosproject.workflow.api.Workplace;
+import org.onosproject.workflow.api.WorkflowDataEvent;
+import org.onosproject.workflow.api.WorkplaceStore;
+import org.onosproject.workflow.api.WorkplaceStoreDelegate;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageException;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+@Component(immediate = true)
+@Service
+public class DistributedWorkplaceStore
+    extends AbstractStore<WorkflowDataEvent, WorkplaceStoreDelegate> implements WorkplaceStore {
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    private ApplicationId appId;
+    private final Logger log = getLogger(getClass());
+
+    private final WorkplaceMapListener workplaceMapEventListener = new WorkplaceMapListener();
+    private ConsistentMap<String, WorkflowData> workplaceMap;
+    private Map<String, Workplace> localWorkplaceMap = Maps.newConcurrentMap();
+
+    private final WorkflowContextMapListener contextMapEventListener = new WorkflowContextMapListener();
+    private ConsistentMap<String, WorkflowData> contextMap;
+    private Map<String, WorkflowContext> localContextMap = Maps.newConcurrentMap();
+
+    private Map<String, Map<String, WorkflowContext>> localWorkplaceMemberMap = Maps.newConcurrentMap();
+
+    @Activate
+    public void activate() {
+
+        appId = coreService.registerApplication("org.onosproject.workplacestore");
+        log.info("appId=" + appId);
+
+        KryoNamespace workplaceNamespace = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(WorkflowData.class)
+                .register(Workplace.class)
+                .register(DefaultWorkplace.class)
+                .register(WorkflowContext.class)
+                .register(DefaultWorkflowContext.class)
+                .register(SystemWorkflowContext.class)
+                .register(WorkflowState.class)
+                .register(DataModelTree.class)
+                .register(JsonDataModelTree.class)
+                .register(List.class)
+                .register(ArrayList.class)
+                .register(JsonNode.class)
+                .register(ObjectNode.class)
+                .register(TextNode.class)
+                .register(LinkedHashMap.class)
+                .register(ArrayNode.class)
+                .register(BaseJsonNode.class)
+                .register(BigIntegerNode.class)
+                .register(BinaryNode.class)
+                .register(BooleanNode.class)
+                .register(ContainerNode.class)
+                .register(DecimalNode.class)
+                .register(DoubleNode.class)
+                .register(FloatNode.class)
+                .register(IntNode.class)
+                .register(JsonNodeType.class)
+                .register(LongNode.class)
+                .register(MissingNode.class)
+                .register(NullNode.class)
+                .register(NumericNode.class)
+                .register(POJONode.class)
+                .register(ShortNode.class)
+                .register(ValueNode.class)
+                .register(JsonNodeCreator.class)
+                .register(JsonNodeFactory.class)
+                .build();
+
+        localWorkplaceMap.clear();
+        workplaceMap = storageService.<String, WorkflowData>consistentMapBuilder()
+                .withSerializer(Serializer.using(workplaceNamespace))
+                .withName("workplace-map")
+                .withApplicationId(appId)
+                .build();
+        workplaceMap.addListener(workplaceMapEventListener);
+
+        localContextMap.clear();
+        contextMap = storageService.<String, WorkflowData>consistentMapBuilder()
+                .withSerializer(Serializer.using(workplaceNamespace))
+                .withName("workflow-context-map")
+                .withApplicationId(appId)
+                .build();
+        contextMap.addListener(contextMapEventListener);
+
+        workplaceMapEventListener.syncLocal();
+        contextMapEventListener.syncLocal();
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        workplaceMap.destroy();
+        localWorkplaceMap.clear();
+        contextMap.destroy();
+        localContextMap.clear();
+
+        log.info("Stopped");
+    }
+
+    @Override
+    public void registerWorkplace(String name, Workplace workplace) throws StorageException {
+        workplaceMap.put(name, workplace);
+    }
+
+    @Override
+    public void removeWorkplace(String name) throws StorageException {
+        removeWorkplaceContexts(name);
+        workplaceMap.remove(name);
+    }
+
+    @Override
+    public Workplace getWorkplace(String name) throws StorageException {
+        return localWorkplaceMap.get(name);
+    }
+
+    @Override
+    public Collection<Workplace> getWorkplaces() throws StorageException {
+        return ImmutableList.copyOf(localWorkplaceMap.values());
+    }
+
+    @Override
+    public void commitWorkplace(String name, Workplace workplace, boolean handleEvent) throws StorageException {
+        workplace.setTriggerNext(handleEvent);
+        if (workplaceMap.containsKey(name)) {
+            workplaceMap.replace(name, workplace);
+        } else {
+            registerWorkplace(name, workplace);
+        }
+    }
+
+    @Override
+    public void registerContext(String name, WorkflowContext context) throws StorageException {
+        contextMap.put(name, context);
+    }
+
+    @Override
+    public void removeContext(String name) throws StorageException {
+        contextMap.remove(name);
+    }
+
+    @Override
+    public WorkflowContext getContext(String name) throws StorageException {
+        return localContextMap.get(name);
+    }
+
+    @Override
+    public void commitContext(String name, WorkflowContext context, boolean handleEvent) throws StorageException {
+        context.setTriggerNext(handleEvent);
+        if (contextMap.containsKey(name)) {
+            contextMap.replace(name, context);
+        } else {
+            registerContext(name, context);
+        }
+    }
+
+    @Override
+    public Collection<WorkflowContext> getContexts() throws StorageException {
+        return ImmutableList.copyOf(localContextMap.values());
+    }
+
+    @Override
+    public Collection<WorkflowContext> getWorkplaceContexts(String workplaceName) {
+        Map<String, WorkflowContext> ctxMap = localWorkplaceMemberMap.get(workplaceName);
+        if (ctxMap == null) {
+            return Collections.emptyList();
+        }
+
+        return ImmutableList.copyOf(ctxMap.values());
+    }
+
+    @Override
+    public void removeWorkplaceContexts(String workplaceName) {
+        for (WorkflowContext ctx : getWorkplaceContexts(workplaceName)) {
+            removeContext(ctx.name());
+        }
+    }
+
+    private class WorkplaceMapListener implements MapEventListener<String, WorkflowData> {
+
+        @Override
+        public void event(MapEvent<String, WorkflowData> event) {
+
+            Workplace newWorkplace = (Workplace) Versioned.valueOrNull(event.newValue());
+            Workplace oldWorkplace = (Workplace) Versioned.valueOrNull(event.oldValue());
+
+            log.info("WorkplaceMap event: {}", event);
+            switch (event.type()) {
+                case INSERT:
+                    insert(newWorkplace);
+                    notifyDelegate(new WorkflowDataEvent(WorkflowDataEvent.Type.INSERT, newWorkplace));
+                    break;
+                case UPDATE:
+                    update(newWorkplace);
+                    notifyDelegate(new WorkflowDataEvent(WorkflowDataEvent.Type.UPDATE, newWorkplace));
+                    break;
+                case REMOVE:
+                    remove(oldWorkplace);
+                    notifyDelegate(new WorkflowDataEvent(WorkflowDataEvent.Type.REMOVE, oldWorkplace));
+                    break;
+                default:
+            }
+        }
+
+        private void insert(Workplace workplace) {
+            localWorkplaceMap.put(workplace.name(), workplace);
+        }
+
+        private void update(Workplace workplace) {
+            localWorkplaceMap.replace(workplace.name(), workplace);
+        }
+
+        private void remove(Workplace workplace) {
+            localWorkplaceMap.remove(workplace.name());
+        }
+
+        public void syncLocal() {
+            workplaceMap.values().stream().forEach(
+                    x -> insert((Workplace) (x.value()))
+            );
+        }
+    }
+
+    private class WorkflowContextMapListener implements MapEventListener<String, WorkflowData> {
+
+        @Override
+        public void event(MapEvent<String, WorkflowData> event) {
+
+            WorkflowContext newContext = (WorkflowContext) Versioned.valueOrNull(event.newValue());
+            WorkflowContext oldContext = (WorkflowContext) Versioned.valueOrNull(event.oldValue());
+
+            log.info("WorkflowContext event: {}", event);
+            switch (event.type()) {
+                case INSERT:
+                    insert(newContext);
+                    notifyDelegate(new WorkflowDataEvent(WorkflowDataEvent.Type.INSERT, newContext));
+                    break;
+                case UPDATE:
+                    update(newContext);
+                    notifyDelegate(new WorkflowDataEvent(WorkflowDataEvent.Type.UPDATE, newContext));
+                    break;
+                case REMOVE:
+                    remove(oldContext);
+                    notifyDelegate(new WorkflowDataEvent(WorkflowDataEvent.Type.REMOVE, oldContext));
+                    break;
+                default:
+            }
+        }
+
+        /**
+         * Inserts workflow context on local hash map.
+         * @param context workflow context
+         */
+        private void insert(WorkflowContext context) {
+            String workplaceName = context.workplaceName();
+            Map<String, WorkflowContext> ctxMap = localWorkplaceMemberMap.get(workplaceName);
+            if (ctxMap == null) {
+                ctxMap = new HashMap<>();
+                localWorkplaceMemberMap.put(workplaceName, ctxMap);
+            }
+            ctxMap.put(context.name(), context);
+
+            localContextMap.put(context.name(), context);
+        }
+
+        /**
+         * Updates workflow context on local hash map.
+         * @param context workflow context
+         */
+        private void update(WorkflowContext context) {
+            String workplaceName = context.workplaceName();
+            Map<String, WorkflowContext> ctxMap = localWorkplaceMemberMap.get(workplaceName);
+            if (ctxMap == null) {
+                ctxMap = new HashMap<>();
+                localWorkplaceMemberMap.put(workplaceName, ctxMap);
+            }
+            ctxMap.put(context.name(), context);
+
+            localContextMap.put(context.name(), context);
+        }
+
+        /**
+         * Removes workflow context from local hash map.
+         * @param context workflow context
+         */
+        private void remove(WorkflowContext context) {
+            localContextMap.remove(context.name());
+
+            String workplaceName = context.workplaceName();
+            Map<String, WorkflowContext> ctxMap = localWorkplaceMemberMap.get(workplaceName);
+            if (ctxMap == null) {
+                log.error("remove-context: Failed to find workplace({}) in localWorkplaceMemberMap", workplaceName);
+                return;
+            }
+            ctxMap.remove(context.name());
+            if (ctxMap.size() == 0) {
+                localWorkplaceMemberMap.remove(workplaceName, ctxMap);
+            }
+        }
+
+        /**
+         * Synchronizes local hash map.
+         */
+        public void syncLocal() {
+            contextMap.values().stream().forEach(
+                    x -> insert((WorkflowContext) (x.value()))
+            );
+        }
+    }
+}