上饶专业的企业网站开发公司,番禺网站建设三杰科技,做织梦网站时图片路径显示错误,网站ico如何添加数据输入
RDD对象
如图可见#xff0c;PySpark支持多种数据的输入#xff0c;在输入完成后#xff0c;都会得到一个#xff1a;RDD类的对象
RDD全称为#xff1a;弹性分布式数据集#xff08;Resilient Distributed Datasets#xff09;
PySpark针对数据的处理…数据输入
RDD对象
如图可见PySpark支持多种数据的输入在输入完成后都会得到一个RDD类的对象
RDD全称为弹性分布式数据集Resilient Distributed Datasets
PySpark针对数据的处理都是以RDD对象作为载体即
数据存储在RDD内各类数据的计算方法也都是RDD的成员方法RDD的数据计算方法返回值依旧是RDD对象 PySpark的编程模型上图可以归纳为
准备数据到RDD - RDD迭代计算 - RDD导出为list、文本文件等即源数据 - RDD - 结果数据
Python数据容器转RDD对象
PySpark支持通过SparkContext对象的parallelize成员方法将
listtuplesetdictstr
转换为PySpark的RDD对象
注意
字符串会被拆分出1个个的字符存入RDD对象字典仅有key会被存入RDD对象
读取文件转RDD对象
PySpark也支持通过SparkContext入口对象来读取文件来构建出RDD对象。 总结
1. RDD对象是什么为什么要使用它
RDD对象称之为分布式弹性数据集是PySpark中数据计算的载体它可以
提供数据存储提供数据计算的各类方法数据计算的方法返回值依旧是RDDRDD迭代计算
后续对数据进行各类计算都是基于RDD对象进行
2. 如何输入数据到Spark即得到RDD对象
通过SparkContext的parallelize成员方法将Python数据容器转换为RDD对象通过SparkContext的textFile成员方法读取文本文件得到RDD对象
数据计算
map方法
PySpark的数据计算都是基于RDD对象来进行的那么如何进行呢
自然是依赖RDD对象内置丰富的成员方法算子 语法 演示PySpark代码加载数据即数据输入from pyspark import SparkConf, SparkContextconf SparkConf().setMaster(local[*]).setAppName(test_spark)
sc SparkContext(confconf)# # 通过parallelize方法将Python对象加载到Spark内成为RDD对象
rdd1 sc.parallelize([1, 2, 3, 4, 5])
rdd2 sc.parallelize((1, 2, 3, 4, 5))
rdd3 sc.parallelize(abcdefg)
rdd4 sc.parallelize({1, 2, 3, 4, 5})
rdd5 sc.parallelize({key1: value1, key2: value2, key3: value3})# # 如果要查看RDD里边有什么内容需要用collect方法
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())
# 用过textFile方法读取文件数据加载到Spark内成为RDD对象
rdd sc.textFile(E:/百度网盘/1、Python快速入门8天零基础入门到精通/资料/第15章资料/资料/hello.txt)
print(rdd.collect())
sc.stop() 总结 1. map算子成员方法
接受一个处理函数可用lambda表达式快速编写对RDD内的元素逐个处理并返回一个新的RDD
2. 链式调用
对于返回值是新RDD的算子可以通过链式调用的方式多次调用算子。
flatMap方法 总结
flatMap算子
计算逻辑和map一样可以比map多出解除一层嵌套的功能
reduceByKey方法 PySpark代码加载数据reduceByKey方法
针对KV型 RDD
自动按照key分组,然后根据你提供的聚合逻辑完成组内数(value)的聚合操作.二元元祖
from pyspark import SparkConf, SparkContext
# 配置Python解释器
import os
os.environ[PYSPARK_PYTHON] D:/Python/Python311/python.execonf SparkConf().setMaster(local[*]).setAppName(test_spark)
sc SparkContext(confconf)rdd sc.parallelize([(男, 99), (女, 88),(女,99), (男,77), (男, 55)])
# 需求求男生和女生俩个组的成绩之和
rdd2 rdd.reduceByKey(lambda a, b: a b)
print(rdd2.collect())总结
reduceByKey算子
接受一个处理函数对数据进行两两计算 练习案例1
WordCount案例
使用学习到的内容完成
读取文件统计文件内单词的出现数量 hello.txt itheima itheima itcast itheima spark python spark python itheima itheima itcast itcast itheima python python python spark pyspark pyspark itheima python pyspark itcast spark 完成练习案例单词计数统计# 1.构建执行环境入口对象
from pyspark import SparkConf, SparkContext
import os
os.environ[PYSPARK_PYTHON] D:/Python/Python311/python.exe
conf SparkConf().setMaster(local[*]).setAppName(test_spark)
sc SparkContext(confconf)
# 2.读取数据
rdd sc.textFile(E:/百度网盘/1、Python快速入门8天零基础入门到精通/资料/第15章资料/资料/hello.txt)
# 3.取出全部单词
wor_rdd rdd.flatMap(lambda a: a.split( ))
# print(wor_rdd.collect())
# 4.将所有单词都转换成二元元组单词为keyValue设置为1
word_with_one_rdd wor_rdd.map(lambda word: (word, 1))
# print(word_with_one_rdd.collect())
# 5.分组并求和
result word_with_one_rdd.reduceByKey(lambda a, b: a b)
# 6.打印输出结果
print(result.collect())结果
[(python, 6), (itheima, 7), (itcast, 4), (spark, 4), (pyspark, 3)]filter方法
功能过滤想要的数据进行保留 PySpark代码加载数据Filter方法from pyspark import SparkConf, SparkContext
# 配置Python解释器
import osos.environ[PYSPARK_PYTHON] D:/Python/Python311/python.exe
conf SparkConf().setMaster(local[*]).setAppName(test_spark)
sc SparkContext(confconf)# 准备一个RDD
rdd sc.parallelize([1, 2, 3, 4, 5, 6, 7])
# 对RDD的数据进行过滤
rdd2 rdd.filter(lambda num: num % 2 0) # 整数返回true 奇数返回falseprint(rdd2.collect())结果
[2, 4, 6] 总结
filter算子
接受一个处理函数可用lambda快速编写函数对RDD数据逐个处理得到True的保留至返回值的RDD中
distinct方法
功能对RDD数据进行去重返回新的RDD
语法 rdd.distinct 无需传参 PySpark代码加载数据distinct方法
去重 无需传参from pyspark import SparkConf, SparkContext
# 配置Python解释器
import osos.environ[PYSPARK_PYTHON] D:/Python/Python311/python.exe
conf SparkConf().setMaster(local[*]).setAppName(test_spark)
sc SparkContext(confconf)# 准备一个RDD
rdd sc.parallelize([1, 1, 2, 3, 3, 3, 5, 6, 7, 7, 7, 7, 7])
# 对RDD的数据进行去重
rdd2 rdd.distinct()
print(rdd2.collect())结果
[1, 2, 3, 5, 6, 7]总结 distinct算子
完成对RDD内数据的去重操作
sortBy方法
功能对RDD数据进行排序基于你指定的排序依据 PySpark代码加载数据sortBy方法
排序
语法:
rdd.sortBy(func,ascendingFalse, numPartitions1)
# func:(T)U:告知按照rdd中的哪个数据进行排序
比如lambda x:x[1]表示按照rdd中的第二列元素进行排序
# ascending True升序 False降序
# numPartitions:用多少分区排序# 1.构建执行环境入口对象
from pyspark import SparkConf, SparkContext
import os
os.environ[PYSPARK_PYTHON] D:/Python/Python311/python.exe
conf SparkConf().setMaster(local[*]).setAppName(test_spark)
sc SparkContext(confconf)
# 2.读取数据
rdd sc.textFile(E:/百度网盘/1、Python快速入门8天零基础入门到精通/资料/第15章资料/资料/hello.txt)
# 3.取出全部单词
wor_rdd rdd.flatMap(lambda a: a.split( ))
# 4.将所有单词都转换成二元元组单词为keyValue设置为1
word_with_one_rdd wor_rdd.map(lambda word: (word, 1))
# 5.分组并求和
result word_with_one_rdd.reduceByKey(lambda a, b: a b)
# 6.打印输出结果
print(result.collect())
# 7.对结果进行排序
a result.sortBy(lambda x: x[1], ascendingFalse, numPartitions1) # 降序
print(a.collect())b result.sortBy(lambda x: x[1], ascendingTrue, numPartitions1) # 升序
print(b.collect())结果
[(python, 6), (itheima, 7), (itcast, 4), (spark, 4), (pyspark, 3)]
[(itheima, 7), (python, 6), (itcast, 4), (spark, 4), (pyspark, 3)]
[(pyspark, 3), (itcast, 4), (spark, 4), (python, 6), (itheima, 7)]总结
sortBy算子
接收一个处理函数可用lambda快速编写函数表示用来决定排序的依据可以控制升序或降序全局排序需要设置分区数为1
练习案例2
案例 {id:1,timestamp:2019-05-08T01:03.00Z,category:平板电脑,areaName:北京,money:1450}|{id:2,timestamp:2019-05-08T01:01.00Z,category:手机,areaName:北京,money:1450}|{id:3,timestamp:2019-05-08T01:03.00Z,category:手机,areaName:北京,money:8412} {id:4,timestamp:2019-05-08T05:01.00Z,category:电脑,areaName:上海,money:1513}|{id:5,timestamp:2019-05-08T01:03.00Z,category:家电,areaName:北京,money:1550}|{id:6,timestamp:2019-05-08T01:01.00Z,category:电脑,areaName:杭州,money:1550} {id:7,timestamp:2019-05-08T01:03.00Z,category:电脑,areaName:北京,money:5611}|{id:8,timestamp:2019-05-08T03:01.00Z,category:家电,areaName:北京,money:4410}|{id:9,timestamp:2019-05-08T01:03.00Z,category:家具,areaName:郑州,money:1120} {id:10,timestamp:2019-05-08T01:01.00Z,category:家具,areaName:北京,money:6661}|{id:11,timestamp:2019-05-08T05:03.00Z,category:家具,areaName:杭州,money:1230}|{id:12,timestamp:2019-05-08T01:01.00Z,category:书籍,areaName:北京,money:5550} {id:13,timestamp:2019-05-08T01:03.00Z,category:书籍,areaName:北京,money:5550}|{id:14,timestamp:2019-05-08T01:01.00Z,category:电脑,areaName:北京,money:1261}|{id:15,timestamp:2019-05-08T03:03.00Z,category:电脑,areaName:杭州,money:6660} {id:16,timestamp:2019-05-08T01:01.00Z,category:电脑,areaName:天津,money:6660}|{id:17,timestamp:2019-05-08T01:03.00Z,category:书籍,areaName:北京,money:9000}|{id:18,timestamp:2019-05-08T05:01.00Z,category:书籍,areaName:北京,money:1230} {id:19,timestamp:2019-05-08T01:03.00Z,category:电脑,areaName:杭州,money:5551}|{id:20,timestamp:2019-05-08T01:01.00Z,category:电脑,areaName:北京,money:2450} {id:21,timestamp:2019-05-08T01:03.00Z,category:食品,areaName:北京,money:5520}|{id:22,timestamp:2019-05-08T01:01.00Z,category:食品,areaName:北京,money:6650} {id:23,timestamp:2019-05-08T01:03.00Z,category:服饰,areaName:杭州,money:1240}|{id:24,timestamp:2019-05-08T01:01.00Z,category:食品,areaName:天津,money:5600} {id:25,timestamp:2019-05-08T01:03.00Z,category:食品,areaName:北京,money:7801}|{id:26,timestamp:2019-05-08T01:01.00Z,category:服饰,areaName:北京,money:9000} {id:27,timestamp:2019-05-08T01:03.00Z,category:服饰,areaName:杭州,money:5600}|{id:28,timestamp:2019-05-08T01:01.00Z,category:食品,areaName:北京,money:8000}|{id:29,timestamp:2019-05-08T02:03.00Z,category:服饰,areaName:杭州,money:7000} 需求复制以上内容到文件中使用Spark读取文件进行计算
各个城市销售额排名从大到小全部城市有哪些商品类别在售卖北京市有哪些商品类别在售卖 使用Spark读取文件进行计算
各个城市销售额排名从大到小
全部城市有哪些商品类别在售卖
北京市有哪些商品类别在售卖import json
from pyspark import SparkConf, SparkContext
import os
os.environ[PYSPARK_PYTHON] D:/Python/Python311/python.exe
conf SparkConf().setMaster(local[*]).setAppName(test_spark)
sc SparkContext(confconf)# TOD0 需求1城市销售额排名
# 1.1 读取文件得到RDD
rdd sc.textFile(E:/百度网盘/1、Python快速入门8天零基础入门到精通/资料/第15章资料/资料/orders.txt)
# print(rdd.collect())# 1.2取出一个个JSON字符串
json_str rdd.flatMap(lambda a: a.split(|))
# print(json_str.collect())# 1.3将一个个JSON字符串转换为字典
my_dict json_str.map(lambda x: json.loads(x))
# print(my_dict.collect())# 1.4 取出城市和销售额数据
# 城市销售额
city_with_money_rdd my_dict.map(lambda x: (x[areaName], int(x[money])))
# print(city_with_money_rdd.collect())# 1.5 按城市分组按销售额聚合
city_result city_with_money_rdd.reduceByKey(lambda a, b: a b)
# print(city_result.collect())# 1.6 按销售额聚合结果进行排序
sorting city_result.sortBy(lambda a: a[1], ascendingFalse, numPartitions1)
print(f需求1的结果是{sorting.collect()})# TODD 需求2全部城市有哪些商品类别在售卖
city_with_category_rdd my_dict.map(lambda x: (x[category])).distinct()
print(f需求2的结果是{city_with_category_rdd.collect()})# TODD 需求3北京市有哪些商品类别在售卖
# 3.1 过滤北京是市的数据
bj_dict my_dict.filter(lambda a: a[areaName] 北京)
# print(bj_dict.collect())# # 3.2 取出全部商品列表
# bj_category bj_dict.map(lambda a: a[category])
# print(bj_category.collect())# # 3.3 进行商品类别去重
# bj_category_distinct bj_category.distinct()
# print(f北京市售卖的商品有{bj_category_distinct.collect()})# 3.2 取出全部商品列表 进行商品类别去重
bj_category bj_dict.map(lambda a: a[category]).distinct()
print(f北京市售卖的商品有{bj_category.collect()})结果
需求1的结果是[(北京, 91556), (杭州, 28831), (天津, 12260), (上海, 1513), (郑州, 1120)]需求2的结果是[电脑, 家电, 食品, 平板电脑, 手机, 家具, 书籍, 服饰]北京市售卖的商品有[家电, 电脑, 食品, 平板电脑, 手机, 家具, 书籍, 服饰]
数据输出
输出为Python对象
数据输入
sc.parallelizesc.textFile
数据计算
rdd.maprdd.flatMaprdd.reduceByKey... collect算子
功能将rdd各个分区内的数据统一收集到Driver中形成一个List对象
用法
rdd.collect
返回一个List
reduce算子
功能对RDD数据集按照你传入的逻辑进行聚合 返回值等同于计算函数的返回值 take算子 功能取RDD的前N个元素组合成List返回给你
用法
sc.parallelize([1, 2, 65, 5, 8, 841, 2, 48, 12, 21, 48]).take(6)结果:
[1, 2, 65, 5, 8, 841] count算子
功能计算RDD有多少条数据返回值是一个数字
总结
1. Spark的编程流程就是
将数据加载为RDD数据输入对RDD进行计算数据计算将RDD转换为Python对象数据输出
2. 数据输出的方法
collect将RDD内容转换为listreduce对RDD内容进行自定义聚合take取出RDD的前N个元素组成listcount统计RDD元素个数
数据输出可用的方法是很多的简单的介绍了4个。
输出到文件中
saveAsTextFile算子
功能将RDD的数据写入文本文件中
支持 本地写成hdfs等文件系统
代码 rdd sc.parallelize([1, 2, 3, 4, 5, 6]) rdd.saveAsTextFile(D:/output) 注意事项
调用保存文件的算子需要配置Hadoop依赖
下载Hadoop安装包
解压到电脑任意位置在Python代码中使用os模块配置os.environ[‘HADOOP_HOME’] ‘HADOOP解压文件夹路径’下载winutils.exe并放入Hadoop解压文件夹的bin目录内下载hadoop.dll并放入:C:/Windows/System32 文件夹内 修改rdd分区为1个
方式1SparkConf对象设置属性全局并行度为1
conf SparkConf().setMaster(local[*]).setAppName(test_spark)
# 设置spark全局并行度为1
conf.set(spark.default.parallelism, 1)sc SparkContext(confconf)
方式2创建RDD的时候设置parallelize方法传入numSlices参数为1
rdd1 sc.parallelize([1, 2, 3, 4, 5, 6], numSlices1) # 设置分区为1rdd1 sc.parallelize([1, 2, 3, 4, 5, 6], 1) # 设置分区为1
总结
1. RDD输出到文件的方法
rdd.saveAsTextFile(路径)输出的结果是一个文件夹有几个分区就输出多少个结果文件
2. 如何修改RDD分区
SparkConf对象设置conf.set(spark.default.parallelism, 1)创建RDD的时候sc.parallelize方法传入numSlices参数为1