基础消息队列

作者: wiseAaron | 来源:发表于2016-10-16 16:27 被阅读393次

在UNIX系统所提供的经典进程间通信机制(IPC):管道FIFO消息队列信号量以及共享储存。这些机制允许在同一台计算机上运行的进程可以相互通信。但是当考察到不同计算机(通过网络相连)的进程相互通信时就必须借助网络通信机制(network IPC),在分布式计算环境中,为了集成分布式应用,开发者需要对异构网络环境下的分布式应用提供有效的通信手段。为了管理需要共享的信息,对应用提供公共的信息交换机制是重要的。
设计分布式应用的方法主要有:

  1. 远程过程调用(PRC)--分布式计算环境(DCE)的基础标准成分之一;
  2. 对象事务监控(OTM)--基于CORBA的面向对象工业标准与事务处理(TP)监控技术的组合;
  3. 消息队列(MessageQueue)--构造分布式应用的松耦合方法;
  • 消息队列的API调用被嵌入到新的或现存的应用中,通过消息发送到内存或基于磁盘的队列或从它读出而提供信息交换。消息队列可用在应用中以执行多种功能,比如要求服务、交换信息或异步处理等。

项目目标:简单实现精简版消息中间件

项目任务:

  1. 作为msgsnd系列的扩展服务
  2. 使msg系列支持跨ip通信能力,而其他使用msg系列接口的应用无感知

项目分析:

  1. 某client向相应key值的消息队列发送消息
  2. 服务器在定时器的作用下每1000微秒,就读取自身管理的消息队列数据
  3. 读取数据根据消息的type值和服务器配置文件,来决定将消息发送到哪一个服务器上
  4. 当服务器收到远程发送来消息的时候,读取该消息的type值,判断该type是否需要继续转发,如果是本服务器接受的数据是就保留数据,否则就转发。

技术要点:

  • 进程间通信 - 消息队列
  • TCP socket
  • epoll的IO复用
  • 自定义通信协议

一、进程间通信 - 消息队列

消息队列的消息的链接表储存在内核中,有消息队列标示符标示。

1、有关消息队列的相关函数

  • int msgget(key_t key, int flag);
    msgget用于创建一个新队列打开一个现有队列
  • int msgsnd(int msqid, const void * ptr, size_t nbytes, int flag);
    msgsnd将新消息添加到队列尾端。每一个消息包含一个正的长整形类型的字段、一个非负的长度以及实际数据字节数,所有这些都在将消息添加到队列时,传送给msgsnd。
  • int msgctl(int msqid, int cmd, struct msqid_ds * buf);
    对队列执行cmd操作,例如:IPC_STAT(读取消息)、IPC_SET(设置消息)、IPC_RMID(删除消息)
  • ssize_t msgrcv(int msqid, void * ptr, size_t nbytes, long type,int flag);
    msgrcv用于从队列中取消息,我们不一定要已先进先出次序取消息,也可以按照消息的类型字段取消息

注意点
: 这里唯一值得提醒的是函数msgget中的key值,每个内核中的IPC(进程间通信)结构都用一个非负整数的标示符加以引用,只需知道其队列标示符。与文件描述符不同,IPC标示符不是小的整数。当IPC结构被创建,然后又被删除时,与这种结构相关的标示符连续加1,知道达到一个整型数的最大值,然后又转到0。每一个IPC对象都与一个键(key)现关联。

当我们启动这个服务后,肯定是通过读取配置文件来确定读取哪一个key值的消息队列,本项目使用的是libxml来解析的xml配置文件保存在一个存有配置文件结构体的数组里。通过配置文件我们可以获知消息的转发路径(和路由表相似)配置文件结构如下:

<?xml version="1.0" encoding="UTF-8"?>
<msgTypeToipInfo>
  <listenPort>8787</listenPort>         <!-- 本服务监听的端口号 -->
  <ip>10.81.12.240</ip>                 <!-- 本服务监听的IP地址 -->
  <alarmSeconds>1000</alarmSeconds>     <!-- 取队列的刷新时间 微秒-->
  <mesgqKey>10000</mesgqKey>            <!-- 本服务维护的消息队列的key值 -->
  <msg type="101">                      <!-- 消息key值为101时,发送ip为10.81.12.240 端口号为8787的服务器-->
    <ipAddr>10.81.12.240</ipAddr>
    <portNum>8787</portNum>
  </msg>
  <msg type="102">
    <ipAddr>10.81.12.240</ipAddr>
    <portNum>8787</portNum>
  </msg>
  ......
</msgTypeToipInfo>

在该配置文件中只监听了一台服务器,如有需监听多台服务器添加即可。。。

2、定时器刷新

函数alarm设置的定时器只能精确到秒,而以下函数理论上可以精确到微妙:

#include  <sys/select.h>
#include  <sys/itimer.h>
int getitimer(int which, struct itimerval *value);
int setitimer(int which, const struct itimerval *value, struct itimerval *ovalue);

函数setitimer可以提供三种定时器,它们相互独立,任意一个定时完成都将发送定时信号到进程,并且自动重新计时。参数which确定了定时器的类型,如表所示:

取值 含义 信号发送
ITIMER_REAL 定时真实时间,与alarm类型相同。 SIGALRM
ITIMER_VIRT 定时进程在用户态下的实际执行时间。 SIGVTALRM
ITIMER_PROF 定时进程在用户态和核心态下的实际执行时间 SIGPROF

这三种定时器定时完成时给进程发送的信号各不相同.

  • ITIMER_REAL类定时器发送SIGALRM信号,
  • ITIMER_VIRT类定时器发送SIGVTALRM信号,
  • ITIMER_REAL类定时器发送SIGPROF信号。

函数alarm本质上设置的是低精确、非重载的ITIMER_REAL类定时器,它只能精确到秒,并且每次设置只能产生一次定时。函数setitimer设置的定时器则不同,它们不但可以计时到微妙(理论上),还能自动循环定时。在一个Unix进程中,不能同时使用alarm和ITIMER_REAL类定时器。

//结构itimerval描述了定时器的组成:
struct itimerval
{
    struct tim.  it_interval;     /* 下次定时取值 */
    struct tim.  it_value;        /* 本次定时设置值 */
}
//结构tim.描述了一个精确到微妙的时间:
struct tim.
{
    long    tv_sec;                 /* 秒(1000000微秒) */
    long    tv_usec;                 /* 微妙 */
}

设置定时器代码,如下:

void NoMesgQue::setRefreshTime(double seconds)
{
    this->refreshTime = seconds;
    //1微秒=10的-6次方秒=0.000001秒
    struct itimerval value;
    value.it_value.tv_sec = 0;
    value.it_value.tv_usec = seconds;
    value.it_interval.tv_sec = 0; //val秒
    value.it_interval.tv_usec = seconds;
    signal(SIGALRM, timeReady);
    setitimer(ITIMER_REAL,&value,NULL);
}

设置void timeReady(int signo)为响应函数,读取队列中的数据

void timeReady(int signo)
{
    //在mesgQueue中取数据
    s_msg * rebuf = new s_msg();
    NoMesgQue * pp = NoMesgQue::getInstance();
    int length = sizeof(s_msg) - sizeof(long);
    while( msgrcv(pp->getMsgqid(), rebuf, length, 0, IPC_NOWAIT) > 0) 
    {
        cout << "Message : "<<rebuf->mtext  << " FromType:"<<rebuf->FromType << " toType:"<<rebuf->type << endl;

        int sockfd;           //sockfd socket对应的描述符
        if((pp->findConfigFileToRemote(rebuf->type,&sockfd) < 0 ) || sockfd < 0)
        {
            cout<<"Don't find Remote Info from Config or Remote is not connecet"<<endl;
        }
        else
        {
            //包装package信息
            s_msgPackage * package = new s_msgPackage();
            package->data = *rebuf;
            (package->packageHead).mesgLength = sizeof(*rebuf);

            //将包装好的package信息 放到epoll中write
            s_msgFdOfData * fdData = NoMesgQue::getInstance()->getMsgFdOfData();

            s_msgFdOfData * thisPP = NULL;
            for(int ii = 0; ii < number; ii++)
            {
                if (sockfd == fdData[ii].sockfd)
                {
                    thisPP = &fdData[ii];
                    break;
                }
            }

            thisPP->sockfd = sockfd;
            memcpy(thisPP->data,package,sizeof(*package));
            thisPP->size = sizeof(*package);
            cout << "数量 " << number <<"内容fd "<< thisPP->sockfd << endl;
        }

        printf("socketfd %d\n", sockfd);
    }
     signal(SIGALRM, timeReady);
}

在while中不断取队列中的数据,当msgrcv(int msqid, void * ptr, size_t nbytes, long type, int flag); 成功执行时,内核会更新与该消息队列相关联的msgid_ds结构,以指示调用者的进程ID(msg_lrpid)和调用时间(msg_rtime),并指示队列中的消息数减少1个。当取到数据后通过读取配置文件与消息type来获取与之相连的sockfd,通过epoll来实现IO复用进而发送数据。当队列中没有数据时,再一次注册信号SIGALRM,就OK了。

相关文章

  • 基础消息队列

    在UNIX系统所提供的经典进程间通信机制(IPC):管道、FIFO、消息队列、信号量以及共享储存。这些机制允许在同...

  • 消息队列基础

  • 消息队列 MQ 1 - 3分钟让你快速了解消息队列

    1 什么是消息队列 队列是一个基础的“先进先出”的数据结构,消息队列,就是一个用来存放消息的队列。一个基本的消息队...

  • 必知必会——分布式消息队列

    分布式消息队列需要先掌握消息队列的基础,比如消息模式、消息协议等,其次找一个常用的分布式消息队列作为学习的对象, ...

  • Kafka

    消息队列是分布式架构的基础组件,掌握这类组件是必要的,kafka是其中的典型代表。 消息队列 消息队列的电信原型就...

  • 消息队列-kafka基础

    outline kafka是什么 基本概念和整体架构 producer端 消息的存储机制 consumer端 Ka...

  • 【基础】消息队列原理

  • springboot整合mq接收消息队列

    继上篇springboot整合mq发送消息队列本篇主要在上篇基础上进行activiemq消息队列的接收spring...

  • RabbitMQ和Kafka思维图总结

    消息队列【RabbitMQ和Kafka】基础概念用思维图整理:

  • RabbitMQ总结

    RabbitMQ总结 基础知识 为什么要用消息队列 队列是一种先进先出的数据结构。 消息队列是分布式系统中重要的组...

网友评论

    本文标题:基础消息队列

    本文链接:https://www.haomeiwen.com/subject/zozoyttx.html