Merge branch 'master' of github.com:ClickHouse/ClickHouse into urlCluster

This commit is contained in:
avogar 2023-05-22 19:19:57 +00:00
commit 88e4c93abc
43 changed files with 651 additions and 345 deletions


@ -12,18 +12,18 @@ This is an experimental feature that is currently in development and is not read
Performs stemming on a given word.
**Syntax**
### Syntax
``` sql
stem('language', word)
```
**Arguments**
### Arguments
- `language` — Language which rules will be applied. Must be in lowercase. [String](../../sql-reference/data-types/string.md#string).
- `language` — Language whose rules will be applied. Use the two-letter [ISO 639-1 code](https://en.wikipedia.org/wiki/List_of_ISO_639-1_codes).
- `word` — Word that needs to be stemmed. Must be in lowercase. [String](../../sql-reference/data-types/string.md#string).
**Examples**
### Examples
Query:
@ -38,23 +38,58 @@ Result:
│ ['I','think','it','is','a','bless','in','disguis'] │
└────────────────────────────────────────────────────┘
```
### Supported languages for stem()
:::note
The stem() function uses the [Snowball stemming](https://snowballstem.org/) library; see the Snowball website for the up-to-date list of supported languages.
:::
- Arabic
- Armenian
- Basque
- Catalan
- Danish
- Dutch
- English
- Finnish
- French
- German
- Greek
- Hindi
- Hungarian
- Indonesian
- Irish
- Italian
- Lithuanian
- Nepali
- Norwegian
- Porter
- Portuguese
- Romanian
- Russian
- Serbian
- Spanish
- Swedish
- Tamil
- Turkish
- Yiddish
## lemmatize
Performs lemmatization on a given word. Needs dictionaries to operate, which can be obtained [here](https://github.com/vpodpecan/lemmagen3/tree/master/src/lemmagen3/models).
**Syntax**
### Syntax
``` sql
lemmatize('language', word)
```
**Arguments**
### Arguments
- `language` — Language whose rules will be applied. [String](../../sql-reference/data-types/string.md#string).
- `word` — Word that needs to be lemmatized. Must be lowercase. [String](../../sql-reference/data-types/string.md#string).
**Examples**
### Examples
Query:
@ -70,12 +105,18 @@ Result:
└─────────────────────┘
```
Configuration:
### Configuration
This configuration specifies that the dictionary `en.bin` should be used for lemmatization of English (`en`) words. The `.bin` files can be downloaded from
[here](https://github.com/vpodpecan/lemmagen3/tree/master/src/lemmagen3/models).
``` xml
<lemmatizers>
<lemmatizer>
<!-- highlight-start -->
<lang>en</lang>
<path>en.bin</path>
<!-- highlight-end -->
</lemmatizer>
</lemmatizers>
```
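As an illustration of the shape of this fragment, here is a minimal Python sketch (not ClickHouse code; the `load_lemmatizer_paths` helper is hypothetical) that extracts the language-to-dictionary mapping the server would read from such a config:

```python
# Sketch: reading a <lemmatizers> fragment like the one above with Python's
# stdlib XML parser. This only illustrates the config's shape; ClickHouse
# itself parses its server configuration in C++.
import xml.etree.ElementTree as ET

CONFIG = """
<lemmatizers>
    <lemmatizer>
        <lang>en</lang>
        <path>en.bin</path>
    </lemmatizer>
</lemmatizers>
"""

def load_lemmatizer_paths(xml_text):
    """Return a {language_code: dictionary_path} mapping."""
    root = ET.fromstring(xml_text)
    return {
        lem.findtext("lang"): lem.findtext("path")
        for lem in root.findall("lemmatizer")
    }

print(load_lemmatizer_paths(CONFIG))  # {'en': 'en.bin'}
```

Multiple `<lemmatizer>` entries can coexist, one per language code.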
@ -88,18 +129,18 @@ With the `plain` extension type we need to provide a path to a simple text file,
With the `wordnet` extension type we need to provide a path to a directory with WordNet thesaurus in it. Thesaurus must contain a WordNet sense index.
**Syntax**
### Syntax
``` sql
synonyms('extension_name', word)
```
**Arguments**
### Arguments
- `extension_name` — Name of the extension in which search will be performed. [String](../../sql-reference/data-types/string.md#string).
- `word` — Word that will be searched in extension. [String](../../sql-reference/data-types/string.md#string).
**Examples**
### Examples
Query:
@ -115,7 +156,7 @@ Result:
└──────────────────────────────────────────┘
```
Configuration:
### Configuration
``` xml
<synonyms_extensions>
<extension>
@ -137,17 +178,17 @@ Detects the language of the UTF8-encoded input string. The function uses the [CL
The `detectLanguage` function works best when the input string contains at least 200 characters.
**Syntax**
### Syntax
``` sql
detectLanguage('text_to_be_analyzed')
```
**Arguments**
### Arguments
- `text_to_be_analyzed` — A string of text (one or more sentences) to analyze. [String](../../sql-reference/data-types/string.md#string).
**Returned value**
### Returned value
- The 2-letter ISO code of the detected language
@ -156,7 +197,7 @@ Other possible results:
- `un` = unknown, cannot detect any language.
- `other` = the detected language does not have a 2-letter code.
**Examples**
### Examples
Query:
@ -175,22 +216,22 @@ fr
Similar to the `detectLanguage` function, but `detectLanguageMixed` returns a `Map` of 2-letter language codes mapped to the percentage of the text in each language.
**Syntax**
### Syntax
``` sql
detectLanguageMixed('text_to_be_analyzed')
```
**Arguments**
### Arguments
- `text_to_be_analyzed` — A string of text (one or more sentences) to analyze. [String](../../sql-reference/data-types/string.md#string).
**Returned value**
### Returned value
- `Map(String, Float32)`: The keys are 2-letter ISO codes and the values are a percentage of text found for that language
**Examples**
### Examples
Query:
@ -211,17 +252,17 @@ Result:
Similar to the `detectLanguage` function, except the `detectLanguageUnknown` function works with non-UTF8-encoded strings. Prefer this version when your character set is UTF-16 or UTF-32.
**Syntax**
### Syntax
``` sql
detectLanguageUnknown('text_to_be_analyzed')
```
**Arguments**
### Arguments
- `text_to_be_analyzed` — A string of text (one or more sentences) to analyze. [String](../../sql-reference/data-types/string.md#string).
**Returned value**
### Returned value
- The 2-letter ISO code of the detected language
@ -230,7 +271,7 @@ Other possible results:
- `un` = unknown, cannot detect any language.
- `other` = the detected language does not have a 2-letter code.
**Examples**
### Examples
Query:
@ -251,21 +292,21 @@ Result:
The `detectCharset` function detects the character set of the non-UTF8-encoded input string.
**Syntax**
### Syntax
``` sql
detectCharset('text_to_be_analyzed')
```
**Arguments**
### Arguments
- `text_to_be_analyzed` — A string of text (one or more sentences) to analyze. [String](../../sql-reference/data-types/string.md#string).
**Returned value**
### Returned value
- A `String` containing the code of the detected character set
**Examples**
### Examples
Query:


@ -46,3 +46,12 @@ SELECT * FROM test_table;
Patterns in curly brackets `{ }` are used to generate a set of shards or to specify failover addresses. For supported pattern types and examples, see the description of the [remote](remote.md#globs-in-addresses) function.
Character `|` inside patterns is used to specify failover addresses. They are iterated in the same order as listed in the pattern. The number of generated addresses is limited by [glob_expansion_max_elements](../../operations/settings/settings.md#glob_expansion_max_elements) setting.
## Virtual Columns
- `_path` — Path to the `URL`.
- `_file` — Resource name of the `URL`.
**See Also**
- [Virtual columns](/docs/en/engines/table-engines/index.md#table_engines-virtual_columns)


@ -46,3 +46,12 @@ SELECT * FROM test_table;
Patterns in curly brackets `{ }` are used to generate a set of shards or to specify failover addresses. For supported pattern types and examples, see the description of the [remote](remote.md#globs-in-addresses) function.
The `|` character inside patterns is used to specify failover addresses. They are iterated in the same order as listed in the pattern. The number of generated addresses is limited by the [glob_expansion_max_elements](../../operations/settings/settings.md#glob_expansion_max_elements) setting.
## Virtual Columns
- `_path` — Path to the `URL`.
- `_file` — Resource name of the `URL`.
**See Also**
- [Virtual columns](index.md#table_engines-virtual_columns)


@ -41,3 +41,11 @@ CREATE TABLE test_table (column1 String, column2 UInt32) ENGINE=Memory;
INSERT INTO FUNCTION url('http://127.0.0.1:8123/?query=INSERT+INTO+test_table+FORMAT+CSV', 'CSV', 'column1 String, column2 UInt32') VALUES ('http interface', 42);
SELECT * FROM test_table;
```
## Virtual Columns {#virtual-columns}
- `_path` — Path to the `URL`.
- `_file` — Resource name.
**See Also**
- [Virtual columns](https://clickhouse.com/docs/en/operations/table_engines/#table_engines-virtual_columns)


@ -69,6 +69,7 @@ if (BUILD_STANDALONE_KEEPER)
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Server/ProtocolServerAdapter.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Server/PrometheusRequestHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Server/PrometheusMetricsWriter.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Server/waitServersToFinish.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Server/HTTPRequestHandlerFactoryMain.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Server/HTTP/HTTPServer.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Server/HTTP/ReadHeaders.cpp


@ -11,6 +11,9 @@
#include <Core/ServerUUID.h>
#include <Common/logger_useful.h>
#include <Common/ErrorHandlers.h>
#include <Common/assertProcessUserMatchesDataOwner.h>
#include <Common/makeSocketAddress.h>
#include <Server/waitServersToFinish.h>
#include <base/scope_guard.h>
#include <base/safeExit.h>
#include <Poco/Net/NetException.h>
@ -75,92 +78,9 @@ namespace ErrorCodes
extern const int NO_ELEMENTS_IN_CONFIG;
extern const int SUPPORT_IS_DISABLED;
extern const int NETWORK_ERROR;
extern const int MISMATCHING_USERS_FOR_PROCESS_AND_DATA;
extern const int FAILED_TO_GETPWUID;
extern const int LOGICAL_ERROR;
}
namespace
{
size_t waitServersToFinish(std::vector<DB::ProtocolServerAdapter> & servers, size_t seconds_to_wait)
{
const size_t sleep_max_ms = 1000 * seconds_to_wait;
const size_t sleep_one_ms = 100;
size_t sleep_current_ms = 0;
size_t current_connections = 0;
for (;;)
{
current_connections = 0;
for (auto & server : servers)
{
server.stop();
current_connections += server.currentConnections();
}
if (!current_connections)
break;
sleep_current_ms += sleep_one_ms;
if (sleep_current_ms < sleep_max_ms)
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_one_ms));
else
break;
}
return current_connections;
}
Poco::Net::SocketAddress makeSocketAddress(const std::string & host, UInt16 port, Poco::Logger * log)
{
Poco::Net::SocketAddress socket_address;
try
{
socket_address = Poco::Net::SocketAddress(host, port);
}
catch (const Poco::Net::DNSException & e)
{
const auto code = e.code();
if (code == EAI_FAMILY
#if defined(EAI_ADDRFAMILY)
|| code == EAI_ADDRFAMILY
#endif
)
{
LOG_ERROR(log, "Cannot resolve listen_host ({}), error {}: {}. "
"If it is an IPv6 address and your host has disabled IPv6, then consider to "
"specify IPv4 address to listen in <listen_host> element of configuration "
"file. Example: <listen_host>0.0.0.0</listen_host>",
host, e.code(), e.message());
}
throw;
}
return socket_address;
}
std::string getUserName(uid_t user_id)
{
/// Try to convert user id into user name.
auto buffer_size = sysconf(_SC_GETPW_R_SIZE_MAX);
if (buffer_size <= 0)
buffer_size = 1024;
std::string buffer;
buffer.reserve(buffer_size);
struct passwd passwd_entry;
struct passwd * result = nullptr;
const auto error = getpwuid_r(user_id, &passwd_entry, buffer.data(), buffer_size, &result);
if (error)
throwFromErrno("Failed to find user name for " + toString(user_id), ErrorCodes::FAILED_TO_GETPWUID, error);
else if (result)
return result->pw_name;
return toString(user_id);
}
}
Poco::Net::SocketAddress Keeper::socketBindListen(Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port, [[maybe_unused]] bool secure) const
{
auto address = makeSocketAddress(host, port, &logger());
@ -364,24 +284,7 @@ try
std::filesystem::create_directories(path);
/// Check that the process user id matches the owner of the data.
const auto effective_user_id = geteuid();
struct stat statbuf;
if (stat(path.c_str(), &statbuf) == 0 && effective_user_id != statbuf.st_uid)
{
const auto effective_user = getUserName(effective_user_id);
const auto data_owner = getUserName(statbuf.st_uid);
std::string message = "Effective user of the process (" + effective_user +
") does not match the owner of the data (" + data_owner + ").";
if (effective_user_id == 0)
{
message += " Run under 'sudo -u " + data_owner + "'.";
throw Exception::createDeprecated(message, ErrorCodes::MISMATCHING_USERS_FOR_PROCESS_AND_DATA);
}
else
{
LOG_WARNING(log, fmt::runtime(message));
}
}
assertProcessUserMatchesDataOwner(path, [&](const std::string & message){ LOG_WARNING(log, fmt::runtime(message)); });
DB::ServerUUID::load(path + "/uuid", log);


@ -39,6 +39,9 @@
#include <Common/remapExecutable.h>
#include <Common/TLDListsHolder.h>
#include <Common/Config/AbstractConfigurationComparison.h>
#include <Common/assertProcessUserMatchesDataOwner.h>
#include <Common/makeSocketAddress.h>
#include <Server/waitServersToFinish.h>
#include <Core/ServerUUID.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromFile.h>
@ -200,40 +203,6 @@ int mainEntryClickHouseServer(int argc, char ** argv)
}
}
namespace
{
size_t waitServersToFinish(std::vector<DB::ProtocolServerAdapter> & servers, size_t seconds_to_wait)
{
const size_t sleep_max_ms = 1000 * seconds_to_wait;
const size_t sleep_one_ms = 100;
size_t sleep_current_ms = 0;
size_t current_connections = 0;
for (;;)
{
current_connections = 0;
for (auto & server : servers)
{
server.stop();
current_connections += server.currentConnections();
}
if (!current_connections)
break;
sleep_current_ms += sleep_one_ms;
if (sleep_current_ms < sleep_max_ms)
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_one_ms));
else
break;
}
return current_connections;
}
}
namespace DB
{
@ -244,8 +213,6 @@ namespace ErrorCodes
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
extern const int INVALID_CONFIG_PARAMETER;
extern const int FAILED_TO_GETPWUID;
extern const int MISMATCHING_USERS_FOR_PROCESS_AND_DATA;
extern const int NETWORK_ERROR;
extern const int CORRUPTED_DATA;
}
@ -261,54 +228,6 @@ static std::string getCanonicalPath(std::string && path)
return std::move(path);
}
static std::string getUserName(uid_t user_id)
{
/// Try to convert user id into user name.
auto buffer_size = sysconf(_SC_GETPW_R_SIZE_MAX);
if (buffer_size <= 0)
buffer_size = 1024;
std::string buffer;
buffer.reserve(buffer_size);
struct passwd passwd_entry;
struct passwd * result = nullptr;
const auto error = getpwuid_r(user_id, &passwd_entry, buffer.data(), buffer_size, &result);
if (error)
throwFromErrno("Failed to find user name for " + toString(user_id), ErrorCodes::FAILED_TO_GETPWUID, error);
else if (result)
return result->pw_name;
return toString(user_id);
}
Poco::Net::SocketAddress makeSocketAddress(const std::string & host, UInt16 port, Poco::Logger * log)
{
Poco::Net::SocketAddress socket_address;
try
{
socket_address = Poco::Net::SocketAddress(host, port);
}
catch (const Poco::Net::DNSException & e)
{
const auto code = e.code();
if (code == EAI_FAMILY
#if defined(EAI_ADDRFAMILY)
|| code == EAI_ADDRFAMILY
#endif
)
{
LOG_ERROR(log, "Cannot resolve listen_host ({}), error {}: {}. "
"If it is an IPv6 address and your host has disabled IPv6, then consider to "
"specify IPv4 address to listen in <listen_host> element of configuration "
"file. Example: <listen_host>0.0.0.0</listen_host>",
host, e.code(), e.message());
}
throw;
}
return socket_address;
}
Poco::Net::SocketAddress Server::socketBindListen(
const Poco::Util::AbstractConfiguration & config,
Poco::Net::ServerSocket & socket,
@ -959,24 +878,7 @@ try
std::string default_database = server_settings.default_database.toString();
/// Check that the process user id matches the owner of the data.
const auto effective_user_id = geteuid();
struct stat statbuf;
if (stat(path_str.c_str(), &statbuf) == 0 && effective_user_id != statbuf.st_uid)
{
const auto effective_user = getUserName(effective_user_id);
const auto data_owner = getUserName(statbuf.st_uid);
std::string message = "Effective user of the process (" + effective_user +
") does not match the owner of the data (" + data_owner + ").";
if (effective_user_id == 0)
{
message += " Run under 'sudo -u " + data_owner + "'.";
throw Exception::createDeprecated(message, ErrorCodes::MISMATCHING_USERS_FOR_PROCESS_AND_DATA);
}
else
{
global_context->addWarningMessage(message);
}
}
assertProcessUserMatchesDataOwner(path_str, [&](const std::string & message){ global_context->addWarningMessage(message); });
global_context->setPath(path_str);


@ -5,7 +5,8 @@
#include <Common/Exception.h>
#include <base/hex.h>
#include <Core/Settings.h>
#include <IO/Operators.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Common/AsyncTaskExecutor.h>
@ -249,26 +250,26 @@ String TracingContext::composeTraceparentHeader() const
void TracingContext::deserialize(ReadBuffer & buf)
{
buf >> this->trace_id
>> "\n"
>> this->span_id
>> "\n"
>> this->tracestate
>> "\n"
>> this->trace_flags
>> "\n";
readUUIDText(trace_id, buf);
assertChar('\n', buf);
readIntText(span_id, buf);
assertChar('\n', buf);
readEscapedString(tracestate, buf);
assertChar('\n', buf);
readIntText(trace_flags, buf);
assertChar('\n', buf);
}
void TracingContext::serialize(WriteBuffer & buf) const
{
buf << this->trace_id
<< "\n"
<< this->span_id
<< "\n"
<< this->tracestate
<< "\n"
<< this->trace_flags
<< "\n";
writeUUIDText(trace_id, buf);
writeChar('\n', buf);
writeIntText(span_id, buf);
writeChar('\n', buf);
writeEscapedString(tracestate, buf);
writeChar('\n', buf);
writeIntText(trace_flags, buf);
writeChar('\n', buf);
}
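The rewritten (de)serialization above amounts to four newline-terminated fields: trace_id, span_id, tracestate, trace_flags. A hedged Python sketch of that wire format (function names are illustrative, not ClickHouse API; the escaping that `readEscapedString`/`writeEscapedString` apply to `tracestate` is omitted):

```python
# Illustrative round-trip of the tracing-context wire format used above:
# four fields, each terminated by '\n'. span_id and trace_flags are integers
# in the C++ code; trace_id is a UUID rendered as text.
def serialize_tracing_context(trace_id, span_id, tracestate, trace_flags):
    return f"{trace_id}\n{span_id}\n{tracestate}\n{trace_flags}\n"

def deserialize_tracing_context(data):
    trace_id, span_id, tracestate, trace_flags = data.split("\n")[:4]
    return trace_id, int(span_id), tracestate, int(trace_flags)

blob = serialize_tracing_context(
    "00000000-0000-0000-0000-000000000001", 42, "vendor=1", 1)
assert deserialize_tracing_context(blob) == (
    "00000000-0000-0000-0000-000000000001", 42, "vendor=1", 1)
```

The point of the C++ change is that typed readers (`readUUIDText`, `readIntText`, `assertChar`) validate each field, whereas the old stream operators did not.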
const TracingContextOnThread & CurrentContext()


@ -0,0 +1,66 @@
#include <Common/assertProcessUserMatchesDataOwner.h>
#include <Common/logger_useful.h>
#include <Common/Exception.h>
#include <sys/stat.h>
#include <unistd.h>
#include <pwd.h>
namespace DB
{
namespace ErrorCodes
{
extern const int FAILED_TO_GETPWUID;
extern const int MISMATCHING_USERS_FOR_PROCESS_AND_DATA;
}
namespace
{
std::string getUserName(uid_t user_id)
{
/// Try to convert user id into user name.
auto buffer_size = sysconf(_SC_GETPW_R_SIZE_MAX);
if (buffer_size <= 0)
buffer_size = 1024;
std::string buffer;
buffer.reserve(buffer_size);
struct passwd passwd_entry;
struct passwd * result = nullptr;
const auto error = getpwuid_r(user_id, &passwd_entry, buffer.data(), buffer_size, &result);
if (error)
throwFromErrno("Failed to find user name for " + std::to_string(user_id), ErrorCodes::FAILED_TO_GETPWUID, error);
else if (result)
return result->pw_name;
return std::to_string(user_id);
}
}
void assertProcessUserMatchesDataOwner(const std::string & path, std::function<void(const std::string &)> on_warning)
{
/// Check that the process user id matches the owner of the data.
const auto effective_user_id = geteuid();
struct stat statbuf;
if (stat(path.c_str(), &statbuf) == 0 && effective_user_id != statbuf.st_uid)
{
const auto effective_user = getUserName(effective_user_id);
const auto data_owner = getUserName(statbuf.st_uid);
std::string message = fmt::format(
"Effective user of the process ({}) does not match the owner of the data ({}).",
effective_user, data_owner);
if (effective_user_id == 0)
{
message += fmt::format(" Run under 'sudo -u {}'.", data_owner);
throw Exception(ErrorCodes::MISMATCHING_USERS_FOR_PROCESS_AND_DATA, "{}", message);
}
else
{
on_warning(message);
}
}
}
}
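A minimal Python sketch of the same check (illustrative only; the function and error type here are assumptions, not ClickHouse API): compare the process's effective user with the owner of the data directory, fail hard when running as root, and otherwise emit a warning via a callback, mirroring the C++ helper above.

```python
# Sketch of assertProcessUserMatchesDataOwner's logic in Python.
import os
import pwd

def user_name(uid):
    """Resolve a uid to a name, falling back to the numeric id (as the
    C++ getUserName does when getpwuid_r finds no entry)."""
    try:
        return pwd.getpwuid(uid).pw_name
    except KeyError:
        return str(uid)

def assert_process_user_matches_data_owner(path, on_warning):
    effective_uid = os.geteuid()
    owner_uid = os.stat(path).st_uid
    if effective_uid == owner_uid:
        return
    message = (f"Effective user of the process ({user_name(effective_uid)}) "
               f"does not match the owner of the data ({user_name(owner_uid)}).")
    if effective_uid == 0:
        # Root mismatches are a hard error, with a hint to drop privileges.
        raise PermissionError(message + f" Run under 'sudo -u {user_name(owner_uid)}'.")
    on_warning(message)
```

Factoring this into a shared helper is what lets the server and Keeper pass different `on_warning` behaviors (logging vs. adding a context warning).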


@ -0,0 +1,10 @@
#pragma once
#include <string>
namespace DB
{
void assertProcessUserMatchesDataOwner(
const std::string & path, std::function<void(const std::string &)> on_warning);
}


@ -0,0 +1,36 @@
#include <Common/makeSocketAddress.h>
#include <Common/logger_useful.h>
#include <Poco/Net/NetException.h>
namespace DB
{
Poco::Net::SocketAddress makeSocketAddress(const std::string & host, uint16_t port, Poco::Logger * log)
{
Poco::Net::SocketAddress socket_address;
try
{
socket_address = Poco::Net::SocketAddress(host, port);
}
catch (const Poco::Net::DNSException & e)
{
const auto code = e.code();
if (code == EAI_FAMILY
#if defined(EAI_ADDRFAMILY)
|| code == EAI_ADDRFAMILY
#endif
)
{
LOG_ERROR(log, "Cannot resolve listen_host ({}), error {}: {}. "
"If it is an IPv6 address and your host has disabled IPv6, then consider to "
"specify IPv4 address to listen in <listen_host> element of configuration "
"file. Example: <listen_host>0.0.0.0</listen_host>",
host, e.code(), e.message());
}
throw;
}
return socket_address;
}
}


@ -0,0 +1,11 @@
#pragma once
#include <Poco/Net/SocketAddress.h>
namespace Poco { class Logger; }
namespace DB
{
Poco::Net::SocketAddress makeSocketAddress(const std::string & host, uint16_t port, Poco::Logger * log);
}


@ -64,7 +64,8 @@ static bool parseNumber(const String & description, size_t l, size_t r, size_t &
* abc{1..9}de{f,g,h} - is a direct product, 27 shards.
* abc{1..9}de{0|1} - is a direct product, 9 shards, in each 2 replicas.
*/
std::vector<String> parseRemoteDescription(const String & description, size_t l, size_t r, char separator, size_t max_addresses)
std::vector<String>
parseRemoteDescription(const String & description, size_t l, size_t r, char separator, size_t max_addresses, const String & func_name)
{
std::vector<String> res;
std::vector<String> cur;
@ -97,28 +98,41 @@ std::vector<String> parseRemoteDescription(const String & description, size_t l,
if (cnt == 0) break;
}
if (cnt != 0)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function 'remote': incorrect brace sequence in first argument");
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function '{}': incorrect brace sequence in first argument", func_name);
/// The presence of a dot - numeric interval
if (last_dot != -1)
{
size_t left, right;
if (description[last_dot - 1] != '.')
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function 'remote': incorrect argument in braces (only one dot): {}",
description.substr(i, m - i + 1));
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Table function '{}': incorrect argument in braces (only one dot): {}",
func_name,
description.substr(i, m - i + 1));
if (!parseNumber(description, i + 1, last_dot - 1, left))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function 'remote': "
"incorrect argument in braces (Incorrect left number): {}",
description.substr(i, m - i + 1));
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Table function '{}': "
"incorrect argument in braces (Incorrect left number): {}",
func_name,
description.substr(i, m - i + 1));
if (!parseNumber(description, last_dot + 1, m, right))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function 'remote': "
"incorrect argument in braces (Incorrect right number): {}",
description.substr(i, m - i + 1));
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Table function '{}': "
"incorrect argument in braces (Incorrect right number): {}",
func_name,
description.substr(i, m - i + 1));
if (left > right)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function 'remote': "
"incorrect argument in braces (left number is greater then right): {}",
description.substr(i, m - i + 1));
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Table function '{}': "
"incorrect argument in braces (left number is greater than right): {}",
func_name,
description.substr(i, m - i + 1));
if (right - left + 1 > max_addresses)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function 'remote': first argument generates too many result addresses");
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "Table function '{}': first argument generates too many result addresses", func_name);
bool add_leading_zeroes = false;
size_t len = last_dot - 1 - (i + 1);
/// If the left and right borders have equal numbers, then you must add leading zeros.
@ -161,7 +175,7 @@ std::vector<String> parseRemoteDescription(const String & description, size_t l,
res.insert(res.end(), cur.begin(), cur.end());
if (res.size() > max_addresses)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function 'remote': first argument generates too many result addresses");
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function '{}': first argument generates too many result addresses", func_name);
return res;
}
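The "direct product" semantics described in the comment above can be sketched in Python. This simplified version (a hypothetical `expand_remote_description`, not the C++ implementation) handles numeric ranges `{1..3}` and alternative sets `{a,b}`, omitting failover groups `{a|b}`, leading-zero handling, and the `max_addresses` limit:

```python
# Simplified sketch of address-pattern expansion: each brace group expands
# to a list of choices, and the result is the direct product of all groups.
import itertools
import re

def expand_remote_description(description):
    parts = []
    for token in re.split(r"(\{[^{}]*\})", description):
        if token.startswith("{") and token.endswith("}"):
            inner = token[1:-1]
            range_match = re.fullmatch(r"(\d+)\.\.(\d+)", inner)
            if range_match:
                # {1..3} -> numeric interval
                left, right = map(int, range_match.groups())
                parts.append([str(n) for n in range(left, right + 1)])
            else:
                # {a,b,c} -> set of alternatives
                parts.append(inner.split(","))
        elif token:
            parts.append([token])
    return ["".join(combo) for combo in itertools.product(*parts)]

# abc{1..3}de{f,g} -> 3 * 2 = 6 addresses, matching the "direct product" rule.
print(expand_remote_description("abc{1..3}de{f,g}"))
```

The point of the diff above is orthogonal to this expansion logic: error messages now report the actual table function name (`url`, `urlCluster`, ...) instead of hard-coding `'remote'`.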


@ -15,7 +15,8 @@ namespace DB
* abc{1..9}de{f,g,h} - is a direct product, 27 shards.
* abc{1..9}de{0|1} - is a direct product, 9 shards, in each 2 replicas.
*/
std::vector<String> parseRemoteDescription(const String & description, size_t l, size_t r, char separator, size_t max_addresses);
std::vector<String> parseRemoteDescription(
const String & description, size_t l, size_t r, char separator, size_t max_addresses, const String & func_name = "remote");
/// Parse remote description for external database (MySQL or PostgreSQL).
std::vector<std::pair<String, uint16_t>> parseRemoteDescriptionForExternalDatabase(const String & description, size_t max_addresses, UInt16 default_port);


@ -176,7 +176,7 @@ StoragePtr DatabasePostgreSQL::tryGetTable(const String & table_name, ContextPtr
}
StoragePtr DatabasePostgreSQL::fetchTable(const String & table_name, ContextPtr, bool table_checked) const
StoragePtr DatabasePostgreSQL::fetchTable(const String & table_name, ContextPtr context_, bool table_checked) const
{
if (!cache_tables || !cached_tables.contains(table_name))
{
@ -191,7 +191,8 @@ StoragePtr DatabasePostgreSQL::fetchTable(const String & table_name, ContextPtr,
auto storage = std::make_shared<StoragePostgreSQL>(
StorageID(database_name, table_name), pool, table_name,
ColumnsDescription{columns_info->columns}, ConstraintsDescription{}, String{}, configuration.schema, configuration.on_conflict);
ColumnsDescription{columns_info->columns}, ConstraintsDescription{}, String{},
context_, configuration.schema, configuration.on_conflict);
if (cache_tables)
{


@ -0,0 +1,36 @@
#include <Server/waitServersToFinish.h>
#include <Server/ProtocolServerAdapter.h>
#include <base/sleep.h>
namespace DB
{
size_t waitServersToFinish(std::vector<DB::ProtocolServerAdapter> & servers, size_t seconds_to_wait)
{
const size_t sleep_max_ms = 1000 * seconds_to_wait;
const size_t sleep_one_ms = 100;
size_t sleep_current_ms = 0;
size_t current_connections = 0;
for (;;)
{
current_connections = 0;
for (auto & server : servers)
{
server.stop();
current_connections += server.currentConnections();
}
if (!current_connections)
break;
sleep_current_ms += sleep_one_ms;
if (sleep_current_ms < sleep_max_ms)
sleepForMilliseconds(sleep_one_ms);
else
break;
}
return current_connections;
}
}


@ -0,0 +1,10 @@
#pragma once
#include <Core/Types.h>
namespace DB
{
class ProtocolServerAdapter;
size_t waitServersToFinish(std::vector<ProtocolServerAdapter> & servers, size_t seconds_to_wait);
}


@ -18,6 +18,7 @@
#include <Common/logger_useful.h>
#include <Common/parseAddress.h>
#include <Common/NamedCollections/NamedCollections.h>
#include <Storages/MeiliSearch/MeiliSearchColumnDescriptionFetcher.h>
namespace DB
{
@ -37,12 +38,27 @@ StorageMeiliSearch::StorageMeiliSearch(
: IStorage(table_id), config{config_}, log(&Poco::Logger::get("StorageMeiliSearch (" + table_id.table_name + ")"))
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
if (columns_.empty())
{
auto columns = getTableStructureFromData(config);
storage_metadata.setColumns(columns);
}
else
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
}
ColumnsDescription StorageMeiliSearch::getTableStructureFromData(const MeiliSearchConfiguration & config_)
{
MeiliSearchColumnDescriptionFetcher fetcher(config_);
fetcher.addParam(doubleQuoteString("limit"), "1");
return fetcher.fetchColumnsDescription();
}
String convertASTtoStr(ASTPtr ptr)
{
WriteBufferFromOwnString out;
@ -175,6 +191,7 @@ void registerStorageMeiliSearch(StorageFactory & factory)
return std::make_shared<StorageMeiliSearch>(args.table_id, config, args.columns, args.constraints, args.comment);
},
{
.supports_schema_inference = true,
.source_access_type = AccessType::MEILISEARCH,
});
}


@ -28,7 +28,9 @@ public:
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context) override;
MeiliSearchConfiguration static getConfiguration(ASTs engine_args, ContextPtr context);
static MeiliSearchConfiguration getConfiguration(ASTs engine_args, ContextPtr context);
static ColumnsDescription getTableStructureFromData(const MeiliSearchConfiguration & config_);
private:
MeiliSearchConfiguration config;


@ -170,7 +170,7 @@ void registerStorageExternalDistributed(StorageFactory & factory)
POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES,
settings.postgresql_connection_pool_auto_close_connection);
shards.insert(std::make_shared<StoragePostgreSQL>(
args.table_id, std::move(pool), configuration.table, args.columns, args.constraints, String{}));
args.table_id, std::move(pool), configuration.table, args.columns, args.constraints, String{}, context));
}
}
#endif


@ -21,6 +21,7 @@
#include <Common/parseRemoteDescription.h>
#include <Common/logger_useful.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <Databases/MySQL/FetchTablesColumnsList.h>
namespace DB
@ -30,6 +31,7 @@ namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_TABLE;
}
static String backQuoteMySQL(const String & x)
@ -65,12 +67,36 @@ StorageMySQL::StorageMySQL(
, log(&Poco::Logger::get("StorageMySQL (" + table_id_.table_name + ")"))
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
if (columns_.empty())
{
auto columns = getTableStructureFromData(*pool, remote_database_name, remote_table_name, context_);
storage_metadata.setColumns(columns);
}
else
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
}
ColumnsDescription StorageMySQL::getTableStructureFromData(
mysqlxx::PoolWithFailover & pool_,
const String & database,
const String & table,
const ContextPtr & context_)
{
const auto & settings = context_->getSettingsRef();
const auto tables_and_columns = fetchTablesColumnsList(pool_, database, {table}, settings, settings.mysql_datatypes_support_level);
const auto columns = tables_and_columns.find(table);
if (columns == tables_and_columns.end())
throw Exception(ErrorCodes::UNKNOWN_TABLE, "MySQL table {} doesn't exist.",
    (database.empty() ? backQuote(table) : backQuote(database) + "." + backQuote(table)));
return columns->second;
}
Pipe StorageMySQL::read(
const Names & column_names_,
@ -354,6 +380,7 @@ void registerStorageMySQL(StorageFactory & factory)
},
{
.supports_settings = true,
.supports_schema_inference = true,
.source_access_type = AccessType::MYSQL,
});
}


@ -75,6 +75,12 @@ public:
const NamedCollection & named_collection, MySQLSettings & storage_settings,
ContextPtr context_, bool require_table = true);
static ColumnsDescription getTableStructureFromData(
mysqlxx::PoolWithFailover & pool_,
const String & database,
const String & table,
const ContextPtr & context_);
private:
friend class StorageMySQLSink;


@ -43,6 +43,8 @@
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <Databases/PostgreSQL/fetchPostgreSQLTableStructure.h>
namespace DB
{
@ -51,6 +53,7 @@ namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
}
StoragePostgreSQL::StoragePostgreSQL(
@ -60,6 +63,7 @@ StoragePostgreSQL::StoragePostgreSQL(
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
const String & remote_table_schema_,
const String & on_conflict_)
: IStorage(table_id_)
@ -70,12 +74,36 @@ StoragePostgreSQL::StoragePostgreSQL(
, log(&Poco::Logger::get("StoragePostgreSQL (" + table_id_.table_name + ")"))
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
if (columns_.empty())
{
auto columns = getTableStructureFromData(pool, remote_table_name, remote_table_schema, context_);
storage_metadata.setColumns(columns);
}
else
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
}
ColumnsDescription StoragePostgreSQL::getTableStructureFromData(
const postgres::PoolWithFailoverPtr & pool_,
const String & table,
const String & schema,
const ContextPtr & context_)
{
const bool use_nulls = context_->getSettingsRef().external_table_functions_use_nulls;
auto connection_holder = pool_->get();
auto columns_info = fetchPostgreSQLTableStructure(
connection_holder->get(), table, schema, use_nulls).physical_columns;
if (!columns_info)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table structure not returned");
return ColumnsDescription{columns_info->columns};
}
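The constructor change and the new `getTableStructureFromData` helper follow one fallback rule: use the user-declared columns when present, otherwise fetch the structure from the remote table. A minimal Python sketch of that pattern (hypothetical names, not the ClickHouse API):

```python
from typing import Callable, List, Tuple

Column = Tuple[str, str]  # (name, type) pair, a stand-in for ColumnsDescription

def resolve_columns(declared: List[Column],
                    fetch_remote: Callable[[], List[Column]]) -> List[Column]:
    """Prefer explicitly declared columns; infer from the remote table otherwise."""
    return declared if declared else fetch_remote()
```

The same shape appears in the MySQL and SQLite storages below; only the remote-fetch call differs.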
Pipe StoragePostgreSQL::read(
const Names & column_names_,
@ -504,10 +532,12 @@ void registerStoragePostgreSQL(StorageFactory & factory)
args.columns,
args.constraints,
args.comment,
args.getContext(),
configuration.schema,
configuration.on_conflict);
},
{
.supports_schema_inference = true,
.source_access_type = AccessType::POSTGRES,
});
}

View File

@ -31,6 +31,7 @@ public:
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
const String & remote_table_schema_ = "",
const String & on_conflict = "");
@ -66,6 +67,12 @@ public:
static Configuration processNamedCollectionResult(const NamedCollection & named_collection, bool require_table = true);
static ColumnsDescription getTableStructureFromData(
const postgres::PoolWithFailoverPtr & pool_,
const String & table,
const String & schema,
const ContextPtr & context_);
private:
String remote_table_name;
String remote_table_schema;

View File

@ -1297,7 +1297,7 @@ void StorageS3::processNamedCollectionResult(StorageS3::Configuration & configur
configuration.auth_settings.no_sign_request = collection.getOrDefault<bool>("no_sign_request", false);
configuration.auth_settings.expiration_window_seconds = collection.getOrDefault<UInt64>("expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS);
configuration.format = collection.getOrDefault<String>("format", "auto");
configuration.format = collection.getOrDefault<String>("format", configuration.format);
configuration.compression_method = collection.getOrDefault<String>("compression_method", collection.getOrDefault<String>("compression", "auto"));
configuration.structure = collection.getOrDefault<String>("structure", "auto");

View File

@ -4,6 +4,7 @@
#include <Common/logger_useful.h>
#include <Processors/Sources/SQLiteSource.h>
#include <Databases/SQLite/SQLiteUtils.h>
#include <Databases/SQLite/fetchSQLiteTableStructure.h>
#include <DataTypes/DataTypeString.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/IOutputFormat.h>
@ -44,12 +45,33 @@ StorageSQLite::StorageSQLite(
, log(&Poco::Logger::get("StorageSQLite (" + table_id_.table_name + ")"))
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
if (columns_.empty())
{
auto columns = getTableStructureFromData(sqlite_db, remote_table_name);
storage_metadata.setColumns(columns);
}
else
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
setInMemoryMetadata(storage_metadata);
}
ColumnsDescription StorageSQLite::getTableStructureFromData(
const SQLitePtr & sqlite_db_,
const String & table)
{
auto columns = fetchSQLiteTableStructure(sqlite_db_.get(), table);
if (!columns)
throw Exception(ErrorCodes::SQLITE_ENGINE_ERROR, "Failed to fetch table structure for {}", table);
return ColumnsDescription{*columns};
}
Pipe StorageSQLite::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
@ -176,6 +198,7 @@ void registerStorageSQLite(StorageFactory & factory)
table_name, args.columns, args.constraints, args.getContext());
},
{
.supports_schema_inference = true,
.source_access_type = AccessType::SQLITE,
});
}

View File

@ -42,6 +42,10 @@ public:
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
static ColumnsDescription getTableStructureFromData(
const SQLitePtr & sqlite_db_,
const String & table);
private:
String remote_table_name;
String database_path;

View File

@ -38,6 +38,7 @@
#include <Common/logger_useful.h>
#include <Poco/Net/HTTPRequest.h>
#include <regex>
#include <DataTypes/DataTypeString.h>
namespace DB
@ -210,7 +211,16 @@ void StorageURLSource::setCredentials(Poco::Net::HTTPBasicCredentials & credenti
}
}
Block StorageURLSource::getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns)
{
for (const auto & virtual_column : requested_virtual_columns)
sample_block.insert({virtual_column.type->createColumn(), virtual_column.type, virtual_column.name});
return sample_block;
}
StorageURLSource::StorageURLSource(
const std::vector<NameAndTypePair> & requested_virtual_columns_,
std::shared_ptr<IteratorWrapper> uri_iterator_,
const std::string & http_method,
std::function<void(std::ostream &)> callback,
@ -227,7 +237,7 @@ StorageURLSource::StorageURLSource(
const HTTPHeaderEntries & headers_,
const URIParams & params,
bool glob_url)
: ISource(sample_block), name(std::move(name_)), uri_iterator(uri_iterator_)
: ISource(getHeader(sample_block, requested_virtual_columns_)), name(std::move(name_)), requested_virtual_columns(requested_virtual_columns_), uri_iterator(uri_iterator_)
{
auto headers = getHeaders(headers_);
@ -238,7 +248,7 @@ StorageURLSource::StorageURLSource(
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got empty url list");
auto first_option = uri_options.begin();
auto buf_factory = getFirstAvailableURLReadBuffer(
auto [actual_uri, buf_factory] = getFirstAvailableURIAndReadBuffer(
first_option,
uri_options.end(),
context,
@ -251,6 +261,8 @@ StorageURLSource::StorageURLSource(
glob_url,
uri_options.size() == 1);
curr_uri = actual_uri;
try
{
total_size += buf_factory->getFileSize();
@ -311,6 +323,22 @@ Chunk StorageURLSource::generate()
updateRowsProgressApprox(
*this, chunk, total_size, total_rows_approx_accumulated, total_rows_count_times, total_rows_approx_max);
const String & path{curr_uri.getPath()};
for (const auto & virtual_column : requested_virtual_columns)
{
if (virtual_column.name == "_path")
{
chunk.addColumn(virtual_column.type->createColumnConst(num_rows, path)->convertToFullColumnIfConst());
}
else if (virtual_column.name == "_file")
{
size_t last_slash_pos = path.find_last_of('/');
auto column = virtual_column.type->createColumnConst(num_rows, path.substr(last_slash_pos + 1));
chunk.addColumn(column->convertToFullColumnIfConst());
}
}
return chunk;
}
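The `_path` and `_file` values added to the chunk above are both derived from the current URI path: `_file` is the substring after the last `/`. A rough Python equivalent of that derivation (illustrative only):

```python
def virtual_path_and_file(uri_path: str) -> tuple:
    """Mimic the _path/_file virtual columns: _file is everything after the last '/'."""
    last_slash = uri_path.rfind('/')
    # When there is no slash, rfind returns -1 and the slice starts at 0,
    # matching the C++ std::string::npos + 1 == 0 behavior.
    return uri_path, uri_path[last_slash + 1:]
```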
@ -320,7 +348,7 @@ Chunk StorageURLSource::generate()
return {};
}
SeekableReadBufferFactoryPtr StorageURLSource::getFirstAvailableURLReadBuffer(
std::tuple<Poco::URI, SeekableReadBufferFactoryPtr> StorageURLSource::getFirstAvailableURIAndReadBuffer(
std::vector<String>::const_iterator & option,
const std::vector<String>::const_iterator & end,
ContextPtr context,
@ -381,7 +409,7 @@ SeekableReadBufferFactoryPtr StorageURLSource::getFirstAvailableURLReadBuffer(
}
}
return res;
return std::make_tuple(request_uri, std::move(res));
}
throw Exception(ErrorCodes::NETWORK_ERROR, "All uri ({}) options are unreachable: {}", options, first_exception_message);
@ -548,10 +576,10 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
if (urlWithGlobs(uri))
{
size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements;
auto uri_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses);
auto uri_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses, "url");
for (const auto & description : uri_descriptions)
{
auto options = parseRemoteDescription(description, 0, description.size(), '|', max_addresses);
auto options = parseRemoteDescription(description, 0, description.size(), '|', max_addresses, "url");
urls_to_check.insert(urls_to_check.end(), options.begin(), options.end());
}
}
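The two-level expansion above first splits the URL description on `,` into addresses, then splits each address on `|` into failover options. A simplified sketch of that flattening (without the brace/range globs the real `parseRemoteDescription` supports):

```python
def expand_url_options(description: str) -> list:
    """Flatten an 'a|b,c|d' style description into a list of candidate URLs."""
    urls = []
    for address in description.split(','):
        # each comma-separated address may itself list '|'-separated failover options
        urls.extend(address.split('|'))
    return urls
```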
@ -569,7 +597,7 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
if (it == urls_to_check.cend())
return nullptr;
auto buf_factory = StorageURLSource::getFirstAvailableURLReadBuffer(
auto [_, buf_factory] = StorageURLSource::getFirstAvailableURIAndReadBuffer(
it,
urls_to_check.cend(),
context,
@ -639,6 +667,14 @@ Pipe IStorageURLBase::read(
block_for_format = storage_snapshot->metadata->getSampleBlock();
}
std::unordered_set<String> column_names_set(column_names.begin(), column_names.end());
std::vector<NameAndTypePair> requested_virtual_columns;
for (const auto & virtual_column : getVirtuals())
{
if (column_names_set.contains(virtual_column.name))
requested_virtual_columns.push_back(virtual_column);
}
size_t max_download_threads = local_context->getSettingsRef().max_download_threads;
std::shared_ptr<StorageURLSource::IteratorWrapper> iterator_wrapper{nullptr};
@ -689,6 +725,7 @@ Pipe IStorageURLBase::read(
for (size_t i = 0; i < num_streams; ++i)
{
pipes.emplace_back(std::make_shared<StorageURLSource>(
requested_virtual_columns,
iterator_wrapper,
getReadMethod(),
getReadPOSTDataCallback(column_names, columns_description, query_info, local_context, processed_stage, max_block_size),
@ -744,6 +781,7 @@ Pipe StorageURLWithFailover::read(
});
auto pipe = Pipe(std::make_shared<StorageURLSource>(
std::vector<NameAndTypePair>{},
iterator_wrapper,
getReadMethod(),
getReadPOSTDataCallback(column_names, columns_description, query_info, local_context, processed_stage, max_block_size),
@ -803,6 +841,13 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad
}
}
NamesAndTypesList IStorageURLBase::getVirtuals() const
{
return NamesAndTypesList{
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
}
SchemaCache & IStorageURLBase::getSchemaCache(const ContextPtr & context)
{
static SchemaCache schema_cache(context->getConfigRef().getUInt("schema_inference_cache_max_elements_for_url", DEFAULT_SCHEMA_CACHE_ELEMENTS));

View File

@ -45,6 +45,8 @@ public:
bool supportsPartitionBy() const override { return true; }
NamesAndTypesList getVirtuals() const override;
static ColumnsDescription getTableStructureFromData(
const String & format,
const String & uri,
@ -155,6 +157,7 @@ public:
using IteratorWrapper = std::function<FailoverOptions()>;
StorageURLSource(
const std::vector<NameAndTypePair> & requested_virtual_columns_,
std::shared_ptr<IteratorWrapper> uri_iterator_,
const std::string & http_method,
std::function<void(std::ostream &)> callback,
@ -178,7 +181,9 @@ public:
static void setCredentials(Poco::Net::HTTPBasicCredentials & credentials, const Poco::URI & request_uri);
static SeekableReadBufferFactoryPtr getFirstAvailableURLReadBuffer(
static Block getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns);
static std::tuple<Poco::URI, SeekableReadBufferFactoryPtr> getFirstAvailableURIAndReadBuffer(
std::vector<String>::const_iterator & option,
const std::vector<String>::const_iterator & end,
ContextPtr context,
@ -196,7 +201,9 @@ private:
InitializeFunc initialize;
String name;
std::vector<NameAndTypePair> requested_virtual_columns;
std::shared_ptr<IteratorWrapper> uri_iterator;
Poco::URI curr_uri;
std::unique_ptr<QueryPipeline> pipeline;
std::unique_ptr<PullingPipelineExecutor> reader;

View File

@ -1,6 +1,5 @@
#include <memory>
#include <Parsers/ASTFunction.h>
#include <Storages/MeiliSearch/MeiliSearchColumnDescriptionFetcher.h>
#include <Storages/MeiliSearch/StorageMeiliSearch.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionMeiliSearch.h>
@ -9,19 +8,15 @@
namespace DB
{
StoragePtr TableFunctionMeiliSearch::executeImpl(
const ASTPtr & /* ast_function */, ContextPtr context, const String & table_name, ColumnsDescription /*cached_columns*/) const
const ASTPtr & /* ast_function */, ContextPtr /*context*/, const String & table_name, ColumnsDescription /*cached_columns*/) const
{
auto columns = getActualTableStructure(context);
return std::make_shared<StorageMeiliSearch>(
StorageID(getDatabaseName(), table_name), configuration.value(), columns, ConstraintsDescription{}, String{});
StorageID(getDatabaseName(), table_name), configuration.value(), ColumnsDescription{}, ConstraintsDescription{}, String{});
}
ColumnsDescription TableFunctionMeiliSearch::getActualTableStructure(ContextPtr /* context */) const
{
MeiliSearchColumnDescriptionFetcher fetcher(configuration.value());
fetcher.addParam(doubleQuoteString("limit"), "1");
return fetcher.fetchColumnsDescription();
return StorageMeiliSearch::getTableStructureFromData(configuration.value());
}

View File

@ -1,7 +1,6 @@
#include "config.h"
#if USE_MYSQL
#include <Databases/MySQL/FetchTablesColumnsList.h>
#include <Processors/Sources/MySQLSource.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
@ -16,7 +15,7 @@
#include <Common/quoteString.h>
#include "registerTableFunctions.h"
#include <Databases/MySQL/DatabaseMySQL.h> // for fetchTablesColumnsList
#include <Databases/MySQL/DatabaseMySQL.h>
#include <Common/parseRemoteDescription.h>
@ -26,7 +25,6 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_TABLE;
}
void TableFunctionMySQL::parseArguments(const ASTPtr & ast_function, ContextPtr context)
@ -61,15 +59,7 @@ void TableFunctionMySQL::parseArguments(const ASTPtr & ast_function, ContextPtr
ColumnsDescription TableFunctionMySQL::getActualTableStructure(ContextPtr context) const
{
const auto & settings = context->getSettingsRef();
const auto tables_and_columns = fetchTablesColumnsList(*pool, configuration->database, {configuration->table}, settings, settings.mysql_datatypes_support_level);
const auto columns = tables_and_columns.find(configuration->table);
if (columns == tables_and_columns.end())
throw Exception(ErrorCodes::UNKNOWN_TABLE, "MySQL table {} doesn't exist.",
(configuration->database.empty() ? "" : (backQuote(configuration->database) + "." + backQuote(configuration->table))));
return columns->second;
return StorageMySQL::getTableStructureFromData(*pool, configuration->database, configuration->table, context);
}
StoragePtr TableFunctionMySQL::executeImpl(
@ -78,8 +68,6 @@ StoragePtr TableFunctionMySQL::executeImpl(
const std::string & table_name,
ColumnsDescription /*cached_columns*/) const
{
auto columns = getActualTableStructure(context);
auto res = std::make_shared<StorageMySQL>(
StorageID(getDatabaseName(), table_name),
std::move(*pool),
@ -87,7 +75,7 @@ StoragePtr TableFunctionMySQL::executeImpl(
configuration->table,
configuration->replace_query,
configuration->on_duplicate_clause,
columns,
ColumnsDescription{},
ConstraintsDescription{},
String{},
context,

View File

@ -1,8 +1,6 @@
#include <TableFunctions/TableFunctionPostgreSQL.h>
#if USE_LIBPQXX
#include <Databases/PostgreSQL/fetchPostgreSQLTableStructure.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTFunction.h>
#include <TableFunctions/ITableFunction.h>
@ -24,14 +22,14 @@ namespace ErrorCodes
StoragePtr TableFunctionPostgreSQL::executeImpl(const ASTPtr & /*ast_function*/,
ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
auto columns = getActualTableStructure(context);
auto result = std::make_shared<StoragePostgreSQL>(
StorageID(getDatabaseName(), table_name),
connection_pool,
configuration->table,
columns,
ColumnsDescription{},
ConstraintsDescription{},
String{},
context,
configuration->schema,
configuration->on_conflict);
@ -42,15 +40,7 @@ StoragePtr TableFunctionPostgreSQL::executeImpl(const ASTPtr & /*ast_function*/,
ColumnsDescription TableFunctionPostgreSQL::getActualTableStructure(ContextPtr context) const
{
const bool use_nulls = context->getSettingsRef().external_table_functions_use_nulls;
auto connection_holder = connection_pool->get();
auto columns_info = fetchPostgreSQLTableStructure(
connection_holder->get(), configuration->table, configuration->schema, use_nulls).physical_columns;
if (!columns_info)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table structure not returned");
return ColumnsDescription{columns_info->columns};
return StoragePostgreSQL::getTableStructureFromData(connection_pool, configuration->table, configuration->schema, context);
}

View File

@ -134,7 +134,13 @@ void TableFunctionS3::parseArgumentsImpl(ASTs & args, const ContextPtr & context
configuration.url = S3::URI(checkAndGetLiteralArgument<String>(args[0], "url"));
if (args_to_idx.contains("format"))
configuration.format = checkAndGetLiteralArgument<String>(args[args_to_idx["format"]], "format");
{
auto format = checkAndGetLiteralArgument<String>(args[args_to_idx["format"]], "format");
/// Set format to configuration only if it's not 'auto',
/// because we can have a default format set in the configuration.
if (format != "auto")
configuration.format = format;
}
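The guard above keeps a default format configured elsewhere (e.g. in a named collection) from being clobbered by the implicit `'auto'` argument. The rule, sketched in Python (hypothetical helper, not part of the codebase):

```python
def resolve_format(configured: str, requested: str) -> str:
    """Only an explicit, non-'auto' format overrides the configured default."""
    return requested if requested != "auto" else configured
```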
if (args_to_idx.contains("structure"))
configuration.structure = checkAndGetLiteralArgument<String>(args[args_to_idx["structure"]], "structure");

View File

@ -5,7 +5,6 @@
#include <Common/Exception.h>
#include <Common/quoteString.h>
#include <Databases/SQLite/fetchSQLiteTableStructure.h>
#include <Databases/SQLite/SQLiteUtils.h>
#include "registerTableFunctions.h"
@ -26,20 +25,17 @@ namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
extern const int SQLITE_ENGINE_ERROR;
}
StoragePtr TableFunctionSQLite::executeImpl(const ASTPtr & /*ast_function*/,
ContextPtr context, const String & table_name, ColumnsDescription /*cached_columns*/) const
{
auto columns = getActualTableStructure(context);
auto storage = std::make_shared<StorageSQLite>(StorageID(getDatabaseName(), table_name),
sqlite_db,
database_path,
remote_table_name,
columns, ConstraintsDescription{}, context);
ColumnsDescription{}, ConstraintsDescription{}, context);
storage->startup();
return storage;
@ -48,12 +44,7 @@ StoragePtr TableFunctionSQLite::executeImpl(const ASTPtr & /*ast_function*/,
ColumnsDescription TableFunctionSQLite::getActualTableStructure(ContextPtr /* context */) const
{
auto columns = fetchSQLiteTableStructure(sqlite_db.get(), remote_table_name);
if (!columns)
throw Exception(ErrorCodes::SQLITE_ENGINE_ERROR, "Failed to fetch table structure for {}", remote_table_name);
return ColumnsDescription{*columns};
return StorageSQLite::getTableStructureFromData(sqlite_db, remote_table_name);
}

View File

@ -57,10 +57,12 @@ def test_simple_select(started_cluster):
push_data(client, table, data)
parameters = "'http://meili1:7700', 'new_table', ''"
node = started_cluster.instances["meili"]
node.query("DROP TABLE IF EXISTS simple_meili_table")
node.query(
"CREATE TABLE simple_meili_table(id UInt64, data String) ENGINE = MeiliSearch('http://meili1:7700', 'new_table', '')"
f"CREATE TABLE simple_meili_table(id UInt64, data String) ENGINE = MeiliSearch({parameters})"
)
assert node.query("SELECT COUNT() FROM simple_meili_table") == "100\n"
@ -73,7 +75,25 @@ def test_simple_select(started_cluster):
node.query("SELECT data FROM simple_meili_table WHERE id = 42")
== hex(42 * 42) + "\n"
)
node.query(
f"CREATE TABLE simple_meili_table_auto_schema_engine ENGINE=MeiliSearch({parameters})"
)
node.query(
f"CREATE TABLE simple_meili_table_auto_schema_function AS meilisearch({parameters})"
)
expected = "id\tInt64\t\t\t\t\t\ndata\tString\t\t\t\t\t\n"
assert (
node.query("DESCRIBE TABLE simple_meili_table_auto_schema_engine") == expected
)
assert (
node.query("DESCRIBE TABLE simple_meili_table_auto_schema_function") == expected
)
node.query("DROP TABLE simple_meili_table")
node.query("DROP TABLE simple_meili_table_auto_schema_engine")
node.query("DROP TABLE simple_meili_table_auto_schema_function")
table.delete()

View File

@ -307,6 +307,32 @@ def test_table_function(started_cluster):
conn.close()
def test_schema_inference(started_cluster):
conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
drop_mysql_table(conn, "inference_table")
with conn.cursor() as cursor:
cursor.execute(
"CREATE TABLE clickhouse.inference_table (id INT PRIMARY KEY, data BINARY(16) NOT NULL)"
)
parameters = "'mysql57:3306', 'clickhouse', 'inference_table', 'root', 'clickhouse'"
node1.query(
f"CREATE TABLE mysql_schema_inference_engine ENGINE=MySQL({parameters})"
)
node1.query(f"CREATE TABLE mysql_schema_inference_function AS mysql({parameters})")
expected = "id\tInt32\t\t\t\t\t\ndata\tFixedString(16)\t\t\t\t\t\n"
assert node1.query("DESCRIBE TABLE mysql_schema_inference_engine") == expected
assert node1.query("DESCRIBE TABLE mysql_schema_inference_function") == expected
node1.query("DROP TABLE mysql_schema_inference_engine")
node1.query("DROP TABLE mysql_schema_inference_function")
drop_mysql_table(conn, "inference_table")
def test_binary_type(started_cluster):
conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
drop_mysql_table(conn, "binary_type")
@ -329,6 +355,7 @@ def test_binary_type(started_cluster):
node1.query("SELECT * FROM {}".format(table_function))
== "42\tclickhouse\\0\\0\\0\\0\\0\\0\n"
)
drop_mysql_table(conn, "binary_type")
def test_enum_type(started_cluster):

View File

@ -198,7 +198,9 @@ def test_non_default_scema(started_cluster):
expected = node1.query("SELECT number FROM numbers(100)")
assert result == expected
table_function = """postgresql('postgres1:5432', 'postgres', 'test_table', 'postgres', 'mysecretpassword', 'test_schema')"""
parameters = "'postgres1:5432', 'postgres', 'test_table', 'postgres', 'mysecretpassword', 'test_schema'"
table_function = f"postgresql({parameters})"
table_engine = f"PostgreSQL({parameters})"
result = node1.query(f"SELECT * FROM {table_function}")
assert result == expected
@ -224,10 +226,19 @@ def test_non_default_scema(started_cluster):
expected = node1.query("SELECT number FROM numbers(200)")
assert result == expected
node1.query(f"CREATE TABLE test.test_pg_auto_schema_engine ENGINE={table_engine}")
node1.query(f"CREATE TABLE test.test_pg_auto_schema_function AS {table_function}")
expected = "a\tNullable(Int32)\t\t\t\t\t\n"
assert node1.query("DESCRIBE TABLE test.test_pg_auto_schema_engine") == expected
assert node1.query("DESCRIBE TABLE test.test_pg_auto_schema_function") == expected
cursor.execute("DROP SCHEMA test_schema CASCADE")
cursor.execute('DROP SCHEMA "test.nice.schema" CASCADE')
node1.query("DROP TABLE test.test_pg_table_schema")
node1.query("DROP TABLE test.test_pg_table_schema_with_dots")
node1.query("DROP TABLE test.test_pg_auto_schema_engine")
node1.query("DROP TABLE test.test_pg_auto_schema_function")
def test_concurrent_queries(started_cluster):

View File

@ -7,6 +7,7 @@ import tempfile
import threading
import os
import traceback
from urllib.parse import urljoin
import urllib.request
import subprocess
from io import StringIO
@ -163,6 +164,7 @@ def test_select(
requests=[],
answers=[],
test_data="",
res_path="",
):
with open(CSV_DATA, "w") as f: # clear file
f.write("")
@ -183,7 +185,7 @@ def test_select(
tbl = table_name
if not tbl:
tbl = "url('{addr}', 'CSV', '{schema}')".format(
addr=HTTP_SERVER_URL_STR, schema=schema
addr=urljoin(HTTP_SERVER_URL_STR, res_path), schema=schema
)
check_answers(requests[i].format(tbl=tbl), answers[i])
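The test now builds the table-function URL with `urljoin`, so an absolute `res_path` replaces the path component of the server URL instead of being naively concatenated. For example:

```python
from urllib.parse import urljoin

# An absolute path in the second argument replaces the base URL's path.
print(urljoin("http://localhost:5555/", "/tmp/data.csv"))  # http://localhost:5555/tmp/data.csv
```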
@ -252,6 +254,20 @@ def main():
"select double, count(*) from {tbl} group by double order by double": "7.7\t2\n9.9\t10",
}
pathname = CSV_DATA
filename = os.path.basename(CSV_DATA)
select_virtual_requests = {
"select _path from {tbl}": "\n".join(pathname for _ in range(2)),
"select _file from {tbl}": "\n".join(filename for _ in range(2)),
"select _file from {tbl} order by _path": "\n".join(
filename for _ in range(2)
),
"select _path, _file from {tbl}": "\n".join(
f"{pathname}\t{filename}" for _ in range(2)
),
"select _path, count(*) from {tbl} group by _path": f"{pathname}\t2",
}
t, httpd = start_server()
t.start()
# test table with url engine
@ -267,6 +283,14 @@ def main():
answers=list(select_only_requests.values()),
test_data=test_data,
)
# test table function url for virtual column
test_select(
requests=list(select_virtual_requests.keys()),
answers=list(select_virtual_requests.values()),
test_data=test_data,
res_path=CSV_DATA,
)
# test insert into table with url engine
test_insert(
table_name="test_table_insert",

View File

@ -36,6 +36,11 @@ line1 1
line2 2
line3 3
line4 4
test schema inference
col1 Nullable(String)
col2 Nullable(Int32)
col1 Nullable(String)
col2 Nullable(Int32)
test path in clickhouse-local
line1 1
line2 2

View File

@ -87,6 +87,14 @@ ${CLICKHOUSE_CLIENT} --query="INSERT INTO TABLE FUNCTION sqlite('${DB_PATH}', 't
${CLICKHOUSE_CLIENT} --query="SELECT * FROM sqlite('${DB_PATH}', 'table1') ORDER BY col2"
${CLICKHOUSE_CLIENT} --query="select 'test schema inference'";
${CLICKHOUSE_CLIENT} --query="CREATE TABLE sqlite_table3_inferred_engine ENGINE = SQLite('${DB_PATH}', 'table3')"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE sqlite_table3_inferred_function AS sqlite('${DB_PATH}', 'table3')"
${CLICKHOUSE_CLIENT} --query="DESCRIBE TABLE sqlite_table3_inferred_engine;"
${CLICKHOUSE_CLIENT} --query="DESCRIBE TABLE sqlite_table3_inferred_function;"
${CLICKHOUSE_CLIENT} --query="DROP TABLE sqlite_table3_inferred_engine;"
${CLICKHOUSE_CLIENT} --query="DROP TABLE sqlite_table3_inferred_function;"
sqlite3 "${DB_PATH2}" 'DROP TABLE IF EXISTS table1'
sqlite3 "${DB_PATH2}" 'CREATE TABLE table1 (col1 text, col2 smallint);'
sqlite3 "${DB_PATH2}" "INSERT INTO table1 VALUES ('line1', 1), ('line2', 2), ('line3', 3)"

View File

@ -15,13 +15,15 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# $3 - Query Settings
function execute_query()
{
# Some queries are supposed to fail, use -f to suppress error messages
echo $2 | ${CLICKHOUSE_CURL_COMMAND} -q -s --max-time 180 \
-X POST \
-H "traceparent: 00-$1-5150000000000515-01" \
-H "tracestate: a\nb cd" \
"${CLICKHOUSE_URL}&${3}" \
--data @-
local trace_id=$1 && shift
local ddl_version=$1 && shift
local opts=(
--opentelemetry-traceparent "00-$trace_id-5150000000000515-01"
--opentelemetry-tracestate $'a\nb cd'
--distributed_ddl_output_mode "none"
--distributed_ddl_entry_format_version "$ddl_version"
)
${CLICKHOUSE_CLIENT} "${opts[@]}" "$@"
}
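The rewritten helper passes the trace context through client options rather than raw HTTP headers. The traceparent value it forwards follows the W3C Trace Context format `version-trace_id-parent_id-flags`; a minimal sketch of its composition:

```python
def make_traceparent(trace_id: str, parent_id: str = "5150000000000515") -> str:
    """Compose a W3C traceparent header value: 00-<trace_id>-<parent_id>-01."""
    return f"00-{trace_id}-{parent_id}-01"
```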
# This function takes following argument:
@ -82,9 +84,9 @@ for ddl_version in 3 4; do
echo "===ddl_format_version ${ddl_version}===="
trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))");
execute_query $trace_id "CREATE TABLE ${CLICKHOUSE_DATABASE}.ddl_test_for_opentelemetry ON CLUSTER ${cluster_name} (id UInt64) Engine=MergeTree ORDER BY id" "distributed_ddl_output_mode=none&distributed_ddl_entry_format_version=${ddl_version}"
execute_query $trace_id $ddl_version -q "CREATE TABLE ${CLICKHOUSE_DATABASE}.ddl_test_for_opentelemetry ON CLUSTER ${cluster_name} (id UInt64) Engine=MergeTree ORDER BY id"
check_span 1 $trace_id "HTTPHandler"
check_span 1 $trace_id "TCPHandler"
if [ $cluster_name = "test_shard_localhost" ]; then
check_span 1 $trace_id "%executeDDLQueryOnCluster%" "attribute['clickhouse.cluster']='${cluster_name}'"
@ -106,7 +108,7 @@ for ddl_version in 3 4; do
check_span $expected $trace_id "%DDLWorker::processTask%"
# For queries where tracing is enabled (format version 4 or the Replicated database engine), there should be two 'query' spans,
# one is for the HTTPHandler, the other is for the DDL executing in DDLWorker.
# one is for the TCPHandler, the other is for the DDL executing in DDLWorker.
#
# For other format, there should be only one 'query' span
if [ $cluster_name = "test_shard_localhost" ]; then
@ -134,9 +136,9 @@ done
echo "===exception===="
trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))");
execute_query $trace_id "DROP TABLE ${CLICKHOUSE_DATABASE}.ddl_test_for_opentelemetry_non_exist ON CLUSTER ${cluster_name}" "distributed_ddl_output_mode=none&distributed_ddl_entry_format_version=4" 2>&1| grep -Fv "UNKNOWN_TABLE"
execute_query $trace_id 4 -q "DROP TABLE ${CLICKHOUSE_DATABASE}.ddl_test_for_opentelemetry_non_exist ON CLUSTER ${cluster_name}" 2>&1 | grep 'DB::Exception ' | grep -Fv "UNKNOWN_TABLE"
check_span 1 $trace_id "HTTPHandler"
check_span 1 $trace_id "TCPHandler"
if [ $cluster_name = "test_shard_localhost" ]; then
expected=1
@ -148,7 +150,7 @@ check_span $expected $trace_id "%executeDDLQueryOnCluster%" "attribute['clickhou
check_span $expected $trace_id "%DDLWorker::processTask%" "kind = 'CONSUMER'"
if [ $cluster_name = "test_shard_localhost" ]; then
# There should be two 'query' spans, one is for the HTTPHandler, the other is for the DDL executing in DDLWorker.
# There should be two 'query' spans, one is for the TCPHandler, the other is for the DDL executing in DDLWorker.
# Both of these two spans contain exception
expected=2
else

View File

@ -0,0 +1,4 @@
/
1
/ 1

View File

@ -0,0 +1,8 @@
-- Tags: no-parallel
select _path from url('http://127.0.0.1:8123/?query=select+1&user=default', LineAsString, 's String');
select _file from url('http://127.0.0.1:8123/?query=select+1&user=default', LineAsString, 's String');
select _file, count() from url('http://127.0.0.1:8123/?query=select+1&user=default', LineAsString, 's String') group by _file;
select _path, _file, s from url('http://127.0.0.1:8123/?query=select+1&user=default', LineAsString, 's String');
select _path, _file, s from url('http://127.0.0.1:8123/?query=select+1&user=default&password=wrong', LineAsString, 's String'); -- { serverError RECEIVED_ERROR_FROM_REMOTE_IO_SERVER }