#include #include #include #include #include #include #include #include #include #include #include #include namespace DB { NamesAndTypesList StorageSystemZooKeeper::getNamesAndTypes() { return { { "name", std::make_shared() }, { "value", std::make_shared() }, { "czxid", std::make_shared() }, { "mzxid", std::make_shared() }, { "ctime", std::make_shared() }, { "mtime", std::make_shared() }, { "version", std::make_shared() }, { "cversion", std::make_shared() }, { "aversion", std::make_shared() }, { "ephemeralOwner", std::make_shared() }, { "dataLength", std::make_shared() }, { "numChildren", std::make_shared() }, { "pzxid", std::make_shared() }, { "path", std::make_shared() }, }; } static bool extractPathImpl(const IAST & elem, String & res) { const ASTFunction * function = typeid_cast(&elem); if (!function) return false; if (function->name == "and") { for (size_t i = 0; i < function->arguments->children.size(); ++i) if (extractPathImpl(*function->arguments->children[i], res)) return true; return false; } if (function->name == "equals") { const ASTExpressionList & args = typeid_cast(*function->arguments); const IAST * value; if (args.children.size() != 2) return false; const ASTIdentifier * ident; if ((ident = typeid_cast(&*args.children.at(0)))) value = &*args.children.at(1); else if ((ident = typeid_cast(&*args.children.at(1)))) value = &*args.children.at(0); else return false; if (ident->name != "path") return false; const ASTLiteral * literal = typeid_cast(value); if (!literal) return false; if (literal->value.getType() != Field::Types::String) return false; res = literal->value.safeGet(); return true; } return false; } /** Retrieve from the query a condition of the form `path = 'path'`, from conjunctions in the WHERE clause. */ static String extractPath(const ASTPtr & query) { const ASTSelectQuery & select = typeid_cast(*query); if (!select.where_expression) return ""; String res; return extractPathImpl(*select.where_expression, res) ? res : ""; } void StorageSystemZooKeeper::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const { String path = extractPath(query_info.query); if (path.empty()) throw Exception("SELECT from system.zookeeper table must contain condition like path = 'path' in WHERE clause."); zkutil::ZooKeeperPtr zookeeper = context.getZooKeeper(); /// In all cases except the root, path must not end with a slash. String path_corrected = path; if (path_corrected != "/" && path_corrected.back() == '/') path_corrected.resize(path_corrected.size() - 1); zkutil::Strings nodes = zookeeper->getChildren(path_corrected); String path_part = path_corrected; if (path_part == "/") path_part.clear(); std::vector> futures; futures.reserve(nodes.size()); for (const String & node : nodes) futures.push_back(zookeeper->asyncTryGet(path_part + '/' + node)); for (size_t i = 0, size = nodes.size(); i < size; ++i) { auto res = futures[i].get(); if (res.error == Coordination::ZNONODE) continue; /// Node was deleted meanwhile. const Coordination::Stat & stat = res.stat; size_t col_num = 0; res_columns[col_num++]->insert(nodes[i]); res_columns[col_num++]->insert(res.data); res_columns[col_num++]->insert(Int64(stat.czxid)); res_columns[col_num++]->insert(Int64(stat.mzxid)); res_columns[col_num++]->insert(UInt64(stat.ctime / 1000)); res_columns[col_num++]->insert(UInt64(stat.mtime / 1000)); res_columns[col_num++]->insert(Int64(stat.version)); res_columns[col_num++]->insert(Int64(stat.cversion)); res_columns[col_num++]->insert(Int64(stat.aversion)); res_columns[col_num++]->insert(Int64(stat.ephemeralOwner)); res_columns[col_num++]->insert(Int64(stat.dataLength)); res_columns[col_num++]->insert(Int64(stat.numChildren)); res_columns[col_num++]->insert(Int64(stat.pzxid)); res_columns[col_num++]->insert(path); /// This is the original path. In order to process the request, condition in WHERE should be triggered. } } }