  • Kafka API operations (official documentation: http://kafka.apache.org/documentation/#producerapi)

    Basic Usage of the Kafka API

    This article uses the following dependencies (my own versions differ from these; look up the ones you need on Maven):

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>0.10.2.0</version>
    </dependency>

    A producer using the old, now-deprecated Scala client API:

    package com.yjsj.kafka;

    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class OldProducer {
        @SuppressWarnings("deprecation")
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put("metadata.broker.list", "master:9092");
            properties.put("request.required.acks", "1");
            properties.put("serializer.class", "kafka.serializer.StringEncoder");

            Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(properties));
            KeyedMessage<Integer, String> message = new KeyedMessage<Integer, String>("first", "hello world");
            producer.send(message);
        }
    }

    A producer using the new org.apache.kafka.clients API:

    package com.yjsj.kafka;

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class NewProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Hostname and port of the Kafka broker(s)
            props.put("bootstrap.servers", "master:9092");
            // Wait for acknowledgement from all replicas
            props.put("acks", "all");
            // Maximum number of send retries
            props.put("retries", 0);
            // Batch size in bytes
            props.put("batch.size", 16384);
            // How long to wait before sending a batch
            props.put("linger.ms", 1);
            // Total memory available for buffering unsent records
            props.put("buffer.memory", 33554432);
            // Key serializer
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Value serializer
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 50; i++) {
                producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), "hello world-" + i));
            }
            producer.close();
        }
    }
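
    Note that send() is asynchronous and returns a Future<RecordMetadata>. As a small sketch of my own (not from the original post), blocking on that future turns the call into a synchronous send and surfaces broker errors immediately; it assumes the same producer as in NewProducer above, plus imports of java.util.concurrent.Future and org.apache.kafka.clients.producer.RecordMetadata:

    // Sketch: synchronous send by blocking on the returned Future
    Future<RecordMetadata> future = producer.send(
            new ProducerRecord<String, String>("first", "key1", "hello sync"));
    try {
        // get() blocks until the broker acknowledges the record (or the send fails)
        RecordMetadata metadata = future.get();
        System.out.println("partition=" + metadata.partition() + ", offset=" + metadata.offset());
    } catch (Exception e) {
        e.printStackTrace();
    }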

    A producer that registers a callback on each send; the snippet reuses the basic configuration from NewProducer above:

    package com.yjsj.kafka;

    import java.util.Properties;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class CallbackProducer {
        public static void main(String[] args) {
            // Same basic configuration as NewProducer above
            Properties props = new Properties();
            props.put("bootstrap.servers", "master:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
            for (int i = 0; i < 50; i++) {
                kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        // metadata is non-null on success and carries the partition and offset
                        if (metadata != null) {
                            System.out.println(metadata.partition() + "---" + metadata.offset());
                        }
                    }
                });
            }
            kafkaProducer.close();
        }
    }
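
    Since Callback declares a single method, on Java 8 the same callback can be written as a lambda. This shorthand (mine, not from the original post) is a drop-in replacement for the send call inside the loop above, and also logs failures:

    kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i),
            (metadata, exception) -> {
                // exception is non-null if and only if the send failed
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.println(metadata.partition() + "---" + metadata.offset());
                }
            });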

    A custom partitioner that routes every record to partition 0:

    package com.yjsj.kafka;

    import java.util.Map;
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;

    public class CustomPartitioner implements Partitioner {
        @Override
        public void configure(Map<String, ?> configs) {
        }

        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            // Route everything to partition 0
            return 0;
        }

        @Override
        public void close() {
        }
    }
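
    Always returning 0 is only useful as a demonstration. A more realistic variant (a sketch of my own, with the hypothetical name HashPartitioner, not from the original post) spreads keyed records across all of the topic's partitions by hashing the key bytes:

    package com.yjsj.kafka;

    import java.util.Arrays;
    import java.util.Map;
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;

    public class HashPartitioner implements Partitioner {
        @Override
        public void configure(Map<String, ?> configs) {
        }

        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            int numPartitions = cluster.partitionCountForTopic(topic);
            if (keyBytes == null) {
                // Records without a key all go to partition 0
                return 0;
            }
            // Mask off the sign bit so the modulo result is never negative
            return (Arrays.hashCode(keyBytes) & Integer.MAX_VALUE) % numPartitions;
        }

        @Override
        public void close() {
        }
    }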

    A producer that registers the custom partitioner:

    package com.yjsj.kafka;

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class PartitionerProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Hostname and port of the Kafka broker(s)
            props.put("bootstrap.servers", "node1:9092");
            // Wait for acknowledgement from all replicas
            props.put("acks", "all");
            // Maximum number of send retries
            props.put("retries", 0);
            // Batch size in bytes
            props.put("batch.size", 16384);
            // How long to wait before sending a batch
            props.put("linger.ms", 1);
            // Total memory available for buffering unsent records
            props.put("buffer.memory", 33554432);
            // Key serializer
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Value serializer
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Register the custom partitioner
            props.put("partitioner.class", "com.yjsj.kafka.CustomPartitioner");

            Producer<String, String> producer = new KafkaProducer<>(props);
            producer.send(new ProducerRecord<String, String>("first", "1", "hadoop"));
            producer.close();
        }
    }

    A consumer using the new consumer API:

    package com.yjsj.kafka.consume;

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class CustomNewConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Address of the Kafka cluster; you do not need to list every broker
            props.put("bootstrap.servers", "master:9092");
            // Specify the consumer group
            props.put("group.id", "test");
            // Whether to commit offsets automatically
            props.put("enable.auto.commit", "true");
            // Interval between automatic offset commits
            props.put("auto.commit.interval.ms", "1000");
            // Key deserializer class
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // Value deserializer class
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            // Create the consumer
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            // A consumer can subscribe to several topics at once
            consumer.subscribe(Arrays.asList("first", "second", "third"));
            while (true) {
                // Poll for data, waiting at most 100 ms
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
            }
        }
    }
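
    The consumer above commits offsets automatically, so a record can be counted as consumed before it has actually been processed. A common variation (my sketch, not from the original post) disables auto-commit and commits synchronously only after each batch has been handled, giving at-least-once semantics:

    package com.yjsj.kafka.consume;

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ManualCommitConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "master:9092");
            props.put("group.id", "test");
            // Turn off auto-commit so offsets advance only after processing
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("first"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
                }
                // Synchronously commit the offsets returned by the last poll,
                // only after the whole batch has been processed
                consumer.commitSync();
            }
        }
    }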

    A Kafka Streams application that pipes records from one topic through a processor into another:

    package com.yjsj.kafka.stream;

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorSupplier;
    import org.apache.kafka.streams.processor.TopologyBuilder;

    public class Application {
        public static void main(String[] args) {
            // Input topic
            String from = "first";
            // Output topic
            String to = "second";

            // Streams configuration
            Properties settings = new Properties();
            settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
            settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092");
            StreamsConfig config = new StreamsConfig(settings);

            // Build the topology: source -> processor -> sink
            TopologyBuilder builder = new TopologyBuilder();
            builder.addSource("SOURCE", from)
                    .addProcessor("PROCESS", new ProcessorSupplier<byte[], byte[]>() {
                        @Override
                        public Processor<byte[], byte[]> get() {
                            // The actual record processing
                            return new LogProcessor();
                        }
                    }, "SOURCE")
                    .addSink("SINK", to, "PROCESS");

            // Create and start the streams instance
            KafkaStreams streams = new KafkaStreams(builder, config);
            streams.start();
        }
    }

    The concrete processing logic:

    package com.yjsj.kafka.stream;

    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class LogProcessor implements Processor<byte[], byte[]> {
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public void process(byte[] key, byte[] value) {
            String input = new String(value);
            // If the value contains ">>>", keep only the content after the marker
            if (input.contains(">>>")) {
                input = input.split(">>>")[1].trim();
            }
            // Forward the (possibly trimmed) record to the downstream sink
            context.forward("logProcessor".getBytes(), input.getBytes());
        }

        @Override
        public void punctuate(long timestamp) {
        }

        @Override
        public void close() {
        }
    }
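
    To see the processor in action, feed a marked message into the input topic; a minimal sketch, assuming the same producer configuration (props) as in NewProducer above:

    // Sketch: the Streams app forwards only the text after ">>>" to topic "second"
    KafkaProducer<String, String> producer = new KafkaProducer<>(props); // props as in NewProducer
    producer.send(new ProducerRecord<String, String>("first", "2019-03-20 10:00:00>>>order created"));
    // LogProcessor trims the marker, so topic "second" receives "order created"
    producer.close();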

  • Original post: https://www.cnblogs.com/pursue339/p/10565392.html