logo
tt's Note
  • 运维
    • 数据库
    • linux
    • vpn
    • 日志
    • 中间件
    • 服务
    • 监控
    • shell
    • windows
    • 语言
    • 云服务
    • 其他
  • 开发
    • 工具
  • 软件
    • 浏览器
    • 多端
    • win
    • mac
    • 网站
  • 项目
    • 效率工具
    • 兴趣
  • 脚本
    • jenkins
    中间件概览
    haproxy
    kafka
    eclipse-mosquitto
    nginx
    rabbitmq
    redis-shake
    redis
    vector
    zookeeper
    上一页haproxy下一页eclipse-mosquitto

    #kafka

    kafka的3.0以下的版本,都是强依赖zookeeper, 在低版本kafka中, 我们采用了把zookeeper打入镜像中运行

    .
    ├── config-9092
    │   ├── run.sh
    └── setup.sh

    #启动

    低版本
    低版本配置文件
    高版本
    #!/bin/bash
    
    port=9092
    container_name="kafka-${port}"
    
    docker stop ${container_name}
    docker rm ${container_name}
    docker run -d \
    --hostname $(hostname) \
    --add-host $(hostname):10.0.18.2 \
    --name ${container_name} \
    --restart=always \
    -v /etc/timezone:/etc/timezone:ro \
    -v /etc/localtime:/etc/localtime:ro \
    -v $(pwd)/config-${port}/run.sh:/run.sh \
    -v $(pwd)/data-standalone:/data \
    --net host \
    -e NODE_ID="1001" \
    -e LISTENERS="PRIVATE://:9092,CONTROLLER://:9093,PUBLIC://:9094" \
    -e ADVERTISED_LISTENERS="PRIVATE://10.0.18.2:9092" \
    registry.cn-hangzhou.aliyuncs.com/buyfakett/kafka-standalone:2.13-2.8.0
    
    # 配置通过 run.sh 重写
    #!/bin/bash
    
    cd $KAFKA_HOME/
    
    export PRIVATE_IP=${PRIVATE_IP:-`hostname`}
    export PUBLIC_IP=${PUBLIC_IP:-`hostname`}
    
    export NODE_ID=${NODE_ID:-"1"}
    export PROCESS_ROLES=${PROCESS_ROLES:-"broker,controller"}
    export CONTROLLER_QUORUM_VOTERS=${CONTROLLER_QUORUM_VOTERS:="$NODE_ID@`hostname`:9093"}
    export LISTENERS=${LISTENERS:-"PRIVATE://:9092,CONTROLLER://:9093,PUBLIC://:9094"}
    export INTER_BROKER_LISTENER_NAME=${INTER_BROKER_LISTENER_NAME:-PRIVATE}
    export ADVERTISED_LISTENERS=${ADVERTISED_LISTENERS:-"PRIVATE://$PRIVATE_IP:9092,PUBLIC://$PUBLIC_IP:9094"}
    export CONTROLLER_LISTENER_NAMES=${CONTROLLER_LISTENER_NAMES:-"CONTROLLER"}
    export LISTENER_SECURITY_PROTOCOL_MAP=${LISTENER_SECURITY_PROTOCOL_MAP:-"CONTROLLER:PLAINTEXT,PRIVATE:PLAINTEXT,PUBLIC:PLAINTEXT"}
    export LOG_DIRS=${LOG_DIRS:-"/data"}
    
    cat << EOF > ./config/kraft/server.properties
    process.roles=${PROCESS_ROLES}
    node.id=${NODE_ID}
    controller.quorum.voters=${CONTROLLER_QUORUM_VOTERS}
    listeners=${LISTENERS}
    inter.broker.listener.name=${INTER_BROKER_LISTENER_NAME}
    advertised.listeners=${ADVERTISED_LISTENERS}
    controller.listener.names=${CONTROLLER_LISTENER_NAMES}
    listener.security.protocol.map=${LISTENER_SECURITY_PROTOCOL_MAP}
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    # 数据存放目录
    log.dirs=${LOG_DIRS}
    # 默认分区个数
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    # 默认复制实例个数
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    
    # 数据保留168小时
    log.retention.hours=168
    # 数据保留10G
    log.retention.bytes=10737418240
    # 300秒检查并删除数据
    log.retention.check.interval.ms=300000
    # 数据块大小1G
    log.segment.bytes=1073741824
    # 数据块切割30秒一次
    log.segment.delete.delay.ms=30000
    # 允许删除数据
    log.cleaner.enable=true
    log.cleanup.policy=delete
    delete.topic.enable=true
    EOF
    
    echo ${LOG_DIRS}
    
    if [ "$(ls -A ${LOG_DIRS})"x == ""x ];then
            ./bin/kafka-storage.sh random-uuid >> uuid.txt
            uuid=$(cat uuid.txt)
            ./bin/kafka-storage.sh format -t ${uuid} -c ./config/kraft/server.properties
    fi
    
    ./bin/kafka-server-start.sh ./config/kraft/server.properties
    #!/bin/bash
    
    PUBLIC_IP=""
    PRIVATE_IP=""
    NODE_ID=1
    
    kafka_version=3.8.0
    container_name="kafka"
    id=${id:-"$container_name"}
    
    # 初始化配置文件
    mkdir -p config/ && cat << EOF > config/.env
    KAFKA_NODE_ID=$NODE_ID
    # 功能
    KAFKA_PROCESS_ROLES=broker,controller
    KAFKA_LISTENERS=PRIVATE://:9092,CONTROLLER://:9093,PUBLIC://:9094
    KAFKA_ADVERTISED_LISTENERS=PRIVATE://$PRIVATE_IP:9092,PUBLIC://$PUBLIC_IP:9094
    KAFKA_INTER_BROKER_LISTENER_NAME=PRIVATE
    KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PRIVATE:PLAINTEXT,PUBLIC:PLAINTEXT
    # 集群配置
    KAFKA_CONTROLLER_QUORUM_VOTERS=$NODE_ID@$(hostname):9093
    # 默认 3 个副本
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
    # 默认 3,事务主题的复制因子(设置得更高以确保可用性)。除非集群大小满足此复制因子要求,否则内部主题创建将失败。
    KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
    # 默认 2,必须确认对事务主题的写入才能被视为成功的最小副本数
    KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
    # 默认 3000(3s),组协调器在执行第一次重新平衡之前等待更多消费者加入新组的时间。延迟越长,重新平衡次数可能越少,但处理开始的时间就越长
    KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
    # 默认 1,topic 分区数量
    KAFKA_NUM_PARTITIONS=1
    # 数据存放目录
    KAFKA_LOG_DIRS=/data
    # 默认 168(7 天),数据保留时间
    KAFKA_LOG_RETENTION_HOURS=168
    # 默认 -1,每个 topic 保留大小 53687091200 字节(50 G)
    KAFKA_LOG_RETENTION_BYTES=53687091200
    EOF
    
    # 初始化数据目录
    if [ ! -d data ];then
        mkdir -p data && chown -R 1000:1000 data/
    fi
    
    # 重启服务
    docker stop $container_name
    docker rm $container_name
    docker run -d  \
    --name $container_name \
    --net host \
    --restart=always \
    --hostname $(hostname) \
    --add-host $(hostname):127.0.0.1 \
    -v /etc/timezone:/etc/timezone:ro \
    -v /etc/localtime:/etc/localtime:ro \
    --user 1000 \
    --env-file $(pwd)/config/.env \
    -v $(pwd)/data:/data/ \
    apache/kafka:$kafka_version
    
    # kafka 查看后台
    if ! docker ps -a -f name=redpandadata-console | grep -q redpandadata-console;then
        docker rm -f redpandadata-console
        docker run --network=host -d \
        --restart=always \
        --name redpandadata-console \
        -e KAFKA_BROKERS=${PRIVATE_IP}:9092 \
        -e SERVER_LISTENPORT=9096 \
        redpandadata/console:v2.3.3
    fi
    
    
    Tip

    如果要限制内存,可以在docker中加一行-e KAFKA_HEAP_OPTS="-Xmx256m -Xms256m" \

    #集群

    三个节点分开部署在三台服务器上,启动脚本中修改NODE_ID和KAFKA_CONTROLLER_QUORUM_VOTERS

    KAFKA_CONTROLLER_QUORUM_VOTERS=1@192.168.1.1,2@192.168.1.2,3@192.168.1.3

    Tip

    在三个节点的时候,删除一个节点可以正常使用(显示不健康),删除两个节点的时候不可用

    在增加节点的时候只需要在新节点的配置文件中增加自身和别的节点,即可增加节点

    #可视化

    redpandadata-console
    kafka-map
    #!/bin/bash
    
    ##################
    # 用于kafka查看
    # 部分功能收费如登录功能
    # https://docs.redpanda.com/current/get-started/
    ####################################################
    
    port=9096
    
    docker rm -f redpandadata-console-${port}
    docker run --network=host -d \
    --name redpandadata-console-${port} \
    -e KAFKA_BROKERS=10.0.18.2:9092 \
    -e SERVER_LISTENPORT=${port} \
    docker.redpanda.com/redpandadata/console:v2.3.3
    #!/bin/bash
    
    port=8087
    
    docker rm -f kafka-map-${port}
    
    docker run -d \
        -p ${port}:8080 \
        -v ./data:/usr/local/kafka-map/data \
        -e DEFAULT_USERNAME= \
        -e DEFAULT_PASSWORD= \
        --name kafka-map-${port} \
        --restart always \
        dushixiang/kafka-map:v1.3.3