zoukankan      html  css  js  c++  java
  • 《Erlang程序设计》 第八章 并发编程

    第八章 并发编程

    第八章 并发编程

    Erlang中的进程并非属于操作系统, 它是属于程序语言本身的。
    Erlang中的进程的特点:

    1. 创建和销毁进程非常迅速
    2. 在两个进程间收发消息非常迅速
    3. 进程在所有操作系统上行为相同
    4. 可以创建大量进程
    5. 进程之间不共享任何数据, 彼此间完全独立
    6. 进程间交互的唯一方式是消息传递

    8.1 并发原语

    创建进程

    Pid = spawn(Fun).
    

    向进程发送消息

    Pid ! Message
    Pid1 ! Pid2 ! ... M
    

    接收消息

    receive
        Pattern1 [when Guard1] ->
            Expressions1;
        Pattern2 [when Guard2] ->
            Expressions2;
        ...
    end.
    

    8.2 一个简单的例子

    -module(area_server0).
    -export([loop/0]).
    
    # 在loop函数中根据接收的消息执行不同的动作, 并继续等待接收消息
    loop() ->
        receive
            {rectangle, Width, Ht} ->
                io:format("Area of rectangle is ~p~n", [Width * Ht]),
                loop();
            {circle, R} ->
                io:format("Area of circle is ~p~n", [3.14159 * R * R]),
                loop();
            Other ->
                io:format("I don't know what the area of a ~p is ~n", [Other]),
                loop()
        end.
    

    向指定进程(通过Pid指定)传递消息

    1> c(area_server0).
    {ok,area_server0}
    
    2> Pid = spawn(fun area_server0:loop/0).
    <0.37.0>
    
    3> Pid ! {rectangle, 6, 10}.
    Area of rectangle is 60
    {rectangle,6,10}
    
    4> Pid ! {circle, 23}.
    Area of circle is 1661.90111
    {circle,23}
    
    5> Pid ! {triangle, 2, 4, 5}.
    I don't know what the area of a {triangle,2,4,5} is 
    {triangle,2,4,5}
    

    8.3 客户/服务器介绍

    第一步

    -module(area_server1).
    -export([loop/0, rpc/2]).
    
    %% 远程过程调用函数
    %% 封装发送请求和等待回应
    rpc(Pid, Request) ->
        %% 发送时带上进程号以区分发送者
        %% 使用self()函数获取进程自己的Pid
        %% 发送后使用receive原语等待回应 
        Pid ! {self(), Request},
        receive
            Response ->
                Response
        end.
    
    %% 在loop函数中根据接收的消息和Pid, 完成计算后将结果返回发送者 
    loop() ->
        receive
            {From, {rectangle, Width, Ht}} ->
                From ! Width * Ht,
                loop();
            {From, {circle, R}} ->
                From ! 3.14159 * R * R,
                loop();
            {From, Other} ->
                From ! {error, Other},
                loop()
        end.
    

    调用结果:

    1> c(area_server1).
    {ok,area_server1}
    
    2> Pid = spawn(fun area_server1:loop/0).
    <0.48.0>
    
    3> area_server1:rpc(Pid, {rectangle, 6, 8}).
    48
    
    4> area_server1:rpc(Pid, {circle, 6}).
    113.09723999999999
    
    5> area_server1:rpc(Pid, socks).
    {error,socks}
    

    第二步

    -module(area_server2).
    -export([loop/0, rpc/2]).
    
    %% 远程过程调用函数
    %% 相比于第一步, 多了区分服务器进程的Pid 
    rpc(Pid, Request) ->
        Pid ! {self(), Request},
        receive
            {Pid, Response} ->
                Response
        end.
    
    %% 为了便于客户端区分, 将结果返回时带上自己的进程号
    loop() ->
        receive
            {From, {rectangle, Width, Ht}} ->
                From ! {self(), Width * Ht},
                loop();
            {From, {circle, R}} ->
                From ! {self(), 3.14159 * R * R},
                loop();
            {From, Other} ->
                From ! {self(), {error, Other}},
                loop()
        end.
    

    调用结果:

    1> c(area_server2).
    {ok,area_server2}
    
    2> Pid = spawn(fun area_server2:loop/0).
    <0.59.0>
    
    3> area_server2:rpc(Pid, {circle, 5}).
    78.53975
    

    第三步

    -module(area_server3).
    -export([start/0, area/2]).
    
    %% 封装进程创建的过程 
    start() ->spawn(fun loop/0).
    
    %% 隐藏远程过程调用
    area(Pid, What) ->rpc(Pid, What).
    
    rpc(Pid, Request) ->
        Pid ! {self(), Request},
        receive
            {Pid, Response} ->
                Response
        end.
    
    loop() ->
        receive
            {From, {rectangle, Width, Ht}} ->
                From ! {self(), Width * Ht},
                loop();
            {From, {circle, R}} ->
                From ! {self(), 3.14159 * R * R},
                loop();
            {From, Other} ->
                From ! {self(), {error, Other}},
                loop()
        end.
    

    调用结果:

    1> c(area_server3).
    {ok,area_server3}
    
    2> Pid = area_server3:start().
    <0.68.0>
    
    3> area_server3:area(Pid, {rectangle, 10, 8}).
    80
    
    4> area_server3:area(Pid, {circle, 4}).
    50.26544
    

    8.4 创建一个进程需要花费多少时间

    -module(processes).
    -export([max/1]).
    
    %% 创建N个进程并销毁, 查看其运行时间
    max(N) ->
        %% 获取系统支持的最大进程数 可以在启动shell时通过"+P"进行设置
        Max = erlang:system_info(process_limit),
        io:format("Maximum allowed processes:~p~n", [Max]),
        %% 用于统计代码执行所耗的CPU时间和真实时间
        %% 即将要计时的代码段放在statistics(runtime), code..., statistics(runtime)之间
        statistics(runtime),
        statistics(wall_clock),
        %% 创建N个进程
        L = for(1, N, fun() ->spawn(fun() ->wait() end) end),
        {_, Time1} = statistics(runtime),
        {_, Time2} = statistics(wall_clock),
        %% 销毁进程
        lists:foreach(fun(Pid) ->Pid ! die end, L),
        U1 = Time1 * 1000 / N,
        U2 = Time2 * 1000 / N,
        io:format("Process spawn time=~p (~p) microseconds~n",[U1, U2]).
    wait() ->
        receive 
            die ->void
        end.
    for(N, N, F) ->[F()];
    for(I, N, F) ->[F() | for(I+1, N, F)].
    

    在我的Intel i7 2.9G处理器、8G内存的Mac Pro上运行效果如下:

    matrix@MBP:8 $ erl +P 500000
    Erlang R15B01 (erts-5.9.1) [source] [64-bit] [smp:4:4] [async-threads:0] [hipe] [kernel-poll:false]
    
    Eshell V5.9.1  (abort with ^G)
    1> processes:max(20000).
    Maximum allowed processes:500000
    Process spawn time=3.0 (2.95) microseconds
    ok
    2> processes:max(50000).
    Maximum allowed processes:500000
    Process spawn time=2.8 (2.86) microseconds
    ok
    3> processes:max(100000).
    Maximum allowed processes:500000
    Process spawn time=2.8 (2.92) microseconds
    ok
    4> processes:max(200000).
    Maximum allowed processes:500000
    Process spawn time=2.6 (2.825) microseconds
    ok
    5> processes:max(400000).
    Maximum allowed processes:500000
    Process spawn time=2.6 (2.81) microseconds
    ok
    

    8.5 带超时的receive

    为receive语句添加超时处理, 格式如下:

    receice
        Pattern1 [when Guard1] ->
            Expressions1;
        Pattern2 [when Guard2] ->
            Expressions2;
        ...
    after Time ->
        Expressions
    end.
    

    8.5.1 只有超时的receive

    只有超时的receive其实就是实现的sleep功能

    sleep(T) ->
        receive
        after T ->
            true
        end.
    

    8.5.2 超时时间为0的receive

    设置超时时间为0, 避免进程永久暂停

    flush_buffer() ->
        receive
            %% 这里使用下划线变量(未绑定)来匹配任意消息
            _Any ->
                %%  _Any匹配任意消息后继续调用flush_buffer()将最终清空所有消息
                flush_buffer()
        %% 清空所有消息后如果没有设置超时时间为0将导致flush_buffer()函数的永久暂停
        after 0 ->
            true
        end.
    

    设置超时时间为0, 实现优先接收

    priority_receive() ->
        %% 如果没有消息匹配alarm, 程序将会走到超时设置的代码
        receive
            {alarm, X} ->{alarm, X}
        %% 在超时设置里使用Any将会匹配到第一条消息
        after 0 ->
            receive
                Any ->Any
            end
        end.
    

    8.5.3 使用一个无限等待超时进行接收

    如果将after后跟的时间值设置为infinity, 将会导致系统永远不会触发超时。

    8.5.4 实现一个计时器

    -module(stimer).
    -export([start/2, cancel/1]).
    
    %% 启动函数, 指定间隔时间和要执行的函数
    start(Time, Fun) ->spawn(fun() ->timer(Time, Fun) end).
    
    %% 结束函数, 向指定进程发送结束命令 
    cancel(Pid) ->Pid ! cancel.
    
    %% 计时器函数
    %% 如果在超时时间之前接收到结束消息则销毁进程
    %% 否则将执行指定的函数 
    timer(Time, Fun) ->
        receive
            cancel ->void
        after Time ->
                Fun()
        end.
    

    8.6 选择性接收

    send原语将消息发送到一个进程的邮箱
    receive原语将邮箱中的消息进行处理并删除

    %% 进入receive语句后即启动计时器(如果有after语句)
    receice
        %% 依次从邮箱中取出消息对Pattern进行模式匹配
        %% 匹配成功后将执行模式后面的表达式并删除消息
        Pattern1 [when Guard1] ->
            Expressions1;
        Pattern2 [when Guard2] ->
            Expressions2;
        ...
    %% 如果没有找到可以匹配的消息则进程挂起 
    after Time ->
        Expressions
    end.
    

    8.7 注册进程

    考虑到安全性和便捷性, Erlang提供了进程注册的方式替换PID的方式实现进程间的通信。

    注册进程

    register(AnAtom, Pid), 将Pid注册一个名为AnAtom的原子

    取消注册

    unregister(AnAtom), 移除AnAtom相对于的进程信息

    判断是否已注册

    whereis(AnAtom) -> Pid | undefined, 如果已注册则返回Pid, 否则返回undefined

    查看注册列表

    registered() -> [AnAtom::atom()], 返回一个系统中所有已注册的名称列表

    8.8 如何编写一个并发程序

    作者提供的并发程序的编程模版

    -module(ctemplate).
    -compile(export_all).
    
    %% 在启动函数中创建线程并调用loop函数
    start() ->
        spawn(fun() ->loop([]) end).
    
    %% 在远程过程调用中向指定的进程发送消息, 为区别不同的发送者调用self()带上自己的进程号
    rpc(Pid, Request) ->
        Pid ! {self(), Request},
        receive
            {Pid, Response} ->Response
        end.
    
    %% 在loop函数中处理不同的消息 
    loop(X) ->
        receive
            Any ->
                io:format("Received:~p~n", [Any]),
                loop(X)
        end.
    

    8.9 尾递归技术

    使用求阶乘来说明尾递归更容易理解一些:

    -module(fact).
    -compile(export_all).
    
    factorial(N) ->
        factorial(1, N).
    %% 求阶乘的函数
    %% Res用于记录当前结果, N用于记录乘数的变化
    %% 如计算factorial(5), 展开过程为
    %% (1, 5) -> (1*5, 4) -> (5*4, 3) -> (20*3, 2) -> (60*2, 1) -> (120, 0) -> 120
    %% 始终只需要两个变量来记录状态, 相比于普通的递归, 极为节省栈空间 
    factorial(Res, N) ->
        case N =:= 0 of
            true  ->Res;
            false ->factorial(Res*N, N-1)
        end.
    
    

    8.10 使用MFA启动进程

    MFA方式即通过指定模块、函数、参数的方式来启动进程

    spawn(Mod, FuncName, Args)
    

    相比于普通的方式, 其可以使程序在出于运行状态时仍然可以使用新版本代码进行升级。

    8.11 习题

    测试注册函数

    %% 在main函数中连续调用两次start函数, 模拟并行
    main(AnAtom, Fun) ->
        start(AnAtom, Fun),
        start(AnAtom, Fun).
    
    %% 根据Fun创建进程, 注册之前首先调用whereis函数查看是否已注册 
    start(AnAtom, Fun) ->
        Pid = spawn(Fun),
        Temp = whereis(AnAtom),
        case undefined =:= Temp of
            true  ->register(AnAtom, Pid);
            false ->io:format("~p had registered and Pid is ~p ~n", [AnAtom, Temp])
        end.
    

    调用结果:

    1> test1:main(test1, fun() -> io:format("just for test register") end). 
    test1 had registered and Pid is <0.36.0> 
    just for test register
    just for test register
    

    测试发送消息

    -module(sendm).
    -compile(export_all).
    
    %% 创建N个进程并销毁, 查看其运行时间
    sendmessage(N, M) ->
        %% 用于统计代码执行所耗的CPU时间和真实时间
        statistics(runtime),
        statistics(wall_clock),
        %% 创建N个进程
        L = for(1, N, fun() ->spawn(fun() ->wait() end) end),
        %% 给每个进程发送M条消息
        for(1, M, fun() ->lists:foreach(fun(Pid) ->Pid ! test end, L) end),
        %% 销毁每个进程
        lists:foreach(fun(Pid) ->Pid ! die end, L),
        {_, Time1} = statistics(runtime),
        {_, Time2} = statistics(wall_clock),
        U1 = Time1 * 1000,
        U2 = Time2 * 1000,
        io:format("Process send message time=~p (~p) microseconds~n",[U1, U2]).
    wait() ->
        receive 
            test ->wait();
            die  ->void
        end.
    for(N, N, F) ->[F()];
    for(I, N, F) ->[F() | for(I+1, N, F)].
    

    调用结果:

    1> c(sendm).                      
    {ok,sendm}
    2> sendm:sendmessage(1000, 10000).
    Process send message time=32760000 (56635000) microseconds
    ok
    

    Date: 2013-05-31 16:07:13 CST

    Author: matrix

    Org version 7.8.11 with Emacs version 24

    Validate XHTML 1.0
  • 相关阅读:
    维护
    zabbix监控线
    java——快排、冒泡、希尔、归并
    java——注解处理器
    spring boot——常用注解
    java——修改txt文件中某一行的内容
    spring boot——关于一个Mysql主键的问题
    mysql
    springboot
    自信点,不要怕
  • 原文地址:https://www.cnblogs.com/scheme/p/3110580.html
Copyright © 2011-2022 走看看