From 834f8426e0d6fa7c25e745b6ce963436ddcfce30 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 8 Jun 2020 01:46:58 +0300 Subject: [PATCH 01/33] Fix memory leak --- src/Interpreters/Aggregator.cpp | 74 ++++++++++++++++++++++++++++----- 1 file changed, 64 insertions(+), 10 deletions(-) diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp index e7f6f16b91d..ec4a2f2ba9d 100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -1016,6 +1016,8 @@ void NO_INLINE Aggregator::convertToBlockImplFinal( aggregate_functions[i]->insertResultInto( data.getNullKeyData() + offsets_of_aggregate_states[i], *final_aggregate_columns[i]); + + data.getNullKeyData() = nullptr; } } @@ -1023,13 +1025,65 @@ void NO_INLINE Aggregator::convertToBlockImplFinal( { method.insertKeyIntoColumns(key, key_columns, key_sizes); - for (size_t i = 0; i < params.aggregates_size; ++i) - aggregate_functions[i]->insertResultInto( - mapped + offsets_of_aggregate_states[i], - *final_aggregate_columns[i]); - }); + /** Final values of aggregate functions are inserted to columns. + * Then states of aggregate functions, that are not longer needed, are destroyed. + * + * We mark already destroyed states with "nullptr" in data, + * so they will not be destroyed in destructor of Aggregator + * (other values will be destroyed in destructor in case of exception). + * + * But it becomes tricky, because we have multiple aggregate states pointed by a single pointer in data. + * So, if exception is thrown in the middle of moving states for different aggregate functions, + * we have to catch exceptions and destroy all the states that are no longer needed, + * to keep the data in consistent state. + * + * It is also tricky, because there are aggregate functions with "-State" modifier. + * When we call "insertResultInto" for them, they insert a pointer to the state to ColumnAggregateFunction + * and ColumnAggregateFunction will take ownership of this state. + * So, for aggregate functions with "-State" modifier, the state must not be destroyed + * after it has been transferred to ColumnAggregateFunction. + * But we should mark that the data no longer owns these states. + */ - destroyImpl(data); + size_t insert_i = 0; + std::exception_ptr exception; + + try + { + /// Insert final values of aggregate functions into columns. + for (; insert_i < params.aggregates_size; ++insert_i) + aggregate_functions[insert_i]->insertResultInto( + mapped + offsets_of_aggregate_states[insert_i], + *final_aggregate_columns[insert_i]); + } + catch (...) + { + exception = std::current_exception(); + } + + /** Destroy states that are no longer needed. This loop does not throw. + * + * Don't destroy states for "-State" aggregate functions, + * because the ownership of this state is transferred to ColumnAggregateFunction + * and ColumnAggregateFunction will take care. + * + * But it's only for states that has been transferred to ColumnAggregateFunction + * before exception has been thrown; + */ + for (size_t destroy_i = 0; destroy_i < params.aggregates_size; ++destroy_i) + { + /// If ownership was not transferred to ColumnAggregateFunction. + if (!(destroy_i < insert_i && aggregate_functions[destroy_i]->isState())) + aggregate_functions[destroy_i]->destroy( + mapped + offsets_of_aggregate_states[destroy_i]); + } + + /// Mark the cell as destroyed so it will not be destroyed in destructor. 
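+        /// (At this point each state was either destroyed in the loop above or handed over
+        /// to ColumnAggregateFunction, so clearing the pointer does not leak anything.)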
+ mapped = nullptr; + + if (exception) + std::rethrow_exception(exception); + }); } template @@ -1047,6 +1101,8 @@ void NO_INLINE Aggregator::convertToBlockImplNotFinal( for (size_t i = 0; i < params.aggregates_size; ++i) aggregate_columns[i]->push_back(data.getNullKeyData() + offsets_of_aggregate_states[i]); + + data.getNullKeyData() = nullptr; } } @@ -2387,8 +2443,7 @@ void NO_INLINE Aggregator::destroyImpl(Table & table) const return; for (size_t i = 0; i < params.aggregates_size; ++i) - if (!aggregate_functions[i]->isState()) - aggregate_functions[i]->destroy(data + offsets_of_aggregate_states[i]); + aggregate_functions[i]->destroy(data + offsets_of_aggregate_states[i]); data = nullptr; }); @@ -2402,8 +2457,7 @@ void Aggregator::destroyWithoutKey(AggregatedDataVariants & result) const if (nullptr != res_data) { for (size_t i = 0; i < params.aggregates_size; ++i) - if (!aggregate_functions[i]->isState()) - aggregate_functions[i]->destroy(res_data + offsets_of_aggregate_states[i]); + aggregate_functions[i]->destroy(res_data + offsets_of_aggregate_states[i]); res_data = nullptr; } From d5443293a3cfa26793429b63ab58ca21e195d4b5 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 8 Jun 2020 01:49:26 +0300 Subject: [PATCH 02/33] Added a test --- ...regate_state_exception_memory_leak.reference | 2 ++ ...301_aggregate_state_exception_memory_leak.sh | 17 +++++++++++++++++ 2 files changed, 19 insertions(+) create mode 100644 tests/queries/0_stateless/01301_aggregate_state_exception_memory_leak.reference create mode 100755 tests/queries/0_stateless/01301_aggregate_state_exception_memory_leak.sh diff --git a/tests/queries/0_stateless/01301_aggregate_state_exception_memory_leak.reference b/tests/queries/0_stateless/01301_aggregate_state_exception_memory_leak.reference new file mode 100644 index 00000000000..b20e7415f52 --- /dev/null +++ b/tests/queries/0_stateless/01301_aggregate_state_exception_memory_leak.reference @@ -0,0 +1,2 @@ +Memory limit (for query) exceeded +Ok diff --git a/tests/queries/0_stateless/01301_aggregate_state_exception_memory_leak.sh b/tests/queries/0_stateless/01301_aggregate_state_exception_memory_leak.sh new file mode 100755 index 00000000000..633fa5ce315 --- /dev/null +++ b/tests/queries/0_stateless/01301_aggregate_state_exception_memory_leak.sh @@ -0,0 +1,17 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. $CURDIR/../shell_config.sh + +function test() +{ + for i in {1..1000}; do + $CLICKHOUSE_CLIENT --max_memory_usage 1G <<< "SELECT uniqExactState(number) FROM system.numbers_mt GROUP BY number % 10"; + done +} + +export -f test; + +# If the memory leak exists, it will lead to OOM fairly quickly. 
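+# The per-query 1G cap makes a leak hit the memory limit long before the host itself
+# runs out of memory, and `grep -o -F ... | uniq` collapses the repeated error lines
+# so the .reference file stays stable.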
+timeout 30 bash -c test 2>&1 | grep -o -F 'Memory limit (for query) exceeded' | uniq
+echo 'Ok'

From e507f6b8363071f5c13b0febd9322d9063e2d1cc Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Mon, 8 Jun 2020 02:01:04 +0300
Subject: [PATCH 03/33] More crystallized test

---
 ...gregate_state_exception_memory_leak.reference |  1 +
 ...1302_aggregate_state_exception_memory_leak.sh | 16 ++++++++++++++++
 2 files changed, 17 insertions(+)
 create mode 100644 tests/queries/0_stateless/01302_aggregate_state_exception_memory_leak.reference
 create mode 100755 tests/queries/0_stateless/01302_aggregate_state_exception_memory_leak.sh

diff --git a/tests/queries/0_stateless/01302_aggregate_state_exception_memory_leak.reference b/tests/queries/0_stateless/01302_aggregate_state_exception_memory_leak.reference
new file mode 100644
index 00000000000..7326d960397
--- /dev/null
+++ b/tests/queries/0_stateless/01302_aggregate_state_exception_memory_leak.reference
@@ -0,0 +1 @@
+Ok

diff --git a/tests/queries/0_stateless/01302_aggregate_state_exception_memory_leak.sh b/tests/queries/0_stateless/01302_aggregate_state_exception_memory_leak.sh
new file mode 100755
index 00000000000..23c1d7c4c57
--- /dev/null
+++ b/tests/queries/0_stateless/01302_aggregate_state_exception_memory_leak.sh
@@ -0,0 +1,16 @@
+#!/usr/bin/env bash
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+. $CURDIR/../shell_config.sh
+
+function test()
+{
+    for i in {1..50}; do
+        $CLICKHOUSE_CLIENT --query "SELECT groupArrayIfState(('Hello, world' AS s) || s || s || s || s || s || s || s || s || s, NOT throwIf(number > 50000000, 'Ok')) FROM system.numbers_mt GROUP BY number % 10";
+    done
+}
+
+export -f test;
+
+# If the memory leak exists, it will lead to OOM fairly quickly.
+timeout 10 bash -c test 2>&1 | grep -o -F 'Ok' | uniq

From 58e513f59e9c0c5e3649f7f157500128c7085fd4 Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Mon, 8 Jun 2020 07:23:56 +0300
Subject: [PATCH 04/33] Corrected implementation for the case without key

---
 src/Interpreters/Aggregator.cpp | 151 ++++++++++++++++++----------------
 src/Interpreters/Aggregator.h   |   5 ++
 2 files changed, 82 insertions(+), 74 deletions(-)

diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp
index ec4a2f2ba9d..0337016acb9 100644
--- a/src/Interpreters/Aggregator.cpp
+++ b/src/Interpreters/Aggregator.cpp
@@ -999,6 +999,73 @@ void Aggregator::convertToBlockImpl(
     data.clearAndShrink();
 }
 
+
+template <typename Mapped>
+void ALWAYS_INLINE Aggregator::insertAggregatesIntoColumns(
+    Mapped & mapped,
+    MutableColumns & final_aggregate_columns) const
+{
+    /** Final values of aggregate functions are inserted into columns.
+     * Then the states of aggregate functions that are no longer needed are destroyed.
+     *
+     * We mark already destroyed states with "nullptr" in data,
+     * so they will not be destroyed in the destructor of Aggregator
+     * (other values will be destroyed in the destructor in case of an exception).
+     *
+     * But it becomes tricky, because we have multiple aggregate states pointed to by a single pointer in data.
+     * So, if an exception is thrown in the middle of moving states for different aggregate functions,
+     * we have to catch exceptions and destroy all the states that are no longer needed,
+     * to keep the data in a consistent state.
+     *
+     * It is also tricky, because there are aggregate functions with the "-State" modifier.
+     * When we call "insertResultInto" for them, they insert a pointer to the state into ColumnAggregateFunction
+     * and ColumnAggregateFunction will take ownership of this state.
+     * So, for aggregate functions with the "-State" modifier, the state must not be destroyed
+     * after it has been transferred to ColumnAggregateFunction.
+     * But we should mark that the data no longer owns these states.
+     */

+    size_t insert_i = 0;
+    std::exception_ptr exception;
+
+    try
+    {
+        /// Insert final values of aggregate functions into columns.
+        for (; insert_i < params.aggregates_size; ++insert_i)
+            aggregate_functions[insert_i]->insertResultInto(
+                mapped + offsets_of_aggregate_states[insert_i],
+                *final_aggregate_columns[insert_i]);
+    }
+    catch (...)
+    {
+        exception = std::current_exception();
+    }
+
+    /** Destroy states that are no longer needed. This loop does not throw.
+     *
+     * Don't destroy states for "-State" aggregate functions,
+     * because ownership of such a state is transferred to ColumnAggregateFunction,
+     * and ColumnAggregateFunction will take care of it.
+     *
+     * But this applies only to states that have been transferred to ColumnAggregateFunction
+     * before the exception was thrown.
+     */
+    for (size_t destroy_i = 0; destroy_i < params.aggregates_size; ++destroy_i)
+    {
+        /// If ownership was not transferred to ColumnAggregateFunction.
+        if (!(destroy_i < insert_i && aggregate_functions[destroy_i]->isState()))
+            aggregate_functions[destroy_i]->destroy(
+                mapped + offsets_of_aggregate_states[destroy_i]);
+    }
+
+    /// Mark the cell as destroyed so it will not be destroyed in the destructor.
+    mapped = nullptr;
+
+    if (exception)
+        std::rethrow_exception(exception);
+}
+
+
 template <typename Method, typename Table>
 void NO_INLINE Aggregator::convertToBlockImplFinal(
     Method & method,
@@ -1011,78 +1078,14 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
         if (data.hasNullKeyData())
         {
             key_columns[0]->insertDefault();
-
-            for (size_t i = 0; i < params.aggregates_size; ++i)
-                aggregate_functions[i]->insertResultInto(
-                    data.getNullKeyData() + offsets_of_aggregate_states[i],
-                    *final_aggregate_columns[i]);
-
-            data.getNullKeyData() = nullptr;
+            insertAggregatesIntoColumns(data.getNullKeyData(), final_aggregate_columns);
         }
     }
 
     data.forEachValue([&](const auto & key, auto & mapped)
     {
         method.insertKeyIntoColumns(key, key_columns, key_sizes);
-
-        /** Final values of aggregate functions are inserted to columns.
-          * Then states of aggregate functions, that are not longer needed, are destroyed.
-          *
-          * We mark already destroyed states with "nullptr" in data,
-          * so they will not be destroyed in destructor of Aggregator
-          * (other values will be destroyed in destructor in case of exception).
-          *
-          * But it becomes tricky, because we have multiple aggregate states pointed by a single pointer in data.
-          * So, if exception is thrown in the middle of moving states for different aggregate functions,
-          * we have to catch exceptions and destroy all the states that are no longer needed,
-          * to keep the data in consistent state.
-          *
-          * It is also tricky, because there are aggregate functions with "-State" modifier.
-          * When we call "insertResultInto" for them, they insert a pointer to the state to ColumnAggregateFunction
-          * and ColumnAggregateFunction will take ownership of this state.
-          * So, for aggregate functions with "-State" modifier, the state must not be destroyed
-          * after it has been transferred to ColumnAggregateFunction.
-          * But we should mark that the data no longer owns these states.
-          */
-
-        size_t insert_i = 0;
-        std::exception_ptr exception;
-
-        try
-        {
-            /// Insert final values of aggregate functions into columns.
- for (; insert_i < params.aggregates_size; ++insert_i) - aggregate_functions[insert_i]->insertResultInto( - mapped + offsets_of_aggregate_states[insert_i], - *final_aggregate_columns[insert_i]); - } - catch (...) - { - exception = std::current_exception(); - } - - /** Destroy states that are no longer needed. This loop does not throw. - * - * Don't destroy states for "-State" aggregate functions, - * because the ownership of this state is transferred to ColumnAggregateFunction - * and ColumnAggregateFunction will take care. - * - * But it's only for states that has been transferred to ColumnAggregateFunction - * before exception has been thrown; - */ - for (size_t destroy_i = 0; destroy_i < params.aggregates_size; ++destroy_i) - { - /// If ownership was not transferred to ColumnAggregateFunction. - if (!(destroy_i < insert_i && aggregate_functions[destroy_i]->isState())) - aggregate_functions[destroy_i]->destroy( - mapped + offsets_of_aggregate_states[destroy_i]); - } - - /// Mark the cell as destroyed so it will not be destroyed in destructor. - mapped = nullptr; - - if (exception) - std::rethrow_exception(exception); + insertAggregatesIntoColumns(mapped, final_aggregate_columns); }); } @@ -1243,16 +1246,16 @@ Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_va { AggregatedDataWithoutKey & data = data_variants.without_key; - for (size_t i = 0; i < params.aggregates_size; ++i) - { - if (!final_) - aggregate_columns[i]->push_back(data + offsets_of_aggregate_states[i]); - else - aggregate_functions[i]->insertResultInto(data + offsets_of_aggregate_states[i], *final_aggregate_columns[i]); - } - if (!final_) + { + for (size_t i = 0; i < params.aggregates_size; ++i) + aggregate_columns[i]->push_back(data + offsets_of_aggregate_states[i]); data = nullptr; + } + else + { + insertAggregatesIntoColumns(data, final_aggregate_columns); + } if (params.overflow_row) for (size_t i = 0; i < params.keys_size; ++i) diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h index a5d79ce46dc..6d0eeee9014 100644 --- a/src/Interpreters/Aggregator.h +++ b/src/Interpreters/Aggregator.h @@ -1166,6 +1166,11 @@ protected: MutableColumns & final_aggregate_columns, bool final) const; + template + void insertAggregatesIntoColumns( + Mapped & mapped, + MutableColumns & final_aggregate_columns) const; + template void convertToBlockImplFinal( Method & method, From b928ac133b6275ff20b198bff968be1410471592 Mon Sep 17 00:00:00 2001 From: "dependabot-preview[bot]" <27856297+dependabot-preview[bot]@users.noreply.github.com> Date: Mon, 8 Jun 2020 07:46:48 +0000 Subject: [PATCH 05/33] Bump livereload from 2.6.1 to 2.6.2 in /docs/tools Bumps [livereload](https://github.com/lepture/python-livereload) from 2.6.1 to 2.6.2. 
- [Release notes](https://github.com/lepture/python-livereload/releases) - [Changelog](https://github.com/lepture/python-livereload/blob/master/CHANGES.rst) - [Commits](https://github.com/lepture/python-livereload/compare/v2.6.1...2.6.2) Signed-off-by: dependabot-preview[bot] --- docs/tools/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/tools/requirements.txt b/docs/tools/requirements.txt index d8fb7c1a442..37d6161a695 100644 --- a/docs/tools/requirements.txt +++ b/docs/tools/requirements.txt @@ -13,7 +13,7 @@ idna==2.9 Jinja2==2.11.2 jinja2-highlight==0.6.1 jsmin==2.2.2 -livereload==2.6.1 +livereload==2.6.2 Markdown==3.2.1 MarkupSafe==1.1.1 mkdocs==1.1.2 From 9d6fb1dd9cf89eab87d66346e63f0420e2a57493 Mon Sep 17 00:00:00 2001 From: "dependabot-preview[bot]" <27856297+dependabot-preview[bot]@users.noreply.github.com> Date: Mon, 8 Jun 2020 07:47:13 +0000 Subject: [PATCH 06/33] Bump certifi from 2020.4.5.1 to 2020.4.5.2 in /docs/tools Bumps [certifi](https://github.com/certifi/python-certifi) from 2020.4.5.1 to 2020.4.5.2. - [Release notes](https://github.com/certifi/python-certifi/releases) - [Commits](https://github.com/certifi/python-certifi/compare/2020.04.05.1...2020.04.05.2) Signed-off-by: dependabot-preview[bot] --- docs/tools/requirements.txt | 2 +- docs/tools/translate/requirements.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/tools/requirements.txt b/docs/tools/requirements.txt index d8fb7c1a442..4ffcd316d6c 100644 --- a/docs/tools/requirements.txt +++ b/docs/tools/requirements.txt @@ -2,7 +2,7 @@ Babel==2.8.0 backports-abc==0.5 backports.functools-lru-cache==1.6.1 beautifulsoup4==4.9.1 -certifi==2020.4.5.1 +certifi==2020.4.5.2 chardet==3.0.4 click==7.1.2 closure==20191111 diff --git a/docs/tools/translate/requirements.txt b/docs/tools/translate/requirements.txt index 3c212ee8bc2..0c9d44a346e 100644 --- a/docs/tools/translate/requirements.txt +++ b/docs/tools/translate/requirements.txt @@ -1,5 +1,5 @@ Babel==2.8.0 -certifi==2020.4.5.1 +certifi==2020.4.5.2 chardet==3.0.4 googletrans==2.4.0 idna==2.9 From 483c60faddd2aa6151b31433dc82175864689a79 Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 8 Jun 2020 14:25:30 +0300 Subject: [PATCH 07/33] Merging #8377 --- tests/integration/test_ttl_move/test.py | 95 ++++++++++++++++++++++++- 1 file changed, 93 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_ttl_move/test.py b/tests/integration/test_ttl_move/test.py index a4318df1658..26bd36b8cb6 100644 --- a/tests/integration/test_ttl_move/test.py +++ b/tests/integration/test_ttl_move/test.py @@ -61,7 +61,7 @@ def get_used_disks_for_table(node, table_name, partition=None): def check_used_disks_with_retry(node, table_name, expected_disks, retries): for _ in range(retries): used_disks = get_used_disks_for_table(node, table_name) - if set(used_disks) == expected_disks: + if set(used_disks).issubset(expected_disks): return True time.sleep(0.5) return False @@ -830,7 +830,8 @@ def test_concurrent_alter_with_ttl_move(started_cluster, name, engine): def optimize_table(num): for i in range(num): try: # optimize may throw after concurrent alter - node1.query("OPTIMIZE TABLE {} FINAL".format(name)) + node1.query("OPTIMIZE TABLE {} FINAL".format(name), settings={'optimize_throw_if_noop': '1'}) + break except: pass @@ -903,3 +904,93 @@ def test_double_move_while_select(started_cluster, name, positive): finally: node1.query("DROP TABLE IF EXISTS {name}".format(name=name)) + + 
+@pytest.mark.parametrize("name,engine,positive", [
+    ("mt_test_alter_with_merge_do_not_work","MergeTree()",0),
+    ("replicated_mt_test_alter_with_merge_do_not_work","ReplicatedMergeTree('/clickhouse/replicated_test_alter_with_merge_do_not_work', '1')",0),
+    ("mt_test_alter_with_merge_work","MergeTree()",1),
+    ("replicated_mt_test_alter_with_merge_work","ReplicatedMergeTree('/clickhouse/replicated_test_alter_with_merge_work', '1')",1),
+])
+def test_alter_with_merge_work(started_cluster, name, engine, positive):
+    """Copyright 2019, Altinity LTD
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+    """Check that TTL expressions are re-evaluated for
+    existing parts after an ALTER command changes TTL expressions
+    and parts are merged.
+    """
+    try:
+        node1.query("""
+            CREATE TABLE {name} (
+                s1 String,
+                d1 DateTime
+            ) ENGINE = {engine}
+            ORDER BY tuple()
+            TTL d1 + INTERVAL 3000 SECOND TO DISK 'jbod2',
+                d1 + INTERVAL 6000 SECOND TO VOLUME 'external'
+            SETTINGS storage_policy='jbods_with_external', merge_with_ttl_timeout=0
+        """.format(name=name, engine=engine))
+
+
+        def optimize_table(num):
+            for i in range(num):
+                try: # optimize may throw after concurrent alter
+                    node1.query("OPTIMIZE TABLE {} FINAL".format(name), settings={'optimize_throw_if_noop': '1'})
+                    break
+                except:
+                    pass
+
+        for p in range(3):
+            data = [] # 2MB per partition, 6MB in total
+            now = time.time()
+            for i in range(2):
+                s1 = get_random_string(1024 * 1024) # 1MB
+                d1 = now - 1 if positive else now + 300
+                data.append("('{}', toDateTime({}))".format(s1, d1))
+            values = ",".join(data)
+            node1.query("INSERT INTO {name} (s1, d1) VALUES {values}".format(name=name, values=values))
+
+        used_disks = get_used_disks_for_table(node1, name)
+        assert set(used_disks) == {"jbod1", "jbod2"}
+
+        assert node1.query("SELECT count() FROM {name}".format(name=name)).splitlines() == ["6"]
+
+        node1.query("""
+            ALTER TABLE {name} MODIFY
+            TTL d1 + INTERVAL 0 SECOND TO DISK 'jbod2',
+                d1 + INTERVAL 5 SECOND TO VOLUME 'external',
+                d1 + INTERVAL 10 SECOND DELETE
+        """.format(name=name))
+
+        optimize_table(20)
+
+        assert node1.query("SELECT count() FROM system.parts WHERE table = '{name}' AND active = 1".format(name=name)) == "1\n"
+
+        time.sleep(5)
+
+        optimize_table(20)
+
+        if positive:
+            assert check_used_disks_with_retry(node1, name, set(["external"]), 50)
+        else:
+            assert check_used_disks_with_retry(node1, name, set(["jbod1", "jbod2"]), 50)
+
+        time.sleep(5)
+
+        optimize_table(20)
+
+        if positive:
+            assert node1.query("SELECT count() FROM {name}".format(name=name)) == "0\n"
+        else:
+            assert node1.query("SELECT count() FROM {name}".format(name=name)) == "6\n"
+
+    finally:
+        node1.query("DROP TABLE IF EXISTS {name}".format(name=name))

From 138f3253bab1e9d37975f962c4d78e4c3c0ce84e Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Mon, 8 Jun 2020 15:33:00 +0300
Subject: [PATCH 08/33] Fix gcc build

---
 src/Interpreters/Aggregator.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp
index 0337016acb9..538a24fa997 
100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -1001,7 +1001,7 @@ void Aggregator::convertToBlockImpl( template -void ALWAYS_INLINE Aggregator::insertAggregatesIntoColumns( +inline void Aggregator::insertAggregatesIntoColumns( Mapped & mapped, MutableColumns & final_aggregate_columns) const { From 7ba2d7e15f64838d092ae50931be6d362e5d6b8b Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 8 Jun 2020 15:35:57 +0300 Subject: [PATCH 09/33] Changed timeouts in test --- .../01302_aggregate_state_exception_memory_leak.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/queries/0_stateless/01302_aggregate_state_exception_memory_leak.sh b/tests/queries/0_stateless/01302_aggregate_state_exception_memory_leak.sh index 23c1d7c4c57..cd2fec408ab 100755 --- a/tests/queries/0_stateless/01302_aggregate_state_exception_memory_leak.sh +++ b/tests/queries/0_stateless/01302_aggregate_state_exception_memory_leak.sh @@ -5,12 +5,12 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) function test() { - for i in {1..50}; do - $CLICKHOUSE_CLIENT --query "SELECT groupArrayIfState(('Hello, world' AS s) || s || s || s || s || s || s || s || s || s, NOT throwIf(number > 50000000, 'Ok')) FROM system.numbers_mt GROUP BY number % 10"; + for i in {1..250}; do + $CLICKHOUSE_CLIENT --query "SELECT groupArrayIfState(('Hello, world' AS s) || s || s || s || s || s || s || s || s || s, NOT throwIf(number > 10000000, 'Ok')) FROM system.numbers_mt GROUP BY number % 10"; done } export -f test; # If the memory leak exists, it will lead to OOM fairly quickly. -timeout 10 bash -c test 2>&1 | grep -o -F 'Ok' | uniq +timeout 30 bash -c test 2>&1 | grep -o -F 'Ok' | uniq From 339703d1b89a790a3be7b61aab000964eef9805a Mon Sep 17 00:00:00 2001 From: "Matwey V. Kornilov" Date: Mon, 8 Jun 2020 18:03:54 +0300 Subject: [PATCH 10/33] Fix missed #include is required for std::move --- base/common/strong_typedef.h | 1 + 1 file changed, 1 insertion(+) diff --git a/base/common/strong_typedef.h b/base/common/strong_typedef.h index a46eb415e15..0dc29ad9f1b 100644 --- a/base/common/strong_typedef.h +++ b/base/common/strong_typedef.h @@ -1,6 +1,7 @@ #pragma once #include +#include template struct StrongTypedef From 68bd636c9ccaf50cde51e476a1f85f71b0c072dc Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 8 Jun 2020 18:16:01 +0300 Subject: [PATCH 11/33] Fix modify test --- .../01079_parallel_alter_modify_zookeeper.sh | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh b/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh index bacc742d16a..9a6e9c3156c 100755 --- a/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh +++ b/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh @@ -100,8 +100,14 @@ wait echo "Finishing alters" -# This alter will finish all previous, but replica 1 maybe still not up-to-date -while [[ $(timeout 30 $CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_mt_1 MODIFY COLUMN value1 String SETTINGS replication_alter_partitions_sync=2" 2>&1) ]]; do +# This alter will finish all previous, but replica 1 maybe still not up-to-date. +# If query will throw something, than we will sleep 1 and retry. If timeout +# happened we will silentrly go out of loop and probably fail tests in the +# following for loop. 
+# +# 120 seconds is more than enough, but in rare cases for slow builds (debug, +# thread) it maybe necessary +while [[ $(timeout 120 $CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_mt_1 MODIFY COLUMN value1 String SETTINGS replication_alter_partitions_sync=2" 2>&1) ]]; do sleep 1 done From 5db83dad07f258c09618800c541d7d938decd1e1 Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 8 Jun 2020 18:18:33 +0300 Subject: [PATCH 12/33] Better comment --- .../0_stateless/01079_parallel_alter_modify_zookeeper.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh b/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh index 9a6e9c3156c..effc9f540a1 100755 --- a/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh +++ b/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh @@ -106,7 +106,7 @@ echo "Finishing alters" # following for loop. # # 120 seconds is more than enough, but in rare cases for slow builds (debug, -# thread) it maybe necessary +# thread) it maybe necessary. while [[ $(timeout 120 $CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_mt_1 MODIFY COLUMN value1 String SETTINGS replication_alter_partitions_sync=2" 2>&1) ]]; do sleep 1 done From dfa60e01a02ad3082b50075692a0a54cbcec0cd1 Mon Sep 17 00:00:00 2001 From: Eugene Klimov Date: Mon, 8 Jun 2020 20:23:13 +0500 Subject: [PATCH 13/33] add SYSTEM STOP/START FETCHES and SYNC REPLICA description (#11444) * add SYSTEM STOP/START FETCHES and SYNC REPLICA description Signed-off-by: Slach * fix russian verion Signed-off-by: Slach * fix typo Signed-off-by: Slach * add START/STOP REPLICATED SENDS description Signed-off-by: Slach * Update docs/en/sql-reference/statements/system.md Co-authored-by: Ivan Blinkov * sync russian and english version Signed-off-by: Slach * add SYSTEM START/STOP REPLICATION QUEUES Signed-off-by: Slach * add SYSTEM RESTART REPLICA Signed-off-by: Slach * add all missed SYSTEM statement descriptions Signed-off-by: Slach * fix missed link Signed-off-by: Slach Co-authored-by: Ivan Blinkov --- docs/en/sql-reference/statements/system.md | 141 +++++++++++++++++++++ docs/ru/sql-reference/statements/system.md | 141 +++++++++++++++++++++ 2 files changed, 282 insertions(+) diff --git a/docs/en/sql-reference/statements/system.md b/docs/en/sql-reference/statements/system.md index 9544998334f..e4823686c68 100644 --- a/docs/en/sql-reference/statements/system.md +++ b/docs/en/sql-reference/statements/system.md @@ -5,10 +5,13 @@ toc_title: SYSTEM # SYSTEM Queries {#query-language-system} +- [RELOAD EMBEDDED DICTIONARIES](#query_language-system-reload-emdedded-dictionaries) - [RELOAD DICTIONARIES](#query_language-system-reload-dictionaries) - [RELOAD DICTIONARY](#query_language-system-reload-dictionary) - [DROP DNS CACHE](#query_language-system-drop-dns-cache) - [DROP MARK CACHE](#query_language-system-drop-mark-cache) +- [DROP UNCOMPRESSED CACHE](#query_language-system-drop-uncompressed-cache) +- [DROP COMPILED EXPRESSION CACHE](#query_language-system-drop-compiled-expression-cache) - [FLUSH LOGS](#query_language-system-flush_logs) - [RELOAD CONFIG](#query_language-system-reload-config) - [SHUTDOWN](#query_language-system-shutdown) @@ -18,7 +21,25 @@ toc_title: SYSTEM - [START DISTRIBUTED SENDS](#query_language-system-start-distributed-sends) - [STOP MERGES](#query_language-system-stop-merges) - [START MERGES](#query_language-system-start-merges) +- [STOP TTL MERGES](#query_language-stop-ttl-merges) +- 
[START TTL MERGES](#query_language-start-ttl-merges)
+- [STOP MOVES](#query_language-stop-moves)
+- [START MOVES](#query_language-start-moves)
+- [STOP FETCHES](#query_language-system-stop-fetches)
+- [START FETCHES](#query_language-system-start-fetches)
+- [STOP REPLICATED SENDS](#query_language-system-stop-replicated-sends)
+- [START REPLICATED SENDS](#query_language-system-start-replicated-sends)
+- [STOP REPLICATION QUEUES](#query_language-system-stop-replication-queues)
+- [START REPLICATION QUEUES](#query_language-system-start-replication-queues)
+- [SYNC REPLICA](#query_language-system-sync-replica)
+- [RESTART REPLICA](#query_language-system-restart-replica)
+- [RESTART REPLICAS](#query_language-system-restart-replicas)
+
+## RELOAD EMBEDDED DICTIONARIES {#query_language-system-reload-emdedded-dictionaries}
+Reloads all [Internal dictionaries](../dictionaries/internal-dicts.md).
+By default, internal dictionaries are disabled.
+Always returns `Ok.` regardless of the result of the internal dictionary update.
+
 ## RELOAD DICTIONARIES {#query_language-system-reload-dictionaries}
 
 Reloads all dictionaries that have been successfully loaded before.
@@ -45,6 +66,16 @@ For more convenient (automatic) cache management, see disable\_internal\_dns\_ca
 
 Resets the mark cache. Used in development of ClickHouse and performance tests.
 
+## DROP UNCOMPRESSED CACHE {#query_language-system-drop-uncompressed-cache}
+
+Resets the uncompressed data cache. Used in development of ClickHouse and performance tests.
+To manage the uncompressed data cache, use the server-level setting [uncompressed_cache_size](../../operations/server-configuration-parameters/settings.md#server-settings-uncompressed_cache_size) and the query/user/profile-level setting [use_uncompressed_cache](../../operations/settings/settings.md#setting-use_uncompressed_cache).
+
+
+## DROP COMPILED EXPRESSION CACHE {#query_language-system-drop-compiled-expression-cache}
+Resets the compiled expression cache. Used in development of ClickHouse and performance tests.
+The compiled expression cache is used when the query/user/profile-level option [compile](../../operations/settings/settings.md#compile) is enabled.
+
 ## FLUSH LOGS {#query_language-system-flush_logs}
 
 Flushes buffers of log messages to system tables (e.g. system.query\_log). Allows you to not wait 7.5 seconds when debugging.
@@ -89,6 +120,10 @@ Enables background data distribution when inserting data into distributed tables
 
 SYSTEM START DISTRIBUTED SENDS [db.]<distributed_table_name>
 ```
 
+## Managing MergeTree Tables {#query-language-system-mergetree}
+
+ClickHouse can manage background processes in [MergeTree](../../engines/table-engines/mergetree-family/mergetree.md) tables.
+
 ### STOP MERGES {#query_language-system-stop-merges}
 
 Provides possibility to stop background merges for tables in the MergeTree family:
@@ -108,4 +143,110 @@ Provides possibility to start background merges for tables in the MergeTree fami
 ``` sql
 SYSTEM START MERGES [[db.]merge_tree_family_table_name]
 ```
+
+### STOP TTL MERGES {#query_language-stop-ttl-merges}
+
+Provides possibility to stop background deletion of old data according to [TTL expressions](../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-ttl) for tables in the MergeTree family.
+Returns `Ok.` even if the table doesn't exist or the table doesn't have a MergeTree engine. Returns an error when the database doesn't exist:
+
+``` sql
+SYSTEM STOP TTL MERGES [[db.]merge_tree_family_table_name]
+```
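+
+For example (a sketch; `db.merge_tree_table` is a placeholder name), TTL-based cleanup can be paused for a maintenance window and resumed afterwards with the START TTL MERGES command described below:
+
+``` sql
+SYSTEM STOP TTL MERGES db.merge_tree_table;
+-- inspect or manipulate parts while TTL merges are paused
+SYSTEM START TTL MERGES db.merge_tree_table;
+```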
+
+### START TTL MERGES {#query_language-start-ttl-merges}
+
+Provides possibility to start background deletion of old data according to [TTL expressions](../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-ttl) for tables in the MergeTree family.
+Returns `Ok.` even if the table doesn't exist. Returns an error when the database doesn't exist:
+
+``` sql
+SYSTEM START TTL MERGES [[db.]merge_tree_family_table_name]
+```
+
+### STOP MOVES {#query_language-stop-moves}
+
+Provides possibility to stop background moves of data according to [TTL table expressions with the TO VOLUME or TO DISK clause](../../engines/table-engines/mergetree-family/mergetree.md#mergetree-table-ttl) for tables in the MergeTree family.
+Returns `Ok.` even if the table doesn't exist. Returns an error when the database doesn't exist:
+
+``` sql
+SYSTEM STOP MOVES [[db.]merge_tree_family_table_name]
+```
+
+### START MOVES {#query_language-start-moves}
+
+Provides possibility to start background moves of data according to [TTL table expressions with the TO VOLUME and TO DISK clause](../../engines/table-engines/mergetree-family/mergetree.md#mergetree-table-ttl) for tables in the MergeTree family.
+Returns `Ok.` even if the table doesn't exist. Returns an error when the database doesn't exist:
+
+``` sql
+SYSTEM START MOVES [[db.]merge_tree_family_table_name]
+```
+
+## Managing ReplicatedMergeTree Tables {#query-language-system-replicated}
+
+ClickHouse can manage background replication-related processes in [ReplicatedMergeTree](../../engines/table-engines/mergetree-family/replication.md) tables.
+
+### STOP FETCHES {#query_language-system-stop-fetches}
+Provides possibility to stop background fetches of inserted parts for tables in the `ReplicatedMergeTree` family.
+Always returns `Ok.` regardless of the table engine and even if the table or database doesn't exist.
+
+``` sql
+SYSTEM STOP FETCHES [[db.]replicated_merge_tree_family_table_name]
+```
+
+### START FETCHES {#query_language-system-start-fetches}
+Provides possibility to start background fetches of inserted parts for tables in the `ReplicatedMergeTree` family.
+Always returns `Ok.` regardless of the table engine and even if the table or database doesn't exist.
+
+``` sql
+SYSTEM START FETCHES [[db.]replicated_merge_tree_family_table_name]
+```
+
+### STOP REPLICATED SENDS {#query_language-system-stop-replicated-sends}
+Provides possibility to stop background sends of newly inserted parts to the other replicas in the cluster for tables in the `ReplicatedMergeTree` family:
+
+``` sql
+SYSTEM STOP REPLICATED SENDS [[db.]replicated_merge_tree_family_table_name]
+```
+
+### START REPLICATED SENDS {#query_language-system-start-replicated-sends}
+Provides possibility to start background sends of newly inserted parts to the other replicas in the cluster for tables in the `ReplicatedMergeTree` family:
+
+``` sql
+SYSTEM START REPLICATED SENDS [[db.]replicated_merge_tree_family_table_name]
+```
+
+### STOP REPLICATION QUEUES {#query_language-system-stop-replication-queues}
+Provides possibility to stop background fetching of tasks from the replication queues stored in ZooKeeper for tables in the `ReplicatedMergeTree` family. Possible background task types: merges, fetches, mutations, DDL statements with an ON CLUSTER clause:
+
+``` sql
+SYSTEM STOP REPLICATION QUEUES [[db.]replicated_merge_tree_family_table_name]
+```
+
+### START REPLICATION QUEUES {#query_language-system-start-replication-queues}
+Provides possibility to start background fetching of tasks from the replication queues stored in ZooKeeper for tables in the `ReplicatedMergeTree` family. Possible background task types: merges, fetches, mutations, DDL statements with an ON CLUSTER clause:
+
+``` sql
+SYSTEM START REPLICATION QUEUES [[db.]replicated_merge_tree_family_table_name]
+```
+
+### SYNC REPLICA {#query_language-system-sync-replica}
+Waits until a `ReplicatedMergeTree` table is synced with the other replicas in the cluster. Will run until `receive_timeout` if fetches are currently disabled for the table.
+
+``` sql
+SYSTEM SYNC REPLICA [db.]replicated_merge_tree_family_table_name
+```
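+
+For example (a sketch; `db.replicated_table` is a placeholder name), fetches should be re-enabled before waiting for a replica to catch up, otherwise SYNC REPLICA blocks until `receive_timeout`:
+
+``` sql
+SYSTEM START FETCHES db.replicated_table;
+SYSTEM SYNC REPLICA db.replicated_table;
+```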
+
+### RESTART REPLICA {#query_language-system-restart-replica}
+Provides possibility to reinitialize the ZooKeeper session state for a `ReplicatedMergeTree` table; the current state is compared with ZooKeeper as the source of truth, and tasks are added to the ZooKeeper queue if needed.
+Initialization of the replication queue based on ZooKeeper data happens in the same way as for the `ATTACH TABLE` statement. For a short time the table will be unavailable for any operations.
+
+``` sql
+SYSTEM RESTART REPLICA [db.]replicated_merge_tree_family_table_name
+```
+
+### RESTART REPLICAS {#query_language-system-restart-replicas}
+Provides possibility to reinitialize the ZooKeeper session state for all `ReplicatedMergeTree` tables; the current state is compared with ZooKeeper as the source of truth, and tasks are added to the ZooKeeper queue if needed:
+
+``` sql
+SYSTEM RESTART REPLICAS
+```
+
 [Original article](https://clickhouse.tech/docs/en/query_language/system/)
diff --git a/docs/ru/sql-reference/statements/system.md b/docs/ru/sql-reference/statements/system.md
index b058739c894..1b66fa039d9 100644
--- a/docs/ru/sql-reference/statements/system.md
+++ b/docs/ru/sql-reference/statements/system.md
@@ -1,9 +1,12 @@
 # Запросы SYSTEM {#query-language-system}
 
+- [RELOAD EMBEDDED DICTIONARIES](#query_language-system-reload-emdedded-dictionaries)
 - [RELOAD DICTIONARIES](#query_language-system-reload-dictionaries)
 - [RELOAD DICTIONARY](#query_language-system-reload-dictionary)
 - [DROP DNS CACHE](#query_language-system-drop-dns-cache)
 - [DROP MARK CACHE](#query_language-system-drop-mark-cache)
+- [DROP UNCOMPRESSED CACHE](#query_language-system-drop-uncompressed-cache)
+- [DROP COMPILED EXPRESSION CACHE](#query_language-system-drop-compiled-expression-cache)
 - [FLUSH LOGS](#query_language-system-flush_logs)
 - [RELOAD CONFIG](#query_language-system-reload-config)
 - [SHUTDOWN](#query_language-system-shutdown)
@@ -13,7 +16,25 @@
 - [START DISTRIBUTED SENDS](#query_language-system-start-distributed-sends)
 - [STOP MERGES](#query_language-system-stop-merges)
 - [START MERGES](#query_language-system-start-merges)
+- [STOP TTL MERGES](#query_language-stop-ttl-merges)
+- [START TTL MERGES](#query_language-start-ttl-merges)
+- [STOP MOVES](#query_language-stop-moves)
+- [START MOVES](#query_language-start-moves)
+- [STOP FETCHES](#query_language-system-stop-fetches)
+- [START FETCHES](#query_language-system-start-fetches)
+- [STOP REPLICATED SENDS](#query_language-system-stop-replicated-sends)
+- [START REPLICATED SENDS](#query_language-system-start-replicated-sends)
+- [STOP REPLICATION 
QUEUES](#query_language-system-stop-replication-queues) +- [START REPLICATION QUEUES](#query_language-system-start-replication-queues) +- [SYNC REPLICA](#query_language-system-sync-replica) +- [RESTART REPLICA](#query_language-system-restart-replica) +- [RESTART REPLICAS](#query_language-system-restart-replicas) +## RELOAD EMBEDDED DICTIONARIES] {#query_language-system-reload-emdedded-dictionaries} +Перегружет все [Встроенные словари](../dictionaries/internal-dicts.md). +По умолчанию встроенные словари выключены. +Всегда возвращает `Ok.`, вне зависимости от результата обновления встроенных словарей. + ## RELOAD DICTIONARIES {#query_language-system-reload-dictionaries} Перегружает все словари, которые были успешно загружены до этого. @@ -40,6 +61,16 @@ SELECT name, status FROM system.dictionaries; Сбрасывает кеш «засечек» (`mark cache`). Используется при разработке ClickHouse и тестах производительности. +## DROP UNCOMPRESSED CACHE {#query_language-system-drop-uncompressed-cache} + +Сбрасывает кеш не сжатых данных. Используется при разработке ClickHouse и тестах производительности. +Для управления кешем не сжатых данных используйте следующие настройки уровня сервера [uncompressed_cache_size](../../operations/server-configuration-parameters/settings.md#server-settings-uncompressed_cache_size) и настройки уровня запрос/пользователь/профиль [use_uncompressed_cache](../../operations/settings/settings.md#setting-use_uncompressed_cache) + + +## DROP COMPILED EXPRESSION CACHE {#query_language-system-drop-compiled-expression-cache} +Сбрасывает кеш скомпилированных выражений. Используется при разработке ClickHouse и тестах производительности. +Компилированные выражения используются когда включена настройка уровня запрос/пользователь/профиль [compile](../../operations/settings/settings.md#compile) + ## FLUSH LOGS {#query_language-system-flush_logs} Записывает буферы логов в системные таблицы (например system.query\_log). Позволяет не ждать 7.5 секунд при отладке. @@ -84,6 +115,10 @@ SYSTEM FLUSH DISTRIBUTED [db.] SYSTEM START DISTRIBUTED SENDS [db.] ``` +## Managing MergeTree Tables {#query-language-system-mergetree} + +ClickHouse может управлять фоновыми процессами в [MergeTree](../../engines/table-engines/mergetree-family/mergetree.md) таблицах. + ### STOP MERGES {#query_language-system-stop-merges} Позволяет остановить фоновые мержи для таблиц семейства MergeTree: @@ -103,4 +138,110 @@ SYSTEM STOP MERGES [[db.]merge_tree_family_table_name] SYSTEM START MERGES [[db.]merge_tree_family_table_name] ``` +### STOP TTL MERGES {#query_language-stop-ttl-merges} + +Позволяет остановить фоновые процессы удаления старых данных основанные на [выражениях TTL](../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-ttl) для таблиц семейства MergeTree: +Возвращает `Ok.` даже если указана несуществующая таблица или таблица имеет тип отличный от MergeTree. Возвращает ошибку если указана не существующая база данных: + +``` sql +SYSTEM STOP TTL MERGES [[db.]merge_tree_family_table_name] +``` + +### START TTL MERGES {#query_language-start-ttl-merges} + +Запускает фоновые процессы удаления старых данных основанные на [выражениях TTL](../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-ttl) для таблиц семейства MergeTree: +Возвращает `Ok.` даже если указана несуществующая таблица или таблица имеет тип отличный от MergeTree. 
Возвращает ошибку если указана не существующая база данных:
+
+``` sql
+SYSTEM START TTL MERGES [[db.]merge_tree_family_table_name]
+```
+
+### STOP MOVES {#query_language-stop-moves}
+
+Позволяет остановить фоновые процессы переноса данных основанные [табличных выражениях TTL с использованием TO VOLUME или TO DISK](../../engines/table-engines/mergetree-family/mergetree.md#mergetree-table-ttl) для таблиц семейства MergeTree:
+Возвращает `Ok.` даже если указана несуществующая таблица или таблица имеет тип отличный от MergeTree. Возвращает ошибку если указана не существующая база данных:
+
+``` sql
+SYSTEM STOP MOVES [[db.]merge_tree_family_table_name]
+```
+
+### START MOVES {#query_language-start-moves}
+
+Запускает фоновые процессы переноса данных основанные [табличных выражениях TTL с использованием TO VOLUME или TO DISK](../../engines/table-engines/mergetree-family/mergetree.md#mergetree-table-ttl) для таблиц семейства MergeTree:
+Возвращает `Ok.` даже если указана несуществующая таблица или таблица имеет тип отличный от MergeTree. Возвращает ошибку если указана не существующая база данных:
+
+``` sql
+SYSTEM START MOVES [[db.]merge_tree_family_table_name]
+```
+
+## Managing ReplicatedMergeTree Tables {#query-language-system-replicated}
+
+ClickHouse может управлять фоновыми процессами связанными c репликацией в таблицах семейства [ReplicatedMergeTree](../../engines/table-engines/mergetree-family/replication.md).
+
+### STOP FETCHES {#query_language-system-stop-fetches}
+Позволяет остановить фоновые процессы синхронизации новыми вставленными кусками данных с другими репликами в кластере для таблиц семейства `ReplicatedMergeTree`:
+Всегда возвращает `Ok.` вне зависимости от типа таблицы и даже если таблица или база данных не существует.
+
+``` sql
+SYSTEM STOP FETCHES [[db.]replicated_merge_tree_family_table_name]
+```
+
+### START FETCHES {#query_language-system-start-fetches}
+Позволяет запустить фоновые процессы синхронизации новыми вставленными кусками данных с другими репликами в кластере для таблиц семейства `ReplicatedMergeTree`:
+Всегда возвращает `Ok.` вне зависимости от типа таблицы и даже если таблица или база данных не существует.
+
+``` sql
+SYSTEM START FETCHES [[db.]replicated_merge_tree_family_table_name]
+```
+
+### STOP REPLICATED SENDS {#query_language-system-stop-replicated-sends}
+Позволяет остановить фоновые процессы отсылки новых вставленных кусков данных другим репликам в кластере для таблиц семейства `ReplicatedMergeTree`:
+
+``` sql
+SYSTEM STOP REPLICATED SENDS [[db.]replicated_merge_tree_family_table_name]
+```
+
+### START REPLICATED SENDS {#query_language-system-start-replicated-sends}
+Позволяет запустить фоновые процессы отсылки новых вставленных кусков данных другим репликам в кластере для таблиц семейства `ReplicatedMergeTree`:
+
+``` sql
+SYSTEM START REPLICATED SENDS [[db.]replicated_merge_tree_family_table_name]
+```
+
+### STOP REPLICATION QUEUES {#query_language-system-stop-replication-queues}
+Останавливает фоновые процессы разбора заданий из очереди репликации которая хранится в Zookeeper для таблиц семейства `ReplicatedMergeTree`. 
Возможные типы заданий - merges, fetches, mutation, DDL запросы с ON CLUSTER:
+
+``` sql
+SYSTEM STOP REPLICATION QUEUES [[db.]replicated_merge_tree_family_table_name]
+```
+
+### START REPLICATION QUEUES {#query_language-system-start-replication-queues}
+Запускает фоновые процессы разбора заданий из очереди репликации которая хранится в Zookeeper для таблиц семейства `ReplicatedMergeTree`. Возможные типы заданий - merges, fetches, mutation, DDL запросы с ON CLUSTER:
+
+``` sql
+SYSTEM START REPLICATION QUEUES [[db.]replicated_merge_tree_family_table_name]
+```
+
+### SYNC REPLICA {#query_language-system-sync-replica}
+Ждет когда таблица семейства `ReplicatedMergeTree` будет синхронизирована с другими репликами в кластере, будет работать до достижения `receive_timeout`, если синхронизация для таблицы отключена в настоящий момент времени:
+
+``` sql
+SYSTEM SYNC REPLICA [db.]replicated_merge_tree_family_table_name
+```
+
+### RESTART REPLICA {#query_language-system-restart-replica}
+Реинициализация состояния Zookeeper сессий для таблицы семейства `ReplicatedMergeTree`, сравнивает текущее состояние с тем что хранится в Zookeeper как источник правды и добавляет задачи Zookeeper очередь если необходимо
+Инициализация очереди репликации на основе данных ZooKeeper, происходит так же как при attach table. На короткое время таблица станет недоступной для любых операций.
+
+``` sql
+SYSTEM RESTART REPLICA [db.]replicated_merge_tree_family_table_name
+```
+
+### RESTART REPLICAS {#query_language-system-restart-replicas}
+Реинициализация состояния Zookeeper сессий для всех `ReplicatedMergeTree` таблиц, сравнивает текущее состояние с тем что хранится в Zookeeper как источник правды и добавляет задачи Zookeeper очередь если необходимо
+
+``` sql
+SYSTEM RESTART REPLICAS
+```
+
 [Оригинальная статья](https://clickhouse.tech/docs/ru/query_language/system/)

From 269bf6f5ef131a6a4c0d837f803ad17bbf675e3f Mon Sep 17 00:00:00 2001
From: "Matwey V. Kornilov"
Date: Mon, 8 Jun 2020 19:25:21 +0300
Subject: [PATCH 14/33] Fix missed #include <functional> is required for
 std::hash

---
 base/common/strong_typedef.h | 1 +
 1 file changed, 1 insertion(+)

diff --git a/base/common/strong_typedef.h b/base/common/strong_typedef.h
index 0dc29ad9f1b..d9850a25c37 100644
--- a/base/common/strong_typedef.h
+++ b/base/common/strong_typedef.h
@@ -1,5 +1,6 @@
 #pragma once
 
+#include <functional>
 #include <type_traits>
 #include <utility>
 
From 42acb627fbd1f5c155d2e6ea589a6471b2a46e94 Mon Sep 17 00:00:00 2001
From: "S.M.A. 
Djawadi" Date: Mon, 8 Jun 2020 21:04:16 +0430 Subject: [PATCH 15/33] Fix typo (#11521) --- docs/en/engines/table-engines/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/engines/table-engines/index.md b/docs/en/engines/table-engines/index.md index ac23120b9cd..ee28bfda905 100644 --- a/docs/en/engines/table-engines/index.md +++ b/docs/en/engines/table-engines/index.md @@ -60,7 +60,7 @@ Engines in the family: - [Distributed](special/distributed.md#distributed) - [MaterializedView](special/materializedview.md#materializedview) - [Dictionary](special/dictionary.md#dictionary) -- [Merge](special/merge.md#merge +- [Merge](special/merge.md#merge) - [File](special/file.md#file) - [Null](special/null.md#null) - [Set](special/set.md#set) From 9941fbe32d9085737efdd20c0162ea7e9d6f4241 Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 8 Jun 2020 19:34:42 +0300 Subject: [PATCH 16/33] Fix benign race condition during shutdown --- src/Storages/MergeTree/MergeTreeData.cpp | 2 +- src/Storages/StorageReplicatedMergeTree.cpp | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index fea05c00e4f..b399584f4d9 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -1865,7 +1865,7 @@ void MergeTreeData::removePartsFromWorkingSet(const MergeTreeData::DataPartsVect part->remove_time.store(remove_time, std::memory_order_relaxed); if (part->state != IMergeTreeDataPart::State::Outdated) - modifyPartState(part,IMergeTreeDataPart::State::Outdated); + modifyPartState(part, IMergeTreeDataPart::State::Outdated); } } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index faa44ff7db1..f2ac6678764 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -2961,7 +2961,6 @@ void StorageReplicatedMergeTree::startup() void StorageReplicatedMergeTree::shutdown() { - clearOldPartsFromFilesystem(true); /// Cancel fetches, merges and mutations to force the queue_task to finish ASAP. fetcher.blocker.cancelForever(); merger_mutator.merges_blocker.cancelForever(); @@ -2997,6 +2996,12 @@ void StorageReplicatedMergeTree::shutdown() std::unique_lock lock(data_parts_exchange_endpoint->rwlock); } data_parts_exchange_endpoint.reset(); + + /// We clear all parts after stopping all background operations. It's + /// important, because background operations can produce temporary parts + /// which will remove themselfs in their descrutors. If so, we may have race + /// condition between our remove call and background process. + clearOldPartsFromFilesystem(true); } From ce73b30505524d7791ef6fea71b9e26addc0df7a Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 8 Jun 2020 21:08:55 +0300 Subject: [PATCH 17/33] Review fixes --- src/Storages/StorageMergeTree.cpp | 27 ++++++++++++------- src/Storages/StorageReplicatedMergeTree.cpp | 6 ++--- .../01079_parallel_alter_modify_zookeeper.sh | 2 +- 3 files changed, 21 insertions(+), 14 deletions(-) diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 4650485847c..15e662b27b5 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -141,16 +141,6 @@ void StorageMergeTree::shutdown() mutation_wait_event.notify_all(); } - try - { - clearOldPartsFromFilesystem(true); - } - catch (...) 
-    {
-        /// Example: the case of readonly filesystem, we have failure removing old parts.
-        /// Should not prevent table shutdown.
-        tryLogCurrentException(log);
-    }
 
     merger_mutator.merges_blocker.cancelForever();
     parts_mover.moves_blocker.cancelForever();
@@ -160,6 +150,23 @@
     if (moving_task_handle)
         global_context.getBackgroundMovePool().removeTask(moving_task_handle);
+
+
+    try
+    {
+        /// We clear all old parts after stopping all background operations.
+        /// It's important, because background operations can produce temporary
+        /// parts which will remove themselves in their destructors. If so, we
+        /// may have race condition between our remove call and background
+        /// process.
+        clearOldPartsFromFilesystem(true);
+    }
+    catch (...)
+    {
+        /// Example: the case of readonly filesystem, we have failure removing old parts.
+        /// Should not prevent table shutdown.
+        tryLogCurrentException(log);
+    }
 }
 
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index f2ac6678764..d109fa464b0 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -2997,10 +2997,10 @@ void StorageReplicatedMergeTree::shutdown()
     }
     data_parts_exchange_endpoint.reset();
 
-    /// We clear all parts after stopping all background operations. It's
+    /// We clear all old parts after stopping all background operations. It's
     /// important, because background operations can produce temporary parts
-    /// which will remove themselfs in their descrutors. If so, we may have race
-    /// condition between our remove call and background process.
+    /// which will remove themselves in their destructors. If so, we may have
+    /// race condition between our remove call and background process.
     clearOldPartsFromFilesystem(true);
 }
 
diff --git a/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh b/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh
index 9a6e9c3156c..05ef4a1a675 100755
--- a/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh
+++ b/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh
@@ -102,7 +102,7 @@ echo "Finishing alters"
 
 # This alter will finish all previous, but replica 1 maybe still not up-to-date.
 # If query will throw something, than we will sleep 1 and retry. If timeout
-# happened we will silentrly go out of loop and probably fail tests in the
+# happened we will silently go out of loop and probably fail tests in the
 # following for loop.
 #
 # 120 seconds is more than enough, but in rare cases for slow builds (debug,

From 00e7eb91b19ef8bacfa12aa4aee418ebb65da337 Mon Sep 17 00:00:00 2001
From: alexey-milovidov
Date: Mon, 8 Jun 2020 23:13:03 +0300
Subject: [PATCH 18/33] Revert "Autocompletion support for users in client"

---
 programs/client/Suggest.cpp | 2 --
 1 file changed, 2 deletions(-)

diff --git a/programs/client/Suggest.cpp b/programs/client/Suggest.cpp
index 1229caae245..4ac5e735fd5 100644
--- a/programs/client/Suggest.cpp
+++ b/programs/client/Suggest.cpp
@@ -116,8 +116,6 @@ void Suggest::loadImpl(Connection & connection, const ConnectionTimeouts & timeo
         << " UNION ALL "
            "SELECT DISTINCT name FROM system.dictionaries LIMIT " << limit_str
         << " UNION ALL "
-           "SELECT DISTINCT name FROM system.users LIMIT " << limit_str
-        << " UNION ALL "
            "SELECT DISTINCT name FROM system.columns LIMIT " << limit_str;
 }

From ef7b054443b5b2b8a43c1ddecbefae0039fb9173 Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Tue, 9 Jun 2020 00:53:32 +0300
Subject: [PATCH 19/33] Speed up merging in AggregatingMergeTree

---
 src/Columns/ColumnAggregateFunction.cpp     | 59 +++++++++++++++++----
 src/Columns/ColumnAggregateFunction.h       | 21 +++-----
 src/DataTypes/DataTypeAggregateFunction.cpp | 10 ++--
 3 files changed, 61 insertions(+), 29 deletions(-)

diff --git a/src/Columns/ColumnAggregateFunction.cpp b/src/Columns/ColumnAggregateFunction.cpp
index 1568437618d..2e8d2589b78 100644
--- a/src/Columns/ColumnAggregateFunction.cpp
+++ b/src/Columns/ColumnAggregateFunction.cpp
@@ -6,6 +6,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -27,6 +28,51 @@ namespace ErrorCodes
 }
 
 
+static std::string getTypeString(const AggregateFunctionPtr & func)
+{
+    WriteBufferFromOwnString stream;
+    stream << "AggregateFunction(" << func->getName();
+    const auto & parameters = func->getParameters();
+    const auto & argument_types = func->getArgumentTypes();
+
+    if (!parameters.empty())
+    {
+        stream << '(';
+        for (size_t i = 0; i < parameters.size(); ++i)
+        {
+            if (i)
+                stream << ',';
+            stream << applyVisitor(FieldVisitorToString(), parameters[i]);
+        }
+        stream << ')';
+    }
+
+    for (const auto & argument_type : argument_types)
+        stream << ',' << argument_type->getName();
+
+    stream << ')';
+    return stream.str();
+}
+
+
+ColumnAggregateFunction::ColumnAggregateFunction(const AggregateFunctionPtr & func_)
+    : func(func_), type_string(getTypeString(func))
+{
+}
+
+ColumnAggregateFunction::ColumnAggregateFunction(const AggregateFunctionPtr & func_, const ConstArenas & arenas_)
+    : foreign_arenas(arenas_), func(func_), type_string(getTypeString(func))
+{
+
+}
+
+void ColumnAggregateFunction::set(const AggregateFunctionPtr & func_)
+{
+    func = func_;
+    type_string = getTypeString(func);
+}
+
+
 ColumnAggregateFunction::~ColumnAggregateFunction()
 {
     if (!func->hasTrivialDestructor() && !src)
@@ -336,15 +382,10 @@ MutableColumnPtr ColumnAggregateFunction::cloneEmpty() const
     return create(func);
 }
 
-String ColumnAggregateFunction::getTypeString() const
-{
-    return DataTypeAggregateFunction(func, func->getArgumentTypes(), func->getParameters()).getName();
-}
-
 Field ColumnAggregateFunction::operator[](size_t n) const
 {
     Field field = AggregateFunctionStateData();
-    field.get<AggregateFunctionStateData>().name = getTypeString();
+    field.get<AggregateFunctionStateData>().name = type_string;
     {
         WriteBufferFromString buffer(field.get<AggregateFunctionStateData>().data);
         func->serialize(data[n], buffer);
@@ -355,7 +396,7 @@ Field ColumnAggregateFunction::operator[](size_t n) const
 void ColumnAggregateFunction::get(size_t n, Field & res) const
 {
     res = AggregateFunctionStateData();
-    res.get<AggregateFunctionStateData>().name = getTypeString();
+    res.get<AggregateFunctionStateData>().name = type_string;
     {
         WriteBufferFromString buffer(res.get<AggregateFunctionStateData>().data);
         func->serialize(data[n], buffer);
@@ -425,8 +466,6 @@ static void pushBackAndCreateState(ColumnAggregateFunction::Container & data, Ar
 void ColumnAggregateFunction::insert(const Field & x)
 {
-    String type_string = getTypeString();
-
     if (x.getType() != Field::Types::AggregateFunctionState)
         throw Exception(String("Inserting field of type ") + x.getTypeName() + " into ColumnAggregateFunction. "
                         "Expected " + Field::Types::toString(Field::Types::AggregateFunctionState), ErrorCodes::LOGICAL_ERROR);
@@ -564,7 +603,7 @@ void ColumnAggregateFunction::getExtremes(Field & min, Field & max) const
     AggregateDataPtr place = place_buffer.data();
 
     AggregateFunctionStateData serialized;
-    serialized.name = getTypeString();
+    serialized.name = type_string;
 
     func->create(place);
     try
diff --git a/src/Columns/ColumnAggregateFunction.h b/src/Columns/ColumnAggregateFunction.h
index 002bc71f561..a9b3c38a2e0 100644
--- a/src/Columns/ColumnAggregateFunction.h
+++ b/src/Columns/ColumnAggregateFunction.h
@@ -74,6 +74,9 @@ private:
     /// Array of pointers to aggregation states, that are placed in arenas.
     Container data;
 
+    /// Name of the type to distinguish different aggregation states.
+    String type_string;
+
     ColumnAggregateFunction() {}
 
     /// Create a new column that has another column as a source.
@@ -84,29 +87,17 @@ private:
     /// but ownership of different elements cannot be mixed by different columns.
     void ensureOwnership();
 
-    ColumnAggregateFunction(const AggregateFunctionPtr & func_)
-        : func(func_)
-    {
-    }
+    ColumnAggregateFunction(const AggregateFunctionPtr & func_);
 
     ColumnAggregateFunction(const AggregateFunctionPtr & func_,
-        const ConstArenas & arenas_)
-        : foreign_arenas(arenas_), func(func_)
-    {
-    }
-
+        const ConstArenas & arenas_);
 
     ColumnAggregateFunction(const ColumnAggregateFunction & src_);
 
-    String getTypeString() const;
-
 public:
     ~ColumnAggregateFunction() override;
 
-    void set(const AggregateFunctionPtr & func_)
-    {
-        func = func_;
-    }
+    void set(const AggregateFunctionPtr & func_);
 
     AggregateFunctionPtr getAggregateFunction() { return func; }
     AggregateFunctionPtr getAggregateFunction() const { return func; }
diff --git a/src/DataTypes/DataTypeAggregateFunction.cpp b/src/DataTypes/DataTypeAggregateFunction.cpp
index e94d761dc87..59811b1cd55 100644
--- a/src/DataTypes/DataTypeAggregateFunction.cpp
+++ b/src/DataTypes/DataTypeAggregateFunction.cpp
@@ -14,6 +14,8 @@
 #include
 #include
 #include
+#include
+#include
 #include
 #include
 
@@ -36,25 +38,25 @@ namespace ErrorCodes
 
 std::string DataTypeAggregateFunction::doGetName() const
 {
-    std::stringstream stream;
+    WriteBufferFromOwnString stream;
     stream << "AggregateFunction(" << function->getName();
 
     if (!parameters.empty())
     {
-        stream << "(";
+        stream << '(';
         for (size_t i = 0; i < parameters.size(); ++i)
        {
             if (i)
                 stream << ", ";
             stream << applyVisitor(DB::FieldVisitorToString(), parameters[i]);
         }
-        stream << ")";
+        stream << ')';
     }
 
     for (const auto & argument_type : argument_types)
         stream << ", " << argument_type->getName();
 
-    stream << ")";
+    stream << ')';
     return stream.str();
 }

From f9ea964c87bb36bf0b7e26a315d20bdb240f0016 Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Tue, 9 Jun 2020 00:59:58 +0300
Subject: [PATCH 20/33] Added performance test

---
 tests/performance/aggregating_merge_tree.xml | 32 ++++++++++++++++++++
 1 file changed, 32 insertions(+)
 create mode 100644 tests/performance/aggregating_merge_tree.xml

diff --git a/tests/performance/aggregating_merge_tree.xml b/tests/performance/aggregating_merge_tree.xml
new file mode 100644
index 00000000000..2116050f7a5
--- /dev/null
+++ b/tests/performance/aggregating_merge_tree.xml
@@ -0,0 +1,32 @@
+
+    DROP TABLE IF EXISTS test
+
+
+    CREATE TABLE test(
+        t UInt64,
+        q1 AggregateFunction(quantilesTiming(0.50, 0.75, 0.90, 0.99), Float64),
+        q2 AggregateFunction(quantilesTiming(0.50, 0.75, 0.90, 0.99), Float64),
+        q3 AggregateFunction(quantilesTiming(0.50, 0.75, 0.90, 0.99), Float64),
+        q4 AggregateFunction(quantilesTiming(0.50, 0.75, 0.90, 0.99), Float64),
+        q5 AggregateFunction(quantilesTiming(0.50, 0.75, 0.90, 0.99), Float64)
+    ) ENGINE=SummingMergeTree()
+    ORDER BY t
+
+
+
+    INSERT INTO test
+    SELECT
+        number / 10 as t,
+        quantilesTimingState(0.50, 0.75, 0.90, 0.99)(number/1000) as q1,
+        quantilesTimingState(0.50, 0.75, 0.90, 0.99)(number/1000) as q2,
+        quantilesTimingState(0.50, 0.75, 0.90, 0.99)(number/1000) as q3,
+        quantilesTimingState(0.50, 0.75, 0.90, 0.99)(number/1000) as q4,
+        quantilesTimingState(0.50, 0.75, 0.90, 0.99)(number/1000) as q5
+    FROM numbers(1000 * 1000)
+    GROUP BY t
+
+
+    OPTIMIZE TABLE test FINAL
+
+    DROP TABLE test
+

From 85363ebe568eb61d78b1f90a32004bbfb9acb085 Mon Sep 17 00:00:00 2001
From: alexey-milovidov
Date: Tue, 9 Jun 2020 01:41:57 +0300
Subject: [PATCH 21/33] Update aggregating_merge_tree.xml

---
 tests/performance/aggregating_merge_tree.xml | 2 --
 1 file changed, 2 deletions(-)

diff --git a/tests/performance/aggregating_merge_tree.xml b/tests/performance/aggregating_merge_tree.xml
index 2116050f7a5..d658fd705bb 100644
--- a/tests/performance/aggregating_merge_tree.xml
+++ b/tests/performance/aggregating_merge_tree.xml
@@ -1,6 +1,4 @@
 
-    DROP TABLE IF EXISTS test
-
     CREATE TABLE test(
         t UInt64,

From 63f2d92eff60cd139600b92cfd5dd0590ae0db12 Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Tue, 9 Jun 2020 01:49:19 +0300
Subject: [PATCH 22/33] Better exception message when cannot parse columns
 declaration list #10403

---
 .../parseColumnsListForTableFunction.cpp | 14 ++++----------
 1 file changed, 4 insertions(+), 10 deletions(-)

diff --git a/src/TableFunctions/parseColumnsListForTableFunction.cpp b/src/TableFunctions/parseColumnsListForTableFunction.cpp
index 8eea3edf9bd..5221d96e086 100644
--- a/src/TableFunctions/parseColumnsListForTableFunction.cpp
+++ b/src/TableFunctions/parseColumnsListForTableFunction.cpp
@@ -1,5 +1,6 @@
 #include
 #include
+#include
 #include
 #include
 #include
@@ -11,27 +12,20 @@ namespace DB
 namespace ErrorCodes
 {
     extern const int LOGICAL_ERROR;
-    extern const int SYNTAX_ERROR;
 }
 
 ColumnsDescription parseColumnsListFromString(const std::string & structure, const Context & context)
 {
-    Expected expected;
-
-    Tokens tokens(structure.c_str(), structure.c_str() + structure.size());
-    IParser::Pos token_iterator(tokens, context.getSettingsRef().max_parser_depth);
-
     ParserColumnDeclarationList parser;
-    ASTPtr columns_list_raw;
+    const Settings & settings = context.getSettingsRef();
 
-    if (!parser.parse(token_iterator, columns_list_raw, expected))
-        throw Exception("Cannot parse columns declaration list.", ErrorCodes::SYNTAX_ERROR);
+    ASTPtr columns_list_raw = parseQuery(parser, structure, "columns declaration list", settings.max_query_size, settings.max_parser_depth);
 
     auto * columns_list = dynamic_cast<ASTExpressionList *>(columns_list_raw.get());
     if (!columns_list)
         throw Exception("Could not cast AST to ASTExpressionList", ErrorCodes::LOGICAL_ERROR);
 
-    return InterpreterCreateQuery::getColumnsDescription(*columns_list, context, !context.getSettingsRef().allow_suspicious_codecs);
+    return InterpreterCreateQuery::getColumnsDescription(*columns_list, context, !settings.allow_suspicious_codecs);
 }
 
 }

From 6936da4e5d2fc113d56c9d9a92835b7e802eff11 Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Tue, 9 Jun 2020 03:42:56 +0300
Subject: [PATCH 23/33] Fix bad test

---
 tests/queries/0_stateless/01087_table_function_generate.sql | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/queries/0_stateless/01087_table_function_generate.sql b/tests/queries/0_stateless/01087_table_function_generate.sql
index 96db6803a47..05f03a5a4e6 100644
--- a/tests/queries/0_stateless/01087_table_function_generate.sql
+++ b/tests/queries/0_stateless/01087_table_function_generate.sql
@@ -33,11 +33,11 @@ LIMIT 10;
 SELECT '-';
 SELECT
 toTypeName(i)s
-FROM generateRandom('i Nullable(Enum16(\'h\' = 1, \'w\' = 5 , \'o\' = -200)))')
+FROM generateRandom('i Nullable(Enum16(\'h\' = 1, \'w\' = 5 , \'o\' = -200))')
 LIMIT 1;
 SELECT
 i
-FROM generateRandom('i Nullable(Enum16(\'h\' = 1, \'w\' = 5 , \'o\' = -200)))', 1, 10, 10)
+FROM generateRandom('i Nullable(Enum16(\'h\' = 1, \'w\' = 5 , \'o\' = -200))', 1, 10, 10)
 LIMIT 10;
 SELECT '-';
 SELECT

From 508e0f44d188e883ca46935f3ee6dcfc9ac17853 Mon Sep 17 00:00:00 2001
From: alesapin
Date: Mon, 8 Jun 2020 18:16:01 +0300
Subject: [PATCH 24/33] Fix modify test

---
 .../01079_parallel_alter_modify_zookeeper.sh | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh b/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh
index bacc742d16a..9a6e9c3156c 100755
--- a/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh
+++ b/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh
@@ -100,8 +100,14 @@ wait
 
 echo "Finishing alters"
 
-# This alter will finish all previous, but replica 1 maybe still not up-to-date
-while [[ $(timeout 30 $CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_mt_1 MODIFY COLUMN value1 String SETTINGS replication_alter_partitions_sync=2" 2>&1) ]]; do
+# This alter will finish all previous, but replica 1 maybe still not up-to-date.
+# If query will throw something, than we will sleep 1 and retry. If timeout
+# happened we will silentrly go out of loop and probably fail tests in the
+# following for loop.
+#
+# 120 seconds is more than enough, but in rare cases for slow builds (debug,
+# thread) it maybe necessary
+while [[ $(timeout 120 $CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_mt_1 MODIFY COLUMN value1 String SETTINGS replication_alter_partitions_sync=2" 2>&1) ]]; do
     sleep 1
 done

From bdeafe830b1b241d39a3ed32f8133ddebd66b5bb Mon Sep 17 00:00:00 2001
From: alesapin
Date: Mon, 8 Jun 2020 18:18:33 +0300
Subject: [PATCH 25/33] Better comment

---
 .../0_stateless/01079_parallel_alter_modify_zookeeper.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh b/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh
index 9a6e9c3156c..effc9f540a1 100755
--- a/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh
+++ b/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh
@@ -106,7 +106,7 @@ echo "Finishing alters"
 # following for loop.
 #
 # 120 seconds is more than enough, but in rare cases for slow builds (debug,
-# thread) it maybe necessary
+# thread) it maybe necessary.
 while [[ $(timeout 120 $CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_mt_1 MODIFY COLUMN value1 String SETTINGS replication_alter_partitions_sync=2" 2>&1) ]]; do
     sleep 1
 done

From de59629b386782da4e901845552b7e46a67d4dd9 Mon Sep 17 00:00:00 2001
From: alesapin
Date: Mon, 8 Jun 2020 19:34:42 +0300
Subject: [PATCH 26/33] Fix benign race condition during shutdown

---
 src/Storages/MergeTree/MergeTreeData.cpp    | 2 +-
 src/Storages/StorageReplicatedMergeTree.cpp | 7 ++++++-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index fea05c00e4f..b399584f4d9 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -1865,7 +1865,7 @@ void MergeTreeData::removePartsFromWorkingSet(const MergeTreeData::DataPartsVect
         part->remove_time.store(remove_time, std::memory_order_relaxed);
 
         if (part->state != IMergeTreeDataPart::State::Outdated)
-            modifyPartState(part,IMergeTreeDataPart::State::Outdated);
+            modifyPartState(part, IMergeTreeDataPart::State::Outdated);
     }
 }
 
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index faa44ff7db1..f2ac6678764 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -2961,7 +2961,6 @@ void StorageReplicatedMergeTree::startup()
 
 void StorageReplicatedMergeTree::shutdown()
 {
-    clearOldPartsFromFilesystem(true);
     /// Cancel fetches, merges and mutations to force the queue_task to finish ASAP.
     fetcher.blocker.cancelForever();
     merger_mutator.merges_blocker.cancelForever();
@@ -2997,6 +2996,12 @@ void StorageReplicatedMergeTree::shutdown()
         std::unique_lock lock(data_parts_exchange_endpoint->rwlock);
     }
     data_parts_exchange_endpoint.reset();
+
+    /// We clear all parts after stopping all background operations. It's
+    /// important, because background operations can produce temporary parts
+    /// which will remove themselfs in their descrutors. If so, we may have race
+    /// condition between our remove call and background process.
+    clearOldPartsFromFilesystem(true);
 }

From decac918a27389e593c273ec147c3251ea998bd5 Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Tue, 9 Jun 2020 04:48:11 +0300
Subject: [PATCH 27/33] Fix error

---
 src/Columns/ColumnAggregateFunction.cpp | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/Columns/ColumnAggregateFunction.cpp b/src/Columns/ColumnAggregateFunction.cpp
index 2e8d2589b78..d4021b45f0e 100644
--- a/src/Columns/ColumnAggregateFunction.cpp
+++ b/src/Columns/ColumnAggregateFunction.cpp
@@ -41,14 +41,14 @@ static std::string getTypeString(const AggregateFunctionPtr & func)
         for (size_t i = 0; i < parameters.size(); ++i)
         {
             if (i)
-                stream << ',';
+                stream << ", ";
             stream << applyVisitor(FieldVisitorToString(), parameters[i]);
         }
         stream << ')';
     }
 
     for (const auto & argument_type : argument_types)
-        stream << ',' << argument_type->getName();
+        stream << ", " << argument_type->getName();
 
     stream << ')';
     return stream.str();

From 8b8beb26d3db3aba31d072e24cfa9607acf04b7d Mon Sep 17 00:00:00 2001
From: Pavel Kovalenko
Date: Mon, 8 Jun 2020 19:26:56 +0300
Subject: [PATCH 28/33] S3 Poco HTTP Client (do not copy response stream body
 into memory).
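
Before this change the entire S3 response body was copied into an in-memory
stream (via Poco::StreamCopier::copyStream) before the AWS SDK could read it,
which roughly doubles peak memory usage for large objects. Now the Poco
response stream is handed to the SDK directly, wrapped so that the HTTP
session (the owner of the underlying stream buffer) stays alive for as long
as the SDK keeps reading. The ownership pattern, in a simplified sketch (the
class name SessionAwareIStream is illustrative; the real implementation is
the PocoHTTPResponseStream class added below, which derives from Aws::IStream,
essentially the SDK's std::istream flavor):

    #include <istream>
    #include <memory>
    #include <Poco/Net/HTTPClientSession.h>

    /// Shares the response streambuf instead of copying it, and pins the
    /// session that owns that buffer so it cannot be destroyed mid-read.
    class SessionAwareIStream : public std::istream
    {
    public:
        SessionAwareIStream(std::shared_ptr<Poco::Net::HTTPClientSession> session_, std::istream & response_stream_)
            : std::istream(response_stream_.rdbuf())  /// no copy, same buffer
            , session(std::move(session_))            /// keeps the owner alive
        {
        }

    private:
        std::shared_ptr<Poco::Net::HTTPClientSession> session;
    };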
---
 contrib/aws                          |  2 +-
 src/IO/S3/PocoHTTPClient.cpp         |  4 ++--
 src/IO/S3/PocoHTTPResponseStream.cpp | 12 ++++++++++++
 src/IO/S3/PocoHTTPResponseStream.h   | 21 +++++++++++++++++++++
 src/IO/S3Common.cpp                  |  2 --
 5 files changed, 36 insertions(+), 5 deletions(-)
 create mode 100644 src/IO/S3/PocoHTTPResponseStream.cpp
 create mode 100644 src/IO/S3/PocoHTTPResponseStream.h

diff --git a/contrib/aws b/contrib/aws
index f7d9ce39f41..17e10c0fc77 160000
--- a/contrib/aws
+++ b/contrib/aws
@@ -1 +1 @@
-Subproject commit f7d9ce39f41323300044567be007c233338bb94a
+Subproject commit 17e10c0fc77f22afe890fa6d1b283760e5edaa56
diff --git a/src/IO/S3/PocoHTTPClient.cpp b/src/IO/S3/PocoHTTPClient.cpp
index b8de483a5a8..f2d44e8d93a 100644
--- a/src/IO/S3/PocoHTTPClient.cpp
+++ b/src/IO/S3/PocoHTTPClient.cpp
@@ -2,6 +2,7 @@
 #include
 #include
+#include
 #include
 #include
 #include
@@ -149,8 +150,7 @@ void PocoHTTPClient::MakeRequestInternal(
             response->SetClientErrorMessage(error_message);
         }
         else
-            /// TODO: Do not copy whole stream.
-            Poco::StreamCopier::copyStream(response_body_stream, response->GetResponseBody());
+            response->GetResponseStream().SetUnderlyingStream(std::make_shared<PocoHTTPResponseStream>(session, response_body_stream));
 
         break;
     }
diff --git a/src/IO/S3/PocoHTTPResponseStream.cpp b/src/IO/S3/PocoHTTPResponseStream.cpp
new file mode 100644
index 00000000000..0a198268f2e
--- /dev/null
+++ b/src/IO/S3/PocoHTTPResponseStream.cpp
@@ -0,0 +1,12 @@
+#include "PocoHTTPResponseStream.h"
+
+#include
+
+namespace DB::S3
+{
+PocoHTTPResponseStream::PocoHTTPResponseStream(std::shared_ptr<Poco::Net::HTTPClientSession> session_, std::istream & response_stream_)
+    : Aws::IStream(response_stream_.rdbuf()), session(std::move(session_))
+{
+}
+
+}
diff --git a/src/IO/S3/PocoHTTPResponseStream.h b/src/IO/S3/PocoHTTPResponseStream.h
new file mode 100644
index 00000000000..8167ddc4346
--- /dev/null
+++ b/src/IO/S3/PocoHTTPResponseStream.h
@@ -0,0 +1,21 @@
+#pragma once
+
+#include
+#include
+
+namespace DB::S3
+{
+/**
+ * Wrapper of IStream to store response stream and corresponding HTTP session.
+ */
+class PocoHTTPResponseStream : public Aws::IStream
+{
+public:
+    PocoHTTPResponseStream(std::shared_ptr<Poco::Net::HTTPClientSession> session_, std::istream & response_stream_);
+
+private:
+    /// Poco HTTP session is holder of response stream.
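+    /// Pinning the session here is the point of the wrapper: the session owns
+    /// the underlying response streambuf, so it must outlive every read the
+    /// AWS SDK performs on this stream.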
+    std::shared_ptr<Poco::Net::HTTPClientSession> session;
+};
+
+}
diff --git a/src/IO/S3Common.cpp b/src/IO/S3Common.cpp
index 2c75a137222..2d01416fe57 100644
--- a/src/IO/S3Common.cpp
+++ b/src/IO/S3Common.cpp
@@ -12,9 +12,7 @@
 # include
 # include
 # include
-# include
 # include
-# include
 # include
 # include
 # include

From 74ea867b1dee09001975b41f3b0ceead0a3a1b97 Mon Sep 17 00:00:00 2001
From: Pavel Kovalenko
Date: Mon, 8 Jun 2020 23:17:39 +0300
Subject: [PATCH 29/33] Fix includes in S3Common.cpp

---
 src/IO/S3Common.cpp | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/IO/S3Common.cpp b/src/IO/S3Common.cpp
index 2d01416fe57..2c75a137222 100644
--- a/src/IO/S3Common.cpp
+++ b/src/IO/S3Common.cpp
@@ -12,7 +12,9 @@
 # include
 # include
 # include
+# include
 # include
+# include
 # include
 # include
 # include

From c3d0b351956f9f1623f3e7eff368caf10f6546f4 Mon Sep 17 00:00:00 2001
From: Pavel Kovalenko
Date: Tue, 9 Jun 2020 01:02:05 +0300
Subject: [PATCH 30/33] Fix includes in S3Common.cpp

---
 src/IO/S3/PocoHTTPClient.cpp | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/IO/S3/PocoHTTPClient.cpp b/src/IO/S3/PocoHTTPClient.cpp
index f2d44e8d93a..0dfa80ca107 100644
--- a/src/IO/S3/PocoHTTPClient.cpp
+++ b/src/IO/S3/PocoHTTPClient.cpp
@@ -3,6 +3,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include

From 223f45685f93e50f04c5e306d67339d54e4ea497 Mon Sep 17 00:00:00 2001
From: alesapin
Date: Mon, 8 Jun 2020 21:08:55 +0300
Subject: [PATCH 31/33] Review fixes

---
 src/Storages/StorageMergeTree.cpp            | 27 ++++++++++------- 
 src/Storages/StorageReplicatedMergeTree.cpp  |  6 ++---
 .../01079_parallel_alter_modify_zookeeper.sh |  2 +-
 3 files changed, 21 insertions(+), 14 deletions(-)

diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp
index 4650485847c..15e662b27b5 100644
--- a/src/Storages/StorageMergeTree.cpp
+++ b/src/Storages/StorageMergeTree.cpp
@@ -141,16 +141,6 @@ void StorageMergeTree::shutdown()
         mutation_wait_event.notify_all();
     }
 
-    try
-    {
-        clearOldPartsFromFilesystem(true);
-    }
-    catch (...)
-    {
-        /// Example: the case of readonly filesystem, we have failure removing old parts.
-        /// Should not prevent table shutdown.
-        tryLogCurrentException(log);
-    }
 
     merger_mutator.merges_blocker.cancelForever();
     parts_mover.moves_blocker.cancelForever();
@@ -160,6 +150,23 @@ void StorageMergeTree::shutdown()
 
     if (moving_task_handle)
         global_context.getBackgroundMovePool().removeTask(moving_task_handle);
+
+
+    try
+    {
+        /// We clear all old parts after stopping all background operations.
+        /// It's important, because background operations can produce temporary
+        /// parts which will remove themselves in their descrutors. If so, we
+        /// may have race condition between our remove call and background
+        /// process.
+        clearOldPartsFromFilesystem(true);
+    }
+    catch (...)
+    {
+        /// Example: the case of readonly filesystem, we have failure removing old parts.
+        /// Should not prevent table shutdown.
+        tryLogCurrentException(log);
+    }
 }
 
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index f2ac6678764..d109fa464b0 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -2997,10 +2997,10 @@ void StorageReplicatedMergeTree::shutdown()
     }
     data_parts_exchange_endpoint.reset();
 
-    /// We clear all parts after stopping all background operations. It's
+    /// We clear all old parts after stopping all background operations. It's
     /// important, because background operations can produce temporary parts
-    /// which will remove themselfs in their descrutors. If so, we may have race
-    /// condition between our remove call and background process.
+    /// which will remove themselves in their descrutors. If so, we may have
+    /// race condition between our remove call and background process.
     clearOldPartsFromFilesystem(true);
 }
 
diff --git a/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh b/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh
index effc9f540a1..05ef4a1a675 100755
--- a/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh
+++ b/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh
@@ -102,7 +102,7 @@ echo "Finishing alters"
 
 # This alter will finish all previous, but replica 1 maybe still not up-to-date.
 # If query will throw something, than we will sleep 1 and retry. If timeout
-# happened we will silentrly go out of loop and probably fail tests in the
+# happened we will silently go out of loop and probably fail tests in the
 # following for loop.
 #
 # 120 seconds is more than enough, but in rare cases for slow builds (debug,

From 09b9a308cbf5fb20870e94eb72918c4cd873ce8d Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Tue, 9 Jun 2020 02:55:53 +0300
Subject: [PATCH 32/33] Fix obvious race condition in test

---
 .../0_stateless/01268_procfs_metrics.sh | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git a/tests/queries/0_stateless/01268_procfs_metrics.sh b/tests/queries/0_stateless/01268_procfs_metrics.sh
index e258f7faafa..1367b68a61c 100755
--- a/tests/queries/0_stateless/01268_procfs_metrics.sh
+++ b/tests/queries/0_stateless/01268_procfs_metrics.sh
@@ -17,14 +17,16 @@ function read_numbers_func()
 
 function show_processes_func()
 {
-    sleep 0.1;
-
-    # These two system metrics for the generating query above are guaranteed to be nonzero when ProcFS is mounted at /proc
-    $CLICKHOUSE_CLIENT -q "
-        SELECT count() > 0 FROM system.processes\
-        WHERE has(ProfileEvents.Names, 'OSCPUVirtualTimeMicroseconds') AND has(ProfileEvents.Names, 'OSReadChars')\
-        SETTINGS max_threads = 1
-    ";
+    while true; do
+        sleep 0.1;
+
+        # These two system metrics for the generating query above are guaranteed to be nonzero when ProcFS is mounted at /proc
+        $CLICKHOUSE_CLIENT -q "
+            SELECT count() > 0 FROM system.processes\
+            WHERE has(ProfileEvents.Names, 'OSCPUVirtualTimeMicroseconds') AND has(ProfileEvents.Names, 'OSReadChars')\
+            SETTINGS max_threads = 1
+        " | grep '1' && break;
+    done
 }

From 44b20eee96a9c6adc17baa5894984c2701ec0800 Mon Sep 17 00:00:00 2001
From: alexey-milovidov
Date: Tue, 9 Jun 2020 10:23:35 +0300
Subject: [PATCH 33/33] Revert "S3 HTTP client - Avoid copying response stream
 into memory"

---
 contrib/aws                          |  2 +-
 src/IO/S3/PocoHTTPClient.cpp         |  5 ++---
 src/IO/S3/PocoHTTPResponseStream.cpp | 12 ------------
 src/IO/S3/PocoHTTPResponseStream.h   | 21 ---------------------
 4 files changed, 3 insertions(+), 37 deletions(-)
 delete mode 100644 src/IO/S3/PocoHTTPResponseStream.cpp
 delete mode 100644 src/IO/S3/PocoHTTPResponseStream.h

diff --git a/contrib/aws b/contrib/aws
index 17e10c0fc77..f7d9ce39f41 160000
--- a/contrib/aws
+++ b/contrib/aws
@@ -1 +1 @@
-Subproject commit 17e10c0fc77f22afe890fa6d1b283760e5edaa56
+Subproject commit f7d9ce39f41323300044567be007c233338bb94a
diff --git a/src/IO/S3/PocoHTTPClient.cpp b/src/IO/S3/PocoHTTPClient.cpp
index 0dfa80ca107..b8de483a5a8 100644
--- a/src/IO/S3/PocoHTTPClient.cpp
+++ b/src/IO/S3/PocoHTTPClient.cpp
@@ -2,8 +2,6 @@
 #include
 #include
-#include
-#include
 #include
 #include
 #include
@@ -151,7 +149,8 @@ void PocoHTTPClient::MakeRequestInternal(
             response->SetClientErrorMessage(error_message);
         }
         else
-            response->GetResponseStream().SetUnderlyingStream(std::make_shared<PocoHTTPResponseStream>(session, response_body_stream));
+            /// TODO: Do not copy whole stream.
+            Poco::StreamCopier::copyStream(response_body_stream, response->GetResponseBody());
 
         break;
     }
diff --git a/src/IO/S3/PocoHTTPResponseStream.cpp b/src/IO/S3/PocoHTTPResponseStream.cpp
deleted file mode 100644
index 0a198268f2e..00000000000
--- a/src/IO/S3/PocoHTTPResponseStream.cpp
+++ /dev/null
@@ -1,12 +0,0 @@
-#include "PocoHTTPResponseStream.h"
-
-#include
-
-namespace DB::S3
-{
-PocoHTTPResponseStream::PocoHTTPResponseStream(std::shared_ptr<Poco::Net::HTTPClientSession> session_, std::istream & response_stream_)
-    : Aws::IStream(response_stream_.rdbuf()), session(std::move(session_))
-{
-}
-
-}
diff --git a/src/IO/S3/PocoHTTPResponseStream.h b/src/IO/S3/PocoHTTPResponseStream.h
deleted file mode 100644
index 8167ddc4346..00000000000
--- a/src/IO/S3/PocoHTTPResponseStream.h
+++ /dev/null
@@ -1,21 +0,0 @@
-#pragma once
-
-#include
-#include
-
-namespace DB::S3
-{
-/**
- * Wrapper of IStream to store response stream and corresponding HTTP session.
- */
-class PocoHTTPResponseStream : public Aws::IStream
-{
-public:
-    PocoHTTPResponseStream(std::shared_ptr<Poco::Net::HTTPClientSession> session_, std::istream & response_stream_);
-
-private:
-    /// Poco HTTP session is holder of response stream.
-    std::shared_ptr<Poco::Net::HTTPClientSession> session;
-};
-
-}
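
A note on the shutdown ordering that patches 26 and 31 converge on:
clearOldPartsFromFilesystem(true) may run only after every background task
that can create or delete parts has been stopped, because temporary parts
produced by in-flight merges remove themselves from disk in their destructors,
so an earlier cleanup races with them. Cleanup failures such as a read-only
filesystem must be logged rather than allowed to abort shutdown. A minimal,
self-contained sketch of that ordering (hypothetical types and names, not the
actual ClickHouse classes):

    #include <iostream>

    // Toy stand-ins for the real blocker/pool machinery.
    struct ActionBlocker { void cancelForever() { /* forbid new merges/moves */ } };
    struct BackgroundPool { void removeTask(int) { /* waits out the in-flight task */ } };

    struct Storage
    {
        ActionBlocker merges_blocker;
        ActionBlocker moves_blocker;
        BackgroundPool pool;
        int merging_task = 0;
        int moving_task = 1;

        void clearOldPartsFromFilesystem(bool /*force*/) { /* may throw, e.g. EROFS */ }

        void shutdown()
        {
            merges_blocker.cancelForever();   // 1. no new background work starts
            moves_blocker.cancelForever();
            pool.removeTask(merging_task);    // 2. in-flight tasks are drained
            pool.removeTask(moving_task);
            try
            {
                clearOldPartsFromFilesystem(true);  // 3. no destructor can race with us now
            }
            catch (...)
            {
                std::cerr << "part cleanup failed; continuing shutdown\n";  // never blocks shutdown
            }
        }
    };

The design choice is the same in both storages: cancel first, drain second,
clean third, and swallow-and-log any cleanup error.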