Merge pull request #4372 from yandex/fix-cancel-of-vertical-merges

Faster cancelling of vertical merges
This commit is contained in:
alexey-milovidov 2019-02-13 02:58:00 +03:00 committed by GitHub
commit 024709db7d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 9 additions and 5 deletions

View File

@ -3,6 +3,7 @@
#include <common/logger_useful.h>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <DataStreams/ColumnGathererStream.h>
namespace DB

View File

@ -3,6 +3,7 @@
#include <sstream>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <DataStreams/ColumnGathererStream.h>
namespace DB

View File

@ -13,7 +13,6 @@
#include <IO/WriteHelpers.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/ColumnGathererStream.h>
namespace DB

View File

@ -3,6 +3,7 @@
#include <common/logger_useful.h>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <DataStreams/ColumnGathererStream.h>
namespace DB

View File

@ -3,6 +3,7 @@
#include <common/logger_useful.h>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <DataStreams/ColumnGathererStream.h>
#include <deque>

View File

@ -762,11 +762,15 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
size_t column_elems_written = 0;
column_to.writePrefix();
while ((block = column_gathered_stream.read()))
while (!actions_blocker.isCancelled() && (block = column_gathered_stream.read()))
{
column_elems_written += block.rows();
column_to.write(block);
}
if (actions_blocker.isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
column_gathered_stream.readSuffix();
checksums_gathered_columns.add(column_to.writeSuffixAndGetChecksums());
@ -781,9 +785,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
merge_entry->columns_written += 1;
merge_entry->bytes_written_uncompressed += column_gathered_stream.getProfileInfo().bytes;
merge_entry->progress.store(progress_before + column_sizes.columnWeight(column_name), std::memory_order_relaxed);
if (actions_blocker.isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
}
Poco::File(rows_sources_file_path).remove();