FastDFS分布式文件系统点滴记录5 -- upload上传机制剖析3
2012-06-28 15:48
417 查看
有了tracker 分析的基础,我们直接进入storage 的任务处理函数 int storage_deal_task(struct fast_task_info *pTask);
storage_service.c 6473行:
case STORAGE_PROTO_CMD_UPLOAD_FILE:
result = storage_upload_file(pTask,
false);
break;
如果命令是上传文件,会调用storage_upload_file函数处理,我们进入这个函数。
storage_service.c 3735行,是storage_upload_file 的入口,
storage_service.c 3771行 :
p = pTask->data
+ sizeof(TrackerHeader);
store_path_index =
*p++;
if (store_path_index
< 0 || store_path_index
>= g_fdfs_path_count)
{
logError("file: "__FILE__", line: %d, "
\
"client ip: %s, store_path_index: %d "
\
"is invalid", __LINE__,
\
pTask->client_ip, store_path_index);
pClientInfo->total_length
= sizeof(TrackerHeader);
return EINVAL;
}
file_bytes = buff2long(p);
p += FDFS_PROTO_PKG_LEN_SIZE;
if (file_bytes
< 0 || file_bytes
!= nInPackLen
- \
(1
+ FDFS_PROTO_PKG_LEN_SIZE +
\
FDFS_FILE_EXT_NAME_MAX_LEN))
{
logError("file: "__FILE__", line: %d, "
\
"client ip: %s, pkg length is not correct, "
\
"invalid file bytes: "INT64_PRINTF_FORMAT
\
", total body length: "INT64_PRINTF_FORMAT,
\
__LINE__, pTask->client_ip, file_bytes, nInPackLen);
pClientInfo->total_length
= sizeof(TrackerHeader);
return EINVAL;
}
memcpy(file_ext_name, p, FDFS_FILE_EXT_NAME_MAX_LEN);
*(file_ext_name
+ FDFS_FILE_EXT_NAME_MAX_LEN)
= '\0';
p += FDFS_FILE_EXT_NAME_MAX_LEN;
pFileContext->calc_crc32
= true;
pFileContext->calc_file_hash
= g_check_file_duplicate;
pFileContext->extra_info.upload.start_time
= time(NULL);
代码分析:
1. 首先解析出store_path_index;
2. 解析出file_bytes;
3. 解析出file_ext_name;
4. 将相关的属性赋值给pFileContext
storage_service.c 3810行 :
pFileContext->extra_info.upload.file_type
= _FILE_TYPE_REGULAR;
pFileContext->sync_flag
= STORAGE_OP_TYPE_SOURCE_CREATE_FILE;
pFileContext->timestamp2log
= pFileContext->extra_info.upload.start_time;
pFileContext->op
= FDFS_STORAGE_FILE_OP_WRITE;
if (bAppenderFile)
{
pFileContext->extra_info.upload.file_type
|=
\
_FILE_TYPE_APPENDER;
}
else
{
if (g_if_use_trunk_file
&& trunk_check_size(
\
TRUNK_CALC_SIZE(file_bytes)))
{
pFileContext->extra_info.upload.file_type
|=
\
_FILE_TYPE_TRUNK;
}
}
注意,这里把pFileContext->op 置为 FDFS_STORAGE_FILE_OP_WRITE;说明要执行的是写操作。
为了简化分析,我们暂时不考虑trunk 这种方式,以后会单独详细分析。
接着往下执行,storage_service.c 3893行 :
return storage_write_to_file(pTask, file_offset, file_bytes,
\
p - pTask->data, dio_write_file,
\
storage_upload_file_done_callback,
\
clean_func, store_path_index);
开始真正的写文件操作了。
我们进入storage_write_to_file,storage_service.c 5567行 :
/*
 * Prepare the task's file context for a write and hand the task to a dio
 * thread.
 *
 * pTask            : client task; pTask->arg holds its StorageClientInfo
 * file_offset      : file offset where writing starts
 * upload_bytes     : number of bytes to write (end = file_offset + upload_bytes)
 * buff_offset      : offset in pTask->data where the file payload begins
 * deal_func        : stored as pClientInfo->deal_func; the dio thread calls it
 * done_callback    : stored as the file context's done_callback
 * clean_func       : stored as pClientInfo->clean_func
 * store_path_index : passed to storage_dio_get_thread_index() to pick a thread
 *
 * Returns STORAGE_STATUE_DEAL_FILE once the task is queued. Otherwise returns
 * the non-zero error from storage_dio_queue_push(), after shrinking the
 * response length to a bare TrackerHeader.
 */
static int storage_write_to_file(struct fast_task_info *pTask,
		const int64_t file_offset, const int64_t upload_bytes,
		const int buff_offset, TaskDealFunc deal_func,
		FileDealDoneCallback done_callback,
		DisconnectCleanFunc clean_func, const int store_path_index)
{
	StorageClientInfo *client = (StorageClientInfo *)pTask->arg;
	StorageFileContext *ctx = &client->file_context;
	int push_status;

	/* handlers consulted once the task sits on the dio queue */
	client->deal_func = deal_func;
	client->clean_func = clean_func;

	/* write window [start, end); the file descriptor is not opened here */
	ctx->fd = -1;
	ctx->buff_offset = buff_offset;
	ctx->offset = file_offset;
	ctx->start = file_offset;
	ctx->end = file_offset + upload_bytes;

	ctx->dio_thread_index = storage_dio_get_thread_index(pTask,
			store_path_index, ctx->op);
	ctx->done_callback = done_callback;

	/* reset running checksums that the dio thread updates per chunk */
	if (ctx->calc_crc32)
	{
		ctx->crc32 = CRC32_XINIT;
	}
	if (ctx->calc_file_hash)
	{
		/* macro invoked without a trailing ';', as in the original */
		INIT_HASH_CODES4(ctx->file_hash_codes)
	}

	push_status = storage_dio_queue_push(pTask);
	if (push_status == 0)
	{
		return STORAGE_STATUE_DEAL_FILE;
	}

	client->total_length = sizeof(TrackerHeader);
	return push_status;
}
代码分析:
1. 注意参数的回调函数,这里含义是写动作执行完成后,会主动调用;
2.storage_dio_queue_push 函数,把pTask push 到dio 队列中;
3. 前面在分析storage 入口函数的时候,在main函数init 时,已经完成了对dio 线程的初始化动作;
4. 这里是往dio队列中push pTask;
5. storage_dio_queue_push 内部先调用 task_queue_push 将 pTask 入队,再调用 pthread_cond_signal 通知队列另一端(dio 线程)有数据到来。
6.特别需要注意 pClientInfo->deal_func = deal_func; 下面会用到。
既然已经放入队列,那么,另端就应该从队列中取出任务执行。这里,我们看一下dio线程执行函数。
storage_dio.c 646行:
static void *dio_thread_entrance(void* arg)
{
int result;
struct storage_dio_context *pContext;
struct fast_task_info *pTask;
pContext =
(struct storage_dio_context *)arg;
pthread_mutex_lock(&(pContext->lock));
while (g_continue_flag)
{
if ((result=pthread_cond_wait(&(pContext->cond),
\
&(pContext->lock)))
!= 0)
{
logError("file: "__FILE__", line: %d, "
\
"call pthread_cond_wait fail, "
\
"errno: %d, error info: %s",
\
__LINE__, result, STRERROR(result));
}
while
((pTask=task_queue_pop(&(pContext->queue)))
!=
NULL)
{
((StorageClientInfo
*)pTask->arg)->deal_func(pTask);
}
}
pthread_mutex_unlock(&(pContext->lock));
if ((result=pthread_mutex_lock(&g_dio_thread_lock))
!= 0)
{
logError("file: "__FILE__", line: %d, "
\
"call pthread_mutex_lock fail, "
\
"errno: %d, error info: %s",
\
__LINE__, result, STRERROR(result));
}
g_dio_thread_count--;
if ((result=pthread_mutex_unlock(&g_dio_thread_lock))
!= 0)
{
logError("file: "__FILE__", line: %d, "
\
"call pthread_mutex_lock fail, "
\
"errno: %d, error info: %s",
\
__LINE__, result, STRERROR(result));
}
logDebug("file: "__FILE__", line: %d, "
\
"dio thread exited, thread count: %d",
\
__LINE__, g_dio_thread_count);
return NULL;
}
代码分析:
1. 当队列有数据,取出数据,执行之;
2. ((StorageClientInfo *)pTask->arg)->deal_func(pTask); 这里通过回调函数完成任务的处理。
结合前面的分析,deal_func 函数指针,其实是函数 dio_write_file;
我们接着分析 dio_write_file,这个函数在 storage_dio.c 421行。
storage_dio.c 454行:
if (write(pFileContext->fd, pDataBuff,
write_bytes) != write_bytes)
{
result = errno
!= 0 ? errno
: EIO;
logError("file: "__FILE__", line: %d, "
\
"write to file: %s fail, fd=%d, write_bytes=%d, "
\
"errno: %d, error info: %s",
\
__LINE__, pFileContext->filename,
\
pFileContext->fd, write_bytes,
\
result, STRERROR(result));
}
调用write 写数据了。
接着分析, storage_dio.c 478行:
if (pFileContext->calc_crc32)
{
pFileContext->crc32
= CRC32_ex(pDataBuff, write_bytes,
\
pFileContext->crc32);
}
每写入一块数据,storage 都会累加计算 crc32(文件全部写完后再由 CRC32_FINAL 得出最终值),用于校验数据的完整性。
接着分析, storage_dio.c 495行:
pFileContext->offset
+= write_bytes;
if (pFileContext->offset
< pFileContext->end)
{
pFileContext->buff_offset
= 0;
storage_nio_notify(pTask);
//notify nio to deal
}
else
{
if (pFileContext->calc_crc32)
{
pFileContext->crc32
= CRC32_FINAL(
\
pFileContext->crc32);
}
if (pFileContext->calc_file_hash)
{
FINISH_HASH_CODES4(pFileContext->file_hash_codes)
}
if (pFileContext->extra_info.upload.before_close_callback
!=
NULL)
{
result = pFileContext->extra_info.upload.
\
before_close_callback(pTask);
}
/* file write done, close it
*/
close(pFileContext->fd);
pFileContext->fd
= -1;
if (pFileContext->done_callback
!=
NULL)
{
pFileContext->done_callback(pTask, result);
}
}
代码分析:
1. 如果文件数据还没有全部接收完(offset < end),会把 buff_offset 置 0 并调用 storage_nio_notify,通知 nio(网络 IO)线程继续读取后续的文件内容;
2. 写文件完成后,得到文件 crc32 的值;
3. 执行回调函数,pFileContext->done_callback(pTask, result);
4. 而这个done_callback 实际上是 storage_upload_file_done_callback。
接着分析storage_upload_file_done_callback,storage_service.c 1004行(该函数是 static 的,与调用处 storage_upload_file 同在 storage_service.c 中):
static void storage_upload_file_done_callback(struct fast_task_info
*pTask,
\
const
int err_no)
接着分析,先跳过trunk文件的处理(trunk 后面单独分析),storage_service.c 1029行:
if (result
== 0)
{
result = storage_service_upload_file_done(pTask);
if (result
== 0)
{
if (pFileContext->create_flag
& STORAGE_CREATE_FLAG_FILE)
{
result = storage_binlog_write(\
pFileContext->timestamp2log,
\
STORAGE_OP_TYPE_SOURCE_CREATE_FILE,
\
pFileContext->fname2log);
}
}
}
代码分析:
1. 调用storage_service_upload_file_done,完成接收文件后的操作。目前暂时先不展开分析,因为这里涉及到slave、link、fastdht 等机制,后面专门详细介绍。
2. 文件上传结束后,会调用 storage_binlog_write 写入binlog。
接着往下分析,storage_service.c 1085行:
pHeader =
(TrackerHeader *)pTask->data;
pHeader->status
= result;
pHeader->cmd
= STORAGE_PROTO_CMD_RESP;
long2buff(pClientInfo->total_length
- sizeof(TrackerHeader),
\
pHeader->pkg_len);
storage_nio_notify(pTask);
构造应答的数据包,storage_nio_notify发送至客户端。
至此,我们简要分析了storage 接收文件的处理流程。大概的调用脉络上还是清晰的。像slave、trunk、link、fastdht 等机制,比较复杂,后面慢慢分析。本章节主要解释的就是storage对文件上传的大概处理流程。
欢迎感兴趣的朋友一起交流研究,提出意见。
FastDFS技术交流群:164684842
storage_service.c 6473行:
case STORAGE_PROTO_CMD_UPLOAD_FILE:
result = storage_upload_file(pTask,
false);
break;
如果命令是上传文件,会调用storage_upload_file函数处理,我们进入这个函数。
storage_service.c 3735行,是storage_upload_file 的入口,
storage_service.c 3771行 :
p = pTask->data
+ sizeof(TrackerHeader);
store_path_index =
*p++;
if (store_path_index
< 0 || store_path_index
>= g_fdfs_path_count)
{
logError("file: "__FILE__", line: %d, "
\
"client ip: %s, store_path_index: %d "
\
"is invalid", __LINE__,
\
pTask->client_ip, store_path_index);
pClientInfo->total_length
= sizeof(TrackerHeader);
return EINVAL;
}
file_bytes = buff2long(p);
p += FDFS_PROTO_PKG_LEN_SIZE;
if (file_bytes
< 0 || file_bytes
!= nInPackLen
- \
(1
+ FDFS_PROTO_PKG_LEN_SIZE +
\
FDFS_FILE_EXT_NAME_MAX_LEN))
{
logError("file: "__FILE__", line: %d, "
\
"client ip: %s, pkg length is not correct, "
\
"invalid file bytes: "INT64_PRINTF_FORMAT
\
", total body length: "INT64_PRINTF_FORMAT,
\
__LINE__, pTask->client_ip, file_bytes, nInPackLen);
pClientInfo->total_length
= sizeof(TrackerHeader);
return EINVAL;
}
memcpy(file_ext_name, p, FDFS_FILE_EXT_NAME_MAX_LEN);
*(file_ext_name
+ FDFS_FILE_EXT_NAME_MAX_LEN)
= '\0';
p += FDFS_FILE_EXT_NAME_MAX_LEN;
pFileContext->calc_crc32
= true;
pFileContext->calc_file_hash
= g_check_file_duplicate;
pFileContext->extra_info.upload.start_time
= time(NULL);
代码分析:
1. 首先解析出store_path_index;
2. 解析出file_bytes;
3. 解析出file_ext_name;
4. 将相关的属性赋值给pFileContext
storage_service.c 3810行 :
pFileContext->extra_info.upload.file_type
= _FILE_TYPE_REGULAR;
pFileContext->sync_flag
= STORAGE_OP_TYPE_SOURCE_CREATE_FILE;
pFileContext->timestamp2log
= pFileContext->extra_info.upload.start_time;
pFileContext->op
= FDFS_STORAGE_FILE_OP_WRITE;
if (bAppenderFile)
{
pFileContext->extra_info.upload.file_type
|=
\
_FILE_TYPE_APPENDER;
}
else
{
if (g_if_use_trunk_file
&& trunk_check_size(
\
TRUNK_CALC_SIZE(file_bytes)))
{
pFileContext->extra_info.upload.file_type
|=
\
_FILE_TYPE_TRUNK;
}
}
注意,这里把pFileContext->op 置为 FDFS_STORAGE_FILE_OP_WRITE;说明要执行的是写操作。
为了简化分析,我们暂时不考虑trunk 这种方式,以后会单独详细分析。
接着往下执行,storage_service.c 3893行 :
return storage_write_to_file(pTask, file_offset, file_bytes,
\
p - pTask->data, dio_write_file,
\
storage_upload_file_done_callback,
\
clean_func, store_path_index);
开始真正的写文件操作了。
我们进入storage_write_to_file,storage_service.c 5567行 :
/*
 * Prepare the task's file context for a write and hand the task to a dio
 * thread.
 *
 * pTask            : client task; pTask->arg holds its StorageClientInfo
 * file_offset      : file offset where writing starts
 * upload_bytes     : number of bytes to write (end = file_offset + upload_bytes)
 * buff_offset      : offset in pTask->data where the file payload begins
 * deal_func        : stored as pClientInfo->deal_func; the dio thread calls it
 * done_callback    : stored as the file context's done_callback
 * clean_func       : stored as pClientInfo->clean_func
 * store_path_index : passed to storage_dio_get_thread_index() to pick a thread
 *
 * Returns STORAGE_STATUE_DEAL_FILE once the task is queued. Otherwise returns
 * the non-zero error from storage_dio_queue_push(), after shrinking the
 * response length to a bare TrackerHeader.
 */
static int storage_write_to_file(struct fast_task_info *pTask,
		const int64_t file_offset, const int64_t upload_bytes,
		const int buff_offset, TaskDealFunc deal_func,
		FileDealDoneCallback done_callback,
		DisconnectCleanFunc clean_func, const int store_path_index)
{
	StorageClientInfo *client = (StorageClientInfo *)pTask->arg;
	StorageFileContext *ctx = &client->file_context;
	int push_status;

	/* handlers consulted once the task sits on the dio queue */
	client->deal_func = deal_func;
	client->clean_func = clean_func;

	/* write window [start, end); the file descriptor is not opened here */
	ctx->fd = -1;
	ctx->buff_offset = buff_offset;
	ctx->offset = file_offset;
	ctx->start = file_offset;
	ctx->end = file_offset + upload_bytes;

	ctx->dio_thread_index = storage_dio_get_thread_index(pTask,
			store_path_index, ctx->op);
	ctx->done_callback = done_callback;

	/* reset running checksums that the dio thread updates per chunk */
	if (ctx->calc_crc32)
	{
		ctx->crc32 = CRC32_XINIT;
	}
	if (ctx->calc_file_hash)
	{
		/* macro invoked without a trailing ';', as in the original */
		INIT_HASH_CODES4(ctx->file_hash_codes)
	}

	push_status = storage_dio_queue_push(pTask);
	if (push_status == 0)
	{
		return STORAGE_STATUE_DEAL_FILE;
	}

	client->total_length = sizeof(TrackerHeader);
	return push_status;
}
代码分析:
1. 注意参数的回调函数,这里含义是写动作执行完成后,会主动调用;
2.storage_dio_queue_push 函数,把pTask push 到dio 队列中;
3. 前面在分析storage 入口函数的时候,在main函数init 时,已经完成了对dio 线程的初始化动作;
4. 这里是往dio队列中push pTask;
5. storage_dio_queue_push 内部先调用 task_queue_push 将 pTask 入队,再调用 pthread_cond_signal 通知队列另一端(dio 线程)有数据到来。
6.特别需要注意 pClientInfo->deal_func = deal_func; 下面会用到。
既然已经放入队列,那么,另端就应该从队列中取出任务执行。这里,我们看一下dio线程执行函数。
storage_dio.c 646行:
static void *dio_thread_entrance(void* arg)
{
int result;
struct storage_dio_context *pContext;
struct fast_task_info *pTask;
pContext =
(struct storage_dio_context *)arg;
pthread_mutex_lock(&(pContext->lock));
while (g_continue_flag)
{
if ((result=pthread_cond_wait(&(pContext->cond),
\
&(pContext->lock)))
!= 0)
{
logError("file: "__FILE__", line: %d, "
\
"call pthread_cond_wait fail, "
\
"errno: %d, error info: %s",
\
__LINE__, result, STRERROR(result));
}
while
((pTask=task_queue_pop(&(pContext->queue)))
!=
NULL)
{
((StorageClientInfo
*)pTask->arg)->deal_func(pTask);
}
}
pthread_mutex_unlock(&(pContext->lock));
if ((result=pthread_mutex_lock(&g_dio_thread_lock))
!= 0)
{
logError("file: "__FILE__", line: %d, "
\
"call pthread_mutex_lock fail, "
\
"errno: %d, error info: %s",
\
__LINE__, result, STRERROR(result));
}
g_dio_thread_count--;
if ((result=pthread_mutex_unlock(&g_dio_thread_lock))
!= 0)
{
logError("file: "__FILE__", line: %d, "
\
"call pthread_mutex_lock fail, "
\
"errno: %d, error info: %s",
\
__LINE__, result, STRERROR(result));
}
logDebug("file: "__FILE__", line: %d, "
\
"dio thread exited, thread count: %d",
\
__LINE__, g_dio_thread_count);
return NULL;
}
代码分析:
1. 当队列有数据,取出数据,执行之;
2. ((StorageClientInfo *)pTask->arg)->deal_func(pTask); 这里通过回调函数完成任务的处理。
结合前面的分析,deal_func 函数指针,其实是函数 dio_write_file;
我们接着分析 dio_write_file,这个函数在 storage_dio.c 421行。
storage_dio.c 454行:
if (write(pFileContext->fd, pDataBuff,
write_bytes) != write_bytes)
{
result = errno
!= 0 ? errno
: EIO;
logError("file: "__FILE__", line: %d, "
\
"write to file: %s fail, fd=%d, write_bytes=%d, "
\
"errno: %d, error info: %s",
\
__LINE__, pFileContext->filename,
\
pFileContext->fd, write_bytes,
\
result, STRERROR(result));
}
调用write 写数据了。
接着分析, storage_dio.c 478行:
if (pFileContext->calc_crc32)
{
pFileContext->crc32
= CRC32_ex(pDataBuff, write_bytes,
\
pFileContext->crc32);
}
每写入一块数据,storage 都会累加计算 crc32(文件全部写完后再由 CRC32_FINAL 得出最终值),用于校验数据的完整性。
接着分析, storage_dio.c 495行:
pFileContext->offset
+= write_bytes;
if (pFileContext->offset
< pFileContext->end)
{
pFileContext->buff_offset
= 0;
storage_nio_notify(pTask);
//notify nio to deal
}
else
{
if (pFileContext->calc_crc32)
{
pFileContext->crc32
= CRC32_FINAL(
\
pFileContext->crc32);
}
if (pFileContext->calc_file_hash)
{
FINISH_HASH_CODES4(pFileContext->file_hash_codes)
}
if (pFileContext->extra_info.upload.before_close_callback
!=
NULL)
{
result = pFileContext->extra_info.upload.
\
before_close_callback(pTask);
}
/* file write done, close it
*/
close(pFileContext->fd);
pFileContext->fd
= -1;
if (pFileContext->done_callback
!=
NULL)
{
pFileContext->done_callback(pTask, result);
}
}
代码分析:
1. 如果文件数据还没有全部接收完(offset < end),会把 buff_offset 置 0 并调用 storage_nio_notify,通知 nio(网络 IO)线程继续读取后续的文件内容;
2. 写文件完成后,得到文件 crc32 的值;
3. 执行回调函数,pFileContext->done_callback(pTask, result);
4. 而这个done_callback 实际上是 storage_upload_file_done_callback。
接着分析storage_upload_file_done_callback,storage_service.c 1004行(该函数是 static 的,与调用处 storage_upload_file 同在 storage_service.c 中):
static void storage_upload_file_done_callback(struct fast_task_info
*pTask,
\
const
int err_no)
接着分析,先跳过trunk文件的处理(trunk 后面单独分析),storage_service.c 1029行:
if (result
== 0)
{
result = storage_service_upload_file_done(pTask);
if (result
== 0)
{
if (pFileContext->create_flag
& STORAGE_CREATE_FLAG_FILE)
{
result = storage_binlog_write(\
pFileContext->timestamp2log,
\
STORAGE_OP_TYPE_SOURCE_CREATE_FILE,
\
pFileContext->fname2log);
}
}
}
代码分析:
1. 调用storage_service_upload_file_done,完成接收文件后的操作。目前暂时先不展开分析,因为这里涉及到slave、link、fastdht 等机制,后面专门详细介绍。
2. 文件上传结束后,会调用 storage_binlog_write 写入binlog。
接着往下分析,storage_service.c 1085行:
pHeader =
(TrackerHeader *)pTask->data;
pHeader->status
= result;
pHeader->cmd
= STORAGE_PROTO_CMD_RESP;
long2buff(pClientInfo->total_length
- sizeof(TrackerHeader),
\
pHeader->pkg_len);
storage_nio_notify(pTask);
构造应答的数据包,storage_nio_notify发送至客户端。
至此,我们简要分析了storage 接收文件的处理流程。大概的调用脉络上还是清晰的。像slave、trunk、link、fastdht 等机制,比较复杂,后面慢慢分析。本章节主要解释的就是storage对文件上传的大概处理流程。
欢迎感兴趣的朋友一起交流研究,提出意见。
FastDFS技术交流群:164684842
相关文章推荐
- FastDFS分布式文件系统点滴记录5 -- upload上传机制剖析1
- FastDFS分布式文件系统点滴记录5 -- upload上传机制剖析2
- FastDFS分布式文件系统点滴记录6 -- download下载机制剖析
- FastDFS分布式文件系统点滴记录1 -- 目录
- FastDFS分布式文件系统点滴记录2 -- 架构概述
- FastDFS分布式文件系统点滴记录3 -- 网络模型、libevent框架使用
- FastDFS分布式文件系统点滴记录4 -- tracker 、storage 入口分析
- 分布式文件系统FastDFS架构剖析
- angular file upload 队列上传机制
- 分布式文件系统FastDFS架构剖析
- Fastdfs分布式文件系统之文件同步机制
- 分布式文件系统fastDFS多种上传下载API代码及测速情况。
- 分布式文件系统FastDFS架构剖析
- 分布式文件系统 fastdfs源码上传流程分析
- 分布式文件系统FastDFS架构剖析
- 分布式文件系统 - FastDFS 配置 Nginx 模块及上传测试
- 记录Angular2 file-upload 图片上传和文件上传
- 分布式文件系统FastDFS架构剖析
- 分布式文件系统FastDFS架构剖析
- Fastdfs分布式文件系统(图片上传)代码共享