zoukankan      html  css  js  c++  java
  • 怎样用Java 8优雅的开发业务

    怎样用Java 8优雅的开发业务

    函数式编程

    匿名函数

    λ演算

    流式编程

    基本原理

    Java中流式编程的基本原理有两点。

    1. 构建流
    2. 数据流转(流水线)
    3. 规约
    IntStream.rangeClosed(1, 100) // 1. 构建流
        .mapToObj(String::valueOf)// 2. 数据流转(流水线)
        .collect(joining());      // 3. 规约
    

    案例

    • 英雄的主位置一共有几类,分别是什么
    @Test
    fun t1() {
        // 英雄的主位置一共有几类,分别是什么
        // 映射
        val roleMains = heroes.map(Hero::getRoleMain)
            // 过滤为空的数据
            .filter(Objects::nonNull)
            // 去重
            .distinct()
        println(roleMains.size)
        println(roleMains)
    }
    
    @Test
    public void t1() {
        // 英雄的主位置一共有几类,分别是什么
        List<String> roleMains = heroes.stream()
                // 映射
                .map(Hero::getRoleMain)
                // 过滤为空的数据
                .filter(Objects::nonNull)
                // 去重
                .distinct()
                // 收集列表
                .collect(toList());
        System.out.println(roleMains.size());
        System.out.println(roleMains);
    }
    

    • 英雄按主次位置分组后,输出每个分组有多少英雄,其中:近战英雄有多少位,远程英雄有多少位
    @Test
    fun t2() {
        // 英雄按主次位置分组后,输出每个分组有多少英雄,其中:近战英雄有多少位,远程英雄有多少位
    
        // 主次位置分组的英雄数量
        val groupHeroCount = heroes.groupingBy {
            Pair.of(it.roleMain, it.roleAssist)
        }.eachCount()
    
        // 主次分组后,再按攻击范围分组的英雄数量
        val groupThenGroupCount = heroes.groupBy {
            Pair.of(it.roleMain, it.roleAssist)
        }.map {
            val value = it.value.groupingBy(Hero::getAttackRange).eachCount()
            Pair.of(it.key, value)
        }.associateBy({ it.left }, { it.value })
    
        // 遍历输出
        groupThenGroupCount.forEach { (groupKey, groupValue) ->
            val groupingCount = groupHeroCount[groupKey]
            print("英雄分组key为:$groupKey;英雄数量:$groupingCount;")
            groupValue.forEach { (countKey, countValue) ->
                print("英雄攻击范围:$countKey;英雄数量:$countValue;")
            }
            println()
        }
    }
    
    @Test
    public void t2() {
        // 英雄按主次位置分组后,输出每个分组有多少英雄,其中:近战英雄有多少位,远程英雄有多少位
    
        // 主次位置分组的英雄数量
        Map<Pair<String, String>, Long> groupHeroCount = heroes.stream()
                .collect(groupingBy(hero -> Pair.of(hero.getRoleMain(), hero.getRoleAssist()), counting()));
    
        // 主次分组后,再按攻击范围分组的英雄数量
        Map<Pair<String, String>, Map<String, Long>> groupThenGroupCount = heroes.stream()
                .collect(groupingBy(hero -> Pair.of(hero.getRoleMain(), hero.getRoleAssist()),
                        groupingBy(Hero::getAttackRange, counting())));
    
        // 遍历输出
        groupThenGroupCount.forEach((groupKey, groupValue) -> {
            Long groupingCount = groupHeroCount.get(groupKey);
            System.out.print("英雄分组key为:" + groupKey + ";英雄数量:" + groupingCount + ";");
            groupValue.forEach((countKey, countValue) -> System.out.print("英雄攻击范围:" + countKey + ";英雄数量:" + countValue + ";"));
            System.out.println();
        });
    }
    

    • 求近战英雄HP初始值的加总
    @Test
    fun t3() {
        // 求近战英雄HP初始值的加总
        val sum = heroes.filter { "近战" == it.attackRange }
            .map(Hero::getHpStart)
            .filter(Objects::nonNull)
            .reduce(BigDecimal::add)
        println("近战英雄HP初始值的加总为:$sum")
    }
    
    @Test
    public void t3() {
        // 求近战英雄HP初始值的加总
        BigDecimal sum = heroes.stream()
                .filter(hero -> "近战".equals(hero.getAttackRange()))
                .map(Hero::getHpStart)
                .filter(Objects::nonNull)
                .reduce(BigDecimal.ZERO, BigDecimal::add);
        System.out.println("近战英雄HP初始值的加总为:" + sum);
    }
    

    • 通过最小列表收集器获取最小列表
    @Test
    public void t4() {
        // 通过最小列表收集器获取最小列表
        List<BigDecimal> minAttackGrowth = heroes.stream()
                .map(Hero::getAttackGrowth)
                .collect(new MinListCollector<>());
        System.out.println(minAttackGrowth);
        List<Hero> minHero = heroes.stream()
                .collect(new MinListCollector<>());
        System.out.println(minHero);
    }
    
    import java.util.*;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.BiConsumer;
    import java.util.function.BinaryOperator;
    import java.util.function.Function;
    import java.util.function.Supplier;
    import java.util.stream.Collector;
    import java.util.stream.Collectors;
    
    import static java.util.stream.Collector.Characteristics.*;
    
    /**
     * 最小列表收集器
     *
     * @author switch
     * @since 2020/8/18
     */
    public class MinListCollector<T extends Comparable<? super T>> implements Collector<T, List<T>, List<T>> {
        /**
         * 收集器的特性
         *
         * @see Characteristics
         */
        private final static Set<Characteristics> CHARACTERISTICS = Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH));
        private final static int ZERO = 0;
    
        /**
         * 最小值
         */
        private final AtomicReference<T> min = new AtomicReference<>();
    
        @Override
        public Supplier<List<T>> supplier() {
            // supplier参数用于生成结果容器,容器类型为A
            return ArrayList::new;
        }
    
        @Override
        public BiConsumer<List<T>, T> accumulator() {
            // accumulator用于消费元素,也就是归纳元素,这里的T就是元素,它会将流中的元素一个一个与结果容器A发生操作
            return (list, element) -> {
                // 获取最小值
                T minValue = min.get();
                if (Objects.isNull(minValue)) {
                    // 第一次比较
                    list.add(element);
                    min.set(element);
                } else if (element.compareTo(minValue) < ZERO) {
                    // 发现更小的值
                    list.clear();
                    list.add(element);
                    min.compareAndSet(minValue, element);
                } else if (element.compareTo(minValue) == ZERO) {
                    // 与最小值相等
                    list.add(element);
                }
            };
        }
    
        @Override
        public BinaryOperator<List<T>> combiner() {
            // combiner用于两个两个合并并行执行的线程的执行结果,将其合并为一个最终结果A
            return (left, right) -> {
                // 最小值列表合并
                List<T> leftList = getMinList(left);
                List<T> rightList = getMinList(right);
                leftList.addAll(rightList);
                return leftList;
            };
        }
    
        private List<T> getMinList(List<T> list) {
            return list.stream()
                    .filter(element -> element.compareTo(min.get()) == ZERO)
                    .collect(Collectors.toList());
        }
    
        @Override
        public Function<List<T>, List<T>> finisher() {
            // finisher用于将之前整合完的结果R转换成为A
            return Function.identity();
        }
    
        @Override
        public Set<Characteristics> characteristics() {
            // characteristics表示当前Collector的特征值,这是个不可变Set
            return CHARACTERISTICS;
        }
    }
    
    

    优雅的空处理

    file

    import org.junit.Test;
    
    import java.util.Optional;
    
    /**
     * @author switch
     * @since 2020/8/18
     */
    public class OptionalTests {
        @Test
        public void t1() {
            // orElse
            System.out.println(Optional.ofNullable(null).orElse("张三"));
            System.out.println(Optional.ofNullable(null).orElseGet(() -> "李四"));
            System.out.println(Optional.ofNullable("王五").orElseThrow(NullPointerException::new));
        }
    
        @Test
        public void t2() {
            // isPresent
            Optional<String> name = Optional.ofNullable("张三");
            if (name.isPresent()) {
                System.out.println(name.get());
            }
        }
    
        @Test
        public void t3() {
            // map
            Optional<Integer> number = Optional.of("123456").map(Integer::valueOf);
            if (number.isPresent()) {
                System.out.println(number.get());
            }
        }
    
        @Test
        public void t4() {
            // flatMap
            Optional<Integer> number = Optional.of("123456").flatMap(s -> Optional.of(Integer.valueOf(s)));
            if (number.isPresent()) {
                System.out.println(number.get());
            }
        }
    
        @Test
        public void t5() {
            // 过滤
            String number = "123456";
            String filterNumber = Optional.of(number).filter(s -> !s.equals(number)).orElse("654321");
            System.out.println(filterNumber);
        }
    }
    
    

    新的并发工具类CompletableFuture

    file

    单机批处理多线程执行模型

    该模型适用于百万级量级的任务。超过千万数据,可以考虑分组,多机器并行执行。
    基本流程:

    1. 从数据库获取Id列表
    2. 拆分成n个子Id列表
    3. 通过子Id列表获取关联数据(注意:都需要提供批量查询接口)
    4. 映射到需要处理的Model(提交到CompletableFuture)->处理数据->收集成list)(java 8流式处理)
    5. 收集的list进行join操作
    6. 收集list
    模型

    模型原理:Stream+CompletableFuture+lambda

    简要解释:

    • CompletableFuture是java8提供的一个工具类,主要是用于异步处理流程编排的。
    • Stream是java8提供的一个集合流式处理工具类,主要用于数据的流水线处理。
    • lambda在java中是基于内部匿名类实现的,可以大幅减少重复代码。
    • 总结:在该模型中Stream用于集合流水线处理、CompletableFuture解决异步编排问题(非阻塞)、lambda简化代码。
    • 数据流动
    List<List<String>> -> 
    Stream<List<String>> -> 
    Stream<List<Model>> -> 
    Stream<CompletableFuture<List<Model>>> -> 
    Stream<CompletableFuture<List<映射类型>>> -> 
    List<CompletableFuture<Void>>
    
    案例
    • ThreadPoolUtil
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    import java.util.concurrent.ThreadPoolExecutor;
    
    public final class ThreadPoolUtil {
        public static ThreadPoolTaskExecutor getDefaultExecutor(Integer poolSize, Integer maxPoolSize, Integer queueCapacity) {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setAllowCoreThreadTimeOut(true);
            executor.setWaitForTasksToCompleteOnShutdown(true);
            executor.setCorePoolSize(poolSize);
            executor.setMaxPoolSize(maxPoolSize);
            executor.setQueueCapacity(queueCapacity);
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            return executor;
        }
    }
    
    • ThreadPoolConfig
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    @Configuration
    public class ThreadPoolConfig {
        /**
         * 计算规则:N(thread) = N(cpu) * U(cpu) * (1 + w/c)
         * N(thread):线程池大小
         * N(cpu):处理器核数
         * U(cpu):期望CPU利用率(该值应该介于0和1之间)
         * w/c:是等待时间与计算时间的比率,比如说IO操作即为等待时间,计算处理即为计算时间
         */
        private static final Integer TASK_POOL_SIZE = 50;
        private static final Integer TASK_MAX_POOL_SIZE = 100;
        private static final Integer TASK_QUEUE_CAPACITY = 1000;
    
        @Bean("taskExecutor")
        public ThreadPoolTaskExecutor taskExecutor() {
            return ThreadPoolUtil.getDefaultExecutor(TASK_POOL_SIZE, TASK_MAX_POOL_SIZE, TASK_QUEUE_CAPACITY);
        }
    }
    
    • #getFuturesStream
    public Stream<CompletableFuture<List<Model>>> getFuturesStream(List<List<String>> idSubLists) {
        return idSubLists.stream()
            .map(ids -> 
                CompletableFuture.supplyAsync(() -> modelService.listByIds(ids), taskExecutor)
            );
    }
    
    • #standardisation
    public void standardisation() {
        List<CompletableFuture<Void>> batchFutures = getFuturesStream(idSubLists)
                .map(future -> future.thenApply(this::listByNormalize))
                .map(future -> future.thenAccept(modelService::batchUpdateData))
                .collect(Collectors.toList());
        List<Void> results = batchFutures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }
    

    调整线程池的大小

    《Java并发编程实战》一书中,Brian Goetz和合著者们为线程池大小的优化提供了不少中肯的建议。这非常重要,如果线程池中线程的数量过多,最终它们会竞争稀缺的处理器和内存资源,浪费大量的时间在上下文切换上。反之,如果线程的数目过少,正如你的应用所面临的情况,处理器的一些核可能就无法充分利用。Brian Goetz建议,线程池大小与处理器的利用率之比可以使用下面的公式进行估算:
    $$N_{threads} = N_{CPU} * U_{CPU} * (1 + \frac{W}{C})$$

    其中:

    • $N_{CPU}$是处理器的核的数目,可以通过Runtime.getRuntime().availableProcessors()得到
    • $U_{CPU}$是期望的CPU利用率(该值应该介于0和1之间)
    • $\frac{W}{C}$是等待时间与计算时间的比率,比如说IO操作即为等待时间,计算处理即为计算时间

    并行——使用流还是CompletableFutures?

    对集合进行并行计算有两种方式:要么将其转化为并行流,利用map这样的操作开展工作,要么枚举出集合中的每一个元素,创建新的线程,在CompletableFuture内对其进行操作。后者提供了更多的灵活性,可以调整线程池的大小,而这能帮助确保整体的计算不会因为线程都在等待I/O而发生阻塞。

    使用这些API的建议如下:

    • 如果进行的是计算密集型的操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要创建比处理器核数更多的线程)。
    • 反之,如果并行的工作单元还涉及等待I/O的操作(包括网络连接等待),那么使用CompletableFuture灵活性更好,可以依据等待/计算,或者$\frac{W}{C}$的比率设定需要使用的线程数。这种情况不使用并行流的另一个原因是,处理流的流水线中如果发生I/O等待,流的延迟特性很难判断到底什么时候触发了等待。

    日期和时间API

    file

    使用指南:https://www.yuque.com/docs/share/ee5ef8a7-d261-4593-bd08-2a7a7d2c11ca?#(密码:gtag) 《时区工具类使用指南》

    项目地址

    GitHub:java8-fluent

    参考

    分享并记录所学所见

  • 相关阅读:
    hdu 1017 A Mathematical Curiosity 解题报告
    hdu 2069 Coin Change 解题报告
    hut 1574 组合问题 解题报告
    hdu 2111 Saving HDU 解题报
    hut 1054 Jesse's Code 解题报告
    hdu1131 Count the Trees解题报告
    hdu 2159 FATE 解题报告
    hdu 1879 继续畅通工程 解题报告
    oracle的系统和对象权限
    oracle 自定义函数 返回一个表类型
  • 原文地址:https://www.cnblogs.com/switchvov/p/14163260.html
Copyright © 2011-2022 走看看