refactor hive text input format

This commit is contained in:
taiyang-li 2021-12-02 16:14:25 +08:00
parent 701ad45aaa
commit 9ec8272186
15 changed files with 138 additions and 84 deletions

View File

@ -585,6 +585,12 @@ class IColumn;
M(Bool, input_format_orc_import_nested, false, "Allow to insert array of structs into Nested table in ORC input format.", 0) \
M(Bool, input_format_parquet_import_nested, false, "Allow to insert array of structs into Nested table in Parquet input format.", 0) \
M(Bool, input_format_allow_seeks, true, "Allow seeks while reading in ORC/Parquet/Arrow input formats", 0) \
M(Bool, input_format_orc_allow_missing_columns, false, "Allow missing columns while reading ORC input formats", 0) \
M(Bool, input_format_parquet_allow_missing_columns, false, "Allow missing columns while reading Parquet input formats", 0) \
M(Bool, input_format_arrow_allow_missing_columns, false, "Allow missing columns while reading Arrow input formats", 0) \
M(Char, input_format_hive_text_fields_delimiter, '\x01', "Delimiter between fields in Hive Text File", 0) \
M(Char, input_format_hive_text_collection_items_delimiter, '\x02', "Delimiter between collection(array or map) items in Hive Text File", 0) \
M(Char, input_format_hive_text_map_keys_delimiter, '\x03', "Delimiter between a pair of map key/values in Hive Text File", 0) \
\
M(DateTimeInputFormat, date_time_input_format, FormatSettings::DateTimeInputFormat::Basic, "Method to read DateTime from text input formats. Possible values: 'basic' and 'best_effort'.", 0) \
M(DateTimeOutputFormat, date_time_output_format, FormatSettings::DateTimeOutputFormat::Simple, "Method to write DateTime to text output. Possible values: 'simple', 'iso', 'unix_timestamp'.", 0) \

View File

@ -61,6 +61,9 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.csv.input_format_enum_as_number = settings.input_format_csv_enum_as_number;
format_settings.csv.null_representation = settings.format_csv_null_representation;
format_settings.csv.input_format_arrays_as_nested_csv = settings.input_format_csv_arrays_as_nested_csv;
format_settings.hive_text.fields_delimiter = settings.input_format_hive_text_fields_delimiter;
format_settings.hive_text.collection_items_delimiter = settings.input_format_hive_text_collection_items_delimiter;
format_settings.hive_text.map_keys_delimiter = settings.input_format_hive_text_map_keys_delimiter;
format_settings.custom.escaping_rule = settings.format_custom_escaping_rule;
format_settings.custom.field_delimiter = settings.format_custom_field_delimiter;
format_settings.custom.result_after_delimiter = settings.format_custom_result_after_delimiter;
@ -83,6 +86,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.decimal_trailing_zeros = settings.output_format_decimal_trailing_zeros;
format_settings.parquet.row_group_size = settings.output_format_parquet_row_group_size;
format_settings.parquet.import_nested = settings.input_format_parquet_import_nested;
format_settings.parquet.allow_missing_columns = settings.input_format_parquet_allow_missing_columns;
format_settings.pretty.charset = settings.output_format_pretty_grid_charset.toString() == "ASCII" ? FormatSettings::Pretty::Charset::ASCII : FormatSettings::Pretty::Charset::UTF8;
format_settings.pretty.color = settings.output_format_pretty_color;
format_settings.pretty.max_column_pad_width = settings.output_format_pretty_max_column_pad_width;
@ -111,7 +115,9 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.write_statistics = settings.output_format_write_statistics;
format_settings.arrow.low_cardinality_as_dictionary = settings.output_format_arrow_low_cardinality_as_dictionary;
format_settings.arrow.import_nested = settings.input_format_arrow_import_nested;
format_settings.arrow.allow_missing_columns = settings.input_format_arrow_allow_missing_columns;
format_settings.orc.import_nested = settings.input_format_orc_import_nested;
format_settings.orc.allow_missing_columns = settings.input_format_orc_allow_missing_columns;
format_settings.defaults_for_omitted_fields = settings.input_format_defaults_for_omitted_fields;
format_settings.capn_proto.enum_comparing_mode = settings.format_capn_proto_enum_comparising_mode;
format_settings.seekable_read = settings.input_format_allow_seeks;

View File

@ -71,6 +71,7 @@ struct FormatSettings
UInt64 row_group_size = 1000000;
bool low_cardinality_as_dictionary = false;
bool import_nested = false;
bool allow_missing_columns = false;
} arrow;
struct
@ -93,10 +94,18 @@ struct FormatSettings
bool input_format_enum_as_number = false;
bool input_format_arrays_as_nested_csv = false;
bool read_bool_as_uint8 = false;
Names input_field_names;
String null_representation = "\\N";
} csv;
struct HiveText
{
char fields_delimiter = '\x01';
char collection_items_delimiter = '\x02';
char map_keys_delimiter = '\x03';
bool read_bool_as_uint8 = true;
Names input_field_names;
} hive_text;
struct Custom
{
std::string result_before_delimiter;
@ -122,6 +131,7 @@ struct FormatSettings
{
UInt64 row_group_size = 1000000;
bool import_nested = false;
bool allow_missing_columns = false;
} parquet;
struct Pretty
@ -200,6 +210,7 @@ struct FormatSettings
struct
{
bool import_nested = false;
bool allow_missing_columns = false;
} orc;
/// For capnProto format we should determine how to

View File

@ -64,10 +64,12 @@ Chunk ArrowBlockInputFormat::generate()
++record_batch_current;
auto missing_column_indexes = arrow_column_to_ch_column->arrowTableToCHChunk(res, *table_result);
for (size_t row_idx = 0; row_idx < res.getNumRows(); ++row_idx)
for (const auto & column_idx : missing_column_indexes)
block_missing_values.setBit(column_idx, row_idx);
/// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
/// Otherwise fill the missing columns with zero values of its type.
if (format_settings.defaults_for_omitted_fields)
for (size_t row_idx = 0; row_idx < res.getNumRows(); ++row_idx)
for (const auto & column_idx : missing_columns)
block_missing_values.setBit(column_idx, row_idx);
return res;
}
@ -113,7 +115,8 @@ void ArrowBlockInputFormat::prepareReader()
}
arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(
getPort().getHeader(), "Arrow", format_settings.arrow.import_nested, format_settings.defaults_for_omitted_fields);
getPort().getHeader(), "Arrow", format_settings.arrow.import_nested, format_settings.arrow.allow_missing_columns);
missing_columns = arrow_column_to_ch_column->getMissingColumns(*schema);
if (stream)
record_batch_total = -1;

View File

@ -41,6 +41,7 @@ private:
int record_batch_total = 0;
int record_batch_current = 0;
std::vector<size_t> missing_columns;
BlockMissingValues block_missing_values;
const FormatSettings format_settings;

View File

@ -5,7 +5,6 @@
#include "ArrowBufferedStreams.h"
#if USE_ARROW || USE_ORC || USE_PARQUET
#include <IO/RemoteReadBufferCache.h>
#include <Common/assert_cast.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromString.h>
@ -13,9 +12,6 @@
#include <arrow/buffer.h>
#include <arrow/io/memory.h>
#include <arrow/result.h>
#include <base/logger_useful.h>
#include <Common/Stopwatch.h>
#include <Poco/Logger.h>
#include <sys/stat.h>

View File

@ -272,7 +272,7 @@ static ColumnPtr readByteMapFromArrowColumn(std::shared_ptr<arrow::ChunkedArray>
for (size_t value_i = 0; value_i != static_cast<size_t>(chunk->length()); ++value_i)
bytemap_data.emplace_back(chunk->IsNull(value_i));
}
return nullmap_column;
return std::move(nullmap_column);
}
static ColumnPtr readOffsetsFromArrowListColumn(std::shared_ptr<arrow::ChunkedArray> & arrow_column)
@ -290,7 +290,7 @@ static ColumnPtr readOffsetsFromArrowListColumn(std::shared_ptr<arrow::ChunkedAr
for (int64_t i = 1; i < arrow_offsets.length(); ++i)
offsets_data.emplace_back(start + arrow_offsets.Value(i));
}
return offsets_column;
return std::move(offsets_column);
}
static ColumnPtr readColumnWithIndexesData(std::shared_ptr<arrow::ChunkedArray> & arrow_column)
@ -505,21 +505,30 @@ static Block arrowSchemaToCHHeader(const arrow::Schema & schema, const std::stri
}
ArrowColumnToCHColumn::ArrowColumnToCHColumn(
const arrow::Schema & schema, const std::string & format_name_, bool import_nested_, bool defaults_for_omitted_fields_)
const arrow::Schema & schema,
const std::string & format_name_,
bool import_nested_,
bool allow_missing_columns_)
: header(arrowSchemaToCHHeader(schema, format_name_))
, format_name(format_name_)
, import_nested(import_nested_)
, defaults_for_omitted_fields(defaults_for_omitted_fields_)
, allow_missing_columns(allow_missing_columns_)
{
}
ArrowColumnToCHColumn::ArrowColumnToCHColumn(
const Block & header_, const std::string & format_name_, bool import_nested_, bool defaults_for_omitted_fields_)
: header(header_), format_name(format_name_), import_nested(import_nested_), defaults_for_omitted_fields(defaults_for_omitted_fields_)
const Block & header_,
const std::string & format_name_,
bool import_nested_,
bool allow_missing_columns_)
: header(header_)
, format_name(format_name_)
, import_nested(import_nested_)
, allow_missing_columns(allow_missing_columns_)
{
}
std::vector<size_t> ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table)
void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table)
{
NameToColumnPtr name_to_column_ptr;
for (const auto& column_name : table->ColumnNames())
@ -528,23 +537,15 @@ std::vector<size_t> ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std:
name_to_column_ptr[column_name] = arrow_column;
}
return arrowColumnsToCHChunk(res, name_to_column_ptr);
arrowColumnsToCHChunk(res, name_to_column_ptr);
}
std::vector<size_t> ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr & name_to_column_ptr)
void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr & name_to_column_ptr)
{
Columns columns_list;
if (name_to_column_ptr.empty())
return {};
UInt64 num_rows = name_to_column_ptr.begin()->second->length();
columns_list.reserve(header.rows());
std::unordered_map<String, BlockPtr> nested_tables;
std::vector<size_t> missing_column_indexes;
missing_column_indexes.reserve(header.columns());
for (size_t column_i = 0, columns = header.columns(); column_i < columns; ++column_i)
{
const ColumnWithTypeAndName & header_column = header.getByPosition(column_i);
@ -569,20 +570,15 @@ std::vector<size_t> ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, Na
if (!read_from_nested)
{
missing_column_indexes.push_back(column_i);
if (defaults_for_omitted_fields)
{
ColumnWithTypeAndName column;
column.name = header_column.name;
column.type = header_column.type;
column.column = header_column.column->cloneResized(num_rows);
columns_list.push_back(std::move(column.column));
continue;
}
else
{
if (!allow_missing_columns)
throw Exception{ErrorCodes::THERE_IS_NO_COLUMN, "Column '{}' is not presented in input data.", header_column.name};
}
ColumnWithTypeAndName column;
column.name = header_column.name;
column.type = header_column.type;
column.column = header_column.column->cloneResized(num_rows);
columns_list.push_back(std::move(column.column));
continue;
}
}
@ -610,7 +606,35 @@ std::vector<size_t> ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, Na
}
res.setColumns(columns_list, num_rows);
return missing_column_indexes;
}
std::vector<size_t> ArrowColumnToCHColumn::getMissingColumns(const arrow::Schema & schema) const
{
std::vector<size_t> missing_columns;
auto block_from_arrow = arrowSchemaToCHHeader(schema, format_name);
auto flatten_block_from_arrow = Nested::flatten(block_from_arrow);
for (size_t i = 0, columns = header.columns(); i < columns; ++i)
{
const auto & column = header.getByPosition(i);
bool read_from_nested = false;
String nested_table_name = Nested::extractTableName(column.name);
if (!block_from_arrow.has(column.name))
{
if (import_nested && block_from_arrow.has(nested_table_name))
read_from_nested = flatten_block_from_arrow.has(column.name);
if (!read_from_nested)
{
if (!allow_missing_columns)
throw Exception{ErrorCodes::THERE_IS_NO_COLUMN, "Column '{}' is not presented in input data.", column.name};
missing_columns.push_back(i);
}
}
}
return missing_columns;
}
}
#endif

View File

@ -21,22 +21,34 @@ class ArrowColumnToCHColumn
public:
using NameToColumnPtr = std::unordered_map<std::string, std::shared_ptr<arrow::ChunkedArray>>;
ArrowColumnToCHColumn(const Block & header_, const std::string & format_name_, bool import_nested_, bool defaults_for_omitted_fields_);
ArrowColumnToCHColumn(
const Block & header_,
const std::string & format_name_,
bool import_nested_,
bool allow_missing_columns_);
/// Constructor that create header by arrow schema. It will be useful for inserting
/// data from file without knowing table structure.
ArrowColumnToCHColumn(const arrow::Schema & schema, const std::string & format_name, bool import_nested_, bool defaults_for_omitted_fields_);
ArrowColumnToCHColumn(
const arrow::Schema & schema,
const std::string & format_name,
bool import_nested_,
bool allow_missing_columns_);
/// Convert arrow::Table to chunk. Returns missing header columns not exists in arrow::Table.
std::vector<size_t> arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table);
void arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table);
std::vector<size_t> arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr & name_to_column_ptr);
void arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr & name_to_column_ptr);
/// Get missing columns that exists in header but not in arrow::Schema
std::vector<size_t> getMissingColumns(const arrow::Schema & schema) const;
private:
const Block header;
const std::string format_name;
bool import_nested;
bool defaults_for_omitted_fields;
/// If false, throw exception if some columns in header not exists in arrow table.
bool allow_missing_columns;
/// Map {column name : dictionary column}.
/// To avoid converting dictionary from Arrow Dictionary

View File

@ -14,32 +14,16 @@ HiveTextRowInputFormat::HiveTextRowInputFormat(
const Block & header_, ReadBuffer & in_, const Params & params_, const FormatSettings & format_settings_)
: CSVRowInputFormat(header_, buf, params_, true, false, format_settings_)
, buf(in_)
, input_field_names(format_settings_.hive_text.input_field_names)
{
}
void HiveTextRowInputFormat::readPrefix()
{
std::vector<bool> read_columns(data_types.size(), false);
/// For Hive Text file, read the first row to get exact number of columns.
auto values = readNames();
input_field_names = format_settings.csv.input_field_names;
input_field_names.resize(values.size());
for (const auto & column_name : input_field_names)
addInputColumn(column_name, read_columns);
for (size_t i = 0; i != read_columns.size(); ++i)
{
if (!read_columns[i])
column_mapping->not_presented_columns.push_back(i);
}
}
std::vector<String> HiveTextRowInputFormat::readNames()
{
PeekableReadBufferCheckpoint checkpoint{buf};
PeekableReadBufferCheckpoint checkpoint{buf, true};
auto values = readHeaderRow();
buf.rollbackToCheckpoint();
return values;
input_field_names.resize(values.size());
return input_field_names;
}
std::vector<String> HiveTextRowInputFormat::readTypes()
@ -53,7 +37,10 @@ void registerInputFormatHiveText(FormatFactory & factory)
"HiveText",
[](ReadBuffer & buf, const Block & sample, const RowInputFormatParams & params, const FormatSettings & settings)
{
return std::make_shared<HiveTextRowInputFormat>(sample, buf, params, settings);
FormatSettings settings_copy = settings;
settings_copy.csv.delimiter = settings_copy.hive_text.fields_delimiter;
settings_copy.csv.read_bool_as_uint8 = settings_copy.hive_text.read_bool_as_uint8;
return std::make_shared<HiveTextRowInputFormat>(sample, buf, params, settings_copy);
});
}
}

View File

@ -15,12 +15,15 @@ namespace DB
class HiveTextRowInputFormat : public CSVRowInputFormat
{
public:
HiveTextRowInputFormat(const Block & header_, ReadBuffer & in_, const Params & params_, const FormatSettings & format_settings_);
HiveTextRowInputFormat(
const Block & header_,
ReadBuffer & in_,
const Params & params_,
const FormatSettings & format_settings_);
String getName() const override { return "HiveTextRowInputFormat"; }
protected:
void readPrefix() override;
std::vector<String> readNames() override;
std::vector<String> readTypes() override;

View File

@ -69,10 +69,12 @@ Chunk ORCBlockInputFormat::generate()
name_to_column_ptr[column_name] = arrow_column;
}
auto missing_column_indexes = arrow_column_to_ch_column->arrowColumnsToCHChunk(res, name_to_column_ptr);
for (size_t row_idx = 0; row_idx < res.getNumRows(); ++row_idx)
for (const auto & column_idx : missing_column_indexes)
block_missing_values.setBit(column_idx, row_idx);
/// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
/// Otherwise fill the missing columns with zero values of its type.
if (format_settings.defaults_for_omitted_fields)
for (size_t row_idx = 0; row_idx < res.getNumRows(); ++row_idx)
for (const auto & column_idx : missing_columns)
block_missing_values.setBit(column_idx, row_idx);
batch_reader.reset();
return res;
@ -126,7 +128,8 @@ void ORCBlockInputFormat::prepareReader()
THROW_ARROW_NOT_OK(file_reader->ReadSchema(&schema));
arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(
getPort().getHeader(), "ORC", format_settings.orc.import_nested, format_settings.defaults_for_omitted_fields);
getPort().getHeader(), "ORC", format_settings.orc.import_nested, format_settings.orc.allow_missing_columns);
missing_columns = arrow_column_to_ch_column->getMissingColumns(*schema);
std::unordered_set<String> nested_table_names;
if (format_settings.orc.import_nested)

View File

@ -50,6 +50,7 @@ private:
// indices of columns to read from ORC file
std::vector<int> include_indices;
std::vector<size_t> missing_columns;
BlockMissingValues block_missing_values;
const FormatSettings format_settings;

View File

@ -56,10 +56,12 @@ Chunk ParquetBlockInputFormat::generate()
++row_group_current;
auto missing_column_indexes = arrow_column_to_ch_column->arrowTableToCHChunk(res, table);
for (size_t row_idx = 0; row_idx < res.getNumRows(); ++row_idx)
for (const auto & column_idx : missing_column_indexes)
block_missing_values.setBit(column_idx, row_idx);
/// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
/// Otherwise fill the missing columns with zero values of its type.
if (format_settings.defaults_for_omitted_fields)
for (size_t row_idx = 0; row_idx < res.getNumRows(); ++row_idx)
for (const auto & column_idx : missing_columns)
block_missing_values.setBit(column_idx, row_idx);
return res;
}
@ -111,7 +113,8 @@ void ParquetBlockInputFormat::prepareReader()
THROW_ARROW_NOT_OK(file_reader->GetSchema(&schema));
arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(
getPort().getHeader(), "Parquet", format_settings.parquet.import_nested, format_settings.defaults_for_omitted_fields);
getPort().getHeader(), "Parquet", format_settings.parquet.import_nested, format_settings.parquet.allow_missing_columns);
missing_columns = arrow_column_to_ch_column->getMissingColumns(*schema);
std::unordered_set<String> nested_table_names;
if (format_settings.parquet.import_nested)

View File

@ -36,6 +36,7 @@ private:
std::vector<int> column_indices;
std::unique_ptr<ArrowColumnToCHColumn> arrow_column_to_ch_column;
int row_group_current = 0;
std::vector<size_t> missing_columns;
BlockMissingValues block_missing_values;
const FormatSettings format_settings;
};

View File

@ -131,10 +131,7 @@ public:
}
/// Initialize format settings
format_settings.csv.delimiter = '\x01';
format_settings.csv.input_field_names = text_input_field_names;
format_settings.csv.read_bool_as_uint8 = true;
format_settings.defaults_for_omitted_fields = true;
format_settings.hive_text.input_field_names = text_input_field_names;
}
String getName() const override { return "Hive"; }
@ -373,7 +370,7 @@ ASTPtr StorageHive::extractKeyExpressionList(const ASTPtr & node)
/// Primary key consists of one column.
auto res = std::make_shared<ASTExpressionList>();
res->children.push_back(node);
return res;
return std::move(res);
}
}