epgsql

PostgreSQL Driver for Erlang.


Erlang PostgreSQL Database Client

An asynchronous fork of wg/epgsql, originally hosted at mabrek/epgsql and subsequently forked again to provide a common fork for community development.

pgapp

If you want to get up to speed quickly with code that lets you run Postgres queries, you might consider trying epgsql/pgapp, which adds the following, on top of the epgsql driver:

  • A ‘resource pool’ (currently poolboy), which lets you decide how many Postgres workers you want to utilize.
  • Resilience against the database going down or other problems. The pgapp code will keep trying to reconnect to the database, but will not propagate the crash up the supervisor tree, so that, for instance, your web site will stay up even if the database is down for some reason. Erlang’s “let it crash” is a good idea, but external resources going away might not be a good reason to crash your entire system.

Difference highlights

  • 3 API sets:
    • epgsql maintains backwards compatibility with the original driver API
    • epgsqla delivers complete results as regular erlang messages
    • epgsqli delivers results as messages incrementally (row by row)
    All API interfaces can be used with the same connection: e.g. a connection opened with epgsql can be queried with epgsql / epgsqla / epgsqli in any combination.
  • internal queue of client requests, so you don’t need to wait for the response to send the next request (pipelining)
  • single process to hold driver state and receive socket data
  • execution of several parsed statements as a batch
  • binding timestamps in erlang:now() format

See CHANGES for the full list.

Differences between current epgsql and mabrek’s original async fork:

  • Unnamed statements are used unless specified otherwise. This may cause problems for people attempting to use the same connection concurrently, which will no longer work.

Known problems

  • SSL performance can degrade if the driver process has a large inbox (thousands of messages).

Extended Query

{ok, Columns, Rows}        = epgsql:equery(C, "select ...", [Parameters]).
{ok, Count}                = epgsql:equery(C, "update ...", [Parameters]).
{ok, Count, Columns, Rows} = epgsql:equery(C, "insert ... returning ...", [Parameters]).
{error, Error}             = epgsql:equery(C, "invalid SQL", [Parameters]).

Parameters - optional list of values to be bound to $1, $2, $3, etc.

The extended query protocol combines parse, bind, and execute using the unnamed prepared statement and portal. A select statement returns {ok, Columns, Rows}, insert/update/delete returns {ok, Count} or {ok, Count, Columns, Rows} when a returning clause is present. When an error occurs, all statements result in {error, #error{}}.

epgsql:equery(C, "select id from account where name = $1", ["alice"]),
> {ok,
    [#column{name = <<"id">>, type = int4, …}],
    [{1}]
}

PostgreSQL’s binary format is used to return integers as Erlang integers, floats as floats, bytes/text/varchar columns as binaries, bools as true/false, etc. For details see pgsql_binary.erl and the Data Representation section below.
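
Because equery/3 can return four differently shaped tuples, calling code usually needs one clause per shape. The following is a minimal sketch of such a dispatcher; the module name and the rows / count / failed tags are invented for illustration and are not part of epgsql.

```erlang
-module(equery_shapes).
-export([summarize/1]).

%% Reduce any epgsql:equery/3 return value to a small tagged tuple.
%% The tags used here (rows, count, failed) are hypothetical.
summarize({ok, _Columns, Rows}) when is_list(Rows) ->
    %% select
    {rows, Rows};
summarize({ok, Count}) when is_integer(Count) ->
    %% insert/update/delete without a returning clause
    {count, Count, []};
summarize({ok, Count, _Columns, Rows}) ->
    %% insert/update/delete with a returning clause
    {count, Count, Rows};
summarize({error, Error}) ->
    {failed, Error}.
```

It could be used as equery_shapes:summarize(epgsql:equery(C, "select id from account where name = $1", ["alice"])).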

The asynchronous API epgsqla:equery/3 requires you to parse the statement beforehand:

#statement{types = Types} = Statement,
TypedParameters = lists:zip(Types, Parameters),
Ref = epgsqla:equery(C, Statement, [TypedParameters]),
receive
  {C, Ref, Res} -> Res
end.
  • Statement - parsed statement (see parse below)
  • Res has same format as return value of epgsql:equery/3.
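
A bare receive blocks forever if the reply never arrives. The sketch below wraps the receive from the example above in a timeout; the helper name await/3 and the {error, timeout} return value are assumptions made for this illustration, not part of the epgsqla API.

```erlang
-module(async_await).
-export([await/3]).

%% Wait for the reply to an epgsqla request identified by Ref.
%% Returns the driver's result, or {error, timeout} (a value chosen
%% for this sketch) if nothing arrives within Timeout milliseconds.
await(C, Ref, Timeout) ->
    receive
        {C, Ref, Res} -> Res
    after Timeout ->
        {error, timeout}
    end.
```

Note that a reply arriving after the timeout stays in the caller's mailbox, so a long-lived process would need to flush or otherwise handle such late messages.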

epgsqli:equery(C, Statement, [TypedParameters]) sends the same set of messages as squery, including the final {C, Ref, done}.

Prepared Query

{ok, Columns, Rows}        = epgsql:prepared_query(C, Statement :: #statement{} | string(), [Parameters]).
{ok, Count}                = epgsql:prepared_query(C, Statement, [Parameters]).
{ok, Count, Columns, Rows} = epgsql:prepared_query(C, Statement, [Parameters]).
{error, Error}             = epgsql:prepared_query(C, "non_existent_query", [Parameters]).
  • Parameters - optional list of values to be bound to $1, $2, $3, etc.
  • Statement - name of the query passed to epgsql:parse(C, StatementName, "select ...", []) (can be an empty string), or the #statement{} record returned by epgsql:parse.

With a prepared query, you can parse a query once at startup, giving it a name with epgsql:parse, and reuse that name for all further queries with different parameters.

{ok, Stmt} = epgsql:parse(C, "inc", "select $1+1", []).
epgsql:prepared_query(C, Stmt, [4]).
epgsql:prepared_query(C, Stmt, [1]).

The asynchronous API epgsqla:prepared_query/3 always requires you to parse the statement beforehand:

#statement{types = Types} = Statement,
TypedParameters = lists:zip(Types, Parameters),
Ref = epgsqla:prepared_query(C, Statement, [TypedParameters]),
receive
  {C, Ref, Res} -> Res
end.
  • Statement - parsed statement (see parse below)
  • Res has same format as return value of epgsql:prepared_query/3.

epgsqli:prepared_query(C, Statement, [TypedParameters]) sends the same set of messages as squery, including the final {C, Ref, done}.

Parse/Bind/Execute

{ok, Statement} = epgsql:parse(C, [StatementName], Sql, [ParameterTypes]).
  • StatementName - optional, reusable, name for the prepared statement.
  • ParameterTypes - optional list of PostgreSQL types for each parameter.

For valid type names see pgsql_types.erl.

epgsqla:parse/2 sends {C, Ref, {ok, Statement} | {error, Reason}}.

epgsqli:parse/2 sends:

  • {C, Ref, {types, Types}}
  • {C, Ref, {columns, Columns}}
  • {C, Ref, no_data} if statement will not return rows
  • {C, Ref, {error, Reason}}
ok = epgsql:bind(C, Statement, [PortalName], ParameterValues).
  • PortalName - optional name for the result portal.

Both epgsqla:bind/3 and epgsqli:bind/3 send {C, Ref, ok | {error, Reason}}.

{ok | partial, Rows} = epgsql:execute(C, Statement, [PortalName], [MaxRows]).
{ok, Count}          = epgsql:execute(C, Statement, [PortalName]).
{ok, Count, Rows}    = epgsql:execute(C, Statement, [PortalName]).
  • PortalName - optional portal name used in epgsql:bind/4.
  • MaxRows - maximum number of rows to return (0 for all rows).

epgsql:execute/3 returns {partial, Rows} when more rows are available.
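
Reading a large result in pages means calling execute repeatedly until it stops returning partial. The sketch below shows that loop for row-returning statements. It assumes the statement has already been bound to the unnamed portal with epgsql:bind/3, and that a closing epgsql:sync/1 is the right way to end the sequence; the module and function names are hypothetical. The fetch step is passed in as a fun so the loop itself does not depend on a live connection.

```erlang
-module(portal_pages).
-export([fetch_all/1, fetch_all/3]).

%% Convenience wrapper: page through the unnamed portal PageSize rows
%% at a time. Assumes epgsql:bind(C, Statement, Params) was called first.
fetch_all(C, Statement, PageSize) when PageSize > 0 ->
    Result = fetch_all(fun() -> epgsql:execute(C, Statement, "", PageSize) end),
    ok = epgsql:sync(C),
    Result.

%% Fetch is a zero-arity fun returning {partial, Rows}, {ok, Rows}
%% or {error, Reason}, i.e. the shapes execute gives for a select.
fetch_all(Fetch) ->
    collect(Fetch, []).

collect(Fetch, Acc) ->
    case Fetch() of
        {partial, Rows} ->
            %% More rows are available: keep this page and ask again.
            collect(Fetch, [Rows | Acc]);
        {ok, Rows} when is_list(Rows) ->
            %% Last page: put the pages back in order and flatten them.
            {ok, lists:append(lists:reverse([Rows | Acc]))};
        {error, _} = Error ->
            Error
    end.
```

Accumulating every page defeats the purpose for very large results; in that case each page would be processed inside the loop instead of being collected.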

epgsqla:execute/3 sends {C, Ref, Result} where Result has same format as return value of epgsql:execute/3.

epgsqli:execute/3 sends

  • {C, Ref, {data, Row}}
  • {C, Ref, {error, Reason}}
  • {C, Ref, suspended} partial result was sent, more rows are available
  • {C, Ref, {complete, {_Type, Count}}}
  • {C, Ref, {complete, _Type}}
ok = epgsql:close(C, Statement).
ok = epgsql:close(C, statement | portal, Name).
ok = epgsql:sync(C).

All epgsql functions return {error, Error} when an error occurs.

epgsqla/epgsqli modules’ close and sync functions send {C, Ref, ok}.

Batch execution

Batch execution is bind + execute for several prepared statements. It uses unnamed portals and MaxRows = 0.

Results = epgsql:execute_batch(C, BatchStmt :: [{statement(), [bind_param()]}]).
{Columns, Results} = epgsql:execute_batch(C, statement() | sql_query(), Batch :: [ [bind_param()] ]).
  • BatchStmt - list of {Statement, ParameterValues}, each item has its own #statement{}
  • Batch - list of ParameterValues, each item executes the same common #statement{} or SQL query
  • Columns - list of #column{} descriptions of Results columns
  • Results - list of {ok, Count} or {ok, Count, Rows}

There are 2 versions:

execute_batch/2 - each item in a batch has its own named statement (duplicates are allowed)

example:

{ok, S1} = epgsql:parse(C, "one", "select $1::integer", []),
{ok, S2} = epgsql:parse(C, "two", "select $1::integer + $2::integer", []),
[{ok, [{1}]}, {ok, [{3}]}] = epgsql:execute_batch(C, [{S1, [1]}, {S2, [1, 2]}]).
ok = epgsql:close(C, "one").
ok = epgsql:close(C, "two").

execute_batch/3 - each item in a batch is executed with the same common SQL query or #statement{}. An unnamed statement may be used.

example (the most efficient way to make batch inserts with epgsql):

{ok, Stmt} = epgsql:parse(C, "my_insert", "INSERT INTO account (name, age) VALUES ($1, $2) RETURNING id", []).
{[#column{name = <<"id">>}], [{ok, [{1}]}, {ok, [{2}]}, {ok, [{3}]}]} =
    epgsql:execute_batch(C, Stmt, [ ["Joe", 35], ["Paul", 26], ["Mary", 24] ]).
ok = epgsql:close(C, "my_insert").

equivalent:

epgsql:execute_batch(C, "INSERT INTO account (name, age) VALUES ($1, $2) RETURNING id",
                     [ ["Joe", 35], ["Paul", 26], ["Mary", 24] ]).

If one of the batch items causes an error, all remaining queries of that batch are ignored. The last element of the result list will then be {error, #error{}}, and the result list may be shorter than the batch. For an illustration of this scenario, see epgsql_SUITE:batch_error/1.
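
Since a failed batch yields a shortened result list ending in an error, the caller can work out which item failed and how many were never run. The following is a small sketch of that bookkeeping; the module name and the shape of the return tuple are invented for illustration, and it matches on {error, _} rather than the #error{} record so that it does not need epgsql.hrl. It says nothing about whether the effects of the preceding items persist, which depends on transaction handling.

```erlang
-module(batch_results).
-export([split/2]).

%% Results:   the list returned by execute_batch/2, or the second
%%            element of the tuple returned by execute_batch/3.
%% BatchSize: number of items that were submitted.
%% Returns {ok, Results} when no item failed, otherwise
%% {failed, Preceding, Error, Skipped} (a hypothetical shape).
split(Results, BatchSize) ->
    case lists:reverse(Results) of
        [{error, Error} | PrecedingRev] ->
            Preceding = lists:reverse(PrecedingRev),
            %% Items after the failing one were ignored by the server.
            Skipped = BatchSize - length(Preceding) - 1,
            {failed, Preceding, Error, Skipped};
        _ ->
            {ok, Results}
    end.
```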

epgsqla:execute_batch/{2,3} sends {C, Ref, Results}

epgsqli:execute_batch/{2,3} sends

  • {C, Ref, {data, Row}}
  • {C, Ref, {error, Reason}}
  • {C, Ref, {complete, {_Type, Count}}}
  • {C, Ref, {complete, _Type}}
  • {C, Ref, done} - execution of all queries from Batch has finished
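
The incremental messages listed above can be folded back into one result per batch item. The sketch below does this with a plain receive loop; the module name and the {Status, Rows} result shape are assumptions for illustration. It has no timeout, so it blocks until done arrives, and rows received before an error are discarded.

```erlang
-module(incremental_collect).
-export([collect/2]).

%% Gather epgsqli:execute_batch messages for request Ref on
%% connection C into a list with one entry per batch item:
%% {Status, Rows} for a completed item, {error, Reason} for a failure.
collect(C, Ref) ->
    collect(C, Ref, [], []).

collect(C, Ref, Rows, Done) ->
    receive
        {C, Ref, {data, Row}} ->
            collect(C, Ref, [Row | Rows], Done);
        {C, Ref, {complete, Status}} ->
            %% Status is either {Type, Count} or just Type.
            collect(C, Ref, [], [{Status, lists:reverse(Rows)} | Done]);
        {C, Ref, {error, Reason}} ->
            collect(C, Ref, [], [{error, Reason} | Done]);
        {C, Ref, done} ->
            lists:reverse(Done)
    end.
```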

Data Representation

Data representation may be configured using pluggable datatype codecs, so the following is just the default mapping:

PG type      Representation
null         null
bool         true
char         $A
intX         1
floatX       1.0
date         {Year, Month, Day}
time         {Hour, Minute, Second.Microsecond}
timetz       {time, Timezone}
timestamp    {date, time}
timestamptz  {date, time}
interval     {time, Days, Months}
text         <<"a">>
varchar      <<"a">>
bytea        <<1, 2>>
array        [1, 2, 3]
record       {int2, time, text, ...} (decode only)
point        {10.2, 100.12}
int4range    [1,5)
hstore       {[ {binary(), binary() | null} ]} (configurable)
json/jsonb   <<"{ \"key\": [ 1, 1.0, true, \"string\" ] }">> (configurable)
uuid         <<"123e4567-e89b-12d3-a456-426655440000">>
inet         inet:ip_address()
cidr         {ip_address(), Mask :: 0..32}
macaddr(8)   tuple of 6 or 8 byte()
geometry     ewkb:geometry()
tsrange      {{Hour, Minute, Second.Microsecond}, {Hour, Minute, Second.Microsecond}}
tstzrange    {{Hour, Minute, Second.Microsecond}, {Hour, Minute, Second.Microsecond}}
daterange    {{Year, Month, Day}, {Year, Month, Day}}

null can be configured. See the nulls option of connect/1.

timestamp and timestamptz parameters can take erlang:now() format: {MegaSeconds, Seconds, MicroSeconds}
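
erlang:now/0 is deprecated in current OTP releases, but os:timestamp/0 and erlang:timestamp/0 return the same {MegaSeconds, Seconds, MicroSeconds} triple. The sketch below relates that triple to the {date, time} shape from the table, using the standard calendar module and treating the triple as UTC. It only illustrates the two formats; the module name is hypothetical, and how the driver itself converts the value is not shown here.

```erlang
-module(now_format).
-export([to_datetime/1]).

%% Convert an erlang:now()-style triple into
%% {{Year, Month, Day}, {Hour, Minute, Second.Microsecond}} in UTC.
to_datetime({_Mega, _Sec, Micro} = Now) ->
    {Date, {H, M, S}} = calendar:now_to_universal_time(Now),
    %% Fold the microseconds into fractional seconds (always a float).
    {Date, {H, M, S + Micro / 1000000}}.
```

For example, now_format:to_datetime(os:timestamp()) gives the current UTC time in the tuple form.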

int4range is a range type for ints that obeys inclusive/exclusive semantics, bracket and parentheses respectively. Additionally, infinities are represented by the atoms minus_infinity and plus_infinity

tsrange, tstzrange and daterange are range types for timestamp, timestamptz and date respectively. They can return the atom empty as the result from the database if the bounds are equal.

hstore type can take map or jiffy-style objects as input. Output can be tuned by providing return :: map | jiffy | proplist option to choose the format to which hstore should be decoded. nulls :: [atom(), ...] option can be used to select the terms which should be interpreted as SQL NULL - semantics is the same as for connect/1 nulls option.

json and jsonb types can optionally use a custom JSON encoding/decoding module to accept and return erlang-formatted JSON. The module must implement the callbacks in epgsql_codec_json, which most popular open source JSON parsers already do, and you can specify it in the codec configuration like this:

{epgsql_codec_json, JsonMod}

% With options
{epgsql_codec_json, JsonMod, EncodeOpts, DecodeOpts}

% Real world example using jiffy to return a map on decode
{epgsql_codec_json, {jiffy, [], [return_maps]}}

Note that the decoded terms will be message-passed to the receiving process (i.e. copied), which may incur a performance hit when large terms are decoded very frequently.
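
As a hedged sketch of where such a codec tuple might go: the helper below adds the jiffy example from above to a connect options map under a codecs key. The codecs key is an assumption about the connect/1 options (see pluggable_types.md for the authoritative description), and the module and function names are hypothetical.

```erlang
-module(json_codec_opts).
-export([connect_opts/1]).

%% Add the JSON codec configuration to an existing options map.
%% Assumes connect/1 reads codec configuration from the codecs key.
connect_opts(Base) when is_map(Base) ->
    Base#{codecs => [{epgsql_codec_json, {jiffy, [], [return_maps]}}]}.
```

The resulting map would then be passed to epgsql:connect/1 together with the usual host, username, password and database entries.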

Server Notifications

PostgreSQL may deliver two types of asynchronous message: “notices” in response to notice and warning messages generated by the server, and notifications which are generated by the LISTEN/NOTIFY mechanism.

Passing the {async, PidOrName} option to epgsql:connect/3 will result in these async messages being sent to the specified pid or registered process, otherwise they will be dropped.

Another way to set the notification receiver is the set_notice_receiver/2 function. It returns the previous async value. Use undefined to disable notifications.

% receiver is pid()
{ok, Previous} = epgsql:set_notice_receiver(C, self()).

% receiver is registered process
register(notify_receiver, self()).
{ok, Previous1} = epgsqla:set_notice_receiver(C, notify_receiver).

% disable notifications
{ok, Previous2} = epgsqli:set_notice_receiver(C, undefined).

Message formats:

{epgsql, Connection, {notification, Channel, Pid, Payload}}
  • Connection - connection the notification occurred on
  • Channel - channel the notification occurred on
  • Pid - database session pid that sent notification
  • Payload - optional payload, only available from PostgreSQL >= 9.0
{epgsql, Connection, {notice, Error}}
  • Connection - connection the notice occurred on
  • Error - an #error{} record, see epgsql.hrl
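
A receiver process has to tell these two message formats apart from anything else in its mailbox. The following is a minimal sketch of such a receiver; the module name, the classify/1 helper and the Handler callback are invented for illustration.

```erlang
-module(async_listener).
-export([classify/1, loop/1]).

%% Map an incoming message to a simplified event, or ignored
%% if it is not one of the two async message formats.
classify({epgsql, _Conn, {notification, Channel, _Pid, Payload}}) ->
    {notification, Channel, Payload};
classify({epgsql, _Conn, {notice, Error}}) ->
    {notice, Error};
classify(_Other) ->
    ignored.

%% Receive loop for a dedicated receiver process. It never returns
%% and consumes every message, so it should not share a mailbox
%% with other protocols.
loop(Handler) ->
    receive
        Msg ->
            case classify(Msg) of
                ignored -> ok;
                Event -> Handler(Event)
            end,
            loop(Handler)
    end.
```

A process running async_listener:loop/1 could be registered and passed as PidOrName in the async option.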

Utility functions

Transaction helpers

with_transaction(connection(), fun((connection()) -> Result :: any()), Opts) ->
    Result | {rollback, Reason :: any()} when
Opts :: [{reraise, boolean()},
         {ensure_committed, boolean()},
         {begin_opts, iodata()}] | map().

Executes a function in a PostgreSQL transaction. It executes BEGIN prior to executing the function, ROLLBACK if the function raises an exception and COMMIT if the function returns without an error. If it is successful, it returns the result of the function. The failure case may differ, depending on the options passed.

Options (proplist or map):

  • reraise (default true): when set to true, the original exception will be re-thrown after rollback, otherwise {rollback, ErrorReason} will be returned
  • ensure_committed (default false): even when the callback returns without exception, check that transaction was committed by checking the CommandComplete status of the COMMIT command. If the transaction was rolled back, the status will be rollback instead of commit and an ensure_committed_failed error will be generated.
  • begin_opts (default ""): extra options for the BEGIN command (see https://www.postgresql.org/docs/current/static/sql-begin.html), given as a string that is appended verbatim to the "BEGIN " string. E.g. {begin_opts, "ISOLATION LEVEL SERIALIZABLE"}. Beware of SQL injection! The value of begin_opts is not escaped!
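
The following sketch shows how these options might be combined. With reraise set to false, a failure comes back as {rollback, Reason} rather than as an exception, so the caller has to distinguish it from a normal result. The account table with id and balance columns, the module name and the interpret/1 helper are all hypothetical.

```erlang
-module(txn_sketch).
-export([transfer/4, interpret/1]).

%% Move Amount between two rows of a hypothetical account table.
%% A failed pattern match inside the fun raises an exception, which
%% makes with_transaction/3 issue ROLLBACK.
transfer(C, From, To, Amount) ->
    Res = epgsql:with_transaction(
            C,
            fun(Conn) ->
                    {ok, 1} = epgsql:equery(
                                Conn,
                                "update account set balance = balance - $1 where id = $2",
                                [Amount, From]),
                    {ok, 1} = epgsql:equery(
                                Conn,
                                "update account set balance = balance + $1 where id = $2",
                                [Amount, To]),
                    done
            end,
            [{reraise, false}, {ensure_committed, true}]),
    interpret(Res).

%% Turn the with_transaction/3 result into an ok/error tuple.
interpret({rollback, Reason}) -> {error, Reason};
interpret(Result) -> {ok, Result}.
```

One design caveat: because {rollback, Reason} shares the return channel with the function's own result, the callback should not itself return a tuple of that shape.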

Command status

epgsql{a,i}:get_cmd_status(C) -> undefined | atom() | {atom(), integer()}

This function returns the last executed command’s status information. It’s usually the name of SQL command and, for some of them (like UPDATE or INSERT) the number of affected rows. See libpq PQcmdStatus. But there is one interesting case: if you execute COMMIT on a failed transaction, status will be rollback, not commit. This is how you can detect failed transactions:

{ok, _, _} = epgsql:squery(C, "BEGIN").
{error, _} = epgsql:equery(C, "SELECT 1 / $1::integer", [0]).
{ok, _, _} = epgsql:squery(C, "COMMIT").
{ok, rollback} = epgsql:get_cmd_status(C).

Server parameters

epgsql{a,i}:get_parameter(C, Name) -> binary() | undefined

Retrieve the actual value of server-side parameters, such as character encoding, date/time format and timezone, server version and so on. See libpq PQparameterStatus. A parameter’s value may change during the connection’s lifetime.
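
Since the value is a binary or undefined, callers typically need a small parsing step. The sketch below extracts the leading integer from a server version string; the module and function names are hypothetical, and passing the parameter name as a string in the usage line is an assumption.

```erlang
-module(server_params).
-export([major_version/1]).

%% Extract the leading integer of a server_version value such as
%% <<"12.4">>. For releases before PostgreSQL 10 this is only the
%% first component (9 for <<"9.6.1">>).
major_version(undefined) ->
    undefined;
major_version(Version) when is_binary(Version) ->
    case string:to_integer(Version) of
        {Major, _Rest} when is_integer(Major) -> Major;
        {error, _Reason} -> undefined
    end.
```

It could be used as server_params:major_version(epgsql:get_parameter(C, "server_version")).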

Streaming replication protocol

See streaming.md.

Pluggable commands

See pluggable_commands.md

Pluggable datatype codecs

See pluggable_types.md

Mailing list

Google groups

Test Setup

In order to run the epgsql tests, you will need to install a local Postgres database.

NOTE: you will need the postgis and hstore extensions to run these tests! On Ubuntu, you can install them with a command like this:

  1. apt-get install postgresql-12-postgis-3 postgresql-contrib
  2. make test # Runs the tests

NOTE 2: It’s possible to run the tests against a specific Postgres version by changing $PATH, like so:

PATH=$PATH:/usr/lib/postgresql/12/bin/ make test
