refactor some code

This commit is contained in:
feng lv 2021-09-04 17:04:35 +00:00
parent e2cadd2ab1
commit 595005eb21
8 changed files with 366 additions and 212 deletions

View File

@ -0,0 +1,41 @@
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSetQuery.h>
#include <Storages/FileLog/FileLogSettings.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_SETTING;
}
IMPLEMENT_SETTINGS_TRAITS(FileLogSettingsTraits, LIST_OF_FILELOG_SETTINGS)
void FileLogSettings::loadFromQuery(ASTStorage & storage_def)
{
if (storage_def.settings)
{
try
{
applyChanges(storage_def.settings->changes);
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::UNKNOWN_SETTING)
e.addMessage("for storage " + storage_def.engine->name);
throw;
}
}
else
{
auto settings_ast = std::make_shared<ASTSetQuery>();
settings_ast->is_standalone = false;
storage_def.set(storage_def.settings, settings_ast);
}
}
}

View File

@ -0,0 +1,36 @@
#pragma once
#include <Core/BaseSettings.h>
#include <Core/Settings.h>
namespace DB
{
class ASTStorage;
#define FILELOG_RELATED_SETTINGS(M) \
/* default is stream_poll_timeout_ms */ \
M(Milliseconds, filelog_poll_timeout_ms, 0, "Timeout for single poll from FileLog.", 0) \
/* default is min(max_block_size, kafka_max_block_size)*/ \
M(UInt64, filelog_poll_max_batch_size, 0, "Maximum amount of messages to be polled in a single Kafka poll.", 0) \
/* default is = max_insert_block_size / kafka_num_consumers */ \
M(UInt64, filelog_max_block_size, 0, "Number of row collected by poll(s) for flushing data from Kafka.", 0) \
M(UInt64, filelog_max_threads, 8, "Number of max threads to parse files, default is 8", 0)
#define LIST_OF_FILELOG_SETTINGS(M) \
FILELOG_RELATED_SETTINGS(M) \
FORMAT_FACTORY_SETTINGS(M)
DECLARE_SETTINGS_TRAITS(FileLogSettingsTraits, LIST_OF_FILELOG_SETTINGS)
/** Settings for the Kafka engine.
* Could be loaded from a CREATE TABLE query (SETTINGS clause).
*/
struct FileLogSettings : public BaseSettings<FileLogSettingsTraits>
{
void loadFromQuery(ASTStorage & storage_def);
};
}

View File

@ -1,11 +1,11 @@
#include <Storages/FileLog/FileLogBlockInputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h> #include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h> #include <DataStreams/OneBlockInputStream.h>
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Processors/Formats/InputStreamFromInputFormat.h> #include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Storages/FileLog/FileLogSource.h>
#include <Storages/FileLog/ReadBufferFromFileLog.h> #include <Storages/FileLog/ReadBufferFromFileLog.h>
#include <Common/Stopwatch.h>
#include <common/logger_useful.h> #include <common/logger_useful.h>
namespace DB namespace DB
@ -15,42 +15,34 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
} }
FileLogBlockInputStream::FileLogBlockInputStream( FileLogSource::FileLogSource(
StorageFileLog & storage_, StorageFileLog & storage_,
const StorageMetadataPtr & metadata_snapshot_, const StorageMetadataPtr & metadata_snapshot_,
const std::shared_ptr<Context> & context_, const ContextPtr & context_,
const Names & columns, const Names & columns,
size_t max_block_size_) size_t max_block_size_,
size_t poll_time_out_,
size_t stream_number_,
size_t max_streams_number_)
: storage(storage_) : storage(storage_)
, metadata_snapshot(metadata_snapshot_) , metadata_snapshot(metadata_snapshot_)
, context(context_) , context(context_)
, column_names(columns) , column_names(columns)
, max_block_size(max_block_size_) , max_block_size(max_block_size_)
, poll_time_out(poll_time_out_)
, stream_number(stream_number_)
, max_streams_number(max_streams_number_)
, non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized()) , non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized())
, virtual_header( , virtual_header(
metadata_snapshot->getSampleBlockForColumns(storage.getVirtualColumnNames(), storage.getVirtuals(), storage.getStorageID())) metadata_snapshot->getSampleBlockForColumns(storage.getVirtualColumnNames(), storage.getVirtuals(), storage.getStorageID()))
{ {
createReadBuffer();
} }
Block FileLogBlockInputStream::getHeader() const Chunk FileLogSource::generate()
{
return metadata_snapshot->getSampleBlockForColumns(column_names, storage.getVirtuals(), storage.getStorageID());
}
void FileLogBlockInputStream::readPrefixImpl()
{
buffer = storage.getBuffer();
if (!buffer)
return;
buffer->open();
}
Block FileLogBlockInputStream::readImpl()
{ {
if (!buffer) if (!buffer)
return Block(); return {};
MutableColumns result_columns = non_virtual_header.cloneEmptyColumns(); MutableColumns result_columns = non_virtual_header.cloneEmptyColumns();
@ -104,8 +96,8 @@ Block FileLogBlockInputStream::readImpl()
while (true) while (true)
{ {
Stopwatch watch;
size_t new_rows = 0; size_t new_rows = 0;
exception_message.reset();
if (buffer->poll()) if (buffer->poll())
{ {
try try
@ -122,28 +114,16 @@ Block FileLogBlockInputStream::readImpl()
total_rows = total_rows + new_rows; total_rows = total_rows + new_rows;
} }
if (!buffer->hasMorePolledRecords() && (total_rows >= max_block_size || !checkTimeLimit())) if ((!buffer->hasMorePolledRecords() && (total_rows >= max_block_size)) || watch.elapsedMilliseconds() > poll_time_out)
{ {
break; break;
} }
} }
if (total_rows == 0) if (total_rows == 0)
return Block(); return {};
auto result_block = non_virtual_header.cloneWithColumns(std::move(result_columns)); return Chunk(std::move(result_columns), total_rows);
return ConvertingBlockInputStream(
std::make_shared<OneBlockInputStream>(result_block),
getHeader(),
ConvertingBlockInputStream::MatchColumnsMode::Name)
.read();
}
void FileLogBlockInputStream::readSuffixImpl()
{
if (buffer)
buffer->close();
} }
} }

View File

@ -1,6 +1,6 @@
#pragma once #pragma once
#include <DataStreams/IBlockInputStream.h> #include <Processors/Sources/SourceWithProgress.h>
#include <Storages/FileLog/ReadBufferFromFileLog.h> #include <Storages/FileLog/ReadBufferFromFileLog.h>
#include <Storages/FileLog/StorageFileLog.h> #include <Storages/FileLog/StorageFileLog.h>
@ -12,26 +12,24 @@ namespace Poco
} }
namespace DB namespace DB
{ {
class FileLogBlockInputStream : public IBlockInputStream class FileLogSource : public SourceWithProgress
{ {
public: public:
FileLogBlockInputStream( FileLogSource(
StorageFileLog & storage_, StorageFileLog & storage_,
const StorageMetadataPtr & metadata_snapshot_, const StorageMetadataPtr & metadata_snapshot_,
const std::shared_ptr<Context> & context_, const ContextPtr & context_,
const Names & columns, const Names & columns,
size_t max_block_size_); size_t max_block_size_,
~FileLogBlockInputStream() override = default; size_t poll_time_out_,
size_t stream_number_,
String getName() const override { return storage.getName(); } size_t max_streams_number_);
Block getHeader() const override;
void readPrefixImpl() override;
Block readImpl() override;
void readSuffixImpl() override;
bool isStalled() { return !buffer || buffer->isStalled(); } bool isStalled() { return !buffer || buffer->isStalled(); }
protected:
Chunk generate() override;
private: private:
StorageFileLog & storage; StorageFileLog & storage;
StorageMetadataPtr metadata_snapshot; StorageMetadataPtr metadata_snapshot;
@ -39,10 +37,19 @@ private:
Names column_names; Names column_names;
UInt64 max_block_size; UInt64 max_block_size;
size_t poll_time_out;
size_t stream_number;
size_t max_streams_number;
ReadBufferFromFileLogPtr buffer; ReadBufferFromFileLogPtr buffer;
bool started = false;
const Block non_virtual_header; const Block non_virtual_header;
const Block virtual_header; const Block virtual_header;
void createReadBuffer();
}; };
} }

View File

@ -1,13 +1,11 @@
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Storages/FileLog/ReadBufferFromFileLog.h> #include <Storages/FileLog/ReadBufferFromFileLog.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/File.h>
#include <common/logger_useful.h> #include <common/logger_useful.h>
#include <common/sleep.h>
#include <boost/algorithm/string/join.hpp>
#include <algorithm> #include <algorithm>
#include <filesystem>
#include <boost/algorithm/string/join.hpp>
namespace DB namespace DB
{ {
@ -29,33 +27,9 @@ ReadBufferFromFileLog::ReadBufferFromFileLog(
void ReadBufferFromFileLog::open() void ReadBufferFromFileLog::open()
{ {
Poco::File file(path);
if (file.isFile())
{
file_status[path].reader = std::ifstream(path);
}
else if (file.isDirectory())
{
path_is_directory = true;
Poco::DirectoryIterator dir_iter(file);
Poco::DirectoryIterator end;
while (dir_iter != end)
{
if (dir_iter->isFile())
file_status[dir_iter->path()].reader = std::ifstream(dir_iter->path());
++dir_iter;
}
}
wait_task = context->getMessageBrokerSchedulePool().createTask("waitTask", [this] { waitFunc(); }); wait_task = context->getMessageBrokerSchedulePool().createTask("waitTask", [this] { waitFunc(); });
wait_task->deactivate(); wait_task->deactivate();
if (path_is_directory)
{
select_task = context->getMessageBrokerSchedulePool().createTask("watchTask", [this] { watchFunc(); });
select_task->activateAndSchedule();
}
cleanUnprocessed(); cleanUnprocessed();
allowed = false; allowed = false;
@ -183,43 +157,4 @@ void ReadBufferFromFileLog::waitFunc()
time_out = true; time_out = true;
} }
void ReadBufferFromFileLog::watchFunc()
{
FileLogDirectoryWatcher dw(path);
while (true)
{
sleepForNanoseconds(poll_timeout);
auto error = dw.getError();
if (error)
LOG_INFO(log, "Error happened during watching directory {}.", dw.getPath());
auto events = dw.getEvents();
std::lock_guard<std::mutex> lock(status_mutex);
for (const auto & event : events)
{
switch (event.type)
{
case Poco::DirectoryWatcher::DW_ITEM_ADDED:
LOG_TRACE(log, "New event {} watched.", event.callback);
file_status[event.path].reader = std::ifstream(event.path);
break;
case Poco::DirectoryWatcher::DW_ITEM_MODIFIED:
LOG_TRACE(log, "New event {} watched.", event.callback);
file_status[event.path].status = FileStatus::UPDATED;
break;
case Poco::DirectoryWatcher::DW_ITEM_REMOVED:
case Poco::DirectoryWatcher::DW_ITEM_MOVED_TO:
case Poco::DirectoryWatcher::DW_ITEM_MOVED_FROM:
LOG_TRACE(log, "New event {} watched.", event.callback);
file_status[event.path].status = FileStatus::REMOVED;
break;
}
}
}
}
} }

View File

@ -33,26 +33,13 @@ public:
bool poll(); bool poll();
bool isStalled() { return buffer_status != BufferStatus::NOT_STALLED; } bool noRecords() { return buffer_status == BufferStatus::NO_RECORD_RETURNED; }
private: private:
enum class BufferStatus enum class BufferStatus
{ {
NO_RECORD_RETURNED, NO_RECORD_RETURNED,
NOT_STALLED, POLLED_OK,
};
enum class FileStatus
{
BEGIN,
NO_CHANGE,
UPDATED,
REMOVED
};
struct FileContext
{
FileStatus status = FileStatus::BEGIN;
std::ifstream reader;
}; };
BufferStatus buffer_status; BufferStatus buffer_status;
@ -67,10 +54,6 @@ private:
bool time_out = false; bool time_out = false;
using NameToFile = std::unordered_map<String, FileContext>;
NameToFile file_status;
std::mutex status_mutex;
ContextPtr context; ContextPtr context;
@ -85,7 +68,6 @@ private:
using TaskThread = BackgroundSchedulePool::TaskHolder; using TaskThread = BackgroundSchedulePool::TaskHolder;
TaskThread wait_task; TaskThread wait_task;
TaskThread select_task;
Records pollBatch(size_t batch_size_); Records pollBatch(size_t batch_size_);
@ -97,6 +79,5 @@ private:
void waitFunc(); void waitFunc();
[[noreturn]] void watchFunc();
}; };
} }

View File

@ -1,16 +1,3 @@
#include <Storages/FileLog/FileLogBlockInputStream.h>
#include <Storages/FileLog/ReadBufferFromFileLog.h>
#include <Storages/FileLog/StorageFileLog.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Interpreters/InterpreterInsertQuery.h> #include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/evaluateConstantExpression.h> #include <Interpreters/evaluateConstantExpression.h>
@ -18,20 +5,25 @@
#include <Parsers/ASTExpressionList.h> #include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTInsertQuery.h> #include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h> #include <Parsers/ASTLiteral.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Pipe.h> #include <Processors/Pipe.h>
#include <Processors/Sources/SourceFromInputStream.h> #include <Processors/Sources/SourceFromInputStream.h>
#include <Storages/FileLog/FileLogSource.h>
#include <Storages/FileLog/ReadBufferFromFileLog.h>
#include <Storages/FileLog/StorageFileLog.h>
#include <Storages/StorageFactory.h> #include <Storages/StorageFactory.h>
#include <Storages/StorageMaterializedView.h> #include <Storages/StorageMaterializedView.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/File.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/Macros.h> #include <Common/Macros.h>
#include <Common/formatReadable.h> #include <Common/formatReadable.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/quoteString.h> #include <Common/quoteString.h>
#include <Common/setThreadName.h> #include <Common/setThreadName.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <common/logger_useful.h> #include <common/logger_useful.h>
#include <common/sleep.h>
#include <Poco/DirectoryIterator.h>
namespace DB namespace DB
{ {
@ -53,19 +45,41 @@ StorageFileLog::StorageFileLog(
ContextPtr context_, ContextPtr context_,
const ColumnsDescription & columns_, const ColumnsDescription & columns_,
const String & path_, const String & path_,
const String & format_name_) const String & format_name_,
std::unique_ptr<FileLogSettings> settings)
: IStorage(table_id_) : IStorage(table_id_)
, WithContext(context_->getGlobalContext()) , WithContext(context_->getGlobalContext())
, filelog_settings(std::move(settings))
, path(path_) , path(path_)
, format_name(format_name_) , format_name(format_name_)
, log(&Poco::Logger::get("StorageFile (" + table_id_.table_name + ")")) , log(&Poco::Logger::get("StorageFileLog (" + table_id_.table_name + ")"))
{ {
StorageInMemoryMetadata storage_metadata; StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_); storage_metadata.setColumns(columns_);
setInMemoryMetadata(storage_metadata); setInMemoryMetadata(storage_metadata);
if (std::filesystem::is_regular_file(path))
{
file_status[path].reader = std::ifstream(path);
file_names.push_back(path);
}
else if (std::filesystem::is_directory(path))
{
path_is_directory = true;
/// Just consider file with depth 1
for (const auto & dir_entry : std::filesystem::directory_iterator{path})
{
if (dir_entry.is_regular_file())
{
file_status[dir_entry.path()].reader = std::ifstream(dir_entry.path());
file_names.push_back(dir_entry.path());
}
}
}
watch_task = getContext()->getMessageBrokerSchedulePool().createTask("watchTask", [this] { watchFunc(); });
auto thread = getContext()->getMessageBrokerSchedulePool().createTask(log->name(), [this] { threadFunc(); }); auto thread = getContext()->getMessageBrokerSchedulePool().createTask(log->name(), [this] { threadFunc(); });
thread->deactivate();
task = std::make_shared<TaskContext>(std::move(thread)); task = std::make_shared<TaskContext>(std::move(thread));
} }
@ -78,23 +92,38 @@ Pipe StorageFileLog::read(
size_t /* max_block_size */, size_t /* max_block_size */,
unsigned /* num_streams */) unsigned /* num_streams */)
{ {
std::lock_guard<std::mutex> lock(status_mutex);
auto modified_context = Context::createCopy(local_context); auto modified_context = Context::createCopy(local_context);
return Pipe(std::make_shared<SourceFromInputStream>( clearInvalidFiles();
std::make_shared<FileLogBlockInputStream>(*this, metadata_snapshot, modified_context, column_names, 1)));
auto max_streams_number = std::min<UInt64>(filelog_settings->filelog_max_threads, file_names.size());
/// No files to parse
if (max_streams_number == 0)
{
return Pipe{};
}
Pipes pipes;
pipes.reserve(max_streams_number);
for (size_t stream_number = 0; stream_number < max_streams_number; ++stream_number)
{
pipes.emplace_back(std::make_shared<FileLogSource>(
*this,
metadata_snapshot,
modified_context,
column_names,
getMaxBlockSize(),
getPollTimeoutMillisecond(),
stream_number,
max_streams_number));
}
return Pipe::unitePipes(std::move(pipes));
} }
void StorageFileLog::startup() void StorageFileLog::startup()
{ {
try
{
createReadBuffer();
}
catch (const Exception &)
{
tryLogCurrentException(log);
}
task->holder->activateAndSchedule(); task->holder->activateAndSchedule();
} }
@ -105,26 +134,32 @@ void StorageFileLog::shutdown()
LOG_TRACE(log, "Waiting for cleanup"); LOG_TRACE(log, "Waiting for cleanup");
task->holder->deactivate(); task->holder->deactivate();
watch_task->deactivate();
LOG_TRACE(log, "Closing files"); for (auto & file : file_status)
destroyReadBuffer(); {
file.second.reader.close();
}
} }
size_t StorageFileLog::getMaxBlockSize() const size_t StorageFileLog::getMaxBlockSize() const
{ {
return getContext()->getSettingsRef().max_insert_block_size.value; return filelog_settings->filelog_max_block_size.changed ? filelog_settings->filelog_max_block_size.value
: getContext()->getSettingsRef().max_insert_block_size.value;
} }
size_t StorageFileLog::getPollMaxBatchSize() const size_t StorageFileLog::getPollMaxBatchSize() const
{ {
size_t batch_size = getContext()->getSettingsRef().max_block_size.value; size_t batch_size = filelog_settings->filelog_poll_max_batch_size.changed ? filelog_settings->filelog_poll_max_batch_size.value
: getContext()->getSettingsRef().max_block_size.value;
return std::min(batch_size,getMaxBlockSize()); return std::min(batch_size, getMaxBlockSize());
} }
size_t StorageFileLog::getPollTimeoutMillisecond() const size_t StorageFileLog::getPollTimeoutMillisecond() const
{ {
return getContext()->getSettingsRef().stream_poll_timeout_ms.totalMilliseconds(); return filelog_settings->filelog_poll_timeout_ms.changed ? filelog_settings->filelog_poll_timeout_ms.totalMilliseconds()
: getContext()->getSettingsRef().stream_poll_timeout_ms.totalMilliseconds();
} }
@ -135,9 +170,9 @@ bool StorageFileLog::checkDependencies(const StorageID & table_id)
if (dependencies.empty()) if (dependencies.empty())
return true; return true;
for (const auto & db_tab : dependencies) for (const auto & storage : dependencies)
{ {
auto table = DatabaseCatalog::instance().tryGetTable(db_tab, getContext()); auto table = DatabaseCatalog::instance().tryGetTable(storage, getContext());
if (!table) if (!table)
return false; return false;
@ -147,25 +182,13 @@ bool StorageFileLog::checkDependencies(const StorageID & table_id)
return false; return false;
// Check all its dependencies // Check all its dependencies
if (!checkDependencies(db_tab)) if (!checkDependencies(storage))
return false; return false;
} }
return true; return true;
} }
void StorageFileLog::createReadBuffer()
{
auto new_context = Context::createCopy(getContext());
buffer = std::make_shared<ReadBufferFromFileLog>(path, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), new_context);
}
void StorageFileLog::destroyReadBuffer()
{
if (buffer)
buffer->close();
}
void StorageFileLog::threadFunc() void StorageFileLog::threadFunc()
{ {
try try
@ -177,6 +200,8 @@ void StorageFileLog::threadFunc()
{ {
auto start_time = std::chrono::steady_clock::now(); auto start_time = std::chrono::steady_clock::now();
watch_task->activateAndSchedule();
// Keep streaming as long as there are attached views and streaming is not cancelled // Keep streaming as long as there are attached views and streaming is not cancelled
while (!task->stream_cancelled) while (!task->stream_cancelled)
{ {
@ -207,6 +232,7 @@ void StorageFileLog::threadFunc()
tryLogCurrentException(__PRETTY_FUNCTION__); tryLogCurrentException(__PRETTY_FUNCTION__);
} }
watch_task->deactivate();
// Wait for attached views // Wait for attached views
if (!task->stream_cancelled) if (!task->stream_cancelled)
task->holder->scheduleAfter(RESCHEDULE_MS); task->holder->scheduleAfter(RESCHEDULE_MS);
@ -215,6 +241,7 @@ void StorageFileLog::threadFunc()
bool StorageFileLog::streamToViews() bool StorageFileLog::streamToViews()
{ {
std::lock_guard<std::mutex> lock(status_mutex);
Stopwatch watch; Stopwatch watch;
auto table_id = getStorageID(); auto table_id = getStorageID();
@ -223,41 +250,79 @@ bool StorageFileLog::streamToViews()
throw Exception("Engine table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::LOGICAL_ERROR); throw Exception("Engine table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::LOGICAL_ERROR);
auto metadata_snapshot = getInMemoryMetadataPtr(); auto metadata_snapshot = getInMemoryMetadataPtr();
clearInvalidFiles();
auto max_streams_number = std::min<UInt64>(filelog_settings->filelog_max_threads, file_names.size());
/// No files to parse
if (max_streams_number == 0)
{
return false;
}
// Create an INSERT query for streaming data // Create an INSERT query for streaming data
auto insert = std::make_shared<ASTInsertQuery>(); auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = table_id; insert->table_id = table_id;
size_t block_size = getMaxBlockSize();
auto new_context = Context::createCopy(getContext()); auto new_context = Context::createCopy(getContext());
InterpreterInsertQuery interpreter(insert, new_context, false, true, true); InterpreterInsertQuery interpreter(insert, new_context, false, true, true);
auto block_io = interpreter.execute(); auto block_io = interpreter.execute();
auto stream = std::make_shared<FileLogBlockInputStream>( Pipes pipes;
*this, metadata_snapshot, new_context, block_io.out->getHeader().getNames(), block_size); pipes.reserve(max_streams_number);
for (size_t stream_number = 0; stream_number < max_streams_number; ++stream_number)
{
pipes.emplace_back(std::make_shared<FileLogSource>(
*this,
metadata_snapshot,
new_context,
block_io.out->getHeader().getNames(),
getPollMaxBatchSize(),
getPollTimeoutMillisecond(),
stream_number,
max_streams_number));
}
StreamLocalLimits limits; QueryPipeline pipeline;
pipeline.init(Pipe::unitePipes(std::move(pipes)));
limits.speed_limits.max_execution_time = getContext()->getSettingsRef().stream_flush_interval_ms;
limits.timeout_overflow_mode = OverflowMode::BREAK;
stream->setLimits(limits);
std::atomic<bool> stub = {false};
size_t rows = 0; size_t rows = 0;
copyData(
*stream, *block_io.out, [&rows](const Block & block) { rows += block.rows(); }, &stub);
bool stream_is_stalled = false; PullingPipelineExecutor executor(pipeline);
Block block;
stream_is_stalled = stream->as<FileLogBlockInputStream>()->isStalled(); block_io.out->writePrefix();
while (executor.pull(block))
{
block_io.out->write(block);
rows += block.rows();
}
block_io.out->writeSuffix();
UInt64 milliseconds = watch.elapsedMilliseconds(); UInt64 milliseconds = watch.elapsedMilliseconds();
LOG_DEBUG(log, "Pushing {} rows to {} took {} ms.", LOG_DEBUG(log, "Pushing {} rows to {} took {} ms.",
formatReadableQuantity(rows), table_id.getNameForLogs(), milliseconds); formatReadableQuantity(rows), table_id.getNameForLogs(), milliseconds);
return stream_is_stalled; return true;
}
void StorageFileLog::clearInvalidFiles()
{
/// Do not need to hold file_status lock, since it will be holded
/// by caller when call this function
std::vector<String> valid_files;
for (const auto & it : file_names)
{
if (file_status.at(it).status == FileStatus::REMOVED)
{
file_status.erase(it);
}
else
{
valid_files.push_back(it);
}
}
file_names.swap(valid_files);
} }
void registerStorageFileLog(StorageFactory & factory) void registerStorageFileLog(StorageFactory & factory)
@ -267,6 +332,36 @@ void registerStorageFileLog(StorageFactory & factory)
ASTs & engine_args = args.engine_args; ASTs & engine_args = args.engine_args;
size_t args_count = engine_args.size(); size_t args_count = engine_args.size();
bool has_settings = args.storage_def->settings;
auto filelog_settings = std::make_unique<FileLogSettings>();
if (has_settings)
{
filelog_settings->loadFromQuery(*args.storage_def);
}
auto physical_cpu_cores = getNumberOfPhysicalCPUCores();
auto num_threads = filelog_settings->filelog_max_threads.value;
if (num_threads > physical_cpu_cores)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Number of threads to parse files can not be bigger than {}", physical_cpu_cores);
}
else if (num_threads < 1)
{
throw Exception("Number of threads to parse files can not be lower than 1", ErrorCodes::BAD_ARGUMENTS);
}
if (filelog_settings->filelog_max_block_size.changed && filelog_settings->filelog_max_block_size.value < 1)
{
throw Exception("filelog_max_block_size can not be lower than 1", ErrorCodes::BAD_ARGUMENTS);
}
if (filelog_settings->filelog_poll_max_batch_size.changed && filelog_settings->filelog_poll_max_batch_size.value < 1)
{
throw Exception("filelog_poll_max_batch_size can not be lower than 1", ErrorCodes::BAD_ARGUMENTS);
}
if (args_count != 2) if (args_count != 2)
throw Exception( throw Exception(
"Arguments size of StorageFileLog should be 2, path and format name", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); "Arguments size of StorageFileLog should be 2, path and format name", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
@ -277,7 +372,7 @@ void registerStorageFileLog(StorageFactory & factory)
auto path = path_ast->as<ASTLiteral &>().value.safeGet<String>(); auto path = path_ast->as<ASTLiteral &>().value.safeGet<String>();
auto format = format_ast->as<ASTLiteral &>().value.safeGet<String>(); auto format = format_ast->as<ASTLiteral &>().value.safeGet<String>();
return StorageFileLog::create(args.table_id, args.getContext(), args.columns, path, format); return StorageFileLog::create(args.table_id, args.getContext(), args.columns, path, format, std::move(filelog_settings));
}; };
factory.registerStorage( factory.registerStorage(
@ -300,4 +395,54 @@ Names StorageFileLog::getVirtualColumnNames()
return result; return result;
} }
void StorageFileLog::watchFunc()
{
FileLogDirectoryWatcher dw(path);
while (true)
{
sleepForMicroseconds(filelog_settings->filelog_poll_timeout_ms.totalMilliseconds());
auto error = dw.getError();
if (error)
LOG_INFO(log, "Error happened during watching directory {}.", dw.getPath());
auto events = dw.getEvents();
std::lock_guard<std::mutex> lock(status_mutex);
for (const auto & event : events)
{
switch (event.type)
{
case Poco::DirectoryWatcher::DW_ITEM_ADDED:
LOG_TRACE(log, "New event {} watched.", event.callback);
if (std::filesystem::is_regular_file(event.path))
{
file_status[event.path].reader = std::ifstream(event.path);
file_names.push_back(event.path);
}
break;
case Poco::DirectoryWatcher::DW_ITEM_MODIFIED:
LOG_TRACE(log, "New event {} watched.", event.callback);
if (std::filesystem::is_regular_file(event.path) && file_status.contains(event.path))
{
file_status[event.path].status = FileStatus::UPDATED;
}
break;
case Poco::DirectoryWatcher::DW_ITEM_REMOVED:
case Poco::DirectoryWatcher::DW_ITEM_MOVED_TO:
case Poco::DirectoryWatcher::DW_ITEM_MOVED_FROM:
LOG_TRACE(log, "New event {} watched.", event.callback);
if (std::filesystem::is_regular_file(event.path) && file_status.contains(event.path))
{
file_status[event.path].status = FileStatus::REMOVED;
}
break;
}
}
}
}
} }

View File

@ -1,6 +1,7 @@
#pragma once #pragma once
#include <Storages/FileLog/Buffer_fwd.h> #include <Storages/FileLog/Buffer_fwd.h>
#include <Storages/FileLog/FileLogSettings.h>
#include <Core/BackgroundSchedulePool.h> #include <Core/BackgroundSchedulePool.h>
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
@ -11,8 +12,8 @@
#include <common/shared_ptr_helper.h> #include <common/shared_ptr_helper.h>
#include <mutex> #include <mutex>
#include <list>
#include <atomic> #include <atomic>
#include <fstream>
namespace DB namespace DB
{ {
@ -52,16 +53,39 @@ protected:
ContextPtr context_, ContextPtr context_,
const ColumnsDescription & columns_, const ColumnsDescription & columns_,
const String & path_, const String & path_,
const String & format_name_); const String & format_name_,
std::unique_ptr<FileLogSettings> settings);
private: private:
std::unique_ptr<FileLogSettings> filelog_settings;
const String path; const String path;
bool path_is_directory = false;
const String format_name; const String format_name;
Poco::Logger * log; Poco::Logger * log;
ReadBufferFromFileLogPtr buffer; ReadBufferFromFileLogPtr buffer;
std::mutex mutex; enum class FileStatus
{
BEGIN,
NO_CHANGE,
UPDATED,
REMOVED
};
struct FileContext
{
FileStatus status = FileStatus::BEGIN;
std::ifstream reader;
};
using NameToFile = std::unordered_map<String, FileContext>;
NameToFile file_status;
std::vector<String> file_names;
std::mutex status_mutex;
// Stream thread // Stream thread
struct TaskContext struct TaskContext
@ -74,17 +98,22 @@ private:
}; };
std::shared_ptr<TaskContext> task; std::shared_ptr<TaskContext> task;
void createReadBuffer(); using TaskThread = BackgroundSchedulePool::TaskHolder;
void destroyReadBuffer();
TaskThread watch_task;
void threadFunc(); void threadFunc();
void clearInvalidFiles();
size_t getPollMaxBatchSize() const; size_t getPollMaxBatchSize() const;
size_t getMaxBlockSize() const; size_t getMaxBlockSize() const;
size_t getPollTimeoutMillisecond() const; size_t getPollTimeoutMillisecond() const;
bool streamToViews(); bool streamToViews();
bool checkDependencies(const StorageID & table_id); bool checkDependencies(const StorageID & table_id);
[[noreturn]] void watchFunc();
}; };
} }