From be55296e06d5d557ab9fae136437f8acfa229f1f Mon Sep 17 00:00:00 2001
From: cnmade
Date: Thu, 27 Jan 2022 14:54:12 +0800
Subject: [PATCH 01/11] Translate zh/sql-reference/statements/use: remove old file

---
 docs/zh/sql-reference/statements/{use.md => use.md.bak} | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 rename docs/zh/sql-reference/statements/{use.md => use.md.bak} (100%)

diff --git a/docs/zh/sql-reference/statements/use.md b/docs/zh/sql-reference/statements/use.md.bak
similarity index 100%
rename from docs/zh/sql-reference/statements/use.md
rename to docs/zh/sql-reference/statements/use.md.bak

From 6621618d16eedc9d9ddb96096fe01a9db4b0ef9e Mon Sep 17 00:00:00 2001
From: cnmade
Date: Thu, 27 Jan 2022 14:55:28 +0800
Subject: [PATCH 02/11] Translate zh/sql-reference/statements/use: add translation

---
 docs/zh/sql-reference/statements/use.md | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)
 create mode 100644 docs/zh/sql-reference/statements/use.md

diff --git a/docs/zh/sql-reference/statements/use.md b/docs/zh/sql-reference/statements/use.md
new file mode 100644
index 00000000000..41cba58bb9d
--- /dev/null
+++ b/docs/zh/sql-reference/statements/use.md
@@ -0,0 +1,16 @@
+---
+toc_priority: 53
+toc_title: USE
+---
+
+# USE 语句 {#use}
+
+``` sql
+USE db
+```
+
+用于设置会话的当前数据库。
+
+如果查询语句中没有在表名前面以加点的方式指明数据库名, 则用当前数据库进行搜索。
+
+使用 HTTP 协议时无法进行此查询,因为没有会话的概念。

From 9f7799637c5daa686f2591908f8c478c84e89958 Mon Sep 17 00:00:00 2001
From: cnmade
Date: Thu, 27 Jan 2022 14:57:59 +0800
Subject: [PATCH 03/11] Translate zh/sql-reference/statements/use: fix English translation, remove backup file

---
 docs/en/sql-reference/statements/use.md     | 8 ++++----
 docs/zh/sql-reference/statements/use.md.bak | 1 -
 2 files changed, 4 insertions(+), 5 deletions(-)
 delete mode 120000 docs/zh/sql-reference/statements/use.md.bak

diff --git a/docs/en/sql-reference/statements/use.md b/docs/en/sql-reference/statements/use.md
index 41cba58bb9d..841c23d333d 100644
--- a/docs/en/sql-reference/statements/use.md
+++ b/docs/en/sql-reference/statements/use.md
@@ -3,14 +3,14 @@ toc_priority: 53
 toc_title: USE
 ---
 
-# USE 语句 {#use}
+# USE Statement {#use}
 
 ``` sql
 USE db
 ```
 
-用于设置会话的当前数据库。
+Lets you set the current database for the session.
 
-如果查询语句中没有在表名前面以加点的方式指明数据库名, 则用当前数据库进行搜索。
+The current database is used for searching for tables if the database is not explicitly defined in the query with a dot before the table name.
 
-使用 HTTP 协议时无法进行此查询,因为没有会话的概念。
+This query can’t be made when using the HTTP protocol, since there is no concept of a session.
diff --git a/docs/zh/sql-reference/statements/use.md.bak b/docs/zh/sql-reference/statements/use.md.bak
deleted file mode 120000
index 7bdbf049326..00000000000
--- a/docs/zh/sql-reference/statements/use.md.bak
+++ /dev/null
@@ -1 +0,0 @@
-../../../en/sql-reference/statements/use.md
\ No newline at end of file

From 6c0959b907a4e47008b34babe2f7c8e51f7b63c0 Mon Sep 17 00:00:00 2001
From: Anton Popov
Date: Fri, 28 Jan 2022 03:25:15 +0300
Subject: [PATCH 04/11] fix asynchronous inserts with Native format

---
 src/Processors/Formats/Impl/NativeFormat.cpp  | 20 +++++---
 .../02187_async_inserts_all_formats.python    | 50 +++++++++++++++++++
 .../02187_async_inserts_all_formats.reference | 40 +++++++++++++++
 .../02187_async_inserts_all_formats.sh        |  9 ++++
 .../0_stateless/helpers/pure_http_client.py   | 42 ++++++++++------
 5 files changed, 141 insertions(+), 20 deletions(-)
 create mode 100644 tests/queries/0_stateless/02187_async_inserts_all_formats.python
 create mode 100644 tests/queries/0_stateless/02187_async_inserts_all_formats.reference
 create mode 100755 tests/queries/0_stateless/02187_async_inserts_all_formats.sh

diff --git a/src/Processors/Formats/Impl/NativeFormat.cpp b/src/Processors/Formats/Impl/NativeFormat.cpp
index 19e2ede6b65..bd95cfd6376 100644
--- a/src/Processors/Formats/Impl/NativeFormat.cpp
+++ b/src/Processors/Formats/Impl/NativeFormat.cpp
@@ -15,21 +15,22 @@ namespace DB
 class NativeInputFormat final : public IInputFormat
 {
 public:
-    NativeInputFormat(ReadBuffer & buf, const Block & header)
-        : IInputFormat(header, buf)
-        , reader(buf, header, 0) {}
+    NativeInputFormat(ReadBuffer & buf, const Block & header_)
+        : IInputFormat(header_, buf)
+        , reader(std::make_unique<NativeReader>(buf, header_, 0))
+        , header(header_) {}
 
     String getName() const override { return "Native"; }
 
     void resetParser() override
     {
         IInputFormat::resetParser();
-        reader.resetParser();
+        reader->resetParser();
     }
 
     Chunk generate() override
     {
-        auto block = reader.read();
+        auto block = reader->read();
         if (!block)
             return {};
 
@@ -40,8 +41,15 @@ public:
         return Chunk(block.getColumns(), num_rows);
     }
 
+    void setReadBuffer(ReadBuffer & in_) override
+    {
+        reader = std::make_unique<NativeReader>(in_, header, 0);
+        IInputFormat::setReadBuffer(in_);
+    }
+
 private:
-    NativeReader reader;
+    std::unique_ptr<NativeReader> reader;
+    Block header;
 };
 
 class NativeOutputFormat final : public IOutputFormat

diff --git a/tests/queries/0_stateless/02187_async_inserts_all_formats.python b/tests/queries/0_stateless/02187_async_inserts_all_formats.python
new file mode 100644
index 00000000000..0a909451259
--- /dev/null
+++ b/tests/queries/0_stateless/02187_async_inserts_all_formats.python
@@ -0,0 +1,50 @@
+#!/usr/bin/env python3
+import os
+import sys
+
+CURDIR = os.path.dirname(os.path.realpath(__file__))
+sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
+
+CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL')
+CLICKHOUSE_TMP = os.environ.get('CLICKHOUSE_TMP')
+
+from pure_http_client import ClickHouseClient
+
+client = ClickHouseClient()
+
+def run_test(data_format, gen_data_template, settings):
+    print(data_format)
+    client.query("TRUNCATE TABLE t_async_insert")
+
+    expected = client.query(gen_data_template.format("TSV")).strip()
+    data = client.query(gen_data_template.format(data_format), settings=settings, binary_result=True)
+
+    insert_query = "INSERT INTO t_async_insert FORMAT {}".format(data_format)
+    client.query_with_data(insert_query, data, settings=settings)
+
+    result = client.query("SELECT * FROM t_async_insert FORMAT TSV").strip()
+    if result != expected:
+        print("Failed for format {}.\nExpected:\n{}\nGot:\n{}\n".format(data_format, expected, result))
+        exit(1)
+
+formats = client.query("SELECT name FROM system.formats WHERE is_input AND is_output \
+    AND name NOT IN ('CapnProto', 'RawBLOB', 'Template', 'ProtobufSingle', 'LineAsString', 'Protobuf') ORDER BY name").strip().split('\n')
+
+# Generic formats
+client.query("DROP TABLE IF EXISTS t_async_insert")
+client.query("CREATE TABLE t_async_insert (id UInt64, s String, arr Array(UInt64)) ENGINE = Memory")
+gen_data_query = "SELECT number AS id, toString(number) AS s, range(number) AS arr FROM numbers(10) FORMAT {}"
+
+for data_format in formats:
+    run_test(data_format, gen_data_query, settings={"async_insert": 1, "wait_for_async_insert": 1})
+
+# LineAsString
+client.query("DROP TABLE IF EXISTS t_async_insert")
+client.query("CREATE TABLE t_async_insert (s String) ENGINE = Memory")
+gen_data_query = "SELECT toString(number) AS s FROM numbers(10) FORMAT {}"
+
+run_test('LineAsString', gen_data_query, settings={"async_insert": 1, "wait_for_async_insert": 1})
+
+# TODO: add CapnProto and Protobuf
+
+print("OK")

diff --git a/tests/queries/0_stateless/02187_async_inserts_all_formats.reference b/tests/queries/0_stateless/02187_async_inserts_all_formats.reference
new file mode 100644
index 00000000000..b4a5b6c3a42
--- /dev/null
+++ b/tests/queries/0_stateless/02187_async_inserts_all_formats.reference
@@ -0,0 +1,40 @@
+Arrow
+ArrowStream
+Avro
+CSV
+CSVWithNames
+CSVWithNamesAndTypes
+CustomSeparated
+CustomSeparatedWithNames
+CustomSeparatedWithNamesAndTypes
+JSONCompactEachRow
+JSONCompactEachRowWithNames
+JSONCompactEachRowWithNamesAndTypes
+JSONCompactStringsEachRow
+JSONCompactStringsEachRowWithNames
+JSONCompactStringsEachRowWithNamesAndTypes
+JSONEachRow
+JSONStringsEachRow
+MsgPack
+Native
+ORC
+Parquet
+RowBinary
+RowBinaryWithNames
+RowBinaryWithNamesAndTypes
+TSKV
+TSV
+TSVRaw
+TSVRawWithNames
+TSVRawWithNamesAndTypes
+TSVWithNames
+TSVWithNamesAndTypes
+TabSeparated
+TabSeparatedRaw
+TabSeparatedRawWithNames
+TabSeparatedRawWithNamesAndTypes
+TabSeparatedWithNames
+TabSeparatedWithNamesAndTypes
+Values
+LineAsString
+OK

diff --git a/tests/queries/0_stateless/02187_async_inserts_all_formats.sh b/tests/queries/0_stateless/02187_async_inserts_all_formats.sh
new file mode 100755
index 00000000000..0031f72fbe5
--- /dev/null
+++ b/tests/queries/0_stateless/02187_async_inserts_all_formats.sh
@@ -0,0 +1,9 @@
+#!/usr/bin/env bash
+# Tags: no-fasttest
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
"$CURDIR"/../shell_config.sh + +# We should have correct env vars from shell_config.sh to run this test +python3 "$CURDIR"/02187_async_inserts_all_formats.python diff --git a/tests/queries/0_stateless/helpers/pure_http_client.py b/tests/queries/0_stateless/helpers/pure_http_client.py index 9f79c4ac529..3335f141bb5 100644 --- a/tests/queries/0_stateless/helpers/pure_http_client.py +++ b/tests/queries/0_stateless/helpers/pure_http_client.py @@ -14,22 +14,23 @@ class ClickHouseClient: def __init__(self, host = CLICKHOUSE_SERVER_URL_STR): self.host = host - def query(self, query, connection_timeout = 1500): + def query(self, query, connection_timeout=1500, settings=dict(), binary_result=False): NUMBER_OF_TRIES = 30 DELAY = 10 + params = { + 'timeout_before_checking_execution_speed': 120, + 'max_execution_time': 6000, + 'database': CLICKHOUSE_DATABASE, + } + + # Add extra settings to params + params = {**params, **settings} + for i in range(NUMBER_OF_TRIES): - r = requests.post( - self.host, - params = { - 'timeout_before_checking_execution_speed': 120, - 'max_execution_time': 6000, - 'database': CLICKHOUSE_DATABASE - }, - timeout = connection_timeout, - data = query) + r = requests.post(self.host, params=params, timeout=connection_timeout, data=query) if r.status_code == 200: - return r.text + return r.content if binary_result else r.text else: print('ATTENTION: try #%d failed' % i) if i != (NUMBER_OF_TRIES-1): @@ -44,9 +45,22 @@ class ClickHouseClient: df = pd.read_csv(io.StringIO(data), sep = '\t') return df - def query_with_data(self, query, content): - content = content.encode('utf-8') - r = requests.post(self.host, data=content) + def query_with_data(self, query, data, connection_timeout=1500, settings=dict()): + params = { + 'query': query, + 'timeout_before_checking_execution_speed': 120, + 'max_execution_time': 6000, + 'database': CLICKHOUSE_DATABASE, + } + + headers = { + "Content-Type": "application/binary" + } + + # Add extra settings to params + params = {**params, **settings} + + r = requests.post(self.host, params=params, timeout=connection_timeout, data=data, headers=headers) result = r.text if r.status_code == 200: return result From b0c862c2974a1fb3a8ce6dc0a3c3f79c7432ce51 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Wed, 26 Jan 2022 11:33:52 +0300 Subject: [PATCH 05/11] Fix memory accounting for queries that uses < max_untracker_memory MemoryTracker starts accounting memory directly only after per-thread allocation exceeded max_untracker_memory (or memory_profiler_step). But even memory under this limit should be accounted too, and there is code to do this in ThreadStatus dtor, however due to PullingAsyncPipelineExecutor detached the query from thread group that memory was not accounted. So remove CurrentThread::detachQueryIfNotDetached() from threads that uses ThreadFromGlobalPool since it has ThreadStatus, and the query will be detached using CurrentThread::defaultThreadDeleter. Note, that before this patch memory accounting works for HTTP queries due to it had been accounted from ParallelFormattingOutputFormat, but not for TCP. 
Signed-off-by: Azat Khuzhin
---
 src/Core/BackgroundSchedulePool.cpp                    | 3 ---
 src/Interpreters/ExternalLoader.cpp                    | 6 ------
 src/Processors/Executors/CompletedPipelineExecutor.cpp | 6 ------
 src/Processors/Executors/PipelineExecutor.cpp          | 5 -----
 .../Executors/PullingAsyncPipelineExecutor.cpp         | 7 -------
 .../Executors/PushingAsyncPipelineExecutor.cpp         | 8 --------
 .../Formats/Impl/ParallelParsingInputFormat.cpp        | 9 ---------
 src/Storages/MergeTree/MergeTreeData.cpp               | 5 -----
 src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp | 2 --
 9 files changed, 51 deletions(-)

diff --git a/src/Core/BackgroundSchedulePool.cpp b/src/Core/BackgroundSchedulePool.cpp
index 9a42f752db2..18c43d8c45f 100644
--- a/src/Core/BackgroundSchedulePool.cpp
+++ b/src/Core/BackgroundSchedulePool.cpp
@@ -5,7 +5,6 @@
 #include
 #include
 #include
-#include
 
 
 namespace DB
@@ -246,7 +245,6 @@ void BackgroundSchedulePool::threadFunction()
     setThreadName(thread_name.c_str());
 
     attachToThreadGroup();
-    SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); });
 
     while (!shutdown)
     {
@@ -273,7 +271,6 @@ void BackgroundSchedulePool::delayExecutionThreadFunction()
    setThreadName((thread_name + "/D").c_str());
 
     attachToThreadGroup();
-    SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); });
 
     while (!shutdown)
     {

diff --git a/src/Interpreters/ExternalLoader.cpp b/src/Interpreters/ExternalLoader.cpp
index b2cd9495feb..ae4eb748cc3 100644
--- a/src/Interpreters/ExternalLoader.cpp
+++ b/src/Interpreters/ExternalLoader.cpp
@@ -10,7 +10,6 @@
 #include
 #include
 #include
-#include
 #include
 #include
 #include
@@ -969,11 +968,6 @@ private:
         if (thread_group)
             CurrentThread::attachTo(thread_group);
 
-        SCOPE_EXIT_SAFE(
-            if (thread_group)
-                CurrentThread::detachQueryIfNotDetached();
-        );
-
         LOG_TRACE(log, "Start loading object '{}'", name);
         try
         {

diff --git a/src/Processors/Executors/CompletedPipelineExecutor.cpp b/src/Processors/Executors/CompletedPipelineExecutor.cpp
index 45b02cba298..8ec1916f4ce 100644
--- a/src/Processors/Executors/CompletedPipelineExecutor.cpp
+++ b/src/Processors/Executors/CompletedPipelineExecutor.cpp
@@ -4,7 +4,6 @@
 #include
 #include
 #include
-#include
 #include
 
@@ -40,11 +39,6 @@ static void threadFunction(CompletedPipelineExecutor::Data & data, ThreadGroupSt
         if (thread_group)
             CurrentThread::attachTo(thread_group);
 
-        SCOPE_EXIT_SAFE(
-            if (thread_group)
-                CurrentThread::detachQueryIfNotDetached();
-        );
-
         data.executor->execute(num_threads);
     }
     catch (...)
diff --git a/src/Processors/Executors/PipelineExecutor.cpp b/src/Processors/Executors/PipelineExecutor.cpp
index e722f8718f7..80aacf14fe6 100644
--- a/src/Processors/Executors/PipelineExecutor.cpp
+++ b/src/Processors/Executors/PipelineExecutor.cpp
@@ -301,11 +301,6 @@ void PipelineExecutor::executeImpl(size_t num_threads)
             if (thread_group)
                 CurrentThread::attachTo(thread_group);
 
-            SCOPE_EXIT_SAFE(
-                if (thread_group)
-                    CurrentThread::detachQueryIfNotDetached();
-            );
-
             try
             {
                 executeSingleThread(thread_num);

diff --git a/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp b/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp
index 0ba07df95a6..198d5ce5d8d 100644
--- a/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp
+++ b/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp
@@ -4,9 +4,7 @@
 #include
 #include
 #include
 #include
-
 #include
-#include
 
 namespace DB
 {
@@ -77,11 +75,6 @@ static void threadFunction(PullingAsyncPipelineExecutor::Data & data, ThreadGrou
         if (thread_group)
             CurrentThread::attachTo(thread_group);
 
-        SCOPE_EXIT_SAFE(
-            if (thread_group)
-                CurrentThread::detachQueryIfNotDetached();
-        );
-
         data.executor->execute(num_threads);
     }
     catch (...)

diff --git a/src/Processors/Executors/PushingAsyncPipelineExecutor.cpp b/src/Processors/Executors/PushingAsyncPipelineExecutor.cpp
index 68898bdc2c2..6c2e62b77dc 100644
--- a/src/Processors/Executors/PushingAsyncPipelineExecutor.cpp
+++ b/src/Processors/Executors/PushingAsyncPipelineExecutor.cpp
@@ -2,11 +2,8 @@
 #include
 #include
 #include
-#include
-
 #include
 #include
-#include
 #include
 
 namespace DB
@@ -107,11 +104,6 @@ static void threadFunction(PushingAsyncPipelineExecutor::Data & data, ThreadGrou
         if (thread_group)
             CurrentThread::attachTo(thread_group);
 
-        SCOPE_EXIT_SAFE(
-            if (thread_group)
-                CurrentThread::detachQueryIfNotDetached();
-        );
-
         data.executor->execute(num_threads);
     }
     catch (...)
diff --git a/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp b/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp
index 213226c9d68..360f1ec9bf0 100644
--- a/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp
+++ b/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp
@@ -2,17 +2,12 @@
 #include
 #include
 #include
-#include
 
 namespace DB
 {
 
 void ParallelParsingInputFormat::segmentatorThreadFunction(ThreadGroupStatusPtr thread_group)
 {
-    SCOPE_EXIT_SAFE(
-        if (thread_group)
-            CurrentThread::detachQueryIfNotDetached();
-    );
     if (thread_group)
         CurrentThread::attachTo(thread_group);
 
@@ -59,10 +54,6 @@ void ParallelParsingInputFormat::segmentatorThreadFunction(ThreadGroupStatusPtr
 
 void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupStatusPtr thread_group, size_t current_ticket_number)
 {
-    SCOPE_EXIT_SAFE(
-        if (thread_group)
-            CurrentThread::detachQueryIfNotDetached();
-    );
     if (thread_group)
         CurrentThread::attachTo(thread_group);
 

diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index 9a455c2d93c..eb84782abc5 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -67,7 +67,6 @@
 #include
 #include
-#include
 #include
 #include
 
@@ -1590,10 +1589,6 @@ void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts_to_re
     {
         pool.scheduleOrThrowOnError([&, thread_group = CurrentThread::getGroup()]
         {
-            SCOPE_EXIT_SAFE(
-                if (thread_group)
-                    CurrentThread::detachQueryIfNotDetached();
-            );
             if (thread_group)
                 CurrentThread::attachTo(thread_group);
 

diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
index cdedd37e14a..0f8bc02475a 100644
--- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
@@ -1,5 +1,4 @@
 #include /// For calculations related to sampling coefficients.
-#include
 #include
 #include
 
@@ -988,7 +987,6 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
     for (size_t part_index = 0; part_index < parts.size(); ++part_index)
         pool.scheduleOrThrowOnError([&, part_index, thread_group = CurrentThread::getGroup()]
         {
-            SCOPE_EXIT_SAFE(if (thread_group) CurrentThread::detachQueryIfNotDetached(););
             if (thread_group)
                 CurrentThread::attachTo(thread_group);
 

From 1519985c9866c3ff9dbc18930a8b54106dcf72c3 Mon Sep 17 00:00:00 2001
From: Azat Khuzhin
Date: Thu, 27 Jan 2022 11:55:38 +0300
Subject: [PATCH 06/11] Fix possible "Can't attach query to the thread, it is already attached"

After detachQueryIfNotDetached() had been removed, it is not enough to use
attachTo() for ThreadPool (scheduleOrThrowOnError()), since the query may
already be attached if the thread is doing multiple jobs, so
CurrentThread::attachToIfDetached() should be used instead.
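A standalone sketch of the difference, with stand-in types rather than the real CurrentThread interface: a pool thread that runs several jobs for the same query can still be attached when the next job starts, so an unconditional attach trips the assertion quoted in the subject, while the if-detached variant degrades to a harmless no-op.

```cpp
#include <cassert>
#include <cstdio>

// Standalone model (illustrative names): a pooled thread may run several
// jobs for the same query, so it can already be attached when the next job
// starts. An unconditional attach asserts; the guarded variant is a no-op.
struct ThreadState
{
    const void * group = nullptr;

    void attachTo(const void * g)
    {
        assert(group == nullptr && "Can't attach query to the thread, it is already attached");
        group = g;
    }

    void attachToIfDetached(const void * g)
    {
        if (group == nullptr)
            group = g; // only the first job run by this thread attaches
    }
};

int main()
{
    int query_group = 0;
    ThreadState pooled_thread;

    // Two jobs scheduled on the same pool thread, no detach in between:
    pooled_thread.attachToIfDetached(&query_group); // job 1: attaches
    pooled_thread.attachToIfDetached(&query_group); // job 2: harmless no-op
    std::printf("attached: %s\n", pooled_thread.group ? "yes" : "no");
}
```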
This should fix all the places from the failures on CI [1]:

    $ fgrep DB::CurrentThread::attachTo -A1 ~/Downloads/47.txt | fgrep -v attachTo | cut -d' ' -f5,6 | sort | uniq -c
         92 --
          2 /fasttest-workspace/build/../../ClickHouse/contrib/libcxx/include/deque:1393: DB::ParallelParsingInputFormat::parserThreadFunction(std::__1::shared_ptr,
          4 /fasttest-workspace/build/../../ClickHouse/src/Storages/MergeTree/MergeTreeData.cpp:1595: void
         87 /fasttest-workspace/build/../../ClickHouse/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp:993: void

[1]: https://github.com/ClickHouse/ClickHouse/runs/4954466034?check_suite_focus=true

Signed-off-by: Azat Khuzhin
---
 src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp | 2 +-
 src/Storages/MergeTree/MergeTreeData.cpp                   | 2 +-
 src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp     | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp b/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp
index 360f1ec9bf0..bfdb9de7d26 100644
--- a/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp
+++ b/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp
@@ -55,7 +55,7 @@ void ParallelParsingInputFormat::segmentatorThreadFunction(ThreadGroupStatusPtr
 void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupStatusPtr thread_group, size_t current_ticket_number)
 {
     if (thread_group)
-        CurrentThread::attachTo(thread_group);
+        CurrentThread::attachToIfDetached(thread_group);
 
     const auto parser_unit_number = current_ticket_number % processing_units.size();
     auto & unit = processing_units[parser_unit_number];

diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index eb84782abc5..67c304b4511 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -1590,7 +1590,7 @@ void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts_to_re
         pool.scheduleOrThrowOnError([&, thread_group = CurrentThread::getGroup()]
         {
             if (thread_group)
-                CurrentThread::attachTo(thread_group);
+                CurrentThread::attachToIfDetached(thread_group);
 
             LOG_DEBUG(log, "Removing part from filesystem {}", part->name);
             part->remove();

diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
index 0f8bc02475a..a277cda9e50 100644
--- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
@@ -988,7 +988,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
         pool.scheduleOrThrowOnError([&, part_index, thread_group = CurrentThread::getGroup()]
         {
             if (thread_group)
-                CurrentThread::attachTo(thread_group);
+                CurrentThread::attachToIfDetached(thread_group);
 
             process_part(part_index);
         });

From 162f96f8e181795225bf3c9edbb22d8e37360a8b Mon Sep 17 00:00:00 2001
From: Azat Khuzhin
Date: Thu, 27 Jan 2022 18:51:36 +0300
Subject: [PATCH 07/11] Get back detachQueryIfNotDetached() into ExternalLoader

ExternalLoader loading from ThreadPool (async loading) is done from the server
context, not from the query context, and the query context may already be gone,
so we should detachQueryIfNotDetached() to avoid triggering an assertion in
ThreadStatus.
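The guard being restored follows the usual scope-guard shape. A stripped-down standalone version (the real SCOPE_EXIT_SAFE macro additionally swallows exceptions thrown by the callback) shows why it belongs here: the detach runs on every exit path, including the throwing ones, which matters because the loading thread belongs to the server and may outlive the query context that attached it.

```cpp
#include <cstdio>
#include <stdexcept>
#include <utility>

// Stripped-down scope guard (illustrative; the real code uses SCOPE_EXIT_SAFE).
template <typename F>
struct ScopeGuard
{
    F fn;
    ~ScopeGuard() { fn(); } // runs on normal return and on unwinding alike
};

template <typename F>
ScopeGuard<F> makeGuard(F fn) { return ScopeGuard<F>{std::move(fn)}; }

// Models ExternalLoader's doLoading(): set up the detach at the top so it is
// guaranteed at the bottom of every path, then do work that may throw.
void doLoadingModel(bool attached, bool fail)
{
    auto guard = makeGuard([&]
    {
        if (attached)
            std::puts("detachQueryIfNotDetached()"); // the detach would run here
    });

    std::puts("loading object ...");
    if (fail)
        throw std::runtime_error("loading failed"); // the guard still fires
}

int main()
{
    try { doLoadingModel(true, true); }
    catch (const std::exception & e) { std::printf("caught: %s\n", e.what()); }
}
```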
CI: https://s3.amazonaws.com/clickhouse-test-reports/34001/8cace291d17fa9553a98b2a1e8bf15b30fe5a1bd/stateless_tests__debug__actions__[3/3].html

Signed-off-by: Azat Khuzhin
---
 src/Interpreters/ExternalLoader.cpp | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/src/Interpreters/ExternalLoader.cpp b/src/Interpreters/ExternalLoader.cpp
index ae4eb748cc3..aab3a9e7437 100644
--- a/src/Interpreters/ExternalLoader.cpp
+++ b/src/Interpreters/ExternalLoader.cpp
@@ -10,6 +10,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -965,6 +966,11 @@ private:
     /// Does the loading, possibly in the separate thread.
     void doLoading(const String & name, size_t loading_id, bool forced_to_reload, size_t min_id_to_finish_loading_dependencies_, bool async, ThreadGroupStatusPtr thread_group = {})
     {
+        SCOPE_EXIT_SAFE(
+            if (thread_group)
+                CurrentThread::detachQueryIfNotDetached();
+        );
+
         if (thread_group)
             CurrentThread::attachTo(thread_group);
 

From 0e1c6b6b9c2124a5dab43cba87c9246e9d888545 Mon Sep 17 00:00:00 2001
From: Azat Khuzhin
Date: Thu, 27 Jan 2022 18:12:26 +0300
Subject: [PATCH 08/11] Make 01810_max_part_removal_threads_long more accurate

v0: After part removal stopped attaching/detaching for each remove, there are
only 11 threads: the primary query thread and 10 for removal.

CI: https://s3.amazonaws.com/clickhouse-test-reports/34001/8cace291d17fa9553a98b2a1e8bf15b30fe5a1bd/stateless_tests__debug__actions__[3/3].html

v2: Make 01810_max_part_removal_threads_long more deterministic

Sometimes, if removing parts is too fast (like in [1]), the test may use fewer
threads, since the thread pool will reuse existing ones; so increase the number
of rows per part and compare thread counts directly w/o throwIf().

[1]: https://s3.amazonaws.com/clickhouse-test-reports/34001/e74b6379a58c4607bdb54cd4c457410166e869ca/stateless_tests_flaky_check__address__actions_.html

v3: Less flaky 01810_max_part_removal_threads_long

v4: Make check not strict in 01810_max_part_removal_threads_long

Signed-off-by: Azat Khuzhin
---
 .../01810_max_part_removal_threads_long.sh    | 44 ++++++++++++++++---
 1 file changed, 38 insertions(+), 6 deletions(-)

diff --git a/tests/queries/0_stateless/01810_max_part_removal_threads_long.sh b/tests/queries/0_stateless/01810_max_part_removal_threads_long.sh
index c5aaa794ac9..f5ab71d8d34 100755
--- a/tests/queries/0_stateless/01810_max_part_removal_threads_long.sh
+++ b/tests/queries/0_stateless/01810_max_part_removal_threads_long.sh
@@ -16,22 +16,54 @@ $CLICKHOUSE_CLIENT -nm -q "create database ordinary_$CLICKHOUSE_DATABASE engine=
 $CLICKHOUSE_CLIENT -nm -q """
     use ordinary_$CLICKHOUSE_DATABASE;
     drop table if exists data_01810;
-    create table data_01810 (key Int) Engine=MergeTree() order by key partition by key settings max_part_removal_threads=10, concurrent_part_removal_threshold=49;
-    insert into data_01810 select * from numbers(50);
+
+    create table data_01810 (key Int)
+    Engine=MergeTree()
+    order by key
+    partition by key%100
+    settings max_part_removal_threads=10, concurrent_part_removal_threshold=99, min_bytes_for_wide_part=0;
+
+    insert into data_01810 select * from numbers(100);
+
     drop table data_01810 settings log_queries=1;
     system flush logs;
-    select throwIf(length(thread_ids)<50) from system.query_log where event_date >= yesterday() and current_database = currentDatabase() and query = 'drop table data_01810 settings log_queries=1;' and type = 'QueryFinish' format Null;
+
+    -- sometimes the same thread can be used to remove part, due to ThreadPool,
+    -- hence we cannot compare strictly.
+    select throwIf(not(length(thread_ids) between 6 and 11))
+    from system.query_log
+    where
+        event_date >= yesterday() and
+        current_database = currentDatabase() and
+        query = 'drop table data_01810 settings log_queries=1;' and
+        type = 'QueryFinish'
+    format Null;
 """
 
 # ReplicatedMergeTree
 $CLICKHOUSE_CLIENT -nm -q """
     use ordinary_$CLICKHOUSE_DATABASE;
     drop table if exists rep_data_01810;
-    create table rep_data_01810 (key Int) Engine=ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/rep_data_01810', '1') order by key partition by key settings max_part_removal_threads=10, concurrent_part_removal_threshold=49;
-    insert into rep_data_01810 select * from numbers(50);
+
+    create table rep_data_01810 (key Int)
+    Engine=ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/rep_data_01810', '1')
+    order by key
+    partition by key%100
+    settings max_part_removal_threads=10, concurrent_part_removal_threshold=99, min_bytes_for_wide_part=0;
+
+    insert into rep_data_01810 select * from numbers(100);
+
     drop table rep_data_01810 settings log_queries=1;
     system flush logs;
-    select throwIf(length(thread_ids)<50) from system.query_log where event_date >= yesterday() and current_database = currentDatabase() and query = 'drop table rep_data_01810 settings log_queries=1;' and type = 'QueryFinish' format Null;
+
+    -- sometimes the same thread can be used to remove part, due to ThreadPool,
+    -- hence we cannot compare strictly.
+    select throwIf(not(length(thread_ids) between 6 and 11))
+    from system.query_log
+    where
+        event_date >= yesterday() and
+        current_database = currentDatabase() and
+        query = 'drop table rep_data_01810 settings log_queries=1;' and
+        type = 'QueryFinish'
+    format Null;
 """
 
 $CLICKHOUSE_CLIENT -nm -q "drop database ordinary_$CLICKHOUSE_DATABASE"

From c0a5bd187a4e6e66a947be47cc5392bc738b3d81 Mon Sep 17 00:00:00 2001
From: Federico Rodriguez
Date: Fri, 28 Jan 2022 14:26:18 -0500
Subject: [PATCH 09/11] ExecuteScalarSubqueriesVisitor missing static const

Added string_view
---
 src/Interpreters/ExecuteScalarSubqueriesVisitor.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/Interpreters/ExecuteScalarSubqueriesVisitor.cpp b/src/Interpreters/ExecuteScalarSubqueriesVisitor.cpp
index 2117eec0063..a81d4204565 100644
--- a/src/Interpreters/ExecuteScalarSubqueriesVisitor.cpp
+++ b/src/Interpreters/ExecuteScalarSubqueriesVisitor.cpp
@@ -68,7 +68,7 @@ void ExecuteScalarSubqueriesMatcher::visit(ASTPtr & ast, Data & data)
 static bool worthConvertingToLiteral(const Block & scalar)
 {
     const auto * scalar_type_name = scalar.safeGetByPosition(0).type->getFamilyName();
-    std::set<String> useless_literal_types = {"Array", "Tuple", "AggregateFunction", "Function", "Set", "LowCardinality"};
+    static const std::set<std::string_view> useless_literal_types = {"Array", "Tuple", "AggregateFunction", "Function", "Set", "LowCardinality"};
     return !useless_literal_types.count(scalar_type_name);
 }

From 379f8d3d7e531840503456c84cf000c14df578b0 Mon Sep 17 00:00:00 2001
From: alexey-milovidov
Date: Sat, 29 Jan 2022 01:09:46 +0300
Subject: [PATCH 10/11] Update use.md

---
 docs/en/sql-reference/statements/use.md | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/docs/en/sql-reference/statements/use.md b/docs/en/sql-reference/statements/use.md
index 41cba58bb9d..841c23d333d 100644
--- a/docs/en/sql-reference/statements/use.md
+++ b/docs/en/sql-reference/statements/use.md
@@ -3,14 +3,14 @@ toc_priority: 53
 toc_title: USE
 ---
 
-# USE 语句 {#use}
+# USE Statement {#use}
 
 ``` sql
 USE db
 ```
 
-用于设置会话的当前数据库。
+Lets you set the current database for the session.
 
-如果查询语句中没有在表名前面以加点的方式指明数据库名, 则用当前数据库进行搜索。
+The current database is used for searching for tables if the database is not explicitly defined in the query with a dot before the table name.
 
-使用 HTTP 协议时无法进行此查询,因为没有会话的概念。
+This query can’t be made when using the HTTP protocol, since there is no concept of a session.

From 7c3d6f5fda4302cebdb980b08ae7f2fe4dcc9a95 Mon Sep 17 00:00:00 2001
From: Anton Popov
Date: Sat, 29 Jan 2022 01:24:30 +0300
Subject: [PATCH 11/11] Update 02187_async_inserts_all_formats.sh

---
 tests/queries/0_stateless/02187_async_inserts_all_formats.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/queries/0_stateless/02187_async_inserts_all_formats.sh b/tests/queries/0_stateless/02187_async_inserts_all_formats.sh
index 0031f72fbe5..4b0b8d84c58 100755
--- a/tests/queries/0_stateless/02187_async_inserts_all_formats.sh
+++ b/tests/queries/0_stateless/02187_async_inserts_all_formats.sh
@@ -1,5 +1,5 @@
 #!/usr/bin/env bash
-# Tags: no-fasttest
+# Tags: no-fasttest, long
 
 CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
 # shellcheck source=../shell_config.sh