zoukankan      html  css  js  c++  java
  • nifi练习项目,聚合多flowfile的attribute,以单个邮件发放通知

    nifi练习项目,聚合多flowfile的attribute,以单个邮件发放通知

    习惯看代码的直接访问

    https://github.com/cclient/nifi-email-bundle

    Nifi SendEmail Processor

    flowfile0                 mail0(flowfile0)
    
    flowfile1  PutMail(1:1)   mail1(flowfile1)
    
    flowfile2                 mail2(flowfile2)
    flowfile0            
    
    flowfile1  SendMail(n:1)  mail(flowfile0,flowfile1,flowfile2)
    
    flowfile2           
    

    需求

    • nifi生态和自定义processor下通常有failure stream的处理,为了及时发现failure需统一发送通知邮件
    • upstream 里会堆积多项flowfile,1:1发送邮件则通知过多,因此n:1 把多项flowfile合并在一个邮件里发送(只发送flowfile的attribute信息)
    • 邮件内容为table,每列是一个attribute,attribule取并集,每行是一个flowfile的所有attribute

    基于官方的PutMail 开发

    可以组合多个官方processor实现该功能,例如在前置Processor完成拼接email msg内容,直接通过putMail发送邮件

    通过简单定制实现也是为了熟悉nifi的开发/测试规范

    邮件为html body如下

    a0 a1 a2 a3 filename path uuid entryDate size
    v0 v1 v2 v3 184775642434972.mockFlowFile target 1bdaef1f-960b-48c5-a739-a881f92d1466 Thu May 21 15:37:00 CST 2020
    v0 v1 v2 v3 184323287499017.mockFlowFile target 45caab10-50c4-4727-ac42-bc345670cf85 Thu May 21 15:37:00 CST 2020 Thu Jan 01 08:00:00 CST 1970

    deploy

    1 compile

    mvn package -Dmaven.test.skip=true
    

    2 upload to one of

    nifi.nar.library.directory=./lib
    nifi.nar.library.directory.custom=./lib_custom
    nifi.nar.library.autoload.directory=./extensions
    nifi.nar.working.directory=./work/nar/
    

    cp nifi-email-nar/target/nifi-email-nar-0.1.nar nifi/lib_custom/

    3 restart nifi if need

    nifi/bin/nifi.sh restart

    实现方式

    • 1 官方MergeRecord/MergeContent组件+官方组件PutEmail组件

    http://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.12.1/org.apache.nifi.processors.standard.MergeRecord/index.html

    http://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.12.1/org.apache.nifi.processors.standard.MergeContent/index.html

    http://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.12.1/org.apache.nifi.processors.standard.PutEmail/index.html

    需要结合merge的特点和规范来应用

    • 2 自定制插件-其实主要是件这个基本的功能研究nifi插件的规范来为以后的深度定制作准备

    目标是全线替换logstash,flink,一些数据同步工具,原因后续有机会谈

    官方的文档较少,不过好在开源,可以参照官方的插件源码,熟悉nifi的开发规范(包括测试方式)

    以http://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.12.1/org.apache.nifi.processors.standard.PutEmail/index.html一个官方插件为例

    官方为每flowfile 作为一条邮件发放

    我们的目村是改造为 合并多条flowfile,作为一条邮件发放,暂时放弃发送附件,只将多flowfile的attr合并表格发放

    直接看代码吧

    合并flowfile的attr为html的table结构,很轻量的需求,为了少依赖,直接拼字符串

    public class Util {
        public static String buildEmailHtmlBodyByAttr(List<FlowFile> flowFileList) {
            List<Map<String, String>> attrList = flowFileList.stream().map(flowFile -> flowFile.getAttributes()).collect(Collectors.toList());
            List<String> allKeys = attrList.stream().map(attr -> attr.keySet()).flatMap(keys -> keys.stream()).distinct().collect(Collectors.toList());
            Collections.sort(allKeys);
            StringBuilder sb = new StringBuilder();
            sb.append("<table border="1" cellspacing="0" >");
            sb.append("<thead align="center" valign="middle">");
            sb.append("<tr>");
            allKeys.forEach(key -> sb.append("<th>" + key + "</th>"));
            sb.append("<th>entryDate</th>");
            sb.append("<th>size</th>");
            sb.append("</tr>");
            sb.append("</thead>");
            sb.append("<tbody>");
            flowFileList.stream().forEach(flowFile -> {
                Map<String, String> kvs = flowFile.getAttributes();
                sb.append("<tr>");
                allKeys.forEach(key -> sb.append("<td>" + kvs.getOrDefault(key, "") + "</td>"));
                sb.append("<td>" + new Date(flowFile.getEntryDate()) + "</td>");
                sb.append("<td>" + flowFile.getSize() + "</td>");
                sb.append("</tr>");
            });
            sb.append("</tbody>");
            sb.append("</table>");
            return sb.toString();
        }
    }
    

    发送邮件部分

        @Override
        public void onTrigger(final ProcessContext context, final ProcessSession session) {
            //邮件分组合并,获取组最大个数
            Integer groupSize = context.getProperty(GROUP_SIZE).asInteger();
            List<FlowFile> flowFiles = session.get(groupSize);
            if (flowFiles == null || flowFiles.size() == 0) {
                return;
            }
    
            final Properties properties = this.getMailPropertiesFromFlowFile(context);
    
            final Session mailSession = this.createMailSession(properties);
    
            final Message message = new MimeMessage(mailSession);
            final ComponentLog logger = getLogger();
    
            try {
                message.addFrom(toInetAddresses(context, FROM));
                message.setRecipients(RecipientType.TO, toInetAddresses(context, TO));
                message.setRecipients(RecipientType.CC, toInetAddresses(context, CC));
                message.setRecipients(RecipientType.BCC, toInetAddresses(context, BCC));
    
                this.setMessageHeader("X-Mailer", context.getProperty(HEADER_XMAILER).getValue(), message);
                message.setSubject(context.getProperty(SUBJECT).getValue());
                String messageText;
                //合并flowFiles的attr为messageText
                if (context.getProperty(MESSAGE) != null && context.getProperty(MESSAGE).getValue() != null) {
                    messageText = context.getProperty(MESSAGE).getValue();
                } else {
                    messageText = getMessage(flowFiles);
                }
    
                String contentType = context.getProperty(CONTENT_TYPE).getValue();
                message.setContent(messageText, contentType);
                message.setSentDate(new Date());
              
                //发送邮件
                send(message);
                session.transfer(flowFiles, REL_SUCCESS);
                logger.info("Sent email as a result of receiving {}", new Object[]{flowFiles});
            } catch (final ProcessException | MessagingException e) {
                context.yield();
                logger.error("Failed to send email for {}: {}; routing to failure", new Object[]{flowFiles, e.getMessage()}, e);
                session.transfer(flowFiles, REL_FAILURE);
            }
        }
    
        private String getMessage(final List<FlowFile> flowFiles) {
            String messageText = Util.buildEmailHtmlBodyByAttr(flowFiles);
            return messageText;
        }
    
  • 相关阅读:
    idea vue 格式化 并保存文件 宏 快捷键 ctrl+s
    IIS web.config 跨域设置 不包含 options的设置 thinkphp tp3 跨域
    vue peek 解决了 vue-template 加载 相对目录 ./components 组件内容 vscode
    base-table 加入动态slot 流程 vue2
    原码、反码、补码知识详细讲解
    巴什博奕
    Integer.bitCount() 函数理解
    el-table中的el-image预览小记
    shell 从变量中切割字符串
    QGIS,使用polygon裁剪栅格出现问题
  • 原文地址:https://www.cnblogs.com/zihunqingxin/p/14459954.html
Copyright © 2011-2022 走看看