diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java index 9e4c244c..7a0e3eb0 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java @@ -104,6 +104,13 @@ public interface TopicService { */ List getTopicBrokerList(Long clusterId, String topicName); + /** + * 判断topic是否有数据写入,即分区topic的offset变化 + * @param physicalClusterId 物理集群Id + * @param topicName topic名称 + * @param latestTime 离当前多久开始计算 + * @return + */ Result checkTopicOffsetChanged(Long physicalClusterId, String topicName, Long latestTime); } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java index 154faf77..70ef139c 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java @@ -247,11 +247,11 @@ public class TopicServiceImpl implements TopicService { @Override public List getTopicPartitionDTO(ClusterDO clusterDO, String topicName, Boolean needDetail) { if (ValidateUtils.isNull(clusterDO) || ValidateUtils.isNull(topicName)) { - return null; + return new ArrayList<>(); } TopicMetadata topicMetadata = PhysicalClusterMetadataManager.getTopicMetadata(clusterDO.getId(), topicName); if (ValidateUtils.isNull(topicMetadata)) { - return null; + return new ArrayList<>(); } List partitionStateList = KafkaZookeeperUtils.getTopicPartitionState( @@ -528,7 +528,7 @@ public class TopicServiceImpl implements TopicService { public List getPartitionOffsetList(ClusterDO clusterDO, String topicName, Long timestamp) { TopicMetadata topicMetadata = PhysicalClusterMetadataManager.getTopicMetadata(clusterDO.getId(), topicName); if (topicMetadata == null) { - return null; + return new ArrayList<>(); } Map timestampsToSearch = new HashMap<>(); for (Integer partitionId : topicMetadata.getPartitionMap().getPartitions().keySet()) { @@ -572,7 +572,7 @@ public class TopicServiceImpl implements TopicService { kafkaConsumer.close(); } } - return null; + return new ArrayList<>(); } private List fetchTopicData(KafkaConsumer kafkaConsumer, ClusterDO clusterDO, String topicName, TopicDataSampleDTO reqObj) { @@ -585,7 +585,7 @@ public class TopicServiceImpl implements TopicService { tpList.add(new TopicPartition(topicName, partitionId)); } if (ValidateUtils.isEmptyList(tpList)) { - return null; + return new ArrayList<>(); } kafkaConsumer.assign(tpList); diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ZookeeperServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ZookeeperServiceImpl.java index cb9827bd..c4c89513 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ZookeeperServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ZookeeperServiceImpl.java @@ -28,7 +28,7 @@ public class ZookeeperServiceImpl implements ZookeeperService { @Override public Result openTopicJmx(Long clusterId, String topicName, TopicJmxSwitch jmxSwitch) { - if (ValidateUtils.isNull(clusterId) || ValidateUtils.isNull(topicName) || ValidateUtils.isNull(jmxSwitch)) { + if (ValidateUtils.isNull(clusterId) || ValidateUtils.isNull(topicName)) { return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); }