zoukankan      html  css  js  c++  java
  • canal 环境搭建 canal 与kafka通信(三)

    canal 占用了生产者

    .net core端 使用消费者获取canal 消息

    安装 Confluent.Kafka  demo使用 1.3.0

     public static void Consumer()
            {
                var conf = new ConsumerConfig
                {
                    GroupId = "canal-group",
                    BootstrapServers = "192.168.1.26:9092",
                    // Note: The AutoOffsetReset property determines the start offset in the event
                    // there are not yet any committed offsets for the consumer group for the
                    // topic/partitions of interest. By default, offsets are committed
                    // automatically, so in this example, consumption will only start from the
                    // earliest message in the topic 'my-topic' the first time you run the program.
                    AutoOffsetReset = AutoOffsetReset.Earliest,
                    EnableAutoCommit = true //是否自动提交  默认为true
    
    
                };
    
                using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
                {
                    //指定 消费那个分区
                    //c.Assign(new TopicPartition ("flexmall13",new Partition ()))
                    c.Subscribe("flexmall13");
    
                    CancellationTokenSource cts = new CancellationTokenSource();
                    Console.CancelKeyPress += (_, e) =>
                    {
                        e.Cancel = true; // prevent the process from terminating.
                        cts.Cancel();
                    };
    
                    try
                    {
                        while (true)
                        {
                            try
                            {
                                var cr = c.Consume(cts.Token);
                                Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
    
                                //如果配置 自动提交为 否 需要手动提交
    
                                //c.Commit();
                                //c.Commit(new List<TopicPartitionOffset>() { cr.TopicPartitionOffset });
                            }
                            catch (ConsumeException e)
                            {
                                Console.WriteLine($"Error occured: {e.Error.Reason}");
                            }
                        }
                    }
                    catch (OperationCanceledException e)
                    {
                        // Ensure the consumer leaves the group cleanly and final offsets are committed.
    
                        Console.WriteLine($"OperationCanceledException occured: {e.StackTrace}");
                        c.Close();
                    }
                }
            }
        }

    注意:

    1. 在config项 中 有 EnableAutoCommit 设置是否自动应答提交,默认是 true,如果设置为否,需要在消费后,手动 commit。

    2.消费者指定订阅 分区,如果不指定使用Subscribe,指定需要使用Assign

  • 相关阅读:
    poj 2049 Let it Bead(polya模板)
    poj 1286 Necklace of Beads (polya(旋转+翻转)+模板)
    poj 2226 Muddy Fields(最小点覆盖+巧妙构图)
    poj 3692 Kindergarten (最大独立集之逆匹配)
    poj 1466 Girls and Boys(二分匹配之最大独立集)
    poj 1486 Sorting Slides(二分图匹配的查找应用)
    poj 2112 Optimal Milking (二分图匹配的多重匹配)
    PHP访问控制
    OO(Object Oriented)
    重载与重写以及重构
  • 原文地址:https://www.cnblogs.com/shikyoh/p/12092494.html
Copyright © 2011-2022 走看看