  • HttpServer sends data to Kafka

    Contents

    1. Requirements

    2. Architecture and flow diagrams

    3. Code structure

    4. Code listing

    ———————————————————————-

    1. Requirements

    1.1 Parse the request path and use its last segment as the appkey (a client sketch follows this list);
    1.2 Data caching: when Kafka cannot be reached, cache the data in a local cache directory;
    1.3 Security validation: check that the requesting appkey is legitimate (signature verification is still to be decided);
    1.4 Automatic appkey refresh: fetch the latest appkey list at a fixed interval;
    1.5 Add an ip field to every record;
    1.6 Logging: record the main statistics as well as exceptions and errors.
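
    As a quick illustration of requirement 1.1, the hedged sketch below posts two newline-delimited JSON records to the /mininfo/v1/logs/<appkey> endpoint using Apache HttpClient (already a dependency in the pom.xml at the end of this post). The host, the port (20000 comes from the sample application.conf) and the appkey value "demo_appkey" are assumptions made for this example only.

    import org.apache.http.client.methods.CloseableHttpResponse;
    import org.apache.http.client.methods.HttpPost;
    import org.apache.http.entity.ContentType;
    import org.apache.http.entity.StringEntity;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClients;
    import org.apache.http.util.EntityUtils;

    public class LogClientDemo {
        public static void main(String[] args) throws Exception {
            // "demo_appkey" is a placeholder; it must exist in the appkey table for the request to pass validation
            String url = "http://127.0.0.1:20000/mininfo/v1/logs/demo_appkey";
            // the server splits the request body on newlines, so each line is one JSON record
            String body = "{\"event\":\"click\",\"uid\":1}\n{\"event\":\"view\",\"uid\":2}";
            try (CloseableHttpClient client = HttpClients.createDefault()) {
                HttpPost post = new HttpPost(url);
                post.setEntity(new StringEntity(body, ContentType.APPLICATION_JSON));
                try (CloseableHttpResponse resp = client.execute(post)) {
                    System.out.println(EntityUtils.toString(resp.getEntity()));
                }
            }
        }
    }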

    2. Architecture and flow diagrams

    [Architecture diagram]

    [Flow diagram]

    3. Code structure

    [Code structure screenshot]

    4. Code listing

    Configuration.java

    package com.donews.data;
    
    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;
    
    /**
     * Created by yuhui on 16-6-23.
     */
    public class Configuration {
       public static  final Config conf= ConfigFactory.load();
    }
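
    Configuration is only a thin holder around Typesafe Config: ConfigFactory.load() reads application.conf (shown near the end of this post) from the classpath once, and the rest of the code pulls typed values from the shared conf object. A minimal sketch of how those values are read, assuming the sample application.conf below is on the classpath:

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    import java.util.concurrent.TimeUnit;

    public class ConfigDemo {
        public static void main(String[] args) {
            Config conf = ConfigFactory.load();      // the same call Configuration makes
            int port = conf.getInt("server.port");   // 20000 in the sample application.conf
            // "30s" is parsed as a duration and converted to milliseconds
            long delayMs = conf.getDuration("server.counter.delay", TimeUnit.MILLISECONDS);
            String servers = conf.getString("kafka.bootstrap.servers");
            System.out.println(port + " " + delayMs + " " + servers);
        }
    }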

    Counter.java

    package com.donews.data;
    
    
    import io.vertx.core.Vertx;
    import io.vertx.core.logging.Logger;
    import io.vertx.core.logging.LoggerFactory;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;
    
    /**
     * Created by yuhui on 16-6-22.
     */
    public class Counter {
        private Logger LOG = LoggerFactory.getLogger(Counter.class);
        AtomicLong messages = new AtomicLong(0L);
        AtomicLong bytes = new AtomicLong(0L);
        private long start = System.currentTimeMillis();
    
        private void reset() {
            messages.set(0L);
            bytes.set(0L);
            start = System.currentTimeMillis();
        }
    
        /***
         * Logs throughput statistics at a fixed interval. Sample output:
         * Feb 14, 2017 3:49:53 PM com.donews.data.Counter
         * INFO: start Counter
         * Feb 14, 2017 3:49:54 PM com.donews.data.Counter
         * INFO: start Counter
         * Feb 14, 2017 3:49:55 PM com.donews.data.Counter
         * INFO: start Counter
         * Feb 14, 2017 3:49:56 PM com.donews.data.Counter
         * INFO: start Counter
         * @param vertx the Vert.x instance whose periodic timer drives the logging
         */
        public void start(Vertx vertx) {
            LOG.info("start Counter");
            long delay = Configuration.conf.getDuration("server.counter.delay", TimeUnit.MILLISECONDS);
            vertx.setPeriodic(delay, h -> {
                long time = System.currentTimeMillis() - start;
                double rps = messages.get() * 1000.0 / time;
                double mbps = (bytes.get() * 1000.0 / 1024.0 / 1024.0) / time;
                Runtime runtime = Runtime.getRuntime();
                double totalMem = runtime.totalMemory() * 1.0 / 1024 / 1024;
                double maxMem = runtime.maxMemory() * 1.0 / 1024 / 1024;
                double freeMem = runtime.freeMemory() * 1.0 / 1024 / 1024;
                LOG.info("{0}:Message/S, {1}:MBytes/S", rps, mbps);
                LOG.info("totalMem:{0}MB maxMem:{1}MB freeMem:{2}MB", totalMem, maxMem, freeMem);
                reset();
            });
        }
    
    }
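
    Counter is driven by KafkaHttpServer below: every message that reaches Kafka bumps the two atomic counters, and the periodic timer logs the message rate, byte rate and JVM memory, then resets. A minimal standalone sketch of that wiring (the message size is made up for the example; the class sits in the same package because the counters are package-private):

    package com.donews.data;

    import io.vertx.core.Vertx;

    public class CounterDemo {
        public static void main(String[] args) {
            Vertx vertx = Vertx.vertx();
            Counter counter = new Counter();
            counter.start(vertx);                 // logs every server.counter.delay (30s in the sample config)
            // simulate one 256-byte message being accepted, as KafkaHttpServer does per successful send
            counter.messages.incrementAndGet();
            counter.bytes.addAndGet(256);
        }
    }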
    

    KafkaHttpServer.java

    package com.donews.data;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import io.vertx.core.Vertx;
    import io.vertx.core.http.HttpServer;
    import io.vertx.core.http.HttpServerResponse;
    import io.vertx.core.json.JsonArray;
    import io.vertx.core.json.JsonObject;
    import io.vertx.core.logging.Logger;
    import io.vertx.core.logging.LoggerFactory;
    import io.vertx.ext.web.Router;
    import io.vertx.ext.web.RoutingContext;
    import io.vertx.ext.web.handler.BodyHandler;
    
    import java.io.*;
    import java.sql.*;
    import java.time.Instant;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.Timer;
    import java.util.TimerTask;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class KafkaHttpServer {
        private static final Logger LOG = LoggerFactory.getLogger(KafkaHttpServer.class);
        private final Counter statistic = new Counter();
        private static final String DBDRIVER = "com.mysql.jdbc.Driver";
        private static final String URL = Configuration.conf.getString("mysql.url");
        private static final String USER = Configuration.conf.getString("mysql.user");
        private static final String PASSWORD = Configuration.conf.getString("mysql.password");
        private static HashSet<String> appkeys = new HashSet<>();
        private static boolean deleteFile = true;
    
        private void error(HttpServerResponse response, String message) {
            response.setStatusCode(500).end(new JsonObject()
                    .put("code", 3)
                    .put("msg", message)
                    .encode());
        }
    
        private void ok(HttpServerResponse response, String message) {
            response.putHeader("Access-Control-Allow-Origin", "*");
            response.setStatusCode(200).end(new JsonObject()
                    .put("code", 0)
                    .put("msg", message)
                    .encode());
        }
    
        private void startService(int port) {
            KafkaProducerWrapper sender = new KafkaProducerWrapper();
            Vertx vertx = Vertx.vertx();
            HttpServer server = vertx.createHttpServer();
            Router router = Router.router(vertx);
            router.route().handler(BodyHandler.create());
            //POST request: http://192.168.1.10:10002/mininfo/logs
            //router.route registers the route; /mininfo/logs acts like a named endpoint on the router
            router.route("/mininfo/logs").handler(ctx -> {
                try {
                    JsonArray array = ctx.getBodyAsJsonArray();
                    String[] messages = new String[array.size()];
                    for (int i = 0; i < array.size(); i++) {
                        JsonObject message = array.getJsonObject(i);
                        message.put("ip", ctx.request().remoteAddress().host());
                        if (!message.containsKey("timestamp")) {
                            message.put("timestamp", Instant.now().toString());
                        }
                        messages[i] = array.getJsonObject(i).encode();
                    }
                    sendMessages(sender, ctx, "appstatistic_production", messages);
                } catch (Exception e) {
                    error(ctx.response(), e.getMessage());
                }
            });
            router.routeWithRegex("/mininfo/v1/logs/[^/]+").handler(routingContext -> {
                String path = routingContext.request().path();
                String topic = path.substring(path.lastIndexOf("/") + 1);
                LOG.info("如今处理的topic(appkey)为:" + topic);
                if (appkeys.contains(topic)) {
                    LOG.info("经过验证,该topic(appkey)有效");
                    String[] messages = routingContext.getBodyAsString().split("
    ");
                    //用于运行堵塞任务(有序运行和无序运行),默认顺序运行提交的堵塞任务
                    vertx.executeBlocking(future -> {
                        sendMessages(sender, routingContext, topic, messages);
                        future.complete();
                    }, result -> {
                    });
                } else {
                    LOG.info("您的topic(appkey)还没有配置,请在mysql中配置先");
                    error(routingContext.response(), "please configurate " + topic + "(appkey) in Mysql first! After 10mins it`ll take action");
                }
            });
            router.route("/mininfo/v1/ip").handler(ctx -> {
                LOG.info("x-real-for" + ctx.request().getHeader("x-real-for"));
                LOG.info("x-forwarded-for" + ctx.request().getHeader("x-forwarded-for"));
                ok(ctx.response(), ctx.request().getHeader("x-forwarded-for"));
            });
            router.route("/*").handler(ctx -> error(ctx.response(), "wrong! check your path..."));
            server.requestHandler(router::accept).listen(port, result -> {
                if (result.succeeded()) {
                    LOG.info("listen on port:{0}", String.valueOf(port));
                    this.statistic.start(vertx);
                } else {
                    LOG.error(result.cause());
                    vertx.close();
                }
            });
            //If you need to do something before the program shuts down, a shutdown hook is very useful, similar to a finally block
            Runtime.getRuntime().addShutdownHook(new Thread(sender::close));
        }
    
        private void sendMessages(KafkaProducerWrapper sender, RoutingContext ctx, String topic, String[] messages) {
            AtomicInteger counter = new AtomicInteger(0);
            for (String message : messages) {
                if (message == null || "".equals(message)) {
                    ok(ctx.response(), "Success");
                    continue;
                }
                //add the client ip to the record's ip field
                JSONObject jsonObject = JSON.parseObject(message);
                if (jsonObject.get("ip") == null) {
                    LOG.info("正在添加ip字段");
                    String ip;
                    String header = ctx.request().getHeader("x-forwarded-for");
                    if (!(header == null || header.trim().length() == 0 || header.trim().equals("null"))) {
                        ip = header.split(",")[0];
                    } else {
                        ip = ctx.request().remoteAddress().host();
                    }
                    jsonObject.put("ip", ip);
                    LOG.info("ip添加成功");
                }
                //topic, message, callback; the Callback interface's onCompletion method is implemented as a lambda
                sender.send(topic, jsonObject.toString(), (metadata, exception) -> {
                    if (exception != null) {
                        LOG.warn(exception);
                        String msg = new JsonObject()
                                .put("error", exception.getMessage())
                                .put("commit", counter.get())
                                .encode();
                        error(ctx.response(), msg);
                        cacheLocal(jsonObject.toString(), "/home/lihui/httpkafka/data_bak/" + topic + ".txt");
                        LOG.info("连接kafka失败,写入cache缓存文件夹以备份数据");
                    } else {
                        statistic.messages.incrementAndGet();  // Counter
                        statistic.bytes.addAndGet(message.length());
                        if (counter.incrementAndGet() == messages.length) {
                            ok(ctx.response(), "Success");
                        }
                    }
                });
            }
        }
    
        /**
         * Cache locally any message that failed to be sent to Kafka
         *
         * @param message   the message to cache
         * @param cachePath path of the cache file to append to
         */
        private void cacheLocal(String message, String cachePath) {
            try {
                FileWriter fileWriter = new FileWriter(cachePath, true);
                BufferedWriter bw = new BufferedWriter(fileWriter);
                bw.write(message);
                bw.newLine();
                bw.flush();
                bw.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * Send cached data to Kafka; on success the cached data is deleted, on failure it is retried 10 minutes later
         *
         * @param path directory holding the cached data files
         */
        private static void sendToKafka(String path) {
            String message;
            KafkaProducerWrapper sender = new KafkaProducerWrapper();
            File file = new File(path);
            if (file.isDirectory()) {
                String[] fileList = file.list();
                if (fileList != null && fileList.length != 0) {
                    LOG.info("正在将缓存文件夹中的备份数据发送到kafka中...");
                    for (String str : fileList) {
                        String topic = str.split("\.")[0];
                        try {
                            BufferedReader reader = new BufferedReader(new FileReader(path + str));
                            while ((message = reader.readLine()) != null) {
                                sender.send(topic, message, (metadata, exception) -> {
                                    if (metadata != null) {
                                        LOG.info("缓存的备份数据正在一条一条的插入kafka中");
                                    } else {
                                        //on failure the file is kept, so the data is retried on the next scheduled run
                                        //exception.printStackTrace();
                                        LOG.error("Kafka connection error; will retry automatically in 10 minutes: " + exception.getMessage(), exception);
                                        deleteFile = false;
                                    }
                                });
                            }
                            if (deleteFile) {
                                LOG.info("開始删除已经插入到kafka中的缓存备份数据");
                                deleteFile(path, topic);
                                LOG.info("删除完成。");
                            }
                            reader.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                } else {
                    LOG.info("缓存文件夹中没有备份文件");
                }
            }
        }
    
        private static void deleteFile(String path, String appkey) {
            String appkeyPath = path + "/" + appkey + ".txt";
            File file = new File(appkeyPath);
            file.delete();
            LOG.info("成功删除appkey为" + appkey + "的缓存数据");
        }
    
        private static Set<String> getAppkeys() {
            Set<String> appkeys = new HashSet<>();
            String sql = "select appkey from service_config_yarn_properties_table";
            try {
                Class.forName(DBDRIVER);
                Connection conn = DriverManager.getConnection(URL, USER, PASSWORD);
                PreparedStatement ps = conn.prepareStatement(sql);
                ResultSet rs = ps.executeQuery();
                while (rs.next()) {
                    appkeys.add(rs.getString(1));
                }
                rs.close();
                conn.close();
            } catch (ClassNotFoundException | SQLException e) {
                e.printStackTrace();
            }
            return appkeys;
        }
    
        public static void main(String[] args) throws Exception {
            Timer timer = new Timer();
            //1. Every ten minutes, check whether the cache directory holds data; 2. sync appkeys from the database for security validation
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    appkeys.addAll(getAppkeys());
                    LOG.info("同步完数据库中的appkey(每隔十分钟)");
                    sendToKafka("/home/lihui/httpkafka/data_bak/");
    //              sendToKafka("C:\Dell\UpdatePackage\log");
                }
            }, 0L, 10 * 60 * 1000L);
    
            //main thread
            try {
                int port = Configuration.conf.getInt("server.port");
                KafkaHttpServer front = new KafkaHttpServer();
                front.startService(port);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
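
    getAppkeys() expects a MySQL table named service_config_yarn_properties_table with an appkey column inside the user_privileges database referenced in application.conf. The post does not show the table's DDL, so the sketch below, which creates the table and seeds one hypothetical appkey over plain JDBC, is only an assumption about its shape (the column type and the seeded value "demo_appkey" are made up):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class AppkeyTableSetup {
        public static void main(String[] args) throws Exception {
            // URL and credentials are placeholders; use the values from your application.conf
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://127.0.0.1:3306/user_privileges", "XXX", "XXX");
                 Statement st = conn.createStatement()) {
                st.executeUpdate("CREATE TABLE IF NOT EXISTS service_config_yarn_properties_table ("
                        + "appkey VARCHAR(64) PRIMARY KEY)");
                // seed a hypothetical appkey so POST /mininfo/v1/logs/demo_appkey passes validation
                st.executeUpdate("INSERT IGNORE INTO service_config_yarn_properties_table (appkey) "
                        + "VALUES ('demo_appkey')");
            }
        }
    }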
    

    KafkaProducerWrapper.java

    package com.donews.data;
    
    import com.typesafe.config.Config;
    import io.vertx.core.logging.Logger;
    import io.vertx.core.logging.LoggerFactory;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    
    import java.util.Properties;
    
    /**
     * Created by yuhui on 16-6-22.
     *
     * Wraps a KafkaProducer; messages are published through the send() method.
     */
    public class KafkaProducerWrapper {
        private Logger LOG = LoggerFactory.getLogger(KafkaProducerWrapper.class);
        private KafkaProducer<String, String> producer = init();
    
        private KafkaProducer<String, String> init() {
            Config conf = Configuration.conf.getConfig("kafka");
            Properties props = new Properties();
            props.put("bootstrap.servers", conf.getString("bootstrap.servers"));
            props.put("acks", conf.getString("acks"));
            props.put("retries", conf.getInt("retries"));
            props.put("batch.size", conf.getInt("batch.size"));
            props.put("linger.ms", conf.getInt("linger.ms"));
            props.put("buffer.memory", conf.getLong("buffer.memory"));
            props.put("key.serializer", conf.getString("key.serializer"));
            props.put("value.serializer", conf.getString("value.serializer"));
            LOG.info("KafkaProducer Properties: {0}", props.toString());
            return new KafkaProducer<>(props);
        }
    
        public void send(String topic, String message, Callback callback) {
            producer.send(new ProducerRecord<>(topic, message), callback);
        }
    
        public void close() {
            producer.close();
            LOG.info("Kafka Producer Closed");
        }
    
        public static void main(String[] args) {
            //KafkaProducerWrapper sender=new KafkaProducerWrapper();
            //sender.producer.partitionsFor("xxxxx").forEach(System.out::println);
        }
    }
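
    The commented-out main() above only lists partitions. A minimal smoke test of the wrapper might look like the sketch below; it assumes the brokers configured in application.conf are reachable and that a topic named "demo_topic" exists (both are placeholders):

    package com.donews.data;

    public class ProducerSmokeTest {
        public static void main(String[] args) throws InterruptedException {
            KafkaProducerWrapper sender = new KafkaProducerWrapper();
            // "demo_topic" is a placeholder; point it at a topic that exists on your brokers
            sender.send("demo_topic", "{\"hello\":\"kafka\"}", (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.println("partition=" + metadata.partition() + " offset=" + metadata.offset());
                }
            });
            Thread.sleep(1000);   // give the asynchronous send a moment to complete before closing
            sender.close();
        }
    }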
    

    application.conf

    server {
      port = 20000
      counter.delay = 30s
    }
    kafka {
      bootstrap.servers = "XXX"
      acks = all
      retries = 1
      batch.size = 1048576
      linger.ms = 1
      buffer.memory = 33554432
      key.serializer = "org.apache.kafka.common.serialization.StringSerializer"
      value.serializer = "org.apache.kafka.common.serialization.StringSerializer"
    }
    mysql {
      url = "jdbc:mysql://XXX/user_privileges"
      user = "XXX"
      password = "XXX"
    }

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.donews.data</groupId>
        <artifactId>kafkahttp</artifactId>
        <version>1.0-SNAPSHOT</version>
        <dependencies>
            <dependency>
                <groupId>com.typesafe</groupId>
                <artifactId>config</artifactId>
                <version>1.3.0</version>
            </dependency>
            <dependency>
                <groupId>io.vertx</groupId>
                <artifactId>vertx-web</artifactId>
                <version>3.2.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.9.0.1</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>6.0.2</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.11</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.3.3</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    
    </project>

    If you enjoyed this post and feel you got a lot out of it, please consider a small donation so that I stay motivated to keep writing high-quality articles. Thank you for reading!

    WeChat

    [WeChat payment QR code]
