Fork me on GitHub

使用Kafka Manager管理Kafka集群

公司使用到了Kafka,想借助一套可视化的工具更好地管理Kafka。笔者将目光瞄准了Kafka Manager。

安装Kafka

一、下载Kafka

前往http://kafka.apache.org/downloads ,根据自己的需要,下载合适版本的Kafka,笔者使用的版本是kafka_2.12-0.10.2.1。

二、启动Kafka

  • 启动Zookeeper。Kafka依赖Zookeeper,因此,启动Kafka前得先启动Zookeeper。在Kafka的根路径执行如下命令即可启动Zookeeper

    1
    bin/zookeeper-server-start.sh config/zookeeper.properties &
  • 启动Kafka。启动完Zookeeper后,即可启动Kafka。执行如下命令即可。

    1
    bin/kafka-server-start.sh config/server.properties &

三、测试Kafka

  • 创建一个Topic

    1
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    如创建成功,将会显示类似如下的内容:

    1
    Created topic "test".
  • 查看创建的Topic

    1
    bin/kafka-topics.sh --list --zookeeper localhost:2181
  • 创建一个消息生产者

    1
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

    此时,终端会滞留,我们可以随便输入一些内容,该内容将会生产到Kafka中。

  • 创建一个消息消费者。新建一个终端,输入如下命令:

    1
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

    如消费者可收到在生产者中输入的内容,则说明Kafka搭建成功。

安装Kafka Manager

一、Kafka Manager简介

Kafka Manager是一款非常强大的Kafka管理工具。功能如下:

  • Manage multiple clusters
  • Easy inspection of cluster state (topics, consumers, offsets, brokers, replica distribution, partition distribution)
  • Run preferred replica election
  • Generate partition assignments with option to select brokers to use
  • Run reassignment of partition (based on generated assignments)
  • Create a topic with optional topic configs (0.8.1.1 has different configs than 0.8.2+)
  • Delete topic (only supported on 0.8.2+ and remember set delete.topic.enable=true in broker config)
  • Topic list now indicates topics marked for deletion (only supported on 0.8.2+)
  • Batch generate partition assignments for multiple topics with option to select brokers to use
  • Batch run reassignment of partition for multiple topics
  • Add partitions to existing topic
  • Update config for existing topic
  • Optionally enable JMX polling for broker level and topic level metrics.
  • Optionally filter out consumers that do not have ids/ owners/ & offsets/ directories in zookeeper.

二、下载Kafka Manager

前往https://github.com/yahoo/kafka-manager/releases ,根据自己的需要,下载合适版本的Kafka Manager源码,笔者使用的版本是1.3.3.13。

三、构建

由于Kafka Manager只提供了源码,并没有提供二进制包,因此我们需要对下载下来的源码进行构建,构建依赖工具sbt。

  • 安装sbt。在macOS中,执行如下命令即可安装sbt。对于其他操作系统,可参考官方文档进行安装:http://www.scala-sbt.org/0.13/docs/Setup.html

    1
    brew install sbt
  • 解压源码

  • 在源码根目录下执行如下命令构建项目:

    1
    sbt clean dist

    由于伟大的墙,尽管我为sbt配置了Aliyun 等若干镜像仓库,依然很慢。经过长达100年的等待,本机依然无法构建成功。笔者实在受不了了,干脆在Linode 上购置了一台云服务器。在服务器上安装Java、sbt,并执行sbt clean dist 进行构建,只花了5分钟不到,构建就完成了,不得不鄙视国内的【局域网】。

  • 构建成功后,在target/universal 目录下找到类似kafka-manager-1.3.3.13.zip 的文件,这个就是构件好的Kafka Manager啦。

四、配置

  • 解压kafka-manager-1.3.3.13.zip

  • 修改conf/application.conf ,将配置属性kafka-manager.zkhosts 改为您的zk集群地址。

  • 在Kafka Manager根目录下执行如下命令,即可启动Kafka Manager。

    1
    2
    3
    nohup bin/kafka-manager \
    -Dconfig.file=conf/application.conf \
    -Dhttp.port=9000 &

    如提示

    1
    2
    akka.ConfigurationException: Could not start logger due to [akka.ConfigurationException: Logger specified in config can't be loaded [akka.event.slf4j.Slf4jLogger] due to [akka.event.Logging$LoggerInitializationException: Logger log1-Slf4jLogger did not respond with LoggerInitialized, sent instead [TIMEOUT]]]
    at akka.event.LoggingBus$class.startDefaultLoggers(Logging.scala:144)

    则在conf/application.conf 中找到类似如下是内容:

    1
    2
    3
    4
    akka {
    loggers = ["akka.event.slf4j.Slf4jLogger"]
    loglevel = "INFO"
    }

    改为:

    1
    2
    3
    4
    5
    akka {
    loggers = ["akka.event.slf4j.Slf4jLogger"]
    loglevel = "INFO"
    logger-startup-timeout = 30s
    }

    这样就不会报超时了。

  • 使用http://localhost:9000 即可访问Kafka Manager啦。

配置Kafka Manager

初次访问时,需配置想要监控的Kafka集群。

  • 点击http://localhost:9000 导航栏上的”Add Cluster”按钮,
  • 配置Kafka集群名称、Kafka集群的ZK列表等参数,如下图:
  • 这样即可监控你的Kafka集群啦。

参考文档

评论系统未开启,无法评论!