zoukankan      html  css  js  c++  java
  • SparkStreaming和Drools结合的HelloWord版

    关于sparkStreaming的测试Drools框架结合版

    package com.dinpay.bdp.rcp.service;
    
    import java.math.BigDecimal;
    import java.text.DateFormat;
    import java.text.SimpleDateFormat;
    import java.util.Arrays;
    import java.util.Date;
    
    import org.apache.commons.lang3.StringUtils;
    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.api.java.function.VoidFunction2;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.Time;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.kie.api.KieServices;
    import org.kie.api.runtime.KieContainer;
    import org.kie.api.runtime.KieSession;
    
    import com.dinpay.bdp.rcp.metaq.MetaQReceiver;
    import com.dinpay.bdp.rcp.streaming.StreamingUtil;
    import com.dinpay.bdp.rcp.util.CodisUtil;
    import com.dinpay.bdp.rcp.util.Constant;
    import com.dinpay.dpp.rcp.po.Order;
    
    import redis.clients.jedis.Jedis;
    import scala.Tuple2;
    
    /**
     * 同卡号单日最大交易金额测试 
     * @author ll-t150
     *
     */
    public class SparkDroolsTest {
        
        public static Logger logger = Logger.getLogger(SparkDroolsTest.class);
        public static final DateFormat df = new SimpleDateFormat("yyyyMMdd");
        
         public static void main(String[] args) {
             String zkConnect=Constant.METAZK;
             String zkRoot="/meta";
             String topic=Constant.ORDERTOPIC;
             String group=Constant.STREAMGROUP; 
             //屏蔽日志
             Logger.getLogger("org.apache.spark").setLevel(Level.OFF);
             logger.info("metaq configuration:"+zkConnect+"--"+topic+"--"+group);
             SparkConf sparkConf = new SparkConf().setAppName("SparkDroolsTest").setMaster("local[2]");  
             JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); 
             //从metaq取消息
             JavaReceiverInputDStream<Order> lines = ssc.receiverStream(new MetaQReceiver(zkConnect,zkRoot,topic,group));
    
             JavaDStream<Order> words = lines.flatMap(new FlatMapFunction<Order, Order>() {
                @Override
                public Iterable<Order> call(Order order) throws Exception {
                    return Arrays.asList(new Order[]{order});
                }
            });
            
            //同卡号单日交易最大次数 统计包括成功和未成功的订单
            JavaPairDStream<String, Integer> cardCntPairs = getCardJavaPair(words);
            save2Codis(cardCntPairs);
            ssc.start();
            ssc.awaitTermination();
            ssc.close();
         }
         
         @SuppressWarnings({ "unchecked", "serial" })
            public static <T> JavaPairDStream<String, T>  getCardJavaPair(JavaDStream<Order> words){
                 JavaPairDStream<String, T> pairs = null;
                         //次数统计
                         pairs = (JavaPairDStream<String, T>) words.mapToPair(new PairFunction<Order, String, Integer>() {
                            @Override
                            public Tuple2<String, Integer> call(Order order) {
                                Jedis jedis = CodisUtil.getJedisPool().getResource();
                                String cardCntkey = order.getSystemId()+"_CNT_"+order.getPayerCardNo()+"_"+df.format(new Date());
                                //拼接key,先到codis里面查找对应的key是否存在,若存在就直接取对应的值,然后取值加1
                                String value = jedis.get(cardCntkey);
                                if (StringUtils.isEmpty(value)) {
                                    return new Tuple2<String, Integer>(cardCntkey, 1);
                                } else {
                                    return new Tuple2<String, Integer>(cardCntkey, Integer.parseInt(value) + 1);
                                }
                            }
                        });
                        return pairs;
                 }
            
            
             /**
             * 将计算出的数据保存到codis中
             * @param pair
             */
            @SuppressWarnings("serial")
            public static <T> void save2Codis(JavaPairDStream<String, T> pair) {
                pair.foreachRDD(new VoidFunction2<JavaPairRDD<String,T>,Time>() {
                    @Override
                    public void call(JavaPairRDD<String, T> rdd, Time time) throws Exception {
                        rdd.foreach(new VoidFunction<Tuple2<String,T>>() {
                            @Override
                            public void call(Tuple2<String, T> tp) throws Exception {
                                    Jedis jedis = CodisUtil.getJedisPool().getResource();
                                    jedis.set(tp._1(), String.valueOf(tp._2()));
                                    logger.info(tp._1() + ">>>" + tp._2()+",保存到Codis完成!");
                                    KieServices kieServices = KieServices.Factory.get();
                                    KieContainer kieContainer = kieServices.getKieClasspathContainer();
                                    KieSession kieSession = kieContainer.newKieSession("helloworld");
                                    ChannAmount objectChannel = new ChannAmount();
                                    objectChannel.setAmount(Integer.parseInt(String.valueOf(tp._2())));
                                    objectChannel.setChannel(tp._1());
                                    kieSession.insert(objectChannel);
                                    kieSession.fireAllRules();
                                    if(jedis !=null){
                                        jedis.close();
                                    }
                            }
                        });
                    }
                });
            }
        
    }

    关于配置文件的设置

    kmodule.xml文件

    <?xml version="1.0" encoding="UTF-8"?>
    <kmodule xmlns="http://jboss.org/kie/6.0.0/kmodule">
        <kbase name="rules" packages="rules">
            <ksession name="helloworld"/>
        </kbase>
        <kbase name="dtables" packages="dtables">
            <ksession name="ksession-dtables"/>
        </kbase>
        <kbase name="process" packages="process">
            <ksession name="ksession-process"/>
        </kbase>
    </kmodule>

    riskMonitor.drl内容

    package rules;
    
    import com.dinpay.bdp.rcp.service.ChannAmount;
    //其中m为对象objectChannel 的引用
    rule "channel"
        when
            ChannAmount(amount>2)
        then
            System.out.println("Drools规则实现:该渠道最近5分钟交易金额超过2次 ");
    end

    测试OK!

  • 相关阅读:
    PyCharm配置 Git 教程
    Docker实践:基于python:3.7.1-stretch制作python镜像
    Docker开启远程安全访问
    Centos7安装apt-get
    Kubernetes 系列(二):在 Linux 部署多节点 Kubernetes 集群与 KubeSphere 容器平台
    微信小程序调试mock 数据,提示合法域名校验失败
    babel-plugin-import 配置多个组件按需加载时
    docker run -p 8070:80 -d nginx
    数据库的设计(E-R图,数据库模型图,三大范式)
    数据库 范式
  • 原文地址:https://www.cnblogs.com/atomicbomb/p/7171512.html
Copyright © 2011-2022 走看看