您的位置:首页 > 其它

MJPG-Streamer源码分析(一)

2017-02-22 18:57 453 查看
--------------------------------------------------------------------------------------------------

另一片篇推荐的博文:http://www.armbbs.net/forum.php?mod=viewthread&tid=17434

基础知识:

 条件变量:

  条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:

  一个线程等待"条件变量的条件成立"而挂起;另一个线程使"条件成立"(给出条件成立信号)。

  为了防止竞争,条件变量的使用总是和一个互斥锁结合在一起。

  

  当程序进入pthread_cond_wait等待后,将会把g_mutex进行解锁,

  当离开pthread_cond_wait之前,g_mutex会重新加锁。所以在main中的g_mutex会被加锁。

  

 动态链接库的操作函数:

  #include <dlfcn.h>

  void *dlopen(const char *filename, int flag); /* 打开动态链接库,返回动态库句柄handle */

  char *dlerror(void);    /* 返回由于dlopen(),dlsym()或者dlclose()产生的错误 */

  void *dlsym(void *handle, const char *symbol); /* 通过handle,获得动态库内函数的地址,之后通过该地址调用动态库内的函数 */

  int dlclose(void *handle);   /* 关闭动态库 */

  Link with -ldl.     /* 注意,程序在编译的时候要用-ldl */

  

 字符串操作函数:

  #include <string.h>

  char *strchr(const char *s, int c);  /* 返回字符串s第一次出现c的指针 */

  char *strrchr(const char *s, int c);  /* 返回字符串s最后一次出现c的指针 */

  char *strdup(const char *s);   /* 复制字符串s,返回指向新字符串的指针(malloc\free) */

  char *strndup(const char *s, size_t n);  /* 复制字符串s最多n个字符,如果s正好有n个,'\0'将自动被添加 */

  char *strdupa(const char *s);   /* 调用alloca函数在站内分配内存 */

  char *strndupa(const char *s, size_t n); /* 调用alloca函数在站内分配内存 */

  

 守护进程:

  守护进程最重要的特性是后台运行;其次,守护进程必须与其运行前的环境隔离开来。

  这些环境包括未关闭的文件描述符,控制终端,会话和进程组,工作目录以及文件创建掩模等。

  这些环境通常是守护进程从执行它的父进程(特别是shell)中继承下来的;

  最后,守护进程的启动方式有其特殊之处------它可以在 Linux系统启动时从启动脚本/etc/rc.d中启动,

  可以由作业规划进程crond启动,还可以由用户终端(通常是shell)执行。

  总之,除开这些特殊性以外,守护进程与普通进程基本上没有什么区别,

  因此,编写守护进程实际上是把一个普通进程按照上述的守护进程的特性改造成为守护进程。

  

 守护进程的编程要点

  1. 在后台运行。

  为避免挂起控制终端将Daemon放入后台执行。方法是在进程中调用fork使

  父进程终止, 让Daemon在子进程中后台执行。

  if(pid=fork()) exit(0); //是父进程,结束父进程,子进程继续

  2. 脱离控制终端,登录会话和进程组

  有必要先介绍一下Linux中的进程与控制终端,登录会话和进程组之间的关系:

  进程属于 一个进程组,进程组号(GID)就是进程组长的进程号(PID)。

  登录会话可以包含多个进程组。这些进程组共享一个控制终端。这个控制终端通常是创建进程的登录终端。 

  控制终端,登录会话和进程组通常是从父进程继承下来的。我们的目的就是要摆脱它们 ,使之不受它们的影响。

  方法是在第1点的基础上,调用setsid()使进程成为会话组长:

  setsid();

  说明:当进程是会话组长时setsid()调用失败。但第一点已经保证进程不是会话组长。

  setsid()调用成功后,进程成为新的会话组长和新的进程组长,并与原来的登录会话和进程组脱离。

  由于会话过程对控制终端的独占性,进程同时与控制终端脱离。

  3. 禁止进程重新打开控制终端

  现在,进程已经成为无终端的会话组长。但它可以重新申请打开一个控制终端。

  可以通过使进程不再成为会话组长来禁止进程重新打开控制终端:

  if(pid=fork()) exit(0); //结束第一子进程,第二子进程继续(第二子进程不再是会话组长)

  4. 关闭打开的文件描述符

  进程从创建它的父进程那里继承了打开的文件描述符。

  如不关闭,将会浪费系统资源, 造成进程所在的文件系统无法卸下以及引起无法预料的错误。

  按如下方法关闭它们:

  for(i=0;i 关闭打开的文件描述符close(i);

  5. 改变当前工作目录

  进程活动时,其工作目录所在的文件系统不能卸下。一般需要将工作目录

  改变到根目录 。对于需要转储核心,写运行日志的进程将工作目录改变到特定目录如/tmp

  chdir("/")

  6. 重设文件创建掩模

  进程从创建它的父进程那里继承了文件创建掩模。它可能修改守护进程所创建的文件的存取位。

  为防止这一点,将文件创建掩模清除:

  umask(0);

  7. 处理SIGCHLD信号

  处理SIGCHLD信号并不是必须的。但对于某些进程,特别是服务器进程往往在请求到来时生成子进程处理请求。

  如果父进程不等待子进程结束,子进程将成为僵尸进程(zombie )从而占用系统资源。

  如果父进程等待子进程结束,将增加父进程的负担,影响服务器 进程的并发性能。

  在Linux下可以简单地将SIGCHLD信号的操作设为SIG_IGN。

  signal(SIGCHLD,SIG_IGN);

  这样,内核在子进程结束时不会产生僵尸进程。

  这一点与BSD4不同,BSD4下必须显式等待子进程结束才能释放僵尸进程.

  

   关于/dev/null及用途

    把/dev/null看作"黑洞". 它非常等价于一个只写文件. 

    所有写入它的内容都会永远丢失. 而尝试从它那儿读取内容则什么也读不到. 

    然而, /dev/null对命令行和脚本都非常的有用.

    禁止标准输出.

  1 cat $filename >/dev/null

     2 # 文件内容丢失,而不会输出到标准输出.

  禁止标准错误

  1 rm $badname 2>/dev/null

     2 # 这样错误信息[标准错误]就被丢到太平洋去了.

  禁止标准输出和标准错误的输出.

  1 cat $filename 2>/dev/null >/dev/null

     2 # 如果"$filename"不存在,将不会有任何错误信息提示.

     3 # 如果"$filename"存在, 文件的内容不会打印到标准输出.

     4 # 因此Therefore, 上面的代码根本不会输出任何信息.

     5 # 当只想测试命令的退出码而不想有任何输出时非常有用。

     6 #-----------测试命令的退出 begin ----------------------#

     7 # ls dddd 2>/dev/null 8 

     8 # echo $?    //输出命令退出代码:0为命令正常执行,1-255为有出错。  

     9 #-----------测试命令的退出 end-----------#  

     10# cat $filename &>/dev/null 

     11 #也可以, 由 Baris Cicek 指出.

  清除日志文件内容

  1 cat /dev/null > /var/log/messages

     2 #  : > /var/log/messages   有同样的效果, 但不会产生新的进程.(因为:是内建的)

     3 

     4 cat /dev/null > /var/log/wtmp

  例子 28-1. 隐藏cookie而不再使用

  1 if [ -f ~/.netscape/cookies ]  # 如果存在则删除.

     2 then

     3   rm -f ~/.netscape/cookies

     4 fi

     5 

     6 ln -s /dev/null ~/.netscape/cookies

     7 # 现在所有的cookies都会丢入黑洞而不会保存在磁盘上了.

    

--------------------------------------------------------------------------------------------------

首先,分析该软件的结构体:

--------------------------------------------------------------------------------------------------

globals结构体:

--------------------------------------------------------------------------------------------------

 typedef struct _globals globals; /* mjpg-streamer只支持一个输入插件,多个输出插件 */

 struct _globals {

   int stop;    /* 一个全局标志位 */

   pthread_mutex_t db;   /* 互斥锁,数据锁 */

   pthread_cond_t  db_update;  /* 条件变量,数据更新的标志 */

   unsigned char *buf;   /* 全局JPG帧的缓冲区的指针 */

   int size;    /* 缓冲区的大小 */

   input in;    /* 输入插件,一个输入插件可对应多个输出插件 */

   output out[MAX_OUTPUT_PLUGINS];  /* 输出插件,以数组形式表示 */

   int outcnt;    /* 输出插件的数目 */

 };

--------------------------------------------------------------------------------------------------

input结构体:

 /* structure to store variables/functions for input plugin */

 typedef struct _input input;

 struct _input {

   char *plugin;    /* 动态链接库的名字,或者是动态链接库的地址 */

   void *handle;    /* 动态链接库的句柄,通过该句柄可以调用动态库中的函数 */

   input_parameter param;  /* 插件的参数 */

   int (*init)(input_parameter *); /* 四个函数指针 */

   int (*stop)(void);

   int (*run)(void);

   int (*cmd)(in_cmd_type, int);  /* 处理命令的函数 */

 };

 

/* parameters for input plugin */

 typedef struct _input_parameter input_parameter;

 struct _input_parameter {

   char *parameter_string;

   struct _globals *global;

 };

-------------------------------------------------------------------------------------------------- 

output结构体:

 /* structure to store variables/functions for output plugin */

 typedef struct _output output;

 struct _output {

   char *plugin;    /* 插件的名字 */

   void *handle;    /* 动态链接库的句柄,通过该句柄可以调用动态库中的函数 */

   output_parameter param;  /* 插件的参数 */

   int (*init)(output_parameter *); /* 四个函数指针 */

   int (*stop)(int);

   int (*run)(int);

   int (*cmd)(int, out_cmd_type, int); /* 处理命令的函数 */

 };

/* parameters for output plugin */

 typedef struct _output_parameter output_parameter;

 struct _output_parameter {

   int id;    /* 用于标记是哪一个输出插件的参数 */

   char *parameter_string;

   struct _globals *global;

 };

--------------------------------------------------------------------------------------------------

现在开始分析main()函数:

--------------------------------------------------------------------------------------------------

默认情况下,程序会将video0作为输入,http的8080端口作为输出  (fps = frames per second)

 char *input  = "input_uvc.so --resolution 640x480 --fps 5 --device /dev/video0";

 char *output[MAX_OUTPUT_PLUGINS];  /* 一个输入最大可以对应10个输出 */

 output[0] = "output_http.so --port 8080"; /* 将video0作为输入,http的8080端口作为输出 */

-------------------------------------------------------------------------------------------------- 

下面是一个while()循环,来解析main()函数后面所带的参数

 /* parameter parsing */

 while(1) {

 int option_index = 0, c=0;

 static struct option long_options[] = \ /* 长选项表,进行长选项的比对 */

 {

  {"h", no_argument, 0, 0},  /* 第一个参数为选项名,前面没有短横线。譬如"help"、"verbose"之类 */

  {"help", no_argument, 0, 0},  /* 第二个参数描述了选项是否有选项参数 |no_argument 0 选项没有参数|required_argument 1 选项需要参数|optional_argument 2 选项参数可选|*/

  {"i", required_argument, 0, 0},  /* 第三个参数指明长选项如何返回,如果flag为NULL,则getopt_long返回val。

  {"input", required_argument, 0, 0},  * 否则返回0,flag指向一个值为val的变量。如果该长选项没有发现,flag保持不变.

  {"o", required_argument, 0, 0},   */

  {"output", required_argument, 0, 0}, /* 第四个参数是发现了长选项时的返回值,或者flag不是NULL时载入*flag中的值 */

  {"v", no_argument, 0, 0},

  {"version", no_argument, 0, 0},

  {"b", no_argument, 0, 0},  /* 每个长选项在长选项表中都有一个单独条目,该条目里需要填入正确的数值。数组中最后的元素的值应该全是0。

  {"background", no_argument, 0, 0},  *数组不需要排序,getopt_long()会进行线性搜索。但是,根据长名字来排序会使程序员读起来更容易. 

  {0, 0, 0, 0}     */     

 };

--------------------------------------------------------------------------------------------------

 c = getopt_long_only(argc, argv, "", long_options, &option_index);

--------------------------------------------------------------------------------------------------

下面重点分析一下getopt_long_only函数:

 int getopt_long_only(int argc, char * const argv[],const char *optstring,const struct option *longopts, int *longindex);

 该函数每解析完一个选项,就返回该选项字符。

 如果选项带参数,参数保存在optarg中。如果选项带可选参数,而实际无参数时,optarg为NULL。

 当遇到一个不在optstring指明的选项时,返回字符'?'。如果在optstring指明某选项带参数而实际没有参数时,返回字符'?'或者字符':',视optstring的第一个字符而定。这两种情况选项的实际值被保存在optopt中。

 当解析错误时,如果opterr为1则自动打印一条错误消息(默认),否则不打印。

 当解析完成时,返回-1。

 每当解析完一个argv,optind就会递增。如果遇到无选项参数,getopt默认会把该参数调后一位,接着解析下一个参数。如果解析完成后还有无选项的参数,则optind指示的是第一个无选项参数在argv中的索引。

 最后一个参数longindex在函数返回时指向被搜索到的选项在longopts数组中的下标。longindex可以为NULL,表明不需要返回这个值

-------------------------------------------------------------------------------------------------- 

 /* no more options to parse */

 if (c == -1) break;

 /* unrecognized option */

 if(c=='?'){ help(argv[0]); return 0; }

 switch (option_index) {

 /* h, help */

 case 0:

 case 1:

 help(argv[0]);

 return 0;

 break;

 /* i, input */

 case 2:

 case 3:

 input = strdup(optarg);

 break;

 /* o, output */

 case 4:

 case 5:

 output[global.outcnt++] = strdup(optarg);

 break;

 /* v, version */

 case 6:

 case 7:

 printf("MJPG Streamer Version: %s\n" \

 "Compilation Date.....: %s\n" \

 "Compilation Time.....: %s\n", SOURCE_VERSION, __DATE__, __TIME__);

 return 0;

 break;

 /* b, background */

 case 8:

 case 9:

 daemon=1;

 break;

 default:

 help(argv[0]);

 return 0;

 }

 }

--------------------------------------------------------------------------------------------------

好,现在分析一下该程序是否需要成为守护进程:

  /* fork to the background */

  if ( daemon ) {     /* 当命令后面设置了b命令时,daemon就会被置为1 */

   LOG("enabling daemon mode");

   daemon_mode(); 

  }

 现在看一看daemon_mode()时如何创建守护进程的

   void daemon_mode(void) {

     int fr=0;

     fr = fork();

     if( fr < 0 ) {   /* fork失败  */

       fprintf(stderr, "fork() failed\n");

       exit(1);

     }

     if ( fr > 0 ) {   /* 结束父进程,子进程继续  */

       exit(0);

     }

     if( setsid() < 0 ) {   /* 创建新的会话组,子进程成为组长,并与控制终端分离 */

       fprintf(stderr, "setsid() failed\n");

       exit(1);

     }

     fr = fork();    /* 防止子进程(组长)获取控制终端 */ 

     if( fr < 0 ) {   /* fork错误,退出 */

       fprintf(stderr, "fork() failed\n");

       exit(1);

     }

     if ( fr > 0 ) {   /* 父进程,退出 */

       fprintf(stderr, "forked to background (%d)\n", fr);

       exit(0);

     }     /* 第二子进程继续执行 , 第二子进程不再是会会话组组长*/

     umask(0);    /* 重设文件创建掩码 */

     chdir("/");    /* 切换工作目录 */ 

     close(0);    /* 关闭打开的文件描述符*/

     close(1);

     close(2);

     open("/dev/null", O_RDWR);  /* 将0,1,2重定向到/dev/null */  

     dup(0);

     dup(0);

   }

--------------------------------------------------------------------------------------------------

初始化global全局变量

 global.stop      = 0;

 global.buf       = NULL;

 global.size      = 0;

 global.in.plugin = NULL;

--------------------------------------------------------------------------------------------------

同步全局图像缓冲区:

 pthread_mutex_init(&global.db, NULL);

 pthread_cond_init(&global.db_update, NULL);

--------------------------------------------------------------------------------------------------

忽略SIGPIPE信号(当关闭TCP sockets时,OS会发送该信号)

 signal(SIGPIPE, SIG_IGN);

--------------------------------------------------------------------------------------------------

注册<CTRL>+C信号处理函数,来结束该程序

 signal(SIGINT, signal_handler);

 ------------------------------------------------------------------------------------------

  void signal_handler(int sig)

  {

    int i;

    /* signal "stop" to threads */

    LOG("setting signal to stop\n");

    global.stop = 1;

    usleep(1000*1000);

    /* clean up threads */

    LOG("force cancelation of threads and cleanup ressources\n");

    global.in.stop();

    for(i=0; i<global.outcnt; i++) {

      global.out[i].stop(global.out[i].param.id);

    }

    usleep(1000*1000);

    /* close handles of input plugins */

    dlclose(&global.in.handle);

    for(i=0; i<global.outcnt; i++) {

      dlclose(global.out[i].handle);

    }

    DBG("all plugin handles closed\n");

    pthread_cond_destroy(&global.db_update);

    pthread_mutex_destroy(&global.db);

    LOG("done\n");

    closelog();

    exit(0);

    return;

  }

 ------------------------------------------------------------------------------------------

--------------------------------------------------------------------------------------------------

打开输入插件:

   tmp = (size_t)(strchr(input, ' ')-input);     

   global.in.plugin = (tmp > 0)?strndup(input, tmp):strdup(input); /* 在命令中获得动态库 */

   global.in.handle = dlopen(global.in.plugin, RTLD_LAZY);  /* 打开动态链接库 */

   global.in.init = dlsym(global.in.handle, "input_init");  /* 获得动态库内的input_init()函数 */

   global.in.stop = dlsym(global.in.handle, "input_stop");  /* 获得动态库内的input_stop()函数 */

   global.in.run = dlsym(global.in.handle, "input_run");   /* 获得动态库内的input_run()函数 */

   /* try to find optional command */

   global.in.cmd = dlsym(global.in.handle, "input_cmd");   /* 获得动态库内的input_cmd()函数 */

   global.in.param.parameter_string = strchr(input, ' ');  /* 将命令参数的起始地址赋给para.parameter_string,已经去掉前卖弄的动态库 */

   global.in.param.global = &global;     /* 将global结构体的地址赋给param.global */

   global.in.init(&global.in.param);     /* 传递global.in.param给init,进行初始化 */

   }

--------------------------------------------------------------------------------------------------

打开输出插件:

 for (i=0; i<global.outcnt; i++) {   /* 因为是一个输入对应多个输出,所以输出采用了for循环 */

  tmp = (size_t)(strchr(output[i], ' ')-output[i]);

  global.out[i].plugin = (tmp > 0)?strndup(output[i], tmp):strdup(output[i]);

  global.out[i].handle = dlopen(global.out[i].plugin, RTLD_LAZY);

  global.out[i].init = dlsym(global.out[i].handle, "output_init");

  global.out[i].stop = dlsym(global.out[i].handle, "output_stop");

  global.out[i].run = dlsym(global.out[i].handle, "output_run");

  /* try to find optional command */

  global.out[i].cmd = dlsym(global.out[i].handle, "output_cmd");

  global.out[i].param.parameter_string = strchr(output[i], ' ');

  global.out[i].param.global = &global;

  global.out[i].param.id = i;

  global.out[i].init(&global.out[i].param);

 }

--------------------------------------------------------------------------------------------------

开始运行输入插件的run()函数:

 global.in.run();

--------------------------------------------------------------------------------------------------

开始运行输出插件的run()函数:

 for(i=0; i<global.outcnt; i++) 

  global.out[i].run(global.out[i].param.id);

--------------------------------------------------------------------------------------------------

运行完以上函数,该进程进入休眠状态,等待用户按下<CTRL>+C结束所有的进程:

 pause();

--------------------------------------------------------------------------------------------------

--------------------------------------------------------------------------------------------------

整个mjpg-streamer.c流程图



花了一下午加一晚上的时间阅读和理解mjpg-streamer的源码。结合上图对它的理解就是:output插件默认用http协议。server_thread主要做socket的初始化操作,当有一个socket连接上之后,就将这个socket的服务放到一个client_thread中处理。虽说Http是一次应答式,即有一次请求才有一次相应,但是因为mjpg-streamer是一个守护进程,所以会一直解析http请求并发送video stream.

才疏学浅,不知道这样理解对不对,有熟悉mjpg-streamer的大神请赐教!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  mjpg-streamer