Efficient Postgres Bulk Inserts Take 2

Mon 24 December 2018


In a previous post I outlined a technique for achieving highly efficient bulk inserts from Python into a Postgres database.

The heart of this technique relies on passing multiple rows to postgres as a single parameter, and using the unnest function to convert that parameter from an array into a set of rows:

INSERT INTO upload_time_test(text,properties)
  SELECT unnest( %(texts)s ) ,
         unnest( %(properties)s)  
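For illustration, unnest on its own simply turns an array parameter into one row per element:

SELECT unnest(ARRAY['a','b','c']);
-- returns three rows: 'a', 'b', 'c'

When two unnest calls appear side by side in the same SELECT list (as above), Postgres pairs them up element by element, so the nth text value lines up with the nth properties value, assuming the arrays are the same length.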

More recently, I've been using an even more expressive technique that relies on jsonb_array_elements. Like unnest, this function takes a single parameter and unrolls it into multiple rows, but unlike unnest we only need to use the function once, rather than once per column.
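To see it in isolation, jsonb_array_elements expands a JSON array into a set of rows, one jsonb value per element:

SELECT jsonb_array_elements('[{"a": 1}, {"a": 2}]'::jsonb);
-- returns two rows: {"a": 1} and {"a": 2}

Each resulting row is itself a jsonb object, so individual fields can then be pulled out with the ->> operator, which is exactly what the INSERT below does.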

For example, imagine we have a table like this:

create table test(id int primary key, firstname text, lastname text, age int);

We could insert values into it one at a time like this:

    INSERT INTO test (id, firstname,lastname,age) 
         VALUES (%(id)s, %(firstname)s, %(lastname)s, %(age)s)

and run a Python loop over the rows, calling this insert once for every row:

    INSERT = """
        INSERT INTO test (id, firstname,lastname,age) 
             VALUES (%(id)s, %(firstname)s, %(lastname)s, %(age)s)
    """

    with engine.connect() as connection:
        for row in rows:
            connection.execute(INSERT,row)

In testing, it took about 8 seconds to insert 10,000 rows using this technique. This clearly doesn't scale to larger datasets. We need a way of inserting multiple rows simultaneously.

Enter jsonb_array_elements:

    INSERT = """
        INSERT INTO test (id, firstname,lastname,age) 
            SELECT 
                (el->>'id')::int,
                el->>'firstname',
                el->>'lastname',
                (el->>'age')::int
              FROM (
                    SELECT jsonb_array_elements(%(data)s) el
              ) a;
    """

    from psycopg2.extras import Json  # adapts the Python list of dicts to a single JSON parameter

    with engine.connect() as connection:
        connection.execute(INSERT, data=Json(rows))

This code took only 70 milliseconds to insert 10,000 rows, representing a 100-fold speedup!

A full demonstration of this technique is available at https://github.com/trvrm/bulktest

Efficient Postgres Bulk Inserts using Psycopg2 and Unnest

Thu 22 October 2015


One of the biggest challenges I face maintaining large Postgres systems is getting data into them in an efficient manner. Postgres is very, very good at maintaining, organising, querying and retrieving data, but inserts themselves can be quite slow.

Various Stack Overflow questions suggest using things like COPY, but that assumes your application has direct write access to the machine that is running Postgres, and I work very hard to maintain strict separation of functionality between software components.

So I've been looking for a faster way of inserting, say, 100,000 rows into the database across the wire, and what follows is by far the most efficient technique I've found so far.

Set some visualisation options

%matplotlib inline
import seaborn

Import the libraries we'll be using

import pandas
from numpy import array_split
import psycopg2
from psycopg2.extras import Json
import time
import contextlib

A context manager for timing operations.

@contextlib.contextmanager
def timer(name="duration"):
    'Utility function for timing execution'
    start=time.time()
    yield
    duration=time.time()-start
    print("{0}: {1} second(s)".format(name,duration))

Test setup

My tests are based on creating a fairly simple table and timing how long it takes to perform inserts on it via several different methods. The table has a couple of default columns, a text column, and an HSTORE column.

SETUP_SQL="""
    DROP TABLE IF EXISTS upload_time_test;

    CREATE TABLE upload_time_test(
        uuid uuid primary key default uuid_generate_v4(),
        created timestamp with time zone not null default now(),
        text text not null,
        properties hstore not null default ''::hstore
    );

    GRANT ALL ON upload_time_test TO test;
"""

This is the SQL we'll use for inserting a single row into the database table:

SINGLE_INSERT="""
    INSERT INTO upload_time_test(text,properties)
         VALUES (%(text)s, %(properties)s)
"""

Credentials for connecting to my local test database

HOST='localhost'
DATABASE='test'
USER='test'
PASSWORD='password'

Then we define a couple of simple wrappers around psycopg2

def connect():
    connection= psycopg2.connect(host=HOST,database=DATABASE,user=USER,password=PASSWORD)
    psycopg2.extras.register_hstore(connection)
    return connection

def execute(sql,params={}):
    with connect() as connection:
        with connection.cursor() as cursor:
            cursor.execute(sql,params)

This is the heart of my tests. The Tester class destroys and re-creates the sample table every time we instantiate it.

It provides three different functions for inserting database rows, each based on a different technique.

  • slowInsert() is the slowest, because it creates a new database connection for each row
  • insert() is the approach I had been using up till now. It creates one connection, and re-uses it for each insertion. This is basically what executemany() in psycopg2 is doing.
  • fastInsert() is my new approach, based on using unnest() to unroll a set of arrays passed in through psycopg2

class Tester():
    def __init__(self,count):
        execute(SETUP_SQL)
        self.count=count

        self.data=[
            {
                'text':'Some text',
                'properties': {"key":"value"},
            }
            for i in range(count)
        ]

    def slowInsert(self):
        '''
            Creates a new connection for each insertion
        '''
        for row in self.data:
            text=row['text']
            properties=row['properties']
            execute(SINGLE_INSERT,locals())

    def insert(self):
        '''
            One connection.
            Multiple queries.
        '''
        with connect() as connection:
            with connection.cursor() as cursor:
                for row in self.data:
                    text=row['text']
                    properties=row['properties']
                    cursor.execute(SINGLE_INSERT,locals())


    def fastInsert(self):
        '''
            One connection, one query.
        '''
        sql='''
            INSERT INTO upload_time_test(text,properties)
              SELECT unnest( %(texts)s ) ,
                     unnest( %(properties)s)

        '''

        texts=[r['text'] for r in self.data]
        properties=[r['properties'] for r in self.data]
        execute(sql,locals())
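
As an aside, insert() is essentially what psycopg2's cursor.executemany() does under the hood. A hypothetical extra method for the Tester class that exercised executemany() directly might look like this:

    def executemanyInsert(self):
        '''
            One connection, a single executemany() call over the single-row INSERT.
        '''
        with connect() as connection:
            with connection.cursor() as cursor:
                # each dict in self.data already has 'text' and 'properties' keys,
                # matching the %(text)s and %(properties)s placeholders in SINGLE_INSERT
                cursor.executemany(SINGLE_INSERT, self.data)

Since psycopg2's executemany() still issues one statement per row internally, I would expect its performance to be close to insert() rather than fastInsert().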

Now that we have the Tester class written, we can see how long each approach takes to insert 1000 rows:

def runTests(iterations):
    tester = Tester(iterations)
    with timer('slow'):
        tester.slowInsert()
    with timer('normal'):
        tester.insert()
    with timer('fast'):
        tester.fastInsert()
runTests(1000)
slow: 7.160489320755005 second(s)
normal: 0.1441025733947754 second(s)
fast: 0.042119503021240234 second(s)

We notice an obvious difference between the approaches.

Re-using the connection makes a huge difference. Inserts run 50 times faster.

Using unnest() runs 3 times faster than that.

What about much bigger data sets?

Next, I wanted to know if this held true for inserting, say, 100,000 rows. I won't bother with the slowest approach, because that's clearly unusable.

tester=Tester(count=100000)
with timer('normal'):
    tester.insert()

tester=Tester(count=100000)
with timer('fast'):
    tester.fastInsert()
normal: 14.866096019744873 second(s)
fast: 3.9566986560821533 second(s)

So even with 100,000 rows, the unnest approach still runs nearly 4 times faster.

Further investigation - mapping insertion rate against number of rows inserted.

I wanted to see the exact relationship between the rate of insertion and the number of rows being inserted.

So first I wrote a couple of functions to measure the insertion rate of our two methods:

def bulkInsertRate(count):
    tester=Tester(count)
    start=time.time()
    tester.fastInsert()
    duration=time.time()-start
    return count/duration

def normalInsertRate(count):
    tester=Tester(count)
    start=time.time()
    tester.insert()
    duration=time.time()-start
    return count/duration

And then we run them with a range of dataset sizes

counts=[50,100,200,500,1000,2000,5000,10000,20000,50000,100000]

rates=[
    {
        "count":count,
        'bulk':bulkInsertRate(count),
        'normal':normalInsertRate(count)
    }
    for count in counts
]

Finally, I use Pandas to plot the output data from these tests.

frame=pandas.DataFrame(rates).set_index('count')
frame
count     bulk (rows/s)   normal (rows/s)
50          4485.694730       3867.856879
100        10159.389609       4847.897547
200        15212.186276       6057.106548
500        27340.842720       7081.049689
1000       33248.545382       7694.657609
2000       35640.695767       7070.777670
5000       41223.200473       8027.790910
10000      40948.723106       7785.005392
20000      42604.387914       7568.314015
50000      40795.233470       7291.552509
100000     27014.354119       6872.935483
frame.plot(logx=True)
[Plot: insertion rate (rows per second) against row count, with a log-scaled x axis]

Conclusion

Using unnest to load multiple rows simultaneously has the following advantages:

  • It is significantly faster than a regular insert loop, especially when inserting thousands of rows.
  • The benefits of using unnest() increase at least up to 50,000 rows
  • It still allows us to write (reasonably) straightforward parameterised SQL with no string concatenation
  • When I tried this on a remote database, the improvements were even more impressive, presumably because it collapses thousands of network round trips into a single statement.

Postgres JSON Aggregation

Thu 01 January 2015


I've been using the new JSON functionality in Postgres a lot recently: I'm fond of saying that Postgresql is the best NoSQL database available today. I'm quite serious about this: having used key-value and JSON stores such as CouchDB in the past, it's amazing to me how the Postgres developers have managed to marry the best of traditional relational technology with the flexibility of schema-free JSON documents.

As of version 9.3, Postgres allows you to create JSON column types, and provides a number of functions to access and iterate through the data stored in them.
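
For example (a hypothetical document table, just to show the basic operators):

CREATE TABLE document (id serial primary key, body json);

INSERT INTO document (body)
     VALUES ('{"title": "example", "tags": ["postgres", "json"]}');

SELECT body->>'title' FROM document;                      -- 'example'
SELECT json_array_elements(body->'tags') FROM document;   -- one row per tag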

This week I discovered another hidden gem - json_agg(). This aggregate function collects a set of values into a single JSON array - very helpful if you're then going to work with the returned data in a language like Python.

To demonstrate this, we'll first set up some simple tables.

%load_ext sql
%config SqlMagic.feedback=False
%%sql
postgresql://testuser:password@localhost/test
u'Connected: testuser@test'
%%sql
CREATE TABLE IF NOT EXISTS person (
    name text primary key
);

INSERT INTO person (name) VALUES
('emily'),('arthur'),('nicki'),('oliver')
;
[]

We can query this in the usual way:

%sql SELECT * FROM person;
name
emily
arthur
nicki
oliver

But we can also use json_agg()

%sql SELECT json_agg(name) FROM person
json_agg
[u'emily', u'arthur', u'nicki', u'oliver']

Which gives us a single object to work with. So far, this isn't particularly helpful, but it becomes very useful when we start doing JOINs.

%%sql
CREATE TABLE IF NOT EXISTS action(
    id serial primary key,
    created timestamp with time zone default now(),
    person_name text references person,
    type text not null
);

INSERT INTO action(person_name, type) VALUES ('emily','login');
INSERT INTO action(person_name, type) VALUES ('emily','pageview');
INSERT INTO action(person_name, type) VALUES ('arthur','login');
INSERT INTO action(person_name, type) VALUES ('emily','logout');
INSERT INTO action(person_name, type) VALUES ('nicki','password_change');
INSERT INTO action(person_name, type) VALUES ('nicki','createpost');
[]

If we want to ask Postgres to give us every user and every action they've performed, we could do it this way:

%sql SELECT person.name,  action.type , action.created FROM action JOIN person ON action.person_name=person.name
name    type             created
emily   login            2014-11-08 17:45:05.963569-05:00
emily   pageview         2014-11-08 17:45:05.964663-05:00
arthur  login            2014-11-08 17:45:05.965214-05:00
emily   logout           2014-11-08 17:45:05.965741-05:00
nicki   password_change  2014-11-08 17:45:05.966274-05:00
nicki   createpost       2014-11-08 17:45:05.966824-05:00

But then iterating through this recordset is a pain - I can't easily construct a nested for loop to iterate through each person and then through each action.

Enter json_agg()

%sql SELECT person.name,  json_agg(action) FROM action JOIN person ON action.person_name=person.name GROUP BY person.name
name json_agg
arthur [{u'person_name': u'arthur', u'type': u'login', u'id': 3, u'created': u'2014-11-08 17:45:05.965214-05'}]
emily [{u'person_name': u'emily', u'type': u'login', u'id': 1, u'created': u'2014-11-08 17:45:05.963569-05'}, {u'person_name': u'emily', u'type': u'pageview', u'id': 2, u'created': u'2014-11-08 17:45:05.964663-05'}, {u'person_name': u'emily', u'type': u'logout', u'id': 4, u'created': u'2014-11-08 17:45:05.965741-05'}]
nicki [{u'person_name': u'nicki', u'type': u'password_change', u'id': 5, u'created': u'2014-11-08 17:45:05.966274-05'}, {u'person_name': u'nicki', u'type': u'createpost', u'id': 6, u'created': u'2014-11-08 17:45:05.966824-05'}]

Which becomes much more usable in Python:

people = %sql SELECT person.name,  json_agg(action) FROM action JOIN person ON action.person_name=person.name GROUP BY person.name
for name, actions in people:
    print name
arthur
emily
nicki
for name, actions in people:
    print name
    for action in actions:
        print '\t',action['type']
arthur
    login
emily
    login
    pageview
    logout
nicki
    password_change
    createpost

So now we've managed to convert relational Postgres data into a hierarchical Python data structure with very little effort. From here we can go on to XML, JSON, HTML or whatever document type suits your needs.
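
For instance, a quick sketch of the JSON step, assuming the people result set from above:

import json

# build a {name: [actions]} mapping from the (name, actions) pairs and serialize it
nested = {name: actions for name, actions in people}
print json.dumps(nested, indent=2)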

Postgres Timestamps

Thu 01 January 2015


At my company, I maintain a large, distributed data collection platform. Pretty much every record we collect needs to be stamped with a created field. But because the incoming data comes from sources on various devices in multiple countries and timezones, making sure that the timestamps are precise and meaningful can be a challenge. Postgres can do this very elegantly, but can also trip you up in subtle ways.

Postgres has two subtly different timestamp data types: TIMESTAMP and TIMESTAMP WITH TIME ZONE.

The former stores year/month/day/hour/minute/second and fractional seconds, as you'd expect, while the latter is also timezone-aware: its values are displayed with an offset from UTC, expressed in hours.

We can switch between the two using the AT TIME ZONE syntax, but (and here is the tricky bit) the conversion goes BOTH WAYS, and you can easily get confused if you don't know what type you're starting with.

Furthermore, Postgres will sometimes sneakily convert one to the other without you asking.

%load_ext sql
%config SqlMagic.feedback=False
%%sql
postgresql://testuser:password@localhost/test
u'Connected: testuser@test'
%sql SELECT NOW();
now
2014-08-18 22:33:58.998549-04:00

now() returns a TIMESTAMP WITH TIME ZONE. It shows the current local time, and the offset between that time and UTC (http://time.is/UTC).

But if we put the output from now() into a field that has type TIMESTAMP we will get a silent conversion:

%sql SELECT NOW():: timestamp
now
2014-08-18 22:33:58.998549

Which is not the current UTC time. We have stripped the timezone offset right off it. However, if we explicitly do the conversion, we get:

%sql SELECT NOW() AT TIME ZONE 'UTC';
timezone
2014-08-19 02:33:58.998549

Which is the current UTC time (http://time.is/UTC).

It's worth reviewing the Postgresql documentation on this construct at this point.

  • timestamp without time zone AT TIME ZONE zone returns timestamp with time zone: treat the given time stamp without time zone as located in the specified time zone.
  • timestamp with time zone AT TIME ZONE zone returns timestamp without time zone: convert the given time stamp with time zone to the new time zone, with no time zone designation.
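
In other words (an illustrative pair of queries; the first result will be displayed in your session's time zone):

SELECT TIMESTAMP '2014-08-19 02:00:00' AT TIME ZONE 'UTC';       -- without time zone -> with time zone
SELECT TIMESTAMPTZ '2014-08-19 02:00:00+00' AT TIME ZONE 'UTC';  -- with time zone -> without time zone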

The danger here is that the AT TIME ZONE construct goes both ways. If you don't know what type you're feeding in, you won't know what type you're getting out. I've been bitten by this in the past, ending up with a timestamp that was wrong by several hours because I wasn't clear about my inputs.

Specifically, consider a table that looks like this:

%%sql
DROP TABLE IF EXISTS test;
CREATE TABLE test(name TEXT, created TIMESTAMP DEFAULT NOW());

Which I then populate:

%%sql
INSERT INTO test (name) VALUES ('zaphod beeblebrox');
INSERT INTO test(name,created) VALUES('ford prefect',now() at time zone 'utc');
SELECT * FROM test;
name               created
zaphod beeblebrox  2014-08-18 22:34:03.620583
ford prefect       2014-08-19 02:34:03.621957

Note that the second record contains the current UTC time, but the first contains the current time local to the database server. Storing server-local time like this seems like a good idea, and tends to work fine in local testing. But when you try to maintain a system where the database may be in one province, the data collected in another, and then reviewed in a third, you start to understand why it is too simplistic.

The fact that it's 10:12 now in Toronto isn't very helpful for a record that's getting created for a user in Halifax and is monitored from Vancouver.

So it's probably best to save timestamps WITH their timezone so as to avoid any ambiguity. This is the recommendation given here.

In our above example, the simplest approach is to change the table definition:

%%sql
DROP TABLE IF EXISTS test;
CREATE TABLE test(name TEXT, created TIMESTAMP WITH TIME ZONE DEFAULT (NOW() ));
%%sql
INSERT INTO test (name) VALUES ('zaphod beeblebrox');
INSERT INTO test(name,created) VALUES('ford prefect',now() );
SELECT * FROM test;
name               created
zaphod beeblebrox  2014-08-18 22:35:15.988764-04:00
ford prefect       2014-08-18 22:35:15.989726-04:00

So now the dates are globally meaningful. But I still have to be careful, because if I use the wrong date format to populate this table, it'll still get messed up.

%sql INSERT INTO test(name,created) VALUES ('arthur dent',now() at time zone 'utc')
%sql SELECT * FROM test;
name               created
zaphod beeblebrox  2014-08-18 22:35:15.988764-04:00
ford prefect       2014-08-18 22:35:15.989726-04:00
arthur dent        2014-08-19 02:35:15.990308-04:00

Note how arthur dent has completely the wrong created time.

Now, if I want to report on this data, I'm going to have to specify which time zone I want the dates formatted to:

%sql delete from test WHERE name='arthur dent';
%sql select name, created FROM test;
name               created
zaphod beeblebrox  2014-08-18 22:35:15.988764-04:00
ford prefect       2014-08-18 22:35:15.989726-04:00

gives me timestamps formatted in the time zone of the database server, which may be helpful, but will be less so if the actual users of the data are in a different time zone.

%sql  SELECT name, created at time zone 'utc' FROM test;
name               timezone
zaphod beeblebrox  2014-08-19 02:35:15.988764
ford prefect       2014-08-19 02:35:15.989726

gives me the time formatted in the UTC timezone, and

%sql select CREATED at time zone 'CST' FROM test;
timezone
2014-08-18 20:35:15.988764
2014-08-18 20:35:15.989726

gives me the time formatted for central standard time.

External data

Now, so far we've been letting the database create the timestamps, but sometimes we want to save data provided to us from an external source. In this case it's very important that we know what time zone the incoming data comes from, so our middleware should require that all dates include a time zone offset. Fortunately, if we're writing JavaScript applications, we get this automatically:

%%javascript
var d = JSON.stringify(new Date())
"2014-08-19T02:41:12.872Z"
Back in Python, we define a simple helper for running SQL against the test database:

import psycopg2, pandas

def execute(sql,params={}):
    with psycopg2.connect(database='test') as connection:
        with connection.cursor() as cursor:
            cursor.execute(sql,params)

So let's imagine that we got this string submitted to us by a client, and we're going to store it in the database via some Python code.

sql="INSERT INTO test (name, created) VALUES ( 'externally created date', %(date)s)"
params=dict(date="2014-08-19T02:35:24.321Z")
execute(sql,params)
%sql SELECT * FROM test
name                     created
zaphod beeblebrox        2014-08-18 22:35:15.988764-04:00
ford prefect             2014-08-18 22:35:15.989726-04:00
externally created date  2014-08-18 22:35:24.321000-04:00

And now we're getting to the point where all our timestamp data is both stored and displayed unambiguously.
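
And if we want the middleware to enforce the "always include a time zone" rule, a small validation helper, sketched here with the python-dateutil library (an assumption; any ISO 8601 parser would do), could reject naive timestamps before they ever reach the database:

from dateutil import parser

def require_aware(value):
    '''Parse an incoming timestamp string, rejecting it if it carries no time zone offset.'''
    dt = parser.parse(value)
    if dt.tzinfo is None:
        raise ValueError("timestamp must include a time zone offset: %r" % value)
    return dt

require_aware("2014-08-19T02:35:24.321Z")   # fine: parsed as UTC
require_aware("2014-08-19 02:35:24.321")    # raises ValueError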