From 13529265c45ee0b2e526ecd4d6f0e0fc797a2fb5 Mon Sep 17 00:00:00 2001 From: alexey-milovidov Date: Fri, 25 Sep 2020 02:28:57 +0300 Subject: [PATCH] Revert "Avoid deadlocks in Log/TinyLog" --- src/Common/FileChecker.cpp | 5 -- src/Common/FileChecker.h | 4 +- src/Storages/StorageFile.cpp | 40 +++------ src/Storages/StorageFile.h | 2 +- src/Storages/StorageLog.cpp | 60 ++++--------- src/Storages/StorageLog.h | 4 +- src/Storages/StorageStripeLog.cpp | 45 +++------- src/Storages/StorageStripeLog.h | 2 +- src/Storages/StorageTinyLog.cpp | 87 ++++++------------- src/Storages/StorageTinyLog.h | 4 +- .../0_stateless/01499_log_deadlock.reference | 3 - .../0_stateless/01499_log_deadlock.sql | 26 ------ ...2_long_log_tinylog_deadlock_race.reference | 6 -- .../01502_long_log_tinylog_deadlock_race.sh | 85 ------------------ .../01505_log_distributed_deadlock.reference | 0 .../01505_log_distributed_deadlock.sql | 12 --- .../queries/0_stateless/arcadia_skip_list.txt | 1 - 17 files changed, 79 insertions(+), 307 deletions(-) delete mode 100644 tests/queries/0_stateless/01499_log_deadlock.reference delete mode 100644 tests/queries/0_stateless/01499_log_deadlock.sql delete mode 100644 tests/queries/0_stateless/01502_long_log_tinylog_deadlock_race.reference delete mode 100755 tests/queries/0_stateless/01502_long_log_tinylog_deadlock_race.sh delete mode 100644 tests/queries/0_stateless/01505_log_distributed_deadlock.reference delete mode 100644 tests/queries/0_stateless/01505_log_distributed_deadlock.sql diff --git a/src/Common/FileChecker.cpp b/src/Common/FileChecker.cpp index b306c3af990..6cbec3bda77 100644 --- a/src/Common/FileChecker.cpp +++ b/src/Common/FileChecker.cpp @@ -41,11 +41,6 @@ void FileChecker::setEmpty(const String & full_file_path) map[fileName(full_file_path)] = 0; } -FileChecker::Map FileChecker::getFileSizes() const -{ - return map; -} - CheckResults FileChecker::check() const { // Read the files again every time you call `check` - so as not to violate the constancy. diff --git a/src/Common/FileChecker.h b/src/Common/FileChecker.h index 59e7331952e..015d4cadb07 100644 --- a/src/Common/FileChecker.h +++ b/src/Common/FileChecker.h @@ -27,12 +27,10 @@ public: /// The purpose of this function is to rollback a group of unfinished writes. void repair(); +private: /// File name -> size. using Map = std::map; - Map getFileSizes() const; - -private: void initialize(); void updateImpl(const String & file_path); void load(Map & local_map, const String & path) const; diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index 7b094f9bc06..cc47047dc78 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -52,7 +52,6 @@ namespace ErrorCodes extern const int UNKNOWN_IDENTIFIER; extern const int INCORRECT_FILE_NAME; extern const int FILE_DOESNT_EXIST; - extern const int TIMEOUT_EXCEEDED; } namespace @@ -200,17 +199,6 @@ StorageFile::StorageFile(CommonArguments args) setInMemoryMetadata(storage_metadata); } - -static std::chrono::seconds getLockTimeout(const Context & context) -{ - const Settings & settings = context.getSettingsRef(); - Int64 lock_timeout = settings.lock_acquire_timeout.totalSeconds(); - if (settings.max_execution_time.totalSeconds() != 0 && settings.max_execution_time.totalSeconds() < lock_timeout) - lock_timeout = settings.max_execution_time.totalSeconds(); - return std::chrono::seconds{lock_timeout}; -} - - class StorageFileSource : public SourceWithProgress { public: @@ -257,9 +245,7 @@ public: { if (storage->use_table_fd) { - unique_lock = std::unique_lock(storage->rwlock, getLockTimeout(context)); - if (!unique_lock) - throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); + unique_lock = std::unique_lock(storage->rwlock); /// We could use common ReadBuffer and WriteBuffer in storage to leverage cache /// and add ability to seek unseekable files, but cache sync isn't supported. @@ -278,9 +264,7 @@ public: } else { - shared_lock = std::shared_lock(storage->rwlock, getLockTimeout(context)); - if (!shared_lock) - throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); + shared_lock = std::shared_lock(storage->rwlock); } } @@ -389,8 +373,8 @@ private: bool finished_generate = false; - std::shared_lock shared_lock; - std::unique_lock unique_lock; + std::shared_lock shared_lock; + std::unique_lock unique_lock; }; @@ -433,7 +417,7 @@ Pipe StorageFile::read( for (size_t i = 0; i < num_streams; ++i) pipes.emplace_back(std::make_shared( - this_ptr, metadata_snapshot, context, max_block_size, files_info, metadata_snapshot->getColumns().getDefaults())); + this_ptr, metadata_snapshot, context, max_block_size, files_info, metadata_snapshot->getColumns().getDefaults())); return Pipe::unitePipes(std::move(pipes)); } @@ -445,16 +429,12 @@ public: explicit StorageFileBlockOutputStream( StorageFile & storage_, const StorageMetadataPtr & metadata_snapshot_, - std::unique_lock && lock_, const CompressionMethod compression_method, const Context & context) : storage(storage_) , metadata_snapshot(metadata_snapshot_) - , lock(std::move(lock_)) + , lock(storage.rwlock) { - if (!lock) - throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); - std::unique_ptr naked_buffer = nullptr; if (storage.use_table_fd) { @@ -508,7 +488,7 @@ public: private: StorageFile & storage; StorageMetadataPtr metadata_snapshot; - std::unique_lock lock; + std::unique_lock lock; std::unique_ptr write_buf; BlockOutputStreamPtr writer; bool prefix_written{false}; @@ -526,7 +506,7 @@ BlockOutputStreamPtr StorageFile::write( if (!paths.empty()) path = paths[0]; - return std::make_shared(*this, metadata_snapshot, std::unique_lock{rwlock, getLockTimeout(context)}, + return std::make_shared(*this, metadata_snapshot, chooseCompressionMethod(path, compression_method), context); } @@ -549,6 +529,8 @@ void StorageFile::rename(const String & new_path_to_table_data, const StorageID if (path_new == paths[0]) return; + std::unique_lock lock(rwlock); + Poco::File(Poco::Path(path_new).parent()).createDirectories(); Poco::File(paths[0]).renameTo(path_new); @@ -565,6 +547,8 @@ void StorageFile::truncate( if (paths.size() != 1) throw Exception("Can't truncate table '" + getStorageID().getNameForLogs() + "' in readonly mode", ErrorCodes::DATABASE_ACCESS_DENIED); + std::unique_lock lock(rwlock); + if (use_table_fd) { if (0 != ::ftruncate(table_fd, 0)) diff --git a/src/Storages/StorageFile.h b/src/Storages/StorageFile.h index babc56e3a11..ea70dcd5311 100644 --- a/src/Storages/StorageFile.h +++ b/src/Storages/StorageFile.h @@ -89,7 +89,7 @@ private: std::atomic table_fd_was_used{false}; /// To detect repeating reads from stdin off_t table_fd_init_offset = -1; /// Initial position of fd, used for repeating reads - mutable std::shared_timed_mutex rwlock; + mutable std::shared_mutex rwlock; Poco::Logger * log = &Poco::Logger::get("StorageFile"); }; diff --git a/src/Storages/StorageLog.cpp b/src/Storages/StorageLog.cpp index 2fbce21655c..e437bfb05f1 100644 --- a/src/Storages/StorageLog.cpp +++ b/src/Storages/StorageLog.cpp @@ -39,7 +39,6 @@ namespace DB namespace ErrorCodes { - extern const int TIMEOUT_EXCEEDED; extern const int LOGICAL_ERROR; extern const int DUPLICATE_COLUMN; extern const int SIZES_OF_MARKS_FILES_ARE_INCONSISTENT; @@ -51,6 +50,7 @@ namespace ErrorCodes class LogSource final : public SourceWithProgress { public: + static Block getHeader(const NamesAndTypesList & columns) { Block res; @@ -116,16 +116,13 @@ private: class LogBlockOutputStream final : public IBlockOutputStream { public: - explicit LogBlockOutputStream( - StorageLog & storage_, const StorageMetadataPtr & metadata_snapshot_, std::unique_lock && lock_) + explicit LogBlockOutputStream(StorageLog & storage_, const StorageMetadataPtr & metadata_snapshot_) : storage(storage_) , metadata_snapshot(metadata_snapshot_) - , lock(std::move(lock_)) + , lock(storage.rwlock) , marks_stream( storage.disk->writeFile(storage.marks_file_path, 4096, WriteMode::Rewrite)) { - if (!lock) - throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); } ~LogBlockOutputStream() override @@ -152,7 +149,7 @@ public: private: StorageLog & storage; StorageMetadataPtr metadata_snapshot; - std::unique_lock lock; + std::unique_lock lock; bool done = false; struct Stream @@ -510,11 +507,9 @@ void StorageLog::addFiles(const String & column_name, const IDataType & type) } -void StorageLog::loadMarks(std::chrono::seconds lock_timeout) +void StorageLog::loadMarks() { - std::unique_lock lock(rwlock, lock_timeout); - if (!lock) - throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); + std::unique_lock lock(rwlock); if (loaded_marks) return; @@ -557,6 +552,8 @@ void StorageLog::rename(const String & new_path_to_table_data, const StorageID & { assert(table_path != new_path_to_table_data); { + std::unique_lock lock(rwlock); + disk->moveDirectory(table_path, new_path_to_table_data); table_path = new_path_to_table_data; @@ -572,6 +569,8 @@ void StorageLog::rename(const String & new_path_to_table_data, const StorageID & void StorageLog::truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &) { + std::shared_lock lock(rwlock); + files.clear(); file_count = 0; loaded_marks = false; @@ -611,17 +610,6 @@ const StorageLog::Marks & StorageLog::getMarksWithRealRowCount(const StorageMeta return it->second.marks; } - -static std::chrono::seconds getLockTimeout(const Context & context) -{ - const Settings & settings = context.getSettingsRef(); - Int64 lock_timeout = settings.lock_acquire_timeout.totalSeconds(); - if (settings.max_execution_time.totalSeconds() != 0 && settings.max_execution_time.totalSeconds() < lock_timeout) - lock_timeout = settings.max_execution_time.totalSeconds(); - return std::chrono::seconds{lock_timeout}; -} - - Pipe StorageLog::read( const Names & column_names, const StorageMetadataPtr & metadata_snapshot, @@ -632,15 +620,11 @@ Pipe StorageLog::read( unsigned num_streams) { metadata_snapshot->check(column_names, getVirtuals(), getStorageID()); - - auto lock_timeout = getLockTimeout(context); - loadMarks(lock_timeout); + loadMarks(); NamesAndTypesList all_columns = Nested::collect(metadata_snapshot->getColumns().getAllPhysical().addTypes(column_names)); - std::shared_lock lock(rwlock, lock_timeout); - if (!lock) - throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); + std::shared_lock lock(rwlock); Pipes pipes; @@ -669,28 +653,18 @@ Pipe StorageLog::read( max_read_buffer_size)); } - /// No need to hold lock while reading because we read fixed range of data that does not change while appending more data. return Pipe::unitePipes(std::move(pipes)); } -BlockOutputStreamPtr StorageLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & context) +BlockOutputStreamPtr StorageLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/) { - auto lock_timeout = getLockTimeout(context); - loadMarks(lock_timeout); - - std::unique_lock lock(rwlock, lock_timeout); - if (!lock) - throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); - - return std::make_shared(*this, metadata_snapshot, std::move(lock)); + loadMarks(); + return std::make_shared(*this, metadata_snapshot); } -CheckResults StorageLog::checkData(const ASTPtr & /* query */, const Context & context) +CheckResults StorageLog::checkData(const ASTPtr & /* query */, const Context & /* context */) { - std::shared_lock lock(rwlock, getLockTimeout(context)); - if (!lock) - throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); - + std::shared_lock lock(rwlock); return file_checker.check(); } diff --git a/src/Storages/StorageLog.h b/src/Storages/StorageLog.h index 3553426b9e6..49fc9a576c5 100644 --- a/src/Storages/StorageLog.h +++ b/src/Storages/StorageLog.h @@ -83,7 +83,7 @@ private: DiskPtr disk; String table_path; - mutable std::shared_timed_mutex rwlock; + mutable std::shared_mutex rwlock; Files files; @@ -104,7 +104,7 @@ private: /// Read marks files if they are not already read. /// It is done lazily, so that with a large number of tables, the server starts quickly. /// You can not call with a write locked `rwlock`. - void loadMarks(std::chrono::seconds lock_timeout); + void loadMarks(); /** For normal columns, the number of rows in the block is specified in the marks. * For array columns and nested structures, there are more than one group of marks that correspond to different files diff --git a/src/Storages/StorageStripeLog.cpp b/src/Storages/StorageStripeLog.cpp index 8ff8035c128..c4344cf6f1f 100644 --- a/src/Storages/StorageStripeLog.cpp +++ b/src/Storages/StorageStripeLog.cpp @@ -47,13 +47,13 @@ namespace ErrorCodes { extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int INCORRECT_FILE_NAME; - extern const int TIMEOUT_EXCEEDED; } class StripeLogSource final : public SourceWithProgress { public: + static Block getHeader( StorageStripeLog & storage, const StorageMetadataPtr & metadata_snapshot, @@ -157,11 +157,10 @@ private: class StripeLogBlockOutputStream final : public IBlockOutputStream { public: - explicit StripeLogBlockOutputStream( - StorageStripeLog & storage_, const StorageMetadataPtr & metadata_snapshot_, std::unique_lock && lock_) + explicit StripeLogBlockOutputStream(StorageStripeLog & storage_, const StorageMetadataPtr & metadata_snapshot_) : storage(storage_) , metadata_snapshot(metadata_snapshot_) - , lock(std::move(lock_)) + , lock(storage.rwlock) , data_out_file(storage.table_path + "data.bin") , data_out_compressed(storage.disk->writeFile(data_out_file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Append)) , data_out(std::make_unique( @@ -171,8 +170,6 @@ public: , index_out(std::make_unique(*index_out_compressed)) , block_out(*data_out, 0, metadata_snapshot->getSampleBlock(), false, index_out.get(), storage.disk->getFileSize(data_out_file)) { - if (!lock) - throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); } ~StripeLogBlockOutputStream() override @@ -226,7 +223,7 @@ public: private: StorageStripeLog & storage; StorageMetadataPtr metadata_snapshot; - std::unique_lock lock; + std::unique_lock lock; String data_out_file; std::unique_ptr data_out_compressed; @@ -289,6 +286,8 @@ void StorageStripeLog::rename(const String & new_path_to_table_data, const Stora { assert(table_path != new_path_to_table_data); { + std::unique_lock lock(rwlock); + disk->moveDirectory(table_path, new_path_to_table_data); table_path = new_path_to_table_data; @@ -298,16 +297,6 @@ void StorageStripeLog::rename(const String & new_path_to_table_data, const Stora } -static std::chrono::seconds getLockTimeout(const Context & context) -{ - const Settings & settings = context.getSettingsRef(); - Int64 lock_timeout = settings.lock_acquire_timeout.totalSeconds(); - if (settings.max_execution_time.totalSeconds() != 0 && settings.max_execution_time.totalSeconds() < lock_timeout) - lock_timeout = settings.max_execution_time.totalSeconds(); - return std::chrono::seconds{lock_timeout}; -} - - Pipe StorageStripeLog::read( const Names & column_names, const StorageMetadataPtr & metadata_snapshot, @@ -317,9 +306,7 @@ Pipe StorageStripeLog::read( const size_t /*max_block_size*/, unsigned num_streams) { - std::shared_lock lock(rwlock, getLockTimeout(context)); - if (!lock) - throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); + std::shared_lock lock(rwlock); metadata_snapshot->check(column_names, getVirtuals(), getStorageID()); @@ -358,28 +345,24 @@ Pipe StorageStripeLog::read( } -BlockOutputStreamPtr StorageStripeLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & context) +BlockOutputStreamPtr StorageStripeLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/) { - std::unique_lock lock(rwlock, getLockTimeout(context)); - if (!lock) - throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); - - return std::make_shared(*this, metadata_snapshot, std::move(lock)); + return std::make_shared(*this, metadata_snapshot); } -CheckResults StorageStripeLog::checkData(const ASTPtr & /* query */, const Context & context) +CheckResults StorageStripeLog::checkData(const ASTPtr & /* query */, const Context & /* context */) { - std::shared_lock lock(rwlock, getLockTimeout(context)); - if (!lock) - throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); - + std::shared_lock lock(rwlock); return file_checker.check(); } void StorageStripeLog::truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &) { + std::shared_lock lock(rwlock); + disk->clearDirectory(table_path); + file_checker = FileChecker{disk, table_path + "sizes.json"}; } diff --git a/src/Storages/StorageStripeLog.h b/src/Storages/StorageStripeLog.h index ca3bfe4ff75..f88120a932e 100644 --- a/src/Storages/StorageStripeLog.h +++ b/src/Storages/StorageStripeLog.h @@ -67,7 +67,7 @@ private: size_t max_compress_block_size; FileChecker file_checker; - mutable std::shared_timed_mutex rwlock; + mutable std::shared_mutex rwlock; Poco::Logger * log; }; diff --git a/src/Storages/StorageTinyLog.cpp b/src/Storages/StorageTinyLog.cpp index 4d646c7451e..0bdcab8abf4 100644 --- a/src/Storages/StorageTinyLog.cpp +++ b/src/Storages/StorageTinyLog.cpp @@ -13,7 +13,6 @@ #include #include -#include #include #include #include @@ -47,7 +46,6 @@ namespace DB namespace ErrorCodes { - extern const int TIMEOUT_EXCEEDED; extern const int DUPLICATE_COLUMN; extern const int INCORRECT_FILE_NAME; extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; @@ -57,6 +55,7 @@ namespace ErrorCodes class TinyLogSource final : public SourceWithProgress { public: + static Block getHeader(const NamesAndTypesList & columns) { Block res; @@ -67,17 +66,10 @@ public: return Nested::flatten(res); } - TinyLogSource( - size_t block_size_, - const NamesAndTypesList & columns_, - StorageTinyLog & storage_, - size_t max_read_buffer_size_, - FileChecker::Map file_sizes_) + TinyLogSource(size_t block_size_, const NamesAndTypesList & columns_, StorageTinyLog & storage_, size_t max_read_buffer_size_) : SourceWithProgress(getHeader(columns_)) - , block_size(block_size_), columns(columns_), storage(storage_) - , max_read_buffer_size(max_read_buffer_size_), file_sizes(std::move(file_sizes_)) - { - } + , block_size(block_size_), columns(columns_), storage(storage_), lock(storage_.rwlock) + , max_read_buffer_size(max_read_buffer_size_) {} String getName() const override { return "TinyLog"; } @@ -88,21 +80,19 @@ private: size_t block_size; NamesAndTypesList columns; StorageTinyLog & storage; + std::shared_lock lock; bool is_finished = false; size_t max_read_buffer_size; - FileChecker::Map file_sizes; struct Stream { - Stream(const DiskPtr & disk, const String & data_path, size_t max_read_buffer_size_, size_t file_size) + Stream(const DiskPtr & disk, const String & data_path, size_t max_read_buffer_size_) : plain(disk->readFile(data_path, std::min(max_read_buffer_size_, disk->getFileSize(data_path)))), - limited(std::make_unique(*plain, file_size, false)), compressed(*plain) { } std::unique_ptr plain; - std::unique_ptr limited; CompressedReadBuffer compressed; }; @@ -120,14 +110,9 @@ private: class TinyLogBlockOutputStream final : public IBlockOutputStream { public: - explicit TinyLogBlockOutputStream( - StorageTinyLog & storage_, - const StorageMetadataPtr & metadata_snapshot_, - std::unique_lock && lock_) - : storage(storage_), metadata_snapshot(metadata_snapshot_), lock(std::move(lock_)) + explicit TinyLogBlockOutputStream(StorageTinyLog & storage_, const StorageMetadataPtr & metadata_snapshot_) + : storage(storage_), metadata_snapshot(metadata_snapshot_), lock(storage_.rwlock) { - if (!lock) - throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); } ~TinyLogBlockOutputStream() override @@ -155,7 +140,7 @@ public: private: StorageTinyLog & storage; StorageMetadataPtr metadata_snapshot; - std::unique_lock lock; + std::unique_lock lock; bool done = false; struct Stream @@ -246,17 +231,13 @@ void TinyLogSource::readData(const String & name, const IDataType & type, IColum String stream_name = IDataType::getFileNameForStream(name, path); if (!streams.count(stream_name)) - { - String file_path = storage.files[stream_name].data_file_path; - streams[stream_name] = std::make_unique( - storage.disk, file_path, max_read_buffer_size, file_sizes[fileName(file_path)]); - } + streams[stream_name] = std::make_unique(storage.disk, storage.files[stream_name].data_file_path, max_read_buffer_size); return &streams[stream_name]->compressed; }; if (deserialize_states.count(name) == 0) - type.deserializeBinaryBulkStatePrefix(settings, deserialize_states[name]); + type.deserializeBinaryBulkStatePrefix(settings, deserialize_states[name]); type.deserializeBinaryBulkWithMultipleStreams(column, limit, settings, deserialize_states[name]); } @@ -429,6 +410,8 @@ void StorageTinyLog::rename(const String & new_path_to_table_data, const Storage { assert(table_path != new_path_to_table_data); { + std::unique_lock lock(rwlock); + disk->moveDirectory(table_path, new_path_to_table_data); table_path = new_path_to_table_data; @@ -441,16 +424,6 @@ void StorageTinyLog::rename(const String & new_path_to_table_data, const Storage } -static std::chrono::seconds getLockTimeout(const Context & context) -{ - const Settings & settings = context.getSettingsRef(); - Int64 lock_timeout = settings.lock_acquire_timeout.totalSeconds(); - if (settings.max_execution_time.totalSeconds() != 0 && settings.max_execution_time.totalSeconds() < lock_timeout) - lock_timeout = settings.max_execution_time.totalSeconds(); - return std::chrono::seconds{lock_timeout}; -} - - Pipe StorageTinyLog::read( const Names & column_names, const StorageMetadataPtr & metadata_snapshot, @@ -464,40 +437,28 @@ Pipe StorageTinyLog::read( // When reading, we lock the entire storage, because we only have one file // per column and can't modify it concurrently. - const Settings & settings = context.getSettingsRef(); - - std::shared_lock lock{rwlock, getLockTimeout(context)}; - if (!lock) - throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); - - /// No need to hold lock while reading because we read fixed range of data that does not change while appending more data. return Pipe(std::make_shared( - max_block_size, - Nested::collect(metadata_snapshot->getColumns().getAllPhysical().addTypes(column_names)), - *this, - settings.max_read_buffer_size, - file_checker.getFileSizes())); + max_block_size, Nested::collect(metadata_snapshot->getColumns().getAllPhysical().addTypes(column_names)), *this, context.getSettingsRef().max_read_buffer_size)); } -BlockOutputStreamPtr StorageTinyLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & context) +BlockOutputStreamPtr StorageTinyLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/) { - return std::make_shared(*this, metadata_snapshot, std::unique_lock{rwlock, getLockTimeout(context)}); + return std::make_shared(*this, metadata_snapshot); } -CheckResults StorageTinyLog::checkData(const ASTPtr & /* query */, const Context & context) +CheckResults StorageTinyLog::checkData(const ASTPtr & /* query */, const Context & /* context */) { - std::shared_lock lock(rwlock, getLockTimeout(context)); - if (!lock) - throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); - + std::shared_lock lock(rwlock); return file_checker.check(); } void StorageTinyLog::truncate( const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &) { + std::unique_lock lock(rwlock); + disk->clearDirectory(table_path); files.clear(); @@ -507,6 +468,14 @@ void StorageTinyLog::truncate( addFiles(column.name, *column.type); } +void StorageTinyLog::drop() +{ + std::unique_lock lock(rwlock); + if (disk->exists(table_path)) + disk->removeRecursive(table_path); + files.clear(); +} + void registerStorageTinyLog(StorageFactory & factory) { diff --git a/src/Storages/StorageTinyLog.h b/src/Storages/StorageTinyLog.h index 95b7d9f2941..dc6ff101503 100644 --- a/src/Storages/StorageTinyLog.h +++ b/src/Storages/StorageTinyLog.h @@ -43,6 +43,8 @@ public: void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &) override; + void drop() override; + protected: StorageTinyLog( DiskPtr disk_, @@ -68,7 +70,7 @@ private: Files files; FileChecker file_checker; - mutable std::shared_timed_mutex rwlock; + mutable std::shared_mutex rwlock; Poco::Logger * log; diff --git a/tests/queries/0_stateless/01499_log_deadlock.reference b/tests/queries/0_stateless/01499_log_deadlock.reference deleted file mode 100644 index 166be640db5..00000000000 --- a/tests/queries/0_stateless/01499_log_deadlock.reference +++ /dev/null @@ -1,3 +0,0 @@ -6 -6 -6 diff --git a/tests/queries/0_stateless/01499_log_deadlock.sql b/tests/queries/0_stateless/01499_log_deadlock.sql deleted file mode 100644 index e98b37f2455..00000000000 --- a/tests/queries/0_stateless/01499_log_deadlock.sql +++ /dev/null @@ -1,26 +0,0 @@ -DROP TABLE IF EXISTS t; -CREATE TABLE t (x UInt8) ENGINE = TinyLog; - -INSERT INTO t VALUES (1), (2), (3); -INSERT INTO t SELECT * FROM t; -SELECT count() FROM t; - -DROP TABLE t; - - -CREATE TABLE t (x UInt8) ENGINE = Log; - -INSERT INTO t VALUES (1), (2), (3); -INSERT INTO t SELECT * FROM t; -SELECT count() FROM t; - -DROP TABLE t; - - -CREATE TABLE t (x UInt8) ENGINE = StripeLog; - -INSERT INTO t VALUES (1), (2), (3); -INSERT INTO t SELECT * FROM t; -SELECT count() FROM t; - -DROP TABLE t; diff --git a/tests/queries/0_stateless/01502_long_log_tinylog_deadlock_race.reference b/tests/queries/0_stateless/01502_long_log_tinylog_deadlock_race.reference deleted file mode 100644 index 4bf85ae79f3..00000000000 --- a/tests/queries/0_stateless/01502_long_log_tinylog_deadlock_race.reference +++ /dev/null @@ -1,6 +0,0 @@ -Testing TinyLog -Done TinyLog -Testing StripeLog -Done StripeLog -Testing Log -Done Log diff --git a/tests/queries/0_stateless/01502_long_log_tinylog_deadlock_race.sh b/tests/queries/0_stateless/01502_long_log_tinylog_deadlock_race.sh deleted file mode 100755 index 29c5f868617..00000000000 --- a/tests/queries/0_stateless/01502_long_log_tinylog_deadlock_race.sh +++ /dev/null @@ -1,85 +0,0 @@ -#!/usr/bin/env bash - -set -e - -CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL=fatal - -CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) -. "$CURDIR"/../shell_config.sh - - -function thread_create { - while true; do - $CLICKHOUSE_CLIENT --query "CREATE TABLE IF NOT EXISTS $1 (x UInt64, s Array(Nullable(String))) ENGINE = $2" 2>&1 | grep -v -F 'Received exception from server' | grep -v -P 'Code: (60|57)' - sleep 0.0$RANDOM - done -} - -function thread_drop { - while true; do - $CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS $1" 2>&1 | grep -v -F 'Received exception from server' | grep -v -P 'Code: (60|57)' - sleep 0.0$RANDOM - done -} - -function thread_rename { - while true; do - $CLICKHOUSE_CLIENT --query "RENAME TABLE $1 TO $2" 2>&1 | grep -v -F 'Received exception from server' | grep -v -P 'Code: (60|57)' - sleep 0.0$RANDOM - done -} - -function thread_select { - while true; do - $CLICKHOUSE_CLIENT --query "SELECT * FROM $1 FORMAT Null" 2>&1 | grep -v -F 'Received exception from server' | grep -v -P 'Code: (60|218)' - sleep 0.0$RANDOM - done -} - -function thread_insert { - while true; do - $CLICKHOUSE_CLIENT --query "INSERT INTO $1 SELECT rand64(1), [toString(rand64(2))] FROM numbers($2)" 2>&1 | grep -v -F 'Received exception from server' | grep -v -P 'Code: (60|218)' - sleep 0.0$RANDOM - done -} - -function thread_insert_select { - while true; do - $CLICKHOUSE_CLIENT --query "INSERT INTO $1 SELECT * FROM $2" 2>&1 | grep -v -F 'Received exception from server' | grep -v -P 'Code: (60|218)' - sleep 0.0$RANDOM - done -} - -export -f thread_create -export -f thread_drop -export -f thread_rename -export -f thread_select -export -f thread_insert -export -f thread_insert_select - - -# Do randomized queries and expect nothing extraordinary happens. - -function test_with_engine { - echo "Testing $1" - - timeout 10 bash -c "thread_create t1 $1" & - timeout 10 bash -c "thread_create t2 $1" & - timeout 10 bash -c 'thread_drop t1' & - timeout 10 bash -c 'thread_drop t2' & - timeout 10 bash -c 'thread_rename t1 t2' & - timeout 10 bash -c 'thread_rename t2 t1' & - timeout 10 bash -c 'thread_select t1' & - timeout 10 bash -c 'thread_select t2' & - timeout 10 bash -c 'thread_insert t1 5' & - timeout 10 bash -c 'thread_insert t2 10' & - timeout 10 bash -c 'thread_insert_select t1 t2' & - timeout 10 bash -c 'thread_insert_select t2 t1' & - - wait - echo "Done $1" -} - -test_with_engine TinyLog -test_with_engine StripeLog -test_with_engine Log diff --git a/tests/queries/0_stateless/01505_log_distributed_deadlock.reference b/tests/queries/0_stateless/01505_log_distributed_deadlock.reference deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/tests/queries/0_stateless/01505_log_distributed_deadlock.sql b/tests/queries/0_stateless/01505_log_distributed_deadlock.sql deleted file mode 100644 index 2b0b2b97188..00000000000 --- a/tests/queries/0_stateless/01505_log_distributed_deadlock.sql +++ /dev/null @@ -1,12 +0,0 @@ -DROP TABLE IF EXISTS t_local; -DROP TABLE IF EXISTS t_dist; - -create table t_local(a int) engine Log; -create table t_dist (a int) engine Distributed(test_shard_localhost, currentDatabase(), 't_local', cityHash64(a)); - -set insert_distributed_sync = 1; - -insert into t_dist values (1); - -DROP TABLE t_local; -DROP TABLE t_dist; diff --git a/tests/queries/0_stateless/arcadia_skip_list.txt b/tests/queries/0_stateless/arcadia_skip_list.txt index 6d1c6444d1b..69391ca9fd4 100644 --- a/tests/queries/0_stateless/arcadia_skip_list.txt +++ b/tests/queries/0_stateless/arcadia_skip_list.txt @@ -145,4 +145,3 @@ 01461_query_start_time_microseconds 01455_shard_leaf_max_rows_bytes_to_read 01505_distributed_local_type_conversion_enum -01505_log_distributed_deadlock