MapReduce
Концепция MapReduce была введена Дином (Jeffrey Dean) и Гемаватом (Sanjay Ghemawat) в 2004 г. . Для понимания нашей статьи не требуется знание всех деталей работы MapReduce. Коротко говоря, MapReduce обрабатывает данные, распределенные (и реплицированные) между узлами кластера без общих ресурсов на основе трех базовых операций. Во-первых, параллельно выполняется некоторый набор задач Map (по одной задаче на узел) без каких-либо коммуникаций между узлами. Далее данные заново разделяются между узлами кластера. Наконец, параллельно выполняется некоторый набор задач Reduce (по одной задаче в каждом узле, получившем раздел данных). За этим может следовать произвольное число циклов Map-repartition-Reduce, если это требуется. В системе MapReduce не создается какой-либо детальный план выполнения запроса, в котором заранее указывалось бы, какие узлы должны выполнять соответствующие задачи; это определяется во время выполнения. Такой подход позволяет системам MapReduce "на лету" подстраиваться к отказам узлов и медленным узлам путем назначения большего числа задач более быстрым узлам и переназначения задач, которые были назначены для отказавших узлов. Кроме того, в MapReduce на локальный диск сбрасываются результаты каждой задачи Map, чтобы минимизировать объем работы, которую придется повторно выполнить после возникновения отказа.
Что касается требуемых свойств рабочих нагрузок крупномасштабного анализа данных, то MapReduce наилучшим образом поддерживает свойства отказоустойчивости и возможности функционировать в разнородных средах. Отказоустойчивость достигается за счет выявления и переназначения другим узлам задач отказавших узлов (предпочтение отдается узлам, содержащим реплики входных данных Map). Возможность функционирования в неоднородных средах обеспечивается за счет выполнения избыточных задач. Задачи, выполнение которых занимает долгое время в медленных узлах, избыточным образом выполняются на узлах, завершивших выполнение назначенных им задач. Время выполнения подобной задачи становится равным времени выполнения в самом быстром узле назначенной ему избыточной задачи. Если делать задачи достаточно мелкими, то можно минимизировать влияние отказов и "отстающих" узлов.
В MapReduce имеется гибкий интерфейс запросов; функции Map и Reduce представляют собой всего лишь произвольные вычисления, закодированные на некотором языке общего назначения. Поэтому каждая задача может делать со своими входными данными все, что угодно, лишь бы только она производила результирующие данные в соответствии с соглашениями модели. В большинстве систем, основанных на MapReduce, (в том числе, и в системе Hadoop, в которой напрямую реализованы детали системного уровня, описанные в статье про MapReduce) не поддерживается декларативный SQL. Однако имеются некоторые исключения (например, Hive).
Как было показано в предыдущем исследовании, самой большой проблемой MapReduce является производительность . Поскольку от пользователей не требуется моделирование и загрузка данных до их обработки, использование многих упомянутых выше средств повышения производительности, применяемых в системах баз данных, в данном случае оказывается невозможным.
В идеальном случае отказоустойчивость и возможность функционировать в неоднородных средах MapReduce можно было бы объединить с производительностью параллельных систем баз данных. В следующих разделах мы опишем свою попытку построить такую гибридную систему.
Параллельные СУБД
Направление параллельных системы баз данных возникло на основе исследований, выполненных в середине 1980-х гг., и большинство современных систем выглядят подобно прототипам параллельных СУБД Gamma и Grace . Во всех этих системах поддерживаются стандартные реляционные таблицы и язык SQL и реализуются многие методы повышения производительности, разработанные исследовательским сообществом в последние десятилетия, включая индексацию, сжатие (операции, выполняемые без распаковки данных), материализованные представления, кэширование результатов и совместное использование ресурсов ввода-вывода. Большая часть таблиц (или даже все таблицы) разделяется по нескольким узлам кластера без совместного использования ресурсов; однако механизм разделения данных прозрачен для конечного пользователя. В параллельных системах баз данных используется оптимизатор запросов, приспособленный к распределенной рабочей нагрузке и превращающий SQL-команды в планы запросов, выполнение которых поровну разделяется между несколькими узлами.
Что касается требуемых свойств рабочих нагрузок крупномасштабного анализа данных, описанных в разд. 3, то в параллельных системах баз данных лучше всего поддерживается "свойство производительности", поскольку именно это свойство больше всего требуется для успешной конкуренции на открытом рынке. Достижению высокой производительности способствует использование ряда хитроумных приемов, придуманных на протяжении десятилетий в сообществе баз данных. Особенно высокой производительности параллельные системы баз данных достигают при наличии высококвалифицированного администратора баз данных (database administrator, DBA), который может тщательно спроектировать базу данных, правильно установить и настроить систему и должным образом ее поддерживать. Однако современные достижения в области автоматизации таких задач и расширяющаяся тенденция к использованию заранее настроенных и сконфигурированных специализированных аппаратно-программных систем (appliance) позволяют многим параллельным системам баз данных демонстрировать высокую производительность без специальных действий DBA.
В параллельных системах баз данных хорошо поддерживается и свойство гибкого интерфейса запросов. Поддержка SQL и ODBC обычно сама собой разумеется, и во многих параллельных системах баз данных допускается определение и использование UDF (хотя возможности планировщика и оптимизатора запросов по распараллеливанию выполнения UDF по узлам кластера без общих ресурсов различаются в разных реализациях).
Однако в параллельных системах баз данных должным образом не обеспечиваются свойства отказоустойчивости и возможности функционирования в неоднородных средах. Хотя конкретные детали реализаций параллельных систем баз данных различаются, все они исторически опираются на предположения о том, что отказы случаются редко, и что "крупные" кластеры состоят из десятков (а не сотен или тысяч) узлов, и это приводит к инженерным решениям, затрудняющим достижение этих свойств.
Кроме того, в некоторых случаях требуется очевидный компромисс между отказоустойчивостью и производительностью, и в параллельных системах баз данных преимущество обычно отдается производительности. Например, частая установка контрольных точек для выполненных подзадач приводит к повышению отказоустойчивости долго выполняемых запросов, но приводит и к снижению производительности. В дополнение к этому, конвейеризация промежуточных результатов между операциями запроса может повысить производительность, но также может привести к потере большого объема выполненной работы в результате отказа.
Предпосылки и недостатки имеющихся подходов
В этом разделе мы приведем обзор подходов параллельных систем баз данных и MapReduce к выполнению анализа данных и укажем свойства из разд. 3, которыми обладает каждый из этих подходов.
Требуемые свойства
В этом разделе мы описываемым требуемые свойства системы, разрабатываемой для анализа данных петабайтного масштаба (который скоро станет более распространенным). В следующем разделе мы обсуждаем, по каким причинам системы параллельных баз данных и системы MapReduce по отдельности не удовлетворяют некоторым из этих свойств.
Производительность. Производительность – это основная характеристика, которая используется производителями коммерческих систем баз для проведения различия между своими системами и другими решениями. В маркетинговой литературе часто встречаются утверждения о том, что некоторое решение во много раз быстрее своих конкурентов. Десятикратное различие в производительности может серьезно повлиять на объем, качество и глубину анализа, который может произвести система.
Высокопроизводительные системы иногда могут способствовать экономии расходов. Переход к использованию более быстрого программного продукта может позволить отложить дорогостоящую модернизацию аппаратных средств или избежать приобретения дополнительных вычислительных узлов при росте масштаба приложения. При использовании публичных платформ облачных вычислений ценообразование устроено таким образом, что человек платит только за то, что он реально использует, так что цена продукта от поставщика линейно возрастает в зависимости от требуемых ресурсов процессоров, дисковой памяти и пропускной способности сети. Следовательно, если при выполнениия одной и той же задачи для некоторого программного продукта анализа данных A требуется на порядок больше вычислительных ресурсов, чем для некоторого программного продукта анализа данных B, то продукт A будет стоить (приблизительно) на порядок дороже продукта B. Эффективность программного обеспечения оказывает непосредственное воздействие на его реальную стоимость.
Отказоустойчивость. Отказоусточивость в контексте рабочих нагрузок анализа данных оценивается не так, как в контексте транзакционных рабочих нагрузок. Для транзакционных рабочих нагрузок отказоустойчивая СУБД может восстановиться после отказа без потери каких-либо данных или обновлений, внесенных зафиксированными транзакциями, и в контексте распределенных баз данных такая система может успешно фиксировать транзакции и продвигать выполнение рабочей нагрузки даже при отказах рабочих узлов. В аналитических рабочих нагрузках присутствуют только запросы на выборку данных, отсутствует потребность в фиксации транзакций, изменяющих базу данных, и отказы узлов не могут привести к потере данных. Поэтому отказоустойчивой аналитической СУБД является такая система, которая не вынуждена выполнять какой-либо запрос заново при отказе узла, участвующего в выполнении этого запроса.
Использование дешевых и ненадежных массовых аппаратных средств для построения кластеров без совместно используемых ресурсов позволяет сократить эксплуатационные расходы и потребление дорогостоящих ресурсов. Имеется также тенденция к использованию наиболее дешевых аппаратных средств при организации центров данных . В результате быстро возрастает вероятность отказов узлов во время выполнения запросов. Эта проблема только обостряется при масштабировании аналитических систем: чем больший объем данных затрагивается при выполнении аналитических запросов, тем большее число узлов требуется для обработки этих запросов. Это еще больше повышает вероятность отказа по крайней мере одного узла во время выполнения запроса. Например, по информации Google , при выполнении аналитического задания в среднем возникает 1,2 отказов. Если при каждом отказе узла требуется выполнять запрос заново, то трудно довести до конца выполнение сложных запросов, для обработки которых требуется достаточно большое время.
Возможность работы в неоднородной среде. Как отмечалось выше, имеется устойчивая тенденция к увеличению числа узлов, участвующих в выполнении запросов. Почти невозможно добиться одной и той же производительности сотен или тысяч вычислительных узлов, даже если всем узлам соответствуют полностью идентичные аппаратные или виртуальные машины. При масштабировании системы все более распространенными становятся частичные отказы узлов, приводящие не к полной утрате их работоспособности, а к падению производительности. Производительность отдельных узлов может также снижаться из-за фрагментации дисковой памяти или ошибок конфигурирования программного обеспечения. На однородность производительности узлов кластера может оказывать влияние и параллельное выполнение нескольких запросов (или, в некоторых случаях, процессов). Параллельные активности, выполняемые в разных виртуальных машинах, которые базируются на одной и той же физической машине, могут приводить к отклонениям показателей производительности на 2-4% .
Если объем работы, требуемой для выполнения запроса, поровну распределяется между узлами кластера без совместно используемых ресурсов, то имеется опасность, что время полного выполнения запроса будет примерно равно времени выполнения своей части работы самым медленным вычислительным узлом. Таким образом, узел с вырожденной производительностью может оказывать несоразмерное влияние на общее время выполнения запроса. В системе, разрабатываемой для использования в неоднородной среде, должны приниматься меры для предотвращения таких ситуаций.
Гибкий интерфейс запросов. Имеются разнообразные средства интеллектуального анализа данных (business intelligence, BI), ориентированные на пользователей. Эти средства работают с программным обеспечением баз данных и поддерживают развитые возможности аналитики, генерацию запросов и визуализацию результатов. Подобные средства являются важной частью общей картины управления аналитическими данными, поскольку бизнес-аналитики часто не получают должную техническую подготовку, и им трудно взаимодействовать с программным обеспечением баз данных напрямую. Средства BI обычно подключаются к базам данных с использованием ODBC или JDBC, так что системы баз данных, для которых требуется обеспечить возможность работы с этими средствами, должны уметь обрабатывать SQL-запросы, поступающие через такие интерфейсы.
В идеале, в системах анализа данных должен также иметься надежный механизм, позволяющий пользователям писать определяемые ими функции (user-defined function, UDF), и запросы, включающие вызовы UDF, должны автоматически распараллеливаться по узлам кластера без совместного использования ресурсов. Таким образом, требуется поддержка как SQL, так и интерфейсного языка, не являющегося SQL.
Data Connector
Data Connector – это интерфейс между независимыми системами баз данных, располагаемыми в узлах кластера, и компонентами TaskTracker. Он расширяет класс InputFormat из Hadoop и является частью библиотеки реализаций InputFormat. От каждого задания MapReduce в коннектор поступают SQL-запрос и параметры подключения, такие как указание на требуемый драйвер JDBC, размер структуры выборки запроса и другие параметры настройки запроса. Коннектор подключается к базе данных, выполняет SQL-запрос и возвращает результат в виде пар "ключ-значение". Теоретически коннектор мог бы подключаться к любой JDBC-совместимой системе баз данных, располагаемой в кластере. Однако для разных баз данных требуются разные оптимизации запросов на выборку данных. Мы реализовали коннекторы для MySQL и PostgreSQL. В будущем мы планируем интегрировать другие СУБД, включая поколоночные системы с открытыми исходными текстами MonetDB и InfoBright. За счет расширения InputFormat из Hadoop мы обеспечиваем органичную интеграцию с MapReduce Framework. Для этой среды базы данных являются источниками данных, аналогичными блокам данных HDFS.
HadoopDB
В этом разделе мы описываем разработку HadoopDB. Целью этого проекта является достижение всех свойств, описанных в разд. 3.
Основная идея HadoopDB состоит в связывании нескольких одноузловых систем баз данных с использованием Hadoop в качестве координатора задач и сетевого коммуникационного слоя. Запросы распараллеливаются по узлам с использованием среды MapReduce; однако как можно больший объем работы по выполнению запроса "проталкивается" в одноузловые системы баз данных. В HadoopDB отказоустойчивость и возможность функционирования в неоднородных средах достигаются путем использования реализации планирования и отслеживания заданий в Hadoop, а производительность, свойственная параллельным системам баз данных, обеспечивается за счет максимального применения при обработке запросов одноузловых СУБД.
История реализации Hadoop
Основой HadoopDB является среда Hadoop. Hadoop состоит из двух уровней: (i) уровня хранения данных, или распределенная файловая система Hadoop (Hadoop Distributed File System, HDFS) и (ii) уровень обработки данных, или среда MapReduce (MapReduce Framework).
HDFS – это блочная файловая система, управляемая центральным узлом NameNode. Файлы разбиваются на блоки фиксированного размера и распределяются по нескольким узлам DataNode кластера. В NameNode поддерживаются метаданные о размере и местоположении блоков и их реплик.
MapReduce Framework основывается на простой архитектуре "главный-подчиненный" (master-slave). Главным является единственный узел JobTracker, а подчиненными, или рабочими узлами – узлы TaskTracker. В узле JobTracker выполняется планирование времени выполнения заданий MapReduce и поддерживается информация о загрузке каждого узла TaskTracker и доступных ресурсах. Каждое задание разбивается на задачи Map (их число зависит от числа блоков данных, которые требуется обработать) и задачи Reduce. JobTracker назначает задачи узлам TaskTracker исходя из требований локальности данных и балансировки нагрузки. Требование локальности удовлетворяется за счет назначения узлам TaskTracker тех задач Map, которые обрабатывают данные, являющиеся локальными для соответствующего узла. Балансировка нагрузки производится за счет того, что всем доступным узлам TaskTracker назначаются задачи. Узлы TaskTracker регулярно посылают в узел JobTracker контрольные сообщения с информацией о своем состоянии.
Интерфейс между уровнями хранения и обработки поддерживается библиотекой InputFormat. Реализации InputFormat разбирают текстовые/бинарные файлы (или подключаются к произвольному источнику данных) и преобразуют данные в пары "ключ-значение", которые могут обрабатываться задачами Map. В Hadoop обеспечивается несколько реализаций InputFormat, одна из которых позволяет всем задачам одного задания, обрабатываемого в данном кластере, обращаться к одной JDBC-совместимой базе данных.
Каталог
В каталоге поддерживается метаинформация о базах данных: (i) параметры соединения, такие как месторасположение базы данных, класс драйвера и учетные данные, (ii) метаданные, такие как наборы данных, содержащиеся в кластере, местоположение реплик и свойства разделения данных.
В текущей реализации HadoopDB эта метаинформация сохраняется в формате XML в HDFS. К этому файлу обращаются JobTracker и TaskTracker для выборки информации, требуемой для планирования задач и обработки данных, которые требуются для запроса. В будущем мы планируем образовать для поддержки каталога отдельную службу, которая будет работать подобно NameNode в Hadoop.
Компоненты HadoopDB
HadoopDB расширяет Hadoop Framework (см. рис. 1) следующими четырьмя компонентами.
Рис. 1. Архитектура HadoopDB
От SQL к MapReduce и планировщику SQL (SMS)
В HadoopDB аналитикам данных предоставляется внешний интерфейс, позволяющий выполнять SQL-запросы.
Планировщик SMS является расширением Hive . Hive преобразует HiveQL (вариант SQL) в задания MapReduce, которые подключаются к таблицам, хранимым в виде файлов HDFS. Задания MapReduce являются ориентированными ациклическими графами (directed acyclic graph, DAG) реляционных операций (таких как фильтрация, выборка (проекция), соединение, агрегирование), которые действуют как итераторы: каждая операция после обработки очередного кортежа данных направляет свой результат в следующую операцию. Поскольку каждая таблица хранится в виде отдельного файла HDFS, в Hive не предполагается совместное размещение таблиц в узлах. Поэтому операции над несколькими таблицами обычно, главным образом, выполняются на фазе Reduce задания MapReduce. Это предположение не совсем справедливо для Hadoop, поскольку некоторые таблицы размещаются в узлах совместно, и, если они разделяются по одному и тому же атрибуту, операцию соединения можно целиком вытолкнуть на уровень базы данных.
Чтобы можно было понять, каким образом Hive расширяется до SMS, и какие между ними имеются различия, сначала мы опишем, как в Hive создается выполняемое задание MapReduce для простого запроса с группировкой и агрегацией. Затем мы покажем, как мы изменяем план запроса для HadoopDB, выталкивая большую часть логики запроса на уровень базы данных.
Рассмотрим следующий запрос:
SELECT YEAR(saleDate), SUM(revenue) FROM sales GROUP BY YEAR(saleDate);
В Hive этот запрос обрабатывается в следующей последовательности фаз:
Синтаксический анализатор преобразует запрос в абстрактное синтаксическое дерево.
Семантический анализатор подключается к внутреннему каталогу Hive MetaStore для выборки схемы таблицы sales. Он также заполняет метаинформацией различные структуры данных (такие как классы Deserializer и InputFormat), требуемые для сканирования таблицы и извлечения необходимых полей.
Затем генератор логических планов создает DAG реляционных операций – план запроса.
До выполнения запроса мы модифицируем MetaStore, помещая в него ссылки на таблицы своей базы данных. В Hive допускается существование внешних таблиц, вне HDFS. В каталоге HadoopDB (п. 5.2.2) поддерживается информация о схемах таблиц и требуемые для MetaStore классы Deserializer и InputFormat. Мы реализовали эти классы.
После генерации физического плана запроса и до выполнения заданий MapReduce мы производим два прохода по физическому плану. На первом проходе мы устанавливаем, какие поля данных действительно обрабатываются планом, и определяем ключи разделения, используемые в операциях Reduce Sink (переразделение). На втором проходе мы обходим DAG снизу-вверх от операций сканирования таблиц до формирования результата или операции File Sink. Все операции до первой операции переразделения с ключом разделения, отличным от ключа базы данных, преобразуются в один или несколько SQL-запросов, которые проталкиваются на уровень базы данных. Для повторного создания SQL из реляционных операций в SMS используется основанный на правилах генератор SQL. После этого логику обработки запроса можно вытолкнуть на уровень базы данных, причем эта часть работы может находиться в диапазоне от пустой (если все таблицы сканируются независимо, и кортежи по одному выталкиваются в DAG операций) до практически всей работы (задача Map требуется только для записи результата в файлы HDFS).
Для приведенного выше запроса с группировкой SMS производит один из двух разных планов. Если таблица sales является разделенной по YEAR(saleDate), производится план запроса, показанный на рис. 2(b): в этом плане вся логика обработки запроса выталкивается на уровень базы данных. Все, что требуется от задачи Map, – это запись результатов в файл HDFS. В противном случае SMS производит план, показанный на рис. 2(c), в котором на уровне базы данных производится частичная агрегация данных, и исключаются операции выборки и группировки, которые присутствуют на фазе Map в плане запроса, генерируемом Hive (рис. 2(a)). Однако в этом случае по-прежнему требуется шаг окончательной агрегации на фазе Reduce для слияния частичных результатов, полученных в каждом узле.
Для обработки запросов с соединениями в Hive предполагается отсутствие совместного размещения соответствующих таблиц. Поэтому в планах, генерируемых Hive, каждая таблица сканируется независимо, и соединение вычисляется после переразделения данных по ключу соединения. В отличие от этого, если ключ соединения совпадает с ключом разделения базы данных, SMS проталкивает на уровень базы данных все поддерево соединения.
К настоящему времени мы поддерживаем только операции фильтрации, выборки (проекции) и агрегации. Поддерживаются только исключительно бесхитростные возможности разделения; в частности, отсутствует поддержка разделения на основе выражений. Поэтому мы не можем выявить, разделена ли таблица по YEAR(saleDate), и, следовательно, вынуждены пессимистически предполагать отсутствие разделения по этому атрибуту. Следует отметить, что вариант Hive, который мы расширяли, является немного дефектным; как разъясняется в п. 6.2.5, он не справляется с выполнением задачи соединения, используемой в нашем тестовом наборе, даже при работе с таблицами из HDFS. Однако для всех остальных тестовых запросов, использованных в наших экспериментах, которые описываются в данной статье, для автоматического проталкивания SQL-запросов на уровень СУБД системы HadoopDB использовался планировщик SMS.
HadoopDB не заменяет Hadoop. Эти
HadoopDB не заменяет Hadoop. Эти системы сосуществуют, позволяя аналитику выбирать соответствующие средства в зависимости от имеющихся данных и задач. Тестовые испытания, описываемые в следующих разделах, показывают, что использование эффективного уровня баз данных позволяет сократить время обработки, особенно при решении задач, требующих обработки сложных запросов (в частности, с соединениями) над структурированными данными. Эксперименты также показывают способность HadoopDB к отказоустойчивости и возможность использования системы в неоднородных средах, являющиеся естественными для систем в стиле Hadoop.
3 Группа Hive разрешила эти проблемы в июне (2009 г.) после того как мы завершили эксперименты. Мы планируем интегрировать с SMS этот последний вариант Hive.
Загрузчик данных (Data Loader)
Data Loader отвечает за (i) глобальное переразделение данных по заданному ключу при их загрузке, (ii) разбиение данных, хранимых в одном узле, на несколько более мелких разделов, или чанков (chank) и (iii) массовую загрузку данных в одноузловые базы данных с использованием чанков.
Data Loader состоит из двух основных компонентов: Global Hasher и Local Hasher. Global Hasher выполняет специальное задание MapReduce в Hadoop, которое читает файлы данных, хранимые в HDFS, и переразделяет их на столько частей, сколько имеется узлов в кластере. Работа перазделения не вызывает накладные расходы сортировки типичных работ MapReduce.
Затем Local Hasher в каждом узле копирует соответствующий раздел из HDFS в локальную файловую систему узла, разделяя его на более мелкие чанки на основе заданного в системе максимального размера чанка.
В Global Hasher и Local Hasher используются разные хэш-функции, чтобы у чанков были примерно одинаковые размеры. Эти хэш-функции также отличаются от функции хэш-разделения, используемой в Hadoop по умолчанию, что обеспечивает лучшую балансировку нагрузки при выполнении заданий MapReduce над данными.
Hadoop
Hadoop – это версия с открытыми кодами среды MapReduce, реализованная под непосредственным влиянием идей исходной статьи про MapReduce и используемая сегодня в десятках компаний для выполнения анализа данных . В описываемых в данной статье экспериментах мы использовали систему Hadoop версии 0.19.1, выполняемую в среде Java 1.6.0. Мы устанавливали систему с несколькими изменениями в конфигурационных установках, используемых по умолчанию. Данные в HDFS сохранялись в блоках размером 256 мегабайт вместо размера в 64 мегабайта, принимаемого по умолчанию. Каждый исполнитель MR работал с кучей максимального размера в 1024 мегабайта. В каждом узле допускалось одновременное выполнение двух экземпляров Map и одного экземпляра Reduce. Мы также расширили буферное пространство для операций чтения-записи файлов и увеличили буфер сортировок до 200 мегабайт (со ста параллельными потоками для слияния). В дополнение к этому, мы изменили число параллельных пересылок, выполняемых функцией Reduce на фазе "перетасовки" (shuffle), и число рабочих потоков управления для каждого HTTP-сервера компонента TaskTracker до 50. Эти настройки соответствуют принципам организации высокопроизводительных кластеров Hadoop . Кроме того, мы допустили повторное использование JVM (Java Virtual Machine).
Для каждого прогона мы сохраняли все входные данные и результаты в HDFS без репликации (в разд. 7 мы добавляем репликацию). После прогона тестов на кластере конкретного размера мы удаляли во всех узлах каталоги данных, заново форматировали и загружали HDFS, чтобы обеспечить равномерное распределение данных между всеми узлами.
Мы представляем результаты как для Hadoop с кодированием вручную, так и для Hadoop с использованием Hive (т.е. планы Hadoop генерировались автоматически на основе SQL-интерфейса Hive). Эти результаты для Hadoop на диаграммах показаны путем разделения соответствующих столбцов на две части. В нижней части показано время, затраченное Hadoop при выполнении заданий, которые кодировались вручную, а верхняя часть демонстрирует дополнительные накладные расходы, затраченные на автоматическую генерацию плана системой Hive, а также на вызовы функций и динамическое разрешение типов данных через Java Reflection API при обработке каждого кортежа в задании, код которого получен путем использования Hive.
Система Hadoop, используемая внутри HadoopDB, конфигурировалась точно так же, как описывалось в предыдущем пункте, за исключением того, что в узлах не допускалось одновременное выполнение нескольких задач map. Кроме того, в каждом рабочем узле инсталлировалась СУБД PostreSQL 8.2.5. Объем основной памяти, используемой для разделяемых буферов, был увеличен до 512 мегабайт, а объем рабочей памяти – до 1 гигабайта. Сжатие данных в PostreSQL не применялось.
Как и в случае Hadoop, мы представляем результаты для HadoopDB при выполнении планов, закодированных вручную, и планов, полученных за счет использования SMS. Эти результаты для HadoopDB на диаграммах показаны путем разделения соответствующих столбцов на две части. В нижней части показано время, затраченное HadoopDB при выполнении планов, которые кодировались вручную, а в верхней части демонстрируются дополнительные накладные расходы, порожденные планировщиком SMS (в частности, расходы на сериализацию данных, выбираемых из основной базы данных, и на их десериализацию перед дальнейшей обработкой в Hadoop).
Испытываемые системы
В наших экспериментах сравнивалась производительность Hadoop, HadoopDB (с PostgreSQL в качестве основой СУБД) и две коммерческие параллельные СУБД.
СУБД-X
СУБД-X – это та же самая коммерческая система баз данных с хранением данных по строкам, которая использовалась для экспериментов в . Поскольку ко времени представления этой статьи на конференцию VLDB облачной редакции этой СУБД не было, мы не могли экспериментировать с ней в среде EC2. Однако, поскольку наши эксперименты с СУБД Vertica в среде EC2 показали снижение ее производительности на 10-15% по сравнению с аналогичными экспериментами на Висконсинском кластере, описанными в , (этого следовало ожидать, поскольку известно, что уровень виртуализации приводит к образованию накладных расходов), в своих цифрах мы воспроизводим показатели СУБД-X из , считая их оценкой сверху производительности СУБД-X, если бы она работала в среде EC2.
4
Нам известно правило для авторов, запрещающее использовать ссылки на литературу в качестве имен существительных. Однако для экономиии места здесь мы используем не как ссылку, а как сокращенную форму выражения "статья Павло и др. из трудов конференции SIGMOD 2009".
5 Сначала мы экспериментировали с MySQL (уровень хранения MyISAM). Однако мы обнаружили, что хотя простые сканирования таблиц выполнялись на 30% быстрее, более сложные запросы обрабатывались намного медленнее из-за отсутствия кластеризованных индексов и слабых алгоритмов соединения.
6 На самом деле, мы попросили того же человека, который пропускал запросы на Vertica в предыдущем исследовании, пропустить те же запросы в среде EC2.
7 В своих экспериментах мы использовали более позднюю версию Vertica, чем в . Замедление в среде EC2 по-преднему сохранилось в пределах 10-15%.
Тестовые испытания
В этом разделе мы оцениваем систему HadoopDB, сравниваем ее с реализацией MapReduce и двумя реализациями параллельных систем баз данных, используя тестовый набор, впервые представленный в . Этот тестовый набор состоит из пяти задач. Первая из них взята прямо из исходной статьи про MapReduce , авторы которой называют ее характерным представителем распространенных задач MR. Следующие четыре задачи являются аналитическими запросами, представляющими характерную рабочую нагрузку анализа структурированных данных, на поддержку которой ориентируется HadoopDB.
Мы проводили свои эксперименты на "крупных" экземплярах Amazon EC2 (зона us-east-1b). В каждом экземпляре имелось 7,5 гигабайт основной памяти, 4 вычислительных блока EC2 (2 виртуальных ядра), 850 гигабайт дисковой памяти (2 × 420 гигабайт плюс 10-гигабайтный корневой раздел). В качестве операционной системы использовалась 64-битная Linux Fedora 8.
Поначалу производительность дискового ввода-вывода в узлах EC2 была довольно низкой (25 мегабайт в секунду). Впоследствии мы инициализировали в каждом узле некоторое дополнительное дисковое пространство, чтобы это исходное замедление не сказывалось на скорости записи промежуточных файлов и результатов задач. После инициализации этого дискового пространства последующие операции записи выполнялись гораздо быстрее (86 мегабайт в секунду). Скорость сети составляла примерно 100-110 мегабайт в секунду. Каждая задача выполнялась по три раза, и фиксировались средние результаты. Окончательные результаты запросов, выполняемых в параллельных системах баз данных, отправлялись из команды shell в файл через программный канал (pipe). Hadoop и HadoopDB сохраняли результаты в HDFS. В этом разделе мы приводим результаты только тех прогонов, в которых все узлы были доступными, работали корректно, и во время выполнения тестов отсутствовали одновременно выполняемые задачи (в разд. 7 мы отказываемся от этих требований). Для каждой задачи производительность измерялась на кластерах из 10, 50 и 100 узлов.
Vertica
Vertica – это относительно новая параллельная система баз данных (первый выпуск вышел в 2005 г.) , основанная на исследовательском проекте C-Store . Vertica является поколоночным хранилищем данных (column-store), т.е. каждый атрибут каждой таблицы хранится (и к нему обеспечивается доступ) по отдельности. Было показано, что этот метод позволяет повысить производительность системы на рабочих назрузках, содержащих, главным образом, запросы на выборку данных.
У системы Vertica имеется "облачная" редакция, которую мы и использовали в экспериментах, описываемых в этой статье. Vertica использовалась для сравнения производительности систем и в предыдущем исследовании на том же тестовом наборе, поэтому мы конфигуривали эту систему таким же образом, как и раньше . Таким образом, у Vertica имелась следующая конфигурация. Все данные были сжаты, и Vertica работала прямо со сжатыми данными. Первичные индексы реализовывались путем сортировки таблиц по индексному атрибуту. Параметры конфигурирования Vertica, используемые по умолчанию, не изменялись.
Загрузка данных
На рис. 3 и 4 показано время загрузки двух наборов данных – Grep и UserVisits. В то время как данные задачи Grep генерируются случайным образом, и для них не требуется какая-либо предварительная обработка, данные UserVisits нужно во время загрузки переразделять по destinationURL и индексировать во всех базах данных по visitDate, чтобы добиться лучшей производительности на аналитических запросах (системе Hadoop такое переразделение пользы бы не принесло). Кратко опишем процесс загрузки для всех систем.
Hadoop: Данные в каждом узле загружались в неизменяемом виде из файла UserVisits. HDFS автоматически разбивает файл на блоки размеров 256 мегабайт и сохраняет блоки в локальных DataNode. Поскольку все узлы загружали свои данные в параллель, для кластера каждого размера мы указываем максимальное время загрузки в его узлах. На время загрузки сильно влияют "отстающие". Этот эффект особенно заметен при загрузке UserVisits, где в 100-узловом кластере наличие одного медленного узла привело к увеличению общего времени загрузки до 4355 секунд, а в 10-узловом – до 2600 секунд при среднем времени загрузки по всем узлам всего 1100 секунд.
HadoopDB: В качестве максимального размера чанка мы установили 1 гигабайт. Каждый чанк размещался в отдельной базе данных PostgreSQL, и SQL-запросы к нему обрабатывались независимо от запросов к другим чанкам. Мы указываем максимальное время загрузки по всем узлам, имея в виду полное время загрузки и Grep, и UserVisits.
Поскольку для набора данных Grep не требуется какая-либо предварительная обработка, и на каждый узел приходится всего 535 мегабайт данных, все данные загружались с использованием стандартной команды SQL COPY в один чанк в каждом узле.
Global Hasher разделяет набор данных UserVisits по всем узлам кластера. После этого Local Hasher выбирает из HDFS 20-гигабайтный раздел и хэш-разделяет его на 20 более мелких чанков. Затем каждый чанк массовым образом загружается с использованием команды COPY. В заключение для каждого чанка создается индекс по visitDate.
Процесс загрузки UserVisits разбивается на несколько шагов. Наиболее дорогостоящим шагом этого процесса является первое переразделение, выполняемое Global Hasher. Оно занимает почти половину общего времени загрузки – 14000 секунд. Из оставшихся 16000 секунд 2500 секунд (15,6%) выполняется локальное разделение данных на 20 чанков; массовое копирование в таблицы занимает 5200 секунд (32,5%); на создание кластеризованных индексов (включая сортировку) тратится 7100 секунд (44,4%); и на завершающую очистку (vacuuming) баз данных уходит 7200 секунд (7,5%). Все шаги после глобального переразделения выполняются параллельно во всех узлах. Время загрузки в разных узлах различалось. В некоторых узлах загрузка UserVisits полностью завершалась всего за 10000 секунд после конца глобального переразделения.
Vertica: Процедура загрузки для Vertica аналогична той, которая описана в . Время загрузки сократилось, поскольку для экспериментов использовалась более новая версия Vertica (3.0). Основное отличие состоит в том, что теперь команда массовой загрузки COPY выполняется во всех узлах кластера полностью параллельно.
СУБД-X: Мы указываем общее время загрузки, включая сжатие данных и построения индексов, взятое из .
В отличие от СУБД-X, возможности параллельной загрузки Hadoop, HadoopDB и Vertica обеспечивают масштабирование всех этих систем при увеличении числа узлов. Поскольку скорость загрузки ограничивается самой низкой скоростью записи на диск в кластере, загрузка – это единственный процесс, для которого естественная устойчивость Hadoop и HadoopDB к неоднородности среды не обеспечивает никаких преимуществ.
Тестовые испытания для сравнения производительности и масштабируемости
В первой тестовой задаче ("задаче Grep") требуется просканировать набор данных, состоящий из 100-байтных записей, для нахождения записей, которые содержат заданный шаблон из трех символов. Это единственная задача, в которой требуется обработка большей частью неструктурированных данных, и она была включена в тестовый набор авторами , поскольку упоминалась в исходной статье про MapReduce .
Для изучения более сложных случаев использования сравниваемых систем в тестовый набор были включены четыре в большей степени аналитические задачи, связанные с анализом журнальных файлов и HTML-документов. Три задачи работают над структурированными данными, а последняя – как над структированными, так и над неструктурированными данными.
Набор данных, с которым работают эти четыре задачи, включает таблицу UserVisits, моделирующую журнальные файлы трафика HTTP-сервера, таблицу Documents, содержащую 600000 случайным образом сгенерированных HTML-документов, и таблицу Ranking, которая содержит некоторые метаданые, вычисленные на основе данных из таблицы Documents. Схемы таблиц тестового набора подробно описаны в . Вкратце, таблица UserVisits содержит 9 атрибутов, наиболее крупным из которых является destinationURL, имеющий тип VARCHAR(100. Каждый кортеж включает примерно 150 байт. Таблица Documents содержит два атрибута: URL (VARCHAR(100)) и contents (произвольный текст). Наконец, таблица Ranking содержит три атрибута: pageURL (VARCHAR(100)), pageRank (INT) и avgDuration (INT).
Генератор данных производит по 155 миллионов записей UserVisits (20 гигабайт) и 18 миллионов записей Ranking (1 гигабайт) на каждый узел. Поскольку генератор данных не обеспечивает попадание в один узел кортежей Ranking и UserVisits с одним и тем же значением атрибута URL, во время загрузки данных производится их переразделение, как описывается ниже.
Записи наборов данных UserVisits и Ranking сохраняются в HDFS в виде плоского текста, по одной записи в строке с полями, разделяемыми специальным символом-разделителем. Для обеспечения доступа во время выполнения к разным атрибутам функции Map и Reduce расщепляют запись по разделителю, образуя массив строк.
Рис. 3. Загрузка данных для задачи Grep (0,5 гигабайта на узел)
Рис. 4. Загрузка набора данных UserVisits (20 гигабайт на узел)
Задача агрегации
В следующей задаче вычисляются суммы значений атрибута adRevenue (доходы от рекламы) для групп кортежей таблицы UserVisits, получаемых путем группировки этой таблицы либо по первым семи символам столбца sourceIP, либо по всему этому столбцу. В отличие от предыдущих задач, при решении этой задачи требуется обмен промежуточными результатами между разными узлами кластера (чтобы можно было вычислить окончательные агрегатные значения). При группировке по семибайтному префиксу образуется 2000 уникальных групп. При группировке по всему sourceIP число таких групп составляет 2500000.
В системах Vertica, СУБД-X, HadoopDB и Hadoop (Hive) выполнялись одни и те же SQL-запросы:
небольшой запрос:
SELECT SUBSTR(sourceIP, 1, 7), SUM(adRevenue) FROM UserVisits GROUP BY SUBSTR(sourceIP, 1, 7);
крупный запрос:
SELECT sourceIP, SUM(adRevenue) FROM UserVisits GROUP BY sourceIP;
В Hadoop (с кодированием вручную) это задание выполнялось в точности так же, как в : функция Map выводит adRevenue и первые семь символов поля sourceIP (или все поле в случае крупного запроса), и эти данные передаются функции Reduce, которая выполняет требуемую агрегацию для каждого префикса (или всего значения) sourceIP.
В HadoopDB планировщик SMS проталкивает весь SQL-запрос в экземпляры PostgreSQL. Полученные результаты передаются задачам Reduce в Hadoop, которые выполняют окончательную агрегацию (после сбора всех предварительных частичных агрегатов от всех экземпляров).
Рис. 7. Крупная задача агрегации
Рис. 8. Малая задача агрегации
Показатели производительности всех сравниваемых систем показаны на рис. 7 и 8. Аналогично задаче Grep, на время выполнения этого запроса влияет скорость чтения с диска. Поэтому обе коммерческие системы получают преимущества от сжатия данных и превосходят по производительности HadoopDB и Hadoop.
"Малая" (с группировкой по подстроке) задача агрегации демонстрирует исключение из того общего правила, что Hive добавляет накладные расходы к Hadoop, кодируемому вручную (на рис. 8 время, затраченное Hadoop при выполнении плана, который был подготовлен с использованием Hive, представлено нижней частью столбца Hadoop). План, подготовленный Hive, выполняется гораздо быстрее задания, закодированного вручную, потому что в нем используется стратегия хэш-агрегации (на фазе Map задания поддерживается внутренняя схема хэширования-агрегации), которая оказывается оптимальной при небольшом числе групп. При решении крупной задачи агрегации Hive переключается на стратегию агрегации путем сортировки, обнаруживая, что число групп превышает половину числа входных записей, помещающихся в одном блоке. В плане для Hadoop, закодированном нами (и авторами ) вручную, мы не смогли применить хэш-агрегацию для "малого" запроса, потому что общепринятой практикой MapReduce является использование агрегации путем сортировки (с применением комбинаторов (combiner)).
Эти результаты иллюстрируют преимущество использования оптимизаторов, которые присутствуют в системах баз данных и системах обработки реляционных запросов, подобных Hive, и могут использовать статистические данные из каталогов системы или простые правила оптимизации для выбора между хэш-агрегацией и агрегацией путем сортировки.
В отличие от комбинаторов Hadoop, Hive сериализует частичные агрегаты в строки, а не поддерживает их в естественном бинарном представлении. Поэтому при обработке крупного запроса план, построенный Hive, выполняется намного дольше плана, закодированного для Hadoop вручную.
В PostgreSQL при решении обеих задач используется хэш-агрегация, поскольку таблица хэш-агрегации для каждого гигабайтного чанка легко помещается в основной памяти. Из-за применения этой эффективной реализации агрегации HadoopDB превосходит по производительности Hadoop при решении обеих задач.
Эти запросы хорошо подходят для систем с поколоночным хранением таблиц, поскольку два атрибута, требуемые для выполнения запроса (sourceIP и adRevenue) включают всего 20 байт из более чем 200 байт каждой записи UserVisits. Из-за соответствующей экономии ввода-вывода производительность Vertica оказывается значительно выше производительности других систем.
Задача агрегации с использованием UDF
В последней задаче для каждого документа из таблицы Documents нужно посчитать число входящих в него ссылок из других документов из той же таблицы. Для Hadoop и Vertica HTML-документы объединяются в более крупные файлы, каждый размером в 256 и 56 мегабайт соответственно. Система HadoopDB могла хранить каждый документ по отдельности в таблице Documents с использованием типа данных TEXT. СУБД-X обрабатывала по отдельности каждый файл с HTML-документом, как описывается ниже.
Теоретически в параллельных системах баз данных следовало бы иметь возможность использования определяемой пользователями функции F для разбора содержимого каждого документа и порождения списка всех URL, обнаруживаемых в документе. Затем можно было бы поместить этот список во временную таблицу и выполнить над ней простой запрос с COUNT и GROUP BY, подсчитывающий число вхождений каждого уникального URL.
К сожалению, как было установлено в , внутри используемых параллельных систем баз данных реализовать такую UDF было затруднительно. В СУБД-X отсутствовала возможность сохранения каждого документа в базе данных в виде символьного BLOB и определения UDF, работающей прямо с такими BLOB'ами, по причине "известной ошибки в [данной] версии системы". Поэтому UDF была реализована внутри СУБД, но данные хранились в отдельных HTML-документах во внешней файловой системе, и UDF производила требуемые внешние вызовы.
В Vertica в настоящее время UDF не поддерживаются, и поэтому пришлось написать на Java простой парсер документов, работающий вне СУБД. Этот парсер параллельно выполнялся в каждом узле, разбирая файл с конкатенированными документами и записывая в файл на локальном диске обнаруживаемые URL. Затем этот файл загружался во временную таблицу с использованием средства массовой загрузки Vertica, и выполнялся второй запрос, который подсчитывался число входящих ссылок.
В Hadoop мы использовали стандартное средство TextInputFormat, которое разбирало внутри задачи Map каждый документ и выводило список обнаруженных в нем URL. Функции Combine и Reduce суммировали число экземпляров каждого уникального URL.
Что касается HadoopDB, то поскольку текстовая обработка значительно проще выражается в MapReduce, мы решили воспользоваться той возможностью, что в HadoopDB допускаются запросы либо на SQL, либо в терминах MapReduce, и применили в данном случае второй вариант. Все содержимое таблицы Documents в каждом узле PostgreSQL передавалось в Hadoop с использованием следующего оператора SQL:
SELECT url, contents FROM Documents;
После этого данные обрабатывались с использованием задания MR. На самом деле, в Hadoop и HadoopDB использовался один и тот же код MR.
Рис. 10. Задача агрегации с применением UDF
Рис. 10 иллюстрирует преимущество использования гибридной системы, подобной HadoopDB. Уровень баз данных позволяет эффективно хранить текстовые HTML-документы, а среда MapReduce обеспечивает требуемую мощность их обработки.
Hadoop превосходит HadoopDB по производительности, если обрабатывает файлы, в которых склеено несколько HTML-документов. Однако в HadoopDB не утрачивается исходная структура данных, поскольку не требуется склейка файлов HTML-документов. Заметим, что общее время такой склейки составляет около 6000 секунд на узел. Эти накладные расходы на рис. 10 не учитываются.
Производительность СУБД-X и Vertica ниже, чем у систем, основанных на Hadoop, поскольку входные файлы хранятся вне базы данных. Кроме того, при решении этой задачи обе коммерческие СУБД не масштабируются линейным образом при увеличении числа узлов в кластере.
8 Диски EC2 медленно работают при начальной записи. Однако скорость записи не влияла на тестовые испытания производительности. Кроме того, до начала экспериментов диски инициализировались.
Задача фильтрации
В первой задаче над структурированными данными таблица Rankings фильтруется по простому условию на атрибуте pageRank. Этому условию удовлетворяют примерно 36000 кортежей в каждом узле.
В системах Vertica, СУБД-X, HadoopDB и Hadoop (Hive) выполнялся один и тот же SQL-запрос:
SELECT pageURL, pageRank FROM Rankings WHERE pageRank > 10;
В Hadoop (с кодированием вручную) это задание выполнялось в точности так же, как в : функция Map разбирает кортежи Rankings с использованием поля-разделителя, применяет предикат на pageRank и помещает в результат pageURL и pageRank из кортежей, удовлетворяющих условию, в виде пара "ключ-значение". Функция Reduce в данном случае не требуется.
Планировщик HadoopDB SMS проталкивает разделы WHERE и SELECT в экземпляры PostgreSQL.
Рис. 6. Задача фильтрации
Производительность каждой системы показана на рис. 6. В Hadoop (с использованием и без использования Hive) применяется принцип грубой силы, полность сканируются все данные файла. Однако другие системы выигрывают от использования кластеризованных индексов на столбце pageRank. Поэтому в целом HadoopDB и параллельным системам баз данных удается превзойти HadoopDB по производительности.
Поскольку данные UserVisits разделяются по destinationURL, наличие связи по внешнему ключу между pageURL таблицы Rankings и destinationURL таблицы UserVisits приводит к тому, что Global Hasher и Local Hasher переразделяют Rankings по pageURL. Каждый чанк таблицы Rankings составляет всего 50 мегабайт (располагаясь совместно с соответствующим гигабайтным чанком таблицы UserVisits). Накладные расходы на планирование двадцати задач Map для обработки всего одного гигабайта данных на узел приводят к значительному снижению производительности HadoopDB.
Поэтому мы поддерживаем дополнительную, не разделенную на чанки копию таблицы Rankings, содержащую по одному гигабайту на узел. При работе с таким набором данных HadoopDB превосходит по производительности Hadoop, поскольку использование кластеризованного индекса по pageRank позволяет отказаться от последовательного сканирования всего набора данных. HadoopDB масштабируется лучше, чем СУБД-X и Vertica, в основном из-за возрастающих сетевых накладных расходов этих систем, которые выходят на первый план, когда время выполнения запроса в других отношениях является очень незначительным.
Задача Grep
Каждая запись состоит из уникального ключа в первых 10 байтах и 90-байтной строки символов. Шаблон "XYZ" ищется в 90-байтном поле, и в каждых 10000 записей содержится одна такая подстрока. В каждом узле содержится 5,6 миллионов таких 100-байтных записей, или примерно 535 мегабайт данных. Общее число записей, обрабатываемых в кластере с заданным числом услов, составляет 5600000 × (число узлов).
В системах Vertica, СУБД-X, HadoopDB и Hadoop (Hive) выполнялся один и тот же SQL-запрос:
SELECT * FROM Data WHERE field LIKE ‘%XYZ%’;
Ни у одной из сравниваемых систем не было индекса на атрибуте символьной строки. Поэтому во всех системах требовалось полное сканирование таблицы, и производительность в основном органичивалась скоростью дисков.
В Hadoop (с кодированием вручную) это задание выполнялось в точности так же, как в (одна функция Map, сравнивающая подстроки с "XYZ"). В этом случае функция Reduce не требовалась, и результаты Map напрямую записывались в HDFS.
Планировщик HadoopDB SMS проталкивал раздел WHERE в экземпляры PostgreSQL.
Рис. 5. Задача Grep
На рис. 5 показаны результаты (смысл столбцов с разрывом пояснялся в подразделе 6.1). Производительность HadoopDB немного выше, чем у Hadoop, поскольку наша система более эффективно производит ввод-вывод из-за отсутствия разбора данных во время выполнения. Однако обе системы проигрывают в производительности параллельным системам баз данных. Это объясняется тем, что и в Vertica, и в СУБД-X данные сжимаются, что существенно сокращает объем ввода-вывода (в отмечается, что во всех экспериментах сжатие данных приводило к ускорению СУБД-X почти на 50%).
Задача соединения
Задача соединения состоит в нахождении среднего значения pageRank набора страниц, посещенных из sourceIP, которые принесли наибольший доход от рекламы в течение недели с 15 по 22 января 2009 г. Ключевое различие между этой и предыдущими задачами состоит в том, что теперь требуется считывать данные из двух разных наборов данных и соединять эти данные (информация о рейтинге станиц (pageRank) находится в таблице Rankings, а информация о доходе от рекламы (adRevenue) – в таблице UserVisits). В таблице UserVisits имеется примерно 134000 записей, у которых значение атрибута visitDate попадает в заданный интервал времени.
В отличие от трех предыдущих задач, мы не могли использовать одни и те же формулировки запросов на SQL и для параллельных систем баз данных, и для систем, основанных на Hadoop. Это связано с тем, что версия Hive, которую мы расширяли, требуемый запрос обработать не могла. Хотя эта версия принимала на обработку SQL-запрос, который соединял, отфильтровывал и агрегировал кортежи из двух таблиц, выполнить сгенерированный план в среде Hadoop не удавалось. Кроме того, мы заметили, что в плане запросов с соединениями данного типа использовалась исключительно неэффективная стратегия выполнения. В частности, операция фильтрации планировалась позже соединения таблиц. Поэтому для такого запроса мы можем представить только результаты выполнения планов, закодированых вручную.
В HadoopDB мы проталкивали в экземпляры SQL фильтрацию, соединение и частичную агрегацию с использованием следующего SQL-запроса:
SELECT sourceIP, COUNT(pageRank), SUM(pageRank), SUM(adRevenue) FROM Rankings AS R, UserVisits AS UV WHERE R.pageURL = UV.destURL AND UV.visitDate BETWEEN ‘2000-01-15’ AND ‘2000-01-22’ GROUP BY UV.sourceIP;
Затем мы использовали в Hadoop одну задачу Reduce, которая собирала все частичные агрегаты от всех экземпляров PostgreSQL и выполняла окончательную агрегацию.
Параллельные системы баз данных выполняли тот же SQL-запрос, что и в .
Хотя в Hadoop имеется поддержка операции соединения, для ее выполнения требуется, чтобы оба набора данных были отсортированы по ключу соединения. Это требование ограничивает применимость операции соединения, поскольку во многих случаях, включая рассматриваемый запрос, такая сортировка автоматически не обеспечивается, а выполнение сортировки до соединения добавляет существенные накладные расходы. Мы установили, что даже если бы мы отсортировали входные данные (и не включили бы время сортировки в общее время выполнения запроса), производительность запроса на основе Hadoop-соединения была бы ниже производительности запроса с использованием трехфазной MR-программы, применявшейся в (которая основывалась на стандартных операциях 'Map' и 'Reduce'). Поэтому наши результаты получены путем использования той же MR-программы, которая использовалась (и подробно описывалась) в .
Рис. 9. Задача соединения
На рис. 9 приводится сводка результатов, полученных при прогонах этой тестовой задачи. Для Hadoop получены результаты, аналогичные приведенным в : производительность системы ограничивается тем, что она полностью сканирует набор данных UserVisits в каждом узле для выполнения фильтрации данных.
HadoopDB, СУБД-X и Vertica показывают более высокую производительность за счет использования индексов для ускорения фильтрации и наличия естественной поддержки соединений. Эти системы демонстрируют незначительное ухудшение производительности при увеличении числа узлов из-за финальной одноузловой агрегации adRevenue и сортировки по полученным агрегатным значениям.
Благодарности
Мы хотели бы поблагодарить Сергея Мельника (Sergey Melnik) и трех анонимных рецензентов за их исключительно глубокие комментарии к ранней версии этой статьи, которые мы учли при подготовке окончательного варианта. Мы также благодарны Эрику МакКоллу (Eric McCall) за помощь в использовании Vertica в среде EC2. Это исследование поддерживалось грантами NSF IIS- 0845643 и IIS-0844480.
Литература
Hadoop. Web Page HadoopDB Project. Web page
Vertica
D. Abadi. What is the right way to measure scale? DBMS Musings Blog
P. Barham, B. Dragovic, K. Fraser, S. Hand, T. Harris, A. Ho, R. Neugebauer, I. Pratt, and A. Warfield. Xen and the art of virtualization. In Proc. of SOSP, 2003 R. Chaiken, B. Jenkins, P.-A. Larson, B. Ramsey, D. Shakib, S. Weaver, and J. Zhou. Scope: Easy and efficient parallel processing of massive data sets. In Proc. of VLDB, 2008 G. Czajkowski. Sorting 1pb with mapreduce
J. Dean and S. Ghemawat. MapReduce: Simplified Data Processing on Large Clusters. In OSDI, 2004 D. DeWitt and M. Stonebraker. MapReduce: A major step backwards. DatabaseColumn Blog. D. J. DeWitt, R. H. Gerber, G. Graefe, M. L. Heytens, K. B. Kumar, and M. Muralikrishna. GAMMA - A High Performance Dataflow Database Machine. In VLDB ’86, 1986 Facebook. Hive. Web page. S. Fushimi, M. Kitsuregawa, and H. Tanaka. An Overview of The System Software of A Parallel Relational Database Machine. In VLDB ’86, 1986. Hadoop Project. Hadoop Cluster Setup. Web Page
J. Hamilton. Cooperative expendable micro-slice servers (cems): Low cost, low power servers for internet-scale services. In Proc. of CIDR, 2009 Hive Project. Hive SVN Repository. Accessed May 19th 2009
J. N. Hoover. Start-Ups Bring Google’s Parallel Processing To Data Warehousing. InformationWeek, August 29th, 2008. S. Madden, D. DeWitt, and M. Stonebraker. Database parallelism choices greatly impact scalability. DatabaseColumn Blog
Mayank Bawa. A $5.1M Addendum to our Series B
C. Monash. The 1-petabyte barrier is crumbling
C. Monash. Cloudera presents the MapReduce bull case. DBMS2 Blog
C. Olofson. Worldwide RDBMS 2005 vendor shares. Technical Report 201692, IDC, May 2006. C. Olston, B. Reed, U. Srivastava, R. Kumar, and A. Tomkins. Pig latin: a not-so-foreign language for data processing. In Proc. of SIGMOD, 2008. A. Pavlo, A. Rasin, S. Madden, M. Stonebraker, D. DeWitt, E. Paulson, L. Shrinivas, and D. J. Abadi. A Comparison of Approaches to Large Scale Data Analysis. In Proc. of SIGMOD, 2009. Русский перевод: Эндрю Павло, Эрик Паулсон, Александр Разин, Дэниэль Абади, Дэвид Девитт, Сэмюэль Мэдден, Майкл Стоунбрейкер. Сравнение подходов к крупномасштабному анализу данных
M. Stonebraker, D. J. Abadi, A. Batkin, X. Chen, M. Cherniack, M. Ferreira, E. Lau, A. Lin, S. Madden, E. J. O’Neil, P. E. O’Neil, A. Rasin, N. Tran, and S. B. Zdonik. C-Store: A column-oriented DBMS. In VLDB, 2005. D. Vesset. Worldwide data warehousing tools 2005 vendor shares. Technical Report 203229, IDC, August 2006.
9 Информация: У авторов Дэниэла Абади и Александра Разина имеются небольшие пакеты акций компании Vertica, врученные им за работу в проекте C-Store – предшественнике Vertica.
Обсуждение
Следует отметить, что хотя процентное замедление Vertica было больше, чем у Hadoop и HadoopDB, ее общее время выполнения запроса (даже при наличии отказа или медленного узла) значительно меньше, чем у этих систем. Кроме того, производительность Vertica в отсутствии сбоев на порядок выше, чем у Hadoop и HadoopDB (в основном, потому, что поколоночное хранение данных обеспечивает большой выигрыш при выполнении небольших запросов с агрегацией). Hadoop и HadoopDB могут показать такую же производительность, но на кластере, у которого число узлов на порядок больше. Следовательно, для Vertica сбои и замедление работы узлов менее вероятны, чем для Hadoop и HadoopDB. Кроме того, для поддержки 6,5-гигабайтной базы данных eBay (вероятно, крупнейшего в мире хранилища данных к июню 2009 г.) используется всего лишь 96-узловой кластер без совместно используемых ресурсов. В кластерах с числом узлов меньше 100 отказы узлов возникают достаточно редко.
Мы утверждаем, что в будущем станут распространенными производственные установки систем баз данных с использованием 1000-узловых кластеров, и будут не редки случаи использования 10000-узловых кластеров. Это предсказание основывается на трех наблюдаемых тенденциях. Во-первых, объем производственных данных растет быстрее, чем диктует закон Мура (см. разд. 1). Во-вторых, становится понятно, что из соображений и соотношения "цена-производительность", и (все более важного) соотношения "потребляемая энергия-производительность" использовать много дешевых, потребляющих мало энегии серверов выгоднее, чем использовать меньшее число "тяжеловесных" серверов [14]. В-третьих, как никогда ранее требуется выполнять анализ данных внутри СУБД, а не выталкивать данные для анализа во внешние системы. В перегруженных дисками архитектурах, подобных 96-узловой системе баз данных eBay, отсутствует вычислительная мощность, требуемая для поддержки аналитических рабочих нагрузок.
Поэтому в будущем нас ожидают тяжеловесные аналитические задания над базами данных, для выполнения которых понадобится больше времени и больше узлов. Вероятность отказа в этих приложениях следующего поколения будет гораздо больше, чем сегодня, и повторное выполнение всего задания после возникновения отказа будет неприемлемым (сбои могут стать настолько частыми, что долго работающие задания никогда не закончатся!). Поэтому, хотя Hadoop и HadoopDB расплачиваются падением производительности за планирование во время исполнения, рестарт на уровне блоков и частое сохранение состояния, накладные расходы на достижение стабильной отказоустойчивости станут неизбежными. Одной из особенностей системы HadoopDB является то, что она может переходить из одного конца этого спектра в другой конец. Поскольку основной единицей обработки является чанк, при снятии ограничений на размер чанка такие системы, подобно Vertica, смогут поддерживать рабочие нагрузки, для которых требуется высокая производительность и низкая отказоустойчивость, а при использовании более мелких чанков смогут обеспечивать высокий уровень отказоустойчивости (подобно Hadoop).
Отказоустойчивость и неоднородная среда
Как отмечалось в разд. 3, в крупных кластерах без совместно используемых ресурсов высока вероятность отказа или замедления отдельных узлов. При выполнении экспериментов, описываемых в этой статье, в среде EC2 мы часто сталкивались и с отказами узлов, и с замедлением их работы (вот примеры уведомлений, которые нам случалось получать: "4:12 PM PDT (Pacific Daylight Time): "Мы изучаем локальную проблему в зоне US-EAST. Из-за этого небольшое число экземпляров в настоящее время недоступно для использования. Мы работаем над восстановлением их работоспособости." или "Сегодня, начиная с 11:30 PM PDT, мы будем производить техническое обслуживание частей сети Amazon EC2. Целью работ является сведение к минимуму вероятности воздействия на экземпляры Amazon EC2, но, возможно, в течение короткого времени, пока соответствующие изменения вступят в силу, некоторым пользователям придется столкнуться с более частой потерей пакетов.").
В параллельных системах баз данных время обработки запросов обычно определяется временем, которое затрачивается на выполнение своей части задачи наиболее медленным узлом. В отличие от этого, в MapReduce любая задача может быть запланирована для выполнения на любом узле при условии, что он свободен, и на него передаются или в нем уже имеются требуемые входные данные. Кроме того, в Hadoop поддерживается избыточное выполнение задач, выполняемых на "отстающих" узлах, чтобы сократить влияние медленных услов на общее время выполнения запроса.
Отказоустойчивость в Hadoop достигается путем перезапуска в других узлах задач, которые выполнялись на отказавших узлах. JobTracker получает периодические контрольные сообщения от компонентов TaskTracker. Если некоторый TaskTracker не общается с JobTracker в течение некотрого предустановленного периода времени (срока жизни (expiry interval), JobTracker считает, что соответствующий узел отказал и переназначает все задачи Map/Reduce этого узла другим узлам TaskTracker. Этот подход отличается от подхода, применяемого в большинстве параллельных систем баз данных, в которых при отказе какого-либо узла обработка незавершенных запросов аварийным образом завершается и начинается заново (с использованием вместо отказавшего узла узла-реплики).
За счет наследования от Hadoop средств планирования и отслеживания заданий HadoopDB обладает аналогичными свойствами отказоустойчивости и эффективной работы при наличии "отстающих" узлов.
Для проверки эффективности HadoopDB в сравнении с Hadoop и Vertica в средах, подверженных отказам и неоднородности, мы выполняли запрос с агрегацией 2000 групп (см. п. 6.2.4) на 10-узловом кластере, и в каждой системе поддерживали по две реплики данных. В Hadoop и HadoopDB срок жизни TaskTracker устанавливался в 60 секунд. В этих экспериментах использовались следующие установки.
Hadoop (Hive): Репликацией данных управляла HDFS. HDFS реплицировала каждый блок данных в некотором другом узле, выбираемом случайным образом с равномерным распределением.
HadoopDB (SMS): Как описывалось в разд. 6, в каждом узле содержится двадцать гигабайтных чанков таблицы UserVisits. Каждый из этих 20 чанков реплицировался в некотором другом узле, выбираемом случайным образом.
Vertica: В Vertica репликация обеспечивается путем хранения дополнительных копий сегментов каждой таблицы. Каждая таблица хэш-разделяется между узлами, и резервная копия каждого сегмента размещается в некотором другом узле, выбираемом по правилу репликации. При сбое узла используется эта резервная копия, пока не будет заново образован утраченный сегмент.
В тестах отказоустойчивости мы прекращали работу некоторого узла после выполения 50% обработки запроса. Для Hadoop и HadoopDB это эквивалентно отказу узла в тот момент, когда было выполнено 50% работы запланированными задачами Map. Для Vertica это эквивалентно тому, что узел отказал после истечения 50% от среднего времени обработки данного запроса.
Для измерения процентного увеличения времени выполнения запроса в неоднородных средах мы замедляли работу некоторого узла путем выполнения фонового задания с большим объемом ввода-вывода. Это задание считывало значения из случайных позиций крупного файла и часто очищало кэши операционной системы. Файл находится на том же диске, на котором сохранялись данные системы.
Не было замечено какой-либо разницы в процентном замедлении HadoopDB с использованием и без использования SMS и Hadoop с использованием и без использования Hive. Поэтому мы указываем результаты для HadoopDB с использованием SMS и Hadoop с использованием Hive и, начиная с этого места, называем эти системы просто HadoopDB и Hadoop соответственно.
Рис. 11. Эксперименты с отказоустойчивостью и неоднородностью на кластере с 10 узлами
Результаты экспериментов показаны на рис. 11. Отказы узлов замедляли HadoopDB и Hadoop в меньшей степени, чем систему Vertica. В Vertica возрастание общего времени выполнения запроса происходит из накладных расходов на аварийное завершение выполнения запроса и его полное повторное выполнение.
В HadoopDB и Hadoop задачи, выполнявшиеся в отказавшем узле, распределялись между оставшимися узлами, содержащими реплики данных. HadoopDB несколько превосходит Hadoop по производительности. В Hadoop те узлы TaskTracker, которым придется обрабатывать блоки, не локальные для этих узлов, будут вынуждены до начала обработки их скопировать (из реплик). В HadoopDB же обработка проталкивается в реплики баз данных. Поскольку число записей, возвращаемых после обработки запроса, меньше размера исходных данных, HadoopDB не приходится сталкиваться при отказе узла с такими же сетевыми накладными расходами, что возникают у Hadoop.
В среде, в которой один из узлов является исключительно медленным, HadoopDB и Hadoop демонстрируют менее чем 30-процентное увеличение времени выполнения запроса, в то время как у Vertica это время увеличивается на 170%. Vertica ожидает, пока "отстающий" узел завершит обработку. В HadoopDB и Hadoop запускаются избыточные задачи в узлах, которые завершили выполнение своих задач. Поскольку данные разбиваются на чанки (в HadoopDB имеются гигабайтные чанки, а в Hadoop – 256-мегабайтные блоки), разные реплики необработанных блоков, назначенных "отстающему" узлу, параллельно обрабатываются несколькими узлами TaskTracker. Таким образом, задержка из-за потребности обработки этих блоков распределяется между узлами кластера.
В своих экспериментах мы обнаружили, что в планировщике задач Hadoop используется некоторое предположение, противоречащее модели HadoopDB. В Hadoop узлы TaskTracker копируют данные, не являющиеся для них локальными, из отстающих узлов или реплик. Однако HadoopDB не перемещает чанки PostgreSQL в новые узлы. Вместо этого TaskTracker избыточной задачи подключается либо к базе данных "отстающего" узла, либо к ее реплике. Если этот TaskTracker подключится к базе данных "отстающего" узла, то в этом узле потребуется параллельно обрабатывать еще один запрос, что приведет к еще большему замедлению. Поэтому та же особенность, которая приводит к немного лучшим характеристкам HadoopDB, чем у Hadoop, при продолжении работы после сбоя узла, приводит к несколько более высокому процентному замедлению работы HadoopDB при работе в неоднородных средах. Мы планируем поменять реализацию планировщика, чтобы узлы TaskTracker всегда подключались не к базам данных "отстающих" узлов, а к их репликам.
Сводка описанных результатов
При отсутствии отказов или фоновых процессов производительность HadoopDB может приблизиться к производительности параллельных систем баз данных. Имеется несколько причин, по которым HadoopDB не достигает тех же или лучших результатов, чем параллельные системы: (1) в PostreSQL не поддерживается поколоночное хранение таблиц; (2) оценки производительности СУБД-X являются излишне оптимистичными (примерно на 15% лучше реальных показателей); (3) в PostgreSQL не использовалось сжатие данных; (4) имеются некоторые накладные расходы на поддержку взимодействия между Hadoop и PostgreSQL, возрастающие при увеличении числа чанков. Мы надеемся, что часть этих накладных расходов в будущем удастся устранить.
HadoopDB неизменно превосходит по производительности Hadoop (за исключение задачи агрегации с использованием UDF, для которой мы не учитывали время слияния данных для Hadoop).
Хотя время загрузки HadoopDB почти в 10 раз больше, чем у Hadoop, эти расходы амортизируются существенно более высокой производительностью выполнения запросов над загруженными данными. Для некоторых задач, таких как задача соединения, десятикратное повышение стоимости загрузки сразу влечет десятикратный же выигрыш в производительности.
Наши эксперименты показывают, что HadoopDB
Наши эксперименты показывают, что HadoopDB может приблизиться в отношении производительности к параллельным системам баз данных, обеспечивая при этом отказоустойчивость и возможность использования в неоднородной среде при тех же правилах лицензирования, что и Hadoop. Хотя производительность HadoopDB, вообще говоря, ниже производительности параллельных систем баз данных, во многом это объясняется тем, что в PostgreSQL таблицы хранятся не по столбцам, и тем, что в PostgreSQL не использовалось сжатие данных. Кроме того, Hadoop и Hive – это сравнительно молодые проекты с открытыми кодами. Мы ожидаем, что их следующие версии будут демонстрирорвать более высокую производительность. От этого автоматически выиграет и HadoopDB.
В HadoopDB применяется некоторый гибрид подходов параллельных СУБД и Hadoop к анализу данных, позволяющий достичь производительности и эффективности параллельных систем баз данных, обеспечивая при этом масштабируемсть, отказоустойчивость и гибкость систем, основанных на MapReduce. Способность HadoopDB к прямому включению Hadoop и программного обеспечения СУБД с открытыми исходными текстами (без изменения кода) делает HadoopDB особенно пригодной для выполнения крупномасштабного анализа данных в будущих рабочих нагрузках.