Mirror of https://github.com/didi/KnowStreaming.git
@@ -9,7 +9,7 @@ error_exit ()
 [ ! -e "$JAVA_HOME/bin/java" ] && unset JAVA_HOME

 if [ -z "$JAVA_HOME" ]; then
-    if $darwin; then
+    if [ "Darwin" = "$(uname -s)" ]; then

         if [ -x '/usr/libexec/java_home' ] ; then
             export JAVA_HOME=`/usr/libexec/java_home`
@@ -40,8 +40,7 @@ thread-pool:
 ```

-**SQL变更**
+**SQL 变更**

 ```sql
 -- 多集群管理权限2022-09-06新增
@@ -80,12 +79,11 @@ ALTER TABLE `logi_security_oplog`

 ### 6.2.2、升级至 `v3.0.0-beta.1`版本

-**SQL变更**
+**SQL 变更**

 1、在`ks_km_broker`表增加了一个监听信息字段。
-2、为`logi_security_oplog`表operation_methods字段设置默认值''。
-因此需要执行下面的sql对数据库表进行更新。
+2、为`logi_security_oplog`表 operation_methods 字段设置默认值''。
+因此需要执行下面的 sql 对数据库表进行更新。

 ```sql
 ALTER TABLE `ks_km_broker`
@@ -98,7 +96,6 @@ ALTER COLUMN `operation_methods` set default '';

----

 ### 6.2.3、`2.x`版本 升级至 `v3.0.0-beta.0`版本

 **升级步骤:**
@@ -123,14 +120,14 @@ ALTER COLUMN `operation_methods` set default '';
 UPDATE ks_km_topic
 INNER JOIN
 (SELECT
-topic.cluster_id AS cluster_id,
-topic.topic_name AS topic_name,
-topic.description AS description
+    topic.cluster_id AS cluster_id,
+    topic.topic_name AS topic_name,
+    topic.description AS description
 FROM topic WHERE description != ''
 ) AS t

-ON ks_km_topic.cluster_phy_id = t.cluster_id
-AND ks_km_topic.topic_name = t.topic_name
-AND ks_km_topic.id > 0
-SET ks_km_topic.description = t.description;
+    ON ks_km_topic.cluster_phy_id = t.cluster_id
+    AND ks_km_topic.topic_name = t.topic_name
+    AND ks_km_topic.id > 0
+SET ks_km_topic.description = t.description;
 ```
@@ -11,7 +11,7 @@

 下面是用户第一次使用我们产品的典型体验路径:

-![image](./assets/UserExperienceJourney.png)
+![image](http://img-ys011.didistatic.com/static/dc2img/do1_6SgnS1LpbYdWLzWbgoFb)

 ## 5.3、常用功能
@@ -19,7 +19,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicConsumedD
 import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicOverviewVO;
 import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
 import com.xiaojukeji.know.streaming.km.common.enums.AggTypeEnum;
-import com.xiaojukeji.know.streaming.km.common.enums.GroupOffsetResetEnum;
+import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum;
 import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum;
 import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
 import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
@@ -199,12 +199,12 @@ public class GroupManagerImpl implements GroupManager {
             return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getTopicNotExist(dto.getClusterId(), dto.getTopicName()));
         }

-        if (GroupOffsetResetEnum.PRECISE_OFFSET.getResetType() == dto.getResetType()
+        if (OffsetTypeEnum.PRECISE_OFFSET.getResetType() == dto.getResetType()
                 && ValidateUtils.isEmptyList(dto.getOffsetList())) {
             return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "参数错误,指定offset重置需传offset信息");
         }

-        if (GroupOffsetResetEnum.PRECISE_TIMESTAMP.getResetType() == dto.getResetType()
+        if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getResetType()
                 && ValidateUtils.isNull(dto.getTimestamp())) {
             return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "参数错误,指定时间重置需传时间信息");
         }
@@ -213,7 +213,7 @@ public class GroupManagerImpl implements GroupManager {
     }

     private Result<Map<TopicPartition, Long>> getPartitionOffset(GroupOffsetResetDTO dto) {
-        if (GroupOffsetResetEnum.PRECISE_OFFSET.getResetType() == dto.getResetType()) {
+        if (OffsetTypeEnum.PRECISE_OFFSET.getResetType() == dto.getResetType()) {
             return Result.buildSuc(dto.getOffsetList().stream().collect(Collectors.toMap(
                     elem -> new TopicPartition(dto.getTopicName(), elem.getPartitionId()),
                     PartitionOffsetDTO::getOffset,
@@ -222,9 +222,9 @@ public class GroupManagerImpl implements GroupManager {
         }

         OffsetSpec offsetSpec = null;
-        if (GroupOffsetResetEnum.PRECISE_TIMESTAMP.getResetType() == dto.getResetType()) {
+        if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getResetType()) {
             offsetSpec = OffsetSpec.forTimestamp(dto.getTimestamp());
-        } else if (GroupOffsetResetEnum.EARLIEST.getResetType() == dto.getResetType()) {
+        } else if (OffsetTypeEnum.EARLIEST.getResetType() == dto.getResetType()) {
             offsetSpec = OffsetSpec.earliest();
         } else {
             offsetSpec = OffsetSpec.latest();
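
The `OffsetSpec` built in the hunk above is ultimately resolved to concrete offsets through Kafka's `Admin` client. A minimal, standalone sketch of that resolution step (outside KnowStreaming's service layer; the bootstrap address and topic name are placeholders):

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

public class OffsetSpecDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            TopicPartition tp = new TopicPartition("demo-topic", 0); // placeholder topic

            // The same three cases the hunk above switches between:
            // OffsetSpec.latest(), OffsetSpec.earliest(), OffsetSpec.forTimestamp(ts).
            Map<TopicPartition, OffsetSpec> request =
                    Map.of(tp, OffsetSpec.forTimestamp(System.currentTimeMillis() - 3_600_000L));

            ListOffsetsResult result = admin.listOffsets(request);
            // offset() is -1 when no record exists at or after the timestamp
            long offset = result.partitionResult(tp).get().offset();
            System.out.println(tp + " -> " + offset);
        }
    }
}
```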
@@ -1,5 +1,6 @@
 package com.xiaojukeji.know.streaming.km.biz.topic;

+import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
 import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
 import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicBrokersPartitionsSummaryVO;
@@ -22,25 +22,26 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.partition.TopicPart
 import com.xiaojukeji.know.streaming.km.common.constant.Constant;
 import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
 import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
 import com.xiaojukeji.know.streaming.km.common.converter.PartitionConverter;
 import com.xiaojukeji.know.streaming.km.common.converter.TopicVOConverter;
+import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum;
+import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum;
 import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
 import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
 import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
 import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil;
 import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
 import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
 import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
 import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService;
-import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService;
 import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
+import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService;
 import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService;
 import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
 import com.xiaojukeji.know.streaming.km.core.service.version.metrics.TopicMetricVersionItems;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.admin.OffsetSpec;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.TopicConfig;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -160,8 +161,31 @@ public class TopicStateManagerImpl implements TopicStateManager {
         }
         maxMessage = Math.min(maxMessage, dto.getMaxRecords());
         kafkaConsumer.assign(partitionList);

+        Map<TopicPartition, OffsetAndTimestamp> partitionOffsetAndTimestampMap = new HashMap<>();
+        // 获取指定时间每个分区的offset(按指定开始时间查询消息时)
+        if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) {
+            Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
+            partitionList.forEach(topicPartition -> {
+                timestampsToSearch.put(topicPartition, dto.getStartTimestampUnitMs());
+            });
+            partitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(timestampsToSearch);
+        }
+
         for (TopicPartition partition : partitionList) {
-            kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords()));
+            if (OffsetTypeEnum.EARLIEST.getResetType() == dto.getFilterOffsetReset()) {
+                // 重置到最旧
+                kafkaConsumer.seek(partition, beginOffsetsMapResult.getData().get(partition));
+            } else if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) {
+                // 重置到指定时间
+                kafkaConsumer.seek(partition, partitionOffsetAndTimestampMap.get(partition).offset());
+            } else if (OffsetTypeEnum.PRECISE_OFFSET.getResetType() == dto.getFilterOffsetReset()) {
+                // 重置到指定位置
+
+            } else {
+                // 默认,重置到最新
+                kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords()));
+            }
        }

        // 这里需要减去 KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS 是因为poll一次需要耗时,如果这里不减去,则可能会导致poll之后,超过要求的时间
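
The timestamp branch above relies on the standard `offsetsForTimes` + `seek` pattern. A self-contained sketch of that pattern (placeholder broker address and topic), including the null guard for partitions with no record at or after the requested time — a case the hunk above dereferences without checking:

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SeekByTimestampDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = List.of(new TopicPartition("demo-topic", 0)); // placeholder
            consumer.assign(partitions); // manual assignment, no group.id needed

            long startTimestampMs = System.currentTimeMillis() - 3_600_000L; // one hour back
            Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
            partitions.forEach(tp -> timestampsToSearch.put(tp, startTimestampMs));

            // Resolve the earliest offset whose record timestamp is >= the requested time.
            Map<TopicPartition, OffsetAndTimestamp> found = consumer.offsetsForTimes(timestampsToSearch);
            for (TopicPartition tp : partitions) {
                OffsetAndTimestamp oat = found.get(tp);
                if (oat != null) {
                    consumer.seek(tp, oat.offset());
                } else {
                    consumer.seekToEnd(List.of(tp)); // no record at/after the timestamp
                }
            }
            consumer.poll(Duration.ofMillis(1000)).forEach(r ->
                    System.out.println(r.offset() + " @ " + r.timestamp()));
        }
    }
}
```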
@@ -185,6 +209,15 @@ public class TopicStateManagerImpl implements TopicStateManager {
                 }
             }

+            // 排序
+            if (ObjectUtils.isNotEmpty(voList)) {
+                // 默认按时间倒序排序
+                if (StringUtils.isBlank(dto.getSortType())) {
+                    dto.setSortType(SortTypeEnum.DESC.getSortType());
+                }
+                PaginationUtil.pageBySort(voList, dto.getSortField(), dto.getSortType());
+            }
+
             return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size())));
         } catch (Exception e) {
             log.error("method=getTopicMessages||clusterPhyId={}||topicName={}||param={}||errMsg=exception", clusterPhyId, topicName, dto, e);
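
`PaginationUtil.pageBySort` is a KnowStreaming internal helper whose source is not part of this change. A hypothetical stand-in in the same spirit — sorting a list of beans in place by a field name — might look like this (not the project's implementation):

```java
import java.lang.reflect.Field;
import java.util.Comparator;
import java.util.List;

public final class SortUtil {
    private SortUtil() {}

    // Sort in place by the named bean field; "desc" reverses, anything else sorts ascending.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static <T> void sortByField(List<T> list, String fieldName, String sortType) {
        if (list == null || list.isEmpty() || fieldName == null || fieldName.isEmpty()) {
            return;
        }
        Comparator<T> cmp = (a, b) -> {
            Comparable va = readField(a, fieldName);
            Comparable vb = readField(b, fieldName);
            if (va == null && vb == null) return 0;
            if (va == null) return 1;   // nulls last
            if (vb == null) return -1;
            return va.compareTo(vb);
        };
        list.sort("desc".equalsIgnoreCase(sortType) ? cmp.reversed() : cmp);
    }

    @SuppressWarnings("rawtypes")
    private static Comparable readField(Object bean, String fieldName) {
        try {
            Field f = bean.getClass().getDeclaredField(fieldName);
            f.setAccessible(true);
            return (Comparable) f.get(bean);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("no such field: " + fieldName, e);
        }
    }
}
```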
@@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.common.bean.dto.group;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.xiaojukeji.know.streaming.km.common.bean.dto.partition.PartitionOffsetDTO;
 import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.ClusterTopicDTO;
+import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
@@ -23,7 +24,7 @@ public class GroupOffsetResetDTO extends ClusterTopicDTO {
     private String groupName;

     /**
-     * @see com.xiaojukeji.know.streaming.km.common.enums.GroupOffsetResetEnum
+     * @see OffsetTypeEnum
      */
     @NotNull(message = "resetType不允许为空")
     @ApiModelProperty(value = "重置方式", example = "1")
@@ -1,7 +1,8 @@
 package com.xiaojukeji.know.streaming.km.common.bean.dto.topic;

 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.xiaojukeji.know.streaming.km.common.bean.dto.BaseDTO;
+import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
+import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
@@ -15,7 +16,7 @@ import javax.validation.constraints.NotNull;
 @Data
 @JsonIgnoreProperties(ignoreUnknown = true)
 @ApiModel(description = "Topic记录")
-public class TopicRecordDTO extends BaseDTO {
+public class TopicRecordDTO extends PaginationSortDTO {
     @NotNull(message = "truncate不允许为空")
     @ApiModelProperty(value = "是否截断", example = "true")
     private Boolean truncate;
@@ -34,4 +35,13 @@ public class TopicRecordDTO extends BaseDTO {

     @ApiModelProperty(value = "预览超时时间", example = "10000")
     private Long pullTimeoutUnitMs = 8000L;
+
+    /**
+     * @see OffsetTypeEnum
+     */
+    @ApiModelProperty(value = "offset", example = "")
+    private Integer filterOffsetReset = 0;
+
+    @ApiModelProperty(value = "开始日期时间戳", example = "")
+    private Long startTimestampUnitMs;
 }
@@ -64,4 +64,5 @@ public class Constant {
     public static final Float COLLECT_METRICS_ERROR_COST_TIME = -1.0F;

+    public static final Integer DEFAULT_RETRY_TIME = 3;
 }
@@ -3,19 +3,19 @@ package com.xiaojukeji.know.streaming.km.common.enums;
 import lombok.Getter;

 /**
- * 重置offset
+ * offset类型
  * @author zengqiao
  * @date 19/4/8
  */
 @Getter
-public enum GroupOffsetResetEnum {
-    LATEST(0, "重置到最新"),
+public enum OffsetTypeEnum {
+    LATEST(0, "最新"),

-    EARLIEST(1, "重置到最旧"),
+    EARLIEST(1, "最旧"),

-    PRECISE_TIMESTAMP(2, "按时间进行重置"),
+    PRECISE_TIMESTAMP(2, "指定时间"),

-    PRECISE_OFFSET(3, "重置到指定位置"),
+    PRECISE_OFFSET(3, "指定位置"),

     ;
@@ -23,7 +23,7 @@ public enum GroupOffsetResetEnum {

     private final String message;

-    GroupOffsetResetEnum(int resetType, String message) {
+    OffsetTypeEnum(int resetType, String message) {
        this.resetType = resetType;
        this.message = message;
    }
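
Pieced together from the two hunks above, the renamed enum ends up looking roughly like this. The `resetType` field declaration is inferred from the constructor and the `getResetType()` call sites; treat this as a reconstruction, not the authoritative file:

```java
package com.xiaojukeji.know.streaming.km.common.enums;

import lombok.Getter;

/**
 * offset类型
 * @author zengqiao
 * @date 19/4/8
 */
@Getter
public enum OffsetTypeEnum {
    LATEST(0, "最新"),

    EARLIEST(1, "最旧"),

    PRECISE_TIMESTAMP(2, "指定时间"),

    PRECISE_OFFSET(3, "指定位置"),

    ;

    // inferred: backs getResetType(), generated by Lombok's @Getter
    private final int resetType;

    private final String message;

    OffsetTypeEnum(int resetType, String message) {
        this.resetType = resetType;
        this.message = message;
    }
}
```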
@@ -10,6 +10,7 @@ const defaultParams: any = {
   maxRecords: 100,
   pullTimeoutUnitMs: 5000,
   // filterPartitionId: 1,
+  filterOffsetReset: 0
 };
 const defaultpaPagination = {
   current: 1,
@@ -29,12 +30,20 @@ const TopicMessages = (props: any) => {
   const [pagination, setPagination] = useState<any>(defaultpaPagination);
   const [form] = Form.useForm();

+  // 获取消息开始位置
+  const offsetResetList = [
+    { 'label': 'latest', value: 0 },
+    { 'label': 'earliest', value: 1 }
+  ];
+
   // 默认排序
   const defaultSorter = {
     sortField: 'timestampUnitMs',
     sortType: 'desc',
   };

+  const [sorter, setSorter] = useState<any>(defaultSorter);
+
   // 请求接口获取数据
   const genData = async () => {
     if (urlParams?.clusterId === undefined || hashData?.topicName === undefined) return;
@@ -49,7 +58,7 @@ const TopicMessages = (props: any) => {
       });
       setPartitionIdList(newPartitionIdList || []);
     });
-    request(Api.getTopicMessagesList(hashData?.topicName, urlParams?.clusterId), { data: { ...params, ...defaultSorter }, method: 'POST' })
+    request(Api.getTopicMessagesList(hashData?.topicName, urlParams?.clusterId), { data: { ...params, ...sorter }, method: 'POST' })
       .then((res: any) => {
         // setPagination({
         //   current: res.pagination?.pageNo,
@@ -87,8 +96,15 @@ const TopicMessages = (props: any) => {
     history.push(`/cluster/${urlParams?.clusterId}/testing/consumer`);
   };

-  const onTableChange = (pagination: any, filters: any, sorter: any) => {
+  const onTableChange = (pagination: any, filters: any, sorter: any, extra: any) => {
     setPagination(pagination);
+    // 只有排序事件时,触发重新请求后端数据
+    if(extra.action === 'sort') {
+      setSorter({
+        sortField: sorter.field || '',
+        sortType: sorter.order ? sorter.order.substring(0, sorter.order.indexOf('end')) : ''
+      });
+    }
     // const asc = sorter?.order && sorter?.order === 'ascend' ? true : false;
     // const sortColumn = sorter.field && toLine(sorter.field);
     // genData({ pageNo: pagination.current, pageSize: pagination.pageSize, filters, asc, sortColumn, queryTerm: searchResult, ...allParams });
@@ -96,7 +112,7 @@ const TopicMessages = (props: any) => {

   useEffect(() => {
     props.positionType === 'Messages' && genData();
-  }, [props, params]);
+  }, [props, params, sorter]);

   return (
     <>
@@ -119,6 +135,15 @@ const TopicMessages = (props: any) => {
         </div>
         <div className="messages-query">
           <Form form={form} layout="inline" onFinish={onFinish}>
+            <Form.Item name="filterOffsetReset">
+              <Select
+                options={offsetResetList}
+                size="small"
+                style={{ width: '120px' }}
+                className={'detail-table-select'}
+                placeholder="请选择offset"
+              />
+            </Form.Item>
             <Form.Item name="filterPartitionId">
               <Select
                 options={partitionIdList}
@@ -158,7 +183,7 @@ const TopicMessages = (props: any) => {
           showQueryForm={false}
           tableProps={{
             showHeader: false,
-            rowKey: 'path',
+            rowKey: 'offset',
             loading: loading,
             columns: getTopicMessagesColmns(),
             dataSource: data,
@@ -169,6 +194,7 @@ const TopicMessages = (props: any) => {
             bordered: false,
             onChange: onTableChange,
             scroll: { x: 'max-content' },
+            sortDirections: ['descend', 'ascend', 'default']
           },
         }}
       />
@@ -85,7 +85,8 @@ export const getTopicMessagesColmns = () => {
     title: 'Timestamp',
     dataIndex: 'timestampUnitMs',
     key: 'timestampUnitMs',
-    render: (t: number) => (t ? moment(t).format(timeFormat) : '-'),
+    sorter: true,
+    render: (t: number) => (t ? moment(t).format(timeFormat) + '.' + moment(t).millisecond() : '-'),
   },
   {
     title: 'Key',
@@ -751,8 +751,8 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
     private Result<ClusterMetrics> getMetricFromKafkaByTotalTopics(Long clusterId, String metric, String topicMetric){
         List<Topic> topics = topicService.listTopicsFromCacheFirst(clusterId);

-        float metricsSum = 0f;
-        for(Topic topic : topics){
+        float sumMetricValue = 0f;
+        for(Topic topic : topics) {
             Result<List<TopicMetrics>> ret = topicMetricService.collectTopicMetricsFromKafkaWithCacheFirst(
                     clusterId,
                     topic.getTopicName(),
@@ -763,14 +763,15 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
                 continue;
             }

-            List<TopicMetrics> topicMetrics = ret.getData();
-            for (TopicMetrics metrics : topicMetrics) {
-                if(metrics.isBBrokerAgg()){
-                    metricsSum += Double.valueOf(metrics.getMetrics().get(topicMetric));
+            for (TopicMetrics metrics : ret.getData()) {
+                if(metrics.isBBrokerAgg()) {
+                    Float metricValue = metrics.getMetric(topicMetric);
+                    sumMetricValue += (metricValue == null? 0f: metricValue);
                     break;
                 }
             }
         }

-        return Result.buildSuc(initWithMetrics(clusterId, metric, metricsSum));
+        return Result.buildSuc(initWithMetrics(clusterId, metric, sumMetricValue));
     }
 }
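
The fix above replaces an unguarded `Double.valueOf(map.get(...))`, which throws a NullPointerException when the metric is missing, with a null-checked `Float`. The guard pattern in isolation, as a minimal runnable sketch:

```java
import java.util.HashMap;
import java.util.Map;

public class NullSafeSumDemo {
    public static void main(String[] args) {
        Map<String, Float> metrics = new HashMap<>();
        metrics.put("BytesIn", 42.0f);
        // "BytesOut" is absent, as a topic metric can be for a given collection round.

        float sum = 0f;
        for (String name : new String[]{"BytesIn", "BytesOut"}) {
            Float value = metrics.get(name);      // may be null
            sum += (value == null ? 0f : value);  // the guard added in the hunk above
        }
        System.out.println(sum); // 42.0 — no NPE on the missing metric
    }
}
```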
@@ -4,13 +4,13 @@ description: knowstreaming-manager Helm chart

 type: application

-version: 0.1.3
+version: 0.1.4

 maintainers:
   - email: didicloud@didiglobal.com
     name: didicloud

-appVersion: "3.0.0-beta.1"
+appVersion: "3.0.0-beta.2"

 dependencies:
   - name: knowstreaming-web
@@ -173,8 +173,8 @@ antiAffinityTopologyKey: "kubernetes.io/hostname"

 # Hard means that by default pods will only be scheduled if there are enough nodes for them
 # and that they will never end up on the same node. Setting this to soft will do this "best effort"
-antiAffinity: "hard"
-
+antiAffinity: ""
+#antiAffinity: "hard"
 # This is the node affinity settings as defined in
 # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#node-affinity-beta-feature
 nodeAffinity: {}
@@ -21,7 +21,7 @@ spec:
         {{- include "ksmysql.selectorLabels" . | nindent 8 }}
     spec:
       containers:
-        - image: knowstreaming/knowstreaming-mysql:0.1.0
+        - image: knowstreaming/knowstreaming-mysql:0.2.0
          name: {{ .Chart.Name }}
          env:
            - name: MYSQL_DATABASE
@@ -71,6 +71,7 @@ data:
         driver-class-name: org.mariadb.jdbc.Driver
       app-name: know-streaming
       resource-extend-bean-name: myResourceExtendImpl
+      login-extend-bean-name: logiSecurityDefaultLoginExtendImpl

     logging:
       config: classpath:logback-spring.xml
@@ -85,11 +86,16 @@ data:
         queue-size: 10000 # 每个线程池队列大小
         select-suitable-enable: true # 任务是否自动选择合适的线程池,非主要,可不修改
         suitable-queue-size: 1000 # 线程池理想的队列大小,非主要,可不修改
-      task: # 任务模块的配置
-        heaven: # 采集任务配置
-          thread-num: 20 # 采集任务线程池核心线程数
-          queue-size: 1000 # 采集任务线程池队列大小
+      task: # 任务模块的配置
+        metrics: # metrics采集任务配置
+          thread-num: 18 # metrics采集任务线程池核心线程数
+          queue-size: 180 # metrics采集任务线程池队列大小
+        metadata: # metadata同步任务配置
+          thread-num: 27 # metadata同步任务线程池核心线程数
+          queue-size: 270 # metadata同步任务线程池队列大小
+        common: # 剩余其他任务配置
+          thread-num: 15 # 剩余其他任务线程池核心线程数
+          queue-size: 150 # 剩余其他任务线程池队列大小

     client-pool:
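
Each `thread-num` / `queue-size` pair above maps naturally onto a fixed-size pool with a bounded queue. A hedged sketch of how such a task pool could be built from those two values (illustrative only, not KnowStreaming's actual wiring):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public final class TaskPoolFactory {
    private TaskPoolFactory() {}

    // Fixed-size pool with a bounded queue, e.g. thread-num: 18 / queue-size: 180
    // for the metrics collection tasks in the config above.
    public static ThreadPoolExecutor create(int threadNum, int queueSize) {
        return new ThreadPoolExecutor(
                threadNum, threadNum,               // core == max: fixed-size pool
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueSize),
                new ThreadPoolExecutor.CallerRunsPolicy()); // back-pressure when the queue fills
    }

    public static void main(String[] args) {
        ThreadPoolExecutor metricsPool = create(18, 180); // values from the config above
        metricsPool.execute(() -> System.out.println("collect metrics"));
        metricsPool.shutdown();
    }
}
```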
@@ -99,17 +105,16 @@ data:
         max-total-client-num: 20 # 最大客户端数
         borrow-timeout-unit-ms: 5000 # 租借超时时间,单位秒

       es:
         client:
 {{ if .Values.elasticsearch.enabled }}
-          es.client.address: elasticsearch-master:9200
-          #es.client.address: {{ .Release.Name }}-elasticsearch:9200
+          address: elasticsearch-master:9200
 {{- else }}
-          es.client.address: {{ .Values.elasticsearch.esClientAddress }}:{{ .Values.elasticsearch.esProt }}
+          address: {{ .Values.elasticsearch.esClientAddress }}:{{ .Values.elasticsearch.esProt }}
 {{- end }}
           # es.client.pass: knowstreaming-manager

       # 集群自动均衡相关配置
       cluster-balance:
         ignored-topics:
           time-second: 300
         client-cnt: 10
         io-thread-cnt: 2
         max-retry-cnt: 5

       # 普罗米修斯指标导出相关配置
       management:
@@ -158,4 +163,3 @@ data:
         curl -s -o /dev/null -X PUT http://${esaddr}:${port}/ks_kafka_topic_metric${logdate} || \
         exit 2
       done
-
@@ -3,7 +3,7 @@ replicaCount: 2
 image:
   repository: knowstreaming/knowstreaming-manager
   pullPolicy: IfNotPresent
-  tag: "0.1.0"
+  tag: "0.2.0"

 imagePullSecrets: []
 nameOverride: ""
@@ -73,7 +73,7 @@ knowstreaming-web:
   image:
     repository: knowstreaming/knowstreaming-ui
     pullPolicy: IfNotPresent
-    tag: "0.1.0"
+    tag: "0.2.0"

   service:
     type: NodePort
@@ -48,7 +48,7 @@ INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `l

 -- 初始化用户
---INSERT INTO `logi_security_user` (`id`, `user_name`, `pw`, `real_name`, `is_delete`, `app_name`) VALUES ('1', 'admin', 'V1ZkU2RHRlhOSGxOUkVsNVdETjBRVlp0Y0V0T1IwWnlaVEZ6YWxGRVJrRkpNVEU1VTJwYVUySkhlRzlSU0RBOWUwQldha28wWVd0N1d5TkFNa0FqWFgxS05sSnNiR2hBZlE9PXtAVmpKNGFre1sjQDNAI119SjZSbGxoQH0=Mv{#cdRgJ45Lqx}3IubEW87!==', '系统管理员', '0', 'know-streaming');
+-- INSERT INTO `logi_security_user` (`id`, `user_name`, `pw`, `real_name`, `is_delete`, `app_name`) VALUES ('1', 'admin', 'V1ZkU2RHRlhOSGxOUkVsNVdETjBRVlp0Y0V0T1IwWnlaVEZ6YWxGRVJrRkpNVEU1VTJwYVUySkhlRzlSU0RBOWUwQldha28wWVd0N1d5TkFNa0FqWFgxS05sSnNiR2hBZlE9PXtAVmpKNGFre1sjQDNAI119SjZSbGxoQH0=Mv{#cdRgJ45Lqx}3IubEW87!==', '系统管理员', '0', 'know-streaming');
 INSERT INTO `logi_security_user` (`id`, `user_name`, `pw`, `real_name`, `is_delete`, `app_name`) VALUES ('1', 'admin', 'V1ZkU2RHRlhOVGRSUmxweFUycFNhR0V6ZEdKSk1FRjRVVU5PWkdaVmJ6SlZiWGh6WVVWQ09YdEFWbXBLTkdGcmUxc2pRREpBSTExOVNqWlNiR3hvUUgwPXtAVmpKNGFre1sjQDNAI119SjZSbGxoQH0=Mv{#cdRgJ45Lqx}3IubEW87!==', '系统管理员', '0', 'know-streaming');

 -- 初始化角色
@@ -96,4 +96,4 @@ INSERT INTO `logi_security_user_role` (`id`, `user_id`, `role_id`, `is_delete`,
 INSERT INTO `logi_security_config`
 (`value_group`,`value_name`,`value`,`edit`,`status`,`memo`,`is_delete`,`app_name`,`operator`)
 VALUES
-('SECURITY.LOGIN','SECURITY.TRICK_USERS','[\n \"admin\"\n]',1,1,'允许跳过登录的用户',0,'know-streaming','admin');
+('SECURITY.LOGIN','SECURITY.TRICK_USERS','[\n \"admin\"\n]',1,1,'允许跳过登录的用户',0,'know-streaming','admin');
@@ -191,6 +191,10 @@ public class KafkaJMXClient extends AbstractClusterLoadedChangedHandler {
         lambdaQueryWrapper.eq(BrokerPO::getStatus, Constant.ALIVE);

         BrokerPO brokerPO = brokerDAO.selectOne(lambdaQueryWrapper);
+        if (brokerPO == null) {
+            return null;
+        }
+
         return Broker.buildFrom(brokerPO);
     }
 }
@@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.rest.api.v3.topic;
 import com.xiaojukeji.know.streaming.km.biz.topic.TopicStateManager;
 import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO;
 import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
+import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
 import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;