Перейти к основному содержимому
Перейти к основному содержимому

Клиент ClickHouse на Rust

Официальный клиент на Rust для подключения к ClickHouse, первоначально разработанный Paul Loyd. Исходный код клиента доступен в репозитории GitHub.

Обзор

  • Использует serde для кодирования/декодирования строк.
  • Поддерживает атрибуты serde: skip_serializing, skip_deserializing, rename.
  • Использует формат RowBinary поверх HTTP-транспорта.
    • Планируется переход на Native поверх TCP.
  • Поддерживает TLS (через функции native-tls и rustls-tls).
  • Поддерживает сжатие и разжатие (LZ4).
  • Предоставляет API для выборки или вставки данных, выполнения операторов DDL и пакетной отправки на стороне клиента.
  • Предоставляет удобные заглушки (mocks) для модульного тестирования.

Установка

Чтобы использовать этот крейт, добавьте следующее в свой Cargo.toml:

[dependencies]
clickhouse = "0.12.2"

[dev-dependencies]
clickhouse = { version = "0.12.2", features = ["test-util"] }

См. также страницу пакета на crates.io: clickhouse.

Возможности Cargo

  • lz4 (включена по умолчанию) — включает варианты Compression::Lz4 и Compression::Lz4Hc(_). При включённой опции Compression::Lz4 используется по умолчанию для всех запросов, кроме WATCH.
  • native-tls — поддерживает URL со схемой HTTPS через hyper-tls, который линкуется с OpenSSL.
  • rustls-tls — поддерживает URL со схемой HTTPS через hyper-rustls, который не линкуется с OpenSSL.
  • inserter — включает client.inserter().
  • test-util — добавляет моки. См. пример. Используйте только в dev-dependencies.
  • watch — включает функциональность client.watch. См. соответствующий раздел для подробностей.
  • uuid — добавляет serde::uuid для работы с крейтом uuid.
  • time — добавляет serde::time для работы с крейтом time.
Ссылки

При подключении к ClickHouse по URL со схемой HTTPS должна быть включена одна из возможностей: native-tls или rustls-tls. Если включены обе, приоритет будет у rustls-tls.

Совместимость с версиями ClickHouse

Клиент совместим с LTS-версиями и более новыми версиями ClickHouse, а также с ClickHouse Cloud.

Сервер ClickHouse версии ниже v22.6 обрабатывает RowBinary некорректно в некоторых редких случаях. Вы можете использовать v0.11+ и включить флаг функции wa-37420, чтобы решить эту проблему. Примечание: этот флаг не следует использовать с более новыми версиями ClickHouse.

Примеры

Мы стремимся охватить различные сценарии использования клиента с помощью примеров в репозитории клиента. Обзор приведён в файле README для examples.

Если что‑то остаётся непонятным или чего‑то не хватает в примерах или в приведённой ниже документации, вы можете связаться с нами.

Использование

Примечание

Crate ch2rs полезен для генерации типа строки из ClickHouse.

Создание экземпляра клиента

Совет

Повторно используйте созданные клиенты или клонируйте их, чтобы повторно использовать базовый пул соединений hyper.

use clickhouse::Client;

let client = Client::default()
    // should include both protocol and port
    .with_url("http://localhost:8123")
    .with_user("name")
    .with_password("123")
    .with_database("test");

Подключение по HTTPS или к ClickHouse Cloud

HTTPS поддерживается при использовании cargo-фич rustls-tls или native-tls.

Далее создайте клиент как обычно. В этом примере переменные окружения используются для хранения параметров подключения:

Ссылки

URL-адрес должен включать и протокол, и порт, например https://instance.clickhouse.cloud:8443.

fn read_env_var(key: &str) -> String {
    env::var(key).unwrap_or_else(|_| panic!("{key} env variable should be set"))
}

let client = Client::default()
    .with_url(read_env_var("CLICKHOUSE_URL"))
    .with_user(read_env_var("CLICKHOUSE_USER"))
    .with_password(read_env_var("CLICKHOUSE_PASSWORD"));

См. также:

  • Пример HTTPS с ClickHouse Cloud в репозитории клиента. Он должен подойти и для HTTPS-подключений к on-premise-инсталляциям.

Выборка строк

use serde::Deserialize;
use clickhouse::Row;
use clickhouse::sql::Identifier;

#[derive(Row, Deserialize)]
struct MyRow<'a> {
    no: u32,
    name: &'a str,
}

let table_name = "some";
let mut cursor = client
    .query("SELECT ?fields FROM ? WHERE no BETWEEN ? AND ?")
    .bind(Identifier(table_name))
    .bind(500)
    .bind(504)
    .fetch::<MyRow<'_>>()?;

while let Some(row) = cursor.next().await? { .. }
  • Заполнитель ?fields заменяется на no, name (поля Row).
  • Заполнитель ? заменяется значениями в последующих вызовах bind().
  • Удобные методы fetch_one::<Row>() и fetch_all::<Row>() можно использовать, чтобы получить первую строку или все строки соответственно.
  • sql::Identifier можно использовать для привязки имен таблиц.

NB: так как весь ответ передается потоком, курсоры могут вернуть ошибку даже после того, как выдали некоторые строки. Если это происходит в вашем сценарии, вы можете попробовать query(...).with_option("wait_end_of_query", "1"), чтобы включить буферизацию ответа на стороне сервера. Подробнее. Опция buffer_size также может быть полезной.

Примечание

Используйте wait_end_of_query с осторожностью при выборке строк, так как это может привести к повышенному потреблению памяти на стороне сервера и, скорее всего, снизит общую производительность.

Добавление строк

use serde::Serialize;
use clickhouse::Row;

#[derive(Row, Serialize)]
struct MyRow {
    no: u32,
    name: String,
}

let mut insert = client.insert("some")?;
insert.write(&MyRow { no: 0, name: "foo".into() }).await?;
insert.write(&MyRow { no: 1, name: "bar".into() }).await?;
insert.end().await?;
  • Если end() не был вызван, INSERT прерывается.
  • Строки отправляются постепенно, потоком, чтобы распределить сетевую нагрузку.
  • ClickHouse выполняет вставку батчей атомарно только в том случае, если все строки попадают в одну и ту же партицию и их количество меньше max_insert_block_size.

Асинхронная вставка (серверное формирование батчей)

Вы можете использовать асинхронные вставки ClickHouse, чтобы избежать формирования батчей входящих данных на стороне клиента. Это можно сделать, просто указав опцию async_insert в методе insert (или даже в экземпляре Client, чтобы она применялась ко всем вызовам insert).

let client = Client::default()
    .with_url("http://localhost:8123")
    .with_option("async_insert", "1")
    .with_option("wait_for_async_insert", "0");

См. также:

Возможность Inserter (клиентская пакетная вставка)

Требуется включить функцию Cargo inserter.

let mut inserter = client.inserter("some")?
    .with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20)))
    .with_max_bytes(50_000_000)
    .with_max_rows(750_000)
    .with_period(Some(Duration::from_secs(15)));

inserter.write(&MyRow { no: 0, name: "foo".into() })?;
inserter.write(&MyRow { no: 1, name: "bar".into() })?;
let stats = inserter.commit().await?;
if stats.rows > 0 {
    println!(
        "{} bytes, {} rows, {} transactions have been inserted",
        stats.bytes, stats.rows, stats.transactions,
    );
}

// don't forget to finalize the inserter during the application shutdown
// and commit the remaining rows. `.end()` will provide stats as well.
inserter.end().await?;
  • Inserter завершает активную вставку в commit(), если достигнут любой из порогов (max_bytes, max_rows, period).
  • Интервал между завершениями активных INSERT может быть скорректирован с помощью with_period_bias, чтобы избежать пиков нагрузки при параллельных вставках.
  • Inserter::time_left() можно использовать, чтобы определить, когда закончится текущий период. Вызовите Inserter::commit() ещё раз, чтобы проверить лимиты, если ваш поток редко эмитирует элементы.
  • Временные пороги реализованы с использованием крейта quanta, чтобы ускорить работу inserter. Не используется, если включён test-util (таким образом, временем можно управлять через tokio::time::advance() в пользовательских тестах).
  • Все строки между вызовами commit() вставляются в одном операторе INSERT.
Примечание

Не забудьте выполнить flush, если вы хотите завершить/финализировать вставку:

inserter.end().await?;

Выполнение DDL-операций

При одноузловом развертывании достаточно выполнить DDL-операции следующим образом:

client.query("DROP TABLE IF EXISTS some").execute().await?;

Однако в кластерных развертываниях с балансировщиком нагрузки или в ClickHouse Cloud рекомендуется дожидаться, пока DDL не будет применена на всех репликах, используя опцию wait_end_of_query. Это можно сделать следующим образом:

client
    .query("DROP TABLE IF EXISTS some")
    .with_option("wait_end_of_query", "1")
    .execute()
    .await?;

Настройки ClickHouse

Вы можете применять различные настройки ClickHouse с помощью метода with_option. Например:

let numbers = client
    .query("SELECT number FROM system.numbers")
    // This setting will be applied to this particular query only;
    // it will override the global client setting.
    .with_option("limit", "3")
    .fetch_all::<u64>()
    .await?;

Помимо query, аналогично работают методы insert и inserter; также тот же метод можно вызвать у экземпляра Client, чтобы задать глобальные настройки, применяемые ко всем запросам.

Идентификатор запроса

С помощью .with_option вы можете установить параметр query_id, чтобы идентифицировать запросы в журнале запросов ClickHouse.

let numbers = client
    .query("SELECT number FROM system.numbers LIMIT 1")
    .with_option("query_id", "some-query-id")
    .fetch_all::<u64>()
    .await?;

Помимо query, этот метод аналогичным образом работает с методами insert и inserter.

Опасность

Если вы задаёте query_id вручную, убедитесь, что он уникален. Для этого хорошо подходят идентификаторы UUID.

См. также: пример query_id в репозитории клиента.

ID сессии

Аналогично query_id, вы можете задать session_id, чтобы выполнять команды в одной и той же сессии. session_id можно задать либо глобально на уровне клиента, либо для каждого вызова query, insert или inserter.

let client = Client::default()
    .with_url("http://localhost:8123")
    .with_option("session_id", "my-session");
Опасность

В кластерных развертываниях из‑за отсутствия «липких сессий» необходимо оставаться подключённым к конкретному узлу кластера, чтобы корректно использовать эту возможность, поскольку, например, балансировщик нагрузки с алгоритмом round-robin не гарантирует, что последующие запросы будут обрабатываться тем же самым узлом ClickHouse.

См. также: пример session_id в репозитории клиента.

Пользовательские HTTP-заголовки

Если вы используете аутентификацию на прокси-сервере или вам нужно передать дополнительные заголовки, вы можете сделать это так:

let client = Client::default()
    .with_url("http://localhost:8123")
    .with_header("X-My-Header", "hello");

См. также: пример пользовательских HTTP-заголовков в репозитории клиента.

Пользовательский HTTP‑клиент

Это может быть полезно для тонкой настройки параметров нижележащего пула HTTP‑соединений.

use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client as HyperClient;
use hyper_util::rt::TokioExecutor;

let connector = HttpConnector::new(); // or HttpsConnectorBuilder
let hyper_client = HyperClient::builder(TokioExecutor::new())
    // For how long keep a particular idle socket alive on the client side (in milliseconds).
    // It is supposed to be a fair bit less that the ClickHouse server KeepAlive timeout,
    // which was by default 3 seconds for pre-23.11 versions, and 10 seconds after that.
    .pool_idle_timeout(Duration::from_millis(2_500))
    // Sets the maximum idle Keep-Alive connections allowed in the pool.
    .pool_max_idle_per_host(4)
    .build(connector);

let client = Client::with_http_client(hyper_client).with_url("http://localhost:8123");
Примечание

Этот пример основан на устаревшем Hyper API и в будущем может быть изменён.

См. также: пример с пользовательским HTTP‑клиентом в репозитории клиента.

Типы данных

  • (U)Int(8|16|32|64|128) сопоставляется с соответствующими типами (u|i)(8|16|32|64|128) или newtype-обёртками над ними.

  • (U)Int256 не поддерживается напрямую, но существует обходной путь.

  • Float(32|64) сопоставляется с соответствующими типами f(32|64) или newtype-обёртками над ними.

  • Decimal(32|64|128) сопоставляется с соответствующими типами i(32|64|128) или newtype-обёртками над ними. Удобнее использовать fixnum или другую реализацию знаковых чисел с фиксированной запятой.

  • Boolean сопоставляется с bool или newtype-обёртками над ним.

  • String сопоставляется с любыми строковыми или байтовыми типами, например &str, &[u8], String, Vec<u8> или SmartString. Также поддерживаются новые типы. Для хранения байтов рассмотрите использование serde_bytes, так как это более эффективно.

#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow<'a> {
    str: &'a str,
    string: String,
    #[serde(with = "serde_bytes")]
    bytes: Vec<u8>,
    #[serde(with = "serde_bytes")]
    byte_slice: &'a [u8],
}
  • FixedString(N) поддерживается в виде массива байтов, например [u8; N].
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow {
    fixed_str: [u8; 16], // FixedString(16)
}
  • Типы Enum(8|16) поддерживаются с помощью serde_repr.
use serde_repr::{Deserialize_repr, Serialize_repr};

#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    level: Level,
}

#[derive(Debug, Serialize_repr, Deserialize_repr)]
#[repr(u8)]
enum Level {
    Debug = 1,
    Info = 2,
    Warn = 3,
    Error = 4,
}
  • UUID сопоставляется с uuid::Uuid и обратно с помощью serde::uuid. Требуется фича uuid.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(with = "clickhouse::serde::uuid")]
    uuid: uuid::Uuid,
}
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(with = "clickhouse::serde::ipv4")]
    ipv4: std::net::Ipv4Addr,
}
  • Date сопоставляется с u16 или newtype-обёрткой вокруг него и представляет количество дней, прошедших с 1970-01-01. Также поддерживается time::Date при использовании serde::time::date, для чего требуется включённая фича time.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    days: u16,
    #[serde(with = "clickhouse::serde::time::date")]
    date: Date,
}
  • Date32 отображается в/из i32 или newtype-обёртки вокруг него и представляет количество дней, прошедших с 1970-01-01. Также поддерживается time::Date при использовании serde::time::date32, что требует включения фичи time.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    days: i32,
    #[serde(with = "clickhouse::serde::time::date32")]
    date: Date,
}
  • DateTime сопоставляется с типом u32 или newtype-обёрткой над ним и представляет собой число секунд, прошедших с начала эпохи UNIX. Также поддерживается time::OffsetDateTime при использовании serde::time::datetime, для чего требуется включить функциональность time (feature time).
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    ts: u32,
    #[serde(with = "clickhouse::serde::time::datetime")]
    dt: OffsetDateTime,
}
  • DateTime64(_) маппится в/из i32 или newtype-обёртку вокруг него и представляет время, прошедшее с момента начала эпохи Unix. Также поддерживается time::OffsetDateTime при использовании serde::time::datetime64::*, для чего требуется включить feature time.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    ts: i64, // elapsed s/us/ms/ns depending on `DateTime64(X)`
    #[serde(with = "clickhouse::serde::time::datetime64::secs")]
    dt64s: OffsetDateTime,  // `DateTime64(0)`
    #[serde(with = "clickhouse::serde::time::datetime64::millis")]
    dt64ms: OffsetDateTime, // `DateTime64(3)`
    #[serde(with = "clickhouse::serde::time::datetime64::micros")]
    dt64us: OffsetDateTime, // `DateTime64(6)`
    #[serde(with = "clickhouse::serde::time::datetime64::nanos")]
    dt64ns: OffsetDateTime, // `DateTime64(9)`
}
  • Tuple(A, B, ...) отображается в/из (A, B, ...) или обёртки newtype вокруг него.
  • Array(_) отображается в/из любого среза, например Vec<_>, &[_]. Также поддерживаются пользовательские типы.
  • Map(K, V) ведёт себя как Array((K, V)).
  • LowCardinality(_) поддерживается прозрачно.
  • Nullable(_) отображается в/из Option<_>. Для вспомогательных функций clickhouse::serde::* добавьте ::option.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(with = "clickhouse::serde::ipv4::option")]
    ipv4_opt: Option<Ipv4Addr>,
}
  • Nested поддерживается с помощью нескольких массивов с переименованием.
// CREATE TABLE test(items Nested(name String, count UInt32))
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(rename = "items.name")]
    items_name: Vec<String>,
    #[serde(rename = "items.count")]
    items_count: Vec<u32>,
}
  • Типы Geo поддерживаются. Point ведёт себя как кортеж (f64, f64), а остальные типы — это просто срезы точек.
type Point = (f64, f64);
type Ring = Vec<Point>;
type Polygon = Vec<Ring>;
type MultiPolygon = Vec<Polygon>;
type LineString = Vec<Point>;
type MultiLineString = Vec<LineString>;

#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    point: Point,
    ring: Ring,
    polygon: Polygon,
    multi_polygon: MultiPolygon,
    line_string: LineString,
    multi_line_string: MultiLineString,
}
  • типы данных Variant, Dynamic и новый тип данных JSON пока не поддерживаются.

Мокирование

Crate предоставляет утилиты для мокирования сервера ClickHouse и тестирования DDL-, SELECT-, INSERT- и WATCH-запросов. Эту функциональность можно включить с помощью опции test-util. Используйте её только как зависимость для разработки.

См. пример.

Устранение неполадок

CANNOT_READ_ALL_DATA

Наиболее распространённая причина ошибки CANNOT_READ_ALL_DATA заключается в том, что определение строки на стороне приложения не совпадает с определением строки в ClickHouse.

Рассмотрим следующую таблицу:

CREATE OR REPLACE TABLE event_log (id UInt32)
ENGINE = MergeTree
ORDER BY timestamp

Затем, если EventLog определён на стороне приложения с несовместимыми типами, например:

#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
    id: String, // <- should be u32 instead!
}

При вставке данных может произойти следующая ошибка:

Error: BadResponse("Code: 33. DB::Exception: Cannot read all data. Bytes read: 5. Bytes expected: 23.: (at row 1)\n: While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)")

В этом примере это исправляется за счет корректного определения структуры EventLog:

#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
    id: u32
}

Известные ограничения

  • Типы данных Variant, Dynamic и (новый) JSON пока не поддерживаются.
  • Привязка параметров на стороне сервера пока не поддерживается; см. эту задачу для отслеживания прогресса.

Свяжитесь с нами

Если у вас есть вопросы или нужна помощь, свяжитесь с нами в нашем Community Slack или через раздел issues на GitHub.