当前位置: 首页 > news >正文

六安网站建设 220谷歌浏览器2021最新版

六安网站建设 220,谷歌浏览器2021最新版,淘宝直播要先建设个网站吗,wap模板10 Flink CDC 1. CDC是什么2. CDC 的种类3. 传统CDC与Flink CDC对比4. Flink-CDC 案例5. Flink SQL 方式的案例 1. CDC是什么 CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数…

10 Flink CDC

  • 1. CDC是什么
  • 2. CDC 的种类
  • 3. 传统CDC与Flink CDC对比
  • 4. Flink-CDC 案例
  • 5. Flink SQL 方式的案例

1. CDC是什么

CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。
在广义的概念上,只要能捕获数据变更的技术,我们都可以称为 CDC 。通常我们说的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。
CDC 技术应用场景非常广泛:
数据同步,用于备份,容灾;
数据分发,一个数据源分发给多个下游;
数据采集(E),面向数据仓库/数据湖的 ETL 数据集成。

2. CDC 的种类

CDC 主要分为基于查询和基于 Binlog 两种方式,我们主要了解一下这两种之间的区别:
在这里插入图片描述

3. 传统CDC与Flink CDC对比

  1. 传统 CDC ETL 分析
    在这里插入图片描述

  2. 基于 Flink CDC 的 ETL 分析
    在这里插入图片描述

  3. 基于 Flink CDC 的聚合分析
    在这里插入图片描述

  4. 基于 Flink CDC 的数据打宽
    在这里插入图片描述

4. Flink-CDC 案例

Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。
开源地址:https://github.com/ververica/flink-cdc-connectors。
示例代码:

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;public class FlinkCDC {public static void main(String[] args) throws Exception {//1.创建执行环境StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//2.Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点
续传,需要从 Checkpoint 或者 Savepoint 启动程序//2.1 开启 Checkpoint,每隔 5 秒钟做一次 CKenv.enableCheckpointing(5000L);//2.2 指定 CK 的一致性语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//2.3 设置任务关闭的时候保留最后一次 CK 数据
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckp
ointCleanup.RETAIN_ON_CANCELLATION);//2.4 指定从 CK 自动重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));//2.5 设置状态后端env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));//2.6 设置访问 HDFS 的用户名System.setProperty("HADOOP_USER_NAME", "atguigu");//3.创建 Flink-MySQL-CDC 的 Source//initial (default): Performs an initial snapshot on the monitored database tables upon 
first startup, and continue to read the latest binlog.//latest-offset: Never to perform snapshot on the monitored database tables upon first 
startup, just read from the end of the binlog which means only have the changes since the 
connector was started.//timestamp: Never to perform snapshot on the monitored database tables upon first 
startup, and directly read binlog from the specified timestamp. The consumer will traverse the 
binlog from the beginning and ignore change events whose timestamp is smaller than the 
specified timestamp.//specific-offset: Never to perform snapshot on the monitored database tables upon 
first startup, and directly read binlog from the specified offset.DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder().hostname("hadoop01").port(3306).username("root").password("000000").databaseList("gmall-flink").tableList("gmall-flink.z_user_info") //可选配置项,如果不指定该参数,则会
读取上一个配置下的所有表的数据,注意:指定的时候需要使用"db.table"的方式.startupOptions(StartupOptions.initial()).deserializer(new StringDebeziumDeserializationSchema()).build();//4.使用 CDC Source 从 MySQL 读取数据DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);//5.打印数据mysqlDS.print();//6.执行任务env.execute();} 
}

5. Flink SQL 方式的案例

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkSQL_CDC {public static void main(String[] args) throws Exception {//1.创建执行环境StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//2.创建 Flink-MySQL-CDC 的 SourcetableEnv.executeSql("CREATE TABLE user_info (" +" id INT," +" name STRING," +" phone_num STRING" +") WITH (" +" 'connector' = 'mysql-cdc'," +" 'hostname' = 'hadoop01'," +" 'port' = '3306'," +" 'username' = 'root'," +" 'password' = '000000'," +" 'database-name' = 'gmall-flink'," +" 'table-name' = 'z_user_info'" +")");tableEnv.executeSql("select * from user_info").print();env.execute();}
}
http://www.hkea.cn/news/379967/

相关文章:

  • 为什么上不了建设银行个人网站漳州网络推广
  • 天津手机网站建站培训代运营公司可靠吗
  • 网站制作的一般步骤长春网站优化平台
  • Python做网站 性能上海seo培训中心
  • 网上投诉平台公众号排名优化
  • 网页模板网站推荐媒体公关是做什么的
  • 泰安的网站建设公司爱站网域名查询
  • 台州椒江网站制作公司广告推销
  • 南康做网站合肥seo招聘
  • 成都网站建设定长沙专业网站制作
  • 有什么网站是python做的如何自己开发一个平台
  • 网站建设标志设计北京网站优化公司
  • 图标使用wordpress杭州seo博客
  • 企业网站如何做推广竞价推广托管公司介绍
  • 网站如何做微信登录seo公司 杭州
  • 中山里水网站建设软文广告案例分析
  • 做外贸是用什么网站做新型网络营销方式
  • 心理咨询网站开发百度手机seo软件
  • 17网站一起做网批seo营销优化
  • 做赚钱网站程序员培训班要多少钱
  • 已经收录大规模修改收录页面对网站有影响吗什么软件可以推广自己的产品
  • 丁香园做科室网站厦门网络推广
  • 免费的企业网站制作提高网站权重的方法
  • 兰州网站制作怎么样网页在线生成
  • 自建网站网址雅虎搜索引擎首页
  • 注册科技有限公司可以做网站吗百度搜索排名机制
  • 武汉做网站好网站制作多少钱一个
  • 安阳网站建设怎么从网上找客户
  • 文章博客媒体网站模板怎样在百度上打广告
  • 做网站是不是要模板直接打开百度