使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)--转载
2015-06-02 09:52
911 查看
原文:http://blog.csdn.net/changong28/article/details/39325079
使用Kafka的同学都知道,我们每次创建Kafka主题(Topic)的时候可以指定分区数和副本数等信息,如果将这些属性配置到server.properties文件中,以后调用Java API生成的主题将使用默认值,先改变需要使用命令bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config max.message.bytes=128000显示的修改,我们也希望将此过程在Producer调用之前通过API的方式进行设定,无需在之前或之后使用脚本进行操作,所以才了这篇文章。查看源码发现,其实内部所有的实现都是通过TopicCommand的main方法,在此记录两种方式:
1、创建主题(Topic)
【命令方式】:bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y
【JAVA API方式】:
String[] options = new String[]{
"--create",
"--zookeeper",
"zk_host:port/chroot",
"--partitions",
"20",
"--topic",
"my_topic_name",
"--replication-factor",
"3",
"--config",
"x=y"
};
TopicCommand.main(options);
2、查看所有主题
【命令方式】:bin/kafka-topics.sh --list --zookeeper localhost:2181
【JAVA API方式】:
String[] options = new String[]{
"--list",
"--zookeeper",
"localhost:2181"
};
TopicCommand.main(options);
3、查看指定主题:
【命令方式】:bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
【JAVA API方式】:
String[] options = new String[]{
"--describe",
"--zookeeper",
"localhost:2181",
"--topic",
"my-replicated-topic",
};
TopicCommand.main(options);
4、修改主题:
【命令方式】:bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --deleteConfig x
【JAVA API方式】:
String[] options = new String[]{
"--alter",
"--zookeeper",
"zk_host:port/chroot",
"--topic",
"my_topic_name",
"--deleteConfig",
"x"
};
TopicCommand.main(options);
5、删除出题:
【命令方式】:无
【JAVA API方式】:
String[] options = new String[]{
"--zookeeper",
"zk_host:port/chroot",
"--topic",
"my_topic_name"
};
DeleteTopicCommand.main(options);
另:下文kafka删除topic的方法(出自 “菜光光的博客” 博客,出处/article/4505232.html)
0.8的官方文档提供了一个删除topic的命令:
kafka-topics.sh --delete 但是在运行时会报错找不到这个方法。
kafka-topics.sh最终是运行了kafka.admin.TopicCommand这个类,在0.8的源码中这个类中没有找到有delete topic相关的代码。
在kafka的admin包下,提供了一个DeleteTopicCommand的类,可以实现删除topic的功能。
kafka.admin.DeleteTopicCommand
其中删除topic的具体实现代码如下:
因为这个命令只会删除zk里面的信息,真实的数据还是没有删除,所以需要登录各个broker,把对应的topic的分区数据目录删除,也可能正因为这一点,delete命令才没有集成到kafka.admin.TopicCommand这个类。
使用Kafka的同学都知道,我们每次创建Kafka主题(Topic)的时候可以指定分区数和副本数等信息,如果将这些属性配置到server.properties文件中,以后调用Java API生成的主题将使用默认值,先改变需要使用命令bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config max.message.bytes=128000显示的修改,我们也希望将此过程在Producer调用之前通过API的方式进行设定,无需在之前或之后使用脚本进行操作,所以才了这篇文章。查看源码发现,其实内部所有的实现都是通过TopicCommand的main方法,在此记录两种方式:
1、创建主题(Topic)
【命令方式】:bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y
【JAVA API方式】:
String[] options = new String[]{
"--create",
"--zookeeper",
"zk_host:port/chroot",
"--partitions",
"20",
"--topic",
"my_topic_name",
"--replication-factor",
"3",
"--config",
"x=y"
};
TopicCommand.main(options);
2、查看所有主题
【命令方式】:bin/kafka-topics.sh --list --zookeeper localhost:2181
【JAVA API方式】:
String[] options = new String[]{
"--list",
"--zookeeper",
"localhost:2181"
};
TopicCommand.main(options);
3、查看指定主题:
【命令方式】:bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
【JAVA API方式】:
String[] options = new String[]{
"--describe",
"--zookeeper",
"localhost:2181",
"--topic",
"my-replicated-topic",
};
TopicCommand.main(options);
4、修改主题:
【命令方式】:bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --deleteConfig x
【JAVA API方式】:
String[] options = new String[]{
"--alter",
"--zookeeper",
"zk_host:port/chroot",
"--topic",
"my_topic_name",
"--deleteConfig",
"x"
};
TopicCommand.main(options);
5、删除出题:
【命令方式】:无
【JAVA API方式】:
String[] options = new String[]{
"--zookeeper",
"zk_host:port/chroot",
"--topic",
"my_topic_name"
};
DeleteTopicCommand.main(options);
另:下文kafka删除topic的方法(出自 “菜光光的博客” 博客,出处/article/4505232.html)
0.8的官方文档提供了一个删除topic的命令:
kafka-topics.sh --delete 但是在运行时会报错找不到这个方法。
kafka-topics.sh最终是运行了kafka.admin.TopicCommand这个类,在0.8的源码中这个类中没有找到有delete topic相关的代码。
在kafka的admin包下,提供了一个DeleteTopicCommand的类,可以实现删除topic的功能。
kafka.admin.DeleteTopicCommand
其中删除topic的具体实现代码如下:
相关文章推荐
- eclipse项目文件上的红色感叹号
- Myeclipse环境Spring版本升级后Web Server启动失败
- Java普通工程转换成标准的maven工程
- JDK8 - The type java.util.Map$Entry cannot be resolved
- spring定时器时batch批量操作出现找不到session的解决办法
- 学习:二维码、QR码、J4L-QRCode、java
- Spring.Net 配置文件
- Spring Roo 实站( 一 )部署安装 & 第一个示例程序
- java实现在线用户统计
- myeclipse自定义java注释
- Eclipse-----快速打开当前工程目录 浏览文件插件 EasyExplorer 和 OpenExplorer
- Spring Roo 实站( 一 )部署安装 & 第一个示例程序
- 疯狂Java学习笔记(67)-----------Timer和TimerTask
- 【struts2】五大核心jar包 和 作用
- 集算器用作Java结构化文件计算类库
- 【Android】eclipse下gradle
- Myeclipse 快捷键大全(绝对全)
- springMVC 文件下载
- Simulated Test of SCJP for JAVA2 PlatFORM (only for training)【修改版】
- Java中Runnable和Thread的区别