Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class JsonFileFormatProperties extends FileFormatProperties {
public static final String PROP_READ_JSON_BY_LINE = "read_json_by_line";
public static final String PROP_NUM_AS_STRING = "num_as_string";
public static final String PROP_FUZZY_PARSE = "fuzzy_parse";
public static final String PROP_FILL_MISSING_COLUMNS = "fill_missing_columns";

// from ExternalFileTableValuedFunction:
private String jsonRoot = "";
Expand All @@ -44,6 +45,7 @@ public class JsonFileFormatProperties extends FileFormatProperties {
private boolean readJsonByLine;
private boolean numAsString = false;
private boolean fuzzyParse = false;
private boolean fillMissingColumns = false;
private String lineDelimiter = CsvFileFormatProperties.DEFAULT_LINE_DELIMITER;


Expand Down Expand Up @@ -77,6 +79,9 @@ public void analyzeFileFormatProperties(Map<String, String> formatProperties, bo
fuzzyParse = Boolean.valueOf(
getOrDefault(formatProperties, PROP_FUZZY_PARSE,
"", isRemoveOriginProperty)).booleanValue();
fillMissingColumns = Boolean.valueOf(
getOrDefault(formatProperties, PROP_FILL_MISSING_COLUMNS,
"false", isRemoveOriginProperty)).booleanValue();
lineDelimiter = getOrDefault(formatProperties, CsvFileFormatProperties.PROP_LINE_DELIMITER,
CsvFileFormatProperties.DEFAULT_LINE_DELIMITER, isRemoveOriginProperty);
if (Strings.isNullOrEmpty(lineDelimiter)) {
Expand Down Expand Up @@ -136,6 +141,10 @@ public boolean isFuzzyParse() {
return fuzzyParse;
}

/**
 * Returns the {@code fill_missing_columns} flag held by this instance.
 *
 * <p>The field starts out as {@code false} and is overwritten during property analysis with the
 * boolean parsed from {@link #PROP_FILL_MISSING_COLUMNS} (default {@code "false"}).
 *
 * @return {@code true} if {@code fill_missing_columns} was set to true
 */
public boolean isFillMissingColumns() {
return fillMissingColumns;
}

/**
 * Returns the line delimiter held by this instance.
 *
 * <p>The field is initialised to {@link CsvFileFormatProperties#DEFAULT_LINE_DELIMITER} and is
 * re-read from {@link CsvFileFormatProperties#PROP_LINE_DELIMITER} during property analysis.
 *
 * @return the current line delimiter
 */
public String getLineDelimiter() {
return lineDelimiter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,8 @@ protected void setOptional(CreateRoutineLoadInfo info) throws UserException {
String.valueOf(jsonFileFormatProperties.isNumAsString()));
jobProperties.put(JsonFileFormatProperties.PROP_FUZZY_PARSE,
String.valueOf(jsonFileFormatProperties.isFuzzyParse()));
jobProperties.put(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS,
String.valueOf(jsonFileFormatProperties.isFillMissingColumns()));
} else {
throw new UserException("Invalid format type.");
}
Expand Down Expand Up @@ -710,6 +712,10 @@ public boolean isFuzzyParse() {
return Boolean.parseBoolean(jobProperties.get(JsonFileFormatProperties.PROP_FUZZY_PARSE));
}

/**
 * Reads the {@code fill_missing_columns} flag from the persisted job properties.
 *
 * @return {@code true} only when the stored value equals "true" ignoring case; a missing key
 *         yields {@code false} because {@link Boolean#parseBoolean(String)} maps null to false
 */
public boolean isFillMissingColumns() {
    String storedValue = jobProperties.get(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS);
    return Boolean.parseBoolean(storedValue);
}

@Override
public int getSendBatchParallelism() {
return sendBatchParallelism;
Expand Down Expand Up @@ -1809,6 +1815,7 @@ public String getShowCreateInfo() {
appendProperties(sb, JsonFileFormatProperties.PROP_NUM_AS_STRING, isNumAsString(), false);
appendProperties(sb, JsonFileFormatProperties.PROP_FUZZY_PARSE, isFuzzyParse(), false);
appendProperties(sb, JsonFileFormatProperties.PROP_JSON_ROOT, getJsonRoot(), false);
appendProperties(sb, JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS, isFillMissingColumns(), false);
appendProperties(sb, LoadCommand.STRICT_MODE, isStrictMode(), false);
appendProperties(sb, LoadCommand.TIMEZONE, getTimezone(), false);
appendProperties(sb, LoadCommand.EXEC_MEM_LIMIT, getMemLimit(), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,8 @@ public NereidsDataDescription(String tableName, NereidsLoadTaskInfo taskInfo) {
putAnalysisMapIfNonNull(JsonFileFormatProperties.PROP_READ_JSON_BY_LINE,
String.valueOf(taskInfo.isReadJsonByLine()));
putAnalysisMapIfNonNull(JsonFileFormatProperties.PROP_NUM_AS_STRING, String.valueOf(taskInfo.isNumAsString()));
putAnalysisMapIfNonNull(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS,
String.valueOf(taskInfo.isFillMissingColumns()));
this.uniquekeyUpdateMode = taskInfo.getUniqueKeyUpdateMode();
columnsNameToLowerCase(fileFieldNames);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.property.fileformat.JsonFileFormatProperties;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.nereids.analyzer.UnboundFunction;
import org.apache.doris.nereids.analyzer.UnboundSlot;
Expand Down Expand Up @@ -116,7 +117,7 @@ private void handleSequenceColumn(List<NereidsImportColumnDesc> columnDescList,
.filter(c -> c.getColumnName().equalsIgnoreCase(finalSequenceCol)).findAny();
// if `columnDescs.descs` is empty, that means it's not a partial update load, and user not specify
// column name.
if (foundCol.isPresent() || shouldAddSequenceColumn(columnDescList)) {
if (foundCol.isPresent() || shouldAddSequenceColumn(columnDescList, fileGroup)) {
columnDescList.add(new NereidsImportColumnDesc(Column.SEQUENCE_COL,
new UnboundSlot(sequenceCol)));
} else if (!fileGroupInfo.isFixedPartialUpdate()) {
Expand Down Expand Up @@ -195,9 +196,16 @@ private void fillContextExprMap(List<NereidsImportColumnDesc> columnDescList, Ne
// If user does not specify the file field names, generate it by using base schema of table.
// So that the following process can be unified
boolean specifyFileFieldNames = copiedColumnExprs.stream().anyMatch(p -> p.isColumn());
if (!specifyFileFieldNames) {
if (!specifyFileFieldNames || isFillMissingColumns(fileGroup)) {
Comment thread
billryan marked this conversation as resolved.
Set<String> existingColumns = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
for (NereidsImportColumnDesc desc : copiedColumnExprs) {
existingColumns.add(desc.getColumnName());
}
List<Column> columns = tbl.getBaseSchema(false);
for (Column column : columns) {
if (existingColumns.contains(column.getName())) {
continue;
}
if (constantMappingColumns.contains(column.getName())) {
// Skip this column because user has already specified a constant mapping expression for it
// in the COLUMNS parameter (e.g., "column_name = 'constant_value'")
Expand Down Expand Up @@ -420,15 +428,29 @@ private void fillContextExprMap(List<NereidsImportColumnDesc> columnDescList, Ne
}

/**
* if not set sequence column and column size is null or only have deleted sign ,return true
* Returns true when the sequence column should be auto-added, i.e. when
* fill_missing_columns is enabled (the schema will be auto-filled), or when the
* column list is empty or contains only the delete sign column.
*/
private boolean shouldAddSequenceColumn(List<NereidsImportColumnDesc> columnDescList) {
private boolean shouldAddSequenceColumn(List<NereidsImportColumnDesc> columnDescList,
        NereidsBrokerFileGroup fileGroup) {
    // Either condition alone is sufficient: fill_missing_columns is enabled for this
    // file group, or no column descriptions were supplied at all.
    if (isFillMissingColumns(fileGroup) || columnDescList.isEmpty()) {
        return true;
    }
    // Anything other than exactly one description can no longer qualify.
    if (columnDescList.size() != 1) {
        return false;
    }
    // A lone description qualifies only when it names the delete sign column.
    String soleColumnName = columnDescList.get(0).getColumnName();
    return soleColumnName.equalsIgnoreCase(Column.DELETE_SIGN);
}

/**
* Returns true if the file format is JSON and fill_missing_columns is enabled. Only meaningful for JSON.
*/
private boolean isFillMissingColumns(NereidsBrokerFileGroup fileGroup) {
    // Any non-JSON format properties object means the flag does not apply.
    if (!(fileGroup.getFileFormatProperties() instanceof JsonFileFormatProperties)) {
        return false;
    }
    JsonFileFormatProperties jsonProperties =
            (JsonFileFormatProperties) fileGroup.getFileFormatProperties();
    return jsonProperties.isFillMissingColumns();
}

private TFileFormatType formatType(String fileFormat) throws UserException {
if (fileFormat == null) {
// get file format by the file path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ default void setTimeout(int timeout) {

boolean isNumAsString();

/**
 * Reports whether the {@code fill_missing_columns} option is enabled for this load task.
 *
 * <p>This default implementation always returns {@code false}; implementations that carry
 * the option override it.
 *
 * @return {@code false} unless overridden
 */
default boolean isFillMissingColumns() {
return false;
}

boolean isReadJsonByLine();

String getPath();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties;
import org.apache.doris.datasource.property.fileformat.JsonFileFormatProperties;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
Expand Down Expand Up @@ -222,6 +223,11 @@ public boolean isNumAsString() {
return Boolean.parseBoolean(jobProperties.get(PROPS_NUM_AS_STRING));
}

/**
 * Looks up {@code fill_missing_columns} in the job properties of this routine load task.
 *
 * @return {@code true} only when the stored value equals "true" ignoring case; an absent
 *         key yields {@code false}
 */
@Override
public boolean isFillMissingColumns() {
    String configuredValue = jobProperties.get(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS);
    return Boolean.parseBoolean(configuredValue);
}

@Override
public boolean isReadJsonByLine() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class AlterRoutineLoadCommand extends AlterCommand {
.add(CreateRoutineLoadInfo.STRICT_MODE)
.add(CreateRoutineLoadInfo.TIMEZONE)
.add(CreateRoutineLoadInfo.WORKLOAD_GROUP)
.add(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS)
.add(JsonFileFormatProperties.PROP_JSON_PATHS)
.add(JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY)
.add(JsonFileFormatProperties.PROP_NUM_AS_STRING)
Expand Down Expand Up @@ -313,6 +314,15 @@ private void checkJobProperties() throws UserException {
analyzedJobProperties.put(CsvFileFormatProperties.PROP_EMPTY_FIELD_AS_NULL,
String.valueOf(emptyFieldAsNull));
}

if (jobProperties.containsKey(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS)) {
String val = jobProperties.get(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS);
if (!"true".equalsIgnoreCase(val) && !"false".equalsIgnoreCase(val)) {
throw new AnalysisException(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS
+ " must be 'true' or 'false', but found: " + val);
}
analyzedJobProperties.put(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS, val);
}
}

private void checkDataSourceProperties() throws UserException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public class CreateRoutineLoadInfo {
.add(JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY)
.add(JsonFileFormatProperties.PROP_NUM_AS_STRING)
.add(JsonFileFormatProperties.PROP_FUZZY_PARSE)
.add(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS)
.add(JsonFileFormatProperties.PROP_JSON_ROOT)
.add(CsvFileFormatProperties.PROP_ENCLOSE)
.add(CsvFileFormatProperties.PROP_ESCAPE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,29 @@ public void testAnalyzeFileFormatPropertiesEmpty() throws AnalysisException {
Assert.assertEquals(true, jsonFileFormatProperties.isReadJsonByLine());
Assert.assertEquals(false, jsonFileFormatProperties.isNumAsString());
Assert.assertEquals(false, jsonFileFormatProperties.isFuzzyParse());
Assert.assertEquals(false, jsonFileFormatProperties.isFillMissingColumns());
Assert.assertEquals(CsvFileFormatProperties.DEFAULT_LINE_DELIMITER,
jsonFileFormatProperties.getLineDelimiter());
}

@Test
public void testAnalyzeFileFormatPropertiesFillMissingColumnsTrue() throws AnalysisException {
    // An explicit "true" must switch the flag on.
    Map<String, String> formatProps = new HashMap<>();
    formatProps.put(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS, "true");
    jsonFileFormatProperties.analyzeFileFormatProperties(formatProps, true);

    Assert.assertTrue(jsonFileFormatProperties.isFillMissingColumns());
}

@Test
public void testAnalyzeFileFormatPropertiesFillMissingColumnsFalse() throws AnalysisException {
    // An explicit "false" must leave the flag off.
    Map<String, String> formatProps = new HashMap<>();
    formatProps.put(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS, "false");
    jsonFileFormatProperties.analyzeFileFormatProperties(formatProps, true);

    Assert.assertFalse(jsonFileFormatProperties.isFillMissingColumns());
}

@Test
public void testAnalyzeFileFormatPropertiesValidJsonRoot() throws AnalysisException {
Map<String, String> properties = new HashMap<>();
Expand Down Expand Up @@ -159,6 +178,7 @@ public void testAnalyzeFileFormatPropertiesAllProperties() throws AnalysisExcept
properties.put(JsonFileFormatProperties.PROP_READ_JSON_BY_LINE, "true");
properties.put(JsonFileFormatProperties.PROP_NUM_AS_STRING, "true");
properties.put(JsonFileFormatProperties.PROP_FUZZY_PARSE, "true");
properties.put(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS, "true");

jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);

Expand All @@ -168,6 +188,7 @@ public void testAnalyzeFileFormatPropertiesAllProperties() throws AnalysisExcept
Assert.assertEquals(true, jsonFileFormatProperties.isReadJsonByLine());
Assert.assertEquals(true, jsonFileFormatProperties.isNumAsString());
Assert.assertEquals(true, jsonFileFormatProperties.isFuzzyParse());
Assert.assertEquals(true, jsonFileFormatProperties.isFillMissingColumns());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,60 @@ public void testGetShowCreateInfo() throws UserException {
Assert.assertEquals(expect, showCreateInfo);
}

/**
 * Verifies that SHOW CREATE output of a routine load job includes
 * {@code "fill_missing_columns" = "true"} when the job property is set, and that it is
 * emitted between {@code fuzzy_parse} and {@code strict_mode}.
 */
@Test
public void testGetShowCreateInfoWithFillMissingColumns() throws UserException {
    KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(111L, "test_load", 1,
            11, "localhost:9092", "test_topic", UserIdentity.ADMIN);
    Deencapsulation.setField(routineLoadJob, "maxErrorNum", 10);
    Deencapsulation.setField(routineLoadJob, "maxBatchRows", 10);

    // Set fill_missing_columns to true
    Map<String, String> jobProperties = Maps.newHashMap();
    jobProperties.put(
            org.apache.doris.datasource.property.fileformat.JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS,
            "true");
    jobProperties.put("format", "json");
    jobProperties.put("strip_outer_array", "false");
    jobProperties.put("num_as_string", "false");
    jobProperties.put("fuzzy_parse", "false");
    jobProperties.put("strict_mode", "false");
    jobProperties.put("timezone", "Asia/Shanghai");
    jobProperties.put("desired_concurrent_number", "0");
    jobProperties.put("max_error_number", "10");
    jobProperties.put("max_filter_ratio", "1.0");
    jobProperties.put("max_batch_interval", "60");
    jobProperties.put("max_batch_rows", "10");
    jobProperties.put("max_batch_size", "1073741824");
    jobProperties.put("exec_mem_limit", "2147483648");
    Deencapsulation.setField(routineLoadJob, "jobProperties", jobProperties);

    String showCreateInfo = routineLoadJob.getShowCreateInfo();
    String expect = "CREATE ROUTINE LOAD test_load ON 11\n"
            + "WITH APPEND\n"
            + "PROPERTIES\n"
            + "(\n"
            + "\"desired_concurrent_number\" = \"0\",\n"
            + "\"max_error_number\" = \"10\",\n"
            + "\"max_filter_ratio\" = \"1.0\",\n"
            + "\"max_batch_interval\" = \"60\",\n"
            + "\"max_batch_rows\" = \"10\",\n"
            + "\"max_batch_size\" = \"1073741824\",\n"
            + "\"format\" = \"json\",\n"
            + "\"strip_outer_array\" = \"false\",\n"
            + "\"num_as_string\" = \"false\",\n"
            + "\"fuzzy_parse\" = \"false\",\n"
            + "\"fill_missing_columns\" = \"true\",\n"
            + "\"strict_mode\" = \"false\",\n"
            + "\"timezone\" = \"Asia/Shanghai\",\n"
            + "\"exec_mem_limit\" = \"2147483648\"\n"
            + ")\n"
            + "FROM KAFKA\n"
            + "(\n"
            + "\"kafka_broker_list\" = \"localhost:9092\",\n"
            + "\"kafka_topic\" = \"test_topic\"\n"
            + ");";
    // No console output here: assertEquals already reports both strings on mismatch.
    Assert.assertEquals(expect, showCreateInfo);
}

@Test
public void testParseUniqueKeyUpdateMode() {
// Test valid mode strings
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.load;

import org.apache.doris.datasource.property.fileformat.JsonFileFormatProperties;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

/**
 * Unit tests for {@link NereidsRoutineLoadTaskInfo#isFillMissingColumns()}, covering an
 * explicit "true", an explicit "false", and an absent property.
 */
public class NereidsRoutineLoadTaskInfoTest {

    /** Builds a task info whose only varying input is the given job property map. */
    private NereidsRoutineLoadTaskInfo buildTaskInfo(Map<String, String> jobProperties) {
        return new NereidsRoutineLoadTaskInfo(0L, jobProperties, 10L, null,
                LoadTask.MergeType.APPEND, null, null, 0.0,
                new NereidsLoadTaskInfo.NereidsImportColumnDescs(), null, null, null,
                null, (byte) 0, (byte) 0, 1, false, false,
                TPartialUpdateNewRowPolicy.APPEND, false);
    }

    /** Builds a task info whose job properties contain only fill_missing_columns = flagValue. */
    private NereidsRoutineLoadTaskInfo buildTaskInfoWithFlag(String flagValue) {
        Map<String, String> props = new HashMap<>();
        props.put(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS, flagValue);
        return buildTaskInfo(props);
    }

    @Test
    public void testIsFillMissingColumnsTrue() {
        NereidsRoutineLoadTaskInfo info = buildTaskInfoWithFlag("true");
        Assertions.assertTrue(info.isFillMissingColumns());
    }

    @Test
    public void testIsFillMissingColumnsFalse() {
        NereidsRoutineLoadTaskInfo info = buildTaskInfoWithFlag("false");
        Assertions.assertFalse(info.isFillMissingColumns());
    }

    @Test
    public void testIsFillMissingColumnsDefault() {
        // With no property present the flag must read as false.
        Map<String, String> emptyProps = new HashMap<>();
        NereidsRoutineLoadTaskInfo info = buildTaskInfo(emptyProps);
        Assertions.assertFalse(info.isFillMissingColumns());
    }
}
Loading
Loading