Support reading from DirectoryMonitor.

This commit is contained in:
Nikolai Kochetov 2020-01-04 21:33:16 +03:00
parent cf7b16d333
commit f094e8f565
2 changed files with 14 additions and 5 deletions

View File

@ -269,7 +269,7 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
Settings insert_settings;
std::string insert_query;
readHeader(in, insert_settings, insert_query);
readHeader(in, insert_settings, insert_query, log);
RemoteBlockOutputStream remote{*connection, timeouts, insert_query, &insert_settings};
@ -289,7 +289,7 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
}
void StorageDistributedDirectoryMonitor::readHeader(
ReadBuffer & in, Settings & insert_settings, std::string & insert_query) const
ReadBuffer & in, Settings & insert_settings, std::string & insert_query, Logger * log)
{
UInt64 query_size;
readVarUInt(query_size, in);
@ -449,7 +449,7 @@ struct StorageDistributedDirectoryMonitor::Batch
}
ReadBufferFromFile in(file_path->second);
parent.readHeader(in, insert_settings, insert_query);
parent.readHeader(in, insert_settings, insert_query, parent.log);
if (first)
{
@ -527,7 +527,12 @@ public:
: in(file_name)
, decompressing_in(in)
, block_in(decompressing_in, ClickHouseRevision::get())
, log{&Logger::get("DirectoryMonitorBlockInputStream")}
{
Settings insert_settings;
String insert_query;
StorageDistributedDirectoryMonitor::readHeader(in, insert_settings, insert_query, log);
block_in.readPrefix();
first_block = block_in.read();
header = first_block.cloneEmpty();
@ -554,6 +559,8 @@ private:
Block first_block;
Block header;
Logger * log;
};
BlockInputStreamPtr StorageDistributedDirectoryMonitor::createStreamFromFile(const String & file_name)
@ -597,7 +604,7 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
{
/// Determine metadata of the current file and check if it is not broken.
ReadBufferFromFile in{file_path};
readHeader(in, insert_settings, insert_query);
readHeader(in, insert_settings, insert_query, log);
CompressedReadBuffer decompressing_in(in);
NativeBlockInputStream block_in(decompressing_in, ClickHouseRevision::get());

View File

@ -71,7 +71,9 @@ private:
ThreadFromGlobalPool thread{&StorageDistributedDirectoryMonitor::run, this};
/// Read insert query and insert settings for backward compatible.
void readHeader(ReadBuffer & in, Settings & insert_settings, std::string & insert_query) const;
static void readHeader(ReadBuffer & in, Settings & insert_settings, std::string & insert_query, Logger * log);
friend class DirectoryMonitorBlockInputStream;
};
}