Kafka 有三层结构:Kafka 有多个主题,每个主题有多个分区,每个分区又有多条消息。
分区机制:主要解决了单台服务器存储容量有限和单台服务器并发数限制的问题。一个分片的不同副本不能放到同一个 broker 上。
当主题数据量非常大的时候,一个服务器存放不了,就将数据分成两个或者多个部分,存放在多台服务器上。每个服务器上的数据,叫做一个分片。
分区对于 Kafka 集群的好处是:实现负载均衡,高存储能力、高伸缩性。分区对于消费者来说,可以提高并发度,提高效率。
副本:副本备份机制解决了数据存储的高可用问题。
当数据只保存一份的时候,有丢失的风险。为了更好的容错和容灾,将数据拷贝几份,保存到不同的机器上。
多个 follower 副本通常存放在和 leader 副本不同的 broker 中。通过这样的机制实现了高可用,当某台机器挂掉后,其他 follower 副本也能迅速”转正“,开始对外提供服务。
Kafka 副本的作用:在 kafka 中,实现副本的目的就是冗余备份,且仅仅是冗余备份,所有的读写请求都是由 leader 副本进行处理的。Follower 副本仅有一个功能,那就是从 leader 副本拉取消息,尽量让自己跟 leader 副本的内容一致。
Follower 副本不对外提供服务,这样可以防止出现一些类似于数据库事务的幻读脏读的问题。为了提高一些性能而导致出现数据不一致问题,显然是不值得的。
从 Kafka 的大体角度上可以分为数据生产者,Kafka 集群,还有就是消费者,而要保证数据的不丢失也要从这三个角度去考虑。
消息生产者保证数据不丢失 – 消息确认机制(ACK 机制),参考值有三个:0,1,-1。
Kafka 消费消息的模型:
消费者丢失数据:由于 Kafka consumer 默认是自动提交位移的(先更新位移,再消费消息),如果消费程序出现故障,没消费完毕,则丢失了消息,此时,broker 并不知道。
解决方案:
enable.auto.commit=false 关闭自动提交位移。
在消息被完整处理之后再手动提交位移。
Kafka 使用日志文件的方式来保存生产者消息,每条消息都有一个 offset 值来表示它在分区中的偏移量。
Kafka 中存储的一般都是海量的消息数据,为了避免日志文件过大,一个分片并不是直接对应在一个磁盘上的日志文件,而是对应磁盘上的一个目录,这个目录的命名规则是
Kafka 容器数据目录:/kafka/kafka-logs-kafka1
Kafka 作为消息中间件,只负责消息的临时存储,并不是永久存储,所以需要删除过期的数据。如果将所有的数据都存储在一个文件中,要删除过期的数据的时候,就变得非常的麻烦。如果将其进行切分成多个文件后,如果要删除过期数据,就可以根据文件的日期属性删除即可。默认只保留 168 小时,即七天之内的数据。因此 Kafka 的数据存储方案是多文件存储。
Log 分段:
每个分片目录中,kafka 通过分段的方式将数据分为多个 LogSegment。
一个 LogSegment 对应磁盘上的一个日志文件(00000000000000000000.log)和一个索引文件 (00000000000000000000.index)。
其中日志文件是用来记录消息的,索引文件是用来保存消息的索引。
每个 LogSegment 的大小可以在 server.properties 中 log.segment.bytes=107370(设置分段大小,默认是 1 GB)选项进行设置。
当 log 文件等于 1 G 时,新的会写入到下一个 segment 中。
timeindex 文件,是 kafka 的具体时间日志。
存储的结构:
一个主题 –> 多个分区 –> 多个日志段(多个文件)。
第一步 – 查询 segment file:
segment file 命名规则跟 o?set 有关,根据 segment file 可以知道它的起始偏移量,因为 Segment file 的命名规则是上一个 segment 文件最后一条消息的 offset 值。所以只要根据 offset 二分查找文件列表,就可以快速定位到具体文件。
比如,第一个 segment file 是 00000000000000000000.index 表示最开始的文件,起始偏移量 (offset) 为 0。第二个是 00000000000000091932.index – 代表消息量起始偏移量为 91933=91932 + 1。那么 offset=5000 时应该定位 00000000000000000000.index。
第二步 – 通过 segment file 查找 message:
通过第一步定位到 segment ?le,当 offset=5000 时,依次定位到 00000000000000000000.index 的元数据物理位置和 00000000000000000000.log 的物理偏移地址,然后再通过 00000000000000000000.log 顺序查找直到 offset=5000 为止。
Kafka 在数据生产的时候,有一个数据分发策略。默认的情况使用 DefaultPartitioner.class 类。
这个类中就定义数据分发的策略:
默认实现类:org.apache.kafka.clients.producer.internals.DefaultPartitioner
1) 如果是用户指定了 partition,生产就不会调用 DefaultPartitioner.partition() 方法。
数据分发策略的时候,可以指定数据发往哪个partition。
当 ProducerRecord 的构造参数中有 partition 的时候,就可以发送到对应 partition 上。
2) DefaultPartitioner 源码
如果指定 key,是取决于 key 的 hash 值。
如果不指定 key,轮询分发。
同一个分区中的数据,只能被一个消费者组中的一个消费者所消费。例如 P0 分区中的数据不能被 Consumer Group A 中 C1 与 C2 同时消费。
消费组:一个消费组中可以包含多个消费者,properties.put(ConsumerCon?g.GROUP_ID_CONFIG, “groupName”);
如果该消费组有四个消费者,主题有四个分区,那么每人一个。多个消费组可以重复消费消息。
如果有 3 个 Partition,p0/p1/p2,同一个消费组有 3 个消费者,c0/c1/c2,则为一一对应关系。如果有 3 个 Partition,p0/p1/p2,同一个消费组有 2 个消费者,c0/c1,则其中一个消费者消费 2 个分区的数据,另一个消费者消费一个分区的数据。如果有 2 个 Partition, p0/p1,同一个消费组有 3 个消费者,c0/c1/c3,则其中有一个消费者空闲,另外 2 个消费者消费分别各自消费一个分区的数据。
server.properties
1、broker.id=0
Kafka 集群是由多个节点组成的,每个节点称为一个 broker,中文翻译是代理。每个 broker 都有一个不同的 brokerId,由 broker.id 指定,是一个不小于 0 的整数,各 brokerId 必须不同,但不必连续。如果想扩展 kafka 集群,只需引入新节点,分配一个不同的 broker.id 即可。
启动 kafka 集群时,每一个 broker 都会实例化并启动一个 kafkaController,并将该 broker 的 brokerId 注册到 zooKeeper 的相应节点中。集群各 broker 会根据选举机制选出其中一个 broker 作为 leader,即 leader kafkaController。Leader kafkaController 负责主题的创建与删除、分区和副本的管理等。当 leader kafkaController 宕机后,其他 broker 会再次选举出新的 leader kafkaController。
2、log.dir=/export/data/kafka/
Broker 持久化消息到哪里,数据目录。
3、log.retention.hours=168
Log 文件最小存活时间,默认是 168h,即 7 天。相同作用的还有 log.retention.minutes、log.retention.ms。
数据存储的最大时间超过这个时间会根据 log.cleanup.policy 设置的策略处理数据,也就是消费端能够多久去消费数据。
log.retention.bytes 和 log.retention.hours 任意一个达到要求,都会执行删除,会被 topic 创建时的指定参数覆盖。
4、log.retention.check.interval.ms
多长时间检查一次是否有 log 文件要删除。默认是 300000ms,即 5 分钟。
5、log.retention.bytes
限制单个分区的 log 文件的最大值,超过这个值,将删除旧的 log,以满足 log 文件不超过这个值。默认是 -1,即不限制。
6、log.roll.hours
多少时间会生成一个新的 log segment,默认是 168h,即 7 天。相同作用的还有 log.roll.ms、segment.ms。
7、log.segment.bytes
Log segment 多大之后会生成一个新的 log segment,默认是 1073741824,即 1G。
8、log.?ush.interval.messages
指定 broker 每收到几个消息就把消息从内存刷到硬盘(刷盘)。默认是 9223372036854775807。
Kafka 官方不建议使用这个配置,建议使用副本机制和操作系统的后台刷新功能,因为这更高效。这个配置可以根据不同的 topic 设置不同的值,即在创建 topic 的时候设置值。
9、log.flush.interval.ms
指定 broker 每隔多少毫秒就把消息从内存刷到硬盘。默认值同 log.?ush.interval.messages 一样, 9223372036854775807。
同 log.?ush.interval.messages 一样,kafka 官方不建议使用这个配置。
10、delete.topic.enable=true
是否允许从物理上删除 topic。
在生产环境下,在 Kafka 集群中,消息数据变化是被关注的问题,当业务前提不复杂时,可以使用 Kafka 命令提供带有 Zookeeper 客户端工具的工具,可以轻松完成工作。随着业务的复杂性,增加 Group 和 Topic,那么使用 Kafka 提供命令工具,已经感到无能为力,那么 Kafka 监控系统目前尤为重要,需要观察消费者应用的细节。
为了简化开发者和服务工程师维护 Kafka 集群的工作有一个监控管理工具,叫做 Kafka-eagle。这个管理工具可以很容易地发现分布在集群中的哪些 topic 分布不均匀,或者是分区在整个集群分布不均匀的的情况。它支持管理多个集群、选择副本、副本重新分配以及创建 topic。同时,这个管理工具也是一个非常好的可以快速浏览这个集群的工具。
Kafka-eagle 在 Docker 中没有镜像。
环境要求:需要安装 jdk,启动 zk 以及 kafka 的服务。
Windows host 文件:
搭建步骤:
1) 下载 kafka-eagle 的源码包:
Kafka-eagle 官网 –
可以从官网上面直接下载最新的安装包即可 kafka-eagle-bin-1.3.2.tar.gz 这个版本即可。
代码托管地址 –
2) 上传安装包并解压:
这里选择将 kafak-eagle 安装在 node3 服务器。
如果要解压的是 zip 格式,需要先安装命令支持。
yum install unzip
unzip xxxx.zip
3) 准备数据库:
Kafka-eagle 需要使用一个数据库来保存一些元数据信息,这里直接使用 MySQL 数据库来保存即可,在本地执行以下命令创建一个 MySQL 数据库即可。
可以使用 SQLite 或者 MySQL 数据库。
4) 修改 kafka-eagle 配置文件:
默认情况下 MySQL 只允许本机连接到 MySQL 实例中,所以如果要远程访问,必须开放权限。
5) 配置环境变量:
Kafka-eagle 必须配置环境变量,node03 服务器执行以下命令来进行配置环境变量。
6) 启动 kakfa-eagle: