mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 11:52:08 +08:00
[Feature] Consume just filter key or value, not both. 消费消息支持单独过滤key或者value. (#1157)
close #1155 Consume just filter key or value, not both. 消费消息支持单独过滤key或者value。 --------- Co-authored-by: weidong_chang <weidong_chang@intsig.net>
This commit is contained in:
@@ -144,6 +144,8 @@ const ConsumeClientTest = () => {
|
|||||||
...configInfo,
|
...configInfo,
|
||||||
needFilterKeyValue: changeValue === 1 || changeValue === 2,
|
needFilterKeyValue: changeValue === 1 || changeValue === 2,
|
||||||
needFilterSize: changeValue === 3 || changeValue === 4 || changeValue === 5,
|
needFilterSize: changeValue === 3 || changeValue === 4 || changeValue === 5,
|
||||||
|
needFilterKey: changeValue === 6,
|
||||||
|
needFilterValue: changeValue === 7,
|
||||||
});
|
});
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,19 +16,19 @@ export const cardList = [
|
|||||||
|
|
||||||
export const filterList = [
|
export const filterList = [
|
||||||
{
|
{
|
||||||
label: 'none',
|
label: 'None',
|
||||||
value: 0,
|
value: 0,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
label: 'contains',
|
label: 'Contains',
|
||||||
value: 1,
|
value: 1,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
label: 'does not contains',
|
label: 'Does Not Contains',
|
||||||
value: 2,
|
value: 2,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
label: 'equals',
|
label: 'Equals',
|
||||||
value: 3,
|
value: 3,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@@ -39,6 +39,14 @@ export const filterList = [
|
|||||||
label: 'Under Size',
|
label: 'Under Size',
|
||||||
value: 5,
|
value: 5,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
label: 'Key Contains',
|
||||||
|
value: 6,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
label: 'Value Contains',
|
||||||
|
value: 7,
|
||||||
|
}
|
||||||
];
|
];
|
||||||
|
|
||||||
export const untilList = [
|
export const untilList = [
|
||||||
@@ -324,10 +332,10 @@ export const getFormConfig = (topicMetaData: any, info = {} as any, partitionLis
|
|||||||
key: 'filterKey',
|
key: 'filterKey',
|
||||||
label: 'Key',
|
label: 'Key',
|
||||||
type: FormItemType.input,
|
type: FormItemType.input,
|
||||||
invisible: !info?.needFilterKeyValue,
|
invisible: !info?.needFilterKeyValue && !info?.needFilterKey,
|
||||||
rules: [
|
rules: [
|
||||||
{
|
{
|
||||||
required: info?.needFilterKeyValue,
|
required: info?.needFilterKeyValue || info?.needFilterKey,
|
||||||
message: '请输入Key',
|
message: '请输入Key',
|
||||||
},
|
},
|
||||||
],
|
],
|
||||||
@@ -336,10 +344,10 @@ export const getFormConfig = (topicMetaData: any, info = {} as any, partitionLis
|
|||||||
key: 'filterValue',
|
key: 'filterValue',
|
||||||
label: 'Value',
|
label: 'Value',
|
||||||
type: FormItemType.input,
|
type: FormItemType.input,
|
||||||
invisible: !info?.needFilterKeyValue,
|
invisible: !info?.needFilterKeyValue && !info?.needFilterValue,
|
||||||
rules: [
|
rules: [
|
||||||
{
|
{
|
||||||
required: info?.needFilterKeyValue,
|
required: info?.needFilterKeyValue || info?.needFilterValue,
|
||||||
message: '请输入Value',
|
message: '请输入Value',
|
||||||
},
|
},
|
||||||
],
|
],
|
||||||
|
|||||||
@@ -451,6 +451,18 @@ public class KafkaClientTestManagerImpl implements KafkaClientTestManager {
|
|||||||
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "包含的方式过滤,必须有过滤的key或value");
|
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "包含的方式过滤,必须有过滤的key或value");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// key包含过滤
|
||||||
|
if (KafkaConsumerFilterEnum.KEY_CONTAINS.getCode().equals(filter.getFilterType())
|
||||||
|
&& ValidateUtils.isBlank(filter.getFilterCompareKey())) {
|
||||||
|
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "key包含的方式过滤,必须有过滤的key");
|
||||||
|
}
|
||||||
|
|
||||||
|
// value包含过滤
|
||||||
|
if (KafkaConsumerFilterEnum.VALUE_CONTAINS.getCode().equals(filter.getFilterType())
|
||||||
|
&& ValidateUtils.isBlank(filter.getFilterCompareValue())) {
|
||||||
|
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "value包含的方式过滤,必须有过滤的value");
|
||||||
|
}
|
||||||
|
|
||||||
// 不包含过滤
|
// 不包含过滤
|
||||||
if (KafkaConsumerFilterEnum.NOT_CONTAINS.getCode().equals(filter.getFilterType())
|
if (KafkaConsumerFilterEnum.NOT_CONTAINS.getCode().equals(filter.getFilterType())
|
||||||
&& ValidateUtils.isBlank(filter.getFilterCompareKey()) && ValidateUtils.isBlank(filter.getFilterCompareValue())) {
|
&& ValidateUtils.isBlank(filter.getFilterCompareKey()) && ValidateUtils.isBlank(filter.getFilterCompareValue())) {
|
||||||
@@ -550,6 +562,18 @@ public class KafkaClientTestManagerImpl implements KafkaClientTestManager {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// key包含过滤
|
||||||
|
if (KafkaConsumerFilterEnum.KEY_CONTAINS.getCode().equals(filter.getFilterType())
|
||||||
|
&& (!ValidateUtils.isBlank(filter.getFilterCompareKey()) && consumerRecord.key() != null && consumerRecord.key().toString().contains(filter.getFilterCompareKey()))) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// value包含过滤
|
||||||
|
if (KafkaConsumerFilterEnum.VALUE_CONTAINS.getCode().equals(filter.getFilterType())
|
||||||
|
&& (!ValidateUtils.isBlank(filter.getFilterCompareValue()) && consumerRecord.value() != null && consumerRecord.value().toString().contains(filter.getFilterCompareValue()))) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
// 不包含过滤
|
// 不包含过滤
|
||||||
if (KafkaConsumerFilterEnum.NOT_CONTAINS.getCode().equals(filter.getFilterType())
|
if (KafkaConsumerFilterEnum.NOT_CONTAINS.getCode().equals(filter.getFilterType())
|
||||||
&& (!ValidateUtils.isBlank(filter.getFilterCompareKey()) && (consumerRecord.key() == null || !consumerRecord.key().toString().contains(filter.getFilterCompareKey())))
|
&& (!ValidateUtils.isBlank(filter.getFilterCompareKey()) && (consumerRecord.key() == null || !consumerRecord.key().toString().contains(filter.getFilterCompareKey())))
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ public class KafkaConsumerFilterDTO extends BaseDTO {
|
|||||||
/**
|
/**
|
||||||
* @see KafkaConsumerFilterEnum
|
* @see KafkaConsumerFilterEnum
|
||||||
*/
|
*/
|
||||||
@Range(min = 0, max = 5, message = "filterType最大和最小值必须在[0, 5]之间")
|
@Range(min = 0, max = 7, message = "filterType最大和最小值必须在[0, 7]之间")
|
||||||
@ApiModelProperty(value = "开始消费位置的类型", example = "2")
|
@ApiModelProperty(value = "开始消费位置的类型", example = "2")
|
||||||
private Integer filterType;
|
private Integer filterType;
|
||||||
|
|
||||||
|
|||||||
@@ -22,6 +22,10 @@ public enum KafkaConsumerFilterEnum {
|
|||||||
|
|
||||||
UNDER_SIZE(5, "size小于"),
|
UNDER_SIZE(5, "size小于"),
|
||||||
|
|
||||||
|
KEY_CONTAINS(6, "key包含"),
|
||||||
|
|
||||||
|
VALUE_CONTAINS(7, "value包含"),
|
||||||
|
|
||||||
;
|
;
|
||||||
|
|
||||||
private final Integer code;
|
private final Integer code;
|
||||||
|
|||||||
Reference in New Issue
Block a user