A recent requirement called for raising only the QPS of the Elasticsearch sink (sink2es), so only the parallelism of that sink operator was increased. Tuning the parallelism of individual operators independently ran into a problem; this post tracks down the root cause, fixes it, and summarizes the analysis.

Example code

```sql
--SET 'table.exec.state.ttl' = '86400s';   -- 24 hours; default: 0 ms
SET 'table.exec.state.ttl' = '2592000s';   -- 30 days; default: 0 ms

CREATE TABLE kafka_table (
    mid bigint,
    db  string,
    sch string,
    tab string,
    opt string,
    ts  bigint,
    ddl string,
    err string,
    src map<string,string>,
    cur map<string,string>,
    cus map<string,string>,
    account_id   AS IF(cur['account_id']   IS NOT NULL, cur['account_id'],   src['account_id']),
    publish_time AS IF(cur['publish_time'] IS NOT NULL, cur['publish_time'], src['publish_time']),
    msg_status   AS IF(cur['msg_status']   IS NOT NULL, cur['msg_status'],   src['msg_status']),
    send_type    AS IF(cur['send_type']    IS NOT NULL, cur['send_type'],    src['send_type'])
    --event_time AS CAST(IF(cur['update_time'] IS NOT NULL, cur['update_time'], src['update_time']) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)
    --WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE --SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 't1',
    'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
    'properties.group.id' = 'g1',
    'scan.startup.mode' = 'earliest-offset',  -- group-offsets/earliest-offset/latest-offset
    --'properties.enable.auto.commit' = 'true',  -- default: false; when false, offsets are committed on checkpoint
    'format' = 'json'
);

CREATE TABLE es_sink (
    send_type    STRING,
    account_id   STRING,
    publish_time STRING,
    grouping_id  INTEGER,
    init         INTEGER,
    init_cancel  INTEGER,
    push         INTEGER,
    succ         INTEGER,
    fail         INTEGER,
    init_delete  INTEGER,
    update_time  STRING,
    PRIMARY KEY (grouping_id, send_type, account_id, publish_time) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-6',
    'index' = 'es_sink',
    'document-type' = 'es_sink',
    'hosts' = 'http://xxx:9200',
    'format' = 'json',
    'filter.null-value' = 'true',
    'sink.bulk-flush.max-actions' = '1000',
    'sink.bulk-flush.max-size' = '10mb'
);

CREATE VIEW tmp AS
SELECT
    send_type,
    account_id,
    publish_time,
    msg_status,
    CASE WHEN UPPER(opt) = 'INSERT' AND msg_status = '0' THEN 1 ELSE 0 END AS init,
    CASE WHEN UPPER(opt) = 'UPDATE' AND send_type = '1' AND msg_status = '4' THEN 1 ELSE 0 END AS init_cancel,
    CASE WHEN UPPER(opt) = 'UPDATE' AND msg_status = '3' THEN 1 ELSE 0 END AS push,
    CASE WHEN UPPER(opt) = 'UPDATE' AND (msg_status = '1' OR msg_status = '5') THEN 1 ELSE 0 END AS succ,
    CASE WHEN UPPER(opt) = 'UPDATE' AND (msg_status = '2' OR msg_status = '6') THEN 1 ELSE 0 END AS fail,
    CASE WHEN UPPER(opt) = 'DELETE' AND send_type = '1' AND msg_status = '0' THEN 1 ELSE 0 END AS init_delete,
    --event_time,  -- only usable if event_time is enabled in the kafka_table DDL above
    opt,
    ts
FROM kafka_table
WHERE (UPPER(opt) = 'INSERT' AND msg_status = '0')
   OR (UPPER(opt) = 'UPDATE' AND msg_status IN ('1','2','3','4','5','6'))
   OR (UPPER(opt) = 'DELETE' AND send_type = '1' AND msg_status = '0');

-- send_type=1        send_type=0
-- init    - 0        init - 0
-- cancel  - 4
-- push    - 3        push - 3
-- success - 1        success - 5
-- failure - 2        failure - 6

CREATE VIEW tmp_groupby AS
SELECT
    COALESCE(send_type, 'N')    AS send_type,
    COALESCE(account_id, 'N')   AS account_id,
    COALESCE(publish_time, 'N') AS publish_time,
    CASE WHEN send_type IS NULL     AND account_id IS NULL     AND publish_time IS NULL     THEN 1
         WHEN send_type IS NOT NULL AND account_id IS NULL     AND publish_time IS NULL     THEN 2
         WHEN send_type IS NOT NULL AND account_id IS NOT NULL AND publish_time IS NULL     THEN 3
         WHEN send_type IS NOT NULL AND account_id IS NOT NULL AND publish_time IS NOT NULL THEN 4
    END AS grouping_id,
    SUM(init)        AS init,
    SUM(init_cancel) AS init_cancel,
    SUM(push)        AS push,
    SUM(succ)        AS succ,
    SUM(fail)        AS fail,
    SUM(init_delete) AS init_delete
FROM tmp
--GROUP BY GROUPING SETS ((send_type,account_id,publish_time), (send_type,account_id), (send_type), ())
GROUP BY ROLLUP (send_type, account_id, publish_time);  -- equivalent to the GROUPING SETS above

INSERT INTO es_sink
SELECT
    send_type,
    account_id,
    publish_time,
    grouping_id,
    init,
    init_cancel,
    push,
    succ,
    fail,
    init_delete,
    CAST(LOCALTIMESTAMP AS STRING) AS update_time
FROM tmp_groupby;
```

The problem

Because the parallelism of aggregation operators such as the group-by (or a join) differs from the parallelism of the sink2es operator, records carrying the same key may be sent from an upstream subtask to several different downstream subtasks. Multiple sink2es subtasks then update the same document concurrently (the primary key serves as the document id), and the job fails with the following exception:
```
Caused by: [test1/cfPTBYhcRIaYTIh3oavCvg][[test1][2]] ElasticsearchException[Elasticsearch exception
    [type=version_conflict_engine_exception, reason=[test1][4_1_92_2024-01-15 16:30:00]: version conflict,
    required seqNo [963], primary term [1]. current document has seqNo [964] and primary term [1]]]
    at org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)
    at org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)
    at org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)
    at org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)
    at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)
    at org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)
    at org.elasticsearch.client.RestClient$1.completed(RestClient.java:375)
    ...
    ... 1 more
[CIRCULAR REFERENCE: ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, ...]]]
```
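The exception comes from Elasticsearch's optimistic concurrency control: each document carries a sequence number (`_seq_no`), and a write conditioned on a stale sequence number is rejected. The following is a minimal Python sketch of that check, not real Elasticsearch code; the `DocumentStore` class and its API are invented purely to illustrate the failure mode:

```python
# Sketch of optimistic concurrency control with per-document sequence numbers,
# mimicking the _seq_no check behind version_conflict_engine_exception.
# This is an illustration only, NOT the Elasticsearch implementation.

class VersionConflict(Exception):
    pass

class DocumentStore:
    def __init__(self):
        self._docs = {}      # doc_id -> (seq_no, body)
        self._next_seq = 0

    def get(self, doc_id):
        return self._docs.get(doc_id, (None, None))

    def update(self, doc_id, body, if_seq_no):
        """Apply the write only if the caller saw the latest sequence number."""
        current_seq, _ = self.get(doc_id)
        if current_seq != if_seq_no:
            raise VersionConflict(
                f"required seqNo [{if_seq_no}], "
                f"current document has seqNo [{current_seq}]")
        self._docs[doc_id] = (self._next_seq, body)
        self._next_seq += 1

store = DocumentStore()
store.update("4_1_92", {"succ": 1}, if_seq_no=None)       # first write succeeds

# Two sink subtasks read the document concurrently ...
seq_a, _ = store.get("4_1_92")
seq_b, _ = store.get("4_1_92")

store.update("4_1_92", {"succ": 2}, if_seq_no=seq_a)      # subtask A wins
try:
    store.update("4_1_92", {"succ": 3}, if_seq_no=seq_b)  # subtask B is now stale
except VersionConflict as e:
    print("rejected:", e)
```

When only one subtask owns a key, every write it issues sees the latest sequence number and the conflict never occurs.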
Root cause

Flink has eight partitioning strategies in total. Three partitioners commonly appear between connected operators:

  • forward: upstream and downstream run with the same parallelism and no shuffle occurs; each subtask sends directly to its downstream counterpart.
  • hash: records are routed to downstream subtasks by the hash of their key (this is what keyBy / GROUP BY produces).
  • rebalance: records are distributed round-robin (or randomly) across all downstream subtasks. When the parallelism changes and there is no keyBy, Flink falls back to the RebalancePartitioner by default.

The rebalance partitioner can therefore send records carrying the same key from one upstream subtask to different downstream subtasks, which is exactly what triggers the version conflict above.

Solution

Either of the following restores correctness:

  1. Configure the group-by aggregation operator and the sink2es operator with the same parallelism, so that the forward partitioner is used between them.
  2. Set the parallelism of the sink2es operator to 1.

Summary

In the end it comes down to one rule: all records for a given key emitted by the upstream subtasks must be delivered to a single downstream subtask.
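The rule above can be sketched numerically: under round-robin (rebalance) distribution a key's records spread over several downstream subtasks, while hash partitioning pins each key to exactly one. This is a standalone sketch, not Flink's actual RebalancePartitioner/KeyGroupStreamPartitioner code:

```python
from itertools import cycle
from collections import defaultdict

def route_rebalance(records, parallelism):
    """Round-robin distribution, as a rebalance-style partitioner does."""
    targets = cycle(range(parallelism))
    assignment = defaultdict(list)
    for record in records:
        assignment[next(targets)].append(record)
    return assignment

def route_hash(records, parallelism, key_of):
    """Key-hash distribution, as the partitioner behind keyBy / GROUP BY does."""
    assignment = defaultdict(list)
    for record in records:
        assignment[hash(key_of(record)) % parallelism].append(record)
    return assignment

records = [("k1", 1), ("k2", 2), ("k1", 3), ("k1", 4), ("k2", 5)]
key = lambda r: r[0]

rebalanced = route_rebalance(records, parallelism=2)
hashed = route_hash(records, parallelism=2, key_of=key)

# Under rebalance, records for key "k1" land on more than one subtask:
subtasks_for_k1 = {s for s, recs in rebalanced.items() for r in recs if key(r) == "k1"}
print("rebalance spreads k1 over subtasks:", sorted(subtasks_for_k1))

# Under hash partitioning, every key maps to exactly one subtask:
for k in ("k1", "k2"):
    owners = {s for s, recs in hashed.items() for r in recs if key(r) == k}
    assert len(owners) == 1
print("hash pins each key to a single subtask")
```

Matching the sink's parallelism to the aggregation's (forward) or reducing the sink to a single subtask both degenerate into the single-owner case on the right.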