您的位置:首页 > 数据库 > Redis

redis async client 与自有框架集成

2015-12-24 14:41 573 查看
hiredis的异步接口已经支持ae libuv libev 和 libevent集成,具体头文件可以参见redis/deps/hiredis/adapters,样例参见redis/deps/hiredis/examples.

完整样例参见: https://github.com/DavadDi/study_example/tree/master/async_redis_client
参照hireids的异步接口和libevent的集成可以很容易和其他网络框架集成,例如asio或者ace等。 以下样例为自己编写reactor框架的集成方式,

支持自动重练和asyncRedisContext对象的创建和释放,重练使用退步算法,最大连接时间间隔为32秒。

使用方式:
  将redis_client.hpp 放到 hiredis的adapter目录即可。

#ifndef redis_client_h
#define redis_client_h

#include "reactor/define.hpp"
#include "reactor/event_handler.hpp"
#include "hiredis.h"
#include "async.h"

using namespace reactor;

static  void redisReactorAddRead(void *arg);
static  void redisReactorDelRead(void *arg);
static  void redisReactorAddWrite(void *arg);
static  void redisReactorDelWrite(void *arg);
static  void redisReactorCleanup(void *arg);

void connectCallBack(const redisAsyncContext *c, int status);
void disconnectCallBack(const redisAsyncContext *c, int status);

void get_call_fun(redisAsyncContext *c, void *r, void *arg)
{
redisReply *reply = (redisReply *)r;
std::string *key_str = (std::string *)arg;

if (reply == NULL)
{
delete key_str;
return;
}

LOG_INFO("[%s] -> %s\n", key_str->c_str(), reply->str);

delete key_str;

/* Disconnect after receiving the reply to GET */
// redisAsyncDisconnect(c);
}

// -------------------------------------------------------------------
// !!NOTE, if obj conneted to server faild and unregister from epoll,
// prog exit, this object my leak memory
// -------------------------------------------------------------------

class CRedisClient : public CEventHandler
{
public:
// enum {MAX_BUF_SIZE = 4096};
typedef CEventHandler super;

CRedisClient(const char *srv_ip, uint16_t srv_port, reactor::CReactor *reactor)
: super(reactor)
{
m_srv_ip_str = srv_ip;
m_srv_port = srv_port;
}

int connect()
{
LOG_DEBUG("Enter CRedisClient connect()");
m_client_status = CONNECT_STATUS::CLIENT_CONNECTING;

clear_redis_context();

m_redis_context = redisAsyncConnect(m_srv_ip_str.c_str(), m_srv_port);
if (m_redis_context == nullptr)
{
return -1;
}

if (m_redis_context->err)
{
LOG_INFO("Connect to %s:%d Error: %s",
m_srv_ip_str.c_str(), m_srv_port, m_redis_context->errstr);

return -1;
}

if (m_timer_id == 0)
{
m_timer_id = this->reactor()->regist_timer(this, m_timeout_value); // only one time
LOG_DEBUG("Client regist timer to reactor id %d, timeout %d", m_timer_id, m_timeout_value);
}

this->attach();

return 0;
}

virtual ~CRedisClient()
{
// maybe should not free redis context in deconstuct!!
m_delete_redis_context = true;
clear_redis_context();
}

virtual int open(void *data = nullptr)
{
m_client_status = CONNECT_STATUS::CLIENT_CONNECTED;

m_delete_redis_context = false;

if (m_timer_id == 0)
{
m_timer_id = this->reactor()->regist_timer(this, m_timeout_value); // only one time
LOG_DEBUG("Client regist timer to reactor id %d, timeout %d",
m_timer_id, m_timeout_value);
}

LOG_INFO("Connect to RedisServer %s:%d succeed!!",
m_srv_ip_str.c_str(), m_srv_port);

return 0;
}

virtual int handle_input(socket_t socket)
{
redisAsyncHandleRead(m_redis_context);
return 0;
}

virtual int handle_output(socket_t socket)
{
redisAsyncHandleWrite(m_redis_context);
return 0;
}

virtual int handle_timeout(uint32_t tm, void *data = nullptr)
{
// LOG_DEBUG("Enter into timeout function....");
if (m_client_status == CONNECT_STATUS::CLIENT_CONNECTED)
{
/* just for test */
std::string key = std::to_string(tm);

LOG_DEBUG("Set key %s", key.c_str());
redisAsyncCommand(m_redis_context, NULL, NULL, "SET %s %s",key.c_str(), "aaa");
redisAsyncCommand(m_redis_context, get_call_fun, (char*)new string(key), "GET %s", key.c_str());
}
else
{
static uint32_t last_tm = 0;
if ((tm - last_tm) >= m_timeout_interval)
{
//reconnect
LOG_DEBUG("Start reconnect now ...");
this->connect();

m_timeout_interval = m_timeout_interval * 2;
if (m_timeout_interval > 32)
{
m_timeout_interval = 1;
}

last_tm = tm;
}
}

return 0;
}

virtual int handle_close(socket_t socket = INVALID_SOCKET, uint32_t mask = 0)
{
LOG_DEBUG("Enter into handle_close()");
m_client_status = CONNECT_STATUS::CLIENT_UNCONNECTED;

// epoll call delete this handler
if (mask & RE_MASK_DEL)
{
LOG_DEBUG("Call RE_MASK_DEL now");

if (this->m_timer_id && (this->reactor() != nullptr))
{
this->reactor()->unregist_timer(this->m_timer_id);
this->m_timer_id = 0;
}

delete this;
return 0;
}

this->reactor()->del_event(this,0);
return 0;
}

void clear_redis_context()
{
if (m_delete_redis_context && m_redis_context != nullptr)
{
LOG_DEBUG("Call redisAsynFree() now");
redisAsyncFree(m_redis_context);
m_redis_context = nullptr;
}
}

int attach()
{
LOG_DEBUG("Enter attatch function... ");

redisContext *context = &(m_redis_context->c);
if (m_redis_context->ev.data != NULL)
{
return -1;
}

// set callback function
redisAsyncSetConnectCallback(m_redis_context,connectCallBack);
redisAsyncSetDisconnectCallback(m_redis_context,disconnectCallBack);

this->set_handle(context->fd); // set handler

m_redis_context->ev.addRead = redisReactorAddRead;
m_redis_context->ev.delRead = redisReactorDelRead;
m_redis_context->ev.addWrite = redisReactorAddWrite;
m_redis_context->ev.delWrite = redisReactorDelWrite;
m_redis_context->ev.cleanup = redisReactorCleanup;
m_redis_context->ev.data = this;

LOG_DEBUG("ac->ev.data %p", m_redis_context->ev.data);

this->add_read();
this->add_write();

return 0;
}

void add_read()
{
LOG_TRACE_METHOD(__func__);

if( (this->m_current_event_mask & reactor::EVENT_READ) > 0)
{
LOG_DEBUG("EV_READ(0x%0x) already in event_mask 0x%x",
reactor::EVENT_READ, this->m_current_event_mask);

return;
}

this->reactor()->add_event(this, reactor::EVENT_READ);
}

void del_read()
{
LOG_TRACE_METHOD(__func__);
this->reactor()->mod_event(this, this->m_current_event_mask&(~reactor::EVENT_READ));
}

void add_write()
{
LOG_TRACE_METHOD(__func__);
this->schedule_write();
}

void del_write()
{
LOG_TRACE_METHOD(__func__);
this->cancel_schedule_write();
}

void clean_up()
{
LOG_TRACE_METHOD(__func__);
}

// note!!!
// connenct not succeed. we can free redis context. ]
// But if connect succeed and borken, we don't connect

protected:
std::string m_srv_ip_str;
uint16_t    m_srv_port;

CONNECT_STATUS  m_client_status = CONNECT_STATUS::CLIENT_UNCONNECTED;

int m_timer_id  = 0;
uint32_t m_timeout_value = 1;
uint32_t  m_timeout_interval = 1;

bool    m_delete_redis_context = true;
redisAsyncContext *m_redis_context = nullptr;
};

static void redisReactorAddRead(void *arg)
{
LOG_DEBUG("Enter redisReactorAddRead() arg %p", arg);
CRedisClient *event_handler = (CRedisClient *)arg;
event_handler->add_read();
}

static void redisReactorDelRead(void *arg)
{
CRedisClient *event_handler = (CRedisClient *)arg;
event_handler->del_read();
}

static void redisReactorAddWrite(void *arg)
{
CRedisClient *event_handler = (CRedisClient *)arg;
event_handler->add_write();
}

static void redisReactorDelWrite(void *arg)
{
CRedisClient *event_handler = (CRedisClient *)arg;
event_handler->del_write();
}

static void redisReactorCleanup(void *arg)
{
CRedisClient *event_handler = (CRedisClient *)arg;
event_handler->clean_up();
}

void connectCallBack(const redisAsyncContext *ac, int status)
{
if (status != REDIS_OK)
{
LOG_ERROR("connectCallBack() Error: %s", ac->errstr);
return;
}

CRedisClient *event_handler = (CRedisClient *)ac->ev.data;
event_handler->open();

LOG_INFO("RedisClient Connected...");
}

void disconnectCallBack(const redisAsyncContext *ac, int status)
{
CRedisClient *event_handler = (CRedisClient *)ac->ev.data;
event_handler->handle_close(0,0);

if (status != REDIS_OK)
{
LOG_INFO("disconnectCallBack()!! Error: %s", ac->errstr);
return;
}

LOG_INFO("RedisClient Disconnected...");
}

#endif /* redis_client_h */


使用的程序样例:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>

//#include <hiredis.h>
//#include <async.h>

#include <adapters/redis_client.hpp>
//#include "redis_client.hpp"

#include <signal.h>

static void signal_handler(int sig)
{
if (sig == SIGINT)
{
reactor::CReactor::instance()->end_event_loop();
}
}

/*
void get_call_fun(redisAsyncContext *c, void *r, void *arg)
{
redisReply *reply = (redisReply *)r;
std::string *key_str = (std::string *)arg;

if (reply == NULL)
{
delete key_str;
return;
}

LOG_INFO("[%s] -> %s\n", key_str->c_str(), reply->str);

delete key_str;

// Disconnect after receiving the reply to GET
// redisAsyncDisconnect(c);
}
*/

int main (int argc, char **argv)
{
signal(SIGPIPE, SIG_IGN);
signal(SIGINT, signal_handler);

CLoggerMgr logger("reactor.prop");

reactor::CReactor *rt = reactor::CReactor::instance();
CRedisClient *redis_client = new CRedisClient("127.0.0.1", 6379, rt);
redis_client->connect();

rt->run_event_loop();

return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: