Fixed vertical merge of multidimensional Nested structures [#CLICKHOUSE-3901]

This commit is contained in:
Alexey Milovidov 2018-10-17 00:22:41 +03:00
parent 6d6e237ac9
commit 98543b7c41
4 changed files with 39 additions and 32 deletions

View File

@ -1223,7 +1223,9 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
* temporary column name ('converting_column_name') created in 'createConvertExpression' method
* will have old name of shared offsets for arrays.
*/
MergedColumnOnlyOutputStream out(*this, in.getHeader(), full_path + part->name + '/', true /* sync */, compression_settings, true /* skip_offsets */);
IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets;
MergedColumnOnlyOutputStream out(
*this, in.getHeader(), full_path + part->name + '/', true /* sync */, compression_settings, true /* skip_offsets */, unused_written_offsets);
in.readPrefix();
out.writePrefix();

View File

@ -22,8 +22,6 @@
#include <Interpreters/MutationsInterpreter.h>
#include <IO/CompressedWriteBuffer.h>
#include <IO/CompressedReadBufferFromFile.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeArray.h>
#include <Common/SimpleIncrement.h>
#include <Common/interpolate.h>
#include <Common/typeid_cast.h>
@ -750,7 +748,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
merge_entry->progress.store(column_sizes.keyColumnsProgress(sum_input_rows_exact, sum_input_rows_exact), std::memory_order_relaxed);
BlockInputStreams column_part_streams(parts.size());
NameSet offset_columns_written;
auto it_name_and_type = gathering_columns.cbegin();
@ -767,22 +764,20 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
+ "). It is a bug.", ErrorCodes::LOGICAL_ERROR);
CompressedReadBufferFromFile rows_sources_read_buf(rows_sources_file_path, 0, 0);
IMergedBlockOutputStream::WrittenOffsetColumns written_offset_columns;
for (size_t column_num = 0, gathering_column_names_size = gathering_column_names.size();
column_num < gathering_column_names_size;
++column_num, ++it_name_and_type)
{
const String & column_name = it_name_and_type->name;
const DataTypePtr & column_type = it_name_and_type->type;
const String offset_column_name = Nested::extractTableName(column_name);
Names column_name_{column_name};
Names column_names{column_name};
Float64 progress_before = merge_entry->progress.load(std::memory_order_relaxed);
bool offset_written = offset_columns_written.count(offset_column_name);
for (size_t part_num = 0; part_num < parts.size(); ++part_num)
{
auto column_part_stream = std::make_shared<MergeTreeBlockInputStream>(
data, parts[part_num], DEFAULT_MERGE_BLOCK_SIZE, 0, 0, column_name_, MarkRanges{MarkRange(0, parts[part_num]->marks_count)},
data, parts[part_num], DEFAULT_MERGE_BLOCK_SIZE, 0, 0, column_names, MarkRanges{MarkRange(0, parts[part_num]->marks_count)},
false, nullptr, true, min_bytes_when_use_direct_io, DBMS_DEFAULT_BUFFER_SIZE, false, Names{}, 0, true);
column_part_stream->setProgressCallback(MergeProgressCallbackVerticalStep(
@ -793,7 +788,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
rows_sources_read_buf.seek(0, 0);
ColumnGathererStream column_gathered_stream(column_name, column_part_streams, rows_sources_read_buf);
MergedColumnOnlyOutputStream column_to(data, column_gathered_stream.getHeader(), new_part_tmp_path, false, compression_settings, offset_written);
MergedColumnOnlyOutputStream column_to(
data, column_gathered_stream.getHeader(), new_part_tmp_path, false, compression_settings, false, written_offset_columns);
size_t column_elems_written = 0;
column_to.writePrefix();
@ -811,9 +807,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
", but " + toString(rows_written) + " rows of PK columns", ErrorCodes::LOGICAL_ERROR);
}
if (typeid_cast<const DataTypeArray *>(column_type.get()))
offset_columns_written.emplace(offset_column_name);
/// NOTE: 'progress' is modified by single thread, but it may be concurrently read from MergeListElement::getInfo() (StorageSystemMerges).
merge_entry->columns_written = merging_column_names.size() + column_num;
@ -971,7 +964,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
createHardLink(dir_it.path().toString(), destination.toString());
}
MergedColumnOnlyOutputStream out(data, in_header, new_part_tmp_path, /* sync = */ false, compression_settings, /* skip_offsets = */ false);
IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets;
MergedColumnOnlyOutputStream out(
data, in_header, new_part_tmp_path, /* sync = */ false, compression_settings, /* skip_offsets = */ false, unused_written_offsets);
in->readPrefix();
out.writePrefix();

View File

@ -19,6 +19,7 @@ constexpr auto MARKS_FILE_EXTENSION = ".mrk";
}
/// Implementation of IMergedBlockOutputStream.
IMergedBlockOutputStream::IMergedBlockOutputStream(
@ -70,7 +71,7 @@ void IMergedBlockOutputStream::addStreams(
IDataType::OutputStreamGetter IMergedBlockOutputStream::createStreamGetter(
const String & name, OffsetColumns & offset_columns, bool skip_offsets)
const String & name, WrittenOffsetColumns & offset_columns, bool skip_offsets)
{
return [&, skip_offsets] (const IDataType::SubstreamPath & substream_path) -> WriteBuffer *
{
@ -93,7 +94,7 @@ void IMergedBlockOutputStream::writeData(
const String & name,
const IDataType & type,
const IColumn & column,
OffsetColumns & offset_columns,
WrittenOffsetColumns & offset_columns,
bool skip_offsets,
IDataType::SerializeBinaryBulkStatePtr & serialization_state)
{
@ -304,7 +305,7 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
IDataType::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;
OffsetColumns offset_columns;
WrittenOffsetColumns offset_columns;
auto it = columns_list.begin();
for (size_t i = 0; i < columns_list.size(); ++i, ++it)
{
@ -395,7 +396,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
size_t rows = block.rows();
/// The set of written offset columns so that you do not write shared offsets of nested structures columns several times
OffsetColumns offset_columns;
WrittenOffsetColumns offset_columns;
auto sort_columns = storage.getPrimarySortColumns();
@ -427,7 +428,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
if (serialization_states.empty())
{
serialization_states.reserve(columns_list.size());
OffsetColumns tmp_offset_columns;
WrittenOffsetColumns tmp_offset_columns;
IDataType::SerializeBinaryBulkSettings settings;
for (const auto & col : columns_list)
@ -501,12 +502,15 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
/// Implementation of MergedColumnOnlyOutputStream.
MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
MergeTreeData & storage_, const Block & header_, String part_path_, bool sync_, CompressionSettings compression_settings, bool skip_offsets_)
MergeTreeData & storage_, const Block & header_, String part_path_, bool sync_,
CompressionSettings compression_settings, bool skip_offsets_,
WrittenOffsetColumns & already_written_offset_columns)
: IMergedBlockOutputStream(
storage_, storage_.context.getSettings().min_compress_block_size,
storage_.context.getSettings().max_compress_block_size, compression_settings,
storage_.context.getSettings().min_bytes_to_use_direct_io),
header(header_), part_path(part_path_), sync(sync_), skip_offsets(skip_offsets_)
header(header_), part_path(part_path_), sync(sync_), skip_offsets(skip_offsets_),
already_written_offset_columns(already_written_offset_columns)
{
}
@ -517,7 +521,7 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
column_streams.clear();
serialization_states.clear();
serialization_states.reserve(block.columns());
OffsetColumns tmp_offset_columns;
WrittenOffsetColumns tmp_offset_columns;
IDataType::SerializeBinaryBulkSettings settings;
for (size_t i = 0; i < block.columns(); ++i)
@ -535,7 +539,7 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
size_t rows = block.rows();
OffsetColumns offset_columns;
WrittenOffsetColumns offset_columns = already_written_offset_columns;
for (size_t i = 0; i < block.columns(); ++i)
{
const ColumnWithTypeAndName & column = block.safeGetByPosition(i);
@ -558,11 +562,11 @@ MergeTreeData::DataPart::Checksums MergedColumnOnlyOutputStream::writeSuffixAndG
IDataType::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;
OffsetColumns offset_columns;
for (size_t i = 0; i < header.columns(); ++i)
for (size_t i = 0, size = header.columns(); i < size; ++i)
{
auto & column = header.safeGetByPosition(i);
serialize_settings.getter = createStreamGetter(column.name, offset_columns, skip_offsets);
auto & column = header.getByPosition(i);
serialize_settings.getter = createStreamGetter(column.name, already_written_offset_columns, skip_offsets);
column.type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]);
}

View File

@ -23,8 +23,9 @@ public:
CompressionSettings compression_settings_,
size_t aio_threshold_);
using WrittenOffsetColumns = std::set<std::string>;
protected:
using OffsetColumns = std::set<std::string>;
using SerializationState = IDataType::SerializeBinaryBulkStatePtr;
using SerializationStates = std::vector<SerializationState>;
@ -67,10 +68,10 @@ protected:
void addStreams(const String & path, const String & name, const IDataType & type, size_t estimated_size, bool skip_offsets);
IDataType::OutputStreamGetter createStreamGetter(const String & name, OffsetColumns & offset_columns, bool skip_offsets);
IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns, bool skip_offsets);
/// Write data of one column.
void writeData(const String & name, const IDataType & type, const IColumn & column, OffsetColumns & offset_columns,
void writeData(const String & name, const IDataType & type, const IColumn & column, WrittenOffsetColumns & offset_columns,
bool skip_offsets, IDataType::SerializeBinaryBulkStatePtr & serialization_state);
MergeTreeData & storage;
@ -150,13 +151,15 @@ private:
};
/// Writes only those columns that are in `block`
/// Writes only those columns that are in `header`
class MergedColumnOnlyOutputStream final : public IMergedBlockOutputStream
{
public:
/// skip_offsets: used when ALTERing columns if we know that array offsets are not altered.
MergedColumnOnlyOutputStream(
MergeTreeData & storage_, const Block & header_, String part_path_, bool sync_, CompressionSettings compression_settings, bool skip_offsets_);
MergeTreeData & storage_, const Block & header_, String part_path_, bool sync_,
CompressionSettings compression_settings, bool skip_offsets_,
WrittenOffsetColumns & already_written_offset_columns);
Block getHeader() const override { return header; }
void write(const Block & block) override;
@ -171,6 +174,9 @@ private:
bool initialized = false;
bool sync;
bool skip_offsets;
/// To correctly write Nested elements column-by-column.
WrittenOffsetColumns & already_written_offset_columns;
};
}