  • Kafka RESTful API: features and usage

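    • Preface

    When a Kafka RESTful service is implemented with the Confluent kafka-rest proxy (see the previous note for details), data is exchanged over HTTP. Note that message keys and values travel base64-encoded (this is an encoding, not encryption). If a message is not base64-encoded before it is POSTed, the server side sees garbled messages or the program raises errors. The normal flow is therefore:
    1. Encode the message to be POSTed as UTF-8.
    2. Base64-encode the resulting bytes.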

    import base64

    s='Kafka,hi'
    ad="hi,kafka,i'm xnchall"
    aa=ad.encode()#step 1: encode the str to UTF-8 bytes
    print(aa)
    b64=base64.b64encode(aa)#step 2: base64-encode the bytes (result is still bytes)
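The two steps above, plus the reverse direction that a consumer needs, can be wrapped in a pair of small helpers. This is a minimal sketch: `to_b64` and `from_b64` are hypothetical names introduced here, not part of kafka-rest. The extra `.decode("ascii")` matters because `base64.b64encode` returns `bytes`, which cannot be placed directly in a JSON body.

```python
import base64

def to_b64(text: str) -> str:
    """UTF-8 encode, then base64-encode; returns a str that is safe to embed in JSON."""
    return base64.b64encode(text.encode("utf-8")).decode("ascii")

def from_b64(payload: str) -> str:
    """Reverse of to_b64: base64-decode, then UTF-8 decode."""
    return base64.b64decode(payload).decode("utf-8")

print(to_b64("kafka"))  # a2Fma2E=
```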
    • Producing messages with kafka-rest
    POST /topics/(string:topic_name)

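    import requests

    #assumed endpoint: same proxy host and topic as the later examples
    url="http://192.168.160.101:8082/topics/test_kfk_lk"
    data={"records":[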
    {
    "key":"a2V5",
    "value":"Y29uZmx1ZW50"
    },
    {
    "value":"a2Fma2E=",
    "partition":1
    },
    {
    "value":"bG9ncw=="
    }
    ]}
    data1={"records":[{"value":"5bCK5pWs55qE5a6i5oi35oKo5aW977yMaGkga2Fma2EsIGknbSB4bmNoYWxs"}]}
    header={"Content-Type":"application/vnd.kafka.v1+json"}
    r=requests.post(url=url,json=data,headers=header)
    r=requests.post(url=url,json=data1,headers=header)
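The `records` body above is written with pre-encoded literals. As a rough sketch of how such a body could be produced from plain strings, the hypothetical helper `build_records` below base64-encodes each value (and key, when present) and passes an optional `partition` through unchanged. The helper name and the input shape are assumptions made for illustration.

```python
import base64

def _b64(text):
    # UTF-8 encode, base64-encode, and return a JSON-friendly str
    return base64.b64encode(text.encode("utf-8")).decode("ascii")

def build_records(messages):
    """messages: iterable of dicts with 'value' and optional 'key' / 'partition'."""
    records = []
    for msg in messages:
        record = {"value": _b64(msg["value"])}
        if msg.get("key") is not None:
            record["key"] = _b64(msg["key"])
        if msg.get("partition") is not None:
            record["partition"] = msg["partition"]
        records.append(record)
    return {"records": records}

payload = build_records([
    {"key": "key", "value": "confluent"},
    {"value": "kafka", "partition": 1},
    {"value": "logs"},
])
print(payload)
```

The resulting `payload` can be passed as `json=payload` to `requests.post` in place of the hand-written `data`.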
    • Produce messages to one partition of the topic
    POST /topics/(string:topic_name)/partitions/(int:partition_id)
    ad="hi kafka,i'm xnchall"
    url11="http://192.168.160.101:8082/topics/test_kfk_lk/partitions/1"
    
    data2={"records":[{"value":(base64.b64encode(ad.encode())).decode()}]}
    print(data2)
    r2=requests.post(url=url11,json=data2,headers=header)
    print(r2)
    print(r2.content)
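Printing the raw response is enough for a quick test, but an HTTP 200 does not guarantee that every record was written. As far as the documented produce response goes, the body carries one entry per record under `offsets`, and a failed record has a non-null `error_code` and `error`. The sketch below assumes that shape; `failed_records` is a hypothetical helper and the sample body is made up for illustration.

```python
def failed_records(response_json):
    """Return (index, error_code, error) for every record reported as failed."""
    failures = []
    for i, entry in enumerate(response_json.get("offsets", [])):
        if entry.get("error_code") is not None:
            failures.append((i, entry["error_code"], entry.get("error")))
    return failures

# Illustrative body only, not captured from a real proxy
sample = {"offsets": [
    {"partition": 1, "offset": 100, "error_code": None, "error": None},
    {"partition": 1, "offset": None, "error_code": 2, "error": "sample error text"},
]}
print(failed_records(sample))
```

With the request above this would be called as `failed_records(r2.json())`.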
    • Create (register) a new consumer instance in the consumer group
    POST /consumers/(string:group_name)

    url3="http://192.168.160.101:8082/consumers/my_group"
    data3={
    "id":"my_consumer1",
    "format":"binary",
    "auto.offset.reset":"smallest",
    "auto.commit.enable":"false"
    }
    
    r3=requests.post(url=url3,json=data3,headers=header)
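The response to this call contains an `instance_id` and a `base_uri` for the new consumer. The later examples hard-code their URLs, but they can also be derived from `base_uri`, which keeps follow-up requests pointed at the proxy instance that owns the consumer. A small sketch, where `instance_urls` is a hypothetical helper and the sample response is written by hand:

```python
def instance_urls(created, topic):
    """Derive the follow-up URLs from the consumer-creation response."""
    base = created["base_uri"].rstrip("/")
    return {
        "commit": f"{base}/offsets",           # POST: commit offsets
        "consume": f"{base}/topics/{topic}",   # GET: consume messages
        "delete": base,                        # DELETE: destroy the instance
    }

# Hand-written sample of the creation response
created = {
    "instance_id": "my_consumer1",
    "base_uri": "http://192.168.160.101:8082/consumers/my_group/instances/my_consumer1",
}
urls = instance_urls(created, "test_kfk_lk")
print(urls["consume"])
```

In the flow above, `created` would be `r3.json()`.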
    • Commit offsets for the consumer
    POST /consumers/(string:group_name)/instances/(string:instance)/offsets

    url4="http://192.168.160.101:8082/consumers/my_group/instances/my_consumer1/offsets"
    r4=requests.post(url=url4,headers=header)
    • Consume messages
    GET /consumers/(string:group_name)/instances/(string:instance)/topics/(string:topic_name)

    url_get2="http://192.168.160.101:8082/consumers/my_group/instances/my_consumer1/topics/test_kfk_lk"
    rr2=requests.get(url=url_get2,headers=header)#,params={"timeout":3000000}
    print(rr2)
    print(rr2.content)
    print(rr2.text)
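The body printed above is still base64. A consumer has to reverse the producer-side encoding before the messages are readable. The sketch below assumes the binary response is a JSON list of objects with `key`, `value`, `partition` and `offset`, where `key` may be null; `decode_messages` is a hypothetical helper and the sample list is illustrative.

```python
import base64

def decode_messages(messages):
    """Base64-decode key and value of each consumed message (key may be None)."""
    decoded = []
    for m in messages:
        key = m.get("key")
        decoded.append({
            "key": base64.b64decode(key).decode("utf-8") if key is not None else None,
            "value": base64.b64decode(m["value"]).decode("utf-8"),
            "partition": m["partition"],
            "offset": m["offset"],
        })
    return decoded

# Illustrative sample, shaped like a binary consume response
sample = [
    {"key": "a2V5", "value": "Y29uZmx1ZW50", "partition": 1, "offset": 100},
    {"key": None, "value": "a2Fma2E=", "partition": 1, "offset": 101},
]
for msg in decode_messages(sample):
    print(msg)
```

With the request above this would be `decode_messages(rr2.json())`.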
    • Destroy the consumer instance
    DELETE /consumers/(string:group_name)/instances/(string:instance)
    #url_del="http://192.168.160.101:8082/consumers/test_kfk_lk/instances/my_consumer"
    #d1=requests.delete(url_del)#delete the consumer instance
    #print(d1)
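Consumer instances hold state on the proxy, so it is worth deleting them even when consumption fails halfway. One way to sketch this is a context manager that creates the instance and always issues the DELETE on exit. `consumer_instance` is a hypothetical helper; `http` is assumed to be the `requests` module or a `requests.Session`, and the response is assumed to contain `base_uri`.

```python
from contextlib import contextmanager

HEADERS = {"Content-Type": "application/vnd.kafka.v1+json"}

@contextmanager
def consumer_instance(http, proxy, group, config):
    """Create a consumer instance and delete it on exit, even after an error."""
    resp = http.post(f"{proxy}/consumers/{group}", json=config, headers=HEADERS)
    instance_url = resp.json()["base_uri"]
    try:
        yield instance_url
    finally:
        # Runs on normal exit and when the with-body raises
        http.delete(instance_url, headers=HEADERS)
```

A possible call would be `with consumer_instance(requests, "http://192.168.160.101:8082", "my_group", data3) as instance_url:` followed by the consume request inside the block.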
    • Consume messages from one partition of the topic, starting at a given offset (API v1)
    GET /topics/(string:topic_name)/partitions/(int:partition_id)/messages?offset=(int)[&count=(int)]

    Fetch Response v1 only contains message format v0.
    Fetch Response v2 might either contain message format v0 or message format v1.
    Possible Error Codes
    * OFFSET_OUT_OF_RANGE (1)
    * UNKNOWN_TOPIC_OR_PARTITION (3)
    * NOT_LEADER_FOR_PARTITION (6)
    * REPLICA_NOT_AVAILABLE (9)
    * UNKNOWN (-1)
    url_p="http://192.168.160.101:8082/topics/test_kfk/partitions/0/messages"
    rst=requests.get(url_p,headers=header,params={"offset":3,"count":2})
    print(rst)
    print(len(rst.json()))
    if rst.status_code!=500:
        #each item is one message; decode its base64 value back to text
        for itr in rst.json():
            print(base64.b64decode(itr['value']).decode())
    print(rst.url)#http://192.168.160.101:8082/topics/test_kfk/partitions/0/messages?offset=3&count=2
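The final URL shown in the comment is assembled by `requests` from `params`. For clients that do not use `requests`, the same URL can be built by hand with the standard library. A minimal sketch, where `partition_messages_url` is a hypothetical helper and `count` is optional as in the endpoint signature:

```python
from urllib.parse import urlencode

def partition_messages_url(proxy, topic, partition, offset, count=None):
    """Build the partition-consume URL; count is left out when not given."""
    params = {"offset": offset}
    if count is not None:
        params["count"] = count
    return f"{proxy}/topics/{topic}/partitions/{partition}/messages?{urlencode(params)}"

print(partition_messages_url("http://192.168.160.101:8082", "test_kfk", 0, 3, 2))
```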
    • Get the current subscribed list of topics (API v2)
    GET /consumers/(string:group_name)/instances/(string:instance)/subscription
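In API v2 the `subscription` path works in both directions: a POST with a `topics` list subscribes the consumer, and a GET returns the current list. The sketch below shows both calls. The function names are hypothetical, `http` is assumed to be the `requests` module or a `requests.Session`, and `instance_url` is assumed to be the consumer's base URL.

```python
V2 = "application/vnd.kafka.v2+json"

def subscribe(http, instance_url, topics):
    """POST the topic list to subscribe the consumer."""
    return http.post(
        f"{instance_url}/subscription",
        json={"topics": list(topics)},
        headers={"Content-Type": V2},
    )

def subscribed_topics(http, instance_url):
    """GET the topics the consumer is currently subscribed to."""
    resp = http.get(f"{instance_url}/subscription", headers={"Accept": V2})
    return resp.json()["topics"]
```

For example, `subscribe(requests, instance_url, ["test_kfk_lk"])` followed by `subscribed_topics(requests, instance_url)`.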
    • Get the list of partitions manually assigned to the consumer (API v2)
    GET /consumers/(string:group_name)/instances/(string:instance)/assignments
    GET /consumers/testgroup/instances/my_consumer/assignments HTTP/1.1
    Host: proxy-instance.kafkaproxy.example.com
    Accept: application/vnd.kafka.v2+json

    HTTP/1.1 200 OK
    Content-Type: application/vnd.kafka.v2+json

    {
      "partitions": [
        {
          "topic": "test",
          "partition": 0
        },
        {
          "topic": "test",
          "partition": 1
        }
      ]
    }
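The response above is a flat list of topic/partition pairs. Client code often wants it grouped per topic, which the following sketch does. `partitions_by_topic` is a hypothetical helper, and the input is the sample response shown above.

```python
from collections import defaultdict

def partitions_by_topic(assignments):
    """Group the 'partitions' list of an assignments response by topic."""
    grouped = defaultdict(list)
    for entry in assignments["partitions"]:
        grouped[entry["topic"]].append(entry["partition"])
    return {topic: sorted(parts) for topic, parts in grouped.items()}

response = {"partitions": [
    {"topic": "test", "partition": 0},
    {"topic": "test", "partition": 1},
]}
print(partitions_by_topic(response))  # {'test': [0, 1]}
```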
    
    • Override the offsets the consumer will use for its next fetch (API v2)
    POST /consumers/(string:group_name)/instances/(string:instance)/positions
    POST /consumers/testgroup/instances/my_consumer/positions HTTP/1.1
    Host: proxy-instance.kafkaproxy.example.com
    Content-Type: application/vnd.kafka.v2+json
    {
      "offsets": [
        {
          "topic": "test",
          "partition": 0,
          "offset": 20
        },
        {
          "topic": "test",
          "partition": 1,
          "offset": 30
        }
      ]
    }
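The request body above can be generated from whatever structure the client uses to track positions. The sketch below assumes a plain dict keyed by `(topic, partition)`; `positions_payload` is a hypothetical helper.

```python
def positions_payload(offsets):
    """offsets: mapping of (topic, partition) -> offset to seek to."""
    return {"offsets": [
        {"topic": topic, "partition": partition, "offset": offset}
        for (topic, partition), offset in sorted(offsets.items())
    ]}

body = positions_payload({("test", 0): 20, ("test", 1): 30})
print(body)
```

The result would be sent as `json=body` with the `application/vnd.kafka.v2+json` content type.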
    
    • Seek to the last offset of each of the given partitions (API v2)
    POST /consumers/(string:group_name)/instances/(string:instance)/positions/end
    POST /consumers/testgroup/instances/my_consumer/positions/end HTTP/1.1
    Host: proxy-instance.kafkaproxy.example.com
    Content-Type: application/vnd.kafka.v2+json
    {
      "partitions": [
        {
          "topic": "test",
          "partition": 0
        },
        {
          "topic": "test",
          "partition": 1
        }
      ]
    }
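The `partitions` body here has the same shape as the response of the `assignments` endpoint, so the two calls can be chained to move every assigned partition to its end. A sketch under the same assumptions as before: `seek_assigned_to_end` is a hypothetical helper, and `http` is the `requests` module or a `requests.Session`.

```python
V2 = "application/vnd.kafka.v2+json"

def seek_assigned_to_end(http, instance_url):
    """Read the current assignments and seek each of them to the last offset."""
    assigned = http.get(f"{instance_url}/assignments", headers={"Accept": V2}).json()
    body = {"partitions": assigned["partitions"]}
    http.post(f"{instance_url}/positions/end", json=body, headers={"Content-Type": V2})
    return body
```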
    
    • Fetch data for the topics or partitions specified through the subscribe or assign APIs (API v2)
    GET /consumers/(string:group_name)/instances/(string:instance)/records
    GET /consumers/testgroup/instances/my_consumer/records?timeout=3000&max_bytes=300000 HTTP/1.1
    Host: proxy-instance.kafkaproxy.example.com
    Accept: application/vnd.kafka.binary.v2+json
    Example binary response:
    HTTP/1.1 200 OK
    Content-Type: application/vnd.kafka.binary.v2+json
    [
      {
        "topic": "test",
        "key": "a2V5",
        "value": "Y29uZmx1ZW50",
        "partition": 1,
        "offset": 100
      },
      {
        "topic": "test",
        "key": "a2V5",
        "value": "a2Fma2E=",
        "partition": 2,
        "offset": 101
      }
    ]
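Because a `records` call can return messages from several topics and partitions at once, it can help to group them before processing. The sketch below decodes the base64 values and groups them by `(topic, partition)`. `values_by_partition` is a hypothetical helper, and the input is the example response above.

```python
import base64
from collections import defaultdict

def values_by_partition(records):
    """Group decoded (offset, value) pairs by (topic, partition)."""
    grouped = defaultdict(list)
    for rec in records:
        value = rec["value"]
        text = base64.b64decode(value).decode("utf-8") if value is not None else None
        grouped[(rec["topic"], rec["partition"])].append((rec["offset"], text))
    return dict(grouped)

records = [
    {"topic": "test", "key": "a2V5", "value": "Y29uZmx1ZW50", "partition": 1, "offset": 100},
    {"topic": "test", "key": "a2V5", "value": "a2Fma2E=", "partition": 2, "offset": 101},
]
print(values_by_partition(records))
```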
    

      

  • Original article: https://www.cnblogs.com/xnchll/p/9618432.html