zoukankan      html  css  js  c++  java
  • kafka 0.8.x producer Example(scala)

    Producer

    最简配置
    metadata.broker.list参数指定broker地址,这里不需要填上所有的broker地址,但是如果只写一个,这个broker挂掉后就无法往topic中写入信息,一般写入2-3个broker地址。
    serializer.class指定序列化的方式

    props.put("metadata.broker.list","broker1:9092,broker2:9092,broker3:9092")
    props.put("serializer.class","kafka.serializer.StringEncoder")

    producer
    两个类型参数,第一个为partition key类型,第二个为消息类型

    val producer = new Producer[String,String] (config)

    发送消息
    KeyedMessage的两个参数,第一个为要写入的topic名字,第二个为要写入的消息。

    val date = new KeyedMessage[String, String] ("kafka-spark-test", "testInfo")
    producer.send (date)

    完整代码

    import java.util.Properties
    import kafka.javaapi.producer.Producer
    import kafka.producer.KeyedMessage
    import kafka.producer.ProducerConfig
    
    object kafka_producer {
      def main(args: Array[String]) {
        val props = new Properties()
        props.put("metadata.broker.list", "broker1:9092,broker2:9092,broker3:9092")
        props.put("serializer.class", "kafka.serializer.StringEncoder")
        props.put("request.required.acks", "1")
    
        val config = new ProducerConfig(props);
    
        val producer = new Producer[String, String](config)
    
        val date = new KeyedMessage[String, String]("kafka-spark-test", "testInfo")
    
        producer.send(date)
    
        producer.close
    
      }
    }

    Tip

    如果运行时发现如下错误:

    log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
    log4j:WARN Please initialize the log4j system properly.

    log4j.properties加入到src下

  • 相关阅读:
    进程、线程、轻量级进程、协程与 go 的 goroutine
    Base: 一种 Acid 的替代方案
    单点登录 SSO(Single Sign-On)的实现原理
    大型网站之分布式会话管理
    PayPal 高级工程总监:读完这 100 篇文献,就能成大数据高手
    主流编程语言的 33 款开源爬虫
    docker基础命令
    mysql实现首字母从A-Z排序
    solr+zookeeper集群配置
    Lucene与Solr基础
  • 原文地址:https://www.cnblogs.com/xiaomaohai/p/6158047.html
Copyright © 2011-2022 走看看