Setting up Kafka in Azure Event Hubs and establishing a distributed messaging system between Java and .NET Core applications

In this post, I will share the steps to set up Kafka using Azure Event Hubs and produce messages from a Java Spring Boot application, while a .NET Core application acts as the consumer.

There are various options available in Azure Marketplace to set up Kafka. For example, a Kafka cluster is available from Bitnami, Azure HDInsight, Event Hubs and so on. In this post, we will be using the Event Hubs endpoint for the Apache Kafka protocol.

The Azure Event Hubs Kafka endpoint enables developers to connect to Azure Event Hubs using the Kafka protocol. It is a fully managed service in the cloud, it is very easy to set up, and the endpoint is accessible over the internet. The infrastructure is completely managed, so you can focus on building your application rather than setting up or managing the infrastructure components. Another advantage is that integration with existing client applications that use the Kafka protocol is seamless: you just need to provide the new configuration values and you can be using the Kafka endpoint in minutes.

Setting up Event Hub for Kafka endpoint 

An Event Hub for the Kafka protocol can be set up by creating a new resource in Azure and searching for Event Hubs.

image

One thing to note while provisioning this resource is to check “Enable Kafka” as shown below.

image

Once the Kafka namespace is created, we can add topics. In Event Hubs, each event hub plays the role of a Kafka topic. Topics can be created by selecting the Event Hubs option under Entities and clicking the +EventHub option.

image

Once the topic is created, we can start producing messages into that topic and consume them. 

Setting up Producer: Adding Kafka support in Java application

We will first add the dependencies required to use Kafka in the Java application. In my case, I have a web API built on the Java Spring Boot framework that exposes an endpoint. When a user hits that endpoint, I want to read a certain value and push it to the Kafka topic.

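To add Kafka support, edit the pom.xml file and add the following dependency. Note that Microsoft's documentation lists Apache Kafka clients version 1.0 and later as supported by Event Hubs, so a newer kafka-clients version than the one shown may be needed.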

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

Now, we will create the producer.config file and add the connection string, the Kafka server endpoint, and so on. Add this file under the /src/main/resources folder with the following configuration.

bootstrap.servers={serverendpoint}:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{password}";

To obtain the {serverendpoint} and {password} values, go to the Event Hubs namespace and open the Shared access policies tab. Click the policy and copy the Connection string-primary key value. This whole value is the password; the username stays as the literal string $ConnectionString. The server endpoint is the host name inside the same value, and it goes into the bootstrap.servers key. It should look something like {youreventhubnamespace}.servicebus.windows.net
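If you would rather not copy the host name out by hand, the extraction can be sketched in a few lines of plain Java. This is only an illustration: the class name EventHubsKafkaConfig and both method names are made up for this post, and it assumes the connection string has the usual Endpoint=sb://...;SharedAccessKeyName=...;SharedAccessKey=... layout shown in the portal.

```java
import java.util.Properties;

// Hypothetical helper, not part of any Azure or Kafka library.
public class EventHubsKafkaConfig {

    private static final String ENDPOINT_PREFIX = "Endpoint=sb://";

    // Pulls "{namespace}.servicebus.windows.net" out of the connection string
    // and appends the Kafka port used in this post (9093).
    public static String bootstrapServer(String connectionString) {
        for (String part : connectionString.split(";")) {
            if (part.startsWith(ENDPOINT_PREFIX)) {
                String host = part.substring(ENDPOINT_PREFIX.length());
                if (host.endsWith("/")) {
                    host = host.substring(0, host.length() - 1);
                }
                return host + ":9093";
            }
        }
        throw new IllegalArgumentException("No Endpoint=sb://... segment found");
    }

    // Builds the same four settings as producer.config from one connection string.
    public static Properties toProperties(String connectionString) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServer(connectionString));
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        // The username is the literal text $ConnectionString; the password is the whole string.
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"$ConnectionString\" password=\"" + connectionString + "\";");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values only, not a real key.
        String cs = "Endpoint=sb://mynamespace.servicebus.windows.net/;"
                + "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=PLACEHOLDER";
        System.out.println(bootstrapServer(cs));
    }
}
```

The Properties object returned by toProperties could be handed to a Kafka producer in place of the file, which keeps the secret in one place (for example an environment variable) instead of a checked-in config file.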

Now, we can add the following code snippet to send messages to Kafka and use the configuration values from the producer.config file.

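import java.io.FileReader;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

// The class and method names are illustrative wrappers around the snippet.
public class BidProducer {

    public void sendMessage() {
        try {
            // Load the connection settings from producer.config.
            Properties properties = new Properties();
            properties.load(new FileReader("src/main/resources/producer.config"));
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            KafkaProducer<Long, String> producer = new KafkaProducer<>(properties);

            // Use the current timestamp as the message key.
            long time = System.currentTimeMillis();
            final ProducerRecord<Long, String> record =
                    new ProducerRecord<Long, String>("bidtopic", time, "This is a test message");

            // send() is asynchronous; the callback runs once the broker responds.
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.out.println(exception);
                        System.exit(1);
                    }
                }
            });

            // Added here: flush pending records and release the connection.
            producer.close();
        } catch (Exception ex) {
            System.out.print(ex.getMessage());
        }
    }
}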

The above code initializes a KafkaProducer object with the producer config properties and then sends a message by passing a ProducerRecord object to the producer.send method.
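One caveat with the snippet above: the relative path src/main/resources/producer.config only resolves when the application is started from the project root. In a standard Maven layout, files under src/main/resources end up at the root of the classpath, so reading the file from the classpath is usually more robust once the application is packaged. Here is a minimal sketch of that idea. The ProducerConfigLoader class and its method names are hypothetical, and the list of required keys is simply the four settings used in this post.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Hypothetical helper, not part of the Kafka client library.
public class ProducerConfigLoader {

    // The four settings that producer.config is expected to contain in this post.
    static final String[] REQUIRED_KEYS = {
        "bootstrap.servers", "security.protocol", "sasl.mechanism", "sasl.jaas.config"
    };

    // Reads properties from a stream and fails fast if a required key is missing or blank.
    public static Properties load(InputStream in) throws IOException {
        if (in == null) {
            throw new IOException("producer.config not found");
        }
        Properties props = new Properties();
        props.load(in);
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                throw new IllegalStateException("Missing setting: " + key);
            }
        }
        return props;
    }

    // Looks the file up at the classpath root instead of using a relative file path.
    public static Properties loadFromClasspath() throws IOException {
        try (InputStream in = ProducerConfigLoader.class.getResourceAsStream("/producer.config")) {
            return load(in);
        }
    }
}
```

With this in place, the producer code could call ProducerConfigLoader.loadFromClasspath() instead of properties.load(new FileReader(...)), and a missing or incomplete config file would surface as a clear error before any connection attempt.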

Setting up Consumer: Adding Kafka support in .NET Core application

There are many Kafka libraries available for adding Kafka support to a .NET Core application. For this sample, I have used the Confluent Kafka library, which can be added as a NuGet package.

Open the NuGet package manager and add the Confluent.Kafka package as shown below

image

Create a class and add a ConsumeMessages method to receive messages from the topic.

// Requires: using System; using System.Threading; using Confluent.Kafka;
public void ConsumeMessages(string topic)
{
    var config = new ConsumerConfig
    {
        GroupId = "onlineauctiongroup",
        BootstrapServers = "{serverendpoint}:9093",
        SaslUsername = "$ConnectionString",
        SaslPassword = "{password}",
        SecurityProtocol = SecurityProtocol.SaslSsl,
        SaslMechanism = SaslMechanism.Plain,
        Debug = "security,broker,protocol"
    };

    using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
    {
        consumer.Subscribe(topic);

        CancellationTokenSource cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) => {
            e.Cancel = true; // prevent the process from terminating.
            cts.Cancel();
        };
        try
        {
            while (true)
            {
                try
                {
                    // Blocks until a message arrives or the token is cancelled.
                    var cr = consumer.Consume(cts.Token);
                    Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Error occurred: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Ensure the consumer leaves the group cleanly and final offsets are committed.
            consumer.Close();
        }
    }
}

In the above code, ConsumerConfig is used to specify the Kafka-specific configuration values, and ConsumerBuilder builds the consumer object from that ConsumerConfig. We listen to a specific topic by calling the consumer.Subscribe method and then consume messages in a loop using the Consume method until the user presses Ctrl+C.

Hope this helps!