Merge branch 'master' into compression-codecs-refactoring

This commit is contained in:
mergify[bot] 2021-06-06 18:02:39 +00:00 committed by GitHub
commit f84ac23951
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 107 additions and 39 deletions

View File

@ -83,6 +83,17 @@ ContextMutablePtr updateSettingsForCluster(const Cluster & cluster, ContextPtr c
}
}
if (settings.offset)
{
new_settings.offset = 0;
new_settings.offset.changed = false;
}
if (settings.limit)
{
new_settings.limit = 0;
new_settings.limit.changed = false;
}
auto new_context = Context::createCopy(context);
new_context->setSettings(new_settings);
return new_context;

View File

@ -66,8 +66,15 @@ DictionaryStructure ExternalDictionariesLoader::getDictionaryStructure(const std
std::string resolved_name = resolveDictionaryName(dictionary_name, query_context->getCurrentDatabase());
auto load_result = getLoadResult(resolved_name);
if (load_result.object)
{
const auto dictionary = std::static_pointer_cast<const IDictionary>(load_result.object);
return dictionary->getStructure();
}
if (!load_result.config)
throw Exception("Dictionary " + backQuote(dictionary_name) + " config not found", ErrorCodes::BAD_ARGUMENTS);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Dictionary {} config not found", backQuote(dictionary_name));
return ExternalDictionariesLoader::getDictionaryStructure(*load_result.config);
}
@ -128,7 +135,7 @@ std::string ExternalDictionariesLoader::resolveDictionaryNameFromDatabaseCatalog
DictionaryStructure
ExternalDictionariesLoader::getDictionaryStructure(const Poco::Util::AbstractConfiguration & config, const std::string & key_in_config)
{
return {config, key_in_config};
return DictionaryStructure(config, key_in_config);
}
DictionaryStructure ExternalDictionariesLoader::getDictionaryStructure(const ObjectConfig & config)

View File

@ -224,8 +224,9 @@ namespace
}
}
}
else if (const auto * tuple_literal = right->as<ASTLiteral>();
tuple_literal && tuple_literal->value.getType() == Field::Types::Tuple)
else if (const auto * tuple_literal = right->as<ASTLiteral>(); tuple_literal)
{
if (tuple_literal->value.getType() == Field::Types::Tuple)
{
const auto & tuple = tuple_literal->value.get<const Tuple &>();
for (const auto & child : tuple)
@ -243,6 +244,9 @@ namespace
}
}
}
else
return analyzeEquals(identifier, tuple_literal, expr);
}
else
{
return {};

View File

@ -609,7 +609,7 @@ void StorageDistributed::read(
ClusterProxy::executeQuery(query_plan, select_stream_factory, log,
modified_query_ast, local_context, query_info,
sharding_key_expr, sharding_key_column_name,
getCluster());
query_info.cluster);
/// This is a bug, it is possible only when there is no shards to query, and this is handled earlier.
if (!query_plan.isInitialized())

View File

@ -2647,24 +2647,6 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
{
String source_path = fs::path(zookeeper_path) / "replicas" / source_replica;
/** TODO: it will be deleted! (It is only to support old version of CH server).
* In current code, the replica is created in single transaction.
* If the reference/master replica is not yet fully created, let's wait.
*/
while (!zookeeper->exists(fs::path(source_path) / "columns"))
{
LOG_INFO(log, "Waiting for replica {} to be fully created", source_path);
zkutil::EventPtr event = std::make_shared<Poco::Event>();
if (zookeeper->exists(fs::path(source_path) / "columns", nullptr, event))
{
LOG_WARNING(log, "Oops, a watch has leaked");
break;
}
event->wait();
}
/// The order of the following three actions is important.
Strings source_queue_names;

View File

@ -170,7 +170,7 @@ def configure_testcase_args(args, case_file, suite_tmp_dir, stderr_file):
return testcase_args
def run_single_test(args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file):
client = args.client
client = args.testcase_client
start_time = args.testcase_start_time
database = args.testcase_database

View File

@ -1320,7 +1320,8 @@ def test_librdkafka_compression(kafka_cluster):
logging.debug(('Check compression {}'.format(compression_type)))
topic_name = 'test_librdkafka_compression_{}'.format(compression_type)
admin_client = admin.AdminClient({'bootstrap.servers': 'localhost:9092'})
admin_client = admin.AdminClient({'bootstrap.servers': "localhost:{}".format(kafka_cluster.kafka_port)})
topic = admin.NewTopic(topic=topic_name, num_partitions=1, replication_factor=1, config={
'compression.type': compression_type,
})

View File

@ -12,6 +12,8 @@ WITH CAST(\'default\', \'String\') AS id_2 SELECT one.dummy, ignore(id_2) FROM s
optimize_skip_unused_shards_rewrite_in(0,)
0 0
WITH CAST(\'default\', \'String\') AS id_0 SELECT one.dummy, ignore(id_0) FROM system.one WHERE dummy IN tuple(0)
0
0
errors
others
0

View File

@ -81,18 +81,18 @@ select query from system.query_log where
type = 'QueryFinish'
order by query;
-- not tuple
select * from dist_01756 where dummy in (0);
select * from dist_01756 where dummy in ('0');
--
-- errors
--
select 'errors';
-- not tuple
select * from dist_01756 where dummy in (0); -- { serverError 507 }
-- optimize_skip_unused_shards does not support non-constants
select * from dist_01756 where dummy in (select * from system.one); -- { serverError 507 }
select * from dist_01756 where dummy in (toUInt8(0)); -- { serverError 507 }
-- wrong type (tuple)
select * from dist_01756 where dummy in ('0'); -- { serverError 507 }
-- intHash64 does not accept string
select * from dist_01756 where dummy in ('0', '2'); -- { serverError 43 }
-- NOT IN does not supported

View File

@ -0,0 +1,14 @@
limit 0
limit 1
limit 2
limit 3
limit 4
offset 5
offset 6
offset 7
offset 8
offset 9
limit w/ GROUP BY 4 4
limit w/ GROUP BY 4 3
limit/offset w/ GROUP BY 4 2
limit/offset w/ GROUP BY 4 1

View File

@ -0,0 +1,30 @@
SELECT 'limit', * FROM remote('127.1', view(SELECT * FROM numbers(10))) SETTINGS limit=5;
SELECT 'offset', * FROM remote('127.1', view(SELECT * FROM numbers(10))) SETTINGS offset=5;
SELECT
'limit w/ GROUP BY',
count(),
number
FROM remote('127.{1,2}', view(
SELECT intDiv(number, 2) AS number
FROM numbers(10)
))
GROUP BY number
ORDER BY
count() ASC,
number DESC
SETTINGS limit=2;
SELECT
'limit/offset w/ GROUP BY',
count(),
number
FROM remote('127.{1,2}', view(
SELECT intDiv(number, 2) AS number
FROM numbers(10)
))
GROUP BY number
ORDER BY
count() ASC,
number DESC
SETTINGS limit=2, offset=2;

View File

@ -0,0 +1,15 @@
set optimize_skip_unused_shards=1;
set force_optimize_skip_unused_shards=1;
drop table if exists d;
drop table if exists dp;
create table d (i UInt8) Engine=Memory;
create table dp as d Engine=Distributed(test_cluster_two_shards, currentDatabase(), d, i);
insert into d values (1), (2);
select * from dp where i in (1);
drop table if exists d;
drop table if exists dp;

View File

@ -240,3 +240,4 @@
01880_remote_ipv6
01882_scalar_subquery_exception
01882_check_max_parts_to_merge_at_once
01892_setting_limit_offset_distributed