在哪里找做网站的客户,网站维护发展,怎么办?,网站运营内容方案目录
一、目的与要求
二、实验内容
三、实验步骤
1、利用Spark Streaming对三种类型的基本数据源的数据进行处理
2、利用Spark Streaming对Kafka高级数据源的数据进行处理
3、完成DStream的两种有状态转换操作
4、把DStream的数据输出保存到文本文件或MySQL数据库中
四…目录
一、目的与要求
二、实验内容
三、实验步骤
1、利用Spark Streaming对三种类型的基本数据源的数据进行处理
2、利用Spark Streaming对Kafka高级数据源的数据进行处理
3、完成DStream的两种有状态转换操作
4、把DStream的数据输出保存到文本文件或MySQL数据库中
四、结果分析与实验体会 一、目的与要求
1、通过实验掌握Spark Streaming的基本编程方法 2、熟悉利用Spark Streaming处理来自不同数据源的数据。 3、熟悉DStream的各种转换操作。 4、熟悉把DStream的数据输出保存到文本文件或MySQL数据库中。
二、实验内容
1、参照教材示例利用Spark Streaming对三种类型的基本数据源的数据进行处理。 2、参照教材示例完成kafka集群的配置利用Spark Streaming对Kafka高级数据源的数据进行处理注意topic为你的姓名全拼。 3、参照教材示例完成DStream的两种有状态转换操作。 4、参照教材示例完成把DStream的数据输出保存到文本文件或MySQL数据库中。
三、实验步骤
1、利用Spark Streaming对三种类型的基本数据源的数据进行处理
1文件流
首先打开第一个终端作为数据流终端创建一个logfile目录
[rootbigdata zhc]# cd /home/zhc/mycode/sparkstreaming
[rootbigdata sparkstreaming]# mkdir logfile
[rootbigdata sparkstreaming]# cd logfile然后打开第二个终端作为流计算终端在“/logfile/”目录下面新建一个py程序
[rootbigdata logfile]# vim FileStreaming.py
输入如下代码
#/home/zhc/mycode/sparkstreaming/logfile/FileStreaming.pyfrom pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContextconf SparkConf()
conf.setAppName(TestDStream)
conf.setMaster(local[2])
sc SparkContext(conf conf)
ssc StreamingContext(sc, 10)
lines ssc.textFileStream(file:///home/zhc/mycode/sparkstreaming/logfile)
words lines.flatMap(lambda line: line.split( ))
wordCounts words.map(lambda x : (x,1)).reduceByKey(lambda a,b:ab)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()保存该文件并执行如下命令
[rootbigdata logfile]# spark-submit FileStreaming.py
然后我们进入数据流终端在logfile目录下新建一个log2.txt文件然后往里面输入一些英文语句后保存退出再次切换到流计算终端就可以看见打印出单词统计信息了。 2套接字流
1使用套接字流作为数据源
继续在流计算端的sparkstreaming目录下创建一个socket目录然后在该目录下创建一个NetworkWordCount.py程序
[rootbigdata sparkstreaming]# mkdir socket
[rootbigdata sparkstreaming]# cd socket
[rootbigdata socket]# vim NetworkWordCount.py输入如下代码
#/home/zhc/mycode/sparkstreaming/socket/NetworkWordCount.pyfrom __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContextif __name__ __main__:if len(sys.argv) ! 3:print(Usage: NetworkWordCount.py hostname port, filesys.stderr)exit(-1)sc SparkContext(appNamePythonStreamingNetworkWordCount)ssc StreamingContext(sc, 5)lines ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))counts lines.flatMap(lambda line: line.split( )).map(lambda word: (word, 1)).reduceByKey(lambda a, b: ab)counts.pprint()ssc.start()ssc.awaitTermination()再在数据流终端启动Socket服务器端
[rootbigdata logfile]# nc -lk 9999然后再进入流计算终端执行如下代码启动流计算
[rootbigdata socket]# spark-submit NetworkWordCount.py localhost 9999然后在数据流终端内手动输入一行英文句子后回车多输入几次流计算终端就会不断执行词频统计并打印出信息。 2使用Socket编程实现自定义数据源
下面我们再前进一步把数据源头的产生方式修改一下不要使用nc程序而是采用自己编写的程序产生Socket数据源。在数据流终端执行以下命令编写DataSourceSocket.py文件
[rootbigdata logfile]# cd /home/zhc/mycode/sparkstreaming/socket
[rootbigdata socket]# vim DataSourceSocket.py输入如下代码
#/home/zhc/mycode/sparkstreaming/socket/DataSourceSocket.py
import socket
# 生成socket对象
server socket.socket()
# 绑定ip和端口
server.bind((localhost, 9999))
# 监听绑定的端口
server.listen(1)
while 1:# 为了方便识别打印一个“我在等待”print(Im waiting the connect...)# 这里用两个值接受因为连接上之后使用的是客户端发来请求的这个实例# 所以下面的传输要使用conn实例操作conn,addr server.accept()# 打印连接成功print(Connect success! Connection is from %s % addr[0])# 打印正在发送数据print(Sending data...)conn.send(I love hadoop I love spark hadoop is good spark is fast.encode())conn.close()print(Connection is broken.)继续在数据流终端执行如下命令启动Socket服务端
[rootbigdata socket]# spark-submit DataSourceSocket.py 再进入流计算终端执行如下代码启动流计算
[rootbigdata socket]# spark-submit NetworkWordCount.py localhost 99993RDD队列流
继续在sparkstreaming目录下新建rddqueue目录并在该目录下创建RDDQueueStream.py程序
[rootbigdata sparkstreaming]# mkdir rddqueue
[rootbigdata sparkstreaming]# cd rddqueue
[rootbigdata rddqueue]# vim RDDQueueStream.py输入如下代码
#/home/zhc/mycode/sparkstreaming/rddqueue/RDDQueueStreaming.py
import time
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ __main__:sc SparkContext(appNamePythonStreamingQueueStream)ssc StreamingContext(sc, 2)#创建一个队列通过该队列可以把RDD推给一个RDD队列流rddQueue []for i in range(5):rddQueue [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]time.sleep(1)#创建一个RDD队列流inputStream ssc.queueStream(rddQueue)mappedStream inputStream.map(lambda x: (x % 10, 1))reducedStream mappedStream.reduceByKey(lambda a, b: a b)reducedStream.pprint()ssc.start()ssc.stop(stopSparkContextTrue, stopGraceFullyTrue)保存退出后进入流计算终端再执行如下命令
[rootbigdata rddqueue]# spark-submit RDDQueueStream.py 2、利用Spark Streaming对Kafka高级数据源的数据进行处理
此过程可以参照这篇博客的第四、五部分内容:
【数据采集与预处理】数据接入工具Kafka-CSDN博客https://blog.csdn.net/Morse_Chen/article/details/135273370?spm1001.2014.3001.5501
3、完成DStream的两种有状态转换操作
说明上面的词频统计程序NetworkWordCount.py采取了无状态转换操作。
1滑动窗口转换操作
在socket目录下创建WindowedNetworkWordCount.py程序并输入如下代码
#/home/zhc/mycode/sparkstreaming/socket/WindowedNetworkWordCount.py
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ __main__:if len(sys.argv) ! 3:print(Usage: WindowedNetworkWordCount.py hostname port, filesys.stderr)exit(-1)sc SparkContext(appNamePythonStreamingWindowedNetworkWordCount)ssc StreamingContext(sc, 10)ssc.checkpoint(file:///home/zhc/mycode/sparkstreaming/socket/checkpoint)lines ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))counts lines.flatMap(lambda line: line.split( )).map(lambda word: (word, 1)).reduceByKeyAndWindow(lambda x, y: x y, lambda x, y: x - y, 30, 10)counts.pprint()ssc.start()ssc.awaitTermination()然后在数据流终端执执行如下命令运行nc程序
[rootbigdata sparkstreaming]# cd /home/zhc/mycode/sparkstreaming/socket
[rootbigdata socket]# nc -lk 9999然后再在流计算终端运行WindowedNetworkWordCount.py代码
[rootbigdata socket]# spark-submit WindowedNetworkWordCount.py localhost 9999这时可以查看流计算终端内显示的词频动态统计结果可以看到随着时间的流逝词频统计结果会发生动态变化。 2updateStateByKey操作
在“/home/zhc/mycode/sparkstreaming/”路径下新建目录“/stateful”并在该目录下新建代码文件NetworkWordCountStateful.py。
[rootbigdata sparkstreaming]# mkdir stateful
[rootbigdata sparkstreaming]# cd stateful
[rootbigdata stateful]# vim NetworkWordCountStateful.py输入如下代码
#/home/zhc/mycode/sparkstreaming/stateful/NetworkWordCountStateful.py
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ __main__:if len(sys.argv) ! 3:print(Usage: NetworkWordCountStateful.py hostname port, filesys.stderr)exit(-1)sc SparkContext(appNamePythonStreamingStatefulNetworkWordCount)ssc StreamingContext(sc, 1)ssc.checkpoint(file:///home/zhc/mycode/sparkstreaming/stateful/) # RDD with initial state (key, value) pairsinitialStateRDD sc.parallelize([(uhello, 1), (uworld, 1)]) def updateFunc(new_values, last_sum):return sum(new_values) (last_sum or 0) lines ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))running_counts lines.flatMap(lambda line: line.split( )).map(lambda word: (word, 1)).updateStateByKey(updateFunc, initialRDDinitialStateRDD) running_counts.pprint()ssc.start()ssc.awaitTermination()在“数据源终端”执行如下命令启动nc程序
[rootbigdata stateful]# nc -lk 9999在“流计算终端”执行如下命令提交运行程序
[rootbigdata stateful]# spark-submit NetworkWordCountStateful.py localhost 9999在数据源终端内手动输入一些单词并回车再切换到流计算终端可以看到已经输出了类似如下的词频统计信息 4、把DStream的数据输出保存到文本文件或MySQL数据库中
1把DStream输出到文本文件中
在stateful目录下新建NetworkWordCountStatefulText.py文件
[rootbigdata stateful]# vim NetworkWordCountStatefulText.py
输入如下代码
#/home/zhc/mycode/sparkstreaming/stateful/NetworkWordCountStatefulText.py
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ __main__:if len(sys.argv) ! 3:print(Usage: NetworkWordCountStateful.py hostname port, filesys.stderr)exit(-1)sc SparkContext(appNamePythonStreamingStatefulNetworkWordCount)ssc StreamingContext(sc, 1)ssc.checkpoint(file:///home/zhc/mycode/sparkstreaming/stateful/statefultext)# RDD with initial state (key, value) pairsinitialStateRDD sc.parallelize([(uhello, 1), (uworld, 1)])def updateFunc(new_values, last_sum):return sum(new_values) (last_sum or 0)lines ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))running_counts lines.flatMap(lambda line: line.split( )).map(lambda word: (word, 1)).updateStateByKey(updateFunc, initialRDDinitialStateRDD)running_counts.saveAsTextFiles(file:///home/zhc/mycode/sparkstreaming/stateful/statefultext/output)running_counts.pprint()ssc.start()ssc.awaitTermination()在“数据源终端”执行如下命令启动nc程序
[rootbigdata stateful]# nc -lk 9999在“流计算终端”执行如下命令提交运行程序
[rootbigdata stateful]# spark-submit NetworkWordCountStatefulText.py localhost 9999在数据源终端内手动输入一些单词并回车再切换到流计算终端可以看到已经输出了类似如下的词频统计信息 在“/home/zhc/mycode/sparkstreaming/stateful/statefultext”目录下便可查看到如下输出目录结果 进入某个目录下就可以看到类似part-00000的文件里面包含了流计算过程的输出结果。 2把DStream写入到MySQL数据库中
首先启动MySQL数据库
[rootbigdata stateful]# systemctl start mysqld.service
[rootbigdata stateful]# mysql -u root -p然后创建spark数据库和wordcount表
mysql use spark;
mysql create table wordcount (word char(20), count int(4));然后再在终端安装python连接MySQL的模块
[rootbigdata stateful]# pip3 install PyMySQL 在stateful目录并在该目录下创建NetworkWordCountStatefulDB.py文件
[rootbigdata stateful]# vim NetworkWordCountStatefulDB.py
输入如下代码
#/home/zhc/mycode/sparkstreaming/stateful/NetworkWordCountStatefulDB.py
from __future__ import print_function
import sys
import pymysql
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ __main__:if len(sys.argv) ! 3:print(Usage: NetworkWordCountStateful hostname port, filesys.stderr)exit(-1)sc SparkContext(appNamePythonStreamingStatefulNetworkWordCount)ssc StreamingContext(sc, 1)ssc.checkpoint(file:///home/zhc/mycode/sparkstreaming/stateful/statefuldb) # RDD with initial state (key, value) pairsinitialStateRDD sc.parallelize([(uhello, 1), (uworld, 1)]) def updateFunc(new_values, last_sum):return sum(new_values) (last_sum or 0) lines ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))running_counts lines.flatMap(lambda line: line.split( )).map(lambda word: (word, 1)).updateStateByKey(updateFunc, initialRDDinitialStateRDD) running_counts.pprint() def dbfunc(records):db pymysql.connect(hostlocalhost,userroot,passwordMYsql123!,databasespark)cursor db.cursor() def doinsert(p):sql insert into wordcount(word,count) values (%s, %s) % (str(p[0]), str(p[1]))try:cursor.execute(sql)db.commit()except:db.rollback()for item in records:doinsert(item) def func(rdd):repartitionedRDD rdd.repartition(3)repartitionedRDD.foreachPartition(dbfunc)running_counts.foreachRDD(func)ssc.start()ssc.awaitTermination()在“数据源终端”执行如下命令启动nc程序
[rootbigdata stateful]# nc -lk 9999在“流计算终端”执行如下命令提交运行程序
[rootbigdata stateful]# spark-submit NetworkWordCountStatefulDB.py localhost 9999在数据源终端内手动输入一些单词并回车再切换到流计算终端可以看到已经输出了类似如下的词频统计信息 到MySQL终端便可以查看wordcount表中的内容
mysql select * from wordcount;
.......
四、结果分析与实验体会 Spark Streaming是一个用于实时数据处理的流式计算框架它基于 Apache Spark 平台提供了高可靠性、高吞吐量和容错性强等特点。在进行 Spark Streaming 编程的实验中掌握了Spark Streaming的基本编程方法能够利用Spark Streaming处理来自不同数据源的数据以及DStream的各种转换操作把DStream的数据输出保存到文本文件或MySQL数据库中。 理解DStreamDStream 是 Spark Streaming 的核心概念代表连续的数据流。在编程时我们可以通过输入源比如 Kafka、Flume、HDFS创建一个 DStream 对象并对其进行转换和操作。需要注意的是DStream 是以时间片为单位组织数据的因此在编写代码时要考虑时间窗口的大小和滑动间隔。 适当设置批处理时间间隔批处理时间间隔决定了 Spark Streaming 处理数据的粒度过小的时间间隔可能导致频繁的任务调度和资源开销而过大的时间间隔则可能造成数据处理延迟。因此在实验中需要根据具体场景和需求来选择合适的时间间隔。 使用合适的转换操作Spark Streaming 提供了丰富的转换操作如 map、flatMap、filter、reduceByKey 等可以实现对数据流的转换和处理。在实验中需要根据具体业务逻辑和需求选择合适的转换操作并合理组合这些操作以获取期望的结果。 考虑容错性和数据丢失Spark Streaming 具备很好的容错性可以通过记录数据流的偏移量来保证数据不会丢失。在实验中需要注意配置合适的容错机制确保数据处理过程中的异常情况能够被恢复并尽量避免数据丢失。 优化性能和资源利用对于大规模的实时数据处理任务性能和资源利用是非常重要的。在实验中可以通过调整并行度、合理设置缓存策略、使用广播变量等手段来提高性能和资源利用效率。 总的来说Spark Streaming 是一个功能强大且易用的流式计算框架通过合理使用其提供的特性和操作可以实现各种实时数据处理需求。在实验中需要深入理解其原理和机制并根据具体需求进行合理配置和优化以获得良好的性能和结果。