本文共 5983 字,大约阅读时间需要 19 分钟。
它是一个可扩展,高吞吐具有容错性的流式计算框架
吞吐量:单位时间内成功传输数据的数量
之前我们接触的spark-core和spark-sql都是处理属于离线批处理任务,数据一般都是在固定位置上,通常我们写好一个脚本,每天定时去处理数据,计算,保存数据结果。这类任务通常是T+1(一天一个任务),对实时性要求不高。
Storm
流式计算框架,来一条处理一条
以record为单位处理数据,支持micro-batch方式(Trident)
对python不友好
flink
Spark
对比:
需求:监听某个端口上的网络数据,实时统计出现的不同单词个数。
1,需要安装一个nc工具:sudo yum install -y nc
2,执行指令:nc -lk 9999 -v
import os# 配置spark driver和pyspark运行时,所使用的python解释器路径PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'SPARK_HOME = "/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0"os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHONos.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHONos.environ['JAVA_HOME']=JAVA_HOMEos.environ["SPARK_HOME"] = SPARK_HOMEfrom pyspark import SparkContextfrom pyspark.streaming import StreamingContextif __name__ == "__main__": sc = SparkContext("local[2]",appName="NetworkWordCount") #参数2:指定执行计算的时间间隔 ssc = StreamingContext(sc, 1) #监听ip,端口上的上的数据 lines = ssc.socketTextStream('localhost',9999) #将数据按空格进行拆分为多个单词 words = lines.flatMap(lambda line:line.split(' ')) #将单词转换为(单词,1)的形式 pairs = words.map(lambda word:(word,1)) #统计单词个数 wordCounts = pairs.reduceByKey(lambda x,y:x+y) #打印结果信息,会使得前面的transformation操作执行 类似于action wordCounts.pprint() #启动StreamingContext ssc.start() #等待计算结束 这里在jupyter notebook交互式环境中才需要加 ssc.awaitTermination()
可视化查看效果: 主机地址:4040 点击streaming,查看效果
Spark Streaming实现的是一个实时批处理操作,每隔一段时间将数据进行打包,封装成RDD,是无状态的。
需求:想要将一个大时间段(1天),即多个小时间段的数据内的数据持续进行累积操作,一般超过一天都是用RDD或Spark SQL来进行离线批处理
在Spark Streaming中存在两种状态操作
使用有状态的transformation,需要开启Checkpoint
步骤:
需求:监听网络端口的数据,获取到每个批次的出现的单词数量,并且需要把每个批次的信息保留下来
代码
import os# 配置spark driver和pyspark运行时,所使用的python解释器路径PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'SPARK_HOME = "/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0"os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHONos.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHONos.environ['JAVA_HOME']=JAVA_HOMEos.environ["SPARK_HOME"] = SPARK_HOMEfrom pyspark.streaming import StreamingContextfrom pyspark.sql.session import SparkSession# 创建SparkContextspark = SparkSession.builder.master("local[2]").getOrCreate()sc = spark.sparkContextssc = StreamingContext(sc, 3)#开启检查点ssc.checkpoint("checkpoint")#定义state更新函数def updateFunc(new_values, last_sum): return sum(new_values) + (last_sum or 0)lines = ssc.socketTextStream("localhost", 9999)# 对数据以空格进行拆分,分为多个单词counts = lines.flatMap(lambda line: line.split(" ")) \ .map(lambda word: (word, 1)) \ .updateStateByKey(updateFunc=updateFunc)#应用updateStateByKey函数 counts.pprint()ssc.start()ssc.awaitTermination()
每隔G秒,统计最近L秒的数据
操作细节
典型案例:热点搜索词滑动统计,每隔10秒,统计最近60秒钟的搜索词的搜索频次,并打印出最靠前的3个搜索词出现次数。
监听网络端口的数据,每隔3秒统计前6秒出现的单词数量
import os# 配置spark driver和pyspark运行时,所使用的python解释器路径PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'SPARK_HOME = "/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0"# 当存在多个版本时,不指定很可能会导致出错os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHONos.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHONos.environ['JAVA_HOME']=JAVA_HOMEos.environ["SPARK_HOME"] = SPARK_HOMEfrom pyspark import SparkContextfrom pyspark.streaming import StreamingContextfrom pyspark.sql.session import SparkSessiondef get_countryname(line): country_name = line.strip() if country_name == 'usa': output = 'USA' elif country_name == 'ind': output = 'India' elif country_name == 'aus': output = 'Australia' else: output = 'Unknown' return (output, 1)if __name__ == "__main__": #定义处理的时间间隔 batch_interval = 10 # base time unit (in seconds) #定义窗口长度 window_length = 6 * batch_interval #定义滑动时间间隔 frequency = 1 * batch_interval #获取StreamingContext spark = SparkSession.builder.master("local[2]").getOrCreate() sc = spark.sparkContext ssc = StreamingContext(sc, batch_interval) #需要设置检查点 ssc.checkpoint("checkpoint") lines = ssc.socketTextStream('localhost', 9999) addFunc = lambda x, y: x + y invAddFunc = lambda x, y: x - y #调用reduceByKeyAndWindow,来进行窗口函数的调用 window_counts = lines.map(get_countryname) \ .reduceByKeyAndWindow(addFunc, invAddFunc, window_length, frequency) #输出处理结果信息 window_counts.pprint() ssc.start() ssc.awaitTermination()
转载地址:http://qmdsn.baihongyu.com/