当前位置: 首页 > news >正文

济南手机网站建设公司排名wordpress怎么安装到阿里云

济南手机网站建设公司排名,wordpress怎么安装到阿里云,南京网站推广排名,网站建设综合实训心得体会大纲 标量函数入参并非表中一行#xff08;Row#xff09;入参是表中一行#xff08;Row#xff09;alias PyFlink中关于用户定义方法有#xff1a; UDF#xff1a;用户自定义函数。UDTF#xff1a;用户自定义表值函数。UDAF#xff1a;用户自定义聚合函数。UDTAF… 大纲 标量函数入参并非表中一行Row入参是表中一行Rowalias PyFlink中关于用户定义方法有 UDF用户自定义函数。UDTF用户自定义表值函数。UDAF用户自定义聚合函数。UDTAF用户自定义表值聚合函数。 这些字母可以拆解如下 UD表示User Defined用户自定义F表示Function方法;T表示Table表;A表示Aggregate聚合 Aggregate聚合函数是指以多行数据为输入计算出一个新的值的函数。这块我们会在后续的章节介绍本文我们主要介绍非聚合类型的用户自定义方法的简单使用。 标量函数 即我们常见的UDF。 def udf(f: Union[Callable, ScalarFunction, Type] None,input_types: Union[List[DataType], DataType, str, List[str]] None,result_type: Union[DataType, str] None,deterministic: bool None, name: str None, func_type: str general,udf_type: str None) - Union[UserDefinedScalarFunctionWrapper, Callable]:我们主要关注result_type和input_types它们分别用于确定函数的输入和输出。 input_types可以是List[DataType], DataType, str, List[str]之一任何一种这个要视使用者决定。UDTF也是这种类型它们没啥区别。 result_type只能是DataType或str而UDTF可以是List[DataType], DataType, str, List[str]任意之一。这也是UDF和UDTF最大的区别。 我们以一个例子来介绍它的用法。这个例子会将大写字符转换成小写字符然后统计字符出现的次数。 在介绍例子之前我们先构造Execute之前的准备环境 from pyflink.common import Configuration from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema) from pyflink.table.types import DataTypes from pyflink.table.table_descriptor import TableDescriptor from pyflink.table.expressions import lit, col from pyflink.common import Row from pyflink.table.udf import udf,udtf,udaf,udtaf import pandas as pd from pyflink.table.udf import UserDefinedFunctionword_count_data [A, B, C, D, E, F, G, H, I, J, A, G] def word_count():config Configuration()# write all the data to one fileconfig.set_string(parallelism.default, 1)env_settings EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env TableEnvironment.create(env_settings)row_type_tab_source DataTypes.ROW([DataTypes.FIELD(word, DataTypes.STRING())])tab_source t_env.from_elements(map(lambda i: Row(i), word_count_data), row_type_tab_source)# define the sink schemasink_schema Schema.new_builder() \.column(word, DataTypes.STRING().not_null()) \.column(count, DataTypes.BIGINT()) \.primary_key(word) \.build()# Create a sink descriptorsink_descriptor TableDescriptor.for_connector(print)\.schema(sink_schema) \.build()t_env.create_temporary_table(WordsCountTableSink, sink_descriptor)这段代码从读取数据word_count_data并构造出tab_source作为输入数据暂存的表。下面我们看下入参不同时UDF怎么写 入参并非表中一行Row udf(result_typeDataTypes.ROW([DataTypes.FIELD(lower_word, DataTypes.STRING())]), input_types[DataTypes.STRING()])def colFunc(oneCol):return Row(oneCol.lower())input_types我们设置成[DataTypes.STRING()]即该数组中只有一个参数也表示修饰的方法只有一个参数类型是String。如果觉得input_types写起来麻烦这个参数可以不设置。 result_type我们设置为一个DataTypes.ROW([DataTypes.FIELD(“lower_word”, DataTypes.STRING())])。我们可以把它看成是一个新表的结构描述即一行只有一个字段——lower_word它的类型也是String。 tab_lowertab_source.map(colFunc(col(word)))map方法中我们会给UDF修饰的方法传入原始表tab_source每行中的word字段的值。然后构造出一个新的表tab_lower。这个新的表没有word字段只有UDF中result_type定义的lower_word。 def map(self, func: Union[Expression, UserDefinedScalarFunctionWrapper]) - Table:后续只要使用这个新表新字段即可。 tab_lower.group_by(col(lower_word)) \.select(col(lower_word), lit(1).count) \.execute_insert(WordsCountTableSink) \.wait()完整代码 from pyflink.common import Configuration from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema) from pyflink.table.types import DataTypes from pyflink.table.table_descriptor import TableDescriptor from pyflink.table.expressions import lit, col from pyflink.common import Row from pyflink.table.udf import udf,udtf,udaf,udtaf import pandas as pd from pyflink.table.udf import UserDefinedFunctionword_count_data [A, B, C, D, E, F, G, H, I, J, A, G] def word_count():config Configuration()# write all the data to one fileconfig.set_string(parallelism.default, 1)env_settings EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env TableEnvironment.create(env_settings)row_type_tab_source DataTypes.ROW([DataTypes.FIELD(word, DataTypes.STRING())])tab_source t_env.from_elements(map(lambda i: Row(i), word_count_data), row_type_tab_source )# define the sink schemasink_schema Schema.new_builder() \.column(word, DataTypes.STRING().not_null()) \.column(count, DataTypes.BIGINT()) \.primary_key(word) \.build()# Create a sink descriptorsink_descriptor TableDescriptor.for_connector(print)\.schema(sink_schema) \.build()t_env.create_temporary_table(WordsCountTableSink, sink_descriptor)udf(result_typeDataTypes.ROW([DataTypes.FIELD(lower_word, DataTypes.STRING())]), input_types[DataTypes.STRING()])def colFunc(oneCol):return Row(oneCol.lower())tab_lowertab_source.map(colFunc(col(word))) tab_lower.group_by(col(lower_word)) \.select(col(lower_word), lit(1).count) \.execute_insert(WordsCountTableSink) \.wait()if __name__ __main__:word_count()入参是表中一行Row udf(result_typeDataTypes.ROW([DataTypes.FIELD(lower_word, DataTypes.STRING())]), input_typesrow_type_tab_source)def rowFunc(row):return Row(row[0].lower())tab_lowertab_source.map(rowFunc) tab_lower.group_by(col(lower_word)) \.select(col(lower_word), lit(1).count) \.execute_insert(WordsCountTableSink) \.wait()主要的区别是map方法直接传递udf修饰的方法而不是直接其调用返回值。input_types是原始表的行结构——RowType而不是一个参数数组。 map方法给rowFunc传递原始表tab_source的每行数据然后构造出一个新表tab_lower。新表的字段也在udf的result_type中定义了它是String类型的lower_word。后面我们对新表就要聚合统计这个新的字段而不是老表中的字段。 alias 前面两个案例在定义UDF时我们严格设置了result_type和input_types。实际input_types可以不用设置但是result_type必须设置。上面例子中result_type我们都设置为RowType即表行的结构。如果觉得这样写很麻烦可以考虑使用alias来实现。 udf(result_typeDataTypes.STRING())def colFunc(oneCol):return oneCol.lower()tab_lowertab_source.map(colFunc(col(word))).alias(lower_word)tab_lower.group_by(col(lower_word)) \.select(col(lower_word), lit(1).count) \.execute_insert(WordsCountTableSink) \.wait()udf(result_typeDataTypes.STRING())def rowFunc(row):return row[0].lower()tab_lowertab_source.map(rowFunc).alias(lower_word)tab_lower.group_by(col(lower_word)) \.select(col(lower_word), lit(1).count) \.execute_insert(WordsCountTableSink) \.wait()这样我们在定义udf时只是指定了返回类型是个字符串也不知道它在新表中叫啥名字实际叫f0。但是为了便于后续使用我们使用alias给它取了一个别名lower_word。这样就可以让其参与后续的计算了。
http://www.hkea.cn/news/14473133/

相关文章:

  • 长春建站优化医疗网站建设多少钱
  • 自己的网站怎么赚钱水淼wordpress
  • wordpress网站弹窗插件湖南省建筑信息网
  • 简诉网站建设的基本流程响应式网站wordpress
  • 浅析淘宝网站的建设与运营论文wordpress表单录入
  • 建设网站免费wordpress登陆界面打开慢
  • 如何在网站做淘宝页面青海建设厅质检站网站
  • 手机网站营销方法做货代哪个网站上好找客户
  • 沈阳企业自助建站祥云平台做网站如何
  • 重庆网站公司建设做网站专题页的字大小是多少
  • 凡客网站建立天津市建设与管理局网站下载
  • 博罗中山网站建设网站变灰色代码
  • 筹划建设协会网站的方案网站百度收录是什么意思
  • 怎么弄个人网站做网站明细范文
  • txt电子书下载网站推荐上海国企排名100强
  • 苏州公司技术支持 苏州网站建设网站建设seo优化内蒙
  • 网站前后端分离怎么做贵阳市做网站公司
  • 一个域名访问不同的网站网站建设ydwzjs
  • 免费建设网站赚钱优普道建筑网校
  • 什么叫网站外链免费企业logo设计
  • 做快递网站难吗wordpress用户修改文章
  • 网站项目建设措施国外app设计网站
  • 全站仪快速建站光学网站建设
  • 做外贸维护网站需要注意什么前几年做啥网站能致富
  • 教育类网站建设方案做app和做网站的区别
  • 网站开发只要wordpress 文章导航
  • 如何在阿里网站做外单高并发网站开发语言
  • 网站建设业务的销售人员培训文档沭阳住房和城乡建设局网站
  • 网站单页面怎么做微信开发者工具安装教程
  • 网站推广优化软件wordpress编辑器模板