mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-02 04:22:03 +00:00
Merge branch 'master' of github.com:yandex/ClickHouse
This commit is contained in:
commit
200765acd4
@ -145,9 +145,12 @@ namespace
|
||||
}
|
||||
}
|
||||
|
||||
void IStorage::check(const Names & column_names) const
|
||||
void IStorage::check(const Names & column_names, bool include_virtuals) const
|
||||
{
|
||||
const NamesAndTypesList & available_columns = getColumns().getAllPhysical();
|
||||
NamesAndTypesList available_columns = getColumns().getAllPhysical();
|
||||
if (include_virtuals)
|
||||
available_columns.splice(available_columns.end(), getColumns().getVirtuals());
|
||||
|
||||
const String list_of_columns = listOfColumns(available_columns);
|
||||
|
||||
if (column_names.empty())
|
||||
|
@ -116,7 +116,7 @@ public: /// thread-unsafe part. lockStructure must be acquired
|
||||
|
||||
/// Verify that all the requested names are in the table and are set correctly:
|
||||
/// list of names is not empty and the names do not repeat.
|
||||
void check(const Names & column_names) const;
|
||||
void check(const Names & column_names, bool include_virtuals = false) const;
|
||||
|
||||
/// Check that all the requested names are in the table and have the correct types.
|
||||
void check(const NamesAndTypesList & columns) const;
|
||||
|
@ -56,32 +56,42 @@ void ReadBufferFromKafkaConsumer::commit()
|
||||
void ReadBufferFromKafkaConsumer::subscribe(const Names & topics)
|
||||
{
|
||||
{
|
||||
String message = "Subscribed to topics:";
|
||||
String message = "Already subscribed to topics:";
|
||||
for (const auto & topic : consumer->get_subscription())
|
||||
message += " " + topic;
|
||||
LOG_TRACE(log, message);
|
||||
}
|
||||
|
||||
{
|
||||
String message = "Assigned to topics:";
|
||||
String message = "Already assigned to topics:";
|
||||
for (const auto & toppar : consumer->get_assignment())
|
||||
message += " " + toppar.get_topic();
|
||||
LOG_TRACE(log, message);
|
||||
}
|
||||
|
||||
consumer->resume();
|
||||
|
||||
// While we wait for an assignment after subscribtion, we'll poll zero messages anyway.
|
||||
// If we're doing a manual select then it's better to get something after a wait, then immediate nothing.
|
||||
if (consumer->get_subscription().empty())
|
||||
// But due to the nature of async pause/resume/subscribe we can't guarantee any persistent state:
|
||||
// see https://github.com/edenhill/librdkafka/issues/2455
|
||||
while (consumer->get_subscription().empty())
|
||||
{
|
||||
consumer->pause(); // don't accidentally read any messages
|
||||
consumer->subscribe(topics);
|
||||
consumer->poll(5s);
|
||||
consumer->resume();
|
||||
stalled = false;
|
||||
|
||||
// FIXME: if we failed to receive "subscribe" response while polling and destroy consumer now, then we may hang up.
|
||||
// see https://github.com/edenhill/librdkafka/issues/2077
|
||||
try
|
||||
{
|
||||
consumer->subscribe(topics);
|
||||
if (nextImpl())
|
||||
break;
|
||||
|
||||
// FIXME: if we failed to receive "subscribe" response while polling and destroy consumer now, then we may hang up.
|
||||
// see https://github.com/edenhill/librdkafka/issues/2077
|
||||
}
|
||||
catch (cppkafka::HandleException & e)
|
||||
{
|
||||
if (e.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT)
|
||||
continue;
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
stalled = false;
|
||||
|
@ -21,7 +21,7 @@ BlockInputStreams StorageValues::read(
|
||||
size_t /*max_block_size*/,
|
||||
unsigned /*num_streams*/)
|
||||
{
|
||||
check(column_names);
|
||||
check(column_names, true);
|
||||
|
||||
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(res_block));
|
||||
}
|
||||
|
@ -122,6 +122,7 @@ def kafka_setup_teardown():
|
||||
|
||||
# Tests
|
||||
|
||||
@pytest.mark.timeout(60)
|
||||
def test_kafka_settings_old_syntax(kafka_cluster):
|
||||
instance.query('''
|
||||
CREATE TABLE test.kafka (key UInt64, value UInt64)
|
||||
@ -136,14 +137,15 @@ def test_kafka_settings_old_syntax(kafka_cluster):
|
||||
kafka_produce('old', messages)
|
||||
|
||||
result = ''
|
||||
for i in range(50):
|
||||
result += instance.query('SELECT * FROM test.kafka')
|
||||
while True:
|
||||
result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
|
||||
if kafka_check_result(result):
|
||||
break
|
||||
time.sleep(0.5)
|
||||
|
||||
kafka_check_result(result, True)
|
||||
|
||||
@pytest.mark.skip(reason="fails for some reason")
|
||||
|
||||
@pytest.mark.timeout(60)
|
||||
def test_kafka_settings_new_syntax(kafka_cluster):
|
||||
instance.query('''
|
||||
CREATE TABLE test.kafka (key UInt64, value UInt64)
|
||||
@ -171,14 +173,15 @@ def test_kafka_settings_new_syntax(kafka_cluster):
|
||||
kafka_produce('new', messages)
|
||||
|
||||
result = ''
|
||||
for i in range(50):
|
||||
result += instance.query('SELECT * FROM test.kafka')
|
||||
while True:
|
||||
result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
|
||||
if kafka_check_result(result):
|
||||
break
|
||||
time.sleep(0.5)
|
||||
|
||||
kafka_check_result(result, True)
|
||||
|
||||
|
||||
@pytest.mark.timeout(60)
|
||||
def test_kafka_csv_with_delimiter(kafka_cluster):
|
||||
instance.query('''
|
||||
CREATE TABLE test.kafka (key UInt64, value UInt64)
|
||||
@ -196,14 +199,15 @@ def test_kafka_csv_with_delimiter(kafka_cluster):
|
||||
kafka_produce('csv', messages)
|
||||
|
||||
result = ''
|
||||
for i in range(50):
|
||||
result += instance.query('SELECT * FROM test.kafka')
|
||||
while True:
|
||||
result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
|
||||
if kafka_check_result(result):
|
||||
break
|
||||
time.sleep(0.5)
|
||||
|
||||
kafka_check_result(result, True)
|
||||
|
||||
|
||||
@pytest.mark.timeout(60)
|
||||
def test_kafka_tsv_with_delimiter(kafka_cluster):
|
||||
instance.query('''
|
||||
CREATE TABLE test.kafka (key UInt64, value UInt64)
|
||||
@ -221,14 +225,15 @@ def test_kafka_tsv_with_delimiter(kafka_cluster):
|
||||
kafka_produce('tsv', messages)
|
||||
|
||||
result = ''
|
||||
for i in range(50):
|
||||
result += instance.query('SELECT * FROM test.kafka')
|
||||
while True:
|
||||
result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
|
||||
if kafka_check_result(result):
|
||||
break
|
||||
time.sleep(0.5)
|
||||
|
||||
kafka_check_result(result, True)
|
||||
|
||||
|
||||
@pytest.mark.timeout(60)
|
||||
def test_kafka_json_without_delimiter(kafka_cluster):
|
||||
instance.query('''
|
||||
CREATE TABLE test.kafka (key UInt64, value UInt64)
|
||||
@ -250,14 +255,15 @@ def test_kafka_json_without_delimiter(kafka_cluster):
|
||||
kafka_produce('json', [messages])
|
||||
|
||||
result = ''
|
||||
for i in range(50):
|
||||
result += instance.query('SELECT * FROM test.kafka')
|
||||
while True:
|
||||
result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
|
||||
if kafka_check_result(result):
|
||||
break
|
||||
time.sleep(0.5)
|
||||
|
||||
kafka_check_result(result, True)
|
||||
|
||||
|
||||
@pytest.mark.timeout(60)
|
||||
def test_kafka_protobuf(kafka_cluster):
|
||||
instance.query('''
|
||||
CREATE TABLE test.kafka (key UInt64, value String)
|
||||
@ -274,14 +280,15 @@ def test_kafka_protobuf(kafka_cluster):
|
||||
kafka_produce_protobuf_messages('pb', 21, 29)
|
||||
|
||||
result = ''
|
||||
for i in range(50):
|
||||
result += instance.query('SELECT * FROM test.kafka')
|
||||
while True:
|
||||
result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
|
||||
if kafka_check_result(result):
|
||||
break
|
||||
time.sleep(0.5)
|
||||
|
||||
kafka_check_result(result, True)
|
||||
|
||||
|
||||
@pytest.mark.timeout(60)
|
||||
def test_kafka_materialized_view(kafka_cluster):
|
||||
instance.query('''
|
||||
DROP TABLE IF EXISTS test.view;
|
||||
@ -305,19 +312,20 @@ def test_kafka_materialized_view(kafka_cluster):
|
||||
messages.append(json.dumps({'key': i, 'value': i}))
|
||||
kafka_produce('mv', messages)
|
||||
|
||||
for i in range(50):
|
||||
while True:
|
||||
result = instance.query('SELECT * FROM test.view')
|
||||
if kafka_check_result(result):
|
||||
break
|
||||
time.sleep(0.5)
|
||||
kafka_check_result(result, True)
|
||||
|
||||
instance.query('''
|
||||
DROP TABLE test.consumer;
|
||||
DROP TABLE test.view;
|
||||
''')
|
||||
|
||||
@pytest.mark.skip(reason="Hungs")
|
||||
kafka_check_result(result, True)
|
||||
|
||||
|
||||
@pytest.mark.timeout(300)
|
||||
def test_kafka_flush_on_big_message(kafka_cluster):
|
||||
# Create batchs of messages of size ~100Kb
|
||||
kafka_messages = 1000
|
||||
@ -354,15 +362,20 @@ def test_kafka_flush_on_big_message(kafka_cluster):
|
||||
except kafka.errors.GroupCoordinatorNotAvailableError:
|
||||
continue
|
||||
|
||||
for i in range(50):
|
||||
while True:
|
||||
result = instance.query('SELECT count() FROM test.view')
|
||||
if int(result) == kafka_messages*batch_messages:
|
||||
break
|
||||
time.sleep(0.5)
|
||||
|
||||
instance.query('''
|
||||
DROP TABLE test.consumer;
|
||||
DROP TABLE test.view;
|
||||
''')
|
||||
|
||||
assert int(result) == kafka_messages*batch_messages, 'ClickHouse lost some messages: {}'.format(result)
|
||||
|
||||
|
||||
@pytest.mark.timeout(60)
|
||||
def test_kafka_virtual_columns(kafka_cluster):
|
||||
instance.query('''
|
||||
CREATE TABLE test.kafka (key UInt64, value UInt64)
|
||||
@ -384,14 +397,15 @@ def test_kafka_virtual_columns(kafka_cluster):
|
||||
kafka_produce('virt1', [messages])
|
||||
|
||||
result = ''
|
||||
for i in range(50):
|
||||
result += instance.query('SELECT _key, key, _topic, value, _offset FROM test.kafka')
|
||||
while True:
|
||||
result += instance.query('SELECT _key, key, _topic, value, _offset FROM test.kafka', ignore_error=True)
|
||||
if kafka_check_result(result, False, 'test_kafka_virtual1.reference'):
|
||||
break
|
||||
time.sleep(0.5)
|
||||
|
||||
kafka_check_result(result, True, 'test_kafka_virtual1.reference')
|
||||
|
||||
|
||||
@pytest.mark.timeout(60)
|
||||
def test_kafka_virtual_columns_with_materialized_view(kafka_cluster):
|
||||
instance.query('''
|
||||
DROP TABLE IF EXISTS test.view;
|
||||
@ -415,18 +429,18 @@ def test_kafka_virtual_columns_with_materialized_view(kafka_cluster):
|
||||
messages.append(json.dumps({'key': i, 'value': i}))
|
||||
kafka_produce('virt2', messages)
|
||||
|
||||
for i in range(50):
|
||||
while True:
|
||||
result = instance.query('SELECT kafka_key, key, topic, value, offset FROM test.view')
|
||||
if kafka_check_result(result, False, 'test_kafka_virtual2.reference'):
|
||||
break
|
||||
time.sleep(0.5)
|
||||
kafka_check_result(result, True, 'test_kafka_virtual2.reference')
|
||||
|
||||
instance.query('''
|
||||
DROP TABLE test.consumer;
|
||||
DROP TABLE test.view;
|
||||
''')
|
||||
|
||||
kafka_check_result(result, True, 'test_kafka_virtual2.reference')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
cluster.start()
|
||||
|
@ -1009,7 +1009,7 @@ SELECT arrayReduce('simpleLinearRegression', [0, 1, 2, 3], [3, 4, 5, 6])
|
||||
## stochasticLinearRegression {#agg_functions-stochasticlinearregression}
|
||||
|
||||
|
||||
This function implements stochastic linear regression. It supports custom parameters for learning rate, L2 regularization coefficient, mini-batch size and has few methods for updating weights ([simple SGD](https://en.wikipedia.org/wiki/Stochastic_gradient_descent), [Momentum](https://en.wikipedia.org/wiki/Stochastic_gradient_descent#Momentum), [Nesterov](https://mipt.ru/upload/medialibrary/d7e/41-91.pdf)).
|
||||
This function implements stochastic linear regression. It supports custom parameters for learning rate, L2 regularization coefficient, mini-batch size and has few methods for updating weights ([Adam](https://en.wikipedia.org/wiki/Stochastic_gradient_descent#Adam) (used by default), [simple SGD](https://en.wikipedia.org/wiki/Stochastic_gradient_descent), [Momentum](https://en.wikipedia.org/wiki/Stochastic_gradient_descent#Momentum), [Nesterov](https://mipt.ru/upload/medialibrary/d7e/41-91.pdf)).
|
||||
|
||||
### Parameters {#agg_functions-stochasticlinearregression-parameters}
|
||||
|
||||
@ -1022,7 +1022,7 @@ stochasticLinearRegression(1.0, 1.0, 10, 'SGD')
|
||||
1. `learning rate` is the coefficient on step length, when gradient descent step is performed. Too big learning rate may cause infinite weights of the model. Default is `0.00001`.
|
||||
2. `l2 regularization coefficient` which may help to prevent overfitting. Default is `0.1`.
|
||||
3. `mini-batch size` sets the number of elements, which gradients will be computed and summed to perform one step of gradient descent. Pure stochastic descent uses one element, however having small batches(about 10 elements) make gradient steps more stable. Default is `15`.
|
||||
4. `method for updating weights`, there are 3 of them: `SGD`, `Momentum`, `Nesterov`. `Momentum` and `Nesterov` require little bit more computations and memory, however they happen to be useful in terms of speed of convergance and stability of stochastic gradient methods. Default is `'SGD'`.
|
||||
4. `method for updating weights`, they are: `Adam` (by default), `SGD`, `Momentum`, `Nesterov`. `Momentum` and `Nesterov` require little bit more computations and memory, however they happen to be useful in terms of speed of convergance and stability of stochastic gradient methods.
|
||||
|
||||
|
||||
### Usage {#agg_functions-stochasticlinearregression-usage}
|
||||
|
@ -195,18 +195,21 @@ RENAME TABLE [db11.]name11 TO [db12.]name12, [db21.]name21 TO [db22.]name22, ...
|
||||
|
||||
All tables are renamed under global locking. Renaming tables is a light operation. If you indicated another database after TO, the table will be moved to this database. However, the directories with databases must reside in the same file system (otherwise, an error is returned).
|
||||
|
||||
## SET
|
||||
## SET {#query-set}
|
||||
|
||||
``` sql
|
||||
```sql
|
||||
SET param = value
|
||||
```
|
||||
|
||||
Allows you to set `param` to `value`. You can also make all the settings from the specified settings profile in a single query. To do this, specify 'profile' as the setting name. For more information, see the section "Settings".
|
||||
The setting is made for the session, or for the server (globally) if `GLOBAL` is specified.
|
||||
When making a global setting, the setting is not applied to sessions already running, including the current session. It will only be used for new sessions.
|
||||
Assigns `value` to the `param` configurations settings for the current session. You cannot change [server settings](../operations/server_settings/index.md) this way.
|
||||
|
||||
When the server is restarted, global settings made using `SET` are lost.
|
||||
To make settings that persist after a server restart, you can only use the server's config file.
|
||||
You can also set all the values from the specified settings profile in a single query.
|
||||
|
||||
```sql
|
||||
SET profile = 'profile-name-from-the-settings-file'
|
||||
```
|
||||
|
||||
For more information, see [Settings](../operations/settings/settings.md).
|
||||
|
||||
## SHOW CREATE TABLE
|
||||
|
||||
|
@ -878,7 +878,7 @@ SELECT arrayReduce('simpleLinearRegression', [0, 1, 2, 3], [3, 4, 5, 6])
|
||||
|
||||
## stochasticLinearRegression {#agg_functions-stochasticlinearregression}
|
||||
|
||||
Функция реализует стохастическую линейную регрессию. Поддерживает пользовательские параметры для скорости обучения, коэффициента регуляризации L2, размера mini-batch и имеет несколько методов обновления весов ([simple SGD](https://en.wikipedia.org/wiki/Stochastic_gradient_descent), [Momentum](https://en.wikipedia.org/wiki/Stochastic_gradient_descent#Momentum), [Nesterov](https://mipt.ru/upload/medialibrary/d7e/41-91.pdf)).
|
||||
Функция реализует стохастическую линейную регрессию. Поддерживает пользовательские параметры для скорости обучения, коэффициента регуляризации L2, размера mini-batch и имеет несколько методов обновления весов ([Adam](https://en.wikipedia.org/wiki/Stochastic_gradient_descent#Adam) (по умолчанию), [simple SGD](https://en.wikipedia.org/wiki/Stochastic_gradient_descent), [Momentum](https://en.wikipedia.org/wiki/Stochastic_gradient_descent#Momentum), [Nesterov](https://mipt.ru/upload/medialibrary/d7e/41-91.pdf)).
|
||||
|
||||
### Параметры {#agg_functions-stochasticlinearregression-parameters}
|
||||
|
||||
@ -891,7 +891,8 @@ stochasticLinearRegression(1.0, 1.0, 10, 'SGD')
|
||||
1. Скорость обучения — коэффициент длины шага, при выполнении градиентного спуска. Слишком большая скорость обучения может привести к бесконечным весам модели. По умолчанию `0.00001`.
|
||||
2. Коэффициент регуляризации l2. Помогает предотвратить подгонку. По умолчанию `0.1`.
|
||||
3. Размер mini-batch задаёт количество элементов, чьи градиенты будут вычислены и просуммированы при выполнении одного шага градиентного спуска. Чистый стохастический спуск использует один элемент, однако использование mini-batch (около 10 элементов) делает градиентные шаги более стабильными. По умолчанию `15`.
|
||||
4. Метод обновления весов, можно выбрать один из следующих: `SGD`, `Momentum`, `Nesterov`. `Momentum` и `Nesterov` более требовательные к вычислительным ресурсам и памяти, однако они имеют высокую скорость схождения и остальные методы стохастического градиента. По умолчанию `SGD`.
|
||||
4. Метод обновления весов, можно выбрать один из следующих: `Adam` (по умолчанию), `SGD`, `Momentum`, `Nesterov`. `Momentum` и `Nesterov` более требовательные к вычислительным ресурсам и памяти, однако они имеют высокую скорость схождения и устойчивости методов стохастического градиента.
|
||||
|
||||
|
||||
### Использование {#agg_functions-stochasticlinearregression-usage}
|
||||
|
||||
@ -1005,4 +1006,3 @@ stochasticLogisticRegression(1.0, 1.0, 10, 'SGD')
|
||||
- [Отличие линейной от логистической регрессии](https://moredez.ru/q/51225972/)
|
||||
|
||||
[Оригинальная статья](https://clickhouse.yandex/docs/ru/query_language/agg_functions/reference/) <!--hide-->
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user