From 49280a8617ef9dfe14fd92d09d1e1c96eaf7a122 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Sat, 19 Dec 2020 00:27:16 +0800 Subject: [PATCH] =?UTF-8?q?v2.1=E7=89=88=E6=9C=AC=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../constant/TopicCreationConstant.java | 11 ++ .../manager/common/entity/ResultStatus.java | 2 + .../common/entity/ao/RdTopicBasic.java | 12 ++ .../entity/ao/config/CreateTopicConfig.java | 25 ++- .../common/entity/ao/topic/TopicBasicDTO.java | 14 +- .../common/entity/ao/topic/TopicOverview.java | 11 ++ .../entity/dto/rd/LogicalClusterDTO.java | 6 +- .../common/entity/metrics/TopicMetrics.java | 20 +++ .../entity/vo/common/TopicOverviewVO.java | 12 ++ .../entity/vo/normal/topic/TopicBasicVO.java | 14 ++ .../topic/TopicBrokerRequestTimeVO.java | 39 +++++ .../topic/TopicRequestTimeDetailVO.java | 12 ++ .../common/entity/vo/rd/RdTopicBasicVO.java | 13 ++ .../kafka/manager/common/utils/JsonUtils.java | 2 +- .../common/utils/jmx/MbeanNameUtilV2.java | 5 +- .../manager/common/zookeeper/ZkPathUtil.java | 2 +- .../cache/LogicalClusterMetadataManager.java | 8 +- .../cache/PhysicalClusterMetadataManager.java | 39 ++++- .../service/service/ConfigService.java | 2 + .../manager/service/service/JmxService.java | 5 + .../manager/service/service/TopicService.java | 2 +- .../service/service/gateway/AppService.java | 7 + .../gateway/TopicConnectionService.java | 2 +- .../service/gateway/impl/AppServiceImpl.java | 32 +++- .../impl/GatewayConfigServiceImpl.java | 1 + .../impl/TopicConnectionServiceImpl.java | 9 +- .../service/impl/BrokerServiceImpl.java | 2 + .../service/impl/ClusterServiceImpl.java | 2 +- .../service/impl/ConfigServiceImpl.java | 20 +++ .../service/impl/ConsumerServiceImpl.java | 9 +- .../service/service/impl/JmxServiceImpl.java | 85 +++++++++ .../impl/LogicalClusterServiceImpl.java | 5 +- .../service/impl/RegionServiceImpl.java | 4 + .../service/impl/TopicManagerServiceImpl.java | 22 ++- .../service/impl/TopicServiceImpl.java | 27 +-- .../manager/service/utils/ConfigUtils.java | 11 ++ .../manager/service/utils/TopicCommands.java | 27 ++- .../manager/dao/TopicThrottledMetricsDao.java | 2 + .../impl/TopicThrottledMetricsDaoImpl.java | 5 + .../mapper/TopicThrottledMetricsDao.xml | 5 + .../account/impl/AccountServiceImpl.java | 3 + .../manager/bpm/common/OrderTypeEnum.java | 25 +-- ...OrderExtensionThirdPartDeleteTopicDTO.java | 69 ++++++++ .../bpm/component/LocalStorageService.java | 9 - .../manager/bpm/impl/OrderServiceImpl.java | 8 + .../order/impl/ThirdPartDeleteTopicOrder.java | 164 ++++++++++++++++++ .../manager/kcm/component/agent/n9e/N9e.java | 8 +- .../kcm/component/storage/local/Local.java | 6 +- .../kcm/impl/KafkaFileServiceImpl.java | 2 + .../kcm/tasks/ClusterHostTaskService.java | 2 +- .../src/main/resources/kcm_script.sh | 12 +- .../openapi/impl/ThirdPartServiceImpl.java | 41 ++++- ...llectAndPublishCommunityTopicMetrics.java} | 8 +- .../metrics/delete/DeleteMetrics.java | 43 +++-- .../metrics/store/StoreBrokerMetrics.java | 2 + .../store/StoreDiDiAppTopicMetrics.java | 2 + .../StoreDiDiTopicRequestTimeMetrics.java | 2 + .../dispatch/op/AutoHandleTopicOrder.java | 7 +- .../SinkCommunityTopicMetrics2Monitor.java | 2 +- .../StoreCommunityTopicMetrics2DB.java | 2 + .../StoreTopicThrottledMetrics2DB.java | 2 + .../metadata/FlushClusterMetadata.java | 3 - kafka-manager-web/assembly.xml | 44 +++++ kafka-manager-web/pom.xml | 20 +++ .../gateway/GatewayHeartbeatController.java | 26 ++- .../GatewayServiceDiscoveryController.java | 7 + .../normal/NormalAccountController.java | 8 + .../normal/NormalAppController.java | 18 +- .../normal/NormalConsumerController.java | 1 + .../normal/NormalTopicController.java | 43 +++-- .../api/versionone/rd/RdTopicController.java | 1 + .../web/config/RestTemplateConfig.java | 29 ++++ .../web/converters/TopicModelConverter.java | 69 ++++++++ .../src/main/resources/application.yml | 21 ++- .../src/main/resources/logback-spring.xml | 4 +- 75 files changed, 1098 insertions(+), 148 deletions(-) create mode 100644 kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicBrokerRequestTimeVO.java create mode 100644 kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/common/entry/apply/OrderExtensionThirdPartDeleteTopicDTO.java create mode 100644 kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/order/impl/ThirdPartDeleteTopicOrder.java rename kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/{store/StoreCommunityTopicMetrics.java => collect/CollectAndPublishCommunityTopicMetrics.java} (86%) rename kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/{dispatch/metrics/store => listener}/SinkCommunityTopicMetrics2Monitor.java (98%) create mode 100644 kafka-manager-web/assembly.xml create mode 100644 kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/config/RestTemplateConfig.java diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/TopicCreationConstant.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/TopicCreationConstant.java index 423c6d1d..f2f1922c 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/TopicCreationConstant.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/TopicCreationConstant.java @@ -46,4 +46,15 @@ public class TopicCreationConstant { public static final String TOPIC_NAME_PREFIX_RU = "ru01_"; public static final Integer TOPIC_NAME_MAX_LENGTH = 255; + + + /** + * 单次自动化审批, 默认允许的通过单子 + */ + public static final Integer DEFAULT_MAX_PASSED_ORDER_NUM_PER_TASK = 1; + + /** + * 单次自动化审批, 最多允许的通过单子 + */ + public static final Integer MAX_PASSED_ORDER_NUM_PER_TASK = 200; } \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ResultStatus.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ResultStatus.java index ce044a13..d59ade76 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ResultStatus.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ResultStatus.java @@ -86,6 +86,8 @@ public enum ResultStatus { APP_ID_OR_PASSWORD_ILLEGAL(1000, "app or password illegal"), SYSTEM_CODE_ILLEGAL(1000, "system code illegal"), + CLUSTER_TASK_HOST_LIST_ILLEGAL(1000, "主机列表错误,请检查主机列表"), + diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/RdTopicBasic.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/RdTopicBasic.java index 3cecd3cf..bf57a800 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/RdTopicBasic.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/RdTopicBasic.java @@ -1,5 +1,6 @@ package com.xiaojukeji.kafka.manager.common.entity.ao; +import java.util.List; import java.util.Properties; /** @@ -23,6 +24,8 @@ public class RdTopicBasic { private String description; + private List regionNameList; + public Long getClusterId() { return clusterId; } @@ -87,6 +90,14 @@ public class RdTopicBasic { this.description = description; } + public List getRegionNameList() { + return regionNameList; + } + + public void setRegionNameList(List regionNameList) { + this.regionNameList = regionNameList; + } + @Override public String toString() { return "RdTopicBasic{" + @@ -98,6 +109,7 @@ public class RdTopicBasic { ", appName='" + appName + '\'' + ", properties=" + properties + ", description='" + description + '\'' + + ", regionNameList='" + regionNameList + '\'' + '}'; } } \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/config/CreateTopicConfig.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/config/CreateTopicConfig.java index 35f694f8..897222a3 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/config/CreateTopicConfig.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/config/CreateTopicConfig.java @@ -1,5 +1,8 @@ package com.xiaojukeji.kafka.manager.common.entity.ao.config; +import com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant; +import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; + import java.util.List; /** @@ -7,8 +10,27 @@ import java.util.List; * @date 20/7/24 */ public class CreateTopicConfig { + /** + * 单次自动化审批, 允许的通过单子 + */ + private Integer maxPassedOrderNumPerTask; + private List configList; + public Integer getMaxPassedOrderNumPerTask() { + if (ValidateUtils.isNull(maxPassedOrderNumPerTask)) { + return TopicCreationConstant.DEFAULT_MAX_PASSED_ORDER_NUM_PER_TASK; + } + if (maxPassedOrderNumPerTask > TopicCreationConstant.MAX_PASSED_ORDER_NUM_PER_TASK) { + return TopicCreationConstant.MAX_PASSED_ORDER_NUM_PER_TASK; + } + return maxPassedOrderNumPerTask; + } + + public void setMaxPassedOrderNumPerTask(Integer maxPassedOrderNumPerTask) { + this.maxPassedOrderNumPerTask = maxPassedOrderNumPerTask; + } + public List getConfigList() { return configList; } @@ -20,7 +42,8 @@ public class CreateTopicConfig { @Override public String toString() { return "CreateTopicConfig{" + - "configList=" + configList + + "maxPassedOrderNumPerTask=" + maxPassedOrderNumPerTask + + ", configList=" + configList + '}'; } } \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/topic/TopicBasicDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/topic/TopicBasicDTO.java index 9522feee..e3ea08ed 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/topic/TopicBasicDTO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/topic/TopicBasicDTO.java @@ -1,5 +1,7 @@ package com.xiaojukeji.kafka.manager.common.entity.ao.topic; +import java.util.List; + /** * @author arthur * @date 2018/09/03 @@ -17,7 +19,7 @@ public class TopicBasicDTO { private String description; - private String region; + private List regionNameList; private Integer score; @@ -83,12 +85,12 @@ public class TopicBasicDTO { this.description = description; } - public String getRegion() { - return region; + public List getRegionNameList() { + return regionNameList; } - public void setRegion(String region) { - this.region = region; + public void setRegionNameList(List regionNameList) { + this.regionNameList = regionNameList; } public Integer getScore() { @@ -164,7 +166,7 @@ public class TopicBasicDTO { ", principals='" + principals + '\'' + ", topicName='" + topicName + '\'' + ", description='" + description + '\'' + - ", region='" + region + '\'' + + ", regionNameList='" + regionNameList + '\'' + ", score=" + score + ", topicCodeC='" + topicCodeC + '\'' + ", partitionNum=" + partitionNum + diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/topic/TopicOverview.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/topic/TopicOverview.java index 03dd9e37..fe02fe94 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/topic/TopicOverview.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/topic/TopicOverview.java @@ -18,6 +18,8 @@ public class TopicOverview { private Object byteIn; + private Object byteOut; + private Object produceRequest; private String appName; @@ -78,6 +80,14 @@ public class TopicOverview { this.byteIn = byteIn; } + public Object getByteOut() { + return byteOut; + } + + public void setByteOut(Object byteOut) { + this.byteOut = byteOut; + } + public Object getProduceRequest() { return produceRequest; } @@ -135,6 +145,7 @@ public class TopicOverview { ", partitionNum=" + partitionNum + ", retentionTime=" + retentionTime + ", byteIn=" + byteIn + + ", byteOut=" + byteOut + ", produceRequest=" + produceRequest + ", appName='" + appName + '\'' + ", appId='" + appId + '\'' + diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/LogicalClusterDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/LogicalClusterDTO.java index c3569774..790f9758 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/LogicalClusterDTO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/LogicalClusterDTO.java @@ -1,6 +1,7 @@ package com.xiaojukeji.kafka.manager.common.entity.dto.rd; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.xiaojukeji.kafka.manager.common.bizenum.ClusterModeEnum; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; @@ -108,10 +109,13 @@ public class LogicalClusterDTO { if (ValidateUtils.isNull(clusterId) || ValidateUtils.isNull(clusterId) || ValidateUtils.isEmptyList(regionIdList) - || ValidateUtils.isNull(appId) || ValidateUtils.isNull(mode)) { return false; } + if (!ClusterModeEnum.SHARED_MODE.getCode().equals(mode) && ValidateUtils.isNull(appId)) { + return false; + } + appId = ValidateUtils.isNull(appId)? "": appId; description = ValidateUtils.isNull(description)? "": description; return true; } diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/metrics/TopicMetrics.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/metrics/TopicMetrics.java index 768d7d84..33c8aaeb 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/metrics/TopicMetrics.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/metrics/TopicMetrics.java @@ -1,5 +1,7 @@ package com.xiaojukeji.kafka.manager.common.entity.metrics; +import java.util.List; + /** * @author zengqiao * @date 20/6/17 @@ -11,6 +13,8 @@ public class TopicMetrics extends BaseMetrics { private String topicName; + private List brokerMetricsList; + public TopicMetrics(Long clusterId, String topicName) { super(); this.clusterId = clusterId; @@ -24,6 +28,14 @@ public class TopicMetrics extends BaseMetrics { this.topicName = topicName; } + public TopicMetrics(String appId, Long clusterId, String topicName, List brokerMetricsList) { + super(); + this.appId = appId; + this.clusterId = clusterId; + this.topicName = topicName; + this.brokerMetricsList = brokerMetricsList; + } + public String getAppId() { return appId; } @@ -36,6 +48,14 @@ public class TopicMetrics extends BaseMetrics { return topicName; } + public void setBrokerMetricsList(List brokerMetricsList) { + this.brokerMetricsList = brokerMetricsList; + } + + public List getBrokerMetricsList() { + return brokerMetricsList; + } + @Override public String toString() { return "TopicMetrics{" + diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/common/TopicOverviewVO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/common/TopicOverviewVO.java index bf462ffc..724e31b2 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/common/TopicOverviewVO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/common/TopicOverviewVO.java @@ -28,6 +28,9 @@ public class TopicOverviewVO { @ApiModelProperty(value = "每秒流入流量(B)") private Object byteIn; + @ApiModelProperty(value = "每秒流出流量(B)") + private Object byteOut; + @ApiModelProperty(value = "发送请求数(个/秒)") private Object produceRequest; @@ -94,6 +97,14 @@ public class TopicOverviewVO { this.byteIn = byteIn; } + public Object getByteOut() { + return byteOut; + } + + public void setByteOut(Object byteOut) { + this.byteOut = byteOut; + } + public Object getProduceRequest() { return produceRequest; } @@ -151,6 +162,7 @@ public class TopicOverviewVO { ", partitionNum=" + partitionNum + ", retentionTime=" + retentionTime + ", byteIn=" + byteIn + + ", byteOut=" + byteOut + ", produceRequest=" + produceRequest + ", appName='" + appName + '\'' + ", appId='" + appId + '\'' + diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicBasicVO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicBasicVO.java index f3fcb952..946a9997 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicBasicVO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicBasicVO.java @@ -3,6 +3,8 @@ package com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; +import java.util.List; + /** * Topic的基本信息 * @author zengqiao @@ -49,6 +51,9 @@ public class TopicBasicVO { @ApiModelProperty(value = "集群地址") private String bootstrapServers; + @ApiModelProperty(value = "所属region") + private List regionNameList; + public Long getClusterId() { return clusterId; } @@ -153,6 +158,14 @@ public class TopicBasicVO { this.score = score; } + public List getRegionNameList() { + return regionNameList; + } + + public void setRegionNameList(List regionNameList) { + this.regionNameList = regionNameList; + } + @Override public String toString() { return "TopicBasicVO{" + @@ -169,6 +182,7 @@ public class TopicBasicVO { ", topicCodeC='" + topicCodeC + '\'' + ", description='" + description + '\'' + ", bootstrapServers='" + bootstrapServers + '\'' + + ", regionNameList=" + regionNameList + '}'; } } diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicBrokerRequestTimeVO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicBrokerRequestTimeVO.java new file mode 100644 index 00000000..486dcc61 --- /dev/null +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicBrokerRequestTimeVO.java @@ -0,0 +1,39 @@ +package com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic; + +/** + * author: mrazkonglingxu + * Date: 2020/12/7 + * Time: 7:40 下午 + */ +public class TopicBrokerRequestTimeVO { + + private Long clusterId; + + private Integer brokerId; + + private TopicRequestTimeDetailVO brokerRequestTime; + + public Long getClusterId() { + return clusterId; + } + + public void setClusterId(Long clusterId) { + this.clusterId = clusterId; + } + + public Integer getBrokerId() { + return brokerId; + } + + public void setBrokerId(Integer brokerId) { + this.brokerId = brokerId; + } + + public TopicRequestTimeDetailVO getBrokerRequestTime() { + return brokerRequestTime; + } + + public void setBrokerRequestTime(TopicRequestTimeDetailVO brokerRequestTime) { + this.brokerRequestTime = brokerRequestTime; + } +} diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicRequestTimeDetailVO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicRequestTimeDetailVO.java index c89c7cb2..346e2383 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicRequestTimeDetailVO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicRequestTimeDetailVO.java @@ -3,6 +3,8 @@ package com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; +import java.util.List; + /** * @author zengqiao * @date 20/4/8 @@ -33,6 +35,8 @@ public class TopicRequestTimeDetailVO { @ApiModelProperty(value = "totalTimeMs") private Object totalTimeMs; + private List brokerRequestTimeList; + public String getRequestTimeType() { return requestTimeType; } @@ -97,6 +101,14 @@ public class TopicRequestTimeDetailVO { this.totalTimeMs = totalTimeMs; } + public List getBrokerRequestTimeList() { + return brokerRequestTimeList; + } + + public void setBrokerRequestTimeList(List brokerRequestTimeList) { + this.brokerRequestTimeList = brokerRequestTimeList; + } + @Override public String toString() { return "TopicRequestTimeDetailVO{" + diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/rd/RdTopicBasicVO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/rd/RdTopicBasicVO.java index 55682938..75d50f05 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/rd/RdTopicBasicVO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/rd/RdTopicBasicVO.java @@ -3,6 +3,7 @@ package com.xiaojukeji.kafka.manager.common.entity.vo.rd; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; +import java.util.List; import java.util.Properties; /** @@ -35,6 +36,9 @@ public class RdTopicBasicVO { @ApiModelProperty(value = "备注") private String description; + @ApiModelProperty(value = "所属region") + private List regionNameList; + public Long getClusterId() { return clusterId; } @@ -99,6 +103,14 @@ public class RdTopicBasicVO { this.description = description; } + public List getRegionNameList() { + return regionNameList; + } + + public void setRegionNameList(List regionNameList) { + this.regionNameList = regionNameList; + } + @Override public String toString() { return "RdTopicBasicVO{" + @@ -110,6 +122,7 @@ public class RdTopicBasicVO { ", appName='" + appName + '\'' + ", properties=" + properties + ", description='" + description + '\'' + + ", regionNameList='" + regionNameList + '\'' + '}'; } } \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/JsonUtils.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/JsonUtils.java index a8c4997e..1d4bce26 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/JsonUtils.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/JsonUtils.java @@ -64,7 +64,7 @@ public class JsonUtils { TopicConnectionDO connectionDO = new TopicConnectionDO(); String[] appIdDetailArray = appIdDetail.toString().split("#"); - if (appIdDetailArray.length == 3) { + if (appIdDetailArray.length >= 3) { connectionDO.setAppId(appIdDetailArray[0]); connectionDO.setIp(appIdDetailArray[1]); connectionDO.setClientVersion(appIdDetailArray[2]); diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/MbeanNameUtilV2.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/MbeanNameUtilV2.java index d7d4ffe9..eb43e989 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/MbeanNameUtilV2.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/MbeanNameUtilV2.java @@ -170,7 +170,10 @@ public class MbeanNameUtilV2 { new MbeanV2( "TopicCodeC", JmxAttributeEnum.VALUE_ATTRIBUTE, - "kafka.server:type=ReplicaManager,name=TopicCodeC" + Arrays.asList( + new AbstractMap.SimpleEntry<>(KafkaVersion.VERSION_0_10_3, "kafka.server:type=ReplicaManager,name=TopicCodeC"), + new AbstractMap.SimpleEntry<>(KafkaVersion.VERSION_MAX, "kafka.server:type=AppIdTopicMetrics,name=RecordCompression,appId=") + ) ), Arrays.asList( KafkaMetricsCollections.TOPIC_BASIC_PAGE_METRICS diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/ZkPathUtil.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/ZkPathUtil.java index b1758205..464dba7b 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/ZkPathUtil.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/ZkPathUtil.java @@ -8,7 +8,7 @@ package com.xiaojukeji.kafka.manager.common.zookeeper; public class ZkPathUtil { private static final String ZOOKEEPER_SEPARATOR = "/"; - private static final String BROKER_ROOT_NODE = ZOOKEEPER_SEPARATOR + "brokers"; + public static final String BROKER_ROOT_NODE = ZOOKEEPER_SEPARATOR + "brokers"; public static final String CONTROLLER_ROOT_NODE = ZOOKEEPER_SEPARATOR + "controller"; diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/LogicalClusterMetadataManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/LogicalClusterMetadataManager.java index 050e2164..07c81878 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/LogicalClusterMetadataManager.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/LogicalClusterMetadataManager.java @@ -79,6 +79,7 @@ public class LogicalClusterMetadataManager { Long logicalClusterId = logicalClusterIdMap.get(topicName); if (ValidateUtils.isNull(logicalClusterId)) { + LOGGER.debug("class=LogicalClusterMetadataManager||method=getTopicLogicalCluster||topicName={}||msg=logicalClusterId is null!",topicName); return null; } return LOGICAL_CLUSTER_MAP.get(logicalClusterId); @@ -107,6 +108,7 @@ public class LogicalClusterMetadataManager { public Long getPhysicalClusterId(Long logicalClusterId) { if (ValidateUtils.isNull(logicalClusterId)) { + LOGGER.debug("class=LogicalClusterMetadataManager||method=getPhysicalClusterId||msg=logicalClusterId is null!"); return null; } if (!LOADED.get()) { @@ -114,6 +116,7 @@ public class LogicalClusterMetadataManager { } LogicalClusterDO logicalClusterDO = LOGICAL_CLUSTER_MAP.get(logicalClusterId); if (ValidateUtils.isNull(logicalClusterDO)) { + LOGGER.debug("class=LogicalClusterMetadataManager||method=getPhysicalClusterId||logicalClusterId={}||msg=logicalClusterDO is null!",logicalClusterId); return null; } return logicalClusterDO.getClusterId(); @@ -124,6 +127,7 @@ public class LogicalClusterMetadataManager { return clusterId; } if (ValidateUtils.isNull(clusterId)) { + LOGGER.warn("class=LogicalClusterMetadataManager||method=getPhysicalClusterId||isPhysicalClusterId={}||msg=clusterId is null!",isPhysicalClusterId); return null; } if (!LOADED.get()) { @@ -131,6 +135,7 @@ public class LogicalClusterMetadataManager { } LogicalClusterDO logicalClusterDO = LOGICAL_CLUSTER_MAP.get(clusterId); if (ValidateUtils.isNull(logicalClusterDO)) { + LOGGER.debug("class=LogicalClusterMetadataManager||method=getPhysicalClusterId||clusterId={}||msg=logicalClusterDO is null!",clusterId); return null; } return logicalClusterDO.getClusterId(); @@ -171,8 +176,7 @@ public class LogicalClusterMetadataManager { for (Long regionId: regionIdList) { RegionDO regionDO = regionMap.get(regionId); if (ValidateUtils.isNull(regionDO) || !logicalClusterDO.getClusterId().equals(regionDO.getClusterId())) { - LOGGER.warn("flush logical cluster metadata failed, exist illegal region, logicalCluster:{} region:{}.", - logicalClusterDO, regionId); + LOGGER.warn("flush logical cluster metadata failed, exist illegal region, logicalCluster:{} region:{}.", logicalClusterDO, regionId); continue; } brokerIdSet.addAll(ListUtils.string2IntList(regionDO.getBrokerList())); diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java index f1b7d1fa..3c68b30f 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java @@ -86,19 +86,36 @@ public class PhysicalClusterMetadataManager { if (ZK_CONFIG_MAP.containsKey(clusterDO.getId())) { return; } - ZkConfigImpl zkConfig = new ZkConfigImpl(clusterDO.getZookeeper()); - //增加Broker监控 + // 初始化broker-map BROKER_METADATA_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>()); JMX_CONNECTOR_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>()); KAFKA_VERSION_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>()); + + // 初始化topic-map + TOPIC_METADATA_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>()); + TOPIC_RETENTION_TIME_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>()); + + // 初始化cluster-map + CLUSTER_MAP.put(clusterDO.getId(), clusterDO); + + if (!zkConfig.checkPathExists(ZkPathUtil.BROKER_ROOT_NODE)) { + LOGGER.info("ignore add cluster, zk path=/brokers not exist, clusterId:{}.", clusterDO.getId()); + try { + zkConfig.close(); + } catch (Exception e) { + LOGGER.warn("ignore add cluster, close zk connection failed, cluster:{}.", clusterDO, e); + } + return; + } + + //增加Broker监控 BrokerStateListener brokerListener = new BrokerStateListener(clusterDO.getId(), zkConfig, configUtils.getJmxMaxConn()); brokerListener.init(); zkConfig.watchChildren(ZkPathUtil.BROKER_IDS_ROOT, brokerListener); //增加Topic监控 - TOPIC_METADATA_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>()); TopicStateListener topicListener = new TopicStateListener(clusterDO.getId(), zkConfig); topicListener.init(); zkConfig.watchChildren(ZkPathUtil.BROKER_TOPICS_ROOT, topicListener); @@ -109,10 +126,6 @@ public class PhysicalClusterMetadataManager { controllerListener.init(); zkConfig.watch(ZkPathUtil.CONTROLLER_ROOT_NODE, controllerListener); - //增加Config变更监控 - TOPIC_RETENTION_TIME_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>()); - - CLUSTER_MAP.put(clusterDO.getId(), clusterDO); ZK_CONFIG_MAP.put(clusterDO.getId(), zkConfig); } catch (Exception e) { LOGGER.error("add cluster failed, cluster:{}.", clusterDO, e); @@ -444,8 +457,16 @@ public class PhysicalClusterMetadataManager { return kafkaVersion; } - public String getKafkaVersion(Long clusterId) { - return getKafkaVersion(clusterId, PhysicalClusterMetadataManager.getBrokerIdList(clusterId)); + public String getKafkaVersionFromCache(Long clusterId) { + Set kafkaVersionSet = new HashSet<>(); + for (Integer brokerId: PhysicalClusterMetadataManager.getBrokerIdList(clusterId)) { + String kafkaVersion = this.getKafkaVersionFromCache(clusterId, brokerId); + if (ValidateUtils.isBlank(kafkaVersion)) { + continue; + } + kafkaVersionSet.add(kafkaVersion); + } + return ListUtils.strList2String(new ArrayList<>(kafkaVersionSet)); } public String getKafkaVersion(Long clusterId, List brokerIdList) { diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ConfigService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ConfigService.java index aba1362a..9f5d6915 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ConfigService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ConfigService.java @@ -31,6 +31,8 @@ public interface ConfigService { List listAll(); + Integer getAutoPassedTopicApplyOrderNumPerTask(); + CreateTopicElemConfig getCreateTopicConfig(Long clusterId, String systemCode); ClusterDO getClusterDO(Long clusterId); diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/JmxService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/JmxService.java index 157f0e4c..22e20888 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/JmxService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/JmxService.java @@ -31,6 +31,11 @@ public interface JmxService { TopicMetrics getTopicMetrics(Long clusterId, Integer brokerId, String topicName, Integer metricsCode, Boolean byAdd); + /** + * 获取topic消息压缩指标 + */ + String getTopicCodeCValue(Long clusterId, String topicName); + List getTopicMetrics(Long clusterId, Integer metricsCode, Boolean byAdd); /** diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java index 4344decb..dacba4b0 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java @@ -64,7 +64,7 @@ public interface TopicService { /** * 获取Topic的分区的offset */ - Map getPartitionOffset(ClusterDO cluster, String topicName, OffsetPosEnum offsetPosEnum); + Map getPartitionOffset(ClusterDO clusterDO, String topicName, OffsetPosEnum offsetPosEnum); /** * 获取Topic概览信息 diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/AppService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/AppService.java index b0911e16..c78946b6 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/AppService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/AppService.java @@ -51,6 +51,13 @@ public interface AppService { */ List getByPrincipal(String principal); + /** + * 通过appId来查,需要check当前登录人是否有权限. + * @param appId appId + * @return AppDO + */ + AppDO getAppByUserAndId(String appId, String curUser); + /** * 通过appId来查 * @param appId appId diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/TopicConnectionService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/TopicConnectionService.java index 79544608..f73ff8d5 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/TopicConnectionService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/TopicConnectionService.java @@ -11,7 +11,7 @@ import java.util.List; * @date 20/4/13 */ public interface TopicConnectionService { - int batchAdd(List doList); + void batchAdd(List doList); /** * 查询连接信息 diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/AppServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/AppServiceImpl.java index 0f362f7f..eada7df5 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/AppServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/AppServiceImpl.java @@ -32,6 +32,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * @author zhongyuankai @@ -59,10 +60,13 @@ public class AppServiceImpl implements AppService { @Autowired private OperateRecordService operateRecordService; + + @Override public ResultStatus addApp(AppDO appDO) { try { if (appDao.insert(appDO) < 1) { + LOGGER.warn("class=AppServiceImpl||method=addApp||AppDO={}||msg=add fail,{}",appDO,ResultStatus.MYSQL_ERROR.getMessage()); return ResultStatus.MYSQL_ERROR; } KafkaUserDO kafkaUserDO = new KafkaUserDO(); @@ -72,6 +76,7 @@ public class AppServiceImpl implements AppService { kafkaUserDO.setUserType(0); kafkaUserDao.insert(kafkaUserDO); } catch (DuplicateKeyException e) { + LOGGER.error("class=AppServiceImpl||method=addApp||errMsg={}||appDO={}|", e.getMessage(), appDO, e); return ResultStatus.RESOURCE_ALREADY_EXISTED; } catch (Exception e) { LOGGER.error("add app failed, appDO:{}.", appDO, e); @@ -139,23 +144,42 @@ public class AppServiceImpl implements AppService { return ResultStatus.SUCCESS; } } catch (DuplicateKeyException e) { + LOGGER.error("class=AppServiceImpl||method=updateByAppId||errMsg={}||AppDTO={}||operator={}||adminApi={}", e.getMessage(), dto, operator, adminApi, e); return ResultStatus.RESOURCE_NAME_DUPLICATED; } catch (Exception e) { LOGGER.error("update app failed, dto:{}, operator:{}, adminApi:{}.", dto, operator, adminApi, e); } + LOGGER.warn("class=AppServiceImpl||method=updateByAppId||dto={}||operator={}||adminApi={}||msg=update app fail,{}!", dto,operator,adminApi,ResultStatus.MYSQL_ERROR.getMessage()); return ResultStatus.MYSQL_ERROR; } @Override - public List getByPrincipal(String principals) { + public List getByPrincipal(String principal) { try { - return appDao.getByPrincipal(principals); + List appDOs = appDao.getByPrincipal(principal); + if (!ValidateUtils.isEmptyList(appDOs)) { + return appDOs.stream() + .filter(appDO -> ListUtils.string2StrList(appDO.getPrincipals()).contains(principal)) + .collect(Collectors.toList()); + } } catch (Exception e) { - LOGGER.error("get app list failed, principals:{}.", principals); + LOGGER.error("get app list failed, principals:{}.", principal); } return new ArrayList<>(); } + @Override + public AppDO getAppByUserAndId(String appId, String curUser) { + AppDO appDO = this.getByAppId(appId); + if (appDO != null) { + if (ListUtils.string2StrList(appDO.getPrincipals()).contains(curUser)) { + return appDO; + } + } + LOGGER.debug("class=AppServiceImpl||method=getAppByUserAndId||appId={}||curUser={}||msg=appDO is null!", appId, curUser); + return null; + } + @Override public AppDO getByAppId(String appId) { try { @@ -177,6 +201,7 @@ public class AppServiceImpl implements AppService { // 查询AppID AppDO appDO = appDao.getByAppId(appId); if (ValidateUtils.isNull(appDO)) { + LOGGER.debug("class=AppServiceImpl||method=getAppTopicDTOList||appId={}||msg=appDO is null!", appId); return new ArrayList<>(); } @@ -220,6 +245,7 @@ public class AppServiceImpl implements AppService { appTopicDTO.setLogicalClusterId(logicalClusterDO.getId()); appTopicDTO.setLogicalClusterName(logicalClusterDO.getName()); } else { + LOGGER.warn("class=AppServiceImpl||method=getAppTopicDTOList||clusterId={}||topicName={}||msg=logicalClusterDO is null!", authorityDO.getClusterId(), authorityDO.getTopicName()); continue; } appTopicDTO.setOperator(""); diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/GatewayConfigServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/GatewayConfigServiceImpl.java index 8dfb26c4..3db556a5 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/GatewayConfigServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/GatewayConfigServiceImpl.java @@ -120,6 +120,7 @@ public class GatewayConfigServiceImpl implements GatewayConfigService { try { doList = gatewayConfigDao.getByConfigType(GatewayConfigKeyEnum.SD_SP_RATE.getConfigType()); if (ValidateUtils.isEmptyList(doList)) { + LOGGER.debug("class=GatewayConfigServiceImpl||method=getSpRateConfig||requestVersion={}||msg=doList is empty!",requestVersion); return new SpRateConfig(Long.MIN_VALUE, new HashMap<>(0)); } Long maxVersion = Long.MIN_VALUE; diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/TopicConnectionServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/TopicConnectionServiceImpl.java index b29db05c..27274aa5 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/TopicConnectionServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/TopicConnectionServiceImpl.java @@ -27,19 +27,20 @@ public class TopicConnectionServiceImpl implements TopicConnectionService { private TopicConnectionDao topicConnectionDao; @Override - public int batchAdd(List doList) { + public void batchAdd(List doList) { if (ValidateUtils.isEmptyList(doList)) { - return 0; + return; } + int count = 0; for (TopicConnectionDO connectionDO: doList) { try { count += topicConnectionDao.replace(connectionDO); } catch (Exception e) { - LOGGER.error("replace topic connections failed, data:{}.", connectionDO); + LOGGER.error("class=TopicConnectionServiceImpl||method=batchAdd||connectionDO={}||errMsg={}", connectionDO, e.getMessage()); } } - return count; + LOGGER.info("class=TopicConnectionServiceImpl||method=batchAdd||allSize={}||successSize={}", doList.size(), count); } @Override diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/BrokerServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/BrokerServiceImpl.java index d6065708..12af2e18 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/BrokerServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/BrokerServiceImpl.java @@ -150,6 +150,8 @@ public class BrokerServiceImpl implements BrokerService { for (Integer brokerId: brokerIdSet) { BrokerMetadata brokerMetadata = PhysicalClusterMetadataManager.getBrokerMetadata(clusterId, brokerId); if (ValidateUtils.isNull(brokerMetadata)) { + LOGGER.warn("class=BrokerServiceImpl||method=getBrokerOverviewList||brokerId={}|||msg=brokerMetadata is null!", + brokerId); continue; } overviewDTOMap.put(brokerId, BrokerOverviewDTO.newInstance( diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java index 4b3d3f9e..49c16c5d 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java @@ -286,7 +286,7 @@ public class ClusterServiceImpl implements ClusterService { dto.setClusterName(clusterDO.getClusterName()); dto.setZookeeper(clusterDO.getZookeeper()); dto.setBootstrapServers(clusterDO.getBootstrapServers()); - dto.setKafkaVersion(physicalClusterMetadataManager.getKafkaVersion(clusterDO.getId())); + dto.setKafkaVersion(physicalClusterMetadataManager.getKafkaVersionFromCache(clusterDO.getId())); dto.setIdc(configUtils.getIdc()); dto.setSecurityProperties(clusterDO.getSecurityProperties()); dto.setStatus(clusterDO.getStatus()); diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConfigServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConfigServiceImpl.java index 5ffc4729..4b896bee 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConfigServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConfigServiceImpl.java @@ -42,6 +42,7 @@ public class ConfigServiceImpl implements ConfigService { } catch (Exception e) { LOGGER.error("insert config failed, config:{}.", dto, e); } + LOGGER.warn("class=ConfigServiceImpl||method=insert||dto={}||msg=insert config fail,{}!", dto,ResultStatus.MYSQL_ERROR.getMessage()); return ResultStatus.MYSQL_ERROR; } @@ -54,10 +55,12 @@ public class ConfigServiceImpl implements ConfigService { if (configDao.deleteByKey(configKey) >= 1) { return ResultStatus.SUCCESS; } + LOGGER.warn("class=ConfigServiceImpl||method=deleteByKey||configKey={}||msg=delete config fail,{}!", configKey,ResultStatus.CONFIG_NOT_EXIST.getMessage()); return ResultStatus.CONFIG_NOT_EXIST; } catch (Exception e) { LOGGER.error("delete config failed, configKey:{}.", configKey, e); } + LOGGER.warn("class=ConfigServiceImpl||method=deleteByKey||configKey={}||msg=delete config fail,{}!", configKey,ResultStatus.MYSQL_ERROR.getMessage()); return ResultStatus.MYSQL_ERROR; } @@ -67,10 +70,12 @@ public class ConfigServiceImpl implements ConfigService { if (configDao.updateByKey(convert2ConfigDO(dto)) >= 1) { return ResultStatus.SUCCESS; } + LOGGER.warn("class=ConfigServiceImpl||method=updateByKey||dto={}||msg=update config fail,{}!", dto,ResultStatus.CONFIG_NOT_EXIST.getMessage()); return ResultStatus.CONFIG_NOT_EXIST; } catch (Exception e) { LOGGER.error("update config failed, config:{}.", dto, e); } + LOGGER.warn("class=ConfigServiceImpl||method=deleteByKey||dto={}||msg=delete config fail,{}!", dto,ResultStatus.MYSQL_ERROR.getMessage()); return ResultStatus.MYSQL_ERROR; } @@ -84,10 +89,15 @@ public class ConfigServiceImpl implements ConfigService { if (configDao.updateByKey(configDO) >= 1) { return ResultStatus.SUCCESS; } + LOGGER.warn("class=ConfigServiceImpl||method=updateByKey||configKey={}||configValue={}||msg=update config fail,{}!" + , configKey,configValue,ResultStatus.CONFIG_NOT_EXIST.getMessage()); return ResultStatus.CONFIG_NOT_EXIST; } catch (Exception e) { LOGGER.error("update config failed, configValue:{}.", configValue, e); } + LOGGER.warn("class=ConfigServiceImpl||method=deleteByKey||configKey={}||configValue={}||msg=delete config fail,{}!" + , configKey,configValue,ResultStatus.MYSQL_ERROR.getMessage()); + return ResultStatus.MYSQL_ERROR; } @@ -161,6 +171,16 @@ public class ConfigServiceImpl implements ConfigService { return configDO; } + @Override + public Integer getAutoPassedTopicApplyOrderNumPerTask() { + String configKey = TopicCreationConstant.INNER_CREATE_TOPIC_CONFIG_KEY; + CreateTopicConfig configValue = this.getByKey(configKey, CreateTopicConfig.class); + if (ValidateUtils.isNull(configValue)) { + return TopicCreationConstant.DEFAULT_MAX_PASSED_ORDER_NUM_PER_TASK; + } + return configValue.getMaxPassedOrderNumPerTask(); + } + @Override public CreateTopicElemConfig getCreateTopicConfig(Long clusterId, String systemCode) { String configKey = TopicCreationConstant.INNER_CREATE_TOPIC_CONFIG_KEY; diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConsumerServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConsumerServiceImpl.java index 7f905275..fcfc72ec 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConsumerServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConsumerServiceImpl.java @@ -110,6 +110,8 @@ public class ConsumerServiceImpl implements ConsumerService { ConsumerGroupDTO consumeGroupDTO) { TopicMetadata topicMetadata = PhysicalClusterMetadataManager.getTopicMetadata(clusterDO.getId(), topicName); if (topicMetadata == null) { + logger.warn("class=ConsumerServiceImpl||method=getConsumeDetail||clusterId={}||topicName={}||msg=topicMetadata is null!", + clusterDO.getId(), topicName); return null; } @@ -120,6 +122,7 @@ public class ConsumerServiceImpl implements ConsumerService { consumerGroupDetailDTOList = getConsumerPartitionStateInBroker(clusterDO, topicMetadata, consumeGroupDTO); } if (consumerGroupDetailDTOList == null) { + logger.info("class=ConsumerServiceImpl||method=getConsumeDetail||msg=consumerGroupDetailDTOList is null!"); return null; } @@ -167,7 +170,7 @@ public class ConsumerServiceImpl implements ConsumerService { kafkaConsumer.close(); } } - return new ArrayList<>(); + return resultList; } private List resetConsumerOffset(ClusterDO cluster, KafkaConsumer kafkaConsumer, ConsumerGroupDTO consumerGroupDTO, Map offsetMap) { @@ -184,7 +187,9 @@ public class ConsumerServiceImpl implements ConsumerService { } } catch (Exception e) { logger.error("reset failed, clusterId:{} consumerGroup:{} topic-partition:{}.", cluster.getId(), consumerGroupDTO, tp, e); - resultList.add(new Result()); + resultList.add(new Result( + ResultStatus.OPERATION_FAILED.getCode(), + "reset failed...")); } resultList.add(new Result()); } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java index e4f1af29..611dc203 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java @@ -1,11 +1,13 @@ package com.xiaojukeji.kafka.manager.service.service.impl; +import com.google.common.base.Joiner; import com.xiaojukeji.kafka.manager.common.bizenum.KafkaClientEnum; import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections; import com.xiaojukeji.kafka.manager.common.entity.KafkaVersion; import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics; import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics; import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics; +import com.xiaojukeji.kafka.manager.common.utils.ListUtils; import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConstant; import com.xiaojukeji.kafka.manager.common.entity.ao.PartitionAttributeDTO; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; @@ -123,11 +125,19 @@ public class JmxServiceImpl implements JmxService { return null; } TopicMetrics metrics = null; + List brokerMetricsList = new ArrayList<>(); for (Integer brokerId : topicMetadata.getBrokerIdSet()) { TopicMetrics subMetrics = getTopicMetrics(clusterId, brokerId, topicName, metricsCode, byAdd); + if (ValidateUtils.isNull(subMetrics)) { continue; } + + BrokerMetrics brokerMetrics = new BrokerMetrics(clusterId, brokerId); + brokerMetrics.setMetricsMap(subMetrics.getMetricsMap()); + + brokerMetricsList.add(brokerMetrics); + if (ValidateUtils.isNull(metrics)) { metrics = new TopicMetrics(clusterId, topicName); } @@ -137,6 +147,10 @@ public class JmxServiceImpl implements JmxService { metrics.mergeByMax(subMetrics); } } + if (!ValidateUtils.isNull(metrics)) { + metrics.setBrokerMetricsList(brokerMetricsList); + } + return metrics; } @@ -169,6 +183,77 @@ public class JmxServiceImpl implements JmxService { return metrics; } + @Override + public String getTopicCodeCValue(Long clusterId, String topicName) { + TopicMetadata topicMetadata = PhysicalClusterMetadataManager.getTopicMetadata(clusterId, topicName); + if (topicMetadata == null) { + return null; + } + + MbeanV2 topicCodeCMBean = null; + List mbeanV2List = MbeanNameUtilV2.getMbeanList(KafkaMetricsCollections.TOPIC_BASIC_PAGE_METRICS); + if (!ValidateUtils.isEmptyList(mbeanV2List)) { + topicCodeCMBean = mbeanV2List.stream() + .filter(mbeanV2 -> "TopicCodeC".equals(mbeanV2.getFieldName())) + .findFirst() + .orElse(null); + } + + if (topicCodeCMBean == null) { + return null; + } + + KafkaVersion kafkaVersion; + Set codeCValues = new HashSet<>(); + TopicMetrics metrics = new TopicMetrics(clusterId, topicName); + for (Integer brokerId : topicMetadata.getBrokerIdSet()) { + JmxConnectorWrap jmxConnectorWrap = PhysicalClusterMetadataManager.getJmxConnectorWrap(clusterId, brokerId); + if (ValidateUtils.isNull(jmxConnectorWrap)|| !jmxConnectorWrap.checkJmxConnectionAndInitIfNeed()) { + continue; + } + kafkaVersion = physicalClusterMetadataManager.getKafkaVersion(clusterId, brokerId); + // 如果是高版本,需要获取指标{kafka.server:type=AppIdTopicMetrics,name=RecordCompression,appId=*,topic=xxx} + if (kafkaVersion.getVersionNum() > KafkaVersion.VERSION_0_10_3.longValue()) { + try { + ObjectName objectNameRegX = new ObjectName(topicCodeCMBean.getObjectName(kafkaVersion.getVersionNum()) + + "*,topic=" + topicName); + QueryExp exp = Query.match(Query.attr("Value"), Query.value("*")); + Set objectNames = jmxConnectorWrap.queryNames(objectNameRegX, exp); + for (ObjectName objectName : objectNames) { + if (objectName.toString().indexOf(",appId=admin,") == -1) { + String value = (String) jmxConnectorWrap.getAttribute(objectName, "Value"); + if (!codeCValues.contains(value)) { + codeCValues.add(value); + } + } + } + } catch (Exception e) { + LOGGER.error("get topic codec metrics failed, clusterId:{} brokerId:{} topicName:{} mbean:{}.", + clusterId, brokerId, topicName, topicCodeCMBean, e + ); + } + } else { + // 低版本沿用老逻辑... + try { + getAndSupplyAttributes2BaseMetrics( + metrics, + jmxConnectorWrap, + topicCodeCMBean, + new ObjectName(topicCodeCMBean.getObjectName(kafkaVersion.getVersionNum()) + ",topic=" + topicName) + ); + } catch (Exception e) { + LOGGER.error("get topic codec metrics failed, clusterId:{} topicName:{} mbean:{}.", + clusterId, topicName, topicCodeCMBean, e + ); + } + } + } + + codeCValues.addAll(ListUtils.string2StrList(metrics.getSpecifiedMetrics("TopicCodeCValue", String.class))); + + return Joiner.on(",").join(codeCValues); + } + private void getAndSupplyAttributes2BaseMetrics(BaseMetrics metrics, JmxConnectorWrap jmxConnectorWrap, MbeanV2 mbeanV2, diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/LogicalClusterServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/LogicalClusterServiceImpl.java index 2bee3d80..5b2fb703 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/LogicalClusterServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/LogicalClusterServiceImpl.java @@ -88,6 +88,7 @@ public class LogicalClusterServiceImpl implements LogicalClusterService { public LogicalCluster getLogicalCluster(Long logicalClusterId) { LogicalClusterDO logicalClusterDO = logicClusterMetadataManager.getLogicalCluster(logicalClusterId); if (ValidateUtils.isNull(logicalClusterDO)) { + LOGGER.warn("class=LogicalClusterServiceImpl||method=getLogicalCluster||logicalClusterId={}||msg=logicalClusterDO is null!", logicalClusterId); return null; } return convert2LogicalCluster(logicalClusterDO); @@ -223,8 +224,7 @@ public class LogicalClusterServiceImpl implements LogicalClusterService { return ResultStatus.SUCCESS; } } catch (DuplicateKeyException e) { - LOGGER.error("create logical cluster failed, name already existed, newLogicalClusterDO:{}.", - logicalClusterDO, e); + LOGGER.error("create logical cluster failed, name already existed, newLogicalClusterDO:{}.", logicalClusterDO, e); return ResultStatus.RESOURCE_ALREADY_EXISTED; } catch (Exception e) { LOGGER.error("create logical cluster failed, mysql error, newLogicalClusterDO:{}.", logicalClusterDO, e); @@ -264,6 +264,7 @@ public class LogicalClusterServiceImpl implements LogicalClusterService { } return ResultStatus.RESOURCE_NOT_EXIST; } catch (Exception e) { + LOGGER.error("class=LogicalClusterServiceImpl||method=getById||errMsg={}||logicalClusterId={}", e.getMessage(), logicalClusterId, e); return ResultStatus.MYSQL_ERROR; } } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/RegionServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/RegionServiceImpl.java index 227f9158..8f957b02 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/RegionServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/RegionServiceImpl.java @@ -68,6 +68,8 @@ public class RegionServiceImpl implements RegionService { LOGGER.error("create region failed, newRegionDO:{}.", regionDO, e); return ResultStatus.MYSQL_ERROR; } + + LOGGER.warn("class=RegionServiceImpl||method=createRegion||regionDO={}||msg=create region failed", regionDO); return ResultStatus.MYSQL_ERROR; } @@ -107,6 +109,7 @@ public class RegionServiceImpl implements RegionService { if (regionDao.updateById(newRegionDO) > 0) { return ResultStatus.SUCCESS; } + LOGGER.warn("class=RegionServiceImpl||method=updateRegion||newRegionDO={}||msg=update region failed", newRegionDO); return ResultStatus.MYSQL_ERROR; } List newBrokerIdList = ListUtils.string2IntList(newRegionDO.getBrokerList()); @@ -125,6 +128,7 @@ public class RegionServiceImpl implements RegionService { } catch (Exception e) { LOGGER.error("update region failed, newRegionDO:{}", newRegionDO, e); } + LOGGER.warn("class=RegionServiceImpl||method=updateRegion||newRegionDO={}||msg=update region failed", newRegionDO); return ResultStatus.MYSQL_ERROR; } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java index 9cc2d234..a953a50b 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java @@ -65,9 +65,6 @@ public class TopicManagerServiceImpl implements TopicManagerService { @Autowired private LogicalClusterMetadataManager logicalClusterMetadataManager; - @Autowired - private LogicalClusterService logicalClusterService; - @Autowired private JmxService jmxService; @@ -77,6 +74,9 @@ public class TopicManagerServiceImpl implements TopicManagerService { @Autowired private ClusterService clusterService; + @Autowired + private RegionService regionService; + @Override public List listAll() { try { @@ -288,7 +288,6 @@ public class TopicManagerServiceImpl implements TopicManagerService { private List getTopics(ClusterDO clusterDO, Map appMap, Map topicMap) { - Boolean needAuth = !ValidateUtils.isBlank(clusterDO.getSecurityProperties()); List dtoList = new ArrayList<>(); for (String topicName: PhysicalClusterMetadataManager.getTopicNameList(clusterDO.getId())) { LogicalClusterDO logicalClusterDO = logicalClusterMetadataManager.getTopicLogicalCluster( @@ -305,7 +304,7 @@ public class TopicManagerServiceImpl implements TopicManagerService { dto.setLogicalClusterId(logicalClusterDO.getId()); dto.setLogicalClusterName(logicalClusterDO.getName()); dto.setTopicName(topicName); - dto.setNeedAuth(needAuth); + dto.setNeedAuth(Boolean.TRUE); TopicDO topicDO = topicMap.get(topicName); if (ValidateUtils.isNull(topicDO)) { @@ -371,12 +370,14 @@ public class TopicManagerServiceImpl implements TopicManagerService { TopicMetadata topicMetaData = PhysicalClusterMetadataManager.getTopicMetadata(physicalClusterId, topicName); if (ValidateUtils.isNull(topicMetaData)) { // Topic不存在 + LOGGER.warn("class=TopicManagerServiceImpl||method=getTopicAuthorizedApps||physicalClusterId={}||topicName={}||msg=topicMetaData is null", physicalClusterId,topicName); return new ArrayList<>(); } List authorityDOList = authorityService.getAuthorityByTopic(physicalClusterId, topicName); if (ValidateUtils.isEmptyList(authorityDOList)) { // 无任何权限 + LOGGER.warn("class=TopicManagerServiceImpl||method=getTopicAuthorizedApps||physicalClusterId={}||topicName={}||msg=authorityDOList is null", physicalClusterId,topicName); return new ArrayList<>(); } @@ -489,12 +490,17 @@ public class TopicManagerServiceImpl implements TopicManagerService { PhysicalClusterMetadataManager.getZKConfig(physicalClusterId), topicName ); + List regionDOList = regionService.getRegionListByTopicName(physicalClusterId, topicName); + List regionNameList = regionDOList.stream().map(RegionDO::getName).collect(Collectors.toList()); + TopicDO topicDO = getByTopicName(physicalClusterId, topicName); if (ValidateUtils.isNull(topicDO)) { - return new Result<>(convert2RdTopicBasic(clusterDO, topicName, null, null, properties)); + return new Result<>(convert2RdTopicBasic(clusterDO, topicName, null, null, regionNameList, properties)); } AppDO appDO = appService.getByAppId(topicDO.getAppId()); - return new Result<>(convert2RdTopicBasic(clusterDO, topicName, topicDO, appDO, properties)); + + + return new Result<>(convert2RdTopicBasic(clusterDO, topicName, topicDO, appDO, regionNameList, properties)); } @Override @@ -527,6 +533,7 @@ public class TopicManagerServiceImpl implements TopicManagerService { String topicName, TopicDO topicDO, AppDO appDO, + List regionNameList, Properties properties) { RdTopicBasic rdTopicBasic = new RdTopicBasic(); rdTopicBasic.setClusterId(clusterDO.getId()); @@ -539,6 +546,7 @@ public class TopicManagerServiceImpl implements TopicManagerService { if (!ValidateUtils.isNull(topicDO)) { rdTopicBasic.setDescription(topicDO.getDescription()); } + rdTopicBasic.setRegionNameList(regionNameList); rdTopicBasic.setProperties(properties); rdTopicBasic.setRetentionTime(KafkaZookeeperUtils.getTopicRetentionTime(properties)); return rdTopicBasic; diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java index 5358a6a3..05a8c4e1 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java @@ -13,7 +13,6 @@ import com.xiaojukeji.kafka.manager.common.entity.ao.PartitionOffsetDTO; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.*; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO; import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics; -import com.xiaojukeji.kafka.manager.common.utils.ListUtils; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConstant; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.BrokerMetadata; @@ -44,6 +43,7 @@ import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import java.util.*; +import java.util.stream.Collectors; /** * @author limeng @@ -80,6 +80,9 @@ public class TopicServiceImpl implements TopicService { @Autowired private ClusterService clusterService; + @Autowired + private RegionService regionService; + @Override public List getTopicMetricsFromDB(Long clusterId, String topicName, Date startTime, Date endTime) { try { @@ -228,25 +231,10 @@ public class TopicServiceImpl implements TopicService { basicDTO.setPrincipals(appDO.getPrincipals()); } - LogicalClusterDO logicalClusterDO = logicalClusterMetadataManager.getTopicLogicalCluster(clusterId, topicName); - if (!ValidateUtils.isNull(logicalClusterDO)) { - basicDTO.setRegion(logicalClusterDO.getName()); - } + List regionDOList = regionService.getRegionListByTopicName(clusterId, topicName); + basicDTO.setRegionNameList(regionDOList.stream().map(RegionDO::getName).collect(Collectors.toList())); - TopicMetrics metrics = jmxService.getTopicMetrics( - clusterId, - topicName, - KafkaMetricsCollections.TOPIC_BASIC_PAGE_METRICS, - true - ); - - String compressionType = null; - if (!ValidateUtils.isNull(metrics)) { - compressionType = metrics.getSpecifiedMetrics("TopicCodeCValue", String.class); - } - basicDTO.setTopicCodeC( - ListUtils.strList2String(new ArrayList<>(new HashSet<>(ListUtils.string2StrList(compressionType)))) - ); + basicDTO.setTopicCodeC(jmxService.getTopicCodeCValue(clusterId, topicName)); basicDTO.setScore(100); return basicDTO; } @@ -469,6 +457,7 @@ public class TopicServiceImpl implements TopicService { return overview; } overview.setByteIn(metrics.getBytesInPerSecOneMinuteRate(null)); + overview.setByteOut(metrics.getBytesOutPerSecOneMinuteRate(null)); overview.setProduceRequest(metrics.getTotalProduceRequestsPerSecOneMinuteRate(null)); return overview; } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ConfigUtils.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ConfigUtils.java index 2440e78d..53e9a2ba 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ConfigUtils.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ConfigUtils.java @@ -19,6 +19,9 @@ public class ConfigUtils { @Value(value = "${spring.profiles.active}") private String kafkaManagerEnv; + @Value(value = "${custom.store-metrics-task.save-days}") + private Integer maxMetricsSaveDays; + public String getIdc() { return idc; } @@ -42,4 +45,12 @@ public class ConfigUtils { public void setKafkaManagerEnv(String kafkaManagerEnv) { this.kafkaManagerEnv = kafkaManagerEnv; } + + public Integer getMaxMetricsSaveDays() { + return maxMetricsSaveDays; + } + + public void setMaxMetricsSaveDays(Integer maxMetricsSaveDays) { + this.maxMetricsSaveDays = maxMetricsSaveDays; + } } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/TopicCommands.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/TopicCommands.java index 1c62658f..58e5d98b 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/TopicCommands.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/TopicCommands.java @@ -1,5 +1,6 @@ package com.xiaojukeji.kafka.manager.service.utils; +import com.alibaba.fastjson.JSON; import com.xiaojukeji.kafka.manager.common.constant.Constant; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; @@ -11,6 +12,8 @@ import kafka.utils.ZkUtils; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import org.apache.kafka.common.errors.*; import org.apache.kafka.common.security.JaasUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Option; import scala.collection.JavaConversions; import scala.collection.Seq; @@ -22,6 +25,9 @@ import java.util.*; * @date 20/4/22 */ public class TopicCommands { + private static final Logger LOGGER = LoggerFactory.getLogger(TopicCommands.class); + + public static ResultStatus createTopic(ClusterDO clusterDO, String topicName, Integer partitionNum, @@ -56,16 +62,28 @@ public class TopicCommands { false ); } catch (NullPointerException e) { + LOGGER.error("class=TopicCommands||method=createTopic||errMsg={}||clusterDO={}||topicName={}||partitionNum={}||replicaNum={}||brokerIdList={}||config={}", + e.getMessage(), clusterDO, topicName, partitionNum, replicaNum, JSON.toJSONString(brokerIdList), config, e); return ResultStatus.TOPIC_OPERATION_PARAM_NULL_POINTER; } catch (InvalidPartitionsException e) { + LOGGER.error("class=TopicCommands||method=createTopic||errMsg={}||clusterDO={}||topicName={}||partitionNum={}||replicaNum={}||brokerIdList={}||config={}", + e.getMessage(), clusterDO, topicName,partitionNum,replicaNum,JSON.toJSONString(brokerIdList),config, e); return ResultStatus.TOPIC_OPERATION_PARTITION_NUM_ILLEGAL; } catch (InvalidReplicationFactorException e) { + LOGGER.error("class=TopicCommands||method=createTopic||errMsg={}||clusterDO={}||topicName={}||partitionNum={}||replicaNum={}||brokerIdList={}||config={}", + e.getMessage(), clusterDO, topicName,partitionNum,replicaNum,JSON.toJSONString(brokerIdList),config, e); return ResultStatus.BROKER_NUM_NOT_ENOUGH; } catch (TopicExistsException | ZkNodeExistsException e) { + LOGGER.error("class=TopicCommands||method=createTopic||errMsg={}||clusterDO={}||topicName={}||partitionNum={}||replicaNum={}||brokerIdList={}||config={}", + e.getMessage(), clusterDO, topicName,partitionNum,replicaNum,JSON.toJSONString(brokerIdList),config, e); return ResultStatus.TOPIC_OPERATION_TOPIC_EXISTED; } catch (InvalidTopicException e) { + LOGGER.error("class=TopicCommands||method=createTopic||errMsg={}||clusterDO={}||topicName={}||partitionNum={}||replicaNum={}||brokerIdList={}||config={}", + e.getMessage(), clusterDO, topicName,partitionNum,replicaNum,JSON.toJSONString(brokerIdList),config, e); return ResultStatus.TOPIC_OPERATION_TOPIC_NAME_ILLEGAL; } catch (Throwable t) { + LOGGER.error("class=TopicCommands||method=createTopic||errMsg={}||clusterDO={}||topicName={}||partitionNum={}||replicaNum={}||brokerIdList={}||config={}", + t.getMessage(), clusterDO, topicName,partitionNum,replicaNum,JSON.toJSONString(brokerIdList),config, t); return ResultStatus.TOPIC_OPERATION_UNKNOWN_ERROR; } finally { if (zkUtils != null) { @@ -86,10 +104,13 @@ public class TopicCommands { ); AdminUtils.deleteTopic(zkUtils, topicName); } catch (UnknownTopicOrPartitionException e) { + LOGGER.error("class=TopicCommands||method=deleteTopic||errMsg={}||clusterDO={}||topicName={}", e.getMessage(), clusterDO, topicName, e); return ResultStatus.TOPIC_OPERATION_UNKNOWN_TOPIC_PARTITION; } catch (ZkNodeExistsException e) { + LOGGER.error("class=TopicCommands||method=deleteTopic||errMsg={}||clusterDO={}||topicName={}", e.getMessage(), clusterDO, topicName, e); return ResultStatus.TOPIC_OPERATION_TOPIC_IN_DELETING; } catch (Throwable t) { + LOGGER.error("class=TopicCommands||method=deleteTopic||errMsg={}||clusterDO={}||topicName={}", t.getMessage(), clusterDO, topicName, t); return ResultStatus.TOPIC_OPERATION_UNKNOWN_ERROR; } finally { if (zkUtils != null) { @@ -108,13 +129,15 @@ public class TopicCommands { Constant.DEFAULT_SESSION_TIMEOUT_UNIT_MS, JaasUtils.isZkSecurityEnabled() ); - AdminUtils.changeTopicConfig(zkUtils, topicName, config); } catch (AdminOperationException e) { + LOGGER.error("class=TopicCommands||method=modifyTopicConfig||errMsg={}||clusterDO={}||topicName={}||config={}", e.getMessage(), clusterDO, topicName,config, e); return ResultStatus.TOPIC_OPERATION_UNKNOWN_TOPIC_PARTITION; } catch (InvalidConfigurationException e) { + LOGGER.error("class=TopicCommands||method=modifyTopicConfig||errMsg={}||clusterDO={}||topicName={}||config={}", e.getMessage(), clusterDO, topicName,config, e); return ResultStatus.TOPIC_OPERATION_TOPIC_CONFIG_ILLEGAL; } catch (Throwable t) { + LOGGER.error("class=TopicCommands||method=modifyTopicConfig||errMsg={}||clusterDO={}||topicName={}||config={}", t.getMessage(), clusterDO, topicName,config, t); return ResultStatus.TOPIC_OPERATION_UNKNOWN_ERROR; } finally { if (zkUtils != null) { @@ -174,6 +197,8 @@ public class TopicCommands { true ); } catch (Throwable t) { + LOGGER.error("class=TopicCommands||method=expandTopic||errMsg={}||clusterDO={}||topicName={}||partitionNum={}||brokerIdList={}" + , t.getMessage(), clusterDO, topicName, partitionNum, JSON.toJSONString(brokerIdList), t); return ResultStatus.TOPIC_OPERATION_UNKNOWN_ERROR; } finally { if (zkUtils != null) { diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicThrottledMetricsDao.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicThrottledMetricsDao.java index 16764085..1010cc17 100644 --- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicThrottledMetricsDao.java +++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicThrottledMetricsDao.java @@ -31,4 +31,6 @@ public interface TopicThrottledMetricsDao { List getAppIdThrottle(long clusterId, String appId, Date startTime, Date endTime); List getLatestTopicThrottledMetrics(Long clusterId, Date afterTime); + + int deleteBeforeTime(Date endTime); } diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicThrottledMetricsDaoImpl.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicThrottledMetricsDaoImpl.java index 38d48d70..784bc242 100644 --- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicThrottledMetricsDaoImpl.java +++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicThrottledMetricsDaoImpl.java @@ -73,4 +73,9 @@ public class TopicThrottledMetricsDaoImpl implements TopicThrottledMetricsDao { } return new ArrayList<>(throttleMap.values()); } + + @Override + public int deleteBeforeTime(Date endTime) { + return sqlSession.delete("TopicThrottledMetricsDao.deleteBeforeTime", endTime); + } } diff --git a/kafka-manager-dao/src/main/resources/mapper/TopicThrottledMetricsDao.xml b/kafka-manager-dao/src/main/resources/mapper/TopicThrottledMetricsDao.xml index 1634223b..c5b6474d 100644 --- a/kafka-manager-dao/src/main/resources/mapper/TopicThrottledMetricsDao.xml +++ b/kafka-manager-dao/src/main/resources/mapper/TopicThrottledMetricsDao.xml @@ -54,4 +54,9 @@ AND gmt_create > #{afterTime} + + + \ No newline at end of file diff --git a/kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/impl/AccountServiceImpl.java b/kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/impl/AccountServiceImpl.java index 8a78dbe1..591e09ac 100644 --- a/kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/impl/AccountServiceImpl.java +++ b/kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/impl/AccountServiceImpl.java @@ -76,6 +76,7 @@ public class AccountServiceImpl implements AccountService { } catch (Exception e) { LOGGER.error("create account failed, operate mysql failed, accountDO:{}.", accountDO, e); } + LOGGER.warn("class=AccountServiceImpl||method=createAccount||accountDO={}||msg=add account fail,{}!", accountDO,ResultStatus.MYSQL_ERROR.getMessage()); return ResultStatus.MYSQL_ERROR; } @@ -88,6 +89,7 @@ public class AccountServiceImpl implements AccountService { } catch (Exception e) { LOGGER.error("delete account failed, username:{}.", username, e); } + LOGGER.warn("class=AccountServiceImpl||method=deleteByName||username={}||msg=delete account fail,{}!", username,ResultStatus.MYSQL_ERROR.getMessage()); return ResultStatus.MYSQL_ERROR; } @@ -110,6 +112,7 @@ public class AccountServiceImpl implements AccountService { } catch (Exception e) { LOGGER.error("update account failed, accountDO:{}.", accountDO, e); } + LOGGER.warn("class=AccountServiceImpl||method=updateAccount||accountDO={}||msg=update account fail,{}!", accountDO,ResultStatus.MYSQL_ERROR.getMessage()); return ResultStatus.MYSQL_ERROR; } diff --git a/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/common/OrderTypeEnum.java b/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/common/OrderTypeEnum.java index a2363652..6bd30997 100644 --- a/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/common/OrderTypeEnum.java +++ b/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/common/OrderTypeEnum.java @@ -6,23 +6,24 @@ package com.xiaojukeji.kafka.manager.bpm.common; * @date 19/6/23 */ public enum OrderTypeEnum { - APPLY_TOPIC (00, "Topic申请", "applyTopicOrder"), - DELETE_TOPIC (10, "Topic下线", "deleteTopicOrder"), + APPLY_TOPIC (00, "Topic申请", "applyTopicOrder"), + DELETE_TOPIC (10, "Topic下线", "deleteTopicOrder"), + THIRD_PART_DELETE_TOPIC (20, "第三方Topic下线申请", "thirdPartDeleteTopicOrder"), - APPLY_APP (01, "应用申请", "applyAppOrder"), - DELETE_APP (11, "应用下线", "deleteAppOrder"), + APPLY_APP (01, "应用申请", "applyAppOrder"), + DELETE_APP (11, "应用下线", "deleteAppOrder"), - APPLY_QUOTA (02, "配额申请", "applyQuotaOrder"), - APPLY_PARTITION (12, "分区申请", "applyPartitionOrder"), + APPLY_QUOTA (02, "配额申请", "applyQuotaOrder"), + APPLY_PARTITION (12, "分区申请", "applyPartitionOrder"), - APPLY_AUTHORITY (03, "权限申请", "applyAuthorityOrder"), - DELETE_AUTHORITY (13, "权限删除", "deleteAuthorityOrder"), + APPLY_AUTHORITY (03, "权限申请", "applyAuthorityOrder"), + DELETE_AUTHORITY (13, "权限删除", "deleteAuthorityOrder"), - APPLY_CLUSTER (04, "集群申请", "applyClusterOrder"), - DELETE_CLUSTER (14, "集群下线", "deleteClusterOrder"), + APPLY_CLUSTER (04, "集群申请", "applyClusterOrder"), + DELETE_CLUSTER (14, "集群下线", "deleteClusterOrder"), - APPLY_EXPAND_CLUSTER(05, "集群扩容", "modifyClusterOrder"), - APPLY_REDUCE_CLUSTER(15, "集群缩容", "modifyClusterOrder"), + APPLY_EXPAND_CLUSTER (05, "集群扩容", "modifyClusterOrder"), + APPLY_REDUCE_CLUSTER (15, "集群缩容", "modifyClusterOrder"), ; diff --git a/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/common/entry/apply/OrderExtensionThirdPartDeleteTopicDTO.java b/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/common/entry/apply/OrderExtensionThirdPartDeleteTopicDTO.java new file mode 100644 index 00000000..28e03952 --- /dev/null +++ b/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/common/entry/apply/OrderExtensionThirdPartDeleteTopicDTO.java @@ -0,0 +1,69 @@ +package com.xiaojukeji.kafka.manager.bpm.common.entry.apply; + +import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; + +/** + * @author zengqiao + * @date 20/12/2 + */ +public class OrderExtensionThirdPartDeleteTopicDTO { + private Long clusterId; + + private String topicName; + + private String appId; + + private String password; + + public Long getClusterId() { + return clusterId; + } + + public void setClusterId(Long clusterId) { + this.clusterId = clusterId; + } + + public String getTopicName() { + return topicName; + } + + public void setTopicName(String topicName) { + this.topicName = topicName; + } + + public String getAppId() { + return appId; + } + + public void setAppId(String appId) { + this.appId = appId; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + @Override + public String toString() { + return "OrderExtensionThirdPartDeleteTopicDTO{" + + "clusterId=" + clusterId + + ", topicName='" + topicName + '\'' + + ", appId='" + appId + '\'' + + ", password='" + password + '\'' + + '}'; + } + + public boolean paramLegal() { + if (ValidateUtils.isNull(clusterId) + || ValidateUtils.isBlank(topicName) + || ValidateUtils.isBlank(appId) + || ValidateUtils.isBlank(password)) { + return false; + } + return true; + } +} \ No newline at end of file diff --git a/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/component/LocalStorageService.java b/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/component/LocalStorageService.java index 4b010dc9..0a308a41 100644 --- a/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/component/LocalStorageService.java +++ b/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/component/LocalStorageService.java @@ -3,11 +3,8 @@ package com.xiaojukeji.kafka.manager.bpm.component; import com.xiaojukeji.kafka.manager.bpm.common.OrderStatusEnum; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.pojo.OrderDO; -import com.xiaojukeji.kafka.manager.common.events.OrderApplyEvent; -import com.xiaojukeji.kafka.manager.common.utils.SpringTool; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.dao.OrderDao; -import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -27,17 +24,12 @@ public class LocalStorageService extends AbstractOrderStorageService { @Autowired private OrderDao orderDao; - @Autowired - private ConfigUtils configUtils; - @Override public ResultStatus directSaveHandledOrder(OrderDO orderDO) { try { if (orderDao.directSaveHandledOrder(orderDO) <= 0) { return ResultStatus.MYSQL_ERROR; } -// 无需进行通知 -// SpringTool.publish(new OrderApplyEvent(this, orderDO, configUtils.getIdc())); return ResultStatus.SUCCESS; } catch (Exception e) { LOGGER.error("add order failed, orderDO:{}.", orderDO, e); @@ -52,7 +44,6 @@ public class LocalStorageService extends AbstractOrderStorageService { return false; } - SpringTool.publish(new OrderApplyEvent(this, orderDO, configUtils.getIdc())); return true; } catch (Exception e) { LOGGER.error("add order failed, orderDO:{}.", orderDO, e); diff --git a/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/impl/OrderServiceImpl.java b/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/impl/OrderServiceImpl.java index f95273e0..71c05163 100644 --- a/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/impl/OrderServiceImpl.java +++ b/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/impl/OrderServiceImpl.java @@ -261,6 +261,14 @@ public class OrderServiceImpl implements OrderService { resultList.add(new OrderResult(id, Result.buildFrom(ResultStatus.ORDER_NOT_EXIST))); continue; } + // topic申请、topic分区申请不支持批量审批通过. + if (orderDO.getType().equals(OrderTypeEnum.APPLY_TOPIC.getCode()) + || orderDO.getType().equals(OrderTypeEnum.APPLY_PARTITION.getCode())) { + if (OrderStatusEnum.PASSED.getCode().equals(reqObj.getStatus())) { + continue; + } + } + orderDOList.add(orderDO); } // 根据创建时间排序 diff --git a/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/order/impl/ThirdPartDeleteTopicOrder.java b/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/order/impl/ThirdPartDeleteTopicOrder.java new file mode 100644 index 00000000..ec98ced7 --- /dev/null +++ b/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/order/impl/ThirdPartDeleteTopicOrder.java @@ -0,0 +1,164 @@ +package com.xiaojukeji.kafka.manager.bpm.order.impl; + +import com.alibaba.fastjson.JSONObject; +import com.xiaojukeji.kafka.manager.bpm.common.OrderTypeEnum; +import com.xiaojukeji.kafka.manager.bpm.common.entry.apply.OrderExtensionThirdPartDeleteTopicDTO; +import com.xiaojukeji.kafka.manager.bpm.common.entry.detail.AbstractOrderDetailData; +import com.xiaojukeji.kafka.manager.bpm.common.entry.detail.OrderDetailDeleteTopicDTO; +import com.xiaojukeji.kafka.manager.bpm.common.handle.OrderHandleBaseDTO; +import com.xiaojukeji.kafka.manager.bpm.order.AbstractTopicOrder; +import com.xiaojukeji.kafka.manager.common.constant.Constant; +import com.xiaojukeji.kafka.manager.common.entity.Result; +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicConnection; +import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; +import com.xiaojukeji.kafka.manager.common.entity.pojo.OrderDO; +import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO; +import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AppDO; +import com.xiaojukeji.kafka.manager.common.utils.ListUtils; +import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; +import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager; +import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; +import com.xiaojukeji.kafka.manager.service.service.AdminService; +import com.xiaojukeji.kafka.manager.service.service.ClusterService; +import com.xiaojukeji.kafka.manager.service.service.TopicManagerService; +import com.xiaojukeji.kafka.manager.service.service.gateway.AppService; +import com.xiaojukeji.kafka.manager.service.service.gateway.TopicConnectionService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.Date; +import java.util.List; + +/** + * @author zengqiao + * @date 20/12/2 + */ +@Component("thirdPartDeleteTopicOrder") +public class ThirdPartDeleteTopicOrder extends AbstractTopicOrder { + @Autowired + private LogicalClusterMetadataManager logicalClusterMetadataManager; + + @Autowired + private AppService appService; + + @Autowired + private ClusterService clusterService; + + @Autowired + private AdminService adminService; + + @Autowired + private TopicManagerService topicManagerService; + + @Autowired + private TopicConnectionService connectionService; + + @Override + public AbstractOrderDetailData getOrderExtensionDetailData(String extensions) { + OrderDetailDeleteTopicDTO orderDetailDTO = new OrderDetailDeleteTopicDTO(); + OrderExtensionThirdPartDeleteTopicDTO orderExtensionDTO = JSONObject.parseObject( + extensions, + OrderExtensionThirdPartDeleteTopicDTO.class); + orderDetailDTO.setTopicName(orderExtensionDTO.getTopicName()); + ClusterDO clusterDO = clusterService.getById(orderExtensionDTO.getClusterId()); + if (!ValidateUtils.isNull(clusterDO)) { + orderDetailDTO.setPhysicalClusterId(clusterDO.getId()); + orderDetailDTO.setPhysicalClusterName(clusterDO.getClusterName()); + } + + List connectionDTOList = connectionService.getByTopicName( + clusterDO.getId(), + orderExtensionDTO.getTopicName(), + new Date(System.currentTimeMillis() - Constant.TOPIC_CONNECTION_LATEST_TIME_MS), + new Date()); + orderDetailDTO.setConnectionList(connectionDTOList); + + TopicDO topicDO = topicManagerService.getByTopicName(clusterDO.getId(), orderExtensionDTO.getTopicName()); + if (ValidateUtils.isNull(topicDO)) { + return orderDetailDTO; + } + + AppDO appDO = appService.getByAppId(topicDO.getAppId()); + if (ValidateUtils.isNull(appDO)) { + return orderDetailDTO; + } + orderDetailDTO.setAppId(appDO.getAppId()); + orderDetailDTO.setAppName(appDO.getName()); + orderDetailDTO.setAppPrincipals(appDO.getPrincipals()); + return orderDetailDTO; + } + + @Override + public Result checkExtensionFieldsAndGenerateTitle(String extensions) { + OrderExtensionThirdPartDeleteTopicDTO orderExtensionDTO = JSONObject.parseObject( + extensions, + OrderExtensionThirdPartDeleteTopicDTO.class); + if (!orderExtensionDTO.paramLegal()) { + return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); + } + Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(orderExtensionDTO.getClusterId(), true); + if (ValidateUtils.isNull(physicalClusterId)) { + return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST); + } + if (!PhysicalClusterMetadataManager.isTopicExist(physicalClusterId, orderExtensionDTO.getTopicName())) { + return Result.buildFrom(ResultStatus.TOPIC_NOT_EXIST); + } + AppDO appDO = appService.getByAppId(orderExtensionDTO.getAppId()); + if (ValidateUtils.isNull(appDO)) { + return Result.buildFrom(ResultStatus.APP_NOT_EXIST); + } + if (!appDO.getPassword().equals(orderExtensionDTO.getPassword())) { + return Result.buildFrom(ResultStatus.USER_WITHOUT_AUTHORITY); + } + + String title = String.format( + "%s-%d-%s", + OrderTypeEnum.DELETE_TOPIC.getMessage(), + orderExtensionDTO.getClusterId(), + orderExtensionDTO.getTopicName() + ); + return new Result<>(title); + } + + @Override + public ResultStatus handleOrderDetail(OrderDO orderDO, + OrderHandleBaseDTO orderHandleBaseDTO, + String userName) { + OrderExtensionThirdPartDeleteTopicDTO extensionDTO = JSONObject.parseObject(orderDO.getExtensions(), + OrderExtensionThirdPartDeleteTopicDTO.class); + Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(extensionDTO.getClusterId(), true); + if (ValidateUtils.isNull(physicalClusterId)) { + return ResultStatus.CLUSTER_NOT_EXIST; + } + ClusterDO clusterDO = clusterService.getById(physicalClusterId); + if (!PhysicalClusterMetadataManager.isTopicExistStrictly(physicalClusterId, extensionDTO.getTopicName())) { + return ResultStatus.TOPIC_NOT_EXIST; + } + if (connectionService.isExistConnection( + physicalClusterId, + extensionDTO.getTopicName(), + new Date(System.currentTimeMillis() - Constant.TOPIC_CONNECTION_LATEST_TIME_MS), + new Date()) + ) { + return ResultStatus.OPERATION_FORBIDDEN; + } + + // 检查申请人是否在应用负责人里面 + AppDO appDO = appService.getByAppId(extensionDTO.getAppId()); + if (ValidateUtils.isNull(appDO)) { + return ResultStatus.APP_NOT_EXIST; + } + if (!appDO.getPassword().equals(extensionDTO.getPassword()) + || !ListUtils.string2StrList(appDO.getPrincipals()).contains(orderDO.getApplicant())) { + // 密码错误 or 申请人不在应用负责人里面, 则返回错误 + return ResultStatus.USER_WITHOUT_AUTHORITY; + } + + ResultStatus resultStatus = adminService.deleteTopic(clusterDO, extensionDTO.getTopicName(), userName); + if (!ResultStatus.SUCCESS.equals(resultStatus)) { + return resultStatus; + } + return resultStatus; + } +} \ No newline at end of file diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/N9e.java b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/N9e.java index 90e347e0..f1f4b586 100644 --- a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/N9e.java +++ b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/N9e.java @@ -94,6 +94,7 @@ public class N9e extends AbstractAgent { ); N9eResult zr = JSON.parseObject(response, N9eResult.class); if (!ValidateUtils.isBlank(zr.getErr())) { + LOGGER.warn("class=N9e||method=createTask||param={}||errMsg={}||msg=call create task fail", JsonUtils.toJSONString(param),zr.getErr()); return null; } return Long.valueOf(zr.getDat().toString()); @@ -110,7 +111,7 @@ public class N9e extends AbstractAgent { String response = null; try { - response = HttpUtils.postForString( + response = HttpUtils.putForString( baseUrl + ACTION_TASK_URI.replace("{taskId}", taskId.toString()), JSON.toJSONString(param), buildHeader() @@ -119,6 +120,7 @@ public class N9e extends AbstractAgent { if (ValidateUtils.isBlank(zr.getErr())) { return true; } + LOGGER.warn("class=N9e||method=actionTask||param={}||errMsg={}||msg=call action task fail", JSON.toJSONString(param),zr.getErr()); return false; } catch (Exception e) { LOGGER.error("action task failed, taskId:{}, action:{}.", taskId, action, e); @@ -134,7 +136,7 @@ public class N9e extends AbstractAgent { String response = null; try { - response = HttpUtils.postForString( + response = HttpUtils.putForString( baseUrl + ACTION_HOST_TASK_URI.replace("{taskId}", taskId.toString()), JSON.toJSONString(param), buildHeader() @@ -143,6 +145,7 @@ public class N9e extends AbstractAgent { if (ValidateUtils.isBlank(zr.getErr())) { return true; } + LOGGER.warn("class=N9e||method=actionHostTask||param={}||errMsg={}||msg=call action host task fail", JSON.toJSONString(param),zr.getErr()); return false; } catch (Exception e) { LOGGER.error("action task failed, taskId:{} action:{} hostname:{}.", taskId, action, hostname, e); @@ -265,6 +268,7 @@ public class N9e extends AbstractAgent { while ((line = bufferedReader.readLine()) != null) { stringBuilder.append(line); + stringBuilder.append("\n"); } return stringBuilder.toString(); } catch (IOException e) { diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/storage/local/Local.java b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/storage/local/Local.java index 66d5fe04..40841de4 100644 --- a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/storage/local/Local.java +++ b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/storage/local/Local.java @@ -2,6 +2,7 @@ package com.xiaojukeji.kafka.manager.kcm.component.storage.local; import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import com.xiaojukeji.kafka.manager.kcm.component.storage.AbstractStorageService; import org.springframework.web.multipart.MultipartFile; @@ -12,6 +13,9 @@ import org.springframework.web.multipart.MultipartFile; */ @Service("storageService") public class Local extends AbstractStorageService { + @Value("${kcm.storage.base-url}") + private String baseUrl; + @Override public boolean upload(String fileName, String fileMd5, MultipartFile uploadFile) { return false; @@ -24,6 +28,6 @@ public class Local extends AbstractStorageService { @Override public String getDownloadBaseUrl() { - return ""; + return baseUrl; } } \ No newline at end of file diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/impl/KafkaFileServiceImpl.java b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/impl/KafkaFileServiceImpl.java index 037b2f79..f97510fd 100644 --- a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/impl/KafkaFileServiceImpl.java +++ b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/impl/KafkaFileServiceImpl.java @@ -56,6 +56,7 @@ public class KafkaFileServiceImpl implements KafkaFileService { } return ResultStatus.SUCCESS; } catch (DuplicateKeyException e) { + LOGGER.error("class=KafkaFileServiceImpl||method=uploadKafkaFile||errMsg={}||kafkaFileDTO={}||username={}", e.getMessage(), kafkaFileDTO, username, e); return ResultStatus.RESOURCE_ALREADY_EXISTED; } catch (Exception e) { LOGGER.error("upload kafka file failed, kafkaFileDTO:{}.", kafkaFileDTO, e); @@ -93,6 +94,7 @@ public class KafkaFileServiceImpl implements KafkaFileService { return ResultStatus.MYSQL_ERROR; } } catch (DuplicateKeyException e) { + LOGGER.error("class=KafkaFileServiceImpl||method=modifyKafkaFile||errMsg={}||kafkaFileDTO={}||userName={}", e.getMessage(), kafkaFileDTO, userName, e); return ResultStatus.RESOURCE_NAME_DUPLICATED; } catch (Exception e) { LOGGER.error("modify kafka file failed, kafkaFileDTO:{}.", kafkaFileDTO, e); diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/tasks/ClusterHostTaskService.java b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/tasks/ClusterHostTaskService.java index bea4b37a..f9999768 100644 --- a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/tasks/ClusterHostTaskService.java +++ b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/tasks/ClusterHostTaskService.java @@ -24,7 +24,7 @@ public class ClusterHostTaskService extends AbstractClusterTaskService { CreationTaskData dto = new CreationTaskData(); for (String hostname: clusterHostTaskDTO.getHostList()) { if (!NetUtils.hostnameLegal(hostname)) { - return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); + return Result.buildFrom(ResultStatus.CLUSTER_TASK_HOST_LIST_ILLEGAL); } } dto.setHostList(clusterHostTaskDTO.getHostList()); diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/resources/kcm_script.sh b/kafka-manager-extends/kafka-manager-kcm/src/main/resources/kcm_script.sh index e369bee8..ffd54a20 100644 --- a/kafka-manager-extends/kafka-manager-kcm/src/main/resources/kcm_script.sh +++ b/kafka-manager-extends/kafka-manager-kcm/src/main/resources/kcm_script.sh @@ -19,12 +19,13 @@ p_kafka_server_properties_md5=${8} #server配置MD5 p_kafka_server_properties_url=${9} #server配置文件下载地址 #----------------------------------------配置信息------------------------------------------------------# -g_hostname=`hostname` -g_base_dir='/home/km' +g_base_dir='/home' g_cluster_task_dir=${g_base_dir}"/kafka_cluster_task/task_${p_task_id}" #部署升级路径 g_rollback_version=${g_cluster_task_dir}"/rollback_version" #回滚版本 g_new_kafka_package_name='' #最终的包名 g_kafka_manager_addr='' #kafka-manager地址 +g_local_ip=`ifconfig -a|grep inet|grep -v 127.0.0.1|grep -v inet6|awk '{print $2}'|tr -d "addr:"` +g_hostname=${g_local_ip} #----------------------------------------操作函数------------------------------------------------------# @@ -71,11 +72,11 @@ function check_and_init_env() { # 检查并等待集群所有的副本处于同步的状态 function check_and_wait_broker_stabled() { - under_replication_count=`curl -s -G -d "hostname="#{g_hostname} ${g_kafka_manager_addr}/api/v1/third-part/${p_cluster_id}/broker-stabled | python -m json.tool | grep true |wc -l` + under_replication_count=`curl -s -G -d "hostname="${g_hostname} ${g_kafka_manager_addr}/api/v1/third-part/${p_cluster_id}/broker-stabled | python -m json.tool | grep true |wc -l` while [ "$under_replication_count" -ne 1 ]; do ECHO_LOG "存在${under_replication_count}个副本未同步, sleep 10s" sleep 10 - under_replication_count=`curl -s ${g_kafka_manager_addr}/api/v1/${p_cluster_id}/overview | python -m json.tool | grep false |wc -l` + under_replication_count=`curl -s -G -d "hostname="${g_hostname} ${g_kafka_manager_addr}/api/v1/third-part/${p_cluster_id}/broker-stabled | python -m json.tool | grep true |wc -l` done ECHO_LOG "集群副本都已经处于同步的状态, 可以进行集群升级" } @@ -137,6 +138,9 @@ function prepare_cluster_task_files() { exit 1 fi + # listeners配置,换成当前机器的IP,写到server.properties最后一行 + echo "listeners=SASL_PLAINTEXT://${g_local_ip}:9093,PLAINTEXT://${g_local_ip}:9092" >> "${g_cluster_task_dir}/${p_kafka_package_name}/config/server.properties" + # 将MD5信息写到包中 echo "package_md5:${p_kafka_package_md5} server_properties_md5:${p_kafka_package_md5}" > "${g_cluster_task_dir}/${p_kafka_package_name}/package_and_properties.md5" } diff --git a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/impl/ThirdPartServiceImpl.java b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/impl/ThirdPartServiceImpl.java index 10e0bfdf..d350af3a 100644 --- a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/impl/ThirdPartServiceImpl.java +++ b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/impl/ThirdPartServiceImpl.java @@ -132,14 +132,12 @@ public class ThirdPartServiceImpl implements ThirdPartService { if (ValidateUtils.isNull(dto)) { return null; } - List offsetDTOList = dto.getPartitionOffsetDTOList(); - if (ValidateUtils.isEmptyList(offsetDTOList)) { - offsetDTOList = topicService.getPartitionOffsetList( - clusterDO, dto.getTopicName(), dto.getTimestamp()); - } + + List offsetDTOList = this.getPartitionOffsetDTOList(clusterDO, dto); if (ValidateUtils.isEmptyList(offsetDTOList)) { return null; } + OffsetLocationEnum offsetLocation = dto.getLocation().equals( OffsetLocationEnum.ZOOKEEPER.location) ? OffsetLocationEnum.ZOOKEEPER : OffsetLocationEnum.BROKER; ResultStatus result = checkConsumerGroupExist(clusterDO, dto.getTopicName(), dto.getConsumerGroup(), offsetLocation, dto.getCreateIfAbsent()); @@ -160,6 +158,39 @@ public class ThirdPartServiceImpl implements ThirdPartService { ); } + private List getPartitionOffsetDTOList(ClusterDO clusterDO, OffsetResetDTO dto) { + List offsetDTOList = dto.getPartitionOffsetDTOList(); + if (!ValidateUtils.isEmptyList(offsetDTOList)) { + return offsetDTOList; + } + + offsetDTOList = topicService.getPartitionOffsetList(clusterDO, dto.getTopicName(), dto.getTimestamp()); + if (!ValidateUtils.isEmptyList(offsetDTOList)) { + return offsetDTOList; + } + + Map endOffsetMap = topicService.getPartitionOffset(clusterDO, dto.getTopicName(), OffsetPosEnum.END); + if (ValidateUtils.isEmptyMap(endOffsetMap)) { + return new ArrayList<>(); + } + + Map beginOffsetMap = topicService.getPartitionOffset(clusterDO, dto.getTopicName(), OffsetPosEnum.BEGINNING); + if (ValidateUtils.isEmptyMap(beginOffsetMap)) { + return new ArrayList<>(); + } + + offsetDTOList = new ArrayList<>(); + for (Map.Entry entry: endOffsetMap.entrySet()) { + Long beginOffset = beginOffsetMap.get(entry.getKey()); + if (ValidateUtils.isNull(beginOffset) || !beginOffset.equals(entry.getValue())) { + // offset 不相等, 表示还有数据, 则直接返回 + return new ArrayList<>(); + } + offsetDTOList.add(new PartitionOffsetDTO(entry.getKey().partition(), entry.getValue())); + } + return offsetDTOList; + } + private ResultStatus checkConsumerGroupExist(ClusterDO clusterDO, String topicName, String consumerGroup, diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreCommunityTopicMetrics.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCommunityTopicMetrics.java similarity index 86% rename from kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreCommunityTopicMetrics.java rename to kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCommunityTopicMetrics.java index 7a4a3f96..07a137e6 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreCommunityTopicMetrics.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCommunityTopicMetrics.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.kafka.manager.task.dispatch.metrics.store; +package com.xiaojukeji.kafka.manager.task.dispatch.metrics.collect; import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections; import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics; @@ -16,12 +16,12 @@ import org.springframework.beans.factory.annotation.Autowired; import java.util.*; /** - * Topic社区指标存储 + * Topic社区指标收集 * @author zengqiao * @date 20/7/21 */ -@CustomScheduled(name = "storeCommunityTopicMetrics", cron = "31 0/1 * * * ?", threadNum = 5) -public class StoreCommunityTopicMetrics extends AbstractScheduledTask { +@CustomScheduled(name = "collectAndPublishCommunityTopicMetrics", cron = "31 0/1 * * * ?", threadNum = 5) +public class CollectAndPublishCommunityTopicMetrics extends AbstractScheduledTask { @Autowired private JmxService jmxService; diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/delete/DeleteMetrics.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/delete/DeleteMetrics.java index 6a37edd2..b8632971 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/delete/DeleteMetrics.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/delete/DeleteMetrics.java @@ -1,5 +1,6 @@ package com.xiaojukeji.kafka.manager.task.dispatch.metrics.delete; +import com.xiaojukeji.kafka.manager.common.constant.Constant; import com.xiaojukeji.kafka.manager.common.constant.LogConstant; import com.xiaojukeji.kafka.manager.dao.*; import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils; @@ -19,27 +20,30 @@ import java.util.List; * @author zengqiao * @date 20/1/8 */ -@CustomScheduled(name = "deleteMetrics", cron = "0 0/1 * * * ?", threadNum = 1) +@CustomScheduled(name = "deleteMetrics", cron = "0 0/2 * * * ?", threadNum = 1) public class DeleteMetrics extends AbstractScheduledTask { private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER); @Autowired - private TopicMetricsDao topicMetricsDao; + private ConfigUtils configUtils; @Autowired - private TopicAppMetricsDao topicAppMetricsDao; + private TopicMetricsDao topicMetricsDao; @Autowired - private TopicRequestMetricsDao topicRequestMetricsDao; + private TopicAppMetricsDao topicAppMetricsDao; @Autowired - private BrokerMetricsDao brokerMetricsDao; + private TopicRequestMetricsDao topicRequestMetricsDao; @Autowired - private ClusterMetricsDao clusterMetricsDao; + private BrokerMetricsDao brokerMetricsDao; @Autowired - private ConfigUtils configUtils; + private ClusterMetricsDao clusterMetricsDao; + + @Autowired + private TopicThrottledMetricsDao topicThrottledMetricsDao; @Override public List listAllTasks() { @@ -50,8 +54,8 @@ public class DeleteMetrics extends AbstractScheduledTask { @Override public void processTask(EmptyEntry entryEntry) { - if (!"dev".equals(configUtils.getKafkaManagerEnv())) { - // 非预发&线上环境直接跳过 + if (Constant.INVALID_CODE.equals(configUtils.getMaxMetricsSaveDays())) { + // 无需数据删除 return; } @@ -75,6 +79,12 @@ public class DeleteMetrics extends AbstractScheduledTask { LOGGER.error("delete topic request metrics failed.", e); } + try { + deleteThrottledMetrics(); + } catch (Exception e) { + LOGGER.error("delete topic throttled metrics failed.", e); + } + try { deleteBrokerMetrics(); } catch (Exception e) { @@ -90,27 +100,32 @@ public class DeleteMetrics extends AbstractScheduledTask { } private void deleteTopicMetrics() { - Date endTime = new Date(System.currentTimeMillis() - 3 * 24 * 60 * 60 * 1000); + Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000); topicMetricsDao.deleteBeforeTime(endTime); } private void deleteTopicAppMetrics() { - Date endTime = new Date(System.currentTimeMillis() - 3 * 24 * 60 * 60 * 1000); + Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000); topicAppMetricsDao.deleteBeforeTime(endTime); } private void deleteTopicRequestMetrics() { - Date endTime = new Date(System.currentTimeMillis() - 3 * 24 * 60 * 60 * 1000); + Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000); topicRequestMetricsDao.deleteBeforeTime(endTime); } + private void deleteThrottledMetrics() { + Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000); + topicThrottledMetricsDao.deleteBeforeTime(endTime); + } + private void deleteBrokerMetrics() { - Date endTime = new Date(System.currentTimeMillis() - 7 * 24 * 60 * 60 * 1000); + Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000); brokerMetricsDao.deleteBeforeTime(endTime); } private void deleteClusterMetrics() { - Date endTime = new Date(System.currentTimeMillis() - 7 * 24 * 60 * 60 * 1000); + Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000); clusterMetricsDao.deleteBeforeTime(endTime); } } \ No newline at end of file diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreBrokerMetrics.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreBrokerMetrics.java index 6ecdb0a4..50f5f633 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreBrokerMetrics.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreBrokerMetrics.java @@ -23,6 +23,7 @@ import com.xiaojukeji.kafka.manager.task.component.CustomScheduled; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import java.util.ArrayList; import java.util.List; @@ -34,6 +35,7 @@ import java.util.Map; * @date 20/5/7 */ @CustomScheduled(name = "storeBrokerMetrics", cron = "21 0/1 * * * ?", threadNum = 2) +@ConditionalOnProperty(prefix = "custom.store-metrics-task.community", name = "broker-metrics-enabled", havingValue = "true", matchIfMissing = true) public class StoreBrokerMetrics extends AbstractScheduledTask { private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER); diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreDiDiAppTopicMetrics.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreDiDiAppTopicMetrics.java index bf7681b9..ede6525d 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreDiDiAppTopicMetrics.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreDiDiAppTopicMetrics.java @@ -16,6 +16,7 @@ import com.xiaojukeji.kafka.manager.task.component.CustomScheduled; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import java.util.*; @@ -25,6 +26,7 @@ import java.util.*; * @date 20/7/21 */ @CustomScheduled(name = "storeDiDiAppTopicMetrics", cron = "41 0/1 * * * ?", threadNum = 5) +@ConditionalOnProperty(prefix = "custom.store-metrics-task.didi", name = "app-topic-metrics-enabled", havingValue = "true", matchIfMissing = true) public class StoreDiDiAppTopicMetrics extends AbstractScheduledTask { private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER); diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreDiDiTopicRequestTimeMetrics.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreDiDiTopicRequestTimeMetrics.java index 4644d148..c4caa229 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreDiDiTopicRequestTimeMetrics.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreDiDiTopicRequestTimeMetrics.java @@ -16,6 +16,7 @@ import com.xiaojukeji.kafka.manager.task.component.CustomScheduled; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import java.util.*; @@ -24,6 +25,7 @@ import java.util.*; * @date 20/7/21 */ @CustomScheduled(name = "storeDiDiTopicRequestTimeMetrics", cron = "51 0/1 * * * ?", threadNum = 5) +@ConditionalOnProperty(prefix = "custom.store-metrics-task.didi", name = "topic-request-time-metrics-enabled", havingValue = "true", matchIfMissing = true) public class StoreDiDiTopicRequestTimeMetrics extends AbstractScheduledTask { private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER); diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/op/AutoHandleTopicOrder.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/op/AutoHandleTopicOrder.java index 66308891..5db8a679 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/op/AutoHandleTopicOrder.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/op/AutoHandleTopicOrder.java @@ -69,6 +69,7 @@ public class AutoHandleTopicOrder extends AbstractScheduledTask { return ; } + Integer maxPassedOrderNumPerTask = configService.getAutoPassedTopicApplyOrderNumPerTask(); for (OrderDO orderDO: doList) { if (!OrderTypeEnum.APPLY_TOPIC.getCode().equals(orderDO.getType())) { continue; @@ -77,7 +78,11 @@ public class AutoHandleTopicOrder extends AbstractScheduledTask { if (!handleApplyTopicOrder(orderDO)) { continue; } - return; + maxPassedOrderNumPerTask -= 1; + if (maxPassedOrderNumPerTask <= 0) { + return; + } + LOGGER.info("class=AutoHandleTopicOrder||method=processTask||msg=passed id:{}", orderDO.getId()); } catch (Exception e) { LOGGER.error("handle apply topic order failed, orderDO:{}.", orderDO, e); } diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/SinkCommunityTopicMetrics2Monitor.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkCommunityTopicMetrics2Monitor.java similarity index 98% rename from kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/SinkCommunityTopicMetrics2Monitor.java rename to kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkCommunityTopicMetrics2Monitor.java index 2256cc70..e8df775b 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/SinkCommunityTopicMetrics2Monitor.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkCommunityTopicMetrics2Monitor.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.kafka.manager.task.dispatch.metrics.store; +package com.xiaojukeji.kafka.manager.task.listener; import com.xiaojukeji.kafka.manager.monitor.common.entry.bizenum.MonitorMetricNameEnum; import com.xiaojukeji.kafka.manager.common.constant.LogConstant; diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/StoreCommunityTopicMetrics2DB.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/StoreCommunityTopicMetrics2DB.java index f4abc5fd..0c0714f7 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/StoreCommunityTopicMetrics2DB.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/StoreCommunityTopicMetrics2DB.java @@ -11,6 +11,7 @@ import com.xiaojukeji.kafka.manager.service.utils.MetricsConvertUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; @@ -22,6 +23,7 @@ import java.util.List; * @date 20/9/1 */ @Component("storeCommunityTopicMetrics2DB") +@ConditionalOnProperty(prefix = "custom.store-metrics-task.community", name = "topic-metrics-enabled", havingValue = "true", matchIfMissing = true) public class StoreCommunityTopicMetrics2DB implements ApplicationListener { private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER); diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/StoreTopicThrottledMetrics2DB.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/StoreTopicThrottledMetrics2DB.java index e89c5abb..4e34e732 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/StoreTopicThrottledMetrics2DB.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/StoreTopicThrottledMetrics2DB.java @@ -11,6 +11,7 @@ import com.xiaojukeji.kafka.manager.task.common.TopicThrottledMetricsCollectedEv import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; @@ -21,6 +22,7 @@ import java.util.*; * @date 20/9/24 */ @Component("storeTopicThrottledMetrics2DB") +@ConditionalOnProperty(prefix = "custom.store-metrics-task.didi", name = "topic-throttled-metrics", havingValue = "true", matchIfMissing = true) public class StoreTopicThrottledMetrics2DB implements ApplicationListener { private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER); diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushClusterMetadata.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushClusterMetadata.java index b2ac569c..e2c63e06 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushClusterMetadata.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushClusterMetadata.java @@ -31,9 +31,6 @@ public class FlushClusterMetadata { Set oldClusterIdSet = physicalClusterMetadataManager.getClusterIdSet(); for (ClusterDO clusterDO: doList) { newClusterIdSet.add(clusterDO.getId()); - if (oldClusterIdSet.contains(clusterDO.getId())) { - continue; - } // 添加集群 physicalClusterMetadataManager.addNew(clusterDO); diff --git a/kafka-manager-web/assembly.xml b/kafka-manager-web/assembly.xml new file mode 100644 index 00000000..8d2eeed6 --- /dev/null +++ b/kafka-manager-web/assembly.xml @@ -0,0 +1,44 @@ + + bin + + dir + tar.gz + + + + + bin/* + + 0755 + + + + ../docs/install_guide + install + + * + + + + + src/main/resources/ + conf + + application.yml + logback-spring.xml + + + + + ${project.build.directory} + libs + + *.jar + + + + + \ No newline at end of file diff --git a/kafka-manager-web/pom.xml b/kafka-manager-web/pom.xml index 17875676..9970a0bf 100644 --- a/kafka-manager-web/pom.xml +++ b/kafka-manager-web/pom.xml @@ -122,6 +122,26 @@ + + + + + + + + + + + + + + + + + + + + diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayHeartbeatController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayHeartbeatController.java index 78f5ac4a..1f86c48b 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayHeartbeatController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayHeartbeatController.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject; import com.xiaojukeji.kafka.manager.common.annotations.ApiLevel; import com.xiaojukeji.kafka.manager.common.constant.ApiLevelContent; import com.xiaojukeji.kafka.manager.common.entity.Result; +import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.TopicConnectionDO; import com.xiaojukeji.kafka.manager.common.utils.JsonUtils; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.service.service.gateway.TopicConnectionService; @@ -15,6 +16,8 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; +import java.util.List; + /** * @author zengqiao * @date 20/7/6 @@ -35,15 +38,22 @@ public class GatewayHeartbeatController { public Result receiveTopicConnections(@RequestParam("clusterId") Long clusterId, @RequestParam("brokerId") Integer brokerId, @RequestBody JSONObject jsonObject) { - try { - if (ValidateUtils.isNull(jsonObject) || jsonObject.isEmpty()) { - return Result.buildSuc(); - } - topicConnectionService.batchAdd(JsonUtils.parseTopicConnections(clusterId, jsonObject)); + if (ValidateUtils.isNull(jsonObject) || jsonObject.isEmpty()) { + LOGGER.info("class=GatewayHeartbeatController||method=receiveTopicConnections||clusterId={}||brokerId={}||msg=connections empty!", clusterId, brokerId); return Result.buildSuc(); - } catch (Exception e) { - LOGGER.error("receive topic connections failed, clusterId:{} brokerId:{} req:{}", clusterId, brokerId, jsonObject, e); } - return Result.buildFailure("fail"); + + LOGGER.info("class=GatewayHeartbeatController||method=receiveTopicConnections||clusterId={}||brokerId={}||size={}||msg=receive connections", clusterId, brokerId, jsonObject.size()); + + List doList = null; + try { + doList = JsonUtils.parseTopicConnections(clusterId, jsonObject); + } catch (Exception e) { + LOGGER.error("class=GatewayHeartbeatController||method=receiveTopicConnections||clusterId={}||brokerId={}||msg=parse data failed||exception={}", clusterId, brokerId, e.getMessage()); + return Result.buildFailure("fail"); + } + + topicConnectionService.batchAdd(doList); + return Result.buildSuc(); } } \ No newline at end of file diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayServiceDiscoveryController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayServiceDiscoveryController.java index 0d4e899b..e490368d 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayServiceDiscoveryController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayServiceDiscoveryController.java @@ -14,6 +14,8 @@ import com.xiaojukeji.kafka.manager.service.service.gateway.GatewayConfigService import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; @@ -29,6 +31,9 @@ import java.util.Map; @RestController @RequestMapping(ApiPrefix.GATEWAY_API_V1_PREFIX) public class GatewayServiceDiscoveryController { + + private final static Logger LOGGER = LoggerFactory.getLogger(GatewayHeartbeatController.class); + @Autowired private GatewayConfigService gatewayConfigService; @@ -38,6 +43,7 @@ public class GatewayServiceDiscoveryController { @ResponseBody public String getKafkaBootstrapServer(@RequestParam("clusterId") Long clusterId) { if (ValidateUtils.isNull(clusterId)) { + LOGGER.warn("class=GatewayServiceDiscoveryController||method=getKafkaBootstrapServer||msg=param clusterId is null!"); return ""; } GatewayConfigDO configDO = gatewayConfigService.getByTypeAndName( @@ -45,6 +51,7 @@ public class GatewayServiceDiscoveryController { String.valueOf(clusterId) ); if (ValidateUtils.isNull(configDO)) { + LOGGER.info("class=GatewayServiceDiscoveryController||method=getKafkaBootstrapServer||msg=configDO is null!"); return ""; } return configDO.getValue(); diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalAccountController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalAccountController.java index afca917e..91a0dbaf 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalAccountController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalAccountController.java @@ -9,8 +9,11 @@ import com.xiaojukeji.kafka.manager.common.entity.vo.common.AccountSummaryVO; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.utils.SpringTool; import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix; +import com.xiaojukeji.kafka.manager.web.api.versionone.gateway.GatewayHeartbeatController; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; @@ -25,6 +28,9 @@ import java.util.List; @RestController @RequestMapping(ApiPrefix.API_V1_NORMAL_PREFIX) public class NormalAccountController { + + private final static Logger LOGGER = LoggerFactory.getLogger(NormalAccountController.class); + @Autowired private AccountService accountService; @@ -34,6 +40,8 @@ public class NormalAccountController { public Result> searchOnJobStaffByKeyWord(@RequestParam("keyWord") String keyWord) { List staffList = accountService.searchAccountByPrefix(keyWord); if (ValidateUtils.isEmptyList(staffList)) { + LOGGER.info("class=NormalAccountController||method=searchOnJobStaffByKeyWord||keyWord={}||msg=staffList is empty!" + ,keyWord); return new Result<>(); } List voList = new ArrayList<>(); diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalAppController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalAppController.java index e0e231e8..cae8c537 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalAppController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalAppController.java @@ -1,5 +1,6 @@ package com.xiaojukeji.kafka.manager.web.api.versionone.normal; +import com.xiaojukeji.kafka.manager.account.AccountService; import com.xiaojukeji.kafka.manager.common.annotations.ApiLevel; import com.xiaojukeji.kafka.manager.common.bizenum.TopicAuthorityEnum; import com.xiaojukeji.kafka.manager.common.constant.ApiLevelContent; @@ -8,6 +9,7 @@ import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.ao.AppTopicDTO; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.AppDTO; +import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AppDO; import com.xiaojukeji.kafka.manager.common.entity.vo.normal.QuotaVO; import com.xiaojukeji.kafka.manager.common.entity.vo.normal.app.AppTopicAuthorityVO; import com.xiaojukeji.kafka.manager.common.entity.vo.normal.app.AppTopicVO; @@ -45,6 +47,9 @@ public class NormalAppController { @Autowired private AppService appService; + @Autowired + private AccountService accountService; + @Autowired private QuotaService quotaService; @@ -71,9 +76,16 @@ public class NormalAppController { @RequestMapping(value = "apps/{appId}/basic-info", method = RequestMethod.GET) @ResponseBody public Result getAppBasicInfo(@PathVariable String appId) { - return new Result<>(AppConverter.convert2AppVO( - appService.getByAppId(appId)) - ); + if (accountService.isAdminOrderHandler(SpringTool.getUserName())) { + return new Result<>(AppConverter.convert2AppVO(appService.getByAppId(appId))); + } + + AppDO appDO = appService.getAppByUserAndId(appId, SpringTool.getUserName()); + if (appDO == null) { + return Result.buildFrom(ResultStatus.USER_WITHOUT_AUTHORITY); + } + + return new Result<>(AppConverter.convert2AppVO(appDO)); } @ApiOperation(value = "App修改", notes = "") diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalConsumerController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalConsumerController.java index e43bc445..a0decf49 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalConsumerController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalConsumerController.java @@ -79,6 +79,7 @@ public class NormalConsumerController { @RequestParam("location") String location, @RequestParam(value = "isPhysicalClusterId", required = false) Boolean isPhysicalClusterId) { if (ValidateUtils.isNull(location)) { + return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); } Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(clusterId, isPhysicalClusterId); diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java index c414045f..efc0eec8 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java @@ -4,6 +4,7 @@ import com.xiaojukeji.kafka.manager.common.constant.Constant; import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections; import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicConnection; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicPartitionDTO; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO; import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics; @@ -14,6 +15,7 @@ import com.xiaojukeji.kafka.manager.common.utils.SpringTool; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; import com.xiaojukeji.kafka.manager.common.entity.pojo.KafkaBillDO; +import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxAttributeEnum; import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.service.*; @@ -134,18 +136,27 @@ public class NormalTopicController { public Result> getTopicRequestMetrics( @PathVariable Long clusterId, @PathVariable String topicName, - @RequestParam(value = "isPhysicalClusterId", required = false) Boolean isPhysicalClusterId) { + @RequestParam(value = "isPhysicalClusterId", required = false) Boolean isPhysicalClusterId, + @RequestParam(value = "percentile", required = false, defaultValue = "75thPercentile") String percentile) { Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(clusterId, isPhysicalClusterId); if (ValidateUtils.isNull(physicalClusterId)) { return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST); } + + Boolean isPercentileLegal = Arrays.stream(JmxAttributeEnum.PERCENTILE_ATTRIBUTE.getAttribute()) + .anyMatch(percentile::equals); + + if (!isPercentileLegal) { + return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); + } + BaseMetrics metrics = topicService.getTopicMetricsFromJMX( physicalClusterId, topicName, KafkaMetricsCollections.TOPIC_REQUEST_TIME_DETAIL_PAGE_METRICS, false ); - return new Result<>(TopicModelConverter.convert2TopicRequestTimeDetailVOList(metrics)); + return new Result<>(TopicModelConverter.convert2TopicRequestTimeDetailVOList(metrics, percentile)); } @ApiOperation(value = "Topic历史请求耗时信息", notes = "") @@ -184,14 +195,26 @@ public class NormalTopicController { return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST); } - return new Result<>(TopicModelConverter.convert2TopicConnectionVOList( - connectionService.getByTopicName( - physicalClusterId, - topicName, - new Date(System.currentTimeMillis() - Constant.TOPIC_CONNECTION_LATEST_TIME_MS), - new Date() - ) - )); + List connections; + + if (ValidateUtils.isBlank(appId)) { + connections = connectionService.getByTopicName( + physicalClusterId, + topicName, + new Date(System.currentTimeMillis() - Constant.TOPIC_CONNECTION_LATEST_TIME_MS), + new Date() + ); + } else { + connections = connectionService.getByTopicName( + physicalClusterId, + topicName, + appId, + new Date(System.currentTimeMillis() - Constant.TOPIC_CONNECTION_LATEST_TIME_MS), + new Date() + ); + } + + return new Result<>(TopicModelConverter.convert2TopicConnectionVOList(connections)); } @ApiOperation(value = "Topic分区信息", notes = "") diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdTopicController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdTopicController.java index db4eeb86..a8e1857d 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdTopicController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdTopicController.java @@ -70,6 +70,7 @@ public class RdTopicController { RdTopicBasicVO vo = new RdTopicBasicVO(); CopyUtils.copyProperties(vo, result.getData()); vo.setProperties(result.getData().getProperties()); + vo.setRegionNameList(result.getData().getRegionNameList()); return new Result<>(vo); } } \ No newline at end of file diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/config/RestTemplateConfig.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/config/RestTemplateConfig.java new file mode 100644 index 00000000..f4a02455 --- /dev/null +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/config/RestTemplateConfig.java @@ -0,0 +1,29 @@ +package com.xiaojukeji.kafka.manager.web.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.client.ClientHttpRequestFactory; +import org.springframework.http.client.SimpleClientHttpRequestFactory; +import org.springframework.web.client.RestTemplate; + +/** + * author: mrazkonglingxu + * Date: 2020/12/2 + * Time: 10:48 上午 + */ +@Configuration +public class RestTemplateConfig { + + @Bean + public ClientHttpRequestFactory simpleClientHttpRequestFactory() { + SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory(); + factory.setConnectTimeout(5000); + factory.setReadTimeout(5000); + return factory; + } + + @Bean + public RestTemplate restTemplate(ClientHttpRequestFactory factory) { + return new RestTemplate(factory); + } +} diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/TopicModelConverter.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/TopicModelConverter.java index 59e02a52..db9171ee 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/TopicModelConverter.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/TopicModelConverter.java @@ -40,6 +40,7 @@ public class TopicModelConverter { if (!ValidateUtils.isNull(clusterDO)) { vo.setBootstrapServers(clusterDO.getBootstrapServers()); } + vo.setRegionNameList(dto.getRegionNameList()); return vo; } @@ -107,6 +108,54 @@ public class TopicModelConverter { return Arrays.asList(produceVO, fetchVO); } + public static List convert2TopicRequestTimeDetailVOList(BaseMetrics metrics, String percentile) { + if (ValidateUtils.isNull(metrics)) { + return new ArrayList<>(); + } + TopicRequestTimeDetailVO produceVO = new TopicRequestTimeDetailVO(); + produceVO.setRequestTimeType("RequestProduceTime"); + fillTopicProduceTime(produceVO, metrics, percentile); + + TopicRequestTimeDetailVO fetchVO = new TopicRequestTimeDetailVO(); + fetchVO.setRequestTimeType("RequestFetchTime"); + fillTopicFetchTime(fetchVO, metrics, percentile); + + TopicMetrics topicMetrics = (TopicMetrics) metrics; + if (!ValidateUtils.isEmptyList(topicMetrics.getBrokerMetricsList())) { + List brokerProduceTimeList = new ArrayList<>(); + List brokerFetchTimeList = new ArrayList<>(); + topicMetrics.getBrokerMetricsList().forEach(brokerMetrics -> { + TopicBrokerRequestTimeVO topicBrokerProduceReq = new TopicBrokerRequestTimeVO(); + topicBrokerProduceReq.setClusterId(brokerMetrics.getClusterId()); + topicBrokerProduceReq.setBrokerId(brokerMetrics.getBrokerId()); + + TopicRequestTimeDetailVO brokerProduceVO = new TopicRequestTimeDetailVO(); + brokerProduceVO.setRequestTimeType("BrokerRequestProduceTime"); + fillTopicProduceTime(brokerProduceVO, brokerMetrics, percentile); + + topicBrokerProduceReq.setBrokerRequestTime(brokerProduceVO); + + TopicBrokerRequestTimeVO topicBrokerFetchReq = new TopicBrokerRequestTimeVO(); + topicBrokerFetchReq.setClusterId(brokerMetrics.getClusterId()); + topicBrokerFetchReq.setBrokerId(brokerMetrics.getBrokerId()); + + TopicRequestTimeDetailVO brokerFetchVO = new TopicRequestTimeDetailVO(); + brokerProduceVO.setRequestTimeType("BrokerRequestFetchTime"); + fillTopicFetchTime(brokerFetchVO, brokerMetrics, percentile); + + topicBrokerFetchReq.setBrokerRequestTime(brokerFetchVO); + + brokerProduceTimeList.add(topicBrokerProduceReq); + brokerFetchTimeList.add(topicBrokerFetchReq); + }); + + produceVO.setBrokerRequestTimeList(brokerProduceTimeList); + fetchVO.setBrokerRequestTimeList(brokerFetchTimeList); + } + + return Arrays.asList(produceVO, fetchVO); + } + public static List convert2TopicConnectionVOList(List connectionDTOList) { if (ValidateUtils.isNull(connectionDTOList)) { return new ArrayList<>(); @@ -224,4 +273,24 @@ public class TopicModelConverter { CopyUtils.copyProperties(topicBusinessInfoVO,topicBusinessInfo); return topicBusinessInfoVO; } + + private static void fillTopicProduceTime(TopicRequestTimeDetailVO produceVO, BaseMetrics metrics, String thPercentile) { + produceVO.setRequestQueueTimeMs(metrics.getSpecifiedMetrics("ProduceRequestQueueTimeMs" + thPercentile)); + produceVO.setResponseQueueTimeMs(metrics.getSpecifiedMetrics("ProduceResponseQueueTimeMs" + thPercentile)); + produceVO.setResponseSendTimeMs(metrics.getSpecifiedMetrics("ProduceResponseSendTimeMs" + thPercentile)); + produceVO.setLocalTimeMs(metrics.getSpecifiedMetrics("ProduceLocalTimeMs" + thPercentile)); + produceVO.setThrottleTimeMs(metrics.getSpecifiedMetrics("ProduceThrottleTimeMs" + thPercentile)); + produceVO.setRemoteTimeMs(metrics.getSpecifiedMetrics("ProduceRemoteTimeMs" + thPercentile)); + produceVO.setTotalTimeMs(metrics.getSpecifiedMetrics("ProduceTotalTimeMs" + thPercentile)); + } + + private static void fillTopicFetchTime(TopicRequestTimeDetailVO fetchVO, BaseMetrics metrics, String thPercentile) { + fetchVO.setRequestQueueTimeMs(metrics.getSpecifiedMetrics("FetchConsumerRequestQueueTimeMs" + thPercentile)); + fetchVO.setResponseQueueTimeMs(metrics.getSpecifiedMetrics("FetchConsumerResponseQueueTimeMs" + thPercentile)); + fetchVO.setResponseSendTimeMs(metrics.getSpecifiedMetrics("FetchConsumerResponseSendTimeMs" + thPercentile)); + fetchVO.setLocalTimeMs(metrics.getSpecifiedMetrics("FetchConsumerLocalTimeMs" + thPercentile)); + fetchVO.setThrottleTimeMs(metrics.getSpecifiedMetrics("FetchConsumerThrottleTimeMs" + thPercentile)); + fetchVO.setRemoteTimeMs(metrics.getSpecifiedMetrics("FetchConsumerRemoteTimeMs" + thPercentile)); + fetchVO.setTotalTimeMs(metrics.getSpecifiedMetrics("FetchConsumerTotalTimeMs" + thPercentile)); + } } diff --git a/kafka-manager-web/src/main/resources/application.yml b/kafka-manager-web/src/main/resources/application.yml index 0fb0f5a7..2e7b5159 100644 --- a/kafka-manager-web/src/main/resources/application.yml +++ b/kafka-manager-web/src/main/resources/application.yml @@ -22,8 +22,8 @@ spring: active: dev servlet: multipart: - max-file-size: 60MB - max-request-size: 60MB + max-file-size: 100MB + max-request-size: 100MB logging: config: classpath:logback-spring.xml @@ -32,24 +32,35 @@ custom: idc: cn jmx: max-conn: 10 + store-metrics-task: + community: + broker-metrics-enabled: true + topic-metrics-enabled: true + didi: + app-topic-metrics-enabled: true + topic-request-time-metrics-enabled: true + topic-throttled-metrics: true + save-days: 7 account: ldap: kcm: enabled: false + storage: + base-url: http://127.0.0.1 n9e: - base-url: http://127.0.0.1:8080 + base-url: http://127.0.0.1:8004 user-token: 12345678 timeout: 300 - account: km + account: root script-file: kcm_script.sh monitor: enabled: false n9e: nid: 2 - user-token: 123456 + user-token: 1234567890 mon: base-url: http://127.0.0.1:8032 sink: diff --git a/kafka-manager-web/src/main/resources/logback-spring.xml b/kafka-manager-web/src/main/resources/logback-spring.xml index 90bb3bee..c1c16136 100644 --- a/kafka-manager-web/src/main/resources/logback-spring.xml +++ b/kafka-manager-web/src/main/resources/logback-spring.xml @@ -143,7 +143,7 @@ 100MB - 5 + 3 @@ -159,7 +159,7 @@ 100MB - 5 + 3