zoukankan      html  css  js  c++  java
  • sparkStreaming的mapWithState函数【案例二】

    sparkStreaming是以连续的batch interval为单位进行batch计算的。在流式计算中,如果我们想维护一段数据的状态,就需要持久化上一段的数据;sparkStreaming提供的mapWithState函数,就是用于更新数据状态的。

    例子:(更新用户当前的操作状态)
    1:定义用户会话类
    package com.streamkafka.user_state_update
    
    import org.omg.CORBA.UserException
    
    /**
     * Mutable holder for the current session state of one user.
     *
     * The raw operation code and VIP flag are translated through [[UserEnum]]
     * into the readable `user_operation` name and the boolean `user_vip`.
     *
     * @param userNO        user account number
     * @param userName      user display name
     * @param userOperation operation code (see the operation constants in [[UserEnum]])
     * @param userIsVIP     VIP flag (see `UserEnum.Yvip` / `UserEnum.Nvip`)
     */
    class UserSession(userNO: String, userName: String, userOperation: String, userIsVIP: Int) extends Serializable {

      var userNo: String = userNO
      var user_name: String = userName
      var user_operation: String = ""
      var user_vip: Boolean = false

      // Previously the constructor arguments were ignored and every field started
      // empty; translate them here so a freshly built session reflects its inputs.
      // The no-arg constructor still yields ("", "", "", false) as before.
      operationTran(userOperation)
      isVipTran(userIsVIP)

      /** No-arg constructor: an empty, non-VIP session. */
      def this() = this("", "", "", 0)

      /**
       * Overwrites this session with the values of one parsed message.
       *
       * @param userNO        user account number
       * @param user_Name     user display name
       * @param userOperation numeric operation code
       * @param userIsVIP     numeric VIP flag
       */
      def userTrans(userNO: String, user_Name: String, userOperation: Int, userIsVIP: Int): Unit = {
        userNo = userNO
        user_name = user_Name
        operationTran(userOperation.toString)
        isVipTran(userIsVIP)
      }

      /**
       * Translates an operation code into its readable name and stores it in
       * `user_operation`. Unknown (or null) codes leave the current value untouched.
       */
      def operationTran(userOp: String): Unit = userOp match {
        case UserEnum.login         => user_operation = "login"
        case UserEnum.loginOut      => user_operation = "loginOut"
        case UserEnum.clickNextPage => user_operation = "clickNextPage"
        case UserEnum.clickPrePage  => user_operation = "clickPrePage"
        case UserEnum.createUser    => user_operation = "createUser"
        case _                      => // unknown code: keep the previous operation
      }

      /**
       * Translates the numeric VIP flag into `user_vip`.
       * Values other than `UserEnum.Yvip` / `UserEnum.Nvip` leave it untouched.
       */
      def isVipTran(userIsVIP: Int): Unit = userIsVIP match {
        case UserEnum.Yvip => user_vip = true
        case UserEnum.Nvip => user_vip = false
        case _             => // unknown flag: keep the previous value
      }

      /** Readable dump of the session; kept under its original name for existing callers. */
      def toStrings: String =
        s"UserSession [param: userNo(${userNo}),userName(${user_name}),userOperation(${user_operation}),user(${user_vip})]"

      /** Makes plain string conversion (`println(session)`, logging) show the same dump as `toStrings`. */
      override def toString: String = toStrings
    }

    2:定义状态枚举类

    package com.streamkafka.user_state_update
    
    import java.io.Serializable
    
    /**
     * Constants shared by the message producer and the streaming consumer:
     * the codes used for user operations and for the VIP flag.
     */
    object UserEnum extends Serializable {

      // ---- user operation codes ----
      /** Log-out operation. */
      val loginOut: String = "0"
      /** Log-in operation. */
      val login: String = "1"
      /** "Next page" click. */
      val clickNextPage: String = "2"
      /** "Previous page" click. */
      val clickPrePage: String = "3"
      /** Create-user operation. */
      val createUser: String = "5"

      // ---- VIP flag values ----
      /** The user is a VIP member. */
      val Yvip: Int = 1
      /** The user is not a VIP member. */
      val Nvip: Int = 0
    }

    3:定义生产者(生产者类是使用java写的)

    package com.streamkafka.user_state_update;
    
    import java.io.Serializable;
    import java.util.Properties;
    import java.util.Random;
    
    import org.apache.commons.lang.math.RandomUtils;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    /**
     * Demo Kafka producer that publishes one simulated user event every five seconds.
     *
     * Message format (fields joined with '|'):
     *   userNo        - user account
     *   userName      - user name
     *   userOperation - operation code
     *   userIsVIP     - VIP flag, 0 or 1
     *
     * @author a
     */
    public class UserInfiProducer extends Thread implements Serializable {
        private String topic = "test";
        private String userNo = "userId-9iVEYecP";
        private String userName = "zhangxs";
        private int userOper = 0;
        private int isvip = 0;
        private String message = "";
        Properties props = null;
        // Not used by this class; left in place so the class's fields stay unchanged.
        RandomUtils rand = new RandomUtils();
        Random r = new Random();
        private static int msgCount = 0;

        /**
         * Builds the producer configuration and stores it in {@code props}.
         *
         * @return the freshly built configuration
         */
        private Properties producerParam() {
            props = new Properties();
            props.put("bootstrap.servers", "192.168.99.xxx:9092"); // Kafka broker address
            // NOTE(review): "zk.connect" does not look like a setting of the new
            // producer API and is probably ignored - confirm before relying on it.
            props.put("zk.connect", "192.168.99.143:2181");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            return props;
        }

        /**
         * Generates the next simulated user event.
         *
         * @return the event encoded as {@code userNo|userName|userOperation|userIsVIP}
         */
        private String producerInfo() {
            // A draw of 4 is skipped, so the previous operation code is sent again.
            // NOTE(review): code 5 (UserEnum.createUser) is never produced - confirm
            // whether a draw of 4 was meant to map to 5.
            int oper = r.nextInt(5);
            if (4 != oper) {
                userOper = oper;
            }
            // nextInt(bound) excludes the bound: the former nextInt(1) always returned 0,
            // so no VIP event was ever generated.
            isvip = r.nextInt(2);
            return userNo + "|" + userName + "|" + userOper + "|" + isvip;
        }

        // Single producer instance shared by all sends of this object.
        KafkaProducer<String, String> kp = new KafkaProducer<String, String>(producerParam());

        /**
         * Sends one message to {@code topic} asynchronously; the outcome is reported
         * by the callback.
         *
         * @param message the encoded user event
         */
        private void sendMsg(String message) {
            Callback c = new Callback() {

                @Override
                public void onCompletion(RecordMetadata paramRecordMetadata, Exception paramException) {
                    // Metadata may be missing when the send failed, so guard before reading it.
                    if (null != paramRecordMetadata) {
                        System.out.println("topic:" + paramRecordMetadata.topic());
                        System.out.println("partition:" + paramRecordMetadata.partition());
                    }
                    if (null != paramException) {
                        System.out.println("getMessage:" + paramException.getMessage());
                    }
                }
            };
            ProducerRecord<String, String> precord = new ProducerRecord<String, String>(topic, message);
            kp.send(precord, c);
        }

        /**
         * Produces and sends one event every five seconds until the running thread is
         * interrupted, then closes the producer.
         */
        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    message = producerInfo();
                    System.out.println("producer:" + message);
                    sendMsg(message);
                    // The send is asynchronous: these lines only mean the record was
                    // handed to the producer; delivery is reported by the callback.
                    System.out.println("message send success");
                    msgCount++;
                    System.out.println("成功发送【" + msgCount + "】消息");
                    Thread.sleep(5000);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and leave the loop instead of spinning forever.
                Thread.currentThread().interrupt();
            } finally {
                // Flushes buffered records and releases the producer's resources.
                kp.close();
            }
        }

        public static void main(String[] args) {
            // The class already is a Thread; no need to wrap it in a second one.
            UserInfiProducer userPord = new UserInfiProducer();
            userPord.start();
        }
    }

    4:定义消费者

    package com.streamkafka.user_state_update
    
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.commons.logging.LogFactory
    import org.apache.commons.logging.Log
    import scala.actors.threadpool.ExecutorService
    import scala.actors.threadpool.Executors
    import org.apache.spark.streaming.State
    import org.apache.spark.streaming.StateSpec
    import org.apache.spark.streaming.Time
    
    /**
     * Spark Streaming job that reads user events from Kafka and keeps the latest
     * operation state per key with `mapWithState`.
     *
     * Expected message format: `userNo|userName|userOperation|userIsVIP`.
     */
    object UserInfoConsumer extends Serializable {
      var logs = LogFactory.getLog(UserInfoConsumer.getClass)

      def main(args: Array[String]): Unit = {
        new ConsumerProcess().resoleMethod()
      }

      class ConsumerProcess extends Serializable {

        /**
         * Builds the streaming pipeline, starts it and blocks until it terminates.
         *
         * Two outputs are registered on the Kafka stream:
         *  - every record is parsed into a [[UserSession]] and printed;
         *  - the operation code of every well-formed record is fed to `mapWithState`,
         *    which stores the latest code as state and emits (operation name, code).
         */
        def resoleMethod(): Unit = {
          val conf = new SparkConf()
            .setMaster("spark://192.168.99.xxx:7077")
            .setAppName("user_state_update")
          // The StreamingContext is the entry point of Spark Streaming; batches are 5 seconds long.
          val ssc = new StreamingContext(conf, Seconds(5))
          // mapWithState needs a checkpoint directory.
          ssc.checkpoint(".")

          val topic = Array("test")
          val kafkaParams = Map[String, Object](
            // Initial broker list, e.g. "192.168.99.xxx:9092,anotherhost:9092".
            "bootstrap.servers" -> "192.168.99.xxx:9092",
            // Deserializer classes for record keys and values.
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            // Consumer group this consumer belongs to.
            "group.id" -> "user-consumer-group1",
            "auto.offset.reset" -> "latest",
            // false: offsets are not committed automatically in the background.
            "enable.auto.commit" -> (false: java.lang.Boolean))

          // Subscribe names the topics and the consumer configuration to use.
          val dStream = KafkaUtils.createDirectStream[String, String](
            ssc, PreferConsistent, Subscribe[String, String](topic, kafkaParams))

          // "|" is a regex metacharacter and must be written "\\|" in a Scala string
          // literal; the former "\|" is not a valid escape sequence. A null record
          // value is treated as an empty (malformed) message.
          val splitFields = (raw: String) =>
            Option(raw).map(_.split("\\|")).getOrElse(Array.empty[String])

          // A record is well formed when it has four fields and both numeric fields parse.
          val isWellFormed = (fields: Array[String]) =>
            fields.length == 4 &&
              scala.util.Try((fields(2).toInt, fields(3).toInt)).isSuccess

          dStream.foreachRDD { rdd =>
            rdd.foreach { record =>
              val fields = splitFields(record.value())
              // A fresh session per record, so a malformed message can no longer
              // print values left over from an earlier one.
              val user = new UserSession()
              if (isWellFormed(fields)) {
                // Fields are only read after validation; field 0 is the user number
                // (the old code printed field 1, the user name, under this label).
                println("usreNo:" + fields(0))
                print("组装userSession")
                user.userTrans(fields(0), fields(1), fields(2).toInt, fields(3).toInt)
              } else {
                print("消息格式不符合定义!!!")
              }
              println("userInfo:" + user.toStrings)
            }
          }

          // State update function: the key is the operation code of the message.
          // It stores the code as the new state and returns (operation name, code).
          val mapWithStateMethod = (userState: String, one: Option[Int], state: State[Int]) => {
            val stateInt = userState.toInt
            val userM = new UserSession()
            userM.operationTran(stateInt.toString)
            val output = (userM.user_operation, stateInt)
            println("当前用户的状态为:" + userM.user_operation)
            state.update(stateInt)
            output
          }

          // Only well-formed records reach the state function, so neither the
          // index access nor the toInt above can fail.
          val mapState = dStream
            .map(record => splitFields(record.value()))
            .filter(isWellFormed)
            .map(fields => (fields(2), 1))
          val userState = mapState.mapWithState(StateSpec.function(mapWithStateMethod))
          // print() registers an output operation and returns Unit, so it must not
          // be concatenated into a string.
          userState.print()
          print("=============================================================================")

          ssc.start()
          ssc.awaitTermination()
        }
      }
    }
  • 相关阅读:
    hdu6148 Valley Numer
    NOI2007 生成树计数
    bzoj3336 Uva10572 Black and White
    hdu1693 eat the trees
    【模板】插头dp
    bzoj4712 洪水
    ZJOI2010 基站选址
    poj2376 Cleaning Shifts
    bzoj4367 [IOI2014]holiday假期
    bzoj4951 [Wf2017]Money for Nothing
  • 原文地址:https://www.cnblogs.com/zhangXingSheng/p/6779565.html
Copyright © 2011-2022 走看看