SPark学习笔记:13 Spark Streaming 的Transform算子和Action算子_sparkstream action算子-程序员宅基地

技术标签: 学习  spark  Spark  大数据  

概述

和RDD类似,DStreams也有一些转换算子用于处输入流中的数据。DStream中有很多转换算子和RDD的转换算子一样,同时也提供了一些额外的算子。此文将总结DStreams的各种算子的使用。

Transformations on DStreams

map

作用在DStream上,用法同RDD的map.一个输入对应一个输出。

flatMap

说明:对源DStream中的每一个元素,作为flatMap函数的输入进行计算处理生成一个新的DStream,一个输入对应一个或者多个输出

val line: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.0.52",8888)
val sensorDs:DStream[String] = line.flatMap(data=>data.split(","))

filter

说明:过滤符合条件的记录,true保留,false过滤

val line: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.0.52",8888)
val sensorDs:DStream[(String,SensorReading)] = line
  .filter(_.nonEmpty)
    .map(data=>{
    
    val arr = data.split(",")
      (arr(0),SensorReading(arr(0),arr(1).toLong,arr(2).toDouble))
})

repartiton

说明:重分区

union

说明:合并两个DStream,DStream的元素的数据类型必须一致

count

说明:统计DStream中元素的个数,和RDD的count操作不同,DStream的count是一个懒加载的操作。

countByValue

说明: 对DStream中的元素按照VALUE进行统计,输出(V,Long)类型的DStream。

reduce

说明: 对DStream[K]中的每个对象进行reduce运算,输出DStream[K]类型的数据

sensorDs.reduce{
     case(first:(String,SensorReading),second:(String,SensorReading))=>
    if(first._2.temperature>second._2.temperature){
    
      first
    }else{
    
      second
    }
}.print()

reduceByKey

说明: 对DStream[K,V]类型的DStream中的元素按照key分组,进行reduce运算,输出DStream[K,V]类型的数据

sensorDs.reduceByKey((first:SensorReading,second:SensorReading)=>{
    
  if(first.temperature>second.temperature){
    
    first
  }else{
    
    second
  }
}).print()

join

说明 类似于关系型数据库表的join操作,连接两个DStream,作用在DStream[K,V]和DStream[K,W]的两个DStream上,输出一个DStream[K,(V,W)]类型的DStream。

cogroup

说明 作用在两个DStream[K,V]和DStream[K,W]类型的DStream上,输出一个新的DStream[K,SEQ[V],SEQ[W]]类型的DStream。

transform

说明: Transform允许DStream上执行任意的RDD-to-RDD函数。即使这些函数并没有在DStream的API中暴露出来,通过该函数可以很方便的扩展DStream的API。该函数每一批次调度一次。

val conf:SparkConf = new SparkConf()
conf.setMaster("local[*]").setAppName("DStreamTestApp")

val ssc:StreamingContext = new StreamingContext(conf,Seconds(3))
import StreamingContext._
val line: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.0.52",8888)
val sensorDs:DStream[(String,SensorReading)] = line
  .filter(_.nonEmpty)
    .map(data=>{
    
    val arr = data.split(",")
      (arr(0),SensorReading(arr(0),arr(1).toLong,arr(2).toDouble))
})
val transedDs:DStream[SensorReading] = sensorDs.transform(data=>{
    
  //data是一个RDD,可以使用RDD的API进行操作
  val data2:RDD[(String,SensorReading)] = data.filter(el=>{
    
    if(el._2.temperature>60){
    
      true
    }else{
    
      false
    }
  })
  //取最大温度的记录,并转将RDD(String,SensorReading)转换为RDD(SensorReading)
  val data3:RDD[SensorReading] = data2.reduceByKey((first,second)=>{
    
    if(first.temperature>second.temperature){
    
      first
    }else{
    
      second
    }
  }).map(_._2)
  
  //结果返回另一个RDD
  data3
})
  • 该函数的使用场景有很多,比如从文件中读取一个DataSet,然后可以使用该方法与实时流中的DStream中的RDD进程合并等操作。

updateStateByKey

说明: updateStateByKey用于记录历史记录的状态值,有时候我们需要在DStream中跨批次卫华状态(例如WordCount中统计Word的累加值)。针对这种情况,updateStateByKey提供了一个对状态变量的访问。对于键值形式的DStream,给定一个由(键、事件)对个偶成的DStream,并传递一个指定如何根据新的事件更新每个键值对应状态的函数,他可以构建出一个新的DStream。
updateStateByKey操作使得我们可以在用新的信息进行更新时保持任意的状态。只要两步,我们就可以使用这个功能:

  • 定义状态,状态可以是一个任意的数据类型
  • 定义状态更新函数,此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。

示例一:wordcount,统计输入流中每个word出现的频率。

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{
    Seconds, StreamingContext}

object StateApp {
    

  def main(args: Array[String]): Unit = {
    
    val conf:SparkConf = new SparkConf()
    conf.setAppName("StateAppTest").setMaster("local[*]")

    //构建StreamContext
    val ssc:StreamingContext = new StreamingContext(conf,Seconds(3))
    
    //使用updateStateByKey算子,需要设定checkpoint的目录
    ssc.checkpoint("./checkpoint")
    
    //构建一个socket文本流
    val strDs:DStream[String] = ssc.socketTextStream("192.168.0.52",8888)
    
    //构建DStream[(String,Long)]键值对类型的DStream
    val paris:DStream[(String,Long)] = strDs.filter(_.nonEmpty)
      .flatMap(data=>{
    data.split(",")}).map((_,1))
    
    //定义一个LONG类型的状态,并定义状态更新函数
    paris.updateStateByKey[Long]((values:Seq[Long],state:Option[Long])=>{
    
     //状态更新函数有两个参数:
     //参数一:是新的批次的以Key分组后的值的序列
     //参数二:是上一批次处理完毕时记录的状态的值
     
     //取上一批次的状态值
      val prev_ttls:Long = state.getOrElse(0L)
      //当前批次的值处理
      val current_ttls = values.foldLeft(0L)((data1,data2)=>data1+data2)
      
      //更新状态,为上一次的值+这一批次的值
      Some(prev_ttls+current_ttls)
    }).print()

    ssc.start()
    ssc.awaitTermination()

  }
}

示例二:记录每一个温度传感器的最高温度

import com.hjt.yxh.hw.sparksql.SensorReading
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{
    Seconds, StreamingContext}

object StateApp {
    

  def main(args: Array[String]): Unit = {
    
    val conf:SparkConf = new SparkConf()
    conf.setAppName("StateAppTest").setMaster("local[*]")

    //构建StreamContext
    val ssc:StreamingContext = new StreamingContext(conf,Seconds(3))
    ssc.checkpoint("./checkpoint")
    val strDs:DStream[String] = ssc.socketTextStream("192.168.0.52",8888)

    val sensorDs:DStream[(String,SensorReading)] =
      strDs.filter(_.nonEmpty)
        .map(data=>{
    
        val arry = data.split(",")
        val sensor = SensorReading(arry(0),arry(1).toLong,arry(2).toDouble)
          (sensor.id,sensor)
      })

      val updateSensorState = (values:Seq[SensorReading],state:Option[SensorReading])=>{
    
        //定义的状态类型是SensorReading
        val prevSensor = state.getOrElse(values.apply(0))
        
        //迭代,记录温度最高的Sensor
        val max = values.foldLeft(prevSensor)((maxSensor,data)=>{
    
          if(data.temperature>maxSensor.temperature){
    
            data
          }else{
    
            maxSensor
          }
        })
        //更新状态
        Some(max)
      }
        
    sensorDs.updateStateByKey[SensorReading](updateSensorState).print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Tips: 包括windows的相关算子在内,以上所有的转换算子都是“懒执行”的,如果整个应用中都没有行动算子,那么相关的计算操作将不会被执行。

Spark Streaming的行动算子 Output Operations On DStream

输出操作允许将DStream中的数据推送到外部系统,比如数据库或者文件系统。由于输出操作实际允许外部系统使用转换后的数据,所以他们会触发所有的转换算子的执行。(同RDD的行动算子)

print

说明: 在Driver节点上打印出DStream的每一批次中的前10条记录。通常用于开发调试阶段。

saveAsTextFiles(prefix,[suffix])

说明: 将DSteam的内容保存为文本文件,每一个批次生成一个文件,文件名以prefix前缀-时间(毫秒)[.fuffix]后缀命名。

saveASObjectFiles(prefix,[suffix])

说明: 将DSteam的内容保存为一个序列化的对象文件,使用java的Object序列化。每一个批次生成一个文件,文件名以prefix前缀-时间(毫秒)[.fuffix]后缀命名。

saveAsHadoopFiles(prefix, [suffix])

说明: 将DSteam的内容保存为hadoop的文件,使用java的Object序列化。每一个批次生成一个文件,文件名以prefix前缀-时间(毫秒)[.fuffix]后缀命名。

foreachRDD(func)

说明:
这是最通用的输出操作,即将函数func用于产生于stream的每一个RDD。其中参数传入的函数 func 应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者通过网络将其写入数据库。通用的输出操作foreachRDD(),它用来对DStream中的RDD运行任意计算。这和 transform()有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用我们在Spark 中实现的所有行动操作。比如,常见的用例之一是把数据写到诸如MySQL的外部数据库中。

//写入到数据库中
sensorStream.foreachRDD(rdd=>{
    
  //在Driver端执行
  //todolist
  println("executor at driver end")

  rdd.foreachPartition(
    rddPartiton=>{
    
      //在Executor端执行
      //创建数据库连接
      println("executor at driver Executor")
        for (elem <- rddPartiton) {
    
        println(elem)
          //每条记录执行一次

      }
      //在Executor端执行,每个Partition执行一次

    }
  )
})

注意:

  • 连接不能写在 driver 层面,因为数据的存取操作是在Executor中完成的,在Driver端创建的连接没办法在Executor中使用。(跨机器或者跨了JVM进程了)
  • 如果写在 foreach则每个RDD中的每一条数据都创建,得不偿失;
  • 增加 foreachPartition,在分区创建(获取)
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/wangzhongyudie/article/details/126391904

智能推荐

微信小程序的动态显示字体颜色_小程序 color if-程序员宅基地

文章浏览阅读1k次。<text wx:if="{{item.data.status=='待打印'}}" style="color:red">{{item.data.status}}</text><text wx:if="{{item.data.status=='已打印'}}" style="color:green">{{item.data.status}}</text>_小程序 color if

线上PHP问题排查思路与实践-程序员宅基地

文章浏览阅读210次。转载:http://www.bo56.com/%E7%BA%BF%E4%B8%8Aphp%E9%97%AE%E9%A2%98%E6%8E%92%E6%9F%A5%E6%80%9D%E8%B7%AF%E4%B8%8E%E5%AE%9E%E8%B7%B5/前几天,在一淘网,腾讯网媒和微博商业技术联合组织的技术分享大会上,我分享了《在线PHP问题排查思路与实践》。此博文除了对PPT..._在php开发过程中,线上代码怎么查哪段代码有问题,且不影响线上运行

linux 内核升级-程序员宅基地

文章浏览阅读841次,点赞28次,收藏9次。centos 7.x 升级内核 3.x 至 5.x

去掉java文件中的注释_利用JavaParser去除java文件中的注释-程序员宅基地

文章浏览阅读922次。利用JavaParser去除java文件中的注释个人博客:记录一下在项目实施过程中的一些点情景回顾之前项目有个需求,就是去掉.java文件中的所有注释,常用的方法是用正则匹配。然而在网络上查找到的正则或多或少都有一些问题,无法匹配到所有的情况。或者说,由于写.java文件的人的不规范(各种奇葩的问题),导致正则覆盖不全。所以正则方法不靠谱,或者说,存在一定的限制。新的想法后来想到利用AST来去除注..._在线java代码去除注释工具

VSCode - 使用VSCode远程连接到Linux并实现免密码登录_vscode连接linux-程序员宅基地

文章浏览阅读3w次,点赞77次,收藏282次。VSCode - Linux - 使用VSCode远程连接到Ubuntu并实现免密登录我使用的是Ubuntu14.04,即便是使用其他发行版也不会影响操作步骤_vscode连接linux

生活随记-腊月第一天-程序员宅基地

文章浏览阅读2.3k次。年底了总是很容易思考人生。出差坐在太原的yd酒店15楼,泡一壶茶翻着书听点音乐,坐在窗边看下空旷冷清的街道。感叹一声,逝者如斯夫不舍昼夜。这一年年过得真快。思考着自己的工作和以后的生活,何去何从。很多事情形成习惯之后就会懒得去改变,逐渐形成一种倦怠。工作,生活,投资也是。也好,今晚就让自己沉浸在无尽交织的思绪中吧。明天睡个懒觉吃个brunch回上海。转眼又到12月了。..._腊月第一天

随便推点

深信服AF防火墙配置SSL VPN_深信服ssl配置教程-程序员宅基地

文章浏览阅读1.5k次,点赞6次,收藏13次。深信服防火墙配置SSL VN_深信服ssl配置教程

linux系统rabbitmq安装步骤_rabbitmq linux安装-程序员宅基地

文章浏览阅读768次。一、安装erlang:1、先下载rpm包:wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm2、rpm包:rpm -Uvh erlang-solutions-1.0-1.noarch.rpm可能会有以下问题:解决办法:(执行以下命令后,在执行上一条命令)yum..._rabbitmq linux安装

CentOS 7 安装最新版Docker教程_centos7安装最新版dockers-程序员宅基地

文章浏览阅读781次。docker安装官方文档:Install Docker Engine on CentOS2、安装提供了工具3、通过添加docker repository如果出现上面的错误提示,可通阿里源进行添加4、安装docker4.1、直接安装最新版本这步完成后可直接跳至启动docker4.2、或者安装指定版本按版本号倒序列出可安装版本列表安装指定版本例如安装20.10.9版本5、启动docker通过进行启动设置docker服务开机启动6、测试7、卸载docker下的_centos7安装最新版dockers

敏感字识别算法基于JDK8 lambada表达式_敏感文本识别算法-程序员宅基地

文章浏览阅读429次。package aaa.bbb.demo;import java.util.ArrayList;import java.util.List;public class RecognitionDemo { public static void main(String[] args) { String str1="SB哈NM哈哈哈WBDhdsada"; String str_敏感文本识别算法

华为鸿蒙系统(Huawei HarmonyOS)

华为鸿蒙系统Huawei HarmonyOS

JS读取粘贴板内容-程序员宅基地

文章浏览阅读4.9k次。1.1 监听onpaste事件1.1.1 定义和用法npaste 事件在用户向元素中粘贴文本时触发。注意:虽然使用的 HTML 元素都支持 onpaste 事件,但实际上并非支持所有元素,例如 <p> 元素, 除非设置了 contenteditable 为 "true" (查看下文的更多实例)。提示:onpaste 事件通常用于 type="text" 的 ..._js 获取粘贴板内容 移动端