Merge branch 'master' of https://github.com/ClickHouse/ClickHouse into issue-8828

This commit is contained in:
a.palagashvili 2020-11-09 19:53:20 +03:00
commit 6286775031
251 changed files with 2324 additions and 1203 deletions

View File

@ -123,6 +123,7 @@ You can pass parameters to `clickhouse-client` (all parameters have a default va
- `--stacktrace` If specified, also print the stack trace if an exception occurs.
- `--config-file` The name of the configuration file.
- `--secure` If specified, will connect to server over secure connection.
- `--history_file` — Path to a file containing command history.
- `--param_<name>` — Value for a [query with parameters](#cli-queries-with-parameters).
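As an illustration (the file path and query here are hypothetical, not taken from this page), several of these options can be combined in one invocation:

``` bash
# Connect securely, keep this session's history in a custom file,
# and run a parameterized query.
clickhouse-client --secure \
    --history_file=/home/user/.clickhouse_history \
    --param_limit=10 \
    --query="SELECT number FROM system.numbers LIMIT {limit:UInt64}"
```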
### Configuration Files {#configuration_files}

View File

@ -36,6 +36,7 @@ toc_title: Adopters
| <a href="https://www.criteo.com/" class="favicon">Criteo</a> | Retail | Main product | — | — | [Slides in English, October 2018](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup18/3_storetail.pptx) |
| <a href="https://www.chinatelecomglobal.com/" class="favicon">Dataliance for China Telecom</a> | Telecom | Analytics | — | — | [Slides in Chinese, January 2018](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup12/telecom.pdf) |
| <a href="https://db.com" class="favicon">Deutsche Bank</a> | Finance | BI Analytics | — | — | [Slides in English, October 2019](https://bigdatadays.ru/wp-content/uploads/2019/10/D2-H3-3_Yakunin-Goihburg.pdf) |
| <a href="https://deeplay.io/eng/" class="favicon">Deeplay</a> | Gaming Analytics | — | — | — | [Job advertisement, 2020](https://career.habr.com/vacancies/1000062568) |
| <a href="https://www.diva-e.com" class="favicon">Diva-e</a> | Digital consulting | Main Product | — | — | [Slides in English, September 2019](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup29/ClickHouse-MeetUp-Unusual-Applications-sd-2019-09-17.pdf) |
| <a href="https://www.ecwid.com/" class="favicon">Ecwid</a> | E-commerce SaaS | Metrics, Logging | — | — | [Slides in Russian, April 2019](https://nastachku.ru/var/files/1/presentation/backend/2_Backend_6.pdf) |
| <a href="https://www.ebay.com/" class="favicon">eBay</a> | E-commerce | Logs, Metrics and Events | — | — | [Official website, Sep 2020](https://tech.ebayinc.com/engineering/ou-online-analytical-processing/) |

View File

@ -571,7 +571,7 @@ For more information, see the MergeTreeSettings.h header file.
Fine tuning for tables in the [ReplicatedMergeTree](../../engines/table-engines/mergetree-family/mergetree.md).
This setting has higher priority.
This setting has a higher priority.
For more information, see the MergeTreeSettings.h header file.

View File

@ -0,0 +1,91 @@
---
toc_priority: 46
toc_title: Polygon Dictionaries With Grids
---
# Polygon dictionaries {#polygon-dictionaries}
Polygon dictionaries allow you to efficiently search for the polygon containing specified points.
For example: defining a city area by geographical coordinates.
Example configuration:
``` xml
<dictionary>
<structure>
<key>
<name>key</name>
<type>Array(Array(Array(Array(Float64))))</type>
</key>
<attribute>
<name>name</name>
<type>String</type>
<null_value></null_value>
</attribute>
<attribute>
<name>value</name>
<type>UInt64</type>
<null_value>0</null_value>
</attribute>
</structure>
<layout>
<polygon />
</layout>
</dictionary>
```
The corresponding [DDL-query](../../../sql-reference/statements/create/dictionary.md#create-dictionary-query):
``` sql
CREATE DICTIONARY polygon_dict_name (
key Array(Array(Array(Array(Float64)))),
name String,
value UInt64
)
PRIMARY KEY key
LAYOUT(POLYGON())
...
```
When configuring the polygon dictionary, the key must have one of two types:
- A simple polygon. It is an array of points.
- MultiPolygon. It is an array of polygons. Each polygon is a two-dimensional array of points. The first element of this array is the outer boundary of the polygon, and subsequent elements specify areas to be excluded from it.
Points can be specified as an array or a tuple of their coordinates. In the current implementation, only two-dimensional points are supported.
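As an illustration only (the table name and values below are hypothetical, not part of the original text), a source table feeding such a dictionary could look like this:

``` sql
CREATE TABLE polygons_source (
    key Array(Array(Array(Array(Float64)))),
    name String,
    value UInt64
) ENGINE = Memory;

-- One MultiPolygon holding a single polygon: an outer square boundary
-- with no excluded areas; each point is an [x, y] pair.
INSERT INTO polygons_source VALUES
    ([[[[0.0, 0.0], [0.0, 10.0], [10.0, 10.0], [10.0, 0.0]]]], 'square', 1);
```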
The user can [upload their own data](../../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-sources.md) in all formats supported by ClickHouse.
There are 3 types of [in-memory storage](../../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-layout.md) available:
- POLYGON_SIMPLE. This is a naive implementation: for each query, a linear pass is made through all polygons, and membership is checked for each one without using additional indexes.
- POLYGON_INDEX_EACH. A separate index is built for each polygon, which in most cases allows you to quickly check whether a point belongs to it (optimized for geographical regions).
Also, a grid is superimposed on the area under consideration, which significantly narrows down the number of polygons to consider.
The grid is created by recursively dividing each cell into 16 equal parts and is configured with two parameters.
The division stops when the recursion depth reaches MAX_DEPTH or when a cell intersects no more than MIN_INTERSECTIONS polygons.
To answer a query, the corresponding cell is located, and the indexes of the polygons stored in it are checked in turn.
- POLYGON_INDEX_CELL. This layout also creates the grid described above. The same parameters are available. For each leaf cell, an index is built on all the pieces of polygons that fall into it, which allows a query to be answered quickly.
- POLYGON. A synonym for POLYGON_INDEX_CELL.
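To pick one of these layouts explicitly, the DDL form shown earlier can name it directly (a sketch; it assumes the layout name is spelled exactly as in the list above):

``` sql
CREATE DICTIONARY polygon_dict_name (
    key Array(Array(Array(Array(Float64)))),
    name String,
    value UInt64
)
PRIMARY KEY key
LAYOUT(POLYGON_INDEX_EACH())
...
```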
Dictionary queries are carried out using standard [functions](../../../sql-reference/functions/ext-dict-functions.md) for working with external dictionaries.
An important difference is that here the keys are the points for which you want to find the containing polygon.
Example of working with the dictionary defined above:
``` sql
CREATE TABLE points (
x Float64,
y Float64
)
...
SELECT tuple(x, y) AS key, dictGet('polygon_dict_name', 'name', key), dictGet('polygon_dict_name', 'value', key) FROM points ORDER BY x, y;
```
As a result of executing the last command, for each point in the 'points' table the minimum-area polygon containing that point is found, and the requested attributes are output.

View File

@ -221,3 +221,85 @@ returns
│ 1970-03-12 │ 1970-01-08 │ original │
└────────────┴────────────┴──────────┘
```
## OFFSET FETCH Clause {#offset-fetch}
`OFFSET` and `FETCH` allow you to retrieve data in portions. They specify a block of rows that you want to get with a single query.
``` sql
[OFFSET offset_row_count {ROW | ROWS}] [FETCH {FIRST | NEXT} fetch_row_count {ROW | ROWS} {ONLY | WITH TIES}]
```
The `offset_row_count` or `fetch_row_count` value can be a number or a literal constant. You can omit `fetch_row_count`; by default, it equals 1.
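For instance, with `fetch_row_count` omitted the query below returns a single row (a sketch against the `test_fetch` table used in the examples further down; it assumes the standard `FETCH FIRST ROW ONLY` spelling is accepted):

``` sql
SELECT * FROM test_fetch ORDER BY a OFFSET 1 ROW FETCH FIRST ROW ONLY;
```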
`OFFSET` specifies the number of rows to skip before starting to return rows from the query.
`FETCH` specifies the maximum number of rows that can be returned by the query.
The `ONLY` option is used to return rows that immediately follow the rows omitted by the `OFFSET`. In this case, `FETCH` is an alternative to the [LIMIT](../../../sql-reference/statements/select/limit.md) clause. For example, the following query
``` sql
SELECT * FROM test_fetch ORDER BY a OFFSET 1 ROW FETCH FIRST 3 ROWS ONLY;
```
is identical to the query
``` sql
SELECT * FROM test_fetch ORDER BY a LIMIT 3 OFFSET 1;
```
The `WITH TIES` option is used to return any additional rows that tie for the last place in the result set according to the `ORDER BY` clause. For example, if `fetch_row_count` is set to 5 but two additional rows match the values of the `ORDER BY` columns in the fifth row, the result set will contain seven rows.
!!! note "Note"
According to the standard, the `OFFSET` clause must come before the `FETCH` clause if both are present.
### Examples {#examples}
Input table:
``` text
┌─a─┬─b─┐
│ 1 │ 1 │
│ 2 │ 1 │
│ 3 │ 4 │
│ 1 │ 3 │
│ 5 │ 4 │
│ 0 │ 6 │
│ 5 │ 7 │
└───┴───┘
```
Usage of the `ONLY` option:
``` sql
SELECT * FROM test_fetch ORDER BY a OFFSET 3 ROW FETCH FIRST 3 ROWS ONLY;
```
Result:
``` text
┌─a─┬─b─┐
│ 2 │ 1 │
│ 3 │ 4 │
│ 5 │ 4 │
└───┴───┘
```
Usage of the `WITH TIES` option:
``` sql
SELECT * FROM test_fetch ORDER BY a OFFSET 3 ROW FETCH FIRST 3 ROWS WITH TIES;
```
Result:
``` text
┌─a─┬─b─┐
│ 2 │ 1 │
│ 3 │ 4 │
│ 5 │ 4 │
│ 5 │ 7 │
└───┴───┘
```
[Original article](https://clickhouse.tech/docs/en/sql-reference/statements/select/order-by/) <!--hide-->

View File

@ -64,6 +64,6 @@ CREATE TABLE merge.hits_buffer AS merge.hits ENGINE = Buffer(merge, hits, 16, 10
Buffer tables are used when too many INSERTs arrive from a large number of servers per unit of time and the data cannot be buffered before insertion on your own, so the INSERTs cannot keep up.
Note that even for Buffer tables it does not make sense to insert data one row at a time: this achieves a speed of only a few thousand rows per second, while inserting larger blocks can reach more than a million rows per second (see the section «Performance»).
Note that even for Buffer tables it does not make sense to insert data one row at a time: this achieves a speed of only a few thousand rows per second, while inserting larger blocks can reach more than a million rows per second (see the section [«Performance»](../../../introduction/performance/)).
[Original article](https://clickhouse.tech/docs/ru/operations/table_engines/buffer/) <!--hide-->

View File

@ -555,6 +555,22 @@ ClickHouse checks the conditions for `min_part_size` and `min_part
</merge_tree>
```
## replicated\_merge\_tree {#server_configuration_parameters-replicated_merge_tree}
Fine tuning for tables in the [ReplicatedMergeTree](../../engines/table-engines/mergetree-family/mergetree.md).
This setting has a higher priority.
For more information, see the MergeTreeSettings.h header file.
**Example**
``` xml
<replicated_merge_tree>
<max_suspicious_broken_parts>5</max_suspicious_broken_parts>
</replicated_merge_tree>
```
## openSSL {#server_configuration_parameters-openssl}
SSL client/server settings.

View File

@ -1,4 +1,4 @@
# Polygon dictionaries {#slovari-polygonov}
# Polygon dictionaries {#polygon-dictionaries}
Polygon dictionaries allow you to efficiently search, among a set of polygons, for the polygon that contains given points.
For example: determining a city district by geographical coordinates.

View File

@ -214,3 +214,85 @@ ORDER BY
│ 1970-03-12 │ 1970-01-08 │ original │
└────────────┴────────────┴──────────┘
```
## OFFSET FETCH Clause {#offset-fetch}
`OFFSET` and `FETCH` allow you to retrieve data in portions. They specify the rows you want to get as the result of a query.
``` sql
[OFFSET offset_row_count {ROW | ROWS}] [FETCH {FIRST | NEXT} fetch_row_count {ROW | ROWS} {ONLY | WITH TIES}]
```
The `offset_row_count` or `fetch_row_count` value can be a number or a literal constant. If you omit `fetch_row_count`, it defaults to 1.
`OFFSET` specifies the number of rows to skip before starting to return rows from the query.
`FETCH` specifies the maximum number of rows that can be returned by the query.
The `ONLY` option is used to return the rows that immediately follow the rows skipped by `OFFSET`. In this case, `FETCH` is an alternative to [LIMIT](../../../sql-reference/statements/select/limit.md). For example, the following query
``` sql
SELECT * FROM test_fetch ORDER BY a OFFSET 1 ROW FETCH FIRST 3 ROWS ONLY;
```
is identical to the query
``` sql
SELECT * FROM test_fetch ORDER BY a LIMIT 3 OFFSET 1;
```
The `WITH TIES` option is used to return additional rows that tie for the last place in the query result according to `ORDER BY`. For example, if `fetch_row_count` is set to 5 and two more rows have the same `ORDER BY` column values as the fifth row of the result, the final set will contain 7 rows.
!!! note "Note"
The `OFFSET` clause must come before the `FETCH` clause if both are present.
### Examples {#examples}
Input table:
``` text
┌─a─┬─b─┐
│ 1 │ 1 │
│ 2 │ 1 │
│ 3 │ 4 │
│ 1 │ 3 │
│ 5 │ 4 │
│ 0 │ 6 │
│ 5 │ 7 │
└───┴───┘
```
Usage of the `ONLY` option:
``` sql
SELECT * FROM test_fetch ORDER BY a OFFSET 3 ROW FETCH FIRST 3 ROWS ONLY;
```
Result:
``` text
┌─a─┬─b─┐
│ 2 │ 1 │
│ 3 │ 4 │
│ 5 │ 4 │
└───┴───┘
```
Usage of the `WITH TIES` option:
``` sql
SELECT * FROM test_fetch ORDER BY a OFFSET 3 ROW FETCH FIRST 3 ROWS WITH TIES;
```
Result:
``` text
┌─a─┬─b─┐
│ 2 │ 1 │
│ 3 │ 4 │
│ 5 │ 4 │
│ 5 │ 7 │
└───┴───┘
```
[Original article](https://clickhouse.tech/docs/ru/sql-reference/statements/select/order-by/) <!--hide-->

View File

@ -227,9 +227,6 @@ else ()
install (FILES ${CMAKE_CURRENT_BINARY_DIR}/clickhouse-git-import DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
list(APPEND CLICKHOUSE_BUNDLE clickhouse-git-import)
endif ()
if(ENABLE_CLICKHOUSE_ODBC_BRIDGE)
list(APPEND CLICKHOUSE_BUNDLE clickhouse-odbc-bridge)
endif()
install (TARGETS clickhouse RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)

View File

@ -75,6 +75,7 @@
#include <Common/InterruptListener.h>
#include <Functions/registerFunctions.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Formats/registerFormats.h>
#include <Common/Config/configReadClient.h>
#include <Storages/ColumnsDescription.h>
#include <common/argsToConfig.h>
@ -463,6 +464,7 @@ private:
{
UseSSL use_ssl;
registerFormats();
registerFunctions();
registerAggregateFunctions();
@ -2329,6 +2331,7 @@ public:
("query-fuzzer-runs", po::value<int>()->default_value(0), "query fuzzer runs")
("opentelemetry-traceparent", po::value<std::string>(), "OpenTelemetry traceparent header as described by W3C Trace Context recommendation")
("opentelemetry-tracestate", po::value<std::string>(), "OpenTelemetry tracestate header as described by W3C Trace Context recommendation")
("history_file", po::value<std::string>(), "path to history file")
;
Settings cmd_settings;
@ -2485,6 +2488,8 @@ public:
config().setInt("suggestion_limit", options["suggestion_limit"].as<int>());
if (options.count("highlight"))
config().setBool("highlight", options["highlight"].as<bool>());
if (options.count("history_file"))
config().setString("history_file", options["history_file"].as<std::string>());
if ((query_fuzzer_runs = options["query-fuzzer-runs"].as<int>()))
{

View File

@ -1,6 +1,7 @@
#include "ClusterCopierApp.h"
#include <Common/StatusFile.h>
#include <Common/TerminalSize.h>
#include <Formats/registerFormats.h>
#include <unistd.h>
@ -122,6 +123,7 @@ void ClusterCopierApp::mainImpl()
registerStorages();
registerDictionaries();
registerDisks();
registerFormats();
static const std::string default_database = "_local";
DatabaseCatalog::instance().attachDatabase(default_database, std::make_shared<DatabaseMemory>(default_database, *context));

View File

@ -33,6 +33,7 @@
#include <Storages/registerStorages.h>
#include <Dictionaries/registerDictionaries.h>
#include <Disks/registerDisks.h>
#include <Formats/registerFormats.h>
#include <boost/program_options/options_description.hpp>
#include <boost/program_options.hpp>
#include <common/argsToConfig.h>
@ -224,6 +225,7 @@ try
registerStorages();
registerDictionaries();
registerDisks();
registerFormats();
/// Maybe useless
if (config().has("macros"))

View File

@ -23,6 +23,7 @@
#include <Common/HashTable/HashMap.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Formats/registerFormats.h>
#include <Core/Block.h>
#include <common/StringRef.h>
#include <common/DateLUT.h>
@ -1050,6 +1051,8 @@ try
using namespace DB;
namespace po = boost::program_options;
registerFormats();
po::options_description description = createOptionsDescription("Options", getTerminalWidth());
description.add_options()
("help", "produce help message")

View File

@ -10,19 +10,8 @@ set (CLICKHOUSE_ODBC_BRIDGE_SOURCES
PingHandler.cpp
SchemaAllowedHandler.cpp
validateODBCConnectionString.cpp
odbc-bridge.cpp
)
set (CLICKHOUSE_ODBC_BRIDGE_LINK
PRIVATE
clickhouse_parsers
clickhouse_aggregate_functions
daemon
dbms
Poco::Data
PUBLIC
Poco::Data::ODBC
)
clickhouse_program_add_library(odbc-bridge)
if (OS_LINUX)
# clickhouse-odbc-bridge is always a separate binary.
@ -30,10 +19,17 @@ if (OS_LINUX)
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,--no-export-dynamic")
endif ()
add_executable(clickhouse-odbc-bridge odbc-bridge.cpp)
set_target_properties(clickhouse-odbc-bridge PROPERTIES RUNTIME_OUTPUT_DIRECTORY ..)
add_executable(clickhouse-odbc-bridge ${CLICKHOUSE_ODBC_BRIDGE_SOURCES})
clickhouse_program_link_split_binary(odbc-bridge)
target_link_libraries(clickhouse-odbc-bridge PRIVATE
daemon
dbms
clickhouse_parsers
Poco::Data
Poco::Data::ODBC
)
set_target_properties(clickhouse-odbc-bridge PROPERTIES RUNTIME_OUTPUT_DIRECTORY ..)
if (USE_GDB_ADD_INDEX)
add_custom_command(TARGET clickhouse-odbc-bridge POST_BUILD COMMAND ${GDB_ADD_INDEX_EXE} ../clickhouse-odbc-bridge COMMENT "Adding .gdb-index to clickhouse-odbc-bridge" VERBATIM)

View File

@ -18,11 +18,13 @@
#include <Common/Exception.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/config.h>
#include <Formats/registerFormats.h>
#include <common/logger_useful.h>
#include <ext/scope_guard.h>
#include <ext/range.h>
#include <Common/SensitiveDataMasker.h>
namespace DB
{
namespace ErrorCodes
@ -160,6 +162,8 @@ int ODBCBridge::main(const std::vector<std::string> & /*args*/)
if (is_help)
return Application::EXIT_OK;
registerFormats();
LOG_INFO(log, "Starting up");
Poco::Net::ServerSocket socket;
auto address = socketBindListen(socket, hostname, port, log);

View File

@ -1,3 +1,2 @@
add_executable (validate-odbc-connection-string validate-odbc-connection-string.cpp)
clickhouse_target_link_split_lib(validate-odbc-connection-string odbc-bridge)
add_executable (validate-odbc-connection-string validate-odbc-connection-string.cpp ../validateODBCConnectionString.cpp)
target_link_libraries (validate-odbc-connection-string PRIVATE clickhouse_common_io)

View File

@ -51,6 +51,7 @@
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Functions/registerFunctions.h>
#include <TableFunctions/registerTableFunctions.h>
#include <Formats/registerFormats.h>
#include <Storages/registerStorages.h>
#include <Dictionaries/registerDictionaries.h>
#include <Disks/registerDisks.h>
@ -266,6 +267,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
registerStorages();
registerDictionaries();
registerDisks();
registerFormats();
CurrentMetrics::set(CurrentMetrics::Revision, ClickHouseRevision::getVersionRevision());
CurrentMetrics::set(CurrentMetrics::VersionInteger, ClickHouseRevision::getVersionInteger());

View File

@ -198,6 +198,7 @@ namespace
/// Serialize the list of ATTACH queries to a string.
std::stringstream ss;
ss.exceptions(std::ios::failbit);
for (const ASTPtr & query : queries)
ss << *query << ";\n";
String file_contents = std::move(ss).str();
@ -353,6 +354,7 @@ String DiskAccessStorage::getStorageParamsJSON() const
if (readonly)
json.set("readonly", readonly.load());
std::ostringstream oss;
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
return oss.str();
}
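The recurring one-line addition in these hunks, `exceptions(std::ios::failbit)`, switches a standard stream from silently setting its fail bit to throwing on failure. A minimal standalone sketch of that behavior (plain standard library, not ClickHouse code):

``` cpp
#include <iostream>
#include <sstream>

int main()
{
    std::istringstream in("not-a-number");
    // After this call, any operation that sets failbit throws
    // std::ios_base::failure instead of failing silently.
    in.exceptions(std::ios::failbit);

    try
    {
        int x;
        in >> x; // parse fails -> failbit -> exception
    }
    catch (const std::ios_base::failure & e)
    {
        std::cerr << "stream failure: " << e.what() << '\n';
    }
}
```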

View File

@ -151,6 +151,7 @@ String LDAPAccessStorage::getStorageParamsJSON() const
params_json.set("roles", default_role_names);
std::ostringstream oss;
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(params_json, oss);
return oss.str();

View File

@ -461,6 +461,7 @@ String UsersConfigAccessStorage::getStorageParamsJSON() const
if (!path.empty())
json.set("path", path);
std::ostringstream oss;
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
return oss.str();
}

View File

@ -245,6 +245,7 @@ public:
{
DB::writeIntBinary<size_t>(this->data(place).total_values, buf);
std::ostringstream rng_stream;
rng_stream.exceptions(std::ios::failbit);
rng_stream << this->data(place).rng;
DB::writeStringBinary(rng_stream.str(), buf);
}
@ -275,6 +276,7 @@ public:
std::string rng_string;
DB::readStringBinary(rng_string, buf);
std::istringstream rng_stream(rng_string);
rng_stream.exceptions(std::ios::failbit);
rng_stream >> this->data(place).rng;
}
@ -564,6 +566,7 @@ public:
{
DB::writeIntBinary<size_t>(data(place).total_values, buf);
std::ostringstream rng_stream;
rng_stream.exceptions(std::ios::failbit);
rng_stream << data(place).rng;
DB::writeStringBinary(rng_stream.str(), buf);
}
@ -598,6 +601,7 @@ public:
std::string rng_string;
DB::readStringBinary(rng_string, buf);
std::istringstream rng_stream(rng_string);
rng_stream.exceptions(std::ios::failbit);
rng_stream >> data(place).rng;
}

View File

@ -191,6 +191,7 @@ public:
std::string rng_string;
DB::readStringBinary(rng_string, buf);
std::istringstream rng_stream(rng_string);
rng_stream.exceptions(std::ios::failbit);
rng_stream >> rng;
for (size_t i = 0; i < samples.size(); ++i)
@ -205,6 +206,7 @@ public:
DB::writeIntBinary<size_t>(total_values, buf);
std::ostringstream rng_stream;
rng_stream.exceptions(std::ios::failbit);
rng_stream << rng;
DB::writeStringBinary(rng_stream.str(), buf);

View File

@ -223,6 +223,7 @@ std::string MultiplexedConnections::dumpAddressesUnlocked() const
{
bool is_first = true;
std::ostringstream os;
os.exceptions(std::ios::failbit);
for (const ReplicaState & state : replica_states)
{
const Connection * connection = state.connection;

View File

@ -71,7 +71,8 @@ void checkColumn(
std::unordered_map<UInt32, T> map;
size_t num_collisions = 0;
std::stringstream collitions_str;
std::stringstream collisions_str;
collisions_str.exceptions(std::ios::failbit);
for (size_t i = 0; i < eq_class.size(); ++i)
{
@ -86,14 +87,14 @@ void checkColumn(
if (num_collisions <= max_collisions_to_print)
{
collitions_str << "Collision:\n";
collitions_str << print_for_row(it->second) << '\n';
collitions_str << print_for_row(i) << std::endl;
collisions_str << "Collision:\n";
collisions_str << print_for_row(it->second) << '\n';
collisions_str << print_for_row(i) << std::endl;
}
if (num_collisions > allowed_collisions)
{
std::cerr << collitions_str.rdbuf();
std::cerr << collisions_str.rdbuf();
break;
}
}

View File

@ -538,6 +538,7 @@ XMLDocumentPtr ConfigProcessor::processConfig(
*has_zk_includes = !contributing_zk_paths.empty();
std::stringstream comment;
comment.exceptions(std::ios::failbit);
comment << " This file was generated automatically.\n";
comment << " Do not edit it: it is likely to be discarded and generated again before it's read next time.\n";
comment << " Files used to generate this file:";

View File

@ -246,6 +246,7 @@ static std::string getExtraExceptionInfo(const std::exception & e)
std::string getCurrentExceptionMessage(bool with_stacktrace, bool check_embedded_stacktrace /*= false*/, bool with_extra_info /*= true*/)
{
std::stringstream stream;
stream.exceptions(std::ios::failbit);
try
{
@ -365,6 +366,7 @@ void tryLogException(std::exception_ptr e, Poco::Logger * logger, const std::str
std::string getExceptionMessage(const Exception & e, bool with_stacktrace, bool check_embedded_stacktrace)
{
std::stringstream stream;
stream.exceptions(std::ios::failbit);
try
{

View File

@ -134,6 +134,7 @@ void MemoryTracker::alloc(Int64 size)
ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded);
std::stringstream message;
message.exceptions(std::ios::failbit);
message << "Memory tracker";
if (const auto * description = description_ptr.load(std::memory_order_relaxed))
message << " " << description;
@ -166,6 +167,7 @@ void MemoryTracker::alloc(Int64 size)
ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded);
std::stringstream message;
message.exceptions(std::ios::failbit);
message << "Memory limit";
if (const auto * description = description_ptr.load(std::memory_order_relaxed))
message << " " << description;

View File

@ -74,6 +74,7 @@ ShellCommand::~ShellCommand()
void ShellCommand::logCommand(const char * filename, char * const argv[])
{
std::stringstream args;
args.exceptions(std::ios::failbit);
for (int i = 0; argv != nullptr && argv[i] != nullptr; ++i)
{
if (i > 0)

View File

@ -24,6 +24,7 @@
std::string signalToErrorMessage(int sig, const siginfo_t & info, const ucontext_t & context)
{
std::stringstream error;
error.exceptions(std::ios::failbit);
switch (sig)
{
case SIGSEGV:
@ -319,6 +320,7 @@ static void toStringEveryLineImpl(
std::unordered_map<std::string, DB::Dwarf> dwarfs;
std::stringstream out;
out.exceptions(std::ios::failbit);
for (size_t i = offset; i < size; ++i)
{
@ -358,6 +360,7 @@ static void toStringEveryLineImpl(
}
#else
std::stringstream out;
out.exceptions(std::ios::failbit);
for (size_t i = offset; i < size; ++i)
{
@ -373,6 +376,7 @@ static void toStringEveryLineImpl(
static std::string toStringImpl(const StackTrace::FramePointers & frame_pointers, size_t offset, size_t size)
{
std::stringstream out;
out.exceptions(std::ios::failbit);
toStringEveryLineImpl(frame_pointers, offset, size, [&](const std::string & str) { out << str << '\n'; });
return out.str();
}

View File

@ -154,6 +154,8 @@ std::pair<bool, std::string> StudentTTest::compareAndReport(size_t confidence_le
double mean_confidence_interval = table_value * t_statistic;
std::stringstream ss;
ss.exceptions(std::ios::failbit);
if (mean_difference > mean_confidence_interval && (mean_difference - mean_confidence_interval > 0.0001)) /// difference must be more than 0.0001, to take into account connection latency.
{
ss << "Difference at " << confidence_level[confidence_level_index] << "% confidence : ";

View File

@ -398,8 +398,7 @@ bool PerfEventsCounters::processThreadLocalChanges(const std::string & needed_ev
return true;
}
// Parse comma-separated list of event names. Empty means all available
// events.
// Parse comma-separated list of event names. Empty means all available events.
std::vector<size_t> PerfEventsCounters::eventIndicesFromString(const std::string & events_list)
{
std::vector<size_t> result;
@ -418,8 +417,7 @@ std::vector<size_t> PerfEventsCounters::eventIndicesFromString(const std::string
std::string event_name;
while (std::getline(iss, event_name, ','))
{
// Allow spaces at the beginning of the token, so that you can write
// 'a, b'.
// Allow spaces at the beginning of the token, so that you can write 'a, b'.
event_name.erase(0, event_name.find_first_not_of(' '));
auto entry = event_name_to_index.find(event_name);

View File

@ -80,6 +80,7 @@ void ThreadStatus::assertState(const std::initializer_list<int> & permitted_stat
}
std::stringstream ss;
ss.exceptions(std::ios::failbit);
ss << "Unexpected thread state " << getCurrentState();
if (description)
ss << ": " << description;

View File

@ -49,6 +49,7 @@ struct UInt128
String toHexString() const
{
std::ostringstream os;
os.exceptions(std::ios::failbit);
os << std::setw(16) << std::setfill('0') << std::hex << high << low;
return String(os.str());
}

View File

@ -308,6 +308,7 @@ struct ODBCBridgeMixin
path.setFileName("clickhouse-odbc-bridge");
std::stringstream command;
command.exceptions(std::ios::failbit);
#if !CLICKHOUSE_SPLIT_BINARY
cmd_args.push_back("odbc-bridge");

View File

@ -219,6 +219,7 @@ std::pair<ResponsePtr, Undo> TestKeeperCreateRequest::process(TestKeeper::Contai
++it->second.seq_num;
std::stringstream seq_num_str;
seq_num_str.exceptions(std::ios::failbit);
seq_num_str << std::setw(10) << std::setfill('0') << seq_num;
path_created += seq_num_str.str();

View File

@ -81,6 +81,7 @@ __attribute__((__weak__)) void checkStackSize()
if (stack_size * 2 > max_stack_size)
{
std::stringstream message;
message.exceptions(std::ios::failbit);
message << "Stack size too large"
<< ". Stack address: " << stack_address
<< ", frame address: " << frame_address

View File

@ -3,7 +3,6 @@
#include <re2/stringpiece.h>
#include <algorithm>
#include <sstream>
#include <cassert>
#include <iomanip>
@ -20,6 +19,7 @@ namespace DB
std::string makeRegexpPatternFromGlobs(const std::string & initial_str_with_globs)
{
std::ostringstream oss_for_escaping;
oss_for_escaping.exceptions(std::ios::failbit);
/// Escaping only characters that not used in glob syntax
for (const auto & letter : initial_str_with_globs)
{
@ -33,6 +33,7 @@ std::string makeRegexpPatternFromGlobs(const std::string & initial_str_with_glob
re2::StringPiece input(escaped_with_globs);
re2::StringPiece matched;
std::ostringstream oss_for_replacing;
oss_for_replacing.exceptions(std::ios::failbit);
size_t current_index = 0;
while (RE2::FindAndConsume(&input, enum_or_range, &matched))
{
@ -45,8 +46,8 @@ std::string makeRegexpPatternFromGlobs(const std::string & initial_str_with_glob
size_t range_end = 0;
char point;
std::istringstream iss_range(buffer);
iss_range.exceptions(std::ios::failbit);
iss_range >> range_begin >> point >> point >> range_end;
assert(!iss_range.fail());
bool leading_zeros = buffer[0] == '0';
size_t num_len = std::to_string(range_end).size();
if (leading_zeros)
@ -71,6 +72,7 @@ std::string makeRegexpPatternFromGlobs(const std::string & initial_str_with_glob
oss_for_replacing << escaped_with_globs.substr(current_index);
std::string almost_res = oss_for_replacing.str();
std::ostringstream oss_final_processing;
oss_final_processing.exceptions(std::ios::failbit);
for (const auto & letter : almost_res)
{
if ((letter == '?') || (letter == '*'))

View File

@ -0,0 +1,15 @@
#pragma once
#include <Functions/registerFunctions.h>
#include <Formats/registerFormats.h>
inline void tryRegisterFunctions()
{
static struct Register { Register() { DB::registerFunctions(); } } registered;
}
inline void tryRegisterFormats()
{
static struct Register { Register() { DB::registerFormats(); } } registered;
}
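The helper above relies on a function-local static: its constructor runs exactly once, on the first call, even with concurrent callers (C++11 magic statics). A standalone sketch of the same idiom (illustrative names, not ClickHouse code):

``` cpp
#include <iostream>

void registerThings() { std::cout << "registered\n"; }

inline void tryRegisterThings()
{
    // The constructor of this local static runs exactly once,
    // the first time tryRegisterThings() is called.
    static struct Register { Register() { registerThings(); } } registered;
}

int main()
{
    tryRegisterThings(); // prints "registered"
    tryRegisterThings(); // no-op: the static is already constructed
}
```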

View File

@ -1,18 +0,0 @@
#pragma once
#include <Functions/FunctionFactory.h>
#include <Functions/registerFunctions.h>
struct RegisteredFunctionsState
{
RegisteredFunctionsState()
{
DB::registerFunctions();
}
RegisteredFunctionsState(RegisteredFunctionsState &&) = default;
};
inline void tryRegisterFunctions()
{
static RegisteredFunctionsState registered_functions_state;
}

View File

@ -165,6 +165,7 @@ TEST(Common, SensitiveDataMasker)
</rule>
</query_masking_rules>
</clickhouse>)END");
Poco::AutoPtr<Poco::Util::XMLConfiguration> xml_config = new Poco::Util::XMLConfiguration(xml_isteam_bad);
DB::SensitiveDataMasker masker_xml_based_exception_check(*xml_config, "query_masking_rules");

View File

@ -52,6 +52,7 @@ int main(int, char **)
if (x != i)
{
std::stringstream s;
s.exceptions(std::ios::failbit);
s << "Failed!, read: " << x << ", expected: " << i;
throw DB::Exception(s.str(), 0);
}

View File

@ -22,6 +22,7 @@ void IMySQLReadPacket::readPayload(ReadBuffer & in, uint8_t & sequence_id)
if (!payload.eof())
{
std::stringstream tmp;
tmp.exceptions(std::ios::failbit);
tmp << "Packet payload is not fully read. Stopped after " << payload.count() << " bytes, while " << payload.available() << " bytes are in buffer.";
throw Exception(tmp.str(), ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT);
}

View File

@ -16,6 +16,7 @@ void IMySQLWritePacket::writePayload(WriteBuffer & buffer, uint8_t & sequence_id
if (buf.remainingPayloadSize())
{
std::stringstream ss;
ss.exceptions(std::ios::failbit);
ss << "Incomplete payload. Written " << getPayloadSize() - buf.remainingPayloadSize() << " bytes, expected " << getPayloadSize() << " bytes.";
throw Exception(ss.str(), 0);
}

View File

@ -130,4 +130,6 @@ void Settings::checkNoSettingNamesAtTopLevel(const Poco::Util::AbstractConfigura
}
}
IMPLEMENT_SETTINGS_TRAITS(FormatFactorySettingsTraits, FORMAT_FACTORY_SETTINGS)
}

View File

@ -514,4 +514,13 @@ struct Settings : public BaseSettings<SettingsTraits>
static void checkNoSettingNamesAtTopLevel(const Poco::Util::AbstractConfiguration & config, const String & config_path);
};
/*
* User-specified file format settings for File and URL engines.
*/
DECLARE_SETTINGS_TRAITS(FormatFactorySettingsTraits, FORMAT_FACTORY_SETTINGS)
struct FormatFactorySettings : public BaseSettings<FormatFactorySettingsTraits>
{
};
}

View File

@ -61,6 +61,7 @@ struct SortColumnDescription
std::string dump() const
{
std::stringstream ss;
ss.exceptions(std::ios::failbit);
ss << column_name << ":" << column_number << ":dir " << direction << "nulls " << nulls_direction;
return ss.str();
}

View File

@ -60,6 +60,7 @@ void CheckConstraintsBlockOutputStream::write(const Block & block)
if (!value)
{
std::stringstream exception_message;
exception_message.exceptions(std::ios::failbit);
exception_message << "Constraint " << backQuote(constraint_ptr->name)
<< " for table " << table_id.getNameForLogs()
@ -87,6 +88,7 @@ void CheckConstraintsBlockOutputStream::write(const Block & block)
Names related_columns = constraint_expr->getRequiredColumns();
std::stringstream exception_message;
exception_message.exceptions(std::ios::failbit);
exception_message << "Constraint " << backQuote(constraint_ptr->name)
<< " for table " << table_id.getNameForLogs()

View File

@ -360,6 +360,7 @@ Block IBlockInputStream::getExtremes()
String IBlockInputStream::getTreeID() const
{
std::stringstream s;
s.exceptions(std::ios::failbit);
s << getName();
if (!children.empty())

View File

@ -5,6 +5,7 @@
#include <Processors/Pipe.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Interpreters/castColumn.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/InternalTextLogsQueue.h>
@ -314,6 +315,8 @@ void RemoteQueryExecutor::sendScalars()
void RemoteQueryExecutor::sendExternalTables()
{
SelectQueryInfo query_info;
size_t count = multiplexed_connections->size();
{
@ -328,11 +331,12 @@ void RemoteQueryExecutor::sendExternalTables()
{
StoragePtr cur = table.second;
auto metadata_snapshot = cur->getInMemoryMetadataPtr();
QueryProcessingStage::Enum read_from_table_stage = cur->getQueryProcessingStage(context);
QueryProcessingStage::Enum read_from_table_stage = cur->getQueryProcessingStage(
context, QueryProcessingStage::Complete, query_info);
Pipe pipe = cur->read(
metadata_snapshot->getColumns().getNamesOfPhysical(),
metadata_snapshot, {}, context,
metadata_snapshot, query_info, context,
read_from_table_stage, DEFAULT_BLOCK_SIZE, 1);
auto data = std::make_unique<ExternalTableData>();

View File

@ -33,6 +33,7 @@ static const std::vector<String> supported_functions{"any", "anyLast", "min",
String DataTypeCustomSimpleAggregateFunction::getName() const
{
std::stringstream stream;
stream.exceptions(std::ios::failbit);
stream << "SimpleAggregateFunction(" << function->getName();
if (!parameters.empty())

View File

@ -30,6 +30,7 @@ template <typename T>
std::string DataTypeDecimal<T>::doGetName() const
{
std::stringstream ss;
ss.exceptions(std::ios::failbit);
ss << "Decimal(" << this->precision << ", " << this->scale << ")";
return ss.str();
}

View File

@ -95,6 +95,7 @@ String getObjectDefinitionFromCreateQuery(const ASTPtr & query)
if (!create)
{
std::ostringstream query_stream;
query_stream.exceptions(std::ios::failbit);
formatAST(*query, query_stream, true);
throw Exception("Query '" + query_stream.str() + "' is not CREATE query", ErrorCodes::LOGICAL_ERROR);
}
@ -121,6 +122,7 @@ String getObjectDefinitionFromCreateQuery(const ASTPtr & query)
create->table = TABLE_WITH_UUID_NAME_PLACEHOLDER;
std::ostringstream statement_stream;
statement_stream.exceptions(std::ios::failbit);
formatAST(*create, statement_stream, false);
statement_stream << '\n';
return statement_stream.str();

View File

@ -128,6 +128,7 @@ static String checkVariableAndGetVersion(const mysqlxx::Pool::Entry & connection
bool first = true;
std::stringstream error_message;
error_message.exceptions(std::ios::failbit);
error_message << "Illegal MySQL variables, the MaterializeMySQL engine requires ";
for (const auto & [variable_name, variable_error_message] : variables_error_message)
{
@ -239,6 +240,7 @@ static inline BlockOutputStreamPtr getTableOutput(const String & database_name,
const StoragePtr & storage = DatabaseCatalog::instance().getTable(StorageID(database_name, table_name), query_context);
std::stringstream insert_columns_str;
insert_columns_str.exceptions(std::ios::failbit);
const StorageInMemoryMetadata & storage_metadata = storage->getInMemoryMetadata();
const ColumnsDescription & storage_columns = storage_metadata.getColumns();
const NamesAndTypesList & insert_columns_names = insert_materialized ? storage_columns.getAllPhysical() : storage_columns.getOrdinary();
@ -330,6 +332,7 @@ std::optional<MaterializeMetadata> MaterializeMySQLSyncThread::prepareSynchroniz
const auto & position_message = [&]()
{
std::stringstream ss;
ss.exceptions(std::ios::failbit);
position.dump(ss);
return ss.str();
};
@ -372,6 +375,7 @@ void MaterializeMySQLSyncThread::flushBuffersData(Buffers & buffers, Materialize
const auto & position_message = [&]()
{
std::stringstream ss;
ss.exceptions(std::ios::failbit);
client.getPosition().dump(ss);
return ss.str();
};
@ -619,30 +623,17 @@ void MaterializeMySQLSyncThread::onEvent(Buffers & buffers, const BinlogEventPtr
else if (receive_event->type() == MYSQL_QUERY_EVENT)
{
QueryEvent & query_event = static_cast<QueryEvent &>(*receive_event);
flushBuffersData(buffers, metadata);
try
{
Context query_context = createQueryContext(global_context);
String comment = "Materialize MySQL step 2: execute MySQL DDL for sync data";
String event_database = query_event.schema == mysql_database_name ? database_name : "";
tryToExecuteQuery(query_prefix + query_event.query, query_context, event_database, comment);
}
catch (Exception & exception)
{
tryLogCurrentException(log);
/// If some DDL query was not successfully parsed and executed
/// Then replication may fail on next binlog events anyway
if (exception.code() != ErrorCodes::SYNTAX_ERROR)
throw;
}
Position position_before_ddl;
position_before_ddl.update(metadata.binlog_position, metadata.binlog_file, metadata.executed_gtid_set);
metadata.transaction(position_before_ddl, [&]() { buffers.commit(global_context); });
metadata.transaction(client.getPosition(),[&](){ executeDDLAtomic(query_event); });
}
else if (receive_event->header.type != HEARTBEAT_EVENT)
{
const auto & dump_event_message = [&]()
{
std::stringstream ss;
ss.exceptions(std::ios::failbit);
receive_event->dump(ss);
return ss.str();
};
@ -651,6 +642,26 @@ void MaterializeMySQLSyncThread::onEvent(Buffers & buffers, const BinlogEventPtr
}
}
void MaterializeMySQLSyncThread::executeDDLAtomic(const QueryEvent & query_event)
{
try
{
Context query_context = createQueryContext(global_context);
String comment = "Materialize MySQL step 2: execute MySQL DDL for sync data";
String event_database = query_event.schema == mysql_database_name ? database_name : "";
tryToExecuteQuery(query_prefix + query_event.query, query_context, event_database, comment);
}
catch (Exception & exception)
{
tryLogCurrentException(log);
/// If some DDL query was not successfully parsed and executed
/// Then replication may fail on next binlog events anyway
if (exception.code() != ErrorCodes::SYNTAX_ERROR)
throw;
}
}
bool MaterializeMySQLSyncThread::isMySQLSyncThread()
{
return getThreadName() == MYSQL_BACKGROUND_THREAD_NAME;

View File

@ -100,6 +100,7 @@ private:
std::atomic<bool> sync_quit{false};
std::unique_ptr<ThreadFromGlobalPool> background_thread_pool;
void executeDDLAtomic(const QueryEvent & query_event);
};
}

View File

@ -231,6 +231,7 @@ std::string DictionaryStructure::getKeyDescription() const
return "UInt64";
std::ostringstream out;
out.exceptions(std::ios::failbit);
out << '(';

View File

@ -19,6 +19,7 @@ static std::string configurationToString(const DictionaryConfigurationPtr & conf
{
const Poco::Util::XMLConfiguration * xml_config = dynamic_cast<const Poco::Util::XMLConfiguration *>(config.get());
std::ostringstream oss;
oss.exceptions(std::ios::failbit);
xml_config->save(oss);
return oss.str();
}

View File

@ -40,100 +40,93 @@ const FormatFactory::Creators & FormatFactory::getCreators(const String & name)
throw Exception("Unknown format " + name, ErrorCodes::UNKNOWN_FORMAT);
}
FormatSettings getFormatSettings(const Context & context)
{
const auto & settings = context.getSettingsRef();
static FormatSettings getInputFormatSetting(const Settings & settings, const Context & context)
return getFormatSettings(context, settings);
}
template <typename Settings>
FormatSettings getFormatSettings(const Context & context,
const Settings & settings)
{
FormatSettings format_settings;
format_settings.csv.delimiter = settings.format_csv_delimiter;
format_settings.csv.allow_single_quotes = settings.format_csv_allow_single_quotes;
format_settings.avro.allow_missing_fields = settings.input_format_avro_allow_missing_fields;
format_settings.avro.output_codec = settings.output_format_avro_codec;
format_settings.avro.output_sync_interval = settings.output_format_avro_sync_interval;
format_settings.avro.schema_registry_url = settings.format_avro_schema_registry_url.toString();
format_settings.csv.allow_double_quotes = settings.format_csv_allow_double_quotes;
format_settings.csv.unquoted_null_literal_as_null = settings.input_format_csv_unquoted_null_literal_as_null;
format_settings.csv.allow_single_quotes = settings.format_csv_allow_single_quotes;
format_settings.csv.crlf_end_of_line = settings.output_format_csv_crlf_end_of_line;
format_settings.csv.delimiter = settings.format_csv_delimiter;
format_settings.csv.empty_as_default = settings.input_format_defaults_for_omitted_fields;
format_settings.csv.input_format_enum_as_number = settings.input_format_csv_enum_as_number;
format_settings.null_as_default = settings.input_format_null_as_default;
format_settings.values.interpret_expressions = settings.input_format_values_interpret_expressions;
format_settings.values.deduce_templates_of_expressions = settings.input_format_values_deduce_templates_of_expressions;
format_settings.values.accurate_types_of_literals = settings.input_format_values_accurate_types_of_literals;
format_settings.with_names_use_header = settings.input_format_with_names_use_header;
format_settings.skip_unknown_fields = settings.input_format_skip_unknown_fields;
format_settings.import_nested_json = settings.input_format_import_nested_json;
format_settings.csv.unquoted_null_literal_as_null = settings.input_format_csv_unquoted_null_literal_as_null;
format_settings.custom.escaping_rule = settings.format_custom_escaping_rule;
format_settings.custom.field_delimiter = settings.format_custom_field_delimiter;
format_settings.custom.result_after_delimiter = settings.format_custom_result_after_delimiter;
format_settings.custom.result_after_delimiter = settings.format_custom_result_after_delimiter;
format_settings.custom.result_before_delimiter = settings.format_custom_result_before_delimiter;
format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter;
format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter;
format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter;
format_settings.date_time_input_format = settings.date_time_input_format;
format_settings.date_time_output_format = settings.date_time_output_format;
format_settings.enable_streaming = settings.output_format_enable_streaming;
format_settings.import_nested_json = settings.input_format_import_nested_json;
format_settings.input_allow_errors_num = settings.input_format_allow_errors_num;
format_settings.input_allow_errors_ratio = settings.input_format_allow_errors_ratio;
format_settings.template_settings.resultset_format = settings.format_template_resultset;
format_settings.template_settings.row_format = settings.format_template_row;
format_settings.template_settings.row_between_delimiter = settings.format_template_rows_between_delimiter;
format_settings.tsv.empty_as_default = settings.input_format_tsv_empty_as_default;
format_settings.tsv.input_format_enum_as_number = settings.input_format_tsv_enum_as_number;
format_settings.json.escape_forward_slashes = settings.output_format_json_escape_forward_slashes;
format_settings.json.quote_64bit_integers = settings.output_format_json_quote_64bit_integers;
format_settings.json.quote_denormals = settings.output_format_json_quote_denormals;
format_settings.null_as_default = settings.input_format_null_as_default;
format_settings.parquet.row_group_size = settings.output_format_parquet_row_group_size;
format_settings.pretty.charset = settings.output_format_pretty_grid_charset.toString() == "ASCII" ? FormatSettings::Pretty::Charset::ASCII : FormatSettings::Pretty::Charset::UTF8;
format_settings.pretty.color = settings.output_format_pretty_color;
format_settings.pretty.max_column_pad_width = settings.output_format_pretty_max_column_pad_width;
format_settings.pretty.max_rows = settings.output_format_pretty_max_rows;
format_settings.pretty.max_value_width = settings.output_format_pretty_max_value_width;
format_settings.pretty.output_format_pretty_row_numbers = settings.output_format_pretty_row_numbers;
format_settings.regexp.escaping_rule = settings.format_regexp_escaping_rule;
format_settings.regexp.regexp = settings.format_regexp;
format_settings.regexp.skip_unmatched = settings.format_regexp_skip_unmatched;
format_settings.schema.format_schema = settings.format_schema;
format_settings.schema.format_schema_path = context.getFormatSchemaPath();
format_settings.schema.is_server = context.hasGlobalContext() && (context.getGlobalContext().getApplicationType() == Context::ApplicationType::SERVER);
format_settings.custom.result_before_delimiter = settings.format_custom_result_before_delimiter;
format_settings.custom.result_after_delimiter = settings.format_custom_result_after_delimiter;
format_settings.custom.escaping_rule = settings.format_custom_escaping_rule;
format_settings.custom.field_delimiter = settings.format_custom_field_delimiter;
format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter;
format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter;
format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter;
format_settings.regexp.regexp = settings.format_regexp;
format_settings.regexp.escaping_rule = settings.format_regexp_escaping_rule;
format_settings.regexp.skip_unmatched = settings.format_regexp_skip_unmatched;
format_settings.skip_unknown_fields = settings.input_format_skip_unknown_fields;
format_settings.template_settings.resultset_format = settings.format_template_resultset;
format_settings.template_settings.row_between_delimiter = settings.format_template_rows_between_delimiter;
format_settings.template_settings.row_format = settings.format_template_row;
format_settings.tsv.crlf_end_of_line = settings.output_format_tsv_crlf_end_of_line;
format_settings.tsv.empty_as_default = settings.input_format_tsv_empty_as_default;
format_settings.tsv.input_format_enum_as_number = settings.input_format_tsv_enum_as_number;
format_settings.tsv.null_representation = settings.output_format_tsv_null_representation;
format_settings.values.accurate_types_of_literals = settings.input_format_values_accurate_types_of_literals;
format_settings.values.deduce_templates_of_expressions = settings.input_format_values_deduce_templates_of_expressions;
format_settings.values.interpret_expressions = settings.input_format_values_interpret_expressions;
format_settings.with_names_use_header = settings.input_format_with_names_use_header;
format_settings.write_statistics = settings.output_format_write_statistics;
/// Validate avro_schema_registry_url with RemoteHostFilter when non-empty and in Server context
if (context.hasGlobalContext() && (context.getGlobalContext().getApplicationType() == Context::ApplicationType::SERVER))
if (format_settings.schema.is_server)
{
const Poco::URI & avro_schema_registry_url = settings.format_avro_schema_registry_url;
if (!avro_schema_registry_url.empty())
context.getRemoteHostFilter().checkURL(avro_schema_registry_url);
}
format_settings.avro.schema_registry_url = settings.format_avro_schema_registry_url.toString();
format_settings.avro.allow_missing_fields = settings.input_format_avro_allow_missing_fields;
return format_settings;
}
static FormatSettings getOutputFormatSetting(const Settings & settings, const Context & context)
{
FormatSettings format_settings;
format_settings.enable_streaming = settings.output_format_enable_streaming;
format_settings.json.quote_64bit_integers = settings.output_format_json_quote_64bit_integers;
format_settings.json.quote_denormals = settings.output_format_json_quote_denormals;
format_settings.json.escape_forward_slashes = settings.output_format_json_escape_forward_slashes;
format_settings.csv.delimiter = settings.format_csv_delimiter;
format_settings.csv.allow_single_quotes = settings.format_csv_allow_single_quotes;
format_settings.csv.allow_double_quotes = settings.format_csv_allow_double_quotes;
format_settings.csv.crlf_end_of_line = settings.output_format_csv_crlf_end_of_line;
format_settings.pretty.max_rows = settings.output_format_pretty_max_rows;
format_settings.pretty.max_column_pad_width = settings.output_format_pretty_max_column_pad_width;
format_settings.pretty.max_value_width = settings.output_format_pretty_max_value_width;
format_settings.pretty.color = settings.output_format_pretty_color;
format_settings.pretty.charset = settings.output_format_pretty_grid_charset.toString() == "ASCII" ?
FormatSettings::Pretty::Charset::ASCII :
FormatSettings::Pretty::Charset::UTF8;
format_settings.pretty.output_format_pretty_row_numbers = settings.output_format_pretty_row_numbers;
format_settings.template_settings.resultset_format = settings.format_template_resultset;
format_settings.template_settings.row_format = settings.format_template_row;
format_settings.template_settings.row_between_delimiter = settings.format_template_rows_between_delimiter;
format_settings.tsv.crlf_end_of_line = settings.output_format_tsv_crlf_end_of_line;
format_settings.tsv.null_representation = settings.output_format_tsv_null_representation;
format_settings.write_statistics = settings.output_format_write_statistics;
format_settings.parquet.row_group_size = settings.output_format_parquet_row_group_size;
format_settings.schema.format_schema = settings.format_schema;
format_settings.schema.format_schema_path = context.getFormatSchemaPath();
format_settings.schema.is_server = context.hasGlobalContext() && (context.getGlobalContext().getApplicationType() == Context::ApplicationType::SERVER);
format_settings.custom.result_before_delimiter = settings.format_custom_result_before_delimiter;
format_settings.custom.result_after_delimiter = settings.format_custom_result_after_delimiter;
format_settings.custom.escaping_rule = settings.format_custom_escaping_rule;
format_settings.custom.field_delimiter = settings.format_custom_field_delimiter;
format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter;
format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter;
format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter;
format_settings.avro.output_codec = settings.output_format_avro_codec;
format_settings.avro.output_sync_interval = settings.output_format_avro_sync_interval;
format_settings.date_time_output_format = settings.date_time_output_format;
template
FormatSettings getFormatSettings<FormatFactorySettings>(const Context & context,
const FormatFactorySettings & settings);
return format_settings;
}
template
FormatSettings getFormatSettings<Settings>(const Context & context,
const Settings & settings);
BlockInputStreamPtr FormatFactory::getInput(
@ -142,21 +135,22 @@ BlockInputStreamPtr FormatFactory::getInput(
const Block & sample,
const Context & context,
UInt64 max_block_size,
ReadCallback callback) const
const std::optional<FormatSettings> & _format_settings) const
{
if (name == "Native")
return std::make_shared<NativeBlockInputStream>(buf, sample, 0);
auto format_settings = _format_settings
? *_format_settings : getFormatSettings(context);
if (!getCreators(name).input_processor_creator)
{
const auto & input_getter = getCreators(name).input_creator;
if (!input_getter)
throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
const Settings & settings = context.getSettingsRef();
FormatSettings format_settings = getInputFormatSetting(settings, context);
return input_getter(buf, sample, max_block_size, callback ? callback : ReadCallback(), format_settings);
return input_getter(buf, sample, max_block_size, {}, format_settings);
}
const Settings & settings = context.getSettingsRef();
@ -182,17 +176,16 @@ BlockInputStreamPtr FormatFactory::getInput(
if (!input_getter)
throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
FormatSettings format_settings = getInputFormatSetting(settings, context);
RowInputFormatParams row_input_format_params;
row_input_format_params.max_block_size = max_block_size;
row_input_format_params.allow_errors_num = format_settings.input_allow_errors_num;
row_input_format_params.allow_errors_ratio = format_settings.input_allow_errors_ratio;
row_input_format_params.callback = std::move(callback);
row_input_format_params.max_execution_time = settings.max_execution_time;
row_input_format_params.timeout_overflow_mode = settings.timeout_overflow_mode;
auto input_creator_params = ParallelParsingBlockInputStream::InputCreatorParams{sample, row_input_format_params, format_settings};
auto input_creator_params =
ParallelParsingBlockInputStream::InputCreatorParams{sample,
row_input_format_params, format_settings};
ParallelParsingBlockInputStream::Params params{buf, input_getter,
input_creator_params, file_segmentation_engine,
static_cast<int>(settings.max_threads),
@ -200,32 +193,37 @@ BlockInputStreamPtr FormatFactory::getInput(
return std::make_shared<ParallelParsingBlockInputStream>(params);
}
auto format = getInputFormat(name, buf, sample, context, max_block_size, std::move(callback));
auto format = getInputFormat(name, buf, sample, context, max_block_size,
format_settings);
return std::make_shared<InputStreamFromInputFormat>(std::move(format));
}
BlockOutputStreamPtr FormatFactory::getOutput(
const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback, const bool ignore_no_row_delimiter) const
BlockOutputStreamPtr FormatFactory::getOutput(const String & name,
WriteBuffer & buf, const Block & sample, const Context & context,
WriteCallback callback, const std::optional<FormatSettings> & _format_settings) const
{
auto format_settings = _format_settings
? *_format_settings : getFormatSettings(context);
if (!getCreators(name).output_processor_creator)
{
const auto & output_getter = getCreators(name).output_creator;
if (!output_getter)
throw Exception("Format " + name + " is not suitable for output", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);
const Settings & settings = context.getSettingsRef();
FormatSettings format_settings = getOutputFormatSetting(settings, context);
/** Materialization is needed, because formats can use the functions `IDataType`,
* which only work with full columns.
*/
return std::make_shared<MaterializingBlockOutputStream>(
output_getter(buf, sample, std::move(callback), format_settings), sample);
output_getter(buf, sample, std::move(callback), format_settings),
sample);
}
auto format = getOutputFormat(name, buf, sample, context, std::move(callback), ignore_no_row_delimiter);
return std::make_shared<MaterializingBlockOutputStream>(std::make_shared<OutputStreamToOutputFormat>(format), sample);
auto format = getOutputFormat(name, buf, sample, context, std::move(callback),
format_settings);
return std::make_shared<MaterializingBlockOutputStream>(
std::make_shared<OutputStreamToOutputFormat>(format), sample);
}
@ -235,25 +233,27 @@ InputFormatPtr FormatFactory::getInputFormat(
const Block & sample,
const Context & context,
UInt64 max_block_size,
ReadCallback callback) const
const std::optional<FormatSettings> & _format_settings) const
{
const auto & input_getter = getCreators(name).input_processor_creator;
if (!input_getter)
throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
const Settings & settings = context.getSettingsRef();
FormatSettings format_settings = getInputFormatSetting(settings, context);
auto format_settings = _format_settings
? *_format_settings : getFormatSettings(context);
RowInputFormatParams params;
params.max_block_size = max_block_size;
params.allow_errors_num = format_settings.input_allow_errors_num;
params.allow_errors_ratio = format_settings.input_allow_errors_ratio;
params.callback = std::move(callback);
params.max_execution_time = settings.max_execution_time;
params.timeout_overflow_mode = settings.timeout_overflow_mode;
auto format = input_getter(buf, sample, params, format_settings);
/// It's a kludge, because we cannot remove the context from the Values format.
if (auto * values = typeid_cast<ValuesBlockInputFormat *>(format.get()))
values->setContext(context);
@ -263,19 +263,20 @@ InputFormatPtr FormatFactory::getInputFormat(
OutputFormatPtr FormatFactory::getOutputFormat(
const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback, const bool ignore_no_row_delimiter) const
const String & name, WriteBuffer & buf, const Block & sample,
const Context & context, WriteCallback callback,
const std::optional<FormatSettings> & _format_settings) const
{
const auto & output_getter = getCreators(name).output_processor_creator;
if (!output_getter)
throw Exception("Format " + name + " is not suitable for output", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);
const Settings & settings = context.getSettingsRef();
FormatSettings format_settings = getOutputFormatSetting(settings, context);
RowOutputFormatParams params;
params.ignore_no_row_delimiter = ignore_no_row_delimiter;
params.callback = std::move(callback);
auto format_settings = _format_settings
? *_format_settings : getFormatSettings(context);
/** TODO: Materialization is needed, because formats can use the functions of `IDataType`,
* which only work with full columns.
*/
@ -333,150 +334,6 @@ void FormatFactory::registerFileSegmentationEngine(const String & name, FileSegm
target = std::move(file_segmentation_engine);
}
/// File Segmentation Engines for parallel reading
void registerFileSegmentationEngineTabSeparated(FormatFactory & factory);
void registerFileSegmentationEngineCSV(FormatFactory & factory);
void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory);
void registerFileSegmentationEngineRegexp(FormatFactory & factory);
void registerFileSegmentationEngineJSONAsString(FormatFactory & factory);
void registerFileSegmentationEngineLineAsString(FormatFactory & factory);
/// Formats for both input/output.
void registerInputFormatNative(FormatFactory & factory);
void registerOutputFormatNative(FormatFactory & factory);
void registerInputFormatProcessorNative(FormatFactory & factory);
void registerOutputFormatProcessorNative(FormatFactory & factory);
void registerInputFormatProcessorRowBinary(FormatFactory & factory);
void registerOutputFormatProcessorRowBinary(FormatFactory & factory);
void registerInputFormatProcessorTabSeparated(FormatFactory & factory);
void registerOutputFormatProcessorTabSeparated(FormatFactory & factory);
void registerInputFormatProcessorValues(FormatFactory & factory);
void registerOutputFormatProcessorValues(FormatFactory & factory);
void registerInputFormatProcessorCSV(FormatFactory & factory);
void registerOutputFormatProcessorCSV(FormatFactory & factory);
void registerInputFormatProcessorTSKV(FormatFactory & factory);
void registerOutputFormatProcessorTSKV(FormatFactory & factory);
void registerInputFormatProcessorJSONEachRow(FormatFactory & factory);
void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory);
void registerInputFormatProcessorJSONCompactEachRow(FormatFactory & factory);
void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory);
void registerInputFormatProcessorProtobuf(FormatFactory & factory);
void registerOutputFormatProcessorProtobuf(FormatFactory & factory);
void registerInputFormatProcessorTemplate(FormatFactory & factory);
void registerOutputFormatProcessorTemplate(FormatFactory & factory);
void registerInputFormatProcessorMsgPack(FormatFactory & factory);
void registerOutputFormatProcessorMsgPack(FormatFactory & factory);
void registerInputFormatProcessorORC(FormatFactory & factory);
void registerOutputFormatProcessorORC(FormatFactory & factory);
void registerInputFormatProcessorParquet(FormatFactory & factory);
void registerOutputFormatProcessorParquet(FormatFactory & factory);
void registerInputFormatProcessorArrow(FormatFactory & factory);
void registerOutputFormatProcessorArrow(FormatFactory & factory);
void registerInputFormatProcessorAvro(FormatFactory & factory);
void registerOutputFormatProcessorAvro(FormatFactory & factory);
void registerInputFormatProcessorRawBLOB(FormatFactory & factory);
void registerOutputFormatProcessorRawBLOB(FormatFactory & factory);
/// Output only (presentational) formats.
void registerOutputFormatNull(FormatFactory & factory);
void registerOutputFormatProcessorPretty(FormatFactory & factory);
void registerOutputFormatProcessorPrettyCompact(FormatFactory & factory);
void registerOutputFormatProcessorPrettySpace(FormatFactory & factory);
void registerOutputFormatProcessorVertical(FormatFactory & factory);
void registerOutputFormatProcessorJSON(FormatFactory & factory);
void registerOutputFormatProcessorJSONCompact(FormatFactory & factory);
void registerOutputFormatProcessorJSONEachRowWithProgress(FormatFactory & factory);
void registerOutputFormatProcessorXML(FormatFactory & factory);
void registerOutputFormatProcessorODBCDriver2(FormatFactory & factory);
void registerOutputFormatProcessorNull(FormatFactory & factory);
void registerOutputFormatProcessorMySQLWire(FormatFactory & factory);
void registerOutputFormatProcessorMarkdown(FormatFactory & factory);
void registerOutputFormatProcessorPostgreSQLWire(FormatFactory & factory);
/// Input only formats.
void registerInputFormatProcessorRegexp(FormatFactory & factory);
void registerInputFormatProcessorJSONAsString(FormatFactory & factory);
void registerInputFormatProcessorLineAsString(FormatFactory & factory);
void registerInputFormatProcessorCapnProto(FormatFactory & factory);
FormatFactory::FormatFactory()
{
registerFileSegmentationEngineTabSeparated(*this);
registerFileSegmentationEngineCSV(*this);
registerFileSegmentationEngineJSONEachRow(*this);
registerFileSegmentationEngineRegexp(*this);
registerFileSegmentationEngineJSONAsString(*this);
registerFileSegmentationEngineLineAsString(*this);
registerInputFormatNative(*this);
registerOutputFormatNative(*this);
registerInputFormatProcessorNative(*this);
registerOutputFormatProcessorNative(*this);
registerInputFormatProcessorRowBinary(*this);
registerOutputFormatProcessorRowBinary(*this);
registerInputFormatProcessorTabSeparated(*this);
registerOutputFormatProcessorTabSeparated(*this);
registerInputFormatProcessorValues(*this);
registerOutputFormatProcessorValues(*this);
registerInputFormatProcessorCSV(*this);
registerOutputFormatProcessorCSV(*this);
registerInputFormatProcessorTSKV(*this);
registerOutputFormatProcessorTSKV(*this);
registerInputFormatProcessorJSONEachRow(*this);
registerOutputFormatProcessorJSONEachRow(*this);
registerInputFormatProcessorJSONCompactEachRow(*this);
registerOutputFormatProcessorJSONCompactEachRow(*this);
registerInputFormatProcessorProtobuf(*this);
registerOutputFormatProcessorProtobuf(*this);
registerInputFormatProcessorTemplate(*this);
registerOutputFormatProcessorTemplate(*this);
registerInputFormatProcessorMsgPack(*this);
registerOutputFormatProcessorMsgPack(*this);
registerInputFormatProcessorRawBLOB(*this);
registerOutputFormatProcessorRawBLOB(*this);
#if !defined(ARCADIA_BUILD)
registerInputFormatProcessorORC(*this);
registerOutputFormatProcessorORC(*this);
registerInputFormatProcessorParquet(*this);
registerOutputFormatProcessorParquet(*this);
registerInputFormatProcessorArrow(*this);
registerOutputFormatProcessorArrow(*this);
registerInputFormatProcessorAvro(*this);
registerOutputFormatProcessorAvro(*this);
#endif
registerOutputFormatNull(*this);
registerOutputFormatProcessorPretty(*this);
registerOutputFormatProcessorPrettyCompact(*this);
registerOutputFormatProcessorPrettySpace(*this);
registerOutputFormatProcessorVertical(*this);
registerOutputFormatProcessorJSON(*this);
registerOutputFormatProcessorJSONCompact(*this);
registerOutputFormatProcessorJSONEachRowWithProgress(*this);
registerOutputFormatProcessorXML(*this);
registerOutputFormatProcessorODBCDriver2(*this);
registerOutputFormatProcessorNull(*this);
registerOutputFormatProcessorMySQLWire(*this);
registerOutputFormatProcessorMarkdown(*this);
registerOutputFormatProcessorPostgreSQLWire(*this);
registerInputFormatProcessorRegexp(*this);
registerInputFormatProcessorJSONAsString(*this);
registerInputFormatProcessorLineAsString(*this);
#if !defined(ARCADIA_BUILD)
registerInputFormatProcessorCapnProto(*this);
#endif
}
FormatFactory & FormatFactory::instance()
{

View File

@ -3,6 +3,7 @@
#include <common/types.h>
#include <Columns/IColumn.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Formats/FormatSettings.h>
#include <IO/BufferWithOwnMemory.h>
#include <functional>
@ -16,6 +17,8 @@ namespace DB
class Block;
class Context;
struct FormatSettings;
struct Settings;
struct FormatFactorySettings;
class ReadBuffer;
class WriteBuffer;
@ -32,6 +35,11 @@ struct RowOutputFormatParams;
using InputFormatPtr = std::shared_ptr<IInputFormat>;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
FormatSettings getFormatSettings(const Context & context);
template <typename T>
FormatSettings getFormatSettings(const Context & context,
const T & settings);
/** Allows to create an IBlockInputStream or IBlockOutputStream by the name of the format.
* Note: format and compression are independent things.
@ -96,7 +104,6 @@ private:
using FormatsDictionary = std::unordered_map<String, Creators>;
public:
static FormatFactory & instance();
BlockInputStreamPtr getInput(
@ -105,10 +112,11 @@ public:
const Block & sample,
const Context & context,
UInt64 max_block_size,
ReadCallback callback = {}) const;
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf,
const Block & sample, const Context & context, WriteCallback callback = {}, const bool ignore_no_row_delimiter = false) const;
const Block & sample, const Context & context, WriteCallback callback = {},
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
InputFormatPtr getInputFormat(
const String & name,
@ -116,10 +124,12 @@ public:
const Block & sample,
const Context & context,
UInt64 max_block_size,
ReadCallback callback = {}) const;
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
OutputFormatPtr getOutputFormat(
const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback = {}, const bool ignore_no_row_delimiter = false) const;
const String & name, WriteBuffer & buf, const Block & sample,
const Context & context, WriteCallback callback = {},
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
/// Register format by its name.
void registerInputFormat(const String & name, InputCreator input_creator);
@ -137,8 +147,6 @@ public:
private:
FormatsDictionary dict;
FormatFactory();
const Creators & getCreators(const String & name) const;
};

View File

@ -6,10 +6,16 @@
namespace DB
{
/** Various tweaks for input/output formats.
* Text serialization/deserialization of data types also depend on some of these settings.
* NOTE Parameters for unrelated formats and unrelated data types
* are collected in this struct - it prevents modularity, but they are difficult to separate.
/**
* Various tweaks for input/output formats. Text serialization/deserialization
* of data types also depend on some of these settings. It is different from
* FormatFactorySettings in that it has all necessary user-provided settings
* combined with information from context etc, that we can use directly during
* serialization. In contrast, FormatFactorySettings' job is to reflect the
* changes made to user-visible format settings, such as when tweaking
* the format for the File engine.
* NOTE Parameters for unrelated formats and unrelated data types are collected
* in this struct - it prevents modularity, but they are difficult to separate.
*/
struct FormatSettings
{
@ -17,76 +23,6 @@ struct FormatSettings
/// Option means that each chunk of data needs to be formatted independently. Also each chunk will be flushed at the end of processing.
bool enable_streaming = false;
struct JSON
{
bool quote_64bit_integers = true;
bool quote_denormals = true;
bool escape_forward_slashes = true;
};
JSON json;
struct CSV
{
char delimiter = ',';
bool allow_single_quotes = true;
bool allow_double_quotes = true;
bool unquoted_null_literal_as_null = false;
bool empty_as_default = false;
bool crlf_end_of_line = false;
bool input_format_enum_as_number = false;
};
CSV csv;
struct Pretty
{
UInt64 max_rows = 10000;
UInt64 max_column_pad_width = 250;
UInt64 max_value_width = 10000;
bool color = true;
bool output_format_pretty_row_numbers = false;
enum class Charset
{
UTF8,
ASCII,
};
Charset charset = Charset::UTF8;
};
Pretty pretty;
struct Values
{
bool interpret_expressions = true;
bool deduce_templates_of_expressions = true;
bool accurate_types_of_literals = true;
};
Values values;
struct Template
{
String resultset_format;
String row_format;
String row_between_delimiter;
};
Template template_settings;
struct TSV
{
bool empty_as_default = false;
bool crlf_end_of_line = false;
String null_representation = "\\N";
bool input_format_enum_as_number = false;
};
TSV tsv;
bool skip_unknown_fields = false;
bool with_names_use_header = false;
bool write_statistics = true;
@ -113,24 +49,29 @@ struct FormatSettings
UInt64 input_allow_errors_num = 0;
Float32 input_allow_errors_ratio = 0;
struct Arrow
struct
{
UInt64 row_group_size = 1000000;
} arrow;
struct Parquet
struct
{
UInt64 row_group_size = 1000000;
} parquet;
String schema_registry_url;
String output_codec;
UInt64 output_sync_interval = 16 * 1024;
bool allow_missing_fields = false;
} avro;
struct Schema
struct CSV
{
std::string format_schema;
std::string format_schema_path;
bool is_server = false;
};
Schema schema;
char delimiter = ',';
bool allow_single_quotes = true;
bool allow_double_quotes = true;
bool unquoted_null_literal_as_null = false;
bool empty_as_default = false;
bool crlf_end_of_line = false;
bool input_format_enum_as_number = false;
} csv;
struct Custom
{
@ -141,29 +82,87 @@ struct FormatSettings
std::string row_between_delimiter;
std::string field_delimiter;
std::string escaping_rule;
};
} custom;
Custom custom;
struct Avro
struct
{
String schema_registry_url;
String output_codec;
UInt64 output_sync_interval = 16 * 1024;
bool allow_missing_fields = false;
};
bool quote_64bit_integers = true;
bool quote_denormals = true;
bool escape_forward_slashes = true;
bool serialize_as_strings = false;
} json;
Avro avro;
struct
{
UInt64 row_group_size = 1000000;
} parquet;
struct Regexp
struct Pretty
{
UInt64 max_rows = 10000;
UInt64 max_column_pad_width = 250;
UInt64 max_value_width = 10000;
bool color = true;
bool output_format_pretty_row_numbers = false;
enum class Charset
{
UTF8,
ASCII,
};
Charset charset = Charset::UTF8;
} pretty;
struct
{
bool write_row_delimiters = true;
/**
* Some buffers (kafka / rabbit) split the rows internally using a callback,
* and always send one row per message, so we can push formats without
* framing / delimiters there (like ProtobufSingle). In other cases,
* we have to enforce exporting at most one row in the format output,
* because Protobuf without delimiters is not generally useful.
*/
bool allow_many_rows_no_delimiters = false;
} protobuf;
struct
{
std::string regexp;
std::string escaping_rule;
bool skip_unmatched = false;
};
} regexp;
Regexp regexp;
struct
{
std::string format_schema;
std::string format_schema_path;
bool is_server = false;
} schema;
struct
{
String resultset_format;
String row_format;
String row_between_delimiter;
} template_settings;
struct
{
bool empty_as_default = false;
bool crlf_end_of_line = false;
String null_representation = "\\N";
bool input_format_enum_as_number = false;
} tsv;
struct
{
bool interpret_expressions = true;
bool deduce_templates_of_expressions = true;
bool accurate_types_of_literals = true;
} values;
};
}
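With the sub-structs regrouped per format (and made anonymous), a single `FormatSettings` value carries every format's knobs, addressed by format name. A hedged usage sketch showing only two of the fields from the struct above:

```cpp
#include <iostream>

struct FormatSettings
{
    struct { char delimiter = ','; bool crlf_end_of_line = false; } csv;
    struct { bool quote_64bit_integers = true; } json;
};

int main()
{
    FormatSettings settings;
    settings.csv.delimiter = ';';   // tweak one format without touching the others
    std::cout << "CSV delimiter: " << settings.csv.delimiter << '\n'
              << "JSON quotes 64-bit ints: " << settings.json.quote_64bit_integers << '\n';
}
```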

View File

@ -0,0 +1,160 @@
#if !defined(ARCADIA_BUILD)
# include <Common/config.h>
#endif
#include <Formats/FormatFactory.h>
namespace DB
{
/// File Segmentation Engines for parallel reading
void registerFileSegmentationEngineTabSeparated(FormatFactory & factory);
void registerFileSegmentationEngineCSV(FormatFactory & factory);
void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory);
void registerFileSegmentationEngineRegexp(FormatFactory & factory);
void registerFileSegmentationEngineJSONAsString(FormatFactory & factory);
void registerFileSegmentationEngineLineAsString(FormatFactory & factory);
/// Formats for both input/output.
void registerInputFormatNative(FormatFactory & factory);
void registerOutputFormatNative(FormatFactory & factory);
void registerInputFormatProcessorNative(FormatFactory & factory);
void registerOutputFormatProcessorNative(FormatFactory & factory);
void registerInputFormatProcessorRowBinary(FormatFactory & factory);
void registerOutputFormatProcessorRowBinary(FormatFactory & factory);
void registerInputFormatProcessorTabSeparated(FormatFactory & factory);
void registerOutputFormatProcessorTabSeparated(FormatFactory & factory);
void registerInputFormatProcessorValues(FormatFactory & factory);
void registerOutputFormatProcessorValues(FormatFactory & factory);
void registerInputFormatProcessorCSV(FormatFactory & factory);
void registerOutputFormatProcessorCSV(FormatFactory & factory);
void registerInputFormatProcessorTSKV(FormatFactory & factory);
void registerOutputFormatProcessorTSKV(FormatFactory & factory);
void registerInputFormatProcessorJSONEachRow(FormatFactory & factory);
void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory);
void registerInputFormatProcessorJSONCompactEachRow(FormatFactory & factory);
void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory);
void registerInputFormatProcessorProtobuf(FormatFactory & factory);
void registerOutputFormatProcessorProtobuf(FormatFactory & factory);
void registerInputFormatProcessorTemplate(FormatFactory & factory);
void registerOutputFormatProcessorTemplate(FormatFactory & factory);
void registerInputFormatProcessorMsgPack(FormatFactory & factory);
void registerOutputFormatProcessorMsgPack(FormatFactory & factory);
void registerInputFormatProcessorORC(FormatFactory & factory);
void registerOutputFormatProcessorORC(FormatFactory & factory);
void registerInputFormatProcessorParquet(FormatFactory & factory);
void registerOutputFormatProcessorParquet(FormatFactory & factory);
void registerInputFormatProcessorArrow(FormatFactory & factory);
void registerOutputFormatProcessorArrow(FormatFactory & factory);
void registerInputFormatProcessorAvro(FormatFactory & factory);
void registerOutputFormatProcessorAvro(FormatFactory & factory);
void registerInputFormatProcessorRawBLOB(FormatFactory & factory);
void registerOutputFormatProcessorRawBLOB(FormatFactory & factory);
/// Output only (presentational) formats.
void registerOutputFormatNull(FormatFactory & factory);
void registerOutputFormatProcessorPretty(FormatFactory & factory);
void registerOutputFormatProcessorPrettyCompact(FormatFactory & factory);
void registerOutputFormatProcessorPrettySpace(FormatFactory & factory);
void registerOutputFormatProcessorVertical(FormatFactory & factory);
void registerOutputFormatProcessorJSON(FormatFactory & factory);
void registerOutputFormatProcessorJSONCompact(FormatFactory & factory);
void registerOutputFormatProcessorJSONEachRowWithProgress(FormatFactory & factory);
void registerOutputFormatProcessorXML(FormatFactory & factory);
void registerOutputFormatProcessorODBCDriver2(FormatFactory & factory);
void registerOutputFormatProcessorNull(FormatFactory & factory);
void registerOutputFormatProcessorMySQLWire(FormatFactory & factory);
void registerOutputFormatProcessorMarkdown(FormatFactory & factory);
void registerOutputFormatProcessorPostgreSQLWire(FormatFactory & factory);
/// Input only formats.
void registerInputFormatProcessorRegexp(FormatFactory & factory);
void registerInputFormatProcessorJSONAsString(FormatFactory & factory);
void registerInputFormatProcessorLineAsString(FormatFactory & factory);
void registerInputFormatProcessorCapnProto(FormatFactory & factory);
void registerFormats()
{
auto & factory = FormatFactory::instance();
registerFileSegmentationEngineTabSeparated(factory);
registerFileSegmentationEngineCSV(factory);
registerFileSegmentationEngineJSONEachRow(factory);
registerFileSegmentationEngineRegexp(factory);
registerFileSegmentationEngineJSONAsString(factory);
registerFileSegmentationEngineLineAsString(factory);
registerInputFormatNative(factory);
registerOutputFormatNative(factory);
registerInputFormatProcessorNative(factory);
registerOutputFormatProcessorNative(factory);
registerInputFormatProcessorRowBinary(factory);
registerOutputFormatProcessorRowBinary(factory);
registerInputFormatProcessorTabSeparated(factory);
registerOutputFormatProcessorTabSeparated(factory);
registerInputFormatProcessorValues(factory);
registerOutputFormatProcessorValues(factory);
registerInputFormatProcessorCSV(factory);
registerOutputFormatProcessorCSV(factory);
registerInputFormatProcessorTSKV(factory);
registerOutputFormatProcessorTSKV(factory);
registerInputFormatProcessorJSONEachRow(factory);
registerOutputFormatProcessorJSONEachRow(factory);
registerInputFormatProcessorJSONCompactEachRow(factory);
registerOutputFormatProcessorJSONCompactEachRow(factory);
registerInputFormatProcessorProtobuf(factory);
registerOutputFormatProcessorProtobuf(factory);
registerInputFormatProcessorTemplate(factory);
registerOutputFormatProcessorTemplate(factory);
registerInputFormatProcessorMsgPack(factory);
registerOutputFormatProcessorMsgPack(factory);
registerInputFormatProcessorRawBLOB(factory);
registerOutputFormatProcessorRawBLOB(factory);
#if !defined(ARCADIA_BUILD)
registerInputFormatProcessorORC(factory);
registerOutputFormatProcessorORC(factory);
registerInputFormatProcessorParquet(factory);
registerOutputFormatProcessorParquet(factory);
registerInputFormatProcessorArrow(factory);
registerOutputFormatProcessorArrow(factory);
registerInputFormatProcessorAvro(factory);
registerOutputFormatProcessorAvro(factory);
#endif
registerOutputFormatNull(factory);
registerOutputFormatProcessorPretty(factory);
registerOutputFormatProcessorPrettyCompact(factory);
registerOutputFormatProcessorPrettySpace(factory);
registerOutputFormatProcessorVertical(factory);
registerOutputFormatProcessorJSON(factory);
registerOutputFormatProcessorJSONCompact(factory);
registerOutputFormatProcessorJSONEachRowWithProgress(factory);
registerOutputFormatProcessorXML(factory);
registerOutputFormatProcessorODBCDriver2(factory);
registerOutputFormatProcessorNull(factory);
registerOutputFormatProcessorMySQLWire(factory);
registerOutputFormatProcessorMarkdown(factory);
registerOutputFormatProcessorPostgreSQLWire(factory);
registerInputFormatProcessorRegexp(factory);
registerInputFormatProcessorJSONAsString(factory);
registerInputFormatProcessorLineAsString(factory);
#if !defined(ARCADIA_BUILD)
registerInputFormatProcessorCapnProto(factory);
#endif
}
}

View File

@ -0,0 +1,9 @@
#pragma once
namespace DB
{
void registerFormats();
}
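The registration calls move out of the `FormatFactory` constructor into a free `registerFormats()` function that callers invoke once at startup. A sketch of the pattern, with simplified stand-ins for the factory API:

```cpp
#include <functional>
#include <string>
#include <unordered_map>

struct Factory   // simplified stand-in for FormatFactory
{
    std::unordered_map<std::string, std::function<void()>> creators;
    static Factory & instance() { static Factory f; return f; }
};

// One registration function per format, as in registerFormats.cpp above.
void registerInputFormatCSV(Factory & factory)
{
    factory.creators["CSV"] = [] { /* construct the CSV input format */ };
}

void registerFormats()
{
    auto & factory = Factory::instance();
    registerInputFormatCSV(factory);
    // ... the remaining register*() calls
}

int main()
{
    registerFormats();   // called explicitly, e.g. early in the entry point
}
```

One plausible motivation for the split is keeping the singleton constructor trivial, so that binaries which never touch formats do not pay for registering all of them.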

View File

@ -38,8 +38,8 @@ try
FormatSettings format_settings;
RowInputFormatParams in_params{DEFAULT_INSERT_BLOCK_SIZE, 0, 0, []{}};
RowOutputFormatParams out_params{[](const Columns & /* columns */, size_t /* row */){},false};
RowInputFormatParams in_params{DEFAULT_INSERT_BLOCK_SIZE, 0, 0};
RowOutputFormatParams out_params{[](const Columns & /* columns */, size_t /* row */){}};
InputFormatPtr input_format = std::make_shared<TabSeparatedRowInputFormat>(sample, in_buf, in_params, false, false, format_settings);
BlockInputStreamPtr block_input = std::make_shared<InputStreamFromInputFormat>(std::move(input_format));

View File

@ -22,6 +22,7 @@ SRCS(
ProtobufReader.cpp
ProtobufSchemas.cpp
ProtobufWriter.cpp
registerFormats.cpp
verbosePrintString.cpp
)

View File

@ -12,7 +12,7 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromOStream.h>
#include <IO/WriteBufferFromString.h>
#define STATS_ENABLE_STDVEC_WRAPPERS
#include <stats.hpp>
@ -139,31 +139,29 @@ Variants bayesian_ab_test(String distribution, PODArray<Float64> & xs, PODArray<
String convertToJson(const PODArray<String> & variant_names, const Variants & variants)
{
FormatSettings settings;
std::stringstream s;
WriteBufferFromOwnString buf;
writeCString("{\"data\":[", buf);
for (size_t i = 0; i < variants.size(); ++i)
{
WriteBufferFromOStream buf(s);
writeCString("{\"data\":[", buf);
for (size_t i = 0; i < variants.size(); ++i)
{
writeCString("{\"variant_name\":", buf);
writeJSONString(variant_names[i], buf, settings);
writeCString(",\"x\":", buf);
writeText(variants[i].x, buf);
writeCString(",\"y\":", buf);
writeText(variants[i].y, buf);
writeCString(",\"beats_control\":", buf);
writeText(variants[i].beats_control, buf);
writeCString(",\"to_be_best\":", buf);
writeText(variants[i].best, buf);
writeCString("}", buf);
if (i != variant_names.size() -1) writeCString(",", buf);
}
writeCString("]}", buf);
writeCString("{\"variant_name\":", buf);
writeJSONString(variant_names[i], buf, settings);
writeCString(",\"x\":", buf);
writeText(variants[i].x, buf);
writeCString(",\"y\":", buf);
writeText(variants[i].y, buf);
writeCString(",\"beats_control\":", buf);
writeText(variants[i].beats_control, buf);
writeCString(",\"to_be_best\":", buf);
writeText(variants[i].best, buf);
writeCString("}", buf);
if (i != variant_names.size() -1)
writeCString(",", buf);
}
writeCString("]}", buf);
return s.str();
return buf.str();
}
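The rewrite above drops the `std::stringstream` / `WriteBufferFromOStream` pair for a single `WriteBufferFromOwnString` whose `str()` yields the result directly. A simplified sketch of the idea; the buffer class here is a stand-in, not the real `DB::WriteBufferFromOwnString`:

```cpp
#include <cstddef>
#include <cstring>
#include <string>

struct WriteBufferFromOwnString   // stand-in for DB::WriteBufferFromOwnString
{
    std::string data;
    void write(const char * s, std::size_t n) { data.append(s, n); }
    const std::string & str() const { return data; }
};

void writeCString(const char * s, WriteBufferFromOwnString & buf)
{
    buf.write(s, std::strlen(s));
}

std::string buildJson()
{
    WriteBufferFromOwnString buf;
    writeCString("{\"data\":[", buf);
    // ... per-variant fields would be written here ...
    writeCString("]}", buf);
    return buf.str();   // no intermediate ostream, no extra copy out of a stringstream
}
```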
class FunctionBayesAB : public IFunction

View File

@ -10,39 +10,44 @@ Variants test_bayesab(std::string dist, PODArray<Float64> xs, PODArray<Float64>
{
Variants variants;
std::cout << std::fixed;
//std::cout << std::fixed;
if (dist == "beta")
{
std::cout << dist << "\nclicks: ";
for (auto x : xs) std::cout << x << " ";
/* std::cout << dist << "\nclicks: ";
for (auto x : xs)
std::cout << x << " ";
std::cout <<"\tconversions: ";
for (auto y : ys) std::cout << y << " ";
for (auto y : ys)
std::cout << y << " ";
std::cout << "\n";
std::cout << "\n";*/
variants = bayesian_ab_test<true>(dist, xs, ys);
}
else if (dist == "gamma")
{
std::cout << dist << "\nclicks: ";
for (auto x : xs) std::cout << x << " ";
/* std::cout << dist << "\nclicks: ";
for (auto x : xs)
std::cout << x << " ";
std::cout <<"\tcost: ";
for (auto y : ys) std::cout << y << " ";
for (auto y : ys)
std::cout << y << " ";
std::cout << "\n";*/
std::cout << "\n";
variants = bayesian_ab_test<true>(dist, xs, ys);
}
for (size_t i = 0; i < variants.size(); ++i)
/* for (size_t i = 0; i < variants.size(); ++i)
std::cout << i << " beats 0: " << variants[i].beats_control << std::endl;
for (size_t i = 0; i < variants.size(); ++i)
std::cout << i << " to be best: " << variants[i].best << std::endl;
std::cout << convertToJson({"0", "1", "2"}, variants) << std::endl;
*/
Float64 max_val = 0.0, min_val = 2.0;
for (size_t i = 0; i < variants.size(); ++i)
{

View File

@ -240,6 +240,7 @@ void assertResponseIsOk(const Poco::Net::HTTPRequest & request, Poco::Net::HTTPR
if (!(status == Poco::Net::HTTPResponse::HTTP_OK || (isRedirect(status) && allow_redirects)))
{
std::stringstream error_message;
error_message.exceptions(std::ios::failbit);
error_message << "Received error from remote server " << request.getURI() << ". HTTP status code: " << status << " "
<< response.getReason() << ", body: " << istr.rdbuf();
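This diff adds `exceptions(std::ios::failbit)` to nearly every `std::stringstream` it touches; with failbit exceptions enabled, a failed stream operation throws `std::ios_base::failure` instead of silently leaving the stream in a failed state. A minimal standalone illustration:

```cpp
#include <iostream>
#include <sstream>

int main()
{
    std::stringstream ss;
    ss.exceptions(std::ios::failbit);   // the line this diff adds everywhere

    try
    {
        int value = 0;
        ss << "not-a-number";
        ss >> value;                    // extraction fails -> failbit -> throws
    }
    catch (const std::ios_base::failure & e)
    {
        std::cout << "stream failure surfaced as an exception: " << e.what() << '\n';
    }
}
```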

View File

@ -27,20 +27,14 @@ bool MySQLPacketPayloadReadBuffer::nextImpl()
in.readStrict(reinterpret_cast<char *>(&payload_length), 3);
if (payload_length > MAX_PACKET_LENGTH)
{
std::ostringstream tmp;
tmp << "Received packet with payload larger than max_packet_size: " << payload_length;
throw Exception(tmp.str(), ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT);
}
throw Exception(ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT,
"Received packet with payload larger than max_packet_size: {}", payload_length);
size_t packet_sequence_id = 0;
in.read(reinterpret_cast<char &>(packet_sequence_id));
if (packet_sequence_id != sequence_id)
{
std::ostringstream tmp;
tmp << "Received packet with wrong sequence-id: " << packet_sequence_id << ". Expected: " << static_cast<unsigned int>(sequence_id) << '.';
throw Exception(tmp.str(), ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT);
}
throw Exception(ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT,
"Received packet with wrong sequence-id: {}. Expected: {}.", packet_sequence_id, static_cast<unsigned int>(sequence_id));
sequence_id++;
if (payload_length == 0)

View File

@ -72,10 +72,7 @@ public:
}
else
{
std::stringstream error_message;
error_message << "Too many redirects while trying to access " << initial_uri.toString();
throw Exception(error_message.str(), ErrorCodes::TOO_MANY_REDIRECTS);
throw Exception(ErrorCodes::TOO_MANY_REDIRECTS, "Too many redirects while trying to access {}", initial_uri.toString());
}
}
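Both hunks above replace hand-built `ostringstream` messages with the variadic `Exception` constructor, which takes a fmt-style pattern plus arguments. A simplified stand-in follows; the real `DB::Exception` forwards to the bundled fmt library, and the error-code value here is hypothetical:

```cpp
#include <stdexcept>
#include <string>
#include <utility>

#include <fmt/core.h>   // assumed available, as in the ClickHouse source tree

struct Exception : std::runtime_error   // simplified stand-in for DB::Exception
{
    template <typename... Args>
    Exception(int /*code*/, const char * pattern, Args &&... args)
        // fmt::runtime is needed for fmt >= 8; older fmt accepts the pattern directly.
        : std::runtime_error(fmt::format(fmt::runtime(pattern), std::forward<Args>(args)...))
    {
    }
};

void checkRedirects(std::size_t redirects, std::size_t limit, const std::string & uri)
{
    constexpr int TOO_MANY_REDIRECTS = 0;   // hypothetical error-code value
    if (redirects > limit)
        throw Exception(TOO_MANY_REDIRECTS,
            "Too many redirects while trying to access {}", uri);
}
```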

View File

@ -248,6 +248,7 @@ void PocoHTTPClient::makeRequestInternal(
response->SetContentType(poco_response.getContentType());
std::stringstream headers_ss;
headers_ss.exceptions(std::ios::failbit);
for (const auto & [header_name, header_value] : poco_response)
{
response->AddHeader(header_name, header_value);

View File

@ -77,6 +77,7 @@ std::string dumpContents(const T& container,
{
std::stringstream sstr;
sstr.exceptions(std::ios::failbit);
dumpBuffer(std::begin(container), std::end(container), &sstr, col_sep, row_sep, cols_in_row);
return sstr.str();

View File

@ -23,6 +23,7 @@ static void test(size_t data_size)
{
std::cout << "block size " << read_buffer_block_size << std::endl;
std::stringstream io;
io.exceptions(std::ios::failbit);
DB::WriteBufferFromOStream out_impl(io);
DB::HashingWriteBuffer out(out_impl);
out.write(data, data_size);

View File

@ -21,6 +21,7 @@ try
using namespace DB;
std::stringstream s;
s.exceptions(std::ios::failbit);
{
std::string src = "1";

View File

@ -17,6 +17,7 @@ int main(int, char **)
DB::String d = "'xyz\\";
std::stringstream s;
s.exceptions(std::ios::failbit);
{
DB::WriteBufferFromOStream out(s);

View File

@ -46,6 +46,14 @@ inline bool isLocalImpl(const Cluster::Address & address, const Poco::Net::Socke
return address.default_database.empty() && isLocalAddress(resolved_address, clickhouse_port);
}
void concatInsertPath(std::string & insert_path, const std::string & dir_name)
{
if (insert_path.empty())
insert_path = dir_name;
else
insert_path += "," + dir_name;
}
}
/// Implementation of Cluster::Address class
@ -358,9 +366,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
bool internal_replication = config.getBool(partial_prefix + ".internal_replication", false);
/// In case of internal_replication we will be appending names to dir_name_for_internal_replication
std::string dir_name_for_internal_replication;
std::string dir_name_for_internal_replication_with_local;
ShardInfoInsertPathForInternalReplication insert_paths;
for (const auto & replica_key : replica_keys)
{
@ -379,18 +385,20 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
if (internal_replication)
{
auto dir_name = replica_addresses.back().toFullString(settings.use_compact_format_in_distributed_parts_names);
if (!replica_addresses.back().is_local)
/// use_compact_format=0
{
if (dir_name_for_internal_replication.empty())
dir_name_for_internal_replication = dir_name;
else
dir_name_for_internal_replication += "," + dir_name;
auto dir_name = replica_addresses.back().toFullString(false /* use_compact_format */);
if (!replica_addresses.back().is_local)
concatInsertPath(insert_paths.prefer_localhost_replica, dir_name);
concatInsertPath(insert_paths.no_prefer_localhost_replica, dir_name);
}
/// use_compact_format=1
{
auto dir_name = replica_addresses.back().toFullString(true /* use_compact_format */);
if (!replica_addresses.back().is_local)
concatInsertPath(insert_paths.prefer_localhost_replica_compact, dir_name);
concatInsertPath(insert_paths.no_prefer_localhost_replica_compact, dir_name);
}
if (dir_name_for_internal_replication_with_local.empty())
dir_name_for_internal_replication_with_local = dir_name;
else
dir_name_for_internal_replication_with_local += "," + dir_name;
}
}
else
@ -425,8 +433,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size());
shards_info.push_back({
std::move(dir_name_for_internal_replication),
std::move(dir_name_for_internal_replication_with_local),
std::move(insert_paths),
current_shard_num,
weight,
std::move(shard_local_addresses),
@ -485,8 +492,7 @@ Cluster::Cluster(const Settings & settings, const std::vector<std::vector<String
slot_to_shard.insert(std::end(slot_to_shard), default_weight, shards_info.size());
shards_info.push_back({
{}, // dir_name_for_internal_replication
{}, // dir_name_for_internal_replication_with_local
{}, // insert_path_for_internal_replication
current_shard_num,
default_weight,
std::move(shard_local_addresses),
@ -609,22 +615,25 @@ Cluster::Cluster(Cluster::SubclusterTag, const Cluster & from, const std::vector
initMisc();
}
const std::string & Cluster::ShardInfo::pathForInsert(bool prefer_localhost_replica) const
const std::string & Cluster::ShardInfo::insertPathForInternalReplication(bool prefer_localhost_replica, bool use_compact_format) const
{
if (!has_internal_replication)
throw Exception("internal_replication is not set", ErrorCodes::LOGICAL_ERROR);
if (prefer_localhost_replica)
const auto & paths = insert_path_for_internal_replication;
if (!use_compact_format)
{
if (dir_name_for_internal_replication.empty())
throw Exception("Directory name for async inserts is empty", ErrorCodes::LOGICAL_ERROR);
return dir_name_for_internal_replication;
if (prefer_localhost_replica)
return paths.prefer_localhost_replica;
else
return paths.no_prefer_localhost_replica;
}
else
{
if (dir_name_for_internal_replication_with_local.empty())
throw Exception("Directory name for async inserts is empty", ErrorCodes::LOGICAL_ERROR);
return dir_name_for_internal_replication_with_local;
if (prefer_localhost_replica)
return paths.prefer_localhost_replica_compact;
else
return paths.no_prefer_localhost_replica_compact;
}
}
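The four cached directory strings replace recomputing the path on every insert; selection becomes a pure two-boolean lookup. A sketch mirroring the structure and the branch above (the free function name `selectInsertPath` is illustrative, not from the source):

```cpp
#include <string>

struct ShardInfoInsertPathForInternalReplication
{
    std::string prefer_localhost_replica;             // prefer=1, compact=0
    std::string no_prefer_localhost_replica;          // prefer=0, compact=0
    std::string prefer_localhost_replica_compact;     // prefer=1, compact=1
    std::string no_prefer_localhost_replica_compact;  // prefer=0, compact=1
};

const std::string & selectInsertPath(
    const ShardInfoInsertPathForInternalReplication & paths,
    bool prefer_localhost_replica,
    bool use_compact_format)
{
    if (!use_compact_format)
        return prefer_localhost_replica ? paths.prefer_localhost_replica
                                        : paths.no_prefer_localhost_replica;
    return prefer_localhost_replica ? paths.prefer_localhost_replica_compact
                                    : paths.no_prefer_localhost_replica_compact;
}
```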

View File

@ -133,6 +133,27 @@ public:
using Addresses = std::vector<Address>;
using AddressesWithFailover = std::vector<Addresses>;
/// Name of directory for asynchronous write to StorageDistributed if has_internal_replication
///
/// Contains different paths for permutations of:
/// - prefer_localhost_replica
///   Note: with prefer_localhost_replica==0 the path will contain local nodes.
/// - use_compact_format_in_distributed_parts_names
/// See toFullString()
///
/// This is cached to avoid looping over replicas in insertPathForInternalReplication().
struct ShardInfoInsertPathForInternalReplication
{
/// prefer_localhost_replica == 1 && use_compact_format_in_distributed_parts_names=0
std::string prefer_localhost_replica;
/// prefer_localhost_replica == 0 && use_compact_format_in_distributed_parts_names=0
std::string no_prefer_localhost_replica;
/// prefer_localhost_replica == 1 && use_compact_format_in_distributed_parts_names=1
std::string prefer_localhost_replica_compact;
/// prefer_localhost_replica == 0 && use_compact_format_in_distributed_parts_names=1
std::string no_prefer_localhost_replica_compact;
};
struct ShardInfo
{
public:
@ -141,13 +162,10 @@ public:
size_t getLocalNodeCount() const { return local_addresses.size(); }
bool hasInternalReplication() const { return has_internal_replication; }
/// Name of directory for asynchronous write to StorageDistributed if has_internal_replication
const std::string & pathForInsert(bool prefer_localhost_replica) const;
const std::string & insertPathForInternalReplication(bool prefer_localhost_replica, bool use_compact_format) const;
public:
/// Name of directory for asynchronous write to StorageDistributed if has_internal_replication && prefer_localhost_replica
std::string dir_name_for_internal_replication;
/// Name of directory for asynchronous write to StorageDistributed if has_internal_replication && !prefer_localhost_replica
std::string dir_name_for_internal_replication_with_local;
ShardInfoInsertPathForInternalReplication insert_path_for_internal_replication;
/// Number of the shard, the indexation begins with 1
UInt32 shard_num = 0;
UInt32 weight = 1;

View File

@ -107,6 +107,7 @@ String formattedAST(const ASTPtr & ast)
if (!ast)
return {};
std::stringstream ss;
ss.exceptions(std::ios::failbit);
formatAST(*ast, ss, false, true);
return ss.str();
}

View File

@ -4,9 +4,10 @@
#include <Interpreters/Context.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/IInterpreter.h>
#include <Parsers/queryToString.h>
#include <Interpreters/ProcessList.h>
#include <Parsers/queryToString.h>
#include <Processors/Pipe.h>
#include <Storages/SelectQueryInfo.h>
namespace DB
@ -81,16 +82,17 @@ Context updateSettingsForCluster(const Cluster & cluster, const Context & contex
}
Pipe executeQuery(
IStreamFactory & stream_factory, const ClusterPtr & cluster, Poco::Logger * log,
const ASTPtr & query_ast, const Context & context, const Settings & settings, const SelectQueryInfo & query_info)
IStreamFactory & stream_factory, Poco::Logger * log,
const ASTPtr & query_ast, const Context & context, const SelectQueryInfo & query_info)
{
assert(log);
Pipes res;
const Settings & settings = context.getSettingsRef();
const std::string query = queryToString(query_ast);
Context new_context = updateSettingsForCluster(*cluster, context, settings, log);
Context new_context = updateSettingsForCluster(*query_info.cluster, context, settings, log);
ThrottlerPtr user_level_throttler;
if (auto * process_list_element = context.getProcessListElement())
@ -109,7 +111,7 @@ Pipe executeQuery(
else
throttler = user_level_throttler;
for (const auto & shard_info : cluster->getShardsInfo())
for (const auto & shard_info : query_info.cluster->getShardsInfo())
stream_factory.createForShard(shard_info, query, query_ast, new_context, throttler, query_info, res);
return Pipe::unitePipes(std::move(res));
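`executeQuery()` loses its explicit `cluster` and `settings` parameters: the cluster now travels inside `SelectQueryInfo`, and the settings come from the context. A sketch of the slimmed signature, with stand-in types for illustration only:

```cpp
#include <memory>

struct Cluster { /* shard list etc. */ };
using ClusterPtr = std::shared_ptr<Cluster>;

struct Settings {};
struct Context
{
    Settings settings;
    const Settings & getSettingsRef() const { return settings; }
};

struct SelectQueryInfo
{
    ClusterPtr cluster;   // new: carried with the rest of the per-query state
};

void executeQuery(const Context & context, const SelectQueryInfo & query_info)
{
    const Settings & settings = context.getSettingsRef();   // was a parameter
    const ClusterPtr & cluster = query_info.cluster;        // was a parameter
    (void)settings;
    (void)cluster;
    // ... iterate cluster->getShardsInfo() and build one pipe per shard
}
```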

View File

@ -1,7 +1,6 @@
#pragma once
#include <Parsers/IAST.h>
#include <Interpreters/Cluster.h>
namespace DB
{
@ -33,8 +32,7 @@ Context updateSettingsForCluster(const Cluster & cluster, const Context & contex
/// `stream_factory` object encapsulates the logic of creating streams for a different type of query
/// (currently SELECT, DESCRIBE).
Pipe executeQuery(
IStreamFactory & stream_factory, const ClusterPtr & cluster, Poco::Logger * log,
const ASTPtr & query_ast, const Context & context, const Settings & settings, const SelectQueryInfo & query_info);
IStreamFactory & stream_factory, Poco::Logger * log, const ASTPtr & query_ast, const Context & context, const SelectQueryInfo & query_info);
}

View File

@ -1966,6 +1966,7 @@ void Context::checkCanBeDropped(const String & database, const String & table, c
String size_str = formatReadableSizeWithDecimalSuffix(size);
String max_size_to_drop_str = formatReadableSizeWithDecimalSuffix(max_size_to_drop);
std::stringstream ostr;
ostr.exceptions(std::ios::failbit);
ostr << "Table or Partition in " << backQuoteIfNeed(database) << "." << backQuoteIfNeed(table) << " was not dropped.\n"
<< "Reason:\n"

View File

@ -454,6 +454,7 @@ void ExpressionAction::execute(Block & block, bool dry_run) const
std::string ExpressionAction::toString() const
{
std::stringstream ss;
ss.exceptions(std::ios::failbit);
switch (type)
{
case ADD_COLUMN:
@ -550,6 +551,7 @@ void ExpressionActions::checkLimits(Block & block) const
if (non_const_columns > settings.max_temporary_non_const_columns)
{
std::stringstream list_of_non_const_columns;
list_of_non_const_columns.exceptions(std::ios::failbit);
for (size_t i = 0, size = block.columns(); i < size; ++i)
if (block.safeGetByPosition(i).column && !isColumnConst(*block.safeGetByPosition(i).column))
list_of_non_const_columns << "\n" << block.safeGetByPosition(i).name;
@ -921,6 +923,7 @@ void ExpressionActions::finalize(const Names & output_columns)
std::string ExpressionActions::dumpActions() const
{
std::stringstream ss;
ss.exceptions(std::ios::failbit);
ss << "input:\n";
for (const auto & input_column : input_columns)
@ -1342,6 +1345,7 @@ void ExpressionActionsChain::finalize()
std::string ExpressionActionsChain::dumpChain() const
{
std::stringstream ss;
ss.exceptions(std::ios::failbit);
for (size_t i = 0; i < steps.size(); ++i)
{

View File

@ -53,6 +53,7 @@ using ArrayJoinActionPtr = std::shared_ptr<ArrayJoinAction>;
*/
struct ExpressionAction
{
friend class KeyCondition;
private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
public:

View File

@ -87,7 +87,7 @@ BlockIO InterpreterAlterQuery::execute()
if (!partition_commands.empty())
{
table->checkAlterPartitionIsPossible(partition_commands, metadata_snapshot, context.getSettingsRef());
auto partition_commands_pipe = table->alterPartition(query_ptr, metadata_snapshot, partition_commands, context);
auto partition_commands_pipe = table->alterPartition(metadata_snapshot, partition_commands, context);
if (!partition_commands_pipe.empty())
res.pipeline.init(std::move(partition_commands_pipe));
}

View File

@ -136,6 +136,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
{
/// Currently, there are no database engines, that support any arguments.
std::stringstream ostr;
ostr.exceptions(std::ios::failbit);
formatAST(*create.storage, ostr, false, false);
throw Exception("Unknown database engine: " + ostr.str(), ErrorCodes::UNKNOWN_DATABASE_ENGINE);
}
@ -182,6 +183,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
create.if_not_exists = false;
std::ostringstream statement_stream;
statement_stream.exceptions(std::ios::failbit);
formatAST(create, statement_stream, false);
statement_stream << '\n';
String statement = statement_stream.str();

View File

@ -223,6 +223,7 @@ BlockInputStreamPtr InterpreterExplainQuery::executeImpl()
MutableColumns res_columns = sample_block.cloneEmptyColumns();
std::stringstream ss;
ss.exceptions(std::ios::failbit);
if (ast.getKind() == ASTExplainQuery::ParsedAST)
{

View File

@ -495,8 +495,10 @@ BlockIO InterpreterSelectQuery::execute()
Block InterpreterSelectQuery::getSampleBlockImpl()
{
query_info.query = query_ptr;
if (storage && !options.only_analyze)
from_stage = storage->getQueryProcessingStage(*context, options.to_stage, query_ptr);
from_stage = storage->getQueryProcessingStage(*context, options.to_stage, query_info);
/// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing.
bool first_stage = from_stage < QueryProcessingStage::WithMergeableState
@ -1433,7 +1435,6 @@ void InterpreterSelectQuery::executeFetchColumns(
if (max_streams > 1 && !is_remote)
max_streams *= settings.max_streams_to_max_threads_ratio;
query_info.query = query_ptr;
query_info.syntax_analyzer_result = syntax_analyzer_result;
query_info.sets = query_analyzer->getPreparedSets();
query_info.prewhere_info = prewhere_info;

View File

@ -35,6 +35,7 @@ BlockInputStreamPtr InterpreterShowAccessQuery::executeImpl() const
/// Build the result column.
MutableColumnPtr column = ColumnString::create();
std::stringstream ss;
ss.exceptions(std::ios::failbit);
for (const auto & query : queries)
{
ss.str("");

View File

@ -239,6 +239,7 @@ BlockInputStreamPtr InterpreterShowCreateAccessEntityQuery::executeImpl()
/// Build the result column.
MutableColumnPtr column = ColumnString::create();
std::stringstream create_query_ss;
create_query_ss.exceptions(std::ios::failbit);
for (const auto & create_query : create_queries)
{
formatAST(*create_query, create_query_ss, false, true);
@ -248,6 +249,7 @@ BlockInputStreamPtr InterpreterShowCreateAccessEntityQuery::executeImpl()
/// Prepare description of the result column.
std::stringstream desc_ss;
desc_ss.exceptions(std::ios::failbit);
const auto & show_query = query_ptr->as<const ASTShowCreateAccessEntityQuery &>();
formatAST(show_query, desc_ss, false, true);
String desc = desc_ss.str();

View File

@ -79,6 +79,7 @@ BlockInputStreamPtr InterpreterShowCreateQuery::executeImpl()
}
std::stringstream stream;
stream.exceptions(std::ios::failbit);
formatAST(*create_query, stream, false, false);
String res = stream.str();

View File

@ -119,6 +119,7 @@ BlockInputStreamPtr InterpreterShowGrantsQuery::executeImpl()
/// Build the result column.
MutableColumnPtr column = ColumnString::create();
std::stringstream grant_ss;
grant_ss.exceptions(std::ios::failbit);
for (const auto & grant_query : grant_queries)
{
grant_ss.str("");
@ -128,6 +129,7 @@ BlockInputStreamPtr InterpreterShowGrantsQuery::executeImpl()
/// Prepare description of the result column.
std::stringstream desc_ss;
desc_ss.exceptions(std::ios::failbit);
const auto & show_query = query_ptr->as<const ASTShowGrantsQuery &>();
formatAST(show_query, desc_ss, false, true);
String desc = desc_ss.str();

View File

@ -33,6 +33,7 @@ String InterpreterShowTablesQuery::getRewrittenQuery()
if (query.databases)
{
std::stringstream rewritten_query;
rewritten_query.exceptions(std::ios::failbit);
rewritten_query << "SELECT name FROM system.databases";
if (!query.like.empty())
@ -54,6 +55,7 @@ String InterpreterShowTablesQuery::getRewrittenQuery()
if (query.clusters)
{
std::stringstream rewritten_query;
rewritten_query.exceptions(std::ios::failbit);
rewritten_query << "SELECT DISTINCT cluster FROM system.clusters";
if (!query.like.empty())
@ -73,6 +75,7 @@ String InterpreterShowTablesQuery::getRewrittenQuery()
else if (query.cluster)
{
std::stringstream rewritten_query;
rewritten_query.exceptions(std::ios::failbit);
rewritten_query << "SELECT * FROM system.clusters";
rewritten_query << " WHERE cluster = " << std::quoted(query.cluster_str, '\'');
@ -87,6 +90,7 @@ String InterpreterShowTablesQuery::getRewrittenQuery()
DatabaseCatalog::instance().assertDatabaseExists(database);
std::stringstream rewritten_query;
rewritten_query.exceptions(std::ios::failbit);
rewritten_query << "SELECT name FROM system.";
if (query.dictionaries)

View File

@ -12,7 +12,9 @@
#include <Interpreters/Context.h>
#include <Interpreters/MySQL/InterpretersMySQLDDLQuery.h>
#include <Common/tests/gtest_global_context.h>
#include <Common/tests/gtest_global_register_functions.h>
#include <Common/tests/gtest_global_register.h>
#include <Poco/String.h>
using namespace DB;

View File

@ -21,6 +21,7 @@ namespace ErrorCodes
static String wrongAliasMessage(const ASTPtr & ast, const ASTPtr & prev_ast, const String & alias)
{
std::stringstream message;
message.exceptions(std::ios::failbit);
message << "Different expressions with the same alias " << backQuoteIfNeed(alias) << ":" << std::endl;
formatAST(*ast, message, false, true);
message << std::endl << "and" << std::endl;

View File

@ -343,6 +343,7 @@ void Set::checkColumnsNumber(size_t num_key_columns) const
if (data_types.size() != num_key_columns)
{
std::stringstream message;
message.exceptions(std::ios::failbit);
message << "Number of columns in section IN doesn't match. "
<< num_key_columns << " at left, " << data_types.size() << " at right.";
throw Exception(message.str(), ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);

View File

@ -553,6 +553,7 @@ void TreeRewriterResult::collectUsedColumns(const ASTPtr & query, bool is_select
if (!unknown_required_source_columns.empty())
{
std::stringstream ss;
ss.exceptions(std::ios::failbit);
ss << "Missing columns:";
for (const auto & name : unknown_required_source_columns)
ss << " '" << name << "'";

View File

@ -779,6 +779,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (!internal && res.in)
{
std::stringstream log_str;
log_str.exceptions(std::ios::failbit);
log_str << "Query pipeline:\n";
res.in->dumpTree(log_str);
LOG_DEBUG(&Poco::Logger::get("executeQuery"), log_str.str());

View File

@ -37,9 +37,6 @@ add_executable (in_join_subqueries_preprocessor in_join_subqueries_preprocessor.
target_link_libraries (in_join_subqueries_preprocessor PRIVATE clickhouse_aggregate_functions dbms clickhouse_parsers)
add_check(in_join_subqueries_preprocessor)
add_executable (users users.cpp)
target_link_libraries (users PRIVATE clickhouse_aggregate_functions dbms clickhouse_common_config)
if (OS_LINUX)
add_executable (internal_iotop internal_iotop.cpp)
target_link_libraries (internal_iotop PRIVATE dbms)

View File

@ -1,282 +0,0 @@
#include <Common/Config/ConfigProcessor.h>
#include <Access/AccessControlManager.h>
#include <Access/AccessFlags.h>
#include <Access/User.h>
#include <filesystem>
#include <vector>
#include <string>
#include <tuple>
#include <iostream>
#include <fstream>
#include <sstream>
#include <stdexcept>
#include <cstdlib>
#include <unistd.h>
namespace
{
namespace fs = std::filesystem;
struct TestEntry
{
std::string user_name;
std::string database_name;
bool is_allowed;
};
using TestEntries = std::vector<TestEntry>;
struct TestDescriptor
{
const char * config_content;
TestEntries entries;
};
using TestSet = std::vector<TestDescriptor>;
/// Tests description.
TestSet test_set =
{
{
"<?xml version=\"1.0\"?><yandex>"
" <profiles><default></default></profiles>"
" <users>"
" <default>"
" <password></password><profile>default</profile><quota>default</quota>"
" <allow_databases>"
" <database>default</database>"
" <database>test</database>"
" </allow_databases>"
" </default>"
" <web>"
" <password></password><profile>default</profile><quota>default</quota>"
" </web>"
" </users>"
" <quotas><default></default></quotas>"
"</yandex>",
{
{ "default", "default", true },
{ "default", "test", true },
{ "default", "stats", false },
{ "web", "default", true },
{ "web", "test", true },
{ "web", "stats", true },
{ "analytics", "default", false },
{ "analytics", "test", false },
{ "analytics", "stats", false }
}
},
{
"<?xml version=\"1.0\"?><yandex>"
" <profiles><default></default></profiles>"
" <users>"
" <default>"
" <password></password><profile>default</profile><quota>default</quota>"
" <allow_databases>"
" <database>default</database>"
" </allow_databases>"
" </default>"
" <web>"
" <password></password><profile>default</profile><quota>default</quota>"
" </web>"
" </users>"
" <quotas><default></default></quotas>"
"</yandex>",
{
{ "default", "default", true },
{ "default", "test", false },
{ "default", "stats", false },
{ "web", "default", true },
{ "web", "test", true },
{ "web", "stats", true },
{ "analytics", "default", false },
{ "analytics", "test", false },
{ "analytics", "stats", false }
}
},
{
"<?xml version=\"1.0\"?><yandex>"
" <profiles><default></default></profiles>"
" <users>"
" <default>"
" <password></password><profile>default</profile><quota>default</quota>"
" <allow_databases>"
" </allow_databases>"
" </default>"
" <web>"
" <password></password><profile>default</profile><quota>default</quota>"
" </web>"
" </users>"
" <quotas><default></default></quotas>"
"</yandex>",
{
{ "default", "default", true },
{ "default", "test", true },
{ "default", "stats", true },
{ "web", "default", true },
{ "web", "test", true },
{ "web", "stats", true },
{ "analytics", "default", false },
{ "analytics", "test", false },
{ "analytics", "stats", false }
}
},
{
"<?xml version=\"1.0\"?><yandex>"
" <profiles><default></default></profiles>"
" <users>"
" <default>"
" <password></password><profile>default</profile><quota>default</quota>"
" <allow_databases>"
" <database>default</database>"
" </allow_databases>"
" </default>"
" <web>"
" <password></password><profile>default</profile><quota>default</quota>"
" <allow_databases>"
" <database>test</database>"
" </allow_databases>"
" </web>"
" </users>"
" <quotas><default></default></quotas>"
"</yandex>",
{
{ "default", "default", true },
{ "default", "test", false },
{ "default", "stats", false },
{ "web", "default", false },
{ "web", "test", true },
{ "web", "stats", false },
{ "analytics", "default", false },
{ "analytics", "test", false },
{ "analytics", "stats", false }
}
}
};
std::string createTmpPath(const std::string & filename)
{
char pattern[] = "/tmp/fileXXXXXX";
char * dir = mkdtemp(pattern);
if (dir == nullptr)
throw std::runtime_error("Could not create directory");
return std::string(dir) + "/" + filename;
}
void createFile(const std::string & filename, const char * data)
{
std::ofstream ofs(filename.c_str());
if (!ofs.is_open())
throw std::runtime_error("Could not open file " + filename);
ofs << data;
}
void runOneTest(const TestDescriptor & test_descriptor)
{
const auto path_name = createTmpPath("users.xml");
createFile(path_name, test_descriptor.config_content);
DB::ConfigurationPtr config;
try
{
config = DB::ConfigProcessor(path_name).loadConfig().configuration;
}
catch (const Poco::Exception & ex)
{
std::ostringstream os;
os << "Error: " << ex.what() << ": " << ex.displayText();
throw std::runtime_error(os.str());
}
DB::AccessControlManager acl_manager;
try
{
acl_manager.setUsersConfig(*config);
}
catch (const Poco::Exception & ex)
{
std::ostringstream os;
os << "Error: " << ex.what() << ": " << ex.displayText();
throw std::runtime_error(os.str());
}
for (const auto & entry : test_descriptor.entries)
{
bool res;
try
{
res = acl_manager.read<DB::User>(entry.user_name)->access.isGranted(DB::AccessType::ALL, entry.database_name);
}
catch (const Poco::Exception &)
{
res = false;
}
if (res != entry.is_allowed)
{
auto to_string = [](bool access){ return (access ? "'granted'" : "'denied'"); };
std::ostringstream os;
os << "(user=" << entry.user_name << ", database=" << entry.database_name << "): ";
os << "Expected " << to_string(entry.is_allowed) << " but got " << to_string(res);
throw std::runtime_error(os.str());
}
}
fs::remove_all(fs::path(path_name).parent_path().string());
}
auto runTestSet()
{
size_t test_num = 1;
size_t failure_count = 0;
for (const auto & test_descriptor : test_set)
{
try
{
runOneTest(test_descriptor);
std::cout << "Test " << test_num << " passed\n";
}
catch (const std::runtime_error & ex)
{
std::cerr << "Test " << test_num << " failed with reason: " << ex.what() << "\n";
++failure_count;
}
catch (...)
{
std::cerr << "Test " << test_num << " failed with unknown reason\n";
++failure_count;
}
++test_num;
}
return std::make_tuple(test_set.size(), failure_count);
}
}
int main()
{
size_t test_count;
size_t failure_count;
std::tie(test_count, failure_count) = runTestSet();
std::cout << (test_count - failure_count) << " test(s) passed out of " << test_count << "\n";
return (failure_count == 0) ? 0 : EXIT_FAILURE;
}

View File

@ -178,7 +178,8 @@ void ASTAlterCommand::formatImpl(
}
else if (type == ASTAlterCommand::DROP_PARTITION)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << (detach ? "DETACH" : "DROP") << " PARTITION "
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str
<< (detach ? "DETACH" : "DROP") << (part ? " PART " : " PARTITION ")
<< (settings.hilite ? hilite_none : "");
partition->formatImpl(settings, state, frame);
}

View File

@ -63,6 +63,7 @@ namespace
{
std::vector<std::pair<ConditionType, String>> conditions_as_strings;
std::stringstream temp_sstream;
temp_sstream.exceptions(std::ios::failbit);
IAST::FormatSettings temp_settings(temp_sstream, settings);
for (const auto & [condition_type, condition] : conditions)
{

Some files were not shown because too many files have changed in this diff.