zoukankan      html  css  js  c++  java
  • 《Erlang程序设计》第二十章 多核编程

    第二十章 多核编程

    第二十章 多核编程

    20.1 如何在多核的CPU上更有效率的运行

    20.1.1 使用大量进程

    这个标准…显而易见。

    20.1.2 避免副作用

    因为存在副作用, 导致使用共享内存方式时必须使用锁机制, 虽然Erlang没有共享内存, 但对于可以被多个进程共享的ETS表和DETS表还是应该特别注意。

    20.1.3 顺序瓶颈

    对于本质就是顺序性的问题, 显然无法做到并发化。
    而磁盘IO, 也是一个无法避免的自然瓶颈。
    注册进程, 人为的创建了一个潜在的顺序瓶颈。

    20.2 并行化顺序代码

      并行化的map

    pmap(F, L) ->
        S = self(),
        Ref = erlang:make_ref(),
        %% 对于列表中的每个参数都启动一个进程去处理 
        Pids = map(fun(I) ->spawn(fun() ->do_f(S, Ref, F, I) end) end, L),
        gather(Pids, Ref).
    
    %% 处理完成后向父进程发送结果
    do_f(Parent, Ref, F, I) ->
        Parent ! {self(), Ref, (catch F(I))}.
    
    %% 以正确的顺序拼接每个进程的运行结果
    gather([Pid|T], Ref) ->
        receive
            {Pid, Ref, Ret} ->[Ret|gather(T, Ref)]
        end;
    gather([], _) ->[].
    

    什么时候可以用pmap:1. 计算量很小的函数; 2. 不创建太多的进程; 3. 在恰当的抽象层次上思考

    20.3 小消息, 大计算

      启动SMP Erlang

    # -smp  启动SMP Erlang
    # +S N  使用N个Erlang虚拟机
    $ erl -smp +S N
    

      测试不同的虚拟机数量对性能的影响

    #!/bin/sh
    echo "" >results
    for i in 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 
    do
       echo $i
       erl -boot start_clean -noshell -smp +S $i -s ptests tests $i >> results
    done
    

    20.4 map-reduce算法和磁盘索引程序

    20.4.1 map-reduce算法

    %% map函数     MapReduce每次给列表中的每个X创建一个新的进程 
    F1 = fun(Pid, X) ->void,
    
    %% reduce函数  针对每个键值, 将它所对应的所有值合并到一起
    %% Acc0       累加器
    F2 = fun(key, [Value], Acc0) ->Acc
    L = [X]
    Acc = X = term()
    
    %% 调用形式
    mapreduce(F1, F2, Acc0, L) ->Acc
    

      具体的实现

    mapreduce(F1, F2, Acc0, L) ->
        S = self(),
        %% 启动新的进程运行reduce函数 
        Pid = spawn(fun() ->reduce(S, F1, F2, Acc0, L) end),
        receive
            {Pid, Result} ->
                Result
        end.
    
    
    reduce(Parent, F1, F2, Acc0, L) ->
        process_flag(trap_exit, true),
        ReducePid = self(),
    
        %% map过程的实现
        %% 对于列表中的每个值都启动一个进程在do_job中调用F1进行处理 
        foreach(fun(X) ->
                spawn_link(fun() ->do_job(ReducePid, F1, X) end)
            end, L),
        N = length(L),
        %% 用字典存储键值
        Dict0 = dict:new(),
        %% 等待map过程完成
        Dict1 = collect_replies(N, Dict0),
        %% 调用F2按相同键值进行合并 
        Acc = dict:fold(F2, Acc0, Dict1),
        %% 向MapReduce进程通知运行结果
        Parent ! {self(), Acc}.
    
    %% 按键值进行合并的过程
    collect_replies(0, Dict) ->
        Dict;
    collect_replies(N, Dict) ->
        receive
            %% 对键-值的处理
            %% 存在Key则将Val相加, 否则插入到字典
            {Key, Val} ->
                case dict:is_key(Key, Dict) of
                    true ->
                        Dict1 = dict:append(Key, Val, Dict),
                        collect_replies(N, Dict1);
                    false ->
                        Dict1 = dict:store(Key,[Val], Dict),
                        collect_replies(N, Dict1)
                end;
            {'EXIT', _,  _Why} ->
                collect_replies(N-1, Dict)
        end.
    
    %% 执行指定的map函数
    do_job(ReducePid, F, X) ->
        F(ReducePid, X).
    

      测试代码:

    -module(test_mapreduce).
    -compile(export_all).
    -import(lists, [reverse/1, sort/1]).
    
    test() ->
        wc_dir(".").
    
    wc_dir(Dir) ->
        %% map函数
        F1 = fun generate_words/2,
        %% reduce函数
        F2 = fun count_words/3,
        %% 参数列表
        Files = lib_find:files(Dir, ".*[.](erl)", false),
        %% 调用mapreduce处理
        L1 = phofs:mapreduce(F1, F2, [], Files),
        reverse(sort(L1)).
    
    %% 查找文件中的每个单词
    generate_words(Pid, File) ->
        F = fun(Word) ->Pid ! {Word, 1} end,
        lib_misc:foreachWordInFile(File, F).
    
    %% 统计有多少个不同的单词
    count_words(Key, Vals, A) ->
        [{length(Vals), Key}|A].
    

      运行结果:

    1> test_mapreduce:test().
    [{115,"L"},
     {84,"T"},
     {80,"1"},
     {77,"end"},
     {72,"X"},
     {52,"H"},
     {47,"file"},
     {46,"S"},
     {44,"of"},
     {43,"F"},
     {40,"2"},
     {39,"Key"},
     {39,"Fun"},
     {37,"is"},
     {35,"case"},
     {34,"fun"},
     {34,"Pid"},
     {34,"N"},
     {33,"File"},
     {32,"true"},
     {31,"Str"},
     {28,"ok"},
     {27,"prefix"},
     {27,"Val"},
     {27,"I"},
     {26,"to"},
     {26,[...]},
     {24,...},
     {...}|...]
    

    20.4.2 全文检索

    • 1. 反向索引
      文件-内容对照表
       
      文件名内容
      /home/dogs rover jack buster winston
      /home/animals/cats zorro daisy jaguar
      /home/cars rover jaguar ford
      索引-文件对照表
       
      索引文件名
      1 /home/dogs
      2 /home/animals/cats
      3 /home/cars
      单词-索引对照表
       
      单词索引
      rover 1,3
      jack 1
      buster 1
      winston 1
      zorro 2
      daisy 2
      jaguar 2,3
      ford 3
    • 2. 反向索引的查询
      通过单词-索引, 索引-文件的对照表查找单词与文件的对应关系
    • 3. 反向索引的数据结构
      因为一个常见的词可能在成千上万的文件中出现, 因此使用数字索引代替文件名可大大节省存储空间, 因此需要文件与索引的对照表。
      对于每个在文件中出现的单词, 都需要记录此文件的索引号, 因此建立单词与索引的对照表。

    20.4.3 索引器的操作

    %% 启动一个名为indexer_server的服务器进程
    %% 启动一个worker进程来执行索引动作 
    start() ->
        indexer_server:start(output_dir()),
        spawn_link(fun() ->worker() end).
    
    worker() ->
        possibly_stop(),
        %% 返回下一个需要索引的目录
        case indexer_server:next_dir() of
        {ok, Dir} ->
            %% 查找目录下需要进行索引的文件 
            Files = indexer_misc:files_in_dir(Dir),
            %% 为其建立索引
            index_these_files(Files),
            %% 检测是否正常完成
            indexer_server:checkpoint(),
            possibly_stop(),
            sleep(10000),
            worker();
        done ->
            true
        end.
    
    %% 使用MapReduce算法实现建立索引的并行处理 
    index_these_files(Files) ->
        Ets = indexer_server:ets_table(),
        OutDir = filename:join(indexer_server:outdir(), "index"),
        %% map函数
        F1 = fun(Pid, File) ->indexer_words:words_in_file(Pid, File, Ets) end,
        %% reduce函数
        F2 = fun(Key, Val, Acc) ->handle_result(Key, Val, OutDir, Acc) end,
        indexer_misc:mapreduce(F1, F2, 0, Files).
    
    %% 按照Key值进行合并
    handle_result(Key, Vals, OutDir, Acc) ->
        add_to_file(OutDir, Key, Vals),
        Acc + 1.
    
    %% 将索引数组添加到Word中
    add_to_file(OutDir, Word, Is) ->
        L1 = map(fun(I) -><<I:32>> end, Is),
        OutFile = filename:join(OutDir, Word),
        case file:open(OutFile, [write,binary,raw,append]) of
        {ok, S} ->
            file:pwrite(S, 0, L1),
            file:close(S);
        {error, E} ->
              exit({ebadFileOp, OutFile, E})
        end.
    

    20.4.4 运行索引器

    1> indexer:cold_start().
    2> indexer:start().
    3> indexer:stop().
    

    20.4.5 评论

    可以改进的三个方面

    1. 改进单词抽取
    2. 改进map-reduce算法, 以便处理海量数据
    3. 方向索引的数据结构只使用了文件系统来存储
    

    20.4.6 索引器的代码

    Date: 2014-01-06 14:28:30 CST

    Author: matrix

    Org version 7.8.11 with Emacs version 24

    Validate XHTML 1.0
  • 相关阅读:
    Javascript的this用法
    angularjs学习笔记--1.入门
    git的简单应用
    转:Netty服务器线程模型概览
    Netty 4.0 中文文档
    转:腾讯CKV海量分布式存储系统
    转Redis性能测试
    maven assemby 打包问题
    转发:TCP
    转:HBase Server启动过程
  • 原文地址:https://www.cnblogs.com/scheme/p/3507140.html
Copyright © 2011-2022 走看看