<?xml version="1.0" encoding="utf-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom">
	<channel>
		<atom:link href="http://gentoo-zh.org/extern.php?action=feed&amp;tid=828&amp;type=rss" rel="self" type="application/rss+xml" />
		<title><![CDATA[Gentoo-zh / Gentoo 之 Posix Message Queues]]></title>
		<link>http://www.gentoo-zh.org/viewtopic.php?id=828</link>
		<description><![CDATA[Gentoo 之 Posix Message Queues 最近发表的帖子。]]></description>
		<lastBuildDate>Tue, 12 Mar 2024 18:46:38 +0000</lastBuildDate>
		<generator>FluxBB</generator>
		<item>
			<title><![CDATA[Gentoo 之 Posix Message Queues]]></title>
			<link>http://www.gentoo-zh.org/viewtopic.php?pid=944#p944</link>
			<description><![CDATA[<p>特点</p><p>&#160; &#160; 1.向 Posix 消息队列中写入消息时并不需要有读者进程存在，与 pipes 与 FIFOs 的行为不同<br />&#160; &#160; 2.消息队列具有内核持久性，一个消息只有在被读取后才会释放，并不会因为写入消息的进程死亡而释放<br />&#160; &#160; 3.在一个 Posix 消息队列上读取永远返回优先级最高的消息类型中最老的消息，而 System V 消息队列则能够返回任何目标优先级的消息<br />&#160; &#160; 4.Posix 消息队列允许当有一个消息被放进了空的队列时产生信号、初始化一个线程来通知消费者，System V 消息队列没有此功能</p><p>消息队列的释放</p><p>Posix 消息队列内部维护了一个引用计数，当引用计数大于 0 的时候目标消息队列能够从系统中移除，但是队列的释放仅在最后一次 mq_close 发生时才会触发。<br />mq_notify 函数</p><p>mq_notify 函数为 Posix 消息队列提供了一种异步通知机制，当消息被放到队列中时通知消费者进程， System V 消息队列就不具备这样的能力。</p><p>在调用 msgrcv 函数从 System V 消息队列中接收消息时进程可以挂起等待消息，但是在挂起期间不能执行任何其他任务。如果指定 NONBLOCK 标志调用 msgrcv 函数，进程不再阻塞但是却要持续调用此函数以确定队列中是否有数据到来，会浪费 cpu 时间。</p><p>Posix 消息队列支持通过如下两种方式来异步通知一个空的队列中有新的消息到来：</p><p>&#160; &#160; 发送一个信号<br />&#160; &#160; 创建一个线程来执行指定的函数</p><p>这两种机制通过指定不同的参数调用 mq_notify 函数来选择，mq_notify 函数的原型如下：</p><p>int mq_notify(mqd_t mqdes, const struct sigevent *sevp);</p><p>&#160; &#160; 1</p><p>mq_notify 使用规则如下：</p><p>&#160; &#160; 如果 sevp 参数非空，那么当前进程希望在空队列中有新的消息到达时被通知。我们说“该进程被注册为接收该队列的通知”。<br />&#160; &#160; 如果 sevp 参数为空指针且当前进程已经使用 mq_notify 注册接收队列通知，己存在的注册将被移除。<br />&#160; &#160; 同一时刻只支持单个进程调用 mq_notify 注册为接收某个特定队列的通知时间。<br />&#160; &#160; 当有一个消息到达某个先前为空的队列，而且己有一个进程被注册为接收该队列的通知时，只有在没有任何线程阻塞在该队列的 mq_receive 调用中的前提下，通知才会发出。这就是说，在mq_reveive 调用中的阻塞比任何注册的通知具有更高优先级。<br />&#160; &#160; 当该通知被发送给它的注册进程时，其注册事件被移除。该进程必须再次调用 mq_notify 以重新注册。</p><p>UNPV2 中提供了这两种不同方案的示例代码，我分别描述下关键的流程。<br />mq_notify 使用信号通知消息到达<br />直接在信号处理函数中调用 mq_notify 与 mq_receive 函数来接收数据</p><div class="codebox"><pre><code>int main(int argc, char *argv[])
{
....................................
				Signal(SIGUSR1, sig_usr1);
        sigev.sigev_notify = SIGEV_SIGNAL;
        sigev.sigev_signo = SIGUSR1;
        Mq_notify(mqd, &amp;sigev);
....................................
}

static void
sig_usr1(int signo)
{
        ssize_t n;

        Mq_notify(mqd, &amp;sigev);                 /* reregister first */
        n = Mq_receive(mqd, buff, attr.mq_msgsize, NULL);
        printf(&quot;SIGUSR1 received, read %ld bytes\n&quot;, (long) n);
        return;
}
 </code></pre></div><p>上述代码实现了 SIGUSR1 的信号处理函数并配置 mq_notify 使用信号通知机制，通知信号为 SIGUSR。</p><p>main 函数中注册了 SIGUSR1 信号的处理函数 sig_usr1，此函数的逻辑如下：</p><p>&#160; &#160; 调用 Mq_notify 重新注册通知事件<br />&#160; &#160; 调用 Mq_receive 接收消息然后打印接收到的字节数</p><p>此实现存在的问题为不应该在信号处理函数中调用 mq_notify、mq_receive、printf 函数，这些函数并不是异步信号安全的函数。<br />在信号处理函数中设置标志在程序主逻辑中调用 mq_notify 与 mq_receive 函数来接收数据</p><p>核心代码如下：</p><div class="codebox"><pre><code> for ( ; ; ) {
                Sigprocmask(SIG_BLOCK, &amp;newmask, &amp;oldmask);     /* block SIGUSR1 */
                while (mqflag == 0)
                        sigsuspend(&amp;zeromask);
                mqflag = 0;             /* reset flag */

                Mq_notify(mqd, &amp;sigev);                 /* reregister first */
                n = Mq_receive(mqd, buff, attr.mq_msgsize, NULL);
                printf(&quot;read %ld bytes\n&quot;, (long) n);
                Sigprocmask(SIG_UNBLOCK, &amp;newmask, NULL);       /* unblock SIGUSR1 */
        }

static void
sig_usr1(int signo)
{
        mqflag = 1;
        return;
}</code></pre></div><p>SIGUSR1 信号处理程序中仅仅设置一个全局变量 mqflag 的值，在程序主逻辑中调用 mq_notify 与 mq_receive 来接收消息。</p><p>上述代码首先修改当前线程的信号掩码，临时关闭 SIGUSR1，然后执行 sigsuspend 等待 SIGUSR1 信好到来。</p><p>sigsuspend 函数会使用 zeromask 表示的 signal mask 修改当前线程的 signal mask，然后挂起当前线程，直到有一个会执行 signal handler、终止进程的目标信号产生。当收到信号并终止进程时，sigsuspend 将不会返回。如果成功捕获到信号，sigsuspend 将会在信号处理函数执行后返回，signal 将会被恢复为调用 sigsuspend 函数之前的状态。</p><p>在 sigsuspend 返回后，程序重置 mqflag 标志并调用 Mq_notify 与 Mq_receive 接收消息并打印接收的字节数，最后调用 Sigprocmask unblock SIGUSR1 信号。<br />此实现存在如下问题：</p><p>由于通知消息仅在有一条新的消息被放到空的队列时产生，如果在我们能够读取第一个消息前有两个消息达到，那么只有一个通知事件产生，于是我们读取第一个消息并调用 sigsuspend 等待另一个消息，而后续可能没有新的消息产生，这样我们就会漏掉第二个消息。</p><p>为了解决这个问题，我们可以在 Mq_receive 的时候多次读取队列，这样就不会漏掉消息。示例代码如下：</p><div class="codebox"><pre><code> for ( ; ; ) {
                Sigprocmask(SIG_BLOCK, &amp;newmask, &amp;oldmask);     /* block SIGUSR1 */
                while (mqflag == 0)
                        sigsuspend(&amp;zeromask);
                mqflag = 0;             /* reset flag */

                Mq_notify(mqd, &amp;sigev);                 /* reregister first */
                while ( (n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) &gt;= 0) {
                        printf(&quot;read %ld bytes\n&quot;, (long) n);
                }
                if (errno != EAGAIN)
                        err_sys(&quot;mq_receive error&quot;);
                Sigprocmask(SIG_UNBLOCK, &amp;newmask, NULL);       /* unblock SIGUSR1 */
        }</code></pre></div><p>在信号处理函数中设置标志在程序主逻辑中调用 sigwait 等待信号然后调用 Mq_notify 与 Mq_receive 接收数据</p><p>上文描述了在信号处理函数中设置标志的方式，一个更简单的方式是在一个函数中阻塞等待内核发送目标信号，可以通过 sigwait 函数来实现。</p><p>新的代码如下：</p><div class="codebox"><pre><code> for ( ; ; ) {
                Sigwait(&amp;newmask, &amp;signo);
                if (signo == SIGUSR1) {
                        Mq_notify(mqd, &amp;sigev);                 /* reregister first */
                        while ( (n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) &gt;= 0) {
                                printf(&quot;read %ld bytes\n&quot;, (long) n);
                        }
                        if (errno != EAGAIN)
                                err_sys(&quot;mq_receive error&quot;);
                }
        }</code></pre></div><p>sigwait 函数将会挂起当前线程直到 signal set 中指定的信号到来，此函数会接收这个信号（将信号从信号 pending list 中移除），然后通过第二个参数返回信号值。</p><p>上面的代码进一步简化，只调用 Sigwait，然后调用 Mq_notify、mq_receive，比使用 sigsuspend 更简单。<br />使用 select 监听 Posix 消息队列</p><p>Posix 消息队列描述符并不是一个普通的描述符不能使用 select、epoll 函数监控此描述符。可以使用 mq_notify + pipe 的方式，mq_notify 注册监控消息队列事件，通知方式为信号，在程序初始化时创建一个 pipe，在信号处理函数中调用 write 向这个管道的 fd 中写入数据，在主程序循环中 select pipe 的 fd 来间接的监听 Posix 消息队列。 write 系统调用是异步信号安全的函数，在信号处理函数中调用不会产生问题。</p><p>示例代码如下：</p><div class="codebox"><pre class="vscroll"><code>				Pipe(pipefd);

                /* 4establish signal handler, enable notification */
        Signal(SIGUSR1, sig_usr1);
        sigev.sigev_notify = SIGEV_SIGNAL;
        sigev.sigev_signo = SIGUSR1;
        Mq_notify(mqd, &amp;sigev);

        FD_ZERO(&amp;rset);
        for ( ; ; ) {
                FD_SET(pipefd[0], &amp;rset);
                nfds = Select(pipefd[0] + 1, &amp;rset, NULL, NULL, NULL);

                if (FD_ISSET(pipefd[0], &amp;rset)) {
                        Read(pipefd[0], &amp;c, 1);
                        Mq_notify(mqd, &amp;sigev);                 /* reregister first */
                        while ( (n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) &gt;= 0) {
                                printf(&quot;read %ld bytes\n&quot;, (long) n);
                        }
                        if (errno != EAGAIN)
                                err_sys(&quot;mq_receive error&quot;);
                }
        }

static void
sig_usr1(int signo)
{
        Write(pipefd[1], &quot;&quot;, 1);        /* one byte of 0 */
        return;
}
 </code></pre></div><p>向 pipe 中写入的数据内容并不重要，重要的是写入这个动作触发 select 系统调用捕获事件，间接的绑定到 Posix 消息队列的通知事件。<br />创建一个线程的执行函数的方式</p><p>示例代码如下：</p><div class="codebox"><pre class="vscroll"><code>int
main(int argc, char **argv)
{
        if (argc != 2)
                err_quit(&quot;usage: mqnotifythread1 &lt;name&gt;&quot;);

        mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK);
        Mq_getattr(mqd, &amp;attr);

        sigev.sigev_notify = SIGEV_THREAD;
        sigev.sigev_value.sival_ptr = NULL;
        sigev.sigev_notify_function = notify_thread;
        sigev.sigev_notify_attributes = NULL;
        Mq_notify(mqd, &amp;sigev);

        for ( ; ; )
                pause();                /* each new thread does everything */

        exit(0);
}

static void
notify_thread(union sigval arg)
{
        ssize_t n;
        void    *buff;

        printf(&quot;notify_thread started\n&quot;);
        buff = Malloc(attr.mq_msgsize);
        Mq_notify(mqd, &amp;sigev);                 /* reregister */

        while ( (n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) &gt;= 0) {
                printf(&quot;read %ld bytes\n&quot;, (long) n);
        }
        if (errno != EAGAIN)
                err_sys(&quot;mq_receive error&quot;);

         free(buff);
         pthread_exit(NULL);
}
 </code></pre></div><p>sigev 中的 sigev_notify 设置为 SIGEV_THREAD 表示通过创建一个线程执行函数方式监听处理消息队列事件，sigev_notify_function 中设置了需要执行的函数指针为 notify_thread，此函数的主要逻辑如下：</p><p>&#160; &#160; 申请一块 buff 用以接收消息<br />&#160; &#160; 重新执行 Mq_notify 函数重新监听事件<br />&#160; &#160; 调用 mq_receive 函数从队列中接收消息</p><p>在这种实现中，主线程可以做其它的任务，在示例程序中主线程啥也不干。这种创建线程执行函数的机制表面上看上去挺简单，可 mq_notify 注册的 notify_thread 是一个用户态虚拟内存空间的代码地址，它不能在内核态执行，意味着线程的创建与回调的执行都在用户态完成，那内核又是如何将事件投递到新创建的线程，让此线程执行回调来处理消息呢？</p><p>在进一步探讨前，先在我的本地 linux 环境上运行下示例程序，运行 log 如下：</p><div class="codebox"><pre><code>[longyu@debian] pxmsg $ ./mqcreate /test1
[longyu@debian] pxmsg $ ./mqnotifythread1 /test1
notify_thread started
read 50 bytes
notify_thread started
read 50 bytes
notify_thread started
read 1024 bytes
 </code></pre></div><p>mqnotifythread demo 能够正常接收消息。</p>]]></description>
			<author><![CDATA[dummy@example.com (batsom)]]></author>
			<pubDate>Tue, 12 Mar 2024 18:46:38 +0000</pubDate>
			<guid>http://www.gentoo-zh.org/viewtopic.php?pid=944#p944</guid>
		</item>
	</channel>
</rss>
