win系统下启动linux上的kafka集群及使用
2015-07-15 13:36
441 查看
一、首先在 Windows 系统的 C:\Windows\System32\drivers\etc\hosts 文件中添加如下内容（将集群各节点的主机名映射到对应的 IP 地址）:
10.61.6.167 slaves1
10.61.6.168 slaves2
10.61.6.169 slaves3
二、启动kafka集群类
package com.conn.server.start;
import java.io.IOException;
import ch.ethz.ssh2.Connection;
import ch.ethz.ssh2.Session;
/**
 * Starts the ZooKeeper instances bundled with Kafka, and then the Kafka
 * brokers, on remote Linux hosts by running the start scripts over SSH.
 *
 * <p>NOTE(review): the SSH user name and password are hard-coded below; they
 * should be moved to external configuration before this is used anywhere
 * other than a throw-away test environment.
 */
public class StartKafka {

    /** SSH login used for every host. */
    private static final String USERNAME = "root";
    private static final String PASSWORD = "pass@word2";

    /**
     * Shell lines executed remotely. Several commands are chained on one line
     * with ';' because each SSH session runs exactly one command line.
     */
    private static final String START_ZK_COMMAND =
            "cd /home/lee/kafka/kafka;bin/zookeeper-server-start.sh config/zookeeper.properties&";
    private static final String START_KAFKA_COMMAND =
            "cd /home/lee/kafka/kafka;bin/kafka-server-start.sh config/server.properties&";

    /**
     * Launches the three ZooKeeper nodes first and the three Kafka brokers
     * afterwards.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Start the ZooKeeper nodes that ship with Kafka.
        startZk1();
        startZk2();
        startZk3();
        // Start the Kafka brokers.
        startKafka1();
        startKafka2();
        startKafka3();
    }

    /** Starts ZooKeeper on the first node. */
    public static void startZk1() {
        // NOTE(review): this address ends in .166 while startKafka1() targets
        // 10.61.6.167 - possibly a typo; confirm against the real cluster.
        runRemoteCommand("10.61.6.166", START_ZK_COMMAND);
    }

    /** Starts ZooKeeper on the second node. */
    public static void startZk2() {
        runRemoteCommand("10.61.6.168", START_ZK_COMMAND);
    }

    /** Starts ZooKeeper on the third node. */
    public static void startZk3() {
        runRemoteCommand("10.61.6.169", START_ZK_COMMAND);
    }

    /** Starts the Kafka broker on the first node. */
    public static void startKafka1() {
        runRemoteCommand("10.61.6.167", START_KAFKA_COMMAND);
    }

    /** Starts the Kafka broker on the second node. */
    public static void startKafka2() {
        runRemoteCommand("10.61.6.168", START_KAFKA_COMMAND);
    }

    /** Starts the Kafka broker on the third node. */
    public static void startKafka3() {
        runRemoteCommand("10.61.6.169", START_KAFKA_COMMAND);
    }

    /**
     * Connects to {@code hostname} over SSH, authenticates with the shared
     * credentials and executes {@code command} in a new session.
     *
     * <p>I/O failures are printed and swallowed so that the remaining hosts
     * are still attempted. The session and the connection are always
     * released, including when authentication fails or an exception is
     * thrown.
     *
     * @param hostname IP address or host name of the remote machine
     * @param command  single shell command line to execute remotely
     */
    private static void runRemoteCommand(String hostname, String command) {
        Connection conn = new Connection(hostname);
        Session ssh = null;
        try {
            conn.connect();
            if (!conn.authenticateWithPassword(USERNAME, PASSWORD)) {
                System.out.println("用户名称或者是密码不正确");
                return;
            }
            System.out.println("已经连接OK");
            ssh = conn.openSession();
            // According to the original author's note, execCommand may be
            // called only once per Session; a second call raises an exception.
            ssh.execCommand(command);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // The session stays null when authentication fails or an earlier
            // step throws, so guard the close to avoid a NullPointerException.
            if (ssh != null) {
                ssh.close();
            }
            conn.close();
        }
    }
}
三、生产者类
package com.performanceTest;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
 * Reads a local text file line by line and publishes every line to the
 * "mykafka" topic, with tab characters replaced by commas.
 */
public class ProducerSample {

    /**
     * Publishes the contents of {@code E:/test/110-140_1.txt} to Kafka.
     *
     * @param args unused
     * @throws FileNotFoundException if the input file cannot be opened
     */
    public static void main(String[] args) throws FileNotFoundException {
        Properties props = new Properties();
        // Host names may be replaced by the IP addresses of the cluster nodes.
        props.put("zookeeper.connect", "slaves1:2182,slaves2:2182,slaves3:2182");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "slaves1:9092,slaves2:9092,slaves3:9092");
        props.put("request.required.acks", "1");
        props.put("batch.num.messages", "200");

        // Open the input first: if the file is missing we fail before a
        // producer (and its broker connections) has been created.
        File file = new File("E:/test", "110-140_1.txt");
        BufferedReader readtxt = new BufferedReader(new FileReader(file));
        Producer<String, String> producer = null;
        try {
            producer = new Producer<String, String>(new ProducerConfig(props));
            String line;
            while ((line = readtxt.readLine()) != null) {
                // The line is sent as-is (after the tab replacement); routing
                // it through getBytes()/new String() with the platform charset
                // could only corrupt characters that charset cannot encode.
                String str = line.replace('\t', ',');
                producer.send(new KeyedMessage<String, String>("mykafka", str));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Release the broker connections and the file handle in all cases.
            if (producer != null) {
                producer.close();
            }
            try {
                readtxt.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
四、消费者类
package com.performanceTest;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class ConsumerTest extends Thread {
private final ConsumerConnector consumer;
private final String topic;
private final List<String> messages = new ArrayList<String>();
/**
 * Entry point: builds a consumer for the "mykafka" topic and starts it on
 * its own thread.
 *
 * @param args unused
 */
public static void main(String[] args) {
    new ConsumerTest("mykafka").start();
}
public ConsumerTest(String topic) {
System.out.println(topic);
consumer = kafka.consumer.Consumer
.createJavaConsumerConnector(createConsumerConfig());
this.topic = topic;
}
/**
 * Builds the consumer configuration: the ZooKeeper connect string, the
 * consumer group id, and the ZooKeeper session/sync and auto-commit timings.
 *
 * @return a new {@code ConsumerConfig} wrapping these settings
 */
private static ConsumerConfig createConsumerConfig() {
    Properties settings = new Properties();
    settings.setProperty("zookeeper.connect", "slaves1:2182,slaves2:2182,slaves3:2182");
    settings.setProperty("group.id", "0");
    settings.setProperty("zookeeper.session.timeout.ms", "400000");
    settings.setProperty("zookeeper.sync.time.ms", "200");
    settings.setProperty("auto.commit.interval.ms", "1000");
    ConsumerConfig config = new ConsumerConfig(settings);
    return config;
}
10.61.6.167 slaves1
10.61.6.168 slaves2
10.61.6.169 slaves3
二、启动kafka集群类
package com.conn.server.start;
import java.io.IOException;
import ch.ethz.ssh2.Connection;
import ch.ethz.ssh2.Session;
/**
 * Starts the ZooKeeper instances bundled with Kafka, and then the Kafka
 * brokers, on remote Linux hosts by running the start scripts over SSH.
 *
 * <p>NOTE(review): the SSH user name and password are hard-coded below; they
 * should be moved to external configuration before this is used anywhere
 * other than a throw-away test environment.
 */
public class StartKafka {

    /** SSH login used for every host. */
    private static final String USERNAME = "root";
    private static final String PASSWORD = "pass@word2";

    /**
     * Shell lines executed remotely. Several commands are chained on one line
     * with ';' because each SSH session runs exactly one command line.
     */
    private static final String START_ZK_COMMAND =
            "cd /home/lee/kafka/kafka;bin/zookeeper-server-start.sh config/zookeeper.properties&";
    private static final String START_KAFKA_COMMAND =
            "cd /home/lee/kafka/kafka;bin/kafka-server-start.sh config/server.properties&";

    /**
     * Launches the three ZooKeeper nodes first and the three Kafka brokers
     * afterwards.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Start the ZooKeeper nodes that ship with Kafka.
        startZk1();
        startZk2();
        startZk3();
        // Start the Kafka brokers.
        startKafka1();
        startKafka2();
        startKafka3();
    }

    /** Starts ZooKeeper on the first node. */
    public static void startZk1() {
        // NOTE(review): this address ends in .166 while startKafka1() targets
        // 10.61.6.167 - possibly a typo; confirm against the real cluster.
        runRemoteCommand("10.61.6.166", START_ZK_COMMAND);
    }

    /** Starts ZooKeeper on the second node. */
    public static void startZk2() {
        runRemoteCommand("10.61.6.168", START_ZK_COMMAND);
    }

    /** Starts ZooKeeper on the third node. */
    public static void startZk3() {
        runRemoteCommand("10.61.6.169", START_ZK_COMMAND);
    }

    /** Starts the Kafka broker on the first node. */
    public static void startKafka1() {
        runRemoteCommand("10.61.6.167", START_KAFKA_COMMAND);
    }

    /** Starts the Kafka broker on the second node. */
    public static void startKafka2() {
        runRemoteCommand("10.61.6.168", START_KAFKA_COMMAND);
    }

    /** Starts the Kafka broker on the third node. */
    public static void startKafka3() {
        runRemoteCommand("10.61.6.169", START_KAFKA_COMMAND);
    }

    /**
     * Connects to {@code hostname} over SSH, authenticates with the shared
     * credentials and executes {@code command} in a new session.
     *
     * <p>I/O failures are printed and swallowed so that the remaining hosts
     * are still attempted. The session and the connection are always
     * released, including when authentication fails or an exception is
     * thrown.
     *
     * @param hostname IP address or host name of the remote machine
     * @param command  single shell command line to execute remotely
     */
    private static void runRemoteCommand(String hostname, String command) {
        Connection conn = new Connection(hostname);
        Session ssh = null;
        try {
            conn.connect();
            if (!conn.authenticateWithPassword(USERNAME, PASSWORD)) {
                System.out.println("用户名称或者是密码不正确");
                return;
            }
            System.out.println("已经连接OK");
            ssh = conn.openSession();
            // According to the original author's note, execCommand may be
            // called only once per Session; a second call raises an exception.
            ssh.execCommand(command);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // The session stays null when authentication fails or an earlier
            // step throws, so guard the close to avoid a NullPointerException.
            if (ssh != null) {
                ssh.close();
            }
            conn.close();
        }
    }
}
三、生产者类
package com.performanceTest;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
 * Reads a local text file line by line and publishes every line to the
 * "mykafka" topic, with tab characters replaced by commas.
 */
public class ProducerSample {

    /**
     * Publishes the contents of {@code E:/test/110-140_1.txt} to Kafka.
     *
     * @param args unused
     * @throws FileNotFoundException if the input file cannot be opened
     */
    public static void main(String[] args) throws FileNotFoundException {
        Properties props = new Properties();
        // Host names may be replaced by the IP addresses of the cluster nodes.
        props.put("zookeeper.connect", "slaves1:2182,slaves2:2182,slaves3:2182");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "slaves1:9092,slaves2:9092,slaves3:9092");
        props.put("request.required.acks", "1");
        props.put("batch.num.messages", "200");

        // Open the input first: if the file is missing we fail before a
        // producer (and its broker connections) has been created.
        File file = new File("E:/test", "110-140_1.txt");
        BufferedReader readtxt = new BufferedReader(new FileReader(file));
        Producer<String, String> producer = null;
        try {
            producer = new Producer<String, String>(new ProducerConfig(props));
            String line;
            while ((line = readtxt.readLine()) != null) {
                // The line is sent as-is (after the tab replacement); routing
                // it through getBytes()/new String() with the platform charset
                // could only corrupt characters that charset cannot encode.
                String str = line.replace('\t', ',');
                producer.send(new KeyedMessage<String, String>("mykafka", str));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Release the broker connections and the file handle in all cases.
            if (producer != null) {
                producer.close();
            }
            try {
                readtxt.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
四、消费者类
package com.performanceTest;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class ConsumerTest extends Thread {
private final ConsumerConnector consumer;
private final String topic;
private final List<String> messages = new ArrayList<String>();
/**
 * Entry point: builds a consumer for the "mykafka" topic and starts it on
 * its own thread.
 *
 * @param args unused
 */
public static void main(String[] args) {
    new ConsumerTest("mykafka").start();
}
public ConsumerTest(String topic) {
System.out.println(topic);
consumer = kafka.consumer.Consumer
.createJavaConsumerConnector(createConsumerConfig());
this.topic = topic;
}
/**
 * Builds the consumer configuration: the ZooKeeper connect string, the
 * consumer group id, and the ZooKeeper session/sync and auto-commit timings.
 *
 * @return a new {@code ConsumerConfig} wrapping these settings
 */
private static ConsumerConfig createConsumerConfig() {
    Properties settings = new Properties();
    settings.setProperty("zookeeper.connect", "slaves1:2182,slaves2:2182,slaves3:2182");
    settings.setProperty("group.id", "0");
    settings.setProperty("zookeeper.session.timeout.ms", "400000");
    settings.setProperty("zookeeper.sync.time.ms", "200");
    settings.setProperty("auto.commit.interval.ms", "1000");
    ConsumerConfig config = new ConsumerConfig(settings);
    return config;
}
相关文章推荐
- linux配置jdk失败
- CentOS yum升级内核
- CentOS 下yum快速升级内核
- Linux系统性能之内存(memory)
- vsftpd创建匿名Linux FTP服务器
- UNIX/Linux下C语言的学习路线
- Linux 文件系统
- Linux 文件系统
- Linux 文件系统
- Linux 文件系统
- redhat linux iptables (NAT)
- [转][linux(ubuntu14.04)+GPU+cuda6.5+caffe+openCV2.4.9+matlab2013b+python2.7的新手配置转]
- Linux软连接必须写绝对路径
- SecureCRT连接linux设置vim显示颜色
- linux下目录的作用
- 详解Linux命令行下常用svn命令
- 虚拟机Linux----Ubuntu1404----root登录设置
- Linux-支持中文
- linux下重新定位SVN URL方法
- Linux性能之CPU性能