Самый быстрый способ загрузки данных в PostgreSQL с помощью Python
Last updated
Was this helpful?
Last updated
Was this helpful?
Нам, как прославленным сборщикам данных, часто приходится загружать данные, полученные из удаленного источника, в наши системы. Если нам повезет, данные сериализуются в формате JSON или YAML. Когда нам везет меньше, мы получаем электронную таблицу Excel или файл CSV, который всегда каким-то образом поврежден, и это невозможно объяснить.
Данные крупных компаний или старых систем каким-то образом всегда кодируются странным образом, и системные администраторы всегда думают, что делают нам одолжение, архивируя файлы (пожалуйста, gzip) или разбивая их на более мелкие файлы со случайными именами.
Современные сервисы могут предоставлять достойный API, но чаще всего нам нужно получить файл с FTP, SFTP, S3 или какого-либо собственного хранилища, которое работает только в Windows.
В этой статье мы рассмотрим лучший способ импорта беспорядочных данных из удаленного источника в PostgreSQL.
Чтобы обеспечить реальное и работоспособное решение, мы установили следующие основные роли:
Данные извлекаются из удаленного источника.
Данные загрязнены и их необходимо преобразовать.
Данные большие.
Одиночный запрос на пиво из API выглядит так:
Для краткости я обрезал вывод, но здесь много информации о пиве. В этой статье мы хотим импортировать все поля перед brewers_tips в таблицу базы данных.
Поле volume является вложенным. Мы хотим извлечь из поля только значение value и сохранить его в поле таблицы с именем volume.
Поле first_brewed содержит только год и месяц, а в некоторых случаях только год. Мы хотим преобразовать значение в действительную дату. Например, значение 09/2007 будет преобразовано в дату 01.09.2007. Значение 2006 будет преобразовано в дату 01 января 2016 г.
Давайте напишем простую функцию для преобразования текстового значения в поле в datetime.date Python:
Давайте быстро убедимся, что это работает:
В реальной жизни трансформации могут быть гораздо сложнее. Но для наших целей этого более чем достаточно.
API предоставляет постраничные результаты. Чтобы инкапсулировать подкачку, мы создаем генератор, который выдает пиво одно за другим:
И чтобы использовать функцию генератора, мы вызываем и повторяем ее:
Вы заметите, что первый результат каждой страницы занимает немного больше времени. Это связано с тем, что он выполняет сетевой запрос для получения страницы.
Следующим шагом будет создание таблицы в базе данных для импорта данных.
Создайте базу данных:
Измените haki в примере на своего локального пользователя.
Используя psycopg, создайте соединение с базой данных:
Мы устанавливаем autocommit=True
, чтобы каждая выполняемая нами команда вступала в силу немедленно. Для целей данной статьи это нормально.
Теперь, когда у нас есть соединение, мы можем написать функцию для создания таблицы:
Функция получает курсор и создает нерегистрируемую таблицу с именем staging_beers.
Используя соединение, которое мы создали ранее, функция применяется следующим образом:
Теперь мы готовы перейти к следующей части.
В этой статье нас интересуют две основные метрики: время и память.
Этот пакет обеспечивает использование памяти и дополнительное использование памяти для каждой строки кода. Это очень полезно при оптимизации памяти. Для иллюстрации это пример, представленный в PyPI:
Интересная часть — это столбец Increment, который показывает дополнительную память, выделенную кодом в каждой строке.
В этой статье нас интересует пиковая память, используемая функцией. Пиковый объем памяти — это разница между начальным значением столбца "Mem usage" и максимальным значением (также известным как "high watermark").
Чтобы получить список "Mem usage", мы используем функцию memory_usage из memory_profiler:
При таком использовании функция memory_usage выполняет функцию fn с предоставленными аргументами args и kwargs, а также запускает другой процесс в фоновом режиме для мониторинга использования памяти каждый inetrval секунд.
Аргумент retval сообщает функции, что она должна вернуть результат fn.
Чтобы собрать все это вместе, мы создаем следующий декоратор для измерения и отчета о времени и памяти:
Чтобы исключить взаимное влияние таймингов на память и наоборот, мы выполняем функцию дважды. Во-первых, чтобы рассчитать время, во-вторых, чтобы измерить использование памяти.
Декоратор выведет имя функции и все аргументы ключевого слова, а также сообщит об использованном времени и памяти:
Печатаются только аргументы ключевых слов. Это сделано намеренно, мы собираемся использовать это в параметризованных тестах.
На момент написания API пива содержит только 325 сортов пива. Чтобы работать с большим набором данных, мы дублируем его 100 раз и сохраняем в памяти. Результирующий набор данных содержит 32 500 сортов пива:
Чтобы имитировать удаленный API, наши функции будут принимать итераторы, аналогичные возвращаемому значению iter_beers_from_api:
Для тестирования мы собираемся импортировать данные о пиве в базу данных. Чтобы устранить внешние влияния, такие как сеть, мы заранее получаем данные из API и обслуживаем их локально.
Чтобы получить точное время, мы «подделываем» удаленный API:
В реальной жизненной ситуации вы бы использовали функцию iter_beers_from_api напрямую:
Теперь мы готовы начать!
Чтобы установить базовый уровень, мы начнем с самого простого подхода - вставляем строки одну за другой:
Обратите внимание: при переборе пива мы преобразуем first_brewed в datetime.date и извлекаем значение объема из вложенного поля объема.
Запуск этой функции дает следующий результат:
Для импорта 32 тыс. строк функции потребовалось 129 секунд. Профилировщик памяти показывает, что функция потребляла очень мало памяти.
Интуитивно понятно, что вставка строк одна за другой кажется не очень эффективной. Должно быть, постоянное переключение контекста между программой и базой данных замедляет ее работу.
Выполнить операцию базы данных (запрос или команду) для всех кортежей параметров или сопоставлений, найденных в последовательности vars_list.
Звучит многообещающе!
Давайте попробуем импортировать данные с помощью executemany:
Функция внешне очень похожа на предыдущую функцию, и преобразования такие же. Основное отличие здесь в том, что мы сначала преобразуем все данные в памяти и только потом импортируем их в базу данных.
Запуск этой функции дает следующий результат:
Это разочаровывает. Время стало немного лучше, но теперь функция потребляет 2,7 МБ памяти.
Чтобы оценить использование памяти, файл JSON, содержащий только импортируемые нами данные, весит на диске 25 МБ. Учитывая пропорции, использование этого метода для импорта файла размером 1 ГБ потребует 110 МБ памяти.
Предыдущий метод потреблял много памяти, поскольку преобразованные данные сохранялись в памяти до обработки psycopg.
Давайте посмотрим, можем ли мы использовать итератор, чтобы избежать хранения данных в памяти:
Разница здесь в том, что преобразованные данные «передаются в поток» в метод выполнения с помощью итератора.
Эта функция дает следующий результат:
Наше «потоковое» решение сработало как положено, и нам удалось свести объем памяти к нулю. Однако сроки остаются примерно такими же, даже по сравнению с методом «один за другим».
Текущая реализация метода выполнения executemany() (очень мягко говоря) не особенно эффективна. Эти функции можно использовать для ускорения повторного выполнения оператора с набором параметров. Уменьшив количество обращений к серверу, производительность может быть на несколько порядков выше, чем при использовании метода выполнения executemany().
Значит, мы всё время делали всё неправильно!
Выполняйте группы операторов за меньшее количество обращений к серверу.
Давайте реализуем функцию загрузки с помощью execute_batch:
Выполнение функции:
Ух ты! Это огромный скачок. Функция завершилась менее чем за 4 секунды. Это примерно в 33 раза быстрее, чем 129 секунд, с которых мы начали.
Функция execute_batch использовала меньше памяти, чем executemany для тех же данных. Давайте попробуем освободить память, «передавая» данные в execute_batch с помощью итератора:
Выполнение функции
Получили примерно то же время, но с меньшим объемом памяти.
page_size – максимальное количество элементов списка аргументов, которые можно включить в каждый оператор. Если элементов больше, функция выполнит более одного оператора.
Ранее в документации указывалось, что функция работает лучше, поскольку она меньше обращается к базе данных. В этом случае больший размер страницы должен уменьшить количество обращений туда и обратно и привести к более быстрому времени загрузки.
Давайте добавим в нашу функцию аргумент размера страницы, чтобы мы могли поэкспериментировать:
Размер страницы по умолчанию — 100. Давайте сравним разные значения и результаты:
Мы получили некоторые интересные результаты, давайте разберем их:
1: Результаты аналогичны результатам, которые мы получили, вставляя строки одну за другой.
100: это размер страницы по умолчанию, поэтому результаты аналогичны нашему предыдущему тесту.
1000: Тайминг здесь примерно на 40% быстрее, а памяти мало.
10000: Время не намного быстрее, чем при размере страницы 1000, но объем памяти значительно выше.
Результаты показывают, что существует компромисс между памятью и скоростью. В этом случае кажется, что оптимальным вариантом является размер страницы 1000.
Выполните оператор, используя VALUES с последовательностью параметров.
Функция execute_values генерирует для запроса огромный список VALUES.
Давайте покрутим:
Импорт пива с помощью функции:
Таким образом, прямо из коробки мы получаем небольшое ускорение по сравнению с execute_batch. Однако памяти немного выше.
Как и раньше, чтобы уменьшить потребление памяти, мы стараемся избегать хранения данных в памяти, используя итератор вместо списка:
Выполнение функции дало следующие результаты:
Итак, время почти такое же, но память вернулась к нулю.
Как и execute_batch, функция execute_values также принимает аргумент page_size:
Выполнение с разными размерами страниц:
Как и в случае с execute_batch, мы видим компромисс между памятью и скоростью. Здесь также оптимальный размер страницы составляет около 1000. Однако, используя execute_values, мы получили результаты примерно на 20% быстрее по сравнению с тем же размером страницы, используя execute_batch.
Давайте разберемся:
clean_csv_value: преобразует одно значение.
Экранирование новых строк: некоторые текстовые поля содержат символы новой строки, поэтому мы экранируем \n -> \\n
.
Пустые значения преобразуются в \N: строка «\N» — это строка по умолчанию, используемая PostgreSQL для обозначения NULL в COPY (это можно изменить с помощью опции NULL).
csv_file_like_object.write: Преобразовать пиво в строку CSV.
Преобразование данных: здесь выполняются преобразования first_brewed и valume.
Выберите разделитель: некоторые поля в наборе данных содержат произвольный текст с запятыми. Чтобы предотвратить конфликты, мы выбираем «|»
в качестве разделителя (другой вариант — использовать QUOTE).
Теперь давайте посмотрим, окупилась ли вся эта тяжелая работа:
Команда copy — самая быстрая из всех, что мы когда-либо видели! При использовании COPY процесс завершился менее чем за секунду. Однако кажется, что этот метод гораздо более расточителен с точки зрения использования памяти. Функция занимает 99 МБ, что более чем в два раза превышает размер нашего файла JSON на диске.
Одним из основных недостатков использования копирования с помощью StringIO является то, что весь файл создается в памяти. Что, если вместо создания всего файла в памяти мы создадим файлоподобный объект, который будет действовать как буфер между удаленным источником и командой COPY. Буфер будет использовать JSON через итератор, очищать и преобразовывать данные и выводить чистый CSV.
Чтобы продемонстрировать, как это работает, можно создать объект, подобный файлу CSV, из списка чисел:
Обратите внимание, что мы использовали f как файл. Внутри он извлекал строки из gen только тогда, когда его внутренний буфер строк был пуст.
Функция загрузки с использованием StringIteratorIO выглядит следующим образом:
Основное отличие состоит в том, что CSV-файл beers используется по требованию, и данные не сохраняются в памяти после его использования.
Давайте выполним функцию и посмотрим результаты:
Большой! Время мало, и память вернулась к нулю.
Пытаясь еще немного снизить производительность, мы замечаем, что, как и в случае с page_size, команда копирования также принимает аналогичный аргумент, называемый size:
size – размер буфера, используемого для чтения из файла.
Добавим в функцию аргумент size:
Значение размера по умолчанию — 8192, что равно 2 ** 13, поэтому мы будем сохранять размеры в степени 2:
В отличие от предыдущих примеров, кажется, что компромисса между скоростью и памятью нет. Это имеет смысл, поскольку этот метод был разработан так, чтобы не потреблять память. Однако при изменении размера страницы мы получаем разное время. Для нашего набора данных оптимальным является значение по умолчанию 8192.
Краткое изложение результатов:
insert_one_by_one()
128.8
0.08203125
insert_executemany()
124.7
2.765625
insert_executemany_iterator()
129.3
0.0
insert_execute_batch()
3.917
2.50390625
insert_execute_batch_iterator(page_size=1)
130.2
0.0
insert_execute_batch_iterator(page_size=100)
4.333
0.0
insert_execute_batch_iterator(page_size=1000)
2.537
0.2265625
insert_execute_batch_iterator(page_size=10000)
2.585
25.4453125
insert_execute_values()
3.666
4.50390625
insert_execute_values_iterator(page_size=1)
127.4
0.0
insert_execute_values_iterator(page_size=100)
3.677
0.0
insert_execute_values_iterator(page_size=1000)
1.468
0.0
insert_execute_values_iterator(page_size=10000)
1.503
2.25
copy_stringio()
0.6274
99.109375
copy_string_iterator(size=1024)
0.4536
0.0
copy_string_iterator(size=8192)
0.4596
0.0
copy_string_iterator(size=16384)
0.4649
0.0
copy_string_iterator(size=65536)
0.6171
0.0
Теперь большой вопрос: что мне использовать? Как всегда, ответ: «Это зависит».
Каждый метод имеет свои преимущества и недостатки и подходит для разных обстоятельств:
Отдавайте предпочтение встроенным подходам для сложных типов данных.
executemany, execute_values и execute_batch позаботятся о преобразовании типов данных Python в типы базы данных. Подходы CSV требуют экранирования.
Отдавайте предпочтение встроенным подходам для небольших объемов данных.
Встроенные подходы более читабельны и с меньшей вероятностью сломаются в будущем. Если память и время не проблема, будьте проще!
Предпочитайте подходы copy для больших объемов данных.
Подход копирования copy больше подходит для больших объемов данных, где память может стать проблемой.
Я нашел , поэтому мы собираемся импортировать данные в таблицу пива в базе данных.
Для подключения Python к базе данных PostgreSQL мы используем :
Данные, записываемые в , не будут регистрироваться в журнале упреждающей записи (WAL), что делает его идеальным для промежуточных таблиц. Обратите внимание, что таблицы UNLOGGED не будут восстановлены в случае сбоя и не будут реплицироваться.
Для измерения времени для каждого метода мы используем встроенный :
Функция предоставляет часам максимально возможное разрешение, что делает ее идеальной для наших целей.
Для измерения потребления памяти мы воспользуемся пакетом .
Для очень быстрых операций функция fn может выполняться более одного раза. Установив интервал interval , мы заставляем его выполняться только один раз.
Psycopg2 предоставляет возможность вставлять множество строк одновременно с помощью выполнения . Из документов:
В документации psycopg в разделе есть очень интересное примечание о выполнении executemany:
Функция чуть ниже этого раздела — :
Когда я читал , мне в глаза бросился аргумент page_size:
Жемчужины в документации psycopg не заканчиваются на execute_batch. Просматривая документацию, мое внимание привлекла еще одна функция под названием :
В официальной документации PostgreSQL есть целый раздел, посвященный . Согласно документации, лучший способ загрузить данные в базу данных — использовать .
Чтобы использовать копирование copy из Python, psycopg предоставляет специальную функцию . Для команды копирования copy требуется файл CSV. Давайте посмотрим, сможем ли мы преобразовать наши данные в CSV и загрузить их в базу данных с помощью copy_from:
csv_file_like_object: создайте объект, подобный файлу, с помощью . Объект StringIO содержит строку, которую можно использовать как файл. В нашем случае это CSV-файл.
Вдохновленные , мы создали объект, который использует итератор и предоставляет файловый интерфейс:
Исходный код этого теста можно найти .