[Feature][Connector-v2]Resolve null first column in CSV Reader #10383
base: dev
Conversation
```java
// issues
List<String> cleanedHeaders =
        headers.stream()
                .map(header -> header.replace("\uFEFF", "").trim())
```
Are there any other characters besides `\uFEFF` that can cause this problem? Also, do the text and Excel file readers have the same problem?
`\uFEFF` is the UTF-8 BOM (Byte Order Mark), a special non-printable character that usually appears at the start of files saved in the "UTF-8 with BOM" encoding. (Tools such as Notepad on Windows often add this mark automatically when saving files as UTF-8.)
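To make the failure mode concrete, here is a minimal standalone sketch (illustrative only, not code from this PR) showing how a UTF-8 BOM survives a naive read and corrupts the first header, and how Apache Commons IO's `BOMInputStream` strips it:

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.apache.commons.io.input.BOMInputStream;

public class BomDemo {
    public static void main(String[] args) throws Exception {
        // A UTF-8 BOM (EF BB BF) followed by a header line, as Notepad would write it
        byte[] csv = "\uFEFFid,name\n".getBytes(StandardCharsets.UTF_8);

        // Naive read: Java's UTF-8 decoder keeps the BOM, so the first header
        // becomes "\uFEFFid" and no longer matches the configured column "id"
        try (BufferedReader naive = new BufferedReader(
                new InputStreamReader(new ByteArrayInputStream(csv), StandardCharsets.UTF_8))) {
            System.out.println(naive.readLine().startsWith("\uFEFF")); // true
        }

        // BOMInputStream consumes the BOM bytes before the reader ever sees them
        try (BufferedReader clean = new BufferedReader(
                new InputStreamReader(new BOMInputStream(new ByteArrayInputStream(csv)),
                        StandardCharsets.UTF_8))) {
            System.out.println(clean.readLine()); // id,name
        }
    }
}
```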
chl-wxp left a comment:
LGTM
Pull request overview
This pull request fixes a bug where UTF-8 BOM (Byte Order Mark) characters in CSV files cause the first column to be null when csv_use_header_line=true. The fix introduces BOMInputStream from Apache Commons IO to automatically detect and skip BOM markers, and adds additional header cleaning logic to remove any residual BOM characters.
Changes:
- Added BOMInputStream to detect and handle UTF-8 BOM in CSV files during reading
- Implemented header cleaning logic to remove BOM characters and trim whitespace from column names
- Added test case to verify BOM handling with a sample UTF-8 BOM encoded CSV file
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java | Updated CSV reading logic to wrap input streams with BOMInputStream for automatic BOM detection, detect charset based on BOM presence, and clean headers of BOM characters |
| seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategyTest.java | Added test method testUtf8BomCsvWithHeaderRead() to verify correct parsing of UTF-8 BOM CSV files with headers |
| seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/csv/utf8_bom.csv | Added test CSV file with UTF-8 BOM marker for testing BOM handling |
```java
Charset charset =
        bomIn.getBOM() == null
                ? Charset.forName(encoding)
                : Charset.forName(bomIn.getBOM().getCharsetName());
```
Copilot AI (Jan 23, 2026):
The charset detection logic assumes that if a BOM is detected, the BOM's charset name will be valid. However, there's no null check on bomIn.getBOM().getCharsetName() when bomIn.getBOM() is not null. If getBOM() returns a non-null value but getCharsetName() returns null or an invalid charset name, Charset.forName() will throw an exception. Consider adding defensive checks or using a try-catch to handle potential IllegalCharsetNameException or UnsupportedCharsetException.
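For illustration, a hedged sketch of the defensive fallback being suggested. `resolveCharset` and `configuredEncoding` are hypothetical names for this sketch, not the PR's actual code; only the `BOMInputStream`/`ByteOrderMark` calls are real Commons IO API:

```java
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.UnsupportedCharsetException;

import org.apache.commons.io.ByteOrderMark;
import org.apache.commons.io.input.BOMInputStream;

// Hypothetical helper (would live in the reader class) sketching the suggested checks.
static Charset resolveCharset(BOMInputStream bomIn, String configuredEncoding)
        throws IOException {
    ByteOrderMark bom = bomIn.getBOM(); // null when the stream carries no BOM
    if (bom == null || bom.getCharsetName() == null) {
        return Charset.forName(configuredEncoding);
    }
    try {
        return Charset.forName(bom.getCharsetName());
    } catch (IllegalCharsetNameException | UnsupportedCharsetException e) {
        // Fall back to the user-configured encoding instead of failing the whole read
        return Charset.forName(configuredEncoding);
    }
}
```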
## Issue 1: BOM Processing Logic Error in Split Scenario (BLOCKER)

**Location**: (…)

**Code**:

```java
try (BOMInputStream bomIn = new BOMInputStream(wrapInputStream(inputStream, split));
        BufferedReader reader =
                new BufferedReader(new InputStreamReader(bomIn, getCharset(bomIn)));
        CSVParser csvParser = new CSVParser(reader, getCSVFormat())) {
```

**Related Context**: (…)

**Problem Description**: In the split scenario (…)

**Example Scenario**: (…)

**Root Cause**: Need to determine whether (…)

**Potential Risks**: (…)

**Impact Scope**: (…)

**Severity**: BLOCKER - Breaks core functionality

**Improvement Suggestions**:

```java
// Solution 1: Pass whether the InputStream is already chunked via a parameter (recommended)
private InputStream wrapInputStream(InputStream inputStream, FileSourceSplit split,
        boolean isAlreadySliced) throws IOException {
    InputStream resultStream;
    switch (compressFormat) {
        case LZO:
            LzopCodec lzo = new LzopCodec();
            resultStream = lzo.createInputStream(inputStream);
            break;
        case NONE:
            resultStream = inputStream;
            break;
        default:
            log.warn("Csv file does not support this compress type: {}",
                    compressFormat.getCompressCodec());
            resultStream = inputStream;
            break;
    }
    // Only perform the slice when the InputStream is not already chunked
    if (enableSplitFile && split.getLength() > -1 && !isAlreadySliced) {
        resultStream = safeSlice(resultStream, split.getStart(), split.getLength());
    }
    return resultStream;
}

// The caller needs to pass the flag.
// Flag source: check whether inputStream is a BoundedInputStream or another chunked stream type.
```

or

```java
// Solution 2: Determine by checking the InputStream type (more complex implementation)
private InputStream wrapInputStream(InputStream inputStream, FileSourceSplit split)
        throws IOException {
    InputStream resultStream;
    switch (compressFormat) {
        case LZO:
            LzopCodec lzo = new LzopCodec();
            resultStream = lzo.createInputStream(inputStream);
            break;
        case NONE:
            resultStream = inputStream;
            break;
        default:
            log.warn("Csv file does not support this compress type: {}",
                    compressFormat.getCompressCodec());
            resultStream = inputStream;
            break;
    }
    // Check if it's already a BoundedInputStream (the return type of safeSlice)
    boolean isAlreadySliced = (resultStream instanceof BoundedInputStream);
    if (enableSplitFile && split.getLength() > -1 && !isAlreadySliced) {
        resultStream = safeSlice(resultStream, split.getStart(), split.getLength());
    }
    return resultStream;
}
```

**Rationale**: (…)
## Issue 2: Bug in Old LZO Compression + Split Scenario Not Explained in PR (CRITICAL)

**Location**: (…)

**Old Code**:

```java
switch (compressFormat) {
    case LZO:
        LzopCodec lzo = new LzopCodec();
        actualInputStream = lzo.createInputStream(inputStream); // Create decompression stream
        break;
    // ...
}
if (enableSplitFile && split.getLength() > -1) {
    actualInputStream = safeSlice(inputStream, split.getStart(), split.getLength()); // ❌ Uses the original inputStream instead of actualInputStream
}
```

**New Code**:

```java
switch (compressFormat) {
    case LZO:
        LzopCodec lzo = new LzopCodec();
        resultStream = lzo.createInputStream(inputStream);
        break;
    // ...
}
if (enableSplitFile && split.getLength() > -1) {
    resultStream = safeSlice(resultStream, split.getStart(), split.getLength()); // ✓ Uses resultStream
}
```

**Problem Description**: The old code has a serious bug in the LZO compression + split scenario: the slice was applied to the raw `inputStream` rather than to the decompressed `actualInputStream`, so the LZO decompression stream was silently discarded.

**Example Impact**: (…)

**Potential Risks**: (…)

**Impact Scope**: (…)

**Severity**: CRITICAL - Fixed existing bug but not explained

**Improvement Suggestions**:

```java
@Test
public void testLzoCompressedCsvWithSplit() throws Exception {
    // Need to prepare an LZO-compressed CSV test file
    // Verify correctness of the split read
}
```

**Reason**: (…)
## Issue 3: BOM handling not tested when firstLineAsHeader=false (MAJOR)

**Location**: (…)

**Problem Description**: The current test only covers `map.put(FileBaseSourceOptions.CSV_USE_HEADER_LINE.key(), "true");`, but (…)

**Potential Risk**: (…)

**Example Scenario**: (…)

**Scope of Impact**: (…)

**Severity**: MAJOR - Edge case not covered

**Improvement Suggestion**: Add a test case:

```java
@Test
public void testUtf8BomCsvWithoutHeaderRead() throws Exception {
    URL resource = CsvReadStrategyTest.class.getResource("/csv/utf8_bom.csv");
    String path = Paths.get(resource.toURI()).toString();
    CsvReadStrategy csvReadStrategy = new CsvReadStrategy();
    LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
    csvReadStrategy.init(localConf);
    csvReadStrategy.getFileNamesByPath(path);
    // Do not set CSV_USE_HEADER_LINE; it defaults to false
    csvReadStrategy.setPluginConfig(ConfigFactory.empty());
    csvReadStrategy.setCatalogTable(
            CatalogTableUtil.getCatalogTable(
                    "test",
                    new SeaTunnelRowType(
                            new String[] {"id", "name", "age", "gender"},
                            new SeaTunnelDataType[] {
                                BasicType.INT_TYPE,
                                BasicType.STRING_TYPE,
                                BasicType.INT_TYPE,
                                BasicType.STRING_TYPE
                            })));
    TestCollector testCollector = new TestCollector();
    csvReadStrategy.read(path, "", testCollector);
    final List<SeaTunnelRow> rows = testCollector.getRows();
    // Should read 3 lines (including the header line as data)
    Assertions.assertEquals(3, rows.size());
    // The first data line should parse correctly; the first column should not carry a BOM
    Assertions.assertEquals(9821, rows.get(0).getField(0));
    Assertions.assertEquals("hawk", rows.get(0).getField(1));
    // ...other assertions
}
```

**Reason**: (…)
## Issue 4: Insufficient explanation for cleanedHeaders necessity (MINOR)

**Location**: (…)

**Code**:

```java
// Clean up BOM characters (\ uFEFF) in the header to solve occasional BOM residue
// issues
List<String> cleanedHeaders =
        headers.stream()
                .map(header -> header.replace("\uFEFF", ""))
                .collect(Collectors.toList());
```

**Problem Description**: The comment mentions "occasional BOM residue issues", but does not explain: (…)

**Potential Risk**: (…)

**Scope of Impact**: (…)

**Severity**: MINOR - Documentation/comment issue

**Improvement Suggestion**:

```java
// Although BOMInputStream should handle the BOM correctly at the byte-stream level,
// we add this defensive cleaning to handle edge cases where:
// 1. BOMInputStream fails to detect certain BOM variants
// 2. BOM characters are embedded in the file content (not at the start)
// 3. Encoding conversion issues arise (e.g., a BOM is present but the encoding mismatches)
List<String> cleanedHeaders =
        headers.stream()
                .map(header -> header.replace("\uFEFF", ""))
                .collect(Collectors.toList());
```

Or, if it is truly unnecessary, consider removing it and adding an assertion instead:

```java
List<String> headers = getHeaders(csvParser);
// Assert: BOMInputStream should have removed the BOM, so headers shouldn't contain \uFEFF
if (headers.stream().anyMatch(h -> h.contains("\uFEFF"))) {
    log.warn("BOM character found in CSV headers despite BOMInputStream processing. "
            + "Headers: {}", headers);
}
```

**Reason**: (…)
## Issue 5: Lack of support documentation for other BOM types (MINOR)

**Location**: (…)

**Code**:

```java
private Charset getCharset(BOMInputStream bomIn) throws IOException {
    return bomIn.getBOM() == null
            ? Charset.forName(encoding)
            : Charset.forName(bomIn.getBOM().getCharsetName());
}
```

**Problem Description**: The current code supports automatic detection of multiple BOM types (UTF-8, UTF-16 LE, UTF-16 BE, UTF-32, etc.), but: (…)

**Example Scenario**: (…)

**Potential Risk**: (…)

**Scope of Impact**: (…)

**Severity**: MINOR - Feature enhancement/documentation issue

**Improvement Suggestion**:

```java
@Test
public void testUtf16BomCsv() throws Exception {
    // Prepare a CSV file with a UTF-16 BE/LE BOM
    // Verify automatic detection and correct parsing
}
```

```java
/**
 * Detects the character set from the BOM if present.
 *
 * Supported BOM types:
 * - UTF-8 (EF BB BF)
 * - UTF-16 BE (FE FF)
 * - UTF-16 LE (FF FE)
 * - UTF-32 BE (00 00 FE FF)
 * - UTF-32 LE (FF FE 00 00)
 *
 * If a BOM is detected, the charset declared by the BOM takes precedence
 * over the user-configured encoding. This follows the standard behavior
 * of BOMInputStream.
 *
 * @param bomIn the BOMInputStream to check
 * @return the detected or configured charset
 */
private Charset getCharset(BOMInputStream bomIn) throws IOException {
    return bomIn.getBOM() == null
            ? Charset.forName(encoding)
            : Charset.forName(bomIn.getBOM().getCharsetName());
}
```

```java
// If true, respect the BOM charset; if false, always use the user-configured encoding
private boolean respectBomEncoding = true;
```

**Reason**: (…)
## Issue 6: Formatting error in comments (MINOR)

**Location**: (…)

**Code**:

```java
// Clean up BOM characters (\ uFEFF) in the header to solve occasional BOM residue
// issues
```

**Problem Description**: There is a stray space in the comment: `\ uFEFF` should be `\uFEFF`.

**Scope of Impact**: Comment format

**Severity**: MINOR - Typo

**Improvement Suggestion**:

```java
// Clean up BOM characters (\uFEFF) in the header to solve occasional BOM residue issues
```

Purpose of this pull request
fix: #10374
Resolve null first column in CSV Reader with UTF-8 BOM files
Does this PR introduce any user-facing change?
How was this patch tested?
CsvReadStrategyTest#testUtf8BomCsvWithHeaderRead
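For context, that test presumably mirrors the no-header test suggested in Issue 3 above, with the header line enabled. A hedged reconstruction of its shape (not the verbatim source; the final assertion here is illustrative):

```java
@Test
public void testUtf8BomCsvWithHeaderRead() throws Exception {
    URL resource = CsvReadStrategyTest.class.getResource("/csv/utf8_bom.csv");
    String path = Paths.get(resource.toURI()).toString();
    CsvReadStrategy csvReadStrategy = new CsvReadStrategy();
    LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
    csvReadStrategy.init(localConf);
    csvReadStrategy.getFileNamesByPath(path);
    // Enable header-line parsing, the configuration under test
    Map<String, String> options = new HashMap<>();
    options.put(FileBaseSourceOptions.CSV_USE_HEADER_LINE.key(), "true");
    csvReadStrategy.setPluginConfig(ConfigFactory.parseMap(options));
    csvReadStrategy.setCatalogTable(
            CatalogTableUtil.getCatalogTable(
                    "test",
                    new SeaTunnelRowType(
                            new String[] {"id", "name", "age", "gender"},
                            new SeaTunnelDataType[] {
                                BasicType.INT_TYPE,
                                BasicType.STRING_TYPE,
                                BasicType.INT_TYPE,
                                BasicType.STRING_TYPE
                            })));
    TestCollector testCollector = new TestCollector();
    csvReadStrategy.read(path, "", testCollector);
    // With the BOM stripped, the first header resolves to "id" and the first
    // column of every row is populated instead of null
    Assertions.assertFalse(testCollector.getRows().isEmpty());
}
```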
Check list
New License Guide
incompatible-changes.md to describe the incompatibility caused by this PR.