最近使用kafka, 消费者速度一直跟不上,造成线上消息堆积到了 101亿,困扰了好几天,终于解决了。(其实很简单,因为 kafka消费速度本来也不慢 😂😂😂😂,其实异步+ 批量消费就满可以达到要求了)
主要原因有两个,
- 对 @Async 注解的不理解
- 批量消费
- – 还有就是通过这次事件,也了解了点 自动提交与 手动提交的概念
一般 kafka 消费慢的 解决思路有一下几种:
- 增加分区数
- 批量消费(增加拉取批次, 默认 500)
- 每次拉取后,本地启线程池异步消费 (注意 kafkaConsumer 是 非线程安全的)
- 如果消费多个topic的话,修改分区策略为 StickyAssignor.class(默认 RangeAssignor.class)
这里使用, spring-kafka 注解形式
kafka 配置
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> listenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 飘零消费 需要设置 为 true
factory.setBatchListener(true);
// 单机消费者数量
factory.setConcurrency(8);
// 拉取超时时间
factory.getContainerProperties().setPollTimeout(2000);
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2181");
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
// 最大拉取消息条数, 默认 500 ,可以调大此参数, 此参数过大, 也可能会出现 OOM
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 5000);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, true);
return propsMap;
}
消费
@Slf4j
@Component
public class Consumer {
@Resource
private KafkaConsumeService kafkaConsumerService;
@KafkaListener(topics = {"${topic}", "${topic2}"}, groupId = "${group.id}", containerFactory = "listenerContainerFactory")
public void consume(List<ConsumerRecord<String, String>> records) {
kafkaConsumerService.consume(records);
}
}
@Service
public class ConsumeServiceImpl implements KafkaConsumeService {
@Resource
private ADMapper AdMapper;
/**
* 批量消费
* @param records
*/
@Async("executor")
@Override
public void consume(List<ConsumerRecord<String, String>> records) {
records.forEach(this::consume);
}
/**
* 单条消费
* @param record
*/
public void consume(ConsumerRecord<String, String> record) {
String key = record.key();
String value = record.value();
log.info("ConsumeServiceImpl.consume, key={}, value={}", key, value);
try {
saveInDb(value);
} catch (Exception ex) {
log.error("ConsumeServiceImpl.consume error, ", ex);
return;
}
}
@Async 的使用
@Configuration
@EnableAsync
public class ExecutorConfig {
/**
* Set the ThreadPoolExecutor's core pool size.
*/
@Value("${executor.corepoolsize:8}")
private int corePoolSize;
/**
* Set the ThreadPoolExecutor's maximum pool size.
*/
@Value("${executor.maxpoolsize:10}")
private int maxPoolSize;
/**
* Set the capacity for the ThreadPoolExecutor's BlockingQueue.
*/
@Value("${executor.queueCapacity:500}")
private int queueCapacity;
@Bean("executor")
public Executor insertExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setThreadNamePrefix("executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
@Async 使用注意
@Async
标注的方法,称之为异步方法;这些方法将在执行的时候,将会在独立的线程中被执行,调用者无需等待它的完成,即可继续其他的操作
@EnableAsync
@EnableAsync
不能放在启动类上- 被注解的方法 需要返回值 为
void
, 或者Future
, 否则@Async
无效 @Async
注解的方法 和 其调用方法 不能放在同一个类里, 否则@Async
注解无效
生产环境上 参数可以微调
- 每次最大拉取的条数,
- 线程池数, 以及队列数目
- JVM 堆大小调整
总结
其实 kafka 的 消费速率很快,一般就是 一个 for 循环不停地拉取消息,然后交给线程池异步处理,一般不会产生消息堆积。
消息消费情况 可 堆积情况 一般都可以看一下监控, 我这次最多堆积了 10 亿条, (原因就是对 @Async 不了解,将@Async 和其调动方法卸载了同一个类里 😢😢)