diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 5ca9007bdcc..f243ecda108 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -918,7 +918,7 @@ if (ThreadFuzzer::instance().isEffective()) global_context, settings.async_insert_threads, settings.async_insert_max_data_size, - AsynchronousInsertQueue::Timeout{.busy = settings.async_insert_busy_timeout, .stale = settings.async_insert_stale_timeout})); + AsynchronousInsertQueue::Timeout{.busy = settings.async_insert_busy_timeout_ms, .stale = settings.async_insert_stale_timeout_ms})); /// Size of cache for marks (index of MergeTree family of tables). It is mandatory. size_t mark_cache_size = config().getUInt64("mark_cache_size"); diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 0e29168f906..a883f3915e4 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -507,6 +507,14 @@ class IColumn; M(Bool, remote_filesystem_read_prefetch, true, "Should use prefetching when reading data from remote filesystem.", 0) \ M(Int64, read_priority, 0, "Priority to read data from local filesystem. Only supported for 'pread_threadpool' method.", 0) \ \ + M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \ + M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background. Makes sense only for inserts via HTTP protocol. If wait_for_async_insert is false, INSERT query is processed almost instantly, otherwise client will wait until data will be flushed to table", 0) \ + M(Bool, wait_for_async_insert, true, "If true wait for processing of asynchronous insertion", 0) \ + M(Seconds, wait_for_async_insert_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "Timeout for waiting for processing asynchronous insertion", 0) \ + M(UInt64, async_insert_max_data_size, 100000, "Maximum size in bytes of unparsed data collected per query before being inserted", 0) \ + M(Milliseconds, async_insert_busy_timeout_ms, 200, "Maximum time to wait before dumping collected data per query since the first data appeared", 0) \ + M(Milliseconds, async_insert_stale_timeout_ms, 0, "Maximum time to wait before dumping collected data per query since the last data appeared. Zero means no timeout at all", 0) \ + \ /** Experimental functions */ \ M(Bool, allow_experimental_funnel_functions, false, "Enable experimental functions for funnel analysis.", 0) \ M(Bool, allow_experimental_nlp_functions, false, "Enable experimental functions for natural language processing.", 0) \ @@ -602,14 +610,6 @@ class IColumn; M(Bool, output_format_pretty_row_numbers, false, "Add row numbers before each row for pretty output format", 0) \ M(Bool, insert_distributed_one_random_shard, false, "If setting is enabled, inserting into distributed table will choose a random shard to write when there is no sharding key", 0) \ \ - M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \ - M(Bool, async_insert_mode, false, "Insert query is processed almost instantly, but an actual data queued for later asynchronous insertion", 0) \ - M(Bool, wait_for_async_insert, true, "If true wait for processing of asynchronous insertion", 0) \ - M(Seconds, wait_for_async_insert_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "Timeout for waiting for processing asynchronous insertion", 0) \ - M(UInt64, async_insert_max_data_size, 1000000, "Maximum size in bytes of unparsed data collected per query before being inserted", 0) \ - M(Milliseconds, async_insert_busy_timeout, 200, "Maximum time to wait before dumping collected data per query since the first data appeared", 0) \ - M(Milliseconds, async_insert_stale_timeout, 0, "Maximum time to wait before dumping collected data per query since the last data appeared. Zero means no timeout at all", 0) \ - \ M(Bool, cross_to_inner_join_rewrite, true, "Use inner join instead of comma/cross join if possible", 0) \ \ M(Bool, output_format_arrow_low_cardinality_as_dictionary, false, "Enable output LowCardinality type as Dictionary Arrow type", 0) \ diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index a014f85d45f..c8c2b380850 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -439,6 +439,18 @@ bool FormatFactory::checkIfFormatIsColumnOriented(const String & name) return target.is_column_oriented; } +bool FormatFactory::isInputFormat(const String & name) const +{ + auto it = dict.find(name); + return it != dict.end() && (it->second.input_creator || it->second.input_processor_creator); +} + +bool FormatFactory::isOutputFormat(const String & name) const +{ + auto it = dict.find(name); + return it != dict.end() && (it->second.output_creator || it->second.output_processor_creator); +} + FormatFactory & FormatFactory::instance() { static FormatFactory ret; diff --git a/src/Formats/FormatFactory.h b/src/Formats/FormatFactory.h index e935eb4d761..7ff72387509 100644 --- a/src/Formats/FormatFactory.h +++ b/src/Formats/FormatFactory.h @@ -187,6 +187,9 @@ public: return dict; } + bool isInputFormat(const String & name) const; + bool isOutputFormat(const String & name) const; + private: FormatsDictionary dict; diff --git a/src/Interpreters/AsynchronousInsertQueue.cpp b/src/Interpreters/AsynchronousInsertQueue.cpp index 5b9521f334e..da41eb82d5e 100644 --- a/src/Interpreters/AsynchronousInsertQueue.cpp +++ b/src/Interpreters/AsynchronousInsertQueue.cpp @@ -18,6 +18,7 @@ #include #include #include +#include namespace DB @@ -27,6 +28,7 @@ namespace ErrorCodes { extern const int TIMEOUT_EXCEEDED; extern const int UNKNOWN_EXCEPTION; + extern const int UNKNOWN_FORMAT; } AsynchronousInsertQueue::InsertQuery::InsertQuery(const ASTPtr & query_, const Settings & settings_) @@ -166,6 +168,9 @@ void AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context) auto table = interpreter.getTable(insert_query); auto sample_block = interpreter.getSampleBlock(insert_query, table, table->getInMemoryMetadataPtr()); + if (!FormatFactory::instance().isInputFormat(insert_query.format)) + throw Exception(ErrorCodes::UNKNOWN_FORMAT, "Unknown input format {}", insert_query.format); + query_context->checkAccess(AccessType::INSERT, insert_query.table_id, sample_block.getNames()); String bytes; @@ -324,7 +329,7 @@ void AsynchronousInsertQueue::cleanup() } if (total_removed) - LOG_TRACE(log, "Removed stale entries for {} queries from asynchronous insertion queue", keys_to_remove.size()); + LOG_TRACE(log, "Removed stale entries for {} queries from asynchronous insertion queue", total_removed); } { diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index be1cb21bbc3..d0e941b0aff 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -2744,8 +2744,8 @@ void Context::setAsynchronousInsertQueue(const std::shared_ptrasync_insert_queue = ptr; } diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index ecf2d87dd5c..0b1746feebc 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -582,7 +582,7 @@ static std::tuple executeQueryImpl( auto * queue = context->getAsynchronousInsertQueue(); const bool async_insert = queue && insert_query && !insert_query->select - && insert_query->hasInlinedData() && settings.async_insert_mode; + && insert_query->hasInlinedData() && settings.async_insert; if (async_insert) { diff --git a/tests/queries/0_stateless/02015_async_inserts_1.sh b/tests/queries/0_stateless/02015_async_inserts_1.sh index 365d2e99b31..b4310f5101c 100755 --- a/tests/queries/0_stateless/02015_async_inserts_1.sh +++ b/tests/queries/0_stateless/02015_async_inserts_1.sh @@ -4,7 +4,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) # shellcheck source=../shell_config.sh . "$CURDIR"/../shell_config.sh -url="${CLICKHOUSE_URL}&async_insert_mode=1&wait_for_async_insert=1" +url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1" ${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts" ${CLICKHOUSE_CLIENT} -q "CREATE TABLE async_inserts (id UInt32, s String) ENGINE = Memory" diff --git a/tests/queries/0_stateless/02015_async_inserts_2.sh b/tests/queries/0_stateless/02015_async_inserts_2.sh index 0eb11bb5219..90f5584d84e 100755 --- a/tests/queries/0_stateless/02015_async_inserts_2.sh +++ b/tests/queries/0_stateless/02015_async_inserts_2.sh @@ -4,7 +4,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) # shellcheck source=../shell_config.sh . "$CURDIR"/../shell_config.sh -url="${CLICKHOUSE_URL}&async_insert_mode=1&wait_for_async_insert=1" +url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1" ${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts" ${CLICKHOUSE_CLIENT} -q "CREATE TABLE async_inserts (id UInt32, s String) ENGINE = MergeTree ORDER BY id" diff --git a/tests/queries/0_stateless/02015_async_inserts_3.sh b/tests/queries/0_stateless/02015_async_inserts_3.sh index fe97354d3ac..9d85d81caac 100755 --- a/tests/queries/0_stateless/02015_async_inserts_3.sh +++ b/tests/queries/0_stateless/02015_async_inserts_3.sh @@ -4,7 +4,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) # shellcheck source=../shell_config.sh . "$CURDIR"/../shell_config.sh -url="${CLICKHOUSE_URL}&async_insert_mode=1&wait_for_async_insert=1" +url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1" ${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts" ${CLICKHOUSE_CLIENT} -q "CREATE TABLE async_inserts (id UInt32, v UInt32 DEFAULT id * id) ENGINE = Memory" diff --git a/tests/queries/0_stateless/02015_async_inserts_4.sh b/tests/queries/0_stateless/02015_async_inserts_4.sh index f8cc0aa0a48..65598923b96 100755 --- a/tests/queries/0_stateless/02015_async_inserts_4.sh +++ b/tests/queries/0_stateless/02015_async_inserts_4.sh @@ -4,7 +4,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) # shellcheck source=../shell_config.sh . "$CURDIR"/../shell_config.sh -url="${CLICKHOUSE_URL}&async_insert_mode=1&wait_for_async_insert=1" +url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1" ${CLICKHOUSE_CLIENT} -q "DROP USER IF EXISTS u_02015_allowed" ${CLICKHOUSE_CLIENT} -q "DROP USER IF EXISTS u_02015_denied" diff --git a/tests/queries/0_stateless/02015_async_inserts_5.sh b/tests/queries/0_stateless/02015_async_inserts_5.sh index e07e274d1d7..05ea876b101 100755 --- a/tests/queries/0_stateless/02015_async_inserts_5.sh +++ b/tests/queries/0_stateless/02015_async_inserts_5.sh @@ -4,7 +4,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) # shellcheck source=../shell_config.sh . "$CURDIR"/../shell_config.sh -url="${CLICKHOUSE_URL}&async_insert_mode=1&wait_for_async_insert=1" +url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1" ${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts" ${CLICKHOUSE_CLIENT} -q "CREATE TABLE async_inserts (id UInt32, s String) ENGINE = MergeTree ORDER BY id SETTINGS parts_to_throw_insert = 1" diff --git a/tests/queries/0_stateless/02015_async_inserts_6.reference b/tests/queries/0_stateless/02015_async_inserts_6.reference new file mode 100644 index 00000000000..f3a80cd0cdf --- /dev/null +++ b/tests/queries/0_stateless/02015_async_inserts_6.reference @@ -0,0 +1,4 @@ +Code: 60 +Code: 73 +Code: 73 +Code: 16 diff --git a/tests/queries/0_stateless/02015_async_inserts_6.sh b/tests/queries/0_stateless/02015_async_inserts_6.sh new file mode 100755 index 00000000000..94091783081 --- /dev/null +++ b/tests/queries/0_stateless/02015_async_inserts_6.sh @@ -0,0 +1,24 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=0" + +${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts" +${CLICKHOUSE_CLIENT} -q "CREATE TABLE async_inserts (id UInt32, s String) ENGINE = Memory" + +${CLICKHOUSE_CURL} -sS $url -d 'INSERT INTO async_inserts1 FORMAT JSONEachRow {"id": 1, "s": "a"}' \ + | grep -o "Code: 60" + +${CLICKHOUSE_CURL} -sS $url -d 'INSERT INTO async_inserts FORMAT BadFormat {"id": 1, "s": "a"}' \ + | grep -o "Code: 73" + +${CLICKHOUSE_CURL} -sS $url -d 'INSERT INTO async_inserts FORMAT Pretty {"id": 1, "s": "a"}' \ + | grep -o "Code: 73" + +${CLICKHOUSE_CURL} -sS $url -d 'INSERT INTO async_inserts (id, a) FORMAT JSONEachRow {"id": 1, "s": "a"}' \ + | grep -o "Code: 16" + +${CLICKHOUSE_CLIENT} -q "DROP TABLE async_inserts" diff --git a/tests/queries/0_stateless/02015_async_inserts_stress_long.sh b/tests/queries/0_stateless/02015_async_inserts_stress_long.sh index c11a1be8cef..f9a58818404 100755 --- a/tests/queries/0_stateless/02015_async_inserts_stress_long.sh +++ b/tests/queries/0_stateless/02015_async_inserts_stress_long.sh @@ -7,7 +7,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) function insert1() { - url="${CLICKHOUSE_URL}&async_insert_mode=1&wait_for_async_insert=0" + url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=0" while true; do ${CLICKHOUSE_CURL} -sS "$url" -d 'INSERT INTO async_inserts FORMAT CSV 1,"a" @@ -18,7 +18,7 @@ function insert1() function insert2() { - url="${CLICKHOUSE_URL}&async_insert_mode=1&wait_for_async_insert=0" + url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=0" while true; do ${CLICKHOUSE_CURL} -sS "$url" -d 'INSERT INTO async_inserts FORMAT JSONEachRow {"id": 5, "s": "e"} {"id": 6, "s": "f"}' done