Merge pull request #42126 from ClickHouse/fix_invalid_number_of_rows_in_chunk_with_prewhere

Fixing "Invalid number of rows in Chunk" with lightweight deletes
Alexander Gololobov 2022-12-29 21:46:12 +01:00 committed by GitHub
commit fbae502499
21 changed files with 1031 additions and 321 deletions

View File

@ -68,12 +68,13 @@ IMergeTreeSelectAlgorithm::IMergeTreeSelectAlgorithm(
size_t non_const_columns_offset = header_without_const_virtual_columns.columns();
injectNonConstVirtualColumns(0, header_without_const_virtual_columns, virt_column_names);
/// Reverse order is to minimize reallocations when removing columns from the block
for (size_t col_num = non_const_columns_offset; col_num < header_without_const_virtual_columns.columns(); ++col_num)
non_const_virtual_column_names.emplace_back(header_without_const_virtual_columns.getByPosition(col_num).name);
result_header = header_without_const_virtual_columns;
injectPartConstVirtualColumns(0, result_header, nullptr, partition_value_type, virt_column_names);
LOG_TEST(log, "PREWHERE actions: {}", (prewhere_actions ? prewhere_actions->dump() : std::string("<nullptr>")));
}

View File

@ -30,13 +30,17 @@ namespace ErrorCodes
}
static void filterColumns(Columns & columns, const IColumn::Filter & filter)
static void filterColumns(Columns & columns, const IColumn::Filter & filter, size_t filter_bytes)
{
for (auto & column : columns)
{
if (column)
{
column = column->filter(filter, -1);
if (column->size() != filter.size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of column {} doesn't match size of filter {}",
column->size(), filter.size());
column = column->filter(filter, filter_bytes);
if (column->empty())
{
@ -47,13 +51,12 @@ static void filterColumns(Columns & columns, const IColumn::Filter & filter)
}
}
static void filterColumns(Columns & columns, const ColumnPtr & filter)
static void filterColumns(Columns & columns, const FilterWithCachedCount & filter)
{
ConstantFilterDescription const_descr(*filter);
if (const_descr.always_true)
if (filter.alwaysTrue())
return;
if (const_descr.always_false)
if (filter.alwaysFalse())
{
for (auto & col : columns)
if (col)
@ -62,8 +65,7 @@ static void filterColumns(Columns & columns, const ColumnPtr & filter)
return;
}
FilterDescription descr(*filter);
filterColumns(columns, *descr.data);
filterColumns(columns, filter.getData(), filter.countBytesInFilter());
}
@ -320,11 +322,13 @@ void MergeTreeRangeReader::ReadResult::clear()
num_rows_to_skip_in_last_granule += rows_per_granule.back();
rows_per_granule.assign(rows_per_granule.size(), 0);
total_rows_per_granule = 0;
filter_holder = nullptr;
filter = nullptr;
final_filter = FilterWithCachedCount();
num_rows = 0;
columns.clear();
additional_columns.clear();
}
void MergeTreeRangeReader::ReadResult::shrink(Columns & old_columns)
void MergeTreeRangeReader::ReadResult::shrink(Columns & old_columns, const NumRows & rows_per_granule_previous) const
{
for (auto & column : old_columns)
{
@ -337,9 +341,12 @@ void MergeTreeRangeReader::ReadResult::shrink(Columns & old_columns)
continue;
}
LOG_TEST(log, "ReadResult::shrink() column size: {} total_rows_per_granule: {}",
column->size(), total_rows_per_granule);
auto new_column = column->cloneEmpty();
new_column->reserve(total_rows_per_granule);
for (size_t j = 0, pos = 0; j < rows_per_granule_original.size(); pos += rows_per_granule_original[j++])
for (size_t j = 0, pos = 0; j < rows_per_granule_previous.size(); pos += rows_per_granule_previous[j++])
{
if (rows_per_granule[j])
new_column->insertRangeFrom(*column, pos, rows_per_granule[j]);
@ -348,74 +355,265 @@ void MergeTreeRangeReader::ReadResult::shrink(Columns & old_columns)
}
}
/// The main invariant of the data in the read result is that the number of rows is
/// either equal to total_rows_per_granule (if filter has not been applied) or to the number of
/// 1s in the filter (if filter has been applied).
void MergeTreeRangeReader::ReadResult::checkInternalConsistency() const
{
/// Check that filter size matches number of rows that will be read.
if (final_filter.present() && final_filter.size() != total_rows_per_granule)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Final filter size {} doesn't match total_rows_per_granule {}",
final_filter.size(), total_rows_per_granule);
/// Check that num_rows is consistent with final_filter and rows_per_granule.
if (final_filter.present() && final_filter.countBytesInFilter() != num_rows && total_rows_per_granule != num_rows)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Number of rows {} doesn't match neither filter 1s count {} nor total_rows_per_granule {}",
num_rows, final_filter.countBytesInFilter(), total_rows_per_granule);
/// Check that additional columns have the same number of rows as the main columns.
if (additional_columns && additional_columns.rows() != num_rows)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Number of rows in additional columns {} is not equal to number of rows in result columns {}",
additional_columns.rows(), num_rows);
for (const auto & column : columns)
{
if (column)
chassert(column->size() == num_rows);
}
}
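To make the invariant above concrete, here is a tiny standalone sketch (plain C++ with hypothetical numbers, not the ReadResult class): a read of 8192 rows from the granule range and a final filter with 4096 ones.

#include <cassert>
#include <cstddef>

int main()
{
    const size_t total_rows_per_granule = 8192; /// rows read from the granules
    const size_t ones_in_final_filter = 4096;   /// rows that survive the filter

    /// Before the filter is applied, the result holds all read rows...
    size_t num_rows = total_rows_per_granule;
    assert(num_rows == total_rows_per_granule || num_rows == ones_in_final_filter);

    /// ...and after it is applied, exactly the surviving rows remain.
    num_rows = ones_in_final_filter;
    assert(num_rows == total_rows_per_granule || num_rows == ones_in_final_filter);
}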
std::string MergeTreeRangeReader::ReadResult::dumpInfo() const
{
WriteBufferFromOwnString out;
out << "num_rows: " << num_rows
<< ", columns: " << columns.size()
<< ", total_rows_per_granule: " << total_rows_per_granule;
if (final_filter.present())
{
out << ", filter size: " << final_filter.size()
<< ", filter 1s: " << final_filter.countBytesInFilter();
}
else
{
out << ", no filter";
}
for (size_t ci = 0; ci < columns.size(); ++ci)
{
out << ", column[" << ci << "]: ";
if (!columns[ci])
out << " nullptr";
else
{
out << " " << columns[ci]->dumpStructure();
}
}
if (additional_columns)
{
out << ", additional_columns: " << additional_columns.dumpStructure();
}
return out.str();
}
static std::string dumpNames(const NamesAndTypesList & columns)
{
WriteBufferFromOwnString out;
for (auto it = columns.begin(); it != columns.end(); ++it)
{
if (it != columns.begin())
out << ", ";
out << it->name;
}
return out.str();
}
void MergeTreeRangeReader::ReadResult::setFilterConstTrue()
{
clearFilter();
filter_holder = DataTypeUInt8().createColumnConst(num_rows, 1u);
/// Remove the filter, so newly read columns will not be filtered.
final_filter = FilterWithCachedCount();
}
void MergeTreeRangeReader::ReadResult::setFilterConstFalse()
static ColumnPtr andFilters(ColumnPtr c1, ColumnPtr c2)
{
clearFilter();
columns.clear();
num_rows = 0;
if (c1->size() != c2->size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of filters don't match: {} and {}",
c1->size(), c2->size());
// TODO: use proper vectorized implementation of AND?
auto res = ColumnUInt8::create(c1->size());
auto & res_data = res->getData();
const auto & c1_data = typeid_cast<const ColumnUInt8&>(*c1).getData();
const auto & c2_data = typeid_cast<const ColumnUInt8&>(*c2).getData();
const size_t size = c1->size();
const size_t step = 16;
size_t i = 0;
/// NOTE: '&&' must be used instead of '&' for 'AND' operation because UInt8 columns might contain any non-zero
/// value for true and we cannot bitwise AND them to get the correct result.
for (; i + step < size; i += step)
for (size_t j = 0; j < step; ++j)
res_data[i+j] = (c1_data[i+j] && c2_data[i+j]);
for (; i < size; ++i)
res_data[i] = (c1_data[i] && c2_data[i]);
return res;
}
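A minimal standalone illustration of the NOTE above (hypothetical values, not ClickHouse code): a filter byte may hold any non-zero value for "true", so a bitwise AND can wrongly drop a row that a logical AND keeps.

#include <cassert>
#include <cstdint>

int main()
{
    uint8_t a = 2; /// "true", but stored as a value other than 1
    uint8_t b = 1; /// "true"

    assert((a & b) == 0);  /// bitwise AND: 0b10 & 0b01 == 0, the row would be lost
    assert((a && b) == 1); /// logical AND keeps it, which is what andFilters() relies on
}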
void MergeTreeRangeReader::ReadResult::optimize(bool can_read_incomplete_granules, bool allow_filter_columns)
static ColumnPtr combineFilters(ColumnPtr first, ColumnPtr second);
void MergeTreeRangeReader::ReadResult::applyFilter(const FilterWithCachedCount & filter)
{
if (total_rows_per_granule == 0 || filter == nullptr)
if (filter.size() != num_rows)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Filter size {} doesn't match number of rows {}",
filter.size(), num_rows);
LOG_TEST(log, "ReadResult::applyFilter() num_rows before: {}", num_rows);
filterColumns(columns, filter);
{
auto tmp_columns = additional_columns.getColumns();
filterColumns(tmp_columns, filter);
if (!tmp_columns.empty())
additional_columns.setColumns(tmp_columns);
else
additional_columns.clear();
}
num_rows = filter.countBytesInFilter();
LOG_TEST(log, "ReadResult::applyFilter() num_rows after: {}", num_rows);
}
void MergeTreeRangeReader::ReadResult::optimize(const FilterWithCachedCount & current_filter, bool can_read_incomplete_granules)
{
checkInternalConsistency();
/// Combine new filter with the previous one if it is present.
/// This filter has the size of total_rows_per_granule. It is applied after reading contiguous chunks from
/// the start of each granule.
FilterWithCachedCount filter = current_filter;
if (final_filter.present())
{
/// If the current filter has the same size as the final filter, it means that the final filter has not been applied yet.
/// In this case we AND the current filter with the existing final filter.
/// Otherwise, when the final filter has already been applied, the size of the current step filter equals the number of ones
/// in the final filter. In this case we combine the current filter with the final filter.
ColumnPtr combined_filter;
if (current_filter.size() == final_filter.size())
combined_filter = andFilters(final_filter.getColumn(), current_filter.getColumn());
else
combined_filter = combineFilters(final_filter.getColumn(), current_filter.getColumn());
filter = FilterWithCachedCount(combined_filter);
}
if (total_rows_per_granule == 0 || !filter.present())
return;
NumRows zero_tails;
auto total_zero_rows_in_tails = countZeroTails(filter->getData(), zero_tails, can_read_incomplete_granules);
auto total_zero_rows_in_tails = countZeroTails(filter.getData(), zero_tails, can_read_incomplete_granules);
if (total_zero_rows_in_tails == filter->size())
LOG_TEST(log, "ReadResult::optimize() before: {}", dumpInfo());
SCOPE_EXIT(checkInternalConsistency());
SCOPE_EXIT({
LOG_TEST(log, "ReadResult::optimize() after: {}", dumpInfo());
});
if (total_zero_rows_in_tails == filter.size())
{
LOG_TEST(log, "ReadResult::optimize() combined filter is const False");
clear();
return;
}
else if (total_zero_rows_in_tails == 0 && countBytesInResultFilter(filter->getData()) == filter->size())
else if (total_zero_rows_in_tails == 0 && filter.countBytesInFilter() == filter.size())
{
LOG_TEST(log, "ReadResult::optimize() combined filter is const True");
setFilterConstTrue();
return;
}
/// Just a guess. If only a few rows may be skipped, it's better not to skip at all.
else if (2 * total_zero_rows_in_tails > filter->size())
else if (2 * total_zero_rows_in_tails > filter.size())
{
const NumRows rows_per_granule_previous = rows_per_granule;
const size_t total_rows_per_granule_previous = total_rows_per_granule;
for (auto i : collections::range(0, rows_per_granule.size()))
{
rows_per_granule_original.push_back(rows_per_granule[i]);
rows_per_granule[i] -= zero_tails[i];
}
num_rows_to_skip_in_last_granule += rows_per_granule_original.back() - rows_per_granule.back();
num_rows_to_skip_in_last_granule += rows_per_granule_previous.back() - rows_per_granule.back();
total_rows_per_granule = total_rows_per_granule_previous - total_zero_rows_in_tails;
filter_original = filter;
filter_holder_original = std::move(filter_holder);
/// Check if const 1 after shrink
if (allow_filter_columns && countBytesInResultFilter(filter->getData()) + total_zero_rows_in_tails == total_rows_per_granule)
/// Check if const 1 after shrink.
/// We can apply shrink only if after the previous step the number of rows in the result
/// matches the rows_per_granule info. Otherwise we will not be able to match newly added zeros in granule tails.
if (num_rows == total_rows_per_granule_previous &&
filter.countBytesInFilter() + total_zero_rows_in_tails == total_rows_per_granule_previous) /// All zeros are in tails?
{
total_rows_per_granule = total_rows_per_granule - total_zero_rows_in_tails;
num_rows = total_rows_per_granule;
setFilterConstTrue();
shrink(columns); /// shrink acts as filtering in such case
/// If all zeros are in granule tails, we can use shrink to filter out rows.
shrink(columns, rows_per_granule_previous); /// shrink acts as filtering in such case
auto c = additional_columns.getColumns();
shrink(c, rows_per_granule_previous);
additional_columns.setColumns(c);
num_rows = total_rows_per_granule;
LOG_TEST(log, "ReadResult::optimize() after shrink {}", dumpInfo());
}
else
{
auto new_filter = ColumnUInt8::create(filter->size() - total_zero_rows_in_tails);
auto new_filter = ColumnUInt8::create(filter.size() - total_zero_rows_in_tails);
IColumn::Filter & new_data = new_filter->getData();
collapseZeroTails(filter->getData(), new_data);
total_rows_per_granule = new_filter->size();
num_rows = total_rows_per_granule;
filter = new_filter.get();
filter_holder = std::move(new_filter);
/// Shorten the filter by removing zeros from granule tails
collapseZeroTails(filter.getData(), rows_per_granule_previous, new_data);
if (total_rows_per_granule != new_filter->size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "New filter size {} doesn't match number of rows to be read {}",
new_filter->size(), total_rows_per_granule);
/// Need to apply combined filter here before replacing it with shortened one because otherwise
/// the filter size will not match the number of rows in the result columns.
if (num_rows == total_rows_per_granule_previous)
{
/// Filter from the previous steps has not been applied yet, do it now.
applyFilter(filter);
}
else
{
/// Filter was applied before, so apply only new filter from the current step.
applyFilter(current_filter);
}
final_filter = FilterWithCachedCount(new_filter->getPtr());
if (num_rows != final_filter.countBytesInFilter())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Count of 1s in final filter {} doesn't match number of rows {}",
final_filter.countBytesInFilter(), num_rows);
LOG_TEST(log, "ReadResult::optimize() after colapseZeroTails {}", dumpInfo());
}
need_filter = true;
}
/// Another guess, if it's worth filtering at PREWHERE
else if (countBytesInResultFilter(filter->getData()) < 0.6 * filter->size())
need_filter = true;
else
{
/// Check if we have rows already filtered at the previous step. In such case we must apply the filter because
/// otherwise num_rows doesn't match total_rows_per_granule and the next read step will not know how to filter
/// newly read columns to match the num_rows.
if (num_rows != total_rows_per_granule)
{
applyFilter(current_filter);
}
/// Another guess, if it's worth filtering at PREWHERE
else if (filter.countBytesInFilter() < 0.6 * filter.size())
{
applyFilter(filter);
}
final_filter = std::move(filter);
}
}
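As a rough illustration of the zero-tails optimization above, here is a standalone sketch (plain STL with hypothetical sizes, not the ReadResult implementation): the trailing zeros of each granule's filter are dropped by simply reading fewer rows from that granule, and the filter is shortened to match.

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

int main()
{
    /// Two granules of 4 rows each; both filters end with zeros (rows that would be dropped anyway).
    std::vector<uint8_t> filter    = {1, 0, 1, 0,   1, 1, 0, 0};
    std::vector<size_t> rows_prev  = {4, 4}; /// rows per granule before optimize()
    std::vector<size_t> rows_after = {3, 2}; /// zero tails of length 1 and 2 removed

    /// Collapse the zero tails: keep only the leading part of each granule's filter.
    std::vector<uint8_t> new_filter;
    size_t pos = 0;
    for (size_t i = 0; i < rows_prev.size(); ++i)
    {
        new_filter.insert(new_filter.end(), filter.begin() + pos, filter.begin() + pos + rows_after[i]);
        pos += rows_prev[i];
    }

    /// Only 5 rows need to be read instead of 8, and the filter shrinks accordingly.
    assert((new_filter == std::vector<uint8_t>{1, 0, 1, 1, 1}));
}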
size_t MergeTreeRangeReader::ReadResult::countZeroTails(const IColumn::Filter & filter_vec, NumRows & zero_tails, bool can_read_incomplete_granules) const
@ -441,7 +639,7 @@ size_t MergeTreeRangeReader::ReadResult::countZeroTails(const IColumn::Filter &
return total_zero_rows_in_tails;
}
void MergeTreeRangeReader::ReadResult::collapseZeroTails(const IColumn::Filter & filter_vec, IColumn::Filter & new_filter_vec)
void MergeTreeRangeReader::ReadResult::collapseZeroTails(const IColumn::Filter & filter_vec, const NumRows & rows_per_granule_previous, IColumn::Filter & new_filter_vec) const
{
const auto * filter_data = filter_vec.data();
auto * new_filter_data = new_filter_vec.data();
@ -449,7 +647,7 @@ void MergeTreeRangeReader::ReadResult::collapseZeroTails(const IColumn::Filter &
for (auto i : collections::range(0, rows_per_granule.size()))
{
memcpySmallAllowReadWriteOverflow15(new_filter_data, filter_data, rows_per_granule[i]);
filter_data += rows_per_granule_original[i];
filter_data += rows_per_granule_previous[i];
new_filter_data += rows_per_granule[i];
}
@ -597,54 +795,6 @@ size_t MergeTreeRangeReader::ReadResult::numZerosInTail(const UInt8 * begin, con
return count;
}
/// Filter size must match total_rows_per_granule
void MergeTreeRangeReader::ReadResult::setFilter(const ColumnPtr & new_filter)
{
if (!new_filter && filter)
throw Exception("Can't replace existing filter with empty.", ErrorCodes::LOGICAL_ERROR);
if (filter)
{
size_t new_size = new_filter->size();
if (new_size != total_rows_per_granule)
throw Exception("Can't set filter because it's size is " + toString(new_size) + " but "
+ toString(total_rows_per_granule) + " rows was read.", ErrorCodes::LOGICAL_ERROR);
}
ConstantFilterDescription const_description(*new_filter);
if (const_description.always_true)
{
setFilterConstTrue();
}
else if (const_description.always_false)
{
clear();
}
else
{
FilterDescription filter_description(*new_filter);
filter_holder = filter_description.data_holder ? filter_description.data_holder : new_filter;
filter = typeid_cast<const ColumnUInt8 *>(filter_holder.get());
if (!filter)
throw Exception("setFilter function expected ColumnUInt8.", ErrorCodes::LOGICAL_ERROR);
}
}
size_t MergeTreeRangeReader::ReadResult::countBytesInResultFilter(const IColumn::Filter & filter_)
{
auto it = filter_bytes_map.find(&filter_);
if (it == filter_bytes_map.end())
{
auto bytes = countBytesInFilter(filter_);
filter_bytes_map[&filter_] = bytes;
return bytes;
}
else
return it->second;
}
MergeTreeRangeReader::MergeTreeRangeReader(
IMergeTreeReader * merge_tree_reader_,
MergeTreeRangeReader * prev_reader_,
@ -659,30 +809,37 @@ MergeTreeRangeReader::MergeTreeRangeReader(
, is_initialized(true)
{
if (prev_reader)
sample_block = prev_reader->getSampleBlock();
result_sample_block = prev_reader->getSampleBlock();
for (const auto & name_and_type : merge_tree_reader->getColumns())
sample_block.insert({name_and_type.type->createColumn(), name_and_type.type, name_and_type.name});
{
read_sample_block.insert({name_and_type.type->createColumn(), name_and_type.type, name_and_type.name});
result_sample_block.insert({name_and_type.type->createColumn(), name_and_type.type, name_and_type.name});
}
for (const auto & column_name : non_const_virtual_column_names_)
{
if (sample_block.has(column_name))
if (result_sample_block.has(column_name))
continue;
non_const_virtual_column_names.push_back(column_name);
if (column_name == "_part_offset")
sample_block.insert(ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), column_name));
if (column_name == "_part_offset" && !prev_reader)
{
/// _part_offset column is filled by the first reader.
read_sample_block.insert(ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), column_name));
result_sample_block.insert(ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), column_name));
}
}
if (prewhere_info)
{
const auto & step = *prewhere_info;
if (step.actions)
step.actions->execute(sample_block, true);
step.actions->execute(result_sample_block, true);
if (step.remove_column)
sample_block.erase(step.column_name);
result_sample_block.erase(step.column_name);
}
}
@ -765,7 +922,12 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
if (max_rows == 0)
throw Exception("Expected at least 1 row to read, got 0.", ErrorCodes::LOGICAL_ERROR);
ReadResult read_result;
ReadResult read_result(log);
SCOPE_EXIT({
LOG_TEST(log, "read() returned {}, sample block {}",
read_result.dumpInfo(), this->result_sample_block.dumpNames());
});
if (prev_reader)
{
@ -778,69 +940,52 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
if (read_result.num_rows == 0)
return read_result;
bool has_columns = false;
/// Calculate and update read bytes
size_t total_bytes = 0;
for (auto & column : columns)
{
if (column)
{
total_bytes += column->byteSize();
has_columns = true;
}
}
read_result.addNumBytesRead(total_bytes);
bool should_evaluate_missing_defaults = false;
if (has_columns)
{
/// num_read_rows >= read_result.num_rows
/// We must filter block before adding columns to read_result.block
/// Fill missing columns before filtering because some arrays from Nested may have empty data.
merge_tree_reader->fillMissingColumns(columns, should_evaluate_missing_defaults, num_read_rows);
if (read_result.getFilter())
filterColumns(columns, read_result.getFilter()->getData());
}
else
{
size_t num_rows = read_result.num_rows;
/// If block is empty, we still may need to add missing columns.
/// In that case use number of rows in result block and don't filter block.
if (num_rows)
merge_tree_reader->fillMissingColumns(columns, should_evaluate_missing_defaults, num_rows);
}
if (!columns.empty())
{
/// If all requested columns are absent in the part, num_read_rows will be 0.
/// In this case we need to use the number of rows in the result to fill the default values and not filter the block.
if (num_read_rows == 0)
num_read_rows = read_result.num_rows;
/// fillMissingColumns() must be called after reading but before any filtering because
/// some columns (e.g. arrays) might be only partially filled and thus not valid;
/// fillMissingColumns() fixes this.
bool should_evaluate_missing_defaults;
merge_tree_reader->fillMissingColumns(columns, should_evaluate_missing_defaults,
num_read_rows);
if (read_result.total_rows_per_granule == num_read_rows && read_result.num_rows != num_read_rows)
{
/// We have a filter applied from the previous step,
/// so we need to apply it to the newly read rows.
if (!read_result.final_filter.present() || read_result.final_filter.countBytesInFilter() != read_result.num_rows)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Final filter is missing or has mismatching size, read_result: {}",
read_result.dumpInfo());
filterColumns(columns, read_result.final_filter);
}
/// If some columns are absent in the part, then evaluate default values
if (should_evaluate_missing_defaults)
{
auto block = prev_reader->sample_block.cloneWithColumns(read_result.columns);
auto block_before_prewhere = read_result.block_before_prewhere;
for (const auto & column : block)
{
if (block_before_prewhere.has(column.name))
block_before_prewhere.erase(column.name);
}
Block additional_columns = prev_reader->getSampleBlock().cloneWithColumns(read_result.columns);
for (const auto & col : read_result.additional_columns)
additional_columns.insert(col);
if (block_before_prewhere)
{
if (read_result.need_filter)
{
auto old_columns = block_before_prewhere.getColumns();
filterColumns(old_columns, read_result.getFilterOriginal()->getData());
block_before_prewhere.setColumns(old_columns);
}
for (auto & column : block_before_prewhere)
block.insert(std::move(column));
}
merge_tree_reader->evaluateMissingDefaults(block, columns);
merge_tree_reader->evaluateMissingDefaults(additional_columns, columns);
}
/// If columns are not empty, then apply on-the-fly ALTER conversions if required
merge_tree_reader->performRequiredConversions(columns);
}
@ -854,11 +999,15 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
read_result = startReadingChain(max_rows, ranges);
read_result.num_rows = read_result.numReadRows();
if (read_result.num_rows)
LOG_TEST(log, "First reader returned: {}, requested columns: {}",
read_result.dumpInfo(), dumpNames(merge_tree_reader->getColumns()));
if (read_result.num_rows == 0)
return read_result;
{
/// Physical columns go first and then some virtual columns follow
/// TODO: is there a better way to account for virtual columns that were filled by previous readers?
size_t physical_columns_count = read_result.columns.size() - read_result.extra_columns_filled.size();
size_t physical_columns_count = merge_tree_reader->getColumns().size();
Columns physical_columns(read_result.columns.begin(), read_result.columns.begin() + physical_columns_count);
bool should_evaluate_missing_defaults;
@ -875,8 +1024,6 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
for (size_t i = 0; i < physical_columns.size(); ++i)
read_result.columns[i] = std::move(physical_columns[i]);
}
else
read_result.columns.clear();
size_t total_bytes = 0;
for (auto & column : read_result.columns)
@ -885,18 +1032,35 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
read_result.addNumBytesRead(total_bytes);
}
if (read_result.num_rows == 0)
return read_result;
executePrewhereActionsAndFilterColumns(read_result);
read_result.checkInternalConsistency();
if (!read_result.can_return_prewhere_column_without_filtering)
{
if (!read_result.filterWasApplied())
{
/// TODO: another solution might be to set all 0s from final filter into the prewhere column and not filter all the columns here
/// but rely on filtering in WHERE.
read_result.applyFilter(read_result.final_filter);
read_result.checkInternalConsistency();
}
read_result.can_return_prewhere_column_without_filtering = true;
}
if (read_result.num_rows != 0 && read_result.columns.size() != getSampleBlock().columns())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Number of columns in result doesn't match number of columns in sample block, read_result: {}, sample block: {}",
read_result.dumpInfo(), getSampleBlock().dumpStructure());
return read_result;
}
MergeTreeRangeReader::ReadResult MergeTreeRangeReader::startReadingChain(size_t max_rows, MarkRanges & ranges)
{
ReadResult result;
ReadResult result(log);
result.columns.resize(merge_tree_reader->getColumns().size());
size_t current_task_last_mark = getLastMark(ranges);
@ -946,14 +1110,11 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::startReadingChain(size_t
result.addRows(stream.finalize(result.columns));
/// Last granule may be incomplete.
if (!result.rowsPerGranule().empty())
if (!result.rows_per_granule.empty())
result.adjustLastGranule();
for (const auto & column_name : non_const_virtual_column_names)
{
if (column_name == "_part_offset")
fillPartOffsetColumn(result, leading_begin_part_offset, leading_end_part_offset);
}
if (read_sample_block.has("_part_offset"))
fillPartOffsetColumn(result, leading_begin_part_offset, leading_end_part_offset);
return result;
}
@ -968,11 +1129,13 @@ void MergeTreeRangeReader::fillPartOffsetColumn(ReadResult & result, UInt64 lead
UInt64 * pos = vec.data();
UInt64 * end = &vec[num_rows];
/// Fill the remaining part of the previous range (it was started in the previous read request).
while (pos < end && leading_begin_part_offset < leading_end_part_offset)
*pos++ = leading_begin_part_offset++;
const auto start_ranges = result.startedRanges();
const auto & start_ranges = result.started_ranges;
/// Fill the ranges which were started in the current read request.
for (const auto & start_range : start_ranges)
{
UInt64 start_part_offset = index_granularity->getMarkStartingRow(start_range.range.begin);
@ -983,7 +1146,6 @@ void MergeTreeRangeReader::fillPartOffsetColumn(ReadResult & result, UInt64 lead
}
result.columns.emplace_back(std::move(column));
result.extra_columns_filled.push_back("_part_offset");
}
Columns MergeTreeRangeReader::continueReadingChain(const ReadResult & result, size_t & num_rows)
@ -995,7 +1157,7 @@ Columns MergeTreeRangeReader::continueReadingChain(const ReadResult & result, si
if (merge_tree_reader->getColumns().empty())
return columns;
if (result.rowsPerGranule().empty())
if (result.rows_per_granule.empty())
{
/// If zero rows were read at the previous step, then there are no more rows to read.
/// The last granule may have fewer rows than index_granularity, so finish reading manually.
@ -1005,8 +1167,8 @@ Columns MergeTreeRangeReader::continueReadingChain(const ReadResult & result, si
columns.resize(merge_tree_reader->numColumnsInResult());
const auto & rows_per_granule = result.rowsPerGranule();
const auto & started_ranges = result.startedRanges();
const auto & rows_per_granule = result.rows_per_granule;
const auto & started_ranges = result.started_ranges;
size_t current_task_last_mark = ReadResult::getLastMark(started_ranges);
size_t next_range_to_start = 0;
@ -1027,13 +1189,13 @@ Columns MergeTreeRangeReader::continueReadingChain(const ReadResult & result, si
num_rows += stream.read(columns, rows_per_granule[i], !last);
}
stream.skip(result.numRowsToSkipInLastGranule());
stream.skip(result.num_rows_to_skip_in_last_granule);
num_rows += stream.finalize(columns);
/// added_rows may be zero if all columns were read in prewhere and it's ok.
if (num_rows && num_rows != result.totalRowsPerGranule())
if (num_rows && num_rows != result.total_rows_per_granule)
throw Exception("RangeReader read " + toString(num_rows) + " rows, but "
+ toString(result.totalRowsPerGranule()) + " expected.", ErrorCodes::LOGICAL_ERROR);
+ toString(result.total_rows_per_granule) + " expected.", ErrorCodes::LOGICAL_ERROR);
return columns;
}
@ -1047,7 +1209,7 @@ static void checkCombinedFiltersSize(size_t bytes_in_first_filter, size_t second
}
/// Second filter size must be equal to number of 1s in the first filter.
/// The result size is equal to first filter size.
/// The result has size equal to first filter size and contains 1s only where both filters contain 1s.
static ColumnPtr combineFilters(ColumnPtr first, ColumnPtr second)
{
ConstantFilterDescription first_const_descr(*first);
@ -1100,23 +1262,22 @@ static ColumnPtr combineFilters(ColumnPtr first, ColumnPtr second)
return mut_first;
}
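To make the contract of combineFilters() described above concrete, here is a standalone sketch (plain STL, not the actual implementation): the second filter has one byte per 1 in the first filter, and the result expands it back onto those positions.

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

std::vector<uint8_t> combineFiltersSketch(const std::vector<uint8_t> & first, const std::vector<uint8_t> & second)
{
    std::vector<uint8_t> result(first.size(), 0);
    size_t second_pos = 0;
    for (size_t i = 0; i < first.size(); ++i)
        if (first[i])
            result[i] = second[second_pos++];
    assert(second_pos == second.size()); /// second must have one byte per 1 in first
    return result;
}

int main()
{
    /// first keeps rows 0, 2 and 3; second then drops the middle one of those survivors.
    auto combined = combineFiltersSketch({1, 0, 1, 1}, {1, 0, 1});
    assert((combined == std::vector<uint8_t>{1, 0, 0, 1}));
}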
void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & result)
void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & result) const
{
result.checkInternalConsistency();
if (!prewhere_info)
return;
const auto & header = merge_tree_reader->getColumns();
size_t num_columns = header.size();
const auto & header = read_sample_block;
size_t num_columns = header.columns();
/// Check that we have columns from previous steps and newly read required columns
if (result.columns.size() < num_columns + result.extra_columns_filled.size())
if (result.columns.size() < num_columns)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Invalid number of columns passed to MergeTreeRangeReader. Expected {}, got {}",
num_columns, result.columns.size());
/// This filter has the size of total_rows_per_granule. It is applied after reading contiguous chunks from
/// the start of each granule.
ColumnPtr combined_filter;
/// Filter computed at the current step. Its size is equal to num_rows which is <= total_rows_per_granule
ColumnPtr current_step_filter;
size_t prewhere_column_pos;
@ -1138,35 +1299,28 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
for (auto name_and_type = header.begin(); name_and_type != header.end() && pos < result.columns.size(); ++pos, ++name_and_type)
block.insert({result.columns[pos], name_and_type->type, name_and_type->name});
for (const auto & column_name : non_const_virtual_column_names)
{
if (block.has(column_name))
continue;
/// Columns might be projected out. We need to store them here so that default columns can be evaluated later.
Block additional_columns = block;
if (column_name == "_part_offset")
{
if (pos >= result.columns.size())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Invalid number of columns passed to MergeTreeRangeReader. Expected {}, got {}",
num_columns, result.columns.size());
if (prewhere_info->actions)
prewhere_info->actions->execute(block);
block.insert({result.columns[pos], std::make_shared<DataTypeUInt64>(), column_name});
}
else if (column_name == LightweightDeleteDescription::FILTER_COLUMN.name)
result.additional_columns.clear();
/// Additional columns might only be needed if there are more steps in the chain.
if (!last_reader_in_chain)
{
/// Do nothing, it will be added later
for (auto & col : additional_columns)
{
/// Exclude columns that are present in the result block to avoid storing them and filtering twice.
/// TODO: also need to exclude the columns that are not needed for the next steps.
if (block.has(col.name))
continue;
result.additional_columns.insert(col);
}
}
else
throw Exception("Unexpected non-const virtual column: " + column_name, ErrorCodes::LOGICAL_ERROR);
++pos;
}
/// Columns might be projected out. We need to store them here so that default columns can be evaluated later.
result.block_before_prewhere = block;
if (prewhere_info->actions)
prewhere_info->actions->execute(block);
prewhere_column_pos = block.getPositionByName(prewhere_info->column_name);
result.columns.clear();
@ -1174,90 +1328,38 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
for (auto & col : block)
result.columns.emplace_back(std::move(col.column));
current_step_filter.swap(result.columns[prewhere_column_pos]);
combined_filter = current_step_filter;
current_step_filter = result.columns[prewhere_column_pos];
}
if (result.getFilter())
{
ColumnPtr prev_filter = result.getFilterHolder();
combined_filter = combineFilters(prev_filter, std::move(combined_filter));
}
result.setFilter(combined_filter);
/// If there is a WHERE, we filter in there, and only optimize IO and shrink columns here
if (!last_reader_in_chain)
result.optimize(merge_tree_reader->canReadIncompleteGranules(), true);
/// If we read nothing or filter gets optimized to nothing
if (result.totalRowsPerGranule() == 0)
result.setFilterConstFalse();
/// If we need to filter in PREWHERE
else if (prewhere_info->need_filter || result.need_filter)
{
/// If there is a filter and without optimized
if (result.getFilter() && last_reader_in_chain)
{
const auto * result_filter = result.getFilter();
/// optimize is not called, need to check const 1 and const 0
size_t bytes_in_filter = result.countBytesInResultFilter(result_filter->getData());
if (bytes_in_filter == 0)
result.setFilterConstFalse();
else if (bytes_in_filter == result.num_rows)
result.setFilterConstTrue();
}
/// If there is still a filter, do the filtering now
if (result.getFilter())
{
/// filter might be shrunk while columns not
const auto * result_filter = result.getFilterOriginal();
filterColumns(result.columns, current_step_filter);
result.need_filter = true;
bool has_column = false;
for (auto & column : result.columns)
{
if (column)
{
has_column = true;
result.num_rows = column->size();
break;
}
}
/// There is only one filter column. Record the actual number
if (!has_column)
result.num_rows = result.countBytesInResultFilter(result_filter->getData());
}
/// Check if the PREWHERE column is needed
if (!result.columns.empty())
{
if (prewhere_info->remove_column)
result.columns.erase(result.columns.begin() + prewhere_column_pos);
else
result.columns[prewhere_column_pos] =
getSampleBlock().getByName(prewhere_info->column_name).type->
createColumnConst(result.num_rows, 1u)->convertToFullColumnIfConst();
}
}
/// Filter in WHERE instead
if (prewhere_info->remove_column)
result.columns.erase(result.columns.begin() + prewhere_column_pos);
else
{
if (prewhere_info->remove_column)
result.columns.erase(result.columns.begin() + prewhere_column_pos);
else
{
auto type = getSampleBlock().getByName(prewhere_info->column_name).type;
ColumnWithTypeAndName col(result.getFilterHolder()->convertToFullColumnIfConst(), std::make_shared<DataTypeUInt8>(), "");
result.columns[prewhere_column_pos] = castColumn(col, type);
result.clearFilter(); // Acting as a flag to not filter in PREWHERE
}
/// In the case when we are not removing the prewhere column, the caller expects it to serve as a final filter:
/// it must contain 0s not only from the current step but also from all the previous steps.
/// One way to achieve this is to apply the final_filter if we know that it was not applied at
/// several previous steps but was accumulated instead.
result.can_return_prewhere_column_without_filtering =
(!result.final_filter.present() || result.final_filter.countBytesInFilter() == result.num_rows);
}
FilterWithCachedCount current_filter(current_step_filter);
result.optimize(current_filter, merge_tree_reader->canReadIncompleteGranules());
if (prewhere_info->need_filter && !result.filterWasApplied())
{
/// Depending on whether the final filter was applied at the previous step or not we need to apply either
/// just the current step filter or the accumulated filter.
FilterWithCachedCount filter_to_apply =
current_filter.size() == result.total_rows_per_granule ?
result.final_filter :
current_filter;
result.applyFilter(filter_to_apply);
}
LOG_TEST(log, "After execute prewhere {}", result.dumpInfo());
}
std::string PrewhereExprInfo::dump() const

View File

@ -1,6 +1,9 @@
#pragma once
#include <Core/Block.h>
#include <Common/logger_useful.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnsCommon.h>
#include <Columns/FilterDescription.h>
#include <Storages/MergeTree/MarkRange.h>
namespace DB
@ -34,6 +37,45 @@ struct PrewhereExprInfo
std::string dump() const;
};
class FilterWithCachedCount
{
ConstantFilterDescription const_description; /// TODO: ConstantFilterDescription only checks always true/false for const columns
/// think how to handle the case when the column is not const but has all 0s or all 1s
ColumnPtr column = nullptr;
const IColumn::Filter * data = nullptr;
mutable size_t cached_count_bytes = -1;
public:
explicit FilterWithCachedCount() = default;
explicit FilterWithCachedCount(const ColumnPtr & column_)
: const_description(*column_)
{
ColumnPtr col = column_->convertToFullIfNeeded();
FilterDescription desc(*col);
column = desc.data_holder ? desc.data_holder : col;
data = desc.data;
}
bool present() const { return !!column; }
bool alwaysTrue() const { return const_description.always_true; }
bool alwaysFalse() const { return const_description.always_false; }
ColumnPtr getColumn() const { return column; }
const IColumn::Filter & getData() const { return *data; }
size_t size() const { return column->size(); }
size_t countBytesInFilter() const
{
if (cached_count_bytes == size_t(-1))
cached_count_bytes = DB::countBytesInFilter(*data);
return cached_count_bytes;
}
};
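A hypothetical usage sketch of the class above. It assumes the usual ColumnUInt8 API and only compiles inside the ClickHouse tree, so treat it as an illustration rather than a standalone program.

auto col = ColumnUInt8::create();
col->insertValue(1);
col->insertValue(0);
col->insertValue(1);

FilterWithCachedCount filter(std::move(col)); /// MutableColumnPtr converts to ColumnPtr
assert(filter.present());
assert(filter.size() == 3);
assert(filter.countBytesInFilter() == 2); /// counted once here...
assert(filter.countBytesInFilter() == 2); /// ...and served from the cache afterwards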
/// MergeTreeReader iterator which allows sequential reading for arbitrary number of rows between pairs of marks in the same part.
/// Stores reading state, which can be inside granule. Can skip rows in current granule and start reading from next mark.
/// Used generally for reading number of rows less than index granularity to decrease cache misses for fat blocks.
@ -174,53 +216,46 @@ public:
using RangesInfo = std::vector<RangeInfo>;
const RangesInfo & startedRanges() const { return started_ranges; }
const NumRows & rowsPerGranule() const { return rows_per_granule; }
explicit ReadResult(Poco::Logger * log_) : log(log_) {}
static size_t getLastMark(const MergeTreeRangeReader::ReadResult::RangesInfo & ranges);
/// The number of rows read at the LAST iteration in the chain. <= num_added_rows + num_filtered_rows.
size_t totalRowsPerGranule() const { return total_rows_per_granule; }
size_t numRowsToSkipInLastGranule() const { return num_rows_to_skip_in_last_granule; }
/// Filter you need to apply to newly-read columns in order to add them to block.
const ColumnUInt8 * getFilterOriginal() const { return filter_original ? filter_original : filter; }
const ColumnUInt8 * getFilter() const { return filter; }
ColumnPtr & getFilterHolder() { return filter_holder; }
void addGranule(size_t num_rows_);
void adjustLastGranule();
void addRows(size_t rows) { num_read_rows += rows; }
void addRange(const MarkRange & range) { started_ranges.push_back({rows_per_granule.size(), range}); }
/// Set filter or replace old one. Filter must have more zeroes than previous.
void setFilter(const ColumnPtr & new_filter);
/// For each granule calculate the number of filtered rows at the end. Remove them and update filter.
void optimize(bool can_read_incomplete_granules, bool allow_filter_columns);
/// Add current step filter to the result and then for each granule calculate the number of filtered rows at the end.
/// Remove them and update filter.
/// Apply the filter to the columns and update num_rows if required
void optimize(const FilterWithCachedCount & current_filter, bool can_read_incomplete_granules);
/// Remove all rows from granules.
void clear();
void clearFilter() { filter = nullptr; }
void setFilterConstTrue();
void setFilterConstFalse();
void addNumBytesRead(size_t count) { num_bytes_read += count; }
void shrink(Columns & old_columns);
/// Shrinks columns according to the diff between current and previous rows_per_granule.
void shrink(Columns & old_columns, const NumRows & rows_per_granule_previous) const;
size_t countBytesInResultFilter(const IColumn::Filter & filter);
/// Applies the filter to the columns and updates num_rows.
void applyFilter(const FilterWithCachedCount & filter);
/// If this flag is false, then filtering from PREWHERE can be delayed and done in WHERE
/// to reduce memory copies and avoid applying heavy filters multiple times
bool need_filter = false;
/// Verifies that columns and filter sizes match.
/// The checks might be non-trivial, so it makes sense to have them only in debug builds.
void checkInternalConsistency() const;
Block block_before_prewhere;
std::string dumpInfo() const;
/// Contains columns that are not included into result but might be needed for default values calculation.
Block additional_columns;
RangesInfo started_ranges;
/// The number of rows read from each granule.
/// A granule here is not the number of rows between two marks;
/// it's the number of rows per single reading act.
NumRows rows_per_granule;
NumRows rows_per_granule_original;
/// Sum(rows_per_granule)
size_t total_rows_per_granule = 0;
/// The number of rows read at the first step. May be zero if no read columns are present in the part.
@ -229,29 +264,36 @@ public:
size_t num_rows_to_skip_in_last_granule = 0;
/// Without any filtration.
size_t num_bytes_read = 0;
/// nullptr if prev reader hasn't prewhere_actions. Otherwise filter.size() >= total_rows_per_granule.
ColumnPtr filter_holder;
ColumnPtr filter_holder_original;
const ColumnUInt8 * filter = nullptr;
const ColumnUInt8 * filter_original = nullptr;
void collapseZeroTails(const IColumn::Filter & filter, IColumn::Filter & new_filter);
/// This filter has the size of total_rows_per_granule. This means that it can be applied to newly read columns.
/// The result of applying this filter is that only rows that pass all previous filtering steps will remain.
FilterWithCachedCount final_filter;
/// This flag is true when the prewhere column can be returned without filtering.
/// It's true when it contains 0s from all filtering steps (not just the step at which it was calculated).
/// NOTE: If we accumulated the final_filter over several steps without applying it, then the prewhere column calculated at the last step
/// will not contain 0s from all previous steps.
bool can_return_prewhere_column_without_filtering = true;
/// Checks if result columns have current final_filter applied.
bool filterWasApplied() const { return !final_filter.present() || final_filter.countBytesInFilter() == num_rows; }
/// Builds an updated filter by cutting zeros from granule tails
void collapseZeroTails(const IColumn::Filter & filter, const NumRows & rows_per_granule_previous, IColumn::Filter & new_filter) const;
size_t countZeroTails(const IColumn::Filter & filter, NumRows & zero_tails, bool can_read_incomplete_granules) const;
static size_t numZerosInTail(const UInt8 * begin, const UInt8 * end);
std::map<const IColumn::Filter *, size_t> filter_bytes_map;
Names extra_columns_filled;
Poco::Logger * log;
};
ReadResult read(size_t max_rows, MarkRanges & ranges);
const Block & getSampleBlock() const { return sample_block; }
const Block & getSampleBlock() const { return result_sample_block; }
private:
ReadResult startReadingChain(size_t max_rows, MarkRanges & ranges);
Columns continueReadingChain(const ReadResult & result, size_t & num_rows);
void executePrewhereActionsAndFilterColumns(ReadResult & result);
void executePrewhereActionsAndFilterColumns(ReadResult & result) const;
void fillPartOffsetColumn(ReadResult & result, UInt64 leading_begin_part_offset, UInt64 leading_end_part_offset);
IMergeTreeReader * merge_tree_reader = nullptr;
@ -261,11 +303,14 @@ private:
Stream stream;
Block sample_block;
Block read_sample_block; /// Block with columns that are actually read from disk + non-const virtual columns that are filled at this step.
Block result_sample_block; /// Block with columns that are returned by this step.
bool last_reader_in_chain = false;
bool is_initialized = false;
Names non_const_virtual_column_names;
Poco::Logger * log = &Poco::Logger::get("MergeTreeRangeReader");
};
}

View File

@ -3,11 +3,25 @@ create table `table_00609` (key UInt64, val UInt64) engine = MergeTree order by
insert into `table_00609` select number, number / 8192 from system.numbers limit 100000;
alter table `table_00609` add column def UInt64 default val + 1;
select * from `table_00609` prewhere val > 2 format Null;
select * from `table_00609` prewhere val > 2 format Null SETTINGS max_block_size=100;
select * from `table_00609` prewhere val > 2 format Null SETTINGS max_block_size=1000;
select * from `table_00609` prewhere val > 2 format Null SETTINGS max_block_size=10000;
select * from `table_00609` prewhere val > 2 format Null SETTINGS max_block_size=20000;
select * from `table_00609` prewhere val > 2 format Null SETTINGS max_block_size=30000;
select * from `table_00609` prewhere val > 2 format Null SETTINGS max_block_size=40000;
select * from `table_00609` prewhere val > 2 format Null SETTINGS max_block_size=80000;
drop table if exists `table_00609`;
create table `table_00609` (key UInt64, val UInt64) engine = MergeTree order by key settings index_granularity=8192;
insert into `table_00609` select number, number / 8192 from system.numbers limit 100000;
alter table `table_00609` add column def UInt64;
select * from `table_00609` prewhere val > 2 format Null;
select * from `table_00609` prewhere val > 2 format Null SETTINGS max_block_size=100;
select * from `table_00609` prewhere val > 2 format Null SETTINGS max_block_size=1000;
select * from `table_00609` prewhere val > 2 format Null SETTINGS max_block_size=10000;
select * from `table_00609` prewhere val > 2 format Null SETTINGS max_block_size=20000;
select * from `table_00609` prewhere val > 2 format Null SETTINGS max_block_size=30000;
select * from `table_00609` prewhere val > 2 format Null SETTINGS max_block_size=40000;
select * from `table_00609` prewhere val > 2 format Null SETTINGS max_block_size=80000;
drop table if exists `table_00609`;

View File

@ -0,0 +1,3 @@
foo
foo
foo

View File

@ -0,0 +1,17 @@
drop table if exists t;
create table t (id UInt32, a Int) engine = MergeTree order by id settings min_bytes_for_wide_part=0;
insert into t values (1, 0) (2, 1) (3, 0) (4, 0) (5, 0);
alter table t add column s String default 'foo';
select s from t prewhere a = 1;
drop table t;
create table t (id UInt32, a Int) engine = MergeTree order by id settings min_bytes_for_wide_part=0;
insert into t values (1, 1) (2, 1) (3, 0) (4, 0) (5, 0);
alter table t add column s String default 'foo';
select s from t prewhere a = 1;
drop table t;

View File

@ -2,7 +2,12 @@
0
255
1 ['foo','bar'] 1 1
2 ['foo','bar'] 2 1
3 ['foo','bar'] 3 1
4 ['foo','bar'] 4 1
5 ['foo','bar'] 5 1
2 ['foo','bar'] 2 2
3 ['foo','bar'] 3 3
4 ['foo','bar'] 4 4
5 ['foo','bar'] 5 5
1 ['foo','bar'] 1 1
2 ['foo','bar'] 2 2
3 ['foo','bar'] 3 3
4 ['foo','bar'] 4 4
5 ['foo','bar'] 5 5

View File

@ -10,5 +10,6 @@ ENGINE = MergeTree ORDER BY u;
INSERT INTO t_filter SELECT toString(number), ['foo', 'bar'], number, toUInt8(number) FROM numbers(1000);
SELECT * FROM t_filter WHERE f LIMIT 5;
SELECT * FROM t_filter WHERE f != 0 LIMIT 5;
DROP TABLE IF EXISTS t_filter;

View File

@ -32,9 +32,27 @@
0 0 198401_1_1_1
1 1 198401_1_1_1
999998 999998 198401_1_1_1
0
1
2
0 foo
1 foo
2 foo
SOME GRANULES FILTERED OUT
335872 166463369216 166463369216
34464 1510321840 1510321840
301408 164953047376 164953047376
100000
100001
100002
100000 foo
100001 foo
100002 foo
PREWHERE
301408 164953047376 164953047376
42
10042
20042
42 foo
10042 foo
20042 foo

View File

@ -24,6 +24,8 @@ INSERT INTO t_1 select rowNumberInAllBlocks(), *, '1984-01-01' from t_random_1 l
OPTIMIZE TABLE t_1 FINAL;
ALTER TABLE t_1 ADD COLUMN foo String DEFAULT 'foo';
SELECT COUNT(DISTINCT(_part)) FROM t_1;
SELECT min(_part_offset), max(_part_offset) FROM t_1;
@ -37,13 +39,19 @@ SELECT order_0, _part_offset, _part FROM t_1 WHERE order_0 <= 1 OR (order_0 BETW
SELECT order_0, _part_offset, computed FROM t_1 ORDER BY order_0, _part_offset, computed LIMIT 3;
SELECT order_0, _part_offset, computed FROM t_1 ORDER BY order_0 DESC, _part_offset DESC, computed DESC LIMIT 3;
SELECT order_0, _part_offset, _part FROM t_1 WHERE order_0 <= 1 OR order_0 >= 999998 ORDER BY order_0 LIMIT 3;
SELECT _part_offset FROM t_1 ORDER BY order_0 LIMIT 3;
SELECT _part_offset, foo FROM t_1 ORDER BY order_0 LIMIT 3;
SELECT 'SOME GRANULES FILTERED OUT';
SELECT count(*), sum(_part_offset), sum(order_0) from t_1 where granule == 0;
SELECT count(*), sum(_part_offset), sum(order_0) from t_1 where granule == 0 AND _part_offset < 100000;
SELECT count(*), sum(_part_offset), sum(order_0) from t_1 where granule == 0 AND _part_offset >= 100000;
SELECT _part_offset FROM t_1 where granule == 0 AND _part_offset >= 100000 ORDER BY order_0 LIMIT 3;
SELECT _part_offset, foo FROM t_1 where granule == 0 AND _part_offset >= 100000 ORDER BY order_0 LIMIT 3;
SELECT 'PREWHERE';
SELECT count(*), sum(_part_offset), sum(order_0) from t_1 prewhere granule == 0 where _part_offset >= 100000;
SELECT count(*), sum(_part_offset), sum(order_0) from t_1 prewhere _part != '' where granule == 0; -- { serverError 10 }
SELECT count(*), sum(_part_offset), sum(order_0) from t_1 prewhere _part_offset > 100000 where granule == 0; -- { serverError 10 }
SELECT _part_offset FROM t_1 PREWHERE order_0 % 10000 == 42 ORDER BY order_0 LIMIT 3;
SELECT _part_offset, foo FROM t_1 PREWHERE order_0 % 10000 == 42 ORDER BY order_0 LIMIT 3;

View File

@ -0,0 +1,9 @@
-- https://github.com/ClickHouse/ClickHouse/issues/40956#issuecomment-1262096612
DROP TABLE IF EXISTS row_level_policy_prewhere;
DROP ROW POLICY IF EXISTS row_level_policy_prewhere_policy0 ON row_level_policy_prewhere;
CREATE TABLE row_level_policy_prewhere (x Int16, y String) ENGINE = MergeTree ORDER BY x;
INSERT INTO row_level_policy_prewhere(y, x) VALUES ('A',1), ('B',2), ('C',3);
CREATE ROW POLICY row_level_policy_prewhere_policy0 ON row_level_policy_prewhere FOR SELECT USING x >= 0 TO default;
SELECT * FROM row_level_policy_prewhere PREWHERE y = 'foo';
DROP TABLE row_level_policy_prewhere;

View File

@ -0,0 +1,29 @@
{% for index_granularity in [999, 1000, 1001, 9999, 10000, 10001] -%}
-- { echoOn }
SELECT count() FROM url_na_log;
130000
SELECT rows FROM system.parts WHERE database = currentDatabase() AND table = 'url_na_log' AND active;
130000
SELECT count() FROM url_na_log PREWHERE DateVisit >= '2022-08-10' AND DateVisit <= '2022-08-20' WHERE SiteId = 209 SETTINGS max_block_size = 200000, max_threads = 1;
110000
-- Delete more than half of the rows (60K) from the range 2022-08-10 .. 2022-08-20
-- There should be 50K rows remaining in this range
DELETE FROM url_na_log WHERE SiteId = 209 AND DateVisit >= '2022-08-13' AND DateVisit <= '2022-08-18';
SELECT count() FROM url_na_log;
70000
SELECT rows FROM system.parts WHERE database = currentDatabase() AND table = 'url_na_log' AND active;
130000
SELECT count() FROM url_na_log PREWHERE DateVisit >= '2022-08-10' AND DateVisit <= '2022-08-20' WHERE SiteId = 209 SETTINGS max_block_size = 200000, max_threads = 1;
50000
-- Hide more than half of the remaining rows (30K) from the range 2022-08-10 .. 2022-08-20 using a row policy
-- Now this range should have 20K rows left
CREATE ROW POLICY url_na_log_policy0 ON url_na_log FOR SELECT USING DateVisit < '2022-08-11' or DateVisit > '2022-08-19' TO default;
SELECT count() FROM url_na_log;
40000
SELECT rows FROM system.parts WHERE database = currentDatabase() AND table = 'url_na_log' AND active;
130000
SELECT count() FROM url_na_log PREWHERE DateVisit >= '2022-08-10' AND DateVisit <= '2022-08-20' WHERE SiteId = 209 SETTINGS max_block_size = 200000, max_threads = 1;
20000
DROP ROW POLICY url_na_log_policy0 ON url_na_log;
{% endfor -%}

View File

@ -0,0 +1,59 @@
{% for index_granularity in [999, 1000, 1001, 9999, 10000, 10001] %}
DROP TABLE IF EXISTS url_na_log;
CREATE TABLE url_na_log(SiteId UInt32, DateVisit Date, PRIMARY KEY (SiteId))
ENGINE = MergeTree()
ORDER BY (SiteId, DateVisit)
SETTINGS index_granularity = {{ index_granularity }}, min_bytes_for_wide_part = 0;
-- Insert some data to have 110K rows in the range 2022-08-10 .. 2022-08-20 and some more rows before and after that range
insert into url_na_log select 209, '2022-08-09' from numbers(10000);
insert into url_na_log select 209, '2022-08-10' from numbers(10000);
insert into url_na_log select 209, '2022-08-11' from numbers(10000);
insert into url_na_log select 209, '2022-08-12' from numbers(10000);
insert into url_na_log select 209, '2022-08-13' from numbers(10000);
insert into url_na_log select 209, '2022-08-14' from numbers(10000);
insert into url_na_log select 209, '2022-08-15' from numbers(10000);
insert into url_na_log select 209, '2022-08-16' from numbers(10000);
insert into url_na_log select 209, '2022-08-17' from numbers(10000);
insert into url_na_log select 209, '2022-08-18' from numbers(10000);
insert into url_na_log select 209, '2022-08-19' from numbers(10000);
insert into url_na_log select 209, '2022-08-20' from numbers(10000);
insert into url_na_log select 209, '2022-08-21' from numbers(10000);
SET mutations_sync=2;
SET allow_experimental_lightweight_delete=1;
OPTIMIZE TABLE url_na_log FINAL;
-- { echoOn }
SELECT count() FROM url_na_log;
SELECT rows FROM system.parts WHERE database = currentDatabase() AND table = 'url_na_log' AND active;
SELECT count() FROM url_na_log PREWHERE DateVisit >= '2022-08-10' AND DateVisit <= '2022-08-20' WHERE SiteId = 209 SETTINGS max_block_size = 200000, max_threads = 1;
-- Delete more than half of the rows (60K) from the range 2022-08-10 .. 2022-08-20
-- There should be 50K rows remaining in this range
DELETE FROM url_na_log WHERE SiteId = 209 AND DateVisit >= '2022-08-13' AND DateVisit <= '2022-08-18';
SELECT count() FROM url_na_log;
SELECT rows FROM system.parts WHERE database = currentDatabase() AND table = 'url_na_log' AND active;
SELECT count() FROM url_na_log PREWHERE DateVisit >= '2022-08-10' AND DateVisit <= '2022-08-20' WHERE SiteId = 209 SETTINGS max_block_size = 200000, max_threads = 1;
-- Hide more than half of the remaining rows (30K) from the range 2022-08-10 .. 2022-08-20 using a row policy
-- Now this range should have 20K rows left
CREATE ROW POLICY url_na_log_policy0 ON url_na_log FOR SELECT USING DateVisit < '2022-08-11' or DateVisit > '2022-08-19' TO default;
SELECT count() FROM url_na_log;
SELECT rows FROM system.parts WHERE database = currentDatabase() AND table = 'url_na_log' AND active;
SELECT count() FROM url_na_log PREWHERE DateVisit >= '2022-08-10' AND DateVisit <= '2022-08-20' WHERE SiteId = 209 SETTINGS max_block_size = 200000, max_threads = 1;
DROP ROW POLICY url_na_log_policy0 ON url_na_log;
-- { echoOff }
{% endfor %}

View File

@ -0,0 +1,148 @@
#!/usr/bin/env python3
import requests
import os
import sys
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from pure_http_client import ClickHouseClient
class Tester:
'''
- Creates test table
- Deletes the specified range of rows
- Masks another range using row-level policy
- Runs some read queries and checks that the results match the expected data
'''
def __init__(self, session, url, index_granularity, total_rows):
self.session = session
self.url = url
self.index_granularity = index_granularity
self.total_rows = total_rows
self.reported_errors = set()
self.repro_queries = []
def report_error(self):
print('Repro steps:', '\n\n\t'.join(self.repro_queries))
exit(1)
def query(self, query_text, include_in_repro_steps = True, expected_data = None):
self.repro_queries.append(query_text)
resp = self.session.post(self.url, data=query_text)
if resp.status_code != 200:
# Group similar errors
error = resp.text[0:40]
if error not in self.reported_errors:
self.reported_errors.add(error)
print('Code:', resp.status_code)
print('Result:', resp.text)
self.report_error()
result = resp.text
# Check that the result is as expected
if (expected_data is not None) and (int(result) != len(expected_data)):
print('Expected {} rows, got {}'.format(len(expected_data), result))
print('Expected data:' + str(expected_data))
self.report_error()
if not include_in_repro_steps:
self.repro_queries.pop()
    def check_data(self, all_data, delete_range_start, delete_range_end, row_level_policy_range_start, row_level_policy_range_end):
        all_data_after_delete = all_data[
            ~((all_data.a == 0) &
              (all_data.b > delete_range_start) &
              (all_data.b <= delete_range_end))]
        all_data_after_row_policy = all_data_after_delete[
            (all_data_after_delete.b <= row_level_policy_range_start) |
            (all_data_after_delete.b > row_level_policy_range_end)]

        for to_select in ['count()', 'sum(d)']: # Test reading with and without column with default value
            self.query('SELECT {} FROM tab_02473;'.format(to_select), False, all_data_after_row_policy)

            delta = 10
            for query_range_start in [0, delta]:
                for query_range_end in [self.total_rows - delta]: #, self.total_rows]:
                    expected = all_data_after_row_policy[
                        (all_data_after_row_policy.a == 0) &
                        (all_data_after_row_policy.b > query_range_start) &
                        (all_data_after_row_policy.b <= query_range_end)]
                    self.query('SELECT {} from tab_02473 PREWHERE b > {} AND b <= {} WHERE a == 0;'.format(
                        to_select, query_range_start, query_range_end), False, expected)

                    expected = all_data_after_row_policy[
                        (all_data_after_row_policy.a == 0) &
                        (all_data_after_row_policy.c > query_range_start) &
                        (all_data_after_row_policy.c <= query_range_end)]
                    self.query('SELECT {} from tab_02473 PREWHERE c > {} AND c <= {} WHERE a == 0;'.format(
                        to_select, query_range_start, query_range_end), False, expected)

                    expected = all_data_after_row_policy[
                        (all_data_after_row_policy.a == 0) &
                        ((all_data_after_row_policy.c <= query_range_start) |
                         (all_data_after_row_policy.c > query_range_end))]
                    self.query('SELECT {} from tab_02473 PREWHERE c <= {} OR c > {} WHERE a == 0;'.format(
                        to_select, query_range_start, query_range_end), False, expected)
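
    # Illustration of the masks above, assuming the parameters used in main()
    # (total_rows = 80, every row has a == 0 and b == c == 1..80, d == 1):
    # with a delete range of (0, 30] and a row policy range of (0, 50],
    # the delete mask drops b = 1..30 and the policy keeps only b > 50,
    # so both count() and sum(d) are expected to return 30.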
    def run_test(self, delete_range_start, delete_range_end, row_level_policy_range_start, row_level_policy_range_end):
        self.repro_queries = []

        self.query('''
            CREATE TABLE tab_02473 (a Int8, b Int32, c Int32, PRIMARY KEY (a))
            ENGINE = MergeTree() ORDER BY (a, b)
            SETTINGS min_bytes_for_wide_part = 0, index_granularity = {};'''.format(self.index_granularity))
        self.query('INSERT INTO tab_02473 select 0, number+1, number+1 FROM numbers({});'.format(self.total_rows))

        client = ClickHouseClient()
        all_data = client.query_return_df("SELECT a, b, c, 1 as d FROM tab_02473 FORMAT TabSeparatedWithNames;")

        self.query('OPTIMIZE TABLE tab_02473 FINAL SETTINGS mutations_sync=2;')

        # After all data has been written add a column with default value
        self.query('ALTER TABLE tab_02473 ADD COLUMN d Int64 DEFAULT 1;')
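        # The parts written above do not physically contain column 'd', so the
        # sum(d) queries in check_data() also exercise filling a column from its
        # DEFAULT expression for rows that survive the delete and policy filters.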

        self.check_data(all_data, -100, -100, -100, -100)

        self.query('DELETE FROM tab_02473 WHERE a = 0 AND b > {} AND b <= {};'.format(
            delete_range_start, delete_range_end))
        self.check_data(all_data, delete_range_start, delete_range_end, -100, -100)

        self.query('CREATE ROW POLICY policy_tab_02473 ON tab_02473 FOR SELECT USING b <= {} OR b > {} TO default;'.format(
            row_level_policy_range_start, row_level_policy_range_end))
        self.check_data(all_data, delete_range_start, delete_range_end, row_level_policy_range_start, row_level_policy_range_end)

        self.query('DROP POLICY policy_tab_02473 ON tab_02473;')
        self.query('DROP TABLE tab_02473;')
def main():
    # Set mutations to synchronous mode and enable lightweight DELETEs
    url = os.environ['CLICKHOUSE_URL'] + '&mutations_sync=2&allow_experimental_lightweight_delete=1&max_threads=1'

    default_index_granularity = 10
    total_rows = 8 * default_index_granularity
    step = default_index_granularity
    session = requests.Session()
    for index_granularity in [default_index_granularity-1, default_index_granularity]: # [default_index_granularity-1, default_index_granularity+1, default_index_granularity]:
        tester = Tester(session, url, index_granularity, total_rows)

        # Test combinations of ranges of various size masked by lightweight DELETEs
        # along with ranges of various size masked by row-level policies
        for delete_range_start in range(0, total_rows, 3 * step):
            for delete_range_end in range(delete_range_start + 3 * step, total_rows, 2 * step):
                for row_level_policy_range_start in range(0, total_rows, 3 * step):
                    for row_level_policy_range_end in range(row_level_policy_range_start + 3 * step, total_rows, 2 * step):
                        tester.run_test(delete_range_start, delete_range_end,
                                        row_level_policy_range_start, row_level_policy_range_end)


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,11 @@
#!/usr/bin/env bash
# Tags: long
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# We should have correct env vars from shell_config.sh to run this test
python3 "$CURDIR"/02473_multistep_prewhere.python

View File

@ -0,0 +1,76 @@
-- { echoOn }
CREATE TABLE test_filter(a Int32, b Int32, c Int32) ENGINE = MergeTree() ORDER BY a SETTINGS index_granularity = 3;
INSERT INTO test_filter SELECT number, number+1, (number/2 + 1) % 2 FROM numbers(15);
SELECT _part_offset, intDiv(_part_offset, 3) as granule, * FROM test_filter ORDER BY _part_offset;
0 0 0 1 1
1 0 1 2 1
2 0 2 3 0
3 1 3 4 0
4 1 4 5 1
5 1 5 6 1
6 2 6 7 0
7 2 7 8 0
8 2 8 9 1
9 3 9 10 1
10 3 10 11 0
11 3 11 12 0
12 4 12 13 1
13 4 13 14 1
14 4 14 15 0
-- Check that division by zero occurs on some rows
SELECT intDiv(b, c) FROM test_filter; -- { serverError ILLEGAL_DIVISION }
-- Filter out those rows using WHERE or PREWHERE
SELECT intDiv(b, c) FROM test_filter WHERE c != 0;
1
2
5
6
9
10
13
14
SELECT intDiv(b, c) FROM test_filter PREWHERE c != 0;
1
2
5
6
9
10
13
14
SELECT intDiv(b, c) FROM test_filter PREWHERE c != 0 WHERE b%2 != 0;
1
5
9
13
SET mutations_sync = 2, allow_experimental_lightweight_delete = 1;
-- Delete all rows where division by zero could occur
DELETE FROM test_filter WHERE c = 0;
-- Test that now division by zero doesn't occur without explicit condition
SELECT intDiv(b, c) FROM test_filter;
1
2
5
6
9
10
13
14
SELECT * FROM test_filter PREWHERE intDiv(b, c) > 0;
0 1 1
1 2 1
4 5 1
5 6 1
8 9 1
9 10 1
12 13 1
13 14 1
SELECT * FROM test_filter PREWHERE b != 0 WHERE intDiv(b, c) > 0;
0 1 1
1 2 1
4 5 1
5 6 1
8 9 1
9 10 1
12 13 1
13 14 1

View File

@ -0,0 +1,28 @@
DROP TABLE IF EXISTS test_filter;
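-- The queries below rely on rows with c = 0 being filtered out (by WHERE/PREWHERE or by the
-- lightweight DELETE) before intDiv(b, c) is computed for them; otherwise they would fail
-- with ILLEGAL_DIVISION like the first unfiltered query.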
-- { echoOn }
CREATE TABLE test_filter(a Int32, b Int32, c Int32) ENGINE = MergeTree() ORDER BY a SETTINGS index_granularity = 3;
INSERT INTO test_filter SELECT number, number+1, (number/2 + 1) % 2 FROM numbers(15);
SELECT _part_offset, intDiv(_part_offset, 3) as granule, * FROM test_filter ORDER BY _part_offset;
-- Check that division by zero occurs on some rows
SELECT intDiv(b, c) FROM test_filter; -- { serverError ILLEGAL_DIVISION }
-- Filter out those rows using WHERE or PREWHERE
SELECT intDiv(b, c) FROM test_filter WHERE c != 0;
SELECT intDiv(b, c) FROM test_filter PREWHERE c != 0;
SELECT intDiv(b, c) FROM test_filter PREWHERE c != 0 WHERE b%2 != 0;
SET mutations_sync = 2, allow_experimental_lightweight_delete = 1;
-- Delete all rows where division by zero could occur
DELETE FROM test_filter WHERE c = 0;
-- Test that now division by zero doesn't occur without explicit condition
SELECT intDiv(b, c) FROM test_filter;
SELECT * FROM test_filter PREWHERE intDiv(b, c) > 0;
SELECT * FROM test_filter PREWHERE b != 0 WHERE intDiv(b, c) > 0;
-- { echoOff }
DROP TABLE test_filter;

View File

@ -0,0 +1,110 @@
-- { echoOn }
SELECT * FROM table_02513;
143001
143002
143003
143004
143005
143006
143007
143008
143009
143011
143012
143013
143014
143015
143016
143017
143018
143019
SELECT * FROM table_02513 WHERE n%11;
143001
143002
143003
143004
143005
143006
143007
143008
143009
143012
143013
143014
143015
143016
143017
143018
143019
SELECT * FROM table_02513 PREWHERE n%11;
143001
143002
143003
143004
143005
143006
143007
143008
143009
143012
143013
143014
143015
143016
143017
143018
143019
SELECT * FROM table_02513 WHERE n%11 AND n%13;
143001
143002
143003
143004
143005
143006
143007
143008
143009
143012
143014
143015
143016
143017
143018
143019
SELECT * FROM table_02513 PREWHERE n%11 WHERE n%13;
143001
143002
143003
143004
143005
143006
143007
143008
143009
143012
143014
143015
143016
143017
143018
143019
SELECT * FROM table_02513 WHERE n%143011; -- { serverError ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER }
SELECT * FROM table_02513 PREWHERE n%143011; -- { serverError ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER }
SELECT * FROM table_02513 WHERE n%143011 AND n%13;
143001
143002
143003
143004
143005
143006
143007
143008
143009
143012
143014
143015
143016
143017
143018
143019
SELECT * FROM table_02513 PREWHERE n%143011 WHERE n%13; -- { serverError ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER }

View File

@ -0,0 +1,26 @@
DROP TABLE IF EXISTS table_02513;
CREATE TABLE table_02513 (n UInt64) ENGINE=MergeTree() ORDER BY tuple() SETTINGS index_granularity=100;
INSERT INTO table_02513 SELECT number+11*13*1000 FROM numbers(20);
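-- 11 * 13 * 1000 = 143000, so n takes the values 143000 .. 143019; apart from 143000 itself,
-- 143011 is the only multiple of 11 and 143013 the only multiple of 13 in this range.
-- With index_granularity=100 and only 20 rows the part consists of a single granule.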
SET allow_experimental_lightweight_delete=1;
SET mutations_sync=2;
SET max_threads=1;
DELETE FROM table_02513 WHERE n%10=0;
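-- The delete hides 143000 and 143010 (the only multiples of 10 inserted), leaving 18 visible rows.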
-- { echoOn }
SELECT * FROM table_02513;
SELECT * FROM table_02513 WHERE n%11;
SELECT * FROM table_02513 PREWHERE n%11;
SELECT * FROM table_02513 WHERE n%11 AND n%13;
SELECT * FROM table_02513 PREWHERE n%11 WHERE n%13;
SELECT * FROM table_02513 WHERE n%143011; -- { serverError ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER }
SELECT * FROM table_02513 PREWHERE n%143011; -- { serverError ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER }
SELECT * FROM table_02513 WHERE n%143011 AND n%13;
SELECT * FROM table_02513 PREWHERE n%143011 WHERE n%13; -- { serverError ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER }
-- { echoOff }
DROP TABLE table_02513;