fix 'ALTER MODIFY COLUMN' with compact parts

This commit is contained in:
Anton Popov 2020-04-08 19:20:52 +03:00
parent cf14718588
commit c8c4dc8104
4 changed files with 34 additions and 57 deletions

View File

@ -32,6 +32,8 @@ IMergeTreeReader::IMergeTreeReader(const MergeTreeData::DataPartPtr & data_part_
, all_mark_ranges(all_mark_ranges_)
, alter_conversions(storage.getAlterConversionsForPart(data_part))
{
for (const NameAndTypePair & column_from_part : data_part->getColumns())
columns_from_part[column_from_part.name] = column_from_part.type;
}
IMergeTreeReader::~IMergeTreeReader() = default;
@ -183,6 +185,23 @@ void IMergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns
}
}
NameAndTypePair IMergeTreeReader::getColumnFromPart(const NameAndTypePair & required_column) const
{
auto it = columns_from_part.find(required_column.name);
if (it != columns_from_part.end())
return {it->first, it->second};
if (alter_conversions.isColumnRenamed(required_column.name))
{
String old_name = alter_conversions.getColumnOldName(required_column.name);
it = columns_from_part.find(old_name);
if (it != columns_from_part.end())
return {it->first, it->second};
}
return required_column;
}
void IMergeTreeReader::performRequiredConversions(Columns & res_columns)
{
try
@ -209,10 +228,7 @@ void IMergeTreeReader::performRequiredConversions(Columns & res_columns)
if (res_columns[pos] == nullptr)
continue;
if (columns_from_part.count(name_and_type->name))
copy_block.insert({res_columns[pos], columns_from_part[name_and_type->name], name_and_type->name});
else
copy_block.insert({res_columns[pos], name_and_type->type, name_and_type->name});
copy_block.insert({res_columns[pos], getColumnFromPart(*name_and_type).type, name_and_type->name});
}
DB::performRequiredConversions(copy_block, columns, storage.global_context);

View File

@ -4,7 +4,6 @@
#include <Storages/MergeTree/MergeTreeReaderStream.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
namespace DB
{
@ -59,6 +58,9 @@ public:
MergeTreeData::DataPartPtr data_part;
protected:
/// Returns actual column type in part, which can differ from table metadata.
NameAndTypePair getColumnFromPart(const NameAndTypePair & required_column) const;
/// avg_value_size_hints are used to reduce the number of reallocations when creating columns of variable size.
ValueSizeMap avg_value_size_hints;
/// Stores states for IDataType::deserializeBinaryBulk
@ -67,8 +69,6 @@ protected:
/// Columns that are read.
NamesAndTypesList columns;
std::unordered_map<String, DataTypePtr> columns_from_part;
UncompressedCache * uncompressed_cache;
MarkCache * mark_cache;
@ -78,8 +78,13 @@ protected:
MarkRanges all_mark_ranges;
friend class MergeTreeRangeReader::DelayedStream;
private:
/// Alter conversions, which must be applied on fly if required
MergeTreeData::AlterConversions alter_conversions;
/// Actual data type of columns in part
std::unordered_map<String, DataTypePtr> columns_from_part;
};
}

View File

@ -78,15 +78,9 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
auto name_and_type = columns.begin();
for (size_t i = 0; i < columns_num; ++i, ++name_and_type)
{
const auto & [name, type] = *name_and_type;
const auto & [name, type] = getColumnFromPart(*name_and_type);
auto position = data_part->getColumnPosition(name);
if (!position && alter_conversions.isColumnRenamed(name))
{
String old_name = alter_conversions.getColumnOldName(name);
position = data_part->getColumnPosition(old_name);
}
if (!position && typeid_cast<const DataTypeArray *>(type.get()))
{
/// If array of Nested column is missing in part,
@ -118,7 +112,7 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
bool append = res_columns[i] != nullptr;
if (!append)
res_columns[i] = column_it->type->createColumn();
res_columns[i] = getColumnFromPart(*column_it).type->createColumn();
mutable_columns[i] = res_columns[i]->assumeMutable();
}
@ -132,15 +126,7 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
if (!res_columns[pos])
continue;
auto [name, type] = *name_and_type;
if (alter_conversions.isColumnRenamed(name))
{
String old_name = alter_conversions.getColumnOldName(name);
if (!data_part->getColumnPosition(name) && data_part->getColumnPosition(old_name))
name = old_name;
}
auto [name, type] = getColumnFromPart(*name_and_type);
auto & column = mutable_columns[pos];
try

View File

@ -41,28 +41,10 @@ MergeTreeReaderWide::MergeTreeReaderWide(
{
try
{
for (const NameAndTypePair & column_from_part : data_part->getColumns())
columns_from_part[column_from_part.name] = column_from_part.type;
for (const NameAndTypePair & column : columns)
{
if (columns_from_part.count(column.name))
{
addStreams(column.name, *columns_from_part[column.name], profile_callback_, clock_type_);
}
else
{
if (alter_conversions.isColumnRenamed(column.name))
{
String old_name = alter_conversions.getColumnOldName(column.name);
if (columns_from_part.count(old_name))
addStreams(old_name, *columns_from_part[old_name], profile_callback_, clock_type_);
}
else
{
addStreams(column.name, *column.type, profile_callback_, clock_type_);
}
}
auto column_from_part = getColumnFromPart(column);
addStreams(column_from_part.name, *column_from_part.type, profile_callback_, clock_type_);
}
}
catch (...)
@ -93,19 +75,7 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
auto name_and_type = columns.begin();
for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
{
String name = name_and_type->name;
if (alter_conversions.isColumnRenamed(name))
{
String original_name = alter_conversions.getColumnOldName(name);
if (!columns_from_part.count(name) && columns_from_part.count(original_name))
name = original_name;
}
DataTypePtr type;
if (columns_from_part.count(name))
type = columns_from_part[name];
else
type = name_and_type->type;
auto [name, type] = getColumnFromPart(*name_and_type);
/// The column is already present in the block so we will append the values to the end.
bool append = res_columns[pos] != nullptr;