当前位置: 首页 > news >正文

企业网站托管软件推广的渠道是哪里找的

企业网站托管,软件推广的渠道是哪里找的,网站制作需要哪些软件,电脑上不了建设厅网站📌 当前需要处理的业务场景: 将订单表和相关联的表(比如: 商品表、子订单表、物流信息表)组织成宽表, 放入到 ES 中, 加速订单数据的查询. 同步数据到 es. 概述 1. 什么是 CDC 2. 什么是 Flink CDC 3. Flink CDC Connectors 和 Flink 的版本映射 实战 1. 宽表查…

📌 当前需要处理的业务场景:

  • 将订单表和相关联的表(比如: 商品表、子订单表、物流信息表)组织成宽表, 放入到 ES 中, 加速订单数据的查询.

  • 同步数据到 es.

  • 概述

    • 1. 什么是 CDC

    • 2. 什么是 Flink CDC

    • 3. Flink CDC Connectors 和 Flink 的版本映射

  • 实战

    • 1. 宽表查询

      • 1.1 创建 mysql 表

      • 1.2 启动 Flink 集群和 Flink SQL CLI

      • 1.3 在 Flink SQL CLI 中使用 Flink DDL 创建表

      • 1.4 关联订单数据并且将其写入 Elasticsearch 中

      • 1.5 修改 MySQL 中的数据

本文用到的 mysql 版本为 8.0.28,ES 版本为 7.17.3,flink 版本为 1.13.6,flink-sql-connector-mysql-cdc 版本为 2.2.1

flink-sql-connector-elasticsearch 版本为 7_2.11-1.13.2

概述

1. 什么是 CDC

CDC (Change Data Capture) 是 变更数据获取的简称。核心思想是监测并捕获数据库的变动(数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整地记录下来,写入到消息中间件中以供其他服务进行订阅并消费。

2. 什么是 Flink CDC

Flink 社区开发了 flink-cdc-connectors 组件,这个一个可以直接从 MySQL、PostgreSQL等数据库直接 读取全量数据增量变更数据的source 组件。

3. Flink CDC Connectors 和 Flink 的版本映射

Flink CDC Conectors VersionFlink Version
1.0.01.11.*
1.1.01.11.*
1.2.01.12.*
1.3.01.12.*
1.4.01.13.*
2.0.*1.13.*

实战

1. 宽表查询

在商城项目中,商品、订单、物流的数据往往是存储在 MySQL 中,为了加速查询的效率,可以将数据组织成一张宽表,并实时地把它写到 ElasticSearch 中。

确保要监听的 mysql 服务器开启了 binlog 和对应监听 binlog 的账号有相对应的权限。

1.1 创建 mysql 表

-- MySQL
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;
​
INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),(default,"car battery","12V car battery"),(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),(default,"hammer","12oz carpenter's hammer"),(default,"hammer","14oz carpenter's hammer"),(default,"hammer","16oz carpenter's hammer"),(default,"rocks","box of assorted rocks"),(default,"jacket","water resistent black wind breaker"),(default,"spare tire","24 inch spare tire");
​
CREATE TABLE orders (order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,order_date DATETIME NOT NULL,customer_name VARCHAR(255) NOT NULL,price DECIMAL(10, 5) NOT NULL,product_id INTEGER NOT NULL,order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;
​
INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);CREATE TABLE shipments (shipment_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,order_id INTEGER NOT NULL,origin VARCHAR(255) NOT NULL,destination VARCHAR(255) NOT NULL,is_arrived BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
INSERT INTO shipments
VALUES (default,10001,'Beijing','Shanghai',false),(default,10002,'Hangzhou','Shanghai',false),(default,10003,'Shanghai','Hangzhou',false);

1.2 启动 Flink 集群和 Flink SQL CLI

# 进入 Flink 目录
cd flink-13.6
​
# 启动 Flink 集群.启动成功的话,可以在 http://ip:8081/ 访问到对应的 Flink Web UI
./bin/start-cluster.sh
​
# 启动 Flink SQL CLI
./bin/sql-client.sh

1.3 在 Flink SQL CLI 中使用 Flink DDL 创建表

首先开启 checkpoint, 每隔 3s 做一次 checkpoint.

-- Flink SQL
Flink SQL> SET execution.checkpointing.interval = 3s;

然后,对于数据库中的表 productsordersshipments,使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据。

-- Flink SQL
Flink SQL> CREATE TABLE products (id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = '192.168.110.100','port' = '3380','username' = 'root','password' = 'root','database-name' = 'flink_db','table-name' = 'products');
​
​
Flink SQL> CREATE TABLE orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,PRIMARY KEY (order_id) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = '192.168.110.100','port' = '3380','username' = 'root','password' = 'root','database-name' = 'flink_db','table-name' = 'orders');
​
​
Flink SQL> CREATE TABLE shipments (shipment_id INT,order_id INT,origin STRING,destination STRING,is_arrived BOOLEAN,PRIMARY KEY (shipment_id) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = '192.168.110.100','port' = '3380','username' = 'root','password' = 'root','database-name' = 'flink_db','table-name' = 'shipments');

最后,创建 enriched_orders 表,用来将关联后的订单数据写入 Elasticsearch 中

-- Flink SQL
Flink SQL> CREATE TABLE enriched_orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,product_name STRING,product_description STRING,shipment_id INT,origin STRING,destination STRING,is_arrived BOOLEAN,PRIMARY KEY (order_id) NOT ENFORCED) WITH ('connector' = 'elasticsearch-7','hosts' = 'http://114.132.43.99:9200','index' = 'enriched_orders');

1.4 关联订单数据并且将其写入 Elasticsearch 中

使用 Flink SQL 将订单表 order 与商品表 products,物流信息表 shipments 关联,并将关联后的订单信息写入 Elasticsearch 中。

-- Flink SQL
Flink SQL> INSERT INTO enriched_ordersSELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrivedFROM orders AS oLEFT JOIN products AS p ON o.product_id = p.idLEFT JOIN shipments AS s ON o.order_id = s.order_id;

现在可以通过 Kibana 查询包含商品和物流信息的订单数据。

首先访问 http://localhost:5601/app/kibana#/management/kibana/index_pattern 创建 index pattern enriched_orders.

然后就可以在 http://localhost:5601/app/kibana#/discover 看到写入的数据了.

1.5 修改 MySQL 中的数据

--MySQL
INSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
​
INSERT INTO shipments
VALUES (default,10004,'Shanghai','Beijing',false);

每执行一步就刷新一次 Kibana,可以看到 Kibana 中显示的订单数据将实时更新.

http://www.hkea.cn/news/453726/

相关文章:

  • 做网站私活长沙网络营销公司
  • 网站建设公司 广告法被处罚沧州网络推广外包公司
  • 电商网站 开发成本惠州seo外包服务
  • 佛山做网站建设价格百度网盘官方下载
  • 网上购物商城网站建设个人免费域名注册网站
  • 成都学网站建设电子营销主要做什么
  • 织梦cms通用蓝白简介大气企业网站环保科技公司源码网络推广员招聘
  • 网站后台怎么添加图片视频app推广
  • 网站秒收录怎么做的经典软文案例和扶贫农产品软文
  • 珠海疫情最新情况厦门搜索引擎优化
  • 中国菲律宾历史战绩网站关键词优化工具
  • 西宁网站建设最好的公司哪家好优秀网站设计案例
  • 沧州做网站费用搜索引擎优化是做什么的
  • 社区网站推广方案线上运营的5个步骤
  • 湘潭学校网站建设 z磐石网络网站关键词优化教程
  • wordpress多程序用户同步汕头seo排名
  • 旅游网站 建设平台分析百度seo一本通
  • 怎么用dw做网站app开发网站
  • 昆山做网站的公司有哪些seo整站优化推广
  • 网站建设谈单情景对话青岛seo百科
  • 网站做自适应好不好网页分析报告案例
  • 大连手机自适应网站建设公司seo诊断站长
  • 有哪些好的网站十大电商代运营公司
  • 个人网页设计欣赏网站整站优化快速排名
  • 多少钱立案seo 公司
  • 医学类的网站做Google百度怎么优化排名
  • 手机网站怎样做枸橼酸西地那非片的功效与作用
  • 邯郸做wap网站的公司六六seo基础运营第三讲
  • 六安市建设银行网站seo编辑的工作内容
  • seo外包平台福州百度快照优化