zoukankan      html  css  js  c++  java
  • Kafka简单使用

    package com.hgc.center.accounts.test;

    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    /**
     * Minimal helpers for producing and consuming String messages with the Kafka client.
     *
     * <p>Both helpers build a fresh client from the given bootstrap address on every call
     * and print their results to the console.
     */
    public class KafkaUtils {

        /**
         * Sends a single message to a topic and waits for the send to complete.
         *
         * <p>A new producer is created for the call and closed before returning. The success
         * line is printed only after the pending send has completed; a failed or interrupted
         * send is reported on {@code System.err} instead of being silently ignored.
         *
         * @param topic    name of the topic to publish to
         * @param kafkaUrl Kafka bootstrap address; separate several addresses with commas,
         *                 e.g. {@code 127.0.1.2:9002,127.0.1.3:9002}
         * @param msg      message payload to send (sent as the record value, without a key)
         */
        public static void send(String topic, String kafkaUrl, String msg) {
            Properties p = new Properties();
            p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
            p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

            // try-with-resources closes the producer on every exit path.
            try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p)) {
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
                // send() only hands back a Future; wait on it so that a failed send
                // is not reported as a success below.
                kafkaProducer.send(record).get();
                System.out.println("消息发送成功:" + msg);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can still observe the interruption.
                Thread.currentThread().interrupt();
                System.err.println("消息发送被中断:" + msg);
            } catch (ExecutionException e) {
                // The real failure is carried as the cause of the ExecutionException.
                System.err.println("消息发送失败:" + msg + ",原因:" + e.getCause());
            }
        }

        /**
         * Subscribes to a topic and prints every received record to standard output.
         *
         * <p>The method loops forever and only leaves when the consumer throws; the consumer
         * is closed on the way out. The topic name is also used as the consumer group id.
         *
         * @param topic    name of the topic to subscribe to (also used as the group id)
         * @param kafkaUrl Kafka bootstrap address; separate several addresses with commas
         */
        public static void Consumer(String topic, String kafkaUrl) {
            Properties p = new Properties();
            p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
            p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            p.put(ConsumerConfig.GROUP_ID_CONFIG, topic);

            // try-with-resources makes sure the consumer is closed if subscribe/poll throws;
            // the original never closed it.
            try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(p)) {
                kafkaConsumer.subscribe(Collections.singletonList(topic)); // subscribe to the topic
                while (true) {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(String.format("topic:%s,offset:%d,消息:%s", //
                                record.topic(), record.offset(), record.value()));
                    }
                }
            }
        }

    }

    // Maven 依赖(pom.xml)

    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
    </dependency>

  • 相关阅读:
    Struts2多文件上传
    Struts2单文件上传
    java验证码
    spring-day01
    Spring MVC篇一、搭建Spring MVC框架
    连接oracle数据库
    spring 核心技术
    Spring的特点
    spring连接数据库
    oracle学习第六天
  • 原文地址:https://www.cnblogs.com/Sora-L/p/11737132.html
Copyright © 2011-2022 走看看