0%

记录一次kafka消费慢解决

最近使用kafka, 消费者速度一直跟不上,造成线上消息堆积到了 101亿,困扰了好几天,终于解决了。(其实很简单,因为 kafka消费速度本来也不慢 😂😂😂😂,其实异步+ 批量消费就满可以达到要求了)

主要原因有两个,

  1. 对 @Async 注解的不理解
  2. 批量消费
  3. – 还有就是通过这次事件,也了解了点 自动提交与 手动提交的概念

一般 kafka 消费慢的 解决思路有一下几种:

  1. 增加分区数
  2. 批量消费(增加拉取批次, 默认 500)
  3. 每次拉取后,本地启线程池异步消费 (注意 kafkaConsumer 是 非线程安全的)
  4. 如果消费多个topic的话,修改分区策略为 StickyAssignor.class(默认 RangeAssignor.class)

这里使用, spring-kafka 注解形式

kafka 配置

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> listenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // 飘零消费 需要设置 为 true
    factory.setBatchListener(true);
    // 单机消费者数量
    factory.setConcurrency(8);
    // 拉取超时时间
    factory.getContainerProperties().setPollTimeout(2000);
    return factory;
}

public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

public Map<String, Object> consumerConfigs() {
    Map<String, Object> propsMap = new HashMap<>();
    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2181");
    propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
    // 最大拉取消息条数, 默认 500 ,可以调大此参数, 此参数过大, 也可能会出现 OOM
    propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
    propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 5000);
    propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, true);
    return propsMap;
}

消费

@Slf4j
@Component
public class Consumer {

    @Resource
    private KafkaConsumeService kafkaConsumerService;


    @KafkaListener(topics = {"${topic}", "${topic2}"}, groupId = "${group.id}", containerFactory = "listenerContainerFactory")
    public void consume(List<ConsumerRecord<String, String>> records) {
        kafkaConsumerService.consume(records);
    }
}

@Service
public class ConsumeServiceImpl implements KafkaConsumeService {

    @Resource
    private ADMapper AdMapper;

    /**
     * 批量消费
     * @param records
     */
    @Async("executor")
    @Override
    public void consume(List<ConsumerRecord<String, String>> records) {
        records.forEach(this::consume);
    }

    /**
     * 单条消费
     * @param record
     */
    public void consume(ConsumerRecord<String, String> record) {
        String key = record.key();
        String value = record.value();
        log.info("ConsumeServiceImpl.consume, key={}, value={}", key, value);

        try {
            saveInDb(value);
        } catch (Exception ex) {
            log.error("ConsumeServiceImpl.consume error, ", ex);
            return;
        }
    }

@Async 的使用

@Configuration
@EnableAsync
public class ExecutorConfig {

    /**
     * Set the ThreadPoolExecutor's core pool size.
     */
    @Value("${executor.corepoolsize:8}")
    private int corePoolSize;
    /**
     * Set the ThreadPoolExecutor's maximum pool size.
     */
    @Value("${executor.maxpoolsize:10}")
    private int maxPoolSize;
    /**
     * Set the capacity for the ThreadPoolExecutor's BlockingQueue.
     */
    @Value("${executor.queueCapacity:500}")
    private int queueCapacity;


    @Bean("executor")
    public Executor insertExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setThreadNamePrefix("executor-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

@Async 使用注意

@Async 标注的方法,称之为异步方法;这些方法将在执行的时候,将会在独立的线程中被执行,调用者无需等待它的完成,即可继续其他的操作

  1. @EnableAsync
    @EnableAsync 不能放在启动类上
  2. 被注解的方法 需要返回值 为 void, 或者Future, 否则 @Async 无效
  3. @Async 注解的方法 和 其调用方法 不能放在同一个类里, 否则 @Async 注解无效

生产环境上 参数可以微调

  • 每次最大拉取的条数,
  • 线程池数, 以及队列数目
  • JVM 堆大小调整

总结

其实 kafka 的 消费速率很快,一般就是 一个 for 循环不停地拉取消息,然后交给线程池异步处理,一般不会产生消息堆积。
消息消费情况 可 堆积情况 一般都可以看一下监控, 我这次最多堆积了 10 亿条, (原因就是对 @Async 不了解,将@Async 和其调动方法卸载了同一个类里 😢😢)

欢迎关注我的其它发布渠道