Add km module kafka

This commit is contained in:
leewei
2023-02-14 14:57:39 +08:00
parent 229140f067
commit 469baad65b
4310 changed files with 736354 additions and 46204 deletions

View File

@@ -0,0 +1,263 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.test.TestRecord;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.StringJoiner;
/**
* {@code TestInputTopic} is used to pipe records to topic in {@link TopologyTestDriver}.
* To use {@code TestInputTopic} create a new instance via
* {@link TopologyTestDriver#createInputTopic(String, Serializer, Serializer)}.
* In actual test code, you can pipe new record values, keys and values or list of {@link KeyValue} pairs.
* If you have multiple source topics, you need to create a {@code TestInputTopic} for each.
*
* <h2>Processing messages</h2>
* <pre>{@code
* private TestInputTopic<Long, String> inputTopic;
* ...
* inputTopic = testDriver.createInputTopic(INPUT_TOPIC, longSerializer, stringSerializer);
* ...
* inputTopic.pipeInput("Hello");
* }</pre>
*
* @param <K> the type of the record key
* @param <V> the type of the record value
* @see TopologyTestDriver
*/
public class TestInputTopic<K, V> {
private final TopologyTestDriver driver;
private final String topic;
private final Serializer<K> keySerializer;
private final Serializer<V> valueSerializer;
//Timing
private Instant currentTime;
private final Duration advanceDuration;
TestInputTopic(final TopologyTestDriver driver,
final String topicName,
final Serializer<K> keySerializer,
final Serializer<V> valueSerializer,
final Instant startTimestamp,
final Duration autoAdvance) {
Objects.requireNonNull(driver, "TopologyTestDriver cannot be null");
Objects.requireNonNull(topicName, "topicName cannot be null");
Objects.requireNonNull(keySerializer, "keySerializer cannot be null");
Objects.requireNonNull(valueSerializer, "valueSerializer cannot be null");
Objects.requireNonNull(startTimestamp, "startTimestamp cannot be null");
Objects.requireNonNull(autoAdvance, "autoAdvance cannot be null");
this.driver = driver;
this.topic = topicName;
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
this.currentTime = startTimestamp;
if (autoAdvance.isNegative()) {
throw new IllegalArgumentException("autoAdvance must be positive");
}
this.advanceDuration = autoAdvance;
}
/**
* Advances the internally tracked event time of this input topic.
* Each time a record without explicitly defined timestamp is piped,
* the current topic event time is used as record timestamp.
* <p>
* Note: advancing the event time on the input topic, does not advance the tracked stream time in
* {@link TopologyTestDriver} as long as no new input records are piped.
* Furthermore, it does not advance the wall-clock time of {@link TopologyTestDriver}.
*
* @param advance the duration of time to advance
*/
public void advanceTime(final Duration advance) {
if (advance.isNegative()) {
throw new IllegalArgumentException("advance must be positive");
}
currentTime = currentTime.plus(advance);
}
private Instant getTimestampAndAdvance() {
final Instant timestamp = currentTime;
currentTime = currentTime.plus(advanceDuration);
return timestamp;
}
/**
* Send an input record with the given record on the topic and then commit the records.
* May auto advance topic time.
*
* @param record the record to sent
*/
public void pipeInput(final TestRecord<K, V> record) {
//if record timestamp not set get timestamp and advance
final Instant timestamp = (record.getRecordTime() == null) ? getTimestampAndAdvance() : record.getRecordTime();
driver.pipeRecord(topic, record, keySerializer, valueSerializer, timestamp);
}
/**
* Send an input record with the given value on the topic and then commit the records.
* May auto advance topic time.
*
* @param value the record value
*/
public void pipeInput(final V value) {
pipeInput(new TestRecord<>(value));
}
/**
* Send an input record with the given key and value on the topic and then commit the records.
* May auto advance topic time
*
* @param key the record key
* @param value the record value
*/
public void pipeInput(final K key,
final V value) {
pipeInput(new TestRecord<>(key, value));
}
/**
* Send an input record with the given value and timestamp on the topic and then commit the records.
* Does not auto advance internally tracked time.
*
* @param value the record value
* @param timestamp the record timestamp
*/
public void pipeInput(final V value,
final Instant timestamp) {
pipeInput(new TestRecord<>(null, value, timestamp));
}
/**
* Send an input record with the given key, value and timestamp on the topic and then commit the records.
* Does not auto advance internally tracked time.
*
* @param key the record key
* @param value the record value
* @param timestampMs the record timestamp
*/
public void pipeInput(final K key,
final V value,
final long timestampMs) {
pipeInput(new TestRecord<>(key, value, null, timestampMs));
}
/**
* Send an input record with the given key, value and timestamp on the topic and then commit the records.
* Does not auto advance internally tracked time.
*
* @param key the record key
* @param value the record value
* @param timestamp the record timestamp
*/
public void pipeInput(final K key,
final V value,
final Instant timestamp) {
pipeInput(new TestRecord<>(key, value, timestamp));
}
/**
* Send input records with the given KeyValue list on the topic then commit each record individually.
* The timestamp will be generated based on the constructor provided start time and time will auto advance.
*
* @param records the list of TestRecord records
*/
public void pipeRecordList(final List<? extends TestRecord<K, V>> records) {
for (final TestRecord<K, V> record : records) {
pipeInput(record);
}
}
/**
* Send input records with the given KeyValue list on the topic then commit each record individually.
* The timestamp will be generated based on the constructor provided start time and time will auto advance based on
* {@link #TestInputTopic(TopologyTestDriver, String, Serializer, Serializer, Instant, Duration) autoAdvance} setting.
*
* @param keyValues the {@link List} of {@link KeyValue} records
*/
public void pipeKeyValueList(final List<KeyValue<K, V>> keyValues) {
for (final KeyValue<K, V> keyValue : keyValues) {
pipeInput(keyValue.key, keyValue.value);
}
}
/**
* Send input records with the given value list on the topic then commit each record individually.
* The timestamp will be generated based on the constructor provided start time and time will auto advance based on
* {@link #TestInputTopic(TopologyTestDriver, String, Serializer, Serializer, Instant, Duration) autoAdvance} setting.
*
* @param values the {@link List} of {@link KeyValue} records
*/
public void pipeValueList(final List<V> values) {
for (final V value : values) {
pipeInput(value);
}
}
/**
* Send input records with the given {@link KeyValue} list on the topic then commit each record individually.
* Does not auto advance internally tracked time.
*
* @param keyValues the {@link List} of {@link KeyValue} records
* @param startTimestamp the timestamp for the first generated record
* @param advance the time difference between two consecutive generated records
*/
public void pipeKeyValueList(final List<KeyValue<K, V>> keyValues,
final Instant startTimestamp,
final Duration advance) {
Instant recordTime = startTimestamp;
for (final KeyValue<K, V> keyValue : keyValues) {
pipeInput(keyValue.key, keyValue.value, recordTime);
recordTime = recordTime.plus(advance);
}
}
/**
* Send input records with the given value list on the topic then commit each record individually.
* The timestamp will be generated based on the constructor provided start time and time will auto advance based on
* {@link #TestInputTopic(TopologyTestDriver, String, Serializer, Serializer, Instant, Duration) autoAdvance} setting.
*
* @param values the {@link List} of values
* @param startTimestamp the timestamp for the first generated record
* @param advance the time difference between two consecutive generated records
*/
public void pipeValueList(final List<V> values,
final Instant startTimestamp,
final Duration advance) {
Instant recordTime = startTimestamp;
for (final V value : values) {
pipeInput(value, recordTime);
recordTime = recordTime.plus(advance);
}
}
@Override
public String toString() {
return new StringJoiner(", ", TestInputTopic.class.getSimpleName() + "[", "]")
.add("topic='" + topic + "'")
.add("keySerializer=" + keySerializer.getClass().getSimpleName())
.add("valueSerializer=" + valueSerializer.getClass().getSimpleName())
.toString();
}
}

View File

@@ -0,0 +1,201 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.streams.test.TestRecord;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.StringJoiner;
/**
* {@code TestOutputTopic} is used to read records from a topic in {@link TopologyTestDriver}.
* To use {@code TestOutputTopic} create a new instance via
* {@link TopologyTestDriver#createOutputTopic(String, Deserializer, Deserializer)}.
* In actual test code, you can read record values, keys, {@link KeyValue} or {@link TestRecord}
* If you have multiple source topics, you need to create a {@code TestOutputTopic} for each.
* <p>
* If you need to test key, value and headers, use {@link #readRecord()} methods.
* Using {@link #readKeyValue()} you get a {@link KeyValue} pair, and thus, don't get access to the record's
* timestamp or headers.
* Similarly using {@link #readValue()} you only get the value of a record.
*
* <h2>Processing records</h2>
* <pre>{@code
* private TestOutputTopic<String, Long> outputTopic;
* ...
* outputTopic = testDriver.createOutputTopic(OUTPUT_TOPIC, stringDeserializer, longDeserializer);
* ...
* assertThat(outputTopic.readValue()).isEqual(1);
* }</pre>
*
* @param <K> the type of the record key
* @param <V> the type of the record value
* @see TopologyTestDriver
*/
public class TestOutputTopic<K, V> {
private final TopologyTestDriver driver;
private final String topic;
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;
TestOutputTopic(final TopologyTestDriver driver,
final String topicName,
final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer) {
Objects.requireNonNull(driver, "TopologyTestDriver cannot be null");
Objects.requireNonNull(topicName, "topicName cannot be null");
Objects.requireNonNull(keyDeserializer, "keyDeserializer cannot be null");
Objects.requireNonNull(valueDeserializer, "valueDeserializer cannot be null");
this.driver = driver;
this.topic = topicName;
this.keyDeserializer = keyDeserializer;
this.valueDeserializer = valueDeserializer;
}
/**
* Read one record from the output topic and return record's value.
*
* @return Next value for output topic.
*/
public V readValue() {
final TestRecord<K, V> record = readRecord();
return record.value();
}
/**
* Read one record from the output topic and return its key and value as pair.
*
* @return Next output as {@link KeyValue}.
*/
public KeyValue<K, V> readKeyValue() {
final TestRecord<K, V> record = readRecord();
return new KeyValue<>(record.key(), record.value());
}
/**
* Read one Record from output topic.
*
* @return Next output as {@link TestRecord}.
*/
public TestRecord<K, V> readRecord() {
return driver.readRecord(topic, keyDeserializer, valueDeserializer);
}
/**
* Read output to List.
* This method can be used if the result is considered a stream.
* If the result is considered a table, the list will contain all updated, ie, a key might be contained multiple times.
* If you are only interested in the last table update (ie, the final table state),
* you can use {@link #readKeyValuesToMap()} instead.
*
* @return List of output.
*/
public List<TestRecord<K, V>> readRecordsToList() {
final List<TestRecord<K, V>> output = new LinkedList<>();
while (!isEmpty()) {
output.add(readRecord());
}
return output;
}
/**
* Read output to map.
* This method can be used if the result is considered a table,
* when you are only interested in the last table update (ie, the final table state).
* If the result is considered a stream, you can use {@link #readRecordsToList()} instead.
* The list will contain all updated, ie, a key might be contained multiple times.
* If the last update to a key is a delete/tombstone, the key will still be in the map (with null-value).
*
* @return Map of output by key.
*/
public Map<K, V> readKeyValuesToMap() {
final Map<K, V> output = new HashMap<>();
TestRecord<K, V> outputRow;
while (!isEmpty()) {
outputRow = readRecord();
if (outputRow.key() == null) {
throw new IllegalStateException("Null keys not allowed with readKeyValuesToMap method");
}
output.put(outputRow.key(), outputRow.value());
}
return output;
}
/**
* Read all KeyValues from topic to List.
*
* @return List of output KeyValues.
*/
public List<KeyValue<K, V>> readKeyValuesToList() {
final List<KeyValue<K, V>> output = new LinkedList<>();
KeyValue<K, V> outputRow;
while (!isEmpty()) {
outputRow = readKeyValue();
output.add(outputRow);
}
return output;
}
/**
* Read all values from topic to List.
*
* @return List of output values.
*/
public List<V> readValuesToList() {
final List<V> output = new LinkedList<>();
V outputValue;
while (!isEmpty()) {
outputValue = readValue();
output.add(outputValue);
}
return output;
}
/**
* Get size of unread record in the topic queue.
*
* @return size of topic queue.
*/
public final long getQueueSize() {
return driver.getQueueSize(topic);
}
/**
* Verify if the topic queue is empty.
*
* @return {@code true} if no more record in the topic queue.
*/
public final boolean isEmpty() {
return driver.isEmpty(topic);
}
@Override
public String toString() {
return new StringJoiner(", ", TestOutputTopic.class.getSimpleName() + "[", "]")
.add("topic='" + topic + "'")
.add("keyDeserializer=" + keyDeserializer.getClass().getSimpleName())
.add("valueDeserializer=" + valueDeserializer.getClass().getSimpleName())
.add("size=" + getQueueSize())
.toString();
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.ReadOnlyKeyValueStoreFacade;
import java.util.List;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
public class KeyValueStoreFacade<K, V> extends ReadOnlyKeyValueStoreFacade<K, V> implements KeyValueStore<K, V> {
public KeyValueStoreFacade(final TimestampedKeyValueStore<K, V> inner) {
super(inner);
}
@Override
public void init(final ProcessorContext context,
final StateStore root) {
inner.init(context, root);
}
@Override
public void put(final K key,
final V value) {
inner.put(key, ValueAndTimestamp.make(value, ConsumerRecord.NO_TIMESTAMP));
}
@Override
public V putIfAbsent(final K key,
final V value) {
return getValueOrNull(inner.putIfAbsent(key, ValueAndTimestamp.make(value, ConsumerRecord.NO_TIMESTAMP)));
}
@Override
public void putAll(final List<KeyValue<K, V>> entries) {
for (final KeyValue<K, V> entry : entries) {
inner.put(entry.key, ValueAndTimestamp.make(entry.value, ConsumerRecord.NO_TIMESTAMP));
}
}
@Override
public V delete(final K key) {
return getValueOrNull(inner.delete(key));
}
@Override
public void flush() {
inner.flush();
}
@Override
public void close() {
inner.close();
}
@Override
public String name() {
return inner.name();
}
@Override
public boolean persistent() {
return inner.persistent();
}
@Override
public boolean isOpen() {
return inner.isOpen();
}
}

View File

@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.ReadOnlyWindowStoreFacade;
public class WindowStoreFacade<K, V> extends ReadOnlyWindowStoreFacade<K, V> implements WindowStore<K, V> {
public WindowStoreFacade(final TimestampedWindowStore<K, V> store) {
super(store);
}
@Override
public void init(final ProcessorContext context,
final StateStore root) {
inner.init(context, root);
}
@Deprecated
@Override
public void put(final K key,
final V value) {
inner.put(key, ValueAndTimestamp.make(value, ConsumerRecord.NO_TIMESTAMP));
}
@Override
public void put(final K key,
final V value,
final long windowStartTimestamp) {
inner.put(key, ValueAndTimestamp.make(value, ConsumerRecord.NO_TIMESTAMP), windowStartTimestamp);
}
@Override
public void flush() {
inner.flush();
}
@Override
public void close() {
inner.close();
}
@Override
public String name() {
return inner.name();
}
@Override
public boolean persistent() {
return inner.persistent();
}
@Override
public boolean isOpen() {
return inner.isOpen();
}
}

View File

@@ -0,0 +1,543 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.internals.QuietStreamsConfig;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import java.io.File;
import java.time.Duration;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users to test their {@link Processor},
* {@link Transformer}, and {@link ValueTransformer} implementations.
* <p>
* The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral
* tests that serve as example usage.
* <p>
* Note that this class does not take any automated actions (such as firing scheduled punctuators).
* It simply captures any data it witnesses.
* If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink
* {@link Topology} and using the {@link TopologyTestDriver}.
*/
public class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier {
// Immutable fields ================================================
private final StreamsMetricsImpl metrics;
private final TaskId taskId;
private final StreamsConfig config;
private final File stateDir;
// settable record metadata ================================================
private String topic;
private Integer partition;
private Long offset;
private Headers headers;
private Long timestamp;
// mocks ================================================
private final Map<String, StateStore> stateStores = new HashMap<>();
private final List<CapturedPunctuator> punctuators = new LinkedList<>();
private final List<CapturedForward> capturedForwards = new LinkedList<>();
private boolean committed = false;
/**
* {@link CapturedPunctuator} holds captured punctuators, along with their scheduling information.
*/
public static class CapturedPunctuator {
private final long intervalMs;
private final PunctuationType type;
private final Punctuator punctuator;
private boolean cancelled = false;
private CapturedPunctuator(final long intervalMs, final PunctuationType type, final Punctuator punctuator) {
this.intervalMs = intervalMs;
this.type = type;
this.punctuator = punctuator;
}
@SuppressWarnings({"WeakerAccess", "unused"})
public long getIntervalMs() {
return intervalMs;
}
@SuppressWarnings({"WeakerAccess", "unused"})
public PunctuationType getType() {
return type;
}
@SuppressWarnings({"WeakerAccess", "unused"})
public Punctuator getPunctuator() {
return punctuator;
}
@SuppressWarnings({"WeakerAccess", "unused"})
public void cancel() {
cancelled = true;
}
@SuppressWarnings({"WeakerAccess", "unused"})
public boolean cancelled() {
return cancelled;
}
}
public static class CapturedForward {
private final String childName;
private final long timestamp;
private final KeyValue keyValue;
private CapturedForward(final To to, final KeyValue keyValue) {
if (keyValue == null) {
throw new IllegalArgumentException();
}
this.childName = to.childName;
this.timestamp = to.timestamp;
this.keyValue = keyValue;
}
/**
* The child this data was forwarded to.
*
* @return The child name, or {@code null} if it was broadcast.
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public String childName() {
return childName;
}
/**
* The timestamp attached to the forwarded record.
*
* @return A timestamp, or {@code -1} if none was forwarded.
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public long timestamp() {
return timestamp;
}
/**
* The data forwarded.
*
* @return A key/value pair. Not null.
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public KeyValue keyValue() {
return keyValue;
}
@Override
public String toString() {
return "CapturedForward{" +
"childName='" + childName + '\'' +
", timestamp=" + timestamp +
", keyValue=" + keyValue +
'}';
}
}
// constructors ================================================
/**
* Create a {@link MockProcessorContext} with dummy {@code config} and {@code taskId} and {@code null} {@code stateDir}.
* Most unit tests using this mock won't need to know the taskId,
* and most unit tests should be able to get by with the
* {@link InMemoryKeyValueStore}, so the stateDir won't matter.
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public MockProcessorContext() {
//noinspection DoubleBraceInitialization
this(
new Properties() {
{
put(StreamsConfig.APPLICATION_ID_CONFIG, "");
put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
}
},
new TaskId(0, 0),
null);
}
/**
* Create a {@link MockProcessorContext} with dummy {@code taskId} and {@code null} {@code stateDir}.
* Most unit tests using this mock won't need to know the taskId,
* and most unit tests should be able to get by with the
* {@link InMemoryKeyValueStore}, so the stateDir won't matter.
*
* @param config a Properties object, used to configure the context and the processor.
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public MockProcessorContext(final Properties config) {
this(config, new TaskId(0, 0), null);
}
/**
* Create a {@link MockProcessorContext} with a specified taskId and null stateDir.
*
* @param config a {@link Properties} object, used to configure the context and the processor.
* @param taskId a {@link TaskId}, which the context makes available via {@link MockProcessorContext#taskId()}.
* @param stateDir a {@link File}, which the context makes available viw {@link MockProcessorContext#stateDir()}.
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
final StreamsConfig streamsConfig = new QuietStreamsConfig(config);
this.taskId = taskId;
this.config = streamsConfig;
this.stateDir = stateDir;
final MetricConfig metricConfig = new MetricConfig();
metricConfig.recordLevel(Sensor.RecordingLevel.DEBUG);
final String threadId = Thread.currentThread().getName();
this.metrics = new StreamsMetricsImpl(
new Metrics(metricConfig),
threadId,
streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG)
);
TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(threadId, taskId.toString(), metrics);
}
@Override
public String applicationId() {
return config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
}
@Override
public TaskId taskId() {
return taskId;
}
@Override
public Map<String, Object> appConfigs() {
final Map<String, Object> combined = new HashMap<>();
combined.putAll(config.originals());
combined.putAll(config.values());
return combined;
}
@Override
public Map<String, Object> appConfigsWithPrefix(final String prefix) {
return config.originalsWithPrefix(prefix);
}
@Override
public Serde<?> keySerde() {
return config.defaultKeySerde();
}
@Override
public Serde<?> valueSerde() {
return config.defaultValueSerde();
}
@Override
public File stateDir() {
return stateDir;
}
@Override
public StreamsMetrics metrics() {
return metrics;
}
// settable record metadata ================================================
/**
* The context exposes these metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
* but for the purpose of driving unit tests, you can set them directly.
*
* @param topic A topic name
* @param partition A partition number
* @param offset A record offset
* @param timestamp A record timestamp
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public void setRecordMetadata(final String topic,
final int partition,
final long offset,
final Headers headers,
final long timestamp) {
this.topic = topic;
this.partition = partition;
this.offset = offset;
this.headers = headers;
this.timestamp = timestamp;
}
/**
* The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
* but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
*
* @param topic A topic name
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public void setTopic(final String topic) {
this.topic = topic;
}
/**
* The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
* but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
*
* @param partition A partition number
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public void setPartition(final int partition) {
this.partition = partition;
}
/**
* The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
* but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
*
* @param offset A record offset
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public void setOffset(final long offset) {
this.offset = offset;
}
/**
* The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
* but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
*
* @param headers Record headers
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public void setHeaders(final Headers headers) {
this.headers = headers;
}
/**
* The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
* but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
*
* @param timestamp A record timestamp
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public void setTimestamp(final long timestamp) {
this.timestamp = timestamp;
}
@Override
public String topic() {
if (topic == null) {
throw new IllegalStateException("Topic must be set before use via setRecordMetadata() or setTopic().");
}
return topic;
}
@Override
public int partition() {
if (partition == null) {
throw new IllegalStateException("Partition must be set before use via setRecordMetadata() or setPartition().");
}
return partition;
}
@Override
public long offset() {
if (offset == null) {
throw new IllegalStateException("Offset must be set before use via setRecordMetadata() or setOffset().");
}
return offset;
}
@Override
public Headers headers() {
return headers;
}
@Override
public long timestamp() {
if (timestamp == null) {
throw new IllegalStateException("Timestamp must be set before use via setRecordMetadata() or setTimestamp().");
}
return timestamp;
}
// mocks ================================================
@Override
public void register(final StateStore store,
final StateRestoreCallback stateRestoreCallbackIsIgnoredInMock) {
stateStores.put(store.name(), store);
}
@Override
public StateStore getStateStore(final String name) {
return stateStores.get(name);
}
@Override
@Deprecated
public Cancellable schedule(final long intervalMs,
final PunctuationType type,
final Punctuator callback) {
final CapturedPunctuator capturedPunctuator = new CapturedPunctuator(intervalMs, type, callback);
punctuators.add(capturedPunctuator);
return capturedPunctuator::cancel;
}
@SuppressWarnings("deprecation") // removing #schedule(final long intervalMs,...) will fix this
@Override
public Cancellable schedule(final Duration interval,
final PunctuationType type,
final Punctuator callback) throws IllegalArgumentException {
return schedule(ApiUtils.validateMillisecondDuration(interval, "interval"), type, callback);
}
/**
* Get the punctuators scheduled so far. The returned list is not affected by subsequent calls to {@code schedule(...)}.
*
* @return A list of captured punctuators.
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public List<CapturedPunctuator> scheduledPunctuators() {
return new LinkedList<>(punctuators);
}
@SuppressWarnings("unchecked")
@Override
public <K, V> void forward(final K key, final V value) {
forward(key, value, To.all());
}
@SuppressWarnings("unchecked")
@Override
public <K, V> void forward(final K key, final V value, final To to) {
capturedForwards.add(
new CapturedForward(
to.timestamp == -1 ? to.withTimestamp(timestamp == null ? -1 : timestamp) : to,
new KeyValue(key, value)
)
);
}
@Override
@Deprecated
public <K, V> void forward(final K key, final V value, final int childIndex) {
throw new UnsupportedOperationException(
"Forwarding to a child by index is deprecated. " +
"Please transition processors to forward using a 'To' object instead."
);
}
@Override
@Deprecated
public <K, V> void forward(final K key, final V value, final String childName) {
throw new UnsupportedOperationException(
"Forwarding to a child by name is deprecated. " +
"Please transition processors to forward using 'To.child(childName)' instead."
);
}
/**
* Get all the forwarded data this context has observed. The returned list will not be
* affected by subsequent interactions with the context. The data in the list is in the same order as the calls to
* {@code forward(...)}.
*
* @return A list of key/value pairs that were previously passed to the context.
*/
public List<CapturedForward> forwarded() {
return new LinkedList<>(capturedForwards);
}
/**
* Get all the forwarded data this context has observed for a specific child by name.
* The returned list will not be affected by subsequent interactions with the context.
* The data in the list is in the same order as the calls to {@code forward(...)}.
*
* @param childName The child name to retrieve forwards for
* @return A list of key/value pairs that were previously passed to the context.
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public List<CapturedForward> forwarded(final String childName) {
final LinkedList<CapturedForward> result = new LinkedList<>();
for (final CapturedForward capture : capturedForwards) {
if (capture.childName() == null || capture.childName().equals(childName)) {
result.add(capture);
}
}
return result;
}
/**
* Clear the captured forwarded data.
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public void resetForwards() {
capturedForwards.clear();
}
@Override
public void commit() {
committed = true;
}
/**
* Whether {@link ProcessorContext#commit()} has been called in this context.
*
* @return {@code true} iff {@link ProcessorContext#commit()} has been called in this context since construction or reset.
*/
@SuppressWarnings("WeakerAccess")
public boolean committed() {
return committed;
}
/**
* Reset the commit capture to {@code false} (whether or not it was previously {@code true}).
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public void resetCommit() {
committed = false;
}
@Override
public RecordCollector recordCollector() {
// This interface is assumed by state stores that add change-logging.
// Rather than risk a mysterious ClassCastException during unit tests, throw an explanatory exception.
throw new UnsupportedOperationException(
"MockProcessorContext does not provide record collection. " +
"For processor unit tests, use an in-memory state store with change-logging disabled. " +
"Alternatively, use the TopologyTestDriver for testing processor/store/topology integration."
);
}
}

View File

@@ -0,0 +1,582 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.test;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
/**
* Factory to create {@link ConsumerRecord consumer records} for a single single-partitioned topic with given key and
* value {@link Serializer serializers}.
*
* @deprecated Since 2.4 use methods of {@link TestInputTopic} instead
*
* @param <K> the type of the key
* @param <V> the type of the value
*
* @see TopologyTestDriver
*/
@Deprecated
public class ConsumerRecordFactory<K, V> {
private final String topicName;
private final Serializer<K> keySerializer;
private final Serializer<V> valueSerializer;
private long timeMs;
private final long advanceMs;
/**
* Create a new factory for the given topic.
* Uses current system time as start timestamp.
* Auto-advance is disabled.
*
* @param keySerializer the key serializer
* @param valueSerializer the value serializer
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecordFactory(final Serializer<K> keySerializer,
final Serializer<V> valueSerializer) {
this(null, keySerializer, valueSerializer, System.currentTimeMillis());
}
/**
* Create a new factory for the given topic.
* Uses current system time as start timestamp.
* Auto-advance is disabled.
*
* @param defaultTopicName the default topic name used for all generated {@link ConsumerRecord consumer records}
* @param keySerializer the key serializer
* @param valueSerializer the value serializer
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecordFactory(final String defaultTopicName,
final Serializer<K> keySerializer,
final Serializer<V> valueSerializer) {
this(defaultTopicName, keySerializer, valueSerializer, System.currentTimeMillis());
}
/**
* Create a new factory for the given topic.
* Auto-advance is disabled.
*
* @param keySerializer the key serializer
* @param valueSerializer the value serializer
* @param startTimestampMs the initial timestamp for generated records
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecordFactory(final Serializer<K> keySerializer,
final Serializer<V> valueSerializer,
final long startTimestampMs) {
this(null, keySerializer, valueSerializer, startTimestampMs, 0L);
}
/**
* Create a new factory for the given topic.
* Auto-advance is disabled.
*
* @param defaultTopicName the topic name used for all generated {@link ConsumerRecord consumer records}
* @param keySerializer the key serializer
* @param valueSerializer the value serializer
* @param startTimestampMs the initial timestamp for generated records
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecordFactory(final String defaultTopicName,
final Serializer<K> keySerializer,
final Serializer<V> valueSerializer,
final long startTimestampMs) {
this(defaultTopicName, keySerializer, valueSerializer, startTimestampMs, 0L);
}
/**
* Create a new factory for the given topic.
*
* @param keySerializer the key serializer
* @param valueSerializer the value serializer
* @param startTimestampMs the initial timestamp for generated records
* @param autoAdvanceMs the time increment pre generated record
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecordFactory(final Serializer<K> keySerializer,
final Serializer<V> valueSerializer,
final long startTimestampMs,
final long autoAdvanceMs) {
this(null, keySerializer, valueSerializer, startTimestampMs, autoAdvanceMs);
}
/**
* Create a new factory for the given topic.
*
* @param defaultTopicName the topic name used for all generated {@link ConsumerRecord consumer records}
* @param keySerializer the key serializer
* @param valueSerializer the value serializer
* @param startTimestampMs the initial timestamp for generated records
* @param autoAdvanceMs the time increment pre generated record
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecordFactory(final String defaultTopicName,
final Serializer<K> keySerializer,
final Serializer<V> valueSerializer,
final long startTimestampMs,
final long autoAdvanceMs) {
Objects.requireNonNull(keySerializer, "keySerializer cannot be null");
Objects.requireNonNull(valueSerializer, "valueSerializer cannot be null");
this.topicName = defaultTopicName;
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
timeMs = startTimestampMs;
advanceMs = autoAdvanceMs;
}
/**
* Advances the internally tracked time.
*
* @param advanceMs the amount of time to advance
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public void advanceTimeMs(final long advanceMs) {
if (advanceMs < 0) {
throw new IllegalArgumentException("advanceMs must be positive");
}
timeMs += advanceMs;
}
/**
* Create a {@link ConsumerRecord} with the given topic name, key, value, headers, and timestamp.
* Does not auto advance internally tracked time.
*
* @param topicName the topic name
* @param key the record key
* @param value the record value
* @param headers the record headers
* @param timestampMs the record timestamp
* @return the generated {@link ConsumerRecord}
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final String topicName,
final K key,
final V value,
final Headers headers,
final long timestampMs) {
Objects.requireNonNull(topicName, "topicName cannot be null.");
Objects.requireNonNull(headers, "headers cannot be null.");
final byte[] serializedKey = keySerializer.serialize(topicName, headers, key);
final byte[] serializedValue = valueSerializer.serialize(topicName, headers, value);
return new ConsumerRecord<>(
topicName,
-1,
-1L,
timestampMs,
TimestampType.CREATE_TIME,
(long) ConsumerRecord.NULL_CHECKSUM,
serializedKey == null ? 0 : serializedKey.length,
serializedValue == null ? 0 : serializedValue.length,
serializedKey,
serializedValue,
headers);
}
/**
* Create a {@link ConsumerRecord} with the given topic name and given topic, key, value, and timestamp.
* Does not auto advance internally tracked time.
*
* @param topicName the topic name
* @param key the record key
* @param value the record value
* @param timestampMs the record timestamp
* @return the generated {@link ConsumerRecord}
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final String topicName,
final K key,
final V value,
final long timestampMs) {
return create(topicName, key, value, new RecordHeaders(), timestampMs);
}
/**
* Create a {@link ConsumerRecord} with default topic name and given key, value, and timestamp.
* Does not auto advance internally tracked time.
*
* @param key the record key
* @param value the record value
* @param timestampMs the record timestamp
* @return the generated {@link ConsumerRecord}
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final K key,
final V value,
final long timestampMs) {
return create(key, value, new RecordHeaders(), timestampMs);
}
/**
* Create a {@link ConsumerRecord} with default topic name and given key, value, headers, and timestamp.
* Does not auto advance internally tracked time.
*
* @param key the record key
* @param value the record value
* @param headers the record headers
* @param timestampMs the record timestamp
* @return the generated {@link ConsumerRecord}
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final K key,
final V value,
final Headers headers,
final long timestampMs) {
if (topicName == null) {
throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
"Use #create(String topicName, K key, V value, long timestampMs) instead.");
}
return create(topicName, key, value, headers, timestampMs);
}
/**
* Create a {@link ConsumerRecord} with the given topic name, key, and value.
* The timestamp will be generated based on the constructor provided start time and time will auto advance.
*
* @param topicName the topic name
* @param key the record key
* @param value the record value
* @return the generated {@link ConsumerRecord}
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final String topicName,
final K key,
final V value) {
final long timestamp = timeMs;
timeMs += advanceMs;
return create(topicName, key, value, new RecordHeaders(), timestamp);
}
/**
* Create a {@link ConsumerRecord} with the given topic name, key, value, and headers.
* The timestamp will be generated based on the constructor provided start time and time will auto advance.
*
* @param topicName the topic name
* @param key the record key
* @param value the record value
* @param headers the record headers
* @return the generated {@link ConsumerRecord}
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final String topicName,
final K key,
final V value,
final Headers headers) {
final long timestamp = timeMs;
timeMs += advanceMs;
return create(topicName, key, value, headers, timestamp);
}
/**
* Create a {@link ConsumerRecord} with default topic name and given key and value.
* The timestamp will be generated based on the constructor provided start time and time will auto advance.
*
* @param key the record key
* @param value the record value
* @return the generated {@link ConsumerRecord}
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final K key,
final V value) {
return create(key, value, new RecordHeaders());
}
/**
* Create a {@link ConsumerRecord} with default topic name and given key, value, and headers.
* The timestamp will be generated based on the constructor provided start time and time will auto advance.
*
* @param key the record key
* @param value the record value
* @param headers the record headers
* @return the generated {@link ConsumerRecord}
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final K key,
final V value,
final Headers headers) {
if (topicName == null) {
throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
"Use #create(String topicName, K key, V value) instead.");
}
return create(topicName, key, value, headers);
}
/**
* Create a {@link ConsumerRecord} with {@code null}-key and the given topic name, value, and timestamp.
* Does not auto advance internally tracked time.
*
* @param topicName the topic name
* @param value the record value
* @param timestampMs the record timestamp
* @return the generated {@link ConsumerRecord}
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final String topicName,
final V value,
final long timestampMs) {
return create(topicName, null, value, new RecordHeaders(), timestampMs);
}
/**
* Create a {@link ConsumerRecord} with {@code null}-key and the given topic name, value, headers, and timestamp.
* Does not auto advance internally tracked time.
*
* @param topicName the topic name
* @param value the record value
* @param headers the record headers
* @param timestampMs the record timestamp
* @return the generated {@link ConsumerRecord}
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final String topicName,
final V value,
final Headers headers,
final long timestampMs) {
return create(topicName, null, value, headers, timestampMs);
}
/**
* Create a {@link ConsumerRecord} with default topic name and {@code null}-key as well as given value and timestamp.
* Does not auto advance internally tracked time.
*
* @param value the record value
* @param timestampMs the record timestamp
* @return the generated {@link ConsumerRecord}
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final V value,
final long timestampMs) {
return create(value, new RecordHeaders(), timestampMs);
}
/**
* Create a {@link ConsumerRecord} with default topic name and {@code null}-key as well as given value, headers, and timestamp.
* Does not auto advance internally tracked time.
*
* @param value the record value
* @param headers the record headers
* @param timestampMs the record timestamp
* @return the generated {@link ConsumerRecord}
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final V value,
final Headers headers,
final long timestampMs) {
if (topicName == null) {
throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
"Use #create(String topicName, V value, long timestampMs) instead.");
}
return create(topicName, value, headers, timestampMs);
}
/**
* Create a {@link ConsumerRecord} with {@code null}-key and the given topic name, value, and headers.
* The timestamp will be generated based on the constructor provided start time and time will auto advance.
*
* @param topicName the topic name
* @param value the record value
* @param headers the record headers
* @return the generated {@link ConsumerRecord}
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final String topicName,
final V value,
final Headers headers) {
return create(topicName, null, value, headers);
}
/**
* Create a {@link ConsumerRecord} with {@code null}-key and the given topic name and value.
* The timestamp will be generated based on the constructor provided start time and time will auto advance.
*
* @param topicName the topic name
* @param value the record value
* @return the generated {@link ConsumerRecord}
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final String topicName,
final V value) {
return create(topicName, null, value, new RecordHeaders());
}
/**
* Create a {@link ConsumerRecord} with default topic name and {@code null}-key was well as given value.
* The timestamp will be generated based on the constructor provided start time and time will auto advance.
*
* @param value the record value
* @return the generated {@link ConsumerRecord}
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final V value) {
return create(value, new RecordHeaders());
}
/**
* Create a {@link ConsumerRecord} with default topic name and {@code null}-key was well as given value and headers.
* The timestamp will be generated based on the constructor provided start time and time will auto advance.
*
* @param value the record value
* @param headers the record headers
* @return the generated {@link ConsumerRecord}
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final V value,
final Headers headers) {
if (topicName == null) {
throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
"Use #create(String topicName, V value, long timestampMs) instead.");
}
return create(topicName, value, headers);
}
/**
* Creates {@link ConsumerRecord consumer records} with the given topic name, keys, and values.
* The timestamp will be generated based on the constructor provided start time and time will auto advance.
*
* @param topicName the topic name
* @param keyValues the record keys and values
* @return the generated {@link ConsumerRecord consumer records}
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public List<ConsumerRecord<byte[], byte[]>> create(final String topicName,
final List<KeyValue<K, V>> keyValues) {
final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>(keyValues.size());
for (final KeyValue<K, V> keyValue : keyValues) {
records.add(create(topicName, keyValue.key, keyValue.value));
}
return records;
}
/**
* Creates {@link ConsumerRecord consumer records} with default topic name as well as given keys and values.
* The timestamp will be generated based on the constructor provided start time and time will auto advance.
*
* @param keyValues the record keys and values
* @return the generated {@link ConsumerRecord consumer records}
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public List<ConsumerRecord<byte[], byte[]>> create(final List<KeyValue<K, V>> keyValues) {
if (topicName == null) {
throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
"Use #create(String topicName, List<KeyValue<K, V>> keyValues) instead.");
}
return create(topicName, keyValues);
}
/**
* Creates {@link ConsumerRecord consumer records} with the given topic name, keys, and values.
* Does not auto advance internally tracked time.
*
* @param topicName the topic name
* @param keyValues the record keys and values
* @param startTimestamp the timestamp for the first generated record
* @param advanceMs the time difference between two consecutive generated records
* @return the generated {@link ConsumerRecord consumer records}
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public List<ConsumerRecord<byte[], byte[]>> create(final String topicName,
final List<KeyValue<K, V>> keyValues,
final long startTimestamp,
final long advanceMs) {
if (advanceMs < 0) {
throw new IllegalArgumentException("advanceMs must be positive");
}
final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>(keyValues.size());
long timestamp = startTimestamp;
for (final KeyValue<K, V> keyValue : keyValues) {
records.add(create(topicName, keyValue.key, keyValue.value, new RecordHeaders(), timestamp));
timestamp += advanceMs;
}
return records;
}
/**
* Creates {@link ConsumerRecord consumer records} with default topic name as well as given keys and values.
* Does not auto advance internally tracked time.
*
* @param keyValues the record keys and values
* @param startTimestamp the timestamp for the first generated record
* @param advanceMs the time difference between two consecutive generated records
* @return the generated {@link ConsumerRecord consumer records}
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public List<ConsumerRecord<byte[], byte[]>> create(final List<KeyValue<K, V>> keyValues,
final long startTimestamp,
final long advanceMs) {
if (topicName == null) {
throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
"Use #create(String topicName, List<KeyValue<K, V>> keyValues, long startTimestamp, long advanceMs) instead.");
}
return create(topicName, keyValues, startTimestamp, advanceMs);
}
/**
* Creates {@link ConsumerRecord consumer records} with the given topic name, keys and values.
* For each generated record, the time is advanced by 1.
* Does not auto advance internally tracked time.
*
* @param topicName the topic name
* @param keyValues the record keys and values
* @param startTimestamp the timestamp for the first generated record
* @return the generated {@link ConsumerRecord consumer records}
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public List<ConsumerRecord<byte[], byte[]>> create(final String topicName,
final List<KeyValue<K, V>> keyValues,
final long startTimestamp) {
return create(topicName, keyValues, startTimestamp, 1);
}
/**
* Creates {@link ConsumerRecord consumer records} with the given keys and values.
* For each generated record, the time is advanced by 1.
* Does not auto advance internally tracked time.
*
* @param keyValues the record keys and values
* @param startTimestamp the timestamp for the first generated record
* @return the generated {@link ConsumerRecord consumer records}
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public List<ConsumerRecord<byte[], byte[]>> create(final List<KeyValue<K, V>> keyValues,
final long startTimestamp) {
if (topicName == null) {
throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
"Use #create(String topicName, List<KeyValue<K, V>> keyValues, long startTimestamp) instead.");
}
return create(topicName, keyValues, startTimestamp, 1);
}
}

View File

@@ -0,0 +1,458 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.test;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import java.util.Objects;
/**
* Helper class to verify topology result records.
*
* @deprecated Since 2.4 use methods of {@link TestOutputTopic} and standard assertion libraries instead
*
* @see TopologyTestDriver
*/
@Deprecated
public class OutputVerifier {
/**
* Compares a {@link ProducerRecord} with the provided value and throws an {@link AssertionError} if the
* {@code ProducerRecord}'s value is not equal to the expected value.
*
* @param record a output {@code ProducerRecord} for verification
* @param expectedValue the expected value of the {@code ProducerRecord}
* @param <K> the key type
* @param <V> the value type
* @throws AssertionError if {@code ProducerRecord}'s value is not equal to {@code expectedValue}
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public static <K, V> void compareValue(final ProducerRecord<K, V> record,
final V expectedValue) throws AssertionError {
Objects.requireNonNull(record);
final V recordValue = record.value();
final AssertionError error = new AssertionError("Expected value=" + expectedValue + " but was value=" + recordValue);
if (recordValue != null) {
if (!recordValue.equals(expectedValue)) {
throw error;
}
} else if (expectedValue != null) {
throw error;
}
}
/**
* Compares the values of two {@link ProducerRecord}'s and throws an {@link AssertionError} if they are not equal to
* each other.
*
* @param record a output {@code ProducerRecord} for verification
* @param expectedRecord a {@code ProducerRecord} for verification
* @param <K> the key type
* @param <V> the value type
* @throws AssertionError if {@code ProducerRecord}'s value is not equal to {@code expectedRecord}'s value
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public static <K, V> void compareValue(final ProducerRecord<K, V> record,
final ProducerRecord<K, V> expectedRecord) throws AssertionError {
Objects.requireNonNull(expectedRecord);
compareValue(record, expectedRecord.value());
}
/**
* Compares a {@link ProducerRecord} with the provided key and value and throws an {@link AssertionError} if the
* {@code ProducerRecord}'s key or value is not equal to the expected key or value.
*
* @param record a output {@code ProducerRecord} for verification
* @param expectedKey the expected key of the {@code ProducerRecord}
* @param expectedValue the expected value of the {@code ProducerRecord}
* @param <K> the key type
* @param <V> the value type
* @throws AssertionError if {@code ProducerRecord}'s key or value is not equal to {@code expectedKey} or {@code expectedValue}
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public static <K, V> void compareKeyValue(final ProducerRecord<K, V> record,
final K expectedKey,
final V expectedValue) throws AssertionError {
Objects.requireNonNull(record);
final K recordKey = record.key();
final V recordValue = record.value();
final AssertionError error = new AssertionError("Expected <" + expectedKey + ", " + expectedValue + "> " +
"but was <" + recordKey + ", " + recordValue + ">");
if (recordKey != null) {
if (!recordKey.equals(expectedKey)) {
throw error;
}
} else if (expectedKey != null) {
throw error;
}
if (recordValue != null) {
if (!recordValue.equals(expectedValue)) {
throw error;
}
} else if (expectedValue != null) {
throw error;
}
}
/**
* Compares the keys and values of two {@link ProducerRecord}'s and throws an {@link AssertionError} if the keys or
* values are not equal to each other.
*
* @param record a output {@code ProducerRecord} for verification
* @param expectedRecord a {@code ProducerRecord} for verification
* @param <K> the key type
* @param <V> the value type
* @throws AssertionError if {@code ProducerRecord}'s key or value is not equal to {@code expectedRecord}'s key or value
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public static <K, V> void compareKeyValue(final ProducerRecord<K, V> record,
final ProducerRecord<K, V> expectedRecord) throws AssertionError {
Objects.requireNonNull(expectedRecord);
compareKeyValue(record, expectedRecord.key(), expectedRecord.value());
}
/**
* Compares a {@link ProducerRecord} with the provided value and timestamp and throws an {@link AssertionError} if
* the {@code ProducerRecord}'s value or timestamp is not equal to the expected value or timestamp.
*
* @param record a output {@code ProducerRecord} for verification
* @param expectedValue the expected value of the {@code ProducerRecord}
* @param expectedTimestamp the expected timestamps of the {@code ProducerRecord}
* @param <K> the key type
* @param <V> the value type
* @throws AssertionError if {@code ProducerRecord}'s value or timestamp is not equal to {@code expectedValue} or {@code expectedTimestamp}
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public static <K, V> void compareValueTimestamp(final ProducerRecord<K, V> record,
final V expectedValue,
final long expectedTimestamp) throws AssertionError {
Objects.requireNonNull(record);
final V recordValue = record.value();
final long recordTimestamp = record.timestamp();
final AssertionError error = new AssertionError("Expected value=" + expectedValue + " with timestamp=" + expectedTimestamp +
" but was value=" + recordValue + " with timestamp=" + recordTimestamp);
if (recordValue != null) {
if (!recordValue.equals(expectedValue)) {
throw error;
}
} else if (expectedValue != null) {
throw error;
}
if (recordTimestamp != expectedTimestamp) {
throw error;
}
}
/**
* Compares the values and timestamps of two {@link ProducerRecord}'s and throws an {@link AssertionError} if the
* values or timestamps are not equal to each other.
*
* @param record a output {@code ProducerRecord} for verification
* @param expectedRecord a {@code ProducerRecord} for verification
* @param <K> the key type
* @param <V> the value type
* @throws AssertionError if {@code ProducerRecord}'s value or timestamp is not equal to {@code expectedRecord}'s value or timestamp
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public static <K, V> void compareValueTimestamp(final ProducerRecord<K, V> record,
final ProducerRecord<K, V> expectedRecord) throws AssertionError {
Objects.requireNonNull(expectedRecord);
compareValueTimestamp(record, expectedRecord.value(), expectedRecord.timestamp());
}
/**
* Compares a {@link ProducerRecord} with the provided key, value, and timestamp and throws an
* {@link AssertionError} if the {@code ProducerRecord}'s key, value, or timestamp is not equal to the expected key,
* value, or timestamp.
*
* @param record a output {@code ProducerRecord} for verification
* @param expectedKey the expected key of the {@code ProducerRecord}
* @param expectedValue the expected value of the {@code ProducerRecord}
* @param expectedTimestamp the expected timestamp of the {@code ProducerRecord}
* @param <K> the key type
* @param <V> the value type
* @throws AssertionError if {@code ProducerRecord}'s key, value, timestamp is not equal to {@code expectedKey},
* {@code expectedValue}, or {@code expectedTimestamps}
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public static <K, V> void compareKeyValueTimestamp(final ProducerRecord<K, V> record,
final K expectedKey,
final V expectedValue,
final long expectedTimestamp) throws AssertionError {
Objects.requireNonNull(record);
final K recordKey = record.key();
final V recordValue = record.value();
final long recordTimestamp = record.timestamp();
final AssertionError error = new AssertionError("Expected <" + expectedKey + ", " + expectedValue + "> with timestamp=" + expectedTimestamp +
" but was <" + recordKey + ", " + recordValue + "> with timestamp=" + recordTimestamp);
if (recordKey != null) {
if (!recordKey.equals(expectedKey)) {
throw error;
}
} else if (expectedKey != null) {
throw error;
}
if (recordValue != null) {
if (!recordValue.equals(expectedValue)) {
throw error;
}
} else if (expectedValue != null) {
throw error;
}
if (recordTimestamp != expectedTimestamp) {
throw error;
}
}
/**
* Compares the keys, values, and timestamps of two {@link ProducerRecord}'s and throws an {@link AssertionError} if
* the keys, values, or timestamps are not equal to each other.
*
* @param record a output {@code ProducerRecord} for verification
* @param expectedRecord a {@code ProducerRecord} for verification
* @param <K> the key type
* @param <V> the value type
* @throws AssertionError if {@code ProducerRecord}'s key, value, or timestamp is not equal to
* {@code expectedRecord}'s key, value, or timestamp
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public static <K, V> void compareKeyValueTimestamp(final ProducerRecord<K, V> record,
final ProducerRecord<K, V> expectedRecord) throws AssertionError {
Objects.requireNonNull(expectedRecord);
compareKeyValueTimestamp(record, expectedRecord.key(), expectedRecord.value(), expectedRecord.timestamp());
}
/**
* Compares a {@link ProducerRecord} with the provided value and headers and throws an {@link AssertionError} if
* the {@code ProducerRecord}'s value or headers is not equal to the expected value or headers.
*
* @param record a output {@code ProducerRecord} for verification
* @param expectedValue the expected value of the {@code ProducerRecord}
* @param expectedHeaders the expected headers of the {@code ProducerRecord}
* @param <K> the key type
* @param <V> the value type
* @throws AssertionError if {@code ProducerRecord}'s value or headers is not equal to {@code expectedValue} or {@code expectedHeaders}
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public static <K, V> void compareValueHeaders(final ProducerRecord<K, V> record,
final V expectedValue,
final Headers expectedHeaders) throws AssertionError {
Objects.requireNonNull(record);
final V recordValue = record.value();
final Headers recordHeaders = record.headers();
final AssertionError error = new AssertionError("Expected value=" + expectedValue + " with headers=" + expectedHeaders +
" but was value=" + recordValue + " with headers=" + recordHeaders);
if (recordValue != null) {
if (!recordValue.equals(expectedValue)) {
throw error;
}
} else if (expectedValue != null) {
throw error;
}
if (recordHeaders != null) {
if (!recordHeaders.equals(expectedHeaders)) {
throw error;
}
} else if (expectedHeaders != null) {
throw error;
}
}
/**
* Compares the values and headers of two {@link ProducerRecord}'s and throws an {@link AssertionError} if the
* values or headers are not equal to each other.
*
* @param record a output {@code ProducerRecord} for verification
* @param expectedRecord a {@code ProducerRecord} for verification
* @param <K> the key type
* @param <V> the value type
* @throws AssertionError if {@code ProducerRecord}'s value or headers is not equal to {@code expectedRecord}'s value or headers
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public static <K, V> void compareValueHeaders(final ProducerRecord<K, V> record,
final ProducerRecord<K, V> expectedRecord) throws AssertionError {
Objects.requireNonNull(expectedRecord);
compareValueHeaders(record, expectedRecord.value(), expectedRecord.headers());
}
/**
* Compares a {@link ProducerRecord} with the provided key, value, and headers and throws an
* {@link AssertionError} if the {@code ProducerRecord}'s key, value, or headers is not equal to the expected key,
* value, or headers.
*
* @param record a output {@code ProducerRecord} for verification
* @param expectedKey the expected key of the {@code ProducerRecord}
* @param expectedValue the expected value of the {@code ProducerRecord}
* @param expectedHeaders the expected headers of the {@code ProducerRecord}
* @param <K> the key type
* @param <V> the value type
* @throws AssertionError if {@code ProducerRecord}'s key, value, headers is not equal to {@code expectedKey},
* {@code expectedValue}, or {@code expectedHeaders}
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public static <K, V> void compareKeyValueHeaders(final ProducerRecord<K, V> record,
final K expectedKey,
final V expectedValue,
final Headers expectedHeaders) throws AssertionError {
Objects.requireNonNull(record);
final K recordKey = record.key();
final V recordValue = record.value();
final Headers recordHeaders = record.headers();
final AssertionError error = new AssertionError("Expected <" + expectedKey + ", " + expectedValue + "> with headers=" + expectedHeaders +
" but was <" + recordKey + ", " + recordValue + "> with headers=" + recordHeaders);
if (recordKey != null) {
if (!recordKey.equals(expectedKey)) {
throw error;
}
} else if (expectedKey != null) {
throw error;
}
if (recordValue != null) {
if (!recordValue.equals(expectedValue)) {
throw error;
}
} else if (expectedValue != null) {
throw error;
}
if (recordHeaders != null) {
if (!recordHeaders.equals(expectedHeaders)) {
throw error;
}
} else if (expectedHeaders != null) {
throw error;
}
}
/**
* Compares the keys, values, and headers of two {@link ProducerRecord}'s and throws an {@link AssertionError} if
* the keys, values, or headers are not equal to each other.
*
* @param record a output {@code ProducerRecord} for verification
* @param expectedRecord a {@code ProducerRecord} for verification
* @param <K> the key type
* @param <V> the value type
* @throws AssertionError if {@code ProducerRecord}'s key, value, or headers is not equal to
* {@code expectedRecord}'s key, value, or headers
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public static <K, V> void compareKeyValueHeaders(final ProducerRecord<K, V> record,
final ProducerRecord<K, V> expectedRecord) throws AssertionError {
Objects.requireNonNull(expectedRecord);
compareKeyValueHeaders(record, expectedRecord.key(), expectedRecord.value(), expectedRecord.headers());
}
/**
* Compares a {@link ProducerRecord} with the provided key, value, headers, and timestamp and throws an
* {@link AssertionError} if the {@code ProducerRecord}'s key, value, headers, or timestamp is not equal to the expected key,
* value, headers, or timestamp.
*
* @param record a output {@code ProducerRecord} for verification
* @param expectedKey the expected key of the {@code ProducerRecord}
* @param expectedValue the expected value of the {@code ProducerRecord}
* @param expectedHeaders the expected headers of the {@code ProducerRecord}
* @param expectedTimestamp the expected timestamp of the {@code ProducerRecord}
* @param <K> the key type
* @param <V> the value type
* @throws AssertionError if {@code ProducerRecord}'s key, value, headers is not equal to {@code expectedKey},
* {@code expectedValue}, or {@code expectedHeaders}
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public static <K, V> void compareKeyValueHeadersTimestamp(final ProducerRecord<K, V> record,
final K expectedKey,
final V expectedValue,
final Headers expectedHeaders,
final long expectedTimestamp) throws AssertionError {
Objects.requireNonNull(record);
final K recordKey = record.key();
final V recordValue = record.value();
final Headers recordHeaders = record.headers();
final long recordTimestamp = record.timestamp();
final AssertionError error = new AssertionError("Expected <" + expectedKey + ", " + expectedValue + ">" +
" with timestamp=" + expectedTimestamp + " and headers=" + expectedHeaders +
" but was <" + recordKey + ", " + recordValue + ">" +
" with timestamp=" + recordTimestamp + " and headers=" + recordHeaders);
if (recordKey != null) {
if (!recordKey.equals(expectedKey)) {
throw error;
}
} else if (expectedKey != null) {
throw error;
}
if (recordValue != null) {
if (!recordValue.equals(expectedValue)) {
throw error;
}
} else if (expectedValue != null) {
throw error;
}
if (recordHeaders != null) {
if (!recordHeaders.equals(expectedHeaders)) {
throw error;
}
} else if (expectedHeaders != null) {
throw error;
}
if (recordTimestamp != expectedTimestamp) {
throw error;
}
}
/**
* Compares the keys, values, headers, and timestamp of two {@link ProducerRecord}'s and throws an {@link AssertionError} if
* the keys, values, headers, or timestamps are not equal to each other.
*
* @param record a output {@code ProducerRecord} for verification
* @param expectedRecord a {@code ProducerRecord} for verification
* @param <K> the key type
* @param <V> the value type
* @throws AssertionError if {@code ProducerRecord}'s key, value, headers, or timestamp is not equal to
* {@code expectedRecord}'s key, value, headers, or timestamp
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public static <K, V> void compareKeyValueHeadersTimestamp(final ProducerRecord<K, V> record,
final ProducerRecord<K, V> expectedRecord) throws AssertionError {
Objects.requireNonNull(expectedRecord);
compareKeyValueHeadersTimestamp(record, expectedRecord.key(), expectedRecord.value(), expectedRecord.headers(), expectedRecord.timestamp());
}
}

View File

@@ -0,0 +1,237 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.test;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import java.time.Instant;
import java.util.Objects;
import java.util.StringJoiner;
/**
* A key/value pair, including timestamp and record headers, to be sent to or received from {@link TopologyTestDriver}.
* If [a] record does not contain a timestamp,
* {@link TestInputTopic} will auto advance it's time when the record is piped.
*/
public class TestRecord<K, V> {
private final Headers headers;
private final K key;
private final V value;
private final Instant recordTime;
/**
* Creates a record.
*
* @param key The key that will be included in the record
* @param value The value of the record
* @param headers the record headers that will be included in the record
* @param recordTime The timestamp of the record.
*/
public TestRecord(final K key, final V value, final Headers headers, final Instant recordTime) {
this.key = key;
this.value = value;
this.recordTime = recordTime;
this.headers = new RecordHeaders(headers);
}
/**
* Creates a record.
*
* @param key The key that will be included in the record
* @param value The value of the record
* @param headers the record headers that will be included in the record
* @param timestampMs The timestamp of the record, in milliseconds since the beginning of the epoch.
*/
public TestRecord(final K key, final V value, final Headers headers, final Long timestampMs) {
if (timestampMs != null) {
if (timestampMs < 0) {
throw new IllegalArgumentException(
String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestampMs));
}
this.recordTime = Instant.ofEpochMilli(timestampMs);
} else {
this.recordTime = null;
}
this.key = key;
this.value = value;
this.headers = new RecordHeaders(headers);
}
/**
* Creates a record.
*
* @param key The key of the record
* @param value The value of the record
* @param recordTime The timestamp of the record as Instant.
*/
public TestRecord(final K key, final V value, final Instant recordTime) {
this(key, value, null, recordTime);
}
/**
* Creates a record.
*
* @param key The key of the record
* @param value The value of the record
* @param headers The record headers that will be included in the record
*/
public TestRecord(final K key, final V value, final Headers headers) {
this.key = key;
this.value = value;
this.headers = new RecordHeaders(headers);
this.recordTime = null;
}
/**
* Creates a record.
*
* @param key The key of the record
* @param value The value of the record
*/
public TestRecord(final K key, final V value) {
this.key = key;
this.value = value;
this.headers = new RecordHeaders();
this.recordTime = null;
}
/**
* Create a record with {@code null} key.
*
* @param value The value of the record
*/
public TestRecord(final V value) {
this(null, value);
}
/**
* Create a {@code TestRecord} from a {@link ConsumerRecord}.
*
* @param record The v
*/
public TestRecord(final ConsumerRecord<K, V> record) {
Objects.requireNonNull(record);
this.key = record.key();
this.value = record.value();
this.headers = record.headers();
this.recordTime = Instant.ofEpochMilli(record.timestamp());
}
/**
* Create a {@code TestRecord} from a {@link ProducerRecord}.
*
* @param record The record contents
*/
public TestRecord(final ProducerRecord<K, V> record) {
Objects.requireNonNull(record);
this.key = record.key();
this.value = record.value();
this.headers = record.headers();
this.recordTime = Instant.ofEpochMilli(record.timestamp());
}
/**
* @return The headers.
*/
public Headers headers() {
return headers;
}
/**
* @return The key (or {@code null} if no key is specified).
*/
public K key() {
return key;
}
/**
* @return The value.
*/
public V value() {
return value;
}
/**
* @return The timestamp, which is in milliseconds since epoch.
*/
public Long timestamp() {
return this.recordTime == null ? null : this.recordTime.toEpochMilli();
}
/**
* @return The headers.
*/
public Headers getHeaders() {
return headers;
}
/**
* @return The key (or null if no key is specified)
*/
public K getKey() {
return key;
}
/**
* @return The value.
*/
public V getValue() {
return value;
}
/**
* @return The timestamp.
*/
public Instant getRecordTime() {
return recordTime;
}
@Override
public String toString() {
return new StringJoiner(", ", TestRecord.class.getSimpleName() + "[", "]")
.add("key=" + key)
.add("value=" + value)
.add("headers=" + headers)
.add("recordTime=" + recordTime)
.toString();
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final TestRecord<?, ?> that = (TestRecord<?, ?>) o;
return Objects.equals(headers, that.headers) &&
Objects.equals(key, that.key) &&
Objects.equals(value, that.value) &&
Objects.equals(recordTime, that.recordTime);
}
@Override
public int hashCode() {
return Objects.hash(headers, key, value, recordTime);
}
}

View File

@@ -0,0 +1,410 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.MockProcessorContext.CapturedForward;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.junit.Test;
import java.io.File;
import java.time.Duration;
import java.util.Iterator;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class MockProcessorContextTest {
@Test
public void shouldCaptureOutputRecords() {
final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
@Override
public void process(final String key, final Long value) {
context().forward(key + value, key.length() + value);
}
};
final MockProcessorContext context = new MockProcessorContext();
processor.init(context);
processor.process("foo", 5L);
processor.process("barbaz", 50L);
final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue());
assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue());
assertFalse(forwarded.hasNext());
context.resetForwards();
assertEquals(0, context.forwarded().size());
}
@Test
public void shouldCaptureOutputRecordsUsingTo() {
final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
@Override
public void process(final String key, final Long value) {
context().forward(key + value, key.length() + value, To.all());
}
};
final MockProcessorContext context = new MockProcessorContext();
processor.init(context);
processor.process("foo", 5L);
processor.process("barbaz", 50L);
final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue());
assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue());
assertFalse(forwarded.hasNext());
context.resetForwards();
assertEquals(0, context.forwarded().size());
}
@Test
public void shouldCaptureRecordsOutputToChildByName() {
final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
private int count = 0;
@Override
public void process(final String key, final Long value) {
if (count == 0) {
context().forward("start", -1L, To.all()); // broadcast
}
final To toChild = count % 2 == 0 ? To.child("george") : To.child("pete");
context().forward(key + value, key.length() + value, toChild);
count++;
}
};
final MockProcessorContext context = new MockProcessorContext();
processor.init(context);
processor.process("foo", 5L);
processor.process("barbaz", 50L);
{
final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
final CapturedForward forward1 = forwarded.next();
assertEquals(new KeyValue<>("start", -1L), forward1.keyValue());
assertNull(forward1.childName());
final CapturedForward forward2 = forwarded.next();
assertEquals(new KeyValue<>("foo5", 8L), forward2.keyValue());
assertEquals("george", forward2.childName());
final CapturedForward forward3 = forwarded.next();
assertEquals(new KeyValue<>("barbaz50", 56L), forward3.keyValue());
assertEquals("pete", forward3.childName());
assertFalse(forwarded.hasNext());
}
{
final Iterator<CapturedForward> forwarded = context.forwarded("george").iterator();
assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue());
assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue());
assertFalse(forwarded.hasNext());
}
{
final Iterator<CapturedForward> forwarded = context.forwarded("pete").iterator();
assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue());
assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue());
assertFalse(forwarded.hasNext());
}
{
final Iterator<CapturedForward> forwarded = context.forwarded("steve").iterator();
assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue());
assertFalse(forwarded.hasNext());
}
}
@Test
public void shouldThrowIfForwardedWithDeprecatedChildIndex() {
final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
@SuppressWarnings("deprecation")
@Override
public void process(final String key, final Long value) {
context().forward(key, value, 0);
}
};
final MockProcessorContext context = new MockProcessorContext();
processor.init(context);
try {
processor.process("foo", 5L);
fail("Should have thrown an UnsupportedOperationException.");
} catch (final UnsupportedOperationException expected) {
// expected
}
}
@Test
public void shouldThrowIfForwardedWithDeprecatedChildName() {
final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
@SuppressWarnings("deprecation")
@Override
public void process(final String key, final Long value) {
context().forward(key, value, "child1");
}
};
final MockProcessorContext context = new MockProcessorContext();
processor.init(context);
try {
processor.process("foo", 5L);
fail("Should have thrown an UnsupportedOperationException.");
} catch (final UnsupportedOperationException expected) {
// expected
}
}
@Test
public void shouldCaptureCommitsAndAllowReset() {
final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
private int count = 0;
@Override
public void process(final String key, final Long value) {
if (++count > 2) {
context().commit();
}
}
};
final MockProcessorContext context = new MockProcessorContext();
processor.init(context);
processor.process("foo", 5L);
processor.process("barbaz", 50L);
assertFalse(context.committed());
processor.process("foobar", 500L);
assertTrue(context.committed());
context.resetCommit();
assertFalse(context.committed());
}
@SuppressWarnings("unchecked")
@Test
public void shouldStoreAndReturnStateStores() {
final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
@Override
public void process(final String key, final Long value) {
@SuppressWarnings("unchecked")
final KeyValueStore<String, Long> stateStore = (KeyValueStore<String, Long>) context().getStateStore("my-state");
stateStore.put(key, (stateStore.get(key) == null ? 0 : stateStore.get(key)) + value);
stateStore.put("all", (stateStore.get("all") == null ? 0 : stateStore.get("all")) + value);
}
};
final MockProcessorContext context = new MockProcessorContext();
final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("my-state"),
Serdes.String(),
Serdes.Long()).withLoggingDisabled();
final KeyValueStore<String, Long> store = (KeyValueStore<String, Long>) storeBuilder.build();
store.init(context, store);
processor.init(context);
processor.process("foo", 5L);
processor.process("bar", 50L);
assertEquals(5L, (long) store.get("foo"));
assertEquals(50L, (long) store.get("bar"));
assertEquals(55L, (long) store.get("all"));
}
@Test
public void shouldCaptureApplicationAndRecordMetadata() {
final Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testMetadata");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
final AbstractProcessor<String, Object> processor = new AbstractProcessor<String, Object>() {
@Override
public void process(final String key, final Object value) {
context().forward("appId", context().applicationId());
context().forward("taskId", context().taskId());
context().forward("topic", context().topic());
context().forward("partition", context().partition());
context().forward("offset", context().offset());
context().forward("timestamp", context().timestamp());
context().forward("key", key);
context().forward("value", value);
}
};
final MockProcessorContext context = new MockProcessorContext(config);
processor.init(context);
try {
processor.process("foo", 5L);
fail("Should have thrown an exception.");
} catch (final IllegalStateException expected) {
// expected, since the record metadata isn't initialized
}
context.resetForwards();
context.setRecordMetadata("t1", 0, 0L, null, 0L);
{
processor.process("foo", 5L);
final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue());
assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue());
assertEquals(new KeyValue<>("topic", "t1"), forwarded.next().keyValue());
assertEquals(new KeyValue<>("partition", 0), forwarded.next().keyValue());
assertEquals(new KeyValue<>("offset", 0L), forwarded.next().keyValue());
assertEquals(new KeyValue<>("timestamp", 0L), forwarded.next().keyValue());
assertEquals(new KeyValue<>("key", "foo"), forwarded.next().keyValue());
assertEquals(new KeyValue<>("value", 5L), forwarded.next().keyValue());
}
context.resetForwards();
// record metadata should be "sticky"
context.setOffset(1L);
context.setTimestamp(10L);
{
processor.process("bar", 50L);
final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue());
assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue());
assertEquals(new KeyValue<>("topic", "t1"), forwarded.next().keyValue());
assertEquals(new KeyValue<>("partition", 0), forwarded.next().keyValue());
assertEquals(new KeyValue<>("offset", 1L), forwarded.next().keyValue());
assertEquals(new KeyValue<>("timestamp", 10L), forwarded.next().keyValue());
assertEquals(new KeyValue<>("key", "bar"), forwarded.next().keyValue());
assertEquals(new KeyValue<>("value", 50L), forwarded.next().keyValue());
}
context.resetForwards();
// record metadata should be "sticky"
context.setTopic("t2");
context.setPartition(30);
{
processor.process("baz", 500L);
final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue());
assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue());
assertEquals(new KeyValue<>("topic", "t2"), forwarded.next().keyValue());
assertEquals(new KeyValue<>("partition", 30), forwarded.next().keyValue());
assertEquals(new KeyValue<>("offset", 1L), forwarded.next().keyValue());
assertEquals(new KeyValue<>("timestamp", 10L), forwarded.next().keyValue());
assertEquals(new KeyValue<>("key", "baz"), forwarded.next().keyValue());
assertEquals(new KeyValue<>("value", 500L), forwarded.next().keyValue());
}
}
@Test
public void shouldCapturePunctuator() {
final Processor<String, Long> processor = new Processor<String, Long>() {
@Override
public void init(final ProcessorContext context) {
context.schedule(
Duration.ofSeconds(1L),
PunctuationType.WALL_CLOCK_TIME,
timestamp -> context.commit()
);
}
@Override
public void process(final String key, final Long value) {
}
@Override
public void close() {
}
};
final MockProcessorContext context = new MockProcessorContext();
processor.init(context);
final MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0);
assertEquals(1000L, capturedPunctuator.getIntervalMs());
assertEquals(PunctuationType.WALL_CLOCK_TIME, capturedPunctuator.getType());
assertFalse(capturedPunctuator.cancelled());
final Punctuator punctuator = capturedPunctuator.getPunctuator();
assertFalse(context.committed());
punctuator.punctuate(1234L);
assertTrue(context.committed());
}
@Test
public void fullConstructorShouldSetAllExpectedAttributes() {
final Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testFullConstructor");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
final File dummyFile = new File("");
final MockProcessorContext context = new MockProcessorContext(config, new TaskId(1, 1), dummyFile);
assertEquals("testFullConstructor", context.applicationId());
assertEquals(new TaskId(1, 1), context.taskId());
assertEquals("testFullConstructor", context.appConfigs().get(StreamsConfig.APPLICATION_ID_CONFIG));
assertEquals("testFullConstructor", context.appConfigsWithPrefix("application.").get("id"));
assertEquals(Serdes.String().getClass(), context.keySerde().getClass());
assertEquals(Serdes.Long().getClass(), context.valueSerde().getClass());
assertEquals(dummyFile, context.stateDir());
}
}

View File

@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class MockTimeTest {
@Test
public void shouldSetStartTime() {
final TopologyTestDriver.MockTime time = new TopologyTestDriver.MockTime(42L);
assertEquals(42L, time.milliseconds());
assertEquals(42L * 1000L * 1000L, time.nanoseconds());
}
@Test
public void shouldGetNanosAsMillis() {
final TopologyTestDriver.MockTime time = new TopologyTestDriver.MockTime(42L);
assertEquals(42L, time.hiResClockMs());
}
@Test(expected = IllegalArgumentException.class)
public void shouldNotAllowNegativeSleep() {
new TopologyTestDriver.MockTime(42).sleep(-1L);
}
@Test
public void shouldAdvanceTimeOnSleep() {
final TopologyTestDriver.MockTime time = new TopologyTestDriver.MockTime(42L);
assertEquals(42L, time.milliseconds());
time.sleep(1L);
assertEquals(43L, time.milliseconds());
time.sleep(0L);
assertEquals(43L, time.milliseconds());
time.sleep(3L);
assertEquals(46L, time.milliseconds());
}
}

View File

@@ -0,0 +1,461 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.test.TestRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasProperty;
import static org.junit.Assert.assertThrows;
public class TestTopicsTest {
private static final Logger log = LoggerFactory.getLogger(TestTopicsTest.class);
private final static String INPUT_TOPIC = "input";
private final static String OUTPUT_TOPIC = "output1";
private final static String INPUT_TOPIC_MAP = OUTPUT_TOPIC;
private final static String OUTPUT_TOPIC_MAP = "output2";
private TopologyTestDriver testDriver;
private final Serde<String> stringSerde = new Serdes.StringSerde();
private final Serde<Long> longSerde = new Serdes.LongSerde();
private final Properties config = mkProperties(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "TestTopicsTest"),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
));
private final Instant testBaseTime = Instant.parse("2019-06-01T10:00:00Z");
@Before
public void setup() {
final StreamsBuilder builder = new StreamsBuilder();
//Create Actual Stream Processing pipeline
builder.stream(INPUT_TOPIC).to(OUTPUT_TOPIC);
final KStream<Long, String> source = builder.stream(INPUT_TOPIC_MAP, Consumed.with(longSerde, stringSerde));
final KStream<String, Long> mapped = source.map((key, value) -> new KeyValue<>(value, key));
mapped.to(OUTPUT_TOPIC_MAP, Produced.with(stringSerde, longSerde));
testDriver = new TopologyTestDriver(builder.build(), config);
}
@After
public void tearDown() {
try {
testDriver.close();
} catch (final RuntimeException e) {
// https://issues.apache.org/jira/browse/KAFKA-6647 causes exception when executed in Windows, ignoring it
// Logged stacktrace cannot be avoided
log.warn("Ignoring exception, test failing in Windows due this exception: {}", e.getLocalizedMessage());
}
}
@Test
public void testValue() {
final TestInputTopic<String, String> inputTopic =
testDriver.createInputTopic(INPUT_TOPIC, stringSerde.serializer(), stringSerde.serializer());
final TestOutputTopic<String, String> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC, stringSerde.deserializer(), stringSerde.deserializer());
//Feed word "Hello" to inputTopic and no kafka key, timestamp is irrelevant in this case
inputTopic.pipeInput("Hello");
assertThat(outputTopic.readValue(), equalTo("Hello"));
//No more output in topic
assertThat(outputTopic.isEmpty(), is(true));
}
@Test
public void testValueList() {
final TestInputTopic<String, String> inputTopic =
testDriver.createInputTopic(INPUT_TOPIC, stringSerde.serializer(), stringSerde.serializer());
final TestOutputTopic<String, String> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC, stringSerde.deserializer(), stringSerde.deserializer());
final List<String> inputList = Arrays.asList("This", "is", "an", "example");
//Feed list of words to inputTopic and no kafka key, timestamp is irrelevant in this case
inputTopic.pipeValueList(inputList);
final List<String> output = outputTopic.readValuesToList();
assertThat(output, hasItems("This", "is", "an", "example"));
assertThat(output, is(equalTo(inputList)));
}
@Test
public void testKeyValue() {
final TestInputTopic<Long, String> inputTopic =
testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer());
final TestOutputTopic<Long, String> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC, longSerde.deserializer(), stringSerde.deserializer());
inputTopic.pipeInput(1L, "Hello");
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>(1L, "Hello")));
assertThat(outputTopic.isEmpty(), is(true));
}
@Test
public void testKeyValueList() {
final TestInputTopic<Long, String> inputTopic =
testDriver.createInputTopic(INPUT_TOPIC_MAP, longSerde.serializer(), stringSerde.serializer());
final TestOutputTopic<String, Long> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC_MAP, stringSerde.deserializer(), longSerde.deserializer());
final List<String> inputList = Arrays.asList("This", "is", "an", "example");
final List<KeyValue<Long, String>> input = new LinkedList<>();
final List<KeyValue<String, Long>> expected = new LinkedList<>();
long i = 0;
for (final String s : inputList) {
input.add(new KeyValue<>(i, s));
expected.add(new KeyValue<>(s, i));
i++;
}
inputTopic.pipeKeyValueList(input);
final List<KeyValue<String, Long>> output = outputTopic.readKeyValuesToList();
assertThat(output, is(equalTo(expected)));
}
@Test
public void testKeyValuesToMap() {
final TestInputTopic<Long, String> inputTopic =
testDriver.createInputTopic(INPUT_TOPIC_MAP, longSerde.serializer(), stringSerde.serializer());
final TestOutputTopic<String, Long> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC_MAP, stringSerde.deserializer(), longSerde.deserializer());
final List<String> inputList = Arrays.asList("This", "is", "an", "example");
final List<KeyValue<Long, String>> input = new LinkedList<>();
final Map<String, Long> expected = new HashMap<>();
long i = 0;
for (final String s : inputList) {
input.add(new KeyValue<>(i, s));
expected.put(s, i);
i++;
}
inputTopic.pipeKeyValueList(input);
final Map<String, Long> output = outputTopic.readKeyValuesToMap();
assertThat(output, is(equalTo(expected)));
}
@Test
public void testKeyValuesToMapWithNull() {
final TestInputTopic<Long, String> inputTopic =
testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer());
final TestOutputTopic<Long, String> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC, longSerde.deserializer(), stringSerde.deserializer());
inputTopic.pipeInput("value");
assertThrows(IllegalStateException.class, outputTopic::readKeyValuesToMap);
}
@Test
public void testKeyValueListDuration() {
final TestInputTopic<Long, String> inputTopic =
testDriver.createInputTopic(INPUT_TOPIC_MAP, longSerde.serializer(), stringSerde.serializer());
final TestOutputTopic<String, Long> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC_MAP, stringSerde.deserializer(), longSerde.deserializer());
final List<String> inputList = Arrays.asList("This", "is", "an", "example");
final List<KeyValue<Long, String>> input = new LinkedList<>();
final List<TestRecord<String, Long>> expected = new LinkedList<>();
long i = 0;
final Duration advance = Duration.ofSeconds(15);
Instant recordInstant = testBaseTime;
for (final String s : inputList) {
input.add(new KeyValue<>(i, s));
expected.add(new TestRecord<>(s, i, recordInstant));
i++;
recordInstant = recordInstant.plus(advance);
}
inputTopic.pipeKeyValueList(input, testBaseTime, advance);
final List<TestRecord<String, Long>> output = outputTopic.readRecordsToList();
assertThat(output, is(equalTo(expected)));
}
@Test
public void testRecordList() {
final TestInputTopic<Long, String> inputTopic =
testDriver.createInputTopic(INPUT_TOPIC_MAP, longSerde.serializer(), stringSerde.serializer());
final TestOutputTopic<String, Long> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC_MAP, stringSerde.deserializer(), longSerde.deserializer());
final List<String> inputList = Arrays.asList("This", "is", "an", "example");
final List<TestRecord<Long, String>> input = new LinkedList<>();
final List<TestRecord<String, Long>> expected = new LinkedList<>();
final Duration advance = Duration.ofSeconds(15);
Instant recordInstant = testBaseTime;
Long i = 0L;
for (final String s : inputList) {
input.add(new TestRecord<>(i, s, recordInstant));
expected.add(new TestRecord<>(s, i, recordInstant));
i++;
recordInstant = recordInstant.plus(advance);
}
inputTopic.pipeRecordList(input);
final List<TestRecord<String, Long>> output = outputTopic.readRecordsToList();
assertThat(output, is(equalTo(expected)));
}
@Test
public void testTimestamp() {
long baseTime = 3;
final TestInputTopic<Long, String> inputTopic =
testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer());
final TestOutputTopic<Long, String> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC, longSerde.deserializer(), stringSerde.deserializer());
inputTopic.pipeInput(null, "Hello", baseTime);
assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(null, "Hello", null, baseTime))));
inputTopic.pipeInput(2L, "Kafka", ++baseTime);
assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(2L, "Kafka", null, baseTime))));
inputTopic.pipeInput(2L, "Kafka", testBaseTime);
assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(2L, "Kafka", testBaseTime))));
final List<String> inputList = Arrays.asList("Advancing", "time");
//Feed list of words to inputTopic and no kafka key, timestamp advancing from testInstant
final Duration advance = Duration.ofSeconds(15);
final Instant recordInstant = testBaseTime.plus(Duration.ofDays(1));
inputTopic.pipeValueList(inputList, recordInstant, advance);
assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(null, "Advancing", recordInstant))));
assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(null, "time", null, recordInstant.plus(advance)))));
}
@Test
public void testWithHeaders() {
long baseTime = 3;
final Headers headers = new RecordHeaders(
new Header[]{
new RecordHeader("foo", "value".getBytes()),
new RecordHeader("bar", (byte[]) null),
new RecordHeader("\"A\\u00ea\\u00f1\\u00fcC\"", "value".getBytes())
});
final TestInputTopic<Long, String> inputTopic =
testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer());
final TestOutputTopic<Long, String> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC, longSerde.deserializer(), stringSerde.deserializer());
inputTopic.pipeInput(new TestRecord<>(1L, "Hello", headers));
assertThat(outputTopic.readRecord(), allOf(
hasProperty("key", equalTo(1L)),
hasProperty("value", equalTo("Hello")),
hasProperty("headers", equalTo(headers))));
inputTopic.pipeInput(new TestRecord<>(2L, "Kafka", headers, ++baseTime));
assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(2L, "Kafka", headers, baseTime))));
}
@Test
public void testStartTimestamp() {
final Duration advance = Duration.ofSeconds(2);
final TestInputTopic<Long, String> inputTopic =
testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer(), testBaseTime, Duration.ZERO);
final TestOutputTopic<Long, String> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC, longSerde.deserializer(), stringSerde.deserializer());
inputTopic.pipeInput(1L, "Hello");
assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(1L, "Hello", testBaseTime))));
inputTopic.pipeInput(2L, "World");
assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(2L, "World", null, testBaseTime.toEpochMilli()))));
inputTopic.advanceTime(advance);
inputTopic.pipeInput(3L, "Kafka");
assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(3L, "Kafka", testBaseTime.plus(advance)))));
}
@Test
public void testTimestampAutoAdvance() {
final Duration advance = Duration.ofSeconds(2);
final TestInputTopic<Long, String> inputTopic =
testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer(), testBaseTime, advance);
final TestOutputTopic<Long, String> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC, longSerde.deserializer(), stringSerde.deserializer());
inputTopic.pipeInput("Hello");
assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(null, "Hello", testBaseTime))));
inputTopic.pipeInput(2L, "Kafka");
assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(2L, "Kafka", testBaseTime.plus(advance)))));
}
@Test
public void testMultipleTopics() {
final TestInputTopic<Long, String> inputTopic1 =
testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer());
final TestInputTopic<Long, String> inputTopic2 =
testDriver.createInputTopic(INPUT_TOPIC_MAP, longSerde.serializer(), stringSerde.serializer());
final TestOutputTopic<Long, String> outputTopic1 =
testDriver.createOutputTopic(OUTPUT_TOPIC, longSerde.deserializer(), stringSerde.deserializer());
final TestOutputTopic<String, Long> outputTopic2 =
testDriver.createOutputTopic(OUTPUT_TOPIC_MAP, stringSerde.deserializer(), longSerde.deserializer());
inputTopic1.pipeInput(1L, "Hello");
assertThat(outputTopic1.readKeyValue(), equalTo(new KeyValue<>(1L, "Hello")));
assertThat(outputTopic2.readKeyValue(), equalTo(new KeyValue<>("Hello", 1L)));
assertThat(outputTopic1.isEmpty(), is(true));
assertThat(outputTopic2.isEmpty(), is(true));
inputTopic2.pipeInput(1L, "Hello");
//This is not visible in outputTopic1 even it is the same topic
assertThat(outputTopic2.readKeyValue(), equalTo(new KeyValue<>("Hello", 1L)));
assertThat(outputTopic1.isEmpty(), is(true));
assertThat(outputTopic2.isEmpty(), is(true));
}
@Test
public void testNonExistingOutputTopic() {
final TestOutputTopic<Long, String> outputTopic =
testDriver.createOutputTopic("no-exist", longSerde.deserializer(), stringSerde.deserializer());
assertThrows("Unknown topic", IllegalArgumentException.class, outputTopic::readRecord);
}
@Test
public void testNonUsedOutputTopic() {
final TestOutputTopic<Long, String> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC, longSerde.deserializer(), stringSerde.deserializer());
assertThrows("Uninitialized topic", NoSuchElementException.class, outputTopic::readRecord);
}
@Test
public void testEmptyTopic() {
final TestInputTopic<String, String> inputTopic =
testDriver.createInputTopic(INPUT_TOPIC, stringSerde.serializer(), stringSerde.serializer());
final TestOutputTopic<String, String> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC, stringSerde.deserializer(), stringSerde.deserializer());
//Feed word "Hello" to inputTopic and no kafka key, timestamp is irrelevant in this case
inputTopic.pipeInput("Hello");
assertThat(outputTopic.readValue(), equalTo("Hello"));
//No more output in topic
assertThrows("Empty topic", NoSuchElementException.class, outputTopic::readRecord);
}
@Test
public void testNonExistingInputTopic() {
final TestInputTopic<Long, String> inputTopic =
testDriver.createInputTopic("no-exist", longSerde.serializer(), stringSerde.serializer());
assertThrows("Unknown topic", IllegalArgumentException.class, () -> inputTopic.pipeInput(1L, "Hello"));
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowToCreateTopicWithNullTopicName() {
testDriver.createInputTopic(null, stringSerde.serializer(), stringSerde.serializer());
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowToCreateWithNullDriver() {
new TestInputTopic<>(null, INPUT_TOPIC, stringSerde.serializer(), stringSerde.serializer(), Instant.now(), Duration.ZERO);
}
@Test(expected = StreamsException.class)
public void testWrongSerde() {
final TestInputTopic<String, String> inputTopic =
testDriver.createInputTopic(INPUT_TOPIC_MAP, stringSerde.serializer(), stringSerde.serializer());
inputTopic.pipeInput("1L", "Hello");
}
@Test(expected = IllegalArgumentException.class)
public void testDuration() {
testDriver.createInputTopic(INPUT_TOPIC_MAP, stringSerde.serializer(), stringSerde.serializer(), testBaseTime, Duration.ofDays(-1));
}
@Test(expected = IllegalArgumentException.class)
public void testNegativeAdvance() {
final TestInputTopic<String, String> inputTopic =
testDriver.createInputTopic(INPUT_TOPIC_MAP, stringSerde.serializer(), stringSerde.serializer());
inputTopic.advanceTime(Duration.ofDays(-1));
}
@Test
public void testInputToString() {
final TestInputTopic<String, String> inputTopic =
testDriver.createInputTopic("topicName", stringSerde.serializer(), stringSerde.serializer());
assertThat(inputTopic.toString(), allOf(
containsString("TestInputTopic"),
containsString("topic='topicName'"),
containsString("StringSerializer")));
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowToCreateOutputTopicWithNullTopicName() {
testDriver.createOutputTopic(null, stringSerde.deserializer(), stringSerde.deserializer());
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowToCreateOutputWithNullDriver() {
new TestOutputTopic<>(null, OUTPUT_TOPIC, stringSerde.deserializer(), stringSerde.deserializer());
}
@Test(expected = SerializationException.class)
public void testOutputWrongSerde() {
final TestInputTopic<Long, String> inputTopic =
testDriver.createInputTopic(INPUT_TOPIC_MAP, longSerde.serializer(), stringSerde.serializer());
final TestOutputTopic<Long, String> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC_MAP, longSerde.deserializer(), stringSerde.deserializer());
inputTopic.pipeInput(1L, "Hello");
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>(1L, "Hello")));
}
@Test
public void testOutputToString() {
final TestOutputTopic<String, String> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC, stringSerde.deserializer(), stringSerde.deserializer());
assertThat(outputTopic.toString(), allOf(
containsString("TestOutputTopic"),
containsString("topic='output1'"),
containsString("size=0"),
containsString("StringDeserializer")));
}
@Test
public void testRecordsToList() {
final TestInputTopic<Long, String> inputTopic =
testDriver.createInputTopic(INPUT_TOPIC_MAP, longSerde.serializer(), stringSerde.serializer());
final TestOutputTopic<String, Long> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC_MAP, stringSerde.deserializer(), longSerde.deserializer());
final List<String> inputList = Arrays.asList("This", "is", "an", "example");
final List<KeyValue<Long, String>> input = new LinkedList<>();
final List<TestRecord<String, Long>> expected = new LinkedList<>();
long i = 0;
final Duration advance = Duration.ofSeconds(15);
Instant recordInstant = Instant.parse("2019-06-01T10:00:00Z");
for (final String s : inputList) {
input.add(new KeyValue<>(i, s));
expected.add(new TestRecord<>(s, i, recordInstant));
i++;
recordInstant = recordInstant.plus(advance);
}
inputTopic.pipeKeyValueList(input, Instant.parse("2019-06-01T10:00:00Z"), advance);
final List<TestRecord<String, Long>> output = outputTopic.readRecordsToList();
assertThat(output, is(equalTo(expected)));
}
}

View File

@@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import static java.util.Arrays.asList;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertNull;
@RunWith(EasyMockRunner.class)
public class KeyValueStoreFacadeTest {
@Mock
private TimestampedKeyValueStore<String, String> mockedKeyValueTimestampStore;
@Mock
private KeyValueIterator<String, ValueAndTimestamp<String>> mockedKeyValueTimestampIterator;
private KeyValueStoreFacade<String, String> keyValueStoreFacade;
@Before
public void setup() {
keyValueStoreFacade = new KeyValueStoreFacade<>(mockedKeyValueTimestampStore);
}
@Test
public void shouldForwardInit() {
final ProcessorContext context = mock(ProcessorContext.class);
final StateStore store = mock(StateStore.class);
mockedKeyValueTimestampStore.init(context, store);
expectLastCall();
replay(mockedKeyValueTimestampStore);
keyValueStoreFacade.init(context, store);
verify(mockedKeyValueTimestampStore);
}
@Test
public void shouldPutWithUnknownTimestamp() {
mockedKeyValueTimestampStore.put("key", ValueAndTimestamp.make("value", ConsumerRecord.NO_TIMESTAMP));
expectLastCall();
replay(mockedKeyValueTimestampStore);
keyValueStoreFacade.put("key", "value");
verify(mockedKeyValueTimestampStore);
}
@Test
public void shouldPutIfAbsentWithUnknownTimestamp() {
expect(mockedKeyValueTimestampStore.putIfAbsent("key", ValueAndTimestamp.make("value", ConsumerRecord.NO_TIMESTAMP)))
.andReturn(null)
.andReturn(ValueAndTimestamp.make("oldValue", 42L));
replay(mockedKeyValueTimestampStore);
assertNull(keyValueStoreFacade.putIfAbsent("key", "value"));
assertThat(keyValueStoreFacade.putIfAbsent("key", "value"), is("oldValue"));
verify(mockedKeyValueTimestampStore);
}
@Test
public void shouldPutAllWithUnknownTimestamp() {
mockedKeyValueTimestampStore.put("key1", ValueAndTimestamp.make("value1", ConsumerRecord.NO_TIMESTAMP));
mockedKeyValueTimestampStore.put("key2", ValueAndTimestamp.make("value2", ConsumerRecord.NO_TIMESTAMP));
expectLastCall();
replay(mockedKeyValueTimestampStore);
keyValueStoreFacade.putAll(asList(
KeyValue.pair("key1", "value1"),
KeyValue.pair("key2", "value2")
));
verify(mockedKeyValueTimestampStore);
}
@Test
public void shouldDeleteAndReturnPlainValue() {
expect(mockedKeyValueTimestampStore.delete("key"))
.andReturn(null)
.andReturn(ValueAndTimestamp.make("oldValue", 42L));
replay(mockedKeyValueTimestampStore);
assertNull(keyValueStoreFacade.delete("key"));
assertThat(keyValueStoreFacade.delete("key"), is("oldValue"));
verify(mockedKeyValueTimestampStore);
}
@Test
public void shouldForwardFlush() {
mockedKeyValueTimestampStore.flush();
expectLastCall();
replay(mockedKeyValueTimestampStore);
keyValueStoreFacade.flush();
verify(mockedKeyValueTimestampStore);
}
@Test
public void shouldForwardClose() {
mockedKeyValueTimestampStore.close();
expectLastCall();
replay(mockedKeyValueTimestampStore);
keyValueStoreFacade.close();
verify(mockedKeyValueTimestampStore);
}
@Test
public void shouldReturnName() {
expect(mockedKeyValueTimestampStore.name()).andReturn("name");
replay(mockedKeyValueTimestampStore);
assertThat(keyValueStoreFacade.name(), is("name"));
verify(mockedKeyValueTimestampStore);
}
@Test
public void shouldReturnIsPersistent() {
expect(mockedKeyValueTimestampStore.persistent())
.andReturn(true)
.andReturn(false);
replay(mockedKeyValueTimestampStore);
assertThat(keyValueStoreFacade.persistent(), is(true));
assertThat(keyValueStoreFacade.persistent(), is(false));
verify(mockedKeyValueTimestampStore);
}
@Test
public void shouldReturnIsOpen() {
expect(mockedKeyValueTimestampStore.isOpen())
.andReturn(true)
.andReturn(false);
replay(mockedKeyValueTimestampStore);
assertThat(keyValueStoreFacade.isOpen(), is(true));
assertThat(keyValueStoreFacade.isOpen(), is(false));
verify(mockedKeyValueTimestampStore);
}
}

View File

@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@RunWith(EasyMockRunner.class)
public class WindowStoreFacadeTest {
@Mock
private TimestampedWindowStore<String, String> mockedWindowTimestampStore;
private WindowStoreFacade<String, String> windowStoreFacade;
@Before
public void setup() {
windowStoreFacade = new WindowStoreFacade<>(mockedWindowTimestampStore);
}
@Test
public void shouldForwardInit() {
final ProcessorContext context = mock(ProcessorContext.class);
final StateStore store = mock(StateStore.class);
mockedWindowTimestampStore.init(context, store);
expectLastCall();
replay(mockedWindowTimestampStore);
windowStoreFacade.init(context, store);
verify(mockedWindowTimestampStore);
}
@Test
@SuppressWarnings("deprecation")
public void shouldPutWithUnknownTimestamp() {
mockedWindowTimestampStore.put("key", ValueAndTimestamp.make("value", ConsumerRecord.NO_TIMESTAMP));
expectLastCall();
replay(mockedWindowTimestampStore);
windowStoreFacade.put("key", "value");
verify(mockedWindowTimestampStore);
}
@Test
public void shouldPutWindowStartTimestampWithUnknownTimestamp() {
mockedWindowTimestampStore.put("key", ValueAndTimestamp.make("value", ConsumerRecord.NO_TIMESTAMP), 21L);
expectLastCall();
replay(mockedWindowTimestampStore);
windowStoreFacade.put("key", "value", 21L);
verify(mockedWindowTimestampStore);
}
@Test
public void shouldForwardFlush() {
mockedWindowTimestampStore.flush();
expectLastCall();
replay(mockedWindowTimestampStore);
windowStoreFacade.flush();
verify(mockedWindowTimestampStore);
}
@Test
public void shouldForwardClose() {
mockedWindowTimestampStore.close();
expectLastCall();
replay(mockedWindowTimestampStore);
windowStoreFacade.close();
verify(mockedWindowTimestampStore);
}
@Test
public void shouldReturnName() {
expect(mockedWindowTimestampStore.name()).andReturn("name");
replay(mockedWindowTimestampStore);
assertThat(windowStoreFacade.name(), is("name"));
verify(mockedWindowTimestampStore);
}
@Test
public void shouldReturnIsPersistent() {
expect(mockedWindowTimestampStore.persistent())
.andReturn(true)
.andReturn(false);
replay(mockedWindowTimestampStore);
assertThat(windowStoreFacade.persistent(), is(true));
assertThat(windowStoreFacade.persistent(), is(false));
verify(mockedWindowTimestampStore);
}
@Test
public void shouldReturnIsOpen() {
expect(mockedWindowTimestampStore.isOpen())
.andReturn(true)
.andReturn(false);
replay(mockedWindowTimestampStore);
assertThat(windowStoreFacade.isOpen(), is(true));
assertThat(windowStoreFacade.isOpen(), is(false));
verify(mockedWindowTimestampStore);
}
}

View File

@@ -0,0 +1,271 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.test;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@Deprecated
public class ConsumerRecordFactoryTest {
private final StringSerializer stringSerializer = new StringSerializer();
private final IntegerSerializer integerSerializer = new IntegerSerializer();
private final String topicName = "topic";
private final String otherTopicName = "otherTopic";
private final String key = "key";
private final Integer value = 42;
private final long timestamp = 21L;
private final byte[] rawKey = stringSerializer.serialize(topicName, key);
private final byte[] rawValue = integerSerializer.serialize(topicName, value);
private final ConsumerRecordFactory<byte[], Integer> factory =
new ConsumerRecordFactory<>(topicName, new ByteArraySerializer(), integerSerializer, 0L);
private final ConsumerRecordFactory<byte[], Integer> defaultFactory =
new ConsumerRecordFactory<>(new ByteArraySerializer(), integerSerializer);
@Test
public void shouldAdvanceTime() {
factory.advanceTimeMs(3L);
verifyRecord(topicName, rawKey, rawValue, 3L, factory.create(topicName, rawKey, value));
factory.advanceTimeMs(2L);
verifyRecord(topicName, rawKey, rawValue, 5L, factory.create(topicName, rawKey, value));
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowToCreateTopicWithNullTopicName() {
factory.create(null, rawKey, value, timestamp);
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowToCreateTopicWithNullHeaders() {
factory.create(topicName, rawKey, value, null, timestamp);
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowToCreateTopicWithNullTopicNameWithDefaultTimestamp() {
factory.create(null, rawKey, value);
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowToCreateTopicWithNullTopicNameWithNullKey() {
factory.create((String) null, value, timestamp);
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowToCreateTopicWithNullTopicNameWithNullKeyAndDefaultTimestamp() {
factory.create((String) null, value);
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairs() {
factory.create(null, Collections.singletonList(KeyValue.pair(rawKey, value)));
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairsAndCustomTimestamps() {
factory.create(null, Collections.singletonList(KeyValue.pair(rawKey, value)), timestamp, 2L);
}
@Test(expected = IllegalStateException.class)
public void shouldRequireCustomTopicNameIfNotDefaultFactoryTopicName() {
defaultFactory.create(rawKey, value, timestamp);
}
@Test(expected = IllegalStateException.class)
public void shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithDefaultTimestamp() {
defaultFactory.create(rawKey, value);
}
@Test(expected = IllegalStateException.class)
public void shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithNullKey() {
defaultFactory.create(value, timestamp);
}
@Test(expected = IllegalStateException.class)
public void shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithNullKeyAndDefaultTimestamp() {
defaultFactory.create(value);
}
@Test(expected = IllegalStateException.class)
public void shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairs() {
defaultFactory.create(Collections.singletonList(KeyValue.pair(rawKey, value)));
}
@Test(expected = IllegalStateException.class)
public void shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairsAndCustomTimestamps() {
defaultFactory.create(Collections.singletonList(KeyValue.pair(rawKey, value)), timestamp, 2L);
}
@Test
public void shouldCreateConsumerRecordWithOtherTopicNameAndTimestamp() {
verifyRecord(otherTopicName, rawKey, rawValue, timestamp, factory.create(otherTopicName, rawKey, value, timestamp));
}
@Test
public void shouldCreateConsumerRecordWithTimestamp() {
verifyRecord(topicName, rawKey, rawValue, timestamp, factory.create(rawKey, value, timestamp));
}
@Test
public void shouldCreateConsumerRecordWithOtherTopicName() {
verifyRecord(otherTopicName, rawKey, rawValue, 0L, factory.create(otherTopicName, rawKey, value));
factory.advanceTimeMs(3L);
verifyRecord(otherTopicName, rawKey, rawValue, 3L, factory.create(otherTopicName, rawKey, value));
}
@Test
public void shouldCreateConsumerRecord() {
verifyRecord(topicName, rawKey, rawValue, 0L, factory.create(rawKey, value));
factory.advanceTimeMs(3L);
verifyRecord(topicName, rawKey, rawValue, 3L, factory.create(rawKey, value));
}
@Test
public void shouldCreateNullKeyConsumerRecordWithOtherTopicNameAndTimestampWithTimetamp() {
verifyRecord(topicName, null, rawValue, timestamp, factory.create(value, timestamp));
}
@Test
public void shouldCreateNullKeyConsumerRecordWithTimestampWithTimestamp() {
verifyRecord(topicName, null, rawValue, timestamp, factory.create(value, timestamp));
}
@Test
public void shouldCreateNullKeyConsumerRecord() {
verifyRecord(topicName, null, rawValue, 0L, factory.create(value));
factory.advanceTimeMs(3L);
verifyRecord(topicName, null, rawValue, 3L, factory.create(value));
}
@SuppressWarnings("unchecked")
@Test
public void shouldCreateConsumerRecordsFromKeyValuePairs() {
final ConsumerRecordFactory<String, Integer> factory =
new ConsumerRecordFactory<>(topicName, stringSerializer, integerSerializer, 0L);
final KeyValue[] keyValuePairs = new KeyValue[5];
final KeyValue[] rawKeyValuePairs = new KeyValue[keyValuePairs.length];
for (int i = 0; i < keyValuePairs.length; ++i) {
keyValuePairs[i] = KeyValue.pair(key + "-" + i, value + i);
rawKeyValuePairs[i] = KeyValue.pair(
stringSerializer.serialize(topicName, key + "-" + i),
integerSerializer.serialize(topicName, value + i));
}
final List<ConsumerRecord<byte[], byte[]>> records =
factory.create(Arrays.<KeyValue<String, Integer>>asList(keyValuePairs));
for (int i = 0; i < keyValuePairs.length; ++i) {
verifyRecord(
topicName,
(byte[]) rawKeyValuePairs[i].key,
(byte[]) rawKeyValuePairs[i].value,
0L,
records.get(i));
}
}
@SuppressWarnings("unchecked")
@Test
public void shouldCreateConsumerRecordsFromKeyValuePairsWithTimestampAndIncrements() {
final ConsumerRecordFactory<String, Integer> factory =
new ConsumerRecordFactory<>(topicName, stringSerializer, integerSerializer, timestamp, 2L);
final KeyValue[] keyValuePairs = new KeyValue[5];
final KeyValue[] rawKeyValuePairs = new KeyValue[keyValuePairs.length];
for (int i = 0; i < keyValuePairs.length; ++i) {
keyValuePairs[i] = KeyValue.pair(key + "-" + i, value + i);
rawKeyValuePairs[i] = KeyValue.pair(
stringSerializer.serialize(topicName, key + "-" + i),
integerSerializer.serialize(topicName, value + i));
}
final List<ConsumerRecord<byte[], byte[]>> records =
factory.create(Arrays.<KeyValue<String, Integer>>asList(keyValuePairs));
for (int i = 0; i < keyValuePairs.length; ++i) {
verifyRecord(
topicName,
(byte[]) rawKeyValuePairs[i].key,
(byte[]) rawKeyValuePairs[i].value,
timestamp + 2L * i,
records.get(i));
}
}
@SuppressWarnings("unchecked")
@Test
public void shouldCreateConsumerRecordsFromKeyValuePairsWithCustomTimestampAndIncrementsAndNotAdvanceTime() {
final ConsumerRecordFactory<String, Integer> factory =
new ConsumerRecordFactory<>(topicName, stringSerializer, integerSerializer, 0L);
final KeyValue[] keyValuePairs = new KeyValue[5];
final KeyValue[] rawKeyValuePairs = new KeyValue[keyValuePairs.length];
for (int i = 0; i < keyValuePairs.length; ++i) {
keyValuePairs[i] = KeyValue.pair(key + "-" + i, value + i);
rawKeyValuePairs[i] = KeyValue.pair(
stringSerializer.serialize(topicName, key + "-" + i),
integerSerializer.serialize(topicName, value + i));
}
final List<ConsumerRecord<byte[], byte[]>> records =
factory.create(Arrays.<KeyValue<String, Integer>>asList(keyValuePairs), timestamp, 2L);
for (int i = 0; i < keyValuePairs.length; ++i) {
verifyRecord(
topicName,
(byte[]) rawKeyValuePairs[i].key,
(byte[]) rawKeyValuePairs[i].value,
timestamp + 2L * i,
records.get(i));
}
// should not have incremented internally tracked time
verifyRecord(topicName, null, rawValue, 0L, factory.create(value));
}
private void verifyRecord(final String topicName,
final byte[] rawKey,
final byte[] rawValue,
final long timestamp,
final ConsumerRecord<byte[], byte[]> record) {
assertEquals(topicName, record.topic());
assertArrayEquals(rawKey, record.key());
assertArrayEquals(rawValue, record.value());
assertEquals(timestamp, record.timestamp());
}
}

View File

@@ -0,0 +1,585 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.test;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;
@Deprecated
public class OutputVerifierTest {
private final byte[] key = new byte[0];
private final byte[] value = new byte[0];
private final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(
"someTopic",
Integer.MAX_VALUE,
Long.MAX_VALUE,
key,
value
);
private final ProducerRecord<byte[], byte[]> nullKeyValueRecord = new ProducerRecord<byte[], byte[]>(
"someTopic",
Integer.MAX_VALUE,
Long.MAX_VALUE,
null,
null
);
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullProducerRecordForCompareValue() {
OutputVerifier.compareValue(null, value);
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValue() {
OutputVerifier.compareValue((ProducerRecord<byte[], byte[]>) null, producerRecord);
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullExpectedRecordForCompareValue() {
OutputVerifier.compareValue(producerRecord, (ProducerRecord<byte[], byte[]>) null);
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullProducerRecordForCompareKeyValue() {
OutputVerifier.compareKeyValue(null, key, value);
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValue() {
OutputVerifier.compareKeyValue(null, producerRecord);
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullExpectedRecordForCompareKeyValue() {
OutputVerifier.compareKeyValue(producerRecord, null);
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullProducerRecordForCompareValueTimestamp() {
OutputVerifier.compareValueTimestamp(null, value, 0L);
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp() {
OutputVerifier.compareValueTimestamp(null, producerRecord);
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullExpectedRecordForCompareValueTimestamp() {
OutputVerifier.compareValueTimestamp(producerRecord, null);
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullProducerRecordForCompareKeyValueTimestamp() {
OutputVerifier.compareKeyValueTimestamp(null, key, value, 0L);
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValueTimestamp() {
OutputVerifier.compareKeyValueTimestamp(null, producerRecord);
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullExpectedRecordForCompareKeyValueTimestamp() {
OutputVerifier.compareKeyValueTimestamp(producerRecord, null);
}
@Test
public void shouldPassIfValueIsEqualForCompareValue() {
OutputVerifier.compareValue(producerRecord, value);
}
@Test
public void shouldPassIfValueIsEqualWithNullForCompareValue() {
OutputVerifier.compareValue(nullKeyValueRecord, (byte[]) null);
}
@Test(expected = AssertionError.class)
public void shouldFailIfValueIsDifferentForCompareValue() {
OutputVerifier.compareValue(producerRecord, key);
}
@Test(expected = AssertionError.class)
public void shouldFailIfValueIsDifferentWithNullForCompareValue() {
OutputVerifier.compareValue(producerRecord, (byte[]) null);
}
@Test(expected = AssertionError.class)
public void shouldFailIfValueIsDifferentWithNullReverseForCompareValue() {
OutputVerifier.compareValue(nullKeyValueRecord, value);
}
@Test
public void shouldPassIfValueIsEqualForCompareValueWithProducerRecord() {
OutputVerifier.compareValue(producerRecord, new ProducerRecord<>(
"otherTopic",
0,
0L,
value,
value
));
}
@Test
public void shouldPassIfValueIsEqualWithNullForCompareValueWithProducerRecord() {
OutputVerifier.compareValue(nullKeyValueRecord, new ProducerRecord<byte[], byte[]>(
"otherTopic",
0,
0L,
value,
null
));
}
@Test(expected = AssertionError.class)
public void shouldFailIfValueIsDifferentForCompareValueWithProducerRecord() {
OutputVerifier.compareValue(producerRecord, new ProducerRecord<>(
"sameTopic",
Integer.MAX_VALUE,
Long.MAX_VALUE,
value,
key
));
}
@Test(expected = AssertionError.class)
public void shouldFailIfValueIsDifferentWithNullForCompareValueWithProducerRecord() {
OutputVerifier.compareValue(producerRecord, new ProducerRecord<byte[], byte[]>(
"sameTopic",
Integer.MAX_VALUE,
Long.MAX_VALUE,
value,
null
));
}
@Test(expected = AssertionError.class)
public void shouldFailIfValueIsDifferentWithNullReverseForCompareValueWithProducerRecord() {
OutputVerifier.compareValue(nullKeyValueRecord, new ProducerRecord<>(
"sameTopic",
Integer.MAX_VALUE,
Long.MAX_VALUE,
value,
value
));
}
@Test
public void shouldPassIfKeyAndValueIsEqualForCompareKeyValue() {
OutputVerifier.compareKeyValue(producerRecord, key, value);
}
@Test
public void shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValue() {
OutputVerifier.compareKeyValue(nullKeyValueRecord, null, null);
}
@Test(expected = AssertionError.class)
public void shouldFailIfKeyIsDifferentForCompareKeyValue() {
OutputVerifier.compareKeyValue(producerRecord, value, value);
}
@Test(expected = AssertionError.class)
public void shouldFailIfKeyIsDifferentWithNullForCompareKeyValue() {
OutputVerifier.compareKeyValue(producerRecord, null, value);
}
@Test(expected = AssertionError.class)
public void shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValue() {
OutputVerifier.compareKeyValue(
new ProducerRecord<byte[], byte[]>(
"someTopic",
Integer.MAX_VALUE,
Long.MAX_VALUE,
null,
value),
key,
value);
}
@Test(expected = AssertionError.class)
public void shouldFailIfValueIsDifferentForCompareKeyValue() {
OutputVerifier.compareKeyValue(producerRecord, key, key);
}
@Test(expected = AssertionError.class)
public void shouldFailIfValueIsDifferentWithNullForCompareKeyValue() {
OutputVerifier.compareKeyValue(producerRecord, key, null);
}
@Test(expected = AssertionError.class)
public void shouldFailIfValueIsDifferentWithNullReversForCompareKeyValue() {
OutputVerifier.compareKeyValue(
new ProducerRecord<byte[], byte[]>(
"someTopic",
Integer.MAX_VALUE,
Long.MAX_VALUE,
key,
null),
key,
value);
}
@Test
public void shouldPassIfKeyAndValueIsEqualForCompareKeyValueWithProducerRecord() {
OutputVerifier.compareKeyValue(producerRecord, new ProducerRecord<>(
"otherTopic",
0,
0L,
key,
value));
}
@Test
public void shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord() {
OutputVerifier.compareKeyValue(nullKeyValueRecord, new ProducerRecord<byte[], byte[]>(
"otherTopic",
0,
0L,
null,
null));
}
@Test(expected = AssertionError.class)
public void shouldFailIfKeyIsDifferentForCompareKeyValueWithProducerRecord() {
OutputVerifier.compareKeyValue(producerRecord, new ProducerRecord<>(
"someTopic",
Integer.MAX_VALUE,
Long.MAX_VALUE,
value,
value));
}
@Test(expected = AssertionError.class)
public void shouldFailIfKeyIsDifferentWithNullForCompareKeyValueWithProducerRecord() {
OutputVerifier.compareKeyValue(producerRecord, new ProducerRecord<byte[], byte[]>(
"someTopic",
Integer.MAX_VALUE,
Long.MAX_VALUE,
null,
value));
}
@Test(expected = AssertionError.class)
public void shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord() {
OutputVerifier.compareKeyValue(
new ProducerRecord<byte[], byte[]>(
"someTopic",
Integer.MAX_VALUE,
Long.MAX_VALUE,
null,
value),
producerRecord);
}
@Test(expected = AssertionError.class)
public void shouldFailIfValueIsDifferentForCompareKeyValueWithProducerRecord() {
OutputVerifier.compareKeyValue(producerRecord, new ProducerRecord<>(
"someTopic",
Integer.MAX_VALUE,
Long.MAX_VALUE,
key,
key));
}
@Test(expected = AssertionError.class)
public void shouldFailIfValueIsDifferentWithNullForCompareKeyValueWithProducerRecord() {
OutputVerifier.compareKeyValue(producerRecord, new ProducerRecord<byte[], byte[]>(
"someTopic",
Integer.MAX_VALUE,
Long.MAX_VALUE,
key,
null));
}
@Test(expected = AssertionError.class)
public void shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord() {
OutputVerifier.compareKeyValue(
new ProducerRecord<byte[], byte[]>(
"someTopic",
Integer.MAX_VALUE,
Long.MAX_VALUE,
key,
null),
producerRecord);
}
@Test
public void shouldPassIfValueAndTimestampIsEqualForCompareValueTimestamp() {
OutputVerifier.compareValueTimestamp(producerRecord, value, Long.MAX_VALUE);
}
@Test
public void shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestamp() {
OutputVerifier.compareValueTimestamp(nullKeyValueRecord, null, Long.MAX_VALUE);
}
@Test(expected = AssertionError.class)
public void shouldFailIfValueIsDifferentForCompareValueTimestamp() {
OutputVerifier.compareValueTimestamp(producerRecord, key, Long.MAX_VALUE);
}
@Test(expected = AssertionError.class)
public void shouldFailIfValueIsDifferentWithNullForCompareValueTimestamp() {
OutputVerifier.compareValueTimestamp(producerRecord, null, Long.MAX_VALUE);
}
@Test(expected = AssertionError.class)
public void shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestamp() {
OutputVerifier.compareValueTimestamp(nullKeyValueRecord, value, Long.MAX_VALUE);
}
@Test
public void shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord() {
OutputVerifier.compareValueTimestamp(producerRecord, new ProducerRecord<>(
"otherTopic",
0,
Long.MAX_VALUE,
value,
value
));
}
@Test
public void shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestampWithProducerRecord() {
OutputVerifier.compareValueTimestamp(nullKeyValueRecord, new ProducerRecord<byte[], byte[]>(
"otherTopic",
0,
Long.MAX_VALUE,
value,
null
));
}
@Test(expected = AssertionError.class)
public void shouldFailIfValueIsDifferentForCompareValueTimestampWithProducerRecord() {
OutputVerifier.compareValueTimestamp(producerRecord, new ProducerRecord<>(
"someTopic",
Integer.MAX_VALUE,
Long.MAX_VALUE,
key,
key
));
}
@Test(expected = AssertionError.class)
public void shouldFailIfValueIsDifferentWithNullForCompareValueTimestampWithProducerRecord() {
OutputVerifier.compareValueTimestamp(producerRecord, new ProducerRecord<byte[], byte[]>(
"someTopic",
Integer.MAX_VALUE,
Long.MAX_VALUE,
key,
null
));
}
@Test(expected = AssertionError.class)
public void shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord() {
OutputVerifier.compareValueTimestamp(nullKeyValueRecord, new ProducerRecord<>(
"someTopic",
Integer.MAX_VALUE,
Long.MAX_VALUE,
key,
value
));
}
@Test(expected = AssertionError.class)
public void shouldFailIfTimestampIsDifferentForCompareValueTimestamp() {
OutputVerifier.compareValueTimestamp(producerRecord, value, 0);
}
@Test(expected = AssertionError.class)
public void shouldFailIfTimestampDifferentWithNullReverseForCompareValueTimestamp() {
OutputVerifier.compareValueTimestamp(nullKeyValueRecord, null, 0);
}
@Test(expected = AssertionError.class)
public void shouldFailIfTimestampIsDifferentForCompareValueTimestampWithProducerRecord() {
OutputVerifier.compareValueTimestamp(producerRecord, new ProducerRecord<>(
"someTopic",
Integer.MAX_VALUE,
0L,
key,
value
));
}
@Test
public void shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp() {
OutputVerifier.compareKeyValueTimestamp(producerRecord, key, value, Long.MAX_VALUE);
}
@Test
public void shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueTimestamp() {
OutputVerifier.compareKeyValueTimestamp(nullKeyValueRecord, null, null, Long.MAX_VALUE);
}
@Test(expected = AssertionError.class)
public void shouldFailIfKeyIsDifferentForCompareKeyValueTimestamp() {
OutputVerifier.compareKeyValueTimestamp(producerRecord, value, value, Long.MAX_VALUE);
}
@Test(expected = AssertionError.class)
public void shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestamp() {
OutputVerifier.compareKeyValueTimestamp(producerRecord, null, value, Long.MAX_VALUE);
}
@Test(expected = AssertionError.class)
public void shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueTimestamp() {
OutputVerifier.compareKeyValueTimestamp(
new ProducerRecord<byte[], byte[]>(
"someTopic",
Integer.MAX_VALUE,
Long.MAX_VALUE,
null,
value),
key,
value,
Long.MAX_VALUE);
}
@Test(expected = AssertionError.class)
public void shouldFailIfValueIsDifferentForCompareKeyValueTimestamp() {
OutputVerifier.compareKeyValueTimestamp(producerRecord, key, key, Long.MAX_VALUE);
}
@Test(expected = AssertionError.class)
public void shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestamp() {
OutputVerifier.compareKeyValueTimestamp(producerRecord, key, null, Long.MAX_VALUE);
}
@Test(expected = AssertionError.class)
public void shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestamp() {
OutputVerifier.compareKeyValueTimestamp(
new ProducerRecord<byte[], byte[]>(
"someTopic",
Integer.MAX_VALUE,
Long.MAX_VALUE,
key,
null),
key,
value,
Long.MAX_VALUE);
}
@Test
public void shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestampWithProducerRecord() {
OutputVerifier.compareKeyValueTimestamp(producerRecord, new ProducerRecord<>(
"otherTopic",
0,
Long.MAX_VALUE,
key,
value));
}
@Test
public void shouldPassIfKeyAndValueAndTimestampIsEqualWithNullForCompareKeyValueTimestampWithProducerRecord() {
OutputVerifier.compareKeyValueTimestamp(nullKeyValueRecord, new ProducerRecord<byte[], byte[]>(
"otherTopic",
0,
Long.MAX_VALUE,
null,
null));
}
@Test(expected = AssertionError.class)
public void shouldFailIfKeyIsDifferentForCompareKeyValueTimestampWithProducerRecord() {
OutputVerifier.compareKeyValueTimestamp(producerRecord, new ProducerRecord<>(
"someTopic",
Integer.MAX_VALUE,
Long.MAX_VALUE,
value,
value));
}
@Test(expected = AssertionError.class)
public void shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord() {
OutputVerifier.compareKeyValueTimestamp(producerRecord, new ProducerRecord<byte[], byte[]>(
"someTopic",
Integer.MAX_VALUE,
Long.MAX_VALUE,
null,
value));
}
@Test(expected = AssertionError.class)
public void shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord() {
OutputVerifier.compareKeyValueTimestamp(
new ProducerRecord<byte[], byte[]>(
"someTopic",
Integer.MAX_VALUE,
Long.MAX_VALUE,
null,
value),
producerRecord);
}
@Test(expected = AssertionError.class)
public void shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord() {
OutputVerifier.compareKeyValueTimestamp(producerRecord, new ProducerRecord<>(
"someTopic",
Integer.MAX_VALUE,
Long.MAX_VALUE,
key,
key));
}
@Test(expected = AssertionError.class)
public void shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord() {
OutputVerifier.compareKeyValueTimestamp(producerRecord, new ProducerRecord<byte[], byte[]>(
"someTopic",
Integer.MAX_VALUE,
Long.MAX_VALUE,
key,
null));
}
@Test(expected = AssertionError.class)
public void shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord() {
OutputVerifier.compareKeyValueTimestamp(
new ProducerRecord<byte[], byte[]>(
"someTopic",
Integer.MAX_VALUE,
Long.MAX_VALUE,
key,
null),
producerRecord);
}
}

View File

@@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.test;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.junit.Test;
import java.time.Instant;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasProperty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
public class TestRecordTest {
private final String key = "testKey";
private final int value = 1;
private final Headers headers = new RecordHeaders(
new Header[]{
new RecordHeader("foo", "value".getBytes()),
new RecordHeader("bar", (byte[]) null),
new RecordHeader("\"A\\u00ea\\u00f1\\u00fcC\"", "value".getBytes())
});
private final Instant recordTime = Instant.parse("2019-06-01T10:00:00Z");
private final long recordMs = recordTime.toEpochMilli();
@Test
public void testFields() {
final TestRecord<String, Integer> testRecord = new TestRecord<>(key, value, headers, recordTime);
assertThat(testRecord.key(), equalTo(key));
assertThat(testRecord.value(), equalTo(value));
assertThat(testRecord.headers(), equalTo(headers));
assertThat(testRecord.timestamp(), equalTo(recordMs));
assertThat(testRecord.getKey(), equalTo(key));
assertThat(testRecord.getValue(), equalTo(value));
assertThat(testRecord.getHeaders(), equalTo(headers));
assertThat(testRecord.getRecordTime(), equalTo(recordTime));
}
@Test
public void testMultiFieldMatcher() {
final TestRecord<String, Integer> testRecord = new TestRecord<>(key, value, headers, recordTime);
assertThat(testRecord, allOf(
hasProperty("key", equalTo(key)),
hasProperty("value", equalTo(value)),
hasProperty("headers", equalTo(headers))));
assertThat(testRecord, allOf(
hasProperty("key", equalTo(key)),
hasProperty("value", equalTo(value)),
hasProperty("headers", equalTo(headers)),
hasProperty("recordTime", equalTo(recordTime))));
assertThat(testRecord, allOf(
hasProperty("key", equalTo(key)),
hasProperty("value", equalTo(value))));
}
@Test
public void testEqualsAndHashCode() {
final TestRecord<String, Integer> testRecord = new TestRecord<>(key, value, headers, recordTime);
assertEquals(testRecord, testRecord);
assertEquals(testRecord.hashCode(), testRecord.hashCode());
final TestRecord<String, Integer> equalRecord = new TestRecord<>(key, value, headers, recordTime);
assertEquals(testRecord, equalRecord);
assertEquals(testRecord.hashCode(), equalRecord.hashCode());
final TestRecord<String, Integer> equalRecordMs = new TestRecord<>(key, value, headers, recordMs);
assertEquals(testRecord, equalRecordMs);
assertEquals(testRecord.hashCode(), equalRecordMs.hashCode());
final Headers headers2 = new RecordHeaders(
new Header[]{
new RecordHeader("foo", "value".getBytes()),
new RecordHeader("bar", (byte[]) null),
});
final TestRecord<String, Integer> headerMismatch = new TestRecord<>(key, value, headers2, recordTime);
assertNotEquals(testRecord, headerMismatch);
final TestRecord<String, Integer> keyMisMatch = new TestRecord<>("test-mismatch", value, headers, recordTime);
assertNotEquals(testRecord, keyMisMatch);
final TestRecord<String, Integer> valueMisMatch = new TestRecord<>(key, 2, headers, recordTime);
assertNotEquals(testRecord, valueMisMatch);
final TestRecord<String, Integer> timeMisMatch = new TestRecord<>(key, value, headers, recordTime.plusMillis(1));
assertNotEquals(testRecord, timeMisMatch);
final TestRecord<String, Integer> nullFieldsRecord = new TestRecord<>(null, null, null, (Instant) null);
assertEquals(nullFieldsRecord, nullFieldsRecord);
assertEquals(nullFieldsRecord.hashCode(), nullFieldsRecord.hashCode());
}
@Test
public void testPartialConstructorEquals() {
final TestRecord<String, Integer> record1 = new TestRecord<>(value);
assertThat(record1, equalTo(new TestRecord<>(null, value, null, (Instant) null)));
final TestRecord<String, Integer> record2 = new TestRecord<>(key, value);
assertThat(record2, equalTo(new TestRecord<>(key, value, null, (Instant) null)));
final TestRecord<String, Integer> record3 = new TestRecord<>(key, value, headers);
assertThat(record3, equalTo(new TestRecord<>(key, value, headers, (Long) null)));
final TestRecord<String, Integer> record4 = new TestRecord<>(key, value, recordTime);
assertThat(record4, equalTo(new TestRecord<>(key, value, null, recordMs)));
}
@Test(expected = IllegalArgumentException.class)
public void testInvalidRecords() {
new TestRecord<>(key, value, headers, -1L);
}
@Test
public void testToString() {
final TestRecord<String, Integer> testRecord = new TestRecord<>(key, value, headers, recordTime);
assertThat(testRecord.toString(), equalTo("TestRecord[key=testKey, value=1, "
+ "headers=RecordHeaders(headers = [RecordHeader(key = foo, value = [118, 97, 108, 117, 101]), "
+ "RecordHeader(key = bar, value = null), RecordHeader(key = \"A\\u00ea\\u00f1\\u00fcC\", value = [118, 97, 108, 117, 101])], isReadOnly = false), "
+ "recordTime=2019-06-01T10:00:00Z]"));
}
@Test
public void testConsumerRecord() {
final String topicName = "topic";
final ConsumerRecord<String, Integer> consumerRecord =
new ConsumerRecord<>(topicName, 1, 0, recordMs, TimestampType.CREATE_TIME, 0L, 0, 0, key, value, headers);
final TestRecord<String, Integer> testRecord = new TestRecord<>(consumerRecord);
final TestRecord<String, Integer> expectedRecord = new TestRecord<>(key, value, headers, recordTime);
assertEquals(expectedRecord, testRecord);
}
@Test
public void testProducerRecord() {
final String topicName = "topic";
final ProducerRecord<String, Integer> producerRecord =
new ProducerRecord<>(topicName, 1, recordMs, key, value, headers);
final TestRecord<String, Integer> testRecord = new TestRecord<>(producerRecord);
final TestRecord<String, Integer> expectedRecord = new TestRecord<>(key, value, headers, recordTime);
assertEquals(expectedRecord, testRecord);
assertNotEquals(expectedRecord, producerRecord);
}
}

View File

@@ -0,0 +1,21 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.logger.org.apache.kafka=INFO