load partition and minmax index from disk [#CLICKHOUSE-3000]

This commit is contained in:
Alexey Zatelepin 2017-08-30 23:23:29 +03:00 committed by alexey-milovidov
parent e9f93028b8
commit 6906921932
5 changed files with 70 additions and 16 deletions

View File

@ -44,12 +44,12 @@ struct Memory : boost::noncopyable, Allocator<false>
dealloc(); dealloc();
} }
Memory(Memory && rhs) Memory(Memory && rhs) noexcept
{ {
*this = std::move(rhs); *this = std::move(rhs);
} }
Memory & operator=(Memory && rhs) Memory & operator=(Memory && rhs) noexcept
{ {
std::swap(m_capacity, rhs.m_capacity); std::swap(m_capacity, rhs.m_capacity);
std::swap(m_size, rhs.m_size); std::swap(m_size, rhs.m_size);

View File

@ -32,6 +32,8 @@ public:
ReadBufferFromFile(int fd, const std::string & original_file_name = {}, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1, ReadBufferFromFile(int fd, const std::string & original_file_name = {}, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1,
char * existing_memory = nullptr, size_t alignment = 0); char * existing_memory = nullptr, size_t alignment = 0);
ReadBufferFromFile(ReadBufferFromFile &&) = default;
~ReadBufferFromFile() override; ~ReadBufferFromFile() override;
/// Close file before destruction of object. /// Close file before destruction of object.

View File

@ -18,6 +18,7 @@ class ReadBufferFromFileBase : public BufferWithOwnMemory<ReadBuffer>
{ {
public: public:
ReadBufferFromFileBase(size_t buf_size, char * existing_memory, size_t alignment); ReadBufferFromFileBase(size_t buf_size, char * existing_memory, size_t alignment);
ReadBufferFromFileBase(ReadBufferFromFileBase &&) = default;
virtual ~ReadBufferFromFileBase(); virtual ~ReadBufferFromFileBase();
off_t seek(off_t off, int whence = SEEK_SET); off_t seek(off_t off, int whence = SEEK_SET);
virtual off_t getPositionInFile() = 0; virtual off_t getPositionInFile() = 0;

View File

@ -26,6 +26,8 @@ public:
ReadBufferFromFileDescriptor(int fd_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0) ReadBufferFromFileDescriptor(int fd_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0)
: ReadBufferFromFileBase(buf_size, existing_memory, alignment), fd(fd_), pos_in_file(0) {} : ReadBufferFromFileBase(buf_size, existing_memory, alignment), fd(fd_), pos_in_file(0) {}
ReadBufferFromFileDescriptor(ReadBufferFromFileDescriptor &&) = default;
int getFD() const override int getFD() const override
{ {
return fd; return fd;

View File

@ -614,6 +614,12 @@ void MergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checksu
} }
static ReadBufferFromFile openForReading(const String & path)
{
return ReadBufferFromFile(path, std::min(static_cast<Poco::File::FileSize>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path).getSize()));
}
void MergeTreeDataPart::loadIndex() void MergeTreeDataPart::loadIndex()
{ {
/// Size - in number of marks. /// Size - in number of marks.
@ -640,8 +646,7 @@ void MergeTreeDataPart::loadIndex()
} }
String index_path = getFullPath() + "primary.idx"; String index_path = getFullPath() + "primary.idx";
ReadBufferFromFile index_file(index_path, ReadBufferFromFile index_file = openForReading(index_path);
std::min(static_cast<Poco::File::FileSize>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(index_path).getSize()));
for (size_t i = 0; i < size; ++i) for (size_t i = 0; i < size; ++i)
for (size_t j = 0; j < key_size; ++j) for (size_t j = 0; j < key_size; ++j)
@ -676,7 +681,28 @@ void MergeTreeDataPart::loadPartitionAndMinMaxIndex()
minmax_idx.initialized = true; minmax_idx.initialized = true;
} }
else else
throw Exception("TODO", ErrorCodes::LOGICAL_ERROR); {
if (!storage.partition_expr_column_types.empty())
{
ReadBufferFromFile file = openForReading(getFullPath() + "partition.dat");
partition.resize(storage.partition_expr_column_types.size());
for (size_t i = 0; i < storage.partition_expr_column_types.size(); ++i)
storage.partition_expr_column_types[i]->deserializeBinary(partition[i], file);
}
size_t minmax_idx_size = storage.minmax_idx_column_types.size();
minmax_idx.min_column_values.resize(minmax_idx_size);
minmax_idx.max_column_values.resize(minmax_idx_size);
for (size_t i = 0; i < minmax_idx_size; ++i)
{
String file_name = getFullPath() + "minmax_" + escapeForFileName(storage.minmax_idx_columns[i]) + ".idx";
ReadBufferFromFile file = openForReading(file_name);
const DataTypePtr & type = storage.minmax_idx_column_types[i];
type->deserializeBinary(minmax_idx.min_column_values[i], file);
type->deserializeBinary(minmax_idx.max_column_values[i], file);
}
minmax_idx.initialized = true;
}
} }
void MergeTreeDataPart::loadChecksums(bool require) void MergeTreeDataPart::loadChecksums(bool require)
@ -689,7 +715,7 @@ void MergeTreeDataPart::loadChecksums(bool require)
return; return;
} }
ReadBufferFromFile file(path, std::min(static_cast<Poco::File::FileSize>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path).getSize())); ReadBufferFromFile file = openForReading(path);
if (checksums.read(file)) if (checksums.read(file))
assertEOF(file); assertEOF(file);
} }
@ -727,7 +753,7 @@ void MergeTreeDataPart::loadColumns(bool require)
return; return;
} }
ReadBufferFromFile file(path, std::min(static_cast<Poco::File::FileSize>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path).getSize())); ReadBufferFromFile file = openForReading(path);
columns.readText(file); columns.readText(file);
} }
@ -751,22 +777,45 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata)
} }
} }
if (storage.format_version > 0)
{
if (!storage.partition_expr_columns.empty() && !checksums.files.count("partition.dat"))
throw Exception("No checksum for partition.dat", ErrorCodes::NO_FILE_IN_DATA_PART);
for (const String & col_name : storage.minmax_idx_columns)
{
if (!checksums.files.count("minmax_" + escapeForFileName(col_name) + ".idx"))
throw Exception("No minmax idx file checksum for column " + col_name, ErrorCodes::NO_FILE_IN_DATA_PART);
}
}
checksums.checkSizes(path); checksums.checkSizes(path);
} }
else else
{ {
if (!storage.sort_descr.empty()) auto check_file_not_empty = [&path](const String & file_path)
{ {
/// Check that the primary key is not empty. Poco::File file(file_path);
Poco::File index_file(path + "primary.idx"); if (!file.exists() || file.getSize() == 0)
throw Exception("Part " + path + " is broken: " + file_path + " is empty", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
return file.getSize();
};
if (!index_file.exists() || index_file.getSize() == 0) /// Check that the primary key index is not empty.
throw Exception("Part " + path + " is broken: primary key is empty.", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART); if (!storage.sort_descr.empty())
check_file_not_empty(path + "primary.idx");
if (storage.format_version > 0)
{
if (!storage.partition_expr_columns.empty())
check_file_not_empty(path + "partition.dat");
for (const String & col_name : storage.minmax_idx_columns)
check_file_not_empty(path + "minmax_" + escapeForFileName(col_name) + ".idx");
} }
/// Check that all marks are nonempty and have the same size. /// Check that all marks are nonempty and have the same size.
auto check_marks = [&path](const NamesAndTypesList & columns, const std::string & extension)
auto check_marks = [](const std::string & path, const NamesAndTypesList & columns, const std::string & extension)
{ {
ssize_t marks_size = -1; ssize_t marks_size = -1;
for (const NameAndTypePair & it : columns) for (const NameAndTypePair & it : columns)
@ -794,8 +843,8 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata)
} }
}; };
check_marks(path, columns, ".mrk"); check_marks(columns, ".mrk");
check_marks(path, columns, ".null.mrk"); check_marks(columns, ".null.mrk");
} }
} }