C++封装的Redis客户端（基于hiredis，含连接池）
2016-05-05 16:10
302 查看
#include "ServerBasePCH.h"
#include "RedisManager.h"

#ifdef _WIN32
#include <hiredis/win32fixes.h>
#endif

// ---------------------------------------------------------------------------
// RedisReply: owning holder for a hiredis redisReply.
// ---------------------------------------------------------------------------

// Creates a holder that owns no reply.
RedisReply::RedisReply()
    : _reply(NULL)
{
}

// Frees the held reply, if any.
RedisReply::~RedisReply()
{
    SetReply(NULL);
}

// Takes ownership of `r` and frees the previously held reply.
// Re-assigning the reply that is already held is a no-op, so the holder never
// ends up pointing at an object it has just freed.
void RedisReply::SetReply(redisReply* r)
{
    if (_reply == r)
    {
        return;
    }
    if (_reply != NULL)
    {
        freeReplyObject(_reply);
    }
    _reply = r;
}

// ---------------------------------------------------------------------------
// Redis: one blocking hiredis connection.
// ---------------------------------------------------------------------------

// Initializer order follows the member declaration order in the header.
Redis::Redis()
    : m_c(NULL)
    , m_pool_index(-1)
    , m_host(NULL)
    , m_port(0)
    , m_pending_reply_count(0)
{
}

// Closes the connection and releases the stored host name.
Redis::~Redis()
{
    Close();
    if (m_host)
    {
        free(m_host);
        m_host = NULL;
    }
}

// Records the slot this connection occupies in a RedisManager pool.
void Redis::SetPoolIndex(int index)
{
    m_pool_index = index;
}

// Returns the pool slot set by SetPoolIndex(), or -1 if none was set.
int Redis::GetPoolIndex()
{
    return m_pool_index;
}

// Opens a connection to host:port and remembers both for ReConnect().
//
// Any previously open context is closed first and the pipeline counter is
// reset, because pending replies belong to the old connection.
//
// Returns false when `host` is NULL, when memory for the host copy or the
// hiredis context cannot be allocated, or when hiredis reports a connection
// error. In the last case the context is kept so GetError() can describe it.
bool Redis::Connect(const char* host, unsigned int port)
{
    if (host == NULL)
    {
        // Happens e.g. when Ensure()/ReConnect() runs before any Connect().
        return false;
    }

    // Avoid leaking an earlier context when Connect() is called repeatedly.
    Close();
    m_pending_reply_count = 0;

    if (host != m_host)
    {
        // Copy first, free afterwards: keeps m_host valid if allocation fails.
        const size_t len = strlen(host);
        char* copy = (char*)calloc(len + 1, sizeof(char));
        if (copy == NULL)
        {
            return false;
        }
        memcpy(copy, host, len);
        if (m_host != NULL)
        {
            free(m_host);
        }
        m_host = copy;
    }
    m_port = port;

    NSLOG("Connecting to Redis `%s`...\n", m_host);
    m_c = redisConnect(m_host, port);
    if (m_c == NULL)
    {
        // redisConnect returns NULL when the context cannot be allocated.
        ESLOG("[redis] Connection error: %s\n", "can't allocate redis context");
        return false;
    }
    if (m_c->err)
    {
        ESLOG("[redis] Connection error: %s\n", m_c->errstr);
        return false;
    }
    return true;
}

// Closes the current connection and connects again to the stored host/port.
bool Redis::ReConnect()
{
    Close();
    return Connect(m_host, m_port);
}

// Frees the hiredis context, if one is open.
void Redis::Close()
{
    if (m_c != NULL)
    {
        redisFree(m_c);
        m_c = NULL;
    }
}

// Makes sure a usable context exists, reconnecting when there is none or when
// the current one is in an error state.
//
// `reconnected` (optional) is set to true when a reconnect was attempted and
// to false otherwise. Returns false only when that reconnect failed.
bool Redis::Ensure(bool *reconnected)
{
    const bool needs_reconnect = (m_c == NULL || m_c->err != 0);
    if (reconnected != NULL)
    {
        *reconnected = needs_reconnect;
    }
    return needs_reconnect ? ReConnect() : true;
}

// Sends PING. Returns true when any reply was received; the reply content is
// not inspected.
bool Redis::Ping()
{
    return ExecuteCommand(NULL, "PING");
}

// printf-style command execution; the reply is stored in `reply`.
bool Redis::ExecuteCommand(RedisReply &reply, const char *format, ...)
{
    va_list ap;
    va_start(ap, format);
    const bool ok = ExecuteCommandV(&reply, format, ap);
    va_end(ap);
    return ok;
}

// printf-style command execution; `reply` may be NULL to discard the reply.
bool Redis::ExecuteCommand(RedisReply *reply, const char *format, ...)
{
    va_list ap;
    va_start(ap, format);
    const bool ok = ExecuteCommandV(reply, format, ap);
    va_end(ap);
    return ok;
}

// Executes a formatted command and waits for its reply.
//
// If the first attempt yields no reply, the connection is re-established and
// the command is sent once more. Returns false when no reply could be
// obtained. On success the reply is handed to `reply`, or freed when `reply`
// is NULL.
bool Redis::ExecuteCommandV(RedisReply *reply, const char *format, va_list ap)
{
    if (!Ensure())
    {
        return false;
    }

    // A va_list may only be traversed once; keep a copy for the retry.
    va_list retry_ap;
    va_copy(retry_ap, ap);

    redisReply* r = (redisReply*)redisvCommand(m_c, format, ap);
    if (r == NULL && ReConnect())
    {
        r = (redisReply*)redisvCommand(m_c, format, retry_ap);
    }
    va_end(retry_ap);

    if (r == NULL)
    {
        return false;
    }

    if (reply != NULL)
    {
        reply->SetReply(r);
    }
    else
    {
        freeReplyObject(r);
    }
    return true;
}

// Same as ExecuteCommandV(), but takes the command as an argv-style array
// (binary safe when `argvlen` is supplied).
bool Redis::ExecuteCommandArgv(RedisReply *reply, int argc, const char **argv, const size_t *argvlen)
{
    if (!Ensure())
    {
        return false;
    }

    redisReply* r = (redisReply*)redisCommandArgv(m_c, argc, argv, argvlen);
    if (r == NULL && ReConnect())
    {
        r = (redisReply*)redisCommandArgv(m_c, argc, argv, argvlen);
    }
    if (r == NULL)
    {
        return false;
    }

    if (reply != NULL)
    {
        reply->SetReply(r);
    }
    else
    {
        freeReplyObject(r);
    }
    return true;
}

// printf-style variant of ExecuteCommandInPipelineV().
bool Redis::ExecuteCommandInPipeline(const char *format, ...)
{
    va_list ap;
    va_start(ap, format);
    const bool ok = ExecuteCommandInPipelineV(format, ap);
    va_end(ap);
    return ok;
}

// Appends a formatted command to the output buffer without reading a reply.
// Each successful call must later be matched by one GetReplyInPipeline().
//
// On failure the connection is re-established and the append is retried once.
// Only commands that were really appended are counted as pending: draining a
// reply for a command that never reached the buffer would either block or
// consume the reply of an earlier command.
bool Redis::ExecuteCommandInPipelineV(const char *format, va_list ap)
{
    if (!Ensure())
    {
        return false;
    }

    // A va_list may only be traversed once; keep a copy for the retry.
    va_list retry_ap;
    va_copy(retry_ap, ap);

    int r = redisvAppendCommand(m_c, format, ap);
    if (r == REDIS_ERR && ReConnect())
    {
        // Must be the va_list flavour; the variadic redisAppendCommand would
        // treat the va_list object itself as the first format argument.
        r = redisvAppendCommand(m_c, format, retry_ap);
    }
    va_end(retry_ap);

    if (r == REDIS_ERR)
    {
        return false;
    }
    m_pending_reply_count++;
    return true;
}

// argv-style variant of ExecuteCommandInPipelineV(); same counting rules.
bool Redis::ExecuteCommandInPipelineArgv(int argc, const char **argv, const size_t *argvlen)
{
    if (!Ensure())
    {
        return false;
    }

    int r = redisAppendCommandArgv(m_c, argc, argv, argvlen);
    if (r == REDIS_ERR && ReConnect())
    {
        r = redisAppendCommandArgv(m_c, argc, argv, argvlen);
    }

    if (r == REDIS_ERR)
    {
        return false;
    }
    m_pending_reply_count++;
    return true;
}

// Reference overload of GetReplyInPipeline(RedisReply*).
bool Redis::GetReplyInPipeline(RedisReply &reply)
{
    return GetReplyInPipeline(&reply);
}

// Reads the reply of the oldest pipelined command.
//
// Returns false without reading when nothing is pending, when the connection
// is unusable, or when it had to be re-established (replies queued on the old
// connection are gone, and waiting for them would block the thread).
// `reply` may be NULL to discard the reply.
bool Redis::GetReplyInPipeline(RedisReply *reply)
{
    bool reconnected = false;
    if (!Ensure(&reconnected) || reconnected || m_pending_reply_count <= 0)
    {
        return false;
    }

    m_pending_reply_count--;

    redisReply *r = NULL;
    if (redisGetReply(m_c, (void**)&r) != REDIS_OK)
    {
        return false;
    }

    if (reply != NULL)
    {
        reply->SetReply(r);
    }
    else
    {
        freeReplyObject(r);
    }
    return true;
}

// Reads the next reply from the connection without consulting the pipeline
// counter. Returns false when the connection is unusable or had to be
// re-established (see GetReplyInPipeline). `reply` may be NULL.
bool Redis::GetReply(RedisReply *reply)
{
    bool reconnected = false;
    if (!Ensure(&reconnected) || reconnected)
    {
        return false;
    }

    redisReply *r = NULL;
    if (redisGetReply(m_c, (void**)&r) != REDIS_OK)
    {
        return false;
    }

    if (reply != NULL)
    {
        reply->SetReply(r);
    }
    else
    {
        freeReplyObject(r);
    }
    return true;
}

// ---------------------------------------------------------------------------
// RedisManager: fixed-capacity pool of Redis connections.
// ---------------------------------------------------------------------------

// Initializer order follows the member declaration order in the header.
RedisManager::RedisManager()
    : m_host(NULL)
    , m_port(0)
    , m_maxConnections(0)
    , m_countConnections(0)
    , m_pool_index(0)
    , m_connections(NULL)
    , m_is_free(NULL)
{
}

// Destroys every pooled connection and releases the bookkeeping arrays.
RedisManager::~RedisManager()
{
    if (m_connections != NULL)
    {
        for (int i = 0; i < m_maxConnections; i++)
        {
            delete m_connections[i];  // unused slots are NULL; delete NULL is a no-op
        }
        delete[] m_connections;
        m_connections = NULL;
    }

    if (m_is_free != NULL)
    {
        delete[] m_is_free;
        m_is_free = NULL;
    }

    if (m_host != NULL)
    {
        free(m_host);
        m_host = NULL;
    }
}

// Creates, connects and registers one more pooled connection.
//
// Returns NULL when the pool is already at capacity or the connection attempt
// fails. The new connection is registered as free.
Redis* RedisManager::_NewConnection()
{
    // Take the lock before the capacity check: with the check outside the
    // lock, two threads could both pass it and write past the arrays.
    CAutoLock lock(&m_mutex_of_connection);

    if (m_countConnections >= m_maxConnections)
    {
        ELOG("RedisManager: connection exceed max connection count %d", m_maxConnections);
        return NULL;
    }

    Redis *redis = new Redis();
    if (!redis->Connect(m_host, m_port))
    {
        delete redis;
        return NULL;
    }

    const int slot = m_countConnections;
    m_connections[slot] = redis;
    m_is_free[slot] = true;
    redis->SetPoolIndex(slot);
    m_countConnections = slot + 1;
    return redis;
}

// Configures the pool and eagerly opens `initConnections` connections.
//
// Returns false when `host` is NULL, when `maxConnections` is not positive,
// or when one of the initial connections cannot be established.
bool RedisManager::Init(const char* host, uint32 port, int maxConnections, int initConnections)
{
    if (host == NULL || maxConnections <= 0)
    {
        return false;
    }

#ifdef _WIN32
    w32initWinSock();
#endif

    const size_t len = strlen(host);
    m_host = (char*)calloc(len + 1, sizeof(char));
    if (m_host == NULL)
    {
        return false;
    }
    memcpy(m_host, host, len);
    m_port = port;

    m_maxConnections = maxConnections;
    m_connections = new Redis*[m_maxConnections]();  // value-initialized: all NULL
    m_is_free = new bool[m_maxConnections]();        // value-initialized: all false

    while (initConnections-- > 0)
    {
        if (_NewConnection() == NULL)
        {
            return false;
        }
    }
    return true;
}

// Borrows a connection from the pool; pair every call with Put().
//
// Probes up to "current pool size" slots starting from a rotating index, then
// tries to grow the pool. If neither yields a connection the search starts
// over, so this call does not return until a connection is available.
// The retry is a loop rather than recursion so that a long wait cannot
// exhaust the stack.
//
// NOTE(review): the free-flag test-and-set below is not atomic, and a freshly
// created connection is briefly visible as free before it is claimed here -
// confirm whether callers can hit this from several threads at once.
Redis* RedisManager::Get()
{
    for (;;)
    {
        int attempts = m_countConnections;
        while (attempts-- > 0)
        {
            const int count = m_countConnections;
            if (count <= 0)
            {
                break;
            }

            // Unsigned arithmetic keeps the slot non-negative after the
            // rotating counter wraps around.
            const unsigned long ticket = (unsigned long)AtomicIncrement(&m_pool_index);
            const int iConn = (int)(ticket % (unsigned long)count);
            if (m_is_free[iConn])
            {
                m_is_free[iConn] = false;
                return m_connections[iConn];
            }
        }

        Redis *redis = _NewConnection();
        if (redis != NULL)
        {
            m_is_free[redis->GetPoolIndex()] = false;
            return redis;
        }
    }
}

// Returns a borrowed connection to the pool.
// NULL pointers and connections whose pool index is outside the pool (e.g.
// the default -1 of a connection that was never pooled) are ignored instead
// of writing outside the free-flag array.
void RedisManager::Put(Redis* redis)
{
    if (redis == NULL)
    {
        return;
    }

    const int iConn = redis->GetPoolIndex();
    if (iConn < 0 || iConn >= m_countConnections)
    {
        return;
    }
    m_is_free[iConn] = true;
}
#pragma once #include <hiredis/hiredis.h> class RedisReply : public SerBaseHeapObject { friend class Redis; public: RedisReply(); ~RedisReply(); inline redisReply* GetReply() const { return _reply; } inline bool Status() const { return _reply != NULL && _reply->type == REDIS_REPLY_STATUS; } inline bool Error() const { return _reply == NULL || _reply->type == REDIS_REPLY_ERROR; } inline bool Integer() const { return _reply != NULL && _reply->type == REDIS_REPLY_INTEGER; } inline bool Nil() const { return _reply != NULL && _reply->type == REDIS_REPLY_NIL; } inline bool String() const { return _reply != NULL && _reply->type == REDIS_REPLY_STRING && _reply->len > 0; } inline bool Array() const { return _reply != NULL && _reply->type == REDIS_REPLY_ARRAY && _reply->elements > 0; } protected: void SetReply(redisReply* r); protected: redisReply* _reply; }; class Redis : public SerBaseHeapObject { protected: redisContext* m_c; int m_pool_index; char* m_host; uint32 m_port; int m_pending_reply_count; public: Redis(); virtual ~Redis(); virtual bool Connect(const char* Hostname, unsigned int port); virtual bool ReConnect(); virtual void Close() ; void SetPoolIndex(int index); int GetPoolIndex(); inline bool Ensure(bool *reconnected = NULL); bool Ping(); bool ExecuteCommand(RedisReply &reply, const char *format, ...); bool ExecuteCommand(RedisReply *reply, const char *format, ...); bool ExecuteCommandV(RedisReply *reply, const char *format, va_list ap); bool ExecuteCommandArgv(RedisReply *reply, int argc, const char **argv, const size_t *argvlen); bool ExecuteCommandInPipeline(const char *format, ...); bool ExecuteCommandInPipelineV(const char *format, va_list ap); bool ExecuteCommandInPipelineArgv(int argc, const char **argv, const size_t *argvlen); bool GetReplyInPipeline(RedisReply &reply); bool GetReplyInPipeline(RedisReply *reply); bool GetReply(RedisReply *reply); inline bool Error() { return m_c != NULL && m_c->err != 0; } inline char* GetError() { if (!Error()) return 
NULL; return m_c->errstr; } }; class RedisManager : public Singleton<RedisManager> { friend class Singleton<RedisManager>; protected: ~RedisManager(); private: RedisManager(); Mutex m_mutex_of_connection; char* m_host; uint32 m_port; int m_maxConnections; int m_countConnections; volatile long m_pool_index; Redis** m_connections; bool* m_is_free; Redis* _NewConnection(); public: bool Init(const char* host, uint32 port, int maxConnections, int initConnections); Redis* Get(); void Put(Redis* redis); }; #define sRedisManager (*RedisManager::instance())
本文转载自：http://www.cp2012.com/forum.php?mod=viewthread&tid=68&extra=page%3D1
相关文章推荐
- PHPredis长连接pconnect
- Redis主从复制和sentinel配置
- php操作redis出现不报错就退出
- Redis发布预订使用总结
- ServiceStack.Redis常用操作 - 事务、并发锁
- ServiceStack.Redis 之 IRedisTypedClient<第四篇>
- Redis 安装与简单示例 <第一篇>
- ServiceStack.Redis之IRedisClient<第三篇>
- Redis常用命令速查 <第二篇>
- redis常用命令
- Redis Crackit事件发人深思
- 利用redis完成自动补全搜索功能(一)
- Redis----windows下的常用命令
- Redis初识:1.安装
- Java--------------Windows下Redis的安装使用
- linux安装redis报错
- redis集中监控
- Redis Essentials 读书笔记 - 第三章:Time Series (A Collection of Observations)
- 【NoSql】redis安装&部署
- Windows下搭建Redis环境