#vector
Vector is a high-performance observability data pipeline that puts organizations in control of their observability data. It collects, transforms, and routes all of your logs, metrics, and traces to any vendor you use today and any other vendor you may want tomorrow. Vector enables dramatic cost reduction, novel data enrichment, and data security enforced where you need it rather than where it is most convenient for your vendor. It is open source and up to 10x faster than comparable alternatives.
Tip
Overall, this setup consumes logs from Kafka, processes them with the VRL language, and writes the results to ClickHouse.
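As a quick orientation before the full configs below, every pipeline here has the same three-part shape: a kafka source, a remap (VRL) transform, and a clickhouse sink. A minimal sketch, in which all addresses, topic, database, table, and component names are placeholders:
[sources.example_source]
type = "kafka"
bootstrap_servers = "127.0.0.1:9092"
group_id = "example-group"
topics = [ "example-topic" ]
decoding.codec = "json"
[transforms.example_trans]
type = "remap"
inputs = [ "example_source" ]
source = '''
# VRL runs here once per event
.hello = "world"
'''
[sinks.example_sink]
type = "clickhouse"
inputs = [ "example_trans" ]
endpoint = "http://127.0.0.1:8123"
database = "log"
table = "example_table"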
#Directory layout
config/
    applog_test.toml
    applog.toml
    ngxlog.toml
    vector.toml
data/
setup.sh
#setup.sh
setup.sh
#!/bin/bash
docker kill vector
docker rm vector
docker run -d --name vector \
--restart=always \
--net host \
-v /etc/timezone:/etc/timezone:ro \
-v /etc/localtime:/etc/localtime:ro \
-v $(pwd)/config/:/etc/vector/conf.d/ \
-v $(pwd)/data/:/var/lib/vector \
timberio/vector:0.46.1-alpine --config-dir /etc/vector/conf.d/
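# (optional) validate the configs before reloading; a sketch, assuming the
# vector CLI inside the container accepts config file paths:
# docker exec vector sh -c 'vector validate /etc/vector/conf.d/*.toml'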
# reload
# docker kill --signal=HUP vector
#Configuration files
Tip
vector.toml is required; the other config files are then added alongside it in the same config directory.
#Base config: vector.toml
vector.toml
data_dir = "/var/lib/vector"
timezone = "Asia/Shanghai"
[api]
enabled=true
address="0.0.0.0:8686"#其他配置
Besides vector.toml, there is one config file per log type: ngxlog.toml, applog.toml, mysql_slow_log.toml, and pg_slow_log.toml.
ngxlog.toml
[sources.ngxlog_source]
type = "kafka"
bootstrap_servers = "10.0.18.2:9092"
group_id = "vector-clickhouse-hdy-nmg-server-001"
topics = [ "test_nginxlog" ]
offset_key = "kafka_offset"
decoding.codec = "json"
auto_offset_reset = "earliest"
[transforms.ngxlog_trans]
type="remap"
inputs=["ngxlog_source"]
source='''
# Parse the nginx log JSON and add the "nginx_" prefix to its keys
.message_parsed = object(parse_json(.message) ?? {}) ?? {}
.message_parsed = map_keys(.message_parsed, recursive: false) -> |key| { "nginx_" + key }
. = merge(., .message_parsed)
# Parse the nginx_query_string field and add the "query_params_" prefix
.query_params = parse_query_string(.nginx_query_string) ?? {}
. = merge(., map_keys(.query_params) -> |key| { "query_params_" + key })
# Parse the nginx_request_body field and add the "nginx_request_body_" prefix
.nginx_request_body = object(parse_json(.nginx_request_body) ?? {}) ?? {}
. = merge(., map_keys(.nginx_request_body) -> |key| { "nginx_request_body_" + key })
# Parse the nginx_resp_body field and add the "nginx_resp_body_" prefix
.nginx_resp_body = object(parse_json(.nginx_resp_body) ?? {}) ?? {}
. = merge(., map_keys(.nginx_resp_body) -> |key| { "nginx_resp_body_" + key })
# Parse the data field of nginx_resp_body and add the "nginx_resp_body_data_" prefix
.nginx_resp_body_data = object(.nginx_resp_body.data) ?? {}
. = merge(., map_keys(.nginx_resp_body_data) -> |key| { "nginx_resp_body_data_" + key })
# Base64-decode the payload of nginx_http_Authorization and add the "auth_message_" prefix
.auth_message_encode = split(.nginx_http_Authorization, ".")[1] ?? ""
.auth_message_encode_length = length(.auth_message_encode) ?? 0
.auth_message_mod = mod(.auth_message_encode_length, 4)
if (.auth_message_mod == 1) {
    .auth_message_encode = .auth_message_encode + "==="
} else if (.auth_message_mod == 2) {
    .auth_message_encode = .auth_message_encode + "=="
} else if (.auth_message_mod == 3) {
    .auth_message_encode = .auth_message_encode + "="
} else if (.auth_message_mod == 0) {
    .auth_message_encode = .auth_message_encode
}
.auth_message_decode, .err.auth_message_decode = decode_base64(.auth_message_encode)
.auth_message = object(parse_json(.auth_message_decode) ?? {}) ?? {}
. = merge(., map_keys(.auth_message) -> |key| { "auth_message_" + key })
# Parse the timestamp field
.create_time = parse_timestamp(.nginx_time_local, format: "%d/%b/%Y:%H:%M:%S %z") ?? now()
# Hostname
.hostname = .saltid
# MCC: first 3 characters of the IMSI
.mcc = chunks(.query_params_imsi, 3)[0] ?? null
'''
[sinks.ngxlog_clickhouse_sink]
type = "clickhouse"
inputs = [ "ngxlog_trans" ]
# Database configuration
endpoint = "http://10.0.18.2:8123"
database = "log"
table = "ngxlog_test"
healthcheck.enabled = true
auth.strategy = "basic"
auth.user = "root"
auth.password = "password"
# Batch settings for inserts
batch.max_bytes = 10000000
batch.max_events = 1000
batch.timeout_secs = 3
# Disk buffer (disabled)
# buffer.type = "disk"
# buffer.max_size = 1024000000
# In-memory buffer
buffer.type = "memory"
buffer.max_events = 1000
# When the buffer is full: block / drop_newest
buffer.when_full = "block"
# Compression format
compression = "gzip"
# Defaults to false; lets ClickHouse best-effort parse RFC 3339 / ISO 8601 timestamps
date_time_best_effort = true
# Automatically discard fields that are not columns of the target table
skip_unknown_fields = true
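Because ngxlog_trans does a lot of parsing, it can be worth unit-testing it with Vector's built-in test blocks (run via vector test config/*.toml). A minimal sketch; the test name, the sample saltid value, and the empty message are illustrative assumptions, not real traffic:
[[tests]]
name = "ngxlog_trans sets hostname from saltid"
[[tests.inputs]]
insert_at = "ngxlog_trans"
type = "log"
log_fields.saltid = "host-01"
log_fields.message = "{}"
[[tests.outputs]]
extract_from = "ngxlog_trans"
[[tests.outputs.conditions]]
type = "vrl"
source = '''
assert_eq!(.hostname, "host-01")
'''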
applog.toml
[sources.source_applog_test]
type = "kafka"
bootstrap_servers = "10.0.18.2:9092"
group_id = "vector-loki-hdy-nmg-server-001"
topics = [ "log" ]
offset_key = "kafka_offset"
decoding.codec = "json"
auto_offset_reset = "earliest"
[sinks.sink_loki_applog]
type = "loki"
inputs = [ "source_applog_test" ]
endpoint = "http://10.0.18.2:3100"
healthcheck.enabled = true
encoding.codec = "raw_message"
batch.max_bytes = 1000000
batch.max_events = 100000
batch.timeout_secs = 1
buffer.type = "memory"
buffer.max_events = 1000
buffer.when_full = "block"
compression = "gzip"
labels."object" = "{{ .object }}"
labels."appName" = "{{ .appName }}"
labels."containerName" = "{{ .containerName }}"
labels."hostname" = "{{ .hostname }}"
labels."logver" = "{{ .logver }}"[sources.mysql_slow_log_test]
type = "kafka"
bootstrap_servers = "192.168.1.1:9092"
group_id = "dev3"
topics = [ "mysql-slow-log" ]
offset_key = "kafka_offset"
decoding.codec = "json"
[transforms.mysql_slow_log_test_remap_format]
type="remap"
inputs=["mysql_slow_log_test"]
source='''
# Extract the time field
. |= parse_regex!(.message, r'^# Time:\s*(?P<mysql_local_time>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z?)')
# Extract the connection (User@Host) field
. |= parse_regex!(.message, r'# User@Host:\s*(?P<connect>.*?)\s+Id:')
# Extract the id field
. |= parse_regex!(.message, r'Id:\s*(?P<str_id>\d+)')
# Extract the query timing stats and the slow SQL statement
. |= parse_regex!(replace!(.message, "\n", ""), r'# Query_time:\s*(?P<str_query_time>[0-9.]+)\s+Lock_time:\s*(?P<str_lock_time>[0-9.]+)\s+Rows_sent:\s*(?P<str_rows_sent>[0-9]+)\s+Rows_examined:\s*(?P<str_rows_examined>[0-9]+)(?P<slow_sql>.*)')
# Convert the string fields to int / float
.id = parse_int!(.str_id)
.query_time = parse_float!(.str_query_time)
.lock_time = parse_float!(.str_lock_time)
.rows_sent = parse_float!(.str_rows_sent)
.rows_examined = parse_float!(.str_rows_examined)
# Parse the time field (the regex above captures an ISO 8601 timestamp, so use a matching format)
.create_time = parse_timestamp(.mysql_local_time, format: "%Y-%m-%dT%H:%M:%S%.fZ") ?? now()
# Hostname
.hostname = .saltid
# Database id
.database_id = .databaseid
'''
[sinks.mysql_slow_log_clickhouse]
acknowledgements.enabled = true
type = "clickhouse"
database = "test"
inputs = [ "mysql_slow_log_test_remap_format" ]
endpoint = "http://192.168.1.1:8123"
auth.strategy = "basic"
auth.user = "root"
auth.password = "password"
table = "mysql_slow_log_test"
# Automatically discard fields that are not columns of the target table
skip_unknown_fields = true
# Best-effort timestamp parsing
date_time_best_effort = true
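The mysql transform above relies on parse_regex!, so any message that does not match the expected slow-log layout makes the VRL program error; by default remap then forwards the event unmodified and only logs the error. If you would rather drop such events and inspect them separately, remap can reroute them to a dedicated output. A sketch under that assumption; the debug sink name is a placeholder:
# extra keys for [transforms.mysql_slow_log_test_remap_format]
drop_on_error = true
drop_on_abort = true
reroute_dropped = true
[sinks.mysql_slow_log_dropped_debug]
type = "console"
inputs = [ "mysql_slow_log_test_remap_format.dropped" ]
encoding.codec = "json"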
pg_slow_log.toml
[sources.pg_slow_log_test]
type = "kafka"
bootstrap_servers = "192.168.1.1:9092"
group_id = "dev3"
topics = [ "pg-slow-log" ]
offset_key = "kafka_offset"
decoding.codec = "json"
[transforms.pg_slow_log_test_remap_format]
type="remap"
inputs=["pg_slow_log_test"]
source='''
# Extract the time field
. |= parse_regex!(.message, r'^(?P<pg_local_time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) CST')
# Extract the query duration field
. |= parse_regex!(.message, r'duration: (?P<duration>[0-9.]+) ms')
# Extract the SQL statement field
. |= parse_regex!(.message, r'statement: (?P<slow_sql>.+)$')
# Convert the time field (the regex above captures "YYYY-MM-DD HH:MM:SS.mmm", so use a matching format)
.create_time = parse_timestamp(.pg_local_time, format: "%Y-%m-%d %H:%M:%S%.3f") ?? now()
# Convert the duration field to a float
.duration = parse_float!(.duration)
# Database id
.database_id = .databaseid
# Hostname
.hostname = .saltid
'''
[sinks.pg_slow_log_clickhouse]
acknowledgements.enabled = true
type = "clickhouse"
database = "test"
inputs = [ "pg_slow_log_test_remap_format" ]
endpoint = "http://192.168.1.1:8123"
auth.strategy = "basic"
auth.user = "root"
auth.password = "password"
table = "pg_slow_log_test"
# Automatically discard fields that are not columns of the target table
skip_unknown_fields = true
# Best-effort timestamp parsing
date_time_best_effort = true
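All of the ClickHouse sinks above use in-memory buffers, so events still buffered when the container restarts are lost. The disk buffer that is commented out in the ngxlog sink keeps them on disk under data_dir instead; a sketch of the keys to swap in on any of these sinks (the 1 GiB size is only an example):
buffer.type = "disk"
buffer.max_size = 1073741824
buffer.when_full = "block"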
