ColumnSparse: fix MergeTree in old syntax

This commit is contained in:
Anton Popov 2021-04-17 04:06:59 +03:00
parent 6ce875175b
commit 2afa1590e0
5 changed files with 29 additions and 10 deletions

View File

@ -7,6 +7,8 @@
#include <Common/HashTable/Hash.h> #include <Common/HashTable/Hash.h>
#include <DataStreams/ColumnGathererStream.h> #include <DataStreams/ColumnGathererStream.h>
#include <algorithm>
namespace DB namespace DB
{ {
@ -41,6 +43,13 @@ ColumnSparse::ColumnSparse(MutableColumnPtr && values_, MutableColumnPtr && offs
if (_size < offsets->size()) if (_size < offsets->size())
throw Exception(ErrorCodes::LOGICAL_ERROR, throw Exception(ErrorCodes::LOGICAL_ERROR,
"Size of sparse column ({}) cannot be lower than number of non-default values ({})", _size, offsets->size()); "Size of sparse column ({}) cannot be lower than number of non-default values ({})", _size, offsets->size());
#ifndef NDEBUG
const auto & offsets_data = getOffsetsData();
auto it = std::adjacent_find(offsets_data.begin(), offsets_data.end(), std::greater_equal<UInt64>());
if (it != offsets_data.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Offsets of ColumnSparse must be strictly sorted");
#endif
} }
MutableColumnPtr ColumnSparse::cloneResized(size_t new_size) const MutableColumnPtr ColumnSparse::cloneResized(size_t new_size) const

View File

@ -157,11 +157,6 @@ public:
IColumn & getOffsetsColumn() { return *offsets; } IColumn & getOffsetsColumn() { return *offsets; }
private: private:
[[noreturn]] void throwMustBeDense() const
{
throw Exception("Not implemented for ColumnSparse", ErrorCodes::LOGICAL_ERROR);
}
class Iterator class Iterator
{ {
public: public:

View File

@ -55,10 +55,12 @@ size_t deserializeOffsets(IColumn::Offsets & offsets,
/// TODO: /// TODO:
offsets.reserve(limit / 10); offsets.reserve(limit / 10);
bool first = true;
size_t total_rows = state.num_trailing_defaults; size_t total_rows = state.num_trailing_defaults;
if (state.has_value_after_defaults) if (state.has_value_after_defaults)
{ {
offsets.push_back(start + state.num_trailing_defaults); offsets.push_back(start + state.num_trailing_defaults);
first = false;
state.has_value_after_defaults = false; state.has_value_after_defaults = false;
state.num_trailing_defaults = 0; state.num_trailing_defaults = 0;
@ -66,7 +68,6 @@ size_t deserializeOffsets(IColumn::Offsets & offsets,
} }
size_t group_size; size_t group_size;
bool first = true;
while (!istr.eof()) while (!istr.eof())
{ {
readIntBinary(group_size, istr); readIntBinary(group_size, istr);

View File

@ -811,6 +811,14 @@ void IMergeTreeDataPart::loadChecksums(bool require)
void IMergeTreeDataPart::loadRowsCount() void IMergeTreeDataPart::loadRowsCount()
{ {
String path = getFullRelativePath() + "count.txt"; String path = getFullRelativePath() + "count.txt";
auto read_rows_count = [&]()
{
auto buf = openForReading(volume->getDisk(), path);
readIntText(rows_count, *buf);
assertEOF(*buf);
};
if (index_granularity.empty()) if (index_granularity.empty())
{ {
rows_count = 0; rows_count = 0;
@ -820,9 +828,7 @@ void IMergeTreeDataPart::loadRowsCount()
if (!volume->getDisk()->exists(path)) if (!volume->getDisk()->exists(path))
throw Exception("No count.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART); throw Exception("No count.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
auto buf = openForReading(volume->getDisk(), path); read_rows_count();
readIntText(rows_count, *buf);
assertEOF(*buf);
#ifndef NDEBUG #ifndef NDEBUG
/// columns have to be loaded /// columns have to be loaded
@ -875,6 +881,12 @@ void IMergeTreeDataPart::loadRowsCount()
} }
else else
{ {
if (volume->getDisk()->exists(path))
{
read_rows_count();
return;
}
for (const NameAndTypePair & column : columns) for (const NameAndTypePair & column : columns)
{ {
ColumnPtr column_col = column.type->createColumn(*getSerializationForColumn(column)); ColumnPtr column_col = column.type->createColumn(*getSerializationForColumn(column));

View File

@ -115,7 +115,7 @@ void MergedBlockOutputStream::finalizePartOnDisk(
out->sync(); out->sync();
} }
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING || isCompactPart(new_part)) if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{ {
new_part->partition.store(storage, volume->getDisk(), part_path, checksums); new_part->partition.store(storage, volume->getDisk(), part_path, checksums);
if (new_part->minmax_idx.initialized) if (new_part->minmax_idx.initialized)
@ -123,7 +123,9 @@ void MergedBlockOutputStream::finalizePartOnDisk(
else if (rows_count) else if (rows_count)
throw Exception("MinMax index was not initialized for new non-empty part " + new_part->name throw Exception("MinMax index was not initialized for new non-empty part " + new_part->name
+ ". It is a bug.", ErrorCodes::LOGICAL_ERROR); + ". It is a bug.", ErrorCodes::LOGICAL_ERROR);
}
{
auto count_out = volume->getDisk()->writeFile(part_path + "count.txt", 4096); auto count_out = volume->getDisk()->writeFile(part_path + "count.txt", 4096);
HashingWriteBuffer count_out_hashing(*count_out); HashingWriteBuffer count_out_hashing(*count_out);
writeIntText(rows_count, count_out_hashing); writeIntText(rows_count, count_out_hashing);