使用MaxCompute进行纽约的士拼车分析

简介: 我们通过将纽约的士的时空数据转换成为图的方法,使用odps自带的graph分析工具来进行拼车分析。

前言

最近几年以来出现的共享的士(Uber,Lyft,滴滴)给人们的出行带来了极大的便利。随着烧钱大战的结束,中美市场大局已定,为了维持高估值(Uber 80 Billion $, 滴滴30 Billion $),缩减亏损,增长净利润,继而进入上市流程,几大公司都开始发掘盈利的规则。带来的影响是,共享出行的用户们发现:1)车越来越难打,价钱越来越高;2)使用拼车会大幅度增长时间损耗,而带来的金钱节约却并不明显;3)司机发现盈利有限,真正愿意开车的司机越来越少,某拼车公司正在慢慢的转变成为一个的士信息服务平台。这与其做成让用户通过手机便捷,实惠出行的愿景大相径庭。

我想说一个故事,作为这篇文章所要解决的的一个问题的引子。那天天气炎热,我正在公交等去高铁站的公共汽车,由于太热,我决定打一辆车,当我拦下来一辆车后跟司机说15块钱到车站,司机答应了。而此时跟我同时等公交的另外一个陌生人也过来问我们要到哪里去,当他得知目的地是火车站以后,表示也想搭车,这时候司机坐地起价要他加10块钱。这位陌生人想想觉得可以,就加了10块钱给他。设想一下,如果我在拦的士之前就知道这个陌生人也想去火车站,两人决定一起打车最后的价钱是怎样的结果?也许15-20块钱就可以搞定问题,而不是最终的25块。而事实上,如果大家都具有这样的能力,我想对的士司机来说也可以增长盈利,因为更多的打车需求会让他们的单数变多从而增加总的流水。

回到某拼车公司的话题,目前假设从阿里巴巴西溪总部出发到杭州东站(路径A),一个人打车的费用是100,那么第二个人拼车也是到杭州东站附近(路径B),这时候他可能需要付的价钱是90块钱,也就是说总价190块钱。大家是否认为司机会拿到这部分的差额呢?事实上,的士司机只拿到了他们共享路程的费用((A /union B)* 20%),而不是((A + B) * 20%),如果A和B完全相等的话,那么司机基本上不会拿到更多的钱,这部分多出来的利润就被某拼车公司完全拿去了。 为什么某拼车公司会这么做而且敢这么做呢?因为他们不但垄断了共享车的平台,也垄断了信息分享的平台,一个人在上车之前他是不知道另外一个人跟在类似的时间段去类似的地方的。如若这两个人在上车前就已经知道了对方的目的地,并联合起来打一辆车的话,那么这个博弈的格局就完全不同了。我们写此文的目的就是要分析真实世界中这样的需求是否真实存在,值不值得我们投入精力去开发或者利用一个已有的信息平台让有类似出行需求的人在按下打车”的按钮前就找到对方,从而增加议价的权利。

本文使用的数据来自于Todd Scheider维护的纽约的士数据[1],在此文中只分析Yellow Cab的数据,因为其时间跨度较长(2009-2016),同时覆盖纽约市区的范围也更广(所有纽约5个大区)。使用的阿里云大数据的技术有:MaxCompute的Tunnel,Sql,UDF,MapReduce,Graph和quick BI。实验机为阿里云的ECS最低配的机器。所有开发实验工作均在公有云上进行。本文的结构如下:第二节将介绍数据分析的技术细节,第三节为实验结果分析。

技术方案

数据导入

首先我们将csv格式的数据使用Tunnel导入到ODPS表中,使用的表的schema如下:

create table nyc_taxi_raw_small (vid                   bigint,
                                 vendor_name           string,
                                 Trip_Pickup_DateTime  string,
                                 Trip_Dropoff_DateTime string,
                                 Passenger_Count       string,
                                 Trip_Distance         double,
                                 Start_Lon             double,
                                 Start_Lat             double,
                                 Rate_Code             string,
                                 store_and_forward     string,
                                 End_Lon               double,
                                 End_Lat               double,
                                 Payment_Type          string,
                                 Fare_Amt              double,
                                 surcharge             double,
                                 mta_tax               double,
                                 Tip_Amt               double,
                                 Tolls_Amt             double,
                                 Total_Amt             double);

我们需要给每一个事件指定一个唯一的ID,用于后续的图分析,唯一ID的指定我们可以引用用[2]中的技术,但是当数据量比较大时,这个方法无法保证ID的唯一性,经过一系列调研后,发现这个ID生成在ODPS中是一个比较难的问题,所以我选择在tunnel导入之前就计算好每个记录的ID,使用的Tunnel导入的script如下:

touch ../data/experiments/counts.txt
for YEAR in 2009 2010 2011 2012 2013 2014 2015 2016
do
    for MONTH in 01 02 03 04 05 06 07 08 09 10 11 12
    do
        wget https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_${YEAR}-${MONTH}.csv -O /home/zhaoming/NYC_TAXI_DATA/data/experiments/yellow_tripdata_${YEAR}-${MONTH}.csv
        python ../python/add_vid.py /home/zhaoming/NYC_TAXI_DATA/data/experiments/yellow_tripdata_${YEAR}-${MONTH}.csv /home/zhaoming/NYC_TAXI_DATA/data/experiments/counts.txt
        /home/zhaoming/odps_console/bin/odpscmd -e "tunnel upload /home/zhaoming/NYC_TAXI_DATA/data/experiments/yellow_tripdata_${YEAR}-${MONTH}.csv.out nyc_taxi_raw_small -dbr true;"
        rm /home/zhaoming/NYC_TAXI_DATA/data/experiments/yellow_tripdata_${YEAR}-${MONTH}.csv
        rm /home/zhaoming/NYC_TAXI_DATA/data/experiments/yellow_tripdata_${YEAR}-${MONTH}.csv.out
    done
done

计算vid的python代码如下:

import sys

class VidAppender:

    def __init__(self, infile, cntfile):
        self.infile  = infile
        self.oufile  = infile + ".out"
        self.cntfile = cntfile
        reader = open(cntfile, "r")
        self.count  = 0
        for line in reader:
            self.count += int(line.replace("\n", ""))

    def process(self):
        reader = open(self.infile, "r")
        writer = open(self.oufile, "w")
        cnt = 0
        for line in reader:
            try:
                if line.startswith("vendor_name") or line.startswith("^M") or line.startswith("VendorID"):
                    writer.write("vid,"+line)
                    continue
                writer.write(str(self.count)+","+line)
                cnt += 1
                self.count += 1
            except Exception,e:
                print Exception,":",e
                print line
                pass
        reader.close()
        writer.close()
        cntwriter = open(self.cntfile, "a")
        cntwriter.write(str(cnt)+"\n")
        cntwriter.close()

def main(argv):
    appender = VidAppender(argv[1], argv[2])
    appender.process()

if __name__ == "__main__":
    main(sys.argv)

这样我们得到的数据一共有:866,796,462条数据。       

数据清洗和图生成

首先我们要建立一张表来存储需要被计算的内容:

create table nyc_taxi_data (vid                   bigint, 
                            trip_pickup_datetime  string, 
                            trip_dropoff_datetime string, 
                            start_lon             double, 
                            start_lat             double, 
                            end_lon               double, 
                            end_lat               double);

并且将数据注入:

insert overwrite table nyc_taxi_data 
    select vid, 
           trip_pickup_datetime, 
           trip_dropoff_datetime, 
           start_lon,
           start_lat, 
           end_lon, 
           end_lat 
from nyc_taxi_raw_small;

我们使用一个点(Vertex)来表示一个打车事件,假设两个打车事件之间的起始时间在100秒内,起始距离在200米内,终点距离在500米内,我们认为这两个打车事件具备拼车的可能性(这个标准可以调整,但是我认为这个标准已经比较严格)。那么我们用一条边(Edge)将这两个点连接起来,将所有可能拼车的点用边相连,我们便得到了一个图(Graph)。图的schema如下所示:

 

生成图和进行数据清理我们使用MapReduce来进行,Mapper和Reducer的代码如下显示:

public void map(long recordNum, Record record, TaskContext context) throws IOException {
		String pickup = record.getString(1);
		String keyStr = pickup.split(" ")[0] + "-" + pickup.split(" ")[1].split(":")[0];
		key.set("pt", keyStr);
		value.set("time", getTS(pickup));
		value.set("vid", record.getBigint(0));
		value.set("start_lon", record.getDouble(3));
		value.set("start_lat", record.getDouble(4));
		value.set("end_lon", record.getDouble(5));
		value.set("end_lat", record.getDouble(6));
		try {
			if (record.getDouble(3) != 0 &&
			    record.getDouble(4) != 0 &&
			    record.getDouble(5) != 0 &&
			    record.getDouble(6) != 0) {
				context.write(key, value);
			}
		} catch (NullPointerException e) {
			System.out.println("record is broken!");
		}
	}
Mapper做的事情很简单,就是生成一个以小时为单位的Key,进行数据清洗(坐标值不能为0)同时将时间转换成为Time Stamp。
public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
		Vector<Event> v = new Vector<Event>();
		HashMap<Long, Vector<Long>> hmap = new HashMap<Long, Vector<Long>>();
		while (values.hasNext()) {
			Record r = values.next();
			Event e = new Event(r.getBigint("vid"), r.getBigint("time"), r.getDouble("start_lon"),
					r.getDouble("start_lat"), r.getDouble("end_lon"), r.getDouble("end_lat"));
			v.add(e);
		}

		Collections.sort(v);

		for (int i = 0; i < v.size(); i++) {
			for (int j = i + 1; j < v.size(); j++) {
				long time_diff = cal_time(v.get(i), v.get(j));
				double from_dist = cal_from_dist(v.get(i), v.get(j));
				double to_dist = cal_to_dist(v.get(i), v.get(j));
				if (time_diff < 100) {
					if (from_dist < 200 && to_dist < 500) {
						long from = v.get(i).vid;
						long to = v.get(j).vid;
						if (hmap.get(from) == null)
							hmap.put(from, new Vector<Long>());
						if (hmap.get(to) == null)
							hmap.put(to, new Vector<Long>());
						hmap.get(from).add(to);
						hmap.get(to).add(from);
					}
				} else {
					break;
				}
			}
		}

		Set set = hmap.entrySet();
		Iterator iterator = set.iterator();
		while (iterator.hasNext()) {
			String ret = "";
			Map.Entry mentry = (Map.Entry) iterator.next();
			Vector<Long> tmp = hmap.get(mentry.getKey());
			for (int j = 0; j < tmp.size(); j++) {
				if (j != tmp.size() - 1) {
					ret += Long.toString(tmp.get(j)) + ":1,";
				} else
					ret += Long.toString(tmp.get(j)) + ":1";
			}
			output.set(0, mentry.getKey());
			output.set(1, ret);
			context.write(output);
		}
	}

Reducer则负责将本Key内的所有数据进行距离计算(起始时间,起始位置,终点位置),并输出可连接的点。这里面为了提升效率,我们将数据按照时间进行排序,超过时间范围的则不计算,事实上可以提升效率的方法有很多种,比如说使用R-Tree等等 [3]。将代码打包并进行计算的odps指令如下(注意这段代码是可以指定执行的mapper reducer个数的):

create resource jar /Users/stplaydog/gitlocal/odps_clt/jars/prepare_graph.jar -f;
jar -resources prepare_graph.jar,log4j-1.2.17.jar,rt.jar -classpath /Users/stplaydog/gitlocal/odps_clt/jars/prepare_graph.jar NYCTaxiDataTransform/NYCTaxiDataTransform nyc_taxi_data nyc_taxi_graph 256;

经过计算,一共有497,819,232个点和1,373,388,220条边,也就是说有这么多个打车事件与其它事件有拼车可能性。

拼车可能性计算

当两人拼车时,我们使用边即可以表达这个关系,三人拼车时三角形可以进行计算。但是这里面存在着一个问题,就是当一个点已经被算到属于某条边的拼车事件中去时,那么其在其它边上的拼车事件就不能被计算(我们在这里使用的策略是只有小id的点负责计算边)。对于三角形的计算也应该同样遵循这样的规则。首先计算边的算法我们叫做IndependentEdgeCount,其table schema为:
create table nyc_taxi_independent_edge(vid bigint, count bigint);
计算边数量的核心算法为:
public void compute(ComputeContext<LongWritable, Tuple, NullWritable, Tuple> context, Iterable<Tuple> msgs)
				throws IOException {
			if (context.getSuperstep() == 0L) {
				// sends a message with its ID to all its outgoing neighbors
				Tuple t = new Tuple();
				t.append(getId());
				context.sendMessageToNeighbors(this, t);
				
				boolean hasLess = false;
				int count = 0;
				for(int i=0; i<this.getValue().getAll().size();i++)
				{
					if(Long.parseLong(this.getValue().get(i).toString())<Long.parseLong(this.getId().toString()))
						hasLess = true;
				}
				if(!hasLess && getValue().getAll().size() != 0)
					count = 1;
				context.write(getId(), new LongWritable(count));
				this.voteToHalt();
					
			}
		}
用来保存三角形计数的table的schema为:
create table nyc_taxi_triangle(vid bigint, count bigint);
三角形的计算我们使用ODPS标准的例子,具体见[4],但是因为不能重复将已经计算的三角形作为拼车的例子,所以我们需要将算法进行改进,计算过的三角形不再列入进一步的计算中,同时因为我们使用的图为无向图,所以相比较[4]的例子,我们只需要两轮迭代,经过改进后的算法代码如下:
		public void compute(ComputeContext<LongWritable, Tuple, NullWritable, Tuple> context, Iterable<Tuple> msgs)
				throws IOException {

			if (context.getSuperstep() == 0L) {
				this.getValue().append(this.getId());
				context.sendMessageToNeighbors(this, getValue());
			} else if (context.getSuperstep() == 1L) {
				long my_v = Long.parseLong(this.getId().toString());
				int count = 0;
				for (Tuple msg : msgs) {
					long from_v = Long.parseLong(msg.getAll().get(msg.getAll().size() - 1).toString());
					for (int i = 0; i < msg.getAll().size() - 1; i++) {
						long inter_v = Long.parseLong(msg.getAll().get(i).toString());
						if (!this.getValue().getAll().contains((LongWritable) msg.getAll().get(i)) && my_v < from_v
								&& my_v < inter_v) {
							count = 1;
						}
					}
				}
				context.write(getId(), new LongWritable(count));
				this.voteToHalt();
			}
		}
计算独立三角形(边数)的odps命令为:
jar -resources prepare_graph.jar,log4j-1.2.17.jar,rt.jar -classpath /Users/stplaydog/gitlocal/odps_clt/jars/prepare_graph.jar NYCTaxiDataGraphAnalysis/TriangleCount nyc_taxi_graph nyc_taxi_triangle;

数据分析

我们找到的独立边的个数为 168,841,988,独立三角形的个数为 102,091,976,这样可以推论在现有的拼车标准下,可拼车倾向的比例为:
两人拼车: 168,841,988*2/ 866,796,462 = 38.96%
我们想要分析具体在哪个时间段的拼车需求比较多,那么先需要把这个独立边和独立三角形的信息映射回原表上,具体使用JOIN操作:
create table nyc_taxi_data_join_independent_edge 
like nyc_taxi_data;

insert overwrite table nyc_taxi_data_join_independent_edge 
select a.vid, 
       a.trip_pickup_datetime, 
       a.trip_dropoff_datetime, 
       a.start_lon, 
       a.start_lat, 
       a.end_lon, 
       a.end_lat 
from nyc_taxi_data a 
INNER JOIN 
(select * from nyc_taxi_independent_edge where count != 0) b 
on a.vid = b.vid;
在quick BI上建立一个SQL数据源:
select SUBSTR(trip_pickup_datetime, 12, 2) hour, 1 cnt
 from odps_zhaoming.nyc_taxi_data_join_independent_edge
得到的图表如下:
8c5164c1ac8ef1a22d60fc0589708d3f7155a71d
三人拼车: 102,091,976*3/ 866,796,462 = 35.33%
使用同样的流程获得的图为:
3a3039c4be8fb862601c89cbbfdf54ea2009f4b5
可以看到,两人和三人拼车基本遵循类似的规律,就是在晚上7,8,9点时下班时左右达到高峰,不同的是,三人拼车在早上7,8,9点左右会有一个与下午类似的高峰。


引用

[1] https://github.com/toddwschneider/nyc-taxi-data 

[2] MaxCompute SQL Row_Sequence 实现列自增长 https://yq.aliyun.com/articles/118901?spm=5176.8091938.0.0.CxYtZS

[3] Guttman, A. (1984). "R-Trees: A Dynamic Index Structure for Spatial Searching". 

[4] 开放数据处理服务ODPS Graph用户指南

相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
14天前
|
存储 消息中间件 监控
【Flume】Flume在大数据分析领域的应用
【4月更文挑战第4天】【Flume】Flume在大数据分析领域的应用
|
26天前
|
Cloud Native 数据处理 云计算
探索云原生技术在大数据分析中的应用
随着云计算技术的不断发展,云原生架构作为一种全新的软件开发和部署模式,正逐渐引起企业的广泛关注。本文将探讨云原生技术在大数据分析领域的应用,介绍其优势与挑战,并探讨如何利用云原生技术提升大数据分析的效率和可靠性。
|
1月前
|
存储 消息中间件 大数据
Go语言在大数据处理中的实际应用与案例分析
【2月更文挑战第22天】本文深入探讨了Go语言在大数据处理中的实际应用,通过案例分析展示了Go语言在处理大数据时的优势和实践效果。文章首先介绍了大数据处理的挑战与需求,然后详细分析了Go语言在大数据处理中的适用性和核心技术,最后通过具体案例展示了Go语言在大数据处理中的实际应用。
|
1月前
|
数据采集 运维 数据挖掘
API电商接口大数据分析与数据挖掘 (商品详情店铺)
API接口、数据分析以及数据挖掘在商品详情和店铺相关的应用中,各自扮演着重要的角色。以下是关于它们各自的功能以及如何在商品详情和店铺分析中协同工作的简要说明。
|
3月前
|
关系型数据库 MySQL Serverless
高顿教育:大数据抽数分析业务引入polardb mysql serverless
高顿教育通过使用polardb serverless形态进行数据汇总,然后统一进行数据同步到数仓,业务有明显高低峰期,灵活的弹性伸缩能力,大大降低了客户使用成本。
|
3月前
|
机器学习/深度学习 数据采集 算法
大数据分析技术与方法探究
在当今信息化时代,数据量的增长速度远快于人类的处理能力。因此,如何高效地利用大数据,成为了企业和机构关注的焦点。本文将从大数据分析的技术和方法两个方面进行探究,为各行业提供更好的数据应用方向。
|
3月前
|
机器学习/深度学习 人工智能 自然语言处理
大数据分析的技术和方法:从深度学习到机器学习
大数据时代的到来,让数据分析成为了企业和组织中不可或缺的一环。如何高效地处理庞大的数据集并且从中发现潜在的价值是每个数据分析师都需要掌握的技能。本文将介绍大数据分析的技术和方法,包括深度学习、机器学习、数据挖掘等方面的应用,以及如何通过这些技术和方法来解决实际问题。
47 2
|
3月前
|
机器学习/深度学习 人工智能 运维
大数据分析:探索信息世界的钥匙
在当今信息爆炸的时代,大数据分析成为挖掘宝藏般的技术和方法。本文将介绍大数据分析的基本概念、技术与方法,并探讨其在商业、科学和社会领域中的广泛应用。从数据收集和预处理到模型构建和结果解读,大数据分析为我们揭示了信息世界的钥匙,为决策者提供了有力的支持。
|
2月前
|
API
GEE案例分析——利用sentinel-3数据计算空气污染指数(Air Pollution Index,简称API)
GEE案例分析——利用sentinel-3数据计算空气污染指数(Air Pollution Index,简称API)
104 0
|
数据采集 数据可视化 算法
电商API接口的大数据分析与挖掘技巧
随着电商行业的快速发展,电商平台上的交易数据量也越来越大。如何对这些数据进行分析和挖掘,从中获取有价值的信息,已经成为电商企业和开发者关注的重点。本文将介绍电商API接口的大数据分析与挖掘技巧。