zoukankan      html  css  js  c++  java
  • Windows平台下kafka环境的搭建以及简单使用

    一、Kafka配置: 

    参考网址:

    http://m.blog.csdn.net/ydc321/article/details/70154278

    http://www.2cto.com/net/201701/588235.html

    http://www.jianshu.com/p/f7037105db46

    http://www.jianshu.com/p/64d25dcf8300

    https://my.oschina.net/phoebus789/blog/733670

    http://orchome.com/kafka/index

    准备工作:

    1.安装jdk环境

    http://www.oracle.com/technetwork/java/javase/downloads/index.html 

    2.下载kafka的程序安装包,并解压

    http://kafka.apache.org/downloads 

    3.用命令行测试kafka生产和消费

    在 kafka 目录,按住shift+鼠标右键->在此处打开命令窗口(W) 

    第一个命令窗口->启动zookeeper服务: 

    binwindowszookeeper-server-start.bat configzookeeper.properties

    第二个命令窗口->启动kfaka服务: 

    binwindowskafka-server-start.bat configserver.properties 

    第三个命令窗口->启动启produce: 

    • 创建一个主题:
    binwindowskafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testtopic
    • 使用如下命令查看创建的主题列表:
    binwindowskafka-topics.bat --list --zookeeper localhost:2181
    • 启动生产者:
    binwindowskafka-console-producer.bat --broker-list {本机ip}:9092 --topic testtopic

    第四个命令窗口->启动consumer:

    binwindowskafka-console-consumer.bat --zookeeper localhost:2181 --topic testtopic --from-beginning 

    二、测试kafka生产和消费

    • 创建两个控制台项目,一个是生产者,一个是消费者
    //生产者代码
    1
    using Confluent.Kafka; 2 using Confluent.Kafka.Serialization; 3 using System; 4 using System.Collections.Generic; 5 using System.Text; 6 7 namespace KafkaProducter 8 { 9 public class Program 10 { 11 public static void Main(string[] args) 12 { 13 string brokerList = "127.0.0.1:9092"; 14 string topicName = "testtopic"; 15 16 var config = new Dictionary<string, object> { { "bootstrap.servers", brokerList } }; 17 18 using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8))) 19 { 20 Console.WriteLine($"{producer.Name} producing on {topicName}. q to exit."); 21 22 string text; 23 while ((text = Console.ReadLine()) != "q") 24 { 25 var deliveryReport = producer.ProduceAsync(topicName, null, text); 26 deliveryReport.ContinueWith(task => 27 { 28 Console.WriteLine($"Partition: {task.Result.Partition}, Offset: {task.Result.Offset}"); 29 }); 30 } 31 32 // Tasks are not waited on synchronously (ContinueWith is not synchronous), 33 // so it's possible they may still in progress here. 34 producer.Flush(TimeSpan.FromSeconds(10)); 35 } 36 } 37 } 38 }
    //消费者代码
    1
    using Confluent.Kafka; 2 using Confluent.Kafka.Serialization; 3 using System; 4 using System.Collections.Generic; 5 using System.Text; 6 7 namespace KafkaConsumer 8 { 9 public class Program 10 { 11 public static void Main(string[] args) 12 { 13 var config = new Dictionary<string, object> 14 { 15 { "group.id", "simple-csharp-consumer" }, 16 { "bootstrap.servers", "127.0.0.1:9092" } 17 }; 18 19 using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8))) 20 { 21 consumer.Assign(new List<TopicPartitionOffset> { new TopicPartitionOffset("testtopic", 0, 0) }); 22 23 while (true) 24 { 25 Message<Null, string> msg; 26 if (consumer.Consume(out msg, TimeSpan.FromSeconds(1))) 27 { 28 Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}"); 29 } 30 } 31 } 32 } 33 } 34 }
  • 相关阅读:
    fedora/centos7防火墙FirewallD详解
    python for dl
    神经网络画图工具
    卷积神经网络的复杂度分析
    如何理解深度学习中的Transposed Convolution?
    吴恩达课程及视频笔记汇总
    从LeNet-5到DenseNet
    WPS for Linux
    caffe:fine-tuning
    python下图像读取方式以及效率对比
  • 原文地址:https://www.cnblogs.com/jiao006/p/7753424.html
Copyright © 2011-2022 走看看