您的位置:首页 > 运维架构 > Linux

Linux发送函数dev_queue_xmit分析&…

2013-12-19 20:55 447 查看
当上层准备好一个包之后,交给下面这个函数处理:

Java代码 name);
} else { if (net_ratelimit()) printk(KERN_CRIT "Dead loop on
virtual device " "%s, fix it urgently!\n", dev->name); } } rc =
-ENETDOWN; rcu_read_unlock_bh(); out_kfree_skb: kfree_skb(skb);
return rc; out: rcu_read_unlock_bh(); return rc; }
" quality="high" allowscriptaccess="always"
type="application/x-shockwave-flash"
pluginspage="http://www.macromedia.com/go/getflashplayer">

--转" />


int dev_queue_xmit(struct sk_buff *skb)

{

struct net_device *dev = skb->dev;

struct netdev_queue *txq;

struct Qdisc *q;

int rc = -ENOMEM;

if (netif_needs_gso(dev, skb))

goto gso;

//首先判断skb是否被分段,如果分了段并且网卡不支持分散读的话需要将所有段重新组合成一个段

//这里__skb_linearize其实就是__pskb_pull_tail(skb, skb->data_len),这个函数基本上等同于pskb_may_pull

//pskb_may_pull的作用就是检测skb对应的主buf中是否有足够的空间来pull出len长度,

//如果不够就重新分配skb并将frags中的数据拷贝入新分配的主buff中,而这里将参数len设置为skb->datalen,

//也就是会将所有的数据全部拷贝到主buff中,以这种方式完成skb的线性化

if (skb_shinfo(skb)->frag_list &&

!(dev->features & NETIF_F_FRAGLIST) &&

__skb_linearize(skb))

goto out_kfree_skb;

//如果上面已经线性化了一次,这里的__skb_linearize就会直接返回

//注意区别frags和frag_list,

//前者是将多的数据放到单独分配的页面中,sk_buff只有一个。而后者则是连接多个sk_buff

if (skb_shinfo(skb)->nr_frags &&

(!(dev->features & NETIF_F_SG) || illegal_highdma(dev, skb)) &&

__skb_linearize(skb))

goto out_kfree_skb;

if (skb->ip_summed == CHECKSUM_PARTIAL) {

skb_set_transport_header(skb, skb->csum_start -

skb_headroom(skb));

if (!dev_can_checksum(dev, skb) && skb_checksum_help(skb))

goto out_kfree_skb;

}

gso:

rcu_read_lock_bh();

//选择一个发送队列,如果设备提供了select_queue回调函数就使用它,否则由内核选择一个队列

//大部分驱动都不会设置多个队列,而是在调用alloc_etherdev分配net_device时将队列个数设置为1

//也就是只有一个队列

txq = dev_pick_tx(dev, skb);

//从netdev_queue结构上取下设备的qdisc

q = rcu_dereference(txq->qdisc);

#ifdef CONFIG_NET_CLS_ACT

skb->tc_verd = SET_TC_AT(skb->tc_verd,AT_EGRESS);

#endif

//上面说大部分驱动只有一个队列,但是只有一个队列也不代表设备准备使用它

//这里检查这个队列中是否有enqueue函数,如果有则说明设备会使用这个队列,否则需另外处理

//关于enqueue函数的设置,我找到dev_open->dev_activate中调用了qdisc_create_dflt来设置,

//不知道一般驱动怎么设置这个queue

//需要注意的是,这里并不是将传进来的skb直接发送,而是先入队,然后调度队列,

//具体发送哪个包由enqueue和dequeue函数决定,这体现了设备的排队规则

if (q->enqueue) {

spinlock_t *root_lock = qdisc_lock(q);

spin_lock(root_lock);

if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {

kfree_skb(skb);

rc = NET_XMIT_DROP;

} else {

//将skb加入到设备发送队列中,然后调用qdisc_run来发送

rc = qdisc_enqueue_root(skb, q);

qdisc_run(q); //下面看

}

spin_unlock(root_lock);

goto out;

}

//下面是处理不使用发送队列的情况,注意看下面一段注释

//要确定设备是开启的,下面还要确定队列是运行的。启动和停止队列由驱动程序决定

//详见ULNI中文版P251

//如上面英文注释所说,设备没有输出队列典型情况是回环设备

//我们所要做的就是直接调用驱动的hard_start_xmit将它发送出去

//如果发送失败就直接丢弃,因为没有队列可以保存它

if (dev->flags & IFF_UP) {

int cpu = smp_processor_id();

if (txq->xmit_lock_owner != cpu) {

HARD_TX_LOCK(dev, txq, cpu);

if (!netif_tx_queue_stopped(txq)) {

rc = 0;

//对于loopback设备,它的hard_start_xmit函数是loopback_xmit

//我们可以看到,在loopback_xmit末尾直接调用了netif_rx函数

//将带发送的包直接接收了回来

//这个函数下面具体分析,返回0表示成功,skb已被free

if (!dev_hard_start_xmit(skb, dev, txq)) {

HARD_TX_UNLOCK(dev, txq);

goto out;

}

}

HARD_TX_UNLOCK(dev, txq);

if (net_ratelimit())

printk(KERN_CRIT "Virtual device %s asks to "

"queue packet!\n", dev->name);

} else {

if (net_ratelimit())

printk(KERN_CRIT "Dead loop on virtual device "

"%s, fix it urgently!\n", dev->name);

}

}

rc = -ENETDOWN;

rcu_read_unlock_bh();

out_kfree_skb:

kfree_skb(skb);

return rc;

out:

rcu_read_unlock_bh();

return rc;

}

从此函数可以看出,当驱动使用发送队列的时候会循环从队列中取出包发送

而不使用队列的时候只发送一次,如果没发送成功就直接丢弃

Java代码

--转" />


int dev_hard_start_xmit(struct sk_buff *skb, struct net_device *dev,

struct netdev_queue *txq)

{

if (likely(!skb->next)) {

//从这里可以看出,对于每一个发送的包也会给ptype_all一份,

//而packet套接字创建时对于proto为ETH_P_ALL的会在ptype_all中注册一个成员

//因此对于协议号为ETH_P_ALL的packet套接字来说,发送和接受的数据都能收到

//而其他成员似乎不行,这个要回去试试

if (!list_empty(&ptype_all))

dev_queue_xmit_nit(skb, dev);

if (netif_needs_gso(dev, skb)) {

if (unlikely(dev_gso_segment(skb)))

goto out_kfree_skb;

if (skb->next)

goto gso;

}

//这个就是驱动提供的发送回调函数了

return dev->hard_start_xmit(skb, dev);

}

gso:

do {

struct sk_buff *nskb = skb->next;

int rc;

skb->next = nskb->next;

nskb->next = NULL;

rc = dev->hard_start_xmit(nskb, dev);

if (unlikely(rc)) {

nskb->next = skb->next;

skb->next = nskb;

return rc;

}

if (unlikely(netif_tx_queue_stopped(txq) && skb->next))

return NETDEV_TX_BUSY;

} while (skb->next);

skb->destructor = DEV_GSO_CB(skb)->destructor;

out_kfree_skb:

kfree_skb(skb);

return 0;

}

qdisc_run和__qdisc_run的功能很简单,就是检查队列是否处于运行状态

Java代码

--转" />


static inline void qdisc_run(struct Qdisc *q)

{

if (!test_and_set_bit(__QDISC_STATE_RUNNING, &q->state))

__qdisc_run(q);

}

void __qdisc_run(struct Qdisc *q)

{

unsigned long start_time = jiffies;

//真正的操作在这个函数里面

while (qdisc_restart(q)) {

if (need_resched() || jiffies != start_time) {

//当需要进行调度或者时间超过了1个时间片的时候就退出循环,退出之前发出软中断请求

__netif_schedule(q);

break;

}

}

clear_bit(__QDISC_STATE_RUNNING, &q->state);

}

然后循环调用qdisc_restart发送数据

下面这个函数qdisc_restart是真正发送数据包的函数

它从队列上取下一个帧,然后尝试将它发送出去

若发送失败则一般是重新入队。

此函数返回值为:发送成功时返回剩余队列长度

发送失败时返回0(若发送成功且剩余队列长度为0也返回0)

Java代码 name, ret, q->q.qlen); ret =
dev_requeue_skb(skb, q); break; } if (ret &&
(netif_tx_queue_stopped(txq) || netif_tx_queue_frozen(txq))) ret =
0; return ret; } " quality="high" allowscriptaccess="always"
type="application/x-shockwave-flash"
pluginspage="http://www.macromedia.com/go/getflashplayer">

--转" />


static inline int qdisc_restart(struct Qdisc *q)

{

struct netdev_queue *txq;

int ret = NETDEV_TX_BUSY;

struct net_device *dev;

spinlock_t *root_lock;

struct sk_buff *skb;

if (unlikely((skb = dequeue_skb(q)) == NULL))

return 0;

root_lock = qdisc_lock(q);

spin_unlock(root_lock);

dev = qdisc_dev(q);

txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));

HARD_TX_LOCK(dev, txq, smp_processor_id());

if (!netif_tx_queue_stopped(txq) &&

!netif_tx_queue_frozen(txq))

ret = dev_hard_start_xmit(skb, dev, txq);

HARD_TX_UNLOCK(dev, txq);

spin_lock(root_lock);

switch (ret) {

case NETDEV_TX_OK:

ret = qdisc_qlen(q);

break;

case NETDEV_TX_LOCKED:

//有可能其他CPU正占有这个锁,这里处理冲突

//函数就不看了,里面的处理流程简单说下

//分两种情况,如果占有锁的cpu就是当前cpu,那么释放这个包同时打印一条警告

//否则将包重新入队

ret = handle_dev_cpu_collision(skb, txq, q);

break;

default:

//当发送队列处于停止状态并且队列中有数据待发送时会返回NETDEV_TX_BUSY

//代表发送忙,这种情况下就将包重新入队

if (unlikely (ret != NETDEV_TX_BUSY && net_ratelimit()))

printk(KERN_WARNING "BUG %s code %d qlen %d\n",

dev->name, ret, q->q.qlen);

ret = dev_requeue_skb(skb, q);

break;

}

if (ret && (netif_tx_queue_stopped(txq) ||

netif_tx_queue_frozen(txq)))

ret = 0;

return ret;

}

至此,dev_queue_xmit到驱动层的发送流程就分析完了。

已经有了dev_queue_xmit函数,为什么还需要软中断来发送呢?

我们可以看到在dev_queue_xmit中将skb进行了一些处理(比如合并成一个包,计算校验和等)

处理完的skb是可以直接发送的了,这时dev_queue_xmit也会先将skb入队(skb一般都是在这个函数中入队的)

并且调用qdisc_run尝试发送,但是有可能发送失败,这时就将skb重新入队,调度软中断,并且自己直接返回。

软中断只是发送队列中的skb以及释放已经发送的skb,它无需再对skb进行线性化或者校验和处理

另外在队列被停止的情况下,dev_queue_xmit仍然可以把包加入队列,但是不能发送

这样在队列被唤醒的时候就需要通过软中断来发送停止期间积压的包

简而言之,dev_queue_xmit是对skb做些最后的处理并且第一次尝试发送,软中断是将前者发送失败或者没发完的包发送出去。

(其实发送软中断还有一个作用,就是释放已经发送的包,因为某些情况下发送是在硬件中断中完成的,

为了提高硬件中断处理效率,内核提供一种方式将释放skb放到软中断中进行,

这时只要调用dev_kfree_skb_irq,它将skb加入softnet_data的completion_queue中,然后开启发送软中断,

net_tx_action会在软中断中将completion_queue中的skb全部释放掉)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: