一.创建TCP连接
- 消费者端主要的程序入口是KafkaConsumer类。
- 生产者不同的是,构建KafkaConsumer实例时是不会创建任何TCP连接的
- 也就是执行完new KafkaConsumer(porperties)语句后,你会发现没有socket连接被创建出来。
与Java生产者的区别:
- 生产者入口类KafkaProducer再构建实例的时候,会在后台默默启动一个Sender线程,这个Sender线程负责Socket连接的创建。
评判:
- 作者认为KafkaConsumer设计比KafkaProducer要好。
- Java构造函数中启动线程,会造this指针的逃逸,始终是个隐患。
创建时间:
- TCP连接在调用KafkaConsumer.poll方法时被创建
1.发起FindCoordinator请求时。
- 消费者端有个组件叫协调者(Coordinator)
- 它驻留在Broker端的内存中,负责消费者组的组成员管理和各个消费者的位移提交管理
- 当消费者程序首次启动调用poll方法时,需要向kafka集群发送一个名为FindCoordinator的请求,希望Kafka集群告诉它哪个Broker是他的协调者。
2.连接协调者时
- Broker处理完上一步发送的FindCoordinator请求之后,会返还对应的响应结果(Response),显式的告诉Broker真正的协调者
- 消费者知晓真正的协调者后,创建连向该Broker的Socket连接。
- 只有成功连入协调者,协调者才能开启正常的组协调操作,比如组分配方案,心跳请求处理,位移获取,位移提交等。
3.消费数据时
- 消费者会为每个要消费的分区创建与该分区领导者副本所在的Broker连接的TCP
- 假设消费者要消费5个分区,这5个分区各自领导副本分布在4台Broker上,
- 那么该消费者在消费时会创建于这4台Brokerd的Socket连接。
二.创建多少个TCP连接
消费者程序会创建3类TCP连接:
1、 确定协调者和获取集群元数据;
2、 连接协调者,令其执行组成员管理操作;
3、 执行实际的消息获取;
三.何时关闭TCP连接
- 和生产者类似,消费者关闭Socket也分为主动关闭和Kafka自动关闭
- 主动关闭是指显示的调用消费者API的方法关闭消费者,具体方式是手动调用KafkaConsumer.Close()方法,或者执行kill命令。
- Kafka自动关闭是由消费者端参数connection.max.idle.ms控制
- 该参数现在的默认值是9分钟,超过9分钟没有请求,消费者就会断开这个Socket连接。
长连接效果
- 和生产者不同的是:如果在编写消费者程序时,使用循环的方式调用poll方法,消费消息
- 上面的所有请求都会被定期发送到Broker,因此这些Socket连接总是能保证有请求在发送,从而实现了长连接效果---9分钟内不会断开
版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: