zoukankan      html  css  js  c++  java
  • UDP接收百万级数据的解决方案

    任务:有个发送方,会通过udp发送一些信息,然后服务接收到信息后保存到数据库的一张表A,保存的这些数据在经过一系列处理,处理完成后累积到另一张表B,然后清空处理的表A的数据。目前发送方比较少,不久就要增加到100个。

    我采用netty5来进行udp的网络通讯,将接收到的数据保存到BlockingQueue中,然后读取BlockingQueue中的数据,取到100条就存到hbase数据库中。

    初始化netty

    int DEFAULT_PORT = 6000;
    EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group).channel(NioDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true)
                        .handler(new UdpServerHandler());
                Channel channel = bootstrap.bind(DEFAULT_PORT).sync().channel();
                channel.closeFuture().await();
                LOGGER.info("netty初始化成功!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
    

     接收udp数据

    public BlockingQueue<Map<String, Object>> queue = <br>new LinkedBlockingQueue<Map<String, Object>>(990000);
    protected void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
                // 因为Netty对UDP进行了封装,所以接收到的是DatagramPacket对象。
                String result = msg.content().toString(CharsetUtil.UTF_8);
                 
                Map<String, Object> getMap = new HashMap<String, Object>();
    //处理数据
                 
                        <br>queue.put(getMap);
                     
                <br>ctx.writeAndFlush(new DatagramPacket(<br>Unpooled.copiedBuffer("结果:", CharsetUtil.UTF_8), msg.sender()));
    

     读取数据存hbase

    public void getDate() {
            LOGGER.info("开始取数据");
            List<Map<String, Object>> jsonList = new ArrayList<Map<String, Object>>();
                while (true) {
                    Map<String, Object> takeMap = null;
                    try {
                        takeMap = queue.take();
                        if (takeMap == null) {
                            continue;
                        }
                        jsonList.add(takeMap);
                        if (jsonList.size() == 100) {
                            String httpJson = HbaseUtil.toHttpJson(vo.getTableName(), jsonList);
                            LOGGER.info(httpJson);
                            List<HbaseDataEntity> hbaseDatas =ParseJson.getData(httpJson);
                            HbaseAPI.insertDataList(hbaseDatas);
                            jsonList.clear();
                            LOGGER.info("hbase存了100条");
                        }
                    } catch (Exception e) {
                        jsonList.clear();
                        continue;
                    }
                }
     
            }
    
    • BlockingQueue一定要设置大小,不设置是int最大值,有可能会内存溢出;
    • 从BlockingQueue取数据的时候一定要阻塞式取take(),负责会死循环,占CPU100%;
    • hbase库连接时是阻塞式的,如果连接不上会一直阻塞。
  • 相关阅读:
    mybatis绑定错误-- Invalid bound statement (not found)
    JAVA MyBatis配置文件用properties引入外部配置文件
    JAVA错误提示:The operation is not applicable to the current selection.Select a field which is not declared as type variable or a type that declares such fields.
    MySql5.5安装步骤及MySql_Front视图配置
    Failed to load AppCompat ActionBar with unknown error
    android:整理drawable(余下的)(三)
    android:整理drawable(shapdrawable)(二)
    与drawable的较量(一)
    gradle与android studio 关系及gradle配置
    键盘优雅弹出与ios光标乱飘解决方案
  • 原文地址:https://www.cnblogs.com/wqsbk/p/7569683.html
Copyright © 2011-2022 走看看