
Kafka integration for C# beginners

Every technology feels like rocket science until you take the first step. This is a quick tutorial on setting up Kafka and writing two small C# programs to produce to and consume from it. I am purposely skipping the what and why of Kafka and event-driven architecture, as you can find plenty of tutorials and interview material on that.

I will be using Docker Desktop to host Kafka.

You can find my Visual Studio solution on GitHub – https://github.com/ninethsense/code-share/tree/master/KafkaIntegrationSample

Step 0: Prerequisites

  1. Docker (or install Kafka directly on your machine, though I don't recommend that just for learning)
  2. Kafka
  3. C# – VS or VSCode with dotnet CLI installed

Step 1: Setup Kafka

We will be using the KRaft mode instead of the old ZooKeeper setup.

Use the docker-compose.yml below:

version: '3'
services:
  kafka:
    image: apache/kafka:latest
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      # KRaft settings
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      
      # Listeners
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      
      # Auto-create topics on first use (convenient for learning)
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

Then run the command:

docker-compose up -d

If the command completes without errors, the broker should be up. You can verify that the container and image were created in Docker Desktop (or with docker ps on the command line).
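If you prefer verifying from the command line instead of Docker Desktop, the apache/kafka image ships the standard Kafka CLI tools (I'm assuming the image keeps them under /opt/kafka/bin; adjust the path if yours differs). Requires the container from the compose file above to be running:

```shell
# List topics on the broker (a fresh install has none of ours yet)
docker exec kafka /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --list

# Optionally pre-create the topic the sample apps will use,
# instead of relying on auto-creation
docker exec kafka /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --create --topic test-topic --partitions 1 --replication-factor 1
```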

Step 2: Producer, aka message sending logic

Create a .NET console project for the Producer code.

I will be using Visual Studio, but you are free to use any editor, like VSCode with the dotnet CLI.

Use the code below:

using Confluent.Kafka;

class Program
{
    static async Task Main(string[] args)
    {
        // 1. Configuration
        var config = new ProducerConfig
        {
            BootstrapServers = "localhost:9092"
        };

        // 2. Create the Producer
        using (var producer = new ProducerBuilder<Null, string>(config).Build())
        {
            Console.WriteLine("Kafka Producer Connected. Type a message and press Enter (or 'exit' to quit):");

            while (true)
            {
                string? input = Console.ReadLine();
                if (input is null || input.Equals("exit", StringComparison.OrdinalIgnoreCase)) break;

                try
                {
                    // 3. Send the Message to "test-topic"
                    var deliveryResult = await producer.ProduceAsync(
                        "test-topic",
                        new Message<Null, string> { Value = input }
                    );

                    Console.WriteLine($"Delivered '{deliveryResult.Value}' to '{deliveryResult.TopicPartitionOffset}'");
                }
                catch (ProduceException<Null, string> e)
                {
                    Console.WriteLine($"Delivery failed: {e.Error.Reason}");
                }
            }
        }
    }
}

Make sure you have the necessary dependency installed; you need the Confluent.Kafka NuGet package:

dotnet add package Confluent.Kafka --version 2.13.0
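A small variation worth knowing: the sample above uses Message&lt;Null, string&gt;, so Kafka spreads messages across partitions without any ordering guarantee between them. If per-entity ordering matters, you can key your messages; here is a sketch (the key value "sensor-42" is just a placeholder I made up, and this assumes the same broker and topic as above):

```csharp
using Confluent.Kafka;

class KeyedProducerSketch
{
    static async Task Main()
    {
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

        // string key + string value instead of a Null key
        using var producer = new ProducerBuilder<string, string>(config).Build();

        // Messages with the same key always hash to the same partition,
        // so Kafka preserves ordering per key.
        var result = await producer.ProduceAsync(
            "test-topic",
            new Message<string, string> { Key = "sensor-42", Value = "hello with a key" });

        Console.WriteLine($"Delivered to partition {result.Partition}");
    }
}
```

With a Null key, the client picks partitions itself; with a string key, the partition is derived from the key's hash.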

Step 3: Consumer, aka message receiving logic

Create another .NET console project, and use the code below.

using Confluent.Kafka;

class Program
{
    static void Main(string[] args)
    {
        // 1. Configuration
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "my-csharp-consumer-group",
            AutoOffsetReset = AutoOffsetReset.Earliest // Start from beginning if no offset exists
        };

        // 2. Create the Consumer
        using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
        {
            // 3. Subscribe to the topic
            consumer.Subscribe("test-topic");

            Console.WriteLine("Listening for messages... (Press Ctrl+C to quit)");

            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) => {
                e.Cancel = true; // prevent the process from terminating.
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    // 4. Consume loop
                    try
                    {
                        // Wait for a message
                        var consumeResult = consumer.Consume(cts.Token);
                        Console.WriteLine($"Received: '{consumeResult.Message.Value}' from Partition: {consumeResult.Partition}");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occurred: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                consumer.Close();
            }
        }
    }
}
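The consumer above relies on automatic offset commits, the client's default behavior. If you want at-least-once processing, where an offset is committed only after your code has actually handled the message, you can disable auto-commit and commit explicitly. A sketch of the changed parts, assuming the same broker, group, and topic as above:

```csharp
using Confluent.Kafka;

class ManualCommitSketch
{
    static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "my-csharp-consumer-group",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoCommit = false // we commit ourselves, after processing
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        consumer.Subscribe("test-topic");

        while (true)
        {
            var result = consumer.Consume();
            Console.WriteLine($"Processing: {result.Message.Value}");

            // Commit only after the message is fully handled; if the process
            // dies before this line, the message is redelivered on restart.
            consumer.Commit(result);
        }
    }
}
```

The trade-off: duplicates become possible after a crash, but messages are never silently lost between consumption and processing.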

Step 4: Run the apps side by side to see it working.

Whatever you type on the “Producer” app should be displayed on the “Consumer” app as well.

Those who use Visual Studio can configure the solution to run both projects at once; I kept both projects in the same solution.

Summary

  • First, we established the infrastructure by deploying a lightweight, single-container Kafka broker in KRaft mode using the apache/kafka image.
  • Next, we developed a Producer program that acts as a data source; it connects to the broker and sends whatever you type on the console into a named channel called a “topic.”
  • To complete the pipeline, we created a Consumer program that subscribes to the same topic, polling for new messages and processing them as they arrive.
  • Running both programs simultaneously gives a continuous flow: messages from the Producer are persisted by the Dockerized broker and delivered to the Consumer in near real time, demonstrating the core mechanics of event-driven architecture.
