
PostgreSQL "Internet of Things" applications - 1: a real-time stream processing case (one trillion records per day)

德哥 2015-12-15 15:06:14

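One defining trait of the Internet of Things (IoT) is that everything is connected, which produces enormous volumes of data.
For example:
A box of medicine travels from production to shipping to the pharmacy to the point of sale, and every node it passes through records information about it.
Other examples:
Fitness wristbands, child-tracking watches, sensors used in animal migration research (such as for the Chinese sturgeon), hydrological monitoring, power grid monitoring, gas pipeline monitoring, weather monitoring, and so on.
Real-time stock price prediction.
Real-time traffic flow statistics and real-time merging of vehicle trajectories.
Real-time foot traffic statistics in shopping malls.
Real-time processing of monitoring data, for example from databases, servers, and operating systems.
And many more.
Sensors come in many varieties, and the data they collect has already reached massive scale.
That volume matches or exceeds what e-commerce sees on Singles' Day (Double 11). How can all this data be handled well, and how can it be processed as a real-time stream?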

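The PostgreSQL ecosystem provides a good stream-based data processing product whose real-time computing throughput reaches about 100,000 records per second on a single commodity x86 server.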

The use cases follow.

Download and install pipelineDB, a streaming data processing database built on a modified PostgreSQL.

# wget https://s3-us-west-2.amazonaws.com/download.pipelinedb.com/pipelinedb-0.8.5-centos6-x86_64.rpm  
# rpm -ivh pipelinedb-0.8.5-centos6-x86_64.rpm  --prefix=/home/digoal/pipelinedb  

Create the environment variable script

$ vi env_pipe.sh   

export PS1="$USER@`/bin/hostname -s`-> "  
export PGPORT=1922  
export PGDATA=/disk1/digoal/pipe/pg_root  
export LANG=en_US.utf8  
export PGHOME=/home/digoal/pipelinedb  
export LD_LIBRARY_PATH=/home/digoal/scws/lib:$PGHOME/lib:/lib64:/usr/lib64:/usr/local/lib64:/lib:/usr/lib:/usr/local/lib:$LD_LIBRARY_PATH  
export DATE=`date +"%Y%m%d%H%M"`  
export PATH=/home/digoal/scws/bin:$PGHOME/bin:$PATH:.  
export MANPATH=$PGHOME/share/man:$MANPATH  
export PGHOST=$PGDATA  
export PGUSER=postgres  
export PGDATABASE=pipeline  
alias rm='rm -i'  
alias ll='ls -lh'  
unalias vi  

$ . ./env_pipe.sh  

Initialize the database

$ pipeline-init -D $PGDATA -U postgres -E UTF8 --locale=C -W  

Configure parameters

$ cd $PGDATA  
$ vi pipelinedb.conf  
listen_addresses = '0.0.0.0'            # what IP address(es) to listen on;  
port = 1922                            # (change requires restart)  
max_connections = 200                   # (change requires restart)  
unix_socket_directories = '.'   # comma-separated list of directories  
shared_buffers = 8GB                    # min 128kB  
maintenance_work_mem = 640MB            # min 1MB  
dynamic_shared_memory_type = posix      # the default is the first option  
synchronous_commit = off                # synchronization level;  
wal_buffers = 16MB                      # min 32kB, -1 sets based on shared_buffers  
wal_writer_delay = 10ms         # 1-10000 milliseconds  
checkpoint_segments = 400               # in logfile segments, min 1, 16MB each  
log_destination = 'csvlog'              # Valid values are combinations of  
logging_collector = on          # Enable capturing of stderr and csvlog  
log_timezone = 'PRC'  
datestyle = 'iso, mdy'  
timezone = 'PRC'  
lc_messages = 'C'                       # locale for system error message  
lc_monetary = 'C'                       # locale for monetary formatting  
lc_numeric = 'C'                        # locale for number formatting  
lc_time = 'C'                           # locale for time formatting  
default_text_search_config = 'pg_catalog.english'  
continuous_query_combiner_work_mem = 1GB  
continuous_query_batch_size = 100000  
continuous_query_num_combiners = 8  
continuous_query_num_workers = 4  
continuous_queries_enabled = on  

Start the database

$ pipeline-ctl start  

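Create streams (consuming data from tables)
Example application scenarios:

.1. Suppose each sensor uploads three fields: the sensor ID, a timestamp, and the sampled value.
gid, crt_time, val
The application needs real-time statistics, per minute, per hour, and per day, of the maximum, minimum, average, and count of the values each sensor uploads.
Create three continuous views, one per statistical granularity.
As follows: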

pipeline=# CREATE CONTINUOUS VIEW sv01  AS SELECT gid::int,date_trunc('min',crt_time::timestamp),max(val::int),min(val),avg(val),count(val) FROM stream01 group by gid,date_trunc('min',crt_time);   

pipeline=# CREATE CONTINUOUS VIEW sv02  AS SELECT gid::int,date_trunc('hour',crt_time::timestamp),max(val::int),min(val),avg(val),count(val) FROM stream01 group by gid,date_trunc('hour',crt_time);   

pipeline=# CREATE CONTINUOUS VIEW sv03  AS SELECT gid::int,date_trunc('day',crt_time::timestamp),max(val::int),min(val),avg(val),count(val) FROM stream01 group by gid,date_trunc('day',crt_time);   
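
To make the semantics of these continuous views concrete, here is a minimal Python sketch of the incremental aggregation that sv01 describes: each arriving event updates the running max, min, sum, and count of its (gid, minute) group, and the raw event is not kept. It only illustrates the idea and is not how pipelineDB is implemented. The names trunc_minute, MinuteStats, and consume are hypothetical, and the sample events are made up.

```python
from collections import defaultdict
from datetime import datetime


def trunc_minute(ts: datetime) -> datetime:
    """Python stand-in for date_trunc('min', ts)."""
    return ts.replace(second=0, microsecond=0)


class MinuteStats:
    """Running max, min, sum, and count for one (gid, minute) group."""

    def __init__(self) -> None:
        self.max = None
        self.min = None
        self.total = 0
        self.count = 0

    def add(self, val: int) -> None:
        self.max = val if self.max is None else max(self.max, val)
        self.min = val if self.min is None else min(self.min, val)
        self.total += val
        self.count += 1

    @property
    def avg(self) -> float:
        return self.total / self.count


def consume(events):
    """Fold (gid, crt_time, val) events into per-(gid, minute) aggregates."""
    view = defaultdict(MinuteStats)
    for gid, crt_time, val in events:
        view[(gid, trunc_minute(crt_time))].add(val)
    return view


# Made-up sample events: (gid, crt_time, val)
events = [
    (1, datetime(2015, 12, 15, 13, 44, 5), 1),
    (1, datetime(2015, 12, 15, 13, 44, 40), 9),
    (1, datetime(2015, 12, 15, 13, 45, 2), 4),
    (2, datetime(2015, 12, 15, 13, 44, 7), 7),
]
sv01 = consume(events)
row = sv01[(1, datetime(2015, 12, 15, 13, 44))]
print(row.max, row.min, row.avg, row.count)
```

The hourly and daily views follow the same pattern with a coarser truncation of the timestamp.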

Activate the streams

pipeline=# activate;  
ACTIVATE  

Insert test data

pipeline=# insert into stream01(gid,val,crt_time) values (1,1,now());  
INSERT 0 1  
pipeline=# select * from sv01;  
 gid |     date_trunc      | max | min |          avg           | count   
-----+---------------------+-----+-----+------------------------+-------  
   1 | 2015-12-15 13:44:00 |   1 |   1 | 1.00000000000000000000 |     1  
(1 row)  

pipeline=# select * from sv02;  
 gid |     date_trunc      | max | min |          avg           | count   
-----+---------------------+-----+-----+------------------------+-------  
   1 | 2015-12-15 13:00:00 |   1 |   1 | 1.00000000000000000000 |     1  
(1 row)  

pipeline=# select * from sv03;  
 gid |     date_trunc      | max | min |          avg           | count   
-----+---------------------+-----+-----+------------------------+-------  
   1 | 2015-12-15 00:00:00 |   1 |   1 | 1.00000000000000000000 |     1  
(1 row)  

Stress test:
Assume 100,000 sensors, each uploading values in the range 1 to 100.

$ vi test.sql  
\setrandom gid 1 100000  
\setrandom val 1 100  
insert into stream01(gid,val,crt_time) values (:gid,:val,now());  

./pgsql9.5/bin/pgbench -M prepared -n -r -f ./test.sql -P 5 -c 24 -j 24 -T 100  
progress: 5.0 s, 95949.9 tps, lat 0.247 ms stddev 0.575  
progress: 10.0 s, 98719.9 tps, lat 0.240 ms stddev 0.549  
progress: 15.0 s, 100207.8 tps, lat 0.237 ms stddev 0.573  
progress: 20.0 s, 101596.4 tps, lat 0.234 ms stddev 0.517  
progress: 25.0 s, 102830.4 tps, lat 0.231 ms stddev 0.492  
progress: 30.0 s, 103055.0 tps, lat 0.230 ms stddev 0.488  
progress: 35.0 s, 102786.0 tps, lat 0.231 ms stddev 0.482  
progress: 40.0 s, 99066.3 tps, lat 0.240 ms stddev 0.578  
progress: 45.0 s, 102912.5 tps, lat 0.231 ms stddev 0.494  
progress: 50.0 s, 100398.2 tps, lat 0.236 ms stddev 0.530  
progress: 55.0 s, 105719.8 tps, lat 0.224 ms stddev 0.425  
progress: 60.0 s, 99041.0 tps, lat 0.240 ms stddev 0.617  
progress: 65.0 s, 97087.0 tps, lat 0.245 ms stddev 0.619  
progress: 70.0 s, 95312.6 tps, lat 0.249 ms stddev 0.653  
progress: 75.0 s, 98768.3 tps, lat 0.240 ms stddev 0.593  
progress: 80.0 s, 106203.8 tps, lat 0.223 ms stddev 0.435  
progress: 85.0 s, 103423.1 tps, lat 0.229 ms stddev 0.480  
progress: 90.0 s, 106143.5 tps, lat 0.223 ms stddev 0.429  
progress: 95.0 s, 103514.5 tps, lat 0.229 ms stddev 0.478  
progress: 100.0 s, 100222.8 tps, lat 0.237 ms stddev 0.547  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 24  
number of threads: 24  
duration: 100 s  
number of transactions actually processed: 10114821  
latency average: 0.235 ms  
latency stddev: 0.530 ms  
tps = 101089.580065 (including connections establishing)  
tps = 101101.483296 (excluding connections establishing)  
statement latencies in milliseconds:  
        0.003051        \setrandom gid 1 100000  
        0.000866        \setrandom val 1 100  
        0.230430        insert into stream01(gid,val,crt_time) values (:gid,:val,now());  

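That is roughly 100,000 records processed per second, across the statistical dimensions defined in the continuous view SQL above.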
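
As a rough sanity check, the throughput can be re-derived from the pgbench summary above in two ways: directly from the transaction count, and from the client count and average latency (Little's law, assuming every client always has exactly one request in flight). The sketch uses only figures printed in the report; the variable names are illustrative.

```python
# Figures copied from the pgbench summary above.
clients = 24
avg_latency_ms = 0.235
transactions = 10114821
duration_s = 100

# Throughput from the raw transaction count.
measured_tps = transactions / duration_s

# Throughput implied by Little's law: concurrency / latency.
predicted_tps = clients / (avg_latency_ms / 1000)

# Relative difference between the two estimates.
relative_gap = abs(predicted_tps - measured_tps) / measured_tps

print(f"measured  ~ {measured_tps:,.0f} tps")
print(f"predicted ~ {predicted_tps:,.0f} tps")
print(f"gap       ~ {relative_gap:.2%}")
```

The two estimates agree to within about one percent, which suggests the 24 clients were kept busy for the whole run.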

After several rounds of testing:

pipeline=# select sum(count) from sv03;  
   sum      
----------  
 53022588  
(1 row)  

pipeline=# select * from sv01 limit 10;  
  gid  |     date_trunc      | max | min |          avg           | count   
-------+---------------------+-----+-----+------------------------+-------  
     1 | 2015-12-15 13:44:00 |   1 |   1 | 1.00000000000000000000 |     1  
 53693 | 2015-12-15 13:47:00 |  68 |   1 |    28.0000000000000000 |     6  
   588 | 2015-12-15 13:47:00 |  88 |  11 |    47.6250000000000000 |     8  
 60154 | 2015-12-15 13:47:00 |  95 |   1 |    40.9090909090909091 |    11  
 38900 | 2015-12-15 13:47:00 |  90 |  17 |    57.2000000000000000 |     5  
 12784 | 2015-12-15 13:47:00 |  93 |  13 |    64.1250000000000000 |     8  
 79782 | 2015-12-15 13:47:00 |  60 |  16 |    43.1666666666666667 |     6  
  5122 | 2015-12-15 13:47:00 | 100 |   3 |    46.8333333333333333 |    12  
 97444 | 2015-12-15 13:47:00 |  98 |   9 |    59.5833333333333333 |    12  
 34209 | 2015-12-15 13:47:00 |  86 |  13 |    52.2857142857142857 |     7  
(10 rows)  

pipeline=# select * from sv02 limit 10;  
  gid  |     date_trunc      | max | min |         avg         | count   
-------+---------------------+-----+-----+---------------------+-------  
 91065 | 2015-12-15 14:00:00 | 100 |   0 | 51.4299065420560748 |   321  
 24081 | 2015-12-15 14:00:00 | 100 |   0 | 52.1649831649831650 |   297  
 29013 | 2015-12-15 14:00:00 | 100 |   0 | 50.9967213114754098 |   305  
 13134 | 2015-12-15 14:00:00 | 100 |   0 | 49.6968750000000000 |   320  
 84691 | 2015-12-15 14:00:00 | 100 |   0 | 49.5547445255474453 |   274  
 91059 | 2015-12-15 14:00:00 | 100 |   1 | 47.7536764705882353 |   272  
 50115 | 2015-12-15 14:00:00 | 100 |   1 | 49.4219269102990033 |   301  
 92610 | 2015-12-15 14:00:00 | 100 |   0 | 50.1197183098591549 |   284  
 36616 | 2015-12-15 14:00:00 | 100 |   1 | 48.8750000000000000 |   312  
 46390 | 2015-12-15 14:00:00 |  99 |   0 | 48.3246268656716418 |   268  
(10 rows)  

pipeline=# select * from sv03 limit 10;  
  gid  |     date_trunc      | max | min |         avg         | count   
-------+---------------------+-----+-----+---------------------+-------  
 68560 | 2015-12-15 00:00:00 | 100 |   0 | 51.2702702702702703 |   555  
 42241 | 2015-12-15 00:00:00 | 100 |   0 | 49.5266903914590747 |   562  
 64946 | 2015-12-15 00:00:00 | 100 |   0 | 48.2409177820267686 |   523  
  2451 | 2015-12-15 00:00:00 | 100 |   0 | 49.8153564899451554 |   547  
 11956 | 2015-12-15 00:00:00 | 100 |   0 | 51.2382739212007505 |   533  
 21578 | 2015-12-15 00:00:00 | 100 |   0 | 49.2959558823529412 |   544  
 36451 | 2015-12-15 00:00:00 | 100 |   0 | 51.1292035398230088 |   565  
 62380 | 2015-12-15 00:00:00 | 100 |   0 | 48.9099437148217636 |   533  
 51946 | 2015-12-15 00:00:00 | 100 |   0 | 51.0318091451292247 |   503  
 35084 | 2015-12-15 00:00:00 | 100 |   0 | 49.3613766730401530 |   523  
(10 rows)  

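.2. Suppose each vehicle uploads its position at intervals while it is running:
gid, crt_time, poi
The application needs to draw each vehicle's route by day (aggregating multiple points into a path type, an array type, a string, and so on).

Assume 10 million vehicles, each uploading one coordinate and timestamp at a time (or a batch of them).
Application requirements:
.2.1. Draw each vehicle's route by day
.2.2. Count, by hour, how many vehicles pass through each region

Create the stream (assuming here that the point has already been binary-encoded into a single INT8, which makes stress testing easier)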

CREATE CONTINUOUS VIEW sv04  AS SELECT gid::int,date_trunc('day',crt_time::timestamp),array_agg(poi::int8||' -> '||crt_time) FROM stream02 group by gid,date_trunc('day',crt_time);  
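
The effect of the array_agg in sv04 can be sketched in plain Python: every report is appended to the list that belongs to its (gid, day) group. This is an illustration only. The function name build_paths is hypothetical, and sorting by timestamp is a choice made for this sketch so that each path reads in travel order.

```python
from collections import defaultdict
from datetime import datetime


def build_paths(reports):
    """Group (gid, poi, crt_time) reports into one path per vehicle per day."""
    paths = defaultdict(list)
    # Sort by timestamp so each path reads in travel order (a choice made for
    # this sketch; it is not something the SQL above asks for).
    for gid, poi, crt_time in sorted(reports, key=lambda r: r[2]):
        day = crt_time.date()  # stand-in for date_trunc('day', crt_time)
        paths[(gid, day)].append(f"{poi} -> {crt_time.isoformat(sep=' ')}")
    return dict(paths)


# Sample reports, deliberately given out of order.
reports = [
    (448955, 885962518, datetime(2015, 12, 15, 14, 53, 3, 352406)),
    (448955, 306029686, datetime(2015, 12, 15, 14, 53, 1, 273121)),
    (7271368, 615447469, datetime(2015, 12, 15, 14, 53, 1, 261600)),
]
paths = build_paths(reports)
for key, path in paths.items():
    print(key, path)
```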

Stress test

$ vi test.sql  
\setrandom gid 1 10000000  
\setrandom poi 1 1000000000  
insert into stream02(gid,poi,crt_time) values (:gid,:poi,now());  

./pgsql9.5/bin/pgbench -M prepared -n -r -f ./test.sql -P 5 -c 24 -j 24 -T 100  
progress: 5.0 s, 106005.0 tps, lat 0.223 ms stddev 0.370  
progress: 10.0 s, 109884.8 tps, lat 0.216 ms stddev 0.347  
progress: 15.0 s, 111122.1 tps, lat 0.213 ms stddev 0.368  
progress: 20.0 s, 111987.0 tps, lat 0.212 ms stddev 0.353  
progress: 25.0 s, 111835.4 tps, lat 0.212 ms stddev 0.363  
progress: 30.0 s, 111759.7 tps, lat 0.212 ms stddev 0.366  
progress: 35.0 s, 112110.4 tps, lat 0.211 ms stddev 0.358  
progress: 40.0 s, 112185.4 tps, lat 0.211 ms stddev 0.352  
progress: 45.0 s, 113080.0 tps, lat 0.210 ms stddev 0.345  
progress: 50.0 s, 113205.4 tps, lat 0.209 ms stddev 0.353  
progress: 55.0 s, 113415.1 tps, lat 0.209 ms stddev 0.352  
progress: 60.0 s, 113519.8 tps, lat 0.209 ms stddev 0.342  
progress: 65.0 s, 112683.6 tps, lat 0.210 ms stddev 0.358  
progress: 70.0 s, 112748.3 tps, lat 0.210 ms stddev 0.360  
progress: 75.0 s, 112158.9 tps, lat 0.211 ms stddev 0.373  
progress: 80.0 s, 112580.8 tps, lat 0.210 ms stddev 0.355  
progress: 85.0 s, 111895.5 tps, lat 0.212 ms stddev 0.370  
progress: 90.0 s, 112229.2 tps, lat 0.211 ms stddev 0.442  
progress: 95.0 s, 104915.8 tps, lat 0.226 ms stddev 2.852  
progress: 100.0 s, 103079.9 tps, lat 0.230 ms stddev 2.054  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 24  
number of threads: 24  
duration: 100 s  
number of transactions actually processed: 11112035  
latency average: 0.213 ms  
latency stddev: 0.836 ms  
tps = 111106.652772 (including connections establishing)  
tps = 111118.651135 (excluding connections establishing)  
statement latencies in milliseconds:  
        0.002939        \setrandom gid 1 10000000  
        0.000887        \setrandom poi 1 1000000000  
        0.209177        insert into stream02(gid,poi,crt_time) values (:gid,:poi,now());  

pipeline=# select * from sv04 limit 3;  
  448955 | 2015-12-15 00:00:00 | {"306029686 -> 2015-12-15 14:53:01.273121","885962518 -> 2015-12-15 14:53:03.352406"}  
 7271368 | 2015-12-15 00:00:00 | {"615447469 -> 2015-12-15 14:53:01.2616","944473391 -> 2015-12-15 14:53:04.543387"}  
 8613957 | 2015-12-15 00:00:00 | {"473349491 -> 2015-12-15 14:53:01.288332","125413709 -> 2015-12-15 14:53:08.742894"}  

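.3. Collect statistics on the vehicle information captured by traffic police cameras, camera by camera.
For example:
.3.1 Per vehicle, collect the positions of the cameras that saw it and string them into trajectory data.
.3.2 Per camera, collect traffic flow statistics for each intersection (assuming one camera per intersection).

The first requirement is the same as the vehicle route example above; intersection traffic flow is simply aggregated by camera ID.
The usage is much the same as above, so the SQL is not repeated here.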
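
For readers who want to see the per-camera aggregation spelled out, here is a small Python sketch under stated assumptions: each sighting is a (camera_id, plate, seen_at) tuple, and flow is reported per camera per hour as the number of passes and the number of distinct plates. The field names and the camera_flow function are hypothetical and do not come from the original setup.

```python
from collections import defaultdict
from datetime import datetime


def camera_flow(sightings):
    """Return {(camera_id, hour): (passes, distinct_plates)}."""
    plates = defaultdict(set)
    passes = defaultdict(int)
    for camera_id, plate, seen_at in sightings:
        hour = seen_at.replace(minute=0, second=0, microsecond=0)
        key = (camera_id, hour)
        plates[key].add(plate)
        passes[key] += 1
    return {key: (passes[key], len(plates[key])) for key in passes}


# Made-up sightings for one camera.
sightings = [
    (7, "A123", datetime(2015, 12, 15, 8, 5)),
    (7, "A123", datetime(2015, 12, 15, 8, 40)),
    (7, "B456", datetime(2015, 12, 15, 8, 59)),
    (7, "C789", datetime(2015, 12, 15, 9, 1)),
]
flow = camera_flow(sightings)
for key, value in sorted(flow.items()):
    print(key, value)
```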

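.4. Real-time stock price prediction.
This can be combined with MADlib or PL/R to run multiple regression, pick the fit with the best R2, and use its intercept and slope to extrapolate the next set of prices.
It requires UDFs; for the details, see my earlier articles.
No SQL example is given here.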
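
The idea of "fit, keep the best R2, extrapolate" can be illustrated without a database. The Python sketch below is a deliberate simplification: it fits a single-variable least-squares line against the time index rather than a multiple regression, tries several trailing window lengths, and keeps the fit with the highest R2. The functions fit_line and predict_next and the price series are hypothetical, and this is not the MADlib or PL/R approach itself.

```python
def fit_line(ys):
    """Least-squares fit of y = intercept + slope * x for x = 0..n-1."""
    n = len(ys)
    xs = range(n)
    mean_x = (n - 1) / 2
    mean_y = sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    ss_tot = sum((y - mean_y) ** 2 for y in ys)
    ss_res = sum((y - (intercept + slope * x)) ** 2 for x, y in zip(xs, ys))
    # A constant series has no variance to explain; treat the fit as perfect.
    r2 = 1.0 if ss_tot == 0 else 1 - ss_res / ss_tot
    return slope, intercept, r2


def predict_next(prices, min_window=3):
    """Try each trailing window, keep the best R2, extrapolate one step.

    Returns (r2, predicted_price, window), or None if there are fewer
    than min_window prices.
    """
    best = None
    for window in range(min_window, len(prices) + 1):
        slope, intercept, r2 = fit_line(prices[-window:])
        if best is None or r2 > best[0]:
            # The next point sits at x = window in the window's own index.
            best = (r2, intercept + slope * window, window)
    return best


# Made-up price series.
prices = [10.0, 9.0, 12.0, 11.0, 12.0, 13.0, 14.0]
r2, predicted, window = predict_next(prices)
print(r2, predicted, window)
```

A high R2 on past data says nothing about how well the extrapolation will hold, so the prediction should be read as a toy result.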

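.5. Real-time statistics from shopping mall WiFi sensors.
Using the location information that WiFi provides, compute each shop's foot traffic in real time, along with the average and total dwell time per shop.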
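
A minimal Python sketch of these three metrics follows, under assumptions that are not in the original: each WiFi ping is a (device_id, shop_id, ts_seconds) tuple, one device counts as one visitor, and a device's dwell time in a shop is approximated as the gap between its first and last ping there. The function name shop_stats is hypothetical.

```python
from collections import defaultdict


def shop_stats(pings):
    """Return {shop_id: {"visitors", "total_dwell_s", "avg_dwell_s"}}."""
    # First and last time each device was seen in each shop.
    first_last = {}
    for device_id, shop_id, ts in pings:
        key = (shop_id, device_id)
        lo, hi = first_last.get(key, (ts, ts))
        first_last[key] = (min(lo, ts), max(hi, ts))

    stats = defaultdict(lambda: {"visitors": 0, "total_dwell_s": 0})
    for (shop_id, _device_id), (lo, hi) in first_last.items():
        stats[shop_id]["visitors"] += 1
        stats[shop_id]["total_dwell_s"] += hi - lo
    for entry in stats.values():
        entry["avg_dwell_s"] = entry["total_dwell_s"] / entry["visitors"]
    return dict(stats)


# Made-up pings: (device_id, shop_id, ts_seconds)
pings = [
    ("a", "shop1", 0),
    ("a", "shop1", 300),
    ("b", "shop1", 100),
    ("b", "shop1", 200),
    ("a", "shop2", 400),
]
result = shop_stats(pings)
print(result)
```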

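.6. What if the functions PG already has cannot handle your data processing scenario? No problem: PG provides a range of APIs for custom UDFs, data types, operators, index methods, and more. You can build on them to fit your business needs.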

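There are many more uses than can be listed here.

Next, combine this with a currently very popular message queue: pipelineDB can pull data from the message queue in real time and compute on it in real time.
Example:
Start an nginx server locally and use siege to simulate HTTP requests; nginx records these requests to a file in JSON format.
Start a kafka server locally and use kafkacat to continuously push the nginx access log to kafka.
Subscribe to the kafka messages in pipelinedb and process them in real time into the statistics you want (page visitor counts, latency, and so on).

Install kafka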

http://kafka.apache.org/07/quickstart.html  

# wget http://www.us.apache.org/dist/kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz  
# tar -zxvf kafka_2.10-0.8.2.2.tgz  

# git clone https://github.com/edenhill/librdkafka.git  
# cd librdkafka  
./configure  
make  
make install  

# git clone https://github.com/lloyd/yajl.git  
# cd yajl  
./configure  
make  
make install  

# vi /etc/ld.so.conf  
/usr/local/lib  
# ldconfig  

# git clone https://github.com/edenhill/kafkacat.git  
# cd kafkacat  
./configure  
make  
make install  

Install siege and nginx

# yum install -y siege nginx  

Create an nginx configuration file that writes the access log to /tmp/access.log in JSON format

cd /tmp  

cat <<EOF > nginx.conf  
worker_processes 4;  
pid $PWD/nginx.pid;  
events {}  
http {  

    log_format json   
    '{'  
        '"ts": "\$time_iso8601", '  
        '"user_agent": "\$http_user_agent", '  
        '"url": "\$request_uri", '  
        '"latency": "\$request_time",  '  
        '"user": "\$arg_user"'  
    '}';  

    access_log $PWD/access.log json;  
    error_log $PWD/error.log;  

    server {  
        location ~ ^/ {  
            return 200;  
        }  
    }  
}  
EOF  

Start nginx

nginx -c $PWD/nginx.conf -p $PWD/  

Configure the hostname

# hostname  
digoal.org  
# vi /etc/hosts  
127.0.0.1 digoal.org  

Start kafka

cd /opt/soft_bak/kafka_2.10-0.8.2.2  
bin/zookeeper-server-start.sh config/zookeeper.properties &  
bin/kafka-server-start.sh config/server.properties &  

Generate a file of random URLs

for x in {0..1000000}; do echo "http://localhost/page$((RANDOM % 100))/path$((RANDOM % 10))?user=$((RANDOM % 100000))" >> urls.txt; done  

Use siege to simulate requests to these URLs; nginx writes the access log to /tmp/access.log

siege -c32 -b -d0 -f urls.txt >/dev/null 2>&1  

Sample of /tmp/access.log, in JSON format
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page68/path7?user=18583", "latency": "0.002",  "user": "18583"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page78/path0?user=24827", "latency": "0.003",  "user": "24827"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page19/path6?user=3988", "latency": "0.003",  "user": "3988"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page55/path2?user=18433", "latency": "0.003",  "user": "18433"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page62/path3?user=10801", "latency": "0.001",  "user": "10801"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page9/path2?user=4915", "latency": "0.001",  "user": "4915"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page10/path2?user=5367", "latency": "0.001",  "user": "5367"}  

Pipe the access log into kafkacat to push it to the kafka messaging system, under the topic logs_topic.

( tail -f /tmp/access.log | kafkacat -b localhost:9092 -t logs_topic ) &  

The plain way to consume it is:

# cd /opt/soft_bak/kafka_2.10-0.8.2.2  
# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic logs_topic --from-beginning  
# Ctrl+C  

Next, we use pipelinedb to consume these messages in real time and turn them into the statistics we need.

CREATE EXTENSION pipeline_kafka;  
SELECT kafka_add_broker('localhost:9092');  -- add a kafka broker (one node of the kafka cluster)
CREATE STREAM logs_stream (payload json);  -- create a stream that maps to the kafka messaging system
CREATE CONTINUOUS VIEW message_count AS SELECT COUNT(*) FROM logs_stream;   -- create a continuous view that consumes and processes kafka messages in real time
SELECT kafka_consume_begin('logs_topic', 'logs_stream');  -- start consuming the given topic, logs_topic
 kafka_consume_begin   
------------------  
 success  
(1 row)  

Query the continuous view to get the current nginx access statistics.

SELECT * FROM message_count;  
 count   
--------  
  24  
(1 row)  

SELECT * FROM message_count;  
 count  
--------  
  36  
(1 row)  

Next, a deeper real-time analysis: for each URL, the number of visits, the number of users, and the latency below which 99% of requests fall.

/*   
 * This function will strip away any query parameters from each url,  
 * as we're not interested in them.  
 */  
CREATE FUNCTION url(raw text, regex text DEFAULT '\?.*', replace text DEFAULT '')  
    RETURNS text  
AS 'textregexreplace_noopt'    -- textregexreplace_noopt@src/backend/utils/adt/regexp.c  
LANGUAGE internal;  

CREATE CONTINUOUS VIEW url_stats AS
    SELECT
        url,  -- URL
        percentile_cont(0.99) WITHIN GROUP (ORDER BY latency_ms) AS p99,  -- latency below which 99% of requests to this URL fall
        count(DISTINCT user) AS uniques,  -- number of unique users
        count(*) total_visits  -- total number of visits
    FROM
        (SELECT
            url(payload->>'url'),  -- URL with query parameters stripped
            payload->>'user' AS user,  -- user ID
            (payload->>'latency')::float * 1000 AS latency_ms,  -- request latency in milliseconds
            arrival_timestamp
        FROM logs_stream) AS unpacked
    WHERE arrival_timestamp > clock_timestamp() - interval '1 day'
    GROUP BY url;
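
What url_stats computes can be checked offline against a few log lines. The Python sketch below parses JSON lines shaped like the /tmp/access.log sample, strips the query string the way the url() function does, and derives p99, unique users, and total visits per URL. It is an exact batch calculation written for illustration: percentile_cont here uses linear interpolation between adjacent values, and a streaming engine may approximate percentiles instead. The helper names and the shortened sample lines are assumptions, and the one-day window is left out.

```python
import json
import math
import re
from collections import defaultdict


def strip_query(raw_url):
    """Drop everything from the first '?' onward."""
    return re.sub(r"\?.*", "", raw_url, count=1)


def percentile_cont(values, fraction):
    """Continuous percentile with linear interpolation between neighbors."""
    ordered = sorted(values)
    pos = fraction * (len(ordered) - 1)
    lower = math.floor(pos)
    upper = math.ceil(pos)
    if lower == upper:
        return ordered[lower]
    return ordered[lower] + (ordered[upper] - ordered[lower]) * (pos - lower)


def url_stats(lines):
    """Return {url: {"p99", "uniques", "total_visits"}} for JSON log lines."""
    latencies = defaultdict(list)
    users = defaultdict(set)
    for line in lines:
        payload = json.loads(line)
        url = strip_query(payload["url"])
        latencies[url].append(float(payload["latency"]) * 1000)  # seconds to ms
        users[url].add(payload["user"])
    return {
        url: {
            "p99": percentile_cont(values, 0.99),
            "uniques": len(users[url]),
            "total_visits": len(values),
        }
        for url, values in latencies.items()
    }


# Shortened, made-up lines in the same shape as the access log.
lines = [
    '{"url": "/page68/path7?user=18583", "latency": "0.002", "user": "18583"}',
    '{"url": "/page68/path7?user=24827", "latency": "0.004", "user": "24827"}',
    '{"url": "/page19/path6?user=3988", "latency": "0.003", "user": "3988"}',
]
stats = url_stats(lines)
for url, entry in stats.items():
    print(url, entry)
```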

CREATE CONTINUOUS VIEW user_stats AS  
    SELECT  
        day(arrival_timestamp),  
        payload->>'user' AS user,  
        sum(CASE WHEN payload->>'url' LIKE '%landing_page%' THEN 1 ELSE 0 END) AS landings,  
        sum(CASE WHEN payload->>'url' LIKE '%conversion%' THEN 1 ELSE 0 END) AS conversions,  
        count(DISTINCT url(payload->>'url')) AS unique_urls,  
        count(*) AS total_visits  
    FROM logs_stream GROUP BY payload->>'user', day;  

-- What are the top-10 most visited urls?  
SELECT url, total_visits FROM url_stats ORDER BY total_visits DESC limit 10;  
      url      | total_visits   
---------------+--------------  
 /page62/path4 |        10182  
 /page51/path4 |        10181  
 /page24/path5 |        10180  
 /page93/path3 |        10180  
 /page81/path0 |        10180  
 /page2/path5  |        10180  
 /page75/path2 |        10179  
 /page28/path3 |        10179  
 /page40/path2 |        10178  
 /page74/path0 |        10176  
(10 rows)  


-- What is the 99th percentile latency across all urls?  
SELECT combine(p99) FROM url_stats;  
     combine        
------------------  
 6.95410494731137  
(1 row)  

-- What is the average conversion rate each day for the last month?  
SELECT day, avg(conversions / landings) FROM user_stats GROUP BY day;  
          day           |            avg               
------------------------+----------------------------  
 2015-09-15 00:00:00-07 | 1.7455000000000000000000000  
(1 row)  

-- How many unique urls were visited each day for the last week?  
SELECT day, combine(unique_urls) FROM user_stats WHERE day > now() - interval '1 week' GROUP BY day;  
          day           | combine   
------------------------+---------  
 2015-09-15 00:00:00-07 |  100000  
(1 row)  

-- Is there a relationship between the number of unique urls visited and the highest conversion rates?  
SELECT unique_urls, sum(conversions) / sum(landings) AS conversion_rate FROM user_stats  
    GROUP BY unique_urls ORDER BY conversion_rate DESC LIMIT 10;  
 unique_urls |  conversion_rate    
-------------+-------------------  
          41 |  2.67121005785842  
          36 |  2.02713894173361  
          34 |  2.02034637010851  
          31 |  2.01958418072859  
          27 |  2.00045348712296  
          24 |  1.99714899522942  
          19 |  1.99438839453606  
          16 |  1.98083502184886  
          15 |  1.87983011139079  
          14 |  1.84906254929873  
(10 rows)  

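With PipelineDB + kafka, the range of application scenarios grows richer still.

How do you build a larger real-time message processing cluster?
Plan the data sharding rule carefully so that statistics do not have to cross nodes; where cross-node access is needed, keep dimension tables on every node.
For example, what if you need to process one trillion messages per day?
Based on the stress tests above, each machine processes about 100,000 records per second on average (about 8.64 billion per day), so roughly 116 PostgreSQL machines are needed. Pretty painless, isn't it?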
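
The sizing arithmetic, written out as a short Python calculation. It assumes the load is spread evenly over the day and across machines, with no headroom for peaks or failures; the variable names are illustrative.

```python
SECONDS_PER_DAY = 24 * 60 * 60            # 86,400

records_per_day = 10 ** 12                # one trillion messages per day
per_node_per_second = 100_000             # from the stress tests above

# Daily capacity of one machine.
per_node_per_day = per_node_per_second * SECONDS_PER_DAY

# Round up with integer arithmetic: a fraction of a machine is still a machine.
nodes = -(-records_per_day // per_node_per_day)

print(per_node_per_day)  # 8640000000
print(nodes)             # 116
```
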
An illustration:
Every layer can be scaled out,
from LVS to HAProxy to kafka to PostgreSQL to HAWQ for offline analysis.
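
One simple sharding rule that keeps statistics from crossing nodes is to route every record by a stable hash of its grouping key, so that all records for the same gid land on the same machine. The Python sketch below illustrates that idea only; node_for and route are hypothetical helpers, the node count of 116 is taken from the estimate above, and a real deployment would also need a plan for adding or removing nodes.

```python
import zlib
from collections import defaultdict


def node_for(gid: int, node_count: int) -> int:
    """Map a gid to a node index with a hash that is stable across runs."""
    return zlib.crc32(str(gid).encode("ascii")) % node_count


def route(records, node_count):
    """Split (gid, val) records into per-node batches."""
    batches = defaultdict(list)
    for gid, val in records:
        batches[node_for(gid, node_count)].append((gid, val))
    return dict(batches)


# Made-up records: (gid, val)
records = [(1, 10), (2, 20), (1, 30), (3, 40), (2, 50)]
batches = route(records, node_count=116)
for node, batch in sorted(batches.items()):
    print(node, batch)
```

Because each gid maps to exactly one node, per-gid aggregates such as the ones in sv01 can be computed locally on that node without merging results across machines.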
