
A Simple Kafka 0.8 Consumer Program

2016-05-01 17:41

Table of Contents

1. Establishing a connection
2. Basic functionality
3. Open issues
3.1. The run variable
3.2. Reading from multiple brokers
3.3. Resuming from a saved offset
3.4. Fetching metadata from ZooKeeper

1 Establishing a connection

Establishing a connection to Kafka requires the following basic information:
broker IP addresses and ports
topic name

2 Basic functionality

Reading data requires the following information:
partition id
starting offset
The code below is adapted from the librdkafka example, simplified by removing the command-line argument handling; it demonstrates only the following:
reads data from a single local broker only
there is only one partition, with id 0
the program exits when it receives an external kill signal or reads the last offset
exits when an error occurs while reading data
can receive rdkafka events, although it is not yet clear how useful they are
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>
#include <getopt.h>
#include <list>
#include "librdkafka/rdkafkacpp.h"

using std::string;
using std::list;
using std::cout;
using std::endl;

static bool run = true;
static bool exit_eof = true;

class MyEventCb : public RdKafka::EventCb {
public:
  void event_cb (RdKafka::Event &event) {
    switch (event.type()) {
    case RdKafka::Event::EVENT_ERROR:
      std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
          event.str() << std::endl;
      if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
        run = false;
      break;

    case RdKafka::Event::EVENT_STATS:
      std::cerr << "\"STATS\": " << event.str() << std::endl;
      break;

    case RdKafka::Event::EVENT_LOG:
      fprintf(stderr, "LOG-%i-%s: %s\n",
              event.severity(), event.fac().c_str(), event.str().c_str());
      break;

    default:
      std::cerr << "EVENT " << event.type() <<
          " (" << RdKafka::err2str(event.err()) << "): " <<
          event.str() << std::endl;
      break;
    }
  }
};

void msg_consume(RdKafka::Message* message, void* opaque) {
  switch (message->err()) {
  case RdKafka::ERR__TIMED_OUT:
    break;

  case RdKafka::ERR_NO_ERROR:
    /* Real message */
    std::cout << "Read msg at offset " << message->offset() << std::endl;
    if (message->key()) {
      std::cout << "Key: " << *message->key() << std::endl;
    }
    /* The payload is not NUL-terminated, so write exactly len() bytes */
    cout.write(static_cast<const char *>(message->payload()), message->len());
    cout << endl;
    break;

  case RdKafka::ERR__PARTITION_EOF:
    cout << "reached last message" << endl;
    /* Last message */
    if (exit_eof) {
      run = false;
    }
    break;

  case RdKafka::ERR__UNKNOWN_TOPIC:
  case RdKafka::ERR__UNKNOWN_PARTITION:
    std::cerr << "Consume failed: " << message->errstr() << std::endl;
    run = false;
    break;

  default:
    /* Errors */
    std::cerr << "Consume failed: " << message->errstr() << std::endl;
    run = false;
  }
}

class MyConsumeCb : public RdKafka::ConsumeCb {
public:
  void consume_cb (RdKafka::Message &msg, void *opaque) {
    msg_consume(&msg, opaque);
  }
};

static void sigterm (int sig) {
  run = false;
}

int main (int argc, char **argv) {
  /*
   * Handle kill signals so the consume loop can exit cleanly
   */
  signal(SIGINT, sigterm);
  signal(SIGTERM, sigterm);

  /*
   * Create configuration objects
   */
  RdKafka::Conf *global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  RdKafka::Conf *topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

  /*
   * Set configuration properties
   */
  string brokers = "localhost";
  string errstr;
  global_conf->set("metadata.broker.list", brokers, errstr);

  /*
   * Accept events from RdKafka
   */
  MyEventCb ex_event_cb;
  global_conf->set("event_cb", &ex_event_cb, errstr);

  /*
   * Create consumer using the accumulated global configuration.
   */
  RdKafka::Consumer *consumer = RdKafka::Consumer::create(global_conf, errstr);
  if (!consumer) {
    std::cerr << "Failed to create consumer: " << errstr << std::endl;
    exit(1);
  }

  std::cout << "% Created consumer " << consumer->name() << std::endl;

  /*
   * Create topic handle.
   */
  string topic_name = "test";
  RdKafka::Topic *topic = RdKafka::Topic::create(consumer, topic_name, topic_conf, errstr);
  if (!topic) {
    std::cerr << "Failed to create topic: " << errstr << std::endl;
    exit(1);
  }

  /*
   * Start consumer for topic+partition at start offset
   */
  int32_t partition = 0;
  int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
  RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset);
  if (resp != RdKafka::ERR_NO_ERROR) {
    std::cerr << "Failed to start consumer: " <<
        RdKafka::err2str(resp) << std::endl;
    exit(1);
  }

  /*
   * Consume messages
   */
  MyConsumeCb ex_consume_cb;
  int use_ccb = 0;
  while (run) {
    if (use_ccb) {
      consumer->consume_callback(topic, partition, 1000,
                                 &ex_consume_cb, &use_ccb);
    } else {
      RdKafka::Message *msg = consumer->consume(topic, partition, 1000);
      msg_consume(msg, NULL);
      delete msg;
    }
    consumer->poll(0);
  }

  /*
   * Stop consumer
   */
  consumer->stop(topic, partition);
  consumer->poll(1000);

  delete topic;
  delete consumer;
  delete topic_conf;
  delete global_conf;

  /*
   * Wait for RdKafka to decommission.
   * This is not strictly needed (when checking outq_len() above), but
   * allows RdKafka to clean up all its resources before the application
   * exits so that memory profilers such as valgrind won't complain about
   * memory leaks.
   */
  RdKafka::wait_destroyed(5000);

  return 0;
}

3 Open issues

3.1 The run variable

run is set both by the signal handler and by the MyConsumeCb callback. Whether these always execute on the same thread still needs to be confirmed. If they run on different threads, run must be synchronized with an atomic, otherwise dirty reads and writes may occur.

3.2 Reading from multiple brokers

3.3 Resuming from a saved offset

3.4 Fetching metadata from ZooKeeper

Created: 2016-05-01 Sun 17:39
Tags: kafka