Address PR comments

This commit is contained in:
Antonio Andelic 2022-03-23 15:02:19 +00:00
parent 08cb71a067
commit 052057f2ef
10 changed files with 148 additions and 210 deletions

View File

@ -240,15 +240,6 @@ inline bool equalsCaseInsensitive(char a, char b)
return a == b || (isAlphaASCII(a) && alternateCaseIfAlphaASCII(a) == b);
}
inline bool equalsCaseInsensitive(const std::string_view a, const std::string_view b)
{
if (a.length() != b.length())
return false;
return std::equal(
a.begin(), a.end(), b.begin(), [](const auto first, const auto second) { return equalsCaseInsensitive(first, second); });
}
template <typename F>
std::string trim(const std::string & str, F && predicate)

View File

@ -13,6 +13,7 @@
#include <iterator>
#include <base/sort.h>
#include <boost/algorithm/string.hpp>
namespace DB
@ -273,7 +274,7 @@ const ColumnWithTypeAndName * Block::findByName(const std::string & name, bool c
{
if (case_insensitive)
{
auto found = std::find_if(data.begin(), data.end(), [&](const auto & column) { return equalsCaseInsensitive(column.name, name); });
auto found = std::find_if(data.begin(), data.end(), [&](const auto & column) { return boost::iequals(column.name, name); });
if (found == data.end())
{
return nullptr;
@ -304,7 +305,7 @@ const ColumnWithTypeAndName & Block::getByName(const std::string & name, bool ca
bool Block::has(const std::string & name, bool case_insensitive) const
{
if (case_insensitive)
return std::find_if(data.begin(), data.end(), [&](const auto & column) { return equalsCaseInsensitive(column.name, name); })
return std::find_if(data.begin(), data.end(), [&](const auto & column) { return boost::iequals(column.name, name); })
!= data.end();
return index_by_name.end() != index_by_name.find(name);

View File

@ -15,6 +15,8 @@
#include <Parsers/IAST.h>
#include <boost/algorithm/string/case_conv.hpp>
namespace DB
{
@ -227,12 +229,15 @@ void validateArraySizes(const Block & block)
}
std::unordered_set<String> getAllTableNames(const Block & block)
std::unordered_set<String> getAllTableNames(const Block & block, bool to_lower_case)
{
std::unordered_set<String> nested_table_names;
for (auto & name : block.getNames())
for (const auto & name : block.getNames())
{
auto nested_table_name = Nested::extractTableName(name);
if (to_lower_case)
boost::to_lower(nested_table_name);
if (!nested_table_name.empty())
nested_table_names.insert(nested_table_name);
}

View File

@ -32,7 +32,7 @@ namespace Nested
void validateArraySizes(const Block & block);
/// Get all nested tables names from a block.
std::unordered_set<String> getAllTableNames(const Block & block);
std::unordered_set<String> getAllTableNames(const Block & block, bool to_lower_case = false);
}
}

View File

@ -1,44 +1,40 @@
#include "ArrowColumnToCHColumn.h"
#include <base/logger_useful.h>
#include <boost/algorithm/string/case_conv.hpp>
#include "Common/StringUtils/StringUtils.h"
#if USE_ARROW || USE_ORC || USE_PARQUET
# include <algorithm>
# include <Columns/ColumnArray.h>
# include <Columns/ColumnLowCardinality.h>
# include <Columns/ColumnMap.h>
# include <Columns/ColumnNullable.h>
# include <Columns/ColumnString.h>
# include <Columns/ColumnTuple.h>
# include <Columns/ColumnUnique.h>
# include <Columns/ColumnsNumber.h>
# include <DataTypes/DataTypeArray.h>
# include <DataTypes/DataTypeDate.h>
# include <DataTypes/DataTypeDate32.h>
# include <DataTypes/DataTypeDateTime64.h>
# include <DataTypes/DataTypeFactory.h>
# include <DataTypes/DataTypeLowCardinality.h>
# include <DataTypes/DataTypeMap.h>
# include <DataTypes/DataTypeNullable.h>
# include <DataTypes/DataTypeString.h>
# include <DataTypes/DataTypeTuple.h>
# include <DataTypes/DataTypesDecimal.h>
# include <DataTypes/DataTypesNumber.h>
# include <DataTypes/NestedUtils.h>
# include <Interpreters/castColumn.h>
# include <Processors/Chunk.h>
# include <arrow/array.h>
# include <arrow/builder.h>
# include <base/types.h>
# include <Common/DateLUTImpl.h>
# include <Common/quoteString.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <Common/DateLUTImpl.h>
#include <base/types.h>
#include <Processors/Chunk.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnUnique.h>
#include <Columns/ColumnMap.h>
#include <Columns/ColumnsNumber.h>
#include <Interpreters/castColumn.h>
#include <Common/quoteString.h>
#include <algorithm>
#include <arrow/builder.h>
#include <arrow/array.h>
#include <boost/algorithm/string/case_conv.hpp>
/// UINT16 and UINT32 are processed separately, see comments in readColumnFromArrowColumn.
# define FOR_ARROW_NUMERIC_TYPES(M) \
#define FOR_ARROW_NUMERIC_TYPES(M) \
M(arrow::Type::UINT8, DB::UInt8) \
M(arrow::Type::INT8, DB::Int8) \
M(arrow::Type::INT16, DB::Int16) \
@ -49,7 +45,7 @@
M(arrow::Type::FLOAT, DB::Float32) \
M(arrow::Type::DOUBLE, DB::Float64)
# define FOR_ARROW_INDEXES_TYPES(M) \
#define FOR_ARROW_INDEXES_TYPES(M) \
M(arrow::Type::UINT8, DB::UInt8) \
M(arrow::Type::INT8, DB::UInt8) \
M(arrow::Type::UINT16, DB::UInt16) \
@ -73,7 +69,6 @@ namespace ErrorCodes
extern const int INCORRECT_NUMBER_OF_COLUMNS;
}
/// Inserts numeric data right into internal column data to reduce an overhead
template <typename NumericType, typename VectorType = ColumnVector<NumericType>>
static ColumnWithTypeAndName readColumnWithNumericData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
@ -181,12 +176,8 @@ static ColumnWithTypeAndName readColumnWithDate32Data(std::shared_ptr<arrow::Chu
{
Int32 days_num = static_cast<Int32>(chunk.Value(value_i));
if (days_num > DATE_LUT_MAX_EXTEND_DAY_NUM)
throw Exception(
ErrorCodes::VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE,
"Input value {} of a column \"{}\" is greater than max allowed Date value, which is {}",
days_num,
column_name,
DATE_LUT_MAX_DAY_NUM);
throw Exception{ErrorCodes::VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE,
"Input value {} of a column \"{}\" is greater than max allowed Date value, which is {}", days_num, column_name, DATE_LUT_MAX_DAY_NUM};
column_data.emplace_back(days_num);
}
@ -235,8 +226,7 @@ static ColumnWithTypeAndName readColumnWithTimestampData(std::shared_ptr<arrow::
}
template <typename DecimalType, typename DecimalArray>
static ColumnWithTypeAndName
readColumnWithDecimalDataImpl(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name, DataTypePtr internal_type)
static ColumnWithTypeAndName readColumnWithDecimalDataImpl(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name, DataTypePtr internal_type)
{
auto internal_column = internal_type->createColumn();
auto & column = assert_cast<ColumnDecimal<DecimalType> &>(*internal_column);
@ -248,8 +238,7 @@ readColumnWithDecimalDataImpl(std::shared_ptr<arrow::ChunkedArray> & arrow_colum
auto & chunk = dynamic_cast<DecimalArray &>(*(arrow_column->chunk(chunk_i)));
for (size_t value_i = 0, length = static_cast<size_t>(chunk.length()); value_i < length; ++value_i)
{
column_data.emplace_back(
chunk.IsNull(value_i) ? DecimalType(0) : *reinterpret_cast<const DecimalType *>(chunk.Value(value_i))); // TODO: copy column
column_data.emplace_back(chunk.IsNull(value_i) ? DecimalType(0) : *reinterpret_cast<const DecimalType *>(chunk.Value(value_i))); // TODO: copy column
}
}
return {std::move(internal_column), internal_type, column_name};
@ -310,9 +299,10 @@ static ColumnPtr readColumnWithIndexesData(std::shared_ptr<arrow::ChunkedArray>
switch (arrow_column->type()->id())
{
# define DISPATCH(ARROW_NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
case ARROW_NUMERIC_TYPE: { \
return readColumnWithNumericData<CPP_NUMERIC_TYPE>(arrow_column, "").column; \
}
case ARROW_NUMERIC_TYPE: \
{ \
return readColumnWithNumericData<CPP_NUMERIC_TYPE>(arrow_column, "").column; \
}
FOR_ARROW_INDEXES_TYPES(DISPATCH)
# undef DISPATCH
default:
@ -366,13 +356,15 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
// ClickHouse writes Date as arrow UINT16 and DateTime as arrow UINT32,
// so, read UINT16 as Date and UINT32 as DateTime to perform correct conversion
// between Date and DateTime further.
case arrow::Type::UINT16: {
case arrow::Type::UINT16:
{
auto column = readColumnWithNumericData<UInt16>(arrow_column, column_name);
if (read_ints_as_dates)
column.type = std::make_shared<DataTypeDate>();
return column;
}
case arrow::Type::UINT32: {
case arrow::Type::UINT32:
{
auto column = readColumnWithNumericData<UInt32>(arrow_column, column_name);
if (read_ints_as_dates)
column.type = std::make_shared<DataTypeDateTime>();
@ -384,10 +376,10 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
return readColumnWithDecimalData<arrow::Decimal128Array>(arrow_column, column_name);
case arrow::Type::DECIMAL256:
return readColumnWithDecimalData<arrow::Decimal256Array>(arrow_column, column_name);
case arrow::Type::MAP: {
case arrow::Type::MAP:
{
auto arrow_nested_column = getNestedArrowColumn(arrow_column);
auto nested_column
= readColumnFromArrowColumn(arrow_nested_column, column_name, format_name, false, dictionary_values, read_ints_as_dates);
auto nested_column = readColumnFromArrowColumn(arrow_nested_column, column_name, format_name, false, dictionary_values, read_ints_as_dates);
auto offsets_column = readOffsetsFromArrowListColumn(arrow_column);
const auto * tuple_column = assert_cast<const ColumnTuple *>(nested_column.column.get());
@ -396,16 +388,17 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
auto map_type = std::make_shared<DataTypeMap>(tuple_type->getElements()[0], tuple_type->getElements()[1]);
return {std::move(map_column), std::move(map_type), column_name};
}
case arrow::Type::LIST: {
case arrow::Type::LIST:
{
auto arrow_nested_column = getNestedArrowColumn(arrow_column);
auto nested_column
= readColumnFromArrowColumn(arrow_nested_column, column_name, format_name, false, dictionary_values, read_ints_as_dates);
auto nested_column = readColumnFromArrowColumn(arrow_nested_column, column_name, format_name, false, dictionary_values, read_ints_as_dates);
auto offsets_column = readOffsetsFromArrowListColumn(arrow_column);
auto array_column = ColumnArray::create(nested_column.column, offsets_column);
auto array_type = std::make_shared<DataTypeArray>(nested_column.type);
return {std::move(array_column), std::move(array_type), column_name};
}
case arrow::Type::STRUCT: {
case arrow::Type::STRUCT:
{
auto arrow_type = arrow_column->type();
auto * arrow_struct_type = assert_cast<arrow::StructType *>(arrow_type.get());
std::vector<arrow::ArrayVector> nested_arrow_columns(arrow_struct_type->num_fields());
@ -423,8 +416,7 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
for (int i = 0; i != arrow_struct_type->num_fields(); ++i)
{
auto nested_arrow_column = std::make_shared<arrow::ChunkedArray>(nested_arrow_columns[i]);
auto element = readColumnFromArrowColumn(
nested_arrow_column, arrow_struct_type->field(i)->name(), format_name, false, dictionary_values, read_ints_as_dates);
auto element = readColumnFromArrowColumn(nested_arrow_column, arrow_struct_type->field(i)->name(), format_name, false, dictionary_values, read_ints_as_dates);
tuple_elements.emplace_back(std::move(element.column));
tuple_types.emplace_back(std::move(element.type));
tuple_names.emplace_back(std::move(element.name));
@ -434,7 +426,8 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
auto tuple_type = std::make_shared<DataTypeTuple>(std::move(tuple_types), std::move(tuple_names));
return {std::move(tuple_column), std::move(tuple_type), column_name};
}
case arrow::Type::DICTIONARY: {
case arrow::Type::DICTIONARY:
{
auto & dict_values = dictionary_values[column_name];
/// Load dictionary values only once and reuse it.
if (!dict_values)
@ -446,14 +439,12 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
dict_array.emplace_back(dict_chunk.dictionary());
}
auto arrow_dict_column = std::make_shared<arrow::ChunkedArray>(dict_array);
auto dict_column
= readColumnFromArrowColumn(arrow_dict_column, column_name, format_name, false, dictionary_values, read_ints_as_dates);
auto dict_column = readColumnFromArrowColumn(arrow_dict_column, column_name, format_name, false, dictionary_values, read_ints_as_dates);
/// We should convert read column to ColumnUnique.
auto tmp_lc_column = DataTypeLowCardinality(dict_column.type).createColumn();
auto tmp_dict_column = IColumn::mutate(assert_cast<ColumnLowCardinality *>(tmp_lc_column.get())->getDictionaryPtr());
static_cast<IColumnUnique *>(tmp_dict_column.get())
->uniqueInsertRangeFrom(*dict_column.column, 0, dict_column.column->size());
static_cast<IColumnUnique *>(tmp_dict_column.get())->uniqueInsertRangeFrom(*dict_column.column, 0, dict_column.column->size());
dict_column.column = std::move(tmp_dict_column);
dict_values = std::make_shared<ColumnWithTypeAndName>(std::move(dict_column));
}
@ -474,17 +465,13 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
# define DISPATCH(ARROW_NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
case ARROW_NUMERIC_TYPE: \
return readColumnWithNumericData<CPP_NUMERIC_TYPE>(arrow_column, column_name);
FOR_ARROW_NUMERIC_TYPES(DISPATCH)
FOR_ARROW_NUMERIC_TYPES(DISPATCH)
# undef DISPATCH
// TODO: read JSON as a string?
// TODO: read UUID as a string?
default:
throw Exception(
ErrorCodes::UNKNOWN_TYPE,
"Unsupported {} type '{}' of an input column '{}'.",
format_name,
arrow_column->type()->name(),
column_name);
throw Exception(ErrorCodes::UNKNOWN_TYPE,
"Unsupported {} type '{}' of an input column '{}'.", format_name, arrow_column->type()->name(), column_name);
}
}
@ -504,26 +491,12 @@ Block ArrowColumnToCHColumn::arrowSchemaToCHHeader(
ColumnsWithTypeAndName sample_columns;
std::unordered_set<String> nested_table_names;
if (hint_header)
nested_table_names = Nested::getAllTableNames(*hint_header);
const auto accept_field = [&](const auto & field_name)
{
if (!hint_header || hint_header->has(field_name, ignore_case))
return true;
if (!ignore_case)
return nested_table_names.contains(field_name);
return std::find_if(
nested_table_names.begin(),
nested_table_names.end(),
[&](const auto & nested_table_name) { return equalsCaseInsensitive(nested_table_name, field_name); })
!= nested_table_names.end();
};
nested_table_names = Nested::getAllTableNames(*hint_header, ignore_case);
for (const auto & field : schema.fields())
{
if (!accept_field(field->name()))
if (hint_header && !hint_header->has(field->name(), ignore_case)
&& !nested_table_names.contains(ignore_case ? boost::to_lower_copy(field->name()) : field->name()))
continue;
/// Create empty arrow column by it's type and convert it to ClickHouse column.
@ -539,8 +512,7 @@ Block ArrowColumnToCHColumn::arrowSchemaToCHHeader(
arrow::ArrayVector array_vector = {arrow_array};
auto arrow_column = std::make_shared<arrow::ChunkedArray>(array_vector);
std::unordered_map<std::string, std::shared_ptr<ColumnWithTypeAndName>> dict_values;
ColumnWithTypeAndName sample_column
= readColumnFromArrowColumn(arrow_column, field->name(), format_name, false, dict_values, false);
ColumnWithTypeAndName sample_column = readColumnFromArrowColumn(arrow_column, field->name(), format_name, false, dict_values, false);
sample_columns.emplace_back(std::move(sample_column));
}
@ -571,9 +543,7 @@ void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arr
throw Exception(ErrorCodes::DUPLICATE_COLUMN, "Column '{}' is duplicated", column_name);
if (case_insensitive_matching)
{
boost::to_lower(column_name);
}
name_to_column_ptr[std::move(column_name)] = arrow_column;
}
@ -584,6 +554,7 @@ void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr &
{
if (unlikely(name_to_column_ptr.empty()))
throw Exception(ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS, "Columns is empty");
Columns columns_list;
UInt64 num_rows = name_to_column_ptr.begin()->second->length();
columns_list.reserve(header.rows());
@ -594,17 +565,14 @@ void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr &
auto search_column_name = header_column.name;
if (case_insensitive_matching)
{
boost::to_lower(search_column_name);
}
bool read_from_nested = false;
String nested_table_name = Nested::extractTableName(header_column.name);
String search_nested_table_name = nested_table_name;
if (case_insensitive_matching)
{
boost::to_lower(search_nested_table_name);
}
if (!name_to_column_ptr.contains(search_column_name))
{
/// Check if it's a column from nested table.

View File

@ -4,10 +4,10 @@
#if USE_ARROW || USE_ORC || USE_PARQUET
# include <Core/Block.h>
# include <Core/ColumnWithTypeAndName.h>
# include <DataTypes/IDataType.h>
# include <arrow/table.h>
#include <DataTypes/IDataType.h>
#include <Core/ColumnWithTypeAndName.h>
#include <Core/Block.h>
#include <arrow/table.h>
namespace DB

View File

@ -1,14 +1,14 @@
#include "ORCBlockInputFormat.h"
#include "Common/StringUtils/StringUtils.h"
#include <boost/algorithm/string/case_conv.hpp>
#if USE_ORC
# include <DataTypes/NestedUtils.h>
# include <Formats/FormatFactory.h>
# include <IO/ReadBufferFromMemory.h>
# include <IO/WriteHelpers.h>
# include <IO/copyData.h>
# include "ArrowBufferedStreams.h"
# include "ArrowColumnToCHColumn.h"
#include <Formats/FormatFactory.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include "ArrowBufferedStreams.h"
#include "ArrowColumnToCHColumn.h"
#include <DataTypes/NestedUtils.h>
namespace DB
{
@ -138,9 +138,10 @@ void ORCBlockInputFormat::prepareReader()
format_settings.orc.case_insensitive_column_matching);
missing_columns = arrow_column_to_ch_column->getMissingColumns(*schema);
const bool ignore_case = format_settings.orc.case_insensitive_column_matching;
std::unordered_set<String> nested_table_names;
if (format_settings.orc.import_nested)
nested_table_names = Nested::getAllTableNames(getPort().getHeader());
nested_table_names = Nested::getAllTableNames(getPort().getHeader(), ignore_case);
/// In ReadStripe column indices should be started from 1,
/// because 0 indicates to select all columns.
@ -151,29 +152,8 @@ void ORCBlockInputFormat::prepareReader()
/// so we should recursively count the number of indices we need for this type.
int indexes_count = countIndicesForType(schema->field(i)->type());
const auto & name = schema->field(i)->name();
const bool contains_column = std::invoke(
[&]
{
if (getPort().getHeader().has(name, format_settings.parquet.case_insensitive_column_matching))
{
return true;
}
if (!format_settings.parquet.case_insensitive_column_matching)
{
return nested_table_names.contains(name);
}
return std::find_if(
nested_table_names.begin(),
nested_table_names.end(),
[&](const auto & nested_table_name) { return equalsCaseInsensitive(nested_table_name, name); })
!= nested_table_names.end();
});
if (contains_column)
if (getPort().getHeader().has(name, ignore_case) || nested_table_names.contains(ignore_case ? boost::to_lower_copy(name) : name))
{
column_names.push_back(name);
for (int j = 0; j != indexes_count; ++j)
include_indices.push_back(index + j);
}
@ -200,9 +180,14 @@ NamesAndTypesList ORCSchemaReader::readSchema()
void registerInputFormatORC(FormatFactory & factory)
{
factory.registerInputFormat(
"ORC",
[](ReadBuffer & buf, const Block & sample, const RowInputFormatParams &, const FormatSettings & settings)
{ return std::make_shared<ORCBlockInputFormat>(buf, sample, settings); });
"ORC",
[](ReadBuffer &buf,
const Block &sample,
const RowInputFormatParams &,
const FormatSettings & settings)
{
return std::make_shared<ORCBlockInputFormat>(buf, sample, settings);
});
factory.markFormatAsColumnOriented("ORC");
}
@ -210,7 +195,11 @@ void registerORCSchemaReader(FormatFactory & factory)
{
factory.registerSchemaReader(
"ORC",
[](ReadBuffer & buf, const FormatSettings & settings, ContextPtr) { return std::make_shared<ORCSchemaReader>(buf, settings); });
[](ReadBuffer & buf, const FormatSettings & settings, ContextPtr)
{
return std::make_shared<ORCSchemaReader>(buf, settings);
}
);
}
}
@ -218,14 +207,14 @@ void registerORCSchemaReader(FormatFactory & factory)
namespace DB
{
class FormatFactory;
void registerInputFormatORC(FormatFactory &)
{
}
class FormatFactory;
void registerInputFormatORC(FormatFactory &)
{
}
void registerORCSchemaReader(FormatFactory &)
{
}
void registerORCSchemaReader(FormatFactory &)
{
}
}
#endif

View File

@ -45,8 +45,6 @@ private:
std::unique_ptr<ArrowColumnToCHColumn> arrow_column_to_ch_column;
std::vector<String> column_names;
// indices of columns to read from ORC file
std::vector<int> include_indices;

View File

@ -1,21 +1,19 @@
#include "ParquetBlockInputFormat.h"
#include "Common/StringUtils/StringUtils.h"
#include <boost/algorithm/string/case_conv.hpp>
#if USE_PARQUET
# include <DataTypes/NestedUtils.h>
# include <Formats/FormatFactory.h>
# include <IO/ReadBufferFromMemory.h>
# include <IO/copyData.h>
# include <arrow/api.h>
# include <arrow/io/api.h>
# include <arrow/status.h>
# include <parquet/arrow/reader.h>
# include <parquet/file_reader.h>
# include "ArrowBufferedStreams.h"
# include "ArrowColumnToCHColumn.h"
# include <base/logger_useful.h>
#include <Formats/FormatFactory.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/copyData.h>
#include <arrow/api.h>
#include <arrow/io/api.h>
#include <arrow/status.h>
#include <parquet/arrow/reader.h>
#include <parquet/file_reader.h>
#include "ArrowBufferedStreams.h"
#include "ArrowColumnToCHColumn.h"
#include <DataTypes/NestedUtils.h>
namespace DB
{
@ -26,12 +24,12 @@ namespace ErrorCodes
extern const int CANNOT_READ_ALL_DATA;
}
# define THROW_ARROW_NOT_OK(status) \
do \
{ \
if (::arrow::Status _s = (status); !_s.ok()) \
throw Exception(_s.ToString(), ErrorCodes::BAD_ARGUMENTS); \
} while (false)
#define THROW_ARROW_NOT_OK(status) \
do \
{ \
if (::arrow::Status _s = (status); !_s.ok()) \
throw Exception(_s.ToString(), ErrorCodes::BAD_ARGUMENTS); \
} while (false)
ParquetBlockInputFormat::ParquetBlockInputFormat(ReadBuffer & in_, Block header_, const FormatSettings & format_settings_)
: IInputFormat(std::move(header_), in_), format_settings(format_settings_)
@ -140,9 +138,10 @@ void ParquetBlockInputFormat::prepareReader()
format_settings.parquet.case_insensitive_column_matching);
missing_columns = arrow_column_to_ch_column->getMissingColumns(*schema);
const bool ignore_case = format_settings.parquet.case_insensitive_column_matching;
std::unordered_set<String> nested_table_names;
if (format_settings.parquet.import_nested)
nested_table_names = Nested::getAllTableNames(getPort().getHeader());
nested_table_names = Nested::getAllTableNames(getPort().getHeader(), ignore_case);
int index = 0;
for (int i = 0; i < schema->num_fields(); ++i)
@ -153,27 +152,7 @@ void ParquetBlockInputFormat::prepareReader()
int indexes_count = countIndicesForType(schema->field(i)->type());
const auto & name = schema->field(i)->name();
const bool contains_column = std::invoke(
[&]
{
if (getPort().getHeader().has(name, format_settings.parquet.case_insensitive_column_matching))
{
return true;
}
if (!format_settings.parquet.case_insensitive_column_matching)
{
return nested_table_names.contains(name);
}
return std::find_if(
nested_table_names.begin(),
nested_table_names.end(),
[&](const auto & nested_table_name) { return equalsCaseInsensitive(nested_table_name, name); })
!= nested_table_names.end();
});
if (contains_column)
if (getPort().getHeader().has(name, ignore_case) || nested_table_names.contains(ignore_case ? boost::to_lower_copy(name) : name))
{
for (int j = 0; j != indexes_count; ++j)
column_indices.push_back(index + j);
@ -201,9 +180,14 @@ NamesAndTypesList ParquetSchemaReader::readSchema()
void registerInputFormatParquet(FormatFactory & factory)
{
factory.registerInputFormat(
"Parquet",
[](ReadBuffer & buf, const Block & sample, const RowInputFormatParams &, const FormatSettings & settings)
{ return std::make_shared<ParquetBlockInputFormat>(buf, sample, settings); });
"Parquet",
[](ReadBuffer &buf,
const Block &sample,
const RowInputFormatParams &,
const FormatSettings & settings)
{
return std::make_shared<ParquetBlockInputFormat>(buf, sample, settings);
});
factory.markFormatAsColumnOriented("Parquet");
}
@ -211,7 +195,11 @@ void registerParquetSchemaReader(FormatFactory & factory)
{
factory.registerSchemaReader(
"Parquet",
[](ReadBuffer & buf, const FormatSettings & settings, ContextPtr) { return std::make_shared<ParquetSchemaReader>(buf, settings); });
[](ReadBuffer & buf, const FormatSettings & settings, ContextPtr)
{
return std::make_shared<ParquetSchemaReader>(buf, settings);
}
);
}
}
@ -225,9 +213,7 @@ void registerInputFormatParquet(FormatFactory &)
{
}
void registerParquetSchemaReader(FormatFactory &)
{
}
void registerParquetSchemaReader(FormatFactory &) {}
}
#endif