mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-20 14:42:02 +00:00
Merge branch 'master' into explain-query-tree-run
This commit is contained in:
commit
06ec01aa6e
@ -15,6 +15,7 @@
|
||||
#include <Common/formatReadable.h>
|
||||
#include <Common/filesystemHelpers.h>
|
||||
#include <Common/ErrorCodes.h>
|
||||
#include <Common/SensitiveDataMasker.h>
|
||||
#include <Common/LockMemoryExceptionInThread.h>
|
||||
#include <filesystem>
|
||||
|
||||
@ -63,11 +64,18 @@ void handle_error_code([[maybe_unused]] const std::string & msg, int code, bool
|
||||
ErrorCodes::increment(code, remote, msg, trace);
|
||||
}
|
||||
|
||||
Exception::Exception(const std::string & msg, int code, bool remote_)
|
||||
: Poco::Exception(msg, code)
|
||||
Exception::MessageMasked::MessageMasked(const std::string & msg_)
|
||||
: msg(msg_)
|
||||
{
|
||||
if (auto * masker = SensitiveDataMasker::getInstance())
|
||||
masker->wipeSensitiveData(msg);
|
||||
}
|
||||
|
||||
Exception::Exception(const MessageMasked & msg_masked, int code, bool remote_)
|
||||
: Poco::Exception(msg_masked.msg, code)
|
||||
, remote(remote_)
|
||||
{
|
||||
handle_error_code(msg, code, remote, getStackFramePointers());
|
||||
handle_error_code(msg_masked.msg, code, remote, getStackFramePointers());
|
||||
}
|
||||
|
||||
Exception::Exception(CreateFromPocoTag, const Poco::Exception & exc)
|
||||
|
@ -27,7 +27,19 @@ public:
|
||||
using FramePointers = std::vector<void *>;
|
||||
|
||||
Exception() = default;
|
||||
Exception(const std::string & msg, int code, bool remote_ = false);
|
||||
|
||||
// used to remove the sensitive information from exceptions if query_masking_rules is configured
|
||||
struct MessageMasked
|
||||
{
|
||||
std::string msg;
|
||||
MessageMasked(const std::string & msg_);
|
||||
};
|
||||
|
||||
Exception(const MessageMasked & msg_masked, int code, bool remote_);
|
||||
|
||||
// delegating constructor to mask sensitive information from the message
|
||||
Exception(const std::string & msg, int code, bool remote_ = false): Exception(MessageMasked(msg), code, remote_)
|
||||
{}
|
||||
|
||||
Exception(int code, const std::string & message)
|
||||
: Exception(message, code)
|
||||
@ -54,12 +66,17 @@ public:
|
||||
template <typename... Args>
|
||||
void addMessage(fmt::format_string<Args...> format, Args &&... args)
|
||||
{
|
||||
extendedMessage(fmt::format(format, std::forward<Args>(args)...));
|
||||
addMessage(fmt::format(format, std::forward<Args>(args)...));
|
||||
}
|
||||
|
||||
void addMessage(const std::string& message)
|
||||
{
|
||||
extendedMessage(message);
|
||||
addMessage(MessageMasked(message));
|
||||
}
|
||||
|
||||
void addMessage(const MessageMasked & msg_masked)
|
||||
{
|
||||
extendedMessage(msg_masked.msg);
|
||||
}
|
||||
|
||||
/// Used to distinguish local exceptions from the one that was received from remote node.
|
||||
|
@ -925,7 +925,7 @@ public:
|
||||
, ErrorCodes::SYNTAX_ERROR);
|
||||
}
|
||||
|
||||
if (allow_function_parameters && ParserToken(TokenType::OpeningRoundBracket).ignore(pos, expected))
|
||||
if (allow_function_parameters && !parameters && ParserToken(TokenType::OpeningRoundBracket).ignore(pos, expected))
|
||||
{
|
||||
parameters = std::make_shared<ASTExpressionList>();
|
||||
std::swap(parameters->children, elements);
|
||||
|
@ -22,7 +22,7 @@ bool ParserTableExpression::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
|
||||
auto res = std::make_shared<ASTTableExpression>();
|
||||
|
||||
if (!ParserWithOptionalAlias(std::make_unique<ParserSubquery>(), true).parse(pos, res->subquery, expected)
|
||||
&& !ParserWithOptionalAlias(std::make_unique<ParserFunction>(true, true), true).parse(pos, res->table_function, expected)
|
||||
&& !ParserWithOptionalAlias(std::make_unique<ParserFunction>(false, true), true).parse(pos, res->table_function, expected)
|
||||
&& !ParserWithOptionalAlias(std::make_unique<ParserCompoundIdentifier>(true, true), true)
|
||||
.parse(pos, res->database_and_table_name, expected))
|
||||
return false;
|
||||
|
@ -99,32 +99,24 @@ Pipe StorageHDFSCluster::read(
|
||||
addColumnsStructureToQueryWithClusterEngine(
|
||||
query_to_send, StorageDictionary::generateNamesAndTypesDescription(storage_snapshot->metadata->getColumns().getAll()), 3, getName());
|
||||
|
||||
for (const auto & replicas : cluster->getShardsAddresses())
|
||||
const auto & current_settings = context->getSettingsRef();
|
||||
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
|
||||
for (const auto & shard_info : cluster->getShardsInfo())
|
||||
{
|
||||
/// There will be only one replica, because we consider each replica as a shard
|
||||
for (const auto & node : replicas)
|
||||
auto try_results = shard_info.pool->getMany(timeouts, ¤t_settings, PoolMode::GET_MANY);
|
||||
for (auto & try_result : try_results)
|
||||
{
|
||||
auto connection = std::make_shared<Connection>(
|
||||
node.host_name, node.port, context->getGlobalContext()->getCurrentDatabase(),
|
||||
node.user, node.password, node.quota_key, node.cluster, node.cluster_secret,
|
||||
"HDFSClusterInititiator",
|
||||
node.compression,
|
||||
node.secure
|
||||
);
|
||||
|
||||
|
||||
/// For unknown reason global context is passed to IStorage::read() method
|
||||
/// So, task_identifier is passed as constructor argument. It is more obvious.
|
||||
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
|
||||
connection,
|
||||
queryToString(query_to_send),
|
||||
header,
|
||||
context,
|
||||
/*throttler=*/nullptr,
|
||||
scalars,
|
||||
Tables(),
|
||||
processed_stage,
|
||||
RemoteQueryExecutor::Extension{.task_iterator = callback});
|
||||
shard_info.pool,
|
||||
std::vector<IConnectionPool::Entry>{try_result},
|
||||
queryToString(query_to_send),
|
||||
header,
|
||||
context,
|
||||
/*throttler=*/nullptr,
|
||||
scalars,
|
||||
Tables(),
|
||||
processed_stage,
|
||||
RemoteQueryExecutor::Extension{.task_iterator = callback});
|
||||
|
||||
pipes.emplace_back(std::make_shared<RemoteSource>(remote_query_executor, add_agg_info, false));
|
||||
}
|
||||
|
@ -84,6 +84,8 @@ def test_restart_zookeeper(start_cluster):
|
||||
time.sleep(5)
|
||||
|
||||
for table_id in range(NUM_TABLES):
|
||||
node1.query(
|
||||
f"INSERT INTO test_table_{table_id} VALUES (6), (7), (8), (9), (10);"
|
||||
node1.query_with_retry(
|
||||
sql=f"INSERT INTO test_table_{table_id} VALUES (6), (7), (8), (9), (10);",
|
||||
retry_count=10,
|
||||
sleep_time=1,
|
||||
)
|
||||
|
18
tests/integration/test_storage_hdfs/configs/cluster.xml
Normal file
18
tests/integration/test_storage_hdfs/configs/cluster.xml
Normal file
@ -0,0 +1,18 @@
|
||||
<clickhouse>
|
||||
<remote_servers>
|
||||
<cluster_non_existent_port>
|
||||
<shard>
|
||||
<replica>
|
||||
<host>node1</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
<shard>
|
||||
<replica>
|
||||
<host>node1</host>
|
||||
<port>19000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
</cluster_non_existent_port>
|
||||
</remote_servers>
|
||||
</clickhouse>
|
@ -9,7 +9,11 @@ from pyhdfs import HdfsClient
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
node1 = cluster.add_instance(
|
||||
"node1",
|
||||
main_configs=["configs/macro.xml", "configs/schema_cache.xml"],
|
||||
main_configs=[
|
||||
"configs/macro.xml",
|
||||
"configs/schema_cache.xml",
|
||||
"configs/cluster.xml",
|
||||
],
|
||||
with_hdfs=True,
|
||||
)
|
||||
|
||||
@ -783,6 +787,32 @@ def test_schema_inference_cache(started_cluster):
|
||||
check_cache_misses(node1, files, 4)
|
||||
|
||||
|
||||
def test_hdfsCluster_skip_unavailable_shards(started_cluster):
|
||||
hdfs_api = started_cluster.hdfs_api
|
||||
node = started_cluster.instances["node1"]
|
||||
data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
|
||||
hdfs_api.write_data("/skip_unavailable_shards", data)
|
||||
|
||||
assert (
|
||||
node1.query(
|
||||
"select * from hdfsCluster('cluster_non_existent_port', 'hdfs://hdfs1:9000/skip_unavailable_shards', 'TSV', 'id UInt64, text String, number Float64') settings skip_unavailable_shards = 1"
|
||||
)
|
||||
== data
|
||||
)
|
||||
|
||||
|
||||
def test_hdfsCluster_unskip_unavailable_shards(started_cluster):
|
||||
hdfs_api = started_cluster.hdfs_api
|
||||
node = started_cluster.instances["node1"]
|
||||
data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
|
||||
hdfs_api.write_data("/unskip_unavailable_shards", data)
|
||||
error = node.query_and_get_error(
|
||||
"select * from hdfsCluster('cluster_non_existent_port', 'hdfs://hdfs1:9000/unskip_unavailable_shards', 'TSV', 'id UInt64, text String, number Float64')"
|
||||
)
|
||||
|
||||
assert "NETWORK_ERROR" in error
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cluster.start()
|
||||
input("Cluster created, press any key to destroy...")
|
||||
|
@ -1,11 +1,14 @@
|
||||
1
|
||||
2
|
||||
3
|
||||
3.1
|
||||
4
|
||||
5
|
||||
5.1
|
||||
6
|
||||
7
|
||||
7.1
|
||||
7.2
|
||||
8
|
||||
9
|
||||
text_log non empty
|
||||
|
@ -37,12 +37,20 @@ rm -f "$tmp_file" >/dev/null 2>&1
|
||||
echo 3
|
||||
# failure at before query start
|
||||
$CLICKHOUSE_CLIENT \
|
||||
--query="SELECT 'find_me_TOPSECRET=TOPSECRET' FROM non_existing_table FORMAT Null" \
|
||||
--query="SELECT 1 FROM system.numbers WHERE credit_card_number='find_me_TOPSECRET=TOPSECRET' FORMAT Null" \
|
||||
--log_queries=1 --ignore-error --multiquery |& grep -v '^(query: ' > "$tmp_file"
|
||||
|
||||
grep -F 'find_me_[hidden]' "$tmp_file" >/dev/null || echo 'fail 3a'
|
||||
grep -F 'TOPSECRET' "$tmp_file" && echo 'fail 3b'
|
||||
|
||||
echo '3.1'
|
||||
echo "SELECT 1 FROM system.numbers WHERE credit_card_number='find_me_TOPSECRET=TOPSECRET' FORMAT Null" | ${CLICKHOUSE_CURL} -sSg "${CLICKHOUSE_URL}" -d @- >"$tmp_file" 2>&1
|
||||
|
||||
grep -F 'find_me_[hidden]' "$tmp_file" >/dev/null || echo 'fail 3.1a'
|
||||
grep -F 'TOPSECRET' "$tmp_file" && echo 'fail 3.1b'
|
||||
|
||||
#echo "SELECT 1 FROM system.numbers WHERE credit_card_number='find_me_TOPSECRET=TOPSECRET' FORMAT Null" | curl -sSg http://172.17.0.3:8123/ -d @-
|
||||
|
||||
rm -f "$tmp_file" >/dev/null 2>&1
|
||||
echo 4
|
||||
# failure at the end of query
|
||||
@ -100,6 +108,21 @@ $CLICKHOUSE_CLIENT \
|
||||
--server_logs_file=/dev/null \
|
||||
--query="select * from system.query_log where current_database = currentDatabase() AND event_date >= yesterday() and query like '%TOPSECRET%';"
|
||||
|
||||
echo '7.1'
|
||||
# query_log exceptions
|
||||
$CLICKHOUSE_CLIENT \
|
||||
--server_logs_file=/dev/null \
|
||||
--query="select * from system.query_log where current_database = currentDatabase() AND event_date >= yesterday() and exception like '%TOPSECRET%'"
|
||||
|
||||
echo '7.2'
|
||||
|
||||
# not perfect: when run in parallel with other tests that check can give false-negative result
|
||||
# because other tests can overwrite the last_error_message, where we check the absence of sensitive data.
|
||||
# But it's still good enough for CI - in case of regressions it will start flapping (normally it shouldn't)
|
||||
$CLICKHOUSE_CLIENT \
|
||||
--server_logs_file=/dev/null \
|
||||
--query="select * from system.errors where last_error_message like '%TOPSECRET%';"
|
||||
|
||||
|
||||
rm -f "$tmp_file" >/dev/null 2>&1
|
||||
echo 8
|
||||
|
@ -0,0 +1,2 @@
|
||||
SELECT func(1)(2)(3); -- { clientError SYNTAX_ERROR }
|
||||
SELECT * FROM VALUES(1)(2); -- { clientError SYNTAX_ERROR }
|
Loading…
Reference in New Issue
Block a user