09、Kafka实战:Java生产者如何管理TCP连接

一.kafka生产者程序概览

kafka的Java生产者API主要对象就是kafkaProducer

  • 构造生产者对象所需参数对象
  • 创建kafka对象实例
  • 使用kafkaProducer的send方法发送消息
  • 调用kafkaProducer的close方法关闭生产者并释放各种系统资源。
Properties props=new Properties();
props.put(“参数1”, “参数1的值”);
props.put(“参数2”, “参数2的值”);

try(Producer<String,String> producer=new KafkaProducer<>(props)){
       producer.send(new ProducerRecord<String, String>(……), callback);
}

二.producer客户端何时创建TCP连接

  • 生产者应用创建kafkaProducer实例时是会建立于Broker的TCP连接
  • 生产者应用会再后台创建并启动一个名为Sender的线程,该线程运行时首先会创建与Broker的连接
  • bootstrap.servers参数:Prodcuer核心参数之一,置顶启动这个Producer启动时要连接的Broker地址
  • 不建议把集群中所有的Broker信息都配置到bootstrap.servers中
  • 通常置顶3-4个就可以了
  • Producer一旦连接到集群中的任一台 Broker,就能拿到整个集群的 Broker 信息

TCp连接还可能再这两个地方被创建:

  • 更新元数据后
  • 消息发送时

Producer更新集群元数据信息的两个场景:

  • Producer尝试给一个不存在的主题发送消息时,Broker会告诉Producer这个主题不存在。此时Producer会发送METADATA请求给kafka集群,尝试获取最新的元数据信息。
  • Producer通过metadata.max.age.ms参数定期的更新元数据信息。该参数默认值是300000--5分钟,也就是5分支都会强制刷新一次元数据一保证它是最即时的数据。

三.何时关闭TCP连接

  • 用户主动关闭
  • kafka自动关闭

kafka Broker主动断开连接

  • Producer 端参数 connections.max.idle.ms 默认9分钟
  • 9 分钟内没有任何请求“流过”某个 TCP 连接,那么 Kafka 会主动帮你把该 TCP 连接关闭
  • 用户可以在 Producer 端设置 connections.max.idle.ms=-1 禁掉这种机制
  • TCP 连接将成为永久长连接
  • TCP 连接是在 Broker 端被关闭的,但其实这个 TCP 连接的发起方是客户端,因此在 TCP 看来,这属于被动关闭的场景,即 passive close
  • 被动关闭的后果就是会产生大量的 CLOSE_WAIT 连接

四.总结:

  • kafkaProducer实例创建时启动Sender线程,从而创建与bootstrap.servers中所有Broker的TCP连接
  • KafkaProducer实例首次更新元数据信息后,还会再次创建于集群中所有Broker的TCP连接
  • 如果Producer端发送消息到某Broker时发现没有与该Broker的TCP连接,也会立即创建连接。
  • 如果设置Producer端connections.max.idle.ms参数大于0,则步骤1中创建TCP连接会被自动关闭;如果设置该参数为-1,则步骤1中创建的TCP连接将会无法被关闭从而称为僵尸连接。

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: