Kafka.net使用编程入门
2016-06-12 21:22
295 查看
最近研究分布式消息队列,分享下!
首先zookeeper 和 kafka 压缩包 解压 并配置好!
我本机zookeeper环境配置如下:
D:\Worksoftware\ApacheZookeeper3\conf\zoo.cfg
以下是kafka的配置
D:\Worksoftware\Apachekafka2.11\config\server.properties
我已经加了path环境变量,没加的话需要到zookeeper对应bin目录下执行zkServer
然后执行cmd命令:
结果:
然后打开第二个dos窗口,我没加环境变量path,执行kafka命令如下:
重头戏来了,开始kafka C#客户端处理:
首先引用kafka-net.dll,可以用vs2013的nuget下载,
以下是Prorame.cs
结果:
闲的蛋疼,随便研究一些好东西,.net环境太封闭,每个.net程序员都要扩展视野,技术交流,本人QQ827937686
首先zookeeper 和 kafka 压缩包 解压 并配置好!
我本机zookeeper环境配置如下:
D:\Worksoftware\ApacheZookeeper3\conf\zoo.cfg
以下是kafka的配置
D:\Worksoftware\Apachekafka2.11\config\server.properties
我已经加了path环境变量,没加的话需要到zookeeper对应bin目录下执行zkServer
然后执行cmd命令:
结果:
然后打开第二个dos窗口,我没加环境变量path,执行kafka命令如下:
重头戏来了,开始kafka C#客户端处理:
首先引用kafka-net.dll,可以用vs2013的nuget下载,
以下是Prorame.cs
class Program { static void Main(string[] args) { const string topicName = "test"; var options = new KafkaOptions(new Uri("http://localhost:9092")) { Log = new ConsoleLog() }; Task.Run(() => { var consumer = new Consumer(new ConsumerOptions(topicName, new BrokerRouter(options)) { Log = new ConsoleLog() }); foreach (var data in consumer.Consume()) { Console.WriteLine("Response: PartitionId={0},Offset={1} :Value={2}", data.Meta.PartitionId, data.Meta.Offset, data.Value.ToUtf8String()); } }); //创建一个生产者发消息 var producer = new Producer(new BrokerRouter(options)) { BatchSize = 100, BatchDelayTime = TimeSpan.FromMilliseconds(2000) }; Console.WriteLine("打出一条消息按 enter..."); while (true) { var message = Console.ReadLine(); if (message == "quit") break; if (string.IsNullOrEmpty(message)) { // SendRandomBatch(producer, topicName, 200); } else { producer.SendMessageAsync(topicName, new[] { new Message(message) }); } } //释放资源 using (producer) { } } private static async void SendRandomBatch(Producer producer, string topicName, int count) { //发送多个消息 var sendTask = producer.SendMessageAsync(topicName, Enumerable.Range(0, count).Select(x => new Message(x.ToString()))); Console.WriteLine("传送了 #{0} messages. Buffered:{1} AsyncCount:{2}", count, producer.BufferCount, producer.AsyncCount); var response = await sendTask; Console.WriteLine("已完成批量发送: {0}. Buffered:{1} AsyncCount:{2}", count, producer.BufferCount, producer.AsyncCount); foreach (var result in response.OrderBy(x => x.PartitionId)) { Console.WriteLine("主题:{0} PartitionId:{1} Offset:{2}", result.Topic, result.PartitionId, result.Offset); } } }
结果:
闲的蛋疼,随便研究一些好东西,.net环境太封闭,每个.net程序员都要扩展视野,技术交流,本人QQ827937686
相关文章推荐
- leetcode-java-198. House Robber
- JAVA基础之装饰者模式
- 生产者与消费者问题,java实现
- Python 模块使用
- spring注解
- 如何让进程运行在指定的cpu上
- Java 队列2:循环队列
- C++高级排序算法详解
- Django setting 常用配置
- 在qt下获取屏幕分辨率
- C语言URL解析器(代码分享)
- 在struts2中默认执行的execute方法
- c++实现Face++ API的调用
- lab 相关--使用vb.net实现对控制文件的管理操作
- PHP学习建议及编码规范
- Filter编程:(1)Filter基础
- composer及laravel安装及环境配置
- Java命令——javap
- 命令行音乐播放器代码
- java排序之快速排序