  • [Functional Programming] mapReduce over Async operations and fanout results in Pair(rejected, resolved) (fanout, flip, mapReduce)

    This post builds on the previous one. The difference is that here we handle both the successful results and the error result by using the Pair functor.

    We still have our funs.js, which is the same as in the previous post:

    const fs = require('fs');
    const {Async, constant, composeK, curry} = require('crocks');
    const {fromNode} = Async;
    
    const access = fromNode(fs.access);
    const readFile = fromNode(fs.readFile);
    
    const accessAsync = curry((mode, path) =>
      access(path, mode)
      .map(constant(path)));
    
    // readFileAsync :: Option -> a -> Async Error b
    const readFileAsync = curry((option, path) =>
        readFile(path, option));
    
    const checkRead = accessAsync(fs.constants.F_OK);
    const readTextFile = readFileAsync('utf-8');
    
    // loadTextFile :: String -> Async Error String
    const loadTextFile = composeK(
        readTextFile,
        checkRead
    );
    
    const fork = a => a.fork(
        console.log.bind(null, 'rej'),
        console.log.bind(null, 'res')
    );
    
    module.exports = {
        loadTextFile,
        fork
    }

    For our main.js, we still have the same data input:

    const data = [
        'text.txt',
        'text.big.txt',
        'notfound.txt',
    ];

    This time the requirements are different:

    1. We want to read the files one by one and keep all the successful results in Pair(results, _);

    2. We want to keep the error result in Pair(_, error).

    const concatSpecial = (acc, currAsync) =>
        acc.chain(
            xs => currAsync.bimap(
                e => Pair(xs, e),
                currVal =>  xs.concat(currVal))
        );
    
    // Async (Pair [String] Error) [String]
    const flow = mapReduce(
        loadTextFile,
        concatSpecial,
        Async.Resolved([])
    );
    
    flow(data).fork(
        e => console.log(e.snd(), e.fst()), // Pair(success, error)
        r => console.log(r), // Just success result
    )
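
To make the behaviour of 'concatSpecial' easier to follow, here is a minimal synchronous sketch without crocks. 'Resolved', 'Rejected', 'pair' and 'load' are hypothetical stand-ins written only for this illustration (they are not the crocks API, and 'load' fakes the file reads). It suggests that the fold collects results until the first failure, then carries the successes so far together with the error on the rejected side.

```javascript
// Hypothetical synchronous stand-ins for Async.Resolved / Async.Rejected.
const Resolved = value => ({
  chain: f => f(value),
  bimap: (_, g) => Resolved(g(value)),
  fork: (_, onResolved) => onResolved(value),
});
const Rejected = error => ({
  chain: _ => Rejected(error), // a rejection skips every later step
  bimap: (f, _) => Rejected(f(error)),
  fork: (onRejected, _) => onRejected(error),
});

// A plain two-element array used in place of crocks' Pair.
const pair = (fst, snd) => [fst, snd];

// Same shape as the concatSpecial from the post.
const concatSpecial = (acc, curr) =>
  acc.chain(xs => curr.bimap(
    e => pair(xs, e),
    currVal => xs.concat(currVal)
  ));

// Hypothetical loader: 'notfound.txt' fails, everything else succeeds.
const load = name =>
  name === 'notfound.txt'
    ? Rejected(`ENOENT: ${name}`)
    : Resolved(`contents of ${name}`);

const run = names =>
  names.map(load).reduce(concatSpecial, Resolved([]));

let outcome;
run(['text.txt', 'text.big.txt', 'notfound.txt']).fork(
  ([successes, error]) => { outcome = { successes, error }; },
  successes => { outcome = { successes, error: null }; }
);
console.log(outcome);
```

With the real Async the same shape applies, except that the work is asynchronous and nothing runs until 'fork' is called.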

    We still use 'mapReduce' to map over each filename and fetch its content; then 'concatSpecial' concatenates all the successful results into one array. That is why the initial value is an empty array wrapped in Async:

    const flow = mapReduce(
        loadTextFile,
        concatSpecial,
        Async.Resolved([])
    );
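
As a rough mental model, 'mapReduce' can be thought of as a map step fused into a reduce. The version below is a simplified stand-in restricted to plain arrays, written for illustration only; it is not the crocks source, and the word-length example is made up.

```javascript
// Simplified stand-in for mapReduce on plain arrays (an assumption, not crocks itself).
const mapReduce = (mapFn, reduceFn, empty) => xs =>
  xs.reduce((acc, x) => reduceFn(acc, mapFn(x)), empty);

// Toy usage: map each name to its length, then sum the lengths.
const totalLength = mapReduce(
  name => name.length,
  (sum, len) => sum + len,
  0
);

console.log(totalLength(['text.txt', 'notfound.txt'])); // 20
```

In the post, the map step is 'loadTextFile', the reducer is 'concatSpecial', and the empty value is 'Async.Resolved([])'.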

    We can do a point-free refactor of 'concatSpecial'. It's not necessary, but it is good practice:

    const fn = flip(
        xs => bimap(
            e => Pair(xs, e),
            currVal =>  xs.concat(currVal)
        )
    );
    
    const concatSpecial = (acc, currAsync) =>
        acc.chain(
            fn(currAsync)
        );

    The function wrapped by 'flip' takes 'xs' as its first param and the Async (the value 'bimap' is waiting for) as its second param.

    But inside 'concatSpecial' we have 'currAsync' first and 'chain' only supplies 'xs' later, so we need 'flip' to swap the argument order:

    acc.chain(
        fn(currAsync) // pass currAsync first; chain then supplies xs, i.e. xs => fn(currAsync)(xs)
    );
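
The argument swap can be sketched with a hand-written 'flip' for manually curried functions. This 'flip' and the 'join' helper are simplified, hypothetical stand-ins for illustration; the crocks version also handles its own currying.

```javascript
// Simplified stand-in for flip: swaps the first two arguments of a curried function.
const flip = f => y => x => f(x)(y);

// A curried function whose argument order matters.
const join = first => second => `${first}-${second}`;

console.log(join('xs')('currAsync'));       // xs-currAsync
console.log(flip(join)('currAsync')('xs')); // xs-currAsync
```

Both calls produce the same string: 'flip' lets us supply the second argument first, which is the situation in 'acc.chain(fn(currAsync))'.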

    We can also replace 'Pair' with 'fanout':

    const fn = flip(
        xs => bimap(
            fanout(constant(xs), identity),
            currVal =>  xs.concat(currVal)
        )
    );
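
Why this works: 'fanout' feeds one input to two functions and pairs up the results, so 'fanout(constant(xs), identity)' applied to 'e' yields the same pair as 'e => Pair(xs, e)'. The sketch below uses simplified stand-ins (a plain array instead of Pair, and hand-written 'fanout', 'constant' and 'identity'), so treat it as an illustration rather than the crocks implementation.

```javascript
// Simplified stand-ins, assuming a two-element array in place of Pair.
const constant = x => () => x;
const identity = x => x;
const fanout = (f, g) => x => [f(x), g(x)];

const xs = ['a', 'b'];

const viaLambda = e => [xs, e];
const viaFanout = fanout(constant(xs), identity);

console.log(viaLambda('boom')); // [ [ 'a', 'b' ], 'boom' ]
console.log(viaFanout('boom')); // [ [ 'a', 'b' ], 'boom' ]
```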

    ---

    Full code:

    const {fork, loadTextFile} = require('./funs.js');
    const {Async, bimap, fanout, constant, flip, Pair, identity, mapReduce} = require('crocks');
    
    const data = [
        'text.txt',
        'text.big.txt',
        'notfound.txt',
    ];
    
    // fanout(constant(xs), identity) is equivalent to e => Pair(xs, e)
    const fn = flip(
        xs => bimap(
            fanout(constant(xs), identity),
            currVal =>  xs.concat(currVal)
        )
    );
    /*
    const concatSpecial = (acc, currAsync) =>
        acc.chain(
            xs => currAsync.bimap(
                e => Pair(xs, e),
                currVal =>  xs.concat(currVal))
        );*/
    const concatSpecial = (acc, currAsync) =>
        acc.chain(
            fn(currAsync)
        );
    // Async (Pair [String] Error) [String]
    const flow = mapReduce(
        loadTextFile,
        concatSpecial,
        Async.Resolved([])
    );
    
    flow(data).fork(
        e => console.log(e.snd(), e.fst()), // Pair(success, error)
        r => console.log(r), // Just success result
    )
  • Original article: https://www.cnblogs.com/Answer1215/p/10507231.html