Kafka Producer in C# Does Not Throw an Exception When It Cannot Connect to the Kafka Cluster

April 24, 2020

CPOL

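A quick tip on handling Kafka failures and errors

Introduction

Handling failures and errors in Kafka is not trivial: by default, the producer does not return any error when it tries to connect to the broker.

I am using version 1.4 of the Confluent.Kafka library for C#.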

Using the Code

The first step is to parameterize the producer and consumer configuration.

        private void KafkaConfig(out string[] topics, out ProducerConfig producerConfig, 
                                 out ConsumerConfig consumerConfig)
        {
            topics = new string[] { "Teste" };
            var config = new KafkaConsumerConfig();
            _configuration.Bind("KafkaConsumer", config);

            Write(this.HttpContext, "--Kafka Config....");
            Write(this.HttpContext, "--Kafka topics: " + topics[0].ToString());

            if (string.IsNullOrEmpty(config.BootstrapServers))
                Write(this.HttpContext, "--Kafka Servers connection is empty.");

            Write(this.HttpContext, "--BootstrapServers: " + config.BootstrapServers);
            Write(this.HttpContext, "--GroupId: " + config.GroupId);

            producerConfig = new ProducerConfig
            {
                BootstrapServers = config.BootstrapServers,                
                StatisticsIntervalMs = 5000,
                MessageTimeoutMs = 10000,   // Fail undelivered messages after 10 s. By default,
                                            // the producer keeps trying to deliver messages
                                            // for 5 minutes (the default value of
                                            // message.timeout.ms).
                SocketTimeoutMs = 10000,
                ApiVersionRequestTimeoutMs = 10000,
                MetadataRequestTimeoutMs = 5000,
                RequestTimeoutMs = 5000                
            };
            consumerConfig = new ConsumerConfig
            {
                BootstrapServers = config.BootstrapServers,
                GroupId = config.GroupId,
                EnableAutoCommit = true,
                StatisticsIntervalMs = 5000,
                SessionTimeoutMs = 10000
            };
        }
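
The `KafkaConsumerConfig` type bound in the method above is not shown in this tip. A minimal sketch, assuming it is a plain options class that exposes only the two properties the method reads (`BootstrapServers` and `GroupId`), could look like the following. The `HasBootstrapServers` helper and the placeholder values in the comment are assumptions added for illustration.

```csharp
// Hypothetical options class: only the BootstrapServers and GroupId property
// names come from the article; the rest is an assumption for illustration.
//
// A matching appsettings.json section might be (placeholder values):
//   "KafkaConsumer": {
//     "BootstrapServers": "localhost:9092",
//     "GroupId": "test-group"
//   }
public class KafkaConsumerConfig
{
    // Comma-separated host:port pairs used for the initial connection.
    public string BootstrapServers { get; set; }

    // Consumer group that the consumer joins.
    public string GroupId { get; set; }

    // Mirrors the string.IsNullOrEmpty check done in KafkaConfig.
    public bool HasBootstrapServers
    {
        get { return !string.IsNullOrEmpty(BootstrapServers); }
    }
}
```

Keeping the settings in one small class means the same section can feed both `ProducerConfig` and `ConsumerConfig`, as the method above does.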

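The next step is to build the producer that writes messages to the topic. If the client cannot connect to the broker, the error handler writes a message saying that the server is unavailable.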

        private void KafkaProducer(string[] topics, ProducerConfig producerConfig)
        {
            Write(this.HttpContext, "--Kafka Producer...");
             
            // Action<DeliveryReport<Null, string>> handlerDelivery = r =>
            //     Write(this.HttpContext, "--Kafka Producer message: " + r.Error.Reason +
            //                             " Delivered message to " + r.TopicPartitionOffset);
            // I don't recommend using the delivery handler because of this issue:
            // https://github.com/confluentinc/confluent-kafka-dotnet/issues/1025

            using (var producer = new ProducerBuilder<Null, string>(producerConfig)
            // The first lambda parameter is discarded: naming it "producer"
            // would clash with the local variable declared by this using.
            .SetErrorHandler((_, error) =>
                {
                    Write(this.HttpContext, "--Kafka Producer Error: " + error.Reason);
                }
            )
            .SetStatisticsHandler((_, json) => Write(this.HttpContext, "Statistics: " + json))
            .Build())
            {
                for (int i = 0; i < 10; ++i)
                {
                    // producer.Produce(topics[0], new Message<Null, string>
                    //                  { Value = i.ToString() },
                    //                  handlerDelivery); // Does not return an error.
                    //                                    // (Not recommended.)
                    var dr = producer.ProduceAsync(topics[0],
                             new Message<Null, string> { Value = i.ToString() });
                    // Blocking on Result surfaces a failed delivery as an
                    // AggregateException that wraps a ProduceException.
                    Write(this.HttpContext, "--Kafka dr: " + dr.Result.Value);
                }

                // Wait up to 1 second for any in-flight messages to be delivered.
                producer.Flush(TimeSpan.FromSeconds(1));
            }
        }
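
The producer above blocks on `dr.Result`, so a failed delivery reaches the caller wrapped in an `AggregateException`. As a sketch of an alternative, the helper below awaits `ProduceAsync` and catches `ProduceException<Null, string>` directly. The `ProducerSketch` class and `TryProduceAsync` method are hypothetical names, and logging goes to the console instead of the article's `Write` helper.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class ProducerSketch
{
    // Hypothetical helper: returns null on success, or the Kafka error
    // that caused the delivery to fail.
    public static async Task<Error> TryProduceAsync(
        IProducer<Null, string> producer, string topic, string value)
    {
        try
        {
            DeliveryResult<Null, string> dr = await producer.ProduceAsync(
                topic, new Message<Null, string> { Value = value });
            Console.WriteLine("Delivered to " + dr.TopicPartitionOffset);
            return null;
        }
        catch (ProduceException<Null, string> e)
        {
            // With an unreachable broker this is typically a timeout that
            // fires once message.timeout.ms (MessageTimeoutMs) has elapsed.
            Console.WriteLine("Delivery failed: " + e.Error.Code + " - " + e.Error.Reason);
            return e.Error;
        }
    }
}
```

With `MessageTimeoutMs = 10000`, as in the configuration above, a call against an unreachable cluster should report the failure after roughly ten seconds rather than the five-minute default.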

The next step is to implement the consumer.

        private void KafkaConsumer(string[] topics, ConsumerConfig consumerConfig)
        {
            Write(this.HttpContext, "--Kafka Consumer...");
            
            using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig)
            .SetErrorHandler((_, e) => 
                {
                    Write(this.HttpContext, "--Kafka Consumer Error: " + e.Reason);
                }
            )
            .SetStatisticsHandler((_, json) => Write(this.HttpContext, "Statistics: " + json ))
            .Build())
            {
                consumer.Subscribe(topics[0].ToString());

                try
                {
                    while (true)
                    {
                        try
                        {
                            var cr = consumer.Consume(10000);

                            if (cr == null)
                            {
                                // Consume returns null when the 10 s timeout elapses.
                                Write(this.HttpContext, "--Kafka consumer failure: null result");
                                continue;
                            }

                            Write(this.HttpContext, "--Kafka Consumed message: " +
                                                    cr.Message.Value +
                                  " TopicPartitionOffset " + cr.TopicPartitionOffset + " ");
                        }
                        catch (ConsumeException e)
                        {
                            Write(this.HttpContext, "--Kafka Error ConsumeException:" + 
                                                     e.Error.Reason);
                        }
                    }
                }
                catch (OperationCanceledException ex)
                {
                    Write(this.HttpContext, 
                          "--Kafka OperationCanceledException:" + ex.Message);
                    // Ensure the consumer leaves the group cleanly and 
                    // final offsets are committed.
                    consumer.Close();
                }
            }
        }
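
In the consumer above, `Consume(10000)` takes a timeout rather than a cancellation token, so the `OperationCanceledException` handler is not expected to run and the loop has no exit. A minimal sketch of a cancellable loop, assuming the caller supplies a `CancellationToken`, is shown below. `ConsumerSketch` and `Run` are hypothetical names, and output goes to the console instead of the article's `Write` helper.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

public static class ConsumerSketch
{
    // Hypothetical helper: consumes until the token is cancelled and
    // returns the number of messages read.
    public static int Run(IConsumer<Ignore, string> consumer, string topic,
                          CancellationToken token)
    {
        int count = 0;
        consumer.Subscribe(topic);
        try
        {
            while (true)
            {
                try
                {
                    // Blocks until a message arrives or the token is cancelled.
                    var cr = consumer.Consume(token);
                    Console.WriteLine("Consumed: " + cr.Message.Value);
                    count++;
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine("Consume error: " + e.Error.Reason);
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation is the normal way out of the loop.
        }
        finally
        {
            // Leave the group cleanly on every exit path.
            consumer.Close();
        }
        return count;
    }
}
```

Moving `Close()` into `finally` is a design choice of this sketch: the consumer then leaves the group on any exit from the loop, not only on cancellation.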

The code below writes each step to the HTTP response, which makes the progress of the Kafka processing visible in the browser.

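        void Write(HttpContext context, string text)
        {
            // Block until the line is written and flushed, so that progress
            // shows up in the browser while Kafka is still working.
            context.Response.WriteAsync(string.Format("<p>[{0}] {1}</p><br/>",
                                        DateTime.Now, text)).GetAwaiter().GetResult();
            context.Response.Body.FlushAsync().GetAwaiter().GetResult();
        }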

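Points of Interest

By default, the Kafka client does not surface errors that occur while it processes messages. They only become visible if you register an error handler or inspect the delivery result.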
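
As a sketch of what an error handler could do with the errors it receives, the helper below turns a Confluent.Kafka `Error` into a short log line. It checks `IsFatal` first, then treats connection-related codes separately from everything else. `KafkaErrorSketch` and `Describe` are hypothetical names, and the choice of which codes to single out is an assumption, not something this tip prescribes.

```csharp
using System;
using Confluent.Kafka;

public static class KafkaErrorSketch
{
    // Hypothetical helper: turns an Error passed to SetErrorHandler
    // into a short log line.
    public static string Describe(Error error)
    {
        // A fatal error means the client instance can no longer be used.
        if (error.IsFatal)
            return "FATAL: " + error.Reason;

        switch (error.Code)
        {
            case ErrorCode.Local_AllBrokersDown:
                return "All brokers are down: " + error.Reason;
            case ErrorCode.Local_Transport:
                return "Broker connection failed: " + error.Reason;
            default:
                return "Kafka error " + error.Code + ": " + error.Reason;
        }
    }
}
```

It could be wired into either builder with something like `.SetErrorHandler((_, e) => Write(this.HttpContext, KafkaErrorSketch.Describe(e)))`. Non-fatal errors delivered to the handler are generally informational, since the client keeps retrying in the background.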

History

  • April 24, 2020: Initial version