zoukankan      html  css  js  c++  java
  • .net +RdKafka

    先在nuget里安装RdKafka

    using RdKafka;
    using System;
    using System.Text;

    namespace KafkaApp
    {
    class Program
    {
    static void Main(string[] args)
    {

    //ProducterTest();
    ConsumerTest();
    }
    private static async void ProducterTest()
    {
    // Producer 接受一个或多个 BrokerList
    using (Producer producer = new Producer("192.168.1.4:9092"))
    //发送到一个名为 testtopic 的Topic,如果没有就会创建一个
    using (Topic topic = producer.Topic("666"))
    {
    //将message转为一个 byte[]
    byte[] data = Encoding.UTF8.GetBytes("Hello RdKafka");
    DeliveryReport deliveryReport = await topic.Produce(data);

    Console.WriteLine($"发送到分区:{deliveryReport.Partition}, Offset 为: {deliveryReport.Offset}");
    }
    }

    public static void ConsumerTest()
    {

    //配置消费者组
    var config = new Config() { GroupId = "example-csharp-consumer" };
    using (var consumer = new EventConsumer(config, "192.168.1.4:9092"))
    {

    //注册一个事件
    consumer.OnMessage += (obj, msg) =>
    {
    string text = Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length);
    Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {text}");
    };

    //订阅一个或者多个Topic
    consumer.Subscribe(new[] { "666" });

    //启动
    consumer.Start();

    Console.WriteLine("Started consumer, press enter to stop consuming");
    Console.ReadLine();
    }
    }
    }
    }

  • 相关阅读:
    将jar打包成exe
    CXF + Spring 开发 Webservices
    关于highstock横坐标的一些的一些说明(1)使用UTC时间
    JAX-WS + Spring Integration Example
    EMA指标和MACD指标的JAVA语言实现
    MACD详细计算方法及例子
    notepad 不换行的问题
    eclipse 编码设置(转)
    如何理解作用域
    js中new一个对象的过程
  • 原文地址:https://www.cnblogs.com/bigmangotree/p/10560307.html
Copyright © 2011-2022 走看看