zoukankan      html  css  js  c++  java
  • Talend tJMSInput在bundle:restart后会存在两个Consumer的解决方法

    问题:在使用tJMSInput监听Topic/Queue,但用OSGI的形式部署到container中后,如果使用bundle:restart命令重启的话,会发现在JMS服务里面有两个Consumer!

    解决方法:因本人使用的是TIBCO EMS,需要引入相关jar,故我重新定义了一个组件来简化输入以及解决bundle:restart的问题。

    因为造成存在两个Consumer的原因是因为在Bundle:restart时没有关闭之前的连接而已!所以我们需要修改组件中的代码,用try{}catche(){}finally{}的方式,在finally里面关闭相关连接!

    下面列出我自定义TIBCO EMS调用并关闭连接的组件代码:

    tEsquelEMSInput_begin.javajet:

    <%@ jet 
    imports="
        org.talend.core.model.process.INode 
        org.talend.core.model.process.ElementParameterParser 
        org.talend.core.model.metadata.IMetadataTable 
        org.talend.core.model.metadata.IMetadataColumn 
        org.talend.core.model.process.IConnection
        org.talend.designer.codegen.config.CodeGeneratorArgument
        org.talend.core.model.process.IConnectionCategory
        java.util.List
        java.util.Map
    " 
    %>
    <%@ include file="@{org.talend.designer.components.localprovider}/components/templates/Log4j/LogUtil.javajet"%>
    <%
    
    CodeGeneratorArgument codeGenArgument = (CodeGeneratorArgument) argument;
    INode node = (INode)codeGenArgument.getArgument();
    String cid = node.getUniqueName();
    log = new LogUtil(node);
    String contextProvider="com.tibco.tibjms.naming.TibjmsInitialContextFactory";
    String connFacName="ConnectionFactory";
    String url=ElementParameterParser.getValue(node, "__SERVER_URL__");
    String user=ElementParameterParser.getValue(node, "__USER__");
    
    String from=ElementParameterParser.getValue(node, "__FROM__");
    String timeout=ElementParameterParser.getValue(node, "__TIMEOUT__");
    String messageSelector=ElementParameterParser.getValue(node, "__MSG_SELECTOR__");
    String processingMode = ElementParameterParser.getValue(node, "__PROCESSING_MODE__");
    String msgType = ElementParameterParser.getValue(node, "__MSGTYPE__");
    
    List<Map<String, String>> advProps = (List<Map<String,String>>)ElementParameterParser.getObjectValue(node, "__ADVANCED_PROPERTIES__");
    
    boolean enableSubscription = "Topic".equals(msgType) && "true".equalsIgnoreCase(ElementParameterParser.getValue(node, "__ENABLE_SUB__"));
    String clientID = ElementParameterParser.getValue(node, "__CLIENT_ID__");
    String subscriberName = ElementParameterParser.getValue(node, "__SUBSCRIBER_NAME__");
    
    IMetadataTable metadata=null;
    List<IMetadataTable> metadatas = node.getMetadataList();
    if ((metadatas!=null)&&(metadatas.size()>0)) {
        metadata = metadatas.get(0);
    }
    %>
    
        java.util.Hashtable props_<%=cid%> = new java.util.Hashtable();
        props_<%=cid%>.put(javax.naming.Context.INITIAL_CONTEXT_FACTORY, "<%=contextProvider%>");
        props_<%=cid%>.put(javax.naming.Context.PROVIDER_URL, <%=url%>);
        
    <%
    if(advProps.size() > 0){
        for(Map<String, String> item : advProps){
    %>
        props_<%=cid%>.put(<%=item.get("PROPERTY") %>, <%=item.get("VALUE") %>);
    <% 
        } 
    }
    %>
    javax.jms.Connection connection_<%=cid%> = null;
    javax.jms.Session session_<%=cid%> = null;
    javax.jms.MessageConsumer consumer_<%=cid%> = null;
    int nbline_<%=cid %> = 0;
    try {
        javax.naming.Context context_<%=cid%> = new javax.naming.InitialContext(props_<%=cid%>);
        javax.jms.ConnectionFactory factory_<%=cid%> = (javax.jms.ConnectionFactory) context_<%=cid%>.lookup("<%=connFacName%>");
        
        <%
        String passwordFieldName = "__PASS__";
        %>
            
        <%@ include file="@{org.talend.designer.components.localprovider}/components/templates/password.javajet"%>
    
        connection_<%=cid%> = factory_<%=cid%>.createConnection(<%=user %>, decryptedPassword_<%=cid%>);
        
        <%
        if(enableSubscription && clientID != null && clientID.length() > 0){
        %>
        connection_<%=cid%>.setClientID(<%=clientID%>);
        <%
        }
        %>
        
        session_<%=cid%> = connection_<%=cid%>.createSession(false, javax.jms.Session.CLIENT_ACKNOWLEDGE);
        javax.jms.Destination dest_<%=cid%> = session_<%=cid%>.create<%=msgType%>(<%=from %>);
    
        <%
        if(enableSubscription){
        %>
        consumer_<%=cid%> = session_<%=cid%>.createDurableSubscriber((javax.jms.Topic)dest_<%=cid%>,<%=subscriberName%>,<%=messageSelector%>,false);
        <%
        } else {
        %>
        consumer_<%=cid%>    = session_<%=cid%>.createConsumer(dest_<%=cid%>, <%=messageSelector%>);
        <%
        }
        %>
        
        connection_<%=cid%>.start();
    
        System.out.println("Ready to receive message");
        System.out.println("Waiting...");
        <%log.info(log.str("Ready to receive message."));%>
        <%log.info(log.str("Waiting..."));%>
    
        javax.jms.Message message_<%=cid%>;    
    
        while ((message_<%=cid%> = consumer_<%=cid%>.receive(<%="-1".equals(timeout)?0:timeout%>*1000)) != null) {
            <%log.debug(log.str("Retrieving the message "), "(nbline_" + cid + "+1)", log.str("."));%>
    <%
        List< ? extends IConnection> conns = node.getOutgoingSortedConnections();
        List<IMetadataColumn> columnLists = metadata.getListColumns();
        for(IConnection conn:conns){
            if (conn.getLineStyle().hasConnectionCategory(IConnectionCategory.DATA)) {
                String firstConnName = conn.getName();
                if("RAW".equals(processingMode)){
    %>
            <%=firstConnName%>.message=message_<%=cid %>;    
    <%
                }else{
                    if("id_Document".equals(metadata.getColumn("messageContent").getTalendType())){
    %>
            <%=firstConnName%>.messageContent=ParserUtils.parseTo_Document(((javax.jms.ObjectMessage) message_<%=cid %>).getObject().toString());
    <%
                    }else{
    %>
            <%=firstConnName%>.messageContent=((javax.jms.TextMessage) message_<%=cid %>).getText();
    <%
                    }
                }
            }
        }
        
    
    %>
    
    
            

    tEsquelEMSInput_end.javajet:

    <%@ jet 
        imports="
            org.talend.core.model.process.INode 
            org.talend.designer.codegen.config.CodeGeneratorArgument
            org.talend.core.model.process.ElementParameterParser
        " 
    %>
    <%@ include file="@{org.talend.designer.components.localprovider}/components/templates/Log4j/LogUtil.javajet"%>
    <%
    CodeGeneratorArgument codeGenArgument = (CodeGeneratorArgument) argument;
    INode node = (INode)codeGenArgument.getArgument();
    String cid = node.getUniqueName();
    log = new LogUtil(node);
    String timeout=ElementParameterParser.getValue(node, "__TIMEOUT__");
    String maxMsg=ElementParameterParser.getValue(node, "__MAX_MSG__");
    %>
        nbline_<%=cid %>++;
        message_<%=cid%>.acknowledge();
        if(<%=maxMsg %> > 0 && nbline_<%=cid %> >= <%=maxMsg %>){
            break;
        }
        
    }
    } catch(Exception error){
       throw error;
    } finally {
      if(consumer_<%=cid%> != null){
        consumer_<%=cid%>.close();
      }
      if(session_<%=cid%> != null){
        session_<%=cid%>.close();
      }
      if(connection_<%=cid%> != null){
        connection_<%=cid%>.close();
      }
    }
    <%log.info(log.str("Retrieved records count: "), log.var("nbline"), log.str("."));%>
    globalMap.put("<%=cid %>_NB_LINE", nbline_<%=cid%>);

    tEsquelEMSInput_java.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    
    <COMPONENT>
        <HEADER PLATEFORM="ALL" SERIAL="" VERSION="0.102" STATUS="ALPHA"
            COMPATIBILITY="ALL" AUTHOR="AngusYang" RELEASE_DATE="20170220A"
            STARTABLE="true" LOG4J_ENABLED="true">
            <SIGNATURE />
        </HEADER>
        <FAMILIES>
            <FAMILY>Esquel</FAMILY>
        </FAMILIES>
        <DOCUMENTATION>
            <URL />
        </DOCUMENTATION>
        <CONNECTORS>
            <CONNECTOR CTYPE="FLOW" MAX_INPUT="0" MAX_OUTPUT="1" />
            <CONNECTOR CTYPE="ITERATE" MAX_OUTPUT="1" MAX_INPUT="1" />
            <CONNECTOR CTYPE="SUBJOB_OK" MAX_INPUT="1" MAX_OUTPUT="1" />
            <CONNECTOR CTYPE="COMPONENT_OK" />
            <CONNECTOR CTYPE="COMPONENT_ERROR" />
            <CONNECTOR CTYPE="RUN_IF" />
        </CONNECTORS>
        <PARAMETERS>
            <PARAMETER NAME="SERVER_URL" FIELD="TEXT" NUM_ROW="1">
                <DEFAULT>"tibjmsnaming://localhost:7222"</DEFAULT>
            </PARAMETER>
            <PARAMETER NAME="USER" FIELD="TEXT" NUM_ROW="5" REQUIRED="true">
                <DEFAULT>"admin"</DEFAULT>
            </PARAMETER>
            <PARAMETER NAME="PASS" FIELD="PASSWORD" NUM_ROW="5" REQUIRED="true">
                <DEFAULT>""</DEFAULT>
            </PARAMETER>
    
            <!-- Durable subscription start -->
            <PARAMETER NAME="ENABLE_SUB" FIELD="CHECK" SHOW_IF="MSGTYPE == 'TOPIC'"
                NUM_ROW="10">
                <DEFAULT>false</DEFAULT>
            </PARAMETER>
    
            <PARAMETER NAME="CLIENT_ID" FIELD="TEXT" NUM_ROW="15"
                REQUIRED="false" SHOW_IF="(MSGTYPE == 'TOPIC') AND (ENABLE_SUB == 'true')">
                <DEFAULT>""</DEFAULT>
            </PARAMETER>
    
            <PARAMETER NAME="SUBSCRIBER_NAME" FIELD="TEXT" NUM_ROW="20"
                REQUIRED="true" SHOW_IF="(MSGTYPE == 'TOPIC') AND (ENABLE_SUB == 'true')">
                <DEFAULT>""</DEFAULT>
            </PARAMETER>
            <!-- Durable subscription end -->
    
            <PARAMETER NAME="MSGTYPE" FIELD="CLOSED_LIST" NUM_ROW="25">
                <ITEMS DEFAULT="TOPIC">
                    <ITEM NAME="TOPIC" VALUE="Topic" />
                    <ITEM NAME="QUEUE" VALUE="Queue" />
                </ITEMS>
            </PARAMETER>
            <PARAMETER NAME="FROM" FIELD="TEXT" SHOW="true" NUM_ROW="30">
                <DEFAULT>""</DEFAULT>
            </PARAMETER>
            <PARAMETER NAME="TIMEOUT" FIELD="TEXT" SHOW="true" NUM_ROW="33">
                <DEFAULT>-1</DEFAULT>
            </PARAMETER>
            <PARAMETER NAME="MAX_MSG" FIELD="TEXT" SHOW="true" NUM_ROW="33">
                <DEFAULT>-1</DEFAULT>
            </PARAMETER>
            <PARAMETER NAME="MSG_SELECTOR" FIELD="TEXT" NUM_ROW="35"
                REQUIRED="true">
                <DEFAULT>""</DEFAULT>
            </PARAMETER>
            <PARAMETER NAME="PROCESSING_MODE" FIELD="CLOSED_LIST" SHOW="true"
                NUM_ROW="40">
                <ITEMS DEFAULT="RAW">
                    <ITEM NAME="RAW" VALUE="RAW" />
                    <ITEM NAME="CONTENT" VALUE="CONTENT" />
                </ITEMS>
            </PARAMETER>
            <PARAMETER NAME="SCHEMA" FIELD="SCHEMA_TYPE" REQUIRED="true"
                NUM_ROW="40">
                <TABLE IF="PROCESSING_MODE == 'RAW'" READONLY="true">
                    <COLUMN NAME="message" TYPE="id_Object" />
                </TABLE>
                <TABLE IF="PROCESSING_MODE == 'CONTENT'" READONLY="false">
                    <COLUMN NAME="messageContent" TYPE="id_String" CUSTOM="true" />
                </TABLE>
            </PARAMETER>
        </PARAMETERS>
    
        <ADVANCED_PARAMETERS>
            <PARAMETER NAME="ADVANCED_PROPERTIES" FIELD="TABLE"
                REQUIRED="false" NUM_ROW="10" NB_LINES="3">
                <ITEMS>
                    <ITEM NAME="PROPERTY" />
                    <ITEM NAME="VALUE" />
                </ITEMS>
            </PARAMETER>
        </ADVANCED_PARAMETERS>
        <CODEGENERATION>
            <IMPORTS>
                <IMPORT MODULE="jms-2.0.jar" NAME="jms-2.0" REQUIRED="true" />
                <IMPORT MODULE="tibcrypt.jar" NAME="tibcrypt" REQUIRED="true" />
                <IMPORT MODULE="tibjms.jar" NAME="tibjms" REQUIRED="true" />
            </IMPORTS>
        </CODEGENERATION>
        <RETURNS>
            <RETURN AVAILABILITY="AFTER" NAME="NB_LINE" TYPE="id_Integer" />
        </RETURNS>
    </COMPONENT>

    tEsquelEMSInput_messages.properties:

    #
    #Mon Jun 22 21:50:39 CST 2009
    HELP=org.talend.help.tEsquelEMSInput
    SERVER_URL.NAME=Server URL
    TIMEOUT.NAME=Timeout for Next Message(in sec)
    MSGTYPE.NAME=Message Type
    FROM.NAME=Message From
    NAME=tEsquelEMSInput
    MAX_MSG.NAME=Maximum Messages
    SERVER.NAME=MQ Server
    SERVER.ITEM.TIBCO=Tibco EMS
    NB_LINE.NAME=Number of line
    SCHEMA.NAME=Schema
    LONG_NAME=Esquel TIBCO EMS Receive Message
    MSG_SELECTOR.NAME=Message Selector Expression
    USER.NAME=User Name
    PASS.NAME=Password
    PROCESSING_MODE.NAME=Processing Mode
    PROCESSING_MODE.ITEM.RAW=Raw Message
    PROCESSING_MODE.ITEM.CONTENT=Message Content
    MSGTYPE.ITEM.QUEUE=Queue
    MSGTYPE.ITEM.TOPIC=Topic
    MSGTYPE.NAME=Message Type
    
    ADVANCED_PROPERTIES.NAME=Properties 
    ADVANCED_PROPERTIES.ITEM.PROPERTY=Property
    ADVANCED_PROPERTIES.ITEM.VALUE=Value
    
    ENABLE_SUB.NAME=Enable Durable Subscription
    CLIENT_ID.NAME=ClientID
    SUBSCRIBER_NAME.NAME=Subscriber Name
  • 相关阅读:
    Java多线程之 对同一个值操作
    Java 之 应用多线程计算1+2+...+100之多种方法比较(二)
    Java 之 应用多线程计算1+2+...+100之多种方法比较(一)
    SQL语句优化
    第六章:shiro Realm相关对象
    第五章:shiro密码加密
    第四章:shiro的INI配置
    第三章:shiro授权认证
    第二章:shiro身份验证
    第一章:Shiro简介
  • 原文地址:https://www.cnblogs.com/angusyang/p/6425162.html
Copyright © 2011-2022 走看看