您的位置:首页 > 其它

ceph源码分析之Log实现

2016-01-07 16:08 267 查看
转自 http://blog.chinaunix.net/uid-24774106-id-5059292.html
每一个大型的项目,都会必须要设计log,log是重要的调试手段,也是很好的学习入口。跟踪log可以让一个新手快速的理解代码,分析log可以帮助工程师很好的定位问题。

下面通过跟踪ceph-mon这个可执行文件,了解ceph中的log实现。

ceph_mon 初始化中 会调用global_init, global_init一开始就会调用 common_preinit函数

创建一个重要的数据结构CephContext cct。

CephContext::CephContext(uint32_t
module_type_)

: nref(1),

_conf(new md_config_t()),

_log(NULL),

_module_type(module_type_),

_service_thread(NULL),

_log_obs(NULL),

_admin_socket(NULL),

_perf_counters_collection(NULL),

_perf_counters_conf_obs(NULL),

_heartbeat_map(NULL),

_crypto_none(NULL),

_crypto_aes(NULL)

{

pthread_spin_init(&_service_thread_lock, PTHREAD_PROCESS_SHARED);

_log = new ceph::log::Log(&_conf->subsys);

_log->start();

_log_obs = new LogObs(_log);

_conf->add_observer(_log_obs);

_perf_counters_collection = new PerfCountersCollection(this);

_admin_socket = new AdminSocket(this);

_heartbeat_map = new HeartbeatMap(this);

_admin_hook = new CephContextHook(this);

_admin_socket->register_command("perfcounters_dump", "perfcounters_dump", _admin_hook, "");

_admin_socket->register_command("1", "1", _admin_hook, "");

_admin_socket->register_command("perf
dump", "perf dump", _admin_hook, "dump
perfcounters value");

_admin_socket->register_command("perfcounters_schema", "perfcounters_schema", _admin_hook, "");

_admin_socket->register_command("2", "2", _admin_hook, "");

_admin_socket->register_command("perf
schema", "perf schema", _admin_hook, "dump
perfcounters schema");

_admin_socket->register_command("config
show", "config show", _admin_hook, "dump
current config settings");

_admin_socket->register_command("config
set", "config set name=var,type=CephString name=val,type=CephString,n=N", _admin_hook, "config
set [ ...]: set a config variable");

_admin_socket->register_command("config
get", "config get name=var,type=CephString", _admin_hook, "config
get : get the config value");

_admin_socket->register_command("log
flush", "log flush", _admin_hook, "flush
log entries to log file");

_admin_socket->register_command("log
dump", "log dump", _admin_hook, "dump
recent log entries to log file");

_admin_socket->register_command("log
reopen", "log reopen", _admin_hook, "reopen
log file");

_crypto_none = new CryptoNone;

_crypto_aes = new CryptoAES;

}

这个函数并不长,但是绝不简单,本文就是介绍Log线程

_log = new ceph::log::Log(&_conf->subsys);

_log->start();

首先是创建了成员变量_log

第一句 new 是创建了Log实例:基本就是初始化

1 初始化自旋锁 m_lock

2 初始化互斥量 m_flush_mutex 和m_queue_mutex

3 初始化条件变量 m_con_logger 和m_cond_flusher

Log::Log(SubsystemMap *s)

: m_indirect_this(NULL),

m_subs(s),

m_new(), m_recent(),

m_fd(-1),

m_syslog_log(-2), m_syslog_crash(-2),

m_stderr_log(1), m_stderr_crash(-1),

m_stop(false),

m_max_new(DEFAULT_MAX_NEW),

m_max_recent(DEFAULT_MAX_RECENT)

{

int ret;

ret = pthread_spin_init(&m_lock, PTHREAD_PROCESS_SHARED);

assert(ret == 0);

ret = pthread_mutex_init(&m_flush_mutex, NULL);

assert(ret == 0);

ret = pthread_mutex_init(&m_queue_mutex, NULL);

assert(ret == 0);

ret = pthread_cond_init(&m_cond_loggers, NULL);

assert(ret == 0);

ret = pthread_cond_init(&m_cond_flusher, NULL);

assert(ret == 0);

// kludge for prealloc testing

if (false)

for (int i=0; i < PREALLOC; i++)

m_recent.enqueue(new Entry);

}

接下来启动了log线程:

void Log::start()

{

assert(!is_started());

pthread_mutex_lock(&m_queue_mutex);

m_stop = false;

pthread_mutex_unlock(&m_queue_mutex);

create();

}

关键一句是create,这个create从哪里冒出来的? 这个调用的是 Thread里面的create 方法:

int Thread::try_create(size_t
stacksize)

{

pthread_attr_t *thread_attr = NULL;

stacksize &= CEPH_PAGE_MASK; // must
be multiple of page

if (stacksize) {

thread_attr = (pthread_attr_t*) malloc(sizeof(pthread_attr_t));

if (!thread_attr)

return -ENOMEM;

pthread_attr_init(thread_attr);

pthread_attr_setstacksize(thread_attr, stacksize);

}

int r;

// The child thread will inherit our signal mask. Set our
signal mask to

// the set of signals we want to block. (It's
ok to block signals more

// signals than usual for a little while-- they
will just be delivered to

// another thread or delieverd to this
thread later.)

sigset_t old_sigset;

if (g_code_env == CODE_ENVIRONMENT_LIBRARY) {

block_signals(NULL, &old_sigset);

}

else {

int to_block[] = { SIGPIPE , 0 };

block_signals(to_block, &old_sigset);

}

r = pthread_create(&thread_id, thread_attr, _entry_func, (void*)this);

restore_sigset(&old_sigset);

if (thread_attr)

free(thread_attr);

return r;

}

void Thread::create(size_t stacksize)

{

int ret = try_create(stacksize);

if (ret != 0) {

char buf[256];

snprintf(buf, sizeof(buf), "Thread::try_create():
pthread_create "

"failed with error %d", ret);

dout_emergency(buf);

assert(ret == 0);

}

}

创建线程,该方法做了下面的事情:

1 可以设置线程栈的size

2 对于daemon进程,会创建线程前会阻塞SIGPIPE

3 调用pthread_create函数创建线程

等一下,线程执行什么函数? _entry_func到底是什么?

void *Thread::_entry_func(void *arg) {

void *r = ((Thread*)arg)->entry();

return r;

}

也就是说线程执行的函数,记录在pthread_create的第四个变量的里面:

namespace ceph {

namespace log {

class Log : private Thread

{

Log **m_indirect_this;

SubsystemMap *m_subs;

pthread_spinlock_t m_lock;

pthread_mutex_t m_queue_mutex;

pthread_mutex_t m_flush_mutex;

pthread_cond_t m_cond_loggers;

pthread_cond_t m_cond_flusher;

EntryQueue m_new; ///< new
entries

EntryQueue m_recent; ///< recent (less
new) entries we've already written at low detail

std::string m_log_file;

int m_fd;

int m_syslog_log, m_syslog_crash;

int m_stderr_log, m_stderr_crash;

bool m_stop;

int m_max_new, m_max_recent;

void *entry();

void _flush(EntryQueue *q, EntryQueue *requeue, bool
crash);

void _log_message(const char *s, bool
crash);

public:

Log(SubsystemMap *s);

virtual ~Log();

void set_flush_on_exit();

void set_max_new(int n);

void set_max_recent(int n);

void set_log_file(std::string fn);

void reopen_log_file();

void flush();

void dump_recent();

void set_syslog_level(int log, int crash);

void set_stderr_level(int log, int crash);

Entry *create_entry(int level, int subsys);

void submit_entry(Entry *e);

void start();

void stop();

};

}

}

即Log的类中,有成员函数 entry,即线程应该执行的函数:

void *Log::entry()

{

pthread_mutex_lock(&m_queue_mutex);

while (!m_stop) {

if (!m_new.empty()) {

pthread_mutex_unlock(&m_queue_mutex);

flush();

pthread_mutex_lock(&m_queue_mutex);

continue;

}

pthread_cond_wait(&m_cond_flusher, &m_queue_mutex);

}

pthread_mutex_unlock(&m_queue_mutex);

flush();

return NULL;

}

这就是log线程执行的动作。
1 m_stop来控制线程是否终止
2 如果m_new 这个队列不空,就调用flush,负责写入log
3 如果队列空了,条件等待,有新的log出现在队列上,会通知到这个线程

暂且不管如果控制线程,谁来通知log线程有新的log,直接看下线程的主要工作,flush主要工作如下

void Log::flush()

{

pthread_mutex_lock(&m_flush_mutex);

pthread_mutex_lock(&m_queue_mutex);

EntryQueue t;

t.swap(m_new);

pthread_cond_broadcast(&m_cond_loggers);

pthread_mutex_unlock(&m_queue_mutex);

_flush(&t, &m_recent, false);

// trim

//m_recent有一个最大值,超出了最大值,就从队列中删除最老的log,内存也就释放了

while (m_recent.m_len > m_max_recent) {

delete m_recent.dequeue();

}

pthread_mutex_unlock(&m_flush_mutex);

}

首先,创建一个临时队列,t,执行swap将m_new里面的log都接了过去。所谓接过去,不过是将指针指向队列内容的事情交给临时队列,m_new 头指针和尾指针置成NULL
做了这件事之后,m_new又变成了空的队列,好心地给其他线程发了广播之后,就可以解锁m_queue_mutex互斥量了。

swap操作比较简单,就是交换指针的彼此指向:

void swap(EntryQueue& other) {

int len = m_len;

struct Entry *h = m_head, *t = m_tail;

m_len = other.m_len;

m_head = other.m_head;

m_tail = other.m_tail;

other.m_len = len;

other.m_head = h;

other.m_tail = t;

}

然后将主要的写日志的工作就委托给了__flush函数。

可以看出,有的log需要写入日志文件,有的需要写入syslog,有些日志需要写入stderr

同时,日志还有一个优先级的概念,因此会根据should_log 来控制。

void Log::_flush(EntryQueue *t, EntryQueue *requeue, bool
crash)

{

Entry *e;

char buf[80];

while ((e = t->dequeue()) != NULL) {

unsigned sub = e->m_subsys;

bool should_log = crash || m_subs->get_log_level(sub) >= e->m_prio;

bool do_fd = m_fd >= 0 && should_log;

bool do_syslog = m_syslog_crash >= e->m_prio && should_log;

bool do_stderr = m_stderr_crash >= e->m_prio && should_log;

if (do_fd || do_syslog || do_stderr) {

int buflen = 0;

if (crash)

buflen += snprintf(buf, sizeof(buf), "%6d>
", -t->m_len);

buflen += e->m_stamp.sprintf(buf + buflen, sizeof(buf)-buflen);

buflen += snprintf(buf + buflen, sizeof(buf)-buflen, "
%lx %2d ",

(unsigned long)e->m_thread, e->m_prio);

// FIXME: this is slow

string s = e->get_str();

if (do_fd) {

int r = safe_write(m_fd, buf, buflen);

if (r >= 0)

r = safe_write(m_fd, s.data(), s.size());

if (r >= 0)

r = write(m_fd, "\n", 1);

if (r < 0)

cerr << "problem writing to " << m_log_file << ":
" << cpp_strerror(r) << std::endl;

}

if (do_syslog) {

syslog(LOG_USER, "%s%s", buf, s.c_str());

}

if (do_stderr) {

cerr << buf << s << std::endl;

}

}

requeue->enqueue(e);

}

}

接下来是common字段,日志总免不了要记录消息的发生时间。:

buflen += e->m_stamp.sprintf(buf + buflen, sizeof(buf)-buflen);

这个m_stamp是utime_t类型,他的sprintf实现如下:

int sprintf(char *out, int outlen) const {

struct tm bdt;

time_t tt = sec();

localtime_r(&tt, &bdt);

return snprintf(out, outlen,

"%04d-%02d-%02d %02d:%02d:%02d.%06ld",

bdt.tm_year + 1900, bdt.tm_mon + 1, bdt.tm_mday,

bdt.tm_hour, bdt.tm_min, bdt.tm_sec, usec());

}

因此log的前缀是

2015-05-29 10:20:45.076105

接下来的字段是,线程的线程ID以及 消息的优先级,所谓线程ID并不是调度意义上的ID,而是pthread_self的返回值那个ID

buflen += snprintf(buf + buflen, sizeof(buf)-buflen, "
%lx %2d ",

(unsigned long)e->m_thread, e->m_prio);

ceph-mon是多线程的程序:

root@test3:/var/log/ceph# pidof ceph--mon

root@test3:/var/log/ceph# pidof ceph-mon

138812

root@test3:/var/log/ceph#

root@test3:/var/log/ceph# ll /proc/138812/task

total 0

dr-xr-xr-x 23 root root 0 May 29 16:17 ./

dr-xr-xr-x 9 root root 0 May 29 16:11 ../

dr-xr-xr-x 6 root root 0 May 29 16:17
138812/

dr-xr-xr-x 6 root root 0 May 29 16:17
138813/

dr-xr-xr-x 6 root root 0 May 29 16:17
138814/

dr-xr-xr-x 6 root root 0 May 29 16:17
138815/

dr-xr-xr-x 6 root root 0 May 29 16:17
138816/

dr-xr-xr-x 6 root root 0 May 29 16:17
138817/

dr-xr-xr-x 6 root root 0 May 29 16:17
138818/

dr-xr-xr-x 6 root root 0 May 29 16:17
138819/

dr-xr-xr-x 6 root root 0 May 29 16:17
138820/

dr-xr-xr-x 6 root root 0 May 29 16:17
138821/

dr-xr-xr-x 6 root root 0 May 29 16:17
138822/

dr-xr-xr-x 6 root root 0 May 29 16:17
138823/

dr-xr-xr-x 6 root root 0 May 29 16:17
138824/

dr-xr-xr-x 6 root root 0 May 29 16:17
139643/

dr-xr-xr-x 6 root root 0 May 29 16:17
139895/

dr-xr-xr-x 6 root root 0 May 29 16:17
139896/

dr-xr-xr-x 6 root root 0 May 29 16:17
143413/

dr-xr-xr-x 6 root root 0 May 30 16:47
78042/

dr-xr-xr-x 6 root root 0 May 30 16:47
78044/

dr-xr-xr-x 6 root root 0 May 30 17:08
90496/

dr-xr-xr-x 6 root root 0 May 30 17:08
90497/

因此,我们可以取来一条log查看下ceph log的前缀:

2015-05-29 10:20:45.076105
7fa1c6a1a700 0 mon.jnqfg@2(peon).data_health(30) update_stats
avail 92% total 95990980 used 2183384 avail 88908400

值得一提的是,ceph维护了一个m_recent队列,所有的消息都会存放到该队列中去,哪怕优先级比较低,不会打印到日志文件中去。

这就是_flush 函数中的 enqueue做的事情:

当然了,队列是有大小的限制,否则队列就会膨胀,导致内存耗尽。这个默认的限制是10000.

while (m_recent.m_len > m_max_recent) {

delete m_recent.dequeue();

}

ceph提供了方法来查看最近的log,这就是log dump方法。

ceph daemon /var/run/ceph/ceph-mon.*asok
log dump

{}

表面看啥也没输出,实际上将log dump到了日志文件中,比如我们例子,在log文件中出现:

v2 ==== 42+0+0 (3105867923
0 0) 0x48d6bc0 con 0x3d5c9a0

-13> 2015-05-30
17:23:06.772212 7f5856cac700 10 mon.jnqfg@2(peon) e3
handle_subscribe mon_subscribe({monmap=4+,osdmap=0}) v2

-12> 2015-05-30
17:23:06.772217 7f5856cac700 10 mon.jnqfg@2(peon) e3
check_sub monmap next 4 have 3

-11> 2015-05-30
17:23:06.772222 7f5856cac700 10 mon.jnqfg@2(peon).osd
e260 check_sub 0x631bdc0 next 0 (onetime)

-10> 2015-05-30
17:23:06.773223 7f5856cac700 1 -- 10.16.20.183:6789/0 --> client.? 10.16.20.181:0/1138587 -- osd_map(260..260
src has 1..260) v3 -- ?+0
0x4230b40

-9> 2015-05-30
17:23:06.773239 7f5856cac700 1 -- 10.16.20.183:6789/0 --> client.4442663
10.16.20.181:0/1138587 -- mon_subscribe_ack(300s) v1 -- ?+0
0x48d6680

-8> 2015-05-30
17:23:06.773586 7f5856cac700 1 -- 10.16.20.183:6789/0 <== client.4442663
10.16.20.181:0/1138587
4 ==== mon_subscribe({monmap=4+,osdmap=0}) v2 ==== 42+0+0 (3105867923
0 0) 0x48d5180 con 0x3d5c9a0

-7> 2015-05-30
17:23:06.773607 7f5856cac700 10 mon.jnqfg@2(peon) e3
handle_subscribe mon_subscribe({monmap=4+,osdmap=0}) v2

-6> 2015-05-30
17:23:06.773613 7f5856cac700 10 mon.jnqfg@2(peon) e3
check_sub monmap next 4 have 3

-5> 2015-05-30
17:23:06.773620 7f5856cac700 10 mon.jnqfg@2(peon).osd
e260 check_sub 0x631a7c0 next 0 (onetime)

-4> 2015-05-30
17:23:06.774626 7f5856cac700 1 -- 10.16.20.183:6789/0 --> client.? 10.16.20.181:0/1138587 -- osd_map(260..260
src has 1..260) v3 -- ?+0
0x4232ac0

-3> 2015-05-30
17:23:06.774641 7f5856cac700 1 -- 10.16.20.183:6789/0 --> client.4442663
10.16.20.181:0/1138587 -- mon_subscribe_ack(300s) v1 -- ?+0
0x48d6bc0

-2> 2015-05-30
17:23:06.775345 7f5856cac700 1 -- 10.16.20.183:6789/0 <== client.4442663
10.16.20.181:0/1138587
5 ==== mon_command({"prefix": "get_command_descriptions"} v
0) v1 ==== 80+0+0 (3374501561
0 0) 0x477f0e0 con 0x3d5c9a0

-1> 2015-05-30
17:23:06.776789 7f5856cac700 1 -- 10.16.20.183:6789/0 --> 10.16.20.181:0/1138587 -- mon_command_ack([{"prefix": "get_command_descriptions"}]=0
v0) v1 -- ?+24689
0x3e62760 con 0x3d5c9a0

0> 2015-05-30 17:23:06.821389
7f5859662700 1 do_command 'log dump'

很有意思的是,ceph如何做到 ceph daemon /var/run/ceph/ceph-mon.*asok log dump 就把log dump到日志文件的。 这就牵扯到admin socket机制了。

admin socket 机制并不是一个非常新的机制,这是一个很commmon的设计。

很多进程,设计的时候,需要设计一个手段,能够运行时,接收用户的指令,不能将程序设计成一个黑匣子,相反,要提供一些手段,使是用户或者可以干预进程的运行(如改变配置项),或者可以获取到进程运行的状态信息。

admin socket机制,那是下一篇的任务。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: