您的位置:首页 > 数据库 > Redis

C++封装的Redis

2016-05-05 16:10 302 查看
#include "ServerBasePCH.h"
#include "RedisManager.h"

#ifdef _WIN32
#include <hiredis/win32fixes.h>
#endif

// Creates a wrapper that does not hold any hiredis reply yet.
RedisReply::RedisReply()
    : _reply(NULL)
{
}

// Releases the wrapped hiredis reply, if there is one.
RedisReply::~RedisReply() {
    if (_reply != NULL) {
        freeReplyObject(_reply);
        _reply = NULL;
    }
}

// Takes ownership of `r`, releasing whatever reply was held before.
//
// @param r  hiredis reply to adopt; NULL just drops the current reply.
//
// Passing the reply that is already held is a no-op: freeing it and then
// storing the same pointer would leave the wrapper pointing at freed memory.
void RedisReply::SetReply(redisReply* r) {
    if (r == _reply) {
        return;
    }
    if (_reply != NULL) {
        freeReplyObject(_reply);
    }
    _reply = r;
}

// Builds a disconnected client: no context, no remembered host/port,
// no pending pipeline replies, and a pool index of -1.
// Initializers are listed in the order the members are declared in the class.
Redis::Redis()
    : m_c(NULL)
    , m_pool_index(-1)
    , m_host(NULL)
    , m_port(0)
    , m_pending_reply_count(0)
{
}

// Closes the connection and releases the remembered host string.
Redis::~Redis() {
    Close();

    // free() accepts NULL, so no explicit guard is needed.
    free(m_host);
    m_host = NULL;
}

void Redis::SetPoolIndex(int index) {
m_pool_index = index;
}

int Redis::GetPoolIndex() {
return m_pool_index;
}

bool Redis::Connect(const char* host, unsigned int port) {
m_pending_reply_count = 0;

if (host != m_host) {
if (m_host != NULL) {
free(m_host);
}
m_host = (char*)calloc(strlen(host)+1, sizeof(char));
memcpy(m_host, host, strlen(host));
}

m_port = port;
NSLOG("Connecting to Redis `%s`...\n", host);

m_c = redisConnect(host, port);
if (m_c->err) {
ESLOG("[redis] Connection error: %s\n", m_c->errstr);
return false;
}
return true;
}

// Drops the current connection and connects again to the host/port that
// were remembered by the last Connect() call.
//
// @return false when no host was ever recorded or when connecting fails.
bool Redis::ReConnect() {
    Close();

    // Without a remembered host there is nothing to reconnect to; passing
    // NULL on to Connect()/hiredis would crash.
    if (m_host == NULL) {
        return false;
    }
    return Connect(m_host, m_port);
}

// Frees the hiredis context, if any, and marks the client as disconnected.
void Redis::Close() {
    if (m_c == NULL) {
        return;
    }
    redisFree(m_c);
    m_c = NULL;
}

// Makes sure there is a usable connection, reconnecting when the context is
// missing or carries an error.
//
// @param reconnected  optional out-flag; set to true when a reconnect was
//                     attempted, false otherwise.
// @return true if the existing connection is fine or the reconnect succeeded.
bool Redis::Ensure(bool *reconnected) {
    const bool needs_reconnect = (m_c == NULL) || (m_c->err != 0);

    if (reconnected != NULL) {
        *reconnected = needs_reconnect;
    }

    return needs_reconnect ? ReConnect() : true;
}

bool Redis::Ping()
{
return ExecuteCommand(NULL, "PING");
}

// printf-style command execution; the reply is stored into `reply`.
// Forwards to ExecuteCommandV().
bool Redis::ExecuteCommand(RedisReply &reply, const char *format, ...) {
    va_list args;
    va_start(args, format);
    const bool ok = ExecuteCommandV(&reply, format, args);
    va_end(args);
    return ok;
}

// printf-style command execution; `reply` may be NULL to discard the reply.
// Forwards to ExecuteCommandV().
bool Redis::ExecuteCommand(RedisReply *reply, const char *format, ...) {
    va_list args;
    va_start(args, format);
    const bool ok = ExecuteCommandV(reply, format, args);
    va_end(args);
    return ok;
}

// Executes a printf-style command and waits for its reply.
//
// @param reply   receives the reply; NULL means the reply is freed here.
// @param format  hiredis command format string.
// @param ap      arguments for `format`.
// @return true when a reply object was obtained (it may still be an error
//         reply); false when the connection could not be ensured or no reply
//         was obtained even after one reconnect attempt.
bool Redis::ExecuteCommandV(RedisReply *reply, const char *format, va_list ap) {
    if (!Ensure()) {
        return false;
    }

    // A va_list may only be traversed once. Keep a private copy so that the
    // retry after a reconnect formats the command from intact arguments.
    va_list retry_ap;
    va_copy(retry_ap, ap);

    redisReply* r = static_cast<redisReply*>(redisvCommand(m_c, format, ap));
    if (r == NULL && ReConnect()) {
        r = static_cast<redisReply*>(redisvCommand(m_c, format, retry_ap));
    }
    va_end(retry_ap);

    if (r == NULL) {
        return false;
    }

    if (reply != NULL) {
        reply->SetReply(r);
    } else {
        freeReplyObject(r);
    }
    return true;
}

// Executes a command given as an argument vector and waits for its reply.
// If the first attempt yields no reply, reconnects once and tries again.
//
// @param reply    receives the reply; NULL means the reply is freed here.
// @param argc     number of entries in `argv` / `argvlen`.
// @param argv     command arguments.
// @param argvlen  lengths of the arguments, forwarded to hiredis as-is.
// @return true when a reply object was obtained.
bool Redis::ExecuteCommandArgv(RedisReply *reply, int argc, const char **argv, const size_t *argvlen) {
    if (!Ensure()) {
        return false;
    }

    void* raw = redisCommandArgv(m_c, argc, argv, argvlen);
    if (raw == NULL && ReConnect()) {
        raw = redisCommandArgv(m_c, argc, argv, argvlen);
    }
    if (raw == NULL) {
        return false;
    }

    redisReply* result = static_cast<redisReply*>(raw);
    if (reply == NULL) {
        freeReplyObject(result);
    } else {
        reply->SetReply(result);
    }
    return true;
}

// Queues a printf-style command in the pipeline.
// Forwards to ExecuteCommandInPipelineV().
bool Redis::ExecuteCommandInPipeline(const char *format, ...) {
    va_list args;
    va_start(args, format);
    const bool queued = ExecuteCommandInPipelineV(format, args);
    va_end(args);
    return queued;
}

// Appends a printf-style command to the output buffer without waiting for
// its reply; fetch the reply later with GetReplyInPipeline().
//
// @param format  hiredis command format string.
// @param ap      arguments for `format`.
// @return true when the command was appended; false when the connection
//         could not be ensured or appending failed even after one reconnect.
bool Redis::ExecuteCommandInPipelineV(const char *format, va_list ap) {
    if (!Ensure()) {
        return false;
    }

    // A va_list may only be traversed once; keep a copy for the retry.
    va_list retry_ap;
    va_copy(retry_ap, ap);

    int r = redisvAppendCommand(m_c, format, ap);
    if (r == REDIS_ERR && ReConnect()) {
        // The retry must use the va_list flavour as well: handing a va_list
        // to the variadic redisAppendCommand() formats garbage.
        r = redisvAppendCommand(m_c, format, retry_ap);
    }
    va_end(retry_ap);

    m_pending_reply_count++;

    if (r == REDIS_ERR) {
        // Kept from the original flow: let GetReplyInPipeline() run its
        // Ensure()/reconnect handling for the failed command.
        GetReplyInPipeline(NULL);
        return false;
    }

    return true;
}

// Appends an argument-vector command to the pipeline without waiting for its
// reply. If appending fails, reconnects once and tries again.
//
// @return true when the command was appended.
bool Redis::ExecuteCommandInPipelineArgv(int argc, const char **argv, const size_t *argvlen) {
    if (!Ensure()) {
        return false;
    }

    int status = redisAppendCommandArgv(m_c, argc, argv, argvlen);
    if (status == REDIS_ERR && ReConnect()) {
        status = redisAppendCommandArgv(m_c, argc, argv, argvlen);
    }

    // The counter is bumped regardless of the outcome.
    ++m_pending_reply_count;

    if (status != REDIS_ERR) {
        return true;
    }

    GetReplyInPipeline(NULL);
    return false;
}

// Reference overload: fetches the next pipelined reply into `reply`.
bool Redis::GetReplyInPipeline(RedisReply &reply) {
    RedisReply* const target = &reply;
    return GetReplyInPipeline(target);
}

// Fetches the next reply of a pipelined command.
//
// @param reply  receives the reply; NULL means the reply is freed here.
// @return false when the connection is unusable, when it had to be
//         reconnected, when no reply is pending, or when hiredis reports an
//         error; true otherwise.
bool Redis::GetReplyInPipeline(RedisReply *reply) {
    bool reconnected = false;
    const bool alive = Ensure(&reconnected);

    // Do not read after a reconnect: that would block the current thread.
    if (!alive || reconnected) {
        return false;
    }
    if (m_pending_reply_count <= 0) {
        return false;
    }

    --m_pending_reply_count;

    void* raw = NULL;
    if (redisGetReply(m_c, &raw) != REDIS_OK) {
        return false;
    }

    redisReply* result = static_cast<redisReply*>(raw);
    if (reply == NULL) {
        freeReplyObject(result);
    } else {
        reply->SetReply(result);
    }
    return true;
}

// Reads the next reply from the connection, without consulting the pending
// pipeline counter.
//
// @param reply  receives the reply; NULL means the reply is freed here.
// @return false when the connection is unusable, when it had to be
//         reconnected, or when hiredis reports an error; true otherwise.
bool Redis::GetReply(RedisReply *reply) {
    bool reconnected = false;
    const bool alive = Ensure(&reconnected);

    // Do not read after a reconnect: that would block the current thread.
    if (!alive || reconnected) {
        return false;
    }

    void* raw = NULL;
    if (redisGetReply(m_c, &raw) != REDIS_OK) {
        return false;
    }

    redisReply* result = static_cast<redisReply*>(raw);
    if (reply == NULL) {
        freeReplyObject(result);
    } else {
        reply->SetReply(result);
    }
    return true;
}

// Builds an empty, uninitialised pool; Init() must be called before use.
// Initializers are listed in the order the members are declared in the class.
RedisManager::RedisManager()
    : m_host(NULL)
    , m_port(0)
    , m_maxConnections(0)
    , m_countConnections(0)
    , m_pool_index(0)
    , m_connections(NULL)
    , m_is_free(NULL)
{
}

// Destroys every pooled connection and releases the pool's bookkeeping.
RedisManager::~RedisManager() {
    if (m_connections != NULL) {
        for (int slot = 0; slot < m_maxConnections; ++slot) {
            // Unused slots are NULL; deleting NULL is a no-op.
            delete m_connections[slot];
        }
        delete[] m_connections;
        m_connections = NULL;
    }

    // delete[] and free() both accept NULL.
    delete[] m_is_free;
    m_is_free = NULL;

    free(m_host);
    m_host = NULL;
}

// Creates one more connection, stores it in the next free slot and marks it
// as available.
//
// @return the new connection, or NULL when the pool is already at its
//         maximum size or the connection attempt fails.
Redis* RedisManager::_NewConnection()
{
    // Take the lock before looking at the counter: checking capacity outside
    // the lock lets two threads both pass the check and write past the end
    // of m_connections / m_is_free.
    CAutoLock lock(&m_mutex_of_connection);

    if (m_countConnections >= m_maxConnections) {
        ELOG("RedisManager: connection exceed max connection count %d", m_maxConnections);
        return NULL;
    }

    Redis *redis = new Redis();
    if (!redis->Connect(m_host, m_port))
    {
        delete redis;
        return NULL;
    }

    m_connections[m_countConnections] = redis;
    m_is_free[m_countConnections] = true;
    redis->SetPoolIndex(m_countConnections);
    m_countConnections++;

    return redis;
}

bool RedisManager::Init(const char* host, uint32 port, int maxConnections, int initConnections)
{
#ifdef _WIN32
w32initWinSock();
#endif

m_host = (char*)calloc(strlen(host)+1, sizeof(char));
memcpy(m_host, host, strlen(host));
m_port = port;

m_maxConnections = maxConnections;
m_connections = new Redis*[m_maxConnections]();
m_is_free = new bool[m_maxConnections]();

while (initConnections-- > 0) {
if (_NewConnection() == NULL) {
return false;
}
}

return true;
}

// Borrows a connection from the pool; return it with Put().
//
// Existing connections are probed round-robin, one probe per connection
// currently in the pool. If none is free, a new connection is created. If
// that is not possible either, the whole procedure is repeated until a
// connection becomes available, so this call does not return NULL.
// NOTE(review): as in the original, this spins without sleeping while the
// pool is exhausted or the server is unreachable.
Redis* RedisManager::Get()
{
    // Iterate instead of recursing: the original `return Get()` grew the
    // stack without bound while no connection could be obtained.
    for (;;) {
        {
            // Claiming a slot is a test-and-set on m_is_free; do it under the
            // pool mutex so two callers cannot claim the same connection.
            CAutoLock lock(&m_mutex_of_connection);

            const int count = m_countConnections;
            for (int attempt = 0; attempt < count; ++attempt) {
                const long ticket = AtomicIncrement(&m_pool_index);

                // Normalise the remainder so a wrapped (negative) ticket
                // cannot yield a negative index.
                const int iConn = (int)(((ticket % count) + count) % count);

                if (m_is_free[iConn]) {
                    m_is_free[iConn] = false;
                    return m_connections[iConn];
                }
            }
        }

        // _NewConnection() takes the mutex itself, so call it unlocked.
        Redis *redis = _NewConnection();
        if (redis != NULL) {
            CAutoLock lock(&m_mutex_of_connection);

            // The new slot was published as free; claim it unless another
            // caller got there first, in which case start over.
            const int iConn = redis->GetPoolIndex();
            if (m_is_free[iConn]) {
                m_is_free[iConn] = false;
                return redis;
            }
        }
    }
}

// Returns a connection obtained from Get() to the pool.
//
// @param redis  connection to release. NULL, or a connection whose pool
//               index does not refer to a slot of this pool (for example a
//               Redis that was never pooled and still has index -1), is
//               ignored instead of writing outside m_is_free.
void RedisManager::Put(Redis* redis)
{
    if (redis == NULL || m_is_free == NULL) {
        return;
    }

    const int iConn = redis->GetPoolIndex();
    if (iConn < 0 || iConn >= m_countConnections) {
        return;
    }

    m_is_free[iConn] = true;
}


#pragma once

#include <hiredis/hiredis.h>

// Owning wrapper around a hiredis `redisReply*`.
//
// The wrapped reply is freed when it is replaced via SetReply() or when the
// wrapper is destroyed. Only Redis (a friend) is expected to install replies.
class RedisReply : public SerBaseHeapObject {
    friend class Redis;

public:
    RedisReply();
    ~RedisReply();

    // Raw access to the wrapped reply; may be NULL. Ownership stays here.
    inline redisReply* GetReply() const {
        return _reply;
    }

    // True when a reply is present and it is a status reply.
    inline bool Status() const {
        return _reply != NULL && _reply->type == REDIS_REPLY_STATUS;
    }

    // True when there is no reply at all or the reply is an error reply.
    inline bool Error() const {
        return _reply == NULL || _reply->type == REDIS_REPLY_ERROR;
    }

    // True when a reply is present and it is an integer reply.
    inline bool Integer() const {
        return _reply != NULL && _reply->type == REDIS_REPLY_INTEGER;
    }

    // True when a reply is present and it is a nil reply.
    inline bool Nil() const {
        return _reply != NULL && _reply->type == REDIS_REPLY_NIL;
    }

    // True for a string reply with non-zero length (an empty string reply
    // yields false).
    inline bool String() const {
        return _reply != NULL && _reply->type == REDIS_REPLY_STRING && _reply->len > 0;
    }

    // True for an array reply with at least one element (an empty array
    // reply yields false).
    inline bool Array() const {
        return _reply != NULL && _reply->type == REDIS_REPLY_ARRAY && _reply->elements > 0;
    }

protected:
    // Takes ownership of `r`, releasing the previously held reply.
    void SetReply(redisReply* r);

protected:
    redisReply* _reply;

private:
    // Not copyable: a copy would share `_reply` and both destructors would
    // free it. Declared but intentionally not defined.
    RedisReply(const RedisReply&);
    RedisReply& operator=(const RedisReply&);
};

// A single Redis connection built on a hiredis `redisContext`.
//
// Supports direct command execution (ExecuteCommand*) and pipelining
// (ExecuteCommandInPipeline* followed by GetReplyInPipeline). The host and
// port of the last Connect() are remembered so the connection can be
// re-established by ReConnect()/Ensure().
class Redis : public SerBaseHeapObject {
protected:
    redisContext*   m_c;                    // hiredis context; NULL when closed
    int             m_pool_index;           // pool index, -1 until SetPoolIndex() is called

    char*           m_host;                 // heap copy of the last host passed to Connect()
    uint32          m_port;                 // port of the last Connect()

    int             m_pending_reply_count;  // pipelined commands whose reply was not fetched yet

public:
    Redis();
    virtual ~Redis();

    virtual bool Connect(const char* Hostname, unsigned int port);
    virtual bool ReConnect();
    virtual void Close() ;

    void SetPoolIndex(int index);
    int GetPoolIndex();

    // Reconnects when the context is missing or in error state; `reconnected`
    // (optional) reports whether a reconnect was attempted.
    // Not declared `inline`: the definition lives in the .cpp file, and an
    // inline function has to be defined in every translation unit using it.
    bool Ensure(bool *reconnected = NULL);
    bool Ping();

    // Execute a command and wait for its reply. A NULL `reply` pointer
    // discards the reply.
    bool ExecuteCommand(RedisReply &reply, const char *format, ...);
    bool ExecuteCommand(RedisReply *reply, const char *format, ...);
    bool ExecuteCommandV(RedisReply *reply, const char *format, va_list ap);
    bool ExecuteCommandArgv(RedisReply *reply, int argc, const char **argv, const size_t *argvlen);

    // Queue a command without waiting for its reply.
    bool ExecuteCommandInPipeline(const char *format, ...);
    bool ExecuteCommandInPipelineV(const char *format, va_list ap);
    bool ExecuteCommandInPipelineArgv(int argc, const char **argv, const size_t *argvlen);

    // Fetch the reply of a previously queued command.
    bool GetReplyInPipeline(RedisReply &reply);
    bool GetReplyInPipeline(RedisReply *reply);

    bool GetReply(RedisReply *reply);

    // True when a context exists and it reports an error.
    inline bool Error() {
        return m_c != NULL && m_c->err != 0;
    }

    // Error text of the context, or NULL when Error() is false.
    inline char* GetError() {
        if (!Error()) return NULL;
        return m_c->errstr;
    }

private:
    // Not copyable: a copy would share m_c and m_host and both destructors
    // would free them. Declared but intentionally not defined.
    Redis(const Redis&);
    Redis& operator=(const Redis&);
};

// Singleton pool of Redis connections.
//
// Init() sets the target server and capacity; Get() borrows a connection and
// Put() hands it back.
class RedisManager : public Singleton<RedisManager> {
    friend class Singleton<RedisManager>;

public:
    // Configures the pool and opens `initConnections` connections up front.
    bool Init(const char* host, uint32 port, int maxConnections, int initConnections);

    // Borrow / return a pooled connection.
    Redis* Get();
    void   Put(Redis* redis);

protected:
    ~RedisManager();

private:
    RedisManager();

    // Creates one more pooled connection; NULL when full or connecting fails.
    Redis* _NewConnection();

    Mutex   m_mutex_of_connection;  // taken by _NewConnection()
    char*   m_host;                 // heap copy of the server host
    uint32  m_port;                 // server port
    int     m_maxConnections;       // capacity of m_connections / m_is_free
    int     m_countConnections;     // number of slots currently in use

    volatile long m_pool_index;     // round-robin ticket counter used by Get()

    Redis** m_connections;          // pooled connections, indexed by pool index
    bool*   m_is_free;              // per-slot availability flag
};

// Shorthand for the RedisManager singleton: expands to a dereference of the
// pointer returned by RedisManager::instance().
#define sRedisManager (*RedisManager::instance())


文章转载 http://www.cp2012.com/forum.php?mod=viewthread&tid=68&extra=page%3D1
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: