扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
怎么实现SparkStreaming转化操作,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
成都创新互联长期为1000+客户提供的网站建设服务,团队从业经验10年,关注不同地域、不同群体,并针对不同对象提供差异化的产品和服务;打造开放共赢平台,与合作伙伴共同营造健康的互联网生态环境。为南平企业提供专业的网站制作、网站建设,南平网站改版等技术服务。拥有10年丰富建站经验和众多成功案例,为您定制开发。
DStream的转化操作分为无状态和有状态两种
在无状态转化操作中,每个批次的处理不依赖于之前批次的数据。
有状态转化操作需要使用之前批次的数据或者中间结果来计算当前批次的数据,有状态转化操作包括基于滑动窗口的转化操作和追踪状态变化的转换操作。
无状态转化操作的实质就说把简单的RDD转化操作应用到每个批次上,也就是转化DStream的每一个RDD
Transform 允许 DStream 上执行任意的 RDD-to-RDD 函数。即使这些函数并没有在 DStream 的 API 中暴露出来,通过该函数可以方便的扩展 Spark API。该函数每一批次调度一次。其实也 就是对 DStream 中的 RDD 应用转换。
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("transform").setMaster("local[*]") val sc: StreamingContext = new StreamingContext(conf, Seconds(3)) val lines = sc.socketTextStream("localhost", 9999) // transform方法可以将底层RDD获取到后进行操作 // 1. DStream功能不完善 // 2. 需要代码周期性的执行 // Code : Driver端 val newDS: DStream[String] = lines.transform( rdd => { // Code : Driver端,(周期性执行) rdd.map( str => { // Code : Executor端 str } ) } ) // Code : Driver端 val newDS1: DStream[String] = lines.map( data => { // Code : Executor端 data } ) sc.start() sc.awaitTermination() }
两个流之间的 join 需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的 RDD 进行 join,与两个 RDD 的 join 效果相同。
def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming") val ssc = new StreamingContext(sparkConf, Seconds(5)) val data9999 = ssc.socketTextStream("localhost", 9999) val data8888 = ssc.socketTextStream("localhost", 8888) val map9999: DStream[(String, Int)] = data9999.map((_,9)) val map8888: DStream[(String, Int)] = data8888.map((_,8)) // 所谓的DStream的Join操作,其实就是两个RDD的join val joinDS: DStream[(String, (Int, Int))] = map9999.join(map8888) joinDS.print() ssc.start() ssc.awaitTermination() }
有状态转化操作是跨时间区间跟踪数据的操作,也就是说,一些先前批次的数据也被用来在新的批次中用于计算结果。有状态转换的主要的两种类型:
滑动窗口:以一个时间阶段为滑动窗口进行操作
updateStateByKey():通过key值来跟踪数据的状态变化
有状态转化操作需要在StreamingContext中打开检查点机制来提高容错
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("updateStateByKey") val sc: StreamingContext = new StreamingContext(conf, Seconds(4)) sc.checkpoint("cp") val ds: ReceiverInputDStream[String] = sc.socketTextStream("localhost", 9999) val value: DStream[(String, Int)] = ds.map(((_: String), 1)) // updateStateByKey:根据key对数据的状态进行更新 // 传递的参数中含有两个值 // 第一个值表示相同的key的value数据的集合 // 第二个值表示缓存区key对应的计算值 val state: DStream[(String, Int)] = value.updateStateByKey((seq: Seq[Int], option: Option[Int]) => { val newCount: Int = option.getOrElse(0) + seq.sum Option(newCount) }) state.print() sc.start() sc.awaitTermination() }
所有基于窗口的函数都需要两个参数,分别对应窗口时长和滑动步长,并且两者都必须是SparkStreaming的批次间隔的整数倍。
窗口时长控制的是每次用来计算的批次的个数
滑动步长用于控制对新的DStream进行计算的间隔
基于window进行窗口内元素计数操作
def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming") val ssc = new StreamingContext(sparkConf, Seconds(3)) val lines = ssc.socketTextStream("localhost", 9999) val wordToOne = lines.map((_,1)) val windowDS: DStream[(String, Int)] = wordToOne.window(Seconds(6), Seconds(6)) val wordToCount = windowDS.reduceByKey(_+_) wordToCount.print() ssc.start() ssc.awaitTermination() }
有逆操作规约是一种更高效的规约操作,通过只考虑新进入窗口的元素和离开窗口的元素,让spark增量计算归约的结果,其在代码上的体现就是reduceFunc和 invReduceFunc
普通归约操作
def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming") val ssc = new StreamingContext(sparkConf, Seconds(3)) ssc.checkpoint("cp") val lines = ssc.socketTextStream("localhost", 9999) lines.reduceByWindow( (x: String, y: String) => { x + "-" + y }, Seconds(9), Seconds(3) ).print() ssc.start() ssc.awaitTermination() }
有逆归约操作
def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming") val ssc = new StreamingContext(sparkConf, Seconds(3)) ssc.checkpoint("cp") val lines = ssc.socketTextStream("localhost", 9999) val wordToOne = lines.map((_,1)) /** * 基于窗口进行有逆归约:通过控制窗口流出和进入的元素来提高性能 */ val windowDS: DStream[(String, Int)] = wordToOne.reduceByKeyAndWindow( (x:Int, y:Int) => { x + y}, (x:Int, y:Int) => {x - y}, Seconds(9), Seconds(3)) windowDS.print() ssc.start() ssc.awaitTermination() }
def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming") val ssc = new StreamingContext(sparkConf, Seconds(3)) ssc.checkpoint("cp") val lines = ssc.socketTextStream("localhost", 9999) /** * 统计窗口中输入数据的个数 * 比如 3s内输入了10条数据,则打印10 */ val countByWindow: DStream[Long] = lines.countByWindow( Seconds(9), Seconds(3) ) countByWindow.print() /** * 统计窗口中每个值的个数 * 比如 3s内输入了1个3 2个4 3个5,则打印(3,1)(2,4)(3,5) */ val countByValueAndWindow: DStream[(String, Long)] = lines.countByValueAndWindow( Seconds(9), Seconds(3) ) countByValueAndWindow.print() ssc.start() ssc.awaitTermination() }
看完上述内容,你们掌握怎么实现SparkStreaming转化操作的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注创新互联行业资讯频道,感谢各位的阅读!
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流