diff --git a/cmake/find/blob_storage.cmake b/cmake/find/blob_storage.cmake
index 74a907da7db..ec1b97f4695 100644
--- a/cmake/find/blob_storage.cmake
+++ b/cmake/find/blob_storage.cmake
@@ -1,14 +1,16 @@
option (ENABLE_AZURE_BLOB_STORAGE "Enable Azure blob storage" ${ENABLE_LIBRARIES})
-option(USE_INTERNAL_AZURE_BLOB_STORAGE_LIBRARY
- "Set to FALSE to use system Azure SDK instead of bundled (OFF currently not implemented)"
- ON)
-
if (ENABLE_AZURE_BLOB_STORAGE)
set(USE_AZURE_BLOB_STORAGE 1)
set(AZURE_BLOB_STORAGE_LIBRARY azure_sdk)
+else()
+ return()
endif()
+option(USE_INTERNAL_AZURE_BLOB_STORAGE_LIBRARY
+ "Set to FALSE to use system Azure SDK instead of bundled (OFF currently not implemented)"
+ ON)
+
if ((NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/azure/sdk"
OR NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/azure/cmake-modules")
AND USE_INTERNAL_AZURE_BLOB_STORAGE_LIBRARY)
diff --git a/cmake/find/ccache.cmake b/cmake/find/ccache.cmake
index 95ec3d8a034..9acc0423f67 100644
--- a/cmake/find/ccache.cmake
+++ b/cmake/find/ccache.cmake
@@ -31,6 +31,7 @@ if (CCACHE_FOUND AND NOT COMPILER_MATCHES_CCACHE)
if (CCACHE_VERSION VERSION_GREATER "3.2.0" OR NOT CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
message(STATUS "Using ${CCACHE_FOUND} ${CCACHE_VERSION}")
+ set(LAUNCHER ${CCACHE_FOUND})
# debian (debhelpers) set SOURCE_DATE_EPOCH environment variable, that is
# filled from the debian/changelog or current time.
@@ -39,13 +40,8 @@ if (CCACHE_FOUND AND NOT COMPILER_MATCHES_CCACHE)
# of the manifest, which do not allow to use previous cache,
# - 4.2+ ccache ignores SOURCE_DATE_EPOCH for every file w/o __DATE__/__TIME__
#
- # So for:
- # - 4.2+ does not require any sloppiness
- # - 4.0+ will ignore SOURCE_DATE_EPOCH environment variable.
- if (CCACHE_VERSION VERSION_GREATER_EQUAL "4.2")
- message(STATUS "ccache is 4.2+ no quirks for SOURCE_DATE_EPOCH required")
- set(LAUNCHER ${CCACHE_FOUND})
- elseif (CCACHE_VERSION VERSION_GREATER_EQUAL "4.0")
+ # Exclude the SOURCE_DATE_EPOCH environment variable for ccache versions in the range [4.0, 4.2).
+ if (CCACHE_VERSION VERSION_GREATER_EQUAL "4.0" AND CCACHE_VERSION VERSION_LESS "4.2")
message(STATUS "Ignore SOURCE_DATE_EPOCH for ccache")
set(LAUNCHER env -u SOURCE_DATE_EPOCH ${CCACHE_FOUND})
endif()
diff --git a/docs/en/interfaces/http.md b/docs/en/interfaces/http.md
index 38e729fde0b..a49143bf599 100644
--- a/docs/en/interfaces/http.md
+++ b/docs/en/interfaces/http.md
@@ -186,7 +186,7 @@ $ echo "SELECT 1" | gzip -c | \
```
``` bash
-# Receiving compressed data from the server
+# Receiving a compressed data archive from the server
$ curl -vsS "http://localhost:8123/?enable_http_compression=1" \
-H 'Accept-Encoding: gzip' --output result.gz -d 'SELECT number FROM system.numbers LIMIT 3'
$ zcat result.gz
@@ -195,6 +195,15 @@ $ zcat result.gz
2
```
+```bash
+# Receiving compressed data from the server and using gunzip to decompress it
+$ curl -sS "http://localhost:8123/?enable_http_compression=1" \
+ -H 'Accept-Encoding: gzip' -d 'SELECT number FROM system.numbers LIMIT 3' | gunzip -
+0
+1
+2
+```
+
## Default Database {#default-database}
You can use the ‘database’ URL parameter or the ‘X-ClickHouse-Database’ header to specify the default database.
diff --git a/docs/en/introduction/adopters.md b/docs/en/introduction/adopters.md
index 87c5a6f7aec..c2660653907 100644
--- a/docs/en/introduction/adopters.md
+++ b/docs/en/introduction/adopters.md
@@ -60,8 +60,10 @@ toc_title: Adopters
| Exness | Trading | Metrics, Logging | — | — | [Talk in Russian, May 2019](https://youtu.be/_rpU-TvSfZ8?t=3215) |
| EventBunker.io | Serverless Data Processing | — | — | — | [Tweet, April 2021](https://twitter.com/Halil_D_/status/1379839133472985091) |
| FastNetMon | DDoS Protection | Main Product | | — | [Official website](https://fastnetmon.com/docs-fnm-advanced/fastnetmon-advanced-traffic-persistency/) |
+| Firebolt | Analytics | Main product | — | — | [YouTube Tech Talk](https://www.youtube.com/watch?v=9rW9uEJ15tU) |
| Flipkart | e-Commerce | — | — | — | [Talk in English, July 2020](https://youtu.be/GMiXCMFDMow?t=239) |
| FunCorp | Games | | — | 14 bn records/day as of Jan 2021 | [Article](https://www.altinity.com/blog/migrating-from-redshift-to-clickhouse) |
+| Futurra Group | Analytics | — | — | — | [Article in Russian, December 2021](https://dou.ua/forums/topic/35587/) |
| Geniee | Ad network | Main product | — | — | [Blog post in Japanese, July 2017](https://tech.geniee.co.jp/entry/2017/07/20/160100) |
| Genotek | Bioinformatics | Main product | — | — | [Video, August 2020](https://youtu.be/v3KyZbz9lEE) |
| Gigapipe | Managed ClickHouse | Main product | — | — | [Official website](https://gigapipe.com/) |
@@ -70,6 +72,7 @@ toc_title: Adopters
| Grouparoo | Data Warehouse Integrations | Main product | — | — | [Official Website, November 2021](https://www.grouparoo.com/integrations) |
| HUYA | Video Streaming | Analytics | — | — | [Slides in Chinese, October 2018](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup19/7.%20ClickHouse万亿数据分析实践%20李本旺(sundy-li)%20虎牙.pdf) |
| Hydrolix | Cloud data platform | Main product | — | — | [Documentation](https://docs.hydrolix.io/guide/query) |
+| Hystax | Cloud Operations | Observability Analytics | — | — | [Blog](https://hystax.com/clickhouse-for-real-time-cost-saving-analytics-how-to-stop-hammering-screws-and-use-an-electric-screwdriver/) |
| ICA | FinTech | Risk Management | — | — | [Blog Post in English, Sep 2020](https://altinity.com/blog/clickhouse-vs-redshift-performance-for-fintech-risk-management?utm_campaign=ClickHouse%20vs%20RedShift&utm_content=143520807&utm_medium=social&utm_source=twitter&hss_channel=tw-3894792263) |
| Idealista | Real Estate | Analytics | — | — | [Blog Post in English, April 2019](https://clickhouse.com/blog/en/clickhouse-meetup-in-madrid-on-april-2-2019) |
| Infobaleen | AI markting tool | Analytics | — | — | [Official site](https://infobaleen.com) |
@@ -81,14 +84,18 @@ toc_title: Adopters
| Ippon Technologies | Technology Consulting | — | — | — | [Talk in English, July 2020](https://youtu.be/GMiXCMFDMow?t=205) |
| Ivi | Online Cinema | Analytics, Monitoring | — | — | [Article in Russian, Jan 2018](https://habr.com/en/company/ivi/blog/347408/) |
| Jinshuju 金数据 | BI Analytics | Main product | — | — | [Slides in Chinese, October 2019](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup24/3.%20金数据数据架构调整方案Public.pdf) |
-| Jitsu | Cloud Software | Data Pipeline | — | — | [Documentation](https://jitsu.com/docs/destinations-configuration/clickhouse-destination), [Hacker News](https://news.ycombinator.com/item?id=29106082) |
+| Jitsu | Cloud Software | Data Pipeline | — | — | [Documentation](https://jitsu.com/docs/destinations-configuration/clickhouse-destination), [Hacker News post](https://news.ycombinator.com/item?id=29106082) |
+| JuiceFS | Storage | Shopping Cart | — | — | [Blog](https://juicefs.com/blog/en/posts/shopee-clickhouse-with-juicefs/) |
| kakaocorp | Internet company | — | — | — | [if(kakao)2020](https://tv.kakao.com/channel/3693125/cliplink/414129353), [if(kakao)2021](https://if.kakao.com/session/24) |
| Kodiak Data | Clouds | Main product | — | — | [Slides in Engish, April 2018](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup13/kodiak_data.pdf) |
| Kontur | Software Development | Metrics | — | — | [Talk in Russian, November 2018](https://www.youtube.com/watch?v=U4u4Bd0FtrY) |
| Kuaishou | Video | — | — | — | [ClickHouse Meetup, October 2018](https://clickhouse.com/blog/en/2018/clickhouse-community-meetup-in-beijing-on-october-28-2018/) |
| KGK Global | Vehicle monitoring | — | — | — | [Press release, June 2021](https://zoom.cnews.ru/news/item/530921) |
+| LANCOM Systems | Network Solutions | Traffic analysis | — | — | [ClickHouse Operator for Kubernetes](https://www.lancom-systems.com/), [Hacker News post](https://news.ycombinator.com/item?id=29413660) |
| Lawrence Berkeley National Laboratory | Research | Traffic analysis | 5 servers | 55 TiB | [Slides in English, April 2019](https://www.smitasin.com/presentations/2019-04-17_DOE-NSM.pdf) |
+| Lever | Talent Management | Recruiting | — | — | [Hacker News post](https://news.ycombinator.com/item?id=29558544) |
| LifeStreet | Ad network | Main product | 75 servers (3 replicas) | 5.27 PiB | [Blog post in Russian, February 2017](https://habr.com/en/post/322620/) |
+| Lookforsale | E-Commerce | — | — | — | [Job Posting, December 2021](https://telegram.me/javascript_jobs/587318) |
| Mail.ru Cloud Solutions | Cloud services | Main product | — | — | [Article in Russian](https://mcs.mail.ru/help/db-create/clickhouse#) |
| MAXILECT | Ad Tech, Blockchain, ML, AI | — | — | — | [Job advertisement, 2021](https://www.linkedin.com/feed/update/urn:li:activity:6780842017229430784/) |
| Marilyn | Advertising | Statistics | — | — | [Talk in Russian, June 2017](https://www.youtube.com/watch?v=iXlIgx2khwc) |
@@ -106,6 +113,7 @@ toc_title: Adopters
| Ok.ru | Social Network | — | 72 servers | 810 TB compressed, 50bn rows/day, 1.5 TB/day | [SmartData conference, October 2021](https://assets.ctfassets.net/oxjq45e8ilak/4JPHkbJenLgZhBGGyyonFP/57472ec6987003ec4078d0941740703b/____________________ClickHouse_______________________.pdf) |
| Omnicomm | Transportation Monitoring | — | — | — | [Facebook post, October 2021](https://www.facebook.com/OmnicommTeam/posts/2824479777774500) |
| OneAPM | Monitoring and Data Analysis | Main product | — | — | [Slides in Chinese, October 2018](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup19/8.%20clickhouse在OneAPM的应用%20杜龙.pdf) |
+| Opensee | Financial Analytics | Main product | — | — | [Blog](https://opensee.io/news/from-moscow-to-wall-street-the-remarkable-journey-of-clickhouse/) |
| Open Targets | Genome Research | Genome Search | — | — | [Tweet, October 2021](https://twitter.com/OpenTargets/status/1452570865342758913?s=20), [Blog](https://blog.opentargets.org/graphql/) |
| OZON | E-commerce | — | — | — | [Official website](https://job.ozon.ru/vacancy/razrabotchik-clickhouse-ekspluatatsiya-40991870/) |
| Panelbear | Analytics | Monitoring and Analytics | — | — | [Tech Stack, November 2020](https://panelbear.com/blog/tech-stack/) |
@@ -118,6 +126,7 @@ toc_title: Adopters
| PRANA | Industrial predictive analytics | Main product | — | — | [News (russian), Feb 2021](https://habr.com/en/news/t/541392/) |
| QINGCLOUD | Cloud services | Main product | — | — | [Slides in Chinese, October 2018](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup19/4.%20Cloud%20%2B%20TSDB%20for%20ClickHouse%20张健%20QingCloud.pdf) |
| Qrator | DDoS protection | Main product | — | — | [Blog Post, March 2019](https://blog.qrator.net/en/clickhouse-ddos-mitigation_37/) |
+| R-Vision | Information Security | — | — | — | [Article in Russian, December 2021](https://www.anti-malware.ru/reviews/R-Vision-SENSE-15) |
| Raiffeisenbank | Banking | Analytics | — | — | [Lecture in Russian, December 2020](https://cs.hse.ru/announcements/421965599.html) |
| Rambler | Internet services | Analytics | — | — | [Talk in Russian, April 2018](https://medium.com/@ramblertop/разработка-api-clickhouse-для-рамблер-топ-100-f4c7e56f3141) |
| Replica | Urban Planning | Analytics | — | — | [Job advertisement](https://boards.greenhouse.io/replica/jobs/5547732002?gh_jid=5547732002) |
@@ -153,6 +162,7 @@ toc_title: Adopters
| Tinybird | Real-time Data Products | Data processing | — | — | [Official website](https://www.tinybird.co/) |
| Traffic Stars | AD network | — | 300 servers in Europe/US | 1.8 PiB, 700 000 insert rps (as of 2021) | [Slides in Russian, May 2018](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup15/lightning/ninja.pdf) |
| Uber | Taxi | Logging | — | — | [Slides, February 2020](https://presentations.clickhouse.com/meetup40/uber.pdf) |
+| UseTech | Software Development | — | — | — | [Job Posting, December 2021](https://vk.com/wall136266658_2418) |
| UTMSTAT | Analytics | Main product | — | — | [Blog post, June 2020](https://vc.ru/tribuna/133956-striming-dannyh-iz-servisa-skvoznoy-analitiki-v-clickhouse) |
| Vercel | Traffic and Performance Analytics | — | — | — | Direct reference, October 2021 |
| VKontakte | Social Network | Statistics, Logging | — | — | [Slides in Russian, August 2018](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup17/3_vk.pdf) |
@@ -168,7 +178,8 @@ toc_title: Adopters
| Yandex Cloud | Public Cloud | Main product | — | — | [Talk in Russian, December 2019](https://www.youtube.com/watch?v=pgnak9e_E0o) |
| Yandex DataLens | Business Intelligence | Main product | — | — | [Slides in Russian, December 2019](https://presentations.clickhouse.com/meetup38/datalens.pdf) |
| Yandex Market | e-Commerce | Metrics, Logging | — | — | [Talk in Russian, January 2019](https://youtu.be/_l1qP0DyBcA?t=478) |
-| Yandex Metrica | Web analytics | Main product | 630 servers in one cluster, 360 servers in another cluster, 1862 servers in one department | 133 PiB / 8.31 PiB / 120 trillion records | [Slides, February 2020](https://presentations.clickhouse.com/meetup40/introduction/#13) |
+| Yandex Metrica | Web analytics | Main product | 630 servers in one cluster, 360 servers in another cluster, 1862 servers in one department | 133 PiB / 8.31 PiB / 120 trillion records | [Slides, February 2020](https://presentations.clickhouse.com/meetup40/introduction/#13) |
+| Yellowfin | Analytics | Main product | — | — | [Integration](https://www.yellowfinbi.com/campaign/yellowfin-9-whats-new#el-30219e0e) |
| Yotascale | Cloud | Data pipeline | — | 2 bn records/day | [LinkedIn (Accomplishments)](https://www.linkedin.com/in/adilsaleem/) |
| Your Analytics | Product Analytics | Main Product | — | - | [Tweet, November 2021](https://twitter.com/mikenikles/status/1459737241165565953) |
| Zagrava Trading | — | — | — | — | [Job offer, May 2021](https://twitter.com/datastackjobs/status/1394707267082063874) |
@@ -178,9 +189,5 @@ toc_title: Adopters
| Цифровой Рабочий | Industrial IoT, Analytics | — | — | — | [Blog post in Russian, March 2021](https://habr.com/en/company/croc/blog/548018/) |
| ООО «МПЗ Богородский» | Agriculture | — | — | — | [Article in Russian, November 2020](https://cloud.yandex.ru/cases/okraina) |
| ДомКлик | Real Estate | — | — | — | [Article in Russian, October 2021](https://habr.com/ru/company/domclick/blog/585936/) |
-| Futurra Group | Analytics | — | — | — | [Article in Russian, December 2021](https://dou.ua/forums/topic/35587/) |
-| UseTech | Software Development | — | — | — | [Job Posting, December 2021](https://vk.com/wall136266658_2418) |
-| Lookforsale | E-Commerce | — | — | — | [Job Posting, December 2021](https://telegram.me/javascript_jobs/587318) |
-| R-Vision | Information Security | — | — | — | [Article in Russian, December 2021](https://www.anti-malware.ru/reviews/R-Vision-SENSE-15) |
[Original article](https://clickhouse.com/docs/en/introduction/adopters/)
diff --git a/docs/en/sql-reference/statements/alter/projection.md b/docs/en/sql-reference/statements/alter/projection.md
index 96cd8f5d607..c7ebc83c496 100644
--- a/docs/en/sql-reference/statements/alter/projection.md
+++ b/docs/en/sql-reference/statements/alter/projection.md
@@ -9,11 +9,12 @@ The following operations with [projections](../../../engines/table-engines/merge
- `ALTER TABLE [db].name ADD PROJECTION name ( SELECT [GROUP BY] [ORDER BY] )` - Adds projection description to tables metadata.
-- `ALTER TABLE [db].name DROP PROJECTION name` - Removes projection description from tables metadata and deletes projection files from disk.
+- `ALTER TABLE [db].name DROP PROJECTION name` - Removes projection description from tables metadata and deletes projection files from disk. Implemented as a [mutation](../../../sql-reference/statements/alter/index.md#mutations).
- `ALTER TABLE [db.]table MATERIALIZE PROJECTION name IN PARTITION partition_name` - The query rebuilds the projection `name` in the partition `partition_name`. Implemented as a [mutation](../../../sql-reference/statements/alter/index.md#mutations).
-- `ALTER TABLE [db.]table CLEAR PROJECTION name IN PARTITION partition_name` - Deletes projection files from disk without removing description.
+- `ALTER TABLE [db.]table CLEAR PROJECTION name IN PARTITION partition_name` - Deletes projection files from disk without removing description. Implemented as a [mutation](../../../sql-reference/statements/alter/index.md#mutations).
+
The commands `ADD`, `DROP` and `CLEAR` are lightweight in the sense that they only change metadata or remove files.
diff --git a/docs/ru/sql-reference/functions/nlp-functions.md b/docs/ru/sql-reference/functions/nlp-functions.md
index 250403ab127..992a7d6ccf3 100644
--- a/docs/ru/sql-reference/functions/nlp-functions.md
+++ b/docs/ru/sql-reference/functions/nlp-functions.md
@@ -3,10 +3,10 @@ toc_priority: 67
toc_title: NLP
---
-# [экспериментально] Функции для работы с ествественным языком {#nlp-functions}
+# [экспериментально] Функции для работы с естественным языком {#nlp-functions}
!!! warning "Предупреждение"
- Сейчас использование функций для работы с ествественным языком является экспериментальной возможностью. Чтобы использовать данные функции, включите настройку `allow_experimental_nlp_functions = 1`.
+ Сейчас использование функций для работы с естественным языком является экспериментальной возможностью. Чтобы использовать данные функции, включите настройку `allow_experimental_nlp_functions = 1`.
## stem {#stem}
@@ -84,7 +84,7 @@ SELECT lemmatize('en', 'wolves');
Находит синонимы к заданному слову. Представлены два типа расширений словарей: `plain` и `wordnet`.
-Для работы расширения типа `plain` необходимо указать путь до простого текстового файла, где каждая строка соотвествует одному набору синонимов. Слова в данной строке должны быть разделены с помощью пробела или знака табуляции.
+Для работы расширения типа `plain` необходимо указать путь до простого текстового файла, где каждая строка соответствует одному набору синонимов. Слова в данной строке должны быть разделены с помощью пробела или знака табуляции.
Для работы расширения типа `plain` необходимо указать путь до WordNet тезауруса. Тезаурус должен содержать WordNet sense index.
diff --git a/programs/benchmark/Benchmark.cpp b/programs/benchmark/Benchmark.cpp
index 1c276a83768..35ffb97b8e2 100644
--- a/programs/benchmark/Benchmark.cpp
+++ b/programs/benchmark/Benchmark.cpp
@@ -342,6 +342,9 @@ private:
}
}
+ /// From now on we do not block the Ctrl+C signal, so a second signal will terminate the program without waiting.
+ interrupt_listener.unblock();
+
pool.wait();
total_watch.stop();
@@ -586,7 +589,6 @@ public:
#ifndef __clang__
#pragma GCC optimize("-fno-var-tracking-assignments")
#endif
-#pragma GCC diagnostic ignored "-Wmissing-declarations"
int mainEntryClickHouseBenchmark(int argc, char ** argv)
{
diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp
index 53e295b7fbb..aa4747636c9 100644
--- a/programs/local/LocalServer.cpp
+++ b/programs/local/LocalServer.cpp
@@ -313,11 +313,11 @@ void LocalServer::cleanup()
std::string LocalServer::getInitialCreateTableQuery()
{
- if (!config().has("table-structure"))
+ if (!config().has("table-structure") && !config().has("table-file"))
return {};
auto table_name = backQuoteIfNeed(config().getString("table-name", "table"));
- auto table_structure = config().getString("table-structure");
+ auto table_structure = config().getString("table-structure", "auto");
auto data_format = backQuoteIfNeed(config().getString("table-data-format", "TSV"));
String table_file;
@@ -332,7 +332,12 @@ std::string LocalServer::getInitialCreateTableQuery()
table_file = quoteString(config().getString("table-file"));
}
- return fmt::format("CREATE TABLE {} ({}) ENGINE = File({}, {});",
+ if (table_structure == "auto")
+ table_structure = "";
+ else
+ table_structure = "(" + table_structure + ")";
+
+ return fmt::format("CREATE TABLE {} {} ENGINE = File({}, {});",
table_name, table_structure, data_format, table_file);
}
@@ -422,7 +427,7 @@ try
#else
is_interactive = stdin_is_a_tty
&& (config().hasOption("interactive")
- || (!config().has("query") && !config().has("table-structure") && queries_files.empty()));
+ || (!config().has("query") && !config().has("table-structure") && queries_files.empty() && !config().has("table-file")));
#endif
if (!is_interactive)
{
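For illustration, a minimal standalone sketch (not part of the patch) of the two query shapes produced by `getInitialCreateTableQuery()` after this change; the table name, column list and `data.tsv` path are made-up values, and the {fmt} library is used directly only to mirror the `fmt::format` call above.

```cpp
#include <fmt/format.h>
#include <iostream>

int main()
{
    // With an explicit --structure: the column list is wrapped in parentheses.
    std::cout << fmt::format("CREATE TABLE {} {} ENGINE = File({}, {});",
        "`table`", "(a UInt32, b String)", "TSV", "'data.tsv'") << '\n';

    // With --structure omitted ("auto"): the column list is dropped entirely,
    // leaving the File engine to infer the schema from the data.
    std::cout << fmt::format("CREATE TABLE {} {} ENGINE = File({}, {});",
        "`table`", "", "TSV", "'data.tsv'") << '\n';
}
```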
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 7124961821e..0fe66314114 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -526,6 +526,14 @@ if (USE_BZIP2)
target_include_directories (clickhouse_common_io SYSTEM BEFORE PRIVATE ${BZIP2_INCLUDE_DIR})
endif()
+if(USE_SIMDJSON)
+ dbms_target_link_libraries(PRIVATE simdjson)
+endif()
+
+if(USE_RAPIDJSON)
+ dbms_target_include_directories(SYSTEM PRIVATE ${RAPIDJSON_INCLUDE_DIR})
+endif()
+
dbms_target_link_libraries(PUBLIC consistent-hashing)
include ("${ClickHouse_SOURCE_DIR}/cmake/add_check.cmake")
diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp
index f2e5e018e1b..16f85fcae61 100644
--- a/src/Common/ErrorCodes.cpp
+++ b/src/Common/ErrorCodes.cpp
@@ -604,6 +604,7 @@
M(633, QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW) \
M(634, MONGODB_ERROR) \
M(635, CANNOT_POLL) \
+ M(636, CANNOT_EXTRACT_TABLE_STRUCTURE) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \
diff --git a/src/Common/SymbolIndex.cpp b/src/Common/SymbolIndex.cpp
index 568f633975b..32c1a15337c 100644
--- a/src/Common/SymbolIndex.cpp
+++ b/src/Common/SymbolIndex.cpp
@@ -86,7 +86,7 @@ namespace
/// https://stackoverflow.com/questions/32088140/multiple-string-tables-in-elf-object
-void updateResources(std::string_view name, const void * address, SymbolIndex::Resources & resources)
+void updateResources(ElfW(Addr) base_address, std::string_view object_name, std::string_view name, const void * address, SymbolIndex::Resources & resources)
{
const char * char_address = static_cast<const char *>(address);
@@ -97,18 +97,23 @@ void updateResources(std::string_view name, const void * address, SymbolIndex::R
name = name.substr((name[0] == '_') + strlen("binary_"));
name = name.substr(0, name.size() - strlen("_start"));
- resources.emplace(name, std::string_view{char_address, 0}); // NOLINT
+ resources.emplace(name, SymbolIndex::ResourcesBlob{
+ base_address,
+ object_name,
+ std::string_view{char_address, 0}, // NOLINT
+ });
}
else if (name.ends_with("_end"))
{
name = name.substr((name[0] == '_') + strlen("binary_"));
name = name.substr(0, name.size() - strlen("_end"));
- if (auto it = resources.find(name); it != resources.end() && it->second.empty())
+ auto it = resources.find(name);
+ if (it != resources.end() && it->second.base_address == base_address && it->second.data.empty())
{
- const char * start = it->second.data();
+ const char * start = it->second.data.data();
assert(char_address >= start);
- it->second = std::string_view{start, static_cast<size_t>(char_address - start)};
+ it->second.data = std::string_view{start, static_cast<size_t>(char_address - start)};
}
}
}
@@ -153,10 +158,12 @@ void collectSymbolsFromProgramHeaders(
size_t sym_cnt = 0;
for (const auto * it = dyn_begin; it->d_tag != DT_NULL; ++it)
{
+ ElfW(Addr) base_address = correct_address(info->dlpi_addr, it->d_un.d_ptr);
+
// TODO: this branch leads to invalid address of the hash table. Need further investigation.
// if (it->d_tag == DT_HASH)
// {
- // const ElfW(Word) * hash = reinterpret_cast<const ElfW(Word) *>(correct_address(info->dlpi_addr, it->d_un.d_ptr));
+ // const ElfW(Word) * hash = reinterpret_cast<const ElfW(Word) *>(base_address);
// sym_cnt = hash[1];
// break;
// }
@@ -167,7 +174,7 @@ void collectSymbolsFromProgramHeaders(
const uint32_t * buckets = nullptr;
const uint32_t * hashval = nullptr;
- const ElfW(Word) * hash = reinterpret_cast<const ElfW(Word) *>(correct_address(info->dlpi_addr, it->d_un.d_ptr));
+ const ElfW(Word) * hash = reinterpret_cast<const ElfW(Word) *>(base_address);
buckets = hash + 4 + (hash[2] * sizeof(size_t) / 4);
@@ -196,9 +203,11 @@ void collectSymbolsFromProgramHeaders(
const char * strtab = nullptr;
for (const auto * it = dyn_begin; it->d_tag != DT_NULL; ++it)
{
+ ElfW(Addr) base_address = correct_address(info->dlpi_addr, it->d_un.d_ptr);
+
if (it->d_tag == DT_STRTAB)
{
- strtab = reinterpret_cast<const char *>(correct_address(info->dlpi_addr, it->d_un.d_ptr));
+ strtab = reinterpret_cast<const char *>(base_address);
break;
}
}
@@ -208,10 +217,12 @@ void collectSymbolsFromProgramHeaders(
for (const auto * it = dyn_begin; it->d_tag != DT_NULL; ++it)
{
+ ElfW(Addr) base_address = correct_address(info->dlpi_addr, it->d_un.d_ptr);
+
if (it->d_tag == DT_SYMTAB)
{
/* Get the pointer to the first entry of the symbol table */
- const ElfW(Sym) * elf_sym = reinterpret_cast<const ElfW(Sym) *>(correct_address(info->dlpi_addr, it->d_un.d_ptr));
+ const ElfW(Sym) * elf_sym = reinterpret_cast<const ElfW(Sym) *>(base_address);
/* Iterate over the symbol table */
for (ElfW(Word) sym_index = 0; sym_index < ElfW(Word)(sym_cnt); ++sym_index)
@@ -236,7 +247,7 @@ void collectSymbolsFromProgramHeaders(
symbols.push_back(symbol);
/// But resources can be represented by a pair of empty symbols (indicating their boundaries).
- updateResources(symbol.name, symbol.address_begin, resources);
+ updateResources(base_address, info->dlpi_name, symbol.name, symbol.address_begin, resources);
}
break;
@@ -299,7 +310,7 @@ void collectSymbolsFromELFSymbolTable(
if (symbol_table_entry->st_size)
symbols.push_back(symbol);
- updateResources(symbol.name, symbol.address_begin, resources);
+ updateResources(info->dlpi_addr, info->dlpi_name, symbol.name, symbol.address_begin, resources);
}
}
diff --git a/src/Common/SymbolIndex.h b/src/Common/SymbolIndex.h
index 7c542980099..1331cf81cf7 100644
--- a/src/Common/SymbolIndex.h
+++ b/src/Common/SymbolIndex.h
@@ -51,7 +51,7 @@ public:
std::string_view getResource(String name) const
{
if (auto it = data.resources.find(name); it != data.resources.end())
- return it->second;
+ return it->second.data;
return {};
}
@@ -59,7 +59,17 @@ public:
String getBuildID() const { return data.build_id; }
String getBuildIDHex() const;
- using Resources = std::unordered_map<std::string_view, std::string_view>;
+ struct ResourcesBlob
+ {
+ /// Symbol can be presented in multiple shared objects,
+ /// base_address will be used to compare only symbols from the same SO.
+ ElfW(Addr) base_address;
+ /// Just a human name of the SO.
+ std::string_view object_name;
+ /// Data blob.
+ std::string_view data;
+ };
+ using Resources = std::unordered_map<std::string_view, ResourcesBlob>;
struct Data
{
diff --git a/src/Common/ZooKeeper/ZooKeeper.cpp b/src/Common/ZooKeeper/ZooKeeper.cpp
index f05a10b8815..c8753c8edaf 100644
--- a/src/Common/ZooKeeper/ZooKeeper.cpp
+++ b/src/Common/ZooKeeper/ZooKeeper.cpp
@@ -26,6 +26,7 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
+ extern const int BAD_ARGUMENTS;
}
}
@@ -1133,4 +1134,54 @@ Coordination::RequestPtr makeCheckRequest(const std::string & path, int version)
return request;
}
+std::string normalizeZooKeeperPath(std::string zookeeper_path, bool check_starts_with_slash, Poco::Logger * log)
+{
+ if (!zookeeper_path.empty() && zookeeper_path.back() == '/')
+ zookeeper_path.resize(zookeeper_path.size() - 1);
+ /// If zookeeper chroot prefix is used, path should start with '/', because chroot concatenates without it.
+ if (!zookeeper_path.empty() && zookeeper_path.front() != '/')
+ {
+ /// Do not allow this for new tables, print warning for tables created in old versions
+ if (check_starts_with_slash)
+ throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "ZooKeeper path must start with '/', got '{}'", zookeeper_path);
+ if (log)
+ LOG_WARNING(log, "ZooKeeper path ('{}') does not start with '/'. It will not be supported in future releases", zookeeper_path);
+ zookeeper_path = "/" + zookeeper_path;
+ }
+
+ return zookeeper_path;
+}
+
+String extractZooKeeperName(const String & path)
+{
+ static constexpr auto default_zookeeper_name = "default";
+ if (path.empty())
+ throw DB::Exception("ZooKeeper path should not be empty", DB::ErrorCodes::BAD_ARGUMENTS);
+ if (path[0] == '/')
+ return default_zookeeper_name;
+ auto pos = path.find(":/");
+ if (pos != String::npos && pos < path.find('/'))
+ {
+ auto zookeeper_name = path.substr(0, pos);
+ if (zookeeper_name.empty())
+ throw DB::Exception("Zookeeper path should start with '/' or ':/'", DB::ErrorCodes::BAD_ARGUMENTS);
+ return zookeeper_name;
+ }
+ return default_zookeeper_name;
+}
+
+String extractZooKeeperPath(const String & path, bool check_starts_with_slash, Poco::Logger * log)
+{
+ if (path.empty())
+ throw DB::Exception("ZooKeeper path should not be empty", DB::ErrorCodes::BAD_ARGUMENTS);
+ if (path[0] == '/')
+ return normalizeZooKeeperPath(path, check_starts_with_slash, log);
+ auto pos = path.find(":/");
+ if (pos != String::npos && pos < path.find('/'))
+ {
+ return normalizeZooKeeperPath(path.substr(pos + 1, String::npos), check_starts_with_slash, log);
+ }
+ return normalizeZooKeeperPath(path, check_starts_with_slash, log);
+}
+
}
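A hedged usage sketch of the two helpers added above (not part of the patch); it assumes the `zkutil` namespace used by `ZooKeeper.h`, and the `aux:` prefix and the table path are made-up values.

```cpp
#include <Common/ZooKeeper/ZooKeeper.h>
#include <iostream>

int main()
{
    std::string path = "aux:/clickhouse/tables/01/hits";

    // Everything before the first ":/" is taken as the ZooKeeper name ("aux" here);
    // a path that simply starts with '/' maps to the "default" ZooKeeper.
    auto zookeeper_name = zkutil::extractZooKeeperName(path);

    // The remainder is normalized: a trailing '/' is stripped and, with
    // check_starts_with_slash = true, a missing leading '/' is rejected.
    auto zookeeper_path = zkutil::extractZooKeeperPath(path, /* check_starts_with_slash = */ true);

    std::cout << zookeeper_name << '\n' << zookeeper_path << '\n';  // aux, /clickhouse/tables/01/hits
}
```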
diff --git a/src/Common/ZooKeeper/ZooKeeper.h b/src/Common/ZooKeeper/ZooKeeper.h
index 8e015b1f331..371f93f6df3 100644
--- a/src/Common/ZooKeeper/ZooKeeper.h
+++ b/src/Common/ZooKeeper/ZooKeeper.h
@@ -379,4 +379,11 @@ private:
};
using EphemeralNodeHolderPtr = EphemeralNodeHolder::Ptr;
+
+String normalizeZooKeeperPath(std::string zookeeper_path, bool check_starts_with_slash, Poco::Logger * log = nullptr);
+
+String extractZooKeeperName(const String & path);
+
+String extractZooKeeperPath(const String & path, bool check_starts_with_slash, Poco::Logger * log = nullptr);
+
}
diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index 952009047d4..6e53fa4342c 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -596,6 +596,8 @@ class IColumn;
M(Int64, input_format_orc_row_batch_size, 100'000, "Batch size when reading ORC stripes.", 0) \
M(Bool, input_format_parquet_import_nested, false, "Allow to insert array of structs into Nested table in Parquet input format.", 0) \
M(Bool, input_format_allow_seeks, true, "Allow seeks while reading in ORC/Parquet/Arrow input formats", 0) \
+ M(UInt64, input_format_msgpack_number_of_columns, 0, "The number of columns in inserted MsgPack data. Used for automatic schema inference from data.", 0) \
+ M(UInt64, input_format_max_rows_to_read_for_schema_inference, 100, "The maximum number of rows to read from the data for automatic schema inference", 0) \
\
M(DateTimeInputFormat, date_time_input_format, FormatSettings::DateTimeInputFormat::Basic, "Method to read DateTime from text input formats. Possible values: 'basic' and 'best_effort'.", 0) \
M(DateTimeOutputFormat, date_time_output_format, FormatSettings::DateTimeOutputFormat::Simple, "Method to write DateTime to text output. Possible values: 'simple', 'iso', 'unix_timestamp'.", 0) \
@@ -661,6 +663,7 @@ class IColumn;
M(Bool, output_format_arrow_low_cardinality_as_dictionary, false, "Enable output LowCardinality type as Dictionary Arrow type", 0) \
\
M(EnumComparingMode, format_capn_proto_enum_comparising_mode, FormatSettings::EnumComparingMode::BY_VALUES, "How to map ClickHouse Enum and CapnProto Enum", 0)\
+
// End of FORMAT_FACTORY_SETTINGS
// Please add settings non-related to formats into the COMMON_SETTINGS above.
diff --git a/src/DataTypes/IDataType.h b/src/DataTypes/IDataType.h
index e74df5c327a..85644b6f6ca 100644
--- a/src/DataTypes/IDataType.h
+++ b/src/DataTypes/IDataType.h
@@ -377,6 +377,8 @@ struct WhichDataType
constexpr bool isNullable() const { return idx == TypeIndex::Nullable; }
constexpr bool isFunction() const { return idx == TypeIndex::Function; }
constexpr bool isAggregateFunction() const { return idx == TypeIndex::AggregateFunction; }
+
+ constexpr bool isLowCarnality() const { return idx == TypeIndex::LowCardinality; }
};
/// IDataType helpers (alternative for IDataType virtual methods with single point of truth)
diff --git a/src/Databases/DatabaseOnDisk.cpp b/src/Databases/DatabaseOnDisk.cpp
index e9944b592ed..165bad950f5 100644
--- a/src/Databases/DatabaseOnDisk.cpp
+++ b/src/Databases/DatabaseOnDisk.cpp
@@ -76,10 +76,16 @@ std::pair createTableFromAST(
/// - the database has not been loaded yet;
/// - the code is simpler, since the query is already brought to a suitable form.
if (!ast_create_query.columns_list || !ast_create_query.columns_list->columns)
- throw Exception("Missing definition of columns.", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
-
- columns = InterpreterCreateQuery::getColumnsDescription(*ast_create_query.columns_list->columns, context, true);
- constraints = InterpreterCreateQuery::getConstraintsDescription(ast_create_query.columns_list->constraints);
+ {
+ if (!StorageFactory::instance().checkIfStorageSupportsSchemaInterface(ast_create_query.storage->engine->name))
+ throw Exception("Missing definition of columns.", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
+ /// Leave columns empty.
+ }
+ else
+ {
+ columns = InterpreterCreateQuery::getColumnsDescription(*ast_create_query.columns_list->columns, context, true);
+ constraints = InterpreterCreateQuery::getConstraintsDescription(ast_create_query.columns_list->constraints);
+ }
}
return
diff --git a/src/Formats/CapnProtoUtils.cpp b/src/Formats/CapnProtoUtils.cpp
index ecfa5df8351..bed46a97c1b 100644
--- a/src/Formats/CapnProtoUtils.cpp
+++ b/src/Formats/CapnProtoUtils.cpp
@@ -7,6 +7,8 @@
#include
#include
#include
+#include
+#include
#include
#include
#include
@@ -26,6 +28,7 @@ namespace ErrorCodes
extern const int FILE_DOESNT_EXIST;
extern const int UNKNOWN_EXCEPTION;
extern const int INCORRECT_DATA;
+ extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
}
capnp::StructSchema CapnProtoSchemaParser::getMessageSchema(const FormatSchemaInfo & schema_info)
@@ -427,6 +430,113 @@ void checkCapnProtoSchemaStructure(const capnp::StructSchema & schema, const Blo
}
}
+template <typename ValueType>
+static DataTypePtr getEnumDataTypeFromEnumerants(const capnp::EnumSchema::EnumerantList & enumerants)
+{
+ std::vector<std::pair<String, ValueType>> values;
+ for (auto enumerant : enumerants)
+ values.emplace_back(enumerant.getProto().getName(), ValueType(enumerant.getOrdinal()));
+ return std::make_shared<DataTypeEnum<ValueType>>(std::move(values));
+}
+
+static DataTypePtr getEnumDataTypeFromEnumSchema(const capnp::EnumSchema & enum_schema)
+{
+ auto enumerants = enum_schema.getEnumerants();
+ if (enumerants.size() < 128)
+ return getEnumDataTypeFromEnumerants<Int8>(enumerants);
+ if (enumerants.size() < 32768)
+ return getEnumDataTypeFromEnumerants<Int16>(enumerants);
+
+ throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "ClickHouse supports only 8 and 16-but Enums");
+}
+
+static DataTypePtr getDataTypeFromCapnProtoType(const capnp::Type & capnp_type)
+{
+ switch (capnp_type.which())
+ {
+ case capnp::schema::Type::INT8:
+ return std::make_shared<DataTypeInt8>();
+ case capnp::schema::Type::INT16:
+ return std::make_shared<DataTypeInt16>();
+ case capnp::schema::Type::INT32:
+ return std::make_shared<DataTypeInt32>();
+ case capnp::schema::Type::INT64:
+ return std::make_shared<DataTypeInt64>();
+ case capnp::schema::Type::BOOL: [[fallthrough]];
+ case capnp::schema::Type::UINT8:
+ return std::make_shared<DataTypeUInt8>();
+ case capnp::schema::Type::UINT16:
+ return std::make_shared<DataTypeUInt16>();
+ case capnp::schema::Type::UINT32:
+ return std::make_shared<DataTypeUInt32>();
+ case capnp::schema::Type::UINT64:
+ return std::make_shared<DataTypeUInt64>();
+ case capnp::schema::Type::FLOAT32:
+ return std::make_shared<DataTypeFloat32>();
+ case capnp::schema::Type::FLOAT64:
+ return std::make_shared<DataTypeFloat64>();
+ case capnp::schema::Type::DATA: [[fallthrough]];
+ case capnp::schema::Type::TEXT:
+ return std::make_shared<DataTypeString>();
+ case capnp::schema::Type::ENUM:
+ return getEnumDataTypeFromEnumSchema(capnp_type.asEnum());
+ case capnp::schema::Type::LIST:
+ {
+ auto list_schema = capnp_type.asList();
+ auto nested_type = getDataTypeFromCapnProtoType(list_schema.getElementType());
+ return std::make_shared<DataTypeArray>(nested_type);
+ }
+ case capnp::schema::Type::STRUCT:
+ {
+ auto struct_schema = capnp_type.asStruct();
+
+ /// Check if it can be Nullable.
+ if (checkIfStructIsNamedUnion(struct_schema))
+ {
+ auto fields = struct_schema.getUnionFields();
+ if (fields.size() != 2 || (!fields[0].getType().isVoid() && !fields[1].getType().isVoid()))
+ throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Unions are not supported");
+ auto value_type = fields[0].getType().isVoid() ? fields[1].getType() : fields[0].getType();
+ if (value_type.isStruct() || value_type.isList())
+ throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Tuples and Lists cannot be inside Nullable");
+
+ auto nested_type = getDataTypeFromCapnProtoType(value_type);
+ return std::make_shared<DataTypeNullable>(nested_type);
+ }
+
+ if (checkIfStructContainsUnnamedUnion(struct_schema))
+ throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Unnamed union is not supported");
+
+ /// Treat Struct as Tuple.
+ DataTypes nested_types;
+ Names nested_names;
+ for (auto field : struct_schema.getNonUnionFields())
+ {
+ nested_names.push_back(field.getProto().getName());
+ nested_types.push_back(getDataTypeFromCapnProtoType(field.getType()));
+ }
+ return std::make_shared<DataTypeTuple>(std::move(nested_types), std::move(nested_names));
+ }
+ default:
+ throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Unsupported CapnProtoType: {}", getCapnProtoFullTypeName(capnp_type));
+ }
+}
+
+NamesAndTypesList capnProtoSchemaToCHSchema(const capnp::StructSchema & schema)
+{
+ if (checkIfStructContainsUnnamedUnion(schema))
+ throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Unnamed union is not supported");
+
+ NamesAndTypesList names_and_types;
+ for (auto field : schema.getNonUnionFields())
+ {
+ auto name = field.getProto().getName();
+ auto type = getDataTypeFromCapnProtoType(field.getType());
+ names_and_types.emplace_back(name, type);
+ }
+ return names_and_types;
+}
+
}
#endif
diff --git a/src/Formats/CapnProtoUtils.h b/src/Formats/CapnProtoUtils.h
index 93ca0a5e616..51c152de17f 100644
--- a/src/Formats/CapnProtoUtils.h
+++ b/src/Formats/CapnProtoUtils.h
@@ -38,6 +38,7 @@ capnp::DynamicValue::Reader getReaderByColumnName(const capnp::DynamicStruct::Re
void checkCapnProtoSchemaStructure(const capnp::StructSchema & schema, const Block & header, FormatSettings::EnumComparingMode mode);
+NamesAndTypesList capnProtoSchemaToCHSchema(const capnp::StructSchema & schema);
}
#endif
diff --git a/src/Formats/EscapingRuleUtils.cpp b/src/Formats/EscapingRuleUtils.cpp
index d956d9e6bfb..0a7747fc864 100644
--- a/src/Formats/EscapingRuleUtils.cpp
+++ b/src/Formats/EscapingRuleUtils.cpp
@@ -1,7 +1,16 @@
#include
+#include
+#include
#include
+#include
+#include
+#include
#include
#include
+#include
+#include
+#include
+#include
namespace DB
{
@@ -9,6 +18,7 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
+ extern const int LOGICAL_ERROR;
}
FormatSettings::EscapingRule stringToEscapingRule(const String & escaping_rule)
@@ -193,30 +203,145 @@ void writeStringByEscapingRule(const String & value, WriteBuffer & out, FormatSe
}
}
-String readStringByEscapingRule(ReadBuffer & buf, FormatSettings::EscapingRule escaping_rule, const FormatSettings & format_settings)
+template <bool read_string>
+String readByEscapingRule(ReadBuffer & buf, FormatSettings::EscapingRule escaping_rule, const FormatSettings & format_settings)
{
String result;
switch (escaping_rule)
{
case FormatSettings::EscapingRule::Quoted:
- readQuotedString(result, buf);
+ if constexpr (read_string)
+ readQuotedString(result, buf);
+ else
+ readQuotedFieldIntoString(result, buf);
break;
case FormatSettings::EscapingRule::JSON:
- readJSONString(result, buf);
+ if constexpr (read_string)
+ readJSONString(result, buf);
+ else
+ readJSONFieldIntoString(result, buf);
break;
case FormatSettings::EscapingRule::Raw:
readString(result, buf);
break;
case FormatSettings::EscapingRule::CSV:
- readCSVString(result, buf, format_settings.csv);
+ if constexpr (read_string)
+ readCSVString(result, buf, format_settings.csv);
+ else
+ readCSVField(result, buf, format_settings.csv);
break;
case FormatSettings::EscapingRule::Escaped:
readEscapedString(result, buf);
break;
default:
- throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot read string with {} escaping rule", escapingRuleToString(escaping_rule));
+ throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot read value with {} escaping rule", escapingRuleToString(escaping_rule));
}
return result;
}
+String readFieldByEscapingRule(ReadBuffer & buf, FormatSettings::EscapingRule escaping_rule, const FormatSettings & format_settings)
+{
+ return readByEscapingRule<false>(buf, escaping_rule, format_settings);
+}
+
+String readStringByEscapingRule(ReadBuffer & buf, FormatSettings::EscapingRule escaping_rule, const FormatSettings & format_settings)
+{
+ return readByEscapingRule<true>(buf, escaping_rule, format_settings);
+}
+
+static bool evaluateConstantExpressionFromString(const StringRef & field, DataTypePtr & type, ContextPtr context)
+{
+ if (!context)
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "You must provide context to evaluate constant expression");
+
+ ParserExpression parser;
+ Expected expected;
+ Tokens tokens(field.data, field.data + field.size);
+ IParser::Pos token_iterator(tokens, context->getSettingsRef().max_parser_depth);
+ ASTPtr ast;
+
+ /// FIXME: Our parser cannot parse maps in the form of '{key : value}' that is used in text formats.
+ bool parsed = parser.parse(token_iterator, ast, expected);
+ if (!parsed)
+ return false;
+
+ try
+ {
+ std::pair<Field, DataTypePtr> result = evaluateConstantExpression(ast, context);
+ type = generalizeDataType(result.second);
+ return true;
+ }
+ catch (...)
+ {
+ return false;
+ }
+}
+
+DataTypePtr determineDataTypeByEscapingRule(const String & field, const FormatSettings & format_settings, FormatSettings::EscapingRule escaping_rule, ContextPtr context)
+{
+ switch (escaping_rule)
+ {
+ case FormatSettings::EscapingRule::Quoted:
+ {
+ DataTypePtr type;
+ bool parsed = evaluateConstantExpressionFromString(field, type, context);
+ return parsed ? type : nullptr;
+ }
+ case FormatSettings::EscapingRule::JSON:
+ return getDataTypeFromJSONField(field);
+ case FormatSettings::EscapingRule::CSV:
+ {
+ if (field.empty() || field == format_settings.csv.null_representation)
+ return nullptr;
+
+ if (field == format_settings.bool_false_representation || field == format_settings.bool_true_representation)
+ return std::make_shared<DataTypeUInt8>();
+
+ DataTypePtr type;
+ bool parsed;
+ if (field[0] == '\'' || field[0] == '"')
+ {
+ /// Try to evaluate expression inside quotes.
+ parsed = evaluateConstantExpressionFromString(StringRef(field.data() + 1, field.size() - 2), type, context);
+ /// If it's a number in quotes we determine it as a string.
+ if (parsed && type && isNumber(removeNullable(type)))
+ return makeNullable(std::make_shared<DataTypeString>());
+ }
+ else
+ parsed = evaluateConstantExpressionFromString(field, type, context);
+
+ /// If we couldn't parse an expression, determine it as a string.
+ return parsed ? type : makeNullable(std::make_shared<DataTypeString>());
+ }
+ case FormatSettings::EscapingRule::Raw: [[fallthrough]];
+ case FormatSettings::EscapingRule::Escaped:
+ /// TODO: Try to use some heuristics here to determine the type of data.
+ return field.empty() ? nullptr : makeNullable(std::make_shared<DataTypeString>());
+ default:
+ throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot determine the type for value with {} escaping rule", escapingRuleToString(escaping_rule));
+ }
+}
+
+DataTypes determineDataTypesByEscapingRule(const std::vector<String> & fields, const FormatSettings & format_settings, FormatSettings::EscapingRule escaping_rule, ContextPtr context)
+{
+ DataTypes data_types;
+ data_types.reserve(fields.size());
+ for (const auto & field : fields)
+ data_types.push_back(determineDataTypeByEscapingRule(field, format_settings, escaping_rule, context));
+ return data_types;
+}
+
+DataTypePtr getDefaultDataTypeForEscapingRule(FormatSettings::EscapingRule escaping_rule)
+{
+ switch (escaping_rule)
+ {
+ case FormatSettings::EscapingRule::CSV: [[fallthrough]];
+ case FormatSettings::EscapingRule::Escaped: [[fallthrough]];
+ case FormatSettings::EscapingRule::Raw:
+ return makeNullable(std::make_shared<DataTypeString>());
+ default:
+ return nullptr;
+ }
+}
+
}
diff --git a/src/Formats/EscapingRuleUtils.h b/src/Formats/EscapingRuleUtils.h
index 02f027db74d..10147b29ad6 100644
--- a/src/Formats/EscapingRuleUtils.h
+++ b/src/Formats/EscapingRuleUtils.h
@@ -4,6 +4,7 @@
#include
#include
#include
+#include
namespace DB
{
@@ -33,5 +34,24 @@ void serializeFieldByEscapingRule(
void writeStringByEscapingRule(const String & value, WriteBuffer & out, FormatSettings::EscapingRule escaping_rule, const FormatSettings & format_settings);
String readStringByEscapingRule(ReadBuffer & buf, FormatSettings::EscapingRule escaping_rule, const FormatSettings & format_settings);
+String readFieldByEscapingRule(ReadBuffer & buf, FormatSettings::EscapingRule escaping_rule, const FormatSettings & format_settings);
+
+/// Try to determine the type of the field written by a specific escaping rule.
+/// If cannot, return nullptr.
+/// - For Quoted escaping rule we can interpret a single field as a constant
+/// expression and get its type by evaluating this expression.
+/// - For JSON escaping rule we can use JSON parser to parse a single field
+/// and then convert JSON type of this field to ClickHouse type.
+/// - For CSV escaping rule we can do the next:
+/// - If the field is an unquoted string, then we could try to evaluate it
+/// as a constant expression, and if it fails, treat it as a String.
+/// - If the field is a string in quotes, then we can try to evaluate
+/// expression inside quotes as a constant expression, and if it fails or
+/// the result is a number (we don't parse numbers in quotes) we treat it as a String.
+/// - For TSV and TSVRaw we treat each field as a String (TODO: try to use some tweaks and heuristics here)
+DataTypePtr determineDataTypeByEscapingRule(const String & field, const FormatSettings & format_settings, FormatSettings::EscapingRule escaping_rule, ContextPtr context = nullptr);
+DataTypes determineDataTypesByEscapingRule(const std::vector<String> & fields, const FormatSettings & format_settings, FormatSettings::EscapingRule escaping_rule, ContextPtr context = nullptr);
+
+DataTypePtr getDefaultDataTypeForEscapingRule(FormatSettings::EscapingRule escaping_rule);
}
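A sketch of how the CSV branch of `determineDataTypeByEscapingRule` is expected to behave for a few raw fields (illustration only, not part of the patch; it assumes default `FormatSettings` and a valid query context):

```cpp
#include <Formats/EscapingRuleUtils.h>

void sketch(DB::ContextPtr context)
{
    DB::FormatSettings settings;
    using Rule = DB::FormatSettings::EscapingRule;

    // Unquoted field: parsed as a constant expression, so a numeric type is inferred.
    [[maybe_unused]] auto t1 = DB::determineDataTypeByEscapingRule("42", settings, Rule::CSV, context);

    // Number inside quotes: deliberately kept as Nullable(String).
    [[maybe_unused]] auto t2 = DB::determineDataTypeByEscapingRule("\"42\"", settings, Rule::CSV, context);

    // Empty field (or the NULL representation): nullptr, i.e. the type is unknown for this row.
    [[maybe_unused]] auto t3 = DB::determineDataTypeByEscapingRule("", settings, Rule::CSV, context);
}
```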
diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp
index 467fdfddafe..2068de0d01c 100644
--- a/src/Formats/FormatFactory.cpp
+++ b/src/Formats/FormatFactory.cpp
@@ -14,9 +14,6 @@
#include
#include
-#include
-#include
-
namespace DB
{
@@ -120,6 +117,8 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.defaults_for_omitted_fields = settings.input_format_defaults_for_omitted_fields;
format_settings.capn_proto.enum_comparing_mode = settings.format_capn_proto_enum_comparising_mode;
format_settings.seekable_read = settings.input_format_allow_seeks;
+ format_settings.msgpack.number_of_columns = settings.input_format_msgpack_number_of_columns;
+ format_settings.max_rows_to_read_for_schema_inference = settings.input_format_max_rows_to_read_for_schema_inference;
/// Validate avro_schema_registry_url with RemoteHostFilter when non-empty and in Server context
if (format_settings.schema.is_server)
@@ -201,7 +200,6 @@ InputFormatPtr FormatFactory::getInput(
return format;
}
-
InputFormatPtr FormatFactory::getInputFormat(
const String & name,
ReadBuffer & buf,
@@ -342,6 +340,32 @@ String FormatFactory::getContentType(
return format->getContentType();
}
+SchemaReaderPtr FormatFactory::getSchemaReader(
+ const String & name,
+ ReadBuffer & buf,
+ ContextPtr context,
+ const std::optional<FormatSettings> & _format_settings) const
+{
+ const auto & schema_reader_creator = dict.at(name).schema_reader_creator;
+ if (!schema_reader_creator)
+ throw Exception("FormatFactory: Format " + name + " doesn't support schema inference.", ErrorCodes::LOGICAL_ERROR);
+
+ auto format_settings = _format_settings ? *_format_settings : getFormatSettings(context);
+ return schema_reader_creator(buf, format_settings, context);
+}
+
+ExternalSchemaReaderPtr FormatFactory::getExternalSchemaReader(
+ const String & name,
+ ContextPtr context,
+ const std::optional<FormatSettings> & _format_settings) const
+{
+ const auto & external_schema_reader_creator = dict.at(name).external_schema_reader_creator;
+ if (!external_schema_reader_creator)
+ throw Exception("FormatFactory: Format " + name + " doesn't support schema inference.", ErrorCodes::LOGICAL_ERROR);
+
+ auto format_settings = _format_settings ? *_format_settings : getFormatSettings(context);
+ return external_schema_reader_creator(format_settings);
+}
void FormatFactory::registerInputFormat(const String & name, InputCreator input_creator)
{
@@ -375,6 +399,21 @@ void FormatFactory::registerFileSegmentationEngine(const String & name, FileSegm
target = std::move(file_segmentation_engine);
}
+void FormatFactory::registerSchemaReader(const String & name, SchemaReaderCreator schema_reader_creator)
+{
+ auto & target = dict[name].schema_reader_creator;
+ if (target)
+ throw Exception("FormatFactory: Schema reader " + name + " is already registered", ErrorCodes::LOGICAL_ERROR);
+ target = std::move(schema_reader_creator);
+}
+
+void FormatFactory::registerExternalSchemaReader(const String & name, ExternalSchemaReaderCreator external_schema_reader_creator)
+{
+ auto & target = dict[name].external_schema_reader_creator;
+ if (target)
+ throw Exception("FormatFactory: Schema reader " + name + " is already registered", ErrorCodes::LOGICAL_ERROR);
+ target = std::move(external_schema_reader_creator);
+}
void FormatFactory::markOutputFormatSupportsParallelFormatting(const String & name)
{
@@ -412,6 +451,23 @@ bool FormatFactory::isOutputFormat(const String & name) const
return it != dict.end() && it->second.output_creator;
}
+bool FormatFactory::checkIfFormatHasSchemaReader(const String & name)
+{
+ const auto & target = getCreators(name);
+ return bool(target.schema_reader_creator);
+}
+
+bool FormatFactory::checkIfFormatHasExternalSchemaReader(const String & name)
+{
+ const auto & target = getCreators(name);
+ return bool(target.external_schema_reader_creator);
+}
+
+bool FormatFactory::checkIfFormatHasAnySchemaReader(const String & name)
+{
+ return checkIfFormatHasSchemaReader(name) || checkIfFormatHasExternalSchemaReader(name);
+}
+
FormatFactory & FormatFactory::instance()
{
static FormatFactory ret;
diff --git a/src/Formats/FormatFactory.h b/src/Formats/FormatFactory.h
index ea285c47996..a62b32da0cc 100644
--- a/src/Formats/FormatFactory.h
+++ b/src/Formats/FormatFactory.h
@@ -4,7 +4,9 @@
#include
#include
#include
+#include
#include
+#include
#include
@@ -31,6 +33,11 @@ class IOutputFormat;
struct RowInputFormatParams;
struct RowOutputFormatParams;
+class ISchemaReader;
+class IExternalSchemaReader;
+using SchemaReaderPtr = std::shared_ptr<ISchemaReader>;
+using ExternalSchemaReaderPtr = std::shared_ptr<IExternalSchemaReader>;
+
using InputFormatPtr = std::shared_ptr<IInputFormat>;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
@@ -85,11 +92,16 @@ private:
/// The checker should return true if parallel parsing should be disabled.
using NonTrivialPrefixAndSuffixChecker = std::function<bool(ReadBuffer & buf)>;
+ using SchemaReaderCreator = std::function<SchemaReaderPtr(ReadBuffer & buf, const FormatSettings & settings, ContextPtr context)>;
+ using ExternalSchemaReaderCreator = std::function<ExternalSchemaReaderPtr(const FormatSettings & settings)>;
+
struct Creators
{
InputCreator input_creator;
OutputCreator output_creator;
FileSegmentationEngine file_segmentation_engine;
+ SchemaReaderCreator schema_reader_creator;
+ ExternalSchemaReaderCreator external_schema_reader_creator;
bool supports_parallel_formatting{false};
bool is_column_oriented{false};
NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker;
@@ -138,6 +150,17 @@ public:
ContextPtr context,
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
+ SchemaReaderPtr getSchemaReader(
+ const String & name,
+ ReadBuffer & buf,
+ ContextPtr context,
+ const std::optional<FormatSettings> & format_settings = std::nullopt) const;
+
+ ExternalSchemaReaderPtr getExternalSchemaReader(
+ const String & name,
+ ContextPtr context,
+ const std::optional<FormatSettings> & format_settings = std::nullopt) const;
+
void registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine);
void registerNonTrivialPrefixAndSuffixChecker(const String & name, NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker);
@@ -146,11 +169,19 @@ public:
void registerInputFormat(const String & name, InputCreator input_creator);
void registerOutputFormat(const String & name, OutputCreator output_creator);
+ /// Register schema readers for a format by its name.
+ void registerSchemaReader(const String & name, SchemaReaderCreator schema_reader_creator);
+ void registerExternalSchemaReader(const String & name, ExternalSchemaReaderCreator external_schema_reader_creator);
+
void markOutputFormatSupportsParallelFormatting(const String & name);
void markFormatAsColumnOriented(const String & name);
bool checkIfFormatIsColumnOriented(const String & name);
+ bool checkIfFormatHasSchemaReader(const String & name);
+ bool checkIfFormatHasExternalSchemaReader(const String & name);
+ bool checkIfFormatHasAnySchemaReader(const String & name);
+
const FormatsDictionary & getAllFormats() const
{
return dict;
@@ -163,6 +194,7 @@ private:
FormatsDictionary dict;
const Creators & getCreators(const String & name) const;
+
};
}
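A hypothetical registration sketch (not from the repository) showing how a format could plug into the new schema inference hooks; `MySchemaReader` is an assumed class implementing `ISchemaReader`, and the lambda signature mirrors the `SchemaReaderCreator` alias above.

```cpp
void registerMyFormatSchemaReader(DB::FormatFactory & factory)
{
    // Hypothetical: MySchemaReader is assumed to implement ISchemaReader.
    factory.registerSchemaReader("MyFormat", [](DB::ReadBuffer & buf, const DB::FormatSettings & settings, DB::ContextPtr /* context */)
    {
        return std::make_shared<MySchemaReader>(buf, settings);
    });
}
```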
diff --git a/src/Formats/FormatSettings.h b/src/Formats/FormatSettings.h
index d9af07fdc9c..6298e959c3e 100644
--- a/src/Formats/FormatSettings.h
+++ b/src/Formats/FormatSettings.h
@@ -33,6 +33,7 @@ struct FormatSettings
bool defaults_for_omitted_fields = true;
bool seekable_read = true;
+ UInt64 max_rows_to_read_for_schema_inference = 100;
enum class DateTimeInputFormat
{
@@ -217,6 +218,11 @@ struct FormatSettings
{
EnumComparingMode enum_comparing_mode = EnumComparingMode::BY_VALUES;
} capn_proto;
+
+ struct
+ {
+ UInt64 number_of_columns = 0;
+ } msgpack;
};
}
diff --git a/src/Formats/JSONEachRowUtils.cpp b/src/Formats/JSONEachRowUtils.cpp
index b55e9f59cc7..c63b8453634 100644
--- a/src/Formats/JSONEachRowUtils.cpp
+++ b/src/Formats/JSONEachRowUtils.cpp
@@ -1,7 +1,17 @@
#include
#include
+#include
#include
#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
#include
@@ -26,7 +36,7 @@ static std::pair fileSegmentationEngineJSONEachRowImpl(ReadBuffer
while (loadAtPosition(in, memory, pos) && (balance || memory.size() + static_cast<size_t>(pos - in.position()) < min_chunk_size || number_of_rows < min_rows))
{
const auto current_object_size = memory.size() + static_cast<size_t>(pos - in.position());
- if (current_object_size > 10 * min_chunk_size)
+ if (min_chunk_size != 0 && current_object_size > 10 * min_chunk_size)
throw ParsingException("Size of JSON object is extremely large. Expected not greater than " +
std::to_string(min_chunk_size) + " bytes, but current is " + std::to_string(current_object_size) +
" bytes per row. Increase the value setting 'min_chunk_bytes_for_parallel_parsing' or check your data manually, most likely JSON is malformed", ErrorCodes::INCORRECT_DATA);
@@ -92,6 +102,122 @@ static std::pair fileSegmentationEngineJSONEachRowImpl(ReadBuffer
return {loadAtPosition(in, memory, pos), number_of_rows};
}
+template <const char opening_bracket, const char closing_bracket>
+static String readJSONEachRowLineIntoStringImpl(ReadBuffer & in)
+{
+ Memory memory;
+ fileSegmentationEngineJSONEachRowImpl<opening_bracket, closing_bracket>(in, memory, 0, 1);
+ return String(memory.data(), memory.size());
+}
+
+template <class Element>
+DataTypePtr getDataTypeFromJSONFieldImpl(const Element & field)
+{
+ if (field.isNull())
+ return nullptr;
+
+ if (field.isBool())
+ return makeNullable(std::make_shared<DataTypeUInt8>());
+
+ if (field.isInt64() || field.isUInt64() || field.isDouble())
+ return makeNullable(std::make_shared<DataTypeFloat64>());
+
+ if (field.isString())
+ return makeNullable(std::make_shared<DataTypeString>());
+
+ if (field.isArray())
+ {
+ auto array = field.getArray();
+
+ /// Return nullptr in case of empty array because we cannot determine nested type.
+ if (array.size() == 0)
+ return nullptr;
+
+ DataTypes nested_data_types;
+ /// If this array contains fields with different types we will treat it as Tuple.
+ bool is_tuple = false;
+ for (const auto element : array)
+ {
+ auto type = getDataTypeFromJSONFieldImpl(element);
+ if (!type)
+ return nullptr;
+
+ if (!nested_data_types.empty() && type->getName() != nested_data_types.back()->getName())
+ is_tuple = true;
+
+ nested_data_types.push_back(std::move(type));
+ }
+
+ if (is_tuple)
+ return std::make_shared<DataTypeTuple>(nested_data_types);
+
+ return std::make_shared<DataTypeArray>(nested_data_types.back());
+ }
+
+ if (field.isObject())
+ {
+ auto object = field.getObject();
+ DataTypePtr value_type;
+ for (const auto key_value_pair : object)
+ {
+ auto type = getDataTypeFromJSONFieldImpl(key_value_pair.second);
+ if (!type)
+ return nullptr;
+
+ if (value_type && value_type->getName() != type->getName())
+ return nullptr;
+
+ value_type = type;
+ }
+ return std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), value_type);
+ }
+
+ throw Exception{ErrorCodes::INCORRECT_DATA, "Unexpected JSON type"};
+}
+
+auto getJSONParserAndElement()
+{
+#if USE_SIMDJSON
+ return std::pair<SimdJSONParser, SimdJSONParser::Element>();
+#elif USE_RAPIDJSON
+ return std::pair<RapidJSONParser, RapidJSONParser::Element>();
+#else
+ return std::pair<DummyJSONParser, DummyJSONParser::Element>();
+#endif
+}
+
+DataTypePtr getDataTypeFromJSONField(const String & field)
+{
+ auto [parser, element] = getJSONParserAndElement();
+ bool parsed = parser.parse(field, element);
+ if (!parsed)
+ throw Exception(ErrorCodes::INCORRECT_DATA, "Cannot parse JSON object");
+
+ return getDataTypeFromJSONFieldImpl(element);
+}
+
+template
+static DataTypes determineColumnDataTypesFromJSONEachRowDataImpl(ReadBuffer & in, bool /*json_strings*/, Extractor & extractor)
+{
+ String line = readJSONEachRowLineIntoStringImpl(in);
+ auto [parser, element] = getJSONParserAndElement();
+ bool parsed = parser.parse(line, element);
+ if (!parsed)
+ throw Exception(ErrorCodes::INCORRECT_DATA, "Cannot parse JSON object");
+
+ auto fields = extractor.extract(element);
+
+ DataTypes data_types;
+ data_types.reserve(fields.size());
+ for (const auto & field : fields)
+ data_types.push_back(getDataTypeFromJSONFieldImpl(field));
+
+ /// TODO: For JSONStringsEachRow/JSONCompactStringsEachRow all types will be strings.
+ /// Should we try to parse the data inside the strings somehow in this case?
+
+ return data_types;
+}
+
std::pair fileSegmentationEngineJSONEachRow(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
{
return fileSegmentationEngineJSONEachRowImpl<'{', '}'>(in, memory, min_chunk_size, 1);
@@ -102,6 +228,60 @@ std::pair fileSegmentationEngineJSONCompactEachRow(ReadBuffer & in
return fileSegmentationEngineJSONEachRowImpl<'[', ']'>(in, memory, min_chunk_size, min_rows);
}
+struct JSONEachRowFieldsExtractor
+{
+ template
+ std::vector extract(const Element & element)
+ {
+ /// {..., "<column_name>" : <value>, ...}
+ auto object = element.getObject();
+ std::vector fields;
+ fields.reserve(object.size());
+ column_names.reserve(object.size());
+ for (const auto & key_value_pair : object)
+ {
+ column_names.emplace_back(key_value_pair.first);
+ fields.push_back(key_value_pair.second);
+ }
+
+ return fields;
+ }
+
+ std::vector column_names;
+};
+
+std::unordered_map<String, DataTypePtr> readRowAndGetNamesAndDataTypesForJSONEachRow(ReadBuffer & in, bool json_strings)
+{
+ JSONEachRowFieldsExtractor extractor;
+ auto data_types = determineColumnDataTypesFromJSONEachRowDataImpl(in, json_strings, extractor);
+ std::unordered_map<String, DataTypePtr> result;
+ for (size_t i = 0; i != extractor.column_names.size(); ++i)
+ result[extractor.column_names[i]] = data_types[i];
+ return result;
+}
+
+struct JSONCompactEachRowFieldsExtractor
+{
+ template
+ std::vector extract(const Element & element)
+ {
+ /// [..., <value>, ...]
+ auto array = element.getArray();
+ std::vector fields;
+ fields.reserve(array.size());
+ for (size_t i = 0; i != array.size(); ++i)
+ fields.push_back(array[i]);
+ return fields;
+ }
+};
+
+DataTypes readRowAndGetDataTypesForJSONCompactEachRow(ReadBuffer & in, bool json_strings)
+{
+ JSONCompactEachRowFieldsExtractor extractor;
+ return determineColumnDataTypesFromJSONEachRowDataImpl(in, json_strings, extractor);
+}
+
+
bool nonTrivialPrefixAndSuffixCheckerJSONEachRowImpl(ReadBuffer & buf)
{
/// For JSONEachRow we can safely skip whitespace characters
diff --git a/src/Formats/JSONEachRowUtils.h b/src/Formats/JSONEachRowUtils.h
index 4a049aa1abd..6f71baa8b40 100644
--- a/src/Formats/JSONEachRowUtils.h
+++ b/src/Formats/JSONEachRowUtils.h
@@ -11,6 +11,21 @@ namespace DB
std::pair fileSegmentationEngineJSONEachRow(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size);
std::pair fileSegmentationEngineJSONCompactEachRow(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size, size_t min_rows);
+
+/// Parse JSON from a string and convert its type to a ClickHouse type. Make the result type always Nullable.
+/// A JSON array with different nested types is treated as Tuple.
+/// If the type cannot be converted (for example, when the field contains null), return nullptr.
+DataTypePtr getDataTypeFromJSONField(const String & field);
+
+/// Read a row in JSONEachRow format and try to determine the type of each field.
+/// Return a map {column_name : type}.
+/// If the type of some field cannot be determined, return nullptr for it.
+std::unordered_map<String, DataTypePtr> readRowAndGetNamesAndDataTypesForJSONEachRow(ReadBuffer & in, bool json_strings);
+
+/// Read a row in JSONCompactEachRow format and try to determine the type of each field.
+/// If the type of some field cannot be determined, return nullptr for it.
+DataTypes readRowAndGetDataTypesForJSONCompactEachRow(ReadBuffer & in, bool json_strings);
+
bool nonTrivialPrefixAndSuffixCheckerJSONEachRowImpl(ReadBuffer & buf);
bool readFieldImpl(ReadBuffer & in, IColumn & column, const DataTypePtr & type, const SerializationPtr & serialization, const String & column_name, const FormatSettings & format_settings, bool yield_strings);
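The inference rules documented above (scalars become Nullable types, an array whose elements share one type becomes Array, an array with mixed element types becomes Tuple, and a null or an empty array yields no type) can be illustrated with a small self-contained sketch over a toy JSON model. The type names and the number-to-Nullable(Float64) / string-to-Nullable(String) mapping below are assumptions for illustration only, not the ClickHouse implementation:

```cpp
#include <cstddef>
#include <iostream>
#include <optional>
#include <string>
#include <variant>
#include <vector>

// Toy JSON value model (hypothetical; the real code walks simdjson/rapidjson elements).
struct JsonValue
{
    std::variant<std::nullptr_t, double, std::string, std::vector<JsonValue>> data;
};

// Return a ClickHouse-style type name, or nullopt when the type cannot be determined
// (a null value or an empty array), mirroring the "return nullptr" cases in the code above.
std::optional<std::string> inferType(const JsonValue & value)
{
    if (std::holds_alternative<std::nullptr_t>(value.data))
        return std::nullopt;
    if (std::holds_alternative<double>(value.data))
        return "Nullable(Float64)";
    if (std::holds_alternative<std::string>(value.data))
        return "Nullable(String)";

    const auto & array = std::get<std::vector<JsonValue>>(value.data);
    if (array.empty())
        return std::nullopt;

    std::vector<std::string> nested;
    bool is_tuple = false;
    for (const auto & element : array)
    {
        auto type = inferType(element);
        if (!type)
            return std::nullopt;
        if (!nested.empty() && *type != nested.back())
            is_tuple = true;  // elements of different types: treat the whole array as a Tuple
        nested.push_back(*type);
    }

    if (!is_tuple)
        return "Array(" + nested.back() + ")";

    std::string result = "Tuple(";
    for (std::size_t i = 0; i != nested.size(); ++i)
        result += (i ? ", " : "") + nested[i];
    return result + ")";
}

int main()
{
    JsonValue mixed{std::vector<JsonValue>{JsonValue{42.0}, JsonValue{std::string("abc")}}};
    std::cout << inferType(mixed).value_or("unknown") << "\n";  // Tuple(Nullable(Float64), Nullable(String))
}
```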
diff --git a/src/Formats/ParsedTemplateFormatString.cpp b/src/Formats/ParsedTemplateFormatString.cpp
index 4966420f05b..8d1b987d01a 100644
--- a/src/Formats/ParsedTemplateFormatString.cpp
+++ b/src/Formats/ParsedTemplateFormatString.cpp
@@ -14,14 +14,14 @@ namespace ErrorCodes
extern const int INVALID_TEMPLATE_FORMAT;
}
-ParsedTemplateFormatString::ParsedTemplateFormatString(const FormatSchemaInfo & schema, const ColumnIdxGetter & idx_by_name)
+ParsedTemplateFormatString::ParsedTemplateFormatString(const FormatSchemaInfo & schema, const ColumnIdxGetter & idx_by_name, bool allow_indexes)
{
ReadBufferFromFile schema_file(schema.absoluteSchemaPath(), 4096);
String format_string;
readStringUntilEOF(format_string, schema_file);
try
{
- parse(format_string, idx_by_name);
+ parse(format_string, idx_by_name, allow_indexes);
}
catch (DB::Exception & e)
{
@@ -33,7 +33,7 @@ ParsedTemplateFormatString::ParsedTemplateFormatString(const FormatSchemaInfo &
}
-void ParsedTemplateFormatString::parse(const String & format_string, const ColumnIdxGetter & idx_by_name)
+void ParsedTemplateFormatString::parse(const String & format_string, const ColumnIdxGetter & idx_by_name, bool allow_indexes)
{
enum ParserState
{
@@ -100,6 +100,8 @@ void ParsedTemplateFormatString::parse(const String & format_string, const Colum
column_idx = strtoull(column_names.back().c_str(), &col_idx_end, 10);
if (col_idx_end != column_names.back().c_str() + column_names.back().size() || errno)
column_idx = idx_by_name(column_names.back());
+ else if (!allow_indexes)
+ throw Exception(ErrorCodes::INVALID_TEMPLATE_FORMAT, "Indexes instead of names are not allowed");
}
format_idx_to_column_idx.emplace_back(column_idx);
break;
diff --git a/src/Formats/ParsedTemplateFormatString.h b/src/Formats/ParsedTemplateFormatString.h
index ba0ebdf5aa8..c5617d0f0ef 100644
--- a/src/Formats/ParsedTemplateFormatString.h
+++ b/src/Formats/ParsedTemplateFormatString.h
@@ -31,9 +31,9 @@ struct ParsedTemplateFormatString
typedef std::function(const String &)> ColumnIdxGetter;
ParsedTemplateFormatString() = default;
- ParsedTemplateFormatString(const FormatSchemaInfo & schema, const ColumnIdxGetter & idx_by_name);
+ ParsedTemplateFormatString(const FormatSchemaInfo & schema, const ColumnIdxGetter & idx_by_name, bool allow_indexes = true);
- void parse(const String & format_string, const ColumnIdxGetter & idx_by_name);
+ void parse(const String & format_string, const ColumnIdxGetter & idx_by_name, bool allow_indexes = true);
static const char * readMayBeQuotedColumnNameInto(const char * pos, size_t size, String & s);
size_t columnsCount() const;
diff --git a/src/Formats/ProtobufSerializer.cpp b/src/Formats/ProtobufSerializer.cpp
index 5232b76b7fe..b59db12a16c 100644
--- a/src/Formats/ProtobufSerializer.cpp
+++ b/src/Formats/ProtobufSerializer.cpp
@@ -24,6 +24,7 @@
# include
# include
# include
+# include
# include
# include
# include
@@ -56,6 +57,7 @@ namespace ErrorCodes
extern const int PROTOBUF_FIELD_NOT_REPEATED;
extern const int PROTOBUF_BAD_CAST;
extern const int LOGICAL_ERROR;
+ extern const int BAD_ARGUMENTS;
}
namespace
@@ -3017,10 +3019,8 @@ namespace
{
std::vector column_names_used;
column_names_used.reserve(used_column_indices_in_nested.size());
-
for (size_t i : used_column_indices_in_nested)
column_names_used.emplace_back(nested_column_names[i]);
-
auto field_serializer = std::make_unique(
std::move(column_names_used), field_descriptor, std::move(nested_message_serializer), get_root_desc_function);
transformColumnIndices(used_column_indices_in_nested, nested_column_indices);
@@ -3230,8 +3230,105 @@ namespace
std::function get_root_desc_function;
std::shared_ptr root_serializer_ptr;
};
-}
+ template
+ DataTypePtr getEnumDataType(const google::protobuf::EnumDescriptor * enum_descriptor)
+ {
+ std::vector> values;
+ for (int i = 0; i != enum_descriptor->value_count(); ++i)
+ {
+ const auto * enum_value_descriptor = enum_descriptor->value(i);
+ values.emplace_back(enum_value_descriptor->name(), enum_value_descriptor->number());
+ }
+ return std::make_shared>(std::move(values));
+ }
+
+ NameAndTypePair getNameAndDataTypeFromField(const google::protobuf::FieldDescriptor * field_descriptor, bool allow_repeat = true)
+ {
+ if (allow_repeat && field_descriptor->is_map())
+ {
+ auto name_and_type = getNameAndDataTypeFromField(field_descriptor, false);
+ const auto * tuple_type = assert_cast(name_and_type.type.get());
+ return {name_and_type.name, std::make_shared(tuple_type->getElements())};
+ }
+
+ if (allow_repeat && field_descriptor->is_repeated())
+ {
+ auto name_and_type = getNameAndDataTypeFromField(field_descriptor, false);
+ return {name_and_type.name, std::make_shared(name_and_type.type)};
+ }
+
+ switch (field_descriptor->type())
+ {
+ case FieldTypeId::TYPE_SFIXED32: [[fallthrough]];
+ case FieldTypeId::TYPE_SINT32: [[fallthrough]];
+ case FieldTypeId::TYPE_INT32:
+ return {field_descriptor->name(), std::make_shared<DataTypeInt32>()};
+ case FieldTypeId::TYPE_SFIXED64: [[fallthrough]];
+ case FieldTypeId::TYPE_SINT64: [[fallthrough]];
+ case FieldTypeId::TYPE_INT64:
+ return {field_descriptor->name(), std::make_shared<DataTypeInt64>()};
+ case FieldTypeId::TYPE_BOOL:
+ return {field_descriptor->name(), std::make_shared<DataTypeUInt8>()};
+ case FieldTypeId::TYPE_FLOAT:
+ return {field_descriptor->name(), std::make_shared<DataTypeFloat32>()};
+ case FieldTypeId::TYPE_DOUBLE:
+ return {field_descriptor->name(), std::make_shared<DataTypeFloat64>()};
+ case FieldTypeId::TYPE_UINT32: [[fallthrough]];
+ case FieldTypeId::TYPE_FIXED32:
+ return {field_descriptor->name(), std::make_shared<DataTypeUInt32>()};
+ case FieldTypeId::TYPE_UINT64: [[fallthrough]];
+ case FieldTypeId::TYPE_FIXED64:
+ return {field_descriptor->name(), std::make_shared<DataTypeUInt64>()};
+ case FieldTypeId::TYPE_BYTES: [[fallthrough]];
+ case FieldTypeId::TYPE_STRING:
+ return {field_descriptor->name(), std::make_shared<DataTypeString>()};
+ case FieldTypeId::TYPE_ENUM:
+ {
+ const auto * enum_descriptor = field_descriptor->enum_type();
+ if (enum_descriptor->value_count() == 0)
+ throw Exception("Empty enum field", ErrorCodes::BAD_ARGUMENTS);
+ int max_abs = std::abs(enum_descriptor->value(0)->number());
+ for (int i = 1; i != enum_descriptor->value_count(); ++i)
+ {
+ if (std::abs(enum_descriptor->value(i)->number()) > max_abs)
+ max_abs = std::abs(enum_descriptor->value(i)->number());
+ }
+ if (max_abs < 128)
+ return {field_descriptor->name(), getEnumDataType<Int8>(enum_descriptor)};
+ else if (max_abs < 32768)
+ return {field_descriptor->name(), getEnumDataType<Int16>(enum_descriptor)};
+ else
+ throw Exception("ClickHouse supports only 8-bit and 16-bit enums", ErrorCodes::BAD_ARGUMENTS);
+ }
+ case FieldTypeId::TYPE_GROUP: [[fallthrough]];
+ case FieldTypeId::TYPE_MESSAGE:
+ {
+ const auto * message_descriptor = field_descriptor->message_type();
+ if (message_descriptor->field_count() == 1)
+ {
+ const auto * nested_field_descriptor = message_descriptor->field(0);
+ auto nested_name_and_type = getNameAndDataTypeFromField(nested_field_descriptor);
+ return {field_descriptor->name() + "_" + nested_name_and_type.name, nested_name_and_type.type};
+ }
+ else
+ {
+ DataTypes nested_types;
+ Strings nested_names;
+ for (int i = 0; i != message_descriptor->field_count(); ++i)
+ {
+ auto nested_name_and_type = getNameAndDataTypeFromField(message_descriptor->field(i));
+ nested_types.push_back(nested_name_and_type.type);
+ nested_names.push_back(nested_name_and_type.name);
+ }
+ return {field_descriptor->name(), std::make_shared(std::move(nested_types), std::move(nested_names))};
+ }
+ }
+ }
+
+ __builtin_unreachable();
+ }
+}
std::unique_ptr ProtobufSerializer::create(
const Strings & column_names,
@@ -3254,5 +3351,14 @@ std::unique_ptr ProtobufSerializer::create(
std::vector missing_column_indices;
return ProtobufSerializerBuilder(writer).buildMessageSerializer(column_names, data_types, missing_column_indices, message_descriptor, with_length_delimiter);
}
+
+NamesAndTypesList protobufSchemaToCHSchema(const google::protobuf::Descriptor * message_descriptor)
+{
+ NamesAndTypesList schema;
+ for (int i = 0; i != message_descriptor->field_count(); ++i)
+ schema.push_back(getNameAndDataTypeFromField(message_descriptor->field(i)));
+ return schema;
+}
+
}
#endif
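The enum branch above picks the narrowest ClickHouse enum that can hold every declared protobuf value. A minimal sketch of that selection rule, with pickEnumType and the returned type names as hypothetical stand-ins:

```cpp
#include <algorithm>
#include <cstdlib>
#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

// Pick the narrowest enum type that fits all declared protobuf enum values,
// mirroring the max-absolute-value check above (illustrative helper, not the real code).
std::string pickEnumType(const std::vector<int> & values)
{
    if (values.empty())
        throw std::invalid_argument("Empty enum field");

    int max_abs = 0;
    for (int value : values)
        max_abs = std::max(max_abs, std::abs(value));

    if (max_abs < 128)
        return "Enum8";
    if (max_abs < 32768)
        return "Enum16";
    throw std::invalid_argument("ClickHouse supports only 8-bit and 16-bit enums");
}

int main()
{
    std::cout << pickEnumType({0, 1, 127}) << " " << pickEnumType({0, 300}) << "\n";  // Enum8 Enum16
}
```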
diff --git a/src/Formats/ProtobufSerializer.h b/src/Formats/ProtobufSerializer.h
index 3eaca6a18d6..d9bed913517 100644
--- a/src/Formats/ProtobufSerializer.h
+++ b/src/Formats/ProtobufSerializer.h
@@ -4,6 +4,7 @@
#if USE_PROTOBUF
# include
+#include
namespace google::protobuf { class Descriptor; }
@@ -48,5 +49,7 @@ public:
ProtobufWriter & writer);
};
+NamesAndTypesList protobufSchemaToCHSchema(const google::protobuf::Descriptor * message_descriptor);
+
}
#endif
diff --git a/src/Formats/ReadSchemaUtils.cpp b/src/Formats/ReadSchemaUtils.cpp
new file mode 100644
index 00000000000..37067eae64f
--- /dev/null
+++ b/src/Formats/ReadSchemaUtils.cpp
@@ -0,0 +1,112 @@
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
+ extern const int BAD_ARGUMENTS;
+}
+
+ColumnsDescription readSchemaFromFormat(const String & format_name, const std::optional & format_settings, ReadBufferCreator read_buffer_creator, ContextPtr context)
+{
+ NamesAndTypesList names_and_types;
+ if (FormatFactory::instance().checkIfFormatHasExternalSchemaReader(format_name))
+ {
+ auto external_schema_reader = FormatFactory::instance().getExternalSchemaReader(format_name, context, format_settings);
+ try
+ {
+ names_and_types = external_schema_reader->readSchema();
+ }
+ catch (const DB::Exception & e)
+ {
+ throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot extract table structure from {} format file. Error: {}", format_name, e.message());
+ }
+ }
+ else if (FormatFactory::instance().checkIfFormatHasSchemaReader(format_name))
+ {
+ auto read_buf = read_buffer_creator();
+ if (read_buf->eof())
+ throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot extract table structure from {} format file, file is empty", format_name);
+
+ auto schema_reader = FormatFactory::instance().getSchemaReader(format_name, *read_buf, context, format_settings);
+ try
+ {
+ names_and_types = schema_reader->readSchema();
+ }
+ catch (const DB::Exception & e)
+ {
+ throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot extract table structure from {} format file. Error: {}", format_name, e.message());
+ }
+ }
+ else
+ throw Exception(ErrorCodes::BAD_ARGUMENTS, "{} file format doesn't support schema inference", format_name);
+
+ return ColumnsDescription(names_and_types);
+}
+
+DataTypePtr generalizeDataType(DataTypePtr type)
+{
+ WhichDataType which(type);
+
+ if (which.isNothing())
+ return nullptr;
+
+ if (which.isNullable())
+ {
+ const auto * nullable_type = assert_cast<const DataTypeNullable *>(type.get());
+ return generalizeDataType(nullable_type->getNestedType());
+ }
+
+ if (isNumber(type))
+ return makeNullable(std::make_shared<DataTypeFloat64>());
+
+ if (which.isArray())
+ {
+ const auto * array_type = assert_cast<const DataTypeArray *>(type.get());
+ auto nested_type = generalizeDataType(array_type->getNestedType());
+ return nested_type ? std::make_shared<DataTypeArray>(nested_type) : nullptr;
+ }
+
+ if (which.isTuple())
+ {
+ const auto * tuple_type = assert_cast<const DataTypeTuple *>(type.get());
+ DataTypes nested_types;
+ for (const auto & element : tuple_type->getElements())
+ {
+ auto nested_type = generalizeDataType(element);
+ if (!nested_type)
+ return nullptr;
+ nested_types.push_back(nested_type);
+ }
+ return std::make_shared<DataTypeTuple>(std::move(nested_types));
+ }
+
+ if (which.isMap())
+ {
+ const auto * map_type = assert_cast<const DataTypeMap *>(type.get());
+ auto key_type = removeNullable(generalizeDataType(map_type->getKeyType()));
+ auto value_type = generalizeDataType(map_type->getValueType());
+ return key_type && value_type ? std::make_shared<DataTypeMap>(key_type, value_type) : nullptr;
+ }
+
+ if (which.isLowCarnality())
+ {
+ const auto * lc_type = assert_cast<const DataTypeLowCardinality *>(type.get());
+ auto nested_type = generalizeDataType(lc_type->getDictionaryType());
+ return nested_type ? std::make_shared<DataTypeLowCardinality>(nested_type) : nullptr;
+ }
+
+ return makeNullable(type);
+}
+
+}
diff --git a/src/Formats/ReadSchemaUtils.h b/src/Formats/ReadSchemaUtils.h
new file mode 100644
index 00000000000..fb43acc3cd6
--- /dev/null
+++ b/src/Formats/ReadSchemaUtils.h
@@ -0,0 +1,30 @@
+#pragma once
+
+#include
+#include
+
+namespace DB
+{
+
+/// Try to determine the schema of the data in the specified format.
+/// For formats that have an external schema reader, it will
+/// use it and won't create a read buffer.
+/// For formats that have a schema reader from the data,
+/// read buffer will be created by the provided creator and
+/// the schema will be extracted from the data.
+/// If format doesn't have any schema reader or a schema reader
+/// couldn't determine the schema, an exception will be thrown.
+using ReadBufferCreator = std::function()>;
+ColumnsDescription readSchemaFromFormat(const String & format_name, const std::optional & format_settings, ReadBufferCreator read_buffer_creator, ContextPtr context);
+
+/// Convert type to the most general type:
+/// - IntN, UIntN, FloatN, Decimal -> Float64
+/// - Type -> Nullable(type)
+/// - Array(Type) -> Array(Nullable(Type))
+/// - Tuple(Type1, ..., TypeN) -> Tuple(Nullable(Type1), ..., Nullable(TypeN))
+/// - Map(KeyType, ValueType) -> Map(KeyType, Nullable(ValueType))
+/// - LowCardinality(Type) -> LowCardinality(Nullable(Type))
+/// If type is Nothing or one of the nested types is Nothing, return nullptr.
+DataTypePtr generalizeDataType(DataTypePtr type);
+
+}
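The contract described in this header (formats with an external schema never touch the data, data-driven formats get a read buffer created lazily through the creator callback, and everything else is rejected) can be sketched roughly as follows; readSchema, BufferCreator and the hard-coded format names are simplified stand-ins for the FormatFactory calls, not the real API:

```cpp
#include <functional>
#include <iostream>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <string>

// Hypothetical, simplified version of the dispatch in readSchemaFromFormat.
using BufferCreator = std::function<std::unique_ptr<std::istream>()>;

std::string readSchema(const std::string & format, BufferCreator create_buffer)
{
    if (format == "Protobuf")              // stands in for "format has an external schema reader"
        return "schema from .proto file";  // no read buffer is created at all

    if (format == "JSONEachRow")           // stands in for "format has a schema reader from data"
    {
        auto buf = create_buffer();        // created lazily, only when the data is really needed
        if (buf->peek() == std::char_traits<char>::eof())
            throw std::runtime_error("Cannot extract table structure: file is empty");
        return "schema inferred from data";
    }

    throw std::runtime_error(format + " file format doesn't support schema inference");
}

int main()
{
    auto creator = [] { return std::make_unique<std::istringstream>(R"({"x" : 1})"); };
    std::cout << readSchema("JSONEachRow", creator) << "\n";
}
```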
diff --git a/src/Formats/config_formats.h.in b/src/Formats/config_formats.h.in
index f6497b4830b..427abc7d1ce 100644
--- a/src/Formats/config_formats.h.in
+++ b/src/Formats/config_formats.h.in
@@ -10,4 +10,3 @@
#cmakedefine01 USE_ARROW
#cmakedefine01 USE_PROTOBUF
#cmakedefine01 USE_MSGPACK
-
diff --git a/src/Formats/registerFormats.cpp b/src/Formats/registerFormats.cpp
index 7425c6898de..1349c9e3323 100644
--- a/src/Formats/registerFormats.cpp
+++ b/src/Formats/registerFormats.cpp
@@ -81,6 +81,28 @@ void registerInputFormatCapnProto(FormatFactory & factory);
void registerNonTrivialPrefixAndSuffixCheckerJSONEachRow(FormatFactory & factory);
void registerNonTrivialPrefixAndSuffixCheckerJSONAsString(FormatFactory & factory);
+void registerArrowSchemaReader(FormatFactory & factory);
+void registerParquetSchemaReader(FormatFactory & factory);
+void registerORCSchemaReader(FormatFactory & factory);
+void registerTSVSchemaReader(FormatFactory & factory);
+void registerCSVSchemaReader(FormatFactory & factory);
+void registerJSONCompactEachRowSchemaReader(FormatFactory & factory);
+void registerJSONEachRowSchemaReader(FormatFactory & factory);
+void registerNativeSchemaReader(FormatFactory & factory);
+void registerRowBinaryWithNamesAndTypesSchemaReader(FormatFactory & factory);
+void registerAvroSchemaReader(FormatFactory & factory);
+void registerProtobufSchemaReader(FormatFactory & factory);
+void registerLineAsStringSchemaReader(FormatFactory & factory);
+void registerJSONAsStringSchemaReader(FormatFactory & factory);
+void registerRawBLOBSchemaReader(FormatFactory & factory);
+void registerMsgPackSchemaReader(FormatFactory & factory);
+void registerCapnProtoSchemaReader(FormatFactory & factory);
+void registerCustomSeparatedSchemaReader(FormatFactory & factory);
+void registerRegexpSchemaReader(FormatFactory & factory);
+void registerTSKVSchemaReader(FormatFactory & factory);
+void registerValuesSchemaReader(FormatFactory & factory);
+void registerTemplateSchemaReader(FormatFactory & factory);
+
void registerFormats()
{
auto & factory = FormatFactory::instance();
@@ -152,6 +174,28 @@ void registerFormats()
registerNonTrivialPrefixAndSuffixCheckerJSONEachRow(factory);
registerNonTrivialPrefixAndSuffixCheckerJSONAsString(factory);
+
+ registerArrowSchemaReader(factory);
+ registerParquetSchemaReader(factory);
+ registerORCSchemaReader(factory);
+ registerTSVSchemaReader(factory);
+ registerCSVSchemaReader(factory);
+ registerJSONCompactEachRowSchemaReader(factory);
+ registerJSONEachRowSchemaReader(factory);
+ registerNativeSchemaReader(factory);
+ registerRowBinaryWithNamesAndTypesSchemaReader(factory);
+ registerAvroSchemaReader(factory);
+ registerProtobufSchemaReader(factory);
+ registerLineAsStringSchemaReader(factory);
+ registerJSONAsStringSchemaReader(factory);
+ registerRawBLOBSchemaReader(factory);
+ registerMsgPackSchemaReader(factory);
+ registerCapnProtoSchemaReader(factory);
+ registerCustomSeparatedSchemaReader(factory);
+ registerRegexpSchemaReader(factory);
+ registerTSKVSchemaReader(factory);
+ registerValuesSchemaReader(factory);
+ registerTemplateSchemaReader(factory);
}
}
diff --git a/src/Functions/FunctionsConversion.h b/src/Functions/FunctionsConversion.h
index 8018fa8e726..62e62b5f5dc 100644
--- a/src/Functions/FunctionsConversion.h
+++ b/src/Functions/FunctionsConversion.h
@@ -1835,6 +1835,8 @@ public:
size_t getNumberOfArguments() const override { return 0; }
bool useDefaultImplementationForConstants() const override { return true; }
+ bool canBeExecutedOnDefaultArguments() const override { return false; }
+
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1}; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
diff --git a/src/IO/BrotliReadBuffer.cpp b/src/IO/BrotliReadBuffer.cpp
index b66bbf45054..77069746153 100644
--- a/src/IO/BrotliReadBuffer.cpp
+++ b/src/IO/BrotliReadBuffer.cpp
@@ -39,7 +39,7 @@ BrotliReadBuffer::BrotliReadBuffer(std::unique_ptr in_, size_t buf_s
, in_data(nullptr)
, out_capacity(0)
, out_data(nullptr)
- , eof(false)
+ , eof_flag(false)
{
}
@@ -47,7 +47,7 @@ BrotliReadBuffer::~BrotliReadBuffer() = default;
bool BrotliReadBuffer::nextImpl()
{
- if (eof)
+ if (eof_flag)
return false;
if (!in_available)
@@ -74,7 +74,7 @@ bool BrotliReadBuffer::nextImpl()
{
if (in->eof())
{
- eof = true;
+ eof_flag = true;
return !working_buffer.empty();
}
else
diff --git a/src/IO/BrotliReadBuffer.h b/src/IO/BrotliReadBuffer.h
index 0fa999d1de5..44a7dc7ddbd 100644
--- a/src/IO/BrotliReadBuffer.h
+++ b/src/IO/BrotliReadBuffer.h
@@ -32,7 +32,7 @@ private:
size_t out_capacity;
uint8_t * out_data;
- bool eof;
+ bool eof_flag;
};
}
diff --git a/src/IO/Bzip2ReadBuffer.cpp b/src/IO/Bzip2ReadBuffer.cpp
index df9a8d5b369..c2060612757 100644
--- a/src/IO/Bzip2ReadBuffer.cpp
+++ b/src/IO/Bzip2ReadBuffer.cpp
@@ -42,7 +42,7 @@ Bzip2ReadBuffer::Bzip2ReadBuffer(std::unique_ptr in_, size_t buf_siz
: BufferWithOwnMemory(buf_size, existing_memory, alignment)
, in(std::move(in_))
, bz(std::make_unique())
- , eof(false)
+ , eof_flag(false)
{
}
@@ -50,7 +50,7 @@ Bzip2ReadBuffer::~Bzip2ReadBuffer() = default;
bool Bzip2ReadBuffer::nextImpl()
{
- if (eof)
+ if (eof_flag)
return false;
if (!bz->stream.avail_in)
@@ -72,7 +72,7 @@ bool Bzip2ReadBuffer::nextImpl()
{
if (in->eof())
{
- eof = true;
+ eof_flag = true;
return !working_buffer.empty();
}
else
@@ -91,7 +91,7 @@ bool Bzip2ReadBuffer::nextImpl()
if (in->eof())
{
- eof = true;
+ eof_flag = true;
throw Exception(ErrorCodes::UNEXPECTED_END_OF_FILE, "Unexpected end of bzip2 archive");
}
diff --git a/src/IO/Bzip2ReadBuffer.h b/src/IO/Bzip2ReadBuffer.h
index dc113800683..de1e61ee388 100644
--- a/src/IO/Bzip2ReadBuffer.h
+++ b/src/IO/Bzip2ReadBuffer.h
@@ -26,7 +26,7 @@ private:
class Bzip2StateWrapper;
std::unique_ptr bz;
- bool eof;
+ bool eof_flag;
};
}
diff --git a/src/IO/LZMAInflatingReadBuffer.cpp b/src/IO/LZMAInflatingReadBuffer.cpp
index f2df6bdca6a..80da7421fc3 100644
--- a/src/IO/LZMAInflatingReadBuffer.cpp
+++ b/src/IO/LZMAInflatingReadBuffer.cpp
@@ -7,7 +7,7 @@ namespace ErrorCodes
extern const int LZMA_STREAM_DECODER_FAILED;
}
LZMAInflatingReadBuffer::LZMAInflatingReadBuffer(std::unique_ptr in_, size_t buf_size, char * existing_memory, size_t alignment)
- : BufferWithOwnMemory(buf_size, existing_memory, alignment), in(std::move(in_)), eof(false)
+ : BufferWithOwnMemory(buf_size, existing_memory, alignment), in(std::move(in_)), eof_flag(false)
{
lstr = LZMA_STREAM_INIT;
lstr.allocator = nullptr;
@@ -36,7 +36,7 @@ LZMAInflatingReadBuffer::~LZMAInflatingReadBuffer()
bool LZMAInflatingReadBuffer::nextImpl()
{
- if (eof)
+ if (eof_flag)
return false;
lzma_action action = LZMA_RUN;
@@ -64,7 +64,7 @@ bool LZMAInflatingReadBuffer::nextImpl()
{
if (in->eof())
{
- eof = true;
+ eof_flag = true;
return !working_buffer.empty();
}
else
diff --git a/src/IO/LZMAInflatingReadBuffer.h b/src/IO/LZMAInflatingReadBuffer.h
index 18922f64516..2d676eeeeb3 100644
--- a/src/IO/LZMAInflatingReadBuffer.h
+++ b/src/IO/LZMAInflatingReadBuffer.h
@@ -25,7 +25,7 @@ private:
std::unique_ptr in;
lzma_stream lstr;
- bool eof;
+ bool eof_flag;
};
}
diff --git a/src/IO/Lz4InflatingReadBuffer.cpp b/src/IO/Lz4InflatingReadBuffer.cpp
index 22bce94cad2..61e912d440c 100644
--- a/src/IO/Lz4InflatingReadBuffer.cpp
+++ b/src/IO/Lz4InflatingReadBuffer.cpp
@@ -32,7 +32,7 @@ Lz4InflatingReadBuffer::~Lz4InflatingReadBuffer()
bool Lz4InflatingReadBuffer::nextImpl()
{
- if (eof)
+ if (eof_flag)
return false;
if (!in_available)
@@ -66,7 +66,7 @@ bool Lz4InflatingReadBuffer::nextImpl()
if (in->eof())
{
- eof = true;
+ eof_flag = true;
return !working_buffer.empty();
}
diff --git a/src/IO/Lz4InflatingReadBuffer.h b/src/IO/Lz4InflatingReadBuffer.h
index 0462d85adf7..d4d81f8765c 100644
--- a/src/IO/Lz4InflatingReadBuffer.h
+++ b/src/IO/Lz4InflatingReadBuffer.h
@@ -35,7 +35,7 @@ private:
size_t in_available;
size_t out_available;
- bool eof = false;
+ bool eof_flag = false;
};
}
diff --git a/src/IO/ReadHelpers.cpp b/src/IO/ReadHelpers.cpp
index b0a6838b81e..48811a41edd 100644
--- a/src/IO/ReadHelpers.cpp
+++ b/src/IO/ReadHelpers.cpp
@@ -702,6 +702,25 @@ void readCSVString(String & s, ReadBuffer & buf, const FormatSettings::CSV & set
readCSVStringInto(s, buf, settings);
}
+void readCSVField(String & s, ReadBuffer & buf, const FormatSettings::CSV & settings)
+{
+ s.clear();
+ bool add_quote = false;
+ char quote = '\'';
+
+ if (!buf.eof() && (*buf.position() == '\'' || *buf.position() == '"'))
+ {
+ quote = *buf.position();
+ s.push_back(quote);
+ add_quote = true;
+ }
+
+ readCSVStringInto(s, buf, settings);
+
+ if (add_quote)
+ s.push_back(quote);
+}
+
template void readCSVStringInto>(PaddedPODArray & s, ReadBuffer & buf, const FormatSettings::CSV & settings);
@@ -1212,6 +1231,19 @@ void skipToNextRowOrEof(PeekableReadBuffer & buf, const String & row_after_delim
}
}
+// Use PeekableReadBuffer to copy field to string after parsing.
+template
+static void readParsedValueIntoString(String & s, ReadBuffer & buf, ParseFunc parse_func)
+{
+ PeekableReadBuffer peekable_buf(buf);
+ peekable_buf.setCheckpoint();
+ parse_func(peekable_buf);
+ peekable_buf.makeContinuousMemoryFromCheckpointToPos();
+ auto * end = peekable_buf.position();
+ peekable_buf.rollbackToCheckpoint();
+ s.append(peekable_buf.position(), end);
+ peekable_buf.position() = end;
+}
template
static void readQuotedFieldInBrackets(String & s, ReadBuffer & buf)
@@ -1266,7 +1298,11 @@ void readQuotedFieldIntoString(String & s, ReadBuffer & buf)
/// - Number: integer, float, decimal.
if (*buf.position() == '\'')
- readQuotedString(s, buf);
+ {
+ s.push_back('\'');
+ readQuotedStringInto(s, buf);
+ s.push_back('\'');
+ }
else if (*buf.position() == '[')
readQuotedFieldInBrackets<'[', ']'>(s, buf);
else if (*buf.position() == '(')
@@ -1290,18 +1326,19 @@ void readQuotedFieldIntoString(String & s, ReadBuffer & buf)
else
{
/// It's an integer, float or decimal. They all can be parsed as float.
- /// Use PeekableReadBuffer to copy field to string after parsing.
- PeekableReadBuffer peekable_buf(buf);
- peekable_buf.setCheckpoint();
- Float64 tmp;
- readFloatText(tmp, peekable_buf);
- peekable_buf.makeContinuousMemoryFromCheckpointToPos();
- auto * end = peekable_buf.position();
- peekable_buf.rollbackToCheckpoint();
- s.append(peekable_buf.position(), end);
- peekable_buf.position() = end;
+ auto parse_func = [](ReadBuffer & in)
+ {
+ Float64 tmp;
+ readFloatText(tmp, in);
+ };
+ readParsedValueIntoString(s, buf, parse_func);
}
}
+void readJSONFieldIntoString(String & s, ReadBuffer & buf)
+{
+ auto parse_func = [](ReadBuffer & in) { skipJSONField(in, "json_field"); };
+ readParsedValueIntoString(s, buf, parse_func);
+}
}
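readParsedValueIntoString above relies on the checkpoint/rollback mechanism of PeekableReadBuffer: remember where the field starts, let the parser consume it, then copy exactly the consumed bytes so the field keeps its original textual form. A minimal sketch of that idea with a toy cursor (Cursor and parseNumber are hypothetical, not ClickHouse classes):

```cpp
#include <cctype>
#include <cstddef>
#include <iostream>
#include <string>

// Toy cursor with checkpoint/rollback semantics, illustrating the same trick.
struct Cursor
{
    const std::string & data;
    std::size_t pos = 0;
    std::size_t checkpoint = 0;

    void setCheckpoint() { checkpoint = pos; }
    void rollbackToCheckpoint() { pos = checkpoint; }
};

// Stand-in parse function: consumes a number-looking token without keeping its text.
void parseNumber(Cursor & cur)
{
    while (cur.pos < cur.data.size()
           && (std::isdigit(static_cast<unsigned char>(cur.data[cur.pos])) || cur.data[cur.pos] == '.' || cur.data[cur.pos] == '-'))
        ++cur.pos;
}

// Parse the value, then append the raw bytes the parser consumed to `s`.
template <typename ParseFunc>
void readParsedValueIntoString(std::string & s, Cursor & cur, ParseFunc parse_func)
{
    cur.setCheckpoint();
    parse_func(cur);
    std::size_t end = cur.pos;       // where the parser stopped
    cur.rollbackToCheckpoint();      // go back to the start of the field
    s.append(cur.data, cur.pos, end - cur.pos);
    cur.pos = end;                   // and advance past it again
}

int main()
{
    std::string row = "-3.1415,rest";
    Cursor cur{row};
    std::string field;
    readParsedValueIntoString(field, cur, parseNumber);
    std::cout << field << "\n";  // -3.1415
}
```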
diff --git a/src/IO/ReadHelpers.h b/src/IO/ReadHelpers.h
index b2ad4035cdc..6d1023947a5 100644
--- a/src/IO/ReadHelpers.h
+++ b/src/IO/ReadHelpers.h
@@ -563,6 +563,8 @@ void readStringUntilWhitespace(String & s, ReadBuffer & buf);
*/
void readCSVString(String & s, ReadBuffer & buf, const FormatSettings::CSV & settings);
+/// Differs from readCSVString in that it doesn't remove quotes around the field, if any.
+void readCSVField(String & s, ReadBuffer & buf, const FormatSettings::CSV & settings);
/// Read and append result to array of characters.
template
@@ -1381,4 +1383,7 @@ struct PcgDeserializer
void readQuotedFieldIntoString(String & s, ReadBuffer & buf);
+void readJSONFieldIntoString(String & s, ReadBuffer & buf);
+
}
+
diff --git a/src/IO/ZlibInflatingReadBuffer.cpp b/src/IO/ZlibInflatingReadBuffer.cpp
index 472399dea3d..28426e920ef 100644
--- a/src/IO/ZlibInflatingReadBuffer.cpp
+++ b/src/IO/ZlibInflatingReadBuffer.cpp
@@ -16,7 +16,7 @@ ZlibInflatingReadBuffer::ZlibInflatingReadBuffer(
size_t alignment)
: BufferWithOwnMemory(buf_size, existing_memory, alignment)
, in(std::move(in_))
- , eof(false)
+ , eof_flag(false)
{
zstr.zalloc = nullptr;
zstr.zfree = nullptr;
@@ -54,7 +54,7 @@ bool ZlibInflatingReadBuffer::nextImpl()
do
{
/// if we already found eof, we shouldn't do anything
- if (eof)
+ if (eof_flag)
return false;
/// if there is no available bytes in zstr, move ptr to next available data
@@ -83,7 +83,7 @@ bool ZlibInflatingReadBuffer::nextImpl()
/// * false if there is no data in working buffer
if (in->eof())
{
- eof = true;
+ eof_flag = true;
return !working_buffer.empty();
}
/// If it is not end of file, we need to reset zstr and return true, because we still have some data to read
diff --git a/src/IO/ZlibInflatingReadBuffer.h b/src/IO/ZlibInflatingReadBuffer.h
index b8c141e9b9b..905ab0cd3fc 100644
--- a/src/IO/ZlibInflatingReadBuffer.h
+++ b/src/IO/ZlibInflatingReadBuffer.h
@@ -33,7 +33,7 @@ private:
std::unique_ptr in;
z_stream zstr;
- bool eof;
+ bool eof_flag;
};
}
diff --git a/src/IO/ZstdInflatingReadBuffer.cpp b/src/IO/ZstdInflatingReadBuffer.cpp
index ce89f09f955..6f244dc5a75 100644
--- a/src/IO/ZstdInflatingReadBuffer.cpp
+++ b/src/IO/ZstdInflatingReadBuffer.cpp
@@ -31,7 +31,7 @@ bool ZstdInflatingReadBuffer::nextImpl()
do
{
// If it is known that end of file was reached, return false
- if (eof)
+ if (eof_flag)
return false;
/// If end was reached, get next part
@@ -64,7 +64,7 @@ bool ZstdInflatingReadBuffer::nextImpl()
/// If end of file is reached, fill eof variable and return true if there is some data in buffer, otherwise return false
if (in->eof())
{
- eof = true;
+ eof_flag = true;
return !working_buffer.empty();
}
/// It is possible, that input buffer is not at eof yet, but nothing was decompressed in current iteration.
diff --git a/src/IO/ZstdInflatingReadBuffer.h b/src/IO/ZstdInflatingReadBuffer.h
index e6e2dad0ad5..ec80b860e0e 100644
--- a/src/IO/ZstdInflatingReadBuffer.h
+++ b/src/IO/ZstdInflatingReadBuffer.h
@@ -31,7 +31,7 @@ private:
ZSTD_DCtx * dctx;
ZSTD_inBuffer input;
ZSTD_outBuffer output;
- bool eof = false;
+ bool eof_flag = false;
};
}
diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp
index 8f003e75a07..7ddb0c8c26e 100644
--- a/src/Interpreters/InterpreterCreateQuery.cpp
+++ b/src/Interpreters/InterpreterCreateQuery.cpp
@@ -637,13 +637,14 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti
/// Table function without columns list.
auto table_function = TableFunctionFactory::instance().get(create.as_table_function, getContext());
properties.columns = table_function->getActualTableStructure(getContext());
- assert(!properties.columns.empty());
}
else if (create.is_dictionary)
{
return {};
}
- else
+ /// We can have queries like "CREATE TABLE <table> ENGINE=<engine>" if <engine>
+ /// supports schema inference (the table structure will be determined in its constructor).
+ else if (!StorageFactory::instance().checkIfStorageSupportsSchemaInterface(create.storage->engine->name))
throw Exception("Incorrect CREATE query: required list of column descriptions or AS section or SELECT.", ErrorCodes::INCORRECT_QUERY);
/// Even if query has list of columns, canonicalize it (unfold Nested columns).
@@ -1083,7 +1084,10 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
{
const auto & factory = TableFunctionFactory::instance();
auto table_func = factory.get(create.as_table_function, getContext());
- res = table_func->execute(create.as_table_function, getContext(), create.getTable(), properties.columns);
+ /// In case of a CREATE AS table_function() query we should use the global context
+ /// for storage creation, because there will be no query context on server startup
+ /// and because the storage lifetime is longer than the query context lifetime.
+ res = table_func->execute(create.as_table_function, getContext(), create.getTable(), properties.columns, /*use_global_context=*/true);
res->renameInMemory({create.getDatabase(), create.getTable(), create.uuid});
}
else
diff --git a/src/Parsers/ASTCreateQuery.cpp b/src/Parsers/ASTCreateQuery.cpp
index 3e77bee19a9..e61a0f55142 100644
--- a/src/Parsers/ASTCreateQuery.cpp
+++ b/src/Parsers/ASTCreateQuery.cpp
@@ -359,7 +359,7 @@ void ASTCreateQuery::formatQueryImpl(const FormatSettings & settings, FormatStat
if (as_table_function)
{
- if (columns_list)
+ if (columns_list && !columns_list->empty())
{
frame.expression_list_always_start_on_new_line = true;
settings.ostr << (settings.one_line ? " (" : "\n(");
@@ -375,7 +375,7 @@ void ASTCreateQuery::formatQueryImpl(const FormatSettings & settings, FormatStat
frame.expression_list_always_start_on_new_line = true;
- if (columns_list && !as_table_function)
+ if (columns_list && !columns_list->empty() && !as_table_function)
{
settings.ostr << (settings.one_line ? " (" : "\n(");
FormatStateStacked frame_nested = frame;
diff --git a/src/Parsers/ASTCreateQuery.h b/src/Parsers/ASTCreateQuery.h
index 93fced7dba5..2e35731acad 100644
--- a/src/Parsers/ASTCreateQuery.h
+++ b/src/Parsers/ASTCreateQuery.h
@@ -50,6 +50,12 @@ public:
ASTPtr clone() const override;
void formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const override;
+
+ bool empty()
+ {
+ return (!columns || columns->children.empty()) && (!indices || indices->children.empty()) && (!constraints || constraints->children.empty())
+ && (!projections || projections->children.empty());
+ }
};
diff --git a/src/Parsers/ParserCreateQuery.cpp b/src/Parsers/ParserCreateQuery.cpp
index dbbea986404..7f47e1efb49 100644
--- a/src/Parsers/ParserCreateQuery.cpp
+++ b/src/Parsers/ParserCreateQuery.cpp
@@ -557,34 +557,43 @@ bool ParserCreateTableQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expe
}
}
}
+ /** Create queries without list of columns:
+ * - CREATE|ATTACH TABLE ... AS ...
+ * - CREATE|ATTACH TABLE ... ENGINE = engine
+ */
else
{
storage_p.parse(pos, storage, expected);
- if (!s_as.ignore(pos, expected))
- return false;
-
- if (!select_p.parse(pos, select, expected)) /// AS SELECT ...
+ /// CREATE|ATTACH TABLE ... AS ...
+ if (s_as.ignore(pos, expected))
{
- /// ENGINE can not be specified for table functions.
- if (storage || !table_function_p.parse(pos, as_table_function, expected))
+ if (!select_p.parse(pos, select, expected)) /// AS SELECT ...
{
- /// AS [db.]table
- if (!name_p.parse(pos, as_table, expected))
- return false;
-
- if (s_dot.ignore(pos, expected))
+ /// ENGINE can not be specified for table functions.
+ if (storage || !table_function_p.parse(pos, as_table_function, expected))
{
- as_database = as_table;
+ /// AS [db.]table
if (!name_p.parse(pos, as_table, expected))
return false;
- }
- /// Optional - ENGINE can be specified.
- if (!storage)
- storage_p.parse(pos, storage, expected);
+ if (s_dot.ignore(pos, expected))
+ {
+ as_database = as_table;
+ if (!name_p.parse(pos, as_table, expected))
+ return false;
+ }
+
+ /// Optional - ENGINE can be specified.
+ if (!storage)
+ storage_p.parse(pos, storage, expected);
+ }
}
}
+ else if (!storage)
+ {
+ return false;
+ }
}
auto comment = parseComment(pos, expected);
diff --git a/src/Parsers/ParserCreateQuery.h b/src/Parsers/ParserCreateQuery.h
index bc1ebd65639..33aafb40d83 100644
--- a/src/Parsers/ParserCreateQuery.h
+++ b/src/Parsers/ParserCreateQuery.h
@@ -361,6 +361,8 @@ protected:
* Or:
* CREATE|ATTACH TABLE [IF NOT EXISTS] [db.]name [UUID 'uuid'] [ON CLUSTER cluster] AS ENGINE = engine SELECT ...
*
+ * Or (for engines that support schema inference):
+ * CREATE|ATTACH TABLE [IF NOT EXISTS] [db.]name [UUID 'uuid'] [ON CLUSTER cluster] ENGINE = engine
*/
class ParserCreateTableQuery : public IParserBase
{
diff --git a/src/Processors/Formats/ISchemaReader.cpp b/src/Processors/Formats/ISchemaReader.cpp
new file mode 100644
index 00000000000..096e39a2893
--- /dev/null
+++ b/src/Processors/Formats/ISchemaReader.cpp
@@ -0,0 +1,160 @@
+#include
+#include
+#include
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
+}
+
+IRowSchemaReader::IRowSchemaReader(ReadBuffer & in_, size_t max_rows_to_read_, DataTypePtr default_type_)
+ : ISchemaReader(in_), max_rows_to_read(max_rows_to_read_), default_type(default_type_)
+{
+}
+
+NamesAndTypesList IRowSchemaReader::readSchema()
+{
+ DataTypes data_types = readRowAndGetDataTypes();
+ for (size_t row = 1; row < max_rows_to_read; ++row)
+ {
+ DataTypes new_data_types = readRowAndGetDataTypes();
+ if (new_data_types.empty())
+ /// We reached eof.
+ break;
+
+ if (new_data_types.size() != data_types.size())
+ throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Rows have different amount of values");
+
+ for (size_t i = 0; i != data_types.size(); ++i)
+ {
+ /// We couldn't determine the type of this column in a new row, just skip it.
+ if (!new_data_types[i])
+ continue;
+
+ /// If we couldn't determine the type of column yet, just set the new type.
+ if (!data_types[i])
+ data_types[i] = new_data_types[i];
+ /// If the new type and the previous type for this column are different,
+ /// we will use default type if we have it or throw an exception.
+ else if (data_types[i]->getName() != new_data_types[i]->getName())
+ {
+ if (default_type)
+ data_types[i] = default_type;
+ else
+ throw Exception(
+ ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
+ "Automatically defined type {} for column {} in row {} differs from type defined by previous rows: {}", new_data_types[i]->getName(), i + 1, row, data_types[i]->getName());
+ }
+ }
+ }
+
+ /// Check that we read at least one column.
+ if (data_types.empty())
+ throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot read rows from the data");
+
+ /// If column names weren't set, use default names 'c1', 'c2', ...
+ if (column_names.empty())
+ {
+ column_names.reserve(data_types.size());
+ for (size_t i = 0; i != data_types.size(); ++i)
+ column_names.push_back("c" + std::to_string(i + 1));
+ }
+ /// If column names were set, check that the number of names matches the number of types.
+ else if (column_names.size() != data_types.size())
+ throw Exception(
+ ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
+ "The number of column names {} differs with the number of types {}", column_names.size(), data_types.size());
+
+ NamesAndTypesList result;
+ for (size_t i = 0; i != data_types.size(); ++i)
+ {
+ /// Check that we could determine the type of this column.
+ if (!data_types[i])
+ {
+ if (!default_type)
+ throw Exception(
+ ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
+ "Cannot determine table structure by first {} rows of data, because some columns contain only Nulls. To increase the maximum "
+ "number of rows to read for structure determination, use setting input_format_max_rows_to_read_for_schema_inference",
+ max_rows_to_read);
+
+ data_types[i] = default_type;
+ }
+ result.emplace_back(column_names[i], data_types[i]);
+ }
+
+ return result;
+}
+
+IRowWithNamesSchemaReader::IRowWithNamesSchemaReader(ReadBuffer & in_, size_t max_rows_to_read_, DataTypePtr default_type_)
+ : ISchemaReader(in_), max_rows_to_read(max_rows_to_read_), default_type(default_type_)
+{
+}
+
+NamesAndTypesList IRowWithNamesSchemaReader::readSchema()
+{
+ auto names_and_types = readRowAndGetNamesAndDataTypes();
+ for (size_t row = 1; row < max_rows_to_read; ++row)
+ {
+ auto new_names_and_types = readRowAndGetNamesAndDataTypes();
+ if (new_names_and_types.empty())
+ /// We reached eof.
+ break;
+
+ for (const auto & [name, new_type] : new_names_and_types)
+ {
+ auto it = names_and_types.find(name);
+ /// If we didn't see this column before, just add it.
+ if (it == names_and_types.end())
+ {
+ names_and_types[name] = new_type;
+ continue;
+ }
+
+ auto & type = it->second;
+ /// If we couldn't determine the type of column yet, just set the new type.
+ if (!type)
+ type = new_type;
+ /// If the new type and the previous type for this column are different,
+ /// we will use default type if we have it or throw an exception.
+ else if (new_type && type->getName() != new_type->getName())
+ {
+ if (default_type)
+ type = default_type;
+ else
+ throw Exception(
+ ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
+ "Automatically defined type {} for column {} in row {} differs from type defined by previous rows: {}", type->getName(), name, row, new_type->getName());
+ }
+ }
+ }
+
+ /// Check that we read at least one column.
+ if (names_and_types.empty())
+ throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot read rows from the data");
+
+ NamesAndTypesList result;
+ for (auto & [name, type] : names_and_types)
+ {
+ /// Check that we could determine the type of this column.
+ if (!type)
+ {
+ if (!default_type)
+ throw Exception(
+ ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
+ "Cannot determine table structure by first {} rows of data, because some columns contain only Nulls. To increase the maximum "
+ "number of rows to read for structure determination, use setting input_format_max_rows_to_read_for_schema_inference",
+ max_rows_to_read);
+
+ type = default_type;
+ }
+ result.emplace_back(name, type);
+ }
+
+ return result;
+}
+
+}
diff --git a/src/Processors/Formats/ISchemaReader.h b/src/Processors/Formats/ISchemaReader.h
new file mode 100644
index 00000000000..67a8eb88d61
--- /dev/null
+++ b/src/Processors/Formats/ISchemaReader.h
@@ -0,0 +1,87 @@
+#pragma once
+
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+
+/// Base class for schema inference for the data in some specific format.
+/// It reads some data from the read buffer and tries to determine the schema
+/// from the data it has read.
+class ISchemaReader
+{
+public:
+ ISchemaReader(ReadBuffer & in_) : in(in_) {}
+
+ virtual NamesAndTypesList readSchema() = 0;
+
+ virtual ~ISchemaReader() = default;
+
+protected:
+ ReadBuffer & in;
+};
+
+/// Base class for schema inference for formats that read data row by row.
+/// It reads data row by row (up to max_rows_to_read), determines the types of columns
+/// in each row and compares them with the types from previous rows. If some column
+/// contains values with different types in different rows, the default type will be
+/// used for this column, or an exception will be thrown (if the default type is not set).
+class IRowSchemaReader : public ISchemaReader
+{
+public:
+ IRowSchemaReader(ReadBuffer & in_, size_t max_rows_to_read_, DataTypePtr default_type_ = nullptr);
+ NamesAndTypesList readSchema() override;
+
+protected:
+ /// Read one row and determine types of columns in it.
+ /// Return types in the same order in which the values were in the row.
+ /// If it's impossible to determine the type for some column, return nullptr for it.
+ /// Return an empty list if no more data can be read.
+ virtual DataTypes readRowAndGetDataTypes() = 0;
+
+ void setColumnNames(const std::vector & names) { column_names = names; }
+
+private:
+ size_t max_rows_to_read;
+ DataTypePtr default_type;
+ std::vector column_names;
+};
+
+/// Base class for schema inference for formats that read data row by row and each
+/// row contains column names and values (ex: JSONEachRow, TSKV).
+/// Differs from IRowSchemaReader in that after reading a row we get
+/// a map {column_name : type}, and some columns may be missing from a single row
+/// (in this case we use the types from previous rows for the missing columns).
+class IRowWithNamesSchemaReader : public ISchemaReader
+{
+public:
+ IRowWithNamesSchemaReader(ReadBuffer & in_, size_t max_rows_to_read_, DataTypePtr default_type_ = nullptr);
+ NamesAndTypesList readSchema() override;
+
+protected:
+ /// Read one row and determine types of columns in it.
+ /// Return map {column_name : type}.
+ /// If it's impossible to determine the type for some column, return nullptr for it.
+ /// Return an empty map if no more data can be read.
+ virtual std::unordered_map<String, DataTypePtr> readRowAndGetNamesAndDataTypes() = 0;
+
+private:
+ size_t max_rows_to_read;
+ DataTypePtr default_type;
+};
+
+/// Base class for schema inference for formats that don't need any data to
+/// determine the schema: formats with constant schema (ex: JSONAsString, LineAsString)
+/// and formats that use external format schema (ex: Protobuf, CapnProto).
+class IExternalSchemaReader
+{
+public:
+ virtual NamesAndTypesList readSchema() = 0;
+
+ virtual ~IExternalSchemaReader() = default;
+};
+
+}
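The per-column merging rule implemented by IRowSchemaReader::readSchema (keep the first type seen, ignore rows where a column's type is unknown, and on a conflict fall back to the default type or fail) can be shown with a small standalone sketch; types are modelled as plain strings here purely for illustration:

```cpp
#include <cstddef>
#include <iostream>
#include <optional>
#include <stdexcept>
#include <string>
#include <vector>

// One inferred row: std::nullopt stands in for "could not determine the type in this row".
using RowTypes = std::vector<std::optional<std::string>>;

std::vector<std::string> mergeRowTypes(const std::vector<RowTypes> & rows,
                                       const std::optional<std::string> & default_type)
{
    std::vector<std::optional<std::string>> merged = rows.at(0);
    for (std::size_t row = 1; row < rows.size(); ++row)
    {
        if (rows[row].size() != merged.size())
            throw std::runtime_error("Rows have different numbers of values");
        for (std::size_t i = 0; i != merged.size(); ++i)
        {
            if (!rows[row][i])
                continue;                      // this row tells us nothing about column i
            if (!merged[i])
                merged[i] = rows[row][i];      // first time we see a type for column i
            else if (*merged[i] != *rows[row][i])
            {
                if (!default_type)
                    throw std::runtime_error("Types for column " + std::to_string(i + 1) + " differ between rows");
                merged[i] = default_type;      // conflicting rows fall back to the default type
            }
        }
    }

    std::vector<std::string> result;
    for (std::size_t i = 0; i != merged.size(); ++i)
    {
        if (!merged[i])
        {
            if (!default_type)
                throw std::runtime_error("Column " + std::to_string(i + 1) + " contains only Nulls");
            merged[i] = default_type;          // columns that were never determined also use the default
        }
        result.push_back(*merged[i]);
    }
    return result;
}

int main()
{
    std::vector<RowTypes> rows = {
        {std::string("Nullable(Float64)"), std::nullopt},
        {std::string("Nullable(Float64)"), std::string("Nullable(String)")}};
    for (const auto & type : mergeRowTypes(rows, std::nullopt))
        std::cout << type << "\n";
}
```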
diff --git a/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp b/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp
index 1f6b530d72f..4af2c651c39 100644
--- a/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp
+++ b/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp
@@ -85,31 +85,38 @@ void ArrowBlockInputFormat::resetParser()
record_batch_current = 0;
}
+static std::shared_ptr createStreamReader(ReadBuffer & in)
+{
+ auto stream_reader_status = arrow::ipc::RecordBatchStreamReader::Open(std::make_unique(in));
+ if (!stream_reader_status.ok())
+ throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,
+ "Error while opening a table: {}", stream_reader_status.status().ToString());
+ return *stream_reader_status;
+}
+
+static std::shared_ptr createFileReader(ReadBuffer & in, const FormatSettings & format_settings, std::atomic & is_stopped)
+{
+ auto arrow_file = asArrowFile(in, format_settings, is_stopped);
+ if (is_stopped)
+ return nullptr;
+
+ auto file_reader_status = arrow::ipc::RecordBatchFileReader::Open(std::move(arrow_file));
+ if (!file_reader_status.ok())
+ throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,
+ "Error while opening a table: {}", file_reader_status.status().ToString());
+ return *file_reader_status;
+}
+
+
void ArrowBlockInputFormat::prepareReader()
{
- std::shared_ptr schema;
-
if (stream)
- {
- auto stream_reader_status = arrow::ipc::RecordBatchStreamReader::Open(std::make_unique(*in));
- if (!stream_reader_status.ok())
- throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,
- "Error while opening a table: {}", stream_reader_status.status().ToString());
- stream_reader = *stream_reader_status;
- schema = stream_reader->schema();
- }
+ stream_reader = createStreamReader(*in);
else
{
- auto arrow_file = asArrowFile(*in, format_settings, is_stopped);
- if (is_stopped)
+ file_reader = createFileReader(*in, format_settings, is_stopped);
+ if (!file_reader)
return;
-
- auto file_reader_status = arrow::ipc::RecordBatchFileReader::Open(std::move(arrow_file));
- if (!file_reader_status.ok())
- throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,
- "Error while opening a table: {}", file_reader_status.status().ToString());
- file_reader = *file_reader_status;
- schema = file_reader->schema();
}
arrow_column_to_ch_column = std::make_unique(getPort().getHeader(), "Arrow", format_settings.arrow.import_nested);
@@ -122,6 +129,27 @@ void ArrowBlockInputFormat::prepareReader()
record_batch_current = 0;
}
+ArrowSchemaReader::ArrowSchemaReader(ReadBuffer & in_, bool stream_, const FormatSettings & format_settings_)
+ : ISchemaReader(in_), stream(stream_), format_settings(format_settings_)
+{
+}
+
+NamesAndTypesList ArrowSchemaReader::readSchema()
+{
+ std::shared_ptr schema;
+
+ if (stream)
+ schema = createStreamReader(in)->schema();
+ else
+ {
+ std::atomic is_stopped = 0;
+ schema = createFileReader(in, format_settings, is_stopped)->schema();
+ }
+
+ auto header = ArrowColumnToCHColumn::arrowSchemaToCHHeader(*schema, stream ? "ArrowStream" : "Arrow");
+ return header.getNamesAndTypesList();
+}
+
void registerInputFormatArrow(FormatFactory & factory)
{
factory.registerInputFormat(
@@ -145,6 +173,20 @@ void registerInputFormatArrow(FormatFactory & factory)
});
}
+void registerArrowSchemaReader(FormatFactory & factory)
+{
+ factory.registerSchemaReader(
+ "Arrow",
+ [](ReadBuffer & buf, const FormatSettings & settings, ContextPtr)
+ {
+ return std::make_shared<ArrowSchemaReader>(buf, false, settings);
+ });
+ factory.registerSchemaReader(
+ "ArrowStream",
+ [](ReadBuffer & buf, const FormatSettings & settings, ContextPtr)
+ {
+ return std::make_shared<ArrowSchemaReader>(buf, true, settings);
+ });
+}
}
#else
@@ -154,6 +196,8 @@ class FormatFactory;
void registerInputFormatArrow(FormatFactory &)
{
}
+
+void registerArrowSchemaReader(FormatFactory &) {}
}
#endif
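registerArrowSchemaReader follows the same pattern as every other register*SchemaReader call: a creator callback is stored under the format name and later looked up to build a reader bound to the input. A stripped-down sketch of such a registry; Factory, SchemaReader and DummySchemaReader are illustrative, not the real FormatFactory API:

```cpp
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <string>

// Minimal schema reader interface for the sketch.
struct SchemaReader
{
    virtual std::string readSchema() = 0;
    virtual ~SchemaReader() = default;
};

// Toy registry: each format name maps to a creator callback.
class Factory
{
public:
    using Creator = std::function<std::unique_ptr<SchemaReader>(std::istream &)>;

    void registerSchemaReader(const std::string & format, Creator creator) { creators[format] = std::move(creator); }

    std::unique_ptr<SchemaReader> getSchemaReader(const std::string & format, std::istream & in) const
    {
        auto it = creators.find(format);
        if (it == creators.end())
            throw std::runtime_error(format + " file format doesn't support schema inference");
        return it->second(in);
    }

private:
    std::map<std::string, Creator> creators;
};

// Stand-in reader that "infers" a schema from the first line of the input.
struct DummySchemaReader : SchemaReader
{
    explicit DummySchemaReader(std::istream & in_) : in(in_) {}

    std::string readSchema() override
    {
        std::string first_line;
        std::getline(in, first_line);
        return "c1 inferred from: " + first_line;
    }

    std::istream & in;
};

int main()
{
    Factory factory;
    factory.registerSchemaReader("Arrow", [](std::istream & in) { return std::make_unique<DummySchemaReader>(in); });

    std::istringstream data("arrow-header-bytes");
    std::cout << factory.getSchemaReader("Arrow", data)->readSchema() << "\n";
}
```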
diff --git a/src/Processors/Formats/Impl/ArrowBlockInputFormat.h b/src/Processors/Formats/Impl/ArrowBlockInputFormat.h
index bb8a000477c..62cbf949fc2 100644
--- a/src/Processors/Formats/Impl/ArrowBlockInputFormat.h
+++ b/src/Processors/Formats/Impl/ArrowBlockInputFormat.h
@@ -4,6 +4,7 @@
#if USE_ARROW
#include
+#include
#include
namespace arrow { class RecordBatchReader; }
@@ -51,6 +52,18 @@ private:
std::atomic is_stopped{0};
};
+class ArrowSchemaReader : public ISchemaReader
+{
+public:
+ ArrowSchemaReader(ReadBuffer & in_, bool stream_, const FormatSettings & format_settings_);
+
+ NamesAndTypesList readSchema() override;
+
+private:
+ bool stream;
+ const FormatSettings format_settings;
+};
+
}
#endif
diff --git a/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp b/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp
index 272907022a1..aa181ea0b8b 100644
--- a/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp
+++ b/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp
@@ -239,10 +239,8 @@ static ColumnWithTypeAndName readColumnWithTimestampData(std::shared_ptr
-static ColumnWithTypeAndName readColumnWithDecimalData(std::shared_ptr & arrow_column, const String & column_name)
+static ColumnWithTypeAndName readColumnWithDecimalDataImpl(std::shared_ptr & arrow_column, const String & column_name, DataTypePtr internal_type)
{
- const auto * arrow_decimal_type = static_cast(arrow_column->type().get());
- auto internal_type = std::make_shared>(arrow_decimal_type->precision(), arrow_decimal_type->scale());
auto internal_column = internal_type->createColumn();
auto & column = assert_cast &>(*internal_column);
auto & column_data = column.getData();
@@ -259,6 +257,21 @@ static ColumnWithTypeAndName readColumnWithDecimalData(std::shared_ptr
+static ColumnWithTypeAndName readColumnWithDecimalData(std::shared_ptr