Add a new data skipping minmax index format (.idx2) for proper Nullable support

Note that this cannot be done without a new extension, since the index file
has no header to carry a format version.

v2: use IDisk interface for existence check
v3: remove extra file existence check
v4: fix MATERIALIZE INDEX
Azat Khuzhin 2021-08-05 21:09:17 +03:00
parent 560e71dcfa
commit 038241b6ed
15 changed files with 178 additions and 81 deletions
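
A minimal standalone sketch of the constraint mentioned in the commit message
(illustrative C++, not ClickHouse code): an existing .idx granule stream begins
directly with data, so there is no place to put a version byte without breaking
old readers, and the format version can only be recovered from the file name.

#include <cstdint>
#include <string>

// Illustrative only: ".idx" files are the legacy version-1 format, while the
// new ".idx2" extension marks the version-2 minmax format with Nullable support.
uint8_t indexVersionFromExtension(const std::string & extension)
{
    return extension == ".idx2" ? 2 : 1;
}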

View File

@ -1663,7 +1663,12 @@ NameToNameVector MergeTreeDataMergerMutator::collectFilesForRenames(
{
if (command.type == MutationCommand::Type::DROP_INDEX)
{
if (source_part->checksums.has(INDEX_FILE_PREFIX + command.column_name + ".idx"))
if (source_part->checksums.has(INDEX_FILE_PREFIX + command.column_name + ".idx2"))
{
rename_vector.emplace_back(INDEX_FILE_PREFIX + command.column_name + ".idx2", "");
rename_vector.emplace_back(INDEX_FILE_PREFIX + command.column_name + mrk_extension, "");
}
else if (source_part->checksums.has(INDEX_FILE_PREFIX + command.column_name + ".idx"))
{
rename_vector.emplace_back(INDEX_FILE_PREFIX + command.column_name + ".idx", "");
rename_vector.emplace_back(INDEX_FILE_PREFIX + command.column_name + mrk_extension, "");
@ -1749,6 +1754,7 @@ NameSet MergeTreeDataMergerMutator::collectFilesToSkip(
for (const auto & index : indices_to_recalc)
{
files_to_skip.insert(index->getFileName() + ".idx");
files_to_skip.insert(index->getFileName() + ".idx2");
files_to_skip.insert(index->getFileName() + mrk_extension);
}
for (const auto & projection : projections_to_recalc)
@ -1893,8 +1899,11 @@ std::set<MergeTreeIndexPtr> MergeTreeDataMergerMutator::getIndicesToRecalculate(
{
const auto & index = indices[i];
bool has_index =
source_part->checksums.has(INDEX_FILE_PREFIX + index.name + ".idx") ||
source_part->checksums.has(INDEX_FILE_PREFIX + index.name + ".idx2");
// If we ask to materialize and it already exists
if (!source_part->checksums.has(INDEX_FILE_PREFIX + index.name + ".idx") && materialized_indices.count(index.name))
if (!has_index && materialized_indices.count(index.name))
{
if (indices_to_recalc.insert(index_factory.get(index)).second)
{

View File

@ -9,11 +9,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
namespace
{
constexpr auto INDEX_FILE_EXTENSION = ".idx";
}
void MergeTreeDataPartWriterOnDisk::Stream::finalize()
{
compressed.next();
@ -165,7 +160,7 @@ void MergeTreeDataPartWriterOnDisk::initSkipIndices()
std::make_unique<MergeTreeDataPartWriterOnDisk::Stream>(
stream_name,
data_part->volume->getDisk(),
part_path + stream_name, INDEX_FILE_EXTENSION,
part_path + stream_name, index_helper->getSerializedFileExtension(),
part_path + stream_name, marks_file_extension,
default_codec, settings.max_compress_block_size));
skip_indices_aggregators.push_back(index_helper->createIndexAggregator());

View File

@ -1457,9 +1457,10 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
size_t & granules_dropped,
Poco::Logger * log)
{
if (!part->volume->getDisk()->exists(part->getFullRelativePath() + index_helper->getFileName() + ".idx"))
const std::string & path_prefix = part->getFullRelativePath() + index_helper->getFileName();
if (!index_helper->getDeserializedFormat(part->volume->getDisk(), path_prefix))
{
LOG_DEBUG(log, "File for index {} does not exist. Skipping it.", backQuote(index_helper->index.name));
LOG_DEBUG(log, "File for index {} does not exist ({}.*). Skipping it.", backQuote(index_helper->index.name), path_prefix);
return ranges;
}

View File

@ -101,14 +101,17 @@ MergeTreeIndexGranuleFullText::MergeTreeIndexGranuleFullText(
void MergeTreeIndexGranuleFullText::serializeBinary(WriteBuffer & ostr) const
{
if (empty())
throw Exception("Attempt to write empty fulltext index " + backQuote(index_name), ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to write empty fulltext index {}.", backQuote(index_name));
for (const auto & bloom_filter : bloom_filters)
ostr.write(reinterpret_cast<const char *>(bloom_filter.getFilter().data()), params.filter_size);
}
void MergeTreeIndexGranuleFullText::deserializeBinary(ReadBuffer & istr)
void MergeTreeIndexGranuleFullText::deserializeBinary(ReadBuffer & istr, MergeTreeIndexVersion version)
{
if (version != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown index version {}.", version);
for (auto & bloom_filter : bloom_filters)
{
istr.read(reinterpret_cast<char *>(

View File

@ -45,7 +45,7 @@ struct MergeTreeIndexGranuleFullText final : public IMergeTreeIndexGranule
~MergeTreeIndexGranuleFullText() override = default;
void serializeBinary(WriteBuffer & ostr) const override;
void deserializeBinary(ReadBuffer & istr) override;
void deserializeBinary(ReadBuffer & istr, MergeTreeIndexVersion version) override;
bool empty() const override { return !has_elems; }

View File

@ -84,10 +84,12 @@ bool MergeTreeIndexGranuleBloomFilter::empty() const
return !total_rows;
}
void MergeTreeIndexGranuleBloomFilter::deserializeBinary(ReadBuffer & istr)
void MergeTreeIndexGranuleBloomFilter::deserializeBinary(ReadBuffer & istr, MergeTreeIndexVersion version)
{
if (!empty())
throw Exception("Cannot read data to a non-empty bloom filter index.", ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot read data to a non-empty bloom filter index.");
if (version != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown index version {}.", version);
readVarUInt(total_rows, istr);
for (auto & filter : bloom_filters)
@ -102,7 +104,7 @@ void MergeTreeIndexGranuleBloomFilter::deserializeBinary(ReadBuffer & istr)
void MergeTreeIndexGranuleBloomFilter::serializeBinary(WriteBuffer & ostr) const
{
if (empty())
throw Exception("Attempt to write empty bloom filter index.", ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to write empty bloom filter index.");
static size_t atom_size = 8;
writeVarUInt(total_rows, ostr);

View File

@ -16,8 +16,7 @@ public:
bool empty() const override;
void serializeBinary(WriteBuffer & ostr) const override;
void deserializeBinary(ReadBuffer & istr) override;
void deserializeBinary(ReadBuffer & istr, MergeTreeIndexVersion version) override;
const std::vector<BloomFilterPtr> & getFilters() const { return bloom_filters; }

View File

@ -40,28 +40,12 @@ void MergeTreeIndexGranuleMinMax::serializeBinary(WriteBuffer & ostr) const
const DataTypePtr & type = index_sample_block.getByPosition(i).type;
auto serialization = type->getDefaultSerialization();
if (!type->isNullable())
{
serialization->serializeBinary(hyperrectangle[i].left, ostr);
serialization->serializeBinary(hyperrectangle[i].right, ostr);
}
else
{
/// NOTE: that this serialization differs from
/// IMergeTreeDataPart::MinMaxIndex::store() due to preserve
/// backward compatibility.
bool is_null = hyperrectangle[i].left.isNull() || hyperrectangle[i].right.isNull(); // one is enough
writeBinary(is_null, ostr);
if (!is_null)
{
serialization->serializeBinary(hyperrectangle[i].left, ostr);
serialization->serializeBinary(hyperrectangle[i].right, ostr);
}
}
}
}
void MergeTreeIndexGranuleMinMax::deserializeBinary(ReadBuffer & istr)
void MergeTreeIndexGranuleMinMax::deserializeBinary(ReadBuffer & istr, MergeTreeIndexVersion version)
{
hyperrectangle.clear();
Field min_val;
@ -72,6 +56,9 @@ void MergeTreeIndexGranuleMinMax::deserializeBinary(ReadBuffer & istr)
const DataTypePtr & type = index_sample_block.getByPosition(i).type;
auto serialization = type->getDefaultSerialization();
switch (version)
{
case 1:
if (!type->isNullable())
{
serialization->deserializeBinary(min_val, istr);
@ -80,8 +67,11 @@ void MergeTreeIndexGranuleMinMax::deserializeBinary(ReadBuffer & istr)
else
{
/// NOTE: that this serialization differs from
/// IMergeTreeDataPart::MinMaxIndex::load() due to preserve
/// IMergeTreeDataPart::MinMaxIndex::load() to preserve
/// backward compatibility.
///
/// But this is deprecated format, so this is OK.
bool is_null;
readBinary(is_null, istr);
if (!is_null)
@ -95,6 +85,24 @@ void MergeTreeIndexGranuleMinMax::deserializeBinary(ReadBuffer & istr)
max_val = Null();
}
}
break;
/// New format with proper Nullable support for values that includes Null values
case 2:
serialization->deserializeBinary(min_val, istr);
serialization->deserializeBinary(max_val, istr);
// NULL_LAST
if (min_val.isNull())
min_val = PositiveInfinity();
if (max_val.isNull())
max_val = PositiveInfinity();
break;
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown index version {}.", version);
}
hyperrectangle.emplace_back(min_val, true, max_val, true);
}
}
@ -203,6 +211,15 @@ bool MergeTreeIndexMinMax::mayBenefitFromIndexForIn(const ASTPtr & node) const
return false;
}
MergeTreeIndexFormat MergeTreeIndexMinMax::getDeserializedFormat(const DiskPtr disk, const std::string & relative_path_prefix) const
{
if (disk->exists(relative_path_prefix + ".idx2"))
return {2, ".idx2"};
else if (disk->exists(relative_path_prefix + ".idx"))
return {1, ".idx"};
return {0 /* unknown */, ""};
}
MergeTreeIndexPtr minmaxIndexCreator(
const IndexDescription & index)
{
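
The gist of the format change above, as a hedged standalone illustration:
std::optional<double> stands in for a Nullable value and the byte layout is
simplified, so this is not ClickHouse's actual serialization, only the shape of
the difference between the two versions.

#include <cstdint>
#include <iostream>
#include <optional>
#include <sstream>

// std::optional models a Nullable value (nullopt = NULL).
using NullableValue = std::optional<double>;

// Old layout (version 1, ".idx"): a single is_null flag per (min, max) pair.
// If either bound is NULL both values are dropped, so such a granule can
// never be used for skipping.
void writeV1(std::ostream & out, NullableValue min, NullableValue max)
{
    const uint8_t is_null = (!min || !max) ? 1 : 0;
    out.write(reinterpret_cast<const char *>(&is_null), 1);
    if (!is_null)
    {
        out.write(reinterpret_cast<const char *>(&*min), sizeof(double));
        out.write(reinterpret_cast<const char *>(&*max), sizeof(double));
    }
}

// New layout (version 2, ".idx2"): each bound is written as a Nullable value
// (its own null flag plus the value), so a granule that merely contains some
// NULLs still keeps a usable bound. On read, NULL bounds are mapped to +Inf
// (NULL_LAST), matching how the deserialization above handles them.
void writeV2(std::ostream & out, NullableValue min, NullableValue max)
{
    for (const NullableValue & v : {min, max})
    {
        const uint8_t is_null = v ? 0 : 1;
        out.write(reinterpret_cast<const char *>(&is_null), 1);
        if (!is_null)
            out.write(reinterpret_cast<const char *>(&*v), sizeof(double));
    }
}

int main()
{
    std::ostringstream v1, v2;
    writeV1(v1, 1.0, std::nullopt);  // min known, max is NULL
    writeV2(v2, 1.0, std::nullopt);
    std::cout << "v1: " << v1.str().size() << " bytes (min is lost), "
              << "v2: " << v2.str().size() << " bytes (min survives)\n";
}

With the version-1 layout such a pair degenerates to "unknown", which is why
the test at the bottom of this commit previously had to disable the row limit
(max_rows_to_read = 0) for the NULL-related queries; with version 2 those
granules keep a usable bound, so the limits can be tightened to 8 and 4 rows.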

View File

@ -21,7 +21,7 @@ struct MergeTreeIndexGranuleMinMax final : public IMergeTreeIndexGranule
~MergeTreeIndexGranuleMinMax() override = default;
void serializeBinary(WriteBuffer & ostr) const override;
void deserializeBinary(ReadBuffer & istr) override;
void deserializeBinary(ReadBuffer & istr, MergeTreeIndexVersion version) override;
bool empty() const override { return hyperrectangle.empty(); }
@ -81,6 +81,9 @@ public:
const SelectQueryInfo & query, ContextPtr context) const override;
bool mayBenefitFromIndexForIn(const ASTPtr & node) const override;
const char* getSerializedFileExtension() const override { return ".idx2"; }
MergeTreeIndexFormat getDeserializedFormat(const DiskPtr disk, const std::string & path_prefix) const override;
};
}

View File

@ -1,5 +1,29 @@
#include <Storages/MergeTree/MergeTreeIndexReader.h>
namespace
{
using namespace DB;
std::unique_ptr<MergeTreeReaderStream> makeIndexReader(
const std::string & extension,
MergeTreeIndexPtr index,
MergeTreeData::DataPartPtr part,
size_t marks_count,
const MarkRanges & all_mark_ranges,
MergeTreeReaderSettings settings)
{
return std::make_unique<MergeTreeReaderStream>(
part->volume->getDisk(),
part->getFullRelativePath() + index->getFileName(), extension, marks_count,
all_mark_ranges,
std::move(settings), nullptr, nullptr,
part->getFileSizeOrZero(index->getFileName() + extension),
&part->index_granularity_info,
ReadBufferFromFileBase::ProfileCallback{}, CLOCK_MONOTONIC_COARSE);
}
}
namespace DB
{
@ -7,27 +31,28 @@ namespace DB
MergeTreeIndexReader::MergeTreeIndexReader(
MergeTreeIndexPtr index_, MergeTreeData::DataPartPtr part_, size_t marks_count_, const MarkRanges & all_mark_ranges_,
MergeTreeReaderSettings settings)
: index(index_), stream(
part_->volume->getDisk(),
part_->getFullRelativePath() + index->getFileName(), ".idx", marks_count_,
all_mark_ranges_,
std::move(settings), nullptr, nullptr,
part_->getFileSizeOrZero(index->getFileName() + ".idx"),
&part_->index_granularity_info,
ReadBufferFromFileBase::ProfileCallback{}, CLOCK_MONOTONIC_COARSE)
: index(index_)
{
stream.seekToStart();
const std::string & path_prefix = part_->getFullRelativePath() + index->getFileName();
auto index_format = index->getDeserializedFormat(part_->volume->getDisk(), path_prefix);
stream = makeIndexReader(index_format.extension, index_, part_, marks_count_, all_mark_ranges_, std::move(settings));
version = index_format.version;
stream->seekToStart();
}
MergeTreeIndexReader::~MergeTreeIndexReader() = default;
void MergeTreeIndexReader::seek(size_t mark)
{
stream.seekToMark(mark);
stream->seekToMark(mark);
}
MergeTreeIndexGranulePtr MergeTreeIndexReader::read()
{
auto granule = index->createIndexGranule();
granule->deserializeBinary(*stream.data_buffer);
granule->deserializeBinary(*stream->data_buffer, version);
return granule;
}

View File

@ -1,5 +1,6 @@
#pragma once
#include <memory>
#include <Storages/MergeTree/MergeTreeReaderStream.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreeData.h>
@ -16,6 +17,7 @@ public:
size_t marks_count_,
const MarkRanges & all_mark_ranges_,
MergeTreeReaderSettings settings);
~MergeTreeIndexReader();
void seek(size_t mark);
@ -23,7 +25,8 @@ public:
private:
MergeTreeIndexPtr index;
MergeTreeReaderStream stream;
std::unique_ptr<MergeTreeReaderStream> stream;
uint8_t version = 0;
};
}

View File

@ -48,8 +48,7 @@ MergeTreeIndexGranuleSet::MergeTreeIndexGranuleSet(
void MergeTreeIndexGranuleSet::serializeBinary(WriteBuffer & ostr) const
{
if (empty())
throw Exception(
"Attempt to write empty set index " + backQuote(index_name), ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to write empty set index {}.", backQuote(index_name));
const auto & size_type = DataTypePtr(std::make_shared<DataTypeUInt64>());
auto size_serialization = size_type->getDefaultSerialization();
@ -80,8 +79,11 @@ void MergeTreeIndexGranuleSet::serializeBinary(WriteBuffer & ostr) const
}
}
void MergeTreeIndexGranuleSet::deserializeBinary(ReadBuffer & istr)
void MergeTreeIndexGranuleSet::deserializeBinary(ReadBuffer & istr, MergeTreeIndexVersion version)
{
if (version != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown index version {}.", version);
block.clear();
Field field_rows;

View File

@ -28,7 +28,7 @@ struct MergeTreeIndexGranuleSet final : public IMergeTreeIndexGranule
MutableColumns && columns_);
void serializeBinary(WriteBuffer & ostr) const override;
void deserializeBinary(ReadBuffer & istr) override;
void deserializeBinary(ReadBuffer & istr, MergeTreeIndexVersion version) override;
size_t size() const { return block.rows(); }
bool empty() const override { return !size(); }

View File

@ -4,6 +4,7 @@
#include <unordered_map>
#include <vector>
#include <memory>
#include <utility>
#include <Core/Block.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Storages/MergeTree/MergeTreeDataPartChecksum.h>
@ -17,13 +18,37 @@ constexpr auto INDEX_FILE_PREFIX = "skp_idx_";
namespace DB
{
using MergeTreeIndexVersion = uint8_t;
struct MergeTreeIndexFormat
{
MergeTreeIndexVersion version;
const char* extension;
operator bool() const { return version != 0; }
};
/// Stores some info about a single block of data.
struct IMergeTreeIndexGranule
{
virtual ~IMergeTreeIndexGranule() = default;
/// Serialize always last version.
virtual void serializeBinary(WriteBuffer & ostr) const = 0;
virtual void deserializeBinary(ReadBuffer & istr) = 0;
/// Version of the index to deserialize:
///
/// - 2 -- minmax index for proper Nullable support,
/// - 1 -- everything else.
///
/// Implementation is responsible for version check,
/// and throw LOGICAL_ERROR in case of unsupported version.
///
/// See also:
/// - IMergeTreeIndex::getSerializedFileExtension()
/// - IMergeTreeIndex::getDeserializedFormat()
/// - MergeTreeDataMergerMutator::collectFilesToSkip()
/// - MergeTreeDataMergerMutator::collectFilesForRenames()
virtual void deserializeBinary(ReadBuffer & istr, MergeTreeIndexVersion version) = 0;
virtual bool empty() const = 0;
};
@ -73,9 +98,26 @@ struct IMergeTreeIndex
virtual ~IMergeTreeIndex() = default;
/// gets filename without extension
/// Returns filename without extension.
String getFileName() const { return INDEX_FILE_PREFIX + index.name; }
/// Returns extension for serialization.
/// Reimplement if you want new index format.
///
/// NOTE: In case getSerializedFileExtension() is reimplemented,
/// getDeserializedFormat() should be reimplemented too,
/// and check all previous extensions too
/// (to avoid breaking backward compatibility).
virtual const char* getSerializedFileExtension() const { return ".idx"; }
/// Returns extension for deserialization.
///
/// Return pair<extension, version>.
virtual MergeTreeIndexFormat getDeserializedFormat(const DiskPtr, const std::string & /* relative_path_prefix */) const
{
return {1, ".idx"};
}
/// Checks whether the column is in data skipping index.
virtual bool mayBenefitFromIndexForIn(const ASTPtr & node) const = 0;
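
A hedged sketch of the contract described in the comments above, using
simplified stand-in types rather than the real IMergeTreeIndex/DiskPtr
interfaces: an index that introduced a hypothetical ".idx3" format would
override both methods and keep probing every extension it has ever written,
newest first.

#include <cstdint>
#include <filesystem>
#include <string>

// Simplified stand-ins, not the ClickHouse interfaces.
struct IndexFormat
{
    uint8_t version = 0;
    const char * extension = "";
    explicit operator bool() const { return version != 0; }
};

struct HypotheticalIndex
{
    // New parts are always written in the newest format.
    const char * getSerializedFileExtension() const { return ".idx3"; }

    // Reads probe extensions newest-first so older parts stay readable.
    IndexFormat getDeserializedFormat(const std::string & path_prefix) const
    {
        namespace fs = std::filesystem;
        if (fs::exists(path_prefix + ".idx3"))
            return {3, ".idx3"};
        if (fs::exists(path_prefix + ".idx2"))
            return {2, ".idx2"};
        if (fs::exists(path_prefix + ".idx"))
            return {1, ".idx"};
        return {};  // the index is not present in this part
    }
};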

View File

@ -49,15 +49,11 @@ SET force_primary_key = 0;
SELECT * FROM nullable_minmax_index ORDER BY k;
SET max_rows_to_read = 6;
SELECT * FROM nullable_minmax_index WHERE v IS NULL;
-- NOTE: granuals with Null values cannot be filtred in data skipping indexes,
-- due to backward compatibility
SET max_rows_to_read = 0;
SET max_rows_to_read = 8;
SELECT * FROM nullable_minmax_index WHERE v IS NOT NULL;
SET max_rows_to_read = 6;
SELECT * FROM nullable_minmax_index WHERE v > 2;
-- NOTE: granuals with Null values cannot be filtred in data skipping indexes,
-- due to backward compatibility
SET max_rows_to_read = 0;
SET max_rows_to_read = 4;
SELECT * FROM nullable_minmax_index WHERE v <= 2;
DROP TABLE nullable_key;