Pub/Sub

Pub/Sub is a messaging pattern that allows multiple applications to communicate with each other asynchronously. In the context of SQLiteCloud, Pub/Sub can be used to provide real-time updates and notifications to subscribed applications whenever data changes in the database or it can be used to send payloads (messages) to anyone subscribed to a channel. Here’s how it works:

Publishers: Publishers are responsible for sending messages or notifications to the system whenever a change occurs in the database. Publishers can be any application that has write access to the database, including web servers, mobile apps, or background processes. A Publisher can also be anyone who NOTIFY a payload to a specific channel (without any write database operation).

Subscribers: Subscribers are applications that want to receive updates whenever a change occurs in the database or whenever someone send a message to a specific channel.

Channels: Channels are messaging patterns through which messages are sent and received. Publishers send messages to specific channel, and subscribers can subscribe to one or more channel to receive notifications. A channel can be a database table or a unique name not bound to any database entity.

Here are some of the capabilities that Pub/Sub provides for a database management system:

  • Real-time updates: With Pub/Sub, subscribers can receive real-time updates whenever data changes in the database. This can be useful for applications that need to display real-time information to users, such as stock tickers or social media feeds.

  • Scalability: Pub/Sub provides a scalable solution for database notifications, allowing multiple subscribers to receive updates without impacting database performance.

  • Customizable filtering: Pub/Sub allows subscribers to customize the types of messages they receive by filtering on specific topics or keywords. This can help reduce network traffic and improve application performance.

  • Fault tolerance: Pub/Sub systems are designed to be fault-tolerant, ensuring that messages are not lost even if a subscriber or publisher goes offline.

Overall, Pub/Sub provides a powerful messaging system for database management systems, enabling real-time updates and notifications for subscribed applications while maintaining scalability, reliability, and performance.

Pub/Sub Payload Format

JSON is used to deliver payload to all listening clients. JSON format depends on the operation type. In case of database tables, notifications occur on COMMIT so the same JSON can collect more changes related to that table. SQLite Cloud guarantees one JSON per channel.

1. NOTIFY payload

{
    sender: "UUID",
    channel: "name",
    type: "MESSAGE",
    payload: "Message content here"	// payload is optional
}

2. Multiple TABLE modification payload

{
    sender: "UUID",
    channel: "tablename",
    type: "TABLE",
    pk: ["id", "col1"],    // array of primary key name(s)
    payload: [             // array of operations that affect table name
        {
            type: "INSERT",
            id: 12,
            col1: "value1",
            col2: 3.14
        },
        {
            type: "DELETE",
            pv: [13],    // primary key value (s) in the same order as the pk array
        },
        {
            type: "UPDATE",
            id: 15,        // new value
            col1: "newvalue",
            col2: 0.0,
            // if primary key is updated during this update then add it to:
            // UPDATE TABLE SET col1='newvalue', col2=0.0, id = 15 WHERE id=14
            pv: [14]       // primary key value (s) set prior to this UPDATE operation
        }
    ]
}

Details:

  • sender: is the UUID of the client who sent the NOTIFY event or who initiated the WRITE operation that triggers the notification. It is common for a client that executes NOTIFY to be listening on the same notification channel itself. In that case it will get back a notification event, just like all the other listening sessions. Depending on the application logic, this could result in useless work, for example, reading a database table to find the same updates that that session just wrote out. It is possible to avoid such extra work by noticing whether the notifying UUID (supplied in the notification event message) is the same as one’s UUID (available from SDK). When they are the same, the notification event is one’s own work bouncing back, and can be ignored. If UUID is 0 it means that server sent that payload.
  • channel: this field represents the channel/table affected.
  • type: determine the type of operation, it can be: MESSAGE, TABLE, INSERT, UPDATE, or DELETE (more to come).
  • pk/pv: these fields represent the primary key name(s) and value(s) affected by this table operation.
  • payload: TODO

More SQL examples:

> USE DATABASE test.sqlite
OK

> GET SQL foo
CREATE TABLE "foo" ("id" INTEGER PRIMARY KEY AUTOINCREMENT, "col1" TEXT, "col2" TEXT)

> LISTEN TABLE foo
OK

3. DELETE FROM foo WHERE id=14;

{
	"sender": "b7a92805-ef82-4ad1-8c2f-92da6df6b1d5",
	"channel": "foo",
	"type": "TABLE",
	"pk": ["id"],
	"payload": [{
		"type": "DELETE",
		"pv": [14]
	}]
}

4. INSERT INTO foo(col1, col2) VALUES (‘test100’, ‘test101’);

{
	"sender": "b7a92805-ef82-4ad1-8c2f-92da6df6b1d5",
	"channel": "foo",
	"type": "TABLE",
	"pk": ["id"],
	"payload": [{
		"type": "INSERT",
		"id": 15,
		"col1": "test100",
		"col2": "test101"
	}]
}

5. UPDATE foo SET id=14,col1=‘test200’ WHERE id=15;

{
	"sender": "b7a92805-ef82-4ad1-8c2f-92da6df6b1d5",
	"channel": "foo",
	"type": "TABLE",
	"pk": ["id"],
	"payload": [{
		"type": "DELETE",
		"pv": [15]
	}, {
		"type": "INSERT",
		"id": 14,
		"col1": "test200",
		"col2": "test101"
	}]
}

Client Library Examples

import { Database } from '@sqlitecloud/drivers'
import { PubSub, PUBSUB_ENTITY_TYPE } from '@sqlitecloud/drivers/lib/drivers/pubsub'

let database = new Database('sqlitecloud://user:password@xxx.sqlite.cloud:8860/chinook.sqlite')
// or use sqlitecloud://xxx.sqlite.cloud:8860?apikey=xxxxxxx

const pubSub: PubSub = await database.getPubSub()

await pubSub.listen(PUBSUB_ENTITY_TYPE.TABLE, 'albums', (error, results, data) => {
  if (results) {
    // Changes on albums table will be received here as JSON object
    console.log('Received message:', results)
  }
})

await database.sql`INSERT INTO albums (Title, ArtistId) values ('Brand new song', 1)`

// Stop listening changes on the table
await pubSub.unlisten(PUBSUB_ENTITY_TYPE.TABLE, 'albums')