[ONOS-4242] Capability support for wide community

Change-Id: Ib29a4aeddde863d3ecf281cea8d31c6a945834bd
diff --git a/protocols/bgp/api/src/main/java/org/onosproject/bgp/controller/BgpCfg.java b/protocols/bgp/api/src/main/java/org/onosproject/bgp/controller/BgpCfg.java
index f058fb8..6a432f3 100755
--- a/protocols/bgp/api/src/main/java/org/onosproject/bgp/controller/BgpCfg.java
+++ b/protocols/bgp/api/src/main/java/org/onosproject/bgp/controller/BgpCfg.java
@@ -331,4 +331,18 @@
      * @param flowSpec flow specification capability
      */
     void setFlowSpecCapability(FlowSpec flowSpec);
+
+    /**
+     * Returns the flow specification route policy distribution capability.
+     *
+     * @return RDP flow specification capability
+     */
+    boolean flowSpecRpdCapability();
+
+    /**
+     * Sets the flow specification route policy distribution capability.
+     *
+     * @param rpdCapability flow specification RPD capability
+     */
+    void setFlowSpecRpdCapability(boolean rpdCapability);
 }
diff --git a/protocols/bgp/bgpio/src/main/java/org/onosproject/bgpio/protocol/BgpOpenMsg.java b/protocols/bgp/bgpio/src/main/java/org/onosproject/bgpio/protocol/BgpOpenMsg.java
index 99c5c82..f762852 100644
--- a/protocols/bgp/bgpio/src/main/java/org/onosproject/bgpio/protocol/BgpOpenMsg.java
+++ b/protocols/bgp/bgpio/src/main/java/org/onosproject/bgpio/protocol/BgpOpenMsg.java
@@ -139,6 +139,15 @@
          */
         Builder setVpnFlowSpecCapabilityTlv(boolean isVpnFlowSpecCapabilitySet);
 
+        /**
+         * Sets flow specification route distribution policy capability and return its builder.
+         *
+         * @param isFlowSpecRpdCapabilitySet boolean value to know whether flow spec RPD capability is set or not
+         *
+         * @return builder by setting capabilities
+         */
+        Builder setFlowSpecRpdCapabilityTlv(boolean isFlowSpecRpdCapabilitySet);
+
         @Override
         Builder setHeader(BgpHeader bgpMsgHeader);
     }
diff --git a/protocols/bgp/bgpio/src/main/java/org/onosproject/bgpio/protocol/ver4/BgpOpenMsgVer4.java b/protocols/bgp/bgpio/src/main/java/org/onosproject/bgpio/protocol/ver4/BgpOpenMsgVer4.java
index dfaf665..b55b178 100644
--- a/protocols/bgp/bgpio/src/main/java/org/onosproject/bgpio/protocol/ver4/BgpOpenMsgVer4.java
+++ b/protocols/bgp/bgpio/src/main/java/org/onosproject/bgpio/protocol/ver4/BgpOpenMsgVer4.java
@@ -258,16 +258,24 @@
                 break;
             case MultiProtocolExtnCapabilityTlv.TYPE:
                 log.debug("MultiProtocolExtnCapabilityTlv");
-                if (MultiProtocolExtnCapabilityTlv.LENGTH != length) {
-                    throw new BgpParseException("Invalid length received for MultiProtocolExtnCapabilityTlv.");
-                }
+
                 if (length > cb.readableBytes()) {
                     throw new BgpParseException("BGP LS tlv length is more than readableBytes.");
                 }
                 short afi = cb.readShort();
                 byte res = cb.readByte();
                 byte safi = cb.readByte();
-                tlv = new MultiProtocolExtnCapabilityTlv(afi, res, safi);
+                if ((afi == Constants.AFI_FLOWSPEC_RPD_VALUE) && (safi == Constants.SAFI_FLOWSPEC_RPD_VALUE)) {
+                    if ((MultiProtocolExtnCapabilityTlv.LENGTH + 1) != length) {
+                        throw new BgpParseException("Invalid length received for MultiProtocolExtnCapabilityTlv.");
+                    }
+                    tlv = new MultiProtocolExtnCapabilityTlv(afi, res, safi, cb.readByte());
+                } else {
+                    if (MultiProtocolExtnCapabilityTlv.LENGTH != length) {
+                        throw new BgpParseException("Invalid length received for MultiProtocolExtnCapabilityTlv.");
+                    }
+                    tlv = new MultiProtocolExtnCapabilityTlv(afi, res, safi);
+                }
                 break;
             default:
                 log.debug("Warning: Unsupported TLV: " + type);
@@ -297,6 +305,7 @@
         private boolean isLsCapabilityTlvSet = false;
         private boolean isFlowSpecCapabilityTlvSet = false;
         private boolean isVpnFlowSpecCapabilityTlvSet = false;
+        private boolean isFlowSpecRpdCapabilityTlvSet = false;
 
         LinkedList<BgpValueType> capabilityTlv = new LinkedList<>();
 
@@ -347,6 +356,15 @@
                 this.capabilityTlv.add(tlv);
             }
 
+            if (this.isFlowSpecRpdCapabilityTlvSet) {
+                BgpValueType tlv;
+                tlv = new MultiProtocolExtnCapabilityTlv(Constants.AFI_FLOWSPEC_RPD_VALUE,
+                                                         RES, Constants.SAFI_FLOWSPEC_RPD_VALUE,
+                                                         Constants.RPD_CAPABILITY_SEND_VALUE);
+                this.capabilityTlv.add(tlv);
+            }
+
+
             return new BgpOpenMsgVer4(bgpMsgHeader, PACKET_VERSION, this.asNumber, holdTime, this.bgpId,
                        this.capabilityTlv);
         }
@@ -407,6 +425,12 @@
             this.isVpnFlowSpecCapabilityTlvSet = isVpnFlowSpecCapabilitySet;
             return this;
         }
+
+        @Override
+        public Builder setFlowSpecRpdCapabilityTlv(boolean isFlowSpecRpdCapabilityTlvSet) {
+            this.isFlowSpecRpdCapabilityTlvSet = isFlowSpecRpdCapabilityTlvSet;
+            return this;
+        }
     }
 
     @Override
diff --git a/protocols/bgp/bgpio/src/main/java/org/onosproject/bgpio/types/MultiProtocolExtnCapabilityTlv.java b/protocols/bgp/bgpio/src/main/java/org/onosproject/bgpio/types/MultiProtocolExtnCapabilityTlv.java
index 1231214..832cd59 100644
--- a/protocols/bgp/bgpio/src/main/java/org/onosproject/bgpio/types/MultiProtocolExtnCapabilityTlv.java
+++ b/protocols/bgp/bgpio/src/main/java/org/onosproject/bgpio/types/MultiProtocolExtnCapabilityTlv.java
@@ -19,6 +19,7 @@
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.onosproject.bgpio.util.Constants;
 
 import java.util.Objects;
 
@@ -45,6 +46,7 @@
     private final short afi;
     private final byte res;
     private final byte safi;
+    private final byte rpdSendReceive;
 
     /**
      * Constructor to initialize variables.
@@ -56,6 +58,24 @@
         this.afi = afi;
         this.res = res;
         this.safi = safi;
+        this.rpdSendReceive = Constants.RPD_CAPABILITY_SEND_VALUE;
+    }
+
+        /**
+     * Constructor to initialize variables.
+     * @param afi Address Family Identifiers
+     * @param res reserved field
+     * @param safi Subsequent Address Family Identifier
+     * @param rpdSendReceive indicates whether the sender is
+                                    (a) willing  to receive Route Policies via BGP FLowSpec from its peer (value 1).
+                                    (b) would like to send Route Policies via BGP FLowSpec to its peer (value 2).
+                                    (c) both (value 3).
+     */
+    public MultiProtocolExtnCapabilityTlv(short afi, byte res, byte safi, byte rpdSendReceive) {
+        this.afi = afi;
+        this.res = res;
+        this.safi = safi;
+        this.rpdSendReceive = rpdSendReceive;
     }
 
     /**
@@ -120,9 +140,16 @@
 
     @Override
     public int write(ChannelBuffer cb) {
+        boolean isFsRpd = false;
         int iLenStartIndex = cb.writerIndex();
         cb.writeByte(TYPE);
-        cb.writeByte(LENGTH);
+
+        if ((afi == Constants.AFI_FLOWSPEC_RPD_VALUE) && (safi == Constants.SAFI_FLOWSPEC_RPD_VALUE)) {
+            cb.writeByte(LENGTH + 1);
+            isFsRpd = true;
+        } else {
+            cb.writeByte(LENGTH);
+        }
 
         // write afi
         cb.writeShort(afi);
@@ -133,6 +160,11 @@
         // write safi
         cb.writeByte(safi);
 
+        if (isFsRpd) {
+            // write Send/Receive (1 octet)
+            cb.writeByte(rpdSendReceive);
+        }
+
         return cb.writerIndex() - iLenStartIndex;
     }
 
@@ -145,6 +177,10 @@
         short afi = cb.readShort();
         byte res = cb.readByte();
         byte safi = cb.readByte();
+
+        if ((afi == Constants.AFI_FLOWSPEC_RPD_VALUE) && (safi == Constants.SAFI_FLOWSPEC_RPD_VALUE)) {
+            return new MultiProtocolExtnCapabilityTlv(afi, res, safi, cb.readByte());
+        }
         return new MultiProtocolExtnCapabilityTlv(afi, res, safi);
     }
 
diff --git a/protocols/bgp/bgpio/src/main/java/org/onosproject/bgpio/util/Constants.java b/protocols/bgp/bgpio/src/main/java/org/onosproject/bgpio/util/Constants.java
index 3e7e4ad..0bb5e7b 100644
--- a/protocols/bgp/bgpio/src/main/java/org/onosproject/bgpio/util/Constants.java
+++ b/protocols/bgp/bgpio/src/main/java/org/onosproject/bgpio/util/Constants.java
@@ -40,6 +40,16 @@
     public static final byte SAFI_FLOWSPEC_VALUE = (byte) 133;
     public static final byte VPN_SAFI_FLOWSPEC_VALUE = (byte) 134;
 
+    /* TODO: The Capability Code
+   for this capability is to be specified by the IANA.*/
+    public static final short AFI_FLOWSPEC_RPD_VALUE = 1;
+    public static final byte SAFI_FLOWSPEC_RPD_VALUE = (byte) 200;
+    public static final byte VPN_SAFI_FLOWSPEC_RDP_VALUE = (byte) 201;
+
+    public static final byte RPD_CAPABILITY_RECEIVE_VALUE = 0;
+    public static final byte RPD_CAPABILITY_SEND_VALUE = 1;
+    public static final byte RPD_CAPABILITY_SEND_RECEIVE_VALUE = 2;
+
     public static final int EXTRA_TRAFFIC = 0x01;
     public static final int UNPROTECTED = 0x02;
     public static final int SHARED = 0x04;
diff --git a/protocols/bgp/bgpio/src/test/java/org/onosproject/bgpio/protocol/BgpOpenMsgTest.java b/protocols/bgp/bgpio/src/test/java/org/onosproject/bgpio/protocol/BgpOpenMsgTest.java
index f1edd9d..2d01fe8 100755
--- a/protocols/bgp/bgpio/src/test/java/org/onosproject/bgpio/protocol/BgpOpenMsgTest.java
+++ b/protocols/bgp/bgpio/src/test/java/org/onosproject/bgpio/protocol/BgpOpenMsgTest.java
@@ -291,15 +291,73 @@
     public void openMessageTest8() throws BgpParseException {
 
         // OPEN Message with invalid message type.
-        byte[] openMsg = new byte[] {(byte) 0xff, (byte) 0xff, (byte) 0xff,
-                (byte) 0xff, (byte) 0xff, (byte) 0xff,
-                (byte) 0xff, (byte) 0xff, (byte) 0xff,
-                (byte) 0xff, (byte) 0xff, (byte) 0xff,
-                (byte) 0xff, (byte) 0xff, (byte) 0xff,
-                (byte) 0xff, 0x00, 0x1d, 0x05, 0X04,
-                (byte) 0xfe, 0x09, 0x00, (byte) 0xb4,
-                (byte) 0xc0, (byte) 0xa8, 0x00, 0x0f,
-                0x00};
+        byte[] openMsg = new byte[] {(byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff,
+                                     (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff,
+                                     (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, 0x00, 0x1d, 0x05, 0X04,
+                                     (byte) 0xfe, 0x09, 0x00, (byte) 0xb4, (byte) 0xc0, (byte) 0xa8, 0x00, 0x0f, 0x00 };
+
+        ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
+        buffer.writeBytes(openMsg);
+
+        BgpMessageReader<BgpMessage> reader = BgpFactories.getGenericReader();
+        BgpMessage message;
+        BgpHeader bgpHeader = new BgpHeader();
+        message = reader.readFrom(buffer, bgpHeader);
+
+        assertThat(message, instanceOf(BgpOpenMsg.class));
+    }
+
+    /**
+     * This test case checks open message with route policy distribution capability.
+     */
+    @Test
+    public void openMessageTest9() throws BgpParseException {
+
+        // OPEN Message with capabilities.
+        byte[] openMsg = new byte[] {(byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff,
+                                     (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff,
+                                     (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, 0x00, 0x38, 0x01, 0x04, 0x00,
+                                     0x64, 0x00, (byte) 0xb4, (byte) 0xc0, (byte) 0xa8, 0x07, 0x35, 0x1b, 0x02, 0x19,
+                                     0x01, 0x04, 0x00, 0x01, 0x00, 0x01, 0x01, 0x04, 0x40, 0x04, 0x00, 0x47, 0x01,
+                                     0x04, 0x00, 0x01, 0x00, (byte) 0x85, 0x01, 0x05, 0x00, 0x01, 0x00, (byte) 0xc8,
+                                     0x00 }; // Four Octet AS Number-CAPABILITY-TLV
+
+        byte[] testOpenMsg;
+        ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
+        buffer.writeBytes(openMsg);
+
+        BgpMessageReader<BgpMessage> reader = BgpFactories.getGenericReader();
+        BgpMessage message;
+        BgpHeader bgpHeader = new BgpHeader();
+
+        message = reader.readFrom(buffer, bgpHeader);
+
+        assertThat(message, instanceOf(BgpOpenMsg.class));
+
+        ChannelBuffer buf = ChannelBuffers.dynamicBuffer();
+        message.writeTo(buf);
+
+        int readLen = buf.writerIndex();
+        testOpenMsg = new byte[readLen];
+        buf.readBytes(testOpenMsg, 0, readLen);
+
+        assertThat(testOpenMsg, is(openMsg));
+    }
+
+    /**
+     * In this test case, Invalid multiprotocol capability length is given as input and expecting an exception.
+     */
+    @Test(expected = BgpParseException.class)
+    public void openMessageTest10() throws BgpParseException {
+
+        // OPEN Message with invalid message type.
+        byte[] openMsg = new byte[] {(byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff,
+                                      (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff,
+                                      (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, 0x00, 0x38, 0x01, 0x04, 0x00,
+                                      0x64, 0x00, (byte) 0xb4, (byte) 0xc0, (byte) 0xa8, 0x07, 0x35, 0x1b, 0x02, 0x19,
+                                      0x01, 0x04, 0x00, 0x01, 0x00, 0x01, 0x01, 0x04, 0x40, 0x04, 0x00, 0x47, 0x01,
+                                      0x04, 0x00, 0x01, 0x00, (byte) 0x85, 0x01, 0x04, 0x00, 0x01, 0x00, (byte) 0xc8,
+                                      0x00 };
 
         ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
         buffer.writeBytes(openMsg);
diff --git a/protocols/bgp/ctl/src/main/java/org/onosproject/bgp/controller/impl/BgpChannelHandler.java b/protocols/bgp/ctl/src/main/java/org/onosproject/bgp/controller/impl/BgpChannelHandler.java
index 7f4f59a..40c63d4 100755
--- a/protocols/bgp/ctl/src/main/java/org/onosproject/bgp/controller/impl/BgpChannelHandler.java
+++ b/protocols/bgp/ctl/src/main/java/org/onosproject/bgp/controller/impl/BgpChannelHandler.java
@@ -55,9 +55,9 @@
 import java.net.UnknownHostException;
 import java.nio.channels.ClosedChannelException;
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.RejectedExecutionException;
 
 /**
@@ -85,6 +85,7 @@
     static final short AFI = 16388;
     static final byte RES = 0;
     static final byte SAFI = 71;
+    static final byte MAX_UNSUPPORTED_CAPABILITY = 5;
 
     // State needs to be volatile because the HandshakeTimeoutHandler
     // needs to check if the handshake is complete
@@ -520,7 +521,7 @@
             byte errorSubCode = errMsg.getErrorSubCode();
             ChannelBuffer tempCb = errMsg.getData();
             if (tempCb != null) {
-                int dataLength = tempCb.capacity();
+                int dataLength = tempCb.readableBytes();
                 data = new byte[dataLength];
                 tempCb.readBytes(data, 0, dataLength);
             }
@@ -685,7 +686,8 @@
                 .setLsCapabilityTlv(bgpconfig.getLsCapability())
                 .setLargeAsCapabilityTlv(bgpconfig.getLargeASCapability())
                 .setFlowSpecCapabilityTlv(flowSpecStatus)
-                .setVpnFlowSpecCapabilityTlv(vpnFlowSpecStatus).build();
+                .setVpnFlowSpecCapabilityTlv(vpnFlowSpecStatus)
+                .setFlowSpecRpdCapabilityTlv(bgpconfig.flowSpecRpdCapability()).build();
         log.debug("Sending open message to {}", channel.getRemoteAddress());
         channel.write(Collections.singletonList(msg));
 
@@ -786,20 +788,28 @@
 
         List<BgpValueType> capabilityTlv = openmsg.getCapabilityTlv();
         ListIterator<BgpValueType> listIterator = capabilityTlv.listIterator();
-        List<BgpValueType> unSupportedCapabilityTlv = new LinkedList<>();
+        List<BgpValueType> unSupportedCapabilityTlv = new CopyOnWriteArrayList<BgpValueType>();
         ListIterator<BgpValueType> unSupportedCaplistIterator = unSupportedCapabilityTlv.listIterator();
         BgpValueType tempTlv;
         boolean isLargeAsCapabilityCfg = h.bgpconfig.getLargeASCapability();
+        boolean isFlowSpecRpdCapabilityCfg = h.bgpconfig.flowSpecRpdCapability();
         boolean isLsCapabilityCfg = h.bgpconfig.getLsCapability();
-        boolean isFlowSpecCapabilityCfg = false;
+        boolean isFlowSpecIpv4CapabilityCfg = false;
+        boolean isFlowSpecVpnv4CapabilityCfg = false;
         MultiProtocolExtnCapabilityTlv tempCapability;
         boolean isMultiProtocolLsCapability = false;
+        boolean isMultiProtocolFlowSpecRpdCapability = false;
         boolean isMultiProtocolFlowSpecCapability = false;
         boolean isMultiProtocolVpnFlowSpecCapability = false;
         BgpCfg.FlowSpec flowSpec = h.bgpconfig.flowSpecCapability();
 
-        if (flowSpec != BgpCfg.FlowSpec.NONE) {
-            isFlowSpecCapabilityCfg = true;
+        if (flowSpec == BgpCfg.FlowSpec.IPV4) {
+            isFlowSpecIpv4CapabilityCfg = true;
+        } else if (flowSpec == BgpCfg.FlowSpec.VPNV4) {
+            isFlowSpecVpnv4CapabilityCfg = true;
+        } else if (flowSpec == BgpCfg.FlowSpec.IPV4_VPNV4) {
+            isFlowSpecIpv4CapabilityCfg = true;
+            isFlowSpecVpnv4CapabilityCfg = true;
         }
 
         while (listIterator.hasNext()) {
@@ -817,6 +827,10 @@
                 if (SAFI == tempCapability.getSafi()) {
                     isMultiProtocolLsCapability = true;
                 }
+
+                if (Constants.SAFI_FLOWSPEC_RPD_VALUE == tempCapability.getSafi()) {
+                    isMultiProtocolFlowSpecRpdCapability = true;
+                }
             }
             if (tlv.getType() == FOUR_OCTET_AS_NUM_CAPA_TYPE) {
                 isFourOctetCapabilityExits = true;
@@ -843,13 +857,15 @@
             }
         }
 
-        if ((isFlowSpecCapabilityCfg)) {
+        if (isFlowSpecIpv4CapabilityCfg) {
             if (!isMultiProtocolFlowSpecCapability) {
                 tempTlv = new MultiProtocolExtnCapabilityTlv(Constants.AFI_FLOWSPEC_VALUE,
                                                              RES, Constants.SAFI_FLOWSPEC_VALUE);
                 unSupportedCapabilityTlv.add(tempTlv);
             }
+        }
 
+        if (isFlowSpecVpnv4CapabilityCfg) {
             if (!isMultiProtocolVpnFlowSpecCapability) {
                 tempTlv = new MultiProtocolExtnCapabilityTlv(Constants.AFI_FLOWSPEC_VALUE,
                                                              RES, Constants.VPN_SAFI_FLOWSPEC_VALUE);
@@ -864,7 +880,16 @@
             }
         }
 
-        if (unSupportedCapabilityTlv.size() == 3) {
+        if ((isFlowSpecRpdCapabilityCfg)) {
+            if (!isMultiProtocolFlowSpecRpdCapability) {
+                tempTlv = new MultiProtocolExtnCapabilityTlv(Constants.AFI_FLOWSPEC_RPD_VALUE,
+                                                             RES, Constants.SAFI_FLOWSPEC_RPD_VALUE,
+                                                             Constants.RPD_CAPABILITY_SEND_VALUE);
+                unSupportedCapabilityTlv.add(tempTlv);
+            }
+        }
+
+        if (unSupportedCapabilityTlv.size() == MAX_UNSUPPORTED_CAPABILITY) {
             ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
             while (unSupportedCaplistIterator.hasNext()) {
                 BgpValueType tlv = unSupportedCaplistIterator.next();
diff --git a/protocols/bgp/ctl/src/main/java/org/onosproject/bgp/controller/impl/BgpConfig.java b/protocols/bgp/ctl/src/main/java/org/onosproject/bgp/controller/impl/BgpConfig.java
index 6073559..bbeea48 100755
--- a/protocols/bgp/ctl/src/main/java/org/onosproject/bgp/controller/impl/BgpConfig.java
+++ b/protocols/bgp/ctl/src/main/java/org/onosproject/bgp/controller/impl/BgpConfig.java
@@ -57,6 +57,7 @@
     private BgpConnectPeer connectPeer;
     private BgpPeerManagerImpl peerManager;
     private BgpController bgpController;
+    private boolean rpdCapability;
 
     /*
      * Constructor to initialize the values.
@@ -129,6 +130,16 @@
     }
 
     @Override
+    public boolean flowSpecRpdCapability() {
+        return this.rpdCapability;
+    }
+
+    @Override
+    public void setFlowSpecRpdCapability(boolean rpdCapability) {
+        this.rpdCapability = rpdCapability;
+    }
+
+    @Override
     public String getRouterId() {
         if (this.routerId != null) {
             return this.routerId.toString();
diff --git a/protocols/bgp/ctl/src/test/java/org/onosproject/bgp/BgpControllerImplTest.java b/protocols/bgp/ctl/src/test/java/org/onosproject/bgp/BgpControllerImplTest.java
index 72d5607..4795d35 100644
--- a/protocols/bgp/ctl/src/test/java/org/onosproject/bgp/BgpControllerImplTest.java
+++ b/protocols/bgp/ctl/src/test/java/org/onosproject/bgp/BgpControllerImplTest.java
@@ -72,6 +72,7 @@
 import org.onosproject.bgpio.types.MultiProtocolExtnCapabilityTlv;
 import org.onosproject.bgpio.types.IsIsPseudonode;
 import org.onosproject.bgpio.types.RouteDistinguisher;
+import org.onosproject.bgpio.util.Constants;
 
 /**
  * Test case for BGPControllerImpl.
@@ -299,6 +300,29 @@
         assertThat(result, is(true));
     }
 
+    @Test
+    public void bgpOpenMessageTest8() throws InterruptedException {
+        // Open message with route policy distribution capability
+        short afi = Constants.AFI_FLOWSPEC_RPD_VALUE;
+        byte res = 0;
+        byte safi = Constants.SAFI_FLOWSPEC_RPD_VALUE;
+        peer1.peerChannelHandler.asNumber = 200;
+        peer1.peerChannelHandler.version = 4;
+        peer1.peerChannelHandler.holdTime = 120;
+
+        bgpControllerImpl.getConfig().setFlowSpecRpdCapability(true);
+        BgpValueType tempTlv1 = new MultiProtocolExtnCapabilityTlv(afi, res, safi);
+        peer1.peerChannelHandler.capabilityTlv.add(tempTlv1);
+
+        peer1.connect(connectToSocket);
+
+        boolean result;
+        result = peer1.peerFrameDecoder.receivedOpenMessageLatch.await(
+            MESSAGE_TIMEOUT_MS,
+            TimeUnit.MILLISECONDS);
+        assertThat(result, is(true));
+    }
+
     /**
      * Peer1 has Node NLRI (MpReach).
      */
diff --git a/providers/bgp/cfg/src/main/java/org/onosproject/provider/bgp/cfg/impl/BgpAppConfig.java b/providers/bgp/cfg/src/main/java/org/onosproject/provider/bgp/cfg/impl/BgpAppConfig.java
index abf27d8..079f80d 100644
--- a/providers/bgp/cfg/src/main/java/org/onosproject/provider/bgp/cfg/impl/BgpAppConfig.java
+++ b/providers/bgp/cfg/src/main/java/org/onosproject/provider/bgp/cfg/impl/BgpAppConfig.java
@@ -48,6 +48,7 @@
     public static final String HOLD_TIME = "holdTime";
     public static final String LARGE_AS_CAPABILITY = "largeAsCapability";
     public static final String FLOW_SPEC_CAPABILITY = "flowSpecCapability";
+    public static final String FLOW_SPEC_RPD_CAPABILITY = "flowSpecRpdCapability";
 
     public static final String BGP_PEER = "bgpPeer";
     public static final String PEER_IP = "peerIp";
@@ -68,11 +69,11 @@
         bgpConfig = bgpController.getConfig();
 
         fields = hasOnlyFields(ROUTER_ID, LOCAL_AS, MAX_SESSION, LS_CAPABILITY,
-                HOLD_TIME, LARGE_AS_CAPABILITY, FLOW_SPEC_CAPABILITY, BGP_PEER) &&
+                HOLD_TIME, LARGE_AS_CAPABILITY, FLOW_SPEC_CAPABILITY, FLOW_SPEC_RPD_CAPABILITY, BGP_PEER) &&
                 isIpAddress(ROUTER_ID, MANDATORY) && isNumber(LOCAL_AS, MANDATORY) &&
                 isNumber(MAX_SESSION, OPTIONAL, 20) && isNumber(HOLD_TIME, OPTIONAL, 180) &&
                 isBoolean(LS_CAPABILITY, OPTIONAL) && isBoolean(LARGE_AS_CAPABILITY, OPTIONAL) &&
-                isString(FLOW_SPEC_CAPABILITY, OPTIONAL);
+                isString(FLOW_SPEC_CAPABILITY, OPTIONAL) && isBoolean(FLOW_SPEC_RPD_CAPABILITY, OPTIONAL);
 
         if (!fields) {
             return fields;
@@ -118,6 +119,15 @@
     }
 
     /**
+     * Returns flow spec route policy distribution capability support from the configuration.
+     *
+     * @return true if flow spec route policy distribution capability is set otherwise false
+     */
+    public boolean rpdCapability() {
+        return Boolean.parseBoolean(get(FLOW_SPEC_RPD_CAPABILITY, null));
+    }
+
+    /**
      * Returns largeAs capability support from the configuration.
      *
      * @return largeAs capability
diff --git a/providers/bgp/cfg/src/main/java/org/onosproject/provider/bgp/cfg/impl/BgpCfgProvider.java b/providers/bgp/cfg/src/main/java/org/onosproject/provider/bgp/cfg/impl/BgpCfgProvider.java
index 95ad2e2..268962c 100755
--- a/providers/bgp/cfg/src/main/java/org/onosproject/provider/bgp/cfg/impl/BgpCfgProvider.java
+++ b/providers/bgp/cfg/src/main/java/org/onosproject/provider/bgp/cfg/impl/BgpCfgProvider.java
@@ -137,6 +137,7 @@
         } else {
             bgpConfig.setFlowSpecCapability(BgpCfg.FlowSpec.NONE);
         }
+        bgpConfig.setFlowSpecRpdCapability(config.rpdCapability());
 
         nodes = config.bgpPeer();
         for (int i = 0; i < nodes.size(); i++) {
@@ -186,6 +187,7 @@
         } else {
             log.info(" Self configuration cannot be modified as there is existing connections ");
         }
+        bgpConfig.setFlowSpecRpdCapability(config.rpdCapability());
 
         /* update the peer configuration */
         bgpPeerTree = bgpConfig.getPeerTree();