kafka清空topics
2015-12-16 08:20
567 查看
1、缘由(需求):因为测试时不小心,写了一大堆topics,看监测时一顿揪心。
2、解决方案:kafka是被zookeeper监管的,topics是在zookeeper上的(描述可能不是很准确),我发现kakfa原生api好像没有批量删除topics的方式,我登陆到zookeeper后,在zookeeper的目录下发现了topics。于是写了个递归程序将其删除。以下是代码:
注:连接代码是网上得的
3.重启kafka,完毕。
2、解决方案:kafka是被zookeeper监管的,topics是在zookeeper上的(描述可能不是很准确),我发现kakfa原生api好像没有批量删除topics的方式,我登陆到zookeeper后,在zookeeper的目录下发现了topics。于是写了个递归程序将其删除。以下是代码:
public class App { static String path = "/brokers"; static ZKConnection zkc = null; public static void main(String[] args) throws Exception { zkc = new ZKConnection(); zkc.connect(); get(zkc.zk.getChildren(path, true), path); } public static void get(List<String> paths, String path) throws Exception { for (String p : paths) { p = path + "/" + p; if (null == zkc.zk.exists(p, true)) { System.out.println(p); //zkc.zk.delete(p, 0); } else { get(zkc.zk.getChildren(p, true), p); int i =zkc.zk.exists(p,true).getVersion(); zkc.zk.delete(p,i); } System.out.println(p); } } } class ZKConnection { /** * server列表, 以逗号分割 */ protected String hosts = "server1:2181,server2:2181,server3:2181"; /** * 连接的超时时间, 毫秒 */ private static final int SESSION_TIMEOUT = 5000; private CountDownLatch connectedSignal = new CountDownLatch(1); public ZooKeeper zk; /** * 连接zookeeper server */ public void connect() throws Exception { zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new ConnWatcher()); // 等待连接完成 connectedSignal.await(); } public class ConnWatcher implements Watcher { public void process(WatchedEvent event) { // 连接建立, 回调process接口时, 其event.getState()为KeeperState.SyncConnected if (event.getState() == KeeperState.SyncConnected) { // 放开闸门, wait在connect方法上的线程将被唤醒 connectedSignal.countDown(); } } }
注:连接代码是网上得的
3.重启kafka,完毕。
相关文章推荐
- Kafka 之 中级
- PowerShell中编程清空IE缓存方法
- mysql表的清空、删除和修改操作详解
- asp.net清空Cookie的两种方法
- C#实现清空回收站的方法
- Shell脚本定期清空大于1G的日志文件
- MySQL数据表字段内容的批量修改、清空、复制等更新命令
- asp清空application的方法
- Linux下Kafka单机安装配置方法(图文)
- JavaScript清空数组元素的两种方法简单比较
- Kafka使用入门教程第1/2页
- Logstash 与Elasticsearch整合使用示例
- Kafka+Log4j实现日志集中管理
- Kafka深度解析
- Kafka设计解析(三)- Kafka High Availability (下)
- kafka+storm初探
- storm集群 + kafka单机性能测试
- flume、kafka、storm常用命令
- kafka 一些基本知识