zoukankan      html  css  js  c++  java
  • canal 环境搭建 canal 与kafka通信(三)

    canal 占用了生产者

    .net core端 使用消费者获取canal 消息

    安装 Confluent.Kafka  demo使用 1.3.0

     public static void Consumer()
            {
                var conf = new ConsumerConfig
                {
                    GroupId = "canal-group",
                    BootstrapServers = "192.168.1.26:9092",
                    // Note: The AutoOffsetReset property determines the start offset in the event
                    // there are not yet any committed offsets for the consumer group for the
                    // topic/partitions of interest. By default, offsets are committed
                    // automatically, so in this example, consumption will only start from the
                    // earliest message in the topic 'my-topic' the first time you run the program.
                    AutoOffsetReset = AutoOffsetReset.Earliest,
                    EnableAutoCommit = true //是否自动提交  默认为true
    
    
                };
    
                using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
                {
                    //指定 消费那个分区
                    //c.Assign(new TopicPartition ("flexmall13",new Partition ()))
                    c.Subscribe("flexmall13");
    
                    CancellationTokenSource cts = new CancellationTokenSource();
                    Console.CancelKeyPress += (_, e) =>
                    {
                        e.Cancel = true; // prevent the process from terminating.
                        cts.Cancel();
                    };
    
                    try
                    {
                        while (true)
                        {
                            try
                            {
                                var cr = c.Consume(cts.Token);
                                Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
    
                                //如果配置 自动提交为 否 需要手动提交
    
                                //c.Commit();
                                //c.Commit(new List<TopicPartitionOffset>() { cr.TopicPartitionOffset });
                            }
                            catch (ConsumeException e)
                            {
                                Console.WriteLine($"Error occured: {e.Error.Reason}");
                            }
                        }
                    }
                    catch (OperationCanceledException e)
                    {
                        // Ensure the consumer leaves the group cleanly and final offsets are committed.
    
                        Console.WriteLine($"OperationCanceledException occured: {e.StackTrace}");
                        c.Close();
                    }
                }
            }
        }

    注意:

    1. 在config项 中 有 EnableAutoCommit 设置是否自动应答提交,默认是 true,如果设置为否,需要在消费后,手动 commit。

    2.消费者指定订阅 分区,如果不指定使用Subscribe,指定需要使用Assign

  • 相关阅读:
    Dom解析
    几道算法水题
    Bom和Dom编程以及js中prototype的详解
    sqlserver练习
    java框架BeanUtils及路径问题练习
    Java的IO以及线程练习
    在数据库查询时解决大量in 关键字的方法
    SaltStack--配置管理
    SaltStack--远程执行
    SaltStack--快速入门
  • 原文地址:https://www.cnblogs.com/shikyoh/p/12092494.html
Copyright © 2011-2022 走看看