关于sparkStreaming的测试Drools框架结合版
package com.dinpay.bdp.rcp.service; import java.math.BigDecimal; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Date; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.api.java.function.VoidFunction2; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.Time; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.kie.api.KieServices; import org.kie.api.runtime.KieContainer; import org.kie.api.runtime.KieSession; import com.dinpay.bdp.rcp.metaq.MetaQReceiver; import com.dinpay.bdp.rcp.streaming.StreamingUtil; import com.dinpay.bdp.rcp.util.CodisUtil; import com.dinpay.bdp.rcp.util.Constant; import com.dinpay.dpp.rcp.po.Order; import redis.clients.jedis.Jedis; import scala.Tuple2; /** * 同卡号单日最大交易金额测试 * @author ll-t150 * */ public class SparkDroolsTest { public static Logger logger = Logger.getLogger(SparkDroolsTest.class); public static final DateFormat df = new SimpleDateFormat("yyyyMMdd"); public static void main(String[] args) { String zkConnect=Constant.METAZK; String zkRoot="/meta"; String topic=Constant.ORDERTOPIC; String group=Constant.STREAMGROUP; //屏蔽日志 Logger.getLogger("org.apache.spark").setLevel(Level.OFF); logger.info("metaq configuration:"+zkConnect+"--"+topic+"--"+group); SparkConf sparkConf = new SparkConf().setAppName("SparkDroolsTest").setMaster("local[2]"); JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); //从metaq取消息 JavaReceiverInputDStream<Order> lines = ssc.receiverStream(new MetaQReceiver(zkConnect,zkRoot,topic,group)); JavaDStream<Order> words = lines.flatMap(new FlatMapFunction<Order, Order>() { @Override public Iterable<Order> call(Order order) throws Exception { return Arrays.asList(new Order[]{order}); } }); //同卡号单日交易最大次数 统计包括成功和未成功的订单 JavaPairDStream<String, Integer> cardCntPairs = getCardJavaPair(words); save2Codis(cardCntPairs); ssc.start(); ssc.awaitTermination(); ssc.close(); } @SuppressWarnings({ "unchecked", "serial" }) public static <T> JavaPairDStream<String, T> getCardJavaPair(JavaDStream<Order> words){ JavaPairDStream<String, T> pairs = null; //次数统计 pairs = (JavaPairDStream<String, T>) words.mapToPair(new PairFunction<Order, String, Integer>() { @Override public Tuple2<String, Integer> call(Order order) { Jedis jedis = CodisUtil.getJedisPool().getResource(); String cardCntkey = order.getSystemId()+"_CNT_"+order.getPayerCardNo()+"_"+df.format(new Date()); //拼接key,先到codis里面查找对应的key是否存在,若存在就直接取对应的值,然后取值加1 String value = jedis.get(cardCntkey); if (StringUtils.isEmpty(value)) { return new Tuple2<String, Integer>(cardCntkey, 1); } else { return new Tuple2<String, Integer>(cardCntkey, Integer.parseInt(value) + 1); } } }); return pairs; } /** * 将计算出的数据保存到codis中 * @param pair */ @SuppressWarnings("serial") public static <T> void save2Codis(JavaPairDStream<String, T> pair) { pair.foreachRDD(new VoidFunction2<JavaPairRDD<String,T>,Time>() { @Override public void call(JavaPairRDD<String, T> rdd, Time time) throws Exception { rdd.foreach(new VoidFunction<Tuple2<String,T>>() { @Override public void call(Tuple2<String, T> tp) throws Exception { Jedis jedis = CodisUtil.getJedisPool().getResource(); jedis.set(tp._1(), String.valueOf(tp._2())); logger.info(tp._1() + ">>>" + tp._2()+",保存到Codis完成!"); KieServices kieServices = KieServices.Factory.get(); KieContainer kieContainer = kieServices.getKieClasspathContainer(); KieSession kieSession = kieContainer.newKieSession("helloworld"); ChannAmount objectChannel = new ChannAmount(); objectChannel.setAmount(Integer.parseInt(String.valueOf(tp._2()))); objectChannel.setChannel(tp._1()); kieSession.insert(objectChannel); kieSession.fireAllRules(); if(jedis !=null){ jedis.close(); } } }); } }); } }
关于配置文件的设置
kmodule.xml文件
<?xml version="1.0" encoding="UTF-8"?> <kmodule xmlns="http://jboss.org/kie/6.0.0/kmodule"> <kbase name="rules" packages="rules"> <ksession name="helloworld"/> </kbase> <kbase name="dtables" packages="dtables"> <ksession name="ksession-dtables"/> </kbase> <kbase name="process" packages="process"> <ksession name="ksession-process"/> </kbase> </kmodule>
riskMonitor.drl内容
package rules; import com.dinpay.bdp.rcp.service.ChannAmount; //其中m为对象objectChannel 的引用 rule "channel" when ChannAmount(amount>2) then System.out.println("Drools规则实现:该渠道最近5分钟交易金额超过2次 "); end
测试OK!