zoukankan      html  css  js  c++  java
  • Kafka+Zookeeper+confluent-kafka搭建

    • 参考资料:

    https://blog.csdn.net/xiewendong93/article/details/50500471

    https://my.oschina.net/phoebus789/blog/730787

    https://blog.csdn.net/qinglu000/article/details/9816135

    https://blog.csdn.net/jund05/article/details/78507075

    https://www.cnblogs.com/vipzhou/p/7235625.html

    启动: https://www.cnblogs.com/lanyangsh/p/7782795.html 

    • pro_1.py
    from confluent_kafka import Producer
    
    
    def acked(err, msg):
        if err is not None:
            print("Failed to deliver message: {0}: {1}"
                  .format(msg.value(), err.str()))
        else:
            print("Message produced: {0}".format(msg.value()))
    
    p = Producer({'bootstrap.servers': 'localhost:9092'})
    
    try:
        for val in xrange(1, 1000):
            p.produce('mytopic', 'myvalue #{0}'
                      .format(val), callback=acked)
            p.poll(0.5)
    
    except KeyboardInterrupt:
        pass
    
    p.flush(30)
    • con_1.py
    from confluent_kafka import Consumer, KafkaError
    
    settings = {
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'mygroup',
        'client.id': 'client-1',
        'enable.auto.commit': True,
        'session.timeout.ms': 6000,
        'default.topic.config': {'auto.offset.reset': 'smallest'}
    }
    
    c = Consumer(settings)
    
    c.subscribe(['mytopic'])
    
    try:
        while True:
            msg = c.poll(0.1)
            if msg is None:
                continue
            elif not msg.error():
                print('Received message: {0}'.format(msg.value()))
            elif msg.error().code() == KafkaError._PARTITION_EOF:
                print('End of partition reached {0}/{1}'
                      .format(msg.topic(), msg.partition()))
            else:
                print('Error occured: {0}'.format(msg.error().str()))
    
    except KeyboardInterrupt:
        pass
    
    finally:
        c.close()
  • 相关阅读:
    HBase 在HDFS上的物理目录结构
    Hbase-site.xml
    hbase-default.xml(Hbase 默认参数翻译)
    flink-conf.yaml
    Spark Standalone spark-default.conf
    Spark Standalone spark-env.sh
    windows linux 文件编码转换
    Hbase G1 gc 调优最终参数
    python
    python
  • 原文地址:https://www.cnblogs.com/ryu-manager/p/9411306.html
Copyright © 2011-2022 走看看