Skip to content

Kafka Producer and Consumer example with Confluent's .Net Client

Notifications You must be signed in to change notification settings

mrargun/dotnet_kafka_client

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

6 Commits
 
 
 
 

Repository files navigation

dotnet-kafka-client

Basic Consumer and Producer example by using Confluent's Kafka client for .Net

All you need is to install Confluent.Kafka from NuGet Package Manager.

  • Basic Producer

          var conf = new ProducerConfig { BootstrapServers = "localhost:9092" };
    
          Action<DeliveryReport<Null, string>> handler = r =>
              Console.WriteLine(!r.Error.IsError
                  ? $"Delivered message to {r.TopicPartitionOffset}"
                  : $"Delivery Error: {r.Error.Reason}");
    
          using (var p = new ProducerBuilder<Null, string>(conf).Build())
          {
              for (int i = 0; i < 100; ++i)
              {  
                  //Enter the name of the topic, and start producing 
                  
                  p.Produce("TEST_TOPIC", new Message<Null, string> { Value = "Test "+i.ToString() }, handler);
              }
    
              // wait for up to 10 seconds for any inflight messages to be delivered.
              p.Flush(TimeSpan.FromSeconds(10));
          }
    
  • Basic Consumer

          var conf = new ConsumerConfig
          {
              GroupId = "test-consumer-group",
              BootstrapServers = "localhost:9092",
              // Note: The AutoOffsetReset property determines the start offset in the event
              // there are not yet any committed offsets for the consumer group for the
              // topic/partitions of interest. By default, offsets are committed
              // automatically, so in this example, consumption will only start from the
              // earliest message in the topic 'TEST_TOPIC' the first time you run the program.
              AutoOffsetReset = AutoOffsetReset.Earliest
          };
    
          using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
          {
              // Enter name of the topic to consume
              c.Subscribe("TEST_TOPIC");
    
              CancellationTokenSource cts = new CancellationTokenSource();
              Console.CancelKeyPress += (_, e) => {
                  e.Cancel = true; // prevent the process from terminating.
                  cts.Cancel();
              };
    
              try
              {
                  while (true)
                  {
                      try
                      {
                          var cr = c.Consume(cts.Token);
                          //Consuming Started
                          //Do the job
    
                          Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                      }
                      catch (ConsumeException e)
                      {
                          Console.WriteLine($"Error occured: {e.Error.Reason}");
                      }
                  }
              }
              catch (OperationCanceledException)
              {
                  // Ensure the consumer leaves the group cleanly and final offsets are committed.
                  c.Close();
              }
    

Before running the project, make sure that ZooKeeper instance is up and running and Kafka server is started!

Releases

No releases published

Packages

No packages published

Languages