zoukankan      html  css  js  c++  java
  • zeppelin的数据集的优化

    前面我们介绍了zeppelin的修改,前面由于自己的原因,对zeppelin的修改过于多,现在由于优化了,我们两个类,

    一个是zeppelin-server的NotebookServer的类的broadcastParagraph(Note note,Paragraph p)的代码,另外

    的一个是zeppelin-zengine里面的Note的run(String paragraphId)的这个方法还有添加了Utils的这个类对于数据集的查找,对于这两个方法,就可以完美的对

    zeppelin的数据集的修改。

    我们首先介绍一下zeppelin的工作原理以及我们做的修改原理

    1.zeppelin的实现

    原先zeppelin的实现原理是当我在note的paragraph里面写spark语句的时候,当我们点击运行或点击其他的note的时候,

    此时,zeppelin会保存那个paragraph里面的所有数据,而且不会进行改变。当我们需要运行的时候,

    zeppelin就会去找notebook里面paragraph的text属性然后进行运行以及修改状态和显示到页面。

    2.zeppelin的修改点

    关于zeppelin的paragraph.text保存到数据库的操作没有进行更改,将用户所写数据原样的保存到数据库中 

    改动一:

    在run的时候,把待运行paragraph里面的text数据中的数据集改为真实路径提交到zeppelin中运行

    改动二:

    运行完成回显数据的时候修改其paragraph状态,再恢复成为用户输入的样子(把真实路径换为数据集)

    下面贴一下代码

    zeppelin-zengine的Note的类

     public void run(String paragraphId) {
        //根据是否逾期来选择读取数据的方式
        if(Utils.isOverDueTime()){
    //      setdataMap = findData();
            setdataMap = Utils.finddataMap();
            setdataList = Utils.finddataList();
        }
        Paragraph p = getParagraph(paragraphId);
        p.setListener(jobListenerFactory.getParagraphJobListener(this));
    
    
    
        if (p.isBlankParagraph()) {
          logger.info("skip to run blank paragraph. {}", p.getId());
          p.setStatus(Job.Status.FINISHED);
          return;
        }
    
        String requiredReplName = p.getRequiredReplName();
        Interpreter intp = factory.getInterpreter(p.getUser(), getId(), requiredReplName);
    
        if (intp == null) {
          String intpExceptionMsg =
              p.getJobName() + "'s Interpreter " + requiredReplName + " not found";
          InterpreterException intpException = new InterpreterException(intpExceptionMsg);
          InterpreterResult intpResult =
              new InterpreterResult(InterpreterResult.Code.ERROR, intpException.getMessage());
          p.setReturn(intpResult, intpException);
          p.setStatus(Job.Status.ERROR);
          throw intpException;
        }
        if (p.getConfig().get("enabled") == null || (Boolean) p.getConfig().get("enabled")) {
          p.setAuthenticationInfo(p.getAuthenticationInfo());
    
          Paragraph p2 = p;
          if (p.settings != null) {
            String text = p2.getText();
            if (text != null && text.contains("#")) {
                p2 = p.cloneParagraphForUser(p.getId());
                p2.setListener(jobListenerFactory.getParagraphJobListener(this));
                p2.setAuthenticationInfo(p.getAuthenticationInfo());
                //替换数据
               for (String s : setdataList){
                 if(! text.contains("#")){
                   break;
                 }
                 if(text.contains(s)){
                     text = text.replaceAll(s, setdataMap.get(s));
                 }
               }
                p2.setText(text);
                p2.setInterpreterFactory(factory);
                p2.setNote(p.getNote());
              }
          }
          intp.getScheduler().submit(p2);
        }
      }

    zeppelin-server的NotebookServer

     public void broadcastParagraph(Note note, Paragraph p) {
          Paragraph cacheP = note.getParagraph(p.getId());
          if(cacheP == null)
            cacheP = p;
         if (cacheP.getText() != null && !cacheP.getText().equals(p.getText())) {
             if (p.getReturn() instanceof InterpreterResult) {
                 cacheP.setReturn(p.getResult(), p.getException());
             } else {
                 cacheP.setResult(p.getReturn());
             }
            cacheP.setStatus(p.getStatus());
           return;
         }
    //      if (p.settings != null) {
    //          //clone()方法
    //          p2 = p.cloneParagraphForUser(p.getId());
    //
    //          String text = p2.getText();
    //          if (text != null) {
    //              if (text.contains("/tmp/xjdx.txt")) {
    //                  text = text.replaceAll("/tmp/xjdx.txt", "#mydata");
    //                  p2.setText(text);
    //              }
    //          }
    //      }
        if (note.isPersonalizedMode()) {
          broadcastParagraphs(p.getUserParagraphMap(), cacheP);
        } else {
          broadcast(note.getId(), new Message(OP.PARAGRAPH).put("paragraph", cacheP));
        }
      }

    下面介绍一下Utils的类

    public class Utils {
    
        //数据库信息
    //    private static  final String driver = "com.mysql.jdbc.Driver";
    //    private static final String url = "jdbc:mysql://172.20.11.10:3306/xxxx?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false";
    //    private static final String username = "xxxx";
    //    private static final String password = "xxxx";
    
        private static  String driver = null;
        private static  String url = null;
        private static  String username = null;
        private static  String password = null;
        private static final long ALLOW_TIME = 10  * 1000;
        private static Date preTime = null;
    
        private static HashMap<String,String> setdataMap = null;
        private static ArrayList<String> setdataList = null;
    
        static{
            driver = Config.getConfig("jdbc_driverClassName");
            url = Config.getConfig("jdbc_url");
            username = Config.getConfig("jdbc_username");
            password = Config.getConfig("jdbc_password");
        }
    
        //是否逾期时间
        public static boolean isOverDueTime() {
            Date d = new Date();
            if (preTime == null || (d.getTime() - preTime.getTime()) > ALLOW_TIME || setdataList == null || setdataMap == null)
                return true;
            return false;
        }
    
        //进行实时查询数据库查询
        public static HashMap<String,String> finddataMap(){
            try {
                preTime = new Date();
                setdataMap = new HashMap<>();
                Class.forName(driver);
                Connection conn = (Connection) DriverManager.getConnection(url, username, password);
                String sql = "select dataset.ds_name , dataset_storage.item_value from dataset left join dataset_storage on dataset.id = dataset_storage.dataset_id and dataset_storage.item_key = 'majorPath'";
                PreparedStatement pstmt = (PreparedStatement)conn.prepareStatement(sql);
                ResultSet rs = pstmt.executeQuery();
                while (rs.next()) {
                    if((rs.getString("item_value")) != null  ){
                        setdataMap.put("#" + rs.getString("ds_name"),rs.getString("item_value"));
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            return setdataMap;
        }
    
        public static ArrayList<String> finddataList(){
            setdataList = new ArrayList<>();
            if(setdataMap != null){
                for (String s:setdataMap.keySet()){
                    setdataList.add(s) ;
                }
            }
            return setdataList;
        }
    
    }

    以上就是对于zeppelin的数据集修改的二次优化

  • 相关阅读:
    SAE/ISO standards for Automotive
    The J1850 Core
    SAE J1708 DS36277 MAX3444, DS75176B
    X431 元征诊断枪
    凯尔卡C68全球版汽车电脑诊断仪
    汽车王牌
    Vehicle’s communication protocol
    Vehicle Network Protocols -- ISO/KWP CAN CCD PCI SCI / SCP / Class 2
    On-board diagnostics -- Standards documents
    On-board diagnostics connector SAE J1962
  • 原文地址:https://www.cnblogs.com/wnbahmbb/p/6516269.html
Copyright © 2011-2022 走看看