logo
tt's Note
  • 运维
    • 数据库
    • linux
    • vpn
    • 日志
    • 中间件
    • 服务
    • 监控
    • shell
    • windows
    • 语言
    • 云服务
    • 其他
  • 开发
    • 工具
  • 软件
    • 浏览器
    • 多端
    • win
    • mac
    • 网站
  • 项目
    • 效率工具
    • 兴趣
  • 脚本
    • jenkins
    中间件概览
    haproxy
    kafka
    eclipse-mosquitto
    nginx
    rabbitmq
    redis-shake
    redis
    vector
    zookeeper
    上一页redis下一页zookeeper

    #vector

    官方文档

    VRL语言在线测试

    Vector 是一个高性能的可观测性数据管道,使组织能够掌控自己的可观测性数据:收集、转换所有日志、指标和追踪数据,并将其路由到您现在使用的任何供应商,以及未来可能需要的任何其他供应商。Vector 可以大幅降低成本、丰富数据并提升数据安全性,让数据在您需要的地方处理,而不是在供应商最方便的地方。Vector 是开源的,速度比所有替代方案快 10 倍。

    Tip

    总体来说,我们需要从kafka消费日志,然后用VRL语言处理日志,最后写到clickhouse

    #目录示例

    .
    ├── config
    │   ├── applog_test.toml
    │   ├── applog.toml
    │   ├── ngxlog.toml
    │   └── vector.toml
    ├── data
    └── setup.sh

    #setup.sh

    #!/bin/bash
    # Recreate the vector container from the local ./config and ./data directories.
    
    # Force-remove any existing container in one step; ignore the error when
    # it does not exist (the old kill+rm pair printed errors on first run).
    docker rm -f vector 2>/dev/null || true
    
    # $(pwd) is quoted so the script survives a working directory with spaces.
    docker run -d --name vector \
    --restart=always \
    --net host \
    -v /etc/timezone:/etc/timezone:ro \
    -v /etc/localtime:/etc/localtime:ro \
    -v "$(pwd)/config/:/etc/vector/conf.d/" \
    -v "$(pwd)/data/:/var/lib/vector" \
    timberio/vector:0.46.1-alpine --config-dir /etc/vector/conf.d/
    
    # Reload configuration without restarting the container:
    # docker kill --signal=HUP vector

    #配置文件

    Tip

    vector.toml 为必需的基础配置文件,在其基础上再添加其他配置文件

    #基础配置vector.toml

    # Global Vector settings: on-disk state location and the timezone used by
    # VRL timestamp functions when a format string carries no offset.
    data_dir = "/var/lib/vector"
    timezone = "Asia/Shanghai"
    
    # GraphQL API (health/introspection), exposed on all interfaces.
    [api]
    enabled = true
    address = "0.0.0.0:8686"

    #其他配置

    ngxlog.toml
    applog.toml
    mysql_slow_log.toml
    pg_slow_log.toml
    # Consume JSON-encoded nginx access logs from Kafka.
    [sources.ngxlog_source]
    type = "kafka"
    bootstrap_servers = "10.0.18.2:9092"
    group_id = "vector-clickhouse-hdy-nmg-server-001"
    topics = [ "test_nginxlog" ]
    # Store each message's Kafka offset under this event field.
    offset_key = "kafka_offset"
    decoding.codec = "json"
    # Start from the oldest retained offset when the group has none committed.
    auto_offset_reset = "earliest"
    
    # Flatten the nginx log event: parse nested JSON payloads into prefixed
    # top-level keys so the ClickHouse sink can map them to columns.
    [transforms.ngxlog_trans]
    type="remap"
    inputs=["ngxlog_source"]
    source='''
    # Parse the nginx log JSON and prefix its keys with "nginx_"
    .message_parsed = object(parse_json(.message) ?? {}) ?? {}
    .message_parsed = map_keys(.message_parsed, recursive: false) -> |key| { "nginx_" + key }
    . = merge(., .message_parsed)
    
    # Parse the nginx_query_string field and prefix its keys with "query_params_"
    .query_params = parse_query_string(.nginx_query_string) ?? {}
    . = merge(., map_keys(.query_params) -> |key| { "query_params_" + key })
    
    # Parse the nginx_request_body field and prefix its keys with "nginx_request_body_"
    .nginx_request_body = object(parse_json(.nginx_request_body) ?? {}) ?? {}
    . = merge(., map_keys(.nginx_request_body) -> |key| { "nginx_request_body_" + key })
    
    # Parse the nginx_resp_body field and prefix its keys with "nginx_resp_body_"
    .nginx_resp_body = object(parse_json(.nginx_resp_body) ?? {}) ?? {}
    . = merge(., map_keys(.nginx_resp_body) -> |key| { "nginx_resp_body_" + key })
    
    # Flatten the "data" object of nginx_resp_body with a "nginx_resp_body_data_" prefix
    .nginx_resp_body_data = object(.nginx_resp_body.data) ?? {}
    . = merge(., map_keys(.nginx_resp_body_data) -> |key| { "nginx_resp_body_data_" + key })
    
    # Decode the nginx_http_Authorization header (presumably a JWT: the second
    # dot-separated segment is the base64url payload — TODO confirm) and merge
    # its fields with an "auth_message_" prefix.
    .auth_message_encode = split(.nginx_http_Authorization, ".")[1] ?? ""
    .auth_message_encode_length = length(.auth_message_encode) ?? 0
    # Re-add base64 "=" padding based on length mod 4.
    .auth_message_mod = mod(.auth_message_encode_length, 4)
    # NOTE(review): a valid base64 length mod 4 is never 1, and padding never
    # exceeds two "=" characters — the "===" branch cannot produce valid
    # base64; confirm whether that branch is dead code.
    if (.auth_message_mod == 1) {
            .auth_message_encode = .auth_message_encode + "==="
    } else if (.auth_message_mod == 2) {
            .auth_message_encode = .auth_message_encode + "=="
    } else if (.auth_message_mod == 3) {
            .auth_message_encode = .auth_message_encode + "="
    } else if (.auth_message_mod == 0) {
            .auth_message_encode = .auth_message_encode
    }
    # Fallible assignment: on failure the error lands in .err.auth_message_decode
    # instead of aborting the program.
    .auth_message_decode,.err.auth_message_decode = decode_base64(.auth_message_encode)
    .auth_message = object(parse_json(.auth_message_decode) ?? {}) ?? {}
    . = merge(., map_keys(.auth_message) -> |key| { "auth_message_" + key })
    
    # Parse nginx $time_local ("10/Oct/2024:13:55:36 +0800"); fall back to now()
    .create_time = parse_timestamp(.nginx_time_local,format: "%d/%b/%Y:%H:%M:%S %z") ?? now()
    
    # Hostname taken from the salt minion id attached upstream
    .hostname = .saltid
    
    # MCC = first 3 characters of the IMSI query parameter
    .mcc = chunks(.query_params_imsi, 3)[0] ?? null
    '''
    
    # Write the flattened nginx events into ClickHouse via the HTTP interface.
    [sinks.ngxlog_clickhouse_sink]
    type = "clickhouse"
    inputs = [ "ngxlog_trans" ]
    # Database connection
    endpoint = "http://10.0.18.2:8123"
    database = "log"
    table = "ngxlog_test"
    healthcheck.enabled = true
    auth.strategy = "basic"
    auth.user = "root"
    # NOTE(review): credentials committed in plaintext — consider moving to a
    # secrets mechanism.
    auth.password = "password"
    # Batched bulk inserts
    batch.max_bytes = 10000000
    batch.max_events = 1000
    batch.timeout_secs = 3
    # Disk buffer (disabled)
    # buffer.type = "disk"
    # buffer.max_size = 1024000000
    # In-memory buffer
    buffer.type = "memory"
    buffer.max_events = 1000
    # When the buffer is full: block / drop_newest
    buffer.when_full = "block"
    # Compression format
    compression = "gzip"
    # Default false; lets ClickHouse auto-parse RFC3339/ISO 8601 timestamps
    date_time_best_effort = true
    # Silently drop event fields with no matching table column
    skip_unknown_fields = true
    
    # Consume JSON-encoded application logs from Kafka (for the Loki sink).
    [sources.source_applog_test]
    type = "kafka"
    bootstrap_servers = "10.0.18.2:9092"
    group_id = "vector-loki-hdy-nmg-server-001"
    topics = [ "log" ]
    # Store each message's Kafka offset under this event field.
    offset_key = "kafka_offset"
    decoding.codec = "json"
    # Start from the oldest retained offset when the group has none committed.
    auto_offset_reset = "earliest"
    
    # Ship application logs to Loki, labelled from per-event fields.
    [sinks.sink_loki_applog]
    type = "loki"
    inputs = [ "source_applog_test" ]
    endpoint = "http://10.0.18.2:3100"
    healthcheck.enabled = true
    # Forward the original message bytes unchanged.
    encoding.codec = "raw_message"
    batch.max_bytes = 1000000
    batch.max_events = 100000
    batch.timeout_secs = 1
    buffer.type = "memory"
    buffer.max_events = 1000
    # Block producers instead of dropping events when the buffer fills.
    buffer.when_full = "block"
    compression = "gzip"
    # Loki stream labels templated from event fields (presumably set by the
    # upstream producer — confirm the fields exist on every event).
    labels."object" = "{{ .object }}"
    labels."appName" = "{{ .appName }}"
    labels."containerName" = "{{ .containerName }}"
    labels."hostname" = "{{ .hostname }}"
    labels."logver" = "{{ .logver }}"
    # Consume MySQL slow-query log entries from Kafka.
    [sources.mysql_slow_log_test]
    type = "kafka"
    bootstrap_servers = "192.168.1.1:9092"
    group_id = "dev3"
    topics = [ "mysql-slow-log" ]
    offset_key = "kafka_offset"
    decoding.codec = "json"
    # NOTE(review): unlike ngxlog_source, no auto_offset_reset is set here, so
    # the consumer falls back to the library default — confirm this is intended.
    
    # Split a raw MySQL slow-log entry into typed fields for ClickHouse.
    [transforms.mysql_slow_log_test_remap_format]
    type = "remap"
    inputs = ["mysql_slow_log_test"]
    source = '''
    # Extract the timestamp from the "# Time:" header line (ISO 8601,
    # optional fractional seconds, optional trailing Z).
    . |= parse_regex!(.message, r'^# Time:\s*(?P<mysql_local_time>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z?)')
    # Extract the client/connection description.
    . |= parse_regex!(.message, r'# User@Host:\s*(?P<connect>.*?)\s+Id:')
    # Extract the connection id.
    . |= parse_regex!(.message, r'Id:\s*(?P<str_id>\d+)')
    # Strip newlines, then extract query statistics plus the slow SQL text.
    . |= parse_regex!(replace!(.message, "\n", ""), r'# Query_time:\s*(?P<str_query_time>[0-9.]+)\s+Lock_time:\s*(?P<str_lock_time>[0-9.]+)\s+Rows_sent:\s*(?P<str_rows_sent>[0-9]+)\s+Rows_examined:\s*(?P<str_rows_examined>[0-9]+)(?P<slow_sql>.*)')
    
    # Convert captured strings to numbers: counters to int, durations to float.
    # (rows_sent/rows_examined match [0-9]+ and were previously parsed as
    # floats, which an integer ClickHouse column would reject.)
    .id = parse_int!(.str_id)
    .query_time = parse_float!(.str_query_time)
    .lock_time = parse_float!(.str_lock_time)
    .rows_sent = parse_int!(.str_rows_sent)
    .rows_examined = parse_int!(.str_rows_examined)
    
    # Parse the timestamp captured above. The previous format string
    # ("%d/%b/%Y:%H:%M:%S %z") could never match the ISO value captured by the
    # "# Time:" regex, so create_time silently always fell back to now().
    # Try RFC3339 first (with offset), then offset-less variants, which use the
    # globally configured `timezone`.
    .create_time = parse_timestamp(.mysql_local_time, format: "%+") ?? parse_timestamp(.mysql_local_time, format: "%Y-%m-%dT%H:%M:%S%.f") ?? parse_timestamp(.mysql_local_time, format: "%Y-%m-%dT%H:%M:%S") ?? now()
    
    # Hostname taken from the salt minion id attached upstream.
    .hostname = .saltid
    
    # Database id attached upstream.
    .database_id = .databaseid
    '''
    
    # Write parsed MySQL slow-log events into ClickHouse.
    [sinks.mysql_slow_log_clickhouse]
    type = "clickhouse"
    inputs = [ "mysql_slow_log_test_remap_format" ]
    # Database connection
    endpoint = "http://192.168.1.1:8123"
    database = "test"
    table = "mysql_slow_log_test"
    auth.strategy = "basic"
    auth.user = "root"
    auth.password = "password"
    # Wait for delivery acknowledgement before committing Kafka offsets
    acknowledgements.enabled = true
    # Silently drop event fields with no matching table column
    skip_unknown_fields = true
    # Let ClickHouse auto-parse RFC3339/ISO 8601 timestamps
    date_time_best_effort = true
    
    # Consume PostgreSQL slow-query log entries from Kafka.
    [sources.pg_slow_log_test]
    type = "kafka"
    bootstrap_servers = "192.168.1.1:9092"
    group_id = "dev3"
    topics = [ "pg-slow-log" ]
    offset_key = "kafka_offset"
    decoding.codec = "json"
    # NOTE(review): unlike ngxlog_source, no auto_offset_reset is set here, so
    # the consumer falls back to the library default — confirm this is intended.
    
    # Split a raw PostgreSQL slow-log line into typed fields for ClickHouse.
    [transforms.pg_slow_log_test_remap_format]
    type = "remap"
    inputs = ["pg_slow_log_test"]
    source = '''
    # Extract the timestamp prefix, e.g. "2024-01-01 12:00:00.123 CST".
    . |= parse_regex!(.message, r'^(?P<pg_local_time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) CST')
    # Extract the statement duration in milliseconds.
    . |= parse_regex!(.message, r'duration: (?P<duration>[0-9.]+) ms')
    # Extract the slow SQL statement text.
    . |= parse_regex!(.message, r'statement: (?P<slow_sql>.+)$')
    
    # Parse the captured local timestamp. The previous format string
    # ("%d/%b/%Y:%H:%M:%S %z") could never match the "YYYY-MM-DD HH:MM:SS.mmm"
    # value captured above, so create_time silently always fell back to now().
    # The format carries no offset, so the globally configured `timezone`
    # (Asia/Shanghai, matching the log's CST marker) applies — confirm on deploy.
    .create_time = parse_timestamp(.pg_local_time, format: "%Y-%m-%d %H:%M:%S%.3f") ?? now()
    
    # Convert the duration to a float (milliseconds).
    .duration = parse_float!(.duration)
    
    # Database id attached upstream.
    .database_id = .databaseid
    
    # Hostname taken from the salt minion id attached upstream.
    .hostname = .saltid
    '''
    
    # Write parsed PostgreSQL slow-log events into ClickHouse.
    [sinks.pg_slow_log_clickhouse]
    type = "clickhouse"
    inputs = [ "pg_slow_log_test_remap_format" ]
    # Database connection
    endpoint = "http://192.168.1.1:8123"
    database = "test"
    table = "pg_slow_log_test"
    auth.strategy = "basic"
    auth.user = "root"
    auth.password = "password"
    # Wait for delivery acknowledgement before committing Kafka offsets
    acknowledgements.enabled = true
    # Silently drop event fields with no matching table column
    skip_unknown_fields = true
    # Let ClickHouse auto-parse RFC3339/ISO 8601 timestamps
    date_time_best_effort = true