
A simple example of reading from Kafka 0.8

2015-07-12 17:51
Kafka 0.8 changed a great deal compared with Kafka 0.7: both the way it is used and the interface classes are different. This post is a quick introduction to reading data from Kafka 0.8.
In our system, Flume writes data into Kafka. To verify that the data arrives correctly, I wrote a small program that reads it back out of Kafka. The project exists purely for data validation; a real production setup would be configured in a more involved way.
The project is managed with Maven.
The Maven dependencies are as follows:
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>

<groupId>org.iker.example</groupId>
<artifactId>kafka.consumer</artifactId>
<version>0.0.1</version>
<packaging>jar</packaging>

<name>kafka.consumer</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.4</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.15</version>
<exclusions>
<exclusion>
<groupId>com.sun.jmx</groupId>
<artifactId>jmxri</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jdmk</groupId>
<artifactId>jmxtools</artifactId>
</exclusion>
<exclusion>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.6.4</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1</version>
<exclusions>
<exclusion>
<artifactId>jmxri</artifactId>
<groupId>com.sun.jmx</groupId>
</exclusion>
<exclusion>
<artifactId>jms</artifactId>
<groupId>javax.jms</groupId>
</exclusion>
<exclusion>
<artifactId>jmxtools</artifactId>
<groupId>com.sun.jdmk</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.3</version>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-annotation</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<version>3.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest</artifactId>
<version>1.2</version>
<scope>test</scope>
</dependency>

</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>config</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>fully.qualified.MainClass</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin -->
</plugins>
</build>
</project>
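Several of these dependencies (scala-library, zkclient, metrics-core) are normally pulled in transitively by kafka_2.10 itself; declaring them explicitly mainly pins their versions. If you want to see what Maven actually resolves, you can print the dependency tree with the standard Maven command:
 mvn dependency:tree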

There are two Java classes: one provides the user-facing entry point, and one does the actual reading from Kafka. The program reads from Kafka via the command line; you can use it as a starting point for a program that fits your own needs.
ConsumerApp handles the interaction with the user:
package org.iker.example.kafka.consumer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

//import org.iker.example.kafka.producer.ProducerApp;

/**
* Simple Kafka message consumer
*/
public class ConsumerApp {

    private final ConsumerConfig config;
    private final ConsumerConnector consumer;
    private ExecutorService executor;
    private static final String KAFKA_TOPIC = "dx_sem_server_logs";

    public ConsumerApp(String zooKeeper, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zooKeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");

        this.config = new ConsumerConfig(props);
        this.consumer = Consumer.createJavaConsumerConnector(this.config);
    }

    public void shutdown() {
        if (this.consumer != null) {
            this.consumer.shutdown();
        }
        if (this.executor != null) {
            this.executor.shutdown();
            try {
                if (!this.executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                    System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly.");
                }
            } catch (InterruptedException e) {
                System.out.println("Interrupted during shutdown, exiting uncleanly.");
            }
        }
    }

    public void run(int numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(ConsumerApp.KAFKA_TOPIC, new Integer(numThreads));

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = this.consumer
                .createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap
                .get(ConsumerApp.KAFKA_TOPIC);

        // launch one worker thread per stream
        this.executor = Executors.newFixedThreadPool(numThreads);

        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            this.executor.submit(new ConsumerThread(stream, threadNumber));
            threadNumber++;
        }
    }

    public static void main(String[] args) {
        String zooKeeper = args[0];
        String groupId = args[1];
        // int threads = Integer.parseInt(args[2]);
        int threads = 1;

        ConsumerApp app = new ConsumerApp(zooKeeper, groupId);
        app.run(threads);
        try {
            while (true) {
                Thread.sleep(10000L);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        app.shutdown();
    }
}
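As written, main() spins in a sleep loop, so shutdown() is only reached if the loop is broken by an exception. A minimal variation (my own addition, not part of the original program) would register a JVM shutdown hook so the connector is closed cleanly when the process is stopped with Ctrl-C:

// Hypothetical alternative main(): clean shutdown via a JVM shutdown hook.
public static void main(String[] args) {
    final ConsumerApp app = new ConsumerApp(args[0], args[1]);
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            app.shutdown(); // close the Kafka connector and stop the worker threads
        }
    });
    app.run(1); // the non-daemon consumer threads keep the JVM alive after main() returns
}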


ConsumerThread does the actual reading from Kafka:
 package org.iker.example.kafka.consumer;

import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class ConsumerThread implements Runnable {

    private final int threadSerial;
    private final KafkaStream<byte[], byte[]> stream;

    public ConsumerThread(KafkaStream<byte[], byte[]> stream, int threadSerial) {
        this.threadSerial = threadSerial;
        this.stream = stream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> iter = this.stream.iterator();
        String fileName = threadSerial + ".data";
        System.out.println("Start Thread: " + this.threadSerial);
        int cnt = 0;
        try {
            DataOutputStream out = new DataOutputStream(new FileOutputStream(fileName));
            try {
                // write every message as a 4-byte length prefix followed by the raw payload
                while (iter.hasNext()) {
                    byte[] bytes = iter.next().message();
                    out.writeInt(bytes.length);
                    out.write(bytes);
                    out.flush();
                    cnt++;
                }
            } finally {
                out.close();
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("Total write: " + cnt);
        System.out.println("Shutting down Thread: " + this.threadSerial);
    }
}
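Each thread writes its messages to <threadNumber>.data as a 4-byte length prefix followed by the raw message bytes. A minimal sketch of a reader for that format (my own addition for verification purposes, assuming the payloads are UTF-8 text) could look like this:

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileInputStream;
import java.io.IOException;

public class DataFileReader {
    public static void main(String[] args) throws IOException {
        // "0.data" is just an example file name (the file written by thread 0)
        DataInputStream in = new DataInputStream(new FileInputStream("0.data"));
        int cnt = 0;
        try {
            while (true) {
                int len = in.readInt();   // 4-byte length prefix written by writeInt()
                byte[] bytes = new byte[len];
                in.readFully(bytes);      // the raw Kafka message payload
                System.out.println(new String(bytes, "UTF-8"));
                cnt++;
            }
        } catch (EOFException e) {
            // reached the end of the file
        } finally {
            in.close();
        }
        System.out.println("Total read: " + cnt);
    }
}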

Building is simple:
 mvn clean package
This produces a jar in the target directory.
Running it is just as simple:
 java -cp $your_jar_file org.iker.example.kafka.consumer.ConsumerApp <zookeeper> <groupid>
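For example, assuming the assembly jar ends up as target/kafka.consumer-0.0.1-jar-with-dependencies.jar and ZooKeeper is listening on localhost:2181 (both values are illustrative, adjust them to your environment):
 java -cp target/kafka.consumer-0.0.1-jar-with-dependencies.jar org.iker.example.kafka.consumer.ConsumerApp localhost:2181 my-test-group
The consumed messages are written to 0.data (one file per thread) in the current directory.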
Tags: kafka0.8 kafka java