亚马逊网站 如何做站内seo,网站建设官网免费模板,中山做网站哪家公司好,网站模块是指什么地方文章最前#xff1a; 我是Octopus#xff0c;这个名字来源于我的中文名--章鱼#xff1b;我热爱编程、热爱算法、热爱开源。所有源码在我的个人github #xff1b;这博客是记录我学习的点点滴滴#xff0c;如果您对 Python、Java、AI、算法有兴趣#xff0c;可以关注我的… 文章最前 我是Octopus这个名字来源于我的中文名--章鱼我热爱编程、热爱算法、热爱开源。所有源码在我的个人github 这博客是记录我学习的点点滴滴如果您对 Python、Java、AI、算法有兴趣可以关注我的动态一起学习共同进步。 这篇文章旨在帮你写出健壮的pyspark 代码。
在这里通过它写pyspark单元测试看这个代码通过PySpark built下载该目录代码查看JIRA 看板票的pyspark测试 创建PySpark应用
这边一个例子是怎么创建pyspark应用如果你的应用已经测试你可以跳过这一段测试你的pyspark程序。
现在开始测试你的spark session
from pyspark.sql import SparkSession
from pyspark.sql.functions import col# Create a SparkSession
spark SparkSession.builder.appName(Testing PySpark Example).getOrCreate()
接下来创建一个DataFrame
sample_data [{name: John D., age: 30},{name: Alice G., age: 25},{name: Bob T., age: 35},{name: Eve A., age: 28}]df spark.createDataFrame(sample_data)
现在我们对我们的DataFrame来定义转换算子
from pyspark.sql.functions import col, regexp_replace# Remove additional spaces in name
def remove_extra_spaces(df, column_name):# Remove extra spaces from the specified columndf_transformed df.withColumn(column_name, regexp_replace(col(column_name), \\s, ))return df_transformedtransformed_df remove_extra_spaces(df, name)transformed_df.show()
-----------
|age| name|
-----------
| 30| John D.|
| 25|Alice G.|
| 35| Bob T.|
| 28| Eve A.|
-----------
测试你的pyspark应用
现在来测试你的pyspark转换算子。一个选择简化DataFrame测试结果可以简化数据或者输入数据。更好的方式写测试例子这里有一些例子怎么去测试我们的代码这些代码是基于spark 3.5以下版本。对于这些例子做笔记是非常值得的可以通过测试框架不管你是使用unittest or pytest built-in PySpark 测试是单机的意味着他兼容测试框架和CI测试
选项1 仅仅使用PySpark Built-in 测试方法
import pyspark.testing
from pyspark.testing.utils import assertDataFrameEqual# Example 1
df1 spark.createDataFrame(data[(1, 1000), (2, 3000)], schema[id, amount])
df2 spark.createDataFrame(data[(1, 1000), (2, 3000)], schema[id, amount])
assertDataFrameEqual(df1, df2) # pass, DataFrames are identical
# Example 2
df1 spark.createDataFrame(data[(1, 0.1), (2, 3.23)], schema[id, amount])
df2 spark.createDataFrame(data[(1, 0.109), (2, 3.23)], schema[id, amount])
assertDataFrameEqual(df1, df2, rtol1e-1) # pass, DataFrames are approx equal by rtol 您还可以简单地比较两个 DataFrame 模式
from pyspark.testing.utils import assertSchemaEqual
from pyspark.sql.types import StructType, StructField, ArrayType, DoubleTypes1 StructType([StructField(names, ArrayType(DoubleType(), True), True)])
s2 StructType([StructField(names, ArrayType(DoubleType(), True), True)])assertSchemaEqual(s1, s2) # pass, schemas are identical
选项 2使用单元测试
对于更复杂的测试场景您可能需要使用测试框架。
最流行的测试框架选项之一是单元测试。让我们逐步了解如何使用内置 Pythonunittest库来编写 PySpark 测试。有关该unittest库的更多信息请参阅此处 https: //docs.python.org/3/library/unittest.html。
首先您需要一个 Spark 会话。您可以使用包classmethod中的装饰器unittest来负责设置和拆除 Spark 会话。
import unittestclass PySparkTestCase(unittest.TestCase):classmethoddef setUpClass(cls):cls.spark SparkSession.builder.appName(Testing PySpark Example).getOrCreate()classmethoddef tearDownClass(cls):cls.spark.stop() 现在我们来写一个unittest类。
from pyspark.testing.utils import assertDataFrameEqualclass TestTranformation(PySparkTestCase):def test_single_space(self):sample_data [{name: John D., age: 30},{name: Alice G., age: 25},{name: Bob T., age: 35},{name: Eve A., age: 28}]# Create a Spark DataFrameoriginal_df spark.createDataFrame(sample_data)# Apply the transformation function from beforetransformed_df remove_extra_spaces(original_df, name)expected_data [{name: John D., age: 30},{name: Alice G., age: 25},{name: Bob T., age: 35},{name: Eve A., age: 28}]expected_df spark.createDataFrame(expected_data)assertDataFrameEqual(transformed_df, expected_df) 运行时unittest将选取名称以“test”开头的所有函数。 选项 3使用Pytest pytest我们还可以使用最流行的 Python 测试框架之一来编写测试。有关 的更多信息pytest请参阅此处的文档 https: //docs.pytest.org/en/7.1.x/contents.html。 使用pytest固定装置允许我们在测试之间共享 Spark 会话并在测试完成时将其拆除。 import pytestpytest.fixture
def spark_fixture():spark SparkSession.builder.appName(Testing PySpark Example).getOrCreate()yield spark 然后我们可以这样定义我们的测试 import pytest
from pyspark.testing.utils import assertDataFrameEqualdef test_single_space(spark_fixture):sample_data [{name: John D., age: 30},{name: Alice G., age: 25},{name: Bob T., age: 35},{name: Eve A., age: 28}]# Create a Spark DataFrameoriginal_df spark.createDataFrame(sample_data)# Apply the transformation function from beforetransformed_df remove_extra_spaces(original_df, name)expected_data [{name: John D., age: 30},{name: Alice G., age: 25},{name: Bob T., age: 35},{name: Eve A., age: 28}]expected_df spark.createDataFrame(expected_data)assertDataFrameEqual(transformed_df, expected_df)当您使用该pytest命令运行测试文件时它将选取名称以“test”开头的所有函数。 把它们放在一起 让我们在单元测试示例中一起查看所有步骤。 # pkg/etl.py
import unittestfrom pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import regexp_replace
from pyspark.testing.utils import assertDataFrameEqual# Create a SparkSession
spark SparkSession.builder.appName(Sample PySpark ETL).getOrCreate()sample_data [{name: John D., age: 30},{name: Alice G., age: 25},{name: Bob T., age: 35},{name: Eve A., age: 28}]df spark.createDataFrame(sample_data)# Define DataFrame transformation function
def remove_extra_spaces(df, column_name):# Remove extra spaces from the specified column using regexp_replacedf_transformed df.withColumn(column_name, regexp_replace(col(column_name), \\s, ))return df_transformed # pkg/test_etl.py
import unittestfrom pyspark.sql import SparkSession# Define unit test base class
class PySparkTestCase(unittest.TestCase):classmethoddef setUpClass(cls):cls.spark SparkSession.builder.appName(Sample PySpark ETL).getOrCreate()classmethoddef tearDownClass(cls):cls.spark.stop()# Define unit test
class TestTranformation(PySparkTestCase):def test_single_space(self):sample_data [{name: John D., age: 30},{name: Alice G., age: 25},{name: Bob T., age: 35},{name: Eve A., age: 28}]# Create a Spark DataFrameoriginal_df spark.createDataFrame(sample_data)# Apply the transformation function from beforetransformed_df remove_extra_spaces(original_df, name)expected_data [{name: John D., age: 30},{name: Alice G., age: 25},{name: Bob T., age: 35},{name: Eve A., age: 28}]expected_df spark.createDataFrame(expected_data)assertDataFrameEqual(transformed_df, expected_df) unittest.main(argv[], verbosity0, exitFalse)在 1.734 秒内完成 1 次测试unittest.main.TestProgram 位于 0x174539db0