Как UPSERT (слияние, вставка ... при дублировании обновления) в PostgreSQL?



очень часто задаваемый вопрос здесь-как сделать upsert, который является тем, что MySQL называет INSERT ... ON DUPLICATE UPDATE и стандартные поддержки как часть MERGE операции.



учитывая, что PostgreSQL не поддерживает его напрямую (до pg 9.5), как вы это делаете? Рассмотрим следующее:



CREATE TABLE testtable (
id integer PRIMARY KEY,
somedata text NOT NULL
);

INSERT INTO testtable (id, somedata) VALUES
(1, 'fred'),
(2, 'bob');


теперь представьте, что вы хотите "upsert" кортежи (2, 'Joe'),(3, 'Alan'), так что новое содержание таблицы будет:



(1, 'fred'),
(2, 'Joe'), -- Changed value of existing tuple
(3, 'Alan') -- Added new tuple


вот, о чем говорят люди, когда обсуждая upsert. Принципиально, любой подход должен быть безопасно при наличии нескольких транзакций, работающих на одной и той же таблице - либо с помощью явной блокировки, либо иным образом защищаясь от результирующих условий гонки.



эта тема широко обсуждается на Insert, при дублировании обновления в PostgreSQL?, но это касается альтернатив синтаксису MySQL, и со временем он вырос довольно много несвязанных деталей. Я работаю над окончательным ответы.



эти методы также полезны для "вставить, если не существует, в противном случае ничего не делать", т. е. "вставить ... на дубликат ключа игнорировать".

1976   6  

6 ответов:

9.5 и новее:

PostgreSQL 9.5 и более новая поддержка INSERT ... ON CONFLICT UPDATEON CONFLICT DO NOTHING), т. е. upsert.

сравнению с ON DUPLICATE KEY UPDATE.

краткое описание.

для использования см. руководство - а конкретно conflict_action предложения в синтаксической диаграмме, и пояснительным текстом.

в отличие от решений для 9.4 и старше, которые даются ниже эта функция работает с несколькими конфликтующими строками и не требует исключительной блокировки или цикла повтора.

фиксация добавление функции здесь и дискуссия вокруг его развития здесь.


если вы находитесь на 9.5 и не должны быть обратно совместимы вы можете остановить чтение сейчас.


9.4 и старше:

PostgreSQL не имеет встроенного UPSERT (или MERGE) объект, и делать это эффективно при одновременном использовании очень сложно.

эта статья обсуждает проблему в полезных деталях.

в общем, вы должны выбрать один из двух вариантов:

  • отдельные операции вставки / обновления в цикле повтора; или
  • блокировка таблицы и выполнение пакетного слияния

цикл повтора отдельных строк

использование отдельной строке upserts в цикле повтора является разумным вариантом, если вы хотите, чтобы многие соединения одновременно пытались выполнять вставки.

документация PostgreSQL содержит полезную процедуру, которая позволит вам сделать это в цикле внутри базы данных. Он защищает от потерянных обновлений и вставляет гонки, в отличие от большинства наивных решений. Он будет работать только в READ COMMITTED режим и безопасен только в том случае, если это единственное, что вы делаете в транзакции. Функция не будет работать правильно, если триггеры или вторичные уникальные ключи вызывают уникальные нарушения.

эта стратегия очень неэффективна. Всякий раз, когда это практично, вы должны поставить работу в очередь и сделать массовый upsert, как описано ниже.

многие попытки решения этой проблемы не учитывают откаты, поэтому они приводят к неполным обновлениям. Две транзакции соревнуются друг с другом; одна из них успешно INSERTs; другой получает дубликат ключевой ошибки и делает . Элемент UPDATE блоки в ожидании INSERT откат или фиксация. Когда он откатывается, то UPDATE условие повторной проверки соответствует нулевым строкам, так что даже если UPDATE commits это на самом деле не сделал upsert вы ожидали. Вы должны проверить количество строк результата и повторить попытку, где это необходимо.

некоторые попытки решения также не учитывают избранные расы. Если вы попробуете очевидное и простое:

-- THIS IS WRONG. DO NOT COPY IT. It's an EXAMPLE.

BEGIN;

UPDATE testtable
SET somedata = 'blah'
WHERE id = 2;

-- Remember, this is WRONG. Do NOT COPY IT.

INSERT INTO testtable (id, somedata)
SELECT 2, 'blah'
WHERE NOT EXISTS (SELECT 1 FROM testtable WHERE testtable.id = 2);

COMMIT;

затем, когда два запуска одновременно есть несколько режимов отказа. Одно это уже обсуждалась проблема с повторной проверкой обновления. Другой-это где оба UPDATE в то же время, сопоставление нулевых строк и продолжение. Тогда они оба делают EXISTS тест, который происходит до the INSERT. Оба получают нулевые строки, поэтому оба делают INSERT. Один сбой с повторяющейся ключевой ошибкой.

вот почему вам нужно повторно попробовать цикл. Можно подумать, что вы можете допустить ошибки повторения ключа или потерянные обновления с умными SQL, но вы не можете. Вам нужно проверить количество строк или ручка дублируйте ключевые ошибки (в зависимости от выбранного подхода) и повторите попытку.

пожалуйста, не бросайте свое собственное решение для этого. Как и в случае с очередью сообщений, это, вероятно, неправильно.

Навальный upsert с замком

иногда вы хотите сделать массовый upsert, где у вас есть новый набор данных, который вы хотите объединить в более старый существующий набор данных. Это значительно эффективнее, чем отдельные строки upserts и должны быть предпочтительными всякий раз, когда практическое.

в этом случае вы обычно выполняете следующий процесс:

  • CREATE a TEMPORARY стол

  • COPY или bulk-вставьте новые данные в временную таблицу

  • LOCK целевой таблице IN EXCLUSIVE MODE. Это позволяет другим транзакциям SELECT, но не вносите никаких изменений в таблице.

  • сделать UPDATE ... FROM существующих записей с использованием значений в временная таблица;

  • сделать INSERT строк, которые еще не существуют в целевой таблице;

  • COMMIT, освободив замок.

например, для примера, приведенного в вопросе, используя многозначный INSERT чтобы заполнить временную таблицу:

BEGIN;

CREATE TEMPORARY TABLE newvals(id integer, somedata text);

INSERT INTO newvals(id, somedata) VALUES (2, 'Joe'), (3, 'Alan');

LOCK TABLE testtable IN EXCLUSIVE MODE;

UPDATE testtable
SET somedata = newvals.somedata
FROM newvals
WHERE newvals.id = testtable.id;

INSERT INTO testtable
SELECT newvals.id, newvals.somedata
FROM newvals
LEFT OUTER JOIN testtable ON (testtable.id = newvals.id)
WHERE testtable.id IS NULL;

COMMIT;

статьи

а как же MERGE?

SQL-standard MERGE на самом деле имеет плохо определенную семантику параллелизма и не подходит для upserting без блокировки таблицы в первую очередь.

это действительно полезный оператор OLAP для слияния данных, но на самом деле это не полезное решение для безопасного параллелизма upsert. Есть много советов для людей, использующих другие СУБД, чтобы использовать MERGE для вставки, но это на самом деле неправильный.

Другое ГСМ:

Я пытаюсь внести свой вклад в другое решение для одной проблемы вставки с версиями PostgreSQL до 9.5. Идея состоит в том, чтобы просто попытаться выполнить первую вставку, и в случае, если запись уже присутствует, обновить ее:

do $$
begin 
  insert into testtable(id, somedata) values(2,'Joe');
exception when unique_violation then
  update testtable set somedata = 'Joe' where id = 2;
end $$;

обратите внимание, что это решение может быть применено только если нет удаления строк таблицы.

Я не знаю об эффективности этого решения, но мне оно кажется достаточно разумным.

вот несколько примеров для insert ... on conflict ... ( pg 9.5+):

  • "вставить", на конфликте - ничего.
    insert into dummy(id, name, size) values(1, 'new_name', 3) on conflict do nothing;

  • "вставить", на конфликте - do update укажите конфликта через колонки.
    insert into dummy(id, name, size) values(1, 'new_name', 3) on conflict(id) do update set name = 'new_name', size = 3;

  • "вставить", на конфликте - do update укажите конфликта через ограничение имя.
    insert into dummy(id, name, size) values(1, 'new_name', 3) on conflict on constraint dummy_pkey do update set name = 'new_name', size = 4;

WITH UPD AS (UPDATE TEST_TABLE SET SOME_DATA = 'Joe' WHERE ID = 2 
RETURNING ID),
INS AS (SELECT '2', 'Joe' WHERE NOT EXISTS (SELECT * FROM UPD))
INSERT INTO TEST_TABLE(ID, SOME_DATA) SELECT * FROM INS

протестировано на Postgresql 9.3

SQLAlchemy upsert для Postgres >=9.5

поскольку большой пост выше охватывает множество различных подходов SQL для версий Postgres (не только не-9.5, как в вопросе), я хотел бы добавить, как это сделать в SQLAlchemy, если вы используете Postgres 9.5. Вместо реализации собственного upsert, вы также можете использовать функции SQLAlchemy (которые были добавлены в SQLAlchemy 1.1). Лично я бы рекомендовал использовать их, если это возможно. Не только из-за удобства, но и потому, что позволяет PostgreSQL обрабатывать любые условия гонки, которые могут возникнуть.

кросс-постинг из другого ответа, который я дал вчера (https://stackoverflow.com/a/44395983/2156909)

SQLAlchemy поддерживает ON CONFLICT теперь с двумя методами on_conflict_do_update() и on_conflict_do_nothing():

копировать из документации:

from sqlalchemy.dialects.postgresql import insert

stmt = insert(my_table).values(user_email='[email protected]', data='inserted data')
stmt = stmt.on_conflict_do_update(
    index_elements=[my_table.c.user_email],
    index_where=my_table.c.user_email.like('%@gmail.com'),
    set_=dict(data=stmt.excluded.data)
    )
conn.execute(stmt)

http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html?highlight=conflict#insert-on-conflict-upsert

С этот вопрос был закрыт, я публикую здесь, как вы это делаете с помощью SQLAlchemy. Через рекурсию он повторяет массовую вставку или обновление для борьбы с гонки и ошибки валидации.

сначала импорт

import itertools as it

from functools import partial
from operator import itemgetter

from sqlalchemy.exc import IntegrityError
from app import session
from models import Posts

теперь пара вспомогательных функций

def chunk(content, chunksize=None):
    """Groups data into chunks each with (at most) `chunksize` items.
    https://stackoverflow.com/a/22919323/408556
    """
    if chunksize:
        i = iter(content)
        generator = (list(it.islice(i, chunksize)) for _ in it.count())
    else:
        generator = iter([content])

    return it.takewhile(bool, generator)


def gen_resources(records):
    """Yields a dictionary if the record's id already exists, a row object 
    otherwise.
    """
    ids = {item[0] for item in session.query(Posts.id)}

    for record in records:
        is_row = hasattr(record, 'to_dict')

        if is_row and record.id in ids:
            # It's a row but the id already exists, so we need to convert it 
            # to a dict that updates the existing record. Since it is duplicate,
            # also yield True
            yield record.to_dict(), True
        elif is_row:
            # It's a row and the id doesn't exist, so no conversion needed. 
            # Since it's not a duplicate, also yield False
            yield record, False
        elif record['id'] in ids:
            # It's a dict and the id already exists, so no conversion needed. 
            # Since it is duplicate, also yield True
            yield record, True
        else:
            # It's a dict and the id doesn't exist, so we need to convert it. 
            # Since it's not a duplicate, also yield False
            yield Posts(**record), False

и, наконец, функция upsert

def upsert(data, chunksize=None):
    for records in chunk(data, chunksize):
        resources = gen_resources(records)
        sorted_resources = sorted(resources, key=itemgetter(1))

        for dupe, group in it.groupby(sorted_resources, itemgetter(1)):
            items = [g[0] for g in group]

            if dupe:
                _upsert = partial(session.bulk_update_mappings, Posts)
            else:
                _upsert = session.add_all

            try:
                _upsert(items)
                session.commit()
            except IntegrityError:
                # A record was added or deleted after we checked, so retry
                # 
                # modify accordingly by adding additional exceptions, e.g.,
                # except (IntegrityError, ValidationError, ValueError)
                db.session.rollback()
                upsert(items)
            except Exception as e:
                # Some other error occurred so reduce chunksize to isolate the 
                # offending row(s)
                db.session.rollback()
                num_items = len(items)

                if num_items > 1:
                    upsert(items, num_items // 2)
                else:
                    print('Error adding record {}'.format(items[0]))

вот как вы его используете

>>> data = [
...     {'id': 1, 'text': 'updated post1'}, 
...     {'id': 5, 'text': 'updated post5'}, 
...     {'id': 1000, 'text': 'new post1000'}]
... 
>>> upsert(data)

преимущество над bulk_save_objects это то, что он может обрабатывать отношения, проверку ошибок и т. д. На insert (в отличие от массовые операции).

Comments

    Ничего не найдено.