// Fire-and-forget pipeline: once both the member list and the fetched group detail
// are available, build a Groups value from the "group_chat" object and hand it to
// saveOrUpdateGroups.
// NOTE(review): the return value of saveOrUpdateGroups(...) is discarded here. If it
// returns a Mono, nothing subscribes to it from this call site — confirm.
Mono.zip(memberList, batchClient.getGroupDetail(toGetDetailsJson))
        .doOnNext(detailPair -> {
            // T2 carries the detail payload; the group data sits under "group_chat".
            JSONObject groupChat = detailPair.getT2().getJSONObject("group_chat");
            System.out.println("status:" + status);
            // T1 carries the member list string.
            String members = detailPair.getT1();
            saveOrUpdateGroups(
                    Groups.buildGroupsByDetailsInfo(groupChat, groupStateStr, status, batchId, members));
        })
        .subscribe();
// Looks up the stored member-list record for this chat, then — once both it and the
// owner name are available — copies them onto `groups` and saves the entity.
// Mono.zip only emits when both sources emit, so nothing is saved if either is empty.
Mono<GroupsMemberList> groupsMemberListByChatId =
        groupsMemberListRepository.findGroupsMemberListByChatId(groups.getChatId());
return Mono.zip(ownerName, groupsMemberListByChatId).flatMap(
        tuple -> {
            String ownername = tuple.getT1();
            log.info("ownername::::::::::{}", ownername);
            GroupsMemberList groupsMemberList = tuple.getT2();
            groups.setLastMemberList(groupsMemberList.getLastMemberList());
            groups.setGroupMaster(ownername);
            return externalGroupsRepository.save(groups)
                    .doOnSubscribe(s -> System.out.print("客户群保存或更新" + groups.getChatId()))
                    .doOnNext(groupz -> System.out.println(groupz))
                    // NOTE(review): onErrorContinue only takes effect when the upstream
                    // operators support it; otherwise the error propagates to retry(1)
                    // below and this handler never runs — confirm for this repository.
                    .onErrorContinue((error, group) -> {
                        // getLocalizedMessage() may be null; guard before startsWith so the
                        // handler itself cannot throw a NullPointerException.
                        String localizedMessage = error.getLocalizedMessage();
                        if (localizedMessage != null && localizedMessage.startsWith("Failed to update table")) {
                            // Presumably the update failed because the row does not exist
                            // yet (TODO confirm); fall back to inserting it as a new row.
                            log.warn("更新客户群失败,执行新增:{}", groups.getChatId());
                            // This is a new record, so lastMemberList is filled with the same
                            // value as memberList.
                            groups.setLastMemberList(groups.getMemberList());
                            // NOTE(review): the fallback insert is subscribed separately, so
                            // its result is not part of the Mono returned to the caller.
                            externalGroupsRepository.save(groups.setAsNew())
                                    .retry(1)
                                    .subscribe(
                                            inserted -> { },
                                            // Throwable passed last so the logger records the
                                            // stack trace; the placeholders render as before.
                                            insertError -> log.error("更新后失败执行新增后也异常了:{},chatId:{}",
                                                    insertError.toString(), groups.getChatId(), insertError));
                        } else {
                            // Previously this branch dropped the error without any trace.
                            log.error("Saving group failed, chatId:{}", groups.getChatId(), error);
                        }
                    })
                    .retry(1);
        }
);