);
const helpCenter = (
@@ -144,7 +144,7 @@ export const Header = observer((props: IHeader) => {

-
Kafka Manager
+
LogiKM
v2.4.2
{/* 添加版本超链接 */}
diff --git a/kafka-manager-console/src/container/user-center/order-list.tsx b/kafka-manager-console/src/container/user-center/order-list.tsx
index 6c81b0ec..5ed5b961 100644
--- a/kafka-manager-console/src/container/user-center/order-list.tsx
+++ b/kafka-manager-console/src/container/user-center/order-list.tsx
@@ -115,11 +115,19 @@ export class OrderList extends SearchAndFilterContainer {
status,
{
title: '申请时间',
- dataIndex: 'gmtTime',
- key: 'gmtTime',
- sorter: (a: IBaseOrder, b: IBaseOrder) => b.gmtTime - a.gmtTime,
- render: (t: number) => moment(t).format(timeFormat),
- }, {
+ dataIndex: 'gmtCreate',
+ key: 'gmtCreate',
+ sorter: (a: IBaseOrder, b: IBaseOrder) => b.gmtCreate - a.gmtCreate,
+ render: (t: number) => t ? moment(t).format(timeFormat) : '-',
+ },
+ {
+ title: '审批时间',
+ dataIndex: 'gmtHandle',
+ key: 'gmtHandle',
+ sorter: (a: IBaseOrder, b: IBaseOrder) => b.gmtHandle - a.gmtHandle,
+ render: (t: number) => t ? moment(t).format(timeFormat) : '-',
+ },
+ {
title: '操作',
key: 'operation',
dataIndex: 'operation',
diff --git a/kafka-manager-console/src/routers/index.htm b/kafka-manager-console/src/routers/index.htm
index b8d8454f..7cb7a0fd 100644
--- a/kafka-manager-console/src/routers/index.htm
+++ b/kafka-manager-console/src/routers/index.htm
@@ -1,12 +1,15 @@
+
-
KafkaManager
+
LogiKM
+
+
\ No newline at end of file
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicExpiredService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicExpiredService.java
index 273b62c6..9ab963aa 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicExpiredService.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicExpiredService.java
@@ -13,4 +13,12 @@ public interface TopicExpiredService {
List
getExpiredTopicDataList(String username);
ResultStatus retainExpiredTopic(Long physicalClusterId, String topicName, Integer retainDays);
+
+ /**
+ * 通过topictopic名称删除
+ * @param clusterId 集群id
+ * @param topicName topic名称
+ * @return int
+ */
+ int deleteByTopicName(Long clusterId, String topicName);
}
\ No newline at end of file
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminServiceImpl.java
index 8a0028c7..594f1aa1 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminServiceImpl.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminServiceImpl.java
@@ -43,6 +43,9 @@ public class AdminServiceImpl implements AdminService {
@Autowired
private TopicManagerService topicManagerService;
+ @Autowired
+ private TopicExpiredService topicExpiredService;
+
@Autowired
private TopicService topicService;
@@ -143,6 +146,7 @@ public class AdminServiceImpl implements AdminService {
// 3. 数据库中删除topic
topicManagerService.deleteByTopicName(clusterDO.getId(), topicName);
+ topicExpiredService.deleteByTopicName(clusterDO.getId(), topicName);
// 4. 数据库中删除authority
authorityService.deleteAuthorityByTopic(clusterDO.getId(), topicName);
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java
index ea9d22da..153576c4 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java
@@ -19,6 +19,8 @@ import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.*;
import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -210,7 +212,7 @@ public class ClusterServiceImpl implements ClusterService {
ZooKeeper zk = null;
try {
- zk = new ZooKeeper(zookeeper, 1000, null);
+ zk = new ZooKeeper(zookeeper, 1000, watchedEvent -> LOGGER.info(" receive event : " + watchedEvent.getType().name()));
for (int i = 0; i < 15; ++i) {
if (zk.getState().isConnected()) {
// 只有状态是connected的时候,才表示地址是合法的
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicExpiredServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicExpiredServiceImpl.java
index c51e1dcb..d310af1a 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicExpiredServiceImpl.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicExpiredServiceImpl.java
@@ -75,4 +75,14 @@ public class TopicExpiredServiceImpl implements TopicExpiredService {
}
return ResultStatus.MYSQL_ERROR;
}
+
+ @Override
+ public int deleteByTopicName(Long clusterId, String topicName) {
+ try {
+ return topicExpiredDao.deleteByName(clusterId, topicName);
+ } catch (Exception e) {
+ LOGGER.error("delete topic failed, clusterId:{} topicName:{}", clusterId, topicName, e);
+ }
+ return 0;
+ }
}
\ No newline at end of file
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java
index 4a8f501f..a25115ef 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java
@@ -210,7 +210,7 @@ public class TopicManagerServiceImpl implements TopicManagerService {
}
}
- // 增加流量信息
+ // 增加流量和描述信息
Map> metricMap = KafkaMetricsCache.getAllTopicMetricsFromCache();
for (MineTopicSummary mineTopicSummary : summaryList) {
TopicMetrics topicMetrics = getTopicMetricsFromCacheOrJmx(
@@ -219,6 +219,10 @@ public class TopicManagerServiceImpl implements TopicManagerService {
metricMap);
mineTopicSummary.setBytesIn(topicMetrics.getSpecifiedMetrics("BytesInPerSecOneMinuteRate"));
mineTopicSummary.setBytesOut(topicMetrics.getSpecifiedMetrics("BytesOutPerSecOneMinuteRate"));
+
+ // 增加topic描述信息
+ TopicDO topicDO = topicDao.getByTopicName(mineTopicSummary.getPhysicalClusterId(), mineTopicSummary.getTopicName());
+ mineTopicSummary.setDescription(topicDO.getDescription());
}
return summaryList;
}
diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicExpiredDao.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicExpiredDao.java
index 18698941..ea189eb4 100644
--- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicExpiredDao.java
+++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicExpiredDao.java
@@ -17,4 +17,6 @@ public interface TopicExpiredDao {
int replace(TopicExpiredDO expiredDO);
TopicExpiredDO getByTopic(Long clusterId, String topicName);
+
+ int deleteByName(Long clusterId, String topicName);
}
\ No newline at end of file
diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicExpiredDaoImpl.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicExpiredDaoImpl.java
index 51853db7..936d4931 100644
--- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicExpiredDaoImpl.java
+++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicExpiredDaoImpl.java
@@ -50,4 +50,12 @@ public class TopicExpiredDaoImpl implements TopicExpiredDao {
params.put("topicName", topicName);
return sqlSession.selectOne("TopicExpiredDao.getByTopic", params);
}
+
+ @Override
+ public int deleteByName(Long clusterId, String topicName) {
+ Map params = new HashMap<>(2);
+ params.put("clusterId", clusterId);
+ params.put("topicName", topicName);
+ return sqlSession.delete("TopicExpiredDao.deleteByName", params);
+ }
}
\ No newline at end of file
diff --git a/kafka-manager-dao/src/main/resources/mapper/TopicExpiredDao.xml b/kafka-manager-dao/src/main/resources/mapper/TopicExpiredDao.xml
index 39ebf8ca..1da6753a 100644
--- a/kafka-manager-dao/src/main/resources/mapper/TopicExpiredDao.xml
+++ b/kafka-manager-dao/src/main/resources/mapper/TopicExpiredDao.xml
@@ -36,4 +36,8 @@
+
+
+ DELETE FROM topic_expired WHERE cluster_id=#{clusterId} AND topic_name=#{topicName}
+
diff --git a/kafka-manager-dao/src/main/resources/mapper/TopicMetricsDao.xml b/kafka-manager-dao/src/main/resources/mapper/TopicMetricsDao.xml
index baa6f4b0..53e13b2d 100644
--- a/kafka-manager-dao/src/main/resources/mapper/TopicMetricsDao.xml
+++ b/kafka-manager-dao/src/main/resources/mapper/TopicMetricsDao.xml
@@ -25,6 +25,7 @@
WHERE cluster_id = #{clusterId}
AND topic_name = #{topicName}
AND gmt_create BETWEEN #{startTime} AND #{endTime}
+ ORDER BY gmt_create
]]>
@@ -32,6 +33,7 @@
diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCommunityTopicMetrics.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCommunityTopicMetrics.java
index 07a137e6..a6757310 100644
--- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCommunityTopicMetrics.java
+++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCommunityTopicMetrics.java
@@ -30,16 +30,23 @@ public class CollectAndPublishCommunityTopicMetrics extends AbstractScheduledTas
@Override
protected List listAllTasks() {
+ // 获取需要进行指标采集的集群列表,这些集群将会被拆分到多台KM中进行执行。
return clusterService.list();
}
@Override
public void processTask(ClusterDO clusterDO) {
+ // 这里需要实现对clusterDO这个集群进行Topic指标采集的代码逻辑
+
+ // 进行Topic指标获取
List metricsList = getTopicMetrics(clusterDO.getId());
+
+ // 获取到Topic流量指标之后,发布一个事件,
SpringTool.publish(new TopicMetricsCollectedEvent(this, clusterDO.getId(), metricsList));
}
private List getTopicMetrics(Long clusterId) {
+ // 具体获取Topic流量指标的入口代码
List metricsList =
jmxService.getTopicMetrics(clusterId, KafkaMetricsCollections.TOPIC_METRICS_TO_DB, true);
if (ValidateUtils.isEmptyList(metricsList)) {
diff --git a/kafka-manager-web/pom.xml b/kafka-manager-web/pom.xml
index 17504ca7..a959f958 100644
--- a/kafka-manager-web/pom.xml
+++ b/kafka-manager-web/pom.xml
@@ -19,7 +19,7 @@
2.1.1.RELEASE
5.1.3.RELEASE
false
- 8.5.66
+ 8.5.72
@@ -109,8 +109,10 @@
+ kafka-manager
+
org.springframework.boot
spring-boot-maven-plugin
${springframework.boot.version}
@@ -121,6 +123,7 @@
+
diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/OrderConverter.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/OrderConverter.java
index bbe8c656..ab6c0ba6 100644
--- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/OrderConverter.java
+++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/OrderConverter.java
@@ -1,15 +1,16 @@
package com.xiaojukeji.kafka.manager.web.converters;
-import com.xiaojukeji.kafka.manager.common.entity.ao.account.Account;
import com.xiaojukeji.kafka.manager.bpm.common.OrderResult;
+import com.xiaojukeji.kafka.manager.bpm.common.OrderStatusEnum;
import com.xiaojukeji.kafka.manager.bpm.common.entry.BaseOrderDetailData;
+import com.xiaojukeji.kafka.manager.common.entity.ao.account.Account;
+import com.xiaojukeji.kafka.manager.common.entity.pojo.OrderDO;
import com.xiaojukeji.kafka.manager.common.entity.vo.common.AccountVO;
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.order.OrderResultVO;
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.order.OrderVO;
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.order.detail.OrderDetailBaseVO;
import com.xiaojukeji.kafka.manager.common.utils.CopyUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
-import com.xiaojukeji.kafka.manager.common.entity.pojo.OrderDO;
import java.util.ArrayList;
import java.util.Collections;
@@ -41,7 +42,9 @@ public class OrderConverter {
}
OrderVO orderVO = new OrderVO();
CopyUtils.copyProperties(orderVO, orderDO);
- orderVO.setGmtTime(orderDO.getGmtCreate());
+ if (OrderStatusEnum.WAIT_DEAL.getCode().equals(orderDO.getStatus())) {
+ orderVO.setGmtHandle(null);
+ }
return orderVO;
}
diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/TopicMineConverter.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/TopicMineConverter.java
index 97b8f04a..e21c41da 100644
--- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/TopicMineConverter.java
+++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/TopicMineConverter.java
@@ -29,6 +29,7 @@ public class TopicMineConverter {
vo.setClusterName(data.getLogicalClusterName());
vo.setBytesIn(data.getBytesIn());
vo.setBytesOut(data.getBytesOut());
+ vo.setDescription(data.getDescription());
voList.add(vo);
}
return voList;
diff --git a/kafka-manager-web/src/main/resources/application.yml b/kafka-manager-web/src/main/resources/application.yml
index 4463d746..a4648a46 100644
--- a/kafka-manager-web/src/main/resources/application.yml
+++ b/kafka-manager-web/src/main/resources/application.yml
@@ -9,6 +9,8 @@ server:
spring:
application:
name: kafkamanager
+ profiles:
+ active: dev
datasource:
kafka-manager:
jdbc-url: jdbc:mysql://localhost:3306/logi_kafka_manager?characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8
@@ -18,8 +20,6 @@ spring:
main:
allow-bean-definition-overriding: true
- profiles:
- active: dev
servlet:
multipart:
max-file-size: 100MB
diff --git a/pom.xml b/pom.xml
index a7c70e54..235e8165 100644
--- a/pom.xml
+++ b/pom.xml
@@ -16,7 +16,7 @@
- 2.4.2-SNAPSHOT
+ 2.4.3-SNAPSHOT
2.7.0
1.5.13
@@ -26,11 +26,15 @@
1.8
UTF-8
UTF-8
- 8.5.66
+ 8.5.72
+ 3.0.0
+
+
kafka-manager-common
kafka-manager-dao
kafka-manager-core
@@ -42,6 +46,7 @@
kafka-manager-extends/kafka-manager-openapi
kafka-manager-task
kafka-manager-web
+ distribution
@@ -231,4 +236,16 @@
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+ ${maven-assembly-plugin.version}
+
+
+
\ No newline at end of file