当前位置: 首页 > news >正文

关于计算机网站建设的论文中国建设招标网住建部网站

关于计算机网站建设的论文,中国建设招标网住建部网站,网站建设伍际网络,怎样建立一个企业网站一、简介 flink 自定义实时数据源使用流处理比较简单#xff0c;比如 Kafka、MQ 等#xff0c;如果使用 MySQL、redis 批处理也比较简单 如果需要定时加载数据作为 flink 数据源使用流处理#xff0c;比如定时从 mysql 或者 redis 获取一批数据#xff0c;传入 flink 做处…一、简介 flink 自定义实时数据源使用流处理比较简单比如 Kafka、MQ 等如果使用 MySQL、redis 批处理也比较简单 如果需要定时加载数据作为 flink 数据源使用流处理比如定时从 mysql 或者 redis 获取一批数据传入 flink 做处理如下简单实现 二、pom.xml 文件 注意 flink 好多包从 1.15.0 开始不需要指定 Scala 版本内部自带 下面 pom 文件有 flink 两个版本 1.16.0 和 1.12.7Scala:2.12 project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.ye/groupIdartifactIdflink-study/artifactIdversion0.1/versionpackagingjar/packagingnameFlink Quickstart Job/namepropertiesproject.build.sourceEncodingUTF-8/project.build.sourceEncodingflink.version1.16.0/flink.version!--flink.version1.12.7/flink.version--target.java.version1.8/target.java.versionscala.binary.version2.12/scala.binary.versionmaven.compiler.source${target.java.version}/maven.compiler.sourcemaven.compiler.target${target.java.version}/maven.compiler.targetlog4j.version2.17.1/log4j.version/propertiesrepositoriesrepositoryidapache.snapshots/idnameApache Development Snapshot Repository/nameurlhttps://repository.apache.org/content/repositories/snapshots//urlreleasesenabledfalse/enabled/releasessnapshotsenabledtrue/enabled/snapshots/repository/repositoriesdependencies!-- Apache Flink dependencies --!-- These dependencies are provided, because they should not be packaged into the JAR file. --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactIdversion${flink.version}/versionscopeprovided/scope/dependency !--dependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java_${scala.binary.version}/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients_${scala.binary.version}/artifactIdversion${flink.version}/versionscopeprovided/scope/dependency --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-files/artifactIdversion${flink.version}/version/dependency!-- Add connector dependencies here. They must be in the default scope (compile). --!-- Example:dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion${flink.version}/version/dependency--!-- Add logging framework, to produce console output when running in the IDE. --!-- These dependencies are excluded from the application JAR by default. --dependencygroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-slf4j-impl/artifactIdversion${log4j.version}/versionscoperuntime/scope/dependencydependencygroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-api/artifactIdversion${log4j.version}/versionscoperuntime/scope/dependencydependencygroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-core/artifactIdversion${log4j.version}/versionscoperuntime/scope/dependency/dependenciesbuildplugins!-- Java Compiler --plugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-compiler-plugin/artifactIdversion3.1/versionconfigurationsource${target.java.version}/sourcetarget${target.java.version}/target/configuration/plugin!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. --!-- Change the value of mainClass.../mainClass if your program entry point changes. --plugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-shade-plugin/artifactIdversion3.1.1/versionexecutions!-- Run shade goal on package phase --executionphasepackage/phasegoalsgoalshade/goal/goalsconfigurationcreateDependencyReducedPomfalse/createDependencyReducedPomartifactSetexcludesexcludeorg.apache.flink:flink-shaded-force-shading/excludeexcludecom.google.code.findbugs:jsr305/excludeexcludeorg.slf4j:*/excludeexcludeorg.apache.logging.log4j:*/exclude/excludes/artifactSetfiltersfilter!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --artifact*:*/artifactexcludesexcludeMETA-INF/*.SF/excludeexcludeMETA-INF/*.DSA/excludeexcludeMETA-INF/*.RSA/exclude/excludes/filter/filterstransformerstransformerimplementationorg.apache.maven.plugins.shade.resource.ServicesResourceTransformer/transformerimplementationorg.apache.maven.plugins.shade.resource.ManifestResourceTransformermainClasscom.ye.DataStreamJob/mainClass/transformer/transformers/configuration/execution/executions/plugin/pluginspluginManagementplugins!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. --plugingroupIdorg.eclipse.m2e/groupIdartifactIdlifecycle-mapping/artifactIdversion1.0.0/versionconfigurationlifecycleMappingMetadatapluginExecutionspluginExecutionpluginExecutionFiltergroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-shade-plugin/artifactIdversionRange[3.1.1,)/versionRangegoalsgoalshade/goal/goals/pluginExecutionFilteractionignore//action/pluginExecutionpluginExecutionpluginExecutionFiltergroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-compiler-plugin/artifactIdversionRange[3.1,)/versionRangegoalsgoaltestCompile/goalgoalcompile/goal/goals/pluginExecutionFilteractionignore//action/pluginExecution/pluginExecutions/lifecycleMappingMetadata/configuration/plugin/plugins/pluginManagement/build /project 三、自定义数据源 使用 Timer 定时任务当然也可以使用线程池 Executors自定义数据源每过五秒随机生成一串字符串 public class TimerSinkRich extends RichSourceFunctionString {private ConcurrentLinkedQueueString queue new ConcurrentLinkedQueue();private boolean flag true;private Timer timer;private TimerTask timerTask;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);timerTask new TimerTask() {Overridepublic void run() {// 可以在这块获取 MySQL、redis 等连接并查询数据Random random new Random();StringBuilder str new StringBuilder();for (int i 0; i 10; i) {char ranLowLetter (char) ((random.nextInt(26) 97));str.append(ranLowLetter);}queue.add(str.toString());}};timer new Timer();// 延时和执行周期参数可以通过构造方法传递timer.schedule(timerTask,1000,5000);}Overridepublic void run(SourceContextString ctx) throws Exception {while (flag){if(queue.size()0){ctx.collect(queue.remove());}}}Overridepublic void cancel() {if(null!timer) timer.cancel();if(null!timerTask) timerTask.cancel();// 撤销任务时flink 默认 30 s不同 flink 版本可能不同尝试关闭数据源关闭失败 TaskManager 不能释放 slot最终导致失败if(queue.size()0) flag false;} }四、flink 加载数据源并启动 public class TimerSinkStreamJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment executionEnvironment StreamExecutionEnvironment.getExecutionEnvironment();executionEnvironment.setParallelism(1);DataStreamSourceString streamSource executionEnvironment.addSource(new TimerSinkRich());streamSource.print();executionEnvironment.execute(TimerSinkStreamJob 定时任务打印数据);} }本地测试成功 五、上传 flink 集群 1、flink 1.16.0 启动成功 撤销任务成功 solt 也成功释放 2、flink 1.12.7 启动成功 撤销任务当然也没问题同样能正常释放 slot 当然你也可以不要 open() 方法 public class DiySinkRich extends RichSourceFunctionString {private TimerTask timerTask;private Timer timer;private boolean flag true;private ConcurrentLinkedQueueString queue new ConcurrentLinkedQueue();Overridepublic void run(SourceFunction.SourceContextString ctx) throws Exception {timerTask new TimerTask() {Overridepublic void run() {Random random new Random();StringBuilder str new StringBuilder();for (int i 0; i 10; i) {char ranLowLetter (char) ((random.nextInt(26) 97));str.append(ranLowLetter);}queue.add(str.toString());}};timer new Timer();timer.schedule(timerTask, 1000, 5000);while (flag) {if (queue.size() 0) {ctx.collect(queue.remove());}}}Overridepublic void cancel() {if (timer ! null) timer.cancel();if (timerTask ! null) timerTask.cancel();if (queue.size() 0) flag false;} }以上就是 flink 定时加载数据源的简单实例
http://www.hkea.cn/news/14489471/

相关文章:

  • 本科学计算机是做网站吗做微网站的公司哪家好
  • 如何建个人网站教程公司网站优化推广方案
  • 中国建设银行亚洲网站专业的营销型网站定制
  • 摄影网站建设论文WordPress小工具是什么
  • 网站内容建设包括设计公司企业站
  • 济南网站建设公司哪个好点呢株洲关键词优化
  • 个人网站好备案吗PHP网站建设的课后笔记
  • 南宁网站定制易网
  • 济南网站建设yigeseo哈尔滨免费做网站
  • 山东网站建设服务网站建设的 关键词
  • 网站如何seo网站开发调查问卷题
  • 建设网站哪些公司好北京手机模板建站
  • 成都网站设计公司价格基于jsp的网站建设论文
  • 廊坊网站建站网站wordpress微信 群发
  • 寮步营销型网站建设价格创意设计提案
  • 展示类网站cms苏州做网站推广的公司哪家好
  • 建设银行360网站登录不了深圳设计网站排名
  • 景德镇网站维护电子商务网站建设规划
  • 淘宝网站建设的主要工作设计素材网站大全网站
  • 网站设计远程培训设计页面ui
  • 旅游网站建设的建议怎么用txt做网站
  • 网站建设的目标客户分析wordpress抽奖主题
  • 温州免费建站模板企业网是什么意思
  • 上海网站设计印刷拆除wordpress分类模板
  • 网站开发 质保金微信内转发的网页怎么制作
  • 青岛市工程建设信息网站建设领域工人管理网站
  • 网站红色济南网站建设和维护
  • 淘特app推广代理企业站seo点击软件
  • 永康网站建设公司数学很差能学计算机吗
  • 个人网站建设的流程建设部职业资格注册网站