今天就跟大家聊聊有关.NET Core如何使用Kafka,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
安装
CentOS安装 kafka
下载并解压
# 下载,并解压
$ wget https://archive.apache.org/dist/kafka/2.1.1/kafka_2.12-2.1.1.tgz
$ tar -zxvf kafka_2.12-2.1.1.tgz
$ mv kafka_2.12-2.1.1 /data/kafka
# 下载 zookeeper,解压
$ wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz
$ tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz
$ mv apache-zookeeper-3.5.8-bin /data/zookeeper
启动 ZooKeeper
# 复制配置模版
$ cd /data/zookeeper/conf
$ cp zoo_sample.cfg zoo.cfg
# 看看配置需不需要改
$ vim zoo.cfg
# 命令(需在 ZooKeeper 根目录下执行)
$ cd /data/zookeeper
$ ./bin/zkServer.sh start # 启动
$ ./bin/zkServer.sh status # 状态
$ ./bin/zkServer.sh stop # 停止
$ ./bin/zkServer.sh restart # 重启
# 使用客户端测试
$ ./bin/zkCli.sh -server localhost:2181
$ quit
启动 Kafka
# 备份配置
$ cd /data/kafka
$ cp config/server.properties config/server.properties_copy
# 修改配置
$ vim /data/kafka/config/server.properties
# 集群配置下,每个 broker 的 id 是必须不同的
# broker.id=0
# 监听地址设置(内网)
# listeners=PLAINTEXT://ip:9092
# 对外提供服务的IP、端口
# advertised.listeners=PLAINTEXT://106.75.84.97:9092
# 修改每个 topic 的默认分区参数 num.partitions,默认是 1,具体合适的取值需要根据服务器配置进行确定,UCloud.ukafka = 3
# num.partitions=3
# zookeeper 配置
# zookeeper.connect=localhost:2181
# 通过配置启动 kafka
$ ./bin/kafka-server-start.sh config/server.properties&
# 状态查看
$ ps -ef|grep kafka
$ jps
docker下安装Kafka
docker pull wurstmeister/zookeeper
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper
docker pull wurstmeister/kafka
docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.1.111 --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka
介绍
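kafka partition 和 consumer 数目关系
同一个消费者组内,一个 partition 同一时刻只会被组内的一个 consumer 消费:consumer 数量多于 partition 数量时,多出的 consumer 会处于空闲状态;consumer 数量少于 partition 数量时,部分 consumer 会同时消费多个 partition。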
在 .NET Core 项目中安装组件
Install-Package Confluent.Kafka
开源地址: https://github.com/confluentinc/confluent-kafka-dotnet
添加 IKafkaService
服务接口
/// <summary>
/// Contract for publishing messages to, and subscribing to messages from, Kafka topics.
/// </summary>
public interface IKafkaService
{
/// <summary>
/// Sends a message to the specified topic.
/// </summary>
/// <typeparam name="TMessage">Type of the message payload; must be a reference type.</typeparam>
/// <param name="topicName">Name of the topic the message is sent to.</param>
/// <param name="message">The message to send.</param>
/// <returns>A task representing the send operation.</returns>
Task PublishAsync<TMessage>(string topicName, TMessage message) where TMessage : class;
/// <summary>
/// Subscribes to messages from the specified topics.
/// </summary>
/// <typeparam name="TMessage">Type the received messages are delivered as; must be a reference type.</typeparam>
/// <param name="topics">Names of the topics to subscribe to.</param>
/// <param name="messageFunc">Callback that receives each message.</param>
/// <param name="cancellationToken">Token used to stop the subscription.</param>
/// <returns>A task representing the subscription.</returns>
Task SubscribeAsync<TMessage>(IEnumerable<string> topics, Action<TMessage> messageFunc, CancellationToken cancellationToken) where TMessage : class;
}
实现 IKafkaService
/// <summary>
/// <see cref="IKafkaService"/> implementation built on the Confluent.Kafka client.
/// </summary>
/// <remarks>
/// The broker address and the consumer group id are fixed constants of this class.
/// </remarks>
public class KafkaService : IKafkaService
{
    private const string DefaultBootstrapServers = "127.0.0.1:9092";
    private const string DefaultGroupId = "crow-consumer";

    /// <summary>
    /// Serializes <paramref name="message"/> to JSON and sends it to <paramref name="topicName"/>
    /// with a freshly generated GUID as the message key.
    /// </summary>
    /// <typeparam name="TMessage">Type of the message payload.</typeparam>
    /// <param name="topicName">Name of the topic to publish to.</param>
    /// <param name="message">The payload to serialize and send.</param>
    /// <returns>A task that completes once <c>ProduceAsync</c> has completed.</returns>
    /// <exception cref="ArgumentException"><paramref name="topicName"/> is null, empty or whitespace.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public async Task PublishAsync<TMessage>(string topicName, TMessage message) where TMessage : class
    {
        if (string.IsNullOrWhiteSpace(topicName))
        {
            throw new ArgumentException("Topic name must not be null or empty.", nameof(topicName));
        }

        if (message is null)
        {
            throw new ArgumentNullException(nameof(message));
        }

        var config = new ProducerConfig
        {
            BootstrapServers = DefaultBootstrapServers
        };

        // NOTE(review): a producer is built and disposed on every call. If publish volume is
        // high, consider sharing one long-lived producer instead.
        using var producer = new ProducerBuilder<string, string>(config).Build();
        await producer.ProduceAsync(topicName, new Message<string, string>
        {
            Key = Guid.NewGuid().ToString(),
            Value = message.SerializeToJson()
        });
    }

    /// <summary>
    /// Subscribes to <paramref name="topics"/> and passes every successfully deserialized
    /// message to <paramref name="messageFunc"/>, committing its offset afterwards.
    /// </summary>
    /// <remarks>
    /// The blocking consume loop runs on a dedicated long-running task, so the calling thread
    /// is not blocked; <paramref name="messageFunc"/> is therefore invoked on that background
    /// thread. The returned task completes after <paramref name="cancellationToken"/> is
    /// cancelled, and faults if <paramref name="messageFunc"/> or the client throws an
    /// unhandled exception.
    /// </remarks>
    /// <typeparam name="TMessage">Type the JSON message values are deserialized into.</typeparam>
    /// <param name="topics">Names of the topics to subscribe to.</param>
    /// <param name="messageFunc">Callback invoked once per deserialized message.</param>
    /// <param name="cancellationToken">Token that stops the consume loop.</param>
    /// <returns>A task representing the lifetime of the subscription.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="topics"/> or <paramref name="messageFunc"/> is null.
    /// </exception>
    public async Task SubscribeAsync<TMessage>(IEnumerable<string> topics, Action<TMessage> messageFunc, CancellationToken cancellationToken) where TMessage : class
    {
        if (topics is null)
        {
            throw new ArgumentNullException(nameof(topics));
        }

        if (messageFunc is null)
        {
            throw new ArgumentNullException(nameof(messageFunc));
        }

        // CancellationToken.None is passed to StartNew on purpose: cancellation is observed
        // inside the loop (via Consume), so an already-cancelled token still results in a
        // normal, non-throwing completion.
        await Task.Factory.StartNew(
            () => ConsumeLoop(topics, messageFunc, cancellationToken),
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    /// <summary>
    /// Runs the blocking consume/deserialize/dispatch/commit loop until cancellation.
    /// </summary>
    private static void ConsumeLoop<TMessage>(IEnumerable<string> topics, Action<TMessage> messageFunc, CancellationToken cancellationToken) where TMessage : class
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = DefaultBootstrapServers,
            GroupId = DefaultGroupId,
            EnableAutoCommit = false,
            StatisticsIntervalMs = 5000,
            SessionTimeoutMs = 6000,
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnablePartitionEof = true
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(config)
            .SetErrorHandler((_, e) =>
            {
                Console.WriteLine($"Error: {e.Reason}");
            })
            .SetStatisticsHandler((_, json) =>
            {
                Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中..");
            })
            .SetPartitionsAssignedHandler((c, partitions) =>
            {
                string partitionsStr = string.Join(", ", partitions);
                Console.WriteLine($" - 分配的 kafka 分区: {partitionsStr}");
            })
            .SetPartitionsRevokedHandler((c, partitions) =>
            {
                string partitionsStr = string.Join(", ", partitions);
                Console.WriteLine($" - 回收了 kafka 的分区: {partitionsStr}");
            })
            .Build();

        consumer.Subscribe(topics);
        try
        {
            while (true)
            {
                try
                {
                    var consumeResult = consumer.Consume(cancellationToken);

                    // Partition-EOF events carry no message, so handle them before
                    // touching consumeResult.Message.
                    if (consumeResult.IsPartitionEOF)
                    {
                        Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
                        continue;
                    }

                    Console.WriteLine($"Consumed message '{consumeResult.Message.Value}' at: '{consumeResult.TopicPartitionOffset}'.");

                    var messageResult = DeserializeOrNull<TMessage>(consumeResult.Message.Value);
                    if (messageResult is null)
                    {
                        // Undeserializable (or JSON-null) payloads are skipped without committing.
                        continue;
                    }

                    messageFunc(messageResult);
                    try
                    {
                        consumer.Commit(consumeResult);
                    }
                    catch (KafkaException e)
                    {
                        Console.WriteLine(e.Message);
                    }
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Consume error: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Closing consumer.");
        }
        finally
        {
            // Close on every exit path (cancellation or a failing handler) so the consumer
            // leaves the group instead of only being disposed.
            consumer.Close();
        }
    }

    /// <summary>
    /// Deserializes a JSON message value; logs and returns null when deserialization throws.
    /// </summary>
    private static TMessage DeserializeOrNull<TMessage>(string value) where TMessage : class
    {
        try
        {
            return JsonConvert.DeserializeObject<TMessage>(value);
        }
        catch (Exception ex)
        {
            // Log the full exception (type, message and stack trace), not just the stack trace.
            Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失败,Value:{value}】 :{ex}");
            return null;
        }
    }
}
注入 IKafkaService,在需要使用的地方直接调用即可。
/// <summary>
/// Message service that forwards event data to Kafka through <see cref="IKafkaService"/>.
/// </summary>
public class MessageService : IMessageService, ITransientDependency
{
    private readonly IKafkaService _kafka;

    /// <summary>
    /// Creates the service with the Kafka abstraction it publishes through.
    /// </summary>
    /// <param name="kafkaService">Kafka service used for publishing.</param>
    public MessageService(IKafkaService kafkaService) => _kafka = kafkaService;

    /// <summary>
    /// Publishes <paramref name="eventData"/> to the topic named by its <c>TopicName</c>.
    /// </summary>
    /// <param name="eventData">Event to publish; also supplies the target topic name.</param>
    /// <returns>A task that completes when the publish call completes.</returns>
    public async Task RequestTraceAdded(XxxEventData eventData) =>
        await _kafka.PublishAsync(eventData.TopicName, eventData);
}
以上相当于一个生产者。消息发送到队列之后,还需要一个消费者来消费,因此可以使用一个控制台项目接收消息并处理业务。
// Translate Ctrl-C into cancellation of the subscription rather than process termination.
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true; // keep the process alive; the token drives the shutdown
    cts.Cancel();
};

// The callback parameter is a synchronous Action<T>. An `async` lambda here would compile to
// `async void`, so it is deliberately a plain lambda.
await kafkaService.SubscribeAsync<XxxEventData>(topics, eventData =>
{
    // Your logic
    Console.WriteLine($" - {eventData.EventTime:yyyy-MM-dd HH:mm:ss} 【{eventData.TopicName}】- > 已处理");
}, cts.Token);
在 IKafkaService 中已经写了订阅消息的接口,这里也是注入后直接使用即可。
生产者消费者示例
生产者
/// <summary>
/// Interactive console producer: reads lines from standard input and sends each one to a
/// Kafka topic until Ctrl-C is pressed, input ends, or a blank line is entered.
/// </summary>
/// <param name="args">Exactly two values: the broker list and the topic name.</param>
static async Task Main(string[] args)
{
    if (args.Length != 2)
    {
        Console.WriteLine("Usage: .. brokerList topicName");
        // 127.0.0.1:9092 helloTopic
        return;
    }

    var brokerList = args[0];
    var topicName = args[1];
    var config = new ProducerConfig { BootstrapServers = brokerList };

    using var producer = new ProducerBuilder<string, string>(config).Build();

    Console.WriteLine("\n-----------------------------------------------------------------------");
    Console.WriteLine($"Producer {producer.Name} producing on topic {topicName}.");
    Console.WriteLine("-----------------------------------------------------------------------");
    Console.WriteLine("To create a kafka message with UTF-8 encoded key and value:");
    Console.WriteLine("> key value<Enter>");
    Console.WriteLine("To create a kafka message with a null key and UTF-8 encoded value:");
    Console.WriteLine("> value<enter>");
    Console.WriteLine("Ctrl-C to quit.\n");

    var cancelled = false;
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true; // do not kill the process; let the loop exit instead
        cancelled = true;
    };

    while (!cancelled)
    {
        Console.Write("> ");

        string text;
        try
        {
            text = Console.ReadLine();
        }
        catch (IOException)
        {
            break;
        }

        // Null (end of input) or a blank line ends the session.
        if (string.IsNullOrWhiteSpace(text))
        {
            break;
        }

        // Without a "key value" separator the message is sent with a null key, as the
        // usage text above promises (previously an empty-string key was sent).
        string key = null;
        var val = text;
        var index = text.IndexOf(' ');
        if (index != -1)
        {
            key = text.Substring(0, index);
            val = text.Substring(index + 1);
        }

        try
        {
            var deliveryResult = await producer.ProduceAsync(topicName, new Message<string, string>
            {
                Key = key,
                Value = val
            });
            Console.WriteLine($"delivered to: {deliveryResult.TopicPartitionOffset}");
        }
        catch (ProduceException<string, string> e)
        {
            Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]");
        }
    }
}
消费者
/// <summary>
/// Console consumer: subscribes to a topic and prints every received message until Ctrl-C,
/// committing an offset manually whenever it is a multiple of the commit period.
/// </summary>
/// <param name="args">Exactly two values: the broker list and the topic name.</param>
static void Main(string[] args)
{
    if (args.Length != 2)
    {
        Console.WriteLine("Usage: .. brokerList topicName");
        // 127.0.0.1:9092 helloTopic
        return;
    }

    var brokerList = args[0];
    var topicName = args[1];

    Console.WriteLine("Started consumer, Ctrl-C to stop consuming");

    var cts = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true; // prevent the process from terminating; the token stops the loop
        cts.Cancel();
    };

    var config = new ConsumerConfig
    {
        BootstrapServers = brokerList,
        GroupId = "consumer",
        EnableAutoCommit = false,
        StatisticsIntervalMs = 5000,
        SessionTimeoutMs = 6000,
        AutoOffsetReset = AutoOffsetReset.Earliest,
        EnablePartitionEof = true
    };

    // Offsets are committed only when they are a multiple of this value.
    const int commitPeriod = 5;

    using var consumer = new ConsumerBuilder<Ignore, string>(config)
        .SetErrorHandler((_, e) =>
        {
            Console.WriteLine($"Error: {e.Reason}");
        })
        .SetStatisticsHandler((_, json) =>
        {
            Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > monitoring..");
            //Console.WriteLine($"Statistics: {json}");
        })
        .SetPartitionsAssignedHandler((c, partitions) =>
        {
            Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");
        })
        .SetPartitionsRevokedHandler((c, partitions) =>
        {
            Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
        })
        .Build();

    consumer.Subscribe(topicName);
    try
    {
        while (true)
        {
            try
            {
                var consumeResult = consumer.Consume(cts.Token);

                if (consumeResult.IsPartitionEOF)
                {
                    Console.WriteLine($"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
                    continue;
                }

                Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");

                if (consumeResult.Offset % commitPeriod == 0)
                {
                    try
                    {
                        consumer.Commit(consumeResult);
                    }
                    catch (KafkaException e)
                    {
                        Console.WriteLine($"Commit error: {e.Error.Reason}");
                    }
                }
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Consume error: {e.Error.Reason}");
            }
        }
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Closing consumer.");
    }
    finally
    {
        // Close on every exit path, not only on cancellation, so the consumer leaves the
        // group before it is disposed.
        consumer.Close();
    }
}
看完上述内容,你们对.NET Core如何使用Kafka有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注亿速云行业资讯频道,感谢大家的支持。
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。