zoukankan      html  css  js  c++  java
  • Unity Job System

      参考链接 : 

      http://esprog.hatenablog.com/entry/2018/05/19/150313

      https://blogs.unity3d.com/2018/10/22/what-is-a-job-system/

      Job系统作为一个多线程系统, 它因为跟ECS有天生的融合关系所以比较重要的样子, 我也按照使用类型的分类来看看Job System到底怎么样.

      Job说实话就是一套封装的多线程系统, 我相信所有开发人员都能自己封装一套, 所以Unity推出这个的时候跟着ECS一起推出, 因为单独推出来的话肯定推不动, 多线程, 线程安全, 线程锁, 线程共享资源, 这些都没什么区别, 我从一个简单列表的功能来说吧.

      先来一个普通的多线程 :  

    using System.Collections;
    using System.Collections.Generic;
    using UnityEngine;
    
    using System;
    using System.Threading;
    
    public class NormalListAccessTest01 : MonoBehaviour
    {
        public class RunData
        {
            public List<int> datas = new List<int>();
            public float speed;
            public float deltaTime;
        }
    
        public static void RunOnThread<T>(System.Action<T> call, T obj, System.Action endCall = null)
        {
            System.Threading.ThreadPool.QueueUserWorkItem((_obj) =>
            {
                call.Invoke(obj);
                if(endCall != null)
                { ThreadMaster.Instance.CallFromMainThread(endCall); }
            });
        }
    
        private void OnGUI()
        {
            if(GUI.Button(new Rect(100, 100, 100, 50), "Run Test"))
            {
                ThreadMaster.GetOrCreate();
                var data = new RunData();
                data.deltaTime = Time.deltaTime;
                data.speed = 100.0f;
                for(int i = 0; i < 10000; i++)
                {
                    data.datas.Add(i);
                }
                RunOnThread<RunData>((_data) =>
                {
                    // 这是在工作线程里
                    Debug.Log("Start At : " + System.DateTime.Now.ToString("HH:mm:ss fff"));
                    var move = _data.deltaTime * _data.speed;
                    for(int i = 0; i < _data.datas.Count; i++)
                    {
                        var val = _data.datas[i] + 1;
                        _data.datas[i] = val;
                    }
                }, data, () =>
                {
                    // 这是在主线程里
                    Debug.Log(data.datas[0]);
                    Debug.Log("End At : " + System.DateTime.Now.ToString("HH:mm:ss fff"));
                });
            }
        }
    }

      线程转换的一个简单封装ThreadMaster : 

    using System.Collections;
    using System.Collections.Generic;
    using UnityEngine;
    
    public class ThreadMaster : MonoBehaviour
    {
        private static ThreadMaster _instance;
        public static ThreadMaster Instance
        {
            get
            {
                return GetOrCreate();
            }
        }
    
        private volatile List<System.Action> _calls = new List<System.Action>();
    
        public static ThreadMaster GetOrCreate()
        {
            if(_instance == false)
            {
                _instance = new GameObject("ThreadMaster").AddComponent<ThreadMaster>();
            }
            return _instance;
        }
        public void CallFromMainThread(System.Action call)
        {
            _calls.Add(call);
        }
        void Update()
        {
            if(_calls.Count > 0)
            {
                for(int i = 0; i < _calls.Count; i++)
                {
                    var call = _calls[i];
                    call.Invoke();
                }
                _calls.Clear();
            }
        }
    }

      没有加什么锁, 简单运行没有问题, 下面来个Job的跑一下:  

    using UnityEngine;
    using Unity.Collections;
    using Unity.Jobs;
    
    public class JobSystemSample00 : MonoBehaviour
    {
        struct VelocityJob : IJob
        {
            public NativeArray<int> datas;
    
            public void Execute()
            {
                for(var i = 0; i < datas.Length; i++)
                {
                    datas[i] = datas[i] + 1;
                }
            }
        }
    
        public void Test()
        {
            var datas = new NativeArray<int>(100, Allocator.Persistent);
    
            var job = new VelocityJob()
            {
                datas = datas
            };
    
            JobHandle jobHandle = job.Schedule();
            JobHandle.ScheduleBatchedJobs();
    
            //Debug.Log(datas[0]);     // Error : You must call JobHandle.Complete()
            jobHandle.Complete();
            Debug.Log(datas[0]);
    
            datas.Dispose();
        }
    
        private void OnGUI()
        {
            if(GUI.Button(new Rect(100, 100, 100, 50), "Start Test"))
            {
                Test();
            }
        }
    }

      这里就有一个大问题了, 在有注释的地方 // Error : You must call JobHandle.Complete(), 是说在Job没有调用Complete()时, 去获取相关数组内容是非法的! 而这个jobHandle.Complete(); 无法通过工作线程去调用, 也就是说Job的运行它是无法自行结束的, 无法发出运行结束的通知的, 对比上面封装的普通多线程弱爆了.  而这个Complete()函数如果在工作线程执行完成前调用, 会强制立即执行(文档也是写 Wait for the job to complete), 也就是说它只能在主线程调用并且会阻塞主线程, 这样就可以定性了, 它的Job System不是为了提供一般使用的多线程封装给我们用的, 可是它又是很强大的, 因为它能使用高效的内存结构, 能保证数据访问安全, 能在需要的时候调用Complete方法强制等待工作线程执行完毕(如果没猜错的话, 引擎对这个做了很大优化, 并不是简单等待), 还有BurstCompile等, 如果我们封装成功了的话, 就是很好的多线程库了.

      PS : 打个比方一个mesh的渲染, 在渲染之前必须计算完所有坐标转换, Job的好处就是可以进行多线程并行的计算, 然后还能被主线程强制执行完毕, 比在主线程中单独计算强多了. 而这个强制执行才是核心逻辑.

      经过几次测试, 几乎没有办法简单扩展Job系统来让它成为像上面一样拥有自动完成通知的系统, 如下 : 

      1. 添加JobHandle变量到IJob中, 在Execute结束时调用  

        struct VelocityJob : IJob
        {
            public NativeArray<int> datas;
    
            [Unity.Collections.LowLevel.Unsafe.NativeDisableUnsafePtrRestriction]
            public JobHandle selfHandle;    // 是这个IJob调用Schedule的句柄
    
            public void Execute()
            {
                for(var i = 0; i < datas.Length; i++)
                {
                    datas[i] = datas[i] + 1;
                }
                selfHandle.Complete();
            }
        }

      报错, InvalidOperationException: VelocityJob.selfHandle.jobGroup uses unsafe Pointers which is not allowed. 无法解决, 直接就无法在IJob结构体中添加JobHandle变量. 并且无法在工作线程中调用Complete方法.

      2. 添加回调函数进去

        struct VelocityJob : IJob
        {
            public NativeArray<int> datas;
    
            public System.Action endCall;
    
            public void Execute()
            {
                for(var i = 0; i < datas.Length; i++)
                {
                    datas[i] = datas[i] + 1;
                }
                if(endCall != null)
                {
                    endCall.Invoke();
                }
            }
        }

      报错, Job系统的struct里面只能存在值类型的变量 !!-_-

      3. 使用全局的引用以及线程转换逻辑来做成自动回调的形式, 虽然可以使用了可是非常浪费资源 :

    using UnityEngine;
    using Unity.Collections;
    using Unity.Jobs;
    using System.Collections.Generic;
    
    public class JobSystemSample01 : MonoBehaviour
    {
        private static int _id = 0;
        public static int NewID => _id++;
        public static Dictionary<int, IJobCall> ms_handleRef = new Dictionary<int, IJobCall>();
    
        public class IJobCall
        {
            public JobHandle jobHandle;
            public System.Action endCall;
        }
        struct VelocityJob : IJob
        {
            public NativeArray<int> datas;
    
            public int refID;
            public void Execute()
            {
                for(var i = 0; i < datas.Length; i++)
                {
                    datas[i] = datas[i] + 1;
                }
                var handle = ms_handleRef[refID];
                ThreadMaster.Instance.CallFromMainThread(() =>
                {
                    handle.jobHandle.Complete();
                    if(handle.endCall != null)
                    {
                        handle.endCall.Invoke();
                    }
                });
            }
        }
    
        public void Test()
        {
            ThreadMaster.GetOrCreate();
            var datas = new NativeArray<int>(100, Allocator.Persistent);
            int id = NewID;
            var job = new VelocityJob() { refID = id, datas = datas };
            ms_handleRef[id] = new IJobCall()
            {
                jobHandle = job.Schedule(),
                endCall = () => { Debug.Log(datas[0]); datas.Dispose(); }
            };
        }
    
        private void OnGUI()
        {
            if(GUI.Button(new Rect(100, 100, 100, 50), "Start Test"))
            {
                Test();
            }
        }
    }

      通过上面封装就可以作为一般多线程使用了, 并且我们获得了引擎提供的数据安全和高效逻辑性, 再加上利用BurstCpmpile和只读属性, 能够提升一些计算效率吧. ECS on Job已经在另外一篇中说过了, 这里忽略了.

      ----------------------------------------------

      当我测试到IJobParallelFor的时候, 发现并行并不像GPU那样的并行那么美好, 因为GPU它本身就是全并行的, 像卷积之类的, 它跟像素的处理顺序本身就没有关系, 可是我们的逻辑有些会受顺序的影响. 先看看下面的代码 : 

    using UnityEngine;
    using Unity.Collections;
    using Unity.Jobs;
    
    
    public class IJobParallelForSample01 : MonoBehaviour
    {
        struct VelocityJob : IJobParallelFor
        {
            public NativeArray<int> datas;
    
            public void Execute(int index)
            {
                if(index == 0)
                {
                    index = datas.Length - 1;
                }
                datas[index] = datas[index - 1] + 1;
            }
        }
    
        public void Test()
        {
            var datas = new NativeArray<int>(100, Allocator.Persistent);
            for(int i = 0; i < datas.Length; i++)
            {
                datas[i] = i;
            }
            var job = new VelocityJob()
            {
                datas = datas
            };
    
            var jobHandle = job.Schedule(datas.Length, 20);
            JobHandle.ScheduleBatchedJobs();
            
            jobHandle.Complete();
            Debug.Log(datas[0]);
    
            datas.Dispose();
        }
    
        private void OnGUI()
        {
            if(GUI.Button(new Rect(100, 100, 100, 50), "Start Test"))
            {
                Test();
            }
        }
    }

      主要的是Schedule的方法上 : public static JobHandle Schedule<T>(this T jobData, int arrayLength, int innerloopBatchCount, JobHandle dependsOn = default) where T : struct, IJobParallelFor;

      第二个参数innerloopBatchCount表示的是分块的大小, 比如我们数组长度是100,  每20个元素分成一块, 一共可以分5块, 如果你的CPU核心数大于等于5它就能开5个线程来处理, 可是你不能去获取这个块之外的Index的数据:

      显然这里数据每20个一组被分为了5组, 在5个线程里, 然后跨组获取数据就报错了.

      测试一下线程数是否5个 : 

        struct VelocityJob : IJobParallelFor
        {
            public NativeArray<int> datas;
    
            public void Execute(int index)
            {
                throw new System.Exception(index + " ERROR");
            }
        }

       5个线程报错, 应该每个线程内的处理也是按照for的顺序来的.

      把每个块改成5的大小, 看看它能开几个线程:

     var jobHandle = job.Schedule(datas.Length, 5);

      恩开了8个, 我的机器确实是8核的, 不过它的分块不是我想的0-5-10-15, 或者0-12-24-36 而是整10的, 不知道为什么, 因为按照我设定每个分组是5, 而整体平均100/8=12.5而不应该是整10的, 具体不详.

      如果我们要跟其它元素进行交互, 就只能把处理单元设置到跟数组一样大, 才能在一个块中处理:

    using UnityEngine;
    using Unity.Collections;
    using Unity.Jobs;
    
    
    public class IJobParallelForSample01 : MonoBehaviour
    {
        struct VelocityJob : IJobParallelFor
        {
            public NativeArray<int> datas;
    
            public void Execute(int index)
            {
                if(index > 0 && index < datas.Length - 1)
                {
                    datas[index] = datas[datas.Length - 1];
                }
            }
        }
    
        public void Test()
        {
            var datas = new NativeArray<int>(10, Allocator.Persistent);
            for(int i = 0; i < datas.Length; i++)
            {
                datas[i] = i;
            }
            var job = new VelocityJob()
            {
                datas = datas
            };
    
            var jobHandle = job.Schedule(datas.Length, datas.Length);
            JobHandle.ScheduleBatchedJobs();
    
            jobHandle.Complete();
            Debug .Log(datas[0]);
    
            datas.Dispose();
        }
    
        private void OnGUI()
        {
            if(GUI.Button(new Rect(100, 100, 100, 50), "Start Test"))
            {
                Test();
            }
        }
    }

      顺便测试一下各个线程的分配情况:

        private volatile static Dictionary<int, List<int>> ms_threads = new Dictionary<int, List<int>>();
    
        struct VelocityJob : IJobParallelFor
        {
            public NativeArray<int> datas;
    
            public void Execute(int index)
            {
                Debug.Log(index + " : " + System.Threading.Thread.CurrentThread.ManagedThreadId);
                lock(ms_threads)
                {
                    List<int> val = null;
                    ms_threads.TryGetValue(System.Threading.Thread.CurrentThread.ManagedThreadId, out val);
                    if(val == null)
                    {
                        val = new List<int>();
                        ms_threads[System.Threading.Thread.CurrentThread.ManagedThreadId] = val;
                    }
                    val.Add(index);
                }
            }
        }
            var jobHandle = job.Schedule(100, 5);

      结果是分为8个线程, 4个线程的块为10, 4个为15

      所以不能想当然的去获取其它Index的内容, 毕竟分块逻辑不一定.

  • 相关阅读:
    Axure流程图
    Axure工作区间
    Axure简介
    Java知识导航总图
    SQL筛选出同一学科的时间最新的记录
    从高版本JDK换成低版本JDK报错Unsupported major.minor version 52.0
    java.lang.IllegalArgumentException: No converter found for return value of type
    httpClient创建对象、设置超时
    sql 查出一张表中重复的所有记录数据
    java List分批处理
  • 原文地址:https://www.cnblogs.com/tiancaiwrk/p/12410825.html
Copyright © 2011-2022 走看看