Agents
Agent是一种异步数据更新的机制.
但同时也是一种并发机制, 因为agent是基于thread pool实现的, 通过send和send-off分别发送到不同的thread pool中.
其中send对应的thread pool中的线程个数基本等于cup核数, 所以多条send指令其实是自动以cup核数的并发度进行并发处理的.
巧妙利用这个机制就可以实现并发
Concurrency Functions
pmap
pmap是map的进化版本, 但是它对于每个集合中的元素都是提交给一个线程去执行function
pmap is partially lazy in that the entire result set is not realized unless required, but the parallel computation does run ahead of the consumption to some degree.
例子, 先定义测试函数, 在真正执行函数前先sleep 1s
(defn make-heavy [f] (fn [& args] (Thread/sleep 1000) (apply f args)))
通过下面时间的简单对比, 就明白差别了
user=> (time (doall (map (make-heavy inc) [1 2 3 4 5]))) "Elapsed time: 5002.96291 msecs" (2 3 4 5 6) user=> (time (doall (pmap (make-heavy inc) [1 2 3 4 5]))) "Elapsed time: 1031.941815 msecs" (2 3 4 5 6)
pvalues
pvalues takes any number of expressions and returns a lazy sequence of the values of each expression, evaluated in parallel.
user=> (pvalues (+ 5 5) (- 5 3) (* 2 4))
(10 2 8)
pcalls
pcalls takes any number of no-argument functions and returns a lazy sequence of their return values, executing them in parallel.
user=> (pcalls #(+ 5 2) #(* 2 5))
(7 10)
Overhead and Performance
并发那么好, 是不是什么情况都需要使用并发了?
肯定不是, 并发本身也是需要overhead的, 如果线程并发带来的时间节省太少, 就得不偿失.
看下面比较极端的例子, 用并发反而慢了那么多, 就是因为时间都花费在并发的overhead上面了
user=> (time (dorun (map inc (range 1 1000)))) "Elapsed time: 9.150946 msecs" user=> (time (dorun (pmap inc (range 1 1000)))) "Elapsed time: 182.349073 msecs"
Futures and Promises
Futures and promises are two slightly more low-level threading constructs, inspired by the similar features available in the Java 6 concurrency API. They are simple to understand, simple to use, and provide a very direct way to spawn threads using native Clojure syntax.
Futures
A Clojure future represents a computation, running in a single thread.
user=> (def my-future (future (* 100 100))) #'user/my-future user=> @my-future ;如果thread没有执行完,会阻塞 10000
可以看到macro future是封装future-call函数, 你也可以直接调用这个函数, 不过比较麻烦些
(defmacro future [& body] `(future-call (fn [] ~@body)))
future-cancel
It is possible to attempt to cancel a future that hasn’t yet finished executing.
future-cancelled?
future-cancelled? takes a single future as an argument and returns true if it has been cancelled.
future-done?
future-done? takes a single future as an argument and returns true if the future’s execution is complete, otherwise false.
future?
future? takes a single value as an argument and returns true if it is a future, otherwise false.
Promises
A promise is a value that may not yet exist. If a promise is dereferenced before its value is set, the dereferencing thread blocks until a value is delivered to the promise.
可以用于多线程间同步协调, 谨慎使用, 容易导致死锁
user=> (def mypromise (promise)) #'user/mypromise user=> @mypromise ;导致主线程阻塞 user=> (def mypromise (promise)) #'user/mypromise user=> (deliver mypromise 5) #<AFn$IDeref$db53459f@c0f1ec: 5> user=> @mypromise 5
Java-based Threading
If none of Clojure’s other concurrency tools meet your needs for any reason, there’s always the option of
falling back to Java’s native threading capabilities.
user=> (.start (Thread. #(println "hello"))) nil hello