mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 11:52:08 +08:00
DB中Group信息的更新方式,由replace调整为insert或update
This commit is contained in:
@@ -30,10 +30,14 @@ public interface GroupService {
|
||||
|
||||
int replaceDBData(GroupMemberPO groupMemberPO);
|
||||
|
||||
void batchReplace(List<GroupMemberPO> newGroupMemberList);
|
||||
|
||||
GroupStateEnum getGroupStateFromDB(Long clusterPhyId, String groupName);
|
||||
|
||||
List<GroupMemberPO> listGroupByTopic(Long clusterPhyId, String topicName);
|
||||
|
||||
List<GroupMemberPO> listGroup(Long clusterPhyId);
|
||||
|
||||
PaginationResult<GroupMemberPO> pagingGroupMembers(Long clusterPhyId,
|
||||
String topicName,
|
||||
String groupName,
|
||||
|
||||
@@ -32,6 +32,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.SERVICE_SEARCH_GROUP;
|
||||
@@ -120,6 +121,38 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group
|
||||
return groupMemberDAO.replace(groupMemberPO);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void batchReplace(List<GroupMemberPO> newGroupMemberList) {
|
||||
if (newGroupMemberList == null || newGroupMemberList.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
Long clusterPhyId = newGroupMemberList.get(0).getClusterPhyId();
|
||||
if (clusterPhyId == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
List<GroupMemberPO> dbGroupMemberList = listGroup(clusterPhyId);
|
||||
|
||||
|
||||
Map<String, GroupMemberPO> dbGroupMemberMap = dbGroupMemberList.stream().collect(Collectors.toMap(elem -> elem.getGroupName() + elem.getTopicName(), Function.identity()));
|
||||
for (GroupMemberPO groupMemberPO : newGroupMemberList) {
|
||||
GroupMemberPO po = dbGroupMemberMap.remove(groupMemberPO.getGroupName() + groupMemberPO.getTopicName());
|
||||
try {
|
||||
if (po != null) {
|
||||
groupMemberPO.setId(po.getId());
|
||||
groupMemberDAO.updateById(groupMemberPO);
|
||||
} else {
|
||||
groupMemberDAO.insert(groupMemberPO);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("method=batchReplace||clusterPhyId={}||groupName={}||errMsg=exception", clusterPhyId, groupMemberPO.getGroupName(), e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public GroupStateEnum getGroupStateFromDB(Long clusterPhyId, String groupName) {
|
||||
LambdaQueryWrapper<GroupMemberPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||
@@ -143,6 +176,14 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group
|
||||
return groupMemberDAO.selectList(lambdaQueryWrapper);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<GroupMemberPO> listGroup(Long clusterPhyId) {
|
||||
LambdaQueryWrapper<GroupMemberPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||
lambdaQueryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId);
|
||||
|
||||
return groupMemberDAO.selectList(lambdaQueryWrapper);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PaginationResult<GroupMemberPO> pagingGroupMembers(Long clusterPhyId,
|
||||
String topicName,
|
||||
|
||||
@@ -15,6 +15,7 @@ import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
|
||||
import org.apache.kafka.clients.admin.*;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
@@ -34,14 +35,9 @@ public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask {
|
||||
|
||||
@Override
|
||||
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||
TaskResult tr = TaskResult.SUCCESS;
|
||||
|
||||
List<String> groupNameList = groupService.listGroupsFromKafka(clusterPhy.getId());
|
||||
for (String groupName: groupNameList) {
|
||||
if (!TaskResult.SUCCESS.equals(this.updateGroupMembersTask(clusterPhy, groupName, triggerTimeUnitMs))) {
|
||||
tr = TaskResult.FAIL;
|
||||
}
|
||||
}
|
||||
TaskResult tr = updateGroupMembersTask(clusterPhy, groupNameList, triggerTimeUnitMs);
|
||||
|
||||
if (!TaskResult.SUCCESS.equals(tr)) {
|
||||
return tr;
|
||||
@@ -53,19 +49,21 @@ public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask {
|
||||
return tr;
|
||||
}
|
||||
|
||||
private TaskResult updateGroupMembersTask(ClusterPhy clusterPhy, String groupName, long triggerTimeUnitMs) {
|
||||
try {
|
||||
List<GroupMemberPO> poList = this.getGroupMembers(clusterPhy.getId(), groupName, new Date(triggerTimeUnitMs));
|
||||
for (GroupMemberPO po: poList) {
|
||||
groupService.replaceDBData(po);
|
||||
|
||||
private TaskResult updateGroupMembersTask(ClusterPhy clusterPhy, List<String> groupNameList, long triggerTimeUnitMs) {
|
||||
List<GroupMemberPO> groupMemberPOList = new ArrayList<>();
|
||||
TaskResult tr = TaskResult.SUCCESS;
|
||||
for (String groupName : groupNameList) {
|
||||
try {
|
||||
List<GroupMemberPO> poList = this.getGroupMembers(clusterPhy.getId(), groupName, new Date(triggerTimeUnitMs));
|
||||
groupMemberPOList.addAll(poList);
|
||||
} catch (Exception e) {
|
||||
log.error("method=updateGroupMembersTask||clusterPhyId={}||groupName={}||errMsg=exception", clusterPhy.getId(), groupName, e);
|
||||
tr = TaskResult.FAIL;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("method=updateGroupMembersTask||clusterPhyId={}||groupName={}||errMsg={}", clusterPhy.getId(), groupName, e.getMessage());
|
||||
|
||||
return TaskResult.FAIL;
|
||||
}
|
||||
|
||||
return TaskResult.SUCCESS;
|
||||
groupService.batchReplace(groupMemberPOList);
|
||||
return tr;
|
||||
}
|
||||
|
||||
private List<GroupMemberPO> getGroupMembers(Long clusterPhyId, String groupName, Date updateTime) throws NotExistException, AdminOperateException {
|
||||
|
||||
Reference in New Issue
Block a user