FlowRule private extension refactor.
1. Merge private flows into the regular FlowRule subsystem, so there is
no mirrored code any more and no change to the FlowRule API.
2. Define a rich data type to carry private flows.
3. Modify OpenFlowRuleProvider to support third-party private flows.
I am not sure whether this is suitable, because the class name is
tied to the OpenFlow protocol.
4. Fix some JUnit test failures caused by the modification of the
FlowRule interface.

Change-Id: I6c54d1e97f231a75bd1b416f0893e0379613d7ce
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
index add2752..d04af51 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014-2015 Open Networking Laboratory
+ * Copyright 2014 Open Networking Laboratory
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -15,13 +15,16 @@
  */
 package org.onosproject.provider.of.flow.impl;
 
+import static org.slf4j.LoggerFactory.getLogger;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalCause;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -45,6 +48,7 @@
 import org.onosproject.openflow.controller.OpenFlowSwitch;
 import org.onosproject.openflow.controller.OpenFlowSwitchListener;
 import org.onosproject.openflow.controller.RoleState;
+import org.onosproject.openflow.controller.ThirdPartyMessage;
 import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
 import org.projectfloodlight.openflow.protocol.OFErrorMsg;
 import org.projectfloodlight.openflow.protocol.OFErrorType;
@@ -58,23 +62,20 @@
 import org.projectfloodlight.openflow.protocol.errormsg.OFFlowModFailedErrorMsg;
 import org.slf4j.Logger;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import static org.slf4j.LoggerFactory.getLogger;
-
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 /**
- * Provider which uses an OpenFlow controller to detect network
- * end-station hosts.
+ * Provider which uses an OpenFlow controller to install, remove and collect
+ * statistics for flow rules on OpenFlow switches.
  */
 @Component(immediate = true)
-public class OpenFlowRuleProvider extends AbstractProvider implements FlowRuleProvider {
+public class OpenFlowRuleProvider extends AbstractProvider
+        implements FlowRuleProvider {
 
     private final Logger log = getLogger(getClass());
 
@@ -84,7 +85,6 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected OpenFlowController controller;
 
-
     private FlowRuleProviderService providerService;
 
     private final InternalFlowProvider listener = new InternalFlowProvider();
@@ -93,7 +93,6 @@
 
     private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap();
 
-
     /**
      * Creates an OpenFlow host provider.
      */
@@ -107,15 +106,19 @@
         controller.addListener(listener);
         controller.addEventListener(listener);
 
-        pendingBatches = CacheBuilder.newBuilder()
+        pendingBatches = CacheBuilder
+                .newBuilder()
                 .expireAfterWrite(10, TimeUnit.SECONDS)
                 .removalListener((RemovalNotification<Long, InternalCacheEntry> notification) -> {
-                    if (notification.getCause() == RemovalCause.EXPIRED) {
-                        providerService.batchOperationCompleted(notification.getKey(),
-                                                                notification.getValue().failedCompletion());
-                    }
-                }).build();
-
+                                     if (notification.getCause() == RemovalCause.EXPIRED) {
+                                         providerService
+                                                 .batchOperationCompleted(notification
+                                                                                  .getKey(),
+                                                                          notification
+                                                                                  .getValue()
+                                                                                  .failedCompletion());
+                                     }
+                                 }).build();
 
         for (OpenFlowSwitch sw : controller.getSwitches()) {
             FlowStatsCollector fsc = new FlowStatsCollector(sw, POLL_INTERVAL);
@@ -123,7 +126,6 @@
             collectors.put(new Dpid(sw.getId()), fsc);
         }
 
-
         log.info("Started");
     }
 
@@ -143,13 +145,17 @@
     }
 
     private void applyRule(FlowRule flowRule) {
-        OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
+        OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId()
+                .uri()));
+        if (flowRule.payLoad().payLoad().length > 0) {
+            OFMessage msg = new ThirdPartyMessage(flowRule.payLoad().payLoad());
+            sw.sendMsg(msg);
+            return;
+        }
         sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
-                    Optional.empty()).buildFlowAdd());
-
+                                          Optional.empty()).buildFlowAdd());
     }
 
-
     @Override
     public void removeFlowRule(FlowRule... flowRules) {
         for (FlowRule flowRule : flowRules) {
@@ -159,10 +165,15 @@
     }
 
     private void removeRule(FlowRule flowRule) {
-        OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
-
+        OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId()
+                .uri()));
+        if (flowRule.payLoad().payLoad().length > 0) {
+            OFMessage msg = new ThirdPartyMessage(flowRule.payLoad().payLoad());
+            sw.sendMsg(msg);
+            return;
+        }
         sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
-                    Optional.empty()).buildFlowDel());
+                                          Optional.empty()).buildFlowDel());
     }
 
     @Override
@@ -172,50 +183,54 @@
     }
 
     @Override
-
     public void executeBatch(FlowRuleBatchOperation batch) {
 
         pendingBatches.put(batch.id(), new InternalCacheEntry(batch));
 
-
-        OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(batch.deviceId().uri()));
+        OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(batch.deviceId()
+                .uri()));
         OFFlowMod mod;
-
         for (FlowRuleBatchEntry fbe : batch.getOperations()) {
-
-            FlowModBuilder builder =
-                    FlowModBuilder.builder(fbe.target(), sw.factory(),
-                                           Optional.of(batch.id()));
+            // Third-party private flow: send its raw payload as-is.
+            if (fbe.target().payLoad().payLoad().length > 0) {
+                OFMessage msg = new ThirdPartyMessage(fbe.target().payLoad()
+                        .payLoad());
+                sw.sendMsg(msg);
+                continue;
+            }
+            FlowModBuilder builder = FlowModBuilder.builder(fbe.target(), sw
+                    .factory(), Optional.of(batch.id()));
             switch (fbe.operator()) {
-                case ADD:
-                    mod = builder.buildFlowAdd();
-                    break;
-                case REMOVE:
-                    mod = builder.buildFlowDel();
-                    break;
-                case MODIFY:
-                    mod = builder.buildFlowMod();
-                    break;
-                default:
-                    log.error("Unsupported batch operation {}; skipping flowmod {}",
-                              fbe.operator(), fbe);
-                    continue;
-                }
+            case ADD:
+                mod = builder.buildFlowAdd();
+                break;
+            case REMOVE:
+                mod = builder.buildFlowDel();
+                break;
+            case MODIFY:
+                mod = builder.buildFlowMod();
+                break;
+            default:
+                log.error("Unsupported batch operation {}; skipping flowmod {}",
+                          fbe.operator(), fbe);
+                continue;
+            }
             sw.sendMsg(mod);
         }
-        OFBarrierRequest.Builder builder = sw.factory()
-                .buildBarrierRequest()
+        OFBarrierRequest.Builder builder = sw.factory().buildBarrierRequest()
                 .setXid(batch.id());
         sw.sendMsg(builder.build());
     }
 
-
     private class InternalFlowProvider
             implements OpenFlowSwitchListener, OpenFlowEventListener {
 
         @Override
         public void switchAdded(Dpid dpid) {
-            FlowStatsCollector fsc = new FlowStatsCollector(controller.getSwitch(dpid), POLL_INTERVAL);
+            FlowStatsCollector fsc = new FlowStatsCollector(
+                                                            controller
+                                                                    .getSwitch(dpid),
+                                                            POLL_INTERVAL);
             fsc.start();
             collectors.put(dpid, fsc);
         }
@@ -234,64 +249,71 @@
 
         @Override
         public void portChanged(Dpid dpid, OFPortStatus status) {
-            //TODO: Decide whether to evict flows internal store.
+            // TODO: Decide whether to evict flows internal store.
         }
 
         @Override
         public void handleMessage(Dpid dpid, OFMessage msg) {
             OpenFlowSwitch sw = controller.getSwitch(dpid);
             switch (msg.getType()) {
-                case FLOW_REMOVED:
-                    OFFlowRemoved removed = (OFFlowRemoved) msg;
+            case FLOW_REMOVED:
+                OFFlowRemoved removed = (OFFlowRemoved) msg;
 
-                    FlowEntry fr = new FlowEntryBuilder(dpid, removed).build();
-                    providerService.flowRemoved(fr);
-                    break;
-                case STATS_REPLY:
-                    if (((OFStatsReply) msg).getStatsType() == OFStatsType.FLOW) {
-                        pushFlowMetrics(dpid, (OFFlowStatsReply) msg);
+                FlowEntry fr = new FlowEntryBuilder(dpid, removed).build();
+                providerService.flowRemoved(fr);
+                break;
+            case STATS_REPLY:
+                if (((OFStatsReply) msg).getStatsType() == OFStatsType.FLOW) {
+                    pushFlowMetrics(dpid, (OFFlowStatsReply) msg);
+                }
+                break;
+            case BARRIER_REPLY:
+                try {
+                    InternalCacheEntry entry = pendingBatches.getIfPresent(msg
+                            .getXid());
+                    if (entry != null) {
+                        providerService
+                                .batchOperationCompleted(msg.getXid(),
+                                                         entry.completed());
+                    } else {
+                        log.warn("Received unknown Barrier Reply: {}",
+                                 msg.getXid());
                     }
-                    break;
-                case BARRIER_REPLY:
-                    try {
-                        InternalCacheEntry entry = pendingBatches.getIfPresent(msg.getXid());
+                } finally {
+                    pendingBatches.invalidate(msg.getXid());
+                }
+                break;
+            case ERROR:
+                log.warn("received Error message {} from {}", msg, dpid);
+
+                OFErrorMsg error = (OFErrorMsg) msg;
+                if (error.getErrType() == OFErrorType.FLOW_MOD_FAILED) {
+                    OFFlowModFailedErrorMsg fmFailed = (OFFlowModFailedErrorMsg) error;
+                    if (fmFailed.getData().getParsedMessage().isPresent()) {
+                        OFMessage m = fmFailed.getData().getParsedMessage()
+                                .get();
+                        OFFlowMod fm = (OFFlowMod) m;
+                        InternalCacheEntry entry = pendingBatches
+                                .getIfPresent(msg.getXid());
                         if (entry != null) {
-                            providerService.batchOperationCompleted(msg.getXid(), entry.completed());
+                            entry.appendFailure(new FlowEntryBuilder(dpid, fm)
+                                    .build());
                         } else {
-                            log.warn("Received unknown Barrier Reply: {}", msg.getXid());
-                        }
-                    } finally {
-                        pendingBatches.invalidate(msg.getXid());
-                    }
-                    break;
-                case ERROR:
-                    log.warn("received Error message {} from {}", msg, dpid);
-
-                    OFErrorMsg error = (OFErrorMsg) msg;
-                    if (error.getErrType() == OFErrorType.FLOW_MOD_FAILED) {
-                        OFFlowModFailedErrorMsg fmFailed = (OFFlowModFailedErrorMsg) error;
-                        if (fmFailed.getData().getParsedMessage().isPresent()) {
-                            OFMessage m = fmFailed.getData().getParsedMessage().get();
-                            OFFlowMod fm = (OFFlowMod) m;
-                            InternalCacheEntry entry = pendingBatches.getIfPresent(msg.getXid());
-                            if (entry != null) {
-                                entry.appendFailure(new FlowEntryBuilder(dpid, fm)
-                                                                         .build());
-                            } else {
-                                log.error("No matching batch for this error: {}", error);
-                            }
-                        } else {
-                            //FIXME: Potentially add flowtracking to avoid this message.
-                            log.error("Flow installation failed but switch didn't" +
-                                              " tell us which one.");
+                            log.error("No matching batch for this error: {}",
+                                      error);
                         }
                     } else {
-                        log.warn("Received error {}", error);
+                        // FIXME: Potentially add flowtracking to avoid this
+                        // message.
+                        log.error("Flow installation failed but switch didn't"
+                                + " tell us which one.");
                     }
+                } else {
+                    log.warn("Received error {}", error);
+                }
 
-
-                default:
-                    log.debug("Unhandled message type: {}", msg.getType());
+            default:
+                log.debug("Unhandled message type: {}", msg.getType());
             }
 
         }
@@ -318,11 +340,11 @@
     }
 
     /**
-     * The internal cache entry holding the original request as well
-     * as accumulating the any failures along the way.
+     * The internal cache entry holding the original request as well as
+     * accumulating any failures along the way.
      *
-     * If this entry is evicted from the cache then the entire operation
-     * is considered failed. Otherwise, only the failures reported by the device
+     * If this entry is evicted from the cache then the entire operation is
+     * considered failed. Otherwise, only the failures reported by the device
      * will be propagated up.
      */
     private class InternalCacheEntry {
@@ -336,6 +358,7 @@
 
         /**
          * Appends a failed rule to the set of failed items.
+         *
          * @param rule the failed rule
          */
         public void appendFailure(FlowRule rule) {
@@ -344,21 +367,29 @@
 
         /**
          * Fails the entire batch and returns the failed operation.
+         *
          * @return the failed operation
          */
         public CompletedBatchOperation failedCompletion() {
             Set<FlowRule> fails = operation.getOperations().stream()
                     .map(op -> op.target()).collect(Collectors.toSet());
-            return new CompletedBatchOperation(false, Collections.unmodifiableSet(fails), operation.deviceId());
+            return new CompletedBatchOperation(false,
+                                               Collections
+                                                       .unmodifiableSet(fails),
+                                               operation.deviceId());
         }
 
         /**
          * Returns the completed operation and whether the batch suceeded.
+         *
          * @return the completed operation
          */
         public CompletedBatchOperation completed() {
-            return new CompletedBatchOperation(failures.isEmpty(),
-                                               Collections.unmodifiableSet(failures), operation.deviceId());
+            return new CompletedBatchOperation(
+                                               failures.isEmpty(),
+                                               Collections
+                                                       .unmodifiableSet(failures),
+                                               operation.deviceId());
         }
 
     }