当前位置: 首页 > news >正文

个人能建设网站吗王也高清壁纸第三季

个人能建设网站吗,王也高清壁纸第三季,浙江品牌网站建设,河北网站建设推广电话水善利万物而不争#xff0c;处众人之所恶#xff0c;故几于道#x1f4a6; 目录 1. 从Java的集合中读取数据 2. 从本地文件中读取数据 3. 从HDFS中读取数据 4. 从Socket中读取数据 5. 从Kafka中读取数据 6. 自定义Source 官方文档 - Flink1.13 1. 从Java的集合中读取数据 … 水善利万物而不争处众人之所恶故几于道 目录 1. 从Java的集合中读取数据 2. 从本地文件中读取数据 3. 从HDFS中读取数据 4. 从Socket中读取数据 5. 从Kafka中读取数据 6. 自定义Source 官方文档 - Flink1.13 1. 从Java的集合中读取数据 fromCollection(waterSensors) public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ListWaterSensor waterSensors Arrays.asList(new WaterSensor(ws_001, 1577844001L, 45),new WaterSensor(ws_002, 1577844015L, 43),new WaterSensor(ws_003, 1577844020L, 42));env.fromCollection(waterSensors).print();try {env.execute();} catch (Exception e) {e.printStackTrace();} }运行结果 2. 从本地文件中读取数据 readTextFile(“input/words.txt”)支持相对路径和绝对路径 public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.readTextFile(input/words.txt).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}运行结果 3. 从HDFS中读取数据 readTextFile(“hdfs://hadoop101:8020/flink/data/words.txt”) 要先在pom文件中添加hadoop-client依赖 dependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-client/artifactIdversion3.1.3/version /dependencypublic static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.readTextFile(hdfs://hadoop101:8020/flink/data/words.txt).print();try {env.execute();} catch (Exception e) {e.printStackTrace();} }运行结果 4. 从Socket中读取数据 socketTextStream(“hadoop101”,9999)这个输入源不支持多个并行度。 public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);//从端口中读数据 windows中 nc -lp 9999 Linux nc -lk 9999env.socketTextStream(hadoop101,9999).print();try {env.execute();} catch (Exception e) {e.printStackTrace();} }运行结果 5. 从Kafka中读取数据 addSource(new FlinkKafkaConsumer(“flink_source_kafka”,new SimpleStringSchema(),properties)) 第一个参数是topic 第二个参数是序列化器序列化器就是在Kafka和flink之间转换数据 - 官方注释The de-/serializer used to convert between Kafka’s byte messages and Flink’s objects.反-序列化程序用于在Kafka的字节消息和Flink的对象之间进行转换。 第三个参数是Kafka的配置。 public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);Properties properties new Properties();// 设置集群地址properties.setProperty(bootstrap.servers, hadoop101:9092,hadoop102:9092,hadoop103:9092);// 设置所属消费者组properties.setProperty(group.id, flink_consumer_group);env.addSource(new FlinkKafkaConsumer(flink_source_kafka,new SimpleStringSchema(),properties)).print();try {env.execute();} catch (Exception e) {e.printStackTrace();} }运行结果 6. 自定义Source addSource(new XXXX()) 大多数情况下前面的数据源已经能够满足需要但是难免会存在特殊情况的场合所以flink也提供了能自定义数据源的方式. public class Flink06_myDefDataSource {public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.addSource(new RandomWatersensor()).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}} }自定义数据源需要定义一个类然后实现SourceFunction接口然后实现其中的两个方法run和cancelrun方法包含具体读数据的逻辑当调用cancel方法的时候应该可以让run方法中的读数据逻辑停止 public class RandomWatersensor implements SourceFunctionWaterSensor {private Boolean running true;Overridepublic void run(SourceContextWaterSensor sourceContext) throws Exception {Random random new Random();while (running){sourceContext.collect(new WaterSensor(sensor random.nextInt(50),Calendar.getInstance().getTimeInMillis(),random.nextInt(100)));Thread.sleep(1000);}}/*** 大多数的source在run方法内部都会有一个while循环,* 当调用这个方法的时候, 应该可以让run方法中的while循环结束*/Overridepublic void cancel() {running false;}}运行结果 demo2 - 自定义从socket中读取数据 public class Flink04_Source_Custom {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new MySource(hadoop102, 9999)).print();env.execute();}public static class MySource implements SourceFunctionWaterSensor {private String host;private int port;private volatile boolean isRunning true;private Socket socket;public MySource(String host, int port) {this.host host;this.port port;}Overridepublic void run(SourceContextWaterSensor ctx) throws Exception {// 实现一个从socket读取数据的sourcesocket new Socket(host, port);BufferedReader reader new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));String line null;while (isRunning (line reader.readLine()) ! null) {String[] split line.split(,);ctx.collect(new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])));}}/*** 大多数的source在run方法内部都会有一个while循环,* 当调用这个方法的时候, 应该可以让run方法中的while循环结束*/Overridepublic void cancel() {isRunning false;try {socket.close();} catch (IOException e) {e.printStackTrace();}}} } /* sensor_1,1607527992000,20 sensor_1,1607527993000,40 sensor_1,1607527994000,50*/
http://www.hkea.cn/news/14312449/

相关文章:

  • 好看欧美视频网站模板下载 迅雷下载地址如何建立一个自己的网站啊
  • 域名做违法网站网站搭建官网
  • 东阿网站制作买表的网站
  • 被称为网页制作三剑客的是谷歌seo软件
  • 上海网站开发工程师企业画册模板
  • 做网站推广的方法有哪些给别人做网站必须有icp
  • 外国风格网站建设用途一个软件的制作过程
  • 做网站需要什么 图片视频泉州市住房和城乡建设局
  • 虫点子创意设计公司seo企业网站源码
  • 怎么做网站宣传转入已备案网站
  • 公司网站表达的内容企业营销型网站设计
  • 平台型网站如何推广微网站怎么注册账号
  • 国家企业信用公示信息系统(安徽)医疗网站怎么做优化
  • cnzz网站建设百度广告投放
  • 网站主持人制作方法微信开发者工具教程实例
  • 音乐网站程序源码wordpress的cdn缓存
  • 如何做com的网站wordpress引用js插件
  • 软件定制一般价格百度seo快速排名
  • 网站建设证据保全广告设计是干嘛的
  • 济南住房与城乡建设局网站小规模公司自学做账
  • 深圳企业网站制作企业58同城黄页推广
  • 备案网站 cdn网站怎么添加js广告位
  • 广西建设教育网站网络营销方式分析与对比
  • 网站技术团队网页浏览器哪个好
  • 个人网站设计说明手机分销网站公司
  • 设计本网站是用什么做的官方小程序开发
  • 建设专业网站哪家技术好wordpress中文摘要
  • 建站哪个网站好网络培训心得体会5篇
  • 国际教育机构网站建设开发方案免费申请靓号
  • 衡阳退休职工做面膜网站sem和seo是什么职位