aiopg, aiopg.sa and aiopg8000

Why?:

There are three available async adaptors for PostgreSQL in Python: aiopg, aiopg.sa and aiopg8000.

So, benchmarking these packages is the first step before using one of them.

Last result with 90 workers and inserting 50 rows per worker:

[aiopg.sa]  Total time: 16.52s
[aiopg] Total time: 6.08s
[aiopg8000] Total time: 62.20s
[aiopg] is 2.72x faster than [aiopg.sa]
[aiopg] is 10.22x faster than [aiopg8000]
[aiopg.sa] is 3.76x faster than [aiopg8000]

Code

To run the code, create databases and install dependencies first:

$ sudo -u postgres createdb aiopg8000
$ sudo -u postgres createdb aiopg
$ sudo -u postgres createdb aiopg_sa
$ pip-3.5 install aiopg aiopg8000 sqlalchemy

Here is the code:

import asyncio
import datetime
import aiopg
# noinspection PyPackageRequirements
import aiopg8000
import sqlalchemy as sa
from aiopg.sa import create_engine

ROWS_PER_WORKER = 50
WORKERS_COUNT = 90


#####################################
# aiopg8000
#####################################


async def aiopg8000_stream():
    """Open the raw TCP stream used for an aiopg8000 connection.

    Connects to 127.0.0.1:5432 with TLS disabled and returns the result of
    ``asyncio.open_connection`` unchanged.
    """
    endpoint = dict(host='127.0.0.1', port=5432, ssl=False)
    stream = await asyncio.open_connection(**endpoint)
    return stream


async def aiopg8000_create_connection():
    """Connect to the ``aiopg8000`` benchmark database as user ``postgres``.

    ``aiopg8000_stream`` is handed to the driver as its stream generator.
    """
    credentials = {
        'user': 'postgres',
        'password': 'postgres',
        'database': 'aiopg8000',
    }
    connection = await aiopg8000.connect(
        stream_generator=aiopg8000_stream, **credentials)
    return connection


async def aiopg8000_setup_db():
    """Recreate an empty ``t1`` table in the aiopg8000 database.

    Drops any existing table, creates it again, commits, then closes the
    cursor and the connection.
    """
    create_table_sql = (
        'CREATE TABLE t1('
        '   id    SERIAL  NOT NULL PRIMARY KEY, '
        '    age int not null, '
        '    name varchar(50) null)'
    )
    connection = await aiopg8000_create_connection()
    cursor = await connection.cursor()
    for statement in ('DROP TABLE IF EXISTS t1', create_table_sql):
        await cursor.execute(statement)
    await connection.commit()
    await cursor.yield_close()
    await connection.yield_close()


async def aiopg8000_worker(name: str):
    """Insert ``ROWS_PER_WORKER`` rows through a dedicated aiopg8000 connection.

    Every row is inserted and committed on its own. Each returned row whose
    id is a multiple of 100 is printed as a progress marker.

    :param name: Value stored in the ``name`` column of every inserted row.
    """
    insert_sql = (
        'INSERT INTO t1(age, name) VALUES(%(age)s, %(name)s) '
        'RETURNING id, age, name'
    )
    db = await aiopg8000_create_connection()
    try:
        c = await db.cursor()
        try:
            for i in range(ROWS_PER_WORKER):
                q = await c.mogrify(insert_sql, dict(age=i, name=name))
                await c.execute(q)
                # Unpack into row_* names so the ``name`` parameter is never
                # rebound and later inserts keep using the caller's value.
                for _id, row_age, row_name in await c.fetchall():
                    if _id % 100 == 0:
                        print('id = %s, age = %s, name = %s' % (_id, row_age, row_name))
                await db.commit()
        finally:
            # Close the cursor even when an insert or commit fails.
            await c.yield_close()
    finally:
        # Close the connection on every exit path so a failing worker
        # does not leak it.
        await db.yield_close()


####################################
# aiopg
####################################

# Connection string for the plain aiopg benchmark database.
aiopg_dsn = 'dbname=aiopg user=postgres password=postgres host=127.0.0.1'
# Shared connection pool: assigned by aiopg_setup_db(), used by aiopg_worker().
aiopg_pool = None


async def aiopg_setup_db():
    """Create the shared aiopg pool and recreate an empty ``t1`` table.

    The pool is stored in the module-level ``aiopg_pool``.
    """
    global aiopg_pool
    create_table_sql = (
        'CREATE TABLE t1('
        '   id    SERIAL  NOT NULL PRIMARY KEY, '
        '    age int not null, '
        '    name varchar(50) null)'
    )
    aiopg_pool = await aiopg.create_pool(aiopg_dsn)
    async with aiopg_pool.acquire() as connection:
        async with connection.cursor() as cursor:
            for statement in ('DROP TABLE IF EXISTS t1', create_table_sql):
                await cursor.execute(statement)


async def aiopg_worker(name: str):
    """Insert ``ROWS_PER_WORKER`` rows using a connection from ``aiopg_pool``.

    Each returned row whose id is a multiple of 100 is printed as a progress
    marker.

    :param name: Value stored in the ``name`` column of every inserted row.
    """
    insert_sql = 'INSERT INTO t1(age, name) VALUES(%s, %s ) RETURNING id, age, name'
    async with aiopg_pool.acquire() as conn:
        async with conn.cursor() as c:
            for i in range(ROWS_PER_WORKER):
                await c.execute(insert_sql, (i, name))
                # Unpack into row_* names so the ``name`` parameter is never
                # rebound and later inserts keep using the caller's value.
                for _id, row_age, row_name in await c.fetchall():
                    if _id % 100 == 0:
                        print('id = %s, age = %s, name = %s' % (_id, row_age, row_name))


########################################
# aiopg.sa
########################################

# SQLAlchemy description of table ``t1``, used by aiopg_sa_worker() to build
# INSERT statements. The table itself is created with raw DDL in
# aiopg_sa_setup_db().
aiopg_sa_metadata = sa.MetaData()
aiopg_sa_t1 = sa.Table(
    't1', aiopg_sa_metadata,
    sa.Column('id', sa.Integer, primary_key=True),
    sa.Column('age', sa.Integer),
    sa.Column('name', sa.String(50))
)
# Shared engine: assigned by aiopg_sa_setup_db(), closed by aiopg_sa_teardown().
aiopg_sa_engine = None


async def aiopg_sa_setup_db():
    """Create the shared aiopg.sa engine and recreate an empty ``t1`` table.

    The engine is stored in the module-level ``aiopg_sa_engine``.
    """
    global aiopg_sa_engine
    engine_options = dict(
        user='postgres',
        database='aiopg_sa',
        host='127.0.0.1',
        password='postgres',
    )
    create_table_sql = (
        'CREATE TABLE t1('
        '   id    SERIAL  NOT NULL PRIMARY KEY, '
        '    age int not null, '
        '    name varchar(50) null)'
    )
    aiopg_sa_engine = await create_engine(**engine_options)
    async with aiopg_sa_engine.acquire() as connection:
        for statement in ('DROP TABLE IF EXISTS t1', create_table_sql):
            await connection.execute(statement)


async def aiopg_sa_worker(name: str):
    """Insert ``ROWS_PER_WORKER`` rows through the shared aiopg.sa engine.

    Each returned row whose first column (the id) is a multiple of 100 is
    printed as a progress marker.

    :param name: Value stored in the ``name`` column of every inserted row.
    """
    async with aiopg_sa_engine.acquire() as conn:
        for i in range(ROWS_PER_WORKER):
            results = await conn.execute(aiopg_sa_t1.insert().values(age=i, name=name))
            for row in results:
                _id = row[0]
                if _id % 100 == 0:
                    print('id = %s, age = %s, name = %s' % (_id, i, name))
        # No explicit conn.close() here: leaving the ``async with`` block
        # releases the connection. The former bare call was never awaited
        # (close() is a coroutine in aiopg.sa), so it had no effect.


async def aiopg_sa_teardown():
    """Close the shared aiopg.sa engine and wait until it has shut down."""
    engine = aiopg_sa_engine
    engine.close()
    await engine.wait_closed()


########################################
# MAIN
########################################

async def get_package_functions(package_name: str):
    """Look up the benchmark hooks this module defines for one package.

    :param package_name: Prefix of the hook names, e.g. ``'aiopg_sa'``.
    :return: ``(setup_db, worker, teardown)``. ``teardown`` is ``None`` when
        the module defines no ``<package_name>_teardown``.
    :raises NameError: if ``<package_name>_setup_db`` or
        ``<package_name>_worker`` is not defined in this module.
    """
    # Plain dictionary lookups instead of eval(): real hook names resolve the
    # same way, but package_name is never executed as an expression.
    namespace = globals()

    def required(suffix):
        hook_name = '%s_%s' % (package_name, suffix)
        try:
            return namespace[hook_name]
        except KeyError:
            # Keep the exception type callers saw from the eval() version.
            raise NameError("name '%s' is not defined" % hook_name) from None

    setup_db = required('setup_db')
    worker = required('worker')
    teardown = namespace.get('%s_teardown' % package_name)
    return setup_db, worker, teardown


async def run_benchmark(package_name: str):
    """Set up one package's database, run all workers concurrently, time them.

    Only the concurrent worker phase is timed; setup and teardown are
    excluded.

    :param package_name: Prefix naming the hooks to use, e.g. ``'aiopg'``.
    :return: Elapsed wall-clock seconds as a float, or ``None`` when the run
        was interrupted with CTRL+C.
    """
    setup_db, worker, teardown = await get_package_functions(package_name)

    try:
        await setup_db()
        try:
            start = datetime.datetime.now()
            await asyncio.gather(*[worker('worker: %s' % i) for i in range(WORKERS_COUNT)])
            delta = (datetime.datetime.now() - start).total_seconds()
        finally:
            # Once setup has succeeded, always run the teardown hook, even if
            # a worker failed, so resources such as the engine are released.
            if teardown is not None:
                await teardown()
        return delta
    except KeyboardInterrupt:
        print('CTRL+C Pressed, Terminating.')
        return


async def main():
    """Benchmark the three adaptors in turn and print a comparison report.

    The runs are sequential: aiopg8000, then aiopg.sa, then aiopg. The report
    lists each total time, followed by the pairwise speed ratios.
    """
    pg8000_secs = await run_benchmark('aiopg8000')
    sa_secs = await run_benchmark('aiopg_sa')
    plain_secs = await run_benchmark('aiopg')

    # Absolute timings.
    print('[aiopg.sa]\tTotal time: %.2Fs' % sa_secs)
    print('[aiopg]\tTotal time: %.2Fs' % plain_secs)
    print('[aiopg8000]\tTotal time: %.2Fs' % pg8000_secs)

    # Pairwise ratios: slower time divided by faster time.
    print('[aiopg] is %.2Fx faster than [aiopg.sa]' % (sa_secs / plain_secs))
    print('[aiopg] is %.2Fx faster than [aiopg8000]' % (pg8000_secs / plain_secs))
    print('[aiopg.sa] is %.2Fx faster than [aiopg8000]' % (pg8000_secs / sa_secs))

    print('Bye Bye !')


if __name__ == '__main__':
    # Drive the whole benchmark to completion on the default event loop.
    asyncio.get_event_loop().run_until_complete(main())

Go & Python performance test

Scenario:

Consider this pseudo code:

async def worker(worker_idx, iterations):
    """Add, then subtract, i**i for every i below iterations * worker_idx."""
    result, o = 0, iterations * worker_idx
    for i in range(o):
        result += i**i
    for i in range(o):
        result -= i**i

await wait([worker(i, 600) for i in range(10)])

The Go code:

package main

import (
    "time"
    "log"
    "math/big"
)

var (
    // Number of worker goroutines started by main.
    workersCount = 10
    // Per-worker multiplier: worker idx loops idx * iterations times.
    iterations = 600
    // Completion channel: each worker sends its index when done. It is
    // buffered with one slot per worker.
    workers = make(chan int, workersCount)
)

func power(a, b int) *big.Int{
    return new(big.Int).Exp(big.NewInt(int64(a)), big.NewInt(int64(b)), nil)
}

// worker adds and then subtracts n**n for every n below idx*iterations,
// then reports completion by sending idx on the workers channel.
func worker(idx int, iterations int) {
    total := new(big.Int)
    limit := idx * iterations
    for n := 0; n < limit; n++ {
        total.Add(total, power(n, n))
    }
    for n := 0; n < limit; n++ {
        total.Sub(total, power(n, n))
    }
    workers <- idx
}

// main starts workersCount workers, waits for all of them to report back,
// and logs the total elapsed time.
func main() {
    start := time.Now()

    for w := 0; w < workersCount; w++ {
        go worker(w, iterations)
    }

    // One receive per worker: returns once every worker has finished.
    for done := 0; done < workersCount; done++ {
        <-workers
    }

    log.Printf("Time elapsed: %s", time.Since(start))
}

Python code:

  • cyaio_demo.pyx

    cdef void c_worker(int worker_idx, int iterations):
        # C-level version of the benchmark loop: adds, then subtracts, i**i
        # for every i below iterations * worker_idx.
        #
        # ``result`` is now initialised to 0. It was previously an
        # uninitialised C int that ``+=`` read before any assignment.
        #
        # NOTE(review): i and result are C ints, so i**i presumably overflows
        # for all but very small i, and the C compiler may drop both loops
        # because result is never used. Confirm before treating this as
        # equivalent to the pure-Python worker.
        cdef int i, result = 0, o = iterations * worker_idx
        for i in range(o):
            result += i**i
        for i in range(o):
            result -= i**i
    
    async def worker(int worker_idx, int iterations):
        # Coroutine wrapper around the C-level loop. c_worker is called
        # synchronously; nothing is awaited here.
        c_worker(worker_idx, iterations)
    
  • pyaio_demo.py

    async def worker(worker_idx, iterations):
        """Add, then subtract, n**n for every n below iterations * worker_idx.

        Pure-Python reference workload; the computed total is discarded.
        """
        limit = iterations * worker_idx
        total = 0
        for n in range(limit):
            total += n**n
        for n in range(limit):
            total -= n**n
    
  • run.py

    import cyaio_demo
    import pyaio_demo
    import sys
    import asyncio
    from datetime import datetime
    
    WORKERS = 10
    ITERATIONS = 600
    
    async def runner(f):
        """Run ``WORKERS`` instances of coroutine function *f* and time them.

        :param f: Coroutine function called as ``f(worker_idx, ITERATIONS)``.
        :return: Elapsed wall-clock time in seconds, as a float.
        """
        start_time = datetime.now()
        # gather() accepts bare coroutines and re-raises a worker's exception.
        # asyncio.wait() rejects bare coroutines on Python 3.11+ and would
        # leave worker failures unreported.
        await asyncio.gather(*[f(i, ITERATIONS) for i in range(WORKERS)])
        return (datetime.now() - start_time).total_seconds()
    
    async def benchmark():
        """Time the Cython worker, then the pure-Python one, and print both.

        The final line reports how many times faster the Cython run was,
        rounded to the nearest integer.
        """
        cython_secs = await runner(cyaio_demo.worker)
        print('Cython: %.5Fs' % cython_secs)
        cpython_secs = await runner(pyaio_demo.worker)
        print('CPython: %.5Fs' % cpython_secs)
        speedup = round(cpython_secs / cython_secs)
        print('Cython is %dX faster than CPython' % speedup)
    
    def main():
        """Run the benchmark on the event loop and return its result.

        :return: Whatever ``benchmark()`` returns.
        """
        loop = asyncio.get_event_loop()
        try:
            return loop.run_until_complete(benchmark())
        finally:
            # Close the loop on every exit path, including a failing benchmark.
            loop.close()
    
    if __name__ == '__main__':
        # Use main()'s return value as the process exit status.
        sys.exit(main())
    
  • setup.py

    from distutils.core import setup
    from Cython.Build import cythonize
    
    __author__ = 'vahid'

    # Build the extension list from the Cython source, then hand it to setup().
    extensions = cythonize("cyaio_demo.pyx")

    setup(
        name='cyaio',
        ext_modules=extensions,
    )
    

The Result

Cython:   0.01392s
GoLang:   5.33766s
CPython: 13.39106s
Cython is 962X faster than CPython
Golang is 2.5X faster than CPython
Cython is 383X faster than GoLang!

So, Cython is my default optimization method and, of course, Python is the best language ever.