zoukankan      html  css  js  c++  java
  • UDP接收百万级数据的解决方案

    任务:有个发送方,会通过udp发送一些信息,然后服务接收到信息后保存到数据库的一张表A,保存的这些数据在经过一系列处理,处理完成后累积到另一张表B,然后清空处理的表A的数据。目前发送方比较少,不久就要增加到100个。

    我采用netty5来进行udp的网络通讯,将接收到的数据保存到BlockingQueue中,然后读取BlockingQueue中的数据,取到100条就存到hbase数据库中。

    初始化netty

    int DEFAULT_PORT = 6000;
    EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group).channel(NioDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true)
                        .handler(new UdpServerHandler());
                Channel channel = bootstrap.bind(DEFAULT_PORT).sync().channel();
                channel.closeFuture().await();
                LOGGER.info("netty初始化成功!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
    

     接收udp数据

    public BlockingQueue<Map<String, Object>> queue = <br>new LinkedBlockingQueue<Map<String, Object>>(990000);
    protected void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
                // 因为Netty对UDP进行了封装,所以接收到的是DatagramPacket对象。
                String result = msg.content().toString(CharsetUtil.UTF_8);
                 
                Map<String, Object> getMap = new HashMap<String, Object>();
    //处理数据
                 
                        <br>queue.put(getMap);
                     
                <br>ctx.writeAndFlush(new DatagramPacket(<br>Unpooled.copiedBuffer("结果:", CharsetUtil.UTF_8), msg.sender()));
    

     读取数据存hbase

    public void getDate() {
            LOGGER.info("开始取数据");
            List<Map<String, Object>> jsonList = new ArrayList<Map<String, Object>>();
                while (true) {
                    Map<String, Object> takeMap = null;
                    try {
                        takeMap = queue.take();
                        if (takeMap == null) {
                            continue;
                        }
                        jsonList.add(takeMap);
                        if (jsonList.size() == 100) {
                            String httpJson = HbaseUtil.toHttpJson(vo.getTableName(), jsonList);
                            LOGGER.info(httpJson);
                            List<HbaseDataEntity> hbaseDatas =ParseJson.getData(httpJson);
                            HbaseAPI.insertDataList(hbaseDatas);
                            jsonList.clear();
                            LOGGER.info("hbase存了100条");
                        }
                    } catch (Exception e) {
                        jsonList.clear();
                        continue;
                    }
                }
     
            }
    
    • BlockingQueue一定要设置大小,不设置是int最大值,有可能会内存溢出;
    • 从BlockingQueue取数据的时候一定要阻塞式取take(),负责会死循环,占CPU100%;
    • hbase库连接时是阻塞式的,如果连接不上会一直阻塞。
  • 相关阅读:
    如何开始DDD(续)
    如何开始DDD
    ThinkNet终于见面了
    [Umbraco] umbraco中如何分页
    ETL 工具下载全集 包括 Informatica Datastage Cognos( 持续更新)
    js时间对比-转化为几天前,几小时前,几分钟前
    原生JS实现返回顶部和滚动锚点
    JSONP原理及简单实现 可做简单插件使用
    CSS3 transition效果 360度旋转 旋转放大 放大 移动
    js获取url的常用方法
  • 原文地址:https://www.cnblogs.com/wqsbk/p/7569683.html
Copyright © 2011-2022 走看看