From 347d2a328f480850076475eb1a1cab8aa956e933 Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 17 Mar 2020 18:10:56 +0300 Subject: [PATCH] Remove columns lock!!! --- dbms/src/Storages/MergeTree/DataPartsExchange.cpp | 2 -- dbms/src/Storages/MergeTree/IMergeTreeDataPart.cpp | 2 -- dbms/src/Storages/MergeTree/IMergeTreeDataPart.h | 7 +------ dbms/src/Storages/MergeTree/MergeList.cpp | 2 -- dbms/src/Storages/MergeTree/MergeTreeData.cpp | 4 ---- dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp | 4 ++-- dbms/src/Storages/MergeTree/MergeTreeDataPartWide.cpp | 2 -- dbms/src/Storages/MergeTree/MergeTreeReadPool.cpp | 2 -- dbms/src/Storages/MergeTree/MergeTreeReadPool.h | 1 - .../Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp | 2 -- .../Storages/MergeTree/MergeTreeReverseSelectProcessor.h | 2 -- dbms/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp | 2 -- dbms/src/Storages/MergeTree/MergeTreeSelectProcessor.h | 2 -- .../MergeTree/MergeTreeSequentialBlockInputStream.cpp | 2 -- .../MergeTree/MergeTreeSequentialBlockInputStream.h | 3 --- dbms/src/Storages/StorageMergeTree.cpp | 2 -- dbms/src/Storages/StorageReplicatedMergeTree.cpp | 7 +------ dbms/src/Storages/System/StorageSystemParts.cpp | 6 +----- 18 files changed, 5 insertions(+), 49 deletions(-) diff --git a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp index b2d0b4fe8f2..6373c85a15d 100644 --- a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp @@ -89,8 +89,6 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo MergeTreeData::DataPartPtr part = findPart(part_name); - std::shared_lock part_lock(part->columns_lock); - CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedSend}; /// We'll take a list of files from the list of checksums. diff --git a/dbms/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/dbms/src/Storages/MergeTree/IMergeTreeDataPart.cpp index b5c8f16b7e5..8bb92c00a75 100644 --- a/dbms/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/dbms/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -739,8 +739,6 @@ void IMergeTreeDataPart::remove() const # pragma GCC diagnostic push # pragma GCC diagnostic ignored "-Wunused-variable" #endif - std::shared_lock lock(columns_lock); - /// TODO: IDisk doesn't support `unlink()` and `rmdir()` functionality. auto to = fullPath(disk, to_); diff --git a/dbms/src/Storages/MergeTree/IMergeTreeDataPart.h b/dbms/src/Storages/MergeTree/IMergeTreeDataPart.h index 225a862d6da..0a77e32a535 100644 --- a/dbms/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/dbms/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -93,7 +93,7 @@ public: /// NOTE: Returns zeros if column files are not found in checksums. /// NOTE: You must ensure that no ALTERs are in progress when calculating ColumnSizes. - /// (either by locking columns_lock, or by locking table structure). + /// (by locking table structure). virtual ColumnSize getColumnSize(const String & /* name */, const IDataType & /* type */) const { return {}; } virtual ColumnSize getTotalColumnsSize() const { return {}; } @@ -276,11 +276,6 @@ public: /// Columns with values, that all have been zeroed by expired ttl NameSet expired_columns; - /** It is blocked for writing when changing columns, checksums or any part files. - * Locked to read when reading columns, checksums or any part files. - */ - mutable std::shared_mutex columns_lock; - /// For data in RAM ('index') UInt64 getIndexSizeInBytes() const; UInt64 getIndexSizeInAllocatedBytes() const; diff --git a/dbms/src/Storages/MergeTree/MergeList.cpp b/dbms/src/Storages/MergeTree/MergeList.cpp index 5db4377c2ab..2100a7d4e57 100644 --- a/dbms/src/Storages/MergeTree/MergeList.cpp +++ b/dbms/src/Storages/MergeTree/MergeList.cpp @@ -27,8 +27,6 @@ MergeListElement::MergeListElement(const std::string & database_, const std::str source_part_names.emplace_back(source_part->name); source_part_paths.emplace_back(source_part->getFullPath()); - std::shared_lock part_lock(source_part->columns_lock); - total_size_bytes_compressed += source_part->bytes_on_disk; total_size_marks += source_part->getMarksCount(); total_rows_count += source_part->index_granularity.getTotalRows(); diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 99ac64194b7..32392d91a92 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -2421,8 +2421,6 @@ void MergeTreeData::calculateColumnSizesImpl() void MergeTreeData::addPartContributionToColumnSizes(const DataPartPtr & part) { - std::shared_lock lock(part->columns_lock); - for (const auto & column : part->getColumns()) { ColumnSize & total_column_size = column_sizes[column.name]; @@ -2433,8 +2431,6 @@ void MergeTreeData::addPartContributionToColumnSizes(const DataPartPtr & part) void MergeTreeData::removePartContributionToColumnSizes(const DataPartPtr & part) { - std::shared_lock lock(part->columns_lock); - for (const auto & column : part->getColumns()) { ColumnSize & total_column_size = column_sizes[column.name]; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index b0c1ec4cb19..da1182d8d16 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -625,7 +625,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor LOG_DEBUG(log, "Selected MergeAlgorithm: " << ((merge_alg == MergeAlgorithm::Vertical) ? "Vertical" : "Horizontal")); /// Note: this is done before creating input streams, because otherwise data.data_parts_mutex - /// (which is locked in data.getTotalActiveSizeInBytes()) is locked after part->columns_lock + /// (which is locked in data.getTotalActiveSizeInBytes()) /// (which is locked in shared mode when input streams are created) and when inserting new data /// the order is reverse. This annoys TSan even though one lock is locked in shared mode and thus /// deadlock is impossible. @@ -1028,7 +1028,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor String new_part_tmp_path = new_data_part->getFullPath(); /// Note: this is done before creating input streams, because otherwise data.data_parts_mutex - /// (which is locked in data.getTotalActiveSizeInBytes()) is locked after part->columns_lock + /// (which is locked in data.getTotalActiveSizeInBytes()) /// (which is locked in shared mode when input streams are created) and when inserting new data /// the order is reverse. This annoys TSan even though one lock is locked in shared mode and thus /// deadlock is impossible. diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPartWide.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataPartWide.cpp index 769f15c743b..6ee51991fad 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPartWide.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPartWide.cpp @@ -159,8 +159,6 @@ MergeTreeDataPartWide::~MergeTreeDataPartWide() void MergeTreeDataPartWide::accumulateColumnSizes(ColumnToSize & column_to_size) const { - std::shared_lock part_lock(columns_lock); - for (const NameAndTypePair & name_type : storage.getColumns().getAllPhysical()) { IDataType::SubstreamPath path; diff --git a/dbms/src/Storages/MergeTree/MergeTreeReadPool.cpp b/dbms/src/Storages/MergeTree/MergeTreeReadPool.cpp index 1f273ecf71c..8efa7bd31fe 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReadPool.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeReadPool.cpp @@ -199,8 +199,6 @@ std::vector MergeTreeReadPool::fillPerPartInfo( per_part_sum_marks.push_back(sum_marks); - per_part_columns_lock.emplace_back(part.data_part, part.data_part->columns_lock); - auto [required_columns, required_pre_columns, should_reorder] = getReadTaskColumns(data, part.data_part, column_names, prewhere_info, check_columns); diff --git a/dbms/src/Storages/MergeTree/MergeTreeReadPool.h b/dbms/src/Storages/MergeTree/MergeTreeReadPool.h index 68b99a8fcc5..f639a6a4905 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReadPool.h +++ b/dbms/src/Storages/MergeTree/MergeTreeReadPool.h @@ -94,7 +94,6 @@ private: const size_t threads, const size_t sum_marks, std::vector per_part_sum_marks, RangesInDataParts & parts, const size_t min_marks_for_concurrent_read); - std::vector>> per_part_columns_lock; const MergeTreeData & data; Names column_names; bool do_not_steal_tasks; diff --git a/dbms/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp b/dbms/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp index 47b68aa1b7f..265dba0e6fe 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp @@ -54,7 +54,6 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor( reader_settings_, use_uncompressed_cache_, virt_column_names_}, required_columns{std::move(required_columns_)}, data_part{owned_data_part_}, - part_columns_lock(data_part->columns_lock), all_mark_ranges(std::move(mark_ranges_)), part_index_in_query(part_index_in_query_), path(data_part->getFullRelativePath()) @@ -170,7 +169,6 @@ void MergeTreeReverseSelectProcessor::finish() */ reader.reset(); pre_reader.reset(); - part_columns_lock.unlock(); data_part.reset(); } diff --git a/dbms/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.h b/dbms/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.h index a2ea4bb8960..211bf9701cf 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.h +++ b/dbms/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.h @@ -57,8 +57,6 @@ private: /// Data part will not be removed if the pointer owns it MergeTreeData::DataPartPtr data_part; - /// Forbids to change columns list of the part during reading - std::shared_lock part_columns_lock; /// Mark ranges we should read (in ascending order) MarkRanges all_mark_ranges; diff --git a/dbms/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp b/dbms/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp index 98d1bb1eaa2..def01b192d5 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp @@ -34,7 +34,6 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor( reader_settings_, use_uncompressed_cache_, virt_column_names_}, required_columns{std::move(required_columns_)}, data_part{owned_data_part_}, - part_columns_lock(data_part->columns_lock), all_mark_ranges(std::move(mark_ranges_)), part_index_in_query(part_index_in_query_), check_columns(check_columns_), @@ -119,7 +118,6 @@ void MergeTreeSelectProcessor::finish() */ reader.reset(); pre_reader.reset(); - part_columns_lock.unlock(); data_part.reset(); } diff --git a/dbms/src/Storages/MergeTree/MergeTreeSelectProcessor.h b/dbms/src/Storages/MergeTree/MergeTreeSelectProcessor.h index a4e9dfcab1c..4c64bfb6a18 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeSelectProcessor.h +++ b/dbms/src/Storages/MergeTree/MergeTreeSelectProcessor.h @@ -55,8 +55,6 @@ private: /// Data part will not be removed if the pointer owns it MergeTreeData::DataPartPtr data_part; - /// Forbids to change columns list of the part during reading - std::shared_lock part_columns_lock; /// Mark ranges we should read (in ascending order) MarkRanges all_mark_ranges; diff --git a/dbms/src/Storages/MergeTree/MergeTreeSequentialBlockInputStream.cpp b/dbms/src/Storages/MergeTree/MergeTreeSequentialBlockInputStream.cpp index 79432cfbed7..9e0737810df 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeSequentialBlockInputStream.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeSequentialBlockInputStream.cpp @@ -17,7 +17,6 @@ MergeTreeSequentialBlockInputStream::MergeTreeSequentialBlockInputStream( bool quiet) : storage(storage_) , data_part(data_part_) - , part_columns_lock(data_part->columns_lock) , columns_to_read(columns_to_read_) , read_with_direct_io(read_with_direct_io_) , mark_cache(storage.global_context.getMarkCache()) @@ -153,7 +152,6 @@ void MergeTreeSequentialBlockInputStream::finish() * buffers don't waste memory. */ reader.reset(); - part_columns_lock.unlock(); data_part.reset(); } diff --git a/dbms/src/Storages/MergeTree/MergeTreeSequentialBlockInputStream.h b/dbms/src/Storages/MergeTree/MergeTreeSequentialBlockInputStream.h index 35d6b4dfd27..fd57a39fd7a 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeSequentialBlockInputStream.h +++ b/dbms/src/Storages/MergeTree/MergeTreeSequentialBlockInputStream.h @@ -46,9 +46,6 @@ private: /// Data part will not be removed if the pointer owns it MergeTreeData::DataPartPtr data_part; - /// Forbids to change columns list of the part during reading - std::shared_lock part_columns_lock; - /// Columns we have to read (each Block from read will contain them) Names columns_to_read; diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index cf2bcc9edfa..15f0845e343 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -1093,7 +1093,6 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con Int64 temp_index = insert_increment.get(); MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level); - std::shared_lock part_lock(src_part->columns_lock); dst_parts.emplace_back(cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info)); } @@ -1175,7 +1174,6 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const Int64 temp_index = insert_increment.get(); MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level); - std::shared_lock part_lock(src_part->columns_lock); dst_parts.emplace_back(dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info)); } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index b9754ec1a2d..7bea30c1819 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -1593,11 +1593,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry) continue; } - String checksum_hex; - { - std::shared_lock part_lock(src_part->columns_lock); - checksum_hex = src_part->checksums.getTotalChecksumHex(); - } + String checksum_hex = src_part->checksums.getTotalChecksumHex(); if (checksum_hex != part_desc->checksum_hex) { @@ -1707,7 +1703,6 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry) { if (part_desc->src_table_part) { - std::shared_lock part_lock(part_desc->src_table_part->columns_lock); if (part_desc->checksum_hex != part_desc->src_table_part->checksums.getTotalChecksumHex()) throw Exception("Checksums of " + part_desc->src_table_part->name + " is suddenly changed", ErrorCodes::UNFINISHED); diff --git a/dbms/src/Storages/System/StorageSystemParts.cpp b/dbms/src/Storages/System/StorageSystemParts.cpp index 4c2e0477281..17c07ee88f2 100644 --- a/dbms/src/Storages/System/StorageSystemParts.cpp +++ b/dbms/src/Storages/System/StorageSystemParts.cpp @@ -118,11 +118,7 @@ void StorageSystemParts::processNextStorage(MutableColumns & columns_, const Sto columns_[i++]->insert(part->stateString()); MinimalisticDataPartChecksums helper; - { - /// TODO:IMergeTreeDataPart structure is too error-prone. - std::shared_lock lock(part->columns_lock); - helper.computeTotalChecksums(part->checksums); - } + helper.computeTotalChecksums(part->checksums); auto checksum = helper.hash_of_all_files; columns_[i++]->insert(getHexUIntLowercase(checksum.first) + getHexUIntLowercase(checksum.second));