in-memory parts: sync insert to replicated

This commit is contained in:
Anton Popov 2020-06-15 20:41:44 +03:00
parent 66e31d4311
commit a3ac224ae4
10 changed files with 118 additions and 45 deletions

View File

@ -23,12 +23,14 @@ void MergeTreeBlockOutputStream::write(const Block & block)
storage.delayInsertOrThrowIfNeeded(); storage.delayInsertOrThrowIfNeeded();
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block); auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block);
MergeTreeData::DataPartsVector inserted_parts;
for (auto & current_block : part_blocks) for (auto & current_block : part_blocks)
{ {
Stopwatch watch; Stopwatch watch;
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block); MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block);
storage.renameTempPartAndAdd(part, &storage.increment); storage.renameTempPartAndAdd(part, &storage.increment);
inserted_parts.push_back(part);
PartLog::addNewPart(storage.global_context, part, watch.elapsed()); PartLog::addNewPart(storage.global_context, part, watch.elapsed());
@ -36,20 +38,11 @@ void MergeTreeBlockOutputStream::write(const Block & block)
{ {
storage.in_memory_merges_throttler.add(part_in_memory->block.bytes(), part_in_memory->rows_count); storage.in_memory_merges_throttler.add(part_in_memory->block.bytes(), part_in_memory->rows_count);
auto settings = storage.getSettings(); if (storage.merging_mutating_task_handle && !storage.in_memory_merges_throttler.needDelayMerge())
if (settings->in_memory_parts_insert_sync)
{
if (!part_in_memory->waitUntilMerged(in_memory_parts_timeout))
throw Exception("Timeout exceeded while waiting to write part "
+ part->name + " on disk", ErrorCodes::TIMEOUT_EXCEEDED);
}
else if (storage.merging_mutating_task_handle && !storage.in_memory_merges_throttler.needDelayMerge())
{ {
storage.in_memory_merges_throttler.reset(); storage.in_memory_merges_throttler.reset();
storage.merging_mutating_task_handle->wake(); storage.merging_mutating_task_handle->wake();
} }
continue;
} }
else if (storage.merging_mutating_task_handle) else if (storage.merging_mutating_task_handle)
{ {
@ -57,6 +50,17 @@ void MergeTreeBlockOutputStream::write(const Block & block)
storage.merging_mutating_task_handle->wake(); storage.merging_mutating_task_handle->wake();
} }
} }
if (storage.getSettings()->in_memory_parts_insert_sync)
{
for (const auto & part : inserted_parts)
{
auto part_in_memory = asInMemoryPart(part);
if (!part_in_memory->waitUntilMerged(in_memory_parts_timeout))
throw Exception("Timeout exceeded while waiting to write part "
+ part->name + " on disk", ErrorCodes::TIMEOUT_EXCEEDED);
}
}
} }
} }

View File

@ -245,7 +245,8 @@ MergeTreeData::MergeTreeData(
String reason; String reason;
if (!canUsePolymorphicParts(*settings, &reason) && !reason.empty()) if (!canUsePolymorphicParts(*settings, &reason) && !reason.empty())
LOG_WARNING(log, "{} Settings 'min_bytes_for_wide_part' and 'min_bytes_for_wide_part' will be ignored.", reason); LOG_WARNING(log, "{} Settings 'min_rows_for_wide_part', 'min_bytes_for_wide_part', "
"'min_rows_for_compact_part' and 'min_bytes_for_compact_part' will be ignored.", reason);
} }
@ -1592,10 +1593,10 @@ void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, const S
MergeTreeDataPartType MergeTreeData::choosePartType(size_t bytes_uncompressed, size_t rows_count) const MergeTreeDataPartType MergeTreeData::choosePartType(size_t bytes_uncompressed, size_t rows_count) const
{ {
if (!canUseAdaptiveGranularity()) const auto settings = getSettings();
if (!canUsePolymorphicParts(*settings))
return MergeTreeDataPartType::WIDE; return MergeTreeDataPartType::WIDE;
const auto settings = getSettings();
if (bytes_uncompressed < settings->min_bytes_for_compact_part || rows_count < settings->min_rows_for_compact_part) if (bytes_uncompressed < settings->min_bytes_for_compact_part || rows_count < settings->min_rows_for_compact_part)
return MergeTreeDataPartType::IN_MEMORY; return MergeTreeDataPartType::IN_MEMORY;
@ -1607,10 +1608,10 @@ MergeTreeDataPartType MergeTreeData::choosePartType(size_t bytes_uncompressed, s
MergeTreeDataPartType MergeTreeData::choosePartTypeOnDisk(size_t bytes_uncompressed, size_t rows_count) const MergeTreeDataPartType MergeTreeData::choosePartTypeOnDisk(size_t bytes_uncompressed, size_t rows_count) const
{ {
if (!canUseAdaptiveGranularity()) const auto settings = getSettings();
if (!canUsePolymorphicParts(*settings))
return MergeTreeDataPartType::WIDE; return MergeTreeDataPartType::WIDE;
const auto settings = getSettings();
if (bytes_uncompressed < settings->min_bytes_for_wide_part || rows_count < settings->min_rows_for_wide_part) if (bytes_uncompressed < settings->min_bytes_for_wide_part || rows_count < settings->min_rows_for_wide_part)
return MergeTreeDataPartType::COMPACT; return MergeTreeDataPartType::COMPACT;
@ -3605,11 +3606,15 @@ bool MergeTreeData::canUsePolymorphicParts(const MergeTreeSettings & settings, S
{ {
if (!canUseAdaptiveGranularity()) if (!canUseAdaptiveGranularity())
{ {
if ((settings.min_rows_for_wide_part != 0 || settings.min_bytes_for_wide_part != 0) && out_reason) if (out_reason && (settings.min_rows_for_wide_part != 0 || settings.min_bytes_for_wide_part != 0
|| settings.min_rows_for_compact_part != 0 || settings.min_bytes_for_compact_part != 0))
{ {
std::ostringstream message; std::ostringstream message;
message << "Table can't create parts with adaptive granularity, but settings min_rows_for_wide_part = " message << "Table can't create parts with adaptive granularity, but settings"
<< settings.min_rows_for_wide_part << ", min_bytes_for_wide_part = " << settings.min_bytes_for_wide_part << "min_rows_for_wide_part = " << settings.min_rows_for_wide_part
<< ", min_bytes_for_wide_part = " << settings.min_bytes_for_wide_part
<< ", min_rows_for_compact_part = " << settings.min_rows_for_compact_part
<< ", min_bytes_for_compact_part = " << settings.min_bytes_for_compact_part
<< ". Parts with non-adaptive granularity can be stored only in Wide (default) format."; << ". Parts with non-adaptive granularity can be stored only in Wide (default) format.";
*out_reason = message.str(); *out_reason = message.str();
} }

View File

@ -901,7 +901,7 @@ private:
/// Check selected parts for movements. Used by ALTER ... MOVE queries. /// Check selected parts for movements. Used by ALTER ... MOVE queries.
CurrentlyMovingPartsTagger checkPartsForMove(const DataPartsVector & parts, SpacePtr space); CurrentlyMovingPartsTagger checkPartsForMove(const DataPartsVector & parts, SpacePtr space);
bool canUsePolymorphicParts(const MergeTreeSettings & settings, String * out_reason) const; bool canUsePolymorphicParts(const MergeTreeSettings & settings, String * out_reason = nullptr) const;
WriteAheadLogPtr write_ahead_log; WriteAheadLogPtr write_ahead_log;
}; };

View File

@ -95,16 +95,19 @@ void MergeTreeDataPartInMemory::makeCloneInDetached(const String & prefix) const
flushToDisk(storage.getRelativeDataPath(), detached_path); flushToDisk(storage.getRelativeDataPath(), detached_path);
} }
bool MergeTreeDataPartInMemory::waitUntilMerged(size_t timeout) const bool MergeTreeDataPartInMemory::waitUntilMerged(size_t timeout_ms) const
{ {
auto lock = storage.lockParts(); auto lock = storage.lockParts();
return is_merged.wait_for(lock, std::chrono::milliseconds(timeout), return is_merged.wait_for(lock, std::chrono::milliseconds(timeout_ms),
[this]() { return state == State::Outdated; }); [this]() { return state != State::Committed; });
} }
void MergeTreeDataPartInMemory::notifyMerged() const void MergeTreeDataPartInMemory::notifyMerged() const
{ {
is_merged.notify_one(); LOG_DEBUG(&Poco::Logger::get("InMemPart"), "notifiedMerged");
LOG_DEBUG(&Poco::Logger::get("InMemPart"), "state {}", stateString());
is_merged.notify_all();
} }
void MergeTreeDataPartInMemory::renameTo(const String & new_relative_path, bool /* remove_new_dir_if_exists */) const void MergeTreeDataPartInMemory::renameTo(const String & new_relative_path, bool /* remove_new_dir_if_exists */) const

View File

@ -45,7 +45,7 @@ public:
void flushToDisk(const String & base_path, const String & new_relative_path) const; void flushToDisk(const String & base_path, const String & new_relative_path) const;
bool waitUntilMerged(size_t timeout) const; bool waitUntilMerged(size_t timeout_ms) const;
void notifyMerged() const; void notifyMerged() const;
mutable Block block; mutable Block block;

View File

@ -121,7 +121,8 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
static bool isPartFormatSetting(const String & name) static bool isPartFormatSetting(const String & name)
{ {
return name == "min_bytes_for_wide_part" || name == "min_rows_for_wide_part"; return name == "min_bytes_for_wide_part" || name == "min_rows_for_wide_part"
|| name == "min_bytes_for_compact_part" || name == "min_rows_for_compact_part";
} }
}; };

View File

@ -31,9 +31,14 @@ namespace ErrorCodes
ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream( ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
StorageReplicatedMergeTree & storage_, size_t quorum_, size_t quorum_timeout_ms_, size_t max_parts_per_block_, bool deduplicate_) StorageReplicatedMergeTree & storage_, size_t quorum_, size_t quorum_timeout_ms_,
: storage(storage_), quorum(quorum_), quorum_timeout_ms(quorum_timeout_ms_), max_parts_per_block(max_parts_per_block_), deduplicate(deduplicate_), size_t max_parts_per_block_, size_t insert_in_memory_parts_timeout_ms_, bool deduplicate_)
log(&Poco::Logger::get(storage.getLogName() + " (Replicated OutputStream)")) : storage(storage_), quorum(quorum_)
, quorum_timeout_ms(quorum_timeout_ms_)
, max_parts_per_block(max_parts_per_block_)
, insert_in_memory_parts_timeout_ms(insert_in_memory_parts_timeout_ms_)
, deduplicate(deduplicate_)
, log(&Poco::Logger::get(storage.getLogName() + " (Replicated OutputStream)"))
{ {
/// The quorum value `1` has the same meaning as if it is disabled. /// The quorum value `1` has the same meaning as if it is disabled.
if (quorum == 1) if (quorum == 1)
@ -365,6 +370,14 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
+ zkutil::ZooKeeper::error2string(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR); + zkutil::ZooKeeper::error2string(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
} }
auto part_in_memory = asInMemoryPart(part);
if (part_in_memory && storage.getSettings()->in_memory_parts_insert_sync)
{
if (!part_in_memory->waitUntilMerged(insert_in_memory_parts_timeout_ms))
throw Exception("Timeout exceeded while waiting to write part "
+ part->name + " on disk", ErrorCodes::TIMEOUT_EXCEEDED);
}
if (quorum) if (quorum)
{ {
/// We are waiting for quorum to be satisfied. /// We are waiting for quorum to be satisfied.

View File

@ -24,7 +24,7 @@ class ReplicatedMergeTreeBlockOutputStream : public IBlockOutputStream
public: public:
ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_, ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_,
size_t quorum_, size_t quorum_timeout_ms_, size_t max_parts_per_block_, size_t quorum_, size_t quorum_timeout_ms_, size_t max_parts_per_block_,
bool deduplicate_); size_t insert_in_memory_parts_timeout_ms_, bool deduplicate_);
Block getHeader() const override; Block getHeader() const override;
void writePrefix() override; void writePrefix() override;
@ -58,6 +58,7 @@ private:
size_t quorum; size_t quorum;
size_t quorum_timeout_ms; size_t quorum_timeout_ms;
size_t max_parts_per_block; size_t max_parts_per_block;
size_t insert_in_memory_parts_timeout_ms;
bool deduplicate = true; bool deduplicate = true;
bool last_block_is_duplicate = false; bool last_block_is_duplicate = false;

View File

@ -1085,20 +1085,6 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
try try
{ {
checkPartChecksumsAndCommit(transaction, part); checkPartChecksumsAndCommit(transaction, part);
DataPartsVector parts_to_remove_immediatly;
for (const auto & part_ptr : parts)
{
if (auto part_in_memory = asInMemoryPart(part_ptr))
{
part_in_memory->notifyMerged();
modifyPartState(part_in_memory, DataPartState::Deleting);
parts_to_remove_immediatly.push_back(part_in_memory);
}
}
tryRemovePartsFromZooKeeperWithRetries(parts_to_remove_immediatly);
removePartsFinally(parts_to_remove_immediatly);
} }
catch (const Exception & e) catch (const Exception & e)
{ {
@ -1122,6 +1108,20 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
throw; throw;
} }
DataPartsVector parts_to_remove_immediatly;
for (const auto & part_ptr : parts)
{
if (auto part_in_memory = asInMemoryPart(part_ptr))
{
modifyPartState(part_in_memory, DataPartState::Deleting);
part_in_memory->notifyMerged();
parts_to_remove_immediatly.push_back(part_in_memory);
}
}
tryRemovePartsFromZooKeeperWithRetries(parts_to_remove_immediatly);
removePartsFinally(parts_to_remove_immediatly);
/** Removing old parts from ZK and from the disk is delayed - see ReplicatedMergeTreeCleanupThread, clearOldParts. /** Removing old parts from ZK and from the disk is delayed - see ReplicatedMergeTreeCleanupThread, clearOldParts.
*/ */
@ -3138,7 +3138,11 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/,
bool deduplicate = storage_settings_ptr->replicated_deduplication_window != 0 && query_settings.insert_deduplicate; bool deduplicate = storage_settings_ptr->replicated_deduplication_window != 0 && query_settings.insert_deduplicate;
return std::make_shared<ReplicatedMergeTreeBlockOutputStream>(*this, return std::make_shared<ReplicatedMergeTreeBlockOutputStream>(*this,
query_settings.insert_quorum, query_settings.insert_quorum_timeout.totalMilliseconds(), query_settings.max_partitions_per_insert_block, deduplicate); query_settings.insert_quorum,
query_settings.insert_quorum_timeout.totalMilliseconds(),
query_settings.max_partitions_per_insert_block,
query_settings.insert_in_memory_parts_timeout.totalMilliseconds(),
deduplicate);
} }
@ -3662,7 +3666,7 @@ void StorageReplicatedMergeTree::attachPartition(const ASTPtr & partition, bool
PartsTemporaryRename renamed_parts(*this, "detached/"); PartsTemporaryRename renamed_parts(*this, "detached/");
MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts); MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts);
ReplicatedMergeTreeBlockOutputStream output(*this, 0, 0, 0, false); /// TODO Allow to use quorum here. ReplicatedMergeTreeBlockOutputStream output(*this, 0, 0, 0, 0, false); /// TODO Allow to use quorum here.
for (size_t i = 0; i < loaded_parts.size(); ++i) for (size_t i = 0; i < loaded_parts.size(); ++i)
{ {
String old_name = loaded_parts[i]->name; String old_name = loaded_parts[i]->name;

View File

@ -9,6 +9,7 @@ from helpers.test_tools import TSV
from helpers.test_tools import assert_eq_with_retry from helpers.test_tools import assert_eq_with_retry
from helpers.cluster import ClickHouseCluster from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager from helpers.network import PartitionManager
from multiprocessing.dummy import Pool
cluster = ClickHouseCluster(__file__) cluster = ClickHouseCluster(__file__)
@ -90,6 +91,8 @@ def start_cluster():
create_tables('in_memory_table', [node9, node10], [settings_in_memory, settings_in_memory], "shard4") create_tables('in_memory_table', [node9, node10], [settings_in_memory, settings_in_memory], "shard4")
create_tables('wal_table', [node11, node12], [settings_in_memory, settings_in_memory], "shard4") create_tables('wal_table', [node11, node12], [settings_in_memory, settings_in_memory], "shard4")
create_tables('restore_table', [node11, node12], [settings_in_memory, settings_in_memory], "shard5") create_tables('restore_table', [node11, node12], [settings_in_memory, settings_in_memory], "shard5")
create_tables('deduplication_table', [node9, node10], [settings_in_memory, settings_in_memory], "shard5")
create_tables('sync_table', [node9, node10], [settings_in_memory, settings_in_memory], "shard5")
yield cluster yield cluster
@ -422,6 +425,45 @@ def test_in_memory_wal_rotate(start_cluster):
assert os.path.exists(wal_file) assert os.path.exists(wal_file)
assert os.path.getsize(wal_file) == 0 assert os.path.getsize(wal_file) == 0
def test_in_memory_deduplication(start_cluster):
for i in range(3):
node9.query("INSERT INTO deduplication_table (date, id, s) VALUES (toDate('2020-03-03'), 1, 'foo')")
node10.query("INSERT INTO deduplication_table (date, id, s) VALUES (toDate('2020-03-03'), 1, 'foo')")
node9.query("SYSTEM SYNC REPLICA deduplication_table", timeout=20)
node10.query("SYSTEM SYNC REPLICA deduplication_table", timeout=20)
assert node9.query("SELECT date, id, s FROM deduplication_table") == "2020-03-03\t1\tfoo\n"
assert node10.query("SELECT date, id, s FROM deduplication_table") == "2020-03-03\t1\tfoo\n"
def test_in_memory_sync_insert(start_cluster):
node9.query("ALTER TABLE sync_table MODIFY SETTING in_memory_parts_insert_sync = 1")
node10.query("ALTER TABLE sync_table MODIFY SETTING in_memory_parts_insert_sync = 1")
node9.query("SYSTEM STOP MERGES sync_table")
node10.query("SYSTEM STOP MERGES sync_table")
pool = Pool(5)
tasks = []
for i in range(5):
tasks.append(pool.apply_async(insert_random_data, ('sync_table', node9, 50)))
time.sleep(5)
assert node9.query("SELECT count() FROM sync_table") == "250\n"
assert node9.query("SELECT part_type, count() FROM system.parts WHERE table = 'sync_table' AND active GROUP BY part_type") == "InMemory\t5\n"
for task in tasks:
assert not task.ready()
node9.query("SYSTEM START MERGES sync_table")
node10.query("SYSTEM START MERGES sync_table")
assert_eq_with_retry(node9, "OPTIMIZE TABLE sync_table FINAL SETTINGS optimize_throw_if_noop = 1", "")
for task in tasks:
task.get()
assert node9.query("SELECT count() FROM sync_table") == "250\n"
assert node9.query("SELECT part_type, count() FROM system.parts WHERE table = 'sync_table' AND active GROUP BY part_type") == "Compact\t1\n"
def test_polymorphic_parts_index(start_cluster): def test_polymorphic_parts_index(start_cluster):
node1.query(''' node1.query('''
CREATE TABLE index_compact(a UInt32, s String) CREATE TABLE index_compact(a UInt32, s String)