[Emu] [ONOS-2591,ONOS-2594] Implementation of BGP channel handler to manage each BGP peer connection.

Change-Id: I14e90c9437f676698f89da79e736a81035689492
diff --git a/bgp/ctl/src/main/java/org/onosproject/bgp/controller/impl/BGPChannelHandler.java b/bgp/ctl/src/main/java/org/onosproject/bgp/controller/impl/BGPChannelHandler.java
index 942d365..c17736e 100755
--- a/bgp/ctl/src/main/java/org/onosproject/bgp/controller/impl/BGPChannelHandler.java
+++ b/bgp/ctl/src/main/java/org/onosproject/bgp/controller/impl/BGPChannelHandler.java
@@ -16,19 +16,616 @@
 
 package org.onosproject.bgp.controller.impl;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler;
+import org.jboss.netty.handler.timeout.ReadTimeoutException;
+import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
+import org.onlab.packet.IpAddress;
+import org.onosproject.bgp.controller.BGPCfg;
+import org.onosproject.bgp.controller.BGPId;
+import org.onosproject.bgp.controller.BGPPeer;
+import org.onosproject.bgp.controller.BGPPeerCfg;
+import org.onosproject.bgp.controller.impl.BGPControllerImpl.BGPPeerManager;
+import org.onosproject.bgpio.exceptions.BGPParseException;
+import org.onosproject.bgpio.protocol.BGPMessage;
+//import org.onosproject.bgpio.protocol.BGPOpenMsg;
+import org.onosproject.bgpio.protocol.BGPType;
+import org.onosproject.bgpio.protocol.BGPVersion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Channel handler deals with the bgp peer connection and dispatches messages from peer to the appropriate locations.
  */
 class BGPChannelHandler extends IdleStateAwareChannelHandler {
 
-    // TODO: implement FSM and session handling mechanism
+    private static final Logger log = LoggerFactory.getLogger(BGPChannelHandler.class);
+
+    static final int BGP_MAX_KEEPALIVE_INTERVAL = 3;
+    private BGPPeer bgpPeer;
+    private BGPId thisbgpId;
+    Channel channel;
+    private BGPKeepAliveTimer keepAliveTimer = null;
+    private short peerHoldTime = 0;
+    private short negotiatedHoldTime = 0;
+    private short peerAsNum;
+    private int peerIdentifier;
+    private BGPPacketStatsImpl bgpPacketStats;
+    static final int MAX_WRONG_COUNT_PACKET = 5;
+
+    // State needs to be volatile because the HandshakeTimeoutHandler
+    // needs to check if the handshake is complete
+    private volatile ChannelState state;
+
+    // When a bgp peer with a ip addresss is found (i.e we already have a
+    // connected peer with the same ip), the new peer is immediately
+    // disconnected. At that point netty callsback channelDisconnected() which
+    // proceeds to cleaup peer state - we need to ensure that it does not cleanup
+    // peer state for the older (still connected) peer
+    private volatile Boolean duplicateBGPIdFound;
+    // Indicates the bgp version used by this bgp peer
+    protected BGPVersion bgpVersion;
+    private BGPControllerImpl bgpControllerImpl;
+    private BGPPeerManager peerManager;
+    private InetSocketAddress inetAddress;
+    private IpAddress ipAddress;
+    private SocketAddress address;
+    private String peerAddr;
+    private BGPCfg bgpconfig;
+
     /**
      * Create a new unconnected BGPChannelHandler.
      *
      * @param bgpCtrlImpl bgp controller implementation object
      */
     BGPChannelHandler(BGPControllerImpl bgpCtrlImpl) {
+        this.bgpControllerImpl = bgpCtrlImpl;
+        this.peerManager = bgpCtrlImpl.getPeerManager();
+        this.state = ChannelState.IDLE;
+        this.duplicateBGPIdFound = Boolean.FALSE;
+        this.bgpPacketStats = new BGPPacketStatsImpl();
+        this.bgpconfig = bgpCtrlImpl.getConfig();
     }
-}
\ No newline at end of file
+
+    // To disconnect peer session.
+    public void disconnectPeer() {
+        bgpPeer.disconnectPeer();
+    }
+
+    // *************************
+    // Channel State Machine
+    // *************************
+
+    /**
+     * The state machine for handling the peer/channel state. All state transitions should happen from within the state
+     * machine (and not from other parts of the code)
+     */
+    enum ChannelState {
+        /**
+         * Initial state before channel is connected.
+         */
+        IDLE(false) {
+
+        },
+
+        OPENSENT(false) {
+            @Override
+            void processBGPMessage(BGPChannelHandler h, BGPMessage m) throws IOException, BGPParseException {
+                log.debug("message received in OPENSENT state");
+                // check for OPEN message
+                if (m.getType() != BGPType.OPEN) {
+                    // When the message type is not keep alive message increment the wrong packet statistics
+                    h.processUnknownMsg();
+                    log.debug("Message is not OPEN message");
+                } else {
+                    log.debug("Sending keep alive message in OPENSENT state");
+                    h.bgpPacketStats.addInPacket();
+
+                    // TODO: initialize openmessage BGPOpenMsg pOpenmsg = (BGPOpenMsg) m;
+                    // TODO: initialize identifier from open messgae h.peerIdentifier = pOpenmsg.getBgpId();
+
+                    // validate capabilities and open msg
+                    if (h.openMsgValidation(h)) {
+                        log.debug("Sending handshake OPEN message");
+
+                        /*
+                         * RFC 4271, section 4.2: Upon receipt of an OPEN message, a BGP speaker MUST calculate the
+                         * value of the Hold Timer by using the smaller of its configured Hold Time and the Hold Time
+                         * received in the OPEN message
+                         */
+                        // TODO: initialize holdtime from open message h.peerHoldTime = pOpenmsg.getHoldTime();
+                        if (h.peerHoldTime < h.bgpconfig.getHoldTime()) {
+                            h.channel.getPipeline().replace("holdTime",
+                                                            "holdTime",
+                                                            new ReadTimeoutHandler(BGPPipelineFactory.TIMER,
+                                                                                   h.peerHoldTime));
+                        }
+
+                        log.info("Hold Time : " + h.peerHoldTime);
+
+                        // TODO: get AS number for open message update AS number
+                    }
+
+                    // Send keepalive message to peer.
+                    h.sendKeepAliveMessage();
+                    h.bgpPacketStats.addOutPacket();
+                    h.setState(OPENCONFIRM);
+                    h.bgpconfig.setPeerConnState(h.peerAddr, BGPPeerCfg.State.OPENCONFIRM);
+                }
+            }
+        },
+
+        OPENWAIT(false) {
+            @Override
+            void processBGPMessage(BGPChannelHandler h, BGPMessage m) throws IOException, BGPParseException {
+                log.debug("Message received in OPEN WAIT State");
+
+                // check for open message
+                if (m.getType() != BGPType.OPEN) {
+                    // When the message type is not open message increment the wrong packet statistics
+                    h.processUnknownMsg();
+                    log.debug("Message is not OPEN message");
+                } else {
+                    h.bgpPacketStats.addInPacket();
+
+                    // TODO: initialize open message BGPOpenMsg pOpenmsg = (BGPOpenMsg) m;
+
+                    // Validate open message
+                    if (h.openMsgValidation(h)) {
+                        log.debug("Sending handshake OPEN message");
+
+                        /*
+                         * RFC 4271, section 4.2: Upon receipt of an OPEN message, a BGP speaker MUST calculate the
+                         * value of the Hold Timer by using the smaller of its configured Hold Time and the Hold Time
+                         * received in the OPEN message
+                         */
+                        // TODO: get hold time from open message h.peerHoldTime = pOpenmsg.getHoldTime();
+                        if (h.peerHoldTime < h.bgpconfig.getHoldTime()) {
+                            h.channel.getPipeline().replace("holdTime",
+                                                            "holdTime",
+                                                            new ReadTimeoutHandler(BGPPipelineFactory.TIMER,
+                                                                                   h.peerHoldTime));
+                        }
+
+                        log.debug("Hold Time : " + h.peerHoldTime);
+
+                        //TODO: update AS number form open messsage update AS number
+
+                        h.sendHandshakeOpenMessage();
+                        h.bgpPacketStats.addOutPacket();
+                        h.setState(OPENCONFIRM);
+                    }
+                }
+            }
+        },
+
+        OPENCONFIRM(false) {
+            @Override
+            void processBGPMessage(BGPChannelHandler h, BGPMessage m) throws IOException, BGPParseException {
+                log.debug("Message received in OPENCONFIRM state");
+                // check for keep alive message
+                if (m.getType() != BGPType.KEEP_ALIVE) {
+                    // When the message type is not keep alive message handle the wrong packet
+                    h.processUnknownMsg();
+                    log.debug("Message is not KEEPALIVE message");
+                } else {
+
+                    // Set the peer connected status
+                    h.bgpPacketStats.addInPacket();
+                    log.debug("Sending keep alive message in OPENCONFIRM state");
+
+                    final InetSocketAddress inetAddress = (InetSocketAddress) h.address;
+                    h.thisbgpId = BGPId.bgpId(IpAddress.valueOf(inetAddress.getAddress()));
+
+                    h.bgpPeer = h.peerManager.getBGPPeerInstance(h.thisbgpId, h.bgpVersion, h.bgpPacketStats);
+                    // set the status fo bgp as connected
+                    h.bgpPeer.setConnected(true);
+                    h.bgpPeer.setChannel(h.channel);
+
+                    // set specific parameters to bgp peer
+                    h.bgpPeer.setBgpPeerVersion(h.bgpVersion);
+                    h.bgpPeer.setBgpPeerASNum(h.peerAsNum);
+                    h.bgpPeer.setBgpPeerHoldTime(h.peerHoldTime);
+                    h.bgpPeer.setBgpPeerIdentifier(h.peerIdentifier);
+
+                    h.negotiatedHoldTime = (h.peerHoldTime < h.bgpconfig.getHoldTime()) ? h.peerHoldTime : h.bgpconfig
+                            .getHoldTime();
+                    h.bgpPeer.setNegotiatedHoldTime(h.negotiatedHoldTime);
+                    /*
+                     * RFC 4271, When an OPEN message is received, sends a KEEPALIVE message, If the negotiated hold
+                     * time value is zero, then the HoldTimer and KeepaliveTimer are not started. A reasonable maximum
+                     * time between KEEPALIVE messages would be one third of the Hold Time interval.
+                     */
+                    h.sendKeepAliveMessage();
+
+                    if (h.negotiatedHoldTime != 0) {
+                        h.keepAliveTimer
+                            = new BGPKeepAliveTimer(h, (h.negotiatedHoldTime / BGP_MAX_KEEPALIVE_INTERVAL));
+                    }
+
+                    h.bgpPacketStats.addOutPacket();
+
+                    // set the state handshake completion.
+                    h.setHandshakeComplete(true);
+
+                    if (!h.peerManager.addConnectedPeer(h.thisbgpId, h.bgpPeer)) {
+                        /*
+                         * RFC 4271, Section 6.8, Based on the value of the BGP identifier, a convention is established
+                         * for detecting which BGP connection is to be preserved when a collision occurs. The convention
+                         * is to compare the BGP Identifiers of the peers involved in the collision and to retain only
+                         * the connection initiated by the BGP speaker with the higher-valued BGP Identifier..
+                         */
+                        // TODO: Connection collision handling.
+                        disconnectDuplicate(h);
+                    } else {
+                        h.setState(ESTABLISHED);
+                        h.bgpconfig.setPeerConnState(h.peerAddr, BGPPeerCfg.State.ESTABLISHED);
+                    }
+                }
+            }
+        },
+
+        ESTABLISHED(true) {
+            @Override
+            void processBGPMessage(BGPChannelHandler h, BGPMessage m) throws IOException, BGPParseException {
+                log.debug("Message received in established state " + m.getType());
+                // dispatch the message
+                h.dispatchMessage(m);
+            }
+        };
+
+        private boolean handshakeComplete;
+
+        ChannelState(boolean handshakeComplete) {
+            this.handshakeComplete = handshakeComplete;
+        }
+
+        /**
+         * Is this a state in which the handshake has completed?
+         *
+         * @return true if the handshake is complete
+         */
+        public boolean isHandshakeComplete() {
+            return this.handshakeComplete;
+        }
+
+        /**
+         * Disconnect duplicate peer connection.
+         *
+         * @param h channel handler
+         */
+        protected void disconnectDuplicate(BGPChannelHandler h) {
+            log.error("Duplicated BGP IP or incompleted cleanup - " + "" + "disconnecting channel {}",
+                      h.getPeerInfoString());
+            h.duplicateBGPIdFound = Boolean.TRUE;
+            h.channel.disconnect();
+        }
+
+        // set handshake completion status
+        public void setHandshakeComplete(boolean handshakeComplete) {
+            this.handshakeComplete = handshakeComplete;
+        }
+
+        void processBGPMessage(BGPChannelHandler bgpChannelHandler, BGPMessage pm)
+                throws IOException, BGPParseException {
+            // TODO Auto-generated method stub
+            log.debug("BGP message stub");
+        }
+
+    }
+
+    // *************************
+    // Channel handler methods
+    // *************************
+
+    @Override
+    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+
+        channel = e.getChannel();
+        log.info("BGP connected from {}", channel.getRemoteAddress());
+
+        address = channel.getRemoteAddress();
+        if (!(address instanceof InetSocketAddress)) {
+            throw new IOException("Invalid peer connection.");
+        }
+
+        // Connection should establish only if local ip and Autonomous system number is configured.
+        if (bgpconfig.getState() != BGPCfg.State.IP_AS_CONFIGURED) {
+            channel.close();
+            log.info("BGP local AS and router ID not configured");
+            return;
+        }
+
+        inetAddress = (InetSocketAddress) address;
+        ipAddress = IpAddress.valueOf(inetAddress.getAddress());
+        peerAddr = ipAddress.toString();
+
+        // if peer is not configured disconnect session
+        if (!bgpconfig.isPeerConfigured(peerAddr)) {
+            log.debug("Peer is not configured {}", peerAddr);
+            channel.close();
+            return;
+        }
+
+        // if connection is already established close channel
+        if (peerManager.isPeerConnected(peerAddr)) {
+            log.debug("Duplicate connection received, peer {}", peerAddr);
+            channel.close();
+            return;
+        }
+
+        if (null != channel.getPipeline().get("PassiveHandler")) {
+            log.info("BGP handle connection request from peer");
+            // Wait for open message from bgp peer
+            setState(ChannelState.OPENWAIT);
+        } else if (null != channel.getPipeline().get("ActiveHandler")) {
+            log.info("BGP handle connection response from peer");
+
+            sendHandshakeOpenMessage();
+            bgpPacketStats.addOutPacket();
+            setState(ChannelState.OPENSENT);
+            bgpconfig.setPeerConnState(peerAddr, BGPPeerCfg.State.OPENSENT);
+        }
+    }
+
+    @Override
+    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+
+        channel = e.getChannel();
+        log.info("BGP disconnected callback for bgp:{}. Cleaning up ...", getPeerInfoString());
+
+        address = channel.getRemoteAddress();
+        if (!(address instanceof InetSocketAddress)) {
+            throw new IOException("Invalid peer connection.");
+        }
+
+        inetAddress = (InetSocketAddress) address;
+        ipAddress = IpAddress.valueOf(inetAddress.getAddress());
+        peerAddr = ipAddress.toString();
+
+        if (thisbgpId != null) {
+            if (!duplicateBGPIdFound) {
+                // if the disconnected peer (on this ChannelHandler)
+                // was not one with a duplicate, it is safe to remove all
+                // state for it at the controller. Notice that if the disconnected
+                // peer was a duplicate-ip, calling the method below would clear
+                // all state for the original peer (with the same ip),
+                // which we obviously don't want.
+                log.debug("{}:removal called", getPeerInfoString());
+                if (bgpPeer != null) {
+                    peerManager.removeConnectedPeer(thisbgpId);
+                }
+            } else {
+                // A duplicate was disconnected on this ChannelHandler,
+                // this is the same peer reconnecting, but the original state was
+                // not cleaned up - XXX check liveness of original ChannelHandler
+                log.debug("{}:duplicate found", getPeerInfoString());
+                duplicateBGPIdFound = Boolean.FALSE;
+            }
+
+            if (null != keepAliveTimer) {
+                keepAliveTimer.getKeepAliveTimer().cancel();
+            }
+        } else {
+            log.warn("No bgp ip in channelHandler registered for " + "disconnected peer {}", getPeerInfoString());
+        }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+
+        log.info("[exceptionCaught]: " + e.toString());
+
+        if (e.getCause() instanceof ReadTimeoutException) {
+            if ((ChannelState.OPENWAIT == state) || (ChannelState.OPENSENT == state)) {
+
+                // When ReadTimeout timer is expired in OPENWAIT/OPENSENT state, it is considered
+                // TODO: Send notification
+                channel.close();
+                state = ChannelState.IDLE;
+                return;
+            } else if (ChannelState.OPENCONFIRM == state) {
+
+                // When ReadTimeout timer is expired in OPENCONFIRM state.
+                // TODO: Send Notification
+                channel.close();
+                state = ChannelState.IDLE;
+                return;
+            }
+        } else if (e.getCause() instanceof ClosedChannelException) {
+            log.debug("Channel for bgp {} already closed", getPeerInfoString());
+        } else if (e.getCause() instanceof IOException) {
+            log.error("Disconnecting peer {} due to IO Error: {}", getPeerInfoString(), e.getCause().getMessage());
+            if (log.isDebugEnabled()) {
+                // still print stack trace if debug is enabled
+                log.debug("StackTrace for previous Exception: ", e.getCause());
+            }
+            channel.close();
+        } else if (e.getCause() instanceof BGPParseException) {
+            // TODO: SEND NOTIFICATION
+            log.debug("BGP Parse Exception: ", e.getCause());
+        } else if (e.getCause() instanceof RejectedExecutionException) {
+            log.warn("Could not process message: queue full");
+        } else {
+            log.error("Error while processing message from peer " + getPeerInfoString() + "state " + this.state);
+            channel.close();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return getPeerInfoString();
+    }
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+        if (e.getMessage() instanceof List) {
+            @SuppressWarnings("Unchecked")
+            List<BGPMessage> msglist = (List<BGPMessage>) e.getMessage();
+            for (BGPMessage pm : msglist) {
+                // Do the actual packet processing
+                state.processBGPMessage(this, pm);
+            }
+        } else {
+            state.processBGPMessage(this, (BGPMessage) e.getMessage());
+        }
+    }
+
+    // *************************
+    // Channel utility methods
+    // *************************
+    /**
+     * Set handshake status.
+     *
+     * @param handshakeComplete handshake complete status
+     */
+    public void setHandshakeComplete(boolean handshakeComplete) {
+        this.state.setHandshakeComplete(handshakeComplete);
+    }
+
+    /**
+     * Is this a state in which the handshake has completed?
+     *
+     * @return true if the handshake is complete
+     */
+    public boolean isHandshakeComplete() {
+        return state.isHandshakeComplete();
+    }
+
+    /**
+     * To handle the BGP message.
+     *
+     * @param m BGP message
+     */
+    private void dispatchMessage(BGPMessage m) throws BGPParseException {
+        bgpPacketStats.addInPacket();
+        bgpControllerImpl.processBGPPacket(thisbgpId, m);
+    }
+
+    /**
+     * Return a string describing this peer based on the already available information (ip address and/or remote
+     * socket).
+     *
+     * @return display string
+     */
+    private String getPeerInfoString() {
+        if (bgpPeer != null) {
+            return bgpPeer.toString();
+        }
+        String channelString;
+        if (channel == null || channel.getRemoteAddress() == null) {
+            channelString = "?";
+        } else {
+            channelString = channel.getRemoteAddress().toString();
+        }
+        String bgpIpString;
+        // TODO: implement functionality to get bgp id string
+        bgpIpString = "?";
+        return String.format("[%s BGP-IP[%s]]", channelString, bgpIpString);
+    }
+
+    /**
+     * Update the channels state. Only called from the state machine. TODO: enforce restricted state transitions
+     *
+     * @param state
+     */
+    private void setState(ChannelState state) {
+        this.state = state;
+    }
+
+    /**
+     * get packet statistics.
+     *
+     * @return packet statistics
+     */
+    public BGPPacketStatsImpl getBgpPacketStats() {
+        return bgpPacketStats;
+    }
+
+    /**
+     * Send handshake open message to the peer.
+     *
+     * @throws IOException ,BGPParseException
+     */
+    private void sendHandshakeOpenMessage() throws IOException, BGPParseException {
+        // TODO: send open message.
+
+    }
+
+    /**
+     * Send keep alive message.
+     *
+     * @throws IOException when channel is disconnected
+     * @throws BGPParseException while building keep alive message
+     */
+    synchronized void sendKeepAliveMessage() throws IOException, BGPParseException {
+
+        // TODO: send keep alive message.
+    }
+
+    /**
+     * Send notification and close channel with peer.
+     */
+    private void sendErrNotificationAndCloseChannel() {
+        // TODO: send notification
+        channel.close();
+    }
+
+    /**
+     * Process unknown BGP message received.
+     *
+     * @throws BGPParseException when received invalid message
+     */
+    public void processUnknownMsg() throws BGPParseException {
+        log.debug("UNKNOWN message received");
+        Date now = null;
+        if (bgpPacketStats.wrongPacketCount() == 0) {
+            now = new Date();
+            bgpPacketStats.setTime(now.getTime());
+            bgpPacketStats.addWrongPacket();
+            sendErrNotificationAndCloseChannel();
+        }
+        if (bgpPacketStats.wrongPacketCount() > 1) {
+            Date lastest = new Date();
+            bgpPacketStats.addWrongPacket();
+            // converting to seconds
+            if (((lastest.getTime() - bgpPacketStats.getTime()) / 1000) > 60) {
+                now = lastest;
+                bgpPacketStats.setTime(now.getTime());
+                bgpPacketStats.resetWrongPacket();
+                bgpPacketStats.addWrongPacket();
+            } else if (((int) (lastest.getTime() - now.getTime()) / 1000) < 60) {
+                if (MAX_WRONG_COUNT_PACKET <= bgpPacketStats.wrongPacketCount()) {
+                    // reset once wrong packet count reaches MAX_WRONG_COUNT_PACKET
+                    bgpPacketStats.resetWrongPacket();
+                    // max wrong packets received send error message and close the session
+                    sendErrNotificationAndCloseChannel();
+                }
+            }
+        }
+    }
+
+    /**
+     * Open message validation.
+     *
+     * @param h channel handler
+     * @return true if validation succeed, otherwise false
+     * @throws BGPParseException when received invalid message
+     */
+    public boolean openMsgValidation(BGPChannelHandler h) throws BGPParseException {
+        // TODO: Open message validation.
+        return true;
+    }
+}