从电波表到数据库小程序之 - 数据库异步广播(notify/listen)

本文涉及的产品
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
云原生数据库 PolarDB 分布式版,标准版 2核8GB
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: 标签 PostgreSQL , notify , listen , 异步消息 背景 小时候就梦想有个酷酷的电波表(虽然现在还没有拥有),不过电波表和PostgreSQL有什么关系呢?听我道来。

标签

PostgreSQL , notify , listen , 异步消息


背景

小时候就梦想有个酷酷的电波表(虽然现在还没有拥有),不过电波表和PostgreSQL有什么关系呢?听我道来。

pic

http://baike.baidu.com/view/1124741.htm

电波表内置高感度小型天线,接收标准电波进行自动对时,因而可以实现时间上的精准。在国际上,德国、英国、美国、日本都已经有标准电波的发送。2007年7月,在中国河南商丘建成的新电波塔已经开始发送电波。

标准电波接收(6局电波接收)原理

中国标准时间以10万年误差1秒的铯原子钟为基准。通过手表内置的高敏感度接收器接收以无线电波传送的标准时间信号,并自动校准手表走时。使手表显示的时间与标准时间同步,精确计时。在建筑物密集的室内也可以接收标准电波信号,显示正确的时间。

标准电波的接收范围是半径约1000~3000km, 深夜(最多6次)自动接收电波信号,不能收到标准电波信号时,自动接收GPS卫星电波获取时间信息并校正时间。中国的电波塔在河南省商丘市。

而用于接收电波信号的防震天线采用的是非晶材质,能够实现高敏感度接收。不但具备对应极端户外环境下使用的牢靠性,还可以确保稳定接收全球6局电波信号。

电波表是一个非常典型的广播应用,类似的还有组播(注意不是主播哦),类似的应用也很多,比如广播电视,电台等。

数据库广播

在数据库中,其实也有类似的应用,比如我前几天在文章中写的数据库到WEB客户端的广播,详见 :

《从微信小程序 到 数据库"小程序" , 鬼知道我经历了什么》

实际上是利用了PostgreSQL数据库的异步消息机制,数据库往消息通道发送数据,应用程序可以监听对应的消息通道,获取数据库发出的数据。

通过异步消息在数据库中实现了一对多的广播效果。

SQL语法参考文档:

1. 向通道发送消息

https://www.postgresql.org/docs/9.6/static/sql-notify.html

2. 监听某通道的消息

https://www.postgresql.org/docs/9.6/static/sql-listen.html

3. 取消监听某通道

https://www.postgresql.org/docs/9.6/static/sql-unlisten.html

4. 数据库函数

查看会话监听了哪些通道,以及当前数据库的异步消息队列使用了多少。

pg_listening_channels()         setof text  channel names that the session is currently listening on  
pg_notification_queue_usage()   double          fraction of the asynchronous notification queue currently occupied (0-1)  

pg_notification_queue_usage用来计算已使用的异步消息页面占比,如果有监听,但是一直不消费,可能导致溢出。

src/backend/commands/async.c  

/*  
 * slru.c currently assumes that all filenames are four characters of hex  
 * digits. That means that we can use segments 0000 through FFFF.  
 * Each segment contains SLRU_PAGES_PER_SEGMENT pages which gives us  
 * the pages from 0 to SLRU_PAGES_PER_SEGMENT * 0x10000 - 1.  
 *  
 * It's of course possible to enhance slru.c, but this gives us so much  
 * space already that it doesn't seem worth the trouble.  
 *  
 * The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2  
 * pages, because more than that would confuse slru.c into thinking there  
 * was a wraparound condition.  With the default BLCKSZ this means there  
 * can be up to 8GB of queued-and-not-read data.  
 *  
 * Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of  
 * SLRU_PAGES_PER_SEGMENT, for easier testing of queue-full behaviour.  
 */  
#define QUEUE_MAX_PAGE                  (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)  


src/include/access/slru.h:#define SLRU_PAGES_PER_SEGMENT        32  

异步消息编程

除了使用SQL来编写异步消息,还可以使用数据库的驱动来编写异步消息

c

参考libpq的异步消息部分

https://www.postgresql.org/docs/9.6/static/libpq-notify.html

PGnotify *PQnotifies(PGconn *conn);  

typedef struct pgNotify  
{  
    char *relname;              /* notification channel name */  
    int  be_pid;                /* process ID of notifying server process */  
    char *extra;                /* notification payload string */  
} PGnotify;  

文档中有一个例子如下

https://www.postgresql.org/docs/9.6/static/libpq-example.html#LIBPQ-EXAMPLE-2

java

参考文档

https://jdbc.postgresql.org/documentation/head/listennotify.html

应用举例

Broadcasting PostgreSQL NOTIFY messages to WebSocket Clients

The system works like this:

Client subscribes to a WebSocket topic...  

NOTIFY event on database server ->  
  PGNotificationListener on web server ->  
      Send Websocket notification on server ->  
         Receive Websocket event on browser.   

With the code below, if you call NOTIFY dml_events, 'some message'; in Postgres, it will be broadcast to all WebSocket clients.

Follow this answer regarding proper listener setup

URL:

http://blog.databasepatterns.com/2014/04/postgresql-nofify-websocket-spring-mvc.html

http://stackoverflow.com/questions/37916489/listen-notify-pgconnection-goes-down-java

The notification listeners are internally maintained by that library as weak references meaning that you have to hold a hard reference externally so they won't be garbage collected.   
Check out the BasicContext class lines 642 - 655:  

---  
public void addNotificationListener(String name, String channelNameFilter, NotificationListener listener) {  

    name = nullToEmpty(name);  
    channelNameFilter = channelNameFilter != null ? channelNameFilter : ".*";  

    Pattern channelNameFilterPattern = Pattern.compile(channelNameFilter);  

    NotificationKey key = new NotificationKey(name, channelNameFilterPattern);  

    synchronized (notificationListeners) {  
      notificationListeners.put(key, new WeakReference<NotificationListener>(listener));  
    }  

}  
---  

If the GC picks up your listener, calls to "get" on the weak reference will return null and will not fire as seen from lines 690 - 710  

---  
  @Override  
  public synchronized void reportNotification(int processId, String channelName, String payload) {  

    Iterator<Map.Entry<NotificationKey, WeakReference<NotificationListener>>> iter = notificationListeners.entrySet().iterator();  
    while (iter.hasNext()) {  

      Map.Entry<NotificationKey, WeakReference<NotificationListener>> entry = iter.next();  

      NotificationListener listener = entry.getValue().get();  
      if (listener == null) {  

        iter.remove();  
      }  
      else if (entry.getKey().channelNameFilter.matcher(channelName).matches()) {  

        listener.notification(processId, channelName, payload);  
      }  

    }  

}  
---  

To fix this, add your notification listeners as such:  

---  
/// Do not let this reference go out of scope!  
PGNotificationListener listener = new PGNotificationListener() {  

@Override  
public void notification(int processId, String channelName, String payload) {  
    // interesting code  
};  
pgConnection.addNotificationListener(listener);  
---  

Quite an odd use-case for weak references in my opinion...  

代码:

https://bitbucket.org/neilmcg/postgresql-websocket-example

其他编程语言的驱动,大多数是基于libpq的,不再举例。

参考

《从微信小程序 到 数据库"小程序" , 鬼知道我经历了什么》

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
5月前
|
小程序 JavaScript API
小程序云开发实战六:云数据库读取的数据显示在小程序端列表里
小程序云开发实战六:云数据库读取的数据显示在小程序端列表里
91 0
|
5月前
|
小程序 JavaScript API
小程序云开发实战五:如何将获取到的API数据存入云数据库里面
小程序云开发实战五:如何将获取到的API数据存入云数据库里面
87 0
|
5月前
|
小程序 前端开发 API
小程序云开发四:向云数据库插入一条数据
小程序云开发四:向云数据库插入一条数据
80 0
|
6月前
|
小程序 前端开发 JavaScript
微信小程序连接数据库与WXS的使用
微信小程序连接数据库与WXS的使用
47 0
|
5月前
|
小程序 JavaScript 数据库
微信小程序云开发数据库操作删除记录
微信小程序云开发数据库操作删除记录
63 0
|
5月前
|
小程序 数据库
小程序云开发五:从云数据库读取数据
小程序云开发五:从云数据库读取数据
145 0
|
22天前
|
小程序 前端开发 JavaScript
微信小程序——解决异步问题
微信小程序——解决异步问题
13 0
|
5月前
|
小程序 JavaScript 数据库
微信小程序云开发数据库操作二查询记录
微信小程序云开发数据库操作二查询记录
67 0
|
1月前
|
小程序 Java 关系型数据库
基于springboot的场地预约小程序的设计与实现(程序+数据库+文档)
基于springboot的场地预约小程序的设计与实现(程序+数据库+文档)
|
1月前
|
小程序 前端开发 Java
基于微信小程序的电影院订票选座系统的设计与实现(程序+数据库+)
基于微信小程序的电影院订票选座系统的设计与实现(程序+数据库+)