zoukankan      html  css  js  c++  java
  • 如何使用MQ标头动态确定数据处理程序的行为?

    本文的目标读者是集成开发人员,其角色涉及使用服务组件体系结构(Service Component Architecture,SCA)集成MQ消息传递引擎。本文将描述如何使用数据处理程序基于MQ标头更改转换逻辑,同时确保处理程序保持协议独立性。涵盖的主题包括:

    •   数据处理程序的解释
    •   从数据处理程序访问MQ标头
    •   基于标头改变数据处理程序的行为
    •   确保数据处理程序保持协议独立性

      在阅读本文之后,您将能够创作基于MQ标头动态地转换数据的可重用数据处理程序。

      数据处理程序

      数据处理程序负责将操作的输入和输出类型转换为有线使用的格式,反之亦然。以前,MQ绑定使用了数据绑定执行此功能。数据绑定具有访问有线格式(如MQ消息)的权限,并可以直接对消息进行读取或写入。在JMS绑定上无法使用这种连接到特定绑定类型(例如MQ数据绑定)的每个数据绑定。

      数据处理程序提供了使用规范格式转换数据的通用方法。transform方法用于读取数据,并传递以下类型之一来读取输入:

    •   InputStream-用于读取字节
    •   Reader-用于读取文本
    •   Object-用于读取Java对象

      transformInto方法用于写入数据,并传入以下类型之一来写入输出:

    •   OutputStream-用于写入字节
    •   Writer-用于写入文本
    •   Object-用于写入Java对象

      这样可使消息的有线格式对数据处理程序透明,允许任何受支持的绑定重用它们。

      数据处理程序和消息标头

      当创作数据绑定时,会将消息标头传入读或写方法,并允许协议标头中的字段确定转换逻辑。不过,对于方法的输入参数不包括协议标头的数据处理程序,我们应如何访问它们?

      在WebSphere Enterprise Service Bus (WESB) V6.2中,协议标头存储在名为ContextService的存储库中。ContextService通过使用普通SPI进行访问,并且可以从任何Java代码引用,其中包括POJO、函数选择器或数据处理程序。清单 1 显示了从ContextService获取MQHeader的示例代码。

      清单 1. 使用上下文服务访问协议标头
        

    以下是引用片段:
    HeadersType headers = ContextService.INSTANCE.getHeaders(); 
    MQHeaderType mqHeader = headers.getMQHeader(); 
     
      使用数据处理程序中的MQHeader

      现在,我们已了解了如何访问数据处理程序中的MQ Header,让我们基于这些字段之一制定转换决策。MQMD的格式字段通常用于确定消息正文格式。消息正文的格式、编码和编码字符集存储在MQControl结构中,并包含在MQ Header中。清单 2 中所示的代码说明了如何根据MQMD中格式字段的值修改转换逻辑。

      清单 2. 基于MQMD格式字段更改行为
        

    以下是引用片段:
    package com.ibm.custom.datahandlers; 
    import java.io.IOException; 
    import java.io.InputStream; 
    import java.io.OutputStream; 
    import java.util.Map; 
    import com.ibm.websphere.bo.BOFactory; 
    import com.ibm.websphere.bo.BOXMLDocument; 
    import com.ibm.websphere.bo.BOXMLSerializer; 
    import com.ibm.websphere.sca.ServiceManager; 
    import com.ibm.websphere.sca.mq.structures.MQControl; 
    import com.ibm.websphere.sibx.smobo.HeadersType; 
    import com.ibm.websphere.sibx.smobo.MQHeaderType; 
    import com.ibm.wsspi.session.ContextService; 
    import commonj.connector.runtime.DataHandler; 
    import commonj.connector.runtime.DataHandlerException; 
    import commonj.sdo.DataObject; 
    @SuppressWarnings("serial") 
    public class MQHeaderDataHandler implements DataHandler { 
     Map bindingContext; 
     String delimiter = "%"; 
     /** 
      * Reads a DataObject from an InputStream (Bytes support only) 
      */ 
     public Object transform(Object source, Class target, Object options)  
               throws DataHandlerException { 
      System.out.println("MyDataHandler: Calling  
       transform(" + source + "," + target + "," + options + ")"); 
      DataObject output = null; 
      InputStream is = (InputStream) source; 
       
      //Handle InputStreams and DataObjects 
      if (source instanceof InputStream &&  
       target.getName().equals("commonj.sdo.DataObject")) { 
                    if (("MQXML").equals(getMQMessageFormat())) { 
        //If the format field of the incoming message  
                                is XML use BOXMLSerializer 
        ServiceManager serviceMgr = new ServiceManager(); 
        BOXMLSerializer xmlSerializer =  
                            (BOXMLSerializer) serviceMgr 
          .locateService 
                            ("com/ibm/websphere/bo/BOXMLSerializer"); 
        try { 
         BOXMLDocument xmlDoc = xmlSerializer. 
                            readXMLDocumentWithOptions 
                                (is , options); 
         output = xmlDoc.getDataObject(); 
         ((InputStream)source).close(); 
        } catch (IOException e) { 
         throw new  
          DataHandlerException 
                            ("Exception reading DataObject "  
                            + e.getLocalizedMessage()); 
        } 
       } else { 
        //Read delimited data 
        byte[] ch = new byte[2]; 
        StringBuffer buffer = new StringBuffer(); 
          
        try { 
         int len = is.read(ch); 
         while (len != -1) { 
          buffer.append(new String(ch), 0, len); 
          len = is.read(ch); 
         } 
        } catch (IOException e) { 
         throw new DataHandlerException(e); 
        } 
        //Split the values using the delimiter 
        String[] values = buffer.toString().split(delimiter); 
        System.out.println("Found name: "+values[0]+" and id: " 
                            +values[1]); 
        //Build the CustomerType 
        ServiceManager serviceManager = new ServiceManager(); 
                            BOFactory bofactory =  
          (BOFactory) serviceManager.locateService 
                                    ("com/ibm/websphere/bo/BOFactory"); 
        output = 
         bofactory.create("http:// 
                            MQHeaderDataHandlerModule", "CustomerType"); 
        output.setString("name", values[0]); 
        output.setInt("id", Integer.parseInt(values[1])); 
       } 
      } else { 
       throw new  
        DataHandlerException("Source type " +  
                        source.getClass().getName() + " or Target type " 
         + target.getName() + " unsupported by  
                                                MyDataHandler"); 
      } 
      return output; 
     } 
     /** 
      * Writes the DataObject out to a OutputStream (Bytes support only) 
      */ 
     public void transformInto(Object source, Object target, Object options) 
          throws DataHandlerException { 
      System.out.println("Calling  
       transformInto(" + source + "," + target + "," + options + ")"); 
      if (source instanceof DataObject && target instanceof OutputStream) { 
       OutputStream os = (OutputStream) target; 
       DataObject bo = (DataObject) source; 
        
       if (("MQXML").equals(getMQMessageFormat())) { 
        //If the format field of the outgoing message is XML use  
                                BOXMLSerializer 
        ServiceManager serviceMgr = new ServiceManager(); 
        BOXMLSerializer xmlSerializer = (BOXMLSerializer)  
                          serviceMgr.locateService("com/ibm/websphere/ 
                                                   bo/BOXMLSerializer"); 
             
        try { 
         xmlSerializer.writeDataObject(bo, bo.getType(). 
                                 getURI(), bo.getType().getName(), os); 
        } catch (IOException e) { 
         throw new  
          DataHandlerException("Exception writing  
                               DataObject " + e.getLocalizedMessage()); 
        } 
       } else { 
        //Write delimited data     
        String name = bo.getString("name"); 
        int id = bo.getInt("id");     
        String message = name+delimiter+id; 
         
        try { 
         os.write(message.getBytes()); 
        } catch (IOException e) { 
         throw new  
          DataHandlerException("Exception writing  
                            DataObject " + e.getLocalizedMessage()); 
        } 
       }    
      } else { 
       
       throw new  
        DataHandlerException("Source type " + source.getClass() 
                        .getName() + " or Target type " 
         + target.getClass().getName() +  
                        " unsupported by MyDataHandler"); 
      } 
     } 
       
     /** 
      * Returns the format of the MQHeader or null if there is no MQHeader 
      * @return 
      */ 
     private String getMQMessageFormat() { 
      String format = null; 
       
      HeadersType headers = ContextService.INSTANCE.getHeaders(); 
       
      MQHeaderType mqHeader = null;    
         if(headers!= null){ 
          mqHeader = headers.getMQHeader(); 
          if (mqHeader != null) { 
           MQControl mqControl = mqHeader.getControl(); 
           if (mqControl != null) { 
            format = mqControl.getFormat(); 
            if (format != null) { 
             format = format.trim(); 
            } 
           } 
          } 
         } 
       
      return format; 
     } 
      
     public void setBindingContext(Map context) { 
      bindingContext = context; 
     } 
    }
      此处理程序使用MQHeader格式字段,如果将其设置为MQXML,则会将消息正文视为XML,否则会将消息正文视为带分隔符的数据。如果将此数据处理程序与非MQ绑定一起使用,则不能在 ContextService 中定义 MQHeader。在这种情况下,getMQMessageFormat()方法将返回null,并导致将正文视为带分隔符的数据,因此,数据处理程序应保持协议独立性。 

      测试数据处理程序

      首先,使用MQ绑定创建一个简单的模块。假定已安装WebSphere MQ,并且可以在WebSphere ESB中访问它。

      在WID中创建名为MQHeaderDataHandlerModule的新模块

      在MQHeaderDataHandlerModule项目中,创建名为CustomerType的新数据类型

    图 1. CustomerType Business对象

    图 1. CustomerType Business对象

        添加名为name的String字段和名为id的int字段

      保存并关闭新数据类型

      创建名为CustomerInterface的新接口

      添加名为processCustomer的单向操作,它可以接受CustomerType作为输入参数

    图 2. Customer接口

    图 2. Customer接口

        将新的导出添加到组装图

      将CustomerInterface添加到导出

      在导出上配置MQ绑定

      设置请求队列管理器名称

      设置接收目标

      为WebSphere MQ队列管理器设置适当的连接详细信息

      要获取缺省的请求数据格式,请单击select

      选择Select your custom data format transformation from the workspace,并单击select

      选择MQHeaderDataHandler,并单击ok

      勾选Add custom class to binding registry,并单击next(此操作可以在现有数据格式列表中显示)

      输入名称MQHeaderDataHandler

      添加描述

      选择它适用的绑定类型,现在勾选MQ和JMS

    图 3. 设置数据格式

    图 3. 设置数据格式

        在导入上,按照上述同一数据格式配置MQ绑定

      为确保正确地创建DataObject,我们将使用Java组件将其输出,因此将一个数据对象添加到画布
     
      将Java组件连接到导出和导入,您的模块现在应该与图 4 所示类似

    图 4. MQHeaderDataHandlerModule

    图 4. MQHeaderDataHandlerModule

      双击Java组件创建一个实现

      将processCustomer的实现替换为清单 3 中的代码

      清单 3. 打印数据对象
        

    以下是引用片段:
    BOFactory factoryService = (BOFactory) new  
     ServiceManager().locateService("com/ibm/websphere/bo/BOFactory"); 
    BOXMLSerializer xmlSerializerService =(BOXMLSerializer) new  
     ServiceManager().locateService("com/ibm/websphere/bo/BOXMLSerializer"); 
    try { 
     xmlSerializerService.writeDataObject(input1, input1.getType().getURI(),  
     input1.getType().getName(), System.out); 
    } catch (IOException e) { 
     e.printStackTrace(); 

    locateService_CustomerInterfacePartner().processCustomer(input1);
       
      保存Java组件
     
      保存模块

      运行模块

      为测试模块,我们将使用RFHUtil,它允许我们创建MQ消息,并将其放入导出接收目标。然后,我们可以使用WID中的服务器日志视图查看由Java组件输出的DataObject,并再次使用RFHUtil查看导入输出的消息。

      在MQHeaderDataHandlerModule项目中创建一个名为CustomerType.xml的文件

      将内容设置为清单 4 中显示的 XML

      清单 4. CustomerType XML数据
          

    以下是引用片段:
    <?xml version="1.0" encoding="UTF-8"?> 
    <p:CustomerType xmlns:p="http://MQHeaderDataHandlerModule"  
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     xsi:schemaLocation="http://MQHeaderDataHandlerModule CustomerType.xsd "> 
    <name>Tim</name> 
    <id>114923</id> 
    </p:CustomerType> 
       
      在MQHeaderDataHandlerModule项目中创建一个名为CustomerType.txt的文件 
    将内容设置为清单 5 中显示的XML

      清单 5. CustomerType带分隔符的数据
          
      

    以下是引用片段:
    Tim%114923

       
      将您的模块部署到服务器

      启动RFHUtil,并设置导出使用的队列管理器和队列

      使用Read File 读取 CustomerType.txt,您应该能够看到数据选项卡下的消息正文

      按WriteQ按钮将消息发送到队列

      选中WebSphere Integration Developer中的Server Logs视图,以便输出DataObject

      回到RFHUtil中,将队列更改为导入使用的队列,并单击ReadQ

      转到Data选项卡,看到的消息正文应该与图5所示类似

    图 5. 输出带分隔符的消息

    图 5. 输出带分隔符的消息

      转回RFHUtil中的Main选项卡

      使用Read File读取CustomerType.xml,您应该能够看到数据选项卡下的消息正文

      转到MQMD选项卡,并将消息格式设置为MQXML

    图 6. 设置MQXML消息格式

    图 6. 设置MQXML消息格式

      回到Main选项卡上,按WriteQ发送消息
     
      将队列更改为导入使用的队列,并单击ReadQ

      转到Data选项卡,看到的消息正文应该与图 7 所示类似

    图 7. 输出XML消息

    图 7. 输出XML消息

      结束语

      祝贺您,现在您已经成功地创建并测试了由MQ标头中的字段控制其行为的独立于协议的数据处理程序。我们可以看到,如果将格式字段设置为MQXML,则可以将消息视为XML。如果没有任何MQ标头,还可以将数据视为带分隔符的数据。请尝试将此数据处理程序与另一个绑定一起使用并检查其行为。

      在本文中,我们主要讨论了如何使用MQ格式字段更改数据处理程序逻辑,我们还可以在任何其他标头(如JMS标头或HTTP标头)中使用相应字段(如果处理这些协议)。值得注意的是,如果启用Propagate protocol header,则标头仅对数据处理程序可用,这是缺省设置。要查看设置,请选中绑定属性面板中的Propagation选项卡。

  • 相关阅读:
    docker入门
    IAR屏蔽警告的方法
    在MDK 中忽略(suppress) 某一个警告
    stm32 F40x CCM数据区的使用
    如何理解Stand SPI Dual SPI 和Quad SPI??
    stm32F1在sram中调试运行代码
    stm32低功耗的一点总结
    C语言中__attribute__ ((at())绝对定位的应用
    系统栈和任务栈——freertos
    软件模拟spi的注意事项
  • 原文地址:https://www.cnblogs.com/java20130722/p/3207022.html
Copyright © 2011-2022 走看看