<?xml version="1.0" encoding="utf-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom">
	<channel>
		<atom:link href="http://gentoo-zh.org/extern.php?action=feed&amp;tid=480&amp;type=rss" rel="self" type="application/rss+xml" />
		<title><![CDATA[Gentoo中文社区 / linux源码解读（三十二）：dpdk核心源码解析（二）]]></title>
		<link>http://www.gentoo-zh.org/viewtopic.php?id=480</link>
		<description><![CDATA[linux源码解读（三十二）：dpdk核心源码解析（二） 最近发表的帖子。]]></description>
		<lastBuildDate>Sun, 05 Feb 2023 02:29:58 +0000</lastBuildDate>
		<generator>FluxBB</generator>
		<item>
			<title><![CDATA[Re: linux源码解读（三十二）：dpdk核心源码解析（二）]]></title>
			<link>http://www.gentoo-zh.org/viewtopic.php?pid=752#p752</link>
			<description><![CDATA[<p>下面增加的参考是很好的方便又疑问可以跳转查看，能够帮各位给容易理解，挺好的管理员batsom继续完善加油吧，希望社区越来越好</p>]]></description>
			<author><![CDATA[dummy@example.com (Urit)]]></author>
			<pubDate>Sun, 05 Feb 2023 02:29:58 +0000</pubDate>
			<guid>http://www.gentoo-zh.org/viewtopic.php?pid=752#p752</guid>
		</item>
		<item>
			<title><![CDATA[linux源码解读（三十二）：dpdk核心源码解析（二）]]></title>
			<link>http://www.gentoo-zh.org/viewtopic.php?pid=487#p487</link>
			<description><![CDATA[<p>dpdk是intel主导开发的网络编程框架， 有这么多的优点，都是怎么实现的了？</p><p>&#160; 1、UIO原理：dpdk绕过了操作系统内核，直接接管网卡，用户程序可以直接在3环读写网卡的数据，这就涉及到两个关键技术点了：</p><p>&#160; &#160; 地址映射：3环的程序是怎么定位到网卡数据存放在哪的了？<br />&#160; &#160; 拦截硬件中断：传统数据处理流程是网卡收到数据后通过硬件中断通知cpu来取数据，3环的程序肯定要拦截这个中断，然后通过轮询方式取数据，这个又是怎么实现的了？</p><p>&#160; &#160; &#160;（1）地址映射：3环程序最常使用的就是内存地址了，一共32或64bit；C/C++层面可以通过指针直接读写地址的值；除了内存，还有很多设备也需要和cpu交互数据，比如显示器：要在屏幕显示的内容肯定是需要用户指定的，用户程序可以把显示的内容发送到显示器指定的地方，然后再屏幕打印出来。为了方便用户程序发送数据，硬件层面会把显示器的部分存储空间映射到内存地址，做到了和内存条硬件的寻址方式一样，用户也可以直接通过指针往这里写数据（汇编层面直接通过mov指令操作即可）！网卡也类似：网卡是插在pci插槽的，网卡（或者说pci插槽）的存储空间也会映射到内存地址，应用程序读写这块物理地址就等同于读写网卡的存储空间！实际写代码时，由于要深入驱动，pci网卡预留物理的内存与io空间会保存到uio设备上，相当于将这些物理空间与io空间暴露给uio设备，应用程序访问这些uio设备即可！几个关键的函数如下：</p><p>&#160; 将pci网卡的物理内存空间以及io空间保存在uio设备结构struct uio_info中的mem成员以及port成员中，uio设备就知道了网卡的物理以及io空间。应用层访问这个uio设备的物理空间以及io空间，就相当于访问pci设备的物理以及io空间；本质上就是将pci网卡的空间暴露给uio设备。</p><div class="codebox"><pre><code>int igbuio_pci_probe(struct pci_dev *dev, const struct pci_device_id *id)
{
    //将pci内存，端口映射给uio设备
    struct rte_uio_pci_dev *udev;
    err = igbuio_setup_bars(dev, &amp;udev-&gt;info);
}
static int igbuio_setup_bars(struct pci_dev *dev, struct uio_info *info)
{
    //pci内存，端口映射给uio设备
    for (i = 0; i != sizeof(bar_names) / sizeof(bar_names[0]); i++) 
    {
        if (pci_resource_len(dev, i) != 0 &amp;&amp; pci_resource_start(dev, i) != 0) 
        {
            flags = pci_resource_flags(dev, i);
            if (flags &amp; IORESOURCE_MEM) 
            {
                //暴露pci的内存空间给uio设备
                ret = igbuio_pci_setup_iomem(dev, info, iom,  i, bar_names[i]);
            } 
            else if (flags &amp; IORESOURCE_IO) 
            {
                //暴露pci的io空间给uio设备
                ret = igbuio_pci_setup_ioport(dev, info, iop,  i, bar_names[i]);
            }
        }
    }
}</code></pre></div><p>&#160; （2）拦截硬件中断：为了减掉内核中冗余的数据处理流程，应用程序要hook网卡的中断，从源头开始拦截网卡数据！当硬件中断触发时，才不会一直触发内核去执行中断回调。也就是通过这种方式，才能在应用层实现硬件中断处理过程。注意：这里说的中断仅是控制中断，而不是报文收发的数据中断，数据中断是不会走到这里来的，因为在pmd开启中断时，没有设置收发报文的中断掩码，只注册了网卡状态改变的中断掩码；hook中断的代码如下：</p><div class="codebox"><pre class="vscroll"><code>int igbuio_pci_probe(struct pci_dev *dev, const struct pci_device_id *id)
{
    //填充uio信息
    udev-&gt;info.name = &quot;igb_uio&quot;;
    udev-&gt;info.version = &quot;0.1&quot;;
    udev-&gt;info.handler = igbuio_pci_irqhandler;        //硬件控制中断的入口，劫持原来的硬件中断
    udev-&gt;info.irqcontrol = igbuio_pci_irqcontrol;    //应用层开关中断时被调用，用于是否开始中断
}
static irqreturn_t igbuio_pci_irqhandler(int irq, struct uio_info *info)
{
    if (udev-&gt;mode == RTE_INTR_MODE_LEGACY &amp;&amp; !pci_check_and_mask_intx(udev-&gt;pdev))
    {
        return IRQ_NONE;
    }
    //返回IRQ_HANDLED时，linux uio框架会唤醒等待uio中断的进程。注册到epoll的uio中断事件就会被调度
    /* Message signal mode, no share IRQ and automasked */
    return IRQ_HANDLED;
}
static int igbuio_pci_irqcontrol(struct uio_info *info, s32 irq_state)
{
    //调用内核的api来开关中断
    if (udev-&gt;mode == RTE_INTR_MODE_LEGACY)
    {
        pci_intx(pdev, !!irq_state);
    }
    else if (udev-&gt;mode == RTE_INTR_MODE_MSIX)\
    {
        list_for_each_entry(desc, &amp;pdev-&gt;msi_list, list)
            igbuio_msix_mask_irq(desc, irq_state);
    }
}</code></pre></div><p>&#160; 2、内存池：传统应用要使用内存时，一般都是调用malloc让操作系统在堆上分配。这样做有两点弊端：</p><p>&#160; &#160; 进入内核要切换上下文<br />&#160; &#160; 操作系统通过buddy&amp;slab算法找合适的空闲内存</p><p>&#160; &#160; &#160; &#160;所以频繁调用malloc会严重拉低效率！如果不频繁调用malloc，怎么处理频繁收到和需要发送的报文数据了？dpdk采用的是内存池的技术：即在huge page内存中开辟一个连续的大缓冲区当做内存池！同时提供rte_mempool_get从内存池中获取内存空间。也可调用rte_mempool_put将不再使用的内存空间放回到内存池中。从这里就能看出：dpdk自己从huge page处维护了一大块内存供应用程序使用，应用程序不再需要通过系统调用从操作系统申请内存了！</p><p>&#160; &#160; &#160;（1）内存池的创建，在rte_mempool_create接口中完成。这个接口主要是在大页内存中开辟一个连续的大缓冲区当做内存池，然后将这个内存池进行分割，头部为struct rte_mempool内存池结构； 紧接着是内存池的私有结构大小，这个由应用层自己设置，每个创建内存池的应用进程都可以指定不同的私有结构； 最后是多个连续的对象元素，这些对象元素都是处于同一个内存池中。每个对象元素又有对象的头部，对象的真实数据区域，对象的尾部组成。这里所说的对象元素，其实就是应用层要开辟的真实数据空间，例如应用层自己定义的结构体变量等；本质上是dpdk自己实现了一套内存的管理办法，其作用和linux的buddy&amp;slab是一样的，没本质区别！整个内存池图示如下：<br /><span class="postimg"><img src="https://img2022.cnblogs.com/blog/2052730/202203/2052730-20220323181304388-472718071.png" alt="FluxBB bbcode 测试" /></span></p><p>&#160; &#160; &#160;每创建一个内存池，都会创建一个链表节点，然后插入到链表中，因此这个链表记录着当前系统创建了多少内存池。核心代码如下：</p><p>//创建内存池链表节点<br />te = rte_zmalloc(&quot;MEMPOOL_TAILQ_ENTRY&quot;, sizeof(*te), 0);<br />//内存池链表节点插入到内存池链表中<br />te-&gt;data = (void *) mp;<br />RTE_EAL_TAILQ_INSERT_TAIL(RTE_TAILQ_MEMPOOL, rte_mempool_list, te);</p><p>&#160; &#160;所以说内存池可能不止1个，会有多个！在内存池中，内存被划分成了N多的对象。应用程序要申请内存时，怎么知道哪些对象空闲可以用，哪些对象已经被占用了？当对象元素初始化完成后，会把对象指针放入ring队列，所以说ring队列的所有对象指针都是可以使用的！应用程序要申请内存时，可以调用rte_mempool_get接口从ring队列中获取，也就是出队； 使用完毕后调用rte_mempool_put将内存释放回收时，也是将要回收的内存空间对应的对象指针放到这个ring队列中，也就是入队！</p><p>&#160; （2）具体分配内存时的步骤：</p><p>&#160; &#160; 现代cpu基本都是多核的，多个cpu同时在内存池申请内存时无法避免涉及到互斥，会在一定程度上影响分配的效率，所以每个cpu自己都有自己的“自留地”，会优先在自己的“自留地”申请内存；<br />&#160; &#160; 如果“自留地”的内存已耗尽，才会继续去内存池申请内存！核心代码如下：</p><div class="codebox"><pre><code>int rte_mempool_get(struct rte_mempool *mp, void **obj_table, unsigned n)
{
#if RTE_MEMPOOL_CACHE_MAX_SIZE &gt; 0
    //从当前cpu应用层缓冲区中获取
    cache = &amp;mp-&gt;local_cache[lcore_id];
    cache_objs = cache-&gt;objs;
    for (index = 0, len = cache-&gt;len - 1; index &lt; n; ++index, len--, obj_table++)
    {
        *obj_table = cache_objs[len];
    }
    return 0;
#endif
    /* get remaining objects from ring */
    //直接从ring队列中获取
    ret = rte_ring_sc_dequeue_bulk(mp-&gt;ring, obj_table, n);
}</code></pre></div><p>&#160; 释放内存的步骤和申请类似：</p><p>&#160; &#160; 先查看cpu的“自留地”是否还有空间。如果有，就先把释放的对象指针放在“自留地”；<br />&#160; &#160; 如果“自留地”没空间了，再把释放的对象指针放在内存池！核心代码如下：</p><div class="codebox"><pre><code>int rte_mempool_put(struct rte_mempool *mp, void **obj_table, unsigned n)
{
#if RTE_MEMPOOL_CACHE_MAX_SIZE &gt; 0
    //在当前cpu本地缓存有空间的场景下， 先放回到本地缓存。
    cache = &amp;mp-&gt;local_cache[lcore_id];
    cache_objs = &amp;cache-&gt;objs[cache-&gt;len];
    for (index = 0; index &lt; n; ++index, obj_table++)
    {
        cache_objs[index] = *obj_table;
    }
    //缓冲达到阈值，刷到队列中
    if (cache-&gt;len &gt;= flushthresh) 
    {
        rte_ring_mp_enqueue_bulk(mp-&gt;ring, &amp;cache-&gt;objs[cache_size], cache-&gt;len - cache_size);
        cache-&gt;len = cache_size;
    }
        return 0
#endif
    //直接放回到ring队列
    rte_ring_sp_enqueue_bulk(mp-&gt;ring, obj_table, n);
}</code></pre></div><p>&#160; 注意：这里的ring是环形无锁队列！</p><p>&#160; &#160; 3、Poll mode driver： 不论何总形式的io，接收方获取数据的方式有两种：</p><p>&#160; &#160; &#160;被动接收中断的唤醒：典型如网卡收到数据，通过硬件中断通知操作系统去处理；操作系统收到数据后会唤醒休眠的进程继续处理数据<br />&#160; &#160; &#160; &#160; 轮询 poll：写个死循环不停的检查内存地址是否有新数据到了！</p><p>&#160; &#160;在 x86 体系结构中，一次中断处理需要将 CPU 的状态寄存器保存到堆栈，并运行中断handler，最后再将保存的状态寄存器信息从堆栈中恢复，整个过程需要至少 300 个处理器时钟周期！所以dpdk果断抛弃了中断，转而使用轮询方式！整个流程大致是这样的：内核态的UIO Driver&#160; hook了网卡发出的中断信号，然后由用户态的 PMD Driver 采用主动轮询的方式。除了链路状态通知仍必须采用中断方式以外（因为网卡发出硬件中断才能触发执行hook代码的嘛，这个容易理解吧？），均使用无中断方式直接操作网卡设备的接收和发送队列。整体流程大致如下：UIO hook了网卡的中断，网卡收到数据后“被迫”执行hook代码！先是通过UIO把网卡的存储地址映射到/dev/uio文件，而后应用程序通过PMD轮询检查文件是否有新数据到来！期间也使用mmap把应用的虚拟地址映射到网卡的物理地址，减少数据的拷贝转移！<br /><span class="postimg"><img src="https://img2022.cnblogs.com/blog/2052730/202203/2052730-20220323200841798-31663941.png" alt="FluxBB bbcode 测试" /></span></p><p>&#160; &#160;总的来说：UIO+PMD，前者旁路了内核，后者主动轮询避免了硬中断，DPDK 从而可以在用户态进行收发包的处理。带来了零拷贝（Zero Copy）、无系统调用（System call）的优化。同时，还避免了软中断的异步处理，也减少了上下文切换带来的 Cache Miss！轮询收报核心代码如下：</p><div class="codebox"><pre class="vscroll"><code>/*PMD轮询接收数据包*/
uint16_t
eth_em_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
        uint16_t nb_pkts)
{
    /* volatile防止编译器优化,每次使用必须又一次从memory中取而不是用寄存器的值 */
    volatile struct e1000_rx_desc *rx_ring;
    volatile struct e1000_rx_desc *rxdp;//指向rx ring中某个e1000_rx_desc描述符
    struct em_rx_queue *rxq;//整个接收队列
    struct em_rx_entry *sw_ring;//指向描述符队列的头部，根据rx tail来偏移
    struct em_rx_entry *rxe;//指向sw ring中具体的entry
    struct rte_mbuf *rxm;//entry里的rte mbuf
    /*是new mbuf，新申请的mbuf，当rxm从ring中取出后，需要用nmb再挂上去，
      更新对应rx ring和sw ring中的值，为下一次收包做准备*/
    struct rte_mbuf *nmb;
    struct e1000_rx_desc rxd;//具体的非指针描述符
    uint64_t dma_addr;
    uint16_t pkt_len;
    uint16_t rx_id;
    uint16_t nb_rx;
    uint16_t nb_hold;
    uint8_t status;

    rxq = rx_queue;

    nb_rx = 0;
    nb_hold = 0;
    //初始化临时变量，要开始遍历队列了
    rx_id = rxq-&gt;rx_tail;
    rx_ring = rxq-&gt;rx_ring;
    sw_ring = rxq-&gt;sw_ring;
    /* 一次性收32个报文 */
    while (nb_rx &lt; nb_pkts) {
        /*
         * The order of operations here is important as the DD status
         * bit must not be read after any other descriptor fields.
         * rx_ring and rxdp are pointing to volatile data so the order
         * of accesses cannot be reordered by the compiler. If they were
         * not volatile, they could be reordered which could lead to
         * using invalid descriptor fields when read from rxd.
         */
        /* 当前报文的descriptor */
        rxdp = &amp;rx_ring[rx_id];
        status = rxdp-&gt;status; /* 结束标记,必须首先读取 */
        /*检查状态是否为dd, 不是则说明驱动还没有把报文放到接收队列，直接退出*/
        if (! (status &amp; E1000_RXD_STAT_DD))
            break;
        rxd = *rxdp; /* 复制一份 */

        /*
         * End of packet.
         *
         * If the E1000_RXD_STAT_EOP flag is not set, the RX packet is
         * likely to be invalid and to be dropped by the various
         * validation checks performed by the network stack.
         *
         * Allocate a new mbuf to replenish the RX ring descriptor.
         * If the allocation fails:
         *    - arrange for that RX descriptor to be the first one
         *      being parsed the next time the receive function is
         *      invoked [on the same queue].
         *
         *    - Stop parsing the RX ring and return immediately.
         *
         * This policy do not drop the packet received in the RX
         * descriptor for which the allocation of a new mbuf failed.
         * Thus, it allows that packet to be later retrieved if
         * mbuf have been freed in the mean time.
         * As a side effect, holding RX descriptors instead of
         * systematically giving them back to the NIC may lead to
         * RX ring exhaustion situations.
         * However, the NIC can gracefully prevent such situations
         * to happen by sending specific &quot;back-pressure&quot; flow control
         * frames to its peer(s).
         */
        PMD_RX_LOG(DEBUG, &quot;port_id=%u queue_id=%u rx_id=%u &quot;
               &quot;status=0x%x pkt_len=%u&quot;,
               (unsigned) rxq-&gt;port_id, (unsigned) rxq-&gt;queue_id,
               (unsigned) rx_id, (unsigned) status,
               (unsigned) rte_le_to_cpu_16(rxd.length));

        nmb = rte_mbuf_raw_alloc(rxq-&gt;mb_pool);
        if (nmb == NULL) {
            PMD_RX_LOG(DEBUG, &quot;RX mbuf alloc failed port_id=%u &quot;
                   &quot;queue_id=%u&quot;,
                   (unsigned) rxq-&gt;port_id,
                   (unsigned) rxq-&gt;queue_id);
            rte_eth_devices[rxq-&gt;port_id].data-&gt;rx_mbuf_alloc_failed++;
            break;
        }
        
        /* 表示当前descriptor被上层软件占用 */
        nb_hold++;
        /* 当前收到的mbuf */
        rxe = &amp;sw_ring[rx_id];
        /* 收包位置,假设超过环状数组则回滚 */
        rx_id++;
        if (rx_id == rxq-&gt;nb_rx_desc)
            rx_id = 0;

        /* mbuf加载cache下次循环使用 */
        /* Prefetch next mbuf while processing current one. */
        rte_em_prefetch(sw_ring[rx_id].mbuf);

        /*
         * When next RX descriptor is on a cache-line boundary,
         * prefetch the next 4 RX descriptors and the next 8 pointers
         * to mbufs.
         */
         /* 取下一个descriptor,以及mbuf指针下次循环使用 */
        /* 一个cache line是4个descriptor大小(64字节) */
        if ((rx_id &amp; 0x3) == 0) {
            rte_em_prefetch(&amp;rx_ring[rx_id]);
            rte_em_prefetch(&amp;sw_ring[rx_id]);
        }

        /* Rearm RXD: attach new mbuf and reset status to zero. */

        rxm = rxe-&gt;mbuf;
        rxe-&gt;mbuf = nmb;
        dma_addr =
            rte_cpu_to_le_64(rte_mbuf_data_iova_default(nmb));
        rxdp-&gt;buffer_addr = dma_addr;
        rxdp-&gt;status = 0;/* 重置当前descriptor的status */

        /*
         * Initialize the returned mbuf.
         * 1) setup generic mbuf fields:
         *    - number of segments,
         *    - next segment,
         *    - packet length,
         *    - RX port identifier.
         * 2) integrate hardware offload data, if any:
         *    - RSS flag &amp; hash,
         *    - IP checksum flag,
         *    - VLAN TCI, if any,
         *    - error flags.
         */
        pkt_len = (uint16_t) (rte_le_to_cpu_16(rxd.length) -
                rxq-&gt;crc_len);
        rxm-&gt;data_off = RTE_PKTMBUF_HEADROOM;
        rte_packet_prefetch((char *)rxm-&gt;buf_addr + rxm-&gt;data_off);
        rxm-&gt;nb_segs = 1;
        rxm-&gt;next = NULL;
        rxm-&gt;pkt_len = pkt_len;
        rxm-&gt;data_len = pkt_len;
        rxm-&gt;port = rxq-&gt;port_id;

        rxm-&gt;ol_flags = rx_desc_status_to_pkt_flags(status);
        rxm-&gt;ol_flags = rxm-&gt;ol_flags |
                rx_desc_error_to_pkt_flags(rxd.errors);

        /* Only valid if PKT_RX_VLAN set in pkt_flags */
        rxm-&gt;vlan_tci = rte_le_to_cpu_16(rxd.special);

        /*
         * Store the mbuf address into the next entry of the array
         * of returned packets.
         */
          /* 把收到的mbuf返回给用户 */
        rx_pkts[nb_rx++] = rxm;
    }
     /* 收包位置更新 */
    rxq-&gt;rx_tail = rx_id;

    /*
     * If the number of free RX descriptors is greater than the RX free
     * threshold of the queue, advance the Receive Descriptor Tail (RDT)
     * register.
     * Update the RDT with the value of the last processed RX descriptor
     * minus 1, to guarantee that the RDT register is never equal to the
     * RDH register, which creates a &quot;full&quot; ring situtation from the
     * hardware point of view...
     */
    nb_hold = (uint16_t) (nb_hold + rxq-&gt;nb_rx_hold);
    if (nb_hold &gt; rxq-&gt;rx_free_thresh) {
        PMD_RX_LOG(DEBUG, &quot;port_id=%u queue_id=%u rx_tail=%u &quot;
               &quot;nb_hold=%u nb_rx=%u&quot;,
               (unsigned) rxq-&gt;port_id, (unsigned) rxq-&gt;queue_id,
               (unsigned) rx_id, (unsigned) nb_hold,
               (unsigned) nb_rx);
        rx_id = (uint16_t) ((rx_id == 0) ?
            (rxq-&gt;nb_rx_desc - 1) : (rx_id - 1));
        E1000_PCI_REG_WRITE(rxq-&gt;rdt_reg_addr, rx_id);
        nb_hold = 0;
    }
    rxq-&gt;nb_rx_hold = nb_hold;
    return nb_rx;
}</code></pre></div><p>&#160; &#160;接收报文的整理流程梳理如下图所示：</p><p>&#160; &#160; DMA控制器控制报文一个个写到rx ring中接收描述符指定的IO虚拟内存中，对应的实际内存应该就是mbuf；<br />&#160; &#160; 接收函数用rx tail变量控制不停地读取rx ring中的描述符和sw ring中的mbuf，并申请新的mbuf放入sw ring中，更新rx ring中的buffer addr<br />&#160; &#160; 最后把读取的mbuf返回给应用程序。<br /><span class="postimg"><img src="https://img2022.cnblogs.com/blog/2052730/202203/2052730-20220323212325355-1264912920.png" alt="FluxBB bbcode 测试" /></span></p><p>&#160; &#160;4、线程亲和性</p><p>&#160; &#160;一个cpu上可以运行多个线程， 由linux内核来调度各个线程的执行。内核在调度线程时，会进行上下文切换，保存线程的堆栈等信息， 以便这个线程下次再被调度执行时，继续从指定的位置开始执行。然而上下文切换是需要耗费cpu资源的的。多核体系的CPU，物理核上的线程来回切换，会导致L1/L2 cache命中率的下降。同时NUMA架构下，如果操作系统调度线程的时候，跨越了NUMA节点，将会导致大量的L3 cache的丢失。Linux对线程的亲和性是有支持的, 如果将线程和cpu进行绑定的话，线程会一直在指定的cpu上运行，不会被操作系统调度到别的cpu上，线程之间互相独立工作而不会互相扰完，节省了操作系统来回调度的时间。目前DPDK通过把线程绑定到cpu的方法来避免跨核任务中的切换开销。</p><p>&#160; 线程绑定cpu物理核的函数如下：</p><div class="codebox"><pre><code>/* set affinity for current EAL thread */
static int
eal_thread_set_affinity(void)
{
    unsigned lcore_id = rte_lcore_id();

    /* acquire system unique id  */
    rte_gettid();

    /* update EAL thread core affinity */
    return rte_thread_set_affinity(&amp;lcore_config[lcore_id].cpuset);
}</code></pre></div><p>&#160; 继续往下走：</p><div class="codebox"><pre class="vscroll"><code>/*
    根据前面的rte_cpuset_t ,设置tid的绑定关系
    存储thread local socket_id
    存储thread local rte_cpuset_t
*/
int
rte_thread_set_affinity(rte_cpuset_t *cpusetp)
{
    int s;
    unsigned lcore_id;
    pthread_t tid;

    tid = pthread_self();//得到当前线程id
    //绑定cpu和线程
    s = pthread_setaffinity_np(tid, sizeof(rte_cpuset_t), cpusetp);
    if (s != 0) {
        RTE_LOG(ERR, EAL, &quot;pthread_setaffinity_np failed\n&quot;);
        return -1;
    }

    /* store socket_id in TLS for quick access */
    //socketid存放到线程本地空间，便于快速读取
    RTE_PER_LCORE(_socket_id) =
        eal_cpuset_socket_id(cpusetp);

    /* store cpuset in TLS for quick access */
    //cpu信息存放到cpu本地空间，便于快速读取
    memmove(&amp;RTE_PER_LCORE(_cpuset), cpusetp,
        sizeof(rte_cpuset_t));

    lcore_id = rte_lcore_id();//获取线程绑定的CPU
    if (lcore_id != (unsigned)LCORE_ID_ANY) {//如果不相等，就更新lcore配置
        /* EAL thread will update lcore_config */
        lcore_config[lcore_id].socket_id = RTE_PER_LCORE(_socket_id);
        memmove(&amp;lcore_config[lcore_id].cpuset, cpusetp,
            sizeof(rte_cpuset_t));
    }

    return 0;
}</code></pre></div><p>&#160; 继续往下走：</p><div class="codebox"><pre class="vscroll"><code>int
pthread_setaffinity_np(pthread_t thread, size_t cpusetsize,
               const rte_cpuset_t *cpuset)
{
    if (override) {
        /* we only allow affinity with a single CPU */
        if (CPU_COUNT(cpuset) != 1)
            return POSIX_ERRNO(EINVAL);

        /* we only allow the current thread to sets its own affinity */
        struct lthread *lt = (struct lthread *)thread;

        if (lthread_current() != lt)
            return POSIX_ERRNO(EINVAL);

        /* determine the CPU being requested */
        int i;

        for (i = 0; i &lt; LTHREAD_MAX_LCORES; i++) {
            if (!CPU_ISSET(i, cpuset))
                continue;
            break;
        }
        /* check requested core is allowed */
        if (i == LTHREAD_MAX_LCORES)
            return POSIX_ERRNO(EINVAL);

        /* finally we can set affinity to the requested lcore 
        前面做了大量的检查和容错，这里终于开始绑定cpu了
        */
        lthread_set_affinity(i);
        return 0;
    }
    return _sys_pthread_funcs.f_pthread_setaffinity_np(thread, cpusetsize,
                               cpuset);
}</code></pre></div><p>&#160; 绑定cpu的方法也简单：本质就是个上下文切换</p><div class="codebox"><pre class="vscroll"><code>/*
 * migrate the current thread to another scheduler running
 * on the specified lcore.
 */
int lthread_set_affinity(unsigned lcoreid)
{
    struct lthread *lt = THIS_LTHREAD;
    struct lthread_sched *dest_sched;

    if (unlikely(lcoreid &gt;= LTHREAD_MAX_LCORES))
        return POSIX_ERRNO(EINVAL);

    DIAG_EVENT(lt, LT_DIAG_LTHREAD_AFFINITY, lcoreid, 0);

    dest_sched = schedcore[lcoreid];

    if (unlikely(dest_sched == NULL))
        return POSIX_ERRNO(EINVAL);

    if (likely(dest_sched != THIS_SCHED)) {
        lt-&gt;sched = dest_sched;
        lt-&gt;pending_wr_queue = dest_sched-&gt;pready;
        //真正切换线程到指定cpu运行的代码
        _affinitize();
        return 0;
    }
    return 0;
}
tatic __rte_always_inline void
_affinitize(void);
static inline void
_affinitize(void)
{
    struct lthread *lt = THIS_LTHREAD;

    DIAG_EVENT(lt, LT_DIAG_LTHREAD_SUSPENDED, 0, 0);
    ctx_switch(&amp;(THIS_SCHED)-&gt;ctx, &amp;lt-&gt;ctx);
}
void
ctx_switch(struct ctx *new_ctx __rte_unused, struct ctx *curr_ctx __rte_unused)
{
    /* SAVE CURRENT CONTEXT */
    asm volatile (
        /* Save SP */
        &quot;mov x3, sp\n&quot;
        &quot;str x3, [x1, #0]\n&quot;

        /* Save FP and LR */
        &quot;stp x29, x30, [x1, #8]\n&quot;

        /* Save Callee Saved Regs x19 - x28 */
        &quot;stp x19, x20, [x1, #24]\n&quot;
        &quot;stp x21, x22, [x1, #40]\n&quot;
        &quot;stp x23, x24, [x1, #56]\n&quot;
        &quot;stp x25, x26, [x1, #72]\n&quot;
        &quot;stp x27, x28, [x1, #88]\n&quot;

        /*
         * Save bottom 64-bits of Callee Saved
         * SIMD Regs v8 - v15
         */
        &quot;stp d8, d9, [x1, #104]\n&quot;
        &quot;stp d10, d11, [x1, #120]\n&quot;
        &quot;stp d12, d13, [x1, #136]\n&quot;
        &quot;stp d14, d15, [x1, #152]\n&quot;
    );

    /* RESTORE NEW CONTEXT */
    asm volatile (
        /* Restore SP */
        &quot;ldr x3, [x0, #0]\n&quot;
        &quot;mov sp, x3\n&quot;

        /* Restore FP and LR */
        &quot;ldp x29, x30, [x0, #8]\n&quot;

        /* Restore Callee Saved Regs x19 - x28 */
        &quot;ldp x19, x20, [x0, #24]\n&quot;
        &quot;ldp x21, x22, [x0, #40]\n&quot;
        &quot;ldp x23, x24, [x0, #56]\n&quot;
        &quot;ldp x25, x26, [x0, #72]\n&quot;
        &quot;ldp x27, x28, [x0, #88]\n&quot;

        /*
         * Restore bottom 64-bits of Callee Saved
         * SIMD Regs v8 - v15
         */
        &quot;ldp d8, d9, [x0, #104]\n&quot;
        &quot;ldp d10, d11, [x0, #120]\n&quot;
        &quot;ldp d12, d13, [x0, #136]\n&quot;
        &quot;ldp d14, d15, [x0, #152]\n&quot;
    );
}</code></pre></div><p> </p><p> </p><p> </p><p>参考：</p><p>1、https://blog.csdn.net/ApeLife/article/details/100751359&#160; uio驱动实现</p><p>2、https://blog.csdn.net/ApeLife/article/details/100006695&#160; &#160;内存池的实现</p><p>3、https://blog.51cto.com/u_15076236/4624576&#160; PMD优化</p><p>4、https://blog.csdn.net/jeawayfox/article/details/105189788&#160; dpdk接收报文</p><p>5、https://blog.csdn.net/u012630961/article/details/80918682 dpdk线程亲和性</p><p>6、https://zhuanlan.zhihu.com/p/366155783&#160; dpdk多线程模型</p>]]></description>
			<author><![CDATA[dummy@example.com (batsom)]]></author>
			<pubDate>Wed, 12 Oct 2022 02:56:07 +0000</pubDate>
			<guid>http://www.gentoo-zh.org/viewtopic.php?pid=487#p487</guid>
		</item>
	</channel>
</rss>
