Get ahead
VMware offers training and certification to turbo-charge your progress.
Learn moreI am pleased to announce that the spring-integration-kafka (Spring Integration Kafka Support) First Milestone for version 2.0 is now available.
The Spring Integration Kafka extension project provides inbound and outbound channel adapters for Apache Kafka.
Starting with this version 2.0 the project is a complete rewrite based on the new spring-kafka project which uses the pure java Producer and Consumer clients provided by Kafka 0.9.x.x.
The artifact org.springframework.integration:spring-integration-kafka:2.0.0.M1
is available in the Milestone repository.
Having the MessageListenerContainer foundation from the spring-kafka project,
the KafkaMessageDrivenChannelAdapter definition is very simple now:
@Bean
public MessageProducer kafkaProducer(
AbstractMessageListenerContainer<Integer, String> container) {
KafkaMessageDrivenChannelAdapter<Integer, String> adapter =
new KafkaMessageDrivenChannelAdapter<>(container);
adapter.setMessageConverter(new StringJsonMessageConverter());
adapter.setOutputChannel(fromKafkaChannel());
adapter.setErrorChannel(myErrorChannel());
return adapter;
}
With the XML configuration we should declare just single component as well:
<int-kafka:message-driven-channel-adapter
id="kafkaListener"
listener-container="container1"
auto-startup="false"
phase="100"
send-timeout="5000"
channel="nullChannel"
message-converter="messageConverter"
error-channel="errorChannel" />
With the KafkaTemplate foundation from the the spring-kafka project, the KafkaProducerMessageHandler
is simple too:
@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler kafkaProducerHandler(
KafkaTemplate<Integer, String> template) {
KafkaProducerMessageHandler<Integer, String> handler =
new KafkaProducerMessageHandler<>(template);
handler.setTopicExpression(PARSER.parseExpression("headers.myTopic"));
handler.setPartitionIdExpression(
PARSER.parseExpression("headers.myPartition"));
return handler;
}
The XML configuration has been simplified, too:
<int-kafka:outbound-channel-adapter
kafka-template="template"
channel="inputToKafka"
topic="foo"/>
Starting with version 1.2 Spring Integration Java DSL introduces Kafka09 Factory to cover the functionality for aforementioned channel adapters from this new 2.0 version.
For example the producing part may look like:
.handle(Kafka09.outboundChannelAdapter(producerFactory())
.defaultTopic("foo")
.partitionId(m -> m.getHeaders().get("myPartition", Integer.class)))
And finally, don't miss Spring for Apache Kafka announcement, too!
Together with the next Spring for Apache Kafka we may consider to implement some adapters for Kafka Streams as well.
Since the code base of the project became pretty straightforward and looks like Apache Kafka API is going to be stable, we intend to absorb this project in the Spring Integration Code 5.0, when the time comes.
Meanwhile we look forward to your feedback and if all goes well plan to release 2.0.0.RELEASE in the next few weeks!