zoukankan      html  css  js  c++  java
  • kafka中常用API的简单JAVA代码

      通过之前《kafka分布式消息队列介绍以及集群安装》的介绍,对kafka有了初步的了解。本文主要讲述java代码中常用的操作。

    准备:增加kafka依赖

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.0</version>
    </dependency>

    一、kafka中对topic的操作

    package org.kafka;
    
    import kafka.admin.DeleteTopicCommand;
    import kafka.admin.TopicCommand;
    
    /**
     * kafka主题操作
     */
    public class TopicDemo {
    	/**
    	 * 添加主题
    	 * linux命令:bin/kafka-topics.sh --create --zookeeper 192.168.2.100:2181 --replication-factor 3 --partitions 1 --topic topictest0416
    	 */
    	public static void createTopic() {
    		String[] options = new String[] {
    				"--create", 
    				"--zookeeper",
    				"192.168.2.100:2181", 
    				"--replication-factor", 
    				"3",
    				"--partitions",
    				"1", 
    				"--topic", 
    				"topictest0416" };
    		TopicCommand.main(options);
    	}
    
    	/**
    	 * 查询所有主题
    	 * linux命令:bin/kafka-topics.sh --list --zookeeper 192.168.2.100:2181
    	 */
    	public static void queryTopic() {
    		String[] options = new String[] { 
    				"--list", 
    				"--zookeeper",
    				"192.168.2.100:2181" };
    		TopicCommand.main(options);
    	}
    	
    	/**
    	 * 查看指定主题的分区及副本状态信息
    	 * bin/kafka-topics.sh --describe --zookeeper 192.168.2.100:2181 --topic topictest0416
    	 */
    	public static void queryTopicByName() {
    		String[] options = new String[]{  
    			    "--describe",  
    			    "--zookeeper",  
    			    "192.168.2.100:2181",  
    			    "--topic",  
    			    "topictest0416",  
    			};  
    		TopicCommand.main(options);
    	}
    	
    	/**
    	 * 修改主题
    	 * linux命令:bin/kafka-topics.sh --zookeeper 192.168.2.100:2181 --alter --topic topictest0416 --partitions 3
    	 */
    	public static void alterTopic() {
    		String[] options = new String[]{  
    			    "--alter",  
    			    "--zookeeper",  
    			    "192.168.2.100:2181",  
    			    "--topic",  
    			    "topictest0416",  
    			    "--partitions",  
    			    "3"  
    			};  
    			TopicCommand.main(options); 
    	}
    	
    	/**
    	 * 删除主题
    	 */
    	public static void delTopic() {
    		String[] options = new String[] { 
    				"--zookeeper",  
    			    "192.168.2.100:2181",  
    			    "--topic",  
    			    "topictest0416" };
    		DeleteTopicCommand.main(options);
    	}
    
    }
    

    二、Producer代码

    package org.kafka;
    
    import java.util.Properties;
    
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    
    public class ProducerDemo {
    	public static void main(String[] args) throws InterruptedException {
    		Properties props = new Properties();
    		//zookeeper集群列表
    		props.put("zk.connect", "hadoop1-1:2181,hadoop1-2:2181,hadoop1-3:2181");
    		props.put("metadata.broker.list", "hadoop1-1:9092,hadoop1-2:9092,hadoop1-3:9092");
    		//设置消息使用哪个类来序列化
    		props.put("serializer.class", "kafka.serializer.StringEncoder");
    		
    		ProducerConfig config = new ProducerConfig(props);
    		//构造Producer对象
    		Producer<String, String> producer = new Producer<String, String>(config);
    		
    		// 发送业务消息
    		// 读取文件 读取内存数据库
    		for (int i = 0; i < 10; i++) {
    			Thread.sleep(500);
    			KeyedMessage<String, String> km = new KeyedMessage<String, String>("topictest0416", "I am a producer " + i + " hello!");
    			producer.send(km);
    		}
    		
    	}
    }
    

    三、consumer代码

    package org.kafka;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;
    
    public class ConsumerDemo {
    	private static final String topic = "topictest0416";
    	private static final Integer threads = 1;
    	
    	public static void main(String[] args) {
    		Properties props = new Properties();
    		//zookeeper集群列表
    		props.put("zookeeper.connect", "hadoop1-1:2181,hadoop1-2:2181,hadoop1-3:2181");
    		//消费者组ID
    		props.put("group.id", "001");
    		//设置读取的偏移量;smallest意思是指向最小的偏移量
    		props.put("auto.offset.reset", "smallest");
    		//将Properties封装成消费者配置对象
    		ConsumerConfig config = new ConsumerConfig(props);
    		ConsumerConnector consumer =  Consumer.createJavaConsumerConnector(config);
    		
    		Map<String, Integer> topicMap = new HashMap<>();
    		//key为消费的topic
    		//value为消费的线程数量
    		topicMap.put(topic, threads);
    		
    		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicMap);
    		
    		List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
    		
    		for (final KafkaStream<byte[], byte[]> kafkaStream : streams) {
    			new Thread(new Runnable() {
    				@Override
    				public void run() {
    					for (MessageAndMetadata<byte[], byte[]> mm : kafkaStream) {
    						System.out.println(new String(mm.message()));
    					}
    				}
    			}).start();
    		}
    	}
    
    }
    

    四、测试

      先启动Consumer,再启动Producer

      测试结果:

      

    转载请注明出处
  • 相关阅读:
    ue4 socket
    ue4动画蓝图
    localStorage 用法
    关于textarea中换行、回车、空格的识别与处理
    git忽略某些文件提交
    动态加载js文件
    H5 App页面 绝对定位 软键盘弹出时顶起底部按钮
    Android软键盘弹出时把布局顶上去的解决方法
    javascript 事件委托 和jQuery事件绑定on、off 和one
    escape()、encodeURI()、encodeURIComponent()区别详解
  • 原文地址:https://www.cnblogs.com/skyfeng/p/6719872.html
Copyright © 2011-2022 走看看