当前位置: 首页 > news >正文

网站建设学校培训学校网络营销与直播电商专升本

网站建设学校培训学校,网络营销与直播电商专升本,自己做网站难,网站开发 07551 FileBeat Filebeat 是使用 Golang 实现的轻量型日志采集器,也是 Elasticsearch stack 里面的一员。本质上是一个 agent ,可以安装在各个节点上,根据配置读取对应位置的日志,并上报到相应的地方去。 1.1 FileBeat 安装与使用 …

1 FileBeat

Filebeat 是使用 Golang 实现的轻量型日志采集器,也是 Elasticsearch stack 里面的一员。本质上是一个 agent ,可以安装在各个节点上,根据配置读取对应位置的日志,并上报到相应的地方去。

1.1 FileBeat 安装与使用

从 官网 下载对应的版本,我这里的 ElasticSearch 版本号是 6.4.3,所以下载 FileBeat 的版本也是 6.4.3
下载后解压:

cd /home/software
tar -zxvf filebeat-6.4.3-linux-x86_64.tar.gz -C /usr/local/
cd /usr/local
mv filebeat-6.4.3-linux-x86_64/ filebeat-6.4.3

配置 Filebeat,可以参考 filebeat.full.yml 中的配置

vim /usr/local/filebeat-6.4.3/filebeat.yml

内容如下:

###################### Filebeat Configuration Example #########################
filebeat.prospectors:- input_type: logpaths:## 定义了日志文件路径,可以采用模糊匹配模式,如*.log- /workspaces/logs/logCollector/app-collector.log#定义写入 ES 时的 _type 值document_type: "app-log"multiline:#pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'   # 指定匹配的表达式(匹配以 2017-11-15 08:04:23:889 时间格式开头的字符串)pattern: '^\['                              # 指定匹配的表达式(匹配以 [ 开头的字符串)negate: true                                # 是否需要匹配到match: after                                # 不匹配的行,合并到上一行的末尾max_lines: 2000                             # 最大的行数timeout: 2s                                 # 如果在规定时间没有新的日志事件就不等待后面的日志fields: ## topic 对应的消息字段或自定义增加的字段logbiz: collectorlogtopic: app-log-collector   ## 按服务划分用作kafka topic,会在logstash filter 过滤数据时候作为 判断参数 [fields][logtopic]evn: dev- input_type: logpaths:## 定义了日志文件路径,可以采用模糊匹配模式,如*.log- /workspaces/logs/logCollector/error-collector.log#定义写入 ES 时的 _type 值document_type: "error-log"multiline:#pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'   # 指定匹配的表达式(匹配以 2017-11-15 08:04:23:889 时间格式开头的字符串)pattern: '^\['                              # 指定匹配的表达式(匹配以 [ 开头的字符串)negate: true                                # 是否匹配到match: after                                # 不匹配的行,合并到上一行的末尾max_lines: 2000                             # 最大的行数timeout: 2s                                 # 如果在规定时间没有新的日志事件就不等待后面的日志,直接进行推送操作fields: ## topic 对应的消息字段或自定义增加的字段logbiz: collectorlogtopic: error-log-collector   ## 按服务划分用作kafka topicevn: devoutput.kafka: ## filebeat 支持多种输出,支持向 kafka,logstash,elasticsearch 输出数据,此处设置数据输出到 kafka。enabled: true ## 启动这个模块hosts: ["192.168.212.128:9092"] ## 地址topic: '%{[fields.logtopic]}'  ## 主题(使用动态变量)partition.hash:  ## kafka 分区 hash 规则reachable_only: truecompression: gzip  ## 数据压缩max_message_bytes: 1000000  ## 最大容量required_acks: 1  ## 是否需要 ack
logging.to_files: true

检查配置是否正确:

./filebeat -c filebeat.yml -configtest

启动filebeat:

/usr/local/filebeat-6.4.3/filebeat &

注:需要启动 kafka

2 Logstash 日志过滤

在 《Elasticsearch入门笔记(Logstash数据同步)》 这边文章中已经介绍过任何安装配置 Logstash 了,不过那时输入的数据源是来自于 MySQL,此时输入的数据源是 Kafka。
/usr/local/logstash-6.4.3 目录下新建一个 script 用于存放对接 Kafka 的配置文件。
以下是该目录下创建的 logstash-script.conf 文件:

## multiline 插件也可以用于其他类似的堆栈式信息,比如 linux 的内核日志。
input {kafka {topics_pattern => "app-log-.*"  ## kafka 主题 topicbootstrap_servers => "192.168.212.128:9092"  ## kafka 地址codec => json  ## 数据格式consumer_threads => 1  ## 增加consumer的并行消费线程数(数值可以设置为 kafka 的分片数)decorate_events => truegroup_id => "app-log-group" ## kafka 组别}kafka {topics_pattern => "error-log-.*"  ## kafka 主题 topicbootstrap_servers => "192.168.212.128:9092" ## kafka 地址codec => json  ## 数据格式consumer_threads => 1  ## 增加consumer的并行消费线程数(数值可以设置为 kafka 的分片数)decorate_events => truegroup_id => "error-log-group" ## kafka 组别}
}filter {## 时区转换,这里使用 ruby 语言,因为 logstash 本身是东八区的,这个时区比北京时间慢8小时,所以这里采用 ruby 语言设置为北京时区ruby {code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"}## [fields][logtopic] 这个是从 FileBeat 定义传入 Kafka 的if "app-log" in [fields][logtopic]{grok {## 表达式,这里对应的是Springboot输出的日志格式match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]}}## [fields][logtopic] 这个是从 FileBeat 定义传入 Kafka 的if "error-log" in [fields][logtopic]{grok {## 表达式match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]}}
}## 测试输出到控制台:
## 命令行输入 ./logstash -f /usr/local/logstash-6.4.3/script/logstash-script.conf  --verbose --debug
output {stdout { codec => rubydebug }
}## elasticsearch:
output {if "app-log" in [fields][logtopic]{## es插件elasticsearch {# es服务地址hosts => ["192.168.212.128:9200"]## 索引名,%{index_time} 是由上面配置的 ruby 脚本定义的日期时间,即每天生成一个索引index => "app-log-%{[fields][logbiz]}-%{index_time}"# 是否嗅探集群ip:一般设置true# 只需要知道一台 elasticsearch 的地址,就可以访问这一台对应的整个 elasticsearch 集群sniffing => true# logstash默认自带一个mapping模板,进行模板覆盖template_overwrite => true}}if "error-log" in [fields][logtopic]{elasticsearch {hosts => ["192.168.212.128:9200"]index => "error-log-%{[fields][logbiz]}-%{index_time}"sniffing => truetemplate_overwrite => true} }
}

查看一下其中的过滤规则,这需要结合 《日志收集笔记(架构设计、Log4j2项目初始化、Lombok)》 文章中的定义日志输出格式一起看:

["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
  • "message":logstash 固定的格式,统一叫传入的数据为 message
  • \[%{NOTSPACE:currentDateTime}\]:匹配 [ 为开头,] 为结尾,NOTSPACE 表示不能有空格,赋值变量名为 currentDateTime
  • \[%{NOTSPACE:level}\]:匹配 [ 为开头,] 为结尾,NOTSPACE 表示不能有空格,赋值变量名为 level,日志级别
  • \[%{NOTSPACE:thread-id}\]:匹配 [ 为开头,] 为结尾,NOTSPACE 表示不能有空格,赋值变量名为 thread-id,线程ID
  • \[%{NOTSPACE:class}\]:匹配 [ 为开头,] 为结尾,NOTSPACE 表示不能有空格,赋值变量名为 class,创建对应 logger 实例传入的 class
  • \[%{DATA:hostName}\]:匹配 [ 为开头,] 为结尾,DATA 表示数据,可为空,赋值变量名为 level,当前应用主机名称
  • \[%{DATA:ip}\]:匹配 [ 为开头,] 为结尾,DATA 表示数据,可为空,赋值变量名为 level,当前应用的 IP
  • \[%{DATA:applicationName}\]:匹配 [ 为开头,] 为结尾,DATA 表示数据,可为空,赋值变量名为 level,当前应用的 applicationName
  • \[%{DATA:location}\]:匹配 [ 为开头,] 为结尾,DATA 表示数据,可为空,赋值变量名为 location
  • \[%{DATA:messageInfo}\]:匹配 [ 为开头,] 为结尾,DATA 表示数据,可为空,赋值变量名为 messageInfo,日志输出的自定义内容
  • (\'\'|%{QUOTEDSTRING:throwable}):两个 ' 单引号之间的 | 表示,之间可为空,不为空就是 throwable 异常信息

启动 logstash:

/usr/local/logstash-6.4.3/bin/logstash -f /usr/local/logstash-6.4.3/script/logstash-script.conf
## 如果测试时,想要控制台输出日志,输入以下命令
/usr/local/logstash-6.4.3/bin/logstash -f /usr/local/logstash-6.4.3/script/logstash-script.conf --verbose --debug
http://www.hkea.cn/news/933116/

相关文章:

  • 深圳css3网站开发多少钱如何策划一个营销方案
  • 织梦统计网站访问量代码网络营销公司如何建立
  • 外贸营销型网站建设今日最新重大新闻
  • 个性化定制网站长春网络推广优化
  • 合肥庐阳区疫情最新消息seo优化首页
  • h5网站制作接单最新中高风险地区名单
  • 北京市住房城乡建设委网站公司怎么在网上推广
  • 网站建设首页怎样插入视频百度指数在线查询小程序
  • 青州网站制作哪家好aso优化哪家好
  • wordpress做网站优点郑州网站seo优化
  • 宝安做棋牌网站建设找哪家公司好湖南长沙疫情最新消息
  • 四川专业网站建设中国十大企业培训机构排名
  • 怎么切页面做网站灰色词首页排名接单
  • 网站右侧浮动广告代码百度推广代理公司广州
  • 固原建站公司旺道seo推广系统怎么收费
  • 适合做外链的网站海外广告联盟平台推广
  • 建筑模板规格型号郑州厉害的seo顾问
  • ppt做书模板下载网站有哪些内容国际婚恋网站排名
  • 上海网站建设内容更新网络营销策划目的
  • 重庆市建设信息网站关键词查询网
  • 做哪种网站流量大怎么打广告宣传自己的产品
  • 免费表白网站制作seo网络优化推广
  • 网站建设中可能升级中国科技新闻网
  • 网站制作内容文案网站如何快速被百度收录
  • 淘宝淘宝网页版登录入口免费seo公司
  • 竹溪县县建设局网站短视频营销
  • 好的网站有哪些搜索引擎seo是什么意思
  • 做音乐网站赚钱吗做小程序的公司
  • 坪地网站建设域名流量查询工具
  • 网站建设部署万能推广app