From 7415b65a55d61aa982a3a512eeaa6ad82cbc5e23 Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Tue, 14 Apr 2015 18:00:57 +0300 Subject: [PATCH] Merge --- .../DB/IO/CachedCompressedReadBuffer.h | 10 ++++--- .../DB/IO/CompressedReadBufferFromFile.h | 4 +-- dbms/include/DB/IO/ReadBufferAIO.h | 5 ++-- dbms/include/DB/IO/ReadBufferFromFileBase.h | 4 +-- .../DB/IO/ReadBufferFromFileDescriptor.h | 4 +-- dbms/include/DB/IO/WriteBufferAIO.h | 4 +-- dbms/include/DB/IO/WriteBufferFromFileBase.h | 4 +-- .../DB/IO/WriteBufferFromFileDescriptor.h | 4 +-- .../DB/IO/createReadBufferFromFileBase.h | 9 ++++++- .../DB/IO/createWriteBufferFromFileBase.h | 8 +++++- .../Interpreters/InterpreterOptimizeQuery.h | 8 +++--- dbms/include/DB/Storages/IStorage.h | 10 ++----- dbms/include/DB/Storages/MarkCache.h | 2 +- .../DB/Storages/MergeTree/MergeTreeReader.h | 26 ++++++++++++++++--- dbms/include/DB/Storages/StorageBuffer.h | 3 +-- .../DB/Storages/StorageMaterializedView.h | 3 +-- dbms/include/DB/Storages/StorageMergeTree.h | 15 +++++------ .../DB/Storages/StorageReplicatedMergeTree.h | 4 +-- dbms/src/IO/WriteBufferFromFileBase.cpp | 2 +- dbms/src/IO/createReadBufferFromFileBase.cpp | 9 +++---- dbms/src/IO/createWriteBufferFromFileBase.cpp | 2 +- .../tests/cached_compressed_read_buffer.cpp | 4 +-- dbms/src/Storages/StorageBuffer.cpp | 6 ++--- dbms/src/Storages/StorageMaterializedView.cpp | 4 +-- .../Storages/StorageReplicatedMergeTree.cpp | 7 ++--- 25 files changed, 92 insertions(+), 69 deletions(-) diff --git a/dbms/include/DB/IO/CachedCompressedReadBuffer.h b/dbms/include/DB/IO/CachedCompressedReadBuffer.h index 85e6317b46b..7d71a8ef531 100644 --- a/dbms/include/DB/IO/CachedCompressedReadBuffer.h +++ b/dbms/include/DB/IO/CachedCompressedReadBuffer.h @@ -20,6 +20,7 @@ private: const std::string path; UncompressedCache * cache; size_t buf_size; + size_t estimated_size; size_t aio_threshold; /// SharedPtr - для ленивой инициализации (только в случае кэш-промаха). @@ -33,7 +34,7 @@ private: { if (!file_in) { - file_in = createReadBufferFromFileBase(path, aio_threshold, buf_size); + file_in = createReadBufferFromFileBase(path, estimated_size, aio_threshold, buf_size); compressed_in = &*file_in; } } @@ -81,9 +82,10 @@ private: } public: - CachedCompressedReadBuffer(const std::string & path_, UncompressedCache * cache_, size_t aio_threshold_, - size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE) - : ReadBuffer(nullptr, 0), path(path_), cache(cache_), buf_size(buf_size_), aio_threshold(aio_threshold_), file_pos(0) + CachedCompressedReadBuffer(const std::string & path_, UncompressedCache * cache_, size_t estimated_size_, + size_t aio_threshold_, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE) + : ReadBuffer(nullptr, 0), path(path_), cache(cache_), buf_size(buf_size_), + estimated_size(estimated_size_), aio_threshold(aio_threshold_), file_pos(0) { } diff --git a/dbms/include/DB/IO/CompressedReadBufferFromFile.h b/dbms/include/DB/IO/CompressedReadBufferFromFile.h index 3fb4e72c992..74af29961e8 100644 --- a/dbms/include/DB/IO/CompressedReadBufferFromFile.h +++ b/dbms/include/DB/IO/CompressedReadBufferFromFile.h @@ -42,9 +42,9 @@ private: } public: - CompressedReadBufferFromFile(const std::string & path, size_t aio_threshold, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) + CompressedReadBufferFromFile(const std::string & path, size_t estimated_size, size_t aio_threshold, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) : BufferWithOwnMemory(0), - p_file_in(createReadBufferFromFileBase(path, aio_threshold, buf_size)), + p_file_in(createReadBufferFromFileBase(path, estimated_size, aio_threshold, buf_size)), file_in(*p_file_in) { compressed_in = &file_in; diff --git a/dbms/include/DB/IO/ReadBufferAIO.h b/dbms/include/DB/IO/ReadBufferAIO.h index c8f0cc6f205..80ce9f0696b 100644 --- a/dbms/include/DB/IO/ReadBufferAIO.h +++ b/dbms/include/DB/IO/ReadBufferAIO.h @@ -10,7 +10,6 @@ #include #include #include -#include namespace DB { @@ -29,8 +28,8 @@ public: void setMaxBytes(size_t max_bytes_read_); off_t getPositionInFile() override { return first_unread_pos_in_file - (working_buffer.end() - pos); } - std::string getFileName() const noexcept override { return filename; } - int getFD() const noexcept override { return fd; } + std::string getFileName() const override { return filename; } + int getFD() const override { return fd; } private: /// diff --git a/dbms/include/DB/IO/ReadBufferFromFileBase.h b/dbms/include/DB/IO/ReadBufferFromFileBase.h index a49b8a255a3..9681977d08a 100644 --- a/dbms/include/DB/IO/ReadBufferFromFileBase.h +++ b/dbms/include/DB/IO/ReadBufferFromFileBase.h @@ -16,8 +16,8 @@ public: virtual ~ReadBufferFromFileBase(); off_t seek(off_t off, int whence = SEEK_SET); virtual off_t getPositionInFile() = 0; - virtual std::string getFileName() const noexcept = 0; - virtual int getFD() const noexcept = 0; + virtual std::string getFileName() const = 0; + virtual int getFD() const = 0; protected: virtual off_t doSeek(off_t off, int whence) = 0; diff --git a/dbms/include/DB/IO/ReadBufferFromFileDescriptor.h b/dbms/include/DB/IO/ReadBufferFromFileDescriptor.h index ec907371861..8aa0de5dc12 100644 --- a/dbms/include/DB/IO/ReadBufferFromFileDescriptor.h +++ b/dbms/include/DB/IO/ReadBufferFromFileDescriptor.h @@ -54,7 +54,7 @@ protected: } /// Имя или описание файла - virtual std::string getFileName() const noexcept override + virtual std::string getFileName() const override { return "(fd = " + toString(fd) + ")"; } @@ -63,7 +63,7 @@ public: ReadBufferFromFileDescriptor(int fd_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0) : ReadBufferFromFileBase(buf_size, existing_memory, alignment), fd(fd_), pos_in_file(0) {} - int getFD() const noexcept override + int getFD() const override { return fd; } diff --git a/dbms/include/DB/IO/WriteBufferAIO.h b/dbms/include/DB/IO/WriteBufferAIO.h index 1a6349d05f5..cd2bf5f669b 100644 --- a/dbms/include/DB/IO/WriteBufferAIO.h +++ b/dbms/include/DB/IO/WriteBufferAIO.h @@ -28,8 +28,8 @@ public: off_t getPositionInFile() override; void truncate(off_t length = 0) override; void sync() override; - std::string getFileName() const noexcept override { return filename; } - int getFD() const noexcept override { return fd; } + std::string getFileName() const override { return filename; } + int getFD() const override { return fd; } private: /// diff --git a/dbms/include/DB/IO/WriteBufferFromFileBase.h b/dbms/include/DB/IO/WriteBufferFromFileBase.h index 3b051bd5e27..badfef29739 100644 --- a/dbms/include/DB/IO/WriteBufferFromFileBase.h +++ b/dbms/include/DB/IO/WriteBufferFromFileBase.h @@ -18,8 +18,8 @@ public: virtual off_t getPositionInFile() = 0; virtual void truncate(off_t length) = 0; virtual void sync() = 0; - virtual std::string getFileName() const noexcept = 0; - virtual int getFD() const noexcept = 0; + virtual std::string getFileName() const = 0; + virtual int getFD() const = 0; protected: virtual off_t doSeek(off_t off, int whence) = 0; diff --git a/dbms/include/DB/IO/WriteBufferFromFileDescriptor.h b/dbms/include/DB/IO/WriteBufferFromFileDescriptor.h index cdb72ad6a6f..14020ea5e51 100644 --- a/dbms/include/DB/IO/WriteBufferFromFileDescriptor.h +++ b/dbms/include/DB/IO/WriteBufferFromFileDescriptor.h @@ -41,7 +41,7 @@ protected: } /// Имя или описание файла - virtual std::string getFileName() const noexcept override + virtual std::string getFileName() const override { return "(fd = " + toString(fd) + ")"; } @@ -70,7 +70,7 @@ public: } } - int getFD() const noexcept override + int getFD() const override { return fd; } diff --git a/dbms/include/DB/IO/createReadBufferFromFileBase.h b/dbms/include/DB/IO/createReadBufferFromFileBase.h index 73aed806851..127d0900fab 100644 --- a/dbms/include/DB/IO/createReadBufferFromFileBase.h +++ b/dbms/include/DB/IO/createReadBufferFromFileBase.h @@ -2,12 +2,19 @@ #include #include -#include namespace DB { +/** Создать объект для чтения данных из файла. + * estimated_size - количество байтов, которые надо читать + * aio_threshold - минимальное количество байтов для асинхронных операций чтения + * + * Если aio_threshold = 0 или estimated_size < aio_threshold, операции чтения выполняются синхронно. + * В противном случае операции чтения выполняются асинхронно. + */ ReadBufferFromFileBase * createReadBufferFromFileBase(const std::string & filename_, + size_t estimated_size, size_t aio_threshold, size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, int flags_ = -1, diff --git a/dbms/include/DB/IO/createWriteBufferFromFileBase.h b/dbms/include/DB/IO/createWriteBufferFromFileBase.h index 3526297f41e..d4b80cca0dd 100644 --- a/dbms/include/DB/IO/createWriteBufferFromFileBase.h +++ b/dbms/include/DB/IO/createWriteBufferFromFileBase.h @@ -2,11 +2,17 @@ #include #include -#include namespace DB { +/** Создать объект для записи данных в файл. + * estimated_size - количество байтов, которые надо записать + * aio_threshold - минимальное количество байтов для асинхронных операций записи + * + * Если aio_threshold = 0 или estimated_size < aio_threshold, операции записи выполняются синхронно. + * В противном случае операции записи выполняются асинхронно. + */ WriteBufferFromFileBase * createWriteBufferFromFileBase(const std::string & filename_, size_t estimated_size, size_t aio_threshold, diff --git a/dbms/include/DB/Interpreters/InterpreterOptimizeQuery.h b/dbms/include/DB/Interpreters/InterpreterOptimizeQuery.h index 230098cfd41..03110a3efca 100644 --- a/dbms/include/DB/Interpreters/InterpreterOptimizeQuery.h +++ b/dbms/include/DB/Interpreters/InterpreterOptimizeQuery.h @@ -15,21 +15,21 @@ class InterpreterOptimizeQuery { public: InterpreterOptimizeQuery(ASTPtr query_ptr_, Context & context_) - : query_ptr(query_ptr_), context(context_), - aio_threshold(context_.getSettings().min_bytes_to_use_direct_io) {} + : query_ptr(query_ptr_), context(context_) + { + } void execute() { const ASTOptimizeQuery & ast = typeid_cast(*query_ptr); StoragePtr table = context.getTable(ast.database, ast.table); auto table_lock = table->lockStructure(true); - table->optimize(aio_threshold); + table->optimize(context.getSettings()); } private: ASTPtr query_ptr; Context context; - size_t aio_threshold; }; diff --git a/dbms/include/DB/Storages/IStorage.h b/dbms/include/DB/Storages/IStorage.h index f573702f287..0a1f656dcdc 100644 --- a/dbms/include/DB/Storages/IStorage.h +++ b/dbms/include/DB/Storages/IStorage.h @@ -236,9 +236,9 @@ public: /** Выполнить какую-либо фоновую работу. Например, объединение кусков в таблице типа MergeTree. * Возвращает - была ли выполнена какая-либо работа. */ - bool optimize(size_t aio_threshold = 0) + virtual bool optimize(const Settings & settings) { - return performOptimize(aio_threshold); + throw Exception("Method optimize is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED); } /** Получить запрос CREATE TABLE, который описывает данную таблицу. @@ -279,12 +279,6 @@ public: /// проверяет валидность данных virtual bool checkData() const { throw DB::Exception("Check query is not supported for " + getName() + " storage"); } -protected: - virtual bool performOptimize(size_t aio_threshold) - { - throw Exception("Method optimize is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED); - } - protected: using ITableDeclaration::ITableDeclaration; diff --git a/dbms/include/DB/Storages/MarkCache.h b/dbms/include/DB/Storages/MarkCache.h index b71e14a674a..08b912de9d5 100644 --- a/dbms/include/DB/Storages/MarkCache.h +++ b/dbms/include/DB/Storages/MarkCache.h @@ -32,7 +32,7 @@ struct MarkInCompressedFile typedef std::vector MarksInCompressedFile; -/// Оценка количества байт, занимаемых засечками в кеше. +/// Оценка количества байтов, занимаемых засечками в кеше. struct MarksWeightFunction { size_t operator()(const MarksInCompressedFile & marks) const diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h b/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h index a4587a9af95..a49327cb5cd 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h @@ -256,16 +256,36 @@ private: (*marks)[right].offset_in_compressed_file - (*marks)[all_mark_ranges[i].begin].offset_in_compressed_file); } - size_t buffer_size = max_read_buffer_size < max_mark_range ? max_read_buffer_size : max_mark_range; + size_t buffer_size = std::min(max_read_buffer_size, max_mark_range); + + size_t estimated_size = 0; + if (aio_threshold > 0) + { + for (const auto & mark_range : all_mark_ranges) + { + size_t offset_begin = (*marks)[mark_range.begin].offset_in_compressed_file; + + size_t offset_end; + if (mark_range.end < (*marks).size()) + offset_end = (*marks)[mark_range.end].offset_in_compressed_file; + else + offset_end = Poco::File(path_prefix + ".bin").getSize(); + + if (offset_end > 0) + estimated_size += offset_end - offset_begin; + } + } if (uncompressed_cache) { - cached_buffer = new CachedCompressedReadBuffer(path_prefix + ".bin", uncompressed_cache, aio_threshold, buffer_size); + cached_buffer = new CachedCompressedReadBuffer(path_prefix + ".bin", uncompressed_cache, + estimated_size, aio_threshold, buffer_size); data_buffer = &*cached_buffer; } else { - non_cached_buffer = new CompressedReadBufferFromFile(path_prefix + ".bin", aio_threshold, buffer_size); + non_cached_buffer = new CompressedReadBufferFromFile(path_prefix + ".bin", estimated_size, + aio_threshold, buffer_size); data_buffer = &*non_cached_buffer; } } diff --git a/dbms/include/DB/Storages/StorageBuffer.h b/dbms/include/DB/Storages/StorageBuffer.h index e6b70cb5ef1..ffbc3e019b4 100644 --- a/dbms/include/DB/Storages/StorageBuffer.h +++ b/dbms/include/DB/Storages/StorageBuffer.h @@ -70,6 +70,7 @@ public: /// Сбрасывает все буферы в подчинённую таблицу. void shutdown() override; + bool optimize(const Settings & settings) override; void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override { name = new_table_name; } @@ -123,8 +124,6 @@ private: void writeBlockToDestination(const Block & block, StoragePtr table); void flushThread(); - - bool performOptimize(size_t aio_threshold) override; }; } diff --git a/dbms/include/DB/Storages/StorageMaterializedView.h b/dbms/include/DB/Storages/StorageMaterializedView.h index 84a968efcc2..66eb2b0dfc4 100644 --- a/dbms/include/DB/Storages/StorageMaterializedView.h +++ b/dbms/include/DB/Storages/StorageMaterializedView.h @@ -32,6 +32,7 @@ public: BlockOutputStreamPtr write(ASTPtr query) override; void drop() override; + bool optimize(const Settings & settings) override; BlockInputStreams read( const Names & column_names, @@ -55,8 +56,6 @@ private: const NamesAndTypesList & alias_columns_, const ColumnDefaults & column_defaults_, bool attach_); - - bool performOptimize(size_t aio_threshold) override; }; } diff --git a/dbms/include/DB/Storages/StorageMergeTree.h b/dbms/include/DB/Storages/StorageMergeTree.h index ffde219f78b..57e6420ef3d 100644 --- a/dbms/include/DB/Storages/StorageMergeTree.h +++ b/dbms/include/DB/Storages/StorageMergeTree.h @@ -79,6 +79,13 @@ public: BlockOutputStreamPtr write(ASTPtr query) override; + /** Выполнить очередной шаг объединения кусков. + */ + bool optimize(const Settings & settings) override + { + return merge(settings.min_bytes_to_use_direct_io, true); + } + void dropPartition(const Field & partition, bool detach, const Settings & settings) override; void attachPartition(const Field & partition, bool unreplicated, bool part, const Settings & settings) override; void freezePartition(const Field & partition, const Settings & settings) override; @@ -93,14 +100,6 @@ public: MergeTreeData & getData() { return data; } -private: - /** Выполнить очередной шаг объединения кусков. - */ - bool performOptimize(size_t aio_threshold) override - { - return merge(aio_threshold, true); - } - private: String path; String database_name; diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h index f0178aae1be..f09b60da11b 100644 --- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h +++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h @@ -83,6 +83,8 @@ public: BlockOutputStreamPtr write(ASTPtr query) override; + bool optimize(const Settings & settings) override; + void alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context) override; void dropPartition(const Field & partition, bool detach, const Settings & settings) override; @@ -386,8 +388,6 @@ private: */ void waitForReplicaToProcessLogEntry(const String & replica_name, const LogEntry & entry); - bool performOptimize(size_t aio_threshold) override; - /// Преобразовать число в строку формате суффиксов автоинкрементных нод в ZooKeeper. static String padIndex(UInt64 index) { diff --git a/dbms/src/IO/WriteBufferFromFileBase.cpp b/dbms/src/IO/WriteBufferFromFileBase.cpp index 9f48cead9f6..fec26327c32 100644 --- a/dbms/src/IO/WriteBufferFromFileBase.cpp +++ b/dbms/src/IO/WriteBufferFromFileBase.cpp @@ -17,4 +17,4 @@ off_t WriteBufferFromFileBase::seek(off_t off, int whence) return doSeek(off, whence); } -} \ No newline at end of file +} diff --git a/dbms/src/IO/createReadBufferFromFileBase.cpp b/dbms/src/IO/createReadBufferFromFileBase.cpp index 3b32ffe5778..46f82413785 100644 --- a/dbms/src/IO/createReadBufferFromFileBase.cpp +++ b/dbms/src/IO/createReadBufferFromFileBase.cpp @@ -1,17 +1,14 @@ #include #include #include -#include namespace DB { -ReadBufferFromFileBase * createReadBufferFromFileBase(const std::string & filename_, size_t aio_threshold, - size_t buffer_size_, int flags_, char * existing_memory_, size_t alignment) +ReadBufferFromFileBase * createReadBufferFromFileBase(const std::string & filename_, size_t estimated_size, + size_t aio_threshold, size_t buffer_size_, int flags_, char * existing_memory_, size_t alignment) { - size_t file_size = (aio_threshold > 0) ? Poco::File(filename_).getSize() : 0; - - if ((aio_threshold == 0) || (file_size < aio_threshold)) + if ((aio_threshold == 0) || (estimated_size < aio_threshold)) return new ReadBufferFromFile(filename_, buffer_size_, flags_, existing_memory_, alignment); else return new ReadBufferAIO(filename_, buffer_size_, flags_, existing_memory_); diff --git a/dbms/src/IO/createWriteBufferFromFileBase.cpp b/dbms/src/IO/createWriteBufferFromFileBase.cpp index b4d0515b2fd..e354cd1d274 100644 --- a/dbms/src/IO/createWriteBufferFromFileBase.cpp +++ b/dbms/src/IO/createWriteBufferFromFileBase.cpp @@ -6,7 +6,7 @@ namespace DB { WriteBufferFromFileBase * createWriteBufferFromFileBase(const std::string & filename_, size_t estimated_size, - size_t aio_threshold, size_t buffer_size_, int flags_, mode_t mode, char * existing_memory_, + size_t aio_threshold, size_t buffer_size_, int flags_, mode_t mode, char * existing_memory_, size_t alignment) { if ((aio_threshold == 0) || (estimated_size < aio_threshold)) diff --git a/dbms/src/IO/tests/cached_compressed_read_buffer.cpp b/dbms/src/IO/tests/cached_compressed_read_buffer.cpp index 8c5725f30f7..144dd8fde97 100644 --- a/dbms/src/IO/tests/cached_compressed_read_buffer.cpp +++ b/dbms/src/IO/tests/cached_compressed_read_buffer.cpp @@ -25,7 +25,7 @@ int main(int argc, char ** argv) { Stopwatch watch; - CachedCompressedReadBuffer in(path, &cache, std::numeric_limits::max()); + CachedCompressedReadBuffer in(path, &cache, 0, 0); WriteBufferFromFile out("/dev/null"); copyData(in, out); @@ -37,7 +37,7 @@ int main(int argc, char ** argv) { Stopwatch watch; - CachedCompressedReadBuffer in(path, &cache, std::numeric_limits::max()); + CachedCompressedReadBuffer in(path, &cache, 0, 0); WriteBufferFromFile out("/dev/null"); copyData(in, out); diff --git a/dbms/src/Storages/StorageBuffer.cpp b/dbms/src/Storages/StorageBuffer.cpp index b5e65f89d75..9e20fddb72a 100644 --- a/dbms/src/Storages/StorageBuffer.cpp +++ b/dbms/src/Storages/StorageBuffer.cpp @@ -265,11 +265,11 @@ void StorageBuffer::shutdown() if (flush_thread.joinable()) flush_thread.join(); - optimize(); + optimize(context.getSettings()); } -bool StorageBuffer::performOptimize(size_t aio_threshold) +bool StorageBuffer::optimize(const Settings & settings) { flushAllBuffers(false); @@ -442,7 +442,7 @@ void StorageBuffer::alter(const AlterCommands & params, const String & database_ auto lock = lockStructureForAlter(); /// Чтобы не осталось блоков старой структуры. - optimize(); + optimize(context.getSettings()); params.apply(*columns, materialized_columns, alias_columns, column_defaults); InterpreterAlterQuery::updateMetadata(database_name, table_name, diff --git a/dbms/src/Storages/StorageMaterializedView.cpp b/dbms/src/Storages/StorageMaterializedView.cpp index e8e4f397dd5..910bedfc1e1 100644 --- a/dbms/src/Storages/StorageMaterializedView.cpp +++ b/dbms/src/Storages/StorageMaterializedView.cpp @@ -129,9 +129,9 @@ void StorageMaterializedView::drop() } } -bool StorageMaterializedView::performOptimize(size_t aio_threshold) +bool StorageMaterializedView::optimize(const Settings & settings) { - return data->optimize(aio_threshold); + return data->optimize(settings); } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 0c28e3f2a62..1e3dc307443 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -2060,7 +2060,7 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(ASTPtr query) } -bool StorageReplicatedMergeTree::performOptimize(size_t aio_threshold) +bool StorageReplicatedMergeTree::optimize(const Settings & settings) { /// Померджим какие-нибудь куски из директории unreplicated. /// TODO: Мерджить реплицируемые куски тоже. @@ -2074,12 +2074,13 @@ bool StorageReplicatedMergeTree::performOptimize(size_t aio_threshold) MergeTreeData::DataPartsVector parts; String merged_name; - auto always_can_merge = [](const MergeTreeData::DataPartPtr &a, const MergeTreeData::DataPartPtr &b) { return true; }; + auto always_can_merge = [](const MergeTreeData::DataPartPtr & a, const MergeTreeData::DataPartPtr & b) { return true; }; if (!unreplicated_merger->selectPartsToMerge(parts, merged_name, 0, true, true, false, always_can_merge)) return false; const auto & merge_entry = context.getMergeList().insert(database_name, table_name, merged_name); - unreplicated_merger->mergeParts(parts, merged_name, *merge_entry, aio_threshold); + unreplicated_merger->mergeParts(parts, merged_name, *merge_entry, settings.min_bytes_to_use_direct_io); + return true; }