in-memory parts: partition commands

This commit is contained in:
Anton Popov 2020-05-29 18:02:12 +03:00
parent 4769ce7271
commit c919840722
10 changed files with 162 additions and 41 deletions

View File

@ -794,15 +794,11 @@ String IMergeTreeDataPart::getRelativePathForDetachedPart(const String & prefix)
void IMergeTreeDataPart::renameToDetached(const String & prefix) const
{
assertOnDisk();
renameTo(getRelativePathForDetachedPart(prefix));
}
void IMergeTreeDataPart::makeCloneInDetached(const String & prefix) const
{
assertOnDisk();
LOG_INFO(storage.log, "Detaching " << relative_path);
String destination_path = storage.relative_data_path + getRelativePathForDetachedPart(prefix);
/// Backup is not recursive (max_level is 0), so do not copy inner directories

View File

@ -287,9 +287,10 @@ public:
size_t getFileSizeOrZero(const String & file_name) const;
String getFullRelativePath() const;
String getFullPath() const;
virtual void renameTo(const String & new_relative_path, bool remove_new_dir_if_exists = false) const;
void renameToDetached(const String & prefix) const;
void makeCloneInDetached(const String & prefix) const;
virtual void renameTo(const String & new_relative_path, bool remove_new_dir_if_exists = false) const;
virtual void makeCloneInDetached(const String & prefix) const;
/// Makes full clone of part in detached/ on another disk
void makeCloneOnDiskDetached(const ReservationPtr & reservation) const;
@ -324,6 +325,8 @@ protected:
/// disk using columns and checksums.
virtual void calculateEachColumnSizesOnDisk(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const = 0;
String getRelativePathForDetachedPart(const String & prefix) const;
private:
/// In compact parts order of columns is necessary
NameToPosition column_name_to_position;
@ -348,8 +351,6 @@ private:
void loadTTLInfos();
void loadPartitionAndMinMaxIndex();
String getRelativePathForDetachedPart(const String & prefix) const;
};
using MergeTreeDataPartState = IMergeTreeDataPart::State;

View File

@ -896,11 +896,17 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
part_names_with_disks.emplace_back(it->name(), disk_ptr);
if (startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME))
/// Create and correctly initialize global WAL object, if it's needed
if (it->name() == MergeTreeWriteAheadLog::DEFAULT_WAL_FILE && settings->in_memory_parts_enable_wal)
{
write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, disk_ptr, it->name());
for (auto && part : write_ahead_log->restore())
parts_from_wal.push_back(std::move(part));
}
else if (startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME))
{
MergeTreeWriteAheadLog wal(*this, disk_ptr, it->name());
auto current_parts = wal.restore();
for (auto & part : current_parts)
for (auto && part : wal.restore())
parts_from_wal.push_back(std::move(part));
}
}
@ -1120,7 +1126,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
}
}
if (settings->in_memory_parts_enable_wal)
if (settings->in_memory_parts_enable_wal && !write_ahead_log)
{
auto disk = makeEmptyReservationOnLargestDisk()->getDisk();
write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, std::move(disk));
@ -1976,7 +1982,7 @@ void MergeTreeData::renameTempPartAndReplace(
if (part_in_memory && getSettings()->in_memory_parts_enable_wal)
{
auto wal = getWriteAheadLog();
wal->write(part_in_memory->block, part_in_memory->name);
wal->addPart(part_in_memory->block, part_in_memory->name);
}
if (out_covered_parts)

View File

@ -1,9 +1,11 @@
#include "MergeTreeDataPartInMemory.h"
#include <Storages/MergeTree/MergeTreeReaderInMemory.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergeTreeDataPartWriterInMemory.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Poco/File.h>
#include <Poco/Logger.h>
#include <common/logger_useful.h>
namespace DB
{
@ -58,6 +60,36 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartInMemory::getWriter(
return std::make_unique<MergeTreeDataPartWriterInMemory>(ptr, columns_list, writer_settings);
}
void MergeTreeDataPartInMemory::makeCloneInDetached(const String & prefix) const
{
String detached_path = getRelativePathForDetachedPart(prefix);
String destination_path = storage.getRelativeDataPath() + getRelativePathForDetachedPart(prefix);
auto new_type = storage.choosePartTypeOnDisk(block.bytes(), rows_count);
auto new_data_part = storage.createPart(name, new_type, info, disk, detached_path);
new_data_part->setColumns(columns);
new_data_part->partition.value.assign(partition.value);
new_data_part->minmax_idx = minmax_idx;
if (disk->exists(destination_path))
{
LOG_WARNING(&Logger::get(storage.getLogName()), "Removing old temporary directory " + disk->getPath() + destination_path);
disk->removeRecursive(destination_path);
}
disk->createDirectories(destination_path);
auto compression_codec = storage.global_context.chooseCompressionCodec(0, 0);
MergedBlockOutputStream out(new_data_part, columns, storage.skip_indices, compression_codec);
out.writePrefix();
out.write(block);
out.writeSuffixAndFinalizePart(new_data_part);
if (storage.getSettings()->in_memory_parts_enable_wal)
storage.getWriteAheadLog()->dropPart(name);
}
bool MergeTreeDataPartInMemory::waitUntilMerged(size_t timeout) const
{
auto lock = storage.lockParts();

View File

@ -38,12 +38,10 @@ public:
const MergeTreeIndexGranularity & computed_index_granularity) const override;
bool isStoredOnDisk() const override { return false; }
bool hasColumnFiles(const String & column_name, const IDataType & /* type */) const override { return !!getColumnPosition(column_name); }
String getFileNameForColumn(const NameAndTypePair & /* column */) const override { return ""; }
void renameTo(const String & /*new_relative_path*/, bool /*remove_new_dir_if_exists*/) const override {}
void makeCloneInDetached(const String & prefix) const override;
bool waitUntilMerged(size_t timeout) const override;
void notifyMerged() const override;

View File

@ -13,6 +13,7 @@ namespace ErrorCodes
extern const int UNKNOWN_FORMAT_VERSION;
extern const int CANNOT_READ_ALL_DATA;
extern const int BAD_DATA_PART_NAME;
extern const int CORRUPTED_DATA;
}
@ -36,7 +37,7 @@ void MergeTreeWriteAheadLog::init()
max_block_number = -1;
}
void MergeTreeWriteAheadLog::write(const Block & block, const String & part_name)
void MergeTreeWriteAheadLog::addPart(const Block & block, const String & part_name)
{
std::lock_guard lock(write_mutex);
@ -45,6 +46,7 @@ void MergeTreeWriteAheadLog::write(const Block & block, const String & part_name
max_block_number = std::max(max_block_number, part_info.max_block);
writeIntBinary(static_cast<UInt8>(0), *out); /// version
writeIntBinary(static_cast<UInt8>(ActionType::ADD_PART), *out);
writeStringBinary(part_name, *out);
block_out->write(block);
block_out->flush();
@ -54,6 +56,15 @@ void MergeTreeWriteAheadLog::write(const Block & block, const String & part_name
rotate();
}
void MergeTreeWriteAheadLog::dropPart(const String & part_name)
{
std::lock_guard lock(write_mutex);
writeIntBinary(static_cast<UInt8>(0), *out);
writeIntBinary(static_cast<UInt8>(ActionType::DROP_PART), *out);
writeStringBinary(part_name, *out);
}
void MergeTreeWriteAheadLog::rotate()
{
String new_name = String(WAL_FILE_NAME) + "_"
@ -68,9 +79,10 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore()
{
std::lock_guard lock(write_mutex);
MergeTreeData::MutableDataPartsVector result;
MergeTreeData::MutableDataPartsVector parts;
auto in = disk->readFile(path, DBMS_DEFAULT_BUFFER_SIZE);
NativeBlockInputStream block_in(*in, 0);
NameSet dropped_parts;
while (!in->eof())
{
@ -78,6 +90,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore()
UInt8 version;
String part_name;
Block block;
ActionType action_type;
try
{
@ -85,22 +98,35 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore()
if (version != 0)
throw Exception("Unknown WAL format version: " + toString(version), ErrorCodes::UNKNOWN_FORMAT_VERSION);
readIntBinary(action_type, *in);
readStringBinary(part_name, *in);
part = storage.createPart(
part_name,
MergeTreeDataPartType::IN_MEMORY,
MergeTreePartInfo::fromPartName(part_name, storage.format_version),
storage.reserveSpace(0)->getDisk(),
part_name);
if (action_type == ActionType::DROP_PART)
{
dropped_parts.insert(part_name);
}
else if (action_type == ActionType::ADD_PART)
{
part = storage.createPart(
part_name,
MergeTreeDataPartType::IN_MEMORY,
MergeTreePartInfo::fromPartName(part_name, storage.format_version),
storage.reserveSpace(0)->getDisk(),
part_name);
block = block_in.read();
block = block_in.read();
}
else
{
throw Exception("Unknown action type: " + toString(static_cast<UInt8>(action_type)), ErrorCodes::CORRUPTED_DATA);
}
}
catch (const Exception & e)
{
if (e.code() == ErrorCodes::CANNOT_READ_ALL_DATA
|| e.code() == ErrorCodes::UNKNOWN_FORMAT_VERSION
|| e.code() == ErrorCodes::BAD_DATA_PART_NAME)
|| e.code() == ErrorCodes::BAD_DATA_PART_NAME
|| e.code() == ErrorCodes::CORRUPTED_DATA)
{
LOG_WARNING(&Logger::get(storage.getLogName() + " (WriteAheadLog)"),
"WAL file '" << path << "' is broken. " << e.displayText());
@ -117,23 +143,30 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore()
throw;
}
MergedBlockOutputStream part_out(part, block.getNamesAndTypesList(), {}, nullptr);
if (action_type == ActionType::ADD_PART)
{
MergedBlockOutputStream part_out(part, block.getNamesAndTypesList(), {}, nullptr);
part->minmax_idx.update(block, storage.minmax_idx_columns);
if (storage.partition_key_expr)
part->partition.create(storage, block, 0);
if (storage.hasSortingKey())
storage.sorting_key_expr->execute(block);
part->minmax_idx.update(block, storage.minmax_idx_columns);
if (storage.partition_key_expr)
part->partition.create(storage, block, 0);
if (storage.hasSortingKey())
storage.sorting_key_expr->execute(block);
part_out.writePrefix();
part_out.write(block);
part_out.writeSuffixAndFinalizePart(part);
part_out.writePrefix();
part_out.write(block);
part_out.writeSuffixAndFinalizePart(part);
min_block_number = std::min(min_block_number, part->info.min_block);
max_block_number = std::max(max_block_number, part->info.max_block);
result.push_back(std::move(part));
min_block_number = std::min(min_block_number, part->info.min_block);
max_block_number = std::max(max_block_number, part->info.max_block);
parts.push_back(std::move(part));
}
}
MergeTreeData::MutableDataPartsVector result;
std::copy_if(parts.begin(), parts.end(), std::back_inserter(result),
[&dropped_parts](const auto & part) { return dropped_parts.count(part->name) == 0; });
return result;
}

View File

@ -13,6 +13,13 @@ class MergeTreeData;
class MergeTreeWriteAheadLog
{
public:
/// Append-only enum. It is serialized to WAL
enum class ActionType : UInt8
{
ADD_PART = 0,
DROP_PART = 1,
};
constexpr static auto WAL_FILE_NAME = "wal";
constexpr static auto WAL_FILE_EXTENSION = ".bin";
constexpr static auto DEFAULT_WAL_FILE = "wal.bin";
@ -20,7 +27,8 @@ public:
MergeTreeWriteAheadLog(const MergeTreeData & storage_, const DiskPtr & disk_,
const String & name = DEFAULT_WAL_FILE);
void write(const Block & block, const String & part_name);
void addPart(const Block & block, const String & part_name);
void dropPart(const String & part_name);
std::vector<MergeTreeMutableDataPartPtr> restore();
using MinMaxBlockNumber = std::pair<Int64, Int64>;

View File

@ -1,7 +1,7 @@
DROP TABLE IF EXISTS in_memory;
CREATE TABLE in_memory (a UInt32, b UInt32)
ENGINE = MergeTree ORDER BY a
SETTINGS min_rows_for_compact_part = 1000;
SETTINGS min_rows_for_compact_part = 1000, min_rows_for_compact_part = 1000;
INSERT INTO in_memory SELECT number, number % 3 FROM numbers(100);
SELECT DISTINCT part_type, marks FROM system.parts WHERE database = currentDatabase() AND table = 'in_memory' AND active;

View File

@ -0,0 +1,22 @@
1 2 foo
1 3 bar
2 4 aa
2 5 bb
3 6 qq
3 7 ww
==================
2 4 aa
2 5 bb
3 6 qq
3 7 ww
==================
3 6 qq
3 7 ww
==================
2 4 aa
2 5 bb
3 6 qq
3 7 ww
2_4_4_0 Compact
3_3_3_0 InMemory
==================

View File

@ -0,0 +1,25 @@
DROP TABLE IF EXISTS t2;
CREATE TABLE t2(id UInt32, a UInt64, s String)
ENGINE = MergeTree ORDER BY a PARTITION BY id
SETTINGS min_rows_for_compact_part = 1000, min_rows_for_wide_part = 2000;
INSERT INTO t2 VALUES (1, 2, 'foo'), (1, 3, 'bar');
INSERT INTO t2 VALUES (2, 4, 'aa'), (2, 5, 'bb');
INSERT INTO t2 VALUES (3, 6, 'qq'), (3, 7, 'ww');
SELECT * FROM t2 ORDER BY a;
SELECT '==================';
ALTER TABLE t2 DROP PARTITION 1;
SELECT * FROM t2 ORDER BY a;
SELECT '==================';
ALTER TABLE t2 DETACH PARTITION 2;
SELECT * FROM t2 ORDER BY a;
SELECT '==================';
ALTER TABLE t2 ATTACH PARTITION 2;
SELECT * FROM t2 ORDER BY a;
SELECT name, part_type FROM system.parts WHERE table = 't2' AND active AND database = currentDatabase() ORDER BY name;
SELECT '==================';