Merge branch 'master' into more-batching-keeper

Antonio Andelic 2023-03-24 19:44:29 +00:00
commit e38aa298f4
167 changed files with 3569 additions and 1711 deletions

View File

@ -10,31 +10,38 @@ import requests
import tempfile
DEFAULT_URL = 'https://clickhouse-datasets.s3.amazonaws.com'
DEFAULT_URL = "https://clickhouse-datasets.s3.amazonaws.com"
AVAILABLE_DATASETS = {
'hits': 'hits_v1.tar',
'visits': 'visits_v1.tar',
"hits": "hits_v1.tar",
"visits": "visits_v1.tar",
}
RETRIES_COUNT = 5
def _get_temp_file_name():
return os.path.join(tempfile._get_default_tempdir(), next(tempfile._get_candidate_names()))
return os.path.join(
tempfile._get_default_tempdir(), next(tempfile._get_candidate_names())
)
def build_url(base_url, dataset):
return os.path.join(base_url, dataset, 'partitions', AVAILABLE_DATASETS[dataset])
return os.path.join(base_url, dataset, "partitions", AVAILABLE_DATASETS[dataset])
def dowload_with_progress(url, path):
logging.info("Downloading from %s to temp path %s", url, path)
for i in range(RETRIES_COUNT):
try:
with open(path, 'wb') as f:
with open(path, "wb") as f:
response = requests.get(url, stream=True)
response.raise_for_status()
total_length = response.headers.get('content-length')
total_length = response.headers.get("content-length")
if total_length is None or int(total_length) == 0:
logging.info("No content-length, will download file without progress")
logging.info(
"No content-length, will download file without progress"
)
f.write(response.content)
else:
dl = 0
@ -46,7 +53,11 @@ def dowload_with_progress(url, path):
if sys.stdout.isatty():
done = int(50 * dl / total_length)
percent = int(100 * float(dl) / total_length)
sys.stdout.write("\r[{}{}] {}%".format('=' * done, ' ' * (50-done), percent))
sys.stdout.write(
"\r[{}{}] {}%".format(
"=" * done, " " * (50 - done), percent
)
)
sys.stdout.flush()
break
except Exception as ex:
@ -56,14 +67,21 @@ def dowload_with_progress(url, path):
if os.path.exists(path):
os.remove(path)
else:
raise Exception("Cannot download dataset from {}, all retries exceeded".format(url))
raise Exception(
"Cannot download dataset from {}, all retries exceeded".format(url)
)
sys.stdout.write("\n")
logging.info("Downloading finished")
def unpack_to_clickhouse_directory(tar_path, clickhouse_path):
logging.info("Will unpack data from temp path %s to clickhouse db %s", tar_path, clickhouse_path)
with tarfile.open(tar_path, 'r') as comp_file:
logging.info(
"Will unpack data from temp path %s to clickhouse db %s",
tar_path,
clickhouse_path,
)
with tarfile.open(tar_path, "r") as comp_file:
comp_file.extractall(path=clickhouse_path)
logging.info("Unpack finished")
@ -72,15 +90,21 @@ if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
parser = argparse.ArgumentParser(
description="Simple tool for dowloading datasets for clickhouse from S3")
description="Simple tool for dowloading datasets for clickhouse from S3"
)
parser.add_argument('--dataset-names', required=True, nargs='+', choices=list(AVAILABLE_DATASETS.keys()))
parser.add_argument('--url-prefix', default=DEFAULT_URL)
parser.add_argument('--clickhouse-data-path', default='/var/lib/clickhouse/')
parser.add_argument(
"--dataset-names",
required=True,
nargs="+",
choices=list(AVAILABLE_DATASETS.keys()),
)
parser.add_argument("--url-prefix", default=DEFAULT_URL)
parser.add_argument("--clickhouse-data-path", default="/var/lib/clickhouse/")
args = parser.parse_args()
datasets = args.dataset_names
logging.info("Will fetch following datasets: %s", ', '.join(datasets))
logging.info("Will fetch following datasets: %s", ", ".join(datasets))
for dataset in datasets:
logging.info("Processing %s", dataset)
temp_archive_path = _get_temp_file_name()
@ -92,10 +116,11 @@ if __name__ == "__main__":
logging.info("Some exception occured %s", str(ex))
raise
finally:
logging.info("Will remove downloaded file %s from filesystem if it exists", temp_archive_path)
logging.info(
"Will remove downloaded file %s from filesystem if it exists",
temp_archive_path,
)
if os.path.exists(temp_archive_path):
os.remove(temp_archive_path)
logging.info("Processing of %s finished", dataset)
logging.info("Fetch finished, enjoy your tables!")

View File

@ -11,13 +11,14 @@ RUN apt-get update && env DEBIAN_FRONTEND=noninteractive apt-get install --yes \
aspell \
curl \
git \
file \
libxml2-utils \
moreutils \
python3-fuzzywuzzy \
python3-pip \
shellcheck \
yamllint \
&& pip3 install black==22.8.0 boto3 codespell==2.2.1 dohq-artifactory mypy PyGithub unidiff pylint==2.6.2 \
&& pip3 install black==23.1.0 boto3 codespell==2.2.1 dohq-artifactory mypy PyGithub unidiff pylint==2.6.2 \
&& apt-get clean \
&& rm -rf /root/.cache/pip

View File

@ -1238,7 +1238,7 @@ Formats a Time according to the given Format string. Format is a constant expres
formatDateTime uses MySQL datetime format style, refer to https://dev.mysql.com/doc/refman/8.0/en/date-and-time-functions.html#function_date-format.
The opposite operation of this function is [formatDateTime](/docs/en/sql-reference/functions/type-conversion-functions.md#formatdatetime).
The opposite operation of this function is [parseDateTime](/docs/en/sql-reference/functions/type-conversion-functions.md#type_conversion_functions-parseDateTime).
Alias: `DATE_FORMAT`.
@ -1334,7 +1334,7 @@ Result:
Similar to formatDateTime, except that it formats datetime in Joda style instead of MySQL style. Refer to https://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html.
The opposite operation of this function is [formatDateTimeInJodaSyntax](/docs/en/sql-reference/functions/type-conversion-functions.md#formatdatetimeinjodasyntax).
The opposite operation of this function is [parseDateTimeInJodaSyntax](/docs/en/sql-reference/functions/type-conversion-functions.md#type_conversion_functions-parseDateTimeInJodaSyntax).
**Replacement fields**

View File

@ -1148,9 +1148,10 @@ Result:
└───────────────────────────┴──────────────────────────────┘
```
## parseDateTime
## parseDateTime {#type_conversion_functions-parseDateTime}
Converts a [String](/docs/en/sql-reference/data-types/string.md) to [DateTime](/docs/en/sql-reference/data-types/datetime.md) according to a [MySQL format string](https://dev.mysql.com/doc/refman/8.0/en/date-and-time-functions.html#function_date-format).
This function is the opposite operation of function [formatDateTime](/docs/en/sql-reference/functions/date-time-functions.md#date_time_functions-formatDateTime).
**Syntax**
@ -1163,6 +1164,7 @@ parseDateTime(str, format[, timezone])
- `str` — the String to be parsed
- `format` — the format string
- `timezone` — [Timezone](/docs/en/operations/server-configuration-parameters/settings.md/#server_configuration_parameters-timezone). Optional.
**Returned value(s)**
@ -1186,9 +1188,10 @@ SELECT parseDateTime('2021-01-04+23:00:00', '%Y-%m-%d+%H:%i:%s')
Alias: `TO_TIMESTAMP`.
## parseDateTimeInJodaSyntax
## parseDateTimeInJodaSyntax {#type_conversion_functions-parseDateTimeInJodaSyntax}
Similar to [parseDateTime](#parsedatetime), except that the format string is in [Joda](https://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html) instead of MySQL syntax.
This function is the opposite operation of function [formatDateTimeInJodaSyntax](/docs/en/sql-reference/functions/date-time-functions.md#date_time_functions-formatDateTimeInJodaSyntax).
**Syntax**
@ -1201,6 +1204,7 @@ parseDateTimeInJodaSyntax(str, format[, timezone])
- `str` — the String to be parsed
- `format` — the format string
- `timezone` — [Timezone](/docs/en/operations/server-configuration-parameters/settings.md/#server_configuration_parameters-timezone). Optional.
**Returned value(s)**

View File

@ -674,18 +674,16 @@ void ReplicatedAccessStorage::backup(BackupEntriesCollector & backup_entries_col
backup_entries_collector.getContext()->getAccessControl());
auto backup_coordination = backup_entries_collector.getBackupCoordination();
String current_host_id = backup_entries_collector.getBackupSettings().host_id;
backup_coordination->addReplicatedAccessFilePath(zookeeper_path, type, current_host_id, backup_entry_with_path.first);
backup_coordination->addReplicatedAccessFilePath(zookeeper_path, type, backup_entry_with_path.first);
backup_entries_collector.addPostTask(
[backup_entry = backup_entry_with_path.second,
zookeeper_path = zookeeper_path,
type,
current_host_id,
&backup_entries_collector,
backup_coordination]
{
for (const String & path : backup_coordination->getReplicatedAccessFilePaths(zookeeper_path, type, current_host_id))
for (const String & path : backup_coordination->getReplicatedAccessFilePaths(zookeeper_path, type))
backup_entries_collector.addBackupEntry(path, backup_entry);
});
}

View File

@ -49,7 +49,7 @@ QueryTreeNodePtr ArrayJoinNode::cloneImpl() const
return std::make_shared<ArrayJoinNode>(getTableExpression(), getJoinExpressionsNode(), is_left);
}
ASTPtr ArrayJoinNode::toASTImpl() const
ASTPtr ArrayJoinNode::toASTImpl(const ConvertToASTOptions & options) const
{
auto array_join_ast = std::make_shared<ASTArrayJoin>();
array_join_ast->kind = is_left ? ASTArrayJoin::Kind::Left : ASTArrayJoin::Kind::Inner;
@ -63,9 +63,9 @@ ASTPtr ArrayJoinNode::toASTImpl() const
auto * column_node = array_join_expression->as<ColumnNode>();
if (column_node && column_node->getExpression())
array_join_expression_ast = column_node->getExpression()->toAST();
array_join_expression_ast = column_node->getExpression()->toAST(options);
else
array_join_expression_ast = array_join_expression->toAST();
array_join_expression_ast = array_join_expression->toAST(options);
array_join_expression_ast->setAlias(array_join_expression->getAlias());
array_join_expressions_ast->children.push_back(std::move(array_join_expression_ast));
@ -75,7 +75,7 @@ ASTPtr ArrayJoinNode::toASTImpl() const
array_join_ast->expression_list = array_join_ast->children.back();
ASTPtr tables_in_select_query_ast = std::make_shared<ASTTablesInSelectQuery>();
addTableExpressionOrJoinIntoTablesInSelectQuery(tables_in_select_query_ast, children[table_expression_child_index]);
addTableExpressionOrJoinIntoTablesInSelectQuery(tables_in_select_query_ast, children[table_expression_child_index], options);
auto array_join_query_element_ast = std::make_shared<ASTTablesInSelectQueryElement>();
array_join_query_element_ast->children.push_back(std::move(array_join_ast));

View File

@ -99,7 +99,7 @@ protected:
QueryTreeNodePtr cloneImpl() const override;
ASTPtr toASTImpl() const override;
ASTPtr toASTImpl(const ConvertToASTOptions & options) const override;
private:
bool is_left = false;

View File

@ -91,12 +91,12 @@ QueryTreeNodePtr ColumnNode::cloneImpl() const
return std::make_shared<ColumnNode>(column, getSourceWeakPointer());
}
ASTPtr ColumnNode::toASTImpl() const
ASTPtr ColumnNode::toASTImpl(const ConvertToASTOptions & options) const
{
std::vector<std::string> column_identifier_parts;
auto column_source = getColumnSourceOrNull();
if (column_source)
if (column_source && options.fully_qualified_identifiers)
{
auto node_type = column_source->getNodeType();
if (node_type == QueryTreeNodeType::TABLE ||

View File

@ -132,7 +132,7 @@ protected:
QueryTreeNodePtr cloneImpl() const override;
ASTPtr toASTImpl() const override;
ASTPtr toASTImpl(const ConvertToASTOptions & options) const override;
private:
const QueryTreeNodeWeakPtr & getSourceWeakPointer() const

View File

@ -91,7 +91,7 @@ QueryTreeNodePtr ApplyColumnTransformerNode::cloneImpl() const
return std::make_shared<ApplyColumnTransformerNode>(getExpressionNode());
}
ASTPtr ApplyColumnTransformerNode::toASTImpl() const
ASTPtr ApplyColumnTransformerNode::toASTImpl(const ConvertToASTOptions & options) const
{
auto ast_apply_transformer = std::make_shared<ASTColumnsApplyTransformer>();
const auto & expression_node = getExpressionNode();
@ -100,14 +100,14 @@ ASTPtr ApplyColumnTransformerNode::toASTImpl() const
{
auto & function_expression = expression_node->as<FunctionNode &>();
ast_apply_transformer->func_name = function_expression.getFunctionName();
ast_apply_transformer->parameters = function_expression.getParametersNode()->toAST();
ast_apply_transformer->parameters = function_expression.getParametersNode()->toAST(options);
}
else
{
auto & lambda_expression = expression_node->as<LambdaNode &>();
if (!lambda_expression.getArgumentNames().empty())
ast_apply_transformer->lambda_arg = lambda_expression.getArgumentNames()[0];
ast_apply_transformer->lambda = lambda_expression.toAST();
ast_apply_transformer->lambda = lambda_expression.toAST(options);
}
return ast_apply_transformer;
@ -227,7 +227,7 @@ QueryTreeNodePtr ExceptColumnTransformerNode::cloneImpl() const
return std::make_shared<ExceptColumnTransformerNode>(except_column_names, is_strict);
}
ASTPtr ExceptColumnTransformerNode::toASTImpl() const
ASTPtr ExceptColumnTransformerNode::toASTImpl(const ConvertToASTOptions & /* options */) const
{
auto ast_except_transformer = std::make_shared<ASTColumnsExceptTransformer>();
@ -334,7 +334,7 @@ QueryTreeNodePtr ReplaceColumnTransformerNode::cloneImpl() const
return result_replace_transformer;
}
ASTPtr ReplaceColumnTransformerNode::toASTImpl() const
ASTPtr ReplaceColumnTransformerNode::toASTImpl(const ConvertToASTOptions & options) const
{
auto ast_replace_transformer = std::make_shared<ASTColumnsReplaceTransformer>();
@ -347,7 +347,7 @@ ASTPtr ReplaceColumnTransformerNode::toASTImpl() const
{
auto replacement_ast = std::make_shared<ASTColumnsReplaceTransformer::Replacement>();
replacement_ast->name = replacements_names[i];
replacement_ast->children.push_back(replacement_expressions_nodes[i]->toAST());
replacement_ast->children.push_back(replacement_expressions_nodes[i]->toAST(options));
ast_replace_transformer->children.push_back(std::move(replacement_ast));
}

View File

@ -141,7 +141,7 @@ protected:
QueryTreeNodePtr cloneImpl() const override;
ASTPtr toASTImpl() const override;
ASTPtr toASTImpl(const ConvertToASTOptions & options) const override;
private:
ApplyColumnTransformerType apply_transformer_type = ApplyColumnTransformerType::LAMBDA;
@ -220,7 +220,7 @@ protected:
QueryTreeNodePtr cloneImpl() const override;
ASTPtr toASTImpl() const override;
ASTPtr toASTImpl(const ConvertToASTOptions & options) const override;
private:
ExceptColumnTransformerType except_transformer_type;
@ -298,7 +298,7 @@ protected:
QueryTreeNodePtr cloneImpl() const override;
ASTPtr toASTImpl() const override;
ASTPtr toASTImpl(const ConvertToASTOptions & options) const override;
private:
ListNode & getReplacements()

View File

@ -75,11 +75,14 @@ QueryTreeNodePtr ConstantNode::cloneImpl() const
return std::make_shared<ConstantNode>(constant_value, source_expression);
}
ASTPtr ConstantNode::toASTImpl() const
ASTPtr ConstantNode::toASTImpl(const ConvertToASTOptions & options) const
{
const auto & constant_value_literal = constant_value->getValue();
auto constant_value_ast = std::make_shared<ASTLiteral>(constant_value_literal);
if (!options.add_cast_for_constants)
return constant_value_ast;
bool need_to_add_cast_function = false;
auto constant_value_literal_type = constant_value_literal.getType();
WhichDataType constant_value_type(constant_value->getType());

View File

@ -83,7 +83,7 @@ protected:
QueryTreeNodePtr cloneImpl() const override;
ASTPtr toASTImpl() const override;
ASTPtr toASTImpl(const ConvertToASTOptions & options) const override;
private:
ConstantValuePtr constant_value;

View File

@ -197,7 +197,7 @@ QueryTreeNodePtr FunctionNode::cloneImpl() const
return result_function;
}
ASTPtr FunctionNode::toASTImpl() const
ASTPtr FunctionNode::toASTImpl(const ConvertToASTOptions & options) const
{
auto function_ast = std::make_shared<ASTFunction>();
@ -212,12 +212,12 @@ ASTPtr FunctionNode::toASTImpl() const
const auto & parameters = getParameters();
if (!parameters.getNodes().empty())
{
function_ast->children.push_back(parameters.toAST());
function_ast->children.push_back(parameters.toAST(options));
function_ast->parameters = function_ast->children.back();
}
const auto & arguments = getArguments();
function_ast->children.push_back(arguments.toAST());
function_ast->children.push_back(arguments.toAST(options));
function_ast->arguments = function_ast->children.back();
auto window_node = getWindowNode();
@ -226,7 +226,7 @@ ASTPtr FunctionNode::toASTImpl() const
if (auto * identifier_node = window_node->as<IdentifierNode>())
function_ast->window_name = identifier_node->getIdentifier().getFullName();
else
function_ast->window_definition = window_node->toAST();
function_ast->window_definition = window_node->toAST(options);
}
return function_ast;

View File

@ -209,7 +209,7 @@ protected:
QueryTreeNodePtr cloneImpl() const override;
ASTPtr toASTImpl() const override;
ASTPtr toASTImpl(const ConvertToASTOptions & options) const override;
private:
String function_name;

View File

@ -331,9 +331,9 @@ QueryTreeNodePtr IQueryTreeNode::cloneAndReplace(const QueryTreeNodePtr & node_t
return cloneAndReplace(replacement_map);
}
ASTPtr IQueryTreeNode::toAST() const
ASTPtr IQueryTreeNode::toAST(const ConvertToASTOptions & options) const
{
auto converted_node = toASTImpl();
auto converted_node = toASTImpl(options);
if (auto * ast_with_alias = dynamic_cast<ASTWithAlias *>(converted_node.get()))
converted_node->setAlias(alias);

View File

@ -181,8 +181,17 @@ public:
*/
String formatOriginalASTForErrorMessage() const;
struct ConvertToASTOptions
{
/// Add _CAST if constant literal type is different from column type
bool add_cast_for_constants = true;
/// Identifiers are fully qualified (`database.table.column`), otherwise names are just column names (`column`)
bool fully_qualified_identifiers = true;
};
/// Convert query tree to AST
ASTPtr toAST() const;
ASTPtr toAST(const ConvertToASTOptions & options = { .add_cast_for_constants = true, .fully_qualified_identifiers = true }) const;
/// Convert query tree to AST and then format it for error message.
String formatConvertedASTForErrorMessage() const;
@ -258,7 +267,7 @@ protected:
virtual QueryTreeNodePtr cloneImpl() const = 0;
/// Subclass must convert its internal state and its children to AST
virtual ASTPtr toASTImpl() const = 0;
virtual ASTPtr toASTImpl(const ConvertToASTOptions & options) const = 0;
QueryTreeNodes children;
QueryTreeWeakNodes weak_pointers;

View File

@ -58,7 +58,7 @@ QueryTreeNodePtr IdentifierNode::cloneImpl() const
return std::make_shared<IdentifierNode>(identifier);
}
ASTPtr IdentifierNode::toASTImpl() const
ASTPtr IdentifierNode::toASTImpl(const ConvertToASTOptions & /* options */) const
{
auto identifier_parts = identifier.getParts();
return std::make_shared<ASTIdentifier>(std::move(identifier_parts));

View File

@ -59,7 +59,7 @@ protected:
QueryTreeNodePtr cloneImpl() const override;
ASTPtr toASTImpl() const override;
ASTPtr toASTImpl(const ConvertToASTOptions & options) const override;
private:
Identifier identifier;

View File

@ -44,11 +44,11 @@ QueryTreeNodePtr InterpolateNode::cloneImpl() const
return std::make_shared<InterpolateNode>(nullptr /*expression*/, nullptr /*interpolate_expression*/);
}
ASTPtr InterpolateNode::toASTImpl() const
ASTPtr InterpolateNode::toASTImpl(const ConvertToASTOptions & options) const
{
auto result = std::make_shared<ASTInterpolateElement>();
result->column = getExpression()->toAST()->getColumnName();
result->children.push_back(getInterpolateExpression()->toAST());
result->column = getExpression()->toAST(options)->getColumnName();
result->children.push_back(getInterpolateExpression()->toAST(options));
result->expr = result->children.back();
return result;

View File

@ -59,7 +59,7 @@ protected:
QueryTreeNodePtr cloneImpl() const override;
ASTPtr toASTImpl() const override;
ASTPtr toASTImpl(const ConvertToASTOptions & options) const override;
private:
static constexpr size_t expression_child_index = 0;

View File

@ -99,17 +99,17 @@ QueryTreeNodePtr JoinNode::cloneImpl() const
return std::make_shared<JoinNode>(getLeftTableExpression(), getRightTableExpression(), getJoinExpression(), locality, strictness, kind);
}
ASTPtr JoinNode::toASTImpl() const
ASTPtr JoinNode::toASTImpl(const ConvertToASTOptions & options) const
{
ASTPtr tables_in_select_query_ast = std::make_shared<ASTTablesInSelectQuery>();
addTableExpressionOrJoinIntoTablesInSelectQuery(tables_in_select_query_ast, children[left_table_expression_child_index]);
addTableExpressionOrJoinIntoTablesInSelectQuery(tables_in_select_query_ast, children[left_table_expression_child_index], options);
size_t join_table_index = tables_in_select_query_ast->children.size();
auto join_ast = toASTTableJoin();
addTableExpressionOrJoinIntoTablesInSelectQuery(tables_in_select_query_ast, children[right_table_expression_child_index]);
addTableExpressionOrJoinIntoTablesInSelectQuery(tables_in_select_query_ast, children[right_table_expression_child_index], options);
auto & table_element = tables_in_select_query_ast->children.at(join_table_index)->as<ASTTablesInSelectQueryElement &>();
table_element.children.push_back(std::move(join_ast));

View File

@ -148,7 +148,7 @@ protected:
QueryTreeNodePtr cloneImpl() const override;
ASTPtr toASTImpl() const override;
ASTPtr toASTImpl(const ConvertToASTOptions & options) const override;
private:
JoinLocality locality = JoinLocality::Unspecified;

View File

@ -65,17 +65,17 @@ QueryTreeNodePtr LambdaNode::cloneImpl() const
return std::make_shared<LambdaNode>(argument_names, getExpression());
}
ASTPtr LambdaNode::toASTImpl() const
ASTPtr LambdaNode::toASTImpl(const ConvertToASTOptions & options) const
{
auto lambda_function_arguments_ast = std::make_shared<ASTExpressionList>();
auto tuple_function = std::make_shared<ASTFunction>();
tuple_function->name = "tuple";
tuple_function->children.push_back(children[arguments_child_index]->toAST());
tuple_function->children.push_back(children[arguments_child_index]->toAST(options));
tuple_function->arguments = tuple_function->children.back();
lambda_function_arguments_ast->children.push_back(std::move(tuple_function));
lambda_function_arguments_ast->children.push_back(children[expression_child_index]->toAST());
lambda_function_arguments_ast->children.push_back(children[expression_child_index]->toAST(options));
auto lambda_function_ast = std::make_shared<ASTFunction>();
lambda_function_ast->name = "lambda";

View File

@ -98,7 +98,7 @@ protected:
QueryTreeNodePtr cloneImpl() const override;
ASTPtr toASTImpl() const override;
ASTPtr toASTImpl(const ConvertToASTOptions & options) const override;
private:
Names argument_names;

View File

@ -54,7 +54,7 @@ QueryTreeNodePtr ListNode::cloneImpl() const
return std::make_shared<ListNode>();
}
ASTPtr ListNode::toASTImpl() const
ASTPtr ListNode::toASTImpl(const ConvertToASTOptions & options) const
{
auto expression_list_ast = std::make_shared<ASTExpressionList>();
@ -62,7 +62,7 @@ ASTPtr ListNode::toASTImpl() const
expression_list_ast->children.resize(children_size);
for (size_t i = 0; i < children_size; ++i)
expression_list_ast->children[i] = children[i]->toAST();
expression_list_ast->children[i] = children[i]->toAST(options);
return expression_list_ast;
}

View File

@ -57,7 +57,7 @@ protected:
QueryTreeNodePtr cloneImpl() const override;
ASTPtr toASTImpl() const override;
ASTPtr toASTImpl(const ConvertToASTOptions & options) const override;
};
}

View File

@ -204,7 +204,7 @@ QueryTreeNodePtr MatcherNode::cloneImpl() const
return matcher_node;
}
ASTPtr MatcherNode::toASTImpl() const
ASTPtr MatcherNode::toASTImpl(const ConvertToASTOptions & options) const
{
ASTPtr result;
ASTPtr transformers;
@ -216,7 +216,7 @@ ASTPtr MatcherNode::toASTImpl() const
transformers = std::make_shared<ASTColumnsTransformerList>();
for (const auto & column_transformer : column_transformers)
transformers->children.push_back(column_transformer->toAST());
transformers->children.push_back(column_transformer->toAST(options));
}
if (matcher_type == MatcherNodeType::ASTERISK)

View File

@ -148,7 +148,7 @@ protected:
QueryTreeNodePtr cloneImpl() const override;
ASTPtr toASTImpl() const override;
ASTPtr toASTImpl(const ConvertToASTOptions & options) const override;
private:
explicit MatcherNode(MatcherNodeType matcher_type_,

View File

@ -259,7 +259,7 @@ QueryTreeNodePtr QueryNode::cloneImpl() const
return result_query_node;
}
ASTPtr QueryNode::toASTImpl() const
ASTPtr QueryNode::toASTImpl(const ConvertToASTOptions & options) const
{
auto select_query = std::make_shared<ASTSelectQuery>();
select_query->distinct = is_distinct;
@ -271,9 +271,9 @@ ASTPtr QueryNode::toASTImpl() const
select_query->group_by_all = is_group_by_all;
if (hasWith())
select_query->setExpression(ASTSelectQuery::Expression::WITH, getWith().toAST());
select_query->setExpression(ASTSelectQuery::Expression::WITH, getWith().toAST(options));
auto projection_ast = getProjection().toAST();
auto projection_ast = getProjection().toAST(options);
auto & projection_expression_list_ast = projection_ast->as<ASTExpressionList &>();
size_t projection_expression_list_ast_children_size = projection_expression_list_ast.children.size();
if (projection_expression_list_ast_children_size != getProjection().getNodes().size())
@ -293,44 +293,44 @@ ASTPtr QueryNode::toASTImpl() const
select_query->setExpression(ASTSelectQuery::Expression::SELECT, std::move(projection_ast));
ASTPtr tables_in_select_query_ast = std::make_shared<ASTTablesInSelectQuery>();
addTableExpressionOrJoinIntoTablesInSelectQuery(tables_in_select_query_ast, getJoinTree());
addTableExpressionOrJoinIntoTablesInSelectQuery(tables_in_select_query_ast, getJoinTree(), options);
select_query->setExpression(ASTSelectQuery::Expression::TABLES, std::move(tables_in_select_query_ast));
if (getPrewhere())
select_query->setExpression(ASTSelectQuery::Expression::PREWHERE, getPrewhere()->toAST());
select_query->setExpression(ASTSelectQuery::Expression::PREWHERE, getPrewhere()->toAST(options));
if (getWhere())
select_query->setExpression(ASTSelectQuery::Expression::WHERE, getWhere()->toAST());
select_query->setExpression(ASTSelectQuery::Expression::WHERE, getWhere()->toAST(options));
if (!is_group_by_all && hasGroupBy())
select_query->setExpression(ASTSelectQuery::Expression::GROUP_BY, getGroupBy().toAST());
select_query->setExpression(ASTSelectQuery::Expression::GROUP_BY, getGroupBy().toAST(options));
if (hasHaving())
select_query->setExpression(ASTSelectQuery::Expression::HAVING, getHaving()->toAST());
select_query->setExpression(ASTSelectQuery::Expression::HAVING, getHaving()->toAST(options));
if (hasWindow())
select_query->setExpression(ASTSelectQuery::Expression::WINDOW, getWindow().toAST());
select_query->setExpression(ASTSelectQuery::Expression::WINDOW, getWindow().toAST(options));
if (hasOrderBy())
select_query->setExpression(ASTSelectQuery::Expression::ORDER_BY, getOrderBy().toAST());
select_query->setExpression(ASTSelectQuery::Expression::ORDER_BY, getOrderBy().toAST(options));
if (hasInterpolate())
select_query->setExpression(ASTSelectQuery::Expression::INTERPOLATE, getInterpolate()->toAST());
select_query->setExpression(ASTSelectQuery::Expression::INTERPOLATE, getInterpolate()->toAST(options));
if (hasLimitByLimit())
select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY_LENGTH, getLimitByLimit()->toAST());
select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY_LENGTH, getLimitByLimit()->toAST(options));
if (hasLimitByOffset())
select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY_OFFSET, getLimitByOffset()->toAST());
select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY_OFFSET, getLimitByOffset()->toAST(options));
if (hasLimitBy())
select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY, getLimitBy().toAST());
select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY, getLimitBy().toAST(options));
if (hasLimit())
select_query->setExpression(ASTSelectQuery::Expression::LIMIT_LENGTH, getLimit()->toAST());
select_query->setExpression(ASTSelectQuery::Expression::LIMIT_LENGTH, getLimit()->toAST(options));
if (hasOffset())
select_query->setExpression(ASTSelectQuery::Expression::LIMIT_OFFSET, getOffset()->toAST());
select_query->setExpression(ASTSelectQuery::Expression::LIMIT_OFFSET, getOffset()->toAST(options));
if (hasSettingsChanges())
{

View File

@ -575,7 +575,7 @@ protected:
QueryTreeNodePtr cloneImpl() const override;
ASTPtr toASTImpl() const override;
ASTPtr toASTImpl(const ConvertToASTOptions & options) const override;
private:
bool is_subquery = false;

View File

@ -838,8 +838,14 @@ QueryTreeNodePtr QueryTreeBuilder::buildJoinTree(const ASTPtr & tables_in_select
const auto & function_arguments_list = table_function_expression.arguments->as<ASTExpressionList &>().children;
for (const auto & argument : function_arguments_list)
{
if (!node->getSettingsChanges().empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Table function '{}' has arguments after SETTINGS",
table_function_expression.formatForErrorMessage());
if (argument->as<ASTSelectQuery>() || argument->as<ASTSelectWithUnionQuery>() || argument->as<ASTSelectIntersectExceptQuery>())
node->getArguments().getNodes().push_back(buildSelectOrUnionExpression(argument, false /*is_subquery*/, {} /*cte_name*/, context));
else if (const auto * ast_set = argument->as<ASTSetQuery>())
node->setSettingsChanges(ast_set->changes);
else
node->getArguments().getNodes().push_back(buildExpression(argument, context));
}

View File

@ -109,7 +109,7 @@ QueryTreeNodePtr SortNode::cloneImpl() const
return std::make_shared<SortNode>(nullptr /*expression*/, sort_direction, nulls_sort_direction, collator, with_fill);
}
ASTPtr SortNode::toASTImpl() const
ASTPtr SortNode::toASTImpl(const ConvertToASTOptions & options) const
{
auto result = std::make_shared<ASTOrderByElement>();
result->direction = sort_direction == SortDirection::ASCENDING ? 1 : -1;
@ -120,10 +120,10 @@ ASTPtr SortNode::toASTImpl() const
result->nulls_direction_was_explicitly_specified = nulls_sort_direction.has_value();
result->with_fill = with_fill;
result->fill_from = hasFillFrom() ? getFillFrom()->toAST() : nullptr;
result->fill_to = hasFillTo() ? getFillTo()->toAST() : nullptr;
result->fill_step = hasFillStep() ? getFillStep()->toAST() : nullptr;
result->children.push_back(getExpression()->toAST());
result->fill_from = hasFillFrom() ? getFillFrom()->toAST(options) : nullptr;
result->fill_to = hasFillTo() ? getFillTo()->toAST(options) : nullptr;
result->fill_step = hasFillStep() ? getFillStep()->toAST(options) : nullptr;
result->children.push_back(getExpression()->toAST(options));
if (collator)
{

View File

@ -137,7 +137,7 @@ protected:
QueryTreeNodePtr cloneImpl() const override;
ASTPtr toASTImpl() const override;
ASTPtr toASTImpl(const ConvertToASTOptions & options) const override;
private:
static constexpr size_t sort_expression_child_index = 0;

View File

@ -7,6 +7,7 @@
#include <Storages/IStorage.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSetQuery.h>
#include <Interpreters/Context.h>
@ -71,6 +72,13 @@ void TableFunctionNode::dumpTreeImpl(WriteBuffer & buffer, FormatState & format_
buffer << '\n' << std::string(indent + 2, ' ') << "ARGUMENTS\n";
arguments.dumpTreeImpl(buffer, format_state, indent + 4);
}
if (!settings_changes.empty())
{
buffer << '\n' << std::string(indent + 2, ' ') << "SETTINGS";
for (const auto & change : settings_changes)
buffer << fmt::format(" {}={}", change.name, toString(change.value));
}
}
bool TableFunctionNode::isEqualImpl(const IQueryTreeNode & rhs) const
@ -82,6 +90,9 @@ bool TableFunctionNode::isEqualImpl(const IQueryTreeNode & rhs) const
if (storage && rhs_typed.storage)
return storage_id == rhs_typed.storage_id;
if (settings_changes != rhs_typed.settings_changes)
return false;
return table_expression_modifiers == rhs_typed.table_expression_modifiers;
}
@ -99,6 +110,17 @@ void TableFunctionNode::updateTreeHashImpl(HashState & state) const
if (table_expression_modifiers)
table_expression_modifiers->updateTreeHash(state);
state.update(settings_changes.size());
for (const auto & change : settings_changes)
{
state.update(change.name.size());
state.update(change.name);
const auto & value_dump = change.value.dump();
state.update(value_dump.size());
state.update(value_dump);
}
}
QueryTreeNodePtr TableFunctionNode::cloneImpl() const
@ -109,20 +131,29 @@ QueryTreeNodePtr TableFunctionNode::cloneImpl() const
result->storage_id = storage_id;
result->storage_snapshot = storage_snapshot;
result->table_expression_modifiers = table_expression_modifiers;
result->settings_changes = settings_changes;
return result;
}
ASTPtr TableFunctionNode::toASTImpl() const
ASTPtr TableFunctionNode::toASTImpl(const ConvertToASTOptions & options) const
{
auto table_function_ast = std::make_shared<ASTFunction>();
table_function_ast->name = table_function_name;
const auto & arguments = getArguments();
table_function_ast->children.push_back(arguments.toAST());
table_function_ast->children.push_back(arguments.toAST(options));
table_function_ast->arguments = table_function_ast->children.back();
if (!settings_changes.empty())
{
auto settings_ast = std::make_shared<ASTSetQuery>();
settings_ast->changes = settings_changes;
settings_ast->is_standalone = false;
table_function_ast->arguments->children.push_back(std::move(settings_ast));
}
return table_function_ast;
}

View File

@ -1,5 +1,7 @@
#pragma once
#include <Common/SettingsChanges.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/TableLockHolder.h>
#include <Storages/StorageSnapshot.h>
@ -122,6 +124,18 @@ public:
return table_expression_modifiers;
}
/// Get settings changes passed to table function
const SettingsChanges & getSettingsChanges() const
{
return settings_changes;
}
/// Set settings changes passed as last argument to table function
void setSettingsChanges(SettingsChanges settings_changes_)
{
settings_changes = std::move(settings_changes_);
}
/// Set table expression modifiers
void setTableExpressionModifiers(TableExpressionModifiers table_expression_modifiers_value)
{
@ -142,7 +156,7 @@ protected:
QueryTreeNodePtr cloneImpl() const override;
ASTPtr toASTImpl() const override;
ASTPtr toASTImpl(const ConvertToASTOptions & options) const override;
private:
String table_function_name;
@ -151,6 +165,7 @@ private:
StorageID storage_id;
StorageSnapshotPtr storage_snapshot;
std::optional<TableExpressionModifiers> table_expression_modifiers;
SettingsChanges settings_changes;
static constexpr size_t arguments_child_index = 0;
static constexpr size_t children_size = arguments_child_index + 1;
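The new `settings_changes` member above lets a table function carry its trailing SETTINGS clause through the query tree and emit it again on conversion back to AST. A standalone sketch of that flow follows; it uses simplified stand-in types (a plain name/value list instead of the real `SettingsChanges`/`ASTSetQuery` classes) purely to illustrate the round trip.

```cpp
#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-in for SettingsChanges: a list of name/value pairs.
using SettingsChanges = std::vector<std::pair<std::string, std::string>>;

class TableFunctionNode
{
public:
    explicit TableFunctionNode(std::string name_) : name(std::move(name_)) {}

    // Store SETTINGS parsed from the last argument, as QueryTreeBuilder::buildJoinTree now does.
    void setSettingsChanges(SettingsChanges changes) { settings_changes = std::move(changes); }
    const SettingsChanges & getSettingsChanges() const { return settings_changes; }

    // Serialize back to text, appending the SETTINGS clause when present,
    // mirroring TableFunctionNode::toASTImpl above (arguments elided for brevity).
    std::string toAST() const
    {
        std::string result = name + "(...)";
        if (settings_changes.empty())
            return result;
        result += " SETTINGS ";
        for (size_t i = 0; i < settings_changes.size(); ++i)
        {
            if (i != 0)
                result += ", ";
            result += settings_changes[i].first + " = " + settings_changes[i].second;
        }
        return result;
    }

private:
    std::string name;
    SettingsChanges settings_changes;
};

int main()
{
    TableFunctionNode node("remote");
    node.setSettingsChanges({{"connect_timeout", "5"}, {"max_threads", "2"}});
    std::cout << node.toAST() << '\n';  // remote(...) SETTINGS connect_timeout = 5, max_threads = 2
}
```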

View File

@ -86,7 +86,7 @@ QueryTreeNodePtr TableNode::cloneImpl() const
return result_table_node;
}
ASTPtr TableNode::toASTImpl() const
ASTPtr TableNode::toASTImpl(const ConvertToASTOptions & /* options */) const
{
if (!temporary_table_name.empty())
return std::make_shared<ASTTableIdentifier>(temporary_table_name);

View File

@ -106,7 +106,7 @@ protected:
QueryTreeNodePtr cloneImpl() const override;
ASTPtr toASTImpl() const override;
ASTPtr toASTImpl(const ConvertToASTOptions & options) const override;
private:
StoragePtr storage;

View File

@ -140,12 +140,12 @@ QueryTreeNodePtr UnionNode::cloneImpl() const
return result_union_node;
}
ASTPtr UnionNode::toASTImpl() const
ASTPtr UnionNode::toASTImpl(const ConvertToASTOptions & options) const
{
auto select_with_union_query = std::make_shared<ASTSelectWithUnionQuery>();
select_with_union_query->union_mode = union_mode;
select_with_union_query->is_normalized = true;
select_with_union_query->children.push_back(getQueriesNode()->toAST());
select_with_union_query->children.push_back(getQueriesNode()->toAST(options));
select_with_union_query->list_of_selects = select_with_union_query->children.back();
if (is_subquery)

View File

@ -143,7 +143,7 @@ protected:
QueryTreeNodePtr cloneImpl() const override;
ASTPtr toASTImpl() const override;
ASTPtr toASTImpl(const ConvertToASTOptions & options) const override;
private:
bool is_subquery = false;

View File

@ -268,7 +268,7 @@ static ASTPtr convertIntoTableExpressionAST(const QueryTreeNodePtr & table_expre
return result_table_expression;
}
void addTableExpressionOrJoinIntoTablesInSelectQuery(ASTPtr & tables_in_select_query_ast, const QueryTreeNodePtr & table_expression)
void addTableExpressionOrJoinIntoTablesInSelectQuery(ASTPtr & tables_in_select_query_ast, const QueryTreeNodePtr & table_expression, const IQueryTreeNode::ConvertToASTOptions & convert_to_ast_options)
{
auto table_expression_node_type = table_expression->getNodeType();
@ -297,7 +297,7 @@ void addTableExpressionOrJoinIntoTablesInSelectQuery(ASTPtr & tables_in_select_q
[[fallthrough]];
case QueryTreeNodeType::JOIN:
{
auto table_expression_tables_in_select_query_ast = table_expression->toAST();
auto table_expression_tables_in_select_query_ast = table_expression->toAST(convert_to_ast_options);
tables_in_select_query_ast->children.reserve(table_expression_tables_in_select_query_ast->children.size());
for (auto && table_element_ast : table_expression_tables_in_select_query_ast->children)
tables_in_select_query_ast->children.push_back(std::move(table_element_ast));

View File

@ -40,7 +40,7 @@ std::optional<bool> tryExtractConstantFromConditionNode(const QueryTreeNodePtr &
/** Add table expression in tables in select query children.
* If table expression node is not of identifier node, table node, query node, table function node, join node or array join node type throws logical error exception.
*/
void addTableExpressionOrJoinIntoTablesInSelectQuery(ASTPtr & tables_in_select_query_ast, const QueryTreeNodePtr & table_expression);
void addTableExpressionOrJoinIntoTablesInSelectQuery(ASTPtr & tables_in_select_query_ast, const QueryTreeNodePtr & table_expression, const IQueryTreeNode::ConvertToASTOptions & convert_to_ast_options);
/// Extract table, table function, query, union from join tree
QueryTreeNodes extractTableExpressions(const QueryTreeNodePtr & join_tree_node);

View File

@ -107,7 +107,7 @@ QueryTreeNodePtr WindowNode::cloneImpl() const
return window_node;
}
ASTPtr WindowNode::toASTImpl() const
ASTPtr WindowNode::toASTImpl(const ConvertToASTOptions & options) const
{
auto window_definition = std::make_shared<ASTWindowDefinition>();
@ -115,13 +115,13 @@ ASTPtr WindowNode::toASTImpl() const
if (hasPartitionBy())
{
window_definition->children.push_back(getPartitionByNode()->toAST());
window_definition->children.push_back(getPartitionByNode()->toAST(options));
window_definition->partition_by = window_definition->children.back();
}
if (hasOrderBy())
{
window_definition->children.push_back(getOrderByNode()->toAST());
window_definition->children.push_back(getOrderByNode()->toAST(options));
window_definition->order_by = window_definition->children.back();
}
@ -132,7 +132,7 @@ ASTPtr WindowNode::toASTImpl() const
if (hasFrameBeginOffset())
{
window_definition->children.push_back(getFrameBeginOffsetNode()->toAST());
window_definition->children.push_back(getFrameBeginOffsetNode()->toAST(options));
window_definition->frame_begin_offset = window_definition->children.back();
}
@ -140,7 +140,7 @@ ASTPtr WindowNode::toASTImpl() const
window_definition->frame_end_preceding = window_frame.end_preceding;
if (hasFrameEndOffset())
{
window_definition->children.push_back(getFrameEndOffsetNode()->toAST());
window_definition->children.push_back(getFrameEndOffsetNode()->toAST(options));
window_definition->frame_end_offset = window_definition->children.back();
}

View File

@ -175,7 +175,7 @@ protected:
QueryTreeNodePtr cloneImpl() const override;
ASTPtr toASTImpl() const override;
ASTPtr toASTImpl(const ConvertToASTOptions & options) const override;
private:
static constexpr size_t order_by_child_index = 0;

View File

@ -36,7 +36,7 @@ public:
return std::make_shared<SourceNode>();
}
ASTPtr toASTImpl() const override
ASTPtr toASTImpl(const ConvertToASTOptions & /* options */) const override
{
return nullptr;
}

View File

@ -13,20 +13,20 @@ using FileInfo = IBackupCoordination::FileInfo;
BackupCoordinationLocal::BackupCoordinationLocal() = default;
BackupCoordinationLocal::~BackupCoordinationLocal() = default;
void BackupCoordinationLocal::setStage(const String &, const String &, const String &)
void BackupCoordinationLocal::setStage(const String &, const String &)
{
}
void BackupCoordinationLocal::setError(const String &, const Exception &)
void BackupCoordinationLocal::setError(const Exception &)
{
}
Strings BackupCoordinationLocal::waitForStage(const Strings &, const String &)
Strings BackupCoordinationLocal::waitForStage(const String &)
{
return {};
}
Strings BackupCoordinationLocal::waitForStage(const Strings &, const String &, std::chrono::milliseconds)
Strings BackupCoordinationLocal::waitForStage(const String &, std::chrono::milliseconds)
{
return {};
}
@ -70,29 +70,29 @@ Strings BackupCoordinationLocal::getReplicatedDataPaths(const String & table_sha
}
void BackupCoordinationLocal::addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id, const String & file_path)
void BackupCoordinationLocal::addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & file_path)
{
std::lock_guard lock{mutex};
replicated_access.addFilePath(access_zk_path, access_entity_type, host_id, file_path);
replicated_access.addFilePath(access_zk_path, access_entity_type, "", file_path);
}
Strings BackupCoordinationLocal::getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id) const
Strings BackupCoordinationLocal::getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type) const
{
std::lock_guard lock{mutex};
return replicated_access.getFilePaths(access_zk_path, access_entity_type, host_id);
return replicated_access.getFilePaths(access_zk_path, access_entity_type, "");
}
void BackupCoordinationLocal::addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id, const String & dir_path)
void BackupCoordinationLocal::addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & dir_path)
{
std::lock_guard lock{mutex};
replicated_sql_objects.addDirectory(loader_zk_path, object_type, host_id, dir_path);
replicated_sql_objects.addDirectory(loader_zk_path, object_type, "", dir_path);
}
Strings BackupCoordinationLocal::getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id) const
Strings BackupCoordinationLocal::getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type) const
{
std::lock_guard lock{mutex};
return replicated_sql_objects.getDirectories(loader_zk_path, object_type, host_id);
return replicated_sql_objects.getDirectories(loader_zk_path, object_type, "");
}

View File

@ -21,10 +21,10 @@ public:
BackupCoordinationLocal();
~BackupCoordinationLocal() override;
void setStage(const String & current_host, const String & new_stage, const String & message) override;
void setError(const String & current_host, const Exception & exception) override;
Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) override;
Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) override;
void setStage(const String & new_stage, const String & message) override;
void setError(const Exception & exception) override;
Strings waitForStage(const String & stage_to_wait) override;
Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) override;
void addReplicatedPartNames(const String & table_shared_id, const String & table_name_for_logs, const String & replica_name,
const std::vector<PartNameAndChecksum> & part_names_and_checksums) override;
@ -37,11 +37,11 @@ public:
void addReplicatedDataPath(const String & table_shared_id, const String & data_path) override;
Strings getReplicatedDataPaths(const String & table_shared_id) const override;
void addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id, const String & file_path) override;
Strings getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id) const override;
void addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & file_path) override;
Strings getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type) const override;
void addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id, const String & dir_path) override;
Strings getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id) const override;
void addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & dir_path) override;
Strings getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type) const override;
void addFileInfo(const FileInfo & file_info, bool & is_data_file_required) override;
void updateFileInfo(const FileInfo & file_info) override;

View File

@ -166,17 +166,30 @@ namespace
}
}
size_t BackupCoordinationRemote::findCurrentHostIndex(const Strings & all_hosts, const String & current_host)
{
auto it = std::find(all_hosts.begin(), all_hosts.end(), current_host);
if (it == all_hosts.end())
return 0;
return it - all_hosts.begin();
}
BackupCoordinationRemote::BackupCoordinationRemote(
const BackupKeeperSettings & keeper_settings_,
const String & root_zookeeper_path_,
const String & backup_uuid_,
zkutil::GetZooKeeper get_zookeeper_,
const String & root_zookeeper_path_,
const BackupKeeperSettings & keeper_settings_,
const String & backup_uuid_,
const Strings & all_hosts_,
const String & current_host_,
bool is_internal_)
: keeper_settings(keeper_settings_)
: get_zookeeper(get_zookeeper_)
, root_zookeeper_path(root_zookeeper_path_)
, zookeeper_path(root_zookeeper_path_ + "/backup-" + backup_uuid_)
, keeper_settings(keeper_settings_)
, backup_uuid(backup_uuid_)
, get_zookeeper(get_zookeeper_)
, all_hosts(all_hosts_)
, current_host(current_host_)
, current_host_index(findCurrentHostIndex(all_hosts, current_host))
, is_internal(is_internal_)
{
zookeeper_retries_info = ZooKeeperRetriesInfo(
@ -251,22 +264,22 @@ void BackupCoordinationRemote::removeAllNodes()
}
void BackupCoordinationRemote::setStage(const String & current_host, const String & new_stage, const String & message)
void BackupCoordinationRemote::setStage(const String & new_stage, const String & message)
{
stage_sync->set(current_host, new_stage, message);
}
void BackupCoordinationRemote::setError(const String & current_host, const Exception & exception)
void BackupCoordinationRemote::setError(const Exception & exception)
{
stage_sync->setError(current_host, exception);
}
Strings BackupCoordinationRemote::waitForStage(const Strings & all_hosts, const String & stage_to_wait)
Strings BackupCoordinationRemote::waitForStage(const String & stage_to_wait)
{
return stage_sync->wait(all_hosts, stage_to_wait);
}
Strings BackupCoordinationRemote::waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout)
Strings BackupCoordinationRemote::waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout)
{
return stage_sync->waitFor(all_hosts, stage_to_wait, timeout);
}
@ -403,7 +416,7 @@ void BackupCoordinationRemote::prepareReplicatedTables() const
}
void BackupCoordinationRemote::addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id, const String & file_path)
void BackupCoordinationRemote::addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & file_path)
{
{
std::lock_guard lock{mutex};
@ -416,15 +429,15 @@ void BackupCoordinationRemote::addReplicatedAccessFilePath(const String & access
zk->createIfNotExists(path, "");
path += "/" + AccessEntityTypeInfo::get(access_entity_type).name;
zk->createIfNotExists(path, "");
path += "/" + host_id;
path += "/" + current_host;
zk->createIfNotExists(path, file_path);
}
Strings BackupCoordinationRemote::getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id) const
Strings BackupCoordinationRemote::getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type) const
{
std::lock_guard lock{mutex};
prepareReplicatedAccess();
return replicated_access->getFilePaths(access_zk_path, access_entity_type, host_id);
return replicated_access->getFilePaths(access_zk_path, access_entity_type, current_host);
}
void BackupCoordinationRemote::prepareReplicatedAccess() const
@ -453,7 +466,7 @@ void BackupCoordinationRemote::prepareReplicatedAccess() const
}
}
void BackupCoordinationRemote::addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id, const String & dir_path)
void BackupCoordinationRemote::addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & dir_path)
{
{
std::lock_guard lock{mutex};
@ -474,15 +487,15 @@ void BackupCoordinationRemote::addReplicatedSQLObjectsDir(const String & loader_
}
zk->createIfNotExists(path, "");
path += "/" + host_id;
path += "/" + current_host;
zk->createIfNotExists(path, dir_path);
}
Strings BackupCoordinationRemote::getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id) const
Strings BackupCoordinationRemote::getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type) const
{
std::lock_guard lock{mutex};
prepareReplicatedSQLObjects();
return replicated_sql_objects->getDirectories(loader_zk_path, object_type, host_id);
return replicated_sql_objects->getDirectories(loader_zk_path, object_type, current_host);
}
void BackupCoordinationRemote::prepareReplicatedSQLObjects() const
@ -827,5 +840,4 @@ bool BackupCoordinationRemote::hasConcurrentBackups(const std::atomic<size_t> &)
return false;
}
}

View File

@ -27,17 +27,20 @@ public:
};
BackupCoordinationRemote(
const BackupKeeperSettings & keeper_settings_,
const String & root_zookeeper_path_,
const String & backup_uuid_,
zkutil::GetZooKeeper get_zookeeper_,
const String & root_zookeeper_path_,
const BackupKeeperSettings & keeper_settings_,
const String & backup_uuid_,
const Strings & all_hosts_,
const String & current_host_,
bool is_internal_);
~BackupCoordinationRemote() override;
void setStage(const String & current_host, const String & new_stage, const String & message) override;
void setError(const String & current_host, const Exception & exception) override;
Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) override;
Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) override;
void setStage(const String & new_stage, const String & message) override;
void setError(const Exception & exception) override;
Strings waitForStage(const String & stage_to_wait) override;
Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) override;
void addReplicatedPartNames(
const String & table_shared_id,
@ -58,11 +61,11 @@ public:
void addReplicatedDataPath(const String & table_shared_id, const String & data_path) override;
Strings getReplicatedDataPaths(const String & table_shared_id) const override;
void addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id, const String & file_path) override;
Strings getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id) const override;
void addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & file_path) override;
Strings getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type) const override;
void addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id, const String & dir_path) override;
Strings getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id) const override;
void addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & dir_path) override;
Strings getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type) const override;
void addFileInfo(const FileInfo & file_info, bool & is_data_file_required) override;
void updateFileInfo(const FileInfo & file_info) override;
@ -78,6 +81,8 @@ public:
bool hasConcurrentBackups(const std::atomic<size_t> & num_active_backups) const override;
static size_t findCurrentHostIndex(const Strings & all_hosts, const String & current_host);
private:
zkutil::ZooKeeperPtr getZooKeeper() const;
zkutil::ZooKeeperPtr getZooKeeperNoLock() const;
@ -91,11 +96,14 @@ private:
void prepareReplicatedAccess() const;
void prepareReplicatedSQLObjects() const;
const BackupKeeperSettings keeper_settings;
const zkutil::GetZooKeeper get_zookeeper;
const String root_zookeeper_path;
const String zookeeper_path;
const BackupKeeperSettings keeper_settings;
const String backup_uuid;
const zkutil::GetZooKeeper get_zookeeper;
const Strings all_hosts;
const String current_host;
const size_t current_host_index;
const bool is_internal;
mutable ZooKeeperRetriesInfo zookeeper_retries_info;

View File

@ -133,22 +133,22 @@ Strings BackupEntriesCollector::setStage(const String & new_stage, const String
LOG_TRACE(log, fmt::runtime(toUpperFirst(new_stage)));
current_stage = new_stage;
backup_coordination->setStage(backup_settings.host_id, new_stage, message);
backup_coordination->setStage(new_stage, message);
if (new_stage == Stage::formatGatheringMetadata(1))
{
return backup_coordination->waitForStage(all_hosts, new_stage, on_cluster_first_sync_timeout);
return backup_coordination->waitForStage(new_stage, on_cluster_first_sync_timeout);
}
else if (new_stage.starts_with(Stage::GATHERING_METADATA))
{
auto current_time = std::chrono::steady_clock::now();
auto end_of_timeout = std::max(current_time, consistent_metadata_snapshot_end_time);
return backup_coordination->waitForStage(
all_hosts, new_stage, std::chrono::duration_cast<std::chrono::milliseconds>(end_of_timeout - current_time));
new_stage, std::chrono::duration_cast<std::chrono::milliseconds>(end_of_timeout - current_time));
}
else
{
return backup_coordination->waitForStage(all_hosts, new_stage);
return backup_coordination->waitForStage(new_stage);
}
}

View File

@ -38,14 +38,33 @@ namespace Stage = BackupCoordinationStage;
namespace
{
std::shared_ptr<IBackupCoordination> makeBackupCoordination(std::optional<BackupCoordinationRemote::BackupKeeperSettings> keeper_settings, String & root_zk_path, const String & backup_uuid, const ContextPtr & context, bool is_internal_backup)
std::shared_ptr<IBackupCoordination> makeBackupCoordination(const ContextPtr & context, const BackupSettings & backup_settings, bool remote)
{
if (!root_zk_path.empty())
if (remote)
{
if (!keeper_settings.has_value())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Parameter keeper_settings is empty while root_zk_path is not. This is bug");
String root_zk_path = context->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups");
auto get_zookeeper = [global_context = context->getGlobalContext()] { return global_context->getZooKeeper(); };
return std::make_shared<BackupCoordinationRemote>(*keeper_settings, root_zk_path, backup_uuid, get_zookeeper, is_internal_backup);
BackupCoordinationRemote::BackupKeeperSettings keeper_settings
{
.keeper_max_retries = context->getSettingsRef().backup_keeper_max_retries,
.keeper_retry_initial_backoff_ms = context->getSettingsRef().backup_keeper_retry_initial_backoff_ms,
.keeper_retry_max_backoff_ms = context->getSettingsRef().backup_keeper_retry_max_backoff_ms,
.batch_size_for_keeper_multiread = context->getSettingsRef().backup_batch_size_for_keeper_multiread,
};
auto all_hosts = BackupSettings::Util::filterHostIDs(
backup_settings.cluster_host_ids, backup_settings.shard_num, backup_settings.replica_num);
return std::make_shared<BackupCoordinationRemote>(
get_zookeeper,
root_zk_path,
keeper_settings,
toString(*backup_settings.backup_uuid),
all_hosts,
backup_settings.host_id,
backup_settings.internal);
}
else
{
@ -53,12 +72,19 @@ namespace
}
}
std::shared_ptr<IRestoreCoordination> makeRestoreCoordination(const String & root_zk_path, const String & restore_uuid, const ContextPtr & context, bool is_internal_backup)
std::shared_ptr<IRestoreCoordination>
makeRestoreCoordination(const ContextPtr & context, const RestoreSettings & restore_settings, bool remote)
{
if (!root_zk_path.empty())
if (remote)
{
String root_zk_path = context->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups");
auto get_zookeeper = [global_context = context->getGlobalContext()] { return global_context->getZooKeeper(); };
return std::make_shared<RestoreCoordinationRemote>(root_zk_path, restore_uuid, get_zookeeper, is_internal_backup);
auto all_hosts = BackupSettings::Util::filterHostIDs(
restore_settings.cluster_host_ids, restore_settings.shard_num, restore_settings.replica_num);
return std::make_shared<RestoreCoordinationRemote>(get_zookeeper, root_zk_path, toString(*restore_settings.restore_uuid), all_hosts, restore_settings.host_id, restore_settings.internal);
}
else
{
@ -68,12 +94,12 @@ namespace
/// Sends information about an exception to IBackupCoordination or IRestoreCoordination.
template <typename CoordinationType>
void sendExceptionToCoordination(std::shared_ptr<CoordinationType> coordination, const String & current_host, const Exception & exception)
void sendExceptionToCoordination(std::shared_ptr<CoordinationType> coordination, const Exception & exception)
{
try
{
if (coordination)
coordination->setError(current_host, exception);
coordination->setError(exception);
}
catch (...)
{
@ -82,7 +108,7 @@ namespace
/// Sends information about the current exception to IBackupCoordination or IRestoreCoordination.
template <typename CoordinationType>
void sendCurrentExceptionToCoordination(std::shared_ptr<CoordinationType> coordination, const String & current_host)
void sendCurrentExceptionToCoordination(std::shared_ptr<CoordinationType> coordination)
{
try
{
@ -90,12 +116,12 @@ namespace
}
catch (const Exception & e)
{
sendExceptionToCoordination(coordination, current_host, e);
sendExceptionToCoordination(coordination, e);
}
catch (...)
{
if (coordination)
coordination->setError(current_host, Exception(getCurrentExceptionMessageAndPattern(true, true), getCurrentExceptionCode()));
coordination->setError(Exception(getCurrentExceptionMessageAndPattern(true, true), getCurrentExceptionCode()));
}
}
@ -162,24 +188,13 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context
else
backup_id = toString(*backup_settings.backup_uuid);
String root_zk_path;
std::shared_ptr<IBackupCoordination> backup_coordination;
if (backup_settings.internal)
{
/// The following call of makeBackupCoordination() is not essential because doBackup() will later create a backup coordination
/// if it's not created here. However, to handle errors better, it's better to create the coordination here: this way,
/// if an exception is thrown in startMakingBackup(), the other hosts will know about it.
root_zk_path = context->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups");
BackupCoordinationRemote::BackupKeeperSettings keeper_settings
{
.keeper_max_retries = context->getSettingsRef().backup_keeper_max_retries,
.keeper_retry_initial_backoff_ms = context->getSettingsRef().backup_keeper_retry_initial_backoff_ms,
.keeper_retry_max_backoff_ms = context->getSettingsRef().backup_keeper_retry_max_backoff_ms,
.batch_size_for_keeper_multiread = context->getSettingsRef().backup_batch_size_for_keeper_multiread,
};
backup_coordination = makeBackupCoordination(keeper_settings, root_zk_path, toString(*backup_settings.backup_uuid), context, backup_settings.internal);
backup_coordination = makeBackupCoordination(context, backup_settings, /* remote= */ true);
}
auto backup_info = BackupInfo::fromAST(*backup_query->backup_name);
@ -238,7 +253,7 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context
tryLogCurrentException(log, fmt::format("Failed to start {} {}", (backup_settings.internal ? "internal backup" : "backup"), backup_name_for_logging));
/// Something bad happened, the backup has not been built.
setStatusSafe(backup_id, BackupStatus::BACKUP_FAILED);
sendCurrentExceptionToCoordination(backup_coordination, backup_settings.host_id);
sendCurrentExceptionToCoordination(backup_coordination);
throw;
}
}
@ -274,19 +289,9 @@ void BackupsWorker::doBackup(
if (!on_cluster)
context->checkAccess(required_access);
String root_zk_path;
std::optional<BackupCoordinationRemote::BackupKeeperSettings> keeper_settings;
ClusterPtr cluster;
if (on_cluster)
{
keeper_settings = BackupCoordinationRemote::BackupKeeperSettings
{
.keeper_max_retries = context->getSettingsRef().backup_keeper_max_retries,
.keeper_retry_initial_backoff_ms = context->getSettingsRef().backup_keeper_retry_initial_backoff_ms,
.keeper_retry_max_backoff_ms = context->getSettingsRef().backup_keeper_retry_max_backoff_ms,
.batch_size_for_keeper_multiread = context->getSettingsRef().backup_batch_size_for_keeper_multiread,
};
root_zk_path = context->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups");
backup_query->cluster = context->getMacros()->expand(backup_query->cluster);
cluster = context->getCluster(backup_query->cluster);
backup_settings.cluster_host_ids = cluster->getHostIDs();
@ -294,7 +299,7 @@ void BackupsWorker::doBackup(
/// Make a backup coordination.
if (!backup_coordination)
backup_coordination = makeBackupCoordination(keeper_settings, root_zk_path, toString(*backup_settings.backup_uuid), context, backup_settings.internal);
backup_coordination = makeBackupCoordination(context, backup_settings, /* remote= */ on_cluster);
if (!allow_concurrent_backups && backup_coordination->hasConcurrentBackups(std::ref(num_active_backups)))
throw Exception(ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED, "Concurrent backups not supported, turn on setting 'allow_concurrent_backups'");
@ -330,9 +335,7 @@ void BackupsWorker::doBackup(
executeDDLQueryOnCluster(backup_query, mutable_context, params);
/// Wait until all the hosts have written their backup entries.
auto all_hosts = BackupSettings::Util::filterHostIDs(
backup_settings.cluster_host_ids, backup_settings.shard_num, backup_settings.replica_num);
backup_coordination->waitForStage(all_hosts, Stage::COMPLETED);
backup_coordination->waitForStage(Stage::COMPLETED);
}
else
{
@ -349,7 +352,7 @@ void BackupsWorker::doBackup(
writeBackupEntries(backup, std::move(backup_entries), backups_thread_pool);
/// We have written our backup entries, we need to tell other hosts (they could be waiting for it).
backup_coordination->setStage(backup_settings.host_id, Stage::COMPLETED, "");
backup_coordination->setStage(Stage::COMPLETED, "");
}
size_t num_files = 0;
@ -383,7 +386,7 @@ void BackupsWorker::doBackup(
{
tryLogCurrentException(log, fmt::format("Failed to make {} {}", (backup_settings.internal ? "internal backup" : "backup"), backup_name_for_logging));
setStatusSafe(backup_id, BackupStatus::BACKUP_FAILED);
sendCurrentExceptionToCoordination(backup_coordination, backup_settings.host_id);
sendCurrentExceptionToCoordination(backup_coordination);
}
else
{
@ -417,8 +420,7 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt
/// The following call of makeRestoreCoordination() is not essential because doRestore() will later create a restore coordination
/// if it's not created here. However, to handle errors better, it's better to create the coordination here: this way,
/// if an exception is thrown in startRestoring(), the other hosts will know about it.
auto root_zk_path = context->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups");
restore_coordination = makeRestoreCoordination(root_zk_path, toString(*restore_settings.restore_uuid), context, restore_settings.internal);
restore_coordination = makeRestoreCoordination(context, restore_settings, /* remote= */ true);
}
try
@ -474,7 +476,7 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt
{
/// Something bad happened, the backup has not been built.
setStatusSafe(restore_id, BackupStatus::RESTORE_FAILED);
sendCurrentExceptionToCoordination(restore_coordination, restore_settings.host_id);
sendCurrentExceptionToCoordination(restore_coordination);
throw;
}
}
@ -509,14 +511,12 @@ void BackupsWorker::doRestore(
BackupPtr backup = BackupFactory::instance().createBackup(backup_open_params);
String current_database = context->getCurrentDatabase();
String root_zk_path;
/// Checks access rights if this is an ON CLUSTER query.
/// (If this isn't an ON CLUSTER query, RestorerFromBackup will check access rights later.)
ClusterPtr cluster;
bool on_cluster = !restore_query->cluster.empty();
if (on_cluster)
{
root_zk_path = context->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups");
restore_query->cluster = context->getMacros()->expand(restore_query->cluster);
cluster = context->getCluster(restore_query->cluster);
restore_settings.cluster_host_ids = cluster->getHostIDs();
@ -539,7 +539,7 @@ void BackupsWorker::doRestore(
/// Make a restore coordination.
if (!restore_coordination)
restore_coordination = makeRestoreCoordination(root_zk_path, toString(*restore_settings.restore_uuid), context, restore_settings.internal);
restore_coordination = makeRestoreCoordination(context, restore_settings, /* remote= */ on_cluster);
if (!allow_concurrent_restores && restore_coordination->hasConcurrentRestores(std::ref(num_active_restores)))
throw Exception(ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED, "Concurrent restores not supported, turn on setting 'allow_concurrent_restores'");
@ -561,9 +561,7 @@ void BackupsWorker::doRestore(
executeDDLQueryOnCluster(restore_query, context, params);
/// Wait until all the hosts have written their backup entries.
auto all_hosts = BackupSettings::Util::filterHostIDs(
restore_settings.cluster_host_ids, restore_settings.shard_num, restore_settings.replica_num);
restore_coordination->waitForStage(all_hosts, Stage::COMPLETED);
restore_coordination->waitForStage(Stage::COMPLETED);
}
else
{
@ -581,7 +579,7 @@ void BackupsWorker::doRestore(
restoreTablesData(std::move(data_restore_tasks), restores_thread_pool);
/// We have restored everything, we need to tell other hosts (they could be waiting for it).
restore_coordination->setStage(restore_settings.host_id, Stage::COMPLETED, "");
restore_coordination->setStage(Stage::COMPLETED, "");
}
LOG_INFO(log, "Restored from {} {} successfully", (restore_settings.internal ? "internal backup" : "backup"), backup_name_for_logging);
@ -603,7 +601,7 @@ void BackupsWorker::doRestore(
{
tryLogCurrentException(log, fmt::format("Failed to restore from {} {}", (restore_settings.internal ? "internal backup" : "backup"), backup_name_for_logging));
setStatusSafe(restore_id, BackupStatus::RESTORE_FAILED);
sendCurrentExceptionToCoordination(restore_coordination, restore_settings.host_id);
sendCurrentExceptionToCoordination(restore_coordination);
}
else
{

View File

@ -22,10 +22,10 @@ public:
virtual ~IBackupCoordination() = default;
/// Sets the current stage and waits for other hosts to come to this stage too.
virtual void setStage(const String & current_host, const String & new_stage, const String & message) = 0;
virtual void setError(const String & current_host, const Exception & exception) = 0;
virtual Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) = 0;
virtual Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) = 0;
virtual void setStage(const String & new_stage, const String & message) = 0;
virtual void setError(const Exception & exception) = 0;
virtual Strings waitForStage(const String & stage_to_wait) = 0;
virtual Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) = 0;
struct PartNameAndChecksum
{
@ -66,12 +66,12 @@ public:
virtual Strings getReplicatedDataPaths(const String & table_shared_id) const = 0;
/// Adds a path to access.txt file keeping access entities of a ReplicatedAccessStorage.
virtual void addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id, const String & file_path) = 0;
virtual Strings getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id) const = 0;
virtual void addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & file_path) = 0;
virtual Strings getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type) const = 0;
/// Adds a path to a directory with user-defined SQL objects inside the backup.
virtual void addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id, const String & dir_path) = 0;
virtual Strings getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id) const = 0;
virtual void addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & dir_path) = 0;
virtual Strings getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type) const = 0;
struct FileInfo
{

View File

@ -18,10 +18,10 @@ public:
virtual ~IRestoreCoordination() = default;
/// Sets the current stage and waits for other hosts to come to this stage too.
virtual void setStage(const String & current_host, const String & new_stage, const String & message) = 0;
virtual void setError(const String & current_host, const Exception & exception) = 0;
virtual Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) = 0;
virtual Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) = 0;
virtual void setStage(const String & new_stage, const String & message) = 0;
virtual void setError(const Exception & exception) = 0;
virtual Strings waitForStage(const String & stage_to_wait) = 0;
virtual Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) = 0;
static constexpr const char * kErrorStatus = "error";
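
Both coordination interfaces above drop the per-call current_host / all_hosts arguments; the cluster topology is now captured once when the coordination object is constructed. A minimal sketch of that shape (the Coordination class below is illustrative only, not the actual ClickHouse interface):

#include <string>
#include <vector>

/// Before: setStage(current_host, stage, message), waitForStage(all_hosts, stage).
/// After: the object remembers the hosts, so call sites only name the stage.
class Coordination
{
public:
    Coordination(std::vector<std::string> all_hosts_, std::string current_host_)
        : all_hosts(std::move(all_hosts_)), current_host(std::move(current_host_)) {}

    void setStage(const std::string & new_stage, const std::string & /*message*/)
    {
        reached_stage = new_stage;   /// a real implementation would publish this for current_host
    }

    std::vector<std::string> waitForStage(const std::string & /*stage_to_wait*/) const
    {
        return all_hosts;            /// a real implementation would block until every host reports the stage
    }

private:
    std::vector<std::string> all_hosts;
    std::string current_host;
    std::string reached_stage;
};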

View File

@ -7,20 +7,20 @@ namespace DB
RestoreCoordinationLocal::RestoreCoordinationLocal() = default;
RestoreCoordinationLocal::~RestoreCoordinationLocal() = default;
void RestoreCoordinationLocal::setStage(const String &, const String &, const String &)
void RestoreCoordinationLocal::setStage(const String &, const String &)
{
}
void RestoreCoordinationLocal::setError(const String &, const Exception &)
void RestoreCoordinationLocal::setError(const Exception &)
{
}
Strings RestoreCoordinationLocal::waitForStage(const Strings &, const String &)
Strings RestoreCoordinationLocal::waitForStage(const String &)
{
return {};
}
Strings RestoreCoordinationLocal::waitForStage(const Strings &, const String &, std::chrono::milliseconds)
Strings RestoreCoordinationLocal::waitForStage(const String &, std::chrono::milliseconds)
{
return {};
}

View File

@ -19,10 +19,10 @@ public:
~RestoreCoordinationLocal() override;
/// Sets the current stage and waits for other hosts to come to this stage too.
void setStage(const String & current_host, const String & new_stage, const String & message) override;
void setError(const String & current_host, const Exception & exception) override;
Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) override;
Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) override;
void setStage(const String & new_stage, const String & message) override;
void setError(const Exception & exception) override;
Strings waitForStage(const String & stage_to_wait) override;
Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) override;
/// Starts creating a table in a replicated database. Returns false if there is another host which is already creating this table.
bool acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name) override;

View File

@ -11,11 +11,19 @@ namespace DB
namespace Stage = BackupCoordinationStage;
RestoreCoordinationRemote::RestoreCoordinationRemote(
const String & root_zookeeper_path_, const String & restore_uuid_, zkutil::GetZooKeeper get_zookeeper_, bool is_internal_)
: root_zookeeper_path(root_zookeeper_path_)
, zookeeper_path(root_zookeeper_path_ + "/restore-" + restore_uuid_)
zkutil::GetZooKeeper get_zookeeper_,
const String & root_zookeeper_path_,
const String & restore_uuid_,
const Strings & all_hosts_,
const String & current_host_,
bool is_internal_)
: get_zookeeper(get_zookeeper_)
, root_zookeeper_path(root_zookeeper_path_)
, restore_uuid(restore_uuid_)
, get_zookeeper(get_zookeeper_)
, zookeeper_path(root_zookeeper_path_ + "/restore-" + restore_uuid_)
, all_hosts(all_hosts_)
, current_host(current_host_)
, current_host_index(BackupCoordinationRemote::findCurrentHostIndex(all_hosts, current_host))
, is_internal(is_internal_)
{
createRootNodes();
@ -63,22 +71,22 @@ void RestoreCoordinationRemote::createRootNodes()
}
void RestoreCoordinationRemote::setStage(const String & current_host, const String & new_stage, const String & message)
void RestoreCoordinationRemote::setStage(const String & new_stage, const String & message)
{
stage_sync->set(current_host, new_stage, message);
}
void RestoreCoordinationRemote::setError(const String & current_host, const Exception & exception)
void RestoreCoordinationRemote::setError(const Exception & exception)
{
stage_sync->setError(current_host, exception);
}
Strings RestoreCoordinationRemote::waitForStage(const Strings & all_hosts, const String & stage_to_wait)
Strings RestoreCoordinationRemote::waitForStage(const String & stage_to_wait)
{
return stage_sync->wait(all_hosts, stage_to_wait);
}
Strings RestoreCoordinationRemote::waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout)
Strings RestoreCoordinationRemote::waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout)
{
return stage_sync->waitFor(all_hosts, stage_to_wait, timeout);
}

View File

@ -11,14 +11,21 @@ namespace DB
class RestoreCoordinationRemote : public IRestoreCoordination
{
public:
RestoreCoordinationRemote(const String & root_zookeeper_path_, const String & restore_uuid_, zkutil::GetZooKeeper get_zookeeper_, bool is_internal_);
RestoreCoordinationRemote(
zkutil::GetZooKeeper get_zookeeper_,
const String & root_zookeeper_path_,
const String & restore_uuid_,
const Strings & all_hosts_,
const String & current_host_,
bool is_internal_);
~RestoreCoordinationRemote() override;
/// Sets the current stage and waits for other hosts to come to this stage too.
void setStage(const String & current_host, const String & new_stage, const String & message) override;
void setError(const String & current_host, const Exception & exception) override;
Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) override;
Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) override;
void setStage(const String & new_stage, const String & message) override;
void setError(const Exception & exception) override;
Strings waitForStage(const String & stage_to_wait) override;
Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) override;
/// Starts creating a table in a replicated database. Returns false if there is another host which is already creating this table.
bool acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name) override;
@ -44,10 +51,13 @@ private:
class ReplicatedDatabasesMetadataSync;
const String root_zookeeper_path;
const String zookeeper_path;
const String restore_uuid;
const zkutil::GetZooKeeper get_zookeeper;
const String root_zookeeper_path;
const String restore_uuid;
const String zookeeper_path;
const Strings all_hosts;
const String current_host;
const size_t current_host_index;
const bool is_internal;
std::optional<BackupCoordinationStageSync> stage_sync;
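
The constructor above derives current_host_index via BackupCoordinationRemote::findCurrentHostIndex(all_hosts, current_host). The real helper is not shown in this diff; a plausible sketch of what such a lookup does (an assumption, not the actual implementation):

#include <algorithm>
#include <string>
#include <vector>

/// Hypothetical sketch: position of the current host in the host list,
/// or 0 if it is not present (e.g. for a purely local operation).
size_t findCurrentHostIndex(const std::vector<std::string> & all_hosts, const std::string & current_host)
{
    auto it = std::find(all_hosts.begin(), all_hosts.end(), current_host);
    return it == all_hosts.end() ? 0 : static_cast<size_t>(it - all_hosts.begin());
}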

View File

@ -150,11 +150,11 @@ void RestorerFromBackup::setStage(const String & new_stage, const String & messa
if (restore_coordination)
{
restore_coordination->setStage(restore_settings.host_id, new_stage, message);
restore_coordination->setStage(new_stage, message);
if (new_stage == Stage::FINDING_TABLES_IN_BACKUP)
restore_coordination->waitForStage(all_hosts, new_stage, on_cluster_first_sync_timeout);
restore_coordination->waitForStage(new_stage, on_cluster_first_sync_timeout);
else
restore_coordination->waitForStage(all_hosts, new_stage);
restore_coordination->waitForStage(new_stage);
}
}

View File

@ -10,9 +10,11 @@
namespace mysqlxx
{
std::string errorMessage(MYSQL * driver)
std::string errorMessage(MYSQL * driver, const std::string & query)
{
return fmt::format("{} ({}:{})", mysql_error(driver), driver->host ? driver->host : "(nullptr)", driver->port);
return fmt::format("{}{} ({}:{})", mysql_error(driver),
query.empty() ? "" : " while executing query: '" + query + "'",
driver->host ? driver->host : "(nullptr)", driver->port);
}
void checkError(MYSQL * driver)
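
The mysqlxx change above threads the failing query into the error text through an optional parameter that defaults to an empty string. A standard-library-only sketch of the same formatting rule (the function name and signature below are invented; the real code formats a MYSQL handle with fmt):

#include <string>

/// Toy stand-in for the formatting logic: append " while executing query: '<q>'"
/// only when a query was supplied, then the host:port suffix.
std::string formatMysqlError(const std::string & error, const std::string & host, unsigned port,
                             const std::string & query = "")
{
    std::string message = error;
    if (!query.empty())
        message += " while executing query: '" + query + "'";
    message += " (" + host + ":" + std::to_string(port) + ")";
    return message;
}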

View File

@ -64,7 +64,7 @@ void Query::executeImpl()
case CR_SERVER_LOST:
throw ConnectionLost(errorMessage(mysql_driver), err_no);
default:
throw BadQuery(errorMessage(mysql_driver), err_no);
throw BadQuery(errorMessage(mysql_driver, query), err_no);
}
}
}

View File

@ -160,14 +160,16 @@ void Value::throwException(const char * text) const
if (!isNull())
{
info.append(": ");
info.append(": '");
info.append(m_data, m_length);
info.append("'");
}
if (res && res->getQuery())
{
info.append(", query: ");
info.append(", query: '");
info.append(res->getQuery()->str().substr(0, preview_length));
info.append("'");
}
throw CannotParseValue(info);

View File

@ -53,7 +53,7 @@ struct CannotParseValue : public Exception
};
std::string errorMessage(MYSQL * driver);
std::string errorMessage(MYSQL * driver, const std::string & query = "");
/// For internal needs of the library.
void checkError(MYSQL * driver);

View File

@ -1,8 +1,13 @@
#pragma once
#include <Functions/registerFunctions.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Formats/registerFormats.h>
inline void tryRegisterAggregateFunctions()
{
static struct Register { Register() { DB::registerAggregateFunctions(); } } registered;
}
inline void tryRegisterFunctions()
{

View File

@ -48,10 +48,9 @@ void backupUserDefinedSQLObjects(
}
String replication_id = loader.getReplicationID();
String current_host_id = backup_entries_collector.getBackupSettings().host_id;
auto backup_coordination = backup_entries_collector.getBackupCoordination();
backup_coordination->addReplicatedSQLObjectsDir(replication_id, object_type, current_host_id, data_path_in_backup);
backup_coordination->addReplicatedSQLObjectsDir(replication_id, object_type, data_path_in_backup);
// By the stage of running post tasks, all directories will already have been added to the backup coordination object.
// They will be returned for only one of the hosts below; the rest will get an empty list.
@ -60,11 +59,10 @@ void backupUserDefinedSQLObjects(
[backup_entries = std::move(backup_entries),
replication_id = std::move(replication_id),
object_type,
current_host_id = std::move(current_host_id),
&backup_entries_collector,
backup_coordination]
{
auto dirs = backup_coordination->getReplicatedSQLObjectsDirs(replication_id, object_type, current_host_id);
auto dirs = backup_coordination->getReplicatedSQLObjectsDirs(replication_id, object_type);
for (const auto & dir : dirs)
{

View File

@ -6,6 +6,7 @@ namespace DB
namespace ErrorCodes
{
extern const int ZLIB_INFLATE_FAILED;
extern const int ARGUMENT_OUT_OF_BOUND;
}
ZlibInflatingReadBuffer::ZlibInflatingReadBuffer(
@ -17,6 +18,11 @@ ZlibInflatingReadBuffer::ZlibInflatingReadBuffer(
: CompressedReadBufferWrapper(std::move(in_), buf_size, existing_memory, alignment)
, eof_flag(false)
{
if (buf_size > max_buffer_size)
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND,
"Zlib does not support decompression with buffer size greater than {}, got buffer size: {}",
max_buffer_size, buf_size);
zstr.zalloc = nullptr;
zstr.zfree = nullptr;
zstr.opaque = nullptr;
@ -31,10 +37,7 @@ ZlibInflatingReadBuffer::ZlibInflatingReadBuffer(
window_bits += 16;
}
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wold-style-cast"
int rc = inflateInit2(&zstr, window_bits);
#pragma GCC diagnostic pop
if (rc != Z_OK)
throw Exception(ErrorCodes::ZLIB_INFLATE_FAILED, "inflateInit2 failed: {}; zlib version: {}.", zError(rc), ZLIB_VERSION);
@ -61,16 +64,22 @@ bool ZlibInflatingReadBuffer::nextImpl()
{
in->nextIfAtEnd();
zstr.next_in = reinterpret_cast<unsigned char *>(in->position());
zstr.avail_in = static_cast<unsigned>(in->buffer().end() - in->position());
zstr.avail_in = static_cast<BufferSizeType>(std::min(
static_cast<UInt64>(in->buffer().end() - in->position()),
static_cast<UInt64>(max_buffer_size)));
}
/// Init output bytes (the place where the decompressed data will be written).
zstr.next_out = reinterpret_cast<unsigned char *>(internal_buffer.begin());
zstr.avail_out = static_cast<unsigned>(internal_buffer.size());
zstr.avail_out = static_cast<BufferSizeType>(internal_buffer.size());
size_t old_total_in = zstr.total_in;
int rc = inflate(&zstr, Z_NO_FLUSH);
/// Advance the input stream to the position where reading stopped.
in->position() = in->buffer().end() - zstr.avail_in;
size_t bytes_read = zstr.total_in - old_total_in;
in->position() += bytes_read;
/// Change the size of the working buffer (its size equals the internal_buffer size minus the unused output space).
working_buffer.resize(internal_buffer.size() - zstr.avail_out);
@ -94,9 +103,10 @@ bool ZlibInflatingReadBuffer::nextImpl()
return true;
}
}
/// If it is neither the end of the stream nor OK, something went wrong; throw an exception.
if (rc != Z_OK)
throw Exception(ErrorCodes::ZLIB_INFLATE_FAILED, "inflateReset failed: {}", zError(rc));
throw Exception(ErrorCodes::ZLIB_INFLATE_FAILED, "inflate failed: {}", zError(rc));
}
while (working_buffer.empty());
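
The new max_buffer_size guard and the std::min clamp above exist because z_stream counts bytes in zlib's 32-bit uInt fields. A self-contained example of feeding arbitrarily large input through inflate() in uInt-sized chunks (a sketch of the general zlib pattern, not ClickHouse's buffer machinery):

#include <algorithm>
#include <cstdint>
#include <limits>
#include <stdexcept>
#include <vector>
#include <zlib.h>

std::vector<unsigned char> inflateAll(const unsigned char * data, uint64_t size)
{
    z_stream zstr{};
    /// windowBits = 15 + 32 enables automatic zlib/gzip header detection.
    if (inflateInit2(&zstr, 15 + 32) != Z_OK)
        throw std::runtime_error("inflateInit2 failed");

    std::vector<unsigned char> out;
    std::vector<unsigned char> buf(1 << 16);
    constexpr uint64_t max_chunk = std::numeric_limits<uInt>::max();

    uint64_t consumed = 0;
    int rc = Z_OK;
    while (rc != Z_STREAM_END)
    {
        /// avail_in is a uInt, so never hand zlib more than max_chunk bytes at once.
        uint64_t chunk = std::min(size - consumed, max_chunk);
        zstr.next_in = const_cast<unsigned char *>(data) + consumed;
        zstr.avail_in = static_cast<uInt>(chunk);
        zstr.next_out = buf.data();
        zstr.avail_out = static_cast<uInt>(buf.size());

        rc = inflate(&zstr, Z_NO_FLUSH);
        if (rc != Z_OK && rc != Z_STREAM_END)
        {
            inflateEnd(&zstr);
            throw std::runtime_error(std::string("inflate failed: ") + zError(rc));
        }

        consumed += chunk - zstr.avail_in;
        out.insert(out.end(), buf.data(), buf.data() + (buf.size() - zstr.avail_out));
    }

    inflateEnd(&zstr);
    return out;
}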

View File

@ -4,6 +4,7 @@
#include <IO/CompressedReadBufferWrapper.h>
#include <IO/CompressionMethod.h>
#include <limits>
#include <zlib.h>
@ -33,6 +34,11 @@ private:
z_stream zstr;
bool eof_flag;
/// Limit the buffer size because zlib uses
/// UInt32 for the sizes of its internal buffers.
using BufferSizeType = decltype(zstr.avail_in);
static constexpr auto max_buffer_size = std::numeric_limits<BufferSizeType>::max();
};
}

View File

@ -2874,8 +2874,10 @@ void InterpreterSelectQuery::executeMergeSorted(QueryPlan & query_plan, const st
SortDescription sort_description = getSortDescription(query, context);
const UInt64 limit = getLimitForSorting(query, context);
const auto max_block_size = context->getSettingsRef().max_block_size;
const auto exact_rows_before_limit = context->getSettingsRef().exact_rows_before_limit;
auto merging_sorted = std::make_unique<SortingStep>(query_plan.getCurrentDataStream(), std::move(sort_description), max_block_size, limit);
auto merging_sorted = std::make_unique<SortingStep>(
query_plan.getCurrentDataStream(), std::move(sort_description), max_block_size, limit, exact_rows_before_limit);
merging_sorted->setStepDescription("Merge sorted streams " + description);
query_plan.addStep(std::move(merging_sorted));
}

View File

@ -71,6 +71,8 @@ public:
/// Set number_of_current_replica and count_participating_replicas in client_info
void setProperClientInfo(size_t replica_number, size_t count_participating_replicas);
const Planner & getPlanner() const { return planner; }
private:
ASTPtr query;
ContextMutablePtr context;

View File

@ -525,7 +525,8 @@ void addMergeSortingStep(QueryPlan & query_plan,
auto merging_sorted = std::make_unique<SortingStep>(query_plan.getCurrentDataStream(),
sort_description,
max_block_size,
query_analysis_result.partial_sorting_limit);
query_analysis_result.partial_sorting_limit,
settings.exact_rows_before_limit);
merging_sorted->setStepDescription("Merge sorted streams " + description);
query_plan.addStep(std::move(merging_sorted));
}
@ -1104,11 +1105,7 @@ void Planner::buildPlanForQueryNode()
query_node.getWhere() = {};
}
SelectQueryInfo select_query_info;
select_query_info.original_query = queryNodeToSelectQuery(query_tree);
select_query_info.query = select_query_info.original_query;
select_query_info.query_tree = query_tree;
select_query_info.planner_context = planner_context;
SelectQueryInfo select_query_info = buildSelectQueryInfo();
StorageLimitsList current_storage_limits = storage_limits;
select_query_info.local_storage_limits = buildStorageLimits(*query_context, select_query_options);
@ -1405,6 +1402,11 @@ void Planner::buildPlanForQueryNode()
addBuildSubqueriesForSetsStepIfNeeded(query_plan, select_query_options, planner_context, result_actions_to_execute);
}
SelectQueryInfo Planner::buildSelectQueryInfo() const
{
return ::DB::buildSelectQueryInfo(query_tree, planner_context);
}
void Planner::addStorageLimits(const StorageLimitsList & limits)
{
for (const auto & limit : limits)

View File

@ -6,6 +6,7 @@
#include <Analyzer/QueryTreePassManager.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Interpreters/Context_fwd.h>
#include <Storages/SelectQueryInfo.h>
namespace DB
{
@ -50,8 +51,15 @@ public:
return std::move(query_plan);
}
SelectQueryInfo buildSelectQueryInfo() const;
void addStorageLimits(const StorageLimitsList & limits);
PlannerContextPtr getPlannerContext() const
{
return planner_context;
}
private:
void buildPlanForUnionNode();

View File

@ -93,7 +93,11 @@ std::optional<AggregationAnalysisResult> analyzeAggregation(const QueryTreeNodeP
for (auto & grouping_set_key_node : grouping_set_keys_list_node_typed.getNodes())
{
group_by_with_constant_keys |= (grouping_set_key_node->as<ConstantNode>() != nullptr);
auto is_constant_key = grouping_set_key_node->as<ConstantNode>() != nullptr;
group_by_with_constant_keys |= is_constant_key;
if (is_constant_key && !aggregates_descriptions.empty())
continue;
auto expression_dag_nodes = actions_visitor.visit(before_aggregation_actions, grouping_set_key_node);
aggregation_keys.reserve(expression_dag_nodes.size());
@ -139,21 +143,27 @@ std::optional<AggregationAnalysisResult> analyzeAggregation(const QueryTreeNodeP
else
{
for (auto & group_by_key_node : query_node.getGroupBy().getNodes())
group_by_with_constant_keys |= (group_by_key_node->as<ConstantNode>() != nullptr);
auto expression_dag_nodes = actions_visitor.visit(before_aggregation_actions, query_node.getGroupByNode());
aggregation_keys.reserve(expression_dag_nodes.size());
for (auto & expression_dag_node : expression_dag_nodes)
{
if (before_aggregation_actions_output_node_names.contains(expression_dag_node->result_name))
auto is_constant_key = group_by_key_node->as<ConstantNode>() != nullptr;
group_by_with_constant_keys |= is_constant_key;
if (is_constant_key && !aggregates_descriptions.empty())
continue;
auto expression_type_after_aggregation = group_by_use_nulls ? makeNullableSafe(expression_dag_node->result_type) : expression_dag_node->result_type;
available_columns_after_aggregation.emplace_back(nullptr, expression_type_after_aggregation, expression_dag_node->result_name);
aggregation_keys.push_back(expression_dag_node->result_name);
before_aggregation_actions->getOutputs().push_back(expression_dag_node);
before_aggregation_actions_output_node_names.insert(expression_dag_node->result_name);
auto expression_dag_nodes = actions_visitor.visit(before_aggregation_actions, group_by_key_node);
aggregation_keys.reserve(expression_dag_nodes.size());
for (auto & expression_dag_node : expression_dag_nodes)
{
if (before_aggregation_actions_output_node_names.contains(expression_dag_node->result_name))
continue;
auto expression_type_after_aggregation = group_by_use_nulls ? makeNullableSafe(expression_dag_node->result_type) : expression_dag_node->result_type;
available_columns_after_aggregation.emplace_back(nullptr, expression_type_after_aggregation, expression_dag_node->result_name);
aggregation_keys.push_back(expression_dag_node->result_name);
before_aggregation_actions->getOutputs().push_back(expression_dag_node);
before_aggregation_actions_output_node_names.insert(expression_dag_node->result_name);
}
}
}
}

View File

@ -406,4 +406,14 @@ QueryTreeNodePtr buildSubqueryToReadColumnsFromTableExpression(const NamesAndTyp
return query_node;
}
SelectQueryInfo buildSelectQueryInfo(const QueryTreeNodePtr & query_tree, const PlannerContextPtr & planner_context)
{
SelectQueryInfo select_query_info;
select_query_info.original_query = queryNodeToSelectQuery(query_tree);
select_query_info.query = select_query_info.original_query;
select_query_info.query_tree = query_tree;
select_query_info.planner_context = planner_context;
return select_query_info;
}
}

View File

@ -17,6 +17,8 @@
#include <Planner/PlannerContext.h>
#include <Storages/SelectQueryInfo.h>
namespace DB
{
@ -74,4 +76,6 @@ QueryTreeNodePtr buildSubqueryToReadColumnsFromTableExpression(const NamesAndTyp
const QueryTreeNodePtr & table_expression,
const ContextPtr & context);
SelectQueryInfo buildSelectQueryInfo(const QueryTreeNodePtr & query_tree, const PlannerContextPtr & planner_context);
}

View File

@ -39,7 +39,7 @@ public:
virtual void setRowsBeforeLimit(size_t /*rows_before_limit*/) {}
/// Counter to calculate rows_before_limit_at_least in processors pipeline.
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { rows_before_limit_counter.swap(counter); }
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) override { rows_before_limit_counter.swap(counter); }
/// Notify about progress. The method can be called from different threads.
/// The passed value is a delta that must be summed up.

View File

@ -21,6 +21,9 @@ class IQueryPlanStep;
struct StorageLimits;
using StorageLimitsList = std::list<StorageLimits>;
class RowsBeforeLimitCounter;
using RowsBeforeLimitCounterPtr = std::shared_ptr<RowsBeforeLimitCounter>;
class IProcessor;
using ProcessorPtr = std::shared_ptr<IProcessor>;
using Processors = std::vector<ProcessorPtr>;
@ -357,6 +360,10 @@ public:
/// You should zero the internal counters in the call, in order to make it idempotent.
virtual std::optional<ReadProgress> getReadProgress() { return std::nullopt; }
/// Set rows_before_limit counter for current processor.
/// This counter is used to calculate the number of rows right before any filtration of LimitTransform.
virtual void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr /* counter */) {}
protected:
virtual void onCancel() {}

View File

@ -183,7 +183,7 @@ LimitTransform::Status LimitTransform::preparePair(PortsData & data)
auto rows = data.current_chunk.getNumRows();
if (rows_before_limit_at_least)
if (rows_before_limit_at_least && !data.input_port_has_counter)
rows_before_limit_at_least->add(rows);
/// Skip block (for 'always_read_till_end' case).

View File

@ -41,6 +41,11 @@ private:
InputPort * input_port = nullptr;
OutputPort * output_port = nullptr;
bool is_finished = false;
/// This flag is used to avoid counting rows multiple times before applying a limit
/// condition, which can happen through certain input ports like PartialSortingTransform and
/// RemoteSource.
bool input_port_has_counter = false;
};
std::vector<PortsData> ports_data;
@ -66,7 +71,8 @@ public:
InputPort & getInputPort() { return inputs.front(); }
OutputPort & getOutputPort() { return outputs.front(); }
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { rows_before_limit_at_least.swap(counter); }
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) override { rows_before_limit_at_least.swap(counter); }
void setInputPortHasCounter(size_t pos) { ports_data[pos].input_port_has_counter = true; }
};
}
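
setRowsBeforeLimitCounter() above shares one RowsBeforeLimitCounter between processors, and input_port_has_counter tells LimitTransform not to add rows that an upstream processor already counted. A toy illustration of that shared-counter-with-skip-flag idea (all names below are invented):

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <memory>
#include <vector>

/// Stand-in for RowsBeforeLimitCounter: a shared atomic sum.
using RowsCounterPtr = std::shared_ptr<std::atomic<size_t>>;

struct ToyLimit
{
    size_t limit = 0;
    RowsCounterPtr rows_before_limit;
    bool input_has_counter = false;   /// upstream already adds to the counter

    std::vector<int> process(const std::vector<int> & chunk)
    {
        /// Count rows seen before the limit is applied, unless the input already did.
        if (rows_before_limit && !input_has_counter)
            rows_before_limit->fetch_add(chunk.size());

        size_t take = std::min(limit, chunk.size());
        std::vector<int> out(chunk.begin(), chunk.begin() + static_cast<std::ptrdiff_t>(take));
        limit -= take;
        return out;
    }
};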

View File

@ -16,7 +16,7 @@ public:
const Block & header, size_t num_inputs,
SortDescription description_, size_t max_block_size)
: IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
header,
num_inputs,
std::move(description_),

View File

@ -18,17 +18,6 @@
using namespace DB;
static int regAggregateFunctions = 0;
void tryRegisterAggregateFunctions()
{
if (!regAggregateFunctions)
{
registerAggregateFunctions();
regAggregateFunctions = 1;
}
}
static ConfigProcessor::LoadedConfig loadConfiguration(const std::string & config_path)
{
ConfigProcessor config_processor(config_path, true, true);

View File

@ -20,7 +20,7 @@ public:
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false)
: IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
header,
num_inputs,
std::move(description_),

View File

@ -20,7 +20,7 @@ public:
size_t max_block_size,
size_t max_block_bytes)
: IMergingTransform(
num_inputs, header, {}, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
num_inputs, header, {}, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
header,
num_inputs,
params,

View File

@ -15,7 +15,7 @@ public:
SortDescription description_, size_t max_block_size,
Graphite::Params params_, time_t time_of_merge_)
: IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
header,
num_inputs,
std::move(description_),

View File

@ -14,10 +14,12 @@ IMergingTransformBase::IMergingTransformBase(
const Block & input_header,
const Block & output_header,
bool have_all_inputs_,
UInt64 limit_hint_)
UInt64 limit_hint_,
bool always_read_till_end_)
: IProcessor(InputPorts(num_inputs, input_header), {output_header})
, have_all_inputs(have_all_inputs_)
, limit_hint(limit_hint_)
, always_read_till_end(always_read_till_end_)
{
}
@ -33,10 +35,12 @@ IMergingTransformBase::IMergingTransformBase(
const Blocks & input_headers,
const Block & output_header,
bool have_all_inputs_,
UInt64 limit_hint_)
UInt64 limit_hint_,
bool always_read_till_end_)
: IProcessor(createPorts(input_headers), {output_header})
, have_all_inputs(have_all_inputs_)
, limit_hint(limit_hint_)
, always_read_till_end(always_read_till_end_)
{
}
@ -98,7 +102,7 @@ IProcessor::Status IMergingTransformBase::prepareInitializeInputs()
/// (e.g. with optimized 'ORDER BY primary_key LIMIT n' and small 'n')
/// we won't have to read any chunks anymore;
auto chunk = input.pull(limit_hint != 0);
if (limit_hint && chunk.getNumRows() < limit_hint)
if ((limit_hint && chunk.getNumRows() < limit_hint) || always_read_till_end)
input.setNeeded();
if (!chunk.hasRows())
@ -164,6 +168,21 @@ IProcessor::Status IMergingTransformBase::prepare()
if (is_port_full)
return Status::PortFull;
if (always_read_till_end)
{
for (auto & input : inputs)
{
if (!input.isFinished())
{
input.setNeeded();
if (input.hasData())
std::ignore = input.pull();
return Status::NeedData;
}
}
}
for (auto & input : inputs)
input.close();
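
The new always_read_till_end branch above keeps pulling and discarding data from unfinished inputs instead of closing them, so upstream counters still observe every row. A simplified standalone sketch of that drain step, with toy queues standing in for input ports (invented names, not the IMergingTransformBase API):

#include <deque>
#include <vector>

struct ToyInput
{
    std::deque<int> data;
    bool upstream_finished = false;

    bool isFinished() const { return upstream_finished && data.empty(); }
};

/// Returns true when every input is fully drained; otherwise discards what is
/// available and reports that more data is still needed (cf. Status::NeedData).
bool drainRemainingInputs(std::vector<ToyInput> & inputs)
{
    for (auto & input : inputs)
    {
        if (!input.isFinished())
        {
            input.data.clear();   /// pull and ignore whatever is buffered
            return false;         /// need more data before we can finish
        }
    }
    return true;
}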

View File

@ -17,13 +17,15 @@ public:
const Block & input_header,
const Block & output_header,
bool have_all_inputs_,
UInt64 limit_hint_);
UInt64 limit_hint_,
bool always_read_till_end_);
IMergingTransformBase(
const Blocks & input_headers,
const Block & output_header,
bool have_all_inputs_,
UInt64 limit_hint_);
UInt64 limit_hint_,
bool always_read_till_end_);
OutputPort & getOutputPort() { return outputs.front(); }
@ -67,6 +69,7 @@ private:
std::atomic<bool> have_all_inputs;
bool is_initialized = false;
UInt64 limit_hint = 0;
bool always_read_till_end = false;
IProcessor::Status prepareInitializeInputs();
};
@ -83,8 +86,9 @@ public:
const Block & output_header,
bool have_all_inputs_,
UInt64 limit_hint_,
bool always_read_till_end_,
Args && ... args)
: IMergingTransformBase(num_inputs, input_header, output_header, have_all_inputs_, limit_hint_)
: IMergingTransformBase(num_inputs, input_header, output_header, have_all_inputs_, limit_hint_, always_read_till_end_)
, algorithm(std::forward<Args>(args) ...)
{
}
@ -95,9 +99,10 @@ public:
const Block & output_header,
bool have_all_inputs_,
UInt64 limit_hint_,
bool always_read_till_end_,
bool empty_chunk_on_finish_,
Args && ... args)
: IMergingTransformBase(input_headers, output_header, have_all_inputs_, limit_hint_)
: IMergingTransformBase(input_headers, output_header, have_all_inputs_, limit_hint_, always_read_till_end_)
, empty_chunk_on_finish(empty_chunk_on_finish_)
, algorithm(std::forward<Args>(args) ...)
{

View File

@ -14,6 +14,7 @@ MergingSortedTransform::MergingSortedTransform(
size_t max_block_size,
SortingQueueStrategy sorting_queue_strategy,
UInt64 limit_,
bool always_read_till_end_,
WriteBuffer * out_row_sources_buf_,
bool quiet_,
bool use_average_block_sizes,
@ -24,6 +25,7 @@ MergingSortedTransform::MergingSortedTransform(
header,
have_all_inputs_,
limit_,
always_read_till_end_,
header,
num_inputs,
description_,

View File

@ -18,6 +18,7 @@ public:
size_t max_block_size,
SortingQueueStrategy sorting_queue_strategy,
UInt64 limit_ = 0,
bool always_read_till_end_ = false,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool quiet_ = false,
bool use_average_block_sizes = false,

View File

@ -20,7 +20,7 @@ public:
bool use_average_block_sizes = false,
bool cleanup = false)
: IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
header,
num_inputs,
std::move(description_),

View File

@ -19,7 +19,7 @@ public:
const Names & partition_key_columns,
size_t max_block_size)
: IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
header,
num_inputs,
std::move(description_),

View File

@ -19,7 +19,7 @@ public:
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false)
: IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
header,
num_inputs,
std::move(description_),

View File

@ -45,7 +45,7 @@ public:
InputPort & getInputPort() { return inputs.front(); }
OutputPort & getOutputPort() { return outputs.front(); }
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { rows_before_limit_at_least.swap(counter); }
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) override { rows_before_limit_at_least.swap(counter); }
};
}

View File

@ -55,6 +55,10 @@ std::unique_ptr<QueryPlan> createLocalPlan(
auto query_plan = std::make_unique<QueryPlan>();
auto new_context = Context::createCopy(context);
/// Do not push down the limit to the local plan, as it would break the `rows_before_limit_at_least` counter.
if (processed_stage == QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit)
processed_stage = QueryProcessingStage::WithMergeableStateAfterAggregation;
/// Do not apply AST optimizations, because query
/// is already optimized and some optimizations
/// can be applied only for non-distributed tables

View File

@ -2,7 +2,6 @@
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/SourceStepWithFilter.h>
#include <Interpreters/ActionsDAG.h>
#include <deque>
namespace DB::QueryPlanOptimizations

View File

@ -0,0 +1,155 @@
#include "ReadFromMemoryStorageStep.h"
#include <atomic>
#include <functional>
#include <memory>
#include <Interpreters/getColumnFromBlock.h>
#include <Interpreters/inplaceBlockConversions.h>
#include <Storages/StorageSnapshot.h>
#include <Storages/StorageMemory.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/ISource.h>
namespace DB
{
class MemorySource : public ISource
{
using InitializerFunc = std::function<void(std::shared_ptr<const Blocks> &)>;
public:
MemorySource(
Names column_names_,
const StorageSnapshotPtr & storage_snapshot,
std::shared_ptr<const Blocks> data_,
std::shared_ptr<std::atomic<size_t>> parallel_execution_index_,
InitializerFunc initializer_func_ = {})
: ISource(storage_snapshot->getSampleBlockForColumns(column_names_))
, column_names_and_types(storage_snapshot->getColumnsByNames(
GetColumnsOptions(GetColumnsOptions::All).withSubcolumns().withExtendedObjects(), column_names_))
, data(data_)
, parallel_execution_index(parallel_execution_index_)
, initializer_func(std::move(initializer_func_))
{
}
String getName() const override { return "Memory"; }
protected:
Chunk generate() override
{
if (initializer_func)
{
initializer_func(data);
initializer_func = {};
}
size_t current_index = getAndIncrementExecutionIndex();
if (!data || current_index >= data->size())
{
return {};
}
const Block & src = (*data)[current_index];
Columns columns;
size_t num_columns = column_names_and_types.size();
columns.reserve(num_columns);
auto name_and_type = column_names_and_types.begin();
for (size_t i = 0; i < num_columns; ++i)
{
columns.emplace_back(tryGetColumnFromBlock(src, *name_and_type));
++name_and_type;
}
fillMissingColumns(columns, src.rows(), column_names_and_types, column_names_and_types, {}, nullptr);
assert(std::all_of(columns.begin(), columns.end(), [](const auto & column) { return column != nullptr; }));
return Chunk(std::move(columns), src.rows());
}
private:
size_t getAndIncrementExecutionIndex()
{
if (parallel_execution_index)
{
return (*parallel_execution_index)++;
}
else
{
return execution_index++;
}
}
const NamesAndTypesList column_names_and_types;
size_t execution_index = 0;
std::shared_ptr<const Blocks> data;
std::shared_ptr<std::atomic<size_t>> parallel_execution_index;
InitializerFunc initializer_func;
};
ReadFromMemoryStorageStep::ReadFromMemoryStorageStep(Pipe pipe_) :
SourceStepWithFilter(DataStream{.header = pipe_.getHeader()}),
pipe(std::move(pipe_))
{
}
void ReadFromMemoryStorageStep::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
// Use move to make sure that the call will only be made once.
pipeline.init(std::move(pipe));
}
Pipe ReadFromMemoryStorageStep::makePipe(const Names & columns_to_read_,
const StorageSnapshotPtr & storage_snapshot_,
size_t num_streams_,
const bool delay_read_for_global_sub_queries_)
{
storage_snapshot_->check(columns_to_read_);
const auto & snapshot_data = assert_cast<const StorageMemory::SnapshotData &>(*storage_snapshot_->data);
auto current_data = snapshot_data.blocks;
if (delay_read_for_global_sub_queries_)
{
/// Note: for a global subquery we use a single source.
/// The main reason is that at this point the table is empty,
/// and we don't know how many blocks are going to be inserted into it.
///
/// It may seem suboptimal, but the data from such a table is used to fill
/// a set for IN or a hash table for JOIN, which can't be done concurrently.
/// Since no other manipulation with the data is done, multiple sources shouldn't give any benefit.
return Pipe(std::make_shared<MemorySource>(
columns_to_read_,
storage_snapshot_,
nullptr /* data */,
nullptr /* parallel execution index */,
[current_data](std::shared_ptr<const Blocks> & data_to_initialize)
{
data_to_initialize = current_data;
}));
}
size_t size = current_data->size();
if (num_streams_ > size)
num_streams_ = size;
Pipes pipes;
auto parallel_execution_index = std::make_shared<std::atomic<size_t>>(0);
for (size_t stream = 0; stream < num_streams_; ++stream)
{
pipes.emplace_back(std::make_shared<MemorySource>(columns_to_read_, storage_snapshot_, current_data, parallel_execution_index));
}
return Pipe::unitePipes(std::move(pipes));
}
}
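
The MemorySource streams above coordinate through a shared std::atomic<size_t> so that each block of the table is claimed and emitted by exactly one stream. A compact standalone demonstration of that claim-by-increment pattern (thread count and data are made up):

#include <atomic>
#include <cstddef>
#include <memory>
#include <thread>
#include <vector>

int main()
{
    std::vector<int> blocks(100, 1);                        /// pretend table data: 100 "blocks"
    auto next_index = std::make_shared<std::atomic<size_t>>(0);
    std::atomic<size_t> consumed{0};

    auto worker = [&]
    {
        while (true)
        {
            size_t i = (*next_index)++;                     /// claim the next block
            if (i >= blocks.size())
                break;
            consumed += static_cast<size_t>(blocks[i]);     /// "read" it exactly once
        }
    };

    std::vector<std::thread> streams;
    for (int s = 0; s < 4; ++s)
        streams.emplace_back(worker);
    for (auto & t : streams)
        t.join();

    return consumed == blocks.size() ? 0 : 1;               /// every block was read exactly once
}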

View File

@ -0,0 +1,40 @@
#pragma once
#include <memory>
#include <Interpreters/TreeRewriter.h>
#include <Processors/QueryPlan/SourceStepWithFilter.h>
#include <QueryPipeline/Pipe.h>
namespace DB
{
class QueryPipelineBuilder;
class ReadFromMemoryStorageStep final : public SourceStepWithFilter
{
public:
explicit ReadFromMemoryStorageStep(Pipe pipe_);
ReadFromMemoryStorageStep() = delete;
ReadFromMemoryStorageStep(const ReadFromMemoryStorageStep &) = delete;
ReadFromMemoryStorageStep & operator=(const ReadFromMemoryStorageStep &) = delete;
ReadFromMemoryStorageStep(ReadFromMemoryStorageStep &&) = default;
ReadFromMemoryStorageStep & operator=(ReadFromMemoryStorageStep &&) = default;
String getName() const override { return name; }
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
static Pipe makePipe(const Names & columns_to_read_,
const StorageSnapshotPtr & storage_snapshot_,
size_t num_streams_,
bool delay_read_for_global_sub_queries_);
private:
static constexpr auto name = "ReadFromMemoryStorage";
Pipe pipe;
};
}

View File

@ -98,11 +98,13 @@ SortingStep::SortingStep(
const DataStream & input_stream,
SortDescription sort_description_,
size_t max_block_size_,
UInt64 limit_)
UInt64 limit_,
bool always_read_till_end_)
: ITransformingStep(input_stream, input_stream.header, getTraits(limit_))
, type(Type::MergingSorted)
, result_description(std::move(sort_description_))
, limit(limit_)
, always_read_till_end(always_read_till_end_)
, sort_settings(max_block_size_)
{
sort_settings.max_block_size = max_block_size_;
@ -175,7 +177,8 @@ void SortingStep::mergingSorted(QueryPipelineBuilder & pipeline, const SortDescr
result_sort_desc,
sort_settings.max_block_size,
SortingQueueStrategy::Batch,
limit_);
limit_,
always_read_till_end);
pipeline.addTransform(std::move(transform));
}
@ -262,7 +265,13 @@ void SortingStep::fullSort(
if (pipeline.getNumStreams() > 1)
{
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(), pipeline.getNumStreams(), result_sort_desc, sort_settings.max_block_size, SortingQueueStrategy::Batch, limit_);
pipeline.getHeader(),
pipeline.getNumStreams(),
result_sort_desc,
sort_settings.max_block_size,
SortingQueueStrategy::Batch,
limit_,
always_read_till_end);
pipeline.addTransform(std::move(transform));
}

View File

@ -53,7 +53,9 @@ public:
const DataStream & input_stream,
SortDescription sort_description_,
size_t max_block_size_,
UInt64 limit_ = 0);
UInt64 limit_ = 0,
bool always_read_till_end_ = false
);
String getName() const override { return "Sorting"; }
@ -100,6 +102,7 @@ private:
SortDescription prefix_description;
const SortDescription result_description;
UInt64 limit;
bool always_read_till_end = false;
Settings sort_settings;

View File

@ -107,6 +107,11 @@ void MySQLWithFailoverSource::onStart()
throw;
}
}
catch (const mysqlxx::BadQuery & e)
{
LOG_ERROR(log, "Error processing query '{}': {}", query_str, e.displayText());
throw;
}
}
initPositionMappingFromQueryResultStructure();

View File

@ -106,8 +106,13 @@ std::optional<Chunk> RemoteSource::tryGenerate()
/// Get rows_before_limit result for remote query from ProfileInfo packet.
query_executor->setProfileInfoCallback([this](const ProfileInfo & info)
{
if (rows_before_limit && info.hasAppliedLimit())
rows_before_limit->set(info.getRowsBeforeLimit());
if (rows_before_limit)
{
if (info.hasAppliedLimit())
rows_before_limit->add(info.getRowsBeforeLimit());
else
manually_add_rows_before_limit_counter = true; /// Remote subquery doesn't contain a limit
}
});
query_executor->sendQuery();
@ -146,11 +151,15 @@ std::optional<Chunk> RemoteSource::tryGenerate()
if (!block)
{
if (manually_add_rows_before_limit_counter)
rows_before_limit->add(rows);
query_executor->finish(&read_context);
return {};
}
UInt64 num_rows = block.rows();
rows += num_rows;
Chunk chunk(block.getColumns(), num_rows);
if (add_aggregation_info)

View File

@ -3,7 +3,7 @@
#include <Processors/ISource.h>
#include <Processors/RowsBeforeLimitCounter.h>
#include <QueryPipeline/Pipe.h>
#include "Core/UUID.h"
#include <Core/UUID.h>
#include <atomic>
namespace DB
@ -29,7 +29,7 @@ public:
void connectToScheduler(InputPort & input_port);
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { rows_before_limit.swap(counter); }
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) override { rows_before_limit.swap(counter); }
UUID getParallelReplicasGroupUUID();
@ -58,6 +58,8 @@ private:
std::unique_ptr<RemoteQueryExecutorReadContext> read_context;
UUID uuid;
int fd = -1;
size_t rows = 0;
bool manually_add_rows_before_limit_counter = false;
};
/// Totals source from RemoteQueryExecutor.

View File

@ -126,7 +126,7 @@ ColumnGathererTransform::ColumnGathererTransform(
ReadBuffer & row_sources_buf_,
size_t block_preferred_size_)
: IMergingTransform<ColumnGathererStream>(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
num_inputs, row_sources_buf_, block_preferred_size_)
, log(&Poco::Logger::get("ColumnGathererStream"))
{

Some files were not shown because too many files have changed in this diff.