zoukankan      html  css  js  c++  java
  • [Kafka]

    根据业务需要可以使用Kafka提供的Java Producer API进行产生数据,并将产生的数据发送到Kafka对应Topic的对应分区中,入口类为:Producer

    Kafka的Producer API主要提供下列三个方法:

      public void send(KeyedMessage<K,V> message) 发送单条数据到Kafka集群

      public void send(List<KeyedMessage<K,V>> messages) 发送多条数据(数据集)到Kafka集群

      public void close() 关闭Kafka连接资源

    使用Java语言实现Kafka的Consumer详见博客: Java 实现 High Level Consumer API 以及 Java实现LowerLevelConsumerAPI

    ======================================================================

    一、JavaKafkaProducerPartitioner:自定义的数据分区器,功能是:决定输入的key/value键值对的message发送到Topic的那个分区中,返回分区id,范围:[0,分区数量); 这里的实现比较简单,根据key中的数字决定分区的值。具体代码如下:

    import kafka.producer.Partitioner;
    import kafka.utils.VerifiableProperties;
    
    /**
     * Created by gerry on 12/21.
     */
    public class JavaKafkaProducerPartitioner implements Partitioner {
    
        /**
         * 无参构造函数
         */
        public JavaKafkaProducerPartitioner() {
            this(new VerifiableProperties());
        }
    
        /**
         * 构造函数,必须给定
         *
         * @param properties 上下文
         */
        public JavaKafkaProducerPartitioner(VerifiableProperties properties) {
            // nothings
        }
    
        @Override
        public int partition(Object key, int numPartitions) {
            int num = Integer.valueOf(((String) key).replaceAll("key_", "").trim());
            return num % numPartitions;
        }
    }

    二、 JavaKafkaProducer:通过Kafka提供的API进行数据产生操作的测试类;具体代码如下:

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    import org.apache.log4j.Logger;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.ThreadLocalRandom;
    
    /**
     * Created by gerry on 12/21.
     */
    public class JavaKafkaProducer {
        private Logger logger = Logger.getLogger(JavaKafkaProducer.class);
        public static final String TOPIC_NAME = "test";
        public static final char[] charts = "qazwsxedcrfvtgbyhnujmikolp1234567890".toCharArray();
        public static final int chartsLength = charts.length;
    
    
        public static void main(String[] args) {
            String brokerList = "192.168.187.149:9092";
            brokerList = "192.168.187.149:9092,192.168.187.149:9093,192.168.187.149:9094,192.168.187.149:9095";
            brokerList = "192.168.187.146:9092";
            Properties props = new Properties();
            props.put("metadata.broker.list", brokerList);
            /**
             * 0表示不等待结果返回<br/>
             * 1表示等待至少有一个服务器返回数据接收标识<br/>
             * -1表示必须接收到所有的服务器返回标识,及同步写入<br/>
             * */
            props.put("request.required.acks", "0");
            /**
             * 内部发送数据是异步还是同步
             * sync:同步, 默认
             * async:异步
             */
            props.put("producer.type", "async");
            /**
             * 设置序列化的类
             * 可选:kafka.serializer.StringEncoder
             * 默认:kafka.serializer.DefaultEncoder
             */
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            /**
             * 设置分区类
             * 根据key进行数据分区
             * 默认是:kafka.producer.DefaultPartitioner ==> 安装key的hash进行分区
             * 可选:kafka.serializer.ByteArrayPartitioner ==> 转换为字节数组后进行hash分区
             */
            props.put("partitioner.class", "JavaKafkaProducerPartitioner");
    
            // 重试次数
            props.put("message.send.max.retries", "3");
    
            // 异步提交的时候(async),并发提交的记录数
            props.put("batch.num.messages", "200");
    
            // 设置缓冲区大小,默认10KB
            props.put("send.buffer.bytes", "102400");
    
            // 2. 构建Kafka Producer Configuration上下文
            ProducerConfig config = new ProducerConfig(props);
    
            // 3. 构建Producer对象
            final Producer<String, String> producer = new Producer<String, String>(config);
    
            // 4. 发送数据到服务器,并发线程发送
            final AtomicBoolean flag = new AtomicBoolean(true);
            int numThreads = 50;
            ExecutorService pool = Executors.newFixedThreadPool(numThreads);
            for (int i = 0; i < 5; i++) {
                pool.submit(new Thread(new Runnable() {
                    @Override
                    public void run() {
                        while (flag.get()) {
                            // 发送数据
                            KeyedMessage message = generateKeyedMessage();
                            producer.send(message);
                            System.out.println("发送数据:" + message);
    
                            // 休眠一下
                            try {
                                int least = 10;
                                int bound = 100;
                                Thread.sleep(ThreadLocalRandom.current().nextInt(least, bound));
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
    
                        System.out.println(Thread.currentThread().getName() + " shutdown....");
                    }
                }, "Thread-" + i));
    
            }
    
            // 5. 等待执行完成
            long sleepMillis = 600000;
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            flag.set(false);
    
            // 6. 关闭资源
    
            pool.shutdown();
            try {
                pool.awaitTermination(6, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
            } finally {
                producer.close(); // 最后之后调用
            }
        }
    
        /**
         * 产生一个消息
         *
         * @return
         */
        private static KeyedMessage<String, String> generateKeyedMessage() {
            String key = "key_" + ThreadLocalRandom.current().nextInt(10, 99);
            StringBuilder sb = new StringBuilder();
            int num = ThreadLocalRandom.current().nextInt(1, 5);
            for (int i = 0; i < num; i++) {
                sb.append(generateStringMessage(ThreadLocalRandom.current().nextInt(3, 20))).append(" ");
            }
            String message = sb.toString().trim();
            return new KeyedMessage(TOPIC_NAME, key, message);
        }
    
        /**
         * 产生一个给定长度的字符串
         *
         * @param numItems
         * @return
         */
        private static String generateStringMessage(int numItems) {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < numItems; i++) {
                sb.append(charts[ThreadLocalRandom.current().nextInt(chartsLength)]);
            }
            return sb.toString();
        }
    }

    三、Pom.xml依赖配置如下

    <properties>
        <kafka.version>0.8.2.1</kafka.version>
    </properties>
    
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>${kafka.version}</version>
        </dependency>
    </dependencies>
  • 相关阅读:
    ArrayList源码剖析
    Qt线程外使用Sleep
    malloc、calloc和realloc比较
    C++各大名库
    Qt 编译boost
    VC++ 设置控件显示文本的前景色、背景色以及字体
    std::map的操作:插入、修改、删除和遍历
    time.h文件中包含的几个函数使用时须注意事项
    赋值操作符和拷贝构造函数
    virtual析构函数的作用
  • 原文地址:https://www.cnblogs.com/liuming1992/p/6433055.html
Copyright © 2011-2022 走看看