zoukankan      html  css  js  c++  java
  • java ee wildfly spring 在线程池的线程中注入

    /**
     * Test that reproduces the problem discussed in this post: a row saved and
     * flushed from the test method is returned by {@code findAll()} in the test
     * itself, yet (per the author's notes) the same query issued from a task
     * running on the thread pool returns an empty list.
     *
     * NOTE(review): presumably this is because the transactional test base class
     * keeps the insert inside an uncommitted test transaction that the pool
     * thread does not share - confirm against the Spring TestContext docs.
     */
    public class RtmpSpyingTests extends AbstractTransactionalJUnit4SpringContextTests {

        /** Pool on which the spying task is executed. */
        @Autowired
        ThreadPoolTaskExecutor rtmpSpyingTaskExecutor;

        /** Used to look up the task bean with constructor arguments. */
        @Autowired
        ApplicationContext ctx;

        @Autowired
        RtmpSourceRepository rtmpRep;

        @Test
        public void test() {
            // Persist one sample row and flush it from the test thread.
            RtmpSource sample = new RtmpSource("test");
            rtmpRep.save(sample);
            rtmpRep.flush();

            // Author's observation: this list contains the sample saved above.
            List<RtmpSource> visibleInTestThread = rtmpRep.findAll();

            // Look up the task bean with its two constructor arguments and hand
            // it to the pool in one step.
            rtmpSpyingTaskExecutor.execute(ctx.getBean(RtmpSpyingTask.class, "arg1", "arg2"));
        }
    }
    
    /**
     * Runnable executed on the task executor by the test above. It queries the
     * repository from the worker thread; the result is not used further.
     */
    public class RtmpSpyingTask implements Runnable {

        @Autowired
        RtmpSourceRepository rtmpRep;

        String nameIdCh;
        String rtmpUrl;

        /**
         * @param nameIdCh stored as-is in {@link #nameIdCh}
         * @param rtmpUrl  stored as-is in {@link #rtmpUrl}
         */
        public RtmpSpyingTask(String nameIdCh, String rtmpUrl) {
            this.rtmpUrl = rtmpUrl;
            this.nameIdCh = nameIdCh;
        }

        @Override
        public void run() {
            // Author's observation: the sample row saved by the test was expected
            // here, but the list comes back empty.
            List<RtmpSource> seenFromWorker = rtmpRep.findAll();
        }
    }
    
    应该改用带 @Transactional 注解的 Service 来读写数据：
    /**
     * Service wrapping {@code RtmpSourceRepository} access in explicitly
     * declared transactions: a read-only one for listing and a
     * {@code REQUIRES_NEW} one for inserting.
     */
    @Service
    public class AsyncTransactionService {

        @Autowired
        RtmpSourceRepository rtmpRep;

        /**
         * Lists every stored source inside a read-only transaction.
         *
         * @return whatever {@code rtmpRep.findAll()} returns
         */
        @Transactional(readOnly = true)
        public List<RtmpSource> getRtmpSources() {
            List<RtmpSource> allSources = rtmpRep.findAll();
            return allSources;
        }

        /**
         * Saves the given source using {@code Propagation.REQUIRES_NEW}.
         *
         * @param rtmpSource entity passed to {@code rtmpRep.save}
         */
        @Transactional(propagation = Propagation.REQUIRES_NEW)
        public void insertRtmpSource(RtmpSource rtmpSource) {
            rtmpRep.save(rtmpSource);
        }
    }

    或者

    用内部类（非静态内部类可以直接使用外部 Spring Bean 中已注入的字段）。

    package com.italktv.platform.audioDist.service;
    
    import java.io.Serializable;
    import java.time.LocalDateTime;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    import com.italktv.platform.audioDist.mongo.CustomerRepository;
    import com.italktv.platform.audioDist.mongo.PlayUrl;
    import com.italktv.platform.audioDist.mongo.PlayUrl.MyUrl;
    import com.italktv.platform.audioDist.mongo.PlayUrlRepository;
    import com.italktv.platform.audioDist.mysql.SubSet;
    import com.italktv.platform.audioDist.mysql.UserRepository;
    import com.italktv.platform.audioDist.task.MyTask;
    import com.italktv.platform.audioDist.task.TaskManager;
    
    /**
     * Scheduled component that reads the upload map from {@code UserRepository}
     * and submits one {@link MessagePublish} task per {@code SubSet} entry to the
     * {@code TaskManager}.
     *
     * <p>{@link MessagePublish} is a non-static inner class so that tasks created
     * with {@code new} can still use the dependencies Spring injected into this
     * enclosing component ({@code repository}, {@code domain}).
     */
    @Component
    public class ScheduleJobs {
        private static final Logger log = LoggerFactory.getLogger(ScheduleJobs.class);

        public final static long SECOND = 1 * 1000;

        /** Time at which the most recent scheduled run started. */
        LocalDateTime nowDate = LocalDateTime.now();

        @Autowired
        // This means to get the bean called userRepository
        // Which is auto-generated by Spring, we will use it to handle the data
        private UserRepository userRepository;

        @Autowired
        private PlayUrlRepository repository;
        @Autowired
        private CustomerRepository cc;

        @Autowired
        private UserRepository user;

        @Autowired
        TaskManager taskManager;

        // Resolved here rather than on MessagePublish: task instances are created
        // with `new` in dotask(), so Spring never injects into them and a @Value
        // field declared on the inner class would stay null.
        @Value("${platform.audio.dist.domain}")
        private String domain;

        /** Scheduled entry point (fixedRate = SECOND * 400): records the start time and runs {@link #dotask()}. */
        @Scheduled(fixedRate = SECOND * 400)
        public void fixedRateJob() {
            nowDate = LocalDateTime.now();
            System.out.println("=== start distribution: " + nowDate);
            dotask();
        }

        /**
         * Walks the upload map returned by {@code userRepository.getUploadFileMap()}
         * and queues one {@link MessagePublish} per {@code SubSet} on the task manager.
         */
        void dotask() {

            Map<Integer, List<SubSet>> map = userRepository.getUploadFileMap();
            for (Entry<Integer, List<SubSet>> subject : map.entrySet()) {
                int subjectId = subject.getKey();
                log.info(" subject id:{}", subjectId);
                List<SubSet> allsub = subject.getValue();
                for (SubSet item : allsub) {
                    log.info(" sub:{}", item);
                    taskManager.add(new MessagePublish(item.id, item.path));
                }

                //wait them finished
                //TODO:

                //update subject status
                //TODO

            }

        }

        //////////////////////// inner class ////////////////////////
        /**
         * Task that (currently) simulates an upload with a random 1-10 second
         * sleep and then stores a {@code PlayUrl} document through the enclosing
         * component's {@code repository}.
         */
        public class MessagePublish extends MyTask implements Serializable {
            public MessagePublish() {
                super();
            }

            /**
             * @param id   stored in {@code partId}
             * @param name stored in {@code srcFile}
             */
            public MessagePublish(int id, String name) {
                this.srcFile = name;
                this.partId = id;
            }

            /**
             * Runs the simulated upload and records the play URL.
             *
             * @return {@code "ok"} on completion, or {@code "interrupted"} when the
             *         thread was interrupted during the simulated upload (nothing is
             *         saved in that case)
             */
            @Override
            public String call() {
                System.out.println(srcFile + " is uploading...");
                try {
                    // Get the region the message is published to
                    // (placeholder: sleeps a random 1-10 seconds).
                    TimeUnit.SECONDS.sleep(new Random().nextInt(10) + 1);
                } catch (InterruptedException e) {
                    // Restore the flag and stop: an interrupted (e.g. cancelled)
                    // task must not go on to record a play URL.
                    Thread.currentThread().interrupt();
                    log.warn("upload of {} interrupted", srcFile, e);
                    return "interrupted";
                }
                System.out.println(srcFile + " uploaded.");

                //2.RECORD TO MONGO DB
                PlayUrl play = new PlayUrl();
                play.programid = "programid fake";
                play.domain = ScheduleJobs.this.domain;
                play.protocol = "HTTP";
                MyUrl myurl = new MyUrl();
                myurl.high = "http://xxx.xxx/xi//";
                play.url = myurl;
                repository.save(play);
                //TODO:

                //IF FAILED, RETRY, RECORD RETRY TIMES.
                //TODO:

                return "ok";
            }

        }
    }
    
    
    package com.italktv.platform.audioDist.task;
    
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    
    import javax.annotation.PostConstruct;
    import javax.annotation.PreDestroy;
    
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;
    
    
    
    /**
     * Spring component that owns a fixed pool of 3 worker threads, submits
     * {@link MyTask} instances to it and tracks the resulting futures by the
     * task's {@code srcFile}.
     *
     * <p>The pool is created in {@link #init()} ({@code @PostConstruct}) and shut
     * down in {@link #destroy()} ({@code @PreDestroy}).
     */
    @Component
    public class TaskManager {

        private static final org.slf4j.Logger logger = LoggerFactory.getLogger(TaskManager.class);

    //    @Resource(lookup = "java:comp/DefaultManagedScheduledExecutorService")
    //    ManagedScheduledExecutorService executor;

        /** Futures of submitted tasks, keyed by the task's {@code srcFile}. */
        Map<String, Future<String>> tasks;
        ExecutorService executor;

        /** Creates the future map and the 3-thread pool. */
        @PostConstruct
        public void init() {
            logger.info(" === init TaskManager===");
            tasks = new HashMap<String, Future<String>>();
            executor = Executors.newFixedThreadPool(3);
        }

        /**
         * Submits the task to the pool and remembers its future under
         * {@code task.srcFile} (replacing any earlier entry with the same key).
         *
         * @param task task to run
         */
        public void add(MyTask task) {
            logger.info("add delay:{}{}", task.partId, task.srcFile);
            Future<String> future = executor.submit(task);
            tasks.put(task.srcFile, future);
        }

        /**
         * Cancels the task registered under {@code name}, interrupting it if it is
         * running, and removes it from the map.
         *
         * @param name key the task was registered under (its {@code srcFile})
         * @return the result of {@code Future.cancel(true)}, or {@code false} when
         *         no task is registered under {@code name}
         */
        public boolean cancel(String name) {
            logger.info("cancel {}", name);
            boolean ret = false;
            Future<String> future = tasks.get(name);
            if (future == null) {
                logger.info("Not found name:{}", name);
            } else {
                ret = future.cancel(true);
                logger.info("cancel {}:{}", name, ret);
                tasks.remove(name);
            }
            return ret;
        }

        /**
         * Blocks until every tracked future has completed, printing each result.
         * If the waiting thread is interrupted, the interrupt flag is restored and
         * the method returns without waiting for the remaining futures.
         */
        public void waitTaskDone() {
            for (Future<String> future : tasks.values()) {
                System.out.println("future done? " + future.isDone());

                String result = "";
                try {
                    result = future.get();
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller; continuing the
                    // loop would only make the next get() fail the same way.
                    Thread.currentThread().interrupt();
                    logger.error("interrupted while waiting for task result", e);
                    return;
                } catch (ExecutionException e) {
                    logger.error("future exec failed.", e);
                }

                System.out.println("future done? " + future.isDone());
                System.out.print("result: " + result);
            }
        }

        /**
         * Shuts the pool down: waits up to 5 seconds for running tasks, then
         * calls {@code shutdownNow()} unconditionally.
         */
        @PreDestroy
        public void destroy() {
            try {
                System.out.println("attempt to shutdown executor");
                executor.shutdown();
                executor.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Restore the flag so the thread running the shutdown still
                // observes the interruption.
                Thread.currentThread().interrupt();
                System.err.println("tasks interrupted");
            } finally {
                if (!executor.isTerminated()) {
                    System.err.println("cancel non-finished tasks");
                }
                executor.shutdownNow();
                System.out.println("shutdown finished");
            }
        }
    }
    
    
    package com.italktv.platform.audioDist.task;
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Base class for tasks that produce a {@code String} result. It only holds
     * the fields below; subclasses supply {@code call()}.
     */
    public abstract class MyTask implements Callable<String> {

        /** Numeric id of the part this task handles. */
        protected int partId;

        /** Source file name/path this task works on. */
        protected String srcFile;

        String programId;

        /** Leaves every field at its default value. */
        protected MyTask() {
        }
    }
  • 相关阅读:
    关于PHP程序员技术职业生涯规划
    让PHP7达到最高性能的几个Tips
    php-fpm解读-进程管理的三种模式 及 worker进程、master进程详解
    CGI、FastCGI和php-fpm概念和区别
    什么是PHP7中的孤儿进程与僵尸进程,加上守护进程
    PHP 信号管理知识整理汇总
    PHP多进程---fork多个子进程,父进程阻塞与非阻塞
    锁存器、触发器和寄存器
    FPGA基础之锁存器与触发器的设计
    从CMOS到触发器(二)
  • 原文地址:https://www.cnblogs.com/bigben0123/p/7458684.html
Copyright © 2011-2022 走看看