JavaReceiverInputDStream<String> lines = ssc.receiverStream(Receiver<T> receiver) has no ready-made connector for MetaQ. You could bridge through one of the connectors Spark Streaming already ships with, but that is not recommended here; instead, extend the Receiver class and override its onStart() method.
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Executor;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.dinpay.bdp.rcp.domain.Order;
import com.dinpay.bdp.rcp.util.ParameterDataUtil; // project utility used below (package assumed from the driver class)
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
import com.taobao.metamorphosis.client.consumer.MessageConsumer;
import com.taobao.metamorphosis.client.consumer.MessageListener;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;

public abstract class MetaQReceiver<T> extends Receiver<T> {

    private static final long serialVersionUID = -3240967436204273248L;

    // static so the non-serializable Logger is not dragged into Spark's serialization of the receiver
    private static final Logger logger = LoggerFactory.getLogger(MetaQReceiver.class);
    private static final DateFormat df = new SimpleDateFormat("yyyyMMdd");

    private String zkConnect;
    private String zkRoot;
    private String topic;
    private String group;

    public MetaQReceiver(String zkConnect, String zkRoot, String topic, String group) {
        super(StorageLevel.MEMORY_ONLY());
        this.zkConnect = zkConnect;
        this.zkRoot = zkRoot;
        this.topic = topic;
        this.group = group;
    }

    @Override
    public void onStart() {
        try {
            final MetaClientConfig metaClientConfig = new MetaClientConfig();
            final ZKConfig zkConfig = new ZKConfig();
            zkConfig.zkConnect = this.zkConnect; // e.g. "127.0.0.1:2181"
            zkConfig.zkRoot = this.zkRoot;       // e.g. "/meta"
            metaClientConfig.setZkConfig(zkConfig);
            final MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig);

            ConsumerConfig consumerConfig = new ConsumerConfig(group);
            // The default max fetch delay is 5 seconds; set to 100 ms here. Tune this for your application.
            consumerConfig.setMaxDelayFetchTimeInMills(100);
            final MessageConsumer consumer = sessionFactory.createConsumer(consumerConfig);

            // subscribe to the topic; messages are pushed to the listener below
            consumer.subscribe(topic, 1024 * 1024, new MessageListener() {
                @Override
                public void recieveMessages(final Message message) { // sic: the misspelling is MetaQ's own API
                    try {
                        // T t = message2Object(new String(message.getData(), "utf-8"));
                        logger.info("Receive message " + new String(message.getData()));
                        String orderJson = new String(message.getData());
                        Order order = ParameterDataUtil.getObject(orderJson, Order.class);
                        String cardNo = order.getCard_no();
                        String yyyyMMdd = df.format(new Date());
                        String payclassId = order.getPayclass_id();
                        String cntKey = "DK_CNT_" + cardNo + "_" + payclassId + "_" + yyyyMMdd;
                        logger.info(cntKey);
                        @SuppressWarnings("unchecked")
                        T result = (T) cntKey;
                        if (result != null) {
                            store(result); // hand the record to Spark Streaming
                        }
                    } catch (Exception e) {
                        logger.error("message2Object error", e);
                    }
                }

                @Override
                public Executor getExecutor() {
                    return null; // null means the consumer's own thread delivers messages
                }
            });
            consumer.completeSubscribe();
        } catch (Exception e) {
            throw new RuntimeException("metaq error", e);
        }
    }

    @Override
    public void onStop() {
    }

    // public abstract T message2Object(String message) throws Exception;
}
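One thing the receiver above never does is shut the MetaQ client down: onStop() is empty, so a receiver restart would leak the consumer and its ZooKeeper connection. A minimal cleanup sketch is shown below; it assumes the Metamorphosis client's shutdown() methods (verify against your client version) and keeps the handles created in onStart() as transient fields, transient because Spark serializes the receiver to the worker:

// Inside MetaQReceiver: promote the locals from onStart() to fields ...
private transient MessageSessionFactory sessionFactory;
private transient MessageConsumer consumer;

// ... assign them in onStart() (sessionFactory = ..., consumer = ...) and release them here
@Override
public void onStop() {
    try {
        if (consumer != null) {
            consumer.shutdown();       // stop fetching and drop the subscription
        }
        if (sessionFactory != null) {
            sessionFactory.shutdown(); // close broker and ZooKeeper connections
        }
    } catch (Exception e) {
        logger.error("error shutting down MetaQ client", e);
    }
}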
The subclass below is deliberately thin and could be trimmed further; if you need to convert the raw message into an object, this is the place to do it (see the sketch after the code).
public class MetaQReceiverStreaming extends MetaQReceiver<String> {

    private static final long serialVersionUID = -2290689243756756929L;

    public MetaQReceiverStreaming(String zkConnect, String zkRoot, String topic, String group) {
        super(zkConnect, zkRoot, topic, group);
    }

    /*@Override
    public String message2Object(String message) throws Exception {
        return message;
    }*/
}
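If typed records are preferable to raw strings, the commented-out message2Object hook can be re-enabled: uncomment the abstract declaration in MetaQReceiver, call it from recieveMessages instead of the hard-coded parsing, and implement it in a subclass. A sketch under those assumptions follows; the class name MetaQOrderReceiver and the Jackson dependency are illustrative choices, not part of the original project:

import com.dinpay.bdp.rcp.domain.Order;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical subclass that parses each MetaQ message (a JSON string) into an Order
public class MetaQOrderReceiver extends MetaQReceiver<Order> {

    private static final long serialVersionUID = 1L;

    // created lazily on the worker, so nothing extra travels through serialization
    private transient ObjectMapper mapper;

    public MetaQOrderReceiver(String zkConnect, String zkRoot, String topic, String group) {
        super(zkConnect, zkRoot, topic, group);
    }

    @Override
    public Order message2Object(String message) throws Exception {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        return mapper.readValue(message, Order.class);
    }
}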
Next, the MetaQ messages are processed with Spark Streaming:
import java.util.Arrays;
import java.util.List;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.dinpay.bdp.rcp.util.Constant;
import com.dinpay.bdp.rcp.util.MetaQReceiverStreaming;
import com.google.common.base.Optional;

import scala.Tuple2;

/**
 * @author ll
 */
public class MetaqStreamingCount {

    public static void main(String[] args) {
        String zkConnect = Constant.METAZK;
        String zkRoot = "/meta";
        String topic = Constant.METATOPIC;
        String group = Constant.METAGROUP;

        // silence Spark's own logging
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF);

        SparkConf sparkConf = new SparkConf().setAppName("MetaqStreamingCount").setMaster("local[2]");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
        // updateStateByKey below requires a checkpoint directory
        ssc.checkpoint("checkpoint");

        JavaReceiverInputDStream<String> lines =
                ssc.receiverStream(new MetaQReceiverStreaming(zkConnect, zkRoot, topic, group));

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) {
                return new Tuple2<>(word, 1);
            }
        });

        // accumulate the value for each key across batches (reduced both locally and on the reducer side)
        JavaPairDStream<String, Integer> wordsCount = pairs.updateStateByKey(
                new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
                    @Override
                    public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
                        // values holds this batch's counts for the key; state holds the running total
                        Integer updateValue = 0; // 0 on the first batch; otherwise start from the saved state
                        if (state.isPresent()) {
                            updateValue = state.get();
                        }
                        // add this batch's values, so the total for each key keeps growing over time
                        for (Integer value : values) {
                            updateValue += value;
                        }
                        return Optional.of(updateValue); // the updated state
                    }
                });

        wordsCount.print();
        // the results still need to be saved to Codis (see the sketch below)

        ssc.start();
        ssc.awaitTermination();
        ssc.close();
    }
}
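The comment in main() notes that the results still need to be written to Codis. Since Codis speaks the Redis protocol, a plain Jedis client is enough. The sketch below would replace or follow wordsCount.print(); the proxy address 127.0.0.1:19000 and the one-connection-per-partition pattern are assumptions for illustration, not the original implementation:

// additional imports for this sketch
import java.util.Iterator;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import redis.clients.jedis.Jedis;

wordsCount.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {
    @Override
    public Void call(JavaPairRDD<String, Integer> rdd) throws Exception {
        rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() {
            @Override
            public void call(Iterator<Tuple2<String, Integer>> records) throws Exception {
                // one connection per partition keeps connection churn low;
                // the Codis proxy host/port below are placeholders
                Jedis jedis = new Jedis("127.0.0.1", 19000);
                try {
                    while (records.hasNext()) {
                        Tuple2<String, Integer> kv = records.next();
                        jedis.set(kv._1(), String.valueOf(kv._2()));
                    }
                } finally {
                    jedis.close();
                }
            }
        });
        return null;
    }
});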