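Earlier posts described our modifications to Zeppelin. In that first version I changed far more of Zeppelin than was necessary. After optimizing, the change now touches only two classes.
One is the broadcastParagraph(Note note, Paragraph p) method of the NotebookServer class in zeppelin-server. The other is the run(String paragraphId) method of the Note class in zeppelin-zengine.
We also added a Utils class that looks up the datasets. These two methods are enough to implement the dataset substitution in Zeppelin.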
We first describe how Zeppelin works, and then the idea behind our changes.
1. How Zeppelin works
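In stock Zeppelin, when you write a Spark statement in a paragraph of a note and then click run or switch to another note,
Zeppelin saves everything in that paragraph exactly as written, without changing it. When the paragraph needs to run,
Zeppelin reads the text property of the paragraph in the notebook, runs it, updates the paragraph status, and displays the result on the page.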
2. What we changed in Zeppelin
The way paragraph.text is saved to the database is unchanged: what the user typed is stored verbatim.
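Change 1:
At run time, replace the dataset names in the text of the paragraph to be run with their real paths, and submit that text to Zeppelin for execution.
Change 2:
When the run finishes and the result is sent back to the page, update the paragraph status and show the text as the user typed it (with the dataset name instead of the real path).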
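The two changes can be illustrated at the string level with a minimal sketch. It is not the Zeppelin code itself: the DatasetResolver class and its resolve method are hypothetical names used only for illustration, and the #mydata placeholder and /tmp/xjdx.txt path are sample values. The point is that the text submitted for execution is a new string, while the text the user typed is left untouched and can be shown again afterwards.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only (not part of Zeppelin).
class DatasetResolver {

    // Returns a copy of text in which every "#name" placeholder is replaced
    // by its real path. The input string itself is never modified.
    static String resolve(String text, Map<String, String> datasetPaths) {
        String resolved = text;
        for (Map.Entry<String, String> entry : datasetPaths.entrySet()) {
            // String.replace performs a literal (non-regex) replacement
            resolved = resolved.replace(entry.getKey(), entry.getValue());
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, String> paths = new LinkedHashMap<>();
        paths.put("#mydata", "/tmp/xjdx.txt"); // sample dataset name and path

        String userText = "sc.textFile(\"#mydata\").count()";
        String submitted = resolve(userText, paths);

        System.out.println("submitted: " + submitted); // text handed to the interpreter
        System.out.println("stored:    " + userText);  // text kept for display
    }
}
```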
The code follows.
The Note class in zeppelin-zengine
public void run(String paragraphId) {
  // Refresh the dataset lookup only when the cached copy has expired.
  // setdataMap and setdataList are assumed to be fields of Note; their declarations are not shown.
  if (Utils.isOverDueTime()) {
    // setdataMap = findData();
    setdataMap = Utils.finddataMap();
    setdataList = Utils.finddataList();
  }
  Paragraph p = getParagraph(paragraphId);
  p.setListener(jobListenerFactory.getParagraphJobListener(this));
  if (p.isBlankParagraph()) {
    logger.info("skip to run blank paragraph. {}", p.getId());
    p.setStatus(Job.Status.FINISHED);
    return;
  }
  String requiredReplName = p.getRequiredReplName();
  Interpreter intp = factory.getInterpreter(p.getUser(), getId(), requiredReplName);
  if (intp == null) {
    String intpExceptionMsg = p.getJobName() + "'s Interpreter " + requiredReplName + " not found";
    InterpreterException intpException = new InterpreterException(intpExceptionMsg);
    InterpreterResult intpResult =
        new InterpreterResult(InterpreterResult.Code.ERROR, intpException.getMessage());
    p.setReturn(intpResult, intpException);
    p.setStatus(Job.Status.ERROR);
    throw intpException;
  }
  if (p.getConfig().get("enabled") == null || (Boolean) p.getConfig().get("enabled")) {
    p.setAuthenticationInfo(p.getAuthenticationInfo());
    Paragraph p2 = p;
    if (p.settings != null) {
      String text = p2.getText();
      if (text != null && text.contains("#")) {
        // Run a clone so that the stored paragraph keeps the text the user typed
        p2 = p.cloneParagraphForUser(p.getId());
        p2.setListener(jobListenerFactory.getParagraphJobListener(this));
        p2.setAuthenticationInfo(p.getAuthenticationInfo());
        // Replace each dataset name with its real path
        for (String s : setdataList) {
          if (!text.contains("#")) {
            break;
          }
          if (text.contains(s)) {
            // replace() is literal; replaceAll() would interpret the name as a regex
            text = text.replace(s, setdataMap.get(s));
          }
        }
        p2.setText(text);
        p2.setInterpreterFactory(factory);
        p2.setNote(p.getNote());
      }
    }
    intp.getScheduler().submit(p2);
  }
}
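One thing to watch in the replacement loop is dataset names that are prefixes of each other. If both #sales and #sales_2020 exist (hypothetical names, as are the paths below), replacing #sales first would corrupt every occurrence of #sales_2020. A possible safeguard, sketched here under that assumption and not taken from the code above, is to process the longest names first. The PlaceholderOrdering class and its method name are made up for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, for illustration only (not part of Zeppelin).
class PlaceholderOrdering {

    // Replaces placeholders starting with the longest key, so that
    // "#sales_2020" is handled before its prefix "#sales".
    static String resolveLongestFirst(String text, Map<String, String> datasetPaths) {
        List<String> keys = new ArrayList<>(datasetPaths.keySet());
        keys.sort((a, b) -> Integer.compare(b.length(), a.length()));

        String resolved = text;
        for (String key : keys) {
            if (!resolved.contains("#")) {
                break; // no placeholders left
            }
            resolved = resolved.replace(key, datasetPaths.get(key)); // literal replacement
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, String> paths = new HashMap<>();
        paths.put("#sales", "/data/sales");
        paths.put("#sales_2020", "/data/archive/sales_2020");

        System.out.println(resolveLongestFirst("load(\"#sales_2020\") + load(\"#sales\")", paths));
    }
}
```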
The NotebookServer class in zeppelin-server
public void broadcastParagraph(Note note, Paragraph p) {
  // The paragraph stored in the note still holds the text the user typed
  Paragraph cacheP = note.getParagraph(p.getId());
  if (cacheP == null) {
    cacheP = p;
  }
  // If p ran with substituted text, copy only its result and status onto the stored paragraph
  if (cacheP.getText() != null && !cacheP.getText().equals(p.getText())) {
    if (p.getReturn() instanceof InterpreterResult) {
      cacheP.setReturn(p.getResult(), p.getException());
    } else {
      cacheP.setResult(p.getReturn());
    }
    cacheP.setStatus(p.getStatus());
    return;
  }
  // Earlier approach, kept for reference: reverse the substitution on a clone
  // if (p.settings != null) {
  //   // clone() method
  //   p2 = p.cloneParagraphForUser(p.getId());
  //
  //   String text = p2.getText();
  //   if (text != null) {
  //     if (text.contains("/tmp/xjdx.txt")) {
  //       text = text.replaceAll("/tmp/xjdx.txt", "#mydata");
  //       p2.setText(text);
  //     }
  //   }
  // }
  if (note.isPersonalizedMode()) {
    broadcastParagraphs(p.getUserParagraphMap(), cacheP);
  } else {
    broadcast(note.getId(), new Message(OP.PARAGRAPH).put("paragraph", cacheP));
  }
}
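The core of the method above is the comparison between the stored paragraph and the one that actually ran. The following sketch isolates that step with a hypothetical Para class standing in for Zeppelin's Paragraph (reduced to three fields). It is a simplified model, not Zeppelin's API: when the texts differ, only the outcome is copied across, so the stored text stays as the user typed it.

```java
// Simplified model of the result copy-back step; Para is a hypothetical
// stand-in for Zeppelin's Paragraph class.
class ParagraphSync {

    static class Para {
        String text;
        String status;
        Object result;

        Para(String text) {
            this.text = text;
        }
    }

    // Returns true when the executed copy ran with different text and its
    // result and status were copied onto the stored paragraph.
    static boolean copyOutcomeIfTextDiffers(Para stored, Para executed) {
        if (stored.text != null && !stored.text.equals(executed.text)) {
            stored.result = executed.result;
            stored.status = executed.status;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        Para stored = new Para("sc.textFile(\"#mydata\").count()");
        Para executed = new Para("sc.textFile(\"/tmp/xjdx.txt\").count()");
        executed.status = "FINISHED";
        executed.result = "sample output";

        copyOutcomeIfTextDiffers(stored, executed);
        System.out.println(stored.text + " -> " + stored.status);
    }
}
```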
The Utils class is as follows.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;

public class Utils {
  // Database settings (earlier hard-coded version, now read from Config)
  // private static final String driver = "com.mysql.jdbc.Driver";
  // private static final String url = "jdbc:mysql://172.20.11.10:3306/xxxx?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false";
  // private static final String username = "xxxx";
  // private static final String password = "xxxx";
  private static String driver = null;
  private static String url = null;
  private static String username = null;
  private static String password = null;
  // How long a loaded lookup stays valid: 10 seconds
  private static final long ALLOW_TIME = 10 * 1000;
  private static Date preTime = null;
  private static HashMap<String, String> setdataMap = null;
  private static ArrayList<String> setdataList = null;

  static {
    driver = Config.getConfig("jdbc_driverClassName");
    url = Config.getConfig("jdbc_url");
    username = Config.getConfig("jdbc_username");
    password = Config.getConfig("jdbc_password");
  }

  // True when nothing has been loaded yet or the cached lookup is older than ALLOW_TIME
  public static boolean isOverDueTime() {
    Date d = new Date();
    return preTime == null
        || (d.getTime() - preTime.getTime()) > ALLOW_TIME
        || setdataList == null
        || setdataMap == null;
  }

  // Query the database and rebuild the map from "#" + dataset name to real path
  public static HashMap<String, String> finddataMap() {
    preTime = new Date();
    setdataMap = new HashMap<>();
    String sql = "select dataset.ds_name , dataset_storage.item_value from dataset left join dataset_storage on dataset.id = dataset_storage.dataset_id and dataset_storage.item_key = 'majorPath'";
    try {
      Class.forName(driver);
      // try-with-resources closes the connection, statement and result set
      try (Connection conn = DriverManager.getConnection(url, username, password);
           PreparedStatement pstmt = conn.prepareStatement(sql);
           ResultSet rs = pstmt.executeQuery()) {
        while (rs.next()) {
          if (rs.getString("item_value") != null) {
            setdataMap.put("#" + rs.getString("ds_name"), rs.getString("item_value"));
          }
        }
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
    return setdataMap;
  }

  // Build the list of dataset names from the keys of the current map
  public static ArrayList<String> finddataList() {
    setdataList = new ArrayList<>();
    if (setdataMap != null) {
      for (String s : setdataMap.keySet()) {
        setdataList.add(s);
      }
    }
    return setdataList;
  }
}
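The expiry check in Utils amounts to a small time-based cache: the database is queried again only when the last load is older than ALLOW_TIME. The same idea can be sketched in a self-contained form, with the database query replaced by a generic loader. The ExpiringLookup class is a hypothetical name, and the synchronized method is an addition of this sketch (the Utils class above does no locking).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical time-based cache, for illustration only.
class ExpiringLookup {
    private final long ttlMillis;
    private final Supplier<Map<String, String>> loader;
    private Map<String, String> cached;
    private long loadedAt;

    ExpiringLookup(long ttlMillis, Supplier<Map<String, String>> loader) {
        this.ttlMillis = ttlMillis;
        this.loader = loader;
    }

    // Returns the cached map, reloading it first when it is missing or
    // older than ttlMillis. synchronized keeps concurrent callers from
    // reloading at the same time.
    synchronized Map<String, String> get() {
        long now = System.currentTimeMillis();
        if (cached == null || now - loadedAt > ttlMillis) {
            cached = loader.get();
            loadedAt = now;
        }
        return cached;
    }

    public static void main(String[] args) {
        ExpiringLookup lookup = new ExpiringLookup(10 * 1000, () -> {
            // Stand-in for the database query
            Map<String, String> m = new HashMap<>();
            m.put("#mydata", "/tmp/xjdx.txt");
            return m;
        });
        System.out.println(lookup.get().get("#mydata"));
    }
}
```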
That is the second round of optimization of our dataset modification for Zeppelin.