Remove experimental_use_processors setting, part 1. (#10924)

Remove experimental_use_processors setting, part 1.
This commit is contained in:
Nikolai Kochetov 2020-05-15 22:28:10 +03:00 committed by GitHub
parent 3105be4fea
commit fb38c2a30b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 25 additions and 44 deletions

View File

@ -385,8 +385,6 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, cancel_http_readonly_queries_on_client_close, false, "Cancel HTTP readonly queries when a client closes the connection without waiting for response.", 0) \
M(SettingBool, external_table_functions_use_nulls, true, "If it is set to true, external table functions will implicitly use Nullable type if needed. Otherwise NULLs will be substituted with default values. Currently supported only by 'mysql' and 'odbc' table functions.", 0) \
\
M(SettingBool, experimental_use_processors, true, "Use processors pipeline.", 0) \
\
M(SettingBool, allow_hyperscan, true, "Allow functions that use Hyperscan library. Disable to avoid potentially long compilation times and excessive resource usage.", 0) \
M(SettingBool, allow_simdjson, true, "Allow using simdjson library in 'JSON*' functions if AVX2 instructions are available. If disabled rapidjson will be used.", 0) \
M(SettingBool, allow_introspection_functions, false, "Allow functions for introspection of ELF and DWARF for query profiling. These functions are slow and may impose security considerations.", 0) \

View File

@ -77,9 +77,6 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(
context.setUser(user, password, Poco::Net::SocketAddress("127.0.0.1", 0), {});
context = copyContextAndApplySettings(path_to_settings, context, config);
/// Processors are not supported here yet.
context.setSetting("experimental_use_processors", false);
/// Query context is needed because some code in executeQuery function may assume it exists.
/// Current example is Context::getSampleBlockCache from InterpreterSelectWithUnionQuery::getSampleBlock.
context.makeQueryContext();
@ -134,7 +131,7 @@ BlockInputStreamPtr ClickHouseDictionarySource::loadAll()
*/
if (is_local)
{
BlockIO res = executeQuery(load_all_query, context, true);
BlockIO res = executeQuery(load_all_query, context, true, QueryProcessingStage::Complete, false, false);
/// FIXME res.in may implicitly use some objects owned be res, but them will be destructed after return
res.in = std::make_shared<ConvertingBlockInputStream>(res.in, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position);
return res.in;
@ -147,7 +144,7 @@ BlockInputStreamPtr ClickHouseDictionarySource::loadUpdatedAll()
std::string load_update_query = getUpdateFieldAndDate();
if (is_local)
{
auto res = executeQuery(load_update_query, context, true);
auto res = executeQuery(load_update_query, context, true, QueryProcessingStage::Complete, false, false);
res.in = std::make_shared<ConvertingBlockInputStream>(res.in, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position);
return res.in;
}
@ -194,7 +191,7 @@ BlockInputStreamPtr ClickHouseDictionarySource::createStreamForSelectiveLoad(con
{
if (is_local)
{
auto res = executeQuery(query, context, true);
auto res = executeQuery(query, context, true, QueryProcessingStage::Complete, false, false);
res.in = std::make_shared<ConvertingBlockInputStream>(
res.in, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position);
return res.in;
@ -209,7 +206,8 @@ std::string ClickHouseDictionarySource::doInvalidateQuery(const std::string & re
if (is_local)
{
Context query_context = context;
auto input_block = executeQuery(request, query_context, true).in;
auto input_block = executeQuery(request, query_context, true,
QueryProcessingStage::Complete, false, false).in;
return readInvalidateQuery(*input_block);
}
else

View File

@ -317,7 +317,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
context.resetInputCallbacks();
auto interpreter = InterpreterFactory::get(ast, context, stage);
bool use_processors = settings.experimental_use_processors && allow_processors && interpreter->canExecuteWithProcessors();
bool use_processors = allow_processors && interpreter->canExecuteWithProcessors();
std::shared_ptr<const EnabledQuota> quota;
if (!interpreter->ignoreQuota())

View File

@ -29,21 +29,18 @@ def start_cluster():
def test_remote(start_cluster):
for flag in (0, 1):
node1.query("set experimental_use_processors = {}".format(flag))
node1.query("set distributed_aggregation_memory_efficient = 1, group_by_two_level_threshold = 1, group_by_two_level_threshold_bytes=1")
res = node1.query("select sum(a) from (SELECT B, uniqExact(A) a FROM remote('node{1,2}', default.da_memory_efficient_shard) GROUP BY B)")
assert res == '200000\n'
node1.query("set distributed_aggregation_memory_efficient = 1, group_by_two_level_threshold = 1, group_by_two_level_threshold_bytes=1")
res = node1.query("select sum(a) from (SELECT B, uniqExact(A) a FROM remote('node{1,2}', default.da_memory_efficient_shard) GROUP BY B)")
assert res == '200000\n'
node1.query("set distributed_aggregation_memory_efficient = 0")
res = node1.query("select sum(a) from (SELECT B, uniqExact(A) a FROM remote('node{1,2}', default.da_memory_efficient_shard) GROUP BY B)")
assert res == '200000\n'
node1.query("set distributed_aggregation_memory_efficient = 0")
res = node1.query("select sum(a) from (SELECT B, uniqExact(A) a FROM remote('node{1,2}', default.da_memory_efficient_shard) GROUP BY B)")
assert res == '200000\n'
node1.query("set distributed_aggregation_memory_efficient = 1, group_by_two_level_threshold = 1, group_by_two_level_threshold_bytes=1")
res = node1.query("SELECT fullHostName() AS h, uniqExact(A) AS a FROM remote('node{1,2}', default.da_memory_efficient_shard) GROUP BY h ORDER BY h;")
assert res == 'node1\t100000\nnode2\t100000\n'
node1.query("set distributed_aggregation_memory_efficient = 1, group_by_two_level_threshold = 1, group_by_two_level_threshold_bytes=1")
res = node1.query("SELECT fullHostName() AS h, uniqExact(A) AS a FROM remote('node{1,2}', default.da_memory_efficient_shard) GROUP BY h ORDER BY h;")
assert res == 'node1\t100000\nnode2\t100000\n'
node1.query("set distributed_aggregation_memory_efficient = 0")
res = node1.query("SELECT fullHostName() AS h, uniqExact(A) AS a FROM remote('node{1,2}', default.da_memory_efficient_shard) GROUP BY h ORDER BY h;")
assert res == 'node1\t100000\nnode2\t100000\n'
node1.query("set distributed_aggregation_memory_efficient = 0")
res = node1.query("SELECT fullHostName() AS h, uniqExact(A) AS a FROM remote('node{1,2}', default.da_memory_efficient_shard) GROUP BY h ORDER BY h;")
assert res == 'node1\t100000\nnode2\t100000\n'

View File

@ -13,7 +13,7 @@ server_logs_file=${CLICKHOUSE_TMP}/$cur_name"_server.logs"
server_logs="--server_logs_file=$server_logs_file"
rm -f "$server_logs_file"
settings="$server_logs --log_queries=1 --log_query_threads=1 --log_profile_events=1 --log_query_settings=1 --experimental_use_processors=0"
settings="$server_logs --log_queries=1 --log_query_threads=1 --log_profile_events=1 --log_query_settings=1"
# Test insert logging on each block and checkPacket() method

View File

@ -1,8 +1,6 @@
DROP TABLE IF EXISTS cube;
CREATE TABLE cube(a String, b Int32, s Int32) ENGINE = Memory;
-- SET experimental_use_processors=1;
INSERT INTO cube VALUES ('a', 1, 10), ('a', 1, 15), ('a', 2, 20);
INSERT INTO cube VALUES ('a', 2, 25), ('b', 1, 10), ('b', 1, 5);
INSERT INTO cube VALUES ('b', 2, 20), ('b', 2, 15);

View File

@ -20,7 +20,7 @@
{
"total": 1,
"arrayElement(k, 1)": null,
"arrayElement(k, 2)": 4
"arrayElement(k, 2)": 1
},
{
"total": 1,
@ -30,12 +30,12 @@
{
"total": 1,
"arrayElement(k, 1)": null,
"arrayElement(k, 2)": 1
"arrayElement(k, 2)": 3
},
{
"total": 1,
"arrayElement(k, 1)": null,
"arrayElement(k, 2)": 3
"arrayElement(k, 2)": 4
},
{
"total": 1,
@ -53,5 +53,5 @@
"rows": 5,
"rows_before_limit_at_least": 5
"rows_before_limit_at_least": 1000000
}

View File

@ -1,5 +1,4 @@
SET output_format_write_statistics = 0;
SET experimental_use_processors = 0;
select
sum(cnt) > 0 as total,
@ -14,6 +13,6 @@ select
limit 1000000
)
group by k with totals
order by total desc
order by k[2]
SETTINGS max_threads = 100, max_execution_time = 120
format JSON;

View File

@ -8,8 +8,4 @@ INSERT INTO TESTTABLE4 VALUES (0,'1','1'), (1,'0','1');
SELECT _id FROM TESTTABLE4 PREWHERE l IN (select '1') ORDER BY _id DESC LIMIT 10;
SET experimental_use_processors=1;
SELECT _id FROM TESTTABLE4 PREWHERE l IN (select '1') ORDER BY _id DESC LIMIT 10;
DROP TABLE TESTTABLE4;

View File

@ -1,8 +1,6 @@
DROP TABLE IF EXISTS d_numbers;
CREATE TABLE d_numbers (number UInt32) ENGINE = Distributed(test_cluster_two_shards, system, numbers, rand());
SET experimental_use_processors = 1;
SELECT '100' AS number FROM d_numbers AS n WHERE n.number = 100 LIMIT 2;
SET distributed_product_mode = 'local';

View File

@ -1,6 +1,5 @@
set send_logs_level = 'error';
set extremes = 1;
-- set experimental_use_processors=0;
select * from remote('127.0.0.1', numbers(2));
select '-';