Подключение Apache NiFi к ClickHouse
Apache NiFi — это программное обеспечение с открытым исходным кодом для управления рабочими процессами, предназначенное для автоматизации потоков данных между программными системами. Оно позволяет создавать конвейеры данных ETL и поставляется с более чем 300 процессорами данных. Это пошаговое руководство показывает, как подключить Apache NiFi к ClickHouse в качестве источника и приёмника данных, а также загрузить тестовый набор данных.
Соберите данные для подключения
Чтобы подключиться к ClickHouse по HTTP(S) вам потребуется следующая информация:
| Параметр(ы) | Описание |
|---|---|
HOST and PORT | Typically, the port is 8443 when using TLS or 8123 when not using TLS. |
DATABASE NAME | Out of the box, there is a database named default, use the name of the database that you want to connect to. |
USERNAME and PASSWORD | Out of the box, the username is default. Use the username appropriate for your use case. |
The details for your ClickHouse Cloud service are available in the ClickHouse Cloud console. Select a service and click Connect:

Choose HTTPS. Connection details are displayed in an example curl command.

If you're using self-managed ClickHouse, the connection details are set by your ClickHouse administrator.
Загрузите и запустите Apache NiFi
Для новой установки загрузите бинарный файл с https://nifi.apache.org/download.html и запустите его, выполнив команду ./bin/nifi.sh start
Загрузите JDBC-драйвер ClickHouse
- Перейдите на страницу релизов драйвера ClickHouse JDBC на GitHub и найдите последнюю доступную версию JDBC-драйвера
- В разделе релиза нажмите на "Show all xx assets" и найдите JAR-файл, в имени которого есть ключевое слово "shaded" или "all", например
clickhouse-jdbc-0.5.0-all.jar - Поместите JAR-файл в каталог, доступный Apache NiFi, и запишите его абсолютный путь
Добавьте службу контроллера DBCPConnectionPool и настройте её свойства
-
Чтобы настроить Controller Service в Apache NiFi, перейдите на страницу NiFi Flow Configuration, нажав кнопку с иконкой шестерёнки

-
Выберите вкладку Controller Services и добавьте новый Controller Service, нажав кнопку
+в правом верхнем углу
-
Найдите
DBCPConnectionPoolи нажмите кнопку Add
-
Только что добавленная служба контроллера
DBCPConnectionPoolпо умолчанию будет находиться в статусе Invalid. Нажмите кнопку с иконкой «шестерёнки», чтобы начать настройку
-
В разделе Properties введите следующие значения
| Property | Value | Remark |
|---|---|---|
| Database Connection URL | jdbc:ch:https://HOSTNAME:8443/default?ssl=true | Замените HOSTNAME в URL подключения соответствующим значением |
| Database Driver Class Name | com.clickhouse.jdbc.ClickHouseDriver | |
| Database Driver Location(s) | /etc/nifi/nifi-X.XX.X/lib/clickhouse-jdbc-0.X.X-patchXX-shaded.jar | Абсолютный путь к JAR-файлу JDBC-драйвера ClickHouse |
| Database User | default | Имя пользователя ClickHouse |
| Password | password | Пароль ClickHouse |
-
В разделе Settings измените имя Controller Service на ClickHouse JDBC для удобства

-
Активируйте Controller Service
DBCPConnectionPool, нажав кнопку с изображением молнии, а затем кнопку «Enable»

-
Проверьте вкладку Controller Services и убедитесь, что Controller Service включён

Чтение из таблицы с помощью процессора ExecuteSQL
-
Добавьте процессор
ExecuteSQL, а также соответствующие предшествующие и последующие процессоры
-
В разделе "Properties" процессора
ExecuteSQLвведите следующие значенияProperty Value Remark Database Connection Pooling Service ClickHouse JDBC Выберите Controller Service, настроенный для ClickHouse SQL select query SELECT * FROM system.metrics Введите здесь запрос -
Запустите процессор
ExecuteSQL
-
Чтобы убедиться, что запрос был успешно обработан, просмотрите один из объектов
FlowFileв выходной очереди
-
Переключите вид на "formatted", чтобы просмотреть результат выходного
FlowFile
Запись в таблицу с помощью процессоров MergeRecord и PutDatabaseRecord
-
Чтобы записать несколько строк одним оператором INSERT, сначала нужно объединить несколько записей в одну. Это можно сделать с помощью процессора
MergeRecord. -
В разделе Properties процессора
MergeRecordвведите следующие значенияProperty Value Remark Record Reader JSONTreeReaderВыберите соответствующий Record Reader Record Writer JSONReadSetWriterВыберите соответствующий Record Writer Minimum Number of Records 1000 Установите большее значение, чтобы минимальное количество строк объединялось в одну запись. По умолчанию — 1 строка Maximum Number of Records 10000 Установите значение больше, чем «Minimum Number of Records». По умолчанию — 1 000 строк -
Чтобы убедиться, что несколько записей объединены в одну, проверьте входные и выходные данные процессора
MergeRecord. Обратите внимание, что выходные данные представляют собой массив, содержащий несколько входных записей.Входные данные

Результат

-
В разделе Properties процессора
PutDatabaseRecordвведите следующие значенияProperty Value Remark Record Reader JSONTreeReaderВыберите подходящий Record Reader Database Type Generic Оставьте значение по умолчанию Statement Type INSERT Database Connection Pooling Service ClickHouse JDBC Выберите службу контроллера ClickHouse Table Name tbl Укажите здесь имя вашей таблицы Translate Field Names false Установите в "false", чтобы вставляемые имена полей совпадали с именами столбцов Maximum Batch Size 1000 Максимальное количество строк на один INSERT. Это значение не должно быть меньше значения "Minimum Number of Records" в процессоре MergeRecord -
Чтобы подтвердить, что каждая операция вставки содержит несколько строк, убедитесь, что количество строк в таблице увеличивается как минимум на значение параметра «Minimum Number of Records», заданное в
MergeRecord.
-
Поздравляем — вы успешно загрузили данные в ClickHouse с помощью Apache NiFi!