check in-memory parts, comments and style-fixes

This commit is contained in:
Anton Popov 2020-06-22 21:56:53 +03:00
parent b19d48a11c
commit 78d28be8cf
8 changed files with 44 additions and 25 deletions

View File

@ -149,8 +149,8 @@ void Service::sendPartFromDisk(const MergeTreeData::DataPartPtr & part, WriteBuf
/// We'll take a list of files from the list of checksums. /// We'll take a list of files from the list of checksums.
MergeTreeData::DataPart::Checksums checksums = part->checksums; MergeTreeData::DataPart::Checksums checksums = part->checksums;
/// Add files that are not in the checksum list. /// Add files that are not in the checksum list.
checksums.files["checksums.txt"]; checksums.files["checksums.txt"] = {};
checksums.files["columns.txt"]; checksums.files["columns.txt"] = {};
MergeTreeData::DataPart::Checksums data_checksums; MergeTreeData::DataPart::Checksums data_checksums;

View File

@ -859,7 +859,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
part_names_with_disks.emplace_back(it->name(), disk_ptr); part_names_with_disks.emplace_back(it->name(), disk_ptr);
/// Create and correctly initialize global WAL object, if it's needed /// Create and correctly initialize global WAL object, if it's needed
if (it->name() == MergeTreeWriteAheadLog::DEFAULT_WAL_FILE && settings->in_memory_parts_enable_wal) if (it->name() == MergeTreeWriteAheadLog::DEFAULT_WAL_FILE_NAME && settings->in_memory_parts_enable_wal)
{ {
write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, disk_ptr, it->name()); write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, disk_ptr, it->name());
for (auto && part : write_ahead_log->restore()) for (auto && part : write_ahead_log->restore())

View File

@ -126,10 +126,21 @@ void MergeTreeDataPartInMemory::calculateEachColumnSizesOnDisk(ColumnSizeByName
each_columns_size[column.name].data_uncompressed += block.getByName(column.name).column->byteSize(); each_columns_size[column.name].data_uncompressed += block.getByName(column.name).column->byteSize();
} }
/// Builds a checksum describing the part's in-memory block.
/// - uncompressed_size is taken from block.bytes();
/// - uncompressed_hash is the 128-bit SipHash obtained by feeding every column
///   of the block, in block order, through updateHashFast().
IMergeTreeDataPart::Checksum MergeTreeDataPartInMemory::calculateBlockChecksum() const
{
    IMergeTreeDataPart::Checksum result;
    result.uncompressed_size = block.bytes();

    /// One hasher is shared by all columns, so the digest covers the whole block.
    SipHash hasher;
    for (const auto & elem : block)
        elem.column->updateHashFast(hasher);

    hasher.get128(result.uncompressed_hash.first, result.uncompressed_hash.second);
    return result;
}
DataPartInMemoryPtr asInMemoryPart(const MergeTreeDataPartPtr & part) DataPartInMemoryPtr asInMemoryPart(const MergeTreeDataPartPtr & part)
{ {
return std::dynamic_pointer_cast<const MergeTreeDataPartInMemory>(part); return std::dynamic_pointer_cast<const MergeTreeDataPartInMemory>(part);
} }
} }

View File

@ -48,6 +48,9 @@ public:
bool waitUntilMerged(size_t timeout_ms) const; bool waitUntilMerged(size_t timeout_ms) const;
void notifyMerged() const; void notifyMerged() const;
/// Returns hash of part's block
Checksum calculateBlockChecksum() const;
mutable Block block; mutable Block block;
private: private:

View File

@ -69,14 +69,6 @@ void MergeTreeDataPartWriterInMemory::calculateAndSerializePrimaryIndex(const Bl
} }
} }
/// Packs an uncompressed size and the 128-bit digest of an already-fed SipHash
/// into a MergeTreeDataPartChecksum.
/// @param size  value stored as uncompressed_size.
/// @param hash  hasher whose get128() result is stored as uncompressed_hash.
static MergeTreeDataPartChecksum createUncompressedChecksum(size_t size, SipHash & hash)
{
    MergeTreeDataPartChecksum result;
    hash.get128(result.uncompressed_hash.first, result.uncompressed_hash.second);
    result.uncompressed_size = size;
    return result;
}
void MergeTreeDataPartWriterInMemory::finishDataSerialization(IMergeTreeDataPart::Checksums & checksums) void MergeTreeDataPartWriterInMemory::finishDataSerialization(IMergeTreeDataPart::Checksums & checksums)
{ {
/// If part is empty we still need to initialize block by empty columns. /// If part is empty we still need to initialize block by empty columns.
@ -84,10 +76,7 @@ void MergeTreeDataPartWriterInMemory::finishDataSerialization(IMergeTreeDataPart
for (const auto & column : columns_list) for (const auto & column : columns_list)
part_in_memory->block.insert(ColumnWithTypeAndName{column.type, column.name}); part_in_memory->block.insert(ColumnWithTypeAndName{column.type, column.name});
SipHash hash; checksums.files["data.bin"] = part_in_memory->calculateBlockChecksum();
for (const auto & column : part_in_memory->block)
column.column->updateHashFast(hash);
checksums.files["data.bin"] = createUncompressedChecksum(part_in_memory->block.bytes(), hash);
} }
} }

View File

@ -53,7 +53,7 @@ void MergeTreeWriteAheadLog::addPart(const Block & block, const String & part_na
auto max_wal_bytes = storage.getSettings()->write_ahead_log_max_bytes; auto max_wal_bytes = storage.getSettings()->write_ahead_log_max_bytes;
if (out->count() > max_wal_bytes) if (out->count() > max_wal_bytes)
rotate(); rotate(lock);
} }
void MergeTreeWriteAheadLog::dropPart(const String & part_name) void MergeTreeWriteAheadLog::dropPart(const String & part_name)
@ -65,7 +65,7 @@ void MergeTreeWriteAheadLog::dropPart(const String & part_name)
writeStringBinary(part_name, *out); writeStringBinary(part_name, *out);
} }
void MergeTreeWriteAheadLog::rotate() void MergeTreeWriteAheadLog::rotate(const std::lock_guard<std::mutex> &)
{ {
String new_name = String(WAL_FILE_NAME) + "_" String new_name = String(WAL_FILE_NAME) + "_"
+ toString(min_block_number) + "_" + toString(min_block_number) + "_"
@ -138,8 +138,8 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore()
/// But if it contains any part rotate and save them. /// But if it contains any part rotate and save them.
if (max_block_number == -1) if (max_block_number == -1)
disk->remove(path); disk->remove(path);
else if (name == DEFAULT_WAL_FILE) else if (name == DEFAULT_WAL_FILE_NAME)
rotate(); rotate(lock);
break; break;
} }

View File

@ -10,6 +10,13 @@ namespace DB
class MergeTreeData; class MergeTreeData;
/** WAL stores additions and removals of data parts in in-memory format.
* Format of data in WAL:
* - version
* - type of action (ADD or DROP)
* - part name
* - part's block in Native format. (for ADD action)
*/
class MergeTreeWriteAheadLog class MergeTreeWriteAheadLog
{ {
public: public:
@ -22,10 +29,10 @@ public:
constexpr static auto WAL_FILE_NAME = "wal"; constexpr static auto WAL_FILE_NAME = "wal";
constexpr static auto WAL_FILE_EXTENSION = ".bin"; constexpr static auto WAL_FILE_EXTENSION = ".bin";
constexpr static auto DEFAULT_WAL_FILE = "wal.bin"; constexpr static auto DEFAULT_WAL_FILE_NAME = "wal.bin";
MergeTreeWriteAheadLog(const MergeTreeData & storage_, const DiskPtr & disk_, MergeTreeWriteAheadLog(const MergeTreeData & storage_, const DiskPtr & disk_,
const String & name = DEFAULT_WAL_FILE); const String & name = DEFAULT_WAL_FILE_NAME);
void addPart(const Block & block, const String & part_name); void addPart(const Block & block, const String & part_name);
void dropPart(const String & part_name); void dropPart(const String & part_name);
@ -36,7 +43,7 @@ public:
private: private:
void init(); void init();
void rotate(); void rotate(const std::lock_guard<std::mutex> & lock);
const MergeTreeData & storage; const MergeTreeData & storage;
DiskPtr disk; DiskPtr disk;

View File

@ -7,6 +7,7 @@
#include <Storages/MergeTree/MergeTreeIndexGranularity.h> #include <Storages/MergeTree/MergeTreeIndexGranularity.h>
#include <Storages/MergeTree/checkDataPart.h> #include <Storages/MergeTree/checkDataPart.h>
#include <Storages/MergeTree/MergeTreeDataPartCompact.h> #include <Storages/MergeTree/MergeTreeDataPartCompact.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Compression/CompressedReadBuffer.h> #include <Compression/CompressedReadBuffer.h>
#include <IO/HashingReadBuffer.h> #include <IO/HashingReadBuffer.h>
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
@ -161,13 +162,21 @@ IMergeTreeDataPart::Checksums checkDataPart(
return checksums_data; return checksums_data;
} }
/// Verifies an in-memory part: recomputes the checksum of its block, files it
/// under "data.bin" and compares the result against the checksums stored in the part.
/// Returns the freshly computed checksums.
IMergeTreeDataPart::Checksums checkDataPartInMemory(const DataPartInMemoryPtr & data_part)
{
    IMergeTreeDataPart::Checksums recomputed;
    recomputed.files["data.bin"] = data_part->calculateBlockChecksum();

    /// NOTE(review): checkEqual presumably throws on mismatch, and the `true` flag
    /// looks like it enables comparison of uncompressed hashes — confirm against its declaration.
    data_part->checksums.checkEqual(recomputed, true);

    return recomputed;
}
IMergeTreeDataPart::Checksums checkDataPart( IMergeTreeDataPart::Checksums checkDataPart(
MergeTreeData::DataPartPtr data_part, MergeTreeData::DataPartPtr data_part,
bool require_checksums, bool require_checksums,
std::function<bool()> is_cancelled) std::function<bool()> is_cancelled)
{ {
if (!data_part->isStoredOnDisk()) if (auto part_in_memory = asInMemoryPart(data_part))
return data_part->checksums; return checkDataPartInMemory(part_in_memory);
return checkDataPart( return checkDataPart(
data_part->volume->getDisk(), data_part->volume->getDisk(),