add some comments

This commit is contained in:
Anton Popov 2021-09-09 19:10:53 +03:00
parent 106566e701
commit 8cb02a4b27
6 changed files with 28 additions and 6 deletions

View File

@ -165,7 +165,6 @@ void AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context)
auto read_buf = getReadBufferFromASTInsertQuery(query); auto read_buf = getReadBufferFromASTInsertQuery(query);
/// It's important to read the whole data per query as a single chunk, so we can safely drop it in case of parsing failure.
auto entry = std::make_shared<InsertData::Entry>(); auto entry = std::make_shared<InsertData::Entry>();
entry->query_id = query_context->getCurrentQueryId(); entry->query_id = query_context->getCurrentQueryId();
@ -177,6 +176,7 @@ void AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context)
bool found = false; bool found = false;
{ {
/// Firstly try to get entry from queue without exclusive lock.
std::shared_lock read_lock(rwlock); std::shared_lock read_lock(rwlock);
it = queue.find(key); it = queue.find(key);
if (it != queue.end()) if (it != queue.end())

View File

@ -15,6 +15,8 @@ namespace DB
class ASTInsertQuery; class ASTInsertQuery;
struct BlockIO; struct BlockIO;
/// A queue, that stores data for insert queries and periodically flushes it to tables.
/// The data is grouped by table, format and settings of insert query.
class AsynchronousInsertQueue : public WithContext class AsynchronousInsertQueue : public WithContext
{ {
public: public:
@ -85,6 +87,9 @@ private:
using InsertDataPtr = std::unique_ptr<InsertData>; using InsertDataPtr = std::unique_ptr<InsertData>;
/// A separate container, that holds a data and a mutex for it.
/// When it's needed to process current chunk of data, it can be moved for processing
/// and new data can be recreated without holding a lock during processing.
struct Container struct Container
{ {
std::mutex mutex; std::mutex mutex;
@ -119,7 +124,7 @@ private:
ThreadPool pool; /// dump the data only inside this pool. ThreadPool pool; /// dump the data only inside this pool.
ThreadFromGlobalPool dump_by_first_update_thread; /// uses busy_timeout and busyCheck() ThreadFromGlobalPool dump_by_first_update_thread; /// uses busy_timeout and busyCheck()
ThreadFromGlobalPool dump_by_last_update_thread; /// uses stale_timeout and staleCheck() ThreadFromGlobalPool dump_by_last_update_thread; /// uses stale_timeout and staleCheck()
ThreadFromGlobalPool cleanup_thread; ThreadFromGlobalPool cleanup_thread; /// uses busy_timeout and cleanup()
Poco::Logger * log = &Poco::Logger::get("AsynchronousInsertQueue"); Poco::Logger * log = &Poco::Logger::get("AsynchronousInsertQueue");

View File

@ -29,7 +29,10 @@ public:
* Or nothing if the request INSERT SELECT (self-sufficient query - does not accept the input data, does not return the result). * Or nothing if the request INSERT SELECT (self-sufficient query - does not accept the input data, does not return the result).
*/ */
BlockIO execute() override; BlockIO execute() override;
/// Returns only sinks, without input sources.
Processors getSinks(); Processors getSinks();
StorageID getDatabaseTable() const; StorageID getDatabaseTable() const;
void extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr & ast, ContextPtr context_) const override; void extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr & ast, ContextPtr context_) const override;

View File

@ -8,9 +8,16 @@ namespace DB
using SimpleTransformPtr = std::shared_ptr<ISimpleTransform>; using SimpleTransformPtr = std::shared_ptr<ISimpleTransform>;
/// Recieves format and allows to execute
/// it multiple times for streaming processing of data.
class StreamingFormatExecutor class StreamingFormatExecutor
{ {
public: public:
/// Callback is called, when got exception, while executing format.
/// It provides currently accumulated columns to make a roolback, for example,
/// and exception to rethrow or add context to it.
/// Should return number of rows, which are added to callback
/// to result columns in comparison to previous call of `execute`.
using ErrorCallback = std::function<size_t(const MutableColumns &, Exception &)>; using ErrorCallback = std::function<size_t(const MutableColumns &, Exception &)>;
StreamingFormatExecutor( StreamingFormatExecutor(
@ -19,7 +26,10 @@ public:
ErrorCallback on_error_ = [](const MutableColumns &, Exception &) -> size_t { throw; }, ErrorCallback on_error_ = [](const MutableColumns &, Exception &) -> size_t { throw; },
SimpleTransformPtr adding_defaults_transform_ = nullptr); SimpleTransformPtr adding_defaults_transform_ = nullptr);
/// Returns numbers of newly read rows.
size_t execute(); size_t execute();
/// Releases currently accumulated columns.
MutableColumns getResultColumns(); MutableColumns getResultColumns();
private: private:

View File

@ -6,6 +6,8 @@
namespace DB namespace DB
{ {
/// Source, that allow to wait until processing of
/// asynchronous insert for specified query_id will be finished.
class WaitForAsyncInsertSource : public ISource, WithContext class WaitForAsyncInsertSource : public ISource, WithContext
{ {
public: public:

View File

@ -10,13 +10,11 @@
namespace DB namespace DB
{ {
/** Prepares a pipe which produce data containing in INSERT query
* Head of inserting data could be stored in INSERT ast directly
* Remaining (tail) data could be stored in input_buffer_tail_part
*/
class Pipe; class Pipe;
/// Prepares a input format, which produce data containing in INSERT query.
InputFormatPtr getInputFormatFromASTInsertQuery( InputFormatPtr getInputFormatFromASTInsertQuery(
const ASTPtr & ast, const ASTPtr & ast,
bool with_buffers, bool with_buffers,
@ -24,6 +22,7 @@ InputFormatPtr getInputFormatFromASTInsertQuery(
ContextPtr context, ContextPtr context,
const ASTPtr & input_function); const ASTPtr & input_function);
/// Prepares a pipe which produce data containing in INSERT query.
Pipe getSourceFromASTInsertQuery( Pipe getSourceFromASTInsertQuery(
const ASTPtr & ast, const ASTPtr & ast,
bool with_buffers, bool with_buffers,
@ -32,6 +31,9 @@ Pipe getSourceFromASTInsertQuery(
const ASTPtr & input_function); const ASTPtr & input_function);
class ReadBuffer; class ReadBuffer;
/// Prepares a read buffer, that allows to read inlined data
/// from ASTInsertQuert directly, and from tail buffer, if it exists.
std::unique_ptr<ReadBuffer> getReadBufferFromASTInsertQuery(const ASTPtr & ast); std::unique_ptr<ReadBuffer> getReadBufferFromASTInsertQuery(const ASTPtr & ast);
} }