Как класть Parquet 2: внутреннее устройство

Почему-то в интернетах не так уж много пишут о внутреннем устройстве Parquet. Мне удалось найти всего несколько достойных внимания источников, и это официальный док, readme для parquet-mr, официальная презентация и презентация про оптимизацию от Ryan Blue. Кроме этого, у нас есть JavaDoc к ParquetOutputFormat.java и гора исходников. Давайте разберемся, как же оно работает?

Итак, Parquet хранит структурированные данные с заранее определенной схемой, и это называется таблицей. Данные могут быть разложены в несколько файлов, и вся директория воспринимается как одна таблица. Каждый файл при этом самодостаточен и может быть прочтен и записан отдельно от остальных — это залог massive parallel processing’а.

Структура файла

Файл в терминологии Parquet называется Row Group, и содержит он информацию о группе строк. В начале файла находятся метаданные, которые описывают хранимые данные. Чтение паркетного файла всегда начинается с метаданных и устроено так, что файл выступает минимальной единицей параллелизма. Таким образом, максимальный уровень параллелизма чтения таблицы в Parquet равен числу файлов в ней.

Заголовок файла и метаданные выглядят примерно вот так:

Каждая колонка в таблице сохраняется отдельно от остальных. Она хранится в так называемых страницах, Pages. Parquet делит данные колонки на участки — страницы примерно одинакового размера, не превышающие page size. После этого каждая страница записывается в файл отдельно, а в метаданных указывается ее смещение в файле и интервалы строк, в ней содержащиеся. Таким образом, в Parquet можно по метаданным определить, в каких страницах лежат данные для нужных строк и столбцов, что позволяет читать только нужные страницы и пропускать все остальные. Это свойство активно используется в push-down predicates (см. ниже) и позволяет многократно ускорить чтение файла.

Кодирование и сжатие

Данные каждой страницы при хранении подвергается кодированию и сжатию для уменьшения их размера. Parquet поддерживает три способа кодирования — dictionary encodingbit packing и run-length encoding.

Dictionary encoding позволяет пронумеровать длинные повторяющиеся значения и вместо них их использовать их номера, что отлично работает с данными, где число уникальных значений невелико.

Bit packing позволяет сэкономить на размере целых чисел и хранить только минимально необходимое число бит. Например, если вы храните булевы значения в переменной типа long, bit packing сэкономит вам до 63 бит на каждое значение. Приятно, что это работает, в том числе, на номерах из словаря dictionary encoding.

Run-length encoding позволяет закодировать несколько повторяющихся значений парой «значение — число повторений». Это прекрасно сжимает отсортированные неуникальные массивы.

Данные набиваются в страницу, пока их кодированный размер не достигнет page size. После этого к закодированным данным могут еще и применяться обычные механизмы сжатия, такие как GZip, LZO или Snappy. Они помогают еще немного уменьшить размер страницы.

Из-за такого наложения кодировки и сжатия нельзя начать читать страницу посередине. То есть, страница должна быть прочитана вся от начала до конца, даже если нас интересует всего одно значение из нее.

Push-down predicates

Parquet на уровне формата поддерживает т.н. предварительное чтение данных. При инициализации чтения файла (ParquetInputFormat‘а) можно передать в драйвер набор условий, которые определяют, читать ли данную строку или нет.  Условия выглядят как обычная where-строка из SQL, например:
user = 'admin' or (time > 123 and time < 345)

Драйвер проводит оценку, какие колонки используются в запросе, и сначала читает только их, и тут же налагает условия. Чтение проходит очень быстро, десерализации не происходит, над значениями выполняются векторизованные операции. Итогом такого чтения получается bitset index, содержащий позиции всех строк, которые удовлетворяют фильтру.

После этого Parquet уже приступает к полноценному чтению только нужных строк. Для этого по bitset’у и запрошенным колонкам вычисляются номера страниц к прочтению — только те страницы, которые содержат хотя бы одно нужное значение. Если данные были разложены с хорошей сортировкой, можно минимизировать число читаемых страниц. Если же данные разбросаны хаотично, возможно, придется читать все страницы.

Смещенный профиль нагрузки

Из-за вышеописанного «хитрого» способа записи и чтения данных Parquet обладает необычными требованиями к чтению и записи. Во-первых, блоки нельзя записывать «слева направо» по мере поступления новых строк — приходится сначала составлять содержимое страниц, после этого сжимать их и изучать получившийся размер, и только потом писать страницы друг за другом. Из-за этого приходится держать весь блок в памяти до начала записи. В Spark это означает, что приходится резервировать примерно 2,5 x block size памяти на куче для записи.

При чтении профиль нагрузки также смещается, т.к. во-первых, при чтении происходит много seek-ов с пропусками, а во-вторых, опять-таки, нельзя начать генерировать записи сразу в процессе чтения — приходится сначала собрать по одной странице для каждой колонки, и только тогда данные начинают собираться в строки.