Merge branch 'master' into backport_mergetask

This commit is contained in:
Alexander Gololobov 2024-09-12 13:29:59 +02:00 committed by GitHub
commit e15b86cd9d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 193 additions and 83 deletions

View File

@ -39,7 +39,7 @@ using Checksum = CityHash_v1_0_2::uint128;
/// Validate checksum of data, and if it mismatches, find out possible reason and throw exception.
static void validateChecksum(char * data, size_t size, const Checksum expected_checksum)
static void validateChecksum(char * data, size_t size, const Checksum expected_checksum, bool external_data)
{
auto calculated_checksum = CityHash_v1_0_2::CityHash128(data, size);
if (expected_checksum == calculated_checksum)
@ -64,6 +64,8 @@ static void validateChecksum(char * data, size_t size, const Checksum expected_c
"this can be caused by disk bit rot. This exception protects ClickHouse "
"from data corruption due to hardware failures.";
int error_code = external_data ? ErrorCodes::CANNOT_DECOMPRESS : ErrorCodes::CHECKSUM_DOESNT_MATCH;
auto flip_bit = [](char * buf, size_t pos)
{
buf[pos / 8] ^= 1 << pos % 8;
@ -87,7 +89,7 @@ static void validateChecksum(char * data, size_t size, const Checksum expected_c
{
message << ". The mismatch is caused by single bit flip in data block at byte " << (bit_pos / 8) << ", bit " << (bit_pos % 8) << ". "
<< message_hardware_failure;
throw Exception::createDeprecated(message.str(), ErrorCodes::CHECKSUM_DOESNT_MATCH);
throw Exception::createDeprecated(message.str(), error_code);
}
flip_bit(tmp_data, bit_pos); /// Restore
@ -102,10 +104,10 @@ static void validateChecksum(char * data, size_t size, const Checksum expected_c
{
message << ". The mismatch is caused by single bit flip in checksum. "
<< message_hardware_failure;
throw Exception::createDeprecated(message.str(), ErrorCodes::CHECKSUM_DOESNT_MATCH);
throw Exception::createDeprecated(message.str(), error_code);
}
throw Exception::createDeprecated(message.str(), ErrorCodes::CHECKSUM_DOESNT_MATCH);
throw Exception::createDeprecated(message.str(), error_code);
}
static void readHeaderAndGetCodecAndSize(
@ -151,7 +153,7 @@ static void readHeaderAndGetCodecAndSize(
"Most likely corrupted data.", size_compressed_without_checksum);
if (size_compressed_without_checksum < header_size)
throw Exception(ErrorCodes::CORRUPTED_DATA, "Can't decompress data: "
throw Exception(external_data ? ErrorCodes::CANNOT_DECOMPRESS : ErrorCodes::CORRUPTED_DATA, "Can't decompress data: "
"the compressed data size ({}, this should include header size) is less than the header size ({})",
size_compressed_without_checksum, static_cast<size_t>(header_size));
}
@ -202,7 +204,7 @@ size_t CompressedReadBufferBase::readCompressedData(size_t & size_decompressed,
readBinaryLittleEndian(checksum.low64, checksum_in);
readBinaryLittleEndian(checksum.high64, checksum_in);
validateChecksum(compressed_buffer, size_compressed_without_checksum, checksum);
validateChecksum(compressed_buffer, size_compressed_without_checksum, checksum, external_data);
}
ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_compressed_without_checksum + sizeof(Checksum));
@ -247,7 +249,7 @@ size_t CompressedReadBufferBase::readCompressedDataBlockForAsynchronous(size_t &
readBinaryLittleEndian(checksum.low64, checksum_in);
readBinaryLittleEndian(checksum.high64, checksum_in);
validateChecksum(compressed_buffer, size_compressed_without_checksum, checksum);
validateChecksum(compressed_buffer, size_compressed_without_checksum, checksum, external_data);
}
ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_compressed_without_checksum + sizeof(Checksum));
@ -307,7 +309,7 @@ void CompressedReadBufferBase::decompress(BufferBase::Buffer & to, size_t size_d
UInt8 header_size = ICompressionCodec::getHeaderSize();
if (size_compressed_without_checksum < header_size)
throw Exception(ErrorCodes::CORRUPTED_DATA,
throw Exception(external_data ? ErrorCodes::CANNOT_DECOMPRESS : ErrorCodes::CORRUPTED_DATA,
"Can't decompress data: the compressed data size ({}, this should include header size) is less than the header size ({})",
size_compressed_without_checksum, static_cast<size_t>(header_size));

View File

@ -1060,6 +1060,11 @@ namespace
void setNullTableEngine(ASTStorage & storage)
{
storage.forEachPointerToChild([](void ** ptr) mutable
{
*ptr = nullptr;
});
auto engine_ast = std::make_shared<ASTFunction>();
engine_ast->name = "Null";
engine_ast->no_empty_args = true;
@ -1146,7 +1151,9 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
else if (getContext()->getSettingsRef().restore_replace_external_engines_to_null)
{
if (StorageFactory::instance().getStorageFeatures(create.storage->engine->name).source_access_type != AccessType::NONE)
{
setNullTableEngine(*create.storage);
}
}
return;
}

View File

@ -12,12 +12,15 @@ namespace DB
struct Settings;
class QueryStatus;
using QueryStatusPtr = std::shared_ptr<QueryStatus>;
struct ITemporaryFileLookup;
using TemporaryFileLookupPtr = std::shared_ptr<ITemporaryFileLookup>;
struct BuildQueryPipelineSettings
{
ExpressionActionsSettings actions_settings;
QueryStatusPtr process_list_element;
ProgressCallback progress_callback = nullptr;
TemporaryFileLookupPtr temporary_file_lookup;
const ExpressionActionsSettings & getActionsSettings() const { return actions_settings; }
static BuildQueryPipelineSettings fromContext(ContextPtr from);

View File

@ -0,0 +1,31 @@
#pragma once
#include <base/types.h>
#include <boost/noncopyable.hpp>
#include <vector>
#include <memory>
namespace DB
{
class WriteBuffer;
class ReadBuffer;
/// Interface for accessing temporary files by some logical name (or id).
/// While building query pipeline processors can lookup temporary files by some id and use them for writing and/or reading temporary data
/// without knowing what exactly is behind the name: local file, memory buffer, object in cloud storage, etc.
struct ITemporaryFileLookup : boost::noncopyable
{
virtual ~ITemporaryFileLookup() = default;
/// Give the caller a temporary write buffer, but don't give away the ownership.
virtual WriteBuffer & getTemporaryFileForWriting(const String & file_id) = 0;
/// Give the caller a temporary read buffer, it exclusively belongs to the caller.
/// Other callers can get their own read buffer for the same temporary file.
virtual std::unique_ptr<ReadBuffer> getTemporaryFileForReading(const String & file_id) = 0;
};
using TemporaryFileLookupPtr = std::shared_ptr<ITemporaryFileLookup>;
}

View File

@ -183,13 +183,14 @@ void ColumnGathererStream::consume(Input & input, size_t source_num)
ColumnGathererTransform::ColumnGathererTransform(
const Block & header,
size_t num_inputs,
ReadBuffer & row_sources_buf_,
std::unique_ptr<ReadBuffer> row_sources_buf_,
size_t block_preferred_size_rows_,
size_t block_preferred_size_bytes_,
bool is_result_sparse_)
: IMergingTransform<ColumnGathererStream>(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
num_inputs, row_sources_buf_, block_preferred_size_rows_, block_preferred_size_bytes_, is_result_sparse_)
num_inputs, *row_sources_buf_, block_preferred_size_rows_, block_preferred_size_bytes_, is_result_sparse_)
, row_sources_buf_holder(std::move(row_sources_buf_))
, log(getLogger("ColumnGathererStream"))
{
if (header.columns() != 1)

View File

@ -115,7 +115,7 @@ public:
ColumnGathererTransform(
const Block & header,
size_t num_inputs,
ReadBuffer & row_sources_buf_,
std::unique_ptr<ReadBuffer> row_sources_buf_,
size_t block_preferred_size_rows_,
size_t block_preferred_size_bytes_,
bool is_result_sparse_);
@ -124,6 +124,8 @@ public:
protected:
void onFinish() override;
std::unique_ptr<ReadBuffer> row_sources_buf_holder; /// Keep ownership of row_sources_buf while it's in use by ColumnGathererStream.
LoggerPtr log;
};

View File

@ -2133,7 +2133,7 @@ bool TCPHandler::receiveUnexpectedData(bool throw_exception)
std::shared_ptr<ReadBuffer> maybe_compressed_in;
if (last_block_in.compression == Protocol::Compression::Enable)
maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in, /* allow_different_codecs */ true);
maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in, /* allow_different_codecs */ true, /* external_data */ query_kind != ClientInfo::QueryKind::SECONDARY_QUERY);
else
maybe_compressed_in = in;
@ -2157,7 +2157,7 @@ void TCPHandler::initBlockInput()
/// with another codec that the rest of the data. Example: data sent by Distributed tables.
if (state.compression == Protocol::Compression::Enable)
state.maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in, /* allow_different_codecs */ true);
state.maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in, /* allow_different_codecs */ true, /* external_data */ query_kind != ClientInfo::QueryKind::SECONDARY_QUERY);
else
state.maybe_compressed_in = in;

View File

@ -14,7 +14,7 @@
#include <Compression/CompressedWriteBuffer.h>
#include <DataTypes/ObjectUtils.h>
#include <DataTypes/Serializations/SerializationInfo.h>
#include <IO/IReadableWriteBuffer.h>
#include <IO/ReadBufferFromEmptyFile.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
@ -43,6 +43,7 @@
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/UnionStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/TemporaryFiles.h>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/MergeTreeTransaction.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
@ -90,6 +91,68 @@ static ColumnsStatistics getStatisticsForColumns(
return all_statistics;
}
/// Manages the "rows_sources" temporary file that is used during vertical merge.
class RowsSourcesTemporaryFile : public ITemporaryFileLookup
{
public:
/// A logical name of the temporary file under which it will be known to the plan steps that use it.
static constexpr auto FILE_ID = "rows_sources";
explicit RowsSourcesTemporaryFile(TemporaryDataOnDiskScopePtr temporary_data_on_disk_)
: tmp_disk(std::make_unique<TemporaryDataOnDisk>(temporary_data_on_disk_))
, uncompressed_write_buffer(tmp_disk->createRawStream())
, tmp_file_name_on_disk(uncompressed_write_buffer->getFileName())
{
}
WriteBuffer & getTemporaryFileForWriting(const String & name) override
{
if (name != FILE_ID)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected temporary file name requested: {}", name);
if (write_buffer)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file was already requested for writing, there musto be only one writer");
write_buffer = (std::make_unique<CompressedWriteBuffer>(*uncompressed_write_buffer));
return *write_buffer;
}
std::unique_ptr<ReadBuffer> getTemporaryFileForReading(const String & name) override
{
if (name != FILE_ID)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected temporary file name requested: {}", name);
if (!finalized)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file is not finalized yet");
/// tmp_disk might not create real file if no data was written to it.
if (final_size == 0)
return std::make_unique<ReadBufferFromEmptyFile>();
/// Reopen the file for each read so that multiple reads can be performed in parallel and there is no need to seek to the beginning.
auto raw_file_read_buffer = std::make_unique<ReadBufferFromFile>(tmp_file_name_on_disk);
return std::make_unique<CompressedReadBufferFromFile>(std::move(raw_file_read_buffer));
}
/// Returns written data size in bytes
size_t finalizeWriting()
{
write_buffer->finalize();
uncompressed_write_buffer->finalize();
finalized = true;
final_size = write_buffer->count();
return final_size;
}
private:
std::unique_ptr<TemporaryDataOnDisk> tmp_disk;
std::unique_ptr<WriteBufferFromFileBase> uncompressed_write_buffer;
std::unique_ptr<WriteBuffer> write_buffer;
const String tmp_file_name_on_disk;
bool finalized = false;
size_t final_size = 0;
};
static void addMissedColumnsToSerializationInfos(
size_t num_rows_in_parts,
const Names & part_columns,
@ -365,8 +428,6 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
ctx->compression_codec = global_ctx->data->getCompressionCodecForPart(
global_ctx->merge_list_element_ptr->total_size_bytes_compressed, global_ctx->new_data_part->ttl_infos, global_ctx->time_of_merge);
ctx->tmp_disk = std::make_unique<TemporaryDataOnDisk>(global_ctx->context->getTempDataOnDisk());
switch (global_ctx->chosen_merge_algorithm)
{
case MergeAlgorithm::Horizontal:
@ -379,8 +440,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
}
case MergeAlgorithm::Vertical:
{
ctx->rows_sources_uncompressed_write_buf = ctx->tmp_disk->createRawStream();
ctx->rows_sources_write_buf = std::make_unique<CompressedWriteBuffer>(*ctx->rows_sources_uncompressed_write_buf);
ctx->rows_sources_temporary_file = std::make_shared<RowsSourcesTemporaryFile>(global_ctx->context->getTempDataOnDisk());
std::map<String, UInt64> local_merged_column_to_size;
for (const auto & part : global_ctx->future_part->parts)
@ -500,11 +560,9 @@ MergeTask::StageRuntimeContextPtr MergeTask::ExecuteAndFinalizeHorizontalPart::g
auto new_ctx = std::make_shared<VerticalMergeRuntimeContext>();
new_ctx->rows_sources_write_buf = std::move(ctx->rows_sources_write_buf);
new_ctx->rows_sources_uncompressed_write_buf = std::move(ctx->rows_sources_uncompressed_write_buf);
new_ctx->rows_sources_temporary_file = std::move(ctx->rows_sources_temporary_file);
new_ctx->column_sizes = std::move(ctx->column_sizes);
new_ctx->compression_codec = std::move(ctx->compression_codec);
new_ctx->tmp_disk = std::move(ctx->tmp_disk);
new_ctx->it_name_and_type = std::move(ctx->it_name_and_type);
new_ctx->read_with_direct_io = std::move(ctx->read_with_direct_io);
new_ctx->need_sync = std::move(ctx->need_sync);
@ -761,11 +819,7 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const
global_ctx->merge_list_element_ptr->progress.store(ctx->column_sizes->keyColumnsWeight(), std::memory_order_relaxed);
/// Ensure data has written to disk.
ctx->rows_sources_write_buf->finalize();
ctx->rows_sources_uncompressed_write_buf->finalize();
ctx->rows_sources_uncompressed_write_buf->finalize();
size_t rows_sources_count = ctx->rows_sources_write_buf->count();
size_t rows_sources_count = ctx->rows_sources_temporary_file->finalizeWriting();
/// In special case, when there is only one source part, and no rows were skipped, we may have
/// skipped writing rows_sources file. Otherwise rows_sources_count must be equal to the total
/// number of input rows.
@ -776,29 +830,6 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const
"of bytes written to rows_sources file ({}). It is a bug.",
sum_input_rows_exact, input_rows_filtered, rows_sources_count);
/// TemporaryDataOnDisk::createRawStream returns WriteBufferFromFile implementing IReadableWriteBuffer
/// and we expect to get ReadBufferFromFile here.
/// So, it's relatively safe to use dynamic_cast here and downcast to ReadBufferFromFile.
auto * wbuf_readable = dynamic_cast<IReadableWriteBuffer *>(ctx->rows_sources_uncompressed_write_buf.get());
std::unique_ptr<ReadBuffer> reread_buf = wbuf_readable ? wbuf_readable->tryGetReadBuffer() : nullptr;
if (!reread_buf)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot read temporary file {}", ctx->rows_sources_uncompressed_write_buf->getFileName());
auto * reread_buffer_raw = dynamic_cast<ReadBufferFromFileBase *>(reread_buf.get());
if (!reread_buffer_raw)
{
const auto & reread_buf_ref = *reread_buf;
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected ReadBufferFromFileBase, but got {}", demangle(typeid(reread_buf_ref).name()));
}
/// Move ownership from std::unique_ptr<ReadBuffer> to std::unique_ptr<ReadBufferFromFile> for CompressedReadBufferFromFile.
/// First, release ownership from unique_ptr to base type.
reread_buf.release(); /// NOLINT(bugprone-unused-return-value,hicpp-ignored-remove-result): we already have the pointer value in `reread_buffer_raw`
/// Then, move ownership to unique_ptr to concrete type.
std::unique_ptr<ReadBufferFromFileBase> reread_buffer_from_file(reread_buffer_raw);
/// CompressedReadBufferFromFile expects std::unique_ptr<ReadBufferFromFile> as argument.
ctx->rows_sources_read_buf = std::make_unique<CompressedReadBufferFromFile>(std::move(reread_buffer_from_file));
ctx->it_name_and_type = global_ctx->gathering_columns.cbegin();
const auto & settings = global_ctx->context->getSettingsRef();
@ -829,12 +860,12 @@ class ColumnGathererStep : public ITransformingStep
public:
ColumnGathererStep(
const DataStream & input_stream_,
CompressedReadBufferFromFile * rows_sources_read_buf_,
const String & rows_sources_temporary_file_name_,
UInt64 merge_block_size_rows_,
UInt64 merge_block_size_bytes_,
bool is_result_sparse_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
, rows_sources_read_buf(rows_sources_read_buf_)
, rows_sources_temporary_file_name(rows_sources_temporary_file_name_)
, merge_block_size_rows(merge_block_size_rows_)
, merge_block_size_bytes(merge_block_size_bytes_)
, is_result_sparse(is_result_sparse_)
@ -842,17 +873,20 @@ public:
String getName() const override { return "ColumnGatherer"; }
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /*pipelineSettings*/) override
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & pipeline_settings) override
{
const auto &header = pipeline.getHeader();
const auto input_streams_count = pipeline.getNumStreams();
rows_sources_read_buf->seek(0, 0);
if (!pipeline_settings.temporary_file_lookup)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file lookup is not set in pipeline settings for vertical merge");
auto rows_sources_read_buf = pipeline_settings.temporary_file_lookup->getTemporaryFileForReading(rows_sources_temporary_file_name);
auto transform = std::make_unique<ColumnGathererTransform>(
header,
input_streams_count,
*rows_sources_read_buf,
std::move(rows_sources_read_buf),
merge_block_size_rows,
merge_block_size_bytes,
is_result_sparse);
@ -882,7 +916,7 @@ private:
}
MergeTreeData::MergingParams merging_params{};
CompressedReadBufferFromFile * rows_sources_read_buf;
const String rows_sources_temporary_file_name;
const UInt64 merge_block_size_rows;
const UInt64 merge_block_size_bytes;
const bool is_result_sparse;
@ -933,7 +967,7 @@ MergeTask::VerticalMergeRuntimeContext::PreparedColumnPipeline MergeTask::Vertic
const auto data_settings = global_ctx->data->getSettings();
auto merge_step = std::make_unique<ColumnGathererStep>(
merge_column_query_plan.getCurrentDataStream(),
ctx->rows_sources_read_buf.get(), //global_ctx->rows_sources_temporary_file_name,
RowsSourcesTemporaryFile::FILE_ID,
data_settings->merge_max_block_size,
data_settings->merge_max_block_size_bytes,
is_result_sparse);
@ -962,7 +996,8 @@ MergeTask::VerticalMergeRuntimeContext::PreparedColumnPipeline MergeTask::Vertic
}
auto pipeline_settings = BuildQueryPipelineSettings::fromContext(global_ctx->context);
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context);
pipeline_settings.temporary_file_lookup = ctx->rows_sources_temporary_file;
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context);
auto builder = merge_column_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings);
return {QueryPipelineBuilder::getPipeline(std::move(*builder)), std::move(indexes_to_recalc)};
@ -1014,10 +1049,6 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
global_ctx->to->getIndexGranularity());
ctx->column_elems_written = 0;
/// rows_sources_read_buf is reused for each column so we need to rewind it explicitly each time
/// This sharing also prevents from from running multiple merge of individual columns in parallel.
ctx->rows_sources_read_buf->seek(0, 0);
}
@ -1331,7 +1362,7 @@ public:
const SortDescription & sort_description_,
const Names partition_key_columns_,
const MergeTreeData::MergingParams & merging_params_,
WriteBuffer * rows_sources_write_buf_,
const String & rows_sources_temporary_file_name_,
UInt64 merge_block_size_rows_,
UInt64 merge_block_size_bytes_,
bool blocks_are_granules_size_,
@ -1341,7 +1372,7 @@ public:
, sort_description(sort_description_)
, partition_key_columns(partition_key_columns_)
, merging_params(merging_params_)
, rows_sources_write_buf(rows_sources_write_buf_)
, rows_sources_temporary_file_name(rows_sources_temporary_file_name_)
, merge_block_size_rows(merge_block_size_rows_)
, merge_block_size_bytes(merge_block_size_bytes_)
, blocks_are_granules_size(blocks_are_granules_size_)
@ -1351,7 +1382,7 @@ public:
String getName() const override { return "MergeParts"; }
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /*pipelineSettings*/) override
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & pipeline_settings) override
{
/// The order of the streams is important: when the key is matched, the elements go in the order of the source stream number.
/// In the merged part, the lines with the same key must be in the ascending order of the identifier of original part,
@ -1361,6 +1392,14 @@ public:
const auto &header = pipeline.getHeader();
const auto input_streams_count = pipeline.getNumStreams();
WriteBuffer * rows_sources_write_buf = nullptr;
if (!rows_sources_temporary_file_name.empty())
{
if (!pipeline_settings.temporary_file_lookup)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file lookup is not set in pipeline settings for vertical merge");
rows_sources_write_buf = &pipeline_settings.temporary_file_lookup->getTemporaryFileForWriting(rows_sources_temporary_file_name);
}
switch (merging_params.mode)
{
case MergeTreeData::MergingParams::Ordinary:
@ -1450,7 +1489,7 @@ private:
const SortDescription sort_description;
const Names partition_key_columns;
const MergeTreeData::MergingParams merging_params{};
WriteBuffer * rows_sources_write_buf;
const String rows_sources_temporary_file_name;
const UInt64 merge_block_size_rows;
const UInt64 merge_block_size_bytes;
const bool blocks_are_granules_size;
@ -1607,8 +1646,9 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(sort_columns[i], 1, 1);
const bool is_vertical_merge = (global_ctx->chosen_merge_algorithm == MergeAlgorithm::Vertical);
/// If merge is vertical we cannot calculate it
ctx->blocks_are_granules_size = (global_ctx->chosen_merge_algorithm == MergeAlgorithm::Vertical);
ctx->blocks_are_granules_size = is_vertical_merge;
if (global_ctx->cleanup && !data_settings->allow_experimental_replacing_merge_with_cleanup)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Experimental merges with CLEANUP are not allowed");
@ -1618,7 +1658,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
sort_description,
partition_key_columns,
global_ctx->merging_params,
ctx->rows_sources_write_buf.get(),
(is_vertical_merge ? RowsSourcesTemporaryFile::FILE_ID : ""), /// rows_sources temporaty file is used only for vertical merge
data_settings->merge_max_block_size,
data_settings->merge_max_block_size_bytes,
ctx->blocks_are_granules_size,
@ -1683,7 +1723,8 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
{
auto pipeline_settings = BuildQueryPipelineSettings::fromContext(global_ctx->context);
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context);
pipeline_settings.temporary_file_lookup = ctx->rows_sources_temporary_file;
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context);
auto builder = merge_parts_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings);
global_ctx->merged_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));

View File

@ -40,6 +40,7 @@ namespace DB
class MergeTask;
using MergeTaskPtr = std::shared_ptr<MergeTask>;
class RowsSourcesTemporaryFile;
/**
* Overview of the merge algorithm
@ -224,13 +225,11 @@ private:
/// Proper initialization is responsibility of the author
struct ExecuteAndFinalizeHorizontalPartRuntimeContext : public IStageRuntimeContext
{
TemporaryDataOnDiskPtr tmp_disk{nullptr};
bool need_remove_expired_values{false};
bool force_ttl{false};
CompressionCodecPtr compression_codec{nullptr};
size_t sum_input_rows_upper_bound{0};
std::unique_ptr<WriteBufferFromFileBase> rows_sources_uncompressed_write_buf{nullptr};
std::unique_ptr<WriteBuffer> rows_sources_write_buf{nullptr};
std::shared_ptr<RowsSourcesTemporaryFile> rows_sources_temporary_file;
std::optional<ColumnSizeEstimator> column_sizes{};
/// For projections to rebuild
@ -309,11 +308,9 @@ private:
struct VerticalMergeRuntimeContext : public IStageRuntimeContext
{
/// Begin dependencies from previous stage
std::unique_ptr<WriteBufferFromFileBase> rows_sources_uncompressed_write_buf{nullptr};
std::unique_ptr<WriteBuffer> rows_sources_write_buf{nullptr};
std::shared_ptr<RowsSourcesTemporaryFile> rows_sources_temporary_file;
std::optional<ColumnSizeEstimator> column_sizes;
CompressionCodecPtr compression_codec;
TemporaryDataOnDiskPtr tmp_disk{nullptr};
std::list<DB::NameAndTypePair>::const_iterator it_name_and_type;
bool read_with_direct_io{false};
bool need_sync{false};
@ -345,7 +342,6 @@ private:
size_t column_elems_written{0};
QueryPipeline column_parts_pipeline;
std::unique_ptr<PullingPipelineExecutor> executor;
std::unique_ptr<CompressedReadBufferFromFile> rows_sources_read_buf{nullptr};
UInt64 elapsed_execute_ns{0};
};

View File

@ -735,11 +735,14 @@ def test_mutation_with_broken_projection(cluster):
f"ALTER TABLE {table_name} DELETE WHERE _part == 'all_0_0_0_4' SETTINGS mutations_sync = 1"
)
parts = get_parts(node, table_name)
# All parts changes because this is how alter delete works,
# but all parts apart from the first have only hardlinks to files in previous part.
assert ["all_0_0_0_5", "all_1_1_0_5", "all_2_2_0_5", "all_3_3_0_5"] == get_parts(
node, table_name
) or ["all_1_1_0_5", "all_2_2_0_5", "all_3_3_0_5"] == get_parts(node, table_name)
assert ["all_0_0_0_5", "all_1_1_0_5", "all_2_2_0_5", "all_3_3_0_5"] == parts or [
"all_1_1_0_5",
"all_2_2_0_5",
"all_3_3_0_5",
] == parts
# Still broken because it was hardlinked.
broken = get_broken_projections_info(node, table_name)
@ -752,11 +755,13 @@ def test_mutation_with_broken_projection(cluster):
f"ALTER TABLE {table_name} DELETE WHERE c == 13 SETTINGS mutations_sync = 1"
)
assert ["all_1_1_0_6", "all_2_2_0_6", "all_3_3_0_6"] == get_parts(
node, table_name
) or ["all_0_0_0_6", "all_1_1_0_6", "all_2_2_0_6", "all_3_3_0_6"] == get_parts(
node, table_name
)
parts = get_parts(node, table_name)
assert ["all_1_1_0_6", "all_2_2_0_6", "all_3_3_0_6"] == parts or [
"all_0_0_0_6",
"all_1_1_0_6",
"all_2_2_0_6",
"all_3_3_0_6",
] == parts
# Not broken anymore.
assert not get_broken_projections_info(node, table_name)

View File

@ -70,6 +70,12 @@ def get_mysql_conn(cluster):
def fill_tables(cluster, dbname):
fill_nodes(nodes, dbname)
node1.query(
f"""CREATE TABLE {dbname}.example_s3_engine_table (name String, value UInt32)
ENGINE = S3('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/test-data.csv.gz', 'CSV', 'gzip')
SETTINGS input_format_with_names_use_header = 0"""
)
conn = get_mysql_conn(cluster)
with conn.cursor() as cursor:
@ -136,6 +142,7 @@ def test_restore_table(start_cluster):
node2.query(f"BACKUP DATABASE replicated TO {backup_name}")
node2.query("DROP TABLE replicated.example_s3_engine_table")
node2.query("DROP TABLE replicated.mysql_schema_inference_engine")
node2.query("DROP TABLE replicated.mysql_schema_inference_function")
@ -149,6 +156,13 @@ def test_restore_table(start_cluster):
)
node1.query(f"SYSTEM SYNC DATABASE REPLICA replicated")
assert (
node1.query(
"SELECT engine FROM system.tables where database = 'replicated' and name = 'example_s3_engine_table'"
)
== "S3\n"
)
assert (
node1.query(
"SELECT count(), sum(id) FROM replicated.mysql_schema_inference_engine"
@ -175,6 +189,7 @@ def test_restore_table_null(start_cluster):
node2.query(f"BACKUP DATABASE replicated2 TO {backup_name}")
node2.query("DROP TABLE replicated2.example_s3_engine_table")
node2.query("DROP TABLE replicated2.mysql_schema_inference_engine")
node2.query("DROP TABLE replicated2.mysql_schema_inference_function")
@ -188,6 +203,13 @@ def test_restore_table_null(start_cluster):
)
node1.query(f"SYSTEM SYNC DATABASE REPLICA replicated2")
assert (
node1.query(
"SELECT engine FROM system.tables where database = 'replicated2' and name = 'example_s3_engine_table'"
)
== "Null\n"
)
assert (
node1.query(
"SELECT count(), sum(id) FROM replicated2.mysql_schema_inference_engine"