本文共 1139 字,大约阅读时间需要 3 分钟。
进阶阶段实战目录
updateStateByKey算子的使用
import Spark.WordCount.sscimport org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}object UpdateStateByKey extends App{ val sparkConf = new SparkConf().setMaster("local[2]").setAppName("WordCount") val ssc = new StreamingContext(sparkConf,Seconds(5)) // 第一点,如果要使用updateStateByKey算子,就必须设置一个checkpoint目录,开启checkpoint机制 // 这样的话才能把每个key对应的state除了在内存中有,那么是不是也要checkpoint一份 // 因为你要长期保存一份key的state的话,那么spark streaming是要求必须用checkpoint的,以便于在 // 内存数据丢失的时候,可以从checkpoint中恢复数据 // 开启checkpoint机制,很简单,只要调用jssc的checkpoint()方法,设置一个hdfs目录即可 ssc.checkpoint("E:/test") // 实现wordcount逻辑 val lines = ssc.socketTextStream("hadoop2", 9999) //val lines = ssc.textFileStream("E:/test") val workds = lines.flatMap(_.split(" ")).map((_,1)).updateStateByKey((values:Seq[Int],state:Option[Int])=>{ //更新函数两个参数Seq[V], Option[S],前者是每个key新增的值的集合,后者是当前保存的状态, //创建一个变量,用于记录单词出现次数 var newValue=state.getOrElse(0) //getOrElse相当于if....else..... for(value <- values){ newValue +=value //将单词出现次数累计相加 } Option(newValue) }) workds.print() ssc.start() ssc.awaitTermination()}
转载地址:http://mtazi.baihongyu.com/