zoukankan      html  css  js  c++  java
  • RabbitMQ学习之ConntectionFactory与Conntection的认知

    在发送和接收消息重要的类有:ConnectionFactory, Connection,Channel和 QueueingConsumer。

    ConntectionFactory类是方便创建与AMQP代理相关联的Connection;下面来看看ConntectionFactory是如何创建一个Contention.首先通过new ConnectionFactory()创建一个ConnectionFactory;并设置此连接工厂的主机设置为broke IP。通过ConnectionFactory的newConnection()方法 创建一个Connection; newConnection方法通过得到当前连接的地址及端口号来获得一个Address,通过createFrameHandler的方法 来得到FrameHandler;再通过new AMQConnection(this, frameHandler)来得到Connection并启动。
        代码清单1创建Connection的源码(ConnectionFactory.Java中)

    [java] view plain copy
     
     print?
    1. protected FrameHandler createFrameHandler(Address addr)  
    2.         throws IOException {  
    3.   
    4.         String hostName = addr.getHost();//获取主机IP  
    5.         int portNumber = portOrDefault(addr.getPort());//获取端口  
    6.         Socket socket = null;  
    7.         try {  
    8.             socket = factory.createSocket();  
    9.             configureSocket(socket);  
    10.             socket.connect(new InetSocketAddress(hostName, portNumber),  
    11.                     connectionTimeout);  
    12.             return createFrameHandler(socket);  
    13.         } catch (IOException ioe) {  
    14.             quietTrySocketClose(socket);  
    15.             throw ioe;  
    16.         }  
    17.     }  
    18.   
    19.     private static void quietTrySocketClose(Socket socket) {  
    20.         if (socket != null)  
    21.             try { socket.close(); } catch (Exception _) {/*ignore exceptions*/}  
    22.     }  
    23.   
    24.     protected FrameHandler createFrameHandler(Socket sock)  
    25.         throws IOException  
    26.     {  
    27.         return new SocketFrameHandler(sock);  
    28.     }  
    29.   
    30.     /** 
    31.      *  Provides a hook to insert custom configuration of the sockets 
    32.      *  used to connect to an AMQP server before they connect. 
    33.      * 
    34.      *  The default behaviour of this method is to disable Nagle's 
    35.      *  algorithm to get more consistently low latency.  However it 
    36.      *  may be overridden freely and there is no requirement to retain 
    37.      *  this behaviour. 
    38.      * 
    39.      *  @param socket The socket that is to be used for the Connection 
    40.      */  
    41.     protected void configureSocket(Socket socket) throws IOException{  
    42.         // disable Nagle's algorithm, for more consistently low latency  
    43.         socket.setTcpNoDelay(true);  
    44.     }  
    45.   
    46.     /** 
    47.      * Create a new broker connection 
    48.      * @param addrs an array of known broker addresses (hostname/port pairs) to try in order 
    49.      * @return an interface to the connection 
    50.      * @throws IOException if it encounters a problem 
    51.      */  
    52.     public Connection newConnection(Address[] addrs) throws IOException {  
    53.         return newConnection(null, addrs);  
    54.     }  
    55.   
    56.     /** 
    57.      * Create a new broker connection 
    58.      * @param executor thread execution service for consumers on the connection 
    59.      * @param addrs an array of known broker addresses (hostname/port pairs) to try in order 
    60.      * @return an interface to the connection 
    61.      * @throws IOException if it encounters a problem 
    62.      */  
    63.     public Connection newConnection(ExecutorService executor, Address[] addrs)  
    64.         throws IOException  
    65.     {  
    66.         IOException lastException = null;  
    67.         for (Address addr : addrs) {  
    68.             try {  
    69.                 FrameHandler frameHandler = createFrameHandler(addr);  
    70.                 AMQConnection conn =  
    71.                     new AMQConnection(username,  
    72.                                       password,  
    73.                                       frameHandler,  
    74.                                       executor,  
    75.                                       virtualHost,  
    76.                                       getClientProperties(),  
    77.                                       requestedFrameMax,  
    78.                                       requestedChannelMax,  
    79.                                       requestedHeartbeat,  
    80.                                       saslConfig);  
    81.                 conn.start();  
    82.                 return conn;  
    83.             } catch (IOException e) {  
    84.                 lastException = e;  
    85.             }  
    86.         }  
    87.   
    88.         throw (lastException != null) ? lastException  
    89.                                       : new IOException("failed to connect");  
    90.     }  
    91.   
    92.     /** 
    93.      * Create a new broker connection 
    94.      * @return an interface to the connection 
    95.      * @throws IOException if it encounters a problem 
    96.      */  
    97.     public Connection newConnection() throws IOException {  
    98.         return newConnection(null,  
    99.                              new Address[] {new Address(getHost(), getPort())}  
    100.                             );  
    101.     }  
    102.   
    103.     /** 
    104.      * Create a new broker connection 
    105.      * @param executor thread execution service for consumers on the connection 
    106.      * @return an interface to the connection 
    107.      * @throws IOException if it encounters a problem 
    108.      */  
    109.     public Connection newConnection(ExecutorService executor) throws IOException {  
    110.         return newConnection(executor,  
    111.                              new Address[] {new Address(getHost(), getPort())}  
    112.                             );  
    113.     }  

      代码清单2 连接启动

    [java] view plain copy
     
     print?
    1. /** 
    2.      * Start up the connection, including the MainLoop thread. 
    3.      * Sends the protocol 
    4.      * version negotiation header, and runs through 
    5.      * Connection.Start/.StartOk, Connection.Tune/.TuneOk, and then 
    6.      * calls Connection.Open and waits for the OpenOk. Sets heart-beat 
    7.      * and frame max values after tuning has taken place. 
    8.      * @throws IOException if an error is encountered 
    9.      * either before, or during, protocol negotiation; 
    10.      * sub-classes {@link ProtocolVersionMismatchException} and 
    11.      * {@link PossibleAuthenticationFailureException} will be thrown in the 
    12.      * corresponding circumstances. If an exception is thrown, connection 
    13.      * resources allocated can all be garbage collected when the connection 
    14.      * object is no longer referenced. 
    15.      */  
    16.     public void start()  
    17.         throws IOException  
    18.     {  
    19.         this._running = true;  
    20.         // Make sure that the first thing we do is to send the header,  
    21.         // which should cause any socket errors to show up for us, rather  
    22.         // than risking them pop out in the MainLoop  
    23.         AMQChannel.SimpleBlockingRpcContinuation connStartBlocker =  
    24.             new AMQChannel.SimpleBlockingRpcContinuation();  
    25.         // We enqueue an RPC continuation here without sending an RPC  
    26.         // request, since the protocol specifies that after sending  
    27.         // the version negotiation header, the client (connection  
    28.         // initiator) is to wait for a connection.start method to  
    29.         // arrive.  
    30.         _channel0.enqueueRpc(connStartBlocker);  
    31.         try {  
    32.             // The following two lines are akin to AMQChannel's  
    33.             // transmit() method for this pseudo-RPC.  
    34.             _frameHandler.setTimeout(HANDSHAKE_TIMEOUT);  
    35.             _frameHandler.sendHeader();  
    36.         } catch (IOException ioe) {  
    37.             _frameHandler.close();  
    38.             throw ioe;  
    39.         }  
    40.   
    41.         // start the main loop going  
    42.         new MainLoop("AMQP Connection " + getHostAddress() + ":" + getPort()).start();  
    43.         // after this point clear-up of MainLoop is triggered by closing the frameHandler.  
    44.   
    45.         AMQP.Connection.Start connStart = null;  
    46.         AMQP.Connection.Tune connTune = null;  
    47.         try {  
    48.             connStart =  
    49.                 (AMQP.Connection.Start) connStartBlocker.getReply().getMethod();  
    50.   
    51.             _serverProperties = Collections.unmodifiableMap(connStart.getServerProperties());  
    52.   
    53.             Version serverVersion =  
    54.                 new Version(connStart.getVersionMajor(),  
    55.                             connStart.getVersionMinor());  
    56.   
    57.             if (!Version.checkVersion(clientVersion, serverVersion)) {  
    58.                 throw new ProtocolVersionMismatchException(clientVersion,  
    59.                                                            serverVersion);  
    60.             }  
    61.   
    62.             String[] mechanisms = connStart.getMechanisms().toString().split(" ");  
    63.             SaslMechanism sm = this.saslConfig.getSaslMechanism(mechanisms);  
    64.             if (sm == null) {  
    65.                 throw new IOException("No compatible authentication mechanism found - " +  
    66.                         "server offered [" + connStart.getMechanisms() + "]");  
    67.             }  
    68.   
    69.             LongString challenge = null;  
    70.             LongString response = sm.handleChallenge(null, this.username, this.password);  
    71.   
    72.             do {  
    73.                 Method method = (challenge == null)  
    74.                     ? new AMQP.Connection.StartOk.Builder()  
    75.                                     .clientProperties(_clientProperties)  
    76.                                     .mechanism(sm.getName())  
    77.                                     .response(response)  
    78.                           .build()  
    79.                     : new AMQP.Connection.SecureOk.Builder().response(response).build();  
    80.   
    81.                 try {  
    82.                     Method serverResponse = _channel0.rpc(method).getMethod();  
    83.                     if (serverResponse instanceof AMQP.Connection.Tune) {  
    84.                         connTune = (AMQP.Connection.Tune) serverResponse;  
    85.                     } else {  
    86.                         challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();  
    87.                         response = sm.handleChallenge(challenge, this.username, this.password);  
    88.                     }  
    89.                 } catch (ShutdownSignalException e) {  
    90.                     throw new PossibleAuthenticationFailureException(e);  
    91.                 }  
    92.             } while (connTune == null);  
    93.         } catch (ShutdownSignalException sse) {  
    94.             _frameHandler.close();  
    95.             throw AMQChannel.wrap(sse);  
    96.         } catch(IOException ioe) {  
    97.             _frameHandler.close();  
    98.             throw ioe;  
    99.         }  
    100.   
    101.         try {  
    102.             int channelMax =  
    103.                 negotiatedMaxValue(this.requestedChannelMax,  
    104.                                    connTune.getChannelMax());  
    105.             _channelManager = new ChannelManager(this._workService, channelMax);  
    106.   
    107.             int frameMax =  
    108.                 negotiatedMaxValue(this.requestedFrameMax,  
    109.                                    connTune.getFrameMax());  
    110.             this._frameMax = frameMax;  
    111.   
    112.             int heartbeat =  
    113.                 negotiatedMaxValue(this.requestedHeartbeat,  
    114.                                    connTune.getHeartbeat());  
    115.   
    116.             setHeartbeat(heartbeat);  
    117.   
    118.             _channel0.transmit(new AMQP.Connection.TuneOk.Builder()  
    119.                                 .channelMax(channelMax)  
    120.                                 .frameMax(frameMax)  
    121.                                 .heartbeat(heartbeat)  
    122.                               .build());  
    123.             _channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder()  
    124.                                       .virtualHost(_virtualHost)  
    125.                                     .build());  
    126.         } catch (IOException ioe) {  
    127.             _heartbeatSender.shutdown();  
    128.             _frameHandler.close();  
    129.             throw ioe;  
    130.         } catch (ShutdownSignalException sse) {  
    131.             _heartbeatSender.shutdown();  
    132.             _frameHandler.close();  
    133.             throw AMQChannel.wrap(sse);  
    134.         }  
    135.   
    136.         // We can now respond to errors having finished tailoring the connection  
    137.         this._inConnectionNegotiation = false;  
    138.   
    139.         return;  
    140.     }  


    转载:http://wubin850219.iteye.com/blog/1007984

  • 相关阅读:
    完整java开发中JDBC连接数据库代码和步骤
    2007最后一贴
    ajax数据加载经验分享
    vs2008中文版提供下载(包含中文msdn)
    修改服务器控件的ID和Name
    你使用控件吗?会用吗?
    自定义控件集
    asp.net控件开发基础示例代码打包
    javascript好文章收藏
    wpf学习笔记简单绑定
  • 原文地址:https://www.cnblogs.com/telwanggs/p/7124719.html
Copyright © 2011-2022 走看看