remove questionable functionality

This commit is contained in:
Anton Popov 2020-06-29 18:46:50 +03:00
parent 9710a67704
commit a43cb93be5
9 changed files with 3 additions and 82 deletions

View File

@ -23,14 +23,12 @@ void MergeTreeBlockOutputStream::write(const Block & block)
storage.delayInsertOrThrowIfNeeded();
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot);
MergeTreeData::DataPartsVector inserted_parts;
for (auto & current_block : part_blocks)
{
Stopwatch watch;
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot);
storage.renameTempPartAndAdd(part, &storage.increment);
inserted_parts.push_back(part);
PartLog::addNewPart(storage.global_context, part, watch.elapsed());
@ -50,17 +48,6 @@ void MergeTreeBlockOutputStream::write(const Block & block)
storage.merging_mutating_task_handle->signalReadyToRun();
}
}
if (storage.getSettings()->in_memory_parts_insert_sync)
{
for (const auto & part : inserted_parts)
{
auto part_in_memory = asInMemoryPart(part);
if (!part_in_memory->waitUntilMerged(in_memory_parts_timeout))
throw Exception("Timeout exceeded while waiting to write part "
+ part->name + " on disk", ErrorCodes::TIMEOUT_EXCEEDED);
}
}
}
}

View File

@ -14,11 +14,10 @@ class StorageMergeTree;
class MergeTreeBlockOutputStream : public IBlockOutputStream
{
public:
MergeTreeBlockOutputStream(StorageMergeTree & storage_, const StorageMetadataPtr metadata_snapshot_, size_t max_parts_per_block_, size_t in_memory_parts_timeout_)
MergeTreeBlockOutputStream(StorageMergeTree & storage_, const StorageMetadataPtr metadata_snapshot_, size_t max_parts_per_block_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, max_parts_per_block(max_parts_per_block_)
, in_memory_parts_timeout(in_memory_parts_timeout_)
{
}
@ -29,7 +28,6 @@ private:
StorageMergeTree & storage;
StorageMetadataPtr metadata_snapshot;
size_t max_parts_per_block;
size_t in_memory_parts_timeout;
};
}

View File

@ -98,21 +98,6 @@ void MergeTreeDataPartInMemory::makeCloneInDetached(const String & prefix, const
flushToDisk(storage.getRelativeDataPath(), detached_path, metadata_snapshot);
}
bool MergeTreeDataPartInMemory::waitUntilMerged(size_t timeout_ms) const
{
auto lock = storage.lockParts();
return is_merged.wait_for(lock, std::chrono::milliseconds(timeout_ms),
[this]() { return state != State::Committed; });
}
void MergeTreeDataPartInMemory::notifyMerged() const
{
LOG_DEBUG(&Poco::Logger::get("InMemPart"), "notifiedMerged");
LOG_DEBUG(&Poco::Logger::get("InMemPart"), "state {}", stateString());
is_merged.notify_all();
}
void MergeTreeDataPartInMemory::renameTo(const String & new_relative_path, bool /* remove_new_dir_if_exists */) const
{
relative_path = new_relative_path;

View File

@ -46,9 +46,6 @@ public:
void flushToDisk(const String & base_path, const String & new_relative_path, const StorageMetadataPtr & metadata_snapshot) const;
bool waitUntilMerged(size_t timeout_ms) const;
void notifyMerged() const;
/// Returns hash of parts's block
Checksum calculateBlockChecksum() const;

View File

@ -34,7 +34,6 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
M(SettingUInt64, min_bytes_for_compact_part, 0, "Minimal uncompressed size in bytes to create part in compact format instead of saving it in RAM", 0) \
M(SettingUInt64, min_rows_for_compact_part, 0, "Minimal number of rows to create part in compact format instead of saving it in RAM", 0) \
M(SettingBool, in_memory_parts_enable_wal, true, "Whether to write blocks in Native format to write-ahead-log before creation in-memory part", 0) \
M(SettingBool, in_memory_parts_insert_sync, false, "If true and in-memory parts are enabled, insert will wait while part will persist on disk in result of merge", 0) \
M(SettingUInt64, write_ahead_log_max_bytes, 1024 * 1024 * 1024, "Rotate WAL, if it exceeds that amount of bytes", 0) \
\
/** Merge settings. */ \

View File

@ -36,14 +36,12 @@ ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
size_t quorum_,
size_t quorum_timeout_ms_,
size_t max_parts_per_block_,
size_t insert_in_memory_parts_timeout_ms_,
bool deduplicate_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, quorum(quorum_)
, quorum_timeout_ms(quorum_timeout_ms_)
, max_parts_per_block(max_parts_per_block_)
, insert_in_memory_parts_timeout_ms(insert_in_memory_parts_timeout_ms_)
, deduplicate(deduplicate_)
, log(&Poco::Logger::get(storage.getLogName() + " (Replicated OutputStream)"))
{
@ -378,14 +376,6 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
+ Coordination::errorMessage(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
}
auto part_in_memory = asInMemoryPart(part);
if (part_in_memory && storage.getSettings()->in_memory_parts_insert_sync)
{
if (!part_in_memory->waitUntilMerged(insert_in_memory_parts_timeout_ms))
throw Exception("Timeout exceeded while waiting to write part "
+ part->name + " on disk", ErrorCodes::TIMEOUT_EXCEEDED);
}
if (quorum)
{
/// We are waiting for quorum to be satisfied.

View File

@ -28,7 +28,6 @@ public:
size_t quorum_,
size_t quorum_timeout_ms_,
size_t max_parts_per_block_,
size_t insert_in_memory_parts_timeout_ms_,
bool deduplicate_);
Block getHeader() const override;
@ -64,7 +63,6 @@ private:
size_t quorum;
size_t quorum_timeout_ms;
size_t max_parts_per_block;
size_t insert_in_memory_parts_timeout_ms;
bool deduplicate = true;
bool last_block_is_duplicate = false;

View File

@ -205,8 +205,7 @@ BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Sto
const auto & settings = context.getSettingsRef();
return std::make_shared<MergeTreeBlockOutputStream>(
*this, metadata_snapshot, settings.max_partitions_per_insert_block,
settings.insert_in_memory_parts_timeout.totalMilliseconds());
*this, metadata_snapshot, settings.max_partitions_per_insert_block);
}
void StorageMergeTree::checkTableCanBeDropped() const
@ -696,23 +695,6 @@ bool StorageMergeTree::merge(
merging_tagger->reserved_space, deduplicate, force_ttl);
merger_mutator.renameMergedTemporaryPart(new_part, future_part.parts, nullptr);
DataPartsVector parts_to_remove_immediately;
{
auto lock = lockParts();
for (const auto & part : future_part.parts)
{
if (auto part_in_memory = asInMemoryPart(part))
{
part_in_memory->notifyMerged();
modifyPartState(part_in_memory, DataPartState::Deleting);
parts_to_remove_immediately.push_back(part_in_memory);
}
}
}
removePartsFinally(parts_to_remove_immediately);
merging_tagger->is_successful = true;
write_part_log({});
}

View File

@ -1371,20 +1371,6 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
throw;
}
DataPartsVector parts_to_remove_immediatly;
for (const auto & part_ptr : parts)
{
if (auto part_in_memory = asInMemoryPart(part_ptr))
{
modifyPartState(part_in_memory, DataPartState::Deleting);
part_in_memory->notifyMerged();
parts_to_remove_immediatly.push_back(part_in_memory);
}
}
tryRemovePartsFromZooKeeperWithRetries(parts_to_remove_immediatly);
removePartsFinally(parts_to_remove_immediatly);
/** Removing old parts from ZK and from the disk is delayed - see ReplicatedMergeTreeCleanupThread, clearOldParts.
*/
@ -3490,7 +3476,6 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/,
*this, metadata_snapshot, query_settings.insert_quorum,
query_settings.insert_quorum_timeout.totalMilliseconds(),
query_settings.max_partitions_per_insert_block,
query_settings.insert_in_memory_parts_timeout.totalMilliseconds(),
deduplicate);
}
@ -4045,7 +4030,7 @@ void StorageReplicatedMergeTree::attachPartition(const ASTPtr & partition, const
PartsTemporaryRename renamed_parts(*this, "detached/");
MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts);
ReplicatedMergeTreeBlockOutputStream output(*this, metadata_snapshot, 0, 0, 0, 0, false); /// TODO Allow to use quorum here.
ReplicatedMergeTreeBlockOutputStream output(*this, metadata_snapshot, 0, 0, 0, false); /// TODO Allow to use quorum here.
for (size_t i = 0; i < loaded_parts.size(); ++i)
{
String old_name = loaded_parts[i]->name;