Merge branch 'master' of github.com:ClickHouse/ClickHouse into interactive-mode-for-clickhouse-local

This commit is contained in:
kssenii 2021-07-14 15:02:22 +03:00
commit edb41e4daa
313 changed files with 7727 additions and 2038 deletions

8
.gitmodules vendored
View File

@ -193,7 +193,7 @@
url = https://github.com/danlark1/miniselect
[submodule "contrib/rocksdb"]
path = contrib/rocksdb
url = https://github.com/ClickHouse-Extras/rocksdb.git
url = https://github.com/ClickHouse-Extras/rocksdb.git
[submodule "contrib/xz"]
path = contrib/xz
url = https://github.com/xz-mirror/xz
@ -228,3 +228,9 @@
[submodule "contrib/libpqxx"]
path = contrib/libpqxx
url = https://github.com/ClickHouse-Extras/libpqxx.git
[submodule "contrib/sqlite-amalgamation"]
path = contrib/sqlite-amalgamation
url = https://github.com/azadkuh/sqlite-amalgamation
[submodule "contrib/s2geometry"]
path = contrib/s2geometry
url = https://github.com/ClickHouse-Extras/s2geometry.git

View File

@ -536,10 +536,12 @@ include (cmake/find/rapidjson.cmake)
include (cmake/find/fastops.cmake)
include (cmake/find/odbc.cmake)
include (cmake/find/nanodbc.cmake)
include (cmake/find/sqlite.cmake)
include (cmake/find/rocksdb.cmake)
include (cmake/find/libpqxx.cmake)
include (cmake/find/nuraft.cmake)
include (cmake/find/yaml-cpp.cmake)
include (cmake/find/s2geometry.cmake)
if(NOT USE_INTERNAL_PARQUET_LIBRARY)
set (ENABLE_ORC OFF CACHE INTERNAL "")

View File

@ -18,6 +18,8 @@
#define DATE_LUT_MAX (0xFFFFFFFFU - 86400)
#define DATE_LUT_MAX_DAY_NUM 0xFFFF
/// Max int value of Date32, DATE LUT cache size minus daynum_offset_epoch
#define DATE_LUT_MAX_EXTEND_DAY_NUM (DATE_LUT_SIZE - 16436)
/// A constant to add to time_t so every supported time point becomes non-negative and still has the same remainder of division by 3600.
/// If we treat "remainder of division" operation in the sense of modular arithmetic (not like in C++).
@ -270,6 +272,8 @@ public:
auto getOffsetAtStartOfEpoch() const { return offset_at_start_of_epoch; }
auto getTimeOffsetAtStartOfLUT() const { return offset_at_start_of_lut; }
auto getDayNumOffsetEpoch() const { return daynum_offset_epoch; }
/// All functions below are thread-safe; arguments are not checked.
inline ExtendedDayNum toDayNum(ExtendedDayNum d) const
@ -926,15 +930,17 @@ public:
{
if (unlikely(year < DATE_LUT_MIN_YEAR || year > DATE_LUT_MAX_YEAR || month < 1 || month > 12 || day_of_month < 1 || day_of_month > 31))
return LUTIndex(0);
return LUTIndex{years_months_lut[(year - DATE_LUT_MIN_YEAR) * 12 + month - 1] + day_of_month - 1};
auto year_lut_index = (year - DATE_LUT_MIN_YEAR) * 12 + month - 1;
UInt32 index = years_months_lut[year_lut_index].toUnderType() + day_of_month - 1;
/// When date is out of range, default value is DATE_LUT_SIZE - 1 (2283-11-11)
return LUTIndex{std::min(index, static_cast<UInt32>(DATE_LUT_SIZE - 1))};
}
/// Create DayNum from year, month, day of month.
inline ExtendedDayNum makeDayNum(Int16 year, UInt8 month, UInt8 day_of_month) const
inline ExtendedDayNum makeDayNum(Int16 year, UInt8 month, UInt8 day_of_month, Int32 default_error_day_num = 0) const
{
if (unlikely(year < DATE_LUT_MIN_YEAR || year > DATE_LUT_MAX_YEAR || month < 1 || month > 12 || day_of_month < 1 || day_of_month > 31))
return ExtendedDayNum(0);
return ExtendedDayNum(default_error_day_num);
return toDayNum(makeLUTIndex(year, month, day_of_month));
}
@ -1091,9 +1097,9 @@ public:
return lut[new_index].date + time;
}
inline NO_SANITIZE_UNDEFINED Time addWeeks(Time t, Int64 delta) const
inline NO_SANITIZE_UNDEFINED Time addWeeks(Time t, Int32 delta) const
{
return addDays(t, delta * 7);
return addDays(t, static_cast<Int64>(delta) * 7);
}
inline UInt8 saturateDayOfMonth(Int16 year, UInt8 month, UInt8 day_of_month) const
@ -1158,14 +1164,14 @@ public:
return toDayNum(addMonthsIndex(d, delta));
}
inline Time NO_SANITIZE_UNDEFINED addQuarters(Time t, Int64 delta) const
inline Time NO_SANITIZE_UNDEFINED addQuarters(Time t, Int32 delta) const
{
return addMonths(t, delta * 3);
return addMonths(t, static_cast<Int64>(delta) * 3);
}
inline ExtendedDayNum addQuarters(ExtendedDayNum d, Int64 delta) const
inline ExtendedDayNum addQuarters(ExtendedDayNum d, Int32 delta) const
{
return addMonths(d, delta * 3);
return addMonths(d, static_cast<Int64>(delta) * 3);
}
template <typename DateOrTime>

View File

@ -70,6 +70,14 @@ public:
m_day = values.day_of_month;
}
/// Initializes year, month and day of month from the DateLUT values looked up for the given extended day number.
explicit LocalDate(ExtendedDayNum day_num)
{
    const auto & lut = DateLUT::instance();
    const auto & date_parts = lut.getValues(day_num);

    m_year = date_parts.year;
    m_month = date_parts.month;
    m_day = date_parts.day_of_month;
}
LocalDate(unsigned short year_, unsigned char month_, unsigned char day_)
: m_year(year_), m_month(month_), m_day(day_)
{
@ -98,6 +106,12 @@ public:
return DayNum(lut.makeDayNum(m_year, m_month, m_day).toUnderType());
}
/// Returns the stored year/month/day as an ExtendedDayNum, obtained via DateLUT::instance().makeDayNum().
ExtendedDayNum getExtenedDayNum() const
{
    const auto day_num = DateLUT::instance().makeDayNum(m_year, m_month, m_day);
    return ExtendedDayNum(day_num.toUnderType());
}
operator DayNum() const
{
return getDayNum();

View File

@ -0,0 +1,24 @@
# Decides whether the bundled S2 geometry library is used.
#
# Outputs:
#   USE_S2_GEOMETRY         - 1 when the submodule directory exists and OpenSSL was found;
#                             0 when either check fails.
#   S2_GEOMETRY_LIBRARY     - library target name (set only on success).
#   S2_GEOMETRY_INCLUDE_DIR - include directory (set only on success).
option(ENABLE_S2_GEOMETRY "Enable S2 geometry library" ${ENABLE_LIBRARIES})

if (ENABLE_S2_GEOMETRY)
    if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/s2geometry")
        message (WARNING "submodule contrib/s2geometry is missing. to fix try run: \n git submodule update --init --recursive")
        set (ENABLE_S2_GEOMETRY 0)
        set (USE_S2_GEOMETRY 0)
    else()
        if (OPENSSL_FOUND)
            set (S2_GEOMETRY_LIBRARY s2)
            set (S2_GEOMETRY_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/s2geometry/src/s2)
            set (USE_S2_GEOMETRY 1)
        else()
            message (WARNING "S2 uses OpenSSL, but the latter is absent.")
            # Reset explicitly, as the missing-submodule branch does: otherwise a
            # USE_S2_GEOMETRY value coming from the cache or the command line would
            # survive and the library would still be added to the build.
            set (USE_S2_GEOMETRY 0)
        endif()
    endif()

    # Report the failure at the configured severity when the library was requested but cannot be used.
    if (NOT USE_S2_GEOMETRY)
        message (${RECONFIGURE_MESSAGE_LEVEL} "Can't enable S2 geometry library")
    endif()
endif()

message (STATUS "Using s2geometry=${USE_S2_GEOMETRY} : ${S2_GEOMETRY_INCLUDE_DIR}")

16
cmake/find/sqlite.cmake Normal file
View File

@ -0,0 +1,16 @@
# Decides whether the bundled SQLite amalgamation is used.
#
# Outputs:
#   USE_SQLITE     - 1 when contrib/sqlite-amalgamation/sqlite3.c exists, 0 when it is missing;
#                    not set by this file when ENABLE_SQLITE is off.
#   SQLITE_LIBRARY - library name to link against (set only when USE_SQLITE is 1).
option(ENABLE_SQLITE "Enable sqlite" ${ENABLE_LIBRARIES})

# Nothing to detect when the feature is switched off.
if (NOT ENABLE_SQLITE)
    return()
endif()

if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/sqlite-amalgamation/sqlite3.c")
    # The hint names the same directory that is checked above (contrib/sqlite-amalgamation).
    message (WARNING "submodule contrib/sqlite-amalgamation is missing. to fix try run: \n git submodule update --init --recursive")
    message (${RECONFIGURE_MESSAGE_LEVEL} "Can't find internal sqlite library")
    set (USE_SQLITE 0)
    return()
endif()

set (USE_SQLITE 1)
set (SQLITE_LIBRARY sqlite)

message (STATUS "Using sqlite=${USE_SQLITE}")

View File

@ -1,4 +1,4 @@
option(ENABLE_STATS "Enalbe StatsLib library" ${ENABLE_LIBRARIES})
option(ENABLE_STATS "Enable StatsLib library" ${ENABLE_LIBRARIES})
if (ENABLE_STATS)
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/stats")

View File

@ -1,3 +1,4 @@
# Third-party libraries may have substandard code.
# Put all targets defined here and in added subfolders under "contrib/" folder in GUI-based IDEs by default.
# Some of third-party projects may override CMAKE_FOLDER or FOLDER property of their targets, so they will
@ -10,10 +11,8 @@ else ()
endif ()
unset (_current_dir_name)
# Third-party libraries may have substandard code.
# Also remove a possible source of nondeterminism.
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -w -D__DATE__= -D__TIME__= -D__TIMESTAMP__=")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -w -D__DATE__= -D__TIME__= -D__TIMESTAMP__=")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -w")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -w")
if (WITH_COVERAGE)
set (WITHOUT_COVERAGE_LIST ${WITHOUT_COVERAGE})
@ -329,3 +328,10 @@ endif()
add_subdirectory(fast_float)
if (USE_SQLITE)
add_subdirectory(sqlite-cmake)
endif()
if (USE_S2_GEOMETRY)
add_subdirectory(s2geometry-cmake)
endif()

1
contrib/s2geometry vendored Submodule

@ -0,0 +1 @@
Subproject commit 20ea540d81f4575a3fc0aea585aac611bcd03ede

View File

@ -0,0 +1,126 @@
set(S2_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/s2geometry/src")
set(S2_SRCS
"${S2_SOURCE_DIR}/s2/base/stringprintf.cc"
"${S2_SOURCE_DIR}/s2/base/strtoint.cc"
"${S2_SOURCE_DIR}/s2/encoded_s2cell_id_vector.cc"
"${S2_SOURCE_DIR}/s2/encoded_s2point_vector.cc"
"${S2_SOURCE_DIR}/s2/encoded_s2shape_index.cc"
"${S2_SOURCE_DIR}/s2/encoded_string_vector.cc"
"${S2_SOURCE_DIR}/s2/id_set_lexicon.cc"
"${S2_SOURCE_DIR}/s2/mutable_s2shape_index.cc"
"${S2_SOURCE_DIR}/s2/r2rect.cc"
"${S2_SOURCE_DIR}/s2/s1angle.cc"
"${S2_SOURCE_DIR}/s2/s1chord_angle.cc"
"${S2_SOURCE_DIR}/s2/s1interval.cc"
"${S2_SOURCE_DIR}/s2/s2boolean_operation.cc"
"${S2_SOURCE_DIR}/s2/s2builder.cc"
"${S2_SOURCE_DIR}/s2/s2builder_graph.cc"
"${S2_SOURCE_DIR}/s2/s2builderutil_closed_set_normalizer.cc"
"${S2_SOURCE_DIR}/s2/s2builderutil_find_polygon_degeneracies.cc"
"${S2_SOURCE_DIR}/s2/s2builderutil_lax_polygon_layer.cc"
"${S2_SOURCE_DIR}/s2/s2builderutil_s2point_vector_layer.cc"
"${S2_SOURCE_DIR}/s2/s2builderutil_s2polygon_layer.cc"
"${S2_SOURCE_DIR}/s2/s2builderutil_s2polyline_layer.cc"
"${S2_SOURCE_DIR}/s2/s2builderutil_s2polyline_vector_layer.cc"
"${S2_SOURCE_DIR}/s2/s2builderutil_snap_functions.cc"
"${S2_SOURCE_DIR}/s2/s2cap.cc"
"${S2_SOURCE_DIR}/s2/s2cell.cc"
"${S2_SOURCE_DIR}/s2/s2cell_id.cc"
"${S2_SOURCE_DIR}/s2/s2cell_index.cc"
"${S2_SOURCE_DIR}/s2/s2cell_union.cc"
"${S2_SOURCE_DIR}/s2/s2centroids.cc"
"${S2_SOURCE_DIR}/s2/s2closest_cell_query.cc"
"${S2_SOURCE_DIR}/s2/s2closest_edge_query.cc"
"${S2_SOURCE_DIR}/s2/s2closest_point_query.cc"
"${S2_SOURCE_DIR}/s2/s2contains_vertex_query.cc"
"${S2_SOURCE_DIR}/s2/s2convex_hull_query.cc"
"${S2_SOURCE_DIR}/s2/s2coords.cc"
"${S2_SOURCE_DIR}/s2/s2crossing_edge_query.cc"
"${S2_SOURCE_DIR}/s2/s2debug.cc"
"${S2_SOURCE_DIR}/s2/s2earth.cc"
"${S2_SOURCE_DIR}/s2/s2edge_clipping.cc"
"${S2_SOURCE_DIR}/s2/s2edge_crosser.cc"
"${S2_SOURCE_DIR}/s2/s2edge_crossings.cc"
"${S2_SOURCE_DIR}/s2/s2edge_distances.cc"
"${S2_SOURCE_DIR}/s2/s2edge_tessellator.cc"
"${S2_SOURCE_DIR}/s2/s2error.cc"
"${S2_SOURCE_DIR}/s2/s2furthest_edge_query.cc"
"${S2_SOURCE_DIR}/s2/s2latlng.cc"
"${S2_SOURCE_DIR}/s2/s2latlng_rect.cc"
"${S2_SOURCE_DIR}/s2/s2latlng_rect_bounder.cc"
"${S2_SOURCE_DIR}/s2/s2lax_loop_shape.cc"
"${S2_SOURCE_DIR}/s2/s2lax_polygon_shape.cc"
"${S2_SOURCE_DIR}/s2/s2lax_polyline_shape.cc"
"${S2_SOURCE_DIR}/s2/s2loop.cc"
"${S2_SOURCE_DIR}/s2/s2loop_measures.cc"
"${S2_SOURCE_DIR}/s2/s2measures.cc"
"${S2_SOURCE_DIR}/s2/s2metrics.cc"
"${S2_SOURCE_DIR}/s2/s2max_distance_targets.cc"
"${S2_SOURCE_DIR}/s2/s2min_distance_targets.cc"
"${S2_SOURCE_DIR}/s2/s2padded_cell.cc"
"${S2_SOURCE_DIR}/s2/s2point_compression.cc"
"${S2_SOURCE_DIR}/s2/s2point_region.cc"
"${S2_SOURCE_DIR}/s2/s2pointutil.cc"
"${S2_SOURCE_DIR}/s2/s2polygon.cc"
"${S2_SOURCE_DIR}/s2/s2polyline.cc"
"${S2_SOURCE_DIR}/s2/s2polyline_alignment.cc"
"${S2_SOURCE_DIR}/s2/s2polyline_measures.cc"
"${S2_SOURCE_DIR}/s2/s2polyline_simplifier.cc"
"${S2_SOURCE_DIR}/s2/s2predicates.cc"
"${S2_SOURCE_DIR}/s2/s2projections.cc"
"${S2_SOURCE_DIR}/s2/s2r2rect.cc"
"${S2_SOURCE_DIR}/s2/s2region.cc"
"${S2_SOURCE_DIR}/s2/s2region_term_indexer.cc"
"${S2_SOURCE_DIR}/s2/s2region_coverer.cc"
"${S2_SOURCE_DIR}/s2/s2region_intersection.cc"
"${S2_SOURCE_DIR}/s2/s2region_union.cc"
"${S2_SOURCE_DIR}/s2/s2shape_index.cc"
"${S2_SOURCE_DIR}/s2/s2shape_index_buffered_region.cc"
"${S2_SOURCE_DIR}/s2/s2shape_index_measures.cc"
"${S2_SOURCE_DIR}/s2/s2shape_measures.cc"
"${S2_SOURCE_DIR}/s2/s2shapeutil_build_polygon_boundaries.cc"
"${S2_SOURCE_DIR}/s2/s2shapeutil_coding.cc"
"${S2_SOURCE_DIR}/s2/s2shapeutil_contains_brute_force.cc"
"${S2_SOURCE_DIR}/s2/s2shapeutil_edge_iterator.cc"
"${S2_SOURCE_DIR}/s2/s2shapeutil_get_reference_point.cc"
"${S2_SOURCE_DIR}/s2/s2shapeutil_range_iterator.cc"
"${S2_SOURCE_DIR}/s2/s2shapeutil_visit_crossing_edge_pairs.cc"
"${S2_SOURCE_DIR}/s2/s2text_format.cc"
"${S2_SOURCE_DIR}/s2/s2wedge_relations.cc"
"${S2_SOURCE_DIR}/s2/strings/ostringstream.cc"
"${S2_SOURCE_DIR}/s2/strings/serialize.cc"
# ClickHouse doesn't use strings from abseil,
# so there are no duplicate symbols.
"${S2_SOURCE_DIR}/s2/third_party/absl/base/dynamic_annotations.cc"
"${S2_SOURCE_DIR}/s2/third_party/absl/base/internal/raw_logging.cc"
"${S2_SOURCE_DIR}/s2/third_party/absl/base/internal/throw_delegate.cc"
"${S2_SOURCE_DIR}/s2/third_party/absl/numeric/int128.cc"
"${S2_SOURCE_DIR}/s2/third_party/absl/strings/ascii.cc"
"${S2_SOURCE_DIR}/s2/third_party/absl/strings/match.cc"
"${S2_SOURCE_DIR}/s2/third_party/absl/strings/numbers.cc"
"${S2_SOURCE_DIR}/s2/third_party/absl/strings/str_cat.cc"
"${S2_SOURCE_DIR}/s2/third_party/absl/strings/str_split.cc"
"${S2_SOURCE_DIR}/s2/third_party/absl/strings/string_view.cc"
"${S2_SOURCE_DIR}/s2/third_party/absl/strings/strip.cc"
"${S2_SOURCE_DIR}/s2/third_party/absl/strings/internal/memutil.cc"
"${S2_SOURCE_DIR}/s2/util/bits/bit-interleave.cc"
"${S2_SOURCE_DIR}/s2/util/bits/bits.cc"
"${S2_SOURCE_DIR}/s2/util/coding/coder.cc"
"${S2_SOURCE_DIR}/s2/util/coding/varint.cc"
"${S2_SOURCE_DIR}/s2/util/math/exactfloat/exactfloat.cc"
"${S2_SOURCE_DIR}/s2/util/math/mathutil.cc"
"${S2_SOURCE_DIR}/s2/util/units/length-units.cc"
)
# Build the s2 library target from the sources collected in S2_SRCS.
add_library(s2 ${S2_SRCS})
# Link OpenSSL privately, but only when it was found.
if (OPENSSL_FOUND)
target_link_libraries(s2 PRIVATE ${OPENSSL_LIBRARIES})
endif()
# Export the S2 source root to dependents as a SYSTEM include directory,
# placed BEFORE the include directories already set on the target.
target_include_directories(s2 SYSTEM BEFORE PUBLIC "${S2_SOURCE_DIR}/")
# Link the math library privately when M_LIBRARY is set.
if(M_LIBRARY)
target_link_libraries(s2 PRIVATE ${M_LIBRARY})
endif()

1
contrib/sqlite-amalgamation vendored Submodule

@ -0,0 +1 @@
Subproject commit 9818baa5d027ffb26d57f810dc4c597d4946781c

View File

@ -0,0 +1,6 @@
# Builds the bundled SQLite amalgamation as the `sqlite` library target.
set (SQLITE_AMALGAMATION_DIR "${ClickHouse_SOURCE_DIR}/contrib/sqlite-amalgamation")

# The library is compiled from the single sqlite3.c source file.
add_library(sqlite ${SQLITE_AMALGAMATION_DIR}/sqlite3.c)

# Dependents receive the amalgamation directory as a SYSTEM include path.
target_include_directories(sqlite SYSTEM PUBLIC "${SQLITE_AMALGAMATION_DIR}")

View File

@ -378,6 +378,16 @@ function run_tests
# needs pv
01923_network_receive_time_metric_insert
01889_sqlite_read_write
# needs s2
01849_geoToS2
01851_s2_to_geo
01852_s2_get_neighbours
01853_s2_cells_intersect
01854_s2_cap_contains
01854_s2_cap_union
)
time clickhouse-test --hung-check -j 8 --order=random --use-skip-list \

View File

@ -29,7 +29,8 @@ RUN apt-get update -y \
unixodbc \
wget \
mysql-client=5.7* \
postgresql-client
postgresql-client \
sqlite3
RUN pip3 install numpy scipy pandas

View File

@ -105,11 +105,11 @@ clickhouse-client -nmT < tests/queries/0_stateless/01521_dummy_test.sql | tee te
5) ensure everything is correct; if the test output is incorrect (due to some bug, for example), adjust the reference file using a text editor.
#### How to create good test
#### How to create a good test
- test should be
- A test should be
- minimal - create only tables related to tested functionality, remove unrelated columns and parts of query
- fast - should not take longer than few seconds (better subseconds)
- fast - should not take longer than a few seconds (better subseconds)
- correct - fails when the feature is not working
- deterministic
- isolated / stateless
@ -126,6 +126,16 @@ clickhouse-client -nmT < tests/queries/0_stateless/01521_dummy_test.sql | tee te
- use other SQL files in the `0_stateless` folder as an example
- ensure the feature / feature combination you want to test is not yet covered with existing tests
#### Test naming rules
It's important to name tests correctly, so that a subset of tests can be turned off in a clickhouse-test invocation.
| Tester flag| What should be in test name | When flag should be added |
|---|---|---|
| `--[no-]zookeeper`| "zookeeper" or "replica" | Test uses tables from ReplicatedMergeTree family |
| `--[no-]shard` | "shard" or "distributed" or "global"| Test using connections to 127.0.0.2 or similar |
| `--[no-]long` | "long" or "deadlock" or "race" | Test runs longer than 60 seconds |
#### Commit / push / create PR.
1) commit & push your changes

View File

@ -134,10 +134,10 @@ $ ./release
## Faster builds for development
Normally all tools of the ClickHouse bundle, such as `clickhouse-server`, `clickhouse-client` etc., are linked into a single static executable, `clickhouse`. This executable must be re-linked on every change, which might be slow. Two common ways to improve linking time are to use `lld` linker, and use the 'split' build configuration, which builds a separate binary for every tool, and further splits the code into several shared libraries. To enable these tweaks, pass the following flags to `cmake`:
Normally all tools of the ClickHouse bundle, such as `clickhouse-server`, `clickhouse-client` etc., are linked into a single static executable, `clickhouse`. This executable must be re-linked on every change, which might be slow. One common way to improve build time is to use the 'split' build configuration, which builds a separate binary for every tool, and further splits the code into several shared libraries. To enable this tweak, pass the following flags to `cmake`:
```
-DCMAKE_C_FLAGS="--ld-path=lld" -DCMAKE_CXX_FLAGS="--ld-path=lld" -DUSE_STATIC_LIBRARIES=0 -DSPLIT_SHARED_LIBRARIES=1 -DCLICKHOUSE_SPLIT_BINARY=1
-DUSE_STATIC_LIBRARIES=0 -DSPLIT_SHARED_LIBRARIES=1 -DCLICKHOUSE_SPLIT_BINARY=1
```
## You Don't Have to Build ClickHouse {#you-dont-have-to-build-clickhouse}

View File

@ -79,6 +79,7 @@ SELECT library_name, license_type, license_path FROM system.licenses ORDER BY li
| re2 | BSD 3-clause | /contrib/re2/LICENSE |
| replxx | BSD 3-clause | /contrib/replxx/LICENSE.md |
| rocksdb | BSD 3-clause | /contrib/rocksdb/LICENSE.leveldb |
| s2geometry | Apache | /contrib/s2geometry/LICENSE |
| sentry-native | MIT | /contrib/sentry-native/LICENSE |
| simdjson | Apache | /contrib/simdjson/LICENSE |
| snappy | Public Domain | /contrib/snappy/COPYING |

View File

@ -1,6 +1,6 @@
---
toc_priority: 12
toc_title: MateriaziePostgreSQL
toc_title: MaterializedPostgreSQL
---
# MaterializedPostgreSQL {#materialize-postgresql}

View File

@ -1213,7 +1213,15 @@ Default value: `3`.
## output_format_json_quote_64bit_integers {#session_settings-output_format_json_quote_64bit_integers}
If the value is true, integers appear in quotes when using JSON\* Int64 and UInt64 formats (for compatibility with most JavaScript implementations); otherwise, integers are output without the quotes.
Controls quoting of 64-bit or bigger [integers](../../sql-reference/data-types/int-uint.md) (like `UInt64` or `Int128`) when they are output in a [JSON](../../interfaces/formats.md#json) format.
Such integers are enclosed in quotes by default. This behavior is compatible with most JavaScript implementations.
Possible values:
- 0 — Integers are output without quotes.
- 1 — Integers are enclosed in quotes.
Default value: 1.
## output_format_json_quote_denormals {#settings-output_format_json_quote_denormals}

View File

@ -8,12 +8,11 @@ Columns:
- `table` ([String](../../sql-reference/data-types/string.md)) — Table name.
- `name` ([String](../../sql-reference/data-types/string.md)) — Index name.
- `type` ([String](../../sql-reference/data-types/string.md)) — Index type.
- `expr` ([String](../../sql-reference/data-types/string.md)) — Expression used to calculate the index.
- `granularity` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of granules in the block.
- `expr` ([String](../../sql-reference/data-types/string.md)) — Expression for the index calculation.
- `granularity` ([UInt64](../../sql-reference/data-types/int-uint.md)) — The number of granules in the block.
**Example**
```sql
SELECT * FROM system.data_skipping_indices LIMIT 2 FORMAT Vertical;
```

View File

@ -34,7 +34,7 @@ Input table:
Query:
``` sql
SELECT medianDeterministic(val, 1) FROM t
SELECT medianDeterministic(val, 1) FROM t;
```
Result:

View File

@ -12,9 +12,6 @@ toc_title: Map(key, value)
- `key` — The key part of the pair. [String](../../sql-reference/data-types/string.md) or [Integer](../../sql-reference/data-types/int-uint.md).
- `value` — The value part of the pair. [String](../../sql-reference/data-types/string.md), [Integer](../../sql-reference/data-types/int-uint.md) or [Array](../../sql-reference/data-types/array.md).
!!! warning "Warning"
Currently `Map` data type is an experimental feature. To work with it you must set `allow_experimental_map_type = 1`.
To get the value from an `a Map('key', 'value')` column, use the `a['key']` syntax. This lookup currently has linear complexity.
**Examples**

View File

@ -195,6 +195,41 @@ Result:
└────────────────────┘
```
## h3ToGeo {#h3togeo}
Returns `(lon, lat)` that corresponds to the provided H3 index.
**Syntax**
``` sql
h3ToGeo(h3Index)
```
**Arguments**
- `h3Index` — H3 Index. Type: [UInt64](../../../sql-reference/data-types/int-uint.md).
**Returned values**
- `lon` — Longitude. Type: [Float64](../../../sql-reference/data-types/float.md).
- `lat` — Latitude. Type: [Float64](../../../sql-reference/data-types/float.md).
**Example**
Query:
``` sql
SELECT h3ToGeo(644325524701193974) coordinates;
```
Result:
``` text
┌─coordinates───────────────────────────┐
│ (37.79506616830252,55.71290243145668) │
└───────────────────────────────────────┘
```
## h3kRing {#h3kring}
Lists all the [H3](#h3index) hexagons in the radius of `k` from the given hexagon in random order.

View File

@ -306,3 +306,49 @@ Result:
└───────────────────────────────────────────────────────────────────────────────────────────────────────┘
```
## toJSONString {#tojsonstring}
Serializes a value to its JSON representation. Various data types and nested structures are supported.
64-bit [integers](../../sql-reference/data-types/int-uint.md) or bigger (like `UInt64` or `Int128`) are enclosed in quotes by default. [output_format_json_quote_64bit_integers](../../operations/settings/settings.md#session_settings-output_format_json_quote_64bit_integers) controls this behavior.
Special values `NaN` and `inf` are replaced with `null`. Enable [output_format_json_quote_denormals](../../operations/settings/settings.md#settings-output_format_json_quote_denormals) setting to show them.
When serializing an [Enum](../../sql-reference/data-types/enum.md) value, the function outputs its name.
**Syntax**
``` sql
toJSONString(value)
```
**Arguments**
- `value` — Value to serialize. Value may be of any data type.
**Returned value**
- JSON representation of the value.
Type: [String](../../sql-reference/data-types/string.md).
**Example**
The first example shows serialization of a [Map](../../sql-reference/data-types/map.md).
The second example shows some special values wrapped into a [Tuple](../../sql-reference/data-types/tuple.md).
Query:
``` sql
SELECT toJSONString(map('key1', 1, 'key2', 2));
SELECT toJSONString(tuple(1.25, NULL, NaN, +inf, -inf, [])) SETTINGS output_format_json_quote_denormals = 1;
```
Result:
``` text
{"key1":1,"key2":2}
[1.25,null,"nan","inf","-inf",[]]
```
**See Also**
- [output_format_json_quote_64bit_integers](../../operations/settings/settings.md#session_settings-output_format_json_quote_64bit_integers)
- [output_format_json_quote_denormals](../../operations/settings/settings.md#settings-output_format_json_quote_denormals)

View File

@ -49,6 +49,7 @@ ENGINE = MaterializeMySQL('host:port', ['database' | database], 'user', 'passwor
| DATE, NEWDATE | [Date](../../sql-reference/data-types/date.md) |
| DATETIME, TIMESTAMP | [DateTime](../../sql-reference/data-types/datetime.md) |
| DATETIME2, TIMESTAMP2 | [DateTime64](../../sql-reference/data-types/datetime64.md) |
| ENUM | [Enum](../../sql-reference/data-types/enum.md) |
| STRING | [String](../../sql-reference/data-types/string.md) |
| VARCHAR, VAR_STRING | [String](../../sql-reference/data-types/string.md) |
| BLOB | [String](../../sql-reference/data-types/string.md) |

View File

@ -1204,8 +1204,15 @@ load_balancing = round_robin
Работает для форматов JSONEachRow и TSKV.
## output_format_json_quote_64bit_integers {#session_settings-output_format_json_quote_64bit_integers}
Управляет кавычками при выводе 64-битных или более [целых чисел](../../sql-reference/data-types/int-uint.md) (например, `UInt64` или `Int128`) в формате [JSON](../../interfaces/formats.md#json).
По умолчанию такие числа заключаются в кавычки. Это поведение соответствует большинству реализаций JavaScript.
Если значение истинно, то при использовании JSON\* форматов UInt64 и Int64 числа выводятся в кавычках (из соображений совместимости с большинством реализаций JavaScript), иначе - без кавычек.
Возможные значения:
- 0 — числа выводятся без кавычек.
- 1 — числа выводятся в кавычках.
Значение по умолчанию: 1.
## output_format_json_quote_denormals {#settings-output_format_json_quote_denormals}

View File

@ -0,0 +1,38 @@
# system.data_skipping_indices {#system-data-skipping-indices}
Содержит информацию о существующих индексах пропуска данных во всех таблицах.
Столбцы:
- `database` ([String](../../sql-reference/data-types/string.md)) — имя базы данных.
- `table` ([String](../../sql-reference/data-types/string.md)) — имя таблицы.
- `name` ([String](../../sql-reference/data-types/string.md)) — имя индекса.
- `type` ([String](../../sql-reference/data-types/string.md)) — тип индекса.
- `expr` ([String](../../sql-reference/data-types/string.md)) — выражение, используемое для вычисления индекса.
- `granularity` ([UInt64](../../sql-reference/data-types/int-uint.md)) — количество гранул в блоке данных.
**Пример**
```sql
SELECT * FROM system.data_skipping_indices LIMIT 2 FORMAT Vertical;
```
```text
Row 1:
──────
database: default
table: user_actions
name: clicks_idx
type: minmax
expr: clicks
granularity: 1
Row 2:
──────
database: default
table: users
name: contacts_null_idx
type: minmax
expr: assumeNotNull(contacts_null)
granularity: 1
```

View File

@ -4,7 +4,6 @@
Функции:
- `median` — синоним для [quantile](../../../sql-reference/aggregate-functions/reference/quantile.md#quantile).
- `medianDeterministic` — синоним для [quantileDeterministic](../../../sql-reference/aggregate-functions/reference/quantiledeterministic.md#quantiledeterministic).
- `medianExact` — синоним для [quantileExact](../../../sql-reference/aggregate-functions/reference/quantileexact.md#quantileexact).
@ -31,7 +30,7 @@
Запрос:
``` sql
SELECT medianDeterministic(val, 1) FROM t
SELECT medianDeterministic(val, 1) FROM t;
```
Результат:
@ -41,4 +40,3 @@ SELECT medianDeterministic(val, 1) FROM t
│ 1.5 │
└─────────────────────────────┘
```

View File

@ -12,9 +12,6 @@ toc_title: Map(key, value)
- `key` — ключ. [String](../../sql-reference/data-types/string.md) или [Integer](../../sql-reference/data-types/int-uint.md).
- `value` — значение. [String](../../sql-reference/data-types/string.md), [Integer](../../sql-reference/data-types/int-uint.md) или [Array](../../sql-reference/data-types/array.md).
!!! warning "Предупреждение"
Сейчас использование типа данных `Map` является экспериментальной возможностью. Чтобы использовать этот тип данных, включите настройку `allow_experimental_map_type = 1`.
Чтобы получить значение из колонки `a Map('key', 'value')`, используйте синтаксис `a['key']`. В настоящее время такая подстановка работает по алгоритму с линейной сложностью.
**Примеры**

View File

@ -306,3 +306,51 @@ SELECT JSONExtractKeysAndValuesRaw('{"a": [-100, 200.0], "b":{"c": {"d": "hello"
│ [('d','"hello"'),('f','"world"')] │
└───────────────────────────────────────────────────────────────────────────────────────────────────────┘
```
## toJSONString {#tojsonstring}
Сериализует значение в JSON представление. Поддерживаются различные типы данных и вложенные структуры.
По умолчанию 64-битные [целые числа](../../sql-reference/data-types/int-uint.md) и более (например, `UInt64` или `Int128`) заключаются в кавычки. Настройка [output_format_json_quote_64bit_integers](../../operations/settings/settings.md#session_settings-output_format_json_quote_64bit_integers) управляет этим поведением.
Специальные значения `NaN` и `inf` заменяются на `null`. Чтобы они отображались, включите настройку [output_format_json_quote_denormals](../../operations/settings/settings.md#settings-output_format_json_quote_denormals).
Когда сериализуется значение [Enum](../../sql-reference/data-types/enum.md), то функция выводит его имя.
**Синтаксис**
``` sql
toJSONString(value)
```
**Аргументы**
- `value` — значение, которое необходимо сериализовать. Может быть любого типа.
**Возвращаемое значение**
- JSON представление значения.
Тип: [String](../../sql-reference/data-types/string.md).
**Пример**
Первый пример показывает сериализацию [Map](../../sql-reference/data-types/map.md).
Во втором примере есть специальные значения, обернутые в [Tuple](../../sql-reference/data-types/tuple.md).
Запрос:
``` sql
SELECT toJSONString(map('key1', 1, 'key2', 2));
SELECT toJSONString(tuple(1.25, NULL, NaN, +inf, -inf, [])) SETTINGS output_format_json_quote_denormals = 1;
```
Результат:
``` text
{"key1":1,"key2":2}
[1.25,null,"nan","inf","-inf",[]]
```
**Смотрите также**
- [output_format_json_quote_64bit_integers](../../operations/settings/settings.md#session_settings-output_format_json_quote_64bit_integers)
- [output_format_json_quote_denormals](../../operations/settings/settings.md#settings-output_format_json_quote_denormals)

View File

@ -1,13 +1,8 @@
---
machine_translated: true
machine_translated_rev: 72537a2d527c63c07aa5d2361a8829f3895cf2bd
toc_priority: 42
toc_title: mysql
---
# mysql {#mysql}
允许 `SELECT` 要对存储在远程MySQL服务器上的数据执行的查询。
允许对存储在远程MySQL服务器上的数据执行`SELECT`和`INSERT`查询。
**语法**
``` sql
mysql('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);
@ -15,31 +10,44 @@ mysql('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_
**参数**
- `host:port` — MySQL server address.
- `host:port` — MySQL服务器地址.
- `database`Remote database name.
- `database`远程数据库名称.
- `table`Remote table name.
- `table`远程表名称.
- `user` — MySQL user.
- `user` — MySQL用户.
- `password`User password.
- `password`用户密码.
- `replace_query`Flag that converts `INSERT INTO` 查询到 `REPLACE INTO`. 如果 `replace_query=1`,查询被替换。
- `replace_query` — 将 `INSERT INTO` 查询转换为 `REPLACE INTO` 的标志。如果 `replace_query=1`,查询被替换。
- `on_duplicate_clause` — The `ON DUPLICATE KEY on_duplicate_clause` 表达式被添加`INSERT` 查询。
- `on_duplicate_clause` — 添加 `ON DUPLICATE KEY on_duplicate_clause` 表达式到 `INSERT` 查询。明确规定只能使用 `replace_query = 0`。如果你同时设置 `replace_query = 1` 和 `on_duplicate_clause`,ClickHouse将产生异常。
Example: `INSERT INTO t (c1,c2) VALUES ('a', 2) ON DUPLICATE KEY UPDATE c2 = c2 + 1`, where `on_duplicate_clause` is `UPDATE c2 = c2 + 1`. See the MySQL documentation to find which `on_duplicate_clause` you can use with the `ON DUPLICATE KEY` clause.
示例:`INSERT INTO t (c1,c2) VALUES ('a', 2) ON DUPLICATE KEY UPDATE c2 = c2 + 1`
To specify `on_duplicate_clause` you need to pass `0` to the `replace_query` parameter. If you simultaneously pass `replace_query = 1` and `on_duplicate_clause`, ClickHouse generates an exception.
`on_duplicate_clause`这里是`UPDATE c2 = c2 + 1`。请查阅MySQL文档来找到可以和`ON DUPLICATE KEY`一起使用的 `on_duplicate_clause`子句。
简单 `WHERE` 条款如 `=, !=, >, >=, <, <=` 当前在MySQL服务器上执行
简单`WHERE` 子句如 `=, !=, >, >=, <, <=` 将即时在MySQL服务器上执行。其余的条件和 `LIMIT` 只有在对MySQL的查询完成后才会在ClickHouse中执行采样约束
其余的条件和 `LIMIT` 只有在对MySQL的查询完成后才会在ClickHouse中执行采样约束。
支持使用`|`并列进行多副本查询,示例如下:
```sql
SELECT name FROM mysql(`mysql{1|2|3}:3306`, 'mysql_database', 'mysql_table', 'user', 'password');
```
```sql
SELECT name FROM mysql(`mysql1:3306|mysql2:3306|mysql3:3306`, 'mysql_database', 'mysql_table', 'user', 'password');
```
**返回值**
与原始MySQL表具有相同列的table对象。
与原始MySQL表具有相同列的表对象。
!!! note "注意"
在`INSERT`查询中为了区分`mysql(...)`与带有列名列表的表名的表函数,你必须使用关键字`FUNCTION`或`TABLE FUNCTION`。查看如下示例。
## 用法示例 {#usage-example}
@ -66,7 +74,7 @@ mysql> select * from test;
1 row in set (0,00 sec)
```
从ClickHouse中选择数据:
从ClickHouse中查询数据:
``` sql
SELECT * FROM mysql('localhost:3306', 'test', 'test', 'bayonet', '123')
@ -78,6 +86,21 @@ SELECT * FROM mysql('localhost:3306', 'test', 'test', 'bayonet', '123')
└────────┴──────────────┴───────┴────────────────┘
```
替换和插入:
```sql
INSERT INTO FUNCTION mysql('localhost:3306', 'test', 'test', 'bayonet', '123', 1) (int_id, float) VALUES (1, 3);
INSERT INTO TABLE FUNCTION mysql('localhost:3306', 'test', 'test', 'bayonet', '123', 0, 'UPDATE int_id = int_id + 1') (int_id, float) VALUES (1, 4);
SELECT * FROM mysql('localhost:3306', 'test', 'test', 'bayonet', '123');
```
```text
┌─int_id─┬─float─┐
│ 1 │ 3 │
│ 2 │ 4 │
└────────┴───────┘
```
## 另请参阅 {#see-also}
- [MySQL 表引擎](../../engines/table-engines/integrations/mysql.md)

View File

@ -477,17 +477,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
CurrentMetrics::set(CurrentMetrics::Revision, ClickHouseRevision::getVersionRevision());
CurrentMetrics::set(CurrentMetrics::VersionInteger, ClickHouseRevision::getVersionInteger());
if (ThreadFuzzer::instance().isEffective())
LOG_WARNING(log, "ThreadFuzzer is enabled. Application will run slowly and unstable.");
#if !defined(NDEBUG) || !defined(__OPTIMIZE__)
LOG_WARNING(log, "Server was built in debug mode. It will work slowly.");
#endif
#if defined(SANITIZER)
LOG_WARNING(log, "Server was built with sanitizer. It will work slowly.");
#endif
/** Context contains all that query execution is dependent:
* settings, available functions, data types, aggregate functions, databases, ...
*/
@ -497,6 +486,18 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->makeGlobalContext();
global_context->setApplicationType(Context::ApplicationType::SERVER);
#if !defined(NDEBUG) || !defined(__OPTIMIZE__)
global_context->addWarningMessage("Server was built in debug mode. It will work slowly.");
#endif
if (ThreadFuzzer::instance().isEffective())
global_context->addWarningMessage("ThreadFuzzer is enabled. Application will run slowly and unstable.");
#if defined(SANITIZER)
global_context->addWarningMessage("Server was built with sanitizer. It will work slowly.");
#endif
// Initialize global thread pool. Do it before we fetch configs from zookeeper
// nodes (`from_zk`), because ZooKeeper interface uses the pool. We will
// ignore `max_thread_pool_size` in configs we fetch from ZK, but oh well.
@ -552,8 +553,10 @@ int Server::main(const std::vector<std::string> & /*args*/)
if (ptrace(PTRACE_TRACEME, 0, nullptr, nullptr) == -1)
{
/// Program is run under debugger. Modification of it's binary image is ok for breakpoints.
LOG_WARNING(log, "Server is run under debugger and its binary image is modified (most likely with breakpoints).",
calculated_binary_hash);
global_context->addWarningMessage(
fmt::format("Server is run under debugger and its binary image is modified (most likely with breakpoints).",
calculated_binary_hash)
);
}
else
{
@ -636,7 +639,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
}
else
{
LOG_WARNING(log, message);
global_context->addWarningMessage(message);
}
}

View File

@ -173,6 +173,7 @@ enum class AccessType
M(MONGO, "", GLOBAL, SOURCES) \
M(MYSQL, "", GLOBAL, SOURCES) \
M(POSTGRES, "", GLOBAL, SOURCES) \
M(SQLITE, "", GLOBAL, SOURCES) \
M(ODBC, "", GLOBAL, SOURCES) \
M(JDBC, "", GLOBAL, SOURCES) \
M(HDFS, "", GLOBAL, SOURCES) \

View File

@ -3,6 +3,7 @@
#include <AggregateFunctions/AggregateFunctionSequenceMatch.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime.h>
#include <common/range.h>

View File

@ -101,6 +101,24 @@ struct AggregateFunctionSumData
{
const auto * end = ptr + count;
if constexpr (
(is_integer_v<T> && !is_big_int_v<T>)
|| (IsDecimalNumber<T> && !std::is_same_v<T, Decimal256> && !std::is_same_v<T, Decimal128>))
{
/// For integers we can vectorize the operation if we replace the null check using a multiplication (by 0 for null, 1 for not null)
/// https://quick-bench.com/q/MLTnfTvwC2qZFVeWHfOBR3U7a8I
T local_sum{};
while (ptr < end)
{
T multiplier = !*null_map;
Impl::add(local_sum, *ptr * multiplier);
++ptr;
++null_map;
}
Impl::add(sum, local_sum);
return;
}
if constexpr (std::is_floating_point_v<T>)
{
constexpr size_t unroll_count = 128 / sizeof(T);

View File

@ -4,6 +4,7 @@
#include <AggregateFunctions/FactoryHelpers.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeUUID.h>
@ -49,6 +50,8 @@ AggregateFunctionPtr createAggregateFunctionUniq(const std::string & name, const
return res;
else if (which.isDate())
return std::make_shared<AggregateFunctionUniq<DataTypeDate::FieldType, Data>>(argument_types);
else if (which.isDate32())
return std::make_shared<AggregateFunctionUniq<DataTypeDate32::FieldType, Data>>(argument_types);
else if (which.isDateTime())
return std::make_shared<AggregateFunctionUniq<DataTypeDateTime::FieldType, Data>>(argument_types);
else if (which.isStringOrFixedString())
@ -95,6 +98,8 @@ AggregateFunctionPtr createAggregateFunctionUniq(const std::string & name, const
return res;
else if (which.isDate())
return std::make_shared<AggregateFunctionUniq<DataTypeDate::FieldType, Data<DataTypeDate::FieldType>>>(argument_types);
else if (which.isDate32())
return std::make_shared<AggregateFunctionUniq<DataTypeDate32::FieldType, Data<DataTypeDate32::FieldType>>>(argument_types);
else if (which.isDateTime())
return std::make_shared<AggregateFunctionUniq<DataTypeDateTime::FieldType, Data<DataTypeDateTime::FieldType>>>(argument_types);
else if (which.isStringOrFixedString())

View File

@ -6,6 +6,7 @@
#include <Common/FieldVisitorConvertToNumber.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime.h>
#include <functional>
@ -51,6 +52,8 @@ namespace
return res;
else if (which.isDate())
return std::make_shared<typename WithK<K, HashValueType>::template AggregateFunction<DataTypeDate::FieldType>>(argument_types, params);
else if (which.isDate32())
return std::make_shared<typename WithK<K, HashValueType>::template AggregateFunction<DataTypeDate32::FieldType>>(argument_types, params);
else if (which.isDateTime())
return std::make_shared<typename WithK<K, HashValueType>::template AggregateFunction<DataTypeDateTime::FieldType>>(argument_types, params);
else if (which.isStringOrFixedString())

View File

@ -3,6 +3,7 @@
#include <AggregateFunctions/AggregateFunctionUniqUpTo.h>
#include <Common/FieldVisitorConvertToNumber.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeFixedString.h>
@ -61,6 +62,8 @@ AggregateFunctionPtr createAggregateFunctionUniqUpTo(const std::string & name, c
return res;
else if (which.isDate())
return std::make_shared<AggregateFunctionUniqUpTo<DataTypeDate::FieldType>>(threshold, argument_types, params);
else if (which.isDate32())
return std::make_shared<AggregateFunctionUniqUpTo<DataTypeDate32::FieldType>>(threshold, argument_types, params);
else if (which.isDateTime())
return std::make_shared<AggregateFunctionUniqUpTo<DataTypeDateTime::FieldType>>(threshold, argument_types, params);
else if (which.isStringOrFixedString())

View File

@ -4,6 +4,7 @@
#include <AggregateFunctions/Helpers.h>
#include <Core/Settings.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime.h>
#include <common/range.h>

View File

@ -76,6 +76,10 @@ add_headers_and_sources(clickhouse_common_io IO)
add_headers_and_sources(clickhouse_common_io IO/S3)
list (REMOVE_ITEM clickhouse_common_io_sources Common/malloc.cpp Common/new_delete.cpp)
if (USE_SQLITE)
add_headers_and_sources(dbms Databases/SQLite)
endif()
if(USE_RDKAFKA)
add_headers_and_sources(dbms Storages/Kafka)
endif()
@ -415,6 +419,11 @@ if (USE_AWS_S3)
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${AWS_S3_INCLUDE_DIR})
endif()
if (USE_S2_GEOMETRY)
dbms_target_link_libraries (PUBLIC ${S2_GEOMETRY_LIBRARY})
dbms_target_include_directories (SYSTEM BEFORE PUBLIC ${S2_GEOMETRY_INCLUDE_DIR})
endif()
if (USE_BROTLI)
target_link_libraries (clickhouse_common_io PRIVATE ${BROTLI_LIBRARY})
target_include_directories (clickhouse_common_io SYSTEM BEFORE PRIVATE ${BROTLI_INCLUDE_DIR})
@ -425,6 +434,10 @@ if (USE_AMQPCPP)
dbms_target_include_directories (SYSTEM BEFORE PUBLIC ${AMQPCPP_INCLUDE_DIR})
endif()
if (USE_SQLITE)
dbms_target_link_libraries(PUBLIC sqlite)
endif()
if (USE_CASSANDRA)
dbms_target_link_libraries(PUBLIC ${CASSANDRA_LIBRARY})
dbms_target_include_directories (SYSTEM BEFORE PUBLIC ${CASS_INCLUDE_DIR})

View File

@ -558,6 +558,9 @@
M(588, DISTRIBUTED_BROKEN_BATCH_INFO) \
M(589, DISTRIBUTED_BROKEN_BATCH_FILES) \
M(590, CANNOT_SYSCONF) \
M(591, SQLITE_ENGINE_ERROR) \
M(592, DATA_ENCRYPTION_ERROR) \
M(593, ZERO_COPY_REPLICATION_ERROR) \
\
M(998, POSTGRESQL_CONNECTION_FAILURE) \
M(999, KEEPER_EXCEPTION) \

View File

@ -22,10 +22,6 @@
M(WriteBufferFromFileDescriptorWrite, "Number of writes (write/pwrite) to a file descriptor. Does not include sockets.") \
M(WriteBufferFromFileDescriptorWriteFailed, "Number of times the write (write/pwrite) to a file descriptor have failed.") \
M(WriteBufferFromFileDescriptorWriteBytes, "Number of bytes written to file descriptors. If the file is compressed, this will show compressed data size.") \
M(ReadBufferAIORead, "") \
M(ReadBufferAIOReadBytes, "") \
M(WriteBufferAIOWrite, "") \
M(WriteBufferAIOWriteBytes, "") \
M(ReadCompressedBytes, "Number of bytes (the number of bytes before decompression) read from compressed sources (files, network).") \
M(CompressedReadBufferBlocks, "Number of compressed blocks (the blocks of data that are compressed independent of each other) read from compressed sources (files, network).") \
M(CompressedReadBufferBytes, "Number of uncompressed bytes (the number of bytes after decompression) read from compressed sources (files, network).") \
@ -34,6 +30,10 @@
M(UncompressedCacheWeightLost, "") \
M(MMappedFileCacheHits, "") \
M(MMappedFileCacheMisses, "") \
M(AIOWrite, "Number of writes with Linux or FreeBSD AIO interface") \
M(AIOWriteBytes, "Number of bytes written with Linux or FreeBSD AIO interface") \
M(AIORead, "Number of reads with Linux or FreeBSD AIO interface") \
M(AIOReadBytes, "Number of bytes read with Linux or FreeBSD AIO interface") \
M(IOBufferAllocs, "") \
M(IOBufferAllocBytes, "") \
M(ArenaAllocChunks, "") \
@ -43,8 +43,8 @@
M(MarkCacheHits, "") \
M(MarkCacheMisses, "") \
M(CreatedReadBufferOrdinary, "") \
M(CreatedReadBufferAIO, "") \
M(CreatedReadBufferAIOFailed, "") \
M(CreatedReadBufferDirectIO, "") \
M(CreatedReadBufferDirectIOFailed, "") \
M(CreatedReadBufferMMap, "") \
M(CreatedReadBufferMMapFailed, "") \
M(DiskReadElapsedMicroseconds, "Total time spent waiting for read syscall. This include reads from page cache.") \

View File

@ -47,13 +47,13 @@ CompressedReadBufferFromFile::CompressedReadBufferFromFile(std::unique_ptr<ReadB
CompressedReadBufferFromFile::CompressedReadBufferFromFile(
const std::string & path,
size_t estimated_size,
size_t aio_threshold,
size_t direct_io_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache,
size_t buf_size,
bool allow_different_codecs_)
: BufferWithOwnMemory<ReadBuffer>(0)
, p_file_in(createReadBufferFromFileBase(path, estimated_size, aio_threshold, mmap_threshold, mmap_cache, buf_size))
, p_file_in(createReadBufferFromFileBase(path, estimated_size, direct_io_threshold, mmap_threshold, mmap_cache, buf_size))
, file_in(*p_file_in)
{
compressed_in = &file_in;

View File

@ -33,7 +33,7 @@ public:
CompressedReadBufferFromFile(std::unique_ptr<ReadBufferFromFileBase> buf, bool allow_different_codecs_ = false);
CompressedReadBufferFromFile(
const std::string & path, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold, MMappedFileCache * mmap_cache,
const std::string & path, size_t estimated_size, size_t direct_io_threshold, size_t mmap_threshold, MMappedFileCache * mmap_cache,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, bool allow_different_codecs_ = false);
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);

View File

@ -62,6 +62,8 @@ void ExternalResultDescription::init(const Block & sample_block_)
types.emplace_back(ValueType::vtString, is_nullable);
else if (which.isDate())
types.emplace_back(ValueType::vtDate, is_nullable);
else if (which.isDate32())
types.emplace_back(ValueType::vtDate32, is_nullable);
else if (which.isDateTime())
types.emplace_back(ValueType::vtDateTime, is_nullable);
else if (which.isUUID())

View File

@ -26,6 +26,7 @@ struct ExternalResultDescription
vtEnum16,
vtString,
vtDate,
vtDate32,
vtDateTime,
vtUUID,
vtDateTime64,

View File

@ -26,13 +26,14 @@ namespace ErrorCodes
MySQLClient::MySQLClient(const String & host_, UInt16 port_, const String & user_, const String & password_)
: host(host_), port(port_), user(user_), password(std::move(password_))
{
client_capability_flags = CLIENT_PROTOCOL_41 | CLIENT_PLUGIN_AUTH | CLIENT_SECURE_CONNECTION;
mysql_context.client_capabilities = CLIENT_PROTOCOL_41 | CLIENT_PLUGIN_AUTH | CLIENT_SECURE_CONNECTION;
}
MySQLClient::MySQLClient(MySQLClient && other)
: host(std::move(other.host)), port(other.port), user(std::move(other.user)), password(std::move(other.password))
, client_capability_flags(other.client_capability_flags)
, mysql_context(other.mysql_context)
{
mysql_context.sequence_id = 0;
}
void MySQLClient::connect()
@ -56,7 +57,7 @@ void MySQLClient::connect()
in = std::make_shared<ReadBufferFromPocoSocket>(*socket);
out = std::make_shared<WriteBufferFromPocoSocket>(*socket);
packet_endpoint = std::make_shared<PacketEndpoint>(*in, *out, seq);
packet_endpoint = mysql_context.makeEndpoint(*in, *out);
handshake();
}
@ -68,7 +69,7 @@ void MySQLClient::disconnect()
socket->close();
socket = nullptr;
connected = false;
seq = 0;
mysql_context.sequence_id = 0;
}
/// https://dev.mysql.com/doc/internals/en/connection-phase-packets.html
@ -87,10 +88,10 @@ void MySQLClient::handshake()
String auth_plugin_data = native41.getAuthPluginData();
HandshakeResponse handshake_response(
client_capability_flags, MAX_PACKET_LENGTH, charset_utf8, user, "", auth_plugin_data, mysql_native_password);
mysql_context.client_capabilities, MAX_PACKET_LENGTH, charset_utf8, user, "", auth_plugin_data, mysql_native_password);
packet_endpoint->sendPacket<HandshakeResponse>(handshake_response, true);
ResponsePacket packet_response(client_capability_flags, true);
ResponsePacket packet_response(mysql_context.client_capabilities, true);
packet_endpoint->receivePacket(packet_response);
packet_endpoint->resetSequenceId();
@ -105,7 +106,7 @@ void MySQLClient::writeCommand(char command, String query)
WriteCommand write_command(command, query);
packet_endpoint->sendPacket<WriteCommand>(write_command, true);
ResponsePacket packet_response(client_capability_flags);
ResponsePacket packet_response(mysql_context.client_capabilities);
packet_endpoint->receivePacket(packet_response);
switch (packet_response.getType())
{
@ -124,7 +125,7 @@ void MySQLClient::registerSlaveOnMaster(UInt32 slave_id)
RegisterSlave register_slave(slave_id);
packet_endpoint->sendPacket<RegisterSlave>(register_slave, true);
ResponsePacket packet_response(client_capability_flags);
ResponsePacket packet_response(mysql_context.client_capabilities);
packet_endpoint->receivePacket(packet_response);
packet_endpoint->resetSequenceId();
if (packet_response.getType() == PACKET_ERR)

View File

@ -45,9 +45,7 @@ private:
String password;
bool connected = false;
UInt32 client_capability_flags = 0;
uint8_t seq = 0;
MySQLWireContext mysql_context;
const UInt8 charset_utf8 = 33;
const String mysql_native_password = "mysql_native_password";

View File

@ -68,4 +68,15 @@ String PacketEndpoint::packetToText(const String & payload)
}
/// Creates a write-only packet endpoint over `out`, passing this context's sequence_id to it.
MySQLProtocol::PacketEndpointPtr MySQLWireContext::makeEndpoint(WriteBuffer & out)
{
return MySQLProtocol::PacketEndpoint::create(out, sequence_id);
}
/// Creates a packet endpoint for reading from `in` and writing to `out`, passing this context's sequence_id to it.
MySQLProtocol::PacketEndpointPtr MySQLWireContext::makeEndpoint(ReadBuffer & in, WriteBuffer & out)
{
return MySQLProtocol::PacketEndpoint::create(in, out, sequence_id);
}
}

View File

@ -5,6 +5,7 @@
#include "IMySQLReadPacket.h"
#include "IMySQLWritePacket.h"
#include "IO/MySQLPacketPayloadReadBuffer.h"
#include <common/shared_ptr_helper.h>
namespace DB
{
@ -15,19 +16,13 @@ namespace MySQLProtocol
/* Writes and reads packets, keeping sequence-id.
* Throws ProtocolError, if packet with incorrect sequence-id was received.
*/
class PacketEndpoint
class PacketEndpoint : public shared_ptr_helper<PacketEndpoint>
{
public:
uint8_t & sequence_id;
ReadBuffer * in;
WriteBuffer * out;
/// For writing.
PacketEndpoint(WriteBuffer & out_, uint8_t & sequence_id_);
/// For reading and writing.
PacketEndpoint(ReadBuffer & in_, WriteBuffer & out_, uint8_t & sequence_id_);
MySQLPacketPayloadReadBuffer getPayload();
void receivePacket(IMySQLReadPacket & packet);
@ -48,8 +43,29 @@ public:
/// Converts packet to text. Is used for debug output.
static String packetToText(const String & payload);
protected:
/// For writing.
PacketEndpoint(WriteBuffer & out_, uint8_t & sequence_id_);
/// For reading and writing.
PacketEndpoint(ReadBuffer & in_, WriteBuffer & out_, uint8_t & sequence_id_);
friend struct shared_ptr_helper<PacketEndpoint>;
};
using PacketEndpointPtr = std::shared_ptr<PacketEndpoint>;
}
struct MySQLWireContext
{
uint8_t sequence_id = 0;
uint32_t client_capabilities = 0;
size_t max_packet_size = 0;
MySQLProtocol::PacketEndpointPtr makeEndpoint(WriteBuffer & out);
MySQLProtocol::PacketEndpointPtr makeEndpoint(ReadBuffer & in, WriteBuffer & out);
};
}
}

View File

@ -1,4 +1,7 @@
#include "Connection.h"
#if USE_LIBPQXX
#include <common/logger_useful.h>
namespace postgres
@ -72,3 +75,5 @@ void Connection::connect()
updateConnection();
}
}
#endif

View File

@ -1,5 +1,11 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_LIBPQXX
#include <pqxx/pqxx> // Y_IGNORE
#include <Core/Types.h>
#include <boost/noncopyable.hpp>
@ -45,3 +51,5 @@ private:
Poco::Logger * log;
};
}
#endif

View File

@ -1,5 +1,11 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_LIBPQXX
#include <pqxx/pqxx> // Y_IGNORE
#include <Core/Types.h>
#include <common/BorrowedObjectPool.h>
@ -35,3 +41,5 @@ private:
using ConnectionHolderPtr = std::unique_ptr<ConnectionHolder>;
}
#endif

View File

@ -1,4 +1,7 @@
#include "PoolWithFailover.h"
#if USE_LIBPQXX
#include "Utils.h"
#include <Common/parseRemoteDescription.h>
#include <Common/Exception.h>
@ -136,3 +139,5 @@ ConnectionHolderPtr PoolWithFailover::get()
throw DB::Exception(DB::ErrorCodes::POSTGRESQL_CONNECTION_FAILURE, "Unable to connect to any of the replicas");
}
}
#endif

View File

@ -1,5 +1,12 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_LIBPQXX
#include "ConnectionHolder.h"
#include <mutex>
#include <Poco/Util/AbstractConfiguration.h>
@ -63,3 +70,5 @@ private:
using PoolWithFailoverPtr = std::shared_ptr<PoolWithFailover>;
}
#endif

View File

@ -1,4 +1,7 @@
#include "Utils.h"
#if USE_LIBPQXX
#include <IO/Operators.h>
namespace postgres
@ -17,3 +20,5 @@ ConnectionInfo formatConnectionString(String dbname, String host, UInt16 port, S
}
}
#endif

View File

@ -1,5 +1,11 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_LIBPQXX
#include <pqxx/pqxx> // Y_IGNORE
#include <Core/Types.h>
#include "Connection.h"
@ -15,3 +21,5 @@ namespace postgres
{
ConnectionInfo formatConnectionString(String dbname, String host, UInt16 port, String user, String password);
}
#endif

View File

@ -89,6 +89,9 @@ void insertPostgreSQLValue(
case ExternalResultDescription::ValueType::vtDate:
assert_cast<ColumnUInt16 &>(column).insertValue(UInt16{LocalDate{std::string(value)}.getDayNum()});
break;
case ExternalResultDescription::ValueType::vtDate32:
assert_cast<ColumnInt32 &>(column).insertValue(Int32{LocalDate{std::string(value)}.getExtenedDayNum()});
break;
case ExternalResultDescription::ValueType::vtDateTime:
{
ReadBufferFromString in(value);

View File

@ -39,6 +39,7 @@ enum class TypeIndex
Float32,
Float64,
Date,
Date32,
DateTime,
DateTime64,
String,
@ -257,6 +258,7 @@ inline constexpr const char * getTypeName(TypeIndex idx)
case TypeIndex::Float32: return "Float32";
case TypeIndex::Float64: return "Float64";
case TypeIndex::Date: return "Date";
case TypeIndex::Date32: return "Date32";
case TypeIndex::DateTime: return "DateTime";
case TypeIndex::DateTime64: return "DateTime64";
case TypeIndex::String: return "String";

View File

@ -192,6 +192,7 @@ bool callOnIndexAndDataType(TypeIndex number, F && f, ExtraArgs && ... args)
case TypeIndex::Decimal256: return f(TypePair<DataTypeDecimal<Decimal256>, T>(), std::forward<ExtraArgs>(args)...);
case TypeIndex::Date: return f(TypePair<DataTypeDate, T>(), std::forward<ExtraArgs>(args)...);
case TypeIndex::Date32: return f(TypePair<DataTypeDate, T>(), std::forward<ExtraArgs>(args)...);
case TypeIndex::DateTime: return f(TypePair<DataTypeDateTime, T>(), std::forward<ExtraArgs>(args)...);
case TypeIndex::DateTime64: return f(TypePair<DataTypeDateTime64, T>(), std::forward<ExtraArgs>(args)...);

View File

@ -13,5 +13,6 @@
#cmakedefine01 USE_LDAP
#cmakedefine01 USE_ROCKSDB
#cmakedefine01 USE_LIBPQXX
#cmakedefine01 USE_SQLITE
#cmakedefine01 USE_NURAFT
#cmakedefine01 USE_KRB5

View File

@ -31,6 +31,10 @@ SRCS(
MySQL/PacketsProtocolText.cpp
MySQL/PacketsReplication.cpp
NamesAndTypes.cpp
PostgreSQL/Connection.cpp
PostgreSQL/PoolWithFailover.cpp
PostgreSQL/Utils.cpp
PostgreSQL/insertPostgreSQLValue.cpp
PostgreSQLProtocol.cpp
QueryProcessingStage.cpp
Settings.cpp

View File

@ -0,0 +1,163 @@
#include "SQLiteBlockInputStream.h"
#if USE_SQLITE
#include <common/range.h>
#include <common/logger_useful.h>
#include <Common/assert_cast.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeNullable.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SQLITE_ENGINE_ERROR;
}
SQLiteBlockInputStream::SQLiteBlockInputStream(
SQLitePtr sqlite_db_,
const String & query_str_,
const Block & sample_block,
const UInt64 max_block_size_)
: query_str(query_str_)
, max_block_size(max_block_size_)
, sqlite_db(std::move(sqlite_db_))
{
description.init(sample_block);
}
void SQLiteBlockInputStream::readPrefix()
{
sqlite3_stmt * compiled_stmt = nullptr;
int status = sqlite3_prepare_v2(sqlite_db.get(), query_str.c_str(), query_str.size() + 1, &compiled_stmt, nullptr);
if (status != SQLITE_OK)
throw Exception(ErrorCodes::SQLITE_ENGINE_ERROR,
"Cannot prepate sqlite statement. Status: {}. Message: {}",
status, sqlite3_errstr(status));
compiled_statement = std::unique_ptr<sqlite3_stmt, StatementDeleter>(compiled_stmt, StatementDeleter());
}
/// Steps the prepared statement and collects the produced rows into one block.
/// Stops after max_block_size rows or when the statement reports SQLITE_DONE
/// (in which case the statement is released, so the next call returns an empty Block).
Block SQLiteBlockInputStream::readImpl()
{
    /// Nothing to read without a prepared statement.
    if (!compiled_statement)
        return Block();

    MutableColumns columns = description.sample_block.cloneEmptyColumns();
    size_t rows_read = 0;

    for (;;)
    {
        sqlite3_stmt * stmt = compiled_statement.get();
        const int status = sqlite3_step(stmt);

        /// Busy status: simply try the step again.
        if (status == SQLITE_BUSY)
            continue;

        if (status == SQLITE_DONE)
        {
            compiled_statement.reset();
            break;
        }

        if (status != SQLITE_ROW)
            throw Exception(ErrorCodes::SQLITE_ENGINE_ERROR,
                "Expected SQLITE_ROW status, but got status {}. Error: {}, Message: {}",
                status, sqlite3_errstr(status), sqlite3_errmsg(sqlite_db.get()));

        const int column_count = sqlite3_column_count(stmt);
        for (int idx = 0; idx < column_count; ++idx)
        {
            const auto & sample = description.sample_block.getByPosition(idx);
            IColumn & destination = *columns[idx];

            /// SQL NULL: copy row 0 of the sample column instead of reading a value.
            if (sqlite3_column_type(stmt, idx) == SQLITE_NULL)
            {
                insertDefaultSQLiteValue(destination, *sample.column);
                continue;
            }

            const auto & [value_type, is_nullable] = description.types[idx];
            if (!is_nullable)
            {
                insertValue(destination, value_type, idx);
                continue;
            }

            /// Nullable column: write into the nested column and mark the row as not null.
            ColumnNullable & nullable = assert_cast<ColumnNullable &>(destination);
            insertValue(nullable.getNestedColumn(), value_type, idx);
            nullable.getNullMapData().emplace_back(0);
        }

        ++rows_read;
        if (rows_read == max_block_size)
            break;
    }

    return description.sample_block.cloneWithColumns(std::move(columns));
}
/// Releases the prepared statement if one is still held.
void SQLiteBlockInputStream::readSuffix()
{
    /// reset() on an empty unique_ptr does nothing, so no explicit check is needed.
    compiled_statement.reset();
}
/// Reads the value at result column `idx` of the current row and appends it to `column`.
/// Numeric value types are read with the matching sqlite3_column_* getter;
/// every other value type is read as text and appended to a ColumnString.
void SQLiteBlockInputStream::insertValue(IColumn & column, const ExternalResultDescription::ValueType type, size_t idx)
{
    sqlite3_stmt * stmt = compiled_statement.get();

    switch (type)
    {
        case ValueType::vtUInt8:
            assert_cast<ColumnUInt8 &>(column).insertValue(sqlite3_column_int(stmt, idx));
            return;
        case ValueType::vtUInt16:
            assert_cast<ColumnUInt16 &>(column).insertValue(sqlite3_column_int(stmt, idx));
            return;
        case ValueType::vtUInt32:
            assert_cast<ColumnUInt32 &>(column).insertValue(sqlite3_column_int64(stmt, idx));
            return;
        case ValueType::vtUInt64:
            /// There is no uint64 in sqlite3, only int and int64
            assert_cast<ColumnUInt64 &>(column).insertValue(sqlite3_column_int64(stmt, idx));
            return;
        case ValueType::vtInt8:
            assert_cast<ColumnInt8 &>(column).insertValue(sqlite3_column_int(stmt, idx));
            return;
        case ValueType::vtInt16:
            assert_cast<ColumnInt16 &>(column).insertValue(sqlite3_column_int(stmt, idx));
            return;
        case ValueType::vtInt32:
            assert_cast<ColumnInt32 &>(column).insertValue(sqlite3_column_int(stmt, idx));
            return;
        case ValueType::vtInt64:
            assert_cast<ColumnInt64 &>(column).insertValue(sqlite3_column_int64(stmt, idx));
            return;
        case ValueType::vtFloat32:
            assert_cast<ColumnFloat32 &>(column).insertValue(sqlite3_column_double(stmt, idx));
            return;
        case ValueType::vtFloat64:
            assert_cast<ColumnFloat64 &>(column).insertValue(sqlite3_column_double(stmt, idx));
            return;
        default:
        {
            /// The text pointer is fetched first, the byte length second.
            const char * text = reinterpret_cast<const char *>(sqlite3_column_text(stmt, idx));
            int length = sqlite3_column_bytes(stmt, idx);
            assert_cast<ColumnString &>(column).insertData(text, length);
            return;
        }
    }
}
}
#endif

View File

@ -0,0 +1,62 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_SQLITE
#include <Core/ExternalResultDescription.h>
#include <DataStreams/IBlockInputStream.h>
#include <sqlite3.h> // Y_IGNORE
namespace DB
{
class SQLiteBlockInputStream : public IBlockInputStream
{
using SQLitePtr = std::shared_ptr<sqlite3>;
public:
SQLiteBlockInputStream(SQLitePtr sqlite_db_,
const String & query_str_,
const Block & sample_block,
UInt64 max_block_size_);
String getName() const override { return "SQLite"; }
Block getHeader() const override { return description.sample_block.cloneEmpty(); }
private:
void insertDefaultSQLiteValue(IColumn & column, const IColumn & sample_column)
{
column.insertFrom(sample_column, 0);
}
using ValueType = ExternalResultDescription::ValueType;
struct StatementDeleter
{
void operator()(sqlite3_stmt * stmt) { sqlite3_finalize(stmt); }
};
void readPrefix() override;
Block readImpl() override;
void readSuffix() override;
void insertValue(IColumn & column, const ExternalResultDescription::ValueType type, size_t idx);
String query_str;
UInt64 max_block_size;
ExternalResultDescription description;
SQLitePtr sqlite_db;
std::unique_ptr<sqlite3_stmt, StatementDeleter> compiled_statement;
};
}
#endif

View File

@ -41,6 +41,7 @@ SRCS(
RemoteBlockOutputStream.cpp
RemoteQueryExecutor.cpp
RemoteQueryExecutorReadContext.cpp
SQLiteBlockInputStream.cpp
SizeLimits.cpp
SquashingBlockInputStream.cpp
SquashingBlockOutputStream.cpp

View File

@ -0,0 +1,23 @@
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/Serializations/SerializationDate32.h>
namespace DB
{
/// Types are equal when the dynamic type of `rhs` is the same as the dynamic type of this object.
bool DataTypeDate32::equals(const IDataType & rhs) const
{
    const bool same_dynamic_type = typeid(*this) == typeid(rhs);
    return same_dynamic_type;
}
/// The default serialization of Date32 is a fresh SerializationDate32.
SerializationPtr DataTypeDate32::doGetDefaultSerialization() const
{
    SerializationPtr serialization = std::make_shared<SerializationDate32>();
    return serialization;
}
/// Registers the "Date32" type name (case-insensitive) in the data type factory.
void registerDataTypeDate32(DataTypeFactory & factory)
{
    auto create_date32 = []() -> DataTypePtr { return std::make_shared<DataTypeDate32>(); };
    factory.registerSimpleDataType("Date32", create_date32, DataTypeFactory::CaseInsensitive);
}
}

View File

@ -0,0 +1,23 @@
#pragma once
#include <DataTypes/DataTypeNumberBase.h>
namespace DB
{
/// The Date32 data type. Built on DataTypeNumberBase<Int32>, so values are held as Int32.
class DataTypeDate32 final : public DataTypeNumberBase<Int32>
{
public:
    static constexpr auto family_name = "Date32";

    const char * getFamilyName() const override
    {
        return family_name;
    }

    TypeIndex getTypeId() const override
    {
        return TypeIndex::Date32;
    }

    bool canBeInsideNullable() const override
    {
        return true;
    }

    bool canBeUsedAsVersion() const override
    {
        return true;
    }

    /// Defined in the .cpp file.
    bool equals(const IDataType & rhs) const override;

protected:
    SerializationPtr doGetDefaultSerialization() const override;
};
}

View File

@ -194,6 +194,7 @@ DataTypeFactory::DataTypeFactory()
registerDataTypeNumbers(*this);
registerDataTypeDecimal(*this);
registerDataTypeDate(*this);
registerDataTypeDate32(*this);
registerDataTypeDateTime(*this);
registerDataTypeString(*this);
registerDataTypeFixedString(*this);

View File

@ -69,6 +69,7 @@ private:
void registerDataTypeNumbers(DataTypeFactory & factory);
void registerDataTypeDecimal(DataTypeFactory & factory);
void registerDataTypeDate(DataTypeFactory & factory);
void registerDataTypeDate32(DataTypeFactory & factory);
void registerDataTypeDateTime(DataTypeFactory & factory);
void registerDataTypeString(DataTypeFactory & factory);
void registerDataTypeFixedString(DataTypeFactory & factory);

View File

@ -78,6 +78,8 @@ MutableColumnUniquePtr DataTypeLowCardinality::createColumnUniqueImpl(const IDat
return creator(static_cast<ColumnFixedString *>(nullptr));
else if (which.isDate())
return creator(static_cast<ColumnVector<UInt16> *>(nullptr));
else if (which.isDate32())
return creator(static_cast<ColumnVector<Int32> *>(nullptr));
else if (which.isDateTime())
return creator(static_cast<ColumnVector<UInt32> *>(nullptr));
else if (which.isUUID())

View File

@ -42,11 +42,23 @@ public:
return it;
}
/// Returns the name registered for `value`, as found by findByValue().
/// Throws an exception if value is not valid.
const StringRef & getNameForValue(const T & value) const
{
return findByValue(value)->second;
}
/// Non-throwing lookup: stores the name of `value` in `result` and returns true.
/// Returns false, leaving `result` untouched, if value is not valid.
bool getNameForValue(const T & value, StringRef & result) const
{
    if (const auto found = value_to_name_map.find(value); found != value_to_name_map.end())
    {
        result = found->second;
        return true;
    }

    return false;
}
T getValue(StringRef field_name, bool try_treat_as_id = false) const;
template <typename TValues>

View File

@ -322,8 +322,10 @@ struct WhichDataType
constexpr bool isEnum() const { return isEnum8() || isEnum16(); }
constexpr bool isDate() const { return idx == TypeIndex::Date; }
constexpr bool isDate32() const { return idx == TypeIndex::Date32; }
constexpr bool isDateTime() const { return idx == TypeIndex::DateTime; }
constexpr bool isDateTime64() const { return idx == TypeIndex::DateTime64; }
constexpr bool isDateOrDate32() const { return isDate() || isDate32(); }
constexpr bool isString() const { return idx == TypeIndex::String; }
constexpr bool isFixedString() const { return idx == TypeIndex::FixedString; }
@ -347,6 +349,10 @@ struct WhichDataType
template <typename T>
inline bool isDate(const T & data_type) { return WhichDataType(data_type).isDate(); }
template <typename T>
inline bool isDate32(const T & data_type) { return WhichDataType(data_type).isDate32(); }
template <typename T>
inline bool isDateOrDate32(const T & data_type) { return WhichDataType(data_type).isDateOrDate32(); }
template <typename T>
inline bool isDateTime(const T & data_type) { return WhichDataType(data_type).isDateTime(); }
template <typename T>
inline bool isDateTime64(const T & data_type) { return WhichDataType(data_type).isDateTime64(); }

View File

@ -29,7 +29,7 @@ namespace ErrorCodes
static inline bool typeIsSigned(const IDataType & type)
{
WhichDataType data_type(type);
return data_type.isNativeInt() || data_type.isFloat();
return data_type.isNativeInt() || data_type.isFloat() || data_type.isEnum();
}
static inline llvm::Type * toNativeType(llvm::IRBuilderBase & builder, const IDataType & type)
@ -57,6 +57,10 @@ static inline llvm::Type * toNativeType(llvm::IRBuilderBase & builder, const IDa
return builder.getFloatTy();
else if (data_type.isFloat64())
return builder.getDoubleTy();
else if (data_type.isEnum8())
return builder.getInt8Ty();
else if (data_type.isEnum16())
return builder.getInt16Ty();
return nullptr;
}
@ -109,7 +113,7 @@ static inline bool canBeNativeType(const IDataType & type)
return canBeNativeType(*data_type_nullable.getNestedType());
}
return data_type.isNativeInt() || data_type.isNativeUInt() || data_type.isFloat() || data_type.isDate();
return data_type.isNativeInt() || data_type.isNativeUInt() || data_type.isFloat() || data_type.isDate() || data_type.isEnum();
}
static inline llvm::Type * toNativeType(llvm::IRBuilderBase & builder, const DataTypePtr & type)
@ -266,7 +270,7 @@ static inline llvm::Constant * getColumnNativeValue(llvm::IRBuilderBase & builde
{
return llvm::ConstantInt::get(type, column.getUInt(index));
}
else if (column_data_type.isNativeInt())
else if (column_data_type.isNativeInt() || column_data_type.isEnum())
{
return llvm::ConstantInt::get(type, column.getInt(index));
}

View File

@ -0,0 +1,78 @@
#include <DataTypes/Serializations/SerializationDate32.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Columns/ColumnsNumber.h>
#include <Common/assert_cast.h>
namespace DB
{
void SerializationDate32::serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const
{
writeDateText(ExtendedDayNum(assert_cast<const ColumnInt32 &>(column).getData()[row_num]), ostr);
}
/// Parses a whole-text value; handled exactly like the escaped text form.
void SerializationDate32::deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
deserializeTextEscaped(column, istr, settings);
}
/// Reads one date from `istr` and appends it to `column` (a ColumnInt32).
/// The value is appended only after parsing has finished.
void SerializationDate32::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
{
    ExtendedDayNum parsed_day;
    readDateText(parsed_day, istr);

    auto & days = assert_cast<ColumnInt32 &>(column).getData();
    days.push_back(parsed_day);
}
/// Escaped text output is the same as the plain text form.
void SerializationDate32::serializeTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
serializeText(column, row_num, ostr, settings);
}
/// Writes the value in its plain text form wrapped in single quotes.
void SerializationDate32::serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
    constexpr char quote = '\'';

    writeChar(quote, ostr);
    serializeText(column, row_num, ostr, settings);
    writeChar(quote, ostr);
}
/// Reads a date enclosed in single quotes and appends it to `column` (a ColumnInt32).
void SerializationDate32::deserializeTextQuoted(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
{
    constexpr char quote = '\'';

    ExtendedDayNum parsed_day;
    assertChar(quote, istr);
    readDateText(parsed_day, istr);
    assertChar(quote, istr);

    /// Append only once the whole quoted value has been consumed - for exception safety.
    auto & days = assert_cast<ColumnInt32 &>(column).getData();
    days.push_back(parsed_day);
}
/// Writes the value in its plain text form wrapped in double quotes.
void SerializationDate32::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
    constexpr char quote = '"';

    writeChar(quote, ostr);
    serializeText(column, row_num, ostr, settings);
    writeChar(quote, ostr);
}
/// Reads a date enclosed in double quotes and appends it to `column` (a ColumnInt32).
void SerializationDate32::deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
{
    constexpr char quote = '"';

    ExtendedDayNum parsed_day;
    assertChar(quote, istr);
    readDateText(parsed_day, istr);
    assertChar(quote, istr);

    /// Append last, after the closing quote has been verified.
    auto & days = assert_cast<ColumnInt32 &>(column).getData();
    days.push_back(parsed_day);
}
/// Writes the value for CSV output: the plain text form wrapped in double quotes.
void SerializationDate32::serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
    constexpr char quote = '"';

    writeChar(quote, ostr);
    serializeText(column, row_num, ostr, settings);
    writeChar(quote, ostr);
}
/// Reads a CSV date field as a LocalDate and appends its extended day number
/// to `column` (a ColumnInt32).
void SerializationDate32::deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
{
    LocalDate parsed_date;
    readCSV(parsed_date, istr);

    auto & days = assert_cast<ColumnInt32 &>(column).getData();
    days.push_back(parsed_date.getExtenedDayNum());
}
}

View File

@ -0,0 +1,21 @@
#pragma once
#include <DataTypes/Serializations/SerializationNumber.h>
namespace DB
{
/// Text serialization for Date32 values.
/// Derives from SerializationNumber<Int32> and overrides only the text-based
/// formats (plain, escaped, quoted, JSON, CSV); the implementations operate on ColumnInt32.
class SerializationDate32 final : public SerializationNumber<Int32>
{
public:
/// Plain text form of the value at `row_num`.
void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
/// Parsed the same way as the escaped text form.
void deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
/// Same output as serializeText.
void serializeTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
/// Reads a date and appends it to the column.
void deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
/// Plain text form wrapped in single quotes.
void serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
/// Expects a single-quoted date.
void deserializeTextQuoted(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
/// Plain text form wrapped in double quotes.
void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
/// Expects a double-quoted date.
void deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
/// Plain text form wrapped in double quotes.
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
/// Reads a CSV date field and appends it to the column.
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;
};
}

View File

@ -16,6 +16,7 @@ SRCS(
DataTypeCustomIPv4AndIPv6.cpp
DataTypeCustomSimpleAggregateFunction.cpp
DataTypeDate.cpp
DataTypeDate32.cpp
DataTypeDateTime.cpp
DataTypeDateTime64.cpp
DataTypeDecimalBase.cpp
@ -45,6 +46,7 @@ SRCS(
Serializations/SerializationArray.cpp
Serializations/SerializationCustomSimpleText.cpp
Serializations/SerializationDate.cpp
Serializations/SerializationDate32.cpp
Serializations/SerializationDateTime.cpp
Serializations/SerializationDateTime64.cpp
Serializations/SerializationDecimal.cpp

View File

@ -1,17 +1,17 @@
#include <Databases/DatabaseFactory.h>
#include <Databases/DatabaseAtomic.h>
#include <Databases/DatabaseReplicated.h>
#include <Databases/DatabaseDictionary.h>
#include <Databases/DatabaseLazy.h>
#include <Databases/DatabaseMemory.h>
#include <Databases/DatabaseOrdinary.h>
#include <Databases/DatabaseReplicated.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/formatAST.h>
#include <Interpreters/Context.h>
#include <Common/Macros.h>
#include <filesystem>
@ -40,6 +40,10 @@
#include <Storages/PostgreSQL/MaterializedPostgreSQLSettings.h>
#endif
#if USE_SQLITE
#include <Databases/SQLite/DatabaseSQLite.h>
#endif
namespace fs = std::filesystem;
namespace DB
@ -100,7 +104,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
const UUID & uuid = create.uuid;
bool engine_may_have_arguments = engine_name == "MySQL" || engine_name == "MaterializeMySQL" || engine_name == "Lazy" ||
engine_name == "Replicated" || engine_name == "PostgreSQL" || engine_name == "MaterializedPostgreSQL";
engine_name == "Replicated" || engine_name == "PostgreSQL" || engine_name == "MaterializedPostgreSQL" || engine_name == "SQLite";
if (engine_define->engine->arguments && !engine_may_have_arguments)
throw Exception("Database engine " + engine_name + " cannot have arguments", ErrorCodes::BAD_ARGUMENTS);
@ -299,6 +303,22 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
}
#endif
#if USE_SQLITE
else if (engine_name == "SQLite")
{
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments || engine->arguments->children.size() != 1)
throw Exception("SQLite database requires 1 argument: database path", ErrorCodes::BAD_ARGUMENTS);
const auto & arguments = engine->arguments->children;
String database_path = safeGetLiteralValue<String>(arguments[0], "SQLite");
return std::make_shared<DatabaseSQLite>(context, engine_define, database_path);
}
#endif
throw Exception("Unknown database engine: " + engine_name, ErrorCodes::UNKNOWN_DATABASE_ENGINE);

View File

@ -0,0 +1,224 @@
#include "DatabaseSQLite.h"
#if USE_SQLITE
#include <common/logger_useful.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
#include <Databases/SQLite/fetchSQLiteTableStructure.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTColumnDeclaration.h>
#include <Interpreters/Context.h>
#include <Storages/StorageSQLite.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SQLITE_ENGINE_ERROR;
extern const int UNKNOWN_TABLE;
}
/// Opens the SQLite database file at `database_path_` and keeps the connection
/// for the lifetime of this object (shared with the storages created from it).
///
/// @param context_                 any context; only its global context is retained
/// @param database_engine_define_  ENGINE definition, cloned for later CREATE queries
/// @param database_path_           path passed to sqlite3_open()
/// @throws Exception(SQLITE_ENGINE_ERROR) if the database cannot be opened
DatabaseSQLite::DatabaseSQLite(
    ContextPtr context_,
    const ASTStorage * database_engine_define_,
    const String & database_path_)
    : IDatabase("SQLite")
    , WithContext(context_->getGlobalContext())
    , database_engine_define(database_engine_define_->clone())
    , log(&Poco::Logger::get("DatabaseSQLite"))
{
    sqlite3 * tmp_sqlite_db = nullptr;
    int status = sqlite3_open(database_path_.c_str(), &tmp_sqlite_db);

    /// sqlite3_open() normally returns a connection handle even when it fails, and that
    /// handle must still be released with sqlite3_close(). Take ownership before checking
    /// the status so the handle is closed when we throw below (sqlite3_close(nullptr) is a no-op).
    sqlite_db = std::shared_ptr<sqlite3>(tmp_sqlite_db, sqlite3_close);

    if (status != SQLITE_OK)
    {
        throw Exception(ErrorCodes::SQLITE_ENGINE_ERROR,
                        "Cannot access sqlite database. Error status: {}. Message: {}",
                        status, sqlite3_errstr(status));
    }
}
/// Returns true when the SQLite database currently lists no tables.
/// The table list is queried on every call while holding the mutex.
bool DatabaseSQLite::empty() const
{
    std::lock_guard<std::mutex> lock(mutex);

    const auto table_names = fetchTablesList();
    return table_names.empty();
}
/// Builds a snapshot iterator over all tables currently listed in the SQLite database.
/// A storage is fetched for every listed table; the name filter argument is not used.
DatabaseTablesIteratorPtr DatabaseSQLite::getTablesIterator(ContextPtr local_context, const IDatabase::FilterByNameFunction &)
{
    std::lock_guard<std::mutex> lock(mutex);

    Tables fetched_tables;
    /// The names come straight from the table list, so the existence check is skipped.
    for (const auto & name : fetchTablesList())
        fetched_tables[name] = fetchTable(name, local_context, true);

    return std::make_unique<DatabaseTablesSnapshotIterator>(fetched_tables, database_name);
}
std::unordered_set<std::string> DatabaseSQLite::fetchTablesList() const
{
std::unordered_set<String> tables;
std::string query = "SELECT name FROM sqlite_master "
"WHERE type = 'table' AND name NOT LIKE 'sqlite_%'";
auto callback_get_data = [](void * res, int col_num, char ** data_by_col, char ** /* col_names */) -> int
{
for (int i = 0; i < col_num; ++i)
static_cast<std::unordered_set<std::string> *>(res)->insert(data_by_col[i]);
return 0;
};
char * err_message = nullptr;
int status = sqlite3_exec(sqlite_db.get(), query.c_str(), callback_get_data, &tables, &err_message);
if (status != SQLITE_OK)
{
String err_msg(err_message);
sqlite3_free(err_message);
throw Exception(ErrorCodes::SQLITE_ENGINE_ERROR,
"Cannot fetch sqlite database tables. Error status: {}. Message: {}",
status, err_msg);
}
return tables;
}
/// Checks whether sqlite_master lists a table named `table_name`.
///
/// @return true if at least one matching row exists.
/// @throws Exception(SQLITE_ENGINE_ERROR) if the query fails.
bool DatabaseSQLite::checkSQLiteTable(const String & table_name) const
{
    /// The placeholder must be positional: a named field such as `{table_name}` cannot be
    /// resolved from a positional argument and makes fmt reject the format string.
    /// NOTE(review): `table_name` is interpolated into the SQL text without escaping;
    /// a name containing a single quote will break the statement.
    const String query = fmt::format("SELECT name FROM sqlite_master WHERE type='table' AND name='{}';", table_name);

    /// Invoked once per result row; counts the rows into the int behind `res`.
    auto callback_get_data = [](void * res, int, char **, char **) -> int
    {
        *(static_cast<int *>(res)) += 1;
        return 0;
    };

    int count = 0;
    char * err_message = nullptr;
    int status = sqlite3_exec(sqlite_db.get(), query.c_str(), callback_get_data, &count, &err_message);

    if (status != SQLITE_OK)
    {
        /// The message pointer may be left null (e.g. if SQLite could not allocate it);
        /// fall back to the generic description of the status code in that case.
        String err_msg(err_message ? err_message : sqlite3_errstr(status));
        sqlite3_free(err_message);
        throw Exception(ErrorCodes::SQLITE_ENGINE_ERROR,
                        "Cannot check sqlite table. Error status: {}. Message: {}",
                        status, err_msg);
    }

    return (count != 0);
}
/// Returns whether the SQLite database contains a table named `table_name`.
/// The context argument is not used.
bool DatabaseSQLite::isTableExist(const String & table_name, ContextPtr) const
{
    std::lock_guard<std::mutex> lock(mutex);

    const bool table_found = checkSQLiteTable(table_name);
    return table_found;
}
/// Returns a storage for `table_name`, or an empty pointer when fetchTable() finds no such table.
StoragePtr DatabaseSQLite::tryGetTable(const String & table_name, ContextPtr local_context) const
{
    std::lock_guard<std::mutex> lock(mutex);

    /// The table has not been verified yet, so let fetchTable() check for it.
    constexpr bool table_checked = false;
    return fetchTable(table_name, local_context, table_checked);
}
/// Creates a StorageSQLite for `table_name` using the column structure read from SQLite.
///
/// @param table_checked  pass true when the caller already knows the table exists;
///                       otherwise existence is verified first.
/// @return the storage, or an empty pointer if the table does not exist or no columns
///         could be fetched for it.
StoragePtr DatabaseSQLite::fetchTable(const String & table_name, ContextPtr local_context, bool table_checked) const
{
    const bool table_known = table_checked || checkSQLiteTable(table_name);
    if (!table_known)
        return StoragePtr{};

    auto structure = fetchSQLiteTableStructure(sqlite_db.get(), table_name);
    if (!structure)
        return StoragePtr{};

    return StorageSQLite::create(
        StorageID(database_name, table_name),
        sqlite_db,
        table_name,
        ColumnsDescription{*structure},
        ConstraintsDescription{},
        local_context);
}
/// Builds a CREATE query AST for this database from its name and stored engine definition.
ASTPtr DatabaseSQLite::getCreateDatabaseQuery() const
{
    auto query = std::make_shared<ASTCreateQuery>();

    query->database = getDatabaseName();
    query->set(query->storage, database_engine_define);

    return query;
}
/// Builds a CREATE TABLE query AST for a table of the SQLite database.
///
/// The storage definition is a clone of the database engine definition with the table
/// name inserted as the second engine argument; columns come from the fetched storage.
///
/// @param throw_on_error  when the table cannot be fetched: throw UNKNOWN_TABLE if true,
///                        return nullptr if false.
ASTPtr DatabaseSQLite::getCreateTableQueryImpl(const String & table_name, ContextPtr local_context, bool throw_on_error) const
{
    auto storage = fetchTable(table_name, local_context, false);
    if (!storage)
    {
        if (throw_on_error)
            throw Exception(ErrorCodes::UNKNOWN_TABLE, "SQLite table {}.{} does not exist",
                            database_name, table_name);
        return nullptr;
    }

    auto create_table_query = std::make_shared<ASTCreateQuery>();
    auto table_storage_define = database_engine_define->clone();
    create_table_query->set(create_table_query->storage, table_storage_define);

    auto columns_declare_list = std::make_shared<ASTColumns>();
    auto columns_expression_list = std::make_shared<ASTExpressionList>();

    columns_declare_list->set(columns_declare_list->columns, columns_expression_list);
    create_table_query->set(create_table_query->columns_list, columns_declare_list);

    /// init create query.
    auto table_id = storage->getStorageID();
    create_table_query->table = table_id.table_name;
    create_table_query->database = table_id.database_name;

    /// One column declaration per ordinary column of the fetched storage.
    auto metadata_snapshot = storage->getInMemoryMetadataPtr();
    for (const auto & column_type_and_name : metadata_snapshot->getColumns().getOrdinary())
    {
        const auto & column_declaration = std::make_shared<ASTColumnDeclaration>();
        column_declaration->name = column_type_and_name.name;
        column_declaration->type = getColumnDeclaration(column_type_and_name.type);
        columns_expression_list->children.emplace_back(column_declaration);
    }

    ASTStorage * ast_storage = table_storage_define->as<ASTStorage>();
    auto storage_engine_arguments = ast_storage->engine->arguments;

    /// Add table_name to engine arguments (right after the first argument)
    storage_engine_arguments->children.insert(storage_engine_arguments->children.begin() + 1, std::make_shared<ASTLiteral>(table_id.table_name));

    return create_table_query;
}
/// Converts a data type into the AST used in a column declaration.
/// Nullable types become `Nullable(<nested>)` recursively; any other type becomes an
/// identifier holding the type name.
ASTPtr DatabaseSQLite::getColumnDeclaration(const DataTypePtr & data_type) const
{
    if (!WhichDataType(data_type).isNullable())
        return std::make_shared<ASTIdentifier>(data_type->getName());

    const auto * nullable_type = typeid_cast<const DataTypeNullable *>(data_type.get());
    return makeASTFunction("Nullable", getColumnDeclaration(nullable_type->getNestedType()));
}
}
#endif

View File

@ -0,0 +1,65 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_SQLITE
#include <Core/Names.h>
#include <Databases/DatabasesCommon.h>
#include <Parsers/ASTCreateQuery.h>
#include <sqlite3.h> // Y_IGNORE
namespace DB
{
/// Database engine "SQLite": exposes the tables of a SQLite database file.
/// Table lists and table structures are queried from SQLite on demand.
class DatabaseSQLite final : public IDatabase, protected WithContext
{
public:
/// Shared handle to the open SQLite connection.
using SQLitePtr = std::shared_ptr<sqlite3>;
/// Opens the database at `database_path_`; the engine definition is kept for CREATE queries.
DatabaseSQLite(ContextPtr context_, const ASTStorage * database_engine_define_, const String & database_path_);
String getEngineName() const override { return "SQLite"; }
bool canContainMergeTreeTables() const override { return false; }
bool canContainDistributedTables() const override { return false; }
bool shouldBeEmptyOnDetach() const override { return false; }
/// True if SQLite lists a table with this name.
bool isTableExist(const String & name, ContextPtr context) const override;
/// Storage for the table, or an empty pointer if it cannot be fetched.
StoragePtr tryGetTable(const String & name, ContextPtr context) const override;
/// Snapshot iterator over all tables currently listed by SQLite.
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) override;
/// True if SQLite lists no tables.
bool empty() const override;
ASTPtr getCreateDatabaseQuery() const override;
/// Nothing to do on shutdown.
void shutdown() override {}
protected:
ASTPtr getCreateTableQueryImpl(const String & table_name, ContextPtr context, bool throw_on_error) const override;
private:
/// Engine definition cloned from the CREATE DATABASE query.
ASTPtr database_engine_define;
/// Open SQLite connection; closed via sqlite3_close when the last owner releases it.
SQLitePtr sqlite_db;
Poco::Logger * log;
/// Queries sqlite_master for a table with the given name.
bool checkSQLiteTable(const String & table_name) const;
/// Names of all tables listed by SQLite, excluding names matching 'sqlite_%'.
NameSet fetchTablesList() const;
/// Builds a storage for the table; `table_checked` skips the existence check.
StoragePtr fetchTable(const String & table_name, ContextPtr context, bool table_checked) const;
/// AST for a column type, wrapping nullable types in Nullable(...).
ASTPtr getColumnDeclaration(const DataTypePtr & data_type) const;
};
}
#endif

View File

@ -0,0 +1,104 @@
#include <Databases/SQLite/fetchSQLiteTableStructure.h>
#if USE_SQLITE
#include <Common/quoteString.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
#include <Poco/String.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SQLITE_ENGINE_ERROR;
}
/// Maps a SQLite declared column type (compared case-insensitively) to a data type.
/// Integer and floating-point names map to the corresponding numeric types;
/// every other declared type is treated as String.
static DataTypePtr convertSQLiteDataType(String type)
{
    type = Poco::toLower(type);

    if (type == "tinyint")
        return std::make_shared<DataTypeInt8>();

    if (type == "smallint")
        return std::make_shared<DataTypeInt16>();

    if (type.starts_with("int") || type == "mediumint")
        return std::make_shared<DataTypeInt32>();

    if (type == "bigint")
        return std::make_shared<DataTypeInt64>();

    if (type == "float")
        return std::make_shared<DataTypeFloat32>();

    if (type.starts_with("double") || type == "real")
        return std::make_shared<DataTypeFloat64>();

    /// No decimal when fetching data through API
    return std::make_shared<DataTypeString>();
}
/// Reads the column list of `sqlite_table_name` through `pragma table_info`.
///
/// Each result row becomes one column: its "name" and "type" fields give the column
/// name and converted type, and the type is wrapped in Nullable when "notnull" starts with '0'.
///
/// @return the columns, or nullptr when the pragma returned no rows.
/// @throws Exception(SQLITE_ENGINE_ERROR) if the query fails.
std::shared_ptr<NamesAndTypesList> fetchSQLiteTableStructure(sqlite3 * connection, const String & sqlite_table_name)
{
    NamesAndTypesList columns;
    const auto query = fmt::format("pragma table_info({});", quoteString(sqlite_table_name));

    /// Invoked by sqlite3_exec() once per result row; `res` points at `columns`.
    auto append_column = [](void * res, int col_num, char ** data_by_col, char ** col_names) -> int
    {
        NameAndTypePair column;
        bool nullable = false;

        for (int i = 0; i < col_num; ++i)
        {
            const char * field = col_names[i];

            if (strcmp(field, "name") == 0)
                column.name = data_by_col[i];
            else if (strcmp(field, "type") == 0)
                column.type = convertSQLiteDataType(data_by_col[i]);
            else if (strcmp(field, "notnull") == 0)
                nullable = (data_by_col[i][0] == '0');
        }

        if (nullable)
            column.type = std::make_shared<DataTypeNullable>(column.type);

        static_cast<NamesAndTypesList *>(res)->push_back(column);
        return 0;
    };

    char * err_message = nullptr;
    const int status = sqlite3_exec(connection, query.c_str(), append_column, &columns, &err_message);

    if (status != SQLITE_OK)
    {
        /// Copy the message before releasing the buffer allocated by SQLite.
        String err_msg(err_message);
        sqlite3_free(err_message);

        throw Exception(ErrorCodes::SQLITE_ENGINE_ERROR,
                        "Failed to fetch SQLite data. Status: {}. Message: {}",
                        status, err_msg);
    }

    if (columns.empty())
        return nullptr;

    return std::make_shared<NamesAndTypesList>(columns);
}
}
#endif

View File

@ -0,0 +1,19 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_SQLITE
#include <Storages/StorageSQLite.h>
#include <sqlite3.h> // Y_IGNORE
namespace DB
{
/// Reads the columns (names and types) of a SQLite table over an open connection.
/// Returns nullptr when no columns are reported for the table; throws if the query fails.
std::shared_ptr<NamesAndTypesList> fetchSQLiteTableStructure(sqlite3 * connection,
const String & sqlite_table_name);
}
#endif

View File

@ -27,6 +27,8 @@ SRCS(
MySQL/MaterializeMetadata.cpp
MySQL/MaterializeMySQLSettings.cpp
MySQL/MaterializeMySQLSyncThread.cpp
SQLite/DatabaseSQLite.cpp
SQLite/fetchSQLiteTableStructure.cpp
)

View File

@ -26,8 +26,10 @@
namespace ProfileEvents
{
extern const Event FileOpen;
extern const Event WriteBufferAIOWrite;
extern const Event WriteBufferAIOWriteBytes;
extern const Event AIOWrite;
extern const Event AIOWriteBytes;
extern const Event AIORead;
extern const Event AIOReadBytes;
}
namespace DB
@ -531,8 +533,8 @@ public:
auto bytes_written = eventResult(event);
ProfileEvents::increment(ProfileEvents::WriteBufferAIOWrite);
ProfileEvents::increment(ProfileEvents::WriteBufferAIOWriteBytes, bytes_written);
ProfileEvents::increment(ProfileEvents::AIOWrite);
ProfileEvents::increment(ProfileEvents::AIOWriteBytes, bytes_written);
if (bytes_written != static_cast<decltype(bytes_written)>(block_size * buffer_size_in_blocks))
throw Exception(ErrorCodes::AIO_WRITE_ERROR,
@ -600,6 +602,9 @@ public:
buffer_size_in_bytes,
read_bytes);
ProfileEvents::increment(ProfileEvents::AIORead);
ProfileEvents::increment(ProfileEvents::AIOReadBytes, read_bytes);
SSDCacheBlock block(block_size);
for (size_t i = 0; i < blocks_length; ++i)
@ -687,6 +692,9 @@ public:
throw Exception(ErrorCodes::AIO_READ_ERROR,
"GC: AIO failed to read file ({}). Expected bytes ({}). Actual bytes ({})", file_path, block_size, read_bytes);
ProfileEvents::increment(ProfileEvents::AIORead);
ProfileEvents::increment(ProfileEvents::AIOReadBytes, read_bytes);
char * request_buffer = getRequestBuffer(request);
// Unpoison the memory returned from an uninstrumented system function.

View File

@ -90,17 +90,17 @@ DiskCacheWrapper::readFile(
const String & path,
size_t buf_size,
size_t estimated_size,
size_t aio_threshold,
size_t direct_io_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const
{
if (!cache_file_predicate(path))
return DiskDecorator::readFile(path, buf_size, estimated_size, aio_threshold, mmap_threshold, mmap_cache);
return DiskDecorator::readFile(path, buf_size, estimated_size, direct_io_threshold, mmap_threshold, mmap_cache);
LOG_DEBUG(log, "Read file {} from cache", backQuote(path));
if (cache_disk->exists(path))
return cache_disk->readFile(path, buf_size, estimated_size, aio_threshold, mmap_threshold, mmap_cache);
return cache_disk->readFile(path, buf_size, estimated_size, direct_io_threshold, mmap_threshold, mmap_cache);
auto metadata = acquireDownloadMetadata(path);
@ -134,7 +134,7 @@ DiskCacheWrapper::readFile(
auto tmp_path = path + ".tmp";
{
auto src_buffer = DiskDecorator::readFile(path, buf_size, estimated_size, aio_threshold, mmap_threshold, mmap_cache);
auto src_buffer = DiskDecorator::readFile(path, buf_size, estimated_size, direct_io_threshold, mmap_threshold, mmap_cache);
auto dst_buffer = cache_disk->writeFile(tmp_path, buf_size, WriteMode::Rewrite);
copyData(*src_buffer, *dst_buffer);
}
@ -158,9 +158,9 @@ DiskCacheWrapper::readFile(
}
if (metadata->status == DOWNLOADED)
return cache_disk->readFile(path, buf_size, estimated_size, aio_threshold, mmap_threshold, mmap_cache);
return cache_disk->readFile(path, buf_size, estimated_size, direct_io_threshold, mmap_threshold, mmap_cache);
return DiskDecorator::readFile(path, buf_size, estimated_size, aio_threshold, mmap_threshold, mmap_cache);
return DiskDecorator::readFile(path, buf_size, estimated_size, direct_io_threshold, mmap_threshold, mmap_cache);
}
std::unique_ptr<WriteBufferFromFileBase>

View File

@ -38,7 +38,7 @@ public:
const String & path,
size_t buf_size,
size_t estimated_size,
size_t aio_threshold,
size_t direct_io_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;

View File

@ -115,9 +115,9 @@ void DiskDecorator::listFiles(const String & path, std::vector<String> & file_na
std::unique_ptr<ReadBufferFromFileBase>
DiskDecorator::readFile(
const String & path, size_t buf_size, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold, MMappedFileCache * mmap_cache) const
const String & path, size_t buf_size, size_t estimated_size, size_t direct_io_threshold, size_t mmap_threshold, MMappedFileCache * mmap_cache) const
{
return delegate->readFile(path, buf_size, estimated_size, aio_threshold, mmap_threshold, mmap_cache);
return delegate->readFile(path, buf_size, estimated_size, direct_io_threshold, mmap_threshold, mmap_cache);
}
std::unique_ptr<WriteBufferFromFileBase>
@ -206,9 +206,9 @@ void DiskDecorator::startup()
delegate->startup();
}
void DiskDecorator::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context)
void DiskDecorator::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map)
{
delegate->applyNewSettings(config, context);
delegate->applyNewSettings(config, context, config_prefix, map);
}
}

View File

@ -39,7 +39,7 @@ public:
const String & path,
size_t buf_size,
size_t estimated_size,
size_t aio_threshold,
size_t direct_io_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
@ -65,11 +65,12 @@ public:
String getUniqueId(const String & path) const override { return delegate->getUniqueId(path); }
bool checkUniqueId(const String & id) const override { return delegate->checkUniqueId(id); }
DiskType::Type getType() const override { return delegate->getType(); }
bool supportZeroCopyReplication() const override { return delegate->supportZeroCopyReplication(); }
void onFreeze(const String & path) override;
SyncGuardPtr getDirectorySyncGuard(const String & path) const override;
void shutdown() override;
void startup() override;
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context) override;
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map) override;
protected:
Executor & getExecutor() override;

201
src/Disks/DiskEncrypted.cpp Normal file
View File

@ -0,0 +1,201 @@
#include <Disks/DiskEncrypted.h>
#if USE_SSL
#include <Disks/DiskFactory.h>
#include <IO/FileEncryptionCommon.h>
#include <IO/ReadBufferFromEncryptedFile.h>
#include <IO/WriteBufferFromEncryptedFile.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_DISK_INDEX;
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int LOGICAL_ERROR;
}
using DiskEncryptedPtr = std::shared_ptr<DiskEncrypted>;
using namespace FileEncryption;
/// Reservation on an encrypted disk.
/// Size bookkeeping is delegated to the wrapped reservation, while the disk reported
/// to callers is the encrypted disk itself. Only disk index 0 is valid.
class DiskEncryptedReservation : public IReservation
{
public:
    DiskEncryptedReservation(DiskEncryptedPtr disk_, std::unique_ptr<IReservation> reservation_)
        : disk(std::move(disk_))
        , reservation(std::move(reservation_))
    {
    }

    UInt64 getSize() const override
    {
        return reservation->getSize();
    }

    DiskPtr getDisk(size_t i) const override
    {
        if (i == 0)
            return disk;

        throw Exception("Can't use i != 0 with single disk reservation", ErrorCodes::INCORRECT_DISK_INDEX);
    }

    Disks getDisks() const override
    {
        return {disk};
    }

    void update(UInt64 new_size) override
    {
        reservation->update(new_size);
    }

private:
    /// The encrypted disk this reservation is reported for.
    DiskEncryptedPtr disk;
    /// Reservation made on the wrapped disk.
    std::unique_ptr<IReservation> reservation;
};
/// Reserves `bytes` on the wrapped disk and wraps the result so that the reservation
/// reports this encrypted disk. Returns an empty pointer if the wrapped disk cannot reserve.
ReservationPtr DiskEncrypted::reserve(UInt64 bytes)
{
    auto wrapped_reservation = delegate->reserve(bytes);
    if (wrapped_reservation)
    {
        auto self = std::static_pointer_cast<DiskEncrypted>(shared_from_this());
        return std::make_unique<DiskEncryptedReservation>(std::move(self), std::move(wrapped_reservation));
    }

    return {};
}
/// Creates an encrypted disk named `name_` on top of `disk_`.
/// `key_` is the encryption key and `path_` the directory on the wrapped disk that
/// holds the encrypted files; the absolute path is the wrapped disk's path plus `path_`.
DiskEncrypted::DiskEncrypted(const String & name_, DiskPtr disk_, const String & key_, const String & path_)
: DiskDecorator(disk_)
, name(name_), key(key_), disk_path(path_)
, disk_absolute_path(delegate->getPath() + disk_path)
{
/// Validates `disk_path` and creates it on the wrapped disk when non-empty.
initialize();
}
/// Prepares the directory on the wrapped disk that stores the encrypted files.
/// Throws LOGICAL_ERROR if a non-empty disk path does not end with '/'.
void DiskEncrypted::initialize()
{
    /// With an empty path the wrapped disk itself is used as the encrypted store.
    if (!disk_path.empty())
    {
        if (disk_path.back() != '/')
            throw Exception("Disk path must ends with '/', but '" + disk_path + "' doesn't.", ErrorCodes::LOGICAL_ERROR);

        delegate->createDirectories(disk_path);
    }
}
/// Opens `path` on the wrapped disk and returns a buffer that reads it through
/// ReadBufferFromEncryptedFile.
/// For an existing, non-empty file the IV is read from the start of the file and the
/// data offset is kIVSize; otherwise a random IV is used with offset 0.
std::unique_ptr<ReadBufferFromFileBase> DiskEncrypted::readFile(
    const String & path,
    size_t buf_size,
    size_t estimated_size,
    size_t aio_threshold,
    size_t mmap_threshold,
    MMappedFileCache * mmap_cache) const
{
    const auto wrapped_path = wrappedPath(path);
    auto wrapped_buffer = delegate->readFile(wrapped_path, buf_size, estimated_size, aio_threshold, mmap_threshold, mmap_cache);

    String iv;
    size_t data_offset = 0;

    const bool has_payload = exists(path) && getFileSize(path);
    if (!has_payload)
    {
        iv = randomString(kIVSize);
    }
    else
    {
        iv = readIV(kIVSize, *wrapped_buffer);
        data_offset = kIVSize;
    }

    return std::make_unique<ReadBufferFromEncryptedFile>(buf_size, std::move(wrapped_buffer), iv, key, data_offset);
}
/// Opens `path` on the wrapped disk for writing through WriteBufferFromEncryptedFile.
/// When appending to an existing, non-empty file the stored IV is reused and writing
/// starts at the current (decrypted) file size; otherwise a random IV is generated.
std::unique_ptr<WriteBufferFromFileBase> DiskEncrypted::writeFile(const String & path, size_t buf_size, WriteMode mode)
{
    const auto wrapped_path = wrappedPath(path);

    String iv;
    size_t start_offset = 0;

    const bool append_to_existing = mode == WriteMode::Append && exists(path) && getFileSize(path);
    if (!append_to_existing)
    {
        iv = randomString(kIVSize);
    }
    else
    {
        /// Reuse the IV already stored at the beginning of the file.
        auto iv_reader = delegate->readFile(wrapped_path, kIVSize);
        iv = readIV(kIVSize, *iv_reader);
        start_offset = getFileSize(path);
    }

    auto wrapped_buffer = delegate->writeFile(wrapped_path, buf_size, mode);
    return std::make_unique<WriteBufferFromEncryptedFile>(buf_size, std::move(wrapped_buffer), iv, key, start_offset);
}
size_t DiskEncrypted::getFileSize(const String & path) const
{
auto wrapped_path = wrappedPath(path);
size_t size = delegate->getFileSize(wrapped_path);
return size > kIVSize ? (size - kIVSize) : 0;
}
/// Truncates the file so that `size` payload bytes remain.
/// A non-zero size is extended by kIVSize to keep the stored IV; zero truncates to empty.
void DiskEncrypted::truncateFile(const String & path, size_t size)
{
    const auto wrapped_path = wrappedPath(path);
    const size_t stored_size = size ? (size + kIVSize) : 0;

    delegate->truncateFile(wrapped_path, stored_size);
}
/// Returns the wrapped disk's directory sync guard for the wrapped form of `path`.
SyncGuardPtr DiskEncrypted::getDirectorySyncGuard(const String & path) const
{
    return delegate->getDirectorySyncGuard(wrappedPath(path));
}
/// Re-reads this disk's settings (`disk`, `key`, `path` under `config_prefix`) and applies them.
///
/// All values are read and validated first and only then stored, so a rejected
/// configuration leaves the current key, wrapped disk and path untouched.
/// The key must be non-empty and have the length required by the default cipher,
/// the same rule that is applied when the disk is first registered.
///
/// @throws Exception(UNKNOWN_ELEMENT_IN_CONFIG) on a missing wrapped disk name, a missing
///         or wrongly sized key, or a wrapped disk that is not present in `map`.
void DiskEncrypted::applyNewSettings(
    const Poco::Util::AbstractConfiguration & config,
    ContextPtr /*context*/,
    const String & config_prefix,
    const DisksMap & map)
{
    String wrapped_disk_name = config.getString(config_prefix + ".disk", "");
    if (wrapped_disk_name.empty())
        throw Exception("The wrapped disk name can not be empty. An encrypted disk is a wrapper over another disk. "
                        "Disk " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);

    String new_key = config.getString(config_prefix + ".key", "");
    if (new_key.empty())
        throw Exception("Encrypted disk key can not be empty. Disk " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
    if (new_key.size() != cipherKeyLength(defaultCipher()))
        throw Exception("Expected key with size " + std::to_string(cipherKeyLength(defaultCipher())) + ", got key with size " + std::to_string(new_key.size()),
                        ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);

    auto wrapped_disk = map.find(wrapped_disk_name);
    if (wrapped_disk == map.end())
        throw Exception("The wrapped disk must have been announced earlier. No disk with name " + wrapped_disk_name + ". Disk " + name,
                        ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);

    String new_disk_path = config.getString(config_prefix + ".path", "");

    /// Everything validated - commit the new settings.
    key = new_key;
    delegate = wrapped_disk->second;
    disk_path = new_disk_path;

    /// NOTE(review): disk_absolute_path is computed from delegate and disk_path in the
    /// constructor and is not refreshed here - confirm whether it should be.
    initialize();
}
/// Registers the "encrypted" disk type in `factory`.
/// The creator reads `disk`, `key` and `path` under the disk's config prefix, requires
/// the wrapped disk to be present in the disks map already, and checks the key length
/// against the default cipher.
void registerDiskEncrypted(DiskFactory & factory)
{
    auto create_encrypted_disk = [](const String & name,
                                    const Poco::Util::AbstractConfiguration & config,
                                    const String & config_prefix,
                                    ContextPtr /*context*/,
                                    const DisksMap & map) -> DiskPtr
    {
        const String wrapped_disk_name = config.getString(config_prefix + ".disk", "");
        if (wrapped_disk_name.empty())
            throw Exception("The wrapped disk name can not be empty. An encrypted disk is a wrapper over another disk. "
                            "Disk " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);

        const String key = config.getString(config_prefix + ".key", "");
        if (key.empty())
            throw Exception("Encrypted disk key can not be empty. Disk " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);

        if (key.size() != cipherKeyLength(defaultCipher()))
            throw Exception("Expected key with size " + std::to_string(cipherKeyLength(defaultCipher())) + ", got key with size " + std::to_string(key.size()),
                            ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);

        const auto wrapped_disk_it = map.find(wrapped_disk_name);
        if (wrapped_disk_it == map.end())
            throw Exception("The wrapped disk must have been announced earlier. No disk with name " + wrapped_disk_name + ". Disk " + name,
                            ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);

        const String relative_path = config.getString(config_prefix + ".path", "");

        return std::make_shared<DiskEncrypted>(name, wrapped_disk_it->second, key, relative_path);
    };

    factory.registerDiskType("encrypted", create_encrypted_disk);
}
}
#endif

229
src/Disks/DiskEncrypted.h Normal file
View File

@ -0,0 +1,229 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include <Common/config.h>
#endif
#if USE_SSL
#include <Disks/IDisk.h>
#include <Disks/DiskDecorator.h>
namespace DB
{
class ReadBufferFromFileBase;
class WriteBufferFromFileBase;
class DiskEncrypted : public DiskDecorator
{
public:
/// Creates an encrypted view over `disk_`; `key_` is the encryption key and `path_` the
/// directory on the wrapped disk that holds the encrypted files.
DiskEncrypted(const String & name_, DiskPtr disk_, const String & key_, const String & path_);
/// Name of this disk as passed to the constructor.
const String & getName() const override { return name; }
/// Absolute path of the encrypted store (stored in disk_absolute_path).
const String & getPath() const override { return disk_absolute_path; }
/// Reserves space through the wrapped disk.
ReservationPtr reserve(UInt64 bytes) override;
bool exists(const String & path) const override
{
auto wrapped_path = wrappedPath(path);
return delegate->exists(wrapped_path);
}
bool isFile(const String & path) const override
{
auto wrapped_path = wrappedPath(path);
return delegate->isFile(wrapped_path);
}
bool isDirectory(const String & path) const override
{
auto wrapped_path = wrappedPath(path);
return delegate->isDirectory(wrapped_path);
}
/// Size of the file's payload; defined out of line.
size_t getFileSize(const String & path) const override;
void createDirectory(const String & path) override
{
auto wrapped_path = wrappedPath(path);
delegate->createDirectory(wrapped_path);
}
void createDirectories(const String & path) override
{
auto wrapped_path = wrappedPath(path);
delegate->createDirectories(wrapped_path);
}
void clearDirectory(const String & path) override
{
auto wrapped_path = wrappedPath(path);
delegate->clearDirectory(wrapped_path);
}
void moveDirectory(const String & from_path, const String & to_path) override
{
auto wrapped_from_path = wrappedPath(from_path);
auto wrapped_to_path = wrappedPath(to_path);
delegate->moveDirectory(wrapped_from_path, wrapped_to_path);
}
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override
{
auto wrapped_path = wrappedPath(path);
return delegate->iterateDirectory(wrapped_path);
}
void createFile(const String & path) override
{
auto wrapped_path = wrappedPath(path);
delegate->createFile(wrapped_path);
}
void moveFile(const String & from_path, const String & to_path) override
{
auto wrapped_from_path = wrappedPath(from_path);
auto wrapped_to_path = wrappedPath(to_path);
delegate->moveFile(wrapped_from_path, wrapped_to_path);
}
void replaceFile(const String & from_path, const String & to_path) override
{
auto wrapped_from_path = wrappedPath(from_path);
auto wrapped_to_path = wrappedPath(to_path);
delegate->replaceFile(wrapped_from_path, wrapped_to_path);
}
void listFiles(const String & path, std::vector<String> & file_names) override
{
auto wrapped_path = wrappedPath(path);
delegate->listFiles(wrapped_path, file_names);
}
void copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path) override
{
IDisk::copy(from_path, to_disk, to_path);
}
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size,
size_t estimated_size,
size_t aio_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(
const String & path,
size_t buf_size,
WriteMode mode) override;
void removeFile(const String & path) override
{
auto wrapped_path = wrappedPath(path);
delegate->removeFile(wrapped_path);
}
void removeFileIfExists(const String & path) override
{
auto wrapped_path = wrappedPath(path);
delegate->removeFileIfExists(wrapped_path);
}
void removeDirectory(const String & path) override
{
auto wrapped_path = wrappedPath(path);
delegate->removeDirectory(wrapped_path);
}
void removeRecursive(const String & path) override
{
auto wrapped_path = wrappedPath(path);
delegate->removeRecursive(wrapped_path);
}
void removeSharedFile(const String & path, bool flag) override
{
auto wrapped_path = wrappedPath(path);
delegate->removeSharedFile(wrapped_path, flag);
}
void removeSharedRecursive(const String & path, bool flag) override
{
auto wrapped_path = wrappedPath(path);
delegate->removeSharedRecursive(wrapped_path, flag);
}
void removeSharedFileIfExists(const String & path, bool flag) override
{
auto wrapped_path = wrappedPath(path);
delegate->removeSharedFileIfExists(wrapped_path, flag);
}
void setLastModified(const String & path, const Poco::Timestamp & timestamp) override
{
auto wrapped_path = wrappedPath(path);
delegate->setLastModified(wrapped_path, timestamp);
}
Poco::Timestamp getLastModified(const String & path) override
{
auto wrapped_path = wrappedPath(path);
return delegate->getLastModified(wrapped_path);
}
void setReadOnly(const String & path) override
{
auto wrapped_path = wrappedPath(path);
delegate->setReadOnly(wrapped_path);
}
void createHardLink(const String & src_path, const String & dst_path) override
{
auto wrapped_src_path = wrappedPath(src_path);
auto wrapped_dst_path = wrappedPath(dst_path);
delegate->createHardLink(wrapped_src_path, wrapped_dst_path);
}
void truncateFile(const String & path, size_t size) override;
String getUniqueId(const String & path) const override
{
auto wrapped_path = wrappedPath(path);
return delegate->getUniqueId(wrapped_path);
}
void onFreeze(const String & path) override
{
auto wrapped_path = wrappedPath(path);
delegate->onFreeze(wrapped_path);
}
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map) override;
DiskType::Type getType() const override { return DiskType::Type::Encrypted; }
SyncGuardPtr getDirectorySyncGuard(const String & path) const override;
private:
void initialize();
String wrappedPath(const String & path) const
{
// if path starts_with disk_path -> got already wrapped path
if (!disk_path.empty() && path.starts_with(disk_path))
return path;
return disk_path + path;
}
String name;
String key;
String disk_path;
String disk_absolute_path;
};
}
#endif

View File

@ -24,7 +24,8 @@ DiskPtr DiskFactory::create(
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context) const
ContextPtr context,
const DisksMap & map) const
{
const auto disk_type = config.getString(config_prefix + ".type", "local");
@ -33,7 +34,7 @@ DiskPtr DiskFactory::create(
throw Exception{"DiskFactory: the disk '" + name + "' has unknown disk type: " + disk_type, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG};
const auto & disk_creator = found->second;
return disk_creator(name, config, config_prefix, context);
return disk_creator(name, config, config_prefix, context, map);
}
}

View File

@ -8,12 +8,14 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <functional>
#include <map>
#include <unordered_map>
namespace DB
{
using DisksMap = std::map<String, DiskPtr>;
/**
* Disk factory. Responsible for creating new disk objects.
*/
@ -24,7 +26,8 @@ public:
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context)>;
ContextPtr context,
const DisksMap & map)>;
static DiskFactory & instance();
@ -34,7 +37,8 @@ public:
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context) const;
ContextPtr context,
const DisksMap & map) const;
private:
using DiskTypeRegistry = std::unordered_map<String, Creator>;

View File

@ -211,9 +211,9 @@ void DiskLocal::replaceFile(const String & from_path, const String & to_path)
std::unique_ptr<ReadBufferFromFileBase>
DiskLocal::readFile(
const String & path, size_t buf_size, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold, MMappedFileCache * mmap_cache) const
const String & path, size_t buf_size, size_t estimated_size, size_t direct_io_threshold, size_t mmap_threshold, MMappedFileCache * mmap_cache) const
{
return createReadBufferFromFileBase(fs::path(disk_path) / path, estimated_size, aio_threshold, mmap_threshold, mmap_cache, buf_size);
return createReadBufferFromFileBase(fs::path(disk_path) / path, estimated_size, direct_io_threshold, mmap_threshold, mmap_cache, buf_size);
}
std::unique_ptr<WriteBufferFromFileBase>
@ -367,7 +367,8 @@ void registerDiskLocal(DiskFactory & factory)
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context) -> DiskPtr {
ContextPtr context,
const DisksMap & /*map*/) -> DiskPtr {
String path = config.getString(config_prefix + ".path", "");
if (name == "default")
{

View File

@ -74,7 +74,7 @@ public:
const String & path,
size_t buf_size,
size_t estimated_size,
size_t aio_threshold,
size_t direct_io_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
@ -100,6 +100,8 @@ public:
DiskType::Type getType() const override { return DiskType::Type::Local; }
bool supportZeroCopyReplication() const override { return false; }
SyncGuardPtr getDirectorySyncGuard(const String & path) const override;
private:

View File

@ -450,7 +450,8 @@ void registerDiskMemory(DiskFactory & factory)
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & /*config*/,
const String & /*config_prefix*/,
ContextPtr /*context*/) -> DiskPtr { return std::make_shared<DiskMemory>(name); };
ContextPtr /*context*/,
const DisksMap & /*map*/) -> DiskPtr { return std::make_shared<DiskMemory>(name); };
factory.registerDiskType("memory", creator);
}

View File

@ -66,7 +66,7 @@ public:
const String & path,
size_t buf_size,
size_t estimated_size,
size_t aio_threshold,
size_t direct_io_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
@ -92,6 +92,8 @@ public:
DiskType::Type getType() const override { return DiskType::Type::RAM; }
bool supportZeroCopyReplication() const override { return false; }
private:
void createDirectoriesImpl(const String & path);
void replaceFileImpl(const String & from_path, const String & to_path);

View File

@ -187,11 +187,11 @@ void DiskRestartProxy::listFiles(const String & path, std::vector<String> & file
}
std::unique_ptr<ReadBufferFromFileBase> DiskRestartProxy::readFile(
const String & path, size_t buf_size, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold, MMappedFileCache * mmap_cache)
const String & path, size_t buf_size, size_t estimated_size, size_t direct_io_threshold, size_t mmap_threshold, MMappedFileCache * mmap_cache)
const
{
ReadLock lock (mutex);
auto impl = DiskDecorator::readFile(path, buf_size, estimated_size, aio_threshold, mmap_threshold, mmap_cache);
auto impl = DiskDecorator::readFile(path, buf_size, estimated_size, direct_io_threshold, mmap_threshold, mmap_cache);
return std::make_unique<RestartAwareReadBuffer>(*this, std::move(impl));
}

View File

@ -47,7 +47,7 @@ public:
const String & path,
size_t buf_size,
size_t estimated_size,
size_t aio_threshold,
size_t direct_io_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & path, size_t buf_size, WriteMode mode) override;

View File

@ -37,7 +37,7 @@ DiskSelector::DiskSelector(const Poco::Util::AbstractConfiguration & config, con
auto disk_config_prefix = config_prefix + "." + disk_name;
disks.emplace(disk_name, factory.create(disk_name, config, disk_config_prefix, context));
disks.emplace(disk_name, factory.create(disk_name, config, disk_config_prefix, context, disks));
}
if (!has_default_disk)
disks.emplace(default_disk_name, std::make_shared<DiskLocal>(default_disk_name, context->getPath(), 0));
@ -62,16 +62,16 @@ DiskSelectorPtr DiskSelector::updateFromConfig(
if (!std::all_of(disk_name.begin(), disk_name.end(), isWordCharASCII))
throw Exception("Disk name can contain only alphanumeric and '_' (" + disk_name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
auto disk_config_prefix = config_prefix + "." + disk_name;
if (result->getDisksMap().count(disk_name) == 0)
{
auto disk_config_prefix = config_prefix + "." + disk_name;
result->addToDiskMap(disk_name, factory.create(disk_name, config, disk_config_prefix, context));
result->addToDiskMap(disk_name, factory.create(disk_name, config, disk_config_prefix, context, result->getDisksMap()));
}
else
{
auto disk = old_disks_minus_new_disks[disk_name];
disk->applyNewSettings(config, context);
disk->applyNewSettings(config, context, disk_config_prefix, result->getDisksMap());
old_disks_minus_new_disks.erase(disk_name);
}

View File

@ -12,7 +12,6 @@ namespace DB
class DiskSelector;
using DiskSelectorPtr = std::shared_ptr<const DiskSelector>;
using DisksMap = std::map<String, DiskPtr>;
/// Parse .xml configuration and store information about disks
/// Mostly used for introspection.

View File

@ -12,7 +12,8 @@ struct DiskType
Local,
RAM,
S3,
HDFS
HDFS,
Encrypted
};
static String toString(Type disk_type)
{
@ -26,6 +27,8 @@ struct DiskType
return "s3";
case Type::HDFS:
return "hdfs";
case Type::Encrypted:
return "encrypted";
}
__builtin_unreachable();
}

Some files were not shown because too many files have changed in this diff Show More