苗木网站什么做,莱芜都市网征婚,wordpress设置文章第一张,南通通州建设工程质量监督网站你好#xff0c;我是 shengjk1#xff0c;多年大厂经验#xff0c;努力构建 通俗易懂的、好玩的编程语言教程。 欢迎关注#xff01;你会有如下收益#xff1a;
了解大厂经验拥有和大厂相匹配的技术等
希望看什么#xff0c;评论或者私信告诉我#xff01; 文章目录 一…你好我是 shengjk1多年大厂经验努力构建 通俗易懂的、好玩的编程语言教程。 欢迎关注你会有如下收益
了解大厂经验拥有和大厂相匹配的技术等
希望看什么评论或者私信告诉我 文章目录 一、前言二、Flink 代码优化2.0 问题发现2.1 原有代码2.2 CompletableFuture 优化 三、avatorscript 使用的简单介绍3.1 自定义函数3.2 从 Map 中取值3.3 使用 Java 的工具类3.4 AviatorScript 函数 四、总结 一、前言
目前 Flink 利用 avatorscript 脚本语言来做到规则的自动化更新。avatorscript将表达式直接翻译成对应的 java 字节码执行,所以在大数据量的情况下自然而然这里就成为了瓶颈
二、Flink 代码优化
2.0 问题发现 通过 Flink UI 发现 window 算子是瓶颈而 window 算子的核心就是 avatorscript 表达式
2.1 原有代码
xxx
AviatorEvaluator.execute(columnFunction, dataView.getProperties(), true);
xxx经过测试平均执行时间在1毫秒以内但经不住数据量大所以Flink QPS一直在 11w 左右
2.2 CompletableFuture 优化
xxx
ListCompletableFuture executeFuturesnew ArrayList();CompletableFutureObject executeFuture CompletableFuture.supplyAsync(() - {return AviatorEvaluator.execute(columnFunction, dataView.getProperties(), true);});
executeFutures.add(executeFuture);for (int i 0; i executeFutures.size(); i) {executeFutures.get(i).get()xxxx
}修改完上线后Flink QPS 有原来 11W 增加到 17W 左右
三、avatorscript 使用的简单介绍
为了让你更容易理解 avatorscript这里我们也可以先简单的介绍一下:
3.1 自定义函数
class AddFunction extends AbstractFunction {Overridepublic AviatorObject call(MapString, Object env,AviatorObject arg1, AviatorObject arg2) {Number left FunctionUtils.getNumberValue(arg1, env);Number right FunctionUtils.getNumberValue(arg2, env);return new AviatorDouble(left.intValue() right.intValue());}public String getName() {return add ;}
}public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {//注册函数AviatorEvaluator.addFunction(new AddFunction());System.out.println(AviatorEvaluator.execute( add(2,1) ));
}3.2 从 Map 中取值
public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {//注册函数AviatorEvaluator.addFunction(new AddFunction());HashMapString, Object stringObjectHashMap new HashMap();stringObjectHashMap.put( testId1 , 1);stringObjectHashMap.put( testId2 , 2);Object execute AviatorEvaluator.execute( add(testId1,testId2) , stringObjectHashMap);3.3 使用 Java 的工具类
public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {HashMapString, Object stringObjectHashMap new HashMap();stringObjectHashMap.put( ip , a1111 );// stringObjectHashMap.put(result, aBCd);stringObjectHashMap.put( voucher_endtime , 2022.03.02 11:32 );stringObjectHashMap.put( imei2 , v1aaaaaa1 );stringObjectHashMap.put( testId , v1ot_service_quality_1111 );stringObjectHashMap.put( testId1 , sku );stringObjectHashMap.put( a , 123 );stringObjectHashMap.put( a1 , null );stringObjectHashMap.put( b1 , 123);AviatorEvaluator.addStaticFunctions( doubleStatic , Double.class);AviatorEvaluator.addInstanceFunctions( doubleInstance , Double.class)execute2 AviatorEvaluator.execute( (doubleStatic.valueOf(sys_net_bandwidth)) , stringObjectHashMap);System.out.println(execute2);execute2 AviatorEvaluator.execute( doubleInstance.longValue(doubleStatic.valueOf(sys_net_bandwidth)) , stringObjectHashMap);System.out.println( ### execute2);execute2 AviatorEvaluator.execute( doubleInstance.longValue(doubleStatic.valueOf(str(voucher))) , stringObjectHashMap);3.4 AviatorScript 函数
## examples/function.av
fn add(x, y) {return x y;
}
p(add(1,2))public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {String function ## examples/function.av\n \n fn add(x, y) {\n return x y;\n } ;AviatorEvaluator.defineFunction( add , function);System.out.println( defineFunction6666 AviatorEvaluator.execute( add(1,2) , stringObjectHashMap));
}四、总结
本文主要介绍了 Flink 中使用 avatorscript 脚本语言的问题以及如何通过 CompletableFuture 优化代码来提高 Flink QPS。同时还介绍了 avatorscript 的使用方法包括自定义函数、从 Map 中取值、使用 Java 工具类和 AviatorScript 函数。通过本文的介绍读者可以更好地了解 Flink 中 avatorscript 的使用方法以及如何优化代码来提高 Flink QPS。