1. Introduction to Kafka

Kafka is a distributed, partitioned, replicated messaging system. Here are a few basic Kafka terms:

1> Kafka organizes messages by topic;
2> Programs that publish messages to a Kafka topic are called producers;
3> Programs that subscribe to topics and consume messages are called consumers;
4> Kafka runs as a cluster consisting of one or more servers, each of which is called a broker.

Producers send messages to the Kafka cluster over the network, and the cluster delivers them to consumers, as shown in the figure below:

When creating a topic, you can specify the number of partitions. More partitions means higher throughput, but also more resources and a higher risk of unavailability. After receiving messages from producers, Kafka stores them in different partitions according to its balancing strategy:
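
The partition count is specified when a topic is created. Below is a minimal sketch using the Kafka AdminClient; the topic name demo-topic, the partition count 3, and the local broker address are example values, not something from the original article.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 1 (suitable for a single-broker setup)
            NewTopic topic = new NewTopic("demo-topic", 3, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}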

Within each partition, messages are stored in order, and the most recently received messages are consumed last.

When sending messages to the Kafka cluster, a producer can send to a specific partition by naming it explicitly, or it can specify a balancing strategy to spread messages across partitions. If neither is given, the default strategy is used and messages are stored in partitions more or less at random.
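
As an illustration with the plain Kafka producer client (not part of the original article; the topic, key, and partition values are made-up examples), a ProducerRecord can carry an explicit partition number, or a key that the partitioner hashes to choose a partition:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class PartitionedSendExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Send to an explicit partition (partition 0 of topic "test")
            producer.send(new ProducerRecord<>("test", 0, "key-1", "message to partition 0"));
            // Or let the partitioner pick the partition based on the key
            producer.send(new ProducerRecord<>("test", "key-2", "message routed by key"));
        }
    }
}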

When a consumer consumes messages, Kafka uses an offset to record the current consumption position:

In Kafka's design, several different groups can consume messages from the same topic at the same time. As shown in the figure, two different groups consume concurrently; each keeps its own offset, and they do not interfere with each other.

For a single group, the number of consumers should not exceed the number of partitions, because within a group each partition can be bound to at most one consumer: one consumer may consume multiple partitions, but a partition can only be consumed by one consumer. Therefore, if a group has more consumers than partitions, the extra consumers will never receive any messages.
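
Here is a minimal sketch of a consumer joining a group with the plain Kafka consumer client (the group id and topic name match the ones used later in this article); running several instances of this program in the same group makes Kafka split the topic's partitions among them:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GroupConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d, offset=%d, value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}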

2. Kafka Storage Mechanism

Messages in Kafka are categorized by topic; producers produce and consumers consume messages in terms of topics. A topic is a logical concept, while a partition is a physical one: each partition corresponds to a log file, and that log file stores the data produced by producers. Data produced by a producer is continuously appended to the end of the log file, and every record has its own offset. Each consumer in a consumer group keeps track of the offset it has consumed, so that after a failure it can resume from where it left off.
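
To make the idea of resuming from a recorded offset concrete, here is a small sketch with the plain consumer client (not from the original article; the offset value 42 is just an example): seek moves the consumer to a given offset of a partition, and commitSync records the current position for the group.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SeekExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition partition = new TopicPartition("test", 0);
            consumer.assign(Collections.singletonList(partition));
            consumer.seek(partition, 42);          // resume reading from offset 42
            consumer.poll(Duration.ofMillis(500)); // fetch records starting at that offset
            consumer.commitSync();                 // record the new position for this group
        }
    }
}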

2.1 Topic Structure

In Kafka, a topic can be divided into multiple partitions, and a partition is divided into multiple segments. Each segment corresponds to two files, a .index file and a .log file:

2.2 How Messages Are Stored

Since messages produced by producers are continuously appended to the end of the log file, Kafka uses a sharding and indexing mechanism to prevent the log file from growing so large that locating data becomes inefficient: each partition is split into multiple segments. As mentioned above, each segment corresponds to a .index file and a .log file. These files live in a directory whose name follows a specific rule: topic name + partition number.

For example, in the previous section we created a topic named test with a single partition, so the Kafka log directory contains a folder named test-0:

The purpose of each file type is as follows:

.index: offset index file, storing the offsets of the corresponding data
.timeindex: timestamp index file
.log: log file, storing the data produced by producers
.snapshot: snapshot file
leader-epoch-checkpoint: records the offset at which each leader epoch started writing messages, and is updated periodically; when a follower is elected leader, it uses this file to determine which messages are available

The .index and .log files are named after the offset of the first message in the current segment. The offset is a 64-bit long, written as a fixed 20-digit number padded with leading zeros, and both the index file and the log file use it as their file name. So, as the figure above shows, our offsets start at 0, and both the .index and .log files are named 00000000000000000000.
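
As a quick illustration of that naming rule (the base offset 170410 is just an example value):

public class SegmentNameExample {
    public static void main(String[] args) {
        // Segment files are named after the base offset of the segment,
        // left-padded with zeros to a fixed width of 20 digits
        long baseOffset = 170410L;
        System.out.println(String.format("%020d", baseOffset) + ".log");   // 00000000000000170410.log
        System.out.println(String.format("%020d", baseOffset) + ".index"); // 00000000000000170410.index
    }
}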

In the previous section we used the producer to send data such as wno and test, so we can check whether the .log file contains it:

Some of the content looks "garbled" because the data has been serialized and compressed.

Is there a size limit on the .log data file, and how long is the data retained? Both can be changed in the config/server.properties file under the Kafka directory:

# How long log files are retained, in hours; 168 hours is one week
log.retention.hours=168

# Maximum size of a log file, here 1 GB; once exceeded, a new segment (i.e. new .index and .log files) is created
log.segment.bytes=1073741824

For example, when the producer generates so much data that one segment cannot hold it and a new segment is rolled, you will see files like the following in the topic's log directory:

The figure below shows how Kafka looks up data:

For instance, to find the message with offset 3: from the .index file names we know that the index entry for offset 3 should be looked up in 00000000000000000000.index. According to the figure above, the corresponding index entry points to positions 756 to 911, so Kafka reads the 756 to 911 range of 00000000000000000000.log.
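
The lookup idea can be sketched as follows. This is a simplified illustration, not Kafka's actual implementation: the index holds sparse entries mapping a message offset to a byte position in the .log file; to find a target offset, take the greatest indexed offset that is not larger than the target, jump to its byte position, and scan forward from there.

import java.util.Map;
import java.util.TreeMap;

public class IndexLookupSketch {
    public static void main(String[] args) {
        // Sparse index: message offset -> byte position in the .log file (example values)
        TreeMap<Long, Long> index = new TreeMap<>();
        index.put(0L, 0L);
        index.put(3L, 756L);
        index.put(6L, 1430L);

        long targetOffset = 4L;
        // Greatest indexed offset <= target; start scanning the log file from its byte position
        Map.Entry<Long, Long> entry = index.floorEntry(targetOffset);
        System.out.printf("To find offset %d: start scanning the .log file at byte position %d%n",
                targetOffset, entry.getValue());
    }
}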

3. Integrating Kafka with Spring Boot

《Kafka运行环境安装》 briefly introduced how to use Kafka; now let's start using Kafka in Spring Boot.

3.1 Adding Dependencies

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

3.2 Producer and Consumer Configuration

server:
  port: 8089

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092 # Kafka server address; for a cluster, list multiple addresses separated by commas
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test-consumer
      auto-offset-reset: latest
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3.3 Publishing Messages

With the producer configured, we can start publishing messages.

Create a new SendMessageController:

@RestController
@Slf4j
public class SendMessageController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("send/{message}")
    public void send(@PathVariable String message) {
        this.kafkaTemplate.send("test", message);
    }

}

We inject a kafkaTemplate instance whose key and value are both of type String and use its send method to send the message. Here test is the topic name; we already created this topic with the command line earlier.
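
As a side note (not part of the original example), KafkaTemplate also has overloads that take a message key or an explicit partition number, which connects back to the partitioning rules from section 1. A small sketch that could replace the send call above; the key user-42 and partition 0 are made-up example values:

// Route by key: records with the same key always land in the same partition
this.kafkaTemplate.send("test", "user-42", message);

// Or target a specific partition explicitly (partition 0 here)
this.kafkaTemplate.send("test", 0, "user-42", message);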

The send method is asynchronous; we can use a callback to find out whether a message was sent successfully. Let's modify SendMessageController:

@RestController
@Slf4j
public class SendMessageController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("send/{message}")
    public void send(@PathVariable String message) {
        ListenableFuture<SendResult<String, String>> future = this.kafkaTemplate.send("test", message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("Message sent successfully: {}, offset=[{}]", message, result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                log.error("Failed to send message: {}, reason: {}", message, ex.getMessage());
            }
        });
    }

}

When a message is sent successfully, the onSuccess method is called back; when it fails, onFailure is called.
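
If you prefer to block until the broker acknowledges the send (for example in a simple test), ListenableFuture also extends Future, so you can wait on it directly. A hedged sketch of an extra controller method; the sendSync path, the 10-second timeout, and the java.util.concurrent.TimeUnit import are my own example choices:

@GetMapping("sendSync/{message}")
public void sendSync(@PathVariable String message) throws Exception {
    // Wait synchronously for the send result instead of registering a callback
    SendResult<String, String> result =
            this.kafkaTemplate.send("test", message).get(10, TimeUnit.SECONDS);
    log.info("Message sent successfully: {}, offset=[{}]", message, result.getRecordMetadata().offset());
}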

3.4 Consuming Messages

With the consumer configured, we can start consuming messages. Create a new KafkaMessageListener:

@Component
@Slf4j
public class KafkaMessageListener {

    @KafkaListener(topics = "test", groupId = "test-consumer")
    public void listen(ConsumerRecord<?, ?> record) {
        log.info("Received message: topic is {}, offset is {}, partition is {}, value is {}",
                record.topic(), record.offset(), record.partition(), record.value());
    }
}

The @KafkaListener annotation listens on the topic named test, with test-consumer as the consumer group name.
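
Relating this back to the consumer-group rule from section 1: @KafkaListener also has a concurrency attribute that starts several consumers in the same group, and any consumers beyond the partition count will sit idle. A hedged sketch of an alternative listener method (the value 3 is arbitrary, and this method is my own example, not from the original article):

@KafkaListener(topics = "test", groupId = "test-consumer", concurrency = "3")
public void listenConcurrently(ConsumerRecord<?, ?> record) {
    log.info("Received message: partition is {}, value is {}", record.partition(), record.value());
}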

3.5 Demo

Start the Spring Boot project. During startup the console prints the Kafka configuration. Once it is up, visit http://localhost:8089/send/wno704,wno704 and the console output looks like this:

2020-09-09 17:25:22.240 INFO 5740 --- [ad | producer-5] c.w.b.controller.SendMessageController : Message sent successfully: wno704,wno704, offset=[58]
2020-09-09 17:25:22.241 INFO 5740 --- [ntainer#0-0-C-1] c.w.boot.listener.KafkaMessageListener : Received message: topic is test, offset is 58, partition is 0, value is wno704,wno704

4. @KafkaListener in Detail

4.1 Listening to Multiple Topics

Besides specifying a topic name and group id, @KafkaListener can also listen to messages from several topics at once:

@KafkaListener(topics = "topic1, topic2")

4.2 Getting Partition Information

We can also use the @Header annotation to find out which partition the current message came from:

@KafkaListener(topics = "test", groupId = "test-consumer")
public void listen(@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
logger.info("接收消息: {},partition:{}", message, partition);
}

Or:

@Component
@Slf4j
public class KafkaMessageListener {

    @KafkaListener(topics = "test", groupId = "test-consumer")
    public void listen(ConsumerRecord<?, ?> record) {
        log.info("Received message: topic is {}, offset is {}, partition is {}, value is {}",
                record.topic(), record.offset(), record.partition(), record.value());
    }
}

4.3 Receiving Messages from Specific Partitions

We can also tell @KafkaListener to receive messages only from specific partitions:

@KafkaListener(groupId = "test-consumer",
        topicPartitions = @TopicPartition(topic = "test",
                partitionOffsets = {
                        @PartitionOffset(partition = "0", initialOffset = "0")
                }))
public void listen(@Payload String message,
                   @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
    log.info("Received message: {}, partition: {}", message, partition);
}

If you don't need to specify initialOffset, the code above can be simplified to:

@KafkaListener(groupId = "test-consumer",
        topicPartitions = @TopicPartition(topic = "test", partitions = { "0", "1" }))

5. Message Filters

We can add a filter to the message listener to filter out certain messages. Create a new consumer configuration class, KafkaInitialConfig, and configure the filter rule in its kafkaListenerContainerFactory method:

@Configuration
public class KafkaInitialConfig {

    // Consumer factory auto-configured by Spring Boot
    @Autowired
    private ConsumerFactory consumerFactory;

    // Configure a message filter strategy
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory
                = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // ------- Filter configuration -------
        factory.setRecordFilterStrategy(
                r -> r.value().contains("test")
        );
        return factory;
    }
}

setRecordFilterStrategy takes a RecordFilterStrategy<K, V>, which is a functional interface:

public interface RecordFilterStrategy<K, V> {

    boolean filter(ConsumerRecord<K, V> consumerRecord);
}

So the lambda expression above defines the rule: when a message's content contains the string test, the record is filtered out and the message is not received.

After configuring this, restart the project and send the following three requests:
http://localhost:8089/send/wno704,wno704
http://localhost:8089/send/wno704test
http://localhost:8089/send/wno704,test

Check the console output:

As you can see, the two messages wno704test and wno704,test were not received.

6. More Configuration Options

spring.kafka.admin.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.admin.fail-fast=false # Whether to fail fast if the broker is not available on startup.
spring.kafka.admin.properties.*= # Additional admin-specific properties used to configure the client.
spring.kafka.admin.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.admin.ssl.key-store-location= # Location of the key store file.
spring.kafka.admin.ssl.key-store-password= # Store password for the key store file.
spring.kafka.admin.ssl.key-store-type= # Type of the key store.
spring.kafka.admin.ssl.protocol= # SSL protocol to use.
spring.kafka.admin.ssl.trust-store-location= # Location of the trust store file.
spring.kafka.admin.ssl.trust-store-password= # Store password for the trust store file.
spring.kafka.admin.ssl.trust-store-type= # Type of the trust store.
spring.kafka.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Applies to all components unless overridden.
spring.kafka.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.consumer.auto-commit-interval= # Frequency with which the consumer offsets are auto-committed to Kafka if 'enable.auto.commit' is set to true.
spring.kafka.consumer.auto-offset-reset= # What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server.
spring.kafka.consumer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for consumers.
spring.kafka.consumer.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.consumer.enable-auto-commit= # Whether the consumer's offset is periodically committed in the background.
spring.kafka.consumer.fetch-max-wait= # Maximum amount of time the server blocks before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by "fetch-min-size".
spring.kafka.consumer.fetch-min-size= # Minimum amount of data the server should return for a fetch request.
spring.kafka.consumer.group-id= # Unique string that identifies the consumer group to which this consumer belongs.
spring.kafka.consumer.heartbeat-interval= # Expected time between heartbeats to the consumer coordinator.
spring.kafka.consumer.key-deserializer= # Deserializer class for keys.
spring.kafka.consumer.max-poll-records= # Maximum number of records returned in a single call to poll().
spring.kafka.consumer.properties.*= # Additional consumer-specific properties used to configure the client.
spring.kafka.consumer.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.consumer.ssl.key-store-location= # Location of the key store file.
spring.kafka.consumer.ssl.key-store-password= # Store password for the key store file.
spring.kafka.consumer.ssl.key-store-type= # Type of the key store.
spring.kafka.consumer.ssl.protocol= # SSL protocol to use.
spring.kafka.consumer.ssl.trust-store-location= # Location of the trust store file.
spring.kafka.consumer.ssl.trust-store-password= # Store password for the trust store file.
spring.kafka.consumer.ssl.trust-store-type= # Type of the trust store.
spring.kafka.consumer.value-deserializer= # Deserializer class for values.
spring.kafka.jaas.control-flag=required # Control flag for login configuration.
spring.kafka.jaas.enabled=false # Whether to enable JAAS configuration.
spring.kafka.jaas.login-module=com.sun.security.auth.module.Krb5LoginModule # Login module.
spring.kafka.jaas.options= # Additional JAAS options.
spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME".
spring.kafka.listener.ack-mode= # Listener AckMode. See the spring-kafka documentation.
spring.kafka.listener.ack-time= # Time between offset commits when ackMode is "TIME" or "COUNT_TIME".
spring.kafka.listener.client-id= # Prefix for the listener's consumer client.id property.
spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.
spring.kafka.listener.idle-event-interval= # Time between publishing idle consumer events (no data received).
spring.kafka.listener.log-container-config= # Whether to log the container configuration during initialization (INFO level).
spring.kafka.listener.monitor-interval= # Time between checks for non-responsive consumers. If a duration suffix is not specified, seconds will be used.
spring.kafka.listener.no-poll-threshold= # Multiplier applied to "pollTimeout" to determine if a consumer is non-responsive.
spring.kafka.listener.poll-timeout= # Timeout to use when polling the consumer.
spring.kafka.listener.type=single # Listener type.
spring.kafka.producer.acks= # Number of acknowledgments the producer requires the leader to have received before considering a request complete.
spring.kafka.producer.batch-size= # Default batch size.
spring.kafka.producer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for producers.
spring.kafka.producer.buffer-memory= # Total memory size the producer can use to buffer records waiting to be sent to the server.
spring.kafka.producer.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.producer.compression-type= # Compression type for all data generated by the producer.
spring.kafka.producer.key-serializer= # Serializer class for keys.
spring.kafka.producer.properties.*= # Additional producer-specific properties used to configure the client.
spring.kafka.producer.retries= # When greater than zero, enables retrying of failed sends.
spring.kafka.producer.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.producer.ssl.key-store-location= # Location of the key store file.
spring.kafka.producer.ssl.key-store-password= # Store password for the key store file.
spring.kafka.producer.ssl.key-store-type= # Type of the key store.
spring.kafka.producer.ssl.protocol= # SSL protocol to use.
spring.kafka.producer.ssl.trust-store-location= # Location of the trust store file.
spring.kafka.producer.ssl.trust-store-password= # Store password for the trust store file.
spring.kafka.producer.ssl.trust-store-type= # Type of the trust store.
spring.kafka.producer.transaction-id-prefix= # When non empty, enables transaction support for producer.
spring.kafka.producer.value-serializer= # Serializer class for values.
spring.kafka.properties.*= # Additional properties, common to producers and consumers, used to configure the client.
spring.kafka.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.ssl.key-store-location= # Location of the key store file.
spring.kafka.ssl.key-store-password= # Store password for the key store file.
spring.kafka.ssl.key-store-type= # Type of the key store.
spring.kafka.ssl.protocol= # SSL protocol to use.
spring.kafka.ssl.trust-store-location= # Location of the trust store file.
spring.kafka.ssl.trust-store-password= # Store password for the trust store file.
spring.kafka.ssl.trust-store-type= # Type of the trust store.
spring.kafka.streams.application-id= # Kafka streams application.id property; default spring.application.name.
spring.kafka.streams.auto-startup=true # Whether or not to auto-start the streams factory bean.
spring.kafka.streams.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for streams.
spring.kafka.streams.cache-max-size-buffering= # Maximum memory size to be used for buffering across all threads.
spring.kafka.streams.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.streams.properties.*= # Additional Kafka properties used to configure the streams.
spring.kafka.streams.replication-factor= # The replication factor for change log topics and repartition topics created by the stream processing application.
spring.kafka.streams.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.streams.ssl.key-store-location= # Location of the key store file.
spring.kafka.streams.ssl.key-store-password= # Store password for the key store file.
spring.kafka.streams.ssl.key-store-type= # Type of the key store.
spring.kafka.streams.ssl.protocol= # SSL protocol to use.
spring.kafka.streams.ssl.trust-store-location= # Location of the trust store file.
spring.kafka.streams.ssl.trust-store-password= # Store password for the trust store file.
spring.kafka.streams.ssl.trust-store-type= # Type of the trust store.
spring.kafka.streams.state-dir= # Directory location for the state store.
spring.kafka.template.default-topic= # Default topic to which messages are sent.