您的位置:首页 > Web前端 > Node.js

思科VPP源码分析(node调度框架)

2016-11-05 07:30 597 查看
vpp的功能逻辑被划分为一个个node,node之间通过下一跳传递处理完的数据包集合,从而组成整个业务图谱。本文将分析node调度框架源码。

- 基本概念

vlib_node_type_t

node分为四种类型

VLIB_NODE_TYPE_INTERNAL:对数据包真正处理的业务node。

VLIB_NODE_TYPE_INPUT:收包逻辑node,比如:dpdk,pcap等。

VLIB_NODE_TYPE_PRE_INPUT:目前只有一个epoll node,对socket相关逻辑提供服务,主要使用在控制业务上。

VLIB_NODE_TYPE_PROCESS:该类型node可以被挂起也可以被恢复,有独立的分配在heap上的运行时栈。类似于在一个线程中实现了多任务调度机制。主要用来修改vpp node内部参数。

vlib_node_main_t

记录各种全局信息,比如各种数据结构集合,数据结构内存池之类。

vlib_node_t

注册node时将业务逻辑,几乎不怎么修改的参数,状态信息保存在这里。

vlib_node_runtime_t

这是调度框架实际频繁使用的结构,从vlib_node_t拷贝了部分信息,以及私有的频繁变动的信息。

vlib_process_t

VLIB_NODE_TYPE_PROCESS类型node专用结构,记录用于模拟task的基础结构:heap上的运行时栈,两个用于保存/恢复寄存器现场的longjmp上下文(return_longjmp和resume_longjmp)等。

vlib_frame_t

每个node都有一个对应的vlib_frame_t,用来保存供node使用的数据包集合。这是每个node最终处理数据的内存所在地。

vlib_pending_frame_t

当一个node处理完数据包,则填充该数据结构,并将其加入全局的pending_frames向量(vlib_node_main_t->pending_frames),调度框架便能在随后的调度中找到需要接手该数据包的下一个node。

vlib_next_frame_t

主要是node内部逻辑使用,定位该node的下一跳信息。

- 关键结构

vlib_node_main_t

/* Global bookkeeping for the node graph: the node vector, per-type runtime
   vectors, next/pending frame vectors, timing wheel, process nodes and
   frame-size allocation tables. */
typedef struct
{
/* Public nodes. */
// A single vector of node pointers (per the article: one contiguous
// allocation with a vec_header_t in front of the pointer array).
vlib_node_t **nodes;

/* Node index hashed by node name. */
// Hash table keyed by node name.
uword *node_by_name;

// Only one flag bit, VLIB_NODE_MAIN_RUNTIME_STARTED, is defined for this field here.
u32 flags;
#define VLIB_NODE_MAIN_RUNTIME_STARTED (1 << 0)

/* Nodes segregated by type for cache locality.
Does not apply to nodes of type VLIB_NODE_TYPE_INTERNAL. */
/* Node runtimes indexed by node type. Article note: the runtimes of
VLIB_NODE_TYPE_INTERNAL, VLIB_NODE_TYPE_INPUT and VLIB_NODE_TYPE_PRE_INPUT
nodes are looked up through this array. */
vlib_node_runtime_t *nodes_by_type[VLIB_N_NODE_TYPE];

// The next three fields support a NAPI-like interrupt/polling receive mode.
// NOTE(article): said to be unused with DPDK -- not verifiable from this excerpt.
/* Node runtime indices for input nodes with pending interrupts. */
u32 *pending_interrupt_node_runtime_indices;
/* Input nodes are switched from/to interrupt to/from polling mode
when average vector length goes above/below polling/interrupt
thresholds. */
u32 polling_threshold_vector_length;
u32 interrupt_threshold_vector_length;

/* Vector of next frames. */
/* If node1 has n1 next hops, node2 has n2 next hops, ... then all
n1 + n2 + ... + ni next-frame entries are stored back to back in the
vector that next_frames points to. */
vlib_next_frame_t *next_frames;

/* Vector of internal node's frames waiting to be called. */
// When a node hands packets to a next hop, an entry for that next node is
// appended to this vector; the main loop walks it and dispatches each entry.
vlib_pending_frame_t *pending_frames;

/* Timing wheel for scheduling time-based node dispatch. */
// Timer / timed-signal machinery; the article does not analyse it further.
timing_wheel_t timing_wheel;
vlib_signal_timed_event_data_t *signal_timed_event_data_pool;
/* Opaque data vector added via timing_wheel_advance. */
u32 *data_from_advancing_timing_wheel;
/* CPU time of next process to be ready on timing wheel. */
u64 cpu_time_next_process_ready;

/* Vector of process nodes.
One for each node of type VLIB_NODE_TYPE_PROCESS. */
/* Each VLIB_NODE_TYPE_PROCESS node has its own run-time stack (per the
article it is allocated on the heap), which lets a single thread emulate
multi-tasking. processes holds a pointer to the descriptor of every
VLIB_NODE_TYPE_PROCESS node. */
vlib_process_t **processes;

/* Current running process or ~0 if no process running. */
// Set to the node's runtime_index while a VLIB_NODE_TYPE_PROCESS node runs.
u32 current_process_index;

/* Pool of pending process frames. */
// Used only by VLIB_NODE_TYPE_PROCESS nodes: an entry is taken from this
// pool to record state when a process suspends.
vlib_pending_frame_t *suspended_process_frames;

/* Vector of event data vectors pending recycle. */
void **recycled_event_data_vectors;

/* Current counts of nodes in each state. */
u32 input_node_counts_by_state[VLIB_N_NODE_STATE];

/* Hash of (scalar_size,vector_size) to frame_sizes index. */
// Hash keyed by a node's (scalar_size, vector_size) pair; yields the index
// of the matching vlib_frame_size_t entry.
uword *frame_size_hash;

/* Per-size frame allocation information. */
// The vlib_frame_size_t found through frame_size_hash; per the article,
// vlib_frame_t objects are allocated from the pool it holds.
vlib_frame_size_t *frame_sizes;

/* Time of last node runtime stats clear. */
f64 time_last_runtime_stats_clear;

/* Node registrations added by constructors */
// Registration records submitted by node-registration code are chained here.
// NOTE(article): said to be consumed only during initialisation.
vlib_node_registration_t *node_registrations;
} vlib_node_main_t;


vlib_node_t

/* Per-node descriptor filled in at registration time: the processing
   function, name, statistics, flags and graph-connectivity information. */
typedef struct vlib_node_t
{
/* Vector processing function for this node. */
// The node's packet-processing (business) logic.
vlib_node_function_t *function;

/* Node name. */
u8 *name;

/* Node name index in elog string table. */
u32 name_elog_string;

/* Total statistics for this node. */
vlib_node_stats_t stats_total;

/* Saved values as of last clear (or zero if never cleared).
Current values are always stats_total - stats_last_clear. */
vlib_node_stats_t stats_last_clear;

/* Type of this node. */
// One of the node types (vlib_node_type_t).
vlib_node_type_t type;

/* Node index. */
u32 index;

/* Index of corresponding node runtime. */
// Article note: each vlib_node_t is paired one-to-one with a vlib_node_runtime_t.
u32 runtime_index;

/* Runtime data for this node. */
void *runtime_data;

/* Node flags. */
u16 flags;

/* Processing function keeps frame.  Tells node dispatching code not
to free frame after dispatch is done.  */
#define VLIB_NODE_FLAG_FRAME_NO_FREE_AFTER_DISPATCH (1 << 0)

/* Node counts as output/drop/punt node for stats purposes. */
#define VLIB_NODE_FLAG_IS_OUTPUT (1 << 1)
#define VLIB_NODE_FLAG_IS_DROP (1 << 2)
#define VLIB_NODE_FLAG_IS_PUNT (1 << 3)
#define VLIB_NODE_FLAG_IS_HANDOFF (1 << 4)

/* Set if current node runtime has traced vectors. */
#define VLIB_NODE_FLAG_TRACE (1 << 5)

#define VLIB_NODE_FLAG_SWITCH_FROM_INTERRUPT_TO_POLLING_MODE (1 << 6)
#define VLIB_NODE_FLAG_SWITCH_FROM_POLLING_TO_INTERRUPT_MODE (1 << 7)

/* State for input nodes. */
u8 state;

/* Number of bytes of run time data. */
u8 runtime_data_bytes;

/* Number of error codes used by this node. */
u16 n_errors;

/* Size of scalar and vector arguments in bytes. */
// Article note: this pair forms the key looked up in
// vlib_node_main_t->frame_size_hash to select the frame pool for this node.
u16 scalar_size, vector_size;

/* Handle/index in error heap for this node. */
u32 error_heap_handle;
u32 error_heap_index;

/* Error strings indexed by error code for this node. */
char **error_strings;

/* Vector of next node names.
Only used before next_nodes array is initialized. */
// Used during initialisation only.
char **next_node_names;

/* Next node indices for this node. */
// Vector built from next_node_names; holds the node index of each possible next hop.
u32 *next_nodes;

/* Name of node that we are sibling of. */
char *sibling_of;

/* Bitmap of all of this node's siblings. */
uword *sibling_bitmap;

/* Total number of vectors sent to each next node. */
// Running count of packets handed to each next node.
u64 *n_vectors_by_next_node;

/* Hash table mapping next node index into slot in
next_nodes vector.  Quickly determines whether this node
is connected to given next node and, if so, with which slot. */
uword *next_slot_by_node;

/* Bitmap of node indices which feed this node. */
uword *prev_node_bitmap;

/* Node/next-index which own enqueue rights with to this node. */
u32 owner_node_index, owner_next_index;

/* Buffer format/unformat for this node. */
format_function_t *format_buffer;
unformat_function_t *unformat_buffer;

/* Trace buffer format/unformat for this node. */
format_function_t *format_trace;

/* Function to validate incoming frames. */
u8 *(*validate_frame) (struct vlib_main_t * vm,
struct vlib_node_runtime_t *,
struct vlib_frame_t * f);
/* for pretty-printing, not typically valid */
u8 *state_string;
} vlib_node_t;


vlib_node_runtime_t

/* Per-node state used by the dispatch code: the function to call, running
   counters, next-frame bookkeeping and a function-dependent data area.
   The runtime_data array is sized as 128 bytes minus the fields listed in
   its size expression, so any field added above must be reflected there. */
typedef struct vlib_node_runtime_t
{
  /* Node function to call. */
  vlib_node_function_t *function;

  /* Vector of errors for this node. */
  vlib_error_t *errors;

  /* Number of clock cycles. */
  u32 clocks_since_last_overflow;

  /* Maximum clock cycle for an invocation. */
  u32 max_clock;

  /* Number of vectors in the recorded max_clock. */
  u32 max_clock_n;

  /* Number of calls. */
  u32 calls_since_last_overflow;

  /* Number of vector elements processed by this node. */
  u32 vectors_since_last_overflow;

  /* Start of next frames for this node: index, within
     vlib_node_main_t->next_frames, of the first of this node's
     next-frame entries. */
  u32 next_frame_index;

  /* Node index (of the vlib_node_t this runtime belongs to). */
  u32 node_index;

  /* For input nodes: decremented on each main loop iteration until it
     reaches zero and function is called.  Allows some input nodes to be
     called more than others. */
  u32 input_main_loops_per_call;

  /* Saved main loop counter of last dispatch of this node. */
  u32 main_loop_count_last_dispatch;

  u32 main_loop_vector_stats[2];

  /* Copy of main node flags. */
  u16 flags;

  /* Input node state. */
  u16 state;

  u16 n_next_nodes;

  /* Next frame index that vector arguments were last enqueued to
     last time this node ran.  Set to zero before first run
     of this node. */
  u16 cached_next_index;

  /* CPU this node runs on */
  u16 cpu_index;

  /* Function dependent node-runtime. */
  uword runtime_data[(128
		      - 1 * sizeof (vlib_node_function_t *)
		      - 1 * sizeof (vlib_error_t *)
		      - 11 * sizeof (u32)
		      - 5 * sizeof (u16)) / sizeof (uword)];
}
vlib_node_runtime_t;


vlib_process_t

/* Descriptor of a VLIB_NODE_TYPE_PROCESS node: its embedded node runtime,
   the two longjmp contexts used to leave and resume the process, event
   bookkeeping, and (as the last member) the start of the process stack. */
typedef struct
{
/* Node runtime for this process. */
// The node runtime is embedded here by value (it is a struct, not an index).
vlib_node_runtime_t node_runtime;

/* Where to longjmp when process is done. */
// return_longjmp and resume_longjmp are jump contexts of type clib_longjmp_t;
// per the article they save register state in the manner of the C library's
// setjmp/longjmp.
clib_longjmp_t return_longjmp;

#define VLIB_PROCESS_RETURN_LONGJMP_RETURN ((uword) ~0 - 0)
#define VLIB_PROCESS_RETURN_LONGJMP_SUSPEND ((uword) ~0 - 1)

/* Where to longjmp to resume node after suspend. */
clib_longjmp_t resume_longjmp;
#define VLIB_PROCESS_RESUME_LONGJMP_SUSPEND 0
#define VLIB_PROCESS_RESUME_LONGJMP_RESUME  1

u16 flags;
#define VLIB_PROCESS_IS_SUSPENDED_WAITING_FOR_CLOCK (1 << 0)
#define VLIB_PROCESS_IS_SUSPENDED_WAITING_FOR_EVENT (1 << 1)
/* Set to indicate that this process has been added to resume vector. */
#define VLIB_PROCESS_RESUME_PENDING (1 << 2)

/* Process function is currently running. */
#define VLIB_PROCESS_IS_RUNNING (1 << 3)

/* Size of process stack. */
u16 log2_n_stack_bytes;

u32 suspended_process_frame_index;

/* Number of times this process was suspended. */
u32 n_suspends;

/* Vectors of pending event data indexed by event type index. */
void **pending_event_data_by_type_index;

/* Bitmap of event type-indices with non-empty vectors. */
uword *non_empty_event_type_bitmap;

/* Bitmap of event type-indices which are one time events. */
uword *one_time_event_type_bitmap;

/* Type is opaque pointer -- typically a pointer to an event handler
function.  Hash table to map opaque to a type index. */
uword *event_type_index_by_type_opaque;

/* Pool of currently valid event types. */
vlib_process_event_type_t *event_type_pool;

/* When suspending saves cpu cycle counter when process is to be resumed. */
u64 resume_cpu_time;

/* Default output function and its argument for any CLI outputs
within the process. */
vlib_cli_output_function_t *output_function;
uword output_function_arg;

#ifdef CLIB_UNIX
/* Pad to a multiple of the page size so we can mprotect process stacks */
#define PAGE_SIZE_MULTIPLE 0x1000
#define ALIGN_ON_MULTIPLE_PAGE_BOUNDARY_FOR_MPROTECT  __attribute__ ((aligned (PAGE_SIZE_MULTIPLE)))
#else
#define ALIGN_ON_MULTIPLE_PAGE_BOUNDARY_FOR_MPROTECT
#endif

/* Process stack.  Starts here and extends 2^log2_n_stack_bytes
bytes. */
// Start of the run-time stack on which the process node's function executes
// (per the article, this memory lives on the heap).
#define VLIB_PROCESS_STACK_MAGIC (0xdead7ead)
u32 stack[0] ALIGN_ON_MULTIPLE_PAGE_BOUNDARY_FOR_MPROTECT;
} vlib_process_t __attribute__ ((aligned (CLIB_CACHE_LINE_BYTES)));


- 调度逻辑

VPP支持多工作线程模型,这里不考虑多线程,以单线程模型来分析源码。

核心调度函数:

vlib_main_loop

/* Main scheduling loop.  After one-time setup (pending vector, timing
   wheel, thresholds) it starts every process node once and then loops
   forever: pre-input nodes, input nodes, pending interrupts, timing-wheel
   expirations, and finally all pending frames.  Never returns. */
static void
vlib_main_loop (vlib_main_t * vm)
{
vlib_node_main_t *nm = &vm->node_main;
uword i;
u64 cpu_time_now;

/* Initialize pending node vector. */
vec_resize (nm->pending_frames, 32);
_vec_len (nm->pending_frames) = 0;

/* Mark time of main loop start. */
cpu_time_now = vm->clib_time.last_cpu_time;
vm->cpu_time_main_loop_start = cpu_time_now;

/* Arrange for first level of timing wheel to cover times we care
most about. */
nm->timing_wheel.min_sched_time = 10e-6;
nm->timing_wheel.max_sched_time = 10e-3;
timing_wheel_init (&nm->timing_wheel,
cpu_time_now, vm->clib_time.clocks_per_second);

/* Pre-allocate expired nodes. */
vec_alloc (nm->data_from_advancing_timing_wheel, 32);
vec_alloc (nm->pending_interrupt_node_runtime_indices, 32);

// Default thresholds for the NAPI-like interrupt/polling switching.
// NOTE(article): said not to matter when receiving with DPDK.
if (!nm->polling_threshold_vector_length)
nm->polling_threshold_vector_length = 10;
if (!nm->interrupt_threshold_vector_length)
nm->interrupt_threshold_vector_length = 5;

nm->current_process_index = ~0;

/* Start all processes. */
/* Run every VLIB_NODE_TYPE_PROCESS node once.  Per the article, a
setjmp/longjmp mechanism suspends them until they are woken later, which
amounts to a multi-tasking model; such nodes mainly serve run-time
configuration. */
{
uword i;
for (i = 0; i < vec_len (nm->processes); i++)
cpu_time_now =
dispatch_process (vm, nm->processes[i], /* frame */ 0, cpu_time_now);
}

while (1)
{
vlib_node_runtime_t *n;

/* Process pre-input nodes. */
// NOTE(article): currently a single epoll-related node that watches sockets.
vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT])
cpu_time_now = dispatch_node (vm, n,
VLIB_NODE_TYPE_PRE_INPUT,
VLIB_NODE_STATE_POLLING,
/* frame */ 0,
cpu_time_now);

/* Next process input nodes. */
// Packet-receiving nodes (the article assumes the DPDK input node).
vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_INPUT])
cpu_time_now = dispatch_node (vm, n,
VLIB_NODE_TYPE_INPUT,
VLIB_NODE_STATE_POLLING,
/* frame */ 0,
cpu_time_now);

// Invoke the queue-signal callback when no queue signal is pending.
// NOTE(article): said to serve the memclnt node -- not verifiable here.
if (PREDICT_TRUE (vm->queue_signal_pending == 0))
vm->queue_signal_callback (vm);

/* Next handle interrupts. */
// NOTE(article): DPDK has no interrupt mechanism, so this is said not to run with it.
{
uword l = _vec_len (nm->pending_interrupt_node_runtime_indices);
uword i;
if (l > 0)
{
// Reset the length first; the snapshot l bounds the loop below.
_vec_len (nm->pending_interrupt_node_runtime_indices) = 0;
for (i = 0; i < l; i++)
{
n = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
nm->
pending_interrupt_node_runtime_indices
[i]);
cpu_time_now =
dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT,
VLIB_NODE_STATE_INTERRUPT,
/* frame */ 0,
cpu_time_now);
}
}
}

/* Check if process nodes have expired from timing wheel. */
/* Handle expired timing-wheel entries: timed events to signal to a process
and suspended processes to resume.  Only VLIB_NODE_TYPE_PROCESS nodes are
involved. */
nm->data_from_advancing_timing_wheel
= timing_wheel_advance (&nm->timing_wheel, cpu_time_now,
nm->data_from_advancing_timing_wheel,
&nm->cpu_time_next_process_ready);

ASSERT (nm->data_from_advancing_timing_wheel != 0);
if (PREDICT_FALSE (_vec_len (nm->data_from_advancing_timing_wheel) > 0))
{
uword i;

// Also entered by the goto further down, after pending frames were dispatched.
processes_timing_wheel_data:
for (i = 0; i < _vec_len (nm->data_from_advancing_timing_wheel);
i++)
{
u32 d = nm->data_from_advancing_timing_wheel[i];
u32 di = vlib_timing_wheel_data_get_index (d);

if (vlib_timing_wheel_data_is_timed_event (d))
{
// Timed event: deliver its data to the target process, then release the pool entry.
vlib_signal_timed_event_data_t *te =
pool_elt_at_index (nm->signal_timed_event_data_pool, di);
vlib_node_t *n = vlib_get_node (vm, te->process_node_index);
vlib_process_t *p =
vec_elt (nm->processes, n->runtime_index);
void *data;
data =
vlib_process_signal_event_helper (nm, n, p,
te->event_type_index,
te->n_data_elts,
te->n_data_elt_bytes);
if (te->n_data_bytes < sizeof (te->inline_event_data))
clib_memcpy (data, te->inline_event_data,
te->n_data_bytes);
else
{
clib_memcpy (data, te->event_data_as_vector,
te->n_data_bytes);
vec_free (te->event_data_as_vector);
}
pool_put (nm->signal_timed_event_data_pool, te);
}
else
{
// Otherwise di identifies a suspended process to resume.
cpu_time_now = clib_cpu_time_now ();
cpu_time_now =
dispatch_suspended_process (vm, di, cpu_time_now);
}
}

/* Reset vector. */
_vec_len (nm->data_from_advancing_timing_wheel) = 0;
}

/* Input nodes may have added work to the pending vector.
Process pending vector until there is nothing left.
All pending vectors will be processed from input -> output. */
/* This is the part that matters most to node developers: the packet
processing logic lives in VLIB_NODE_TYPE_INTERNAL nodes.  Each entry of
nm->pending_frames records a hand-off from one node to the next and leads
to the packets the next node must process.  The vector length is re-read on
every iteration, so entries appended while dispatching are handled too. */
for (i = 0; i < _vec_len (nm->pending_frames); i++)
cpu_time_now = dispatch_pending_node (vm, nm->pending_frames + i,
cpu_time_now);
/* Reset pending vector for next iteration. */
_vec_len (nm->pending_frames) = 0;

/* Pending internal nodes may resume processes. */
if (_vec_len (nm->data_from_advancing_timing_wheel) > 0)
goto processes_timing_wheel_data;

vlib_increment_main_loop_counter (vm);

/* Record time stamp in case there are no enabled nodes and above
calls do not update time stamp. */
cpu_time_now = clib_cpu_time_now ();
}
}


/*
 * Dispatch one pending frame to the internal node it is queued for.
 *
 * vm               main structure; vm->node_main holds the node runtimes,
 *                  the next-frame vector and the pending-frame vector.
 * p                pending-frame entry to dispatch.  It is assumed to point
 *                  into nm->pending_frames, as it does for the caller in
 *                  vlib_main_loop (which passes nm->pending_frames + i).
 * last_time_stamp  time stamp handed on to dispatch_node().
 *
 * Returns the time stamp returned by dispatch_node().
 */
/* static */ u64
dispatch_pending_node (vlib_main_t * vm,
		       vlib_pending_frame_t * p, u64 last_time_stamp)
{
  vlib_node_main_t *nm = &vm->node_main;
  vlib_frame_t *f;
  vlib_next_frame_t *nf, nf_dummy;
  vlib_node_runtime_t *n;
  u32 restore_frame_index;
  u32 pending_frame_index;

  /* Dispatching the node can append to nm->pending_frames, and growing a
     vector may move it.  Remember p's position so it can be re-derived
     after the dispatch instead of being used as a stale pointer. */
  pending_frame_index = (u32) (p - nm->pending_frames);

  /* Runtime of the internal node whose function is about to run. */
  n = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL],
			p->node_runtime_index);

  /* Frame holding the packets this node has to process. */
  f = vlib_get_frame (vm, p->frame_index);

  if (p->next_frame_index == VLIB_PENDING_FRAME_NO_NEXT_FRAME)
    {
      /* The frame did not arrive through a next-frame slot (per the
         article: e.g. locally generated packets injected with
         vlib_put_frame_to_node()).  No next frame: so use dummy on stack. */
      nf = &nf_dummy;
      nf->flags = f->flags & VLIB_NODE_FLAG_TRACE;
      /* Complemented so it can never equal p->frame_index below. */
      nf->frame_index = ~p->frame_index;
    }
  else
    /* Frame handed over through a node's next-frame slot. */
    nf = vec_elt_at_index (nm->next_frames, p->next_frame_index);

  ASSERT (f->flags & VLIB_FRAME_IS_ALLOCATED);

  /* Force allocation of new frame while current frame is being
     dispatched. */
  restore_frame_index = ~0;
  if (nf->frame_index == p->frame_index)
    {
      /* The next-frame slot and the pending entry both refer to this
         frame; detach it from the slot while the node works on it. */
      nf->frame_index = ~0;
      nf->flags &= ~VLIB_FRAME_IS_ALLOCATED;
      if (!(n->flags & VLIB_NODE_FLAG_FRAME_NO_FREE_AFTER_DISPATCH))
	restore_frame_index = p->frame_index;
    }

  /* Frame must be pending. */
  ASSERT (f->flags & VLIB_FRAME_PENDING);
  ASSERT (f->n_vectors > 0);

  /* Copy trace flag from next frame to node.
     Trace flag indicates that at least one vector in the dispatched
     frame is traced. */
  n->flags &= ~VLIB_NODE_FLAG_TRACE;
  n->flags |= (nf->flags & VLIB_FRAME_TRACE) ? VLIB_NODE_FLAG_TRACE : 0;
  nf->flags &= ~VLIB_FRAME_TRACE;

  /* Run the node's processing function on the frame. */
  last_time_stamp = dispatch_node (vm, n,
				   VLIB_NODE_TYPE_INTERNAL,
				   VLIB_NODE_STATE_POLLING,
				   f, last_time_stamp);

  f->flags &= ~VLIB_FRAME_PENDING;

  /* Frame is ready to be used again, so restore it. */
  if (restore_frame_index != ~0)
    {
      /* nm->pending_frames may have been reallocated during the dispatch;
         recompute p before reading through it. */
      p = nm->pending_frames + pending_frame_index;

      /* p->next_frame_index can change during node dispatch if node
         function decides to change graph hook up (the article names
         vlib_next_frame_change_ownership() as one such path). */
      nf = vec_elt_at_index (nm->next_frames, p->next_frame_index);
      nf->frame_index = restore_frame_index;
      nf->flags |= VLIB_FRAME_IS_ALLOCATED;
    }

  if (f->flags & VLIB_FRAME_FREE_AFTER_DISPATCH)
    {
      ASSERT (!(n->flags & VLIB_NODE_FLAG_FRAME_NO_FREE_AFTER_DISPATCH));
      vlib_frame_free (vm, n, f);
    }

  return last_time_stamp;
}


/* Call one node's function and account for it.
   Returns last_time_stamp unchanged when the node is skipped (state mismatch
   for a non-internal node, or an input/pre-input node whose
   input_main_loops_per_call counter has not reached zero); otherwise returns
   the CPU time read right after the node function returned.
   The interrupt/polling switching at the end is compiled out of the
   condition when DPDK is non-zero (per the article, little else of note
   happens here for the DPDK driver). */
/* static_always_inline */ u64
dispatch_node (vlib_main_t * vm,
vlib_node_runtime_t * node,
vlib_node_type_t type,
vlib_node_state_t dispatch_state,
vlib_frame_t * frame, u64 last_time_stamp)
{
uword n, v;
u64 t;
vlib_node_main_t *nm = &vm->node_main;
vlib_next_frame_t *nf;

if (CLIB_DEBUG > 0)
{
// Debug-only sanity check; this n shadows the outer uword n.
vlib_node_t *n = vlib_get_node (vm, node->node_index);
ASSERT (n->type == type);
}

/* Only non-internal nodes may be disabled. */
if (type != VLIB_NODE_TYPE_INTERNAL && node->state != dispatch_state)
{
ASSERT (type != VLIB_NODE_TYPE_INTERNAL);
return last_time_stamp;
}

if ((type == VLIB_NODE_TYPE_PRE_INPUT || type == VLIB_NODE_TYPE_INPUT)
&& dispatch_state != VLIB_NODE_STATE_INTERRUPT)
{
u32 c = node->input_main_loops_per_call;
/* Only call node when count reaches zero. */
if (c)
{
node->input_main_loops_per_call = c - 1;
return last_time_stamp;
}
}

/* Speculatively prefetch next frames. */
if (node->n_next_nodes > 0)
{
// The node function is expected to touch its next frames, so prefetch them.
nf = vec_elt_at_index (nm->next_frames, node->next_frame_index);
CLIB_PREFETCH (nf, 4 * sizeof (nf[0]), WRITE);
}

vm->cpu_time_last_node_dispatch = last_time_stamp;

if (1 /* || vm->cpu_index == node->cpu_index */ )
{
vlib_main_t *stat_vm;

stat_vm = /* vlib_mains ? vlib_mains[0] : */ vm;

vlib_elog_main_loop_event (vm, node->node_index,
last_time_stamp,
frame ? frame->n_vectors : 0,
/* is_after */ 0);

/*
* Turn this on if you run into
* "bad monkey" contexts, and you want to know exactly
* which nodes they've visited... See ixge.c...
*/
if (VLIB_BUFFER_TRACE_TRAJECTORY && frame)
{
int i;
int log_index;
u32 *from;
from = vlib_frame_vector_args (frame);
for (i = 0; i < frame->n_vectors; i++)
{
// pre_data[0] counts the entries logged so far; append this node's index.
vlib_buffer_t *b = vlib_get_buffer (vm, from[i]);
ASSERT (b->pre_data[0] < 32);
log_index = b->pre_data[0]++ + 1;
b->pre_data[log_index] = node->node_index;
}
n = node->function (vm, node, frame);
}
else
// Run the node's processing function.
n = node->function (vm, node, frame);

t = clib_cpu_time_now ();

vlib_elog_main_loop_event (vm, node->node_index, t, n,    /* is_after */
1);

vm->main_loop_vectors_processed += n;
vm->main_loop_nodes_processed += n > 0;

v = vlib_node_runtime_update_stats (stat_vm, node,
/* n_calls */ 1,
/* n_vectors */ n,
/* n_clocks */ t - last_time_stamp);

/* When in interrupt mode and vector rate crosses threshold switch to
polling mode. */
// Both alternatives require DPDK == 0, so none of this applies to DPDK builds.
if ((DPDK == 0 && dispatch_state == VLIB_NODE_STATE_INTERRUPT)
|| (DPDK == 0 && dispatch_state == VLIB_NODE_STATE_POLLING
&& (node->flags
& VLIB_NODE_FLAG_SWITCH_FROM_INTERRUPT_TO_POLLING_MODE)))
{
ELOG_TYPE_DECLARE (e) =
{
.function = (char *) __FUNCTION__,.format =
"%s vector length %d, switching to %s",.format_args =
"T4i4t4",.n_enum_strings = 2,.enum_strings =
{
"interrupt", "polling",},};
struct
{
u32 node_name, vector_length, is_polling;
} *ed;

if (dispatch_state == VLIB_NODE_STATE_INTERRUPT
&& v >= nm->polling_threshold_vector_length)
{
// Interrupt -> polling: update both the node and its runtime, and the per-state counts.
vlib_node_t *n = vlib_get_node (vm, node->node_index);
n->state = VLIB_NODE_STATE_POLLING;
node->state = VLIB_NODE_STATE_POLLING;
ASSERT (!
(node->flags &
VLIB_NODE_FLAG_SWITCH_FROM_INTERRUPT_TO_POLLING_MODE));
node->flags &=
~VLIB_NODE_FLAG_SWITCH_FROM_POLLING_TO_INTERRUPT_MODE;
node->flags |=
VLIB_NODE_FLAG_SWITCH_FROM_INTERRUPT_TO_POLLING_MODE;
nm->input_node_counts_by_state[VLIB_NODE_STATE_INTERRUPT] -= 1;
nm->input_node_counts_by_state[VLIB_NODE_STATE_POLLING] += 1;

ed = ELOG_DATA (&vm->elog_main, e);
ed->node_name = n->name_elog_string;
ed->vector_length = v;
ed->is_polling = 1;
}
else if (dispatch_state == VLIB_NODE_STATE_POLLING
&& v <= nm->interrupt_threshold_vector_length)
{
vlib_node_t *n = vlib_get_node (vm, node->node_index);
if (node->flags &
VLIB_NODE_FLAG_SWITCH_FROM_POLLING_TO_INTERRUPT_MODE)
{
/* Switch to interrupt mode after dispatch in polling one more time.
This allows driver to re-enable interrupts. */
n->state = VLIB_NODE_STATE_INTERRUPT;
node->state = VLIB_NODE_STATE_INTERRUPT;
node->flags &=
~VLIB_NODE_FLAG_SWITCH_FROM_INTERRUPT_TO_POLLING_MODE;
nm->input_node_counts_by_state[VLIB_NODE_STATE_POLLING] -=
1;
nm->input_node_counts_by_state[VLIB_NODE_STATE_INTERRUPT] +=
1;

}
else
{
// First low-rate polling dispatch: only mark the pending switch and log it.
node->flags |=
VLIB_NODE_FLAG_SWITCH_FROM_POLLING_TO_INTERRUPT_MODE;
ed = ELOG_DATA (&vm->elog_main, e);
ed->node_name = n->name_elog_string;
ed->vector_length = v;
ed->is_polling = 0;
}
}
}
}

return t;
}


下面的dispatch_process函数留待有时间再详细分析:其背后的机制比较复杂,但与vpp框架使用者关系不大。

/* Run one process node via vlib_process_startup() and account for it.
   Returns last_time_stamp unchanged if the node is not in polling state or
   the process is flagged as suspended (waiting for clock or event);
   otherwise returns the CPU time read after the process returned or
   suspended.  f may be 0 (no frame). */
static u64
dispatch_process (vlib_main_t * vm,
vlib_process_t * p, vlib_frame_t * f, u64 last_time_stamp)
{
vlib_node_main_t *nm = &vm->node_main;
vlib_node_runtime_t *node_runtime = &p->node_runtime;
vlib_node_t *node = vlib_get_node (vm, node_runtime->node_index);
u64 t;
uword n_vectors, is_suspend;

if (node->state != VLIB_NODE_STATE_POLLING
|| (p->flags & (VLIB_PROCESS_IS_SUSPENDED_WAITING_FOR_CLOCK
| VLIB_PROCESS_IS_SUSPENDED_WAITING_FOR_EVENT)))
return last_time_stamp;

p->flags |= VLIB_PROCESS_IS_RUNNING;

t = last_time_stamp;
vlib_elog_main_loop_event (vm, node_runtime->node_index, t,
f ? f->n_vectors : 0, /* is_after */ 0);

/* Save away current process for suspend. */
nm->current_process_index = node->runtime_index;

n_vectors = vlib_process_startup (vm, p, f);

nm->current_process_index = ~0;

// The value VLIB_PROCESS_RETURN_LONGJMP_SUSPEND means the process suspended
// rather than ran to completion.
ASSERT (n_vectors != VLIB_PROCESS_RETURN_LONGJMP_RETURN);
is_suspend = n_vectors == VLIB_PROCESS_RETURN_LONGJMP_SUSPEND;
if (is_suspend)
{
vlib_pending_frame_t *pf;

// Record the suspended process in the suspended_process_frames pool so it
// can be found again later; VLIB_PROCESS_IS_RUNNING stays set on this path.
n_vectors = 0;
pool_get (nm->suspended_process_frames, pf);
pf->node_runtime_index = node->runtime_index;
pf->frame_index = f ? vlib_frame_index (vm, f) : ~0;
pf->next_frame_index = ~0;

p->n_suspends += 1;
p->suspended_process_frame_index = pf - nm->suspended_process_frames;

// A process waiting for the clock is put on the timing wheel at its resume time.
if (p->flags & VLIB_PROCESS_IS_SUSPENDED_WAITING_FOR_CLOCK)
timing_wheel_insert (&nm->timing_wheel, p->resume_cpu_time,
vlib_timing_wheel_data_set_suspended_process
(node->runtime_index));
}
else
p->flags &= ~VLIB_PROCESS_IS_RUNNING;

t = clib_cpu_time_now ();

vlib_elog_main_loop_event (vm, node_runtime->node_index, t, is_suspend,
/* is_after */ 1);

// A suspension is not counted as a call (n_calls is !is_suspend).
vlib_process_update_stats (vm, p,
/* n_calls */ !is_suspend,
/* n_vectors */ n_vectors,
/* n_clocks */ t - last_time_stamp);

return t;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: