您的位置:首页 > 运维架构

kafka清空topics

2015-12-16 08:20 567 查看
1、缘由(需求):因为测试时不小心,写了一大堆topics,看监测时一顿揪心。

2、解决方案:kafka是被zookeeper监管的,topics是在zookeeper上的(描述可能不是很准确),我发现kakfa原生api好像没有批量删除topics的方式,我登陆到zookeeper后,在zookeeper的目录下发现了topics。于是写了个递归程序将其删除。以下是代码:

public class App {
static String path = "/brokers";
static ZKConnection zkc = null;

public static void main(String[] args) throws Exception {
zkc = new ZKConnection();
zkc.connect();
get(zkc.zk.getChildren(path, true), path);

}

public static void get(List<String> paths, String path) throws Exception {
for (String p : paths) {
p = path + "/" + p;
if (null == zkc.zk.exists(p, true)) {
System.out.println(p);
//zkc.zk.delete(p, 0);
} else {
get(zkc.zk.getChildren(p, true), p);
int i =zkc.zk.exists(p,true).getVersion();
zkc.zk.delete(p,i);
}

System.out.println(p);
}
}
}

class ZKConnection {
/**
* server列表, 以逗号分割
*/
protected String hosts = "server1:2181,server2:2181,server3:2181";
/**
* 连接的超时时间, 毫秒
*/
private static final int SESSION_TIMEOUT = 5000;
private CountDownLatch connectedSignal = new CountDownLatch(1);
public ZooKeeper zk;

/**
* 连接zookeeper server
*/
public void connect() throws Exception {
zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new ConnWatcher());
// 等待连接完成
connectedSignal.await();
}

public class ConnWatcher implements Watcher {
public void process(WatchedEvent event) {
// 连接建立, 回调process接口时, 其event.getState()为KeeperState.SyncConnected
if (event.getState() == KeeperState.SyncConnected) {
// 放开闸门, wait在connect方法上的线程将被唤醒
connectedSignal.countDown();
}
}
}


注:连接代码是网上得的

3.重启kafka,完毕。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  kafka topics 清空