zoukankan      html  css  js  c++  java
  • 2019年年终总结

    概述

    北京的冬天刮起风来还是比较冷的,在这2019年的最后一天,我也该回顾一下这一年的时光了,这一年对我来说都是很重要的一年,既是转折也是决定,既是顺势也是波折。当然写一篇随笔来诉苦是没有质量的文章。言归正传,我们依然从技术和其他来完结这篇总结。

    技术综述

    2019年技术大事记,按照时间顺序。

    mongodb的监控change-stream 多线程开发。

    kafka运维实践。

    hbase运维实践。

    spark运维实践。

    ESB培训与运维。

    MongoDB监控开发

    监控主体是mongoDB通过change-stream发送到kafka的数据,对比手段通过mongo的query语句查找op-log的数据,和消费kafka中的数据进行比对,对比过程都放在hbase中对比。逻辑关系间下图。绿色部分就是监控要做的事情。

     监控是多线程的划分就是根据kafka中的消息队列topic和处理逻辑区分的,比如要消费topic1的数据,就会有如下线程:消费topic1的线程,查找mongodb中topic1中表的数据的线程,处理hbase中topic1先关数据的线程,就是说三个线程监控一个topic,2个线程从mongo和kafka拉取数据,一个线程处理数据,处理数据在hbase的表中,相当于一个topic对应一个队列。现在列一下比较重要的代码。

    kafka消费比较简单,使用KafkaConsumer消费者中的低阶API assign方法,其中需要注意的是记录offset到redis,如果redis存在正常offset则使用这个offset,否则从最新位置开始消费。

     1         consumer = new KafkaConsumer<String, String>(props);
     2         /*consumer.subscribe(Arrays.asList(topic));
     3         consumer.seekToEnd();*/
     4         TopicPartition partitions = new TopicPartition(topic, partition);
     5         consumer.assign(Arrays.asList(partitions));
     6         if(RedisCheck==0){
     7         //使用assign方式进行消费
     8         consumer.seekToEnd(partitions);
     9         }else{
    10         consumer.seek(partitions, RedisCheck);
    11         };

    mongodb中使用sql语句查找指定表的数据,核心代码是,需要注意要连接secondary的节点:

     1     public  void init(){
     2         List<ServerAddress> addrs = new ArrayList<ServerAddress>();
     3         String mongoServer = PosterMogo.MongoServer;
     4         if(mongoServer==null){
     5             mongoServer="host1:27018,host2:27018";
     6         }
     7         if(mongoServer.indexOf(",")>0){
     8             String[] strs = mongoServer.split(",");
     9             for(String str:strs){
    10                 addrs.add(new ServerAddress(str.split(":")[0], Integer.valueOf(str.split(":")[1])));
    11             }
    12         }else{
    13             addrs.add(new ServerAddress(mongoServer.split(":")[0], Integer.valueOf(mongoServer.split(":")[1])));
    14         }
    15         
    16         Builder options = new MongoClientOptions.Builder().readPreference(ReadPreference.secondary());
    17         options.cursorFinalizerEnabled(true);
    18         //options.maxConnectionLifeTime(0);
    19         options.connectionsPerHost(3);
    20         options.connectTimeout(6000000);
    21         options.maxWaitTime(6000000);
    22         options.socketTimeout(6000000);
    23         options.maxConnectionIdleTime(6600000);
    24         options.socketKeepAlive(true).heartbeatFrequency(1000).connectTimeout(6000000);
    25         options.threadsAllowedToBlockForConnectionMultiplier(300);
    26         
    27         options.build();
    28         String[] identified = null;
    29         if(PosterMogo.MongoIdentify==null){
    30             identified="root,admin,842AB52563B13D332916113FB8DFA9BF".split(",");
    31         }else{
    32         identified = PosterMogo.MongoIdentify.split(",");
    33         }
    34         String password = null;
    35         try {
    36         password = DesUtils.decrypt(identified[2]);
    37         } catch (Exception e) {
    38             Log4jLog.logError(e,"mongoIdentify","init","init");
    39         }
    40         MongoCredential credential = MongoCredential.createScramSha1Credential(identified[0], identified[1], password.toCharArray());
    41         List<MongoCredential> credentials = new ArrayList<MongoCredential>();
    42         credentials.add(credential);
    43 
    44         //通过连接认证获取MongoDB连接
    45         mongoClient = new MongoClient(addrs,credentials);
    46         //getAdminInfo();
    47         
    48     }

    查找hbase中表的数据进行对比的代码:

     1 public void aggreDataNew(String firstCount,String lastCount,String topic){
     2         
     3         long corDelay=Long.MIN_VALUE;
     4         
     5         String hbaseSub=null;
     6         Table table = HBaseUtil.getTable(tableNameCache);
     7         
     8         Scan scan = new Scan();
     9         scan.setStartRow(Bytes.toBytes(firstCount));
    10         scan.setStopRow(Bytes.toBytes(lastCount));
    11         //scan.setBatch(50000);
    12         scan.setCaching(1000);
    13         scan.setCacheBlocks(false);
    14 
    15         ResultScanner scanner=null;
    16         try {
    17             scanner = table.getScanner(scan);
    18             
    19             for(Result rs:scanner){
    20                             }
    21                  } catch (IOException e) {
    22             e.printStackTrace();
    23         } finally{
    24             if(scanner!=null){scanner.close();}
    25             if(table!=null){
    26                 try {
    27                     table.close();
    28                 } catch (IOException e) {
    29                     // TODO Auto-generated catch block
    30                     e.printStackTrace();
    31                 }
    32                 }
    33         }
    34     }

    在这里需要注意这3个线程需要顺序限制 所以需要加锁的代码,加锁原理,在第三个线程计算mongodb和kafka的数据的线程,需要线程等待。

    kafka运维实践

    这年kafka运维没有什么值得写的问题,简单来说,包括硬件内存,ISR列表不一致,GC问题等等。

    HBASE运维实践

    检查一致性:hbase hbck

    关闭有问题的表:DISABLE TABLE

    更新phoenix索引。

    SPARK运维实践

    spark也没有什么大问题,基本上是hdfs磁盘问题,或者yarn计算节点重启。

    总结

    明天就是2020年了,感觉2019过的很快,也许因为自己成长速度太慢,安于现状吧,不激励自己永远也不知道自己会成长到什么地步。共勉!

  • 相关阅读:
    我的Java设计模式-原型模式
    我的Java设计模式-观察者模式
    我的Java设计模式-建造者模式
    我的Java设计模式-单例模式
    我的Java设计模式-工厂方法模式
    菜鸟必备教程,ajax与xml交互传输数据。
    javascript Date format(js日期格式化)
    用script实现内容显示,并使用json传输数据
    使用jquer获取当前时间,并赋值到input上。
    利用js获取时间并输出值
  • 原文地址:https://www.cnblogs.com/boanxin/p/12124423.html
Copyright © 2011-2022 走看看