沭阳建设局网站,祁东网站设计公司,离婚协议书模板免费下载,济南建设网站制作优化分析2.1 官方案例运行
运行官方提供案例#xff0c;使用【$SPARK_HOME/bin/run-example】命令运行#xff0c;效果如下#xff1a;
具体步骤如下#xff1a;
第一步、准备数据源启动端口#xff0c;准备数据 nc -lk 9999 spark spark hive hadoop spark hive 第二步、运行…2.1 官方案例运行
运行官方提供案例使用【$SPARK_HOME/bin/run-example】命令运行效果如下
具体步骤如下
第一步、准备数据源启动端口准备数据 nc -lk 9999 spark spark hive hadoop spark hive 第二步、运行官方案例
使用官方提供命令行运行案例
# 官方入门案例运行词频统计
/export/server/spark/bin/run-example --master local[2] streaming.NetworkWordCount node1.itcast.cn 9999 第三步、运行结果 SparkStreaming模块对流式数据处理介于Batch批处理和RealTime实时处理之间处理数据方式。
2.2 编程实现
基于IDEA集成开发环境编程实现从TCP Socket实时读取流式数据对每批次中数据进行词频统计WordCount。
StreamingContext 回顾SparkCore和SparkSQL及SparkStreaming处理数据时编程
1、SparkCore
数据结构RDDSparkContext上下文实例对象
2、SparkSQL
数据结构Dataset/DataFrame RDD SchemaSparkSession会话实例对象 在Spark 1.x中SQLContext/HiveContext
3、SparkStreaming
数据结构DStream Seq[RDD]StreamingContext流式上下文实例对象底层还是SparkContext参数划分流式数据时间间隔BatchInterval1s5s演示
文档http://spark.apache.org/docs/2.2.0/streaming-programming-guide.html#initializing-streamingcontext 从官方文档可知提供两种方式构建StreamingContext实例对象截图如下
第一种方式构建SparkConf对象 第二种方式构建SparkContext对象 编写代码 针对SparkStreaming流式应用来说代码逻辑大致如下五个步骤 1、Define the input sources by creating input DStreams. 定义从哪个数据源接收流式数据封装到DStream中 2、Define the streaming computations by applying transformation and output operations to DStreams. 针对业务调用DStream中函数进行数据处理和输出 3、Start receiving data and processing it using streamingContext.start(). 4 、 Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination(). 5、The processing can be manually stopped using streamingContext.stop(). 启动流式应用并且一直等待程序终止人为或异常最后停止运行 完整StreamingWordCount代码如下所示
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 基于IDEA集成开发环境编程实现从TCP Socket实时读取流式数据对每批次中数据进行词频统计。
*/
object StreamingWordCount {
def main(args: Array[String]): Unit {
// TODO: 1. 构建StreamingContext流式上下文实例对象
val ssc: StreamingContext {
// a. 创建SparkConf对象设置应用配置信息
val sparkConf new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix($))
.setMaster(local[3])
// b.创建流式上下文对象, 传递SparkConf对象TODO: 时间间隔 - 用于划分流式数据为很多批次Batch
val context new StreamingContext(sparkConf, Seconds(5))
// c. 返回
context
}
// TODO: 2. 从数据源端读取数据此处是TCP Socket读取数据
/*
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String]
*/
val inputDStream: ReceiverInputDStream[String] ssc.socketTextStream(
node1.itcast.cn, 9999
)
// TODO: 3. 对每批次的数据进行词频统计
val resultDStream: DStream[(String, Int)] inputDStream
// 过滤不合格的数据
.filter(line null ! line line.trim.length 0)
// 按照分隔符划分单词
.flatMap(line line.trim.split(\\s))
// 转换数据为二元组表示每个单词出现一次
.map(word (word, 1))
// 按照单词分组聚合统计
.reduceByKey((tmp, item) tmp item)
// TODO: 4. 将结果数据输出 - 将每批次的数据处理以后输出
resultDStream.print(10)
// TODO: 5. 对于流式应用来说需要启动应用
ssc.start()
// 流式应用启动以后正常情况一直运行接收数据、处理数据和输出数据除非人为终止程序或者程序异常停止
ssc.awaitTermination()
// 关闭流式应用(参数一是否关闭SparkContext参数二是否优雅的关闭
ssc.stop(stopSparkContext true, stopGracefully true)
}
}运行结果监控截图
Streaming 应用监控 运行上述词频统计案例登录到WEB UI监控页面http://localhost:4040查看相关监控信息。
其一、Streaming流式应用概要信息 每批次Batch数据处理总时间TD 批次调度延迟时间SD 批次数据处理时间PT。 其二、性能衡量标准 SparkStreaming实时处理数据性能如何是否可以实时处理数据如何衡量的呢 每批次数据处理时间TD BatchInterval每批次时间间隔