zoukankan      html  css  js  c++  java
  • activemq-broker接收消息


    配置broker时,都会设置connector,connector内部会持有一个TransportServer,TransportServer相当于socketserver,transport相当于socket,connector启动(start)时会引发socketserver的启动(start),TcpTransportServer的主要职责就是accept(socketserver的主要职责就是accept)

     1 //org.apache.activemq.transport.tcp.TcpTransportServer的start
     2 protected void doStart() throws Exception {
     3   if (useQueueForAccept) {
     4     Runnable run = new Runnable() {
     5       @Override
     6       public void run() {
     7         try {
     8             //socketQueue是一个阻塞队列,不断的从socketQueue中获得socket,然后执行handleSocket方法
     9           while (!isStopped() && !isStopping()) {
    10             Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
    11             if (sock != null) {
    12               try {
    13                   //handleSocket最终会调用getAcceptListener().onAccept(configuredTransport)
    14                 handleSocket(sock);
    15               } catch (Throwable thrown) {
    16                 if (!isStopping()) {
    17                   onAcceptError(new Exception(thrown));
    18                 } else if (!isStopped()) {
    19                   LOG.warn("Unexpected error thrown during accept handling: ", thrown);
    20                   onAcceptError(new Exception(thrown));
    21                 }
    22               }
    23             }
    24           }
    25 
    26         } catch (InterruptedException e) {
    27           if (!isStopped() || !isStopping()) {
    28             LOG.info("socketQueue interrupted - stopping");
    29             onAcceptError(e);
    30           }
    31         }
    32       }
    33     };
    34     socketHandlerThread = new Thread(null, run, "ActiveMQ Transport Server Thread Handler: " + toString(), getStackSize());
    35     socketHandlerThread.setDaemon(true);
    36     socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT - 1);
    37     //启动线程,该线程循环处理accept到的socket
    38     socketHandlerThread.start();
    39   }
    40   //TcpTransportServer继承了TransportServerThreadSupport,其本身也持有持有一个线程,
    41   //该线程用来执行的run逻辑,TcpTransportServer的run逻辑就是accept。
    42   super.doStart();
    43 }

    TcpTransportServer启动后执行的run方法

     1 /**
     2 * 就是accept
     3 * pull Sockets from the ServerSocket
     4 */
     5 @Override
     6 public void run() {
     7   if (!isStopped() && !isStopping()) {
     8     final ServerSocket serverSocket = this.serverSocket;
     9     if (serverSocket == null) {
    10       onAcceptError(new IOException("Server started without a valid ServerSocket"));
    11     }
    12         //channel不为null说明是nio
    13     final ServerSocketChannel channel = serverSocket.getChannel();
    14     if (channel != null) {
    15         //nio
    16       doRunWithServerSocketChannel(channel);
    17     } else {
    18         //以bio为例
    19       doRunWithServerSocket(serverSocket);
    20     }
    21   }
    22 }

    TcpTransportServer的run方法就是accept

     1 private void doRunWithServerSocket(final ServerSocket serverSocket) {
     2   while (!isStopped()) {
     3       //accept
     4     Socket socket = serverSocket.accept();
     5     if (socket != null) {
     6       if (isStopped() || getAcceptListener() == null) {
     7         socket.close();
     8       } else {
     9           //默认useQueueForAccept为true,socketQueue中的Socket最终会由socketHandlerThread取出并执行handleSocket(socket),
    10           //handleSocket最终会调用getAcceptListener().onAccept(configuredTransport)。
    11         if (useQueueForAccept) {
    12           socketQueue.put(socket);
    13         } else {
    14             //handleSocket最终会调用getAcceptListener().onAccept(configuredTransport)
    15           handleSocket(socket);
    16         }
    17       }
    18     }
    19   }
    20 }

    在org.apache.activemq.broker.TransportConnector启动时,设置了AcceptListener(getServer().setAcceptListener)该AcceptListener的主要逻辑:

     1 brokerService.getTaskRunnerFactory().execute(new Runnable() {
     2   @Override
     3   public void run() {
     4     try {
     5       if (!brokerService.isStopping()) {
     6           //transport中含有Socket,transport代表client
     7         Connection connection = createConnection(transport);
     8         //connection.start会执行transport.start,TcpTransport继承了TransportServerThreadSupport,
     9         //start会开启该线程的执行逻辑,即TcpTransport的run方法,阻塞读取socket数据。
    10         connection.start();
    11       } else {
    12         throw new BrokerStoppedException("Broker " + brokerService + " is being stopped");
    13       }
    14     } catch (Exception e) {
    15       String remoteHost = transport.getRemoteAddress();
    16       ServiceSupport.dispose(transport);
    17       onAcceptError(e, remoteHost);
    18     }
    19   }
    20 });

    accept到的socket被包装为TcpTransport,并start该Transport,socket就可以开始读数据了

     1 //TcpTransport的run方法会调用doRun
     2 protected void doRun() throws IOException {
     3   try {
     4       //从Socket读数据
     5     Object command = readCommand();
     6     doConsume(command);//执行命令,比如如果该command是一个Message,则会将其放入对应的destination
     7   } catch (SocketTimeoutException e) {
     8   } catch (InterruptedIOException e) {
     9   }
    10 }

    小结:connector.start引发了TransportServer的start,TransportServer的start主要就是socketserver.accept(在独立线程中),accept到的socket可以放入队列异步处理,也可以同步处理(在accept所在线程中),二者的执行逻辑是一致的,都会将socket封装成transport,再将transport封装成connection后进行connection.start,这就开始阻塞读了(transport会持有一个独立线程,阻塞读就在该线程中)。

    TcpTransport的读线程会阻塞在readCommand上,当client有数据传来时,readCommand会读取数据,并将其unmarshal为activemq的命令对象,然后再doConsume该对象。doConsume最终会调用broker(TransportConnection持有broker)的相关方法(比如,如果command是Message,就会调用broker.send(producerExchange, messageSend);),这就将Message交由broker处理了(接收client发来的命令并不属于broker的职责,broker真正要做的是将处理这些命令,比如将消息路由置对应的destination,而接收client命令的任务是由TransportServer完成的)。

  • 相关阅读:
    CodeForces 710CMagic Odd Square(经典-奇数个奇数&偶数个偶数)
    CodeForces 710A King Moves(水题-越界问题)
    CodeForces 701C They Are Everywhere (滑动窗口)
    CodeForces 701B Cells Not Under Attack
    [补档]happiness
    [补档]王者之剑
    [补档]士兵占领
    [补档]搭配飞行员
    [补档]暑假集训D6总结
    [补档][Lydsy2017年4月月赛]抵制克苏恩
  • 原文地址:https://www.cnblogs.com/holoyong/p/7469793.html
Copyright © 2011-2022 走看看