Elatic Stack
参考:
环境:
- CentOS7.x86_64
- Elastcisearch v6.2.3
- Kibana v6.2.3
- Logstash v6.2.3
- Beats v6.2.3
综述
开源的 Elastic Stack: 能够安全可靠地获取任何来源、任何格式的数据,并且能够实时地对数据进行搜索、分析和可视化。


Elastic指的是elastic公司下的几款产品:
- Elasticsearch
- Logstash
- Kibana
- Beats
- X-Pack
Elasticsearch
- 开放源码且自由使用
- License: Apache License 2.0
- GitHub: https://github.com/elastic/elasticsearch
- Doc: https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html
搜索、分析和存储您的数据。 Elasticsearch 是基于 JSON 的分布式搜索和分析引擎,专为实现水平扩展、高可用和管理便捷性而设计。 Elasticsearch 是一个分布式的 RESTful 风格的搜索和数据分析引擎,能够解决不断涌现出的各种用例。作为 Elastic Stack 的核心,它集中存储您的数据,帮助您发现意料之中以及意料之外的情况。
基于Lucene。Lucene是一套用于全文检索和搜寻的开放源码程式库,由Apache软件基金会支持和提供。 Lucene提供了一个简单却强大的应用程式介面,能够做全文索引和搜寻,在Java开发环境里Lucene是一个成熟的免费开放原始码工具;就其本身而论,Lucene是现在并且是这几年,最受欢迎的免费Java资讯检索程式库。
Logstash
- 开放源码且自由使用
- GitHub: https://github.com/elastic/logstash
- Doc: https://www.elastic.co/guide/en/logstash/current/index.html
集中、转换和存储数据 Logstash 是动态数据收集管道,拥有可扩展的插件生态系统,能够与 Elasticsearch 产生强大的协同作用。 Logstash 是开源的服务器端数据处理管道,能够同时 从多个来源采集数据、转换数据,然后将数据发送到您最喜欢的 “存储库” 中。(我们的存储库当然是 Elasticsearch。)
Kibana
- 开放源码且自由使用
- GitHub: https://github.com/elastic/kibana
- Doc: https://www.elastic.co/guide/en/kibana/current/index.html
实现数据可视化 Kibana 让您能够可视化 Elasticsearch 中的数据并操作 Elastic Stack,因此您可以在这里解开任何疑问:例如,为何会在凌晨 2:00 被传呼,雨水会对季度数据造成怎样的影响。
Beats
- 开放源码且自由使用
- GitHub: https://github.com/elastic/beats
- Doc: https://www.elastic.co/guide/en/beats/libbeat/current/index.html
Beats 是轻量型采集器的平台,从边缘机器向 Logstash 和 Elasticsearch 发送数据。 Beats 平台集合了多种单一用途数据采集器。这些采集器安装后可用作轻量型代理,从成百上千或成千上万台机器向 Logstash 或 Elasticsearch 发送数据。
X-Pack
一个程序包,带来丰富的可能性 单就其自身而言,Elastic Stack 就是一款值得考虑的强大工具。X-Pack 将诸多强大功能集合到一个单独的程序包中,更将它带上了一个新的层次。 X-Pack 是集成了多种便捷功能的单个插件 — security、alerting、monitoring、reporting、graph 探索和 machine learning — 您可以在 Elastic Stack 中放心地使用这些功能。
使用Docker
docker hub里面有ELK的镜像,可以直接拉取使用。推荐使用官方ELK镜像。
我自己做了一个ELK的image,上传到了我的docker hub里。我自己做这个镜像不推荐,因为使用了centos7,导致了镜像很大,这应该避免。
在docker中运行centos7
直接拉取的centos没有systemd的权限,需要在运行添加docker run -id --privileged <image-id> /usr/sbin/init选项。
或者使用Docker Hub上CentOS提供的支持systemd的Dockerfile来构建centos: https://hub.docker.com/_/centos/
其实Dockfile就是有这条命令CMD ["/usr/sbin/init
|  |  | 
将安装了各类软件的容器构建为一个新的镜像
|  |  | 
将新镜像上传到Hub
我用的是Docker Hub免费版,当然线上的话可能是阿里云或腾讯云。
|  |  | 
安装
安装步骤:
- Elasticsearch
- Kibana
- Logstash
- Install X-Pack into Elasticsearch
- Install X-Pack into Kibana
安装ELKF
需要依赖JDK(java),请先安装。 我是直接使用的RPM包安装。
|  |  | 
由于elk默认将软件安装到/usr/share/下,因此我把它们的bin路径加入PATH。
|  |  | 
ELKF使用RPM安装的布局说明:
- 主目录
- /usr/share/elasticsearch
- /usr/share/kibana
- /usr/share/logstash
- /usr/share/filebeat
 
- 二进制文件
- /usr/share/elasticsearch/bin
- /usr/share/kibana/bin
- /usr/share/logstash/bin
- /usr/share/filebeat/bin
 
- 配置文件
- /etc/elastcisearch
- /etc/kibana
- /etc/logstash
- /etc/filebeat
 
- 环境变量
- /etc/sysconfig/elasticsearch
 
- 插件
- /usr/share/elastcisearch/plugins
- /usr/share/kibana/plugins
 
安装X-Pack
注意 由于自动升级到Elastic v6.3自带了X-Pack,不需要额外安装。之前安装的一些插件会导致Elastic无法运行,请卸载这些插件。
|  |  | 
安装X-Pack前,请先安装ELK。 请安装匹配版本的X-Pack。
- Install X-Pack on Elasticsearch
- Install X-Pack on Kibana
- Install X-Pack on Logstash

启用或禁用X-Pack功能
有些功能默认开启,有些默认关闭。请在配置文件中查看详情。 添加某些功能可能导致软件无法启动,请注意查看日志。
在以下文件中配置它们:
- elasticsearch.yml
- kibana.yml
- logstash.yml
- filebeat.yml
X-Pack功能:
| 功能 | 描述 | 
|---|---|
| xpack.graph.enabled | X-Pack图形功能 | 
| xpack.ml.enabled | X-Pack机器学习功能 | 
| xpack.monitoring.enabled | X-Pack监视功能 | 
| xpack.reporting.enabled | X-Pack报告功能 | 
| xpack.security.enabled | X-Pack安全功能 | 
| xpack.watcher.enabled | X-Pack观察器 | 
在ELK中启动X-Pack monitoring功能
|  |  | 
安装: 建议使用密码!
|  |  | 
启动ELK
建议给他们加上密码! 不知道为什么,我的ElasticStack都能用systemd来管理了!
|  |  | 
|  |  | 
启动时可能遇到的问题
- 
can not run elasticsearch as root - 专门建立一个管理ELK的用户,切换到此用户后运行,注意修改ELK相关目录权限
- 或者修改ELK各自用户的/etc/passwd,切换到对应用户后运行。注意权限 – su elasticsearch && elasticsearch
 
- 
elasticsearch process is too low, increase to at least [65536] 
|  |  | 
访问elasticsearch
|  |  | 

访问kibana
|  |  | 

启用xpack注意事项
启用X-PACK后,请注意在kibana配置文件中认证Elasticsearch用户和密码,并且使用Elasticsearch的用户和密码登录Kibana的前端界面。
由于我使用kibana用户登录,导致很多地方访问Elasticsearch都没有权限。请注意。
这样使用Elasticsearch登录后,便可以之间在Dev Tools中通过REST API获取和更新相关信息,并且创建和管理相关用户和角色。
安装Filebeat
由于前面我们添加了ELK-repo,所以这里我们可以直接安装。
|  |  | 
修改ELK jvm内存大小
在此版本中,可直接在配置文件目录下的jvm.options里修改JVM 内存大小。
|  |  | 
与Nginx结合使用
将Kibana展现到Nginx上的话,便可以不对Kibana开放外网访问。
|  |  | 
可能会遇到的问题
- Nignx错误日志: Permission denied) while connecting to upstream
|  |  | 
Logstash文档

Logstash的pipeline有两个必须的元素:
- input
- 消耗来自source的数据
 
- output
- 将修改后的数据写入destination
 
以及一个可选元素:
- filter
- 根据你的定义来修改数据
 
介绍
Logstash是一个具有实时流水线(pipeling)功能的开源数据收集引擎。它可以动态统一来自不同source的数据,并将数据正常化的你的destination。
任何类型的事件都可以通过大量的输入、过滤和输出插件进行丰富和转换,通过本地编解码器进一步简化了摄取过程。
Logstash的能量
具有强大的Elasticsearch和Kibana系统的水平可伸缩数据处理流水线。

Logstash喜欢的数据
所有数据来者不拒!
Logs and Metrics
- 处理所有类型的日志数据
- Apache
- Nginx
- Syslog
 
- 使用Filebeat享受互补的安全日志转发功能
- 从Ganglia,JMx,NetFlow和TCP,UDP收集metrics
Web
- 将http request转换为events- 分析Web服务
- 支持Webhook
 
- 通过轮询HTTP endpoint创建事件- 通过Web API捕获健康状况、性能和其它类型的数据
 
数据存储和流
从你已经拥有的数据中发现更多价值。
Sensors and IoT
探索广泛的其它数据。
轻松丰富一切
在摄取过程中清理并转换数据,以便在index或output时立即获得实时信息。Logstash具有许多聚合和突变以及模式匹配,地理映射和动态查找功能。
- Grok是Logstash filter的金刚钻,用于从非结构化数据中派生出结构化数据
- 通过解析来自IP的地理坐标,标准化提起复杂性,简单K-V对和CSV数据,并通过本地查找或Elasticsearch查询进一步丰富你的数据,从而扩展你的视野
- 编解码器通常用于缓解JSON和多行事件等常见事件结构的处理
选择你的储藏室
将数据放在最重要的位置。通过存储,分析和对数据采取行动,解锁各种downstream分析和操作用例。
- Analysis
- Elasticsearch
- Data stores(MongoDB, Redis)
 
- Archiving
- HDFS
- S3
 
- Monitoring
- Nagios
- Zabbix
- Ganglia
 
- Alerting
- Watcher(Elasticsearch)
 
入门
安装,储藏,解析,汇聚多个Input/Output。
储藏第一个事件
测试Logstash和运行一个基本的pipeline
|  |  | 
启动logstsh时的一个问题: WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash
虽然通过RPM安装Logstash存在/etc/logstash文件,但是还是会报错。
|  |  | 
通过Logstash解析Logs
前面我们创建了一个基本的Logstash pipeline来测试Logstash,但真正处理logs的Logstash pipeline不会这么简单,它可能会有多个input, filter, output。
本节利用一个Filebeat,将Nginx Web Logs作为Logstash pipeline的input,解析这些logs中创建的特定命名字段,并将解析的数据写入Elasticsearch集群。
配置Filebeat以发送Log Lines到Logstash
在创建Logstash pipeline之前,你将配置Filebeat以发送Log lines到Logstash。Filebeat从服务器上的文件收集日志,并将这些日志转发给Logstash实例进行处理。
Filebeat专为可靠性和低延迟而设计。它占用的资源极少,beats input插件(默认安装)最大限度地减少了Logstash实例的资源需求。任何Beat框架编写的beat都可以讲事件数据发送到Logstash。
在你的data source主机上安装Filebeat。安装之后,配置filebeat.yml文件:
|  |  | 
为Filebeat Input配置Logstash
配置一个Logstash pipeline,使用beat input plugin接受来自beats的事件。 格式如下:
|  |  | 
使用Grok filter plugin解析Web Logs
在某些时候,可能输出的日志信息的格式并不理想。你想要解析log以创建特定的命名字段。
grok过滤插件使你能够将非结构化的日志数据解析为结构化和可查询的内容。
由于grok过滤器插件在传入的日志数据中查找模式,因此配置插件需要你作出关于如何识别你的用例。
你可以使用%{COMBINEDAPACHELOG} grok模式,它从如下模式的日志中构建行:
| 信息 | Field Name | 
|---|---|
| IP Add | clientip | 
| User ID | ident | 
| User Auth | auth | 
| timestamp | timestamp | 
| HTTP Verb | verb | 
| Request body | request | 
| HTTP Status code | respone | 
| Referer URL | referer | 
| User agent | agent | 
|  |  | 
通过Geoip过滤插件增强数据
除了解析日志数据以获得更好的搜索外,过滤插件还可从现有的数据中后去补充信息。
geoip插件查找IP地址,从IP地址获取地理位置信息,并将该位置信息添加到日志中。
配置Logstash实例来使用geoip过滤插件:
|  |  | 
索引数据到Elasticsearch
现在Web log已经被处理为指定的字段,现在Logstash pipeline便可以索引数据到一个Elasticsearch集群中。
|  |  | 
验证:
这里遇到一个错误:
- index_not_found_exception
这里要将logstash-$DATE反映索引的实际名称,也就是在通过下面的命令得到的logstash-2018.04.13。把我坑惨了!
|  |  | 

Kibana中的可视化效果:

拼接多个输入和输出插件
你需要管理的信息通常来自多个不同的source,并且可能需要多个不同的destination来存储数据。Lostash pipeline可以使用多个输入和输出插件来处理这些需求。
官方文档中使用Twitter and Filebeat这两者作为Logstash input,并将信息输出到Elasticsearch和file。
- 配置Logstash实例使用Filebeat input plugin
- 配置Logstash实例写入Elasticsearch多节点(cluster)
- 配置Logstash pipeline将数据写入file
|  |  | 
Input
输入插件可以使特定的事件源由Logstash读取。
可用的输入插件: 我只列出了常见的,具体请参考: https://www.elastic.co/guide/en/logstash/current/input-plugins.html
| 插件 | 描述 | 
|---|---|
| beats | 从Elastic框架接收事件 | 
| couchdb_changes | 从CouchDB的 _changesURI流式传输事件 | 
| dead_letter_queue | 从Logstash的dead letter queue读取事件 | 
| elasticsearch | 从Elasticsearch集群中读取查询结果 | 
| exec | 抓取shell命令的输出作为事件 | 
| file | 来自文件的流事件 | 
| github | 从GitHub webhook读取事件 | 
| heartbeat | 为测试生成心跳事件 | 
| http | 通过HTTP/HTTPS接收事件 | 
| http_poller | 解码HTTP API输出为事件 | 
| imap | 从IMAP服务器读取邮件 | 
| jmx | 通过JVM从java程序检索标准 | 
| kafka | 从kafka中读取事件 | 
| log4j | 通过TCP socket从Log4j对象读取事件 | 
| pipe | 从长时间运行的命令管道中获取流事件 | 
| rabbitmq | 从Redis实例读取事件 | 
| sqlite | 基于SQLite数据库中的行创建事件 | 
| stdin | 从标准输入中读取事件 | 
| syslog | 读取系统日志作为事件 | 
| tcp | 从TCP socket读取事件 | 
| udp | 从UDP读取事件 | 
| unix | 通过Unix socket读取事件 | 
| websocket | 从一个websocket读取事件 | 
**input filter通用选项: **
| Setting | Input type | Required | 
|---|---|---|
| add_field | hash | No | 
| codec | codec | No | 
| enable_metric | boolean | No | 
| id | string | No | 
| tags | array | No | 
| type | string | No | 
- 
add_field添加一个字段到一个事件,默认值为{}
- 
codec用于输入数据的编解码器。默认值是plain
- 
enable_metric为特定插件实例禁用或启用度量标准日志记录,默认值为true
- 
id为插件配置添加一个唯一的ID,如果未指定,Logstash会自动生成一个
- 
tags为事件添加任意数量的任意标签
- 
type为所有input处理的事件添加一个type
beats
此插件使Logstash能够从Elasticsearch框架中接收事件。
栗子:
|  |  | 
Beats Input配置项:
| Setting | Input_type | Required | 
|---|---|---|
| cipher_suites | array | No | 
| client_inactivity_timeout | number | No | 
| host | string | No | 
| include_codec_tag | boolean | No | 
| port | number | Yes | 
| ssl | boolean | No | 
| ssl_certificate | a valid filesystem path | No | 
| ssl_certificate_authorities | array | No | 
| ssl_handshake_timeout | number | No | 
| ssl_key | a valid filesystem path | No | 
| ssl_key_passphrase | password | No | 
| ssl_verify_mode | string, one of [none, peer,force_peer] | No | 
| tls_max_version | number | No | 
| tls_min_version | number | No | 
elasticsearch
Elasticsearch Input配置项:
| Setting | Input_type | Required | 
|---|---|---|
| ca_file | a valid filesystem path | No | 
| docinfo | boolean | No | 
| docinfo_fields | array | No | 
| docinfo_target | string | No | 
| hosts | array | No | 
| index | string | No | 
| password | password | No | 
| query | string | No | 
| schedule | string | No | 
| scroll | string | No | 
| size | number | No | 
| ssl | boolean | No | 
| user | string | No | 
栗子:
|  |  | 
exec
定期运行shell命令,并抓取整个输出为事件。
栗子:
|  |  | 
exec Input配置项:
| Setting | Input_type | Required | 
|---|---|---|
| command | string | Yes | 
| interval | number | No | 
| schedule | string | No | 
此调度表示方法如同Linux中定时任务* 5 * 1-3 *。
file
从文件读取流事件。
file input配置项:
| Setting | Input_type | Required | 
|---|---|---|
| close_older | number | No | 
| delimiter | string | No | 
| discover_interval | number | No | 
| exclude | array | No | 
| ignore_older | number | No | 
| max_open_files | number | No | 
| path | array | Yes | 
| sincedb_path | string | No | 
| sincedb_write_interval | number | No | 
| start_position | string, one of [“beginning”, “end”] | No | 
| stat_interval | number | No | 
github
github input配置项:
| Setting | Input_type | Required | 
|---|---|---|
| drop_invalid | boolean | No | 
| ip | string | No | 
| port | number | Yes | 
| secret_token | string | No | 
kafka
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
redis
从redis实例读取事件,它支持redis的channel和list类型。
redis input配置项:
| Setting | Input_type | Required | 
|---|---|---|
| batch_count | number | No | 
| data_type | string, one of [list,channel,pattern_channel] | Yes | 
| db | number | No | 
| host | string | No | 
| path | string | No | 
| key | string | Yes | 
| password | password | No | 
| port | number | No | 
| ssl | boolean | No | 
| threads | number | No | 
| timeout | number | No | 
sqlite
栗子:
|  |  | 
sqlite input配置项:
| Setting | Input_type | Required | 
|---|---|---|
| batch | number | No | 
| exclude_tables | array | No | 
| path | string | Yes | 
stdin
syslog
栗子:
|  |  | 
syslog input配置项:
| Setting | Input_type | Required | 
|---|---|---|
| facility_labels | array | No | 
| grok_pattern | string | No | 
| host | string | No | 
| locale | string | No | 
| port | number | No | 
| proxy_protocol | boolean | No | 
| severity_labels | array | No | 
| syslog_field | string | No | 
| timezone | string | No | 
| use_labels | boolean | No | 
tcp
栗子:
|  |  | 
tcp input配置项:
| Setting | Input_type | Required | 
|---|---|---|
| host | string | No | 
| mode | string, one of [server, client] | No | 
| port | number | Yes | 
| proxy_protocol | boolean | No | 
| ssl_cert | a valid file system path | No | 
| ssl_enable | boolean | No | 
| ssl_extra_chain_certs | array | No | 
| ssl_key | a valid file system path | No | 
| ssl_key_passphrase | password | No | 
| ssl_verify | boolean | No | 
udp
unix
websocket
Output
输出将事件数据发送到特定的目标。输出是事件管道的最后阶段。
输出列表:
- boundary
- circonus
- CSV
- datadog
- Elasticsearch
- exec
- file
- gelf
- ganglia
- http/https
- influxdb
- irc
- kafka
- librato
- loggly
- lumberjack
- metriccatcher
- mongodb
- nagios
- opentsdb
- pipe
- rabbitmq
- redis
- redmine
- stdout
- syslog
- tcp
- udp
- websocket
- zabbix
**output通用配置项: **
| Setting | Input type | Required | 
|---|---|---|
| codec | codec | No | 
| enable_metric | boolean | No | 
| id | string | No | 
- 
codec用于输出数据的编解码器,默认值是json_lines
- 
enable_metric为特定插件实例启用或禁用度量日志记录,默认值是true
- 
id为插件配置添加一个唯一的ID,如果未指定ID,Logstash会自动生成。
csv
csv output配置选项:
| Setting | Input_type | Required | 
|---|---|---|
| create_if_deleted | boolean | No | 
| csv_options | hash | No | 
| dir_mode | number | No | 
| fields | array | Yes | 
| file_mode | number | No | 
| filename_failure | string | No | 
| flush_interval | number | No | 
| gzip | boolean | No | 
| path | string | Yes | 
| spreadsheet_safe | boolean | No | 
elasticsearch
Elasticsearch output配置项:
Setting | Input type | Required
- | - | - action | string | No bulk_path | string | No cacert | a valid filesystem path | No doc_as upsert | boolean | No document_id | string | No document_type | string | No failure_type logging whitelist | array | No healthcheck_path | string | No hosts | uri | No http_compression | boolean | No index | string | No keystore | a valid filesystem path | No keystore_password | password | No manage_template | boolean | No parameters | hash | No parent | string | No password | password | No path | string | No pipeline | string | No pool_max | number | No pool_max per route | number | No proxy | uri | No resurrect_delay | number | No retry_initial interval | number | No retry_max_interval | number | No retry_on_conflict | number | No routing | string | No script | string | No script_lang | string | No script_type | string, one of [inline, indexed, file] | No script_var_name | string | No scripted_upsert | boolean | No sniffing | boolean | No sniffing_delay | number | No sniffing_path | string | No ssl | boolean | No ssl_certificate verification | boolean | No template | a valid filesystem path | No template_name | string | No template_overwrite | boolean | No timeout | number | No truststore | a valid filesystem path | No truststore_password | password | No upsert | string | No user | string | No validate_after inactivity | number | No version | string | No version_type | string, one of [internal, external, external gt, external gte, force] | No
exec
栗子:
|  |  | 
exec output配置项:
Setting | Input type | Required
- | - | - command | string | Yes quiet | boolean | No
file
栗子:
|  |  | 
file output配置项:
Setting | Input type | Required
- | - | - create_if_deleted | boolean | No dir_mode | number | No file_mode | number | No filename_failure | string | No flush_interval | number | No gzip | boolean | No path | string | Yes write_behavior | string | No
kafka
栗子:
|  |  | 
kafka output配置项:
Setting | Input type | Required
- | - | - acks | string, one of [0, 1, all] | No batch_size | number | No bootstrap_servers | string | No buffer_memory | number | No client_id | string | No compression_type | string, one of [none, gzip, snappy, lz4] | No jaas_path | a valid filesystem path | No kerberos_config | a valid filesystem path | No key_serializer | string | No linger_ms | number | No max_request size | number | No message_key | string | No metadata_fetch_timeout_ms | number | No metadata_max_age_ms | number | No receive_buffer_bytes | number | No reconnect_backoff_ms | number | No request_timeout_ms | string | No retries | number | No retry_backoff_ms | number | No sasl_kerberos_service name | string | No sasl_mechanism | string | No security_protocol | string, one of [PLAINTEXT, SSL, SASL PLAINTEXT, SASL SSL] | No send_buffer_bytes | number | No ssl_key_password | password | No ssl_keystore_location | a valid filesystem path | No ssl_keystore_password | password | No ssl_keystore_type | string | No ssl_truststore_location | a valid filesystem path | No ssl_truststore_password | password | No ssl_truststore_type | string | No topic_id | string | Yes value_serializer | string | No
mongodb
mongodb output配置项:
Setting | Input type | Required
- | - | - bulk | boolean | No bulk_interval | number | No bulk_size | number | No collection | string | Yes database | string | Yes generateId | boolean | No isodate | boolean | No retry_delay | number | No uri | string | Yes
redis
将Redis作为消息队列缓存能极大降低系统负载,减轻系统压力。
redis output配置项:
Setting | Input type | Required
- | - | - batch | boolean | No batch_events | number | No batch_timeout | number | No congestion_interval | number | No congestion_threshold | number | No data_type | string, one of [list, channel] | No db | number | No host | array | No key | string | No password | password | No port | number | No reconnect_interval | number | No shuffle_hosts | boolean | No timeout | number | No
redmine
栗子:
|  |  | 
redmine output配置项:
Setting | Input type | Required
- | - | - assigned_to_id | number | No categorie_id | number | No description | string | No fixed_version_id | number | No parent_issue_id | number | No priority_id | number | Yes project_id | number | Yes ssl | boolean | No status_id | number | Yes subject | string | No token | string | Yes tracker_id | number | Yes url | string | Yes
output
栗子:
|  |  | 
syslog
syslog output配置:
Setting | Input type | Required
- | - | - appname | string | No facility | string | No host | string | Yes message | string | No msgid | string | No port | number | Yes priority | string | No procid | string | No protocol | string, one of [tcp, udp, ssl-tcp] | No reconnect interval | number | No rfc | string, one of [rfc3164, rfc5424] | No severity | string | No sourcehost | string | No ssl_cacert | a valid filesystem path | No ssl_cert | a valid filesystem path | No ssl_key | a valid filesystem path | No ssl_key passphrase | password | No ssl_verify | boolean | No use_labels | boolean | No
zabbix
zabbix output配置项:
Setting | Input type | Required
- | - | - multi_value | array | No timeout | number | No zabbix_host | string | Yes zabbix_key | string | No zabbix_server host | string | No zabbix_server port | number | No zabbix_value | string | No
Filter
过滤器插件对事件执行中介(intermediary)处理,过滤器通常根据事件的特征有条件的应用。
下面是Elastic支持的插件列表:
| 插件 | 描述 | 
|---|---|
| aggregate | 汇总来自单个任务的多个事件的信息 | 
| alter | 对mutate过滤器无法处理的字段进行常规更改 | 
| cidr | 根据网络块列表检查IP地址 | 
| cipher | 对事件应用或移除cipher(密码) | 
| clone | 重复事件 | 
| csv | 将csv(comma separated value)解析为单个字段 | 
| date | 解析字段中的日期,以用作事件的Logstash timestamp | 
| de_dot | Computationally expensive filter that removes dots from a field name | 
| dissect | 使用分隔符将非结构化事件数据提取到字段中 | 
| dns | 执行标准或反向DNS查询 | 
| drop | 删除所有事件 | 
| elapsed | 计算一对事件之间的经过时间 | 
| elasticsearch | 将Elasticsearch中以前的日志事件的字段复制到当前事件中 | 
| environment | 将环境变量存储为元数据子字段 | 
| extractnumbers | 从字符串中提取数字 | 
| fingerprint | 由一致的散列值的替换值的指纹字段 | 
| geoip | 添加有关IP地址的地理信息 | 
| grok | 将非结构化事件数据解析到字段中 | 
| i18n | 从字段中删除特定字符 | 
| jdbc_static | 使用从远程数据库预加载的数据来丰富事件 | 
| jdbc_streaming | 用你的数据库数据丰富事件 | 
| json | 解析JSON事件 | 
| json_encode | 将字段序列化为JSON | 
| kv | 解析键值对 | 
| metricize | 处理包含多个度量标准的复杂事件并将它们分成多个事件,每个事件都包含一个度量标准 | 
| metrics | 汇总指标(Aggregates metrics) | 
| mutate | 对字段执行突变 | 
| prune | 将基于字段列表的事件数据精简为黑名单或白名单 | 
| range | 检查指定的字段是否在给定的大小或长度限制内 | 
| ruby | 执行任意Ruby代码 | 
| sleep | 休息一段指定的时间 | 
| split | 将多行消息拆分成不同的事件 | 
| syslog_pri | 解析syslog消息的优先字段 | 
| throttle | 限制事件的数量 | 
| tld | 用你在配置中指定的任何内容替换默认消息字段的内容 | 
| translate | 根据散列或YAML文件,替换字段内容 | 
| truncate | 截断长度超过给定长度的字段 | 
| urldecode | 解码URL编码的字段 | 
| useragent | 将用户代理字符串解析到字段中 | 
| uuid | 为事件添加UUID | 
| xml | 将XML解析到字段 | 
所有过滤器都支持的配置选项:
| Setting | Input_type | Required | 
|---|---|---|
| add_field | hash | No | 
| add_tag | array | No | 
| enable_metric | boolean | No | 
| id | string | No | 
| periodic_flush | boolean | No | 
| remove_field | array | No | 
| remove_tag | array | No | 
- 
add_field如果此过滤器成功,添加任意字段到此事件。字段名称可以是动态的,并使用%{field}包含事件的部分内容
- 
add_tag如果此过滤器成功,添加任意标签到此事件。标签可以是动态的,并使用%{field}语法包含事件的部分内容
- 
enable_metric为特定插件实例启用/禁用度量标准日志记录
- 
id为插件配置添加一个唯一的ID,如果没有指定ID,Logstash会生成一个。强烈建议在配置中设置此ID 当你有多个相同类型的插件时,这特别有用
- 
periodic_flush定期调用过滤器flush方法
- 
remove_field如果此过滤器成功,从事件中移除任意字段
- 
remove_tag如果此过滤器成功,从事件中移除任意标签
Aggregate
此过滤器的目的是聚合属于同一任务的多个事件(通常是日志行)中可用的信息,并将最终聚合信息推送到最终任务事件中。
**Aggregate Filter Configuration Options: **
| Setting | Input_type | Required | 
|---|---|---|
| aggregate_maps_path | string, a valid filesystem path | No | 
| code | string | Yes | 
| end_of_task | boolean | No | 
| inactivity_timeout | number | No | 
| map_action | string, one of [“create”, “update”, “create_or_update”] | No | 
| push_map_as_event_on_timeout | boolean | No | 
| push_previous_map_as_event | boolean | No | 
| task_id | string | Yes | 
| timeout | number | No | 
| timeout_code | string | No | 
| timeout_tags | array | No | 
| timeout_task_id_field | string | No | 
| timeout_timestamp_field | string | No | 
- 
aggregate_maps_pathLogstash停止时存储聚合地图的文件路径,以及Logstash启动时加载的路径。 如果未定义,聚合映射将不会存储在Logstash中,并且会丢失。
- 
code使用当前事件执行更新map的代码;或使用当前的map执行更新事件的代码 你将有一个可用的map variable 和 event variable
- 
end_of_task告诉过滤器该任务已结束,因此在代码执行后删除聚合map
- 
inactivity_timeout一个任务被认为已到期的秒数 当某个任务超时时,其聚合map将被逐出 必须小于timeout
- 
map_action- create
- update
- create_or_update告诉过滤器如何处理聚合map
 
- 
push_map_as_event_on_timeout每次检测到任务超时时,它都会将任务集合映射推送为新的Logstash事件
- 
push_previous_map_as_event每次聚合插件检测到新任务ID时,它会将先前的聚合映射推送为新的Logstash事件,然后为下一个任务创建新的空映射
- 
task_id定义了关联日志的任务ID的表达式 该值必须唯一标识任务
- 
timeout
- 
time_code
- 
timeout_tags在生成超时事件添加的标记
- 
timeout_task_id_field
- 
timeout_timestamp_field默认情况下,使用系统时间计算超时
栗子:
给定日志:
|  |  | 
过滤器:
|  |  | 
Alter
alter filter允许对未包含在正常变异过滤器中的字段进行一般更改。
**安装: **
|  |  | 
**配置项: **
| Setting | Input type | Required | 
|---|---|---|
| coalesce | array | No | 
| condrewrite | array | No | 
| condrewriteother | array | No | 
- 
coalesce将file_name的值设置为其参数的第一个非空表达式
- 
condrewrite如果实际内容等于预期内容,则将字段内容更改为指定值
- 
condrewriteother如果另一个字段内容等于预期内容,则将字段内容更改为指定值
cidr
CIDR filter用于检查时间中的IP地址与可能包含它的网络块列表。可以针对多个网络检查多个地址,任何匹配都可以成功。成功后,可将其它标记/字段添加到事件中。
**配置项: **
| Setting | Input_type | Required | 
|---|---|---|
| address | array | No | 
| network | array | No | 
| network_path | a valid filesystem path | No | 
| refresh_interval | number | No | 
| separator | string | No | 
- 
address要检查的IP地址
- 
network要检查的IP网络
- 
network_path包含过滤器应检查的网络的外部文件的完整路径
- 
refresh_interval检查外部文件的更新频率
- 
seperator从network_path指定的外部文件解析网络的分隔符
csv
csv filter处理包含csv数据的事件字段,解析它,并将其存储为单个字段 此过滤器还可解析使用任何分隔符的数据,而不仅仅是逗号
**配置项: **
| Setting | Input_type | Required | 
|---|---|---|
| autodetect_column_names | boolean | No | 
| autogenerate_column_names | boolean | No | 
| columns | array | No | 
| convert | hash | No | 
| quote_char | string | No | 
| separator | string | No | 
| skip_empty_columns | boolean | No | 
| skip_empty_rows | boolean | No | 
| skip_header | boolean | No | 
| source | string | No | 
| target | string | No | 
- 
autodetect_column_names是否应该从标题列自动检测列名称,默认false
- 
autogenerate_column_names是否应该自动生成列名,默认true。 如果设置为false,那么没有指定header的列将不会被解析
- 
columns列名称的列表
- 
convert应用于列的数据类型转换的集合,可能的转换: integer, float, date, date_time, boolean
- 
quote_char用于引用csv字段的字符,默认"
- 
separator列分隔符值。默认值comma,
- 
skip_empty_columns是否应该跳过空列,默认false
- 
skip_empty_rows是否应该跳过空行,默认false
- 
skip_header是否应该跳过header,默认false
- 
source源字段值中的csv数据将被扩展为数据结构
- 
target放置数据的目标字段
date
date filter从字段中解析日期,然后使用该日期或时间戳作为事件的Logstash时间戳。它对事件排序和回填旧数据尤其重要。 在没有此过滤器的情况下,如果timestamp尚未在事件中设置,则Logstash将根据首次查看事件是(input time)选择一个时间戳。
date filter配置项:
| Setting | Input_type | Required | 
|---|---|---|
| locale | string | No | 
| match | array | No | 
| tag_on_failure | array | No | 
| target | string | No | 
| timezone | string | No | 
locale
使用POSIX语言标记指定用于日期解析的环境(locale),如en,en_US
如果未指定,则将使用平台默认值
match 有字段名称和格式模式的数组,[ field, formats…]
如果时间字段有多种格式,你可这样做:
|  |  | 
有几个例外:
- ISO8601: 解析任何有效的ISO8601时间戳,如- 2011-04-19T03:44:01.103Z
- UNIX: 解析float/int Unix原子时间(s)
- UNIX_MS: 解析int Unix原子时间
- TAI64N: 解析tai64n时间值
语法细节: 用于解析日期和时间文本的语法使用字母来指示时间值的种类,以及重复的字母来指示该值的形式。
以下是可用于解析日期和时间的内容:
- 
y year - yyyy
完整年号,如2018
- yy
两位数年份,如18
 
- yyyy
完整年号,如
- 
M month of the year - M
最小数字月份,1-12
- MM
两位数字月份,01-12
- MMM
缩写的月份文本,Jan, Feb...
- MMMM
完整的月份文本,January, February...
 
- M
最小数字月份,
- 
d day of the month - d
最小数字日,1, 2...
- dd
两位数字日,01, 02...
 
- d
最小数字日,
- 
H hour of the day - H
最小数字小时,0, 1...
- HH
两位数字小时,00, 01...
 
- H
最小数字小时,
- 
m minutes of the hour - m
最小数字分钟,0, 1...
- mm
两位数字分钟,00, 01...
 
- m
最小数字分钟,
- 
s seconds of the minute - s
最小数字秒数,0, 1...
- ss
两位数字秒数,00, 01...
 
- s
最小数字秒数,
- 
S 秒的最大精度(毫秒),附加零 - S 十分之一秒
- SS 百分之一秒
- SSS 千分之一秒
 
- 
Z time zone offset or identity - Z
时区偏移量结构为HHmm(如上海),+0800
- ZZ
时区偏移量结构为HH:mm,+08:00
- ZZZ
时区身份(如上海),Asia/Shanghai
 
- Z
时区偏移量结构为HHmm(如上海),
- 
z time zone names. Time zone names (z) cannot be parsed 
- 
w week of the year - w
最小数字周数,1, 2...
- ww
两位数字周数,01, 02...
 
- w
最小数字周数,
- 
D day of the year 
- 
e day of the week(number) 
- 
E day of the week(text) - E, EE, EEE
星期几的缩写,Mon, Tue, Wed, Thu, Fri, Sat, Sun
- EEEE
星期几的全文,Monday, Tuesday...
 
- E, EE, EEE
星期几的缩写,
对于非格式化的语法,你需要在值的周围放置单引号字符。如"yyyy-MM-dd’T’HH:mm:ss"
tag_on_failure
没有成功匹配时,将值附加到tag字段,默认值["_dateparsefailure"]
target
将匹配的timestamp存储到给定目标字段中。如果未提供,则默认更新事件的@timestamp字段
timezone
指定用于日期分析的时区标准ID,如Asia/Shanghai
dissect
dissect filter是一种拆分操作。与对整个字符串应用一个分隔符的常规拆分操作不同,此操作将一组分隔符应用于字符串值。dissect不使用正则表达式,所以速度非常快。 但是,如果文本结构因行而异,则Grok更适合。有一种混合的情况,dissect可用来结构可靠地重复部分,然后Grok用于余下的字段值,并具有更多的正则表达式可预测性和更少的整体工作。
一组字段和分隔符被称为dissection,它使用一组%来描述:
|  |  | 
dissect filter配置项
| Setting | Input type | Required | 
|---|---|---|
| convert_datatype | hash | No | 
| mapping | hash | No | 
| tag_on_failure | array | No | 
- convert_datatype可以指定int, float数据类型转换。这些将在mapping发生后完成,如果没有mapping部分,请自由使用此设置。
|  |  | 
- mappingA hash of dissections of- field => value不要在值中使用具有转移的- \n,它会被看做两个字符- \+n+而不是实际的换行符。
|  |  | 
- tag_on_failuredissection失败时,将值添加到tag字段。默认值为- ["_dissectfailure"]
geoip
geoip filter根据Maxmind GeoLite2数据库的数据,添加有关IP地址的地理位置信息。
此插件与GeoLite City Database数据库捆绑在一起。GeoLite2是免费的IP地址位置数据库,与MaxMind的GeoIP2数据库相比,不如其精确。 如果需要使用捆绑的DeoLite之外的数据库,可从MaxMind下载它: https://dev.maxmind.com/geoip/geoip2/geolite2/
如果GeoIP返回查找到的经度(latitude)和纬度(longitude),则会创建[geoip][location]字段。
Geoip Filter配置项
| Setting | Inpu_type | Required | 
|---|---|---|
| cache_size | number | No | 
| database | a valid filesystem path | No | 
| default_database_type | City or ASN | No | 
| fields | array | No | 
| source | string | Yes | 
| tag_on_failure | array | No | 
| target | string | No | 
- 
cache_size默认值为1000。GeoIP查询的成本非常高昂。缓存设置的越高,项目在缓存中的可能性就越大,并且此filter运行的越快。但是,如果设置得太高,则会耗费太多内存。如果缓存已满,则无法添加更多记录。尝试使用此选项的不同值来为数据集找到最佳性能。 这个值必须大于0。
- 
database地理数据库的文件路径,如果未指定,则默认为logstash自带的GeoLite2-City数据库。
- 
default_database_type默认值是City。唯一可接受的值是City和ASN。
- 
fields包含在事件中的geoip字段数组。可能的字段取决于数据库类型。
- 
source包含要通过geoip映射的IP地址或主机名的字段。
- 
tag_on_failure默认值为["_geoip_lookup_failure"].
- 
target默认值为geoip.指定Logstash应该存储的geoip数据的字段。
grok
Parse arbitrary text and structure it. Grok是将非结构化日志数据解析为结构化和可查询的好方法。
它非常适用于syslog, apache or webserver logs, mysql logs以及通常为人类而不是计算机编写的任何日志格式。
默认情况下,Logstash ship附带了大约120种模式。它们在这: https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
要grok某类日志文件的时候,可以先到上面的地址查看有无对应的模式。然后复制对应内容到patterns_dir下,再在filter中使用。
当然,你也可以自定义模式来匹配你的日志。在这测试: http://grokdebug.herokuapp.com
Grok filter配置项
| Setting | Input_type | Required | 
|---|---|---|
| break_on_match | boolean | No | 
| keep_empty_captures | boolean | No | 
| match | hash | No | 
| named_captures_only | boolean | No | 
| overwrite | array | No | 
| pattern_definitions | hash | No | 
| patterns_dir | array | No | 
| patterns_files_glob | string | No | 
| tag_on_failure | array | No | 
| tag_on_timeout | string | No | 
| timeout_millis | number | No | 
- 
break_on_matchBreak on first match. grok的第一个成功的匹配将导致filter结束。如果你想grok尝试所有的模式,请将其设置为false。默认值为true。
- 
keep_empty_captures默认值为false。如果为true,则将空捕获保留为事件字段。
- 
matchfield ⇒ value的散列匹配,默认值为{}
|  |  | 
- 
named_captures_only默认值为true。如果为true,只保存来自grok的命名捕获。
- 
overwrite要覆盖的字段,这使你可覆盖已存在的字段中的值。
|  |  | 
- 
pattern_definitions默认值为{}模式名称和模式元组的散列,用于定义当前过滤器要使用的自定义模式。匹配现用名称的模式将覆盖预先存在的定义。
- 
patterns_dir默认值为[]logstash默认提供了一堆模式,除非添加额外模式,否则不需要自定义模式。你可以使用此设置指向多个模式目录。grok将读取与patterns_files_glob匹配的目录汇总的所有文件,并假定它为模式文件。
|  |  | 
- 
patterns_files_glob默认值为"*"Glob模式,用于从patterns_dir目录中选择模式文件。
- 
tag_on_failure默认值为["_grokparsefailure"]匹配没有成功时,将值添加到tags字段。
- 
tag_on_timeout默认值为"_groktimeout"如果grok正则表达式超时,则应用此tag.
- 
timeout_millis默认值为30000尝试在这段时间后终止正则表达式。设置为0以禁用超时。
基础知识
Grok工作方式,将文本模式组合成与你的日志模式相匹配的内容。
Grok模式的语法为 %{SYNTAX:SEMANTIC}
- SYNTAX, 文本匹配的模式的名称
- SEMANTIC, 正在匹配的文本的标识符
|  |  | 
你也可以将数据类型转换添加到Grok模式。默认情况下,所有的语义(semantic)都保存为字符串(strings)。
如果你想转换语义的数据类型,如将string转换为int。例如%{NUMBER:num:int}将num语义从string转换为integer。当前情况下,只支持转换为int和float.
|  |  | 
正则表达式
Grok位于正则表达式之上,所以任何正则表达式在grok中都是有效的。 Regular Expression Library: https://github.com/kkos/oniguruma/blob/master/doc/RE
示例
grok处理nginx/access.log日志: 首先针对nginx.conf中日志格式来决定如何写logstash pattern
|  |  | 
grok debugger

grok-patterns 这是grok官方写得patterns,当然,你也可以自己写。就像Nginx日志那样!
|  |  | 
json
这是一个json解析过滤器。
Json Filter配置项
| Setting | Input_type | Required | 
|---|---|---|
| skip_on_invalid_json | boolean | No | 
| source | string | Yes | 
| tag_on_failure | array | No | 
| target | string | No | 
- 
skip_on_invalid_json默认值是false.允许跳过无效json上的过滤器。
- 
sourcejson filter的配置 如,从message字段中解析json
|  |  | 
- target定义放置解析数据的目标字段。如果目标字段已存在,则它会被覆盖。
|  |  | 
kv
此过滤器有助于自动解析key=value类型的消息。
这对于postfix, iptables和倾向于key=value语法类型的日志非常有用。
|  |  | 
kv filter配置项
| Setting | Input_type | Required | 
|---|---|---|
| allow_duplicate_values | boolean | No | 
| default_keys | hash | No | 
| exclude_keys | array | No | 
| field_split | string | No | 
| include_brackets | boolean | No | 
| include_keys | array | No | 
| prefix | string | No | 
| recursive | boolean | No | 
| remove_char_key | string | No | 
| remove_char_value | string | No | 
| source | string | No | 
| target | string | No | 
| transform_key | string, | one of [“lowercase”, “uppercase”, “capitalize”] | 
| transform_value | string, | one of [“lowercase”, “uppercase”, “capitalize”] | 
| trim_key | string | No | 
| trim_value | string | No | 
| value_split | string | No | 
- 
allow_duplicate_values默认值为true.用于删除重复 键/值对的布尔选项。
- 
default_keys默认值为{}.一个散列,用于指定在解析源字段中不存在的键时应添加到事件中的默认值及其值。
- 
exclude_keys默认值为[].一个数组,用于指定不应添加到事件中的解析键。默认情况下,没有键被排除。
- 
field_split默认值为" ".用作解析出键值对后的单字符字段分隔符的字符串。
|  |  | 
- field_split_pattern一个正则表达式,用作字段分隔符来解析键值对。用于定义多字符字段分隔符。 它优先于- field_split选项。
|  |  | 
- include_brackets默认值为true.一个布尔值,指定是否将 方括号[square bracket],尖括号- 和括号(bracket) 视为的包装器(wrapper),是否应该从值中删除。 
|  |  | 
- 
include_keys默认值为[].一个数字,用于指定应该添加到解析的键。默认情况下,所有的键都会被添加。
- 
prefix默认值为空。预先添加到所有提取的键的字符串。
- 
recursive默认值为false.一个布尔值,执行是否向下提取值并递归获取更多的键值对。
- 
remove_char_key要从键中移除的字符串。
- 
remove_char_value要从值中移除的字符串。
- 
source默认值为message.要在其上执行key=value搜索的字段。
- 
target将所有键值对放入的容器的名称。
- 
transform_key将键转换为大写,小写。
- 
transform_value将值转换为大写,小写
- 
trim_key从键中修建的字符串。如果键包含在括号中或以空格开头,这很有用。
- 
trim_value从值中修建的字符串。如果你的值包含在括号中或以逗号结尾。这很有用。
- 
value_split默认值为=.一个非空字符串,用作解析出键值对的单字符分隔符。
- 
value_split_pattern用作值分隔符来解析出键值对的正则表达式。优先级高于value_split。
metrics
metrics filter用于聚合度量(aggregating metrics).
|  |  | 
metrics filter配置项
| Setting | Input_type | Required | 
|---|---|---|
| clear_interval | number | No | 
| flush_interval | number | No | 
| ignore_older_than | number | No | 
| meter | array | No | 
| percentiles | array | No | 
| rates | array | No | 
| timer | hash | No | 
- 
clear_interval默认值为-1.清理间隔,所有的计数器都被重置。
- 
flush_interval默认值为5.刷新间隔,当metrics事件被创建时。此值必须是5的倍数。
- 
ignore_older_than默认值为0.不要跟着@timestamp超过某个秒数的事件。
- 
meter语法:meter => [ "name of metric", "name of metric" ]
- 
percentiles默认值为percentiles.计时器值应该测量和发出的百分位数。
- 
rates默认值为[1, 5, 15].应该按分钟测量的比率。
- 
timer语法:timer => [ "name of metric", "%{time_value}" ]
meter values
meter => "something", 会收到如下字段:
- “[thing][count]” - the total count of events
- “[thing][rate_1m]” - the per-second event rate in a 1-minute sliding window
- “[thing][rate_5m]” - the per-second event rate in a 5-minute sliding window
- “[thing][rate_15m]” - the per-second event rate in a 15-minute sliding window
timer values
timer => { "thing" => "%{duration}"}, 会收到如下字段:
- “[thing][count]” - the total count of events
- “[thing][rate_1m]” - the per-second average value in a 1-minute sliding window
- “[thing][rate_5m]” - the per-second average value in a 5-minute sliding window
- “[thing][rate_15m]” - the per-second average value in a 15-minute sliding window
- “[thing][min]” - the minimum value seen for this metric
- “[thing][max]” - the maximum value seen for this metric
- “[thing][stddev]” - the standard deviation for this metric
- “[thing][mean]” - the mean for this metric
- “[thing][pXX]” - the XXth percentile for this metric (see percentiles)
mutate
mutate filter允许你在字段上执行常规突变。你可以重命名,删除,替换和修改事件中的字段。
mutate filter配置项
| Setting | Input_type | Required | 
|---|---|---|
| convert | hash | No | 
| copy | hash | No | 
| gsub | array | No | 
| join | hash | No | 
| lowercase | array | No | 
| merge | hash | No | 
| coerce | hash | No | 
| rename | hash | No | 
| replace | hash | No | 
| split | hash | No | 
| strip | array | No | 
| update | hash | No | 
| uppercase | array | No | 
| capitalize | array | No | 
- 
convert将字段的值转换为其它类型,如将string转换为int.如果只为数组,则所有成员都将转换;如果是散列,则不处理。
- 
copy将现有字段复制到另一个字段(会覆盖)。
- 
gsub将正则表达式与字段值进行匹配,并用替换字符替换所有匹配项。 只支持string或string array.
|  |  | 
- 
join加入一个带分隔符的数组。对非数组字段没有任何作用。
- 
lowercase将字符串转换为小写
- 
merge合并数组或散列的两个字段。字符串字段将被自动转换为数组。
|  |  | 
- 
coerce为已存在但为空的字段设置默认值。
- 
rename重命名一个或多个字段。
- 
replace用新值替换一个字段。新值可以包含%{foo}字符串,以帮助你从事件的其它部分创建新值。
|  |  | 
- 
split使用分隔符将字段拆分为数组。只适用于字符串字段。
- 
strip从字段剥离空白符。
- 
update用新值更新现有字段。
xml
XML filter.获取包含XML的字段并将其展开为实际的数据结构。
XML Filter配置项
| Setting | Input_type | Required | 
|---|---|---|
| force_array | boolean | No | 
| force_content | boolean | No | 
| namespaces | hash | No | 
| remove_namespaces | boolean | No | 
| source | string | Yes | 
| store_xml | boolean | No | 
| suppress_empty | boolean | No | 
| target | string | No | 
| xpath | hash | No | 
- 
force_array默认值为true.过滤器强制单个元素为数组。将其设置为false防止在数组中存储单个元素。
- 
force_content默认值为false.过滤器将以不同于标签内的内容的方式展开属性。
- 
namespace默认值为{}.这允许配置所有命名空间声明来解析XML文档。
|  |  | 
- 
remove_namespaces从文档中的所有节点中删除所有命名空间。
- 
source
- 
store_xml默认为true.过滤器会将整个解析的XML存储在目标字段中。
- 
suppress_empty默认值为true.默认情况下,如果元素为空,这不输出。如果设置为false,则空元素将产生一个空的散列对象。
- 
target定义放置数据的目标。
条件判断
使用条件判断决定filter和output处理特定的事件。
Logstash条件类似于编程语言,条件语句,可以嵌套:
- if
- else if
- else
比较操作:
- ==
- !=
- <
- >
- <=
- >=
- =~匹配正则
- !~不匹配正则
- in包含
- not in不包含
布尔操作:
- and
- or
- nand
- xor
一元运算符:
- !取反
- ()复合表达式
栗子:
|  |  | 
Filebeat文档
概述
filebeat是一个beat,它基于libbeat框架。
Filebeat是一个本地文件的日志数据搬运(shipper)。作为Agent安装,filebeat监视日志目录或指定的日志文件,并将它们转发给Elasticsearch或logstash进行索引。 启动filebeat时,它会启动一个或多个prospectors(勘探者),查看为日志指定的本地路径。对于prospectors所在的每个日志文件,filebeat启动harvester。每个harvester为新内容读取单一日志文件,并将新日志发送到filebeat配置的输出。

入门
开始filebeat前,请确保安装和配置了如下产品:
- Elasticsearch(存储和索引数据)
- Kibana(UI)
- Logstash(可选)
配置
filebeat module为常用日志格式提供了入门体验。

|  |  | 
配置filebeat使用logstash
|  |  | 
在Elasticsearch中载入索引模板
在Elasticsearch中,索引模板用于定义设置(setting)和映射(mapping),以确定如何分析字段(fields)。
filebeat推荐的索引模板文件有filebeat软件包安装。在成功连接到Elasticsearch后,它会默认自动载入索引模板(fields.yml)。如果模板存在,它不会覆盖除,除非你配置要覆盖。
通过修改配置文件,你也可以禁用自动载入模板,或者载入你自己的模板。
配置模板载入
|  |  | 
修改索引名
- filebeat的默认索引名为 filebeat-<version>-yyyy.MM.dd
- 在output.elasticsearch设置选项
- 你指定的索引名称应该包含索引的根名、索引版本和日期信息
|  |  | 
手动载入模板
|  |  | 
强制Kibana查看最新文件
|  |  | 
设置Kibana面板
Filebeat附带了实例的Kibana dashboards, visualization和可视化搜索。
在使用仪表板前,你需要创建索引filebeat-*,并将仪表板加载到Kibana中。你可使用setup命令或配置文件加载它。
启动Filebeat
|  |  | 
查看示例Kibana仪表板
访问你的kibana web端(localhost:5601),可用Nginx做反向代理,再加上域名解析。
快速开始常见日志格式
filebeat提供了一套预构建模块,可使用它快速实施和部署日志监视方案。
先决条件:
- 安装和配置Elastic Stack
- 安装filebeat
- 安装Ingest Node GeoIP和User Agent plugins
- 验证Elasticsearch和Kibana能从filebeat接收数据
|  |  | 
运行filebeat模块
|  |  | 
最后就可以在Kibana中可视化查看日志。
查看dashboard时,遇到一个错误: Could not locate that index-pattern (id: filebeat-*)
解决办法:
|  |  | 
output
我们可根据系统的负载情况将Filebeat的output到合适的地方,output只能有一个! 如果有时候系统负载过高的话,可以考虑output到Redis或Elasticsearch。
redis和logstash都还需要logstash的pipeline转交给Elasticsearch,但你可以filter。而直接使用Elasticsearch便不能过滤。
- Logstash
- Elasticsearch
- Redis
|  |  | 
定义索引
为filebeat定义index:
|  |  | 
配置
RPM安装的配置文件默认是/etc/filebeat/filebeat.yml,还有一个完整的示例配置文件/etc/filebeat/filebeat.reference.yml,显示了所有未弃用的选项。配置文件使用YAML语法。
指定运行module
Specify which modules to run
Filebeat module提供了一种快速处理常见日志格式的方法。它包含默认配置。
有几种不同方法来启用modules:
- 配置modules.d目录
- filebeat命令启动
- 配置filebeat.yml文件
|  |  | 
指定变量设置
Specify variable settings
每个模块和文件集合都有变量,你可以设置这些变量来更改木块的默认行为。
|  |  | 
高级设置
在幕后,每个木块都会启动filebeat input。高级用户可以添加或覆盖任何input设置。
|  |  | 
读取动态文件名
filbeat配置文件虽然可以将索引设置为: indexname-%{+yyyy.MM.dd} 的日志格式,但这个是发送给ES的,ES可以处理此配置,但filebeat是无法直接处理的,它会把它当做普通字符。
假如我要读取一个按日期取名的日志文件,如service_20180808.log,filebeat配置文件中是无法直接配置和处理。
后来想到,可以用sh写一个脚本来做此操作。
|  |  | 
input
DEPRECATED: prospectors are deprecated, Use inputs instead. Will be removed in version: 7.0.0
要手动配置filebeat(而不是使用modules),需要在filebeat.yml的filebeat.inputs部分指定输入列表(一个YAML 数据)。你可指定多个输入,并可多次指定相同的输入类型。
input types
- log
- stdin
- redis
- udp
- docker
- tcp
- syslog
input 通用选项:
|  |  | 
log
使用log input从日志文件中读取行。
|  |  | 
你可以将其它配置设置(fields, include_lines, exclude_lines, mutiline)应用于从日志文件获取的行。这里指定的选项将应用于input的所有文件。 将不同的配置应用于不同的文件,需要定义多个input sections:
|  |  | 
log input 配置项
|  |  | 
stdin
使用stdin input从标准输入读取事件。此输入不可与其它输入类型同时运行。
|  |  | 
stdin input 配置项:
|  |  | 
udp
使用 udp input通过udp读取事件。
|  |  | 
udp input 配置项:
|  |  | 
tcp
使用 tcp input 通过tcp读取事件。
|  |  | 
tcp input 配置项:
|  |  | 
docker
使用docker input从docker container读取日志。
|  |  | 
docker input 配置项:
|  |  | 
syslog
使用 syslog input通过tcp/udp/读取事件。
修改syslog配置:
|  |  | 
|  |  | 
它的配置项就是tcp/udp的配置项。
之后查看主机端口情况:
|  |  | 
output
你可以通过在filebet.yml配置文件的output部分设置选项来配置filebeat以特定方式输出。只能定义一个输出。
filebeat支持如下输出:
- Elasticsearch
- Logstash
- Kafka
- Redis
- File
- Console
elasticsearch
filebeat使用es http api将事务发送到es。
|  |  | 
配置项:
|  |  | 
logstash
kafka
redis
redis output将事件插入redis list或redis channel。
|  |  | 
配置项:
|  |  | 
file
file output将事务转储到文件中,每个事务都是json格式。
|  |  | 
配置项:
|  |  | 
console
console output将事件以json格式输出到标准输出。
|  |  | 
配置项:
|  |  | 
loadbalance
filebeat提供配置项,用于将事件发送到多个主机时微调负载均衡。 loadbalance对redis, logstash, es output可用。
|  |  | 
Kibana文档
Kibana是一个开源分析和可视化平台,旨在与Elasticsearch合作。你可使用Kibana来检索(search),查看(view)存储在Elasticsearch索引中的数据并与其进行交互(interact)。你可以很轻松地执行高级数据分析,并在各种图表、表格和地图中可视化你的数据。 Kibana可以很容易地理解大量的数据。基于浏览器的接口能够快速创建和分享动态仪表盘,实时显示Elasticsearch查询的变化。
入门
在开始前,请确保已安装Kibana并与Elasticsearch建立了连接。
载入示例数据
本节依赖如下示例数据:
- shakespeare.json: https://download.elastic.co/demos/kibana/gettingstarted/shakespeare_6.0.json
- accounts.zip: https://download.elastic.co/demos/kibana/gettingstarted/accounts.zip
- uzip accounts.zip
 
- logs.jsonl.gz: https://download.elastic.co/demos/kibana/gettingstarted/logs.jsonl.gz
- gunzip logs.jsonl.gz
 
shakespeare按以下模式组织:
|  |  | 
accounts按以下模式组织:
|  |  | 
日志数据的模式有许多不同的字段,此例使用字段如下:
|  |  | 
载入数据前,需要为字段设置映射。 映射将索引中的文档分成逻辑组,并指定字段特性。如可搜索性、标记化、分解为单独的单词。
在Kibana界面中的Dev Tools中输入如下命令,为shakespeare数据设置映射。
|  |  | 
日志数据集logs.jsonl需要映射才能将日志中的经纬度标记为地理位置。
|  |  | 
|  |  | 
|  |  | 
accounts数据集不需要映射,这一点上使用Elasticsearch的bulk API去载入数据集:
|  |  | 
定义你的索引模式
加载到Elasticsearch的每组数据集都有一个索引模式(index pattern)。索引模式是一个带有可匹配多个索引的可使用通配符的字符串。
在前面,Shakespeare数据集有一个名为: shakespeare 的索引;Account数据集有一个名为:bank 的索引。
如,在常见的日志文件中,一个典型的索引包含YYYY.MM.DD日期格式,类似于logstash-2015.05.*。
进入Kibana界面,点击Management, Index Patterns, Create Index Pattern 来创建一个索引模式。
shakespeare和account数据集不包含 time-series data。确保为此数据集创建索引模式时,不包含基于时间的事件。logs数据集包含了时序数据,因此索引需要包含基于时间的事件。
- shakes*
- ba*
- logstash-2015*
定义索引模式时,与Elasticsearch匹配的索引必须存在。
在Kibana的DevTools中输入: GET _cat/indices 来查看索引。
数据发现
点击Kibana界面中的Discover以显示数据发现功能。
可视化
Visualize
Visualize允许你在Elasticsearch索引中创建数据的可视化。然后可以构建显示相关可视化的仪表盘。
Kibana的可视化基于Elasticsearch查询。通过使用一系列Elasticsearch聚合来提取和处理你的数据。你可以创建图标来显示你需要了解的趋势。
创建可视化
Elasticsearch文档
入门
Elasticsearch是一个高度可扩展的开源全文搜索和分析引擎。它允许你快速、近乎实时地存储、搜索和分析大量数据。
Elasticsearch的几个例子:
- 使用Elasticsearch来存储产品目录和库存,并为其提供搜索和建议
- 收集日志或交易数据,并分析和挖掘数据以便于查找趋势、统计数据、汇总或异常信息
- 价格提醒平台,允许顾客制定规则,收到相应规则信息
- 分析智能需求,快速调查、分析、可视化并对大量数据提出特别的问题
基本概念
Near Realtime(NRT) Elasticsearch是一个近乎实时的搜索平台。这意味着从索引文档到可搜索之间存在轻微的延迟(通常为1s)
Cluster
集群是一个或多个节点(服务器)的集合,它们一起保存所有数据,并提供跨节点的联合索引和搜索功能。集群由默认名为elasticsearch的唯一名称标识,它很重要。
确保不要在不同的环境中重复使用相同集群名称,否则可能会导致节点加入错误的集群。
集群可以只有一个节点!你也可以拥有多个独立的集群,每个集群有自己唯一的集群名称。
Node
节点是属于集群一部分的单个服务器,存储数据并参与集群的索引和索引。
与集群一样,一个节点由一个名称来标识,启动时随机分配的UUID。你也可以自定义节点名。
配置节点通过集群名称加入特定的集群,默认加入elasticsearch集群。
在单集群中,你可以拥有任意数量的节点。
Index 索引是一些具有相似特征的文档集合。例如,客户数据的索引,产品目录的索引,订单数据的索引…… 索引由名称标识(必须全小写),文档执行索引、搜索、更新和删除操作时引用索引。 在一个单集群中,你可以定义任何你想要的索引。
Document 文档是可被索引的基本信息单位。例如,单个客户的文档,单个产品的文档,单个订单的文档… 文档以JSON格式表示。 一条记录就是一个文档。
Shards和Replicas 索引可潜在地存储大量数据,这些数据可能会超多单个节点的硬件限制。例如,占用1TB磁盘空间的十亿文档的单个索引可能不适合单个节点的磁盘,或者可能太慢而无法单独向单个节点提供搜索请求。 为了解决这个问题,Elasticsearch提供了将索引细分为称为分片的多个碎片上。当你创建索引时,你可以简单定义所需的分片数量。 每个分片本身都是一个功能齐全且独立的索引,可以在集群中的任何节点上进行托管。
分片重要的两个原因:
- 允许你水平分割/缩放内容量
- 允许分布和并行操作跨分片,从而提高性能和吞吐量(throughput)
在任何时候都可能出现的网络环境中,强烈建议使用故障切换机制,以防止分片/节点因任何原因而消失。为此,Elasticsearch允许你将索引分片制作为一个或多个称为副本分片的副本集。 副本集分片永远不会分配到与原始分片相同的节点上。
副本集重要的原因:
- 在分片/节点失效的情况下提供高可用性
- 因为搜索可以在所有副本上并行执行,它允许你扩展搜索量和吞吐量
总而言之,每个索引都可以分成多个分片,索引也可以被复制。一旦复制,每个索引将具有主分片和副本分片。在创建索引时,可为每个索引定义分片和副本数量。在索引创建之后,你可以动态更改副本的数量,但无法更改分片的数量。
默认情况下,Elasticsearch中的每个索引都分配了5个主分片和副本。
每个Elasticsearch分片都是一个Lucene索引。单个Lucene索引有最大文档数量限制。
探索你的集群
The REST API REST(Representational State Transfer)表现层状态转换,是一种万维网软件架构风格,目的是便于不同程序在网络中互相传递信息。REST通常使用HTTP, URI, XML和HTML这些协议和标准。
启动节点,下一步便是理解如何与它通信。幸运的是,Elasticsearch提供了一个非常全面(comprehensive)和强大的REST API,可以使用它与集群进行交互。
使用API可以完成如下几件事:
- 检查集群、节点和索引的健康、状态和统计信息
- 管理集群、节点、索引数据和元数据
- 执行CRUD(create, read, update, delete)
- 执行高级搜索操作(分页、排序、过滤、脚本、聚合…)
集群健康
基本健康检查,看看集群正在做什么。
使用_catAPI检查集群健康。可使用Kibana Console或curl等工具。
|  |  | 
集群健康:
- green: 万事OK(集群功能齐全)
- yellow: 所有数据可用,但一些副本尚未分配(集群功能齐全)
- red: 一些数据因某种原因不可用(集群部分功能)
集群名称:
- 集群名称被修改为docker-elk
列出集群中的节点:
|  |  | 
随机节点名: LGrAIE5
列出所有索引
|  |  | 
创建索引
创建一个名为customer的索引,然后列出索引
|  |  | 
你可能注意到了,索引的健康状态是yellow,表明有一些副本尚未分配。
这个索引发生这种情况的原因是Elasticsearch默认为这个索引创建了一个副本。由于此刻我们只有一个节点在运行,因此只有在其它几点加入集群后才能分配一个副本。一旦副本分配到另外的节点,健康状态会变成green。
索引和查询文档
现在让我们把一些东西放入customer索引中。讲一个简单的customer文档放入customer索引中,ID为1:
|  |  | 
删除索引
|  |  | 
修改数据
Elasticsearch几乎提示提供数据操作和搜索功能。从索引、更新、删除数据时可能会有1s延迟。数据在事物完成后立即可用。
索引/替换 文档
|  |  | 
未指定ID:
ID是可选的。如果未指定ID,Elasticsearch会生成随机ID。
注意,此时使用POST方法。
|  |  | 
更新文档
除了能够索引和替换文档,我们还可以更新文档。 Elasticsearch实际上并没有在原地就地更新,它是先删除旧文档,然后一次性更新索引新文档。
更新同样能够使用简单的脚本。
Elasticsearch提供了通过查询条件(类似于SQL-UPDATE-WHERE)更细多个文档的能力。
|  |  | 
删除文档
也可通过API匹配查询,删除所匹配的文档。
|  |  | 
批量处理
Elasticsearch同样提供了使用_bulkAPI批量执行上述任何操作的功能。这是一种高效的机制,尽可能快地完成多项操作。
Bulk API不会因其中一个操作失败而停止,它将继续处理后面的动作。当它完成是,它会返回每个操作的状态,以便你可以检查是否失败。
|  |  | 
探索你的数据
简单数据集 准备一个更加真实的数据集。如下生成的JSON文档,每个文档都有如下要点:
|  |  | 
载入这个数据集 下载Elasticsearch提供的accounts.json
|  |  | 
这样我们成功批量索引了1000个文档到bank索引。
Search API
现在让我们做一些简单的搜索(search)。有两种基本搜索方式:
- REST request URI
- REST request body
- 以可读的JSON格式定义你的搜索,推荐方式
 
搜索的REST API可从_search端点访问:
|  |  | 
- took: Elasticsearch执行搜索花费的事件(ms)
- timed_out: 查询超时与否
- _shards: 搜索了多少分片,包含成功和失败的次数
- hits: 搜索结果
- hits.total: 匹配搜索的文档数
- hits.hits: 搜索结果数组(默认前十个文档)
- hits.sort: 结果的排序键
- hits._score,- max_score: 忽略的字段
REST request body方法
|  |  | 
查询语法
Elasticsearch提供了可用于执行查询的JSON格式语言,这被称为 Query DSL
|  |  | 
处理query参数,我们还可以传递其它参数来搜索结果:
|  |  | 
执行搜索
搜索某些字段:
|  |  | 
匹配查询:
|  |  | 
布尔查询:
- must
- should
- must_not
|  |  | 
过滤
前面我们跳过了称为文档分数的_score字段。它是文档与搜索查询匹配度相度量的一个数值。数值越大,与文档越相关。
但查询并不总是需要产生分数,特别是当它们仅用于过滤时。Elasticsearch检测这些情况并自动优化查询执行,以便不计算无用的分数。
- range query: 通过一系列值来过滤文档
|  |  | 
除了前面这些查询类型,还有很多其它类型。由于只是入门章节,所以并不会涉及太多太难。
聚合
聚合(Aggregation)提供了从数据中分组和提取统计的功能。
考虑聚合最简单方法是将其大致等同于SQL GROUP BY和SQL聚合函数。
在Elasticsearch中,你可以执行返回匹配的搜索,同时还可以在一个响应中返回与匹配不同的聚合结果。你可以运行查询和多个聚合,并一次性获得多个操作的结果。
|  |  | 
|  |  | 
还有很多其它聚合方法,请参考https://www.elastic.co/guide/en/elasticsearch/reference/6.2/search-aggregations.html。
elasticsearch-py
Python可使用elasticsearch-py模块来操作Elasticsearch,具体文档请查看Python这篇文章的elasticsearch第三方模块。
Lucene查询
ElasticSearch提供的一些查询方式(query types)能够被Lucene的查询解析器(query parser)语法所支持。可直接在Kibana的发现面板上直接使用。
全文搜索
- string
- “string1 string2”
Kibana会匹配和展示对应的string。
键值对
- key:value: 全文搜索
- "key:value": 精确搜索
- _exists_:key: 返回结果中需要有key字段
- _missing__:key: 不能含有key字段
如:http.code:502,log-levle:warn
通配符
- ?
- *
这两者都不能用作第一个字符,如?.txt, *.txt
正则表达式
它也支持性能较差的正则表达式。
模糊搜索
- ~: 在一个单词后面加上- ~启用模糊搜索
- ~n: 设置编辑距离(整数),指定需要多少相似度,越大越接近原始值
- 在短语后面加~,可以搜索到被隔开或顺序不同的单词
first~也可以匹配到frist
"hello world"~5表示两者之间可以隔着5个单词
范围搜索
数值/时间/IP/字符串 类型的字段可以对某一范围进行查询
|  |  | 
优先级
使用^使一个词语比另一个搜索优先级更高,默认为1。可以为0~1之间的浮点数,来降低优先级
逻辑操作
- AND
- OR
- NOT
- +: 搜索结果中必须包含此项
- -: 不能包含此项
|  |  | 
转义字符
- \:使用转义字符来转移特殊字符
Metricbeat
Metricbeat是一个轻量级的托运器(lightweight shipper), 你可从安装该软件的操作系统和服务器上定期收集指标信息。它可将收集到的指标信息或统计信息发送到指定的输出(如elasticsearch/Logstash)。
具体使用方法也和Filebeat差不多!
Metricbeat通过从服务器上运行的系统和服务收集指标来帮助你监控服务器。如:
- Apache
- Docker
- Kafka
- Kubernets
- HAProxy
- MongoDB
- MySQL
- Nginx
- PHP-FPM
- PostgreSQL
- Redis
- RabbitMQ
- System
- Zookeeper
- …
Packetbeat
Packetbeat是一个实时网络数据包分析器,可与Elasticsearch一起提供应用程序监控和性能分析。
Packetbeat通过捕获应用服务器之间的网络流量,解码应用层协议(HTTP, MySQL, Redis…),将请求与响应关联起来,并记录每个事务感兴趣的字段。 Packetbeat可以帮助你轻松地注意到后端应用程序的问题,例如错误或性能问题,并且可以更快地排除故障并进行修复。 Packetbeat捕获服务器之间的流量,即时分析应用层协议,并将这些消息关联到事务中。并将这些事务插入到Elasticsearch或使用Redis和Logstash的队列中。
Packetbeat支持的协议如下:
- ICMP
- DNS
- HTTP
- AMQP
- Cassandra
- MySQL
- PostgreSQL
- Redis
- MongoDB
- Thrift-RPC
- TLS
Heartbeat
Heartbeat是一个轻量级守护进程,用以定期检查服务的状态并确定它们是否可用。与Metricbeat不同,Metricbeat只会告诉你服务器是down/up,而Heartbeat会告诉你服务是否可以访问(reached)。
当你需要验证是否满足服务级别协议的服务正常运行时间时,Heartbeat非常有用。当需要验证外部没有人能访问企私有服务器上的服务时,这也很有用。 你可以配置Heartbeat来ping指定主机名的所有DNS可解析的IP地址。这样,你可以检查所有负载均衡的服务,看他们是否可用。 配置Heartbeat时,你可以指定用于表示要检查的主机名的监视器(monitor)。每台监视器都根据你指定的时间表运行。
Heartbeat目前支持通过通过如下方式监控主机:
- 
ICMP 当你指向检查服务是否可用时,请使用icmp监视器。此功能需要root权限 
- 
TCP 支持SSL/TLS/proxy 你可以选择配置此监视器,通过发送 and/or 接收自定义有效内容来验证端点 
- 
HTTP 支持SSL/TLS/proxy 你可以选择配置此监视器,来验证该服务是否会返回预期的响应。如特定状态码,响应header或内容 
Auditbeat
Auditbeat是一个轻量化的托运器(shipper),在系统上安装它,以审核(audit)系统上用户和进程的活动。
例如,你可以使用Auditbeat从Linux Audit Framework收集和集中审计事件。你还可以使用它来检查关键文件的改动,并识别潜在的安全策略违规。
Topbeat
在v5.0, Topbeat被Metricbeat取代!
Topbeat的版本与其它Elastic Stack组件不同步,ES是v6.2.4, 而Topbeat是v1.3。所以需要额外安装repo.
Topbeat是一个轻量化的托运器(shipper),来定期读取系统和每个进程的CPU和内存统计信息,然后为Elasticsearch中的统计信息编制索引。
Topbeat通过收集如下指标来帮助你监控你的服务器:
ystem-wide statistics
- system load
- 1, 5, 15
 
- system wide CPU usage
- user, system, idle, IOWait
 
- system wide memory uusage
- total, used, free
 
- system wide swap usage
- total, used, free
 
Per-process statistics
- process name
- process parent pid
- process state
- process pid
- process CPU usage
- process Memory usage
File system statistics
- avaliable disks
- name, type, mounted
- total, used, free, available
APM
APM(Application Performance Monitoring)应用程序性能监控,自动收集应用程序内部的深入性能指标和错误。
它由三个组件组成:
- Agents
- Node.js
- Django
- Flask
- Ruby on Rails
- Rack
- JS
 
- Server
- UI
ElastAlert
ElastAlert是一个简单灵活的用于Elasticsearch中数据异常的告警框架。它使用Python2.x编写,不支持Python3。 ElastAlert功能与Watcher类似,只不过Watcher是Elastic Enterprise中才支持,而ElastAlert是一个开源软件。
Kibana非常适合可视化和查询数据,但它需要一个配套工具来对数据进行告警,出于这种需要,ElastAlert诞生了。 如果你几乎实时地将数据写入Elasticsearch,并希望在数据与某些模式匹配时收到告警,则ElastAlert就是适合你的工具。
综述
ElastAlert被设计为可靠、高度模块化、易于设置和配置。 它使用两种类型的组件与Elasticsearch进行结合:
- rule type
- alerts
定期检查Elasticsearch并将数据传递给规则类型,它确定了何时找到匹配项。当匹配发生时,它触发一个或多个报警,而这些报警便采取具体行动。
每组规则定义了一个查询、一个规则类型和一组警报。
ElasAlert几种通用规则类型:
- 
frequency Match where there are X events in Y time 
- 
spike Match when the rate of events increases or decreases 
- 
flatline Match when there are less than X events in Y time 
- 
blacklist/whitelist Match when a certain field matches a blacklist/whitelist 
- 
any Match on any event matching a given filter 
- 
change Match when a field has two different values within some time 
ElasAlert几种内建报警类型:
- Command
- JIRA
- OpsGenie
- SNS
- HipChat
- Slack
- Telegram
- Debug
- Stomp
你也可以导入和编写规则类型和报警类型。
除了这些基础用法外,还有许多其它功能:
- Alerts link to Kibana dashboards
- Aggregate counts for arbitrary fields
- Combine alerts into periodic reports
- Separate alerts by using a unique key field
- Intercept and enhance match data
可靠性
Reliability
ElasAlert有多种功能,可在restart或Elasticsearch不可用时使其更可靠:
- ElastAlert将其状态保存到Elasticsearch,并在启动时先恢复先前停止的状态
- 如果Elasticsearch没有响应,ElastAlert将等待它恢复,然后再继续
- 抛出错误的警报可能会在一段时间内自动重试
模块性
Modularity
ElastAlert有3个主要组件,可作为模块导入或自定义。
- 
rule types 规则类型负责处理从Elasticsearch返回的数据。 
- 
alerts 警报负责根据匹配采取行动。 
- 
enhancements 增强功能是一种拦截警报并以某种方式修改或增强警报的方法。 
配置
配置项
ElastAlert有一个全局配置文件config.yaml,它定义了几个操作方面:
|  |  | 
运行ElastAlert
运行:
|  |  | 
一些参数:
|  |  | 
首次运行ElastAlert
Running ElastAlert for the First Time
依赖
Requirements:
- es
- ISO8601 or Unxi timestamped data
- Python 2.7
- python2-pip python-dev libffi-dev libssl-dev
安装
|  |  | 
之后修改配置文件,我将ElastAlert目录移动到了/etc/下。
修改配置文件,并将ElastAlert的config.yaml.example配置保存为config.yaml。
设置es
Setting Up Elasticsearch
ElastAlert将有关其查询及报警的信息和元数据报错到Elasticsearch。这虽然不是必须的,但却强烈建议使用。
|  |  | 
创建一个规则
Creating a Rule
每个规则定义要执行的查询,触发匹配的参数以及每个匹配要触发的报警列表。
cat ./example_rules/example_frequency.yaml
|  |  | 
栗子 elastalert:
|  |  | 
测试规则
运行elasticalert-test-rule工具将测试你的配置文件是否成功加载并在过去24h内以调试模式运行:
|  |  | 
配置首选项将按如下方式加载:
- yaml文件中指定的配置
- 配置文件中指定的配置
- 默认配置
运行ElastAlert
有两种方式来调用ElastAlert:
- Supervisor
- Python
为了便于调试,下面将直接调用。
|  |  | 
使用Python3创建索引:
|  |  | 
规则类型和配置项
Rule Types and Configuration Options
规则配置项
Rule Configuration Cheat Sheet
选项太多,自己去看: https://elastalert.readthedocs.io/en/latest/ruletypes.html
通用配置项
每个在rules_folder下的.yaml文件默认都会被执行。
必须的配置
- es_host
- es_port
- index
- name
- type
- alert
可选配置 自己去看。
规则类型
Rule Types
在elastalert/ruletypes.py中定义的各种RuleType class构成了ElastAlert的主要逻辑。每个规则都在内存中保存一个实例,传递通过给定过滤器查询es返回的所有数据,并根据该数据生成匹配。
- any 任意规则都将匹配所有内容。查询返回的每个匹配都会生成一个警报。
- blacklist 黑名单规则根据黑名单检查某个字段,如果它存在于黑名单中,则匹配。
黑名单规则需要两个额外项:
compare_key——与黑名单进行比较的字段。如果为空,事件将被忽略。
blacklist——黑名单列表值或黑名单文件列表("!file ./blacklist.txt")
栗子:
|  |  | 
- whitelist 白名单规则根据白名单检查某个字段,如果列表中不包含此字段,则匹配。
白名单规则需要三个额外项:
compare_key——与白名单进行比较的字段
ignore_null——如果为true,则没有compare_key字段的事件将不匹配
whitelist——白名单列表值或白名单文件列表
栗子:
|  |  | 
- change 此规则将监视某个字段,如果此字段改变就匹配。
此规则需要三个额外项:
compare_key——监控要改变的字段名。可以是一个列表,如果任意字段发生标号,都将触发警报。
ignore_null——如果为true,则没有compare_key字段的事件将不计为已更改。
query_key——此规则基于每个查询键应用。
一个可选字段:
timeframe——改变之间的最大时间
- frequency 此规则匹配在给定时间范围内至少一定数量的事件。
此规则需要两个额外项:
num_events——将会触发报警的事件数
timeframe——上面事件的时间范围
- spike(突增)
当给定时间段内的事件量的spike_height次数大于或小于前一个时间段时,此规则匹配。它使用两个滑动窗口(引用和当前)来比较。
此规则需要三个额外项:
spike_height——上次时间段时间数与前时间段事件数的比率,将处罚告警
spike_type——up/down/both
timeframe:时间段
- flatline(脉波) 当一段时间内事件总数匹配给定阈值时,此规则匹配。
此规则需要两个额外项:
threshold——不触发警报的最小事件数
timeframe——时间段
- new term(术语) 当一个以前从未见过的新值出现在字段中时,此规则匹配。
此规则需要一个额外项:
fields——要监控的新术语的字段列表
cardinality(基数) 在一个时间范围内,当某个字段的唯一值的总数高于或低于阈值时,此规则匹配。
此规则需要:
timeframe——时间段
cardinality_field——计算基数的字段
最大或最小基数取一个
max_cardinality——数据的基数大于此报警
min_cardinality——数据基数小于此报警
metric aggregation
当计算窗口中的度量值高于或低于阈值时,此规则匹配。默认值为buffer_time。
此规则需要:
metric_agg_key——计算度量标准的字段
metric_agg_type——字段的类型
doc_type——指定要搜索的文档类型
最大和最小至少需要一个
max_threshold——计算的度量标准大与此,报警
min_threshold——计算的度量标准小于此,报警
percentage match
当计算窗口内匹配桶(bucket)中的文档百分比高于或低于阈值时,此规则匹配。默认情况下,计算窗口为buffer_time。
此规则需要:
match_bucket_filter—— ES filter DSL。为匹配桶定义了一个过滤器,它应用匹配查询过滤器并返回文档的子集。
doc_type——指定查询文档类型
最大和最小至少需要一个
min_percentage——匹配文档的百分比小于此,报警
max_percentage——匹配文档的百分比大于此,报警
Alerts
每条规则都可以附加任意数量的报警。Alerts是Alerter的子类,并从ElastAlert传递包含相关信息的字典或字典列表。与规则配置类似,它们在规则配置文件中配置。
|  |  | 
多个邮件:
|  |  | 
|  |  | 
Alert Subject
可通过添加包含自定义摘要的alert_subject来自定义电子邮件主题。
|  |  | 
|  |  | 
如果规则匹配索引中的多个对象,则仅使用第一个匹配来填充格式化程序的参数。
Alert Content
有几种方法可以格式化给种类型事件的正文:
|  |  | 
默认:
|  |  | 
command
命令报警允许你执行任意命令并从匹配中传递参数或stdin。该命令的参数可以使用Python格式的字符串语法来访问匹配的部分内容。报警器将打开一个子进程并可选地传递匹配,或在聚合报警的情况下,将其作为json阿虎组匹配到进程的stdin。
此报警需要一个选项:
command——要执行的参数列表或要执行的字符串。如果是列表格式,则第一个参数是要执行的程序名。如果传递了一个字符串,则该命令通过shell执行。
字符串可使用%或.format()进行格式化。这是Python的替换。
如果在命令中使用格式化数据,清泪建议使用args列表格式而不是shell字符串。
|  |  | 
此报警将会发送电子邮件。它默认连接到smtp_host服务器。
它需要一个选项:
email——接收报警的地址
Jira
Debug
调试报警器经使用Python logger的info level记录报警信息。它被记录到名为elastalert的Python logger对象中,可以使用getLogger命令轻松访问该对象。
HTTP POST
此报警类型使用HTTP POST将结果发送到JSON ENDPOINT。默认情况下,json会包含所有匹配,除非你指定http_post_payload。
需要:
http_post_url
|  |  | 
ElastAlert元数据索引
ElastAlert Metadata Index
ElastAlert使用Elasticsearch存储有关其状态的各种信息。这不仅允许对ElastAlert操作进行某种程度的审计和调试,而且还可以在ElastAlert关闭、重启或崩溃时避免数据丢失或重复报警。此集群和索引信息在全局配置文件中使用es_host, es_port, writeback_index定义。ElastAlert必须能够写入到此索引。elastalert-create-index将为你创建具有正确映射的索引,并可选择从现有的ElastAlert写回索引中复制文档。
ElastAlert将会在writeback index中创建三种不同类型的文档:
- elastalert_status
- elastalert
- elastalert_error
elastalert_status
elastalert_status是为给定规则执行查询的日志,包含:
- @timestamp
- rule_name
- starttime
- endtime
- hits: 查询的结果数
- matches: 匹配数
- time_taken: 查询所用秒数
elastalert
elastalert是有关触发的每个报警的日志信息,包含:
- @timestamp
- rule_name
- alert_info
- alert_sent
- alert_time
- match_body
- alert_exception
- aggregate_id
elastalert_error
当ElastAlert发生错误时,它将写入Elasticsearch和stderr。elastalert_error类型包含:
- @timestamp
- message
- traceback
- data
silence
silence是指由于重新设置或使用-silence而抑制给定规则的警报的记录。
- @timestamp
- rule_name
- until:警报在此开始发送的时间戳
- exponent:除非设置了- exponential_realert,否则它将为0
添加一个新规则类型
Adding a New Rule Type
添加一个新报警器
Adding a New Alerter
为规则编写过滤器
Writing Filters For Rules
增强功能
Enhancements
增强功能是一些模板,可让你在发送警报之前修改匹配项。
在容器内运行
构建基础镜像:
|  |  | 
以后只需将配置文件导入基础镜像就好:
|  |  | 
kibana-plugin
elastalert kibana-plugin是一个第三方插件。 ElastAlert Kibana plugin repository: https://github.com/bitsensor/elastalert-kibana-plugin
注意,安装的时候要注意kibana的版本。具体信息见README。