Example
- Event Details
- topicName
- eventName
- service
- aggregateKey
- aggregateName
Config
- concurrency: 10
- max retry attempts: 3
- headers:
- eventName: ""
Properties and Config
spring.kafka.listener.concurrency: 10- EVENT_NAME_HEADER = “eventName”
@Configuration
@EnableConfigurationProperties(EcomKafkaConfiguration.Kafka.class)
@ConditionalOnProperty(value = "ecom.kafka.enabled", matchIfMissing = true)
public class EcomKafkaConfiguration extends BaseKafkaConfiguration {
}- KafkaListenerContainerFactory Bean:
- consumerFactory:
ConsumerFactory<String, Object> - typeMapper:
Jackson2JavaTypeMapper - retrySupport:
AsyncRetrySupport - recordFilterStrategy:
RecordFilterStrategy<String, Object>
- consumerFactory:
private static final long TEN_SECOND_DELAY = 10000L;
private static final int MAX_RETRY_ATTEMPTS = 3;
private static final double DOUBLE_THE_DELAY_ON_EACH_ATTEMPT = 1.0;
AsyncRetryable.Config.builder()
.delay(TEN_SECOND_DELAY)
.maxAttempts(MAX_RETRY_ATTEMPTS)
.multiplier(DOUBLE_THE_DELAY_ON_EACH_ATTEMPT)
.build();@Configuration
@EnableConfigurationProperties(EcomKafkaConfiguration.Kafka.class)
@ConditionalOnProperty(value = "ecom.kafka.enabled", matchIfMissing = true)
public class EcomKafkaConfiguration extends BaseKafkaConfiguration {
public static final String ECOM_CONSUMER_FACTORY = "ecom.kafkaConsumerFactory";
public static final String ECOM_CONTAINER_FACTORY = "ecom.kafkaContainerListenerFactory";
public static final String ECOM_TYPE_MAPPER = "ecom.kafkaTypeMapper";
public static final String ECOM_RECORD_FILTER_STRATEGY = "ecom.kafkaRecordFilterStrategy";
private static final long TEN_SECOND_DELAY = 10000L;
private static final int MAX_RETRY_ATTEMPTS = 3;
private static final double DOUBLE_THE_DELAY_ON_EACH_ATTEMPT = 1.0;
public EcomKafkaConfiguration(Kafka kafkaProps) {
super(kafkaProps);
}
@Bean(ECOM_CONSUMER_FACTORY)
@Override
public ConsumerFactory<String, Object> consumerFactory() {
return super.consumerFactory();
}
@Bean(ECOM_CONTAINER_FACTORY)
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> containerFactory(
@Qualifier(ECOM_CONSUMER_FACTORY) ConsumerFactory<String, Object> consumerFactory,
@Qualifier(ECOM_TYPE_MAPPER) Jackson2JavaTypeMapper typeMapper,
AsyncRetrySupport retrySupport,
@Qualifier(ECOM_RECORD_FILTER_STRATEGY) RecordFilterStrategy<String, Object> recordFilterStrategy) {
var factory = containerFactory(consumerFactory, typeMapper, recordFilterStrategy);
var config = AsyncRetryable.Config.builder()
.delay(TEN_SECOND_DELAY)
.maxAttempts(MAX_RETRY_ATTEMPTS)
.multiplier(DOUBLE_THE_DELAY_ON_EACH_ATTEMPT)
.build();
((AbstractKafkaListenerContainerFactory<?, ?, ?>) factory).setCommonErrorHandler(new AsyncRetryErrorHandler(config, retrySupport));
return factory;
}
@Bean
IdentityContextKafkaConsumerAspect identityContextKafkaConsumerAspect() {
return new IdentityContextKafkaConsumerAspect();
}
@Bean(ECOM_TYPE_MAPPER)
@Override
public DefaultJackson2JavaTypeMapper typeMapper() {
return super.typeMapper();
}
@Bean(ECOM_RECORD_FILTER_STRATEGY)
public RecordFilterStrategy<String, Object> recordFilterStrategy() {
return new CompositeRecordFilterStrategy<>(List.of(newEventNameRecordFilterStrategy(), newTenantRecordFilterStrategy()));
}
private RecordFilterStrategy<String, Object> newEventNameRecordFilterStrategy() {
var events = kafkaProps.getEvents();
if (CollectionUtils.isEmpty(events)) {
return new AllowOnlyKnownEventsFilteringStrategy<>(EVENT_NAME_HEADER, Collections.emptySet());
}
else {
return new AllowOnlyKnownEventsFilteringStrategy<>(EVENT_NAME_HEADER, events.keySet());
}
}
private RecordFilterStrategy<String, Object> newTenantRecordFilterStrategy() {
Set<String> supportedTenants = new HashSet<>();
var tenantsProperty = kafkaProps.getKafka().getProperty("supported-tenants");
if (!ObjectUtils.isEmpty(tenantsProperty) && !tenantsProperty.isBlank()) {
supportedTenants.addAll(
Arrays.stream(tenantsProperty.split(","))
.map(String::trim)
.toList());
}
return new AllowOnlyKnownTenantsFilteringStrategy<>(AggregateEventMessage.Headers.TENANT_ID_HEADER_NAME, supportedTenants);
}
@ConfigurationProperties(prefix = "ecom")
public static class Kafka extends KafkaConsumerProperties {
}
}World’s largest 7k stores 20 DCs