Merge branch 'master' into session-timezone-caveats

Alexey Milovidov 2023-07-22 17:53:17 +03:00 committed by GitHub
commit 126ee1d886
37 changed files with 472 additions and 202 deletions

.gitignore vendored
View File

@@ -161,6 +161,7 @@ tests/queries/0_stateless/test_*
tests/queries/0_stateless/*.binary
tests/queries/0_stateless/*.generated-expect
tests/queries/0_stateless/*.expect.history
tests/integration/**/_gen
# rust
/rust/**/target

View File

@@ -135,4 +135,5 @@ ENV MSAN_OPTIONS='abort_on_error=1 poison_in_dtor=1'
EXPOSE 2375
ENTRYPOINT ["dockerd-entrypoint.sh"]
CMD ["sh", "-c", "pytest $PYTEST_OPTS"]
# To pass additional arguments (i.e. list of tests) use PYTEST_ADDOPTS
CMD ["sh", "-c", "pytest"]

View File

@@ -1449,7 +1449,7 @@ Using replacement fields, you can define a pattern for the resulting string.
| %n | new-line character (\n) | |
| %p | AM or PM designation | PM |
| %Q | Quarter (1-4) | 1 |
| %r | 12-hour HH:MM AM/PM time, equivalent to %H:%i %p | 10:30 PM |
| %r | 12-hour HH:MM AM/PM time, equivalent to %h:%i %p | 10:30 PM |
| %R | 24-hour HH:MM time, equivalent to %H:%i | 22:33 |
| %s | second (00-59) | 44 |
| %S | second (00-59) | 44 |

View File

@@ -29,6 +29,7 @@ EnvironmentFile=-/etc/default/clickhouse
LimitCORE=infinity
LimitNOFILE=500000
CapabilityBoundingSet=CAP_NET_ADMIN CAP_IPC_LOCK CAP_SYS_NICE CAP_NET_BIND_SERVICE
AmbientCapabilities=CAP_NET_ADMIN CAP_IPC_LOCK CAP_SYS_NICE CAP_NET_BIND_SERVICE
[Install]
# ClickHouse should not start from the rescue shell (rescue.target).

View File

@@ -1,6 +1,7 @@
#include <limits>
#include <Common/Exception.h>
#include <Common/PODArray.h>
#include <Common/checkStackSize.h>
#include <Common/OptimizedRegularExpression.h>
#define MIN_LENGTH_FOR_STRSTR 3
@@ -50,6 +51,8 @@ const char * analyzeImpl(
bool & is_trivial,
Literals & global_alternatives)
{
checkStackSize();
/** The expression is trivial if all the metacharacters in it are escaped.
* The non-alternative string is
* a string outside parentheses,
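
For context: analyzeImpl() recurses into sub-patterns, so a pathological pattern such as repeat('(', 100000), exercised by the new one-line test at the end of this commit, could overflow the stack; checkStackSize() throws TOO_DEEP_RECURSION (error 306) before that happens. A standalone sketch of the same guard pattern, with a hypothetical thread-local depth counter standing in for ClickHouse's real stack-bound check:

#include <iostream>
#include <stdexcept>
#include <string>

/// Hypothetical stand-in for checkStackSize(): the real function compares the
/// current stack pointer against the thread's stack bounds; a plain depth
/// counter is enough to show the guard pattern.
static thread_local int depth = 0;
struct DepthGuard
{
    DepthGuard()
    {
        if (++depth > 10000)
            throw std::runtime_error("too deep recursion");
    }
    ~DepthGuard() { --depth; }
};

/// Toy recursive analyzer: descends once per '(' the way a regexp pattern
/// analyzer does, so deeply nested groups translate into deep recursion.
static size_t analyze(const char *& pos, const char * end)
{
    DepthGuard guard; /// same spot as the checkStackSize() call in the diff
    size_t groups = 0;
    while (pos < end)
    {
        if (*pos == '(')
        {
            ++pos;
            groups += 1 + analyze(pos, end);
        }
        else if (*pos == ')')
        {
            ++pos;
            return groups;
        }
        else
            ++pos;
    }
    return groups;
}

int main()
{
    std::string pattern(100000, '('); /// same shape as the new serverError 306 test
    const char * pos = pattern.data();
    try
    {
        analyze(pos, pos + pattern.size());
    }
    catch (const std::exception & e)
    {
        std::cout << "rejected: " << e.what() << '\n';
    }
    return 0;
}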

View File

@@ -49,6 +49,9 @@ struct CountSubstringsImpl
/// FIXME: suboptimal
memset(&res[0], 0, res.size() * sizeof(res[0]));
if (needle.empty())
return; // Return all zeros
/// Current index in the array of strings.
size_t i = 0;
@@ -223,16 +226,19 @@ struct CountSubstringsImpl
const char * needle_beg = reinterpret_cast<const char *>(&needle_data[prev_needle_offset]);
size_t needle_size = needle_offsets[i] - prev_needle_offset - 1;
typename Impl::SearcherInSmallHaystack searcher = Impl::createSearcherInSmallHaystack(needle_beg, needle_size);
const UInt8 * end = reinterpret_cast<const UInt8 *>(haystack.data() + haystack.size());
const UInt8 * beg = reinterpret_cast<const UInt8 *>(Impl::advancePos(haystack.data(), reinterpret_cast<const char *>(end), start - 1));
const UInt8 * pos;
while ((pos = searcher.search(beg, end)) < end)
if (needle_size > 0)
{
++res[i];
beg = pos + needle_size;
typename Impl::SearcherInSmallHaystack searcher = Impl::createSearcherInSmallHaystack(needle_beg, needle_size);
const UInt8 * end = reinterpret_cast<const UInt8 *>(haystack.data() + haystack.size());
const UInt8 * beg = reinterpret_cast<const UInt8 *>(Impl::advancePos(haystack.data(), reinterpret_cast<const char *>(end), start - 1));
const UInt8 * pos;
while ((pos = searcher.search(beg, end)) < end)
{
++res[i];
beg = pos + needle_size;
}
}
}
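
The new needle_size > 0 guard matters because a searcher built over an empty needle matches at every offset, and the cursor then advances by needle_size == 0 and never reaches end. A minimal standalone illustration (std::string::find in place of ClickHouse's searcher) of why an empty needle must short-circuit to zero:

#include <cassert>
#include <string>

/// Count non-overlapping occurrences of needle in haystack. Without the
/// empty-needle guard, find("") succeeds at every position and the cursor
/// advances by needle.size() == 0: an infinite loop.
static size_t countSubstrings(const std::string & haystack, const std::string & needle)
{
    if (needle.empty())
        return 0; /// "return all zeros", as the fixed code does
    size_t count = 0;
    for (size_t pos = haystack.find(needle); pos != std::string::npos;
         pos = haystack.find(needle, pos + needle.size()))
        ++count;
    return count;
}

int main()
{
    assert(countSubstrings("foobar.com", ".") == 1);
    assert(countSubstrings("aaa", "") == 0); /// matches the new test expectations
}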

View File

@@ -73,3 +73,9 @@ target_link_libraries (snappy_read_buffer PRIVATE clickhouse_common_io)
clickhouse_add_executable (hadoop_snappy_read_buffer hadoop_snappy_read_buffer.cpp)
target_link_libraries (hadoop_snappy_read_buffer PRIVATE clickhouse_common_io)
if (TARGET ch_contrib::hdfs)
clickhouse_add_executable (read_buffer_from_hdfs read_buffer_from_hdfs.cpp)
target_link_libraries (read_buffer_from_hdfs PRIVATE dbms ch_contrib::hdfs)
endif ()

View File

@@ -0,0 +1,25 @@
#include <iostream>
#include <memory>
#include <string>
#include <IO/WriteBufferFromFile.h>
#include <IO/copyData.h>
#include <Storages/HDFS/ReadBufferFromHDFS.h>
#include <base/types.h>
#include <Common/Config/ConfigProcessor.h>
using namespace DB;
int main()
{
setenv("LIBHDFS3_CONF", "/path/to/hdfs-site.xml", true); /// NOLINT
String hdfs_uri = "hdfs://cluster_name";
String hdfs_file_path = "/path/to/hdfs/file";
ConfigurationPtr config = Poco::AutoPtr(new Poco::Util::MapConfiguration());
ReadSettings read_settings;
ReadBufferFromHDFS read_buffer(hdfs_uri, hdfs_file_path, *config, read_settings, 2097152UL, false);
String download_path = "./download";
WriteBufferFromFile write_buffer(download_path);
copyData(read_buffer, write_buffer);
return 0;
}

View File

@@ -870,13 +870,12 @@ void FileCache::loadMetadata()
}
size_t total_size = 0;
for (auto key_prefix_it = fs::directory_iterator{metadata.getBaseDirectory()};
key_prefix_it != fs::directory_iterator();)
for (auto key_prefix_it = fs::directory_iterator{metadata.getBaseDirectory()}; key_prefix_it != fs::directory_iterator();
key_prefix_it++)
{
const fs::path key_prefix_directory = key_prefix_it->path();
key_prefix_it++;
if (!fs::is_directory(key_prefix_directory))
if (!key_prefix_it->is_directory())
{
if (key_prefix_directory.filename() != "status")
{
@@ -887,19 +886,19 @@ void FileCache::loadMetadata()
continue;
}
if (fs::is_empty(key_prefix_directory))
fs::directory_iterator key_it{key_prefix_directory};
if (key_it == fs::directory_iterator{})
{
LOG_DEBUG(log, "Removing empty key prefix directory: {}", key_prefix_directory.string());
fs::remove(key_prefix_directory);
continue;
}
for (fs::directory_iterator key_it{key_prefix_directory}; key_it != fs::directory_iterator();)
for (/* key_it already initialized to verify emptiness */; key_it != fs::directory_iterator(); key_it++)
{
const fs::path key_directory = key_it->path();
++key_it;
if (!fs::is_directory(key_directory))
if (!key_it->is_directory())
{
LOG_DEBUG(
log,
@@ -908,7 +907,7 @@ void FileCache::loadMetadata()
continue;
}
if (fs::is_empty(key_directory))
if (fs::directory_iterator{key_directory} == fs::directory_iterator{})
{
LOG_DEBUG(log, "Removing empty key directory: {}", key_directory.string());
fs::remove(key_directory);
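
The rewrite replaces fs::is_empty(), which opens and scans the directory by itself, with a comparison of an already-constructed fs::directory_iterator against the end iterator; the same iterator then drives the traversal, so each directory is opened once. A small sketch of that emptiness check, assuming a throwaway cache_demo directory:

#include <filesystem>
#include <iostream>

namespace fs = std::filesystem;

int main()
{
    fs::create_directories("cache_demo/empty_key"); /// hypothetical layout
    /// A default-constructed directory_iterator is the end iterator, so this
    /// comparison answers "is the directory empty?" without a second scan.
    fs::directory_iterator key_it{"cache_demo/empty_key"};
    if (key_it == fs::directory_iterator{})
        std::cout << "empty, safe to remove\n";
    /// Otherwise key_it would be reused as the loop cursor, as in the diff.
    fs::remove_all("cache_demo");
    return 0;
}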

View File

@@ -1461,15 +1461,24 @@ void Context::addQueryAccessInfo(
void Context::addQueryAccessInfo(const Names & partition_names)
{
if (isGlobalContext())
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have query access info");
}
std::lock_guard<std::mutex> lock(query_access_info.mutex);
for (const auto & partition_name : partition_names)
{
query_access_info.partitions.emplace(partition_name);
}
}
void Context::addQueryAccessInfo(const QualifiedProjectionName & qualified_projection_name)
{
if (!qualified_projection_name)
return;
if (isGlobalContext())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have query access info");
std::lock_guard<std::mutex> lock(query_access_info.mutex);
query_access_info.projections.emplace(fmt::format(
"{}.{}", qualified_projection_name.storage_id.getFullTableName(), backQuoteIfNeed(qualified_projection_name.projection_name)));
}
void Context::addQueryFactoriesInfo(QueryLogFactories factory_type, const String & created_object) const

View File

@@ -658,6 +658,14 @@ public:
const String & view_name = {});
void addQueryAccessInfo(const Names & partition_names);
struct QualifiedProjectionName
{
StorageID storage_id = StorageID::createEmpty();
String projection_name;
explicit operator bool() const { return !projection_name.empty(); }
};
void addQueryAccessInfo(const QualifiedProjectionName & qualified_projection_name);
/// Supported factories for records in query_log
enum class QueryLogFactories
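
QualifiedProjectionName is falsy while projection_name is empty, so a default-constructed value means "no projection involved" and addQueryAccessInfo() can bail out early, as the Context.cpp hunk above shows. A simplified stand-in (std::string in place of StorageID) demonstrating the pattern:

#include <iostream>
#include <string>

/// Simplified stand-in for Context::QualifiedProjectionName.
struct QualifiedProjectionName
{
    std::string storage_id; /// StorageID in the real struct
    std::string projection_name;
    explicit operator bool() const { return !projection_name.empty(); }
};

static void addQueryAccessInfo(const QualifiedProjectionName & name)
{
    if (!name)
        return; /// same early-out as the new Context method
    std::cout << name.storage_id << '.' << name.projection_name << '\n';
}

int main()
{
    addQueryAccessInfo({});             /// internal query: nothing recorded
    addQueryAccessInfo({"t", "t_agg"}); /// logged, cf. the new .reference file
}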

View File

@@ -337,6 +337,11 @@ std::shared_ptr<TableJoin> JoinedTables::makeTableJoin(const ASTSelectQuery & se
LOG_TRACE(&Poco::Logger::get("JoinedTables"), "Can't use dictionary join: dictionary '{}' was not found", dictionary_name);
return nullptr;
}
if (dictionary->getSpecialKeyType() == DictionarySpecialKeyType::Range)
{
LOG_TRACE(&Poco::Logger::get("JoinedTables"), "Can't use dictionary join: dictionary '{}' is a range dictionary", dictionary_name);
return nullptr;
}
auto dictionary_kv = std::dynamic_pointer_cast<const IKeyValueEntity>(dictionary);
table_join->setStorageJoin(dictionary_kv);

View File

@@ -223,10 +223,10 @@ public:
{
/// When join_algorithm = 'default' (not specified by user) we use hash or direct algorithm.
/// It's behaviour that was initially supported by clickhouse.
bool is_enbaled_by_default = val == JoinAlgorithm::DEFAULT
bool is_enabled_by_default = val == JoinAlgorithm::DEFAULT
|| val == JoinAlgorithm::HASH
|| val == JoinAlgorithm::DIRECT;
if (join_algorithm.isSet(JoinAlgorithm::DEFAULT) && is_enbaled_by_default)
if (join_algorithm.isSet(JoinAlgorithm::DEFAULT) && is_enabled_by_default)
return true;
return join_algorithm.isSet(val);
}

View File

@@ -542,7 +542,8 @@ void trySetStorageInTableJoin(const QueryTreeNodePtr & table_expression, std::sh
if (!table_join->isEnabledAlgorithm(JoinAlgorithm::DIRECT))
return;
if (auto storage_dictionary = std::dynamic_pointer_cast<StorageDictionary>(storage); storage_dictionary)
if (auto storage_dictionary = std::dynamic_pointer_cast<StorageDictionary>(storage);
storage_dictionary && storage_dictionary->getDictionary()->getSpecialKeyType() != DictionarySpecialKeyType::Range)
table_join->setStorageJoin(std::dynamic_pointer_cast<const IKeyValueEntity>(storage_dictionary->getDictionary()));
else if (auto storage_key_value = std::dynamic_pointer_cast<IKeyValueEntity>(storage); storage_key_value)
table_join->setStorageJoin(storage_key_value);

View File

@@ -628,8 +628,16 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
// candidates.minmax_projection->block.dumpStructure());
Pipe pipe(std::make_shared<SourceFromSingleChunk>(std::move(candidates.minmax_projection->block)));
projection_reading = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
projection_reading = std::make_unique<ReadFromPreparedSource>(
std::move(pipe),
context,
query_info.is_internal
? Context::QualifiedProjectionName{}
: Context::QualifiedProjectionName
{
.storage_id = reading->getMergeTreeData().getStorageID(),
.projection_name = candidates.minmax_projection->candidate.projection->name,
});
has_ordinary_parts = !candidates.minmax_projection->normal_parts.empty();
if (has_ordinary_parts)
reading->resetParts(std::move(candidates.minmax_projection->normal_parts));
@@ -661,7 +669,16 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
{
auto header = proj_snapshot->getSampleBlockForColumns(best_candidate->dag->getRequiredColumnsNames());
Pipe pipe(std::make_shared<NullSource>(std::move(header)));
projection_reading = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
projection_reading = std::make_unique<ReadFromPreparedSource>(
std::move(pipe),
context,
query_info.is_internal
? Context::QualifiedProjectionName{}
: Context::QualifiedProjectionName
{
.storage_id = reading->getMergeTreeData().getStorageID(),
.projection_name = best_candidate->projection->name,
});
}
has_ordinary_parts = best_candidate->merge_tree_ordinary_select_result_ptr != nullptr;

View File

@@ -183,7 +183,16 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
if (!projection_reading)
{
Pipe pipe(std::make_shared<NullSource>(proj_snapshot->getSampleBlockForColumns(required_columns)));
projection_reading = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
projection_reading = std::make_unique<ReadFromPreparedSource>(
std::move(pipe),
context,
query_info.is_internal
? Context::QualifiedProjectionName{}
: Context::QualifiedProjectionName
{
.storage_id = reading->getMergeTreeData().getStorageID(),
.projection_name = best_candidate->projection->name,
});
}
bool has_ordinary_parts = best_candidate->merge_tree_ordinary_select_result_ptr != nullptr;

View File

@@ -1761,6 +1761,10 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
fmt::format("{}.{}", data.getStorageID().getFullNameNotQuoted(), part.data_part->info.partition_id));
}
context->getQueryContext()->addQueryAccessInfo(partition_names);
if (storage_snapshot->projection)
context->getQueryContext()->addQueryAccessInfo(
Context::QualifiedProjectionName{.storage_id = data.getStorageID(), .projection_name = storage_snapshot->projection->name});
}
ProfileEvents::increment(ProfileEvents::SelectedParts, result.selected_parts);

View File

@@ -4,14 +4,19 @@
namespace DB
{
ReadFromPreparedSource::ReadFromPreparedSource(Pipe pipe_)
ReadFromPreparedSource::ReadFromPreparedSource(Pipe pipe_, ContextPtr context_, Context::QualifiedProjectionName qualified_projection_name_)
: ISourceStep(DataStream{.header = pipe_.getHeader()})
, pipe(std::move(pipe_))
, context(std::move(context_))
, qualified_projection_name(std::move(qualified_projection_name_))
{
}
void ReadFromPreparedSource::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
if (context && context->hasQueryContext())
context->getQueryContext()->addQueryAccessInfo(qualified_projection_name);
for (const auto & processor : pipe.getProcessors())
processors.emplace_back(processor);

View File

@@ -1,4 +1,6 @@
#pragma once
#include <Interpreters/Context.h>
#include <Processors/QueryPlan/ISourceStep.h>
#include <QueryPipeline/Pipe.h>
@@ -9,7 +11,8 @@ namespace DB
class ReadFromPreparedSource : public ISourceStep
{
public:
explicit ReadFromPreparedSource(Pipe pipe_);
explicit ReadFromPreparedSource(
Pipe pipe_, ContextPtr context_ = nullptr, Context::QualifiedProjectionName qualified_projection_name_ = {});
String getName() const override { return "ReadFromPreparedSource"; }
@@ -18,6 +21,7 @@ public:
protected:
Pipe pipe;
ContextPtr context;
Context::QualifiedProjectionName qualified_projection_name;
};
class ReadFromStorageStep : public ReadFromPreparedSource

View File

@@ -89,7 +89,7 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
if (read_until_position < file_offset)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", file_offset, read_until_position - 1);
num_bytes_to_read = read_until_position - file_offset;
num_bytes_to_read = std::min<size_t>(read_until_position - file_offset, internal_buffer.size());
}
else
{
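
The fix caps the request by the internal buffer as well as by read_until_position; previously a distant until-position made the implementation ask hdfsRead for more bytes than the buffer could hold. The clamp as a standalone helper, with made-up sizes:

#include <algorithm>
#include <cassert>
#include <cstddef>

/// Bytes to request on the next read: bounded both by the remaining range
/// and by the capacity of the internal buffer.
static size_t bytesToRead(size_t file_offset, size_t read_until_position, size_t buffer_size)
{
    return std::min<size_t>(read_until_position - file_offset, buffer_size);
}

int main()
{
    assert(bytesToRead(0, 10'000'000, 1'048'576) == 1'048'576);     /// clamped to the buffer
    assert(bytesToRead(9'500'000, 10'000'000, 1'048'576) == 500'000); /// tail of the range
}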

View File

@@ -61,7 +61,7 @@ MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskP
{
auto out = disk->writeFile(std::filesystem::path(path_prefix) / file_name, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, settings);
*out << "format version: 1\n"
<< "create time: " << LocalDateTime(create_time) << "\n";
<< "create time: " << LocalDateTime(create_time, DateLUT::serverTimezoneInstance()) << "\n";
*out << "commands: ";
commands.writeText(*out, /* with_pure_metadata_commands = */ false);
*out << "\n";

View File

@@ -48,7 +48,7 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
format_version = std::max<UInt8>(format_version, FORMAT_WITH_LOG_ENTRY_ID);
out << "format version: " << format_version << "\n"
<< "create_time: " << LocalDateTime(create_time ? create_time : time(nullptr)) << "\n"
<< "create_time: " << LocalDateTime(create_time ? create_time : time(nullptr), DateLUT::serverTimezoneInstance()) << "\n"
<< "source replica: " << source_replica << '\n'
<< "block_id: " << escape << block_id << '\n';

View File

@@ -12,7 +12,7 @@ namespace DB
void ReplicatedMergeTreeMutationEntry::writeText(WriteBuffer & out) const
{
out << "format version: 1\n"
<< "create time: " << LocalDateTime(create_time ? create_time : time(nullptr)) << "\n"
<< "create time: " << LocalDateTime(create_time ? create_time : time(nullptr), DateLUT::serverTimezoneInstance()) << "\n"
<< "source replica: " << source_replica << "\n"
<< "block numbers count: " << block_numbers.size() << "\n";

View File

@@ -196,3 +196,8 @@ test_quota/test.py::test_tracking_quota
test_quota/test.py::test_users_xml_is_readonly
test_replicating_constants/test.py::test_different_versions
test_merge_tree_s3/test.py::test_heavy_insert_select_check_memory[node]
test_drop_is_lock_free/test.py::test_query_is_lock_free[detach table]
test_backward_compatibility/test_data_skipping_indices.py::test_index
test_backward_compatibility/test_convert_ordinary.py::test_convert_ordinary_to_atomic
test_backward_compatibility/test_memory_bound_aggregation.py::test_backward_compatability
test_odbc_interaction/test.py::test_postgres_insert

View File

@@ -130,3 +130,4 @@
02581_share_big_sets_between_mutation_tasks_long
02581_share_big_sets_between_multiple_mutations_tasks_long
00992_system_parts_race_condition_zookeeper_long
02815_range_dict_no_direct_join

View File

@@ -7,9 +7,11 @@ import json
import logging
import os
import random
import re
import shutil
import subprocess
import time
import shlex
import zlib # for crc32
@@ -110,16 +112,36 @@ def get_counters(fname):
if not (".py::" in line and " " in line):
continue
line_arr = line.strip().split(" ")
line = line.strip()
# [gw0] [ 7%] ERROR test_mysql_protocol/test.py::test_golang_client
# ^^^^^^^^^^^^^
if line.strip().startswith("["):
line = re.sub("^\[[^\[\]]*\] \[[^\[\]]*\] ", "", line)
line_arr = line.split(" ")
if len(line_arr) < 2:
logging.debug("Strange line %s", line)
continue
# Lines like:
# [gw0] [ 7%] ERROR test_mysql_protocol/test.py::test_golang_client
# [gw3] [ 40%] PASSED test_replicated_users/test.py::test_rename_replicated[QUOTA]
state = line_arr[-2]
test_name = line_arr[-1]
#
# ERROR test_mysql_protocol/test.py::test_golang_client
# PASSED test_replicated_users/test.py::test_rename_replicated[QUOTA]
# PASSED test_drop_is_lock_free/test.py::test_query_is_lock_free[detach part]
#
state = line_arr.pop(0)
test_name = " ".join(line_arr)
# Normalize test names for lines like this:
#
# FAILED test_storage_s3/test.py::test_url_reconnect_in_the_middle - Exception
# FAILED test_distributed_ddl/test.py::test_default_database[configs] - AssertionError: assert ...
#
test_name = re.sub(
r"^(?P<test_name>[^\[\] ]+)(?P<test_param>\[[^\[\]]*\]|)(?P<test_error> - .*|)$",
r"\g<test_name>\g<test_param>",
test_name,
)
if state in counters:
counters[state].add(test_name)
@@ -411,7 +433,7 @@ class ClickhouseIntegrationTestsRunner:
out_file_full = os.path.join(self.result_path, "runner_get_all_tests.log")
cmd = (
"cd {repo_path}/tests/integration && "
"timeout -s 9 1h ./runner {runner_opts} {image_cmd} ' --setup-plan' "
"timeout -s 9 1h ./runner {runner_opts} {image_cmd} -- --setup-plan "
"| tee {out_file_full} | grep '::' | sed 's/ (fixtures used:.*//g' | sed 's/^ *//g' | sed 's/ *$//g' "
"| grep -v 'SKIPPED' | sort -u > {out_file}".format(
repo_path=repo_path,
@@ -646,7 +668,7 @@ class ClickhouseIntegrationTestsRunner:
info_basename = test_group_str + "_" + str(i) + ".nfo"
info_path = os.path.join(repo_path, "tests/integration", info_basename)
test_cmd = " ".join([test for test in sorted(test_names)])
test_cmd = " ".join([shlex.quote(test) for test in sorted(test_names)])
parallel_cmd = (
" --parallel {} ".format(num_workers) if num_workers > 0 else ""
)
@@ -655,7 +677,7 @@ class ClickhouseIntegrationTestsRunner:
# -E -- (E)rror
# -p -- (p)assed
# -s -- (s)kipped
cmd = "cd {}/tests/integration && timeout -s 9 1h ./runner {} {} -t {} {} '-rfEps --run-id={} --color=no --durations=0 {}' | tee {}".format(
cmd = "cd {}/tests/integration && timeout -s 9 1h ./runner {} {} -t {} {} -- -rfEps --run-id={} --color=no --durations=0 {} | tee {}".format(
repo_path,
self._get_runner_opts(),
image_cmd,
@@ -766,6 +788,7 @@ class ClickhouseIntegrationTestsRunner:
and test not in counters["ERROR"]
and test not in counters["SKIPPED"]
and test not in counters["FAILED"]
and test not in counters["BROKEN"]
and "::" in test
):
counters["ERROR"].append(test)
@@ -999,16 +1022,6 @@ class ClickhouseIntegrationTestsRunner:
if "(memory)" in self.params["context_name"]:
result_state = "success"
for res in test_result:
# It's not easy to parse output of pytest
# Especially when test names may contain spaces
# Do not allow it to avoid obscure failures
if " " not in res[0]:
continue
logging.warning("Found invalid test name with space: %s", res[0])
status_text = "Found test with invalid name, see main log"
result_state = "failure"
return result_state, status_text, test_result, []

View File

@@ -11,6 +11,7 @@ import subprocess
import sys
import string
import random
import shlex
def random_str(length=6):
@@ -135,9 +136,7 @@ def check_args_and_update_paths(args):
def docker_kill_handler_handler(signum, frame):
subprocess.check_call(
'docker ps --all --quiet --filter name={name} --format="{{{{.ID}}}}"'.format(
name=CONTAINER_NAME
),
"docker ps --all --quiet --filter name={name}".format(name=CONTAINER_NAME),
shell=True,
)
raise KeyboardInterrupt("Killed by Ctrl+C")
@@ -407,8 +406,14 @@ if __name__ == "__main__":
if args.analyzer:
use_analyzer = "-e CLICKHOUSE_USE_NEW_ANALYZER=1"
pytest_opts = " ".join(args.pytest_args).replace("'", "\\'")
tests_list = " ".join(args.tests_list)
# NOTE: since pytest options is in the argument value already we need to additionally escape '"'
pytest_opts = " ".join(
map(lambda x: shlex.quote(x).replace('"', '\\"'), args.pytest_args)
)
tests_list = " ".join(
map(lambda x: shlex.quote(x).replace('"', '\\"'), args.tests_list)
)
cmd_base = (
f"docker run {net} {tty} --rm --name {CONTAINER_NAME} "
"--privileged --dns-search='.' " # since recent dns search leaks from host
@@ -420,7 +425,7 @@ if __name__ == "__main__":
f"--volume={args.src_dir}/Server/grpc_protos:/ClickHouse/src/Server/grpc_protos "
f"--volume=/run:/run/host:ro {dockerd_internal_volume} {env_tags} {env_cleanup} "
f"-e DOCKER_CLIENT_TIMEOUT=300 -e COMPOSE_HTTP_TIMEOUT=600 {use_analyzer} -e PYTHONUNBUFFERED=1 "
f"-e PYTEST_OPTS='{parallel_args} {pytest_opts} {tests_list} {rand_args} -vvv'"
f'-e PYTEST_ADDOPTS="{parallel_args} {pytest_opts} {tests_list} {rand_args} -vvv"'
f" {DIND_INTEGRATION_TESTS_IMAGE_NAME}:{args.docker_image_version}"
)
@@ -431,7 +436,7 @@ if __name__ == "__main__":
)
containers = subprocess.check_output(
f"docker ps --all --quiet --filter name={CONTAINER_NAME} --format={{{{.ID}}}}",
f"docker ps --all --quiet --filter name={CONTAINER_NAME}",
shell=True,
universal_newlines=True,
).splitlines()

View File

@@ -992,6 +992,7 @@ def select_without_columns(clickhouse_node, mysql_node, service_name):
)
check_query(clickhouse_node, "SHOW TABLES FROM db FORMAT TSV", "t\n")
clickhouse_node.query("SYSTEM STOP MERGES db.t")
clickhouse_node.query("DROP VIEW IF EXISTS v")
clickhouse_node.query("CREATE VIEW v AS SELECT * FROM db.t")
mysql_node.query("INSERT INTO db.t VALUES (1, 1), (2, 2)")
mysql_node.query("DELETE FROM db.t WHERE a = 2;")

View File

@@ -582,75 +582,83 @@ def test_sqlite_odbc_cached_dictionary(started_cluster):
def test_postgres_odbc_hashed_dictionary_with_schema(started_cluster):
skip_test_msan(node1)
conn = get_postgres_conn(started_cluster)
cursor = conn.cursor()
cursor.execute(
"insert into clickhouse.test_table values(1, 1, 'hello'),(2, 2, 'world')"
)
node1.query("SYSTEM RELOAD DICTIONARY postgres_odbc_hashed")
node1.exec_in_container(
["ss", "-K", "dport", "postgresql"], privileged=True, user="root"
)
node1.query("SYSTEM RELOAD DICTIONARY postgres_odbc_hashed")
assert_eq_with_retry(
node1,
"select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(1))",
"hello",
)
assert_eq_with_retry(
node1,
"select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(2))",
"world",
)
cursor.execute("truncate table clickhouse.test_table")
try:
conn = get_postgres_conn(started_cluster)
cursor = conn.cursor()
cursor.execute(
"insert into clickhouse.test_table values(1, 1, 'hello'),(2, 2, 'world')"
)
node1.query("SYSTEM RELOAD DICTIONARY postgres_odbc_hashed")
node1.exec_in_container(
["ss", "-K", "dport", "postgresql"], privileged=True, user="root"
)
node1.query("SYSTEM RELOAD DICTIONARY postgres_odbc_hashed")
assert_eq_with_retry(
node1,
"select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(1))",
"hello",
)
assert_eq_with_retry(
node1,
"select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(2))",
"world",
)
finally:
cursor.execute("truncate table clickhouse.test_table")
def test_postgres_odbc_hashed_dictionary_no_tty_pipe_overflow(started_cluster):
skip_test_msan(node1)
conn = get_postgres_conn(started_cluster)
cursor = conn.cursor()
cursor.execute("insert into clickhouse.test_table values(3, 3, 'xxx')")
for i in range(100):
try:
node1.query("system reload dictionary postgres_odbc_hashed", timeout=15)
except Exception as ex:
assert False, "Exception occured -- odbc-bridge hangs: " + str(ex)
try:
conn = get_postgres_conn(started_cluster)
cursor = conn.cursor()
cursor.execute("insert into clickhouse.test_table values(3, 3, 'xxx')")
for i in range(100):
try:
node1.query("system reload dictionary postgres_odbc_hashed", timeout=15)
except Exception as ex:
assert False, "Exception occured -- odbc-bridge hangs: " + str(ex)
assert_eq_with_retry(
node1,
"select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(3))",
"xxx",
)
cursor.execute("truncate table clickhouse.test_table")
assert_eq_with_retry(
node1,
"select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(3))",
"xxx",
)
finally:
cursor.execute("truncate table clickhouse.test_table")
def test_no_connection_pooling(started_cluster):
skip_test_msan(node1)
conn = get_postgres_conn(started_cluster)
cursor = conn.cursor()
cursor.execute(
"insert into clickhouse.test_table values(1, 1, 'hello'),(2, 2, 'world')"
)
node1.exec_in_container(["ss", "-K", "dport", "5432"], privileged=True, user="root")
node1.query("SYSTEM RELOAD DICTIONARY postgres_odbc_nopool")
assert_eq_with_retry(
node1,
"select dictGetString('postgres_odbc_nopool', 'column2', toUInt64(1))",
"hello",
)
assert_eq_with_retry(
node1,
"select dictGetString('postgres_odbc_nopool', 'column2', toUInt64(2))",
"world",
)
try:
conn = get_postgres_conn(started_cluster)
cursor = conn.cursor()
cursor.execute(
"insert into clickhouse.test_table values(1, 1, 'hello'),(2, 2, 'world')"
)
node1.exec_in_container(
["ss", "-K", "dport", "5432"], privileged=True, user="root"
)
node1.query("SYSTEM RELOAD DICTIONARY postgres_odbc_nopool")
assert_eq_with_retry(
node1,
"select dictGetString('postgres_odbc_nopool', 'column2', toUInt64(1))",
"hello",
)
assert_eq_with_retry(
node1,
"select dictGetString('postgres_odbc_nopool', 'column2', toUInt64(2))",
"world",
)
# No open connections should be left because we don't use connection pooling.
assert "" == node1.exec_in_container(
["ss", "-H", "dport", "5432"], privileged=True, user="root"
)
cursor.execute("truncate table clickhouse.test_table")
# No open connections should be left because we don't use connection pooling.
assert "" == node1.exec_in_container(
["ss", "-H", "dport", "5432"], privileged=True, user="root"
)
finally:
cursor.execute("truncate table clickhouse.test_table")
def test_postgres_insert(started_cluster):
@@ -662,112 +670,119 @@ def test_postgres_insert(started_cluster):
# postgres .yml file). This is needed to check parsing, validation and
# reconstruction of connection string.
node1.query(
"create table pg_insert (id UInt64, column1 UInt8, column2 String) engine=ODBC('DSN=postgresql_odbc;Servername=postgre-sql.local', 'clickhouse', 'test_table')"
)
node1.query("insert into pg_insert values (1, 1, 'hello'), (2, 2, 'world')")
assert node1.query("select * from pg_insert") == "1\t1\thello\n2\t2\tworld\n"
node1.query(
"insert into table function odbc('DSN=postgresql_odbc', 'clickhouse', 'test_table') format CSV 3,3,test"
)
node1.query(
"insert into table function odbc('DSN=postgresql_odbc;Servername=postgre-sql.local', 'clickhouse', 'test_table')"
" select number, number, 's' || toString(number) from numbers (4, 7)"
)
assert (
node1.query("select sum(column1), count(column1) from pg_insert") == "55\t10\n"
)
assert (
try:
node1.query(
"select sum(n), count(n) from (select (*,).1 as n from (select * from odbc('DSN=postgresql_odbc', 'clickhouse', 'test_table')))"
"create table pg_insert (id UInt64, column1 UInt8, column2 String) engine=ODBC('DSN=postgresql_odbc;Servername=postgre-sql.local', 'clickhouse', 'test_table')"
)
== "55\t10\n"
)
node1.query("DROP TABLE pg_insert")
conn.cursor().execute("truncate table clickhouse.test_table")
node1.query("insert into pg_insert values (1, 1, 'hello'), (2, 2, 'world')")
assert node1.query("select * from pg_insert") == "1\t1\thello\n2\t2\tworld\n"
node1.query(
"insert into table function odbc('DSN=postgresql_odbc', 'clickhouse', 'test_table') format CSV 3,3,test"
)
node1.query(
"insert into table function odbc('DSN=postgresql_odbc;Servername=postgre-sql.local', 'clickhouse', 'test_table')"
" select number, number, 's' || toString(number) from numbers (4, 7)"
)
assert (
node1.query("select sum(column1), count(column1) from pg_insert")
== "55\t10\n"
)
assert (
node1.query(
"select sum(n), count(n) from (select (*,).1 as n from (select * from odbc('DSN=postgresql_odbc', 'clickhouse', 'test_table')))"
)
== "55\t10\n"
)
finally:
node1.query("DROP TABLE IF EXISTS pg_insert")
conn.cursor().execute("truncate table clickhouse.test_table")
def test_odbc_postgres_date_data_type(started_cluster):
skip_test_msan(node1)
conn = get_postgres_conn(started_cluster)
cursor = conn.cursor()
cursor.execute(
"CREATE TABLE clickhouse.test_date (id integer, column1 integer, column2 date)"
)
try:
conn = get_postgres_conn(started_cluster)
cursor = conn.cursor()
cursor.execute(
"CREATE TABLE clickhouse.test_date (id integer, column1 integer, column2 date)"
)
cursor.execute("INSERT INTO clickhouse.test_date VALUES (1, 1, '2020-12-01')")
cursor.execute("INSERT INTO clickhouse.test_date VALUES (2, 2, '2020-12-02')")
cursor.execute("INSERT INTO clickhouse.test_date VALUES (3, 3, '2020-12-03')")
conn.commit()
cursor.execute("INSERT INTO clickhouse.test_date VALUES (1, 1, '2020-12-01')")
cursor.execute("INSERT INTO clickhouse.test_date VALUES (2, 2, '2020-12-02')")
cursor.execute("INSERT INTO clickhouse.test_date VALUES (3, 3, '2020-12-03')")
conn.commit()
node1.query(
"""
CREATE TABLE test_date (id UInt64, column1 UInt64, column2 Date)
ENGINE=ODBC('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_date')"""
)
node1.query(
"""
CREATE TABLE test_date (id UInt64, column1 UInt64, column2 Date)
ENGINE=ODBC('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_date')"""
)
expected = "1\t1\t2020-12-01\n2\t2\t2020-12-02\n3\t3\t2020-12-03\n"
result = node1.query("SELECT * FROM test_date")
assert result == expected
cursor.execute("DROP TABLE clickhouse.test_date")
node1.query("DROP TABLE test_date")
expected = "1\t1\t2020-12-01\n2\t2\t2020-12-02\n3\t3\t2020-12-03\n"
result = node1.query("SELECT * FROM test_date")
assert result == expected
finally:
cursor.execute("DROP TABLE clickhouse.test_date")
node1.query("DROP TABLE IF EXISTS test_date")
def test_odbc_postgres_conversions(started_cluster):
skip_test_msan(node1)
conn = get_postgres_conn(started_cluster)
cursor = conn.cursor()
try:
conn = get_postgres_conn(started_cluster)
cursor = conn.cursor()
cursor.execute(
"""CREATE TABLE clickhouse.test_types (
a smallint, b integer, c bigint, d real, e double precision, f serial, g bigserial,
h timestamp)"""
)
cursor.execute(
"""CREATE TABLE clickhouse.test_types (
a smallint, b integer, c bigint, d real, e double precision, f serial, g bigserial,
h timestamp)"""
)
node1.query(
"""
INSERT INTO TABLE FUNCTION
odbc('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_types')
VALUES (-32768, -2147483648, -9223372036854775808, 1.12345, 1.1234567890, 2147483647, 9223372036854775807, '2000-05-12 12:12:12')"""
)
node1.query(
"""
INSERT INTO TABLE FUNCTION
odbc('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_types')
VALUES (-32768, -2147483648, -9223372036854775808, 1.12345, 1.1234567890, 2147483647, 9223372036854775807, '2000-05-12 12:12:12')"""
)
result = node1.query(
"""
SELECT a, b, c, d, e, f, g, h
FROM odbc('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_types')
"""
)
result = node1.query(
"""
SELECT a, b, c, d, e, f, g, h
FROM odbc('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_types')
"""
)
assert (
result
== "-32768\t-2147483648\t-9223372036854775808\t1.12345\t1.123456789\t2147483647\t9223372036854775807\t2000-05-12 12:12:12\n"
)
cursor.execute("DROP TABLE IF EXISTS clickhouse.test_types")
assert (
result
== "-32768\t-2147483648\t-9223372036854775808\t1.12345\t1.123456789\t2147483647\t9223372036854775807\t2000-05-12 12:12:12\n"
)
cursor.execute("DROP TABLE IF EXISTS clickhouse.test_types")
cursor.execute(
"""CREATE TABLE clickhouse.test_types (column1 Timestamp, column2 Numeric)"""
)
cursor.execute(
"""CREATE TABLE clickhouse.test_types (column1 Timestamp, column2 Numeric)"""
)
node1.query(
"""
CREATE TABLE test_types (column1 DateTime64, column2 Decimal(5, 1))
ENGINE=ODBC('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_types')"""
)
node1.query(
"""
CREATE TABLE test_types (column1 DateTime64, column2 Decimal(5, 1))
ENGINE=ODBC('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_types')"""
)
node1.query(
"""INSERT INTO test_types
SELECT toDateTime64('2019-01-01 00:00:00', 3, 'Etc/UTC'), toDecimal32(1.1, 1)"""
)
node1.query(
"""INSERT INTO test_types
SELECT toDateTime64('2019-01-01 00:00:00', 3, 'Etc/UTC'), toDecimal32(1.1, 1)"""
)
expected = node1.query(
"SELECT toDateTime64('2019-01-01 00:00:00', 3, 'Etc/UTC'), toDecimal32(1.1, 1)"
)
result = node1.query("SELECT * FROM test_types")
cursor.execute("DROP TABLE clickhouse.test_types")
node1.query("DROP TABLE test_types")
assert result == expected
expected = node1.query(
"SELECT toDateTime64('2019-01-01 00:00:00', 3, 'Etc/UTC'), toDecimal32(1.1, 1)"
)
result = node1.query("SELECT * FROM test_types")
assert result == expected
finally:
cursor.execute("DROP TABLE IF EXISTS clickhouse.test_types")
node1.query("DROP TABLE IF EXISTS test_types")
def test_odbc_cyrillic_with_varchar(started_cluster):

View File

@@ -7,6 +7,11 @@ empty
0
0
0
0
0
0
0
0
char
1
2

View File

@@ -12,6 +12,11 @@ select 'empty';
select countSubstrings('', '.');
select countSubstrings('', '');
select countSubstrings('.', '');
select countSubstrings(toString(number), '') from numbers(1);
select countSubstrings('', toString(number)) from numbers(1);
select countSubstrings('aaa', materialize(''));
select countSubstrings(materialize('aaa'), '');
select countSubstrings(materialize('aaa'), materialize(''));
select 'char';
select countSubstrings('foobar.com', '.');

View File

@@ -0,0 +1,3 @@
t.t_normal
t.t_agg
t._minmax_count_projection

View File

@@ -0,0 +1,66 @@
set log_queries=1;
set log_queries_min_type='QUERY_FINISH';
set optimize_use_implicit_projections=1;
DROP TABLE IF EXISTS t;
CREATE TABLE t
(
`id` UInt64,
`id2` UInt64,
`id3` UInt64,
PROJECTION t_normal
(
SELECT
id,
id2,
id3
ORDER BY
id2,
id,
id3
),
PROJECTION t_agg
(
SELECT
sum(id3)
GROUP BY id2
)
)
ENGINE = MergeTree
ORDER BY id
SETTINGS index_granularity = 8;
insert into t SELECT number, -number, number FROM numbers(10000);
SELECT * FROM t WHERE id2 = 3 FORMAT Null;
SELECT sum(id3) FROM t GROUP BY id2 FORMAT Null;
SELECT min(id) FROM t FORMAT Null;
SYSTEM FLUSH LOGS;
SELECT
--Remove the prefix string which is a mutable database name.
arrayStringConcat(arrayPopFront(splitByString('.', projections[1])), '.')
FROM
system.query_log
WHERE
current_database=currentDatabase() and query = 'SELECT * FROM t WHERE id2 = 3 FORMAT Null;';
SELECT
--Remove the prefix string which is a mutable database name.
arrayStringConcat(arrayPopFront(splitByString('.', projections[1])), '.')
FROM
system.query_log
WHERE
current_database=currentDatabase() and query = 'SELECT sum(id3) FROM t GROUP BY id2 FORMAT Null;';
SELECT
--Remove the prefix string which is a mutable database name.
arrayStringConcat(arrayPopFront(splitByString('.', projections[1])), '.')
FROM
system.query_log
WHERE
current_database=currentDatabase() and query = 'SELECT min(id) FROM t FORMAT Null;';
DROP TABLE t;

View File

@@ -0,0 +1,12 @@
1 0.1
1 0.2
2 0.3
2 0.4
3 0.5
3 0.6
1 0.1
1 0.2
2 0.3
2 0.4
3 0.5
3 0.6

View File

@@ -0,0 +1,34 @@
CREATE TABLE discounts
(
advertiser_id UInt64,
discount_start_date Date,
discount_end_date Nullable(Date),
amount Float64
)
ENGINE = Memory;
INSERT INTO discounts VALUES (1, '2015-01-01', Null, 0.1);
INSERT INTO discounts VALUES (1, '2015-01-15', Null, 0.2);
INSERT INTO discounts VALUES (2, '2015-01-01', '2015-01-15', 0.3);
INSERT INTO discounts VALUES (2, '2015-01-04', '2015-01-10', 0.4);
INSERT INTO discounts VALUES (3, '1970-01-01', '2015-01-15', 0.5);
INSERT INTO discounts VALUES (3, '1970-01-01', '2015-01-10', 0.6);
CREATE DICTIONARY discounts_dict
(
advertiser_id UInt64,
discount_start_date Date,
discount_end_date Nullable(Date),
amount Float64
)
PRIMARY KEY advertiser_id
SOURCE(CLICKHOUSE(TABLE discounts))
LIFETIME(MIN 600 MAX 900)
LAYOUT(RANGE_HASHED(RANGE_LOOKUP_STRATEGY 'max'))
RANGE(MIN discount_start_date MAX discount_end_date);
CREATE TABLE ids (id UInt64) ENGINE = Memory;
INSERT INTO ids SELECT * FROM numbers(10);
SELECT id, amount FROM ids INNER JOIN discounts_dict ON id = advertiser_id ORDER BY id, amount SETTINGS join_algorithm = 'direct';
SELECT id, amount FROM ids INNER JOIN discounts_dict ON id = advertiser_id ORDER BY id, amount SETTINGS allow_experimental_analyzer = 1;
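
For context: direct join drives a dictionary through a plain key-to-row interface (IKeyValueEntity), while a range dictionary needs a lookup point in addition to the key to choose among several rows per key. That mismatch is why makeTableJoin() and trySetStorageInTableJoin() now return early for DictionarySpecialKeyType::Range, and the queries above silently fall back to a regular join. A toy illustration of the interface mismatch:

#include <iostream>
#include <map>
#include <optional>

/// One key maps to several rows; only (key, point) identifies a row, so a
/// key-value lookup interface cannot serve a range dictionary.
struct RangeRow { int start; int end; double amount; };

static std::optional<double> rangeLookup(const std::multimap<int, RangeRow> & dict, int key, int point)
{
    auto [lo, hi] = dict.equal_range(key);
    for (auto it = lo; it != hi; ++it)
        if (it->second.start <= point && point <= it->second.end)
            return it->second.amount;
    return std::nullopt;
}

int main()
{
    std::multimap<int, RangeRow> discounts{{2, {1, 15, 0.3}}, {2, {4, 10, 0.4}}};
    std::cout << *rangeLookup(discounts, 2, 12) << '\n'; /// 0.3: the point disambiguates
}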

View File

@@ -0,0 +1 @@
SELECT match('', repeat('(', 100000)); -- { serverError 306 }