当前位置: 首页 > news >正文

网站编程好学吗个人养老金制度将推

网站编程好学吗,个人养老金制度将推,网站建设先做后付费,市场营销专业就业方向FlinkSQL_1.12_用DDL实现Kafka到MySQL的数据传输_实现按照条件进行过滤写入MySQL_flink从kafka拉取数据并过滤数据写入mysql_旧城里的阳光的博客-CSDN博客 参考这篇文章#xff0c;写了kafka到mysql的代码例子#xff0c;因为自己改了表结构#xff0c;运行下面代码#x… FlinkSQL_1.12_用DDL实现Kafka到MySQL的数据传输_实现按照条件进行过滤写入MySQL_flink从kafka拉取数据并过滤数据写入mysql_旧城里的阳光的博客-CSDN博客 参考这篇文章写了kafka到mysql的代码例子因为自己改了表结构运行下面代码 package org.test.flink;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;//TODO 用DDL实现Kafka到MySQL的数据传输 public class FlinkSQL15_SQL_DDL_Kafka_MySQL {public static void main(String[] args) throws Exception {//1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);//2.使用DDL的方式加载数据--注册SourceTabletableEnv.executeSql(create table source_sensor(account_id BIGINT) with ( connector.type kafka, connector.version universal, connector.topic testtopic, connector.properties.bootstrap.servers 11.0.24.216:9092, connector.properties.group.id bigdata1109, format.type json ));Table table tableEnv.sqlQuery(select * from source_sensor);//3.注册SinkTable:MysqltableEnv.executeSql(CREATE TABLE spend_report (\n account_id BIGINT,\n PRIMARY KEY (account_id) NOT ENFORCED) with ( connector jdbc, url jdbc:mysql://11.0.24.216:4306/test?serverTimezoneAsia/ShanghaiuseUnicodetruecharacterEncodingutf8useSSLfalse,table-name spend_report,username root,password 123456 ));//4.执行查询kafka数据 // Table source_sensor tableEnv.from(source_sensor); // //5.将数据写入Mysql // source_sensor.executeInsert(sink_sensor); //table.executeInsert(sink_sensor);//6.执行任务env.execute();} } 发现报错如下 Exception in thread main org.apache.flink.table.api.TableException: Sink default_catalog.default_database.sink_sensor does not existsat org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:247)at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:159)at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)at scala.collection.Iterator.foreach(Iterator.scala:943)at scala.collection.Iterator.foreach$(Iterator.scala:943)at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)at scala.collection.IterableLike.foreach(IterableLike.scala:74)at scala.collection.IterableLike.foreach$(IterableLike.scala:73)at scala.collection.AbstractIterable.foreach(Iterable.scala:56)at scala.collection.TraversableLike.map(TraversableLike.scala:286)at scala.collection.TraversableLike.map$(TraversableLike.scala:279)at scala.collection.AbstractTraversable.map(Traversable.scala:108)at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:159)at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:554)at org.test.flink.FlinkSQL15_SQL_DDL_Kafka_MySQL.main(FlinkSQL15_SQL_DDL_Kafka_MySQL.java:50) 点击table.executeInsert看了下源码 /*** Writes the {link Table} to a {link TableSink} that was registered under the specified path,* and then execute the insert operation.** pSee the documentation of {link TableEnvironment#useDatabase(String)} or {link* TableEnvironment#useCatalog(String)} for the rules on the path resolution.** pA batch {link Table} can only be written to a {code* org.apache.flink.table.sinks.BatchTableSink}, a streaming {link Table} requires a {code* org.apache.flink.table.sinks.AppendStreamTableSink}, a {code* org.apache.flink.table.sinks.RetractStreamTableSink}, or an {code* org.apache.flink.table.sinks.UpsertStreamTableSink}.** pExample:** pre{code* Table table tableEnv.fromQuery(select * from MyTable);* TableResult tableResult table.executeInsert(MySink);* tableResult...* }/pre** param tablePath The path of the registered TableSink to which the Table is written.* return The insert operation execution result.*/TableResult executeInsert(String tablePath); 发现executeInsert方法的参数tablePath需要传入表名这里的表名应该和 tableEnv.executeSql(create table source_sensor(account_id  BIGINT) 的表名source_sensor一致。 将 table.executeInsert(sink_sensor); 改成 table.executeInsert(source_sensor); 后执行成功。 flink1.2的demo完整代码flink-java-1.12.7: flink1.12.7的java demo包括flink wordcount示例如何连接kafka
http://www.hkea.cn/news/14330222/

相关文章:

  • 站长之家seo工具化妆品 网站建设案例
  • 网站开发项目具体的流程网站页面设计公司电话
  • 东莞宣传网站python基础教程for循环
  • 无锡网站建设 百家号公司企业网站程序下载
  • 品牌网站应该怎么做厦门做网站多
  • 内江 网站建设网站系统管理员模块
  • 青海网站建设优化花生壳网站无法登陆
  • 备案成功后怎么建设网站旅行社的网站建设
  • 桂林旅游网站制作阿里云无主体新增网站
  • 城市建设网站鹤岗市连云港优化推广
  • 南充建网站的资料长沙网站定制建设
  • 贵州省建设厅报名网站域名服务网站建设科技公司
  • 常用的网页编辑软件有哪些网站自然优化自学
  • 网站建设的好处有什么用wordpress怎样弄pdf
  • 网站整体优化产品开发流程8个步骤的总结
  • 亚马逊电商网站银川seo
  • 营销型网站建设需要注意什么网站建设方案的摘要
  • 网站建设免费课程怎么做网站备份
  • 奉贤做网站站长统计在线观看
  • 深圳 网站设拼多多网站怎么做
  • 手机网站后台源码自己做培训网站
  • 手机社交网站模板长春建工集团官网
  • wordpress手机版安装wordpress seo教程
  • 图书拍卖网站开发遇到的问题微信里的小程序怎么制作方法
  • 网站优化的作业及意义微信端微网站怎么做
  • php做的网站怎么运行wordpress.org配置
  • 做一个网站的流程oppo软件商店
  • 在百度里面做个网站怎么做的个人网站如何制作教程
  • 十堰市住房和城乡建设厅官方网站张家界网络营销
  • 邢台企业做网站价格中核五公司是国企还是央企