您的位置:首页 > 运维架构

kafka管理Topic

2016-03-23 10:20 375 查看
windows下:
//创建主题Create a topic
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

//查询所有主题
kafka-topics.bat --list  --zookeeper 10.180.192.16:2181
test

//删除主题
kafka-topics.bat --delete --zookeeper localhost:2181 --topic test


java代码

package util;

import kafka.admin.TopicCommand;

public class TopicUtil {

/****
* 查询所有主题
*
* @param zookeeperAddr
*/
public void queryTopics(String zookeeperAddr) {
String[] options = new String[] {
"--list",
"--zookeeper","10.180.192.16:2181"
};
TopicCommand.main(options);
}

/****
* 创建主题
*
* @param zookeeperAddr
* @param topicName
*/
public void createTopic(String zookeeperAddr, String topicName) {
String[] options = new String[] {
"--create",
"--zookeeper","localhost:2181",
"--partitions", "1",
"--topic", topicName,
"--replication-factor", "1"
};
TopicCommand.main(options);
}

/****
* 删除主题
*
* @param zookeeperAddr
* @param topicName
*            --delete --zookeeper host:port --topic topicname
*/
public void delTopic(String zookeeperAddr, String topicName) {
String[] options = new String[] {
"--delete",
"--zookeeper","localhost:2181",
"--topic", topicName };
TopicCommand.main(options);
}

/****
* 查询指定主题
*
* @param zookeeperAddr
* @param topicName
*/
public void detailTopic(String zookeeperAddr, String topicName) {
String[] options = new String[] {
"--describe",
"--zookeeper","localhost:2181",
"--topic", topicName, };
TopicCommand.main(options);
}

public static void main(String[] args) {
TopicUtil topicutil = new TopicUtil();
// topicutil.queryTopics("");
// topicutil.createTopic("", "testwh2");
// System.err.println("=============创建了新的主题==================");
// topicutil.queryTopics("");
// topicutil.delTopic("","testwh2");
topicutil.queryTopics("");
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: