zoukankan      html  css  js  c++  java
  • 大数据离线分析平台 JavaSDK数据收集引擎编写

    JavaSDK设计规则

     

     JavaSDK提供两个事件触发方法,分别为onChargeSuccess和onChargeRefund。我们在java sdk中通过一个单独的线程来发送线程数据,这样可以减少对业务系统的延时性。

    SDK测试

      启动集群上的hdfs+nginx+flume进程,通过模拟数据的发送然后将数据发送到nginx服务器中,查看最终是否在hdfs中有数据的写入。

    命令:

       start-dfs.sh: 启动hdfs命令

       su root:切换用户

       service nginx restart: 启动nginx进程

       启动flume进程:

           进入flume安装根目录,执行命令:


    flume-ng agent --conf ./conf/ --conf-file ./conf/test2.conf --name agent &


     工程目录结构

    AnalyticsEngineSDK如下:
    package com.kk.ae.sdk;
    
    import java.io.UnsupportedEncodingException;
    import java.net.URLEncoder;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    /*
     * 分析引擎sdk java服务器数据收集
     * */
    public class AnalyticsEngineSDK {
        
        //日志记录对象
        private static final Logger log=Logger.getGlobal();
        //请求url的主体部分
        public static final String accessUrl="http://hadoop-001:8090/kkImg.gif";
        public static final String platformName="java_server";
        public static final String sdkName="jdk";
        private static final String version = "1";
        /**
         * 触发订单支付成功事件,发送事件数据到服务器
         * 
         * @param orderId
         *            订单支付id
         * @param memberIdd
         *            订单支付会员id
         * @return 如果发送数据成功(加入到发送队列中),那么返回true;否则返回false(参数异常&添加到发送队列失败).
         * @throws InterruptedException 
         */
        public static boolean chargeSuccess(String orderId,String memberId) throws InterruptedException {
            
            if (orderId!=null&&!orderId.isEmpty()&&memberId!=null&&!memberId.isEmpty()) {
                Map<String, String> map=new HashMap<String,String>();
                map.put("u_mid", memberId);
                map.put("oid", orderId);
                map.put("c_time", String.valueOf(System.currentTimeMillis()));
                map.put("ver", version);
                map.put("en", "e_cs");
                map.put("p1", platformName);
                map.put("sdk", sdkName);
                
                //创建url
                String url= buildUrl(map);
                // 发送url&将url加入到队列
                SendDataMonitor.addSendUrl(url);
                System.out.println(url);
                return true;
            } else {
                log.log(Level.WARNING, "订单id和会员id不能为空");
                return false;
            }
            
        }
        /**
         * 触发订单退款事件,发送退款数据到服务器
         * 
         * @param orderId
         *            退款订单id
         * @param memberIdd
         *            退款会员id
         * @return 如果发送数据成功,返回true。否则返回false。
         * @throws InterruptedException 
         */
        public static boolean chargeRefund(String orderId,String memberId) throws InterruptedException {
            if (orderId!=null&&!orderId.isEmpty()&&memberId!=null&&!memberId.isEmpty()) {
                Map<String, String> map=new HashMap<String,String>();
                map.put("u_mid", memberId);
                map.put("oid", orderId);
                map.put("c_time", String.valueOf(System.currentTimeMillis()));
                map.put("ver", version);
                map.put("en", "e_cr");
                map.put("p1", platformName);
                map.put("sdk", sdkName);
                
                //创建url
                String url= buildUrl(map);
                // 发送url&将url加入到队列
                SendDataMonitor.addSendUrl(url);
                System.out.println(url);
                return true;
            } else {
                log.log(Level.WARNING, "订单id和会员id不能为空");
                return false;
            }
            
        }
        private static String buildUrl(Map<String, String> map) {
            
            StringBuffer stringBuffer=new StringBuffer();
            stringBuffer.append(accessUrl).append("?");
            for(Map.Entry<String, String> entry:map.entrySet()) {
                if (entry.getKey()!=null&&!entry.getKey().isEmpty()&&entry.getValue()!=null&&!entry.getValue().isEmpty()) {
                    {
                        try {
                            stringBuffer.append(entry.getKey().trim()).append("=").append(URLEncoder.encode(entry.getValue().trim(),"utf-8")).append("&");
                        } catch (UnsupportedEncodingException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                        
                    }
                }
                
            }
            return stringBuffer.substring(0, stringBuffer.length() - 1);
        }
            
        
        
    }
    SendDataMonitor 如下:
    package com.kk.ae.sdk;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.MalformedURLException;
    import java.net.ProtocolException;
    import java.net.URL;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    /**
     * 发送url数据的监控者,用于启动一个单独的线程来发送数据
     * 
     * @author gerry
     *
     */
    public class SendDataMonitor {
        //收集日志
        public static final Logger log=Logger.getGlobal();
        // 队列,用户存储发送url
        public static final BlockingQueue<String> queue=new LinkedBlockingQueue<String>();
        //用于单例的一个类对象
        private static SendDataMonitor monitor=null;
        
        private SendDataMonitor() {
            // 私有构造方法,进行单列模式的创建
        }
        
        
        public static SendDataMonitor getMonitor() {
            if (monitor==null) {
                synchronized (SendDataMonitor.class) {
                    if (monitor==null) {
                        monitor=new SendDataMonitor();
                        Thread thread=new Thread(new Runnable() {
                            
                            @Override
                            public void run() {
                            // TODO Auto-generated method stub
                            SendDataMonitor.monitor.run();    
                            
                            }
                        });
                        thread.start();
                    }    
                }
            } 
            return monitor;
        }
    
    
        protected void run() {
            while (true) {
                try {
                    String url=this.queue.take();
                    // 正式的发送url
                    HttpRequestUtil.sendData(url);
                } catch (Throwable e) {
                    log.log(Level.WARNING, "发送url异常", e);
                }    
            }
        }
    
    
        public static void setMonitor(SendDataMonitor monitor) {
            SendDataMonitor.monitor = monitor;
            
        }
    
    
        /**
         * 添加一个url到队列中去
         * 
         * @param url
         * @throws InterruptedException
         */
        public static void addSendUrl(String url) throws InterruptedException {
             getMonitor().queue.put(url);
        
        }
        /**
         * 内部类,用户发送数据的http工具类
         * 
         * @author gerry
         *
         */
        public static class HttpRequestUtil{
            /**
             * 具体发送url的方法
             * 
             * @param url
             * @throws IOException
             */
            public static void sendData(String url) throws IOException {
                HttpURLConnection con=null;
                BufferedReader bf=null;
                try {
                    URL obj=new URL(url);
                    con=(HttpURLConnection) obj.openConnection();
                    // 设置连接参数
                    con.setConnectTimeout(5000);//连接过期时间
                    con.setReadTimeout(5000);//读取数据过期时间
                    con.setRequestMethod("GET");//设置请求类型为get
                    System.out.println("发送url:" + url);
                    // 发送连接请求
                    bf=new BufferedReader(new InputStreamReader(con.getInputStream()));
                    
                } finally {
                    try {
                        if (bf!=null) {
                            bf.close();
                            
                        }
                    } catch (Throwable e) {
                        // TODO: handle exception
                        
                    }
                    try {
                        con.disconnect();
                    } catch (Throwable e) {
                        // TODO: handle exception
                    }
                }
            }
        
        }
    
    }

    测试类:

    package com.kk.ae.sdk;
    
    public class Test {
        
    public static void main(String[] args) {
        try {
            AnalyticsEngineSDK.chargeSuccess("order3516", "0958");
            AnalyticsEngineSDK.chargeRefund("kk3", "9009");
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    }
  • 相关阅读:
    nginx学习编译安装(1)
    媒体查询
    web前端开发--超链接
    web前端开发--列表
    web前端开发--格式化文本与段落
    DIV与SPAN
    CSS基础
    表的创建
    数据库存储结构
    关系完整性约束
  • 原文地址:https://www.cnblogs.com/Transkai/p/10724020.html
Copyright © 2011-2022 走看看