当前位置: 首页 > news >正文

手机网站免费空间网络设计专业究竟好不好就业

手机网站免费空间,网络设计专业究竟好不好就业,崇州网站制作,徐州58同城网flink-connector-mysql-cdc#xff1a; 01 mysql-cdc基础配置代码演示02 mysql-cdc高级扩展03 mysql-cdc常见问题汇总04 mysql-cdc-kafka生产级代码分享05 flink-kafka-doris生产级代码分享06 flink-kafka-hudi生产级代码分享 flink-cdc版本#xff1a;3.2.0 flink版本 01 mysql-cdc基础配置代码演示02 mysql-cdc高级扩展03 mysql-cdc常见问题汇总04 mysql-cdc-kafka生产级代码分享05 flink-kafka-doris生产级代码分享06 flink-kafka-hudi生产级代码分享 flink-cdc版本3.2.0 flink版本flink-1.18.0 mysql版本8.0.26 java版本1.8 maven版本3.8.4 目录 1. Mysql数据库设置 1.1 开启binlog日志 1.2 创建用户 1.3 准备测试数据  2. 编写测试代码 2.1 maven 依赖 2.2 测试代码 3. mysql-cdc扩展 3.1 时区设置 3.2 为每个读取器设置不同的 SERVER ID 1. Mysql数据库设置 1.1 开启binlog日志 编辑 MySQL 配置文件 在 Unix/Linux 系统中通常是 /etc/my.cnf 或 /etc/mysql/my.cnf。 在 Windows 上可能位于 C:\ProgramData\MySQL\MySQL Server X.Y\my.ini。 # 在 mysqld 部分下添加以下内容如果已经存在请确认其值 [mysqld] log-binmysql-bin # 二进制日志文件前缀MySQL将生成名为 mysql-bin.000001, mysql-bin.000002 等的文件。 binlog-formatrow # 设置二进制日志格式为行级row可选值为 STATEMENT、ROW 和 MIXED这里推荐使用行级。 expire_logs_days7 # 设置二进制日志的过期时间单位为天超过这个天数后的日志将被自动删除这里以 7 天为例。 max-binlog-size100M # 设置单个二进制日志文件的最大大小超出后将自动创建一个新的日志文件可以根据需要调整。 1.2 创建用户 以 “flinkcdc”用户为例 # 创建 MySQL 用户 CREATE USER flinkcdclocalhost IDENTIFIED BY 123456; # 向用户授予所需的权限 GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO flinkcdc IDENTIFIED BY 123456; # 授权所有权限 GRANT ALL PRIVILEGES ON *.* TO flinkcdclocalhost; GRANT ALL PRIVILEGES ON *.* TO flinkcdc%; # 完成用户的权限 FLUSH PRIVILEGES;1.3 准备测试数据  # 使用flinkcdc用户登录数据库# 创建测试数据库 create database cdc_demo; # 创建测试表 CREATE TABLE cdc_demo.flink_cdc_test (id INT AUTO_INCREMENT PRIMARY KEY,name VARCHAR(50) NOT NULL,description TEXT,age INT,balance DECIMAL(10, 2),is_active BOOLEAN DEFAULT TRUE,created_at DATETIME DEFAULT CURRENT_TIMESTAMP,birth_date DATE,long_value BIGINT,last_login TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ); # 插入测试数据 INSERT INTO cdc_demo.flink_cdc_test (name, description, age, balance, is_active, created_at, birth_date, long_value, last_login) VALUES (Alice Smith, Alice is a software engineer with 5 years of experience., 30, 2500.50, TRUE, 2023-01-01 10:00:00, 1992-05-15, 12345678901234, 2023-05-20 10:00:00), (Bob Johnson, Bob enjoys hiking and outdoor activities., 25, 1500.00, TRUE, 2023-02-15 12:30:00, 1998-08-22, 987654321054321, 2023-05-18 14:00:00), (Charlie Brown, Charlie is an avid reader and coffee lover., 35, 3200.75, FALSE, 2023-03-22 14:45:00, 1988-01-11, 135792468012345, 2023-05-19 09:20:00), (Daisy Miller, Daisy loves painting and traveling., 28, 1800.25, TRUE, 2023-04-05 09:15:00, 1994-11-03, 24681357901234, 2023-05-21 12:30:00), (Ethan White, Ethan enjoys playing guitar and writing songs., 40, 5000.00, TRUE, 2023-05-18 16:20:00, 1983-07-30, 98765432102468, 2023-05-22 15:00:00);2. 编写测试代码 2.1 maven 依赖 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.toroidal/groupIdartifactIdflink-connector-mysql-cdc-demo/artifactIdnameflink-connector-mysql-cdc-demo/nameversion1.0-SNAPSHOT/versionrepositoriesrepositoryidaliyunmaven/idname阿里云公共仓库/nameurlhttps://maven.aliyun.com/repository/public//url/repositoryrepositoryidmirrorId/idnameHuman Readable Name for this Mirror./nameurlhttp://my.repository.com/repo/path/url/repository/repositoriespropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetflink.version1.18.0/flink.versionscala.binary.version2.12/scala.binary.versionflinkcdc.version3.2.0/flinkcdc.versionmysql.version8.0.26/mysql.versionlog4j.version2.17.1/log4j.versionlombok.version1.18.24/lombok.versionfastjson.version1.2.83/fastjson.version/propertiesdependencies!-- flink Dependency --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-core/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-common/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-java-bridge/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner_${scala.binary.version}/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-json/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-base/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-files/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-statebackend-rocksdb/artifactIdversion${flink.version}/version/dependency!-- mysql-cdc --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-mysql-cdc/artifactIdversion${flinkcdc.version}/version/dependency!-- mysql --dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion${mysql.version}/version/dependency!-- json --dependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion${fastjson.version}/version/dependency!-- log --dependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion${lombok.version}/version/dependencydependencygroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-slf4j-impl/artifactIdversion${log4j.version}/version/dependency/dependenciesbuildsourceDirectorysrc/main/java/sourceDirectoryplugins!-- 编译插件 --plugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-compiler-plugin/artifactIdversion3.9.0/versionconfigurationsource1.8/sourcetarget1.8/target/configuration/pluginplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-surefire-plugin/artifactIdversion2.18.1/versionconfigurationuseFilefalse/useFiledisableXmlReporttrue/disableXmlReportincludesinclude**/*Test.*/includeinclude**/*Suite.*/include/includes/configuration/plugin!-- 打包插件(会包含所有依赖) --plugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-shade-plugin/artifactIdversion2.3/versionexecutionsexecutionphasepackage/phasegoalsgoalshade/goal/goalsconfigurationfiltersfilterartifact*:*/artifactexcludes!--zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF --excludeMETA-INF/*.SF/excludeexcludeMETA-INF/*.DSA/excludeexcludeMETA-INF/*.RSA/exclude/excludes/filter/filterstransformerstransformerimplementationorg.apache.maven.plugins.shade.resource.ManifestResourceTransformer!-- 设置jar包的入口类(可选) --mainClasscom.toroidal.mysql.MysqlCdcStreamApp/mainClass/transformer/transformers/configuration/execution/executions/plugin/plugins/build /project 2.2 测试代码 package com.toroidal.mysql;import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;/*** Author Toroidal* Date 2024/12/04 14:42* Version 1.0*/ public class MysqlCdcStreamApp {public static void main(String[] args) throws Exception {MySqlSourceString mySqlSource MySqlSource.Stringbuilder().hostname(localhost).port(3306).username(flinkcdc).password(123456)// 表所在的数据库库名如果需要捕获整个库的表配置为.*.如果需要捕获多个数据库配置为.databaseList(cdc01, cdc02).databaseList(cdc_demo)// 设置需要捕获日志的表名注意需要配置库名大小敏感.tableList(cdc_demo.flink_cdc_test)// 将 SourceRecord 转换为 JSON 字符串。.deserializer(new JsonDebeziumDeserializationSchema()).build();StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// enable checkpointenv.enableCheckpointing(3000);env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), MySQL Source).setParallelism(4).print().setParallelism(1);env.execute(Print MySQL Snapshot Binlog);} } 运行结果  3. mysql-cdc扩展 3.1 时区设置 mysql-cdc读取出来的 timestamp 字段时区相差8小时将时区和MySQL服务器时区设置一致即可 查询当前数据库时区 SELECT * FROM mysql.time_zone_name; 设置时区为东八时区 .serverTimeZone(Asia/Shanghai) MySqlSourceString mySqlSource MySqlSource.Stringbuilder().hostname(localhost).port(3306).username(flinkcdc).password(123456)// 表所在的数据库库名如果需要捕获整个库的表配置为.*.如果需要捕获多个数据库配置为.databaseList(cdc01, cdc02).databaseList(cdc_demo)// 设置需要捕获日志的表名注意需要配置库名大小敏感.tableList(cdc_demo.flink_cdc_test).serverTimeZone(Asia/Shanghai)// 将 SourceRecord 转换为 JSON 字符串。.deserializer(new JsonDebeziumDeserializationSchema()).build(); 3.2 为每个读取器设置不同的 SERVER ID 每个用于读取 binlog 的 MySQL 数据库客户端都应该有一个唯一的 ID称为 server id。MySQL 服务器将使用此 id 来维护网络连接和 binlog 位置。因此如果不同的作业共享相同的服务器 ID则可能会导致从错误的 binlog 位置读取。  .serverId(flink-cdc-01) MySqlSourceString mySqlSource MySqlSource.Stringbuilder().hostname(localhost).port(3306)// 表所在的数据库库名如果需要捕获整个库的表配置为.*.如果需要捕获多个数据库配置为.databaseList(cdc01, cdc02).databaseList(cdc)// 设置需要捕获日志的表名注意需要配置库名大小敏感.tableList(cdc.flink_cdc_test).username(flinkcdc).serverTimeZone(Asia/Shanghai).serverId(flink-cdc-01).password(123456)// 将 SourceRecord 转换为 JSON 字符串。.deserializer(new JsonDebeziumDeserializationSchema()).build();
http://www.hkea.cn/news/14405573/

相关文章:

  • 深圳网站制作hi0755中国建设银行北京市互联网网站
  • 住房和城乡建设部网站 绿地申请网站建设费用的请示
  • sem和seo哪个工作好资阳优化团队信息
  • 科技类网站怎么做做外贸需要关注的网站有什么好处
  • 高端网站开发方案岳阳市住房和城乡建设路网站
  • 网站架构设计英文翻译百度网站下拉排名
  • 做网站不带优化的吗可视化网页编辑工具
  • 网易 自助网站建设wordpress js无效
  • 哪个网站做婚礼邀请函好wordpress教程自学网
  • 马克·扎克伯格大学做的网站大连建设局网站地址
  • 兴宁电子商务网站建设国家高新技术企业专利要求
  • 东莞建站响应式网站多少钱免费做logo的网站
  • 东莞大朗网站建设仗剑网站优化关键词排名公司
  • 加盟网官方网站做职业规划的网站
  • 晚上睡不着正能量网站福建建设执业资格官网
  • 站长工具权重查询wordpress使用技巧
  • 自做网站教程网站支付宝网上支付功能怎么做
  • 顺企网宁波网站建设网站设计师绩效
  • 有了自己的域名怎么做网站军事网站模板下载
  • 网站404是什么意思在局域网建设网站
  • 做wordpress 下载站vps一定要Wordpress吗
  • 网站内如何@网站建设公司专业开发北京网站
  • 上传网站需要什么软件教师遭网课入侵直播录屏曝光口
  • 姜堰网站建设团购网站APP怎么做
  • 网站建设业阿里巴巴网站备案号
  • 郑州东站附近网站建设公司音乐网站开发与需求
  • lol门户网站源码高端建站神器
  • 娄底住房和城乡建设部网站南昌的网站建设公司
  • 网站可以做哪些广告百度不做网站外链是什么原因
  • 口碑好的做网站公司哪家好定制酒