当前位置: 首页 > news >正文

银川做网站最好的公司有哪些小程序直播

银川做网站最好的公司有哪些,小程序直播,wordpress投稿送积分,网站备案号显示红色简述 Apache Flink 是一个流处理和批处理的开源框架#xff0c;它允许从各种数据源#xff08;如 Kafka#xff09;读取数据#xff0c;处理数据#xff0c;然后将数据写入到不同的目标系统#xff08;如 MongoDB#xff09;。以下是一个简化的流程#xff0c;描述如何…简述 Apache Flink 是一个流处理和批处理的开源框架它允许从各种数据源如 Kafka读取数据处理数据然后将数据写入到不同的目标系统如 MongoDB。以下是一个简化的流程描述如何使用 Flink 从 Kafka 读取数据并保存到 MongoDB 1、环境准备 安装并配置 Apache Flink。安装并配置 Apache Kafka。安装并配置 MongoDB。创建一个 Kafka 主题并发送一些测试数据。确保 Flink 可以连接到 Kafka 和 MongoDB。 部署参考 1、flinkFlink 部署执行模式 2、kafkaFlink mongo Kafka 3、mongoDbmongo副本集本地部署 2. 添加依赖 在Flink 项目中需要添加 Kafka 和 MongoDB 的连接器依赖。对于 Maven 项目可以在 pom.xml 文件中添加相应的依赖。 对于 Kafka需要添加 Flink Kafka Connector 的依赖。 对于 MongoDB需要添加 Flink MongoDB Sink 的依赖。 3. 编写 Flink 作业 * 创建一个 Flink 作业使用 Flink 的 FlinkKafkaConsumer 从 Kafka 主题中读取数据。 * 对读取的数据进行必要的转换或处理。 * 使用 MongoDB 的 Java 驱动程序或第三方库将处理后的数据写入 MongoDB。4. 运行 Flink 作业 使用 Flink 的命令行工具或 IDE 运行 Flink 作业。确保 Kafka 和 MongoDB 正在运行并且 Flink 可以访问它们。 参考Flink 命令行提交、展示和取消作业 5. 监控和调试 使用 Flink 的 Web UI 或其他监控工具来监控作业。如果出现问题检查日志并进行调试。 6. 优化和扩展 根据需求和数据量优化 Flink 作业的性能和可扩展性。这可能包括调整并行度、增加资源、优化数据处理逻辑等。 代码 package com.wfg.flink.connector.kafka;import com.mongodb.client.model.InsertOneModel; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.connector.mongodb.sink.MongoSink; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.bson.BsonDocument;import static com.wfg.flink.connector.constants.Constants.KAFKA_BROKERS; import static com.wfg.flink.connector.constants.Constants.TEST_TOPIC_PV;/*** author wfg*/ public class KafkaToWriteMongo {public static void main(String[] args) throws Exception {// 1. 设置 Flink 执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(KAFKA_BROKERS).setTopics(TEST_TOPIC_PV).setGroupId(my-test-topic-pv).setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSourceString rs env.fromSource(source, WatermarkStrategy.noWatermarks(), Kafka Source);// 创建RollingFileSinkMongoSinkString sink MongoSink.Stringbuilder().setUri(mongodb://root:123456127.0.0.1:27017,127.0.0.1:27018,127.0.0.1:27019/admin?replicaSetrs0authSourceadmin).setDatabase(sjzz).setCollection(TestMongoPv).setMaxRetries(3) // .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).setSerializationSchema((input, context) - {System.out.println(input);return new InsertOneModel(BsonDocument.parse(input));}).build();rs.sinkTo(sink);// 6. 执行 Flink 作业env.execute(Kafka Flink Job);} }
http://www.hkea.cn/news/14381344/

相关文章:

  • 电子商务网站开发实践费县住房和城乡建设局网站
  • 深圳网站设计九曲网站测速工具
  • 网站开发工期安排表wordpress pc手机端两套模板
  • dedecms搭建购物网站铜梁城乡建设网站
  • 做网站的困难wordpress广告赚钱
  • 如何设计和建立一个公司的网站手机wap网站是什么
  • 有交做拼多多网站的吗优化二十条
  • 苏州调查公司怎么收费搜索引擎营销优化诊断训练
  • 临沂网站推广app怎么制作多少钱
  • 海尔网站建设辽宁省网站建设
  • 响应式网站和不响应式陕西网络推广介绍
  • 营销型网站建设新感觉建站重庆放心seo整站优化
  • 做理财的网站有哪些问题建e网app
  • 网站的建设ppt模板网站ui怎么做的
  • 哪里有网站建设哪家好WordPress怎么文章分类
  • 深圳平台型网站建设公司做药品网站有哪些
  • 网站设计的初衷wordpress 个人资料按钮
  • 建设项目环保备案登记网站尚学教育
  • 网站建设哪里便宜网站建设什么意思
  • 258网站建设开鲁网站seo
  • 传统设计公司网站外贸网络推广
  • 开发技术网站开发技术做班级玩网站做哪些方面
  • 网站建设客户去哪里找淮安市哪里有做网站
  • 网站建设的团队分工仿站怎么修改成自己的网站
  • 三角镇建网站公司网站建设布吉
  • 长沙做网站公做海报的网站知乎
  • 环保设计院的网站建设成都市住房和城乡建设局电话
  • 网站运营部的职责做外贸要有英文网站吗
  • asp网站数据库扫描物流公司介绍模板
  • 福永网站建设公司有没有无代码网站建设