Fix data reading and the CHECK TABLE query for compact parts whose columns use different compression codecs

This commit is contained in:
Anton Popov 2020-07-11 02:33:36 +03:00
parent 24f627e52c
commit fbec940e0f
7 changed files with 41 additions and 11 deletions

View File

@ -79,6 +79,8 @@ CachedCompressedReadBuffer::CachedCompressedReadBuffer(
{
if (file_in == nullptr)
throw Exception("Neither file_in nor file_in_creator is initialized in CachedCompressedReadBuffer", ErrorCodes::LOGICAL_ERROR);
compressed_in = file_in;
}
CachedCompressedReadBuffer::CachedCompressedReadBuffer(

View File

@ -23,7 +23,7 @@ private:
std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator;
UncompressedCache * cache;
std::unique_ptr<ReadBufferFromFileBase> file_in_holder;
ReadBufferFromFileBase * file_in;
ReadBufferFromFileBase * file_in = nullptr;
const std::string path;
size_t file_pos;

View File

@ -17,7 +17,7 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
default_codec_, settings_, index_granularity_)
, plain_file(data_part->volume->getDisk()->writeFile(
part_path + MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION,
settings.max_compress_block_size,
settings.max_compress_block_size,
WriteMode::Rewrite,
settings.estimated_size,
settings.aio_threshold))
@ -31,7 +31,7 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
const auto & storage_columns = metadata_snapshot->getColumns();
for (const auto & column : columns_list)
compressed_streams[column.name] = std::make_unique<CompressedStream>(
plain_hashing, storage_columns.getCodecOrDefault(column.name, default_codec));
plain_hashing, storage_columns.getCodecOrDefault(column.name, default_codec));
}
void MergeTreeDataPartWriterCompact::write(

View File

@ -53,7 +53,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
auto full_path = fullPath(data_part->volume->getDisk(), full_data_path);
for (const auto & column : columns)
{
std::unique_ptr<CachedCompressedReadBuffer> cached_buffer;
std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer;
if (uncompressed_cache)
@ -69,7 +69,8 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
non_cached_buffer->setProfileCallback(profile_callback_, clock_type_);
}
column_streams[column.name] = ColumnStream{std::move(cached_buffer), std::move(non_cached_buffer)};
auto column_from_part = getColumnFromPart(column);
column_streams[column_from_part.name] = ColumnStream{std::move(cached_buffer), std::move(non_cached_buffer)};
}
size_t columns_num = columns.size();

View File

@ -96,11 +96,24 @@ IMergeTreeDataPart::Checksums checkDataPart(
};
};
/// Computes the checksum of a file's raw on-disk content, whether that content
/// is compressed or uncompressed (no decompression is performed — the bytes are
/// hashed as stored).
auto checksum_file = [](const DiskPtr & disk_, const String & file_path)
{
auto file_buf = disk_->readFile(file_path);
HashingReadBuffer hashing_buf(*file_buf);
/// Pull the entire file through the hashing buffer; tryIgnore() with SIZE_MAX
/// reads (and hashes) everything until EOF without storing the data.
hashing_buf.tryIgnore(std::numeric_limits<size_t>::max());
/// Checksum is (byte count, hash of those bytes).
return IMergeTreeDataPart::Checksums::Checksum{hashing_buf.count(), hashing_buf.getHash()};
};
bool check_uncompressed = true;
/// First calculate checksums for columns data
if (part_type == MergeTreeDataPartType::COMPACT)
{
const auto & file_name = MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION;
checksums_data.files[file_name] = checksum_compressed_file(disk, path + file_name);
checksums_data.files[file_name] = checksum_file(disk, path + file_name);
/// Uncompressed checksums in compact parts are computed in a complex way.
/// We check only checksum of compressed file.
check_uncompressed = false;
}
else if (part_type == MergeTreeDataPartType::WIDE)
{
@ -141,10 +154,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
if (txt_checksum_it == checksum_files_txt.end() || txt_checksum_it->second.uncompressed_size == 0)
{
/// The file is not compressed.
auto file_buf = disk->readFile(it->path());
HashingReadBuffer hashing_buf(*file_buf);
hashing_buf.tryIgnore(std::numeric_limits<size_t>::max());
checksums_data.files[file_name] = IMergeTreeDataPart::Checksums::Checksum(hashing_buf.count(), hashing_buf.getHash());
checksums_data.files[file_name] = checksum_file(disk, it->path());
}
else /// If we have both compressed and uncompressed in txt, than calculate them
{
@ -157,7 +167,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
return {};
if (require_checksums || !checksums_txt.files.empty())
checksums_txt.checkEqual(checksums_data, true);
checksums_txt.checkEqual(checksums_data, check_uncompressed);
return checksums_data;
}

View File

@ -0,0 +1,2 @@
all_1_1_0 1
all_1_1_0 1

View File

@ -0,0 +1,15 @@
-- Regression test: CHECK TABLE on MergeTree tables where columns use
-- per-column compression codecs (here: column b uses CODEC(Delta, ZSTD)).
-- Report one row per part instead of a single aggregated value.
SET check_query_single_value_result = 0;
DROP TABLE IF EXISTS check_codec;
-- Case 1: min_bytes_for_wide_part = 0 forces a *compact* part
-- (all columns stored together in a single data file).
CREATE TABLE check_codec(a Int, b Int CODEC(Delta, ZSTD)) ENGINE = MergeTree ORDER BY a SETTINGS min_bytes_for_wide_part = 0;
INSERT INTO check_codec SELECT number, number * 2 FROM numbers(1000);
CHECK TABLE check_codec;
DROP TABLE check_codec;
-- Case 2: a large min_bytes_for_wide_part threshold ('10M') keeps the part
-- *wide* (one file per column), exercising the other read path.
CREATE TABLE check_codec(a Int, b Int CODEC(Delta, ZSTD)) ENGINE = MergeTree ORDER BY a SETTINGS min_bytes_for_wide_part = '10M';
INSERT INTO check_codec SELECT number, number * 2 FROM numbers(1000);
CHECK TABLE check_codec;
DROP TABLE check_codec;