vector

官方文档

VRL语言在线测试

Vector 是一个高性能的可观测性数据管道,使组织能够控制其可观测性数据。它可以收集、转换所有日志、指标和链路追踪数据,并将其路由到您现在使用的任何供应商,以及未来可能需要的其他供应商。Vector 可以大幅降低成本、丰富数据并增强数据安全性,让数据掌握在您需要的地方,而不是供应商最方便的地方。Vector 开源,速度比同类替代方案快 10 倍。

TIP

总体来说,我们需要从kafka消费日志,然后用VRL语言处理日志,最后写到clickhouse

目录示例

.
├── config
│   ├── applog_test.toml
│   ├── applog.toml
│   ├── ngxlog.toml
│   └── vector.toml
├── data
└── setup.sh

setup.sh

#!/bin/bash
# (Re)start the Vector container: remove any previous instance, then run the
# pinned image with host networking, host timezone, and the local config/data
# directories mounted in.

# Remove an existing container if present; `rm -f` kills and removes in one
# step and the redirect/|| true keeps the script quiet when none exists
# (the original `docker kill` + `docker rm` printed errors on a fresh host).
docker rm -f vector 2>/dev/null || true

# --net host            : Vector binds its API (8686) directly on the host
# config/ -> conf.d/    : all *.toml files in config/ are loaded together
# data/  -> /var/lib/vector : persistent state (buffers, checkpoints)
docker run -d --name vector \
--restart=always \
--net host \
-v /etc/timezone:/etc/timezone:ro \
-v /etc/localtime:/etc/localtime:ro \
-v "$(pwd)/config/:/etc/vector/conf.d/" \
-v "$(pwd)/data/:/var/lib/vector" \
timberio/vector:0.46.1-alpine --config-dir /etc/vector/conf.d/

# reload configuration without restarting the container:
# docker kill --signal=HUP vector

配置文件

TIP

vector.toml 为必需的基础配置文件,在其基础上再添加其他管道配置文件。

基础配置vector.toml

# Global options.
# data_dir holds Vector's on-disk state (disk buffers, checkpoints); it is
# bind-mounted from ./data by setup.sh, so it survives container restarts.
data_dir = "/var/lib/vector"
timezone = "Asia/Shanghai"

# Management API (used by `vector top`, health checks, GraphQL playground).
# Normalized `key = value` spacing for consistency with the other config files.
[api]
enabled = true
address = "0.0.0.0:8686"

其他配置

ngxlog.toml
applog.toml
mysql_slow_log.toml
pg_slow_log.toml
# Kafka source: consumes nginx access-log events from the test_nginxlog topic.
[sources.ngxlog_source]
type = "kafka"
bootstrap_servers = "10.0.18.2:9092"
# Consumer group id; offsets are committed per group, so a unique id per
# pipeline instance/environment avoids stealing partitions from others.
group_id = "vector-clickhouse-hdy-nmg-server-001"
topics = [ "test_nginxlog" ]
# Event field that receives the Kafka message offset.
offset_key = "kafka_offset"
# Each Kafka message body is decoded as JSON into event fields.
decoding.codec = "json"
# With no previously committed offset, start from the beginning of the topic.
auto_offset_reset = "earliest"

# Remap transform: VRL program that flattens the nginx log event.
# It parses the raw JSON message and merges nested payloads (query string,
# request/response bodies, JWT payload from the Authorization header) into
# prefixed top-level fields suitable for a flat ClickHouse table.
# The VRL source below is kept byte-identical (its inline comments are in
# Chinese and label each parsing step).
[transforms.ngxlog_trans]
type="remap"
inputs=["ngxlog_source"]
# NOTE(review): in the base64 padding chain below, the `mod == 1` branch pads
# with "===", but a base64 string whose length is ≡ 1 (mod 4) is invalid and
# cannot be repaired by padding — decode_base64 will still fail there; the
# error lands in .err.auth_message_decode. Confirm this is intentional.
source='''
# 解析 nginx 日志并添加 "nginx_" 前缀
.message_parsed = object(parse_json(.message) ?? {}) ?? {}
.message_parsed = map_keys(.message_parsed, recursive: false) -> |key| { "nginx_" + key }
. = merge(., .message_parsed)

# 解析 nginx_query_string 字段并添加 "query_params_" 前缀
.query_params = parse_query_string(.nginx_query_string) ?? {}
. = merge(., map_keys(.query_params) -> |key| { "query_params_" + key })

# 解析 nginx_request_body 字段并添加 "nginx_request_body_" 前缀
.nginx_request_body = object(parse_json(.nginx_request_body) ?? {}) ?? {}
. = merge(., map_keys(.nginx_request_body) -> |key| { "nginx_request_body_" + key })

# 解析 nginx_resp_body 字段并添加 "nginx_resp_body_" 前缀
.nginx_resp_body = object(parse_json(.nginx_resp_body) ?? {}) ?? {}
. = merge(., map_keys(.nginx_resp_body) -> |key| { "nginx_resp_body_" + key })

# 解析 nginx_resp_body 的 data 字段并添加 "nginx_resp_body_data_" 前缀
.nginx_resp_body_data = object(.nginx_resp_body.data) ?? {}
. = merge(., map_keys(.nginx_resp_body_data) -> |key| { "nginx_resp_body_data_" + key })

# 解析 nginx_http_Authorization 字段并解密添加 "auth_message_" 前缀
.auth_message_encode = split(.nginx_http_Authorization, ".")[1] ?? ""
.auth_message_encode_length = length(.auth_message_encode) ?? 0
.auth_message_mod = mod(.auth_message_encode_length, 4)
if (.auth_message_mod == 1) {
        .auth_message_encode = .auth_message_encode + "==="
} else if (.auth_message_mod == 2) {
        .auth_message_encode = .auth_message_encode + "=="
} else if (.auth_message_mod == 3) {
        .auth_message_encode = .auth_message_encode + "="
} else if (.auth_message_mod == 0) {
        .auth_message_encode = .auth_message_encode
}
.auth_message_decode,.err.auth_message_decode = decode_base64(.auth_message_encode)
.auth_message = object(parse_json(.auth_message_decode) ?? {}) ?? {}
. = merge(., map_keys(.auth_message) -> |key| { "auth_message_" + key })

# 时间字段解析
.create_time = parse_timestamp(.nginx_time_local,format: "%d/%b/%Y:%H:%M:%S %z") ?? now()

# 主机名
.hostname = .saltid

# mcc 解析
.mcc = chunks(.query_params_imsi, 3)[0] ?? null
'''

# ClickHouse sink: writes the flattened events produced by ngxlog_trans.
[sinks.ngxlog_clickhouse_sink]
type = "clickhouse"
inputs = [ "ngxlog_trans" ]
# Database connection (ClickHouse HTTP interface)
endpoint = "http://10.0.18.2:8123"
database = "log"
table = "ngxlog_test"
healthcheck.enabled = true
auth.strategy = "basic"
auth.user = "root"
# NOTE(review): placeholder credential committed in config — confirm this is
# injected/overridden in real deployments.
auth.password = "password"
# Batching: a batch is flushed when any of these limits is reached
batch.max_bytes = 10000000
batch.max_events = 1000
batch.timeout_secs = 3
# Disk buffer (alternative, survives restarts)
# buffer.type = "disk"
# buffer.max_size = 1024000000
# In-memory buffer (faster, lost on restart)
buffer.type = "memory"
buffer.max_events = 1000
# When the buffer is full: "block" applies backpressure upstream,
# "drop_newest" discards incoming events instead
buffer.when_full = "block"
# Compression of the HTTP request payload
compression = "gzip"
# Defaults to false; when true ClickHouse parses RFC3339/ISO 8601
# timestamp strings automatically (input_format best-effort parsing)
date_time_best_effort = true
# Silently drop event fields that have no matching table column
skip_unknown_fields = true