SpringBoot集成kafka全面实战

SpringBoot集成kafka全面实战
一、前戏
 
1、在项目中连接kafka,因为是外网,首先要开放kafka配置文件中的如下配置(其中IP为公网IP),
 
advertised.listeners=PLAINTEXT://112.126.74.249:9092
 
2、在开始前我们先创建两个topic:topic1、topic2,其分区和副本数都设置为2,用来测试,
 
[root@iZ2zegzlkedbo3e64vkbefZ ——]#  cd /usr/local/kafka-cluster/kafka1/bin/
 
[root@iZ2zegzlkedbo3e64vkbefZ bin]# ./kafka-topics.sh --create --zookeeper 172.17.80.219:2181 --replication-factor 2 --partitions 2 --topic topic1
 
Created topic topic1.
 
[root@iZ2zegzlkedbo3e64vkbefZ bin]# ./kafka-topics.sh --create --zookeeper 172.17.80.219:2181 --replication-factor 2 --partitions 2 --topic topic2
 
Created topic topic2.
 
当然我们也可以不手动创建topic,在执行代码kafkaTemplate.send("topic1", normalMessage)发送消息时,kafka会帮我们自动完成topic的创建工作,但这种情况下创建的topic默认只有一个分区,分区也没有副本。所以,我们可以在项目中新建一个配置类专门用来初始化topic,如下,
 
@Configuration
 
public class KafkaInitialConfiguration {
 
    // 创建一个名为testtopic的Topic并设置分区数为8,分区副本数为2
 
    @Bean
 
    public NewTopic initialTopic() {
 
        return new NewTopic("testtopic",8, (short) 2 );
 
    }
 
 
     // 如果要修改分区数,只需修改配置值重启项目即可
 
    // 修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小
 
    @Bean
 
    public NewTopic updateTopic() {
 
        return new NewTopic("testtopic",10, (short) 2 );
 
    }
 
}
 
3、新建SpringBoot项目
 
① 引入pom依赖
 
<dependency>
 
    <groupId>org.springframework.kafka</groupId>
 
    <artifactId>spring-kafka</artifactId>
 
</dependency>
 
② application.propertise配置(本文用到的配置项这里全列了出来)
 
###########【Kafka集群】###########
 
spring.kafka.bootstrap-servers=112.126.74.249:9092,112.126.74.249:9093
 
###########【初始化生产者配置】###########
 
# 重试次数
 
spring.kafka.producer.retries=0
 
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
 
spring.kafka.producer.acks=1
 
# 批量大小
 
spring.kafka.producer.batch-size=16384
 
# 提交延时
 
spring.kafka.producer.properties.linger.ms=0
 
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
 
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
 
 
# 生产端缓冲区大小
 
spring.kafka.producer.buffer-memory = 33554432
 
# Kafka提供的序列化和反序列化类
 
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
 
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
 
# 自定义分区器
 
# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
 
 
###########【初始化消费者配置】###########
 
# 默认的消费组ID
 
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
 
# 是否自动提交offset
 
spring.kafka.consumer.enable-auto-commit=true
 
# 提交offset延时(接收到消息后多久提交offset)
 
spring.kafka.consumer.auto.commit.interval.ms=1000
 
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
 
# earliest:重置为分区中最小的offset;
 
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
 
# none:只要有一个分区不存在已提交的offset,就抛出异常;
 
spring.kafka.consumer.auto-offset-reset=latest
 
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
 
spring.kafka.consumer.properties.session.timeout.ms=120000
 
# 消费请求超时时间
 
spring.kafka.consumer.properties.request.timeout.ms=180000
 
# Kafka提供的序列化和反序列化类
 
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
 
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
 
# 消费端监听的topic不存在时,项目启动会报错(关掉)
 
spring.kafka.listener.missing-topics-fatal=false
 
# 设置批量消费
 
# spring.kafka.listener.type=batch
 
# 批量消费每次最多消费多少条消息
 
# spring.kafka.consumer.max-poll-records=50
 
二、Hello Kafka
 
1、简单生产者
 
@RestController
 
public class KafkaProducer {
 
    @Autowired
 
    private KafkaTemplate<String, Object> kafkaTemplate;
 
 
    // 发送消息
 
    @GetMapping("/kafka/normal/{message}")
 
    public void sendMessage1(@PathVariable("message") String normalMessage) {
 
        kafkaTemplate.send("topic1", normalMessage);
 
    }
 
}
 
 2、简单消费
 
@Component
 
public class KafkaConsumer {
 
    // 消费监听
 
    @KafkaListener(topics = {"topic1"})
 
    public void onMessage1(ConsumerRecord<?, ?> record){
 
        // 消费的哪个topic、partition的消息,打印出消息内容
 
        System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
 
    }
 
}
 
上面示例创建了一个生产者,发送消息到topic1,消费者监听topic1消费消息。监听器用@KafkaListener注解,topics表示监听的topic,支持同时监听多个,用英文逗号分隔。启动项目,postman调接口触发生产者发送消息,

推荐阅读