From 1bf6e034e8157966ac039c7932f8d677f2f3779f Mon Sep 17 00:00:00 2001
From: Ivan <5627721+abyss7@users.noreply.github.com>
Date: Wed, 7 Aug 2019 19:10:14 +0300
Subject: [PATCH 1/3] Fix infinite loop when reading Kafka messages (#6354)

* Do not pause/resume consumer at all
* Fix kafka tests
* Try to ensure the subscription
* Set timeout for kafka tests and return 'while True'
* Update cluster.py
* When doing a raw select from kafka, ignore client errors. They may arise due to 'Local: Timed out' while subscribing.
---
 dbms/src/Storages/IStorage.cpp                     |  7 +-
 dbms/src/Storages/IStorage.h                       |  2 +-
 .../Kafka/ReadBufferFromKafkaConsumer.cpp          | 32 +++++---
 dbms/src/Storages/StorageValues.cpp                |  2 +-
 .../integration/test_storage_kafka/test.py         | 76 +++++++++++--------
 5 files changed, 73 insertions(+), 46 deletions(-)

diff --git a/dbms/src/Storages/IStorage.cpp b/dbms/src/Storages/IStorage.cpp
index 687ca970311..1504df4f68d 100644
--- a/dbms/src/Storages/IStorage.cpp
+++ b/dbms/src/Storages/IStorage.cpp
@@ -145,9 +145,12 @@ namespace
     }
 }
 
-void IStorage::check(const Names & column_names) const
+void IStorage::check(const Names & column_names, bool include_virtuals) const
 {
-    const NamesAndTypesList & available_columns = getColumns().getAllPhysical();
+    NamesAndTypesList available_columns = getColumns().getAllPhysical();
+    if (include_virtuals)
+        available_columns.splice(available_columns.end(), getColumns().getVirtuals());
+
     const String list_of_columns = listOfColumns(available_columns);
 
     if (column_names.empty())
diff --git a/dbms/src/Storages/IStorage.h b/dbms/src/Storages/IStorage.h
index 7d259f289ee..3f38dc08b83 100644
--- a/dbms/src/Storages/IStorage.h
+++ b/dbms/src/Storages/IStorage.h
@@ -116,7 +116,7 @@ public: /// thread-unsafe part. lockStructure must be acquired
 
     /// Verify that all the requested names are in the table and are set correctly:
     /// list of names is not empty and the names do not repeat.
-    void check(const Names & column_names) const;
+    void check(const Names & column_names, bool include_virtuals = false) const;
 
     /// Check that all the requested names are in the table and have the correct types.
     void check(const NamesAndTypesList & columns) const;
diff --git a/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp b/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
index db3de302dd8..01fd09db7e3 100644
--- a/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
+++ b/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
@@ -56,32 +56,42 @@ void ReadBufferFromKafkaConsumer::commit()
 void ReadBufferFromKafkaConsumer::subscribe(const Names & topics)
 {
     {
-        String message = "Subscribed to topics:";
+        String message = "Already subscribed to topics:";
         for (const auto & topic : consumer->get_subscription())
             message += " " + topic;
         LOG_TRACE(log, message);
     }
 
     {
-        String message = "Assigned to topics:";
+        String message = "Already assigned to topics:";
         for (const auto & toppar : consumer->get_assignment())
             message += " " + toppar.get_topic();
         LOG_TRACE(log, message);
     }
 
-    consumer->resume();
-
     // While we wait for an assignment after subscription, we'll poll zero messages anyway.
     // If we're doing a manual select then it's better to get something after a wait, than immediate nothing.
-    if (consumer->get_subscription().empty())
+    // But due to the nature of async pause/resume/subscribe we can't guarantee any persistent state:
+    // see https://github.com/edenhill/librdkafka/issues/2455
+    while (consumer->get_subscription().empty())
     {
-        consumer->pause(); // don't accidentally read any messages
-        consumer->subscribe(topics);
-        consumer->poll(5s);
-        consumer->resume();
+        stalled = false;
 
-        // FIXME: if we failed to receive "subscribe" response while polling and destroy consumer now, then we may hang up.
-        // see https://github.com/edenhill/librdkafka/issues/2077
+        try
+        {
+            consumer->subscribe(topics);
+            if (nextImpl())
+                break;
+
+            // FIXME: if we failed to receive "subscribe" response while polling and destroy consumer now, then we may hang up.
+            // see https://github.com/edenhill/librdkafka/issues/2077
+        }
+        catch (cppkafka::HandleException & e)
+        {
+            if (e.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT)
+                continue;
+            throw;
+        }
     }
 
     stalled = false;
diff --git a/dbms/src/Storages/StorageValues.cpp b/dbms/src/Storages/StorageValues.cpp
index d289a4d6579..79d1641f6c2 100644
--- a/dbms/src/Storages/StorageValues.cpp
+++ b/dbms/src/Storages/StorageValues.cpp
@@ -21,7 +21,7 @@ BlockInputStreams StorageValues::read(
     size_t /*max_block_size*/,
     unsigned /*num_streams*/)
 {
-    check(column_names);
+    check(column_names, true);
 
     return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(res_block));
 }
diff --git a/dbms/tests/integration/test_storage_kafka/test.py b/dbms/tests/integration/test_storage_kafka/test.py
index 9be725d33b7..f066dc34a7f 100644
--- a/dbms/tests/integration/test_storage_kafka/test.py
+++ b/dbms/tests/integration/test_storage_kafka/test.py
@@ -122,6 +122,7 @@ def kafka_setup_teardown():
 
 # Tests
 
+@pytest.mark.timeout(60)
 def test_kafka_settings_old_syntax(kafka_cluster):
     instance.query('''
         CREATE TABLE test.kafka (key UInt64, value UInt64)
@@ -136,14 +137,15 @@ def test_kafka_settings_old_syntax(kafka_cluster):
     kafka_produce('old', messages)
 
     result = ''
-    for i in range(50):
-        result += instance.query('SELECT * FROM test.kafka')
+    while True:
+        result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
         if kafka_check_result(result):
             break
-        time.sleep(0.5)
+
     kafka_check_result(result, True)
 
-@pytest.mark.skip(reason="fails for some reason")
+
+@pytest.mark.timeout(60)
 def test_kafka_settings_new_syntax(kafka_cluster):
     instance.query('''
         CREATE TABLE test.kafka (key UInt64, value UInt64)
@@ -171,14 +173,15 @@ def test_kafka_settings_new_syntax(kafka_cluster):
     kafka_produce('new', messages)
 
     result = ''
-    for i in range(50):
-        result += instance.query('SELECT * FROM test.kafka')
+    while True:
+        result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
         if kafka_check_result(result):
             break
-        time.sleep(0.5)
+
     kafka_check_result(result, True)
 
 
+@pytest.mark.timeout(60)
 def test_kafka_csv_with_delimiter(kafka_cluster):
     instance.query('''
         CREATE TABLE test.kafka (key UInt64, value UInt64)
@@ -196,14 +199,15 @@ def test_kafka_csv_with_delimiter(kafka_cluster):
     kafka_produce('csv', messages)
 
     result = ''
-    for i in range(50):
-        result += instance.query('SELECT * FROM test.kafka')
+    while True:
+        result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
         if kafka_check_result(result):
             break
-        time.sleep(0.5)
+
     kafka_check_result(result, True)
 
 
+@pytest.mark.timeout(60)
 def test_kafka_tsv_with_delimiter(kafka_cluster):
     instance.query('''
         CREATE TABLE test.kafka (key UInt64, value UInt64)
@@ -221,14 +225,15 @@ def test_kafka_tsv_with_delimiter(kafka_cluster):
     kafka_produce('tsv', messages)
 
     result = ''
-    for i in range(50):
-        result += instance.query('SELECT * FROM test.kafka')
+    while True:
+        result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
         if kafka_check_result(result):
             break
-        time.sleep(0.5)
+
     kafka_check_result(result, True)
 
 
+@pytest.mark.timeout(60)
 def test_kafka_json_without_delimiter(kafka_cluster):
     instance.query('''
         CREATE TABLE test.kafka (key UInt64, value UInt64)
@@ -250,14 +255,15 @@ def test_kafka_json_without_delimiter(kafka_cluster):
     kafka_produce('json', [messages])
 
     result = ''
-    for i in range(50):
-        result += instance.query('SELECT * FROM test.kafka')
+    while True:
+        result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
         if kafka_check_result(result):
             break
-        time.sleep(0.5)
+
     kafka_check_result(result, True)
 
 
+@pytest.mark.timeout(60)
 def test_kafka_protobuf(kafka_cluster):
     instance.query('''
         CREATE TABLE test.kafka (key UInt64, value String)
@@ -274,14 +280,15 @@ def test_kafka_protobuf(kafka_cluster):
     kafka_produce_protobuf_messages('pb', 21, 29)
 
     result = ''
-    for i in range(50):
-        result += instance.query('SELECT * FROM test.kafka')
+    while True:
+        result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
         if kafka_check_result(result):
             break
-        time.sleep(0.5)
+
     kafka_check_result(result, True)
 
 
+@pytest.mark.timeout(60)
 def test_kafka_materialized_view(kafka_cluster):
     instance.query('''
         DROP TABLE IF EXISTS test.view;
@@ -305,19 +312,20 @@ def test_kafka_materialized_view(kafka_cluster):
         messages.append(json.dumps({'key': i, 'value': i}))
     kafka_produce('mv', messages)
 
-    for i in range(50):
+    while True:
         result = instance.query('SELECT * FROM test.view')
         if kafka_check_result(result):
             break
-        time.sleep(0.5)
-    kafka_check_result(result, True)
 
     instance.query('''
         DROP TABLE test.consumer;
         DROP TABLE test.view;
     ''')
 
-@pytest.mark.skip(reason="Hungs")
+    kafka_check_result(result, True)
+
+
+@pytest.mark.timeout(300)
 def test_kafka_flush_on_big_message(kafka_cluster):
     # Create batches of messages of size ~100Kb
     kafka_messages = 1000
@@ -354,15 +362,20 @@ def test_kafka_flush_on_big_message(kafka_cluster):
         except kafka.errors.GroupCoordinatorNotAvailableError:
             continue
 
-    for i in range(50):
+    while True:
         result = instance.query('SELECT count() FROM test.view')
         if int(result) == kafka_messages*batch_messages:
             break
-        time.sleep(0.5)
+
+    instance.query('''
+        DROP TABLE test.consumer;
+        DROP TABLE test.view;
+    ''')
 
     assert int(result) == kafka_messages*batch_messages, 'ClickHouse lost some messages: {}'.format(result)
 
 
+@pytest.mark.timeout(60)
 def test_kafka_virtual_columns(kafka_cluster):
     instance.query('''
         CREATE TABLE test.kafka (key UInt64, value UInt64)
@@ -384,14 +397,15 @@ def test_kafka_virtual_columns(kafka_cluster):
     kafka_produce('virt1', [messages])
 
     result = ''
-    for i in range(50):
-        result += instance.query('SELECT _key, key, _topic, value, _offset FROM test.kafka')
+    while True:
+        result += instance.query('SELECT _key, key, _topic, value, _offset FROM test.kafka', ignore_error=True)
         if kafka_check_result(result, False, 'test_kafka_virtual1.reference'):
             break
-        time.sleep(0.5)
+
     kafka_check_result(result, True, 'test_kafka_virtual1.reference')
 
 
+@pytest.mark.timeout(60)
 def test_kafka_virtual_columns_with_materialized_view(kafka_cluster):
     instance.query('''
         DROP TABLE IF EXISTS test.view;
@@ -415,18 +429,18 @@ def test_kafka_virtual_columns_with_materialized_view(kafka_cluster):
         messages.append(json.dumps({'key': i, 'value': i}))
     kafka_produce('virt2', messages)
 
-    for i in range(50):
+    while True:
         result = instance.query('SELECT kafka_key, key, topic, value, offset FROM test.view')
         if kafka_check_result(result, False, 'test_kafka_virtual2.reference'):
             break
-        time.sleep(0.5)
-    kafka_check_result(result, True, 'test_kafka_virtual2.reference')
 
     instance.query('''
         DROP TABLE test.consumer;
         DROP TABLE test.view;
     ''')
 
+    kafka_check_result(result, True, 'test_kafka_virtual2.reference')
+
 
 if __name__ == '__main__':
     cluster.start()

From c5b25b23e3092b815c8eaa58cb832bfc1c7423db Mon Sep 17 00:00:00 2001
From: BayoNet
Date: Wed, 7 Aug 2019 20:06:56 +0300
Subject: [PATCH 2/3] DOCAPI-8016: Adam weights mention in docs

* DOCAPI-8016: Added a mention of the Adam method of updating weights in regressions.
* DOCAPI-8016: RU translation.
---
 docs/en/query_language/agg_functions/reference.md | 4 ++--
 docs/ru/query_language/agg_functions/reference.md | 6 +++---
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/docs/en/query_language/agg_functions/reference.md b/docs/en/query_language/agg_functions/reference.md
index f9cb88c0113..350803f5aef 100644
--- a/docs/en/query_language/agg_functions/reference.md
+++ b/docs/en/query_language/agg_functions/reference.md
@@ -1009,7 +1009,7 @@ SELECT arrayReduce('simpleLinearRegression', [0, 1, 2, 3], [3, 4, 5, 6])
 
 ## stochasticLinearRegression {#agg_functions-stochasticlinearregression}
 
-This function implements stochastic linear regression. It supports custom parameters for learning rate, L2 regularization coefficient, mini-batch size and has few methods for updating weights ([simple SGD](https://en.wikipedia.org/wiki/Stochastic_gradient_descent), [Momentum](https://en.wikipedia.org/wiki/Stochastic_gradient_descent#Momentum), [Nesterov](https://mipt.ru/upload/medialibrary/d7e/41-91.pdf)).
+This function implements stochastic linear regression. It supports custom parameters for learning rate, L2 regularization coefficient, mini-batch size, and has a few methods for updating weights ([Adam](https://en.wikipedia.org/wiki/Stochastic_gradient_descent#Adam) (used by default), [simple SGD](https://en.wikipedia.org/wiki/Stochastic_gradient_descent), [Momentum](https://en.wikipedia.org/wiki/Stochastic_gradient_descent#Momentum), [Nesterov](https://mipt.ru/upload/medialibrary/d7e/41-91.pdf)).
 
 ### Parameters {#agg_functions-stochasticlinearregression-parameters}
 
@@ -1022,7 +1022,7 @@ stochasticLinearRegression(1.0, 1.0, 10, 'SGD')
 
 1. `learning rate` is the coefficient on step length, when the gradient descent step is performed. A learning rate that is too big may cause infinite weights of the model. Default is `0.00001`.
 2. `l2 regularization coefficient`, which may help to prevent overfitting. Default is `0.1`.
 3. `mini-batch size` sets the number of elements whose gradients will be computed and summed to perform one step of gradient descent. Pure stochastic descent uses one element; however, having small batches (about 10 elements) makes gradient steps more stable. Default is `15`.
-4. `method for updating weights`, there are 3 of them: `SGD`, `Momentum`, `Nesterov`. `Momentum` and `Nesterov` require little bit more computations and memory, however they happen to be useful in terms of speed of convergance and stability of stochastic gradient methods. Default is `'SGD'`.
+4. `method for updating weights`: `Adam` (by default), `SGD`, `Momentum`, `Nesterov`. `Momentum` and `Nesterov` require a little bit more computation and memory; however, they happen to be useful in terms of convergence speed and stability of stochastic gradient methods.
 
 ### Usage {#agg_functions-stochasticlinearregression-usage}
 
diff --git a/docs/ru/query_language/agg_functions/reference.md b/docs/ru/query_language/agg_functions/reference.md
index fca564b7a14..12308169f9a 100644
--- a/docs/ru/query_language/agg_functions/reference.md
+++ b/docs/ru/query_language/agg_functions/reference.md
@@ -878,7 +878,7 @@ SELECT arrayReduce('simpleLinearRegression', [0, 1, 2, 3], [3, 4, 5, 6])
 
 ## stochasticLinearRegression {#agg_functions-stochasticlinearregression}
 
-The function implements stochastic linear regression. It supports custom parameters for the learning rate, the L2 regularization coefficient, and the mini-batch size, and has several methods for updating the weights ([simple SGD](https://en.wikipedia.org/wiki/Stochastic_gradient_descent), [Momentum](https://en.wikipedia.org/wiki/Stochastic_gradient_descent#Momentum), [Nesterov](https://mipt.ru/upload/medialibrary/d7e/41-91.pdf)).
+The function implements stochastic linear regression. It supports custom parameters for the learning rate, the L2 regularization coefficient, and the mini-batch size, and has several methods for updating the weights ([Adam](https://en.wikipedia.org/wiki/Stochastic_gradient_descent#Adam) (by default), [simple SGD](https://en.wikipedia.org/wiki/Stochastic_gradient_descent), [Momentum](https://en.wikipedia.org/wiki/Stochastic_gradient_descent#Momentum), [Nesterov](https://mipt.ru/upload/medialibrary/d7e/41-91.pdf)).
 
 ### Parameters {#agg_functions-stochasticlinearregression-parameters}
 
@@ -891,7 +891,8 @@ stochasticLinearRegression(1.0, 1.0, 10, 'SGD')
 
 1. The learning rate is the coefficient on step length when a gradient descent step is performed. A learning rate that is too big may lead to infinite model weights. Default is `0.00001`.
 2. The L2 regularization coefficient. Helps prevent overfitting. Default is `0.1`.
 3. The mini-batch size sets the number of elements whose gradients are computed and summed to perform one step of gradient descent. Pure stochastic descent uses one element; however, using mini-batches (about 10 elements) makes gradient steps more stable. Default is `15`.
-4. The method for updating weights, one of: `SGD`, `Momentum`, `Nesterov`. `Momentum` and `Nesterov` are more demanding of computational resources and memory; however, they offer better convergence speed and stability than the other stochastic gradient methods. Default is `SGD`.
+4. The method for updating weights, one of: `Adam` (by default), `SGD`, `Momentum`, `Nesterov`. `Momentum` and `Nesterov` are more demanding of computational resources and memory; however, they offer better convergence speed and stability among stochastic gradient methods.
+
 
 ### Usage {#agg_functions-stochasticlinearregression-usage}
 
@@ -1005,4 +1006,3 @@ stochasticLogisticRegression(1.0, 1.0, 10, 'SGD')
 - [Difference between linear and logistic regression](https://moredez.ru/q/51225972/)
 
 [Original article](https://clickhouse.yandex/docs/ru/query_language/agg_functions/reference/)
-

From 7a0baefac0f63d987c08fa31223e770759af4e78 Mon Sep 17 00:00:00 2001
From: BayoNet
Date: Wed, 7 Aug 2019 21:12:36 +0300
Subject: [PATCH 3/3] DOCAPI-7783: Update of the SET query documentation (#6165)

* DOCAPI-7783: Update of the SET query documentation.
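For illustration only, a minimal sketch of the behavior the updated docs describe — this is not part of the patch itself; `max_threads` is a standard ClickHouse setting used as an example here, and the profile name is a placeholder:

```sql
-- Session-scoped: affects only the current session.
SET max_threads = 8;

-- Apply every setting from a profile defined in the server's settings file.
SET profile = 'web';
```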
---
 docs/en/query_language/misc.md | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/docs/en/query_language/misc.md b/docs/en/query_language/misc.md
index 31bfea5dc4d..514f5d9f823 100644
--- a/docs/en/query_language/misc.md
+++ b/docs/en/query_language/misc.md
@@ -195,18 +195,21 @@ RENAME TABLE [db11.]name11 TO [db12.]name12, [db21.]name21 TO [db22.]name22, ...
 
 All tables are renamed under global locking. Renaming tables is a light operation. If you indicate another database after TO, the table will be moved to this database. However, the directories with databases must reside in the same file system (otherwise, an error is returned).
 
-## SET
+## SET {#query-set}
 
-``` sql
+```sql
 SET param = value
 ```
 
-Allows you to set `param` to `value`. You can also make all the settings from the specified settings profile in a single query. To do this, specify 'profile' as the setting name. For more information, see the section "Settings".
-The setting is made for the session, or for the server (globally) if `GLOBAL` is specified.
-When making a global setting, the setting is not applied to sessions already running, including the current session. It will only be used for new sessions.
+Assigns `value` to the `param` setting for the current session. You cannot change [server settings](../operations/server_settings/index.md) this way.
 
-When the server is restarted, global settings made using `SET` are lost.
-To make settings that persist after a server restart, you can only use the server's config file.
+You can also set all the values from the specified settings profile in a single query.
+
+```sql
+SET profile = 'profile-name-from-the-settings-file'
+```
+
+For more information, see [Settings](../operations/settings/settings.md).
 
 ## SHOW CREATE TABLE
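As a quick way to verify the session-scoped behavior documented in the SET section above, the current value of a setting can be inspected through the standard `system.settings` table — a sketch, with `max_threads` again used only as an example setting:

```sql
SET max_threads = 8;

-- `changed` flags settings whose value differs from the server default.
SELECT name, value, changed
FROM system.settings
WHERE name = 'max_threads';
```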