Example

  • Event Details
    • topicName
    • eventName
    • service
    • aggregateKey
    • aggregateName

Config

  • concurrency: 10
  • max retry attempts: 3
  • headers:
    • eventName: ""

Properties and Config

  • spring.kafka.listener.concurrency: 10
  • EVENT_NAME_HEADER = “eventName”
@Configuration
@EnableConfigurationProperties(EcomKafkaConfiguration.Kafka.class)
@ConditionalOnProperty(value = "ecom.kafka.enabled", matchIfMissing = true)
public class EcomKafkaConfiguration extends BaseKafkaConfiguration {
 
}
  • KafkaListenerContainerFactory Bean:
    • consumerFactory: ConsumerFactory<String, Object>
    • typeMapper: Jackson2JavaTypeMapper
    • retrySupport: AsyncRetrySupport
    • recordFilterStrategy: RecordFilterStrategy<String, Object>
private static final long TEN_SECOND_DELAY = 10000L;
private static final int MAX_RETRY_ATTEMPTS = 3;
private static final double DOUBLE_THE_DELAY_ON_EACH_ATTEMPT = 1.0;
 
AsyncRetryable.Config.builder()
                .delay(TEN_SECOND_DELAY)
                .maxAttempts(MAX_RETRY_ATTEMPTS)
                .multiplier(DOUBLE_THE_DELAY_ON_EACH_ATTEMPT)
                .build();

@Configuration
@EnableConfigurationProperties(EcomKafkaConfiguration.Kafka.class)
@ConditionalOnProperty(value = "ecom.kafka.enabled", matchIfMissing = true)
public class EcomKafkaConfiguration extends BaseKafkaConfiguration {
 
    public static final String ECOM_CONSUMER_FACTORY = "ecom.kafkaConsumerFactory";
    public static final String ECOM_CONTAINER_FACTORY = "ecom.kafkaContainerListenerFactory";
    public static final String ECOM_TYPE_MAPPER = "ecom.kafkaTypeMapper";
    public static final String ECOM_RECORD_FILTER_STRATEGY = "ecom.kafkaRecordFilterStrategy";
 
    private static final long TEN_SECOND_DELAY = 10000L;
    private static final int MAX_RETRY_ATTEMPTS = 3;
    private static final double DOUBLE_THE_DELAY_ON_EACH_ATTEMPT = 1.0;
 
    public EcomKafkaConfiguration(Kafka kafkaProps) {
        super(kafkaProps);
    }
 
    @Bean(ECOM_CONSUMER_FACTORY)
    @Override
    public ConsumerFactory<String, Object> consumerFactory() {
        return super.consumerFactory();
    }
 
    @Bean(ECOM_CONTAINER_FACTORY)
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> containerFactory(
            @Qualifier(ECOM_CONSUMER_FACTORY) ConsumerFactory<String, Object> consumerFactory,
            @Qualifier(ECOM_TYPE_MAPPER) Jackson2JavaTypeMapper typeMapper,
            AsyncRetrySupport retrySupport,
            @Qualifier(ECOM_RECORD_FILTER_STRATEGY) RecordFilterStrategy<String, Object> recordFilterStrategy) {
        var factory = containerFactory(consumerFactory, typeMapper, recordFilterStrategy);
        var config = AsyncRetryable.Config.builder()
                .delay(TEN_SECOND_DELAY)
                .maxAttempts(MAX_RETRY_ATTEMPTS)
                .multiplier(DOUBLE_THE_DELAY_ON_EACH_ATTEMPT)
                .build();
        ((AbstractKafkaListenerContainerFactory<?, ?, ?>) factory).setCommonErrorHandler(new AsyncRetryErrorHandler(config, retrySupport));
 
        return factory;
    }
 
    @Bean
    IdentityContextKafkaConsumerAspect identityContextKafkaConsumerAspect() {
        return new IdentityContextKafkaConsumerAspect();
    }
 
    @Bean(ECOM_TYPE_MAPPER)
    @Override
    public DefaultJackson2JavaTypeMapper typeMapper() {
        return super.typeMapper();
    }
 
    @Bean(ECOM_RECORD_FILTER_STRATEGY)
    public RecordFilterStrategy<String, Object> recordFilterStrategy() {
        return new CompositeRecordFilterStrategy<>(List.of(newEventNameRecordFilterStrategy(), newTenantRecordFilterStrategy()));
    }
 
    private RecordFilterStrategy<String, Object> newEventNameRecordFilterStrategy() {
        var events = kafkaProps.getEvents();
        if (CollectionUtils.isEmpty(events)) {
            return new AllowOnlyKnownEventsFilteringStrategy<>(EVENT_NAME_HEADER, Collections.emptySet());
        }
        else {
            return new AllowOnlyKnownEventsFilteringStrategy<>(EVENT_NAME_HEADER, events.keySet());
        }
    }
 
    private RecordFilterStrategy<String, Object> newTenantRecordFilterStrategy() {
        Set<String> supportedTenants = new HashSet<>();
        var tenantsProperty = kafkaProps.getKafka().getProperty("supported-tenants");
        if (!ObjectUtils.isEmpty(tenantsProperty) && !tenantsProperty.isBlank()) {
            supportedTenants.addAll(
                    Arrays.stream(tenantsProperty.split(","))
                            .map(String::trim)
                            .toList());
        }
 
        return new AllowOnlyKnownTenantsFilteringStrategy<>(AggregateEventMessage.Headers.TENANT_ID_HEADER_NAME, supportedTenants);
    }
 
    @ConfigurationProperties(prefix = "ecom")
    public static class Kafka extends KafkaConsumerProperties {
 
    }
 
}

World’s largest 7k stores 20 DCs