/**
 *    Copyright 2011, Big Switch Networks, Inc.
 *    Originally created by David Erickson, Stanford University
 *
 *    Licensed under the Apache License, Version 2.0 (the "License"); you may
 *    not use this file except in compliance with the License. You may obtain
 *    a copy of the License at
 *
 *         http://www.apache.org/licenses/LICENSE-2.0
 *
 *    Unless required by applicable law or agreed to in writing, software
 *    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 *    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 *    License for the specific language governing permissions and limitations
 *    under the License.
 **/

package net.floodlightcontroller.core;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.openflow.protocol.OFFlowMod;
import org.openflow.protocol.OFMessage;
import org.openflow.protocol.OFPacketIn;
import org.openflow.protocol.OFPacketOut;
import org.openflow.protocol.OFType;
import org.openflow.util.HexString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.ArrayList;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;

import net.floodlightcontroller.core.annotations.LogMessageCategory;
import net.floodlightcontroller.core.annotations.LogMessageDoc;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.core.module.FloodlightModuleException;
import net.floodlightcontroller.core.module.IFloodlightModule;
import net.floodlightcontroller.core.module.IFloodlightService;
import net.floodlightcontroller.packet.Ethernet;
import net.floodlightcontroller.packetstreamer.thrift.*;
import net.floodlightcontroller.threadpool.IThreadPoolService;

@LogMessageCategory("OpenFlow Message Tracing")
public class OFMessageFilterManager 
        implements IOFMessageListener, IFloodlightModule, IOFMessageFilterManagerService {

    /**
     * @author Srini
     */
    protected static Logger log = LoggerFactory.getLogger(OFMessageFilterManager.class);

    // The port and client reference for packet streaming
    protected int serverPort = 9090;
    protected final int MaxRetry = 1;
    protected static TTransport transport = null;
    protected static PacketStreamer.Client packetClient = null;

    protected IFloodlightProviderService floodlightProvider = null;
    protected IThreadPoolService threadPool = null;
    // filter List is a key value pair.  Key is the session id, 
    // value is the filter rules.
    protected ConcurrentHashMap<String, 
                                ConcurrentHashMap<String,
                                                  String>> filterMap = null;
    protected ConcurrentHashMap<String, Long> filterTimeoutMap = null;
    protected Timer timer = null;

    protected int MAX_FILTERS=5;
    protected long MAX_FILTER_TIME= 300000; // maximum filter time is 5 minutes.
    protected int TIMER_INTERVAL = 1000;  // 1 second time interval.

    public static final String SUCCESS                     = "0";
    public static final String FILTER_SETUP_FAILED         = "-1001"; 
    public static final String FILTER_NOT_FOUND            = "-1002";
    public static final String FILTER_LIMIT_REACHED        = "-1003";
    public static final String FILTER_SESSION_ID_NOT_FOUND = "-1004";
    public static final String SERVICE_UNAVAILABLE         = "-1005";

    public enum FilterResult {
        /*
         * FILTER_NOT_DEFINED: Filter is not defined
         * FILTER_NO_MATCH:    Filter is defined and the packet doesn't 
         *                     match the filter
         * FILTER_MATCH:       Filter is defined and the packet matches
         *                     the filter
         */
        FILTER_NOT_DEFINED, FILTER_NO_MATCH, FILTER_MATCH
    }

    protected String addFilter(ConcurrentHashMap<String,String> f, long delta) {

        // Create unique session ID.  
        int prime = 33791;
        String s = null;
        int i;

        if ((filterMap == null) || (filterTimeoutMap == null))
            return  String.format("%d", FILTER_SETUP_FAILED);

        for (i=0; i<MAX_FILTERS; ++i) {
            Integer x = prime + i;
            s = String.format("%d", x.hashCode());
            // implies you can use this key for session id.    
            if (!filterMap.containsKey(s)) break; 
        }

        if (i==MAX_FILTERS) {
            return FILTER_LIMIT_REACHED;
        }

        filterMap.put(s, f);
        if (filterTimeoutMap.containsKey(s))  filterTimeoutMap.remove(s);
        filterTimeoutMap.put(s, delta);

        // set the timer as there will be no existing timers. 
        if (filterMap.size() == 1) { 
            TimeoutFilterTask task = new TimeoutFilterTask(this);
            Timer timer = new Timer();
            timer.schedule (task, TIMER_INTERVAL);                
            // Keep the listeners to avoid race condition
            //startListening();
        }   
        return s;  // the return string is the session ID.
    }

    public String setupFilter(String sid, 
                              ConcurrentHashMap<String,String> f, 
                              int deltaInMilliSeconds) {

        if (sid == null) {
            // Delta in filter needs to be milliseconds
            log.debug("Adding new filter: {} for {} ms", f, deltaInMilliSeconds);
            return addFilter(f, deltaInMilliSeconds);
        } else {// this is the session id.
            // we will ignore the hash map features.
            if (deltaInMilliSeconds > 0)  
                return refreshFilter(sid, deltaInMilliSeconds);
            else 
                return deleteFilter(sid);
        }
    }

    public int timeoutFilters() {                
        Iterator<String> i = filterTimeoutMap.keySet().iterator();

        while(i.hasNext()) {
            String s = i.next();

            Long t = filterTimeoutMap.get(s);
            if (t != null) {
                i.remove();
                t -= TIMER_INTERVAL;
                if (t > 0) {
                    filterTimeoutMap.put(s, t);
                } else deleteFilter(s);
            } else deleteFilter(s);
        }
        return filterMap.size();
    }

    protected String refreshFilter(String s, int delta) {
        Long t = filterTimeoutMap.get(s);
        if (t != null) {
            filterTimeoutMap.remove(s);
            t += delta;  // time is in milliseconds
            if (t > MAX_FILTER_TIME) t = MAX_FILTER_TIME;
            filterTimeoutMap.put(s, t);
            return SUCCESS;
        } else return FILTER_SESSION_ID_NOT_FOUND;
    }

    @LogMessageDoc(level="ERROR",
                   message="Error while terminating packet " +
                           "filter session",
                   explanation="An unknown error occurred while terminating " +
                   		"a packet filter session.",
                   recommendation=LogMessageDoc.GENERIC_ACTION)
    protected String deleteFilter(String sessionId) {

        if (filterMap.containsKey(sessionId)) {
            filterMap.remove(sessionId);
            try {
                if (packetClient != null)
                    packetClient.terminateSession(sessionId);
            } catch (TException e) {
                log.error("Error while terminating packet " +
                		  "filter session", e);
            }
            log.debug("Deleted Filter {}.  # of filters" +
            		 " remaining: {}", sessionId, filterMap.size());
            return SUCCESS;
        } else return FILTER_SESSION_ID_NOT_FOUND;
    }

    public HashSet<String> getMatchedFilters(OFMessage m, FloodlightContext cntx) {  

        HashSet<String> matchedFilters = new HashSet<String>();

        // This default function is written to match on packet ins and 
        // packet outs.
        Ethernet eth = null;

        if (m.getType() == OFType.PACKET_IN) {
            eth = IFloodlightProviderService.bcStore.get(cntx, 
                    IFloodlightProviderService.CONTEXT_PI_PAYLOAD);
        } else if (m.getType() == OFType.PACKET_OUT) {
            eth = new Ethernet();
            OFPacketOut p = (OFPacketOut) m;
            
            // No MAC match if packetOut doesn't have the packet.
            if (p.getPacketData() == null) return null;
            
            eth.deserialize(p.getPacketData(), 0, p.getPacketData().length);
        } else if (m.getType() == OFType.FLOW_MOD) {
            // flow-mod can't be matched by mac.
            return null;
        }

        if (eth == null) return null;

        Iterator<String> filterIt = filterMap.keySet().iterator();
        while (filterIt.hasNext()) {   // for every filter
            boolean filterMatch = false;
            String filterSessionId = filterIt.next();
            Map<String,String> filter = filterMap.get(filterSessionId);

            // If the filter has empty fields, then it is not considered as a match.
            if (filter == null || filter.isEmpty()) continue;                  
            Iterator<String> fieldIt = filter.keySet().iterator();
            while (fieldIt.hasNext()) {   
                String filterFieldType = fieldIt.next();
                String filterFieldValue = filter.get(filterFieldType);
                if (filterFieldType.equals("mac")) {

                    String srcMac = HexString.toHexString(eth.getSourceMACAddress());
                    String dstMac = HexString.toHexString(eth.getDestinationMACAddress());
                    log.debug("srcMac: {}, dstMac: {}", srcMac, dstMac);

                    if (filterFieldValue.equals(srcMac) || 
                            filterFieldValue.equals(dstMac)){
                        filterMatch = true; 
                    } else {
                        filterMatch = false;
                        break;
                    }
                }
            }
            if (filterMatch) {
                matchedFilters.add(filterSessionId);
            }
        }

        if (matchedFilters.isEmpty())
            return null;    
        else 
            return matchedFilters;
    }
    
    @LogMessageDoc(level="ERROR",
                   message="Failed to establish connection with the " +
                           "packetstreamer server.",
                   explanation="The message tracing server is not running " +
                   		"or otherwise unavailable.",
                   recommendation=LogMessageDoc.CHECK_CONTROLLER)
    public boolean connectToPSServer() {
        int numRetries = 0;
        if (transport != null && transport.isOpen()) {
            return true;
        }

        while (numRetries++ < MaxRetry) {
            try {
                transport = new TFramedTransport(new TSocket("localhost", 
                                                             serverPort));
                transport.open();

                TProtocol protocol = new  TBinaryProtocol(transport);
                packetClient = new PacketStreamer.Client(protocol);

                log.debug("Have a connection to packetstreamer server " +
                		  "localhost:{}", serverPort);
                break;
            } catch (TException x) {
                try {
                    // Wait for 1 second before retry
                    if (numRetries < MaxRetry) {
                        Thread.sleep(1000);
                    }
                } catch (Exception e) {}
            } 
        }

        if (numRetries > MaxRetry) {
            log.error("Failed to establish connection with the " +
            		  "packetstreamer server.");
            return false;
        }
        return true;
    }

    public void disconnectFromPSServer() {
        if (transport != null && transport.isOpen()) {
            log.debug("Close the connection to packetstreamer server" +
            		  " localhost:{}", serverPort);
            transport.close();
        }
    }

    @Override
    public String getName() {
        return "messageFilterManager";
    }

    @Override
    public boolean isCallbackOrderingPrereq(OFType type, String name) {
        return (type == OFType.PACKET_IN && name.equals("devicemanager"));
    }

    @Override
    public boolean isCallbackOrderingPostreq(OFType type, String name) {
        return (type == OFType.PACKET_IN && name.equals("learningswitch"));
    }

    @Override
    @LogMessageDoc(level="ERROR",
                   message="Error while sending packet",
                   explanation="Failed to send a message to the message " +
                   		"tracing server",
                   recommendation=LogMessageDoc.CHECK_CONTROLLER)
    public Command receive(IOFSwitch sw, OFMessage msg, 
                           FloodlightContext cntx) {

        if (filterMap == null || filterMap.isEmpty()) return Command.CONTINUE;

        HashSet<String> matchedFilters = null;
        if (log.isDebugEnabled()) {
            log.debug("Received packet {} from switch {}", 
                      msg, sw.getStringId());
        }

        matchedFilters = getMatchedFilters(msg, cntx);
        if (matchedFilters == null) {
            return Command.CONTINUE;
        } else {
            try {
                sendPacket(matchedFilters, sw, msg, cntx, true);
            } catch (Exception e) {
                log.error("Error while sending packet", e);
            }
        }
        
        return Command.CONTINUE;
    }


    public class TimeoutFilterTask extends TimerTask {

        OFMessageFilterManager filterManager;
        ScheduledExecutorService ses = threadPool.getScheduledExecutor();

        public TimeoutFilterTask(OFMessageFilterManager manager) {
            filterManager = manager;
        }

        public void run() {
            int x = filterManager.timeoutFilters();

            if (x > 0) {  // there's at least one filter still active.
                Timer timer = new Timer();
                timer.schedule(new TimeoutFilterTask(filterManager), 
                               TIMER_INTERVAL);
            } else {
                // Don't stop the listener to avoid race condition
                //stopListening();
            }
        }
    }

    public int getNumberOfFilters() {
        return filterMap.size();
    }

    public int getMaxFilterSize() {
        return MAX_FILTERS;
    }

    protected void sendPacket(HashSet<String> matchedFilters, IOFSwitch sw, 
            OFMessage msg, FloodlightContext cntx, boolean sync) 
                    throws TException {
        Message sendMsg = new Message();
        Packet packet = new Packet();
        ChannelBuffer bb;
        sendMsg.setPacket(packet);

        List<String> sids = new ArrayList<String>(matchedFilters);

        sendMsg.setSessionIDs(sids);
        packet.setMessageType(OFMessageType.findByValue((msg.getType().ordinal())));

        switch (msg.getType()) {
            case PACKET_IN:
                OFPacketIn pktIn = (OFPacketIn)msg;
                packet.setSwPortTuple(new SwitchPortTuple(sw.getId(), 
                                                          pktIn.getInPort()));
                bb = ChannelBuffers.buffer(pktIn.getLength());
                pktIn.writeTo(bb);
                packet.setData(OFMessage.getData(sw, msg, cntx));
                break;
            case PACKET_OUT:
                OFPacketOut pktOut = (OFPacketOut)msg;
                packet.setSwPortTuple(new SwitchPortTuple(sw.getId(), 
                                                          pktOut.getInPort()));
                bb = ChannelBuffers.buffer(pktOut.getLength());
                pktOut.writeTo(bb);
                packet.setData(OFMessage.getData(sw, msg, cntx));
                break;
            case FLOW_MOD:
                OFFlowMod offlowMod = (OFFlowMod)msg;
                packet.setSwPortTuple(new SwitchPortTuple(sw.getId(), 
                                                          offlowMod.
                                                          getOutPort()));
                bb = ChannelBuffers.buffer(offlowMod.getLength());
                offlowMod.writeTo(bb);
                packet.setData(OFMessage.getData(sw, msg, cntx));
                break;
            default:
                packet.setSwPortTuple(new SwitchPortTuple(sw.getId(), 
                                                          (short)0));
                String strData = "Unknown packet";
                packet.setData(strData.getBytes());
                break;
        }

        try {
            if (transport == null || 
                !transport.isOpen() || 
                packetClient == null) {
                if (!connectToPSServer()) {
                    // No need to sendPacket if can't make connection to 
                    // the server
                    return;
                }
            }
            if (sync) {
                log.debug("Send packet sync: {}", packet.toString());
                packetClient.pushMessageSync(sendMsg);
            } else {
                log.debug("Send packet sync: ", packet.toString());
                packetClient.pushMessageAsync(sendMsg);
            }
        } catch (Exception e) {
            log.error("Error while sending packet", e);
            disconnectFromPSServer();
            connectToPSServer();
        }
    }

    // IFloodlightModule methods
    
    @Override
    public Collection<Class<? extends IFloodlightService>> getModuleServices() {
        Collection<Class<? extends IFloodlightService>> l = 
                new ArrayList<Class<? extends IFloodlightService>>();
        l.add(IOFMessageFilterManagerService.class);
        return l;
    }

    @Override
    public Map<Class<? extends IFloodlightService>, IFloodlightService>
            getServiceImpls() {
        Map<Class<? extends IFloodlightService>,
        IFloodlightService> m = 
            new HashMap<Class<? extends IFloodlightService>,
                        IFloodlightService>();
        // We are the class that implements the service
        m.put(IOFMessageFilterManagerService.class, this);
        return m;
    }

    @Override
    public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
        Collection<Class<? extends IFloodlightService>> l = 
                new ArrayList<Class<? extends IFloodlightService>>();
        l.add(IFloodlightProviderService.class);
        l.add(IThreadPoolService.class);
        return l;
    }

    @Override
    public void init(FloodlightModuleContext context) 
            throws FloodlightModuleException {
        this.floodlightProvider = 
                context.getServiceImpl(IFloodlightProviderService.class);
        this.threadPool =
                context.getServiceImpl(IThreadPoolService.class);
    }

    @Override
    public void startUp(FloodlightModuleContext context) {
        // This is our 'constructor'
        
        filterMap = new ConcurrentHashMap<String, ConcurrentHashMap<String,String>>();
        filterTimeoutMap = new ConcurrentHashMap<String, Long>();
        serverPort = 
                Integer.parseInt(System.getProperty("net.floodlightcontroller." +
                		"packetstreamer.port", "9090"));
        
        floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
        floodlightProvider.addOFMessageListener(OFType.PACKET_OUT, this);
        floodlightProvider.addOFMessageListener(OFType.FLOW_MOD, this);
    }
}
