zoukankan      html  css  js  c++  java
  • kafka发送消息的三种方式

    package com.zl.kafkademo;
     
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.quartz.*;
    import org.quartz.impl.StdSchedulerFactory;
     
    import java.util.Properties;
     
    /**
     * @Auther: le
     * @Date: 2019/4/23 22:05
     * @Description:
     */
    public class MyProducer implements Job {
        private static KafkaProducer<String,String> producer;
     
        static {
            Properties properties = new Properties();
            properties.put("bootstrap.servers","127.0.0.1:9092");
            properties.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer<String, String>(properties);
        }
     
        /**
         * 第一种直接发送,不管结果
         */
        private static void sendMessageForgetResult(){
            ProducerRecord<String,String> record = new ProducerRecord<String,String>(
                    "kafka-study","name","Forget_result"
            );
            producer.send(record);
            producer.close();
        }
     
        /**
         * 第二种同步发送,等待执行结果
         * @return
         * @throws Exception
         */
        private static RecordMetadata sendMessageSync() throws Exception{
            ProducerRecord<String,String> record = new ProducerRecord<String,String>(
                    "kafka-study","name","sync"
            );
            RecordMetadata result = producer.send(record).get();
            System.out.println(result.topic());
            System.out.println(result.partition());
            System.out.println(result.offset());
            return result;
        }
     
        /**
         * 第三种执行回调函数
         */
        private static void sendMessageCallback(){
            ProducerRecord<String,String> record = new ProducerRecord<String,String>(
                    "kafka-study","name","callback"
            );
            producer.send(record,new MyProducerCallback());
        }
     
        //定时任务
        @Override
        public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
            try {
                sendMessageSync();
            }catch (Exception e){
                System.out.println("error:"+e);
            }
     
        }
     
        private static class MyProducerCallback implements Callback{
     
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e !=null){
                    e.printStackTrace();
                    return;
                }
                System.out.println(recordMetadata.topic());
                System.out.println(recordMetadata.partition());
                System.out.println(recordMetadata.offset());
                System.out.println("Coming in MyProducerCallback");
            }
        }
     
     
        public static void main(String[] args){
            //sendMessageForgetResult();
            //sendMessageCallback();
            JobDetail job = JobBuilder.newJob(MyProducer.class).build();
     
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever()).build();
     
            try {
                Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
                scheduler.scheduleJob(job,trigger);
                scheduler.start();
            }catch (SchedulerException e){
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
     
        }
     
     
    }

    需要引入文件:

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.10.0.1</version>
            </dependency>
     
            <dependency>
                <groupId>org.quartz-scheduler</groupId>
                <artifactId>quartz</artifactId>
                <version>2.3.0</version>
            </dependency>
  • 相关阅读:
    JAVA编程心得-JAVA实现CRC-CCITT(XMODEM)算法
    自学PHP 环境搭建
    Postfix+Amavisd-new+Spamassassin+ClamAV整合安装
    安装Apache Felix OSGI Framework小记
    C#多线程
    使用maven进行测试设置断点调试的方法
    2016第33周四
    Spring配置文件头及xsd文件版本
    2016第33周二
    web中的重定向与转发
  • 原文地址:https://www.cnblogs.com/yoyowin/p/13402920.html
Copyright © 2011-2022 走看看