ceph源码分析之Log实现
2016-01-07 16:08
267 查看
转自 http://blog.chinaunix.net/uid-24774106-id-5059292.html
每一个大型的项目,都必须设计log:log既是重要的调试手段,也是很好的学习入口。跟踪log可以让新手快速理解代码,分析log可以帮助工程师很好地定位问题。
下面通过跟踪ceph-mon这个可执行文件,了解ceph中的log实现。
ceph_mon 初始化中 会调用global_init, global_init一开始就会调用 common_preinit函数
创建一个重要的数据结构CephContext cct。
/**
 * Construct the per-process Ceph context.
 *
 * The initializer list allocates the configuration object and sets the
 * remaining pointer members to NULL; the body then creates the subsystems
 * one by one. The logger is created and started before anything else so
 * that it exists by the time the other subsystems are constructed.
 *
 * @param module_type_ value stored in _module_type.
 */
CephContext::CephContext(uint32_t module_type_)
  : nref(1),
    _conf(new md_config_t()),
    _log(NULL),
    _module_type(module_type_),
    _service_thread(NULL),
    _log_obs(NULL),
    _admin_socket(NULL),
    _perf_counters_collection(NULL),
    _perf_counters_conf_obs(NULL),
    _heartbeat_map(NULL),
    _crypto_none(NULL),
    _crypto_aes(NULL)
{
  pthread_spin_init(&_service_thread_lock, PTHREAD_PROCESS_SHARED);

  // Create the logger on top of the configuration's subsystem map and
  // start its background thread.
  _log = new ceph::log::Log(&_conf->subsys);
  _log->start();

  // Attach an observer, constructed around the logger, to the configuration.
  // NOTE(review): presumably this applies log-related config changes -- confirm.
  _log_obs = new LogObs(_log);
  _conf->add_observer(_log_obs);

  _perf_counters_collection = new PerfCountersCollection(this);
  _admin_socket = new AdminSocket(this);
  _heartbeat_map = new HeartbeatMap(this);

  // Every admin-socket command registered below is routed to the same hook.
  _admin_hook = new CephContextHook(this);
  _admin_socket->register_command("perfcounters_dump", "perfcounters_dump", _admin_hook, "");
  _admin_socket->register_command("1", "1", _admin_hook, "");
  _admin_socket->register_command("perf dump", "perf dump", _admin_hook, "dump perfcounters value");
  _admin_socket->register_command("perfcounters_schema", "perfcounters_schema", _admin_hook, "");
  _admin_socket->register_command("2", "2", _admin_hook, "");
  _admin_socket->register_command("perf schema", "perf schema", _admin_hook, "dump perfcounters schema");
  _admin_socket->register_command("config show", "config show", _admin_hook, "dump current config settings");
  // NOTE(review): the <field>/<val> placeholders in the next two help strings
  // were restored; they look like they were stripped as HTML tags when the
  // snippet was published -- confirm against the upstream source.
  _admin_socket->register_command("config set", "config set name=var,type=CephString name=val,type=CephString,n=N", _admin_hook, "config set <field> <val> [<val> ...]: set a config variable");
  _admin_socket->register_command("config get", "config get name=var,type=CephString", _admin_hook, "config get <field>: get the config value");
  _admin_socket->register_command("log flush", "log flush", _admin_hook, "flush log entries to log file");
  _admin_socket->register_command("log dump", "log dump", _admin_hook, "dump recent log entries to log file");
  _admin_socket->register_command("log reopen", "log reopen", _admin_hook, "reopen log file");

  _crypto_none = new CryptoNone;
  _crypto_aes = new CryptoAES;
}
这个函数并不长,但是绝不简单,本文就是介绍Log线程
// Allocate the logger, handing it the subsystem map held by the configuration.
_log = new ceph::log::Log(&_conf->subsys);
// Start the logger's thread (see Log::start).
_log->start();
首先是创建了成员变量_log
第一句 new 是创建了Log实例:基本就是初始化
1 初始化自旋锁 m_lock
2 初始化互斥量 m_flush_mutex 和m_queue_mutex
3 初始化条件变量 m_cond_loggers 和m_cond_flusher
/**
 * Build a Log that consults the subsystem map `s`.
 *
 * Starts with no log file (m_fd == -1), empty queues and the default queue
 * limits, then initializes the spinlock, the two mutexes and the two
 * condition variables. Each initialization is asserted to succeed.
 *
 * @param s subsystem map stored in m_subs.
 */
Log::Log(SubsystemMap *s)
  : m_indirect_this(NULL),
    m_subs(s),
    m_new(), m_recent(),
    m_fd(-1),
    m_syslog_log(-2), m_syslog_crash(-2),
    m_stderr_log(1), m_stderr_crash(-1),
    m_stop(false),
    m_max_new(DEFAULT_MAX_NEW),
    m_max_recent(DEFAULT_MAX_RECENT)
{
  // The calls stay outside assert() so they still run when asserts are
  // compiled out.
  const int spin_rc = pthread_spin_init(&m_lock, PTHREAD_PROCESS_SHARED);
  assert(spin_rc == 0);

  const int flush_mutex_rc = pthread_mutex_init(&m_flush_mutex, NULL);
  assert(flush_mutex_rc == 0);

  const int queue_mutex_rc = pthread_mutex_init(&m_queue_mutex, NULL);
  assert(queue_mutex_rc == 0);

  const int loggers_cond_rc = pthread_cond_init(&m_cond_loggers, NULL);
  assert(loggers_cond_rc == 0);

  const int flusher_cond_rc = pthread_cond_init(&m_cond_flusher, NULL);
  assert(flusher_cond_rc == 0);

  // kludge for prealloc testing (permanently disabled by the constant condition)
  if (false) {
    for (int n = 0; n < PREALLOC; ++n) {
      m_recent.enqueue(new Entry);
    }
  }
}
接下来启动了log线程:
// Start the logging thread. Asserts that it has not been started already.
void Log::start()
{
assert(!is_started());
// Reset the stop flag while holding the queue mutex; entry() reads m_stop
// under the same mutex.
pthread_mutex_lock(&m_queue_mutex);
m_stop = false;
pthread_mutex_unlock(&m_queue_mutex);
// create() comes from the Thread base class and spawns the thread.
create();
}
关键一句是create,这个create从哪里冒出来的? 这个调用的是 Thread里面的create 方法:
/**
 * Try to start the thread; the new thread runs _entry_func(this).
 *
 * @param stacksize requested stack size in bytes. It is masked with
 *        CEPH_PAGE_MASK; if the result is 0 the thread is created with
 *        default attributes.
 * @return 0 on success. On failure a non-zero value: the error code from
 *         pthread_create(), or the negated error code if the attribute
 *         object could not be initialized (the same sign the previous
 *         -ENOMEM allocation-failure path used).
 */
int Thread::try_create(size_t stacksize)
{
  // The attribute object lives on the stack, so no heap allocation is needed
  // and it can be released with pthread_attr_destroy() once the thread exists.
  pthread_attr_t attr_storage;
  pthread_attr_t *thread_attr = NULL;
  stacksize &= CEPH_PAGE_MASK;  // must be multiple of page
  if (stacksize) {
    int attr_rc = pthread_attr_init(&attr_storage);
    if (attr_rc != 0)
      return -attr_rc;
    thread_attr = &attr_storage;
    // Best effort, as before: if the size is rejected the attribute keeps
    // its default stack size.
    pthread_attr_setstacksize(thread_attr, stacksize);
  }

  int r;

  // The child thread will inherit our signal mask.  Set our signal mask to
  // the set of signals we want to block.  (It's ok to block more signals
  // than usual for a little while -- they will just be delivered to
  // another thread or delivered to this thread later.)
  sigset_t old_sigset;
  if (g_code_env == CODE_ENVIRONMENT_LIBRARY) {
    // NOTE(review): a NULL list presumably means "block everything" -- confirm.
    block_signals(NULL, &old_sigset);
  }
  else {
    int to_block[] = { SIGPIPE , 0 };
    block_signals(to_block, &old_sigset);
  }
  r = pthread_create(&thread_id, thread_attr, _entry_func, (void*)this);
  restore_sigset(&old_sigset);

  if (thread_attr)
    pthread_attr_destroy(thread_attr);

  return r;
}
/**
 * Start the thread, treating failure as fatal.
 *
 * Calls try_create(); if that returns non-zero, the error is reported
 * through dout_emergency() and the assertion then fails.
 *
 * @param stacksize forwarded unchanged to try_create().
 */
void Thread::create(size_t stacksize)
{
  int ret = try_create(stacksize);
  if (ret != 0) {
    char buf[256];
    snprintf(buf, sizeof(buf), "Thread::try_create(): pthread_create "
             "failed with error %d", ret);
    dout_emergency(buf);
    assert(ret == 0);
  }
}
创建线程,该方法做了下面的事情:
1 可以设置线程栈的size
2 对于daemon进程,创建线程前会阻塞SIGPIPE
3 调用pthread_create函数创建线程
等一下,线程执行什么函数? _entry_func到底是什么?
void *Thread::_entry_func(void *arg) {
void *r = ((Thread*)arg)->entry();
return r;
}
也就是说,线程入口_entry_func会把pthread_create的第四个参数(即this指针)转换成Thread*,再调用它的entry()方法;对Log线程而言,执行的就是Log::entry:
namespace ceph {
namespace log {

/**
 * Logger that runs on its own thread: it derives privately from Thread and
 * supplies entry() as the thread body.
 */
class Log : private Thread
{
  Log **m_indirect_this;
  SubsystemMap *m_subs;           // subsystem map given to the constructor

  pthread_spinlock_t m_lock;
  pthread_mutex_t m_queue_mutex;  // held while touching m_new and m_stop
  pthread_mutex_t m_flush_mutex;  // held for the whole of flush()
  pthread_cond_t m_cond_loggers;  // broadcast by flush() once m_new has been emptied
  pthread_cond_t m_cond_flusher;  // waited on by entry() while m_new is empty

  EntryQueue m_new;    ///< new entries
  EntryQueue m_recent; ///< recent (less new) entries we've already written at low detail

  std::string m_log_file;
  int m_fd;            // log file descriptor; only written to when >= 0

  int m_syslog_log, m_syslog_crash;
  int m_stderr_log, m_stderr_crash;

  bool m_stop;         // makes entry() leave its loop when true

  int m_max_new, m_max_recent;

  // Thread body: flushes m_new whenever it is non-empty, until m_stop is set.
  void *entry();

  // Writes out every entry of `q` that passes the level checks, then moves
  // the entry to `requeue`.
  void _flush(EntryQueue *q, EntryQueue *requeue, bool crash);

  void _log_message(const char *s, bool crash);

public:
  Log(SubsystemMap *s);
  virtual ~Log();

  void set_flush_on_exit();

  void set_max_new(int n);
  void set_max_recent(int n);
  void set_log_file(std::string fn);
  void reopen_log_file();

  void flush();
  void dump_recent();

  void set_syslog_level(int log, int crash);
  void set_stderr_level(int log, int crash);

  Entry *create_entry(int level, int subsys);
  void submit_entry(Entry *e);

  void start();
  void stop();
};

}
}
即Log的类中,有成员函数 entry,即线程应该执行的函数:
/**
 * Body of the logging thread.
 *
 * Until m_stop is set, flushes whenever m_new holds entries and otherwise
 * sleeps on m_cond_flusher. One last flush runs after the loop ends.
 *
 * @return always NULL.
 */
void *Log::entry()
{
  pthread_mutex_lock(&m_queue_mutex);
  while (!m_stop) {
    if (m_new.empty()) {
      // Nothing queued: wait (releasing the queue mutex) until signalled.
      pthread_cond_wait(&m_cond_flusher, &m_queue_mutex);
    } else {
      // flush() takes the queue mutex itself, so release it around the call.
      pthread_mutex_unlock(&m_queue_mutex);
      flush();
      pthread_mutex_lock(&m_queue_mutex);
    }
  }
  pthread_mutex_unlock(&m_queue_mutex);

  // Drain whatever was queued before the stop flag was seen.
  flush();
  return NULL;
}
这就是log线程执行的动作。
1 m_stop来控制线程是否终止
2 如果m_new 这个队列不空,就调用flush,负责写入log
3 如果队列空了,条件等待,有新的log出现在队列上,会通知到这个线程
暂且不管如何控制线程、谁来通知log线程有新的log,直接看下线程的主要工作。flush的主要工作如下:
void Log::flush()
{
pthread_mutex_lock(&m_flush_mutex);
pthread_mutex_lock(&m_queue_mutex);
EntryQueue t;
t.swap(m_new);
pthread_cond_broadcast(&m_cond_loggers);
pthread_mutex_unlock(&m_queue_mutex);
_flush(&t, &m_recent, false);
// trim
//m_recent有一个最大值,超出了最大值,就从队列中删除最老的log,内存也就释放了
while (m_recent.m_len > m_max_recent) {
delete m_recent.dequeue();
}
pthread_mutex_unlock(&m_flush_mutex);
}
首先,创建一个临时队列,t,执行swap将m_new里面的log都接了过去。所谓接过去,不过是将指针指向队列内容的事情交给临时队列,m_new 头指针和尾指针置成NULL
做了这件事之后,m_new又变成了空的队列,好心地给其他线程发了广播之后,就可以解锁m_queue_mutex互斥量了。
swap操作比较简单,就是交换指针的彼此指向:
// Exchange the contents (length, head and tail pointers) of this queue and
// `other`. No entries are copied; only the three fields change hands.
void swap(EntryQueue& other) {
  const int other_len = other.m_len;
  other.m_len = m_len;
  m_len = other_len;

  struct Entry *other_head = other.m_head;
  other.m_head = m_head;
  m_head = other_head;

  struct Entry *other_tail = other.m_tail;
  other.m_tail = m_tail;
  m_tail = other_tail;
}
然后将主要的写日志的工作委托给了_flush函数。
可以看出,有的log需要写入日志文件,有的需要写入syslog,有些日志需要写入stderr
同时,日志还有一个优先级的概念,因此会根据should_log 来控制。
/**
 * Drain `t`, emitting each entry to the enabled sinks and then moving it
 * to `requeue`.
 *
 * An entry is emitted when `crash` is true or when its subsystem's log
 * level is at least the entry's priority. It then goes to the log file
 * (if m_fd >= 0), to syslog (if m_syslog_crash >= priority) and to stderr
 * (if m_stderr_crash >= priority). Every entry, emitted or not, ends up on
 * `requeue`.
 *
 * Each emitted line is prefixed with "<timestamp> <thread id in hex> <prio> ";
 * in crash mode the prefix starts with the negated number of entries still
 * left in `t`, followed by "> ".
 *
 * @param t        queue to drain.
 * @param requeue  queue that receives every drained entry.
 * @param crash    emit regardless of subsystem level and add the countdown.
 */
void Log::_flush(EntryQueue *t, EntryQueue *requeue, bool crash)
{
  Entry *e;
  char buf[80];
  while ((e = t->dequeue()) != NULL) {
    unsigned sub = e->m_subsys;

    bool should_log = crash || m_subs->get_log_level(sub) >= e->m_prio;
    bool do_fd = m_fd >= 0 && should_log;
    bool do_syslog = m_syslog_crash >= e->m_prio && should_log;
    bool do_stderr = m_stderr_crash >= e->m_prio && should_log;

    if (do_fd || do_syslog || do_stderr) {
      int buflen = 0;

      if (crash)
        buflen += snprintf(buf, sizeof(buf), "%6d> ", -t->m_len);
      buflen += e->m_stamp.sprintf(buf + buflen, sizeof(buf)-buflen);
      buflen += snprintf(buf + buflen, sizeof(buf)-buflen, " %lx %2d ",
                         (unsigned long)e->m_thread, e->m_prio);

      // FIXME: this is slow
      string s = e->get_str();

      if (do_fd) {
        // Prefix, message and newline are written separately; stop at the
        // first failure.
        int r = safe_write(m_fd, buf, buflen);
        if (r >= 0)
          r = safe_write(m_fd, s.data(), s.size());
        if (r >= 0)
          r = write(m_fd, "\n", 1);
        // NOTE(review): write() reports failure as -1 with the cause in errno,
        // so the text printed for a failed newline write may not name the
        // real error -- confirm what safe_write/cpp_strerror expect.
        if (r < 0)
          cerr << "problem writing to " << m_log_file << ": " << cpp_strerror(r) << std::endl;
      }
      if (do_syslog) {
        syslog(LOG_USER, "%s%s", buf, s.c_str());
      }
      if (do_stderr) {
        cerr << buf << s << std::endl;
      }
    }

    requeue->enqueue(e);
  }
}
接下来是common字段,日志总免不了要记录消息的发生时间:
// Append the entry's timestamp to the prefix buffer, after the bytes already written.
buflen += e->m_stamp.sprintf(buf + buflen, sizeof(buf)-buflen);
这个m_stamp是utime_t类型,他的sprintf实现如下:
/**
 * Format this timestamp as local time, "YYYY-MM-DD HH:MM:SS.uuuuuu".
 *
 * @param out    destination buffer.
 * @param outlen size of `out` in bytes.
 * @return the value returned by snprintf().
 */
int sprintf(char *out, int outlen) const {
  const time_t whole_seconds = sec();
  struct tm local_parts;
  localtime_r(&whole_seconds, &local_parts);

  // struct tm counts years from 1900 and months from 0.
  const int year = local_parts.tm_year + 1900;
  const int month = local_parts.tm_mon + 1;

  return snprintf(out, outlen,
                  "%04d-%02d-%02d %02d:%02d:%02d.%06ld",
                  year, month, local_parts.tm_mday,
                  local_parts.tm_hour, local_parts.tm_min, local_parts.tm_sec,
                  usec());
}
因此log的前缀是
2015-05-29 10:20:45.076105
接下来的字段是,线程的线程ID以及 消息的优先级,所谓线程ID并不是调度意义上的ID,而是pthread_self的返回值那个ID
// Append " <thread id in hex> <priority> " to the prefix buffer.
buflen += snprintf(buf + buflen, sizeof(buf)-buflen, " %lx %2d ",
                   (unsigned long)e->m_thread, e->m_prio);
ceph-mon是多线程的程序:
root@test3:/var/log/ceph# pidof ceph--mon
root@test3:/var/log/ceph# pidof ceph-mon
138812
root@test3:/var/log/ceph#
root@test3:/var/log/ceph# ll /proc/138812/task
total 0
dr-xr-xr-x 23 root root 0 May 29 16:17 ./
dr-xr-xr-x 9 root root 0 May 29 16:11 ../
dr-xr-xr-x 6 root root 0 May 29 16:17
138812/
dr-xr-xr-x 6 root root 0 May 29 16:17
138813/
dr-xr-xr-x 6 root root 0 May 29 16:17
138814/
dr-xr-xr-x 6 root root 0 May 29 16:17
138815/
dr-xr-xr-x 6 root root 0 May 29 16:17
138816/
dr-xr-xr-x 6 root root 0 May 29 16:17
138817/
dr-xr-xr-x 6 root root 0 May 29 16:17
138818/
dr-xr-xr-x 6 root root 0 May 29 16:17
138819/
dr-xr-xr-x 6 root root 0 May 29 16:17
138820/
dr-xr-xr-x 6 root root 0 May 29 16:17
138821/
dr-xr-xr-x 6 root root 0 May 29 16:17
138822/
dr-xr-xr-x 6 root root 0 May 29 16:17
138823/
dr-xr-xr-x 6 root root 0 May 29 16:17
138824/
dr-xr-xr-x 6 root root 0 May 29 16:17
139643/
dr-xr-xr-x 6 root root 0 May 29 16:17
139895/
dr-xr-xr-x 6 root root 0 May 29 16:17
139896/
dr-xr-xr-x 6 root root 0 May 29 16:17
143413/
dr-xr-xr-x 6 root root 0 May 30 16:47
78042/
dr-xr-xr-x 6 root root 0 May 30 16:47
78044/
dr-xr-xr-x 6 root root 0 May 30 17:08
90496/
dr-xr-xr-x 6 root root 0 May 30 17:08
90497/
因此,我们可以取来一条log查看下ceph log的前缀:
2015-05-29 10:20:45.076105
7fa1c6a1a700 0 mon.jnqfg@2(peon).data_health(30) update_stats
avail 92% total 95990980 used 2183384 avail 88908400
值得一提的是,ceph维护了一个m_recent队列,所有的消息都会存放到该队列中去,哪怕优先级比较低,不会打印到日志文件中去。
这就是_flush 函数中的 enqueue做的事情:
当然了,队列是有大小的限制,否则队列就会膨胀,导致内存耗尽。这个默认的限制是10000.
// Dequeue and free entries until m_recent is back within m_max_recent.
while (m_recent.m_len > m_max_recent) {
delete m_recent.dequeue();
}
ceph提供了方法来查看最近的log,这就是log dump方法。
ceph daemon /var/run/ceph/ceph-mon.*asok
log dump
{}
表面看啥也没输出,实际上将log dump到了日志文件中,比如我们例子,在log文件中出现:
v2 ==== 42+0+0 (3105867923
0 0) 0x48d6bc0 con 0x3d5c9a0
-13> 2015-05-30
17:23:06.772212 7f5856cac700 10 mon.jnqfg@2(peon) e3
handle_subscribe mon_subscribe({monmap=4+,osdmap=0}) v2
-12> 2015-05-30
17:23:06.772217 7f5856cac700 10 mon.jnqfg@2(peon) e3
check_sub monmap next 4 have 3
-11> 2015-05-30
17:23:06.772222 7f5856cac700 10 mon.jnqfg@2(peon).osd
e260 check_sub 0x631bdc0 next 0 (onetime)
-10> 2015-05-30
17:23:06.773223 7f5856cac700 1 -- 10.16.20.183:6789/0 --> client.? 10.16.20.181:0/1138587 -- osd_map(260..260
src has 1..260) v3 -- ?+0
0x4230b40
-9> 2015-05-30
17:23:06.773239 7f5856cac700 1 -- 10.16.20.183:6789/0 --> client.4442663
10.16.20.181:0/1138587 -- mon_subscribe_ack(300s) v1 -- ?+0
0x48d6680
-8> 2015-05-30
17:23:06.773586 7f5856cac700 1 -- 10.16.20.183:6789/0 <== client.4442663
10.16.20.181:0/1138587
4 ==== mon_subscribe({monmap=4+,osdmap=0}) v2 ==== 42+0+0 (3105867923
0 0) 0x48d5180 con 0x3d5c9a0
-7> 2015-05-30
17:23:06.773607 7f5856cac700 10 mon.jnqfg@2(peon) e3
handle_subscribe mon_subscribe({monmap=4+,osdmap=0}) v2
-6> 2015-05-30
17:23:06.773613 7f5856cac700 10 mon.jnqfg@2(peon) e3
check_sub monmap next 4 have 3
-5> 2015-05-30
17:23:06.773620 7f5856cac700 10 mon.jnqfg@2(peon).osd
e260 check_sub 0x631a7c0 next 0 (onetime)
-4> 2015-05-30
17:23:06.774626 7f5856cac700 1 -- 10.16.20.183:6789/0 --> client.? 10.16.20.181:0/1138587 -- osd_map(260..260
src has 1..260) v3 -- ?+0
0x4232ac0
-3> 2015-05-30
17:23:06.774641 7f5856cac700 1 -- 10.16.20.183:6789/0 --> client.4442663
10.16.20.181:0/1138587 -- mon_subscribe_ack(300s) v1 -- ?+0
0x48d6bc0
-2> 2015-05-30
17:23:06.775345 7f5856cac700 1 -- 10.16.20.183:6789/0 <== client.4442663
10.16.20.181:0/1138587
5 ==== mon_command({"prefix": "get_command_descriptions"} v
0) v1 ==== 80+0+0 (3374501561
0 0) 0x477f0e0 con 0x3d5c9a0
-1> 2015-05-30
17:23:06.776789 7f5856cac700 1 -- 10.16.20.183:6789/0 --> 10.16.20.181:0/1138587 -- mon_command_ack([{"prefix": "get_command_descriptions"}]=0
v0) v1 -- ?+24689
0x3e62760 con 0x3d5c9a0
0> 2015-05-30 17:23:06.821389
7f5859662700 1 do_command 'log dump'
很有意思的是,ceph如何做到 ceph daemon /var/run/ceph/ceph-mon.*asok log dump 就把log dump到日志文件的。 这就牵扯到admin socket机制了。
admin socket 机制并不是一个非常新的机制,这是一个很common的设计。
很多进程在设计的时候,需要提供一种手段,能够在运行时接收用户的指令。不能将程序设计成一个黑匣子,相反,要提供一些手段,使用户可以干预进程的运行(如改变配置项),或者可以获取到进程运行的状态信息。
admin socket机制,那是下一篇的任务。
每一个大型的项目,都必须设计log:log既是重要的调试手段,也是很好的学习入口。跟踪log可以让新手快速理解代码,分析log可以帮助工程师很好地定位问题。
下面通过跟踪ceph-mon这个可执行文件,了解ceph中的log实现。
ceph_mon 初始化中 会调用global_init, global_init一开始就会调用 common_preinit函数
创建一个重要的数据结构CephContext cct。
/**
 * Construct the per-process Ceph context.
 *
 * The initializer list allocates the configuration object and sets the
 * remaining pointer members to NULL; the body then creates the subsystems
 * one by one. The logger is created and started before anything else so
 * that it exists by the time the other subsystems are constructed.
 *
 * @param module_type_ value stored in _module_type.
 */
CephContext::CephContext(uint32_t module_type_)
  : nref(1),
    _conf(new md_config_t()),
    _log(NULL),
    _module_type(module_type_),
    _service_thread(NULL),
    _log_obs(NULL),
    _admin_socket(NULL),
    _perf_counters_collection(NULL),
    _perf_counters_conf_obs(NULL),
    _heartbeat_map(NULL),
    _crypto_none(NULL),
    _crypto_aes(NULL)
{
  pthread_spin_init(&_service_thread_lock, PTHREAD_PROCESS_SHARED);

  // Create the logger on top of the configuration's subsystem map and
  // start its background thread.
  _log = new ceph::log::Log(&_conf->subsys);
  _log->start();

  // Attach an observer, constructed around the logger, to the configuration.
  // NOTE(review): presumably this applies log-related config changes -- confirm.
  _log_obs = new LogObs(_log);
  _conf->add_observer(_log_obs);

  _perf_counters_collection = new PerfCountersCollection(this);
  _admin_socket = new AdminSocket(this);
  _heartbeat_map = new HeartbeatMap(this);

  // Every admin-socket command registered below is routed to the same hook.
  _admin_hook = new CephContextHook(this);
  _admin_socket->register_command("perfcounters_dump", "perfcounters_dump", _admin_hook, "");
  _admin_socket->register_command("1", "1", _admin_hook, "");
  _admin_socket->register_command("perf dump", "perf dump", _admin_hook, "dump perfcounters value");
  _admin_socket->register_command("perfcounters_schema", "perfcounters_schema", _admin_hook, "");
  _admin_socket->register_command("2", "2", _admin_hook, "");
  _admin_socket->register_command("perf schema", "perf schema", _admin_hook, "dump perfcounters schema");
  _admin_socket->register_command("config show", "config show", _admin_hook, "dump current config settings");
  // NOTE(review): the <field>/<val> placeholders in the next two help strings
  // were restored; they look like they were stripped as HTML tags when the
  // snippet was published -- confirm against the upstream source.
  _admin_socket->register_command("config set", "config set name=var,type=CephString name=val,type=CephString,n=N", _admin_hook, "config set <field> <val> [<val> ...]: set a config variable");
  _admin_socket->register_command("config get", "config get name=var,type=CephString", _admin_hook, "config get <field>: get the config value");
  _admin_socket->register_command("log flush", "log flush", _admin_hook, "flush log entries to log file");
  _admin_socket->register_command("log dump", "log dump", _admin_hook, "dump recent log entries to log file");
  _admin_socket->register_command("log reopen", "log reopen", _admin_hook, "reopen log file");

  _crypto_none = new CryptoNone;
  _crypto_aes = new CryptoAES;
}
这个函数并不长,但是绝不简单,本文就是介绍Log线程
// Allocate the logger, handing it the subsystem map held by the configuration.
_log = new ceph::log::Log(&_conf->subsys);
// Start the logger's thread (see Log::start).
_log->start();
首先是创建了成员变量_log
第一句 new 是创建了Log实例:基本就是初始化
1 初始化自旋锁 m_lock
2 初始化互斥量 m_flush_mutex 和m_queue_mutex
3 初始化条件变量 m_cond_loggers 和m_cond_flusher
/**
 * Build a Log that consults the subsystem map `s`.
 *
 * Starts with no log file (m_fd == -1), empty queues and the default queue
 * limits, then initializes the spinlock, the two mutexes and the two
 * condition variables. Each initialization is asserted to succeed.
 *
 * @param s subsystem map stored in m_subs.
 */
Log::Log(SubsystemMap *s)
  : m_indirect_this(NULL),
    m_subs(s),
    m_new(), m_recent(),
    m_fd(-1),
    m_syslog_log(-2), m_syslog_crash(-2),
    m_stderr_log(1), m_stderr_crash(-1),
    m_stop(false),
    m_max_new(DEFAULT_MAX_NEW),
    m_max_recent(DEFAULT_MAX_RECENT)
{
  // The calls stay outside assert() so they still run when asserts are
  // compiled out.
  const int spin_rc = pthread_spin_init(&m_lock, PTHREAD_PROCESS_SHARED);
  assert(spin_rc == 0);

  const int flush_mutex_rc = pthread_mutex_init(&m_flush_mutex, NULL);
  assert(flush_mutex_rc == 0);

  const int queue_mutex_rc = pthread_mutex_init(&m_queue_mutex, NULL);
  assert(queue_mutex_rc == 0);

  const int loggers_cond_rc = pthread_cond_init(&m_cond_loggers, NULL);
  assert(loggers_cond_rc == 0);

  const int flusher_cond_rc = pthread_cond_init(&m_cond_flusher, NULL);
  assert(flusher_cond_rc == 0);

  // kludge for prealloc testing (permanently disabled by the constant condition)
  if (false) {
    for (int n = 0; n < PREALLOC; ++n) {
      m_recent.enqueue(new Entry);
    }
  }
}
接下来启动了log线程:
// Start the logging thread. Asserts that it has not been started already.
void Log::start()
{
assert(!is_started());
// Reset the stop flag while holding the queue mutex; entry() reads m_stop
// under the same mutex.
pthread_mutex_lock(&m_queue_mutex);
m_stop = false;
pthread_mutex_unlock(&m_queue_mutex);
// create() comes from the Thread base class and spawns the thread.
create();
}
关键一句是create,这个create从哪里冒出来的? 这个调用的是 Thread里面的create 方法:
/**
 * Try to start the thread; the new thread runs _entry_func(this).
 *
 * @param stacksize requested stack size in bytes. It is masked with
 *        CEPH_PAGE_MASK; if the result is 0 the thread is created with
 *        default attributes.
 * @return 0 on success. On failure a non-zero value: the error code from
 *         pthread_create(), or the negated error code if the attribute
 *         object could not be initialized (the same sign the previous
 *         -ENOMEM allocation-failure path used).
 */
int Thread::try_create(size_t stacksize)
{
  // The attribute object lives on the stack, so no heap allocation is needed
  // and it can be released with pthread_attr_destroy() once the thread exists.
  pthread_attr_t attr_storage;
  pthread_attr_t *thread_attr = NULL;
  stacksize &= CEPH_PAGE_MASK;  // must be multiple of page
  if (stacksize) {
    int attr_rc = pthread_attr_init(&attr_storage);
    if (attr_rc != 0)
      return -attr_rc;
    thread_attr = &attr_storage;
    // Best effort, as before: if the size is rejected the attribute keeps
    // its default stack size.
    pthread_attr_setstacksize(thread_attr, stacksize);
  }

  int r;

  // The child thread will inherit our signal mask.  Set our signal mask to
  // the set of signals we want to block.  (It's ok to block more signals
  // than usual for a little while -- they will just be delivered to
  // another thread or delivered to this thread later.)
  sigset_t old_sigset;
  if (g_code_env == CODE_ENVIRONMENT_LIBRARY) {
    // NOTE(review): a NULL list presumably means "block everything" -- confirm.
    block_signals(NULL, &old_sigset);
  }
  else {
    int to_block[] = { SIGPIPE , 0 };
    block_signals(to_block, &old_sigset);
  }
  r = pthread_create(&thread_id, thread_attr, _entry_func, (void*)this);
  restore_sigset(&old_sigset);

  if (thread_attr)
    pthread_attr_destroy(thread_attr);

  return r;
}
/**
 * Start the thread, treating failure as fatal.
 *
 * Calls try_create(); if that returns non-zero, the error is reported
 * through dout_emergency() and the assertion then fails.
 *
 * @param stacksize forwarded unchanged to try_create().
 */
void Thread::create(size_t stacksize)
{
  int ret = try_create(stacksize);
  if (ret != 0) {
    char buf[256];
    snprintf(buf, sizeof(buf), "Thread::try_create(): pthread_create "
             "failed with error %d", ret);
    dout_emergency(buf);
    assert(ret == 0);
  }
}
创建线程,该方法做了下面的事情:
1 可以设置线程栈的size
2 对于daemon进程,创建线程前会阻塞SIGPIPE
3 调用pthread_create函数创建线程
等一下,线程执行什么函数? _entry_func到底是什么?
void *Thread::_entry_func(void *arg) {
void *r = ((Thread*)arg)->entry();
return r;
}
也就是说,线程入口_entry_func会把pthread_create的第四个参数(即this指针)转换成Thread*,再调用它的entry()方法;对Log线程而言,执行的就是Log::entry:
namespace ceph {
namespace log {

/**
 * Logger that runs on its own thread: it derives privately from Thread and
 * supplies entry() as the thread body.
 */
class Log : private Thread
{
  Log **m_indirect_this;
  SubsystemMap *m_subs;           // subsystem map given to the constructor

  pthread_spinlock_t m_lock;
  pthread_mutex_t m_queue_mutex;  // held while touching m_new and m_stop
  pthread_mutex_t m_flush_mutex;  // held for the whole of flush()
  pthread_cond_t m_cond_loggers;  // broadcast by flush() once m_new has been emptied
  pthread_cond_t m_cond_flusher;  // waited on by entry() while m_new is empty

  EntryQueue m_new;    ///< new entries
  EntryQueue m_recent; ///< recent (less new) entries we've already written at low detail

  std::string m_log_file;
  int m_fd;            // log file descriptor; only written to when >= 0

  int m_syslog_log, m_syslog_crash;
  int m_stderr_log, m_stderr_crash;

  bool m_stop;         // makes entry() leave its loop when true

  int m_max_new, m_max_recent;

  // Thread body: flushes m_new whenever it is non-empty, until m_stop is set.
  void *entry();

  // Writes out every entry of `q` that passes the level checks, then moves
  // the entry to `requeue`.
  void _flush(EntryQueue *q, EntryQueue *requeue, bool crash);

  void _log_message(const char *s, bool crash);

public:
  Log(SubsystemMap *s);
  virtual ~Log();

  void set_flush_on_exit();

  void set_max_new(int n);
  void set_max_recent(int n);
  void set_log_file(std::string fn);
  void reopen_log_file();

  void flush();
  void dump_recent();

  void set_syslog_level(int log, int crash);
  void set_stderr_level(int log, int crash);

  Entry *create_entry(int level, int subsys);
  void submit_entry(Entry *e);

  void start();
  void stop();
};

}
}
即Log的类中,有成员函数 entry,即线程应该执行的函数:
/**
 * Body of the logging thread.
 *
 * Until m_stop is set, flushes whenever m_new holds entries and otherwise
 * sleeps on m_cond_flusher. One last flush runs after the loop ends.
 *
 * @return always NULL.
 */
void *Log::entry()
{
  pthread_mutex_lock(&m_queue_mutex);
  while (!m_stop) {
    if (m_new.empty()) {
      // Nothing queued: wait (releasing the queue mutex) until signalled.
      pthread_cond_wait(&m_cond_flusher, &m_queue_mutex);
    } else {
      // flush() takes the queue mutex itself, so release it around the call.
      pthread_mutex_unlock(&m_queue_mutex);
      flush();
      pthread_mutex_lock(&m_queue_mutex);
    }
  }
  pthread_mutex_unlock(&m_queue_mutex);

  // Drain whatever was queued before the stop flag was seen.
  flush();
  return NULL;
}
这就是log线程执行的动作。
1 m_stop来控制线程是否终止
2 如果m_new 这个队列不空,就调用flush,负责写入log
3 如果队列空了,条件等待,有新的log出现在队列上,会通知到这个线程
暂且不管如何控制线程、谁来通知log线程有新的log,直接看下线程的主要工作。flush的主要工作如下:
void Log::flush()
{
pthread_mutex_lock(&m_flush_mutex);
pthread_mutex_lock(&m_queue_mutex);
EntryQueue t;
t.swap(m_new);
pthread_cond_broadcast(&m_cond_loggers);
pthread_mutex_unlock(&m_queue_mutex);
_flush(&t, &m_recent, false);
// trim
//m_recent有一个最大值,超出了最大值,就从队列中删除最老的log,内存也就释放了
while (m_recent.m_len > m_max_recent) {
delete m_recent.dequeue();
}
pthread_mutex_unlock(&m_flush_mutex);
}
首先,创建一个临时队列,t,执行swap将m_new里面的log都接了过去。所谓接过去,不过是将指针指向队列内容的事情交给临时队列,m_new 头指针和尾指针置成NULL
做了这件事之后,m_new又变成了空的队列,好心地给其他线程发了广播之后,就可以解锁m_queue_mutex互斥量了。
swap操作比较简单,就是交换指针的彼此指向:
// Exchange the contents (length, head and tail pointers) of this queue and
// `other`. No entries are copied; only the three fields change hands.
void swap(EntryQueue& other) {
  const int other_len = other.m_len;
  other.m_len = m_len;
  m_len = other_len;

  struct Entry *other_head = other.m_head;
  other.m_head = m_head;
  m_head = other_head;

  struct Entry *other_tail = other.m_tail;
  other.m_tail = m_tail;
  m_tail = other_tail;
}
然后将主要的写日志的工作委托给了_flush函数。
可以看出,有的log需要写入日志文件,有的需要写入syslog,有些日志需要写入stderr
同时,日志还有一个优先级的概念,因此会根据should_log 来控制。
/**
 * Drain `t`, emitting each entry to the enabled sinks and then moving it
 * to `requeue`.
 *
 * An entry is emitted when `crash` is true or when its subsystem's log
 * level is at least the entry's priority. It then goes to the log file
 * (if m_fd >= 0), to syslog (if m_syslog_crash >= priority) and to stderr
 * (if m_stderr_crash >= priority). Every entry, emitted or not, ends up on
 * `requeue`.
 *
 * Each emitted line is prefixed with "<timestamp> <thread id in hex> <prio> ";
 * in crash mode the prefix starts with the negated number of entries still
 * left in `t`, followed by "> ".
 *
 * @param t        queue to drain.
 * @param requeue  queue that receives every drained entry.
 * @param crash    emit regardless of subsystem level and add the countdown.
 */
void Log::_flush(EntryQueue *t, EntryQueue *requeue, bool crash)
{
  Entry *e;
  char buf[80];
  while ((e = t->dequeue()) != NULL) {
    unsigned sub = e->m_subsys;

    bool should_log = crash || m_subs->get_log_level(sub) >= e->m_prio;
    bool do_fd = m_fd >= 0 && should_log;
    bool do_syslog = m_syslog_crash >= e->m_prio && should_log;
    bool do_stderr = m_stderr_crash >= e->m_prio && should_log;

    if (do_fd || do_syslog || do_stderr) {
      int buflen = 0;

      if (crash)
        buflen += snprintf(buf, sizeof(buf), "%6d> ", -t->m_len);
      buflen += e->m_stamp.sprintf(buf + buflen, sizeof(buf)-buflen);
      buflen += snprintf(buf + buflen, sizeof(buf)-buflen, " %lx %2d ",
                         (unsigned long)e->m_thread, e->m_prio);

      // FIXME: this is slow
      string s = e->get_str();

      if (do_fd) {
        // Prefix, message and newline are written separately; stop at the
        // first failure.
        int r = safe_write(m_fd, buf, buflen);
        if (r >= 0)
          r = safe_write(m_fd, s.data(), s.size());
        if (r >= 0)
          r = write(m_fd, "\n", 1);
        // NOTE(review): write() reports failure as -1 with the cause in errno,
        // so the text printed for a failed newline write may not name the
        // real error -- confirm what safe_write/cpp_strerror expect.
        if (r < 0)
          cerr << "problem writing to " << m_log_file << ": " << cpp_strerror(r) << std::endl;
      }
      if (do_syslog) {
        syslog(LOG_USER, "%s%s", buf, s.c_str());
      }
      if (do_stderr) {
        cerr << buf << s << std::endl;
      }
    }

    requeue->enqueue(e);
  }
}
接下来是common字段,日志总免不了要记录消息的发生时间:
// Append the entry's timestamp to the prefix buffer, after the bytes already written.
buflen += e->m_stamp.sprintf(buf + buflen, sizeof(buf)-buflen);
这个m_stamp是utime_t类型,他的sprintf实现如下:
/**
 * Format this timestamp as local time, "YYYY-MM-DD HH:MM:SS.uuuuuu".
 *
 * @param out    destination buffer.
 * @param outlen size of `out` in bytes.
 * @return the value returned by snprintf().
 */
int sprintf(char *out, int outlen) const {
  const time_t whole_seconds = sec();
  struct tm local_parts;
  localtime_r(&whole_seconds, &local_parts);

  // struct tm counts years from 1900 and months from 0.
  const int year = local_parts.tm_year + 1900;
  const int month = local_parts.tm_mon + 1;

  return snprintf(out, outlen,
                  "%04d-%02d-%02d %02d:%02d:%02d.%06ld",
                  year, month, local_parts.tm_mday,
                  local_parts.tm_hour, local_parts.tm_min, local_parts.tm_sec,
                  usec());
}
因此log的前缀是
2015-05-29 10:20:45.076105
接下来的字段是,线程的线程ID以及 消息的优先级,所谓线程ID并不是调度意义上的ID,而是pthread_self的返回值那个ID
// Append " <thread id in hex> <priority> " to the prefix buffer.
buflen += snprintf(buf + buflen, sizeof(buf)-buflen, " %lx %2d ",
                   (unsigned long)e->m_thread, e->m_prio);
ceph-mon是多线程的程序:
root@test3:/var/log/ceph# pidof ceph--mon
root@test3:/var/log/ceph# pidof ceph-mon
138812
root@test3:/var/log/ceph#
root@test3:/var/log/ceph# ll /proc/138812/task
total 0
dr-xr-xr-x 23 root root 0 May 29 16:17 ./
dr-xr-xr-x 9 root root 0 May 29 16:11 ../
dr-xr-xr-x 6 root root 0 May 29 16:17
138812/
dr-xr-xr-x 6 root root 0 May 29 16:17
138813/
dr-xr-xr-x 6 root root 0 May 29 16:17
138814/
dr-xr-xr-x 6 root root 0 May 29 16:17
138815/
dr-xr-xr-x 6 root root 0 May 29 16:17
138816/
dr-xr-xr-x 6 root root 0 May 29 16:17
138817/
dr-xr-xr-x 6 root root 0 May 29 16:17
138818/
dr-xr-xr-x 6 root root 0 May 29 16:17
138819/
dr-xr-xr-x 6 root root 0 May 29 16:17
138820/
dr-xr-xr-x 6 root root 0 May 29 16:17
138821/
dr-xr-xr-x 6 root root 0 May 29 16:17
138822/
dr-xr-xr-x 6 root root 0 May 29 16:17
138823/
dr-xr-xr-x 6 root root 0 May 29 16:17
138824/
dr-xr-xr-x 6 root root 0 May 29 16:17
139643/
dr-xr-xr-x 6 root root 0 May 29 16:17
139895/
dr-xr-xr-x 6 root root 0 May 29 16:17
139896/
dr-xr-xr-x 6 root root 0 May 29 16:17
143413/
dr-xr-xr-x 6 root root 0 May 30 16:47
78042/
dr-xr-xr-x 6 root root 0 May 30 16:47
78044/
dr-xr-xr-x 6 root root 0 May 30 17:08
90496/
dr-xr-xr-x 6 root root 0 May 30 17:08
90497/
因此,我们可以取来一条log查看下ceph log的前缀:
2015-05-29 10:20:45.076105
7fa1c6a1a700 0 mon.jnqfg@2(peon).data_health(30) update_stats
avail 92% total 95990980 used 2183384 avail 88908400
值得一提的是,ceph维护了一个m_recent队列,所有的消息都会存放到该队列中去,哪怕优先级比较低,不会打印到日志文件中去。
这就是_flush 函数中的 enqueue做的事情:
当然了,队列是有大小的限制,否则队列就会膨胀,导致内存耗尽。这个默认的限制是10000.
// Dequeue and free entries until m_recent is back within m_max_recent.
while (m_recent.m_len > m_max_recent) {
delete m_recent.dequeue();
}
ceph提供了方法来查看最近的log,这就是log dump方法。
ceph daemon /var/run/ceph/ceph-mon.*asok
log dump
{}
表面看啥也没输出,实际上将log dump到了日志文件中,比如我们例子,在log文件中出现:
v2 ==== 42+0+0 (3105867923
0 0) 0x48d6bc0 con 0x3d5c9a0
-13> 2015-05-30
17:23:06.772212 7f5856cac700 10 mon.jnqfg@2(peon) e3
handle_subscribe mon_subscribe({monmap=4+,osdmap=0}) v2
-12> 2015-05-30
17:23:06.772217 7f5856cac700 10 mon.jnqfg@2(peon) e3
check_sub monmap next 4 have 3
-11> 2015-05-30
17:23:06.772222 7f5856cac700 10 mon.jnqfg@2(peon).osd
e260 check_sub 0x631bdc0 next 0 (onetime)
-10> 2015-05-30
17:23:06.773223 7f5856cac700 1 -- 10.16.20.183:6789/0 --> client.? 10.16.20.181:0/1138587 -- osd_map(260..260
src has 1..260) v3 -- ?+0
0x4230b40
-9> 2015-05-30
17:23:06.773239 7f5856cac700 1 -- 10.16.20.183:6789/0 --> client.4442663
10.16.20.181:0/1138587 -- mon_subscribe_ack(300s) v1 -- ?+0
0x48d6680
-8> 2015-05-30
17:23:06.773586 7f5856cac700 1 -- 10.16.20.183:6789/0 <== client.4442663
10.16.20.181:0/1138587
4 ==== mon_subscribe({monmap=4+,osdmap=0}) v2 ==== 42+0+0 (3105867923
0 0) 0x48d5180 con 0x3d5c9a0
-7> 2015-05-30
17:23:06.773607 7f5856cac700 10 mon.jnqfg@2(peon) e3
handle_subscribe mon_subscribe({monmap=4+,osdmap=0}) v2
-6> 2015-05-30
17:23:06.773613 7f5856cac700 10 mon.jnqfg@2(peon) e3
check_sub monmap next 4 have 3
-5> 2015-05-30
17:23:06.773620 7f5856cac700 10 mon.jnqfg@2(peon).osd
e260 check_sub 0x631a7c0 next 0 (onetime)
-4> 2015-05-30
17:23:06.774626 7f5856cac700 1 -- 10.16.20.183:6789/0 --> client.? 10.16.20.181:0/1138587 -- osd_map(260..260
src has 1..260) v3 -- ?+0
0x4232ac0
-3> 2015-05-30
17:23:06.774641 7f5856cac700 1 -- 10.16.20.183:6789/0 --> client.4442663
10.16.20.181:0/1138587 -- mon_subscribe_ack(300s) v1 -- ?+0
0x48d6bc0
-2> 2015-05-30
17:23:06.775345 7f5856cac700 1 -- 10.16.20.183:6789/0 <== client.4442663
10.16.20.181:0/1138587
5 ==== mon_command({"prefix": "get_command_descriptions"} v
0) v1 ==== 80+0+0 (3374501561
0 0) 0x477f0e0 con 0x3d5c9a0
-1> 2015-05-30
17:23:06.776789 7f5856cac700 1 -- 10.16.20.183:6789/0 --> 10.16.20.181:0/1138587 -- mon_command_ack([{"prefix": "get_command_descriptions"}]=0
v0) v1 -- ?+24689
0x3e62760 con 0x3d5c9a0
0> 2015-05-30 17:23:06.821389
7f5859662700 1 do_command 'log dump'
很有意思的是,ceph如何做到 ceph daemon /var/run/ceph/ceph-mon.*asok log dump 就把log dump到日志文件的。 这就牵扯到admin socket机制了。
admin socket 机制并不是一个非常新的机制,这是一个很common的设计。
很多进程在设计的时候,需要提供一种手段,能够在运行时接收用户的指令。不能将程序设计成一个黑匣子,相反,要提供一些手段,使用户可以干预进程的运行(如改变配置项),或者可以获取到进程运行的状态信息。
admin socket机制,那是下一篇的任务。
相关文章推荐
- Android中Parcelable用法
- Swift场景过渡总结
- NodeJS+Redis实现分布式Session方案
- Mysql Replication机制主从备份实践
- 【数组】Search Insert Position
- JS对于字符串的切割截取
- 【Android】在线程中使用Handler
- 初识Camera,调用系统拍照录像程序
- 服务治理过程演进
- Android Activity ConfigChanges属性
- vs2013编译boost库
- 使用Cookie进行会话管理
- mysql 一些日期相关函数
- 自定义MonthPicker
- Redis中hash表中的field的value自增可以用hincrby
- 使用Android Studio调试UiAutomator过程中遇到的问题
- python中join在2.X版本中的使用
- QT for windows 32下libusb-win32环境搭建
- 01.C#基本使用
- Java Transaction API JAVA事务API