diff --git a/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp b/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp index a9dce5b5ebe..3eb638d15c0 100644 --- a/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp +++ b/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp @@ -126,7 +126,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor std::unique_lock lock(write_mutex); MergeTreeData::MutableDataPartsVector parts; - auto in = disk->readFile(path, {}); + auto in = disk->readFile(path); NativeReader block_in(*in, 0); NameSet dropped_parts; diff --git a/src/Storages/StorageLog.cpp b/src/Storages/StorageLog.cpp index a03acd3731d..ccb88992732 100644 --- a/src/Storages/StorageLog.cpp +++ b/src/Storages/StorageLog.cpp @@ -535,15 +535,16 @@ StorageLog::StorageLog( const ConstraintsDescription & constraints_, const String & comment, bool attach, - size_t max_compress_block_size_) + ContextMutablePtr context_) : IStorage(table_id_) + , WithMutableContext(context_) , engine_name(engine_name_) , disk(std::move(disk_)) , table_path(relative_path_) , use_marks_file(engine_name == "Log") , marks_file_path(table_path + DBMS_STORAGE_LOG_MARKS_FILE_NAME) , file_checker(disk, table_path + "sizes.json") - , max_compress_block_size(max_compress_block_size_) + , max_compress_block_size(context_->getSettingsRef().max_compress_block_size) { StorageInMemoryMetadata storage_metadata; storage_metadata.setColumns(columns_); @@ -750,9 +751,9 @@ static std::chrono::seconds getLockTimeout(ContextPtr context) return std::chrono::seconds{lock_timeout}; } -void StorageLog::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr context, TableExclusiveLockHolder &) +void StorageLog::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &) { - WriteLock lock{rwlock, getLockTimeout(context)}; + WriteLock lock{rwlock, getLockTimeout(local_context)}; if (!lock) throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); @@ -769,6 +770,7 @@ void StorageLog::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr marks_loaded = true; num_marks_saved = 0; + getContext()->dropMMappedFileCache(); } @@ -776,14 +778,14 @@ Pipe StorageLog::read( const Names & column_names, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & /*query_info*/, - ContextPtr context, + ContextPtr local_context, QueryProcessingStage::Enum /*processed_stage*/, size_t max_block_size, unsigned num_streams) { storage_snapshot->check(column_names); - auto lock_timeout = getLockTimeout(context); + auto lock_timeout = getLockTimeout(local_context); loadMarks(lock_timeout); ReadLock lock{rwlock, lock_timeout}; @@ -817,7 +819,7 @@ Pipe StorageLog::read( bool limited_by_file_sizes = !use_marks_file; size_t row_limit = std::numeric_limits::max(); - ReadSettings read_settings = context->getReadSettings(); + ReadSettings read_settings = local_context->getReadSettings(); Pipes pipes; for (size_t stream = 0; stream < num_streams; ++stream) @@ -848,18 +850,18 @@ Pipe StorageLog::read( return Pipe::unitePipes(std::move(pipes)); } -SinkToStoragePtr StorageLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) +SinkToStoragePtr StorageLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context) { - WriteLock lock{rwlock, getLockTimeout(context)}; + WriteLock lock{rwlock, getLockTimeout(local_context)}; if (!lock) throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); return std::make_shared(*this, metadata_snapshot, std::move(lock)); } -CheckResults StorageLog::checkData(const ASTPtr & /* query */, ContextPtr context) +CheckResults StorageLog::checkData(const ASTPtr & /* query */, ContextPtr local_context) { - ReadLock lock{rwlock, getLockTimeout(context)}; + ReadLock lock{rwlock, getLockTimeout(local_context)}; if (!lock) throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); @@ -1114,7 +1116,7 @@ void registerStorageLog(StorageFactory & factory) args.constraints, args.comment, args.attach, - args.getContext()->getSettings().max_compress_block_size); + args.getContext()); }; factory.registerStorage("Log", create_fn, features); diff --git a/src/Storages/StorageLog.h b/src/Storages/StorageLog.h index 778633440a4..2e677dd3161 100644 --- a/src/Storages/StorageLog.h +++ b/src/Storages/StorageLog.h @@ -12,6 +12,7 @@ namespace DB { + class IBackup; using BackupPtr = std::shared_ptr; @@ -21,7 +22,7 @@ using BackupPtr = std::shared_ptr; * Also implements TinyLog - a table engine that is suitable for small chunks of the log. * It differs from Log in the absence of mark files. */ -class StorageLog final : public IStorage +class StorageLog final : public IStorage, public WithMutableContext { friend class LogSource; friend class LogSink; @@ -40,7 +41,7 @@ public: const ConstraintsDescription & constraints_, const String & comment, bool attach, - size_t max_compress_block_size_); + ContextMutablePtr context_); ~StorageLog() override; String getName() const override { return engine_name; } @@ -49,16 +50,16 @@ public: const Names & column_names, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info, - ContextPtr context, + ContextPtr local_context, QueryProcessingStage::Enum processed_stage, size_t max_block_size, unsigned num_streams) override; - SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override; + SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context) override; void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override; - CheckResults checkData(const ASTPtr & /* query */, ContextPtr /* context */) override; + CheckResults checkData(const ASTPtr & query, ContextPtr local_context) override; void truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &) override; diff --git a/src/Storages/StorageStripeLog.cpp b/src/Storages/StorageStripeLog.cpp index eb8bc9b1d51..e3f477936db 100644 --- a/src/Storages/StorageStripeLog.cpp +++ b/src/Storages/StorageStripeLog.cpp @@ -265,14 +265,15 @@ StorageStripeLog::StorageStripeLog( const ConstraintsDescription & constraints_, const String & comment, bool attach, - size_t max_compress_block_size_) + ContextMutablePtr context_) : IStorage(table_id_) + , WithMutableContext(context_) , disk(std::move(disk_)) , table_path(relative_path_) , data_file_path(table_path + "data.bin") , index_file_path(table_path + "index.mrk") , file_checker(disk, table_path + "sizes.json") - , max_compress_block_size(max_compress_block_size_) + , max_compress_block_size(context_->getSettings().max_compress_block_size) , log(&Poco::Logger::get("StorageStripeLog")) { StorageInMemoryMetadata storage_metadata; @@ -330,9 +331,9 @@ void StorageStripeLog::rename(const String & new_path_to_table_data, const Stora } -static std::chrono::seconds getLockTimeout(ContextPtr context) +static std::chrono::seconds getLockTimeout(ContextPtr local_context) { - const Settings & settings = context->getSettingsRef(); + const Settings & settings = local_context->getSettingsRef(); Int64 lock_timeout = settings.lock_acquire_timeout.totalSeconds(); if (settings.max_execution_time.totalSeconds() != 0 && settings.max_execution_time.totalSeconds() < lock_timeout) lock_timeout = settings.max_execution_time.totalSeconds(); @@ -344,14 +345,14 @@ Pipe StorageStripeLog::read( const Names & column_names, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & /*query_info*/, - ContextPtr context, + ContextPtr local_context, QueryProcessingStage::Enum /*processed_stage*/, const size_t /*max_block_size*/, unsigned num_streams) { storage_snapshot->check(column_names); - auto lock_timeout = getLockTimeout(context); + auto lock_timeout = getLockTimeout(local_context); loadIndices(lock_timeout); ReadLock lock{rwlock, lock_timeout}; @@ -369,7 +370,7 @@ Pipe StorageStripeLog::read( if (num_streams > size) num_streams = size; - ReadSettings read_settings = context->getReadSettings(); + ReadSettings read_settings = local_context->getReadSettings(); Pipes pipes; for (size_t stream = 0; stream < num_streams; ++stream) @@ -390,9 +391,9 @@ Pipe StorageStripeLog::read( } -SinkToStoragePtr StorageStripeLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) +SinkToStoragePtr StorageStripeLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context) { - WriteLock lock{rwlock, getLockTimeout(context)}; + WriteLock lock{rwlock, getLockTimeout(local_context)}; if (!lock) throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); @@ -400,9 +401,9 @@ SinkToStoragePtr StorageStripeLog::write(const ASTPtr & /*query*/, const Storage } -CheckResults StorageStripeLog::checkData(const ASTPtr & /* query */, ContextPtr context) +CheckResults StorageStripeLog::checkData(const ASTPtr & /* query */, ContextPtr local_context) { - ReadLock lock{rwlock, getLockTimeout(context)}; + ReadLock lock{rwlock, getLockTimeout(local_context)}; if (!lock) throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); @@ -420,6 +421,7 @@ void StorageStripeLog::truncate(const ASTPtr &, const StorageMetadataPtr &, Cont indices_loaded = true; num_indices_saved = 0; + getContext()->dropMMappedFileCache(); } @@ -686,7 +688,7 @@ void registerStorageStripeLog(StorageFactory & factory) args.constraints, args.comment, args.attach, - args.getContext()->getSettings().max_compress_block_size); + args.getContext()); }, features); } diff --git a/src/Storages/StorageStripeLog.h b/src/Storages/StorageStripeLog.h index 3faffff381d..efdf18c0f7b 100644 --- a/src/Storages/StorageStripeLog.h +++ b/src/Storages/StorageStripeLog.h @@ -20,7 +20,7 @@ using BackupPtr = std::shared_ptr; /** Implements a table engine that is suitable for small chunks of the log. * In doing so, stores all the columns in a single Native file, with a nearby index. */ -class StorageStripeLog final : public IStorage +class StorageStripeLog final : public IStorage, public WithMutableContext { friend class StripeLogSource; friend class StripeLogSink; @@ -34,7 +34,7 @@ public: const ConstraintsDescription & constraints_, const String & comment, bool attach, - size_t max_compress_block_size_); + ContextMutablePtr context_); ~StorageStripeLog() override; @@ -44,16 +44,16 @@ public: const Names & column_names, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info, - ContextPtr context, + ContextPtr local_context, QueryProcessingStage::Enum processed_stage, size_t max_block_size, unsigned num_streams) override; - SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override; + SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context) override; void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override; - CheckResults checkData(const ASTPtr & /* query */, ContextPtr /* context */) override; + CheckResults checkData(const ASTPtr & query, ContextPtr ocal_context) override; bool storesDataOnDisk() const override { return true; } Strings getDataPaths() const override { return {DB::fullPath(disk, table_path)}; } diff --git a/tests/queries/0_stateless/01533_multiple_nested.sql b/tests/queries/0_stateless/01533_multiple_nested.sql index 03724ce0b46..a61f13fc807 100644 --- a/tests/queries/0_stateless/01533_multiple_nested.sql +++ b/tests/queries/0_stateless/01533_multiple_nested.sql @@ -4,6 +4,7 @@ DROP TABLE IF EXISTS nested; SET flatten_nested = 0; SET use_uncompressed_cache = 0; +SET local_filesystem_read_method='pread'; CREATE TABLE nested (