Lua Triggers

The server can be set up to run a stored procedure when records are inserted, updated or deleted from a table. The stored procedure is called once per row that is changed. This is true even if the transaction changes several rows in one commit. The stored procedure receives one argument (an event) - it is a Lua table which contains data describing the changes to the row:

id = unique id associated with this event
name = table name which triggered event
type = add|del|upd
new = nil|{Lua-table with values which were updated/inserted}
old = nil|{Lua-table with values which were updated/deleted}

The trigger procedure can obtain the transaction-id for a given event by passing it to db:get_event_tid().

The CREATE LUA TRIGGER statement creates the named-trigger when rows are inserted, updated and (or) deleted from the table.

Let us see a few examples. Consider a table t with columns i, j, k, l. We’d like to run stored procedure audit which logs changes to t into table audit. The stored procedure would like to log values of i, j when a new row is inserted, log values of j, k when rows are updated and log values of k, l when rows are deleted.

CREATE LUA TRIGGER audit ON (TABLE t FOR INSERT OF i, j AND UPDATE OF j, k AND DELETE OF k, l)

If we wanted to log all columns when a row is inserted, the statement would look like:

CREATE LUA TRIGGER audit ON (TABLE t FOR INSERT OF i, j, k, l)

If we skip column names, then all columns at the time of creating trigger are considered. Keep in mind that these definitions are not updated automatically when table t is altered. Previous example could be written as:

CREATE LUA TRIGGER audit ON (TABLE t FOR INSERT)

Trigger can be set up so the system will assign monotonically increasing ids to events: CREATE LUA TRIGGER audit WITH SEQUENCE ON (TABLE t FOR INSERT)

Statement to set up trigger on insert into multiple tables, say t1 and t2 would look like:

CREATE LUA TRIGGER audit ON (TABLE t1 FOR INSERT), (TABLE t2 FOR INSERT)

Behind the scenes, the database will set up a queue to save requested changes to specified tables. The database will then select one node in the cluster to start listening on this queue. When there is an item on the queue, the database reads the item, starts a new transaction, creates a corresponding Lua table and executes the stored procedure. When stored procedure completes successfully, the database consumes this item from the queue and commits the transaction. The database then goes back to listening for subsequent items on the queue. db:begin() and db:commit() calls are not available to triggers as the database starts one implicitly so all of stored procedure actions and queue-consume operation are guaranteed to commit atomically.

The stored procedure should return 0 on success. Not returning a value or any other non-zero value will cause the stored procedure transaction to rollback. This implies that if trigger stored procedure transaction has any failure (ex. a failed insert, a duplicate error, etc.), that transaction will abort and the event will not be removed from the queue, rather the event will be reprocessed: i.e. stored procedure will be rerun (and if it fails, it will be retried, possibly indefinitely). Only if the stored procedure actions complete successfully, will the event be consumed from the queue.

Following is an example for a trigger audit which logs all changes to table t. Table has two int fields: i, j. Stored procedure logs data to table audit, storing type of change to t, time of log and the changed values.

cdb2sql testdb default - <<'EOF'
CREATE TABLE t(i int, j int)$$
CREATE TABLE audit(type cstring(4), tbl cstring(64), logtime datetime, i int, j int, old_i int, old_j int)$$
CREATE PROCEDURE audit VERSION 'sample' {
local function main(event)
    local audit = db:table('audit')
    local chg
    if event.new ~= nil then
        chg = event.new
    end
    if chg == nil then
        chg = {}
    end
    if event.old ~= nil then
        for k, v in pairs(event.old) do
            chg['old_'..k] = v
        end
    end
    chg.type = event.type
    chg.tbl = event.name
    chg.logtime = db:now()
    return audit:insert(chg)
end
}$$
CREATE LUA TRIGGER audit ON (TABLE t FOR INSERT AND UPDATE AND DELETE)
INSERT INTO t VALUES(1,1),(1,2),(1,3),(1,4)
UPDATE t SET i = j WHERE j % 2 = 0
DELETE FROM t WHERE i % 2 <> 0
EOF

This should generate 8 events and our sample stored procedure should have logged them in audit.

cdb2sql testdb default "select * from audit order by logtime"
(type='add', tbl='t', logtime="2019-12-19T131422.330 America/New_York", i=1, j=1, old_i=NULL, old_j=NULL)
(type='add', tbl='t', logtime="2019-12-19T131422.331 America/New_York", i=1, j=2, old_i=NULL, old_j=NULL)
(type='add', tbl='t', logtime="2019-12-19T131422.332 America/New_York", i=1, j=3, old_i=NULL, old_j=NULL)
(type='add', tbl='t', logtime="2019-12-19T131422.332 America/New_York", i=1, j=4, old_i=NULL, old_j=NULL)
(type='upd', tbl='t', logtime="2019-12-19T131422.333 America/New_York", i=2, j=2, old_i=1, old_j=2)
(type='upd', tbl='t', logtime="2019-12-19T131422.334 America/New_York", i=4, j=4, old_i=1, old_j=4)
(type='del', tbl='t', logtime="2019-12-19T131422.334 America/New_York", i=NULL, j=NULL, old_i=1, old_j=1)
(type='del', tbl='t', logtime="2019-12-19T131422.335 America/New_York", i=NULL, j=NULL, old_i=1, old_j=3)

The DROP LUA TRIGGER statement will stop execution of the store procedure and remove the queue associated with the trigger. To delete our example trigger, we will run the following statement:

DROP LUA TRIGGER audit

Lua Consumers

The CREATE LUA CONSUMER statement creates the named-consumer for INSERT/UPDATE/DELETE of specified fields from the specified table. A Lua consumer is similar in mechanics to a Lua trigger. Instead of the database running the stored procedure automatically, this requires a client program to run ‘EXEC PROCEDURE sp-name()’. Additionally, there is a mechanism to send back data to the calling program and block until client signals that data is consumed. This is different from regular server-client protocol which streams rows and blocks only when socket is full.

Let us study a sample consumer. The client program would like to receive all changes to table t. The example uses cdb2sql, however a real application would use the API and make and the usual calls (cdb2_run_statement, cdb2_next_record) to run the stored procedure and obtain rows. Buffering in cdb2sql may cause the rows to not appear immediately.

cdb2sql testdb default - <<'EOF'
CREATE TABLE t(i int, d double, c cstring(10), b byte(4), t datetime, y intervalym)$$
CREATE PROCEDURE watch VERSION 'sample' {
local function define_emit_columns()
    local num = db:exec("select count(*) as num from comdb2_columns where tablename='t'"):fetch().num
    local total = 2 + (num * 2) -- type, tablename, new column values, old column values
    db:num_columns(total)
    db:column_name('tbl', 1)  db:column_type('cstring', 1)
    db:column_name('type', 2) db:column_type('cstring', 2)
    local stmt = db:exec("select columnname as name, type as type from comdb2_columns where tablename='t'")
    local r = stmt:fetch()
    local i = 2
    while r do
        i = i + 1
        db:column_name(r.name, i)                db:column_type(r.type, i)
        db:column_name('old_'..r.name, i + num)  db:column_type(r.type, i + num)
        r = stmt:fetch()
    end
end

local function main()
    define_emit_columns()
    -- get handle to consumer associated with stored procedure
    local consumer = db:consumer()
    while true do
        local change = consumer:get() -- blocking call
        local row
        if change.new ~= nil then
            row = change.new
        end
        if row == nil then
            row = {}
        end
        if change.old ~= nil then
            for k, v in pairs(change.old) do
                row['old_'..k] = v
            end
        end
        row.tbl = change.name
        row.type = change.type
        consumer:emit(row) -- blocking call
        consumer:consume()
    end
end
}$$
CREATE LUA CONSUMER watch ON (TABLE t FOR INSERT AND UPDATE AND DELETE)

INSERT INTO t VALUES(1, 22.0/7, 'hello', x'deadbeef', now(), 5)
UPDATE t set i = i + d, d = d + i, b = x'600dc0de'
DELETE from t
EOF

A client which executes the watch procedure will receive the following rows:

cdb2sql testdb default "EXEC PROCEDURE watch()"
(tbl='t', type='add', i=1, d=3.142857, c='hello', b=x'deadbeef', t="2019-12-19T200838.418 ", y="0-5", old_i=NULL, old_d=NULL, old_c=NULL, old_b=NULL, old_t=NULL, old_y=NULL)
(tbl='t', type='upd', i=4, d=4.142857, c='hello', b=x'600dc0de', t="2019-12-19T200838.418 ", y="0-5", old_i=1, old_d=3.142857, old_c='hello', old_b=x'deadbeef', old_t="2019-12-19T200838.418 ", old_y="0-5")
(tbl='t', type='del', i=NULL, d=NULL, c=NULL, b=NULL, t=NULL, y=NULL, old_i=4, old_d=4.142857, old_c='hello', old_b=x'600dc0de', old_t="2019-12-19T200838.418 ", old_y="0-5")

The system provides a unique id for each event delivered to consumer. If client successfully processed an event, but crashed before requesting next row, the database will send the last event again. id may be useful to detect such a condition.

The dbconsumer:emit() only returns when the client application requests the next event by calling cdb2_next_record. In the example above, dbconsumer:consume() starts a new transaction and consumes the last event.

The consumer may want to log the change (like the sample in Lua trigger), emit the event and consume event atomically. This can be accomplished by wrapping these operations in an explicit transaction as shown below:

local function main()
        local audit = db:table("audit") -- from Lua trigger example
        local consumer = db:consumer()
        ...
        while true do
                local change = consume:get()
                -- massage change into audit and emit formats
                ...
                db:begin()
                        audit:insert(lua_tbl_for_audit)
                        consumer:emit(lua_tbl_for_emit)
                        consumer:consume()
                db:commit()
        end
end

If several client application runs the same consumer stored procedure, the database will ensure that only one executes. The rest of the stored procedures will block in the call to obtain dbconsumer handle (call to db:consumer() will block). When the executing application disconnects, the server will pick any one of the outstanding clients to run next.

The DROP LUA CONSUMER statement will stop execution of the store procedure and remove the queue associated with the consumer. To delete our example consumer, we will run the following statement:

DROP LUA CONSUMER watch

Batch consume from queue

At times it is useful to consume several events atomically. This can be used to reduce round trips to the client application and also to reduce transaction overhead when consuming. This is specially useful for applications which modify large number of records in a transaction. Consuming in batches which match the size of the originating transaction helps reduce latency in processing events as well.

Instead of calling dbconsumer:consume() which consumes that last event immediately, we can call dbconsumer:next() to remember to consume the last event fetched by dbconsumer:get() (or dbconsumer:poll()). Subsequent call to get/poll will return the next available event. Before calling dbconsumer:next() it is required that user start an explicit transaction by calling db:begin(). On calling db:commit(), system will consume all events for which dbconsumer:next() was called. User may choose to commit on transaction boundary or perhaps after every N records, etc.

Here is an example stored procedure which consumes all events which belong to the same originating transaction. To reduce round-trips between client-server, it calls db:emit() for all events, and then emits a sentinel row using dbconsumer:emit(), which waits for client to signal that it has finished processing all the events.

local function consume_a_txn(consumer)
        local event0 = consumer:get()
        -- missing error handling here
        local txn0 = db:get_event_tid(event0)
        while true do
                local event = consumer:poll(0)
                -- If there are no more events, or we are at a transaction boundary:
                -- return so we consume everything we have seen so far.
                if event == nil then return end
                if db:get_event_tid(event) ~= txn0 then return end
                -- Otherwise, emit this event and move to next event
                db:emit(event.new.data) -- Does not wait for client to ack
                consumer:next()
        end
end
local function main()
        local consumer = db:consumer()
        while true do
                db:begin()
                consume_a_txn(consumer)
                consumer:emit('--sentinel--') -- Wait here for client to ack
                db:commit() -- consume all events emitted so far
        end
end

Let us insert some data:

insert into t(data) values('first')
insert into t(data) values('second'),('third')
begin
insert into t(data) values('fourth')
insert into t(data) values('fifth')
commit
insert into t(data) select printf('row %d', value) from generate_series(6, 10)

The client which executes this procedure will receive:

($0='first')
($0='--sentinel--')
($0='second')
($0='third')
($0='--sentinel--')
($0='fourth')
($0='fifth')
($0='--sentinel--')
($0='row 6')
($0='row 7')
($0='row 8')
($0='row 9')
($0='row 10')
($0='--sentinel--')

Consumer API

db:consumer

dbconsumer = db:consumer(x)
    x: Optional Lua table to set timeout, etc

Description:

Returns a dbconsumer object associated with the stored procedure running a Lua consumer. This method blocks until registration succeeds with master node. It accepts an optional Lua table with following keys:

x.register_timeout = number (ms)

Specify timeout (in milliseconds) to wait for registration with master. For example, db:consumer({register_timeout = 5000}) will return nil if registration fails after 5 seconds.

x.with_sequence = true | false (default)

When with_sequence is true, the Lua table returned by dbconsumer:get/poll() includes an additional property (sequence). If the trigger was created with sequence option enabled, then this sequence will be a monotonically increasing count of the items that have been enqueued.

x.with_epoch = true | false (default)

When with_epoch is true, the Lua table returned by dbconsumer:get/poll() includes an additional property (epoch) which contains the unix time-epoch (seconds since 00:00:00 January 1, 1970 UTC) at the time when this event was enqueued.

x.with_tid = true | false (default)

When with_tid is true, Lua table returned by dbconsumer:get/poll() include additional property (tid). This is the same tid returned by db:get_event_tid()

db:get_event_epoch

e = db:get_event_epoch(x)
    x: Lua table returned by `dbconsumer:get/poll()` or argument passed to a `main` in Lua trigger.
    e: Unix time-epoch at the time when this event (x) was enqueued.

Returns unix time-epoch for a given event.

db:get_event_sequence

s = db:get_event_sequence(x)
    x: Lua table returned by `dbconsumer:get/poll()` or argument passed to a `main` in Lua trigger.
    s: Sequence for the event (x)

If the trigger (or consumer) is created with sequence, return the sequence for event x.

db:get_event_tid

tid = db:get_event_tid(x)
    x: Lua table returned by `dbconsumer:get/poll()` or argument passed to a `main` in Lua trigger.

Returns transaction-id (tid) for a given event. All events belonging to same originating transaction will have the same tid. This can be used by application to detect transaction boundaries as tid changes. tids are not unique like event’s id and will eventually recycle.

dbconsumer:get

lua-table = dbconsumer:get()

Description:

This method blocks until there an event available to consume. It returns a Lua table which contains data describing the event. It contains:

Key Value
id unique id associated with this event
name table name which triggered event
type “add” or “del” or “upd”
new nil or a Lua-table with values which were updated/inserted
old nil or Lua-table with values which were updated/deleted
tid optional transaction id (see with_tid above)

dbconsumer:poll

lua-table = dbconsumer:poll(t)
    t: number (ms)

Specify timeout (in milliseconds) to wait for event to be generated in the system. Similar to dbconsumer:get() otherwise. Returns nil if no event is avaiable after timeout.

dbconsumer:consume

Description:

Consumes the last event obtained by dbconsumer:get/poll(). Creates a new transaction if no explicit transaction was ongoing.

dbconsumer:next

Description:

Adds the last event obtained by dbconsumer:get/poll() to list of events to consume by subsequent db:commit() call. Requires that db:begin() has been called prior.

dbconsumer:emit

Description:

Like db:emit(), except it will block until the calling client requests next row by calling cdb2_next_record.

dbconsumer:emit_timeout

dbconsumer:emit_timeout(t)
    t: number (ms)

Description:

Specify timeout (in milliseconds) for a dbconsumer:emit() call. If a client does not request the next event (by calling cdb2_next_record()) for the duration of the timeout, and there is another client blocked on call to db:consumer() to read from the same queue, then the blocked client will be allowed to proceed.

Default Lua Consumers

Most consumer procedures simply emit whatever events are generated. The system provides a default consumer which does this as a convenience for developers.

CREATE DEFAULT LUA CONSUMER ...

statement will create a default procedure, set the default version for the procedure, and create the queue for saving and consuming modifications to specified table. A client application simply runs exec procedure ... statement for the named consumer and start receiving events without any additional work.

In addition to the table’s columns, the emitted rows also have comdb2_event and comdb2_id columns. comdb2_id is the unique id associated with an event as described in the prior sections. comdb2_event will be add or del for insert and delete events respectively. For update events, two rows are emitted. comdb2_event for the first row will be old and rest of the columns will contain the old values, while comdb2_event for the second row will be new and rest of the columns will provide the updated values. comdb2_id for old and new rows will be the same.

An application can pass a JSON string as an argument to main to modify the behavior of the default stored procedure.

To include additional metadata (epoch, sequence, and tid as documented in earlier sections), an application can set the following attributes:

{
    "with_epoch": true,
    "with_sequence": true,
    "with_tid": true
}

These will add comdb2_epoch, comdb2_sequence, and comdb2_tid columns respectively to the emitted rows.

By deafult, one event is consumed per transaction (dbconsumer:get() followed by dbconsumer:consume()), Application can change this to consume in batches which match originating transaction sizes (by using dbconsumer:next().) To do this pass the following parameter to main:

{
    "batch_consume": true
}

To emit a sentinel value at trasaction boundary in batch_consume mode, pass the following parameters to main:

{
    "batch_consume": true,
    "with_txn_sentinel": true
}

When all events from a transaction have been emitted, system will generate a sentinel row and column comdb2_event will contain string txn.

The default consumer makes blocking calls (db:consumer() and dbconsumer:get().) The consumer also sets emit_timeout to 10 seconds. To override this behavior, applications can set the following attributes (parameter values in milliseconds):

{
    "register_timeout": 1000,
    "poll_timeout": 2000,
    "emit_timeout": 3000
}

When poll_timeout is specified, default consumer uses dbconsumer:poll() instead of dbconsumer:get(). If register_timeout or poll_timeout occur, the procedure will emit a row and comdb2_event will contain the string register_timeout and poll_timeout respectively, and then continue execution (unless the client calls cdb2_close().)