zoukankan      html  css  js  c++  java
  • Kafka 0.8 Producer (0.9以前版本适用)

    Kafka旧版本producer由scala编写,0.9以后已经废除,但是很多公司还在使用0.9以前的版本,所以总结如下:
    要注意包Producer是 kafka.javaapi.producer.Producer 这个才是java api使用的包

    示例代码如下:

    import kafka.producer.KeyedMessage;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.ProducerConfig;
    import java.util.Properties;
    
    public class ProducerDemo {
    
    	public static void main(String[] args) {
    
    		Properties properties = new Properties();
    		properties.put("metadata.broker.list", "kafka01:9092,kafka02:9092");
    		properties.put("serializer.class", "kafka.serializer.StringEncoder");
    		properties.put("request.requird.acks", "1");
    		ProducerConfig config = new ProducerConfig(properties);
    		Producer<String, String> producer = new Producer<String, String>(config);
    		KeyedMessage<String,String> msg = new KeyedMessage<String,String>("topic","key","hello");
    		producer.send(msg);
    	}
    	
    }
    

    自定义partition示例代码如下:

    import kafka.producer.Partitioner;
    import kafka.utils.VerifiableProperties;
    
    public class SimplePartitioner implements Partitioner {
        public SimplePartitioner (VerifiableProperties props) {
    
        }
    
        public int partition(Object key, int a_numPartitions) {
            int partition = 0;
            String stringKey = (String) key;
            int offset = stringKey.lastIndexOf('.');
            if (offset > 0) {
                partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions;
            }
            return partition;
        }
    
    }
    

    更多实时计算,Kafka等相关技术博文,欢迎关注实时流式计算

  • 相关阅读:
    和Mac有关的所有快捷键整理
    Python学习笔记
    在PHP的AWS SDK 的上传功能中指定Content-Type
    Docker 部署 Yapi
    PHP 队列
    数据库设计之一 数据库范式
    docker centos PHP7.2 安装 bcmath数学扩展
    记联调微信支付,调起微信支付之后显示支付验证失败
    记Windows 2012 FTP配置之后 客户端登陆报错
    记一次 MAC 安装 homebrew 报错解决
  • 原文地址:https://www.cnblogs.com/tree1123/p/11382681.html
Copyright © 2011-2022 走看看