zoukankan      html  css  js  c++  java
  • 阿里云kafka使用记录(python版本)

    kafka端
     
    consumer vpc版代码
     
    import socket
    from kafka import KafkaConsumer
    from kafka.errors import KafkaError
    
    # context.check_hostname = True
    
    # VPC endpoint consumer: connects to the broker address below with no
    # security-related options passed to the client.
    consumer = KafkaConsumer(
        bootstrap_servers=['192.168.xx.xx:9092'],
        group_id='xx',
        api_version=(0, 10),
    )

    print('consumer start to consuming...')
    consumer.subscribe(topics=('xx',))
    for record in consumer:
        # Dump every field of the received record, one per line, in the
        # order: topic, offset, key, value, partition.
        for field in (record.topic, record.offset, record.key,
                      record.value, record.partition):
            print(field)

    producer vpc版代码

    #!/usr/bin/env python
    # encoding: utf-8
    
    import socket
    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    
    # VPC endpoint producer: no security-related options are passed; failed
    # sends are retried up to 5 times by the client (retries=5).
    producer = KafkaProducer(
        bootstrap_servers=['192.168.xx.xx:9092'],
        api_version=(0, 10),
        retries=5,
    )

    # Look up and print the partitions of the target topic.
    topic_partitions = producer.partitions_for('xx')
    print('Topic下分区: %s' % topic_partitions)

    try:
        pending = producer.send('xx', value=b'hello aliyun-kafka!')
        # Block until the send has completed or failed.
        pending.get()
    except KafkaError as err:
        print('send message failed.')
        print(err)
    else:
        print('send message succeed.')

    consumer公网版代码

    import ssl
    import socket
    from kafka import KafkaConsumer
    from kafka.errors import KafkaError
    
    
    # TLS context that requires the broker to present a certificate signed by
    # the CA bundle at /tmp/ca-cert. The original first assigned
    # ssl.create_default_context() and immediately overwrote it; that dead
    # assignment is dropped here.
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    # NOTE(review): check_hostname is deliberately not enabled, matching the
    # original script where that line was commented out.
    context.load_verify_locations("/tmp/ca-cert")

    # Public-network endpoint: SASL/PLAIN credentials over the TLS context above.
    consumer = KafkaConsumer(bootstrap_servers=['kafka-ons-internet.aliyun.com:8080'],
                            group_id='xxx',
                            sasl_mechanism="PLAIN",
                            ssl_context=context,
                            security_protocol='SASL_SSL',
                            api_version=(0, 10),
                            sasl_plain_username='xxx',
                            sasl_plain_password='1234567890')

    print('consumer start to consuming...')
    consumer.subscribe(('xxx', ))
    try:
        for message in consumer:
            print(message.topic)
            print(message.offset)
            print(message.value)
            # Only the first message is consumed in this demo.
            break
    finally:
        # Close explicitly so the client shuts down cleanly after the early
        # exit from the loop instead of being abandoned.
        consumer.close()
     
    producer 公网版代码
    #!/usr/bin/env python
    # encoding: utf-8
    
    import ssl
    import socket
    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    
    # TLS context that requires the broker to present a certificate signed by
    # the CA bundle at /tmp/ca-cert. The original first assigned
    # ssl.create_default_context() and immediately overwrote it; that dead
    # assignment is dropped here.
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    # NOTE(review): check_hostname is deliberately not enabled, matching the
    # original script where that line was commented out.
    context.load_verify_locations("/tmp/ca-cert")
    # For the CA file, see
    # https://github.com/AliwareMQ/aliware-kafka-demos/tree/master/kafka-python-demo

    # Public-network endpoint: SASL/PLAIN credentials over the TLS context above.
    # NOTE: the password is the last ten characters of the access key.
    # (In the original, the call's closing parenthesis was inside a trailing
    # comment, which left the call unterminated — a syntax error.)
    producer = KafkaProducer(bootstrap_servers=['kafka-ons-internet.aliyun.com:8080'],
                            sasl_mechanism="PLAIN",
                            ssl_context=context,
                            security_protocol='SASL_SSL',
                            api_version=(0, 10),
                            retries=5,
                            sasl_plain_username='xx',
                            sasl_plain_password='1234567890')

    # Look up and print the partitions of the target topic.
    partitions = producer.partitions_for('xxx')
    print('Topic下分区: %s' % partitions)

    try:
        future = producer.send('xxx', b'hello aliyun-kafka!')
        # Block until the send has completed or failed.
        future.get()
        print('send message succeed.')
    except KafkaError as e:
        print('send message failed.')
        print(e)

    从阿里云控制台获得连接信息

  • 相关阅读:
    imx6------watchdog导致不进系统
    嵌入式 Linux 如何操作 GPIO ?
    Linux下用文件IO的方式操作GPIO(/sys/class/gpio)
    一张图看懂AI、机器学习和深度学习的区别
    男生拍照姿势大全,这样拍才帅
    OpenCV/OpenCL/OpenGL区别
    设计简单算法体验Vivado HLS的使用
    FMC简介
    Android 开机Process xxx (pid xxxx) has died问题分析
    g711u与g729比较编码格式
  • 原文地址:https://www.cnblogs.com/castlevania/p/10370803.html
Copyright © 2011-2022 走看看