diff --git a/base/common/ReplxxLineReader.cpp b/base/common/ReplxxLineReader.cpp index fcd1610e589..7893e56d751 100644 --- a/base/common/ReplxxLineReader.cpp +++ b/base/common/ReplxxLineReader.cpp @@ -91,6 +91,10 @@ ReplxxLineReader::ReplxxLineReader( /// it also binded to M-p/M-n). rx.bind_key(Replxx::KEY::meta('N'), [this](char32_t code) { return rx.invoke(Replxx::ACTION::COMPLETE_NEXT, code); }); rx.bind_key(Replxx::KEY::meta('P'), [this](char32_t code) { return rx.invoke(Replxx::ACTION::COMPLETE_PREVIOUS, code); }); + /// By default M-BACKSPACE is KILL_TO_WHITESPACE_ON_LEFT, while in readline it is backward-kill-word + rx.bind_key(Replxx::KEY::meta(Replxx::KEY::BACKSPACE), [this](char32_t code) { return rx.invoke(Replxx::ACTION::KILL_TO_BEGINING_OF_WORD, code); }); + /// By default C-w is KILL_TO_BEGINING_OF_WORD, while in readline it is unix-word-rubout + rx.bind_key(Replxx::KEY::control('W'), [this](char32_t code) { return rx.invoke(Replxx::ACTION::KILL_TO_WHITESPACE_ON_LEFT, code); }); rx.bind_key(Replxx::KEY::meta('E'), [this](char32_t) { openEditor(); return Replxx::ACTION_RESULT::CONTINUE; }); } diff --git a/contrib/librdkafka b/contrib/librdkafka index cf11d0aa36d..43491d33ca2 160000 --- a/contrib/librdkafka +++ b/contrib/librdkafka @@ -1 +1 @@ -Subproject commit cf11d0aa36d4738f2c9bf4377807661660f1be76 +Subproject commit 43491d33ca2826531d1e3cae70d4bf1e5249e3c9 diff --git a/docker/packager/deb/Dockerfile b/docker/packager/deb/Dockerfile index 8c5d1dbb3fd..2f1d28efe61 100644 --- a/docker/packager/deb/Dockerfile +++ b/docker/packager/deb/Dockerfile @@ -61,7 +61,7 @@ RUN apt-get update \ RUN echo 'deb http://archive.ubuntu.com/ubuntu/ focal-proposed restricted main multiverse universe' > /etc/apt/sources.list.d/proposed-repositories.list RUN apt-get update \ - && apt-get install gcc-10 g++-10 --yes + && apt-get install gcc-10 g++-10 --yes --no-install-recommends RUN rm /etc/apt/sources.list.d/proposed-repositories.list && apt-get update diff --git a/docs/en/development/build-osx.md b/docs/en/development/build-osx.md index 24ecbdc1c2c..f34107ca3d3 100644 --- a/docs/en/development/build-osx.md +++ b/docs/en/development/build-osx.md @@ -124,4 +124,11 @@ Reboot. To check if it’s working, you can use `ulimit -n` command. 
+## Run ClickHouse server: + +``` +cd ClickHouse +./build/programs/clickhouse-server --config-file ./programs/server/config.xml +``` + [Original article](https://clickhouse.tech/docs/en/development/build_osx/) diff --git a/docs/en/getting-started/example-datasets/ontime.md b/docs/en/getting-started/example-datasets/ontime.md index 83673cdceb6..f18acc6fd50 100644 --- a/docs/en/getting-started/example-datasets/ontime.md +++ b/docs/en/getting-started/example-datasets/ontime.md @@ -21,120 +21,121 @@ echo https://transtats.bts.gov/PREZIP/On_Time_Reporting_Carrier_On_Time_Performa Creating a table: ``` sql -CREATE TABLE `ontime` ( - `Year` UInt16, - `Quarter` UInt8, - `Month` UInt8, - `DayofMonth` UInt8, - `DayOfWeek` UInt8, - `FlightDate` Date, - `UniqueCarrier` FixedString(7), - `AirlineID` Int32, - `Carrier` FixedString(2), - `TailNum` String, - `FlightNum` String, - `OriginAirportID` Int32, - `OriginAirportSeqID` Int32, - `OriginCityMarketID` Int32, - `Origin` FixedString(5), - `OriginCityName` String, - `OriginState` FixedString(2), - `OriginStateFips` String, - `OriginStateName` String, - `OriginWac` Int32, - `DestAirportID` Int32, - `DestAirportSeqID` Int32, - `DestCityMarketID` Int32, - `Dest` FixedString(5), - `DestCityName` String, - `DestState` FixedString(2), - `DestStateFips` String, - `DestStateName` String, - `DestWac` Int32, - `CRSDepTime` Int32, - `DepTime` Int32, - `DepDelay` Int32, - `DepDelayMinutes` Int32, - `DepDel15` Int32, - `DepartureDelayGroups` String, - `DepTimeBlk` String, - `TaxiOut` Int32, - `WheelsOff` Int32, - `WheelsOn` Int32, - `TaxiIn` Int32, - `CRSArrTime` Int32, - `ArrTime` Int32, - `ArrDelay` Int32, - `ArrDelayMinutes` Int32, - `ArrDel15` Int32, - `ArrivalDelayGroups` Int32, - `ArrTimeBlk` String, - `Cancelled` UInt8, - `CancellationCode` FixedString(1), - `Diverted` UInt8, - `CRSElapsedTime` Int32, - `ActualElapsedTime` Int32, - `AirTime` Int32, - `Flights` Int32, - `Distance` Int32, - `DistanceGroup` UInt8, - `CarrierDelay` Int32, - `WeatherDelay` Int32, - `NASDelay` Int32, - `SecurityDelay` Int32, - `LateAircraftDelay` Int32, - `FirstDepTime` String, - `TotalAddGTime` String, - `LongestAddGTime` String, - `DivAirportLandings` String, - `DivReachedDest` String, - `DivActualElapsedTime` String, - `DivArrDelay` String, - `DivDistance` String, - `Div1Airport` String, - `Div1AirportID` Int32, - `Div1AirportSeqID` Int32, - `Div1WheelsOn` String, - `Div1TotalGTime` String, - `Div1LongestGTime` String, - `Div1WheelsOff` String, - `Div1TailNum` String, - `Div2Airport` String, - `Div2AirportID` Int32, - `Div2AirportSeqID` Int32, - `Div2WheelsOn` String, - `Div2TotalGTime` String, - `Div2LongestGTime` String, - `Div2WheelsOff` String, - `Div2TailNum` String, - `Div3Airport` String, - `Div3AirportID` Int32, - `Div3AirportSeqID` Int32, - `Div3WheelsOn` String, - `Div3TotalGTime` String, - `Div3LongestGTime` String, - `Div3WheelsOff` String, - `Div3TailNum` String, - `Div4Airport` String, - `Div4AirportID` Int32, - `Div4AirportSeqID` Int32, - `Div4WheelsOn` String, - `Div4TotalGTime` String, - `Div4LongestGTime` String, - `Div4WheelsOff` String, - `Div4TailNum` String, - `Div5Airport` String, - `Div5AirportID` Int32, - `Div5AirportSeqID` Int32, - `Div5WheelsOn` String, - `Div5TotalGTime` String, - `Div5LongestGTime` String, - `Div5WheelsOff` String, - `Div5TailNum` String +CREATE TABLE `ontime` +( + `Year` UInt16, + `Quarter` UInt8, + `Month` UInt8, + `DayofMonth` UInt8, + `DayOfWeek` UInt8, + `FlightDate` Date, + `Reporting_Airline` String, + 
`DOT_ID_Reporting_Airline` Int32, + `IATA_CODE_Reporting_Airline` String, + `Tail_Number` Int32, + `Flight_Number_Reporting_Airline` String, + `OriginAirportID` Int32, + `OriginAirportSeqID` Int32, + `OriginCityMarketID` Int32, + `Origin` FixedString(5), + `OriginCityName` String, + `OriginState` FixedString(2), + `OriginStateFips` String, + `OriginStateName` String, + `OriginWac` Int32, + `DestAirportID` Int32, + `DestAirportSeqID` Int32, + `DestCityMarketID` Int32, + `Dest` FixedString(5), + `DestCityName` String, + `DestState` FixedString(2), + `DestStateFips` String, + `DestStateName` String, + `DestWac` Int32, + `CRSDepTime` Int32, + `DepTime` Int32, + `DepDelay` Int32, + `DepDelayMinutes` Int32, + `DepDel15` Int32, + `DepartureDelayGroups` String, + `DepTimeBlk` String, + `TaxiOut` Int32, + `WheelsOff` Int32, + `WheelsOn` Int32, + `TaxiIn` Int32, + `CRSArrTime` Int32, + `ArrTime` Int32, + `ArrDelay` Int32, + `ArrDelayMinutes` Int32, + `ArrDel15` Int32, + `ArrivalDelayGroups` Int32, + `ArrTimeBlk` String, + `Cancelled` UInt8, + `CancellationCode` FixedString(1), + `Diverted` UInt8, + `CRSElapsedTime` Int32, + `ActualElapsedTime` Int32, + `AirTime` Nullable(Int32), + `Flights` Int32, + `Distance` Int32, + `DistanceGroup` UInt8, + `CarrierDelay` Int32, + `WeatherDelay` Int32, + `NASDelay` Int32, + `SecurityDelay` Int32, + `LateAircraftDelay` Int32, + `FirstDepTime` String, + `TotalAddGTime` String, + `LongestAddGTime` String, + `DivAirportLandings` String, + `DivReachedDest` String, + `DivActualElapsedTime` String, + `DivArrDelay` String, + `DivDistance` String, + `Div1Airport` String, + `Div1AirportID` Int32, + `Div1AirportSeqID` Int32, + `Div1WheelsOn` String, + `Div1TotalGTime` String, + `Div1LongestGTime` String, + `Div1WheelsOff` String, + `Div1TailNum` String, + `Div2Airport` String, + `Div2AirportID` Int32, + `Div2AirportSeqID` Int32, + `Div2WheelsOn` String, + `Div2TotalGTime` String, + `Div2LongestGTime` String, + `Div2WheelsOff` String, + `Div2TailNum` String, + `Div3Airport` String, + `Div3AirportID` Int32, + `Div3AirportSeqID` Int32, + `Div3WheelsOn` String, + `Div3TotalGTime` String, + `Div3LongestGTime` String, + `Div3WheelsOff` String, + `Div3TailNum` String, + `Div4Airport` String, + `Div4AirportID` Int32, + `Div4AirportSeqID` Int32, + `Div4WheelsOn` String, + `Div4TotalGTime` String, + `Div4LongestGTime` String, + `Div4WheelsOff` String, + `Div4TailNum` String, + `Div5Airport` String, + `Div5AirportID` Int32, + `Div5AirportSeqID` Int32, + `Div5WheelsOn` String, + `Div5TotalGTime` String, + `Div5LongestGTime` String, + `Div5WheelsOff` String, + `Div5TailNum` String ) ENGINE = MergeTree -PARTITION BY Year -ORDER BY (Carrier, FlightDate) -SETTINGS index_granularity = 8192; + PARTITION BY Year + ORDER BY (IATA_CODE_Reporting_Airline, FlightDate) + SETTINGS index_granularity = 8192; ``` Loading data with multiple threads: @@ -206,7 +207,7 @@ LIMIT 10; Q4. 
The number of delays by carrier for 2007 ``` sql -SELECT Carrier, count(*) +SELECT IATA_CODE_Reporting_Airline AS Carrier, count(*) FROM ontime WHERE DepDelay>10 AND Year=2007 GROUP BY Carrier @@ -220,29 +221,29 @@ SELECT Carrier, c, c2, c*100/c2 as c3 FROM ( SELECT - Carrier, + IATA_CODE_Reporting_Airline AS Carrier, count(*) AS c FROM ontime WHERE DepDelay>10 AND Year=2007 GROUP BY Carrier -) +) q JOIN ( SELECT - Carrier, + IATA_CODE_Reporting_Airline AS Carrier, count(*) AS c2 FROM ontime WHERE Year=2007 GROUP BY Carrier -) USING Carrier +) qq USING Carrier ORDER BY c3 DESC; ``` Better version of the same query: ``` sql -SELECT Carrier, avg(DepDelay>10)*100 AS c3 +SELECT IATA_CODE_Reporting_Airline AS Carrier, avg(DepDelay>10)*100 AS c3 FROM ontime WHERE Year=2007 GROUP BY Carrier @@ -256,29 +257,29 @@ SELECT Carrier, c, c2, c*100/c2 as c3 FROM ( SELECT - Carrier, + IATA_CODE_Reporting_Airline AS Carrier, count(*) AS c FROM ontime WHERE DepDelay>10 AND Year>=2000 AND Year<=2008 GROUP BY Carrier -) +) q JOIN ( SELECT - Carrier, + IATA_CODE_Reporting_Airline AS Carrier, count(*) AS c2 FROM ontime WHERE Year>=2000 AND Year<=2008 GROUP BY Carrier -) USING Carrier +) qq USING Carrier ORDER BY c3 DESC; ``` Better version of the same query: ``` sql -SELECT Carrier, avg(DepDelay>10)*100 AS c3 +SELECT IATA_CODE_Reporting_Airline AS Carrier, avg(DepDelay>10)*100 AS c3 FROM ontime WHERE Year>=2000 AND Year<=2008 GROUP BY Carrier @@ -297,7 +298,7 @@ FROM from ontime WHERE DepDelay>10 GROUP BY Year -) +) q JOIN ( select @@ -305,7 +306,7 @@ JOIN count(*) as c2 from ontime GROUP BY Year -) USING (Year) +) qq USING (Year) ORDER BY Year; ``` @@ -340,7 +341,7 @@ Q10. ``` sql SELECT - min(Year), max(Year), Carrier, count(*) AS cnt, + min(Year), max(Year), IATA_CODE_Reporting_Airline AS Carrier, count(*) AS cnt, sum(ArrDelayMinutes>30) AS flights_delayed, round(sum(ArrDelayMinutes>30)/count(*),2) AS rate FROM ontime diff --git a/docs/en/operations/server-configuration-parameters/settings.md b/docs/en/operations/server-configuration-parameters/settings.md index f86e9668f00..19671b523e3 100644 --- a/docs/en/operations/server-configuration-parameters/settings.md +++ b/docs/en/operations/server-configuration-parameters/settings.md @@ -430,7 +430,7 @@ Keys for syslog: Default value: `LOG_USER` if `address` is specified, `LOG_DAEMON` otherwise. - format – Message format. Possible values: `bsd` and `syslog.` -## send_crash_reports {#server_configuration_parameters-logger} +## send_crash_reports {#server_configuration_parameters-send_crash_reports} Settings for opt-in sending crash reports to the ClickHouse core developers team via [Sentry](https://sentry.io). Enabling it, especially in pre-production environments, is highly appreciated. diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md index 60f39c19e7e..b0c879af931 100644 --- a/docs/en/operations/settings/settings.md +++ b/docs/en/operations/settings/settings.md @@ -143,6 +143,16 @@ Possible values: Default value: 0. +## http_max_uri_size {#http-max-uri-size} + +Sets the maximum URI length of an HTTP request. + +Possible values: + +- Positive integer. + +Default value: 1048576. + ## send_progress_in_http_headers {#settings-send_progress_in_http_headers} Enables or disables `X-ClickHouse-Progress` HTTP response headers in `clickhouse-server` responses. 
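As a quick sanity check of the new `http_max_uri_size` limit documented above, the behaviour can be probed over the HTTP interface. A minimal sketch, assuming a server on localhost with the default HTTP port 8123 (the comment-padding trick is ours, not from the patch; only the URI length is in play, and the exact error returned may vary):

``` bash
# Well under the limit: returns 1.
curl -sS "http://localhost:8123/?query=SELECT+1"

# Build ~2 MiB of padding and append it as a SQL line comment ("--"),
# pushing the URI past the default http_max_uri_size of 1048576 bytes.
padding=$(head -c 2097152 /dev/zero | tr '\0' 'x')
curl -sSf "http://localhost:8123/?query=SELECT+1+--${padding}" || echo "rejected: URI too long"
```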
diff --git a/docs/en/operations/system-tables/distribution_queue.md b/docs/en/operations/system-tables/distribution_queue.md index fdc6a134da2..3b09c20874c 100644 --- a/docs/en/operations/system-tables/distribution_queue.md +++ b/docs/en/operations/system-tables/distribution_queue.md @@ -18,6 +18,10 @@ Columns: - `data_compressed_bytes` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Size of compressed data in local files, in bytes. +- `broken_data_files` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of files that have been marked as broken (due to an error). + +- `broken_data_compressed_bytes` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Size of compressed data in broken files, in bytes. + - `last_exception` ([String](../../sql-reference/data-types/string.md)) — Text message about the last error that occurred (if any). **Example** diff --git a/docs/en/sql-reference/functions/functions-for-nulls.md b/docs/en/sql-reference/functions/functions-for-nulls.md index 5cc95fe298a..c06711b3cd2 100644 --- a/docs/en/sql-reference/functions/functions-for-nulls.md +++ b/docs/en/sql-reference/functions/functions-for-nulls.md @@ -224,7 +224,7 @@ assumeNotNull(x) **Returned values** - The original value from the non-`Nullable` type, if it is not `NULL`. -- The default value for the non-`Nullable` type if the original value was `NULL`. +- An implementation-specific result if the original value was `NULL`. **Example** diff --git a/docs/ja/getting-started/example-datasets/ontime.md b/docs/ja/getting-started/example-datasets/ontime.md index bd049e8caad..d12d8a36069 100644 --- a/docs/ja/getting-started/example-datasets/ontime.md +++ b/docs/ja/getting-started/example-datasets/ontime.md @@ -29,126 +29,127 @@ done テーブルの作成: ``` sql -CREATE TABLE `ontime` ( - `Year` UInt16, - `Quarter` UInt8, - `Month` UInt8, - `DayofMonth` UInt8, - `DayOfWeek` UInt8, - `FlightDate` Date, - `UniqueCarrier` FixedString(7), - `AirlineID` Int32, - `Carrier` FixedString(2), - `TailNum` String, - `FlightNum` String, - `OriginAirportID` Int32, - `OriginAirportSeqID` Int32, - `OriginCityMarketID` Int32, - `Origin` FixedString(5), - `OriginCityName` String, - `OriginState` FixedString(2), - `OriginStateFips` String, - `OriginStateName` String, - `OriginWac` Int32, - `DestAirportID` Int32, - `DestAirportSeqID` Int32, - `DestCityMarketID` Int32, - `Dest` FixedString(5), - `DestCityName` String, - `DestState` FixedString(2), - `DestStateFips` String, - `DestStateName` String, - `DestWac` Int32, - `CRSDepTime` Int32, - `DepTime` Int32, - `DepDelay` Int32, - `DepDelayMinutes` Int32, - `DepDel15` Int32, - `DepartureDelayGroups` String, - `DepTimeBlk` String, - `TaxiOut` Int32, - `WheelsOff` Int32, - `WheelsOn` Int32, - `TaxiIn` Int32, - `CRSArrTime` Int32, - `ArrTime` Int32, - `ArrDelay` Int32, - `ArrDelayMinutes` Int32, - `ArrDel15` Int32, - `ArrivalDelayGroups` Int32, - `ArrTimeBlk` String, - `Cancelled` UInt8, - `CancellationCode` FixedString(1), - `Diverted` UInt8, - `CRSElapsedTime` Int32, - `ActualElapsedTime` Int32, - `AirTime` Int32, - `Flights` Int32, - `Distance` Int32, - `DistanceGroup` UInt8, - `CarrierDelay` Int32, - `WeatherDelay` Int32, - `NASDelay` Int32, - `SecurityDelay` Int32, - `LateAircraftDelay` Int32, - `FirstDepTime` String, - `TotalAddGTime` String, - `LongestAddGTime` String, - `DivAirportLandings` String, - `DivReachedDest` String, - `DivActualElapsedTime` String, - `DivArrDelay` String, - `DivDistance` String, - `Div1Airport` String, - `Div1AirportID` Int32, - `Div1AirportSeqID`
Int32, - `Div1WheelsOn` String, - `Div1TotalGTime` String, - `Div1LongestGTime` String, - `Div1WheelsOff` String, - `Div1TailNum` String, - `Div2Airport` String, - `Div2AirportID` Int32, - `Div2AirportSeqID` Int32, - `Div2WheelsOn` String, - `Div2TotalGTime` String, - `Div2LongestGTime` String, - `Div2WheelsOff` String, - `Div2TailNum` String, - `Div3Airport` String, - `Div3AirportID` Int32, - `Div3AirportSeqID` Int32, - `Div3WheelsOn` String, - `Div3TotalGTime` String, - `Div3LongestGTime` String, - `Div3WheelsOff` String, - `Div3TailNum` String, - `Div4Airport` String, - `Div4AirportID` Int32, - `Div4AirportSeqID` Int32, - `Div4WheelsOn` String, - `Div4TotalGTime` String, - `Div4LongestGTime` String, - `Div4WheelsOff` String, - `Div4TailNum` String, - `Div5Airport` String, - `Div5AirportID` Int32, - `Div5AirportSeqID` Int32, - `Div5WheelsOn` String, - `Div5TotalGTime` String, - `Div5LongestGTime` String, - `Div5WheelsOff` String, - `Div5TailNum` String +CREATE TABLE `ontime` +( + `Year` UInt16, + `Quarter` UInt8, + `Month` UInt8, + `DayofMonth` UInt8, + `DayOfWeek` UInt8, + `FlightDate` Date, + `Reporting_Airline` String, + `DOT_ID_Reporting_Airline` Int32, + `IATA_CODE_Reporting_Airline` String, + `Tail_Number` Int32, + `Flight_Number_Reporting_Airline` String, + `OriginAirportID` Int32, + `OriginAirportSeqID` Int32, + `OriginCityMarketID` Int32, + `Origin` FixedString(5), + `OriginCityName` String, + `OriginState` FixedString(2), + `OriginStateFips` String, + `OriginStateName` String, + `OriginWac` Int32, + `DestAirportID` Int32, + `DestAirportSeqID` Int32, + `DestCityMarketID` Int32, + `Dest` FixedString(5), + `DestCityName` String, + `DestState` FixedString(2), + `DestStateFips` String, + `DestStateName` String, + `DestWac` Int32, + `CRSDepTime` Int32, + `DepTime` Int32, + `DepDelay` Int32, + `DepDelayMinutes` Int32, + `DepDel15` Int32, + `DepartureDelayGroups` String, + `DepTimeBlk` String, + `TaxiOut` Int32, + `WheelsOff` Int32, + `WheelsOn` Int32, + `TaxiIn` Int32, + `CRSArrTime` Int32, + `ArrTime` Int32, + `ArrDelay` Int32, + `ArrDelayMinutes` Int32, + `ArrDel15` Int32, + `ArrivalDelayGroups` Int32, + `ArrTimeBlk` String, + `Cancelled` UInt8, + `CancellationCode` FixedString(1), + `Diverted` UInt8, + `CRSElapsedTime` Int32, + `ActualElapsedTime` Int32, + `AirTime` Nullable(Int32), + `Flights` Int32, + `Distance` Int32, + `DistanceGroup` UInt8, + `CarrierDelay` Int32, + `WeatherDelay` Int32, + `NASDelay` Int32, + `SecurityDelay` Int32, + `LateAircraftDelay` Int32, + `FirstDepTime` String, + `TotalAddGTime` String, + `LongestAddGTime` String, + `DivAirportLandings` String, + `DivReachedDest` String, + `DivActualElapsedTime` String, + `DivArrDelay` String, + `DivDistance` String, + `Div1Airport` String, + `Div1AirportID` Int32, + `Div1AirportSeqID` Int32, + `Div1WheelsOn` String, + `Div1TotalGTime` String, + `Div1LongestGTime` String, + `Div1WheelsOff` String, + `Div1TailNum` String, + `Div2Airport` String, + `Div2AirportID` Int32, + `Div2AirportSeqID` Int32, + `Div2WheelsOn` String, + `Div2TotalGTime` String, + `Div2LongestGTime` String, + `Div2WheelsOff` String, + `Div2TailNum` String, + `Div3Airport` String, + `Div3AirportID` Int32, + `Div3AirportSeqID` Int32, + `Div3WheelsOn` String, + `Div3TotalGTime` String, + `Div3LongestGTime` String, + `Div3WheelsOff` String, + `Div3TailNum` String, + `Div4Airport` String, + `Div4AirportID` Int32, + `Div4AirportSeqID` Int32, + `Div4WheelsOn` String, + `Div4TotalGTime` String, + `Div4LongestGTime` String, + `Div4WheelsOff` String, + 
`Div4TailNum` String, + `Div5Airport` String, + `Div5AirportID` Int32, + `Div5AirportSeqID` Int32, + `Div5WheelsOn` String, + `Div5TotalGTime` String, + `Div5LongestGTime` String, + `Div5WheelsOff` String, + `Div5TailNum` String ) ENGINE = MergeTree -PARTITION BY Year -ORDER BY (Carrier, FlightDate) -SETTINGS index_granularity = 8192; + PARTITION BY Year + ORDER BY (IATA_CODE_Reporting_Airline, FlightDate) + SETTINGS index_granularity = 8192; ``` データのロード: ``` bash -$ for i in *.zip; do echo $i; unzip -cq $i '*.csv' | sed 's/\.00//g' | clickhouse-client --host=example-perftest01j --query="INSERT INTO ontime FORMAT CSVWithNames"; done +ls -1 *.zip | xargs -I{} -P $(nproc) bash -c "echo {}; unzip -cq {} '*.csv' | sed 's/\.00//g' | clickhouse-client --input_format_with_names_use_header=0 --query='INSERT INTO ontime FORMAT CSVWithNames'" ``` ## パーティション済みデータのダウンロード {#download-of-prepared-partitions} @@ -212,10 +213,10 @@ LIMIT 10; Q4. 2007年のキャリア別の遅延の数 ``` sql -SELECT Carrier, count(*) +SELECT IATA_CODE_Reporting_Airline AS Carrier, count(*) FROM ontime WHERE DepDelay>10 AND Year=2007 -GROUP BY Carrier +GROUP BY IATA_CODE_Reporting_Airline ORDER BY count(*) DESC; ``` @@ -226,32 +227,32 @@ SELECT Carrier, c, c2, c*100/c2 as c3 FROM ( SELECT - Carrier, + IATA_CODE_Reporting_Airline AS Carrier, count(*) AS c FROM ontime WHERE DepDelay>10 AND Year=2007 GROUP BY Carrier -) +) q JOIN ( SELECT - Carrier, + IATA_CODE_Reporting_Airline AS Carrier, count(*) AS c2 FROM ontime WHERE Year=2007 GROUP BY Carrier -) USING Carrier +) qq USING Carrier ORDER BY c3 DESC; ``` 同じクエリのより良いバージョン: ``` sql -SELECT Carrier, avg(DepDelay>10)*100 AS c3 +SELECT IATA_CODE_Reporting_Airline AS Carrier, avg(DepDelay>10)*100 AS c3 FROM ontime WHERE Year=2007 -GROUP BY Carrier +GROUP BY IATA_CODE_Reporting_Airline ORDER BY c3 DESC ``` @@ -262,29 +263,29 @@ SELECT Carrier, c, c2, c*100/c2 as c3 FROM ( SELECT - Carrier, + IATA_CODE_Reporting_Airline AS Carrier, count(*) AS c FROM ontime WHERE DepDelay>10 AND Year>=2000 AND Year<=2008 GROUP BY Carrier -) +) q JOIN ( SELECT - Carrier, + IATA_CODE_Reporting_Airline AS Carrier, count(*) AS c2 FROM ontime WHERE Year>=2000 AND Year<=2008 GROUP BY Carrier -) USING Carrier +) qq USING Carrier ORDER BY c3 DESC; ``` 同じクエリのより良いバージョン: ``` sql -SELECT Carrier, avg(DepDelay>10)*100 AS c3 +SELECT IATA_CODE_Reporting_Airline AS Carrier, avg(DepDelay>10)*100 AS c3 FROM ontime WHERE Year>=2000 AND Year<=2008 GROUP BY Carrier @@ -303,7 +304,7 @@ FROM from ontime WHERE DepDelay>10 GROUP BY Year -) +) q JOIN ( select @@ -311,7 +312,7 @@ JOIN count(*) as c2 from ontime GROUP BY Year -) USING (Year) +) qq USING (Year) ORDER BY Year; ``` @@ -346,7 +347,7 @@ Q10. 
``` sql SELECT - min(Year), max(Year), Carrier, count(*) AS cnt, + min(Year), max(Year), IATA_CODE_Reporting_Airline AS Carrier, count(*) AS cnt, sum(ArrDelayMinutes>30) AS flights_delayed, round(sum(ArrDelayMinutes>30)/count(*),2) AS rate FROM ontime diff --git a/docs/ru/getting-started/example-datasets/ontime.md b/docs/ru/getting-started/example-datasets/ontime.md index be5b1cd1b70..d46b7e75e7f 100644 --- a/docs/ru/getting-started/example-datasets/ontime.md +++ b/docs/ru/getting-started/example-datasets/ontime.md @@ -27,126 +27,127 @@ done Создание таблицы: ``` sql -CREATE TABLE `ontime` ( - `Year` UInt16, - `Quarter` UInt8, - `Month` UInt8, - `DayofMonth` UInt8, - `DayOfWeek` UInt8, - `FlightDate` Date, - `UniqueCarrier` FixedString(7), - `AirlineID` Int32, - `Carrier` FixedString(2), - `TailNum` String, - `FlightNum` String, - `OriginAirportID` Int32, - `OriginAirportSeqID` Int32, - `OriginCityMarketID` Int32, - `Origin` FixedString(5), - `OriginCityName` String, - `OriginState` FixedString(2), - `OriginStateFips` String, - `OriginStateName` String, - `OriginWac` Int32, - `DestAirportID` Int32, - `DestAirportSeqID` Int32, - `DestCityMarketID` Int32, - `Dest` FixedString(5), - `DestCityName` String, - `DestState` FixedString(2), - `DestStateFips` String, - `DestStateName` String, - `DestWac` Int32, - `CRSDepTime` Int32, - `DepTime` Int32, - `DepDelay` Int32, - `DepDelayMinutes` Int32, - `DepDel15` Int32, - `DepartureDelayGroups` String, - `DepTimeBlk` String, - `TaxiOut` Int32, - `WheelsOff` Int32, - `WheelsOn` Int32, - `TaxiIn` Int32, - `CRSArrTime` Int32, - `ArrTime` Int32, - `ArrDelay` Int32, - `ArrDelayMinutes` Int32, - `ArrDel15` Int32, - `ArrivalDelayGroups` Int32, - `ArrTimeBlk` String, - `Cancelled` UInt8, - `CancellationCode` FixedString(1), - `Diverted` UInt8, - `CRSElapsedTime` Int32, - `ActualElapsedTime` Int32, - `AirTime` Int32, - `Flights` Int32, - `Distance` Int32, - `DistanceGroup` UInt8, - `CarrierDelay` Int32, - `WeatherDelay` Int32, - `NASDelay` Int32, - `SecurityDelay` Int32, - `LateAircraftDelay` Int32, - `FirstDepTime` String, - `TotalAddGTime` String, - `LongestAddGTime` String, - `DivAirportLandings` String, - `DivReachedDest` String, - `DivActualElapsedTime` String, - `DivArrDelay` String, - `DivDistance` String, - `Div1Airport` String, - `Div1AirportID` Int32, - `Div1AirportSeqID` Int32, - `Div1WheelsOn` String, - `Div1TotalGTime` String, - `Div1LongestGTime` String, - `Div1WheelsOff` String, - `Div1TailNum` String, - `Div2Airport` String, - `Div2AirportID` Int32, - `Div2AirportSeqID` Int32, - `Div2WheelsOn` String, - `Div2TotalGTime` String, - `Div2LongestGTime` String, - `Div2WheelsOff` String, - `Div2TailNum` String, - `Div3Airport` String, - `Div3AirportID` Int32, - `Div3AirportSeqID` Int32, - `Div3WheelsOn` String, - `Div3TotalGTime` String, - `Div3LongestGTime` String, - `Div3WheelsOff` String, - `Div3TailNum` String, - `Div4Airport` String, - `Div4AirportID` Int32, - `Div4AirportSeqID` Int32, - `Div4WheelsOn` String, - `Div4TotalGTime` String, - `Div4LongestGTime` String, - `Div4WheelsOff` String, - `Div4TailNum` String, - `Div5Airport` String, - `Div5AirportID` Int32, - `Div5AirportSeqID` Int32, - `Div5WheelsOn` String, - `Div5TotalGTime` String, - `Div5LongestGTime` String, - `Div5WheelsOff` String, - `Div5TailNum` String +CREATE TABLE `ontime` +( + `Year` UInt16, + `Quarter` UInt8, + `Month` UInt8, + `DayofMonth` UInt8, + `DayOfWeek` UInt8, + `FlightDate` Date, + `Reporting_Airline` String, + `DOT_ID_Reporting_Airline` Int32, + 
`IATA_CODE_Reporting_Airline` String, + `Tail_Number` Int32, + `Flight_Number_Reporting_Airline` String, + `OriginAirportID` Int32, + `OriginAirportSeqID` Int32, + `OriginCityMarketID` Int32, + `Origin` FixedString(5), + `OriginCityName` String, + `OriginState` FixedString(2), + `OriginStateFips` String, + `OriginStateName` String, + `OriginWac` Int32, + `DestAirportID` Int32, + `DestAirportSeqID` Int32, + `DestCityMarketID` Int32, + `Dest` FixedString(5), + `DestCityName` String, + `DestState` FixedString(2), + `DestStateFips` String, + `DestStateName` String, + `DestWac` Int32, + `CRSDepTime` Int32, + `DepTime` Int32, + `DepDelay` Int32, + `DepDelayMinutes` Int32, + `DepDel15` Int32, + `DepartureDelayGroups` String, + `DepTimeBlk` String, + `TaxiOut` Int32, + `WheelsOff` Int32, + `WheelsOn` Int32, + `TaxiIn` Int32, + `CRSArrTime` Int32, + `ArrTime` Int32, + `ArrDelay` Int32, + `ArrDelayMinutes` Int32, + `ArrDel15` Int32, + `ArrivalDelayGroups` Int32, + `ArrTimeBlk` String, + `Cancelled` UInt8, + `CancellationCode` FixedString(1), + `Diverted` UInt8, + `CRSElapsedTime` Int32, + `ActualElapsedTime` Int32, + `AirTime` Nullable(Int32), + `Flights` Int32, + `Distance` Int32, + `DistanceGroup` UInt8, + `CarrierDelay` Int32, + `WeatherDelay` Int32, + `NASDelay` Int32, + `SecurityDelay` Int32, + `LateAircraftDelay` Int32, + `FirstDepTime` String, + `TotalAddGTime` String, + `LongestAddGTime` String, + `DivAirportLandings` String, + `DivReachedDest` String, + `DivActualElapsedTime` String, + `DivArrDelay` String, + `DivDistance` String, + `Div1Airport` String, + `Div1AirportID` Int32, + `Div1AirportSeqID` Int32, + `Div1WheelsOn` String, + `Div1TotalGTime` String, + `Div1LongestGTime` String, + `Div1WheelsOff` String, + `Div1TailNum` String, + `Div2Airport` String, + `Div2AirportID` Int32, + `Div2AirportSeqID` Int32, + `Div2WheelsOn` String, + `Div2TotalGTime` String, + `Div2LongestGTime` String, + `Div2WheelsOff` String, + `Div2TailNum` String, + `Div3Airport` String, + `Div3AirportID` Int32, + `Div3AirportSeqID` Int32, + `Div3WheelsOn` String, + `Div3TotalGTime` String, + `Div3LongestGTime` String, + `Div3WheelsOff` String, + `Div3TailNum` String, + `Div4Airport` String, + `Div4AirportID` Int32, + `Div4AirportSeqID` Int32, + `Div4WheelsOn` String, + `Div4TotalGTime` String, + `Div4LongestGTime` String, + `Div4WheelsOff` String, + `Div4TailNum` String, + `Div5Airport` String, + `Div5AirportID` Int32, + `Div5AirportSeqID` Int32, + `Div5WheelsOn` String, + `Div5TotalGTime` String, + `Div5LongestGTime` String, + `Div5WheelsOff` String, + `Div5TailNum` String ) ENGINE = MergeTree -PARTITION BY Year -ORDER BY (Carrier, FlightDate) -SETTINGS index_granularity = 8192; + PARTITION BY Year + ORDER BY (IATA_CODE_Reporting_Airline, FlightDate) + SETTINGS index_granularity = 8192; ``` Загрузка данных: ``` bash -$ for i in *.zip; do echo $i; unzip -cq $i '*.csv' | sed 's/\.00//g' | clickhouse-client --host=example-perftest01j --query="INSERT INTO ontime FORMAT CSVWithNames"; done +ls -1 *.zip | xargs -I{} -P $(nproc) bash -c "echo {}; unzip -cq {} '*.csv' | sed 's/\.00//g' | clickhouse-client --input_format_with_names_use_header=0 --query='INSERT INTO ontime FORMAT CSVWithNames'" ``` ## Скачивание готовых партиций {#skachivanie-gotovykh-partitsii} @@ -211,7 +212,7 @@ LIMIT 10; Q4. 
Количество задержек по перевозчикам за 2007 год ``` sql -SELECT Carrier, count(*) +SELECT IATA_CODE_Reporting_Airline AS Carrier, count(*) FROM ontime WHERE DepDelay>10 AND Year=2007 GROUP BY Carrier @@ -225,29 +226,29 @@ SELECT Carrier, c, c2, c*100/c2 as c3 FROM ( SELECT - Carrier, + IATA_CODE_Reporting_Airline AS Carrier, count(*) AS c FROM ontime WHERE DepDelay>10 AND Year=2007 GROUP BY Carrier -) +) q JOIN ( SELECT - Carrier, + IATA_CODE_Reporting_Airline AS Carrier, count(*) AS c2 FROM ontime WHERE Year=2007 GROUP BY Carrier -) USING Carrier +) qq USING Carrier ORDER BY c3 DESC; ``` Более оптимальная версия того же запроса: ``` sql -SELECT Carrier, avg(DepDelay>10)*100 AS c3 +SELECT IATA_CODE_Reporting_Airline AS Carrier, avg(DepDelay>10)*100 AS c3 FROM ontime WHERE Year=2007 GROUP BY Carrier @@ -261,29 +262,29 @@ SELECT Carrier, c, c2, c*100/c2 as c3 FROM ( SELECT - Carrier, + IATA_CODE_Reporting_Airline AS Carrier, count(*) AS c FROM ontime WHERE DepDelay>10 AND Year>=2000 AND Year<=2008 GROUP BY Carrier -) +) q JOIN ( SELECT - Carrier, + IATA_CODE_Reporting_Airline AS Carrier, count(*) AS c2 FROM ontime WHERE Year>=2000 AND Year<=2008 GROUP BY Carrier -) USING Carrier +) qq USING Carrier ORDER BY c3 DESC; ``` Более оптимальная версия того же запроса: ``` sql -SELECT Carrier, avg(DepDelay>10)*100 AS c3 +SELECT IATA_CODE_Reporting_Airline AS Carrier, avg(DepDelay>10)*100 AS c3 FROM ontime WHERE Year>=2000 AND Year<=2008 GROUP BY Carrier @@ -302,7 +303,7 @@ FROM from ontime WHERE DepDelay>10 GROUP BY Year -) +) q JOIN ( select @@ -310,7 +311,7 @@ JOIN count(*) as c2 from ontime GROUP BY Year -) USING (Year) +) qq USING (Year) ORDER BY Year; ``` @@ -346,7 +347,7 @@ Q10. ``` sql SELECT - min(Year), max(Year), Carrier, count(*) AS cnt, + min(Year), max(Year), IATA_CODE_Reporting_Airline AS Carrier, count(*) AS cnt, sum(ArrDelayMinutes>30) AS flights_delayed, round(sum(ArrDelayMinutes>30)/count(*),2) AS rate FROM ontime diff --git a/docs/ru/operations/server-configuration-parameters/settings.md b/docs/ru/operations/server-configuration-parameters/settings.md index be9e2deab74..abaf2a8f2da 100644 --- a/docs/ru/operations/server-configuration-parameters/settings.md +++ b/docs/ru/operations/server-configuration-parameters/settings.md @@ -415,7 +415,7 @@ ClickHouse проверяет условия для `min_part_size` и `min_part Значения по умолчанию: при указанном `address` - `LOG_USER`, иначе - `LOG_DAEMON` - format - формат сообщений. Возможные значения - `bsd` и `syslog` -## send_crash_reports {#server_configuration_parameters-logger} +## send_crash_reports {#server_configuration_parameters-send_crash_reports} Настройки для отправки сообщений о сбоях в команду разработчиков ядра ClickHouse через [Sentry](https://sentry.io). Включение этих настроек, особенно в pre-production среде, может дать очень ценную информацию и поможет развитию ClickHouse. diff --git a/docs/ru/operations/settings/settings.md b/docs/ru/operations/settings/settings.md index fa657b67691..467d27dad32 100644 --- a/docs/ru/operations/settings/settings.md +++ b/docs/ru/operations/settings/settings.md @@ -119,6 +119,16 @@ ClickHouse применяет настройку в тех случаях, ко Значение по умолчанию: 0. +## http_max_uri_size {#http-max-uri-size} + +Устанавливает максимальную длину URI в HTTP-запросе. + +Возможные значения: + +- Положительное целое. + +Значение по умолчанию: 1048576. 
+ ## send_progress_in_http_headers {#settings-send_progress_in_http_headers} Включает или отключает HTTP-заголовки `X-ClickHouse-Progress` в ответах `clickhouse-server`. diff --git a/docs/ru/sql-reference/functions/functions-for-nulls.md b/docs/ru/sql-reference/functions/functions-for-nulls.md index 365dba75da7..7285f803264 100644 --- a/docs/ru/sql-reference/functions/functions-for-nulls.md +++ b/docs/ru/sql-reference/functions/functions-for-nulls.md @@ -224,7 +224,7 @@ assumeNotNull(x) **Возвращаемые значения** - Исходное значение с не `Nullable` типом, если оно — не `NULL`. -- Значение по умолчанию для не `Nullable` типа, если исходное значение — `NULL`. +- Неспецифицированный результат, зависящий от реализации, если исходное значение — `NULL`. **Пример** diff --git a/docs/zh/getting-started/example-datasets/ontime.md b/docs/zh/getting-started/example-datasets/ontime.md index 3921f71fc7e..6d888b2196c 100644 --- a/docs/zh/getting-started/example-datasets/ontime.md +++ b/docs/zh/getting-started/example-datasets/ontime.md @@ -29,126 +29,127 @@ done 创建表结构: ``` sql -CREATE TABLE `ontime` ( - `Year` UInt16, - `Quarter` UInt8, - `Month` UInt8, - `DayofMonth` UInt8, - `DayOfWeek` UInt8, - `FlightDate` Date, - `UniqueCarrier` FixedString(7), - `AirlineID` Int32, - `Carrier` FixedString(2), - `TailNum` String, - `FlightNum` String, - `OriginAirportID` Int32, - `OriginAirportSeqID` Int32, - `OriginCityMarketID` Int32, - `Origin` FixedString(5), - `OriginCityName` String, - `OriginState` FixedString(2), - `OriginStateFips` String, - `OriginStateName` String, - `OriginWac` Int32, - `DestAirportID` Int32, - `DestAirportSeqID` Int32, - `DestCityMarketID` Int32, - `Dest` FixedString(5), - `DestCityName` String, - `DestState` FixedString(2), - `DestStateFips` String, - `DestStateName` String, - `DestWac` Int32, - `CRSDepTime` Int32, - `DepTime` Int32, - `DepDelay` Int32, - `DepDelayMinutes` Int32, - `DepDel15` Int32, - `DepartureDelayGroups` String, - `DepTimeBlk` String, - `TaxiOut` Int32, - `WheelsOff` Int32, - `WheelsOn` Int32, - `TaxiIn` Int32, - `CRSArrTime` Int32, - `ArrTime` Int32, - `ArrDelay` Int32, - `ArrDelayMinutes` Int32, - `ArrDel15` Int32, - `ArrivalDelayGroups` Int32, - `ArrTimeBlk` String, - `Cancelled` UInt8, - `CancellationCode` FixedString(1), - `Diverted` UInt8, - `CRSElapsedTime` Int32, - `ActualElapsedTime` Int32, - `AirTime` Int32, - `Flights` Int32, - `Distance` Int32, - `DistanceGroup` UInt8, - `CarrierDelay` Int32, - `WeatherDelay` Int32, - `NASDelay` Int32, - `SecurityDelay` Int32, - `LateAircraftDelay` Int32, - `FirstDepTime` String, - `TotalAddGTime` String, - `LongestAddGTime` String, - `DivAirportLandings` String, - `DivReachedDest` String, - `DivActualElapsedTime` String, - `DivArrDelay` String, - `DivDistance` String, - `Div1Airport` String, - `Div1AirportID` Int32, - `Div1AirportSeqID` Int32, - `Div1WheelsOn` String, - `Div1TotalGTime` String, - `Div1LongestGTime` String, - `Div1WheelsOff` String, - `Div1TailNum` String, - `Div2Airport` String, - `Div2AirportID` Int32, - `Div2AirportSeqID` Int32, - `Div2WheelsOn` String, - `Div2TotalGTime` String, - `Div2LongestGTime` String, - `Div2WheelsOff` String, - `Div2TailNum` String, - `Div3Airport` String, - `Div3AirportID` Int32, - `Div3AirportSeqID` Int32, - `Div3WheelsOn` String, - `Div3TotalGTime` String, - `Div3LongestGTime` String, - `Div3WheelsOff` String, - `Div3TailNum` String, - `Div4Airport` String, - `Div4AirportID` Int32, - `Div4AirportSeqID` Int32, - `Div4WheelsOn` String, - `Div4TotalGTime` String, - 
`Div4LongestGTime` String, - `Div4WheelsOff` String, - `Div4TailNum` String, - `Div5Airport` String, - `Div5AirportID` Int32, - `Div5AirportSeqID` Int32, - `Div5WheelsOn` String, - `Div5TotalGTime` String, - `Div5LongestGTime` String, - `Div5WheelsOff` String, - `Div5TailNum` String +CREATE TABLE `ontime` +( + `Year` UInt16, + `Quarter` UInt8, + `Month` UInt8, + `DayofMonth` UInt8, + `DayOfWeek` UInt8, + `FlightDate` Date, + `Reporting_Airline` String, + `DOT_ID_Reporting_Airline` Int32, + `IATA_CODE_Reporting_Airline` String, + `Tail_Number` Int32, + `Flight_Number_Reporting_Airline` String, + `OriginAirportID` Int32, + `OriginAirportSeqID` Int32, + `OriginCityMarketID` Int32, + `Origin` FixedString(5), + `OriginCityName` String, + `OriginState` FixedString(2), + `OriginStateFips` String, + `OriginStateName` String, + `OriginWac` Int32, + `DestAirportID` Int32, + `DestAirportSeqID` Int32, + `DestCityMarketID` Int32, + `Dest` FixedString(5), + `DestCityName` String, + `DestState` FixedString(2), + `DestStateFips` String, + `DestStateName` String, + `DestWac` Int32, + `CRSDepTime` Int32, + `DepTime` Int32, + `DepDelay` Int32, + `DepDelayMinutes` Int32, + `DepDel15` Int32, + `DepartureDelayGroups` String, + `DepTimeBlk` String, + `TaxiOut` Int32, + `WheelsOff` Int32, + `WheelsOn` Int32, + `TaxiIn` Int32, + `CRSArrTime` Int32, + `ArrTime` Int32, + `ArrDelay` Int32, + `ArrDelayMinutes` Int32, + `ArrDel15` Int32, + `ArrivalDelayGroups` Int32, + `ArrTimeBlk` String, + `Cancelled` UInt8, + `CancellationCode` FixedString(1), + `Diverted` UInt8, + `CRSElapsedTime` Int32, + `ActualElapsedTime` Int32, + `AirTime` Nullable(Int32), + `Flights` Int32, + `Distance` Int32, + `DistanceGroup` UInt8, + `CarrierDelay` Int32, + `WeatherDelay` Int32, + `NASDelay` Int32, + `SecurityDelay` Int32, + `LateAircraftDelay` Int32, + `FirstDepTime` String, + `TotalAddGTime` String, + `LongestAddGTime` String, + `DivAirportLandings` String, + `DivReachedDest` String, + `DivActualElapsedTime` String, + `DivArrDelay` String, + `DivDistance` String, + `Div1Airport` String, + `Div1AirportID` Int32, + `Div1AirportSeqID` Int32, + `Div1WheelsOn` String, + `Div1TotalGTime` String, + `Div1LongestGTime` String, + `Div1WheelsOff` String, + `Div1TailNum` String, + `Div2Airport` String, + `Div2AirportID` Int32, + `Div2AirportSeqID` Int32, + `Div2WheelsOn` String, + `Div2TotalGTime` String, + `Div2LongestGTime` String, + `Div2WheelsOff` String, + `Div2TailNum` String, + `Div3Airport` String, + `Div3AirportID` Int32, + `Div3AirportSeqID` Int32, + `Div3WheelsOn` String, + `Div3TotalGTime` String, + `Div3LongestGTime` String, + `Div3WheelsOff` String, + `Div3TailNum` String, + `Div4Airport` String, + `Div4AirportID` Int32, + `Div4AirportSeqID` Int32, + `Div4WheelsOn` String, + `Div4TotalGTime` String, + `Div4LongestGTime` String, + `Div4WheelsOff` String, + `Div4TailNum` String, + `Div5Airport` String, + `Div5AirportID` Int32, + `Div5AirportSeqID` Int32, + `Div5WheelsOn` String, + `Div5TotalGTime` String, + `Div5LongestGTime` String, + `Div5WheelsOff` String, + `Div5TailNum` String ) ENGINE = MergeTree -PARTITION BY Year -ORDER BY (Carrier, FlightDate) -SETTINGS index_granularity = 8192; + PARTITION BY Year + ORDER BY (IATA_CODE_Reporting_Airline, FlightDate) + SETTINGS index_granularity = 8192; ``` 加载数据: ``` bash -$ for i in *.zip; do echo $i; unzip -cq $i '*.csv' | sed 's/\.00//g' | clickhouse-client --host=example-perftest01j --query="INSERT INTO ontime FORMAT CSVWithNames"; done +ls -1 *.zip | xargs -I{} -P $(nproc) bash -c "echo {}; 
unzip -cq {} '*.csv' | sed 's/\.00//g' | clickhouse-client --input_format_with_names_use_header=0 --query='INSERT INTO ontime FORMAT CSVWithNames'" ``` ## 下载预处理好的分区数据 {#xia-zai-yu-chu-li-hao-de-fen-qu-shu-ju} @@ -212,7 +213,7 @@ LIMIT 10; Q4. 查询2007年各航空公司延误超过10分钟以上的次数 ``` sql -SELECT Carrier, count(*) +SELECT IATA_CODE_Reporting_Airline AS Carrier, count(*) FROM ontime WHERE DepDelay>10 AND Year=2007 GROUP BY Carrier @@ -226,29 +227,29 @@ SELECT Carrier, c, c2, c*100/c2 as c3 FROM ( SELECT - Carrier, + IATA_CODE_Reporting_Airline AS Carrier, count(*) AS c FROM ontime WHERE DepDelay>10 AND Year=2007 GROUP BY Carrier -) +) q JOIN ( SELECT - Carrier, + IATA_CODE_Reporting_Airline AS Carrier, count(*) AS c2 FROM ontime WHERE Year=2007 GROUP BY Carrier -) USING Carrier +) qq USING Carrier ORDER BY c3 DESC; ``` 更好的查询版本: ``` sql -SELECT Carrier, avg(DepDelay>10)*100 AS c3 +SELECT IATA_CODE_Reporting_Airline AS Carrier, avg(DepDelay>10)*100 AS c3 FROM ontime WHERE Year=2007 GROUP BY Carrier @@ -262,29 +263,29 @@ SELECT Carrier, c, c2, c*100/c2 as c3 FROM ( SELECT - Carrier, + IATA_CODE_Reporting_Airline AS Carrier, count(*) AS c FROM ontime WHERE DepDelay>10 AND Year>=2000 AND Year<=2008 GROUP BY Carrier -) +) q JOIN ( SELECT - Carrier, + IATA_CODE_Reporting_Airline AS Carrier, count(*) AS c2 FROM ontime WHERE Year>=2000 AND Year<=2008 GROUP BY Carrier -) USING Carrier +) qq USING Carrier ORDER BY c3 DESC; ``` 更好的查询版本: ``` sql -SELECT Carrier, avg(DepDelay>10)*100 AS c3 +SELECT IATA_CODE_Reporting_Airline AS Carrier, avg(DepDelay>10)*100 AS c3 FROM ontime WHERE Year>=2000 AND Year<=2008 GROUP BY Carrier @@ -303,7 +304,7 @@ FROM from ontime WHERE DepDelay>10 GROUP BY Year -) +) q JOIN ( select @@ -311,7 +312,7 @@ JOIN count(*) as c2 from ontime GROUP BY Year -) USING (Year) +) qq USING (Year) ORDER BY Year; ``` @@ -346,7 +347,7 @@ Q10. ``` sql SELECT - min(Year), max(Year), Carrier, count(*) AS cnt, + min(Year), max(Year), IATA_CODE_Reporting_Airline AS Carrier, count(*) AS cnt, sum(ArrDelayMinutes>30) AS flights_delayed, round(sum(ArrDelayMinutes>30)/count(*),2) AS rate FROM ontime diff --git a/programs/bash-completion/completions/clickhouse b/programs/bash-completion/completions/clickhouse index c4b77cf3f7a..fc55398dcf1 100644 --- a/programs/bash-completion/completions/clickhouse +++ b/programs/bash-completion/completions/clickhouse @@ -23,19 +23,9 @@ function _complete_for_clickhouse_entrypoint_bin() fi util="${words[1]}" - case "$prev" in - -C|--config-file|--config) - return - ;; - # Argh... This looks like a bash bug... - # Redirections are passed to the completion function - # although it is managed by the shell directly... 
- '<'|'>'|'>>'|[12]'>'|[12]'>>') - return - ;; - esac - - COMPREPLY=( $(compgen -W "$(_clickhouse_get_options "$cmd" "$util")" -- "$cur") ) + if _complete_for_clickhouse_generic_bin_impl "$prev"; then + COMPREPLY=( $(compgen -W "$(_clickhouse_get_options "$cmd" "$util")" -- "$cur") ) + fi return 0 } diff --git a/programs/bash-completion/completions/clickhouse-bootstrap b/programs/bash-completion/completions/clickhouse-bootstrap index dc8dcd5ad8d..15b2140161d 100644 --- a/programs/bash-completion/completions/clickhouse-bootstrap +++ b/programs/bash-completion/completions/clickhouse-bootstrap @@ -15,6 +15,13 @@ shopt -s extglob export _CLICKHOUSE_COMPLETION_LOADED=1 +CLICKHOUSE_QueryProcessingStage=( + complete + fetch_columns + with_mergeable_state + with_mergeable_state_after_aggregation +) + function _clickhouse_bin_exist() { [ -x "$1" ] || command -v "$1" >& /dev/null; } @@ -30,6 +37,33 @@ function _clickhouse_get_options() "$@" --help 2>&1 | awk -F '[ ,=<>]' '{ for (i=1; i <= NF; ++i) { if (substr($i, 0, 1) == "-" && length($i) > 1) print $i; } }' | sort -u } +function _complete_for_clickhouse_generic_bin_impl() +{ + local prev=$1 && shift + + case "$prev" in + -C|--config-file|--config) + return 1 + ;; + --stage) + COMPREPLY=( $(compgen -W "${CLICKHOUSE_QueryProcessingStage[*]}" -- "$cur") ) + return 1 + ;; + --host) + COMPREPLY=( $(compgen -A hostname -- "$cur") ) + return 1 + ;; + # Argh... This looks like a bash bug... + # Redirections are passed to the completion function + # although it is managed by the shell directly... + '<'|'>'|'>>'|[12]'>'|[12]'>>') + return 1 + ;; + esac + + return 0 +} + function _complete_for_clickhouse_generic_bin() { local cur prev @@ -39,19 +73,9 @@ function _complete_for_clickhouse_generic_bin() COMPREPLY=() _get_comp_words_by_ref cur prev - case "$prev" in - -C|--config-file|--config) - return - ;; - # Argh... This looks like a bash bug... - # Redirections are passed to the completion function - # although it is managed by the shell directly... - '<'|'>'|'>>'|[12]'>'|[12]'>>') - return - ;; - esac - - COMPREPLY=( $(compgen -W "$(_clickhouse_get_options "$cmd")" -- "$cur") ) + if _complete_for_clickhouse_generic_bin_impl "$prev"; then + COMPREPLY=( $(compgen -W "$(_clickhouse_get_options "$cmd")" -- "$cur") ) + fi return 0 } diff --git a/programs/client/QueryFuzzer.cpp b/programs/client/QueryFuzzer.cpp index 6243e2c82ec..721e5acb991 100644 --- a/programs/client/QueryFuzzer.cpp +++ b/programs/client/QueryFuzzer.cpp @@ -27,6 +27,7 @@ #include #include + namespace DB { diff --git a/programs/client/QueryFuzzer.h b/programs/client/QueryFuzzer.h index 38714205967..9ef66db1873 100644 --- a/programs/client/QueryFuzzer.h +++ b/programs/client/QueryFuzzer.h @@ -50,7 +50,7 @@ struct QueryFuzzer // Some debug fields for detecting problematic ASTs with loops. // These are reset for each fuzzMain call. 
std::unordered_set<const IAST *> debug_visited_nodes; - ASTPtr * debug_top_ast; + ASTPtr * debug_top_ast = nullptr; // This is the only function you have to call -- it will modify the passed diff --git a/src/Bridge/LibraryBridgeHelper.cpp b/src/Bridge/LibraryBridgeHelper.cpp index 0931abfdfc3..c589d0ce09e 100644 --- a/src/Bridge/LibraryBridgeHelper.cpp +++ b/src/Bridge/LibraryBridgeHelper.cpp @@ -24,11 +24,11 @@ LibraryBridgeHelper::LibraryBridgeHelper( ContextPtr context_, const Block & sample_block_, const Field & dictionary_id_) - : IBridgeHelper(context_) + : IBridgeHelper(context_->getGlobalContext()) , log(&Poco::Logger::get("LibraryBridgeHelper")) , sample_block(sample_block_) , config(context_->getConfigRef()) - , http_timeout(context_->getSettingsRef().http_receive_timeout.value) + , http_timeout(context_->getGlobalContext()->getSettingsRef().http_receive_timeout.value) , dictionary_id(dictionary_id_) { bridge_port = config.getUInt("library_bridge.port", DEFAULT_PORT); diff --git a/src/Bridge/XDBCBridgeHelper.h b/src/Bridge/XDBCBridgeHelper.h index 7dc7f91c87a..299df6ff888 100644 --- a/src/Bridge/XDBCBridgeHelper.h +++ b/src/Bridge/XDBCBridgeHelper.h @@ -62,20 +62,19 @@ public: static constexpr inline auto SCHEMA_ALLOWED_HANDLER = "/schema_allowed"; XDBCBridgeHelper( - ContextPtr global_context_, + ContextPtr context_, Poco::Timespan http_timeout_, const std::string & connection_string_) - : IXDBCBridgeHelper(global_context_) + : IXDBCBridgeHelper(context_->getGlobalContext()) , log(&Poco::Logger::get(BridgeHelperMixin::getName() + "BridgeHelper")) , connection_string(connection_string_) , http_timeout(http_timeout_) - , config(global_context_->getConfigRef()) + , config(context_->getGlobalContext()->getConfigRef()) { bridge_host = config.getString(BridgeHelperMixin::configPrefix() + ".host", DEFAULT_HOST); bridge_port = config.getUInt(BridgeHelperMixin::configPrefix() + ".port", DEFAULT_PORT); } - protected: auto getConnectionString() const { return connection_string; } diff --git a/src/Common/ColumnsHashing.h b/src/Common/ColumnsHashing.h index b7173b25ce5..ada783d982c 100644 --- a/src/Common/ColumnsHashing.h +++ b/src/Common/ColumnsHashing.h @@ -1,6 +1,5 @@ #pragma once - #include #include #include @@ -15,6 +14,8 @@ #include #include +#include + namespace DB { @@ -594,8 +595,11 @@ struct HashMethodKeysFixed return prepared_keys[row]; #if defined(__SSSE3__) && !defined(MEMORY_SANITIZER) - if constexpr (!has_low_cardinality && !has_nullable_keys && sizeof(Key) <= 16) + if constexpr (sizeof(Key) <= 16) + { + assert(!has_low_cardinality && !has_nullable_keys); return packFixedShuffle(columns_data.get(), keys_size, key_sizes.data(), row, masks.get()); + } #endif return packFixed(row, keys_size, Base::getActualColumns(), key_sizes); } diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index abbb3c71d72..1e482361f85 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -55,6 +55,7 @@ M(LocalThread, "Number of threads in local thread pools. The threads in local thread pools are taken from the global thread pool.") \ M(LocalThreadActive, "Number of threads in local thread pools running a task.") \ M(DistributedFilesToInsert, "Number of pending files to process for asynchronous insertion into Distributed tables. Number of files for every shard is summed.") \ + M(BrokenDistributedFilesToInsert, "Number of files for asynchronous insertion into Distributed tables that have been marked as broken. This metric starts from 0 on server start.
Number of files for every shard is summed.") \ M(TablesToDropQueueSize, "Number of dropped tables, that are waiting for background data removal.") \ M(MaxDDLEntryID, "Max processed DDL entry of DDLWorker.") \ M(PartsTemporary, "The part is generating now, it is not in data_parts list.") \ diff --git a/src/Common/HashTable/Hash.h b/src/Common/HashTable/Hash.h index 0abe96497bd..d601d945ccf 100644 --- a/src/Common/HashTable/Hash.h +++ b/src/Common/HashTable/Hash.h @@ -91,49 +91,38 @@ inline UInt32 updateWeakHash32(const DB::UInt8 * pos, size_t size, DB::UInt32 up { if (size < 8) { - DB::UInt64 value = 0; - auto * value_ptr = reinterpret_cast<unsigned char *>(&value); + UInt64 value = 0; - typedef __attribute__((__aligned__(1))) uint16_t uint16_unaligned_t; - typedef __attribute__((__aligned__(1))) uint32_t uint32_unaligned_t; - - /// Adopted code from FastMemcpy.h (memcpy_tiny) switch (size) { case 0: break; case 1: - value_ptr[0] = pos[0]; + __builtin_memcpy(&value, pos, 1); break; case 2: - *reinterpret_cast<uint16_unaligned_t *>(value_ptr) = *reinterpret_cast<const uint16_unaligned_t *>(pos); + __builtin_memcpy(&value, pos, 2); break; case 3: - *reinterpret_cast<uint16_unaligned_t *>(value_ptr) = *reinterpret_cast<const uint16_unaligned_t *>(pos); - value_ptr[2] = pos[2]; + __builtin_memcpy(&value, pos, 3); break; case 4: - *reinterpret_cast<uint32_unaligned_t *>(value_ptr) = *reinterpret_cast<const uint32_unaligned_t *>(pos); + __builtin_memcpy(&value, pos, 4); break; case 5: - *reinterpret_cast<uint32_unaligned_t *>(value_ptr) = *reinterpret_cast<const uint32_unaligned_t *>(pos); - value_ptr[4] = pos[4]; + __builtin_memcpy(&value, pos, 5); break; case 6: - *reinterpret_cast<uint32_unaligned_t *>(value_ptr) = *reinterpret_cast<const uint32_unaligned_t *>(pos); - *reinterpret_cast<uint16_unaligned_t *>(value_ptr + 4) = - *reinterpret_cast<const uint16_unaligned_t *>(pos + 4); + __builtin_memcpy(&value, pos, 6); break; case 7: - *reinterpret_cast<uint32_unaligned_t *>(value_ptr) = *reinterpret_cast<const uint32_unaligned_t *>(pos); - *reinterpret_cast<uint32_unaligned_t *>(value_ptr + 3) = - *reinterpret_cast<const uint32_unaligned_t *>(pos + 3); + __builtin_memcpy(&value, pos, 7); break; default: __builtin_unreachable(); } - value_ptr[7] = size; + reinterpret_cast<unsigned char *>(&value)[7] = size; return intHashCRC32(value, updated_value); } diff --git a/src/Common/MemoryStatisticsOS.h b/src/Common/MemoryStatisticsOS.h index 1661b62711f..0893e333007 100644 --- a/src/Common/MemoryStatisticsOS.h +++ b/src/Common/MemoryStatisticsOS.h @@ -6,7 +6,7 @@ namespace DB { -/** Opens a file /proc/self/mstat. Keeps it open and reads memory statistics via 'pread'. +/** Opens a file /proc/self/statm. Keeps it open and reads memory statistics via 'pread'. * This is Linux specific.
* See: man procfs * diff --git a/src/Common/SpaceSaving.h b/src/Common/SpaceSaving.h index 185b4aa90ae..7ad48a6cf87 100644 --- a/src/Common/SpaceSaving.h +++ b/src/Common/SpaceSaving.h @@ -85,7 +85,7 @@ public: struct Counter { - Counter() {} + Counter() = default; //-V730 Counter(const TKey & k, UInt64 c = 0, UInt64 e = 0, size_t h = 0) : key(k), slot(0), hash(h), count(c), error(e) {} @@ -148,7 +148,7 @@ public: // Increase weight of a key that already exists auto hash = counter_map.hash(key); - if (auto counter = findCounter(key, hash); counter) + if (auto * counter = findCounter(key, hash); counter) { counter->count += increment; counter->error += error; @@ -159,12 +159,12 @@ public: // Key doesn't exist, but can fit in the top K if (unlikely(size() < capacity())) { - auto c = new Counter(arena.emplace(key), increment, error, hash); + auto * c = new Counter(arena.emplace(key), increment, error, hash); push(c); return; } - auto min = counter_list.back(); + auto * min = counter_list.back(); // The key doesn't exist and cannot fit in the current top K, but // the new key has a bigger weight and is virtually more present // compared to the element who is less present on the set. This part @@ -218,7 +218,7 @@ public: */ if (m2 > 0) { - for (auto counter : counter_list) + for (auto * counter : counter_list) { counter->count += m2; counter->error += m2; @@ -226,10 +226,10 @@ public: } // The list is sorted in descending order, we have to scan in reverse - for (auto counter : boost::adaptors::reverse(rhs.counter_list)) + for (auto * counter : boost::adaptors::reverse(rhs.counter_list)) { size_t hash = counter_map.hash(counter->key); - if (auto current = findCounter(counter->key, hash)) + if (auto * current = findCounter(counter->key, hash)) { // Subtract m2 previously added, guaranteed not negative current->count += (counter->count - m2); @@ -262,7 +262,7 @@ public: std::vector<Counter> topK(size_t k) const { std::vector<Counter> res; - for (auto counter : counter_list) + for (auto * counter : counter_list) { res.push_back(*counter); if (res.size() == k) @@ -274,7 +274,7 @@ public: void write(WriteBuffer & wb) const { writeVarUInt(size(), wb); - for (auto counter : counter_list) + for (auto * counter : counter_list) counter->write(wb); writeVarUInt(alpha_map.size(), wb); @@ -290,7 +290,7 @@ public: for (size_t i = 0; i < count; ++i) { - auto counter = new Counter(); + auto * counter = new Counter(); counter->read(rb); counter->hash = counter_map.hash(counter->key); push(counter); @@ -325,7 +325,7 @@ protected: { while (counter->slot > 0) { - auto next = counter_list[counter->slot - 1]; + auto * next = counter_list[counter->slot - 1]; if (*counter > *next) { std::swap(next->slot, counter->slot); @@ -339,7 +339,7 @@ private: void destroyElements() { - for (auto counter : counter_list) + for (auto * counter : counter_list) { arena.free(counter->key); delete counter; @@ -376,7 +376,7 @@ private: { removed_keys = 0; counter_map.clear(); - for (auto counter : counter_list) + for (auto * counter : counter_list) counter_map[counter->key] = counter; } diff --git a/src/Core/Field.h b/src/Core/Field.h index 5c4c2e165ad..2c3ca1310e4 100644 --- a/src/Core/Field.h +++ b/src/Core/Field.h @@ -310,7 +310,7 @@ public: template <typename T, typename Z = void *> using enable_if_not_field_or_stringlike_t = std::enable_if_t<!std::is_same_v<std::decay_t<T>, Field> && !std::is_same_v<NearestFieldType<std::decay_t<T>>, String>, Z>; - Field() + Field() //-V730 : which(Types::Null) { } @@ -851,7 +851,7 @@ decltype(auto) castToNearestFieldType(T && x) } template <typename T> -Field::Field(T && rhs, enable_if_not_field_or_stringlike_t<T>)
+Field::Field(T && rhs, enable_if_not_field_or_stringlike_t<T>) //-V730 { auto && val = castToNearestFieldType(std::forward<T>(rhs)); createConcrete(std::forward<decltype(val)>(val)); } diff --git a/src/Disks/ya.make b/src/Disks/ya.make index 53dc9fd75c4..82c87aaa488 100644 --- a/src/Disks/ya.make +++ b/src/Disks/ya.make @@ -14,6 +14,7 @@ SRCS( DiskFactory.cpp DiskLocal.cpp DiskMemory.cpp + DiskRestartProxy.cpp DiskSelector.cpp IDisk.cpp IVolume.cpp diff --git a/src/Functions/IFunction.cpp b/src/Functions/IFunction.cpp index 43b344bebc4..5bbe6eb523e 100644 --- a/src/Functions/IFunction.cpp +++ b/src/Functions/IFunction.cpp @@ -137,7 +137,7 @@ ColumnPtr wrapInNullable(const ColumnPtr & src, const ColumnsWithTypeAndName & a if (const auto * nullable = checkAndGetColumn<ColumnNullable>(*elem.column)) { const ColumnPtr & null_map_column = nullable->getNullMapColumnPtr(); - if (!result_null_map_column) + if (!result_null_map_column) //-V1051 { result_null_map_column = null_map_column; } diff --git a/src/IO/ya.make b/src/IO/ya.make index 01b3995ef7a..68e690b8611 100644 --- a/src/IO/ya.make +++ b/src/IO/ya.make @@ -47,6 +47,7 @@ SRCS( ReadBufferAIO.cpp ReadBufferFromFile.cpp ReadBufferFromFileBase.cpp + ReadBufferFromFileDecorator.cpp ReadBufferFromFileDescriptor.cpp ReadBufferFromIStream.cpp ReadBufferFromMemory.cpp @@ -57,6 +58,7 @@ SRCS( UseSSL.cpp WriteBufferFromFile.cpp WriteBufferFromFileBase.cpp + WriteBufferFromFileDecorator.cpp WriteBufferFromFileDescriptor.cpp WriteBufferFromFileDescriptorDiscardOnFailure.cpp WriteBufferFromHTTP.cpp diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h index 1b5ed75b0d0..31eefe03b3c 100644 --- a/src/Interpreters/Aggregator.h +++ b/src/Interpreters/Aggregator.h @@ -1043,12 +1043,12 @@ private: */ struct AggregateFunctionInstruction { - const IAggregateFunction * that; - size_t state_offset; - const IColumn ** arguments; - const IAggregateFunction * batch_that; - const IColumn ** batch_arguments; - const UInt64 * offsets = nullptr; + const IAggregateFunction * that{}; + size_t state_offset{}; + const IColumn ** arguments{}; + const IAggregateFunction * batch_that{}; + const IColumn ** batch_arguments{}; + const UInt64 * offsets{}; }; using AggregateFunctionInstructions = std::vector<AggregateFunctionInstruction>; diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index ee576c8deda..af4369527bc 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -26,6 +26,8 @@ #include #include +#include + #include #include @@ -119,11 +121,14 @@ bool sanitizeBlock(Block & block, bool throw_if_cannot_create_column) return true; } +ExpressionAnalyzerData::~ExpressionAnalyzerData() = default; + ExpressionAnalyzer::ExtractedSettings::ExtractedSettings(const Settings & settings_) : use_index_for_in_with_subqueries(settings_.use_index_for_in_with_subqueries) , size_limits_for_set(settings_.max_rows_in_set, settings_.max_bytes_in_set, settings_.set_overflow_mode) {} +ExpressionAnalyzer::~ExpressionAnalyzer() = default; ExpressionAnalyzer::ExpressionAnalyzer( const ASTPtr & query_, @@ -746,11 +751,11 @@ static JoinPtr tryGetStorageJoin(std::shared_ptr<TableJoin> analyzed_join) return {}; } -static ExpressionActionsPtr createJoinedBlockActions(ContextPtr context, const TableJoin & analyzed_join) +static ActionsDAGPtr createJoinedBlockActions(ContextPtr context, const TableJoin & analyzed_join) { ASTPtr expression_list = analyzed_join.rightKeysList(); auto syntax_result = TreeRewriter(context).analyze(expression_list,
analyzed_join.columnsFromJoinedTable()); - return ExpressionAnalyzer(expression_list, syntax_result, context).getActions(true, false); + return ExpressionAnalyzer(expression_list, syntax_result, context).getActionsDAG(true, false); } static bool allowDictJoin(StoragePtr joined_storage, ContextPtr context, String & dict_name, String & key_name) @@ -802,67 +807,77 @@ JoinPtr SelectQueryExpressionAnalyzer::makeTableJoin( const ASTTablesInSelectQueryElement & join_element, const ColumnsWithTypeAndName & left_sample_columns) { /// Two JOINs are not supported with the same subquery, but different USINGs. - auto join_hash = join_element.getTreeHash(); - String join_subquery_id = toString(join_hash.first) + "_" + toString(join_hash.second); - SubqueryForSet & subquery_for_join = subqueries_for_sets[join_subquery_id]; + if (joined_plan) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Table join was already created for query"); /// Use StorageJoin if any. - if (!subquery_for_join.join) - subquery_for_join.join = tryGetStorageJoin(syntax->analyzed_join); + JoinPtr join = tryGetStorageJoin(syntax->analyzed_join); - if (!subquery_for_join.join) + if (!join) { /// Actions which need to be calculated on joined block. - ExpressionActionsPtr joined_block_actions = createJoinedBlockActions(getContext(), analyzedJoin()); + auto joined_block_actions = createJoinedBlockActions(getContext(), analyzedJoin()); Names original_right_columns; - if (!subquery_for_join.source) + + NamesWithAliases required_columns_with_aliases = analyzedJoin().getRequiredColumns( + Block(joined_block_actions->getResultColumns()), joined_block_actions->getRequiredColumns().getNames()); + for (auto & pr : required_columns_with_aliases) + original_right_columns.push_back(pr.first); + + /** For GLOBAL JOINs (in the case, for example, of the push method for executing GLOBAL subqueries), the following occurs + * - in the addExternalStorage function, the JOIN (SELECT ...) subquery is replaced with JOIN _data1, + * in the subquery_for_set object this subquery is exposed as source and the temporary table _data1 as the `table`. + * - this function shows the expression JOIN _data1. + */ + auto interpreter = interpretSubquery(join_element.table_expression, getContext(), original_right_columns, query_options); + { - NamesWithAliases required_columns_with_aliases = analyzedJoin().getRequiredColumns( - joined_block_actions->getSampleBlock(), joined_block_actions->getRequiredColumns()); - for (auto & pr : required_columns_with_aliases) - original_right_columns.push_back(pr.first); + joined_plan = std::make_unique(); + interpreter->buildQueryPlan(*joined_plan); - /** For GLOBAL JOINs (in the case, for example, of the push method for executing GLOBAL subqueries), the following occurs - * - in the addExternalStorage function, the JOIN (SELECT ...) subquery is replaced with JOIN _data1, - * in the subquery_for_set object this subquery is exposed as source and the temporary table _data1 as the `table`. - * - this function shows the expression JOIN _data1. 
- */ - auto interpreter = interpretSubquery(join_element.table_expression, getContext(), original_right_columns, query_options); + auto sample_block = interpreter->getSampleBlock(); - subquery_for_join.makeSource(interpreter, std::move(required_columns_with_aliases)); + auto rename_dag = std::make_unique(sample_block.getColumnsWithTypeAndName()); + for (const auto & name_with_alias : required_columns_with_aliases) + { + if (sample_block.has(name_with_alias.first)) + { + auto pos = sample_block.getPositionByName(name_with_alias.first); + const auto & alias = rename_dag->addAlias(*rename_dag->getInputs()[pos], name_with_alias.second); + rename_dag->getIndex()[pos] = &alias; + } + } + + auto rename_step = std::make_unique(joined_plan->getCurrentDataStream(), std::move(rename_dag)); + rename_step->setStepDescription("Rename joined columns"); + joined_plan->addStep(std::move(rename_step)); } - /// TODO You do not need to set this up when JOIN is only needed on remote servers. - subquery_for_join.addJoinActions(joined_block_actions); /// changes subquery_for_join.sample_block inside + auto joined_actions_step = std::make_unique(joined_plan->getCurrentDataStream(), std::move(joined_block_actions)); + joined_actions_step->setStepDescription("Joined actions"); + joined_plan->addStep(std::move(joined_actions_step)); - const ColumnsWithTypeAndName & right_sample_columns = subquery_for_join.sample_block.getColumnsWithTypeAndName(); + const ColumnsWithTypeAndName & right_sample_columns = joined_plan->getCurrentDataStream().header.getColumnsWithTypeAndName(); bool need_convert = syntax->analyzed_join->applyJoinKeyConvert(left_sample_columns, right_sample_columns); if (need_convert) - subquery_for_join.addJoinActions(std::make_shared( - syntax->analyzed_join->rightConvertingActions(), - ExpressionActionsSettings::fromContext(getContext()))); + { + auto converting_step = std::make_unique(joined_plan->getCurrentDataStream(), syntax->analyzed_join->rightConvertingActions()); + converting_step->setStepDescription("Convert joined columns"); + joined_plan->addStep(std::move(converting_step)); + } - subquery_for_join.join = makeJoin(syntax->analyzed_join, subquery_for_join.sample_block, getContext()); + join = makeJoin(syntax->analyzed_join, joined_plan->getCurrentDataStream().header, getContext()); /// Do not make subquery for join over dictionary. 
if (syntax->analyzed_join->dictionary_reader) - { - JoinPtr join = subquery_for_join.join; - subqueries_for_sets.erase(join_subquery_id); - return join; - } + joined_plan.reset(); } else - { - const ColumnsWithTypeAndName & right_sample_columns = subquery_for_join.sample_block.getColumnsWithTypeAndName(); - bool need_convert = syntax->analyzed_join->applyJoinKeyConvert(left_sample_columns, right_sample_columns); - if (need_convert) - subquery_for_join.addJoinActions(std::make_shared(syntax->analyzed_join->rightConvertingActions())); - } + syntax->analyzed_join->applyJoinKeyConvert(left_sample_columns, {}); - return subquery_for_join.join; + return join; } ActionsDAGPtr SelectQueryExpressionAnalyzer::appendPrewhere( @@ -1345,6 +1360,11 @@ ExpressionActionsPtr ExpressionAnalyzer::getConstActions(const ColumnsWithTypeAn return std::make_shared(actions, ExpressionActionsSettings::fromContext(getContext())); } +std::unique_ptr SelectQueryExpressionAnalyzer::getJoinedPlan() +{ + return std::move(joined_plan); +} + ActionsDAGPtr SelectQueryExpressionAnalyzer::simpleSelectActions() { ExpressionActionsChain new_chain(getContext()); diff --git a/src/Interpreters/ExpressionAnalyzer.h b/src/Interpreters/ExpressionAnalyzer.h index fe53b44690e..68002539d52 100644 --- a/src/Interpreters/ExpressionAnalyzer.h +++ b/src/Interpreters/ExpressionAnalyzer.h @@ -47,9 +47,13 @@ bool sanitizeBlock(Block & block, bool throw_if_cannot_create_column = false); /// ExpressionAnalyzer sources, intermediates and results. It splits data and logic, allows to test them separately. struct ExpressionAnalyzerData { + ~ExpressionAnalyzerData(); + SubqueriesForSets subqueries_for_sets; PreparedSets prepared_sets; + std::unique_ptr joined_plan; + /// Columns after ARRAY JOIN. If there is no ARRAY JOIN, it's source_columns. NamesAndTypesList columns_after_array_join; /// Columns after Columns after ARRAY JOIN and JOIN. If there is no JOIN, it's columns_after_array_join. @@ -99,6 +103,8 @@ public: : ExpressionAnalyzer(query_, syntax_analyzer_result_, context_, 0, false, {}) {} + ~ExpressionAnalyzer(); + void appendExpression(ExpressionActionsChain & chain, const ASTPtr & expr, bool only_types); /// If `ast` is not a SELECT query, just gets all the actions to evaluate the expression. @@ -293,6 +299,7 @@ public: const AggregateDescriptions & aggregates() const { return aggregate_descriptions; } const PreparedSets & getPreparedSets() const { return prepared_sets; } + std::unique_ptr getJoinedPlan(); /// Tables that will need to be sent to remote servers for distributed query processing. 
const TemporaryTablesMapping & getExternalTables() const { return external_tables; } diff --git a/src/Interpreters/HashJoin.cpp b/src/Interpreters/HashJoin.cpp index fcd89aed84d..d6163ff773a 100644 --- a/src/Interpreters/HashJoin.cpp +++ b/src/Interpreters/HashJoin.cpp @@ -1504,6 +1504,7 @@ BlockInputStreamPtr HashJoin::createStreamWithNonJoinedRows(const Block & result void HashJoin::reuseJoinedData(const HashJoin & join) { data = join.data; + from_storage_join = true; joinDispatch(kind, strictness, data->maps, [this](auto kind_, auto strictness_, auto & map) { used_flags.reinit(map.getBufferSizeInCells(data->type) + 1); diff --git a/src/Interpreters/HashJoin.h b/src/Interpreters/HashJoin.h index 7f70c397db9..c0a699d0a12 100644 --- a/src/Interpreters/HashJoin.h +++ b/src/Interpreters/HashJoin.h @@ -159,6 +159,8 @@ public: void joinTotals(Block & block) const override; + bool isFilled() const override { return from_storage_join || data->type == Type::DICT; } + /** For RIGHT and FULL JOINs. * A stream that will contain default values from left table, joined with rows from right table, that was not joined before. * Use only after all calls to joinBlock was done. @@ -344,6 +346,9 @@ private: ASTTableJoin::Kind kind; ASTTableJoin::Strictness strictness; + /// This join was created from StorageJoin and it is already filled. + bool from_storage_join = false; + /// Names of key columns in right-side table (in the order they appear in ON/USING clause). @note It could contain duplicates. const Names & key_names_right; diff --git a/src/Interpreters/IJoin.h b/src/Interpreters/IJoin.h index b326f06fa7e..0f486fbe523 100644 --- a/src/Interpreters/IJoin.h +++ b/src/Interpreters/IJoin.h @@ -41,6 +41,10 @@ public: virtual size_t getTotalByteCount() const = 0; virtual bool alwaysReturnsEmptySet() const { return false; } + /// StorageJoin/Dictionary is already filled. No need to call addJoinedBlock. + /// Different query plan is used for such joins. 
+ virtual bool isFilled() const { return false; } + virtual BlockInputStreamPtr createStreamWithNonJoinedRows(const Block &, UInt64) const { return {}; } }; diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 116d2f58080..16c9731a427 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -47,6 +47,7 @@ #include #include #include +#include #include #include #include @@ -1117,14 +1118,37 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu if (expressions.hasJoin()) { - QueryPlanStepPtr join_step = std::make_unique( - query_plan.getCurrentDataStream(), - expressions.join, - expressions.join_has_delayed_stream, - settings.max_block_size); + if (expressions.join->isFilled()) + { + QueryPlanStepPtr filled_join_step = std::make_unique( + query_plan.getCurrentDataStream(), + expressions.join, + settings.max_block_size); - join_step->setStepDescription("JOIN"); - query_plan.addStep(std::move(join_step)); + filled_join_step->setStepDescription("JOIN"); + query_plan.addStep(std::move(filled_join_step)); + } + else + { + auto joined_plan = query_analyzer->getJoinedPlan(); + + if (!joined_plan) + throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no joined plan for query"); + + QueryPlanStepPtr join_step = std::make_unique( + query_plan.getCurrentDataStream(), + joined_plan->getCurrentDataStream(), + expressions.join, + settings.max_block_size); + + join_step->setStepDescription("JOIN"); + std::vector plans; + plans.emplace_back(std::make_unique(std::move(query_plan))); + plans.emplace_back(std::move(joined_plan)); + + query_plan = QueryPlan(); + query_plan.unitePlans(std::move(join_step), {std::move(plans)}); + } } if (expressions.hasWhere()) diff --git a/src/Interpreters/SubqueryForSet.cpp b/src/Interpreters/SubqueryForSet.cpp index a3a51210bf1..08fc07c71e1 100644 --- a/src/Interpreters/SubqueryForSet.cpp +++ b/src/Interpreters/SubqueryForSet.cpp @@ -1,9 +1,6 @@ #include -#include -#include -#include -#include - +#include +#include namespace DB { @@ -13,65 +10,4 @@ SubqueryForSet::~SubqueryForSet() = default; SubqueryForSet::SubqueryForSet(SubqueryForSet &&) = default; SubqueryForSet & SubqueryForSet::operator= (SubqueryForSet &&) = default; -void SubqueryForSet::makeSource(std::shared_ptr & interpreter, - NamesWithAliases && joined_block_aliases_) -{ - joined_block_aliases = std::move(joined_block_aliases_); - source = std::make_unique(); - interpreter->buildQueryPlan(*source); - - sample_block = interpreter->getSampleBlock(); - renameColumns(sample_block); -} - -void SubqueryForSet::renameColumns(Block & block) -{ - for (const auto & name_with_alias : joined_block_aliases) - { - if (block.has(name_with_alias.first)) - { - auto pos = block.getPositionByName(name_with_alias.first); - auto column = block.getByPosition(pos); - block.erase(pos); - column.name = name_with_alias.second; - block.insert(std::move(column)); - } - } -} - -void SubqueryForSet::addJoinActions(ExpressionActionsPtr actions) -{ - actions->execute(sample_block); - if (joined_block_actions == nullptr) - { - joined_block_actions = actions; - } - else - { - auto new_dag = ActionsDAG::merge( - std::move(*joined_block_actions->getActionsDAG().clone()), - std::move(*actions->getActionsDAG().clone())); - joined_block_actions = std::make_shared(new_dag, actions->getSettings()); - } -} - -bool SubqueryForSet::insertJoinedBlock(Block & block) -{ - renameColumns(block); - - if (joined_block_actions) 
- joined_block_actions->execute(block); - - return join->addJoinedBlock(block); -} - -void SubqueryForSet::setTotals(Block totals) -{ - if (join) - { - renameColumns(totals); - join->setTotals(totals); - } -} - } diff --git a/src/Interpreters/SubqueryForSet.h b/src/Interpreters/SubqueryForSet.h index a42bf296d6c..974f5bd3e58 100644 --- a/src/Interpreters/SubqueryForSet.h +++ b/src/Interpreters/SubqueryForSet.h @@ -2,19 +2,16 @@ #include #include -#include -#include -#include namespace DB { -class InterpreterSelectWithUnionQuery; -class ExpressionActions; -using ExpressionActionsPtr = std::shared_ptr; class QueryPlan; +class Set; +using SetPtr = std::shared_ptr; + /// Information on what to do when executing a subquery in the [GLOBAL] IN/JOIN section. struct SubqueryForSet { @@ -28,28 +25,10 @@ struct SubqueryForSet /// If set, build it from result. SetPtr set; - JoinPtr join; - /// Apply this actions to joined block. - ExpressionActionsPtr joined_block_actions; - Block sample_block; /// source->getHeader() + column renames /// If set, put the result into the table. /// This is a temporary table for transferring to remote servers for distributed query processing. StoragePtr table; - - void makeSource(std::shared_ptr & interpreter, - NamesWithAliases && joined_block_aliases_); - - void addJoinActions(ExpressionActionsPtr actions); - - bool insertJoinedBlock(Block & block); - void setTotals(Block totals); - -private: - NamesWithAliases joined_block_aliases; /// Rename column from joined block from this list. - - /// Rename source right table column names into qualified column names if they conflicts with left table ones. - void renameColumns(Block & block); }; /// ID of subquery -> what to do with it. diff --git a/src/Interpreters/evaluateConstantExpression.cpp b/src/Interpreters/evaluateConstantExpression.cpp index 89924025c08..e6b5893edd7 100644 --- a/src/Interpreters/evaluateConstantExpression.cpp +++ b/src/Interpreters/evaluateConstantExpression.cpp @@ -19,7 +19,6 @@ #include #include - namespace DB { diff --git a/src/Processors/QueryPipeline.cpp b/src/Processors/QueryPipeline.cpp index 1b803ec0886..14b60d0b14c 100644 --- a/src/Processors/QueryPipeline.cpp +++ b/src/Processors/QueryPipeline.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -96,6 +97,12 @@ void QueryPipeline::addTransform(ProcessorPtr transform) pipe.addTransform(std::move(transform)); } +void QueryPipeline::addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes) +{ + checkInitializedAndNotCompleted(); + pipe.addTransform(std::move(transform), totals, extremes); +} + void QueryPipeline::transform(const Transformer & transformer) { checkInitializedAndNotCompleted(); @@ -255,6 +262,96 @@ QueryPipeline QueryPipeline::unitePipelines( return pipeline; } +std::unique_ptr QueryPipeline::joinPipelines( + std::unique_ptr left, + std::unique_ptr right, + JoinPtr join, + size_t max_block_size, + Processors * collected_processors) +{ + left->checkInitializedAndNotCompleted(); + right->checkInitializedAndNotCompleted(); + + /// Extremes before join are useless. They will be calculated after if needed. + left->pipe.dropExtremes(); + right->pipe.dropExtremes(); + + left->pipe.collected_processors = collected_processors; + right->pipe.collected_processors = collected_processors; + + /// In case joined subquery has totals, and we don't, add default chunk to totals. 
+ bool default_totals = false; + if (!left->hasTotals() && right->hasTotals()) + { + left->addDefaultTotals(); + default_totals = true; + } + + /// (left) ──────┐ + /// ╞> Joining ─> (joined) + /// (left) ─┐┌───┘ + /// └┼───┐ + /// (right) ┐ (totals) ──┼─┐ ╞> Joining ─> (joined) + /// ╞> Resize ┐ ╓─┘┌┼─┘ + /// (right) ┘ │ ╟──┘└─┐ + /// ╞> FillingJoin ─> Resize ╣ ╞> Joining ─> (totals) + /// (totals) ─────────┘ ╙─────┘ + + size_t num_streams = left->getNumStreams(); + right->resize(1); + + auto adding_joined = std::make_shared(right->getHeader(), join); + InputPort * totals_port = nullptr; + if (right->hasTotals()) + totals_port = adding_joined->addTotalsPort(); + + right->addTransform(std::move(adding_joined), totals_port, nullptr); + + size_t num_streams_including_totals = num_streams + (left->hasTotals() ? 1 : 0); + right->resize(num_streams_including_totals); + + /// This counter is needed for every Joining except totals, to decide which Joining will generate non joined rows. + auto finish_counter = std::make_shared(num_streams); + + auto lit = left->pipe.output_ports.begin(); + auto rit = right->pipe.output_ports.begin(); + + for (size_t i = 0; i < num_streams; ++i) + { + auto joining = std::make_shared(left->getHeader(), join, max_block_size, false, default_totals, finish_counter); + connect(**lit, joining->getInputs().front()); + connect(**rit, joining->getInputs().back()); + *lit = &joining->getOutputs().front(); + + ++lit; + ++rit; + + if (collected_processors) + collected_processors->emplace_back(joining); + + left->pipe.processors.emplace_back(std::move(joining)); + } + + if (left->hasTotals()) + { + auto joining = std::make_shared(left->getHeader(), join, max_block_size, true, default_totals); + connect(*left->pipe.totals_port, joining->getInputs().front()); + connect(**rit, joining->getInputs().back()); + left->pipe.totals_port = &joining->getOutputs().front(); + + ++rit; + + if (collected_processors) + collected_processors->emplace_back(joining); + + left->pipe.processors.emplace_back(std::move(joining)); + } + + left->pipe.processors.insert(left->pipe.processors.end(), right->pipe.processors.begin(), right->pipe.processors.end()); + left->pipe.holder = std::move(right->pipe.holder); + left->pipe.header = left->pipe.output_ports.front()->getHeader(); + return left; +} void QueryPipeline::addCreatingSetsTransform(const Block & res_header, SubqueryForSet subquery_for_set, const SizeLimits & limits, ContextPtr context) { diff --git a/src/Processors/QueryPipeline.h b/src/Processors/QueryPipeline.h index ac0777d22c6..0c8caa93539 100644 --- a/src/Processors/QueryPipeline.h +++ b/src/Processors/QueryPipeline.h @@ -27,6 +27,9 @@ struct SizeLimits; struct ExpressionActionsSettings; +class IJoin; +using JoinPtr = std::shared_ptr; + class QueryPipeline { public: @@ -52,6 +55,7 @@ public: void addSimpleTransform(const Pipe::ProcessorGetterWithStreamKind & getter); /// Add transform with getNumStreams() input ports. void addTransform(ProcessorPtr transform); + void addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes); using Transformer = std::function; /// Transform pipeline in general way. @@ -90,6 +94,15 @@ public: size_t max_threads_limit = 0, Processors * collected_processors = nullptr); + /// Join two pipelines together using JoinPtr. + /// If collector is used, it will collect only newly-added processors, but not processors from pipelines. 
+ static std::unique_ptr joinPipelines( + std::unique_ptr left, + std::unique_ptr right, + JoinPtr join, + size_t max_block_size, + Processors * collected_processors = nullptr); + /// Add other pipeline and execute it before current one. /// Pipeline must have empty header, it should not generate any chunk. /// This is used for CreatingSets. diff --git a/src/Processors/QueryPlan/CreatingSetsStep.cpp b/src/Processors/QueryPlan/CreatingSetsStep.cpp index 9bcbc9d8b09..811e5885219 100644 --- a/src/Processors/QueryPlan/CreatingSetsStep.cpp +++ b/src/Processors/QueryPlan/CreatingSetsStep.cpp @@ -55,8 +55,8 @@ void CreatingSetStep::describeActions(FormatSettings & settings) const settings.out << prefix; if (subquery_for_set.set) settings.out << "Set: "; - else if (subquery_for_set.join) - settings.out << "Join: "; + // else if (subquery_for_set.join) + // settings.out << "Join: "; settings.out << description << '\n'; } @@ -65,8 +65,8 @@ void CreatingSetStep::describeActions(JSONBuilder::JSONMap & map) const { if (subquery_for_set.set) map.add("Set", description); - else if (subquery_for_set.join) - map.add("Join", description); + // else if (subquery_for_set.join) + // map.add("Join", description); } @@ -134,8 +134,6 @@ void addCreatingSetsStep( continue; auto plan = std::move(set.source); - std::string type = (set.join != nullptr) ? "JOIN" - : "subquery"; auto creating_set = std::make_unique( plan->getCurrentDataStream(), @@ -143,7 +141,7 @@ void addCreatingSetsStep( std::move(set), limits, context); - creating_set->setStepDescription("Create set for " + type); + creating_set->setStepDescription("Create set for subquery"); plan->addStep(std::move(creating_set)); input_streams.emplace_back(plan->getCurrentDataStream()); diff --git a/src/Processors/QueryPlan/ExpressionStep.cpp b/src/Processors/QueryPlan/ExpressionStep.cpp index e6a0475a7c2..ddf6ed00c3f 100644 --- a/src/Processors/QueryPlan/ExpressionStep.cpp +++ b/src/Processors/QueryPlan/ExpressionStep.cpp @@ -28,22 +28,6 @@ static ITransformingStep::Traits getTraits(const ActionsDAGPtr & actions) }; } -static ITransformingStep::Traits getJoinTraits() -{ - return ITransformingStep::Traits - { - { - .preserves_distinct_columns = false, - .returns_single_stream = false, - .preserves_number_of_streams = true, - .preserves_sorting = false, - }, - { - .preserves_number_of_rows = false, - } - }; -} - ExpressionStep::ExpressionStep(const DataStream & input_stream_, ActionsDAGPtr actions_dag_) : ITransformingStep( input_stream_, @@ -118,47 +102,4 @@ void ExpressionStep::describeActions(JSONBuilder::JSONMap & map) const map.add("Expression", expression->toTree()); } -JoinStep::JoinStep(const DataStream & input_stream_, JoinPtr join_, bool has_non_joined_rows_, size_t max_block_size_) - : ITransformingStep( - input_stream_, - Transform::transformHeader(input_stream_.header, join_), - getJoinTraits()) - , join(std::move(join_)) - , has_non_joined_rows(has_non_joined_rows_) - , max_block_size(max_block_size_) -{ -} - -void JoinStep::transformPipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &) -{ - /// In case joined subquery has totals, and we don't, add default chunk to totals. 
- bool add_default_totals = false; - if (!pipeline.hasTotals()) - { - pipeline.addDefaultTotals(); - add_default_totals = true; - } - - pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) - { - bool on_totals = stream_type == QueryPipeline::StreamType::Totals; - return std::make_shared(header, join, on_totals, add_default_totals); - }); - - if (has_non_joined_rows) - { - const Block & join_result_sample = pipeline.getHeader(); - auto stream = std::make_shared(*join, join_result_sample, max_block_size); - auto source = std::make_shared(std::move(stream)); - - source->setQueryPlanStep(this); - pipeline.addDelayedStream(source); - - /// Now, after adding delayed stream, it has implicit dependency on other port. - /// Here we add resize processor to remove this dependency. - /// Otherwise, if we add MergeSorting + MergingSorted transform to pipeline, we could get `Pipeline stuck` - pipeline.resize(pipeline.getNumStreams(), true); - } -} - } diff --git a/src/Processors/QueryPlan/ExpressionStep.h b/src/Processors/QueryPlan/ExpressionStep.h index b7c4c0974f3..753d446f1f3 100644 --- a/src/Processors/QueryPlan/ExpressionStep.h +++ b/src/Processors/QueryPlan/ExpressionStep.h @@ -7,9 +7,6 @@ namespace DB class ActionsDAG; using ActionsDAGPtr = std::shared_ptr; -class IJoin; -using JoinPtr = std::shared_ptr; - class ExpressionTransform; class JoiningTransform; @@ -36,23 +33,4 @@ private: ActionsDAGPtr actions_dag; }; -/// TODO: add separate step for join. -class JoinStep : public ITransformingStep -{ -public: - using Transform = JoiningTransform; - - explicit JoinStep(const DataStream & input_stream_, JoinPtr join_, bool has_non_joined_rows_, size_t max_block_size_); - String getName() const override { return "Join"; } - - void transformPipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &) override; - - const JoinPtr & getJoin() const { return join; } - -private: - JoinPtr join; - bool has_non_joined_rows; - size_t max_block_size; -}; - } diff --git a/src/Processors/QueryPlan/JoinStep.cpp b/src/Processors/QueryPlan/JoinStep.cpp new file mode 100644 index 00000000000..b06d6628dcb --- /dev/null +++ b/src/Processors/QueryPlan/JoinStep.cpp @@ -0,0 +1,89 @@ +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +JoinStep::JoinStep( + const DataStream & left_stream_, + const DataStream & right_stream_, + JoinPtr join_, + size_t max_block_size_) + : join(std::move(join_)) + , max_block_size(max_block_size_) +{ + input_streams = {left_stream_, right_stream_}; + output_stream = DataStream + { + .header = JoiningTransform::transformHeader(left_stream_.header, join), + }; +} + +QueryPipelinePtr JoinStep::updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings &) +{ + if (pipelines.size() != 2) + throw Exception(ErrorCodes::LOGICAL_ERROR, "JoinStep expect two input steps"); + + return QueryPipeline::joinPipelines(std::move(pipelines[0]), std::move(pipelines[1]), join, max_block_size, &processors); +} + +void JoinStep::describePipeline(FormatSettings & settings) const +{ + IQueryPlanStep::describePipeline(processors, settings); +} + +static ITransformingStep::Traits getStorageJoinTraits() +{ + return ITransformingStep::Traits + { + { + .preserves_distinct_columns = false, + .returns_single_stream = false, + .preserves_number_of_streams = true, + .preserves_sorting = false, + }, + { + .preserves_number_of_rows = false, + } + }; +} + +FilledJoinStep::FilledJoinStep(const 
DataStream & input_stream_, JoinPtr join_, size_t max_block_size_) + : ITransformingStep( + input_stream_, + JoiningTransform::transformHeader(input_stream_.header, join_), + getStorageJoinTraits()) + , join(std::move(join_)) + , max_block_size(max_block_size_) +{ + if (!join->isFilled()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "FilledJoinStep expects Join to be filled"); +} + +void FilledJoinStep::transformPipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &) +{ + bool default_totals = false; + if (!pipeline.hasTotals() && join->hasTotals()) + { + pipeline.addDefaultTotals(); + default_totals = true; + } + + auto finish_counter = std::make_shared(pipeline.getNumStreams()); + + pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) + { + bool on_totals = stream_type == QueryPipeline::StreamType::Totals; + auto counter = on_totals ? nullptr : finish_counter; + return std::make_shared(header, join, max_block_size, on_totals, default_totals, counter); + }); +} + +} diff --git a/src/Processors/QueryPlan/JoinStep.h b/src/Processors/QueryPlan/JoinStep.h new file mode 100644 index 00000000000..6430f7cbd59 --- /dev/null +++ b/src/Processors/QueryPlan/JoinStep.h @@ -0,0 +1,50 @@ +#pragma once +#include +#include + +namespace DB +{ + +class IJoin; +using JoinPtr = std::shared_ptr; + +/// Join two data streams. +class JoinStep : public IQueryPlanStep +{ +public: + JoinStep( + const DataStream & left_stream_, + const DataStream & right_stream_, + JoinPtr join_, + size_t max_block_size_); + + String getName() const override { return "Join"; } + + QueryPipelinePtr updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings &) override; + + void describePipeline(FormatSettings & settings) const override; + + const JoinPtr & getJoin() const { return join; } + +private: + JoinPtr join; + size_t max_block_size; + Processors processors; +}; + +/// Special step for the case when Join is already filled. +/// For StorageJoin and Dictionary. +class FilledJoinStep : public ITransformingStep +{ +public: + FilledJoinStep(const DataStream & input_stream_, JoinPtr join_, size_t max_block_size_); + + String getName() const override { return "FilledJoin"; } + void transformPipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &) override; + +private: + JoinPtr join; + size_t max_block_size; +}; + +} diff --git a/src/Processors/QueryPlan/Optimizations/filterPushDown.cpp b/src/Processors/QueryPlan/Optimizations/filterPushDown.cpp index 20813e9f548..e1fac21d5c1 100644 --- a/src/Processors/QueryPlan/Optimizations/filterPushDown.cpp +++ b/src/Processors/QueryPlan/Optimizations/filterPushDown.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -72,8 +73,8 @@ static size_t tryAddNewFilterStep( /// Add new Filter step before Aggregating. /// Expression/Filter -> Aggregating -> Something auto & node = nodes.emplace_back(); - node.children.swap(child_node->children); - child_node->children.emplace_back(&node); + node.children.emplace_back(&node); + std::swap(node.children[0], child_node->children[0]); /// Expression/Filter -> Aggregating -> Filter -> Something /// New filter column is the first one. 
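
The two new files above replace the old `JoinStep` that lived in ExpressionStep.h: a regular join becomes a true two-input plan step that unites the left-hand plan with the separately built right-hand plan, while a join that is already filled (StorageJoin or a dictionary, `isFilled()`) stays a plain transforming step over the left stream, mirroring the branch added to `InterpreterSelectQuery::executeImpl` above. A minimal standalone sketch of that split, using toy stand-ins (`Plan`, `Join`) rather than the actual ClickHouse classes:

``` cpp
/// Illustrative sketch of the JoinStep / FilledJoinStep split; not ClickHouse code.
#include <iostream>
#include <string>
#include <vector>

struct Join
{
    bool filled = false;  /// true for StorageJoin / dictionary joins
    std::string name;
};

/// Stand-in for QueryPlan: just an ordered list of step descriptions.
struct Plan
{
    std::vector<std::string> steps;
    void addStep(std::string s) { steps.push_back(std::move(s)); }
};

/// Already-filled join: a single transforming step over the left-hand plan only.
void addFilledJoinStep(Plan & left, const Join & join)
{
    left.addStep("FilledJoin(" + join.name + ")");
}

/// Regular join: unite the left plan with the separately built right-hand plan.
Plan uniteWithJoinStep(Plan left, Plan right, const Join & join)
{
    Plan united;
    for (auto & s : left.steps)
        united.addStep("L: " + s);
    for (auto & s : right.steps)
        united.addStep("R: " + s);
    united.addStep("Join(" + join.name + ")");
    return united;
}

int main()
{
    Plan left;
    left.addStep("ReadFromTable(t1)");
    Join storage_join{true, "storage_join"};
    if (storage_join.filled)  /// mirrors `expressions.join->isFilled()`
        addFilledJoinStep(left, storage_join);

    Plan left2;
    left2.addStep("ReadFromTable(t1)");
    Plan right;
    right.addStep("ReadFromSubquery(t2)");
    Join hash_join{false, "hash_join"};
    Plan united = uniteWithJoinStep(std::move(left2), std::move(right), hash_join);

    for (auto & s : united.steps)
        std::cout << s << '\n';
}
```
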
diff --git a/src/Processors/Transforms/CreatingSetsTransform.cpp b/src/Processors/Transforms/CreatingSetsTransform.cpp index a5b5958eff1..86051019235 100644 --- a/src/Processors/Transforms/CreatingSetsTransform.cpp +++ b/src/Processors/Transforms/CreatingSetsTransform.cpp @@ -45,8 +45,6 @@ void CreatingSetsTransform::startSubquery() { if (subquery.set) LOG_TRACE(log, "Creating set."); - if (subquery.join) - LOG_TRACE(log, "Creating join."); if (subquery.table) LOG_TRACE(log, "Filling temporary table."); @@ -54,10 +52,9 @@ void CreatingSetsTransform::startSubquery() table_out = subquery.table->write({}, subquery.table->getInMemoryMetadataPtr(), getContext()); done_with_set = !subquery.set; - done_with_join = !subquery.join; done_with_table = !subquery.table; - if (done_with_set && done_with_join && done_with_table) + if (done_with_set /*&& done_with_join*/ && done_with_table) throw Exception("Logical error: nothing to do with subquery", ErrorCodes::LOGICAL_ERROR); if (table_out) @@ -72,8 +69,6 @@ void CreatingSetsTransform::finishSubquery() if (subquery.set) LOG_DEBUG(log, "Created Set with {} entries from {} rows in {} sec.", subquery.set->getTotalRowCount(), read_rows, seconds); - if (subquery.join) - LOG_DEBUG(log, "Created Join with {} entries from {} rows in {} sec.", subquery.join->getTotalRowCount(), read_rows, seconds); if (subquery.table) LOG_DEBUG(log, "Created Table with {} rows in {} sec.", read_rows, seconds); } @@ -81,12 +76,6 @@ void CreatingSetsTransform::finishSubquery() { LOG_DEBUG(log, "Subquery has empty result."); } - - if (totals) - subquery.setTotals(getInputPort().getHeader().cloneWithColumns(totals.detachColumns())); - else - /// Set empty totals anyway, it is needed for MergeJoin. - subquery.setTotals({}); } void CreatingSetsTransform::init() @@ -111,12 +100,6 @@ void CreatingSetsTransform::consume(Chunk chunk) done_with_set = true; } - if (!done_with_join) - { - if (!subquery.insertJoinedBlock(block)) - done_with_join = true; - } - if (!done_with_table) { block = materializeBlock(block); @@ -130,7 +113,7 @@ void CreatingSetsTransform::consume(Chunk chunk) done_with_table = true; } - if (done_with_set && done_with_join && done_with_table) + if (done_with_set && done_with_table) finishConsume(); } diff --git a/src/Processors/Transforms/CreatingSetsTransform.h b/src/Processors/Transforms/CreatingSetsTransform.h index a5787b0a5f5..a847582a988 100644 --- a/src/Processors/Transforms/CreatingSetsTransform.h +++ b/src/Processors/Transforms/CreatingSetsTransform.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -43,7 +44,7 @@ private: Stopwatch watch; bool done_with_set = true; - bool done_with_join = true; + //bool done_with_join = true; bool done_with_table = true; SizeLimits network_transfer_limits; diff --git a/src/Processors/Transforms/JoiningTransform.cpp b/src/Processors/Transforms/JoiningTransform.cpp index dea887fd7d7..31b2da46ab3 100644 --- a/src/Processors/Transforms/JoiningTransform.cpp +++ b/src/Processors/Transforms/JoiningTransform.cpp @@ -1,10 +1,17 @@ #include #include #include +#include +#include namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + Block JoiningTransform::transformHeader(Block header, const JoinPtr & join) { ExtraBlockPtr tmp; @@ -12,13 +19,128 @@ Block JoiningTransform::transformHeader(Block header, const JoinPtr & join) return header; } -JoiningTransform::JoiningTransform(Block input_header, JoinPtr join_, - bool on_totals_, bool default_totals_) - : ISimpleTransform(input_header, 
transformHeader(input_header, join_), on_totals_) +JoiningTransform::JoiningTransform( + Block input_header, + JoinPtr join_, + size_t max_block_size_, + bool on_totals_, + bool default_totals_, + FinishCounterPtr finish_counter_) + : IProcessor({input_header}, {transformHeader(input_header, join_)}) , join(std::move(join_)) , on_totals(on_totals_) , default_totals(default_totals_) -{} + , finish_counter(std::move(finish_counter_)) + , max_block_size(max_block_size_) +{ + if (!join->isFilled()) + inputs.emplace_back(Block(), this); +} + +IProcessor::Status JoiningTransform::prepare() +{ + auto & output = outputs.front(); + + /// Check can output. + if (output.isFinished() || stop_reading) + { + output.finish(); + for (auto & input : inputs) + input.close(); + return Status::Finished; + } + + if (!output.canPush()) + { + for (auto & input : inputs) + input.setNotNeeded(); + return Status::PortFull; + } + + /// Output if has data. + if (has_output) + { + output.push(std::move(output_chunk)); + has_output = false; + + return Status::PortFull; + } + + if (inputs.size() > 1) + { + auto & last_in = inputs.back(); + if (!last_in.isFinished()) + { + last_in.setNeeded(); + if (last_in.hasData()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "No data is expected from second JoiningTransform port"); + + return Status::NeedData; + } + } + + if (has_input) + return Status::Ready; + + auto & input = inputs.front(); + if (input.isFinished()) + { + if (process_non_joined) + return Status::Ready; + + output.finish(); + return Status::Finished; + } + + input.setNeeded(); + + if (!input.hasData()) + return Status::NeedData; + + input_chunk = input.pull(true); + has_input = true; + return Status::Ready; +} + +void JoiningTransform::work() +{ + if (has_input) + { + transform(input_chunk); + output_chunk.swap(input_chunk); + has_input = not_processed != nullptr; + has_output = !output_chunk.empty(); + } + else + { + if (!non_joined_stream) + { + if (!finish_counter || !finish_counter->isLast()) + { + process_non_joined = false; + return; + } + + non_joined_stream = join->createStreamWithNonJoinedRows(outputs.front().getHeader(), max_block_size); + if (!non_joined_stream) + { + process_non_joined = false; + return; + } + } + + auto block = non_joined_stream->read(); + if (!block) + { + process_non_joined = false; + return; + } + + auto rows = block.rows(); + output_chunk.setColumns(block.getColumns(), rows); + has_output = true; + } +} void JoiningTransform::transform(Chunk & chunk) { @@ -28,7 +150,7 @@ void JoiningTransform::transform(Chunk & chunk) if (join->alwaysReturnsEmptySet() && !on_totals) { - stopReading(); + stop_reading = true; chunk.clear(); return; } @@ -42,7 +164,7 @@ void JoiningTransform::transform(Chunk & chunk) auto cols = chunk.detachColumns(); for (auto & col : cols) col = col->cloneResized(1); - block = getInputPort().getHeader().cloneWithColumns(std::move(cols)); + block = inputs.front().getHeader().cloneWithColumns(std::move(cols)); /// Drop totals if both out stream and joined stream doesn't have ones. 
/// See comment in ExpressionTransform.h @@ -61,29 +183,122 @@ void JoiningTransform::transform(Chunk & chunk) Block JoiningTransform::readExecute(Chunk & chunk) { Block res; + // std::cerr << "=== Chunk rows " << chunk.getNumRows() << " cols " << chunk.getNumColumns() << std::endl; if (!not_processed) { + // std::cerr << "!not_processed " << std::endl; if (chunk.hasColumns()) - res = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()); + res = inputs.front().getHeader().cloneWithColumns(chunk.detachColumns()); if (res) join->joinBlock(res, not_processed); } else if (not_processed->empty()) /// There's not processed data inside expression. { + // std::cerr << "not_processed->empty() " << std::endl; if (chunk.hasColumns()) - res = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()); + res = inputs.front().getHeader().cloneWithColumns(chunk.detachColumns()); not_processed.reset(); join->joinBlock(res, not_processed); } else { + // std::cerr << "not not_processed->empty() " << std::endl; res = std::move(not_processed->block); join->joinBlock(res, not_processed); } + + // std::cerr << "Res block rows " << res.rows() << " cols " << res.columns() << std::endl; return res; } +FillingRightJoinSideTransform::FillingRightJoinSideTransform(Block input_header, JoinPtr join_) + : IProcessor({input_header}, {Block()}) + , join(std::move(join_)) +{} + +InputPort * FillingRightJoinSideTransform::addTotalsPort() +{ + if (inputs.size() > 1) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Totals port was already added to FillingRightJoinSideTransform"); + + return &inputs.emplace_back(inputs.front().getHeader(), this); +} + +IProcessor::Status FillingRightJoinSideTransform::prepare() +{ + auto & output = outputs.front(); + + /// Check can output. 
+ if (output.isFinished()) + { + for (auto & input : inputs) + input.close(); + return Status::Finished; + } + + if (!output.canPush()) + { + for (auto & input : inputs) + input.setNotNeeded(); + return Status::PortFull; + } + + auto & input = inputs.front(); + + if (stop_reading) + { + input.close(); + } + else if (!input.isFinished()) + { + input.setNeeded(); + + if (!input.hasData()) + return Status::NeedData; + + chunk = input.pull(true); + return Status::Ready; + } + + if (inputs.size() > 1) + { + auto & totals_input = inputs.back(); + if (!totals_input.isFinished()) + { + totals_input.setNeeded(); + + if (!totals_input.hasData()) + return Status::NeedData; + + chunk = totals_input.pull(true); + for_totals = true; + return Status::Ready; + } + } + else if (!set_totals) + { + chunk.setColumns(inputs.front().getHeader().cloneEmpty().getColumns(), 0); + for_totals = true; + return Status::Ready; + } + + output.finish(); + return Status::Finished; +} + +void FillingRightJoinSideTransform::work() +{ + auto block = inputs.front().getHeader().cloneWithColumns(chunk.detachColumns()); + + if (for_totals) + join->setTotals(block); + else + stop_reading = !join->addJoinedBlock(block); + + set_totals = for_totals; +} + } diff --git a/src/Processors/Transforms/JoiningTransform.h b/src/Processors/Transforms/JoiningTransform.h index 15a203635e2..98038946f3b 100644 --- a/src/Processors/Transforms/JoiningTransform.h +++ b/src/Processors/Transforms/JoiningTransform.h @@ -1,5 +1,5 @@ #pragma once -#include +#include namespace DB @@ -8,21 +8,63 @@ namespace DB class IJoin; using JoinPtr = std::shared_ptr; -class JoiningTransform : public ISimpleTransform +class IBlockInputStream; +using BlockInputStreamPtr = std::shared_ptr; + +/// Join rows to chunk form left table. +/// This transform usually has two input ports and one output. +/// First input is for data from left table. +/// Second input has empty header and is connected with FillingRightJoinSide. +/// We can process left table only when Join is filled. Second input is used to signal that FillingRightJoinSide is finished. +class JoiningTransform : public IProcessor { public: - JoiningTransform(Block input_header, JoinPtr join_, - bool on_totals_ = false, bool default_totals_ = false); + + /// Count streams and check which is last. + /// The last one should process non-joined rows. + class FinishCounter + { + public: + explicit FinishCounter(size_t total_) : total(total_) {} + + bool isLast() + { + return finished.fetch_add(1) + 1 >= total; + } + + private: + const size_t total; + std::atomic finished{0}; + }; + + using FinishCounterPtr = std::shared_ptr; + + JoiningTransform( + Block input_header, + JoinPtr join_, + size_t max_block_size_, + bool on_totals_ = false, + bool default_totals_ = false, + FinishCounterPtr finish_counter_ = nullptr); String getName() const override { return "JoiningTransform"; } static Block transformHeader(Block header, const JoinPtr & join); + Status prepare() override; + void work() override; + protected: - void transform(Chunk & chunk) override; - bool needInputData() const override { return !not_processed; } + void transform(Chunk & chunk); private: + Chunk input_chunk; + Chunk output_chunk; + bool has_input = false; + bool has_output = false; + bool stop_reading = false; + bool process_non_joined = true; + JoinPtr join; bool on_totals; /// This flag means that we have manually added totals to our pipeline. 
@@ -33,7 +75,33 @@ private: ExtraBlockPtr not_processed; + FinishCounterPtr finish_counter; + BlockInputStreamPtr non_joined_stream; + size_t max_block_size; + Block readExecute(Chunk & chunk); }; +/// Fills Join with block from right table. +/// Has single input and single output port. +/// Output port has empty header. It is closed when al data is inserted in join. +class FillingRightJoinSideTransform : public IProcessor +{ +public: + FillingRightJoinSideTransform(Block input_header, JoinPtr join_); + String getName() const override { return "FillingRightJoinSide"; } + + InputPort * addTotalsPort(); + + Status prepare() override; + void work() override; + +private: + JoinPtr join; + Chunk chunk; + bool stop_reading = false; + bool for_totals = false; + bool set_totals = false; +}; + } diff --git a/src/Processors/ya.make b/src/Processors/ya.make index 18f285e60a2..5ab9c79511f 100644 --- a/src/Processors/ya.make +++ b/src/Processors/ya.make @@ -107,6 +107,7 @@ SRCS( QueryPlan/IQueryPlanStep.cpp QueryPlan/ISourceStep.cpp QueryPlan/ITransformingStep.cpp + QueryPlan/JoinStep.cpp QueryPlan/LimitByStep.cpp QueryPlan/LimitStep.cpp QueryPlan/MergeSortingStep.cpp diff --git a/src/Storages/Distributed/DirectoryMonitor.cpp b/src/Storages/Distributed/DirectoryMonitor.cpp index 97d47333759..ce19d3ec89f 100644 --- a/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/src/Storages/Distributed/DirectoryMonitor.cpp @@ -34,6 +34,7 @@ namespace CurrentMetrics { extern const Metric DistributedSend; extern const Metric DistributedFilesToInsert; + extern const Metric BrokenDistributedFilesToInsert; } namespace fs = std::filesystem; @@ -304,6 +305,7 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor( , log(&Poco::Logger::get(getLoggerName())) , monitor_blocker(monitor_blocker_) , metric_pending_files(CurrentMetrics::DistributedFilesToInsert, 0) + , metric_broken_files(CurrentMetrics::BrokenDistributedFilesToInsert, 0) { task_handle = bg_pool.createTask(getLoggerName() + "/Bg", [this]{ run(); }); task_handle->activateAndSchedule(); @@ -368,20 +370,20 @@ void StorageDistributedDirectoryMonitor::run() { do_sleep = !processFiles(files); - std::lock_guard metrics_lock(metrics_mutex); - last_exception = std::exception_ptr{}; + std::lock_guard status_lock(status_mutex); + status.last_exception = std::exception_ptr{}; } catch (...) 
{ - std::lock_guard metrics_lock(metrics_mutex); + std::lock_guard status_lock(status_mutex); do_sleep = true; - ++error_count; + ++status.error_count; sleep_time = std::min( - std::chrono::milliseconds{Int64(default_sleep_time.count() * std::exp2(error_count))}, + std::chrono::milliseconds{Int64(default_sleep_time.count() * std::exp2(status.error_count))}, max_sleep_time); tryLogCurrentException(getLoggerName().data()); - last_exception = std::current_exception(); + status.last_exception = std::current_exception(); } } else @@ -392,9 +394,9 @@ void StorageDistributedDirectoryMonitor::run() const auto now = std::chrono::system_clock::now(); if (now - last_decrease_time > decrease_error_count_period) { - std::lock_guard metrics_lock(metrics_mutex); + std::lock_guard status_lock(status_mutex); - error_count /= 2; + status.error_count /= 2; last_decrease_time = now; } @@ -500,16 +502,16 @@ std::map StorageDistributedDirectoryMonitor::getFiles() } { - std::lock_guard metrics_lock(metrics_mutex); + std::lock_guard status_lock(status_mutex); - if (files_count != files.size()) - LOG_TRACE(log, "Files set to {} (was {})", files.size(), files_count); - if (bytes_count != new_bytes_count) - LOG_TRACE(log, "Bytes set to {} (was {})", new_bytes_count, bytes_count); + if (status.files_count != files.size()) + LOG_TRACE(log, "Files set to {} (was {})", files.size(), status.files_count); + if (status.bytes_count != new_bytes_count) + LOG_TRACE(log, "Bytes set to {} (was {})", new_bytes_count, status.bytes_count); metric_pending_files.changeTo(files.size()); - files_count = files.size(); - bytes_count = new_bytes_count; + status.files_count = files.size(); + status.bytes_count = new_bytes_count; } return files; @@ -560,8 +562,9 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa writeRemoteConvert(distributed_header, remote, compression_expected, in, log); remote.writeSuffix(); } - catch (const Exception & e) + catch (Exception & e) { + e.addMessage(fmt::format("While sending {}", file_path)); maybeMarkAsBroken(file_path, e); throw; } @@ -708,7 +711,7 @@ struct StorageDistributedDirectoryMonitor::Batch if (remote) remote->writeSuffix(); } - catch (const Exception & e) + catch (Exception & e) { if (isFileBrokenErrorCode(e.code(), e.isRemoteException())) { @@ -716,7 +719,14 @@ struct StorageDistributedDirectoryMonitor::Batch batch_broken = true; } else + { + std::vector files(file_index_to_path.size()); + for (const auto & [index, name] : file_index_to_path) + files.push_back(name); + e.addMessage(fmt::format("While sending batch {}", fmt::join(files, "\n"))); + throw; + } } if (!batch_broken) @@ -817,10 +827,10 @@ bool StorageDistributedDirectoryMonitor::addAndSchedule(size_t file_size, size_t return false; { - std::lock_guard metrics_lock(metrics_mutex); + std::lock_guard status_lock(status_mutex); metric_pending_files.add(); - bytes_count += file_size; - ++files_count; + status.bytes_count += file_size; + ++status.files_count; } return task_handle->scheduleAfter(ms, false); @@ -828,16 +838,9 @@ bool StorageDistributedDirectoryMonitor::addAndSchedule(size_t file_size, size_t StorageDistributedDirectoryMonitor::Status StorageDistributedDirectoryMonitor::getStatus() { - std::lock_guard metrics_lock(metrics_mutex); - - return Status{ - path, - last_exception, - error_count, - files_count, - bytes_count, - monitor_blocker.isCancelled(), - }; + std::lock_guard status_lock(status_mutex); + Status current_status{status, path, monitor_blocker.isCancelled()}; + return 
current_status; } void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map & files) @@ -964,11 +967,17 @@ void StorageDistributedDirectoryMonitor::markAsBroken(const std::string & file_p auto broken_dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path + "/broken/"); { - std::lock_guard metrics_lock(metrics_mutex); + std::lock_guard status_lock(status_mutex); size_t file_size = fs::file_size(file_path); - --files_count; - bytes_count -= file_size; + + --status.files_count; + status.bytes_count -= file_size; + + ++status.broken_files_count; + status.broken_bytes_count += file_size; + + metric_broken_files.add(); } fs::rename(file_path, broken_file_path); @@ -980,10 +989,10 @@ void StorageDistributedDirectoryMonitor::markAsSend(const std::string & file_pat size_t file_size = fs::file_size(file_path); { - std::lock_guard metrics_lock(metrics_mutex); + std::lock_guard status_lock(status_mutex); metric_pending_files.sub(); - --files_count; - bytes_count -= file_size; + --status.files_count; + status.bytes_count -= file_size; } fs::remove(file_path); @@ -1012,7 +1021,7 @@ void StorageDistributedDirectoryMonitor::updatePath(const std::string & new_rela std::lock_guard lock{mutex}; { - std::lock_guard metrics_lock(metrics_mutex); + std::lock_guard status_lock(status_mutex); relative_path = new_relative_path; path = disk->getPath() + relative_path + '/'; } diff --git a/src/Storages/Distributed/DirectoryMonitor.h b/src/Storages/Distributed/DirectoryMonitor.h index 1ccac4522d7..ab9b8592294 100644 --- a/src/Storages/Distributed/DirectoryMonitor.h +++ b/src/Storages/Distributed/DirectoryMonitor.h @@ -50,15 +50,23 @@ public: /// For scheduling via DistributedBlockOutputStream bool addAndSchedule(size_t file_size, size_t ms); + struct InternalStatus + { + std::exception_ptr last_exception; + + size_t error_count = 0; + + size_t files_count = 0; + size_t bytes_count = 0; + + size_t broken_files_count = 0; + size_t broken_bytes_count = 0; + }; /// system.distribution_queue interface - struct Status + struct Status : InternalStatus { std::string path; - std::exception_ptr last_exception; - size_t error_count; - size_t files_count; - size_t bytes_count; - bool is_blocked; + bool is_blocked = false; }; Status getStatus(); @@ -92,11 +100,8 @@ private: struct BatchHeader; struct Batch; - std::mutex metrics_mutex; - size_t error_count = 0; - size_t files_count = 0; - size_t bytes_count = 0; - std::exception_ptr last_exception; + std::mutex status_mutex; + InternalStatus status; const std::chrono::milliseconds default_sleep_time; std::chrono::milliseconds sleep_time; @@ -110,6 +115,7 @@ private: BackgroundSchedulePoolTaskHolder task_handle; CurrentMetrics::Increment metric_pending_files; + CurrentMetrics::Increment metric_broken_files; friend class DirectoryMonitorBlockInputStream; }; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 69a00216101..864c31ec05d 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -3128,7 +3128,6 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c entry.merge_type = merge_type; entry.deduplicate = deduplicate; entry.deduplicate_by_columns = deduplicate_by_columns; - entry.merge_type = merge_type; entry.create_time = time(nullptr); for (const auto & part : parts) @@ -5215,11 +5214,7 @@ void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields) { auto log_entries = 
zookeeper->getChildren(zookeeper_path + "/log"); - if (log_entries.empty()) - { - res.log_max_index = 0; - } - else + if (!log_entries.empty()) { const String & last_log_entry = *std::max_element(log_entries.begin(), log_entries.end()); res.log_max_index = parse(last_log_entry.substr(strlen("log-"))); @@ -5231,7 +5226,6 @@ void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields) auto all_replicas = zookeeper->getChildren(zookeeper_path + "/replicas"); res.total_replicas = all_replicas.size(); - res.active_replicas = 0; for (const String & replica : all_replicas) if (zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active")) ++res.active_replicas; @@ -5365,7 +5359,7 @@ void StorageReplicatedMergeTree::fetchPartition( ContextPtr query_context) { Macros::MacroExpansionInfo info; - info.expand_special_macros_only = false; + info.expand_special_macros_only = false; //-V1048 info.table_id = getStorageID(); info.table_id.uuid = UUIDHelpers::Nil; auto expand_from = query_context->getMacros()->expand(from_, info); @@ -6317,7 +6311,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta entry_delete.type = LogEntry::DROP_RANGE; entry_delete.source_replica = replica_name; entry_delete.new_part_name = drop_range_fake_part_name; - entry_delete.detach = false; + entry_delete.detach = false; //-V1048 entry_delete.create_time = time(nullptr); } diff --git a/src/Storages/System/StorageSystemDistributionQueue.cpp b/src/Storages/System/StorageSystemDistributionQueue.cpp index 9c0f8818011..d8879c3655e 100644 --- a/src/Storages/System/StorageSystemDistributionQueue.cpp +++ b/src/Storages/System/StorageSystemDistributionQueue.cpp @@ -98,6 +98,8 @@ NamesAndTypesList StorageSystemDistributionQueue::getNamesAndTypes() { "error_count", std::make_shared() }, { "data_files", std::make_shared() }, { "data_compressed_bytes", std::make_shared() }, + { "broken_data_files", std::make_shared() }, + { "broken_data_compressed_bytes", std::make_shared() }, { "last_exception", std::make_shared() }, }; } @@ -181,6 +183,8 @@ void StorageSystemDistributionQueue::fillData(MutableColumns & res_columns, Cont res_columns[col_num++]->insert(status.error_count); res_columns[col_num++]->insert(status.files_count); res_columns[col_num++]->insert(status.bytes_count); + res_columns[col_num++]->insert(status.broken_files_count); + res_columns[col_num++]->insert(status.broken_bytes_count); if (status.last_exception) res_columns[col_num++]->insert(getExceptionMessage(status.last_exception, false)); diff --git a/tests/integration/test_polymorphic_parts/test.py b/tests/integration/test_polymorphic_parts/test.py index 50a8192fbc5..dc16bab0ca4 100644 --- a/tests/integration/test_polymorphic_parts/test.py +++ b/tests/integration/test_polymorphic_parts/test.py @@ -376,74 +376,6 @@ def test_in_memory(start_cluster): "Wide\t1\n") -def test_in_memory_wal(start_cluster): - # Merges are disabled in config - - for i in range(5): - insert_random_data('wal_table', node11, 50) - node12.query("SYSTEM SYNC REPLICA wal_table", timeout=20) - - def check(node, rows, parts): - node.query("SELECT count() FROM wal_table") == "{}\n".format(rows) - node.query( - "SELECT count() FROM system.parts WHERE table = 'wal_table' AND part_type = 'InMemory'") == "{}\n".format( - parts) - - check(node11, 250, 5) - check(node12, 250, 5) - - # WAL works at inserts - node11.restart_clickhouse(kill=True) - check(node11, 250, 5) - - # WAL works at fetches - node12.restart_clickhouse(kill=True) - check(node12, 250, 5) 
- - insert_random_data('wal_table', node11, 50) - node12.query("SYSTEM SYNC REPLICA wal_table", timeout=20) - - # Disable replication - with PartitionManager() as pm: - pm.partition_instances(node11, node12) - check(node11, 300, 6) - - wal_file = "/var/lib/clickhouse/data/default/wal_table/wal.bin" - # Corrupt wal file - # Truncate it to it's size minus 10 bytes - node11.exec_in_container(['bash', '-c', 'truncate --size="$(($(stat -c "%s" {}) - 10))" {}'.format(wal_file, wal_file)], - privileged=True, user='root') - node11.restart_clickhouse(kill=True) - - # Broken part is lost, but other restored successfully - check(node11, 250, 5) - # WAL with blocks from 0 to 4 - broken_wal_file = "/var/lib/clickhouse/data/default/wal_table/wal_0_4.bin" - # Check file exists - node11.exec_in_container(['bash', '-c', 'test -f {}'.format(broken_wal_file)]) - - # Fetch lost part from replica - node11.query("SYSTEM SYNC REPLICA wal_table", timeout=20) - check(node11, 300, 6) - - # Check that new data is written to new wal, but old is still exists for restoring - # Check file not empty - node11.exec_in_container(['bash', '-c', 'test -s {}'.format(wal_file)]) - # Check file exists - node11.exec_in_container(['bash', '-c', 'test -f {}'.format(broken_wal_file)]) - - # Data is lost without WAL - node11.query("ALTER TABLE wal_table MODIFY SETTING in_memory_parts_enable_wal = 0") - with PartitionManager() as pm: - pm.partition_instances(node11, node12) - - insert_random_data('wal_table', node11, 50) - check(node11, 350, 7) - - node11.restart_clickhouse(kill=True) - check(node11, 300, 6) - - def test_in_memory_wal_rotate(start_cluster): # Write every part to single wal node11.query("ALTER TABLE restore_table MODIFY SETTING write_ahead_log_max_bytes = 10") diff --git a/tests/integration/test_storage_kafka/test.py b/tests/integration/test_storage_kafka/test.py index 04a78c5f2c4..8bb36cfa8fd 100644 --- a/tests/integration/test_storage_kafka/test.py +++ b/tests/integration/test_storage_kafka/test.py @@ -2467,8 +2467,6 @@ def test_kafka_issue14202(kafka_cluster): kafka_format = 'JSONEachRow'; ''') - time.sleep(3) - instance.query( 'INSERT INTO test.kafka_q SELECT t, some_string FROM ( SELECT dt AS t, some_string FROM test.empty_table )') # check instance is alive diff --git a/tests/queries/0_stateless/01293_system_distribution_queue.reference b/tests/queries/0_stateless/01293_system_distribution_queue.reference index a2c1e5f2a7b..4a51abdb745 100644 --- a/tests/queries/0_stateless/01293_system_distribution_queue.reference +++ b/tests/queries/0_stateless/01293_system_distribution_queue.reference @@ -1,6 +1,6 @@ INSERT -1 0 1 1 +1 0 1 1 0 0 FLUSH -1 0 0 0 +1 0 0 0 0 0 UNBLOCK -0 0 0 0 +0 0 0 0 0 0 diff --git a/tests/queries/0_stateless/01293_system_distribution_queue.sql b/tests/queries/0_stateless/01293_system_distribution_queue.sql index dc63dece960..b16433029bf 100644 --- a/tests/queries/0_stateless/01293_system_distribution_queue.sql +++ b/tests/queries/0_stateless/01293_system_distribution_queue.sql @@ -10,15 +10,15 @@ select * from system.distribution_queue; select 'INSERT'; system stop distributed sends dist_01293; insert into dist_01293 select * from numbers(10); -select is_blocked, error_count, data_files, data_compressed_bytes>100 from system.distribution_queue where database = currentDatabase(); +select is_blocked, error_count, data_files, data_compressed_bytes>100, broken_data_files, broken_data_compressed_bytes from system.distribution_queue where database = currentDatabase(); system flush distributed 
dist_01293; select 'FLUSH'; -select is_blocked, error_count, data_files, data_compressed_bytes from system.distribution_queue where database = currentDatabase(); +select is_blocked, error_count, data_files, data_compressed_bytes, broken_data_files, broken_data_compressed_bytes from system.distribution_queue where database = currentDatabase(); select 'UNBLOCK'; system start distributed sends dist_01293; -select is_blocked, error_count, data_files, data_compressed_bytes from system.distribution_queue where database = currentDatabase(); +select is_blocked, error_count, data_files, data_compressed_bytes, broken_data_files, broken_data_compressed_bytes from system.distribution_queue where database = currentDatabase(); drop table null_01293; drop table dist_01293;
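
Beyond the join rework, the patch also consolidates `StorageDistributedDirectoryMonitor` bookkeeping: the counters (now including broken files and bytes, surfaced through `system.distribution_queue` and exercised by the test above) move into a mutex-guarded `InternalStatus`, and the retry delay grows as `default_sleep_time * 2^error_count` capped at `max_sleep_time`, with `error_count` halved periodically. A standalone sketch of that scheme under those assumptions; all names besides the counters are illustrative:

``` cpp
#include <algorithm>
#include <chrono>
#include <cmath>
#include <cstdio>
#include <mutex>

struct InternalStatus
{
    size_t error_count = 0;
    size_t files_count = 0;
    size_t bytes_count = 0;
    size_t broken_files_count = 0;  /// exposed as broken_data_files
    size_t broken_bytes_count = 0;  /// exposed as broken_data_compressed_bytes
};

class Monitor
{
public:
    /// On failure: bump the error counter and compute the next retry delay.
    std::chrono::milliseconds onError()
    {
        std::lock_guard lock(status_mutex);
        ++status.error_count;
        /// Delay doubles with each consecutive error, up to a cap.
        return std::min(
            std::chrono::milliseconds{static_cast<long long>(
                default_sleep_time.count() * std::exp2(status.error_count))},
            max_sleep_time);
    }

    /// Called periodically to forgive old errors.
    void decay()
    {
        std::lock_guard lock(status_mutex);
        status.error_count /= 2;
    }

    /// Snapshot for system.distribution_queue, taken under the same mutex.
    InternalStatus getStatus()
    {
        std::lock_guard lock(status_mutex);
        return status;
    }

private:
    std::mutex status_mutex;
    InternalStatus status;
    const std::chrono::milliseconds default_sleep_time{100};
    const std::chrono::milliseconds max_sleep_time{30000};
};

int main()
{
    Monitor m;
    for (int i = 0; i < 10; ++i)
        std::printf("retry in %lld ms\n", static_cast<long long>(m.onError().count()));
    m.decay();
    std::printf("errors after decay: %zu\n", m.getStatus().error_count);
}
```
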