当前位置: 首页 > news >正文

建设与管理局网站新产品推广方案范文

建设与管理局网站,新产品推广方案范文,资源类网站怎么做,自己做的视频网站上传电影重试机制 当任务出现异常的时候,会直接停止任务——解决方式,重试机制 1、设置checkpoint后,会给任务一个重启策略——无限重启 2、可以手动设置任务的重启策略 代码设置 //开启checkpoint后,默认是无限重启,可以…

重试机制

当任务出现异常的时候,会直接停止任务——解决方式,重试机制

1、设置checkpoint后,会给任务一个重启策略——无限重启

2、可以手动设置任务的重启策略

代码设置

//开启checkpoint后,默认是无限重启,可以设置该值 表示不重启
env.setRestartStrategy(RestartStrategies.noRestart());//作业失败flink中最多重启3次,每次重启的最小间隔是10s
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));//2分钟内最多重启3次,每次重启的最小间隔是5秒
env.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(2,TimeUnit.MINUTES),Time.of(5,TimeUnit.SECONDS))
);//无限重启
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE,  // 无限重启次数Time.of(10, TimeUnit.SECONDS)  // 每次重启的延迟时间
));

维表join

所谓的维表Join: 进入Flink的数据,需要关联另外一些存储设备的数据,才能计算出来结果

那么存储在外部设备上的表称之为维表,可能存储在mysql也可能存储在hbase 等。

维表一般的特点是变化比较慢。——名词表,维度表。

解决方式

 解决维表join的方式方式一:可以用一个静态代码块,或者在open方法中对一个集合初始化,用于存放想要相关联的数据。缺点:数据不能动态改变了方式二:在open中初始化连接,在map中每拿到流中的一条数据,就去mysql中查找一次缺点:数据可以动态改变,但是去mysql查找的次数太多了方式三:创建一个缓存区,用于存放数据,若过期则再去mysql中查询数据。没有缺点,可以动态获取数据了,也减少了mysql的查询次数(缓冲)唯一的是,若是多线程,可能会去mysql查询多次

方式一

package com.bigdata.day06;import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.MapHandler;
import org.apache.commons.dbutils.handlers.MapListHandler;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.List;
import java.util.Map;
import java.util.Properties;/*** 直接从mysql中拿出* 弊端 只能拿到一次 不能实现动态*/
public class _03_维表join_01 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", "bigdata01:9092");properties.setProperty("group.id", "g1");FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("edu",new SimpleStringSchema(),properties);DataStreamSource<String> source = env.addSource(consumer);source.map(new RichMapFunction<String, String>() {ComboPooledDataSource pool = null;QueryRunner queryRunner = null;List<Map<String, Object>> list = null;@Overridepublic void open(Configuration parameters) throws Exception {// 在open中执行sqlpool = new ComboPooledDataSource();queryRunner = new QueryRunner(pool);String sql = "select * from city ";list = queryRunner.query(sql, new MapListHandler());}@Overridepublic void close() throws Exception {pool.close();}@Overridepublic String map(String line) throws Exception {String[] split = line.split(",");Object cityName = "未知";for (Map<String, Object> map : list) {String cityId = (String)map.get("city_id");if (cityId.equals(split[1])){cityName = map.get("city_name");}}return line+","+cityName;}}).print();env.execute();}
}

方式二

package com.bigdata.day06;import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.MapHandler;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Map;
import java.util.Properties;/*** 每次从kafka中拿到一条数据就从mysql中查一遍* 弊端 对mysql的压力加大*/
public class _03_维表join_02 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", "bigdata01:9092");properties.setProperty("group.id", "g1");FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("edu",new SimpleStringSchema(),properties);DataStreamSource<String> source = env.addSource(consumer);source.map(new RichMapFunction<String, String>() {ComboPooledDataSource pool = null;QueryRunner queryRunner = null;@Overridepublic void open(Configuration parameters) throws Exception {pool = new ComboPooledDataSource();queryRunner = new QueryRunner(pool);}@Overridepublic void close() throws Exception {pool.close();}@Overridepublic String map(String line) throws Exception {// 在处理逻辑中执行sqlString[] split = line.split(",");String sql = "select city_name from city where city_id = ?";Map<String, Object> rs = queryRunner.query(sql, new MapHandler(), split[1]);String cityName="未知";if (rs !=null){cityName = (String) rs.get("city_name");}return line+","+cityName;}}).print();env.execute();}
}

方式三

package com.bigdata.day06;import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.MapHandler;
import org.apache.commons.dbutils.handlers.MapListHandler;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.cache.*;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;/*** 最终 非常好的方式* 现在内存中查 查不到在去mysql中找* 唯一的问题是,假如是多线程情况下,可能会触发多次去mysql中查找的方法*/
public class _03_维表join_03_cache {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", "bigdata01:9092");properties.setProperty("group.id", "g1");FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("edu",new SimpleStringSchema(),properties);DataStreamSource<String> source = env.addSource(consumer);// 记得设置并行度env.setParallelism(1);source.map(new RichMapFunction<String, String>() {ComboPooledDataSource pool = null;QueryRunner queryRunner = null;// 定义一个Cache// 第一个是传入的参数类型 第二个是存放的值的类型// 也就是,传入一个参数,根据这个值获取结果,拿的时候通过传入的值 拿存放的值LoadingCache<String, String> cache;@Overridepublic void open(Configuration parameters) throws Exception {pool = new ComboPooledDataSource();queryRunner = new QueryRunner(pool);cache = CacheBuilder.newBuilder()//最多缓存个数,超过了就根据最近最少使用算法来移除缓存 LRU.maximumSize(1000)//在更新后的指定时间后就回收// 不会自动调用,而是当过期后,又用到了过期的key值数据才会触发的。.expireAfterWrite(50, TimeUnit.SECONDS)//指定移除通知.removalListener(new RemovalListener<String, String>() {@Overridepublic void onRemoval(RemovalNotification<String, String> removalNotification) {System.out.println(removalNotification.getKey() + "被移除了,值为:" + removalNotification.getValue());}}).build(//指定加载缓存的逻辑new CacheLoader<String, String>() {// 假如缓存中没有数据,会触发该方法的执行,并将结果自动保存到缓存中@Overridepublic String load(String cityId) throws Exception {String sql = "select city_name from city where city_id = ? ";Map<String, Object> rs = queryRunner.query(sql, new MapHandler(), cityId);String cityName = null;if (rs!=null){cityName = (String) rs.get("city_name");}System.out.println("进入数据库查询成功,查询的值为"+cityId+"--"+cityName);return cityName;}});}@Overridepublic void close() throws Exception {pool.close();}@Overridepublic String map(String line) throws Exception {String[] arr = line.split(",");// 使用这种方式取值String cityName = cache.get(arr[1]);return line+","+cityName;}}).print();env.execute();}
}

http://www.hkea.cn/news/729849/

相关文章:

  • 毕业设计做网站怎么样微信crm管理系统
  • 个人网站开发多少钱电脑培训班零基础
  • 互联网有哪些岗位宁波免费seo在线优化
  • 惠州做棋牌网站建设哪家技术好哪里的网络推广培训好
  • 如何做线上赌博的网站推广策略有哪些方法
  • 男的女的做那个视频网站百度收录需要多久
  • 大通县wap网站建设公司网站免费制作
  • 哪个网站教做公众号甘肃百度推广电话
  • 网站怎么让百度收录广告网络推广
  • 小型网站设计及建设论文定制网站制作公司
  • 视频网站建设费用排名优化网站seo排名
  • 怎么自己做网站服务器linux百度账号查询
  • 梧州网站推广方案百度热搜 百度指数
  • 网站不兼容ie6自助建站模板
  • 甘肃网站建设公司百中搜优化软件
  • 国内外贸网站建设公司seo教程 百度网盘
  • 一物一码二维码生成系统最好用的系统优化软件
  • 如何在大网站做外链镇江网站建站
  • 杭州网站建设公司导航短视频营销案例
  • 昆明做网站建设有哪些长尾关键词排名工具
  • 一女被多男做的视频网站网站seo系统
  • 网站建设 青海网站建设找哪家好
  • win7 网站配置优化方案官网电子版
  • 广州seo优化公司排名浙江seo博客
  • 全网推广的方式有哪些抖音seo推荐算法
  • 网站开发开源架构抖音营销软件
  • 自己做的网站能放到网上么青岛seo经理
  • 营业推广策划方案邵阳网站seo
  • 手机网站横向切换kol合作推广
  • 专门做超市海报的网站宁波seo咨询