Merge pull request #21306 from azat/unused-writers

[RFC] Remove unused writers
This commit is contained in:
alexey-milovidov 2021-02-28 21:47:01 +03:00 committed by GitHub
commit b924b28c57
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 0 additions and 162 deletions

View File

@ -1,71 +0,0 @@
#pragma once
#include <vector>
#include <Common/ThreadPool.h>
#include <Common/MemoryTracker.h>
#include <IO/WriteBuffer.h>
namespace DB
{
/** Writes data asynchronously using double buffering.
*/
class AsynchronousWriteBuffer : public WriteBuffer
{
private:
WriteBuffer & out; /// The main buffer, responsible for writing data.
std::vector <char> memory; /// A piece of memory for duplicating the buffer.
ThreadPool pool; /// For asynchronous data writing.
bool started; /// Has an asynchronous data write started?
/// Swap the main and duplicate buffers.
void swapBuffers()
{
swap(out);
}
void nextImpl() override
{
if (!offset())
return;
if (started)
pool.wait();
else
started = true;
swapBuffers();
/// The data will be written in separate stream.
pool.scheduleOrThrowOnError([this] { thread(); });
}
public:
AsynchronousWriteBuffer(WriteBuffer & out_) : WriteBuffer(nullptr, 0), out(out_), memory(out.buffer().size()), pool(1), started(false)
{
/// Data is written to the duplicate buffer.
set(memory.data(), memory.size());
}
~AsynchronousWriteBuffer() override
{
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
if (started)
pool.wait();
swapBuffers();
out.next();
}
/// That is executed in a separate thread
void thread()
{
out.next();
}
};
}

View File

@ -1,30 +0,0 @@
#include <common/types.h>
#include <Common/hex.h>
#include <Common/MemoryTracker.h>
#include <IO/HexWriteBuffer.h>
namespace DB
{
void HexWriteBuffer::nextImpl()
{
if (!offset())
return;
for (Position p = working_buffer.begin(); p != pos; ++p)
{
UInt8 byte = *p;
out.write(hexDigitUppercase(byte / 16));
out.write(hexDigitUppercase(byte % 16));
}
}
HexWriteBuffer::~HexWriteBuffer()
{
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
nextImpl();
}
}

View File

@ -1,28 +0,0 @@
#pragma once
#include <IO/WriteBuffer.h>
/// Since HexWriteBuffer is often created in the inner loop, we'll make its buffer size small.
#define DBMS_HEX_WRITE_BUFFER_SIZE 32
namespace DB
{
/** Everything that is written into it, translates to HEX (in capital letters) and writes to another WriteBuffer.
*/
class HexWriteBuffer final : public WriteBuffer
{
protected:
char buf[DBMS_HEX_WRITE_BUFFER_SIZE]; //-V730
WriteBuffer & out;
void nextImpl() override;
public:
HexWriteBuffer(WriteBuffer & out_) : WriteBuffer(buf, sizeof(buf)), out(out_) {}
~HexWriteBuffer() override;
};
}

View File

@ -25,9 +25,6 @@ target_link_libraries (var_uint PRIVATE clickhouse_common_io)
add_executable (read_escaped_string read_escaped_string.cpp) add_executable (read_escaped_string read_escaped_string.cpp)
target_link_libraries (read_escaped_string PRIVATE clickhouse_common_io) target_link_libraries (read_escaped_string PRIVATE clickhouse_common_io)
add_executable (async_write async_write.cpp)
target_link_libraries (async_write PRIVATE dbms)
add_executable (parse_int_perf parse_int_perf.cpp) add_executable (parse_int_perf parse_int_perf.cpp)
target_link_libraries (parse_int_perf PRIVATE clickhouse_common_io) target_link_libraries (parse_int_perf PRIVATE clickhouse_common_io)

View File

@ -1,26 +0,0 @@
#include <iostream>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/AsynchronousWriteBuffer.h>
#include <IO/copyData.h>
#include <Compression/CompressedWriteBuffer.h>
int main(int, char **)
try
{
DB::ReadBufferFromFileDescriptor in1(STDIN_FILENO);
DB::WriteBufferFromFileDescriptor out1(STDOUT_FILENO);
DB::AsynchronousWriteBuffer out2(out1);
DB::CompressedWriteBuffer out3(out2);
DB::copyData(in1, out3);
return 0;
}
catch (const DB::Exception & e)
{
std::cerr << e.what() << ", " << e.displayText() << std::endl;
return 1;
}

View File

@ -62,7 +62,6 @@ int main(int argc, char ** argv)
{ {
DB::WriteBufferFromVector wb(formatted); DB::WriteBufferFromVector wb(formatted);
// DB::CompressedWriteBuffer wb2(wb1); // DB::CompressedWriteBuffer wb2(wb1);
// DB::AsynchronousWriteBuffer wb(wb2);
Stopwatch watch; Stopwatch watch;
UInt64 tsc = rdtsc(); UInt64 tsc = rdtsc();

View File

@ -29,7 +29,6 @@ SRCS(
HTTPChunkedReadBuffer.cpp HTTPChunkedReadBuffer.cpp
HTTPCommon.cpp HTTPCommon.cpp
HashingWriteBuffer.cpp HashingWriteBuffer.cpp
HexWriteBuffer.cpp
LZMADeflatingWriteBuffer.cpp LZMADeflatingWriteBuffer.cpp
LZMAInflatingReadBuffer.cpp LZMAInflatingReadBuffer.cpp
LimitReadBuffer.cpp LimitReadBuffer.cpp

View File

@ -6,7 +6,6 @@
#include <Common/StringUtils/StringUtils.h> #include <Common/StringUtils/StringUtils.h>
#include <Common/parseAddress.h> #include <Common/parseAddress.h>
#include <Core/Settings.h> #include <Core/Settings.h>
#include <IO/HexWriteBuffer.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <Poco/Util/AbstractConfiguration.h> #include <Poco/Util/AbstractConfiguration.h>

View File

@ -13,7 +13,6 @@
#include <Functions/FunctionFactory.h> #include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h> #include <Functions/IFunction.h>
#include <IO/ConcatReadBuffer.h> #include <IO/ConcatReadBuffer.h>
#include <IO/HexWriteBuffer.h>
#include <IO/Operators.h> #include <IO/Operators.h>
#include <IO/ReadBufferFromMemory.h> #include <IO/ReadBufferFromMemory.h>
#include <IO/WriteBufferFromString.h> #include <IO/WriteBufferFromString.h>