当前位置: 首页 > news >正文

优质的小企业网站建设wordpress 查询页面

优质的小企业网站建设,wordpress 查询页面,域名网站建设,网站建设的一般步骤包括文章目录 前言知识积累CDC简介CDC的种类常见的CDC方案比较 Springboot接入Flink CDC环境准备项目搭建 本地运行集群运行将项目打包将包传入集群启动远程将包部署到flink集群 写在最后 前言 前面的博文我们分享了大数据分布式流处理计算框架Flink和其基础环境的搭建#xff0c… 文章目录 前言知识积累CDC简介CDC的种类常见的CDC方案比较 Springboot接入Flink CDC环境准备项目搭建 本地运行集群运行将项目打包将包传入集群启动远程将包部署到flink集群 写在最后 前言 前面的博文我们分享了大数据分布式流处理计算框架Flink和其基础环境的搭建相信各位看官都已经搭建好了自己的运行环境。那么今天就来实战一把使用Flink CDC同步Mysql数据导Elasticsearch。 知识积累 CDC简介 CDC 的全称是 Change Data Capture变更数据捕获技术 在广义的概念上只要是能捕获数据变更的技术我们都可以称之为 CDC 。目前通常描述的 CDC 技术主要面向数据库的变更是一种用于捕获数据库中数据变更的技术。 CDC的种类 CDC 的技术方案非常多目前业界主流的实现机制可以分为两种 基于查询的 CDC ◆离线调度查询作业批处理。把一张表同步到其他系统每次通过查询去获取表中最新的数据 ◆无法保障数据一致性查的过程中有可能数据已经发生了多次变更 ◆不保障实时性基于离线调度存在天然的延迟。 基于日志的 CDC ◆实时消费日志流处理例如 MySQL 的 binlog 日志完整记录了数据库中的变更可以把 binlog 文件当作流的数据源 ◆保障数据一致性因为 binlog 文件包含了所有历史变更明细 ◆保障实时性因为类似 binlog 的日志文件是可以流式消费的提供的是实时数据。 常见的CDC方案比较 Springboot接入Flink CDC 由于Flink官方提供了Java、Scala、Python语言接口用以开发Flink应用程序故我们可以直接用Maven引入Flink依赖进行功能实现。 环境准备 1、SpringBoot 2.4.3 2、Flink 1.13.6 3、Scala 2.11 4、Maven 3.6.3 5、Java 8 6、mysql 8 7、es 7 Springboot、Flink、Scala版本一定要相匹配也可以严格按照本博客进行配置。 注意 如果只是本机测试玩玩Maven依赖已经整合计算环境不用额外搭建Flink环境如果需要部署到Flink集群则需要额外搭建Flink集群。另外Scala 版本只是用于依赖选择不用关心Scala环境。 项目搭建 1、引入Flink CDC Maven依赖 pom.xml parentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.4.3/versionrelativePath/ !-- lookup parent from repository -- /parent groupIdcom.example/groupId artifactIdflink-demo/artifactId version0.0.1-SNAPSHOT/version nameflink-demo/name descriptionDemo project for Spring Boot/description propertiesjava.version8/java.versionproject.build.sourceEncodingUTF-8/project.build.sourceEncodingproject.reporting.outputEncodingUTF-8/project.reporting.outputEncodingflink.version1.13.6/flink.version /properties dependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion8.0.23/version/dependency!-- Flink CDC connector for MySQL --dependencygroupIdcom.ververica/groupIdartifactIdflink-connector-mysql-cdc/artifactIdversion2.1.0/versionexclusionsexclusiongroupIdorg.apache.flink/groupIdartifactIdflink-shaded-guava/artifactId/exclusion/exclusions/dependency!-- Flink CDC connector for ES https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch7_2.11--dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-elasticsearch7_2.11/artifactIdversion${flink.version}/version/dependency!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-json/artifactIdversion${flink.version}/version/dependency!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge_2.11 --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-java-bridge_2.11/artifactIdversion${flink.version}/version/dependency!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner_2.11 --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner_2.11/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner-blink_2.11/artifactIdversion${flink.version}/version/dependency!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients_2.11 --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients_2.11/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion${flink.version}/version/dependency!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java_2.11 --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java_2.11/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependency /dependencies2、创建测试数据库表users users表结构 CREATE TABLE users (id bigint NOT NULL AUTO_INCREMENT COMMENT ID,name varchar(50) NOT NULL COMMENT 名称,birthday timestamp NULL DEFAULT NULL COMMENT 生日,ts timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 创建时间,PRIMARY KEY (id) ) ENGINEInnoDB DEFAULT CHARSETutf8mb4 COLLATEutf8mb4_0900_ai_ci COMMENT用户;3、es索引操作 es操作命令 es索引会自动创建 #设置es分片与副本 curl -X PUT 10.10.22.174:9200/users -u elastic:VaHcSC3mOFfovLWTqW6E -H Content-Type: application/json -d {settings : {number_of_shards : 3,number_of_replicas : 2} }#查询index下全部数据 curl -X GET http://10.10.22.174:9200/users/_search -u elastic:VaHcSC3mOFfovLWTqW6E -H Content-Type: application/json #删除index curl -X DELETE 10.10.22.174:9200/users -u elastic:VaHcSC3mOFfovLWTqW6E本地运行 SpringBootTest class FlinkDemoApplicationTests {/*** flinkCDC* mysql to es* author senfel* date 2023/8/22 14:37 * return void*/Testvoid flinkCDC() throws Exception{EnvironmentSettings fsSettings EnvironmentSettings.newInstance()//.useBlinkPlanner().inStreamingMode().build();StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv StreamTableEnvironment.create(env,fsSettings);tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);// 数据源表String sourceDDL CREATE TABLE users (\n id BIGINT PRIMARY KEY NOT ENFORCED ,\n name STRING,\n birthday TIMESTAMP(3),\n ts TIMESTAMP(3)\n ) WITH (\n connector mysql-cdc,\n hostname 10.10.10.202,\n port 6456,\n username root,\n password MyNewPass2021,\n server-time-zone Asia/Shanghai,\n database-name cdc,\n table-name users\n );// 输出目标表String sinkDDL CREATE TABLE users_sink_es\n (\n id BIGINT PRIMARY KEY NOT ENFORCED,\n name STRING,\n birthday TIMESTAMP(3),\n ts TIMESTAMP(3)\n ) \n WITH (\n connector elasticsearch-7,\n hosts http://10.10.22.174:9200,\n index users,\n username elastic,\n password VaHcSC3mOFfovLWTqW6E\n );// 简单的聚合处理String transformSQL INSERT INTO users_sink_es SELECT * FROM users;tableEnv.executeSql(sourceDDL);tableEnv.executeSql(sinkDDL);TableResult result tableEnv.executeSql(transformSQL);result.print();env.execute(mysql-to-es);}请求es用户索引发现并无数据 [rootbluejingyu-1 ~]# curl -X GET “http://10.10.22.174:9200/users/_search” -u elastic:VaHcSC3mOFfovLWTqW6E -H ‘Content-Type: application/json’ {“took”:0,“timed_out”:false,“_shards”:{“total”:3,“successful”:3,“skipped”:0,“failed”:0},“hits”:{“total”:{“value”:0,“relation”:“eq”},“max_score”:null,“hits”:[]}} 操作mysql数据库新增多条数据 5 senfel 2023-08-30 15:02:28 2023-08-30 15:02:36 6 sebfel2 2023-08-30 15:02:43 2023-08-30 15:02:47 再次获取es用户索引查看数据 [rootbluejingyu-1 ~]# curl -X GET “http://10.10.22.174:9200/users/_search” -u elastic:VaHcSC3mOFfovLWTqW6E -H ‘Content-Type: application/json’ {“took”:67,“timed_out”:false,“_shards”:{“total”:3,“successful”:3,“skipped”:0,“failed”:0},“hits”:{“total”:{“value”:2,“relation”:“eq”},“max_score”:1.0,“hits”:[{“_index”:“users”,“_type”:“_doc”,“_id”:“5”,“_score”:1.0,“_source”:{“id”:5,“name”:“senfel”,“birthday”:“2023-08-30 15:02:28”,“ts”:“2023-08-30 15:02:36”}},{“_index”:“users”,“_type”:“_doc”,“_id”:“6”,“_score”:1.0,“_source”:{“id”:6,“name”:“sebfel2”,“birthday”:“2023-08-30 15:02:43”,“ts”:“2023-08-30 15:02:47”}}]}} 由上测试结果可知本地运行无异常。 集群运行 项目树 1、创建集群运行代码逻辑 /*** FlinkMysqlToEs* author senfel* version 1.0* date 2023/8/22 14:56*/ public class FlinkMysqlToEs {public static void main(String[] args) throws Exception {EnvironmentSettings fsSettings EnvironmentSettings.newInstance()//.useBlinkPlanner().inStreamingMode().build();StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv StreamTableEnvironment.create(env,fsSettings);tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);// 数据源表String sourceDDL CREATE TABLE users (\n id BIGINT PRIMARY KEY NOT ENFORCED ,\n name STRING,\n birthday TIMESTAMP(3),\n ts TIMESTAMP(3)\n ) WITH (\n connector mysql-cdc,\n hostname 10.10.10.202,\n port 6456,\n username root,\n password MyNewPass2021,\n server-time-zone Asia/Shanghai,\n database-name cdc,\n table-name users\n );// 输出目标表String sinkDDL CREATE TABLE users_sink_es\n (\n id BIGINT PRIMARY KEY NOT ENFORCED,\n name STRING,\n birthday TIMESTAMP(3),\n ts TIMESTAMP(3)\n ) \n WITH (\n connector elasticsearch-7,\n hosts http://10.10.22.174:9200,\n index users,\n username elastic,\n password VaHcSC3mOFfovLWTqW6E\n );// 简单的聚合处理String transformSQL INSERT INTO users_sink_es SELECT * FROM users;tableEnv.executeSql(sourceDDL);tableEnv.executeSql(sinkDDL);TableResult result tableEnv.executeSql(transformSQL);result.print();env.execute(mysql-to-es);} }2、集群运行需要将Flink程序打包不同于普通的jar包这里必须采用shade buildfinalNameflink-demo/finalNamepluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-shade-plugin/artifactIdversion3.2.4/versionexecutionsexecutionphasepackage/phasegoalsgoalshade/goal/goalsconfigurationcreateDependencyReducedPomfalse/createDependencyReducedPomartifactSetexcludesexcludecom.google.code.findbugs:jsr305/excludeexcludeorg.slf4j:*/excludeexcludelog4j:*/exclude/excludes/artifactSetfiltersfilterartifact*:*/artifactexcludesexcludemodule-info.class/excludeexcludeMETA-INF/*.SF/excludeexcludeMETA-INF/*.DSA/excludeexcludeMETA-INF/*.RSA/exclude/excludes/filter/filterstransformerstransformerimplementationorg.apache.maven.plugins.shade.resource.AppendingTransformerresourceMETA-INF/spring.handlers/resourceresourcereference.conf/resource/transformertransformerimplementationorg.springframework.boot.maven.PropertiesMergingResourceTransformerresourceMETA-INF/spring.factories/resource/transformertransformerimplementationorg.apache.maven.plugins.shade.resource.AppendingTransformerresourceMETA-INF/spring.schemas/resource/transformertransformerimplementationorg.apache.maven.plugins.shade.resource.ServicesResourceTransformer /transformerimplementationorg.apache.maven.plugins.shade.resource.ManifestResourceTransformermainClasscom.example.flinkdemo.FlinkMysqlToEs/mainClass/transformer/transformers/configuration/execution/executions/plugin/plugins /build将项目打包将包传入集群启动 1、项目打包 mvn package -Dmaven.test.skiptrue 2、手动上传到服务器拷贝如集群内部运行 /opt/flink/bin# ./flink run …/flink-demo.jar 3、测试操作mysql数据库 删除id 6只剩下id5的用户 5 senfel000 2023-08-30 15:02:28 2023-08-30 15:02:36 4、查询es用户索引 [rootbluejingyu-1 ~]# curl -X GET “http://10.10.22.174:9200/users/_search” -u elastic:VaHcSC3mOFfovLWTqW6E -H ‘Content-Type: application/json’ {“took”:931,“timed_out”:false,“_shards”:{“total”:3,“successful”:3,“skipped”:0,“failed”:0},“hits”:{“total”:{“value”:1,“relation”:“eq”},“max_score”:1.0,“hits”:[{“_index”:“users”,“_type”:“_doc”,“_id”:“5”,“_score”:1.0,“_source”:{“id”:5,“name”:“senfel”,“birthday”:“2023-08-30 15:02:28”,“ts”:“2023-08-30 15:02:36”}}]}}[ 如上所示es中只剩下了id5的数据 经测试手动部署到集群环境成功。 远程将包部署到flink集群 1、新增controller触发接口 /*** remote runTask* author senfel* date 2023/8/30 16:57 * return org.apache.flink.api.common.JobID*/ GetMapping(/runTask) public JobID runTask() {try {// 集群信息Configuration configuration new Configuration();configuration.setString(JobManagerOptions.ADDRESS, 10.10.22.91);configuration.setInteger(JobManagerOptions.PORT, 6123);configuration.setInteger(RestOptions.PORT, 8081);RestClusterClientStandaloneClusterId client new RestClusterClient(configuration, StandaloneClusterId.getInstance());//jar包存放路径也可以直接调用hdfs中的jarFile jarFile new File(input/flink-demo.jar);SavepointRestoreSettings savepointRestoreSettings SavepointRestoreSettings.none();//构建提交任务参数PackagedProgram program PackagedProgram.newBuilder().setConfiguration(configuration).setEntryPointClassName(com.example.flinkdemo.FlinkMysqlToEs).setJarFile(jarFile).setSavepointRestoreSettings(savepointRestoreSettings).build();//创建任务JobGraph jobGraph PackagedProgramUtils.createJobGraph(program, configuration, 1, false);//提交任务CompletableFutureJobID result client.submitJob(jobGraph);return result.get();} catch (Exception e) {e.printStackTrace();return null;} }2、启动Springboot项目 3、postman请求 4、查看Fink集群控制台 由上图所示已将远程部署完成。 5、测试操作mysql数据库 5 senfel000 2023-08-30 15:02:28 2023-08-30 15:02:36 7 eeeee 2023-08-30 17:12:00 2023-08-30 17:12:04 8 33333 2023-08-30 17:12:08 2023-08-30 17:12:11 6、查询es用户索引 [rootbluejingyu-1 ~]# curl -X GET “http://10.10.22.174:9200/users/_search” -u elastic:VaHcSC3mOFfovLWTqW6E -H ‘Content-Type: application/json’ {“took”:766,“timed_out”:false,“_shards”:{“total”:3,“successful”:3,“skipped”:0,“failed”:0},“hits”:{“total”:{“value”:3,“relation”:“eq”},“max_score”:1.0,“hits”:[{“_index”:“users”,“_type”:“_doc”,“_id”:“5”,“_score”:1.0,“_source”:{“id”:5,“name”:“senfel000”,“birthday”:“2023-08-30 15:02:28”,“ts”:“2023-08-30 15:02:36”}},{“_index”:“users”,“_type”:“_doc”,“_id”:“7”,“_score”:1.0,“_source”:{“id”:7,“name”:“eeeee”,“birthday”:“2023-08-30 17:12:00”,“ts”:“2023-08-30 17:12:04”}},{“_index”:“users”,“_type”:“_doc”,“_id”:“8”,“_score”:1.0,“_source”:{“id”:8,“name”:“33333”,“birthday”:“2023-08-30 17:12:08”,“ts”:“2023-08-30 17:12:11”}}]}} 如上所以es中新增了两条数据 经测试远程发布Flink Task完成。 写在最后 大数据Flink CDC同步Mysql数据到ElasticSearch搭建与测试运行较为简单对于基础的学习测试环境独立集群目前只支持单个任务部署如果需要多个任务或者运用于生产可以采用Yarn与Job分离模式进行部署。
http://www.hkea.cn/news/14521935/

相关文章:

  • 百度网盘私人资源链接seo北京公司
  • 西部数码网站助手 安装棋牌类网站怎么做
  • 政务网站建设发言材料祁东网站建设
  • 虚拟主机建设网站两个优秀企业网站的优缺点
  • 武进常州做网站学网站平面设计
  • 专门下载工程建设标准的网站网站建设搜索优
  • 外贸网站建设大概多少钱wordpress 主题 激活
  • 学年论文网站建设石家庄关键词优化软件
  • 毕节建设局网站html解析wordpress
  • 虚拟主机怎么发布网站贵阳网站建设包首页
  • dw软件做网站哪些网站可以做视频搬运
  • 海口建站模板网站建设和推广的完整话术
  • 国外产品展示网站源码广州各区风险区域最新动态
  • 国内企业网站模板微微营销
  • 欧美做视频网站有哪些网页设计面试自我介绍
  • 建站步骤图视频做网站背景
  • 网上接网站项目宁波比较好的外贸公司
  • 别人网站的字体付费阅读小说网站开发建设源码
  • 安徽建设银行网站哈尔滨电商设计企业
  • 阜蒙县自治区建设学校网站绵阳seo
  • 济南市建设工程招标投标协会网站网站平台建设要多久
  • 佛山如何网站建设在哪里做wordpress没有页脚选项
  • 长沙h5手机网站制作网站上的菠菜游戏哪里可以做
  • 电商网站分析报告网页制作软件dw还需要什么
  • 怎么搭建手机网站m聚豪云免费虚拟主机
  • 金蝶直播wordpress模板优化
  • 站长工具seo推广xampp wordpress 建站教程
  • 不屏蔽网站的浏览器福建省建设干部网站
  • php网站开发工资多少钱wordpress固定链接怎么设置
  • 建网站做seo微信二维码在线制作