Merge branch 'master' of github.com:ClickHouse/ClickHouse into dynamic-data-type

avogar 2024-05-16 22:03:59 +00:00
commit 4b178b5d35
137 changed files with 2455 additions and 956 deletions

View File

@ -89,7 +89,7 @@ PenaltyBreakFirstLessLess: 120
PenaltyBreakString: 1000
PenaltyExcessCharacter: 1000000
PenaltyReturnTypeOnItsOwnLine: 60
RemoveBracesLLVM: true
RemoveBracesLLVM: false
SpaceAfterCStyleCast: false
SpaceBeforeAssignmentOperators: true
SpaceBeforeParens: ControlStatements

View File

@ -138,6 +138,8 @@ Checks: [
# This is a good check, but clang-tidy crashes, see https://github.com/llvm/llvm-project/issues/91872
'-modernize-use-constraints',
# https://github.com/abseil/abseil-cpp/issues/1667
'-clang-analyzer-optin.core.EnumCastOutOfRange'
]
WarningsAsErrors: '*'

contrib/libunwind vendored

@ -1 +1 @@
Subproject commit 40d8eadf96b127d9b22d53ce7a4fc52aaedea965
Subproject commit ba533a7246a2686b0552061809612f503804d26b

View File

@ -83,7 +83,7 @@ setup_minio() {
./mc alias set clickminio http://localhost:11111 clickhouse clickhouse
./mc admin user add clickminio test testtest
./mc admin policy set clickminio readwrite user=test
./mc mb clickminio/test
./mc mb --ignore-existing clickminio/test
if [ "$test_type" = "stateless" ]; then
./mc policy set public clickminio/test
fi

View File

@ -67,6 +67,8 @@ generates merged configuration file:
</clickhouse>
```
### Using from_env and from_zk
To specify that the value of an element should be replaced by the value of an environment variable, you can use the `from_env` attribute.
Example with `$MAX_QUERY_SIZE = 150000`:
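The XML example itself lies outside this hunk; a minimal sketch of such a configuration, assuming the same `max_query_size` profile setting used in the default-value example further below:
``` xml
<clickhouse>
    <profiles>
        <default>
            <!-- The value is taken from the MAX_QUERY_SIZE environment variable -->
            <max_query_size from_env="MAX_QUERY_SIZE"/>
        </default>
    </profiles>
</clickhouse>
```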
@ -93,6 +95,59 @@ which is equal to
</clickhouse>
```
The same is possible using `from_zk`:
``` xml
<clickhouse>
<postgresql_port from_zk="/zk_configs/postgresql_port"/>
</clickhouse>
```
```
# clickhouse-keeper-client
/ :) touch /zk_configs
/ :) create /zk_configs/postgresql_port "9005"
/ :) get /zk_configs/postgresql_port
9005
```
which is equal to
``` xml
<clickhouse>
<postgresql_port>9005</postgresql_port>
</clickhouse>
```
#### Default values for from_env and from_zk attributes
With `replace="1"`, it's possible to provide a default value that is overridden only when the environment variable or ZooKeeper node is set; otherwise the default is used.
Taking the previous example, but with `MAX_QUERY_SIZE` unset:
``` xml
<clickhouse>
<profiles>
<default>
<max_query_size from_env="MAX_QUERY_SIZE" replace="1">150000</max_query_size>
</default>
</profiles>
</clickhouse>
```
will take the default value
``` xml
<clickhouse>
<profiles>
<default>
<max_query_size>150000</max_query_size>
</default>
</profiles>
</clickhouse>
```
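The heading above covers `from_zk` as well; a hypothetical analogous configuration for the ZooKeeper case, reusing the `postgresql_port` node from the earlier example:
``` xml
<clickhouse>
    <!-- 9005 is used unless the /zk_configs/postgresql_port node exists in ZooKeeper -->
    <postgresql_port from_zk="/zk_configs/postgresql_port" replace="1">9005</postgresql_port>
</clickhouse>
```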
## Substituting Configuration {#substitution}
The config can define substitutions. There are two types of substitutions:
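The list of substitution types is cut off at this hunk boundary; as a hedged illustration, one of them is file-based substitution via the `incl` attribute together with an `include_from` path (the file name and substitution name below are examples, not taken from this diff):
``` xml
<clickhouse>
    <include_from>/etc/metrika.xml</include_from>
    <!-- The element body is filled from the clickhouse_remote_servers substitution in that file -->
    <remote_servers incl="clickhouse_remote_servers"/>
</clickhouse>
```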

View File

@ -32,20 +32,21 @@ WHERE name LIKE '%thread_pool%'
```
``` text
┌─name────────────────────────────────────────┬─value─┬─default─┬─changed─┬─description─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬─type───┬─changeable_without_restart─┬─is_obsolete─┐
│ max_thread_pool_size │ 10000 │ 10000 │ 0 │ The maximum number of threads that could be allocated from the OS and used for query execution and background operations. │ UInt64 │ No │ 0 │
│ max_thread_pool_free_size │ 1000 │ 1000 │ 0 │ The maximum number of threads that will always stay in a global thread pool once allocated and remain idle in case of insufficient number of tasks. │ UInt64 │ No │ 0 │
│ thread_pool_queue_size │ 10000 │ 10000 │ 0 │ The maximum number of tasks that will be placed in a queue and wait for execution. │ UInt64 │ No │ 0 │
│ max_io_thread_pool_size │ 100 │ 100 │ 0 │ The maximum number of threads that would be used for IO operations │ UInt64 │ No │ 0 │
│ max_io_thread_pool_free_size │ 0 │ 0 │ 0 │ Max free size for IO thread pool. │ UInt64 │ No │ 0 │
│ io_thread_pool_queue_size │ 10000 │ 10000 │ 0 │ Queue size for IO thread pool. │ UInt64 │ No │ 0 │
│ max_active_parts_loading_thread_pool_size │ 64 │ 64 │ 0 │ The number of threads to load active set of data parts (Active ones) at startup. │ UInt64 │ No │ 0 │
│ max_outdated_parts_loading_thread_pool_size │ 32 │ 32 │ 0 │ The number of threads to load inactive set of data parts (Outdated ones) at startup. │ UInt64 │ No │ 0 │
│ max_parts_cleaning_thread_pool_size │ 128 │ 128 │ 0 │ The number of threads for concurrent removal of inactive data parts. │ UInt64 │ No │ 0 │
│ max_backups_io_thread_pool_size │ 1000 │ 1000 │ 0 │ The maximum number of threads that would be used for IO operations for BACKUP queries │ UInt64 │ No │ 0 │
│ max_backups_io_thread_pool_free_size │ 0 │ 0 │ 0 │ Max free size for backups IO thread pool. │ UInt64 │ No │ 0 │
│ backups_io_thread_pool_queue_size │ 0 │ 0 │ 0 │ Queue size for backups IO thread pool. │ UInt64 │ No │ 0 │
└─────────────────────────────────────────────┴───────┴─────────┴─────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴────────┴────────────────────────────┴─────────────┘
┌─name──────────────────────────────────────────┬─value─┬─default─┬─changed─┬─description─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬─type───┬─changeable_without_restart─┬─is_obsolete─┐
│ max_thread_pool_size │ 10000 │ 10000 │ 0 │ The maximum number of threads that could be allocated from the OS and used for query execution and background operations. │ UInt64 │ No │ 0 │
│ max_thread_pool_free_size │ 1000 │ 1000 │ 0 │ The maximum number of threads that will always stay in a global thread pool once allocated and remain idle in case of insufficient number of tasks. │ UInt64 │ No │ 0 │
│ thread_pool_queue_size │ 10000 │ 10000 │ 0 │ The maximum number of tasks that will be placed in a queue and wait for execution. │ UInt64 │ No │ 0 │
│ max_io_thread_pool_size │ 100 │ 100 │ 0 │ The maximum number of threads that would be used for IO operations │ UInt64 │ No │ 0 │
│ max_io_thread_pool_free_size │ 0 │ 0 │ 0 │ Max free size for IO thread pool. │ UInt64 │ No │ 0 │
│ io_thread_pool_queue_size │ 10000 │ 10000 │ 0 │ Queue size for IO thread pool. │ UInt64 │ No │ 0 │
│ max_active_parts_loading_thread_pool_size │ 64 │ 64 │ 0 │ The number of threads to load active set of data parts (Active ones) at startup. │ UInt64 │ No │ 0 │
│ max_outdated_parts_loading_thread_pool_size │ 32 │ 32 │ 0 │ The number of threads to load inactive set of data parts (Outdated ones) at startup. │ UInt64 │ No │ 0 │
│ max_unexpected_parts_loading_thread_pool_size │ 32 │ 32 │ 0 │ The number of threads to load inactive set of data parts (Unexpected ones) at startup. │ UInt64 │ No │ 0 │
│ max_parts_cleaning_thread_pool_size │ 128 │ 128 │ 0 │ The number of threads for concurrent removal of inactive data parts. │ UInt64 │ No │ 0 │
│ max_backups_io_thread_pool_size │ 1000 │ 1000 │ 0 │ The maximum number of threads that would be used for IO operations for BACKUP queries │ UInt64 │ No │ 0 │
│ max_backups_io_thread_pool_free_size │ 0 │ 0 │ 0 │ Max free size for backups IO thread pool. │ UInt64 │ No │ 0 │
│ backups_io_thread_pool_queue_size │ 0 │ 0 │ 0 │ Queue size for backups IO thread pool. │ UInt64 │ No │ 0 │
└───────────────────────────────────────────────┴───────┴─────────┴─────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴────────┴────────────────────────────┴─────────────┘
```

View File

@ -152,7 +152,7 @@ Configuration example:
**Syntax**
``` sql
cutToFirstSignificantSubdomain(URL, TLD)
cutToFirstSignificantSubdomainCustom(URL, TLD)
```
**Arguments**

View File

@ -248,6 +248,25 @@ FROM s3(
LIMIT 5;
```
## Working with archives
Suppose we have several archive files with the following URIs on S3:
- 'https://s3-us-west-1.amazonaws.com/umbrella-static/top-1m-2018-01-10.csv.zip'
- 'https://s3-us-west-1.amazonaws.com/umbrella-static/top-1m-2018-01-11.csv.zip'
- 'https://s3-us-west-1.amazonaws.com/umbrella-static/top-1m-2018-01-12.csv.zip'
Data can be extracted from these archives using `::`. Globs can be used both in the URL part and in the part after `::` (responsible for the name of a file inside the archive).
``` sql
SELECT *
FROM s3(
'https://s3-us-west-1.amazonaws.com/umbrella-static/top-1m-2018-01-1{0..2}.csv.zip :: *.csv'
);
```
## Virtual Columns {#virtual-columns}
- `_path` — Path to the file. Type: `LowCardinality(String)`.

View File

@ -8,6 +8,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int KEEPER_EXCEPTION;
}
bool LSCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String path;
@ -216,6 +221,8 @@ bool FindSuperNodes::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> &
node->args.push_back(threshold->as<ASTLiteral &>().value);
ParserToken{TokenType::Whitespace}.ignore(pos);
String path;
if (!parseKeeperPath(pos, expected, path))
path = ".";
@ -230,19 +237,23 @@ void FindSuperNodes::execute(const ASTKeeperQuery * query, KeeperClient * client
auto path = client->getAbsolutePath(query->args[1].safeGet<String>());
Coordination::Stat stat;
client->zookeeper->get(path, &stat);
if (!client->zookeeper->exists(path, &stat))
return; /// It is ok if node was deleted meanwhile
if (stat.numChildren >= static_cast<Int32>(threshold))
{
std::cout << static_cast<String>(path) << "\t" << stat.numChildren << "\n";
return;
}
auto children = client->zookeeper->getChildren(path);
Strings children;
auto status = client->zookeeper->tryGetChildren(path, children);
if (status == Coordination::Error::ZNONODE)
return; /// It is ok if node was deleted meanwhile
else if (status != Coordination::Error::ZOK)
throw DB::Exception(DB::ErrorCodes::KEEPER_EXCEPTION, "Error {} while getting children of {}", status, path.string());
std::sort(children.begin(), children.end());
auto next_query = *query;
for (const auto & child : children)
{
auto next_query = *query;
next_query.args[1] = DB::Field(path / child);
execute(&next_query, client);
}
@ -310,31 +321,34 @@ bool FindBigFamily::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> &
return true;
}
/// DFS the subtree and return the number of nodes in the subtree
static Int64 traverse(const fs::path & path, KeeperClient * client, std::vector<std::tuple<Int64, String>> & result)
{
Int64 nodes_in_subtree = 1;
Strings children;
auto status = client->zookeeper->tryGetChildren(path, children);
if (status == Coordination::Error::ZNONODE)
return 0;
else if (status != Coordination::Error::ZOK)
throw DB::Exception(DB::ErrorCodes::KEEPER_EXCEPTION, "Error {} while getting children of {}", status, path.string());
for (auto & child : children)
nodes_in_subtree += traverse(path / child, client, result);
result.emplace_back(nodes_in_subtree, path.string());
return nodes_in_subtree;
}
void FindBigFamily::execute(const ASTKeeperQuery * query, KeeperClient * client) const
{
auto path = client->getAbsolutePath(query->args[0].safeGet<String>());
auto n = query->args[1].safeGet<UInt64>();
std::vector<std::tuple<Int32, String>> result;
std::vector<std::tuple<Int64, String>> result;
std::queue<fs::path> queue;
queue.push(path);
while (!queue.empty())
{
auto next_path = queue.front();
queue.pop();
auto children = client->zookeeper->getChildren(next_path);
for (auto & child : children)
child = next_path / child;
auto response = client->zookeeper->get(children);
for (size_t i = 0; i < response.size(); ++i)
{
result.emplace_back(response[i].stat.numChildren, children[i]);
queue.push(children[i]);
}
}
traverse(path, client, result);
std::sort(result.begin(), result.end(), std::greater());
for (UInt64 i = 0; i < std::min(result.size(), static_cast<size_t>(n)); ++i)

View File

@ -160,6 +160,14 @@ void LocalServer::initialize(Poco::Util::Application & self)
getOutdatedPartsLoadingThreadPool().setMaxTurboThreads(active_parts_loading_threads);
const size_t unexpected_parts_loading_threads = config().getUInt("max_unexpected_parts_loading_thread_pool_size", 32);
getUnexpectedPartsLoadingThreadPool().initialize(
unexpected_parts_loading_threads,
0, // We don't need any threads once all the parts are loaded
unexpected_parts_loading_threads);
getUnexpectedPartsLoadingThreadPool().setMaxTurboThreads(active_parts_loading_threads);
const size_t cleanup_threads = config().getUInt("max_parts_cleaning_thread_pool_size", 128);
getPartsCleaningThreadPool().initialize(
cleanup_threads,

View File

@ -885,6 +885,16 @@ try
server_settings.max_active_parts_loading_thread_pool_size
);
getUnexpectedPartsLoadingThreadPool().initialize(
server_settings.max_unexpected_parts_loading_thread_pool_size,
0, // We don't need any threads once all the parts are loaded
server_settings.max_unexpected_parts_loading_thread_pool_size);
/// It could grow if we need to synchronously wait until all the data parts are loaded.
getUnexpectedPartsLoadingThreadPool().setMaxTurboThreads(
server_settings.max_active_parts_loading_thread_pool_size
);
getPartsCleaningThreadPool().initialize(
server_settings.max_parts_cleaning_thread_pool_size,
0, // We don't need any threads once all the parts are deleted

View File

@ -769,6 +769,7 @@ struct IdentifierResolveScope
/// Nodes with duplicated aliases
std::unordered_set<QueryTreeNodePtr> nodes_with_duplicated_aliases;
std::vector<QueryTreeNodePtr> cloned_nodes_with_duplicated_aliases;
/// Current scope expression in resolve process stack
ExpressionsStack expressions_in_resolve_process_stack;
@ -1031,6 +1032,14 @@ public:
return true;
}
private:
void addDuplicatingAlias(const QueryTreeNodePtr & node)
{
scope.nodes_with_duplicated_aliases.emplace(node);
auto cloned_node = node->clone();
scope.cloned_nodes_with_duplicated_aliases.emplace_back(cloned_node);
scope.nodes_with_duplicated_aliases.emplace(cloned_node);
}
void updateAliasesIfNeeded(const QueryTreeNodePtr & node, bool is_lambda_node)
{
if (!node->hasAlias())
@ -1045,21 +1054,21 @@ private:
if (is_lambda_node)
{
if (scope.alias_name_to_expression_node->contains(alias))
scope.nodes_with_duplicated_aliases.insert(node);
addDuplicatingAlias(node);
auto [_, inserted] = scope.alias_name_to_lambda_node.insert(std::make_pair(alias, node));
if (!inserted)
scope.nodes_with_duplicated_aliases.insert(node);
addDuplicatingAlias(node);
return;
}
if (scope.alias_name_to_lambda_node.contains(alias))
scope.nodes_with_duplicated_aliases.insert(node);
addDuplicatingAlias(node);
auto [_, inserted] = scope.alias_name_to_expression_node->insert(std::make_pair(alias, node));
if (!inserted)
scope.nodes_with_duplicated_aliases.insert(node);
addDuplicatingAlias(node);
/// If node is identifier put it also in scope alias name to lambda node map
if (node->getNodeType() == QueryTreeNodeType::IDENTIFIER)
@ -6254,6 +6263,10 @@ ProjectionNames QueryAnalyzer::resolveExpressionNode(QueryTreeNodePtr & node, Id
result_projection_names.push_back(node_alias);
}
bool is_duplicated_alias = scope.nodes_with_duplicated_aliases.contains(node);
if (is_duplicated_alias)
scope.non_cached_identifier_lookups_during_expression_resolve.insert({Identifier{node_alias}, IdentifierLookupContext::EXPRESSION});
/** Do not use alias table if node has alias same as some other node.
* Example: WITH x -> x + 1 AS lambda SELECT 1 AS lambda;
* During 1 AS lambda resolve if we use alias table we replace node with x -> x + 1 AS lambda.
@ -6264,7 +6277,7 @@ ProjectionNames QueryAnalyzer::resolveExpressionNode(QueryTreeNodePtr & node, Id
* alias table because in alias table subquery could be evaluated as scalar.
*/
bool use_alias_table = true;
if (scope.nodes_with_duplicated_aliases.contains(node) || (allow_table_expression && isSubqueryNodeType(node->getNodeType())))
if (is_duplicated_alias || (allow_table_expression && isSubqueryNodeType(node->getNodeType())))
use_alias_table = false;
if (!node_alias.empty() && use_alias_table)
@ -6568,6 +6581,9 @@ ProjectionNames QueryAnalyzer::resolveExpressionNode(QueryTreeNodePtr & node, Id
}
}
if (is_duplicated_alias)
scope.non_cached_identifier_lookups_during_expression_resolve.erase({Identifier{node_alias}, IdentifierLookupContext::EXPRESSION});
resolved_expressions.emplace(node, result_projection_names);
scope.popExpressionNode();
@ -6600,7 +6616,6 @@ ProjectionNames QueryAnalyzer::resolveExpressionNodeList(QueryTreeNodePtr & node
{
auto node_to_resolve = node;
auto expression_node_projection_names = resolveExpressionNode(node_to_resolve, scope, allow_lambda_expression, allow_table_expression);
size_t expected_projection_names_size = 1;
if (auto * expression_list = node_to_resolve->as<ListNode>())
{
@ -8208,10 +8223,13 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
* After scope nodes are resolved, we can compare node with duplicate alias with
* node from scope alias table.
*/
for (const auto & node_with_duplicated_alias : scope.nodes_with_duplicated_aliases)
for (const auto & node_with_duplicated_alias : scope.cloned_nodes_with_duplicated_aliases)
{
auto node = node_with_duplicated_alias;
auto node_alias = node->getAlias();
/// Add current alias to non cached set, because in case of cyclic alias identifier should not be substituted from cache.
/// See 02896_cyclic_aliases_crash.
resolveExpressionNode(node, scope, true /*allow_lambda_expression*/, false /*allow_table_expression*/);
bool has_node_in_alias_table = false;

View File

@ -1242,8 +1242,9 @@ void QueryFuzzer::fuzz(ASTPtr & ast)
}
else if (auto * explain_query = typeid_cast<ASTExplainQuery *>(ast.get()))
{
const auto & explained_query = explain_query->getExplainedQuery();
/// Fuzzing EXPLAIN query to SELECT query randomly
if (fuzz_rand() % 20 == 0 && explain_query->getExplainedQuery()->getQueryKind() == IAST::QueryKind::Select)
if (explained_query && explained_query->getQueryKind() == IAST::QueryKind::Select && fuzz_rand() % 20 == 0)
{
auto select_query = explain_query->getExplainedQuery()->clone();
fuzz(select_query);

View File

@ -28,7 +28,7 @@ namespace ErrorCodes
extern const int USER_SESSION_LIMIT_EXCEEDED;
}
static String getLoadSuggestionQuery(Int32 suggestion_limit, bool basic_suggestion)
static String getLoadSuggestionQuery(Int32 suggestion_limit, bool basic_suggestion, UInt64 server_revision)
{
/// NOTE: Once you update the completion list,
/// do not forget to update 01676_clickhouse_client_autocomplete.sh
@ -60,7 +60,9 @@ static String getLoadSuggestionQuery(Int32 suggestion_limit, bool basic_suggesti
add_column("name", "data_type_families", false, {});
add_column("name", "merge_tree_settings", false, {});
add_column("name", "settings", false, {});
add_column("keyword", "keywords", false, {});
if (server_revision >= DBMS_MIN_REVISION_WITH_SYSTEM_KEYWORDS_TABLE)
add_column("keyword", "keywords", false, {});
if (!basic_suggestion)
{
@ -101,7 +103,11 @@ void Suggest::load(ContextPtr context, const ConnectionParameters & connection_p
auto connection = ConnectionType::createConnection(connection_parameters, my_context);
fetch(*connection,
connection_parameters.timeouts,
getLoadSuggestionQuery(suggestion_limit, std::is_same_v<ConnectionType, LocalConnection>),
getLoadSuggestionQuery(
suggestion_limit,
std::is_same_v<ConnectionType, LocalConnection>,
connection->getServerRevision(connection_parameters.timeouts)
),
my_context->getClientInfo());
}
catch (const Exception & e)
@ -146,7 +152,7 @@ void Suggest::load(IServerConnection & connection,
{
try
{
fetch(connection, timeouts, getLoadSuggestionQuery(suggestion_limit, true), client_info);
fetch(connection, timeouts, getLoadSuggestionQuery(suggestion_limit, true, connection.getServerRevision(timeouts)), client_info);
}
catch (...)
{

View File

@ -177,6 +177,9 @@
M(MergeTreeOutdatedPartsLoaderThreads, "Number of threads in the threadpool for loading Outdated data parts.") \
M(MergeTreeOutdatedPartsLoaderThreadsActive, "Number of active threads in the threadpool for loading Outdated data parts.") \
M(MergeTreeOutdatedPartsLoaderThreadsScheduled, "Number of queued or active jobs in the threadpool for loading Outdated data parts.") \
M(MergeTreeUnexpectedPartsLoaderThreads, "Number of threads in the threadpool for loading Unexpected data parts.") \
M(MergeTreeUnexpectedPartsLoaderThreadsActive, "Number of active threads in the threadpool for loading Unexpected data parts.") \
M(MergeTreeUnexpectedPartsLoaderThreadsScheduled, "Number of queued or active jobs in the threadpool for loading Unexpected data parts.") \
M(MergeTreePartsCleanerThreads, "Number of threads in the MergeTree parts cleaner thread pool.") \
M(MergeTreePartsCleanerThreadsActive, "Number of threads in the MergeTree parts cleaner thread pool running a task.") \
M(MergeTreePartsCleanerThreadsScheduled, "Number of queued or active jobs in the MergeTree parts cleaner thread pool.") \

View File

@ -5,7 +5,6 @@
#include <Common/OpenTelemetryTraceContext.h>
#include <Common/noexcept_scope.h>
#include <cassert>
#include <type_traits>
#include <Poco/Util/Application.h>
@ -437,6 +436,11 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
/// We don't run jobs after `shutdown` is set, but we have to properly dequeue all jobs and finish them.
if (shutdown)
{
{
ALLOW_ALLOCATIONS_IN_SCOPE;
/// job can contain packaged_task which can set exception during destruction
job_data.reset();
}
job_is_done = true;
continue;
}

View File

@ -79,6 +79,8 @@ static constexpr auto DBMS_MIN_REVISION_WITH_SSH_AUTHENTICATION = 54466;
/// Send read-only flag for Replicated tables as well
static constexpr auto DBMS_MIN_REVISION_WITH_TABLE_READ_ONLY_CHECK = 54467;
static constexpr auto DBMS_MIN_REVISION_WITH_SYSTEM_KEYWORDS_TABLE = 54468;
/// Version of ClickHouse TCP protocol.
///
/// Should be incremented manually on protocol changes.
@ -86,6 +88,6 @@ static constexpr auto DBMS_MIN_REVISION_WITH_TABLE_READ_ONLY_CHECK = 54467;
/// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing common with VERSION_REVISION,
/// later is just a number for server version (one number instead of commit SHA)
/// for simplicity (sometimes it may be more convenient in some use cases).
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54467;
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54468;
}

View File

@ -25,6 +25,7 @@ namespace DB
M(UInt64, io_thread_pool_queue_size, 10000, "Queue size for IO thread pool.", 0) \
M(UInt64, max_active_parts_loading_thread_pool_size, 64, "The number of threads to load active set of data parts (Active ones) at startup.", 0) \
M(UInt64, max_outdated_parts_loading_thread_pool_size, 32, "The number of threads to load inactive set of data parts (Outdated ones) at startup.", 0) \
M(UInt64, max_unexpected_parts_loading_thread_pool_size, 8, "The number of threads to load inactive set of data parts (Unexpected ones) at startup.", 0) \
M(UInt64, max_parts_cleaning_thread_pool_size, 128, "The number of threads for concurrent removal of inactive data parts.", 0) \
M(UInt64, max_mutations_bandwidth_for_server, 0, "The maximum read speed of all mutations on server in bytes per second. Zero means unlimited.", 0) \
M(UInt64, max_merges_bandwidth_for_server, 0, "The maximum read speed of all merges on server in bytes per second. Zero means unlimited.", 0) \
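Like the neighbouring thread-pool limits in this macro list, the new setting can be overridden in the server configuration file; a hedged sketch with an illustrative value:
``` xml
<clickhouse>
    <!-- Illustrative override of the default shown above -->
    <max_unexpected_parts_loading_thread_pool_size>32</max_unexpected_parts_loading_thread_pool_size>
</clickhouse>
```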

View File

@ -799,8 +799,8 @@ class IColumn;
M(UInt64, filesystem_cache_max_download_size, (128UL * 1024 * 1024 * 1024), "Max remote filesystem cache size that can be downloaded by a single query", 0) \
M(Bool, throw_on_error_from_cache_on_write_operations, false, "Ignore error from cache when caching on write operations (INSERT, merges)", 0) \
M(UInt64, filesystem_cache_segments_batch_size, 20, "Limit on size of a single batch of file segments that a read buffer can request from cache. Too low value will lead to excessive requests to cache, too large may slow down eviction from cache", 0) \
M(UInt64, filesystem_cache_reserve_space_wait_lock_timeout_milliseconds, 1000, "Wait time to lock cache for sapce reservation in filesystem cache", 0) \
M(UInt64, temporary_data_in_cache_reserve_space_wait_lock_timeout_milliseconds, (10 * 60 * 1000), "Wait time to lock cache for sapce reservation for temporary data in filesystem cache", 0) \
M(UInt64, filesystem_cache_reserve_space_wait_lock_timeout_milliseconds, 1000, "Wait time to lock cache for space reservation in filesystem cache", 0) \
M(UInt64, temporary_data_in_cache_reserve_space_wait_lock_timeout_milliseconds, (10 * 60 * 1000), "Wait time to lock cache for space reservation for temporary data in filesystem cache", 0) \
\
M(Bool, use_page_cache_for_disks_without_file_cache, false, "Use userspace page cache for remote disks that don't have filesystem cache enabled.", 0) \
M(Bool, read_from_page_cache_if_exists_otherwise_bypass_cache, false, "Use userspace page cache in passive mode, similar to read_from_filesystem_cache_if_exists_otherwise_bypass_cache.", 0) \

View File

@ -139,7 +139,11 @@ namespace
S3::URI getS3URI(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const ContextPtr & context)
{
String endpoint = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
S3::URI uri(endpoint);
String endpoint_subpath;
if (config.has(config_prefix + ".endpoint_subpath"))
endpoint_subpath = context->getMacros()->expand(config.getString(config_prefix + ".endpoint_subpath"));
S3::URI uri(fs::path(endpoint) / endpoint_subpath);
/// An empty key remains empty.
if (!uri.key.empty() && !uri.key.ends_with('/'))
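For context, a hedged sketch of where such a key could appear in a disk definition; the disk name, endpoint, and the `{replica}` macro are illustrative assumptions, only the `endpoint` and `endpoint_subpath` keys come from this hunk:
``` xml
<clickhouse>
    <storage_configuration>
        <disks>
            <s3_example_disk>
                <type>s3</type>
                <endpoint>https://s3.example.com/my-bucket/</endpoint>
                <!-- Appended to the endpoint after macro expansion, e.g. a per-replica subdirectory -->
                <endpoint_subpath>{replica}</endpoint_subpath>
            </s3_example_disk>
        </disks>
    </storage_configuration>
</clickhouse>
```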

View File

@ -7,6 +7,7 @@
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnMap.h>
#include <Columns/ColumnNothing.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnObject.h>
#include <Columns/ColumnString.h>
@ -3791,6 +3792,12 @@ private:
}
else if (const auto * from_array = typeid_cast<const DataTypeArray *>(from_type_untyped.get()))
{
if (typeid_cast<const DataTypeNothing *>(from_array->getNestedType().get()))
return [nested = to_type->getNestedType()](ColumnsWithTypeAndName &, const DataTypePtr &, const ColumnNullable *, size_t size)
{
return ColumnMap::create(nested->createColumnConstWithDefaultValue(size)->convertToFullColumnIfConst());
};
const auto * nested_tuple = typeid_cast<const DataTypeTuple *>(from_array->getNestedType().get());
if (!nested_tuple || nested_tuple->getElements().size() != 2)
throw Exception(

View File

@ -73,8 +73,9 @@ bool ParallelReadBuffer::addReaderToPool()
auto worker = read_workers.emplace_back(std::make_shared<ReadWorker>(input, range_start, size));
++active_working_readers;
schedule([this, my_worker = std::move(worker)]() mutable { readerThreadFunction(std::move(my_worker)); }, Priority{});
/// increase number of workers only after we are sure that the reader was scheduled
++active_working_readers;
return true;
}

View File

@ -1,8 +1,7 @@
#include <IO/S3/URI.h>
#include <Poco/URI.h>
#include "Common/Macros.h"
#include <Interpreters/Context.h>
#include <Storages/NamedCollectionsHelpers.h>
#include "Common/Macros.h"
#if USE_AWS_S3
#include <Common/Exception.h>
#include <Common/quoteString.h>
@ -55,7 +54,11 @@ URI::URI(const std::string & uri_)
static constexpr auto OSS = "OSS";
static constexpr auto EOS = "EOS";
uri = Poco::URI(uri_);
if (containsArchive(uri_))
std::tie(uri_str, archive_pattern) = getPathToArchiveAndArchivePattern(uri_);
else
uri_str = uri_;
uri = Poco::URI(uri_str);
std::unordered_map<std::string, std::string> mapper;
auto context = Context::getGlobalContextInstance();
@ -126,9 +129,10 @@ URI::URI(const std::string & uri_)
boost::to_upper(name);
/// For S3Express it will look like s3express-eun1-az1, i.e. contain region and AZ info
if (name != S3 && !name.starts_with(S3EXPRESS) && name != COS && name != OBS && name != OSS && name != EOS)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Object storage system name is unrecognized in virtual hosted style S3 URI: {}",
quoteString(name));
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Object storage system name is unrecognized in virtual hosted style S3 URI: {}",
quoteString(name));
if (name == COS)
storage_name = COSN;
@ -156,10 +160,40 @@ void URI::validateBucket(const String & bucket, const Poco::URI & uri)
/// S3 specification requires at least 3 and at most 63 characters in bucket name.
/// https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-s3-bucket-naming-requirements.html
if (bucket.length() < 3 || bucket.length() > 63)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bucket name length is out of bounds in virtual hosted style S3 URI: {}{}",
quoteString(bucket), !uri.empty() ? " (" + uri.toString() + ")" : "");
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Bucket name length is out of bounds in virtual hosted style S3 URI: {}{}",
quoteString(bucket),
!uri.empty() ? " (" + uri.toString() + ")" : "");
}
bool URI::containsArchive(const std::string & source)
{
size_t pos = source.find("::");
return (pos != std::string::npos);
}
std::pair<std::string, std::string> URI::getPathToArchiveAndArchivePattern(const std::string & source)
{
size_t pos = source.find("::");
assert(pos != std::string::npos);
std::string path_to_archive = source.substr(0, pos);
while ((!path_to_archive.empty()) && path_to_archive.ends_with(' '))
path_to_archive.pop_back();
if (path_to_archive.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Path to archive is empty");
std::string_view path_in_archive_view = std::string_view{source}.substr(pos + 2);
while (!path_in_archive_view.empty() && path_in_archive_view.front() == ' ')
path_in_archive_view.remove_prefix(1);
if (path_in_archive_view.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Filename is empty");
return {path_to_archive, std::string{path_in_archive_view}};
}
}
}

View File

@ -1,5 +1,6 @@
#pragma once
#include <optional>
#include <string>
#include "config.h"
@ -28,6 +29,8 @@ struct URI
std::string key;
std::string version_id;
std::string storage_name;
std::optional<std::string> archive_pattern;
std::string uri_str;
bool is_virtual_hosted_style;
@ -36,6 +39,10 @@ struct URI
void addRegionToURI(const std::string & region);
static void validateBucket(const std::string & bucket, const Poco::URI & uri);
private:
bool containsArchive(const std::string & source);
std::pair<std::string, std::string> getPathToArchiveAndArchivePattern(const std::string & source);
};
}

View File

@ -20,6 +20,9 @@ namespace CurrentMetrics
extern const Metric MergeTreeOutdatedPartsLoaderThreads;
extern const Metric MergeTreeOutdatedPartsLoaderThreadsActive;
extern const Metric MergeTreeOutdatedPartsLoaderThreadsScheduled;
extern const Metric MergeTreeUnexpectedPartsLoaderThreads;
extern const Metric MergeTreeUnexpectedPartsLoaderThreadsActive;
extern const Metric MergeTreeUnexpectedPartsLoaderThreadsScheduled;
extern const Metric DatabaseReplicatedCreateTablesThreads;
extern const Metric DatabaseReplicatedCreateTablesThreadsActive;
extern const Metric DatabaseReplicatedCreateTablesThreadsScheduled;
@ -151,6 +154,12 @@ StaticThreadPool & getOutdatedPartsLoadingThreadPool()
return instance;
}
StaticThreadPool & getUnexpectedPartsLoadingThreadPool()
{
static StaticThreadPool instance("MergeTreeUnexpectedPartsLoaderThreadPool", CurrentMetrics::MergeTreeUnexpectedPartsLoaderThreads, CurrentMetrics::MergeTreeUnexpectedPartsLoaderThreadsActive, CurrentMetrics::MergeTreeUnexpectedPartsLoaderThreadsScheduled);
return instance;
}
StaticThreadPool & getDatabaseReplicatedCreateTablesThreadPool()
{
static StaticThreadPool instance("CreateTablesThreadPool", CurrentMetrics::DatabaseReplicatedCreateTablesThreads, CurrentMetrics::DatabaseReplicatedCreateTablesThreadsActive, CurrentMetrics::DatabaseReplicatedCreateTablesThreadsScheduled);

View File

@ -64,6 +64,8 @@ StaticThreadPool & getPartsCleaningThreadPool();
/// the number of threads by calling enableTurboMode() :-)
StaticThreadPool & getOutdatedPartsLoadingThreadPool();
StaticThreadPool & getUnexpectedPartsLoadingThreadPool();
/// ThreadPool used for creating tables in DatabaseReplicated.
StaticThreadPool & getDatabaseReplicatedCreateTablesThreadPool();

View File

@ -21,6 +21,9 @@
#include <base/sort.h>
#include <Common/JSONBuilder.h>
#include <absl/container/flat_hash_map.h>
#include <absl/container/inlined_vector.h>
namespace DB
{
@ -708,16 +711,18 @@ static ColumnWithTypeAndName executeActionForPartialResult(const ActionsDAG::Nod
return res_column;
}
Block ActionsDAG::updateHeader(Block header) const
Block ActionsDAG::updateHeader(const Block & header) const
{
IntermediateExecutionResult node_to_column;
std::set<size_t> pos_to_remove;
{
std::unordered_map<std::string_view, std::list<size_t>> input_positions;
using inline_vector = absl::InlinedVector<size_t, 7>; // 64B, holding max 7 size_t elements inlined
absl::flat_hash_map<std::string_view, inline_vector> input_positions;
for (size_t pos = 0; pos < inputs.size(); ++pos)
input_positions[inputs[pos]->result_name].emplace_back(pos);
/// We insert from last to first in the inlinedVector so it's easier to pop_back matches later
for (size_t pos = inputs.size(); pos != 0; pos--)
input_positions[inputs[pos - 1]->result_name].emplace_back(pos - 1);
for (size_t pos = 0; pos < header.columns(); ++pos)
{
@ -725,10 +730,11 @@ Block ActionsDAG::updateHeader(Block header) const
auto it = input_positions.find(col.name);
if (it != input_positions.end() && !it->second.empty())
{
auto & list = it->second;
pos_to_remove.insert(pos);
node_to_column[inputs[list.front()]] = col;
list.pop_front();
auto & v = it->second;
node_to_column[inputs[v.back()]] = col;
v.pop_back();
}
}
}
@ -746,18 +752,21 @@ Block ActionsDAG::updateHeader(Block header) const
throw;
}
if (isInputProjected())
header.clear();
else
header.erase(pos_to_remove);
Block res;
res.reserve(result_columns.size());
for (auto & col : result_columns)
res.insert(std::move(col));
for (auto && item : header)
res.insert(std::move(item));
if (isInputProjected())
return res;
res.reserve(header.columns() - pos_to_remove.size());
for (size_t i = 0; i < header.columns(); i++)
{
if (!pos_to_remove.contains(i))
res.insert(header.data[i]);
}
return res;
}

View File

@ -272,7 +272,7 @@ public:
///
/// In addition, check that result constants are constants according to DAG.
/// In case if function return constant, but arguments are not constant, materialize it.
Block updateHeader(Block header) const;
Block updateHeader(const Block & header) const;
using IntermediateExecutionResult = std::unordered_map<const Node *, ColumnWithTypeAndName>;
static ColumnsWithTypeAndName evaluatePartialResult(

View File

@ -667,11 +667,7 @@ namespace
using TimePoint = std::chrono::time_point<std::chrono::system_clock>;
void appendElementsToLogSafe(
AsynchronousInsertLog & log,
std::vector<AsynchronousInsertLogElement> elements,
TimePoint flush_time,
const String & flush_query_id,
const String & flush_exception)
AsynchronousInsertLog & log, std::vector<AsynchronousInsertLogElement> elements, TimePoint flush_time, const String & flush_exception)
try
{
using Status = AsynchronousInsertLogElement::Status;
@ -680,7 +676,6 @@ try
{
elem.flush_time = timeInSeconds(flush_time);
elem.flush_time_microseconds = timeInMicroseconds(flush_time);
elem.flush_query_id = flush_query_id;
elem.exception = flush_exception;
elem.status = flush_exception.empty() ? Status::Ok : Status::FlushError;
log.add(std::move(elem));
@ -808,12 +803,12 @@ try
throw;
}
auto add_entry_to_log = [&](const auto & entry,
const auto & entry_query_for_logging,
const auto & exception,
size_t num_rows,
size_t num_bytes,
Milliseconds timeout_ms)
auto add_entry_to_asynchronous_insert_log = [&](const auto & entry,
const auto & entry_query_for_logging,
const auto & exception,
size_t num_rows,
size_t num_bytes,
Milliseconds timeout_ms)
{
if (!async_insert_log)
return;
@ -831,6 +826,7 @@ try
elem.exception = exception;
elem.data_kind = entry->chunk.getDataKind();
elem.timeout_milliseconds = timeout_ms.count();
elem.flush_query_id = insert_query_id;
/// If there was a parsing error,
/// the entry won't be flushed anyway,
@ -857,7 +853,7 @@ try
if (!log_elements.empty())
{
auto flush_time = std::chrono::system_clock::now();
appendElementsToLogSafe(*async_insert_log, std::move(log_elements), flush_time, insert_query_id, "");
appendElementsToLogSafe(*async_insert_log, std::move(log_elements), flush_time, "");
}
};
@ -865,15 +861,27 @@ try
auto header = pipeline.getHeader();
if (key.data_kind == DataKind::Parsed)
chunk = processEntriesWithParsing(key, data, header, insert_context, log, add_entry_to_log);
chunk = processEntriesWithParsing(key, data, header, insert_context, log, add_entry_to_asynchronous_insert_log);
else
chunk = processPreprocessedEntries(key, data, header, insert_context, add_entry_to_log);
chunk = processPreprocessedEntries(key, data, header, insert_context, add_entry_to_asynchronous_insert_log);
ProfileEvents::increment(ProfileEvents::AsyncInsertRows, chunk.getNumRows());
auto log_and_add_finish_to_query_log = [&](size_t num_rows, size_t num_bytes)
{
LOG_DEBUG(log, "Flushed {} rows, {} bytes for query '{}'", num_rows, num_bytes, key.query_str);
queue_shard_flush_time_history.updateWithCurrentTime();
bool pulling_pipeline = false;
logQueryFinish(
query_log_elem, insert_context, key.query, pipeline, pulling_pipeline, query_span, QueryCache::Usage::None, internal);
};
if (chunk.getNumRows() == 0)
{
finish_entries();
log_and_add_finish_to_query_log(0, 0);
return;
}
@ -888,12 +896,7 @@ try
CompletedPipelineExecutor completed_executor(pipeline);
completed_executor.execute();
LOG_INFO(log, "Flushed {} rows, {} bytes for query '{}'", num_rows, num_bytes, key.query_str);
queue_shard_flush_time_history.updateWithCurrentTime();
bool pulling_pipeline = false;
logQueryFinish(query_log_elem, insert_context, key.query, pipeline, pulling_pipeline, query_span, QueryCache::Usage::None, internal);
log_and_add_finish_to_query_log(num_rows, num_bytes);
}
catch (...)
{
@ -903,7 +906,7 @@ try
{
auto exception = getCurrentExceptionMessage(false);
auto flush_time = std::chrono::system_clock::now();
appendElementsToLogSafe(*async_insert_log, std::move(log_elements), flush_time, insert_query_id, exception);
appendElementsToLogSafe(*async_insert_log, std::move(log_elements), flush_time, exception);
}
throw;
}

View File

@ -4467,7 +4467,7 @@ void Context::setApplicationType(ApplicationType type)
/// Lock isn't required, you should set it at start
shared->application_type = type;
if (type == ApplicationType::LOCAL || type == ApplicationType::SERVER)
if (type == ApplicationType::LOCAL || type == ApplicationType::SERVER || type == ApplicationType::DISKS)
shared->server_settings.loadSettingsFromConfig(Poco::Util::Application::instance().config());
if (type == ApplicationType::SERVER)

View File

@ -2487,10 +2487,15 @@ HashJoin::~HashJoin()
{
if (!data)
{
LOG_TRACE(log, "{}Join data has been already released", instance_log_id);
LOG_TEST(log, "{}Join data has been already released", instance_log_id);
return;
}
LOG_TRACE(log, "{}Join data is being destroyed, {} bytes and {} rows in hash table", instance_log_id, getTotalByteCount(), getTotalRowCount());
LOG_TEST(
log,
"{}Join data is being destroyed, {} bytes and {} rows in hash table",
instance_log_id,
getTotalByteCount(),
getTotalRowCount());
}
template <typename Mapped>

View File

@ -505,7 +505,7 @@ ASTPtr InterpreterCreateQuery::formatProjections(const ProjectionsDescription &
}
ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
const ASTExpressionList & columns_ast, ContextPtr context_, LoadingStrictnessLevel mode)
const ASTExpressionList & columns_ast, ContextPtr context_, LoadingStrictnessLevel mode, bool is_restore_from_backup)
{
/// First, deduce implicit types.
@ -514,7 +514,7 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
ASTPtr default_expr_list = std::make_shared<ASTExpressionList>();
NamesAndTypesList column_names_and_types;
bool make_columns_nullable = mode <= LoadingStrictnessLevel::CREATE && context_->getSettingsRef().data_type_default_nullable;
bool make_columns_nullable = mode <= LoadingStrictnessLevel::SECONDARY_CREATE && !is_restore_from_backup && context_->getSettingsRef().data_type_default_nullable;
bool has_columns_with_default_without_type = false;
for (const auto & ast : columns_ast.children)
@ -694,7 +694,7 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
res.add(std::move(column));
}
if (mode <= LoadingStrictnessLevel::CREATE && context_->getSettingsRef().flatten_nested)
if (mode <= LoadingStrictnessLevel::SECONDARY_CREATE && !is_restore_from_backup && context_->getSettingsRef().flatten_nested)
res.flattenNested();
@ -739,7 +739,7 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti
if (create.columns_list->columns)
{
properties.columns = getColumnsDescription(*create.columns_list->columns, getContext(), mode);
properties.columns = getColumnsDescription(*create.columns_list->columns, getContext(), mode, is_restore_from_backup);
}
if (create.columns_list->indices)

View File

@ -74,7 +74,7 @@ public:
/// Obtain information about columns, their types, default values and column comments,
/// for case when columns in CREATE query is specified explicitly.
static ColumnsDescription getColumnsDescription(const ASTExpressionList & columns, ContextPtr context, LoadingStrictnessLevel mode);
static ColumnsDescription getColumnsDescription(const ASTExpressionList & columns, ContextPtr context, LoadingStrictnessLevel mode, bool is_restore_from_backup = false);
static ConstraintsDescription getConstraintsDescription(const ASTExpressionList * constraints);
static void prepareOnClusterQuery(ASTCreateQuery & create, ContextPtr context, const String & cluster_name);

View File

@ -700,8 +700,10 @@ void MergeJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
/// We need to check type of masks before `addConditionJoinColumn`, because it assumes that types is correct
JoinCommon::checkTypesOfMasks(block, mask_column_name_left, right_sample_block, mask_column_name_right);
/// Add auxiliary column, will be removed after joining
addConditionJoinColumn(block, JoinTableSide::Left);
if (!not_processed)
/// Add an auxiliary column, which will be removed after joining
/// We do not need to add it twice when we are continuing to process the block from the previous iteration
addConditionJoinColumn(block, JoinTableSide::Left);
/// Types of keys can be checked only after `checkTypesOfKeys`
JoinCommon::checkTypesOfKeys(block, key_names_left, right_table_keys, key_names_right);

View File

@ -808,12 +808,14 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
bool is_create_parameterized_view = false;
if (const auto * create_query = ast->as<ASTCreateQuery>())
{
is_create_parameterized_view = create_query->isParameterizedView();
}
else if (const auto * explain_query = ast->as<ASTExplainQuery>())
{
assert(!explain_query->children.empty());
if (const auto * create_of_explain_query = explain_query->children[0]->as<ASTCreateQuery>())
is_create_parameterized_view = create_of_explain_query->isParameterizedView();
if (!explain_query->children.empty())
if (const auto * create_of_explain_query = explain_query->children[0]->as<ASTCreateQuery>())
is_create_parameterized_view = create_of_explain_query->isParameterizedView();
}
/// Replace ASTQueryParameter with ASTLiteral for prepared statements.

View File

@ -107,6 +107,9 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (!columns_p.parse(pos, columns, expected))
return false;
/// Optional trailing comma
ParserToken(TokenType::Comma).ignore(pos);
if (!s_rparen.ignore(pos, expected))
return false;
}

View File

@ -4,20 +4,6 @@
namespace DB
{
Tokens::Tokens(const char * begin, const char * end, size_t max_query_size, bool skip_insignificant)
{
Lexer lexer(begin, end, max_query_size);
bool stop = false;
do
{
Token token = lexer.nextToken();
stop = token.isEnd() || token.type == TokenType::ErrorMaxQuerySizeExceeded;
if (token.isSignificant() || (!skip_insignificant && !data.empty() && data.back().isSignificant()))
data.emplace_back(std::move(token));
} while (!stop);
}
UnmatchedParentheses checkUnmatchedParentheses(TokenIterator begin)
{
/// We have just two kind of parentheses: () and [].

View File

@ -15,25 +15,44 @@ namespace DB
*/
/** Used as an input for parsers.
* All whitespace and comment tokens are transparently skipped.
* All whitespace and comment tokens are transparently skipped if `skip_insignificant`.
*/
class Tokens
{
private:
std::vector<Token> data;
std::size_t last_accessed_index = 0;
Lexer lexer;
bool skip_insignificant;
public:
Tokens(const char * begin, const char * end, size_t max_query_size = 0, bool skip_insignificant = true);
ALWAYS_INLINE inline const Token & operator[](size_t index)
Tokens(const char * begin, const char * end, size_t max_query_size = 0, bool skip_insignificant_ = true)
: lexer(begin, end, max_query_size), skip_insignificant(skip_insignificant_)
{
assert(index < data.size());
last_accessed_index = std::max(last_accessed_index, index);
return data[index];
}
ALWAYS_INLINE inline const Token & max() { return data[last_accessed_index]; }
const Token & operator[] (size_t index)
{
while (true)
{
if (index < data.size())
return data[index];
if (!data.empty() && data.back().isEnd())
return data.back();
Token token = lexer.nextToken();
if (!skip_insignificant || token.isSignificant())
data.emplace_back(token);
}
}
const Token & max()
{
if (data.empty())
return (*this)[0];
return data.back();
}
};

View File

@ -1229,8 +1229,9 @@ void Planner::buildQueryPlanIfNeeded()
if (query_plan.isInitialized())
return;
LOG_TRACE(getLogger("Planner"), "Query {} to stage {}{}",
query_tree->formatConvertedASTForErrorMessage(),
LOG_TRACE(
getLogger("Planner"),
"Query to stage {}{}",
QueryProcessingStage::toString(select_query_options.to_stage),
select_query_options.only_analyze ? " only analyze" : "");
@ -1506,8 +1507,9 @@ void Planner::buildPlanForQueryNode()
auto & mapping = join_tree_query_plan.query_node_to_plan_step_mapping;
query_node_to_plan_step_mapping.insert(mapping.begin(), mapping.end());
LOG_TRACE(getLogger("Planner"), "Query {} from stage {} to stage {}{}",
query_tree->formatConvertedASTForErrorMessage(),
LOG_TRACE(
getLogger("Planner"),
"Query from stage {} to stage {}{}",
QueryProcessingStage::toString(from_stage),
QueryProcessingStage::toString(select_query_options.to_stage),
select_query_options.only_analyze ? " only analyze" : "");

View File

@ -1,24 +1,25 @@
#include <Planner/findQueryForParallelReplicas.h>
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Processors/QueryPlan/JoinStep.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Storages/buildQueryTreeForShard.h>
#include <Interpreters/ClusterProxy/executeQuery.h>
#include <Planner/PlannerJoinTree.h>
#include <Planner/Utils.h>
#include <Analyzer/ArrayJoinNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/JoinNode.h>
#include <Analyzer/QueryNode.h>
#include <Analyzer/TableNode.h>
#include <Analyzer/UnionNode.h>
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
#include <Interpreters/ClusterProxy/executeQuery.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/queryToString.h>
#include <Planner/PlannerJoinTree.h>
#include <Planner/Utils.h>
#include <Planner/findQueryForParallelReplicas.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/JoinStep.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/StorageDummy.h>
#include <Storages/StorageMaterializedView.h>
#include <Storages/buildQueryTreeForShard.h>
namespace DB
{
@ -316,7 +317,8 @@ static const TableNode * findTableForParallelReplicas(const IQueryTreeNode * que
case QueryTreeNodeType::TABLE:
{
const auto & table_node = query_tree_node->as<TableNode &>();
const auto & storage = table_node.getStorage();
const auto * as_mat_view = typeid_cast<const StorageMaterializedView *>(table_node.getStorage().get());
const auto & storage = as_mat_view ? as_mat_view->getTargetTable() : table_node.getStorage();
if (std::dynamic_pointer_cast<MergeTreeData>(storage) || typeid_cast<const StorageDummy *>(storage.get()))
return &table_node;

View File

@ -13,10 +13,14 @@ namespace ErrorCodes
{
extern const int INCORRECT_DATA;
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
}
RegexpFieldExtractor::RegexpFieldExtractor(const FormatSettings & format_settings) : regexp(format_settings.regexp.regexp), skip_unmatched(format_settings.regexp.skip_unmatched)
RegexpFieldExtractor::RegexpFieldExtractor(const FormatSettings & format_settings) : regexp_str(format_settings.regexp.regexp), regexp(regexp_str), skip_unmatched(format_settings.regexp.skip_unmatched)
{
if (regexp_str.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "The regular expression is not set for the `Regexp` format. It requires setting the value of the `format_regexp` setting.");
size_t fields_count = regexp.NumberOfCapturingGroups();
matched_fields.resize(fields_count);
re2_arguments.resize(fields_count);
@ -58,8 +62,8 @@ bool RegexpFieldExtractor::parseRow(PeekableReadBuffer & buf)
static_cast<int>(re2_arguments_ptrs.size()));
if (!match && !skip_unmatched)
throw Exception(ErrorCodes::INCORRECT_DATA, "Line \"{}\" doesn't match the regexp.",
std::string(buf.position(), line_to_match));
throw Exception(ErrorCodes::INCORRECT_DATA, "Line \"{}\" doesn't match the regexp: `{}`",
std::string(buf.position(), line_to_match), regexp_str);
buf.position() += line_size;
if (!buf.eof() && !checkChar('\n', buf))

View File

@ -31,6 +31,7 @@ public:
size_t getNumberOfGroups() const { return regexp.NumberOfCapturingGroups(); }
private:
String regexp_str;
const re2::RE2 regexp;
// The vector of fields extracted from line using regexp.
std::vector<std::string_view> matched_fields;

View File

@ -572,9 +572,16 @@ bool ValuesBlockInputFormat::checkDelimiterAfterValue(size_t column_idx)
skipWhitespaceIfAny(*buf);
if (likely(column_idx + 1 != num_columns))
{
return checkChar(',', *buf);
}
else
{
/// Optional trailing comma.
if (checkChar(',', *buf))
skipWhitespaceIfAny(*buf);
return checkChar(')', *buf);
}
}
bool ValuesBlockInputFormat::shouldDeduceNewTemplate(size_t column_idx)

View File

@ -262,10 +262,6 @@ static size_t tryPushDownOverJoinStep(QueryPlan::Node * parent_node, QueryPlan::
{
const auto & left_table_key_name = join_clause.key_names_left[i];
const auto & right_table_key_name = join_clause.key_names_right[i];
if (!join_header.has(left_table_key_name) || !join_header.has(right_table_key_name))
continue;
const auto & left_table_column = left_stream_input_header.getByName(left_table_key_name);
const auto & right_table_column = right_stream_input_header.getByName(right_table_key_name);
@ -338,9 +334,9 @@ static size_t tryPushDownOverJoinStep(QueryPlan::Node * parent_node, QueryPlan::
auto join_filter_push_down_actions = filter->getExpression()->splitActionsForJOINFilterPushDown(filter->getFilterColumnName(),
filter->removesFilterColumn(),
left_stream_available_columns_to_push_down,
left_stream_input_header.getColumnsWithTypeAndName(),
left_stream_input_header,
right_stream_available_columns_to_push_down,
right_stream_input_header.getColumnsWithTypeAndName(),
right_stream_input_header,
equivalent_columns_to_push_down,
equivalent_left_stream_column_to_right_stream_column,
equivalent_right_stream_column_to_left_stream_column);

View File

@ -64,36 +64,61 @@ namespace
return non_const_columns;
}
/// build actions DAG from stack of steps
ActionsDAGPtr buildActionsForPlanPath(std::vector<ActionsDAGPtr> & dag_stack)
{
if (dag_stack.empty())
return nullptr;
ActionsDAGPtr path_actions = dag_stack.back()->clone();
dag_stack.pop_back();
while (!dag_stack.empty())
{
ActionsDAGPtr clone = dag_stack.back()->clone();
logActionsDAG("DAG to merge", clone);
dag_stack.pop_back();
path_actions->mergeInplace(std::move(*clone));
}
return path_actions;
}
bool compareAggregationKeysWithDistinctColumns(
const Names & aggregation_keys, const DistinctColumns & distinct_columns, const ActionsDAGPtr & path_actions)
const Names & aggregation_keys, const DistinctColumns & distinct_columns, std::vector<std::vector<ActionsDAGPtr>> actions_chain)
{
logDebug("aggregation_keys", aggregation_keys);
logDebug("aggregation_keys size", aggregation_keys.size());
logDebug("distinct_columns size", distinct_columns.size());
std::set<std::string_view> original_distinct_columns;
FindOriginalNodeForOutputName original_node_finder(path_actions);
for (const auto & column : distinct_columns)
std::set<String> current_columns(begin(distinct_columns), end(distinct_columns));
std::set<String> source_columns;
for (auto & actions : actions_chain)
{
logDebug("distinct column name", column);
const auto * alias_node = original_node_finder.find(String(column));
if (!alias_node)
FindOriginalNodeForOutputName original_node_finder(buildActionsForPlanPath(actions));
for (const auto & column : current_columns)
{
logDebug("original name for alias is not found", column);
original_distinct_columns.insert(column);
}
else
{
logDebug("alias result name", alias_node->result_name);
original_distinct_columns.insert(alias_node->result_name);
logDebug("distinct column name", column);
const auto * alias_node = original_node_finder.find(String(column));
if (!alias_node)
{
logDebug("original name for alias is not found", column);
source_columns.insert(String(column));
}
else
{
logDebug("alias result name", alias_node->result_name);
source_columns.insert(alias_node->result_name);
}
}
current_columns = std::move(source_columns);
source_columns.clear();
}
/// if aggregation keys are part of distinct columns then rows already distinct
for (const auto & key : aggregation_keys)
{
if (!original_distinct_columns.contains(key))
if (!current_columns.contains(key))
{
logDebug("aggregation key NOT found: {}", key);
logDebug("aggregation key NOT found", key);
return false;
}
}
@ -122,30 +147,13 @@ namespace
return false;
}
/// build actions DAG from stack of steps
ActionsDAGPtr buildActionsForPlanPath(std::vector<ActionsDAGPtr> & dag_stack)
{
if (dag_stack.empty())
return nullptr;
ActionsDAGPtr path_actions = dag_stack.back()->clone();
dag_stack.pop_back();
while (!dag_stack.empty())
{
ActionsDAGPtr clone = dag_stack.back()->clone();
logActionsDAG("DAG to merge", clone);
dag_stack.pop_back();
path_actions->mergeInplace(std::move(*clone));
}
return path_actions;
}
bool passTillAggregation(const QueryPlan::Node * distinct_node)
{
const DistinctStep * distinct_step = typeid_cast<DistinctStep *>(distinct_node->step.get());
chassert(distinct_step);
std::vector<ActionsDAGPtr> dag_stack;
std::vector<std::vector<ActionsDAGPtr>> actions_chain;
const DistinctStep * inner_distinct_step = nullptr;
const IQueryPlanStep * aggregation_before_distinct = nullptr;
const QueryPlan::Node * node = distinct_node;
@ -163,6 +171,12 @@ namespace
break;
}
if (typeid_cast<const WindowStep *>(current_step))
{
actions_chain.push_back(std::move(dag_stack));
dag_stack.clear();
}
if (const auto * const expr = typeid_cast<const ExpressionStep *>(current_step); expr)
dag_stack.push_back(expr->getExpression());
else if (const auto * const filter = typeid_cast<const FilterStep *>(current_step); filter)
@ -177,16 +191,22 @@ namespace
if (aggregation_before_distinct)
{
ActionsDAGPtr actions = buildActionsForPlanPath(dag_stack);
logActionsDAG("aggregation pass: merged DAG", actions);
if (actions_chain.empty())
actions_chain.push_back(std::move(dag_stack));
const auto distinct_columns = getDistinctColumns(distinct_step);
if (const auto * aggregating_step = typeid_cast<const AggregatingStep *>(aggregation_before_distinct); aggregating_step)
return compareAggregationKeysWithDistinctColumns(aggregating_step->getParams().keys, distinct_columns, actions);
{
return compareAggregationKeysWithDistinctColumns(
aggregating_step->getParams().keys, distinct_columns, std::move(actions_chain));
}
else if (const auto * merging_aggregated_step = typeid_cast<const MergingAggregatedStep *>(aggregation_before_distinct);
merging_aggregated_step)
return compareAggregationKeysWithDistinctColumns(merging_aggregated_step->getParams().keys, distinct_columns, actions);
{
return compareAggregationKeysWithDistinctColumns(
merging_aggregated_step->getParams().keys, distinct_columns, std::move(actions_chain));
}
}
return false;

View File

@ -128,15 +128,21 @@ class IndexAccess
public:
explicit IndexAccess(const RangesInDataParts & parts_) : parts(parts_)
{
/// Some suffix of index columns might not be loaded (see `primary_key_ratio_of_unique_prefix_values_to_skip_suffix_columns`)
/// and we need to use the same set of index columns across all parts.
/// Indices might be reloaded during the process and the reload might produce a different value
/// (change in `primary_key_ratio_of_unique_prefix_values_to_skip_suffix_columns`). Also, some suffix of index
/// columns might not be loaded (same setting) so we keep a reference to the current indices and
/// track the minimal subset of loaded columns across all parts.
indices.reserve(parts.size());
for (const auto & part : parts)
loaded_columns = std::min(loaded_columns, part.data_part->getIndex()->size());
indices.push_back(part.data_part->getIndex());
for (const auto & index : indices)
loaded_columns = std::min(loaded_columns, index->size());
}
Values getValue(size_t part_idx, size_t mark) const
{
const auto & index = parts[part_idx].data_part->getIndex();
const auto & index = indices[part_idx];
chassert(index->size() >= loaded_columns);
Values values(loaded_columns);
for (size_t i = 0; i < loaded_columns; ++i)
@ -206,6 +212,7 @@ public:
}
private:
const RangesInDataParts & parts;
std::vector<IMergeTreeDataPart::Index> indices;
size_t loaded_columns = std::numeric_limits<size_t>::max();
};
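The comment explains why `IndexAccess` snapshots each part's index once and then works only with the minimal number of index columns loaded across all parts. A minimal sketch of that snapshot pattern, with hypothetical stand-in types instead of the real MergeTree classes:

``` cpp
#include <algorithm>
#include <limits>
#include <memory>
#include <vector>

// Hypothetical stand-ins: a "column" is a vector of ints here,
// and an index is an immutable set of columns shared between readers.
using Column = std::vector<int>;
using Index = std::shared_ptr<const std::vector<Column>>;

struct Part
{
    Index index; // may be replaced (reloaded) concurrently by the owner
    Index getIndex() const { return index; }
};

class IndexAccess
{
public:
    explicit IndexAccess(const std::vector<Part> & parts)
    {
        // Snapshot the index of every part once, so a later reload cannot
        // change what this accessor sees, then track the minimal number of
        // loaded index columns across all parts.
        indices.reserve(parts.size());
        for (const auto & part : parts)
            indices.push_back(part.getIndex());
        for (const auto & index : indices)
            loaded_columns = std::min(loaded_columns, index->size());
    }

    int getValue(size_t part_idx, size_t column, size_t mark) const
    {
        // Always read through the snapshot, never through the part itself.
        return (*indices[part_idx])[column][mark];
    }

private:
    std::vector<Index> indices;
    size_t loaded_columns = std::numeric_limits<size_t>::max();
};

int main()
{
    std::vector<Part> parts(1);
    parts[0].index = std::make_shared<const std::vector<Column>>(std::vector<Column>{{1, 2, 3}});
    IndexAccess access(parts);
    return access.getValue(0, 0, 2) == 3 ? 0 : 1;
}
```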

View File

@ -21,7 +21,7 @@ Block SourceStepWithFilter::applyPrewhereActions(Block block, const PrewhereInfo
{
if (prewhere_info->row_level_filter)
{
block = prewhere_info->row_level_filter->updateHeader(std::move(block));
block = prewhere_info->row_level_filter->updateHeader(block);
auto & row_level_column = block.getByName(prewhere_info->row_level_column_name);
if (!row_level_column.type->canBeUsedInBooleanContext())
{
@ -36,7 +36,7 @@ Block SourceStepWithFilter::applyPrewhereActions(Block block, const PrewhereInfo
if (prewhere_info->prewhere_actions)
{
block = prewhere_info->prewhere_actions->updateHeader(std::move(block));
block = prewhere_info->prewhere_actions->updateHeader(block);
auto & prewhere_column = block.getByName(prewhere_info->prewhere_column_name);
if (!prewhere_column.type->canBeUsedInBooleanContext())

View File

@ -3,9 +3,9 @@
namespace DB
{
Block ExpressionTransform::transformHeader(Block header, const ActionsDAG & expression)
Block ExpressionTransform::transformHeader(const Block & header, const ActionsDAG & expression)
{
return expression.updateHeader(std::move(header));
return expression.updateHeader(header);
}

View File

@ -24,7 +24,7 @@ public:
String getName() const override { return "ExpressionTransform"; }
static Block transformHeader(Block header, const ActionsDAG & expression);
static Block transformHeader(const Block & header, const ActionsDAG & expression);
protected:
void transform(Chunk & chunk) override;

View File

@ -174,26 +174,22 @@ static std::unique_ptr<IFilterDescription> combineFilterAndIndices(
}
Block FilterTransform::transformHeader(
Block header,
const ActionsDAG * expression,
const String & filter_column_name,
bool remove_filter_column)
const Block & header, const ActionsDAG * expression, const String & filter_column_name, bool remove_filter_column)
{
if (expression)
header = expression->updateHeader(std::move(header));
Block result = expression ? expression->updateHeader(header) : header;
auto filter_type = header.getByName(filter_column_name).type;
auto filter_type = result.getByName(filter_column_name).type;
if (!filter_type->onlyNull() && !isUInt8(removeNullable(removeLowCardinality(filter_type))))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER,
"Illegal type {} of column {} for filter. Must be UInt8 or Nullable(UInt8).",
filter_type->getName(), filter_column_name);
if (remove_filter_column)
header.erase(filter_column_name);
result.erase(filter_column_name);
else
replaceFilterToConstant(header, filter_column_name);
replaceFilterToConstant(result, filter_column_name);
return header;
return result;
}
FilterTransform::FilterTransform(

View File

@ -22,11 +22,8 @@ public:
const Block & header_, ExpressionActionsPtr expression_, String filter_column_name_,
bool remove_filter_column_, bool on_totals_ = false, std::shared_ptr<std::atomic<size_t>> rows_filtered_ = nullptr);
static Block transformHeader(
Block header,
const ActionsDAG * expression,
const String & filter_column_name,
bool remove_filter_column);
static Block
transformHeader(const Block & header, const ActionsDAG * expression, const String & filter_column_name, bool remove_filter_column);
String getName() const override { return "FilterTransform"; }

View File

@ -14,12 +14,12 @@ namespace ErrorCodes
Block JoiningTransform::transformHeader(Block header, const JoinPtr & join)
{
LOG_DEBUG(getLogger("JoiningTransform"), "Before join block: '{}'", header.dumpStructure());
LOG_TEST(getLogger("JoiningTransform"), "Before join block: '{}'", header.dumpStructure());
join->checkTypesOfKeys(header);
join->initialize(header);
ExtraBlockPtr tmp;
join->joinBlock(header, tmp);
LOG_DEBUG(getLogger("JoiningTransform"), "After join block: '{}'", header.dumpStructure());
LOG_TEST(getLogger("JoiningTransform"), "After join block: '{}'", header.dumpStructure());
return header;
}

View File

@ -338,8 +338,6 @@ static void prepareChunk(Chunk & chunk)
void MergeJoinAlgorithm::initialize(Inputs inputs)
{
LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{}: {} - '{}'", __FILE__, __LINE__, 0, inputs[0].chunk.dumpStructure());
LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{}: {} - '{}'", __FILE__, __LINE__, 1, inputs[1].chunk.dumpStructure());
if (inputs.size() != 2)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Two inputs are required, got {}", inputs.size());
@ -351,8 +349,6 @@ void MergeJoinAlgorithm::initialize(Inputs inputs)
void MergeJoinAlgorithm::consume(Input & input, size_t source_num)
{
LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{}: {} - '{}'", __FILE__, __LINE__, source_num, input.chunk.dumpStructure());
if (input.skip_last_row)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "skip_last_row is not supported");
@ -816,15 +812,9 @@ IMergingAlgorithm::Status MergeJoinAlgorithm::merge()
if (!cursors[1]->cursor.isValid() && !cursors[1]->fullyCompleted())
return Status(1);
for (size_t i = 0; i < 2; ++i)
{
LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{}: sampleColumns {} '{}'", __FILE__, __LINE__, i, cursors[i]->sampleBlock().dumpStructure());
}
if (auto result = handleAllJoinState())
{
LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{}: '{}'", __FILE__, __LINE__, result ? result->chunk.dumpStructure() : "NA");
return std::move(*result);
}

View File

@ -49,7 +49,7 @@ Block TotalsHavingTransform::transformHeader(
if (expression)
{
block = expression->updateHeader(std::move(block));
block = expression->updateHeader(block);
if (remove_filter)
block.erase(filter_column_name);
}

View File

@ -17,6 +17,7 @@
#include <boost/algorithm/string/case_conv.hpp>
#include <parquet/arrow/reader.h>
#include <ranges>
#include <filesystem>
namespace fs = std::filesystem;

View File

@ -54,7 +54,7 @@ public:
{
std::lock_guard lock(configuration_update_mutex);
updateConfigurationImpl(local_context);
return Storage::getConfiguration();
return Storage::getConfigurationCopy();
}
void updateConfiguration(const ContextPtr & local_context) override
@ -106,7 +106,7 @@ private:
const bool updated = base_configuration.update(local_context);
auto new_keys = getDataFiles(base_configuration, local_context);
if (!updated && new_keys == Storage::getConfiguration().keys)
if (!updated && new_keys == Storage::getConfigurationCopy().keys)
return;
Storage::useConfiguration(getConfigurationForDataRead(base_configuration, local_context, new_keys));

View File

@ -31,16 +31,17 @@
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <filesystem>
namespace DB
{
namespace ErrorCodes
{
extern const int FILE_DOESNT_EXIST;
extern const int ILLEGAL_COLUMN;
extern const int BAD_ARGUMENTS;
extern const int UNSUPPORTED_METHOD;
extern const int FILE_DOESNT_EXIST;
extern const int ILLEGAL_COLUMN;
extern const int BAD_ARGUMENTS;
extern const int UNSUPPORTED_METHOD;
}
IcebergMetadata::IcebergMetadata(

View File

@ -63,7 +63,7 @@ public:
{
std::lock_guard lock(configuration_update_mutex);
updateConfigurationImpl(local_context);
return StorageS3::getConfiguration();
return StorageS3::getConfigurationCopy();
}
void updateConfiguration(const ContextPtr & local_context) override

View File

@ -10,6 +10,7 @@
#include <aws/s3/S3Client.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <filesystem>
namespace DB
{

View File

@ -346,16 +346,25 @@ IMergeTreeDataPart::Index IMergeTreeDataPart::getIndex() const
if (!index_loaded)
loadIndex();
index_loaded = true;
return TSA_SUPPRESS_WARNING_FOR_READ(index); /// The variable is guaranteed to be unchanged after return.
return index;
}
void IMergeTreeDataPart::setIndex(Index index_)
void IMergeTreeDataPart::setIndex(const Columns & cols_)
{
std::scoped_lock lock(index_mutex);
if (!index->empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "The index of data part can be set only once");
index = index_;
index = std::make_shared<const Columns>(cols_);
index_loaded = true;
}
void IMergeTreeDataPart::setIndex(Columns && cols_)
{
std::scoped_lock lock(index_mutex);
if (!index->empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "The index of data part can be set only once");
index = std::make_shared<const Columns>(std::move(cols_));
index_loaded = true;
}
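These `setIndex` overloads publish the primary-key index as an immutable `std::shared_ptr<const Columns>`, so readers can keep their own reference while the owner sets the index exactly once under a mutex. A minimal sketch of that pattern with simplified stand-in types (not the real `IMergeTreeDataPart`):

``` cpp
#include <memory>
#include <mutex>
#include <stdexcept>
#include <vector>

using Columns = std::vector<std::vector<int>>; // stand-in for DB::Columns
using Index = std::shared_ptr<const Columns>;

class PartLike
{
public:
    Index getIndex() const
    {
        std::scoped_lock lock(index_mutex);
        return index; // readers share the immutable snapshot
    }

    void setIndex(const Columns & cols) // copy overload
    {
        std::scoped_lock lock(index_mutex);
        if (!index->empty())
            throw std::logic_error("The index of data part can be set only once");
        index = std::make_shared<const Columns>(cols);
    }

    void setIndex(Columns && cols) // move overload avoids copying the columns
    {
        std::scoped_lock lock(index_mutex);
        if (!index->empty())
            throw std::logic_error("The index of data part can be set only once");
        index = std::make_shared<const Columns>(std::move(cols));
    }

private:
    mutable std::mutex index_mutex;
    Index index = std::make_shared<const Columns>();
};

int main()
{
    PartLike part;
    part.setIndex(Columns{{1, 2, 3}});
    return part.getIndex()->size() == 1 ? 0 : 1;
}
```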
@ -913,7 +922,7 @@ void IMergeTreeDataPart::loadIndex() const
if (!index_file->eof())
throw Exception(ErrorCodes::EXPECTED_END_OF_FILE, "Index file {} is unexpectedly long", index_path);
index->assign(std::make_move_iterator(loaded_index.begin()), std::make_move_iterator(loaded_index.end()));
index = std::make_shared<Columns>(std::make_move_iterator(loaded_index.begin()), std::make_move_iterator(loaded_index.end()));
}
}
@ -1260,6 +1269,33 @@ void IMergeTreeDataPart::appendFilesOfChecksums(Strings & files)
files.push_back("checksums.txt");
}
void IMergeTreeDataPart::loadRowsCountFileForUnexpectedPart()
{
auto read_rows_count = [&]()
{
auto buf = metadata_manager->read("count.txt");
readIntText(rows_count, *buf);
assertEOF(*buf);
};
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING || part_type == Type::Compact || parent_part)
{
if (metadata_manager->exists("count.txt"))
{
read_rows_count();
return;
}
}
else
{
if (getDataPartStorage().exists("count.txt"))
{
read_rows_count();
return;
}
}
throw Exception(ErrorCodes::NO_FILE_IN_DATA_PART, "No count.txt in part {}", name);
}
void IMergeTreeDataPart::loadRowsCount()
{
auto read_rows_count = [&]()

View File

@ -79,7 +79,7 @@ public:
using ColumnSizeByName = std::unordered_map<std::string, ColumnSize>;
using NameToNumber = std::unordered_map<std::string, size_t>;
using Index = std::shared_ptr<Columns>;
using Index = std::shared_ptr<const Columns>;
using IndexSizeByName = std::unordered_map<std::string, ColumnSize>;
using Type = MergeTreeDataPartType;
@ -187,6 +187,8 @@ public:
void loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency);
void appendFilesOfColumnsChecksumsIndexes(Strings & files, bool include_projection = false) const;
void loadRowsCountFileForUnexpectedPart();
String getMarksFileExtension() const { return index_granularity_info.mark_type.getFileExtension(); }
/// Generate the new name for this part according to `new_part_info` and min/max dates from the old name.
@ -372,7 +374,8 @@ public:
int32_t metadata_version;
Index getIndex() const;
void setIndex(Index index_);
void setIndex(const Columns & cols_);
void setIndex(Columns && cols_);
void unloadIndex();
/// For data in RAM ('index')

View File

@ -1312,6 +1312,46 @@ static constexpr size_t loading_parts_initial_backoff_ms = 100;
static constexpr size_t loading_parts_max_backoff_ms = 5000;
static constexpr size_t loading_parts_max_tries = 3;
void MergeTreeData::loadUnexpectedDataPart(UnexpectedPartLoadState & state)
{
const MergeTreePartInfo & part_info = state.loading_info->info;
const String & part_name = state.loading_info->name;
const DiskPtr & part_disk_ptr = state.loading_info->disk;
LOG_TRACE(log, "Loading unexpected part {} from disk {}", part_name, part_disk_ptr->getName());
LoadPartResult res;
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, part_disk_ptr, 0);
auto data_part_storage = std::make_shared<DataPartStorageOnDiskFull>(single_disk_volume, relative_data_path, part_name);
String part_path = fs::path(relative_data_path) / part_name;
try
{
state.part = getDataPartBuilder(part_name, single_disk_volume, part_name)
.withPartInfo(part_info)
.withPartFormatFromDisk()
.build();
state.part->loadRowsCountFileForUnexpectedPart();
}
catch (...)
{
LOG_DEBUG(log, "Failed to load unexcepted data part {} with exception: {}", part_name, getExceptionMessage(std::current_exception(), false));
if (!state.part)
{
/// Build a fake part and mark it as broken in case of filesystem error.
/// If the error impacts the part directory instead of single files,
/// an exception will be thrown during detach and silently ignored.
state.part = getDataPartBuilder(part_name, single_disk_volume, part_name)
.withPartStorageType(MergeTreeDataPartStorageType::Full)
.withPartType(MergeTreeDataPartType::Wide)
.build();
}
state.is_broken = true;
tryLogCurrentException(log, fmt::format("while loading unexcepted part {} on path {}", part_name, part_path));
}
}
MergeTreeData::LoadPartResult MergeTreeData::loadDataPart(
const MergeTreePartInfo & part_info,
const String & part_name,
@ -1704,6 +1744,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::un
}
std::vector<PartLoadingTree::PartLoadingInfos> parts_to_load_by_disk(disks.size());
std::vector<PartLoadingTree::PartLoadingInfos> unexpected_parts_to_load_by_disk(disks.size());
ThreadPoolCallbackRunnerLocal<void> runner(getActivePartsLoadingThreadPool().get(), "ActiveParts");
@ -1714,6 +1755,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::un
continue;
auto & disk_parts = parts_to_load_by_disk[i];
auto & unexpected_disk_parts = unexpected_parts_to_load_by_disk[i];
runner([&, disk_ptr]()
{
@ -1725,7 +1767,12 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::un
continue;
if (auto part_info = MergeTreePartInfo::tryParsePartName(it->name(), format_version))
disk_parts.emplace_back(*part_info, it->name(), disk_ptr);
{
if (expected_parts && !expected_parts->contains(it->name()))
unexpected_disk_parts.emplace_back(*part_info, it->name(), disk_ptr);
else
disk_parts.emplace_back(*part_info, it->name(), disk_ptr);
}
}
}, Priority{0});
}
@ -1736,6 +1783,9 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::un
PartLoadingTree::PartLoadingInfos parts_to_load;
for (auto & disk_parts : parts_to_load_by_disk)
std::move(disk_parts.begin(), disk_parts.end(), std::back_inserter(parts_to_load));
PartLoadingTree::PartLoadingInfos unexpected_parts_to_load;
for (auto & disk_parts : unexpected_parts_to_load_by_disk)
std::move(disk_parts.begin(), disk_parts.end(), std::back_inserter(unexpected_parts_to_load));
auto loading_tree = PartLoadingTree::build(std::move(parts_to_load));
@ -1811,7 +1861,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::un
}
}
if (num_parts == 0)
if (num_parts == 0 && unexpected_parts_to_load.empty())
{
resetObjectColumnsFromActiveParts(part_lock);
LOG_DEBUG(log, "There are no data parts");
@ -1864,6 +1914,36 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::un
calculateColumnAndSecondaryIndexSizesImpl();
PartLoadingTreeNodes unloaded_parts;
std::vector<UnexpectedPartLoadState> unexpected_unloaded_data_parts;
for (const auto & [info, name, disk] : unexpected_parts_to_load)
{
bool uncovered = true;
for (const auto & part : unexpected_parts_to_load)
{
if (name != part.name && part.info.contains(info))
{
uncovered = false;
break;
}
}
unexpected_unloaded_data_parts.push_back({std::make_shared<PartLoadingTree::Node>(info, name, disk), uncovered, /*is_broken*/ false, /*part*/ nullptr});
}
if (!unexpected_unloaded_data_parts.empty())
{
LOG_DEBUG(log, "Found {} unexpected data parts. They will be loaded asynchronously", unexpected_unloaded_data_parts.size());
{
std::lock_guard lock(unexpected_data_parts_mutex);
unexpected_data_parts = std::move(unexpected_unloaded_data_parts);
unexpected_data_parts_loading_finished = false;
}
unexpected_data_parts_loading_task = getContext()->getSchedulePool().createTask(
"MergeTreeData::loadUnexpectedDataParts",
[this] { loadUnexpectedDataParts(); });
}
loading_tree.traverse(/*recursive=*/ true, [&](const auto & node)
{
if (!node->is_loaded)
@ -1889,6 +1969,54 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::un
data_parts_loading_finished = true;
}
void MergeTreeData::loadUnexpectedDataParts()
{
{
std::lock_guard lock(unexpected_data_parts_mutex);
if (unexpected_data_parts.empty())
{
unexpected_data_parts_loading_finished = true;
unexpected_data_parts_cv.notify_all();
return;
}
LOG_DEBUG(log, "Loading {} unexpected data parts",
unexpected_data_parts.size());
}
ThreadFuzzer::maybeInjectSleep();
ThreadPoolCallbackRunnerLocal<void> runner(getUnexpectedPartsLoadingThreadPool().get(), "UnexpectedParts");
for (auto & load_state : unexpected_data_parts)
{
std::lock_guard lock(unexpected_data_parts_mutex);
chassert(!load_state.part);
if (unexpected_data_parts_loading_canceled)
{
runner.waitForAllToFinishAndRethrowFirstError();
return;
}
runner([&]()
{
loadUnexpectedDataPart(load_state);
chassert(load_state.part);
if (load_state.is_broken)
{
load_state.part->renameToDetached("broken-on-start"); /// detached parts must not have '_' in prefixes
}
}, Priority{});
}
runner.waitForAllToFinishAndRethrowFirstError();
LOG_DEBUG(log, "Loaded {} unexpected data parts", unexpected_data_parts.size());
{
std::lock_guard lock(unexpected_data_parts_mutex);
unexpected_data_parts_loading_finished = true;
unexpected_data_parts_cv.notify_all();
}
}
void MergeTreeData::loadOutdatedDataParts(bool is_async)
try
{
@ -2024,24 +2152,74 @@ void MergeTreeData::waitForOutdatedPartsToBeLoaded() const TSA_NO_THREAD_SAFETY_
LOG_TRACE(log, "Finished waiting for outdated data parts to be loaded");
}
void MergeTreeData::startOutdatedDataPartsLoadingTask()
void MergeTreeData::waitForUnexpectedPartsToBeLoaded() const TSA_NO_THREAD_SAFETY_ANALYSIS
{
/// Background tasks are not run if storage is static.
if (isStaticStorage())
return;
/// If waiting is not required, do NOT log and do NOT enable/disable turbo mode to make `waitForUnexpectedPartsToBeLoaded` a lightweight check
{
std::unique_lock lock(unexpected_data_parts_mutex);
if (unexpected_data_parts_loading_canceled)
throw Exception(ErrorCodes::NOT_INITIALIZED, "Loading of unexpected data parts was already canceled");
if (unexpected_data_parts_loading_finished)
return;
}
/// We need to load parts as fast as possible
getUnexpectedPartsLoadingThreadPool().enableTurboMode();
SCOPE_EXIT({
/// Let's lower the number of threads e.g. for later ATTACH queries to behave as usual
getUnexpectedPartsLoadingThreadPool().disableTurboMode();
});
LOG_TRACE(log, "Will wait for unexpected data parts to be loaded");
std::unique_lock lock(unexpected_data_parts_mutex);
unexpected_data_parts_cv.wait(lock, [this]() TSA_NO_THREAD_SAFETY_ANALYSIS
{
return unexpected_data_parts_loading_finished || unexpected_data_parts_loading_canceled;
});
if (unexpected_data_parts_loading_canceled)
throw Exception(ErrorCodes::NOT_INITIALIZED, "Loading of unexpected data parts was canceled");
LOG_TRACE(log, "Finished waiting for unexpected data parts to be loaded");
}
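The waiting code above follows a common "wait until finished or canceled" shape: a cheap early exit, then a condition-variable wait on two flags, throwing if the load was canceled. A self-contained sketch of that pattern with hypothetical names (no turbo-mode pool tuning, no TSA annotations):

``` cpp
#include <condition_variable>
#include <mutex>
#include <stdexcept>
#include <thread>

struct Loader
{
    std::mutex mutex;
    std::condition_variable cv;
    bool finished = false;
    bool canceled = false;

    void waitForLoaded()
    {
        std::unique_lock lock(mutex);
        // Cheap early exit: no logging, no pool tuning if there is nothing to wait for.
        if (finished)
            return;
        cv.wait(lock, [this] { return finished || canceled; });
        if (canceled)
            throw std::runtime_error("Loading was canceled");
    }

    void markFinished()
    {
        {
            std::lock_guard lock(mutex);
            finished = true;
        }
        cv.notify_all();
    }
};

int main()
{
    Loader loader;
    std::thread t([&] { loader.markFinished(); });
    loader.waitForLoaded();
    t.join();
}
```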
void MergeTreeData::startOutdatedAndUnexpectedDataPartsLoadingTask()
{
if (outdated_data_parts_loading_task)
outdated_data_parts_loading_task->activateAndSchedule();
if (unexpected_data_parts_loading_task)
unexpected_data_parts_loading_task->activateAndSchedule();
}
void MergeTreeData::stopOutdatedDataPartsLoadingTask()
void MergeTreeData::stopOutdatedAndUnexpectedDataPartsLoadingTask()
{
if (!outdated_data_parts_loading_task)
return;
if (outdated_data_parts_loading_task)
{
std::lock_guard lock(outdated_data_parts_mutex);
outdated_data_parts_loading_canceled = true;
{
std::lock_guard lock(outdated_data_parts_mutex);
outdated_data_parts_loading_canceled = true;
}
outdated_data_parts_loading_task->deactivate();
outdated_data_parts_cv.notify_all();
}
outdated_data_parts_loading_task->deactivate();
outdated_data_parts_cv.notify_all();
if (unexpected_data_parts_loading_task)
{
{
std::lock_guard lock(unexpected_data_parts_mutex);
unexpected_data_parts_loading_canceled = true;
}
unexpected_data_parts_loading_task->deactivate();
unexpected_data_parts_cv.notify_all();
}
}
/// Is the part directory old.
@ -4101,16 +4279,13 @@ void MergeTreeData::outdateUnexpectedPartAndCloneToDetached(const DataPartPtr &
removePartsFromWorkingSet(NO_TRANSACTION_RAW, {part_to_detach}, true, &lock);
}
void MergeTreeData::forcefullyMovePartToDetachedAndRemoveFromMemory(const MergeTreeData::DataPartPtr & part_to_detach, const String & prefix, bool restore_covered)
void MergeTreeData::forcefullyMovePartToDetachedAndRemoveFromMemory(const MergeTreeData::DataPartPtr & part_to_detach, const String & prefix)
{
if (prefix.empty())
LOG_INFO(log, "Renaming {} to {} and forgetting it.", part_to_detach->getDataPartStorage().getPartDirectory(), part_to_detach->name);
else
LOG_INFO(log, "Renaming {} to {}_{} and forgetting it.", part_to_detach->getDataPartStorage().getPartDirectory(), prefix, part_to_detach->name);
if (restore_covered)
waitForOutdatedPartsToBeLoaded();
auto lock = lockParts();
bool removed_active_part = false;
bool restored_active_part = false;
@ -4136,132 +4311,6 @@ void MergeTreeData::forcefullyMovePartToDetachedAndRemoveFromMemory(const MergeT
LOG_TEST(log, "forcefullyMovePartToDetachedAndRemoveFromMemory: removing {} from data_parts_indexes", part->getNameWithState());
data_parts_indexes.erase(it_part);
if (restore_covered && part->info.level == 0 && part->info.mutation == 0)
{
LOG_WARNING(log, "Will not recover parts covered by zero-level part {}", part->name);
return;
}
/// Let's restore some parts covered by unexpected to avoid partial data
if (restore_covered)
{
Strings restored;
Strings error_parts;
auto is_appropriate_state = [] (const DataPartPtr & part_)
{
/// In rare cases, we may have a chain of unexpected parts that cover common source parts, e.g. all_1_2_3, all_1_3_4
/// It may happen as a result of interrupted cloneReplica
bool already_active = part_->getState() == DataPartState::Active;
if (!already_active && part_->getState() != DataPartState::Outdated)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to restore a part {} from unexpected state: {}", part_->name, part_->getState());
return !already_active;
};
auto activate_part = [this, &restored_active_part](auto it)
{
/// It's not clear what to do if we try to activate part that was removed in transaction.
/// It may happen only in ReplicatedMergeTree, so let's simply throw LOGICAL_ERROR for now.
chassert((*it)->version.isRemovalTIDLocked());
if ((*it)->version.removal_tid_lock == Tx::PrehistoricTID.getHash())
(*it)->version.unlockRemovalTID(Tx::PrehistoricTID, TransactionInfoContext{getStorageID(), (*it)->name});
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot activate part {} that was removed by transaction ({})",
(*it)->name, (*it)->version.removal_tid_lock);
addPartContributionToColumnAndSecondaryIndexSizes(*it);
addPartContributionToDataVolume(*it);
modifyPartState(it, DataPartState::Active); /// iterator is not invalidated here
restored_active_part = true;
};
/// ActiveDataPartSet allows to restore most top-level parts instead of unexpected.
/// It can be important in case of assigned merges. If unexpected part is result of some
/// finished, but not committed merge then we should restore (at least try to restore)
/// closest ancestors for the unexpected part to be able to execute it.
/// However it's not guaranteed because outdated parts can intersect
ActiveDataPartSet parts_for_replacement(format_version);
auto range = getDataPartsPartitionRange(part->info.partition_id);
DataPartsVector parts_candidates(range.begin(), range.end());
/// In case of intersecting outdated parts we want to add bigger parts (with higher level) first
auto comparator = [] (const DataPartPtr left, const DataPartPtr right) -> bool
{
if (left->info.level < right->info.level)
return true;
else if (left->info.level > right->info.level)
return false;
else
return left->info.mutation < right->info.mutation;
};
std::sort(parts_candidates.begin(), parts_candidates.end(), comparator);
/// From larger to smaller parts
for (const auto & part_candidate_in_partition : parts_candidates | std::views::reverse)
{
if (part->info.contains(part_candidate_in_partition->info)
&& is_appropriate_state(part_candidate_in_partition))
{
String out_reason;
/// Outdated parts can intersect legally (because of DROP_PART); here it's okay, we
/// are trying to do our best to restore covered parts.
auto outcome = parts_for_replacement.tryAddPart(part_candidate_in_partition->info, &out_reason);
if (outcome == ActiveDataPartSet::AddPartOutcome::HasIntersectingPart)
{
error_parts.push_back(part->name);
LOG_ERROR(log, "Failed to restore part {}, because of intersection reason '{}'", part->name, out_reason);
}
}
}
if (parts_for_replacement.size() > 0)
{
std::vector<std::pair<uint64_t, uint64_t>> holes_list;
/// Most part of the code below is just to write pretty message
auto part_infos = parts_for_replacement.getPartInfos();
int64_t current_right_block = part_infos[0].min_block;
for (const auto & top_level_part_to_replace : part_infos)
{
auto data_part_it = data_parts_by_info.find(top_level_part_to_replace);
if (data_part_it == data_parts_by_info.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find part {} in own set", top_level_part_to_replace.getPartNameForLogs());
activate_part(data_part_it);
restored.push_back((*data_part_it)->name);
if (top_level_part_to_replace.min_block - current_right_block > 1)
holes_list.emplace_back(current_right_block, top_level_part_to_replace.min_block);
current_right_block = top_level_part_to_replace.max_block;
}
if (part->info.max_block != current_right_block)
holes_list.emplace_back(current_right_block, part->info.max_block);
for (const String & name : restored)
LOG_INFO(log, "Activated part {} in place of unexpected {}", name, part->name);
if (!error_parts.empty() || !holes_list.empty())
{
std::string error_parts_message, holes_list_message;
if (!error_parts.empty())
error_parts_message = fmt::format(" Parts failed to restore because of intersection: [{}]", fmt::join(error_parts, ", "));
if (!holes_list.empty())
{
if (!error_parts.empty())
holes_list_message = ".";
Strings holes_list_pairs;
for (const auto & [left_side, right_side] : holes_list)
holes_list_pairs.push_back(fmt::format("({}, {})", left_side + 1, right_side - 1));
holes_list_message += fmt::format(" Block ranges failed to restore: [{}]", fmt::join(holes_list_pairs, ", "));
}
LOG_WARNING(log, "The set of parts restored in place of {} looks incomplete. "
"SELECT queries may observe gaps in data until this replica is synchronized with other replicas.{}{}",
part->name, error_parts_message, holes_list_message);
}
}
else
{
LOG_INFO(log, "Don't find any parts for replacement instead of unexpected {}", part->name);
}
}
if (removed_active_part || restored_active_part)
resetObjectColumnsFromActiveParts(lock);
}

View File

@ -653,10 +653,9 @@ public:
/// Renames the part to detached/<prefix>_<part> and removes it from data_parts,
//// so it will not be deleted in clearOldParts.
/// If restore_covered is true, adds to the working set inactive parts, which were merged into the deleted part.
/// NOTE: This method is safe to use only for parts which nobody else holds (like on server start or for parts which was not committed).
/// For active parts it's unsafe because this method modifies fields of part (rename) while some other thread can try to read it.
void forcefullyMovePartToDetachedAndRemoveFromMemory(const DataPartPtr & part, const String & prefix = "", bool restore_covered = false);
void forcefullyMovePartToDetachedAndRemoveFromMemory(const DataPartPtr & part, const String & prefix = "");
/// This method should not be here, but async loading of Outdated parts is implemented in MergeTreeData
virtual void forcefullyRemoveBrokenOutdatedPartFromZooKeeperBeforeDetaching(const String & /*part_name*/) {}
@ -1073,6 +1072,7 @@ public:
scope_guard getTemporaryPartDirectoryHolder(const String & part_dir_name) const;
void waitForOutdatedPartsToBeLoaded() const;
void waitForUnexpectedPartsToBeLoaded() const;
bool canUsePolymorphicParts() const;
/// TODO: make enabled by default in the next release if no problems found.
@ -1552,13 +1552,33 @@ protected:
PartLoadingTreeNodes outdated_unloaded_data_parts TSA_GUARDED_BY(outdated_data_parts_mutex);
bool outdated_data_parts_loading_canceled TSA_GUARDED_BY(outdated_data_parts_mutex) = false;
mutable std::mutex unexpected_data_parts_mutex;
mutable std::condition_variable unexpected_data_parts_cv;
struct UnexpectedPartLoadState
{
PartLoadingTree::NodePtr loading_info;
/// True if this part is not covered by any other unexpected part
bool uncovered = true;
bool is_broken = false;
MutableDataPartPtr part;
};
BackgroundSchedulePool::TaskHolder unexpected_data_parts_loading_task;
std::vector<UnexpectedPartLoadState> unexpected_data_parts;
bool unexpected_data_parts_loading_canceled TSA_GUARDED_BY(unexpected_data_parts_mutex) = false;
void loadUnexpectedDataParts();
void loadUnexpectedDataPart(UnexpectedPartLoadState & state);
/// This has to be "true" by default, because in case of empty table or absence of Outdated parts
/// it is automatically finished.
std::atomic_bool outdated_data_parts_loading_finished = true;
std::atomic_bool unexpected_data_parts_loading_finished = true;
void loadOutdatedDataParts(bool is_async);
void startOutdatedDataPartsLoadingTask();
void stopOutdatedDataPartsLoadingTask();
void startOutdatedAndUnexpectedDataPartsLoadingTask();
void stopOutdatedAndUnexpectedDataPartsLoadingTask();
static void incrementInsertedPartsProfileEvent(MergeTreeDataPartType type);
static void incrementMergedPartsProfileEvent(MergeTreeDataPartType type);

View File

@ -181,7 +181,7 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
new_part->rows_count = rows_count;
new_part->modification_time = time(nullptr);
new_part->setIndex(std::make_shared<Columns>(writer->releaseIndexColumns()));
new_part->setIndex(writer->releaseIndexColumns());
new_part->checksums = checksums;
new_part->setBytesOnDisk(checksums.getTotalSizeOnDisk());
new_part->setBytesUncompressedOnDisk(checksums.getTotalSizeUncompressedOnDisk());

View File

@ -950,7 +950,7 @@ void finalizeMutatedPart(
new_data_part->rows_count = source_part->rows_count;
new_data_part->index_granularity = source_part->index_granularity;
new_data_part->setIndex(source_part->getIndex());
new_data_part->setIndex(*source_part->getIndex());
new_data_part->minmax_idx = source_part->minmax_idx;
new_data_part->modification_time = time(nullptr);

View File

@ -1,3 +1,4 @@
#include <optional>
#include "config.h"
#if USE_AWS_S3
@ -365,7 +366,11 @@ std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
auto configuration_snapshot = updateConfigurationAndGetCopy(local_context);
auto internal_source = std::make_unique<StorageS3Source>(
info, configuration.format, getName(), local_context, format_settings,
info,
configuration.format,
getName(),
local_context,
format_settings,
max_block_size,
configuration_snapshot.request_settings,
configuration_snapshot.compression_method,
@ -373,7 +378,9 @@ std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
configuration_snapshot.url.bucket,
configuration_snapshot.url.version_id,
configuration_snapshot.url.uri.getHost() + std::to_string(configuration_snapshot.url.uri.getPort()),
file_iterator, local_context->getSettingsRef().max_download_threads, false);
file_iterator,
local_context->getSettingsRef().max_download_threads,
false);
auto file_deleter = [this, bucket = configuration_snapshot.url.bucket, client = configuration_snapshot.client, blob_storage_log = BlobStorageLogWriter::create()](const std::string & path) mutable
{
@ -608,8 +615,13 @@ void StorageS3Queue::checkTableStructure(const String & zookeeper_prefix, const
std::shared_ptr<StorageS3Queue::FileIterator> StorageS3Queue::createFileIterator(ContextPtr local_context, const ActionsDAG::Node * predicate)
{
auto glob_iterator = std::make_unique<StorageS3QueueSource::GlobIterator>(
*configuration.client, configuration.url, predicate, getVirtualsList(), local_context,
/* read_keys */nullptr, configuration.request_settings);
*configuration.client,
configuration.url,
predicate,
getVirtualsList(),
local_context,
/* read_keys */ nullptr,
configuration.request_settings);
return std::make_shared<FileIterator>(
files_metadata, std::move(glob_iterator), s3queue_settings->s3queue_current_shard_num, shutdown_called, log);

View File

@ -153,7 +153,7 @@ void StorageMergeTree::startup()
{
background_operations_assignee.start();
startBackgroundMovesIfNeeded();
startOutdatedDataPartsLoadingTask();
startOutdatedAndUnexpectedDataPartsLoadingTask();
}
catch (...)
{
@ -179,7 +179,7 @@ void StorageMergeTree::shutdown(bool)
if (shutdown_called.exchange(true))
return;
stopOutdatedDataPartsLoadingTask();
stopOutdatedAndUnexpectedDataPartsLoadingTask();
/// Unlock all waiting mutations
{

View File

@ -1575,18 +1575,12 @@ bool StorageReplicatedMergeTree::checkPartsImpl(bool skip_sanity_checks)
* But actually we can't precisely determine that ALL missing parts
* are covered by this unexpected part. So missing parts will be downloaded.
*/
DataParts unexpected_parts;
/// Intersection of local parts and expected parts
ActiveDataPartSet local_expected_parts_set(format_version);
/// Collect unexpected parts
for (const auto & part : parts)
{
if (expected_parts.contains(part->name))
local_expected_parts_set.add(part->name);
else
unexpected_parts.insert(part); /// this parts we will place to detached with ignored_ prefix
local_expected_parts_set.add(part->name);
}
/// Which parts should be taken from other replicas.
@ -1598,18 +1592,15 @@ bool StorageReplicatedMergeTree::checkPartsImpl(bool skip_sanity_checks)
paranoidCheckForCoveredPartsInZooKeeperOnStart(expected_parts_vec, parts_to_fetch);
waitForUnexpectedPartsToBeLoaded();
ActiveDataPartSet set_of_empty_unexpected_parts(format_version);
for (const auto & part : parts)
for (const auto & load_state : unexpected_data_parts)
{
if (part->rows_count || part->getState() != MergeTreeDataPartState::Active || expected_parts.contains(part->name))
if (load_state.is_broken || load_state.part->rows_count || !load_state.uncovered)
continue;
if (incomplete_list_of_outdated_parts)
{
LOG_INFO(log, "Outdated parts are not loaded yet, but we may need them to handle dropped parts. Need retry.");
return false;
}
set_of_empty_unexpected_parts.add(part->name);
set_of_empty_unexpected_parts.add(load_state.part->name);
}
if (auto empty_count = set_of_empty_unexpected_parts.size())
LOG_WARNING(log, "Found {} empty unexpected parts (probably some dropped parts were not cleaned up before restart): [{}]",
@ -1628,33 +1619,35 @@ bool StorageReplicatedMergeTree::checkPartsImpl(bool skip_sanity_checks)
std::unordered_set<String> restorable_unexpected_parts;
UInt64 uncovered_unexpected_parts_rows = 0;
for (const auto & part : unexpected_parts)
for (const auto & load_state : unexpected_data_parts)
{
unexpected_parts_rows += part->rows_count;
if (load_state.is_broken)
continue;
unexpected_parts_rows += load_state.part->rows_count;
/// This part may be covered by some expected part that is active and present locally
/// Probably we just did not remove this part from disk before restart (but removed from ZooKeeper)
String covering_local_part = local_expected_parts_set.getContainingPart(part->name);
String covering_local_part = local_expected_parts_set.getContainingPart(load_state.part->name);
if (!covering_local_part.empty())
{
covered_unexpected_parts.push_back(part->name);
covered_unexpected_parts.push_back(load_state.part->name);
continue;
}
String covering_empty_part = set_of_empty_unexpected_parts.getContainingPart(part->name);
String covering_empty_part = set_of_empty_unexpected_parts.getContainingPart(load_state.part->name);
if (!covering_empty_part.empty())
{
LOG_INFO(log, "Unexpected part {} is covered by empty part {}, assuming it has been dropped just before restart",
part->name, covering_empty_part);
covered_unexpected_parts.push_back(part->name);
load_state.part->name, covering_empty_part);
covered_unexpected_parts.push_back(load_state.part->name);
continue;
}
auto covered_parts = local_expected_parts_set.getPartInfosCoveredBy(part->info);
auto covered_parts = local_expected_parts_set.getPartInfosCoveredBy(load_state.part->info);
if (MergeTreePartInfo::areAllBlockNumbersCovered(part->info, covered_parts))
if (MergeTreePartInfo::areAllBlockNumbersCovered(load_state.part->info, covered_parts))
{
restorable_unexpected_parts.insert(part->name);
restorable_unexpected_parts.insert(load_state.part->name);
continue;
}
@ -1668,13 +1661,13 @@ bool StorageReplicatedMergeTree::checkPartsImpl(bool skip_sanity_checks)
}
/// Part is unexpected and we don't have covering part: it's suspicious
uncovered_unexpected_parts.insert(part->name);
uncovered_unexpected_parts_rows += part->rows_count;
uncovered_unexpected_parts.insert(load_state.part->name);
uncovered_unexpected_parts_rows += load_state.part->rows_count;
if (part->info.level > 0)
if (load_state.part->info.level > 0)
{
++unexpected_parts_nonnew;
unexpected_parts_nonnew_rows += part->rows_count;
unexpected_parts_nonnew_rows += load_state.part->rows_count;
}
}
@ -1700,6 +1693,9 @@ bool StorageReplicatedMergeTree::checkPartsImpl(bool skip_sanity_checks)
UInt64 total_rows_on_filesystem = 0;
for (const auto & part : parts)
total_rows_on_filesystem += part->rows_count;
/// We need to sum the rows count of all unexpected data parts.
for (const auto & part : unexpected_data_parts)
total_rows_on_filesystem += part.part->rows_count;
const auto storage_settings_ptr = getSettings();
bool insane = uncovered_unexpected_parts_rows > total_rows_on_filesystem * storage_settings_ptr->replicated_max_ratio_of_wrong_parts;
@ -1741,13 +1737,12 @@ bool StorageReplicatedMergeTree::checkPartsImpl(bool skip_sanity_checks)
/// Add to the queue jobs to pick up the missing parts from other replicas and remove from ZK the information that we have them.
queue.setBrokenPartsToEnqueueFetchesOnLoading(std::move(parts_to_fetch));
/// Remove extra local parts.
for (const DataPartPtr & part : unexpected_parts)
/// Detach all unexpected data parts after the sanity check.
for (auto & part_state : unexpected_data_parts)
{
bool restore_covered = restorable_unexpected_parts.contains(part->name) || uncovered_unexpected_parts.contains(part->name);
LOG_ERROR(log, "Renaming unexpected part {} to ignored_{}{}", part->name, part->name, restore_covered ? ", restoring covered parts" : "");
forcefullyMovePartToDetachedAndRemoveFromMemory(part, "ignored", restore_covered);
part_state.part->renameToDetached("ignored");
}
unexpected_data_parts.clear();
return true;
}
@ -5133,7 +5128,7 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::fetchExistsPart(
void StorageReplicatedMergeTree::startup()
{
LOG_TRACE(log, "Starting up table");
startOutdatedDataPartsLoadingTask();
startOutdatedAndUnexpectedDataPartsLoadingTask();
if (attach_thread)
{
attach_thread->start();
@ -5336,7 +5331,7 @@ void StorageReplicatedMergeTree::shutdown(bool)
}
session_expired_callback_handler.reset();
stopOutdatedDataPartsLoadingTask();
stopOutdatedAndUnexpectedDataPartsLoadingTask();
partialShutdown();

File diff suppressed because it is too large

View File

@ -1,7 +1,10 @@
#pragma once
#include "config.h"
#include <memory>
#include <IO/ReadBufferFromS3.h>
#include "IO/Archives/IArchiveReader.h"
#include "IO/Archives/createArchiveReader.h"
#include "IO/ReadBuffer.h"
#if USE_AWS_S3
#include <Compression/CompressionInfo.h>
@ -23,36 +26,52 @@
#include <Poco/URI.h>
#include <Common/threadPoolCallbackRunner.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
class PullingPipelineExecutor;
class NamedCollection;
class StorageS3Source : public SourceWithKeyCondition, WithContext
{
public:
struct KeyWithInfo
{
KeyWithInfo() = default;
explicit KeyWithInfo(String key_, std::optional<S3::ObjectInfo> info_ = std::nullopt)
: key(std::move(key_)), info(std::move(info_)) {}
explicit KeyWithInfo(
String key_,
std::optional<S3::ObjectInfo> info_ = std::nullopt,
std::optional<String> path_in_archive_ = std::nullopt,
std::shared_ptr<IArchiveReader> archive_reader_ = nullptr)
: key(std::move(key_))
, info(std::move(info_))
, path_in_archive(std::move(path_in_archive_))
, archive_reader(std::move(archive_reader_))
{
if (path_in_archive.has_value() != (archive_reader != nullptr))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Archive reader and path in archive must exist simultaneously");
}
virtual ~KeyWithInfo() = default;
String key;
std::optional<S3::ObjectInfo> info;
std::optional<String> path_in_archive;
std::shared_ptr<IArchiveReader> archive_reader;
String getPath() const { return path_in_archive.has_value() ? (key + "::" + path_in_archive.value()) : key; }
String getFileName() const { return path_in_archive.has_value() ? path_in_archive.value() : key; }
};
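The constructor above enforces that `path_in_archive` and `archive_reader` are either both set or both absent, and `getPath()` then renders archive members as `key::path_in_archive`. A tiny illustration of that convention with the reader reduced to a boolean flag (hypothetical type, not the real `KeyWithInfo`):

``` cpp
#include <iostream>
#include <optional>
#include <stdexcept>
#include <string>

// Simplified stand-in for the key description: the "reader" is reduced to a flag.
struct KeyWithInfoLike
{
    std::string key;
    std::optional<std::string> path_in_archive;
    bool has_archive_reader = false;

    KeyWithInfoLike(std::string key_, std::optional<std::string> path = std::nullopt, bool reader = false)
        : key(std::move(key_)), path_in_archive(std::move(path)), has_archive_reader(reader)
    {
        // Either both pieces of archive information are set, or neither is.
        if (path_in_archive.has_value() != has_archive_reader)
            throw std::logic_error("Archive reader and path in archive must exist simultaneously");
    }

    std::string getPath() const { return path_in_archive ? key + "::" + *path_in_archive : key; }
    std::string getFileName() const { return path_in_archive ? *path_in_archive : key; }
};

int main()
{
    KeyWithInfoLike plain("data/file.parquet");
    KeyWithInfoLike in_archive("data/archive.zip", "inner/file.csv", /*reader=*/true);
    std::cout << plain.getPath() << '\n';          // data/file.parquet
    std::cout << in_archive.getPath() << '\n';     // data/archive.zip::inner/file.csv
    std::cout << in_archive.getFileName() << '\n'; // inner/file.csv
}
```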
using KeyWithInfoPtr = std::shared_ptr<KeyWithInfo>;
using KeysWithInfo = std::vector<KeyWithInfoPtr>;
class IIterator
{
public:
@ -65,7 +84,7 @@ public:
/// fixme: May underestimate if the glob has a strong filter, so there are few matches among the first 1000 ListObjects results.
virtual size_t estimatedKeysCount() = 0;
KeyWithInfoPtr operator ()() { return next(); }
KeyWithInfoPtr operator()() { return next(); }
};
class DisclosedGlobIterator : public IIterator
@ -126,6 +145,41 @@ public:
ReadTaskCallback callback;
};
class ArchiveIterator : public IIterator, public WithContext
{
public:
explicit ArchiveIterator(
std::unique_ptr<IIterator> basic_iterator_,
const std::string & archive_pattern_,
std::shared_ptr<const S3::Client> client_,
const String & bucket_,
const String & version_id_,
const S3Settings::RequestSettings & request_settings,
ContextPtr context_,
KeysWithInfo * read_keys_);
KeyWithInfoPtr next(size_t) override; /// NOLINT
size_t estimatedKeysCount() override;
void refreshArchiveReader();
private:
std::unique_ptr<IIterator> basic_iterator;
KeyWithInfoPtr basic_key_with_info_ptr;
std::unique_ptr<ReadBufferFromFileBase> basic_read_buffer;
std::shared_ptr<IArchiveReader> archive_reader{nullptr};
std::unique_ptr<IArchiveReader::FileEnumerator> file_enumerator = nullptr;
std::string path_in_archive = {}; // used when reading a single file from archive
IArchiveReader::NameFilter filter = {}; // used when files inside archive are defined with a glob
std::shared_ptr<const S3::Client> client;
const String bucket;
const String version_id;
S3Settings::RequestSettings request_settings;
std::mutex take_next_mutex;
KeysWithInfo * read_keys;
};
friend StorageS3Source::ArchiveIterator;
StorageS3Source(
const ReadFromFormatInfo & info,
const String & format,
@ -194,10 +248,7 @@ private:
ReaderHolder(const ReaderHolder & other) = delete;
ReaderHolder & operator=(const ReaderHolder & other) = delete;
ReaderHolder(ReaderHolder && other) noexcept
{
*this = std::move(other);
}
ReaderHolder(ReaderHolder && other) noexcept { *this = std::move(other); }
ReaderHolder & operator=(ReaderHolder && other) noexcept
{
@ -215,8 +266,9 @@ private:
explicit operator bool() const { return reader != nullptr; }
PullingPipelineExecutor * operator->() { return reader.get(); }
const PullingPipelineExecutor * operator->() const { return reader.get(); }
String getPath() const { return fs::path(bucket) / key_with_info->key; }
const String & getFile() const { return key_with_info->key; }
String getPath() const { return bucket + "/" + key_with_info->getPath(); }
String getFile() const { return key_with_info->getFileName(); }
bool isArchive() { return key_with_info->path_in_archive.has_value(); }
const KeyWithInfo & getKeyWithInfo() const { return *key_with_info; }
std::optional<size_t> getFileSize() const { return key_with_info->info ? std::optional(key_with_info->info->size) : std::nullopt; }
@ -255,10 +307,7 @@ private:
ReaderHolder createReader(size_t idx = 0);
std::future<ReaderHolder> createReaderAsync(size_t idx = 0);
std::unique_ptr<ReadBuffer> createS3ReadBuffer(const String & key, size_t object_size);
std::unique_ptr<ReadBuffer> createAsyncS3ReadBuffer(const String & key, const ReadSettings & read_settings, size_t object_size);
void addNumRowsToCache(const String & key, size_t num_rows);
void addNumRowsToCache(const String & bucket_with_key, size_t num_rows);
std::optional<size_t> tryGetNumRowsFromCache(const KeyWithInfo & key_with_info);
};
@ -285,8 +334,7 @@ public:
bool withPartitionWildcard() const
{
static const String PARTITION_ID_WILDCARD = "{_partition_id}";
return url.bucket.find(PARTITION_ID_WILDCARD) != String::npos
|| keys.back().find(PARTITION_ID_WILDCARD) != String::npos;
return url.bucket.find(PARTITION_ID_WILDCARD) != String::npos || keys.back().find(PARTITION_ID_WILDCARD) != String::npos;
}
bool withGlobsIgnorePartitionWildcard() const;
@ -315,10 +363,7 @@ public:
bool distributed_processing_ = false,
ASTPtr partition_by_ = nullptr);
String getName() const override
{
return name;
}
String getName() const override { return name; }
void read(
QueryPlan & query_plan,
@ -330,27 +375,25 @@ public:
size_t max_block_size,
size_t num_streams) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context, bool async_insert) override;
SinkToStoragePtr
write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context, bool async_insert) override;
void truncate(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, TableExclusiveLockHolder &) override;
void truncate(
const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, TableExclusiveLockHolder &) override;
bool supportsPartitionBy() const override;
static void processNamedCollectionResult(StorageS3::Configuration & configuration, const NamedCollection & collection);
static void processNamedCollectionResult(Configuration & configuration, const NamedCollection & collection);
static SchemaCache & getSchemaCache(const ContextPtr & ctx);
static StorageS3::Configuration getConfiguration(ASTs & engine_args, const ContextPtr & local_context, bool get_format_from_file = true);
static Configuration getConfiguration(ASTs & engine_args, const ContextPtr & local_context, bool get_format_from_file = true);
static ColumnsDescription getTableStructureFromData(
const StorageS3::Configuration & configuration,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & ctx);
const Configuration & configuration_, const std::optional<FormatSettings> & format_settings_, const ContextPtr & ctx);
static std::pair<ColumnsDescription, String> getTableStructureAndFormatFromData(
const StorageS3::Configuration & configuration,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & ctx);
const Configuration & configuration, const std::optional<FormatSettings> & format_settings, const ContextPtr & ctx);
using KeysWithInfo = StorageS3Source::KeysWithInfo;
@ -363,7 +406,9 @@ protected:
void useConfiguration(const Configuration & new_configuration);
const Configuration & getConfiguration();
Configuration getConfigurationCopy() const;
String getFormatCopy() const;
private:
friend class StorageS3Cluster;
@ -372,7 +417,7 @@ private:
friend class ReadFromStorageS3Step;
Configuration configuration;
std::mutex configuration_update_mutex;
mutable std::mutex configuration_update_mutex;
String name;
const bool distributed_processing;
@ -396,6 +441,24 @@ private:
bool parallelizeOutputAfterReading(ContextPtr context) const override;
};
std::unique_ptr<ReadBufferFromFileBase> createS3ReadBuffer(
const String & key,
size_t object_size,
std::shared_ptr<const Context> context,
std::shared_ptr<const S3::Client> client_ptr,
const String & bucket,
const String & version_id,
const S3Settings::RequestSettings & request_settings);
std::unique_ptr<ReadBufferFromFileBase> createAsyncS3ReadBuffer(
const String & key,
const ReadSettings & read_settings,
size_t object_size,
std::shared_ptr<const Context> context,
std::shared_ptr<const S3::Client> client_ptr,
const String & bucket,
const String & version_id,
const S3Settings::RequestSettings & request_settings);
}
#endif

View File

@ -91,7 +91,14 @@ void StorageS3Cluster::updateConfigurationIfChanged(ContextPtr local_context)
RemoteQueryExecutor::Extension StorageS3Cluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const
{
auto iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*s3_configuration.client, s3_configuration.url, predicate, getVirtualsList(), context, nullptr, s3_configuration.request_settings, context->getFileProgressCallback());
*s3_configuration.client,
s3_configuration.url,
predicate,
getVirtualsList(),
context,
nullptr,
s3_configuration.request_settings,
context->getFileProgressCallback());
auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String
{

View File

@ -216,7 +216,19 @@ void TableFunctionS3::parseArgumentsImpl(ASTs & args, const ContextPtr & context
configuration.auth_settings.no_sign_request = no_sign_request;
if (configuration.format == "auto")
configuration.format = FormatFactory::instance().tryGetFormatFromFileName(Poco::URI(url).getPath()).value_or("auto");
{
if (configuration.url.archive_pattern.has_value())
{
configuration.format = FormatFactory::instance()
.tryGetFormatFromFileName(Poco::URI(configuration.url.archive_pattern.value()).getPath())
.value_or("auto");
}
else
{
configuration.format
= FormatFactory::instance().tryGetFormatFromFileName(Poco::URI(configuration.url.uri_str).getPath()).value_or("auto");
}
}
}
configuration.keys = {configuration.url.key};

View File

@ -1806,7 +1806,7 @@ def _upload_build_profile_data(
address,
size,
type,
symbol,
symbol
)
SELECT {pr_info.number}, '{pr_info.sha}', '{job_report.start_time}', '{build_name}', '{instance_type}', '{instance_id}',
file, reinterpretAsUInt64(reverse(unhex(address))), reinterpretAsUInt64(reverse(unhex(size))), type, symbol

View File

@ -75,6 +75,12 @@ def get_options(i: int, upgrade_check: bool) -> str:
if not upgrade_check:
client_options.append("ignore_drop_queries_probability=0.5")
if random.random() < 0.2:
client_options.append("allow_experimental_parallel_reading_from_replicas=1")
client_options.append("max_parallel_replicas=3")
client_options.append("cluster_for_parallel_replicas='parallel_replicas'")
client_options.append("parallel_replicas_for_non_replicated_merge_tree=1")
if client_options:
options.append(" --client-option " + " ".join(client_options))

View File

@ -300,11 +300,44 @@ list_children () {
echo "$children"
}
while true; do
runner_pid=$(pgrep Runner.Listener)
echo "Got runner pid '$runner_pid'"
# There's possibility that it fails because the runner's version is outdated,
# so after the first failure we'll try to launch it with enabled autoupdate.
#
# We'll fail and terminate after 10 consequent failures.
ATTEMPT=0
# In `kill` 0 means "all processes in process group", -1 is "all but PID 1"
# We use `-2` to get an error
RUNNER_PID=-2
while true; do
# Does not send signal, but checks that the process $RUNNER_PID is running
if kill -0 -- $RUNNER_PID; then
ATTEMPT=0
echo "Runner is working with pid $RUNNER_PID, checking the metadata in background"
check_proceed_spot_termination
if ! is_job_assigned; then
RUNNER_AGE=$(( $(date +%s) - $(stat -c +%Y /proc/"$RUNNER_PID" 2>/dev/null || date +%s) ))
echo "The runner is launched $RUNNER_AGE seconds ago and still hasn't received a job"
if (( 60 < RUNNER_AGE )); then
echo "Attempt to delete the runner for a graceful shutdown"
sudo -u ubuntu ./config.sh remove --token "$(get_runner_token)" \
|| continue
echo "Runner didn't launch or have assigned jobs after ${RUNNER_AGE} seconds, shutting down"
terminate_and_exit
fi
fi
else
if [ "$RUNNER_PID" != "-2" ]; then
wait $RUNNER_PID \
&& echo "Runner with PID $RUNNER_PID successfully finished" \
|| echo "Attempt $((++ATTEMPT)) to start the runner"
fi
if (( ATTEMPT > 10 )); then
echo "The runner has failed to start after $ATTEMPT attempt. Give up and terminate it"
terminate_and_exit
fi
if [ -z "$runner_pid" ]; then
cd $RUNNER_HOME || terminate_and_exit
detect_delayed_termination
# If runner is not active, check that it needs to terminate itself
@ -314,37 +347,50 @@ while true; do
check_proceed_spot_termination force
echo "Going to configure runner"
sudo -u ubuntu ./config.sh --url $RUNNER_URL --token "$(get_runner_token)" \
--ephemeral --disableupdate --unattended \
--runnergroup Default --labels "$LABELS" --work _work --name "$INSTANCE_ID"
token_args=(--token "$(get_runner_token)")
config_args=(
"${token_args[@]}" --url "$RUNNER_URL"
--ephemeral --unattended --replace --runnergroup Default
--labels "$LABELS" --work _work --name "$INSTANCE_ID"
)
if (( ATTEMPT > 1 )); then
echo 'The runner failed to start at least once. Removing it and then configuring with autoupdate enabled.'
sudo -u ubuntu ./config.sh remove "${token_args[@]}"
sudo -u ubuntu ./config.sh "${config_args[@]}"
else
echo "Configure runner with disabled autoupdate"
config_args+=("--disableupdate")
sudo -u ubuntu ./config.sh "${config_args[@]}"
fi
echo "Another one check to avoid race between runner and infrastructure"
no_terminating_metadata || terminate_on_event
check_spot_instance_is_old && terminate_and_exit
check_proceed_spot_termination force
# There were some failures to start the Job because of trash in _work
rm -rf _work
# https://github.com/actions/runner/issues/3266
# We're unable to know if the runner is failed to start.
echo 'Monkey-patching run helpers to get genuine exit code of the runner'
for script in run.sh run-helper.sh.template; do
# shellcheck disable=SC2016
grep -q 'exit 0$' "$script" && \
sed 's/exit 0/exit $returnCode/' -i "$script" && \
echo "Script $script is patched"
done
echo "Run"
sudo -u ubuntu \
ACTIONS_RUNNER_HOOK_JOB_STARTED=/tmp/actions-hooks/pre-run.sh \
ACTIONS_RUNNER_HOOK_JOB_COMPLETED=/tmp/actions-hooks/post-run.sh \
./run.sh &
sleep 10
else
echo "Runner is working with pid $runner_pid, checking the metadata in background"
check_proceed_spot_termination
RUNNER_PID=$!
if ! is_job_assigned; then
RUNNER_AGE=$(( $(date +%s) - $(stat -c +%Y /proc/"$runner_pid" 2>/dev/null || date +%s) ))
echo "The runner is launched $RUNNER_AGE seconds ago and still has hot received the job"
if (( 60 < RUNNER_AGE )); then
echo "Attempt to delete the runner for a graceful shutdown"
sudo -u ubuntu ./config.sh remove --token "$(get_runner_token)" \
|| continue
echo "Runner didn't launch or have assigned jobs after ${RUNNER_AGE} seconds, shutting down"
terminate_and_exit
fi
fi
sleep 10
fi
sleep 5
done

View File

@ -9,7 +9,7 @@ set -xeuo pipefail
echo "Running prepare script"
export DEBIAN_FRONTEND=noninteractive
export RUNNER_VERSION=2.315.0
export RUNNER_VERSION=2.316.1
export RUNNER_HOME=/home/ubuntu/actions-runner
deb_arch() {
@ -155,31 +155,56 @@ apt-get install tailscale --yes --no-install-recommends
# Create a common script for the instances
mkdir /usr/local/share/scripts -p
cat > /usr/local/share/scripts/init-network.sh << 'EOF'
#!/usr/bin/env bash
setup_cloudflare_dns() {
# Add cloudflare DNS as a fallback
# Get default gateway interface
local IFACE ETH_DNS CLOUDFLARE_NS new_dns
IFACE=$(ip --json route list | jq '.[]|select(.dst == "default").dev' --raw-output)
# `Link 2 (eth0): 172.31.0.2`
ETH_DNS=$(resolvectl dns "$IFACE") || :
CLOUDFLARE_NS=1.1.1.1
if [[ "$ETH_DNS" ]] && [[ "${ETH_DNS#*: }" != *"$CLOUDFLARE_NS"* ]]; then
# Cut the leading legend
ETH_DNS=${ETH_DNS#*: }
# shellcheck disable=SC2206
new_dns=(${ETH_DNS} "$CLOUDFLARE_NS")
resolvectl dns "$IFACE" "${new_dns[@]}"
fi
}
# Add cloudflare DNS as a fallback
# Get default gateway interface
IFACE=$(ip --json route list | jq '.[]|select(.dst == "default").dev' --raw-output)
# `Link 2 (eth0): 172.31.0.2`
ETH_DNS=$(resolvectl dns "$IFACE") || :
CLOUDFLARE_NS=1.1.1.1
if [[ "$ETH_DNS" ]] && [[ "${ETH_DNS#*: }" != *"$CLOUDFLARE_NS"* ]]; then
# Cut the leading legend
ETH_DNS=${ETH_DNS#*: }
# shellcheck disable=SC2206
new_dns=(${ETH_DNS} "$CLOUDFLARE_NS")
resolvectl dns "$IFACE" "${new_dns[@]}"
fi
setup_tailscale() {
# Setup tailscale, the very first action
local TS_API_CLIENT_ID TS_API_CLIENT_SECRET TS_AUTHKEY RUNNER_TYPE
TS_API_CLIENT_ID=$(aws ssm get-parameter --region us-east-1 --name /tailscale/api-client-id --query 'Parameter.Value' --output text --with-decryption)
TS_API_CLIENT_SECRET=$(aws ssm get-parameter --region us-east-1 --name /tailscale/api-client-secret --query 'Parameter.Value' --output text --with-decryption)
# Setup tailscale, the very first action
TS_API_CLIENT_ID=$(aws ssm get-parameter --region us-east-1 --name /tailscale/api-client-id --query 'Parameter.Value' --output text --with-decryption)
TS_API_CLIENT_SECRET=$(aws ssm get-parameter --region us-east-1 --name /tailscale/api-client-secret --query 'Parameter.Value' --output text --with-decryption)
export TS_API_CLIENT_ID TS_API_CLIENT_SECRET
TS_AUTHKEY=$(get-authkey -tags tag:svc-core-ci-github -reusable -ephemeral)
tailscale up --ssh --auth-key="$TS_AUTHKEY" --hostname="ci-runner-$INSTANCE_ID"
RUNNER_TYPE=$(/usr/local/bin/aws ec2 describe-tags --filters "Name=resource-id,Values=$INSTANCE_ID" --query "Tags[?Key=='github:runner-type'].Value" --output text)
RUNNER_TYPE=${RUNNER_TYPE:-unknown}
# Clean possible garbage from the runner type
RUNNER_TYPE=${RUNNER_TYPE//[^0-9a-z]/-}
TS_AUTHKEY=$(TS_API_CLIENT_ID="$TS_API_CLIENT_ID" TS_API_CLIENT_SECRET="$TS_API_CLIENT_SECRET" \
get-authkey -tags tag:svc-core-ci-github -ephemeral)
tailscale up --ssh --auth-key="$TS_AUTHKEY" --hostname="ci-runner-$RUNNER_TYPE-$INSTANCE_ID"
}
cat > /usr/local/share/scripts/init-network.sh << EOF
#!/usr/bin/env bash
$(declare -f setup_cloudflare_dns)
$(declare -f setup_tailscale)
# If the script is sourced, it will return now and won't execute functions
return 0 &>/dev/null || :
echo Setup Cloudflare DNS
setup_cloudflare_dns
echo Setup Tailscale VPN
setup_tailscale
EOF
chmod +x /usr/local/share/scripts/init-network.sh
# The following line is used in the AWS TOE check.
touch /var/tmp/clickhouse-ci-ami.success

View File

@ -2549,15 +2549,15 @@ def reportLogStats(args):
WITH
240 AS mins,
(
SELECT (count(), sum(length(message)))
SELECT (count(), sum(length(toValidUTF8(message))))
FROM system.text_log
WHERE (now() - toIntervalMinute(mins)) < event_time
) AS total
SELECT
count() AS count,
round(count / (total.1), 3) AS `count_%`,
formatReadableSize(sum(length(message))) AS size,
round(sum(length(message)) / (total.2), 3) AS `size_%`,
formatReadableSize(sum(length(toValidUTF8(message)))) AS size,
round(sum(length(toValidUTF8(message))) / (total.2), 3) AS `size_%`,
countDistinct(logger_name) AS uniq_loggers,
countDistinct(thread_id) AS uniq_threads,
groupArrayDistinct(toString(level)) AS levels,
@ -2580,8 +2580,8 @@ def reportLogStats(args):
240 AS mins
SELECT
count() AS count,
substr(replaceRegexpAll(message, '[^A-Za-z]+', ''), 1, 32) AS pattern,
substr(any(message), 1, 256) as runtime_message,
substr(replaceRegexpAll(toValidUTF8(message), '[^A-Za-z]+', ''), 1, 32) AS pattern,
substr(any(toValidUTF8(message)), 1, 256) as runtime_message,
any((extract(source_file, '/[a-zA-Z0-9_]+\\.[a-z]+'), source_line)) as line
FROM system.text_log
WHERE (now() - toIntervalMinute(mins)) < event_time AND message_format_string = ''
@ -2596,7 +2596,7 @@ def reportLogStats(args):
print("\n")
query = """
SELECT message_format_string, count(), any(message) AS any_message
SELECT message_format_string, count(), any(toValidUTF8(message)) AS any_message
FROM system.text_log
WHERE (now() - toIntervalMinute(240)) < event_time
AND (message NOT LIKE (replaceRegexpAll(message_format_string, '{[:.0-9dfx]*}', '%') AS s))
@ -2631,8 +2631,8 @@ def reportLogStats(args):
'Unknown identifier: ''{}''', 'User name is empty', 'Expected function, got: {}',
'Attempt to read after eof', 'String size is too big ({}), maximum: {}'
) AS known_short_messages
SELECT count() AS c, message_format_string, substr(any(message), 1, 120),
min(if(length(regexpExtract(message, '(.*)\\([A-Z0-9_]+\\)')) as prefix_len > 0, prefix_len, length(message)) - 26 AS length_without_exception_boilerplate) AS min_length_without_exception_boilerplate
SELECT count() AS c, message_format_string, substr(any(toValidUTF8(message)), 1, 120),
min(if(length(regexpExtract(toValidUTF8(message), '(.*)\\([A-Z0-9_]+\\)')) as prefix_len > 0, prefix_len, length(toValidUTF8(message))) - 26 AS length_without_exception_boilerplate) AS min_length_without_exception_boilerplate
FROM system.text_log
WHERE (now() - toIntervalMinute(240)) < event_time
AND (length(message_format_string) < 16
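# Context for the changes above (illustrative, not part of the script): rows in
# system.text_log may contain invalid UTF-8 byte sequences, which is why the
# queries now wrap message in toValidUTF8 before length()/substr(). A rough
# Python analogy of that sanitization step:
raw = b"broken \xe2\x28\xa1 message"               # invalid UTF-8 continuation byte
sanitized = raw.decode("utf-8", errors="replace")  # like toValidUTF8(message)
print(len(sanitized), sanitized)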

View File

@ -1,13 +1,14 @@
import pytest
from helpers.cluster import ClickHouseCluster, CLICKHOUSE_CI_MIN_TESTED_VERSION
from helpers.cluster import ClickHouseCluster, CLICKHOUSE_CI_MIN_TESTED_VERSION, is_arm
# For the ARM version see https://github.com/ClickHouse/ClickHouse/pull/59132
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
with_zookeeper=False,
image="clickhouse/clickhouse-server",
tag=CLICKHOUSE_CI_MIN_TESTED_VERSION,
tag="24.1" if is_arm() else CLICKHOUSE_CI_MIN_TESTED_VERSION,
stay_alive=True,
with_installed_binary=True,
)
@ -15,7 +16,7 @@ node2 = cluster.add_instance(
"node2",
with_zookeeper=False,
image="clickhouse/clickhouse-server",
tag=CLICKHOUSE_CI_MIN_TESTED_VERSION,
tag="24.1" if is_arm() else CLICKHOUSE_CI_MIN_TESTED_VERSION,
stay_alive=True,
with_installed_binary=True,
)
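# Hypothetical helper (not part of the test) sketching the tag choice used for
# both instances above: on ARM the backward-compatibility image is pinned to
# "24.1", presumably because no ARM image exists for the minimum tested version
# (see the PR referenced at the top of the file).
def pick_backward_compat_tag(default_tag):
    return "24.1" if is_arm() else default_tag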

View File

@ -23,13 +23,23 @@ def ch_cluster():
try:
cluster.start()
os.system(
"docker cp {local} {cont_id}:{dist}".format(
local=os.path.join(SCRIPT_DIR, "model/."),
cont_id=instance.docker_id,
dist="/etc/clickhouse-server/model",
instance.exec_in_container(["mkdir", f"/etc/clickhouse-server/model/"])
machine = instance.get_machine_name()
for source_name in os.listdir(os.path.join(SCRIPT_DIR, "model/.")):
dest_name = source_name
if machine in source_name:
machine_suffix = "_" + machine
dest_name = source_name[: -len(machine_suffix)]
os.system(
"docker cp {local} {cont_id}:{dist}".format(
local=os.path.join(SCRIPT_DIR, f"model/{source_name}"),
cont_id=instance.docker_id,
dist=f"/etc/clickhouse-server/model/{dest_name}",
)
)
)
instance.restart_clickhouse()
yield cluster

View File

@ -39,6 +39,10 @@ def wait_for_clickhouse_stop(started_node):
assert result == "OK", "ClickHouse process is still running"
@pytest.mark.skipif(
helpers.cluster.is_arm(),
reason="Fails on ARM, issue https://github.com/ClickHouse/ClickHouse/issues/63855",
)
def test_pkill(started_node):
if (
started_node.is_built_with_thread_sanitizer()
@ -59,6 +63,10 @@ def test_pkill(started_node):
)
@pytest.mark.skipif(
helpers.cluster.is_arm(),
reason="Fails on ARM, issue https://github.com/ClickHouse/ClickHouse/issues/63855",
)
def test_pkill_query_log(started_node):
for signal in ["SEGV", "4"]:
# force create query_log if it was not created

View File

@ -0,0 +1,17 @@
<clickhouse>
<storage_configuration>
<disks>
<disk_s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</disk_s3>
<disk_encrypted>
<type>encrypted</type>
<disk>disk_s3</disk>
<key>1234567812345678</key>
</disk_encrypted>
</disks>
</storage_configuration>
</clickhouse>

View File

@ -19,7 +19,9 @@ def cluster():
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"node",
main_configs=["configs/storage.xml"],
main_configs=["configs/storage_arm.xml"]
if is_arm()
else ["configs/storage_amd.xml"],
with_minio=True,
with_hdfs=not is_arm(),
)

View File

@ -0,0 +1,71 @@
import pytest
import logging
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance("node", with_zookeeper=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
# This test constructs intersecting parts intentionally. It's not an elegant test.
# TODO(hanfei): write a test which select part 1_1 merging with part 2_2 and drop range.
def test_intersect_parts_when_restart(started_cluster):
node.query(
"""
CREATE TABLE data (
key Int
)
ENGINE = ReplicatedMergeTree('/ch/tables/default/data', 'node')
ORDER BY key;
"""
)
node.query("system stop cleanup data")
node.query("INSERT INTO data values (1)")
node.query("INSERT INTO data values (2)")
node.query("INSERT INTO data values (3)")
node.query("INSERT INTO data values (4)")
node.query("ALTER TABLE data DROP PART 'all_1_1_0'")
node.query("ALTER TABLE data DROP PART 'all_2_2_0'")
node.query("OPTIMIZE TABLE data FINAL")
part_path = node.query(
"SELECT path FROM system.parts WHERE table = 'data' and name = 'all_0_3_1'"
).strip()
assert len(part_path) != 0
node.query("detach table data")
new_path = part_path[:-6] + "1_2_3"
node.exec_in_container(
[
"bash",
"-c",
"cp -r {p} {p1}".format(p=part_path, p1=new_path),
],
privileged=True,
)
# mock empty part
node.exec_in_container(
[
"bash",
"-c",
"echo -n 0 > {p1}/count.txt".format(p1=new_path),
],
privileged=True,
)
node.query("attach table data")
data_size = node.query("SELECT sum(key) FROM data").strip()
assert data_size == "5"

View File

@ -47,24 +47,25 @@ def test_big_family(client: KeeperClient):
assert response == TSV(
[
["/test_big_family/1", "5"],
["/test_big_family/2", "3"],
["/test_big_family/2/3", "0"],
["/test_big_family/2/2", "0"],
["/test_big_family/2/1", "0"],
["/test_big_family/1/5", "0"],
["/test_big_family/1/4", "0"],
["/test_big_family/1/3", "0"],
["/test_big_family/1/2", "0"],
["/test_big_family/1/1", "0"],
["/test_big_family", "11"],
["/test_big_family/1", "6"],
["/test_big_family/2", "4"],
["/test_big_family/2/3", "1"],
["/test_big_family/2/2", "1"],
["/test_big_family/2/1", "1"],
["/test_big_family/1/5", "1"],
["/test_big_family/1/4", "1"],
["/test_big_family/1/3", "1"],
["/test_big_family/1/2", "1"],
]
)
response = client.find_big_family("/test_big_family", 1)
response = client.find_big_family("/test_big_family", 2)
assert response == TSV(
[
["/test_big_family/1", "5"],
["/test_big_family", "11"],
["/test_big_family/1", "6"],
]
)
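# A minimal sketch (not part of the test) of the counting convention behind the
# updated expectations: each node now counts itself plus all of its descendants,
# so leaves report 1 and the root reports 11. The dict-based tree is illustrative.
demo_tree = {
    "/test_big_family": ["/test_big_family/1", "/test_big_family/2"],
    "/test_big_family/1": [f"/test_big_family/1/{i}" for i in range(1, 6)],
    "/test_big_family/2": [f"/test_big_family/2/{i}" for i in range(1, 4)],
}

def subtree_size(path):
    return 1 + sum(subtree_size(child) for child in demo_tree.get(path, []))

assert subtree_size("/test_big_family") == 11
assert subtree_size("/test_big_family/1") == 6
assert subtree_size("/test_big_family/2") == 4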

View File

@ -223,4 +223,4 @@ def test_corrupted_unexpected_part_ultimate():
== "1\n"
)
assert node.query("SELECT sum(key) FROM broken_table_3") == "190\n"
assert node.query("SELECT sum(key) FROM broken_table_3") == "145\n"

View File

@ -2,7 +2,7 @@ import random
import string
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseCluster, is_arm
cluster = ClickHouseCluster(__file__)
@ -255,6 +255,11 @@ def test_uncompressed_cache_plus_zstd_codec(start_cluster):
def test_preconfigured_deflateqpl_codec(start_cluster):
if is_arm():
pytest.skip(
"Skipping test because it's special test for Intel code (doesn't work on ARM)"
)
node6.query(
"""
CREATE TABLE compression_codec_multiple_with_key (

View File

@ -79,6 +79,29 @@ def started_cluster():
cluster.shutdown()
def test_flatten_nested(started_cluster):
main_node.query(
"CREATE DATABASE create_replicated_table ENGINE = Replicated('/test/create_replicated_table', 'shard1', 'replica' || '1');"
)
dummy_node.query(
"CREATE DATABASE create_replicated_table ENGINE = Replicated('/test/create_replicated_table', 'shard1', 'replica2');"
)
main_node.query(
"CREATE TABLE create_replicated_table.replicated_table (d Date, k UInt64, i32 Int32) ENGINE=ReplicatedMergeTree ORDER BY k PARTITION BY toYYYYMM(d);"
)
main_node.query(
"CREATE MATERIALIZED VIEW create_replicated_table.mv ENGINE=ReplicatedMergeTree ORDER BY tuple() AS select d, cast([(k, toString(i32))] as Nested(a UInt64, b String)) from create_replicated_table.replicated_table"
)
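# Both replicas must render the materialized view definition identically
# (SHOW CREATE), i.e. the Nested column handling is consistent across replicas.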
assert main_node.query(
"show create create_replicated_table.mv"
) == dummy_node.query("show create create_replicated_table.mv")
main_node.query("DROP DATABASE create_replicated_table SYNC")
dummy_node.query("DROP DATABASE create_replicated_table SYNC")
def test_create_replicated_table(started_cluster):
main_node.query(
"CREATE DATABASE create_replicated_table ENGINE = Replicated('/test/create_replicated_table', 'shard1', 'replica' || '1');"

View File

@ -4,6 +4,7 @@
<disk_s3_plain_rewritable>
<type>s3_plain_rewritable</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<endpoint_subpath from_env="ENDPOINT_SUBPATH"></endpoint_subpath>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</disk_s3_plain_rewritable>

View File

@ -1,24 +1,39 @@
import pytest
import random
import string
import threading
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
main_configs=["configs/storage_conf.xml"],
with_minio=True,
stay_alive=True,
)
insert_values = [
"(0,'data'),(1,'data')",
",".join(
NUM_WORKERS = 5
nodes = []
for i in range(NUM_WORKERS):
name = "node{}".format(i + 1)
node = cluster.add_instance(
name,
main_configs=["configs/storage_conf.xml"],
env_variables={"ENDPOINT_SUBPATH": name},
with_minio=True,
stay_alive=True,
)
nodes.append(node)
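# Each worker node gets its own ENDPOINT_SUBPATH, so all nodes share the MinIO
# bucket but write under distinct prefixes (see storage_conf.xml above).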
MAX_ROWS = 1000
def gen_insert_values(size):
return ",".join(
f"({i},'{''.join(random.choices(string.ascii_lowercase, k=5))}')"
for i in range(10)
),
]
for i in range(size)
)
insert_values = ",".join(
f"({i},'{''.join(random.choices(string.ascii_lowercase, k=5))}')" for i in range(10)
)
@pytest.fixture(scope="module", autouse=True)
@ -32,47 +47,71 @@ def start_cluster():
@pytest.mark.order(0)
def test_insert():
for index, value in enumerate(insert_values):
def create_insert(node, insert_values):
node.query(
"""
CREATE TABLE test_{} (
CREATE TABLE test (
id Int64,
data String
) ENGINE=MergeTree()
ORDER BY id
SETTINGS storage_policy='s3_plain_rewritable'
""".format(
index
)
"""
)
node.query("INSERT INTO test VALUES {}".format(insert_values))
node.query("INSERT INTO test_{} VALUES {}".format(index, value))
insert_values_arr = [
gen_insert_values(random.randint(1, MAX_ROWS)) for _ in range(0, NUM_WORKERS)
]
threads = []
for i in range(NUM_WORKERS):
t = threading.Thread(
target=create_insert, args=(nodes[i], insert_values_arr[i])
)
threads.append(t)
t.start()
for t in threads:
t.join()
for i in range(NUM_WORKERS):
assert (
node.query("SELECT * FROM test_{} ORDER BY id FORMAT Values".format(index))
== value
nodes[i].query("SELECT * FROM test ORDER BY id FORMAT Values")
== insert_values_arr[i]
)
@pytest.mark.order(1)
def test_restart():
for index, value in enumerate(insert_values):
assert (
node.query("SELECT * FROM test_{} ORDER BY id FORMAT Values".format(index))
== value
insert_values_arr = []
for i in range(NUM_WORKERS):
insert_values_arr.append(
nodes[i].query("SELECT * FROM test ORDER BY id FORMAT Values")
)
node.restart_clickhouse()
for index, value in enumerate(insert_values):
def restart(node):
node.restart_clickhouse()
threads = []
for i in range(NUM_WORKERS):
t = threading.Thread(target=restart, args=(nodes[i],))
threads.append(t)
t.start()
for t in threads:
t.join()
for i in range(NUM_WORKERS):
assert (
node.query("SELECT * FROM test_{} ORDER BY id FORMAT Values".format(index))
== value
nodes[i].query("SELECT * FROM test ORDER BY id FORMAT Values")
== insert_values_arr[i]
)
@pytest.mark.order(2)
def test_drop():
for index, value in enumerate(insert_values):
node.query("DROP TABLE IF EXISTS test_{} SYNC".format(index))
for i in range(NUM_WORKERS):
nodes[i].query("DROP TABLE IF EXISTS test SYNC")
it = cluster.minio_client.list_objects(
cluster.minio_bucket, "data/", recursive=True

View File

@ -35,6 +35,10 @@ def started_node():
pass
@pytest.mark.skipif(
helpers.cluster.is_arm(),
reason="Fails on ARM, issue https://github.com/ClickHouse/ClickHouse/issues/63855",
)
def test_send_segfault(started_node):
# NOTE: another option is to increase waiting time.
if (

View File

@ -194,7 +194,7 @@ select 'exceptions shorter than 30',
(uniqExact(message_format_string) as c) <= max_messages,
c <= max_messages ? [] : groupUniqArray(message_format_string)
from logs
where message ilike '%DB::Exception%' and if(length(extract(message, '(.*)\\([A-Z0-9_]+\\)')) as pref > 0, pref, length(message)) < 30 + 26 and message_format_string not in known_short_messages;
where message ilike '%DB::Exception%' and if(length(extract(toValidUTF8(message), '(.*)\\([A-Z0-9_]+\\)')) as pref > 0, pref, length(toValidUTF8(message))) < 30 + 26 and message_format_string not in known_short_messages;
-- Avoid too noisy messages: top 1 message frequency must be less than 30%. We should reduce the threshold
WITH 0.30 as threshold
@ -207,7 +207,7 @@ select
with 0.16 as threshold
select
'noisy Trace messages',
greatest(coalesce(((select message_format_string, count() from logs where level = 'Trace' and message_format_string not in ('Access granted: {}{}', '{} -> {}', 'Query {} to stage {}{}', 'Query {} from stage {} to stage {}{}')
greatest(coalesce(((select message_format_string, count() from logs where level = 'Trace' and message_format_string not in ('Access granted: {}{}', '{} -> {}', 'Query to stage {}{}', 'Query from stage {} to stage {}{}')
group by message_format_string order by count() desc limit 1) as top_message).2, 0) / (select count() from logs), threshold) as r,
r <= threshold ? '' : top_message.1;
@ -252,7 +252,7 @@ select 'number of noisy messages',
-- Each message matches its pattern (returns 0 rows)
-- Note: maybe we should make it stricter ('Code:%Exception: '||s||'%'), but it's not easy because of addMessage
select 'incorrect patterns', greatest(uniqExact(message_format_string), 15) from (
select message_format_string, any(message) as any_message from logs
select message_format_string, any(toValidUTF8(message)) as any_message from logs
where ((rand() % 8) = 0)
and message not like (replaceRegexpAll(message_format_string, '{[:.0-9dfx]*}', '%') as s)
and message not like (s || ' (skipped % similar messages)')

View File

@ -2,3 +2,6 @@
{'a':1,'b':2}
{'a':1,'b':2}
{'a':1,'b':2}
{}
{}
{'':''}

View File

@ -1,2 +1,8 @@
SELECT map('a', 1, 'b', 2) FROM remote('127.0.0.{1,2}', system, one);
SELECT map('a', 1, 'b', 2) FROM remote('127.0.0.{1,2}');
SELECT map() from remote('127.0.0.{1,2}', system,one);
drop table if exists bug_repro_local;
CREATE TABLE bug_repro_local (`attributes` Map(LowCardinality(String), String)) ENGINE = Log as select map('','');
SELECT if(1, attributes, map()) from remote('127.0.0.{1,2}', currentDatabase(), bug_repro_local) limit 1;

View File

@ -9,6 +9,7 @@ INSERT_BLOCK_SETTINGS="max_insert_block_size=1&min_insert_block_size_rows=0&min_
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS block_dedup_token SYNC"
$CLICKHOUSE_CLIENT --query="CREATE TABLE block_dedup_token (id Int32) ENGINE=MergeTree() ORDER BY id SETTINGS non_replicated_deduplication_window=0xFFFFFFFF;"
$CLICKHOUSE_CLIENT --query="SYSTEM STOP MERGES block_dedup_token;"
$CLICKHOUSE_CLIENT --query="SELECT 'insert 2 blocks with dedup token, 1 row per block'"
DEDUP_TOKEN='dedup1'

View File

@ -9,7 +9,7 @@ USER_FILES_PATH=$($CLICKHOUSE_CLIENT --query "select _path,_file from file('none
cp "$CUR_DIR"/data_csv/10m_rows.csv.xz $USER_FILES_PATH/
${CLICKHOUSE_CLIENT} --query="SELECT * FROM file('10m_rows.csv.xz' , 'CSVWithNames') order by identifier, number, name, surname, birthday LIMIT 1 settings max_threads=1, max_memory_usage=1000000000"
${CLICKHOUSE_CLIENT} --query="SELECT * FROM file('10m_rows.csv.xz' , 'CSVWithNames') order by identifier, number, name, surname, birthday LIMIT 1 settings max_threads=1, max_memory_usage=100000000"
${CLICKHOUSE_CLIENT} --query="SELECT * FROM file('10m_rows.csv.xz' , 'CSVWithNames') order by identifier, number, name, surname, birthday LIMIT 1 settings input_format_parallel_parsing=1, max_threads=1, max_parsing_threads=16, min_chunk_bytes_for_parallel_parsing=10485760, max_memory_usage=1000000000"
${CLICKHOUSE_CLIENT} --query="SELECT * FROM file('10m_rows.csv.xz' , 'CSVWithNames') order by identifier, number, name, surname, birthday LIMIT 1 settings input_format_parallel_parsing=1, max_threads=1, max_parsing_threads=16, min_chunk_bytes_for_parallel_parsing=10485760, max_memory_usage=100000000"
rm $USER_FILES_PATH/10m_rows.csv.xz

Some files were not shown because too many files have changed in this diff