Merge pull request #6543 from yandex/fix_adaptive_granularity_and_mutations

Fix bug with enable_mixed_granularity_parts and mutations
This commit is contained in:
alesapin 2019-08-19 16:50:35 +03:00 committed by GitHub
commit 87473465c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 34 additions and 13 deletions

View File

@ -25,25 +25,26 @@ IMergedBlockOutputStream::IMergedBlockOutputStream(
size_t aio_threshold_,
bool blocks_are_granules_size_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const MergeTreeIndexGranularity & index_granularity_)
const MergeTreeIndexGranularity & index_granularity_,
const MergeTreeIndexGranularityInfo * index_granularity_info_)
: storage(storage_)
, part_path(part_path_)
, min_compress_block_size(min_compress_block_size_)
, max_compress_block_size(max_compress_block_size_)
, aio_threshold(aio_threshold_)
, marks_file_extension(storage.canUseAdaptiveGranularity() ? getAdaptiveMrkExtension() : getNonAdaptiveMrkExtension())
, can_use_adaptive_granularity(index_granularity_info_ ? index_granularity_info_->is_adaptive : storage.canUseAdaptiveGranularity())
, marks_file_extension(can_use_adaptive_granularity ? getAdaptiveMrkExtension() : getNonAdaptiveMrkExtension())
, blocks_are_granules_size(blocks_are_granules_size_)
, index_granularity(index_granularity_)
, compute_granularity(index_granularity.empty())
, codec(std::move(codec_))
, skip_indices(indices_to_recalc)
, with_final_mark(storage.settings.write_final_mark && storage.canUseAdaptiveGranularity())
, with_final_mark(storage.settings.write_final_mark && can_use_adaptive_granularity)
{
if (blocks_are_granules_size && !index_granularity.empty())
throw Exception("Can't take information about index granularity from blocks, when non empty index_granularity array specified", ErrorCodes::LOGICAL_ERROR);
}
void IMergedBlockOutputStream::addStreams(
const String & path,
const String & name,
@ -145,7 +146,7 @@ void IMergedBlockOutputStream::fillIndexGranularity(const Block & block)
blocks_are_granules_size,
index_offset,
index_granularity,
storage.canUseAdaptiveGranularity());
can_use_adaptive_granularity);
}
void IMergedBlockOutputStream::writeSingleMark(
@ -176,7 +177,7 @@ void IMergedBlockOutputStream::writeSingleMark(
writeIntBinary(stream.plain_hashing.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
if (storage.canUseAdaptiveGranularity())
if (can_use_adaptive_granularity)
writeIntBinary(number_of_rows, stream.marks);
}, path);
}
@ -362,7 +363,7 @@ void IMergedBlockOutputStream::calculateAndSerializeSkipIndices(
writeIntBinary(stream.compressed.offset(), stream.marks);
/// Actually these numbers are redundant, but we have to store them
/// to be compatible with the normal .mrk2 file format
if (storage.canUseAdaptiveGranularity())
if (can_use_adaptive_granularity)
writeIntBinary(1UL, stream.marks);
++skip_index_current_mark;

View File

@ -1,6 +1,7 @@
#pragma once
#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
#include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h>
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/HashingWriteBuffer.h>
@ -23,7 +24,8 @@ public:
size_t aio_threshold_,
bool blocks_are_granules_size_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const MergeTreeIndexGranularity & index_granularity_);
const MergeTreeIndexGranularity & index_granularity_,
const MergeTreeIndexGranularityInfo * index_granularity_info_ = nullptr);
using WrittenOffsetColumns = std::set<std::string>;
@ -141,6 +143,7 @@ protected:
size_t current_mark = 0;
size_t skip_index_mark = 0;
const bool can_use_adaptive_granularity;
const std::string marks_file_extension;
const bool blocks_are_granules_size;

View File

@ -1581,7 +1581,8 @@ void MergeTreeData::alterDataPart(
true /* skip_offsets */,
{},
unused_written_offsets,
part->index_granularity);
part->index_granularity,
&part->index_granularity_info);
in.readPrefix();
out.writePrefix();

View File

@ -934,6 +934,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
new_data_part->relative_path = "tmp_mut_" + future_part.name;
new_data_part->is_temp = true;
new_data_part->ttl_infos = source_part->ttl_infos;
new_data_part->index_granularity_info = source_part->index_granularity_info;
String new_part_tmp_path = new_data_part->getFullPath();
@ -1069,7 +1070,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
/* skip_offsets = */ false,
std::vector<MergeTreeIndexPtr>(indices_to_recalc.begin(), indices_to_recalc.end()),
unused_written_offsets,
source_part->index_granularity
source_part->index_granularity,
&source_part->index_granularity_info
);
in->readPrefix();

View File

@ -8,14 +8,16 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
CompressionCodecPtr default_codec_, bool skip_offsets_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
WrittenOffsetColumns & already_written_offset_columns_,
const MergeTreeIndexGranularity & index_granularity_)
const MergeTreeIndexGranularity & index_granularity_,
const MergeTreeIndexGranularityInfo * index_granularity_info_)
: IMergedBlockOutputStream(
storage_, part_path_, storage_.global_context.getSettings().min_compress_block_size,
storage_.global_context.getSettings().max_compress_block_size, default_codec_,
storage_.global_context.getSettings().min_bytes_to_use_direct_io,
false,
indices_to_recalc_,
index_granularity_),
index_granularity_,
index_granularity_info_),
header(header_), sync(sync_), skip_offsets(skip_offsets_),
already_written_offset_columns(already_written_offset_columns_)
{

View File

@ -17,7 +17,8 @@ public:
CompressionCodecPtr default_codec_, bool skip_offsets_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
WrittenOffsetColumns & already_written_offset_columns_,
const MergeTreeIndexGranularity & index_granularity_);
const MergeTreeIndexGranularity & index_granularity_,
const MergeTreeIndexGranularityInfo * index_granularity_info_ = nullptr);
Block getHeader() const override { return header; }
void write(const Block & block) override;

View File

@ -288,6 +288,17 @@ def test_mixed_granularity_single_node(start_dynamic_cluster, node):
node.exec_in_container(["bash", "-c", "find {p} -name '*.mrk' | grep '.*'".format(p=path_to_old_part)]) # check that we have non adaptive files
node.query("ALTER TABLE table_with_default_granularity UPDATE dummy = dummy + 1 WHERE 1")
# still works
assert node.query("SELECT count() from table_with_default_granularity") == '6\n'
node.query("ALTER TABLE table_with_default_granularity MODIFY COLUMN dummy String")
node.query("ALTER TABLE table_with_default_granularity ADD COLUMN dummy2 Float64")
# still works
assert node.query("SELECT count() from table_with_default_granularity") == '6\n'
def test_version_update_two_nodes(start_dynamic_cluster):
node11.query("INSERT INTO table_with_default_granularity VALUES (toDate('2018-10-01'), 1, 333), (toDate('2018-10-02'), 2, 444)")
node12.query("SYSTEM SYNC REPLICA table_with_default_granularity")