#kafka
kafka的3.0以下的版本,都是强依赖zookeeper, 在低版本kafka中, 我们采用了把zookeeper打入镜像中运行
config-9092
run.sh
setup.sh
#启动
低版本
低版本配置文件
高版本
#!/bin/bash
# Start the low-version (2.13-2.8.0, KRaft preview) standalone kafka container
# on host networking, port 9092. Configuration is generated inside the
# container by the mounted config-${port}/run.sh.
set -euo pipefail

port=9092
container_name="kafka-${port}"

# Remove any previous container; '|| true' keeps strict mode from aborting
# when the container does not exist yet.
docker stop "${container_name}" >/dev/null 2>&1 || true
docker rm "${container_name}" >/dev/null 2>&1 || true

docker run -d \
  --hostname "$(hostname)" \
  --add-host "$(hostname):10.0.18.2" \
  --name "${container_name}" \
  --restart=always \
  -v /etc/timezone:/etc/timezone:ro \
  -v /etc/localtime:/etc/localtime:ro \
  -v "$(pwd)/config-${port}/run.sh":/run.sh \
  -v "$(pwd)/data-standalone":/data \
  --net host \
  -e NODE_ID="1001" \
  -e LISTENERS="PRIVATE://:9092,CONTROLLER://:9093,PUBLIC://:9094" \
  -e ADVERTISED_LISTENERS="PRIVATE://10.0.18.2:9092" \
  registry.cn-hangzhou.aliyuncs.com/buyfakett/kafka-standalone:2.13-2.8.0
# 配置通过 run.sh 重写
#!/bin/bash
# run.sh — executed inside the container: render a KRaft server.properties
# from environment variables, format the storage directory on first start,
# then launch the kafka server in the foreground.
cd "$KAFKA_HOME"/ || exit 1

# Endpoints default to the container hostname when not provided.
export PRIVATE_IP=${PRIVATE_IP:-$(hostname)}
export PUBLIC_IP=${PUBLIC_IP:-$(hostname)}
export NODE_ID=${NODE_ID:-"1"}
export PROCESS_ROLES=${PROCESS_ROLES:-"broker,controller"}
# ':-' (not ':=') for consistency with the other defaults above.
export CONTROLLER_QUORUM_VOTERS=${CONTROLLER_QUORUM_VOTERS:-"$NODE_ID@$(hostname):9093"}
export LISTENERS=${LISTENERS:-"PRIVATE://:9092,CONTROLLER://:9093,PUBLIC://:9094"}
export INTER_BROKER_LISTENER_NAME=${INTER_BROKER_LISTENER_NAME:-PRIVATE}
export ADVERTISED_LISTENERS=${ADVERTISED_LISTENERS:-"PRIVATE://$PRIVATE_IP:9092,PUBLIC://$PUBLIC_IP:9094"}
export CONTROLLER_LISTENER_NAMES=${CONTROLLER_LISTENER_NAMES:-"CONTROLLER"}
export LISTENER_SECURITY_PROTOCOL_MAP=${LISTENER_SECURITY_PROTOCOL_MAP:-"CONTROLLER:PLAINTEXT,PRIVATE:PLAINTEXT,PUBLIC:PLAINTEXT"}
export LOG_DIRS=${LOG_DIRS:-"/data"}

# Render the broker configuration (heredoc body is written verbatim).
cat << EOF > ./config/kraft/server.properties
process.roles=${PROCESS_ROLES}
node.id=${NODE_ID}
controller.quorum.voters=${CONTROLLER_QUORUM_VOTERS}
listeners=${LISTENERS}
inter.broker.listener.name=${INTER_BROKER_LISTENER_NAME}
advertised.listeners=${ADVERTISED_LISTENERS}
controller.listener.names=${CONTROLLER_LISTENER_NAMES}
listener.security.protocol.map=${LISTENER_SECURITY_PROTOCOL_MAP}
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# 数据存放目录
log.dirs=${LOG_DIRS}
# 默认分区个数
num.partitions=1
num.recovery.threads.per.data.dir=1
# 默认复制实例个数
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# 数据保留168小时
log.retention.hours=168
# 数据保留10G
log.retention.bytes=10737418240
# 300秒检查并删除数据
log.retention.check.interval.ms=300000
# 数据块大小1G
log.segment.bytes=1073741824
# 数据块切割30秒一次
log.segment.delete.delay.ms=30000
# 允许删除数据
log.cleaner.enable=true
log.cleanup.policy=delete
delete.topic.enable=true
EOF

echo "${LOG_DIRS}"
# Format storage only on first start, i.e. when the data directory is empty.
if [ -z "$(ls -A "${LOG_DIRS}")" ]; then
  # Overwrite ('>') instead of append ('>>'): a stale uuid.txt would otherwise
  # accumulate multiple ids and break the format command below.
  ./bin/kafka-storage.sh random-uuid > uuid.txt
  uuid=$(cat uuid.txt)
  ./bin/kafka-storage.sh format -t "${uuid}" -c ./config/kraft/server.properties
fi
./bin/kafka-server-start.sh ./config/kraft/server.properties
#!/bin/bash
# Deploy a single-node kafka (KRaft mode, official apache/kafka image) plus
# the redpandadata-console web UI, both on host networking.
set -euo pipefail

# Fill in the real IPs (or export them) before running. Fall back to the
# hostname so the generated listeners are never empty — an empty value would
# render an invalid "PRIVATE://:9092" advertised listener.
PUBLIC_IP=${PUBLIC_IP:-$(hostname)}
PRIVATE_IP=${PRIVATE_IP:-$(hostname)}
NODE_ID=1
kafka_version=3.8.0
container_name="kafka"
id=${id:-"$container_name"}

# 初始化配置文件 — render the env-file consumed by the container
# (heredoc body is written verbatim into config/.env).
mkdir -p config/ && cat << EOF > config/.env
KAFKA_NODE_ID=$NODE_ID
# 功能
KAFKA_PROCESS_ROLES=broker,controller
KAFKA_LISTENERS=PRIVATE://:9092,CONTROLLER://:9093,PUBLIC://:9094
KAFKA_ADVERTISED_LISTENERS=PRIVATE://$PRIVATE_IP:9092,PUBLIC://$PUBLIC_IP:9094
KAFKA_INTER_BROKER_LISTENER_NAME=PRIVATE
KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PRIVATE:PLAINTEXT,PUBLIC:PLAINTEXT
# 集群配置
KAFKA_CONTROLLER_QUORUM_VOTERS=$NODE_ID@$(hostname):9093
# 默认 3 个副本
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
# 默认 3,事务主题的复制因子(设置得更高以确保可用性)。除非集群大小满足此复制因子要求,否则内部主题创建将失败。
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
# 默认 2,必须确认对事务主题的写入才能被视为成功的最小副本数
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
# 默认 3000(3s),组协调器在执行第一次重新平衡之前等待更多消费者加入新组的时间。延迟越长,重新平衡次数可能越少,但处理开始的时间就越长
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
# 默认 1,topic 分区数量
KAFKA_NUM_PARTITIONS=1
# 数据存放目录
KAFKA_LOG_DIRS=/data
# 默认 168(7 天),数据保留时间
KAFKA_LOG_RETENTION_HOURS=168
# 默认 -1,每个 topic 保留大小 53687091200 字节(50 G)
KAFKA_LOG_RETENTION_BYTES=53687091200
EOF

# Create the data directory once, owned by uid 1000 (the image runs as
# --user 1000 below).
if [ ! -d data ]; then
  mkdir -p data && chown -R 1000:1000 data/
fi

# Recreate the kafka container; '|| true' keeps strict mode from aborting
# on the first run when no container exists yet.
docker stop "$container_name" >/dev/null 2>&1 || true
docker rm "$container_name" >/dev/null 2>&1 || true
docker run -d \
  --name "$container_name" \
  --net host \
  --restart=always \
  --hostname "$(hostname)" \
  --add-host "$(hostname):127.0.0.1" \
  -v /etc/timezone:/etc/timezone:ro \
  -v /etc/localtime:/etc/localtime:ro \
  --user 1000 \
  --env-file "$(pwd)/config/.env" \
  -v "$(pwd)/data":/data/ \
  "apache/kafka:$kafka_version"

# kafka 查看后台 — start the console UI only when no container with that
# name exists yet (running or stopped).
if ! docker ps -a -f name=redpandadata-console | grep -q redpandadata-console; then
  docker rm -f redpandadata-console >/dev/null 2>&1 || true
  docker run --network=host -d \
    --restart=always \
    --name redpandadata-console \
    -e KAFKA_BROKERS="${PRIVATE_IP}:9092" \
    -e SERVER_LISTENPORT=9096 \
    redpandadata/console:v2.3.3
fi
Tip
如果要限制内存,可以在docker中加一行-e KAFKA_HEAP_OPTS="-Xmx256m -Xms256m" \
#集群
三个节点分开部署在三台服务器上,启动脚本中修改NODE_ID和KAFKA_CONTROLLER_QUORUM_VOTERS
KAFKA_CONTROLLER_QUORUM_VOTERS=1@192.168.1.1:9093,2@192.168.1.2:9093,3@192.168.1.3:9093
Tip
在三个节点的时候,删除一个节点可以正常使用(显示不健康),删除两个节点的时候不可用
在增加节点的时候只需要在新节点的配置文件中增加自身和别的节点,即可增加节点
#可视化
redpandadata-console
kafka-map
#!/bin/bash
##################
# 用于kafka查看
# 部分功能收费如登录功能
# https://docs.redpanda.com/current/get-started/
####################################################
set -euo pipefail

port=9096
# '|| true': rm -f fails when the container does not exist yet.
docker rm -f "redpandadata-console-${port}" >/dev/null 2>&1 || true
docker run --network=host -d \
  --name "redpandadata-console-${port}" \
  -e KAFKA_BROKERS=10.0.18.2:9092 \
  -e SERVER_LISTENPORT="${port}" \
  docker.redpanda.com/redpandadata/console:v2.3.3
#!/bin/bash
# Deploy kafka-map, a lightweight web UI for kafka, on host port ${port}.
set -euo pipefail

port=8087
# '|| true': rm -f fails when the container does not exist yet.
docker rm -f "kafka-map-${port}" >/dev/null 2>&1 || true
docker run -d \
  -p "${port}":8080 \
  -v "$(pwd)/data":/usr/local/kafka-map/data \
  -e DEFAULT_USERNAME= \
  -e DEFAULT_PASSWORD= \
  --name "kafka-map-${port}" \
  --restart always \
  dushixiang/kafka-map:v1.3.3
# NOTE(review): `-v ./data:…` was changed to an absolute path — docker -v
# rejects relative bind-mount paths. Empty DEFAULT_USERNAME/DEFAULT_PASSWORD
# presumably fall back to the image defaults — confirm before exposing.