minor enhancements in async inserts

This commit is contained in:
Anton Popov 2021-09-16 20:18:34 +03:00
parent 63acdcc85e
commit 99175f7acc
15 changed files with 68 additions and 20 deletions

View File

@ -918,7 +918,7 @@ if (ThreadFuzzer::instance().isEffective())
global_context,
settings.async_insert_threads,
settings.async_insert_max_data_size,
AsynchronousInsertQueue::Timeout{.busy = settings.async_insert_busy_timeout, .stale = settings.async_insert_stale_timeout}));
AsynchronousInsertQueue::Timeout{.busy = settings.async_insert_busy_timeout_ms, .stale = settings.async_insert_stale_timeout_ms}));
/// Size of cache for marks (index of MergeTree family of tables). It is mandatory.
size_t mark_cache_size = config().getUInt64("mark_cache_size");

View File

@ -507,6 +507,14 @@ class IColumn;
M(Bool, remote_filesystem_read_prefetch, true, "Should use prefetching when reading data from remote filesystem.", 0) \
M(Int64, read_priority, 0, "Priority to read data from local filesystem. Only supported for 'pread_threadpool' method.", 0) \
\
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background. Makes sense only for inserts via HTTP protocol. If wait_for_async_insert is false, INSERT query is processed almost instantly, otherwise client will wait until data will be flushed to table", 0) \
M(Bool, wait_for_async_insert, true, "If true wait for processing of asynchronous insertion", 0) \
M(Seconds, wait_for_async_insert_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "Timeout for waiting for processing asynchronous insertion", 0) \
M(UInt64, async_insert_max_data_size, 100000, "Maximum size in bytes of unparsed data collected per query before being inserted", 0) \
M(Milliseconds, async_insert_busy_timeout_ms, 200, "Maximum time to wait before dumping collected data per query since the first data appeared", 0) \
M(Milliseconds, async_insert_stale_timeout_ms, 0, "Maximum time to wait before dumping collected data per query since the last data appeared. Zero means no timeout at all", 0) \
\
/** Experimental functions */ \
M(Bool, allow_experimental_funnel_functions, false, "Enable experimental functions for funnel analysis.", 0) \
M(Bool, allow_experimental_nlp_functions, false, "Enable experimental functions for natural language processing.", 0) \
@ -602,14 +610,6 @@ class IColumn;
M(Bool, output_format_pretty_row_numbers, false, "Add row numbers before each row for pretty output format", 0) \
M(Bool, insert_distributed_one_random_shard, false, "If setting is enabled, inserting into distributed table will choose a random shard to write when there is no sharding key", 0) \
\
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
M(Bool, async_insert_mode, false, "Insert query is processed almost instantly, but an actual data queued for later asynchronous insertion", 0) \
M(Bool, wait_for_async_insert, true, "If true wait for processing of asynchronous insertion", 0) \
M(Seconds, wait_for_async_insert_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "Timeout for waiting for processing asynchronous insertion", 0) \
M(UInt64, async_insert_max_data_size, 1000000, "Maximum size in bytes of unparsed data collected per query before being inserted", 0) \
M(Milliseconds, async_insert_busy_timeout, 200, "Maximum time to wait before dumping collected data per query since the first data appeared", 0) \
M(Milliseconds, async_insert_stale_timeout, 0, "Maximum time to wait before dumping collected data per query since the last data appeared. Zero means no timeout at all", 0) \
\
M(Bool, cross_to_inner_join_rewrite, true, "Use inner join instead of comma/cross join if possible", 0) \
\
M(Bool, output_format_arrow_low_cardinality_as_dictionary, false, "Enable output LowCardinality type as Dictionary Arrow type", 0) \

View File

@ -439,6 +439,18 @@ bool FormatFactory::checkIfFormatIsColumnOriented(const String & name)
return target.is_column_oriented;
}
bool FormatFactory::isInputFormat(const String & name) const
{
auto it = dict.find(name);
return it != dict.end() && (it->second.input_creator || it->second.input_processor_creator);
}
bool FormatFactory::isOutputFormat(const String & name) const
{
auto it = dict.find(name);
return it != dict.end() && (it->second.output_creator || it->second.output_processor_creator);
}
FormatFactory & FormatFactory::instance()
{
static FormatFactory ret;

View File

@ -187,6 +187,9 @@ public:
return dict;
}
bool isInputFormat(const String & name) const;
bool isOutputFormat(const String & name) const;
private:
FormatsDictionary dict;

View File

@ -18,6 +18,7 @@
#include <Common/SipHash.h>
#include <Common/FieldVisitorHash.h>
#include <Access/AccessFlags.h>
#include <Formats/FormatFactory.h>
namespace DB
@ -27,6 +28,7 @@ namespace ErrorCodes
{
extern const int TIMEOUT_EXCEEDED;
extern const int UNKNOWN_EXCEPTION;
extern const int UNKNOWN_FORMAT;
}
AsynchronousInsertQueue::InsertQuery::InsertQuery(const ASTPtr & query_, const Settings & settings_)
@ -166,6 +168,9 @@ void AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context)
auto table = interpreter.getTable(insert_query);
auto sample_block = interpreter.getSampleBlock(insert_query, table, table->getInMemoryMetadataPtr());
if (!FormatFactory::instance().isInputFormat(insert_query.format))
throw Exception(ErrorCodes::UNKNOWN_FORMAT, "Unknown input format {}", insert_query.format);
query_context->checkAccess(AccessType::INSERT, insert_query.table_id, sample_block.getNames());
String bytes;
@ -324,7 +329,7 @@ void AsynchronousInsertQueue::cleanup()
}
if (total_removed)
LOG_TRACE(log, "Removed stale entries for {} queries from asynchronous insertion queue", keys_to_remove.size());
LOG_TRACE(log, "Removed stale entries for {} queries from asynchronous insertion queue", total_removed);
}
{

View File

@ -2744,8 +2744,8 @@ void Context::setAsynchronousInsertQueue(const std::shared_ptr<AsynchronousInser
{
using namespace std::chrono;
if (std::chrono::milliseconds(settings.async_insert_busy_timeout) == 0ms)
throw Exception("Setting async_insert_busy_timeout can't be zero", ErrorCodes::INVALID_SETTING_VALUE);
if (std::chrono::milliseconds(settings.async_insert_busy_timeout_ms) == 0ms)
throw Exception("Setting async_insert_busy_timeout_ms can't be zero", ErrorCodes::INVALID_SETTING_VALUE);
shared->async_insert_queue = ptr;
}

View File

@ -582,7 +582,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
auto * queue = context->getAsynchronousInsertQueue();
const bool async_insert = queue
&& insert_query && !insert_query->select
&& insert_query->hasInlinedData() && settings.async_insert_mode;
&& insert_query->hasInlinedData() && settings.async_insert;
if (async_insert)
{

View File

@ -4,7 +4,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
url="${CLICKHOUSE_URL}&async_insert_mode=1&wait_for_async_insert=1"
url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1"
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts"
${CLICKHOUSE_CLIENT} -q "CREATE TABLE async_inserts (id UInt32, s String) ENGINE = Memory"

View File

@ -4,7 +4,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
url="${CLICKHOUSE_URL}&async_insert_mode=1&wait_for_async_insert=1"
url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1"
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts"
${CLICKHOUSE_CLIENT} -q "CREATE TABLE async_inserts (id UInt32, s String) ENGINE = MergeTree ORDER BY id"

View File

@ -4,7 +4,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
url="${CLICKHOUSE_URL}&async_insert_mode=1&wait_for_async_insert=1"
url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1"
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts"
${CLICKHOUSE_CLIENT} -q "CREATE TABLE async_inserts (id UInt32, v UInt32 DEFAULT id * id) ENGINE = Memory"

View File

@ -4,7 +4,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
url="${CLICKHOUSE_URL}&async_insert_mode=1&wait_for_async_insert=1"
url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1"
${CLICKHOUSE_CLIENT} -q "DROP USER IF EXISTS u_02015_allowed"
${CLICKHOUSE_CLIENT} -q "DROP USER IF EXISTS u_02015_denied"

View File

@ -4,7 +4,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
url="${CLICKHOUSE_URL}&async_insert_mode=1&wait_for_async_insert=1"
url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1"
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts"
${CLICKHOUSE_CLIENT} -q "CREATE TABLE async_inserts (id UInt32, s String) ENGINE = MergeTree ORDER BY id SETTINGS parts_to_throw_insert = 1"

View File

@ -0,0 +1,4 @@
Code: 60
Code: 73
Code: 73
Code: 16

View File

@ -0,0 +1,24 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=0"
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts"
${CLICKHOUSE_CLIENT} -q "CREATE TABLE async_inserts (id UInt32, s String) ENGINE = Memory"
${CLICKHOUSE_CURL} -sS $url -d 'INSERT INTO async_inserts1 FORMAT JSONEachRow {"id": 1, "s": "a"}' \
| grep -o "Code: 60"
${CLICKHOUSE_CURL} -sS $url -d 'INSERT INTO async_inserts FORMAT BadFormat {"id": 1, "s": "a"}' \
| grep -o "Code: 73"
${CLICKHOUSE_CURL} -sS $url -d 'INSERT INTO async_inserts FORMAT Pretty {"id": 1, "s": "a"}' \
| grep -o "Code: 73"
${CLICKHOUSE_CURL} -sS $url -d 'INSERT INTO async_inserts (id, a) FORMAT JSONEachRow {"id": 1, "s": "a"}' \
| grep -o "Code: 16"
${CLICKHOUSE_CLIENT} -q "DROP TABLE async_inserts"

View File

@ -7,7 +7,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
function insert1()
{
url="${CLICKHOUSE_URL}&async_insert_mode=1&wait_for_async_insert=0"
url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=0"
while true; do
${CLICKHOUSE_CURL} -sS "$url" -d 'INSERT INTO async_inserts FORMAT CSV
1,"a"
@ -18,7 +18,7 @@ function insert1()
function insert2()
{
url="${CLICKHOUSE_URL}&async_insert_mode=1&wait_for_async_insert=0"
url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=0"
while true; do
${CLICKHOUSE_CURL} -sS "$url" -d 'INSERT INTO async_inserts FORMAT JSONEachRow {"id": 5, "s": "e"} {"id": 6, "s": "f"}'
done