mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-29 02:52:13 +00:00
Merge pull request #60429 from Algunenano/fix_unlimited_keeper
Fix unrestricted reads from keeper
This commit is contained in:
commit
f8ff15a023
@ -168,13 +168,13 @@ public:
|
||||
/// Type of path to be fetched
|
||||
enum class ZkPathType
|
||||
{
|
||||
Exact, /// Fetch all nodes under this path
|
||||
Prefix, /// Fetch all nodes starting with this prefix, recursively (multiple paths may match prefix)
|
||||
Recurse, /// Fatch all nodes under this path, recursively
|
||||
Exact, /// Fetch all nodes under this path
|
||||
Prefix, /// Fetch all nodes starting with this prefix, recursively (multiple paths may match prefix)
|
||||
Recurse, /// Fetch all nodes under this path, recursively
|
||||
};
|
||||
|
||||
/// List of paths to be feched from zookeeper
|
||||
using Paths = std::deque<std::pair<String, ZkPathType>>;
|
||||
/// List of paths to be fetched from zookeeper
|
||||
using Paths = std::unordered_map<String, ZkPathType>;
|
||||
|
||||
class ReadFromSystemZooKeeper final : public SourceStepWithFilter
|
||||
{
|
||||
@ -226,6 +226,7 @@ private:
|
||||
ContextPtr context;
|
||||
ZooKeeperWithFaultInjection::Ptr zookeeper;
|
||||
bool started = false;
|
||||
std::unordered_set<String> visited;
|
||||
};
|
||||
|
||||
|
||||
@ -375,7 +376,8 @@ static void extractPathImpl(const ActionsDAG::Node & node, Paths & res, ContextP
|
||||
size_t size = values->size();
|
||||
|
||||
for (size_t row = 0; row < size; ++row)
|
||||
res.emplace_back(values->getDataAt(row).toString(), ZkPathType::Exact);
|
||||
/// Only inserted if the key doesn't exists already
|
||||
res.insert({values->getDataAt(row).toString(), ZkPathType::Exact});
|
||||
}
|
||||
else if (function_name == "equals")
|
||||
{
|
||||
@ -395,7 +397,8 @@ static void extractPathImpl(const ActionsDAG::Node & node, Paths & res, ContextP
|
||||
if (value->column->size() != 1)
|
||||
return;
|
||||
|
||||
res.emplace_back(value->column->getDataAt(0).toString(), ZkPathType::Exact);
|
||||
/// Only inserted if the key doesn't exists already
|
||||
res.insert({value->column->getDataAt(0).toString(), ZkPathType::Exact});
|
||||
}
|
||||
else if (allow_unrestricted && function_name == "like")
|
||||
{
|
||||
@ -414,7 +417,7 @@ static void extractPathImpl(const ActionsDAG::Node & node, Paths & res, ContextP
|
||||
|
||||
String pattern = value->column->getDataAt(0).toString();
|
||||
bool has_metasymbol = false;
|
||||
String prefix; // pattern prefix before the first metasymbol occurrence
|
||||
String prefix{}; // pattern prefix before the first metasymbol occurrence
|
||||
for (size_t i = 0; i < pattern.size(); i++)
|
||||
{
|
||||
char c = pattern[i];
|
||||
@ -440,7 +443,7 @@ static void extractPathImpl(const ActionsDAG::Node & node, Paths & res, ContextP
|
||||
prefix.append(1, c);
|
||||
}
|
||||
|
||||
res.emplace_back(prefix, has_metasymbol ? ZkPathType::Prefix : ZkPathType::Exact);
|
||||
res.insert_or_assign(prefix, has_metasymbol ? ZkPathType::Prefix : ZkPathType::Exact);
|
||||
}
|
||||
}
|
||||
|
||||
@ -453,8 +456,17 @@ static Paths extractPath(const ActionsDAG::NodeRawConstPtrs & filter_nodes, Cont
|
||||
for (const auto * node : filter_nodes)
|
||||
extractPathImpl(*node, res, context, allow_unrestricted);
|
||||
|
||||
auto node1 = res.find("/");
|
||||
auto node2 = res.find("");
|
||||
if ((node1 != res.end() && node1->second != ZkPathType::Exact) || (node2 != res.end() && node2->second != ZkPathType::Exact))
|
||||
{
|
||||
/// If we are already searching everything recursively, remove all other nodes
|
||||
res.clear();
|
||||
res.insert({"/", ZkPathType::Recurse});
|
||||
}
|
||||
|
||||
if (res.empty() && allow_unrestricted)
|
||||
res.emplace_back("/", ZkPathType::Recurse);
|
||||
res.insert({"/", ZkPathType::Recurse});
|
||||
|
||||
return res;
|
||||
}
|
||||
@ -521,7 +533,6 @@ Chunk SystemZooKeeperSource::generate()
|
||||
String path_part;
|
||||
};
|
||||
std::vector<ListTask> list_tasks;
|
||||
std::unordered_set<String> added;
|
||||
while (!paths.empty())
|
||||
{
|
||||
if (query_status)
|
||||
@ -541,8 +552,9 @@ Chunk SystemZooKeeperSource::generate()
|
||||
std::vector<String> paths_to_list;
|
||||
while (!paths.empty() && static_cast<Int64>(list_tasks.size()) < max_inflight_requests)
|
||||
{
|
||||
auto [path, path_type] = std::move(paths.front());
|
||||
paths.pop_front();
|
||||
auto node = paths.extract(paths.begin());
|
||||
auto & path = node.key();
|
||||
auto & path_type = node.mapped();
|
||||
|
||||
ListTask task;
|
||||
task.path = path;
|
||||
@ -623,7 +635,7 @@ Chunk SystemZooKeeperSource::generate()
|
||||
|
||||
// Deduplication
|
||||
String key = list_task.path_part + '/' + get_task.node;
|
||||
if (auto [it, inserted] = added.emplace(key); !inserted)
|
||||
if (auto [it, inserted] = visited.emplace(key); !inserted)
|
||||
continue;
|
||||
|
||||
const Coordination::Stat & stat = res.stat;
|
||||
@ -649,7 +661,7 @@ Chunk SystemZooKeeperSource::generate()
|
||||
|
||||
if (list_task.path_type != ZkPathType::Exact && res.stat.numChildren > 0)
|
||||
{
|
||||
paths.emplace_back(key, ZkPathType::Recurse);
|
||||
paths.insert_or_assign(key, ZkPathType::Recurse);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -13,7 +13,7 @@ ${CLICKHOUSE_CLIENT} -n --query="CREATE TABLE sample_table (
|
||||
)
|
||||
ENGINE ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/02221_system_zookeeper_unrestricted_like', '1')
|
||||
ORDER BY tuple();
|
||||
DROP TABLE IF EXISTS sample_table;"
|
||||
DROP TABLE IF EXISTS sample_table SYNC;"
|
||||
|
||||
|
||||
${CLICKHOUSE_CLIENT} -n --query "CREATE TABLE sample_table_2 (
|
||||
|
Loading…
Reference in New Issue
Block a user