1. The Default Partitioning Strategy
(1) If the key is null and the default partitioner is used, the record is sent to one of the currently available partitions of the topic; the partitioner uses a round-robin algorithm to distribute these messages evenly across the partitions.
(2) If the key is not null and the default partitioner is used, Kafka hashes the key and uses the hash to map the message to a specific partition. The crucial point is that the same key must always map to the same partition, so the mapping is computed over all partitions of the topic, not just the currently available ones. This also means that if the target partition is unavailable when the record is written, the write fails, although this rarely happens.
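For a non-null key, the mapping can be sketched roughly as follows. This is a simplified illustration of what Kafka's DefaultPartitioner does (it hashes the serialized key with murmur2 and takes the result modulo the total partition count); the class and method names below are hypothetical and for illustration only.

import org.apache.kafka.common.utils.Utils;

public class DefaultKeyPartitionSketch {
    /**
     * A minimal sketch of how the default partitioner maps a non-null key to a partition.
     */
    public static int partitionForKey(byte[] keyBytes, int numPartitions) {
        // murmur2 hash of the serialized key, masked to a non-negative value,
        // then taken modulo the total partition count of the topic
        // (all partitions, not just the available ones)
        return (Utils.murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }
}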
2. Custom Partitioners
To meet business requirements you may need a custom partitioner. For example, suppose that in a stream of call records, calls made to customer-service numbers must all be stored in one dedicated partition, while the remaining records are distributed evenly across the other partitions. We will use this case for the demonstration.
(1) The custom partitioner
package com.bonc.rdpe.kafka110.partitioner;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
/**
 * Custom partitioner: routes customer-service call records to the last
 * partition and spreads the remaining records over the other partitions.
 */
public class PhonenumPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // no extra configuration needed
    }

    /**
     * Routes records by phone number to avoid skew across the ordinary partitions
     * (this example assumes the topic has 3 partitions).
     *
     * @param topic      topic name
     * @param key        the key passed in by the user
     * @param keyBytes   serialized key bytes
     * @param value      the value passed in by the user
     * @param valueBytes serialized value bytes
     * @param cluster    current cluster metadata
     * @return the target partition number, e.g. 0, 1 or 2 for a 3-partition topic
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // look up the partitions of the topic
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        String phoneNum = String.valueOf(key);
        // records keyed with a customer-service number go to the last partition
        if (phoneNum.equals("10000") || phoneNum.equals("11111")) {
            return numPartitions - 1;
        }
        // all other records are spread over the remaining partitions by the hash of the
        // first 3 digits; mask the hash so the partition number is never negative
        return (phoneNum.substring(0, 3).hashCode() & 0x7fffffff) % (numPartitions - 1);
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}
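Two points in the partition() method above are worth calling out: ordinary records are hashed modulo numPartitions - 1, so they can only land in partitions 0 through numPartitions - 2, which leaves the last partition reserved for the customer-service keys 10000 and 11111; and because String.hashCode() may return a negative value, the hash is masked with 0x7fffffff before the modulo so that a negative partition number is never returned.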
(2) Using the custom partitioner
package com.bonc.rdpe.kafka110.producer;
import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
 * PartitionerProducer.java
 * Tests the custom partitioner.
 */
public class PartitionerProducer {

    private static final String[] PHONE_NUMS = new String[]{
        "10000", "10000", "11111", "13700000003", "13700000004",
        "10000", "15500000006", "11111", "15500000008",
        "17600000009", "10000", "17600000011"
    };

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        // register the custom partitioner
        props.put("partitioner.class", "com.bonc.rdpe.kafka110.partitioner.PhonenumPartitioner");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        Random rand = new Random();
        int length = PHONE_NUMS.length;
        // send 10 records, using a randomly chosen phone number as both key and value
        for (int count = 0; count < 10; count++) {
            String phoneNum = PHONE_NUMS[rand.nextInt(length)];
            ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", phoneNum, phoneNum);
            RecordMetadata metadata = producer.send(record).get();
            System.out.println("phonenum [" + record.value() + "] has been sent to partition " + metadata.partition());
            Thread.sleep(500);
        }
        producer.close();
    }
}
(3) Test results
phonenum [11111] has been sent to partition 2
phonenum [11111] has been sent to partition 2
phonenum [17600000009] has been sent to partition 0
phonenum [17600000011] has been sent to partition 0
phonenum [13700000003] has been sent to partition 1
phonenum [10000] has been sent to partition 2
phonenum [10000] has been sent to partition 2
phonenum [15500000008] has been sent to partition 1
phonenum [10000] has been sent to partition 2
phonenum [17600000009] has been sent to partition 0
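As expected, records keyed 10000 and 11111 always land in partition 2 (the last of the three partitions), while the other phone numbers are spread over partitions 0 and 1 according to the hash of their first three digits.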