add directory watcher

feng lv 2021-07-03 17:14:56 +00:00
parent 94011d91ba
commit b15099fe6f
11 changed files with 243 additions and 88 deletions

View File

@@ -104,7 +104,11 @@ if (USE_HDFS)
add_headers_and_sources(dbms Disks/HDFS)
endif()
add_headers_and_sources(dbms Storages/FileLog)
set (USE_FILELOG 1)
if(USE_FILELOG)
add_headers_and_sources(dbms Storages/FileLog)
endif()
list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD})
list (APPEND clickhouse_common_io_headers ${CONFIG_VERSION} ${CONFIG_COMMON})

View File

@@ -15,3 +15,4 @@
#cmakedefine01 USE_LIBPQXX
#cmakedefine01 USE_NURAFT
#cmakedefine01 USE_KRB5
#cmakedefine01 USE_FILELOG
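For context: #cmakedefine01 makes CMake's configure_file() emit #define USE_FILELOG 1 (or 0) into the generated config header, which is why a plain #if USE_FILELOG works in the source files changed further down. A minimal consumption sketch; the include path of the generated header is an assumption here, not part of the commit:

#include <Common/config.h>  // assumed location of the generated config header

#if USE_FILELOG
// FileLog-specific code is compiled only when CMake set USE_FILELOG to 1,
// mirroring the if(USE_FILELOG) guard added to the CMake file above.
#endif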

View File

@@ -15,10 +15,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
// with default poll timeout (500ms) it will give about 5 sec delay for doing 10 retries
// when selecting from empty topic
const auto MAX_FAILED_POLL_ATTEMPTS = 10;
FileLogBlockInputStream::FileLogBlockInputStream(
StorageFileLog & storage_,
const StorageMetadataPtr & metadata_snapshot_,
@@ -55,15 +51,10 @@ void FileLogBlockInputStream::readPrefixImpl()
Block FileLogBlockInputStream::readImpl()
{
if (!buffer || finished)
if (!buffer)
return Block();
finished = true;
// now it's one-time usage InputStream
// one block of the needed size (or with desired flush timeout) is formed in one internal iteration
// otherwise external iteration will reuse that and logic will became even more fuzzy
MutableColumns result_columns = non_virtual_header.cloneEmptyColumns();
// MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
auto input_format = FormatFactory::instance().getInputFormat(
storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size);
@@ -93,8 +84,6 @@ Block FileLogBlockInputStream::readImpl()
{
auto chunk = port.pull();
// that was returning bad value before https://github.com/ClickHouse/ClickHouse/pull/8005
// if will be backported should go together with #8005
auto chunk_rows = chunk.getNumRows();
new_rows += chunk_rows;
@@ -125,7 +114,7 @@ Block FileLogBlockInputStream::readImpl()
{
new_rows = read_file_log();
}
catch (Exception & e)
catch (Exception &)
{
throw;
}
@@ -133,6 +122,7 @@ Block FileLogBlockInputStream::readImpl()
if (new_rows)
{
total_rows = total_rows + new_rows;
LOG_INFO(log, "FileLogBlockInputStream, {} rows data polled from buffer.", new_rows);
}
if (!buffer->hasMorePolledRecords() && (total_rows >= max_block_size || !checkTimeLimit()))
@@ -144,18 +134,7 @@ Block FileLogBlockInputStream::readImpl()
if (total_rows == 0)
return Block();
/// MATERIALIZED columns can be added here, but I think
// they are not needed here:
// and it's misleading to use them here,
// as columns 'materialized' that way stays 'ephemeral'
// i.e. will not be stored anythere
// If needed any extra columns can be added using DEFAULT they can be added at MV level if needed.
auto result_block = non_virtual_header.cloneWithColumns(std::move(result_columns));
// auto virtual_block = virtual_header.cloneWithColumns(std::move(virtual_columns));
// for (const auto & column : virtual_block.getColumnsWithTypeAndName())
// result_block.insert(column);
return ConvertingBlockInputStream(
std::make_shared<OneBlockInputStream>(result_block),

View File

@@ -0,0 +1,89 @@
#include <Storages/FileLog/FileLogDirectoryWatcher.h>
#include <Poco/Delegate.h>
#include <Poco/DirectoryWatcher.h>
FileLogDirectoryWatcher::FileLogDirectoryWatcher(const std::string & path_)
: path(path_), dw(std::make_shared<Poco::DirectoryWatcher>(path))
{
dw->itemAdded += Poco::delegate(this, &FileLogDirectoryWatcher::onItemAdded);
dw->itemRemoved += Poco::delegate(this, &FileLogDirectoryWatcher::onItemRemoved);
dw->itemModified += Poco::delegate(this, &FileLogDirectoryWatcher::onItemModified);
dw->itemMovedFrom += Poco::delegate(this, &FileLogDirectoryWatcher::onItemMovedFrom);
dw->itemMovedTo += Poco::delegate(this, &FileLogDirectoryWatcher::onItemMovedTo);
}
FileLogDirectoryWatcher::Events FileLogDirectoryWatcher::getEvents()
{
std::lock_guard<std::mutex> lock(mutex);
Events res;
res.swap(events);
return res;
}
bool FileLogDirectoryWatcher::getError() const
{
return error;
}
const std::string & FileLogDirectoryWatcher::getPath() const
{
return path;
}
void FileLogDirectoryWatcher::onItemAdded(const Poco::DirectoryWatcher::DirectoryEvent& ev)
{
std::lock_guard<std::mutex> lock(mutex);
DirEvent de;
de.callback = "onItemAdded";
de.path = ev.item.path();
de.type = ev.event;
events.push_back(de);
}
void FileLogDirectoryWatcher::onItemRemoved(const Poco::DirectoryWatcher::DirectoryEvent& ev)
{
std::lock_guard<std::mutex> lock(mutex);
DirEvent de;
de.callback = "onItemRemoved";
de.path = ev.item.path();
de.type = ev.event;
events.push_back(de);
}
void FileLogDirectoryWatcher::onItemModified(const Poco::DirectoryWatcher::DirectoryEvent& ev)
{
std::lock_guard<std::mutex> lock(mutex);
DirEvent de;
de.callback = "onItemModified";
de.path = ev.item.path();
de.type = ev.event;
events.push_back(de);
}
void FileLogDirectoryWatcher::onItemMovedFrom(const Poco::DirectoryWatcher::DirectoryEvent& ev)
{
std::lock_guard<std::mutex> lock(mutex);
DirEvent de;
de.callback = "onItemMovedFrom";
de.path = ev.item.path();
de.type = ev.event;
events.push_back(de);
}
void FileLogDirectoryWatcher::onItemMovedTo(const Poco::DirectoryWatcher::DirectoryEvent& ev)
{
std::lock_guard<std::mutex> lock(mutex);
DirEvent de;
de.callback = "onItemMovedTo";
de.path = ev.item.path();
de.type = ev.event;
events.push_back(de);
}
void FileLogDirectoryWatcher::onError(const Poco::Exception &)
{
error = true;
}
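The onError handler above is defined but never attached to the Poco::DirectoryWatcher in this hunk. A minimal sketch of how it could be wired next to the other delegates in the constructor, assuming the Poco version in use exposes a scanError event (sketch only, not part of the commit):

// Assumption: Poco::DirectoryWatcher provides a scanError event in this Poco version.
dw->scanError += Poco::delegate(this, &FileLogDirectoryWatcher::onError);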

View File

@@ -0,0 +1,48 @@
#pragma once
#include <Poco/DirectoryWatcher.h>
#include <Poco/Foundation.h>
#include <Poco/Path.h>
#include <deque>
#include <mutex>
class FileLogDirectoryWatcher
{
public:
struct DirEvent
{
Poco::DirectoryWatcher::DirectoryEventType type;
std::string callback;
std::string path;
};
using Events = std::deque<DirEvent>;
explicit FileLogDirectoryWatcher(const std::string & path_);
~FileLogDirectoryWatcher() = default;
Events getEvents();
bool getError() const;
const std::string & getPath() const;
protected:
void onItemAdded(const Poco::DirectoryWatcher::DirectoryEvent& ev);
void onItemRemoved(const Poco::DirectoryWatcher::DirectoryEvent& ev);
void onItemModified(const Poco::DirectoryWatcher::DirectoryEvent& ev);
void onItemMovedFrom(const Poco::DirectoryWatcher::DirectoryEvent& ev);
void onItemMovedTo(const Poco::DirectoryWatcher::DirectoryEvent& ev);
void onError(const Poco::Exception &);
private:
const std::string path;
std::shared_ptr<Poco::DirectoryWatcher> dw;
std::mutex mutex;
Events events;
bool error = false;
};
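A minimal usage sketch for the class declared above, draining the queued events in a polling loop. The directory path is hypothetical and the real consumer is ReadBufferFromFileLog::watchFunc later in this commit:

FileLogDirectoryWatcher watcher("/tmp/file_log_dir");  // hypothetical directory

// getEvents() swaps the internal deque out under the mutex, so each call
// returns only the events accumulated since the previous call.
for (const auto & event : watcher.getEvents())
{
    switch (event.type)
    {
        case Poco::DirectoryWatcher::DW_ITEM_ADDED:
            // open a reader for event.path
            break;
        case Poco::DirectoryWatcher::DW_ITEM_REMOVED:
            // drop any state kept for event.path
            break;
        default:
            break;
    }
}

if (watcher.getError())
{
    // log and decide whether to keep watching watcher.getPath()
}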

View File

@@ -1,5 +1,7 @@
#include <Interpreters/Context.h>
#include <Storages/FileLog/ReadBufferFromFileLog.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/File.h>
#include <common/logger_useful.h>
#include <common/sleep.h>
@@ -15,33 +17,49 @@ namespace ErrorCodes
extern const int CANNOT_COMMIT_OFFSET;
}
using namespace std::chrono_literals;
ReadBufferFromFileLog::ReadBufferFromFileLog(
const std::vector<String> & log_files_,
Poco::Logger * log_,
size_t max_batch_size,
size_t poll_timeout_,
ContextPtr context_)
const String & path_, Poco::Logger * log_, size_t max_batch_size, size_t poll_timeout_, ContextPtr context_)
: ReadBuffer(nullptr, 0)
, path(path_)
, log(log_)
, batch_size(max_batch_size)
, poll_timeout(poll_timeout_)
, context(context_)
, log_files(log_files_.begin(), log_files_.end())
{
}
void ReadBufferFromFileLog::open()
{
for (const auto & file : log_files)
file_status[file].reader = std::ifstream(file);
Poco::File file(path);
bool path_is_directory = false;
if (file.isFile())
{
file_status[path].reader = std::ifstream(path);
}
else if (file.isDirectory())
{
path_is_directory = true;
Poco::DirectoryIterator dir_iter(file);
Poco::DirectoryIterator end;
while (dir_iter != end)
{
if (dir_iter->isFile())
file_status[dir_iter->path()].reader = std::ifstream(dir_iter->path());
++dir_iter;
}
}
wait_task = context->getMessageBrokerSchedulePool().createTask("waitTask", [this] { waitFunc(); });
wait_task->deactivate();
select_task = context->getMessageBrokerSchedulePool().createTask("selectTask", [this] { selectFunc(); });
if (path_is_directory)
{
FileLogDirectoryWatcher dw(path);
select_task = context->getMessageBrokerSchedulePool().createTask("watchTask", [this, &dw] { watchFunc(dw); });
select_task->activateAndSchedule();
}
cleanUnprocessed();
allowed = false;
@@ -63,7 +81,6 @@ void ReadBufferFromFileLog::close()
status.second.reader.close();
}
// it do the poll when needed
bool ReadBufferFromFileLog::poll()
{
@@ -113,10 +130,19 @@ ReadBufferFromFileLog::Records ReadBufferFromFileLog::pollBatch(size_t batch_siz
void ReadBufferFromFileLog::readNewRecords(ReadBufferFromFileLog::Records & new_records, size_t batch_size_)
{
std::lock_guard<std::mutex> lock(status_mutex);
size_t need_records_size = batch_size_ - new_records.size();
size_t read_records_size = 0;
for (auto & status : file_status)
{
if (status.second.status == FileStatus::NO_CHANGE)
continue;
if (status.second.status == FileStatus::REMOVED)
file_status.erase(status.first);
while (read_records_size < need_records_size && status.second.reader.good() && !status.second.reader.eof())
{
Record record;
@@ -124,6 +150,11 @@ void ReadBufferFromFileLog::readNewRecords(ReadBufferFromFileLog::Records & new_
new_records.emplace_back(record);
++read_records_size;
}
// Read to the end of the file
if (status.second.reader.eof())
status.second.status = FileStatus::NO_CHANGE;
if (read_records_size == need_records_size)
break;
}
@@ -149,8 +180,41 @@ void ReadBufferFromFileLog::waitFunc()
time_out = true;
}
void ReadBufferFromFileLog::selectFunc()
void ReadBufferFromFileLog::watchFunc(FileLogDirectoryWatcher & dw)
{
}
}
while (true)
{
sleepForNanoseconds(poll_timeout);
auto error = dw.getError();
if (error)
LOG_INFO(log, "Error happened during watching directory {}.", dw.getPath());
auto events = dw.getEvents();
std::lock_guard<std::mutex> lock(status_mutex);
for (const auto & event : events)
{
switch (event.type)
{
case Poco::DirectoryWatcher::DW_ITEM_ADDED:
file_status[event.path].reader = std::ifstream(event.path);
break;
case Poco::DirectoryWatcher::DW_ITEM_MODIFIED:
file_status[event.path].status = FileStatus::UPDATED;
break;
case Poco::DirectoryWatcher::DW_ITEM_REMOVED:
case Poco::DirectoryWatcher::DW_ITEM_MOVED_TO:
case Poco::DirectoryWatcher::DW_ITEM_MOVED_FROM:
file_status[event.path].status = FileStatus::REMOVED;
break;
default:
LOG_INFO(log, "Undefine event type");
}
}
}
}
}
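In the open() hunk above, the scheduled task captures a reference to the stack-local FileLogDirectoryWatcher, which goes out of scope when open() returns. A sketch of one way to give the watcher the same lifetime as the task, by capturing shared ownership in the lambda (names follow the diff; this is an illustration, not the commit's code):

// Sketch only: own the watcher from the lambda so it outlives open().
auto watcher = std::make_shared<FileLogDirectoryWatcher>(path);
select_task = context->getMessageBrokerSchedulePool().createTask(
    "watchTask", [this, watcher] { watchFunc(*watcher); });
select_task->activateAndSchedule();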

View File

@@ -3,6 +3,7 @@
#include <Core/BackgroundSchedulePool.h>
#include <Core/Names.h>
#include <IO/ReadBuffer.h>
#include <Storages/FileLog/FileLogDirectoryWatcher.h>
#include <common/types.h>
#include <fstream>
@@ -19,12 +20,7 @@ namespace DB
class ReadBufferFromFileLog : public ReadBuffer
{
public:
ReadBufferFromFileLog(
const std::vector<String> & log_files_,
Poco::Logger * log_,
size_t max_batch_size,
size_t poll_timeout_,
ContextPtr context_);
ReadBufferFromFileLog(const String & path_, Poco::Logger * log_, size_t max_batch_size, size_t poll_timeout_, ContextPtr context_);
~ReadBufferFromFileLog() override = default;
@@ -48,11 +44,12 @@ private:
struct FileContext
{
std::mutex status_mutex;
FileStatus status = FileStatus::BEGIN;
std::ifstream reader;
};
const String path;
Poco::Logger * log;
const size_t batch_size = 1;
const size_t poll_timeout = 0;
@@ -62,12 +59,12 @@ private:
using NameToFile = std::unordered_map<String, FileContext>;
NameToFile file_status;
std::mutex status_mutex;
ContextPtr context;
bool allowed = true;
const std::vector<String> log_files;
using Record = std::string;
using Records = std::vector<Record>;
@@ -83,13 +80,12 @@ private:
void readNewRecords(Records & new_records, size_t batch_size_);
// void drain();
void cleanUnprocessed();
bool nextImpl() override;
void waitFunc();
void selectFunc();
void watchFunc(FileLogDirectoryWatcher & dw);
};
}
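For reference, the FileStatus values referred to across this commit (BEGIN, NO_CHANGE, UPDATED, REMOVED) imply an enum roughly like the sketch below; the actual definition sits in an unchanged part of ReadBufferFromFileLog.h and may differ:

enum class FileStatus
{
    BEGIN,      // file just opened, nothing read yet
    NO_CHANGE,  // read to EOF and no modification reported since
    UPDATED,    // directory watcher reported a modification
    REMOVED,    // directory watcher reported a removal or move
};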

View File

@@ -23,7 +23,6 @@
#include <Storages/StorageFactory.h>
#include <Storages/StorageMaterializedView.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/DirectoryWatcher.h>
#include <Poco/File.h>
#include <Common/Exception.h>
#include <Common/Macros.h>
@@ -67,24 +66,6 @@ StorageFileLog::StorageFileLog(
storage_metadata.setColumns(columns_);
setInMemoryMetadata(storage_metadata);
Poco::File file(path);
if (file.isFile())
{
log_files.emplace_back(path);
}
else if (file.isDirectory())
{
Poco::DirectoryIterator dir_iter(file);
Poco::DirectoryIterator end;
while (dir_iter != end)
{
if (dir_iter->isFile())
log_files.emplace_back(dir_iter->path());
++dir_iter;
}
}
auto thread = getContext()->getMessageBrokerSchedulePool().createTask(log->name(), [this] { threadFunc(); });
thread->deactivate();
task = std::make_shared<TaskContext>(std::move(thread));
@@ -157,7 +138,6 @@ bool StorageFileLog::checkDependencies(const StorageID & table_id)
if (dependencies.empty())
return true;
// Check the dependencies are ready?
for (const auto & db_tab : dependencies)
{
auto table = DatabaseCatalog::instance().tryGetTable(db_tab, getContext());
@@ -180,7 +160,7 @@ bool StorageFileLog::checkDependencies(const StorageID & table_id)
void StorageFileLog::createReadBuffer()
{
auto new_context = Context::createCopy(getContext());
buffer = std::make_shared<ReadBufferFromFileLog>(log_files, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), new_context);
buffer = std::make_shared<ReadBufferFromFileLog>(path, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), new_context);
}
void StorageFileLog::destroyReadBuffer()
@@ -208,13 +188,7 @@ void StorageFileLog::threadFunc()
LOG_DEBUG(log, "Started streaming to {} attached views", dependencies_count);
// Exit the loop & reschedule if some stream stalled
auto some_stream_is_stalled = streamToViews();
if (some_stream_is_stalled)
{
LOG_TRACE(log, "Stream(s) stalled. Reschedule.");
break;
}
streamToViews();
auto ts = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(ts-start_time);
@@ -255,14 +229,12 @@ bool StorageFileLog::streamToViews()
auto new_context = Context::createCopy(getContext());
// Only insert into dependent views and expect that input blocks contain virtual columns
InterpreterInsertQuery interpreter(insert, new_context, false, true, true);
auto block_io = interpreter.execute();
auto stream = std::make_shared<FileLogBlockInputStream>(
*this, metadata_snapshot, new_context, block_io.out->getHeader().getNames(), log, block_size);
// Limit read batch to maximum block size to allow DDL
StreamLocalLimits limits;
limits.speed_limits.max_execution_time = getContext()->getSettingsRef().stream_flush_interval_ms;
@@ -270,8 +242,6 @@ bool StorageFileLog::streamToViews()
limits.timeout_overflow_mode = OverflowMode::BREAK;
stream->setLimits(limits);
// We can't cancel during copyData, as it's not aware of commits and other kafka-related stuff.
// It will be cancelled on underlying layer (kafka buffer)
std::atomic<bool> stub = {false};
size_t rows = 0;
copyData(

View File

@@ -59,8 +59,6 @@ private:
const String format_name;
Poco::Logger * log;
Files log_files;
ReadBufferFromFileLogPtr buffer;
std::mutex mutex;

View File

@@ -50,6 +50,8 @@ const char * auto_config_build[]
"USE_LDAP", "@USE_LDAP@",
"TZDATA_VERSION", "@TZDATA_VERSION@",
"USE_KRB5", "@USE_KRB5@",
"USE_KRB5", "@USE_KRB5@",
"USE_FILELOG", "@USE_FILELOG@",
nullptr, nullptr
};

View File

@@ -50,8 +50,6 @@ void registerStorageMongoDB(StorageFactory & factory);
void registerStorageKafka(StorageFactory & factory);
#endif
void registerStorageFileLog(StorageFactory & factory);
#if USE_AMQPCPP
void registerStorageRabbitMQ(StorageFactory & factory);
#endif
@@ -69,6 +67,10 @@ void registerStorageMaterializedPostgreSQL(StorageFactory & factory);
void registerStorageExternalDistributed(StorageFactory & factory);
#endif
#if USE_FILELOG
void registerStorageFileLog(StorageFactory & factory);
#endif
void registerStorages()
{
auto & factory = StorageFactory::instance();
@@ -114,7 +116,9 @@ void registerStorages()
registerStorageKafka(factory);
#endif
#if USE_FILELOG
registerStorageFileLog(factory);
#endif
#if USE_AMQPCPP
registerStorageRabbitMQ(factory);