Merge branch 'master' into fix_some_tests

This commit is contained in:
alesapin 2021-06-21 11:14:22 +03:00
commit da0a3997f4
43 changed files with 863 additions and 177 deletions

2
.gitmodules vendored
View File

@ -103,7 +103,7 @@
url = https://github.com/ClickHouse-Extras/fastops
[submodule "contrib/orc"]
path = contrib/orc
url = https://github.com/apache/orc
url = https://github.com/ClickHouse-Extras/orc
[submodule "contrib/sparsehash-c11"]
path = contrib/sparsehash-c11
url = https://github.com/sparsehash/sparsehash-c11.git

2
contrib/arrow vendored

@ -1 +1 @@
Subproject commit 616b3dc76a0c8450b4027ded8a78e9619d7c845f
Subproject commit debf751a129bdda9ff4d1e895e08957ff77000a1

View File

@ -188,6 +188,7 @@ set(ARROW_SRCS
"${LIBRARY_DIR}/array/util.cc"
"${LIBRARY_DIR}/array/validate.cc"
"${LIBRARY_DIR}/compute/api_aggregate.cc"
"${LIBRARY_DIR}/compute/api_scalar.cc"
"${LIBRARY_DIR}/compute/api_vector.cc"
"${LIBRARY_DIR}/compute/cast.cc"
@ -198,8 +199,11 @@ set(ARROW_SRCS
"${LIBRARY_DIR}/compute/kernels/aggregate_basic.cc"
"${LIBRARY_DIR}/compute/kernels/aggregate_mode.cc"
"${LIBRARY_DIR}/compute/kernels/aggregate_quantile.cc"
"${LIBRARY_DIR}/compute/kernels/aggregate_tdigest.cc"
"${LIBRARY_DIR}/compute/kernels/aggregate_var_std.cc"
"${LIBRARY_DIR}/compute/kernels/codegen_internal.cc"
"${LIBRARY_DIR}/compute/kernels/hash_aggregate.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_arithmetic.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_boolean.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_cast_boolean.cc"
@ -243,6 +247,7 @@ set(ARROW_SRCS
"${LIBRARY_DIR}/io/interfaces.cc"
"${LIBRARY_DIR}/io/memory.cc"
"${LIBRARY_DIR}/io/slow.cc"
"${LIBRARY_DIR}/io/transform.cc"
"${LIBRARY_DIR}/tensor/coo_converter.cc"
"${LIBRARY_DIR}/tensor/csf_converter.cc"
@ -256,11 +261,8 @@ set(ARROW_SRCS
"${LIBRARY_DIR}/util/bitmap_builders.cc"
"${LIBRARY_DIR}/util/bitmap_ops.cc"
"${LIBRARY_DIR}/util/bpacking.cc"
"${LIBRARY_DIR}/util/cancel.cc"
"${LIBRARY_DIR}/util/compression.cc"
"${LIBRARY_DIR}/util/compression_lz4.cc"
"${LIBRARY_DIR}/util/compression_snappy.cc"
"${LIBRARY_DIR}/util/compression_zlib.cc"
"${LIBRARY_DIR}/util/compression_zstd.cc"
"${LIBRARY_DIR}/util/cpu_info.cc"
"${LIBRARY_DIR}/util/decimal.cc"
"${LIBRARY_DIR}/util/delimiting.cc"
@ -268,13 +270,14 @@ set(ARROW_SRCS
"${LIBRARY_DIR}/util/future.cc"
"${LIBRARY_DIR}/util/int_util.cc"
"${LIBRARY_DIR}/util/io_util.cc"
"${LIBRARY_DIR}/util/iterator.cc"
"${LIBRARY_DIR}/util/key_value_metadata.cc"
"${LIBRARY_DIR}/util/logging.cc"
"${LIBRARY_DIR}/util/memory.cc"
"${LIBRARY_DIR}/util/mutex.cc"
"${LIBRARY_DIR}/util/string_builder.cc"
"${LIBRARY_DIR}/util/string.cc"
"${LIBRARY_DIR}/util/task_group.cc"
"${LIBRARY_DIR}/util/tdigest.cc"
"${LIBRARY_DIR}/util/thread_pool.cc"
"${LIBRARY_DIR}/util/time.cc"
"${LIBRARY_DIR}/util/trie.cc"
@ -368,14 +371,14 @@ set(PARQUET_SRCS
"${LIBRARY_DIR}/column_reader.cc"
"${LIBRARY_DIR}/column_scanner.cc"
"${LIBRARY_DIR}/column_writer.cc"
"${LIBRARY_DIR}/deprecated_io.cc"
"${LIBRARY_DIR}/encoding.cc"
"${LIBRARY_DIR}/encryption.cc"
"${LIBRARY_DIR}/encryption_internal.cc"
"${LIBRARY_DIR}/encryption/encryption.cc"
"${LIBRARY_DIR}/encryption/encryption_internal.cc"
"${LIBRARY_DIR}/encryption/internal_file_decryptor.cc"
"${LIBRARY_DIR}/encryption/internal_file_encryptor.cc"
"${LIBRARY_DIR}/exception.cc"
"${LIBRARY_DIR}/file_reader.cc"
"${LIBRARY_DIR}/file_writer.cc"
"${LIBRARY_DIR}/internal_file_decryptor.cc"
"${LIBRARY_DIR}/internal_file_encryptor.cc"
"${LIBRARY_DIR}/level_conversion.cc"
"${LIBRARY_DIR}/level_comparison.cc"
"${LIBRARY_DIR}/metadata.cc"
@ -385,6 +388,8 @@ set(PARQUET_SRCS
"${LIBRARY_DIR}/properties.cc"
"${LIBRARY_DIR}/schema.cc"
"${LIBRARY_DIR}/statistics.cc"
"${LIBRARY_DIR}/stream_reader.cc"
"${LIBRARY_DIR}/stream_writer.cc"
"${LIBRARY_DIR}/types.cc"
"${GEN_LIBRARY_DIR}/parquet_constants.cpp"

2
contrib/orc vendored

@ -1 +1 @@
Subproject commit 5981208e39447df84827f6a961d1da76bacb6078
Subproject commit 0a936f6bbdb9303308973073f8623b5a8d82eae1

View File

@ -191,6 +191,7 @@ public:
void nestedRemoveNullable() { dictionary.getColumnUnique().nestedRemoveNullable(); }
const IColumnUnique & getDictionary() const { return dictionary.getColumnUnique(); }
IColumnUnique & getDictionary() { return dictionary.getColumnUnique(); }
const ColumnPtr & getDictionaryPtr() const { return dictionary.getColumnUniquePtr(); }
/// IColumnUnique & getUnique() { return static_cast<IColumnUnique &>(*column_unique); }
/// ColumnPtr getUniquePtr() const { return column_unique; }

View File

@ -564,7 +564,8 @@ class IColumn;
M(Bool, output_format_pretty_row_numbers, false, "Add row numbers before each row for pretty output format", 0) \
M(Bool, insert_distributed_one_random_shard, false, "If setting is enabled, inserting into distributed table will choose a random shard to write when there is no sharding key", 0) \
M(Bool, cross_to_inner_join_rewrite, true, "Use inner join instead of comma/cross join if possible", 0) \
\
M(Bool, output_format_arrow_low_cardinality_as_dictionary, false, "Enable output LowCardinality type as Dictionary Arrow type", 0) \
// End of FORMAT_FACTORY_SETTINGS
// Please add settings non-related to formats into the COMMON_SETTINGS above.

View File

@ -112,6 +112,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.values.interpret_expressions = settings.input_format_values_interpret_expressions;
format_settings.with_names_use_header = settings.input_format_with_names_use_header;
format_settings.write_statistics = settings.output_format_write_statistics;
format_settings.arrow.low_cardinality_as_dictionary = settings.output_format_arrow_low_cardinality_as_dictionary;
/// Validate avro_schema_registry_url with RemoteHostFilter when non-empty and in Server context
if (format_settings.schema.is_server)

View File

@ -52,6 +52,7 @@ struct FormatSettings
struct
{
UInt64 row_group_size = 1000000;
bool low_cardinality_as_dictionary = false;
} arrow;
struct

View File

@ -1,4 +1,5 @@
#include "ArrowBlockInputFormat.h"
#if USE_ARROW
#include <Formats/FormatFactory.h>
@ -29,7 +30,6 @@ ArrowBlockInputFormat::ArrowBlockInputFormat(ReadBuffer & in_, const Block & hea
Chunk ArrowBlockInputFormat::generate()
{
Chunk res;
const Block & header = getPort().getHeader();
arrow::Result<std::shared_ptr<arrow::RecordBatch>> batch_result;
if (stream)
@ -63,7 +63,7 @@ Chunk ArrowBlockInputFormat::generate()
++record_batch_current;
ArrowColumnToCHColumn::arrowTableToCHChunk(res, *table_result, header, "Arrow");
arrow_column_to_ch_column->arrowTableToCHChunk(res, *table_result);
return res;
}
@ -81,6 +81,8 @@ void ArrowBlockInputFormat::resetParser()
void ArrowBlockInputFormat::prepareReader()
{
std::shared_ptr<arrow::Schema> schema;
if (stream)
{
auto stream_reader_status = arrow::ipc::RecordBatchStreamReader::Open(std::make_unique<ArrowInputStreamFromReadBuffer>(in));
@ -88,6 +90,7 @@ void ArrowBlockInputFormat::prepareReader()
throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,
"Error while opening a table: {}", stream_reader_status.status().ToString());
stream_reader = *stream_reader_status;
schema = stream_reader->schema();
}
else
{
@ -96,8 +99,11 @@ void ArrowBlockInputFormat::prepareReader()
throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,
"Error while opening a table: {}", file_reader_status.status().ToString());
file_reader = *file_reader_status;
schema = file_reader->schema();
}
arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(getPort().getHeader(), std::move(schema), "Arrow");
if (stream)
record_batch_total = -1;
else

View File

@ -11,6 +11,7 @@ namespace DB
{
class ReadBuffer;
class ArrowColumnToCHColumn;
class ArrowBlockInputFormat : public IInputFormat
{
@ -32,6 +33,8 @@ private:
// The following fields are used only for Arrow format
std::shared_ptr<arrow::ipc::RecordBatchFileReader> file_reader;
std::unique_ptr<ArrowColumnToCHColumn> arrow_column_to_ch_column;
int record_batch_total = 0;
int record_batch_current = 0;

View File

@ -18,17 +18,26 @@ namespace ErrorCodes
}
ArrowBlockOutputFormat::ArrowBlockOutputFormat(WriteBuffer & out_, const Block & header_, bool stream_, const FormatSettings & format_settings_)
: IOutputFormat(header_, out_), stream{stream_}, format_settings{format_settings_}, arrow_ostream{std::make_shared<ArrowBufferedOutputStream>(out_)}
: IOutputFormat(header_, out_)
, stream{stream_}
, format_settings{format_settings_}
, arrow_ostream{std::make_shared<ArrowBufferedOutputStream>(out_)}
{
}
void ArrowBlockOutputFormat::consume(Chunk chunk)
{
const Block & header = getPort(PortKind::Main).getHeader();
const size_t columns_num = chunk.getNumColumns();
std::shared_ptr<arrow::Table> arrow_table;
CHColumnToArrowColumn::chChunkToArrowTable(arrow_table, header, chunk, columns_num, "Arrow");
if (!ch_column_to_arrow_column)
{
const Block & header = getPort(PortKind::Main).getHeader();
ch_column_to_arrow_column
= std::make_unique<CHColumnToArrowColumn>(header, "Arrow", format_settings.arrow.low_cardinality_as_dictionary);
}
ch_column_to_arrow_column->chChunkToArrowTable(arrow_table, chunk, columns_num);
if (!writer)
prepareWriter(arrow_table->schema());

View File

@ -12,6 +12,8 @@ namespace arrow::ipc { class RecordBatchWriter; }
namespace DB
{
class CHColumnToArrowColumn;
class ArrowBlockOutputFormat : public IOutputFormat
{
public:
@ -28,6 +30,7 @@ private:
const FormatSettings format_settings;
std::shared_ptr<ArrowBufferedOutputStream> arrow_ostream;
std::shared_ptr<arrow::ipc::RecordBatchWriter> writer;
std::unique_ptr<CHColumnToArrowColumn> ch_column_to_arrow_column;
void prepareWriter(const std::shared_ptr<arrow::Schema> & schema);
};

View File

@ -7,15 +7,22 @@
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeMap.h>
#include <common/DateLUTImpl.h>
#include <common/types.h>
#include <Core/Block.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnUnique.h>
#include <Columns/ColumnMap.h>
#include <Interpreters/castColumn.h>
#include <algorithm>
#include <DataTypes/DataTypeLowCardinality.h>
#include <fmt/format.h>
namespace DB
@ -27,6 +34,7 @@ namespace DB
extern const int CANNOT_CONVERT_TYPE;
extern const int CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN;
extern const int THERE_IS_NO_COLUMN;
extern const int BAD_ARGUMENTS;
}
static const std::initializer_list<std::pair<arrow::Type::type, const char *>> arrow_type_to_internal_type =
@ -152,11 +160,11 @@ namespace DB
if (days_num > DATE_LUT_MAX_DAY_NUM)
{
// TODO: will it rollback correctly?
throw Exception{"Input value " + std::to_string(days_num) + " of a column \"" + internal_column.getName()
+ "\" is greater than "
"max allowed Date value, which is "
+ std::to_string(DATE_LUT_MAX_DAY_NUM),
ErrorCodes::VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE};
throw Exception
{
fmt::format("Input value {} of a column \"{}\" is greater than max allowed Date value, which is {}", days_num, internal_column.getName(), DATE_LUT_MAX_DAY_NUM),
ErrorCodes::VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE
};
}
column_data.emplace_back(days_num);
@ -263,23 +271,47 @@ namespace DB
offsets_data.emplace_back(start + arrow_offsets.Value(i));
}
}
static ColumnPtr createAndFillColumnWithIndexesData(std::shared_ptr<arrow::ChunkedArray> & arrow_column)
{
switch (arrow_column->type()->id())
{
# define DISPATCH(ARROW_NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
case ARROW_NUMERIC_TYPE: \
{ \
auto column = DataTypeNumber<CPP_NUMERIC_TYPE>().createColumn(); \
fillColumnWithNumericData<CPP_NUMERIC_TYPE>(arrow_column, *column); \
return column; \
}
FOR_ARROW_INDEXES_TYPES(DISPATCH)
# undef DISPATCH
default:
throw Exception(fmt::format("Unsupported type for indexes in LowCardinality: {}.", arrow_column->type()->name()), ErrorCodes::BAD_ARGUMENTS);
}
}
static void readColumnFromArrowColumn(std::shared_ptr<arrow::ChunkedArray> & arrow_column, IColumn & internal_column, const std::string & column_name, const std::string format_name, bool is_nullable)
static void readColumnFromArrowColumn(
std::shared_ptr<arrow::ChunkedArray> & arrow_column,
IColumn & internal_column,
const std::string & column_name,
const std::string & format_name,
bool is_nullable,
std::unordered_map<String, ColumnPtr> dictionary_values)
{
if (internal_column.isNullable())
{
ColumnNullable & column_nullable = typeid_cast<ColumnNullable &>(internal_column);
readColumnFromArrowColumn(arrow_column, column_nullable.getNestedColumn(), column_name, format_name, true);
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(internal_column);
readColumnFromArrowColumn(arrow_column, column_nullable.getNestedColumn(), column_name, format_name, true, dictionary_values);
fillByteMapFromArrowColumn(arrow_column, column_nullable.getNullMapColumn());
return;
}
// TODO: check if a column is const?
if (!is_nullable && !checkColumn<ColumnArray>(internal_column) && arrow_column->null_count())
/// TODO: check if a column is const?
if (!is_nullable && arrow_column->null_count() && arrow_column->type()->id() != arrow::Type::LIST
&& arrow_column->type()->id() != arrow::Type::MAP && arrow_column->type()->id() != arrow::Type::STRUCT)
{
throw Exception
{
"Can not insert NULL data into non-nullable column \"" + column_name + "\"",
fmt::format("Can not insert NULL data into non-nullable column \"{}\".", column_name),
ErrorCodes::CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN
};
}
@ -304,13 +336,11 @@ namespace DB
fillColumnWithTimestampData(arrow_column, internal_column);
break;
case arrow::Type::DECIMAL:
//fillColumnWithNumericData<Decimal128, ColumnDecimal<Decimal128>>(arrow_column, read_column); // Have problems with trash values under NULL, but faster
fillColumnWithDecimalData(arrow_column, internal_column /*, internal_nested_type*/);
break;
case arrow::Type::MAP: [[fallthrough]];
case arrow::Type::LIST:
{
const auto * list_type = static_cast<arrow::ListType *>(arrow_column->type().get());
auto list_nested_type = list_type->value_type();
arrow::ArrayVector array_vector;
array_vector.reserve(arrow_column->num_chunks());
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
@ -321,11 +351,70 @@ namespace DB
}
auto arrow_nested_column = std::make_shared<arrow::ChunkedArray>(array_vector);
ColumnArray & column_array = typeid_cast<ColumnArray &>(internal_column);
readColumnFromArrowColumn(arrow_nested_column, column_array.getData(), column_name, format_name, false);
ColumnArray & column_array = arrow_column->type()->id() == arrow::Type::MAP
? assert_cast<ColumnMap &>(internal_column).getNestedColumn()
: assert_cast<ColumnArray &>(internal_column);
readColumnFromArrowColumn(arrow_nested_column, column_array.getData(), column_name, format_name, false, dictionary_values);
fillOffsetsFromArrowListColumn(arrow_column, column_array.getOffsetsColumn());
break;
}
case arrow::Type::STRUCT:
{
ColumnTuple & column_tuple = assert_cast<ColumnTuple &>(internal_column);
int fields_count = column_tuple.tupleSize();
std::vector<arrow::ArrayVector> nested_arrow_columns(fields_count);
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
{
arrow::StructArray & struct_chunk = static_cast<arrow::StructArray &>(*(arrow_column->chunk(chunk_i)));
for (int i = 0; i < fields_count; ++i)
nested_arrow_columns[i].emplace_back(struct_chunk.field(i));
}
for (int i = 0; i != fields_count; ++i)
{
auto nested_arrow_column = std::make_shared<arrow::ChunkedArray>(nested_arrow_columns[i]);
readColumnFromArrowColumn(nested_arrow_column, column_tuple.getColumn(i), column_name, format_name, false, dictionary_values);
}
break;
}
case arrow::Type::DICTIONARY:
{
ColumnLowCardinality & column_lc = assert_cast<ColumnLowCardinality &>(internal_column);
auto & dict_values = dictionary_values[column_name];
/// Load dictionary values only once and reuse it.
if (!dict_values)
{
arrow::ArrayVector dict_array;
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
{
arrow::DictionaryArray & dict_chunk = static_cast<arrow::DictionaryArray &>(*(arrow_column->chunk(chunk_i)));
dict_array.emplace_back(dict_chunk.dictionary());
}
auto arrow_dict_column = std::make_shared<arrow::ChunkedArray>(dict_array);
auto dict_column = IColumn::mutate(column_lc.getDictionaryPtr());
auto * uniq_column = static_cast<IColumnUnique *>(dict_column.get());
auto values_column = uniq_column->getNestedColumn()->cloneEmpty();
readColumnFromArrowColumn(arrow_dict_column, *values_column, column_name, format_name, false, dictionary_values);
uniq_column->uniqueInsertRangeFrom(*values_column, 0, values_column->size());
dict_values = std::move(dict_column);
}
arrow::ArrayVector indexes_array;
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
{
arrow::DictionaryArray & dict_chunk = static_cast<arrow::DictionaryArray &>(*(arrow_column->chunk(chunk_i)));
indexes_array.emplace_back(dict_chunk.indices());
}
auto arrow_indexes_column = std::make_shared<arrow::ChunkedArray>(indexes_array);
auto indexes_column = createAndFillColumnWithIndexesData(arrow_indexes_column);
auto new_column_lc = ColumnLowCardinality::create(dict_values, std::move(indexes_column));
column_lc = std::move(*new_column_lc);
break;
}
# define DISPATCH(ARROW_NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
case ARROW_NUMERIC_TYPE: \
fillColumnWithNumericData<CPP_NUMERIC_TYPE>(arrow_column, internal_column); \
@ -339,8 +428,7 @@ namespace DB
default:
throw Exception
{
"Unsupported " + format_name + " type \"" + arrow_column->type()->name() + "\" of an input column \""
+ column_name + "\"",
fmt::format(R"(Unsupported {} type "{}" of an input column "{}".)", format_name, arrow_column->type()->name(), column_name),
ErrorCodes::UNKNOWN_TYPE
};
}
@ -350,7 +438,7 @@ namespace DB
{
if (column_type->isNullable())
{
DataTypePtr nested_type = typeid_cast<const DataTypeNullable *>(column_type.get())->getNestedType();
DataTypePtr nested_type = assert_cast<const DataTypeNullable *>(column_type.get())->getNestedType();
return makeNullable(getInternalType(arrow_type, nested_type, column_name, format_name));
}
@ -367,11 +455,61 @@ namespace DB
const DataTypeArray * array_type = typeid_cast<const DataTypeArray *>(column_type.get());
if (!array_type)
throw Exception{"Cannot convert arrow LIST type to a not Array ClickHouse type " + column_type->getName(), ErrorCodes::CANNOT_CONVERT_TYPE};
throw Exception{fmt::format("Cannot convert arrow LIST type to a not Array ClickHouse type {}.", column_type->getName()), ErrorCodes::CANNOT_CONVERT_TYPE};
return std::make_shared<DataTypeArray>(getInternalType(list_nested_type, array_type->getNestedType(), column_name, format_name));
}
if (arrow_type->id() == arrow::Type::STRUCT)
{
const auto * struct_type = static_cast<arrow::StructType *>(arrow_type.get());
const DataTypeTuple * tuple_type = typeid_cast<const DataTypeTuple *>(column_type.get());
if (!tuple_type)
throw Exception{fmt::format("Cannot convert arrow STRUCT type to a not Tuple ClickHouse type {}.", column_type->getName()), ErrorCodes::CANNOT_CONVERT_TYPE};
const DataTypes & tuple_nested_types = tuple_type->getElements();
int internal_fields_num = tuple_nested_types.size();
/// If internal column has less elements then arrow struct, we will select only first internal_fields_num columns.
if (internal_fields_num > struct_type->num_fields())
throw Exception
{
fmt::format(
"Cannot convert arrow STRUCT with {} fields to a ClickHouse Tuple with {} elements: {}.",
struct_type->num_fields(),
internal_fields_num,
column_type->getName()),
ErrorCodes::CANNOT_CONVERT_TYPE
};
DataTypes nested_types;
for (int i = 0; i < internal_fields_num; ++i)
nested_types.push_back(getInternalType(struct_type->field(i)->type(), tuple_nested_types[i], column_name, format_name));
return std::make_shared<DataTypeTuple>(std::move(nested_types));
}
if (arrow_type->id() == arrow::Type::DICTIONARY)
{
const auto * arrow_dict_type = static_cast<arrow::DictionaryType *>(arrow_type.get());
const auto * lc_type = typeid_cast<const DataTypeLowCardinality *>(column_type.get());
/// We allow to insert arrow dictionary into a non-LowCardinality column.
const auto & dict_type = lc_type ? lc_type->getDictionaryType() : column_type;
return std::make_shared<DataTypeLowCardinality>(getInternalType(arrow_dict_type->value_type(), dict_type, column_name, format_name));
}
if (arrow_type->id() == arrow::Type::MAP)
{
const auto * arrow_map_type = typeid_cast<arrow::MapType *>(arrow_type.get());
const auto * map_type = typeid_cast<const DataTypeMap *>(column_type.get());
if (!map_type)
throw Exception{fmt::format("Cannot convert arrow MAP type to a not Map ClickHouse type {}.", column_type->getName()), ErrorCodes::CANNOT_CONVERT_TYPE};
return std::make_shared<DataTypeMap>(
getInternalType(arrow_map_type->key_type(), map_type->getKeyType(), column_name, format_name),
getInternalType(arrow_map_type->item_type(), map_type->getValueType(), column_name, format_name)
);
}
if (const auto * internal_type_it = std::find_if(arrow_type_to_internal_type.begin(), arrow_type_to_internal_type.end(),
[=](auto && elem) { return elem.first == arrow_type->id(); });
internal_type_it != arrow_type_to_internal_type.end())
@ -380,13 +518,24 @@ namespace DB
}
throw Exception
{
"The type \"" + arrow_type->name() + "\" of an input column \"" + column_name + "\" is not supported for conversion from a " + format_name + " data format",
fmt::format(R"(The type "{}" of an input column "{}" is not supported for conversion from a {} data format.)", arrow_type->name(), column_name, format_name),
ErrorCodes::CANNOT_CONVERT_TYPE
};
}
void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table,
const Block & header, std::string format_name)
ArrowColumnToCHColumn::ArrowColumnToCHColumn(const Block & header_, std::shared_ptr<arrow::Schema> schema_, const std::string & format_name_) : header(header_), format_name(format_name_)
{
for (const auto & field : schema_->fields())
{
if (header.has(field->name()))
{
const auto column_type = recursiveRemoveLowCardinality(header.getByName(field->name()).type);
name_to_internal_type[field->name()] = getInternalType(field->type(), column_type, field->name(), format_name);
}
}
}
void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table)
{
Columns columns_list;
UInt64 num_rows = 0;
@ -404,20 +553,18 @@ namespace DB
for (size_t column_i = 0, columns = header.columns(); column_i < columns; ++column_i)
{
ColumnWithTypeAndName header_column = header.getByPosition(column_i);
const auto column_type = recursiveRemoveLowCardinality(header_column.type);
const ColumnWithTypeAndName & header_column = header.getByPosition(column_i);
if (name_to_column_ptr.find(header_column.name) == name_to_column_ptr.end())
// TODO: What if some columns were not presented? Insert NULLs? What if a column is not nullable?
throw Exception{"Column \"" + header_column.name + "\" is not presented in input data",
throw Exception{fmt::format("Column \"{}\" is not presented in input data.", header_column.name),
ErrorCodes::THERE_IS_NO_COLUMN};
std::shared_ptr<arrow::ChunkedArray> arrow_column = name_to_column_ptr[header_column.name];
DataTypePtr internal_type = getInternalType(arrow_column->type(), column_type, header_column.name, format_name);
DataTypePtr & internal_type = name_to_internal_type[header_column.name];
MutableColumnPtr read_column = internal_type->createColumn();
readColumnFromArrowColumn(arrow_column, *read_column, header_column.name, format_name, false);
readColumnFromArrowColumn(arrow_column, *read_column, header_column.name, format_name, false, dictionary_values);
ColumnWithTypeAndName column;
column.name = header_column.name;

View File

@ -19,11 +19,15 @@
namespace DB
{
class ArrowColumnToCHColumn
{
private:
class ArrowColumnToCHColumn
{
public:
ArrowColumnToCHColumn(const Block & header_, std::shared_ptr<arrow::Schema> schema_, const std::string & format_name_);
# define FOR_ARROW_NUMERIC_TYPES(M) \
void arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table);
private:
#define FOR_ARROW_NUMERIC_TYPES(M) \
M(arrow::Type::UINT8, DB::UInt8) \
M(arrow::Type::INT8, DB::Int8) \
M(arrow::Type::UINT16, DB::UInt16) \
@ -36,11 +40,24 @@ namespace DB
M(arrow::Type::FLOAT, DB::Float32) \
M(arrow::Type::DOUBLE, DB::Float64)
#define FOR_ARROW_INDEXES_TYPES(M) \
M(arrow::Type::UINT8, DB::UInt8) \
M(arrow::Type::INT8, DB::UInt8) \
M(arrow::Type::UINT16, DB::UInt16) \
M(arrow::Type::INT16, DB::UInt16) \
M(arrow::Type::UINT32, DB::UInt32) \
M(arrow::Type::INT32, DB::UInt32) \
M(arrow::Type::UINT64, DB::UInt64) \
M(arrow::Type::INT64, DB::UInt64)
public:
static void arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table,
const Block & header, std::string format_name);
};
const Block & header;
std::unordered_map<std::string, DataTypePtr> name_to_internal_type;
const std::string format_name;
/// Map {column name : dictionary column}.
/// To avoid converting dictionary from Arrow Dictionary
/// to LowCardinality every chunk we save it and reuse.
std::unordered_map<std::string, ColumnPtr> dictionary_values;
};
}
#endif

View File

@ -6,17 +6,22 @@
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnMap.h>
#include <Core/callOnTypeIndex.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeMap.h>
#include <Processors/Formats/IOutputFormat.h>
#include <arrow/api.h>
#include <arrow/builder.h>
#include <arrow/type.h>
#include <arrow/util/decimal.h>
#include <DataTypes/DataTypeLowCardinality.h>
namespace DB
@ -25,6 +30,7 @@ namespace DB
{
extern const int UNKNOWN_EXCEPTION;
extern const int UNKNOWN_TYPE;
extern const int LOGICAL_ERROR;
}
static const std::initializer_list<std::pair<String, std::shared_ptr<arrow::DataType>>> internal_type_to_arrow_type =
@ -46,16 +52,15 @@ namespace DB
//{"DateTime", arrow::date64()}, // BUG! saves as date32
{"DateTime", arrow::uint32()},
// TODO: ClickHouse can actually store non-utf8 strings!
{"String", arrow::utf8()},
{"FixedString", arrow::utf8()},
{"String", arrow::binary()},
{"FixedString", arrow::binary()},
};
static void checkStatus(const arrow::Status & status, const String & column_name, const String & format_name)
{
if (!status.ok())
throw Exception{"Error with a " + format_name + " column \"" + column_name + "\": " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
throw Exception{fmt::format("Error with a {} column \"{}\": {}.", format_name, column_name, status.ToString()), ErrorCodes::UNKNOWN_EXCEPTION};
}
template <typename NumericType, typename ArrowBuilderType>
@ -101,8 +106,10 @@ namespace DB
arrow::ArrayBuilder * array_builder,
String format_name,
size_t start,
size_t end);
size_t end,
std::unordered_map<String, std::shared_ptr<arrow::Array>> & dictionary_values);
template <typename Builder>
static void fillArrowArrayWithArrayColumnData(
const String & column_name,
ColumnPtr & column,
@ -111,26 +118,164 @@ namespace DB
arrow::ArrayBuilder * array_builder,
String format_name,
size_t start,
size_t end)
size_t end,
std::unordered_map<String, std::shared_ptr<arrow::Array>> & dictionary_values)
{
const auto * column_array = static_cast<const ColumnArray *>(column.get());
const auto * column_array = assert_cast<const ColumnArray *>(column.get());
ColumnPtr nested_column = column_array->getDataPtr();
DataTypePtr nested_type = typeid_cast<const DataTypeArray *>(column_type.get())->getNestedType();
DataTypePtr nested_type = assert_cast<const DataTypeArray *>(column_type.get())->getNestedType();
const auto & offsets = column_array->getOffsets();
arrow::ListBuilder & builder = assert_cast<arrow::ListBuilder &>(*array_builder);
Builder & builder = assert_cast<Builder &>(*array_builder);
arrow::ArrayBuilder * value_builder = builder.value_builder();
arrow::Status components_status;
for (size_t array_idx = start; array_idx < end; ++array_idx)
{
/// Start new array
/// Start new array.
components_status = builder.Append();
checkStatus(components_status, nested_column->getName(), format_name);
fillArrowArray(column_name, nested_column, nested_type, null_bytemap, value_builder, format_name, offsets[array_idx - 1], offsets[array_idx]);
fillArrowArray(column_name, nested_column, nested_type, null_bytemap, value_builder, format_name, offsets[array_idx - 1], offsets[array_idx], dictionary_values);
}
}
static void fillArrowArrayWithTupleColumnData(
const String & column_name,
ColumnPtr & column,
const std::shared_ptr<const IDataType> & column_type,
const PaddedPODArray<UInt8> * null_bytemap,
arrow::ArrayBuilder * array_builder,
String format_name,
size_t start,
size_t end,
std::unordered_map<String, std::shared_ptr<arrow::Array>> & dictionary_values)
{
const auto * column_tuple = assert_cast<const ColumnTuple *>(column.get());
const auto & nested_types = assert_cast<const DataTypeTuple *>(column_type.get())->getElements();
arrow::StructBuilder & builder = assert_cast<arrow::StructBuilder &>(*array_builder);
for (size_t i = 0; i != column_tuple->tupleSize(); ++i)
{
ColumnPtr nested_column = column_tuple->getColumnPtr(i);
fillArrowArray(column_name + "." + std::to_string(i), nested_column, nested_types[i], null_bytemap, builder.field_builder(i), format_name, start, end, dictionary_values);
}
for (size_t i = start; i != end; ++i)
{
auto status = builder.Append();
checkStatus(status, column->getName(), format_name);
}
}
template<typename T>
static PaddedPODArray<Int64> extractIndexesImpl(ColumnPtr column, size_t start, size_t end)
{
const PaddedPODArray<T> & data = assert_cast<const ColumnVector<T> *>(column.get())->getData();
PaddedPODArray<Int64> result;
result.reserve(end - start);
std::transform(data.begin() + start, data.begin() + end, std::back_inserter(result), [](T value) { return Int64(value); });
return result;
}
static PaddedPODArray<Int64> extractIndexesImpl(ColumnPtr column, size_t start, size_t end)
{
switch (column->getDataType())
{
case TypeIndex::UInt8:
return extractIndexesImpl<UInt8>(column, start, end);
case TypeIndex::UInt16:
return extractIndexesImpl<UInt16>(column, start, end);
case TypeIndex::UInt32:
return extractIndexesImpl<UInt32>(column, start, end);
case TypeIndex::UInt64:
return extractIndexesImpl<UInt64>(column, start, end);
default:
throw Exception(fmt::format("Indexes column must be ColumnUInt, got {}.", column->getName()),
ErrorCodes::LOGICAL_ERROR);
}
}
template<typename ValueType>
static void fillArrowArrayWithLowCardinalityColumnDataImpl(
const String & column_name,
ColumnPtr & column,
const std::shared_ptr<const IDataType> & column_type,
const PaddedPODArray<UInt8> * null_bytemap,
arrow::ArrayBuilder * array_builder,
String format_name,
size_t start,
size_t end,
std::unordered_map<String, std::shared_ptr<arrow::Array>> & dictionary_values)
{
const auto * column_lc = assert_cast<const ColumnLowCardinality *>(column.get());
arrow::DictionaryBuilder<ValueType> * builder = assert_cast<arrow::DictionaryBuilder<ValueType> *>(array_builder);
auto & dict_values = dictionary_values[column_name];
/// Convert dictionary from LowCardinality to Arrow dictionary only once and then reuse it.
if (!dict_values)
{
auto value_type = assert_cast<arrow::DictionaryType *>(builder->type().get())->value_type();
std::unique_ptr<arrow::ArrayBuilder> values_builder;
arrow::MemoryPool* pool = arrow::default_memory_pool();
arrow::Status status = MakeBuilder(pool, value_type, &values_builder);
checkStatus(status, column->getName(), format_name);
auto dict_column = column_lc->getDictionary().getNestedColumn();
const auto & dict_type = assert_cast<const DataTypeLowCardinality *>(column_type.get())->getDictionaryType();
fillArrowArray(column_name, dict_column, dict_type, nullptr, values_builder.get(), format_name, 0, dict_column->size(), dictionary_values);
status = values_builder->Finish(&dict_values);
checkStatus(status, column->getName(), format_name);
}
arrow::Status status = builder->InsertMemoValues(*dict_values);
checkStatus(status, column->getName(), format_name);
/// AppendIndices in DictionaryBuilder works only with int64_t data, so we cannot use
/// fillArrowArray here and should copy all indexes to int64_t container.
auto indexes = extractIndexesImpl(column_lc->getIndexesPtr(), start, end);
const uint8_t * arrow_null_bytemap_raw_ptr = nullptr;
PaddedPODArray<uint8_t> arrow_null_bytemap;
if (null_bytemap)
{
/// Invert values since Arrow interprets 1 as a non-null value, while CH as a null
arrow_null_bytemap.reserve(end - start);
for (size_t i = start; i < end; ++i)
arrow_null_bytemap.emplace_back(!(*null_bytemap)[i]);
arrow_null_bytemap_raw_ptr = arrow_null_bytemap.data();
}
status = builder->AppendIndices(indexes.data(), indexes.size(), arrow_null_bytemap_raw_ptr);
checkStatus(status, column->getName(), format_name);
}
static void fillArrowArrayWithLowCardinalityColumnData(
const String & column_name,
ColumnPtr & column,
const std::shared_ptr<const IDataType> & column_type,
const PaddedPODArray<UInt8> * null_bytemap,
arrow::ArrayBuilder * array_builder,
String format_name,
size_t start,
size_t end,
std::unordered_map<String, std::shared_ptr<arrow::Array>> & dictionary_values)
{
auto value_type = assert_cast<arrow::DictionaryType *>(array_builder->type().get())->value_type();
#define DISPATCH(ARROW_TYPE_ID, ARROW_TYPE) \
if (arrow::Type::ARROW_TYPE_ID == value_type->id()) \
{ \
fillArrowArrayWithLowCardinalityColumnDataImpl<ARROW_TYPE>(column_name, column, column_type, null_bytemap, array_builder, format_name, start, end, dictionary_values); \
return; \
}
FOR_ARROW_TYPES(DISPATCH)
#undef DISPATCH
}
template <typename ColumnType>
static void fillArrowArrayWithStringColumnData(
ColumnPtr write_column,
@ -141,7 +286,7 @@ namespace DB
size_t end)
{
const auto & internal_column = assert_cast<const ColumnType &>(*write_column);
arrow::StringBuilder & builder = assert_cast<arrow::StringBuilder &>(*array_builder);
arrow::BinaryBuilder & builder = assert_cast<arrow::BinaryBuilder &>(*array_builder);
arrow::Status status;
for (size_t string_i = start; string_i < end; ++string_i)
@ -155,7 +300,6 @@ namespace DB
StringRef string_ref = internal_column.getDataAt(string_i);
status = builder.Append(string_ref.data, string_ref.size);
}
checkStatus(status, write_column->getName(), format_name);
}
}
@ -218,18 +362,19 @@ namespace DB
arrow::ArrayBuilder * array_builder,
String format_name,
size_t start,
size_t end)
size_t end,
std::unordered_map<String, std::shared_ptr<arrow::Array>> & dictionary_values)
{
const String column_type_name = column_type->getFamilyName();
if ("Nullable" == column_type_name)
{
const ColumnNullable * column_nullable = checkAndGetColumn<ColumnNullable>(column.get());
const ColumnNullable * column_nullable = assert_cast<const ColumnNullable *>(column.get());
ColumnPtr nested_column = column_nullable->getNestedColumnPtr();
DataTypePtr nested_type = typeid_cast<const DataTypeNullable *>(column_type.get())->getNestedType();
DataTypePtr nested_type = assert_cast<const DataTypeNullable *>(column_type.get())->getNestedType();
const ColumnPtr & null_column = column_nullable->getNullMapColumnPtr();
const PaddedPODArray<UInt8> & bytemap = assert_cast<const ColumnVector<UInt8> &>(*null_column).getData();
fillArrowArray(column_name, nested_column, nested_type, &bytemap, array_builder, format_name, start, end);
fillArrowArray(column_name, nested_column, nested_type, &bytemap, array_builder, format_name, start, end, dictionary_values);
}
else if ("String" == column_type_name)
{
@ -249,7 +394,21 @@ namespace DB
}
else if ("Array" == column_type_name)
{
fillArrowArrayWithArrayColumnData(column_name, column, column_type, null_bytemap, array_builder, format_name, start, end);
fillArrowArrayWithArrayColumnData<arrow::ListBuilder>(column_name, column, column_type, null_bytemap, array_builder, format_name, start, end, dictionary_values);
}
else if ("Tuple" == column_type_name)
{
fillArrowArrayWithTupleColumnData(column_name, column, column_type, null_bytemap, array_builder, format_name, start, end, dictionary_values);
}
else if ("LowCardinality" == column_type_name)
{
fillArrowArrayWithLowCardinalityColumnData(column_name, column, column_type, null_bytemap, array_builder, format_name, start, end, dictionary_values);
}
else if ("Map" == column_type_name)
{
ColumnPtr column_array = assert_cast<const ColumnMap *>(column.get())->getNestedColumnPtr();
DataTypePtr array_type = assert_cast<const DataTypeMap *>(column_type.get())->getNestedType();
fillArrowArrayWithArrayColumnData<arrow::MapBuilder>(column_name, column_array, array_type, null_bytemap, array_builder, format_name, start, end, dictionary_values);
}
else if (isDecimal(column_type))
{
@ -280,7 +439,7 @@ namespace DB
{
throw Exception
{
"Internal type \"" + column_type_name + "\" of a column \"" + column_name + "\" is not supported for conversion into a " + format_name + " data format",
fmt::format(R"(Internal type "{}" of a column "{}" is not supported for conversion into a {} data format.)", column_type_name, column_name, format_name),
ErrorCodes::UNKNOWN_TYPE
};
}
@ -295,7 +454,7 @@ namespace DB
size_t start,
size_t end)
{
const auto & column = static_cast<const typename DataType::ColumnType &>(*write_column);
const auto & column = assert_cast<const typename DataType::ColumnType &>(*write_column);
arrow::DecimalBuilder & builder = assert_cast<arrow::DecimalBuilder &>(*array_builder);
arrow::Status status;
@ -312,12 +471,33 @@ namespace DB
checkStatus(status, write_column->getName(), format_name);
}
static std::shared_ptr<arrow::DataType> getArrowType(DataTypePtr column_type, const std::string & column_name, const std::string & format_name, bool * is_column_nullable)
static std::shared_ptr<arrow::DataType> getArrowTypeForLowCardinalityIndexes(ColumnPtr indexes_column)
{
/// Arrow docs recommend preferring signed integers over unsigned integers for representing dictionary indices.
/// https://arrow.apache.org/docs/format/Columnar.html#dictionary-encoded-layout
switch (indexes_column->getDataType())
{
case TypeIndex::UInt8:
return arrow::int8();
case TypeIndex::UInt16:
return arrow::int16();
case TypeIndex::UInt32:
return arrow::int32();
case TypeIndex::UInt64:
return arrow::int64();
default:
throw Exception(fmt::format("Indexes column for getUniqueIndex must be ColumnUInt, got {}.", indexes_column->getName()),
ErrorCodes::LOGICAL_ERROR);
}
}
static std::shared_ptr<arrow::DataType> getArrowType(DataTypePtr column_type, ColumnPtr column, const std::string & column_name, const std::string & format_name, bool * is_column_nullable)
{
if (column_type->isNullable())
{
DataTypePtr nested_type = typeid_cast<const DataTypeNullable *>(column_type.get())->getNestedType();
auto arrow_type = getArrowType(nested_type, column_name, format_name, is_column_nullable);
DataTypePtr nested_type = assert_cast<const DataTypeNullable *>(column_type.get())->getNestedType();
ColumnPtr nested_column = assert_cast<const ColumnNullable *>(column.get())->getNestedColumnPtr();
auto arrow_type = getArrowType(nested_type, nested_column, column_name, format_name, is_column_nullable);
*is_column_nullable = true;
return arrow_type;
}
@ -334,7 +514,7 @@ namespace DB
|| std::is_same_v<ToDataType, DataTypeDecimal<Decimal64>>
|| std::is_same_v<ToDataType, DataTypeDecimal<Decimal128>>)
{
const auto & decimal_type = static_cast<const ToDataType *>(column_type.get());
const auto & decimal_type = assert_cast<const ToDataType *>(column_type.get());
arrow_type = arrow::decimal(decimal_type->getPrecision(), decimal_type->getScale());
}
@ -346,11 +526,50 @@ namespace DB
if (isArray(column_type))
{
auto nested_type = typeid_cast<const DataTypeArray *>(column_type.get())->getNestedType();
auto nested_arrow_type = getArrowType(nested_type, column_name, format_name, is_column_nullable);
auto nested_type = assert_cast<const DataTypeArray *>(column_type.get())->getNestedType();
auto nested_column = assert_cast<const ColumnArray *>(column.get())->getDataPtr();
auto nested_arrow_type = getArrowType(nested_type, nested_column, column_name, format_name, is_column_nullable);
return arrow::list(nested_arrow_type);
}
if (isTuple(column_type))
{
const auto & nested_types = assert_cast<const DataTypeTuple *>(column_type.get())->getElements();
const auto * tuple_column = assert_cast<const ColumnTuple *>(column.get());
std::vector<std::shared_ptr<arrow::Field>> nested_fields;
for (size_t i = 0; i != nested_types.size(); ++i)
{
String name = column_name + "." + std::to_string(i);
auto nested_arrow_type = getArrowType(nested_types[i], tuple_column->getColumnPtr(i), name, format_name, is_column_nullable);
nested_fields.push_back(std::make_shared<arrow::Field>(name, nested_arrow_type, *is_column_nullable));
}
return arrow::struct_(std::move(nested_fields));
}
if (column_type->lowCardinality())
{
auto nested_type = assert_cast<const DataTypeLowCardinality *>(column_type.get())->getDictionaryType();
const auto * lc_column = assert_cast<const ColumnLowCardinality *>(column.get());
const auto & nested_column = lc_column->getDictionaryPtr();
const auto & indexes_column = lc_column->getIndexesPtr();
return arrow::dictionary(
getArrowTypeForLowCardinalityIndexes(indexes_column),
getArrowType(nested_type, nested_column, column_name, format_name, is_column_nullable));
}
if (isMap(column_type))
{
const auto * map_type = assert_cast<const DataTypeMap *>(column_type.get());
const auto & key_type = map_type->getKeyType();
const auto & val_type = map_type->getValueType();
const auto & columns = assert_cast<const ColumnMap *>(column.get())->getNestedData().getColumns();
return arrow::map(
getArrowType(key_type, columns[0], column_name, format_name, is_column_nullable),
getArrowType(val_type, columns[1], column_name, format_name, is_column_nullable)
);
}
const std::string type_name = column_type->getFamilyName();
if (const auto * arrow_type_it = std::find_if(
internal_type_to_arrow_type.begin(),
@ -361,49 +580,59 @@ namespace DB
return arrow_type_it->second;
}
throw Exception{"The type \"" + column_name + "\" of a column \"" + column_name + "\""
" is not supported for conversion into a " + format_name + " data format",
throw Exception{fmt::format(R"(The type "{}" of a column "{}" is not supported for conversion into a {} data format.)", column_type->getName(), column_name, format_name),
ErrorCodes::UNKNOWN_TYPE};
}
CHColumnToArrowColumn::CHColumnToArrowColumn(const Block & header, const std::string & format_name_, bool low_cardinality_as_dictionary_)
: format_name(format_name_), low_cardinality_as_dictionary(low_cardinality_as_dictionary_)
{
arrow_fields.reserve(header.columns());
header_columns.reserve(header.columns());
for (auto column : header.getColumnsWithTypeAndName())
{
if (!low_cardinality_as_dictionary)
{
column.type = recursiveRemoveLowCardinality(column.type);
column.column = recursiveRemoveLowCardinality(column.column);
}
bool is_column_nullable = false;
auto arrow_type = getArrowType(column.type, column.column, column.name, format_name, &is_column_nullable);
arrow_fields.emplace_back(std::make_shared<arrow::Field>(column.name, arrow_type, is_column_nullable));
header_columns.emplace_back(std::move(column));
}
}
void CHColumnToArrowColumn::chChunkToArrowTable(
std::shared_ptr<arrow::Table> & res,
const Block & header,
const Chunk & chunk,
size_t columns_num,
String format_name)
size_t columns_num)
{
/// For arrow::Schema and arrow::Table creation
std::vector<std::shared_ptr<arrow::Field>> arrow_fields;
std::vector<std::shared_ptr<arrow::Array>> arrow_arrays;
arrow_fields.reserve(columns_num);
arrow_arrays.reserve(columns_num);
for (size_t column_i = 0; column_i < columns_num; ++column_i)
{
// TODO: constructed every iteration
ColumnWithTypeAndName column = header.safeGetByPosition(column_i);
column.column = recursiveRemoveLowCardinality(chunk.getColumns()[column_i]);
column.type = recursiveRemoveLowCardinality(column.type);
const ColumnWithTypeAndName & header_column = header_columns[column_i];
auto column = chunk.getColumns()[column_i];
bool is_column_nullable = false;
auto arrow_type = getArrowType(column.type, column.name, format_name, &is_column_nullable);
arrow_fields.emplace_back(std::make_shared<arrow::Field>(column.name, arrow_type, is_column_nullable));
if (!low_cardinality_as_dictionary)
column = recursiveRemoveLowCardinality(column);
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::unique_ptr<arrow::ArrayBuilder> array_builder;
arrow::Status status = MakeBuilder(pool, arrow_fields[column_i]->type(), &array_builder);
checkStatus(status, column.column->getName(), format_name);
checkStatus(status, column->getName(), format_name);
fillArrowArray(column.name, column.column, column.type, nullptr, array_builder.get(), format_name, 0, column.column->size());
fillArrowArray(header_column.name, column, header_column.type, nullptr, array_builder.get(), format_name, 0, column->size(), dictionary_values);
std::shared_ptr<arrow::Array> arrow_array;
status = array_builder->Finish(&arrow_array);
checkStatus(status, column.column->getName(), format_name);
checkStatus(status, column->getName(), format_name);
arrow_arrays.emplace_back(std::move(arrow_array));
}
std::shared_ptr<arrow::Schema> arrow_schema = std::make_shared<arrow::Schema>(std::move(arrow_fields));
std::shared_ptr<arrow::Schema> arrow_schema = std::make_shared<arrow::Schema>(arrow_fields);
res = arrow::Table::Make(arrow_schema, arrow_arrays);
}

View File

@ -12,6 +12,10 @@ namespace DB
class CHColumnToArrowColumn
{
public:
CHColumnToArrowColumn(const Block & header, const std::string & format_name_, bool low_cardinality_as_dictionary_ = false);
void chChunkToArrowTable(std::shared_ptr<arrow::Table> & res, const Chunk & chunk, size_t columns_num);
private:
#define FOR_INTERNAL_NUMERIC_TYPES(M) \
@ -26,10 +30,27 @@ private:
M(Float32, arrow::FloatBuilder) \
M(Float64, arrow::DoubleBuilder)
#define FOR_ARROW_TYPES(M) \
M(UINT8, arrow::UInt8Type) \
M(INT8, arrow::Int8Type) \
M(UINT16, arrow::UInt16Type) \
M(INT16, arrow::Int16Type) \
M(UINT32, arrow::UInt32Type) \
M(INT32, arrow::Int32Type) \
M(UINT64, arrow::UInt64Type) \
M(INT64, arrow::Int64Type) \
M(FLOAT, arrow::FloatType) \
M(DOUBLE, arrow::DoubleType) \
M(STRING, arrow::StringType)
public:
static void chChunkToArrowTable(std::shared_ptr<arrow::Table> & res, const Block & header, const Chunk & chunk,
size_t columns_num, String format_name);
ColumnsWithTypeAndName header_columns;
std::vector<std::shared_ptr<arrow::Field>> arrow_fields;
const std::string format_name;
bool low_cardinality_as_dictionary;
/// Map {column name : arrow dictionary}.
/// To avoid converting dictionary from LowCardinality to Arrow
/// Dictionary every chunk we save it and reuse.
std::unordered_map<std::string, std::shared_ptr<arrow::Array>> dictionary_values;
};
}
#endif

View File

@ -33,7 +33,6 @@ ORCBlockInputFormat::ORCBlockInputFormat(ReadBuffer & in_, Block header_) : IInp
Chunk ORCBlockInputFormat::generate()
{
Chunk res;
const Block & header = getPort().getHeader();
if (!file_reader)
prepareReader();
@ -54,7 +53,7 @@ Chunk ORCBlockInputFormat::generate()
++stripe_current;
ArrowColumnToCHColumn::arrowTableToCHChunk(res, *table_result, header, "ORC");
arrow_column_to_ch_column->arrowTableToCHChunk(res, *table_result);
return res;
}
@ -67,11 +66,26 @@ void ORCBlockInputFormat::resetParser()
stripe_current = 0;
}
size_t countIndicesForType(std::shared_ptr<arrow::DataType> type)
static size_t countIndicesForType(std::shared_ptr<arrow::DataType> type)
{
if (type->id() == arrow::Type::LIST)
return countIndicesForType(static_cast<arrow::ListType *>(type.get())->value_type()) + 1;
if (type->id() == arrow::Type::STRUCT)
{
int indices = 1;
auto * struct_type = static_cast<arrow::StructType *>(type.get());
for (int i = 0; i != struct_type->num_fields(); ++i)
indices += countIndicesForType(struct_type->field(i)->type());
return indices;
}
if (type->id() == arrow::Type::MAP)
{
auto * map_type = static_cast<arrow::MapType *>(type.get());
return countIndicesForType(map_type->key_type()) + countIndicesForType(map_type->item_type());
}
return 1;
}
@ -84,17 +98,22 @@ void ORCBlockInputFormat::prepareReader()
std::shared_ptr<arrow::Schema> schema;
THROW_ARROW_NOT_OK(file_reader->ReadSchema(&schema));
int index = 0;
arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(getPort().getHeader(), schema, "ORC");
/// In ReadStripe column indices should be started from 1,
/// because 0 indicates to select all columns.
int index = 1;
for (int i = 0; i < schema->num_fields(); ++i)
{
/// LIST type require 2 indices, STRUCT - the number of elements + 1,
/// so we should recursively count the number of indices we need for this type.
int indexes_count = countIndicesForType(schema->field(i)->type());
if (getPort().getHeader().has(schema->field(i)->name()))
{
/// LIST type require 2 indices, so we should recursively
/// count the number of indices we need for this type.
int indexes_count = countIndicesForType(schema->field(i)->type());
for (int j = 0; j != indexes_count; ++j)
include_indices.push_back(index++);
include_indices.push_back(index + j);
}
index += indexes_count;
}
}

View File

@ -8,6 +8,9 @@ namespace arrow::adapters::orc { class ORCFileReader; }
namespace DB
{
class ArrowColumnToCHColumn;
class ORCBlockInputFormat : public IInputFormat
{
public:
@ -26,6 +29,8 @@ private:
std::unique_ptr<arrow::adapters::orc::ORCFileReader> file_reader;
std::unique_ptr<ArrowColumnToCHColumn> arrow_column_to_ch_column;
int stripe_total = 0;
int stripe_current = 0;

View File

@ -10,12 +10,16 @@
#include <Columns/ColumnVector.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnMap.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeMap.h>
namespace DB
{
@ -46,15 +50,9 @@ void ORCOutputStream::write(const void* buf, size_t length)
ORCBlockOutputFormat::ORCBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
: IOutputFormat(header_, out_), format_settings{format_settings_}, output_stream(out_), data_types(header_.getDataTypes())
{
schema = orc::createStructType();
options.setCompression(orc::CompressionKind::CompressionKind_NONE);
size_t columns_count = header_.columns();
for (size_t i = 0; i != columns_count; ++i)
schema->addStructField(header_.safeGetByPosition(i).name, getORCType(data_types[i]));
writer = orc::createWriter(*schema, &output_stream, options);
}
ORC_UNIQUE_PTR<orc::Type> ORCBlockOutputFormat::getORCType(const DataTypePtr & type)
ORC_UNIQUE_PTR<orc::Type> ORCBlockOutputFormat::getORCType(const DataTypePtr & type, const std::string & column_name)
{
switch (type->getTypeId())
{
@ -102,28 +100,48 @@ ORC_UNIQUE_PTR<orc::Type> ORCBlockOutputFormat::getORCType(const DataTypePtr & t
}
case TypeIndex::Nullable:
{
return getORCType(removeNullable(type));
return getORCType(removeNullable(type), column_name);
}
case TypeIndex::Array:
{
const auto * array_type = typeid_cast<const DataTypeArray *>(type.get());
return orc::createListType(getORCType(array_type->getNestedType()));
const auto * array_type = assert_cast<const DataTypeArray *>(type.get());
return orc::createListType(getORCType(array_type->getNestedType(), column_name));
}
case TypeIndex::Decimal32:
{
const auto * decimal_type = typeid_cast<const DataTypeDecimal<Decimal32> *>(type.get());
const auto * decimal_type = assert_cast<const DataTypeDecimal<Decimal32> *>(type.get());
return orc::createDecimalType(decimal_type->getPrecision(), decimal_type->getScale());
}
case TypeIndex::Decimal64:
{
const auto * decimal_type = typeid_cast<const DataTypeDecimal<Decimal64> *>(type.get());
const auto * decimal_type = assert_cast<const DataTypeDecimal<Decimal64> *>(type.get());
return orc::createDecimalType(decimal_type->getPrecision(), decimal_type->getScale());
}
case TypeIndex::Decimal128:
{
const auto * decimal_type = typeid_cast<const DataTypeDecimal<Decimal128> *>(type.get());
const auto * decimal_type = assert_cast<const DataTypeDecimal<Decimal128> *>(type.get());
return orc::createDecimalType(decimal_type->getPrecision(), decimal_type->getScale());
}
case TypeIndex::Tuple:
{
const auto * tuple_type = assert_cast<const DataTypeTuple *>(type.get());
const auto & nested_types = tuple_type->getElements();
auto struct_type = orc::createStructType();
for (size_t i = 0; i < nested_types.size(); ++i)
{
String name = column_name + "." + std::to_string(i);
struct_type->addStructField(name, getORCType(nested_types[i], name));
}
return struct_type;
}
case TypeIndex::Map:
{
const auto * map_type = assert_cast<const DataTypeMap *>(type.get());
return orc::createMapType(
getORCType(map_type->getKeyType(), column_name),
getORCType(map_type->getValueType(), column_name)
);
}
default:
{
throw Exception("Type " + type->getName() + " is not supported for ORC output format", ErrorCodes::ILLEGAL_COLUMN);
@ -149,6 +167,8 @@ void ORCBlockOutputFormat::writeNumbers(
number_orc_column.notNull[i] = 0;
continue;
}
number_orc_column.notNull[i] = 1;
number_orc_column.data[i] = convert(number_column.getElement(i));
}
number_orc_column.numElements = number_column.size();
@ -164,7 +184,7 @@ void ORCBlockOutputFormat::writeDecimals(
{
DecimalVectorBatch & decimal_orc_column = dynamic_cast<DecimalVectorBatch &>(orc_column);
const auto & decimal_column = assert_cast<const ColumnDecimal<Decimal> &>(column);
const auto * decimal_type = typeid_cast<const DataTypeDecimal<Decimal> *>(type.get());
const auto * decimal_type = assert_cast<const DataTypeDecimal<Decimal> *>(type.get());
decimal_orc_column.precision = decimal_type->getPrecision();
decimal_orc_column.scale = decimal_type->getScale();
decimal_orc_column.resize(decimal_column.size());
@ -175,6 +195,8 @@ void ORCBlockOutputFormat::writeDecimals(
decimal_orc_column.notNull[i] = 0;
continue;
}
decimal_orc_column.notNull[i] = 1;
decimal_orc_column.values[i] = convert(decimal_column.getElement(i).value);
}
decimal_orc_column.numElements = decimal_column.size();
@ -197,6 +219,8 @@ void ORCBlockOutputFormat::writeStrings(
string_orc_column.notNull[i] = 0;
continue;
}
string_orc_column.notNull[i] = 1;
const StringRef & string = string_column.getDataAt(i);
string_orc_column.data[i] = const_cast<char *>(string.data);
string_orc_column.length[i] = string.size;
@ -223,6 +247,8 @@ void ORCBlockOutputFormat::writeDateTimes(
timestamp_orc_column.notNull[i] = 0;
continue;
}
timestamp_orc_column.notNull[i] = 1;
timestamp_orc_column.data[i] = get_seconds(timestamp_column.getElement(i));
timestamp_orc_column.nanoseconds[i] = get_nanoseconds(timestamp_column.getElement(i));
}
@ -235,11 +261,10 @@ void ORCBlockOutputFormat::writeColumn(
DataTypePtr & type,
const PaddedPODArray<UInt8> * null_bytemap)
{
orc_column.notNull.resize(column.size());
if (null_bytemap)
{
orc_column.hasNulls = true;
orc_column.notNull.resize(column.size());
}
switch (type->getTypeId())
{
case TypeIndex::Int8:
@ -374,12 +399,52 @@ void ORCBlockOutputFormat::writeColumn(
for (size_t i = 0; i != list_column.size(); ++i)
{
list_orc_column.offsets[i + 1] = offsets[i];
list_orc_column.notNull[i] = 1;
}
orc::ColumnVectorBatch & nested_orc_column = *list_orc_column.elements;
writeColumn(nested_orc_column, list_column.getData(), nested_type, null_bytemap);
list_orc_column.numElements = list_column.size();
break;
}
case TypeIndex::Tuple:
{
orc::StructVectorBatch & struct_orc_column = dynamic_cast<orc::StructVectorBatch &>(orc_column);
const auto & tuple_column = assert_cast<const ColumnTuple &>(column);
auto nested_types = assert_cast<const DataTypeTuple *>(type.get())->getElements();
for (size_t i = 0; i != tuple_column.size(); ++i)
struct_orc_column.notNull[i] = 1;
for (size_t i = 0; i != tuple_column.tupleSize(); ++i)
writeColumn(*struct_orc_column.fields[i], tuple_column.getColumn(i), nested_types[i], null_bytemap);
break;
}
case TypeIndex::Map:
{
orc::MapVectorBatch & map_orc_column = dynamic_cast<orc::MapVectorBatch &>(orc_column);
const auto & list_column = assert_cast<const ColumnMap &>(column).getNestedColumn();
const auto & map_type = assert_cast<const DataTypeMap &>(*type);
const ColumnArray::Offsets & offsets = list_column.getOffsets();
map_orc_column.resize(list_column.size());
/// The length of list i in ListVectorBatch is offsets[i+1] - offsets[i].
map_orc_column.offsets[0] = 0;
for (size_t i = 0; i != list_column.size(); ++i)
{
map_orc_column.offsets[i + 1] = offsets[i];
map_orc_column.notNull[i] = 1;
}
const auto nested_columns = assert_cast<const ColumnTuple *>(list_column.getDataPtr().get())->getColumns();
orc::ColumnVectorBatch & keys_orc_column = *map_orc_column.keys;
auto key_type = map_type.getKeyType();
writeColumn(keys_orc_column, *nested_columns[0], key_type, null_bytemap);
orc::ColumnVectorBatch & values_orc_column = *map_orc_column.elements;
auto value_type = map_type.getValueType();
writeColumn(values_orc_column, *nested_columns[1], value_type, null_bytemap);
map_orc_column.numElements = list_column.size();
break;
}
default:
throw Exception("Type " + type->getName() + " is not supported for ORC output format", ErrorCodes::ILLEGAL_COLUMN);
}
@ -409,6 +474,8 @@ size_t ORCBlockOutputFormat::getMaxColumnSize(Chunk & chunk)
void ORCBlockOutputFormat::consume(Chunk chunk)
{
if (!writer)
prepareWriter();
size_t columns_num = chunk.getNumColumns();
size_t rows_num = chunk.getNumRows();
/// getMaxColumnSize is needed to write arrays.
@ -425,9 +492,23 @@ void ORCBlockOutputFormat::consume(Chunk chunk)
void ORCBlockOutputFormat::finalize()
{
if (!writer)
prepareWriter();
writer->close();
}
void ORCBlockOutputFormat::prepareWriter()
{
const Block & header = getPort(PortKind::Main).getHeader();
schema = orc::createStructType();
options.setCompression(orc::CompressionKind::CompressionKind_NONE);
size_t columns_count = header.columns();
for (size_t i = 0; i != columns_count; ++i)
schema->addStructField(header.safeGetByPosition(i).name, getORCType(data_types[i], header.safeGetByPosition(i).name));
writer = orc::createWriter(*schema, &output_stream, options);
}
void registerOutputFormatProcessorORC(FormatFactory & factory)
{
factory.registerOutputFormatProcessor("ORC", [](

View File

@ -43,7 +43,7 @@ public:
void finalize() override;
private:
ORC_UNIQUE_PTR<orc::Type> getORCType(const DataTypePtr & type);
ORC_UNIQUE_PTR<orc::Type> getORCType(const DataTypePtr & type, const std::string & column_name);
/// ConvertFunc is needed for type UInt8, because firstly UInt8 (char8_t) must be
/// converted to unsigned char (bugprone-signed-char-misuse in clang).
@ -71,6 +71,8 @@ private:
size_t getColumnSize(const IColumn & column, DataTypePtr & type);
size_t getMaxColumnSize(Chunk & chunk);
void prepareWriter();
const FormatSettings format_settings;
ORCOutputStream output_stream;
DataTypes data_types;

View File

@ -38,7 +38,6 @@ ParquetBlockInputFormat::ParquetBlockInputFormat(ReadBuffer & in_, Block header_
Chunk ParquetBlockInputFormat::generate()
{
Chunk res;
const Block & header = getPort().getHeader();
if (!file_reader)
prepareReader();
@ -54,7 +53,7 @@ Chunk ParquetBlockInputFormat::generate()
++row_group_current;
ArrowColumnToCHColumn::arrowTableToCHChunk(res, table, header, "Parquet");
arrow_column_to_ch_column->arrowTableToCHChunk(res, table);
return res;
}
@ -67,6 +66,29 @@ void ParquetBlockInputFormat::resetParser()
row_group_current = 0;
}
static size_t countIndicesForType(std::shared_ptr<arrow::DataType> type)
{
if (type->id() == arrow::Type::LIST)
return countIndicesForType(static_cast<arrow::ListType *>(type.get())->value_type());
if (type->id() == arrow::Type::STRUCT)
{
int indices = 0;
auto * struct_type = static_cast<arrow::StructType *>(type.get());
for (int i = 0; i != struct_type->num_fields(); ++i)
indices += countIndicesForType(struct_type->field(i)->type());
return indices;
}
if (type->id() == arrow::Type::MAP)
{
auto * map_type = static_cast<arrow::MapType *>(type.get());
return countIndicesForType(map_type->key_type()) + countIndicesForType(map_type->item_type());
}
return 1;
}
void ParquetBlockInputFormat::prepareReader()
{
THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(asArrowFile(in), arrow::default_memory_pool(), &file_reader));
@ -76,12 +98,21 @@ void ParquetBlockInputFormat::prepareReader()
std::shared_ptr<arrow::Schema> schema;
THROW_ARROW_NOT_OK(file_reader->GetSchema(&schema));
arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(getPort().getHeader(), schema, "Parquet");
int index = 0;
for (int i = 0; i < schema->num_fields(); ++i)
{
/// STRUCT type require the number of indexes equal to the number of
/// nested elements, so we should recursively
/// count the number of indices we need for this type.
int indexes_count = countIndicesForType(schema->field(i)->type());
if (getPort().getHeader().has(schema->field(i)->name()))
{
column_indices.push_back(i);
for (int j = 0; j != indexes_count; ++j)
column_indices.push_back(index + j);
}
index += indexes_count;
}
}

View File

@ -12,6 +12,8 @@ namespace arrow { class Buffer; }
namespace DB
{
class ArrowColumnToCHColumn;
class ParquetBlockInputFormat : public IInputFormat
{
public:
@ -32,6 +34,7 @@ private:
int row_group_total = 0;
// indices of columns to read from Parquet file
std::vector<int> column_indices;
std::unique_ptr<ArrowColumnToCHColumn> arrow_column_to_ch_column;
int row_group_current = 0;
};

View File

@ -13,7 +13,6 @@
#include <arrow/api.h>
#include <arrow/util/memory.h>
#include <parquet/arrow/writer.h>
#include <parquet/deprecated_io.h>
#include "ArrowBufferedStreams.h"
#include "CHColumnToArrowColumn.h"
@ -32,11 +31,16 @@ ParquetBlockOutputFormat::ParquetBlockOutputFormat(WriteBuffer & out_, const Blo
void ParquetBlockOutputFormat::consume(Chunk chunk)
{
const Block & header = getPort(PortKind::Main).getHeader();
const size_t columns_num = chunk.getNumColumns();
std::shared_ptr<arrow::Table> arrow_table;
CHColumnToArrowColumn::chChunkToArrowTable(arrow_table, header, chunk, columns_num, "Parquet");
if (!ch_column_to_arrow_column)
{
const Block & header = getPort(PortKind::Main).getHeader();
ch_column_to_arrow_column = std::make_unique<CHColumnToArrowColumn>(header, "Parquet");
}
ch_column_to_arrow_column->chChunkToArrowTable(arrow_table, chunk, columns_num);
if (!file_writer)
{

View File

@ -21,6 +21,9 @@ namespace arrow
namespace DB
{
class CHColumnToArrowColumn;
class ParquetBlockOutputFormat : public IOutputFormat
{
public:
@ -36,6 +39,7 @@ private:
const FormatSettings format_settings;
std::unique_ptr<parquet::arrow::FileWriter> file_writer;
std::unique_ptr<CHColumnToArrowColumn> ch_column_to_arrow_column;
};
}

View File

@ -377,8 +377,8 @@ class ClickhouseIntegrationTestsRunner:
test_cmd = ' '.join([test for test in sorted(test_names)])
parallel_cmd = " --parallel {} ".format(num_workers) if num_workers > 0 else ""
cmd = "cd {}/tests/integration && ./runner --tmpfs {} -t {} {} '-ss -rfEp --run-id={} --color=no --durations=0 {}' | tee {}".format(
repo_path, image_cmd, test_cmd, parallel_cmd, i, _get_deselect_option(self.should_skip_tests()), output_path)
cmd = "cd {}/tests/integration && ./runner --tmpfs {} -t {} {} '-ss -rfEp --color=no --durations=0 {}' | tee {}".format(
repo_path, image_cmd, test_cmd, parallel_cmd, _get_deselect_option(self.should_skip_tests()), output_path)
with open(log_path, 'w') as log:
logging.info("Executing cmd: %s", cmd)

View File

@ -28,10 +28,4 @@ def cleanup_environment():
logging.exception(f"cleanup_environment:{str(e)}")
pass
yield
def pytest_addoption(parser):
parser.addoption("--run-id", default="", help="run-id is used as postfix in _instances_{} directory")
def pytest_configure(config):
os.environ['INTEGRATION_TESTS_RUN_ID'] = config.option.run_id
yield

View File

@ -209,14 +209,7 @@ class ClickHouseCluster:
project_name = pwd.getpwuid(os.getuid()).pw_name + p.basename(self.base_dir) + self.name
# docker-compose removes everything non-alphanumeric from project names so we do it too.
self.project_name = re.sub(r'[^a-z0-9]', '', project_name.lower())
instances_dir_name = '_instances'
if self.name:
instances_dir_name += '_' + self.name
if 'INTEGRATION_TESTS_RUN_ID' in os.environ:
instances_dir_name += '_' + shlex.quote(os.environ['INTEGRATION_TESTS_RUN_ID'])
self.instances_dir = p.join(self.base_dir, instances_dir_name)
self.instances_dir = p.join(self.base_dir, '_instances' + ('' if not self.name else '_' + self.name))
self.docker_logs_path = p.join(self.instances_dir, 'docker.log')
self.env_file = p.join(self.instances_dir, DEFAULT_ENV_NAME)
self.env_variables = {}
@ -434,15 +427,7 @@ class ClickHouseCluster:
pass
def get_docker_handle(self, docker_id):
exception = None
for i in range(5):
try:
return self.docker_client.containers.get(docker_id)
except Exception as ex:
print("Got exception getting docker handle", str(ex))
time.sleep(i * 2)
exception = ex
raise exception
return self.docker_client.containers.get(docker_id)
def get_client_cmd(self):
cmd = self.client_bin_path

View File

@ -22,6 +22,9 @@
<value>Native</value>
<value>Avro</value>
<value>MsgPack</value>
<value>ORC</value>
<value>Parquet</value>
<value>Arrow</value>
</values>
</substitution>
</substitutions>

View File

@ -36,6 +36,8 @@
<value>Avro</value>
<value>MsgPack</value>
<value>ORC</value>
<value>Parquet</value>
<value>Arrow</value>
</values>
</substitution>
</substitutions>

View File

@ -298,24 +298,40 @@ Code: 33. DB::ParsingEx---tion: Error while reading Parquet data: IOError: Not y
[[['a','b'],['c','d']],[[],['e']]] 1
[[['a','b'],['c','d'],['e']],[[],['f']]] 1
=== Try load data from nested_maps.snappy.parquet
Code: 70. DB::Ex---tion: The type "map" of an input column "a" is not supported for conversion from a Parquet data format: data for INSERT was parsed from stdin
{'a':{1:1,2:0}} 1 1
{'b':{1:1}} 1 1
{'c':{}} 1 1
{'d':{}} 1 1
{'e':{1:1}} 1 1
{'f':{3:1,4:0,5:1}} 1 1
=== Try load data from non_hadoop_lz4_compressed.parquet
1593604800 abc 42
1593604800 def 7.7
1593604801 abc 42.125
1593604801 def 7.7
=== Try load data from nonnullable.impala.parquet
../contrib/arrow/cpp/src/arrow/array/array_nested.cc:192: Check failed: (self->list_type_->value_type()->id()) == (data->child_data[0]->type->id())
8 [-1] [[-1,-2],[]] {'k1':-1} [{},{'k1':1},{},{}] (-1,[-1],([[(-1)]]),{})
=== Try load data from nullable.impala.parquet
../contrib/arrow/cpp/src/arrow/array/array_nested.cc:192: Check failed: (self->list_type_->value_type()->id()) == (data->child_data[0]->type->id())
1 [1,2,3] [[1,2],[3,4]] {'k1':1,'k2':100} [{'k1':1}] (1,[1],([[(10),(-10)],[(11)]]),{'foo':(([1.1]))})
2 [NULL,1,2,NULL,3,NULL] [[NULL,1,2,NULL],[3,NULL,4],[],[]] {'k1':2,'k2':NULL} [{'k3':NULL,'k1':1},{},{}] (NULL,[NULL],([[(NULL),(10),(NULL),(-10),(NULL)],[(11),(NULL)],[],[]]),{'g1':(([2.2,NULL])),'g2':(([])),'g3':(([])),'g4':(([])),'g5':(([]))})
3 [] [[]] {} [{},{}] (NULL,[],([]),{})
4 [] [] {} [] (NULL,[],([]),{})
5 [] [] {} [] (NULL,[],([]),{'foo':(([2.2,3.3]))})
6 [] [] {} [] (NULL,[],([]),{})
7 [] [[],[5,6]] {'k1':NULL,'k3':NULL} [] (7,[2,3,NULL],([[],[(NULL)],[]]),{})
=== Try load data from nullable_list.parquet
[1,NULL,2] [NULL,'Some string',NULL] [0.00,NULL,42.42]
[NULL] [NULL] [NULL]
[] [] []
=== Try load data from nulls.snappy.parquet
Code: 70. DB::Ex---tion: The type "struct" of an input column "b_struct" is not supported for conversion from a Parquet data format: data for INSERT was parsed from stdin
(NULL)
(NULL)
(NULL)
(NULL)
(NULL)
(NULL)
(NULL)
(NULL)
=== Try load data from single_nan.parquet
\N
=== Try load data from userdata1.parquet

View File

@ -55,7 +55,10 @@ for NAME in $(find "$DATA_DIR"/*.parquet -print0 | xargs -0 -n 1 basename | LC_A
COLUMNS=$(cat "$COLUMNS_FILE") || continue
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS parquet_load"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE parquet_load ($COLUMNS) ENGINE = Memory"
$CLICKHOUSE_CLIENT --multiquery <<EOF
SET allow_experimental_map_type = 1;
CREATE TABLE parquet_load ($COLUMNS) ENGINE = Memory;
EOF
# Some files contain unsupported data structures, exception is ok.
cat "$DATA_DIR"/"$NAME" | ${CLICKHOUSE_CLIENT} --query="INSERT INTO parquet_load FORMAT Parquet" 2>&1 | sed 's/Exception/Ex---tion/'

View File

@ -0,0 +1,6 @@
Arrow
{1:2,2:3} {'1':'a','2':'b'} {1:(1,2),2:(3,4)} {1:[1,2],2:[3,4]} [{1:2,2:3},{3:4,4:5}] ({1:2,2:3},{'a':'b','c':'d'}) [{1:[({1:2},(1)),({2:3},(2))]},{2:[({3:4},(3)),({4:5},(4))]}]
Parquet
{1:2,2:3} {'1':'a','2':'b'} {1:(1,2),2:(3,4)} {1:[1,2],2:[3,4]} [{1:2,2:3},{3:4,4:5}] ({1:2,2:3},{'a':'b','c':'d'}) [{1:[({1:2},(1)),({2:3},(2))]},{2:[({3:4},(3)),({4:5},(4))]}]
ORC
{1:2,2:3} {'1':'a','2':'b'} {1:(1,2),2:(3,4)} {1:[1,2],2:[3,4]} [{1:2,2:3},{3:4,4:5}] ({1:2,2:3},{'a':'b','c':'d'}) [{1:[({1:2},(1)),({2:3},(2))]},{2:[({3:4},(3)),({4:5},(4))]}]

View File

@ -0,0 +1,28 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS maps"
${CLICKHOUSE_CLIENT} --multiquery <<EOF
SET allow_experimental_map_type = 1;
CREATE TABLE maps (m1 Map(UInt32, UInt32), m2 Map(String, String), m3 Map(UInt32, Tuple(UInt32, UInt32)), m4 Map(UInt32, Array(UInt32)), m5 Array(Map(UInt32, UInt32)), m6 Tuple(Map(UInt32, UInt32), Map(String, String)), m7 Array(Map(UInt32, Array(Tuple(Map(UInt32, UInt32), Tuple(UInt32)))))) ENGINE=Memory();
EOF
${CLICKHOUSE_CLIENT} --query="INSERT INTO maps VALUES ({1 : 2, 2 : 3}, {'1' : 'a', '2' : 'b'}, {1 : (1, 2), 2 : (3, 4)}, {1 : [1, 2], 2 : [3, 4]}, [{1 : 2, 2 : 3}, {3 : 4, 4 : 5}], ({1 : 2, 2 : 3}, {'a' : 'b', 'c' : 'd'}), [{1 : [({1 : 2}, (1)), ({2 : 3}, (2))]}, {2 : [({3 : 4}, (3)), ({4 : 5}, (4))]}])"
formats="Arrow Parquet ORC";
for format in ${formats}; do
echo $format
${CLICKHOUSE_CLIENT} --query="SELECT * FROM maps FORMAT Parquet" > "${CLICKHOUSE_TMP}"/maps
${CLICKHOUSE_CLIENT} --query="TRUNCATE TABLE maps"
cat "${CLICKHOUSE_TMP}"/maps | ${CLICKHOUSE_CLIENT} -q "INSERT INTO maps FORMAT Parquet"
${CLICKHOUSE_CLIENT} --query="SELECT * FROM maps"
done
${CLICKHOUSE_CLIENT} --query="DROP TABLE maps"

View File

@ -0,0 +1,6 @@
Arrow
(1,2) ('1','2') ((1,'1'),1) ((1,2),('1','2')) ([1,2,3],1) (([1,2,3],[1,2,3]),([[1,2,3],[1,2,3]],1)) [([[1,2,3],[1,2,3]],([(1,2),(1,2)],1))]
Parquet
(1,2) ('1','2') ((1,'1'),1) ((1,2),('1','2')) ([1,2,3],1) (([1,2,3],[1,2,3]),([[1,2,3],[1,2,3]],1)) [([[1,2,3],[1,2,3]],([(1,2),(1,2)],1))]
ORC
(1,2) ('1','2') ((1,'1'),1) ((1,2),('1','2')) ([1,2,3],1) (([1,2,3],[1,2,3]),([[1,2,3],[1,2,3]],1)) [([[1,2,3],[1,2,3]],([(1,2),(1,2)],1))]

View File

@ -0,0 +1,24 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS tuples";
${CLICKHOUSE_CLIENT} --query="CREATE TABLE tuples (t1 Tuple(UInt32, UInt32), t2 Tuple(String, String), t3 Tuple(Tuple(UInt32, String), UInt32), t4 Tuple(Tuple(UInt32, UInt32), Tuple(String, String)), t5 Tuple(Array(UInt32), UInt32), t6 Tuple(Tuple(Array(UInt32), Array(UInt32)), Tuple(Array(Array(UInt32)), UInt32)), t7 Array(Tuple(Array(Array(UInt32)), Tuple(Array(Tuple(UInt32, UInt32)), UInt32)))) ENGINE=Memory()"
${CLICKHOUSE_CLIENT} --query="INSERT INTO tuples VALUES ((1, 2), ('1', '2'), ((1, '1'), 1), ((1, 2), ('1', '2')), ([1,2,3], 1), (([1,2,3], [1,2,3]), ([[1,2,3], [1,2,3]], 1)), [([[1,2,3], [1,2,3]], ([(1, 2), (1, 2)], 1))])"
formats="Arrow Parquet ORC";
for format in ${formats}; do
echo $format
${CLICKHOUSE_CLIENT} --query="SELECT * FROM tuples FORMAT $format" > "${CLICKHOUSE_TMP}"/tuples
${CLICKHOUSE_CLIENT} --query="TRUNCATE TABLE tuples"
cat "${CLICKHOUSE_TMP}"/tuples | ${CLICKHOUSE_CLIENT} -q "INSERT INTO tuples FORMAT $format"
${CLICKHOUSE_CLIENT} --query="SELECT * FROM tuples"
done
${CLICKHOUSE_CLIENT} --query="DROP TABLE tuples"

View File

@ -4,7 +4,7 @@ CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS orc_arrays"
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS arrow_arrays"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE arrow_arrays (arr1 Array(Int8), arr2 Array(UInt8), arr3 Array(Int16), arr4 Array(UInt16), arr5 Array(Int32), arr6 Array(UInt32), arr7 Array(Int64), arr8 Array(UInt64), arr9 Array(String), arr10 Array(FixedString(4)), arr11 Array(Float32), arr12 Array(Float64), arr13 Array(Date), arr14 Array(Datetime)) ENGINE=Memory()"
${CLICKHOUSE_CLIENT} --query="INSERT INTO arrow_arrays VALUES ([1,-2,3],[1,2,3],[100,-200,300],[100,200,300],[10000000,-20000000,30000000],[10000000,2000000,3000000],[100000000000000,-200000000000,3000000000000],[100000000000000,20000000000000,3000000000000],['Some string','Some string','Some string'],['0000','1111','2222'],[42.42,424.2,0.4242],[424242.424242,4242042420.242424,42],['2000-01-01','2001-01-01','2002-01-01'],['2000-01-01 00:00:00','2001-01-01 00:00:00','2002-01-01 00:00:00']),([],[],[],[],[],[],[],[],[],[],[],[],[],[])"

View File

@ -0,0 +1,2 @@
1 ['a','b','c'] ('z','6')
2 ['d','e'] ('x','9')

View File

@ -0,0 +1,24 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS arrow_dicts"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE arrow_dicts (a LowCardinality(String), b Array(LowCardinality(String)), c Tuple(LowCardinality(String), LowCardinality(String))) ENGINE=Memory()"
${CLICKHOUSE_CLIENT} --query="INSERT INTO arrow_dicts VALUES ('1', ['a', 'b', 'c'], ('z', '6')), ('2', ['d', 'e'], ('x', '9'))"
${CLICKHOUSE_CLIENT} --query="SELECT * FROM arrow_dicts FORMAT Arrow SETTINGS output_format_arrow_low_cardinality_as_dictionary=1" > "${CLICKHOUSE_TMP}"/dicts.arrow
cat "${CLICKHOUSE_TMP}"/dicts.arrow | ${CLICKHOUSE_CLIENT} -q "INSERT INTO arrow_dicts FORMAT Arrow"
${CLICKHOUSE_CLIENT} --query="SELECT * FROM arrow_dicts"
${CLICKHOUSE_CLIENT} --query="DROP TABLE arrow_dicts"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE arrow_dicts (a LowCardinality(String)) ENGINE=Memory()"
${CLICKHOUSE_CLIENT} --query="INSERT INTO arrow_dicts SELECT toString(number % 500) from numbers(10000000)"
${CLICKHOUSE_CLIENT} --query="SELECT * FROM arrow_dicts FORMAT Arrow SETTINGS output_format_arrow_low_cardinality_as_dictionary=1" > "${CLICKHOUSE_TMP}"/dicts.arrow
cat "${CLICKHOUSE_TMP}"/dicts.arrow | ${CLICKHOUSE_CLIENT} -q "INSERT INTO arrow_dicts FORMAT Arrow"
${CLICKHOUSE_CLIENT} --query="DROP TABLE arrow_dicts"

View File

@ -1 +1 @@
`a` Nullable(String), `b` Nullable(Int32), `c` Nullable(Float64), `d` Nullable(UInt8), `e` Nullable(Int32)
`a` Nullable(String), `b` Array(Nullable(Int32)), `c` Nullable(Float64), `d` Nullable(UInt8), `e` Array(Nullable(Int32))

View File

@ -1 +1 @@
`a` Tuple(Nullable(String), Nullable(Int32), Nullable(UInt8)), `b` Nullable(Int32), `c` Nullable(Float64)
`a` Map(String, Map(Int32, Nullable(UInt8))), `b` Nullable(Int32), `c` Nullable(Float64)

View File

@ -1 +1 @@
`ID` Nullable(Int64), `Int_Array` Nullable(Int32), `int_array_array` Nullable(Int32), `Int_Map` Tuple(Nullable(String), Nullable(Int32)), `int_map_array` Tuple(Nullable(String), Nullable(Int32)), `nested_Struct` Tuple(Nullable(Int32), Nullable(Int32), Nullable(Int32), Nullable(String), Nullable(String), Nullable(Float64))
`ID` Nullable(Int64), `Int_Array` Array(Nullable(Int32)), `int_array_array` Array(Array(Nullable(Int32))), `Int_Map` Map(String, Nullable(Int32)), `int_map_array` Array(Map(String, Nullable(Int32))), `nested_Struct` Tuple(Nullable(Int32), Array(Nullable(Int32)), Tuple(Array(Array(Tuple(Nullable(Int32))))), Map(String, Tuple(Tuple(Array(Nullable(Float64))))))

View File

@ -1 +1 @@
`id` Nullable(Int64), `int_array` Nullable(Int32), `int_array_Array` Nullable(Int32), `int_map` Tuple(Nullable(String), Nullable(Int32)), `int_Map_Array` Tuple(Nullable(String), Nullable(Int32)), `nested_struct` Tuple(Nullable(Int32), Nullable(Int32), Nullable(Int32), Nullable(String), Nullable(String), Nullable(Float64))
`id` Nullable(Int64), `int_array` Array(Nullable(Int32)), `int_array_Array` Array(Array(Nullable(Int32))), `int_map` Map(String, Nullable(Int32)), `int_Map_Array` Array(Map(String, Nullable(Int32))), `nested_struct` Tuple(Nullable(Int32), Array(Nullable(Int32)), Tuple(Array(Array(Tuple(Nullable(Int32))))), Map(String, Tuple(Tuple(Array(Nullable(Float64))))))

View File

@ -1 +1 @@
`b_struct` Nullable(Int32)
`b_struct` Tuple(Nullable(Int32))