См. <https://github.com/toddwschneider/nyc-taxi-data>
и <http://tech.marksblogg.com/billion-nyc-taxi-rides-redshift.html>
для описания набора данных и инструкций по загрузке.
После скачивания получится порядка 227 Гб несжатых данных в CSV файлах. Скачивание занимает порядка часа на 1 Гбит соединении (параллельное скачивание с s3.amazonaws.com утилизирует как минимум половину гигабитного канала).
Некоторые файлы могут скачаться не полностью. Проверьте размеры файлов и скачайте повторно подозрительные.
Некоторые файлы могут содержать некорректные строки. Их можно скорректировать следующим образом:
```bash
sed -E '/(.*,){18,}/d' data/yellow_tripdata_2010-02.csv > data/yellow_tripdata_2010-02.csv_
sed -E '/(.*,){18,}/d' data/yellow_tripdata_2010-03.csv > data/yellow_tripdata_2010-03.csv_
Далее данные должны быть предобработаны в PostgreSQL. Будут сделаны выборки точек в полигонах (для установки соответствия точек на карте с районами Нью-Йорка) и объединение всех данных в одну денормализованную плоскую таблицу с помощью JOIN. Для этого потребуется установить PostgreSQL с поддержкой PostGIS.
При запуске `initialize_database.sh`, будьте осторожны и вручную перепроверьте, что все таблицы корректно создались.
Обработка каждого месяца данных в PostgreSQL занимает около 20-30 минут, в сумме порядка 48 часов.
Проверить количество загруженных строк можно следующим образом:
```text
time psql nyc-taxi-data -c "SELECT count(*) FROM trips;"
LEFT JOIN central_park_weather_observations_raw weather
ON weather.date = trips.pickup_datetime::date
LEFT JOIN nyct2010 pick_up
ON pick_up.gid = trips.pickup_nyct2010_gid
LEFT JOIN nyct2010 drop_off
ON drop_off.gid = trips.dropoff_nyct2010_gid
) TO '/opt/milovidov/nyc-taxi-data/trips.tsv';
```
Слепок данных создается со скоростью около 50 Мб в секунду. Во время создания слепка, PostgreSQL читает с диска со скоростью около 28 Мб в секунду.
Это занимает около 5 часов. Результирующий tsv файл имеет размер в 590612904969 байт.
Создание временной таблицы в ClickHouse:
```sql
CREATE TABLE trips
(
trip_id UInt32,
vendor_id String,
pickup_datetime DateTime,
dropoff_datetime Nullable(DateTime),
store_and_fwd_flag Nullable(FixedString(1)),
rate_code_id Nullable(UInt8),
pickup_longitude Nullable(Float64),
pickup_latitude Nullable(Float64),
dropoff_longitude Nullable(Float64),
dropoff_latitude Nullable(Float64),
passenger_count Nullable(UInt8),
trip_distance Nullable(Float64),
fare_amount Nullable(Float32),
extra Nullable(Float32),
mta_tax Nullable(Float32),
tip_amount Nullable(Float32),
tolls_amount Nullable(Float32),
ehail_fee Nullable(Float32),
improvement_surcharge Nullable(Float32),
total_amount Nullable(Float32),
payment_type Nullable(String),
trip_type Nullable(UInt8),
pickup Nullable(String),
dropoff Nullable(String),
cab_type Nullable(String),
precipitation Nullable(UInt8),
snow_depth Nullable(UInt8),
snowfall Nullable(UInt8),
max_temperature Nullable(UInt8),
min_temperature Nullable(UInt8),
average_wind_speed Nullable(UInt8),
pickup_nyct2010_gid Nullable(UInt8),
pickup_ctlabel Nullable(String),
pickup_borocode Nullable(UInt8),
pickup_boroname Nullable(String),
pickup_ct2010 Nullable(String),
pickup_boroct2010 Nullable(String),
pickup_cdeligibil Nullable(FixedString(1)),
pickup_ntacode Nullable(String),
pickup_ntaname Nullable(String),
pickup_puma Nullable(String),
dropoff_nyct2010_gid Nullable(UInt8),
dropoff_ctlabel Nullable(String),
dropoff_borocode Nullable(UInt8),
dropoff_boroname Nullable(String),
dropoff_ct2010 Nullable(String),
dropoff_boroct2010 Nullable(String),
dropoff_cdeligibil Nullable(String),
dropoff_ntacode Nullable(String),
dropoff_ntaname Nullable(String),
dropoff_puma Nullable(String)
) ENGINE = Log;
```
Она нужна для преобразование полей к более правильным типам данных и, если возможно, чтобы избавиться от NULL'ов.
```text
time clickhouse-client --query="INSERT INTO trips FORMAT TabSeparated" <trips.tsv
real 75m56.214s
```
Данные читаются со скоростью 112-140 Мб в секунду.
Загрузка данных в таблицу типа Log в один поток заняла 76 минут.
Данные в этой таблице занимают 142 Гб.
(Импорт данных напрямую из Postgres также возможен с использованием `COPY ... TO PROGRAM`.)
К сожалению, все поля, связанные с погодой (precipitation...average_wind_speed) заполнены NULL. Из-за этого мы исключим их из финального набора данных.
Для начала мы создадим таблицу на одном сервере. Позже мы сделаем таблицу распределенной.
toUInt16(ifNull(dropoff_puma, '0')) AS dropoff_puma
FROM trips
```
Это занимает 3030 секунд со скоростью около 428 тысяч строк в секунду.
Для более короткого времени загрузки, можно создать таблицу с движком `Log` вместо `MergeTree`. В этом случае загрузка отработает быстрее, чем за 200 секунд.
Таблица заняла 126 Гб дискового пространства.
```text
:) SELECT formatReadableSize(sum(bytes)) FROM system.parts WHERE table = 'trips_mergetree' AND active
SELECT formatReadableSize(sum(bytes))
FROM system.parts
WHERE (table = 'trips_mergetree') AND active
┌─formatReadableSize(sum(bytes))─┐
│ 126.18 GiB │
└────────────────────────────────┘
```
Между прочим, на MergeTree можно запустить запрос OPTIMIZE. Но это не обязательно, всё будет в порядке и без этого.
SELECT cab_type, count(*) FROM trips_mergetree GROUP BY cab_type
```
0.490 секунд.
Q2:
```sql
SELECT passenger_count, avg(total_amount) FROM trips_mergetree GROUP BY passenger_count
```
1.224 секунд.
Q3:
```sql
SELECT passenger_count, toYear(pickup_date) AS year, count(*) FROM trips_mergetree GROUP BY passenger_count, year
```
2.104 секунд.
Q4:
```sql
SELECT passenger_count, toYear(pickup_date) AS year, round(trip_distance) AS distance, count(*)
FROM trips_mergetree
GROUP BY passenger_count, year, distance
ORDER BY year, count(*) DESC
```
3.593 секунд.
Использовался следующий сервер:
Два Intel(R) Xeon(R) CPU E5-2650 v2 @ 2.60GHz, в сумме 16 физических ядер,
128 GiB RAM,
8x6 TB HD на программном RAID-5
Время выполнения — лучшее из трех запусков
На самом деле начиная со второго запуска, запросы читают данные из кеша страниц файловой системы. Никакого дальнейшего кеширования не происходит: данные зачитываются и обрабатываются при каждом запуске.
INSERT INTO trips_mergetree_x3 SELECT * FROM trips_mergetree
```
Это занимает 2454 секунд.
На трёх серверах:
Q1: 0.212 секунд.
Q2: 0.438 секунд.
Q3: 0.733 секунд.
Q4: 1.241 секунд.
Никакого сюрприза, так как запросы масштабируются линейно.
Также у нас есть результаты с кластера из 140 серверов:
Q1: 0.028 sec.
Q2: 0.043 sec.
Q3: 0.051 sec.
Q4: 0.072 sec.
В этом случае, время выполнения запросов определяется в первую очередь сетевыми задержками..
Мы выполняли запросы с помощью клиента, расположенного в датацентре Яндекса в Мянтсяля (Финляндия), на кластер в России, что добавляет порядка 20 мс задержки.