Basic consumer and producer examples using Confluent's Kafka client for .NET
All you need to do is install the Confluent.Kafka package from the NuGet Package Manager.
-
Basic Producer
    using System;
    using Confluent.Kafka;

    var conf = new ProducerConfig { BootstrapServers = "localhost:9092" };

    // Called once per message when the broker acknowledges it or delivery fails.
    Action<DeliveryReport<Null, string>> handler = r =>
        Console.WriteLine(!r.Error.IsError
            ? $"Delivered message to {r.TopicPartitionOffset}"
            : $"Delivery Error: {r.Error.Reason}");

    using (var p = new ProducerBuilder<Null, string>(conf).Build())
    {
        for (int i = 0; i < 100; ++i)
        {
            // Enter the name of the topic, and start producing.
            p.Produce("TEST_TOPIC", new Message<Null, string> { Value = $"Test {i}" }, handler);
        }

        // Wait for up to 10 seconds for any in-flight messages to be delivered.
        p.Flush(TimeSpan.FromSeconds(10));
    }
-
Basic Consumer
    using System;
    using System.Threading;
    using Confluent.Kafka;

    var conf = new ConsumerConfig
    {
        GroupId = "test-consumer-group",
        BootstrapServers = "localhost:9092",
        // Note: The AutoOffsetReset property determines the start offset in the event
        // there are not yet any committed offsets for the consumer group for the
        // topic/partitions of interest. By default, offsets are committed
        // automatically, so in this example, consumption will only start from the
        // earliest message in the topic 'TEST_TOPIC' the first time you run the program.
        AutoOffsetReset = AutoOffsetReset.Earliest
    };

    using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
    {
        // Enter the name of the topic to consume.
        c.Subscribe("TEST_TOPIC");

        CancellationTokenSource cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true; // Prevent the process from terminating.
            cts.Cancel();
        };

        try
        {
            while (true)
            {
                try
                {
                    // Blocks until a message arrives or the token is cancelled.
                    var cr = c.Consume(cts.Token);
                    // Do the job.
                    Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Error occurred: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Ensure the consumer leaves the group cleanly and final offsets are committed.
            c.Close();
        }
    }
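The note in the consumer configuration mentions that offsets are committed automatically by default. If you would rather commit an offset only after a message has actually been processed, one possible variation is sketched below. It assumes the same broker address, group id and topic as the example above; setting EnableAutoCommit to false and calling Commit after each message are choices made for this sketch, not part of the original example, and the class name ManualCommitConsumer is hypothetical.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

// Hypothetical class name, used only for this sketch.
class ManualCommitConsumer
{
    static void Main()
    {
        var conf = new ConsumerConfig
        {
            GroupId = "test-consumer-group",
            BootstrapServers = "localhost:9092",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            // Assumption for this sketch: turn off the default automatic commits.
            EnableAutoCommit = false
        };

        using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
        {
            c.Subscribe("TEST_TOPIC");

            var cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) =>
            {
                e.Cancel = true; // Keep the process alive so the consumer can close.
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {
                        var cr = c.Consume(cts.Token);
                        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");

                        // Commit only after the message has been handled.
                        c.Commit(cr);
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Consume error: {e.Error.Reason}");
                    }
                    catch (KafkaException e)
                    {
                        Console.WriteLine($"Commit error: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Ctrl+C was pressed; fall through to Close().
            }
            finally
            {
                // Leave the consumer group cleanly.
                c.Close();
            }
        }
    }
}
```

Committing synchronously after every message is the simplest approach but adds a round trip per message, so committing every N messages may be a better fit for higher throughput.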
Before running the project, make sure that a ZooKeeper instance is up and running and that the Kafka server has been started.
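One way to check from .NET that the Kafka server is reachable is to request cluster metadata through the admin client in the same Confluent.Kafka package. This is only a minimal sketch: it assumes the broker listens on localhost:9092 as in the examples above, the 10 second timeout is arbitrary, and the class name BrokerCheck is hypothetical.

```csharp
using System;
using Confluent.Kafka;

// Hypothetical class name, used only for this sketch.
class BrokerCheck
{
    static void Main()
    {
        // Assumption: same broker address as the producer and consumer examples.
        var conf = new AdminClientConfig { BootstrapServers = "localhost:9092" };

        using (var admin = new AdminClientBuilder(conf).Build())
        {
            try
            {
                // Ask the cluster for its metadata; this throws if no broker answers in time.
                var metadata = admin.GetMetadata(TimeSpan.FromSeconds(10));

                foreach (var broker in metadata.Brokers)
                {
                    Console.WriteLine($"Broker {broker.BrokerId} at {broker.Host}:{broker.Port}");
                }
            }
            catch (KafkaException e)
            {
                Console.WriteLine($"Could not reach Kafka: {e.Error.Reason}");
            }
        }
    }
}
```

If at least one broker is listed, the producer and consumer examples should be able to connect with the same BootstrapServers value.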