  • The base class for thread services in RocketMQ

    RocketMQ has many thread-based services, and all of them extend the abstract class ServiceThread.

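    This abstract class can be lifted out on its own and reused in other projects; the only thing that needs changing is the logging (here it has been replaced with System.out.println):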

    /**
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     *  Unless required by applicable law or agreed to in writing, software
     *  distributed under the License is distributed on an "AS IS" BASIS,
     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     *  See the License for the specific language governing permissions and
     *  limitations under the License.
     */
    
    
    /**
     * @author shijia.wxr
     */
    public abstract class ServiceThread implements Runnable {
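        // An abstract class does not have to implement the interface's methods (here, run()),
        // but any concrete class that extends this abstract class must implement them.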
        private static final long JoinTime = 90 * 1000;
    
        protected final Thread thread;
    
        protected volatile boolean hasNotified = false;
    
        protected volatile boolean stoped = false;
    
    
        public ServiceThread() {
            this.thread = new Thread(this, this.getServiceName());
        }
    
    
        public abstract String getServiceName();
    
    
        public void start() {
            this.thread.start();
        }
    
    
        public void shutdown() {
            this.shutdown(false);
        }
    
        public void shutdown(final boolean interrupt) {
            this.stoped = true;
            System.out.println("shutdown thread " + this.getServiceName() + " interrupt " + interrupt);
            synchronized (this) {
                if (!this.hasNotified) {
                    this.hasNotified = true;
                    this.notify();
                }
            }
    
            try {
                if (interrupt) {
                    this.thread.interrupt();
                }
    
                long beginTime = System.currentTimeMillis();
                if (!this.thread.isDaemon()) {
                    this.thread.join(this.getJointime());
                }
                long elapsedTime = System.currentTimeMillis() - beginTime;
                System.out.println("join thread " + this.getServiceName() + " elapsed time(ms) " + elapsedTime + " "
                        + this.getJointime());
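            } catch (InterruptedException e) {
                // Restore the interrupt flag so the caller can still observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }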
        }
    
        public long getJointime() {
            return JoinTime;
        }
    
        public void stop() {
            this.stop(false);
        }
    
        public void stop(final boolean interrupt) {
            this.stoped = true;
            System.out.println("stop thread " + this.getServiceName() + " interrupt " + interrupt);
            synchronized (this) {
                if (!this.hasNotified) {
                    this.hasNotified = true;
                    this.notify();
                }
            }
    
            if (interrupt) {
                this.thread.interrupt();
            }
        }
    
        public void makeStop() {
            this.stoped = true;
            System.out.println("makestop thread " + this.getServiceName());
        }
    
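        // Wakes the service thread if it is blocked in waitForRunning(). Repeated calls
        // before the next wait are coalesced into a single notification via hasNotified.
        public void wakeup() {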
            synchronized (this) {
                if (!this.hasNotified) {
                    this.hasNotified = true;
                    this.notify();
                }
            }
        }
    
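        // Blocks for up to `interval` milliseconds, or returns immediately if wakeup()
        // was already called since the last wait. hasNotified is reset either way.
        protected void waitForRunning(long interval) {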
            synchronized (this) {
                if (this.hasNotified) {
                    this.hasNotified = false;
                    this.onWaitEnd();
                    return;
                }
    
                try {
                    this.wait(interval);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    this.hasNotified = false;
                    this.onWaitEnd();
                }
            }
        }
    
        protected void onWaitEnd() {
        }
    
        public boolean isStoped() {
            return stoped;
        }
    }

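    Usage:

    To use it, extend this class and implement two methods: run() from the Runnable interface, and getServiceName() from ServiceThread.

    getServiceName() supplies the thread's name: the constructor above passes its result to new Thread(this, this.getServiceName()).

    After constructing the object, call the start() method inherited from the abstract parent ServiceThread; it calls this.thread.start(), which launches the thread.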
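
    The steps above can be sketched roughly as follows. This is a minimal illustration, not code from RocketMQ: HeartbeatService, its beats counter, and the 50 ms interval are hypothetical, and the nested ServiceThread is a trimmed-down copy of the listing above (shutdown() simplified, printing removed) included only so the example compiles on its own.

    ```java
    import java.util.concurrent.atomic.AtomicInteger;

    public class ServiceThreadDemo {

        /** Trimmed-down copy of the ServiceThread listing, kept only so this sketch compiles alone. */
        abstract static class ServiceThread implements Runnable {
            protected final Thread thread;
            protected volatile boolean hasNotified = false;
            protected volatile boolean stoped = false;

            ServiceThread() {
                // getServiceName() runs here, before subclass fields are initialized.
                this.thread = new Thread(this, this.getServiceName());
            }

            public abstract String getServiceName();

            public void start() {
                this.thread.start();
            }

            public void wakeup() {
                synchronized (this) {
                    if (!this.hasNotified) {
                        this.hasNotified = true;
                        this.notify();
                    }
                }
            }

            // Simplified: set the stop flag, wake the loop, then wait for the thread to exit.
            public void shutdown() throws InterruptedException {
                this.stoped = true;
                this.wakeup();
                this.thread.join(90 * 1000);
            }

            protected void waitForRunning(long interval) {
                synchronized (this) {
                    if (this.hasNotified) {
                        this.hasNotified = false;
                        return;
                    }
                    try {
                        this.wait(interval);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        this.hasNotified = false;
                    }
                }
            }

            public boolean isStoped() {
                return stoped;
            }
        }

        /** Hypothetical service: counts how many times its loop body has run. */
        static class HeartbeatService extends ServiceThread {
            final AtomicInteger beats = new AtomicInteger();

            @Override
            public String getServiceName() {
                return "HeartbeatService"; // becomes the thread name
            }

            @Override
            public void run() {
                // Loop until a shutdown is requested; sleep between rounds,
                // but return early if wakeup() is called.
                while (!this.isStoped()) {
                    beats.incrementAndGet();
                    this.waitForRunning(50);
                }
            }
        }

        public static void main(String[] args) throws InterruptedException {
            HeartbeatService service = new HeartbeatService();
            service.start();      // launches the underlying thread
            Thread.sleep(200);    // let the loop run a few rounds
            service.shutdown();   // sets stoped, wakes the loop, joins the thread
            System.out.println("thread name: " + service.thread.getName());
            System.out.println("still alive: " + service.thread.isAlive());
        }
    }
    ```

    One thing to watch for: because the ServiceThread constructor calls getServiceName() before the subclass's own fields have been initialized, the method should return a constant (or the class name) rather than a value stored in a subclass field.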

    To see what a complete usage flow looks like inside RocketMQ, you can also read this article:

    http://www.cnblogs.com/guazi/p/6850988.html

  • Original article: https://www.cnblogs.com/guazi/p/6851053.html