您的位置:首页 > 运维架构 > Linux

win系统下启动linux上的kafka集群及使用

2015-07-15 13:36 441 查看
一、首先在 Windows 系统的 C:\Windows\System32\drivers\etc 目录下,向 hosts 文件中添加如下内容(将集群各节点的主机名映射到对应的 IP 地址):

10.61.6.167 slaves1

10.61.6.168 slaves2

10.61.6.169 slaves3

二、启动 Kafka 集群的类(通过 SSH 在各节点上远程执行启动脚本)

package com.conn.server.start;

import java.io.IOException;

import ch.ethz.ssh2.Connection;

import ch.ethz.ssh2.Session;

/**
 * Starts the ZooKeeper instances bundled with Kafka, and then the Kafka
 * brokers, on three remote hosts by running the start scripts over SSH.
 *
 * <p>Every {@code startXxx} method opens its own SSH connection, authenticates
 * with a password, issues a single shell command and closes the connection
 * again. Failures are reported on the console and never propagate to the
 * caller.
 *
 * <p>NOTE(review): the root credentials are hard-coded below, as in the
 * original sample. Move them to configuration or key-based authentication
 * before using this anywhere but a throw-away test cluster.
 */
public class StartKafka {

    private static final String USERNAME = "root";
    private static final String PASSWORD = "pass@word2";

    // Node addresses. The first node was 10.61.6.166 in startZk1() but
    // 10.61.6.167 everywhere else (startKafka1 and the slaves1 hosts entry the
    // clients use for zookeeper.connect), so ZooKeeper #1 now targets .167 too.
    private static final String HOST_1 = "10.61.6.167";
    private static final String HOST_2 = "10.61.6.168";
    private static final String HOST_3 = "10.61.6.169";

    // A Session accepts only one execCommand call, so several shell commands
    // are chained with ';' inside a single command string.
    private static final String START_ZK_COMMAND =
            "cd /home/lee/kafka/kafka;bin/zookeeper-server-start.sh config/zookeeper.properties&";
    private static final String START_KAFKA_COMMAND =
            "cd /home/lee/kafka/kafka;bin/kafka-server-start.sh config/server.properties&";

    /**
     * Starts the three ZooKeeper instances first and then the three brokers.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Start the ZooKeeper instances that ship with Kafka.
        startZk1();
        startZk2();
        startZk3();

        // Start the Kafka brokers.
        startKafka1();
        startKafka2();
        startKafka3();
    }

    /** Starts ZooKeeper on the first node. */
    public static void startZk1() {
        runRemoteCommand(HOST_1, START_ZK_COMMAND);
    }

    /** Starts ZooKeeper on the second node. */
    public static void startZk2() {
        runRemoteCommand(HOST_2, START_ZK_COMMAND);
    }

    /** Starts ZooKeeper on the third node. */
    public static void startZk3() {
        runRemoteCommand(HOST_3, START_ZK_COMMAND);
    }

    /** Starts the Kafka broker on the first node. */
    public static void startKafka1() {
        runRemoteCommand(HOST_1, START_KAFKA_COMMAND);
    }

    /** Starts the Kafka broker on the second node. */
    public static void startKafka2() {
        runRemoteCommand(HOST_2, START_KAFKA_COMMAND);
    }

    /** Starts the Kafka broker on the third node. */
    public static void startKafka3() {
        runRemoteCommand(HOST_3, START_KAFKA_COMMAND);
    }

    /**
     * Connects to {@code hostname} over SSH, authenticates with the shared
     * credentials and issues {@code command} in a new session.
     *
     * <p>An authentication failure is reported on standard output; an
     * {@link IOException} is printed as a stack trace. In every case the
     * session (if one was opened) and the connection are closed before
     * returning.
     *
     * @param hostname address of the remote host
     * @param command  shell command line to execute remotely
     */
    private static void runRemoteCommand(String hostname, String command) {
        Connection conn = new Connection(hostname);
        Session ssh = null;
        try {
            conn.connect();
            if (!conn.authenticateWithPassword(USERNAME, PASSWORD)) {
                System.out.println("用户名称或者是密码不正确");
                return;
            }
            System.out.println("已经连接OK");
            ssh = conn.openSession();
            ssh.execCommand(command);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // The session is null when connecting or authenticating failed;
            // the original code dereferenced it unconditionally and threw a
            // NullPointerException on a wrong password.
            if (ssh != null) {
                ssh.close();
            }
            conn.close();
        }
    }
}

三、生产者类

package com.performanceTest;

import java.io.BufferedReader;

import java.io.File;

import java.io.FileNotFoundException;

import java.io.FileReader;

import java.io.IOException;

import java.util.Properties;

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

/**
 * Reads a text file line by line and publishes every line to the
 * {@code mykafka} topic, with tab characters replaced by commas.
 */
public class ProducerSample {

    /**
     * Sends each line of {@code E:/test/110-140_1.txt} to Kafka.
     *
     * @param args unused
     * @throws FileNotFoundException if the input file cannot be opened
     */
    public static void main(String[] args) throws FileNotFoundException {
        Properties props = new Properties();
        // Host names come from the hosts file; the nodes' IP addresses work as well.
        props.put("zookeeper.connect", "slaves1:2182,slaves2:2182,slaves3:2182");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "slaves1:9092,slaves2:9092,slaves3:9092");
        props.put("request.required.acks", "1");
        props.put("batch.num.messages", "200");

        // Open the input first so that a missing file does not leave an
        // already-created producer behind.
        File file = new File("E:/test", "110-140_1.txt");
        BufferedReader readtxt = new BufferedReader(new FileReader(file));

        Producer<String, String> producer = null;
        try {
            producer = new Producer<String, String>(new ProducerConfig(props));

            String line;
            while ((line = readtxt.readLine()) != null) {
                // Literal replacement; the original additionally round-tripped the
                // text through getBytes()/new String(), which adds nothing and can
                // corrupt characters the platform charset cannot encode.
                String message = line.replace('\t', ',');
                producer.send(new KeyedMessage<String, String>("mykafka", message));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Release the producer's connections and the file handle; neither
            // was closed in the original.
            if (producer != null) {
                producer.close();
            }
            try {
                readtxt.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

四、消费者类

package com.performanceTest;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerTest extends Thread {

private final ConsumerConnector consumer;

private final String topic;

private final List<String> messages = new ArrayList<String>();



/**
 * Entry point: creates a consumer thread for the "mykafka" topic and starts it.
 *
 * @param args unused
 */
public static void main(String[] args) {
    new ConsumerTest("mykafka").start();
}



public ConsumerTest(String topic) {

System.out.println(topic);

consumer = kafka.consumer.Consumer

.createJavaConsumerConnector(createConsumerConfig());

this.topic = topic;

}



/**
 * Builds the consumer configuration: ZooKeeper connect string, consumer
 * group id, and the ZooKeeper/auto-commit timing settings.
 *
 * @return a new {@code ConsumerConfig} holding the settings listed below
 */
private static ConsumerConfig createConsumerConfig() {
    // Key/value pairs, applied in the order listed.
    final String[][] settings = {
        {"zookeeper.connect", "slaves1:2182,slaves2:2182,slaves3:2182"},
        {"group.id", "0"},
        {"zookeeper.session.timeout.ms", "400000"},
        {"zookeeper.sync.time.ms", "200"},
        {"auto.commit.interval.ms", "1000"},
    };

    final Properties consumerProps = new Properties();
    for (String[] entry : settings) {
        consumerProps.put(entry[0], entry[1]);
    }
    return new ConsumerConfig(consumerProps);
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: