当前位置: 首页 > news >正文

凡客官方网seo搜索引擎优化到底是什么

凡客官方网,seo搜索引擎优化到底是什么,怎么做文化传媒公司网站,广州最穷的三个区大数据分析与应用实验任务十一 实验目的 通过实验掌握spark Streaming相关对象的创建方法#xff1b; 熟悉spark Streaming对文件流、套接字流和RDD队列流的数据接收处理方法#xff1b; 熟悉spark Streaming的转换操作#xff0c;包括无状态和有状态转换。 熟悉spark S…大数据分析与应用实验任务十一 实验目的 通过实验掌握spark Streaming相关对象的创建方法 熟悉spark Streaming对文件流、套接字流和RDD队列流的数据接收处理方法 熟悉spark Streaming的转换操作包括无状态和有状态转换。 熟悉spark Streaming输出编程操作。 实验任务 一、DStream 操作概述 创建 StreamingContext 对象 登录 Linux 系统后启动 pyspark。进入 pyspark 以后就已经获得了一个默认的 SparkConext 对象也就是 sc。因此可以采用如下方式来创建 StreamingContext 对象 from pyspark.streaming import StreamingContext sscluozhongye StreamingContext(sc, 1)如果是编写一个独立的 Spark Streaming 程序而不是在 pyspark 中运行则需要在代码文件中通过类似如下的方式创建 StreamingContext 对象 from pyspark import SparkContext, SparkConf from pyspark.streaming import StreamingContext conf SparkConf() conf.setAppName(TestDStream) conf.setMaster(local[2]) sc SparkContext(conf conf) ssc StreamingContext(sc, 1) print(创建成功,lzy防伪)二、基本输入源 文件流 在 pyspark 中创建文件流 首先在 Linux 系统中打开第 1 个终端为了便于区分多个终端这里记作“数据源终端”创建一个 logfile 目录命令如下 cd /root/Desktop/luozhongye/ mkdir streaming cd streaming mkdir logfile其次在 Linux 系统中打开第二个终端记作“流计算终端”启动进入 pyspark然后依次输入如下语句 from pyspark import SparkContext from pyspark.streaming import StreamingContext ssc StreamingContext(sc, 10) lines ssc.textFileStream(file:///root/Desktop/luozhongye/streaming/logfile) words lines.flatMap(lambda line: line.split( )) wordCounts words.map(lambda x : (x,1)).reduceByKey(lambda a,b:ab) wordCounts.pprint() ssc.start() ssc.awaitTermination()采用独立应用程序方式创建文件流 #!/usr/bin/env python3 from pyspark import SparkContext, SparkConf from pyspark.streaming import StreamingContext conf SparkConf() conf.setAppName(TestDStream) conf.setMaster(local[2]) sc SparkContext(conf conf) ssc StreamingContext(sc, 10) lines ssc.textFileStream(file:///root/Desktop/luozhongye/streaming/logfile) words lines.flatMap(lambda line: line.split( )) wordCounts words.map(lambda x : (x,1)).reduceByKey(lambda a,b:ab) wordCounts.pprint() ssc.start() ssc.awaitTermination() print(2023年12月7日lzy)保存该文件并执行以下命令 cd /root/Desktop/luozhongye/streaming/logfile/ spark-submit FileStreaming.py套接字流 使用套接字流作为数据源 新建一个代码文件“/root/Desktop/luozhongye/streaming/socket/NetworkWordCount.py”在NetworkWordCount.py 中输入如下内容 #!/usr/bin/env python3 from __future__ import print_function import sys from pyspark import SparkContext from pyspark.streaming import StreamingContextif __name__ __main__:if len(sys.argv) ! 3:print(Usage: NetworkWordCount.py hostname port, filesys.stderr)exit(-1)sc SparkContext(appNamePythonStreamingNetworkWordCount)ssc StreamingContext(sc, 1)lines ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))counts lines.flatMap(lambda line: line.split( )).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a b)counts.pprint()ssc.start()ssc.awaitTermination()使用如下 nc 命令生成一个 Socket 服务器端 nc -lk 9999新建一个终端记作“流计算终端”执行如下代码启动流计算 cd /root/Desktop/luozhongye/streaming/socket /usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 9999使用 Socket 编程实现自定义数据源 新建一个代码文件“/root/Desktop/luozhongye/streaming/socket/DataSourceSocket.py”在 DataSourceSocket.py 中输入如下代码 #!/usr/bin/env python3 import socket# 生成 socket 对象 server socket.socket() # 绑定 ip 和端口 server.bind((localhost, 9999)) # 监听绑定的端口 server.listen(1) while 1:# 为了方便识别打印一个“I’m waiting the connect...”print(Im waiting the connect...)# 这里用两个值接收因为连接上之后使用的是客户端发来请求的这个实例# 所以下面的传输要使用 conn 实例操作conn, addr server.accept()# 打印连接成功print(Connect success! Connection is from %s % addr[0])# 打印正在发送数据print(Sending data...)conn.send(I love hadoop I love spark hadoop is good spark is fast.encode())conn.close()print(Connection is broken.) print(2023年12月7日lzy)执行如下命令启动 Socket 服务器端 cd /root/Desktop/luozhongye/streaming/socket /usr/local/spark/bin/spark-submit DataSourceSocket.py新建一个终端记作“流计算终端”输入以下命令启动 NetworkWordCount 程序 cd /root/Desktop/luozhongye/streaming/socket /usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 9999RDD 队列流 Linux 系统中打开一个终端新建一个代码文件“/root/Desktop/luozhongye/ streaming/rddqueue/ RDDQueueStream.py”输入以下代码 #!/usr/bin/env python3 import time from pyspark import SparkContext from pyspark.streaming import StreamingContextif __name__ __main__:print()sc SparkContext(appNamePythonStreamingQueueStream)ssc StreamingContext(sc, 2)# 创建一个队列通过该队列可以把 RDD 推给一个 RDD 队列流rddQueue []for i in range(5):rddQueue [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]time.sleep(1)# 创建一个 RDD 队列流inputStream ssc.queueStream(rddQueue)mappedStream inputStream.map(lambda x: (x % 10, 1))reducedStream mappedStream.reduceByKey(lambda a, b: a b)reducedStream.pprint()ssc.start()ssc.stop(stopSparkContextTrue, stopGraceFullyTrue)下面执行如下命令运行该程序 cd /root/Desktop/luozhongye/streaming/rddqueue /usr/local/spark/bin/spark-submit RDDQueueStream.py三、转换操作 滑动窗口转换操作 对“套接字流”中的代码 NetworkWordCount.py 进行一个小的修改得到新的代码文件“/root/Desktop/luozhongye/streaming/socket/WindowedNetworkWordCount.py”其内容如下 #!/usr/bin/env python3 from __future__ import print_function import sys from pyspark import SparkContext from pyspark.streaming import StreamingContextif __name__ __main__:if len(sys.argv) ! 3:print(Usage: WindowedNetworkWordCount.py hostname port, filesys.stderr)exit(-1)sc SparkContext(appNamePythonStreamingWindowedNetworkWordCount)ssc StreamingContext(sc, 10)ssc.checkpoint(file:///root/Desktop/luozhongye/streaming/socket/checkpoint)lines ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))counts lines.flatMap(lambda line: line.split( )) \.map(lambda word: (word, 1)) \.reduceByKeyAndWindow(lambda x, y: x y, lambda x, y: x - y, 30, 10)counts.pprint()ssc.start()ssc.awaitTermination()为了测试程序的运行效果首先新建一个终端记作“数据源终端”执行如下命令运行nc 程序 cd /root/Desktop/luozhongye/streaming/socket/ nc -lk 9999然后再新建一个终端记作“流计算终端”运行客户端程序 WindowedNetworkWordCount.py命令如下 cd /root/Desktop/luozhongye/streaming/socket/ /usr/local/spark/bin/spark-submit WindowedNetworkWordCount.py localhost 9999在数据源终端内连续输入 10 个“hadoop”每个 hadoop 单独占一行即每输入一个 hadoop就按回车键再连续输入 10 个“spark”每个 spark 单独占一行。这时可以查看流计算终端内显示的词频动态统计结果可以看到随着时间的流逝词频统计结果会发生动态变化。 updateStateByKey 操作 在“/root/Desktop/luozhongye/streaming/stateful/”目录下新建一个代码文件 NetworkWordCountStateful.py输入以下代码 #!/usr/bin/env python3 from __future__ import print_function import sys from pyspark import SparkContext from pyspark.streaming import StreamingContextif __name__ __main__:if len(sys.argv) ! 3:print(Usage: NetworkWordCountStateful.py hostname port, filesys.stderr)exit(-1)sc SparkContext(appNamePythonStreamingStatefulNetworkWordCount)ssc StreamingContext(sc, 1)ssc.checkpoint(file:///root/Desktop/luozhongye/streaming/stateful/)# RDD with initial state (key, value) pairsinitialStateRDD sc.parallelize([(uhello, 1), (uworld, 1)])def updateFunc(new_values, last_sum):return sum(new_values) (last_sum or 0)lines ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))running_counts lines.flatMap(lambda line: line.split( )) \.map(lambda word: (word, 1)) \.updateStateByKey(updateFunc, initialRDDinitialStateRDD)running_counts.pprint()ssc.start()ssc.awaitTermination()新建一个终端记作“数据源终端”执行如下命令启动 nc 程序 nc -lk 9999新建一个 Linux 终端记作“流计算终端”执行如下命令提交运行程序 cd /root/Desktop/luozhongye/streaming/stateful /usr/local/spark/bin/spark-submit NetworkWordCountStateful.py localhost 9999四、把 DStream 输出到文本文件中 下面对之前已经得到的“/root/Desktop/luozhongye/streaming/stateful/NetworkWordCountStateful.py”代码进行简单的修改把生成的词频统计结果写入文本文件中。 修改后得到的新代码文件“/root/Desktop/luozhongye/streaming/stateful/NetworkWordCountStatefulText.py”的内容如下 #!/usr/bin/env python3 from __future__ import print_function import sys from pyspark import SparkContext from pyspark.streaming import StreamingContextif __name__ __main__:if len(sys.argv) ! 3:print(Usage: NetworkWordCountStateful.py hostname port, filesys.stderr)exit(-1)sc SparkContext(appNamePythonStreamingStatefulNetworkWordCount)ssc StreamingContext(sc, 1)ssc.checkpoint(file:///root/Desktop/luozhongye/streaming/stateful/)# RDD with initial state (key, value) pairs initialStateRDD sc.parallelize([(uhello, 1), (uworld, 1)])def updateFunc(new_values, last_sum):return sum(new_values) (last_sum or 0)lines ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))running_counts lines.flatMap(lambda line: line.split( )) \.map(lambda word: (word, 1)) \.updateStateByKey(updateFunc, initialRDDinitialStateRDD)running_counts.saveAsTextFiles(file:///root/Desktop/luozhongye/streaming/stateful/output)running_counts.pprint()ssc.start()ssc.awaitTermination()新建一个终端记作“数据源终端”执行如下命令运行nc 程序 cd /root/Desktop/luozhongye/streaming/socket/ nc -lk 9999新建一个 Linux 终端记作“流计算终端”执行如下命令提交运行程序 cd /root/Desktop/luozhongye/streaming/stateful /usr/local/spark/bin/spark-submit NetworkWordCountStatefulText.py localhost 9999实验心得 通过本次实验我深入理解了Spark Streaming包括创建StreamingContext、DStream等对象。同时我了解了Spark Streaming对不同类型数据流的处理方式如文件流、套接字流和RDD队列流。此外我还熟悉了Spark Streaming的转换操作和输出编程操作并掌握了map、flatMap、filter等方法。最后我能够自定义输出方式和格式。总之这次实验让我全面了解了Spark Streaming对未来的学习和工作有很大的帮助。
http://www.hkea.cn/news/14571661/

相关文章:

  • 网站ping怎么做外贸cms什么意思
  • 网站开发项目对自身的意义农产品销售网站建设方案
  • 本地搭建网站wordpress提醒美化
  • 四川省建设安全协会网站网站内容建设平面设计
  • 凯里市经济开发区建设局网站深圳it培训
  • 百度开户做网站2400大专毕业证怎么弄一个
  • 小程序 网站 开发做债的网站
  • 网站开发语言那个好沂源网站设计
  • 网站开发业务怎么做公众号开发网站公司
  • 给城市建设提议献策的网站广告外链购买交易平台
  • 自己能注册网站吗小公司怎么做免费网站
  • 网站前后台代码平面设计素材免费网站有哪些
  • 千博企业网站管理系统完整版 2014在阿里国际站做的网站
  • 企业网站营销优缺点做网站先用dw还是asp
  • 网络推广的网站2008 wordpress
  • 深圳公司建设网站制作网络公司经营范围包括哪些
  • 最新仿5173游戏装备交易网站 游戏币交易平台源码整合支付接口网站开发软件技术开发公司
  • lamp网站开发黄金组合下载移动端网站制作的有哪些要求
  • 网站常见 8搜索引擎优化包括( )方面的优化
  • wordpress图片分享插件下载杭州排名优化软件
  • 宁波建网站公司哪家hao台州知名网站
  • 有那些做任务的网站wordpress uploads 权限
  • 国外做网站的公司商务网站建设的基本步骤
  • 网站备案一般多久懂装修公司怎么样
  • 深圳企业网站开发费用公司公司网站建设公司
  • 上线了自助建站怎么查名字有没有被注册商标
  • wordpress自带的404资阳seo快速排名
  • 怎么建做网站免费网站建设行情
  • 浙江信息港德州乐陵德州seo公司
  • 海外公司网站 国内做备案网站建设与运营市场风险