From f094e8f565966ab843cf2e0b89c4a05335ee7028 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Sat, 4 Jan 2020 21:33:16 +0300 Subject: [PATCH] Support reading from DirectoryMonitor. --- .../src/Storages/Distributed/DirectoryMonitor.cpp | 15 +++++++++++---- dbms/src/Storages/Distributed/DirectoryMonitor.h | 4 +++- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/dbms/src/Storages/Distributed/DirectoryMonitor.cpp b/dbms/src/Storages/Distributed/DirectoryMonitor.cpp index 7c22d589daf..e33e7c2b001 100644 --- a/dbms/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/dbms/src/Storages/Distributed/DirectoryMonitor.cpp @@ -269,7 +269,7 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa Settings insert_settings; std::string insert_query; - readHeader(in, insert_settings, insert_query); + readHeader(in, insert_settings, insert_query, log); RemoteBlockOutputStream remote{*connection, timeouts, insert_query, &insert_settings}; @@ -289,7 +289,7 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa } void StorageDistributedDirectoryMonitor::readHeader( - ReadBuffer & in, Settings & insert_settings, std::string & insert_query) const + ReadBuffer & in, Settings & insert_settings, std::string & insert_query, Logger * log) { UInt64 query_size; readVarUInt(query_size, in); @@ -449,7 +449,7 @@ struct StorageDistributedDirectoryMonitor::Batch } ReadBufferFromFile in(file_path->second); - parent.readHeader(in, insert_settings, insert_query); + parent.readHeader(in, insert_settings, insert_query, parent.log); if (first) { @@ -527,7 +527,12 @@ public: : in(file_name) , decompressing_in(in) , block_in(decompressing_in, ClickHouseRevision::get()) + , log{&Logger::get("DirectoryMonitorBlockInputStream")} { + Settings insert_settings; + String insert_query; + StorageDistributedDirectoryMonitor::readHeader(in, insert_settings, insert_query, log); + block_in.readPrefix(); first_block = block_in.read(); header = first_block.cloneEmpty(); @@ -554,6 +559,8 @@ private: Block first_block; Block header; + + Logger * log; }; BlockInputStreamPtr StorageDistributedDirectoryMonitor::createStreamFromFile(const String & file_name) @@ -597,7 +604,7 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map { /// Determine metadata of the current file and check if it is not broken. ReadBufferFromFile in{file_path}; - readHeader(in, insert_settings, insert_query); + readHeader(in, insert_settings, insert_query, log); CompressedReadBuffer decompressing_in(in); NativeBlockInputStream block_in(decompressing_in, ClickHouseRevision::get()); diff --git a/dbms/src/Storages/Distributed/DirectoryMonitor.h b/dbms/src/Storages/Distributed/DirectoryMonitor.h index c9d23f67dd0..4ee77072ee3 100644 --- a/dbms/src/Storages/Distributed/DirectoryMonitor.h +++ b/dbms/src/Storages/Distributed/DirectoryMonitor.h @@ -71,7 +71,9 @@ private: ThreadFromGlobalPool thread{&StorageDistributedDirectoryMonitor::run, this}; /// Read insert query and insert settings for backward compatible. - void readHeader(ReadBuffer & in, Settings & insert_settings, std::string & insert_query) const; + static void readHeader(ReadBuffer & in, Settings & insert_settings, std::string & insert_query, Logger * log); + + friend class DirectoryMonitorBlockInputStream; }; }