Add km module kafka

Author: leewei
Date: 2023-02-14 16:27:47 +08:00
parent 229140f067
commit 0b8160a714
4039 changed files with 718112 additions and 46204 deletions


@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.file;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Very simple sink connector that writes each record's value to a file, or to standard output
* when no file is specified.
*/
public class FileStreamSinkConnector extends SinkConnector {
public static final String FILE_CONFIG = "file";
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Destination filename. If not specified, the standard output will be used");
private String filename;
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public void start(Map<String, String> props) {
AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, props);
filename = parsedConfig.getString(FILE_CONFIG);
}
@Override
public Class<? extends Task> taskClass() {
return FileStreamSinkTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
ArrayList<Map<String, String>> configs = new ArrayList<>();
for (int i = 0; i < maxTasks; i++) {
Map<String, String> config = new HashMap<>();
if (filename != null)
config.put(FILE_CONFIG, filename);
configs.add(config);
}
return configs;
}
@Override
public void stop() {
// Nothing to do since FileStreamSinkConnector has no background monitoring.
}
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
}
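
The class above only parses the 'file' setting and fans it out to every task. For reference, here is a minimal standalone sketch (not part of this commit) of how the connector could be driven directly from Java; the class name SinkConnectorSketch and the file path are illustrative, and in a real deployment the Connect framework performs this lifecycle.

import org.apache.kafka.connect.file.FileStreamSinkConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SinkConnectorSketch {
    public static void main(String[] args) {
        FileStreamSinkConnector connector = new FileStreamSinkConnector();

        // Configure the connector the way the framework would, using the 'file' key it defines.
        Map<String, String> props = new HashMap<>();
        props.put(FileStreamSinkConnector.FILE_CONFIG, "/tmp/connect.output"); // hypothetical path
        connector.start(props);

        // Every task config carries the same 'file' value; the framework starts one task per entry.
        List<Map<String, String>> taskConfigs = connector.taskConfigs(2);
        for (Map<String, String> taskConfig : taskConfigs) {
            System.out.println(taskConfig);
        }

        connector.stop();
    }
}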


@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.file;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Collection;
import java.util.Map;
/**
* FileStreamSinkTask writes records to stdout or a file.
*/
public class FileStreamSinkTask extends SinkTask {
private static final Logger log = LoggerFactory.getLogger(FileStreamSinkTask.class);
private String filename;
private PrintStream outputStream;
public FileStreamSinkTask() {
}
// for testing
public FileStreamSinkTask(PrintStream outputStream) {
filename = null;
this.outputStream = outputStream;
}
@Override
public String version() {
return new FileStreamSinkConnector().version();
}
@Override
public void start(Map<String, String> props) {
filename = props.get(FileStreamSinkConnector.FILE_CONFIG);
if (filename == null) {
outputStream = System.out;
} else {
try {
outputStream = new PrintStream(
Files.newOutputStream(Paths.get(filename), StandardOpenOption.CREATE, StandardOpenOption.APPEND),
false,
StandardCharsets.UTF_8.name());
} catch (IOException e) {
throw new ConnectException("Couldn't find or create file '" + filename + "' for FileStreamSinkTask", e);
}
}
}
@Override
public void put(Collection<SinkRecord> sinkRecords) {
for (SinkRecord record : sinkRecords) {
log.trace("Writing line to {}: {}", logFilename(), record.value());
outputStream.println(record.value());
}
}
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
log.trace("Flushing output stream for {}", logFilename());
outputStream.flush();
}
@Override
public void stop() {
if (outputStream != null && outputStream != System.out)
outputStream.close();
}
private String logFilename() {
return filename == null ? "stdout" : filename;
}
}
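
To make the behaviour of put() and flush() concrete, the following standalone sketch (not part of this commit) feeds two records through the PrintStream constructor that the class exposes for testing; the class name SinkTaskSketch, the topic name, and the record values are illustrative.

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.file.FileStreamSinkTask;
import org.apache.kafka.connect.sink.SinkRecord;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.HashMap;

public class SinkTaskSketch {
    public static void main(String[] args) {
        // Capture the task's output in memory instead of a file or stdout.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        FileStreamSinkTask task = new FileStreamSinkTask(new PrintStream(buffer));

        // Each record value is written as one line.
        task.put(Arrays.asList(
                new SinkRecord("example-topic", 0, null, null, Schema.STRING_SCHEMA, "hello", 1),
                new SinkRecord("example-topic", 0, null, null, Schema.STRING_SCHEMA, "world", 2)));

        // flush() only flushes the stream; the offsets map is not used by this task.
        task.flush(new HashMap<>());

        System.out.print(buffer.toString()); // prints "hello" and "world", one per line
    }
}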


@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.file;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Very simple source connector that reads lines from a file, or from standard input when no
* file is specified, and publishes each line as a record to the configured topic.
*/
public class FileStreamSourceConnector extends SourceConnector {
public static final String TOPIC_CONFIG = "topic";
public static final String FILE_CONFIG = "file";
public static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
public static final int DEFAULT_TASK_BATCH_SIZE = 2000;
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Source filename. If not specified, the standard input will be used")
.define(TOPIC_CONFIG, Type.LIST, Importance.HIGH, "The topic to publish data to")
.define(TASK_BATCH_SIZE_CONFIG, Type.INT, DEFAULT_TASK_BATCH_SIZE, Importance.LOW,
"The maximum number of records the Source task can read from file one time");
private String filename;
private String topic;
private int batchSize;
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public void start(Map<String, String> props) {
AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, props);
filename = parsedConfig.getString(FILE_CONFIG);
List<String> topics = parsedConfig.getList(TOPIC_CONFIG);
if (topics.size() != 1) {
throw new ConfigException("'topic' in FileStreamSourceConnector configuration requires definition of a single topic");
}
topic = topics.get(0);
batchSize = parsedConfig.getInt(TASK_BATCH_SIZE_CONFIG);
}
@Override
public Class<? extends Task> taskClass() {
return FileStreamSourceTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
ArrayList<Map<String, String>> configs = new ArrayList<>();
// Only one input stream makes sense.
Map<String, String> config = new HashMap<>();
if (filename != null)
config.put(FILE_CONFIG, filename);
config.put(TOPIC_CONFIG, topic);
config.put(TASK_BATCH_SIZE_CONFIG, String.valueOf(batchSize));
configs.add(config);
return configs;
}
@Override
public void stop() {
// Nothing to do since FileStreamSourceConnector has no background monitoring.
}
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
}
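
As with the sink connector, a short standalone sketch (not part of this commit) shows the contract enforced above: start() accepts exactly one topic, and taskConfigs() always returns a single task configuration carrying the topic, file, and batch size. The class name SourceConnectorSketch and the property values are illustrative.

import org.apache.kafka.connect.file.FileStreamSourceConnector;

import java.util.HashMap;
import java.util.Map;

public class SourceConnectorSketch {
    public static void main(String[] args) {
        FileStreamSourceConnector connector = new FileStreamSourceConnector();

        Map<String, String> props = new HashMap<>();
        props.put(FileStreamSourceConnector.TOPIC_CONFIG, "example-topic");  // exactly one topic is required
        props.put(FileStreamSourceConnector.FILE_CONFIG, "/tmp/input.txt");  // hypothetical path; omit to read stdin
        props.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, "1000");
        connector.start(props);

        // Only one input stream makes sense, so only one task config is produced even for maxTasks > 1.
        System.out.println(connector.taskConfigs(4));

        connector.stop();
    }
}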


@@ -0,0 +1,234 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.file;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* FileStreamSourceTask reads from stdin or a file.
*/
public class FileStreamSourceTask extends SourceTask {
private static final Logger log = LoggerFactory.getLogger(FileStreamSourceTask.class);
public static final String FILENAME_FIELD = "filename";
public static final String POSITION_FIELD = "position";
private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA;
private String filename;
private InputStream stream;
private BufferedReader reader = null;
private char[] buffer = new char[1024];
private int offset = 0;
private String topic = null;
private int batchSize = FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE;
private Long streamOffset;
@Override
public String version() {
return new FileStreamSourceConnector().version();
}
@Override
public void start(Map<String, String> props) {
filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
if (filename == null || filename.isEmpty()) {
stream = System.in;
// Tracking offset for stdin doesn't make sense
streamOffset = null;
reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
}
// Missing topic or parsing error is not possible because we've parsed the config in the
// Connector
topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
batchSize = Integer.parseInt(props.get(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG));
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
if (stream == null) {
try {
stream = Files.newInputStream(Paths.get(filename));
Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename));
if (offset != null) {
Object lastRecordedOffset = offset.get(POSITION_FIELD);
if (lastRecordedOffset != null && !(lastRecordedOffset instanceof Long))
throw new ConnectException("Offset position is the incorrect type");
if (lastRecordedOffset != null) {
log.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset);
long skipLeft = (Long) lastRecordedOffset;
while (skipLeft > 0) {
try {
long skipped = stream.skip(skipLeft);
skipLeft -= skipped;
} catch (IOException e) {
log.error("Error while trying to seek to previous offset in file {}: ", filename, e);
throw new ConnectException(e);
}
}
log.debug("Skipped to offset {}", lastRecordedOffset);
}
streamOffset = (lastRecordedOffset != null) ? (Long) lastRecordedOffset : 0L;
} else {
streamOffset = 0L;
}
reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
log.debug("Opened {} for reading", logFilename());
} catch (NoSuchFileException e) {
log.warn("Couldn't find file {} for FileStreamSourceTask, sleeping to wait for it to be created", logFilename());
synchronized (this) {
this.wait(1000);
}
return null;
} catch (IOException e) {
log.error("Error while trying to open file {}: ", filename, e);
throw new ConnectException(e);
}
}
// Unfortunately we can't just use readLine() because it blocks in an uninterruptible way.
// Instead we have to manage splitting lines ourselves, using simple backoff when no new data
// is available.
try {
final BufferedReader readerCopy;
synchronized (this) {
readerCopy = reader;
}
if (readerCopy == null)
return null;
ArrayList<SourceRecord> records = null;
int nread = 0;
while (readerCopy.ready()) {
nread = readerCopy.read(buffer, offset, buffer.length - offset);
log.trace("Read {} bytes from {}", nread, logFilename());
if (nread > 0) {
offset += nread;
if (offset == buffer.length) {
char[] newbuf = new char[buffer.length * 2];
System.arraycopy(buffer, 0, newbuf, 0, buffer.length);
buffer = newbuf;
}
String line;
do {
line = extractLine();
if (line != null) {
log.trace("Read a line from {}", logFilename());
if (records == null)
records = new ArrayList<>();
records.add(new SourceRecord(offsetKey(filename), offsetValue(streamOffset), topic, null,
null, null, VALUE_SCHEMA, line, System.currentTimeMillis()));
if (records.size() >= batchSize) {
return records;
}
}
} while (line != null);
}
}
if (nread <= 0)
synchronized (this) {
this.wait(1000);
}
return records;
} catch (IOException e) {
// Underlying stream was killed, probably as a result of calling stop. Allow returning
// null; the driving thread will handle any shutdown if necessary.
}
return null;
}
private String extractLine() {
int until = -1, newStart = -1;
for (int i = 0; i < offset; i++) {
if (buffer[i] == '\n') {
until = i;
newStart = i + 1;
break;
} else if (buffer[i] == '\r') {
// We need to check for \r\n, so we must skip this if we can't check the next char
if (i + 1 >= offset)
return null;
until = i;
newStart = (buffer[i + 1] == '\n') ? i + 2 : i + 1;
break;
}
}
if (until != -1) {
String result = new String(buffer, 0, until);
System.arraycopy(buffer, newStart, buffer, 0, buffer.length - newStart);
offset = offset - newStart;
if (streamOffset != null)
streamOffset += newStart;
return result;
} else {
return null;
}
}
@Override
public void stop() {
log.trace("Stopping");
synchronized (this) {
try {
if (stream != null && stream != System.in) {
stream.close();
log.trace("Closed input stream");
}
} catch (IOException e) {
log.error("Failed to close FileStreamSourceTask stream: ", e);
}
this.notify();
}
}
private Map<String, String> offsetKey(String filename) {
return Collections.singletonMap(FILENAME_FIELD, filename);
}
private Map<String, Long> offsetValue(Long pos) {
return Collections.singletonMap(POSITION_FIELD, pos);
}
private String logFilename() {
return filename == null ? "stdin" : filename;
}
}
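
The trickiest part of the task above is extractLine(): a line ends at '\n' or at '\r', a "\r\n" pair counts as a single terminator, and a trailing '\r' is held back until the next chunk is read so it can be disambiguated. The standalone sketch below (not part of this commit) illustrates only that splitting rule, not the buffer compaction or offset bookkeeping; the class and method names are illustrative.

import java.util.ArrayList;
import java.util.List;

public class LineSplitSketch {
    // Split buffered characters into complete lines using the same rule as extractLine().
    static List<String> splitCompleteLines(String buffered) {
        List<String> lines = new ArrayList<>();
        int start = 0;
        int i = 0;
        while (i < buffered.length()) {
            char c = buffered.charAt(i);
            if (c == '\n') {
                lines.add(buffered.substring(start, i));
                start = i + 1;
                i = start;
            } else if (c == '\r') {
                // A trailing '\r' might be the first half of "\r\n", so wait for more data.
                if (i + 1 >= buffered.length())
                    break;
                lines.add(buffered.substring(start, i));
                start = (buffered.charAt(i + 1) == '\n') ? i + 2 : i + 1;
                i = start;
            } else {
                i++;
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        // Prints [line1, line2, line3, line4]; the final '\r' is not emitted yet.
        System.out.println(splitCompleteLines("line1\rline2\r\nline3\nline4\n\r"));
    }
}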


@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.file;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.sink.SinkConnector;
import org.easymock.EasyMockSupport;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class FileStreamSinkConnectorTest extends EasyMockSupport {
private static final String MULTIPLE_TOPICS = "test1,test2";
private static final String FILENAME = "/afilename";
private FileStreamSinkConnector connector;
private ConnectorContext ctx;
private Map<String, String> sinkProperties;
@Before
public void setup() {
connector = new FileStreamSinkConnector();
ctx = createMock(ConnectorContext.class);
connector.initialize(ctx);
sinkProperties = new HashMap<>();
sinkProperties.put(SinkConnector.TOPICS_CONFIG, MULTIPLE_TOPICS);
sinkProperties.put(FileStreamSinkConnector.FILE_CONFIG, FILENAME);
}
@Test
public void testConnectorConfigValidation() {
replayAll();
List<ConfigValue> configValues = connector.config().validate(sinkProperties);
for (ConfigValue val : configValues) {
assertEquals("Config property errors: " + val.errorMessages(), 0, val.errorMessages().size());
}
verifyAll();
}
@Test
public void testSinkTasks() {
replayAll();
connector.start(sinkProperties);
List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
assertEquals(1, taskConfigs.size());
assertEquals(FILENAME, taskConfigs.get(0).get(FileStreamSinkConnector.FILE_CONFIG));
taskConfigs = connector.taskConfigs(2);
assertEquals(2, taskConfigs.size());
for (int i = 0; i < 2; i++) {
assertEquals(FILENAME, taskConfigs.get(i).get(FileStreamSinkConnector.FILE_CONFIG));
}
verifyAll();
}
@Test
public void testSinkTasksStdout() {
replayAll();
sinkProperties.remove(FileStreamSinkConnector.FILE_CONFIG);
connector.start(sinkProperties);
List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
assertEquals(1, taskConfigs.size());
assertNull(taskConfigs.get(0).get(FileStreamSinkConnector.FILE_CONFIG));
verifyAll();
}
@Test
public void testTaskClass() {
replayAll();
connector.start(sinkProperties);
assertEquals(FileStreamSinkTask.class, connector.taskClass());
verifyAll();
}
}


@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.file;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
public class FileStreamSinkTaskTest {
private FileStreamSinkTask task;
private ByteArrayOutputStream os;
private PrintStream printStream;
@Rule
public TemporaryFolder topDir = new TemporaryFolder();
private String outputFile;
@Before
public void setup() throws Exception {
os = new ByteArrayOutputStream();
printStream = new PrintStream(os);
task = new FileStreamSinkTask(printStream);
File outputDir = topDir.newFolder("file-stream-sink-" + UUID.randomUUID().toString());
outputFile = outputDir.getCanonicalPath() + "/connect.output";
}
@Test
public void testPutFlush() {
HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
final String newLine = System.getProperty("line.separator");
// We do not call task.start() since it would override the output stream
task.put(Arrays.asList(
new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line1", 1)
));
offsets.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(1L));
task.flush(offsets);
assertEquals("line1" + newLine, os.toString());
task.put(Arrays.asList(
new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line2", 2),
new SinkRecord("topic2", 0, null, null, Schema.STRING_SCHEMA, "line3", 1)
));
offsets.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(2L));
offsets.put(new TopicPartition("topic2", 0), new OffsetAndMetadata(1L));
task.flush(offsets);
assertEquals("line1" + newLine + "line2" + newLine + "line3" + newLine, os.toString());
}
@Test
public void testStart() throws IOException {
task = new FileStreamSinkTask();
Map<String, String> props = new HashMap<>();
props.put(FileStreamSinkConnector.FILE_CONFIG, outputFile);
task.start(props);
HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
task.put(Arrays.asList(
new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line0", 1)
));
offsets.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(1L));
task.flush(offsets);
int numLines = 3;
String[] lines = new String[numLines];
int i = 0;
try (BufferedReader reader = Files.newBufferedReader(Paths.get(outputFile))) {
lines[i++] = reader.readLine();
task.put(Arrays.asList(
new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line1", 2),
new SinkRecord("topic2", 0, null, null, Schema.STRING_SCHEMA, "line2", 1)
));
offsets.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(2L));
offsets.put(new TopicPartition("topic2", 0), new OffsetAndMetadata(1L));
task.flush(offsets);
lines[i++] = reader.readLine();
lines[i++] = reader.readLine();
}
while (--i >= 0) {
assertEquals("line" + i, lines[i]);
}
}
}


@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.file;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class FileStreamSourceConnectorTest extends EasyMockSupport {
private static final String SINGLE_TOPIC = "test";
private static final String MULTIPLE_TOPICS = "test1,test2";
private static final String FILENAME = "/somefilename";
private FileStreamSourceConnector connector;
private ConnectorContext ctx;
private Map<String, String> sourceProperties;
@Before
public void setup() {
connector = new FileStreamSourceConnector();
ctx = createMock(ConnectorContext.class);
connector.initialize(ctx);
sourceProperties = new HashMap<>();
sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG, SINGLE_TOPIC);
sourceProperties.put(FileStreamSourceConnector.FILE_CONFIG, FILENAME);
}
@Test
public void testConnectorConfigValidation() {
replayAll();
List<ConfigValue> configValues = connector.config().validate(sourceProperties);
for (ConfigValue val : configValues) {
assertEquals("Config property errors: " + val.errorMessages(), 0, val.errorMessages().size());
}
verifyAll();
}
@Test
public void testSourceTasks() {
replayAll();
connector.start(sourceProperties);
List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
assertEquals(1, taskConfigs.size());
assertEquals(FILENAME,
taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG));
assertEquals(SINGLE_TOPIC,
taskConfigs.get(0).get(FileStreamSourceConnector.TOPIC_CONFIG));
// Should be able to return fewer than requested #
taskConfigs = connector.taskConfigs(2);
assertEquals(1, taskConfigs.size());
assertEquals(FILENAME,
taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG));
assertEquals(SINGLE_TOPIC,
taskConfigs.get(0).get(FileStreamSourceConnector.TOPIC_CONFIG));
verifyAll();
}
@Test
public void testSourceTasksStdin() {
EasyMock.replay(ctx);
sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
connector.start(sourceProperties);
List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
assertEquals(1, taskConfigs.size());
assertNull(taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG));
EasyMock.verify(ctx);
}
@Test(expected = ConfigException.class)
public void testMultipleSourcesInvalid() {
sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG, MULTIPLE_TOPICS);
connector.start(sourceProperties);
}
@Test
public void testTaskClass() {
EasyMock.replay(ctx);
connector.start(sourceProperties);
assertEquals(FileStreamSourceTask.class, connector.taskClass());
EasyMock.verify(ctx);
}
@Test(expected = ConfigException.class)
public void testMissingTopic() {
sourceProperties.remove(FileStreamSourceConnector.TOPIC_CONFIG);
connector.start(sourceProperties);
}
@Test(expected = ConfigException.class)
public void testBlankTopic() {
// Because of trimming, this test is the same as testing for an empty string.
sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG, " ");
connector.start(sourceProperties);
}
@Test(expected = ConfigException.class)
public void testInvalidBatchSize() {
sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, "abcd");
connector.start(sourceProperties);
}
}


@@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.file;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
public class FileStreamSourceTaskTest extends EasyMockSupport {
private static final String TOPIC = "test";
private File tempFile;
private Map<String, String> config;
private OffsetStorageReader offsetStorageReader;
private SourceTaskContext context;
private FileStreamSourceTask task;
private boolean verifyMocks = false;
@Before
public void setup() throws IOException {
tempFile = File.createTempFile("file-stream-source-task-test", null);
config = new HashMap<>();
config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsolutePath());
config.put(FileStreamSourceConnector.TOPIC_CONFIG, TOPIC);
config.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, String.valueOf(FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE));
task = new FileStreamSourceTask();
offsetStorageReader = createMock(OffsetStorageReader.class);
context = createMock(SourceTaskContext.class);
task.initialize(context);
}
@After
public void teardown() {
tempFile.delete();
if (verifyMocks)
verifyAll();
}
private void replay() {
replayAll();
verifyMocks = true;
}
@Test
public void testNormalLifecycle() throws InterruptedException, IOException {
expectOffsetLookupReturnNone();
replay();
task.start(config);
OutputStream os = Files.newOutputStream(tempFile.toPath());
assertEquals(null, task.poll());
os.write("partial line".getBytes());
os.flush();
assertEquals(null, task.poll());
os.write(" finished\n".getBytes());
os.flush();
List<SourceRecord> records = task.poll();
assertEquals(1, records.size());
assertEquals(TOPIC, records.get(0).topic());
assertEquals("partial line finished", records.get(0).value());
assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition());
assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 22L), records.get(0).sourceOffset());
assertEquals(null, task.poll());
// Different line endings, and make sure the final \r doesn't result in a line until we can
// read the subsequent byte.
os.write("line1\rline2\r\nline3\nline4\n\r".getBytes());
os.flush();
records = task.poll();
assertEquals(4, records.size());
assertEquals("line1", records.get(0).value());
assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition());
assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 28L), records.get(0).sourceOffset());
assertEquals("line2", records.get(1).value());
assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(1).sourcePartition());
assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 35L), records.get(1).sourceOffset());
assertEquals("line3", records.get(2).value());
assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(2).sourcePartition());
assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 41L), records.get(2).sourceOffset());
assertEquals("line4", records.get(3).value());
assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(3).sourcePartition());
assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 47L), records.get(3).sourceOffset());
os.write("subsequent text".getBytes());
os.flush();
records = task.poll();
assertEquals(1, records.size());
assertEquals("", records.get(0).value());
assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition());
assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 48L), records.get(0).sourceOffset());
os.close();
task.stop();
}
@Test
public void testBatchSize() throws IOException, InterruptedException {
expectOffsetLookupReturnNone();
replay();
config.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, "5000");
task.start(config);
OutputStream os = Files.newOutputStream(tempFile.toPath());
for (int i = 0; i < 10_000; i++) {
os.write("Neque porro quisquam est qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit...\n".getBytes());
}
os.flush();
List<SourceRecord> records = task.poll();
assertEquals(5000, records.size());
records = task.poll();
assertEquals(5000, records.size());
os.close();
task.stop();
}
@Test
public void testMissingFile() throws InterruptedException {
replay();
String data = "line\n";
System.setIn(new ByteArrayInputStream(data.getBytes()));
config.remove(FileStreamSourceConnector.FILE_CONFIG);
task.start(config);
List<SourceRecord> records = task.poll();
assertEquals(1, records.size());
assertEquals(TOPIC, records.get(0).topic());
assertEquals("line", records.get(0).value());
task.stop();
}
public void testInvalidFile() throws InterruptedException {
config.put(FileStreamSourceConnector.FILE_CONFIG, "bogusfilename");
task.start(config);
// Currently the task retries indefinitely if the file isn't found, but shouldn't return any data.
for (int i = 0; i < 100; i++)
assertEquals(null, task.poll());
}
private void expectOffsetLookupReturnNone() {
EasyMock.expect(context.offsetStorageReader()).andReturn(offsetStorageReader);
EasyMock.expect(offsetStorageReader.offset(EasyMock.<Map<String, String>>anyObject())).andReturn(null);
}
}