Alexander Gololobov 2022-07-17 20:41:17 +02:00
parent f956810fdd
commit f324ca9921
20 changed files with 256 additions and 303 deletions

View File

@ -459,7 +459,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, optimize_respect_aliases, true, "If it is set to true, it will respect aliases in WHERE/GROUP BY/ORDER BY, that will help with partition pruning/secondary indexes/optimize_aggregation_in_order/optimize_read_in_order/optimize_trivial_count", 0) \
M(UInt64, mutations_sync, 0, "Wait for synchronous execution of ALTER TABLE UPDATE/DELETE queries (mutations). 0 - execute asynchronously. 1 - wait current server. 2 - wait all replicas if they exist.", 0) \
M(Bool, allow_experimental_lightweight_delete, false, "Enable lightweight DELETE mutations for mergetree tables. Work in progress", 0) \
M(Bool, allow_experimental_lwd2, false, "Enable lightweight DELETE mutations using __row_exists column for mergetree tables. Work in progress", 0) \
M(Bool, allow_experimental_lightweight_delete_with_row_exists, false, "Enable lightweight DELETE mutations using __row_exists column for mergetree tables. Work in progress", 0) \
M(Bool, lightweight_delete_mutation, true, "Enable to make ordinary ALTER DELETE queries lightweight for mergetree tables", 0) \
M(Bool, optimize_move_functions_out_of_any, false, "Move functions out of aggregate functions 'any', 'anyLast'.", 0) \
M(Bool, optimize_normalize_count_variants, true, "Rewrite aggregate functions that semantically equals to count() as count().", 0) \
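
A quick usage sketch of the renamed setting, mirroring the test file updated at the bottom of this commit (the lwd_test table comes from that test and is illustrative):

SET allow_experimental_lightweight_delete_with_row_exists = 1;
SET mutations_sync = 1; -- wait for the mutation to finish, as the test does
DELETE FROM lwd_test WHERE id < 3000000;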

View File

@ -33,8 +33,11 @@ InterpreterDeleteQuery::InterpreterDeleteQuery(const ASTPtr & query_ptr_, Contex
BlockIO InterpreterDeleteQuery::execute()
{
if (!getContext()->getSettingsRef().allow_experimental_lightweight_delete && !getContext()->getSettingsRef().allow_experimental_lwd2)
if (!getContext()->getSettingsRef().allow_experimental_lightweight_delete &&
!getContext()->getSettingsRef().allow_experimental_lightweight_delete_with_row_exists)
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Lightweight delete mutate is experimental. Set `allow_experimental_lightweight_delete` setting to enable it");
}
FunctionNameNormalizer().visit(query_ptr.get());
const ASTDeleteQuery & delete_query = query_ptr->as<ASTDeleteQuery &>();
@ -70,9 +73,9 @@ BlockIO InterpreterDeleteQuery::execute()
MutationCommands mutation_commands;
MutationCommand mut_command;
if (getContext()->getSettingsRef().allow_experimental_lwd2)
if (getContext()->getSettingsRef().allow_experimental_lightweight_delete_with_row_exists)
{
/// UPDATE __row_exists = 0 WHERE predicate
/// Build "UPDATE __row_exists = 0 WHERE predicate" query
mut_command.type = MutationCommand::Type::UPDATE;
mut_command.predicate = delete_query.predicate;
@ -80,14 +83,14 @@ BlockIO InterpreterDeleteQuery::execute()
command->type = ASTAlterCommand::UPDATE;
command->predicate = delete_query.predicate;
command->update_assignments = std::make_shared<ASTExpressionList>();
auto set_row_exists = std::make_shared<ASTAssignment>();
set_row_exists->column_name = "__row_exists";
auto set_row_does_not_exist = std::make_shared<ASTAssignment>();
set_row_does_not_exist->column_name = metadata_snapshot->lightweight_delete_description.filter_column.name;
auto zero_value = std::make_shared<ASTLiteral>(DB::Field(UInt8(0)));
set_row_exists->children.push_back(zero_value);
command->update_assignments->children.push_back(set_row_exists);
set_row_does_not_exist->children.push_back(zero_value);
command->update_assignments->children.push_back(set_row_does_not_exist);
command->children.push_back(command->predicate);
command->children.push_back(command->update_assignments);
mut_command.column_to_update_expression[set_row_exists->column_name] = zero_value;
mut_command.column_to_update_expression[set_row_does_not_exist->column_name] = zero_value;
mut_command.ast = command->ptr();
mutation_commands.emplace_back(mut_command);
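
With allow_experimental_lightweight_delete_with_row_exists enabled, the interpreter thus rewrites DELETE into a masking UPDATE mutation. The hand-written equivalent, which the old version of the test used to issue directly (table name illustrative):

ALTER TABLE lwd_test UPDATE __row_exists = 0 WHERE id < 3000000;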

View File

@ -29,7 +29,6 @@
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <DataTypes/NestedUtils.h>
namespace DB
{
@ -297,9 +296,7 @@ MutationsInterpreter::MutationsInterpreter(
, is_lightweight(is_lightweight_)
{
if (is_lightweight)
{
mutation_ast = prepareLightweightDelete(!can_execute);
}
else
mutation_ast = prepare(!can_execute);
}
@ -356,7 +353,11 @@ static void validateUpdateColumns(
}
}
if (!found && column_name != "__row_exists") /// TODO: properly handle updating __row_exists column for LWD
/// Allow overriding values of virtual columns
if (!found && column_name == metadata_snapshot->lightweight_delete_description.filter_column.name)
found = true;
if (!found)
{
for (const auto & col : metadata_snapshot->getColumns().getMaterialized())
{
@ -509,7 +510,14 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
///
/// Outer CAST is added just in case if we don't trust the returning type of 'if'.
const auto type = (column == "__row_exists" ? std::make_shared<DataTypeUInt8>() : columns_desc.getPhysical(column).type);
DataTypePtr type;
if (auto physical_column = columns_desc.tryGetPhysical(column))
type = physical_column->type;
else if (column == metadata_snapshot->lightweight_delete_description.filter_column.name)
type = metadata_snapshot->lightweight_delete_description.filter_column.type;
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown column {}", column);
auto type_literal = std::make_shared<ASTLiteral>(type->getName());
const auto & update_expr = kv.second;

View File

@ -785,7 +785,8 @@ bool AlterCommand::isRequireMutationStage(const StorageInMemoryMetadata & metada
/// Drop alias is metadata alter, in other case mutation is required.
if (type == DROP_COLUMN)
return metadata.columns.hasColumnOrNested(GetColumnsOptions::AllPhysical, column_name);
return metadata.columns.hasColumnOrNested(GetColumnsOptions::AllPhysical, column_name) ||
column_name == metadata.lightweight_delete_description.filter_column.name;
if (type != MODIFY_COLUMN || data_type == nullptr)
return false;
@ -1149,7 +1150,9 @@ void AlterCommands::validate(const StoragePtr & table, ContextPtr context) const
}
else if (command.type == AlterCommand::DROP_COLUMN)
{
if (all_columns.has(command.column_name) || all_columns.hasNested(command.column_name))
if (all_columns.has(command.column_name) ||
all_columns.hasNested(command.column_name) ||
(command.clear && column_name == metadata.lightweight_delete_description.filter_column.name))
{
if (!command.clear) /// CLEAR column is Ok even if there are dependencies.
{

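The relaxed validation appears to admit CLEAR COLUMN for the lightweight delete mask. A hypothetical statement the new condition would accept (not exercised by the tests in this commit, so treat it as an assumption):

ALTER TABLE lwd_test CLEAR COLUMN __row_exists;
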
View File

@ -813,25 +813,25 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
{
auto columns = global_ctx->merging_column_names;
if (part->getColumns().contains("__row_exists"))
columns.emplace_back("__row_exists");
/// The part might have some rows masked by lightweight deletes
const auto lwd_filter_column = global_ctx->metadata_snapshot->lightweight_delete_description.filter_column.name;
const bool need_to_filter_deleted_rows = !lwd_filter_column.empty() && part->getColumns().contains(lwd_filter_column);
if (need_to_filter_deleted_rows)
columns.emplace_back(lwd_filter_column);
auto input = std::make_unique<MergeTreeSequentialSource>(
*global_ctx->data, global_ctx->storage_snapshot, part, columns, ctx->read_with_direct_io, true);
Pipe pipe(std::move(input));
/////////////
if (part->getColumns().contains("__row_exists"))
/// Add filtering step that discards deleted rows
if (need_to_filter_deleted_rows)
{
pipe.addSimpleTransform([](const Block & header)
pipe.addSimpleTransform([lwd_filter_column](const Block & header)
{
return std::make_shared<FilterTransform>(header, nullptr, "__row_exists", "__row_exists");
return std::make_shared<FilterTransform>(header, nullptr, lwd_filter_column, true);
});
}
/////////////
if (global_ctx->metadata_snapshot->hasSortingKey())
{

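The effect of the new filtering step: rows masked by the lightweight delete column are discarded while the merge reads each part, so forcing a merge reclaims them physically. A sketch matching the test in this commit:

OPTIMIZE TABLE lwd_test FINAL;
-- active parts should now contain only the surviving rows
SELECT 'Rows in parts', SUM(rows) FROM system.parts
    WHERE database = currentDatabase() AND table = 'lwd_test' AND active;
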
View File

@ -208,16 +208,58 @@ Chunk MergeTreeBaseSelectProcessor::generate()
auto res = readFromPart();
if (res.hasRows())
if (res.row_count)
{
injectVirtualColumns(res, task.get(), partition_value_type, virt_column_names);
return res;
injectVirtualColumns(res.block, res.row_count, task.get(), partition_value_type, virt_column_names);
/// Reorder the columns according to output header
const auto & output_header = output.getHeader();
Columns ordered_columns;
ordered_columns.reserve(output_header.columns());
for (size_t i = 0; i < output_header.columns(); ++i)
{
auto name = output_header.getByPosition(i).name;
ordered_columns.push_back(res.block.getByName(name).column);
}
return Chunk(ordered_columns, res.row_count);
}
}
return {};
}
void MergeTreeBaseSelectProcessor::initializeMergeTreeReadersForPart(
MergeTreeData::DataPartPtr & data_part,
const MergeTreeReadTaskColumns & task_columns, const StorageMetadataPtr & metadata_snapshot,
const MarkRanges & mark_ranges, const IMergeTreeReader::ValueSizeMap & value_size_map,
const ReadBufferFromFileBase::ProfileCallback & profile_callback)
{
reader = data_part->getReader(task_columns.columns, metadata_snapshot, mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
value_size_map, profile_callback);
pre_reader_for_step.clear();
/// Add lightweight delete filtering step
const auto & lightweight_delete_info = metadata_snapshot->lightweight_delete_description;
if (!reader_settings.skip_deleted_mask && data_part->getColumns().contains(lightweight_delete_info.filter_column.name))
{
pre_reader_for_step.push_back(data_part->getReader({lightweight_delete_info.filter_column}, metadata_snapshot, mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
value_size_map, profile_callback));
}
if (prewhere_info)
{
for (const auto & pre_columns_per_step : task_columns.pre_columns)
{
pre_reader_for_step.push_back(data_part->getReader(pre_columns_per_step, metadata_snapshot, mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
value_size_map, profile_callback));
}
}
}
void MergeTreeBaseSelectProcessor::initializeRangeReaders(MergeTreeReadTask & current_task)
{
@ -225,9 +267,10 @@ void MergeTreeBaseSelectProcessor::initializeRangeReaders(MergeTreeReadTask & cu
bool last_reader = false;
size_t pre_readers_shift = 0;
if (!reader_settings.skip_deleted_mask && current_task.data_part->getColumns().contains("__row_exists"))
/// Add filtering step with lightweight delete mask
const auto & lightweight_delete_info = storage_snapshot->metadata->lightweight_delete_description;
if (!reader_settings.skip_deleted_mask && current_task.data_part->getColumns().contains(lightweight_delete_info.filter_column.name))
{
// last_reader = !prewhere_actions || prewhere_actions->steps.empty();
current_task.pre_range_readers.push_back(
MergeTreeRangeReader(pre_reader_for_step[0].get(), prev_reader, &lwd_filter_step, last_reader, non_const_virtual_column_names));
prev_reader = &current_task.pre_range_readers.back();
@ -241,7 +284,6 @@ void MergeTreeBaseSelectProcessor::initializeRangeReaders(MergeTreeReadTask & cu
"PREWHERE steps count mismatch, actions: {}, readers: {}",
prewhere_actions->steps.size(), pre_reader_for_step.size());
for (size_t i = 0; i < prewhere_actions->steps.size(); ++i)
{
last_reader = reader->getColumns().empty() && (i + 1 == prewhere_actions->steps.size());
@ -304,7 +346,7 @@ static UInt64 estimateNumRows(const MergeTreeReadTask & current_task, UInt64 cur
}
Chunk MergeTreeBaseSelectProcessor::readFromPartImpl()
MergeTreeBaseSelectProcessor::BlockAndRowCount MergeTreeBaseSelectProcessor::readFromPartImpl()
{
if (task->size_predictor)
task->size_predictor->startBlock();
@ -347,24 +389,13 @@ Chunk MergeTreeBaseSelectProcessor::readFromPartImpl()
if (read_result.num_rows == 0)
return {};
Columns ordered_columns;
ordered_columns.reserve(header_without_virtual_columns.columns());
BlockAndRowCount res = { sample_block.cloneWithColumns(read_result.columns), read_result.num_rows };
/// Reorder columns. TODO: maybe skip for default case.
for (size_t ps = 0; ps < header_without_virtual_columns.columns(); ++ps)
{
const auto & name = header_without_virtual_columns.getByPosition(ps).name;
if (name == "__row_exists" && !sample_block.has(name))
continue; /// TODO: properly deal with cases when __row_exists is not read and is filled later
auto pos_in_sample_block = sample_block.getPositionByName(name);
ordered_columns.emplace_back(std::move(read_result.columns[pos_in_sample_block]));
}
return Chunk(std::move(ordered_columns), read_result.num_rows);
return res;
}
Chunk MergeTreeBaseSelectProcessor::readFromPart()
MergeTreeBaseSelectProcessor::BlockAndRowCount MergeTreeBaseSelectProcessor::readFromPart()
{
if (!task->range_reader.isInitialized())
initializeRangeReaders(*task);
@ -375,22 +406,46 @@ Chunk MergeTreeBaseSelectProcessor::readFromPart()
namespace
{
/// Simple interfaces to insert virtual columns.
struct VirtualColumnsInserter
{
virtual ~VirtualColumnsInserter() = default;
explicit VirtualColumnsInserter(Block & block_) : block(block_) {}
virtual void insertArrayOfStringsColumn(const ColumnPtr & column, const String & name) = 0;
virtual void insertStringColumn(const ColumnPtr & column, const String & name) = 0;
virtual void insertUInt8Column(const ColumnPtr & column, const String & name) = 0;
virtual void insertUInt64Column(const ColumnPtr & column, const String & name) = 0;
virtual void insertUUIDColumn(const ColumnPtr & column, const String & name) = 0;
bool columnExists(const String & name) const { return block.has(name); }
virtual void insertPartitionValueColumn(
size_t rows,
const Row & partition_value,
const DataTypePtr & partition_value_type,
const String & name) = 0;
void insertStringColumn(const ColumnPtr & column, const String & name)
{
block.insert({column, std::make_shared<DataTypeString>(), name});
}
void insertUInt8Column(const ColumnPtr & column, const String & name)
{
block.insert({column, std::make_shared<DataTypeUInt8>(), name});
}
void insertUInt64Column(const ColumnPtr & column, const String & name)
{
block.insert({column, std::make_shared<DataTypeUInt64>(), name});
}
void insertUUIDColumn(const ColumnPtr & column, const String & name)
{
block.insert({column, std::make_shared<DataTypeUUID>(), name});
}
void insertPartitionValueColumn(
size_t rows, const Row & partition_value, const DataTypePtr & partition_value_type, const String & name)
{
ColumnPtr column;
if (rows)
column = partition_value_type->createColumnConst(rows, Tuple(partition_value.begin(), partition_value.end()))
->convertToFullColumnIfConst();
else
column = partition_value_type->createColumn();
block.insert({column, partition_value_type, name});
}
Block & block;
};
}
@ -400,16 +455,34 @@ static void injectNonConstVirtualColumns(
VirtualColumnsInserter & inserter,
const Names & virtual_columns)
{
if (unlikely(rows))
throw Exception("Cannot insert non-constant virtual column to non-empty chunk.",
ErrorCodes::LOGICAL_ERROR);
for (const auto & virtual_column_name : virtual_columns)
{
if (virtual_column_name == "_part_offset")
inserter.insertUInt64Column(DataTypeUInt64().createColumn(), virtual_column_name);
{
if (!rows)
{
inserter.insertUInt64Column(DataTypeUInt64().createColumn(), virtual_column_name);
}
else
{
if (!inserter.columnExists(virtual_column_name))
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Column {} must have been filled part reader",
virtual_column_name);
}
}
if (virtual_column_name == "__row_exists")
inserter.insertUInt8Column(DataTypeUInt8().createColumn(), virtual_column_name);
{
/// If __row_exists column isn't present in the part then fill it here with 1s
ColumnPtr column;
if (rows)
column = DataTypeUInt8().createColumnConst(rows, 1)->convertToFullColumnIfConst();
else
column = DataTypeUInt8().createColumn();
inserter.insertUInt8Column(column, virtual_column_name);
}
}
}
@ -489,148 +562,15 @@ static void injectPartConstVirtualColumns(
}
}
namespace
{
struct VirtualColumnsInserterIntoBlock : public VirtualColumnsInserter
{
explicit VirtualColumnsInserterIntoBlock(Block & block_) : block(block_) {}
void insertArrayOfStringsColumn(const ColumnPtr & column, const String & name) final
{
block.insert({column, std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), name});
}
void insertStringColumn(const ColumnPtr & column, const String & name) final
{
block.insert({column, std::make_shared<DataTypeString>(), name});
}
void insertUInt8Column(const ColumnPtr & column, const String & name) final
{
block.insert({column, std::make_shared<DataTypeUInt8>(), name});
}
void insertUInt64Column(const ColumnPtr & column, const String & name) final
{
block.insert({column, std::make_shared<DataTypeUInt64>(), name});
}
void insertUUIDColumn(const ColumnPtr & column, const String & name) final
{
block.insert({column, std::make_shared<DataTypeUUID>(), name});
}
void insertPartitionValueColumn(
size_t rows, const Row & partition_value, const DataTypePtr & partition_value_type, const String & name) final
{
ColumnPtr column;
if (rows)
column = partition_value_type->createColumnConst(rows, Tuple(partition_value.begin(), partition_value.end()))
->convertToFullColumnIfConst();
else
column = partition_value_type->createColumn();
block.insert({column, partition_value_type, name});
}
Block & block;
};
struct VirtualColumnsInserterIntoColumns : public VirtualColumnsInserter
{
explicit VirtualColumnsInserterIntoColumns(Columns & columns_) : columns(columns_) {}
void insertArrayOfStringsColumn(const ColumnPtr & column, const String &) final
{
columns.push_back(column);
}
void insertStringColumn(const ColumnPtr & column, const String &) final
{
columns.push_back(column);
}
void insertUInt8Column(const ColumnPtr & column, const String &) final
{
columns.push_back(column);
}
void insertUInt64Column(const ColumnPtr & column, const String &) final
{
columns.push_back(column);
}
void insertUUIDColumn(const ColumnPtr & column, const String &) final
{
columns.push_back(column);
}
void insertPartitionValueColumn(
size_t rows, const Row & partition_value, const DataTypePtr & partition_value_type, const String &) final
{
ColumnPtr column;
if (rows)
column = partition_value_type->createColumnConst(rows, Tuple(partition_value.begin(), partition_value.end()))
->convertToFullColumnIfConst();
else
column = partition_value_type->createColumn();
columns.push_back(column);
}
Columns & columns;
};
}
void MergeTreeBaseSelectProcessor::injectVirtualColumns(
Block & block, MergeTreeReadTask * task, const DataTypePtr & partition_value_type, const Names & virtual_columns)
Block & block, size_t row_count, MergeTreeReadTask * task, const DataTypePtr & partition_value_type, const Names & virtual_columns)
{
VirtualColumnsInserterIntoBlock inserter{block};
VirtualColumnsInserter inserter{block};
/// First add non-const columns that are filled by the range reader and then const columns that we will fill ourselves.
/// Note that the order is important: virtual columns filled by the range reader must go first
injectNonConstVirtualColumns(block.rows(), inserter, virtual_columns);
injectPartConstVirtualColumns(block.rows(), inserter, task, partition_value_type, virtual_columns);
}
void MergeTreeBaseSelectProcessor::injectVirtualColumns(
Chunk & chunk, MergeTreeReadTask * task, const DataTypePtr & partition_value_type, const Names & virtual_columns)
{
UInt64 num_rows = chunk.getNumRows();
auto columns = chunk.detachColumns();
VirtualColumnsInserterIntoColumns inserter{columns};
/////////////////////////
// TODO: implement properly
for (const auto & virtual_column_name : virtual_columns)
{
if (virtual_column_name == "__row_exists")
{
if (task->data_part->getColumns().contains(virtual_column_name))
{
/// If this column is present in the part it must be read from the data
assert(task->task_columns.columns.contains(virtual_column_name));
}
else
{
/// If __row_exists column isn't present in the part then
ColumnPtr column;
if (num_rows)
column = DataTypeUInt8().createColumnConst(num_rows, 1)->convertToFullColumnIfConst();
else
column = DataTypeUInt8().createColumn();
inserter.insertUInt8Column(column, virtual_column_name);
}
}
}
///////////////////////////
/// Only add const virtual columns because non-const ones have already been added
injectPartConstVirtualColumns(num_rows, inserter, task, partition_value_type, virtual_columns);
chunk.setColumns(columns, num_rows);
injectNonConstVirtualColumns(row_count, inserter, virtual_columns);
injectPartConstVirtualColumns(row_count, inserter, task, partition_value_type, virtual_columns);
}
Block MergeTreeBaseSelectProcessor::transformHeader(
@ -676,7 +616,7 @@ Block MergeTreeBaseSelectProcessor::transformHeader(
}
}
injectVirtualColumns(block, nullptr, partition_value_type, virtual_columns);
injectVirtualColumns(block, 0, nullptr, partition_value_type, virtual_columns);
return block;
}

View File

@ -3,6 +3,7 @@
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Storages/MergeTree/RequestResponse.h>
#include <Processors/ISource.h>
@ -57,6 +58,12 @@ public:
const Block & sample_block);
protected:
/// This struct allows returning a block with no columns but with a non-zero number of rows, similar to Chunk
struct BlockAndRowCount
{
Block block;
size_t row_count = 0;
};
Chunk generate() final;
@ -74,22 +81,29 @@ protected:
/// Closes readers and unlocks part locks
virtual void finish() = 0;
virtual Chunk readFromPart();
virtual BlockAndRowCount readFromPart();
Chunk readFromPartImpl();
BlockAndRowCount readFromPartImpl();
/// Two versions for header and chunk.
/// Used for filling header with no rows as well as block with data
static void
injectVirtualColumns(Block & block, MergeTreeReadTask * task, const DataTypePtr & partition_value_type, const Names & virtual_columns);
static void
injectVirtualColumns(Chunk & chunk, MergeTreeReadTask * task, const DataTypePtr & partition_value_type, const Names & virtual_columns);
injectVirtualColumns(Block & block, size_t row_count, MergeTreeReadTask * task, const DataTypePtr & partition_value_type, const Names & virtual_columns);
/// Sets up data readers for each step of prewhere and where
void initializeMergeTreeReadersForPart(
MergeTreeData::DataPartPtr & data_part,
const MergeTreeReadTaskColumns & task_columns, const StorageMetadataPtr & metadata_snapshot,
const MarkRanges & mark_ranges, const IMergeTreeReader::ValueSizeMap & value_size_map,
const ReadBufferFromFileBase::ProfileCallback & profile_callback);
/// Sets up range readers corresponding to data readers
void initializeRangeReaders(MergeTreeReadTask & task);
const MergeTreeData & storage;
StorageSnapshotPtr storage_snapshot;
PrewhereExprStep lwd_filter_step { nullptr, "__row_exists", true, true };
/// This step is added when the part has a lightweight delete mask
const PrewhereExprStep lwd_filter_step { nullptr, "__row_exists", true, true };
PrewhereInfoPtr prewhere_info;
std::unique_ptr<PrewhereExprInfo> prewhere_actions;

View File

@ -1002,6 +1002,7 @@ void MergeTreeRangeReader::fillPartOffsetColumn(ReadResult & result, UInt64 lead
result.columns.emplace_back(std::move(column));
result.extra_columns_filled.push_back("_part_offset");
}
/// Fill deleted_row_mask column, referenced from fillPartOffsetColumn().
void MergeTreeRangeReader::fillDeletedRowMaskColumn(ReadResult & result, UInt64 leading_begin_part_offset, UInt64 leading_end_part_offset)
{

View File

@ -161,9 +161,6 @@ public:
/// The number of bytes read from disk.
size_t numBytesRead() const { return num_bytes_read; }
/// Similar to a filter that you need to apply to newly-read columns
ColumnPtr deleted_mask_filter_holder;
private:
/// Only MergeTreeRangeReader is supposed to access ReadResult internals.
friend class MergeTreeRangeReader;
@ -245,6 +242,9 @@ public:
std::map<const IColumn::Filter *, size_t> filter_bytes_map;
/// Similar to a filter that you need to apply to newly-read columns
ColumnPtr deleted_mask_filter_holder;
Names extra_columns_filled;
};

View File

@ -44,9 +44,9 @@ catch (...)
throw;
}
Chunk MergeTreeReverseSelectProcessor::readFromPart()
MergeTreeBaseSelectProcessor::BlockAndRowCount MergeTreeReverseSelectProcessor::readFromPart()
{
Chunk res;
BlockAndRowCount res;
if (!chunks.empty())
{
@ -60,7 +60,7 @@ Chunk MergeTreeReverseSelectProcessor::readFromPart()
while (!task->isFinished())
{
Chunk chunk = readFromPartImpl();
auto chunk = readFromPartImpl();
chunks.push_back(std::move(chunk));
}

View File

@ -27,9 +27,9 @@ private:
bool getNewTaskImpl() override;
void finalizeNewTask() override {}
Chunk readFromPart() override;
BlockAndRowCount readFromPart() override;
Chunks chunks;
std::vector<BlockAndRowCount> chunks;
Poco::Logger * log = &Poco::Logger::get("MergeTreeReverseSelectProcessor");
};

View File

@ -52,7 +52,7 @@ void MergeTreeSelectProcessor::initializeReaders()
{
task_columns = getReadTaskColumns(
storage, storage_snapshot, data_part,
required_columns, non_const_virtual_column_names, prewhere_info, /*with_subcolumns=*/ true);
required_columns, virt_column_names, prewhere_info, /*with_subcolumns=*/ true);
/// Will be used to distinguish between PREWHERE and WHERE columns when applying filter
const auto & column_names = task_columns.columns.getNames();
@ -63,25 +63,8 @@ void MergeTreeSelectProcessor::initializeReaders()
owned_mark_cache = storage.getContext()->getMarkCache();
reader = data_part->getReader(task_columns.columns, storage_snapshot->getMetadataForQuery(),
all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings, {}, {});
pre_reader_for_step.clear();
if (!reader_settings.skip_deleted_mask && data_part->getColumns().contains("__row_exists"))
{
pre_reader_for_step.push_back(data_part->getReader({{"__row_exists", std::make_shared<DataTypeUInt8>()}}, storage_snapshot->getMetadataForQuery(),
all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings, {}, {}));
}
if (prewhere_info)
{
for (const auto & pre_columns_for_step : task_columns.pre_columns)
{
pre_reader_for_step.push_back(data_part->getReader(pre_columns_for_step, storage_snapshot->getMetadataForQuery(),
all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings, {}, {}));
}
}
initializeMergeTreeReadersForPart(data_part, task_columns, storage_snapshot->getMetadataForQuery(),
all_mark_ranges, {}, {});
}

View File

@ -105,68 +105,24 @@ void MergeTreeThreadSelectProcessor::finalizeNewTask()
auto profile_callback = [this](ReadBufferFromFileBase::ProfileInfo info_) { pool->profileFeedback(info_); };
const auto & metadata_snapshot = storage_snapshot->metadata;
IMergeTreeReader::ValueSizeMap value_size_map;
if (!reader)
{
if (use_uncompressed_cache)
owned_uncompressed_cache = storage.getContext()->getUncompressedCache();
owned_mark_cache = storage.getContext()->getMarkCache();
reader = task->data_part->getReader(task->task_columns.columns, metadata_snapshot, task->mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
IMergeTreeReader::ValueSizeMap{}, profile_callback);
pre_reader_for_step.clear();
if (!reader_settings.skip_deleted_mask && task->data_part->getColumns().contains("__row_exists"))
{
pre_reader_for_step.push_back(task->data_part->getReader({{"__row_exists", std::make_shared<DataTypeUInt8>()}}, metadata_snapshot, task->mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
IMergeTreeReader::ValueSizeMap{}, profile_callback));
}
if (prewhere_info)
{
for (const auto & pre_columns_per_step : task->task_columns.pre_columns)
{
pre_reader_for_step.push_back(task->data_part->getReader(pre_columns_per_step, metadata_snapshot, task->mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
IMergeTreeReader::ValueSizeMap{}, profile_callback));
}
}
}
else
else if (part_name != last_readed_part_name)
{
/// Otherwise we can reuse the readers; they will be "seeked" to the required mark anyway
if (part_name != last_readed_part_name)
{
/// retain avg_value_size_hints
reader = task->data_part->getReader(task->task_columns.columns, metadata_snapshot, task->mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
reader->getAvgValueSizeHints(), profile_callback);
value_size_map = reader->getAvgValueSizeHints();
}
pre_reader_for_step.clear();
if (!reader_settings.skip_deleted_mask && task->data_part->getColumns().contains("__row_exists"))
{
pre_reader_for_step.push_back(task->data_part->getReader({{"__row_exists", std::make_shared<DataTypeUInt8>()}}, metadata_snapshot, task->mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
reader->getAvgValueSizeHints(), profile_callback));
}
if (prewhere_info)
{
for (const auto & pre_columns_per_step : task->task_columns.pre_columns)
{
pre_reader_for_step.push_back(task->data_part->getReader(pre_columns_per_step, metadata_snapshot, task->mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
reader->getAvgValueSizeHints(), profile_callback));
}
}
}
const bool init_new_readers = !reader || part_name != last_readed_part_name;
if (init_new_readers)
{
initializeMergeTreeReadersForPart(task->data_part, task->task_columns, metadata_snapshot,
task->mark_ranges, value_size_map, profile_callback);
}
last_readed_part_name = part_name;

View File

@ -170,6 +170,15 @@ getColumnsForNewDataPart(
NameToNameMap renamed_columns_to_from;
NameToNameMap renamed_columns_from_to;
ColumnsDescription part_columns(source_part->getColumns());
const auto all_virtual_columns = source_part->storage.getVirtuals();
/// Preserve virtual columns that have persisted values in the source_part
/// TODO: only allow the LWD mask to be overridden!
for (const auto & virtual_column : all_virtual_columns)
{
if (part_columns.has(virtual_column.name) && !storage_columns.contains(virtual_column.name))
storage_columns.emplace_back(virtual_column);
}
/// All commands are validated in AlterCommand so we don't care about order
for (const auto & command : commands_for_removes)
@ -178,8 +187,11 @@ getColumnsForNewDataPart(
{
for (const auto & [column_name, _] : command.column_to_update_expression)
{
if (column_name == "__row_exists" && !storage_columns.contains(column_name))
storage_columns.emplace_back("__row_exists", std::make_shared<DataTypeUInt8>());
/// Allow updating and persisting values of a virtual column
/// TODO: only allow the LWD mask to be overridden!
auto virtual_column = all_virtual_columns.tryGetByName(column_name);
if (virtual_column && !storage_columns.contains(column_name))
storage_columns.emplace_back(column_name, virtual_column->type);
}
}
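
What this preserves in practice: an ordinary UPDATE mutation rewrites parts but must keep the persisted mask, so rows hidden by an earlier lightweight DELETE stay hidden. A sketch mirroring the test added below:

ALTER TABLE lwd_test UPDATE value = 'v' WHERE id % 2 == 0;
SELECT 'Count', count() FROM lwd_test WHERE id >= 0; -- masked rows remain excluded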

View File

@ -10,6 +10,7 @@
#include <Common/OptimizedRegularExpression.h>
#include <Common/typeid_cast.h>
#include <Common/thread_local_rng.h>
#include "DataTypes/DataTypesNumber.h"
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTExpressionList.h>
@ -677,6 +678,8 @@ static StoragePtr create(const StorageFactory::Arguments & args)
if (arg_num != arg_cnt)
throw Exception("Wrong number of engine arguments.", ErrorCodes::BAD_ARGUMENTS);
metadata.lightweight_delete_description.filter_column = { "__row_exists", std::make_shared<DataTypeUInt8>() };
if (replicated)
{
auto storage_policy = args.getContext()->getStoragePolicy(storage_settings->storage_policy);

View File

@ -38,6 +38,7 @@ StorageInMemoryMetadata::StorageInMemoryMetadata(const StorageInMemoryMetadata &
, sampling_key(other.sampling_key)
, column_ttls_by_name(other.column_ttls_by_name)
, table_ttl(other.table_ttl)
, lightweight_delete_description(other.lightweight_delete_description)
, settings_changes(other.settings_changes ? other.settings_changes->clone() : nullptr)
, select(other.select)
, comment(other.comment)
@ -63,6 +64,7 @@ StorageInMemoryMetadata & StorageInMemoryMetadata::operator=(const StorageInMemo
sampling_key = other.sampling_key;
column_ttls_by_name = other.column_ttls_by_name;
table_ttl = other.table_ttl;
lightweight_delete_description = other.lightweight_delete_description;
if (other.settings_changes)
settings_changes = other.settings_changes->clone();
else

View File

@ -43,6 +43,8 @@ struct StorageInMemoryMetadata
TTLColumnsDescription column_ttls_by_name;
/// TTL expressions for table (Move and Rows)
TTLTableDescription table_ttl;
/// Lightweight delete filter column if the storage supports it.
LightweightDeleteDescription lightweight_delete_description;
/// SETTINGS expression. Supported for MergeTree, Buffer, Kafka, RabbitMQ.
ASTPtr settings_changes;
/// SELECT QUERY. Supported for MaterializedView and View (have to support LiveView).

View File

@ -6,6 +6,8 @@
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/AggregateDescription.h>
#include <Storages/TTLMode.h>
#include "Core/NamesAndTypes.h"
#include "DataTypes/Serializations/ISerialization.h"
namespace DB
{
@ -127,4 +129,9 @@ struct TTLTableDescription
static TTLTableDescription parse(const String & str, const ColumnsDescription & columns, ContextPtr context, const KeyDescription & primary_key);
};
struct LightweightDeleteDescription
{
NameAndTypePair filter_column;
};
}

View File

@ -13,3 +13,11 @@ Delete 3M more rows using lightweight DELETE
Rows in parts 7000000
Count 4000000
First row 6000000 10
Do UPDATE mutation
Rows in parts 7000000
Count 4000000
First row 6000000 1
Force merge to cleanup deleted rows
Rows in parts 4000000
Count 4000000
First row 6000000 1

View File

@ -5,21 +5,20 @@ CREATE TABLE lwd_test (id UInt64 , value String) ENGINE MergeTree() ORDER BY id;
INSERT INTO lwd_test SELECT number, randomString(10) FROM system.numbers LIMIT 10000000;
SET mutations_sync = 1;
SET allow_experimental_lightweight_delete_with_row_exists = 1;
SET allow_experimental_lightweight_delete = 0;
SELECT 'Rows in parts', SUM(rows) FROM system.parts WHERE database = currentDatabase() AND table = 'lwd_test' AND active;
SELECT 'Count', count() FROM lwd_test WHERE id >= 0;
SELECT 'First row', id, length(value) FROM lwd_test ORDER BY id LIMIT 1;
SELECT 'Delete 3M rows using UPDATE __row_exists';
ALTER TABLE lwd_test UPDATE __row_exists = 0 WHERE id < 3000000;
--ALTER TABLE lwd_test UPDATE __row_exists = 0 WHERE id < 3000000;
DELETE FROM lwd_test WHERE id < 3000000;
SELECT 'Rows in parts', SUM(rows) FROM system.parts WHERE database = currentDatabase() AND table = 'lwd_test' AND active;
SELECT 'Count', count() FROM lwd_test WHERE id >= 0;
SELECT 'First row', id, length(value) FROM lwd_test ORDER BY id LIMIT 1;
@ -27,21 +26,33 @@ SELECT 'Force merge to cleanup deleted rows';
OPTIMIZE TABLE lwd_test FINAL;
SELECT 'Rows in parts', SUM(rows) FROM system.parts WHERE database = currentDatabase() AND table = 'lwd_test' AND active;
SELECT 'Count', count() FROM lwd_test WHERE id >= 0;
SELECT 'First row', id, length(value) FROM lwd_test ORDER BY id LIMIT 1;
SET allow_experimental_lwd2 = 1;
SELECT 'Delete 3M more rows using lightweight DELETE';
DELETE FROM lwd_test WHERE id < 6000000;
SELECT 'Rows in parts', SUM(rows) FROM system.parts WHERE database = currentDatabase() AND table = 'lwd_test' AND active;
SELECT 'Count', count() FROM lwd_test WHERE id >= 0;
SELECT 'First row', id, length(value) FROM lwd_test ORDER BY id LIMIT 1;
SELECT 'Do UPDATE mutation';
ALTER TABLE lwd_test UPDATE value = 'v' WHERE id % 2 == 0;
SELECT 'Rows in parts', SUM(rows) FROM system.parts WHERE database = currentDatabase() AND table = 'lwd_test' AND active;
SELECT 'Count', count() FROM lwd_test WHERE id >= 0;
SELECT 'First row', id, length(value) FROM lwd_test ORDER BY id LIMIT 1;
SELECT 'Force merge to cleanup deleted rows';
OPTIMIZE TABLE lwd_test FINAL;
SELECT 'Rows in parts', SUM(rows) FROM system.parts WHERE database = currentDatabase() AND table = 'lwd_test' AND active;
SELECT 'Count', count() FROM lwd_test WHERE id >= 0;
SELECT 'First row', id, length(value) FROM lwd_test ORDER BY id LIMIT 1;
DROP TABLE lwd_test;