Merge branch 'master' into dont-optimize-trivial-insert-select

This commit is contained in:
Alexey Milovidov 2024-06-29 21:52:21 +02:00
commit 01c4301165
23 changed files with 602 additions and 94 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 162 KiB

View File

@ -0,0 +1,394 @@
---
slug: /en/getting-started/example-datasets/stackoverflow
sidebar_label: Stack Overflow
sidebar_position: 1
description: Analyzing Stack Overflow data with ClickHouse
---
# Analyzing Stack Overflow data with ClickHouse
This dataset contains every `Post`, `User`, `Vote`, `Comment`, `Badge, `PostHistory`, and `PostLink` that has occurred on Stack Overflow.
Users can either download pre-prepared Parquet versions of the data, containing every post up to April 2024, or download the latest data in XML format and load this. Stack Overflow provide updates to this data periodically - historically every 3 months.
The following diagram shows the schema for the available tables assuming Parquet format.
![Stack Overflow schema](./images/stackoverflow.png)
A description of the schema of this data can be found [here](https://meta.stackexchange.com/questions/2677/database-schema-documentation-for-the-public-data-dump-and-sede).
## Pre-prepared data
We provide a copy of this data in Parquet format, up to date as of April 2024. While small for ClickHouse with respect to the number of rows (60 million posts), this dataset contains significant volumes of text and large String columns.
```sql
CREATE DATABASE stackoverflow
```
The following timings are for a 96 GiB, 24 vCPU ClickHouse Cloud cluster located in `eu-west-2`. The dataset is located in `eu-west-3`.
### Posts
```sql
CREATE TABLE stackoverflow.posts
(
`Id` Int32 CODEC(Delta(4), ZSTD(1)),
`PostTypeId` Enum8('Question' = 1, 'Answer' = 2, 'Wiki' = 3, 'TagWikiExcerpt' = 4, 'TagWiki' = 5, 'ModeratorNomination' = 6, 'WikiPlaceholder' = 7, 'PrivilegeWiki' = 8),
`AcceptedAnswerId` UInt32,
`CreationDate` DateTime64(3, 'UTC'),
`Score` Int32,
`ViewCount` UInt32 CODEC(Delta(4), ZSTD(1)),
`Body` String,
`OwnerUserId` Int32,
`OwnerDisplayName` String,
`LastEditorUserId` Int32,
`LastEditorDisplayName` String,
`LastEditDate` DateTime64(3, 'UTC') CODEC(Delta(8), ZSTD(1)),
`LastActivityDate` DateTime64(3, 'UTC'),
`Title` String,
`Tags` String,
`AnswerCount` UInt16 CODEC(Delta(2), ZSTD(1)),
`CommentCount` UInt8,
`FavoriteCount` UInt8,
`ContentLicense` LowCardinality(String),
`ParentId` String,
`CommunityOwnedDate` DateTime64(3, 'UTC'),
`ClosedDate` DateTime64(3, 'UTC')
)
ENGINE = MergeTree
PARTITION BY toYear(CreationDate)
ORDER BY (PostTypeId, toDate(CreationDate), CreationDate)
INSERT INTO stackoverflow.posts SELECT * FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/parquet/posts/*.parquet')
0 rows in set. Elapsed: 265.466 sec. Processed 59.82 million rows, 38.07 GB (225.34 thousand rows/s., 143.42 MB/s.)
```
Posts are also available by year e.g. [https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/parquet/posts/2020.parquet](https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/parquet/posts/2020.parquet)
### Votes
```sql
CREATE TABLE stackoverflow.votes
(
`Id` UInt32,
`PostId` Int32,
`VoteTypeId` UInt8,
`CreationDate` DateTime64(3, 'UTC'),
`UserId` Int32,
`BountyAmount` UInt8
)
ENGINE = MergeTree
ORDER BY (VoteTypeId, CreationDate, PostId, UserId)
INSERT INTO stackoverflow.votes SELECT * FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/parquet/votes/*.parquet')
0 rows in set. Elapsed: 21.605 sec. Processed 238.98 million rows, 2.13 GB (11.06 million rows/s., 98.46 MB/s.)
```
Votes are also available by year e.g. [https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/parquet/posts/2020.parquet](https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/parquet/votes/2020.parquet)
### Comments
```sql
CREATE TABLE stackoverflow.comments
(
`Id` UInt32,
`PostId` UInt32,
`Score` UInt16,
`Text` String,
`CreationDate` DateTime64(3, 'UTC'),
`UserId` Int32,
`UserDisplayName` LowCardinality(String)
)
ENGINE = MergeTree
ORDER BY CreationDate
INSERT INTO stackoverflow.comments SELECT * FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/parquet/comments/*.parquet')
0 rows in set. Elapsed: 56.593 sec. Processed 90.38 million rows, 11.14 GB (1.60 million rows/s., 196.78 MB/s.)
```
Comments are also available by year e.g. [https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/parquet/posts/2020.parquet](https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/parquet/comments/2020.parquet)
### Users
```sql
CREATE TABLE stackoverflow.users
(
`Id` Int32,
`Reputation` LowCardinality(String),
`CreationDate` DateTime64(3, 'UTC') CODEC(Delta(8), ZSTD(1)),
`DisplayName` String,
`LastAccessDate` DateTime64(3, 'UTC'),
`AboutMe` String,
`Views` UInt32,
`UpVotes` UInt32,
`DownVotes` UInt32,
`WebsiteUrl` String,
`Location` LowCardinality(String),
`AccountId` Int32
)
ENGINE = MergeTree
ORDER BY (Id, CreationDate)
INSERT INTO stackoverflow.users SELECT * FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/parquet/users.parquet')
0 rows in set. Elapsed: 10.988 sec. Processed 22.48 million rows, 1.36 GB (2.05 million rows/s., 124.10 MB/s.)
```
### Badges
```sql
CREATE TABLE stackoverflow.badges
(
`Id` UInt32,
`UserId` Int32,
`Name` LowCardinality(String),
`Date` DateTime64(3, 'UTC'),
`Class` Enum8('Gold' = 1, 'Silver' = 2, 'Bronze' = 3),
`TagBased` Bool
)
ENGINE = MergeTree
ORDER BY UserId
INSERT INTO stackoverflow.badges SELECT * FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/parquet/badges.parquet')
0 rows in set. Elapsed: 6.635 sec. Processed 51.29 million rows, 797.05 MB (7.73 million rows/s., 120.13 MB/s.)
```
### `PostLinks`
```sql
CREATE TABLE stackoverflow.postlinks
(
`Id` UInt64,
`CreationDate` DateTime64(3, 'UTC'),
`PostId` Int32,
`RelatedPostId` Int32,
`LinkTypeId` Enum8('Linked' = 1, 'Duplicate' = 3)
)
ENGINE = MergeTree
ORDER BY (PostId, RelatedPostId)
INSERT INTO stackoverflow.postlinks SELECT * FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/parquet/postlinks.parquet')
0 rows in set. Elapsed: 1.534 sec. Processed 6.55 million rows, 129.70 MB (4.27 million rows/s., 84.57 MB/s.)
```
### `PostHistory`
```sql
CREATE TABLE stackoverflow.posthistory
(
`Id` UInt64,
`PostHistoryTypeId` UInt8,
`PostId` Int32,
`RevisionGUID` String,
`CreationDate` DateTime64(3, 'UTC'),
`UserId` Int32,
`Text` String,
`ContentLicense` LowCardinality(String),
`Comment` String,
`UserDisplayName` String
)
ENGINE = MergeTree
ORDER BY (CreationDate, PostId)
INSERT INTO stackoverflow.posthistory SELECT * FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/parquet/posthistory/*.parquet')
0 rows in set. Elapsed: 422.795 sec. Processed 160.79 million rows, 67.08 GB (380.30 thousand rows/s., 158.67 MB/s.)
```
## Original dataset
The original dataset is available in compressed (7zip) XML format at [https://archive.org/download/stackexchange](https://archive.org/download/stackexchange) - files with prefix `stackoverflow.com*`.
### Download
```bash
wget https://archive.org/download/stackexchange/stackoverflow.com-Badges.7z
wget https://archive.org/download/stackexchange/stackoverflow.com-Comments.7z
wget https://archive.org/download/stackexchange/stackoverflow.com-PostHistory.7z
wget https://archive.org/download/stackexchange/stackoverflow.com-PostLinks.7z
wget https://archive.org/download/stackexchange/stackoverflow.com-Posts.7z
wget https://archive.org/download/stackexchange/stackoverflow.com-Users.7z
wget https://archive.org/download/stackexchange/stackoverflow.com-Votes.7z
```
These files are up to 35GB and can take around 30 mins to download depending on internet connection - the download server throttles at around 20MB/sec.
### Convert to JSON
At the time of writing, ClickHouse does not have native support for XML as an input format. To load the data into ClickHouse we first convert to NDJSON.
To convert XML to JSON we recommend the [`xq`](https://github.com/kislyuk/yq) linux tool, a simple `jq` wrapper for XML documents.
Install xq and jq:
```bash
sudo apt install jq
pip install yq
```
The following steps apply to any of the above files. We use the `stackoverflow.com-Posts.7z` file as an example. Modify as required.
Extract the file using [p7zip](https://p7zip.sourceforge.net/). This will produce a single xml file - in this case `Posts.xml`.
> Files are compressed approximately 4.5x. At 22GB compressed, the posts file requires around 97G uncompressed.
```bash
p7zip -d stackoverflow.com-Posts.7z
```
The following splits the xml file into files, each containing 10000 rows.
```bash
mkdir posts
cd posts
# the following splits the input xml file into sub files of 10000 rows
tail +3 ../Posts.xml | head -n -1 | split -l 10000 --filter='{ printf "<rows>\n"; cat - ; printf "</rows>\n"; } > $FILE' -
```
After running the above users will have a set of files, each with 10000 lines. This ensures the memory overhead of the next command is not excessive (xml to JSON conversion is done in memory).
```bash
find . -maxdepth 1 -type f -exec xq -c '.rows.row[]' {} \; | sed -e 's:"@:":g' > posts_v2.json
```
The above command will produce a single `posts.json` file.
Load into ClickHouse with the following command. Note the schema is specified for the `posts.json` file. This will need to be adjusted per data type to align with the target table.
```bash
clickhouse local --query "SELECT * FROM file('posts.json', JSONEachRow, 'Id Int32, PostTypeId UInt8, AcceptedAnswerId UInt32, CreationDate DateTime64(3, \'UTC\'), Score Int32, ViewCount UInt32, Body String, OwnerUserId Int32, OwnerDisplayName String, LastEditorUserId Int32, LastEditorDisplayName String, LastEditDate DateTime64(3, \'UTC\'), LastActivityDate DateTime64(3, \'UTC\'), Title String, Tags String, AnswerCount UInt16, CommentCount UInt8, FavoriteCount UInt8, ContentLicense String, ParentId String, CommunityOwnedDate DateTime64(3, \'UTC\'), ClosedDate DateTime64(3, \'UTC\')') FORMAT Native" | clickhouse client --host <host> --secure --password <password> --query "INSERT INTO stackoverflow.posts_v2 FORMAT Native"
```
## Example queries
A few simple questions to you get started.
### Most popular tags on Stack Overflow
```sql
SELECT
arrayJoin(arrayFilter(t -> (t != ''), splitByChar('|', Tags))) AS Tags,
count() AS c
FROM stackoverflow.posts
GROUP BY Tags
ORDER BY c DESC
LIMIT 10
┌─Tags───────┬───────c─┐
│ javascript │ 2527130 │
│ python │ 2189638 │
│ java │ 1916156 │
│ c# │ 1614236 │
│ php │ 1463901 │
│ android │ 1416442 │
│ html │ 1186567 │
│ jquery │ 1034621 │
│ c++ │ 806202 │
│ css │ 803755 │
└────────────┴─────────┘
10 rows in set. Elapsed: 1.013 sec. Processed 59.82 million rows, 1.21 GB (59.07 million rows/s., 1.19 GB/s.)
Peak memory usage: 224.03 MiB.
```
### User with the most answers (active accounts)
Account requires a `UserId`.
```sql
SELECT
any(OwnerUserId) UserId,
OwnerDisplayName,
count() AS c
FROM stackoverflow.posts WHERE OwnerDisplayName != '' AND PostTypeId='Answer' AND OwnerUserId != 0
GROUP BY OwnerDisplayName
ORDER BY c DESC
LIMIT 5
┌─UserId─┬─OwnerDisplayName─┬────c─┐
│ 22656 │ Jon Skeet │ 2727 │
│ 23354 │ Marc Gravell │ 2150 │
│ 12950 │ tvanfosson │ 1530 │
│ 3043 │ Joel Coehoorn │ 1438 │
│ 10661 │ S.Lott │ 1087 │
└────────┴──────────────────┴──────┘
5 rows in set. Elapsed: 0.154 sec. Processed 35.83 million rows, 193.39 MB (232.33 million rows/s., 1.25 GB/s.)
Peak memory usage: 206.45 MiB.
```
### ClickHouse related posts with the most views
```sql
SELECT
Id,
Title,
ViewCount,
AnswerCount
FROM stackoverflow.posts
WHERE Title ILIKE '%ClickHouse%'
ORDER BY ViewCount DESC
LIMIT 10
┌───────Id─┬─Title────────────────────────────────────────────────────────────────────────────┬─ViewCount─┬─AnswerCount─┐
│ 52355143 │ Is it possible to delete old records from clickhouse table? │ 41462 │ 3 │
│ 37954203 │ Clickhouse Data Import │ 38735 │ 3 │
│ 37901642 │ Updating data in Clickhouse │ 36236 │ 6 │
│ 58422110 │ Pandas: How to insert dataframe into Clickhouse │ 29731 │ 4 │
│ 63621318 │ DBeaver - Clickhouse - SQL Error [159] .. Read timed out │ 27350 │ 1 │
│ 47591813 │ How to filter clickhouse table by array column contents? │ 27078 │ 2 │
│ 58728436 │ How to search the string in query with case insensitive on Clickhouse database? │ 26567 │ 3 │
│ 65316905 │ Clickhouse: DB::Exception: Memory limit (for query) exceeded │ 24899 │ 2 │
│ 49944865 │ How to add a column in clickhouse │ 24424 │ 1 │
│ 59712399 │ How to cast date Strings to DateTime format with extended parsing in ClickHouse? │ 22620 │ 1 │
└──────────┴──────────────────────────────────────────────────────────────────────────────────┴───────────┴─────────────┘
10 rows in set. Elapsed: 0.472 sec. Processed 59.82 million rows, 1.91 GB (126.63 million rows/s., 4.03 GB/s.)
Peak memory usage: 240.01 MiB.
```
### Most controversial posts
```sql
SELECT
Id,
Title,
UpVotes,
DownVotes,
abs(UpVotes - DownVotes) AS Controversial_ratio
FROM stackoverflow.posts
INNER JOIN
(
SELECT
PostId,
countIf(VoteTypeId = 2) AS UpVotes,
countIf(VoteTypeId = 3) AS DownVotes
FROM stackoverflow.votes
GROUP BY PostId
HAVING (UpVotes > 10) AND (DownVotes > 10)
) AS votes ON posts.Id = votes.PostId
WHERE Title != ''
ORDER BY Controversial_ratio ASC
LIMIT 3
┌───────Id─┬─Title─────────────────────────────────────────────┬─UpVotes─┬─DownVotes─┬─Controversial_ratio─┐
│ 583177 │ VB.NET Infinite For Loop │ 12 │ 12 │ 0 │
│ 9756797 │ Read console input as enumerable - one statement? │ 16 │ 16 │ 0 │
│ 13329132 │ What's the point of ARGV in Ruby? │ 22 │ 22 │ 0 │
└──────────┴───────────────────────────────────────────────────┴─────────┴───────────┴─────────────────────┘
3 rows in set. Elapsed: 4.779 sec. Processed 298.80 million rows, 3.16 GB (62.52 million rows/s., 661.05 MB/s.)
Peak memory usage: 6.05 GiB.
```
## Attribution
We thank Stack Overflow for providing this data under the `cc-by-sa 4.0` license, acknowledging their efforts and the original source of the data at [https://archive.org/details/stackexchange](https://archive.org/details/stackexchange).

View File

@ -150,7 +150,6 @@ void LocalServer::initialize(Poco::Util::Application & self)
getClientConfiguration().getUInt("max_io_thread_pool_free_size", 0),
getClientConfiguration().getUInt("io_thread_pool_queue_size", 10000));
const size_t active_parts_loading_threads = getClientConfiguration().getUInt("max_active_parts_loading_thread_pool_size", 64);
getActivePartsLoadingThreadPool().initialize(
active_parts_loading_threads,

View File

@ -13,6 +13,7 @@
#include <fmt/format.h>
#include "config.h"
#include "config_tools.h"
#include <Common/StringUtils.h>
@ -439,6 +440,14 @@ extern "C"
}
#endif
/// Prevent messages from JeMalloc in the release build.
/// Some of these messages are non-actionable for the users, such as:
/// <jemalloc>: Number of CPUs detected is not deterministic. Per-CPU arena disabled.
#if USE_JEMALLOC && defined(NDEBUG) && !defined(SANITIZER)
extern "C" void (*malloc_message)(void *, const char *s);
__attribute__((constructor(0))) void init_je_malloc_message() { malloc_message = [](void *, const char *){}; }
#endif
/// This allows to implement assert to forbid initialization of a class in static constructors.
/// Usage:
///

View File

@ -358,22 +358,18 @@ bool LocalConnection::poll(size_t)
if (!state->is_finished)
{
if (send_progress && (state->after_send_progress.elapsedMicroseconds() >= query_context->getSettingsRef().interactive_delay))
{
state->after_send_progress.restart();
next_packet_type = Protocol::Server::Progress;
if (needSendProgressOrMetrics())
return true;
}
if (send_profile_events && (state->after_send_profile_events.elapsedMicroseconds() >= query_context->getSettingsRef().interactive_delay))
{
sendProfileEvents();
return true;
}
try
{
pollImpl();
while (pollImpl())
{
LOG_DEBUG(&Poco::Logger::get("LocalConnection"), "Executor timeout encountered, will retry");
if (needSendProgressOrMetrics())
return true;
}
}
catch (const Exception & e)
{
@ -468,12 +464,34 @@ bool LocalConnection::poll(size_t)
return false;
}
bool LocalConnection::needSendProgressOrMetrics()
{
if (send_progress && (state->after_send_progress.elapsedMicroseconds() >= query_context->getSettingsRef().interactive_delay))
{
state->after_send_progress.restart();
next_packet_type = Protocol::Server::Progress;
return true;
}
if (send_profile_events && (state->after_send_profile_events.elapsedMicroseconds() >= query_context->getSettingsRef().interactive_delay))
{
sendProfileEvents();
return true;
}
return false;
}
bool LocalConnection::pollImpl()
{
Block block;
auto next_read = pullBlock(block);
if (block && !state->io.null_format)
if (!block && next_read)
{
return true;
}
else if (block && !state->io.null_format)
{
state->block.emplace(block);
}
@ -482,7 +500,7 @@ bool LocalConnection::pollImpl()
state->is_finished = true;
}
return true;
return false;
}
Packet LocalConnection::receivePacket()

View File

@ -151,8 +151,11 @@ private:
void sendProfileEvents();
/// Returns true on executor timeout, meaning a retryable error.
bool pollImpl();
bool needSendProgressOrMetrics();
ContextMutablePtr query_context;
Session session;

View File

@ -187,13 +187,6 @@ size_t FileSegment::getDownloadedSize() const
return downloaded_size;
}
void FileSegment::setDownloadedSize(size_t delta)
{
auto lk = lock();
downloaded_size += delta;
assert(downloaded_size == std::filesystem::file_size(getPath()));
}
bool FileSegment::isDownloaded() const
{
auto lk = lock();
@ -311,6 +304,11 @@ FileSegment::RemoteFileReaderPtr FileSegment::getRemoteFileReader()
return remote_file_reader;
}
FileSegment::LocalCacheWriterPtr FileSegment::getLocalCacheWriter()
{
return cache_writer;
}
void FileSegment::resetRemoteFileReader()
{
auto lk = lock();
@ -340,33 +338,31 @@ void FileSegment::setRemoteFileReader(RemoteFileReaderPtr remote_file_reader_)
remote_file_reader = remote_file_reader_;
}
void FileSegment::write(char * from, size_t size, size_t offset)
void FileSegment::write(char * from, size_t size, size_t offset_in_file)
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FileSegmentWriteMicroseconds);
if (!size)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing zero size is not allowed");
auto file_segment_path = getPath();
{
auto lk = lock();
assertIsDownloaderUnlocked("write", lk);
assertNotDetachedUnlocked(lk);
}
if (!size)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing zero size is not allowed");
const auto file_segment_path = getPath();
{
auto lk = lock();
assertIsDownloaderUnlocked("write", lk);
assertNotDetachedUnlocked(lk);
}
{
if (download_state != State::DOWNLOADING)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Expected DOWNLOADING state, got {}", stateToString(download_state));
const size_t first_non_downloaded_offset = getCurrentWriteOffset();
if (offset != first_non_downloaded_offset)
if (offset_in_file != first_non_downloaded_offset)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Attempt to write {} bytes to offset: {}, but current write offset is {}",
size, offset, first_non_downloaded_offset);
size, offset_in_file, first_non_downloaded_offset);
const size_t current_downloaded_size = getDownloadedSize();
chassert(reserved_size >= current_downloaded_size);
@ -396,10 +392,10 @@ void FileSegment::write(char * from, size_t size, size_t offset)
#endif
if (!cache_writer)
cache_writer = std::make_unique<WriteBufferFromFile>(file_segment_path, /* buf_size */0);
cache_writer = std::make_unique<WriteBufferFromFile>(getPath(), /* buf_size */0);
/// Size is equal to offset as offset for write buffer points to data end.
cache_writer->set(from, size, /* offset */size);
cache_writer->set(from, /* size */size, /* offset */size);
/// Reset the buffer when finished.
SCOPE_EXIT({ cache_writer->set(nullptr, 0); });
/// Flush the buffer.
@ -435,7 +431,6 @@ void FileSegment::write(char * from, size_t size, size_t offset)
}
throw;
}
catch (Exception & e)
{
@ -445,7 +440,7 @@ void FileSegment::write(char * from, size_t size, size_t offset)
throw;
}
chassert(getCurrentWriteOffset() == offset + size);
chassert(getCurrentWriteOffset() == offset_in_file + size);
}
FileSegment::State FileSegment::wait(size_t offset)
@ -828,7 +823,7 @@ bool FileSegment::assertCorrectnessUnlocked(const FileSegmentGuard::Lock & lock)
};
const auto file_path = getPath();
if (segment_kind != FileSegmentKind::Temporary)
{
std::lock_guard lk(write_mutex);
if (downloaded_size == 0)

View File

@ -48,7 +48,7 @@ friend class FileCache; /// Because of reserved_size in tryReserve().
public:
using Key = FileCacheKey;
using RemoteFileReaderPtr = std::shared_ptr<ReadBufferFromFileBase>;
using LocalCacheWriterPtr = std::unique_ptr<WriteBufferFromFile>;
using LocalCacheWriterPtr = std::shared_ptr<WriteBufferFromFile>;
using Downloader = std::string;
using DownloaderId = std::string;
using Priority = IFileCachePriority;
@ -204,7 +204,7 @@ public:
bool reserve(size_t size_to_reserve, size_t lock_wait_timeout_milliseconds, FileCacheReserveStat * reserve_stat = nullptr);
/// Write data into reserved space.
void write(char * from, size_t size, size_t offset);
void write(char * from, size_t size, size_t offset_in_file);
// Invariant: if state() != DOWNLOADING and remote file reader is present, the reader's
// available() == 0, and getFileOffsetOfBufferEnd() == our getCurrentWriteOffset().
@ -212,6 +212,7 @@ public:
// The reader typically requires its internal_buffer to be assigned from the outside before
// calling next().
RemoteFileReaderPtr getRemoteFileReader();
LocalCacheWriterPtr getLocalCacheWriter();
RemoteFileReaderPtr extractRemoteFileReader();
@ -219,8 +220,6 @@ public:
void setRemoteFileReader(RemoteFileReaderPtr remote_file_reader_);
void setDownloadedSize(size_t delta);
void setDownloadFailed();
private:

View File

@ -944,14 +944,7 @@ KeyMetadata::iterator LockedKey::removeFileSegmentImpl(
try
{
const auto path = key_metadata->getFileSegmentPath(*file_segment);
if (file_segment->segment_kind == FileSegmentKind::Temporary)
{
/// FIXME: For temporary file segment the requirement is not as strong because
/// the implementation of "temporary data in cache" creates files in advance.
if (fs::exists(path))
fs::remove(path);
}
else if (file_segment->downloaded_size == 0)
if (file_segment->downloaded_size == 0)
{
chassert(!fs::exists(path));
}

View File

@ -4,6 +4,7 @@
#include <Interpreters/Context.h>
#include <IO/SwapHelper.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/EmptyReadBuffer.h>
#include <base/scope_guard.h>
@ -33,21 +34,20 @@ namespace
}
WriteBufferToFileSegment::WriteBufferToFileSegment(FileSegment * file_segment_)
: WriteBufferFromFileDecorator(std::make_unique<WriteBufferFromFile>(file_segment_->getPath()))
: WriteBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0)
, file_segment(file_segment_)
, reserve_space_lock_wait_timeout_milliseconds(getCacheLockWaitTimeout())
{
}
WriteBufferToFileSegment::WriteBufferToFileSegment(FileSegmentsHolderPtr segment_holder_)
: WriteBufferFromFileDecorator(
segment_holder_->size() == 1
? std::make_unique<WriteBufferFromFile>(segment_holder_->front().getPath())
: throw Exception(ErrorCodes::LOGICAL_ERROR, "WriteBufferToFileSegment can be created only from single segment"))
: WriteBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0)
, file_segment(&segment_holder_->front())
, segment_holder(std::move(segment_holder_))
, reserve_space_lock_wait_timeout_milliseconds(getCacheLockWaitTimeout())
{
if (segment_holder->size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "WriteBufferToFileSegment can be created only from single segment");
}
/// If it throws an exception, the file segment will be incomplete, so you should not use it in the future.
@ -82,9 +82,6 @@ void WriteBufferToFileSegment::nextImpl()
reserve_stat_msg += fmt::format("{} hold {}, can release {}; ",
toString(kind), ReadableSize(stat.non_releasable_size), ReadableSize(stat.releasable_size));
if (std::filesystem::exists(file_segment->getPath()))
std::filesystem::remove(file_segment->getPath());
throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "Failed to reserve {} bytes for {}: {}(segment info: {})",
bytes_to_write,
file_segment->getKind() == FileSegmentKind::Temporary ? "temporary file" : "the file in cache",
@ -95,17 +92,37 @@ void WriteBufferToFileSegment::nextImpl()
try
{
SwapHelper swap(*this, *impl);
/// Write data to the underlying buffer.
impl->next();
file_segment->write(working_buffer.begin(), bytes_to_write, written_bytes);
written_bytes += bytes_to_write;
}
catch (...)
{
LOG_WARNING(getLogger("WriteBufferToFileSegment"), "Failed to write to the underlying buffer ({})", file_segment->getInfoForLog());
throw;
}
}
file_segment->setDownloadedSize(bytes_to_write);
void WriteBufferToFileSegment::finalizeImpl()
{
next();
auto cache_writer = file_segment->getLocalCacheWriter();
if (cache_writer)
{
SwapHelper swap(*this, *cache_writer);
cache_writer->finalize();
}
}
void WriteBufferToFileSegment::sync()
{
next();
auto cache_writer = file_segment->getLocalCacheWriter();
if (cache_writer)
{
SwapHelper swap(*this, *cache_writer);
cache_writer->sync();
}
}
std::unique_ptr<ReadBuffer> WriteBufferToFileSegment::getReadBufferImpl()
@ -114,7 +131,10 @@ std::unique_ptr<ReadBuffer> WriteBufferToFileSegment::getReadBufferImpl()
* because in case destructor called without `getReadBufferImpl` called, data won't be read.
*/
finalize();
return std::make_unique<ReadBufferFromFile>(file_segment->getPath());
if (file_segment->getDownloadedSize() > 0)
return std::make_unique<ReadBufferFromFile>(file_segment->getPath());
else
return std::make_unique<EmptyReadBuffer>();
}
}

View File

@ -9,7 +9,7 @@ namespace DB
class FileSegment;
class WriteBufferToFileSegment : public WriteBufferFromFileDecorator, public IReadableWriteBuffer
class WriteBufferToFileSegment : public WriteBufferFromFileBase, public IReadableWriteBuffer
{
public:
explicit WriteBufferToFileSegment(FileSegment * file_segment_);
@ -17,6 +17,13 @@ public:
void nextImpl() override;
std::string getFileName() const override { return file_segment->getPath(); }
void sync() override;
protected:
void finalizeImpl() override;
private:
std::unique_ptr<ReadBuffer> getReadBufferImpl() override;
@ -29,6 +36,7 @@ private:
FileSegmentsHolderPtr segment_holder;
const size_t reserve_space_lock_wait_timeout_milliseconds;
size_t written_bytes = 0;
};

View File

@ -3,6 +3,8 @@
#include <Interpreters/TemporaryDataOnDisk.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromEmptyFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Interpreters/Cache/FileCache.h>
#include <Formats/NativeWriter.h>
@ -224,25 +226,37 @@ struct TemporaryFileStream::OutputWriter
bool finalized = false;
};
TemporaryFileStream::Reader::Reader(const String & path, const Block & header_, size_t size)
: in_file_buf(path, size ? std::min<size_t>(DBMS_DEFAULT_BUFFER_SIZE, size) : DBMS_DEFAULT_BUFFER_SIZE)
, in_compressed_buf(in_file_buf)
, in_reader(in_compressed_buf, header_, DBMS_TCP_PROTOCOL_VERSION)
TemporaryFileStream::Reader::Reader(const String & path_, const Block & header_, size_t size_)
: path(path_)
, size(size_ ? std::min<size_t>(size_, DBMS_DEFAULT_BUFFER_SIZE) : DBMS_DEFAULT_BUFFER_SIZE)
, header(header_)
{
LOG_TEST(getLogger("TemporaryFileStream"), "Reading {} from {}", header_.dumpStructure(), path);
}
TemporaryFileStream::Reader::Reader(const String & path, size_t size)
: in_file_buf(path, size ? std::min<size_t>(DBMS_DEFAULT_BUFFER_SIZE, size) : DBMS_DEFAULT_BUFFER_SIZE)
, in_compressed_buf(in_file_buf)
, in_reader(in_compressed_buf, DBMS_TCP_PROTOCOL_VERSION)
TemporaryFileStream::Reader::Reader(const String & path_, size_t size_)
: path(path_)
, size(size_ ? std::min<size_t>(size_, DBMS_DEFAULT_BUFFER_SIZE) : DBMS_DEFAULT_BUFFER_SIZE)
{
LOG_TEST(getLogger("TemporaryFileStream"), "Reading from {}", path);
}
Block TemporaryFileStream::Reader::read()
{
return in_reader.read();
if (!in_reader)
{
if (fs::exists(path))
in_file_buf = std::make_unique<ReadBufferFromFile>(path, size);
else
in_file_buf = std::make_unique<ReadBufferFromEmptyFile>();
in_compressed_buf = std::make_unique<CompressedReadBuffer>(*in_file_buf);
if (header.has_value())
in_reader = std::make_unique<NativeReader>(*in_compressed_buf, header.value(), DBMS_TCP_PROTOCOL_VERSION);
else
in_reader = std::make_unique<NativeReader>(*in_compressed_buf, DBMS_TCP_PROTOCOL_VERSION);
}
return in_reader->read();
}
TemporaryFileStream::TemporaryFileStream(TemporaryFileOnDiskHolder file_, const Block & header_, TemporaryDataOnDisk * parent_)

View File

@ -151,9 +151,13 @@ public:
Block read();
ReadBufferFromFile in_file_buf;
CompressedReadBuffer in_compressed_buf;
NativeReader in_reader;
const std::string path;
const size_t size;
const std::optional<Block> header;
std::unique_ptr<ReadBufferFromFileBase> in_file_buf;
std::unique_ptr<CompressedReadBuffer> in_compressed_buf;
std::unique_ptr<NativeReader> in_reader;
};
struct Stat

View File

@ -1759,11 +1759,14 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::un
ThreadPoolCallbackRunnerLocal<void> runner(getActivePartsLoadingThreadPool().get(), "ActiveParts");
bool all_disks_are_readonly = true;
for (size_t i = 0; i < disks.size(); ++i)
{
const auto & disk_ptr = disks[i];
if (disk_ptr->isBroken())
continue;
if (!disk_ptr->isReadOnly())
all_disks_are_readonly = false;
auto & disk_parts = parts_to_load_by_disk[i];
auto & unexpected_disk_parts = unexpected_parts_to_load_by_disk[i];
@ -1916,7 +1919,6 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::un
if (suspicious_broken_unexpected_parts != 0)
LOG_WARNING(log, "Found suspicious broken unexpected parts {} with total rows count {}", suspicious_broken_unexpected_parts, suspicious_broken_unexpected_parts_bytes);
if (!is_static_storage)
for (auto & part : broken_parts_to_detach)
part->renameToDetached("broken-on-start"); /// detached parts must not have '_' in prefixes
@ -1961,7 +1963,8 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::un
unloaded_parts.push_back(node);
});
if (!unloaded_parts.empty())
/// By the way, if all disks are readonly, it does not make sense to load outdated parts (we will not own them).
if (!unloaded_parts.empty() && !all_disks_are_readonly)
{
LOG_DEBUG(log, "Found {} outdated data parts. They will be loaded asynchronously", unloaded_parts.size());

View File

@ -9,7 +9,7 @@ set -xeuo pipefail
echo "Running prepare script"
export DEBIAN_FRONTEND=noninteractive
export RUNNER_VERSION=2.316.1
export RUNNER_VERSION=2.317.0
export RUNNER_HOME=/home/ubuntu/actions-runner
deb_arch() {
@ -54,7 +54,8 @@ apt-get install --yes --no-install-recommends \
python3-dev \
python3-pip \
qemu-user-static \
unzip
unzip \
gh
# Install docker
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg
@ -101,7 +102,7 @@ sudo -u ubuntu docker buildx version
sudo -u ubuntu docker buildx rm default-builder || : # if it's the second attempt
sudo -u ubuntu docker buildx create --use --name default-builder
pip install boto3 pygithub requests urllib3 unidiff dohq-artifactory
pip install boto3 pygithub requests urllib3 unidiff dohq-artifactory jwt
rm -rf $RUNNER_HOME # if it's the second attempt
mkdir -p $RUNNER_HOME && cd $RUNNER_HOME
@ -212,9 +213,9 @@ chmod +x /usr/local/share/scripts/init-network.sh
touch /var/tmp/clickhouse-ci-ami.success
# END OF THE SCRIPT
# TOE description
# TOE (Task Orchestrator and Executor) description
# name: CIInfrastructurePrepare
# description: instals the infrastructure for ClickHouse CI runners
# description: installs the infrastructure for ClickHouse CI runners
# schemaVersion: 1.0
#
# phases:

View File

@ -2,6 +2,4 @@ Instruction check fail. The CPU does not support SSSE3 instruction set.
Instruction check fail. The CPU does not support SSE4.1 instruction set.
Instruction check fail. The CPU does not support SSE4.2 instruction set.
Instruction check fail. The CPU does not support POPCNT instruction set.
<jemalloc>: MADV_DONTNEED does not work (memset will be used instead)
<jemalloc>: (This is the expected behaviour if you are running under QEMU)
1

View File

@ -1,6 +1,6 @@
#!/usr/bin/env bash
# Tags: no-tsan, no-asan, no-ubsan, no-msan, no-debug, no-fasttest, no-cpu-aarch64
# Tag no-fasttest: avoid dependency on qemu -- invonvenient when running locally
# Tag no-fasttest: avoid dependency on qemu -- inconvenient when running locally
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh

View File

@ -1,5 +1,3 @@
<jemalloc>: Number of CPUs detected is not deterministic. Per-CPU arena disabled.
1
<jemalloc>: Number of CPUs detected is not deterministic. Per-CPU arena disabled.
100000000
1

View File

@ -1,5 +1,5 @@
#!/usr/bin/env bash
# Tags: no-tsan, no-asan, no-msan, no-ubsan, no-fasttest
# Tags: no-tsan, no-asan, no-msan, no-ubsan, no-fasttest, no-debug
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# NOTE: jemalloc is disabled under sanitizers

View File

@ -14,7 +14,7 @@ for _ in {1..10}; do
${CLICKHOUSE_LOCAL} -q 'select * from numbers_mt(100000000) settings max_threads=100 FORMAT Null'
# Binding to specific CPU is not required, but this makes the test more reliable.
taskset --cpu-list 0 ${CLICKHOUSE_LOCAL} -q 'select * from numbers_mt(100000000) settings max_threads=100 FORMAT Null' 2>&1 | {
# build with santiziers does not have jemalloc
# build with sanitiziers does not have jemalloc
# and for jemalloc we have separate test
# 01502_jemalloc_percpu_arena
grep -v '<jemalloc>: Number of CPUs detected is not deterministic. Per-CPU arena disabled.'

View File

@ -22,7 +22,7 @@ create table test (a Int32) engine = MergeTree() order by tuple()
settings disk=disk(name='test2',
type = object_storage,
object_storage_type = s3,
metadata_storage_type = local,
metadata_type = local,
endpoint = 'http://localhost:11111/test/common/',
access_key_id = clickhouse,
secret_access_key = clickhouse);
@ -32,7 +32,7 @@ create table test (a Int32) engine = MergeTree() order by tuple()
settings disk=disk(name='test3',
type = object_storage,
object_storage_type = s3,
metadata_storage_type = local,
metadata_type = local,
metadata_keep_free_space_bytes = 1024,
endpoint = 'http://localhost:11111/test/common/',
access_key_id = clickhouse,
@ -43,7 +43,7 @@ create table test (a Int32) engine = MergeTree() order by tuple()
settings disk=disk(name='test4',
type = object_storage,
object_storage_type = s3,
metadata_storage_type = local,
metadata_type = local,
metadata_keep_free_space_bytes = 0,
endpoint = 'http://localhost:11111/test/common/',
access_key_id = clickhouse,

47
utils/backup/backup Executable file
View File

@ -0,0 +1,47 @@
#!/bin/bash
user="default"
path="."
usage() {
echo
echo "A trivial script to upload your files into ClickHouse."
echo "You might want to use something like Dropbox instead, but..."
echo
echo "Usage: $0 --host <hostname> [--user <username>] --password <password> <path>"
exit 1
}
while [[ "$#" -gt 0 ]]; do
case "$1" in
--host)
host="$2"
shift 2
;;
--user)
user="$2"
shift 2
;;
--password)
password="$2"
shift 2
;;
--help)
usage
;;
*)
path="$1"
shift 1
;;
esac
done
if [ -z "$host" ] || [ -z "$password" ]; then
echo "Error: --host and --password are mandatory."
usage
fi
clickhouse-client --host "$host" --user "$user" --password "$password" --secure --query "CREATE TABLE IF NOT EXISTS default.files (time DEFAULT now(), path String, content String CODEC(ZSTD(6))) ENGINE = MergeTree ORDER BY (path, time)" &&
find "$path" -type f | clickhouse-local --input-format LineAsString \
--max-block-size 1 --min-insert-block-size-rows 0 --min-insert-block-size-bytes '100M' --max-insert-threads 1 \
--query "INSERT INTO FUNCTION remoteSecure('$host', default.files, '$user', '$password') (path, content) SELECT line, file(line) FROM table" --progress

View File

@ -575,6 +575,7 @@ MySQLDump
MySQLThreads
NATS
NCHAR
NDJSON
NEKUDOTAYIM
NEWDATE
NEWDECIMAL
@ -717,6 +718,8 @@ PlantUML
PointDistKm
PointDistM
PointDistRads
PostHistory
PostLink
PostgreSQLConnection
PostgreSQLThreads
Postgres
@ -2516,6 +2519,7 @@ sqlite
sqrt
src
srcReplicas
stackoverflow
stacktrace
stacktraces
startsWith
@ -2854,6 +2858,7 @@ userver
utils
uuid
uuidv
vCPU
varPop
varPopStable
varSamp