zoukankan      html  css  js  c++  java
  • Linux下单机安装部署kafka及代码实现

     技术交流群:233513714

    这几天研究了kafka的安装及使用,在网上找了很多教程但是均以失败告终,直到最后想起网络方面的问题最终才安装部署成功,下面就介绍一下kafka的安装部署及代码实现

    一、关闭防火墙

    重要的事情说100遍,关闭防火墙...(如果不关闭防火墙就会出现Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.等各种奇葩的问题)

    1、关闭firewall:
    systemctl stop firewalld.service                             #停止firewall
    systemctl disable firewalld.service                        #禁止firewall开机启动
    firewall -cmd --state                                              #查看默认防火墙状态(关闭后显示notrunning,开启后显示running)

    2、关闭iptables

    service iptables stop                                           #停止iptables
    chkconfig iptables off                                          #永久关闭防火墙

    service iptables status                                        #查看防火墙关闭状态

    以上提供了关闭两种防火墙的命令,可以选择性操作

    二、kafka安装测试

    1、安装JRE/JDK,(kafka的运行要依赖于jdk,这里就省略了jdk的安装,需要注意的是jdk的版本一定要支持所下载的kafka版本,否则就会报错,这里我安装的是jdk1.7)

    2、下载地址:http://kafka.apache.org/downloads.html(我下载的版本是kafka_2.11-0.11.0.1)

    3、解压:

    tar -xzvf kafka_2.11-0.11.0.1.tgz  

    rm kafka_2.11-0.11.0.1.tgz  (这里一定要删除压缩包,不然会出现zk或kafka启动不起来的问题)

    cd kafka_2.11-0.11.0.1

    4、在kafka_2.11-0.11.0.1目录下

    /bin       启动和停止命令等。 
    /config  配置文件 
    /libs      类库 

    5、修改配置

    在config下修改zookeeper.properties为如下配置

    maxClientCnxns=100
    tickTime=2000
    initLimit=10
    syncLimit=5

    在server.properties添加如下配置

    port=9092
    host.name=10.61.8.6

    zookeeper.connect=localhost:2181

    zookeeper.connection.timeout.ms=6000

    (以上配置没有的就需要添加)

     6、启动、测试、停止

    (1)、启动zookeeper

    bin/zookeeper-server-start.sh config/zookeeper.properties &    (&是为了能退出命令行)

    (2)、启动kafka

    bin/kafka-server-start.sh config/server.properties &

    (3)、查看kafka和zk是否启动

    ps -ef|grep kafka

    (4)、创建topic(topic的名字叫abc)

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 8 --replication-factor 2 --topic abc

    (5)、删除topic

    bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic abc --zookeeper localhost:2181

    (6)、查看topic

    bin/kafka-topics.sh --list --zookeeper localhost:2181

    (7)、producter推送消息

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic abc

    (8)、consumer消费消息

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic abc --from-beginning

    (9)、停止kafka

    bin/kafka-server-stop.sh 

    (10)、停止zookeeper

    bin/zookeeper-server-stop.sh  

    (11)、杀死服务

    kill -9 123     (123是进程号)

    三、java代码实现

    producter

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Properties;
    
    
    /**
     * Created by Administrator on 2017/10/23 0023.
     */
    public class KafkaProducter {
        private static final Logger log = LoggerFactory.getLogger(KafkaProducter.class);
        private final Producer<String, String> producer;
        public final static String TOPIC = "abc";
    
        public static void main(String[] args) {
            new KafkaProducter().produce();
        }
    
        private KafkaProducter() {
            Properties props = new Properties();
            //此处配置的是kafka的端口
            props.put("metadata.broker.list", "10.61.8.6:9092");
            //配置value的序列化类
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            //配置key的序列化类
            props.put("key.serializer.class", "kafka.serializer.StringEncoder");
            //0、这意味着生产者从不等待来自代理的确认(与0.7相同的行为)。这个选项提供了最低的延迟,但是最弱的持久性保证(当服务器失败时,一些数据将丢失)。
            //1、这意味着在主副本接收到数据后,生产者得到确认。这个选项提供了更好的持久性,因为客户机一直等待直到服务器确认请求成功(只有消息被写入到已死的领导人,但尚未被复制的消息将会丢失)。
            //-1、这意味着在所有同步副本都接收到数据之后,生产者得到确认。这个选项提供了最好的持久性,我们保证只要至少有一个同步副本,就不会丢失任何消息。
            props.put("request.required.acks", "-1");
            producer = new Producer<String, String>(new ProducerConfig(props));
        }
    
        void produce() {
            int messageNo = 1;
            final int COUNT = 10;
            while (messageNo < COUNT) {
                String key = String.valueOf(messageNo);
                String data = "hello kafka" + key;
                producer.send(new KeyedMessage<String, String>(TOPIC, key, data));
                log.info("",data);
                messageNo++;
            }
        }
    }

    consumer

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.serializer.StringDecoder;
    import kafka.utils.VerifiableProperties;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    /**
     * Created by Administrator on 2017/10/25 0025.
     */
    public class KafkaConsumer {
        private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
        private final ConsumerConnector consumer;
        public final static String TOPIC = "abc";
    
    
        public static void main(String[] args) {
            new KafkaConsumer().consume();
        }
    
        private KafkaConsumer() {
            Properties props = new Properties();
            //zookeeper 配置
            props.put("zookeeper.connect", "10.61.8.6:2181");
            //group 代表一个消费组
            props.put("group.id", "jd-group");
            //zk连接超时
            props.put("zookeeper.session.timeout.ms", "4000");
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "1000");
            props.put("auto.offset.reset", "smallest");
            //序列化类
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            ConsumerConfig config = new ConsumerConfig(props);
            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
        }
    
        void consume() {
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(TOPIC, new Integer(1));
            StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
            StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
            Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
            KafkaStream<String, String> stream = consumerMap.get(TOPIC).get(0);
            ConsumerIterator<String, String> it = stream.iterator();
            while (it.hasNext()) {
                log.info("kafka监听到的消息:{}", it.next().message());
            }
            log.info("kafka监听完毕");
        }
    
    }

    原创不易,您的支持是我前进的动力

  • 相关阅读:
    Day-10: 错误、调试和测试
    Day-9: 面对对象高级编程
    json文件解析
    sqlite3入门之sqlite3_get_table,sqlite3_free_table
    sqlite3入门之sqlite3_open,sqlite3_exec,slite3_close
    字符集编码与字符大小
    让ubuntu下的eclipse支持GBK编码
    使用virtualbox安装unbuntu开启共享文件夹时遇到的权限问题
    QT--信号与槽
    QT--初识
  • 原文地址:https://www.cnblogs.com/cnndevelop/p/7729305.html
Copyright © 2011-2022 走看看