mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 20:22:12 +08:00
日志错误信息中补充Topic名称信息
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
package com.xiaojukeji.know.streaming.km.common.bean.entity.param.partition;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import org.apache.kafka.clients.admin.OffsetSpec;
|
||||
@@ -10,13 +10,13 @@ import java.util.Map;
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
public class PartitionOffsetParam extends ClusterPhyParam {
|
||||
public class PartitionOffsetParam extends TopicParam {
|
||||
private Map<TopicPartition, OffsetSpec> topicPartitionOffsets;
|
||||
|
||||
private Long timestamp;
|
||||
|
||||
public PartitionOffsetParam(Long clusterPhyId, Map<TopicPartition, OffsetSpec> topicPartitionOffsets, Long timestamp) {
|
||||
super(clusterPhyId);
|
||||
public PartitionOffsetParam(Long clusterPhyId, String topicName, Map<TopicPartition, OffsetSpec> topicPartitionOffsets, Long timestamp) {
|
||||
super(clusterPhyId, topicName);
|
||||
this.topicPartitionOffsets = topicPartitionOffsets;
|
||||
this.timestamp = timestamp;
|
||||
}
|
||||
|
||||
@@ -15,4 +15,12 @@ public class TopicParam extends ClusterPhyParam {
|
||||
super(clusterPhyId);
|
||||
this.topicName = topicName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TopicParam{" +
|
||||
"clusterPhyId=" + clusterPhyId +
|
||||
", topicName='" + topicName + '\'' +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
||||
@@ -207,7 +207,7 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P
|
||||
.forEach(elem -> topicPartitionOffsets.put(new TopicPartition(topicName, elem.getPartitionId()), offsetSpec));
|
||||
|
||||
try {
|
||||
return (Result<Map<TopicPartition, Long>>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicPartitionOffsets, timestamp));
|
||||
return (Result<Map<TopicPartition, Long>>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicName, topicPartitionOffsets, timestamp));
|
||||
} catch (VCHandlerNotExistException e) {
|
||||
return Result.buildFailure(VC_HANDLE_NOT_EXIST);
|
||||
}
|
||||
@@ -226,7 +226,7 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P
|
||||
.forEach(elem -> topicPartitionOffsets.put(new TopicPartition(topicName, elem.getPartitionId()), offsetSpec));
|
||||
|
||||
try {
|
||||
return (Result<Map<TopicPartition, Long>>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicPartitionOffsets, timestamp));
|
||||
return (Result<Map<TopicPartition, Long>>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicName, topicPartitionOffsets, timestamp));
|
||||
} catch (VCHandlerNotExistException e) {
|
||||
return Result.buildFailure(VC_HANDLE_NOT_EXIST);
|
||||
}
|
||||
@@ -300,7 +300,10 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P
|
||||
} catch (NotExistException nee) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(offsetParam.getClusterPhyId()));
|
||||
} catch (Exception e) {
|
||||
log.error("method=getPartitionOffsetFromKafkaAdminClient||clusterPhyId={}||errMsg=exception!", offsetParam.getClusterPhyId(), e);
|
||||
log.error(
|
||||
"class=PartitionServiceImpl||method=getPartitionOffsetFromKafkaAdminClient||clusterPhyId={}||topicName={}||errMsg=exception!",
|
||||
offsetParam.getClusterPhyId(), offsetParam.getTopicName(), e
|
||||
);
|
||||
|
||||
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage());
|
||||
}
|
||||
@@ -355,7 +358,10 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P
|
||||
} catch (NotExistException nee) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(offsetParam.getClusterPhyId()));
|
||||
} catch (Exception e) {
|
||||
log.error("method=getPartitionOffsetFromKafkaConsumerClient||clusterPhyId={}||errMsg=exception!", offsetParam.getClusterPhyId(), e);
|
||||
log.error(
|
||||
"class=PartitionServiceImpl||method=getPartitionOffsetFromKafkaConsumerClient||clusterPhyId={}||topicName={}||errMsg=exception!",
|
||||
offsetParam.getClusterPhyId(), offsetParam.getTopicName(), e
|
||||
);
|
||||
|
||||
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage());
|
||||
} finally {
|
||||
|
||||
Reference in New Issue
Block a user