zoukankan      html  css  js  c++  java
  • MetaQ对接SparkStreaming示例代码

    Spark Streaming 没有直接对接 MetaQ 的工具,而 JavaReceiverInputDStream<String> lines = ssc.receiverStream(Receiver<T> receiver) 可以接入自定义的 Receiver。当然也可以使用 Spark Streaming 已有的工具进行转接,但这里不建议这样做;推荐继承 Receiver 类并重写 onStart() 方法。

    import java.nio.charset.StandardCharsets;
    import java.text.DateFormat;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.concurrent.Executor;
    
    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.receiver.Receiver;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.dinpay.bdp.rcp.domain.Order;
    import com.taobao.metamorphosis.Message;
    import com.taobao.metamorphosis.client.MessageSessionFactory;
    import com.taobao.metamorphosis.client.MetaClientConfig;
    import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
    import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
    import com.taobao.metamorphosis.client.consumer.MessageConsumer;
    import com.taobao.metamorphosis.client.consumer.MessageListener;
    import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;
    
    /**
     * Spark Streaming {@link Receiver} that subscribes to a MetaQ topic and stores one
     * counter key per received message.
     *
     * <p>Each message body is decoded as a JSON {@code Order}; the value handed to
     * {@code store} is the string
     * {@code "DK_CNT_" + card_no + "_" + payclass_id + "_" + yyyyMMdd}, where the date is
     * the receive-time date in the JVM default time zone.
     *
     * <p>NOTE(review): the stored value is always a {@code String} that is cast to
     * {@code T}, so this class is only safe to instantiate with {@code T = String}.
     * The cast is kept because the abstract {@code message2Object} hook is disabled.
     *
     * @param <T> element type of the resulting stream (effectively {@code String})
     */
    public abstract class MetaQReceiver<T> extends Receiver<T>{
        private static final long serialVersionUID = -3240967436204273248L;
        // Static so the logger is not part of the serialized receiver state.
        static final Logger logger=LoggerFactory.getLogger(MetaQReceiver.class);
        // Only the pattern is shared: SimpleDateFormat instances are not thread-safe,
        // so a formatter is created per message instead of being cached statically.
        private static final String DAY_PATTERN = "yyyyMMdd";
        
        private String zkConnect;
        private String zkRoot;
        private String topic;
        private String group;
        
        // Created in onStart(), released in onStop(); never serialized with the receiver.
        private transient MessageSessionFactory sessionFactory;
        private transient MessageConsumer consumer;
    
        /**
         * Creates a receiver that keeps received data in memory only.
         *
         * @param zkConnect ZooKeeper connect string, e.g. {@code "127.0.0.1:2181"}
         * @param zkRoot    ZooKeeper root path used by MetaQ, e.g. {@code "/meta"}
         * @param topic     MetaQ topic to subscribe to
         * @param group     MetaQ consumer group
         */
        public MetaQReceiver(String zkConnect,String zkRoot,String topic,String group) {
            super(StorageLevel.MEMORY_ONLY());
            this.zkConnect=zkConnect;
            this.zkRoot=zkRoot;
            this.topic=topic;
            this.group=group;
        }
        
        /**
         * Connects to MetaQ through ZooKeeper and subscribes to the configured topic.
         *
         * @throws RuntimeException wrapping any failure while connecting or subscribing;
         *         resources created before the failure are shut down first
         */
        @Override
        public void onStart() {
            try{
                final MetaClientConfig metaClientConfig = new MetaClientConfig();
                final ZKConfig zkConfig = new ZKConfig();
                zkConfig.zkConnect = this.zkConnect;// "127.0.0.1:2181";
                zkConfig.zkRoot = this.zkRoot;// "/meta";
                metaClientConfig.setZkConfig(zkConfig);
                sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
                ConsumerConfig consumerConfig = new ConsumerConfig(group);
                // The default maximum fetch delay is 5 seconds; 100 ms is used here.
                // Tune this to the needs of the application.
                consumerConfig.setMaxDelayFetchTimeInMills(100);
                consumer = sessionFactory.createConsumer(consumerConfig);
                // subscribe topic (second argument: max fetch size of 1 MiB)
                consumer.subscribe(topic, 1024 * 1024, new MessageListener() {
                    @Override
                    public void recieveMessages(final Message message) {
                        handleMessage(message);
                    }
                    @Override
                    public Executor getExecutor() {
                        // No dedicated executor is supplied for message handling.
                        return null;
                    }
                });
                consumer.completeSubscribe();
            }catch(Exception e){
                // Do not leak a half-initialized consumer/session factory.
                releaseResources();
                throw new RuntimeException("metaq error",e);
            }
        }
        
        /**
         * Shuts down the MetaQ consumer and session factory created by {@link #onStart()}.
         */
        @Override
        public void onStop() {
            releaseResources();
        }
        
        /**
         * Turns one MetaQ message into a counter key and stores it. Failures are logged
         * and the message is skipped so one bad message cannot break the subscription.
         */
        private void handleMessage(final Message message) {
            try{
                // Decode with an explicit charset instead of the platform default.
                // NOTE(review): assumes producers publish UTF-8 JSON - confirm.
                String orderJson = new String(message.getData(), StandardCharsets.UTF_8);
                logger.info("Receive message " + orderJson);
                Order order = ParameterDataUtil.getObject(orderJson, Order.class);
                String cardNo = order.getCard_no();
                String payclassId = order.getPayclass_id();
                DateFormat dayFormat = new SimpleDateFormat(DAY_PATTERN);
                String yyyyMMdd = dayFormat.format(new Date());
                String cntKey = "DK_CNT_" + cardNo + "_" + payclassId + "_" + yyyyMMdd;
                logger.info(cntKey);
                // Unchecked: only sound when T is String (see class comment).
                @SuppressWarnings("unchecked")
                T result = (T) cntKey;
                store(result);
            }catch(Exception e){
                logger.error("message2Object error",e);
            }
        }
        
        /** Best-effort shutdown of the consumer and session factory; safe to call repeatedly. */
        private void releaseResources() {
            if(consumer!=null){
                try{
                    consumer.shutdown();
                }catch(Exception e){
                    logger.warn("metaq consumer shutdown error",e);
                }
                consumer=null;
            }
            if(sessionFactory!=null){
                try{
                    sessionFactory.shutdown();
                }catch(Exception e){
                    logger.warn("metaq session factory shutdown error",e);
                }
                sessionFactory=null;
            }
        }
        
        //public abstract T message2Object(String message) throws Exception;
    }

    下面这段代码可以省略;若需要将消息转换为 Object,可以在此处进行处理

    /**
     * {@link MetaQReceiver} specialization whose stream elements are {@code String}s.
     *
     * <p>Adds no behavior of its own; it only fixes the type parameter and forwards the
     * connection settings to the superclass.
     */
    public class MetaQReceiverStreaming extends MetaQReceiver<String> {

        private static final long serialVersionUID = -2290689243756756929L;

        /**
         * Forwards all four settings unchanged to {@link MetaQReceiver}.
         *
         * @param zkConnect ZooKeeper connect string
         * @param zkRoot    ZooKeeper root path used by MetaQ
         * @param topic     MetaQ topic to subscribe to
         * @param group     MetaQ consumer group
         */
        public MetaQReceiverStreaming(
                String zkConnect,
                String zkRoot,
                String topic,
                String group) {
            super(zkConnect, zkRoot, topic, group);
        }

        // Disabled conversion hook, kept for reference:
        //
        // @Override
        // public String message2Object(String message) throws Exception {
        //     return message;
        // }
    }

    接下来通过spark streaming进行metaq的消息处理

    import java.util.Arrays;
    import java.util.List;
    
    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    
    import com.dinpay.bdp.rcp.util.Constant;
    import com.dinpay.bdp.rcp.util.MetaQReceiverStreaming;
    import com.google.common.base.Optional;
    import com.sun.xml.bind.v2.runtime.reflect.opt.Const;
    
    import scala.Tuple2;  
    /**
     * Demo driver that reads counter keys from MetaQ through {@link MetaQReceiverStreaming}
     * and prints a running count per key once per one-second batch.
     *
     * @author ll
     */
    public class MetaqStreamingCount {

        /**
         * Builds the streaming job, starts it, and blocks until it terminates.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            String zkConnect = Constant.METAZK;
            String zkRoot = "/meta";
            String topic = Constant.METATOPIC;
            String group = Constant.METAGROUP;

            // Silence Spark's own logging.
            Logger.getLogger("org.apache.spark").setLevel(Level.OFF);
            // Default to local[2] only when no master was supplied externally, so the
            // job can also be launched against a cluster without a code change.
            SparkConf sparkConf = new SparkConf()
                    .setAppName("MetaqStreamingCount")
                    .setIfMissing("spark.master", "local[2]");
            JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

            try {
                // updateStateByKey below keeps state across batches and needs a
                // checkpoint directory; configure it before the job is started.
                ssc.checkpoint("checkpoint");

                JavaReceiverInputDStream<String> lines =
                        ssc.receiverStream(new MetaQReceiverStreaming(zkConnect, zkRoot, topic, group));

                // Split every received line on single spaces.
                JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public Iterable<String> call(String line) throws Exception {
                        return Arrays.asList(line.split(" "));
                    }
                });

                // Pair every token with an initial count of 1.
                JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String word) {
                        return new Tuple2<>(word, 1);
                    }
                });

                // Accumulate the counts of each key across batches.
                JavaPairDStream<String, Integer> wordsCount = pairs.updateStateByKey(
                        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
                    @Override
                    public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
                        // values: counts that arrived for this key in the current batch;
                        // state: the total accumulated so far (absent the first time).
                        Integer updateValue = 0;
                        if (state.isPresent()) {
                            updateValue = state.get();
                        }
                        for (Integer value : values) {
                            updateValue += value;
                        }
                        return Optional.of(updateValue);
                    }
                });

                wordsCount.print();
                // TODO: the results still need to be saved to Codis.
                ssc.start();
                ssc.awaitTermination();
            } finally {
                // Release the context even when start/awaitTermination fails.
                ssc.close();
            }
        }
    }
  • 相关阅读:
    20179203李鹏举 《Linux内核原理与分析》第一周学习笔记
    20179223《Linux内核原理与分析》第八周学习笔记
    20179223《Linux内核原理与分析》第七周学习笔记
    20179223《Linux内核原理与解析》第六周学习笔记
    20179223《Linux内核原理与分析》第五周学习笔记
    20179223《Linux内核原理与分析》第三周学习笔记
    20179223《Linux内核原理与分析》第二周学习笔记
    20179223《Linux内核原理与分析》第一周学习笔记
    51nod贪心算法入门-----活动安排问题2
    51nod贪心算法入门-----活动安排问题
  • 原文地址:https://www.cnblogs.com/atomicbomb/p/6678392.html
Copyright © 2011-2022 走看看