Remove columns lock!!!

This commit is contained in:
alesapin 2020-03-17 18:10:56 +03:00
parent d5636fb76b
commit 347d2a328f
18 changed files with 5 additions and 49 deletions

View File

@ -89,8 +89,6 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
MergeTreeData::DataPartPtr part = findPart(part_name);
std::shared_lock<std::shared_mutex> part_lock(part->columns_lock);
CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedSend};
/// We'll take a list of files from the list of checksums.

View File

@ -739,8 +739,6 @@ void IMergeTreeDataPart::remove() const
# pragma GCC diagnostic push
# pragma GCC diagnostic ignored "-Wunused-variable"
#endif
std::shared_lock<std::shared_mutex> lock(columns_lock);
/// TODO: IDisk doesn't support `unlink()` and `rmdir()` functionality.
auto to = fullPath(disk, to_);

View File

@ -93,7 +93,7 @@ public:
/// NOTE: Returns zeros if column files are not found in checksums.
/// NOTE: You must ensure that no ALTERs are in progress when calculating ColumnSizes.
/// (either by locking columns_lock, or by locking table structure).
/// (by locking table structure).
virtual ColumnSize getColumnSize(const String & /* name */, const IDataType & /* type */) const { return {}; }
virtual ColumnSize getTotalColumnsSize() const { return {}; }
@ -276,11 +276,6 @@ public:
/// Columns with values, that all have been zeroed by expired ttl
NameSet expired_columns;
/** It is blocked for writing when changing columns, checksums or any part files.
* Locked to read when reading columns, checksums or any part files.
*/
mutable std::shared_mutex columns_lock;
/// For data in RAM ('index')
UInt64 getIndexSizeInBytes() const;
UInt64 getIndexSizeInAllocatedBytes() const;

View File

@ -27,8 +27,6 @@ MergeListElement::MergeListElement(const std::string & database_, const std::str
source_part_names.emplace_back(source_part->name);
source_part_paths.emplace_back(source_part->getFullPath());
std::shared_lock<std::shared_mutex> part_lock(source_part->columns_lock);
total_size_bytes_compressed += source_part->bytes_on_disk;
total_size_marks += source_part->getMarksCount();
total_rows_count += source_part->index_granularity.getTotalRows();

View File

@ -2421,8 +2421,6 @@ void MergeTreeData::calculateColumnSizesImpl()
void MergeTreeData::addPartContributionToColumnSizes(const DataPartPtr & part)
{
std::shared_lock<std::shared_mutex> lock(part->columns_lock);
for (const auto & column : part->getColumns())
{
ColumnSize & total_column_size = column_sizes[column.name];
@ -2433,8 +2431,6 @@ void MergeTreeData::addPartContributionToColumnSizes(const DataPartPtr & part)
void MergeTreeData::removePartContributionToColumnSizes(const DataPartPtr & part)
{
std::shared_lock<std::shared_mutex> lock(part->columns_lock);
for (const auto & column : part->getColumns())
{
ColumnSize & total_column_size = column_sizes[column.name];

View File

@ -625,7 +625,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
LOG_DEBUG(log, "Selected MergeAlgorithm: " << ((merge_alg == MergeAlgorithm::Vertical) ? "Vertical" : "Horizontal"));
/// Note: this is done before creating input streams, because otherwise data.data_parts_mutex
/// (which is locked in data.getTotalActiveSizeInBytes()) is locked after part->columns_lock
/// (which is locked in data.getTotalActiveSizeInBytes())
/// (which is locked in shared mode when input streams are created) and when inserting new data
/// the order is reverse. This annoys TSan even though one lock is locked in shared mode and thus
/// deadlock is impossible.
@ -1028,7 +1028,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
String new_part_tmp_path = new_data_part->getFullPath();
/// Note: this is done before creating input streams, because otherwise data.data_parts_mutex
/// (which is locked in data.getTotalActiveSizeInBytes()) is locked after part->columns_lock
/// (which is locked in data.getTotalActiveSizeInBytes())
/// (which is locked in shared mode when input streams are created) and when inserting new data
/// the order is reverse. This annoys TSan even though one lock is locked in shared mode and thus
/// deadlock is impossible.

View File

@ -159,8 +159,6 @@ MergeTreeDataPartWide::~MergeTreeDataPartWide()
void MergeTreeDataPartWide::accumulateColumnSizes(ColumnToSize & column_to_size) const
{
std::shared_lock<std::shared_mutex> part_lock(columns_lock);
for (const NameAndTypePair & name_type : storage.getColumns().getAllPhysical())
{
IDataType::SubstreamPath path;

View File

@ -199,8 +199,6 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
per_part_sum_marks.push_back(sum_marks);
per_part_columns_lock.emplace_back(part.data_part, part.data_part->columns_lock);
auto [required_columns, required_pre_columns, should_reorder] =
getReadTaskColumns(data, part.data_part, column_names, prewhere_info, check_columns);

View File

@ -94,7 +94,6 @@ private:
const size_t threads, const size_t sum_marks, std::vector<size_t> per_part_sum_marks,
RangesInDataParts & parts, const size_t min_marks_for_concurrent_read);
std::vector<std::pair<MergeTreeData::DataPartPtr, std::shared_lock<std::shared_mutex>>> per_part_columns_lock;
const MergeTreeData & data;
Names column_names;
bool do_not_steal_tasks;

View File

@ -54,7 +54,6 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
reader_settings_, use_uncompressed_cache_, virt_column_names_},
required_columns{std::move(required_columns_)},
data_part{owned_data_part_},
part_columns_lock(data_part->columns_lock),
all_mark_ranges(std::move(mark_ranges_)),
part_index_in_query(part_index_in_query_),
path(data_part->getFullRelativePath())
@ -170,7 +169,6 @@ void MergeTreeReverseSelectProcessor::finish()
*/
reader.reset();
pre_reader.reset();
part_columns_lock.unlock();
data_part.reset();
}

View File

@ -57,8 +57,6 @@ private:
/// Data part will not be removed if the pointer owns it
MergeTreeData::DataPartPtr data_part;
/// Forbids to change columns list of the part during reading
std::shared_lock<std::shared_mutex> part_columns_lock;
/// Mark ranges we should read (in ascending order)
MarkRanges all_mark_ranges;

View File

@ -34,7 +34,6 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
reader_settings_, use_uncompressed_cache_, virt_column_names_},
required_columns{std::move(required_columns_)},
data_part{owned_data_part_},
part_columns_lock(data_part->columns_lock),
all_mark_ranges(std::move(mark_ranges_)),
part_index_in_query(part_index_in_query_),
check_columns(check_columns_),
@ -119,7 +118,6 @@ void MergeTreeSelectProcessor::finish()
*/
reader.reset();
pre_reader.reset();
part_columns_lock.unlock();
data_part.reset();
}

View File

@ -55,8 +55,6 @@ private:
/// Data part will not be removed if the pointer owns it
MergeTreeData::DataPartPtr data_part;
/// Forbids to change columns list of the part during reading
std::shared_lock<std::shared_mutex> part_columns_lock;
/// Mark ranges we should read (in ascending order)
MarkRanges all_mark_ranges;

View File

@ -17,7 +17,6 @@ MergeTreeSequentialBlockInputStream::MergeTreeSequentialBlockInputStream(
bool quiet)
: storage(storage_)
, data_part(data_part_)
, part_columns_lock(data_part->columns_lock)
, columns_to_read(columns_to_read_)
, read_with_direct_io(read_with_direct_io_)
, mark_cache(storage.global_context.getMarkCache())
@ -153,7 +152,6 @@ void MergeTreeSequentialBlockInputStream::finish()
* buffers don't waste memory.
*/
reader.reset();
part_columns_lock.unlock();
data_part.reset();
}

View File

@ -46,9 +46,6 @@ private:
/// Data part will not be removed if the pointer owns it
MergeTreeData::DataPartPtr data_part;
/// Forbids to change columns list of the part during reading
std::shared_lock<std::shared_mutex> part_columns_lock;
/// Columns we have to read (each Block from read will contain them)
Names columns_to_read;

View File

@ -1093,7 +1093,6 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
Int64 temp_index = insert_increment.get();
MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level);
std::shared_lock<std::shared_mutex> part_lock(src_part->columns_lock);
dst_parts.emplace_back(cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info));
}
@ -1175,7 +1174,6 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
Int64 temp_index = insert_increment.get();
MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level);
std::shared_lock<std::shared_mutex> part_lock(src_part->columns_lock);
dst_parts.emplace_back(dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info));
}

View File

@ -1593,11 +1593,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
continue;
}
String checksum_hex;
{
std::shared_lock<std::shared_mutex> part_lock(src_part->columns_lock);
checksum_hex = src_part->checksums.getTotalChecksumHex();
}
String checksum_hex = src_part->checksums.getTotalChecksumHex();
if (checksum_hex != part_desc->checksum_hex)
{
@ -1707,7 +1703,6 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
{
if (part_desc->src_table_part)
{
std::shared_lock<std::shared_mutex> part_lock(part_desc->src_table_part->columns_lock);
if (part_desc->checksum_hex != part_desc->src_table_part->checksums.getTotalChecksumHex())
throw Exception("Checksums of " + part_desc->src_table_part->name + " is suddenly changed", ErrorCodes::UNFINISHED);

View File

@ -118,11 +118,7 @@ void StorageSystemParts::processNextStorage(MutableColumns & columns_, const Sto
columns_[i++]->insert(part->stateString());
MinimalisticDataPartChecksums helper;
{
/// TODO:IMergeTreeDataPart structure is too error-prone.
std::shared_lock<std::shared_mutex> lock(part->columns_lock);
helper.computeTotalChecksums(part->checksums);
}
auto checksum = helper.hash_of_all_files;
columns_[i++]->insert(getHexUIntLowercase(checksum.first) + getHexUIntLowercase(checksum.second));