Merge branch 'master' into mutation-stuck

mergify[bot] 2021-07-30 08:42:45 +00:00 committed by GitHub
commit 41273ef5f3
26 changed files with 152 additions and 65 deletions

View File

@@ -19,9 +19,9 @@ Detailed description / Documentation draft:
 ...
-By adding documentation, you'll allow users to try your new feature immediately, not when someone else will have time to document it later. Documentation is necessary for all features that affect user experience in any way. You can add brief documentation draft above, or add documentation right into your patch as Markdown files in [docs](https://github.com/ClickHouse/ClickHouse/tree/master/docs) folder.
+> By adding documentation, you'll allow users to try your new feature immediately, not when someone else will have time to document it later. Documentation is necessary for all features that affect user experience in any way. You can add brief documentation draft above, or add documentation right into your patch as Markdown files in [docs](https://github.com/ClickHouse/ClickHouse/tree/master/docs) folder.
-If you are doing this for the first time, it's recommended to read the lightweight [Contributing to ClickHouse Documentation](https://github.com/ClickHouse/ClickHouse/tree/master/docs/README.md) guide first.
+> If you are doing this for the first time, it's recommended to read the lightweight [Contributing to ClickHouse Documentation](https://github.com/ClickHouse/ClickHouse/tree/master/docs/README.md) guide first.
-Information about CI checks: https://clickhouse.tech/docs/en/development/continuous-integration/
+> Information about CI checks: https://clickhouse.tech/docs/en/development/continuous-integration/

View File

@@ -259,10 +259,25 @@ private:
     Poco::Logger * log;
     BaseDaemon & daemon;
-    void onTerminate(const std::string & message, UInt32 thread_num) const
+    void onTerminate(std::string_view message, UInt32 thread_num) const
     {
+        size_t pos = message.find('\n');
         LOG_FATAL(log, "(version {}{}, {}) (from thread {}) {}",
-            VERSION_STRING, VERSION_OFFICIAL, daemon.build_id_info, thread_num, message);
+            VERSION_STRING, VERSION_OFFICIAL, daemon.build_id_info, thread_num, message.substr(0, pos));
+        /// Print trace from std::terminate exception line-by-line to make it easy for grep.
+        while (pos != std::string_view::npos)
+        {
+            ++pos;
+            size_t next_pos = message.find('\n', pos);
+            size_t size = next_pos;
+            if (next_pos != std::string_view::npos)
+                size = next_pos - pos;
+            LOG_FATAL(log, "{}", message.substr(pos, size));
+            pos = next_pos;
+        }
     }
     void onFault(
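The rewritten onTerminate logs the first line of the message together with the version header, then emits every remaining line as its own LOG_FATAL record, so each frame of a std::terminate stack trace can be matched by grep. A minimal standalone sketch of the same splitting loop, with std::printf standing in for ClickHouse's LOG_FATAL:

```cpp
#include <cstdio>
#include <string_view>

// Print the first line of `message` with a header, then each following line
// on its own log line, mirroring the loop added to onTerminate above.
void log_multiline(std::string_view message)
{
    size_t pos = message.find('\n');
    std::string_view first = message.substr(0, pos);
    std::printf("(header) %.*s\n", static_cast<int>(first.size()), first.data());

    while (pos != std::string_view::npos)
    {
        ++pos;                                  // step past the '\n'
        size_t next_pos = message.find('\n', pos);
        size_t size = next_pos;                 // npos means "rest of the string"
        if (next_pos != std::string_view::npos)
            size = next_pos - pos;

        std::string_view line = message.substr(pos, size);
        std::printf("%.*s\n", static_cast<int>(line.size()), line.data());
        pos = next_pos;
    }
}

int main()
{
    log_multiline("std::terminate called\nframe #0 foo()\nframe #1 main()");
}
```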

contrib/NuRaft vendored

@@ -1 +1 @@
-Subproject commit 976874b7aa7f422bf4ea595bb7d1166c617b1c26
+Subproject commit 0ce9490093021c63564cca159571a8b27772ad48

View File

@@ -22,6 +22,7 @@ set(SRCS
     "${LIBRARY_DIR}/src/launcher.cxx"
     "${LIBRARY_DIR}/src/srv_config.cxx"
     "${LIBRARY_DIR}/src/snapshot_sync_req.cxx"
+    "${LIBRARY_DIR}/src/snapshot_sync_ctx.cxx"
     "${LIBRARY_DIR}/src/handle_timeout.cxx"
     "${LIBRARY_DIR}/src/handle_append_entries.cxx"
     "${LIBRARY_DIR}/src/cluster_config.cxx"

View File

@@ -20,6 +20,7 @@ def get_skip_list_cmd(path):
 def get_options(i):
     options = []
+    client_options = []
     if 0 < i:
         options.append("--order=random")
@@ -27,25 +28,29 @@ def get_options(i):
         options.append("--db-engine=Ordinary")
     if i % 3 == 2:
-        options.append('''--client-option='allow_experimental_database_replicated=1' --db-engine="Replicated('/test/db/test_{}', 's1', 'r1')"'''.format(i))
+        options.append('''--db-engine="Replicated('/test/db/test_{}', 's1', 'r1')"'''.format(i))
+        client_options.append('allow_experimental_database_replicated=1')
     # If database name is not specified, new database is created for each functional test.
     # Run some threads with one database for all tests.
     if i % 2 == 1:
         options.append(" --database=test_{}".format(i))
-    if i % 7 == 0:
-        options.append(" --client-option='join_use_nulls=1'")
+    if i % 5 == 1:
+        client_options.append("join_use_nulls=1")
-    if i % 14 == 0:
-        options.append(' --client-option="join_algorithm=\'partial_merge\'"')
+    if i % 15 == 6:
+        client_options.append("join_algorithm='partial_merge'")
-    if i % 21 == 0:
-        options.append(' --client-option="join_algorithm=\'auto\'"')
-        options.append(' --client-option="max_rows_in_join=1000"')
+    if i % 15 == 11:
+        client_options.append("join_algorithm='auto'")
+        client_options.append('max_rows_in_join=1000')
     if i == 13:
-        options.append(" --client-option='memory_tracker_fault_probability=0.00001'")
+        client_options.append('memory_tracker_fault_probability=0.001')
+    if client_options:
+        options.append(" --client-option " + ' '.join(client_options))
     return ' '.join(options)

View File

@@ -11,7 +11,6 @@ set (CLICKHOUSE_COPIER_LINK
         clickhouse_functions
         clickhouse_table_functions
         clickhouse_aggregate_functions
-        clickhouse_dictionaries
         string_utils
     PUBLIC

View File

@@ -6,7 +6,6 @@ set (CLICKHOUSE_LOCAL_LINK
         clickhouse_aggregate_functions
         clickhouse_common_config
         clickhouse_common_io
-        clickhouse_dictionaries
        clickhouse_functions
         clickhouse_parsers
         clickhouse_storages_system

View File

@@ -13,7 +13,6 @@ set (CLICKHOUSE_SERVER_LINK
         clickhouse_common_config
         clickhouse_common_io
         clickhouse_common_zookeeper
-        clickhouse_dictionaries
         clickhouse_functions
         clickhouse_parsers
         clickhouse_storages_system

View File

@@ -3,6 +3,7 @@
 #include <IO/ReadHelpers.h>
 #include <DataStreams/OneBlockInputStream.h>
 #include <DataStreams/OwningBlockInputStream.h>
+#include <DataStreams/formatBlock.h>
 #include <Dictionaries/DictionarySourceHelpers.h>
 #include <Processors/Formats/InputStreamFromInputFormat.h>
 #include <IO/WriteBufferFromOStream.h>

View File

@@ -3,6 +3,7 @@
 #include <Client/HedgedConnections.h>
 #include <Common/ProfileEvents.h>
 #include <Interpreters/ClientInfo.h>
+#include <Interpreters/Context.h>
 namespace ProfileEvents
 {
@@ -21,13 +22,14 @@ namespace ErrorCodes
 HedgedConnections::HedgedConnections(
     const ConnectionPoolWithFailoverPtr & pool_,
-    const Settings & settings_,
+    ContextPtr context_,
     const ConnectionTimeouts & timeouts_,
     const ThrottlerPtr & throttler_,
     PoolMode pool_mode,
     std::shared_ptr<QualifiedTableName> table_to_check_)
-    : hedged_connections_factory(pool_, &settings_, timeouts_, table_to_check_)
-    , settings(settings_)
+    : hedged_connections_factory(pool_, &context_->getSettingsRef(), timeouts_, table_to_check_)
+    , context(std::move(context_))
+    , settings(context->getSettingsRef())
     , drain_timeout(settings.drain_timeout)
     , allow_changing_replica_until_first_data_packet(settings.allow_changing_replica_until_first_data_packet)
     , throttler(throttler_)

View File

@@ -72,7 +72,7 @@ public:
     };
     HedgedConnections(const ConnectionPoolWithFailoverPtr & pool_,
-                      const Settings & settings_,
+                      ContextPtr context_,
                       const ConnectionTimeouts & timeouts_,
                       const ThrottlerPtr & throttler,
                       PoolMode pool_mode,
@@ -188,6 +188,7 @@ private:
     Packet last_received_packet;
     Epoll epoll;
+    ContextPtr context;
     const Settings & settings;
     /// The following two fields are from settings but can be referenced outside the lifetime of
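Storing the owning ContextPtr ahead of the `const Settings &` member guarantees the referenced settings outlive every use of the reference, which is what the truncated comment above is getting at. A sketch of this keep-the-owner-alive pattern, using hypothetical stand-in types rather than ClickHouse's real Context and Settings:

```cpp
#include <memory>

// Hypothetical stand-ins for ClickHouse's ContextPtr / Settings, for illustration only.
struct Settings { int drain_timeout = 0; };
struct Context
{
    Settings settings;
    const Settings & getSettingsRef() const { return settings; }
};
using ContextPtr = std::shared_ptr<Context>;

class Consumer
{
public:
    // Taking the owner (ContextPtr) instead of a bare `const Settings &`
    // means the shared_ptr member keeps the Context, and with it the
    // Settings, alive for as long as this object exists.
    explicit Consumer(ContextPtr context_)
        : context(std::move(context_))
        , settings(context->getSettingsRef())   // safe: `context` is initialized first
    {}

private:
    ContextPtr context;          // declared before `settings`, destroyed after it
    const Settings & settings;   // always valid while *this exists
};

int main()
{
    auto ctx = std::make_shared<Context>();
    Consumer consumer(ctx);   // shares ownership; the reference cannot dangle
}
```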

View File

@@ -102,7 +102,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
         if (main_table)
             table_to_check = std::make_shared<QualifiedTableName>(main_table.getQualifiedName());
-        return std::make_shared<HedgedConnections>(pool, current_settings, timeouts, throttler, pool_mode, table_to_check);
+        return std::make_shared<HedgedConnections>(pool, context, timeouts, throttler, pool_mode, table_to_check);
     }
 #endif

View File

@@ -0,0 +1,15 @@
+#include <Core/Block.h>
+#include <DataStreams/IBlockOutputStream.h>
+#include <DataStreams/formatBlock.h>
+namespace DB
+{
+void formatBlock(BlockOutputStreamPtr & out, const Block & block)
+{
+    out->writePrefix();
+    out->write(block);
+    out->writeSuffix();
+    out->flush();
+}
+}

View File

@@ -0,0 +1,9 @@
+#pragma once
+#include <DataStreams/IBlockStream_fwd.h>
+namespace DB
+{
+void formatBlock(BlockOutputStreamPtr & out, const Block & block);
+}
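With formatBlock promoted to its own DataStreams unit, every dictionary source reuses one definition of the write-prefix/write/write-suffix/flush sequence instead of carrying a private copy. A self-contained sketch of the protocol the helper wraps, with stand-in types in place of ClickHouse's Block and IBlockOutputStream:

```cpp
#include <iostream>
#include <memory>

// Stand-ins sketching the stream protocol that formatBlock encapsulates.
struct Block { int rows = 0; };

struct IBlockOutputStream
{
    virtual ~IBlockOutputStream() = default;
    virtual void writePrefix() { std::cout << "prefix\n"; }
    virtual void write(const Block & b) { std::cout << b.rows << " rows\n"; }
    virtual void writeSuffix() { std::cout << "suffix\n"; }
    virtual void flush() { std::cout << "flush\n"; }
};

using BlockOutputStreamPtr = std::shared_ptr<IBlockOutputStream>;

// Same shape as the helper the commit moves into DataStreams/formatBlock.cpp:
// every caller gets the full prefix/block/suffix/flush sequence in one call.
void formatBlock(BlockOutputStreamPtr & out, const Block & block)
{
    out->writePrefix();
    out->write(block);
    out->writeSuffix();
    out->flush();
}

int main()
{
    BlockOutputStreamPtr out = std::make_shared<IBlockOutputStream>();
    formatBlock(out, Block{42});
}
```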

View File

@@ -1,7 +1,7 @@
 #include "DictionarySourceHelpers.h"
 #include <Columns/ColumnsNumber.h>
 #include <Core/ColumnWithTypeAndName.h>
-#include <DataStreams/IBlockOutputStream.h>
+#include <DataStreams/IBlockStream_fwd.h>
 #include <DataTypes/DataTypesNumber.h>
 #include <IO/WriteHelpers.h>
 #include "DictionaryStructure.h"
@@ -18,14 +18,6 @@ namespace ErrorCodes
     extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
 }
-void formatBlock(BlockOutputStreamPtr & out, const Block & block)
-{
-    out->writePrefix();
-    out->write(block);
-    out->writeSuffix();
-    out->flush();
-}
 /// For simple key
 Block blockForIds(

View File

@@ -13,15 +13,8 @@
 namespace DB
 {
-class IBlockOutputStream;
-using BlockOutputStreamPtr = std::shared_ptr<IBlockOutputStream>;
 struct DictionaryStructure;
-/// Write keys to block output stream.
-void formatBlock(BlockOutputStreamPtr & out, const Block & block);
 /// For simple key
 Block blockForIds(

View File

@@ -4,6 +4,7 @@
 #include <common/scope_guard.h>
 #include <DataStreams/IBlockOutputStream.h>
 #include <DataStreams/OwningBlockInputStream.h>
+#include <DataStreams/formatBlock.h>
 #include <Interpreters/Context.h>
 #include <IO/WriteHelpers.h>
 #include <IO/ReadHelpers.h>

View File

@@ -3,6 +3,7 @@
 #include <functional>
 #include <common/scope_guard.h>
 #include <DataStreams/IBlockOutputStream.h>
+#include <DataStreams/formatBlock.h>
 #include <Interpreters/Context.h>
 #include <IO/WriteHelpers.h>
 #include <IO/ReadHelpers.h>

View File

@@ -1,6 +1,7 @@
 #include "HTTPDictionarySource.h"
 #include <DataStreams/IBlockOutputStream.h>
 #include <DataStreams/OwningBlockInputStream.h>
+#include <DataStreams/formatBlock.h>
 #include <IO/ConnectionTimeouts.h>
 #include <IO/ConnectionTimeoutsContext.h>
 #include <IO/ReadWriteBufferFromHTTP.h>

View File

@@ -3214,7 +3214,11 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr loc
     const auto & partition_ast = ast->as<ASTPartition &>();
     if (!partition_ast.value)
     {
+        if (!MergeTreePartInfo::validatePartitionID(partition_ast.id, format_version))
+            throw Exception("Invalid partition format: " + partition_ast.id, ErrorCodes::INVALID_PARTITION_VALUE);
         return partition_ast.id;
     }
     if (format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
     {

View File

@@ -21,6 +21,40 @@ MergeTreePartInfo MergeTreePartInfo::fromPartName(const String & part_name, Merg
 }
+bool MergeTreePartInfo::validatePartitionID(const String & partition_id, MergeTreeDataFormatVersion format_version)
+{
+    if (partition_id.empty())
+        return false;
+    ReadBufferFromString in(partition_id);
+    if (format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
+    {
+        UInt32 min_yyyymmdd = 0;
+        UInt32 max_yyyymmdd = 0;
+        if (!tryReadIntText(min_yyyymmdd, in)
+            || !checkChar('_', in)
+            || !tryReadIntText(max_yyyymmdd, in)
+            || !checkChar('_', in))
+        {
+            return false;
+        }
+    }
+    else
+    {
+        while (!in.eof())
+        {
+            char c;
+            readChar(c, in);
+            if (c == '_')
+                break;
+        }
+    }
+    return in.eof();
+}
 bool MergeTreePartInfo::tryParsePartName(const String & part_name, MergeTreePartInfo * part_info, MergeTreeDataFormatVersion format_version)
 {
     ReadBufferFromString in(part_name);
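In the custom-partitioning branch, the validator consumes characters up to the first '_' and then requires end-of-buffer, so a partition ID passes only if it contains no '_' except possibly as its final character; a full part name such as '20210325_0_13241_6_12747' is therefore rejected. A standalone re-implementation of just that branch, with a few worked cases (names here are illustrative, not ClickHouse's):

```cpp
#include <cassert>
#include <string_view>

// Equivalent to the read-until-'_'-then-require-eof loop above: valid iff
// nothing follows the first '_' (or there is no '_' at all).
bool validate_custom_partition_id(std::string_view partition_id)
{
    if (partition_id.empty())
        return false;
    size_t pos = partition_id.find('_');
    return pos == std::string_view::npos || pos + 1 == partition_id.size();
}

int main()
{
    assert(validate_custom_partition_id("20210325"));                  // plain ID: accepted
    assert(!validate_custom_partition_id(""));                         // empty: rejected
    assert(!validate_custom_partition_id("20210325_0_13241_6_12747")); // full part name: rejected
}
```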

View File

@@ -86,6 +86,9 @@ struct MergeTreePartInfo
         return static_cast<UInt64>(max_block - min_block + 1);
     }
+    /// Simple sanity check for partition ID. Checking that it's not too long or too short, doesn't contain a lot of '_'.
+    static bool validatePartitionID(const String & partition_id, MergeTreeDataFormatVersion format_version);
     static MergeTreePartInfo fromPartName(const String & part_name, MergeTreeDataFormatVersion format_version); // -V1071
     static bool tryParsePartName(const String & part_name, MergeTreePartInfo * part_info, MergeTreeDataFormatVersion format_version);

View File

@@ -175,7 +175,8 @@ def configure_testcase_args(args, case_file, suite_tmp_dir, stderr_file):
     database = 'test_{suffix}'.format(suffix=random_str())
     with open(stderr_file, 'w') as stderr:
-        clickhouse_proc_create = Popen(shlex.split(testcase_args.testcase_client), stdin=PIPE, stdout=PIPE, stderr=stderr, universal_newlines=True)
+        client_cmd = testcase_args.testcase_client + " " + get_additional_client_options(args)
+        clickhouse_proc_create = Popen(shlex.split(client_cmd), stdin=PIPE, stdout=PIPE, stderr=stderr, universal_newlines=True)
         try:
             clickhouse_proc_create.communicate(("CREATE DATABASE " + database + get_db_engine(testcase_args, database)), timeout=testcase_args.timeout)
         except TimeoutExpired:
@@ -937,7 +938,8 @@ def main(args):
     def create_common_database(args, db_name):
         create_database_retries = 0
         while create_database_retries < MAX_RETRIES:
-            clickhouse_proc_create = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True)
+            client_cmd = args.client + " " + get_additional_client_options(args)
+            clickhouse_proc_create = Popen(shlex.split(client_cmd), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True)
             (stdout, stderr) = clickhouse_proc_create.communicate(("CREATE DATABASE IF NOT EXISTS " + db_name + get_db_engine(args, db_name)))
             if not need_retry(stdout, stderr):
                 break

View File

@@ -6,20 +6,14 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
 $CLICKHOUSE_CLIENT -n --query "
-DROP USER IF EXISTS quoted_by_ip;
-DROP USER IF EXISTS quoted_by_forwarded_ip;
+CREATE USER quoted_by_ip_${CLICKHOUSE_DATABASE};
+CREATE USER quoted_by_forwarded_ip_${CLICKHOUSE_DATABASE};
-DROP QUOTA IF EXISTS quota_by_ip;
-DROP QUOTA IF EXISTS quota_by_forwarded_ip;
+GRANT SELECT, CREATE ON *.* TO quoted_by_ip_${CLICKHOUSE_DATABASE};
+GRANT SELECT, CREATE ON *.* TO quoted_by_forwarded_ip_${CLICKHOUSE_DATABASE};
-CREATE USER quoted_by_ip;
-CREATE USER quoted_by_forwarded_ip;
-GRANT SELECT, CREATE ON *.* TO quoted_by_ip;
-GRANT SELECT, CREATE ON *.* TO quoted_by_forwarded_ip;
-CREATE QUOTA quota_by_ip KEYED BY ip_address FOR RANDOMIZED INTERVAL 1 YEAR MAX QUERIES = 1 TO quoted_by_ip;
-CREATE QUOTA quota_by_forwarded_ip KEYED BY forwarded_ip_address FOR RANDOMIZED INTERVAL 1 YEAR MAX QUERIES = 1 TO quoted_by_forwarded_ip;
+CREATE QUOTA quota_by_ip_${CLICKHOUSE_DATABASE} KEYED BY ip_address FOR RANDOMIZED INTERVAL 1 YEAR MAX QUERIES = 1 TO quoted_by_ip_${CLICKHOUSE_DATABASE};
+CREATE QUOTA quota_by_forwarded_ip_${CLICKHOUSE_DATABASE} KEYED BY forwarded_ip_address FOR RANDOMIZED INTERVAL 1 YEAR MAX QUERIES = 1 TO quoted_by_forwarded_ip_${CLICKHOUSE_DATABASE};
 "
 # Note: the test can be flaky if the randomized interval will end while the loop is run. But with year long interval it's unlikely.
@@ -28,39 +22,39 @@ CREATE QUOTA quota_by_forwarded_ip KEYED BY forwarded_ip_address FOR RANDOMIZED
 echo '--- Test with quota by immediate IP ---'
 while true; do
-    $CLICKHOUSE_CLIENT --user quoted_by_ip --query "SELECT count() FROM numbers(10)" 2>/dev/null || break
+    ${CLICKHOUSE_CURL} --fail -sS "${CLICKHOUSE_URL}&user=quoted_by_ip_${CLICKHOUSE_DATABASE}" -d "SELECT count() FROM numbers(10)" 2>/dev/null || break
 done | uniq
-${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&user=quoted_by_ip" -d "SELECT count() FROM numbers(10)" | grep -oF 'exceeded'
+${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&user=quoted_by_ip_${CLICKHOUSE_DATABASE}" -d "SELECT count() FROM numbers(10)" | grep -oF 'exceeded'
 # X-Forwarded-For is ignored for quota by immediate IP address
-${CLICKHOUSE_CURL} -H 'X-Forwarded-For: 1.2.3.4' -sS "${CLICKHOUSE_URL}&user=quoted_by_ip" -d "SELECT count() FROM numbers(10)" | grep -oF 'exceeded'
+${CLICKHOUSE_CURL} -H 'X-Forwarded-For: 1.2.3.4' -sS "${CLICKHOUSE_URL}&user=quoted_by_ip_${CLICKHOUSE_DATABASE}" -d "SELECT count() FROM numbers(10)" | grep -oF 'exceeded'
 echo '--- Test with quota by forwarded IP ---'
 while true; do
-    $CLICKHOUSE_CLIENT --user quoted_by_forwarded_ip --query "SELECT count() FROM numbers(10)" 2>/dev/null || break
+    ${CLICKHOUSE_CURL} --fail -sS "${CLICKHOUSE_URL}&user=quoted_by_forwarded_ip_${CLICKHOUSE_DATABASE}" -d "SELECT count() FROM numbers(10)" 2>/dev/null || break
 done | uniq
-${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&user=quoted_by_forwarded_ip" -d "SELECT count() FROM numbers(10)" | grep -oF 'exceeded'
+${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&user=quoted_by_forwarded_ip_${CLICKHOUSE_DATABASE}" -d "SELECT count() FROM numbers(10)" | grep -oF 'exceeded'
 # X-Forwarded-For is respected for quota by forwarded IP address
 while true; do
-    ${CLICKHOUSE_CURL} -H 'X-Forwarded-For: 1.2.3.4' -sS "${CLICKHOUSE_URL}&user=quoted_by_forwarded_ip" -d "SELECT count() FROM numbers(10)" | grep -oP '^10$' || break
+    ${CLICKHOUSE_CURL} -H 'X-Forwarded-For: 1.2.3.4' -sS "${CLICKHOUSE_URL}&user=quoted_by_forwarded_ip_${CLICKHOUSE_DATABASE}" -d "SELECT count() FROM numbers(10)" | grep -oP '^10$' || break
 done | uniq
-${CLICKHOUSE_CURL} -H 'X-Forwarded-For: 1.2.3.4' -sS "${CLICKHOUSE_URL}&user=quoted_by_forwarded_ip" -d "SELECT count() FROM numbers(10)" | grep -oF 'exceeded'
+${CLICKHOUSE_CURL} -H 'X-Forwarded-For: 1.2.3.4' -sS "${CLICKHOUSE_URL}&user=quoted_by_forwarded_ip_${CLICKHOUSE_DATABASE}" -d "SELECT count() FROM numbers(10)" | grep -oF 'exceeded'
 # Only the last IP address is trusted
-${CLICKHOUSE_CURL} -H 'X-Forwarded-For: 5.6.7.8, 1.2.3.4' -sS "${CLICKHOUSE_URL}&user=quoted_by_forwarded_ip" -d "SELECT count() FROM numbers(10)" | grep -oF 'exceeded'
+${CLICKHOUSE_CURL} -H 'X-Forwarded-For: 5.6.7.8, 1.2.3.4' -sS "${CLICKHOUSE_URL}&user=quoted_by_forwarded_ip_${CLICKHOUSE_DATABASE}" -d "SELECT count() FROM numbers(10)" | grep -oF 'exceeded'
-${CLICKHOUSE_CURL} -H 'X-Forwarded-For: 1.2.3.4, 5.6.7.8' -sS "${CLICKHOUSE_URL}&user=quoted_by_forwarded_ip" -d "SELECT count() FROM numbers(10)"
+${CLICKHOUSE_CURL} -H 'X-Forwarded-For: 1.2.3.4, 5.6.7.8' -sS "${CLICKHOUSE_URL}&user=quoted_by_forwarded_ip_${CLICKHOUSE_DATABASE}" -d "SELECT count() FROM numbers(10)"
 $CLICKHOUSE_CLIENT -n --query "
-DROP QUOTA IF EXISTS quota_by_ip;
+DROP QUOTA IF EXISTS quota_by_ip_${CLICKHOUSE_DATABASE};
-DROP QUOTA IF EXISTS quota_by_forwarded_ip;
-DROP USER IF EXISTS quoted_by_ip;
-DROP USER IF EXISTS quoted_by_forwarded_ip;
+DROP USER IF EXISTS quoted_by_ip_${CLICKHOUSE_DATABASE};
+DROP USER IF EXISTS quoted_by_forwarded_ip_${CLICKHOUSE_DATABASE};
 "

View File

@@ -0,0 +1,16 @@
+DROP TABLE IF EXISTS broken_partition;
+CREATE TABLE broken_partition
+(
+    date Date,
+    key UInt64
+)
+ENGINE = ReplicatedMergeTree('/clickhouse/test_01925_{database}/rmt', 'r1')
+ORDER BY tuple()
+PARTITION BY date;
+ALTER TABLE broken_partition DROP PARTITION ID '20210325_0_13241_6_12747'; --{serverError 248}
+ALTER TABLE broken_partition DROP PARTITION ID '20210325_0_13241_6_12747'; --{serverError 248}
+DROP TABLE IF EXISTS broken_partition;