zoukankan      html  css  js  c++  java
  • 搭建rtmp直播流服务之3:java开发ffmpeg实现rtsp转rtmp并实现ffmpeg命令的接口化管理架构设计及代码实现

    上一篇文章简单介绍了java如何调用ffmpeg的命令:http://blog.csdn.net/eguid_1/article/details/51777716

    上上一篇介绍了nginx-rtmp服务器的搭建:http://blog.csdn.net/eguid_1/article/details/51749830

    这一篇将进一步深挖java对ffmepg命令的控制并最终实现服务接口化

    本篇文章源码:http://download.csdn.net/detail/eguid_1/9563637

    github项目地址:https://github.com/eguid/FFCH4J

    通知:由于很多同学反映本章代码的命令封装设计的不是很好,所以对本章代码重新进行了实现,新版本推翻了本章原有代码内部实现,接口设计更加利于注入自己的实现,并增加可执行原生ffmpeg命令功能

    新版本请到这里查看:java封装FFmpeg命令,支持原生ffmpeg全部命令,实现FFmpeg多进程处理与多线程输出控制(开启、关闭、查询),rtsp/rtmp推流、拉流

    (一)、简单介绍

    该服务接口可实现rtsp协议转换为rtmp协议且可以实现rtmp直播流发布到nginx流媒体服务器,其中最为重要的是如何实现通过参数生成ffmpeg命令并执行,且可以通过接口进行控制ffmpeg命令的停止

    (二)、实现ffmpeg接口化服务架构设计

    push端接口化管理
     

    一、接口化调用


    1、采用多线程方式,每次调用push端口开启一个主进程及两个输出线程
    2、可以对每个push端(线程)进行开启和关闭的控制
    3、统一接口参数,对ffmpeg的命令做到参数可控制

    二、架构设计

    1、服务接口

    PushManager提供push(开启一个push处理器),closePush(关闭push处理器),viewAppName接口(查看当前已经开启的应用)


    1.1、应用名和push处理器的关系

    一个处理器对应一个应用名

    1.2、push处理器

    一个处理器对应一个push主进程和两个输出线程

    2、主进程控制

    2.1、主进程开启

    服务接口调用push处理器开启push主进程,主进程会自动开启两个输出线程用于消息输出,
    开启后会将主进程Process和两个输出线程OutHandler通过map返回给服务接口。


    2.2、主进程关闭

    主进程可通过Process的destroy方法进行安全关闭

    3、输出线程控制


    3.1、输出线程开启

    输出线程从主进程获取到输出流进行输出

    3.2、输出线程关闭

    输出线程重写了destory方法,用于安全的关闭输出线程

    4、持久层控制


    持久层分为两个:

    1、appName(应用名)-pushId(push处理器的ID)对应关系 

    用于维护应用名和push处理器ID的对应关系,pushId为随机生成id


    2、pushId-主进程-输出线程对应关系

    主要用于存放主进程(Process)和两个输出线程,建立两者对应关系,方便服务接口管理

    (三)、代码实现

    1、PushManager实现

    /**
     * 实现push管理器的push,delete,view服务
     * 
     * @author eguid
     * @see PushMangerImpl
     * @since jdk1.7
     */
    
    public class PushManagerImpl implements PushManager
    {
        /**
         * 引用push处理器
         */
        private PushHandler pusher = new PushHandlerImpl();
        /**
         * 管理应用名和push处理器之间的关系
         */
        private PushId_AppRelshipDao pard=new PushId_AppRelshipDaoImpl();
        /**
         * 管理处理器的主进程Process及两个输出线程的关系
         */
        private HandlerDao hd = new HandlerDaoImpl();
        
        public void setPusher(PushHandler pusher)
        {
            this.pusher = pusher;
        }
    
        public void setPard(PushId_AppRelshipDao pard)
        {
            this.pard = pard;
        }
    
        public void setHd(HandlerDao hd)
        {
            this.hd = hd;
        }
    
        @Override
        public String push(Map<String, Object> map)
        {
            if(map==null||map.isEmpty()||!map.containsKey("appName"))
            {
                return null;
            }
            String appName=null;
            ConcurrentMap<String, Object> resultMap = null;
            try
            {
                appName=(String)map.get("appName");
                if(appName!=null&&"".equals(appName.trim()))
                {
                    return null;
                }
                resultMap = pusher.push(map);
                // 生成一个标识该命令行线程集的key
                String pushId = UUID.randomUUID().toString();
                hd.set(pushId, resultMap);
                pard.set(appName, pushId);
            }
            catch (IOException e)
            {
                // 暂时先写这样,后期加日志
                System.err.println("发生一个异常" + e.getMessage());
            }
            return appName;
        }
    
        @Override
        public void closePush(String appName)
        {
            String pushId=null;
            if(pard.isHave(appName))
            {
                pushId= pard.getPushId(appName);
            }
            if (pushId!=null&&hd.isHave(pushId))
            {
                ConcurrentMap<String, Object> map = hd.get(pushId);
                //关闭两个线程
                ((OutHandler)map.get("error")).destroy();
                ((OutHandler)map.get("info")).destroy();
                //暂时先这样写,后期加日志
                System.out.println("停止命令-----end commond");
                //关闭命令主进程
                ((Process)map.get("process")).destroy();
                //删除处理器与线程对应关系表
                hd.delete(pushId);
                //删除应用名对应关系表
                pard.delete(appName);
            }
        }
    
        @Override
        public List<String> viewAppName()
        {
            return pard.getAll();
        }
    

    2、pushHandler实现(push处理器)

    /**
     * 提供解析参数生成ffmpeg命令并处理push操作
     * @see PushHandlerImpl
     * @since jdk1.7
     */
    
    public class PushHandlerImpl implements PushHandler
    {
        /*
         * "ffmpeg -i "+ "rtsp://admin:admin@192.168.2.236:37779/cam/realmonitor?channel=1&subtype=0 "+" -f flv -r 25 -s 640x360 -an" + " rtmp://192.168.30.21/live/test"
         * 推送流格式: name:应用名;input:接收地址;output:推送地址;fmt:视频格式;fps:视频帧率;rs:视频分辨率;disableAudio:是否开启音频
         */
        @Override
        public ConcurrentMap<String, Object> push(Map<String, Object> paramMap)
            throws IOException
        {
            // 从map里面取数据,组装成命令
            String comm = getComm4Map(paramMap);
            ConcurrentMap<String, Object> resultMap = null;
            // 执行命令行
    
            final Process proc = Runtime.getRuntime().exec(comm);
            System.out.println("执行命令----start commond");
            OutHandler errorGobbler = new OutHandler(proc.getErrorStream(), "Error");
            OutHandler outputGobbler = new OutHandler(proc.getInputStream(), "Info");
    
            errorGobbler.start();
            outputGobbler.start();
            // 返回参数
            resultMap = new ConcurrentHashMap<String, Object>();
            resultMap.put("info", outputGobbler);
            resultMap.put("error", errorGobbler);
            resultMap.put("process", proc);
            return resultMap;
        }
    
        /**
         * 通过解析参数生成可执行的命令行字符串;
         * name:应用名;input:接收地址;output:推送地址;fmt:视频格式;fps:视频帧率;rs:视频分辨率;disableAudio:是否开启音频
         * 
         * @param paramMap
         * @return 命令行字符串
         */
        protected String getComm4Map(Map<String, Object> paramMap)
        {
            // -i:输入流地址或者文件绝对地址
            StringBuilder comm = new StringBuilder("ffmpeg -i ");
            // 是否有必输项:输入地址,输出地址,应用名
            if (paramMap.containsKey("input") && paramMap.containsKey("output")
                && paramMap.containsKey("appName"))
            {
                comm.append(paramMap.get("input")).append(" ");
                // -f :转换格式,默认flv
                comm.append(" -f ").append(paramMap.containsKey("fmt") ? paramMap.get("fmt") : "flv").append(" ");
                // -r :帧率,默认25
                comm.append("-r ").append(paramMap.containsKey("fps") ? paramMap.get("fps") : "30").append(" ");
                // -s 分辨率 默认是原分辨率
                comm.append("-s ").append(paramMap.containsKey("rs") ? paramMap.get("rs") : "").append(" ");
                // -an 禁用音频 
                comm.append("-an ").append(paramMap.containsKey("disableAudio") && ((Boolean)paramMap.get("disableAudio")) ? "-an" : "").append(" ");
                // 输出地址
                comm.append(paramMap.get("output"));
                //发布的应用名
                comm.append(paramMap.get("appName"));
                //一个视频源,可以有多个输出,第二个输出为拷贝源视频输出,不改变视频的各项参数并且命名为应用名+HD
                comm.append(" ").append(" -vcodec copy -f flv -an ").append(paramMap.get("output")).append(paramMap.get("appName")).append("HD");
                System.out.println(comm.toString());
                return comm.toString();
            }
            else
            {
                throw new RuntimeException("输入流地址不能为空!");
            }
    
        }
    }

    3、OutHandler(输出线程)

    **
     * 用于输出命令行主进程的消息线程(必须开启,否则命令行主进程无法正常执行) 重要:该类重写了destroy方法,用于安全的关闭该线程
     * 
     * @author eguid
     * @see OutHandler
     * @since jdk1.7
     */
    
    public class OutHandler extends Thread
    {
        // 控制状态
        volatile boolean status = true;
    
        BufferedReader br = null;
    
        String type = null;
    
        public OutHandler(InputStream is, String type)
        {
            br = new BufferedReader(new InputStreamReader(is));
            this.type = type;
        }
    
        /**
         * 重写线程销毁方法,安全的关闭线程
         */
        @Override
        public void destroy()
        {
            status = false;
        }
    
        /**
         * 执行输出线程
         */
        @Override
        public void run()
        {
            String msg = null;
            try
            {
                while (status)
                {
    
                    if ((msg = br.readLine()) != null)
                    {
                        System.out.println(type + "消息:" + msg);
                    }
                }
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
        }
    
    }
    

    4、两个dao层接口,方便后期实现该接口并实现持久化

    4.1、主进程(Process)和两个输出线程Dao

    /**
     * 命令行执行处理器缓存,方便管理处理器的开启和关闭
     * @author eguid  
     * @see HandlerDao  
     * @since  jdk1.7
     */
    
    public interface HandlerDao
    {
        /**
         *  获取某个处理器
         * @param pushId
         * @return
         */
        public ConcurrentMap get(String pushId);
        /**
         * 存放一个处理器
         * @param handlerMap
         */
        public void set(String key, ConcurrentMap<String, Object> resultMap);
        /**
         * 获取全部处理器的id
         * @return
         */
        public ConcurrentMap getAll();
        /**
         * 删除某个处理器
         * @param pushId
         */
        public void delete(String pushId);
        /**
         * 是否存在key
         */
        public boolean isHave(String pushId);
    }

    4.2、应用名-pushId对应关系Dao

    /**
     * 用于维护管理应用名与pushId的关系对应
     * @author eguid 
     * @see PushId_AppRelshipDao  
     * @since  jdk1.7
     */
    public interface PushId_AppRelshipDao
    {
    /**
     *  获取应用名对应的pushId
     * @param appName
     * @return pushId
     */
    public String getPushId(String appName);
    /**
     *  插入一个应用名和pushId对应
     * @param appName
     * @param pushId
     */
    public void set(String appName,String pushId);
    /**
     * 通过应用名删除对应关系
     * @param appName
     */
    public void delete(String appName);
    /**
     * 获取全部应用
     */
    public List<String> getAll();
    /**
     * 是否存在应用名
     * @param appName
     * @return true:存在;false:不存在
     */
    public boolean isHave(String appName);
    }


    下一篇将介绍一些支持rtmp直播的播放器

  • 相关阅读:
    top指令
    Trie
    最大公约数
    angular2 获取到的数据无法实时更新的问题
    npm install 的时候出现 write access 导致不能成功安装的问题
    angular 的 @Input、@Output 的一个用法
    windows 安装 apache 服务以及添加 php 解析
    php 性能优化之opcache
    intellij 插件结构(文件结构以及概念层面上的结构)
    jetBrains 插件开发第一课-- 在主菜单栏新增一个菜单
  • 原文地址:https://www.cnblogs.com/eguid/p/10195636.html
Copyright © 2011-2022 走看看