Alexey Milovidov 2022-08-10 19:41:44 +02:00
commit 0b24dbaec8
53 changed files with 939 additions and 335 deletions

View File

@ -1218,13 +1218,25 @@ Result:
└────────────────────────────┴────────────────────────────────┘
```
## parseDateTime64BestEffortUS
Same as for [parseDateTime64BestEffort](#parsedatetime64besteffort), except that this function prefers US date format (`MM/DD/YYYY` etc.) in case of ambiguity.
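For example (illustrative query; the interpretation assumes the US-format preference described above):
```sql
SELECT parseDateTime64BestEffortUS('02/10/2021 21:12:57.123', 3) AS a;
-- '02/10/2021' is ambiguous: the US variant reads it as MM/DD/YYYY, i.e. 2021-02-10.
```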
## parseDateTime64BestEffortOrNull
Same as for [parseDateTime64BestEffort](#parsedatetime64besteffort) except that it returns `NULL` when it encounters a date format that cannot be processed.
## parseDateTime64BestEffortOrZero
Same as for [parseDateTime64BestEffort](#parsedatetimebesteffort) except that it returns zero date or zero date time when it encounters a date format that cannot be processed.
Same as for [parseDateTime64BestEffort](#parsedatetime64besteffort) except that it returns zero date or zero date time when it encounters a date format that cannot be processed.
## parseDateTime64BestEffortUSOrNull
Same as for [parseDateTime64BestEffort](#parsedatetime64besteffort), except that this function prefers US date format (`MM/DD/YYYY` etc.) in case of ambiguity and returns `NULL` when it encounters a date format that cannot be processed.
## parseDateTime64BestEffortUSOrZero
Same as for [parseDateTime64BestEffort](#parsedatetime64besteffort), except that this function prefers US date format (`MM/DD/YYYY` etc.) in case of ambiguity and returns zero date or zero date time when it encounters a date format that cannot be processed.
## toLowCardinality

View File

@ -1140,13 +1140,25 @@ FORMAT PrettyCompactMonoBlock;
└────────────────────────────┴────────────────────────────────┘
```
## parseDateTime64BestEffortOrNull {#parsedatetime32besteffortornull}
## parseDateTime64BestEffortUS {#parsedatetime64besteffortus}
Same as [parseDateTime64BestEffort](#parsedatetime64besteffort), except that this function prefers the US date format (`MM/DD/YYYY` etc.) in case of ambiguity.
## parseDateTime64BestEffortOrNull {#parsedatetime64besteffortornull}
Same as [parseDateTime64BestEffort](#parsedatetime64besteffort), except that it returns `NULL` when it encounters a date format that cannot be processed.
## parseDateTime64BestEffortOrZero {#parsedatetime64besteffortorzero}
Same as [parseDateTime64BestEffort](#parsedatetimebesteffort), except that it returns zero date or zero date time when it encounters a date format that cannot be processed.
Same as [parseDateTime64BestEffort](#parsedatetime64besteffort), except that it returns zero date or zero date time when it encounters a date format that cannot be processed.
## parseDateTime64BestEffortUSOrNull {#parsedatetime64besteffortusornull}
Same as [parseDateTime64BestEffort](#parsedatetime64besteffort), except that this function prefers the US date format (`MM/DD/YYYY` etc.) in case of ambiguity and returns `NULL` when it encounters a date format that cannot be processed.
## parseDateTime64BestEffortUSOrZero {#parsedatetime64besteffortusorzero}
Same as [parseDateTime64BestEffort](#parsedatetime64besteffort), except that this function prefers the US date format (`MM/DD/YYYY` etc.) in case of ambiguity and returns zero date or zero date time when it encounters a date format that cannot be processed.
## toLowCardinality {#tolowcardinality}

View File

@ -26,13 +26,13 @@ std::unique_ptr<HTTPRequestHandler> LibraryBridgeHandlerFactory::createRequestHa
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET)
{
if (uri.getPath() == "/extdict_ping")
return std::make_unique<LibraryBridgeExistsHandler>(keep_alive_timeout, getContext());
return std::make_unique<ExternalDictionaryLibraryBridgeExistsHandler>(keep_alive_timeout, getContext());
}
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST)
{
if (uri.getPath() == "/extdict_request")
return std::make_unique<LibraryBridgeRequestHandler>(keep_alive_timeout, getContext());
return std::make_unique<ExternalDictionaryLibraryBridgeRequestHandler>(keep_alive_timeout, getContext());
}
return nullptr;

View File

@ -5,6 +5,7 @@
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Common/BridgeProtocolVersion.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/HTMLForm.h>
@ -78,18 +79,39 @@ static void writeData(Block data, OutputFormatPtr format)
executor.execute();
}
LibraryBridgeRequestHandler::LibraryBridgeRequestHandler(size_t keep_alive_timeout_, ContextPtr context_)
ExternalDictionaryLibraryBridgeRequestHandler::ExternalDictionaryLibraryBridgeRequestHandler(size_t keep_alive_timeout_, ContextPtr context_)
: WithContext(context_)
, log(&Poco::Logger::get("LibraryBridgeRequestHandler"))
, log(&Poco::Logger::get("ExternalDictionaryLibraryBridgeRequestHandler"))
, keep_alive_timeout(keep_alive_timeout_)
{
}
void LibraryBridgeRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void ExternalDictionaryLibraryBridgeRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
{
LOG_TRACE(log, "Request URI: {}", request.getURI());
HTMLForm params(getContext()->getSettingsRef(), request);
size_t version;
if (!params.has("version"))
version = 0; /// assumed version for too old servers which do not send a version
else
{
String version_str = params.get("version");
if (!tryParse(version, version_str))
{
processError(response, "Unable to parse 'version' string in request URL: '" + version_str + "' Check if the server and library-bridge have the same version.");
return;
}
}
if (version != LIBRARY_BRIDGE_PROTOCOL_VERSION)
{
/// backwards compatibility is considered unnecessary for now, just let the user know that the server and the bridge must be upgraded together
processError(response, "Server and library-bridge have different versions: '" + std::to_string(version) + "' vs. '" + std::to_string(LIBRARY_BRIDGE_PROTOCOL_VERSION) + "'");
return;
}
if (!params.has("method"))
{
processError(response, "No 'method' in request URL");
@ -340,14 +362,14 @@ void LibraryBridgeRequestHandler::handleRequest(HTTPServerRequest & request, HTT
}
}
LibraryBridgeExistsHandler::LibraryBridgeExistsHandler(size_t keep_alive_timeout_, ContextPtr context_)
ExternalDictionaryLibraryBridgeExistsHandler::ExternalDictionaryLibraryBridgeExistsHandler(size_t keep_alive_timeout_, ContextPtr context_)
: WithContext(context_)
, keep_alive_timeout(keep_alive_timeout_)
, log(&Poco::Logger::get("LibraryBridgeExistsHandler"))
, log(&Poco::Logger::get("ExternalDictionaryLibraryBridgeExistsHandler"))
{
}
void LibraryBridgeExistsHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void ExternalDictionaryLibraryBridgeExistsHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
{
try
{
@ -361,6 +383,7 @@ void LibraryBridgeExistsHandler::handleRequest(HTTPServerRequest & request, HTTP
}
std::string dictionary_id = params.get("dictionary_id");
auto library_handler = ExternalDictionaryLibraryHandlerFactory::instance().get(dictionary_id);
String res = library_handler ? "1" : "0";

View File

@ -9,7 +9,6 @@
namespace DB
{
/// Handler for requests to Library Dictionary Source, returns response in RowBinary format.
/// When a library dictionary source is created, it sends 'extDict_libNew' request to library bridge (which is started on first
request to it, if it was not yet started). On this request a new SharedLibraryHandler is added to a
@ -17,10 +16,10 @@ namespace DB
/// names of dictionary attributes, sample block to parse block of null values, block of null values. Everything is
/// passed in binary format and is urlencoded. When dictionary is cloned, a new handler is created.
/// Each handler is unique to dictionary.
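/// An illustrative request (hypothetical values, assuming the default port) looks like:
///   POST http://<bridge_host>:9012/extdict_request?version=1&dictionary_id=<dictionary_id>&method=extDict_libNew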
class LibraryBridgeRequestHandler : public HTTPRequestHandler, WithContext
class ExternalDictionaryLibraryBridgeRequestHandler : public HTTPRequestHandler, WithContext
{
public:
LibraryBridgeRequestHandler(size_t keep_alive_timeout_, ContextPtr context_);
ExternalDictionaryLibraryBridgeRequestHandler(size_t keep_alive_timeout_, ContextPtr context_);
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
@ -32,10 +31,10 @@ private:
};
class LibraryBridgeExistsHandler : public HTTPRequestHandler, WithContext
class ExternalDictionaryLibraryBridgeExistsHandler : public HTTPRequestHandler, WithContext
{
public:
LibraryBridgeExistsHandler(size_t keep_alive_timeout_, ContextPtr context_);
ExternalDictionaryLibraryBridgeExistsHandler(size_t keep_alive_timeout_, ContextPtr context_);
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;

View File

@ -5,6 +5,7 @@
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeNullable.h>
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Parsers/ParserQueryWithOutput.h>
#include <Parsers/parseQuery.h>
@ -14,6 +15,7 @@
#include <Poco/NumberParser.h>
#include <Common/logger_useful.h>
#include <base/scope_guard.h>
#include <Common/BridgeProtocolVersion.h>
#include <Common/quoteString.h>
#include "getIdentifierQuote.h"
#include "validateODBCConnectionString.h"
@ -80,6 +82,27 @@ void ODBCColumnsInfoHandler::handleRequest(HTTPServerRequest & request, HTTPServ
LOG_WARNING(log, fmt::runtime(message));
};
size_t version;
if (!params.has("version"))
version = 0; /// assumed version for too old servers which do not send a version
else
{
String version_str = params.get("version");
if (!tryParse(version, version_str))
{
process_error("Unable to parse 'version' string in request URL: '" + version_str + "' Check if the server and library-bridge have the same version.");
return;
}
}
if (version != XDBC_BRIDGE_PROTOCOL_VERSION)
{
/// backwards compatibility is considered unnecessary for now, just let the user know that the server and the bridge must be upgraded together
process_error("Server and library-bridge have different versions: '" + std::to_string(version) + "' vs. '" + std::to_string(LIBRARY_BRIDGE_PROTOCOL_VERSION) + "'");
return;
}
if (!params.has("table"))
{
process_error("No 'table' param in request URL");

View File

@ -5,11 +5,13 @@
#include <DataTypes/DataTypeFactory.h>
#include <Server/HTTP/HTMLForm.h>
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Parsers/ParserQueryWithOutput.h>
#include <Parsers/parseQuery.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Common/BridgeProtocolVersion.h>
#include <Common/logger_useful.h>
#include <base/scope_guard.h>
#include "getIdentifierQuote.h"
@ -32,6 +34,27 @@ void IdentifierQuoteHandler::handleRequest(HTTPServerRequest & request, HTTPServ
LOG_WARNING(log, fmt::runtime(message));
};
size_t version;
if (!params.has("version"))
version = 0; /// assumed version for too old servers which do not send a version
else
{
String version_str = params.get("version");
if (!tryParse(version, version_str))
{
process_error("Unable to parse 'version' string in request URL: '" + version_str + "' Check if the server and library-bridge have the same version.");
return;
}
}
if (version != XDBC_BRIDGE_PROTOCOL_VERSION)
{
/// backwards compatibility is considered unnecessary for now, just let the user know that the server and the bridge must be upgraded together
process_error("Server and library-bridge have different versions: '" + std::to_string(version) + "' vs. '" + std::to_string(LIBRARY_BRIDGE_PROTOCOL_VERSION) + "'");
return;
}
if (!params.has("connection_string"))
{
process_error("No 'connection_string' in request URL");

View File

@ -17,6 +17,7 @@
#include <QueryPipeline/QueryPipeline.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Formats/IInputFormat.h>
#include <Common/BridgeProtocolVersion.h>
#include <Common/logger_useful.h>
#include <Server/HTTP/HTMLForm.h>
#include <Common/config.h>
@ -55,6 +56,28 @@ void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
HTMLForm params(getContext()->getSettingsRef(), request);
LOG_TRACE(log, "Request URI: {}", request.getURI());
size_t version;
if (!params.has("version"))
version = 0; /// assumed version for too old servers which do not send a version
else
{
String version_str = params.get("version");
if (!tryParse(version, version_str))
{
processError(response, "Unable to parse 'version' string in request URL: '" + version_str + "' Check if the server and library-bridge have the same version.");
return;
}
}
if (version != XDBC_BRIDGE_PROTOCOL_VERSION)
{
/// backwards compatibility is considered unnecessary for now, just let the user know that the server and the bridge must be upgraded together
processError(response, "Server and library-bridge have different versions: '" + std::to_string(version) + "' vs. '" + std::to_string(LIBRARY_BRIDGE_PROTOCOL_VERSION) + "'");
return;
}
if (mode == "read")
params.read(request.getStream());

View File

@ -4,9 +4,11 @@
#include <Server/HTTP/HTMLForm.h>
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Common/BridgeProtocolVersion.h>
#include <Common/logger_useful.h>
#include "validateODBCConnectionString.h"
#include "ODBCPooledConnectionFactory.h"
@ -40,6 +42,28 @@ void SchemaAllowedHandler::handleRequest(HTTPServerRequest & request, HTTPServer
LOG_WARNING(log, fmt::runtime(message));
};
size_t version;
if (!params.has("version"))
version = 0; /// assumed version for too old servers which do not send a version
else
{
String version_str = params.get("version");
if (!tryParse(version, version_str))
{
process_error("Unable to parse 'version' string in request URL: '" + version_str + "' Check if the server and library-bridge have the same version.");
return;
}
}
if (version != XDBC_BRIDGE_PROTOCOL_VERSION)
{
/// backwards compatibility is considered unnecessary for now, just let the user know that the server and the bridge must be upgraded together
process_error("Server and library-bridge have different versions: '" + std::to_string(version) + "' vs. '" + std::to_string(LIBRARY_BRIDGE_PROTOCOL_VERSION) + "'");
return;
}
if (!params.has("connection_string"))
{
process_error("No 'connection_string' in request URL");

View File

@ -78,6 +78,7 @@
/* For iPad */
margin: 0;
border-radius: 0;
tab-size: 4;
}
html, body
@ -609,11 +610,13 @@
}
}
let query_area = document.getElementById('query');
window.onpopstate = function(event) {
if (!event.state) {
return;
}
document.getElementById('query').value = event.state.query;
query_area.value = event.state.query;
if (!event.state.response) {
clear();
return;
@ -622,13 +625,13 @@
};
if (window.location.hash) {
document.getElementById('query').value = window.atob(window.location.hash.substr(1));
query_area.value = window.atob(window.location.hash.substr(1));
}
function post()
{
++request_num;
let query = document.getElementById('query').value;
let query = query_area.value;
postImpl(request_num, query);
}
@ -645,6 +648,32 @@
}
}
/// Pressing Tab in textarea will increase indentation.
/// But for accessibility reasons, we will fall back to tab navigation if the user already used Tab for that.
let user_prefers_tab_navigation = false;
[...document.querySelectorAll('input')].map(elem => {
elem.onkeydown = (e) => {
if (e.key == 'Tab') { user_prefers_tab_navigation = true; }
};
});
query_area.onkeydown = (e) => {
if (e.key == 'Tab' && !e.shiftKey && !user_prefers_tab_navigation) {
let elem = e.target;
let selection_start = elem.selectionStart;
let selection_end = elem.selectionEnd;
elem.value = elem.value.substring(0, elem.selectionStart) + '    ' + elem.value.substring(elem.selectionEnd);
elem.selectionStart = selection_start + 4;
elem.selectionEnd = selection_start + 4;
e.preventDefault();
return false;
}
};
function clearElement(id)
{
let elem = document.getElementById(id);
@ -701,7 +730,7 @@
stats.innerText = `Elapsed: ${seconds} sec, read ${formatted_rows} rows, ${formatted_bytes}.`;
/// We can also render graphs if user performed EXPLAIN PIPELINE graph=1 or EXPLAIN AST graph = 1
if (response.data.length > 3 && document.getElementById('query').value.match(/^\s*EXPLAIN/i) && typeof(response.data[0][0]) === "string" && response.data[0][0].startsWith("digraph")) {
if (response.data.length > 3 && query_area.value.match(/^\s*EXPLAIN/i) && typeof(response.data[0][0]) === "string" && response.data[0][0].startsWith("digraph")) {
renderGraph(response);
} else {
renderTable(response);

View File

@ -31,16 +31,10 @@ ExternalDictionaryLibraryBridgeHelper::ExternalDictionaryLibraryBridgeHelper(
const Block & sample_block_,
const Field & dictionary_id_,
const LibraryInitData & library_data_)
: IBridgeHelper(context_->getGlobalContext())
, log(&Poco::Logger::get("ExternalDictionaryLibraryBridgeHelper"))
: LibraryBridgeHelper(context_->getGlobalContext())
, sample_block(sample_block_)
, config(context_->getConfigRef())
, http_timeout(context_->getGlobalContext()->getSettingsRef().http_receive_timeout.value)
, library_data(library_data_)
, dictionary_id(dictionary_id_)
, bridge_host(config.getString("library_bridge.host", DEFAULT_HOST))
, bridge_port(config.getUInt("library_bridge.port", DEFAULT_PORT))
, http_timeouts(ConnectionTimeouts::getHTTPTimeouts(context_))
{
}
@ -65,28 +59,13 @@ Poco::URI ExternalDictionaryLibraryBridgeHelper::getMainURI() const
Poco::URI ExternalDictionaryLibraryBridgeHelper::createRequestURI(const String & method) const
{
auto uri = getMainURI();
uri.addQueryParameter("version", std::to_string(LIBRARY_BRIDGE_PROTOCOL_VERSION));
uri.addQueryParameter("dictionary_id", toString(dictionary_id));
uri.addQueryParameter("method", method);
return uri;
}
Poco::URI ExternalDictionaryLibraryBridgeHelper::createBaseURI() const
{
Poco::URI uri;
uri.setHost(bridge_host);
uri.setPort(bridge_port);
uri.setScheme("http");
return uri;
}
void ExternalDictionaryLibraryBridgeHelper::startBridge(std::unique_ptr<ShellCommand> cmd) const
{
getContext()->addBridgeCommand(std::move(cmd));
}
bool ExternalDictionaryLibraryBridgeHelper::bridgeHandShake()
{
String result;
@ -225,6 +204,14 @@ QueryPipeline ExternalDictionaryLibraryBridgeHelper::loadAll()
}
static String getDictIdsString(const std::vector<UInt64> & ids)
{
WriteBufferFromOwnString out;
writeVectorBinary(ids, out);
return out.str();
}
QueryPipeline ExternalDictionaryLibraryBridgeHelper::loadIds(const std::vector<uint64_t> & ids)
{
startBridgeSync();
@ -283,13 +270,4 @@ QueryPipeline ExternalDictionaryLibraryBridgeHelper::loadBase(const Poco::URI &
return QueryPipeline(std::move(source));
}
String ExternalDictionaryLibraryBridgeHelper::getDictIdsString(const std::vector<UInt64> & ids)
{
WriteBufferFromOwnString out;
writeVectorBinary(ids, out);
return out.str();
}
}

View File

@ -2,10 +2,9 @@
#include <Interpreters/Context.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <Poco/Logger.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/URI.h>
#include <BridgeHelper/IBridgeHelper.h>
#include <BridgeHelper/LibraryBridgeHelper.h>
#include <QueryPipeline/QueryPipeline.h>
@ -14,7 +13,8 @@ namespace DB
class Pipe;
class ExternalDictionaryLibraryBridgeHelper : public IBridgeHelper
// Class to access the external dictionary part of the clickhouse-library-bridge.
class ExternalDictionaryLibraryBridgeHelper : public LibraryBridgeHelper
{
public:
@ -25,7 +25,6 @@ public:
String dict_attributes;
};
static constexpr inline size_t DEFAULT_PORT = 9012;
static constexpr inline auto PING_HANDLER = "/extdict_ping";
static constexpr inline auto MAIN_HANDLER = "/extdict_request";
@ -56,26 +55,6 @@ protected:
bool bridgeHandShake() override;
void startBridge(std::unique_ptr<ShellCommand> cmd) const override;
String serviceAlias() const override { return "clickhouse-library-bridge"; }
String serviceFileName() const override { return serviceAlias(); }
size_t getDefaultPort() const override { return DEFAULT_PORT; }
bool startBridgeManually() const override { return false; }
String configPrefix() const override { return "library_bridge"; }
const Poco::Util::AbstractConfiguration & getConfig() const override { return config; }
Poco::Logger * getLog() const override { return log; }
Poco::Timespan getHTTPTimeout() const override { return http_timeout; }
Poco::URI createBaseURI() const override;
QueryPipeline loadBase(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = {});
bool executeRequest(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = {}) const;
@ -94,20 +73,10 @@ private:
Poco::URI createRequestURI(const String & method) const;
static String getDictIdsString(const std::vector<UInt64> & ids);
Poco::Logger * log;
const Block sample_block;
const Poco::Util::AbstractConfiguration & config;
const Poco::Timespan http_timeout;
LibraryInitData library_data;
Field dictionary_id;
std::string bridge_host;
size_t bridge_port;
bool library_initialized = false;
ConnectionTimeouts http_timeouts;
Poco::Net::HTTPBasicCredentials credentials{};
};
}

View File

@ -0,0 +1,34 @@
#include "LibraryBridgeHelper.h"
namespace DB
{
LibraryBridgeHelper::LibraryBridgeHelper(ContextPtr context_)
: IBridgeHelper(context_)
, config(context_->getConfigRef())
, log(&Poco::Logger::get("LibraryBridgeHelper"))
, http_timeout(context_->getGlobalContext()->getSettingsRef().http_receive_timeout.value)
, bridge_host(config.getString("library_bridge.host", DEFAULT_HOST))
, bridge_port(config.getUInt("library_bridge.port", DEFAULT_PORT))
, http_timeouts(ConnectionTimeouts::getHTTPTimeouts(context_))
{
}
void LibraryBridgeHelper::startBridge(std::unique_ptr<ShellCommand> cmd) const
{
getContext()->addBridgeCommand(std::move(cmd));
}
Poco::URI LibraryBridgeHelper::createBaseURI() const
{
Poco::URI uri;
uri.setHost(bridge_host);
uri.setPort(bridge_port);
uri.setScheme("http");
return uri;
}
}

View File

@ -0,0 +1,51 @@
#pragma once
#include <Interpreters/Context.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <Poco/Logger.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/URI.h>
#include <BridgeHelper/IBridgeHelper.h>
#include <Common/BridgeProtocolVersion.h>
namespace DB
{
// Common base class to access the clickhouse-library-bridge.
class LibraryBridgeHelper : public IBridgeHelper
{
protected:
explicit LibraryBridgeHelper(ContextPtr context_);
void startBridge(std::unique_ptr<ShellCommand> cmd) const override;
String serviceAlias() const override { return "clickhouse-library-bridge"; }
String serviceFileName() const override { return serviceAlias(); }
size_t getDefaultPort() const override { return DEFAULT_PORT; }
bool startBridgeManually() const override { return false; }
String configPrefix() const override { return "library_bridge"; }
const Poco::Util::AbstractConfiguration & getConfig() const override { return config; }
Poco::Logger * getLog() const override { return log; }
Poco::Timespan getHTTPTimeout() const override { return http_timeout; }
Poco::URI createBaseURI() const override;
static constexpr inline size_t DEFAULT_PORT = 9012;
const Poco::Util::AbstractConfiguration & config;
Poco::Logger * log;
const Poco::Timespan http_timeout;
std::string bridge_host;
size_t bridge_port;
ConnectionTimeouts http_timeouts;
Poco::Net::HTTPBasicCredentials credentials{};
};
}

View File

@ -9,6 +9,7 @@
#include <Poco/Net/HTTPRequest.h>
#include <Poco/URI.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/BridgeProtocolVersion.h>
#include <Common/ShellCommand.h>
#include <Common/logger_useful.h>
#include <IO/ConnectionTimeoutsContext.h>
@ -86,6 +87,7 @@ protected:
{
auto uri = createBaseURI();
uri.setPath(MAIN_HANDLER);
uri.addQueryParameter("version", std::to_string(XDBC_BRIDGE_PROTOCOL_VERSION));
return uri;
}
@ -163,6 +165,7 @@ protected:
{
auto uri = createBaseURI();
uri.setPath(COL_INFO_HANDLER);
uri.addQueryParameter("version", std::to_string(XDBC_BRIDGE_PROTOCOL_VERSION));
return uri;
}
@ -184,6 +187,7 @@ protected:
auto uri = createBaseURI();
uri.setPath(SCHEMA_ALLOWED_HANDLER);
uri.addQueryParameter("version", std::to_string(XDBC_BRIDGE_PROTOCOL_VERSION));
uri.addQueryParameter("connection_string", getConnectionString());
ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()), credentials);
@ -204,6 +208,7 @@ protected:
auto uri = createBaseURI();
uri.setPath(IDENTIFIER_QUOTE_HANDLER);
uri.addQueryParameter("version", std::to_string(XDBC_BRIDGE_PROTOCOL_VERSION));
uri.addQueryParameter("connection_string", getConnectionString());
ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()), credentials);

View File

@ -0,0 +1,14 @@
#pragma once
#include <cstddef>
namespace DB
{
// Version of protocol between clickhouse-server and clickhouse-library-bridge. Increment if you change it in a non-compatible way.
static constexpr size_t LIBRARY_BRIDGE_PROTOCOL_VERSION = 1;
// Version of protocol between clickhouse-server and clickhouse-xdbc-bridge. Increment if you change it in a non-compatible way.
static constexpr size_t XDBC_BRIDGE_PROTOCOL_VERSION = 1;
}

View File

@ -103,6 +103,9 @@ REGISTER_FUNCTION(Conversion)
factory.registerFunction<FunctionParseDateTime64BestEffort>();
factory.registerFunction<FunctionParseDateTime64BestEffortOrZero>();
factory.registerFunction<FunctionParseDateTime64BestEffortOrNull>();
factory.registerFunction<FunctionParseDateTime64BestEffortUS>();
factory.registerFunction<FunctionParseDateTime64BestEffortUSOrZero>();
factory.registerFunction<FunctionParseDateTime64BestEffortUSOrNull>();
factory.registerFunction<FunctionConvert<DataTypeInterval, NameToIntervalNanosecond, PositiveMonotonicity>>();
factory.registerFunction<FunctionConvert<DataTypeInterval, NameToIntervalMicrosecond, PositiveMonotonicity>>();

View File

@ -1336,9 +1336,18 @@ struct ConvertThroughParsing
}
else if constexpr (parsing_mode == ConvertFromStringParsingMode::BestEffortUS)
{
time_t res;
parsed = tryParseDateTimeBestEffortUS(res, read_buffer, *local_time_zone, *utc_time_zone);
convertFromTime<ToDataType>(vec_to[i],res);
if constexpr (to_datetime64)
{
DateTime64 res = 0;
parsed = tryParseDateTime64BestEffortUS(res, col_to->getScale(), read_buffer, *local_time_zone, *utc_time_zone);
vec_to[i] = res;
}
else
{
time_t res;
parsed = tryParseDateTimeBestEffortUS(res, read_buffer, *local_time_zone, *utc_time_zone);
convertFromTime<ToDataType>(vec_to[i],res);
}
}
else
{
@ -2525,6 +2534,9 @@ struct NameParseDateTime32BestEffortOrNull { static constexpr auto name = "parse
struct NameParseDateTime64BestEffort { static constexpr auto name = "parseDateTime64BestEffort"; };
struct NameParseDateTime64BestEffortOrZero { static constexpr auto name = "parseDateTime64BestEffortOrZero"; };
struct NameParseDateTime64BestEffortOrNull { static constexpr auto name = "parseDateTime64BestEffortOrNull"; };
struct NameParseDateTime64BestEffortUS { static constexpr auto name = "parseDateTime64BestEffortUS"; };
struct NameParseDateTime64BestEffortUSOrZero { static constexpr auto name = "parseDateTime64BestEffortUSOrZero"; };
struct NameParseDateTime64BestEffortUSOrNull { static constexpr auto name = "parseDateTime64BestEffortUSOrNull"; };
using FunctionParseDateTimeBestEffort = FunctionConvertFromString<
@ -2555,6 +2567,14 @@ using FunctionParseDateTime64BestEffortOrZero = FunctionConvertFromString<
using FunctionParseDateTime64BestEffortOrNull = FunctionConvertFromString<
DataTypeDateTime64, NameParseDateTime64BestEffortOrNull, ConvertFromStringExceptionMode::Null, ConvertFromStringParsingMode::BestEffort>;
using FunctionParseDateTime64BestEffortUS = FunctionConvertFromString<
DataTypeDateTime64, NameParseDateTime64BestEffortUS, ConvertFromStringExceptionMode::Throw, ConvertFromStringParsingMode::BestEffortUS>;
using FunctionParseDateTime64BestEffortUSOrZero = FunctionConvertFromString<
DataTypeDateTime64, NameParseDateTime64BestEffortUSOrZero, ConvertFromStringExceptionMode::Zero, ConvertFromStringParsingMode::BestEffortUS>;
using FunctionParseDateTime64BestEffortUSOrNull = FunctionConvertFromString<
DataTypeDateTime64, NameParseDateTime64BestEffortUSOrNull, ConvertFromStringExceptionMode::Null, ConvertFromStringParsingMode::BestEffortUS>;
class ExecutableFunctionCast : public IExecutableFunction
{
public:

View File

@ -697,4 +697,9 @@ bool tryParseDateTime64BestEffort(DateTime64 & res, UInt32 scale, ReadBuffer & i
return parseDateTime64BestEffortImpl<bool, false>(res, scale, in, local_time_zone, utc_time_zone);
}
bool tryParseDateTime64BestEffortUS(DateTime64 & res, UInt32 scale, ReadBuffer & in, const DateLUTImpl & local_time_zone, const DateLUTImpl & utc_time_zone)
{
return parseDateTime64BestEffortImpl<bool, true>(res, scale, in, local_time_zone, utc_time_zone);
}
}

View File

@ -27,7 +27,6 @@ class ReadBuffer;
*
* DD/MM/YY
* DD/MM/YYYY - when '/' separator is used, these are the only possible forms
* Note that American style is not supported.
*
* hh:mm:ss - when ':' separator is used, it is always time
* hh:mm - it can be specified without seconds
@ -61,7 +60,7 @@ bool tryParseDateTimeBestEffort(time_t & res, ReadBuffer & in, const DateLUTImpl
void parseDateTimeBestEffortUS(time_t & res, ReadBuffer & in, const DateLUTImpl & local_time_zone, const DateLUTImpl & utc_time_zone);
bool tryParseDateTimeBestEffortUS(time_t & res, ReadBuffer & in, const DateLUTImpl & local_time_zone, const DateLUTImpl & utc_time_zone);
void parseDateTime64BestEffort(DateTime64 & res, UInt32 scale, ReadBuffer & in, const DateLUTImpl & local_time_zone, const DateLUTImpl & utc_time_zone);
void parseDateTime64BestEffortUS(DateTime64 & res, UInt32 scale, ReadBuffer & in, const DateLUTImpl & local_time_zone, const DateLUTImpl & utc_time_zone);
bool tryParseDateTime64BestEffort(DateTime64 & res, UInt32 scale, ReadBuffer & in, const DateLUTImpl & local_time_zone, const DateLUTImpl & utc_time_zone);
void parseDateTime64BestEffortUS(DateTime64 & res, UInt32 scale, ReadBuffer & in, const DateLUTImpl & local_time_zone, const DateLUTImpl & utc_time_zone);
bool tryParseDateTime64BestEffortUS(DateTime64 & res, UInt32 scale, ReadBuffer & in, const DateLUTImpl & local_time_zone, const DateLUTImpl & utc_time_zone);
}

View File

@ -57,7 +57,7 @@ void ActionsDAG::Node::toTree(JSONBuilder::JSONMap & map) const
ActionsDAG::ActionsDAG(const NamesAndTypesList & inputs_)
{
for (const auto & input : inputs_)
index.push_back(&addInput(input.name, input.type));
outputs.push_back(&addInput(input.name, input.type));
}
ActionsDAG::ActionsDAG(const ColumnsWithTypeAndName & inputs_)
@ -74,10 +74,10 @@ ActionsDAG::ActionsDAG(const ColumnsWithTypeAndName & inputs_)
/// without any respect to header structure. So, it is a way to drop materialized column and use
/// constant value from header.
/// We cannot remove such input right now cause inputs positions are important in some cases.
index.push_back(&addColumn(input));
outputs.push_back(&addColumn(input));
}
else
index.push_back(&addInput(input.name, input.type));
outputs.push_back(&addInput(input.name, input.type));
}
}
@ -237,42 +237,42 @@ const ActionsDAG::Node & ActionsDAG::addFunction(
return addNode(std::move(node));
}
const ActionsDAG::Node & ActionsDAG::findInIndex(const std::string & name) const
const ActionsDAG::Node & ActionsDAG::findInOutputs(const std::string & name) const
{
if (const auto * node = tryFindInIndex(name))
if (const auto * node = tryFindInOutputs(name))
return *node;
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER, "Unknown identifier: '{}'", name);
}
const ActionsDAG::Node * ActionsDAG::tryFindInIndex(const std::string & name) const
const ActionsDAG::Node * ActionsDAG::tryFindInOutputs(const std::string & name) const
{
for (const auto & node : index)
for (const auto & node : outputs)
if (node->result_name == name)
return node;
return nullptr;
}
void ActionsDAG::addOrReplaceInIndex(const Node & node)
void ActionsDAG::addOrReplaceInOutputs(const Node & node)
{
for (auto & index_node : index)
for (auto & output_node : outputs)
{
if (index_node->result_name == node.result_name)
if (output_node->result_name == node.result_name)
{
index_node = &node;
output_node = &node;
return;
}
}
index.push_back(&node);
outputs.push_back(&node);
}
NamesAndTypesList ActionsDAG::getRequiredColumns() const
{
NamesAndTypesList result;
for (const auto & input : inputs)
result.emplace_back(input->result_name, input->result_type);
for (const auto & input_node : inputs)
result.emplace_back(input_node->result_name, input_node->result_type);
return result;
}
@ -282,8 +282,8 @@ Names ActionsDAG::getRequiredColumnsNames() const
Names result;
result.reserve(inputs.size());
for (const auto & input : inputs)
result.emplace_back(input->result_name);
for (const auto & input_node : inputs)
result.emplace_back(input_node->result_name);
return result;
}
@ -291,8 +291,9 @@ Names ActionsDAG::getRequiredColumnsNames() const
ColumnsWithTypeAndName ActionsDAG::getResultColumns() const
{
ColumnsWithTypeAndName result;
result.reserve(index.size());
for (const auto & node : index)
result.reserve(outputs.size());
for (const auto & node : outputs)
result.emplace_back(node->column, node->result_type, node->result_name);
return result;
@ -301,7 +302,7 @@ ColumnsWithTypeAndName ActionsDAG::getResultColumns() const
NamesAndTypesList ActionsDAG::getNamesAndTypesList() const
{
NamesAndTypesList result;
for (const auto & node : index)
for (const auto & node : outputs)
result.emplace_back(node->result_name, node->result_type);
return result;
@ -310,8 +311,9 @@ NamesAndTypesList ActionsDAG::getNamesAndTypesList() const
Names ActionsDAG::getNames() const
{
Names names;
names.reserve(index.size());
for (const auto & node : index)
names.reserve(outputs.size());
for (const auto & node : outputs)
names.emplace_back(node->result_name);
return names;
@ -335,7 +337,7 @@ void ActionsDAG::removeUnusedActions(const NameSet & required_names, bool allow_
required_nodes.reserve(required_names.size());
NameSet added;
for (const auto & node : index)
for (const auto & node : outputs)
{
if (required_names.contains(node->result_name) && !added.contains(node->result_name))
{
@ -352,7 +354,7 @@ void ActionsDAG::removeUnusedActions(const NameSet & required_names, bool allow_
"Unknown column: {}, there are only columns {}", name, dumpNames());
}
index.swap(required_nodes);
outputs.swap(required_nodes);
removeUnusedActions(allow_remove_inputs, allow_constant_folding);
}
@ -362,7 +364,7 @@ void ActionsDAG::removeUnusedActions(const Names & required_names, bool allow_re
required_nodes.reserve(required_names.size());
std::unordered_map<std::string_view, const Node *> names_map;
for (const auto * node : index)
for (const auto * node : outputs)
names_map[node->result_name] = node;
for (const auto & name : required_names)
@ -375,7 +377,7 @@ void ActionsDAG::removeUnusedActions(const Names & required_names, bool allow_re
required_nodes.push_back(it->second);
}
index.swap(required_nodes);
outputs.swap(required_nodes);
removeUnusedActions(allow_remove_inputs, allow_constant_folding);
}
@ -384,7 +386,7 @@ void ActionsDAG::removeUnusedActions(bool allow_remove_inputs, bool allow_consta
std::unordered_set<const Node *> visited_nodes;
std::stack<Node *> stack;
for (const auto * node : index)
for (const auto * node : outputs)
{
visited_nodes.insert(node);
stack.push(const_cast<Node *>(node));
@ -516,7 +518,7 @@ Block ActionsDAG::updateHeader(Block header) const
}
ColumnsWithTypeAndName result_columns;
result_columns.reserve(index.size());
result_columns.reserve(outputs.size());
struct Frame
{
@ -525,12 +527,12 @@ Block ActionsDAG::updateHeader(Block header) const
};
{
for (const auto * output : index)
for (const auto * output_node : outputs)
{
if (!node_to_column.contains(output))
if (!node_to_column.contains(output_node))
{
std::stack<Frame> stack;
stack.push({.node = output});
stack.push({.node = output_node});
while (!stack.empty())
{
@ -567,8 +569,8 @@ Block ActionsDAG::updateHeader(Block header) const
}
}
if (node_to_column[output].column)
result_columns.push_back(node_to_column[output]);
if (node_to_column[output_node].column)
result_columns.push_back(node_to_column[output_node]);
}
}
@ -592,35 +594,35 @@ NameSet ActionsDAG::foldActionsByProjection(
const NameSet & required_columns, const Block & projection_block_for_keys, const String & predicate_column_name, bool add_missing_keys)
{
std::unordered_set<const Node *> visited_nodes;
std::unordered_set<std::string_view> visited_index_names;
std::unordered_set<std::string_view> visited_output_nodes_names;
std::stack<Node *> stack;
/// Record all needed index nodes to start folding.
for (const auto & node : index)
/// Record all needed output nodes to start folding.
for (const auto & output_node : outputs)
{
if (required_columns.find(node->result_name) != required_columns.end() || node->result_name == predicate_column_name)
if (required_columns.find(output_node->result_name) != required_columns.end() || output_node->result_name == predicate_column_name)
{
visited_nodes.insert(node);
visited_index_names.insert(node->result_name);
stack.push(const_cast<Node *>(node));
visited_nodes.insert(output_node);
visited_output_nodes_names.insert(output_node->result_name);
stack.push(const_cast<Node *>(output_node));
}
}
/// If some required columns are not in any index node, try searching from all projection key
/// If some required columns are not in any output node, try searching from all projection key
/// columns. If still missing, return empty set which means current projection fails to match
/// (missing columns).
if (add_missing_keys)
{
for (const auto & column : required_columns)
{
if (visited_index_names.find(column) == visited_index_names.end())
if (visited_output_nodes_names.find(column) == visited_output_nodes_names.end())
{
if (const ColumnWithTypeAndName * column_with_type_name = projection_block_for_keys.findByName(column))
{
const auto * node = &addInput(*column_with_type_name);
visited_nodes.insert(node);
index.push_back(node);
visited_index_names.insert(column);
outputs.push_back(node);
visited_output_nodes_names.insert(column);
}
else
{
@ -662,20 +664,20 @@ NameSet ActionsDAG::foldActionsByProjection(
/// Clean up unused nodes after folding.
std::erase_if(inputs, [&](const Node * node) { return !visited_nodes.contains(node); });
std::erase_if(index, [&](const Node * node) { return !visited_index_names.contains(node->result_name); });
std::erase_if(outputs, [&](const Node * node) { return !visited_output_nodes_names.contains(node->result_name); });
nodes.remove_if([&](const Node & node) { return !visited_nodes.contains(&node); });
/// Calculate the required columns after folding.
NameSet next_required_columns;
for (const auto & input : inputs)
next_required_columns.insert(input->result_name);
for (const auto & input_node : inputs)
next_required_columns.insert(input_node->result_name);
return next_required_columns;
}
void ActionsDAG::reorderAggregationKeysForProjection(const std::unordered_map<std::string_view, size_t> & key_names_pos_map)
{
::sort(index.begin(), index.end(), [&key_names_pos_map](const Node * lhs, const Node * rhs)
::sort(outputs.begin(), outputs.end(), [&key_names_pos_map](const Node * lhs, const Node * rhs)
{
return key_names_pos_map.find(lhs->result_name)->second < key_names_pos_map.find(rhs->result_name)->second;
});
@ -684,17 +686,21 @@ void ActionsDAG::reorderAggregationKeysForProjection(const std::unordered_map<st
void ActionsDAG::addAggregatesViaProjection(const Block & aggregates)
{
for (const auto & aggregate : aggregates)
index.push_back(&addInput(aggregate));
outputs.push_back(&addInput(aggregate));
}
void ActionsDAG::addAliases(const NamesWithAliases & aliases)
{
std::unordered_map<std::string_view, size_t> names_map;
for (size_t i = 0; i < index.size(); ++i)
names_map[index[i]->result_name] = i;
size_t output_nodes_size = outputs.size();
for (size_t i = 0; i < output_nodes_size; ++i)
names_map[outputs[i]->result_name] = i;
size_t aliases_size = aliases.size();
NodeRawConstPtrs required_nodes;
required_nodes.reserve(aliases.size());
required_nodes.reserve(aliases_size);
for (const auto & item : aliases)
{
@ -703,10 +709,10 @@ void ActionsDAG::addAliases(const NamesWithAliases & aliases)
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER,
"Unknown column: {}, there are only columns {}", item.first, dumpNames());
required_nodes.push_back(index[it->second]);
required_nodes.push_back(outputs[it->second]);
}
for (size_t i = 0; i < aliases.size(); ++i)
for (size_t i = 0; i < aliases_size; ++i)
{
const auto & item = aliases[i];
const auto * child = required_nodes[i];
@ -726,22 +732,24 @@ void ActionsDAG::addAliases(const NamesWithAliases & aliases)
auto it = names_map.find(child->result_name);
if (it == names_map.end())
{
names_map[child->result_name] = index.size();
index.push_back(child);
names_map[child->result_name] = outputs.size();
outputs.push_back(child);
}
else
index[it->second] = child;
outputs[it->second] = child;
}
}
void ActionsDAG::project(const NamesWithAliases & projection)
{
std::unordered_map<std::string_view, const Node *> names_map;
for (const auto * node : index)
names_map.emplace(node->result_name, node);
for (const auto * output_node : outputs)
names_map.emplace(output_node->result_name, output_node);
index.clear();
index.reserve(projection.size());
outputs.clear();
size_t projection_size = projection.size();
outputs.reserve(projection_size);
for (const auto & item : projection)
{
@ -750,13 +758,13 @@ void ActionsDAG::project(const NamesWithAliases & projection)
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER,
"Unknown column: {}, there are only columns {}", item.first, dumpNames());
index.push_back(it->second);
outputs.push_back(it->second);
}
for (size_t i = 0; i < projection.size(); ++i)
for (size_t i = 0; i < projection_size; ++i)
{
const auto & item = projection[i];
auto & child = index[i];
auto & child = outputs[i];
if (!item.second.empty() && item.first != item.second)
{
@ -778,8 +786,8 @@ void ActionsDAG::project(const NamesWithAliases & projection)
bool ActionsDAG::tryRestoreColumn(const std::string & column_name)
{
for (const auto * node : index)
if (node->result_name == column_name)
for (const auto * output_node : outputs)
if (output_node->result_name == column_name)
return true;
for (auto it = nodes.rbegin(); it != nodes.rend(); ++it)
@ -787,7 +795,7 @@ bool ActionsDAG::tryRestoreColumn(const std::string & column_name)
auto & node = *it;
if (node.result_name == column_name)
{
index.push_back(&node);
outputs.push_back(&node);
return true;
}
}
@ -797,19 +805,19 @@ bool ActionsDAG::tryRestoreColumn(const std::string & column_name)
bool ActionsDAG::removeUnusedResult(const std::string & column_name)
{
/// Find column in index and remove.
/// Find column in output nodes and remove.
const Node * col;
{
auto it = index.begin();
for (; it != index.end(); ++it)
auto it = outputs.begin();
for (; it != outputs.end(); ++it)
if ((*it)->result_name == column_name)
break;
if (it == index.end())
if (it == outputs.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Not found result {} in ActionsDAG\n{}", column_name, dumpDAG());
col = *it;
index.erase(it);
outputs.erase(it);
}
/// Check if column is in input.
@ -827,9 +835,9 @@ bool ActionsDAG::removeUnusedResult(const std::string & column_name)
if (col == child)
return false;
/// Do not remove input if it was mentioned in index several times.
for (const auto * node : index)
if (col == node)
/// Do not remove input if it was mentioned in output nodes several times.
for (const auto * output_node : outputs)
if (col == output_node)
return false;
/// Remove from nodes and inputs.
@ -864,11 +872,11 @@ ActionsDAGPtr ActionsDAG::clone() const
for (auto & child : node.children)
child = copy_map[child];
for (const auto & node : index)
actions->index.push_back(copy_map[node]);
for (const auto & output_node : outputs)
actions->outputs.push_back(copy_map[output_node]);
for (const auto & node : inputs)
actions->inputs.push_back(copy_map[node]);
for (const auto & input_node : inputs)
actions->inputs.push_back(copy_map[input_node]);
return actions;
}
@ -939,8 +947,8 @@ std::string ActionsDAG::dumpDAG() const
out << "\n";
}
out << "Index:";
for (const auto * node : index)
out << "Output nodes:";
for (const auto * node : outputs)
out << ' ' << map[node];
out << '\n';
@ -984,8 +992,8 @@ void ActionsDAG::assertDeterministic() const
void ActionsDAG::addMaterializingOutputActions()
{
for (auto & node : index)
node = &materializeNode(*node);
for (auto & output_node : outputs)
output_node = &materializeNode(*output_node);
}
const ActionsDAG::Node & ActionsDAG::materializeNode(const Node & node)
@ -1023,7 +1031,8 @@ ActionsDAGPtr ActionsDAG::makeConvertingActions(
std::map<std::string_view, std::list<size_t>> inputs;
if (mode == MatchColumnsMode::Name)
{
for (size_t pos = 0; pos < actions_dag->inputs.size(); ++pos)
size_t input_nodes_size = actions_dag->inputs.size();
for (size_t pos = 0; pos < input_nodes_size; ++pos)
inputs[actions_dag->inputs[pos]->result_name].push_back(pos);
}
@ -1134,7 +1143,7 @@ ActionsDAGPtr ActionsDAG::makeConvertingActions(
}
}
actions_dag->index.swap(projection);
actions_dag->outputs.swap(projection);
actions_dag->removeUnusedActions();
actions_dag->projectInput();
@ -1153,7 +1162,7 @@ ActionsDAGPtr ActionsDAG::makeAddingColumnActions(ColumnWithTypeAndName column)
const auto & function_node = adding_column_action->addFunction(func_builder_materialize, std::move(inputs), {});
const auto & alias_node = adding_column_action->addAlias(function_node, std::move(column_name));
adding_column_action->index.push_back(&alias_node);
adding_column_action->outputs.push_back(&alias_node);
return adding_column_action;
}
@ -1165,7 +1174,7 @@ ActionsDAGPtr ActionsDAG::merge(ActionsDAG && first, ActionsDAG && second)
/// Will store merged result in `first`.
/// This map contains nodes which should be removed from `first` index, cause they are used as inputs for `second`.
/// This map contains nodes which should be removed from `first` outputs, cause they are used as inputs for `second`.
/// The second element is the number of removes (cause one node may be repeated several times in result).
std::unordered_map<const Node *, size_t> removed_first_result;
/// Map inputs of `second` to nodes of `first`.
@ -1173,25 +1182,25 @@ ActionsDAGPtr ActionsDAG::merge(ActionsDAG && first, ActionsDAG && second)
/// Update inputs list.
{
/// Index may have multiple columns with same name. They also may be used by `second`. Order is important.
/// Outputs may have multiple columns with same name. They also may be used by `second`. Order is important.
std::unordered_map<std::string_view, std::list<const Node *>> first_result;
for (const auto & node : first.index)
first_result[node->result_name].push_back(node);
for (const auto & output_node : first.outputs)
first_result[output_node->result_name].push_back(output_node);
for (const auto & node : second.inputs)
for (const auto & input_node : second.inputs)
{
auto it = first_result.find(node->result_name);
auto it = first_result.find(input_node->result_name);
if (it == first_result.end() || it->second.empty())
{
if (first.project_input)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot find column {} in ActionsDAG result", node->result_name);
"Cannot find column {} in ActionsDAG result", input_node->result_name);
first.inputs.push_back(node);
first.inputs.push_back(input_node);
}
else
{
inputs_map[node] = it->second.front();
inputs_map[input_node] = it->second.front();
removed_first_result[it->second.front()] += 1;
it->second.pop_front();
}
@ -1212,35 +1221,35 @@ ActionsDAGPtr ActionsDAG::merge(ActionsDAG && first, ActionsDAG && second)
}
}
for (auto & node : second.index)
for (auto & output_node : second.outputs)
{
if (node->type == ActionType::INPUT)
if (output_node->type == ActionType::INPUT)
{
auto it = inputs_map.find(node);
auto it = inputs_map.find(output_node);
if (it != inputs_map.end())
node = it->second;
output_node = it->second;
}
}
/// Update index.
/// Update output nodes.
if (second.project_input)
{
first.index.swap(second.index);
first.outputs.swap(second.outputs);
first.project_input = true;
}
else
{
/// Add not removed result from first actions.
for (const auto * node : first.index)
for (const auto * output_node : first.outputs)
{
auto it = removed_first_result.find(node);
auto it = removed_first_result.find(output_node);
if (it != removed_first_result.end() && it->second > 0)
--it->second;
else
second.index.push_back(node);
second.outputs.push_back(output_node);
}
first.index.swap(second.index);
first.outputs.swap(second.outputs);
}
first.nodes.splice(first.nodes.end(), std::move(second.nodes));
@ -1256,12 +1265,13 @@ ActionsDAGPtr ActionsDAG::merge(ActionsDAG && first, ActionsDAG && second)
ActionsDAG::SplitResult ActionsDAG::split(std::unordered_set<const Node *> split_nodes) const
{
/// Split DAG into two parts.
/// (first_nodes, first_index) is a part which will have split_list in result.
/// (second_nodes, second_index) is a part which will have same index as current actions.
Nodes second_nodes;
/// (first_nodes, first_outputs) is a part which will have split_list in result.
/// (second_nodes, second_outputs) is a part which will have same outputs as current actions.
Nodes first_nodes;
NodeRawConstPtrs second_index;
NodeRawConstPtrs first_index;
NodeRawConstPtrs first_outputs;
Nodes second_nodes;
NodeRawConstPtrs second_outputs;
/// List of nodes from current actions which are not inputs, but will be in second part.
NodeRawConstPtrs new_inputs;
@ -1287,8 +1297,8 @@ ActionsDAG::SplitResult ActionsDAG::split(std::unordered_set<const Node *> split
std::stack<Frame> stack;
std::unordered_map<const Node *, Data> data;
for (const auto & node : index)
data[node].used_in_result = true;
for (const auto & output_node : outputs)
data[output_node].used_in_result = true;
/// DFS. Decide if node is needed by split.
for (const auto & node : nodes)
@ -1422,15 +1432,15 @@ ActionsDAG::SplitResult ActionsDAG::split(std::unordered_set<const Node *> split
}
}
for (const auto * node : index)
second_index.push_back(data[node].to_second);
for (const auto * output_node : outputs)
second_outputs.push_back(data[output_node].to_second);
NodeRawConstPtrs second_inputs;
NodeRawConstPtrs first_inputs;
for (const auto * input : inputs)
for (const auto * input_node : inputs)
{
const auto & cur = data[input];
const auto & cur = data[input_node];
first_inputs.push_back(cur.to_first);
}
@ -1438,17 +1448,17 @@ ActionsDAG::SplitResult ActionsDAG::split(std::unordered_set<const Node *> split
{
const auto & cur = data[input];
second_inputs.push_back(cur.to_second);
first_index.push_back(cur.to_first);
first_outputs.push_back(cur.to_first);
}
auto first_actions = std::make_shared<ActionsDAG>();
first_actions->nodes.swap(first_nodes);
first_actions->index.swap(first_index);
first_actions->outputs.swap(first_outputs);
first_actions->inputs.swap(first_inputs);
auto second_actions = std::make_shared<ActionsDAG>();
second_actions->nodes.swap(second_nodes);
second_actions->index.swap(second_index);
second_actions->outputs.swap(second_outputs);
second_actions->inputs.swap(second_inputs);
return {std::move(first_actions), std::move(second_actions)};
@ -1524,11 +1534,13 @@ ActionsDAG::SplitResult ActionsDAG::splitActionsBySortingDescription(const NameS
{
std::unordered_set<const Node *> split_nodes;
for (const auto & sort_column : sort_columns)
if (const auto * node = tryFindInIndex(sort_column))
if (const auto * node = tryFindInOutputs(sort_column))
split_nodes.insert(node);
else
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Sorting column {} wasn't found in the ActionsDAG's index. DAG:\n{}", sort_column, dumpDAG());
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Sorting column {} wasn't found in the ActionsDAG's outputs. DAG:\n{}",
sort_column,
dumpDAG());
auto res = split(split_nodes);
res.second->project_input = project_input;
@ -1537,11 +1549,12 @@ ActionsDAG::SplitResult ActionsDAG::splitActionsBySortingDescription(const NameS
ActionsDAG::SplitResult ActionsDAG::splitActionsForFilter(const std::string & column_name) const
{
const auto * node = tryFindInIndex(column_name);
const auto * node = tryFindInOutputs(column_name);
if (!node)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Index for ActionsDAG does not contain filter column name {}. DAG:\n{}",
column_name, dumpDAG());
"Outputs for ActionsDAG does not contain filter column name {}. DAG:\n{}",
column_name,
dumpDAG());
std::unordered_set<const Node *> split_nodes = {node};
auto res = split(split_nodes);
@ -1689,7 +1702,7 @@ ColumnsWithTypeAndName prepareFunctionArguments(const ActionsDAG::NodeRawConstPt
/// Create actions which calculate conjunction of selected nodes.
/// Assume conjunction nodes are predicates (and may be used as arguments of function AND).
///
/// Result actions add single column with conjunction result (it is always first in index).
/// Result actions add single column with conjunction result (it is always first in outputs).
/// No other columns are added or removed.
ActionsDAGPtr ActionsDAG::cloneActionsForConjunction(NodeRawConstPtrs conjunction, const ColumnsWithTypeAndName & all_inputs)
{
@ -1763,7 +1776,7 @@ ActionsDAGPtr ActionsDAG::cloneActionsForConjunction(NodeRawConstPtrs conjunctio
result_predicate = &actions->addFunction(func_builder_and, std::move(args), {});
}
actions->index.push_back(result_predicate);
actions->outputs.push_back(result_predicate);
for (const auto & col : all_inputs)
{
@ -1778,9 +1791,9 @@ ActionsDAGPtr ActionsDAG::cloneActionsForConjunction(NodeRawConstPtrs conjunctio
actions->inputs.push_back(input);
}
/// We should not add result_predicate into the index for the second time.
/// We should not add result_predicate into the outputs for the second time.
if (input->result_name != result_predicate->result_name)
actions->index.push_back(input);
actions->outputs.push_back(input);
}
return actions;
@ -1792,10 +1805,12 @@ ActionsDAGPtr ActionsDAG::cloneActionsForFilterPushDown(
const Names & available_inputs,
const ColumnsWithTypeAndName & all_inputs)
{
Node * predicate = const_cast<Node *>(tryFindInIndex(filter_name));
Node * predicate = const_cast<Node *>(tryFindInOutputs(filter_name));
if (!predicate)
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Index for ActionsDAG does not contain filter column name {}. DAG:\n{}", filter_name, dumpDAG());
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Output nodes for ActionsDAG do not contain filter column name {}. DAG:\n{}",
filter_name,
dumpDAG());
/// If condition is constant let's do nothing.
/// It means there is nothing to push down or optimization was already applied.
@ -1807,8 +1822,8 @@ ActionsDAGPtr ActionsDAG::cloneActionsForFilterPushDown(
/// Get input nodes from available_inputs names.
{
std::unordered_map<std::string_view, std::list<const Node *>> inputs_map;
for (const auto & input : inputs)
inputs_map[input->result_name].emplace_back(input);
for (const auto & input_node : inputs)
inputs_map[input_node->result_name].emplace_back(input_node);
for (const auto & name : available_inputs)
{
@ -1833,8 +1848,8 @@ ActionsDAGPtr ActionsDAG::cloneActionsForFilterPushDown(
/// The whole predicate was split.
if (can_remove_filter)
{
/// If filter column is not needed, remove it from index.
std::erase_if(index, [&](const Node * node) { return node == predicate; });
/// If filter column is not needed, remove it from output nodes.
std::erase_if(outputs, [&](const Node * node) { return node == predicate; });
/// At the very end of this method we'll call removeUnusedActions() with allow_remove_inputs=false,
/// so we need to manually remove predicate if it is an input node.
@ -1859,11 +1874,11 @@ ActionsDAGPtr ActionsDAG::cloneActionsForFilterPushDown(
{
/// Special case. We cannot replace input to constant inplace.
/// Because we cannot affect inputs list for actions.
/// So we just add a new constant and update index.
/// So we just add a new constant and update outputs.
const auto * new_predicate = &addNode(node);
for (auto & index_node : index)
if (index_node == predicate)
index_node = new_predicate;
for (auto & output_node : outputs)
if (output_node == predicate)
output_node = new_predicate;
}
}
}

View File

@ -96,8 +96,8 @@ public:
private:
Nodes nodes;
NodeRawConstPtrs index;
NodeRawConstPtrs inputs;
NodeRawConstPtrs outputs;
bool project_input = false;
bool projected_output = false;
@ -111,7 +111,12 @@ public:
explicit ActionsDAG(const ColumnsWithTypeAndName & inputs_);
const Nodes & getNodes() const { return nodes; }
const NodeRawConstPtrs & getIndex() const { return index; }
const NodeRawConstPtrs & getOutputs() const { return outputs; }
/** Output nodes can contain any column returned from DAG.
* You may manually change it if needed.
*/
NodeRawConstPtrs & getOutputs() { return outputs; }
const NodeRawConstPtrs & getInputs() const { return inputs; }
NamesAndTypesList getRequiredColumns() const;
@ -133,25 +138,26 @@ public:
NodeRawConstPtrs children,
std::string result_name);
/// Index can contain any column returned from DAG.
/// You may manually change it if needed.
NodeRawConstPtrs & getIndex() { return index; }
/// Find first column by name in index. This search is linear.
const Node & findInIndex(const std::string & name) const;
/// Find first column by name in output nodes. This search is linear.
const Node & findInOutputs(const std::string & name) const;
/// Same, but return nullptr if node not found.
const Node * tryFindInIndex(const std::string & name) const;
/// Find first node with the same name in index and replace it.
/// If was not found, add node to index end.
void addOrReplaceInIndex(const Node & node);
const Node * tryFindInOutputs(const std::string & name) const;
/// Find first node with the same name in output nodes and replace it.
/// If it was not found, add the node to the end of outputs.
void addOrReplaceInOutputs(const Node & node);
/// Call addAlias several times.
void addAliases(const NamesWithAliases & aliases);
/// Add alias actions and remove unused columns from index. Also specify result columns order in index.
/// Add alias actions and remove unused columns from outputs. Also specify result columns order in outputs.
void project(const NamesWithAliases & projection);
/// If column is not in index, try to find it in nodes and insert back into index.
/// If column is not in outputs, try to find it in nodes and insert back into outputs.
bool tryRestoreColumn(const std::string & column_name);
/// Find column in result. Remove it from index.
/// Find column in result. Remove it from outputs.
/// If the column is in inputs and has no dependent nodes, remove it from inputs too.
/// Return true if column was removed from inputs.
bool removeUnusedResult(const std::string & column_name);
@ -160,7 +166,13 @@ public:
bool isInputProjected() const { return project_input; }
bool isOutputProjected() const { return projected_output; }
/// Remove actions that are not needed to compute output nodes
void removeUnusedActions(bool allow_remove_inputs = true, bool allow_constant_folding = true);
/// Remove actions that are not needed to compute output nodes with required names
void removeUnusedActions(const Names & required_names, bool allow_remove_inputs = true, bool allow_constant_folding = true);
/// Remove actions that are not needed to compute output nodes with required names
void removeUnusedActions(const NameSet & required_names, bool allow_remove_inputs = true, bool allow_constant_folding = true);
/// Transform the current DAG in a way that leaf nodes get folded into their parents. It's done
@ -196,10 +208,10 @@ public:
const String & predicate_column_name = {},
bool add_missing_keys = true);
/// Reorder the index nodes using given position mapping.
/// Reorder the output nodes using given position mapping.
void reorderAggregationKeysForProjection(const std::unordered_map<std::string_view, size_t> & key_names_pos_map);
/// Add aggregate columns to index nodes from projection
/// Add aggregate columns to output nodes from projection
void addAggregatesViaProjection(const Block & aggregates);
bool hasArrayJoin() const;
@ -263,7 +275,7 @@ public:
/// Split ActionsDAG into two DAGs, where first part contains all nodes from split_nodes and their children.
/// Execution of first then second parts on block is equivalent to execution of initial DAG.
/// First DAG and initial DAG have equal inputs, second DAG and initial DAG has equal index (outputs).
/// First DAG and initial DAG have equal inputs; second DAG and initial DAG have equal outputs.
/// Second DAG may contain fewer inputs than the first DAG (but may also include other columns).
SplitResult split(std::unordered_set<const Node *> split_nodes) const;
@ -271,7 +283,7 @@ public:
SplitResult splitActionsBeforeArrayJoin(const NameSet & array_joined_columns) const;
/// Splits actions into two parts. First part has minimal size sufficient for calculation of column_name.
/// Index of initial actions must contain column_name.
/// Outputs of initial actions must contain column_name.
SplitResult splitActionsForFilter(const std::string & column_name) const;
/// Splits actions into two parts. The first part contains all the calculations required to calculate sort_columns.
@ -304,8 +316,6 @@ public:
private:
Node & addNode(Node node);
void removeUnusedActions(bool allow_remove_inputs = true, bool allow_constant_folding = true);
#if USE_EMBEDDED_COMPILER
void compileFunctions(size_t min_count_to_compile_expression, const std::unordered_set<const Node *> & lazy_executed_nodes = {});
#endif
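Not part of the commit — a minimal usage sketch of the renamed output-node helpers declared above, assuming the same pattern that MutationsInterpreter uses later in this diff (find a column in the outputs, alias it, publish the alias as an output):

```cpp
/// Illustration only: expose an existing output column under a new name,
/// using findInOutputs / addAlias / addOrReplaceInOutputs from this header.
void exposeColumnUnderAlias(ActionsDAG & dag, const String & source_name, const String & alias_name)
{
    const auto & node = dag.findInOutputs(source_name);   /// linear search over output nodes
    const auto & alias = dag.addAlias(node, alias_name);  /// add an ALIAS node on top of it
    dag.addOrReplaceInOutputs(alias);                      /// replace or append it in the outputs
}
```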

View File

@ -389,7 +389,7 @@ SetPtr makeExplicitSet(
const ASTPtr & right_arg = args.children.at(1);
auto column_name = left_arg->getColumnName();
const auto & dag_node = actions.findInIndex(column_name);
const auto & dag_node = actions.findInOutputs(column_name);
const DataTypePtr & left_arg_type = dag_node.result_type;
DataTypes set_element_types = {left_arg_type};
@ -507,7 +507,7 @@ ActionsMatcher::Data::Data(
, actions_stack(std::move(actions_dag), context_)
, aggregation_keys_info(aggregation_keys_info_)
, build_expression_with_window_functions(build_expression_with_window_functions_)
, next_unique_suffix(actions_stack.getLastActions().getIndex().size() + 1)
, next_unique_suffix(actions_stack.getLastActions().getOutputs().size() + 1)
{
}
@ -526,9 +526,9 @@ ScopeStack::ScopeStack(ActionsDAGPtr actions_dag, ContextPtr context_) : WithCon
{
auto & level = stack.emplace_back();
level.actions_dag = std::move(actions_dag);
level.index = std::make_unique<ScopeStack::Index>(level.actions_dag->getIndex());
level.index = std::make_unique<ScopeStack::Index>(level.actions_dag->getOutputs());
for (const auto & node : level.actions_dag->getIndex())
for (const auto & node : level.actions_dag->getOutputs())
if (node->type == ActionsDAG::ActionType::INPUT)
level.inputs.emplace(node->result_name);
}
@ -537,7 +537,7 @@ void ScopeStack::pushLevel(const NamesAndTypesList & input_columns)
{
auto & level = stack.emplace_back();
level.actions_dag = std::make_shared<ActionsDAG>();
level.index = std::make_unique<ScopeStack::Index>(level.actions_dag->getIndex());
level.index = std::make_unique<ScopeStack::Index>(level.actions_dag->getOutputs());
const auto & prev = stack[stack.size() - 2];
for (const auto & input_column : input_columns)
@ -547,7 +547,7 @@ void ScopeStack::pushLevel(const NamesAndTypesList & input_columns)
level.inputs.emplace(input_column.name);
}
for (const auto & node : prev.actions_dag->getIndex())
for (const auto & node : prev.actions_dag->getOutputs())
{
if (!level.index->contains(node->result_name))
{

View File

@ -301,7 +301,7 @@ static std::unordered_set<const ActionsDAG::Node *> processShortCircuitFunctions
if (short_circuit_nodes.empty())
return {};
auto reverse_info = getActionsDAGReverseInfo(nodes, actions_dag.getIndex());
auto reverse_info = getActionsDAGReverseInfo(nodes, actions_dag.getOutputs());
/// For each node we fill LazyExecutionInfo.
std::unordered_map<const ActionsDAG::Node *, LazyExecutionInfo> lazy_execution_infos;
@ -335,10 +335,10 @@ void ExpressionActions::linearizeActions(const std::unordered_set<const ActionsD
};
const auto & nodes = getNodes();
const auto & index = actions_dag->getIndex();
const auto & outputs = actions_dag->getOutputs();
const auto & inputs = actions_dag->getInputs();
auto reverse_info = getActionsDAGReverseInfo(nodes, index);
auto reverse_info = getActionsDAGReverseInfo(nodes, outputs);
std::vector<Data> data;
for (const auto & node : nodes)
data.push_back({.node = &node});
@ -428,9 +428,9 @@ void ExpressionActions::linearizeActions(const std::unordered_set<const ActionsD
}
}
result_positions.reserve(index.size());
result_positions.reserve(outputs.size());
for (const auto & node : index)
for (const auto & node : outputs)
{
auto pos = data[reverse_info.reverse_index[node]].position;

View File

@ -304,7 +304,7 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions)
ssize_t group_size = group_elements_ast.size();
const auto & column_name = group_elements_ast[j]->getColumnName();
const auto * node = temp_actions->tryFindInIndex(column_name);
const auto * node = temp_actions->tryFindInOutputs(column_name);
if (!node)
throw Exception("Unknown identifier (in GROUP BY): " + column_name, ErrorCodes::UNKNOWN_IDENTIFIER);
@ -358,7 +358,7 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions)
getRootActionsNoMakeSet(group_asts[i], temp_actions, false);
const auto & column_name = group_asts[i]->getColumnName();
const auto * node = temp_actions->tryFindInIndex(column_name);
const auto * node = temp_actions->tryFindInOutputs(column_name);
if (!node)
throw Exception("Unknown identifier (in GROUP BY): " + column_name, ErrorCodes::UNKNOWN_IDENTIFIER);
@ -524,7 +524,7 @@ void SelectQueryExpressionAnalyzer::makeSetsForIndex(const ASTPtr & node)
auto temp_actions = std::make_shared<ActionsDAG>(columns_after_join);
getRootActions(left_in_operand, true, temp_actions);
if (temp_actions->tryFindInIndex(left_in_operand->getColumnName()))
if (temp_actions->tryFindInOutputs(left_in_operand->getColumnName()))
makeExplicitSet(func, *temp_actions, true, getContext(), settings.size_limits_for_set, prepared_sets);
}
}
@ -634,7 +634,7 @@ void ExpressionAnalyzer::makeAggregateDescriptions(ActionsDAGPtr & actions, Aggr
for (size_t i = 0; i < arguments.size(); ++i)
{
const std::string & name = arguments[i]->getColumnName();
const auto * dag_node = actions->tryFindInIndex(name);
const auto * dag_node = actions->tryFindInOutputs(name);
if (!dag_node)
{
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER,
@ -841,7 +841,7 @@ void ExpressionAnalyzer::makeWindowDescriptions(ActionsDAGPtr actions)
for (size_t i = 0; i < arguments.size(); ++i)
{
const std::string & name = arguments[i]->getColumnName();
const auto * node = actions->tryFindInIndex(name);
const auto * node = actions->tryFindInOutputs(name);
if (!node)
{
@ -937,8 +937,8 @@ ArrayJoinActionPtr ExpressionAnalyzer::addMultipleArrayJoinAction(ActionsDAGPtr
/// Assign new names to columns, if needed.
if (result_source.first != result_source.second)
{
const auto & node = actions->findInIndex(result_source.second);
actions->getIndex().push_back(&actions->addAlias(node, result_source.first));
const auto & node = actions->findInOutputs(result_source.second);
actions->getOutputs().push_back(&actions->addAlias(node, result_source.first));
}
/// Make ARRAY JOIN (replace arrays with their insides) for the columns in these new names.
@ -1097,10 +1097,10 @@ static std::unique_ptr<QueryPlan> buildJoinedPlan(
{
auto pos = original_right_columns.getPositionByName(name_with_alias.first);
const auto & alias = rename_dag->addAlias(*rename_dag->getInputs()[pos], name_with_alias.second);
rename_dag->getIndex()[pos] = &alias;
rename_dag->getOutputs()[pos] = &alias;
}
}
rename_dag->projectInput();
auto rename_step = std::make_unique<ExpressionStep>(joined_plan->getCurrentDataStream(), std::move(rename_dag));
rename_step->setStepDescription("Rename joined columns");
joined_plan->addStep(std::move(rename_step));
@ -1212,7 +1212,7 @@ ActionsDAGPtr SelectQueryExpressionAnalyzer::appendPrewhere(
String prewhere_column_name = select_query->prewhere()->getColumnName();
step.addRequiredOutput(prewhere_column_name);
const auto & node = step.actions()->findInIndex(prewhere_column_name);
const auto & node = step.actions()->findInOutputs(prewhere_column_name);
auto filter_type = node.result_type;
if (!filter_type->canBeUsedInBooleanContext())
throw Exception("Invalid type for filter in PREWHERE: " + filter_type->getName(),
@ -1295,7 +1295,7 @@ bool SelectQueryExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain,
auto where_column_name = select_query->where()->getColumnName();
step.addRequiredOutput(where_column_name);
const auto & node = step.actions()->findInIndex(where_column_name);
const auto & node = step.actions()->findInOutputs(where_column_name);
auto filter_type = node.result_type;
if (!filter_type->canBeUsedInBooleanContext())
throw Exception("Invalid type for filter in WHERE: " + filter_type->getName(),

View File

@ -551,10 +551,10 @@ void ActionsDAG::compileFunctions(size_t min_count_to_compile_expression, const
node_to_data[child].all_parents_compilable &= node_is_valid_for_compilation;
}
for (const auto & node : index)
for (const auto & output_node : outputs)
{
/// Force result nodes to compile
node_to_data[node].all_parents_compilable = false;
/// Force output nodes to compile
node_to_data[output_node].all_parents_compilable = false;
}
std::vector<Node *> nodes_to_compile;

View File

@ -106,14 +106,14 @@ static ActionsDAGPtr makeAdditionalPostFilter(ASTPtr & ast, ContextPtr context,
auto syntax_result = TreeRewriter(context).analyze(ast, header.getNamesAndTypesList());
String result_column_name = ast->getColumnName();
auto dag = ExpressionAnalyzer(ast, syntax_result, context).getActionsDAG(false, false);
const ActionsDAG::Node * result_node = &dag->findInIndex(result_column_name);
auto & index = dag->getIndex();
index.clear();
index.reserve(dag->getInputs().size() + 1);
const ActionsDAG::Node * result_node = &dag->findInOutputs(result_column_name);
auto & outputs = dag->getOutputs();
outputs.clear();
outputs.reserve(dag->getInputs().size() + 1);
for (const auto * node : dag->getInputs())
index.push_back(node);
outputs.push_back(node);
index.push_back(result_node);
outputs.push_back(result_node);
return dag;
}
@ -128,7 +128,7 @@ void IInterpreterUnionOrSelectQuery::addAdditionalPostFilter(QueryPlan & plan) c
return;
auto dag = makeAdditionalPostFilter(ast, context, plan.getCurrentDataStream().header);
std::string filter_name = dag->getIndex().back()->result_name;
std::string filter_name = dag->getOutputs().back()->result_name;
auto filter_step = std::make_unique<FilterStep>(
plan.getCurrentDataStream(), std::move(dag), std::move(filter_name), true);
filter_step->setStepDescription("Additional result filter");

View File

@ -162,7 +162,7 @@ FilterDAGInfoPtr generateFilterActions(
filter_info->actions->projectInput(false);
for (const auto * node : filter_info->actions->getInputs())
filter_info->actions->getIndex().push_back(node);
filter_info->actions->getOutputs().push_back(node);
auto required_columns_from_filter = filter_info->actions->getRequiredColumns();

View File

@ -859,9 +859,9 @@ ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector<Stage> &
for (const auto & kv : stage.column_to_updated)
{
auto column_name = kv.second->getColumnName();
const auto & dag_node = actions->findInIndex(column_name);
const auto & dag_node = actions->findInOutputs(column_name);
const auto & alias = actions->addAlias(dag_node, kv.first);
actions->addOrReplaceInIndex(alias);
actions->addOrReplaceInOutputs(alias);
}
}

View File

@ -23,7 +23,7 @@ ActionsDAGPtr addMissingDefaults(
bool null_as_default)
{
auto actions = std::make_shared<ActionsDAG>(header.getColumnsWithTypeAndName());
auto & index = actions->getIndex();
auto & index = actions->getOutputs();
/// For missing columns of nested structure, you need to create not a column of empty arrays, but a column of arrays of correct lengths.
/// First, remember the offset columns for all arrays in the block.
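A hedged worked example of what the comment above describes (illustration only; the column names are hypothetical):

```cpp
/// Suppose the incoming block already contains `n.a Array(UInt32)` with two rows
/// of lengths 2 and 3 (offsets column = [2, 5]), and the nested column `n.b` is missing.
/// addMissingDefaults must not produce `n.b = [[], []]`; it reuses the remembered
/// offsets and emits default-valued arrays of the same lengths:
///   n.b -> [ ['', ''], ['', '', ''] ]
/// Otherwise the nested columns of `n` would end up with mismatched array sizes.
```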

View File

@ -326,6 +326,69 @@ static std::shared_ptr<arrow::ChunkedArray> getNestedArrowColumn(std::shared_ptr
return std::make_shared<arrow::ChunkedArray>(array_vector);
}
static ColumnWithTypeAndName createLCColumnFromArrowDictionaryValues(
const std::shared_ptr<ColumnWithTypeAndName> & dict_values,
const ColumnPtr & indexes_column,
const String & column_name
)
{
auto lc_type = std::make_shared<DataTypeLowCardinality>(dict_values->type);
auto lc_column = lc_type->createColumn();
for (auto i = 0u; i < indexes_column->size(); i++)
{
Field f;
dict_values->column->get(indexes_column->getUInt(i), f);
lc_column->insert(f);
}
return {std::move(lc_column), std::move(lc_type), column_name};
}
/*
* Dictionary(Nullable(X)) in ArrowColumn format is composed of a nullmap, dictionary and an index.
* It doesn't have the concept of null or default values.
* An empty string is just a regular value appended at any position of the dictionary.
* Null values have an index of 0, but it should be ignored since the nullmap will return null.
* In ClickHouse LowCardinality, it's different. The dictionary contains null and default values at the beginning.
* [null, default, ...]. Therefore, null values have an index of 0 and default values have an index of 1.
* No nullmap is used.
* */
static ColumnWithTypeAndName createLCOfNullableColumnFromArrowDictionaryValues(
const std::shared_ptr<ColumnWithTypeAndName> & dict_values,
const ColumnPtr & indexes_column,
const ColumnPtr & nullmap_column,
const String & column_name
)
{
/*
* ArrowColumn format handles nulls by maintaining a nullmap column, there is no nullable type.
* Therefore, dict_values->type is the actual (non-nullable) data type. It needs to be transformed into a nullable type,
* so the LC column is created from the nullable type and a null value is automatically added
* at the beginning of the collection.
* */
auto lc_type = std::make_shared<DataTypeLowCardinality>(makeNullable(dict_values->type));
auto lc_column = lc_type->createColumn();
for (auto i = 0u; i < indexes_column->size(); i++)
{
if (nullmap_column && nullmap_column->getBool(i))
{
lc_column->insertDefault();
}
else
{
Field f;
dict_values->column->get(indexes_column->getUInt(i), f);
lc_column->insert(f);
}
}
return {std::move(lc_column), std::move(lc_type), column_name};
}
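Not part of the commit: a worked example of the Arrow-to-ClickHouse mapping described in the comment block above, as the two functions here implement it (the values are hypothetical):

```cpp
/// Arrow dictionary column (one chunk):
///   dictionary = ["one", "three"]
///   indexes    = [0, 0, 1]
///   nullmap    = [0, 1, 0]        // the second row is null
///
/// Resulting LowCardinality(Nullable(String)) column:
///   dictionary = [NULL, "", "one", "three"]   // null and default are prepended automatically
///   rows       = "one", NULL, "three"
///
/// The null row never consults the Arrow dictionary: insertDefault() above maps it
/// straight to index 0 (NULL) of the LowCardinality dictionary, so the Arrow
/// convention "index 0 plus nullmap bit" is translated rather than copied.
```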
static ColumnWithTypeAndName readColumnFromArrowColumn(
std::shared_ptr<arrow::ChunkedArray> & arrow_column,
const std::string & column_name,
@ -338,7 +401,8 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
bool & skipped)
{
if (!is_nullable && arrow_column->null_count() && arrow_column->type()->id() != arrow::Type::LIST
&& arrow_column->type()->id() != arrow::Type::MAP && arrow_column->type()->id() != arrow::Type::STRUCT)
&& arrow_column->type()->id() != arrow::Type::MAP && arrow_column->type()->id() != arrow::Type::STRUCT &&
arrow_column->type()->id() != arrow::Type::DICTIONARY)
{
auto nested_column = readColumnFromArrowColumn(arrow_column, column_name, format_name, true, dictionary_values, read_ints_as_dates, allow_null_type, skip_columns_with_unsupported_types, skipped);
if (skipped)
@ -455,12 +519,6 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
}
auto arrow_dict_column = std::make_shared<arrow::ChunkedArray>(dict_array);
auto dict_column = readColumnFromArrowColumn(arrow_dict_column, column_name, format_name, false, dictionary_values, read_ints_as_dates, allow_null_type, skip_columns_with_unsupported_types, skipped);
/// We should convert read column to ColumnUnique.
auto tmp_lc_column = DataTypeLowCardinality(dict_column.type).createColumn();
auto tmp_dict_column = IColumn::mutate(assert_cast<ColumnLowCardinality *>(tmp_lc_column.get())->getDictionaryPtr());
static_cast<IColumnUnique *>(tmp_dict_column.get())->uniqueInsertRangeFrom(*dict_column.column, 0, dict_column.column->size());
dict_column.column = std::move(tmp_dict_column);
dict_values = std::make_shared<ColumnWithTypeAndName>(std::move(dict_column));
}
@ -473,9 +531,19 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
auto arrow_indexes_column = std::make_shared<arrow::ChunkedArray>(indexes_array);
auto indexes_column = readColumnWithIndexesData(arrow_indexes_column);
auto lc_column = ColumnLowCardinality::create(dict_values->column, indexes_column);
auto lc_type = std::make_shared<DataTypeLowCardinality>(dict_values->type);
return {std::move(lc_column), std::move(lc_type), column_name};
const auto contains_null = arrow_column->null_count() > 0;
if (contains_null)
{
auto nullmap_column = readByteMapFromArrowColumn(arrow_column);
return createLCOfNullableColumnFromArrowDictionaryValues(dict_values, indexes_column, nullmap_column, column_name);
}
else
{
return createLCColumnFromArrowDictionaryValues(dict_values, indexes_column, column_name);
}
}
# define DISPATCH(ARROW_NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
case ARROW_NUMERIC_TYPE: \

View File

@ -239,15 +239,15 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
/// Here we create a DAG which fills missing keys and adds `__grouping_set` column
auto dag = std::make_shared<ActionsDAG>(header.getColumnsWithTypeAndName());
ActionsDAG::NodeRawConstPtrs index;
index.reserve(output_header.columns() + 1);
ActionsDAG::NodeRawConstPtrs outputs;
outputs.reserve(output_header.columns() + 1);
auto grouping_col = ColumnConst::create(ColumnUInt64::create(1, set_counter), 0);
const auto * grouping_node = &dag->addColumn(
{ColumnPtr(std::move(grouping_col)), std::make_shared<DataTypeUInt64>(), "__grouping_set"});
grouping_node = &dag->materializeNode(*grouping_node);
index.push_back(grouping_node);
outputs.push_back(grouping_node);
const auto & missing_columns = grouping_sets_params[set_counter].missing_keys;
@ -264,19 +264,19 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
auto column = ColumnConst::create(std::move(column_with_default), 0);
const auto * node = &dag->addColumn({ColumnPtr(std::move(column)), col.type, col.name});
node = &dag->materializeNode(*node);
index.push_back(node);
outputs.push_back(node);
}
else
{
const auto * column_node = dag->getIndex()[header.getPositionByName(col.name)];
const auto * column_node = dag->getOutputs()[header.getPositionByName(col.name)];
if (group_by_use_nulls && column_node->result_type->canBeInsideNullable())
index.push_back(&dag->addFunction(to_nullable_function, { column_node }, col.name));
outputs.push_back(&dag->addFunction(to_nullable_function, { column_node }, col.name));
else
index.push_back(column_node);
outputs.push_back(column_node);
}
}
dag->getIndex().swap(index);
dag->getOutputs().swap(outputs);
auto expression = std::make_shared<ExpressionActions>(dag, settings.getActionsSettings());
auto transform = std::make_shared<ExpressionTransform>(header, expression);

View File

@ -40,17 +40,17 @@ CubeStep::CubeStep(const DataStream & input_stream_, Aggregator::Params params_,
ProcessorPtr addGroupingSetForTotals(const Block & header, const Names & keys, bool use_nulls, const BuildQueryPipelineSettings & settings, UInt64 grouping_set_number)
{
auto dag = std::make_shared<ActionsDAG>(header.getColumnsWithTypeAndName());
auto & index = dag->getIndex();
auto & outputs = dag->getOutputs();
if (use_nulls)
{
auto to_nullable = FunctionFactory::instance().get("toNullable", nullptr);
for (const auto & key : keys)
{
const auto * node = dag->getIndex()[header.getPositionByName(key)];
const auto * node = dag->getOutputs()[header.getPositionByName(key)];
if (node->result_type->canBeInsideNullable())
{
dag->addOrReplaceInIndex(dag->addFunction(to_nullable, { node }, node->result_name));
dag->addOrReplaceInOutputs(dag->addFunction(to_nullable, { node }, node->result_name));
}
}
}
@ -60,7 +60,7 @@ ProcessorPtr addGroupingSetForTotals(const Block & header, const Names & keys, b
{ColumnPtr(std::move(grouping_col)), std::make_shared<DataTypeUInt64>(), "__grouping_set"});
grouping_node = &dag->materializeNode(*grouping_node);
index.insert(index.begin(), grouping_node);
outputs.insert(outputs.begin(), grouping_node);
auto expression = std::make_shared<ExpressionActions>(dag, settings.getActionsSettings());
return std::make_shared<ExpressionTransform>(header, expression);

View File

@ -84,7 +84,7 @@ tryAddNewFilterStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, con
const auto & expression = filter->getExpression();
const auto & filter_column_name = filter->getFilterColumnName();
const auto * filter_node = expression->tryFindInIndex(filter_column_name);
const auto * filter_node = expression->tryFindInOutputs(filter_column_name);
if (!filter_node && !filter->removesFilterColumn())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Filter column {} was removed from ActionsDAG but it is needed in result. DAG:\n{}",
@ -102,7 +102,8 @@ tryAddNewFilterStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, con
/// Expression/Filter -> Aggregating -> Filter -> Something
/// New filter column is the first one.
String split_filter_column_name = split_filter->getIndex().front()->result_name;
String split_filter_column_name = split_filter->getOutputs().front()->result_name;
node.step = std::make_unique<FilterStep>(
node.children.at(0)->step->getOutputStream(), split_filter, std::move(split_filter_column_name), can_remove_filter);
@ -284,7 +285,7 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
*
* New filter column is the first one.
*/
const String & split_filter_column_name = split_filter->getIndex().front()->result_name;
const String & split_filter_column_name = split_filter->getOutputs().front()->result_name;
bool can_remove_filter = source_columns.end() == std::find(source_columns.begin(), source_columns.end(), split_filter_column_name);
const size_t updated_steps = tryAddNewFilterStep(parent_node, nodes, split_filter, can_remove_filter, child_idx);
if (updated_steps > 0)

View File

@ -895,13 +895,13 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
ActionDAGNodes nodes;
if (prewhere_info)
{
const auto & node = prewhere_info->prewhere_actions->findInIndex(prewhere_info->prewhere_column_name);
const auto & node = prewhere_info->prewhere_actions->findInOutputs(prewhere_info->prewhere_column_name);
nodes.nodes.push_back(&node);
}
if (added_filter)
{
const auto & node = added_filter->findInIndex(added_filter_column_name);
const auto & node = added_filter->findInOutputs(added_filter_column_name);
nodes.nodes.push_back(&node);
}

View File

@ -748,7 +748,7 @@ static const ActionsDAG::Node & cloneASTWithInversionPushDown(
if (const auto * index_hint = typeid_cast<const FunctionIndexHint *>(adaptor->getFunction()))
{
const auto & index_hint_dag = index_hint->getActions();
children = index_hint_dag->getIndex();
children = index_hint_dag->getOutputs();
for (auto & arg : children)
arg = &cloneASTWithInversionPushDown(*arg, inverted_dag, to_inverted, context, need_inversion);
@ -824,7 +824,7 @@ static ActionsDAGPtr cloneASTWithInversionPushDown(ActionsDAG::NodeRawConstPtrs
nodes = {&res->addFunction(function_builder, std::move(nodes), "")};
}
res->getIndex().swap(nodes);
res->getOutputs().swap(nodes);
return res;
}
@ -948,7 +948,7 @@ KeyCondition::KeyCondition(
// std::cerr << "========== inverted dag: " << inverted_dag->dumpDAG() << std::endl;
Block empty;
for (const auto * node : inverted_dag->getIndex())
for (const auto * node : inverted_dag->getOutputs())
traverseAST(Tree(node), context, empty);
}
else

View File

@ -1,6 +1,8 @@
import os
from os import path as p
from build_download_helper import get_with_retries
module_dir = p.abspath(p.dirname(__file__))
git_root = p.abspath(p.join(module_dir, "..", ".."))
@ -22,3 +24,37 @@ REPO_COPY = os.getenv("REPO_COPY", git_root)
RUNNER_TEMP = os.getenv("RUNNER_TEMP", p.abspath(p.join(module_dir, "./tmp")))
S3_BUILDS_BUCKET = os.getenv("S3_BUILDS_BUCKET", "clickhouse-builds")
S3_TEST_REPORTS_BUCKET = os.getenv("S3_TEST_REPORTS_BUCKET", "clickhouse-test-reports")
# These parameters are set only on demand, and only once
_GITHUB_JOB_ID = ""
_GITHUB_JOB_URL = ""
def GITHUB_JOB_ID() -> str:
global _GITHUB_JOB_ID
global _GITHUB_JOB_URL
if _GITHUB_JOB_ID:
return _GITHUB_JOB_ID
jobs = []
while not _GITHUB_JOB_ID:
response = get_with_retries(
f"https://api.github.com/repos/{GITHUB_REPOSITORY}/"
f"actions/runs/{GITHUB_RUN_ID}/jobs?per_page=100"
)
data = response.json()
jobs.extend(data["jobs"])
for job in data["jobs"]:
if job["name"] != GITHUB_JOB:
continue
_GITHUB_JOB_ID = job["id"]
_GITHUB_JOB_URL = job["html_url"]
return _GITHUB_JOB_ID
if len(jobs) == data["total_count"]:
_GITHUB_JOB_ID = "0"
return _GITHUB_JOB_ID
def GITHUB_JOB_URL() -> str:
GITHUB_JOB_ID()
return _GITHUB_JOB_URL

View File

@ -34,6 +34,7 @@ th {{ cursor: pointer; }}
<a href="{commit_url}">Commit</a>
{additional_urls}
<a href="{task_url}">Task (github actions)</a>
<a href="{job_url}">Job (github actions)</a>
</p>
{test_part}
</body>
@ -150,6 +151,7 @@ def create_test_html_report(
test_result,
raw_log_url,
task_url,
job_url,
branch_url,
branch_name,
commit_url,
@ -236,12 +238,17 @@ def create_test_html_report(
[_get_html_url(url) for url in sorted(additional_urls, key=_get_html_url_name)]
)
raw_log_name = os.path.basename(raw_log_url)
if raw_log_name.endswith("?check_suite_focus=true"):
raw_log_name = "Job (github actions)"
result = HTML_BASE_TEST_TEMPLATE.format(
title=_format_header(header, branch_name),
header=_format_header(header, branch_name, branch_url),
raw_log_name=os.path.basename(raw_log_url),
raw_log_name=raw_log_name,
raw_log_url=raw_log_url,
task_url=task_url,
job_url=job_url,
test_part=test_part,
branch_name=branch_name,
commit_url=commit_url,

View File

@ -2,7 +2,12 @@ import os
import logging
import ast
from env_helper import GITHUB_SERVER_URL, GITHUB_REPOSITORY, GITHUB_RUN_URL
from env_helper import (
GITHUB_JOB_URL,
GITHUB_REPOSITORY,
GITHUB_RUN_URL,
GITHUB_SERVER_URL,
)
from report import ReportColorTheme, create_test_html_report
@ -66,13 +71,11 @@ def upload_results(
branch_url = f"{GITHUB_SERVER_URL}/{GITHUB_REPOSITORY}/pull/{pr_number}"
commit_url = f"{GITHUB_SERVER_URL}/{GITHUB_REPOSITORY}/commit/{commit_sha}"
task_url = GITHUB_RUN_URL
if additional_urls:
raw_log_url = additional_urls[0]
additional_urls.pop(0)
else:
raw_log_url = task_url
raw_log_url = GITHUB_JOB_URL()
statuscolors = (
ReportColorTheme.bugfixcheck if "bugfix validate check" in check_name else None
@ -82,7 +85,8 @@ def upload_results(
check_name,
test_results,
raw_log_url,
task_url,
GITHUB_RUN_URL,
GITHUB_JOB_URL(),
branch_url,
branch_name,
commit_url,

View File

@ -1179,15 +1179,21 @@ class TestSuite:
def is_shebang(line: str) -> bool:
return line.startswith("#!")
def find_tag_line(file):
for line in file:
line = line.strip()
if line and not is_shebang(line):
return line
return ''
def load_tags_from_file(filepath):
comment_sign = get_comment_sign(filepath)
with open(filepath, "r", encoding="utf-8") as file:
try:
line = file.readline()
if is_shebang(line):
line = file.readline()
line = find_tag_line(file)
except UnicodeDecodeError:
return []
return parse_tags_from_line(line, get_comment_sign(filepath))
return parse_tags_from_line(line, comment_sign)
all_tags = {}
start_time = datetime.now()

View File

@ -6,10 +6,10 @@ CREATE TABLE t_part_log_has_merge_type_table
UserID UInt64,
Comment String
)
ENGINE = MergeTree()
ENGINE = MergeTree()
ORDER BY tuple()
TTL event_time + INTERVAL 3 MONTH
SETTINGS min_bytes_for_wide_part = 0, materialize_ttl_recalculate_only = true;
SETTINGS min_bytes_for_wide_part = 0, materialize_ttl_recalculate_only = true, max_number_of_merges_with_ttl_in_pool = 100;
INSERT INTO t_part_log_has_merge_type_table VALUES (now(), 1, 'username1');
INSERT INTO t_part_log_has_merge_type_table VALUES (now() - INTERVAL 4 MONTH, 2, 'username2');
@ -20,7 +20,7 @@ SYSTEM FLUSH LOGS;
SELECT
event_type,
merge_reason
merge_reason
FROM
system.part_log
WHERE

View File

@ -1,5 +1,5 @@
#!/usr/bin/env bash
# Tags: long, race, no-ordinary-database
# Tags: long, race
# Regression test for INSERT into table with MV attached,
# to avoid possible errors if some table disappears,
@ -9,6 +9,12 @@ CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -nm -q "ATTACH TABLE mv" |& {
# CANNOT_GET_CREATE_TABLE_QUERY -- ATTACH TABLE IF EXISTS
# TABLE_ALREADY_EXISTS -- ATTACH TABLE IF NOT EXISTS
grep -F -m1 Exception | grep -v -e CANNOT_GET_CREATE_TABLE_QUERY -e TABLE_ALREADY_EXISTS
}
$CLICKHOUSE_CLIENT -nm -q "
DROP TABLE IF EXISTS null;
CREATE TABLE null (key Int) ENGINE = Null;
@ -23,4 +29,8 @@ $CLICKHOUSE_CLIENT -q "INSERT INTO null SELECT * FROM numbers_mt(1000) settings
} &
sleep 0.05
$CLICKHOUSE_CLIENT -q "DETACH TABLE mv"
# avoid leftovers on DROP DATABASE (force_remove_data_recursively_on_drop) for Ordinary database
$CLICKHOUSE_CLIENT -q "ATTACH TABLE mv"
wait

View File

@ -0,0 +1,9 @@
lc_nullable_string
LowCardinality(Nullable(String))
one
\N
three

\N

six

View File

@ -0,0 +1,31 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# ## reading ArrowStream file from python
# import pyarrow as pa
# stream = pa.ipc.open_stream("test.arrows")
# x = stream.read_all()
# print(x)
## writing ArrowStream file from python
# import pyarrow as pa
# data = [
# pa.array(["one", None, "three", "", None, "", "six"]).dictionary_encode(),
# ]
# batch = pa.record_batch(data, names=['lc_nullable_string'])
# writer = pa.ipc.new_stream("test4.arrows", batch.schema)
# writer.write_batch(batch)
# writer.close()
# cat data.arrow | gzip | base64
cat <<EOF | base64 --decode | gunzip | $CLICKHOUSE_LOCAL --query='SELECT * FROM table FORMAT TSVWithNamesAndTypes' --input-format=ArrowStream
H4sIAAAAAAAAA3VQQQ6CQAychRWIcjCGGA4ePHrwCT7Bgz8waoiSICaoiU/wGR49+Me1XboIJDYp
3d2ZTjsYY8wLwBgcQ8QIMEBEJwqloal8iMNVUSaWmxIjQEjsMb3UvWrA2IZySalRx4SyOGzLe1Hs
9kW2vd6qvDyC+iPL4m8MvseUob2z2NyiutGhFcxb5sP2JLJpbPvhaYstBK8d1PrOW0pMLcha3p0+
h4//4eamUkctTPV02nqRpONfyux2qrLsmj8aX8+Or2nXl6/tzEWj2vWxEh9ha67X2w2yA8esh7k+
13PueVAtzMPvH/HebPkLlbsntUACAAA=
EOF

View File

@ -0,0 +1,7 @@
id lc_nullable lc_int_nullable bool_nullable
Nullable(Int64) LowCardinality(Nullable(String)) LowCardinality(Nullable(Int64)) Nullable(UInt8)
1 onee 1 1
2 twoo 2 0
3 three 3 1
4 four 4 1
5 five 5 1

View File

@ -0,0 +1,36 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# ## reading ArrowStream file from python
# import pyarrow as pa
# stream = pa.ipc.open_stream("test.arrows")
# x = stream.read_all()
# print(x)
## writing ArrowStream file from python
# import pyarrow as pa
#data = [
# pa.array([1, 2, 3, 4, 5]),
# pa.array(["onee", "twoo", "three", "four", "five"]).dictionary_encode(),
# pa.array([1, 2, 3, 4, 5]).dictionary_encode(),
# pa.array([True, False, True, True, True])
#]
# batch = pa.record_batch(data, names=['id', 'lc_nullable', 'lc_int_nullable', 'bool_nullable'])
# writer = pa.ipc.new_stream("test4.arrows", batch.schema)
# writer.write_batch(batch)
# writer.close()
# cat data.arrow | gzip | base64
cat <<EOF | base64 --decode | gunzip | $CLICKHOUSE_LOCAL --query='SELECT * FROM table FORMAT TSVWithNamesAndTypes' --input-format=ArrowStream
H4sIAAAAAAAAA5VTsU7DQAz1pZc2giBaFFAGkBgYMjIyMOQD+gHdCqWpiBQlqErhZzowMvIB/Ft4
d+drrqGFYsny+fzO9rN1TdM0L4JoSEqOKKQ++RTgBBGSJEwEjLL6DOwn7B37IWIA9tX7a75TcgKd
VVUxLVdF8TgrMvgpsGuD9yL4Y2jivDmFFk/TvKzbVwFFUAk1PQpqZaJzkVB153xONS4Gvk8DsBni
veEmfFVTxW+cmsemplMv0NGAMV9ODUlmHkPdk8mvPM7vKXvp5Pag+ZyADaEDndP2iLTNh5onY0Oc
zORDnZU8qWO3HDcbaeegdhUDKTky5nvfmU+P9kvcsedOTHTyWJG6D7PbEb+pyiyr36qqfl5m2aJa
LRf5a8b83g/gl2z4nW32HJO7522e9zt4er/wTJzzLl62js1hZ2Z3aPGKTyxcPhfbfHpS9/2wp+/1
jr6DA/pO9tzbPtJOPO3EJ5249d1/JOnnXP7rHzpHi/UYI/+4v2LbmH9I36C0faSwBAAA
EOF

View File

@ -0,0 +1,58 @@
Expression
Header: key String
value String
Join
Header: key String
value String
Expression
Header: key String
ReadFromStorage
Header: dummy UInt8
Expression
Header: s2.key String
value String
Union
Header: key String
value String
Expression
Header: key String
value String
ReadFromStorage
Header: dummy UInt8
Expression
Header: key String
value String
ReadFromStorage
Header: dummy UInt8
Expression
Header: key String
value String
Join
Header: key String
s2.key String
value String
Sorting
Header: key String
Expression
Header: key String
ReadFromStorage
Header: dummy UInt8
Sorting
Header: s2.key String
value String
Expression
Header: s2.key String
value String
Union
Header: key String
value String
Expression
Header: key String
value String
ReadFromStorage
Header: dummy UInt8
Expression
Header: key String
value String
ReadFromStorage
Header: dummy UInt8

View File

@ -0,0 +1,13 @@
SET join_algorithm = 'hash';
EXPLAIN actions=0, description=0, header=1
SELECT * FROM ( SELECT 'key2' AS key ) AS s1
JOIN ( SELECT 'key1' AS key, '1' AS value UNION ALL SELECT 'key2' AS key, '1' AS value ) AS s2
USING (key);
SET join_algorithm = 'full_sorting_merge';
EXPLAIN actions=0, description=0, header=1
SELECT * FROM ( SELECT 'key2' AS key ) AS s1
JOIN ( SELECT 'key1' AS key, '1' AS value UNION ALL SELECT 'key2' AS key, '1' AS value ) AS s2
USING (key);

View File

@ -0,0 +1,12 @@
parseDateTime64BestEffortUS
s a
01-02-1930 12:00:00 1930-01-02 12:00:00.000
12.02.1930 12:00:00 1930-12-02 12:00:00.000
13/02/1930 12:00:00 1930-02-13 12:00:00.000
02/25/1930 12:00:00 1930-02-25 12:00:00.000
parseDateTime64BestEffortUSOrNull
\N
parseDateTime64BestEffortUSOrZero
1970-01-01 00:00:00.000

View File

@ -0,0 +1,22 @@
SELECT 'parseDateTime64BestEffortUS';
SELECT
s,
parseDateTime64BestEffortUS(s,3,'UTC') AS a
FROM
(
SELECT arrayJoin([
'01-02-1930 12:00:00',
'12.02.1930 12:00:00',
'13/02/1930 12:00:00',
'02/25/1930 12:00:00'
]) AS s)
FORMAT PrettySpaceNoEscapes;
SELECT '';
SELECT 'parseDateTime64BestEffortUSOrNull';
SELECT parseDateTime64BestEffortUSOrNull('01/45/1925 16:00:00',3,'UTC');
SELECT 'parseDateTime64BestEffortUSOrZero';
SELECT parseDateTime64BestEffortUSOrZero('01/45/1925 16:00:00',3,'UTC');

View File

@ -0,0 +1 @@
7

View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
${CLICKHOUSE_CURL} -X POST -F 'query=select {p1:UInt8} + {p2:UInt8}' -F 'param_p1=3' -F 'param_p2=4' "${CLICKHOUSE_URL}"

View File

@ -25,13 +25,18 @@ function try_sync_replicas()
done
done
i=0
for t in "${tables_arr[@]}"
do
# The size of log may be big, so increase timeout.
$CLICKHOUSE_CLIENT --receive_timeout 400 -q "SYSTEM SYNC REPLICA $t" || $CLICKHOUSE_CLIENT -q \
"select 'sync failed, queue:', * from system.replication_queue where database=currentDatabase() and table='$t' order by database, table, node_name" &
$CLICKHOUSE_CLIENT --receive_timeout 300 -q "SYSTEM SYNC REPLICA $t" || ($CLICKHOUSE_CLIENT -q \
"select 'sync failed, queue:', * from system.replication_queue where database=currentDatabase() and table='$t' order by database, table, node_name" && exit 1) &
pids[${i}]=$!
i=$((i + 1))
done
for pid in ${pids[*]}; do
wait $pid || (echo "Failed to sync some replicas" && exit 1)
done
wait
echo "Replication did not hang: synced all replicas of $table_name_prefix"
}
@ -73,7 +78,7 @@ function check_replication_consistency()
# SYNC REPLICA is not enough if some MUTATE_PARTs are not assigned yet
wait_for_all_mutations "$table_name_prefix%"
try_sync_replicas "$table_name_prefix"
try_sync_replicas "$table_name_prefix" || exit 1
res=$($CLICKHOUSE_CLIENT -q \
"SELECT