zoukankan      html  css  js  c++  java
  • 高并发推送数据之Kafka(.Net、Java)消息队列

    .NET VS工具添加程序包源 在NuGet包管理中选择程序包源为上面添加的私有仓库。 搜索Data.Pipelines并安装。 在app.congif或者web.config中添加Kafka配置

      <appSettings>
        <add key="kafka.ip" value="172.20.105.205"/>
        <add key="kafka.prot" value="9092"/>
      </appSettings>
            /// <summary>
            /// 实时数据综合推送
            /// </summary>
            /// <param name="listTemp"></param>
            public void RealTimeDataPush(List<T_SJZX_SZZDJC_CYXXInfo> listTemp, List<T_SJZX_SZZDJC_JCJGInfo> listTempDetail)
            {
                var listData = new List<Dictionary<string, object>>();
    
                foreach (var item in listTemp)
                {
                    var d = new Dictionary<string, object>();
                    d.Add("XH", item.XH);//序号
                    d.Add("DXMC", item.DXMC);//对象名称
                    d.Add("DXBH", item.DXBH);//对象编号
                    d.Add("CDBH", item.CDBH);//测点编号
                    d.Add("CDMC", item.CDMC);//测点名称
                    d.Add("XZQDM", item.XZQDM);//行政区代码
                    d.Add("SSXZQ", item.SSXZQ);//所属行政区
                    d.Add("JCSJ", item.JCSJ.Value.ToString("yyyy-MM-dd HH:mm:ss"));//监测时间
                                                                                   //d.Add("JCN", item.JCN);//监测年
                                                                                   //d.Add("JCY", item.JCY);//监测月
                                                                                   //d.Add("JCR", item.JCR);//监测日
                                                                                   //d.Add("JCXS", item.JCXS);//监测小时
                    d.Add("SFZX", item.SFZX);//是否在线
                    if (!string.IsNullOrEmpty(item.ZXBZBH))
                        d.Add("ZXBZBH", item.ZXBZBH);//执行标准编号
                    else
                        d.Add("ZXBZBH", "");
    
                    if (!string.IsNullOrEmpty(item.ZXBZMC))
                        d.Add("ZXBZMC", item.ZXBZMC);//执行标准名称
                    else
                        d.Add("ZXBZMC", "");
    
                    if (!string.IsNullOrEmpty(item.SZLB))
                        d.Add("SZLB", item.SZLB);//水质类别
                    else
                        d.Add("SZLB", "");
    
                    if (!string.IsNullOrEmpty(item.SFCB))
                        d.Add("SFCB", item.SFCB);//是否超标
                    else
                        d.Add("SFCB", "");
    
                    if (!string.IsNullOrEmpty(item.CBXM))
                        d.Add("CBXM", item.CBXM);//超标项目
                    else
                        d.Add("CBXM", "");
    
                    //d.Add("YQBM", item.YQBM);//仪器编码
                    //d.Add("YQMC", item.YQMC);//仪器名称
                    //d.Add("YQXH", item.YQXH);//仪器型号
                    //d.Add("YQZT", item.YQZT);//仪器状态
    
                    d.Add("ORGID", item.ORGID);//机构代码
                    d.Add("CJR", item.CJR);//创建人
                    d.Add("CJSJ", item.CJSJ.Value.ToString("yyyy-MM-dd HH:mm:ss"));//创建时间
                    d.Add("GXR", item.GXR);//更新人
                    d.Add("SJLY", item.SJLY);//数据来源
                    d.Add("SJZT", item.SJZT);//数据状态
                    d.Add("GXSJ", item.GXSJ.Value.ToString("yyyy-MM-dd HH:mm:ss"));//更新时间
    
                    #region 明细
    
                    var listData2 = new List<Dictionary<string, object>>();
                    var listTempDetail2 = listTempDetail.Where(p => p.CYXH == item.XH);
    
                    foreach (var item2 in listTempDetail2)
                    {
                        var d2 = new Dictionary<string, object>();
                        d2.Add("XH", item2.XH);//序号
                        d2.Add("CYXH", item2.CYXH);//采样序号
    
                        if (!string.IsNullOrEmpty(item2.FXXMDM))
                            d2.Add("FXXMDM", item2.FXXMDM);//分析项目编号
                        else
                            d2.Add("FXXMDM", "");
    
                        if (!string.IsNullOrEmpty(item2.FXXMMC))
                            d2.Add("FXXMMC", item2.FXXMMC);//分析项目名称
                        else
                            d2.Add("FXXMMC", "");
    
                        d2.Add("BCJG", item2.BCJG);//报出结果
                        d2.Add("BCJGBS", item2.BCJGBS);//报出结果表示
                        d2.Add("BCJGDW", item2.BCJGDW);//报出结果单位
    
                        if (!string.IsNullOrEmpty(item2.BZSX))
                            d2.Add("BZSX", item2.BZSX);//标准上限
                        else
                            d2.Add("BZSX", "");
    
                        if (!string.IsNullOrEmpty(item2.BZXX))
                            d2.Add("BZXX", item2.BZXX);//标准下限
                        else
                            d2.Add("BZXX", "");
    
                        if (!string.IsNullOrEmpty(item2.SFCB))
                            d2.Add("SFCB", item2.SFCB);//是否超标
                        else
                            d2.Add("SFCB", "");
    
                        if (!string.IsNullOrEmpty(item2.CBBS))
                            d2.Add("CBBS", item2.CBBS);//超标倍数
                        else
                            d2.Add("CBBS", "");
    
                        if (!string.IsNullOrEmpty(item2.SZLB))
                            d2.Add("SZLB", item2.SZLB);//水质类别
                        else
                            d2.Add("SZLB", "");
    
                        if (!string.IsNullOrEmpty(item2.SZLB))
                            d2.Add("YQSBXH", item2.YQSBXH);//仪器编码
                        else
                            d2.Add("YQSBXH", "");
    
                        d2.Add("ORGID", item2.ORGID);//机构代码
                        d2.Add("CJR", item2.CJR);//创建人
                        d2.Add("CJSJ", item2.CJSJ.Value.ToString("yyyy-MM-dd HH:mm:ss"));//创建时间
                        d2.Add("GXR", item2.GXR);//更新人
                        d2.Add("SJLY", item2.SJLY);//数据来源
                        d2.Add("SJZT", item2.SJZT);//数据状态
                        d2.Add("GXSJ", item2.GXSJ.Value.ToString("yyyy-MM-dd HH:mm:ss"));//更新时间
    
                        listData2.Add(d2);
                    }
                    #endregion
    
                    d.Add("JCJG", listData2);
                    listData.Add(d);
                }
    
                //SZZDZ_CYXX    
                var product = new DataProducer("dfb426e321fc417981858b0927c21016", "XH");
    
                //分页推送
                int PageSize = 100;
                int PageTotal = (int)Math.Ceiling((decimal)listData.Count / PageSize);
                for (int PageIndex = 0; PageIndex < PageTotal; PageIndex++)
                {
                    var data = listData.Skip(PageIndex * PageSize).Take(PageSize).ToList();
    
                    product.Send(data, r => Console.WriteLine(!r.Error.IsError
                    ? $"Delivered message to {r.TopicPartitionOffset}"
                    : $"Delivery Error: {r.Error.Reason}"));
                }
    
                Assert.IsTrue(true);
    
                //Thread.Sleep(500);
            }
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using Confluent.Kafka;
    //using Newtonsoft.Json;
    using System.Configuration;
    using Newtonsoft.Json;
    
    namespace Common
    {
        class DataProducer
        {
            private readonly string _key;
            private readonly string _dataKeyField;
    
            // 从配置读取kafka配置
            static string ip = ConfigurationManager.AppSettings["kafka.ip"];
            static string port = ConfigurationManager.AppSettings["kafka.port"];
    
            /// <summary>
            /// 构造方法
            /// </summary>
            /// <param name="key">业务数据队列key</param>
            /// <param name="dataKeyField">数据主键</param>
            public DataProducer(string key, string dataKeyField)
            {
                this._key = key;
                this._dataKeyField = dataKeyField;
            }
    
            /// <summary>
            /// 发送数据
            /// </summary>
            /// <param name="data">数据集</param>
            /// <param name="callback">发送成功后回调,在发送成功前,不要杀死相关线程,否则可能导致数据丢失</param>
            public void Send(List<Dictionary<string, object>> data, Action<DeliveryReport<string, string>> callback)
            {
                foreach (var datum in data)
                {
                    var keys = new List<string>(datum.Count);
                    // 找出时候类型的字段
                    foreach (var entry in datum)
                    {
                        if (entry.Value is DateTime)
                        {
                            keys.Add(entry.Key);
                        }
                    }
    
                    // 把时间转成long,与Java的getTime()兼容
                    foreach (var key in keys)
                    {
                        var eeee = (DateTime)datum[key];
                        // TODO 日期转换
                        datum[key] = "日期字符串";
                    }
    
                }
    
    
                var topic = this._key;
    
                var d = new Dictionary<string, object> { { "data", data }, { "keyField", this._dataKeyField } };
                var jsonString = JsonConvert.SerializeObject(d, new JsonSerializerSettings { NullValueHandling = NullValueHandling.Ignore });
    
    
                // TODO 生成key
                Send(topic, jsonString.GetHashCode().ToString(), jsonString, callback);
            }
    
            /// <summary>
            /// 发送消息
            /// </summary>
            /// <param name="topic">KafkaTopic</param>
            /// <param name="key">消息key</param>
            /// <param name="value">数据</param>
            /// <param name="callback">发送完成回调</param>
            private void Send(string topic, string key, string value, Action<DeliveryReport<string, string>> callback)
            {
                if (string.IsNullOrEmpty(ip))
                {
                    ip = "172.20.105.205";
                }
                if (string.IsNullOrEmpty(port))
                {
                    port = "9092";
                }
    
                var conf = new ProducerConfig { BootstrapServers = $"{ip}:{port}" };
    
                using (var p = new ProducerBuilder<string, string>(conf).Build())
                {
                    // TODO 异常处理,添加日志
                    p.Produce(topic, new Message<string, string> { Key = key, Value = value }, callback);
                    //Console.WriteLine($"topic: {topic}. key: k, value: " + value);
    
                    // TODO 异常处理,添加日志
                    // wait for up to 1 seconds for any inflight messages to be delivered.
                    p.Flush(TimeSpan.FromSeconds(1));
                }
            }
        }
    }

    JAVA 

    Maven镜像配置

    修改本地mavensettings文件

    mirros下增加mirror

     

         <mirror>
          <id>bigdata-center</id>
          <mirrorOf>*</mirrorOf>
          <url>http://172.20.105.205:8081/repository/maven-public/</url>
        </mirror>

     

    集成SDK

    Maven引包

    <dependency>
        <groupId>com.tencent.guangdong-eco-environment</groupId>
        <artifactId>data-pipelines</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>

    配置

    新建kafka配置文件

    resources/conf/kafka.properties

    配置内容如下:

    kafka.ip=172.20.105.205
    kafka.port=9092

    package com.tencent.eco.data.product;
    
    import com.szboanda.platform.common.utils.DateUtils;
    import org.testng.annotations.Test;
    
    import java.text.ParseException;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    public class DataProducerTest {
    
        @Test
        public void send() throws ParseException {
            List<Map<String, Object>> data = new ArrayList<>();
            Map<String, Object> d = new HashMap<>();
            d.put("XH", "XH1");
            d.put("CDBH", "CDBH1");
            d.put("JCSJ", DateUtils.parseDate("2019-05-12 12:00","yyyy-MM-dd HH:mm"));
            d.put("JCXMDH", "w23234");
            d.put("JCZ", 12.4);
            d.put("ORGID", "440000");
            d.put("CJR", "tencent");
            d.put("CJSJ", new Date());
            d.put("XGR", "tencent");
            d.put("GXSJ", new Date());
            d.put("SJLY", "testQueue");
            d.put("SJZT", "I");
            data.add(d);
    
            Map<String, Object> d1 = new HashMap<>();
            d1.put("XH", "XH2");
            d1.put("CDBH", "CDBH1");
            d1.put("JCSJ", DateUtils.parseDate("2019-05-12 12:00","yyyy-MM-dd HH:mm"));
            d1.put("JCXMDH", "w23234");
            d1.put("JCZ", 12.45);
            d1.put("ORGID", "440000");
            d1.put("CJR", "tencent");
            d1.put("CJSJ", new Date());
            d1.put("XGR", "tencent");
            d1.put("GXSJ", new Date());
            d1.put("SJLY", "testQueue");
            d1.put("SJZT", "I");
            data.add(d1);
    
            // 用来控制当前线程等待发送消息后才退出
            final AtomicBoolean isSanded = new AtomicBoolean(false);
    
            DataProducer producer = new DataProducer("74ef6a7b7adc4e5cb6ee3d741d0e3210", "XH");
            // 发送消息
            producer.send(data, (metadata, exception) -> {
                isSanded.set(true);
            });
    
            // 由于消息是异步发送,在跑单元测试时,需要暂停线程等待消息发送,如果不暂停,消息可能会丢失
            try {
                while (!isSanded.get()) {
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    }

     

    Maven镜像配置

    修改本地mavensettings文件

    mirros下增加mirror

  • 相关阅读:
    【学习篇】JavaScript可折叠区域
    hdu 2201 (简单数学概率)
    hdu 2552 (这题很强大)
    hdu 2212 (简单数学)
    hdu 2124 (赤裸裸的贪心)
    hdu 2570 (贪心)
    hdu 2401 (简单数学)
    hdu 2537(水)
    hdu4432
    hdu 1181 (搜索BFS,深搜DFS,并查集)
  • 原文地址:https://www.cnblogs.com/elves/p/12274823.html
Copyright © 2011-2022 走看看