C# Kafka Producer Does Not Throw an Exception When It Cannot Connect to the Kafka Cluster





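A quick tip on handling Kafka failures and errors
Introduction
Handling failures and errors in Kafka is not trivial. By default, the producer does not report any error when it tries to connect to a broker and fails.
I am using version 1.4 of the Confluent.Kafka library for C#.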
Using the Code
The first step is to set up the configuration parameters for the producer and the consumer.
private void KafkaConfig(out string[] topics, out ProducerConfig producerConfig,
                         out ConsumerConfig consumerConfig)
{
    topics = new string[] { "Teste" };

    // Bind the "KafkaConsumer" configuration section to a settings object.
    var config = new KafkaConsumerConfig();
    _configuration.Bind("KafkaConsumer", config);

    Write(this.HttpContext, "--Kafka Config....");
    Write(this.HttpContext, "--Kafka topics: " + topics[0]);
    if (string.IsNullOrEmpty(config.BootstrapServers))
        Write(this.HttpContext, "--Kafka Servers connection is empty.");
    Write(this.HttpContext, "--BootstrapServers: " + config.BootstrapServers);
    Write(this.HttpContext, "--GroupId: " + config.GroupId);

    producerConfig = new ProducerConfig
    {
        BootstrapServers = config.BootstrapServers,
        StatisticsIntervalMs = 5000,
        // By default, the producer attempts to deliver messages for 5 minutes
        // (the default value of message.timeout.ms). Lower it to fail sooner.
        MessageTimeoutMs = 10000,
        SocketTimeoutMs = 10000,
        ApiVersionRequestTimeoutMs = 10000,
        MetadataRequestTimeoutMs = 5000,
        RequestTimeoutMs = 5000
    };

    consumerConfig = new ConsumerConfig
    {
        BootstrapServers = config.BootstrapServers,
        GroupId = config.GroupId,
        EnableAutoCommit = true,
        StatisticsIntervalMs = 5000,
        SessionTimeoutMs = 10000
    };
}
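The KafkaConsumerConfig type that is bound above is not shown in this tip. The following is a minimal sketch, assuming it is a plain settings class exposing the two properties the method reads (BootstrapServers and GroupId) and that the values live in a "KafkaConsumer" section of appsettings.json. The server address and group name are placeholders, not values from the original project.

```csharp
// Hypothetical settings class: only the two properties read by KafkaConfig are assumed.
public class KafkaConsumerConfig
{
    // Comma-separated list of host:port pairs used for the initial connection.
    public string BootstrapServers { get; set; }

    // Consumer group that the consumer joins.
    public string GroupId { get; set; }
}

// Matching appsettings.json section (placeholder values):
// {
//   "KafkaConsumer": {
//     "BootstrapServers": "localhost:9092",
//     "GroupId": "test-group"
//   }
// }
```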
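The next step is to build the producer that writes messages to the topic. If the client cannot connect to a broker, the error handler writes a message indicating that the server is unavailable.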
private void KafkaProducer(string[] topics, ProducerConfig producerConfig)
{
    Write(this.HttpContext, "--Kafka Producer...");

    // Delivery-handler variant, kept for reference. I don't recommend it because of
    // https://github.com/confluentinc/confluent-kafka-dotnet/issues/1025
    // Action<DeliveryReport<Null, string>> handlerDelivery = r =>
    //     Write(this.HttpContext, "--Kafka Producer message: " + r.Error.Reason +
    //         " Delivered message to " + r.TopicPartitionOffset);

    using (var producer = new ProducerBuilder<Null, string>(producerConfig)
        // Called for client-level errors, for example when no broker can be reached.
        // The first parameter is discarded so that it does not clash with the
        // 'producer' variable declared by the using statement.
        .SetErrorHandler((_, error) =>
            Write(this.HttpContext, "--Kafka Producer Error: " + error.Reason))
        .SetStatisticsHandler((_, json) => Write(this.HttpContext, "Statistics: " + json))
        .Build())
    {
        for (int i = 0; i < 10; ++i)
        {
            // producer.Produce(topics[0], new Message<Null, string> { Value = i.ToString() },
            //     handlerDelivery); // Does not return an error (not recommended).
            var dr = producer.ProduceAsync(topics[0],
                new Message<Null, string> { Value = i.ToString() });

            // Result blocks until the delivery report arrives. If delivery fails
            // (for example, message.timeout.ms expires), it throws an
            // AggregateException that wraps a ProduceException.
            Write(this.HttpContext, "--Kafka dr: " + dr.Result.Value);
        }

        // Wait for up to 1 second for any in-flight messages to be delivered.
        producer.Flush(TimeSpan.FromSeconds(1));
    }
}
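Because an unreachable broker does not make the producer throw at build time, the failure usually only becomes visible once a message exceeds message.timeout.ms. The following is a minimal sketch of one way to observe that, assuming a console program, a placeholder broker address of localhost:9092, and the "Teste" topic used above: it awaits ProduceAsync and catches ProduceException. The class name is hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical console program, not part of the original project.
public static class ProduceWithErrorHandling
{
    public static async Task Main()
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "localhost:9092", // placeholder address (assumption)
            MessageTimeoutMs = 10000             // fail undelivered messages after 10 s
        };

        using (var producer = new ProducerBuilder<Null, string>(config)
            // Client-level errors (such as broker connection failures) arrive here.
            .SetErrorHandler((_, e) => Console.WriteLine("Client error: " + e.Reason))
            .Build())
        {
            try
            {
                // Awaiting the task surfaces delivery failures as exceptions.
                var dr = await producer.ProduceAsync("Teste",
                    new Message<Null, string> { Value = "hello" });
                Console.WriteLine("Delivered to " + dr.TopicPartitionOffset);
            }
            catch (ProduceException<Null, string> e)
            {
                // Raised when the message could not be delivered, for example
                // because message.timeout.ms expired while no broker was reachable.
                Console.WriteLine("Delivery failed: " + e.Error.Reason);
            }
        }
    }
}
```

With the broker stopped, this sketch should print one or more client errors from the error handler and then, once the timeout elapses, a delivery failure from the catch block. The exact error text depends on the client version.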
The next step is to implement the consumer.
private void KafkaConsumer(string[] topics, ConsumerConfig consumerConfig)
{
    Write(this.HttpContext, "--Kafka Consumer...");

    using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig)
        // Called for client-level errors, for example when no broker can be reached.
        .SetErrorHandler((_, e) =>
            Write(this.HttpContext, "--Kafka Consumer Error: " + e.Reason))
        .SetStatisticsHandler((_, json) => Write(this.HttpContext, "Statistics: " + json))
        .Build())
    {
        consumer.Subscribe(topics[0]);
        try
        {
            while (true)
            {
                try
                {
                    // Returns null if no message arrives within 10 seconds.
                    var cr = consumer.Consume(10000);
                    if (cr == null)
                    {
                        Write(this.HttpContext, "--Kafka failure: consumer returned null");
                        continue;
                    }
                    Write(this.HttpContext, "--Kafka Consumed message: " +
                        cr.Message.Value +
                        " TopicPartitionOffset " + cr.TopicPartitionOffset + " ");
                }
                catch (ConsumeException e)
                {
                    Write(this.HttpContext, "--Kafka Error ConsumeException:" +
                        e.Error.Reason);
                }
            }
        }
        catch (OperationCanceledException ex)
        {
            Write(this.HttpContext,
                "--Kafka OperationCanceledException:" + ex.Message);
            // Ensure the consumer leaves the group cleanly and
            // final offsets are committed.
            consumer.Close();
        }
    }
}
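The loop above has no exit condition, so the OperationCanceledException handler only matters once cancellation is actually wired in. The following is a minimal sketch of one way to do that, assuming a console program with placeholder broker and group names: it passes a CancellationToken to Consume so that Ctrl+C ends the loop and the consumer is closed cleanly. The class name is hypothetical.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

// Hypothetical console program, not part of the original project.
public static class ConsumeWithCancellation
{
    public static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092", // placeholder address (assumption)
            GroupId = "test-group",              // placeholder group (assumption)
            EnableAutoCommit = true
        };

        using (var cts = new CancellationTokenSource())
        using (var consumer = new ConsumerBuilder<Ignore, string>(config)
            .SetErrorHandler((_, e) => Console.WriteLine("Client error: " + e.Reason))
            .Build())
        {
            // Ctrl+C requests cancellation instead of killing the process.
            Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

            consumer.Subscribe("Teste");
            try
            {
                while (true)
                {
                    try
                    {
                        // Blocks until a message arrives or the token is cancelled.
                        var cr = consumer.Consume(cts.Token);
                        Console.WriteLine("Consumed: " + cr.Message.Value +
                            " at " + cr.TopicPartitionOffset);
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine("Consume error: " + e.Error.Reason);
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Leave the group cleanly and commit final offsets.
                consumer.Close();
            }
        }
    }
}
```

In a web request, as in this tip, the token could instead come from HttpContext.RequestAborted so that the loop stops when the client disconnects. That substitution is a suggestion, not something the original code does.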
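The helper below writes each progress message to the HTTP response, so that the Kafka processing can be followed in the browser as it happens.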
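void Write(HttpContext context, string text)
{
    // Block until the line is written and flushed so that progress shows up
    // immediately. The asynchronous calls are waited on explicitly; a synchronous
    // Body.Flush() is avoided because ASP.NET Core disallows synchronous I/O
    // by default since version 3.0.
    context.Response.WriteAsync(string.Format("<p>[{0}] {1}</p><br/>",
        DateTime.Now, text)).GetAwaiter().GetResult();
    context.Response.Body.FlushAsync().GetAwaiter().GetResult();
}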
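Points of Interest
By default, the Kafka client's error handling does not surface the errors that occur while messages are being processed, so they have to be captured explicitly through the error handler and the delivery result.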
History
- April 24, 2020: Initial version