问题:在使用tJMSInput监听Topic/Queue,但用OSGI的形式部署到container中后,如果使用bundle:restart命令重启的话,会发现在JMS服务里面有两个Consumer!
解决方法:因本人使用的是TIBCO EMS,需要引入相关jar,故我重新定义了一个组件来简化输入以及解决bundle:restart的问题。
出现两个Consumer的原因,是在执行bundle:restart时没有关闭之前的连接!所以我们需要修改组件中的代码,用try{}catch(){}finally{}的方式,在finally里面关闭相关连接!
下面列出我自定义TIBCO EMS调用并关闭连接的组件代码:
tEsquelEMSInput_begin.javajet:
<%@ jet
imports="
    org.talend.core.model.process.INode
    org.talend.core.model.process.ElementParameterParser
    org.talend.core.model.metadata.IMetadataTable
    org.talend.core.model.metadata.IMetadataColumn
    org.talend.core.model.process.IConnection
    org.talend.designer.codegen.config.CodeGeneratorArgument
    org.talend.core.model.process.IConnectionCategory
    java.util.List
    java.util.Map
"
%>
<%@ include file="@{org.talend.designer.components.localprovider}/components/templates/Log4j/LogUtil.javajet"%>
<%
    /*
     * BEGIN part of the tEsquelEMSInput component.
     *
     * Generates the code that looks up the TIBCO EMS connection factory through JNDI,
     * opens connection/session/consumer and starts the receive loop. The "try {" and the
     * "while (...) {" opened here are closed by the END part of the component, which is
     * also where consumer, session and connection are closed.
     */
    CodeGeneratorArgument codeGenArgument = (CodeGeneratorArgument) argument;
    INode node = (INode) codeGenArgument.getArgument();
    String cid = node.getUniqueName();

    log = new LogUtil(node);

    /* Fixed JNDI settings of this component. */
    String contextProvider = "com.tibco.tibjms.naming.TibjmsInitialContextFactory";
    String connFacName = "ConnectionFactory";

    /* Component parameters; TEXT parameters are Java expressions pasted into the generated code. */
    String url = ElementParameterParser.getValue(node, "__SERVER_URL__");
    String user = ElementParameterParser.getValue(node, "__USER__");
    String from = ElementParameterParser.getValue(node, "__FROM__");
    String timeout = ElementParameterParser.getValue(node, "__TIMEOUT__");
    String messageSelector = ElementParameterParser.getValue(node, "__MSG_SELECTOR__");
    String processingMode = ElementParameterParser.getValue(node, "__PROCESSING_MODE__");
    String msgType = ElementParameterParser.getValue(node, "__MSGTYPE__");
    List<Map<String, String>> advProps = (List<Map<String,String>>)ElementParameterParser.getObjectValue(node, "__ADVANCED_PROPERTIES__");

    /* Durable subscription only applies to topics. */
    boolean enableSubscription = "Topic".equals(msgType)
            && "true".equalsIgnoreCase(ElementParameterParser.getValue(node, "__ENABLE_SUB__"));
    String clientID = ElementParameterParser.getValue(node, "__CLIENT_ID__");
    String subscriberName = ElementParameterParser.getValue(node, "__SUBSCRIBER_NAME__");

    /* A timeout of -1 (or an empty field) is emitted as 0, the value the original passed for -1. */
    String receiveTimeout = (timeout == null || timeout.trim().length() == 0 || "-1".equals(timeout)) ? "0" : timeout;

    /* First schema of the node, if any; guarded so code generation does not fail without one. */
    IMetadataTable metadata = null;
    List<IMetadataTable> metadatas = node.getMetadataList();
    if ((metadatas != null) && (metadatas.size() > 0)) {
        metadata = metadatas.get(0);
    }
    IMetadataColumn contentColumn = (metadata == null) ? null : metadata.getColumn("messageContent");
    boolean contentAsDocument = contentColumn != null && "id_Document".equals(contentColumn.getTalendType());
%>
java.util.Hashtable<Object, Object> props_<%=cid%> = new java.util.Hashtable<Object, Object>();
props_<%=cid%>.put(javax.naming.Context.INITIAL_CONTEXT_FACTORY, "<%=contextProvider%>");
props_<%=cid%>.put(javax.naming.Context.PROVIDER_URL, <%=url%>);
<%
    if (advProps != null) {
        for (Map<String, String> item : advProps) {
%>
props_<%=cid%>.put(<%=item.get("PROPERTY") %>, <%=item.get("VALUE") %>);
<%
        }
    }
%>

javax.jms.Connection connection_<%=cid%> = null;
javax.jms.Session session_<%=cid%> = null;
javax.jms.MessageConsumer consumer_<%=cid%> = null;
int nbline_<%=cid %> = 0;

try {
    javax.naming.Context context_<%=cid%> = new javax.naming.InitialContext(props_<%=cid%>);
    javax.jms.ConnectionFactory factory_<%=cid%>;
    try {
        factory_<%=cid%> = (javax.jms.ConnectionFactory) context_<%=cid%>.lookup("<%=connFacName%>");
    } finally {
        // The naming context is only needed for the lookup; release it right away.
        context_<%=cid%>.close();
    }

<%
    String passwordFieldName = "__PASS__";
%>
<%@ include file="@{org.talend.designer.components.localprovider}/components/templates/password.javajet"%>

    connection_<%=cid%> = factory_<%=cid%>.createConnection(<%=user %>, decryptedPassword_<%=cid%>);
<%
    if (enableSubscription && clientID != null && clientID.length() > 0) {
%>
    connection_<%=cid%>.setClientID(<%=clientID%>);
<%
    }
%>
    session_<%=cid%> = connection_<%=cid%>.createSession(false, javax.jms.Session.CLIENT_ACKNOWLEDGE);
    javax.jms.Destination dest_<%=cid%> = session_<%=cid%>.create<%=msgType%>(<%=from %>);
<%
    if (enableSubscription) {
%>
    consumer_<%=cid%> = session_<%=cid%>.createDurableSubscriber((javax.jms.Topic)dest_<%=cid%>,<%=subscriberName%>,<%=messageSelector%>,false);
<%
    } else {
%>
    consumer_<%=cid%> = session_<%=cid%>.createConsumer(dest_<%=cid%>, <%=messageSelector%>);
<%
    }
%>
    connection_<%=cid%>.start();

    System.out.println("Ready to receive message");
    System.out.println("Waiting...");
    <%log.info(log.str("Ready to receive message."));%>
    <%log.info(log.str("Waiting..."));%>

    javax.jms.Message message_<%=cid%>;
    // Timeout is given in seconds; parenthesised and multiplied as long so that compound
    // expressions and large values are converted to milliseconds correctly.
    while ((message_<%=cid%> = consumer_<%=cid%>.receive((<%=receiveTimeout%>) * 1000L)) != null) {
        <%log.debug(log.str("Retrieving the message "), "(nbline_" + cid + "+1)", log.str("."));%>
<%
    /* Copy the received message into every outgoing data (row) connection. */
    List< ? extends IConnection> conns = node.getOutgoingSortedConnections();
    for (IConnection conn : conns) {
        if (!conn.getLineStyle().hasConnectionCategory(IConnectionCategory.DATA)) {
            continue;
        }
        String firstConnName = conn.getName();
        if ("RAW".equals(processingMode)) {
%>
        <%=firstConnName%>.message=message_<%=cid %>;
<%
        } else if (contentAsDocument) {
%>
        <%=firstConnName%>.messageContent=ParserUtils.parseTo_Document(((javax.jms.ObjectMessage) message_<%=cid %>).getObject().toString());
<%
        } else {
%>
        <%=firstConnName%>.messageContent=((javax.jms.TextMessage) message_<%=cid %>).getText();
<%
        }
    }
%>
tEsquelEMSInput_end.javajet:
<%@ jet
imports="
    org.talend.core.model.process.INode
    org.talend.designer.codegen.config.CodeGeneratorArgument
    org.talend.core.model.process.ElementParameterParser
"
%>
<%@ include file="@{org.talend.designer.components.localprovider}/components/templates/Log4j/LogUtil.javajet"%>
<%
    /*
     * END part of the tEsquelEMSInput component.
     *
     * Closes the "while" loop and the "try" block opened by the BEGIN part, acknowledges
     * each processed message, and releases the JMS consumer, session and connection in a
     * finally block so that no consumer stays registered on the server after the job
     * (or the OSGi bundle) stops.
     */
    CodeGeneratorArgument codeGenArgument = (CodeGeneratorArgument) argument;
    INode node = (INode) codeGenArgument.getArgument();
    String cid = node.getUniqueName();

    log = new LogUtil(node);

    /* Maximum number of messages to read; a value <= 0 means no limit. */
    String maxMsg = ElementParameterParser.getValue(node, "__MAX_MSG__");
%>
        nbline_<%=cid %>++;
        // The session uses CLIENT_ACKNOWLEDGE: acknowledge once the row has been processed.
        message_<%=cid%>.acknowledge();
        if ((<%=maxMsg %>) > 0 && nbline_<%=cid %> >= (<%=maxMsg %>)) {
            break;
        }
    }
} finally {
    // Nested try/finally: a failure while closing the consumer or the session must not
    // prevent the connection from being closed. Any exception still propagates.
    try {
        if (consumer_<%=cid%> != null) {
            consumer_<%=cid%>.close();
        }
    } finally {
        try {
            if (session_<%=cid%> != null) {
                session_<%=cid%>.close();
            }
        } finally {
            if (connection_<%=cid%> != null) {
                connection_<%=cid%>.close();
            }
        }
    }
}

<%log.info(log.str("Retrieved records count: "), log.var("nbline"), log.str("."));%>
globalMap.put("<%=cid %>_NB_LINE", nbline_<%=cid%>);
tEsquelEMSInput_java.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Component descriptor of tEsquelEMSInput: declares the connectors, the parameters
     read by the javajet templates, the required jars and the returned variables. -->
<COMPONENT>
  <HEADER
      PLATEFORM="ALL"
      SERIAL=""
      VERSION="0.102"
      STATUS="ALPHA"
      COMPATIBILITY="ALL"
      AUTHOR="AngusYang"
      RELEASE_DATE="20170220A"
      STARTABLE="true"
      LOG4J_ENABLED="true">
    <SIGNATURE />
  </HEADER>

  <FAMILIES>
    <FAMILY>Esquel</FAMILY>
  </FAMILIES>

  <DOCUMENTATION>
    <URL />
  </DOCUMENTATION>

  <CONNECTORS>
    <CONNECTOR CTYPE="FLOW" MAX_INPUT="0" MAX_OUTPUT="1" />
    <CONNECTOR CTYPE="ITERATE" MAX_OUTPUT="1" MAX_INPUT="1" />
    <CONNECTOR CTYPE="SUBJOB_OK" MAX_INPUT="1" MAX_OUTPUT="1" />
    <CONNECTOR CTYPE="COMPONENT_OK" />
    <CONNECTOR CTYPE="COMPONENT_ERROR" />
    <CONNECTOR CTYPE="RUN_IF" />
  </CONNECTORS>

  <PARAMETERS>
    <!-- Connection settings -->
    <PARAMETER NAME="SERVER_URL" FIELD="TEXT" NUM_ROW="1">
      <DEFAULT>"tibjmsnaming://localhost:7222"</DEFAULT>
    </PARAMETER>
    <PARAMETER NAME="USER" FIELD="TEXT" NUM_ROW="5" REQUIRED="true">
      <DEFAULT>"admin"</DEFAULT>
    </PARAMETER>
    <PARAMETER NAME="PASS" FIELD="PASSWORD" NUM_ROW="5" REQUIRED="true">
      <DEFAULT>""</DEFAULT>
    </PARAMETER>

    <!-- Durable subscription start -->
    <PARAMETER NAME="ENABLE_SUB" FIELD="CHECK" SHOW_IF="MSGTYPE == 'TOPIC'" NUM_ROW="10">
      <DEFAULT>false</DEFAULT>
    </PARAMETER>
    <PARAMETER NAME="CLIENT_ID" FIELD="TEXT" NUM_ROW="15" REQUIRED="false" SHOW_IF="(MSGTYPE == 'TOPIC') AND (ENABLE_SUB == 'true')">
      <DEFAULT>""</DEFAULT>
    </PARAMETER>
    <PARAMETER NAME="SUBSCRIBER_NAME" FIELD="TEXT" NUM_ROW="20" REQUIRED="true" SHOW_IF="(MSGTYPE == 'TOPIC') AND (ENABLE_SUB == 'true')">
      <DEFAULT>""</DEFAULT>
    </PARAMETER>
    <!-- Durable subscription end -->

    <!-- Destination: the item VALUE is appended to "create" in the begin template
         (session.createTopic / session.createQueue). -->
    <PARAMETER NAME="MSGTYPE" FIELD="CLOSED_LIST" NUM_ROW="25">
      <ITEMS DEFAULT="TOPIC">
        <ITEM NAME="TOPIC" VALUE="Topic" />
        <ITEM NAME="QUEUE" VALUE="Queue" />
      </ITEMS>
    </PARAMETER>
    <PARAMETER NAME="FROM" FIELD="TEXT" SHOW="true" NUM_ROW="30">
      <DEFAULT>""</DEFAULT>
    </PARAMETER>

    <!-- Receive loop limits -->
    <PARAMETER NAME="TIMEOUT" FIELD="TEXT" SHOW="true" NUM_ROW="33">
      <DEFAULT>-1</DEFAULT>
    </PARAMETER>
    <PARAMETER NAME="MAX_MSG" FIELD="TEXT" SHOW="true" NUM_ROW="33">
      <DEFAULT>-1</DEFAULT>
    </PARAMETER>
    <PARAMETER NAME="MSG_SELECTOR" FIELD="TEXT" NUM_ROW="35" REQUIRED="true">
      <DEFAULT>""</DEFAULT>
    </PARAMETER>

    <!-- Output: either the raw JMS message object or its content -->
    <PARAMETER NAME="PROCESSING_MODE" FIELD="CLOSED_LIST" SHOW="true" NUM_ROW="40">
      <ITEMS DEFAULT="RAW">
        <ITEM NAME="RAW" VALUE="RAW" />
        <ITEM NAME="CONTENT" VALUE="CONTENT" />
      </ITEMS>
    </PARAMETER>
    <PARAMETER NAME="SCHEMA" FIELD="SCHEMA_TYPE" REQUIRED="true" NUM_ROW="40">
      <TABLE IF="PROCESSING_MODE == 'RAW'" READONLY="true">
        <COLUMN NAME="message" TYPE="id_Object" />
      </TABLE>
      <TABLE IF="PROCESSING_MODE == 'CONTENT'" READONLY="false">
        <COLUMN NAME="messageContent" TYPE="id_String" CUSTOM="true" />
      </TABLE>
    </PARAMETER>
  </PARAMETERS>

  <ADVANCED_PARAMETERS>
    <!-- Extra entries put into the JNDI environment before the lookup -->
    <PARAMETER NAME="ADVANCED_PROPERTIES" FIELD="TABLE" REQUIRED="false" NUM_ROW="10" NB_LINES="3">
      <ITEMS>
        <ITEM NAME="PROPERTY" />
        <ITEM NAME="VALUE" />
      </ITEMS>
    </PARAMETER>
  </ADVANCED_PARAMETERS>

  <CODEGENERATION>
    <IMPORTS>
      <IMPORT MODULE="jms-2.0.jar" NAME="jms-2.0" REQUIRED="true" />
      <IMPORT MODULE="tibcrypt.jar" NAME="tibcrypt" REQUIRED="true" />
      <IMPORT MODULE="tibjms.jar" NAME="tibjms" REQUIRED="true" />
    </IMPORTS>
  </CODEGENERATION>

  <RETURNS>
    <RETURN AVAILABILITY="AFTER" NAME="NB_LINE" TYPE="id_Integer" />
  </RETURNS>
</COMPONENT>
tEsquelEMSInput_messages.properties:
#
#Mon Jun 22 21:50:39 CST 2009
# Display labels for the tEsquelEMSInput component. One key per line: when the
# whole bundle sits on a single line starting with '#', it is read as a comment
# and no label is defined.
HELP=org.talend.help.tEsquelEMSInput
NAME=tEsquelEMSInput
LONG_NAME=Esquel TIBCO EMS Receive Message
SERVER.NAME=MQ Server
SERVER.ITEM.TIBCO=Tibco EMS
SERVER_URL.NAME=Server URL
USER.NAME=User Name
PASS.NAME=Password
ENABLE_SUB.NAME=Enable Durable Subscription
CLIENT_ID.NAME=ClientID
SUBSCRIBER_NAME.NAME=Subscriber Name
MSGTYPE.NAME=Message Type
MSGTYPE.ITEM.QUEUE=Queue
MSGTYPE.ITEM.TOPIC=Topic
FROM.NAME=Message From
TIMEOUT.NAME=Timeout for Next Message(in sec)
MAX_MSG.NAME=Maximum Messages
MSG_SELECTOR.NAME=Message Selector Expression
PROCESSING_MODE.NAME=Processing Mode
PROCESSING_MODE.ITEM.RAW=Raw Message
PROCESSING_MODE.ITEM.CONTENT=Message Content
SCHEMA.NAME=Schema
ADVANCED_PROPERTIES.NAME=Properties
ADVANCED_PROPERTIES.ITEM.PROPERTY=Property
ADVANCED_PROPERTIES.ITEM.VALUE=Value
NB_LINE.NAME=Number of line