diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/TopicExpiredStatusEnum.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/TopicExpiredStatusEnum.java new file mode 100644 index 00000000..bac44235 --- /dev/null +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/TopicExpiredStatusEnum.java @@ -0,0 +1,32 @@ +package com.xiaojukeji.kafka.manager.common.bizenum; + +/** + * 过期Topic状态 + * @author zengqiao + * @date 21/01/25 + */ +public enum TopicExpiredStatusEnum { + ALREADY_NOTIFIED_AND_DELETED(-2, "已通知, 已下线"), + ALREADY_NOTIFIED_AND_CAN_DELETE(-1, "已通知, 可下线"), + ALREADY_EXPIRED_AND_WAIT_NOTIFY(0, "已过期, 待通知"), + ALREADY_NOTIFIED_AND_WAIT_RESPONSE(1, "已通知, 待反馈"), + + ; + + private int status; + + private String message; + + TopicExpiredStatusEnum(int status, String message) { + this.status = status; + this.message = message; + } + + public int getStatus() { + return status; + } + + public String getMessage() { + return message; + } +} diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/Result.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/Result.java index 0fb38302..323e9ec9 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/Result.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/Result.java @@ -97,7 +97,7 @@ public class Result implements Serializable { return result; } - public static Result buildFailure(String message) { + public static Result buildGatewayFailure(String message) { Result result = new Result(); result.setCode(ResultStatus.GATEWAY_INVALID_REQUEST.getCode()); result.setMessage(message); @@ -105,6 +105,14 @@ public class Result implements Serializable { return result; } + public static Result buildFailure(String message) { + Result result = new Result(); + result.setCode(ResultStatus.FAIL.getCode()); + result.setMessage(message); + result.setData(null); + return result; + } + public static Result buildFrom(ResultStatus resultStatus) { Result result = new Result(); result.setCode(resultStatus.getCode()); diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ResultStatus.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ResultStatus.java index 76e3aca8..94acb56d 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ResultStatus.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ResultStatus.java @@ -12,6 +12,8 @@ public enum ResultStatus { SUCCESS(Constant.SUCCESS, "success"), + FAIL(1, "操作失败"), + /** * 操作错误[1000, 2000) * ------------------------------------------------------------------------------------------ @@ -91,6 +93,8 @@ public enum ResultStatus { ZOOKEEPER_CONNECT_FAILED(8020, "zookeeper connect failed"), ZOOKEEPER_READ_FAILED(8021, "zookeeper read failed"), + ZOOKEEPER_WRITE_FAILED(8022, "zookeeper write failed"), + ZOOKEEPER_DELETE_FAILED(8023, "zookeeper delete failed"), // 调用集群任务里面的agent失败 CALL_CLUSTER_TASK_AGENT_FAILED(8030, " call cluster task agent failed"), diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/ClusterDetailDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/ClusterDetailDTO.java index 937d9cf8..2e903485 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/ClusterDetailDTO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/ClusterDetailDTO.java @@ -23,6 +23,8 @@ public class ClusterDetailDTO { private String securityProperties; + private String jmxProperties; + private Integer status; private Date gmtCreate; @@ -103,6 +105,14 @@ public class ClusterDetailDTO { this.securityProperties = securityProperties; } + public String getJmxProperties() { + return jmxProperties; + } + + public void setJmxProperties(String jmxProperties) { + this.jmxProperties = jmxProperties; + } + public Integer getStatus() { return status; } @@ -176,8 +186,9 @@ public class ClusterDetailDTO { ", bootstrapServers='" + bootstrapServers + '\'' + ", kafkaVersion='" + kafkaVersion + '\'' + ", idc='" + idc + '\'' + - ", mode='" + mode + '\'' + + ", mode=" + mode + ", securityProperties='" + securityProperties + '\'' + + ", jmxProperties='" + jmxProperties + '\'' + ", status=" + status + ", gmtCreate=" + gmtCreate + ", gmtModify=" + gmtModify + diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/config/SinkTopicRequestTimeMetricsConfig.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/config/SinkTopicRequestTimeMetricsConfig.java deleted file mode 100644 index 91faaba1..00000000 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/config/SinkTopicRequestTimeMetricsConfig.java +++ /dev/null @@ -1,57 +0,0 @@ -package com.xiaojukeji.kafka.manager.common.entity.ao.config; - -/** - * @author zengqiao - * @date 20/9/7 - */ -public class SinkTopicRequestTimeMetricsConfig { - private Long clusterId; - - private String topicName; - - private Long startId; - - private Long step; - - public Long getClusterId() { - return clusterId; - } - - public void setClusterId(Long clusterId) { - this.clusterId = clusterId; - } - - public String getTopicName() { - return topicName; - } - - public void setTopicName(String topicName) { - this.topicName = topicName; - } - - public Long getStartId() { - return startId; - } - - public void setStartId(Long startId) { - this.startId = startId; - } - - public Long getStep() { - return step; - } - - public void setStep(Long step) { - this.step = step; - } - - @Override - public String toString() { - return "SinkTopicRequestTimeMetricsConfig{" + - "clusterId=" + clusterId + - ", topicName='" + topicName + '\'' + - ", startId=" + startId + - ", step=" + step + - '}'; - } -} \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/op/ControllerPreferredCandidateDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/op/ControllerPreferredCandidateDTO.java new file mode 100644 index 00000000..1b4c95b9 --- /dev/null +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/op/ControllerPreferredCandidateDTO.java @@ -0,0 +1,45 @@ +package com.xiaojukeji.kafka.manager.common.entity.dto.op; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; + +import java.util.List; + +/** + * @author zengqiao + * @date 21/01/24 + */ +@JsonIgnoreProperties(ignoreUnknown = true) +@ApiModel(description="优选为Controller的候选者") +public class ControllerPreferredCandidateDTO { + @ApiModelProperty(value="集群ID") + private Long clusterId; + + @ApiModelProperty(value="优选为controller的BrokerId") + private List brokerIdList; + + public Long getClusterId() { + return clusterId; + } + + public void setClusterId(Long clusterId) { + this.clusterId = clusterId; + } + + public List getBrokerIdList() { + return brokerIdList; + } + + public void setBrokerIdList(List brokerIdList) { + this.brokerIdList = brokerIdList; + } + + @Override + public String toString() { + return "ControllerPreferredCandidateDTO{" + + "clusterId=" + clusterId + + ", brokerIdList=" + brokerIdList + + '}'; + } +} diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/ClusterDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/ClusterDTO.java index 0b6fcebb..7afc09c6 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/ClusterDTO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/ClusterDTO.java @@ -102,12 +102,11 @@ public class ClusterDTO { '}'; } - public Boolean legal() { + public boolean legal() { if (ValidateUtils.isNull(clusterName) || ValidateUtils.isNull(zookeeper) || ValidateUtils.isNull(idc) - || ValidateUtils.isNull(bootstrapServers) - ) { + || ValidateUtils.isNull(bootstrapServers)) { return false; } return true; diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/gateway/GatewayConfigDO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/gateway/GatewayConfigDO.java index c0e96000..fa29c7cf 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/gateway/GatewayConfigDO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/gateway/GatewayConfigDO.java @@ -17,6 +17,8 @@ public class GatewayConfigDO { private Long version; + private String description; + private Date createTime; private Date modifyTime; @@ -61,6 +63,14 @@ public class GatewayConfigDO { this.version = version; } + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + public Date getCreateTime() { return createTime; } @@ -85,6 +95,7 @@ public class GatewayConfigDO { ", name='" + name + '\'' + ", value='" + value + '\'' + ", version=" + version + + ", description='" + description + '\'' + ", createTime=" + createTime + ", modifyTime=" + modifyTime + '}'; diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/op/expert/ExpiredTopicVO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/op/expert/ExpiredTopicVO.java index 46c7a3a2..c4921259 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/op/expert/ExpiredTopicVO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/op/expert/ExpiredTopicVO.java @@ -28,7 +28,7 @@ public class ExpiredTopicVO { @ApiModelProperty(value = "负责人") private String principals; - @ApiModelProperty(value = "状态, -1:可下线, 0:过期待通知, 1+:已通知待反馈") + @ApiModelProperty(value = "状态, -1:已通知可下线, 0:过期待通知, 1+:已通知待反馈") private Integer status; public Long getClusterId() { diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/rd/GatewayConfigVO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/rd/GatewayConfigVO.java index a0b402af..72314c31 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/rd/GatewayConfigVO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/rd/GatewayConfigVO.java @@ -26,6 +26,9 @@ public class GatewayConfigVO { @ApiModelProperty(value="版本") private Long version; + @ApiModelProperty(value="描述说明") + private String description; + @ApiModelProperty(value="创建时间") private Date createTime; @@ -72,6 +75,14 @@ public class GatewayConfigVO { this.version = version; } + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + public Date getCreateTime() { return createTime; } @@ -96,6 +107,7 @@ public class GatewayConfigVO { ", name='" + name + '\'' + ", value='" + value + '\'' + ", version=" + version + + ", description='" + description + '\'' + ", createTime=" + createTime + ", modifyTime=" + modifyTime + '}'; diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/JsonUtils.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/JsonUtils.java index 46d177ad..283d59c5 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/JsonUtils.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/JsonUtils.java @@ -60,6 +60,13 @@ public class JsonUtils { return JSON.parseObject(src, clazz); } + public static List stringToArrObj(String src, Class clazz) { + if (ValidateUtils.isBlank(src)) { + return null; + } + return JSON.parseArray(src, clazz); + } + public static List parseTopicConnections(Long clusterId, JSONObject jsonObject, long postTime) { List connectionDOList = new ArrayList<>(); for (String clientType: jsonObject.keySet()) { diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java index fc70c6b2..c7c69ca3 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java @@ -79,7 +79,7 @@ public class JmxConnectorWrap { try { Map environment = new HashMap(); if (!ValidateUtils.isBlank(this.jmxConfig.getUsername()) && !ValidateUtils.isBlank(this.jmxConfig.getPassword())) { - environment.put(javax.management.remote.JMXConnector.CREDENTIALS, Arrays.asList(this.jmxConfig.getUsername(), this.jmxConfig.getPassword())); + environment.put(JMXConnector.CREDENTIALS, Arrays.asList(this.jmxConfig.getUsername(), this.jmxConfig.getPassword())); } if (jmxConfig.isOpenSSL() != null && this.jmxConfig.isOpenSSL()) { environment.put(Context.SECURITY_PROTOCOL, "ssl"); diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/ZkPathUtil.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/ZkPathUtil.java index e0a5632a..6705f435 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/ZkPathUtil.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/ZkPathUtil.java @@ -33,7 +33,9 @@ public class ZkPathUtil { private static final String D_METRICS_CONFIG_ROOT_NODE = CONFIG_ROOT_NODE + ZOOKEEPER_SEPARATOR + "KafkaExMetrics"; - public static final String D_CONTROLLER_CANDIDATES = CONFIG_ROOT_NODE + ZOOKEEPER_SEPARATOR + "extension/candidates"; + public static final String D_CONFIG_EXTENSION_ROOT_NODE = CONFIG_ROOT_NODE + ZOOKEEPER_SEPARATOR + "extension"; + + public static final String D_CONTROLLER_CANDIDATES = D_CONFIG_EXTENSION_ROOT_NODE + ZOOKEEPER_SEPARATOR + "candidates"; public static String getBrokerIdNodePath(Integer brokerId) { return BROKER_IDS_ROOT + ZOOKEEPER_SEPARATOR + String.valueOf(brokerId); @@ -111,6 +113,10 @@ public class ZkPathUtil { } public static String getKafkaExtraMetricsPath(Integer brokerId) { - return D_METRICS_CONFIG_ROOT_NODE + ZOOKEEPER_SEPARATOR + String.valueOf(brokerId); + return D_METRICS_CONFIG_ROOT_NODE + ZOOKEEPER_SEPARATOR + brokerId; + } + + public static String getControllerCandidatePath(Integer brokerId) { + return D_CONTROLLER_CANDIDATES + ZOOKEEPER_SEPARATOR + brokerId; } } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java index 59453919..e3b8f23f 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java @@ -15,10 +15,7 @@ import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata import com.xiaojukeji.kafka.manager.common.zookeeper.ZkConfigImpl; import com.xiaojukeji.kafka.manager.dao.ControllerDao; import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConnectorWrap; -import com.xiaojukeji.kafka.manager.dao.TopicDao; -import com.xiaojukeji.kafka.manager.dao.gateway.AuthorityDao; import com.xiaojukeji.kafka.manager.service.service.JmxService; -import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils; import com.xiaojukeji.kafka.manager.service.zookeeper.*; import com.xiaojukeji.kafka.manager.service.service.ClusterService; import com.xiaojukeji.kafka.manager.common.zookeeper.ZkPathUtil; @@ -49,15 +46,6 @@ public class PhysicalClusterMetadataManager { @Autowired private ClusterService clusterService; - @Autowired - private ConfigUtils configUtils; - - @Autowired - private TopicDao topicDao; - - @Autowired - private AuthorityDao authorityDao; - private final static Map CLUSTER_MAP = new ConcurrentHashMap<>(); private final static Map CONTROLLER_DATA_MAP = new ConcurrentHashMap<>(); @@ -133,7 +121,7 @@ public class PhysicalClusterMetadataManager { zkConfig.watchChildren(ZkPathUtil.BROKER_IDS_ROOT, brokerListener); //增加Topic监控 - TopicStateListener topicListener = new TopicStateListener(clusterDO.getId(), zkConfig, topicDao, authorityDao); + TopicStateListener topicListener = new TopicStateListener(clusterDO.getId(), zkConfig); topicListener.init(); zkConfig.watchChildren(ZkPathUtil.BROKER_TOPICS_ROOT, topicListener); diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ClusterService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ClusterService.java index b2c5f7b2..2feb321b 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ClusterService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ClusterService.java @@ -4,6 +4,7 @@ import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.ao.ClusterDetailDTO; import com.xiaojukeji.kafka.manager.common.entity.ao.cluster.ControllerPreferredCandidate; +import com.xiaojukeji.kafka.manager.common.entity.dto.op.ControllerPreferredCandidateDTO; import com.xiaojukeji.kafka.manager.common.entity.vo.normal.cluster.ClusterNameDTO; import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterMetricsDO; @@ -51,4 +52,20 @@ public interface ClusterService { * @return void */ Result> getControllerPreferredCandidates(Long clusterId); + + /** + * 增加优先被选举为controller的broker + * @param clusterId 集群ID + * @param brokerIdList brokerId列表 + * @return + */ + Result addControllerPreferredCandidates(Long clusterId, List brokerIdList); + + /** + * 减少优先被选举为controller的broker + * @param clusterId 集群ID + * @param brokerIdList brokerId列表 + * @return + */ + Result deleteControllerPreferredCandidates(Long clusterId, List brokerIdList); } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ZookeeperService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ZookeeperService.java index d24b2d24..d52d3bc7 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ZookeeperService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ZookeeperService.java @@ -26,4 +26,20 @@ public interface ZookeeperService { * @return 操作结果 */ Result> getControllerPreferredCandidates(Long clusterId); + + /** + * 增加优先被选举为controller的broker + * @param clusterId 集群ID + * @param brokerId brokerId + * @return + */ + Result addControllerPreferredCandidate(Long clusterId, Integer brokerId); + + /** + * 减少优先被选举为controller的broker + * @param clusterId 集群ID + * @param brokerId brokerId + * @return + */ + Result deleteControllerPreferredCandidate(Long clusterId, Integer brokerId); } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/GatewayConfigServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/GatewayConfigServiceImpl.java index fce7b605..18ee0a0d 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/GatewayConfigServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/GatewayConfigServiceImpl.java @@ -221,13 +221,24 @@ public class GatewayConfigServiceImpl implements GatewayConfigService { if (ValidateUtils.isNull(oldGatewayConfigDO)) { return Result.buildFrom(ResultStatus.RESOURCE_NOT_EXIST); } + if (!oldGatewayConfigDO.getName().equals(newGatewayConfigDO.getName()) || !oldGatewayConfigDO.getType().equals(newGatewayConfigDO.getType()) || ValidateUtils.isBlank(newGatewayConfigDO.getValue())) { return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); } - newGatewayConfigDO.setVersion(oldGatewayConfigDO.getVersion() + 1); - if (gatewayConfigDao.updateById(oldGatewayConfigDO) > 0) { + + // 获取当前同类配置, 插入之后需要增大这个version + List gatewayConfigDOList = gatewayConfigDao.getByConfigType(newGatewayConfigDO.getType()); + Long version = 1L; + for (GatewayConfigDO elem: gatewayConfigDOList) { + if (elem.getVersion() > version) { + version = elem.getVersion() + 1L; + } + } + + newGatewayConfigDO.setVersion(version); + if (gatewayConfigDao.updateById(newGatewayConfigDO) > 0) { return Result.buildSuc(); } return Result.buildFrom(ResultStatus.MYSQL_ERROR); diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java index 609c8cf9..e1a619a8 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java @@ -111,12 +111,13 @@ public class ClusterServiceImpl implements ClusterService { // 不允许修改zk地址 return ResultStatus.CHANGE_ZOOKEEPER_FORBIDDEN; } - clusterDO.setStatus(originClusterDO.getStatus()); Map content = new HashMap<>(); content.put("cluster id", clusterDO.getId().toString()); content.put("security properties", clusterDO.getSecurityProperties()); content.put("jmx properties", clusterDO.getJmxProperties()); operateRecordService.insert(operator, ModuleEnum.CLUSTER, clusterDO.getClusterName(), OperateEnum.EDIT, content); + + clusterDO.setStatus(originClusterDO.getStatus()); return updateById(clusterDO); } @@ -214,7 +215,7 @@ public class ClusterServiceImpl implements ClusterService { if (zk != null) { zk.close(); } - } catch (Throwable t) { + } catch (Exception e) { return false; } } @@ -275,7 +276,7 @@ public class ClusterServiceImpl implements ClusterService { try { Map content = new HashMap<>(); content.put("cluster id", clusterId.toString()); - operateRecordService.insert(operator, ModuleEnum.CLUSTER, getClusterName(clusterId).getPhysicalClusterName(), OperateEnum.DELETE, content); + operateRecordService.insert(operator, ModuleEnum.CLUSTER, String.valueOf(clusterId), OperateEnum.DELETE, content); if (clusterDao.deleteById(clusterId) <= 0) { LOGGER.error("delete cluster failed, clusterId:{}.", clusterId); return ResultStatus.MYSQL_ERROR; @@ -289,8 +290,9 @@ public class ClusterServiceImpl implements ClusterService { private ClusterDetailDTO getClusterDetailDTO(ClusterDO clusterDO, Boolean needDetail) { if (ValidateUtils.isNull(clusterDO)) { - return null; + return new ClusterDetailDTO(); } + ClusterDetailDTO dto = new ClusterDetailDTO(); dto.setClusterId(clusterDO.getId()); dto.setClusterName(clusterDO.getClusterName()); @@ -299,6 +301,7 @@ public class ClusterServiceImpl implements ClusterService { dto.setKafkaVersion(physicalClusterMetadataManager.getKafkaVersionFromCache(clusterDO.getId())); dto.setIdc(configUtils.getIdc()); dto.setSecurityProperties(clusterDO.getSecurityProperties()); + dto.setJmxProperties(clusterDO.getJmxProperties()); dto.setStatus(clusterDO.getStatus()); dto.setGmtCreate(clusterDO.getGmtCreate()); dto.setGmtModify(clusterDO.getGmtModify()); @@ -337,4 +340,39 @@ public class ClusterServiceImpl implements ClusterService { } return Result.buildSuc(controllerPreferredCandidateList); } + + @Override + public Result addControllerPreferredCandidates(Long clusterId, List brokerIdList) { + if (ValidateUtils.isNull(clusterId) || ValidateUtils.isEmptyList(brokerIdList)) { + return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); + } + + // 增加的BrokerId需要判断是否存活 + for (Integer brokerId: brokerIdList) { + if (!PhysicalClusterMetadataManager.isBrokerAlive(clusterId, brokerId)) { + return Result.buildFrom(ResultStatus.BROKER_NOT_EXIST); + } + + Result result = zookeeperService.addControllerPreferredCandidate(clusterId, brokerId); + if (result.failed()) { + return result; + } + } + return Result.buildSuc(); + } + + @Override + public Result deleteControllerPreferredCandidates(Long clusterId, List brokerIdList) { + if (ValidateUtils.isNull(clusterId) || ValidateUtils.isEmptyList(brokerIdList)) { + return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); + } + + for (Integer brokerId: brokerIdList) { + Result result = zookeeperService.deleteControllerPreferredCandidate(clusterId, brokerId); + if (result.failed()) { + return result; + } + } + return Result.buildSuc(); + } } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConsumerServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConsumerServiceImpl.java index e228d36c..0d60d828 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConsumerServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConsumerServiceImpl.java @@ -129,7 +129,7 @@ public class ConsumerServiceImpl implements ConsumerService { } summary.setState(consumerGroupSummary.state()); - java.util.Iterator> it = JavaConversions.asJavaIterator(consumerGroupSummary.consumers().iterator()); + Iterator> it = JavaConversions.asJavaIterator(consumerGroupSummary.consumers().iterator()); while (it.hasNext()) { List consumerSummaryList = JavaConversions.asJavaList(it.next()); for (AdminClient.ConsumerSummary consumerSummary: consumerSummaryList) { diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ZookeeperServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ZookeeperServiceImpl.java index aa31ed33..c4c89513 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ZookeeperServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ZookeeperServiceImpl.java @@ -70,4 +70,58 @@ public class ZookeeperServiceImpl implements ZookeeperService { } return Result.buildFrom(ResultStatus.ZOOKEEPER_READ_FAILED); } + + @Override + public Result addControllerPreferredCandidate(Long clusterId, Integer brokerId) { + if (ValidateUtils.isNull(clusterId)) { + return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); + } + ZkConfigImpl zkConfig = PhysicalClusterMetadataManager.getZKConfig(clusterId); + if (ValidateUtils.isNull(zkConfig)) { + return Result.buildFrom(ResultStatus.ZOOKEEPER_CONNECT_FAILED); + } + + try { + if (zkConfig.checkPathExists(ZkPathUtil.getControllerCandidatePath(brokerId))) { + // 节点已经存在, 则直接忽略 + return Result.buildSuc(); + } + + if (!zkConfig.checkPathExists(ZkPathUtil.D_CONFIG_EXTENSION_ROOT_NODE)) { + zkConfig.setOrCreatePersistentNodeStat(ZkPathUtil.D_CONFIG_EXTENSION_ROOT_NODE, ""); + } + + if (!zkConfig.checkPathExists(ZkPathUtil.D_CONTROLLER_CANDIDATES)) { + zkConfig.setOrCreatePersistentNodeStat(ZkPathUtil.D_CONTROLLER_CANDIDATES, ""); + } + + zkConfig.setOrCreatePersistentNodeStat(ZkPathUtil.getControllerCandidatePath(brokerId), ""); + return Result.buildSuc(); + } catch (Exception e) { + LOGGER.error("class=ZookeeperServiceImpl||method=addControllerPreferredCandidate||clusterId={}||brokerId={}||errMsg={}||", clusterId, brokerId, e.getMessage()); + } + return Result.buildFrom(ResultStatus.ZOOKEEPER_WRITE_FAILED); + } + + @Override + public Result deleteControllerPreferredCandidate(Long clusterId, Integer brokerId) { + if (ValidateUtils.isNull(clusterId)) { + return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); + } + ZkConfigImpl zkConfig = PhysicalClusterMetadataManager.getZKConfig(clusterId); + if (ValidateUtils.isNull(zkConfig)) { + return Result.buildFrom(ResultStatus.ZOOKEEPER_CONNECT_FAILED); + } + + try { + if (!zkConfig.checkPathExists(ZkPathUtil.getControllerCandidatePath(brokerId))) { + return Result.buildSuc(); + } + zkConfig.delete(ZkPathUtil.getControllerCandidatePath(brokerId)); + return Result.buildSuc(); + } catch (Exception e) { + LOGGER.error("class=ZookeeperServiceImpl||method=deleteControllerPreferredCandidate||clusterId={}||brokerId={}||errMsg={}||", clusterId, brokerId, e.getMessage()); + } + return Result.buildFrom(ResultStatus.ZOOKEEPER_DELETE_FAILED); + } } \ No newline at end of file diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/TopicCommands.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/TopicCommands.java index 58e5d98b..6995eb97 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/TopicCommands.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/TopicCommands.java @@ -44,7 +44,7 @@ public class TopicCommands { ); // 生成分配策略 - scala.collection.Map> replicaAssignment = + scala.collection.Map> replicaAssignment = AdminUtils.assignReplicasToBrokers( convert2BrokerMetadataSeq(brokerIdList), partitionNum, @@ -177,7 +177,7 @@ public class TopicCommands { ) ); - Map> existingAssignJavaMap = + Map> existingAssignJavaMap = JavaConversions.asJavaMap(existingAssignScalaMap); // 新增分区的分配策略和旧的分配策略合并 Map> targetMap = new HashMap<>(); diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/TopicStateListener.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/TopicStateListener.java index f808b976..4314a101 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/TopicStateListener.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/TopicStateListener.java @@ -5,8 +5,6 @@ import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata import com.xiaojukeji.kafka.manager.common.zookeeper.StateChangeListener; import com.xiaojukeji.kafka.manager.common.zookeeper.ZkConfigImpl; import com.xiaojukeji.kafka.manager.common.zookeeper.ZkPathUtil; -import com.xiaojukeji.kafka.manager.dao.TopicDao; -import com.xiaojukeji.kafka.manager.dao.gateway.AuthorityDao; import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.cache.ThreadPool; import org.apache.zookeeper.data.Stat; @@ -24,28 +22,17 @@ import java.util.concurrent.*; * @date 20/5/14 */ public class TopicStateListener implements StateChangeListener { - private final static Logger LOGGER = LoggerFactory.getLogger(TopicStateListener.class); + private static final Logger LOGGER = LoggerFactory.getLogger(TopicStateListener.class); private Long clusterId; private ZkConfigImpl zkConfig; - private TopicDao topicDao; - - private AuthorityDao authorityDao; - public TopicStateListener(Long clusterId, ZkConfigImpl zkConfig) { this.clusterId = clusterId; this.zkConfig = zkConfig; } - public TopicStateListener(Long clusterId, ZkConfigImpl zkConfig, TopicDao topicDao, AuthorityDao authorityDao) { - this.clusterId = clusterId; - this.zkConfig = zkConfig; - this.topicDao = topicDao; - this.authorityDao = authorityDao; - } - @Override public void init() { try { @@ -53,7 +40,7 @@ public class TopicStateListener implements StateChangeListener { FutureTask[] taskList = new FutureTask[topicNameList.size()]; for (int i = 0; i < topicNameList.size(); i++) { String topicName = topicNameList.get(i); - taskList[i] = new FutureTask(new Callable() { + taskList[i] = new FutureTask(new Callable() { @Override public Object call() throws Exception { processTopicAdded(topicName); @@ -65,7 +52,6 @@ public class TopicStateListener implements StateChangeListener { } catch (Exception e) { LOGGER.error("init topics metadata failed, clusterId:{}.", clusterId, e); } - return; } @Override @@ -92,8 +78,6 @@ public class TopicStateListener implements StateChangeListener { private void processTopicDelete(String topicName) { LOGGER.warn("delete topic, clusterId:{} topicName:{}.", clusterId, topicName); PhysicalClusterMetadataManager.removeTopicMetadata(clusterId, topicName); - topicDao.removeTopicInCache(clusterId, topicName); - authorityDao.removeAuthorityInCache(clusterId, topicName); } private void processTopicAdded(String topicName) { @@ -122,4 +106,4 @@ public class TopicStateListener implements StateChangeListener { LOGGER.error("add topic failed, clusterId:{} topicMetadata:{}.", clusterId, topicMetadata, e); } } -} \ No newline at end of file +} diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicDao.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicDao.java index 3d3f5410..64e089a6 100644 --- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicDao.java +++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicDao.java @@ -22,6 +22,4 @@ public interface TopicDao { List listAll(); TopicDO getTopic(Long clusterId, String topicName, String appId); - - TopicDO removeTopicInCache(Long clusterId, String topicName); } \ No newline at end of file diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/AppDao.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/AppDao.java index 218c8656..7802005a 100644 --- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/AppDao.java +++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/AppDao.java @@ -16,8 +16,6 @@ public interface AppDao { */ int insert(AppDO appDO); - int insertIgnoreGatewayDB(AppDO appDO); - /** * 删除appId * @param appName App名称 @@ -60,6 +58,4 @@ public interface AppDao { * @return int */ int updateById(AppDO appDO); - - List listNewAll(); } \ No newline at end of file diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/AuthorityDao.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/AuthorityDao.java index a7a8affe..655218e9 100644 --- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/AuthorityDao.java +++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/AuthorityDao.java @@ -15,8 +15,6 @@ public interface AuthorityDao { */ int insert(AuthorityDO authorityDO); - int replaceIgnoreGatewayDB(AuthorityDO authorityDO); - /** * 获取权限 * @param clusterId 集群id @@ -38,7 +36,5 @@ public interface AuthorityDao { Map>> getAllAuthority(); - void removeAuthorityInCache(Long clusterId, String topicName); - int deleteAuthorityByTopic(Long clusterId, String topicName); } diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/impl/AppDaoImpl.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/impl/AppDaoImpl.java index aa08c1b4..62475b9b 100644 --- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/impl/AppDaoImpl.java +++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/impl/AppDaoImpl.java @@ -2,6 +2,7 @@ package com.xiaojukeji.kafka.manager.dao.gateway.impl; import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AppDO; import com.xiaojukeji.kafka.manager.dao.gateway.AppDao; +import com.xiaojukeji.kafka.manager.task.Constant; import org.mybatis.spring.SqlSessionTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Repository; @@ -21,7 +22,7 @@ public class AppDaoImpl implements AppDao { /** * APP最近的一次更新时间, 更新之后的缓存 */ - private static Long APP_CACHE_LATEST_UPDATE_TIME = 0L; + private static volatile long APP_CACHE_LATEST_UPDATE_TIME = Constant.START_TIMESTAMP; private static final Map APP_MAP = new ConcurrentHashMap<>(); @Override @@ -29,11 +30,6 @@ public class AppDaoImpl implements AppDao { return sqlSession.insert("AppDao.insert", appDO); } - @Override - public int insertIgnoreGatewayDB(AppDO appDO) { - return sqlSession.insert("AppDao.insert", appDO); - } - @Override public int deleteByName(String appName) { return sqlSession.delete("AppDao.deleteByName", appName); @@ -66,7 +62,12 @@ public class AppDaoImpl implements AppDao { } private void updateTopicCache() { - Long timestamp = System.currentTimeMillis(); + long timestamp = System.currentTimeMillis(); + + if (timestamp + 1000 <= APP_CACHE_LATEST_UPDATE_TIME) { + // 近一秒内的请求不走db + return; + } Date afterTime = new Date(APP_CACHE_LATEST_UPDATE_TIME); List doList = sqlSession.selectList("AppDao.listAfterTime", afterTime); @@ -76,19 +77,22 @@ public class AppDaoImpl implements AppDao { /** * 更新APP缓存 */ - synchronized private void updateTopicCache(List doList, Long timestamp) { + private synchronized void updateTopicCache(List doList, long timestamp) { if (doList == null || doList.isEmpty() || APP_CACHE_LATEST_UPDATE_TIME >= timestamp) { // 本次无数据更新, 或者本次更新过时 时, 忽略本次更新 return; } + if (APP_CACHE_LATEST_UPDATE_TIME == Constant.START_TIMESTAMP) { + APP_MAP.clear(); + } + for (AppDO elem: doList) { APP_MAP.put(elem.getAppId(), elem); } APP_CACHE_LATEST_UPDATE_TIME = timestamp; } - @Override - public List listNewAll() { - return sqlSession.selectList("AppDao.listNewAll"); + public static void resetCache() { + APP_CACHE_LATEST_UPDATE_TIME = Constant.START_TIMESTAMP; } } \ No newline at end of file diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/impl/AuthorityDaoImpl.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/impl/AuthorityDaoImpl.java index 74a7cab0..1b5df873 100644 --- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/impl/AuthorityDaoImpl.java +++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/impl/AuthorityDaoImpl.java @@ -1,8 +1,8 @@ package com.xiaojukeji.kafka.manager.dao.gateway.impl; import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO; -import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.dao.gateway.AuthorityDao; +import com.xiaojukeji.kafka.manager.task.Constant; import org.mybatis.spring.SqlSessionTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Repository; @@ -23,7 +23,8 @@ public class AuthorityDaoImpl implements AuthorityDao { * Authority最近的一次更新时间, 更新之后的缓存 * >> */ - private static Long AUTHORITY_CACHE_LATEST_UPDATE_TIME = 0L; + private static volatile long AUTHORITY_CACHE_LATEST_UPDATE_TIME = Constant.START_TIMESTAMP; + private static final Map>> AUTHORITY_MAP = new ConcurrentHashMap<>(); @Override @@ -31,11 +32,6 @@ public class AuthorityDaoImpl implements AuthorityDao { return sqlSession.insert("AuthorityDao.replace", authorityDO); } - @Override - public int replaceIgnoreGatewayDB(AuthorityDO authorityDO) { - return sqlSession.insert("AuthorityDao.replace", authorityDO); - } - @Override public List getAuthority(Long clusterId, String topicName, String appId) { Map params = new HashMap<>(3); @@ -62,8 +58,8 @@ public class AuthorityDaoImpl implements AuthorityDao { } List authorityDOList = new ArrayList<>(); - for (Long clusterId: doMap.keySet()) { - authorityDOList.addAll(doMap.get(clusterId).values()); + for (Map.Entry> entry: doMap.entrySet()) { + authorityDOList.addAll(entry.getValue().values()); } return authorityDOList; } @@ -87,23 +83,6 @@ public class AuthorityDaoImpl implements AuthorityDao { return AUTHORITY_MAP; } - @Override - public void removeAuthorityInCache(Long clusterId, String topicName) { - AUTHORITY_MAP.forEach((appId, map) -> { - map.forEach((id, subMap) -> { - if (id.equals(clusterId)) { - subMap.remove(topicName); - if (subMap.isEmpty()) { - map.remove(id); - } - } - }); - if (map.isEmpty()) { - AUTHORITY_MAP.remove(appId); - } - }); - } - @Override public int deleteAuthorityByTopic(Long clusterId, String topicName) { Map params = new HashMap<>(2); @@ -116,6 +95,11 @@ public class AuthorityDaoImpl implements AuthorityDao { private void updateAuthorityCache() { Long timestamp = System.currentTimeMillis(); + if (timestamp + 1000 <= AUTHORITY_CACHE_LATEST_UPDATE_TIME) { + // 近一秒内的请求不走db + return; + } + Date afterTime = new Date(AUTHORITY_CACHE_LATEST_UPDATE_TIME); List doList = sqlSession.selectList("AuthorityDao.listAfterTime", afterTime); updateAuthorityCache(doList, timestamp); @@ -124,11 +108,15 @@ public class AuthorityDaoImpl implements AuthorityDao { /** * 更新Topic缓存 */ - synchronized private void updateAuthorityCache(List doList, Long timestamp) { + private synchronized void updateAuthorityCache(List doList, Long timestamp) { if (doList == null || doList.isEmpty() || AUTHORITY_CACHE_LATEST_UPDATE_TIME >= timestamp) { // 本次无数据更新, 或者本次更新过时 时, 忽略本次更新 return; } + if (AUTHORITY_CACHE_LATEST_UPDATE_TIME == Constant.START_TIMESTAMP) { + AUTHORITY_MAP.clear(); + } + for (AuthorityDO elem: doList) { Map> doMap = AUTHORITY_MAP.getOrDefault(elem.getAppId(), new ConcurrentHashMap<>()); @@ -139,4 +127,8 @@ public class AuthorityDaoImpl implements AuthorityDao { } AUTHORITY_CACHE_LATEST_UPDATE_TIME = timestamp; } + + public static void resetCache() { + AUTHORITY_CACHE_LATEST_UPDATE_TIME = Constant.START_TIMESTAMP; + } } diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicDaoImpl.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicDaoImpl.java index ba4468df..3c1ba335 100644 --- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicDaoImpl.java +++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicDaoImpl.java @@ -2,6 +2,7 @@ package com.xiaojukeji.kafka.manager.dao.impl; import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO; import com.xiaojukeji.kafka.manager.dao.TopicDao; +import com.xiaojukeji.kafka.manager.task.Constant; import org.mybatis.spring.SqlSessionTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Repository; @@ -18,7 +19,8 @@ public class TopicDaoImpl implements TopicDao { /** * Topic最近的一次更新时间, 更新之后的缓存 */ - private static Long TOPIC_CACHE_LATEST_UPDATE_TIME = 0L; + private static volatile long TOPIC_CACHE_LATEST_UPDATE_TIME = Constant.START_TIMESTAMP; + private static final Map> TOPIC_MAP = new ConcurrentHashMap<>(); @Autowired @@ -62,7 +64,7 @@ public class TopicDaoImpl implements TopicDao { @Override public List getByClusterId(Long clusterId) { updateTopicCache(); - return new ArrayList<>(TOPIC_MAP.getOrDefault(clusterId, new ConcurrentHashMap<>(0)).values()); + return new ArrayList<>(TOPIC_MAP.getOrDefault(clusterId, Collections.emptyMap()).values()); } @Override @@ -75,28 +77,28 @@ public class TopicDaoImpl implements TopicDao { updateTopicCache(); List doList = new ArrayList<>(); for (Long clusterId: TOPIC_MAP.keySet()) { - doList.addAll(TOPIC_MAP.getOrDefault(clusterId, new ConcurrentHashMap<>(0)).values()); + doList.addAll(TOPIC_MAP.getOrDefault(clusterId, Collections.emptyMap()).values()); } return doList; } @Override public TopicDO getTopic(Long clusterId, String topicName, String appId) { - Map params = new HashMap<>(2); + Map params = new HashMap<>(3); params.put("clusterId", clusterId); params.put("topicName", topicName); params.put("appId", appId); return sqlSession.selectOne("TopicDao.getTopic", params); } - @Override - public TopicDO removeTopicInCache(Long clusterId, String topicName) { - return TOPIC_MAP.getOrDefault(clusterId, new HashMap<>(0)).remove(topicName); - } - private void updateTopicCache() { Long timestamp = System.currentTimeMillis(); + if (timestamp + 1000 <= TOPIC_CACHE_LATEST_UPDATE_TIME) { + // 近一秒内的请求不走db + return; + } + Date afterTime = new Date(TOPIC_CACHE_LATEST_UPDATE_TIME); List doList = sqlSession.selectList("TopicDao.listAfterTime", afterTime); updateTopicCache(doList, timestamp); @@ -105,11 +107,15 @@ public class TopicDaoImpl implements TopicDao { /** * 更新Topic缓存 */ - synchronized private void updateTopicCache(List doList, Long timestamp) { + private synchronized void updateTopicCache(List doList, Long timestamp) { if (doList == null || doList.isEmpty() || TOPIC_CACHE_LATEST_UPDATE_TIME >= timestamp) { // 本次无数据更新, 或者本次更新过时 时, 忽略本次更新 return; } + if (TOPIC_CACHE_LATEST_UPDATE_TIME == Constant.START_TIMESTAMP) { + TOPIC_MAP.clear(); + } + for (TopicDO elem: doList) { Map doMap = TOPIC_MAP.getOrDefault(elem.getClusterId(), new ConcurrentHashMap<>()); doMap.put(elem.getTopicName(), elem); @@ -117,4 +123,8 @@ public class TopicDaoImpl implements TopicDao { } TOPIC_CACHE_LATEST_UPDATE_TIME = timestamp; } + + public static void resetCache() { + TOPIC_CACHE_LATEST_UPDATE_TIME = Constant.START_TIMESTAMP; + } } \ No newline at end of file diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/task/Constant.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/task/Constant.java new file mode 100644 index 00000000..3a50d7c1 --- /dev/null +++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/task/Constant.java @@ -0,0 +1,5 @@ +package com.xiaojukeji.kafka.manager.task; + +public class Constant { + public static final long START_TIMESTAMP = 0; +} diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/task/DaoBackgroundTask.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/task/DaoBackgroundTask.java new file mode 100644 index 00000000..a750aff8 --- /dev/null +++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/task/DaoBackgroundTask.java @@ -0,0 +1,41 @@ +package com.xiaojukeji.kafka.manager.task; + +import com.xiaojukeji.kafka.manager.common.utils.factory.DefaultThreadFactory; +import com.xiaojukeji.kafka.manager.dao.gateway.impl.AppDaoImpl; +import com.xiaojukeji.kafka.manager.dao.gateway.impl.AuthorityDaoImpl; +import com.xiaojukeji.kafka.manager.dao.impl.TopicDaoImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * 后台任务线程 + * @author zengqiao + * @date 21/02/02 + */ +@Service +public class DaoBackgroundTask { + private static final Logger LOGGER = LoggerFactory.getLogger(DaoBackgroundTask.class); + + private static final ScheduledExecutorService SYNC_CACHE_THREAD_POOL = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("syncCacheTask")); + + @PostConstruct + public void init() { + SYNC_CACHE_THREAD_POOL.scheduleAtFixedRate(() -> { + LOGGER.info("class=DaoBackgroundTask||method=init||msg=sync cache start"); + + TopicDaoImpl.resetCache(); + + AppDaoImpl.resetCache(); + + AuthorityDaoImpl.resetCache(); + + LOGGER.info("class=DaoBackgroundTask||method=init||msg=sync cache finished"); + }, 1, 10, TimeUnit.MINUTES); + } +} diff --git a/kafka-manager-dao/src/main/resources/mapper/GatewayConfigDao.xml b/kafka-manager-dao/src/main/resources/mapper/GatewayConfigDao.xml index 8aa91925..ac003836 100644 --- a/kafka-manager-dao/src/main/resources/mapper/GatewayConfigDao.xml +++ b/kafka-manager-dao/src/main/resources/mapper/GatewayConfigDao.xml @@ -8,6 +8,7 @@ + @@ -27,9 +28,9 @@ @@ -45,7 +46,8 @@ `type`=#{type}, `name`=#{name}, `value`=#{value}, - `version`=#{version} + `version`=#{version}, + `description`=#{description} WHERE id=#{id} ]]> diff --git a/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/common/entry/apply/gateway/OrderExtensionAddGatewayConfigDTO.java b/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/common/entry/apply/gateway/OrderExtensionAddGatewayConfigDTO.java index 0045bfe2..6a2c0bb4 100644 --- a/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/common/entry/apply/gateway/OrderExtensionAddGatewayConfigDTO.java +++ b/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/common/entry/apply/gateway/OrderExtensionAddGatewayConfigDTO.java @@ -18,6 +18,9 @@ public class OrderExtensionAddGatewayConfigDTO { @ApiModelProperty(value = "值") private String value; + @ApiModelProperty(value = "描述说明") + private String description; + public String getType() { return type; } @@ -42,12 +45,21 @@ public class OrderExtensionAddGatewayConfigDTO { this.value = value; } + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + @Override public String toString() { return "OrderExtensionAddGatewayConfigDTO{" + "type='" + type + '\'' + ", name='" + name + '\'' + ", value='" + value + '\'' + + ", description='" + description + '\'' + '}'; } diff --git a/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/common/entry/apply/gateway/OrderExtensionModifyGatewayConfigDTO.java b/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/common/entry/apply/gateway/OrderExtensionModifyGatewayConfigDTO.java index f5212f8c..3f749ea7 100644 --- a/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/common/entry/apply/gateway/OrderExtensionModifyGatewayConfigDTO.java +++ b/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/common/entry/apply/gateway/OrderExtensionModifyGatewayConfigDTO.java @@ -23,6 +23,9 @@ public class OrderExtensionModifyGatewayConfigDTO { @ApiModelProperty(value = "值") private String value; + @ApiModelProperty(value = "描述说明") + private String description; + public Long getId() { return id; } @@ -55,6 +58,14 @@ public class OrderExtensionModifyGatewayConfigDTO { this.value = value; } + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + @Override public String toString() { return "OrderExtensionModifyGatewayConfigDTO{" + @@ -62,6 +73,7 @@ public class OrderExtensionModifyGatewayConfigDTO { ", type='" + type + '\'' + ", name='" + name + '\'' + ", value='" + value + '\'' + + ", description='" + description + '\'' + '}'; } diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/common/Constant.java b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/common/Constant.java new file mode 100644 index 00000000..f73c3fd6 --- /dev/null +++ b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/common/Constant.java @@ -0,0 +1,18 @@ +package com.xiaojukeji.kafka.manager.kcm.common; + +public class Constant { + /** + * + */ + public static final String TASK_TITLE_PREFIX = "Logi-Kafka"; + + /** + * 并发度,顺序执行 + */ + public static final Integer AGENT_TASK_BATCH = 1; + + /** + * 失败的容忍度为0 + */ + public static final Integer AGENT_TASK_TOLERANCE = 0; +} diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/common/bizenum/ClusterTaskActionEnum.java b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/common/bizenum/ClusterTaskActionEnum.java index 556acab8..a51e2c68 100644 --- a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/common/bizenum/ClusterTaskActionEnum.java +++ b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/common/bizenum/ClusterTaskActionEnum.java @@ -6,34 +6,35 @@ package com.xiaojukeji.kafka.manager.kcm.common.bizenum; * @date 20/4/26 */ public enum ClusterTaskActionEnum { - START(0, "start"), - PAUSE(1, "pause"), - IGNORE(2, "ignore"), - CANCEL(3, "cancel"), - ROLLBACK(4, "rollback"), + UNKNOWN("unknown"), + + START("start"), + PAUSE("pause"), + + IGNORE("ignore"), + CANCEL("cancel"), + + REDO("redo"), + KILL("kill"), + + ROLLBACK("rollback"), + ; - private Integer code; - private String message; + private String action; - ClusterTaskActionEnum(Integer code, String message) { - this.code = code; - this.message = message; + ClusterTaskActionEnum(String action) { + this.action = action; } - public Integer getCode() { - return code; - } - - public String getMessage() { - return message; + public String getAction() { + return action; } @Override public String toString() { - return "TaskActionEnum{" + - "code=" + code + - ", message='" + message + '\'' + + return "ClusterTaskActionEnum{" + + "action='" + action + '\'' + '}'; } } diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/common/entry/ao/ClusterTaskLog.java b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/common/entry/ao/ClusterTaskLog.java new file mode 100644 index 00000000..ff89fa99 --- /dev/null +++ b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/common/entry/ao/ClusterTaskLog.java @@ -0,0 +1,24 @@ +package com.xiaojukeji.kafka.manager.kcm.common.entry.ao; + +public class ClusterTaskLog { + private String stdout; + + public ClusterTaskLog(String stdout) { + this.stdout = stdout; + } + + public String getStdout() { + return stdout; + } + + public void setStdout(String stdout) { + this.stdout = stdout; + } + + @Override + public String toString() { + return "AgentOperationTaskLog{" + + "stdout='" + stdout + '\'' + + '}'; + } +} diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/common/entry/ao/CreationTaskData.java b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/common/entry/ao/CreationTaskData.java index bc025d5c..8c2cd1ec 100644 --- a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/common/entry/ao/CreationTaskData.java +++ b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/common/entry/ao/CreationTaskData.java @@ -1,5 +1,7 @@ package com.xiaojukeji.kafka.manager.kcm.common.entry.ao; +import com.xiaojukeji.kafka.manager.common.entity.Result; + import java.util.List; /** @@ -119,7 +121,7 @@ public class CreationTaskData { @Override public String toString() { - return "CreationTaskDTO{" + + return "CreationTaskData{" + "uuid='" + uuid + '\'' + ", clusterId=" + clusterId + ", hostList=" + hostList + diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/AbstractAgent.java b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/AbstractAgent.java index 88872868..70ce5902 100644 --- a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/AbstractAgent.java +++ b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/AbstractAgent.java @@ -1,9 +1,18 @@ package com.xiaojukeji.kafka.manager.kcm.component.agent; +import com.xiaojukeji.kafka.manager.common.entity.Result; +import com.xiaojukeji.kafka.manager.kcm.common.bizenum.ClusterTaskActionEnum; import com.xiaojukeji.kafka.manager.kcm.common.bizenum.ClusterTaskStateEnum; import com.xiaojukeji.kafka.manager.kcm.common.bizenum.ClusterTaskSubStateEnum; +import com.xiaojukeji.kafka.manager.kcm.common.entry.ao.ClusterTaskLog; import com.xiaojukeji.kafka.manager.kcm.common.entry.ao.CreationTaskData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.util.Map; @@ -13,33 +22,79 @@ import java.util.Map; * @date 20/4/26 */ public abstract class AbstractAgent { + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAgent.class); + /** * 创建任务 + * @param creationTaskData 创建任务参数 + * @return 任务ID */ - public abstract Long createTask(CreationTaskData dto); + public abstract Result createTask(CreationTaskData creationTaskData); /** - * 任务动作 + * 执行任务 + * @param taskId 任务ID + * @param actionEnum 执行动作 + * @return true:触发成功, false:触发失败 */ - public abstract Boolean actionTask(Long taskId, String action); + public abstract boolean actionTask(Long taskId, ClusterTaskActionEnum actionEnum); /** - * 任务动作 + * 执行任务 + * @param taskId 任务ID + * @param actionEnum 执行动作 + * @param hostname 具体主机 + * @return true:触发成功, false:触发失败 */ - public abstract Boolean actionHostTask(Long taskId, String action, String hostname); + public abstract boolean actionHostTask(Long taskId, ClusterTaskActionEnum actionEnum, String hostname); /** - * 获取任务状态 + * 获取任务运行的状态[阻塞, 执行中, 完成等] + * @param taskId 任务ID + * @return 任务状态 */ - public abstract ClusterTaskStateEnum getTaskState(Long agentTaskId); + public abstract Result getTaskExecuteState(Long taskId); /** * 获取任务结果 + * @param taskId 任务ID + * @return 任务结果 */ - public abstract Map getTaskResult(Long taskId); + public abstract Result> getTaskResult(Long taskId); /** - * 获取任务日志 + * 获取任务执行日志 + * @param taskId 任务ID + * @param hostname 具体主机 + * @return 机器运行日志 */ - public abstract String getTaskLog(Long agentTaskId, String hostname); + public abstract Result getTaskLog(Long taskId, String hostname); + + protected static String readScriptInJarFile(String fileName) { + InputStream inputStream = AbstractAgent.class.getClassLoader().getResourceAsStream(fileName); + if (inputStream == null) { + LOGGER.error("class=AbstractAgent||method=readScriptInJarFile||fileName={}||msg=read script failed", fileName); + return ""; + } + + try { + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); + String line = null; + + StringBuilder sb = new StringBuilder(); + while ((line = bufferedReader.readLine()) != null) { + sb.append(line).append("\n"); + } + return sb.toString(); + } catch (Exception e) { + LOGGER.error("class=AbstractAgent||method=readScriptInJarFile||fileName={}||errMsg={}||msg=read script failed", fileName, e.getMessage()); + } finally { + try { + inputStream.close(); + } catch (IOException e) { + LOGGER.error("class=AbstractAgent||method=readScriptInJarFile||fileName={}||errMsg={}||msg=close reading script failed", fileName, e.getMessage()); + } + } + return ""; + } } \ No newline at end of file diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/N9e.java b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/N9e.java index f1f4b586..6e3fa677 100644 --- a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/N9e.java +++ b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/N9e.java @@ -1,8 +1,11 @@ package com.xiaojukeji.kafka.manager.kcm.component.agent.n9e; -import com.alibaba.fastjson.JSON; import com.xiaojukeji.kafka.manager.common.bizenum.KafkaFileEnum; +import com.xiaojukeji.kafka.manager.common.entity.Result; +import com.xiaojukeji.kafka.manager.kcm.common.Constant; +import com.xiaojukeji.kafka.manager.kcm.common.bizenum.ClusterTaskActionEnum; import com.xiaojukeji.kafka.manager.kcm.common.bizenum.ClusterTaskTypeEnum; +import com.xiaojukeji.kafka.manager.kcm.common.entry.ao.ClusterTaskLog; import com.xiaojukeji.kafka.manager.kcm.common.entry.ao.CreationTaskData; import com.xiaojukeji.kafka.manager.common.utils.HttpUtils; import com.xiaojukeji.kafka.manager.common.utils.JsonUtils; @@ -11,20 +14,17 @@ import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.kcm.common.bizenum.ClusterTaskStateEnum; import com.xiaojukeji.kafka.manager.kcm.common.bizenum.ClusterTaskSubStateEnum; import com.xiaojukeji.kafka.manager.kcm.component.agent.AbstractAgent; +import com.xiaojukeji.kafka.manager.kcm.component.agent.n9e.entry.N9eCreationTask; import com.xiaojukeji.kafka.manager.kcm.component.agent.n9e.entry.N9eResult; -import com.xiaojukeji.kafka.manager.kcm.component.agent.n9e.entry.N9eTaskResultDTO; -import com.xiaojukeji.kafka.manager.kcm.component.agent.n9e.entry.N9eTaskStatusEnum; -import com.xiaojukeji.kafka.manager.kcm.component.agent.n9e.entry.N9eTaskStdoutDTO; +import com.xiaojukeji.kafka.manager.kcm.component.agent.n9e.entry.N9eTaskResult; +import com.xiaojukeji.kafka.manager.kcm.component.agent.n9e.entry.N9eTaskStdoutLog; +import com.xiaojukeji.kafka.manager.kcm.component.agent.n9e.entry.bizenum.N9eTaskStatusEnum; import org.springframework.beans.factory.annotation.Value; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -54,16 +54,6 @@ public class N9e extends AbstractAgent { private String script; - /** - * 并发度,顺序执行 - */ - private static final Integer BATCH = 1; - - /** - * 失败的容忍度为0 - */ - private static final Integer TOLERANCE = 0; - private static final String CREATE_TASK_URI = "/api/job-ce/tasks"; private static final String ACTION_TASK_URI = "/api/job-ce/task/{taskId}/action"; @@ -82,143 +72,134 @@ public class N9e extends AbstractAgent { } @Override - public Long createTask(CreationTaskData creationTaskData) { - Map param = buildCreateTaskParam(creationTaskData); + public Result createTask(CreationTaskData creationTaskData) { + String content = JsonUtils.toJSONString(buildCreateTaskParam(creationTaskData)); String response = null; try { - response = HttpUtils.postForString( - baseUrl + CREATE_TASK_URI, - JsonUtils.toJSONString(param), - buildHeader() - ); - N9eResult zr = JSON.parseObject(response, N9eResult.class); - if (!ValidateUtils.isBlank(zr.getErr())) { - LOGGER.warn("class=N9e||method=createTask||param={}||errMsg={}||msg=call create task fail", JsonUtils.toJSONString(param),zr.getErr()); - return null; + response = HttpUtils.postForString(baseUrl + CREATE_TASK_URI, content, buildHeader()); + N9eResult nr = JsonUtils.stringToObj(response, N9eResult.class); + if (!ValidateUtils.isBlank(nr.getErr())) { + LOGGER.error("class=N9e||method=createTask||param={}||response={}||msg=call create task failed", content, response); + return Result.buildFailure(nr.getErr()); } - return Long.valueOf(zr.getDat().toString()); + return Result.buildSuc(Long.valueOf(nr.getDat().toString())); } catch (Exception e) { - LOGGER.error("create task failed, req:{}.", creationTaskData, e); + LOGGER.error("class=N9e||method=createTask||param={}||response={}||errMsg={}||msg=call create task failed", content, response, e.getMessage()); } - return null; + return Result.buildFailure("create n9e task failed"); } @Override - public Boolean actionTask(Long taskId, String action) { + public boolean actionTask(Long taskId, ClusterTaskActionEnum actionEnum) { Map param = new HashMap<>(1); - param.put("action", action); + param.put("action", actionEnum.getAction()); String response = null; try { - response = HttpUtils.putForString( - baseUrl + ACTION_TASK_URI.replace("{taskId}", taskId.toString()), - JSON.toJSONString(param), - buildHeader() - ); - N9eResult zr = JSON.parseObject(response, N9eResult.class); - if (ValidateUtils.isBlank(zr.getErr())) { + response = HttpUtils.putForString(baseUrl + ACTION_TASK_URI.replace("{taskId}", String.valueOf(taskId)), JsonUtils.toJSONString(param), buildHeader()); + N9eResult nr = JsonUtils.stringToObj(response, N9eResult.class); + if (ValidateUtils.isBlank(nr.getErr())) { return true; } - LOGGER.warn("class=N9e||method=actionTask||param={}||errMsg={}||msg=call action task fail", JSON.toJSONString(param),zr.getErr()); + + LOGGER.error("class=N9e||method=actionTask||param={}||response={}||msg=call action task fail", JsonUtils.toJSONString(param), response); return false; } catch (Exception e) { - LOGGER.error("action task failed, taskId:{}, action:{}.", taskId, action, e); + LOGGER.error("class=N9e||method=actionTask||param={}||response={}||errMsg={}||msg=call action task fail", JsonUtils.toJSONString(param), response, e.getMessage()); } return false; } @Override - public Boolean actionHostTask(Long taskId, String action, String hostname) { - Map param = new HashMap<>(2); - param.put("action", action); - param.put("hostname", hostname); + public boolean actionHostTask(Long taskId, ClusterTaskActionEnum actionEnum, String hostname) { + Map params = new HashMap<>(2); + params.put("action", actionEnum.getAction()); + params.put("hostname", hostname); String response = null; try { - response = HttpUtils.putForString( - baseUrl + ACTION_HOST_TASK_URI.replace("{taskId}", taskId.toString()), - JSON.toJSONString(param), - buildHeader() - ); - N9eResult zr = JSON.parseObject(response, N9eResult.class); - if (ValidateUtils.isBlank(zr.getErr())) { + response = HttpUtils.putForString(baseUrl + ACTION_HOST_TASK_URI.replace("{taskId}", String.valueOf(taskId)), JsonUtils.toJSONString(params), buildHeader()); + N9eResult nr = JsonUtils.stringToObj(response, N9eResult.class); + if (ValidateUtils.isBlank(nr.getErr())) { return true; } - LOGGER.warn("class=N9e||method=actionHostTask||param={}||errMsg={}||msg=call action host task fail", JSON.toJSONString(param),zr.getErr()); + + LOGGER.error("class=N9e||method=actionHostTask||params={}||response={}||msg=call action host task fail", JsonUtils.toJSONString(params), response); return false; } catch (Exception e) { - LOGGER.error("action task failed, taskId:{} action:{} hostname:{}.", taskId, action, hostname, e); + LOGGER.error("class=N9e||method=actionHostTask||params={}||response={}||errMsg={}||msg=call action host task fail", JsonUtils.toJSONString(params), response, e.getMessage()); } return false; } @Override - public ClusterTaskStateEnum getTaskState(Long agentTaskId) { + public Result getTaskExecuteState(Long taskId) { String response = null; try { // 获取任务的state - response = HttpUtils.get( - baseUrl + TASK_STATE_URI.replace("{taskId}", agentTaskId.toString()), null - ); - N9eResult n9eResult = JSON.parseObject(response, N9eResult.class); - if (!ValidateUtils.isBlank(n9eResult.getErr())) { - LOGGER.error("get response result failed, agentTaskId:{} response:{}.", agentTaskId, response); - return null; + response = HttpUtils.get(baseUrl + TASK_STATE_URI.replace("{taskId}", String.valueOf(taskId)), null); + N9eResult nr = JsonUtils.stringToObj(response, N9eResult.class); + if (!ValidateUtils.isBlank(nr.getErr())) { + return Result.buildFailure(nr.getErr()); } - String state = JSON.parseObject(JSON.toJSONString(n9eResult.getDat()), String.class); + + String state = JsonUtils.stringToObj(JsonUtils.toJSONString(nr.getDat()), String.class); + N9eTaskStatusEnum n9eTaskStatusEnum = N9eTaskStatusEnum.getByMessage(state); if (ValidateUtils.isNull(n9eTaskStatusEnum)) { - LOGGER.error("get task status failed, agentTaskId:{} state:{}.", agentTaskId, state); - return null; + LOGGER.error("class=N9e||method=getTaskExecuteState||taskId={}||response={}||msg=get task state failed", taskId, response); + return Result.buildFailure("unknown state, state:" + state); } - return n9eTaskStatusEnum.getStatus(); + return Result.buildSuc(n9eTaskStatusEnum.getStatus()); } catch (Exception e) { - LOGGER.error("get task status failed, agentTaskId:{} response:{}.", agentTaskId, response, e); + LOGGER.error("class=N9e||method=getTaskExecuteState||taskId={}||response={}||errMsg={}||msg=get task state failed", taskId, response, e.getMessage()); } - return null; + return Result.buildFailure("get task state failed"); } @Override - public Map getTaskResult(Long agentTaskId) { + public Result> getTaskResult(Long taskId) { String response = null; try { // 获取子任务的state - response = HttpUtils.get(baseUrl + TASK_SUB_STATE_URI.replace("{taskId}", agentTaskId.toString()), null); - N9eResult n9eResult = JSON.parseObject(response, N9eResult.class); + response = HttpUtils.get(baseUrl + TASK_SUB_STATE_URI.replace("{taskId}", String.valueOf(taskId)), null); + N9eResult nr = JsonUtils.stringToObj(response, N9eResult.class); + if (!ValidateUtils.isBlank(nr.getErr())) { + LOGGER.error("class=N9e||method=getTaskResult||taskId={}||response={}||msg=get task result failed", taskId, response); + return Result.buildFailure(nr.getErr()); + } - N9eTaskResultDTO n9eTaskResultDTO = - JSON.parseObject(JSON.toJSONString(n9eResult.getDat()), N9eTaskResultDTO.class); - return n9eTaskResultDTO.convert2HostnameStatusMap(); + return Result.buildSuc(JsonUtils.stringToObj(JsonUtils.toJSONString(nr.getDat()), N9eTaskResult.class).convert2HostnameStatusMap()); } catch (Exception e) { - LOGGER.error("get task result failed, agentTaskId:{} response:{}.", agentTaskId, response, e); + LOGGER.error("class=N9e||method=getTaskResult||taskId={}||response={}||errMsg={}||msg=get task result failed", taskId, response, e.getMessage()); } - return null; + return Result.buildFailure("get task result failed"); } @Override - public String getTaskLog(Long agentTaskId, String hostname) { + public Result getTaskLog(Long taskId, String hostname) { + Map params = new HashMap<>(1); + params.put("hostname", hostname); + String response = null; try { - Map params = new HashMap<>(1); - params.put("hostname", hostname); + response = HttpUtils.get(baseUrl + TASK_STD_LOG_URI.replace("{taskId}", String.valueOf(taskId)), params); + N9eResult nr = JsonUtils.stringToObj(response, N9eResult.class); + if (!ValidateUtils.isBlank(nr.getErr())) { + LOGGER.error("class=N9e||method=getTaskLog||taskId={}||response={}||msg=get task log failed", taskId, response); + return Result.buildFailure(nr.getErr()); + } - response = HttpUtils.get(baseUrl + TASK_STD_LOG_URI.replace("{taskId}", agentTaskId.toString()), params); - N9eResult n9eResult = JSON.parseObject(response, N9eResult.class); - if (!ValidateUtils.isBlank(n9eResult.getErr())) { - LOGGER.error("get task log failed, agentTaskId:{} response:{}.", agentTaskId, response); - return null; - } - List dtoList = - JSON.parseArray(JSON.toJSONString(n9eResult.getDat()), N9eTaskStdoutDTO.class); + List dtoList = JsonUtils.stringToArrObj(JsonUtils.toJSONString(nr.getDat()), N9eTaskStdoutLog.class); if (ValidateUtils.isEmptyList(dtoList)) { - return ""; + return Result.buildSuc(new ClusterTaskLog("")); } - return dtoList.get(0).getStdout(); + return Result.buildSuc(new ClusterTaskLog(dtoList.get(0).getStdout())); } catch (Exception e) { - LOGGER.error("get task log failed, agentTaskId:{}.", agentTaskId, e); + LOGGER.error("class=N9e||method=getTaskLog||taskId={}||response={}||errMsg={}||msg=get task log failed", taskId, response, e.getMessage()); } - return null; + return Result.buildFailure("get task log failed"); } private Map buildHeader() { @@ -228,7 +209,7 @@ public class N9e extends AbstractAgent { return headers; } - private Map buildCreateTaskParam(CreationTaskData creationTaskData) { + private N9eCreationTask buildCreateTaskParam(CreationTaskData creationTaskData) { StringBuilder sb = new StringBuilder(); sb.append(creationTaskData.getUuid()).append(",,"); sb.append(creationTaskData.getClusterId()).append(",,"); @@ -240,46 +221,17 @@ public class N9e extends AbstractAgent { sb.append(creationTaskData.getServerPropertiesMd5()).append(",,"); sb.append(creationTaskData.getServerPropertiesUrl()); - Map params = new HashMap<>(10); - params.put("title", String.format("集群ID=%d-升级部署", creationTaskData.getClusterId())); - params.put("batch", BATCH); - params.put("tolerance", TOLERANCE); - params.put("timeout", timeout); - params.put("pause", ListUtils.strList2String(creationTaskData.getPauseList())); - params.put("script", this.script); - params.put("args", sb.toString()); - params.put("account", account); - params.put("action", "pause"); - params.put("hosts", creationTaskData.getHostList()); - return params; - } - - private static String readScriptInJarFile(String fileName) { - InputStream inputStream = N9e.class.getClassLoader().getResourceAsStream(fileName); - if (inputStream == null) { - LOGGER.error("read kcm script failed, filename:{}", fileName); - return ""; - } - - try { - BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); - String line = null; - StringBuilder stringBuilder = new StringBuilder(""); - - while ((line = bufferedReader.readLine()) != null) { - stringBuilder.append(line); - stringBuilder.append("\n"); - } - return stringBuilder.toString(); - } catch (IOException e) { - LOGGER.error("read kcm script failed, filename:{}", fileName, e); - return ""; - } finally { - try { - inputStream.close(); - } catch (IOException e) { - LOGGER.error("close reading kcm script failed, filename:{}", fileName, e); - } - } + N9eCreationTask n9eCreationTask = new N9eCreationTask(); + n9eCreationTask.setTitle(Constant.TASK_TITLE_PREFIX + "-集群ID:" + creationTaskData.getClusterId()); + n9eCreationTask.setBatch(Constant.AGENT_TASK_BATCH); + n9eCreationTask.setTolerance(Constant.AGENT_TASK_TOLERANCE); + n9eCreationTask.setTimeout(this.timeout); + n9eCreationTask.setPause(ListUtils.strList2String(creationTaskData.getPauseList())); + n9eCreationTask.setScript(this.script); + n9eCreationTask.setArgs(sb.toString()); + n9eCreationTask.setAccount(this.account); + n9eCreationTask.setAction(ClusterTaskActionEnum.PAUSE.getAction()); + n9eCreationTask.setHosts(creationTaskData.getHostList()); + return n9eCreationTask; } } \ No newline at end of file diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/entry/N9eCreationTask.java b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/entry/N9eCreationTask.java new file mode 100644 index 00000000..6ca4c85c --- /dev/null +++ b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/entry/N9eCreationTask.java @@ -0,0 +1,151 @@ +package com.xiaojukeji.kafka.manager.kcm.component.agent.n9e.entry; + +import java.util.List; + +public class N9eCreationTask { + /** + * 任务标题 + */ + private String title; + + /** + * 并发度, =2则表示两台并发执行 + */ + private Integer batch; + + /** + * 错误容忍度, 达到容忍度之上时, 任务会被暂停并不可以继续执行 + */ + private Integer tolerance; + + /** + * 单台任务的超时时间(秒) + */ + private Integer timeout; + + /** + * 暂停点, 格式: host1,host2,host3 + */ + private String pause; + + /** + * 任务执行对应的脚本 + */ + private String script; + + /** + * 任务参数 + */ + private String args; + + /** + * 使用的账号 + */ + private String account; + + /** + * 动作 + */ + private String action; + + /** + * 操作的主机列表 + */ + private List hosts; + + public String getTitle() { + return title; + } + + public void setTitle(String title) { + this.title = title; + } + + public Integer getBatch() { + return batch; + } + + public void setBatch(Integer batch) { + this.batch = batch; + } + + public Integer getTolerance() { + return tolerance; + } + + public void setTolerance(Integer tolerance) { + this.tolerance = tolerance; + } + + public Integer getTimeout() { + return timeout; + } + + public void setTimeout(Integer timeout) { + this.timeout = timeout; + } + + public String getPause() { + return pause; + } + + public void setPause(String pause) { + this.pause = pause; + } + + public String getScript() { + return script; + } + + public void setScript(String script) { + this.script = script; + } + + public String getArgs() { + return args; + } + + public void setArgs(String args) { + this.args = args; + } + + public String getAccount() { + return account; + } + + public void setAccount(String account) { + this.account = account; + } + + public String getAction() { + return action; + } + + public void setAction(String action) { + this.action = action; + } + + public List getHosts() { + return hosts; + } + + public void setHosts(List hosts) { + this.hosts = hosts; + } + + @Override + public String toString() { + return "N9eCreationTask{" + + "title='" + title + '\'' + + ", batch=" + batch + + ", tolerance=" + tolerance + + ", timeout=" + timeout + + ", pause='" + pause + '\'' + + ", script='" + script + '\'' + + ", args='" + args + '\'' + + ", account='" + account + '\'' + + ", action='" + action + '\'' + + ", hosts=" + hosts + + '}'; + } +} diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/entry/N9eTaskResultDTO.java b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/entry/N9eTaskResult.java similarity index 99% rename from kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/entry/N9eTaskResultDTO.java rename to kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/entry/N9eTaskResult.java index b787f016..e0e67b0e 100644 --- a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/entry/N9eTaskResultDTO.java +++ b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/entry/N9eTaskResult.java @@ -12,7 +12,7 @@ import java.util.Map; * @author zengqiao * @date 20/9/7 */ -public class N9eTaskResultDTO { +public class N9eTaskResult { private List waiting; private List running; diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/entry/N9eTaskStdoutLog.java b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/entry/N9eTaskStdoutLog.java new file mode 100644 index 00000000..622aaa3e --- /dev/null +++ b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/entry/N9eTaskStdoutLog.java @@ -0,0 +1,35 @@ +package com.xiaojukeji.kafka.manager.kcm.component.agent.n9e.entry; + +/** + * @author zengqiao + * @date 20/9/7 + */ +public class N9eTaskStdoutLog { + private String host; + + private String stdout; + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public String getStdout() { + return stdout; + } + + public void setStdout(String stdout) { + this.stdout = stdout; + } + + @Override + public String toString() { + return "N9eTaskStdoutDTO{" + + "host='" + host + '\'' + + ", stdout='" + stdout + '\'' + + '}'; + } +} \ No newline at end of file diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/entry/bizenum/N9eTaskStatusEnum.java b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/entry/bizenum/N9eTaskStatusEnum.java new file mode 100644 index 00000000..4453e703 --- /dev/null +++ b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/entry/bizenum/N9eTaskStatusEnum.java @@ -0,0 +1,59 @@ +package com.xiaojukeji.kafka.manager.kcm.component.agent.n9e.entry.bizenum; + +import com.xiaojukeji.kafka.manager.kcm.common.bizenum.ClusterTaskStateEnum; + +/** + * @author zengqiao + * @date 20/9/3 + */ +public enum N9eTaskStatusEnum { + DONE(0, "done", ClusterTaskStateEnum.FINISHED), + PAUSE(1, "pause", ClusterTaskStateEnum.BLOCKED), + START(2, "start", ClusterTaskStateEnum.RUNNING), + ; + + private Integer code; + + private String message; + + private ClusterTaskStateEnum status; + + N9eTaskStatusEnum(Integer code, String message, ClusterTaskStateEnum status) { + this.code = code; + this.message = message; + this.status = status; + } + + public Integer getCode() { + return code; + } + + public void setCode(Integer code) { + this.code = code; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public ClusterTaskStateEnum getStatus() { + return status; + } + + public void setStatus(ClusterTaskStateEnum status) { + this.status = status; + } + + public static N9eTaskStatusEnum getByMessage(String message) { + for (N9eTaskStatusEnum elem: N9eTaskStatusEnum.values()) { + if (elem.message.equals(message)) { + return elem; + } + } + return null; + } +} \ No newline at end of file diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/storage/s3/S3Service.java b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/storage/s3/S3Service.java index 419e66e0..9519efd2 100644 --- a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/storage/s3/S3Service.java +++ b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/storage/s3/S3Service.java @@ -54,7 +54,7 @@ public class S3Service extends AbstractStorageService { InputStream inputStream = null; try { if (!createBucketIfNotExist()) { - return false; + return false; } inputStream = uploadFile.getInputStream(); @@ -95,7 +95,10 @@ public class S3Service extends AbstractStorageService { @Override public String getDownloadBaseUrl() { - return this.endpoint + "/" + this.bucket; + if (this.endpoint.startsWith("http://")) { + return this.endpoint + "/" + this.bucket; + } + return "http://" + this.endpoint + "/" + this.bucket; } private boolean createBucketIfNotExist() { diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/impl/ClusterTaskServiceImpl.java b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/impl/ClusterTaskServiceImpl.java index a190350a..b3ef959a 100644 --- a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/impl/ClusterTaskServiceImpl.java +++ b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/impl/ClusterTaskServiceImpl.java @@ -6,6 +6,7 @@ import com.xiaojukeji.kafka.manager.kcm.ClusterTaskService; import com.xiaojukeji.kafka.manager.kcm.common.Converters; import com.xiaojukeji.kafka.manager.kcm.common.bizenum.ClusterTaskActionEnum; import com.xiaojukeji.kafka.manager.kcm.common.entry.ClusterTaskConstant; +import com.xiaojukeji.kafka.manager.kcm.common.entry.ao.ClusterTaskLog; import com.xiaojukeji.kafka.manager.kcm.common.entry.ao.ClusterTaskSubStatus; import com.xiaojukeji.kafka.manager.kcm.common.bizenum.ClusterTaskStateEnum; import com.xiaojukeji.kafka.manager.kcm.common.bizenum.ClusterTaskSubStateEnum; @@ -34,7 +35,7 @@ import java.util.*; */ @Service("clusterTaskService") public class ClusterTaskServiceImpl implements ClusterTaskService { - private final static Logger LOGGER = LoggerFactory.getLogger(ClusterTaskServiceImpl.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ClusterTaskServiceImpl.class); @Autowired private AbstractAgent abstractAgent; @@ -63,13 +64,13 @@ public class ClusterTaskServiceImpl implements ClusterTaskService { } // 创建任务 - Long agentTaskId = abstractAgent.createTask(dtoResult.getData()); - if (ValidateUtils.isNull(agentTaskId)) { + Result createResult = abstractAgent.createTask(dtoResult.getData()); + if (ValidateUtils.isNull(createResult) || createResult.failed()) { return Result.buildFrom(ResultStatus.CALL_CLUSTER_TASK_AGENT_FAILED); } try { - if (clusterTaskDao.insert(Converters.convert2ClusterTaskDO(agentTaskId, dtoResult.getData(), operator)) > 0) { + if (clusterTaskDao.insert(Converters.convert2ClusterTaskDO(createResult.getData(), dtoResult.getData(), operator)) > 0) { return Result.buildFrom(ResultStatus.SUCCESS); } } catch (Exception e) { @@ -87,45 +88,44 @@ public class ClusterTaskServiceImpl implements ClusterTaskService { Long agentTaskId = getActiveAgentTaskId(clusterTaskDO); Boolean rollback = inRollback(clusterTaskDO); - ClusterTaskStateEnum stateEnum = abstractAgent.getTaskState(agentTaskId); - if (ClusterTaskActionEnum.START.getMessage().equals(action) - && ClusterTaskStateEnum.BLOCKED.equals(stateEnum)) { + Result stateEnumResult = abstractAgent.getTaskExecuteState(agentTaskId); + if (ValidateUtils.isNull(stateEnumResult) || stateEnumResult.failed()) { + return ResultStatus.CALL_CLUSTER_TASK_AGENT_FAILED; + } + + if (ClusterTaskActionEnum.START.getAction().equals(action) && ClusterTaskStateEnum.BLOCKED.equals(stateEnumResult.getData())) { // 暂停状态, 可以执行开始 - return actionTaskExceptRollbackAction(agentTaskId, action, ""); + return actionTaskExceptRollbackAction(agentTaskId, ClusterTaskActionEnum.START, ""); } - if (ClusterTaskActionEnum.PAUSE.getMessage().equals(action) - && ClusterTaskStateEnum.RUNNING.equals(stateEnum)) { + if (ClusterTaskActionEnum.PAUSE.getAction().equals(action) && ClusterTaskStateEnum.RUNNING.equals(stateEnumResult.getData())) { // 运行状态, 可以执行暂停 - return actionTaskExceptRollbackAction(agentTaskId, action, ""); + return actionTaskExceptRollbackAction(agentTaskId, ClusterTaskActionEnum.PAUSE, ""); } - if (ClusterTaskActionEnum.IGNORE.getMessage().equals(action) - || ClusterTaskActionEnum.CANCEL.getMessage().equals(action)) { + if (ClusterTaskActionEnum.IGNORE.getAction().equals(action)) { // 忽略 & 取消随时都可以操作 - return actionTaskExceptRollbackAction(agentTaskId, action, hostname); + return actionTaskExceptRollbackAction(agentTaskId, ClusterTaskActionEnum.IGNORE, hostname); } - if ((!ClusterTaskStateEnum.FINISHED.equals(stateEnum) || !rollback) - && ClusterTaskActionEnum.ROLLBACK.getMessage().equals(action)) { + if (ClusterTaskActionEnum.CANCEL.getAction().equals(action)) { + // 忽略 & 取消随时都可以操作 + return actionTaskExceptRollbackAction(agentTaskId, ClusterTaskActionEnum.CANCEL, hostname); + } + if ((!ClusterTaskStateEnum.FINISHED.equals(stateEnumResult.getData()) || !rollback) + && ClusterTaskActionEnum.ROLLBACK.getAction().equals(action)) { // 暂未操作完时可以回滚, 回滚所有操作过的机器到上一个版本 return actionTaskRollback(clusterTaskDO); } return ResultStatus.OPERATION_FAILED; } - private ResultStatus actionTaskExceptRollbackAction(Long agentId, String action, String hostname) { + private ResultStatus actionTaskExceptRollbackAction(Long agentId, ClusterTaskActionEnum actionEnum, String hostname) { if (!ValidateUtils.isBlank(hostname)) { - return actionHostTaskExceptRollbackAction(agentId, action, hostname); + return actionHostTaskExceptRollbackAction(agentId, actionEnum, hostname); } - if (abstractAgent.actionTask(agentId, action)) { - return ResultStatus.SUCCESS; - } - return ResultStatus.OPERATION_FAILED; + return abstractAgent.actionTask(agentId, actionEnum)? ResultStatus.SUCCESS: ResultStatus.OPERATION_FAILED; } - private ResultStatus actionHostTaskExceptRollbackAction(Long agentId, String action, String hostname) { - if (abstractAgent.actionHostTask(agentId, action, hostname)) { - return ResultStatus.SUCCESS; - } - return ResultStatus.OPERATION_FAILED; + private ResultStatus actionHostTaskExceptRollbackAction(Long agentId, ClusterTaskActionEnum actionEnum, String hostname) { + return abstractAgent.actionHostTask(agentId, actionEnum, hostname)? ResultStatus.SUCCESS: ResultStatus.OPERATION_FAILED; } private ResultStatus actionTaskRollback(ClusterTaskDO clusterTaskDO) { @@ -133,9 +133,9 @@ public class ClusterTaskServiceImpl implements ClusterTaskService { return ResultStatus.OPERATION_FORBIDDEN; } - Map subStatusEnumMap = + Result> subStatusEnumMapResult = abstractAgent.getTaskResult(clusterTaskDO.getAgentTaskId()); - if (ValidateUtils.isNull(subStatusEnumMap)) { + if (ValidateUtils.isNull(subStatusEnumMapResult) || subStatusEnumMapResult.failed()) { return ResultStatus.CALL_CLUSTER_TASK_AGENT_FAILED; } @@ -143,7 +143,7 @@ public class ClusterTaskServiceImpl implements ClusterTaskService { List rollbackHostList = new ArrayList<>(); List rollbackPauseHostList = new ArrayList<>(); for (String host: ListUtils.string2StrList(clusterTaskDO.getHostList())) { - ClusterTaskSubStateEnum subStateEnum = subStatusEnumMap.get(host); + ClusterTaskSubStateEnum subStateEnum = subStatusEnumMapResult.getData().get(host); if (ValidateUtils.isNull(subStateEnum)) { // 机器对应的任务查询失败 return ResultStatus.OPERATION_FAILED; @@ -166,17 +166,17 @@ public class ClusterTaskServiceImpl implements ClusterTaskService { clusterTaskDO.setRollbackPauseHostList(ListUtils.strList2String(rollbackPauseHostList)); // 创建任务 - Long agentTaskId = abstractAgent.createTask(Converters.convert2CreationTaskData(clusterTaskDO)); - if (ValidateUtils.isNull(agentTaskId)) { + Result createResult = abstractAgent.createTask(Converters.convert2CreationTaskData(clusterTaskDO)); + if (ValidateUtils.isNull(createResult) || createResult.failed()) { return ResultStatus.CALL_CLUSTER_TASK_AGENT_FAILED; } try { - clusterTaskDO.setAgentRollbackTaskId(agentTaskId); + clusterTaskDO.setAgentRollbackTaskId(createResult.getData()); if (clusterTaskDao.updateRollback(clusterTaskDO) <= 0) { return ResultStatus.MYSQL_ERROR; } - abstractAgent.actionTask(clusterTaskDO.getAgentTaskId(), ClusterTaskActionEnum.CANCEL.getMessage()); + abstractAgent.actionTask(clusterTaskDO.getAgentTaskId(), ClusterTaskActionEnum.CANCEL); return ResultStatus.SUCCESS; } catch (Exception e) { LOGGER.error("create cluster task failed, clusterTaskDO:{}.", clusterTaskDO, e); @@ -191,11 +191,11 @@ public class ClusterTaskServiceImpl implements ClusterTaskService { return Result.buildFrom(ResultStatus.TASK_NOT_EXIST); } - String stdoutLog = abstractAgent.getTaskLog(getActiveAgentTaskId(clusterTaskDO, hostname), hostname); - if (ValidateUtils.isNull(stdoutLog)) { + Result stdoutLogResult = abstractAgent.getTaskLog(getActiveAgentTaskId(clusterTaskDO, hostname), hostname); + if (ValidateUtils.isNull(stdoutLogResult) || stdoutLogResult.failed()) { return Result.buildFrom(ResultStatus.CALL_CLUSTER_TASK_AGENT_FAILED); } - return new Result<>(stdoutLog); + return new Result<>(stdoutLogResult.getData().getStdout()); } @Override @@ -205,24 +205,33 @@ public class ClusterTaskServiceImpl implements ClusterTaskService { return Result.buildFrom(ResultStatus.TASK_NOT_EXIST); } + Result statusEnumResult = abstractAgent.getTaskExecuteState(getActiveAgentTaskId(clusterTaskDO)); + if (ValidateUtils.isNull(statusEnumResult) || statusEnumResult.failed()) { + return new Result<>(statusEnumResult.getCode(), statusEnumResult.getMessage()); + } + return new Result<>(new ClusterTaskStatus( clusterTaskDO.getId(), clusterTaskDO.getClusterId(), inRollback(clusterTaskDO), - abstractAgent.getTaskState(getActiveAgentTaskId(clusterTaskDO)), + statusEnumResult.getData(), getTaskSubStatus(clusterTaskDO) )); } @Override public ClusterTaskStateEnum getTaskState(Long agentTaskId) { - return abstractAgent.getTaskState(agentTaskId); + Result statusEnumResult = abstractAgent.getTaskExecuteState(agentTaskId); + if (ValidateUtils.isNull(statusEnumResult) || statusEnumResult.failed()) { + return null; + } + return statusEnumResult.getData(); } private List getTaskSubStatus(ClusterTaskDO clusterTaskDO) { Map statusMap = this.getClusterTaskSubState(clusterTaskDO); if (ValidateUtils.isNull(statusMap)) { - return null; + return Collections.emptyList(); } List pauseList = ListUtils.string2StrList(clusterTaskDO.getPauseHostList()); @@ -242,20 +251,22 @@ public class ClusterTaskServiceImpl implements ClusterTaskService { } private Map getClusterTaskSubState(ClusterTaskDO clusterTaskDO) { - Map statusMap = abstractAgent.getTaskResult(clusterTaskDO.getAgentTaskId()); - if (ValidateUtils.isNull(statusMap)) { + Result> statusMapResult = abstractAgent.getTaskResult(clusterTaskDO.getAgentTaskId()); + if (ValidateUtils.isNull(statusMapResult) || statusMapResult.failed()) { return null; } + Map statusMap = statusMapResult.getData(); if (!inRollback(clusterTaskDO)) { return statusMap; } - Map rollbackStatusMap = + Result> rollbackStatusMapResult = abstractAgent.getTaskResult(clusterTaskDO.getAgentRollbackTaskId()); - if (ValidateUtils.isNull(rollbackStatusMap)) { + if (ValidateUtils.isNull(rollbackStatusMapResult) || rollbackStatusMapResult.failed()) { return null; } - statusMap.putAll(rollbackStatusMap); + + statusMap.putAll(rollbackStatusMapResult.getData()); return statusMap; } @@ -276,7 +287,7 @@ public class ClusterTaskServiceImpl implements ClusterTaskService { } catch (Exception e) { LOGGER.error("get all cluster task failed."); } - return null; + return Collections.emptyList(); } @Override @@ -302,9 +313,6 @@ public class ClusterTaskServiceImpl implements ClusterTaskService { } private boolean inRollback(ClusterTaskDO clusterTaskDO) { - if (ClusterTaskConstant.INVALID_AGENT_TASK_ID.equals(clusterTaskDO.getAgentRollbackTaskId())) { - return false; - } - return true; + return !ClusterTaskConstant.INVALID_AGENT_TASK_ID.equals(clusterTaskDO.getAgentRollbackTaskId()); } } \ No newline at end of file diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/impl/KafkaFileServiceImpl.java b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/impl/KafkaFileServiceImpl.java index 37f8753a..bef2fb89 100644 --- a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/impl/KafkaFileServiceImpl.java +++ b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/impl/KafkaFileServiceImpl.java @@ -4,12 +4,12 @@ import com.xiaojukeji.kafka.manager.common.bizenum.KafkaFileEnum; import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.KafkaFileDTO; -import com.xiaojukeji.kafka.manager.common.entity.pojo.KafkaFileDO; import com.xiaojukeji.kafka.manager.common.utils.CopyUtils; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.dao.KafkaFileDao; -import com.xiaojukeji.kafka.manager.kcm.KafkaFileService; +import com.xiaojukeji.kafka.manager.common.entity.pojo.KafkaFileDO; import com.xiaojukeji.kafka.manager.kcm.component.storage.AbstractStorageService; +import com.xiaojukeji.kafka.manager.kcm.KafkaFileService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayHeartbeatController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayHeartbeatController.java index 4fe01e22..02a11497 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayHeartbeatController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayHeartbeatController.java @@ -50,7 +50,7 @@ public class GatewayHeartbeatController { doList = JsonUtils.parseTopicConnections(clusterId, jsonObject, System.currentTimeMillis()); } catch (Exception e) { LOGGER.error("class=GatewayHeartbeatController||method=receiveTopicConnections||clusterId={}||brokerId={}||msg=parse data failed||exception={}", clusterId, brokerId, e.getMessage()); - return Result.buildFailure("fail"); + return Result.buildGatewayFailure("fail"); } topicConnectionService.batchAdd(doList); diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayServiceDiscoveryController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayServiceDiscoveryController.java index e490368d..425eba75 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayServiceDiscoveryController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayServiceDiscoveryController.java @@ -31,7 +31,6 @@ import java.util.Map; @RestController @RequestMapping(ApiPrefix.GATEWAY_API_V1_PREFIX) public class GatewayServiceDiscoveryController { - private final static Logger LOGGER = LoggerFactory.getLogger(GatewayHeartbeatController.class); @Autowired @@ -65,7 +64,7 @@ public class GatewayServiceDiscoveryController { KafkaBootstrapServerConfig config = gatewayConfigService.getKafkaBootstrapServersConfig(Long.MIN_VALUE); if (ValidateUtils.isNull(config) || ValidateUtils.isNull(config.getClusterIdBootstrapServersMap())) { - return Result.buildFailure("call init kafka bootstrap servers failed"); + return Result.buildGatewayFailure("call init kafka bootstrap servers failed"); } if (ValidateUtils.isEmptyMap(config.getClusterIdBootstrapServersMap())) { return Result.buildSuc(); @@ -81,7 +80,7 @@ public class GatewayServiceDiscoveryController { KafkaBootstrapServerConfig config = gatewayConfigService.getKafkaBootstrapServersConfig(versionNumber); if (ValidateUtils.isNull(config) || ValidateUtils.isNull(config.getClusterIdBootstrapServersMap())) { - return Result.buildFailure("call update kafka bootstrap servers failed"); + return Result.buildGatewayFailure("call update kafka bootstrap servers failed"); } if (ValidateUtils.isEmptyMap(config.getClusterIdBootstrapServersMap())) { return Result.buildSuc(); @@ -99,7 +98,7 @@ public class GatewayServiceDiscoveryController { public Result getMaxRequestNum(@RequestParam("versionNumber") long versionNumber) { RequestQueueConfig config = gatewayConfigService.getRequestQueueConfig(versionNumber); if (ValidateUtils.isNull(config)) { - return Result.buildFailure("call get request queue size config failed"); + return Result.buildGatewayFailure("call get request queue size config failed"); } if (ValidateUtils.isNull(config.getMaxRequestQueueSize())) { return Result.buildSuc(); @@ -119,7 +118,7 @@ public class GatewayServiceDiscoveryController { public Result getAppIdRate(@RequestParam("versionNumber") long versionNumber) { AppRateConfig config = gatewayConfigService.getAppRateConfig(versionNumber); if (ValidateUtils.isNull(config)) { - return Result.buildFailure("call get app rate config failed"); + return Result.buildGatewayFailure("call get app rate config failed"); } if (ValidateUtils.isNull(config.getAppRateLimit())) { return Result.buildSuc(); @@ -139,7 +138,7 @@ public class GatewayServiceDiscoveryController { public Result getIpRate(@RequestParam("versionNumber") long versionNumber) { IpRateConfig config = gatewayConfigService.getIpRateConfig(versionNumber); if (ValidateUtils.isNull(config)) { - return Result.buildFailure("call get ip rate config failed"); + return Result.buildGatewayFailure("call get ip rate config failed"); } if (ValidateUtils.isNull(config.getIpRateLimit())) { return Result.buildSuc(); @@ -160,7 +159,7 @@ public class GatewayServiceDiscoveryController { SpRateConfig config = gatewayConfigService.getSpRateConfig(versionNumber); if (ValidateUtils.isNull(config) || ValidateUtils.isNull(config.getSpRateMap())) { - return Result.buildFailure("call update kafka bootstrap servers failed"); + return Result.buildGatewayFailure("call update kafka bootstrap servers failed"); } if (ValidateUtils.isEmptyMap(config.getSpRateMap())) { return Result.buildSuc(); diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpClusterController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpClusterController.java index 07b7dbc4..2caaa69b 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpClusterController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpClusterController.java @@ -2,6 +2,7 @@ package com.xiaojukeji.kafka.manager.web.api.versionone.op; import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import com.xiaojukeji.kafka.manager.common.entity.dto.op.ControllerPreferredCandidateDTO; import com.xiaojukeji.kafka.manager.common.entity.dto.rd.ClusterDTO; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.service.service.ClusterService; @@ -13,6 +14,7 @@ import io.swagger.annotations.ApiOperation; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; + /** * @author zengqiao * @date 20/4/23 @@ -25,48 +27,56 @@ public class OpClusterController { private ClusterService clusterService; @ApiOperation(value = "接入集群") - @RequestMapping(value = "clusters", method = RequestMethod.POST) + @PostMapping(value = "clusters") @ResponseBody public Result addNew(@RequestBody ClusterDTO dto) { if (ValidateUtils.isNull(dto) || !dto.legal()) { return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); } return Result.buildFrom( - clusterService.addNew( - ClusterModelConverter.convert2ClusterDO(dto), - SpringTool.getUserName() - ) + clusterService.addNew(ClusterModelConverter.convert2ClusterDO(dto), SpringTool.getUserName()) ); } @ApiOperation(value = "删除集群") - @RequestMapping(value = "clusters", method = RequestMethod.DELETE) + @DeleteMapping(value = "clusters") @ResponseBody public Result delete(@RequestParam(value = "clusterId") Long clusterId) { return Result.buildFrom(clusterService.deleteById(clusterId, SpringTool.getUserName())); } @ApiOperation(value = "修改集群信息") - @RequestMapping(value = "clusters", method = RequestMethod.PUT) + @PutMapping(value = "clusters") @ResponseBody public Result modify(@RequestBody ClusterDTO reqObj) { if (ValidateUtils.isNull(reqObj) || !reqObj.legal() || ValidateUtils.isNull(reqObj.getClusterId())) { return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); } - ResultStatus rs = clusterService.updateById( - ClusterModelConverter.convert2ClusterDO(reqObj), - SpringTool.getUserName() + return Result.buildFrom( + clusterService.updateById(ClusterModelConverter.convert2ClusterDO(reqObj), SpringTool.getUserName()) ); - return Result.buildFrom(rs); } @ApiOperation(value = "开启|关闭集群监控") - @RequestMapping(value = "clusters/{clusterId}/monitor", method = RequestMethod.PUT) + @PutMapping(value = "clusters/{clusterId}/monitor") @ResponseBody - public Result modifyStatus(@PathVariable Long clusterId, - @RequestParam("status") Integer status) { + public Result modifyStatus(@PathVariable Long clusterId, @RequestParam("status") Integer status) { return Result.buildFrom( clusterService.modifyStatus(clusterId, status, SpringTool.getUserName()) ); } + + @ApiOperation(value = "增加Controller优先候选的Broker", notes = "滴滴内部引擎特性") + @PostMapping(value = "cluster-controller/preferred-candidates") + @ResponseBody + public Result addControllerPreferredCandidates(@RequestBody ControllerPreferredCandidateDTO dto) { + return clusterService.addControllerPreferredCandidates(dto.getClusterId(), dto.getBrokerIdList()); + } + + @ApiOperation(value = "删除Controller优先候选的Broker", notes = "滴滴内部引擎特性") + @DeleteMapping(value = "cluster-controller/preferred-candidates") + @ResponseBody + public Result deleteControllerPreferredCandidates(@RequestBody ControllerPreferredCandidateDTO dto) { + return clusterService.deleteControllerPreferredCandidates(dto.getClusterId(), dto.getBrokerIdList()); + } } \ No newline at end of file diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpGatewayConfigController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpGatewayConfigController.java index a97bb386..66eb3b7e 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpGatewayConfigController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpGatewayConfigController.java @@ -3,8 +3,11 @@ package com.xiaojukeji.kafka.manager.web.api.versionone.op; import com.xiaojukeji.kafka.manager.bpm.common.entry.apply.gateway.OrderExtensionAddGatewayConfigDTO; import com.xiaojukeji.kafka.manager.bpm.common.entry.apply.gateway.OrderExtensionDeleteGatewayConfigDTO; import com.xiaojukeji.kafka.manager.bpm.common.entry.apply.gateway.OrderExtensionModifyGatewayConfigDTO; +import com.xiaojukeji.kafka.manager.common.bizenum.gateway.GatewayConfigKeyEnum; +import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix; import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import com.xiaojukeji.kafka.manager.common.utils.JsonUtils; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.service.service.gateway.GatewayConfigService; import com.xiaojukeji.kafka.manager.web.converters.GatewayModelConverter; @@ -16,12 +19,20 @@ import org.springframework.web.bind.annotation.*; @Api(tags = "OP-Gateway配置相关接口(REST)") @RestController +@RequestMapping(ApiPrefix.API_V1_OP_PREFIX) public class OpGatewayConfigController { @Autowired private GatewayConfigService gatewayConfigService; + @ApiOperation(value = "Gateway配置类型", notes = "") + @GetMapping(value = "gateway-configs/type-enums") + @ResponseBody + public Result getClusterModesEnum() { + return new Result<>(JsonUtils.toJson(GatewayConfigKeyEnum.class)); + } + @ApiOperation(value = "创建Gateway配置", notes = "") - @RequestMapping(value = "gateway-configs", method = RequestMethod.POST) + @PostMapping(value = "gateway-configs") @ResponseBody public Result createGatewayConfig(@RequestBody OrderExtensionAddGatewayConfigDTO dto) { if (ValidateUtils.isNull(dto) || !dto.legal()) { @@ -31,7 +42,7 @@ public class OpGatewayConfigController { } @ApiOperation(value = "修改Gateway配置", notes = "") - @RequestMapping(value = "gateway-configs", method = RequestMethod.PUT) + @PutMapping(value = "gateway-configs") @ResponseBody public Result modifyGatewayConfig(@RequestBody OrderExtensionModifyGatewayConfigDTO dto) { if (ValidateUtils.isNull(dto) || !dto.legal()) { @@ -41,7 +52,7 @@ public class OpGatewayConfigController { } @ApiOperation(value = "删除Gateway配置", notes = "") - @RequestMapping(value = "gateway-configs", method = RequestMethod.DELETE) + @DeleteMapping(value = "gateway-configs") @ResponseBody public Result deleteGatewayConfig(@RequestBody OrderExtensionDeleteGatewayConfigDTO dto) { if (ValidateUtils.isNull(dto) || !dto.legal()) { diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdGatewayConfigController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdGatewayConfigController.java index 3748c3ca..6a46ff0a 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdGatewayConfigController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdGatewayConfigController.java @@ -1,5 +1,6 @@ package com.xiaojukeji.kafka.manager.web.api.versionone.rd; +import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix; import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.GatewayConfigDO; import com.xiaojukeji.kafka.manager.common.entity.vo.rd.GatewayConfigVO; @@ -15,12 +16,13 @@ import java.util.List; @Api(tags = "RD-Gateway配置相关接口(REST)") @RestController +@RequestMapping(ApiPrefix.API_V1_RD_PREFIX) public class RdGatewayConfigController { @Autowired private GatewayConfigService gatewayConfigService; @ApiOperation(value = "Gateway相关配置信息", notes = "") - @RequestMapping(value = "gateway-configs", method = RequestMethod.GET) + @GetMapping(value = "gateway-configs") @ResponseBody public Result> getGatewayConfigs() { List doList = gatewayConfigService.list(); diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdKafkaFileController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdKafkaFileController.java index 009d540a..eaab7dc9 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdKafkaFileController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdKafkaFileController.java @@ -1,17 +1,17 @@ package com.xiaojukeji.kafka.manager.web.api.versionone.rd; import com.xiaojukeji.kafka.manager.common.bizenum.KafkaFileEnum; -import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix; import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.KafkaFileDTO; -import com.xiaojukeji.kafka.manager.common.entity.pojo.KafkaFileDO; import com.xiaojukeji.kafka.manager.common.entity.vo.rd.KafkaFileVO; +import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; +import com.xiaojukeji.kafka.manager.kcm.component.storage.common.StorageEnum; +import com.xiaojukeji.kafka.manager.common.entity.pojo.KafkaFileDO; +import com.xiaojukeji.kafka.manager.service.service.ClusterService; +import com.xiaojukeji.kafka.manager.kcm.KafkaFileService; import com.xiaojukeji.kafka.manager.common.utils.JsonUtils; import com.xiaojukeji.kafka.manager.common.utils.SpringTool; -import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; -import com.xiaojukeji.kafka.manager.kcm.KafkaFileService; -import com.xiaojukeji.kafka.manager.kcm.component.storage.common.StorageEnum; -import com.xiaojukeji.kafka.manager.service.service.ClusterService; +import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix; import com.xiaojukeji.kafka.manager.web.converters.KafkaFileConverter; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdOperateRecordController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdOperateRecordController.java index 11f063e6..68068f97 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdOperateRecordController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdOperateRecordController.java @@ -24,14 +24,13 @@ import java.util.List; @RestController @RequestMapping(ApiPrefix.API_V1_RD_PREFIX) public class RdOperateRecordController { - private static final int MAX_RECORD_COUNT = 200; @Autowired private OperateRecordService operateRecordService; @ApiOperation(value = "查询操作记录", notes = "") - @RequestMapping(value = "operate-record", method = RequestMethod.POST) + @PostMapping(value = "operate-record") @ResponseBody public Result> geOperateRecords(@RequestBody OperateRecordDTO dto) { if (ValidateUtils.isNull(dto) || !dto.legal()) { diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/GatewayModelConverter.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/GatewayModelConverter.java index f032e921..6a8b5f79 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/GatewayModelConverter.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/GatewayModelConverter.java @@ -67,6 +67,7 @@ public class GatewayModelConverter { vo.setName(configDO.getName()); vo.setValue(configDO.getValue()); vo.setVersion(configDO.getVersion()); + vo.setDescription(configDO.getDescription()); vo.setCreateTime(configDO.getCreateTime()); vo.setModifyTime(configDO.getModifyTime()); voList.add(vo); @@ -76,18 +77,20 @@ public class GatewayModelConverter { public static GatewayConfigDO convert2GatewayConfigDO(OrderExtensionAddGatewayConfigDTO configDTO) { GatewayConfigDO configDO = new GatewayConfigDO(); - configDO.setType(configDO.getType()); - configDO.setName(configDO.getName()); - configDO.setValue(configDO.getValue()); + configDO.setType(configDTO.getType()); + configDO.setName(configDTO.getName()); + configDO.setValue(configDTO.getValue()); + configDO.setDescription(ValidateUtils.isNull(configDTO.getDescription())? "": configDTO.getDescription()); return configDO; } public static GatewayConfigDO convert2GatewayConfigDO(OrderExtensionModifyGatewayConfigDTO configDTO) { GatewayConfigDO configDO = new GatewayConfigDO(); - configDO.setId(configDO.getId()); - configDO.setType(configDO.getType()); - configDO.setName(configDO.getName()); - configDO.setValue(configDO.getValue()); + configDO.setId(configDTO.getId()); + configDO.setType(configDTO.getType()); + configDO.setName(configDTO.getName()); + configDO.setValue(configDTO.getValue()); + configDO.setDescription(ValidateUtils.isNull(configDTO.getDescription())? "": configDTO.getDescription()); return configDO; } } \ No newline at end of file diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/inteceptor/WebMetricsInterceptor.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/inteceptor/WebMetricsInterceptor.java index bf8bc1e1..576fe036 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/inteceptor/WebMetricsInterceptor.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/inteceptor/WebMetricsInterceptor.java @@ -119,7 +119,7 @@ public class WebMetricsInterceptor { ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); String uri = attributes.getRequest().getRequestURI(); if (uri.contains(ApiPrefix.GATEWAY_API_V1_PREFIX)) { - return Result.buildFailure("api limited"); + return Result.buildGatewayFailure("api limited"); } return new Result<>(ResultStatus.OPERATION_FORBIDDEN); } diff --git a/kafka-manager-web/src/main/resources/application.yml b/kafka-manager-web/src/main/resources/application.yml index 5b01d321..6f5438a3 100644 --- a/kafka-manager-web/src/main/resources/application.yml +++ b/kafka-manager-web/src/main/resources/application.yml @@ -31,7 +31,7 @@ logging: custom: idc: cn jmx: - max-conn: 10 + max-conn: 10 # 2.3版本配置不在这个地方生效 store-metrics-task: community: broker-metrics-enabled: true @@ -53,7 +53,7 @@ account: kcm: enabled: false s3: - endpoint: 127.0.0.1 + endpoint: s3.didiyunapi.com access-key: 1234567890 secret-key: 0987654321 bucket: logi-kafka