curator 操作zookeeper 获取kafka 信息 util
2014-10-28 14:27
387 查看
采用maven 管理的 增加依赖项
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.3</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.3.0</version>
</dependency>
</dependencies>
util代码
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
public class kafkaShell {
//设置消费者根目录
private static String _zkconsumers="/consumers";
//存储消费者组 按应用设置变量
private static List<String> grouplist=null;
//设置curator 客户端参数 具体参数 操作参考 http://macrochen.iteye.com/blog/1366136
static RetryPolicy retryPolicy=new RetryNTimes(3, 1000);
static CuratorFramework client=CuratorFrameworkFactory.newClient("192.168.50.129:2181,192.168.50.169:2181", 10000, 10000, retryPolicy);
/**
* 执行Shell
* @param command
* @return
* @throws InterruptedException
*/
public static String ec(String command) throws InterruptedException {
String returnString = "";
Process pro = null;
Runtime runTime = Runtime.getRuntime();
if (runTime == null) {
System.err.println("Create runtime false!");
}
try {
pro = runTime.exec(command);
BufferedReader input = new BufferedReader(new InputStreamReader(
pro.getInputStream()));
PrintWriter output = new PrintWriter(new OutputStreamWriter(
pro.getOutputStream()));
String line;
while ((line = input.readLine()) != null) {
// System.out.println(line);
returnString = returnString + line + "\n";
}
input.close();
output.close();
pro.destroy();
} catch (IOException ex) {
Logger.getLogger(kafkaShell.class.getName()).log(Level.SEVERE, null, ex);
}
return returnString;
}
public static void main(String[] args) {
}
/**
* 打印当前consumer下 topics 信息
*/
public void PrintMessage(){
try {
client.start();
//获取当前所有消费者组
grouplist=client.getChildren().forPath(_zkconsumers);
if(grouplist!=null&&grouplist.size()>0){
for (String groupstr : grouplist) {
//打印消费者组
System.err.println("group---------->"+groupstr);
//获取消费者组下的topic 的 offsets集合
List<String> listtopics=client.getChildren().forPath(_zkconsumers+"/"+groupstr+"/offsets");
if(listtopics!=null&&listtopics.size()>0){
//遍历集合 获取分区offset值
for (String topicstr : listtopics) {
System.err.println("topicpath:"+_zkconsumers+"/"+groupstr+"/offsets/"+topicstr);
List<String> listpartition=client.getChildren().forPath(_zkconsumers+"/"+groupstr+"/offsets/"+topicstr);
for (String partition : listpartition) {
String partitionpath=_zkconsumers+"/"+groupstr+"/offsets/"+topicstr+"/"+partition;
System.err.println("partitionpath:"+partitionpath);
byte[] newoffset= client.getData().forPath(partitionpath);
System.err.println(partitionpath+":offset:--------------------------->"+new String(newoffset,"UTF-8"));
}
}
}
}
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 修改制定消费者组下 topic下分区offset
* @param partitionpath
* @param offsetvalue
*/
public void UpdateOffsetValue(String partitionpath,String offsetvalue){
try {
client.start();
byte[] newoffset= client.getData().forPath(partitionpath);
System.err.println(partitionpath+":oldoffset:--------------------------->"+new String(newoffset,"UTF-8"));
client.setData().forPath(partitionpath, offsetvalue.getBytes());
newoffset= client.getData().forPath(partitionpath);
System.err.println(partitionpath+":newoffset:--------------------------->"+new String(newoffset,"UTF-8"));
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
client.close();
}
}
/**
* 打印消费者组下面 消费者id
* /consumers/[groupId]/ids/[consumerIdString]
* 每个consumer都有一个唯一的ID(consumerId可以通过配置文件指定,也可以由系统生成),此id用来标记消费者信息.
* @param path
*/
public static void PrintIds(CuratorFramework client,String path){
try {
List<String> consumerids=client.getChildren().forPath(path);
for (String consumerid : consumerids) {
System.err.println(path+"=consumerid---------------------------------->"+consumerid);
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.3</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.3.0</version>
</dependency>
</dependencies>
util代码
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
public class kafkaShell {
//设置消费者根目录
private static String _zkconsumers="/consumers";
//存储消费者组 按应用设置变量
private static List<String> grouplist=null;
//设置curator 客户端参数 具体参数 操作参考 http://macrochen.iteye.com/blog/1366136
static RetryPolicy retryPolicy=new RetryNTimes(3, 1000);
static CuratorFramework client=CuratorFrameworkFactory.newClient("192.168.50.129:2181,192.168.50.169:2181", 10000, 10000, retryPolicy);
/**
* 执行Shell
* @param command
* @return
* @throws InterruptedException
*/
public static String ec(String command) throws InterruptedException {
String returnString = "";
Process pro = null;
Runtime runTime = Runtime.getRuntime();
if (runTime == null) {
System.err.println("Create runtime false!");
}
try {
pro = runTime.exec(command);
BufferedReader input = new BufferedReader(new InputStreamReader(
pro.getInputStream()));
PrintWriter output = new PrintWriter(new OutputStreamWriter(
pro.getOutputStream()));
String line;
while ((line = input.readLine()) != null) {
// System.out.println(line);
returnString = returnString + line + "\n";
}
input.close();
output.close();
pro.destroy();
} catch (IOException ex) {
Logger.getLogger(kafkaShell.class.getName()).log(Level.SEVERE, null, ex);
}
return returnString;
}
public static void main(String[] args) {
}
/**
* 打印当前consumer下 topics 信息
*/
public void PrintMessage(){
try {
client.start();
//获取当前所有消费者组
grouplist=client.getChildren().forPath(_zkconsumers);
if(grouplist!=null&&grouplist.size()>0){
for (String groupstr : grouplist) {
//打印消费者组
System.err.println("group---------->"+groupstr);
//获取消费者组下的topic 的 offsets集合
List<String> listtopics=client.getChildren().forPath(_zkconsumers+"/"+groupstr+"/offsets");
if(listtopics!=null&&listtopics.size()>0){
//遍历集合 获取分区offset值
for (String topicstr : listtopics) {
System.err.println("topicpath:"+_zkconsumers+"/"+groupstr+"/offsets/"+topicstr);
List<String> listpartition=client.getChildren().forPath(_zkconsumers+"/"+groupstr+"/offsets/"+topicstr);
for (String partition : listpartition) {
String partitionpath=_zkconsumers+"/"+groupstr+"/offsets/"+topicstr+"/"+partition;
System.err.println("partitionpath:"+partitionpath);
byte[] newoffset= client.getData().forPath(partitionpath);
System.err.println(partitionpath+":offset:--------------------------->"+new String(newoffset,"UTF-8"));
}
}
}
}
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 修改制定消费者组下 topic下分区offset
* @param partitionpath
* @param offsetvalue
*/
public void UpdateOffsetValue(String partitionpath,String offsetvalue){
try {
client.start();
byte[] newoffset= client.getData().forPath(partitionpath);
System.err.println(partitionpath+":oldoffset:--------------------------->"+new String(newoffset,"UTF-8"));
client.setData().forPath(partitionpath, offsetvalue.getBytes());
newoffset= client.getData().forPath(partitionpath);
System.err.println(partitionpath+":newoffset:--------------------------->"+new String(newoffset,"UTF-8"));
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
client.close();
}
}
/**
* 打印消费者组下面 消费者id
* /consumers/[groupId]/ids/[consumerIdString]
* 每个consumer都有一个唯一的ID(consumerId可以通过配置文件指定,也可以由系统生成),此id用来标记消费者信息.
* @param path
*/
public static void PrintIds(CuratorFramework client,String path){
try {
List<String> consumerids=client.getChildren().forPath(path);
for (String consumerid : consumerids) {
System.err.println(path+"=consumerid---------------------------------->"+consumerid);
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
相关文章推荐
- Java curator操作zookeeper获取kafka
- 获取当前操作职员等信息
- 操作 Wave 文件(5): 获取 Wave 文件的格式信息
- Linux环境编程简明教程(1)获取进程相关信息的操作
- DELPHI 获取版本号、格式化版本信息、比较版本号等相关操作
- windows的磁盘操作之三——获取和删除磁盘分区信息
- JS去空格、IFrame提示信息做后台操作、Cookie存值获取累加,调整过期时间。
- Environment类——获取当前操作环境和平台的信息
- Android软件开发之获取通讯录联系人信息 + android联系人信息的存储结构 + Android联系人读取操作笔记
- linux__获取文件信息___操作
- Python利用系统命令获取文件(夹)信息以及Python对Excel的简单操作
- 网页获取"活动目录"里信息时的报错:服务器不可操作
- 使用Adsutil.vbs脚本获取IIS配置信息及账号密码
- DELPHI 获取版本号、格式化版本信息、比较版本号等相关操作
- php获取文件类型和文件信息操作
- 操作 Wave 文件(4): 获取 Wave 文件主块与子块的信息
- QTP连接QC时,通过QCUtil对象获取QC中的Test信息
- 使用Adsutil.vbs脚本获取IIS配置信息及账号密码
- windows的磁盘操作之三——获取和删除磁盘分区信息
- DBF操作:多线程下的ADO连接DBF文件获取信息的解决