zoukankan      html  css  js  c++  java
  • 聊聊jstack的工作原理

    实现一个jstack

    在聊Jstack得工作原理前呢,不如让我们先写一个简单的jstack玩玩。不用怕,很简单的,就几行代码的事,看:

    public class MyJstack {
    
        public static void main(String[] args)throws Exception {
            VirtualMachine virtualMachine = VirtualMachine.attach("6361");
            HotSpotVirtualMachine hotSpotVirtualMachine = (HotSpotVirtualMachine)virtualMachine;
            InputStream inputStream = hotSpotVirtualMachine.remoteDataDump(new String[]{});
    
            byte[] buff = new byte[256];
            int len;
            do {
                len = inputStream.read(buff);
                if (len > 0) {
                    String respone = new String(buff, 0, len, "UTF-8");
                    System.out.print(respone);
                }
            } while(len > 0);
    
            inputStream.close();
            virtualMachine.detach();
        }
    }

    很简单吧,贴到你的开发环境里,运行就好了,别忘了把6361这个进程号换成你自己的Java进程号哦。

    实现原理

    jstack有两种实现方式,一种是基于attach api,其实现可以在tools.jar里找到;另一种是基于SA的实现,它被放在了sa-jdi.jar里。如果你通过idea搜索Jstack类,你会看到tools.jar和sa-jdi.jar各有一个Jstack类。

    本文呢,就通过分析attch api的源码,来了解jstack的工作原理。

    jstack本地源码实现

    我们来看一下HotSpotVirtualMachine的remoteDataDump方法:

    public InputStream remoteDataDump(Object... var1) throws IOException {
            return this.executeCommand("threaddump", var1);
    }

    他是在执行一个叫threaddump的命令。沿着这个executeCommand方法继续往里追,会发现他是调用了如下方法:

    InputStream execute(String var1, Object... var2) throws AgentLoadException, IOException {
            assert var2.length <= 3;
    
            String var3;
            synchronized(this) {
                if (this.path == null) {
                    throw new IOException("Detached from target VM");
                }
    
                var3 = this.path;
            }
    
            int var4 = socket();
    
            try {
                connect(var4, var3);
            } catch (IOException var9) {
                close(var4);
                throw var9;
            }
    
            IOException var5 = null;
    
            try {
                this.writeString(var4, "1");
                this.writeString(var4, var1);

    var1参数就是我们的threaddump指令,不难看出,这个方法是建立了一个socket连接,然后将threaddump指令发送给另一端,即我们要检查的jvm进程。

    注意:限于篇幅我并没有贴整个方法代码。execute是HotSpotVirtualMachine的抽象方法,不同平台的jdk有不同的execute方法的实现,我这里的代码是mac下的execute实现,位于BsdVirtualMachine类中。

    通过jtack本地源代码,我们大致可以粗略的认为:jstack就是通过与指定的jvm进程建立socket连接,然后发送指令,最后将jvm进程返回的内容打印出来。

    JVM的源码实现

    了解了jstack的本地源码,我们在看看jvm进程是如何处理的。

    当我们使用Java命令启动jvm进程时,Java命令会加载虚拟机共享库,然后执行共享库里的JNI_CreateJavaVM方法完成虚拟机的创建,在JNI_CreateJavaVM方法里会调用如下代码,完成具体的一个创建过程:

    result = Threads::create_vm((JavaVMInitArgs*) args, &can_try_again);

    如果你有心,或许会留意到,在你启动一个jvm进程时,即便你什么线程也没创建,你用jstack查看还是有很多的线程,如:Signal Dispatcher,VM Thread,Attach Listener等等。当过阅读本文,你会了解到这三个线程的作用。

    01 VM Thread线程

    Threads::create_vm这个方法很长,接下来咱们跳出一些重要的段落,来分析分析。

    // Create the VMThread
      { TraceTime timer("Start VMThread", TraceStartupTime);
        VMThread::create();//创建Thread对象
        Thread* vmthread = VMThread::vm_thread();
    
        if (!os::create_thread(vmthread, os::vm_thread))//调用操作系统api创建线程
          vm_exit_during_initialization("Cannot create VM thread. Out of system resources.");
    
        // Wait for the VM thread to become ready, and VMThread::run to initialize
        // Monitors can have spurious returns, must always check another state flag
        {
          MutexLocker ml(Notify_lock);
          os::start_thread(vmthread);//启动线程
          while (vmthread->active_handles() == NULL) {
            Notify_lock->wait();
          }
        }
      }

    通过注释,你也知道,这一段代码是从来创建VM Thread线程的。VMThread::create()完成了对现成的命名工作,代码如下:

    void VMThread::create() {
      assert(vm_thread() == NULL, "we can only allocate one VMThread");
      _vm_thread = new VMThread();
    
      // Create VM operation queue
      _vm_queue = new VMOperationQueue();
      guarantee(_vm_queue != NULL, "just checking");
    
      _terminate_lock = new Monitor(Mutex::safepoint, "VMThread::_terminate_lock", true);
    
      if (UsePerfData) {
        // jvmstat performance counters
        Thread* THREAD = Thread::current();
        _perf_accumulated_vm_operation_time =
                     PerfDataManager::create_counter(SUN_THREADS, "vmOperationTime",
                                                     PerfData::U_Ticks, CHECK);
      }
    }
    
    
    VMThread::VMThread() : NamedThread() {
      set_name("VM Thread");
    }

    通过new VMThread()创建线程对象,在VMThread的构造方法里将线程命名成VM Thread,这就是我们jstack看到的VM Thread线程,同时还为这个线程创建了一个叫VMOperationQueue的队列。

    至于VM Thread线程的作用,我们留到最后再说。

    02 Signal Dispatcher线程

    继续沿着 Threads::create_vm方法往下看,我们会看到如下代码:

    // Signal Dispatcher needs to be started before VMInit event is posted
      os::signal_init();

    这一句代码实现了Signal Dispatcher线程的创建,进入到signal_init()方法看看:

    void os::signal_init() {
      if (!ReduceSignalUsage) {
        // Setup JavaThread for processing signals
        EXCEPTION_MARK;
        Klass* k = SystemDictionary::resolve_or_fail(vmSymbols::java_lang_Thread(), true, CHECK);
        instanceKlassHandle klass (THREAD, k);
        instanceHandle thread_oop = klass->allocate_instance_handle(CHECK);
    
        const char thread_name[] = "Signal Dispatcher";
        Handle string = java_lang_String::create_from_str(thread_name, CHECK);
    
        // Initialize thread_oop to put it into the system threadGroup
        Handle thread_group (THREAD, Universe::system_thread_group());
        JavaValue result(T_VOID);
        JavaCalls::call_special(&result, thread_oop,
                               klass,
                               vmSymbols::object_initializer_name(),
                               vmSymbols::threadgroup_string_void_signature(),
                               thread_group,
                               string,
                               CHECK);
    
        KlassHandle group(THREAD, SystemDictionary::ThreadGroup_klass());
        JavaCalls::call_special(&result,
                                thread_group,
                                group,
                                vmSymbols::add_method_name(),
                                vmSymbols::thread_void_signature(),
                                thread_oop,         // ARG 1
                                CHECK);
    
        os::signal_init_pd();
    
        { MutexLocker mu(Threads_lock);
          JavaThread* signal_thread = new JavaThread(&signal_thread_entry);
    
          // At this point it may be possible that no osthread was created for the
          // JavaThread due to lack of memory. We would have to throw an exception
          // in that case. However, since this must work and we do not allow
          // exceptions anyway, check and abort if this fails.
          if (signal_thread == NULL || signal_thread->osthread() == NULL) {
            vm_exit_during_initialization("java.lang.OutOfMemoryError",
                                          "unable to create new native thread");
          }
    
          java_lang_Thread::set_thread(thread_oop(), signal_thread);
          java_lang_Thread::set_priority(thread_oop(), NearMaxPriority);
          java_lang_Thread::set_daemon(thread_oop());
    
          signal_thread->set_threadObj(thread_oop());
          Threads::add(signal_thread);
          Thread::start(signal_thread);
        }
        // Handle ^BREAK
        os::signal(SIGBREAK, os::user_handler());
      }
    }

    在这个方法里,我们可以看到要创建的线程名字:Signal Dispatcher,以及线程启动后调用的方法signal_thread_entry。(方法较长,看重点就好,没必要每句话都扣清楚)。

    有了对上边代码的分析,我们只需要看看signal_thread_entry方法,就知道Signal Dispatcher线程的作用了。

    static void signal_thread_entry(JavaThread* thread, TRAPS) {
      os::set_priority(thread, NearMaxPriority);
      while (true) {
        int sig;
        {
          // FIXME : Currently we have not decieded what should be the status
          //         for this java thread blocked here. Once we decide about
          //         that we should fix this.
          sig = os::signal_wait();//等待获取信号
        }
        if (sig == os::sigexitnum_pd()) {
           // Terminate the signal thread
           return;
        }
    
        switch (sig) {
          case SIGBREAK: {
            // Check if the signal is a trigger to start the Attach Listener - in that
            // case don't print stack traces.
            if (!DisableAttachMechanism && AttachListener::is_init_trigger()) {
              continue;
            }
            // Print stack traces
            // Any SIGBREAK operations added here should make sure to flush
            // the output stream (e.g. tty->flush()) after output.  See 4803766.
            // Each module also prints an extra carriage return after its output.
            VM_PrintThreads op;
            VMThread::execute(&op);
            VM_PrintJNI jni_op;
            VMThread::execute(&jni_op);
            VM_FindDeadlocks op1(tty);
            VMThread::execute(&op1);
            Universe::print_heap_at_SIGBREAK();
            if (PrintClassHistogram) {
              VM_GC_HeapInspection op1(gclog_or_tty, true /* force full GC before heap inspection */);
              VMThread::execute(&op1);
            }
            if (JvmtiExport::should_post_data_dump()) {
              JvmtiExport::post_data_dump();
            }
            break;

    这个方法里调用os::signal_wait()获取传给该jvm进程的信号,然后对信号进行处理。

    说下case SIGBREAK里的处理逻辑,当接收到SIGBREAK信号时,会先判断是否禁止Attach机制,如果没有禁止,会调用AttachListener::is_init_trigger()方法触发Attach Listener线程的初始化.如果attach机制被禁用,则会创建VM_PrintThreads、VM_PrintJNI、VM_FindDeadlocks等代表某一个操作的对象,通过VMThread::execute()方法扔到VM Thread线程的VMOperationQueue队列。

    03 Attach Listener线程

    继续沿着 Threads::create_vm方法往下看,在紧挨着启动Signal Dispatcher线程的下边,就是启动Attach Listener线程的语句:

    // Start Attach Listener if +StartAttachListener or it can't be started lazily
      if (!DisableAttachMechanism) {
        AttachListener::vm_start();
        if (StartAttachListener || AttachListener::init_at_startup()) {
          AttachListener::init();
        }
      }

    重点就在AttachListener::init()方法里:

    // Starts the Attach Listener thread
    void AttachListener::init() {
      EXCEPTION_MARK;
      Klass* k = SystemDictionary::resolve_or_fail(vmSymbols::java_lang_Thread(), true, CHECK);
      instanceKlassHandle klass (THREAD, k);
      instanceHandle thread_oop = klass->allocate_instance_handle(CHECK);
    
      const char thread_name[] = "Attach Listener";
      Handle string = java_lang_String::create_from_str(thread_name, CHECK);
    
      // Initialize thread_oop to put it into the system threadGroup
      Handle thread_group (THREAD, Universe::system_thread_group());
      JavaValue result(T_VOID);
      JavaCalls::call_special(&result, thread_oop,
                           klass,
                           vmSymbols::object_initializer_name(),
                           vmSymbols::threadgroup_string_void_signature(),
                           thread_group,
                           string,
                           THREAD);
    
      if (HAS_PENDING_EXCEPTION) {
        tty->print_cr("Exception in VM (AttachListener::init) : ");
        java_lang_Throwable::print(PENDING_EXCEPTION, tty);
        tty->cr();
    
        CLEAR_PENDING_EXCEPTION;
    
        return;
      }
    
      KlassHandle group(THREAD, SystemDictionary::ThreadGroup_klass());
      JavaCalls::call_special(&result,
                            thread_group,
                            group,
                            vmSymbols::add_method_name(),
                            vmSymbols::thread_void_signature(),
                            thread_oop,             // ARG 1
                            THREAD);
    
      if (HAS_PENDING_EXCEPTION) {
        tty->print_cr("Exception in VM (AttachListener::init) : ");
        java_lang_Throwable::print(PENDING_EXCEPTION, tty);
        tty->cr();
    
        CLEAR_PENDING_EXCEPTION;
    
        return;
      }
    
      { MutexLocker mu(Threads_lock);
        JavaThread* listener_thread = new JavaThread(&attach_listener_thread_entry);
    
        // Check that thread and osthread were created
        if (listener_thread == NULL || listener_thread->osthread() == NULL) {
          vm_exit_during_initialization("java.lang.OutOfMemoryError",
                                        "unable to create new native thread");
        }
    
        java_lang_Thread::set_thread(thread_oop(), listener_thread);
        java_lang_Thread::set_daemon(thread_oop());
    
        listener_thread->set_threadObj(thread_oop());
        Threads::add(listener_thread);
        Thread::start(listener_thread);
      }
    }
    我们可以通过代码看出其创建了一个叫Attach Listener的线程,线程执行的逻辑封装在了attach_listener_thread_entry方法里。

    Attach Listener线程的作用,我们看看attach_listener_thread_entry方法便知:

    static void attach_listener_thread_entry(JavaThread* thread, TRAPS) {
      os::set_priority(thread, NearMaxPriority);
    
      thread->record_stack_base_and_size();
    
      if (AttachListener::pd_init() != 0) {
        return;
      }
      AttachListener::set_initialized();
    
      for (;;) {
        AttachOperation* op = AttachListener::dequeue();//从队列里获取操作对象
        if (op == NULL) {
          return;   // dequeue failed or shutdown
        }
    
        ResourceMark rm;
        bufferedStream st;
        jint res = JNI_OK;
    
        // handle special detachall operation
        if (strcmp(op->name(), AttachOperation::detachall_operation_name()) == 0) {
          AttachListener::detachall();
        } else {
          // find the function to dispatch too
          AttachOperationFunctionInfo* info = NULL;
          for (int i=0; funcs[i].name != NULL; i++) {
            const char* name = funcs[i].name;
            assert(strlen(name) <= AttachOperation::name_length_max, "operation <= name_length_max");
            if (strcmp(op->name(), name) == 0) {
              info = &(funcs[i]);
              break;
            }
          }
    
          // check for platform dependent attach operation
          if (info == NULL) {
            info = AttachListener::pd_find_operation(op->name());
          }
    
          if (info != NULL) {
            // dispatch to the function that implements this operation
            res = (info->func)(op, &st);//执行操作对象
          } else {
            st.print("Operation %s not recognized!", op->name());
            res = JNI_ERR;
          }
        }
    
        // operation complete - send result and output to client
        op->complete(res, &st);
      }
    }

    方法很长,我把重点挑出来分析。

    首先我们看看调用AttachListener::pd_init()完了什么:

    int AttachListener::pd_init() {
      JavaThread* thread = JavaThread::current();
      ThreadBlockInVM tbivm(thread);
    
      thread->set_suspend_equivalent();
      // cleared by handle_special_suspend_equivalent_condition() or
      // java_suspend_self() via check_and_wait_while_suspended()
    
      int ret_code = LinuxAttachListener::init();
    
      // were we externally suspended while we were waiting?
      thread->check_and_wait_while_suspended();
    
      return ret_code;
    }
    
    int LinuxAttachListener::init() {
      char path[UNIX_PATH_MAX];          // socket file
      char initial_path[UNIX_PATH_MAX];  // socket file during setup
      int listener;                      // listener socket (file descriptor)
    
      // register function to cleanup
      ::atexit(listener_cleanup);
    
      int n = snprintf(path, UNIX_PATH_MAX, "%s/.java_pid%d",
                       os::get_temp_directory(), os::current_process_id());
      if (n < (int)UNIX_PATH_MAX) {
        n = snprintf(initial_path, UNIX_PATH_MAX, "%s.tmp", path);
      }
      if (n >= (int)UNIX_PATH_MAX) {
        return -1;
      }
    
      // create the listener socket
      listener = ::socket(PF_UNIX, SOCK_STREAM, 0);//创建套接字
      if (listener == -1) {
        return -1;
      }
    
      // bind socket
      struct sockaddr_un addr;
      addr.sun_family = AF_UNIX;
      strcpy(addr.sun_path, initial_path);
      ::unlink(initial_path);
      int res = ::bind(listener, (struct sockaddr*)&addr, sizeof(addr));//绑定地址
      if (res == -1) {
        ::close(listener);
        return -1;
      }
    
      // put in listen mode, set permissions, and rename into place
      res = ::listen(listener, 5);//发起监听
      if (res == 0) {
          RESTARTABLE(::chmod(initial_path, S_IREAD|S_IWRITE), res);
          if (res == 0) {
              res = ::rename(initial_path, path);
          }
      }
      if (res == -1) {
        ::close(listener);
        ::unlink(initial_path);
        return -1;
      }
      set_path(path);
      set_listener(listener);
    
      return 0;
    }

    不难发现,AttachListener::pd_init()方法又调用了LinuxAttachListener::init()方法,完成了对套接字的创建和监听。这与jstack本地代码建立socket连接发送命令,不谋而合。

    再就是有一个for死循环,不停地调用AttachOperation* op = AttachListener::dequeue();获取操作对象。如果进入到AttachListener::dequeue()方法看一看,其实就是在读上边监听的套接字,我这里就不贴源码了。

    在这个死循环里,我们重点看看如下代码:

     // find the function to dispatch too
          AttachOperationFunctionInfo* info = NULL;
          for (int i=0; funcs[i].name != NULL; i++) {
            const char* name = funcs[i].name;
            assert(strlen(name) <= AttachOperation::name_length_max, "operation <= name_length_max");
            if (strcmp(op->name(), name) == 0) {
              info = &(funcs[i]);
              break;
            }
          }
    
          // check for platform dependent attach operation
          if (info == NULL) {
            info = AttachListener::pd_find_operation(op->name());
          }
    
          if (info != NULL) {
            // dispatch to the function that implements this operation
            res = (info->func)(op, &st);//调动方法
          } else {
            st.print("Operation %s not recognized!", op->name());
            res = JNI_ERR;
          }
        }
    
        // operation complete - send result and output to client
        op->complete(res, &st);

    这个for循环会遍历funcs数组,然后根据从队列里拿到的AttachOperation对象的name来找到一个匹配的AttachOperationFunctionInfo对象,然后调用其func方法。

    看到这里你或许很多疑惑,当然看看funcs数组里的东西,就开朗了:

    static AttachOperationFunctionInfo funcs[] = {
      { "agentProperties",  get_agent_properties },
      { "datadump",         data_dump },
      { "dumpheap",         dump_heap },
      { "load",             JvmtiExport::load_agent_library },
      { "properties",       get_system_properties },
      { "threaddump",       thread_dump },
      { "inspectheap",      heap_inspection },
      { "setflag",          set_flag },
      { "printflag",        print_flag },
      { "jcmd",             jcmd },
      { NULL,               NULL }
    };

    有没有看到上文中我们提到的threaddump命令。jstack通过与jvm进程建立socket连接,然后向jvm进程发送threaddump指令。上文说道调用AttachOperationFunctionInfo对象的func方法处理指令,其实就是调用了thread_dump方法,针对threaddump命令来说。

    坚持,马上就要说完了。来看看thread_dump方法干了些啥吧:

    // Implementation of "threaddump" command - essentially a remote ctrl-break
    // See also: ThreadDumpDCmd class
    //
    static jint thread_dump(AttachOperation* op, outputStream* out) {
      bool print_concurrent_locks = false;
      if (op->arg(0) != NULL && strcmp(op->arg(0), "-l") == 0) {
        print_concurrent_locks = true;
      }
    
      // thread stacks
      VM_PrintThreads op1(out, print_concurrent_locks);
      VMThread::execute(&op1);
    
      // JNI global handles
      VM_PrintJNI op2(out);
      VMThread::execute(&op2);
    
      // Deadlock detection
      VM_FindDeadlocks op3(out);
      VMThread::execute(&op3);
    
      return JNI_OK;
    }

    很简单,创建了VM_PrintThreads、VM_PrintJNI、VM_FindDeadlocks三个对象,扔给了VM Thread线程的队列。

    说到这里,VM Thread线程的作用,应该真相大白了,就是读取队列,然后执行相应的操作。有兴趣你可以继续追进去看看源代码,我这里就不追下去了。

    总结

    看了这么多代码,确实很头疼,总结下吧。

    jstack是通过与jvm进程建立socket连接,然后发送指令来实现相关操作。

    jvm的Attach Listener线程监听套接字,读取jstack发来的指令,然后将相关的操作扔给VM Thread线程来执行,最后返回给jstack。

    在jvm启动的时候,如果没有指定StartAttachListener,Attach Listener线程是不会启动的,在Signal Dispatcher线程收到SIGBREAK信号时,会调用 AttachListener::is_init_trigger()通过调用用AttachListener::init()启动了Attach Listener 线程。



    加入知识星球,可以有更多的交流,更多的学习和更快的提高。

  • 相关阅读:
    递归函数的写法笔记
    Spring项目中执行Java脚本
    关于秒杀的系统架构优化思路
    分布式搜索引擎Elasticsearch性能优化与配置
    分布式搜索引擎ElasticSearch+Kibana (Marvel插件安装详解)
    分布式搜索引擎Elasticsearch的查询与过滤
    Linux 下编译升级 Python
    搭建通过 ssh 访问的 Git 服务器
    分布式搜索引擎Elasticsearch的简单使用
    PHP 源码学习之线程安全
  • 原文地址:https://www.cnblogs.com/qingquanzi/p/8974850.html
Copyright © 2011-2022 走看看