外部排序
2015-10-02 02:58
363 查看
/* 外部排序指的是大文件的排序,即待排序的记录存储在外存储器上,待排序的文件无法一次装入内存,需要在内存和外部存储器之间进行多次数据交换,以达到排序整个文件的目的。外部排序最常用的算法是多路归并排序,即将原文件分解成多个能够一次性装人内存的部分,分别把每一部分调入内存完成排序。然后,对已经排序的子文件进行归并排序。
外排序分为两个步骤:预处理和合并排序。即首先根据内存的大小,将有n个记录的磁盘文件分批读入内存,采用有效的内存排序方法进行排序,将其预处理为若干个有序的子文件,这些有序子文件就是初始顺串,然后采用合并的方法将这些初始顺串逐趟合并成一个有序文件。
预处理阶段最重要的事情就是选择初始顺串。通常使用的方法为置换选择排序,它是堆排序的一种变形,步骤如下:
/*功能:将一个某字段没有排序的DBF文件排序输出,采用
* 外部排序算法,K路合并排序算法。排序需要
* 一定的磁盘空间,否则会出现错误。如果文件很大
* 时间可能会很长。本程序适合排序非常大的DBF文件
* 。开始的时候使用快速排序(unix系统并行操作)
* ,然后合并K路排序段(Unix系统并行操作),最后合
* 并最后两个,不使用并行,生成目标文件。比较字符
* 串最长不超过100字符,而且字符必须是Ascii码
* 中间有IO操作
建议:排序文件大小在100M-10G左右记录在100万-20亿之间
* 排序时源文件所在目录要留有源文件两倍大小的空间
*/
//#define UNIX*/
*32位编译环境下:
* short 16 bit
* int 32 bit
* long 64 bit
*64位编译环境下:
* int 32 bit
* long 64 bit
*/
/* dbf头文件结构*/
typedef struct
{
unsigned char mask ; // 0位
unsigned char date[3] ; // 1-3 最后改动时间
unsigned short int record_length ; // 10-11 每条记录的长度
char nouse[20]; //不用的
} DBF_FILE_HEAD;
/dbf字段结构/
typedef struct
{
unsigned char name[11]; // 名字
unsigned char type ; //类型
unsigned char dec ; //如果为0表示数值型为整数,其他表示浮点型,
//注意dbf中间数值型以字符表示
char nouse[13]; //没有用的
} DBF_FILE_FIELD,*pDBF_FILE_FIELD;
/dbf文件字段完全数据/
typedef struct{
unsigned int len; //长度
char * pttr; //起始地址
} DBF_FILE_ALL_FIELD,*pDBF_FILE_ALL_FIELD;
/*
排序临时文件结构。
*/
typedef struct{
int fileid; //文件编号,从0开始
int max; //文件数最大值
char file[150]; //文件路径
int used; //0表示未使用,1表示已经使用
DBF_FILE_HEAD dff; //此文件的头信息
FILE * f; //打开的文件句柄
/外结点c,只存放待归并记录的关键码,字符类型/
typedef struct{
char key[MAX_CMP_CHAR];
}ExNode,*pExNode;
//失败树结构
typedef struct{
int element;
}LoserTree,*pLoserTree;
typedef struct{
unsigned long inrows; //读取到那条记录数
unsigned long allrows; //总的记录数
char * buf ; //缓存地址
}SEG_MEM,*pSEG_MEM;
SORT_TEMP_FILE *sort_file; //初始化时分配内存。
DBF_FILE_HEAD dbfHead; //dbf原文件的头描述
DBF_FILE_FIELD dbfField; //要排序的字段描述。
int lowext,koffset; //最底层的外部节点数。
int verifyDbfFile(FILE *file);
int golfileId; //这个是临时文件嵌套id号,一次嵌套增加一
char argsg[100]; //存放args[1]
功能:将短整形数(16位,两个字节)高低位反转
*/
void revert_unsigned_short(unsigned short *a)
{
unsigned short left,right;
left=right=*a;
*a=((left&0x00ff)<<8)|((right&0xff00)>>8);
}
/*
功能:将长整形数(32位,四个字节)高低位反转
*/
unsigned long first,second,third,forth;
first=second=third=forth=*a;
*a=((first&0x000000ff)<<24)|
((second&0x0000ff00)<<8)|
((third&0x00ff0000)>>8)|
((forth&0xff000000)>>24);
}
/*
功能:打开一个文件,返回文件句柄
*/
FILE * openAFile(char FileIn , char aa)
{
FILE *fileIn = NULL;
// char * writeMode[2] = {“rb”,”wb”};
//#ifdef HP_UX
// strcpy(writeMode,”wb”);
//#endif
if((fileIn = fopen(FileIn,aa)) == NULL)
{
return NULL;
}
return fileIn;
}
/*
功能:写一个dbf文件的某段,
参数:buf_sort是缓存段
sort_area_rows是要写的记录数
tmpF文件句柄
allf是头信息
flag是1表示要写头和文件体,0表示不要写头只写文件,2表示只写头。
失败返回-1,成功返回0
参数:
*/
int writeAFile(char * buf_sort ,unsigned int sort_area_rows, FILE * tmpF,/
DBF_FILE_ALL_FIELD allf,int flag,unsigned long rec_num){
DBF_FILE_HEAD * write_head; //写的dbf的头
DBF_FILE_FIELD * write_field; //写的dbf的头
printf(“malloc write head error”);
return -1;
}
if((write_field=malloc(dbfHead.head_length-32))==NULL){
printf(“malloc write head error”);
return -1;
}
if(flag==1 || flag==2){
memcpy(write_head,&dbfHead,32);
//memcpy(write_field,&dbfHead+0x2,dbfHead.head_length-32);
if(flag==2)(* write_head).record_num=rec_num;
fwrite(allf.pttr,dbfHead.head_length-32,1,tmpF);
}
if(flag!=2)
fwrite(buf_sort,dbfHead.record_length,sort_area_rows,tmpF);
free(write_head);
free(write_field);
return 0;
}
/*
功能:从指定文件里面读取一个小于等于SORT_AREA_ROWS记录数的段
到内存段当中。返回实际读取字节数。
参数:infiles是文件结构数组指针
k是结构数组中那个文件
seg是段句柄
返回0为已经到了文件末尾
*/
unsigned int read_data(SORT_TEMP_FILE * infiles,int k,SEG_MEM * seg){
//判断是否已经到了文件结尾
unsigned int redin=0;
if(infiles[k].curr_read_row==infiles[k].dff.record_num) return 0;
redin=fread(seg[k].buf,dbfHead.record_length,SORT_AREA_ROWS,infiles[k].f);
infiles[k].curr_read_row+=redin;
seg[k].inrows=0; //初始化读取记录数
seg[k].allrows=redin;
return redin;
}
/*
功能:将一组记录写入一个DBF格式的文件,组大小为
must less then 10/n”);
/*
功能:输入失败树,从seg读取到ExNode
如果seg已经到了最后,则重新读取到seg中。
参数:infiles是文件数组
seg是段数值指针
b是叶节点
k是到那一路输入
*/
void input_ex(pSORT_TEMP_FILE infiles, pSEG_MEM seg ,pExNode b,int k){
char tmp[MAX_CMP_CHAR];
tmp[dbfField.length]=’/0’;
if(read_data(infiles,k,seg)==0){ //文件已经到了末尾
strcpy(b[k].key,MAXKEY);
return ;
}
}
memcpy(tmp,(seg[k].buf+dbfHead.record_length*seg[k].inrows+dbfField.addr),dbfField.length);
strcpy(b[k].key,tmp);
seg[k].inrows++;
}
/*
功能:调整失败树其实只用到ls[1]…ls[k-1],ls[0]不用。
*/
void Adjust(LoserTree *ls,pExNode b,int k,int s)
{
int tmp=0;
int t=0;
char tmpc[MAX_CMP_CHAR]=”“;
strcpy(tmpc,b[s].key);
t=(int)((s+k)/2);
while(t>0)
{
if(strcmp(b[s].key,b[ls[t].element].key)>=0){
tmp=s; //值互换,s指示新的胜利者
s=ls[t].element;
ls[t].element=tmp;
}
t=(int)(t/2);
}
ls[0].element=s; //最终s变成最小值数组索引,保存在ls[0]中
}
/*
功能:建立败者树已知b[0]到b[k-1]为完全二叉树
ls的叶子结点存有k个关键码,沿从叶子到
根的k条路径。
*/
void CreateLoserTree(LoserTree *ls,pExNode b,int k){
int i=0,j=0,h=0;
char tmp[MAX_CMP_CHAR]=”“;
ExNode c[MERGE_K_MAX];
for(i=0;i
pid_t pid; //进程号
int process=0;
int k=0,v=0,l=0;
char tmppix1[10]=”“;
char tmppix2[10]=”“;
int mergek=0;
unsigned long rec_num=0;
printf(“infile or outfile in marge is null”);
return -1;
}
//最终已经生成了文件了
if(infiles[0].max==1){
strcpy(outfiles,infiles[0].file);
return 0;
}
if(infiles[0].max%merge_k==0){
part=(int)(infiles[0].max/merge_k);
}else{
part=(int)(infiles[0].max/merge_k)+1;
}
for(pa=0;pa
process++;
if(process>=MAX_PROCESS){
printf(“open too many process than %d and exit /n”,MAX_PROCESS);
exit(1);
}
if(pid=fork()<0){
printf(“fork process error”);
if(pa!=0){pa–;now_process–;} //不成功重新开进程。
continue;
}else if(pid==0){ //父进程
nowpa++;
// parallel_all++;
if(parallel_num==nowpa){
for(v=0;v
strcpy(newfiles[pa].file,argsg);
strcat(newfiles[pa].file,tmppix1);
itoa(pa,tmppix2,10);
strcat(newfiles[pa].file,tmppix2);
strcat(newfiles[pa].file+strlen(argsg)+strlen(tmppix1)+strlen(tmppix2),”.dbf”);
if((newfiles[pa].f=openAFile(newfiles[pa].file,”w+b”))==NULL){
printf(“open file %s failed”,newfiles[k].file);
return -1;
}
newfiles[pa].fileid=pa;
newfiles[pa].max=part;
if((outbuf.buf=malloc(OUT_BUFF*dbfHead.record_length))==NULL){
printf(“malloc in merge sort error”);
return -1;
}
outbuf.allrows=OUT_BUFF;
outbuf.inrows=0;
//这里调整merge_k,前面的mergek传入是建议值。
if((infiles[0].max-(pa+1)*merge_k)>=0) {mergek=merge_k;}
else mergek=infiles[0].max-pa*merge_k;
if(mergek==1){
free(outbuf.buf);
fclose(newfiles[pa].f);
remove(newfiles[pa].file);
rename(infiles[pa*merge_k].file,newfiles[pa].file);
continue;
}//只有一个文件不用合并
for(k=0;k
read_data(infiles,k,seg); //读取记录到内存段
rec_num+=infiles[k].dff.record_num;
}
//写文件头。
writeAFile(NULL,0,newfiles[pa].f,allf,2,rec_num);
rec_num=0;
K_Merge(infiles,newfiles,pa,seg,&outbuf,ls,exc,mergek);
//取消内存,关闭文件或删除
for(k=0;k
golfileId++;
return 0;
}
/*
功能:提取参数的值,返回1失败,返回0成功
*/
int getParaValue(char * * inpa,int start,int end){
char tmp[30],p1[3]=”-M”,p2[3]=”-P”,* retp;
int len=end-start+1,i=0;
for (i=0;i
strcpy(tmp,retp+2);
parallel_num=atoi(tmp);
if(parallel_num>PARALLEL_MAX){
printf(“parallel number you give is too large! /n”);
exit(1);
}
}
return 1;
}
return 0;
}
/*
功能:验证dbf文件长度是否正确
此函数因为有很多文件,所以需要多次
调用。
参数:File 文件路径名
返回值:0 正确,-1 错误
*/
int verifyDbfFile(FILE * afile)
{
long fileSize = 0;
long calSize = 0;
DBF_FILE_HEAD dbfHead2;
fseek(afile,0,SEEK_SET);
fread(&dbfHead2,sizeof(DBF_FILE_HEAD),1,afile);
fseek(afile,0,SEEK_END);
fileSize =(long) ftell(afile);
fseek(afile,0,SEEK_SET);
revert_unsigned_short(&dbfHead2.record_length);
revert_unsigned_long(&dbfHead2.record_num);
if(fileSize < calSize)
{
printf(“dbfHead2.record_num:%ld/n”,dbfHead2.record_num);
printf(“dbfHead2.record_length:%d/n”,dbfHead2.record_length);
printf(“/tbut actual length is %ld /n”,fileSize);
}
return 0;
}
/*
功能:字符和整数数值比较
aa 要比较的字符串
bb 要比较的字符串
aalen aa字符串长度
bblen bb字符串长度
type 类型”C”,”N”
dec 是小数位数
当aa
for(int i=1;i
/*
功能:主函数入口
*/
void main(int argc,char * * args){
int i=0;
int k=0,file_nums=0;
int tmp_files=0; //tmp_files为快速排序段数
char tmpFile[100]=”“; //排序临时文件名字
char tmppix[10]=”“;
FILE * tmpF=NULL; //生成中间文件
char * buf_sort=NULL ; //一个内存排序段
unsigned int sort_area_rows=0; //实际读入的文件长度
DBF_FILE_ALL_FIELD allf; //所有字段数据,写文件用到。
int shmid1=0; //unix系统中共享内存的标示
int parallel_now=0; //现在的子进程数,不包括父进程。
int parallel_all=0; //所有的子进程,全部子进程
parallel_num=DEFAULT_PAR;
now_process=0;
int process=0;
FILE * tar_file=NULL;
char tar_file_name[200]=”“;
char buffer1[BUFFERSIZE];
//char buffer2[BUFFERSIZE];
golfileId=0;
printf(“ERROR parameters too few or too much!/n”);
useage();
exit(1);
}
if(argc>4){
if(getParaValue(args,4,argc-1)==1){
printf(“get parameters vales error/n”);
useage();
exit(1);
}
}
if(strcmp(args[1],args[2])==0){
printf(“name of source file can not same with target file name/n”);
exit(1);
}
//打开原文件
if((src_file=openAFile(args[1],”rb”))==NULL){
printf(“can not open source file!”);
exit(1);
}
//打开目标文件
//if((tar_file=openAFile(args[2],”w+b”))==NULL){
// printf(“can not open target file!”);
// exit(1);
//}
/* 设置文件缓冲区*/
if( setvbuf(src_file, buffer1, _IOFBF, BUFFERSIZE) != 0){
printf(“Incorrect type or size of buffer for fileIn/n”);
exit(1);
}
//if( setvbuf(tar_file, buffer2, _IOFBF, BUFFERSIZE) != 0){
// printf(“Incorrect type or size of buffer for fileOut/n”);
// exit(1);
//}
/* 读dbf头文件信息 */
fseek(src_file,0,SEEK_SET);
fread(&dbfHead,sizeof(DBF_FILE_HEAD),1,src_file);
revert_unsigned_long(&dbfHead.record_num); /* 总的记录数*/
revert_unsigned_short(&dbfHead.record_length); /* 每条记录长度*/
printf(“dbfHead.record_num:%ld/n”,dbfHead.record_num);
printf(“dbfHead.record_length:%d/n”,dbfHead.record_length);
if(verifyDbfFile(src_file)==-1){
printf(“source file is destoryed”);
exit(1);
}
//验证排序字段是否出现在原文件DBF头中
fseek(src_file,(int)0x20,SEEK_SET); //将指针掉向字段记录开始
for(i=0;i
if((allf.pttr=malloc(dbfHead.head_length-32))==NULL){
printf(“malloc memary to allf error !”);
exit(1);
}
fseek(src_file,(int)0x20,SEEK_SET); //将指针掉向字段记录开始
if((allf.len=fread(allf.pttr,dbfHead.head_length-32,1,src_file))!=1){
printf(“read the column message error /n”);
exit(1);
}
//如果记录长度超过RECORD_MAX则推出
if(dbfField.length>(RECORD_MAX-1)){
printf(“the sort coloum is %s and length is %d /n”,dbfField.name,dbfField.length);
printf(“%s,%d /n”,”sort column length must less then “,RECORD_MAX-1);
exit(1);
}
//如果比较字符串,长度应该小于最长长度MAX_CMP_CHAR
if(dbfField.type=’C’ && dbfField.length>MAX_CMP_CHAR-1){
printf(“%s,%d /n”,”sort column length must less then “,MAX_CMP_CHAR-1);
exit(1);
}
file parameters can not adjust:
must: dbfHead.record_num*dbfHead.record_length<=SORT_AREA_ROWS*SORT_FILE_MAX*dbfHead.record_length
memary parameter can adjust MERAGE_K_MAX:
must: max_mem>=min(parallel_num)*(SORT_AREA_ROWS*dbfHead.record_length*min(MERGE_K_MAX=2))
final:max_mem=adjust1(parallel_num*(SORT_AREA_ROWS*dbfHead.record_length*adjust2(MERGE_K_MAX))
*/
printf(“The source file is too large!it must less then %f /n”, /
(double)SORT_AREA_ROWS*SORT_FILE_MAX*dbfHead.record_length);
exit(1);
}
if( max_mem!=0 && max_mem<1*(SORT_AREA_ROWS*dbfHead.record_length*2 )){
printf(“The max memary is too small “);
exit(1);
}else{
}
if((dbfHead.record_num%SORT_AREA_ROWS)!=0)
file_nums=(int)(dbfHead.record_num/SORT_AREA_ROWS)+1;
else
file_nums=(int)(dbfHead.record_num/SORT_AREA_ROWS);
//分配排序临时文件空间,在Unix中为共享的内存。
printf(“shmget error”);
exit (1);
}
if((sort_file = (SORT_TEMP_FILE )shmat(shmid1,0,0)) == (void )-1){
printf(“shmat error”);
exit(1);
}
//快速排序,在Unix系统中使用多进程
for(k=0;k
strcpy(tmpFile,args[1]);
strcat(tmpFile,tmppix);
strcat(tmpFile+strlen(args[1])+strlen(tmppix),”.dbf”);
if((buf_sort=malloc(SORT_AREA_ROWS*dbfHead.record_length))==NULL){
printf(“malloc memary in child process error and exit”);
exit(1);
}
fseek(src_file,tmp_files*SORT_AREA_ROWS*dbfHead.record_length+dbfHead.head_length,SEEK_SET);
sort_area_rows=fread(buf_sort,dbfHead.record_length,SORT_AREA_ROWS,src_file);
if((tmpF=openAFile(tmpFile,”w+b”))==NULL){
printf(“open files %s failed”,tmpFile);
exit(1);
}
}
fclose(tmpF);
free(buf_sort);
sort_file[tmp_files].fileid=tmp_files;
if(dbfHead.record_num%SORT_AREA_ROWS!=0)
sort_file[tmp_files].max=((int)(dbfHead.record_num/SORT_AREA_ROWS)+1);
else sort_file[tmp_files].max=(int)(dbfHead.record_num/SORT_AREA_ROWS);
strcpy(sort_file[tmp_files].file,tmpFile);
sort_file[tmp_files].used=0;
}
//成功退出
exit(0);
}
}
printf(“Assign memary for sort temp file message failed”);
exit (1);
}
//快速排序
for(tmp_files=0;tmp_files
if(merge_sort(sort_file,tar_file_name,merge_k,allf)==-1){
printf(“merge sort error ans exit”);
exit(1);
}
//改名
rename(tar_file_name,args[2]);
printf(“can not open target file!”);
exit(1);
}
//判断文件完整性
if(verifyDbfFile(tar_file)==-1){
printf(“target file is destoryed”);
exit(1);
}
//打印成功标致,退出
printf(“file out sort succeeful”);
fclose(src_file);
fclose(tar_file);
free(sort_file);
exit(0);
}
外排序分为两个步骤:预处理和合并排序。即首先根据内存的大小,将有n个记录的磁盘文件分批读入内存,采用有效的内存排序方法进行排序,将其预处理为若干个有序的子文件,这些有序子文件就是初始顺串,然后采用合并的方法将这些初始顺串逐趟合并成一个有序文件。
预处理阶段最重要的事情就是选择初始顺串。通常使用的方法为置换选择排序,它是堆排序的一种变形,步骤如下:
1.初始化堆 (1)从磁盘读入M个记录放到数组RAM中 (2)设置堆末尾标准LAST=M-1 (3)建立最小值堆 2.重复以下步骤直到堆为空 (1)把具有最小关键码值的记录Min也就是根节点送到输出缓冲区 (2)设R是输入缓冲区中的下一条记录,如果R的关键码大于刚刚输出的关键码值Min,则把R放到根节点,否则使用数组中LAST位置的记录代替根节点,并将刚才的R放入到LAST所在位置,LAST=LAST-1; (3)重新排列堆,筛出根节点 */
/*功能:将一个某字段没有排序的DBF文件排序输出,采用
* 外部排序算法,K路合并排序算法。排序需要
* 一定的磁盘空间,否则会出现错误。如果文件很大
* 时间可能会很长。本程序适合排序非常大的DBF文件
* 。开始的时候使用快速排序(unix系统并行操作)
* ,然后合并K路排序段(Unix系统并行操作),最后合
* 并最后两个,不使用并行,生成目标文件。比较字符
* 串最长不超过100字符,而且字符必须是Ascii码
* 中间有IO操作
建议:排序文件大小在100M-10G左右记录在100万-20亿之间
* 排序时源文件所在目录要留有源文件两倍大小的空间
*/
include
include
include
undef UNIX
define UNIX //UNIX系统,支持fork()子进程
undef UNIX
//UNIX 系统,支持fork()子进程//#define UNIX*/
ifdef UNIX
include
include
include
define MAX_PROCESS 200 //最多进程数,超过退出。
endif
//#define HP_UX //HP UNIX则打开这个定义define PRINT_MSG //打印调试信息
/#define COMPILER_WITH_64 / /*定义64编译环境*32位编译环境下:
* short 16 bit
* int 32 bit
* long 64 bit
*64位编译环境下:
* int 32 bit
* long 64 bit
*/
define MERGE_K_MAX 6 //K路合并排序最大的值,此值规定一个上限
int merge_k; //当前合并排序路数值define SORT_AREA_ROWS 400000 //外部排序写盘的记录数大小,实际是规定每个文件大小
//这个缓冲区在内存有对应大小
define DEFAULT_MAX_MEM 33554432 //默认最大使用内存,byte数
define BUFFERSIZE 32768 //环境缓冲区长度
define OUT_BUFF 32768 //合并排序缓冲区长度
define SORT_FILE_MAX 1000 //排序中使用的最大的临时文件个数。
ifdef UNIX
define DEFAULT_PAR 1 //默认的并行度
int parallel_num ; //并行数量define PARALLEL_MAX 20 //最大并行度
endif
define MAX_CMP_CHAR 100 //比较最长的字符串长度
define RECORD_MAX 2000 //记录的最长字符串数
define MINKEY “/0” //失败树关键码最小值
define MAXKEY “~~~~~~~~~~~~~~~~” //失败树关键码最大值
unsigned int max_mem; //内存使用最大数,这是一个总体的数目,包括所有子进程/* dbf头文件结构*/
typedef struct
{
unsigned char mask ; // 0位
unsigned char date[3] ; // 1-3 最后改动时间
ifdef COMPILER_WITH_64
unsigned int record_num;
else
unsigned long record_num; // 4-7 记录数endif
unsigned short int head_length; // 8-9 文件头长度,指第一条记录前的字节数unsigned short int record_length ; // 10-11 每条记录的长度
char nouse[20]; //不用的
} DBF_FILE_HEAD;
/dbf字段结构/
typedef struct
{
unsigned char name[11]; // 名字
unsigned char type ; //类型
ifdef COMPILER_WITH_64
unsigned int addr;
else
unsigned long addr; //开始地址从1开始计数的endif
unsigned short length; //长度unsigned char dec ; //如果为0表示数值型为整数,其他表示浮点型,
//注意dbf中间数值型以字符表示
char nouse[13]; //没有用的
} DBF_FILE_FIELD,*pDBF_FILE_FIELD;
/dbf文件字段完全数据/
typedef struct{
unsigned int len; //长度
char * pttr; //起始地址
} DBF_FILE_ALL_FIELD,*pDBF_FILE_ALL_FIELD;
/*
排序临时文件结构。
*/
typedef struct{
int fileid; //文件编号,从0开始
int max; //文件数最大值
char file[150]; //文件路径
int used; //0表示未使用,1表示已经使用
DBF_FILE_HEAD dff; //此文件的头信息
FILE * f; //打开的文件句柄
ifdef COMPILER_WITH_64
unsigned int curr_read_row;
else
unsigned long curr_read_row; //当前读到记录数
endif
}SORT_TEMP_FILE,*pSORT_TEMP_FILE;/外结点c,只存放待归并记录的关键码,字符类型/
typedef struct{
char key[MAX_CMP_CHAR];
}ExNode,*pExNode;
//失败树结构
typedef struct{
int element;
}LoserTree,*pLoserTree;
typedef struct{
unsigned long inrows; //读取到那条记录数
unsigned long allrows; //总的记录数
char * buf ; //缓存地址
}SEG_MEM,*pSEG_MEM;
SORT_TEMP_FILE *sort_file; //初始化时分配内存。
DBF_FILE_HEAD dbfHead; //dbf原文件的头描述
DBF_FILE_FIELD dbfField; //要排序的字段描述。
int lowext,koffset; //最底层的外部节点数。
int verifyDbfFile(FILE *file);
int golfileId; //这个是临时文件嵌套id号,一次嵌套增加一
char argsg[100]; //存放args[1]
ifdef UNIX
int now_process; //当前已经开过进程数endif
/*功能:将短整形数(16位,两个字节)高低位反转
*/
void revert_unsigned_short(unsigned short *a)
{
unsigned short left,right;
left=right=*a;
*a=((left&0x00ff)<<8)|((right&0xff00)>>8);
}
/*
功能:将长整形数(32位,四个字节)高低位反转
*/
ifdef COMPILER_WITH_64
void revert_unsigned_long(unsigned int *a)else
void revert_unsigned_long(unsigned long *a)endif
{unsigned long first,second,third,forth;
first=second=third=forth=*a;
*a=((first&0x000000ff)<<24)|
((second&0x0000ff00)<<8)|
((third&0x00ff0000)>>8)|
((forth&0xff000000)>>24);
}
/*
功能:打开一个文件,返回文件句柄
*/
FILE * openAFile(char FileIn , char aa)
{
FILE *fileIn = NULL;
// char * writeMode[2] = {“rb”,”wb”};
//#ifdef HP_UX
// strcpy(writeMode,”wb”);
//#endif
if((fileIn = fopen(FileIn,aa)) == NULL)
{
return NULL;
}
return fileIn;
}
/*
功能:写一个dbf文件的某段,
参数:buf_sort是缓存段
sort_area_rows是要写的记录数
tmpF文件句柄
allf是头信息
flag是1表示要写头和文件体,0表示不要写头只写文件,2表示只写头。
失败返回-1,成功返回0
参数:
*/
int writeAFile(char * buf_sort ,unsigned int sort_area_rows, FILE * tmpF,/
DBF_FILE_ALL_FIELD allf,int flag,unsigned long rec_num){
DBF_FILE_HEAD * write_head; //写的dbf的头
DBF_FILE_FIELD * write_field; //写的dbf的头
if((write_head=malloc(sizeof(DBF_FILE_HEAD)))==NULL){
printf(“malloc write head error”);
return -1;
}
if((write_field=malloc(dbfHead.head_length-32))==NULL){
printf(“malloc write head error”);
return -1;
}
if(flag==1 || flag==2){
memcpy(write_head,&dbfHead,32);
//memcpy(write_field,&dbfHead+0x2,dbfHead.head_length-32);
if(flag==1)(* write_head).record_num=sort_area_rows;
if(flag==2)(* write_head).record_num=rec_num;
fwrite(write_head,32,1,tmpF);
fwrite(allf.pttr,dbfHead.head_length-32,1,tmpF);
}
if(flag!=2)
fwrite(buf_sort,dbfHead.record_length,sort_area_rows,tmpF);
free(write_head);
free(write_field);
return 0;
}
/*
功能:从指定文件里面读取一个小于等于SORT_AREA_ROWS记录数的段
到内存段当中。返回实际读取字节数。
参数:infiles是文件结构数组指针
k是结构数组中那个文件
seg是段句柄
返回0为已经到了文件末尾
*/
unsigned int read_data(SORT_TEMP_FILE * infiles,int k,SEG_MEM * seg){
//判断是否已经到了文件结尾
unsigned int redin=0;
if(infiles[k].curr_read_row==infiles[k].dff.record_num) return 0;
redin=fread(seg[k].buf,dbfHead.record_length,SORT_AREA_ROWS,infiles[k].f);
infiles[k].curr_read_row+=redin;
seg[k].inrows=0; //初始化读取记录数
seg[k].allrows=redin;
return redin;
}
/*
功能:将一组记录写入一个DBF格式的文件,组大小为
ifdef UNIX
printf(” -pxxx :xxx is number of parallel processes /n/must less then 10/n”);
endif
}/*
功能:输入失败树,从seg读取到ExNode
如果seg已经到了最后,则重新读取到seg中。
参数:infiles是文件数组
seg是段数值指针
b是叶节点
k是到那一路输入
*/
void input_ex(pSORT_TEMP_FILE infiles, pSEG_MEM seg ,pExNode b,int k){
char tmp[MAX_CMP_CHAR];
tmp[dbfField.length]=’/0’;
if(seg[k].inrows==seg[k].allrows){
if(read_data(infiles,k,seg)==0){ //文件已经到了末尾
strcpy(b[k].key,MAXKEY);
return ;
}
}
memcpy(tmp,(seg[k].buf+dbfHead.record_length*seg[k].inrows+dbfField.addr),dbfField.length);
strcpy(b[k].key,tmp);
seg[k].inrows++;
}
/*
功能:调整失败树其实只用到ls[1]…ls[k-1],ls[0]不用。
*/
void Adjust(LoserTree *ls,pExNode b,int k,int s)
{
int tmp=0;
int t=0;
char tmpc[MAX_CMP_CHAR]=”“;
strcpy(tmpc,b[s].key);
t=(int)((s+k)/2);
while(t>0)
{
if(strcmp(b[s].key,b[ls[t].element].key)>=0){
tmp=s; //值互换,s指示新的胜利者
s=ls[t].element;
ls[t].element=tmp;
}
t=(int)(t/2);
}
ls[0].element=s; //最终s变成最小值数组索引,保存在ls[0]中
}
/*
功能:建立败者树已知b[0]到b[k-1]为完全二叉树
ls的叶子结点存有k个关键码,沿从叶子到
根的k条路径。
*/
void CreateLoserTree(LoserTree *ls,pExNode b,int k){
int i=0,j=0,h=0;
char tmp[MAX_CMP_CHAR]=”“;
ExNode c[MERGE_K_MAX];
for(i=0;i
ifdef UNIX
int nowpa=0; //当前并行度
pid_t pid; //进程号
int process=0;
endif
int pa=0;int k=0,v=0,l=0;
char tmppix1[10]=”“;
char tmppix2[10]=”“;
int mergek=0;
unsigned long rec_num=0;
if(infiles==NULL){
printf(“infile or outfile in marge is null”);
return -1;
}
//最终已经生成了文件了
if(infiles[0].max==1){
strcpy(outfiles,infiles[0].file);
return 0;
}
if(infiles[0].max%merge_k==0){
part=(int)(infiles[0].max/merge_k);
}else{
part=(int)(infiles[0].max/merge_k)+1;
}
for(pa=0;pa
ifdef UNIX
now_process++;process++;
if(process>=MAX_PROCESS){
printf(“open too many process than %d and exit /n”,MAX_PROCESS);
exit(1);
}
if(pid=fork()<0){
printf(“fork process error”);
if(pa!=0){pa–;now_process–;} //不成功重新开进程。
continue;
}else if(pid==0){ //父进程
nowpa++;
// parallel_all++;
if(parallel_num==nowpa){
for(v=0;v
endif
//并行系统中是子进程。 itoa(golfileId,tmppix1,10);
strcpy(newfiles[pa].file,argsg);
strcat(newfiles[pa].file,tmppix1);
itoa(pa,tmppix2,10);
strcat(newfiles[pa].file,tmppix2);
strcat(newfiles[pa].file+strlen(argsg)+strlen(tmppix1)+strlen(tmppix2),”.dbf”);
if((newfiles[pa].f=openAFile(newfiles[pa].file,”w+b”))==NULL){
printf(“open file %s failed”,newfiles[k].file);
return -1;
}
newfiles[pa].fileid=pa;
newfiles[pa].max=part;
if((outbuf.buf=malloc(OUT_BUFF*dbfHead.record_length))==NULL){
printf(“malloc in merge sort error”);
return -1;
}
outbuf.allrows=OUT_BUFF;
outbuf.inrows=0;
//这里调整merge_k,前面的mergek传入是建议值。
if((infiles[0].max-(pa+1)*merge_k)>=0) {mergek=merge_k;}
else mergek=infiles[0].max-pa*merge_k;
if(mergek==1){
free(outbuf.buf);
fclose(newfiles[pa].f);
remove(newfiles[pa].file);
rename(infiles[pa*merge_k].file,newfiles[pa].file);
continue;
}//只有一个文件不用合并
//分配段内存,读入文件
for(k=0;k
ifdef HP_UX
revert_unsigned_short(&(infiles[k].dff.head_length)); /* 用于定位到第一条记录*/ revert_unsigned_long(&(infiles[k].dff.record_num)); /* 总的记录数*/ revert_unsigned_short(&(infiles[k].dff.record_length)); /* 每条记录长度*/
endif
ifdef PRINT_MSG
printf("head_length:%d/n",infiles[k].dff.head_length); printf("record_num:%ld/n",infiles[k].dff.record_num); printf("record_length:%d/n",infiles[k].dff.record_length);
endif
fseek(infiles[k].f,dbfHead.head_length,SEEK_SET);//移动到第一条记录
read_data(infiles,k,seg); //读取记录到内存段
rec_num+=infiles[k].dff.record_num;
}
//写文件头。
writeAFile(NULL,0,newfiles[pa].f,allf,2,rec_num);
rec_num=0;
//k路合并
K_Merge(infiles,newfiles,pa,seg,&outbuf,ls,exc,mergek);
//取消内存,关闭文件或删除
for(k=0;k
ifdef UNIX
} //并行系统结束endif
}//for//递归调用本函数
golfileId++;
return merge_sort(newfiles,outfiles,merge_k,allf);
return 0;
}
/*
功能:提取参数的值,返回1失败,返回0成功
*/
int getParaValue(char * * inpa,int start,int end){
char tmp[30],p1[3]=”-M”,p2[3]=”-P”,* retp;
int len=end-start+1,i=0;
for (i=0;i
ifdef UNIX
else if((retp=strstr(inpa[start+i],p2))!=NULL){strcpy(tmp,retp+2);
parallel_num=atoi(tmp);
if(parallel_num>PARALLEL_MAX){
printf(“parallel number you give is too large! /n”);
exit(1);
}
}
endif
elsereturn 1;
}
return 0;
}
/*
功能:验证dbf文件长度是否正确
此函数因为有很多文件,所以需要多次
调用。
参数:File 文件路径名
返回值:0 正确,-1 错误
*/
int verifyDbfFile(FILE * afile)
{
long fileSize = 0;
long calSize = 0;
DBF_FILE_HEAD dbfHead2;
fseek(afile,0,SEEK_SET);
fread(&dbfHead2,sizeof(DBF_FILE_HEAD),1,afile);
fseek(afile,0,SEEK_END);
fileSize =(long) ftell(afile);
fseek(afile,0,SEEK_SET);
ifdef HP_UX
revert_unsigned_short(&dbfHead2.head_length);revert_unsigned_short(&dbfHead2.record_length);
revert_unsigned_long(&dbfHead2.record_num);
endif
calSize = (long)dbfHead2.head_length + dbfHead2.record_length*dbfHead2.record_num;if(fileSize < calSize)
{
ifdef PRINT_MSG
printf(“dbfHead2.head_length:%d/n”,dbfHead2.head_length);printf(“dbfHead2.record_num:%ld/n”,dbfHead2.record_num);
printf(“dbfHead2.record_length:%d/n”,dbfHead2.record_length);
printf(“/tbut actual length is %ld /n”,fileSize);
endif
return -1;}
return 0;
}
/*
功能:字符和整数数值比较
aa 要比较的字符串
bb 要比较的字符串
aalen aa字符串长度
bblen bb字符串长度
type 类型”C”,”N”
dec 是小数位数
当aa
ifdef UNIX
int i;for(int i=1;i
else
for(j=1;jendif
merge_k=j-1;ifdef UNIX
max_mem=parallel_num*SORT_AREA_ROWS*dbfHead.record_length*merge_k;else
max_mem=SORT_AREA_ROWS*dbfHead.record_length*merge_k;
endif
}/*
功能:主函数入口
*/
void main(int argc,char * * args){
int i=0;
int k=0,file_nums=0;
int tmp_files=0; //tmp_files为快速排序段数
char tmpFile[100]=”“; //排序临时文件名字
char tmppix[10]=”“;
FILE * tmpF=NULL; //生成中间文件
char * buf_sort=NULL ; //一个内存排序段
unsigned int sort_area_rows=0; //实际读入的文件长度
DBF_FILE_ALL_FIELD allf; //所有字段数据,写文件用到。
ifdef UNIX
pid_t pid; //进程号int shmid1=0; //unix系统中共享内存的标示
int parallel_now=0; //现在的子进程数,不包括父进程。
int parallel_all=0; //所有的子进程,全部子进程
parallel_num=DEFAULT_PAR;
now_process=0;
int process=0;
endif
FILE * src_file=NULL;FILE * tar_file=NULL;
char tar_file_name[200]=”“;
char buffer1[BUFFERSIZE];
//char buffer2[BUFFERSIZE];
golfileId=0;
//获取参数 strcpy(argsg,args[1]); if(argc<4 || argc>6){
printf(“ERROR parameters too few or too much!/n”);
useage();
exit(1);
}
if(argc>4){
if(getParaValue(args,4,argc-1)==1){
printf(“get parameters vales error/n”);
useage();
exit(1);
}
}
if(strcmp(args[1],args[2])==0){
printf(“name of source file can not same with target file name/n”);
exit(1);
}
//打开原文件
if((src_file=openAFile(args[1],”rb”))==NULL){
printf(“can not open source file!”);
exit(1);
}
//打开目标文件
//if((tar_file=openAFile(args[2],”w+b”))==NULL){
// printf(“can not open target file!”);
// exit(1);
//}
/* 设置文件缓冲区*/
if( setvbuf(src_file, buffer1, _IOFBF, BUFFERSIZE) != 0){
printf(“Incorrect type or size of buffer for fileIn/n”);
exit(1);
}
//if( setvbuf(tar_file, buffer2, _IOFBF, BUFFERSIZE) != 0){
// printf(“Incorrect type or size of buffer for fileOut/n”);
// exit(1);
//}
/* 读dbf头文件信息 */
fseek(src_file,0,SEEK_SET);
fread(&dbfHead,sizeof(DBF_FILE_HEAD),1,src_file);
ifdef HP_UX
revert_unsigned_short(&dbfHead.head_length); /* 用于定位到第一条记录*/revert_unsigned_long(&dbfHead.record_num); /* 总的记录数*/
revert_unsigned_short(&dbfHead.record_length); /* 每条记录长度*/
endif
ifdef PRINT_MSG
printf(“dbfHead.head_length:%d/n”,dbfHead.head_length);printf(“dbfHead.record_num:%ld/n”,dbfHead.record_num);
printf(“dbfHead.record_length:%d/n”,dbfHead.record_length);
endif
//验证原文件的正确性
if(verifyDbfFile(src_file)==-1){
printf(“source file is destoryed”);
exit(1);
}
//验证排序字段是否出现在原文件DBF头中
fseek(src_file,(int)0x20,SEEK_SET); //将指针掉向字段记录开始
for(i=0;i
ifdef HP_UX
revert_unsigned_short(&dbfField.length);endif
//读取全部头信息。if((allf.pttr=malloc(dbfHead.head_length-32))==NULL){
printf(“malloc memary to allf error !”);
exit(1);
}
fseek(src_file,(int)0x20,SEEK_SET); //将指针掉向字段记录开始
if((allf.len=fread(allf.pttr,dbfHead.head_length-32,1,src_file))!=1){
printf(“read the column message error /n”);
exit(1);
}
//如果记录长度超过RECORD_MAX则推出
if(dbfField.length>(RECORD_MAX-1)){
printf(“the sort coloum is %s and length is %d /n”,dbfField.name,dbfField.length);
printf(“%s,%d /n”,”sort column length must less then “,RECORD_MAX-1);
exit(1);
}
//如果比较字符串,长度应该小于最长长度MAX_CMP_CHAR
if(dbfField.type=’C’ && dbfField.length>MAX_CMP_CHAR-1){
printf(“%s,%d /n”,”sort column length must less then “,MAX_CMP_CHAR-1);
exit(1);
}
/*检验参数正确性
file parameters can not adjust:
must: dbfHead.record_num*dbfHead.record_length<=SORT_AREA_ROWS*SORT_FILE_MAX*dbfHead.record_length
memary parameter can adjust MERAGE_K_MAX:
must: max_mem>=min(parallel_num)*(SORT_AREA_ROWS*dbfHead.record_length*min(MERGE_K_MAX=2))
final:max_mem=adjust1(parallel_num*(SORT_AREA_ROWS*dbfHead.record_length*adjust2(MERGE_K_MAX))
*/
if( (double)dbfHead.record_num*dbfHead.record_length>(double)SORT_AREA_ROWS*SORT_FILE_MAX*dbfHead.record_length ){
printf(“The source file is too large!it must less then %f /n”, /
(double)SORT_AREA_ROWS*SORT_FILE_MAX*dbfHead.record_length);
exit(1);
}
if( max_mem!=0 && max_mem<1*(SORT_AREA_ROWS*dbfHead.record_length*2 )){
printf(“The max memary is too small “);
exit(1);
ifdef UNIX
}else if(max_mem!=0 && max_memelse
}else if(max_mem!=0 && max_mem<SORT_AREA_ROWS*dbfHead.record_length*MERGE_K_MAX){
endif
adjust_par();//调整参数。}else{
ifdef UNIX
max_mem=parallel_num*(SORT_AREA_ROWS*dbfHead.record_length*MERGE_K_MAX);else
max_mem=SORT_AREA_ROWS*dbfHead.record_length*MERGE_K_MAX;
endif
merge_k=MERGE_K_MAX;}
if((dbfHead.record_num%SORT_AREA_ROWS)!=0)
file_nums=(int)(dbfHead.record_num/SORT_AREA_ROWS)+1;
else
file_nums=(int)(dbfHead.record_num/SORT_AREA_ROWS);
//分配排序临时文件空间,在Unix中为共享的内存。
ifdef UNIX
if((shmid1 = shmget(IPC_PRIVATE, sizeof(SORT_TEMP_FILE)*SORT_FILE_MAX, (SHM_R|SHM_W)))<0){
printf(“shmget error”);
exit (1);
}
if((sort_file = (SORT_TEMP_FILE )shmat(shmid1,0,0)) == (void )-1){
printf(“shmat error”);
exit(1);
}
//快速排序,在Unix系统中使用多进程
for(k=0;k
ifdef PRINT_MSG
printf("tmp_files=%d /n tmppix=%s /n",tmp_files,tmppix);
endif
itoa(tmp_files,tmppix,10);
strcpy(tmpFile,args[1]);
strcat(tmpFile,tmppix);
strcat(tmpFile+strlen(args[1])+strlen(tmppix),”.dbf”);
if((buf_sort=malloc(SORT_AREA_ROWS*dbfHead.record_length))==NULL){
printf(“malloc memary in child process error and exit”);
exit(1);
}
fseek(src_file,tmp_files*SORT_AREA_ROWS*dbfHead.record_length+dbfHead.head_length,SEEK_SET);
sort_area_rows=fread(buf_sort,dbfHead.record_length,SORT_AREA_ROWS,src_file);
if((tmpF=openAFile(tmpFile,”w+b”))==NULL){
printf(“open files %s failed”,tmpFile);
exit(1);
}
quickSort(buf_sort,0,sort_area_rows-1); if(writeAFile(buf_sort,sort_area_rows,tmpF,allf,1,0)==-1){ printf("write a dbf file %s failed /n ",tmpFile); fclose(tmpF); exit(1);
}
fclose(tmpF);
free(buf_sort);
sort_file[tmp_files].fileid=tmp_files;
if(dbfHead.record_num%SORT_AREA_ROWS!=0)
sort_file[tmp_files].max=((int)(dbfHead.record_num/SORT_AREA_ROWS)+1);
else sort_file[tmp_files].max=(int)(dbfHead.record_num/SORT_AREA_ROWS);
strcpy(sort_file[tmp_files].file,tmpFile);
sort_file[tmp_files].used=0;
}
//成功退出
exit(0);
}
}
else
if((sort_file = (SORT_TEMP_FILE *)(malloc(sizeof(SORT_TEMP_FILE)*SORT_FILE_MAX)))==NULL){printf(“Assign memary for sort temp file message failed”);
exit (1);
}
//快速排序
for(tmp_files=0;tmp_files
endif
//合并排序if(merge_sort(sort_file,tar_file_name,merge_k,allf)==-1){
printf(“merge sort error ans exit”);
exit(1);
}
//改名
rename(tar_file_name,args[2]);
if((tar_file=openAFile(args[2],"rb"))==NULL){
printf(“can not open target file!”);
exit(1);
}
//判断文件完整性
if(verifyDbfFile(tar_file)==-1){
printf(“target file is destoryed”);
exit(1);
}
//打印成功标致,退出
printf(“file out sort succeeful”);
fclose(src_file);
fclose(tar_file);
free(sort_file);
exit(0);
}
相关文章推荐
- LeetCode_Array_Easy
- Win7安装.zip(绿色版)MySQL
- 项目管理-整体项目管理
- [Leetcode] Unique Binary Search Trees
- 史上最牛逼的创业团队之一
- 众微处理器云起ARM的Cortex-M核心是否为王?
- linux中的readlink命令
- 【Python之旅】第四篇(三):Python面向对象编程详解
- 【Python之旅】第四篇(三):Python面向对象编程详解
- 微信商城中使用微信支付接口获取用户地址
- 广度优先算法解决无向无权图的最短路径问题
- Go-简洁的并发
- easyui实现datagrid行内编辑
- Codeforces 557E Ann and Half-Palindrome (字典树+字符串排序)
- 海思HI3518E和HI3516C方案的源代码便宜转让
- linux下ls的用法小结
- 线性代数(2)矩阵可以理解成空间坐标
- 常用算法:(一)分治算法
- 职业化意识
- struts2+easyui实现根据条件检索信息