Spark Streaming 以连续的 batch interval(批处理间隔)为单位进行批量(batch)计算。在流式计算中,如果我们想维护一段数据的状态,就需要持久化上一批次的数据状态;Spark Streaming 提供的 mapWithState 函数可用于更新数据状态。
例子:(更新用户当前的操作状态)
1:定义用户会话类
package com.streamkafka.user_state_update

// NOTE(review): UserException is not referenced anywhere in this class; confirm the import can be dropped.
import org.omg.CORBA.UserException

/**
 * Mutable snapshot of one user's session.
 *
 * The constructor arguments are used to initialise the public fields, so
 * `new UserSession(no, name, opCode, vipFlag)` yields a populated session.
 *
 * @param userNO        user account number
 * @param userName      user display name
 * @param userOperation operation code, one of the string codes in [[UserEnum]]
 * @param userIsVIP     VIP flag, `UserEnum.Yvip` or `UserEnum.Nvip`
 */
class UserSession(userNO: String, userName: String, userOperation: String, userIsVIP: Int)
    extends Serializable {

  /** User account number. */
  var userNo: String = userNO

  /** User display name. */
  var user_name: String = userName

  /** Symbolic operation name ("login", "loginOut", ...); empty until a known code is seen. */
  var user_operation: String = ""

  /** Whether the user is a VIP member. */
  var user_vip: Boolean = false

  // Translate the raw constructor codes into the symbolic fields above.
  operationTran(userOperation)
  isVipTran(userIsVIP)

  /**
   * No-argument constructor producing an empty, non-VIP session.
   * (An auxiliary constructor must call another constructor as its first statement.)
   */
  def this() = this("", "", "", 0)

  /**
   * Overwrites every field of this session from raw message values.
   *
   * @param userNO        user account number
   * @param user_Name     user display name
   * @param userOperation numeric operation code
   * @param userIsVIP     VIP flag, `UserEnum.Yvip` or `UserEnum.Nvip`
   */
  def userTrans(userNO: String, user_Name: String, userOperation: Int, userIsVIP: Int): Unit = {
    userNo = userNO
    user_name = user_Name
    operationTran(userOperation.toString)
    isVipTran(userIsVIP)
  }

  /**
   * Maps an operation code to its symbolic name and stores it in `user_operation`.
   * Unknown (or null) codes leave the current value untouched.
   *
   * @param userOp operation code as a string
   */
  def operationTran(userOp: String): Unit = userOp match {
    case UserEnum.login         => user_operation = "login"
    case UserEnum.loginOut      => user_operation = "loginOut"
    case UserEnum.clickNextPage => user_operation = "clickNextPage"
    case UserEnum.clickPrePage  => user_operation = "clickPrePage"
    case UserEnum.createUser    => user_operation = "createUser"
    case _                      => () // unknown code: keep the previous value
  }

  /**
   * Maps the numeric VIP flag to `user_vip`.
   * Values other than `UserEnum.Yvip` / `UserEnum.Nvip` leave the field untouched.
   *
   * @param userIsVIP VIP flag
   */
  def isVipTran(userIsVIP: Int): Unit = userIsVIP match {
    case UserEnum.Yvip => user_vip = true
    case UserEnum.Nvip => user_vip = false
    case _             => ()
  }

  /** Human-readable rendering of the session; kept under this name for existing callers. */
  def toStrings: String =
    s"UserSession [param: userNo(${userNo}),userName(${user_name}),userOperation(${user_operation}),user(${user_vip})]"

  /** Same text as [[toStrings]], so string concatenation and logging show the session contents. */
  override def toString: String = toStrings
}
2:定义状态枚举类
package com.streamkafka.user_state_update

import java.io.Serializable

/**
 * Code tables shared by the producer and the consumer: operation codes
 * (as strings) and VIP flags (as integers).
 */
object UserEnum extends Serializable {

  // ---- VIP membership flags ----

  /** The user is not a VIP member. */
  val Nvip: Int = 0

  /** The user is a VIP member. */
  val Yvip: Int = 1

  // ---- Operation codes ----

  /** Log-out operation. */
  val loginOut: String = "0"

  /** Log-in operation. */
  val login: String = "1"

  /** "Next page" click. */
  val clickNextPage: String = "2"

  /** "Previous page" click. */
  val clickPrePage: String = "3"

  /** Create-user operation. */
  val createUser: String = "5"
}
3:定义生产者(生产者类是使用java写的)
package com.streamkafka.user_state_update; import java.io.Serializable; import java.util.Properties; import java.util.Random; import org.apache.commons.lang.math.RandomUtils; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; /** * 用户信息的格式 * userNo:用户账号 (数字+字母 长度:5) * userName:用户名称 (汉字) * userOperation:用户的操作 枚举 (数字:0,1,2,3,5) * userIsVIP:用户是否是会员 枚举 (0,,1) * @author a */ public class UserInfiProducer extends Thread implements Serializable{ private String topic="test"; private String userNo="userId-9iVEYecP"; private String userName="zhangxs"; private int userOper=0; private int isvip=0; private String message=""; Properties props=null; RandomUtils rand=new RandomUtils(); Random r=new Random(); private static int msgCount=0; //消费者配置 private Properties producerParam(){ props = new Properties(); props.put("bootstrap.servers", "192.168.99.xxx:9092");//kafka的服务器ip props.put("zk.connect", "192.168.99.143:2181"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); return props; } //生产用户消息 private String producerInfo(){ StringBuffer mess=new StringBuffer(); int oper=r.nextInt(5);//随机产生用户的操作 if(4!=oper){ userOper=oper; } isvip=r.nextInt(1);//用户是否是会员 mess.append(userNo).append("|") .append(userName).append("|") .append(String.valueOf(userOper)).append("|") .append(String.valueOf(isvip)); return mess.toString(); } KafkaProducer kp=new KafkaProducer(producerParam()); private void sendMsg(String message){ //生产者消息配置参数 Properties proper=this.producerParam(); //回调函数 Callback c=new Callback() { @Override public void onCompletion(RecordMetadata 
paramRecordMetadata, Exception paramException) { // TODO Auto-generated method stub System.out.println("topic:"+paramRecordMetadata.topic()); System.out.println("partition:"+paramRecordMetadata.partition()); if(null!=paramException){ System.out.println("getMessage:"+paramException.getMessage()); } } }; //创建消息发送器 ProducerRecord<String,String> precord=new ProducerRecord<String,String>(topic, message); //发送消息 kp.send(precord, c); } @Override public void run() { while(true){ try { message=producerInfo(); System.out.println("producer:"+message); sendMsg(message); System.out.println("message send success"); msgCount++; System.out.println("成功发送【"+msgCount+"】消息"); Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { UserInfiProducer userPord=new UserInfiProducer(); Thread thread=new Thread(userPord); thread.start(); } }
4:定义消费者
package com.streamkafka.user_state_update

import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.DStream
import org.apache.commons.logging.LogFactory
import org.apache.commons.logging.Log
import scala.actors.threadpool.ExecutorService
import scala.actors.threadpool.Executors
import org.apache.spark.streaming.State
import org.apache.spark.streaming.StateSpec
import org.apache.spark.streaming.Time

/**
 * Spark Streaming job that reads pipe-separated user events
 * (`userNo|userName|userOperation|userIsVIP`) from Kafka, prints each event as a
 * [[UserSession]], and tracks operation state with `mapWithState`.
 */
object UserInfoConsumer extends Serializable {
  var logs = LogFactory.getLog(UserInfoConsumer.getClass)

  /** Number of fields in a well-formed message. */
  private val FieldCount = 4

  /** True when `s` parses as an Int. */
  private def isInt(s: String): Boolean = scala.util.Try(s.toInt).isSuccess

  /**
   * Splits a raw message into its four fields.
   *
   * @param raw the Kafka record value (may be null)
   * @return the fields, or None when the message is null, does not have exactly
   *         four fields, or its operation / VIP fields are not integers
   */
  private def parseFields(raw: String): Option[Array[String]] =
    Option(raw)
      .map(_.split("\\|")) // '|' is a regex metacharacter and must be escaped
      .filter(f => f.length == FieldCount && isInt(f(2)) && isInt(f(3)))

  def main(args: Array[String]): Unit = {
    val cc = new ConsumerProcess()
    cc.resoleMethod()
  }

  class ConsumerProcess extends Serializable {

    /** Builds the streaming pipeline, starts it and blocks until termination. */
    def resoleMethod(): Unit = {
      val conf = new SparkConf()
      conf.setMaster("spark://192.168.99.xxx:7077").setAppName("user_state_update")
      // The StreamingContext is the entry point of Spark Streaming; batches are 5 seconds long.
      val ssc = new StreamingContext(conf, Seconds(5))
      val topic = Array("test")
      // mapWithState requires a checkpoint directory.
      ssc.checkpoint(".")

      val kafkaParams = Map[String, Object](
        // Initial brokers to contact; the full broker list need not be given here.
        "bootstrap.servers" -> "192.168.99.xxx:9092",
        // Deserializer classes for record keys and values.
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        // Consumer group this consumer belongs to.
        "group.id" -> "user-consumer-group1",
        "auto.offset.reset" -> "latest",
        // When true, offsets are committed periodically in the background.
        "enable.auto.commit" -> (false: java.lang.Boolean))

      // Subscribe to the topic with the configuration above.
      val dStream = KafkaUtils.createDirectStream[String, String](
        ssc, PreferConsistent, Subscribe[String, String](topic, kafkaParams))

      // Print every well-formed event as a UserSession.
      dStream.foreachRDD { rdd =>
        rdd.foreach { record =>
          parseFields(record.value()) match {
            case Some(fields) =>
              println("userNo:" + fields(0))
              print("组装userSession")
              // A fresh session per record, so no state leaks between messages.
              val user = new UserSession()
              user.userTrans(fields(0), fields(1), fields(2).toInt, fields(3).toInt)
              println("userInfo:" + user.toStrings)
            case None =>
              print("消息格式不符合定义!!!")
          }
        }
      }

      // State-update function: resolves the operation name for the key and
      // stores the numeric code as the new state.
      // NOTE(review): the stream below is keyed by the operation code (field 2),
      // not by userNo, so state is tracked per operation code; confirm this is intended.
      val mapWithStateMethod = (userState: String, one: Option[Int], state: State[Int]) => {
        val stateInt = userState.toInt
        val userM = new UserSession()
        userM.operationTran(stateInt.toString)
        // Emit the operation name together with its numeric code.
        val output = (userM.user_operation, stateInt)
        println("当前用户的状态为:" + userM.user_operation)
        state.update(stateInt)
        output
      }

      // Malformed records are dropped here instead of failing the batch.
      val mapState = dStream.flatMap(rec => parseFields(rec.value()).map(fields => (fields(2), 1)))
      val userState = mapState.mapWithState(StateSpec.function(mapWithStateMethod))
      // print() registers an output operation and returns Unit, so it must not be concatenated.
      userState.print()
      print("=============================================================================")

      ssc.start()
      ssc.awaitTermination()
    }
  }
}