
A util that uses Curator to read Kafka information from ZooKeeper

2014-10-28 14:27
For a project managed with Maven, add the following dependencies:

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.9.2</version>
    </dependency>
    <dependency>
        <groupId>com.yammer.metrics</groupId>
        <artifactId>metrics-core</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.3.0</version>
    </dependency>
</dependencies>
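
Before wiring up the util it can help to confirm that the Curator dependency resolves and the ZooKeeper ensemble is reachable. This is a minimal smoke-test sketch, assuming the same connect string and timeouts as the util below; it only checks whether the /consumers znode exists.

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;

public class CuratorSmokeTest {
    public static void main(String[] args) throws Exception {
        // Connect string and timeouts mirror the ones used in kafkaShell below (assumption).
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "192.168.50.129:2181,192.168.50.169:2181", 10000, 10000, new RetryNTimes(3, 1000));
        client.start();
        try {
            // checkExists() returns null when the znode is absent.
            boolean hasConsumers = client.checkExists().forPath("/consumers") != null;
            System.out.println("/consumers exists: " + hasConsumers);
        } finally {
            client.close();
        }
    }
}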

Util code:



import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.RetryNTimes;

public class kafkaShell {

    // Root znode under which Kafka high-level consumers store their metadata.
    private static String _zkconsumers = "/consumers";
    // Consumer groups found under the root znode.
    private static List<String> grouplist = null;

    // Curator client settings; see http://macrochen.iteye.com/blog/1366136 for the parameter details.
    static RetryPolicy retryPolicy = new RetryNTimes(3, 1000);
    static CuratorFramework client = CuratorFrameworkFactory.newClient(
            "192.168.50.129:2181,192.168.50.169:2181", 10000, 10000, retryPolicy);

    
    /**
     * Run a shell command and return its standard output.
     * @param command the command line to execute
     * @return the captured standard output
     * @throws InterruptedException if the calling thread is interrupted while waiting for the process
     */
    public static String ec(String command) throws InterruptedException {
        StringBuilder returnString = new StringBuilder();
        Process pro = null;
        Runtime runTime = Runtime.getRuntime();
        try {
            pro = runTime.exec(command);
            BufferedReader input = new BufferedReader(new InputStreamReader(pro.getInputStream()));
            PrintWriter output = new PrintWriter(new OutputStreamWriter(pro.getOutputStream()));
            String line;
            while ((line = input.readLine()) != null) {
                returnString.append(line).append("\n");
            }
            input.close();
            output.close();
            // Wait for the process to finish instead of destroying it while it may still be running.
            pro.waitFor();
        } catch (IOException ex) {
            Logger.getLogger(kafkaShell.class.getName()).log(Level.SEVERE, null, ex);
        }
        return returnString.toString();
    }
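
    // Usage sketch (my assumption, not from the original post): ec() can run a Kafka CLI tool
    // and capture its output, e.g.
    //   String topics = ec("/opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.50.129:2181");
    // The Kafka installation path above is a placeholder.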

    public static void main(String[] args) {

    }

    /**
     * Print the offset information for every topic under the current consumer groups.
     * The znode layout walked here is /consumers/[groupId]/offsets/[topic]/[partition].
     */
    public void PrintMessage() {
        try {
            // The shared static client may only be started once per process.
            if (client.getState() != CuratorFrameworkState.STARTED) {
                client.start();
            }
            // Fetch all consumer groups.
            grouplist = client.getChildren().forPath(_zkconsumers);
            if (grouplist != null && grouplist.size() > 0) {
                for (String groupstr : grouplist) {
                    // Print the consumer group.
                    System.err.println("group---------->" + groupstr);
                    // Fetch the topics that have offsets recorded for this group.
                    List<String> listtopics = client.getChildren().forPath(_zkconsumers + "/" + groupstr + "/offsets");
                    if (listtopics != null && listtopics.size() > 0) {
                        // Walk each topic and read the offset stored for every partition.
                        for (String topicstr : listtopics) {
                            System.err.println("topicpath:" + _zkconsumers + "/" + groupstr + "/offsets/" + topicstr);
                            List<String> listpartition = client.getChildren().forPath(_zkconsumers + "/" + groupstr + "/offsets/" + topicstr);
                            for (String partition : listpartition) {
                                String partitionpath = _zkconsumers + "/" + groupstr + "/offsets/" + topicstr + "/" + partition;
                                System.err.println("partitionpath:" + partitionpath);
                                byte[] newoffset = client.getData().forPath(partitionpath);
                                System.err.println(partitionpath + ":offset:--------------------------->" + new String(newoffset, "UTF-8"));
                            }
                        }
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Update the offset of a partition of a topic under the specified consumer group.
     * @param partitionpath full znode path, e.g. /consumers/[groupId]/offsets/[topic]/[partition]
     * @param offsetvalue   the new offset value to store
     */
    public void UpdateOffsetValue(String partitionpath, String offsetvalue) {
        try {
            // The shared static client may only be started once per process.
            if (client.getState() != CuratorFrameworkState.STARTED) {
                client.start();
            }
            byte[] newoffset = client.getData().forPath(partitionpath);
            System.err.println(partitionpath + ":oldoffset:--------------------------->" + new String(newoffset, "UTF-8"));
            client.setData().forPath(partitionpath, offsetvalue.getBytes());
            newoffset = client.getData().forPath(partitionpath);
            System.err.println(partitionpath + ":newoffset:--------------------------->" + new String(newoffset, "UTF-8"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        // The shared client is intentionally left open so the other methods can keep using it.
    }

    /**
     * Print the consumer ids under a consumer group.
     * Znode layout: /consumers/[groupId]/ids/[consumerIdString]
     * Every consumer has a unique id (the consumerId can be set in the configuration file or
     * generated by the system); this id identifies the consumer.
     * @param client a started CuratorFramework client
     * @param path   the ids path of a group, e.g. /consumers/[groupId]/ids
     */
    public static void PrintIds(CuratorFramework client, String path) {
        try {
            List<String> consumerids = client.getChildren().forPath(path);
            for (String consumerid : consumerids) {
                System.err.println(path + "=consumerid---------------------------------->" + consumerid);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
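
A short usage sketch, assuming the group, topic, and partition names below are placeholders to be replaced with real values from your cluster:

public class KafkaShellDemo {
    public static void main(String[] args) {
        kafkaShell shell = new kafkaShell();
        // Walk /consumers/[group]/offsets/[topic]/[partition] and print every stored offset.
        shell.PrintMessage();
        // Rewind one partition to offset 0 (placeholder path segments).
        // shell.UpdateOffsetValue("/consumers/mygroup/offsets/mytopic/0", "0");
        // Print the consumer ids registered for a group (placeholder group name).
        // kafkaShell.PrintIds(kafkaShell.client, "/consumers/mygroup/ids");
    }
}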
Tags: zookeeper, curator, kafka