package com.yang.producer.fun;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author: Yang
* @date: 2021/9/30 22:38
* @description: 多种目录-刷新式-异步-批量入库Demo
*/
public class Handler {
public static void main(String[] args) {
List<Ffbz> list = new ArrayList<>() {{
add(new Ffbz("1", "one"));
add(new Ffbz("2", "two"));
add(new Ffbz("3", "three"));
add(new Ffbz("4", "four"));
add(new Ffbz("5", "five"));
add(new Ffbz("6", "six"));
}};
//1.查询本地key
HashSet<String> keySet = new HashSet<>() {{
add("1");
add("3");
add("5");
}};
BatchRefresher.refresh(list, new FfbzBatchUpdater(), new FfbzBatchInserter(), keySet);
}
}
/**
 * Batch inserter: strategy for persisting a list of records in a single call.
 *
 * <p>Declared as a functional interface so callers may supply a lambda or method
 * reference instead of a dedicated implementation class.
 *
 * @param <T> type of the records being inserted
 */
@FunctionalInterface
interface BatchInserter<T> {
    /**
     * Inserts every record in the given list.
     *
     * @param list records to insert
     */
    void insertAll(List<T> list);
}
/**
 * Batch updater: strategy for updating a list of records in a single call.
 *
 * <p>Declared as a functional interface so callers may supply a lambda or method
 * reference instead of a dedicated implementation class.
 *
 * @param <T> type of the records being updated
 */
@FunctionalInterface
interface BatchUpdater<T> {
    /**
     * Updates every record in the given list.
     *
     * @param list records to update
     */
    void updateAll(List<T> list);
}
/**
 * Immutable two-slot wrapper that carries an "update" value and an "insert" value
 * of the same type.
 *
 * @param <T> type held in both slots
 */
class MyPair<T> {
    /** Value destined for the update path. */
    public final T toUpdate;

    /** Value destined for the insert path. */
    public final T toInsert;

    /**
     * Stores both values as given; no copying or validation is performed.
     *
     * @param forUpdate value exposed as {@link #toUpdate}
     * @param forInsert value exposed as {@link #toInsert}
     */
    public MyPair(T forUpdate, T forInsert) {
        toInsert = forInsert;
        toUpdate = forUpdate;
    }
}
/**
 * Entity class for a paid disease category.
 *
 * <p>Accessors and the two-argument constructor are not written out here; they are
 * expected to come from the Lombok annotations below (the demo calls
 * {@code new Ffbz(id, name)}, {@code getId()} and {@code getName()}).
 */
@Data
@AllArgsConstructor
class Ffbz {
// Record identifier; BatchRefresher reads it reflectively through getId()
// to decide between update and insert.
private String id;
// Display name of the category.
private String name;
}
/**
 * {@link BatchUpdater} implementation for paid disease category ({@link Ffbz}) records.
 */
@Slf4j
class FfbzBatchUpdater implements BatchUpdater<Ffbz> {
    // TODO inject the persistence layer here

    /**
     * Placeholder update: writes one INFO log line per record with its id and name.
     *
     * @param list records to update
     */
    @Override
    public void updateAll(List<Ffbz> list) {
        list.forEach(entry -> log.info("update--{}---{}", entry.getId(), entry.getName()));
    }
}
/**
 * {@link BatchInserter} implementation for paid disease category ({@link Ffbz}) records.
 */
@Slf4j
class FfbzBatchInserter implements BatchInserter<Ffbz> {
    // TODO inject the persistence layer here

    /**
     * Placeholder insert: writes one log line per record with its id and name.
     *
     * @param list records to insert
     */
    @Override
    public void insertAll(List<Ffbz> list) {
        for (Ffbz it : list) {
            // A routine insert is not a failure, so it is traced at INFO (the level the
            // updater uses) rather than ERROR, which would pollute error monitoring.
            log.info("insert--{}---{}", it.getId(), it.getName());
        }
    }
}
/**
 * Batch refresher: splits incoming records into "already known" and "new" by their id
 * and hands each group to the matching batch operation on the shared thread pool.
 */
class BatchRefresher {
    /** Name of the no-argument accessor used to read each record's key reflectively. */
    private static final String ID_GETTER_NAME = "getId";

    /**
     * Refreshes the given records: those whose {@code getId()} value is contained in
     * {@code keySet} go to {@code batchUpdater}, all others go to {@code batchInserter}.
     *
     * <p>The two batches are submitted as separate tasks to {@code Pool.POOL.executor};
     * this method does not wait on the resulting futures. A group that turns out to be
     * empty is not submitted at all. Exceptions thrown by the updater or inserter are
     * printed to standard error and never reach the caller.
     *
     * @param originList    records to persist; must not be {@code null} or contain
     *                      {@code null}, and every element must expose a no-argument
     *                      {@code getId()} method (declared on its class or a superclass)
     * @param batchUpdater  receives the records whose id is already in {@code keySet}
     * @param batchInserter receives the records whose id is not in {@code keySet}
     * @param keySet        ids that already exist; must not be {@code null}
     * @param <T>           record type
     * @param <K>           key type
     */
    public static <T, K> void refresh(List<T> originList, BatchUpdater<T> batchUpdater, BatchInserter<T> batchInserter, Set<K> keySet) {
        // 1. Separate
        MyPair<List<T>> myPair = separate(originList, keySet);
        // 2. Batch update
        batchUpdate(myPair.toUpdate, batchUpdater);
        // 3. Batch insert
        batchInsert(myPair.toInsert, batchInserter);
    }

    /**
     * Partitions the records by whether their id is contained in {@code keySet}.
     *
     * <p>Reflection failures (missing accessor, illegal access, exception thrown by the
     * accessor) propagate unchanged via {@code @SneakyThrows}.
     *
     * @param originList records to partition
     * @param keySet     ids that already exist
     * @param <T>        record type
     * @param <K>        key type
     * @return pair whose {@code toUpdate} holds the records with a known id and whose
     *         {@code toInsert} holds the rest, both in encounter order
     */
    @SneakyThrows
    private static <T, K> MyPair<List<T>> separate(List<T> originList, Set<K> keySet) {
        // At most min(|keySet|, |originList|) distinct ids can match.
        List<T> toUpdate = new ArrayList<>(Math.min(keySet.size(), originList.size()));
        List<T> toInsert = new ArrayList<>(originList.size());
        // Lists are normally homogeneous, so remember the accessor for the last runtime
        // class seen instead of repeating the reflective lookup for every element.
        Class<?> cachedType = null;
        Method idGetter = null;
        for (T it : originList) {
            Class<?> type = it.getClass();
            if (type != cachedType) {
                idGetter = findIdGetter(type);
                cachedType = type;
            }
            // Set.contains accepts any Object, so no unchecked cast to K is needed.
            Object key = idGetter.invoke(it);
            if (keySet.contains(key)) {
                toUpdate.add(it);
            } else {
                toInsert.add(it);
            }
        }
        return new MyPair<>(toUpdate, toInsert);
    }

    /**
     * Locates the {@code getId()} accessor for a record class.
     *
     * <p>{@link Class#getDeclaredMethod} only sees methods declared directly on the
     * class it is called on, so the superclass chain is walked to also support records
     * that inherit the accessor (subclasses, generated proxies).
     *
     * @param type runtime class of a record
     * @return the accessor, already made accessible
     * @throws NoSuchMethodException if neither the class nor any superclass declares it
     */
    private static Method findIdGetter(Class<?> type) throws NoSuchMethodException {
        for (Class<?> current = type; current != null; current = current.getSuperclass()) {
            try {
                Method getter = current.getDeclaredMethod(ID_GETTER_NAME);
                getter.setAccessible(true);
                return getter;
            } catch (NoSuchMethodException ignored) {
                // Not declared at this level; keep climbing the hierarchy.
            }
        }
        throw new NoSuchMethodException(type.getName() + "." + ID_GETTER_NAME + "()");
    }

    /**
     * Submits the insert batch to the shared pool.
     *
     * @param listToInsert  records to insert; nothing is submitted when empty
     * @param batchInserter inserter that performs the work
     * @param <T>           record type
     */
    private static <T> void batchInsert(List<T> listToInsert, BatchInserter<T> batchInserter) {
        if (listToInsert.isEmpty()) {
            // Nothing to do; avoid handing an empty batch to the persistence layer.
            return;
        }
        runAsync(() -> batchInserter.insertAll(listToInsert));
    }

    /**
     * Submits the update batch to the shared pool.
     *
     * @param listToUpdate records to update; nothing is submitted when empty
     * @param batchUpdater updater that performs the work
     * @param <T>          record type
     */
    private static <T> void batchUpdate(List<T> listToUpdate, BatchUpdater<T> batchUpdater) {
        if (listToUpdate.isEmpty()) {
            // Nothing to do; avoid handing an empty batch to the persistence layer.
            return;
        }
        runAsync(() -> batchUpdater.updateAll(listToUpdate));
    }

    /**
     * Runs a task on the shared pool, reporting (not propagating) any exception.
     *
     * @param task work to execute
     */
    private static void runAsync(Runnable task) {
        Pool.POOL.executor.submit(() -> {
            try {
                task.run();
            } catch (Exception e) {
                // NOTE(review): failures only reach stderr; consider routing them
                // through the project's logger once this class has one.
                e.printStackTrace();
            }
        });
    }
}
/**
 * Thread pool, exposed as a single-constant enum so there is exactly one shared
 * executor.
 */
enum Pool {
    POOL(newExecutor());

    /** Executor backing this constant. */
    public final ThreadPoolExecutor executor;

    Pool(ThreadPoolExecutor executor) {
        this.executor = executor;
    }

    /**
     * Creates the executor: 4 core threads, up to 8 threads, 37-minute keep-alive,
     * a queue bounded at 2048 tasks, the default thread factory, and a caller-runs
     * rejection policy.
     *
     * @return a new, not yet started executor
     */
    private static ThreadPoolExecutor newExecutor() {
        int coreThreads = 4;
        int maxThreads = 8;
        long keepAliveMinutes = 37;
        int queueCapacity = 2048;
        return new ThreadPoolExecutor(
                coreThreads,
                maxThreads,
                keepAliveMinutes,
                TimeUnit.MINUTES,
                new LinkedBlockingDeque<>(queueCapacity),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
}