思科VPP源码分析(node调度框架)
2016-11-05 07:30
597 查看
vpp的功能逻辑被划分为一个个node,node之间通过下一跳传递处理完的数据包集合,从而组成整个业务图谱。本文将分析node调度框架源码。
node分为四种类型
VLIB_NODE_TYPE_INTERNAL:对数据包真正处理的业务node。
VLIB_NODE_TYPE_INPUT:收包逻辑node,比如:dpdk,pcap等。
VLIB_NODE_TYPE_PRE_INPUT:目前只有一个epoll node,对socket相关逻辑提供服务,主要使用在控制业务上。
VLIB_NODE_TYPE_PROCESS:该类型node可以被挂起也可以被恢复,有独立的分配在heap上的运行时栈。类似于在一个线程中实现了多任务调度机制。主要用来修改vpp node内部参数。
vlib_node_main_t
记录各种全局信息,比如各种数据结构集合,数据结构内存池之类。
vlib_node_t
注册node时将业务逻辑,几乎不怎么修改的参数,状态信息保存在这里。
vlib_node_runtime_t
这是调度框架实际频繁使用的结构,从vlib_node_t拷贝了部分信息,以及私有的频繁变动的信息。
vlib_process_t
VLIB_NODE_TYPE_PROCESS类型node专用结构,记录用于模拟task的基础结构:heap上的运行时栈,2种返回时寄存器备份,等。
vlib_frame_t
每个node都有一个对应的vlib_frame_t,用来保存供node使用的数据包集合。这是每个node最终处理数据的内存所在地。
vlib_pending_frame_t
当一个node处理完数据包,则填充该数据结构,并加入到全局链表,调度框架便能在下一次调度时找到需要
接手该数据包的下一个node
vlib_next_frame_t
主要是node内部逻辑使用,定位该node的下一条信息。
vlib_node_t
vlib_node_runtime_t
vlib_process_t
核心调度函数:
vlib_main_loop
有时间再详细分析,该函数背后机制比较复杂,但是跟vpp框架使用者关系不大。
- 基本概念
vlib_node_type_tnode分为四种类型
VLIB_NODE_TYPE_INTERNAL:对数据包真正处理的业务node。
VLIB_NODE_TYPE_INPUT:收包逻辑node,比如:dpdk,pcap等。
VLIB_NODE_TYPE_PRE_INPUT:目前只有一个epoll node,对socket相关逻辑提供服务,主要使用在控制业务上。
VLIB_NODE_TYPE_PROCESS:该类型node可以被挂起也可以被恢复,有独立的分配在heap上的运行时栈。类似于在一个线程中实现了多任务调度机制。主要用来修改vpp node内部参数。
vlib_node_main_t
记录各种全局信息,比如各种数据结构集合,数据结构内存池之类。
vlib_node_t
注册node时将业务逻辑,几乎不怎么修改的参数,状态信息保存在这里。
vlib_node_runtime_t
这是调度框架实际频繁使用的结构,从vlib_node_t拷贝了部分信息,以及私有的频繁变动的信息。
vlib_process_t
VLIB_NODE_TYPE_PROCESS类型node专用结构,记录用于模拟task的基础结构:heap上的运行时栈,2种返回时寄存器备份,等。
vlib_frame_t
每个node都有一个对应的vlib_frame_t,用来保存供node使用的数据包集合。这是每个node最终处理数据的内存所在地。
vlib_pending_frame_t
当一个node处理完数据包,则填充该数据结构,并加入到全局链表,调度框架便能在下一次调度时找到需要
接手该数据包的下一个node
vlib_next_frame_t
主要是node内部逻辑使用,定位该node的下一条信息。
- 关键结构
vlib_node_main_ttypedef struct { /* Public nodes. */ //一块连续内存,头部是vec_header_t,数据部分是node指针数组 vlib_node_t **nodes; /* Node index hashed by node name. */ //node按名字组成hash表 uword *node_by_name; //目前只有VLIB_NODE_MAIN_RUNTIME_STARTED一个状态,暂时没啥用,忽略之。 u32 flags; #define VLIB_NODE_MAIN_RUNTIME_STARTED (1 << 0) /* Nodes segregated by type for cache locality. Does not apply to nodes of type VLIB_NODE_TYPE_INTERNAL. */ /*node有三种类型:VLIB_NODE_TYPE_INTERNAL,VLIB_NODE_TYPE_INPUT, VLIB_NODE_TYPE_PRE_INPUT,按类型分类索引*/ vlib_node_runtime_t *nodes_by_type[VLIB_N_NODE_TYPE]; //以下三个用于类似于网卡napi收包模式,dpdk没用到,忽略之 /* Node runtime indices for input nodes with pending interrupts. */ u32 *pending_interrupt_node_runtime_indices; /* Input nodes are switched from/to interrupt to/from polling mode when average vector length goes above/below polling/interrupt thresholds. */ u32 polling_threshold_vector_length; u32 interrupt_threshold_vector_length; /* Vector of next frames. */ /*假设node1有n1个下一跳,node2有n2个下一跳....共n1 + n2 +....+ni个下一跳连续 保存在next_frames指向的vec中。*/ vlib_next_frame_t *next_frames; /* Vector of internal node's frames waiting to be called. */ //数据包从node输出到下一跳,那么下一跳node即是pending frame,会加入pending_frames指向的vec。主循环会遍历该vec,对每个node来调用处理逻辑 vlib_pending_frame_t *pending_frames; /* Timing wheel for scheduling time-based node dispatch. */ //定时器,信号相关,源码及其恶心,博主暂时放弃阅读 timing_wheel_t timing_wheel; vlib_signal_timed_event_data_t *signal_timed_event_data_pool; /* Opaque data vector added via timing_wheel_advance. */ u32 *data_from_advancing_timing_wheel; /* CPU time of next process to be ready on timing wheel. */ u64 cpu_time_next_process_ready; /* Vector of process nodes. One for each node of type VLIB_NODE_TYPE_PROCESS. */ /*VLIB_NODE_TYPE_PROCESS类型的node,每个node有分配在heap上的运行栈,没错就是通常说的在顶部那种 栈,但是这里分配在了heap上。这里在单线程中模拟了类似多线程的效果,以后会详细分析。processes保存了 所有VLIB_NODE_TYPE_PROCESS类型node的描述结构指针。*/ vlib_process_t **processes; /* Current running process or ~0 if no process running. */ //VLIB_NODE_TYPE_PROCESS类型的node,执行时都会把current_process_index赋值为本node的runtime_index u32 current_process_index; /* Pool of pending process frames. */ //VLIB_NODE_TYPE_PROCESS类型node专用,挂起时在其中保存信息。 vlib_pending_frame_t *suspended_process_frames; /* Vector of event data vectors pending recycle. */ void **recycled_event_data_vectors; /* Current counts of nodes in each state. */ u32 input_node_counts_by_state[VLIB_N_NODE_STATE]; /* Hash of (scalar_size,vector_size) to frame_sizes index. */ //hash表,把node的scalar_size,vector_size值组合成key,查找对应的vlib_frame_size_t结构。 uword *frame_size_hash; /* Per-size frame allocation information. */ //通过hash查找到node对应的vlib_frame_size_t值,从中的内存池来分配vlib_frame_t。 vlib_frame_size_t *frame_sizes; /* Time of last node runtime stats clear. */ f64 time_last_runtime_stats_clear; /* Node registrations added by constructors */ //注册node函数提交的注册信息链接在这里,仅仅初始化时使用 vlib_node_registration_t *node_registrations; } vlib_node_main_t;
vlib_node_t
typedef struct vlib_node_t { /* Vector processing function for this node. */ //业务逻辑 vlib_node_function_t *function; /* Node name. */ u8 *name; /* Node name index in elog string table. */ u32 name_elog_string; /* Total statistics for this node. */ vlib_node_stats_t stats_total; /* Saved values as of last clear (or zero if never cleared). Current values are always stats_total - stats_last_clear. */ vlib_node_stats_t stats_last_clear; /* Type of this node. */ //node类型,之前提到的那四种之一 vlib_node_type_t type; /* Node index. */ u32 index; /* Index of corresponding node runtime. */ //vlib_node_t和vlib_node_runtime_t是一一对应的好基友 u32 runtime_index; /* Runtime data for this node. */ void *runtime_data; /* Node flags. */ u16 flags; /* Processing function keeps frame. Tells node dispatching code not to free frame after dispatch is done. */ #define VLIB_NODE_FLAG_FRAME_NO_FREE_AFTER_DISPATCH (1 << 0) /* Node counts as output/drop/punt node for stats purposes. */ #define VLIB_NODE_FLAG_IS_OUTPUT (1 << 1) #define VLIB_NODE_FLAG_IS_DROP (1 << 2) #define VLIB_NODE_FLAG_IS_PUNT (1 << 3) #define VLIB_NODE_FLAG_IS_HANDOFF (1 << 4) /* Set if current node runtime has traced vectors. */ #define VLIB_NODE_FLAG_TRACE (1 << 5) #define VLIB_NODE_FLAG_SWITCH_FROM_INTERRUPT_TO_POLLING_MODE (1 << 6) #define VLIB_NODE_FLAG_SWITCH_FROM_POLLING_TO_INTERRUPT_MODE (1 << 7) /* State for input nodes. */ u8 state; /* Number of bytes of run time data. */ u8 runtime_data_bytes; /* Number of error codes used by this node. */ u16 n_errors; /* Size of scalar and vector arguments in bytes. */ //这两个成员组合成key,在vlib_node_main_t->frame_size_hash中查找,确定本node相关的frame的内存池 u16 scalar_size, vector_size; /* Handle/index in error heap for this node. */ u32 error_heap_handle; u32 error_heap_index; /* Error strings indexed by error code for this node. */ char **error_strings; /* Vector of next node names. Only used before next_nodes array is initialized. */ //初始化时用用 char **next_node_names; /* Next node indices for this node. */ //根据next_node_names来生成next_nodes,vec结构,记录了每个可选下一条的index。 u32 *next_nodes; /* Name of node that we are sibling of. */ char *sibling_of; /* Bitmap of all of this node's siblings. */ uword *sibling_bitmap; /* Total number of vectors sent to each next node. */ //统计发给下一条node的数据包总数 u64 *n_vectors_by_next_node; /* Hash table mapping next node index into slot in next_nodes vector. Quickly determines whether this node is connected to given next node and, if so, with which slot. */ uword *next_slot_by_node; /* Bitmap of node indices which feed this node. */ uword *prev_node_bitmap; /* Node/next-index which own enqueue rights with to this node. */ u32 owner_node_index, owner_next_index; /* Buffer format/unformat for this node. */ format_function_t *format_buffer; unformat_function_t *unformat_buffer; /* Trace buffer format/unformat for this node. */ format_function_t *format_trace; /* Function to validate incoming frames. */ u8 *(*validate_frame) (struct vlib_main_t * vm, struct vlib_node_runtime_t *, struct vlib_frame_t * f); /* for pretty-printing, not typically valid */ u8 *state_string; } vlib_node_t;
vlib_node_runtime_t
typedef struct vlib_node_runtime_t { /* Node function to call. */ vlib_node_function_t *function; /* Vector of errors for this node. */ vlib_error_t *errors; /* Number of clock cycles. */ u32 clocks_since_last_overflow; /* Maximum clock cycle for an invocation. */ u32 max_clock; /* Number of vectors in the recorded max_clock. */ u32 max_clock_n; /* Number of calls. */ u32 calls_since_last_overflow; /* Number of vector elements processed by this node. */ u32 vectors_since_last_overflow; /* Start of next frames for this node. */ //本node的多个下一条中,第一个下一条在vlib_node_main_t->next_frames中的索引 u32 next_frame_index; /* Node index. */ //vlib_node_t和vlib_node_runtime_t是一一对应的好基友 u32 node_index; /* For input nodes: decremented on each main loop interation until it reaches zero and function is called. Allows some input nodes to be called more than others. */ u32 input_main_loops_per_call; /* Saved main loop counter of last dispatch of this node. */ u32 main_loop_count_last_dispatch; u32 main_loop_vector_stats[2]; /* Copy of main node flags. */ u16 flags; /* Input node state. */ u16 state; u16 n_next_nodes; /* Next frame index that vector arguments were last enqueued to last time this node ran. Set to zero before first run of this node. */ u16 cached_next_index; /* CPU this node runs on */ u16 cpu_index; /* Function dependent node-runtime. */ uword runtime_data[(128 - 1 * sizeof (vlib_node_function_t *) - 1 * sizeof (vlib_error_t *) - 11 * sizeof (u32) - 5 * sizeof (u16)) / sizeof (uword)]; }
vlib_process_t
typedef struct { /* Node runtime for this process. */ //对应的PROSESS类型node索引号 vlib_node_runtime_t node_runtime; /* Where to longjmp when process is done. */ //如下return_longjmp,resume_longjmp用于保存当前寄存器内容,可以参考C库的setjump,longjump原理 clib_longjmp_t return_longjmp; #define VLIB_PROCESS_RETURN_LONGJMP_RETURN ((uword) ~0 - 0) #define VLIB_PROCESS_RETURN_LONGJMP_SUSPEND ((uword) ~0 - 1) /* Where to longjmp to resume node after suspend. */ clib_longjmp_t resume_longjmp; #define VLIB_PROCESS_RESUME_LONGJMP_SUSPEND 0 #define VLIB_PROCESS_RESUME_LONGJMP_RESUME 1 u16 flags; #define VLIB_PROCESS_IS_SUSPENDED_WAITING_FOR_CLOCK (1 << 0) #define VLIB_PROCESS_IS_SUSPENDED_WAITING_FOR_EVENT (1 << 1) /* Set to indicate that this process has been added to resume vector. */ #define VLIB_PROCESS_RESUME_PENDING (1 << 2) /* Process function is currently running. */ #define VLIB_PROCESS_IS_RUNNING (1 << 3) /* Size of process stack. */ u16 log2_n_stack_bytes; u32 suspended_process_frame_index; /* Number of times this process was suspended. */ u32 n_suspends; /* Vectors of pending event data indexed by event type index. */ void **pending_event_data_by_type_index; /* Bitmap of event type-indices with non-empty vectors. */ uword *non_empty_event_type_bitmap; /* Bitmap of event type-indices which are one time events. */ uword *one_time_event_type_bitmap; /* Type is opaque pointer -- typically a pointer to an event handler function. Hash table to map opaque to a type index. */ uword *event_type_index_by_type_opaque; /* Pool of currently valid event types. */ vlib_process_event_type_t *event_type_pool; /* When suspending saves cpu cycle counter when process is to be resumed. */ u64 resume_cpu_time; /* Default output function and its argument for any CLI outputs within the process. */ vlib_cli_output_function_t *output_function; uword output_function_arg; #ifdef CLIB_UNIX /* Pad to a multiple of the page size so we can mprotect process stacks */ #define PAGE_SIZE_MULTIPLE 0x1000 #define ALIGN_ON_MULTIPLE_PAGE_BOUNDARY_FOR_MPROTECT __attribute__ ((aligned (PAGE_SIZE_MULTIPLE))) #else #define ALIGN_ON_MULTIPLE_PAGE_BOUNDARY_FOR_MPROTECT #endif /* Process stack. Starts here and extends 2^log2_n_stack_bytes bytes. */ //PROSESS类型node的业务逻辑运行时栈地址,在heap上 #define VLIB_PROCESS_STACK_MAGIC (0xdead7ead) u32 stack[0] ALIGN_ON_MULTIPLE_PAGE_BOUNDARY_FOR_MPROTECT; } vlib_process_t __attribute__ ((aligned (CLIB_CACHE_LINE_BYTES)));
- 调度逻辑
VPP支持多工作线程模型,这里不考虑多线程,以单线程模型来分析源码。核心调度函数:
vlib_main_loop
static void vlib_main_loop (vlib_main_t * vm) { vlib_node_main_t *nm = &vm->node_main; uword i; u64 cpu_time_now; /* Initialize pending node vector. */ vec_resize (nm->pending_frames, 32); _vec_len (nm->pending_frames) = 0; /* Mark time of main loop start. */ cpu_time_now = vm->clib_time.last_cpu_time; vm->cpu_time_main_loop_start = cpu_time_now; /* Arrange for first level of timing wheel to cover times we care most about. */ nm->timing_wheel.min_sched_time = 10e-6; nm->timing_wheel.max_sched_time = 10e-3; timing_wheel_init (&nm->timing_wheel, cpu_time_now, vm->clib_time.clocks_per_second); /* Pre-allocate expired nodes. */ vec_alloc (nm->data_from_advancing_timing_wheel, 32); vec_alloc (nm->pending_interrupt_node_runtime_indices, 32); //dpdk收包时下面用不到。它们是用来模拟标准网卡NAPI机制的 if (!nm->polling_threshold_vector_length) nm->polling_threshold_vector_length = 10; if (!nm->interrupt_threshold_vector_length) nm->interrupt_threshold_vector_length = 5; nm->current_process_index = ~0; /* Start all processes. */ /*执行所有VLIB_NODE_TYPE_PROCESS类型node,利用setjump,longjump机制,把node挂起来,等待之后唤醒。可以理解为一种多任务模型。该类型node主要时用在作运行时配置相关。之后会详细论述该类型node*/ { uword i; for (i = 0; i < vec_len (nm->processes); i++) cpu_time_now = dispatch_process (vm, nm->processes[i], /* frame */ 0, cpu_time_now); } while (1) { vlib_node_runtime_t *n; /* Process pre-input nodes. */ //目前只有一个epoll相关的node,监听socket,辅助功能 vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT]) cpu_time_now = dispatch_node (vm, n, VLIB_NODE_TYPE_PRE_INPUT, VLIB_NODE_STATE_POLLING, /* frame */ 0, cpu_time_now); /* Next process input nodes. */ //收包node,假设使用dpdk的node vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]) cpu_time_now = dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT, VLIB_NODE_STATE_POLLING, /* frame */ 0, cpu_time_now); //memclnt_node会用到,但是博主发现该node没有使用。无视之。 if (PREDICT_TRUE (vm->queue_signal_pending == 0)) vm->queue_signal_callback (vm); /* Next handle interrupts. */ //dpdk没有中断机制,这里不会执行 { uword l = _vec_len (nm->pending_interrupt_node_runtime_indices); uword i; if (l > 0) { _vec_len (nm->pending_interrupt_node_runtime_indices) = 0; for (i = 0; i < l; i++) { n = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT], nm-> pending_interrupt_node_runtime_indices [i]); cpu_time_now = dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT, VLIB_NODE_STATE_INTERRUPT, /* frame */ 0, cpu_time_now); } } } /* Check if process nodes have expired from timing wheel. */ /*处理超时事件,通过api发生的唤醒node事件。只对VLIB_NODE_TYPE_PROCESS类型node有效。 VLIB_NODE_TYPE_PROCESS类型node处理需要单独描述,但不是开发人员关注的重点,只是配置相关。*/ nm->data_from_advancing_timing_wheel = timing_wheel_advance (&nm->timing_wheel, cpu_time_now, nm->data_from_advancing_timing_wheel, &nm->cpu_time_next_process_ready); ASSERT (nm->data_from_advancing_timing_wheel != 0); if (PREDICT_FALSE (_vec_len (nm->data_from_advancing_timing_wheel) > 0)) { uword i; processes_timing_wheel_data: for (i = 0; i < _vec_len (nm->data_from_advancing_timing_wheel); i++) { u32 d = nm->data_from_advancing_timing_wheel[i]; u32 di = vlib_timing_wheel_data_get_index (d); if (vlib_timing_wheel_data_is_timed_event (d)) { vlib_signal_timed_event_data_t *te = pool_elt_at_index (nm->signal_timed_event_data_pool, di); vlib_node_t *n = vlib_get_node (vm, te->process_node_index); vlib_process_t *p = vec_elt (nm->processes, n->runtime_index); void *data; data = vlib_process_signal_event_helper (nm, n, p, te->event_type_index, te->n_data_elts, te->n_data_elt_bytes); if (te->n_data_bytes < sizeof (te->inline_event_data)) clib_memcpy (data, te->inline_event_data, te->n_data_bytes); else { clib_memcpy (data, te->event_data_as_vector, te->n_data_bytes); vec_free (te->event_data_as_vector); } pool_put (nm->signal_timed_event_data_pool, te); } else { cpu_time_now = clib_cpu_time_now (); cpu_time_now = dispatch_suspended_process (vm, di, cpu_time_now); } } /* Reset vector. */ _vec_len (nm->data_from_advancing_timing_wheel) = 0; } /* Input nodes may have added work to the pending vector. Process pending vector until there is nothing left. All pending vectors will be processed from input -> output. */ /*真正开发人员关注的重点,数据包处理的核心逻辑都在VLIB_NODE_TYPE_INTERNAL类型node中。 nm->pending_frames记录了上一个node转给下一个node信息,最后可以找到传递给下一个node使用的 数据包*/ for (i = 0; i < _vec_len (nm->pending_frames); i++) cpu_time_now = dispatch_pending_node (vm, nm->pending_frames + i, cpu_time_now); /* Reset pending vector for next iteration. */ _vec_len (nm->pending_frames) = 0; /* Pending internal nodes may resume processes. */ if (_vec_len (nm->data_from_advancing_timing_wheel) > 0) goto processes_timing_wheel_data; vlib_increment_main_loop_counter (vm); /* Record time stamp in case there are no enabled nodes and above calls do not update time stamp. */ cpu_time_now = clib_cpu_time_now (); } }
/* static */ u64 dispatch_pending_node (vlib_main_t * vm, vlib_pending_frame_t * p, u64 last_time_stamp) { vlib_node_main_t *nm = &vm->node_main; vlib_frame_t *f; vlib_next_frame_t *nf, nf_dummy; vlib_node_runtime_t *n; u32 restore_frame_index; //之后将执行n的业务逻辑 n = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL], p->node_runtime_index); //node n的对应frame,里面包含了node需要处理的数据包 f = vlib_get_frame (vm, p->frame_index); /*不是下一跳传过来的数据包,比如自己生成的包注入某个node,此时if判断为真。 vlib_put_frame_to_node()完成注入逻辑*/ if (p->next_frame_index == VLIB_PENDING_FRAME_NO_NEXT_FRAME) { /* No next frame: so use dummy on stack. */ nf = &nf_dummy; nf->flags = f->flags & VLIB_NODE_FLAG_TRACE; nf->frame_index = ~p->frame_index; } else //通过node下一跳传递来的数据包 nf = vec_elt_at_index (nm->next_frames, p->next_frame_index); ASSERT (f->flags & VLIB_FRAME_IS_ALLOCATED); /* Force allocation of new frame while current frame is being dispatched. */ restore_frame_index = ~0; /*node传递数据包到下一个node是通过调用vlib_put_next_frame()接口: p->frame_index = nf->frame_index; p->node_runtime_index = nf->node_runtime_index; p->next_frame_index = nf - nm->next_frames; */ if (nf->frame_index == p->frame_index) { //有两个位置同时引用了frame,把nf中的清掉,以免干扰本线程对frame的使用。 nf->frame_index = ~0; nf->flags &= ~VLIB_FRAME_IS_ALLOCATED; if (!(n->flags & VLIB_NODE_FLAG_FRAME_NO_FREE_AFTER_DISPATCH)) restore_frame_index = p->frame_index; } /* Frame must be pending. */ ASSERT (f->flags & VLIB_FRAME_PENDING); ASSERT (f->n_vectors > 0); /* Copy trace flag from next frame to node. Trace flag indicates that at least one vector in the dispatched frame is traced. */ n->flags &= ~VLIB_NODE_FLAG_TRACE; n->flags |= (nf->flags & VLIB_FRAME_TRACE) ? VLIB_NODE_FLAG_TRACE : 0; nf->flags &= ~VLIB_FRAME_TRACE; //调度node业务逻辑 last_time_stamp = dispatch_node (vm, n, VLIB_NODE_TYPE_INTERNAL, VLIB_NODE_STATE_POLLING, f, last_time_stamp); f->flags &= ~VLIB_FRAME_PENDING; /* Frame is ready to be used again, so restore it. */ //frame已经在业务逻辑中使用完了,重新保存到nf中 if (restore_frame_index != ~0) { /* p->next_frame_index can change during node dispatch if node function decides to change graph hook up. */ //vlib_next_frame_change_ownership()可能会修改p->next_frame_index nf = vec_elt_at_index (nm->next_frames, p->next_frame_index); nf->frame_index = restore_frame_index; nf->flags |= VLIB_FRAME_IS_ALLOCATED; } if (f->flags & VLIB_FRAME_FREE_AFTER_DISPATCH) { ASSERT (!(n->flags & VLIB_NODE_FLAG_FRAME_NO_FREE_AFTER_DISPATCH)); vlib_frame_free (vm, n, f); } return last_time_stamp; }
//对DPDK驱动,该函数基本没有什么复杂处理。 /* static_always_inline */ u64 dispatch_node (vlib_main_t * vm, vlib_node_runtime_t * node, vlib_node_type_t type, vlib_node_state_t dispatch_state, vlib_frame_t * frame, u64 last_time_stamp) { uword n, v; u64 t; vlib_node_main_t *nm = &vm->node_main; vlib_next_frame_t *nf; if (CLIB_DEBUG > 0) { vlib_node_t *n = vlib_get_node (vm, node->node_index); ASSERT (n->type == type); } /* Only non-internal nodes may be disabled. */ if (type != VLIB_NODE_TYPE_INTERNAL && node->state != dispatch_state) { ASSERT (type != VLIB_NODE_TYPE_INTERNAL); return last_time_stamp; } if ((type == VLIB_NODE_TYPE_PRE_INPUT || type == VLIB_NODE_TYPE_INPUT) && dispatch_state != VLIB_NODE_STATE_INTERRUPT) { u32 c = node->input_main_loops_per_call; /* Only call node when count reaches zero. */ if (c) { node->input_main_loops_per_call = c - 1; return last_time_stamp; } } /* Speculatively prefetch next frames. */ if (node->n_next_nodes > 0) { //node的业务逻辑肯定会用到nf nf = vec_elt_at_index (nm->next_frames, node->next_frame_index); CLIB_PREFETCH (nf, 4 * sizeof (nf[0]), WRITE); } vm->cpu_time_last_node_dispatch = last_time_stamp; if (1 /* || vm->cpu_index == node->cpu_index */ ) { vlib_main_t *stat_vm; stat_vm = /* vlib_mains ? vlib_mains[0] : */ vm; vlib_elog_main_loop_event (vm, node->node_index, last_time_stamp, frame ? frame->n_vectors : 0, /* is_after */ 0); /* * Turn this on if you run into * "bad monkey" contexts, and you want to know exactly * which nodes they've visited... See ixge.c... */ if (VLIB_BUFFER_TRACE_TRAJECTORY && frame) { int i; int log_index; u32 *from; from = vlib_frame_vector_args (frame); for (i = 0; i < frame->n_vectors; i++) { vlib_buffer_t *b = vlib_get_buffer (vm, from[i]); ASSERT (b->pre_data[0] < 32); log_index = b->pre_data[0]++ + 1; b->pre_data[log_index] = node->node_index; } n = node->function (vm, node, frame); } else //业务逻辑 n = node->function (vm, node, frame); t = clib_cpu_time_now (); vlib_elog_main_loop_event (vm, node->node_index, t, n, /* is_after */ 1); vm->main_loop_vectors_processed += n; vm->main_loop_nodes_processed += n > 0; v = vlib_node_runtime_update_stats (stat_vm, node, /* n_calls */ 1, /* n_vectors */ n, /* n_clocks */ t - last_time_stamp); /* When in interrupt mode and vector rate crosses threshold switch to polling mode. */ //跟dpdk没关系了 if ((DPDK == 0 && dispatch_state == VLIB_NODE_STATE_INTERRUPT) || (DPDK == 0 && dispatch_state == VLIB_NODE_STATE_POLLING && (node->flags & VLIB_NODE_FLAG_SWITCH_FROM_INTERRUPT_TO_POLLING_MODE))) { ELOG_TYPE_DECLARE (e) = { .function = (char *) __FUNCTION__,.format = "%s vector length %d, switching to %s",.format_args = "T4i4t4",.n_enum_strings = 2,.enum_strings = { "interrupt", "polling",},}; struct { u32 node_name, vector_length, is_polling; } *ed; if (dispatch_state == VLIB_NODE_STATE_INTERRUPT && v >= nm->polling_threshold_vector_length) { vlib_node_t *n = vlib_get_node (vm, node->node_index); n->state = VLIB_NODE_STATE_POLLING; node->state = VLIB_NODE_STATE_POLLING; ASSERT (! (node->flags & VLIB_NODE_FLAG_SWITCH_FROM_INTERRUPT_TO_POLLING_MODE)); node->flags &= ~VLIB_NODE_FLAG_SWITCH_FROM_POLLING_TO_INTERRUPT_MODE; node->flags |= VLIB_NODE_FLAG_SWITCH_FROM_INTERRUPT_TO_POLLING_MODE; nm->input_node_counts_by_state[VLIB_NODE_STATE_INTERRUPT] -= 1; nm->input_node_counts_by_state[VLIB_NODE_STATE_POLLING] += 1; ed = ELOG_DATA (&vm->elog_main, e); ed->node_name = n->name_elog_string; ed->vector_length = v; ed->is_polling = 1; } else if (dispatch_state == VLIB_NODE_STATE_POLLING && v <= nm->interrupt_threshold_vector_length) { vlib_node_t *n = vlib_get_node (vm, node->node_index); if (node->flags & VLIB_NODE_FLAG_SWITCH_FROM_POLLING_TO_INTERRUPT_MODE) { /* Switch to interrupt mode after dispatch in polling one more time. This allows driver to re-enable interrupts. */ n->state = VLIB_NODE_STATE_INTERRUPT; node->state = VLIB_NODE_STATE_INTERRUPT; node->flags &= ~VLIB_NODE_FLAG_SWITCH_FROM_INTERRUPT_TO_POLLING_MODE; nm->input_node_counts_by_state[VLIB_NODE_STATE_POLLING] -= 1; nm->input_node_counts_by_state[VLIB_NODE_STATE_INTERRUPT] += 1; } else { node->flags |= VLIB_NODE_FLAG_SWITCH_FROM_POLLING_TO_INTERRUPT_MODE; ed = ELOG_DATA (&vm->elog_main, e); ed->node_name = n->name_elog_string; ed->vector_length = v; ed->is_polling = 0; } } } } return t; }
有时间再详细分析,该函数背后机制比较复杂,但是跟vpp框架使用者关系不大。
static u64 dispatch_process (vlib_main_t * vm, vlib_process_t * p, vlib_frame_t * f, u64 last_time_stamp) { vlib_node_main_t *nm = &vm->node_main; vlib_node_runtime_t *node_runtime = &p->node_runtime; vlib_node_t *node = vlib_get_node (vm, node_runtime->node_index); u64 t; uword n_vectors, is_suspend; if (node->state != VLIB_NODE_STATE_POLLING || (p->flags & (VLIB_PROCESS_IS_SUSPENDED_WAITING_FOR_CLOCK | VLIB_PROCESS_IS_SUSPENDED_WAITING_FOR_EVENT))) return last_time_stamp; p->flags |= VLIB_PROCESS_IS_RUNNING; t = last_time_stamp; vlib_elog_main_loop_event (vm, node_runtime->node_index, t, f ? f->n_vectors : 0, /* is_after */ 0); /* Save away current process for suspend. */ nm->current_process_index = node->runtime_index; n_vectors = vlib_process_startup (vm, p, f); nm->current_process_index = ~0; ASSERT (n_vectors != VLIB_PROCESS_RETURN_LONGJMP_RETURN); is_suspend = n_vectors == VLIB_PROCESS_RETURN_LONGJMP_SUSPEND; if (is_suspend) { vlib_pending_frame_t *pf; n_vectors = 0; pool_get (nm->suspended_process_frames, pf); pf->node_runtime_index = node->runtime_index; pf->frame_index = f ? vlib_frame_index (vm, f) : ~0; pf->next_frame_index = ~0; p->n_suspends += 1; p->suspended_process_frame_index = pf - nm->suspended_process_frames; if (p->flags & VLIB_PROCESS_IS_SUSPENDED_WAITING_FOR_CLOCK) timing_wheel_insert (&nm->timing_wheel, p->resume_cpu_time, vlib_timing_wheel_data_set_suspended_process (node->runtime_index)); } else p->flags &= ~VLIB_PROCESS_IS_RUNNING; t = clib_cpu_time_now (); vlib_elog_main_loop_event (vm, node_runtime->node_index, t, is_suspend, /* is_after */ 1); vlib_process_update_stats (vm, p, /* n_calls */ !is_suspend, /* n_vectors */ n_vectors, /* n_clocks */ t - last_time_stamp); return t; }
相关文章推荐
- 思科VPP源码分析(路由框架分析二)
- 思科VPP源码分析(ethernet node分析)
- 思科VPP源码分析(dpdk node分析)
- 思科VPP源码分析(多线程支持分析)
- 思科VPP源码分析(内存管理)
- 思科VPP源码分析(feature机制分析)
- 思科VPP源码分析(dpo机制源码分析)
- 思科VPP源码分析(Bihash分析)
- 思科VPP源码分析(CLI支持分析)
- 思科VPP源码分析(trace机制分析)
- Linux网桥源码框架分析初步
- 蔡军生先生第二人生的源码分析(六十八)LLXMLNode使用Expat库分析XML文件
- TOMCAT源码分析(启动框架)
- 蔡军生先生第二人生的源码分析(六十七)LLXMLNode使用Expat库打开文件
- 简单RCP框架源码分析
- 第二人生的源码分析(六十八)LLXMLNode使用Expat库分析XML文件
- TOMCAT源码分析(启动框架)
- cxxtest单元测试框架源码分析(一):类的组成关系
- 第二人生的源码分析(六十八)LLXMLNode使用Expat库分析XML文件