diff --git a/src/Common/FileChecker.cpp b/src/Common/FileChecker.cpp index 6cbec3bda77..b306c3af990 100644 --- a/src/Common/FileChecker.cpp +++ b/src/Common/FileChecker.cpp @@ -41,6 +41,11 @@ void FileChecker::setEmpty(const String & full_file_path) map[fileName(full_file_path)] = 0; } +FileChecker::Map FileChecker::getFileSizes() const +{ + return map; +} + CheckResults FileChecker::check() const { // Read the files again every time you call `check` - so as not to violate the constancy. diff --git a/src/Common/FileChecker.h b/src/Common/FileChecker.h index 015d4cadb07..59e7331952e 100644 --- a/src/Common/FileChecker.h +++ b/src/Common/FileChecker.h @@ -27,10 +27,12 @@ public: /// The purpose of this function is to rollback a group of unfinished writes. void repair(); -private: /// File name -> size. using Map = std::map; + Map getFileSizes() const; + +private: void initialize(); void updateImpl(const String & file_path); void load(Map & local_map, const String & path) const; diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index cc47047dc78..7b094f9bc06 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -52,6 +52,7 @@ namespace ErrorCodes extern const int UNKNOWN_IDENTIFIER; extern const int INCORRECT_FILE_NAME; extern const int FILE_DOESNT_EXIST; + extern const int TIMEOUT_EXCEEDED; } namespace @@ -199,6 +200,17 @@ StorageFile::StorageFile(CommonArguments args) setInMemoryMetadata(storage_metadata); } + +static std::chrono::seconds getLockTimeout(const Context & context) +{ + const Settings & settings = context.getSettingsRef(); + Int64 lock_timeout = settings.lock_acquire_timeout.totalSeconds(); + if (settings.max_execution_time.totalSeconds() != 0 && settings.max_execution_time.totalSeconds() < lock_timeout) + lock_timeout = settings.max_execution_time.totalSeconds(); + return std::chrono::seconds{lock_timeout}; +} + + class StorageFileSource : public SourceWithProgress { public: @@ -245,7 +257,9 @@ public: { if (storage->use_table_fd) { - unique_lock = std::unique_lock(storage->rwlock); + unique_lock = std::unique_lock(storage->rwlock, getLockTimeout(context)); + if (!unique_lock) + throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); /// We could use common ReadBuffer and WriteBuffer in storage to leverage cache /// and add ability to seek unseekable files, but cache sync isn't supported. @@ -264,7 +278,9 @@ public: } else { - shared_lock = std::shared_lock(storage->rwlock); + shared_lock = std::shared_lock(storage->rwlock, getLockTimeout(context)); + if (!shared_lock) + throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); } } @@ -373,8 +389,8 @@ private: bool finished_generate = false; - std::shared_lock shared_lock; - std::unique_lock unique_lock; + std::shared_lock shared_lock; + std::unique_lock unique_lock; }; @@ -417,7 +433,7 @@ Pipe StorageFile::read( for (size_t i = 0; i < num_streams; ++i) pipes.emplace_back(std::make_shared( - this_ptr, metadata_snapshot, context, max_block_size, files_info, metadata_snapshot->getColumns().getDefaults())); + this_ptr, metadata_snapshot, context, max_block_size, files_info, metadata_snapshot->getColumns().getDefaults())); return Pipe::unitePipes(std::move(pipes)); } @@ -429,12 +445,16 @@ public: explicit StorageFileBlockOutputStream( StorageFile & storage_, const StorageMetadataPtr & metadata_snapshot_, + std::unique_lock && lock_, const CompressionMethod compression_method, const Context & context) : storage(storage_) , metadata_snapshot(metadata_snapshot_) - , lock(storage.rwlock) + , lock(std::move(lock_)) { + if (!lock) + throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); + std::unique_ptr naked_buffer = nullptr; if (storage.use_table_fd) { @@ -488,7 +508,7 @@ public: private: StorageFile & storage; StorageMetadataPtr metadata_snapshot; - std::unique_lock lock; + std::unique_lock lock; std::unique_ptr write_buf; BlockOutputStreamPtr writer; bool prefix_written{false}; @@ -506,7 +526,7 @@ BlockOutputStreamPtr StorageFile::write( if (!paths.empty()) path = paths[0]; - return std::make_shared(*this, metadata_snapshot, + return std::make_shared(*this, metadata_snapshot, std::unique_lock{rwlock, getLockTimeout(context)}, chooseCompressionMethod(path, compression_method), context); } @@ -529,8 +549,6 @@ void StorageFile::rename(const String & new_path_to_table_data, const StorageID if (path_new == paths[0]) return; - std::unique_lock lock(rwlock); - Poco::File(Poco::Path(path_new).parent()).createDirectories(); Poco::File(paths[0]).renameTo(path_new); @@ -547,8 +565,6 @@ void StorageFile::truncate( if (paths.size() != 1) throw Exception("Can't truncate table '" + getStorageID().getNameForLogs() + "' in readonly mode", ErrorCodes::DATABASE_ACCESS_DENIED); - std::unique_lock lock(rwlock); - if (use_table_fd) { if (0 != ::ftruncate(table_fd, 0)) diff --git a/src/Storages/StorageFile.h b/src/Storages/StorageFile.h index ea70dcd5311..babc56e3a11 100644 --- a/src/Storages/StorageFile.h +++ b/src/Storages/StorageFile.h @@ -89,7 +89,7 @@ private: std::atomic table_fd_was_used{false}; /// To detect repeating reads from stdin off_t table_fd_init_offset = -1; /// Initial position of fd, used for repeating reads - mutable std::shared_mutex rwlock; + mutable std::shared_timed_mutex rwlock; Poco::Logger * log = &Poco::Logger::get("StorageFile"); }; diff --git a/src/Storages/StorageLog.cpp b/src/Storages/StorageLog.cpp index e437bfb05f1..2fbce21655c 100644 --- a/src/Storages/StorageLog.cpp +++ b/src/Storages/StorageLog.cpp @@ -39,6 +39,7 @@ namespace DB namespace ErrorCodes { + extern const int TIMEOUT_EXCEEDED; extern const int LOGICAL_ERROR; extern const int DUPLICATE_COLUMN; extern const int SIZES_OF_MARKS_FILES_ARE_INCONSISTENT; @@ -50,7 +51,6 @@ namespace ErrorCodes class LogSource final : public SourceWithProgress { public: - static Block getHeader(const NamesAndTypesList & columns) { Block res; @@ -116,13 +116,16 @@ private: class LogBlockOutputStream final : public IBlockOutputStream { public: - explicit LogBlockOutputStream(StorageLog & storage_, const StorageMetadataPtr & metadata_snapshot_) + explicit LogBlockOutputStream( + StorageLog & storage_, const StorageMetadataPtr & metadata_snapshot_, std::unique_lock && lock_) : storage(storage_) , metadata_snapshot(metadata_snapshot_) - , lock(storage.rwlock) + , lock(std::move(lock_)) , marks_stream( storage.disk->writeFile(storage.marks_file_path, 4096, WriteMode::Rewrite)) { + if (!lock) + throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); } ~LogBlockOutputStream() override @@ -149,7 +152,7 @@ public: private: StorageLog & storage; StorageMetadataPtr metadata_snapshot; - std::unique_lock lock; + std::unique_lock lock; bool done = false; struct Stream @@ -507,9 +510,11 @@ void StorageLog::addFiles(const String & column_name, const IDataType & type) } -void StorageLog::loadMarks() +void StorageLog::loadMarks(std::chrono::seconds lock_timeout) { - std::unique_lock lock(rwlock); + std::unique_lock lock(rwlock, lock_timeout); + if (!lock) + throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); if (loaded_marks) return; @@ -552,8 +557,6 @@ void StorageLog::rename(const String & new_path_to_table_data, const StorageID & { assert(table_path != new_path_to_table_data); { - std::unique_lock lock(rwlock); - disk->moveDirectory(table_path, new_path_to_table_data); table_path = new_path_to_table_data; @@ -569,8 +572,6 @@ void StorageLog::rename(const String & new_path_to_table_data, const StorageID & void StorageLog::truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &) { - std::shared_lock lock(rwlock); - files.clear(); file_count = 0; loaded_marks = false; @@ -610,6 +611,17 @@ const StorageLog::Marks & StorageLog::getMarksWithRealRowCount(const StorageMeta return it->second.marks; } + +static std::chrono::seconds getLockTimeout(const Context & context) +{ + const Settings & settings = context.getSettingsRef(); + Int64 lock_timeout = settings.lock_acquire_timeout.totalSeconds(); + if (settings.max_execution_time.totalSeconds() != 0 && settings.max_execution_time.totalSeconds() < lock_timeout) + lock_timeout = settings.max_execution_time.totalSeconds(); + return std::chrono::seconds{lock_timeout}; +} + + Pipe StorageLog::read( const Names & column_names, const StorageMetadataPtr & metadata_snapshot, @@ -620,11 +632,15 @@ Pipe StorageLog::read( unsigned num_streams) { metadata_snapshot->check(column_names, getVirtuals(), getStorageID()); - loadMarks(); + + auto lock_timeout = getLockTimeout(context); + loadMarks(lock_timeout); NamesAndTypesList all_columns = Nested::collect(metadata_snapshot->getColumns().getAllPhysical().addTypes(column_names)); - std::shared_lock lock(rwlock); + std::shared_lock lock(rwlock, lock_timeout); + if (!lock) + throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); Pipes pipes; @@ -653,18 +669,28 @@ Pipe StorageLog::read( max_read_buffer_size)); } + /// No need to hold lock while reading because we read fixed range of data that does not change while appending more data. return Pipe::unitePipes(std::move(pipes)); } -BlockOutputStreamPtr StorageLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/) +BlockOutputStreamPtr StorageLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & context) { - loadMarks(); - return std::make_shared(*this, metadata_snapshot); + auto lock_timeout = getLockTimeout(context); + loadMarks(lock_timeout); + + std::unique_lock lock(rwlock, lock_timeout); + if (!lock) + throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); + + return std::make_shared(*this, metadata_snapshot, std::move(lock)); } -CheckResults StorageLog::checkData(const ASTPtr & /* query */, const Context & /* context */) +CheckResults StorageLog::checkData(const ASTPtr & /* query */, const Context & context) { - std::shared_lock lock(rwlock); + std::shared_lock lock(rwlock, getLockTimeout(context)); + if (!lock) + throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); + return file_checker.check(); } diff --git a/src/Storages/StorageLog.h b/src/Storages/StorageLog.h index 49fc9a576c5..3553426b9e6 100644 --- a/src/Storages/StorageLog.h +++ b/src/Storages/StorageLog.h @@ -83,7 +83,7 @@ private: DiskPtr disk; String table_path; - mutable std::shared_mutex rwlock; + mutable std::shared_timed_mutex rwlock; Files files; @@ -104,7 +104,7 @@ private: /// Read marks files if they are not already read. /// It is done lazily, so that with a large number of tables, the server starts quickly. /// You can not call with a write locked `rwlock`. - void loadMarks(); + void loadMarks(std::chrono::seconds lock_timeout); /** For normal columns, the number of rows in the block is specified in the marks. * For array columns and nested structures, there are more than one group of marks that correspond to different files diff --git a/src/Storages/StorageStripeLog.cpp b/src/Storages/StorageStripeLog.cpp index c4344cf6f1f..8ff8035c128 100644 --- a/src/Storages/StorageStripeLog.cpp +++ b/src/Storages/StorageStripeLog.cpp @@ -47,13 +47,13 @@ namespace ErrorCodes { extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int INCORRECT_FILE_NAME; + extern const int TIMEOUT_EXCEEDED; } class StripeLogSource final : public SourceWithProgress { public: - static Block getHeader( StorageStripeLog & storage, const StorageMetadataPtr & metadata_snapshot, @@ -157,10 +157,11 @@ private: class StripeLogBlockOutputStream final : public IBlockOutputStream { public: - explicit StripeLogBlockOutputStream(StorageStripeLog & storage_, const StorageMetadataPtr & metadata_snapshot_) + explicit StripeLogBlockOutputStream( + StorageStripeLog & storage_, const StorageMetadataPtr & metadata_snapshot_, std::unique_lock && lock_) : storage(storage_) , metadata_snapshot(metadata_snapshot_) - , lock(storage.rwlock) + , lock(std::move(lock_)) , data_out_file(storage.table_path + "data.bin") , data_out_compressed(storage.disk->writeFile(data_out_file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Append)) , data_out(std::make_unique( @@ -170,6 +171,8 @@ public: , index_out(std::make_unique(*index_out_compressed)) , block_out(*data_out, 0, metadata_snapshot->getSampleBlock(), false, index_out.get(), storage.disk->getFileSize(data_out_file)) { + if (!lock) + throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); } ~StripeLogBlockOutputStream() override @@ -223,7 +226,7 @@ public: private: StorageStripeLog & storage; StorageMetadataPtr metadata_snapshot; - std::unique_lock lock; + std::unique_lock lock; String data_out_file; std::unique_ptr data_out_compressed; @@ -286,8 +289,6 @@ void StorageStripeLog::rename(const String & new_path_to_table_data, const Stora { assert(table_path != new_path_to_table_data); { - std::unique_lock lock(rwlock); - disk->moveDirectory(table_path, new_path_to_table_data); table_path = new_path_to_table_data; @@ -297,6 +298,16 @@ void StorageStripeLog::rename(const String & new_path_to_table_data, const Stora } +static std::chrono::seconds getLockTimeout(const Context & context) +{ + const Settings & settings = context.getSettingsRef(); + Int64 lock_timeout = settings.lock_acquire_timeout.totalSeconds(); + if (settings.max_execution_time.totalSeconds() != 0 && settings.max_execution_time.totalSeconds() < lock_timeout) + lock_timeout = settings.max_execution_time.totalSeconds(); + return std::chrono::seconds{lock_timeout}; +} + + Pipe StorageStripeLog::read( const Names & column_names, const StorageMetadataPtr & metadata_snapshot, @@ -306,7 +317,9 @@ Pipe StorageStripeLog::read( const size_t /*max_block_size*/, unsigned num_streams) { - std::shared_lock lock(rwlock); + std::shared_lock lock(rwlock, getLockTimeout(context)); + if (!lock) + throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); metadata_snapshot->check(column_names, getVirtuals(), getStorageID()); @@ -345,24 +358,28 @@ Pipe StorageStripeLog::read( } -BlockOutputStreamPtr StorageStripeLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/) +BlockOutputStreamPtr StorageStripeLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & context) { - return std::make_shared(*this, metadata_snapshot); + std::unique_lock lock(rwlock, getLockTimeout(context)); + if (!lock) + throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); + + return std::make_shared(*this, metadata_snapshot, std::move(lock)); } -CheckResults StorageStripeLog::checkData(const ASTPtr & /* query */, const Context & /* context */) +CheckResults StorageStripeLog::checkData(const ASTPtr & /* query */, const Context & context) { - std::shared_lock lock(rwlock); + std::shared_lock lock(rwlock, getLockTimeout(context)); + if (!lock) + throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); + return file_checker.check(); } void StorageStripeLog::truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &) { - std::shared_lock lock(rwlock); - disk->clearDirectory(table_path); - file_checker = FileChecker{disk, table_path + "sizes.json"}; } diff --git a/src/Storages/StorageStripeLog.h b/src/Storages/StorageStripeLog.h index f88120a932e..ca3bfe4ff75 100644 --- a/src/Storages/StorageStripeLog.h +++ b/src/Storages/StorageStripeLog.h @@ -67,7 +67,7 @@ private: size_t max_compress_block_size; FileChecker file_checker; - mutable std::shared_mutex rwlock; + mutable std::shared_timed_mutex rwlock; Poco::Logger * log; }; diff --git a/src/Storages/StorageTinyLog.cpp b/src/Storages/StorageTinyLog.cpp index 0bdcab8abf4..4d646c7451e 100644 --- a/src/Storages/StorageTinyLog.cpp +++ b/src/Storages/StorageTinyLog.cpp @@ -13,6 +13,7 @@ #include #include +#include #include #include #include @@ -46,6 +47,7 @@ namespace DB namespace ErrorCodes { + extern const int TIMEOUT_EXCEEDED; extern const int DUPLICATE_COLUMN; extern const int INCORRECT_FILE_NAME; extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; @@ -55,7 +57,6 @@ namespace ErrorCodes class TinyLogSource final : public SourceWithProgress { public: - static Block getHeader(const NamesAndTypesList & columns) { Block res; @@ -66,10 +67,17 @@ public: return Nested::flatten(res); } - TinyLogSource(size_t block_size_, const NamesAndTypesList & columns_, StorageTinyLog & storage_, size_t max_read_buffer_size_) + TinyLogSource( + size_t block_size_, + const NamesAndTypesList & columns_, + StorageTinyLog & storage_, + size_t max_read_buffer_size_, + FileChecker::Map file_sizes_) : SourceWithProgress(getHeader(columns_)) - , block_size(block_size_), columns(columns_), storage(storage_), lock(storage_.rwlock) - , max_read_buffer_size(max_read_buffer_size_) {} + , block_size(block_size_), columns(columns_), storage(storage_) + , max_read_buffer_size(max_read_buffer_size_), file_sizes(std::move(file_sizes_)) + { + } String getName() const override { return "TinyLog"; } @@ -80,19 +88,21 @@ private: size_t block_size; NamesAndTypesList columns; StorageTinyLog & storage; - std::shared_lock lock; bool is_finished = false; size_t max_read_buffer_size; + FileChecker::Map file_sizes; struct Stream { - Stream(const DiskPtr & disk, const String & data_path, size_t max_read_buffer_size_) + Stream(const DiskPtr & disk, const String & data_path, size_t max_read_buffer_size_, size_t file_size) : plain(disk->readFile(data_path, std::min(max_read_buffer_size_, disk->getFileSize(data_path)))), + limited(std::make_unique(*plain, file_size, false)), compressed(*plain) { } std::unique_ptr plain; + std::unique_ptr limited; CompressedReadBuffer compressed; }; @@ -110,9 +120,14 @@ private: class TinyLogBlockOutputStream final : public IBlockOutputStream { public: - explicit TinyLogBlockOutputStream(StorageTinyLog & storage_, const StorageMetadataPtr & metadata_snapshot_) - : storage(storage_), metadata_snapshot(metadata_snapshot_), lock(storage_.rwlock) + explicit TinyLogBlockOutputStream( + StorageTinyLog & storage_, + const StorageMetadataPtr & metadata_snapshot_, + std::unique_lock && lock_) + : storage(storage_), metadata_snapshot(metadata_snapshot_), lock(std::move(lock_)) { + if (!lock) + throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); } ~TinyLogBlockOutputStream() override @@ -140,7 +155,7 @@ public: private: StorageTinyLog & storage; StorageMetadataPtr metadata_snapshot; - std::unique_lock lock; + std::unique_lock lock; bool done = false; struct Stream @@ -231,13 +246,17 @@ void TinyLogSource::readData(const String & name, const IDataType & type, IColum String stream_name = IDataType::getFileNameForStream(name, path); if (!streams.count(stream_name)) - streams[stream_name] = std::make_unique(storage.disk, storage.files[stream_name].data_file_path, max_read_buffer_size); + { + String file_path = storage.files[stream_name].data_file_path; + streams[stream_name] = std::make_unique( + storage.disk, file_path, max_read_buffer_size, file_sizes[fileName(file_path)]); + } return &streams[stream_name]->compressed; }; if (deserialize_states.count(name) == 0) - type.deserializeBinaryBulkStatePrefix(settings, deserialize_states[name]); + type.deserializeBinaryBulkStatePrefix(settings, deserialize_states[name]); type.deserializeBinaryBulkWithMultipleStreams(column, limit, settings, deserialize_states[name]); } @@ -410,8 +429,6 @@ void StorageTinyLog::rename(const String & new_path_to_table_data, const Storage { assert(table_path != new_path_to_table_data); { - std::unique_lock lock(rwlock); - disk->moveDirectory(table_path, new_path_to_table_data); table_path = new_path_to_table_data; @@ -424,6 +441,16 @@ void StorageTinyLog::rename(const String & new_path_to_table_data, const Storage } +static std::chrono::seconds getLockTimeout(const Context & context) +{ + const Settings & settings = context.getSettingsRef(); + Int64 lock_timeout = settings.lock_acquire_timeout.totalSeconds(); + if (settings.max_execution_time.totalSeconds() != 0 && settings.max_execution_time.totalSeconds() < lock_timeout) + lock_timeout = settings.max_execution_time.totalSeconds(); + return std::chrono::seconds{lock_timeout}; +} + + Pipe StorageTinyLog::read( const Names & column_names, const StorageMetadataPtr & metadata_snapshot, @@ -437,28 +464,40 @@ Pipe StorageTinyLog::read( // When reading, we lock the entire storage, because we only have one file // per column and can't modify it concurrently. + const Settings & settings = context.getSettingsRef(); + + std::shared_lock lock{rwlock, getLockTimeout(context)}; + if (!lock) + throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); + + /// No need to hold lock while reading because we read fixed range of data that does not change while appending more data. return Pipe(std::make_shared( - max_block_size, Nested::collect(metadata_snapshot->getColumns().getAllPhysical().addTypes(column_names)), *this, context.getSettingsRef().max_read_buffer_size)); + max_block_size, + Nested::collect(metadata_snapshot->getColumns().getAllPhysical().addTypes(column_names)), + *this, + settings.max_read_buffer_size, + file_checker.getFileSizes())); } -BlockOutputStreamPtr StorageTinyLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/) +BlockOutputStreamPtr StorageTinyLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & context) { - return std::make_shared(*this, metadata_snapshot); + return std::make_shared(*this, metadata_snapshot, std::unique_lock{rwlock, getLockTimeout(context)}); } -CheckResults StorageTinyLog::checkData(const ASTPtr & /* query */, const Context & /* context */) +CheckResults StorageTinyLog::checkData(const ASTPtr & /* query */, const Context & context) { - std::shared_lock lock(rwlock); + std::shared_lock lock(rwlock, getLockTimeout(context)); + if (!lock) + throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); + return file_checker.check(); } void StorageTinyLog::truncate( const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &) { - std::unique_lock lock(rwlock); - disk->clearDirectory(table_path); files.clear(); @@ -468,14 +507,6 @@ void StorageTinyLog::truncate( addFiles(column.name, *column.type); } -void StorageTinyLog::drop() -{ - std::unique_lock lock(rwlock); - if (disk->exists(table_path)) - disk->removeRecursive(table_path); - files.clear(); -} - void registerStorageTinyLog(StorageFactory & factory) { diff --git a/src/Storages/StorageTinyLog.h b/src/Storages/StorageTinyLog.h index dc6ff101503..95b7d9f2941 100644 --- a/src/Storages/StorageTinyLog.h +++ b/src/Storages/StorageTinyLog.h @@ -43,8 +43,6 @@ public: void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &) override; - void drop() override; - protected: StorageTinyLog( DiskPtr disk_, @@ -70,7 +68,7 @@ private: Files files; FileChecker file_checker; - mutable std::shared_mutex rwlock; + mutable std::shared_timed_mutex rwlock; Poco::Logger * log; diff --git a/tests/queries/0_stateless/01499_log_deadlock.reference b/tests/queries/0_stateless/01499_log_deadlock.reference new file mode 100644 index 00000000000..166be640db5 --- /dev/null +++ b/tests/queries/0_stateless/01499_log_deadlock.reference @@ -0,0 +1,3 @@ +6 +6 +6 diff --git a/tests/queries/0_stateless/01499_log_deadlock.sql b/tests/queries/0_stateless/01499_log_deadlock.sql new file mode 100644 index 00000000000..e98b37f2455 --- /dev/null +++ b/tests/queries/0_stateless/01499_log_deadlock.sql @@ -0,0 +1,26 @@ +DROP TABLE IF EXISTS t; +CREATE TABLE t (x UInt8) ENGINE = TinyLog; + +INSERT INTO t VALUES (1), (2), (3); +INSERT INTO t SELECT * FROM t; +SELECT count() FROM t; + +DROP TABLE t; + + +CREATE TABLE t (x UInt8) ENGINE = Log; + +INSERT INTO t VALUES (1), (2), (3); +INSERT INTO t SELECT * FROM t; +SELECT count() FROM t; + +DROP TABLE t; + + +CREATE TABLE t (x UInt8) ENGINE = StripeLog; + +INSERT INTO t VALUES (1), (2), (3); +INSERT INTO t SELECT * FROM t; +SELECT count() FROM t; + +DROP TABLE t; diff --git a/tests/queries/0_stateless/01502_long_log_tinylog_deadlock_race.reference b/tests/queries/0_stateless/01502_long_log_tinylog_deadlock_race.reference new file mode 100644 index 00000000000..4bf85ae79f3 --- /dev/null +++ b/tests/queries/0_stateless/01502_long_log_tinylog_deadlock_race.reference @@ -0,0 +1,6 @@ +Testing TinyLog +Done TinyLog +Testing StripeLog +Done StripeLog +Testing Log +Done Log diff --git a/tests/queries/0_stateless/01502_long_log_tinylog_deadlock_race.sh b/tests/queries/0_stateless/01502_long_log_tinylog_deadlock_race.sh new file mode 100755 index 00000000000..29c5f868617 --- /dev/null +++ b/tests/queries/0_stateless/01502_long_log_tinylog_deadlock_race.sh @@ -0,0 +1,85 @@ +#!/usr/bin/env bash + +set -e + +CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL=fatal + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. "$CURDIR"/../shell_config.sh + + +function thread_create { + while true; do + $CLICKHOUSE_CLIENT --query "CREATE TABLE IF NOT EXISTS $1 (x UInt64, s Array(Nullable(String))) ENGINE = $2" 2>&1 | grep -v -F 'Received exception from server' | grep -v -P 'Code: (60|57)' + sleep 0.0$RANDOM + done +} + +function thread_drop { + while true; do + $CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS $1" 2>&1 | grep -v -F 'Received exception from server' | grep -v -P 'Code: (60|57)' + sleep 0.0$RANDOM + done +} + +function thread_rename { + while true; do + $CLICKHOUSE_CLIENT --query "RENAME TABLE $1 TO $2" 2>&1 | grep -v -F 'Received exception from server' | grep -v -P 'Code: (60|57)' + sleep 0.0$RANDOM + done +} + +function thread_select { + while true; do + $CLICKHOUSE_CLIENT --query "SELECT * FROM $1 FORMAT Null" 2>&1 | grep -v -F 'Received exception from server' | grep -v -P 'Code: (60|218)' + sleep 0.0$RANDOM + done +} + +function thread_insert { + while true; do + $CLICKHOUSE_CLIENT --query "INSERT INTO $1 SELECT rand64(1), [toString(rand64(2))] FROM numbers($2)" 2>&1 | grep -v -F 'Received exception from server' | grep -v -P 'Code: (60|218)' + sleep 0.0$RANDOM + done +} + +function thread_insert_select { + while true; do + $CLICKHOUSE_CLIENT --query "INSERT INTO $1 SELECT * FROM $2" 2>&1 | grep -v -F 'Received exception from server' | grep -v -P 'Code: (60|218)' + sleep 0.0$RANDOM + done +} + +export -f thread_create +export -f thread_drop +export -f thread_rename +export -f thread_select +export -f thread_insert +export -f thread_insert_select + + +# Do randomized queries and expect nothing extraordinary happens. + +function test_with_engine { + echo "Testing $1" + + timeout 10 bash -c "thread_create t1 $1" & + timeout 10 bash -c "thread_create t2 $1" & + timeout 10 bash -c 'thread_drop t1' & + timeout 10 bash -c 'thread_drop t2' & + timeout 10 bash -c 'thread_rename t1 t2' & + timeout 10 bash -c 'thread_rename t2 t1' & + timeout 10 bash -c 'thread_select t1' & + timeout 10 bash -c 'thread_select t2' & + timeout 10 bash -c 'thread_insert t1 5' & + timeout 10 bash -c 'thread_insert t2 10' & + timeout 10 bash -c 'thread_insert_select t1 t2' & + timeout 10 bash -c 'thread_insert_select t2 t1' & + + wait + echo "Done $1" +} + +test_with_engine TinyLog +test_with_engine StripeLog +test_with_engine Log diff --git a/tests/queries/0_stateless/01505_log_distributed_deadlock.reference b/tests/queries/0_stateless/01505_log_distributed_deadlock.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/01505_log_distributed_deadlock.sql b/tests/queries/0_stateless/01505_log_distributed_deadlock.sql new file mode 100644 index 00000000000..2b0b2b97188 --- /dev/null +++ b/tests/queries/0_stateless/01505_log_distributed_deadlock.sql @@ -0,0 +1,12 @@ +DROP TABLE IF EXISTS t_local; +DROP TABLE IF EXISTS t_dist; + +create table t_local(a int) engine Log; +create table t_dist (a int) engine Distributed(test_shard_localhost, currentDatabase(), 't_local', cityHash64(a)); + +set insert_distributed_sync = 1; + +insert into t_dist values (1); + +DROP TABLE t_local; +DROP TABLE t_dist; diff --git a/tests/queries/0_stateless/arcadia_skip_list.txt b/tests/queries/0_stateless/arcadia_skip_list.txt index 69391ca9fd4..6d1c6444d1b 100644 --- a/tests/queries/0_stateless/arcadia_skip_list.txt +++ b/tests/queries/0_stateless/arcadia_skip_list.txt @@ -145,3 +145,4 @@ 01461_query_start_time_microseconds 01455_shard_leaf_max_rows_bytes_to_read 01505_distributed_local_type_conversion_enum +01505_log_distributed_deadlock