WebSphere MQ Low Latency Messaging 产品介绍及 API 使用

王 贝贝 (wangbeib@cn.ibm.com), 软件工程师, IBM
 
简介: IBM WebSphere MQ Low Latency Messaging 提供了一种高吞吐量、低延迟的消息传递方法。本文介绍该产品的用途和主要特点,并详细阐述 Reliable Multicast Messaging 和 Reliable Unicast Messaging 两种消息传输方式和 API 使用,以及典型的测试场景和测试方法。
 

引言

IBM WebSphere MQ Low Latency Messaging(以下简称 MQ LLM)提供了一种高吞吐量、低延迟的消息传递方法。在金融市场和其它一些需要高速数据交换的应用场景中,消息传递需要满足极高容量、低延迟的需求,MQ LLM 能够有效满足这种需求。本文首先介绍 MQ LLM 的用途和主要特点,然后详细阐述 Reliable Multicast Messaging(RMM)和 Reliable Unicast Messaging(RUM)两种消息传输方式及其 API 的使用方法,并描述了测试场景。

 

MQ LLM 产品介绍

IBM WebSphere MQ Low Latency Messaging,最初来源于 IBM Haifa 实验室,而后作为 WebSphere Front Office for Financial Markets(以下简称 WFO)市场数据交付平台的高性能信息组件处理金融市场数据,2.0 版本的发布使其独立出来作为一个单独的产品。MQ LLM 提供了一种高吞吐量、低延迟的消息传递方法,在金融领域,结合 WFO,它能够满足高速运转的金融市场的爆炸性数据分析的应用需求,帮助金融企业改善其管理复杂金融市场数据的能力;在市场运输行业,也存在着大量的实时市场信息分析及物流信息管理的应用需求,它能够大大提高市场运输行业的数据分发性能和能力。海量数据的低延迟传输是数据交付速度至关重要行业的典型需求,MQ LLM 能够显著降低传输延时,提供可靠的数据交付和流故障转移,消息过滤以及控制流量速率拥塞等多种选择,使大量数据能够以满足急迫业务要求的速度传输。目前,该产品已成功应用于金融市场领域解决方案。

MQ LLM 具有如下核心特性:

  1. 高吞吐量、低延迟:采用分组、包丢失处理等策略提高性能,使得消息(12-byte)发送速率能够达到每秒千万级,在 1G 以太网上的延迟实现微妙级。
  2. 多播和单播的支持:包括可靠的 UDP 多播(RMM),可靠的 UDP 单播(点对点的 RMM)以及可靠的 TCP 单播(RUM)。
  3. 可靠性:与 2 中的多播和单播支持相对应,UDP 多播的可靠性由 NAK 可靠性、滑动窗口、拥塞管理来保证,UDP 单播的可靠性由 ACK 可靠性、周期心跳检测来实现,TCP 单播的可靠性依赖于 TCP 的可靠性。
  4. 消息过滤机制:通过消息属性进行过滤。
  5. 拥塞管理:自动发现并采取相应策略处理慢速消费者(处理消息速度比较慢的接收方)。
  6. 系统支持:能够运行在 Windows(32,64),Linux(32,64),Solaris(Sparc,x86,32,64)平台上;提供 C,Java 支持和 .NET API,监控 API 可以获得网络状态、发送方、接收方、延迟统计等信息。
 

RMM 和 RUM 概览

首先,回顾一下传输层协议和网络节点间的通讯模式。

传输层有两种传输协议:UDP 和 TCP。

  • UDP(User Data Protocol,用户数据报协议):是一种无连接的不可靠传输层协议。在正式通信前,不与对方建立连接,直接发送数据,适用于传输效率要求高、对可靠性要求较低的应用环境。
  • TCP(Transmission Control Protocol,传输控制协议):是一种面向连接的可靠传输层协议。在正式通信之前,必须要与对方建立可靠连接,由于采用滑动窗口、慢启动等机制,效率低于 UDP,适用于对可靠性要求高的应用环境。

网络节点之间有三种通讯模式:单播、广播、多播。

  • 单播(Unicast):点对点通讯模式,数据的接收和传递只在两个节点之间进行。
  • 多播(Multicast):点对多通讯模式,也称之为“组播”,一次传送到一组特定目标节点。
  • 广播(Broadcast):IP 子网内的所有主机都可以接收到所有信息。

在 MQ LLM 中,有两种消息传输方式:RMM 和 RUM。

  • RMM(Reliable Multicast Messaging):基于 UDP 协议,支持可靠的单播和多播传输。
  • RUM(Reliable Unicast Messaging):基于 TCP 协议,提供一对一的单播传输。
  • RMM 和 RUM 实现机制不同,但都需要定义发送方、接收方,通讯的逻辑通道、物理通道、消息等,表 1 描述了 RMM 和 RUM 中的对象。

表 1. RMM 和 RUM 中的对象描述
RMM RUM
实例(Instance) 发送方或接收方对象本身
主题(Topic) 发送方和接收方之间通讯的逻辑通道。发送方、接收方需要分别创建发送主题、接收主题。
连接(Connection) RUM 基于 TCP 协议,发送方和接收方在发送数据前需要建立连接。
队列(Queue) 发送方和接收方之间通讯的逻辑通道。在创建发送队列时需要一个已经存在的连接,创建接收队列时不需要。
流(Stream) 一个发送主题对应一个流,多个流可以对应一个接收主题。 多个流可以对应一个发送方或接收方。
消息(Message) 发送方和接收方之间传输的数据。包括原始数据、属性等。

 

MQ LLM 的 API 使用

MQ LLM API 提供了对 C,Java,.NET 开发语言的支持,本文主要介绍 MQ LLM Version 2.1.0.0 的 Java API 使用。用 Java 编程,要使用 llmJni.jar 类库,可以在 LLM 安装后的 lib 目录下获得,内容如图 1 所示。com.ibm.llm.rmm/rum 下分别有如下六个包:common(常用类),config(配置类),constants(数据结构类),exception(异常处理类),impl(实现类),monitoring(监控统计类)。


图 1. Java 编程类库 llmJni.jar
图 1. Java 编程类库 llmJni.jar

该类库提供了 RMM 和 RUM 两种消息传输方式的 API,能够处理消息的发送和接收,并获得统计数据。下面详细描述在 RMM 和 RUM 中如何对发送方和接收方编程。

RMM API 的使用

RMM 的消息传输过程如图 2 所示:在发送方部署 Transmitter,在每个接收方部署 Receiver。由于 RMM 可以支持单播和多播,单播时 Receiver 只需要部署在一个接收方,而多播时需要部署在多播组内的每个接收方。首先,启动 Receiver(s),创建接收方实例、接收主题,并加入多播组,监听消息;然后,启动 Transmitter,创建发送方实例、发送主题,建立消息通道发送消息,Receiver 接收消息;在一定条件下(如发送 10000 条消息)结束发送;发送方、接收方分别关闭主题、停止实例。


图 2. RMM 的消息传输过程
图 2. RMM 的消息传输过程

Transmitter API

  1. 创建发送方实例

    RmmTransmitter 类定义了 RMM 发送方实例,由 RmmFactory 创建。RmmFactory 提供了两个方法创建发送方实例。

    方法一,需要先创建发送方配置 RmmTxConfig,设置配置项,然后基于此配置创建发送方实例(清单 1)。



    清单 1. 创建 RMM 发送方实例
    				 
     // 创建发送方配置
     RmmTxConfig config_t = new RmmTxConfig(); 
     // 设置配置项
     config_t.dataPort = 34343; 
     ... 
    
     RmmTransmitter transmitter = null; 
     // 基于配置创建发送方实例
     try { 
     transmitter = RmmFactory.initRmmTransmitter(config_t); 
     } catch (RmmException e) { 
    	 System.out.println("fail to initialize the transmitter. " + e.getReason()); 
     } 
    

    方法二,基于默认配置创建(清单 2)。发送默认配置在 rmmTbasic.cfg 和 rmmTadvanced.cfg 文件中定义,用 key=value 的键值描述参数名值。rmmTbasic.cfg 和 rmmTadvanced.cfg 分别定义了基本配置参数和高级配置参数,开发者可以参考 %llm%/perfsamples/configs 中的配置,在测试过程中根据参数意义和实际环境进行设置。这里建议采用方法二,在调试和优化时可以通过修改配置文件调整参数值,代码不需要任何变动。



    清单 2. 创建 RMM 发送方实例(基于默认配置)
    				 
     RmmTransmitter transmitter = null; 
     // 基于默认配置创建发送方实例
     try { 
     transmitter = RmmFactory.initRmmTransmitterUsingDefaultConfig(); 
     } catch (RmmException e) { 
     System.out.println("fail to initialize the transmitter." + e.getReason()); 
     } 

  2. 创建发送主题

    创建发送主题有两个步骤(清单 3):

    • 创建发送主题参数 RmmTxTopicParameters:设置主题名称,目的地址,传输类型等。

      RMM 支持多播和单播,因此传输类型要指定是多播 RmmTransportType.RMM_TRANSPORT_MULTICAST 或者单播 RmmTransportType.RMM_TRANSPORT_UDP_UNICAST。

      设置消息回调事件,为该主题设置回调类,实现 RmmEventListener 接口,根据 event 类型的不同,输出对应的提示信息(清单 4)。

    • 创建发送主题 RmmTxTopic:在发送方上创建发送主题。


    清单 3. 创建 RMM 发送主题
    				 
     // 创建发送主题参数
     RmmTxTopicParameters topicParams_t = new RmmTxTopicParameters(); 
     // 设置主题名称
     String txTopicName = "RMMTestTopic"; 
     topicParams_t.topicName = txTopicName; 
     // 设置目的地址
     topicParams_t.destAddress = "239.255.1.5"; 
     try { 
          // 设置传输类型为多播 
          topicParams_t.transport.setValue(RmmTransportType.RMM_TRANSPORT_MULTICAST); 
          // 设置可靠性 
          topicParams_t.reliability.setValue(RmmTxReliabilityType.RMM_RELIABLE); 
     } catch (RmmInvalidValueException e) { 
     System.out.println("fail to set topic parameters. " + e.getReason()); 
     } 
     // 设置消息回调事件
     topicParams_t.onEvent = new RmmCustomizedEventListener(); 
    
     // 创建发送主题
     RmmTxTopic txTopic = null; 
     try { 
    	 txTopic = transmitter.createTopic(topicParams_t); 
     } catch (RmmException e) { 
     System.out.println("fail to create topic " + txTopicName + "." + e.getReason()); 
     } 
    



    清单 4. 消息回调事件
    				
    public class RmmCustomizedEventListener implements RmmEventListener { 
      publicvoid onEvent(RmmEvent event) { 
      Date eventTime = new Date(); 
     switch(event.type.getValue()) { 
     case RmmEventType.RMM_MESSAGE_LOSS: 
                    // 输出丢失消息信息
     System.out.println(eventTime.toString() + "> message loss on topic " 
      + event.topicName + " from " + event.sourceAddr + ":" + event.port); 
     casedefault: 
                     // 输出收到消息信息
     System.out.println(eventTime.toString() + "> received event "
      + event.type.toString() + " on topic " + event.topicName 
      + " from " + event.sourceAddr + ":" + event.port); 
    		 } 
    	 } 
     } 

  3. 发送消息

    RmmTxMessage 类定义了消息,填充消息内容和大小后,在一个主题上发送消息(清单 5)。主程序应当控制消息的发送数量和频率,可以采用线程实现。



    清单 5. 发送消息
    				  
     // 创建消息
     RmmTxMessage msg = new RmmTxMessage(); 
     byte[] bytes = new byte[1024]; 
     msg.msgBuf = bytes; 
     msg.msgLen = bytes.length; 
    
     // 发送消息
     try { 
    	 txTopic.submitMessage(msg); 			
     } catch (RmmException e) { 
     System.out.println("fail to submit message for the topic" + txTopicName); 
     } 

  4. 关闭发送主题

    关闭发送方在该主题上的消息传输(清单 6),其它主题不受影响。关闭主题后,消息不能继续在该主题上发送,接收方会自动得知流关闭,但仍可以在延迟时间(closeTopic 方法参数)内接收数据。超时后,主题将会彻底删除。



    清单 6. 关闭 RMM 发送主题
    				 
     // 关闭发送主题
     try { 
    	 txTopic.closeTopic(0); 
     } catch (RmmException e) { 
     System.out.println("fail to close transmitter topic." + e.getReason()); 
     } 
    

  5. 停止发送方实例

    停止指定的发送方实例(清单 7),发送方可在延迟时间内(stopTransmitter 方法参数)发送等待发送的数据,超时后停止。



    清单 7. 停止 RMM 发送方实例
    				 
     // 停止发送端实例
     try { 
    	 transmitter.stopTransmitter(0); 
     } catch (RmmException e) { 
     System.out.println("fail to stop transmitter instance." + e.getReason()); 
     } 
    

Receiver API

  1. 创建接收方实例

    由 RmmFactory 创建接收方实例 RmmReceiver,也有类似基于 RmmRxConfig 和基于默认配置创建两种方法。示例中基于默认配置创建(清单 8),接收默认配置在 rmmRbasic.cfg 和 rmmRadvanced.cfg 文件中定义。



    清单 8. 创建 RMM 接收方实例
    				 
     RmmReceiver receiver = null; 
     try { 
     receiver = RmmFactory.initRmmReceiverUsingDefaultConfig(); 
     } catch (RmmException e) { 
     System.out.println("fail to initialize the receiver." + e.getReason()); 
     } 
    

  2. 创建接收主题,加入多播组

    与创建发送主题类似,需要先创建主题参数 RmmRxTopicParameters,再创建主题 RmmRxTopic(清单 9)。此时需要注意以下几点:

    • 主题名称:接收主题名称要与发送发主题名称一致,如示例中均为 RMMTestTopic,这样发送方上发送到该主题的消息才能被接收。
    • 加入多播组:加入该多播组 239.255.1.5 之后,才能收到发送发发到该地址的消息,这里地址需要与发送主题设置的目的地址一致。
    • 消息处理事件:侦听该主题上收到的消息并处理。


    清单 9. 创建 RMM 接收主题,加入多播组
    				 
     // 创建接收主题参数
     RmmRxTopicParameters topicParams_r = new RmmRxTopicParameters(); 
     // 设置主题名称
     String rxTopicName = "RMMTestTopic"; 
     topicParams_r.topicName = rxTopicName; 
     try { 
          // 设置可靠性
     topicParams_r.reliability.setValue(RmmRxReliabilityType.RMM_RELIABLE_ORDERED); 
     } catch (RmmInvalidValueException e) { 
     System.out.println("fail to set topic parameters." + e.getReason()); 
     } 
    
     // 设置消息回调事件
     topicParams_r.onEvent = new RmmCustomizedEventListener(); 
     // 设置消息处理事件
     topicParams_r.onMessage = new RmmCustomizedMessageListener(); 
    
     try { 
          // 加入多播组
    	 receiver.joinMulticastGroup("239.255.1.5"); 
     } catch (RmmException e) { 
     System.out.println("fail to join multicast group." + e.getReason()); 
     } 
    
     // 创建接收主题
     RmmRxTopic rxTopic = null; 
     try { 
    	 rxTopic = receiver.createTopic(topicParams_r); 
     } catch (RmmException e) { 
     System.out.println("fail to create topic " + rxTopicName + ". " + e.getReason()); 
     } 
    

  3. 接收消息

    侦听该主题上收到的消息,对消息进行处理。Listener 实现 RmmMessageListener 接口,每次收到消息时,onMessage 方法都会被调用。如示例中计算消息延迟时间,其中发送时间可以在发送消息时附带(清单 10)。



    清单 10. 接收消息
    				
    public class RmmCustomizedMessageListener implements RmmMessageListener { 
     public void onMessage(RmmRxMessage msg) { 
                // 消息收到时间
           long receiveTime = System.nanoTime(); 
                // 消息发送时间
              long transmitTime = …
                // 计算延迟时间
     int latency = (int)((receiveTime - transmitTime) * 1.0e-3); 
    	 } 
     } 
    

  4. 离开多播组

    接收方离开多播组后,不再收到发送到该组地址的消息,该步骤只需在必要时操作(清单 11)。



    清单 11. 离开多播组
    				 
     // 离开多播组
     try { 
          receiver.leaveMulticastGroup("239.255.1.5"); 
     } catch (RmmException e) { 
          System.out.println("fail to leave multicast group." + e.getReason()); 
     } 
    

  5. 关闭接收主题

    关闭接收主题后,不会再接收消息。如果还有正在处理的消息,会抛出 RmmBusyException,表示此时不能删除该消息,应当稍后再试图关闭(清单 12)。



    清单 12. 关闭 RMM 接收主题
    				 
     // 关闭接收主题
     try { 
    	 rxTopic.closeTopic(); 
     } catch (RmmBusyException e) { 
     // 稍后关闭
          …
     } catch (RmmException e) { 
     System.out.println("fail to close receiver topic." + e.getReason()); 
     } 
    

  6. 停止接收方实例

    停止接收消息,释放资源(清单 13)。



    清单 13. 停止 RMM 接收方实例
    				 
     // 停止接收方实例
     try { 
    	 receiver.stopReceiver(); 
     } catch (RmmException e) { 
     System.out.println("failed to stop RMM receiver instance." + e.getReason()); 
     } 
    

RUM API 的使用

RUM 的消息传输过程如图 3 所示:在发送方部署 Transmitter,在接收方部署 Receiver。由于 RUM 是基于 TCP 的可靠传输,需要在事先建立连接。首先,启动 Receiver,创建接收方实例、接收队列;然后,启动 Transmitter,创建发送方实例,建立连接;在得到确认连接后,创建发送队列、发送消息,Receiver 接收消息;发送结束后,发送方关闭连接,发送方、接收方分别关闭主题、停止实例。


图 3. RUM 的消息传输过程
图 3. RUM 的消息传输过程

下述 RUM 的 API 介绍重点突出与 RMM 不同的部分,并省略了异常处理,读者可参考 RMM 中的示例。

Transmitter API

  1. 创建发送方实例

    RUM 的发送方实例、接收方实例都由 RumInstance 类定义(清单 14),默认配置文件均为 rumbasic.cfg 和 rumAdvanced.cfg。



    清单 14. 创建 RUM 发送方实例
    				 
     // 基于默认配置创建发送方实例
     RumInstance transmitter = RumFactory.initRumInstanceUsingDefaultConfig(); 
    

  2. 建立连接

    与接收方建立连接,指定接收方的地址和端口,并侦听连接响应(清单 15)。RumCustomizedTransmitterConnListener 实现 RumConnectionListener 接口,接收方响应后,在 onConnectionEvent 方法中获得是否成功建立连接(清单 16)。



    清单 15. 建立连接
    				 
     // 设置连接参数
     RumEstablishConnectionParams connParams = new RumEstablishConnectionParams(); 
     connParams.address = "9.186.113.28"; 
     connParams.port = 35353; 
     // 侦听连接响应
     RumCustomizedTransmitterConnListener transmitterConn = 
        new RumCustomizedTransmitterConnListener(); 
     connParams.connectionListener = transmitterConn; 
     // 建立连接
     transmitter.establishConnection(connParams); 
    



    清单 16. 侦听连接响应
    				
    public class RumCustomizedTransmitterConnListener implements RumConnectionListener { 
    	
     private RumConnection conn; 
      private int connEventType = RumConnEventType.RUM_CONNECTION_ESTABLISH_IN_PROCESS; 
    	
      public RumConnection getConn() { return conn; } 
    	
       public int getConnEventType() { return connEventType; } 
    
      public int onConnectionEvent(RumConnectionEvent connEvent) { 
    		 connEventType = connEvent.type.getValue(); 
       switch (connEventType) { 
        case RumConnEventType.RUM_CONNECTION_ESTABLISH_SUCCESS: 
    			 conn = connEvent.connectionInfo; 
                      // 确认连接
         return
    				RUM_ON_CONNECTION_ACCEPT; 
         case RumConnEventType.RUM_NEW_CONNECTION: 
         case RumConnEventType.RUM_CONNECTION_READY: 
                      // 拒绝连接
         return
    				RUM_ON_CONNECTION_REJECT; 
         case RumConnEventType.RUM_CONNECTION_ESTABLISH_FAILURE: 
         case RumConnEventType.RUM_CONNECTION_ESTABLISH_IN_PROCESS: 
         case RumConnEventType.RUM_CONNECTION_ESTABLISH_TIMEOUT: 
         default: 
         return
    				RUM_ON_CONNECTION_NULL; 
    		 } 
    	 } 
    
     } 
    

  3. 创建发送队列

    在接收方同意连接后,创建发送队列(清单 17)。



    清单 17. 创建 RUM 发送队列
    				 
     // 等待响应
    while 
      (transmitterConn.getConnEventType() == 
        RumConnEventType.RUM_CONNECTION_ESTABLISH_IN_PROCESS); 
    if (transmitterConn.getConn() == null) 
    new Exception("fail to establish the connection."); 
    
     // 创建发送队列参数
     RumTxQueueParameters queueParams_t = new RumTxQueueParameters(); 
     queueParams_t.rumConnection = transmitterConn.getConn(); 
     queueParams_t.queueName = "RUMTestQueue"; 
     queueParams_t.reliability 
       = new RumTxReliabilityType(RumTxReliabilityType.RUM_RELIABLE); 
    
     // 创建发送队列
     RumTxQueue txQueue = transmitter.createTxQueue(queueParams_t); 
    

  4. 发送消息

    创建消息 RumTxMessage,并发送到队列上(清单 18)。



    清单 18. 发送消息
    				 
    // 创建消息
    RumTxMessage msg = new RumTxMessage();
    …
    // 发送消息
    txQueue.submitMessage(msg);
    

  5. 关闭发送队列
     txQueue.closeQueue(0); 
    

  6. 关闭连接
     transmitter.closeConnection(transmitterConn.getConn()); 
    

  7. 停止发送方实例
     transmitter.stopInstance(0); 
    

Receiver API

  1. 创建接收方实例
     RumInstance receiver = RumFactory.initRumInstanceUsingDefaultConfig(); 
    

  2. 创建接收队列

    在接收队列参数中,接收队列名称应与发送队列名称匹配,并设置 Listener 侦听该队列上的消息(清单 19)。



    清单 19. 创建 RUM 接收队列
    				 
     // 创建接收队列参数
     RumRxQueueParameters queueParams_r = new RumRxQueueParameters(); 
     queueParams_r.queueName = "RUMTestQueue"; 
     queueParams_r.onMessage = new RumCustomizedMessageListener(); 
     queueParams_r.reliability = 
         new RumRxReliabilityType(RumRxReliabilityType.RUM_RELIABLE); 
    
     // 创建接收队列
     RumRxQueue rxQueue = receiver.createRxQueue(queueParams_r); 
    

  3. 确认连接

    当收到一个新的连接请求 RUM_NEW_CONNECTION 时,确认连接(清单 20)。



    清单 20. 确认连接
    				 
     receiver.addConnectionListener(new RumCustomizedReceiverConnListener()); 
    
    public class RumCustomizedReceiverConnListener implements RumConnectionListener { 
      public int onConnectionEvent(RumConnectionEvent connEvent) { 
         switch (connEvent.type.getValue()) { 
           case RumConnEventType.RUM_NEW_CONNECTION: 
            case RumConnEventType.RUM_CONNECTION_READY: 
                      // 确认连接
          return
    				RUM_ON_CONNECTION_ACCEPT; 
          case RumConnEventType.RUM_CONNECTION_BROKE: 
          case RumConnEventType.RUM_CONNECTION_CLOSED: 
          case RumConnEventType.RUM_CONNECTION_ESTABLISH_FAILURE: 
          case RumConnEventType.RUM_CONNECTION_ESTABLISH_IN_PROCESS: 
          case RumConnEventType.RUM_CONNECTION_ESTABLISH_TIMEOUT: 
          return
    				RUM_ON_CONNECTION_NULL; 
          case RumConnEventType.RUM_CONNECTION_ESTABLISH_SUCCESS: 
         default: 
         return
    				RUM_ON_CONNECTION_REJECT; 
    		 } 
    	 } 
     } 
    

  4. 接收消息

    Listener 实现 RumMessageListener 接口,对消息的处理封装在 onMessage 方法中,例如统计收到的消息数量或计算延迟(清单 21)。



    清单 21. 接收消息
    				
    public class RumCustomizedMessageListener implements RumMessageListener { 
     public void onMessage(RumRxMessage msg) { 
     // 处理消息
                …
    	 } 
     } 
    

  5. 关闭接收队列
     rxQueue.closeQueue(); 
    

  6. 停止接收方实例

    receiver.stopInstance(0);

 

性能评测方法

MQ LLM 的特点是高吞吐量、低延迟,因此吞吐量、延迟这两个参数是该产品性能评测的重要依据,下面简单描述 RMM 单播时的测试方法。

吞吐量是指在没有消息丢失的情况下,能够接收消息的最大速率。测量该参数,如图 4 所示,在机器 A 部署 Transmitter,在机器 B 上部署 Receiver。启动 Receiver 监听发送到“ThroughputTest”主题的消息;启动 Transmitter,发送消息。在 Transmitter 设置不同的发送频率,当接收方出现消息丢失或者无法接收消息时的的最大发送频率,即吞吐量。


图 4. 吞吐量测试方法
图 4. 吞吐量测试方法

延迟是指消息从发送方到接收方所用的时间。由于发送方和接收方的时钟不能保证同步,因此可以采用接收方再回送消息的方法来计算延迟。如图 5 所示,在机器 A 上部署 TransmitterA 和 ReceiverA(分别部署),在机器 B 上部署 ReceiverB 和 TransmitterB(打包在一起部署)。启动 ReceiverA 监听回送消息;启动 ReceiverB 和 TransmitterB,监听发送到“LatencyTest”主题的消息;启动 TransmitterA,发送消息;ReceiverB 接收到消息后,TransmitterB 将该消息发送到“LatencyBackTest”主题上;ReceiverA 收到回送消息。通过在发送消息时在消息上留下的时间戳,在收到回送消息时获得当前时间,可计算出往返用时 T1+T2。延迟时间 T 近似为(T1+T2)/2,在机器 B 中的转发用时忽略不计。


图 5. 延迟测试方法
图 5. 延迟测试方法

另外,开发者也可以借助 MQ LLM 提供的监控 API,获得统计信息,具体可参见对象的 fetchStatistics 方法,以及 API 的 com.ibm.llm.rmm.monitoring 包。

 

总结

本文介绍了 MQ LLM 的产品特性及其支持的 RMM 和 RUM 两种传输方式,重点描述了 RMM 和 RUM 发送方、接收方 API 的使用,未涉及到性能的调试和优化等高级主题。读者通过本文能够快速熟悉产品 API,构建传输场景,进行参数测定。

声明:本文仅代表作者个人之观点,不代表 IBM 公司之观点。

 

原文链接:http://www.ibm.com/developerworks/cn/websphere/library/techarticles/1007_wangbb_mqllm/1007_wangbb_mqllm.html

请使用浏览器的分享功能分享到微信等