zoukankan      html  css  js  c++  java
  • Kafka简单使用

    package com.hgc.center.accounts.test;

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class KafkaUtils {


    /**
    * 生产者 发送消息
    * @param topic 通道名称
    * @param kafkaUrl kafka地址,多个地址用逗号隔开 127.0.1.2:9002,127.0.1.3:9002,
    * @param msg 发送的信息
    */
    public static void send(String topic,String kafkaUrl,String msg) {
    Properties p = new Properties();
    p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
    p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);
    try {
    ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);
    kafkaProducer.send(record);
    System.out.println("消息发送成功:" + msg);
    } finally {
    kafkaProducer.close();
    }
    }

    /**
    * 消费者代码
    * @param topic
    * @param kafkaUrl
    */
    public static void Consumer(String topic,String kafkaUrl) {
    Properties p = new Properties();
    p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
    p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    p.put(ConsumerConfig.GROUP_ID_CONFIG,topic);
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(p);
    kafkaConsumer.subscribe(Collections.singletonList(topic));// 订阅消息
    while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
    System.out.println(String.format("topic:%s,offset:%d,消息:%s", //
    record.topic(), record.offset(), record.value()));
    }
    }
    }

    }

    //POM包

    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
    </dependency>

  • 相关阅读:
    关于HTTP以及TCP
    .NetCore表单提交文件
    C# Out变量
    .NET Core 网络数据采集 -- 使用AngleSharp做html解析
    C# 根据Url下载文件/获取文件流
    C# 模拟表单提交
    C# 获取Url路径的参数信息
    C# 采集页面数据
    .net core 3.1 设置可跨域
    C# json字符串转化成Dictionary
  • 原文地址:https://www.cnblogs.com/Sora-L/p/11737132.html
Copyright © 2011-2022 走看看