一、背景
现实业务开发中 通常为了避免超时、对方接口限制等原因需要对支持批量的接口的数据分批调用。
比如List参数的size可能为 几十个甚至上百个 但是假如对方dubbo接口比较慢 传入50个以上会超时 那么可以每次传入20个 分批执行。
通常很多人会写 for 循环或者 while 循环 非常不优雅 无法复用 而且容易出错。
下面结合 Java8 的 Stream Function ,Consumer 等特性实现分批调用的工具类封装和自测。
并给出 CompletableFuture 的异步改进方案。
二、实现
工具类
package com.chujianyun.common.java8.function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.collections4.CollectionUtils;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* 执行工具类
*
* author 明明如月
*/
public class ExecuteUtil {
? ?public static T void partitionRun(List T dataList, int size, Consumer List T consumer) {
? ? ? ?if (CollectionUtils.isEmpty(dataList)) {
? ? ? ? ? ?return;
? ? ? ?}
? ? ? ?Preconditions.checkArgument(size 0, size must not be a minus
? ? ? ?Lists.partition(dataList, size).forEach(consumer);
? ?}
? ?public static T, V List V partitionCall2List(List T dataList, int size, Function List T , List V function) {
? ? ? ?if (CollectionUtils.isEmpty(dataList)) {
? ? ? ? ? ?return new ArrayList (0);
? ? ? ?}
? ? ? ?Preconditions.checkArgument(size 0, size must not be a minus
? ? ? ?return Lists.partition(dataList, size)
? ? ? ? ? ? ? ?.stream()
? ? ? ? ? ? ? ?.map(function)
? ? ? ? ? ? ? ?.filter(Objects::nonNull)
? ? ? ? ? ? ? ?.reduce(new ArrayList (),
? ? ? ? ? ? ? ? ? ? ? ?(resultList1, resultList2) - {
? ? ? ? ? ? ? ? ? ? ? ? ? ?resultList1.addAll(resultList2);
? ? ? ? ? ? ? ? ? ? ? ? ? ?return resultList1;
? ? ? ? ? ? ? ? ? ? ? ?});
? ?}
? ?public static T, V Map T, V partitionCall2Map(List T dataList, int size, Function List T , Map T, V function) {
? ? ? ?if (CollectionUtils.isEmpty(dataList)) {
? ? ? ? ? ?return new HashMap (0);
? ? ? ?}
? ? ? ?Preconditions.checkArgument(size 0, size must not be a minus
? ? ? ?return Lists.partition(dataList, size)
? ? ? ? ? ? ? ?.stream()
? ? ? ? ? ? ? ?.map(function)
? ? ? ? ? ? ? ?.filter(Objects::nonNull)
? ? ? ? ? ? ? ?.reduce(new HashMap (),
? ? ? ? ? ? ? ? ? ? ? ?(resultMap1, resultMap2) - {
? ? ? ? ? ? ? ? ? ? ? ? ? ?resultMap1.putAll(resultMap2);
? ? ? ? ? ? ? ? ? ? ? ? ? ?return resultMap1;
? ? ? ? ? ? ? ? ? ? ? ?});
? ?}
}
待调用的服务 模拟
package com.chujianyun.common.java8.function;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class SomeManager {
? ?public void aRun(Long id, List String data) {
? ?}
? ?public List Integer aListMethod(Long id, List String data) {
? ? ? ?return new ArrayList (0);
? ?}
? ?public Map String, Integer aMapMethod(Long id, List String data) {
? ? ? ?return new HashMap (0);
? ?}
}
单元测试
package com.chujianyun.common.java8.function;
import org.apache.commons.lang3.RandomUtils;
import org.jeasy.random.EasyRandom;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.internal.verification.Times;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
RunWith(PowerMockRunner.class)
public class ExecuteUtilTest {
? ?private EasyRandom easyRandom new EasyRandom();
? ? Mock
? ?private SomeManager someManager;
? ?// 测试数据
? ?private List String mockDataList;
? ?private int total 30;
? ? Before
? ?public void init() {
? ? ? ?// 构造30条数据
? ? ? ?mockDataList easyRandom.objects(String.class, 30).collect(Collectors.toList());
? ?}
? ? Test
? ?public void test_a_run_partition() {
? ? ? ?// mock aRun
? ? ? ?PowerMockito.doNothing().when(someManager).aRun(anyLong(), any());
? ? ? ?// 每批 10 个
? ? ? ?ExecuteUtil.partitionRun(mockDataList, 10, (eachList) - someManager.aRun(1L, eachList));
? ? ? ?//验证执行了 3 次
? ? ? ?Mockito.verify(someManager, new Times(3)).aRun(anyLong(), any());
? ?}
? ? Test
? ?public void test_call_return_list_partition() {
? ? ? ?// mock ?每次调用返回条数(注意每次调用都是这2个)
? ? ? ?int eachReturnSize
? ? ? ?PowerMockito
? ? ? ? ? ? ? ?.doReturn(easyRandom.objects(String.class, eachReturnSize).collect(Collectors.toList()))
? ? ? ? ? ? ? ?.when(someManager)
? ? ? ? ? ? ? ?.aListMethod(anyLong(), any());
? ? ? ?// 分批执行
? ? ? ?int size
? ? ? ?List Integer resultList ExecuteUtil.partitionCall2List(mockDataList, size, (eachList) - someManager.aListMethod(2L, eachList));
? ? ? ?//验证执行次数
? ? ? ?int invocations
? ? ? ?Mockito.verify(someManager, new Times(invocations)).aListMethod(anyLong(), any());
? ? ? ?// 正好几轮
? ? ? ?int turns;
? ? ? ?if (total % size 0) {
? ? ? ? ? ?turns total / size;
? ? ? ?} else {
? ? ? ? ? ?turns total / size
? ? ? ?}
? ? ? ?Assert.assertEquals(turns * eachReturnSize, resultList.size());
? ?}
? ? Test
? ?public void test_call_return_map_partition() {
? ? ? ?// mock ?每次调用返回条数
? ? ? ?// 注意
? ? ? ?// 如果仅调用doReturn一次 那么每次返回都是key相同的Map
? ? ? ?// 如果需要不覆盖 则doReturn次数和 invocations 相同
? ? ? ?int eachReturnSize
? ? ? ?PowerMockito
? ? ? ? ? ? ? ?.doReturn(mockMap(eachReturnSize))
? ? ? ? ? ? ? ?.doReturn(mockMap(eachReturnSize))
? ? ? ? ? ? ? ?.when(someManager).aMapMethod(anyLong(), any());
? ? ? ?// 每批
? ? ? ?int size 16;
? ? ? ?Map String, Integer resultMap ExecuteUtil.partitionCall2Map(mockDataList, size, (eachList) - someManager.aMapMethod(2L, eachList));
? ? ? ?//验证执行次数
? ? ? ?int invocations
? ? ? ?Mockito.verify(someManager, new Times(invocations)).aMapMethod(anyLong(), any());
? ? ? ?// 正好几轮
? ? ? ?int turns;
? ? ? ?if (total % size 0) {
? ? ? ? ? ?turns total / size;
? ? ? ?} else {
? ? ? ? ? ?turns total / size
? ? ? ?}
? ? ? ?Assert.assertEquals(turns * eachReturnSize, resultMap.size());
? ?}
? ?private Map String, Integer mockMap(int size) {
? ? ? ?Map String, Integer result new HashMap (size);
? ? ? ?for (int i i size; i ) {
// 极力保证key不重复
? ? ? ? ? ?result.put(easyRandom.nextObject(String.class) RandomUtils.nextInt(), easyRandom.nextInt());
? ? ? ?}
? ? ? ?return result;
? ?}
}
注意
1 判空
.filter(Objects::nonNull)
这里非常重要 避免又一次调用返回 null 而导致空指针异常。
2 实际使用时可以结合apollo配置 灵活设置每批执行的数量 如果超时随时调整
3 用到的类库
集合工具类 commons-collections4、guava 可以不用
这里的list划分子list也可以使用stream的 skip ,limit特性自己去做 集合判空也可以不借助collectionutils.
构造数据 easy-random
单元测试框架 Junit4 、 powermockito、mockito
4 大家可以加一些更强大的功能 如允许设置每次调用的时间间隔、并行或并发调用等。
三、改进
以上面的List接口为例 将其改为异步版本
? ?public static T, V List V partitionCall2ListAsync(List T dataList,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? int size,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ExecutorService executorService,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Function List T , List V function) {
? ? ? ?if (CollectionUtils.isEmpty(dataList)) {
? ? ? ? ? ?return new ArrayList (0);
? ? ? ?}
? ? ? ?Preconditions.checkArgument(size 0, size must not be a minus
? ? ? ?List CompletableFuture List V completableFutures Lists.partition(dataList, size)
? ? ? ? ? ? ? ?.stream()
? ? ? ? ? ? ? ?.map(eachList - {
? ? ? ? ? ? ? ? ? ?if (executorService null) {
? ? ? ? ? ? ? ? ? ? ? ?return CompletableFuture.supplyAsync(() - function.apply(eachList));
? ? ? ? ? ? ? ? ? ?} else {
? ? ? ? ? ? ? ? ? ? ? ?return CompletableFuture.supplyAsync(() - function.apply(eachList), executorService);
? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ?})
? ? ? ? ? ? ? ?.collect(Collectors.toList());
? ? ? ?CompletableFuture Void allFinished CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]));
? ? ? ?try {
? ? ? ? ? ?allFinished.get();
? ? ? ?} catch (Exception e) {
? ? ? ? ? ?throw new RuntimeException(e);
? ? ? ?}
? ? ? ?return completableFutures.stream()
? ? ? ? ? ? ? ?.map(CompletableFuture::join)
? ? ? ? ? ? ? ?.filter(CollectionUtils::isNotEmpty)
? ? ? ? ? ? ? ?.reduce(new ArrayList V (), ((list1, list2) - {
? ? ? ? ? ? ? ? ? ?List V resultList new ArrayList ();
? ? ? ? ? ? ? ? ? ?if(CollectionUtils.isNotEmpty(list1)){
? ? ? ? ? ? ? ? ? ? ? resultList.addAll(list1);
? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ?if(CollectionUtils.isNotEmpty(list2)){
? ? ? ? ? ? ? ? ? ? ? ? resultList.addAll(list2);
? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ?return resultList;
? ? ? ? ? ? ? ?}));
? ?}
测试代码
? ?
// 测试数据
? ?private List String mockDataList;
? ?private int total 300;
? ?private AtomicInteger atomicInteger;
? ? Before
? ?public void init() {
? ? ? ?// 构造total条数据
? ? ? ?mockDataList easyRandom.objects(String.class, total).collect(Collectors.toList());
? ?}
Test
public void test_call_return_list_partition_async() {
? ? ? ?ExecutorService executorService Executors.newFixedThreadPool(10);
? ? ? ?atomicInteger new AtomicInteger(0);
? ? ? ?Stopwatch stopwatch Stopwatch.createStarted();
? ? ? ?// 分批执行
? ? ? ?int size
? ? ? ?List Integer resultList ExecuteUtil.partitionCall2ListAsync(mockDataList, size, executorService, (eachList) - someCall(2L, eachList));
? ? ? ?Stopwatch stop stopwatch.stop();
? ? ? ?log.info( 执行时间: {} 秒 , stop.elapsed(TimeUnit.SECONDS));
? ? ? ?Assert.assertEquals(total, resultList.size());
? ? ? ?// 正好几轮
? ? ? ?int turns;
? ? ? ?if (total % size 0) {
? ? ? ? ? ?turns total / size;
? ? ? ?} else {
? ? ? ? ? ?turns total / size
? ? ? ?}
? ? ? ?log.info( 共调用了{}次 , turns);
? ? ? ?Assert.assertEquals(turns, atomicInteger.get());
? ?
? ? ?// 顺序也一致
? ? ? ?for(int i i mockDataList.size();i ){
? ? ? ? ? ?Assert.assertEquals((Integer) mockDataList.get(i).length(), resultList.get(i));
? ? ? ?}
? ?}
?/**
? ? * 模拟一次调用
? ? */
? ?private List Integer someCall(Long id, List String strList) {
? ? ? ?log.info( 当前-- {} strList.size {} , atomicInteger.incrementAndGet(), strList.size());
? ? ? ?try {
? ? ? ? ? ?TimeUnit.SECONDS.sleep(2L);
? ? ? ?} catch (InterruptedException e) {
? ? ? ? ? ?e.printStackTrace();
? ? ? ?}
? ? ? ?return strList.stream()
? ? ? ? ? ? ? ?.map(String::length)
? ? ? ? ? ? ? ?.collect(Collectors.toList());
? ?}
通过异步可以尽可能快得拿到执行结果。
四、总结
1 要灵活运用Java 8 的 特性简化代码
2 要注意代码的封装来使代码更加优雅,复用性更强
3 要利用来构造单元测试的数据框架如 java-faker和easy-random来提高构造数据的效率
4 要了解性能改进的常见思路 合并请求、并发、并行、缓存等。
本文介绍了MyBatis的${}和#{}的用法区别,以及针对$可能带来的风险提供一种简易...
近日,京东智联云发布全新升级的云计算服务等级协议,最高可用性保证达99.995%。这...
为贯彻落实党的十九届五中全会精神,加快第五代移动通信建设,保障5G发展频率资...
专属主机的规格提供了对应物理服务器的配置信息,决定了您能在专属主机上使用的E...
不知道在哪里注册的 域名 怎么续费?域名就相当于网站在互联网中的门牌号,用户...
作者 | 京廊 来源 | 阿里技术公众号 一 写在前面 互联网工程的高速发展,分布式...
市场选择了IDC(Internet数据中心),资本也选择了IDC。然而,究竟什么是IDC的本质...
据TOP云(zuntop.com)客户反映, 服务器租用 给他们带来了不少的方便,对中小企...
TOP云 (west.cn)5月10日消息,据科技TechCrunch报道,美国个人护理公司 Edgewe...
一、背景介绍 近几年,随着3D扫描设备成本降低,三维扫描硬件、3D建模或重建技术...