您的位置:首页 > 其它

FastDFS分布式文件系统点滴记录5 -- upload上传机制剖析3

2012-06-28 15:48 417 查看
有了tracker 分析的基础,我们直接进入storage 的任务处理函数 int storage_deal_task(struct fast_task_info *pTask);

storage_service.c 6473行:

case STORAGE_PROTO_CMD_UPLOAD_FILE:

result = storage_upload_file(pTask,
false);

break;

如果命令是上传文件,会调用storage_upload_file函数处理,我们进入这个函数。

storage_service.c 3735行,是storage_upload_file 的入口,

storage_service.c 3771行 :

p = pTask->data
+ sizeof(TrackerHeader);

store_path_index =
*p++;

if (store_path_index
< 0 || store_path_index
>= g_fdfs_path_count)

{

logError("file: "__FILE__", line: %d, "
\

"client ip: %s, store_path_index: %d "
\

"is invalid", __LINE__,
\

pTask->client_ip, store_path_index);

pClientInfo->total_length
= sizeof(TrackerHeader);

return EINVAL;

}

file_bytes = buff2long(p);

p += FDFS_PROTO_PKG_LEN_SIZE;

if (file_bytes
< 0 || file_bytes
!= nInPackLen
- \

(1
+ FDFS_PROTO_PKG_LEN_SIZE +
\

FDFS_FILE_EXT_NAME_MAX_LEN))

{

logError("file: "__FILE__", line: %d, "
\

"client ip: %s, pkg length is not correct, "
\

"invalid file bytes: "INT64_PRINTF_FORMAT
\

", total body length: "INT64_PRINTF_FORMAT,
\

__LINE__, pTask->client_ip, file_bytes, nInPackLen);

pClientInfo->total_length
= sizeof(TrackerHeader);

return EINVAL;

}

memcpy(file_ext_name, p, FDFS_FILE_EXT_NAME_MAX_LEN);

*(file_ext_name
+ FDFS_FILE_EXT_NAME_MAX_LEN)
= '\0';

p += FDFS_FILE_EXT_NAME_MAX_LEN;

pFileContext->calc_crc32
= true;

pFileContext->calc_file_hash
= g_check_file_duplicate;

pFileContext->extra_info.upload.start_time
= time(NULL);

代码分析:
1. 首先解析出store_path_index;
2. 解析出file_bytes;
3. 解析出file_ext_name;
4. 将相关的属性赋值给pFileContext

storage_service.c 3810行 :

pFileContext->extra_info.upload.file_type
= _FILE_TYPE_REGULAR;

pFileContext->sync_flag
= STORAGE_OP_TYPE_SOURCE_CREATE_FILE;

pFileContext->timestamp2log
= pFileContext->extra_info.upload.start_time;

pFileContext->op
= FDFS_STORAGE_FILE_OP_WRITE;

if (bAppenderFile)

{

pFileContext->extra_info.upload.file_type
|=
\

_FILE_TYPE_APPENDER;

}

else

{

if (g_if_use_trunk_file
&& trunk_check_size(
\

TRUNK_CALC_SIZE(file_bytes)))

{

pFileContext->extra_info.upload.file_type
|=
\

_FILE_TYPE_TRUNK;

}

}

注意,这里把pFileContext->op 置为 FDFS_STORAGE_FILE_OP_WRITE;说明要执行的是写操作。
为了简化分析,我们暂时不考虑trunk 这种方式,以后会单独详细分析。

接着往下执行,storage_service.c 3893行 :

return storage_write_to_file(pTask, file_offset, file_bytes,
\

p - pTask->data, dio_write_file,
\

storage_upload_file_done_callback,
\

clean_func, store_path_index);

开始真正的写文件操作了。

我们进入storage_write_to_file,storage_service.c 5567行 :

static int storage_write_to_file(struct fast_task_info
*pTask,
\

const int64_t file_offset,
const int64_t upload_bytes,
\

const
int buff_offset, TaskDealFunc deal_func,
\

FileDealDoneCallback done_callback,
\

DisconnectCleanFunc clean_func,
const int store_path_index)

{

StorageClientInfo *pClientInfo;

StorageFileContext *pFileContext;

int result;

pClientInfo =
(StorageClientInfo *)pTask->arg;

pFileContext =
&(pClientInfo->file_context);

pClientInfo->deal_func
= deal_func;

pClientInfo->clean_func
= clean_func;

pFileContext->fd
= -1;

pFileContext->buff_offset
= buff_offset;

pFileContext->offset
= file_offset;

pFileContext->start
= file_offset;

pFileContext->end
= file_offset + upload_bytes;

pFileContext->dio_thread_index
= storage_dio_get_thread_index(
\

pTask, store_path_index, pFileContext->op);

pFileContext->done_callback
= done_callback;

if (pFileContext->calc_crc32)

{

pFileContext->crc32
= CRC32_XINIT;

}

if (pFileContext->calc_file_hash)

{

INIT_HASH_CODES4(pFileContext->file_hash_codes)

}

if ((result=storage_dio_queue_push(pTask))
!= 0)

{

pClientInfo->total_length
= sizeof(TrackerHeader);

return result;

}

return STORAGE_STATUE_DEAL_FILE;

}

代码分析:
1. 注意参数的回调函数,这里含义是写动作执行完成后,会主动调用;
2.storage_dio_queue_push 函数,把pTask push 到dio 队列中;
3. 前面在分析storage 入口函数的时候,在main函数init 时,已经完成了对dio 线程的初始化动作;
4. 这里是往dio队列中push pTask;
5. storage_dio_queue_push 内部 使用的task_queue_push,同时,pthread_cond_signal 通知队列另端有数据到来。
6.特别需要注意 pClientInfo->deal_func = deal_func; 下面会用到。

既然已经放入队列,那么,另端就应该从队列中取出任务执行。这里,我们看一下dio线程执行函数。

storage_dio.c 646行:

static void *dio_thread_entrance(void* arg)

{

int result;

struct storage_dio_context *pContext;

struct fast_task_info *pTask;

pContext =
(struct storage_dio_context *)arg;

pthread_mutex_lock(&(pContext->lock));

while (g_continue_flag)

{

if ((result=pthread_cond_wait(&(pContext->cond),
\

&(pContext->lock)))
!= 0)

{

logError("file: "__FILE__", line: %d, "
\

"call pthread_cond_wait fail, "
\

"errno: %d, error info: %s",
\

__LINE__, result, STRERROR(result));

}

while
((pTask=task_queue_pop(&(pContext->queue)))
!=
NULL)

{

((StorageClientInfo
*)pTask->arg)->deal_func(pTask);

}

}

pthread_mutex_unlock(&(pContext->lock));

if ((result=pthread_mutex_lock(&g_dio_thread_lock))
!= 0)

{

logError("file: "__FILE__", line: %d, "
\

"call pthread_mutex_lock fail, "
\

"errno: %d, error info: %s",
\

__LINE__, result, STRERROR(result));

}

g_dio_thread_count--;

if ((result=pthread_mutex_unlock(&g_dio_thread_lock))
!= 0)

{

logError("file: "__FILE__", line: %d, "
\

"call pthread_mutex_lock fail, "
\

"errno: %d, error info: %s",
\

__LINE__, result, STRERROR(result));

}

logDebug("file: "__FILE__", line: %d, "
\

"dio thread exited, thread count: %d",
\

__LINE__, g_dio_thread_count);

return NULL;

}

代码分析:
1. 当队列有数据,取出数据,执行之;
2. ((StorageClientInfo *)pTask->arg)->deal_func(pTask); 这里通过回调函数完成任务的处理。

结合前面的分析,deal_func 函数指针,其实是函数 dio_write_file;
我们接着分析 dio_write_file,这个函数在 storage_dio.c 421行。

storage_dio.c 454行:

if (write(pFileContext->fd, pDataBuff,
write_bytes) != write_bytes)

{

result = errno
!= 0 ? errno
: EIO;

logError("file: "__FILE__", line: %d, "
\

"write to file: %s fail, fd=%d, write_bytes=%d, "
\

"errno: %d, error info: %s",
\

__LINE__, pFileContext->filename,
\

pFileContext->fd, write_bytes,
\

result, STRERROR(result));

}

调用write 写数据了。

接着分析, storage_dio.c 478行:

if (pFileContext->calc_crc32)

{

pFileContext->crc32
= CRC32_ex(pDataBuff, write_bytes,
\

pFileContext->crc32);

}

storage 通过crc32 保证数据的完整性。

接着分析, storage_dio.c 495行:

pFileContext->offset
+= write_bytes;

if (pFileContext->offset
< pFileContext->end)

{

pFileContext->buff_offset
= 0;

storage_nio_notify(pTask);
//notify nio
to deal

}

else

{

if (pFileContext->calc_crc32)

{

pFileContext->crc32
= CRC32_FINAL(
\

pFileContext->crc32);

}

if (pFileContext->calc_file_hash)

{

FINISH_HASH_CODES4(pFileContext->file_hash_codes)

}

if (pFileContext->extra_info.upload.before_close_callback
!=
NULL)

{

result = pFileContext->extra_info.upload.
\

before_close_callback(pTask);

}

/* file write done, close it
*/

close(pFileContext->fd);

pFileContext->fd
= -1;

if (pFileContext->done_callback
!=
NULL)

{

pFileContext->done_callback(pTask, result);

}

}

代码分析:
1. 如果文件数据长度没有接收完,会storage_nio_notify 继续从nio(网络io) 读取文件内容;
2. 写文件完成后,得到文件 crc32 的值;
3. 执行回调函数,pFileContext->done_callback(pTask, result);
4. 而这个done_callback 实际上是 storage_upload_file_done_callback。

接着分析storage_upload_file_done_callback, storage_dio.c 1004行:

static void storage_upload_file_done_callback(struct fast_task_info
*pTask,
\

const
int err_no)

接着分析, 先跳过trunk文件的处理,trunk 后面分析,storage_dio.c 1029行:

if (result
== 0)

{

result = storage_service_upload_file_done(pTask);

if (result
== 0)

{

if (pFileContext->create_flag
& STORAGE_CREATE_FLAG_FILE)

{

result = storage_binlog_write(\

pFileContext->timestamp2log,
\

STORAGE_OP_TYPE_SOURCE_CREATE_FILE,
\

pFileContext->fname2log);

}

}

}

代码分析:

1. 调用storage_service_upload_file_done,完成接收文件后的操作。目前暂时先不展开分析,因为这里涉及到slave、link、fastdht 等机制,后面专门详细介绍。
2. 文件上传结束后,会调用 storage_binlog_write 写入binlog。

接着往下分析, storage_dio.c 1085行:

pHeader =
(TrackerHeader *)pTask->data;

pHeader->status
= result;

pHeader->cmd
= STORAGE_PROTO_CMD_RESP;

long2buff(pClientInfo->total_length
- sizeof(TrackerHeader),
\

pHeader->pkg_len);

storage_nio_notify(pTask);

构造应答的数据包,storage_nio_notify发送至客户端。

至此,我们简要分析了storage 接收文件的处理流程。大概的调用脉络上还是清晰的。像slave、trunk、link、fastdht 等机制,比较复杂,后面慢慢分析。本章节主要解释的就是storage对文件上传的大概处理流程。

欢迎感兴趣的朋友一起交流研究,提出意见。
FastDFS技术交流群:164684842
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: