fix conflict

feng lv 2021-10-19 12:50:22 +00:00
commit 05fd6f7caf
192 changed files with 3078 additions and 1092 deletions

contrib/capnproto vendored

@ -1 +1 @@
Subproject commit a00ccd91b3746ef2ab51d40fe3265829949d1ace
Subproject commit c8189ec3c27dacbd4a3288e682473010e377f593

View File

@ -45,6 +45,7 @@ set (CAPNP_SRCS
"${CAPNPROTO_SOURCE_DIR}/capnp/serialize-packed.c++"
"${CAPNPROTO_SOURCE_DIR}/capnp/schema.c++"
"${CAPNPROTO_SOURCE_DIR}/capnp/stream.capnp.c++"
"${CAPNPROTO_SOURCE_DIR}/capnp/schema-loader.c++"
"${CAPNPROTO_SOURCE_DIR}/capnp/dynamic.c++"
"${CAPNPROTO_SOURCE_DIR}/capnp/stringify.c++"
@ -63,6 +64,7 @@ set (CAPNPC_SRCS
"${CAPNPROTO_SOURCE_DIR}/capnp/compiler/lexer.c++"
"${CAPNPROTO_SOURCE_DIR}/capnp/compiler/grammar.capnp.c++"
"${CAPNPROTO_SOURCE_DIR}/capnp/compiler/parser.c++"
"${CAPNPROTO_SOURCE_DIR}/capnp/compiler/generics.c++"
"${CAPNPROTO_SOURCE_DIR}/capnp/compiler/node-translator.c++"
"${CAPNPROTO_SOURCE_DIR}/capnp/compiler/compiler.c++"
"${CAPNPROTO_SOURCE_DIR}/capnp/schema-parser.c++"

View File

@ -320,7 +320,7 @@ SELECT count() FROM table WHERE u64 * i32 == 10 AND u64 * length(s) >= 1234
- `ngrambf_v1(n, size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed)`
Stores a [Bloom filter](https://en.wikipedia.org/wiki/Bloom_filter) that contains all ngrams from a block of data. Works only with strings. Can be used for optimization of `equals`, `like` and `in` expressions.
Stores a [Bloom filter](https://en.wikipedia.org/wiki/Bloom_filter) that contains all ngrams from a block of data. Works only with datatypes: [String](../../../sql-reference/data-types/string.md), [FixedString](../../../sql-reference/data-types/fixedstring.md) and [Map](../../../sql-reference/data-types/map.md). Can be used for optimization of `EQUALS`, `LIKE` and `IN` expressions.
- `n` — ngram size,
- `size_of_bloom_filter_in_bytes` — Bloom filter size in bytes (you can use large values here, for example, 256 or 512, because it can be compressed well).
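For illustration, a minimal sketch of how such an index might be declared; the table `example_ngram`, its columns, and the parameter values are assumptions for this example, not part of the documented change:

```sql
-- Hypothetical table with a 3-gram Bloom filter index over a String column.
CREATE TABLE example_ngram
(
    id UInt64,
    s String,
    INDEX s_ngram_idx s TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4
)
ENGINE = MergeTree
ORDER BY id;
```

A query such as `SELECT count() FROM example_ngram WHERE s LIKE '%needle%'` could then skip granules whose Bloom filter shows the substring cannot be present.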
@ -337,7 +337,9 @@ SELECT count() FROM table WHERE u64 * i32 == 10 AND u64 * length(s) >= 1234
Supported data types: `Int*`, `UInt*`, `Float*`, `Enum`, `Date`, `DateTime`, `String`, `FixedString`, `Array`, `LowCardinality`, `Nullable`, `UUID`, `Map`.
For the `Map` data type, the client can specify whether the index should be created for keys or for values, using the [mapKeys](../../../sql-reference/functions/tuple-map-functions.md#mapkeys) or [mapValues](../../../sql-reference/functions/tuple-map-functions.md#mapvalues) function.
The following functions can use the filter: [equals](../../../sql-reference/functions/comparison-functions.md), [notEquals](../../../sql-reference/functions/comparison-functions.md), [in](../../../sql-reference/functions/in-functions.md), [notIn](../../../sql-reference/functions/in-functions.md), [has](../../../sql-reference/functions/array-functions.md#hasarr-elem).
Example of index creation for `Map` data type
@ -346,9 +348,6 @@ INDEX map_key_index mapKeys(map_column) TYPE bloom_filter GRANULARITY 1
INDEX map_value_index mapValues(map_column) TYPE bloom_filter GRANULARITY 1
```
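The snippet above shows only the index clauses. A complete, hypothetical `CREATE TABLE` sketch embedding them (the table and column names are assumed for illustration, not taken from this change) might look like this:

```sql
-- Hypothetical table with Bloom filter indexes over Map keys and values.
CREATE TABLE example_map
(
    id UInt64,
    map_column Map(String, String),
    INDEX map_key_index mapKeys(map_column) TYPE bloom_filter GRANULARITY 1,
    INDEX map_value_index mapValues(map_column) TYPE bloom_filter GRANULARITY 1
)
ENGINE = MergeTree
ORDER BY id;
```

Queries filtering on `mapKeys(map_column)` or `mapValues(map_column)` (for example with `has`) could then skip granules using these filters.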
The following functions can use it: [equals](../../../sql-reference/functions/comparison-functions.md), [notEquals](../../../sql-reference/functions/comparison-functions.md), [in](../../../sql-reference/functions/in-functions.md), [notIn](../../../sql-reference/functions/in-functions.md), [has](../../../sql-reference/functions/array-functions.md).
<!-- -->
``` sql
INDEX sample_index (u64 * length(s)) TYPE minmax GRANULARITY 4

View File

@ -128,6 +128,8 @@ You can pass parameters to `clickhouse-client` (all parameters have a default va
- `--history_file` — Path to a file containing command history.
- `--param_<name>` — Value for a [query with parameters](#cli-queries-with-parameters).
- `--hardware-utilization` — Print hardware utilization information in progress bar.
- `--print-profile-events` — Print `ProfileEvents` packets.
- `--profile-events-delay-ms` — Delay between printing `ProfileEvents` packets (-1 - print only totals, 0 - print every single packet).
Since version 20.5, `clickhouse-client` has automatic syntax highlighting (always enabled).

View File

@ -166,5 +166,6 @@ toc_title: Adopters
| <a href="https://beeline.ru/" class="favicon">Beeline</a> | Telecom | Data Platform | — | — | [Blog post, July 2021](https://habr.com/en/company/beeline/blog/567508/) |
| <a href="https://ecommpay.com/" class="favicon">Ecommpay</a> | Payment Processing | Logs | — | — | [Video, Nov 2019](https://www.youtube.com/watch?v=d3GdZTOWGLk) |
| <a href="https://omnicomm.ru/" class="favicon">Omnicomm</a> | Transportation Monitoring | — | — | — | [Facebook post, Oct 2021](https://www.facebook.com/OmnicommTeam/posts/2824479777774500) |
| <a href="https://ok.ru" class="favicon">Ok.ru</a> | Social Network | — | 72 servers | 810 TB compressed, 50bn rows/day, 1.5 TB/day | [SmartData conference, Oct 2021](https://assets.ctfassets.net/oxjq45e8ilak/4JPHkbJenLgZhBGGyyonFP/57472ec6987003ec4078d0941740703b/____________________ClickHouse_______________________.pdf) |
[Original article](https://clickhouse.com/docs/en/introduction/adopters/) <!--hide-->

View File

@ -316,17 +316,26 @@ SELECT count() FROM table WHERE u64 * i32 == 10 AND u64 * length(s) >= 1234
#### Available Indices {#available-types-of-indices}
- `minmax` — Stores the minimum and maximum values of an expression (for a `tuple` expression, for each element of the `tuple`) and uses them to skip data blocks, similarly to the primary key.
- `minmax` — stores the minimum and maximum values of an expression (for a [Tuple](../../../sql-reference/data-types/tuple.md) expression, for each element of the `Tuple`) and uses them to skip data blocks, similarly to the primary key.
- `set(max_rows)` — Stores up to `max_rows` unique values of an expression per block (`max_rows = 0` means no limit) and uses them to skip blocks by evaluating whether the `WHERE` expression can be satisfied on the stored data.
- `set(max_rows)` — stores up to `max_rows` unique values of an expression per block (`max_rows = 0` means no limit) and uses them to skip blocks by evaluating whether the `WHERE` expression can be satisfied on the stored data.
- `ngrambf_v1(n, size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed)` — stores a [Bloom filter](https://en.wikipedia.org/wiki/Bloom_filter) that contains all N-grams of a data block. Works only with the [String](../../../sql-reference/data-types/string.md), [FixedString](../../../sql-reference/data-types/fixedstring.md) and [Map](../../../sql-reference/data-types/map.md) data types, with keys of type `String` or `FixedString`. Can be used for optimization of `EQUALS`, `LIKE` and `IN` expressions.
- `n` — N-gram size,
- `size_of_bloom_filter_in_bytes` — Bloom filter size in bytes (you can use large values here, for example 256 or 512, because the filter compresses well).
- `number_of_hash_functions` — the number of hash functions used in the Bloom filter.
- `random_seed` — the seed for the Bloom filter hash functions.
- `tokenbf_v1(size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed)` — the same as `ngrambf_v1`, but stores tokens instead of N-grams. Tokens are sequences of characters separated by non-alphanumeric characters.
- `bloom_filter([false_positive])` — a [Bloom filter](https://en.wikipedia.org/wiki/Bloom_filter) for the specified columns.
The optional `false_positive` parameter is the probability of a false-positive response from the filter. Possible values: (0, 1). Default value: 0.025.
Supported data types: `Int*`, `UInt*`, `Float*`, `Enum`, `Date`, `DateTime`, `String`, `FixedString`.
The filter can be used by the following functions: [equals](../../../engines/table-engines/mergetree-family/mergetree.md), [notEquals](../../../engines/table-engines/mergetree-family/mergetree.md), [in](../../../engines/table-engines/mergetree-family/mergetree.md), [notIn](../../../engines/table-engines/mergetree-family/mergetree.md).
The filter can be used by the following functions: [equals](../../../sql-reference/functions/comparison-functions.md), [notEquals](../../../sql-reference/functions/comparison-functions.md), [in](../../../sql-reference/functions/in-functions.md), [notIn](../../../sql-reference/functions/in-functions.md), [has](../../../sql-reference/functions/array-functions.md#hasarr-elem).
**Examples**

View File

@ -2684,6 +2684,43 @@ SELECT CAST(toNullable(toInt32(0)) AS Int32) as x, toTypeName(x);
Default value: `1`.
## output_format_csv_null_representation {#output_format_csv_null_representation}
Defines the representation of `NULL` in the [CSV](../../interfaces/formats.md#csv) output format. The user can set any string as the value, for example `My NULL`.
Default value: `\N`.
**Examples**
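The example queries below read from a table named `csv_custom_null`, which is not defined in this excerpt; a minimal setup sketch with an assumed single nullable column could be:

```sql
-- Hypothetical table and data matching the results shown below.
CREATE TABLE csv_custom_null (n Nullable(UInt32)) ENGINE = Memory;
INSERT INTO csv_custom_null VALUES (788), (NULL), (NULL);
```

With the default setting, the two `NULL` rows are printed as `\N`, which matches the first result shown.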
Query:
```sql
SELECT * FROM csv_custom_null FORMAT CSV;
```
Result:
```text
788
\N
\N
```
Query:
```sql
SET output_format_csv_null_representation = 'My NULL';
SELECT * FROM csv_custom_null FORMAT CSV;
```
Result:
```text
788
My NULL
My NULL
```
## output_format_tsv_null_representation {#output_format_tsv_null_representation}
Defines the representation of `NULL` in the [TSV](../../interfaces/formats.md#tabseparated) output format. The user can set any string as the value.

View File

@ -25,9 +25,6 @@
#endif
#include <Common/Exception.h>
#include <Common/formatReadable.h>
#include <Common/NetException.h>
#include <Common/Config/ConfigProcessor.h>
#include <Common/PODArray.h>
#include <Common/TerminalSize.h>
#include <Common/Config/configReadClient.h>
#include "Common/MemoryTracker.h"
@ -35,13 +32,11 @@
#include <Core/QueryProcessingStage.h>
#include <Client/TestHint.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Poco/Util/Application.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromOStream.h>
#include <IO/UseSSL.h>
@ -51,9 +46,6 @@
#include <Parsers/ASTUseQuery.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/formatAST.h>
#include <Interpreters/InterpreterSetQuery.h>
@ -86,7 +78,6 @@ namespace ErrorCodes
extern const int SYNTAX_ERROR;
extern const int TOO_DEEP_RECURSION;
extern const int NETWORK_ERROR;
extern const int UNRECOGNIZED_ARGUMENTS;
extern const int AUTHENTICATION_FAILED;
}
@ -993,7 +984,7 @@ void Client::printHelpMessage(const OptionsDescription & options_description)
}
void Client::addAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments)
void Client::addOptions(OptionsDescription & options_description)
{
/// Main commandline options related to client functionality and all parameters from Settings.
options_description.main_description->add_options()
@ -1050,14 +1041,6 @@ void Client::addAndCheckOptions(OptionsDescription & options_description, po::va
(
"types", po::value<std::string>(), "types"
);
cmd_settings.addProgramOptions(options_description.main_description.value());
/// Parse main commandline options.
po::parsed_options parsed = po::command_line_parser(arguments).options(options_description.main_description.value()).run();
auto unrecognized_options = po::collect_unrecognized(parsed.options, po::collect_unrecognized_mode::include_positional);
if (unrecognized_options.size() > 1)
throw Exception(ErrorCodes::UNRECOGNIZED_ARGUMENTS, "Unrecognized option '{}'", unrecognized_options[1]);
po::store(parsed, options);
}
@ -1235,16 +1218,16 @@ int mainEntryClickHouseClient(int argc, char ** argv)
client.init(argc, argv);
return client.run();
}
catch (const boost::program_options::error & e)
{
std::cerr << "Bad arguments: " << e.what() << std::endl;
return 1;
}
catch (const DB::Exception & e)
{
std::cerr << DB::getExceptionMessage(e, false) << std::endl;
return 1;
}
catch (const boost::program_options::error & e)
{
std::cerr << "Bad arguments: " << e.what() << std::endl;
return DB::ErrorCodes::BAD_ARGUMENTS;
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(true) << std::endl;

View File

@ -24,7 +24,7 @@ protected:
String getName() const override { return "client"; }
void printHelpMessage(const OptionsDescription & options_description) override;
void addAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments) override;
void addOptions(OptionsDescription & options_description) override;
void processOptions(const OptionsDescription & options_description, const CommandLineOptions & options,
const std::vector<Arguments> & external_tables_arguments) override;
void processConfig() override;

View File

@ -358,8 +358,8 @@ int Keeper::main(const std::vector<std::string> & /*args*/)
auto servers = std::make_shared<std::vector<ProtocolServerAdapter>>();
/// Initialize test keeper RAFT. Do nothing if no nu_keeper_server in config.
global_context->initializeKeeperDispatcher();
/// Initialize keeper RAFT. Do nothing if no keeper_server in config.
global_context->initializeKeeperDispatcher(/* start_async = */false);
for (const auto & listen_host : listen_hosts)
{
/// TCP Keeper

View File

@ -1,8 +1,6 @@
#include "LocalServer.h"
#include <Poco/Util/XMLConfiguration.h>
#include <Poco/Util/HelpFormatter.h>
#include <Poco/Util/OptionCallback.h>
#include <Poco/String.h>
#include <Poco/Logger.h>
#include <Poco/NullChannel.h>
@ -10,7 +8,6 @@
#include <Storages/System/attachSystemTables.h>
#include <Storages/System/attachInformationSchemaTables.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/loadMetadata.h>
#include <Interpreters/DatabaseCatalog.h>
#include <base/getFQDNOrHostName.h>
@ -20,17 +17,12 @@
#include <Common/Exception.h>
#include <Common/Macros.h>
#include <Common/Config/ConfigProcessor.h>
#include <Common/escapeForFileName.h>
#include <Common/ClickHouseRevision.h>
#include <Common/ThreadStatus.h>
#include <Common/UnicodeBar.h>
#include <Common/config_version.h>
#include <Common/quoteString.h>
#include <loggers/Loggers.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/ReadHelpers.h>
#include <IO/UseSSL.h>
#include <Parsers/IAST.h>
#include <base/ErrorHandlers.h>
@ -42,9 +34,7 @@
#include <Disks/registerDisks.h>
#include <Formats/registerFormats.h>
#include <boost/program_options/options_description.hpp>
#include <boost/program_options.hpp>
#include <base/argsToConfig.h>
#include <Common/TerminalSize.h>
#include <Common/randomSeed.h>
#include <filesystem>
@ -660,7 +650,7 @@ void LocalServer::printHelpMessage(const OptionsDescription & options_descriptio
}
void LocalServer::addAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments)
void LocalServer::addOptions(OptionsDescription & options_description)
{
options_description.main_description->add_options()
("database,d", po::value<std::string>(), "database")
@ -678,11 +668,8 @@ void LocalServer::addAndCheckOptions(OptionsDescription & options_description, p
("logger.level", po::value<std::string>(), "Log level")
("no-system-tables", "do not attach system tables (better startup time)")
("path", po::value<std::string>(), "Storage path")
;
cmd_settings.addProgramOptions(options_description.main_description.value());
po::parsed_options parsed = po::command_line_parser(arguments).options(options_description.main_description.value()).run();
po::store(parsed, options);
}
@ -737,6 +724,17 @@ int mainEntryClickHouseLocal(int argc, char ** argv)
app.init(argc, argv);
return app.run();
}
catch (const DB::Exception & e)
{
std::cerr << DB::getExceptionMessage(e, false) << std::endl;
auto code = DB::getCurrentExceptionCode();
return code ? code : 1;
}
catch (const boost::program_options::error & e)
{
std::cerr << "Bad arguments: " << e.what() << std::endl;
return DB::ErrorCodes::BAD_ARGUMENTS;
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(true) << '\n';

View File

@ -40,7 +40,7 @@ protected:
String getQueryTextPrefix() override;
void printHelpMessage(const OptionsDescription & options_description) override;
void addAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments) override;
void addOptions(OptionsDescription & options_description) override;
void processOptions(const OptionsDescription & options_description, const CommandLineOptions & options,
const std::vector<Arguments> &) override;
void processConfig() override;

View File

@ -997,8 +997,19 @@ if (ThreadFuzzer::instance().isEffective())
if (config().has("keeper_server"))
{
#if USE_NURAFT
/// Initialize test keeper RAFT. Do nothing if no nu_keeper_server in config.
global_context->initializeKeeperDispatcher();
//// If we don't have configured connection probably someone trying to use clickhouse-server instead
//// of clickhouse-keeper, so start synchronously.
bool can_initialize_keeper_async = false;
if (has_zookeeper) /// We have configured connection to some zookeeper cluster
{
/// If we cannot connect to some other node from our cluster then we have to wait our Keeper start
/// synchronously.
can_initialize_keeper_async = global_context->tryCheckClientConnectionToMyKeeperCluster();
}
/// Initialize keeper RAFT.
global_context->initializeKeeperDispatcher(can_initialize_keeper_async);
for (const auto & listen_host : listen_hosts)
{
/// TCP Keeper

View File

@ -71,6 +71,7 @@ namespace ErrorCodes
extern const int UNEXPECTED_PACKET_FROM_SERVER;
extern const int INVALID_USAGE_OF_INPUT;
extern const int CANNOT_SET_SIGNAL_HANDLER;
extern const int UNRECOGNIZED_ARGUMENTS;
}
}
@ -266,7 +267,7 @@ void ClientBase::onLogData(Block & block)
{
initLogsOutputStream();
progress_indication.clearProgressOutput();
logs_out_stream->write(block);
logs_out_stream->writeLogs(block);
logs_out_stream->flush();
}
@ -668,39 +669,61 @@ void ClientBase::onEndOfStream()
void ClientBase::onProfileEvents(Block & block)
{
const auto rows = block.rows();
if (rows == 0 || !progress_indication.print_hardware_utilization)
if (rows == 0)
return;
const auto & array_thread_id = typeid_cast<const ColumnUInt64 &>(*block.getByName("thread_id").column).getData();
const auto & names = typeid_cast<const ColumnString &>(*block.getByName("name").column);
const auto & host_names = typeid_cast<const ColumnString &>(*block.getByName("host_name").column);
const auto & array_values = typeid_cast<const ColumnUInt64 &>(*block.getByName("value").column).getData();
const auto * user_time_name = ProfileEvents::getName(ProfileEvents::UserTimeMicroseconds);
const auto * system_time_name = ProfileEvents::getName(ProfileEvents::SystemTimeMicroseconds);
HostToThreadTimesMap thread_times;
for (size_t i = 0; i < rows; ++i)
if (progress_indication.print_hardware_utilization)
{
auto thread_id = array_thread_id[i];
auto host_name = host_names.getDataAt(i).toString();
if (thread_id != 0)
progress_indication.addThreadIdToList(host_name, thread_id);
auto event_name = names.getDataAt(i);
auto value = array_values[i];
if (event_name == user_time_name)
const auto & array_thread_id = typeid_cast<const ColumnUInt64 &>(*block.getByName("thread_id").column).getData();
const auto & names = typeid_cast<const ColumnString &>(*block.getByName("name").column);
const auto & host_names = typeid_cast<const ColumnString &>(*block.getByName("host_name").column);
const auto & array_values = typeid_cast<const ColumnUInt64 &>(*block.getByName("value").column).getData();
const auto * user_time_name = ProfileEvents::getName(ProfileEvents::UserTimeMicroseconds);
const auto * system_time_name = ProfileEvents::getName(ProfileEvents::SystemTimeMicroseconds);
HostToThreadTimesMap thread_times;
for (size_t i = 0; i < rows; ++i)
{
thread_times[host_name][thread_id].user_ms = value;
auto thread_id = array_thread_id[i];
auto host_name = host_names.getDataAt(i).toString();
if (thread_id != 0)
progress_indication.addThreadIdToList(host_name, thread_id);
auto event_name = names.getDataAt(i);
auto value = array_values[i];
if (event_name == user_time_name)
{
thread_times[host_name][thread_id].user_ms = value;
}
else if (event_name == system_time_name)
{
thread_times[host_name][thread_id].system_ms = value;
}
else if (event_name == MemoryTracker::USAGE_EVENT_NAME)
{
thread_times[host_name][thread_id].memory_usage = value;
}
}
else if (event_name == system_time_name)
progress_indication.updateThreadEventData(thread_times);
}
if (profile_events.print)
{
if (profile_events.watch.elapsedMilliseconds() >= profile_events.delay_ms)
{
thread_times[host_name][thread_id].system_ms = value;
initLogsOutputStream();
progress_indication.clearProgressOutput();
logs_out_stream->writeProfileEvents(block);
logs_out_stream->flush();
profile_events.watch.restart();
profile_events.last_block = {};
}
else if (event_name == MemoryTracker::USAGE_EVENT_NAME)
else
{
thread_times[host_name][thread_id].memory_usage = value;
profile_events.last_block = block;
}
}
progress_indication.updateThreadEventData(thread_times);
}
@ -1023,6 +1046,7 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
processed_rows = 0;
written_first_block = false;
progress_indication.resetProgress();
profile_events.watch.restart();
{
/// Temporarily apply query settings to context.
@ -1091,6 +1115,15 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
}
}
/// Always print last block (if it was not printed already)
if (profile_events.last_block)
{
initLogsOutputStream();
progress_indication.clearProgressOutput();
logs_out_stream->writeProfileEvents(profile_events.last_block);
logs_out_stream->flush();
}
if (is_interactive)
{
std::cout << std::endl << processed_rows << " rows in set. Elapsed: " << progress_indication.elapsedSeconds() << " sec. ";
@ -1505,6 +1538,26 @@ void ClientBase::readArguments(int argc, char ** argv, Arguments & common_argume
}
}
void ClientBase::parseAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments)
{
cmd_settings.addProgramOptions(options_description.main_description.value());
/// Parse main commandline options.
auto parser = po::command_line_parser(arguments).options(options_description.main_description.value()).allow_unregistered();
po::parsed_options parsed = parser.run();
/// Check unrecognized options without positional options.
auto unrecognized_options = po::collect_unrecognized(parsed.options, po::collect_unrecognized_mode::exclude_positional);
if (!unrecognized_options.empty())
throw Exception(ErrorCodes::UNRECOGNIZED_ARGUMENTS, "Unrecognized option '{}'", unrecognized_options[0]);
/// Check positional options (options after ' -- ', ex: clickhouse-client -- <options>).
unrecognized_options = po::collect_unrecognized(parsed.options, po::collect_unrecognized_mode::include_positional);
if (unrecognized_options.size() > 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Positional options are not supported.");
po::store(parsed, options);
}
void ClientBase::init(int argc, char ** argv)
{
@ -1561,9 +1614,12 @@ void ClientBase::init(int argc, char ** argv)
("ignore-error", "do not stop processing in multiquery mode")
("stacktrace", "print stack traces of exceptions")
("hardware-utilization", "print hardware utilization information in progress bar")
("print-profile-events", po::value(&profile_events.print)->zero_tokens(), "Printing ProfileEvents packets")
("profile-events-delay-ms", po::value<UInt64>()->default_value(profile_events.delay_ms), "Delay between printing `ProfileEvents` packets (-1 - print only totals, 0 - print every single packet)")
;
addAndCheckOptions(options_description, options, common_arguments);
addOptions(options_description);
parseAndCheckOptions(options_description, options, common_arguments);
po::notify(options);
if (options.count("version") || options.count("V"))
@ -1611,6 +1667,10 @@ void ClientBase::init(int argc, char ** argv)
config().setBool("vertical", true);
if (options.count("stacktrace"))
config().setBool("stacktrace", true);
if (options.count("print-profile-events"))
config().setBool("print-profile-events", true);
if (options.count("profile-events-delay-ms"))
config().setInt("profile-events-delay-ms", options["profile-events-delay-ms"].as<UInt64>());
if (options.count("progress"))
config().setBool("progress", true);
if (options.count("echo"))
@ -1631,6 +1691,8 @@ void ClientBase::init(int argc, char ** argv)
progress_indication.print_hardware_utilization = true;
query_processing_stage = QueryProcessingStage::fromString(options["stage"].as<std::string>());
profile_events.print = options.count("print-profile-events");
profile_events.delay_ms = options["profile-events-delay-ms"].as<UInt64>();
processOptions(options_description, options, external_tables_arguments);
argsToConfig(common_arguments, config(), 100);

View File

@ -3,6 +3,7 @@
#include <Common/ProgressIndication.h>
#include <Common/InterruptListener.h>
#include <Common/ShellCommand.h>
#include <Common/Stopwatch.h>
#include <Core/ExternalTable.h>
#include <Poco/Util/Application.h>
#include <Interpreters/Context.h>
@ -91,7 +92,7 @@ protected:
};
virtual void printHelpMessage(const OptionsDescription & options_description) = 0;
virtual void addAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments) = 0;
virtual void addOptions(OptionsDescription & options_description) = 0;
virtual void processOptions(const OptionsDescription & options_description,
const CommandLineOptions & options,
const std::vector<Arguments> & external_tables_arguments) = 0;
@ -132,6 +133,7 @@ private:
void resetOutput();
void outputQueryInfo(bool echo_query_);
void readArguments(int argc, char ** argv, Arguments & common_arguments, std::vector<Arguments> & external_tables_arguments);
void parseAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments);
protected:
bool is_interactive = false; /// Use either interactive line editing interface or batch mode.
@ -217,6 +219,16 @@ protected:
QueryFuzzer fuzzer;
int query_fuzzer_runs = 0;
struct
{
bool print = false;
/// UINT64_MAX -- print only last
UInt64 delay_ms = 0;
Stopwatch watch;
/// For printing only last (delay_ms == 0).
Block last_block;
} profile_events;
QueryProcessingStage::Enum query_processing_stage;
};

View File

@ -1,6 +1,7 @@
#include <Client/InternalTextLogs.h>
#include <Core/Block.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Interpreters/ProfileEventsExt.h>
#include <Common/typeid_cast.h>
#include <Common/HashTable/Hash.h>
#include <DataTypes/IDataType.h>
@ -13,7 +14,7 @@
namespace DB
{
void InternalTextLogs::write(const Block & block)
void InternalTextLogs::writeLogs(const Block & block)
{
const auto & array_event_time = typeid_cast<const ColumnUInt32 &>(*block.getByName("event_time").column).getData();
const auto & array_microseconds = typeid_cast<const ColumnUInt32 &>(*block.getByName("event_time_microseconds").column).getData();
@ -97,4 +98,69 @@ void InternalTextLogs::write(const Block & block)
}
}
void InternalTextLogs::writeProfileEvents(const Block & block)
{
const auto & column_host_name = typeid_cast<const ColumnString &>(*block.getByName("host_name").column);
const auto & array_current_time = typeid_cast<const ColumnUInt32 &>(*block.getByName("current_time").column).getData();
const auto & array_thread_id = typeid_cast<const ColumnUInt64 &>(*block.getByName("thread_id").column).getData();
const auto & array_type = typeid_cast<const ColumnInt8 &>(*block.getByName("type").column).getData();
const auto & column_name = typeid_cast<const ColumnString &>(*block.getByName("name").column);
const auto & array_value = typeid_cast<const ColumnUInt64 &>(*block.getByName("value").column).getData();
for (size_t row_num = 0; row_num < block.rows(); ++row_num)
{
/// host_name
auto host_name = column_host_name.getDataAt(row_num);
if (host_name.size)
{
writeCString("[", wb);
if (color)
writeString(setColor(StringRefHash()(host_name)), wb);
writeString(host_name, wb);
if (color)
writeCString(resetColor(), wb);
writeCString("] ", wb);
}
/// current_time
auto current_time = array_current_time[row_num];
writeDateTimeText<'.', ':'>(current_time, wb);
/// thread_id
UInt64 thread_id = array_thread_id[row_num];
writeCString(" [ ", wb);
if (color)
writeString(setColor(intHash64(thread_id)), wb);
writeIntText(thread_id, wb);
if (color)
writeCString(resetColor(), wb);
writeCString(" ] ", wb);
/// name
auto name = column_name.getDataAt(row_num);
if (color)
writeString(setColor(StringRefHash()(name)), wb);
DB::writeString(name, wb);
if (color)
writeCString(resetColor(), wb);
writeCString(": ", wb);
/// value
UInt64 value = array_value[row_num];
writeIntText(value, wb);
//// type
Int8 type = array_type[row_num];
writeCString(" (", wb);
if (color)
writeString(setColor(intHash64(type)), wb);
writeString(toString(ProfileEvents::TypeEnum->castToName(type)), wb);
if (color)
writeCString(resetColor(), wb);
writeCString(")", wb);
writeChar('\n', wb);
}
}
}

View File

@ -6,16 +6,37 @@
namespace DB
{
/// Prints internal server logs
/// Input blocks have to have the same structure as SystemLogsQueue::getSampleBlock()
/// Prints internal server logs or profile events with colored output (if requested).
/// NOTE: IRowOutputFormat does not suite well for this case
class InternalTextLogs
{
public:
InternalTextLogs(WriteBuffer & buf_out, bool color_) : wb(buf_out), color(color_) {}
void write(const Block & block);
/// Print internal server logs
///
/// Input blocks have to have the same structure as SystemLogsQueue::getSampleBlock():
/// - event_time
/// - event_time_microseconds
/// - host_name
/// - query_id
/// - thread_id
/// - priority
/// - source
/// - text
void writeLogs(const Block & block);
/// Print profile events.
///
/// Block:
/// - host_name
/// - current_time
/// - thread_id
/// - type
/// - name
/// - value
///
/// See also TCPHandler::sendProfileEvents() for block columns.
void writeProfileEvents(const Block & block);
void flush()
{

View File

@ -589,8 +589,10 @@
M(619, POSTGRESQL_REPLICATION_INTERNAL_ERROR) \
M(620, QUERY_NOT_ALLOWED) \
M(621, CANNOT_NORMALIZE_STRING) \
M(622, BAD_FILE_TYPE) \
M(623, IO_SETUP_ERROR) \
M(622, CANNOT_PARSE_CAPN_PROTO_SCHEMA) \
M(623, CAPN_PROTO_BAD_CAST) \
M(624, BAD_FILE_TYPE) \
M(625, IO_SETUP_ERROR) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -40,13 +40,7 @@ public:
CompressionCodecPtr getDefaultCodec() const;
/// Validate codecs AST specified by user and parses codecs description (substitute default parameters)
ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const IDataType * column_type, bool sanity_check, bool allow_experimental_codecs) const;
/// Just wrapper for previous method.
ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const DataTypePtr & column_type, bool sanity_check, bool allow_experimental_codecs) const
{
return validateCodecAndGetPreprocessedAST(ast, column_type.get(), sanity_check, allow_experimental_codecs);
}
ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const DataTypePtr & column_type, bool sanity_check, bool allow_experimental_codecs) const;
/// Validate codecs AST specified by user
void validateCodec(const String & family_name, std::optional<int> level, bool sanity_check, bool allow_experimental_codecs) const;

View File

@ -53,7 +53,7 @@ void CompressionCodecFactory::validateCodec(
ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(
const ASTPtr & ast, const IDataType * column_type, bool sanity_check, bool allow_experimental_codecs) const
const ASTPtr & ast, const DataTypePtr & column_type, bool sanity_check, bool allow_experimental_codecs) const
{
if (const auto * func = ast->as<ASTFunction>())
{
@ -100,12 +100,13 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(
if (column_type)
{
CompressionCodecPtr prev_codec;
IDataType::StreamCallbackWithType callback = [&](
const ISerialization::SubstreamPath & substream_path, const IDataType & substream_type)
ISerialization::StreamCallback callback = [&](const auto & substream_path)
{
assert(!substream_path.empty());
if (ISerialization::isSpecialCompressionAllowed(substream_path))
{
result_codec = getImpl(codec_family_name, codec_arguments, &substream_type);
const auto & last_type = substream_path.back().data.type;
result_codec = getImpl(codec_family_name, codec_arguments, last_type.get());
/// Case for column Tuple, which compressed with codec which depends on data type, like Delta.
/// We cannot substitute parameters for such codecs.
@ -115,8 +116,8 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(
}
};
ISerialization::SubstreamPath stream_path;
column_type->enumerateStreams(column_type->getDefaultSerialization(), callback, stream_path);
ISerialization::SubstreamPath path;
column_type->getDefaultSerialization()->enumerateStreams(path, callback, column_type, nullptr);
if (!result_codec)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find any substream with data type for type {}. It's a bug", column_type->getName());

View File

@ -28,7 +28,7 @@ struct Settings;
M(UInt64, snapshot_distance, 100000, "How many log items we have to collect to write new snapshot", 0) \
M(Bool, auto_forwarding, true, "Allow to forward write requests from followers to leader", 0) \
M(Milliseconds, shutdown_timeout, 5000, "How many time we will until RAFT shutdown", 0) \
M(Milliseconds, startup_timeout, 30000, "How many time we will until RAFT to start", 0) \
M(Milliseconds, startup_timeout, 180000, "How many time we will until RAFT to start", 0) \
M(LogsLevel, raft_logs_level, LogsLevel::information, "Log internal RAFT logs into main server log level. Valid values: 'trace', 'debug', 'information', 'warning', 'error', 'fatal', 'none'", 0) \
M(UInt64, rotate_log_storage_interval, 100000, "How many records will be stored in one log storage file", 0) \
M(UInt64, snapshots_to_keep, 3, "How many compressed snapshots to keep on disk", 0) \

View File

@ -250,7 +250,7 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ
return true;
}
void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper)
void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper, bool start_async)
{
LOG_DEBUG(log, "Initializing storage dispatcher");
int myid = config.getInt("keeper_server.server_id");
@ -271,8 +271,16 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
server->startup();
LOG_DEBUG(log, "Server initialized, waiting for quorum");
server->waitInit();
LOG_DEBUG(log, "Quorum initialized");
if (!start_async)
{
server->waitInit();
LOG_DEBUG(log, "Quorum initialized");
}
else
{
LOG_INFO(log, "Starting Keeper asynchronously, server will accept connections to Keeper when it will be ready");
}
}
catch (...)
{
@ -366,7 +374,7 @@ void KeeperDispatcher::sessionCleanerTask()
try
{
/// Only leader node must check dead sessions
if (isLeader())
if (server->checkInit() && isLeader())
{
auto dead_sessions = server->getDeadSessions();

View File

@ -100,7 +100,12 @@ public:
/// Initialization from config.
/// standalone_keeper -- we are standalone keeper application (not inside clickhouse server)
void initialize(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper);
void initialize(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper, bool start_async);
bool checkInit() const
{
return server && server->checkInit();
}
/// Shutdown internal keeper parts (server, state machine, log storage, etc)
void shutdown();

View File

@ -353,6 +353,7 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
void KeeperServer::waitInit()
{
std::unique_lock lock(initialized_mutex);
int64_t timeout = coordination_settings->startup_timeout.totalMilliseconds();
if (!initialized_cv.wait_for(lock, std::chrono::milliseconds(timeout), [&] { return initialized_flag.load(); }))
throw Exception(ErrorCodes::RAFT_ERROR, "Failed to wait RAFT initialization");

View File

@ -80,6 +80,12 @@ public:
/// Wait server initialization (see callbackFunc)
void waitInit();
/// Return true if KeeperServer initialized
bool checkInit() const
{
return initialized_flag;
}
void shutdown();
int getServerID() const { return server_id; }

View File

@ -702,7 +702,7 @@ ColumnPtr getColumnFromBlock(const Block & block, const NameAndTypePair & column
current_column = current_column->decompress();
if (column.isSubcolumn())
return column.getTypeInStorage()->getSubcolumn(column.getSubcolumnName(), *current_column);
return column.getTypeInStorage()->getSubcolumn(column.getSubcolumnName(), current_column);
return current_column;
}

View File

@ -625,7 +625,8 @@ class IColumn;
M(Bool, cross_to_inner_join_rewrite, true, "Use inner join instead of comma/cross join if possible", 0) \
\
M(Bool, output_format_arrow_low_cardinality_as_dictionary, false, "Enable output LowCardinality type as Dictionary Arrow type", 0) \
\
M(EnumComparingMode, format_capn_proto_enum_comparising_mode, FormatSettings::EnumComparingMode::BY_VALUES, "How to map ClickHouse Enum and CapnProto Enum", 0)\
// End of FORMAT_FACTORY_SETTINGS
// Please add settings non-related to formats into the COMMON_SETTINGS above.

View File

@ -116,4 +116,9 @@ IMPLEMENT_SETTING_ENUM(ShortCircuitFunctionEvaluation, ErrorCodes::BAD_ARGUMENTS
{{"enable", ShortCircuitFunctionEvaluation::ENABLE},
{"force_enable", ShortCircuitFunctionEvaluation::FORCE_ENABLE},
{"disable", ShortCircuitFunctionEvaluation::DISABLE}})
IMPLEMENT_SETTING_ENUM(EnumComparingMode, ErrorCodes::BAD_ARGUMENTS,
{{"by_names", FormatSettings::EnumComparingMode::BY_NAMES},
{"by_values", FormatSettings::EnumComparingMode::BY_VALUES},
{"by_names_case_insensitive", FormatSettings::EnumComparingMode::BY_NAMES_CASE_INSENSITIVE}})
}

View File

@ -168,4 +168,6 @@ enum class ShortCircuitFunctionEvaluation
DECLARE_SETTING_ENUM(ShortCircuitFunctionEvaluation)
DECLARE_SETTING_ENUM_WITH_RENAME(EnumComparingMode, FormatSettings::EnumComparingMode)
}

View File

@ -3,8 +3,6 @@
#include <Columns/ColumnAggregateFunction.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/AlignedBuffer.h>
#include <Common/FieldVisitorToString.h>

View File

@ -1,17 +1,9 @@
#include <Columns/ColumnArray.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <Formats/FormatSettings.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/Serializations/SerializationArray.h>
#include <DataTypes/Serializations/SerializationTupleElement.h>
#include <DataTypes/Serializations/SerializationNumber.h>
#include <Parsers/IAST.h>
@ -53,69 +45,6 @@ bool DataTypeArray::equals(const IDataType & rhs) const
return typeid(rhs) == typeid(*this) && nested->equals(*static_cast<const DataTypeArray &>(rhs).nested);
}
DataTypePtr DataTypeArray::tryGetSubcolumnType(const String & subcolumn_name) const
{
return tryGetSubcolumnTypeImpl(subcolumn_name, 0);
}
DataTypePtr DataTypeArray::tryGetSubcolumnTypeImpl(const String & subcolumn_name, size_t level) const
{
if (subcolumn_name == "size" + std::to_string(level))
return std::make_shared<DataTypeUInt64>();
DataTypePtr subcolumn;
if (const auto * nested_array = typeid_cast<const DataTypeArray *>(nested.get()))
subcolumn = nested_array->tryGetSubcolumnTypeImpl(subcolumn_name, level + 1);
else
subcolumn = nested->tryGetSubcolumnType(subcolumn_name);
if (subcolumn && subcolumn_name != MAIN_SUBCOLUMN_NAME)
subcolumn = std::make_shared<DataTypeArray>(std::move(subcolumn));
return subcolumn;
}
ColumnPtr DataTypeArray::getSubcolumn(const String & subcolumn_name, const IColumn & column) const
{
return getSubcolumnImpl(subcolumn_name, column, 0);
}
ColumnPtr DataTypeArray::getSubcolumnImpl(const String & subcolumn_name, const IColumn & column, size_t level) const
{
const auto & column_array = assert_cast<const ColumnArray &>(column);
if (subcolumn_name == "size" + std::to_string(level))
return arrayOffsetsToSizes(column_array.getOffsetsColumn());
ColumnPtr subcolumn;
if (const auto * nested_array = typeid_cast<const DataTypeArray *>(nested.get()))
subcolumn = nested_array->getSubcolumnImpl(subcolumn_name, column_array.getData(), level + 1);
else
subcolumn = nested->getSubcolumn(subcolumn_name, column_array.getData());
return ColumnArray::create(subcolumn, column_array.getOffsetsPtr());
}
SerializationPtr DataTypeArray::getSubcolumnSerialization(
const String & subcolumn_name, const BaseSerializationGetter & base_serialization_getter) const
{
return getSubcolumnSerializationImpl(subcolumn_name, base_serialization_getter, 0);
}
SerializationPtr DataTypeArray::getSubcolumnSerializationImpl(
const String & subcolumn_name, const BaseSerializationGetter & base_serialization_getter, size_t level) const
{
if (subcolumn_name == "size" + std::to_string(level))
return std::make_shared<SerializationTupleElement>(base_serialization_getter(DataTypeUInt64()), subcolumn_name, false);
SerializationPtr subcolumn;
if (const auto * nested_array = typeid_cast<const DataTypeArray *>(nested.get()))
subcolumn = nested_array->getSubcolumnSerializationImpl(subcolumn_name, base_serialization_getter, level + 1);
else
subcolumn = nested->getSubcolumnSerialization(subcolumn_name, base_serialization_getter);
return std::make_shared<SerializationArray>(subcolumn);
}
SerializationPtr DataTypeArray::doGetDefaultSerialization() const
{
return std::make_shared<SerializationArray>(nested->getDefaultSerialization());

View File

@ -54,23 +54,12 @@ public:
return nested->isValueUnambiguouslyRepresentedInFixedSizeContiguousMemoryRegion();
}
DataTypePtr tryGetSubcolumnType(const String & subcolumn_name) const override;
ColumnPtr getSubcolumn(const String & subcolumn_name, const IColumn & column) const override;
SerializationPtr getSubcolumnSerialization(
const String & subcolumn_name, const BaseSerializationGetter & base_serialization_getter) const override;
SerializationPtr doGetDefaultSerialization() const override;
const DataTypePtr & getNestedType() const { return nested; }
/// 1 for plain array, 2 for array of arrays and so on.
size_t getNumberOfDimensions() const;
private:
ColumnPtr getSubcolumnImpl(const String & subcolumn_name, const IColumn & column, size_t level) const;
DataTypePtr tryGetSubcolumnTypeImpl(const String & subcolumn_name, size_t level) const;
SerializationPtr getSubcolumnSerializationImpl(
const String & subcolumn_name, const BaseSerializationGetter & base_serialization_getter, size_t level) const;
};
}

View File

@ -1,14 +1,7 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/Serializations/SerializationDate.h>
#include <DataTypes/DataTypeFactory.h>
#include <Common/assert_cast.h>
namespace DB
{

View File

@ -1,28 +1,12 @@
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/Serializations/SerializationDateTime.h>
#include <Columns/ColumnVector.h>
#include <Common/assert_cast.h>
#include <base/DateLUT.h>
#include <DataTypes/DataTypeFactory.h>
#include <Formats/FormatSettings.h>
#include <IO/Operators.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <IO/parseDateTimeBestEffort.h>
#include <Parsers/ASTLiteral.h>
namespace DB
{
TimezoneMixin::TimezoneMixin(const String & time_zone_name)
: has_explicit_time_zone(!time_zone_name.empty()),
time_zone(DateLUT::instance(time_zone_name)),
utc_time_zone(DateLUT::instance("UTC"))
{
}
DataTypeDateTime::DataTypeDateTime(const String & time_zone_name)
: TimezoneMixin(time_zone_name)
{
@ -52,7 +36,7 @@ bool DataTypeDateTime::equals(const IDataType & rhs) const
SerializationPtr DataTypeDateTime::doGetDefaultSerialization() const
{
return std::make_shared<SerializationDateTime>(time_zone, utc_time_zone);
return std::make_shared<SerializationDateTime>(*this);
}
}

View File

@ -2,33 +2,11 @@
#include <Core/Types.h>
#include <DataTypes/DataTypeNumberBase.h>
class DateLUTImpl;
#include <DataTypes/TimezoneMixin.h>
namespace DB
{
/** Mixin-class that manages timezone info for timezone-aware DateTime implementations
*
* Must be used as a (second) base for class implementing IDateType-interface.
*/
class TimezoneMixin
{
public:
explicit TimezoneMixin(const String & time_zone_name = "");
TimezoneMixin(const TimezoneMixin &) = default;
const DateLUTImpl & getTimeZone() const { return time_zone; }
bool hasExplicitTimeZone() const { return has_explicit_time_zone; }
protected:
/// true if time zone name was provided in data type parameters, false if it's using default time zone.
bool has_explicit_time_zone;
const DateLUTImpl & time_zone;
const DateLUTImpl & utc_time_zone;
};
/** DateTime stores time as unix timestamp.
* The value itself is independent of time zone.
*

View File

@ -1,19 +1,7 @@
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/Serializations/SerializationDateTime64.h>
#include <Columns/ColumnVector.h>
#include <Common/assert_cast.h>
#include <Common/typeid_cast.h>
#include <base/DateLUT.h>
#include <DataTypes/DataTypeFactory.h>
#include <Formats/FormatSettings.h>
#include <IO/Operators.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <IO/parseDateTimeBestEffort.h>
#include <Parsers/ASTLiteral.h>
#include <optional>
#include <string>
@ -65,7 +53,7 @@ bool DataTypeDateTime64::equals(const IDataType & rhs) const
SerializationPtr DataTypeDateTime64::doGetDefaultSerialization() const
{
return std::make_shared<SerializationDateTime64>(time_zone, utc_time_zone, scale);
return std::make_shared<SerializationDateTime64>(scale, *this);
}
}

View File

@ -1,15 +1,5 @@
#include <DataTypes/DataTypeDecimalBase.h>
#include <Common/assert_cast.h>
#include <Common/typeid_cast.h>
#include <Core/DecimalFunctions.h>
#include <DataTypes/DataTypeFactory.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/IAST.h>
#include <type_traits>
namespace DB

View File

@ -1,5 +1,4 @@
#include <IO/WriteBufferFromString.h>
#include <Formats/FormatSettings.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/Serializations/SerializationEnum.h>
#include <DataTypes/DataTypeFactory.h>

View File

@ -1,22 +1,12 @@
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnConst.h>
#include <Formats/FormatSettings.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/Serializations/SerializationFixedString.h>
#include <IO/WriteBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/VarInt.h>
#include <Parsers/IAST.h>
#include <Parsers/ASTLiteral.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
namespace DB
{

View File

@ -1,9 +1,7 @@
#include <base/map.h>
#include <Common/StringUtils/StringUtils.h>
#include <Columns/ColumnMap.h>
#include <Columns/ColumnArray.h>
#include <Core/Field.h>
#include <Formats/FormatSettings.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>
@ -11,14 +9,7 @@
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/Serializations/SerializationMap.h>
#include <Parsers/IAST.h>
#include <Parsers/ASTNameTypePair.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/quoteString.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ReadBufferFromString.h>
#include <IO/Operators.h>
@ -84,27 +75,6 @@ std::string DataTypeMap::doGetName() const
return s.str();
}
static const IColumn & extractNestedColumn(const IColumn & column)
{
return assert_cast<const ColumnMap &>(column).getNestedColumn();
}
DataTypePtr DataTypeMap::tryGetSubcolumnType(const String & subcolumn_name) const
{
return nested->tryGetSubcolumnType(subcolumn_name);
}
ColumnPtr DataTypeMap::getSubcolumn(const String & subcolumn_name, const IColumn & column) const
{
return nested->getSubcolumn(subcolumn_name, extractNestedColumn(column));
}
SerializationPtr DataTypeMap::getSubcolumnSerialization(
const String & subcolumn_name, const BaseSerializationGetter & base_serialization_getter) const
{
return nested->getSubcolumnSerialization(subcolumn_name, base_serialization_getter);
}
MutableColumnPtr DataTypeMap::createColumn() const
{
return ColumnMap::create(nested->createColumn());

View File

@ -32,11 +32,6 @@ public:
bool canBeInsideNullable() const override { return false; }
DataTypePtr tryGetSubcolumnType(const String & subcolumn_name) const override;
ColumnPtr getSubcolumn(const String & subcolumn_name, const IColumn & column) const override;
SerializationPtr getSubcolumnSerialization(
const String & subcolumn_name, const BaseSerializationGetter & base_serialization_getter) const override;
MutableColumnPtr createColumn() const override;
Field getDefault() const override;

View File

@ -2,7 +2,6 @@
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Common/quoteString.h>
#include <Parsers/ASTNameTypePair.h>

View File

@ -1,10 +1,7 @@
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/Serializations/SerializationNothing.h>
#include <DataTypes/DataTypeFactory.h>
#include <Columns/ColumnNothing.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
namespace DB

View File

@ -1,17 +1,9 @@
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/Serializations/SerializationNullable.h>
#include <DataTypes/Serializations/SerializationTupleElement.h>
#include <Columns/ColumnNullable.h>
#include <Core/Field.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <IO/ConcatReadBuffer.h>
#include <Parsers/IAST.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
@ -63,32 +55,6 @@ bool DataTypeNullable::equals(const IDataType & rhs) const
return rhs.isNullable() && nested_data_type->equals(*static_cast<const DataTypeNullable &>(rhs).nested_data_type);
}
DataTypePtr DataTypeNullable::tryGetSubcolumnType(const String & subcolumn_name) const
{
if (subcolumn_name == "null")
return std::make_shared<DataTypeUInt8>();
return nested_data_type->tryGetSubcolumnType(subcolumn_name);
}
ColumnPtr DataTypeNullable::getSubcolumn(const String & subcolumn_name, const IColumn & column) const
{
const auto & column_nullable = assert_cast<const ColumnNullable &>(column);
if (subcolumn_name == "null")
return column_nullable.getNullMapColumnPtr();
return nested_data_type->getSubcolumn(subcolumn_name, column_nullable.getNestedColumn());
}
SerializationPtr DataTypeNullable::getSubcolumnSerialization(
const String & subcolumn_name, const BaseSerializationGetter & base_serialization_getter) const
{
if (subcolumn_name == "null")
return std::make_shared<SerializationTupleElement>(base_serialization_getter(DataTypeUInt8()), subcolumn_name, false);
return nested_data_type->getSubcolumnSerialization(subcolumn_name, base_serialization_getter);
}
SerializationPtr DataTypeNullable::doGetDefaultSerialization() const
{
return std::make_shared<SerializationNullable>(nested_data_type->getDefaultSerialization());

View File

@ -41,11 +41,6 @@ public:
bool onlyNull() const override;
bool canBeInsideLowCardinality() const override { return nested_data_type->canBeInsideLowCardinality(); }
DataTypePtr tryGetSubcolumnType(const String & subcolumn_name) const override;
ColumnPtr getSubcolumn(const String & subcolumn_name, const IColumn & column) const override;
SerializationPtr getSubcolumnSerialization(
const String & subcolumn_name, const BaseSerializationGetter & base_serialization_getter) const override;
const DataTypePtr & getNestedType() const { return nested_data_type; }
private:
SerializationPtr doGetDefaultSerialization() const override;

View File

@ -1,13 +1,6 @@
#include <type_traits>
#include <DataTypes/DataTypeNumberBase.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnConst.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Common/NaNUtils.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Formats/FormatSettings.h>
namespace DB

View File

@ -1,14 +1,6 @@
#include <Core/Defines.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnConst.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Core/Field.h>
#include <Formats/FormatSettings.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/Serializations/SerializationString.h>
@ -16,15 +8,6 @@
#include <Parsers/IAST.h>
#include <Parsers/ASTLiteral.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/VarInt.h>
#ifdef __SSE2__
#include <emmintrin.h>
#endif
namespace DB
{

View File

@ -1,7 +1,5 @@
#pragma once
#include <ostream>
#include <DataTypes/IDataType.h>

View File

@ -3,20 +3,17 @@
#include <Common/StringUtils/StringUtils.h>
#include <Columns/ColumnTuple.h>
#include <Core/Field.h>
#include <Formats/FormatSettings.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/Serializations/SerializationTuple.h>
#include <DataTypes/Serializations/SerializationTupleElement.h>
#include <DataTypes/Serializations/SerializationNamed.h>
#include <DataTypes/NestedUtils.h>
#include <Parsers/IAST.h>
#include <Parsers/ASTNameTypePair.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/quoteString.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
@ -30,7 +27,6 @@ namespace ErrorCodes
extern const int DUPLICATE_COLUMN;
extern const int EMPTY_DATA_PASSED;
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
extern const int ILLEGAL_COLUMN;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int SIZES_OF_COLUMNS_IN_TUPLE_DOESNT_MATCH;
}
@ -107,11 +103,6 @@ static inline IColumn & extractElementColumn(IColumn & column, size_t idx)
return assert_cast<ColumnTuple &>(column).getColumn(idx);
}
static inline const IColumn & extractElementColumn(const IColumn & column, size_t idx)
{
return assert_cast<const ColumnTuple &>(column).getColumn(idx);
}
template <typename F>
static void addElementSafe(const DataTypes & elems, IColumn & column, F && impl)
{
@ -234,74 +225,6 @@ size_t DataTypeTuple::getSizeOfValueInMemory() const
return res;
}
template <typename OnSuccess, typename OnContinue>
auto DataTypeTuple::getSubcolumnEntity(const String & subcolumn_name,
const OnSuccess & on_success, const OnContinue & on_continue) const
{
using ReturnType = decltype(on_success(0));
for (size_t i = 0; i < names.size(); ++i)
{
if (startsWith(subcolumn_name, names[i]))
{
size_t name_length = names[i].size();
if (subcolumn_name.size() == name_length)
return on_success(i);
if (subcolumn_name[name_length] == '.')
return on_continue(i, subcolumn_name.substr(name_length + 1));
}
}
return ReturnType{};
}
DataTypePtr DataTypeTuple::tryGetSubcolumnType(const String & subcolumn_name) const
{
if (subcolumn_name == MAIN_SUBCOLUMN_NAME)
return shared_from_this();
auto on_success = [&](size_t pos) { return elems[pos]; };
auto on_continue = [&](size_t pos, const String & next_subcolumn) { return elems[pos]->tryGetSubcolumnType(next_subcolumn); };
return getSubcolumnEntity(subcolumn_name, on_success, on_continue);
}
ColumnPtr DataTypeTuple::getSubcolumn(const String & subcolumn_name, const IColumn & column) const
{
auto on_success = [&](size_t pos) { return extractElementColumn(column, pos).getPtr(); };
auto on_continue = [&](size_t pos, const String & next_subcolumn)
{
return elems[pos]->getSubcolumn(next_subcolumn, extractElementColumn(column, pos));
};
if (auto subcolumn = getSubcolumnEntity(subcolumn_name, on_success, on_continue))
return subcolumn;
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "There is no subcolumn {} in type {}", subcolumn_name, getName());
}
SerializationPtr DataTypeTuple::getSubcolumnSerialization(
const String & subcolumn_name, const BaseSerializationGetter & base_serialization_getter) const
{
auto on_success = [&](size_t pos)
{
return std::make_shared<SerializationTupleElement>(base_serialization_getter(*elems[pos]), names[pos]);
};
auto on_continue = [&](size_t pos, const String & next_subcolumn)
{
auto next_serialization = elems[pos]->getSubcolumnSerialization(next_subcolumn, base_serialization_getter);
return std::make_shared<SerializationTupleElement>(next_serialization, names[pos]);
};
if (auto serialization = getSubcolumnEntity(subcolumn_name, on_success, on_continue))
return serialization;
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "There is no subcolumn {} in type {}", subcolumn_name, getName());
}
SerializationPtr DataTypeTuple::doGetDefaultSerialization() const
{
SerializationTuple::ElementSerializations serializations(elems.size());
@ -310,7 +233,7 @@ SerializationPtr DataTypeTuple::doGetDefaultSerialization() const
{
String elem_name = use_explicit_names ? names[i] : toString(i + 1);
auto serialization = elems[i]->getDefaultSerialization();
serializations[i] = std::make_shared<SerializationTupleElement>(serialization, elem_name);
serializations[i] = std::make_shared<SerializationNamed>(serialization, elem_name);
}
return std::make_shared<SerializationTuple>(std::move(serializations), use_explicit_names);
@ -325,7 +248,7 @@ SerializationPtr DataTypeTuple::getSerialization(const String & column_name, con
String elem_name = use_explicit_names ? names[i] : toString(i + 1);
auto subcolumn_name = Nested::concatenateName(column_name, elem_name);
auto serialization = elems[i]->getSerialization(subcolumn_name, callback);
serializations[i] = std::make_shared<SerializationTupleElement>(serialization, elem_name);
serializations[i] = std::make_shared<SerializationNamed>(serialization, elem_name);
}
return std::make_shared<SerializationTuple>(std::move(serializations), use_explicit_names);

View File

@ -52,16 +52,11 @@ public:
size_t getMaximumSizeOfValueInMemory() const override;
size_t getSizeOfValueInMemory() const override;
DataTypePtr tryGetSubcolumnType(const String & subcolumn_name) const override;
ColumnPtr getSubcolumn(const String & subcolumn_name, const IColumn & column) const override;
SerializationPtr getSerialization(const String & column_name, const StreamExistenceCallback & callback) const override;
SerializationPtr getSubcolumnSerialization(
const String & subcolumn_name, const BaseSerializationGetter & base_serialization_getter) const override;
SerializationPtr doGetDefaultSerialization() const override;
const DataTypePtr & getElement(size_t i) const { return elems[i]; }
const DataTypes & getElements() const { return elems; }
const Strings & getElementNames() const { return names; }
@ -69,11 +64,6 @@ public:
bool haveExplicitNames() const { return have_explicit_names; }
bool serializeNames() const { return serialize_names; }
private:
template <typename OnSuccess, typename OnContinue>
auto getSubcolumnEntity(const String & subcolumn_name,
const OnSuccess & on_success, const OnContinue & on_continue) const;
};
}

View File

@ -1,16 +1,13 @@
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/Serializations/SerializationDecimal.h>
#include <Common/assert_cast.h>
#include <Common/typeid_cast.h>
#include <Core/DecimalFunctions.h>
#include <DataTypes/DataTypeFactory.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/readDecimalText.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/IAST.h>
#include <type_traits>

View File

@ -1,4 +1,5 @@
#include <DataTypes/EnumValues.h>
#include <boost/algorithm/string.hpp>
namespace DB
{
@ -82,6 +83,24 @@ Names EnumValues<T>::getAllRegisteredNames() const
return result;
}
template <typename T>
std::unordered_set<String> EnumValues<T>::getSetOfAllNames(bool to_lower) const
{
std::unordered_set<String> result;
for (const auto & value : values)
result.insert(to_lower ? boost::algorithm::to_lower_copy(value.first) : value.first);
return result;
}
template <typename T>
std::unordered_set<T> EnumValues<T>::getSetOfAllValues() const
{
std::unordered_set<T> result;
for (const auto & value : values)
result.insert(value.second);
return result;
}
template class EnumValues<Int8>;
template class EnumValues<Int16>;

View File

@ -80,6 +80,10 @@ public:
}
Names getAllRegisteredNames() const override;
std::unordered_set<String> getSetOfAllNames(bool to_lower) const;
std::unordered_set<T> getSetOfAllValues() const;
};
}

View File

@ -2,7 +2,6 @@
#include <Columns/ColumnConst.h>
#include <Common/Exception.h>
#include <Common/escapeForFileName.h>
#include <Common/SipHash.h>
#include <IO/WriteHelpers.h>
@ -11,7 +10,6 @@
#include <DataTypes/IDataType.h>
#include <DataTypes/DataTypeCustom.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/Serializations/SerializationTupleElement.h>
namespace DB
@ -65,12 +63,40 @@ size_t IDataType::getSizeOfValueInMemory() const
throw Exception("Value of type " + getName() + " in memory is not of fixed size.", ErrorCodes::LOGICAL_ERROR);
}
void IDataType::forEachSubcolumn(
const SubcolumnCallback & callback,
const SerializationPtr & serialization,
const DataTypePtr & type,
const ColumnPtr & column)
{
ISerialization::StreamCallback callback_with_data = [&](const auto & subpath)
{
for (size_t i = 0; i < subpath.size(); ++i)
{
if (!subpath[i].visited && ISerialization::hasSubcolumnForPath(subpath, i + 1))
{
auto name = ISerialization::getSubcolumnNameForStream(subpath, i + 1);
auto data = ISerialization::createFromPath(subpath, i);
callback(subpath, name, data);
}
subpath[i].visited = true;
}
};
ISerialization::SubstreamPath path;
serialization->enumerateStreams(path, callback_with_data, type, column);
}
DataTypePtr IDataType::tryGetSubcolumnType(const String & subcolumn_name) const
{
if (subcolumn_name == MAIN_SUBCOLUMN_NAME)
return shared_from_this();
DataTypePtr res;
forEachSubcolumn([&](const auto &, const auto & name, const auto & data)
{
if (name == subcolumn_name)
res = data.type;
}, getDefaultSerialization(), getPtr(), nullptr);
return nullptr;
return res;
}
DataTypePtr IDataType::getSubcolumnType(const String & subcolumn_name) const
@ -82,42 +108,43 @@ DataTypePtr IDataType::getSubcolumnType(const String & subcolumn_name) const
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "There is no subcolumn {} in type {}", subcolumn_name, getName());
}
ColumnPtr IDataType::getSubcolumn(const String & subcolumn_name, const IColumn &) const
SerializationPtr IDataType::getSubcolumnSerialization(const String & subcolumn_name, const SerializationPtr & serialization) const
{
SerializationPtr res;
forEachSubcolumn([&](const auto &, const auto & name, const auto & data)
{
if (name == subcolumn_name)
res = data.serialization;
}, serialization, nullptr, nullptr);
if (res)
return res;
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "There is no subcolumn {} in type {}", subcolumn_name, getName());
}
void IDataType::forEachSubcolumn(const SubcolumnCallback & callback) const
ColumnPtr IDataType::getSubcolumn(const String & subcolumn_name, const ColumnPtr & column) const
{
NameSet set;
getDefaultSerialization()->enumerateStreams([&, this](const ISerialization::SubstreamPath & substream_path)
ColumnPtr res;
forEachSubcolumn([&](const auto &, const auto & name, const auto & data)
{
ISerialization::SubstreamPath new_path;
/// Iterate over path to try to get intermediate subcolumns for complex nested types.
for (const auto & elem : substream_path)
{
new_path.push_back(elem);
auto name = ISerialization::getSubcolumnNameForStream(new_path);
auto type = tryGetSubcolumnType(name);
if (name == subcolumn_name)
res = data.column;
}, getDefaultSerialization(), nullptr, column);
/// Subcolumn names may repeat among several substream paths.
if (!name.empty() && type && !set.count(name))
{
callback(name, type, substream_path);
set.insert(name);
}
}
});
if (res)
return res;
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "There is no subcolumn {} in type {}", subcolumn_name, getName());
}
Names IDataType::getSubcolumnNames() const
{
Names res;
forEachSubcolumn([&](const auto & name, const auto &, const auto &)
forEachSubcolumn([&](const auto &, const auto & name, const auto &)
{
res.push_back(name);
});
}, getDefaultSerialization(), nullptr, nullptr);
return res;
}
@ -144,24 +171,14 @@ SerializationPtr IDataType::getDefaultSerialization() const
return doGetDefaultSerialization();
}
SerializationPtr IDataType::getSubcolumnSerialization(const String & subcolumn_name, const BaseSerializationGetter &) const
{
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "There is no subcolumn {} in type {}", subcolumn_name, getName());
}
// static
SerializationPtr IDataType::getSerialization(const NameAndTypePair & column, const IDataType::StreamExistenceCallback & callback)
{
if (column.isSubcolumn())
{
/// Wrap to custom serialization deepest subcolumn, which is represented in non-complex type.
auto base_serialization_getter = [&](const IDataType & subcolumn_type)
{
return subcolumn_type.getSerialization(column.name, callback);
};
const auto & type_in_storage = column.getTypeInStorage();
return type_in_storage->getSubcolumnSerialization(column.getSubcolumnName(), base_serialization_getter);
auto default_serialization = type_in_storage->getDefaultSerialization();
return type_in_storage->getSubcolumnSerialization(column.getSubcolumnName(), default_serialization);
}
return column.type->getSerialization(column.name, callback);
@ -172,21 +189,4 @@ SerializationPtr IDataType::getSerialization(const String &, const StreamExisten
return getDefaultSerialization();
}
DataTypePtr IDataType::getTypeForSubstream(const ISerialization::SubstreamPath & substream_path) const
{
auto type = tryGetSubcolumnType(ISerialization::getSubcolumnNameForStream(substream_path));
if (type)
return type->getSubcolumnType(MAIN_SUBCOLUMN_NAME);
return getSubcolumnType(MAIN_SUBCOLUMN_NAME);
}
void IDataType::enumerateStreams(const SerializationPtr & serialization, const StreamCallbackWithType & callback, ISerialization::SubstreamPath & path) const
{
serialization->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{
callback(substream_path, *getTypeForSubstream(substream_path));
}, path);
}
}
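For illustration, a minimal standalone sketch (plain C++, not the ClickHouse API) of the prefix-walk idea behind the new IDataType::forEachSubcolumn above: the serialization enumerates substream paths into a shared, mutable path, and every prefix that yields a subcolumn is reported once, under a name built from that prefix; the `visited` flag is what keeps a prefix from being reported again for deeper streams. The names "null" and "size0" mirror the ones used by SerializationNullable and SerializationArray in this change; everything else is invented for the example.

``` cpp
#include <cstddef>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

struct PathElem
{
    std::string name;       /// contribution of this level to the subcolumn name ("" for none)
    bool makes_subcolumn;   /// toy analogue of ISerialization::hasSubcolumnForPath for this prefix
    bool visited = false;   /// toy analogue of Substream::visited
};

using Path = std::vector<PathElem>;
using StreamCallback = std::function<void(Path &)>;
using SubcolumnCallback = std::function<void(const std::string &)>;

/// Toy "serialization" for Nullable(Array(T)): push path elements and invoke the
/// callback for every stream, the way enumerateStreams builds a SubstreamPath.
static void enumerateStreams(Path & path, const StreamCallback & callback)
{
    path.push_back({"null", true});     /// null map stream -> subcolumn "null"
    callback(path);
    path.back() = {"", false};          /// nullable elements level, contributes no name
    path.push_back({"size0", true});    /// array sizes stream -> subcolumn "size0"
    callback(path);
    path.back() = {"", false};          /// array elements level
    callback(path);                     /// the regular elements stream itself
    path.pop_back();
    path.pop_back();
}

/// Join the names of the first `prefix_len` elements, skipping empty contributions.
static std::string nameForPrefix(const Path & path, size_t prefix_len)
{
    std::string res;
    for (size_t i = 0; i < prefix_len; ++i)
    {
        if (path[i].name.empty())
            continue;
        res += (res.empty() ? "" : ".") + path[i].name;
    }
    return res;
}

/// Analogue of the new forEachSubcolumn: report each subcolumn-producing prefix once.
static void forEachSubcolumn(const SubcolumnCallback & callback)
{
    StreamCallback on_stream = [&](Path & subpath)
    {
        for (size_t i = 0; i < subpath.size(); ++i)
        {
            if (!subpath[i].visited && subpath[i].makes_subcolumn)
                callback(nameForPrefix(subpath, i + 1));
            subpath[i].visited = true;
        }
    };
    Path path;
    enumerateStreams(path, on_stream);
}

int main()
{
    forEachSubcolumn([](const std::string & name) { std::cout << name << '\n'; });   /// prints "null" and "size0"
}
```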

View File

@ -70,19 +70,31 @@ public:
return doGetName();
}
DataTypePtr getPtr() const { return shared_from_this(); }
/// Name of data type family (example: FixedString, Array).
virtual const char * getFamilyName() const = 0;
/// Data type id. It's used for runtime type checks.
virtual TypeIndex getTypeId() const = 0;
static constexpr auto MAIN_SUBCOLUMN_NAME = "__main";
virtual DataTypePtr tryGetSubcolumnType(const String & subcolumn_name) const;
DataTypePtr tryGetSubcolumnType(const String & subcolumn_name) const;
DataTypePtr getSubcolumnType(const String & subcolumn_name) const;
virtual ColumnPtr getSubcolumn(const String & subcolumn_name, const IColumn & column) const;
using SubcolumnCallback = std::function<void(const String &, const DataTypePtr &, const ISerialization::SubstreamPath &)>;
void forEachSubcolumn(const SubcolumnCallback & callback) const;
SerializationPtr getSubcolumnSerialization(const String & subcolumn_name, const SerializationPtr & serialization) const;
ColumnPtr getSubcolumn(const String & subcolumn_name, const ColumnPtr & column) const;
using SubcolumnCallback = std::function<void(
const ISerialization::SubstreamPath &,
const String &,
const ISerialization::SubstreamData &)>;
static void forEachSubcolumn(
const SubcolumnCallback & callback,
const SerializationPtr & serialization,
const DataTypePtr & type,
const ColumnPtr & column);
Names getSubcolumnNames() const;
/// Returns default serialization of data type.
@ -93,7 +105,6 @@ public:
/// one of serialization types, that serialization will be chosen for reading.
/// If callback always returned false, the default serialization will be chosen.
using StreamExistenceCallback = std::function<bool(const String &)>;
using BaseSerializationGetter = std::function<SerializationPtr(const IDataType &)>;
/// Chooses serialization for reading of one column or subcolumns by
/// checking existence of substreams using callback.
@ -103,22 +114,10 @@ public:
virtual SerializationPtr getSerialization(const String & column_name, const StreamExistenceCallback & callback) const;
/// Returns serialization wrapper for reading one particular subcolumn of data type.
virtual SerializationPtr getSubcolumnSerialization(
const String & subcolumn_name, const BaseSerializationGetter & base_serialization_getter) const;
using StreamCallbackWithType = std::function<void(const ISerialization::SubstreamPath &, const IDataType &)>;
void enumerateStreams(const SerializationPtr & serialization, const StreamCallbackWithType & callback, ISerialization::SubstreamPath & path) const;
void enumerateStreams(const SerializationPtr & serialization, const StreamCallbackWithType & callback, ISerialization::SubstreamPath && path) const { enumerateStreams(serialization, callback, path); }
void enumerateStreams(const SerializationPtr & serialization, const StreamCallbackWithType & callback) const { enumerateStreams(serialization, callback, {}); }
protected:
virtual String doGetName() const { return getFamilyName(); }
virtual SerializationPtr doGetDefaultSerialization() const = 0;
DataTypePtr getTypeForSubstream(const ISerialization::SubstreamPath & substream_path) const;
public:
/** Create empty column for corresponding type.
*/

View File

@ -5,6 +5,7 @@
#include <IO/Operators.h>
#include <Common/escapeForFileName.h>
#include <DataTypes/NestedUtils.h>
#include <base/EnumReflection.h>
namespace DB
@ -17,30 +18,11 @@ namespace ErrorCodes
String ISerialization::Substream::toString() const
{
switch (type)
{
case ArrayElements:
return "ArrayElements";
case ArraySizes:
return "ArraySizes";
case NullableElements:
return "NullableElements";
case NullMap:
return "NullMap";
case TupleElement:
return "TupleElement(" + tuple_element_name + ", "
+ std::to_string(escape_tuple_delimiter) + ")";
case DictionaryKeys:
return "DictionaryKeys";
case DictionaryIndexes:
return "DictionaryIndexes";
case SparseElements:
return "SparseElements";
case SparseOffsets:
return "SparseOffsets";
}
if (type == TupleElement)
return fmt::format("TupleElement({}, escape_tuple_delimiter={})",
tuple_element_name, escape_tuple_delimiter ? "true" : "false");
__builtin_unreachable();
return String(magic_enum::enum_name(type));
}
String ISerialization::SubstreamPath::toString() const
@ -57,9 +39,21 @@ String ISerialization::SubstreamPath::toString() const
return wb.str();
}
void ISerialization::enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
DataTypePtr type,
ColumnPtr column) const
{
path.push_back(Substream::Regular);
path.back().data = {type, column, getPtr(), nullptr};
callback(path);
path.pop_back();
}
void ISerialization::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
{
callback(path);
enumerateStreams(path, callback, nullptr, nullptr);
}
void ISerialization::serializeBinaryBulk(const IColumn & column, WriteBuffer &, size_t, size_t) const
@ -104,40 +98,48 @@ void ISerialization::deserializeBinaryBulkWithMultipleStreams(
}
}
static String getNameForSubstreamPath(
namespace
{
using SubstreamIterator = ISerialization::SubstreamPath::const_iterator;
String getNameForSubstreamPath(
String stream_name,
const ISerialization::SubstreamPath & path,
SubstreamIterator begin,
SubstreamIterator end,
bool escape_tuple_delimiter)
{
using Substream = ISerialization::Substream;
size_t array_level = 0;
for (const auto & elem : path)
for (auto it = begin; it != end; ++it)
{
if (elem.type == Substream::NullMap)
if (it->type == Substream::NullMap)
stream_name += ".null";
else if (elem.type == Substream::ArraySizes)
else if (it->type == Substream::ArraySizes)
stream_name += ".size" + toString(array_level);
else if (elem.type == Substream::ArrayElements)
else if (it->type == Substream::ArrayElements)
++array_level;
else if (elem.type == Substream::DictionaryKeys)
else if (it->type == Substream::DictionaryKeys)
stream_name += ".dict";
else if (elem.type == Substream::SparseOffsets)
else if (it->type == Substream::SparseOffsets)
stream_name += ".sparse.idx";
else if (elem.type == Substream::TupleElement)
else if (it->type == Substream::TupleElement)
{
/// For compatibility reasons, we use %2E (escaped dot) instead of dot.
/// Because nested data may be represented not by Array of Tuple,
/// but by separate Array columns with names in the form a.b,
/// and the name is encoded as a whole.
stream_name += (escape_tuple_delimiter && elem.escape_tuple_delimiter ?
escapeForFileName(".") : ".") + escapeForFileName(elem.tuple_element_name);
stream_name += (escape_tuple_delimiter && it->escape_tuple_delimiter ?
escapeForFileName(".") : ".") + escapeForFileName(it->tuple_element_name);
}
}
return stream_name;
}
}
String ISerialization::getFileNameForStream(const NameAndTypePair & column, const SubstreamPath & path)
{
return getFileNameForStream(column.getNameInStorage(), path);
@ -152,12 +154,17 @@ String ISerialization::getFileNameForStream(const String & name_in_storage, cons
else
stream_name = escapeForFileName(name_in_storage);
return getNameForSubstreamPath(std::move(stream_name), path, true);
return getNameForSubstreamPath(std::move(stream_name), path.begin(), path.end(), true);
}
String ISerialization::getSubcolumnNameForStream(const SubstreamPath & path)
{
auto subcolumn_name = getNameForSubstreamPath("", path, false);
return getSubcolumnNameForStream(path, path.size());
}
String ISerialization::getSubcolumnNameForStream(const SubstreamPath & path, size_t prefix_len)
{
auto subcolumn_name = getNameForSubstreamPath("", path.begin(), path.begin() + prefix_len, false);
if (!subcolumn_name.empty())
subcolumn_name = subcolumn_name.substr(1); // It starts with a dot.
@ -195,4 +202,44 @@ bool ISerialization::isSpecialCompressionAllowed(const SubstreamPath & path)
return true;
}
size_t ISerialization::getArrayLevel(const SubstreamPath & path)
{
size_t level = 0;
for (const auto & elem : path)
level += elem.type == Substream::ArrayElements;
return level;
}
bool ISerialization::hasSubcolumnForPath(const SubstreamPath & path, size_t prefix_len)
{
if (prefix_len == 0 || prefix_len > path.size())
return false;
size_t last_elem = prefix_len - 1;
return path[last_elem].type == Substream::NullMap
|| path[last_elem].type == Substream::TupleElement
|| path[last_elem].type == Substream::ArraySizes;
}
ISerialization::SubstreamData ISerialization::createFromPath(const SubstreamPath & path, size_t prefix_len)
{
assert(prefix_len < path.size());
SubstreamData res = path[prefix_len].data;
res.creator.reset();
for (ssize_t i = static_cast<ssize_t>(prefix_len) - 1; i >= 0; --i)
{
const auto & creator = path[i].data.creator;
if (creator)
{
res.type = res.type ? creator->create(res.type) : res.type;
res.serialization = res.serialization ? creator->create(res.serialization) : res.serialization;
res.column = res.column ? creator->create(res.column) : res.column;
}
}
return res;
}
}
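A standalone sketch of the naming rules visible in getNameForSubstreamPath above, under the simplifying assumptions that the escaped tuple delimiter is hardcoded as "%2E" and element names themselves need no further escaping: ".null" for the null map, ".size<N>" for array sizes where N grows with array nesting, ".dict" for dictionary keys, and tuple element names appended after a (possibly escaped) dot. getSubcolumnNameForStream(path, prefix_len) is the same walk over a shorter prefix, with the leading dot stripped.

``` cpp
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

enum class Kind { ArrayElements, ArraySizes, NullMap, DictionaryKeys, TupleElement };

struct Elem
{
    Kind kind;
    std::string tuple_element_name;   /// used only for TupleElement
};

static std::string nameForPath(std::string stream_name, const std::vector<Elem> & path, bool escape_tuple_delimiter)
{
    size_t array_level = 0;
    for (const auto & elem : path)
    {
        if (elem.kind == Kind::NullMap)
            stream_name += ".null";
        else if (elem.kind == Kind::ArraySizes)
            stream_name += ".size" + std::to_string(array_level);
        else if (elem.kind == Kind::ArrayElements)
            ++array_level;
        else if (elem.kind == Kind::DictionaryKeys)
            stream_name += ".dict";
        else if (elem.kind == Kind::TupleElement)
            stream_name += (escape_tuple_delimiter ? "%2E" : ".") + elem.tuple_element_name;
    }
    return stream_name;
}

int main()
{
    /// Column "t Tuple(a Array(UInt8))": sizes of the nested array as a file name and as a subcolumn name.
    std::vector<Elem> path = {{Kind::TupleElement, "a"}, {Kind::ArraySizes, ""}};
    std::cout << nameForPath("t", path, /*escape_tuple_delimiter=*/ true) << '\n';    /// t%2Ea.size0
    std::cout << nameForPath("", path, /*escape_tuple_delimiter=*/ false) << '\n';    /// .a.size0 (the real code strips the leading dot -> a.size0)
}
```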

View File

@ -2,35 +2,39 @@
#include <Common/COW.h>
#include <Core/Types.h>
#include <Columns/IColumn.h>
#include <boost/noncopyable.hpp>
#include <unordered_map>
#include <memory>
namespace DB
{
class IDataType;
class ReadBuffer;
class WriteBuffer;
class ProtobufReader;
class ProtobufWriter;
class IColumn;
using ColumnPtr = COW<IColumn>::Ptr;
using MutableColumnPtr = COW<IColumn>::MutablePtr;
class IDataType;
using DataTypePtr = std::shared_ptr<const IDataType>;
class ISerialization;
using SerializationPtr = std::shared_ptr<const ISerialization>;
class Field;
struct FormatSettings;
struct NameAndTypePair;
class ISerialization
class ISerialization : private boost::noncopyable, public std::enable_shared_from_this<ISerialization>
{
public:
ISerialization() = default;
virtual ~ISerialization() = default;
SerializationPtr getPtr() const { return shared_from_this(); }
/** Binary serialization for range of values in column - for writing to disk/network, etc.
*
* Some data types are represented in multiple streams while being serialized.
@ -54,6 +58,24 @@ public:
* Default implementations of ...WithMultipleStreams methods will call serializeBinaryBulk, deserializeBinaryBulk for single stream.
*/
struct ISubcolumnCreator
{
virtual DataTypePtr create(const DataTypePtr & prev) const = 0;
virtual SerializationPtr create(const SerializationPtr & prev) const = 0;
virtual ColumnPtr create(const ColumnPtr & prev) const = 0;
virtual ~ISubcolumnCreator() = default;
};
using SubcolumnCreatorPtr = std::shared_ptr<const ISubcolumnCreator>;
struct SubstreamData
{
DataTypePtr type;
ColumnPtr column;
SerializationPtr serialization;
SubcolumnCreatorPtr creator;
};
struct Substream
{
enum Type
@ -71,7 +93,10 @@ public:
SparseElements,
SparseOffsets,
Regular,
};
Type type;
/// Index of the tuple element (starting at 1), or its name.
@ -80,6 +105,12 @@ public:
/// Do we need to escape a dot in filenames for tuple elements.
bool escape_tuple_delimiter = true;
/// Data for current substream.
SubstreamData data;
/// Flag that may help to traverse substream paths.
mutable bool visited = false;
Substream(Type type_) : type(type_) {}
String toString() const;
@ -96,7 +127,13 @@ public:
using StreamCallback = std::function<void(const SubstreamPath &)>;
virtual void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const;
virtual void enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
DataTypePtr type,
ColumnPtr column) const;
void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const;
void enumerateStreams(const StreamCallback & callback, SubstreamPath && path) const { enumerateStreams(callback, path); }
void enumerateStreams(const StreamCallback & callback) const { enumerateStreams(callback, {}); }
@ -249,11 +286,16 @@ public:
static String getFileNameForStream(const NameAndTypePair & column, const SubstreamPath & path);
static String getFileNameForStream(const String & name_in_storage, const SubstreamPath & path);
static String getSubcolumnNameForStream(const SubstreamPath & path);
static String getSubcolumnNameForStream(const SubstreamPath & path, size_t prefix_len);
static void addToSubstreamsCache(SubstreamsCache * cache, const SubstreamPath & path, ColumnPtr column);
static ColumnPtr getFromSubstreamsCache(SubstreamsCache * cache, const SubstreamPath & path);
static bool isSpecialCompressionAllowed(const SubstreamPath & path);
static size_t getArrayLevel(const SubstreamPath & path);
static bool hasSubcolumnForPath(const SubstreamPath & path, size_t prefix_len);
static SubstreamData createFromPath(const SubstreamPath & path, size_t prefix_len);
};
using SerializationPtr = std::shared_ptr<const ISerialization>;

View File

@ -1,7 +1,6 @@
#include <DataTypes/Serializations/SerializationAggregateFunction.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Columns/ColumnAggregateFunction.h>

View File

@ -1,5 +1,8 @@
#include <DataTypes/Serializations/SerializationArray.h>
#include <DataTypes/Serializations/SerializationNumber.h>
#include <DataTypes/Serializations/SerializationNamed.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnArray.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
@ -177,16 +180,53 @@ ColumnPtr arrayOffsetsToSizes(const IColumn & column)
return column_sizes;
}
void SerializationArray::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
DataTypePtr SerializationArray::SubcolumnCreator::create(const DataTypePtr & prev) const
{
path.push_back(Substream::ArraySizes);
callback(path);
path.back() = Substream::ArrayElements;
nested->enumerateStreams(callback, path);
path.pop_back();
return std::make_shared<DataTypeArray>(prev);
}
SerializationPtr SerializationArray::SubcolumnCreator::create(const SerializationPtr & prev) const
{
return std::make_shared<SerializationArray>(prev);
}
ColumnPtr SerializationArray::SubcolumnCreator::create(const ColumnPtr & prev) const
{
return ColumnArray::create(prev, offsets);
}
void SerializationArray::enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
DataTypePtr type,
ColumnPtr column) const
{
const auto * type_array = type ? &assert_cast<const DataTypeArray &>(*type) : nullptr;
const auto * column_array = column ? &assert_cast<const ColumnArray &>(*column) : nullptr;
auto offsets_column = column_array ? column_array->getOffsetsPtr() : nullptr;
path.push_back(Substream::ArraySizes);
path.back().data =
{
type ? std::make_shared<DataTypeUInt64>() : nullptr,
offsets_column ? arrayOffsetsToSizes(*offsets_column) : nullptr,
std::make_shared<SerializationNamed>(
std::make_shared<SerializationNumber<UInt64>>(),
"size" + std::to_string(getArrayLevel(path)), false),
nullptr,
};
callback(path);
path.back() = Substream::ArrayElements;
path.back().data = {type, column, getPtr(), std::make_shared<SubcolumnCreator>(offsets_column)};
auto next_type = type_array ? type_array->getNestedType() : nullptr;
auto next_column = column_array ? column_array->getDataPtr() : nullptr;
nested->enumerateStreams(path, callback, next_type, next_column);
path.pop_back();
}
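A toy sketch of the ISubcolumnCreator / createFromPath mechanism that the SubcolumnCreator registered above takes part in: each level able to re-wrap a nested entity stores a creator in the path, and extracting a subcolumn applies the creators of the outer levels back onto whatever sits at the end of the prefix. Entities are plain strings here; the real creators rebuild types, columns and serializations (for arrays, ColumnArray::create(prev, offsets) with the saved offsets).

``` cpp
#include <cstddef>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

/// Toy analogue of ISerialization::ISubcolumnCreator: wraps a nested entity back
/// into the enclosing one. Here an "entity" is just a type name.
struct ISubcolumnCreator
{
    virtual std::string create(const std::string & prev) const = 0;
    virtual ~ISubcolumnCreator() = default;
};

struct ArrayCreator : ISubcolumnCreator
{
    std::string create(const std::string & prev) const override { return "Array(" + prev + ")"; }
};

struct PathElem
{
    std::string entity;                                 /// what is stored at this level
    std::shared_ptr<const ISubcolumnCreator> creator;   /// how to wrap a nested entity back
};

/// Analogue of ISerialization::createFromPath: take the entity at `prefix_len`
/// and re-apply the creators of all outer levels, innermost first.
static std::string createFromPath(const std::vector<PathElem> & path, size_t prefix_len)
{
    std::string res = path[prefix_len].entity;
    for (size_t i = prefix_len; i > 0; --i)
        if (path[i - 1].creator)
            res = path[i - 1].creator->create(res);
    return res;
}

int main()
{
    /// Path prefix for the null map subcolumn of Array(Nullable(UInt8)): the null map itself
    /// is UInt8, and the outer array level wraps it back, so "arr.null" comes out as Array(UInt8).
    std::vector<PathElem> path =
    {
        {"Array(Nullable(UInt8))", std::make_shared<ArrayCreator>()},   /// ArrayElements level
        {"UInt8", nullptr},                                             /// NullMap level
    };
    std::cout << createFromPath(path, 1) << '\n';   /// Array(UInt8)
}
```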
void SerializationArray::serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,

View File

@ -35,7 +35,11 @@ public:
* This is necessary, because when implementing nested structures, several arrays can have common sizes.
*/
void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const override;
void enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
DataTypePtr type,
ColumnPtr column) const override;
void serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,
@ -62,6 +66,18 @@ public:
DeserializeBinaryBulkSettings & settings,
DeserializeBinaryBulkStatePtr & state,
SubstreamsCache * cache) const override;
private:
struct SubcolumnCreator : public ISubcolumnCreator
{
const ColumnPtr offsets;
SubcolumnCreator(const ColumnPtr & offsets_) : offsets(offsets_) {}
DataTypePtr create(const DataTypePtr & prev) const override;
SerializationPtr create(const SerializationPtr & prev) const override;
ColumnPtr create(const ColumnPtr & prev) const override;
};
};
ColumnPtr arrayOffsetsToSizes(const IColumn & column);

View File

@ -8,6 +8,7 @@
namespace DB
{
void SerializationDate32::serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const
{
writeDateText(ExtendedDayNum(assert_cast<const ColumnInt32 &>(column).getData()[row_num]), ostr);

View File

@ -32,9 +32,8 @@ inline void readText(time_t & x, ReadBuffer & istr, const FormatSettings & setti
}
SerializationDateTime::SerializationDateTime(
const DateLUTImpl & time_zone_, const DateLUTImpl & utc_time_zone_)
: time_zone(time_zone_), utc_time_zone(utc_time_zone_)
SerializationDateTime::SerializationDateTime(const TimezoneMixin & time_zone_)
: TimezoneMixin(time_zone_)
{
}

View File

@ -1,20 +1,17 @@
#pragma once
#include <DataTypes/Serializations/SerializationNumber.h>
#include <DataTypes/TimezoneMixin.h>
class DateLUTImpl;
namespace DB
{
class SerializationDateTime final : public SerializationNumber<UInt32>
class SerializationDateTime final : public SerializationNumber<UInt32>, public TimezoneMixin
{
private:
const DateLUTImpl & time_zone;
const DateLUTImpl & utc_time_zone;
public:
SerializationDateTime(const DateLUTImpl & time_zone_, const DateLUTImpl & utc_time_zone_);
SerializationDateTime(const TimezoneMixin & time_zone_);
void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;

View File

@ -17,9 +17,9 @@ namespace DB
{
SerializationDateTime64::SerializationDateTime64(
const DateLUTImpl & time_zone_, const DateLUTImpl & utc_time_zone_, UInt32 scale_)
UInt32 scale_, const TimezoneMixin & time_zone_)
: SerializationDecimalBase<DateTime64>(DecimalUtils::max_precision<DateTime64>, scale_)
, time_zone(time_zone_), utc_time_zone(utc_time_zone_)
, TimezoneMixin(time_zone_)
{
}

View File

@ -1,20 +1,17 @@
#pragma once
#include <DataTypes/Serializations/SerializationDecimalBase.h>
#include <DataTypes/TimezoneMixin.h>
class DateLUTImpl;
namespace DB
{
class SerializationDateTime64 final : public SerializationDecimalBase<DateTime64>
class SerializationDateTime64 final : public SerializationDecimalBase<DateTime64>, public TimezoneMixin
{
private:
const DateLUTImpl & time_zone;
const DateLUTImpl & utc_time_zone;
public:
SerializationDateTime64(const DateLUTImpl & time_zone_, const DateLUTImpl & utc_time_zone_, UInt32 scale_);
SerializationDateTime64(UInt32 scale_, const TimezoneMixin & time_zone_);
void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeText(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;

View File

@ -40,11 +40,27 @@ SerializationLowCardinality::SerializationLowCardinality(const DataTypePtr & dic
{
}
void SerializationLowCardinality::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
void SerializationLowCardinality::enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
DataTypePtr type,
ColumnPtr column) const
{
const auto * column_lc = column ? &getColumnLowCardinality(*column) : nullptr;
SubstreamData data;
data.type = type ? dictionary_type : nullptr;
data.column = column_lc ? column_lc->getDictionary().getNestedColumn() : nullptr;
data.serialization = dict_inner_serialization;
path.push_back(Substream::DictionaryKeys);
dict_inner_serialization->enumerateStreams(callback, path);
path.back().data = data;
dict_inner_serialization->enumerateStreams(path, callback, data.type, data.column);
path.back() = Substream::DictionaryIndexes;
path.back().data = {type, column, getPtr(), nullptr};
callback(path);
path.pop_back();
}

View File

@ -17,7 +17,11 @@ private:
public:
SerializationLowCardinality(const DataTypePtr & dictionary_type);
void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const override;
void enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
DataTypePtr type,
ColumnPtr column) const override;
void serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,

View File

@ -3,6 +3,7 @@
#include <DataTypes/Serializations/SerializationMap.h>
#include <DataTypes/Serializations/SerializationArray.h>
#include <DataTypes/Serializations/SerializationTuple.h>
#include <DataTypes/DataTypeMap.h>
#include <Common/StringUtils/StringUtils.h>
#include <Columns/ColumnMap.h>
@ -250,10 +251,16 @@ void SerializationMap::deserializeTextCSV(IColumn & column, ReadBuffer & istr, c
deserializeText(column, rb, settings);
}
void SerializationMap::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
void SerializationMap::enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
DataTypePtr type,
ColumnPtr column) const
{
nested->enumerateStreams(callback, path);
auto next_type = type ? assert_cast<const DataTypeMap &>(*type).getNestedType() : nullptr;
auto next_column = column ? assert_cast<const ColumnMap &>(*column).getNestedColumnPtr() : nullptr;
nested->enumerateStreams(path, callback, next_type, next_column);
}
void SerializationMap::serializeBinaryBulkStatePrefix(

View File

@ -31,7 +31,11 @@ public:
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const override;
void enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
DataTypePtr type,
ColumnPtr column) const override;
void serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,

View File

@ -1,18 +1,21 @@
#include <DataTypes/Serializations/SerializationTupleElement.h>
#include <DataTypes/Serializations/SerializationNamed.h>
namespace DB
{
void SerializationTupleElement::enumerateStreams(
void SerializationNamed::enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
SubstreamPath & path) const
DataTypePtr type,
ColumnPtr column) const
{
addToPath(path);
nested_serialization->enumerateStreams(callback, path);
path.back().data = {type, column, getPtr(), std::make_shared<SubcolumnCreator>(name, escape_delimiter)};
nested_serialization->enumerateStreams(path, callback, type, column);
path.pop_back();
}
void SerializationTupleElement::serializeBinaryBulkStatePrefix(
void SerializationNamed::serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
@ -21,7 +24,7 @@ void SerializationTupleElement::serializeBinaryBulkStatePrefix(
settings.path.pop_back();
}
void SerializationTupleElement::serializeBinaryBulkStateSuffix(
void SerializationNamed::serializeBinaryBulkStateSuffix(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
@ -30,7 +33,7 @@ void SerializationTupleElement::serializeBinaryBulkStateSuffix(
settings.path.pop_back();
}
void SerializationTupleElement::deserializeBinaryBulkStatePrefix(
void SerializationNamed::deserializeBinaryBulkStatePrefix(
DeserializeBinaryBulkSettings & settings,
DeserializeBinaryBulkStatePtr & state) const
{
@ -39,7 +42,7 @@ void SerializationTupleElement::deserializeBinaryBulkStatePrefix(
settings.path.pop_back();
}
void SerializationTupleElement::serializeBinaryBulkWithMultipleStreams(
void SerializationNamed::serializeBinaryBulkWithMultipleStreams(
const IColumn & column,
size_t offset,
size_t limit,
@ -51,7 +54,7 @@ void SerializationTupleElement::serializeBinaryBulkWithMultipleStreams(
settings.path.pop_back();
}
void SerializationTupleElement::deserializeBinaryBulkWithMultipleStreams(
void SerializationNamed::deserializeBinaryBulkWithMultipleStreams(
ColumnPtr & column,
size_t limit,
DeserializeBinaryBulkSettings & settings,
@ -63,7 +66,7 @@ void SerializationTupleElement::deserializeBinaryBulkWithMultipleStreams(
settings.path.pop_back();
}
void SerializationTupleElement::addToPath(SubstreamPath & path) const
void SerializationNamed::addToPath(SubstreamPath & path) const
{
path.push_back(Substream::TupleElement);
path.back().tuple_element_name = name;

View File

@ -5,14 +5,14 @@
namespace DB
{
class SerializationTupleElement final : public SerializationWrapper
class SerializationNamed final : public SerializationWrapper
{
private:
String name;
bool escape_delimiter;
public:
SerializationTupleElement(const SerializationPtr & nested_, const String & name_, bool escape_delimiter_ = true)
SerializationNamed(const SerializationPtr & nested_, const String & name_, bool escape_delimiter_ = true)
: SerializationWrapper(nested_)
, name(name_), escape_delimiter(escape_delimiter_)
{
@ -21,11 +21,13 @@ public:
const String & getElementName() const { return name; }
void enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
SubstreamPath & path) const override;
DataTypePtr type,
ColumnPtr column) const override;
void serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const override;
void serializeBinaryBulkStateSuffix(
@ -51,6 +53,22 @@ public:
SubstreamsCache * cache) const override;
private:
struct SubcolumnCreator : public ISubcolumnCreator
{
const String name;
const bool escape_delimiter;
SubcolumnCreator(const String & name_, bool escape_delimiter_)
: name(name_), escape_delimiter(escape_delimiter_) {}
DataTypePtr create(const DataTypePtr & prev) const override { return prev; }
ColumnPtr create(const ColumnPtr & prev) const override { return prev; }
SerializationPtr create(const SerializationPtr & prev) const override
{
return std::make_shared<SerializationNamed>(prev, name, escape_delimiter);
}
};
void addToPath(SubstreamPath & path) const;
};

View File

@ -1,5 +1,8 @@
#include <DataTypes/Serializations/SerializationNullable.h>
#include <DataTypes/Serializations/SerializationNumber.h>
#include <DataTypes/Serializations/SerializationNamed.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnNullable.h>
#include <Core/Field.h>
@ -20,15 +23,50 @@ namespace ErrorCodes
extern const int CANNOT_READ_ALL_DATA;
}
void SerializationNullable::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
DataTypePtr SerializationNullable::SubcolumnCreator::create(const DataTypePtr & prev) const
{
path.push_back(Substream::NullMap);
callback(path);
path.back() = Substream::NullableElements;
nested->enumerateStreams(callback, path);
path.pop_back();
return std::make_shared<DataTypeNullable>(prev);
}
SerializationPtr SerializationNullable::SubcolumnCreator::create(const SerializationPtr & prev) const
{
return std::make_shared<SerializationNullable>(prev);
}
ColumnPtr SerializationNullable::SubcolumnCreator::create(const ColumnPtr & prev) const
{
return ColumnNullable::create(prev, null_map);
}
void SerializationNullable::enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
DataTypePtr type,
ColumnPtr column) const
{
const auto * type_nullable = type ? &assert_cast<const DataTypeNullable &>(*type) : nullptr;
const auto * column_nullable = column ? &assert_cast<const ColumnNullable &>(*column) : nullptr;
path.push_back(Substream::NullMap);
path.back().data =
{
type_nullable ? std::make_shared<DataTypeUInt8>() : nullptr,
column_nullable ? column_nullable->getNullMapColumnPtr() : nullptr,
std::make_shared<SerializationNamed>(std::make_shared<SerializationNumber<UInt8>>(), "null", false),
nullptr,
};
callback(path);
path.back() = Substream::NullableElements;
path.back().data = {type, column, getPtr(), std::make_shared<SubcolumnCreator>(path.back().data.column)};
auto next_type = type_nullable ? type_nullable->getNestedType() : nullptr;
auto next_column = column_nullable ? column_nullable->getNestedColumnPtr() : nullptr;
nested->enumerateStreams(path, callback, next_type, next_column);
path.pop_back();
}
void SerializationNullable::serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,

View File

@ -13,7 +13,11 @@ private:
public:
SerializationNullable(const SerializationPtr & nested_) : nested(nested_) {}
void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const override;
void enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
DataTypePtr type,
ColumnPtr column) const override;
void serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,
@ -80,6 +84,18 @@ public:
static ReturnType deserializeTextCSVImpl(IColumn & column, ReadBuffer & istr, const FormatSettings & settings, const SerializationPtr & nested);
template <typename ReturnType = bool>
static ReturnType deserializeTextJSONImpl(IColumn & column, ReadBuffer & istr, const FormatSettings &, const SerializationPtr & nested);
private:
struct SubcolumnCreator : public ISubcolumnCreator
{
const ColumnPtr null_map;
SubcolumnCreator(const ColumnPtr & null_map_) : null_map(null_map_) {}
DataTypePtr create(const DataTypePtr & prev) const override;
SerializationPtr create(const SerializationPtr & prev) const override;
ColumnPtr create(const ColumnPtr & prev) const override;
};
};
}

View File

@ -1,6 +1,7 @@
#include <base/map.h>
#include <base/range.h>
#include <DataTypes/Serializations/SerializationTuple.h>
#include <DataTypes/DataTypeTuple.h>
#include <Core/Field.h>
#include <Columns/ColumnTuple.h>
#include <Common/assert_cast.h>
@ -281,10 +282,22 @@ void SerializationTuple::deserializeTextCSV(IColumn & column, ReadBuffer & istr,
});
}
void SerializationTuple::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
void SerializationTuple::enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
DataTypePtr type,
ColumnPtr column) const
{
for (const auto & elem : elems)
elem->enumerateStreams(callback, path);
const auto * type_tuple = type ? &assert_cast<const DataTypeTuple &>(*type) : nullptr;
const auto * column_tuple = column ? &assert_cast<const ColumnTuple &>(*column) : nullptr;
for (size_t i = 0; i < elems.size(); ++i)
{
auto next_type = type_tuple ? type_tuple->getElement(i) : nullptr;
auto next_column = column_tuple ? column_tuple->getColumnPtr(i) : nullptr;
elems[i]->enumerateStreams(path, callback, next_type, next_column);
}
}
struct SerializeBinaryBulkStateTuple : public ISerialization::SerializeBinaryBulkState

View File

@ -1,7 +1,7 @@
#pragma once
#include <DataTypes/Serializations/SimpleTextSerialization.h>
#include <DataTypes/Serializations/SerializationTupleElement.h>
#include <DataTypes/Serializations/SerializationNamed.h>
namespace DB
{
@ -9,7 +9,7 @@ namespace DB
class SerializationTuple final : public SimpleTextSerialization
{
public:
using ElementSerializationPtr = std::shared_ptr<const SerializationTupleElement>;
using ElementSerializationPtr = std::shared_ptr<const SerializationNamed>;
using ElementSerializations = std::vector<ElementSerializationPtr>;
SerializationTuple(const ElementSerializations & elems_, bool have_explicit_names_)
@ -31,7 +31,11 @@ public:
/** Each sub-column in a tuple is serialized in separate stream.
*/
void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const override;
void enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
DataTypePtr type,
ColumnPtr column) const override;
void serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,

View File

@ -4,9 +4,13 @@
namespace DB
{
void SerializationWrapper::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
void SerializationWrapper::enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
DataTypePtr type,
ColumnPtr column) const
{
nested_serialization->enumerateStreams(callback, path);
nested_serialization->enumerateStreams(path, callback, type, column);
}
void SerializationWrapper::serializeBinaryBulkStatePrefix(

View File

@ -16,7 +16,11 @@ protected:
public:
SerializationWrapper(const SerializationPtr & nested_serialization_) : nested_serialization(nested_serialization_) {}
void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const override;
void enumerateStreams(
SubstreamPath & path,
const StreamCallback & callback,
DataTypePtr type,
ColumnPtr column) const override;
void serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,

View File

@ -0,0 +1,32 @@
#pragma once
#include <Core/Types.h>
#include <base/DateLUT.h>
class DateLUTImpl;
/** Mixin-class that manages timezone info for timezone-aware DateTime implementations
*
* Must be used as a (second) base for classes implementing the IDataType/ISerialization interface.
*/
class TimezoneMixin
{
public:
TimezoneMixin(const TimezoneMixin &) = default;
explicit TimezoneMixin(const String & time_zone_name = "")
: has_explicit_time_zone(!time_zone_name.empty())
, time_zone(DateLUT::instance(time_zone_name))
, utc_time_zone(DateLUT::instance("UTC"))
{
}
const DateLUTImpl & getTimeZone() const { return time_zone; }
bool hasExplicitTimeZone() const { return has_explicit_time_zone; }
protected:
/// true if time zone name was provided in data type parameters, false if it's using default time zone.
bool has_explicit_time_zone;
const DateLUTImpl & time_zone;
const DateLUTImpl & utc_time_zone;
};
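A minimal standalone sketch of how this mixin is meant to be used, with a stub TimeZone in place of DateLUTImpl/DateLUT (ClickHouse internals not reproduced here): serialization-like classes take the mixin as a second base, so DateTime and DateTime64 can share the timezone members and accessors instead of each carrying its own pair of references.

``` cpp
#include <iostream>
#include <string>

struct TimeZone { std::string name; };   /// stand-in for DateLUTImpl

class TimezoneMixinSketch
{
public:
    explicit TimezoneMixinSketch(const std::string & time_zone_name = "")
        : has_explicit_time_zone(!time_zone_name.empty())
        , time_zone{time_zone_name.empty() ? "default" : time_zone_name}
        , utc_time_zone{"UTC"}
    {
    }

    const TimeZone & getTimeZone() const { return time_zone; }
    bool hasExplicitTimeZone() const { return has_explicit_time_zone; }

protected:
    bool has_explicit_time_zone;
    TimeZone time_zone;
    TimeZone utc_time_zone;
};

class SerializationBase
{
public:
    virtual ~SerializationBase() = default;
    virtual void serializeText(std::ostream & out) const = 0;
};

/// Analogue of SerializationDateTime: the serialization hierarchy is the first base,
/// the mixin carrying the timezone info is the second.
class SerializationDateTimeSketch : public SerializationBase, public TimezoneMixinSketch
{
public:
    explicit SerializationDateTimeSketch(const TimezoneMixinSketch & tz) : TimezoneMixinSketch(tz) {}

    void serializeText(std::ostream & out) const override
    {
        out << "datetime in " << getTimeZone().name << '\n';
    }
};

int main()
{
    TimezoneMixinSketch tz("Europe/Prague");
    SerializationDateTimeSketch serialization(tz);
    serialization.serializeText(std::cout);   /// datetime in Europe/Prague
}
```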

View File

@ -0,0 +1,432 @@
#include <Formats/CapnProtoUtils.h>
#if USE_CAPNP
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/IDataType.h>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/join.hpp>
#include <capnp/schema.h>
#include <capnp/schema-parser.h>
#include <fcntl.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PARSE_CAPN_PROTO_SCHEMA;
extern const int THERE_IS_NO_COLUMN;
extern const int BAD_TYPE_OF_FIELD;
extern const int CAPN_PROTO_BAD_CAST;
extern const int FILE_DOESNT_EXIST;
extern const int UNKNOWN_EXCEPTION;
extern const int INCORRECT_DATA;
}
capnp::StructSchema CapnProtoSchemaParser::getMessageSchema(const FormatSchemaInfo & schema_info)
{
capnp::ParsedSchema schema;
try
{
int fd;
KJ_SYSCALL(fd = open(schema_info.schemaDirectory().data(), O_RDONLY));
auto schema_dir = kj::newDiskDirectory(kj::OsFileHandle(fd));
schema = impl.parseFromDirectory(*schema_dir, kj::Path::parse(schema_info.schemaPath()), {});
}
catch (const kj::Exception & e)
{
/// Determining the type of error from its description is not ideal, but
/// it is the only way to do it here, because kj doesn't specify the type of error.
auto description = std::string_view(e.getDescription().cStr());
if (description.find("No such file or directory") != String::npos || description.find("no such directory") != String::npos)
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Cannot open CapnProto schema, file {} doesn't exist", schema_info.absoluteSchemaPath());
if (description.find("Parse error") != String::npos)
throw Exception(ErrorCodes::CANNOT_PARSE_CAPN_PROTO_SCHEMA, "Cannot parse CapnProto schema {}:{}", schema_info.schemaPath(), e.getLine());
throw Exception(ErrorCodes::UNKNOWN_EXCEPTION, "Unknown exception while parsing CapnProto schema: {}, schema dir and file: {}, {}", description, schema_info.schemaDirectory(), schema_info.schemaPath());
}
auto message_maybe = schema.findNested(schema_info.messageName());
auto * message_schema = kj::_::readMaybe(message_maybe);
if (!message_schema)
throw Exception(ErrorCodes::CANNOT_PARSE_CAPN_PROTO_SCHEMA, "CapnProto schema doesn't contain message with name {}", schema_info.messageName());
return message_schema->asStruct();
}
bool compareEnumNames(const String & first, const String & second, FormatSettings::EnumComparingMode mode)
{
if (mode == FormatSettings::EnumComparingMode::BY_NAMES_CASE_INSENSITIVE)
return boost::algorithm::to_lower_copy(first) == boost::algorithm::to_lower_copy(second);
return first == second;
}
static const std::map<capnp::schema::Type::Which, String> capnp_simple_type_names =
{
{capnp::schema::Type::Which::BOOL, "Bool"},
{capnp::schema::Type::Which::VOID, "Void"},
{capnp::schema::Type::Which::INT8, "Int8"},
{capnp::schema::Type::Which::INT16, "Int16"},
{capnp::schema::Type::Which::INT32, "Int32"},
{capnp::schema::Type::Which::INT64, "Int64"},
{capnp::schema::Type::Which::UINT8, "UInt8"},
{capnp::schema::Type::Which::UINT16, "UInt16"},
{capnp::schema::Type::Which::UINT32, "UInt32"},
{capnp::schema::Type::Which::UINT64, "UInt64"},
{capnp::schema::Type::Which::FLOAT32, "Float32"},
{capnp::schema::Type::Which::FLOAT64, "Float64"},
{capnp::schema::Type::Which::TEXT, "Text"},
{capnp::schema::Type::Which::DATA, "Data"},
{capnp::schema::Type::Which::INTERFACE, "Interface"},
{capnp::schema::Type::Which::ANY_POINTER, "AnyPointer"},
};
static bool checkIfStructContainsUnnamedUnion(const capnp::StructSchema & struct_schema)
{
return struct_schema.getFields().size() != struct_schema.getNonUnionFields().size();
}
static bool checkIfStructIsNamedUnion(const capnp::StructSchema & struct_schema)
{
return struct_schema.getFields().size() == struct_schema.getUnionFields().size();
}
/// Get full name of type for better exception messages.
static String getCapnProtoFullTypeName(const capnp::Type & type)
{
switch (type.which())
{
case capnp::schema::Type::Which::STRUCT:
{
auto struct_schema = type.asStruct();
auto non_union_fields = struct_schema.getNonUnionFields();
std::vector<String> non_union_field_names;
for (auto nested_field : non_union_fields)
non_union_field_names.push_back(String(nested_field.getProto().getName()) + " " + getCapnProtoFullTypeName(nested_field.getType()));
auto union_fields = struct_schema.getUnionFields();
std::vector<String> union_field_names;
for (auto nested_field : union_fields)
union_field_names.push_back(String(nested_field.getProto().getName()) + " " + getCapnProtoFullTypeName(nested_field.getType()));
String union_name = "Union(" + boost::algorithm::join(union_field_names, ", ") + ")";
/// Check if the struct is a named union.
if (non_union_field_names.empty())
return union_name;
String type_name = "Struct(" + boost::algorithm::join(non_union_field_names, ", ");
/// Check if the struct contains unnamed union.
if (!union_field_names.empty())
type_name += ", " + union_name;
type_name += ")";
return type_name;
}
case capnp::schema::Type::Which::LIST:
return "List(" + getCapnProtoFullTypeName(type.asList().getElementType()) + ")";
case capnp::schema::Type::Which::ENUM:
{
auto enum_schema = type.asEnum();
String enum_name = "Enum(";
auto enumerants = enum_schema.getEnumerants();
for (size_t i = 0; i != enumerants.size(); ++i)
{
enum_name += String(enumerants[i].getProto().getName()) + " = " + std::to_string(enumerants[i].getOrdinal());
if (i + 1 != enumerants.size())
enum_name += ", ";
}
enum_name += ")";
return enum_name;
}
default:
auto it = capnp_simple_type_names.find(type.which());
if (it == capnp_simple_type_names.end())
throw Exception(ErrorCodes::BAD_TYPE_OF_FIELD, "Unknown CapnProto type");
return it->second;
}
}
template <typename Type>
static bool checkEnums(const capnp::Type & capnp_type, const DataTypePtr column_type, FormatSettings::EnumComparingMode mode, UInt64 max_value, String & error_message)
{
if (!capnp_type.isEnum())
return false;
auto enum_schema = capnp_type.asEnum();
bool to_lower = mode == FormatSettings::EnumComparingMode::BY_NAMES_CASE_INSENSITIVE;
const auto * enum_type = assert_cast<const DataTypeEnum<Type> *>(column_type.get());
const auto & enum_values = dynamic_cast<const EnumValues<Type> &>(*enum_type);
auto enumerants = enum_schema.getEnumerants();
if (mode == FormatSettings::EnumComparingMode::BY_VALUES)
{
/// In CapnProto Enum fields are numbered sequentially starting from zero.
if (enumerants.size() > max_value)
{
error_message += "Enum from CapnProto schema contains values that is out of range for Clickhouse Enum";
return false;
}
auto values = enum_values.getSetOfAllValues();
std::unordered_set<Type> capn_enum_values;
for (auto enumerant : enumerants)
capn_enum_values.insert(Type(enumerant.getOrdinal()));
auto result = values == capn_enum_values;
if (!result)
error_message += "The set of values in Enum from CapnProto schema is different from the set of values in ClickHouse Enum";
return result;
}
auto names = enum_values.getSetOfAllNames(to_lower);
std::unordered_set<String> capn_enum_names;
for (auto enumerant : enumerants)
{
String name = enumerant.getProto().getName();
capn_enum_names.insert(to_lower ? boost::algorithm::to_lower_copy(name) : name);
}
auto result = names == capn_enum_names;
if (!result)
error_message += "The set of names in Enum from CapnProto schema is different from the set of names in ClickHouse Enum";
return result;
}
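A standalone sketch (toy containers, no capnp or boost) of the two comparison modes implemented by checkEnums above: by values, the CapnProto enumerant ordinals 0..N-1 must form the same set as the ClickHouse enum values; by names, the two name sets must match, optionally lowercased for the case-insensitive mode.

``` cpp
#include <algorithm>
#include <cctype>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

static std::string toLower(std::string s)
{
    std::transform(s.begin(), s.end(), s.begin(), [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
    return s;
}

using ClickHouseEnum = std::vector<std::pair<std::string, int16_t>>;   /// (name, value) pairs
using CapnProtoEnum = std::vector<std::string>;                        /// ordinal == index

static bool equalByValues(const ClickHouseEnum & ch, const CapnProtoEnum & capnp)
{
    std::unordered_set<int16_t> ch_values, capnp_values;
    for (const auto & name_and_value : ch)
        ch_values.insert(name_and_value.second);
    for (size_t ordinal = 0; ordinal < capnp.size(); ++ordinal)
        capnp_values.insert(static_cast<int16_t>(ordinal));
    return ch_values == capnp_values;
}

static bool equalByNames(const ClickHouseEnum & ch, const CapnProtoEnum & capnp, bool case_insensitive)
{
    std::unordered_set<std::string> ch_names, capnp_names;
    for (const auto & name_and_value : ch)
        ch_names.insert(case_insensitive ? toLower(name_and_value.first) : name_and_value.first);
    for (const auto & name : capnp)
        capnp_names.insert(case_insensitive ? toLower(name) : name);
    return ch_names == capnp_names;
}

int main()
{
    ClickHouseEnum ch = {{"One", 0}, {"TWO", 1}};
    CapnProtoEnum capnp = {"one", "two"};
    std::cout << equalByValues(ch, capnp) << ' '
              << equalByNames(ch, capnp, /*case_insensitive=*/ true) << ' '
              << equalByNames(ch, capnp, /*case_insensitive=*/ false) << '\n';   /// 1 1 0
}
```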
static bool checkCapnProtoType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message);
static bool checkNullableType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message)
{
if (!capnp_type.isStruct())
return false;
/// Check that struct is a named union of type VOID and one arbitrary type.
auto struct_schema = capnp_type.asStruct();
if (!checkIfStructIsNamedUnion(struct_schema))
return false;
auto union_fields = struct_schema.getUnionFields();
if (union_fields.size() != 2)
return false;
auto first = union_fields[0];
auto second = union_fields[1];
auto nested_type = assert_cast<const DataTypeNullable *>(data_type.get())->getNestedType();
if (first.getType().isVoid())
return checkCapnProtoType(second.getType(), nested_type, mode, error_message);
if (second.getType().isVoid())
return checkCapnProtoType(first.getType(), nested_type, mode, error_message);
return false;
}
static bool checkTupleType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message)
{
if (!capnp_type.isStruct())
return false;
auto struct_schema = capnp_type.asStruct();
if (checkIfStructIsNamedUnion(struct_schema))
return false;
if (checkIfStructContainsUnnamedUnion(struct_schema))
{
error_message += "CapnProto struct contains unnamed union";
return false;
}
const auto * tuple_data_type = assert_cast<const DataTypeTuple *>(data_type.get());
auto nested_types = tuple_data_type->getElements();
if (nested_types.size() != struct_schema.getFields().size())
{
error_message += "Tuple and Struct types have different sizes";
return false;
}
if (!tuple_data_type->haveExplicitNames())
{
error_message += "Only named Tuple can be converted to CapnProto Struct";
return false;
}
for (const auto & name : tuple_data_type->getElementNames())
{
KJ_IF_MAYBE(field, struct_schema.findFieldByName(name))
{
if (!checkCapnProtoType(field->getType(), nested_types[tuple_data_type->getPositionByName(name)], mode, error_message))
return false;
}
else
{
error_message += "CapnProto struct doesn't contain a field with name " + name;
return false;
}
}
return true;
}
static bool checkArrayType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message)
{
if (!capnp_type.isList())
return false;
auto list_schema = capnp_type.asList();
auto nested_type = assert_cast<const DataTypeArray *>(data_type.get())->getNestedType();
return checkCapnProtoType(list_schema.getElementType(), nested_type, mode, error_message);
}
static bool checkCapnProtoType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message)
{
switch (data_type->getTypeId())
{
case TypeIndex::UInt8:
return capnp_type.isBool() || capnp_type.isUInt8();
case TypeIndex::Date: [[fallthrough]];
case TypeIndex::UInt16:
return capnp_type.isUInt16();
case TypeIndex::DateTime: [[fallthrough]];
case TypeIndex::UInt32:
return capnp_type.isUInt32();
case TypeIndex::UInt64:
return capnp_type.isUInt64();
case TypeIndex::Int8:
return capnp_type.isInt8();
case TypeIndex::Int16:
return capnp_type.isInt16();
case TypeIndex::Date32: [[fallthrough]];
case TypeIndex::Int32:
return capnp_type.isInt32();
case TypeIndex::DateTime64: [[fallthrough]];
case TypeIndex::Int64:
return capnp_type.isInt64();
case TypeIndex::Float32:
return capnp_type.isFloat32();
case TypeIndex::Float64:
return capnp_type.isFloat64();
case TypeIndex::Enum8:
return checkEnums<Int8>(capnp_type, data_type, mode, INT8_MAX, error_message);
case TypeIndex::Enum16:
return checkEnums<Int16>(capnp_type, data_type, mode, INT16_MAX, error_message);
case TypeIndex::Tuple:
return checkTupleType(capnp_type, data_type, mode, error_message);
case TypeIndex::Nullable:
{
auto result = checkNullableType(capnp_type, data_type, mode, error_message);
if (!result)
error_message += "Nullable can be represented only as a named union of type Void and nested type";
return result;
}
case TypeIndex::Array:
return checkArrayType(capnp_type, data_type, mode, error_message);
case TypeIndex::LowCardinality:
return checkCapnProtoType(capnp_type, assert_cast<const DataTypeLowCardinality *>(data_type.get())->getDictionaryType(), mode, error_message);
case TypeIndex::FixedString: [[fallthrough]];
case TypeIndex::String:
return capnp_type.isText() || capnp_type.isData();
default:
return false;
}
}
static std::pair<String, String> splitFieldName(const String & name)
{
const auto * begin = name.data();
const auto * end = name.data() + name.size();
const auto * it = find_first_symbols<'_', '.'>(begin, end);
String first = String(begin, it);
String second = it == end ? "" : String(it + 1, end);
return {first, second};
}
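A standalone sketch of the splitFieldName + recursive descent pattern used by getReaderByColumnName and getFieldByName below: a column name such as "point_x" or "point.y" is split at the first '_' or '.', the first part is looked up in the current struct, and the remainder (if any) is resolved inside that field. The nested "struct" here is a toy tree, not a capnp reader.

``` cpp
#include <cstddef>
#include <iostream>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

struct Node
{
    int leaf = 0;                                          /// value of a non-struct field
    std::map<std::string, std::shared_ptr<Node>> fields;   /// non-empty => this node is a struct
};

static std::pair<std::string, std::string> splitFieldName(const std::string & name)
{
    size_t pos = name.find_first_of("_.");
    if (pos == std::string::npos)
        return {name, ""};
    return {name.substr(0, pos), name.substr(pos + 1)};
}

static int getByColumnName(const Node & str, const std::string & name)
{
    auto [field_name, nested_name] = splitFieldName(name);
    auto it = str.fields.find(field_name);
    if (it == str.fields.end())
        throw std::runtime_error("struct doesn't contain a field with name " + field_name);
    const Node & field = *it->second;
    if (nested_name.empty())
        return field.leaf;
    if (field.fields.empty())
        throw std::runtime_error("field " + field_name + " is not a struct");
    return getByColumnName(field, nested_name);
}

static std::shared_ptr<Node> leaf(int value)
{
    auto node = std::make_shared<Node>();
    node->leaf = value;
    return node;
}

int main()
{
    Node message;
    auto point = std::make_shared<Node>();
    point->fields = {{"x", leaf(10)}, {"y", leaf(20)}};
    message.fields = {{"id", leaf(1)}, {"point", point}};
    std::cout << getByColumnName(message, "point_x") << ' ' << getByColumnName(message, "point.y") << '\n';   /// 10 20
}
```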
capnp::DynamicValue::Reader getReaderByColumnName(const capnp::DynamicStruct::Reader & struct_reader, const String & name)
{
auto [field_name, nested_name] = splitFieldName(name);
KJ_IF_MAYBE(field, struct_reader.getSchema().findFieldByName(field_name))
{
capnp::DynamicValue::Reader field_reader;
try
{
field_reader = struct_reader.get(*field);
}
catch (const kj::Exception & e)
{
throw Exception(ErrorCodes::INCORRECT_DATA, "Cannot extract field value from struct by provided schema, error: {}. Perhaps the data was generated by another schema", String(e.getDescription().cStr()));
}
if (nested_name.empty())
return field_reader;
if (field_reader.getType() != capnp::DynamicValue::STRUCT)
throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Field {} is not a struct", field_name);
return getReaderByColumnName(field_reader.as<capnp::DynamicStruct>(), nested_name);
}
throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Capnproto struct doesn't contain a field with name {}", field_name);
}
std::pair<capnp::DynamicStruct::Builder, capnp::StructSchema::Field> getStructBuilderAndFieldByColumnName(capnp::DynamicStruct::Builder struct_builder, const String & name)
{
auto [field_name, nested_name] = splitFieldName(name);
KJ_IF_MAYBE(field, struct_builder.getSchema().findFieldByName(field_name))
{
if (nested_name.empty())
return {struct_builder, *field};
auto field_builder = struct_builder.get(*field);
if (field_builder.getType() != capnp::DynamicValue::STRUCT)
throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Field {} is not a struct", field_name);
return getStructBuilderAndFieldByColumnName(field_builder.as<capnp::DynamicStruct>(), nested_name);
}
throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Capnproto struct doesn't contain a field with name {}", field_name);
}
static capnp::StructSchema::Field getFieldByName(const capnp::StructSchema & schema, const String & name)
{
auto [field_name, nested_name] = splitFieldName(name);
KJ_IF_MAYBE(field, schema.findFieldByName(field_name))
{
if (nested_name.empty())
return *field;
if (!field->getType().isStruct())
throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Field {} is not a struct", field_name);
return getFieldByName(field->getType().asStruct(), nested_name);
}
throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Capnproto schema doesn't contain a field with name {}", field_name);
}
void checkCapnProtoSchemaStructure(const capnp::StructSchema & schema, const Block & header, FormatSettings::EnumComparingMode mode)
{
/// First, check that the struct doesn't contain an unnamed union, because we don't support it.
if (checkIfStructContainsUnnamedUnion(schema))
throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Schema contains unnamed union that is not supported");
auto names_and_types = header.getNamesAndTypesList();
String additional_error_message;
for (auto & [name, type] : names_and_types)
{
auto field = getFieldByName(schema, name);
if (!checkCapnProtoType(field.getType(), type, mode, additional_error_message))
{
auto e = Exception(
ErrorCodes::CAPN_PROTO_BAD_CAST,
"Cannot convert ClickHouse type {} to CapnProto type {}",
type->getName(),
getCapnProtoFullTypeName(field.getType()));
if (!additional_error_message.empty())
e.addMessage(additional_error_message);
throw std::move(e);
}
}
}
}
#endif

View File

@ -0,0 +1,43 @@
#pragma once
#include "config_formats.h"
#if USE_CAPNP
#include <Formats/FormatSchemaInfo.h>
#include <Formats/FormatSettings.h>
#include <Core/Block.h>
#include <capnp/schema-parser.h>
#include <capnp/dynamic.h>
namespace DB
{
// Wrapper for classes whose destructor could throw
// https://github.com/capnproto/capnproto/issues/553
template <typename T>
struct DestructorCatcher
{
T impl;
template <typename ... Arg>
DestructorCatcher(Arg && ... args) : impl(kj::fwd<Arg>(args)...) {}
~DestructorCatcher() noexcept try { } catch (...) { return; }
};
class CapnProtoSchemaParser : public DestructorCatcher<capnp::SchemaParser>
{
public:
CapnProtoSchemaParser() {}
capnp::StructSchema getMessageSchema(const FormatSchemaInfo & schema_info);
};
bool compareEnumNames(const String & first, const String & second, FormatSettings::EnumComparingMode mode);
std::pair<capnp::DynamicStruct::Builder, capnp::StructSchema::Field> getStructBuilderAndFieldByColumnName(capnp::DynamicStruct::Builder struct_builder, const String & name);
capnp::DynamicValue::Reader getReaderByColumnName(const capnp::DynamicStruct::Reader & struct_reader, const String & name);
void checkCapnProtoSchemaStructure(const capnp::StructSchema & schema, const Block & header, FormatSettings::EnumComparingMode mode);
}
#endif
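The `DestructorCatcher` wrapper above works around capnp::SchemaParser potentially throwing from its destructor (the linked capnproto issue), while still giving the wrapper a `noexcept` destructor. Below is a minimal standalone sketch of the same pattern; `Throwing` is a made-up stand-in for the Cap'n Proto class, not anything from this commit:

```cpp
#include <iostream>
#include <stdexcept>
#include <utility>

/// Hypothetical stand-in for a class whose destructor may throw (e.g. capnp::SchemaParser).
struct Throwing
{
    ~Throwing() noexcept(false) { throw std::runtime_error("thrown from a destructor"); }
};

/// Same idea as DestructorCatcher: the function-try-block of a destructor also covers
/// the destruction of members, and the explicit `return` in the handler suppresses the
/// implicit rethrow, so nothing escapes the noexcept destructor.
template <typename T>
struct Catcher
{
    T impl;
    template <typename... Args>
    explicit Catcher(Args &&... args) : impl(std::forward<Args>(args)...) {}
    ~Catcher() noexcept try { } catch (...) { return; }
};

int main()
{
    {
        Catcher<Throwing> guarded;  /// ~Throwing() throws here; the handler swallows it.
    }
    std::cout << "destructor exception was contained" << std::endl;
}
```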

View File

@ -111,6 +111,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.arrow.low_cardinality_as_dictionary = settings.output_format_arrow_low_cardinality_as_dictionary;
format_settings.arrow.import_nested = settings.input_format_arrow_import_nested;
format_settings.orc.import_nested = settings.input_format_orc_import_nested;
format_settings.capn_proto.enum_comparing_mode = settings.format_capn_proto_enum_comparising_mode;
/// Validate avro_schema_registry_url with RemoteHostFilter when non-empty and in Server context
if (format_settings.schema.is_server)

View File

@ -42,7 +42,7 @@ FormatSettings getFormatSettings(ContextPtr context);
template <typename T>
FormatSettings getFormatSettings(ContextPtr context, const T & settings);
/** Allows to create an IBlockInputStream or IBlockOutputStream by the name of the format.
/** Allows to create an IInputFormat or IOutputFormat by the name of the format.
* Note: format and compression are independent things.
*/
class FormatFactory final : private boost::noncopyable

View File

@ -99,4 +99,10 @@ FormatSchemaInfo::FormatSchemaInfo(const String & format_schema, const String &
}
}
FormatSchemaInfo::FormatSchemaInfo(const FormatSettings & settings, const String & format, bool require_message)
: FormatSchemaInfo(
settings.schema.format_schema, format, require_message, settings.schema.is_server, settings.schema.format_schema_path)
{
}
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <base/types.h>
#include <Formats/FormatSettings.h>
namespace DB
{
@ -11,6 +12,7 @@ class FormatSchemaInfo
{
public:
FormatSchemaInfo(const String & format_schema, const String & format, bool require_message, bool is_server, const std::string & format_schema_path);
FormatSchemaInfo(const FormatSettings & settings, const String & format, bool require_message);
/// Returns path to the schema file.
const String & schemaPath() const { return schema_path; }

View File

@ -183,6 +183,20 @@ struct FormatSettings
{
bool import_nested = false;
} orc;
/// For the CapnProto format we should determine how to
/// compare a ClickHouse Enum with the Enum from the schema.
enum class EnumComparingMode
{
BY_NAMES, // Names in enums should be the same, values can be different.
BY_NAMES_CASE_INSENSITIVE, // Case-insensitive name comparison.
BY_VALUES, // Values should be the same, names can be different.
};
struct
{
EnumComparingMode enum_comparing_mode = EnumComparingMode::BY_VALUES;
} capn_proto;
};
}
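The three `EnumComparingMode` values above only affect how a ClickHouse Enum element is matched against a Cap'n Proto enumerant. A rough, hypothetical sketch of the per-name check (not the actual `compareEnumNames` implementation declared in CapnProtoUtils.h) could look like this:

```cpp
#include <algorithm>
#include <cctype>
#include <string>

enum class EnumComparingMode { BY_NAMES, BY_NAMES_CASE_INSENSITIVE, BY_VALUES };

/// Sketch: whether two enum element names are considered equal under the given mode.
static bool namesMatch(const std::string & lhs, const std::string & rhs, EnumComparingMode mode)
{
    if (mode == EnumComparingMode::BY_VALUES)
        return true;  /// Names are ignored; only the numeric values are compared elsewhere.
    if (mode == EnumComparingMode::BY_NAMES)
        return lhs == rhs;
    /// BY_NAMES_CASE_INSENSITIVE
    return lhs.size() == rhs.size()
        && std::equal(lhs.begin(), lhs.end(), rhs.begin(),
                      [](unsigned char a, unsigned char b) { return std::tolower(a) == std::tolower(b); });
}
```

Under BY_VALUES the names are irrelevant and only the numeric values must line up; under BY_NAMES the values may differ as long as the names match.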

View File

@ -56,7 +56,6 @@ NativeReader::NativeReader(ReadBuffer & istr_, UInt64 server_revision_,
}
}
// also resets few vars from IBlockInputStream (I didn't want to propagate resetParser upthere)
void NativeReader::resetParser()
{
istr_concrete = nullptr;

View File

@ -67,6 +67,7 @@ void registerOutputFormatNull(FormatFactory & factory);
void registerOutputFormatMySQLWire(FormatFactory & factory);
void registerOutputFormatMarkdown(FormatFactory & factory);
void registerOutputFormatPostgreSQLWire(FormatFactory & factory);
void registerOutputFormatCapnProto(FormatFactory & factory);
/// Input only formats.
@ -139,6 +140,7 @@ void registerFormats()
registerOutputFormatMySQLWire(factory);
registerOutputFormatMarkdown(factory);
registerOutputFormatPostgreSQLWire(factory);
registerOutputFormatCapnProto(factory);
registerInputFormatRegexp(factory);
registerInputFormatJSONAsString(factory);

View File

@ -96,6 +96,9 @@ struct ReplaceRegexpImpl
re2_st::StringPiece matches[max_captures];
size_t start_pos = 0;
bool is_first_match = true;
bool is_start_pos_added_one = false;
while (start_pos < static_cast<size_t>(input.length()))
{
/// If no more replacements possible for current string
@ -103,6 +106,9 @@ struct ReplaceRegexpImpl
if (searcher.Match(input, start_pos, input.length(), re2_st::RE2::Anchor::UNANCHORED, matches, num_captures))
{
if (is_start_pos_added_one)
start_pos -= 1;
const auto & match = matches[0];
size_t bytes_to_copy = (match.data() - input.data()) - start_pos;
@ -112,6 +118,13 @@ struct ReplaceRegexpImpl
res_offset += bytes_to_copy;
start_pos += bytes_to_copy + match.length();
/// To avoid infinite loop.
if (is_first_match && match.length() == 0 && !replace_one && input.length() > 1)
{
start_pos += 1;
is_start_pos_added_one = true;
}
/// Do substitution instructions
for (const auto & it : instructions)
{
@ -129,8 +142,9 @@ struct ReplaceRegexpImpl
}
}
if (replace_one || match.length() == 0) /// Stop after match of zero length, to avoid infinite loop.
if (replace_one || (!is_first_match && match.length() == 0))
can_finish_current_string = true;
is_first_match = false;
}
else
can_finish_current_string = true;
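The `is_first_match`/`is_start_pos_added_one` bookkeeping above deals with patterns that can match the empty string: without stepping `start_pos` forward after a zero-length match, the searcher would keep matching at the same offset forever. A small standalone illustration of the same idea, deliberately using `std::regex` instead of the vendored re2_st so it stays self-contained:

```cpp
#include <iostream>
#include <regex>
#include <string>

int main()
{
    const std::string input = "abc";
    const std::regex pattern("x*");  /// Matches the empty string at every position.
    std::string result;

    size_t pos = 0;
    while (pos <= input.size())
    {
        const std::string rest = input.substr(pos);
        std::smatch match;
        if (!std::regex_search(rest, match, pattern))
            break;

        result.append(rest, 0, static_cast<size_t>(match.position(0)));
        result += '!';  /// The replacement string.

        size_t advance = static_cast<size_t>(match.position(0) + match.length(0));
        if (match.length(0) == 0)
        {
            /// Zero-length match: copy one input character and step over it,
            /// otherwise `pos` would never move and the loop would never finish.
            if (pos + advance < input.size())
                result += input[pos + advance];
            ++advance;
        }
        pos += advance;
    }

    std::cout << result << std::endl;  /// Prints "!a!b!c!".
}
```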

View File

@ -40,6 +40,7 @@ public:
: ReadBufferFromFileBase(buf_size, existing_memory, alignment),
reader(std::move(reader_)), priority(priority_), required_alignment(alignment), fd(fd_)
{
prefetch_buffer.alignment = alignment;
}
~AsynchronousReadBufferFromFileDescriptor() override;

View File

@ -88,7 +88,11 @@ struct Memory : boost::noncopyable, Allocator
}
else
{
size_t new_capacity = align(new_size + pad_right, alignment);
size_t new_capacity = align(new_size, alignment) + pad_right;
size_t diff = new_capacity - m_capacity;
ProfileEvents::increment(ProfileEvents::IOBufferAllocBytes, diff);
m_data = static_cast<char *>(Allocator::realloc(m_data, m_capacity, new_capacity, alignment));
m_capacity = new_capacity;
m_size = m_capacity - pad_right;
@ -101,6 +105,9 @@ private:
if (!alignment)
return value;
if (!(value % alignment))
return value;
return (value + alignment - 1) / alignment * alignment;
}
@ -112,12 +119,10 @@ private:
return;
}
size_t padded_capacity = m_capacity + pad_right;
ProfileEvents::increment(ProfileEvents::IOBufferAllocs);
ProfileEvents::increment(ProfileEvents::IOBufferAllocBytes, padded_capacity);
ProfileEvents::increment(ProfileEvents::IOBufferAllocBytes, m_capacity);
size_t new_capacity = align(padded_capacity, alignment);
size_t new_capacity = align(m_capacity, alignment) + pad_right;
m_data = static_cast<char *>(Allocator::alloc(new_capacity, alignment));
m_capacity = new_capacity;
m_size = m_capacity - pad_right;

View File

@ -121,7 +121,7 @@ struct Progress
/** Callback to track the progress of the query.
* Used in IBlockInputStream and Context.
* Used in QueryPipeline and Context.
* The function takes the number of rows in the last block, the number of bytes in the last block.
* Note that the callback can be called from different threads.
*/

View File

@ -5,6 +5,7 @@
#include <Common/CurrentMetrics.h>
#include <Common/Stopwatch.h>
#include <Common/setThreadName.h>
#include <Common/MemorySanitizer.h>
#include <base/errnoToString.h>
#include <Poco/Event.h>
#include <future>
@ -151,6 +152,7 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
else
{
bytes_read += res;
__msan_unpoison(request.buf, res);
}
}

View File

@ -43,8 +43,6 @@ namespace ErrorCodes
extern const int UNKNOWN_AGGREGATED_DATA_VARIANT;
}
class IBlockOutputStream;
/** Different data structures that can be used for aggregation
* For efficiency, the aggregation data itself is put into the pool.
* Data and pool ownership (states of aggregate functions)

View File

@ -1806,6 +1806,68 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const
return shared->zookeeper;
}
namespace
{
bool checkZooKeeperConfigIsLocal(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_name, keys);
for (const auto & key : keys)
{
if (startsWith(key, "node"))
{
String host = config.getString(config_name + "." + key + ".host");
if (isLocalAddress(DNSResolver::instance().resolveHost(host)))
return true;
}
}
return false;
}
}
bool Context::tryCheckClientConnectionToMyKeeperCluster() const
{
try
{
/// If our server is part of main Keeper cluster
if (checkZooKeeperConfigIsLocal(getConfigRef(), "zookeeper"))
{
LOG_DEBUG(shared->log, "Keeper server is participant of the main zookeeper cluster, will try to connect to it");
getZooKeeper();
/// Connected, return true
return true;
}
else
{
Poco::Util::AbstractConfiguration::Keys keys;
getConfigRef().keys("auxiliary_zookeepers", keys);
/// If our server is part of some auxiliary_zookeeper
for (const auto & aux_zk_name : keys)
{
if (checkZooKeeperConfigIsLocal(getConfigRef(), "auxiliary_zookeepers." + aux_zk_name))
{
LOG_DEBUG(shared->log, "Our Keeper server is participant of the auxiliary zookeeper cluster ({}), will try to connect to it", aux_zk_name);
getAuxiliaryZooKeeper(aux_zk_name);
/// Connected, return true
return true;
}
}
}
/// Our server doesn't depend on our Keeper cluster
return true;
}
catch (...)
{
return false;
}
}
UInt32 Context::getZooKeeperSessionUptime() const
{
std::lock_guard lock(shared->zookeeper_mutex);
@ -1833,19 +1895,33 @@ void Context::setSystemZooKeeperLogAfterInitializationIfNeeded()
zk.second->setZooKeeperLog(shared->system_logs->zookeeper_log);
}
void Context::initializeKeeperDispatcher() const
void Context::initializeKeeperDispatcher(bool start_async) const
{
#if USE_NURAFT
std::lock_guard lock(shared->keeper_storage_dispatcher_mutex);
if (shared->keeper_storage_dispatcher)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to initialize Keeper multiple times");
const auto & config = getConfigRef();
if (config.has("keeper_server"))
{
bool is_standalone_app = getApplicationType() == ApplicationType::KEEPER;
if (start_async)
{
assert(!is_standalone_app);
LOG_INFO(shared->log, "Connected to ZooKeeper (or Keeper) before internal Keeper start or we don't depend on our Keeper cluster"
", will wait for Keeper asynchronously");
}
else
{
LOG_INFO(shared->log, "Cannot connect to ZooKeeper (or Keeper) before internal Keeper start,"
"will wait for Keeper synchronously");
}
shared->keeper_storage_dispatcher = std::make_shared<KeeperDispatcher>();
shared->keeper_storage_dispatcher->initialize(config, getApplicationType() == ApplicationType::KEEPER);
shared->keeper_storage_dispatcher->initialize(config, is_standalone_app, start_async);
}
#endif
}

View File

@ -13,6 +13,7 @@
#include <Common/MultiVersion.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <Common/RemoteHostFilter.h>
#include <Common/isLocalAddress.h>
#include <base/types.h>
#if !defined(ARCADIA_BUILD)
@ -635,13 +636,13 @@ public:
const Settings & getSettingsRef() const { return settings; }
void setProgressCallback(ProgressCallback callback);
/// Used in InterpreterSelectQuery to pass it to the IBlockInputStream.
/// Used in executeQuery() to pass it to the QueryPipeline.
ProgressCallback getProgressCallback() const;
void setFileProgressCallback(FileProgressCallback && callback) { file_progress_callback = callback; }
FileProgressCallback getFileProgressCallback() const { return file_progress_callback; }
/** Set in executeQuery and InterpreterSelectQuery. Then it is used in IBlockInputStream,
/** Set in executeQuery and InterpreterSelectQuery. Then it is used in QueryPipeline,
* to update and monitor information about the total number of resources spent for the query.
*/
void setProcessListElement(QueryStatus * elem);
@ -664,12 +665,18 @@ public:
/// Same as above but return a zookeeper connection from auxiliary_zookeepers configuration entry.
std::shared_ptr<zkutil::ZooKeeper> getAuxiliaryZooKeeper(const String & name) const;
/// Try to connect to Keeper using get(Auxiliary)ZooKeeper. Useful for
/// internal Keeper start (checking the connection to some other node). Returns true
/// if we connected successfully (without exception) or if our ZooKeeper client is
/// configured for some other cluster that doesn't include our node.
bool tryCheckClientConnectionToMyKeeperCluster() const;
UInt32 getZooKeeperSessionUptime() const;
#if USE_NURAFT
std::shared_ptr<KeeperDispatcher> & getKeeperDispatcher() const;
#endif
void initializeKeeperDispatcher() const;
void initializeKeeperDispatcher(bool start_async) const;
void shutdownKeeperDispatcher() const;
/// Set auxiliary zookeepers configuration at server starting or configuration reloading.

View File

@ -128,10 +128,10 @@ BlockIO InterpreterDescribeQuery::execute()
{
for (const auto & column : columns)
{
column.type->forEachSubcolumn([&](const auto & name, const auto & type, const auto & path)
IDataType::forEachSubcolumn([&](const auto & path, const auto & name, const auto & data)
{
res_columns[0]->insert(Nested::concatenateName(column.name, name));
res_columns[1]->insert(type->getName());
res_columns[1]->insert(data.type->getName());
/// It's not trivial to calculate default expression for subcolumn.
/// So, leave it empty.
@ -150,7 +150,7 @@ BlockIO InterpreterDescribeQuery::execute()
res_columns[6]->insertDefault();
res_columns[7]->insert(1u);
});
}, column.type->getDefaultSerialization(), column.type, nullptr);
}
}

View File

@ -2303,14 +2303,12 @@ void InterpreterSelectQuery::executeOrderOptimized(QueryPlan & query_plan, Input
{
const Settings & settings = context->getSettingsRef();
const auto & query = getSelectQuery();
auto finish_sorting_step = std::make_unique<FinishSortingStep>(
query_plan.getCurrentDataStream(),
input_sorting_info->order_key_prefix_descr,
output_order_descr,
settings.max_block_size,
limit,
query.hasFiltration());
limit);
query_plan.addStep(std::move(finish_sorting_step));
}

View File

@ -18,7 +18,7 @@
#include <Parsers/parseQuery.h>
#include <IO/WriteHelpers.h>
#include <Core/Defines.h>
#include <Common/StringUtils/StringUtils.h>
namespace DB
{
@ -524,7 +524,8 @@ std::vector<TableNeededColumns> normalizeColumnNamesExtractNeeded(
size_t count = countTablesWithColumn(tables, short_name);
if (count > 1 || aliases.count(short_name))
/// isValidIdentifierBegin is required to be consistent with TableJoin::deduplicateAndQualifyColumnNames
if (count > 1 || aliases.count(short_name) || !isValidIdentifierBegin(short_name.at(0)))
{
const auto & table = tables[*table_pos];
IdentifierSemantic::setColumnLongName(*ident, table.table); /// table.column -> table_alias.column

View File

@ -11,6 +11,11 @@
namespace ProfileEvents
{
std::shared_ptr<DB::DataTypeEnum8> TypeEnum = std::make_shared<DB::DataTypeEnum8>(DB::DataTypeEnum8::Values{
{ "increment", static_cast<Int8>(INCREMENT)},
{ "gauge", static_cast<Int8>(GAUGE)},
});
/// Put implementation here to avoid extra linking dependencies for clickhouse_common_io
void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column, bool nonzero_only)
{

View File

@ -1,5 +1,6 @@
#pragma once
#include <Common/ProfileEvents.h>
#include <DataTypes/DataTypeEnum.h>
#include <Columns/IColumn.h>
@ -9,4 +10,13 @@ namespace ProfileEvents
/// Dumps profile events to columns Map(String, UInt64)
void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column, bool nonzero_only = true);
/// This is for ProfileEvents packets.
enum Type : int8_t
{
INCREMENT = 1,
GAUGE = 2,
};
extern std::shared_ptr<DB::DataTypeEnum8> TypeEnum;
}

Some files were not shown because too many files have changed in this diff.