博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark Streaming实时流处理项目实战笔记——updateStateByKey算子的使用
阅读量:3959 次
发布时间:2019-05-24

本文共 1139 字,大约阅读时间需要 3 分钟。

进阶阶段实战目录

 

 updateStateByKey算子的使用

import Spark.WordCount.sscimport org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}object UpdateStateByKey extends App{  val sparkConf = new SparkConf().setMaster("local[2]").setAppName("WordCount")  val ssc = new StreamingContext(sparkConf,Seconds(5))  // 第一点,如果要使用updateStateByKey算子,就必须设置一个checkpoint目录,开启checkpoint机制  // 这样的话才能把每个key对应的state除了在内存中有,那么是不是也要checkpoint一份  // 因为你要长期保存一份key的state的话,那么spark streaming是要求必须用checkpoint的,以便于在  // 内存数据丢失的时候,可以从checkpoint中恢复数据  // 开启checkpoint机制,很简单,只要调用jssc的checkpoint()方法,设置一个hdfs目录即可  ssc.checkpoint("E:/test")  // 实现wordcount逻辑  val lines = ssc.socketTextStream("hadoop2", 9999)  //val lines = ssc.textFileStream("E:/test")  val workds = lines.flatMap(_.split(" ")).map((_,1)).updateStateByKey((values:Seq[Int],state:Option[Int])=>{    //更新函数两个参数Seq[V], Option[S],前者是每个key新增的值的集合,后者是当前保存的状态,    //创建一个变量,用于记录单词出现次数    var newValue=state.getOrElse(0) //getOrElse相当于if....else.....    for(value <- values){      newValue +=value //将单词出现次数累计相加    }    Option(newValue)  })  workds.print()  ssc.start()  ssc.awaitTermination()}

 

转载地址:http://mtazi.baihongyu.com/

你可能感兴趣的文章
潜伏中体现的潜规则
查看>>
[Java] Appfuse 源代码分析
查看>>
[Java] Appfuse 最佳实践
查看>>
[心情] 如果有一天
查看>>
[随笔] 6月近况小记 & 一个站点优化问题
查看>>
[Perl] 关于 Bugzilla 的一些问题与研究
查看>>
[Linux] 常用 linux 系统命令及维护备忘
查看>>
[Linux] 关于 Ext4 HowTo
查看>>
[杂记] 新年物语&关于Mysql引擎性能测试
查看>>
[心得] 近期更新&关于Infobright
查看>>
[杂记] 流量统计 & 短信接口
查看>>
[Java] JRebel + Maven + Jetty 热部署
查看>>
[算法] 从 Memcached 分布式应用看一致性哈希散列函数的选择
查看>>
[中间件] 消息处理利器 ActiveMQ 的介绍 & Stomp 协议的使用
查看>>
[设计] 原型界面设计利器 Balsamiq Mockups 推荐
查看>>
[闲话] 在西方的程序员眼里,东方的程序员是什么样的
查看>>
[管理] 成功之路的探寻 —— “三力” 理论
查看>>
[连载] Socket 深度探索 4 PHP (一)
查看>>
[连载] Socket 深度探究 4 PHP (二)
查看>>
[连载] Socket 深度探究 4 PHP (三)
查看>>