优速网站建设,网站建设飠金手指科杰十二,做宠物网站心得,做网站的公司 贵阳一、说明
如果Flink没有提供给我们可以直接使用的连接器#xff0c;那我们如果想将数据存储到我们自己的存储设备中#xff0c;mysql 的安装使用请参考 mysql-玩转数据-centos7下mysql的安装 创建表
CREATE TABLE sensor (id int(10)
) ENGINEInnoDB DEFAULT CHARSETutf8二…一、说明
如果Flink没有提供给我们可以直接使用的连接器那我们如果想将数据存储到我们自己的存储设备中mysql 的安装使用请参考 mysql-玩转数据-centos7下mysql的安装 创建表
CREATE TABLE sensor (id int(10)
) ENGINEInnoDB DEFAULT CHARSETutf8二、pom.xml 导入驱动
dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion8.0.33/version
/dependency三、编写程序
package com.lyh.flink06;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;public class SinkMysql {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSourceInteger dataStreamSource env.fromElements(1, 2, 3, 4, 5, 6);KeyedStreamInteger, Integer keyedStream dataStreamSource.keyBy(new KeySelectorInteger, Integer() {Overridepublic Integer getKey(Integer value) throws Exception {return value.intValue();}});keyedStream.addSink(new MysqlSink());env.execute();}public static class MysqlSink extends RichSinkFunctionInteger{private Connection sunbo;Overridepublic void open(Configuration parameters) throws Exception {Class.forName(com.mysql.cj.jdbc.Driver);sunbo DriverManager.getConnection(jdbc:mysql://192.168.220.100:3306/test?useSSLfalse, sunbo, Mysql123456#);}Overridepublic void close() throws Exception {if (sunbo ! null) {sunbo.close();}}Overridepublic void invoke(Integer value, Context context) throws Exception {String sql insert into sensor(id)values(?);PreparedStatement ps sunbo.prepareStatement(sql);ps.setInt(1,value.intValue());ps.execute();ps.close();}}
}四、运行测试