博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark Streaming(组件、updateStateByKey、Windows)总结
阅读量:3746 次
发布时间:2019-05-22

本文共 5983 字,大约阅读时间需要 19 分钟。

Spark Streaming

1. SparkStreaming 是什么

  • 它是一个可扩展高吞吐具有容错性流式计算框架

    吞吐量:单位时间内成功传输数据的数量

  • 之前我们接触的spark-core和spark-sql都是处理属于离线批处理任务,数据一般都是在固定位置上,通常我们写好一个脚本,每天定时去处理数据,计算,保存数据结果。这类任务通常是T+1(一天一个任务),对实时性要求不高。

在这里插入图片描述

  • 但在企业中存在很多实时性处理的需求,例如:双十一的京东阿里,通常会做一个实时的数据大屏,显示实时订单。这种情况下,对数据实时性要求较高,仅仅能够容忍到延迟1分钟或几秒钟。
    在这里插入图片描述

2. 实时计算框架对比

  • Storm

    • 流式计算框架,来一条处理一条

    • 以record为单位处理数据,支持micro-batch方式(Trident)

    • 对python不友好

  • flink

    • 流式计算框架,来一条处理一条
    • 比spark streaing速度快
  • Spark

    • 批处理计算框架,间隔一段时间,获取一次数据
    • 以RDD为单位处理数据,支持micro-batch流式处理数据(Spark Streaming
    • 实时性稍差,但是能处理的数据量更大
    • pyspark
  • 对比:

    • 吞吐量:Spark Streaming优于Storm
    • 延迟:Spark Streaming差于Storm

3. Spark Streaming组件

  • Streaming Context
    • 流上下文 通过Streaming Context 可以连接数据源获取数据
    • 通过spark context 可以获取到streaming context
    • 在创建Streaming Context 需要指定一个时间间隔(micro batch)
    • Streaming Context调用了stop方法之后,就不能再次调 start(),需要重新创建一个Streaming Context
    • 一个SparkContext创建一个Streaming Context
    • streaming Context上调用Stop方法,默认会把spark context也关掉
    • 如果只想仅关闭Streaming Context对象,设置stop()的可选参数为false
    • 对DStream中数据处理的逻辑要写在Streaming Context开启之前 一旦Streaming Context调用了start方法 就不能再添加新的数据处理逻辑
  • DStream(离散流)
    • Streaming Context 连接到不同的数据源获取到的数据 抽象成DStream模型
    • 代表一个连续的数据流
    • 一系列连续的RDD组成
    • 任何对DStreams的操作都转换成了对DStreams隐含的RDD的操作
    • 数据源
      • 基本源
        • TCP/IP Socket
        • FileSystem
      • 高级源
        • Kafka
        • Flume

4. Spark Streaming 编码实战(无状态)

4.1 Spark Streaming编码步骤:

  1. 创建一个StreamingContext
  2. 从StreamingContext中创建一个数据对象
  3. 对数据对象进行Transformations操作
  4. 输出结果
  5. 开始和停止

4.2 利用Spark Streaming实现WordCount

需求:监听某个端口上的网络数据,实时统计出现的不同单词个数。

1,需要安装一个nc工具:sudo yum install -y nc

2,执行指令:nc -lk 9999 -v

import os# 配置spark driver和pyspark运行时,所使用的python解释器路径PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'SPARK_HOME = "/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0"os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHONos.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHONos.environ['JAVA_HOME']=JAVA_HOMEos.environ["SPARK_HOME"] = SPARK_HOMEfrom pyspark import SparkContextfrom pyspark.streaming import StreamingContextif __name__ == "__main__":    sc = SparkContext("local[2]",appName="NetworkWordCount")    #参数2:指定执行计算的时间间隔    ssc = StreamingContext(sc, 1)    #监听ip,端口上的上的数据    lines = ssc.socketTextStream('localhost',9999)    #将数据按空格进行拆分为多个单词    words = lines.flatMap(lambda line:line.split(' '))    #将单词转换为(单词,1)的形式    pairs = words.map(lambda word:(word,1))    #统计单词个数    wordCounts = pairs.reduceByKey(lambda x,y:x+y)    #打印结果信息,会使得前面的transformation操作执行 类似于action    wordCounts.pprint()    #启动StreamingContext    ssc.start()    #等待计算结束 这里在jupyter notebook交互式环境中才需要加    ssc.awaitTermination()

可视化查看效果: 主机地址:4040 点击streaming,查看效果

5. Spark Streaming的状态操作

  • Spark Streaming实现的是一个实时批处理操作,每隔一段时间将数据进行打包,封装成RDD,是无状态的。

    • 无状态:指的是每个时间片段的数据之间是没有关联的。
  • 需求:想要将一个大时间段(1天),即多个小时间段的数据内的数据持续进行累积操作,一般超过一天都是用RDD或Spark SQL来进行离线批处理

  • 在Spark Streaming中存在两种状态操作

    • UpdateStateByKey
    • Windows操作
  • 使用有状态的transformation,需要开启Checkpoint

    • spark streaming 的容错机制
    • 它将足够多的信息checkpoint到某些具备容错性的存储系统如hdfs上,以便出错时能够迅速恢复

5.1 updateStateByKey

步骤

  • 首先,要定义一个state,可以是任意的数据类型
  • 其次,要定义state更新函数–指定一个函数如何使用之前的state和新值来更新state
  • 对于每个batch,Spark都会为每个之前已经存在的key去应用一次state更新函数,无论这个key在batch中是否有新的数据。如果state更新函数返回none,那么key对应的state就会被删除
  • 对于每个新出现的key,也会执行state更新函数

5.2 案例:updateStateByKey

需求:监听网络端口的数据,获取到每个批次的出现的单词数量,并且需要把每个批次的信息保留下来

代码

import os# 配置spark driver和pyspark运行时,所使用的python解释器路径PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'SPARK_HOME = "/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0"os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHONos.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHONos.environ['JAVA_HOME']=JAVA_HOMEos.environ["SPARK_HOME"] = SPARK_HOMEfrom pyspark.streaming import StreamingContextfrom pyspark.sql.session import SparkSession# 创建SparkContextspark = SparkSession.builder.master("local[2]").getOrCreate()sc = spark.sparkContextssc = StreamingContext(sc, 3)#开启检查点ssc.checkpoint("checkpoint")#定义state更新函数def updateFunc(new_values, last_sum):    return sum(new_values) + (last_sum or 0)lines = ssc.socketTextStream("localhost", 9999)# 对数据以空格进行拆分,分为多个单词counts = lines.flatMap(lambda line: line.split(" ")) \    .map(lambda word: (word, 1)) \    .updateStateByKey(updateFunc=updateFunc)#应用updateStateByKey函数    counts.pprint()ssc.start()ssc.awaitTermination()

5.3 Windows

  • 窗口长度L:运算的数据量
  • 滑动间隔G:控制每隔多长时间做一次运算

每隔G秒,统计最近L秒的数据

在这里插入图片描述

操作细节

  • Window操作是基于窗口长度和滑动间隔来工作的
  • 窗口的长度控制考虑前几批次数据量
  • 默认为批处理的滑动间隔来确定计算结果的频率
  • 在这里插入图片描述

典型案例:热点搜索词滑动统计,每隔10秒,统计最近60秒钟的搜索词的搜索频次,并打印出最靠前的3个搜索词出现次数。

在这里插入图片描述

5.4 案例 windows

监听网络端口的数据,每隔3秒统计前6秒出现的单词数量

import os# 配置spark driver和pyspark运行时,所使用的python解释器路径PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'SPARK_HOME = "/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0"# 当存在多个版本时,不指定很可能会导致出错os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHONos.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHONos.environ['JAVA_HOME']=JAVA_HOMEos.environ["SPARK_HOME"] = SPARK_HOMEfrom pyspark import SparkContextfrom pyspark.streaming import StreamingContextfrom pyspark.sql.session import SparkSessiondef get_countryname(line):    country_name = line.strip()    if country_name == 'usa':        output = 'USA'    elif country_name == 'ind':        output = 'India'    elif country_name == 'aus':        output = 'Australia'    else:        output = 'Unknown'    return (output, 1)if __name__ == "__main__":	#定义处理的时间间隔    batch_interval = 10 # base time unit (in seconds)    #定义窗口长度    window_length = 6 * batch_interval    #定义滑动时间间隔    frequency = 1 * batch_interval    #获取StreamingContext    spark = SparkSession.builder.master("local[2]").getOrCreate()		sc = spark.sparkContext		ssc = StreamingContext(sc, batch_interval)        #需要设置检查点    ssc.checkpoint("checkpoint")    lines = ssc.socketTextStream('localhost', 9999)    addFunc = lambda x, y: x + y    invAddFunc = lambda x, y: x - y    #调用reduceByKeyAndWindow,来进行窗口函数的调用    window_counts = lines.map(get_countryname) \        .reduceByKeyAndWindow(addFunc, invAddFunc, window_length, frequency)	#输出处理结果信息    window_counts.pprint()    ssc.start()    ssc.awaitTermination()

转载地址:http://qmdsn.baihongyu.com/

你可能感兴趣的文章
算法训练 素因子去重(给定一个正整数n,求一个正整数p,满足p仅包含n的所有素因子,且每个素因子的次数不大于1)
查看>>
算法训练 二进制数数( 给定L,R。统计[L,R]区间内的所有数在二进制下包含的“1”的个数之和。   如5的二进制为101,包含2个“1”。)
查看>>
第十届MathorCup高校数学建模D题解题思路
查看>>
2020年高教社杯全国大学生数学建模竞赛赛题 C题分析与思路!(持续更新)
查看>>
2020年高教社杯全国大学生数学建模竞赛赛题 B题分析与思路!(持续更新)
查看>>
蓝桥杯真题 18省4-测试次数 x星球的居民脾气不太好,但好在他们生气的时候唯一的异常举动是:摔手机。 各大厂商也就纷纷推出各种耐摔型手机。x星球的质监局规定了手机必须经过耐摔测试,并且评定出一个耐
查看>>
蓝桥杯真题 19省3-数列求值 给定数列 1, 1, 1, 3, 5, 9, 17, …,从第 4 项开始,每项都是前 3 项的和。求第 20190324 项的最后 4 位数字。
查看>>
大小写字母转换函数tolower();的用法
查看>>
蓝桥杯 15校4-7对数字 今有7对数字:两个1,两个2,两个3,...两个7,把它们排成一行。 要求,两个1间有1个其它数字,两个2间有2个其它数字,以此类推,两个7之间有7个其它数字。如下就是
查看>>
蓝桥杯真题 17省10-k倍区间 给定一个长度为N的数列,A1, A2, ... AN,如果其中一段连续的子序列Ai, Ai+1, ... Aj(i <= j)之和是K的倍数,我们就称这个区间[i
查看>>
TCP协议的流量控制
查看>>
TCP连接的三次握手过程,为什么不是两次或四次?
查看>>
小白都能看懂的DNS解析过程
查看>>
HTTP和HTTPS的区别?描述HTTPS的工作过程
查看>>
简述一下HTTP的状态码
查看>>
20210227vulhub靶场之环境配置---无法获得靶机IP的疑难解决方式(可以解决VBox和VMware不兼容问题)
查看>>
20210226web渗透学习之SSRF总结
查看>>
2021-06-01web渗透学习之sqlserver提权(转)
查看>>
大数据之Flume
查看>>
关于高可用配置hbase中出现的问题:Name or service not known
查看>>