本文分享了初次使用PyODPS(Python版的Open Data Processing Service)的心路历程。作者通过实际案例,深入浅出地探讨了PyODPS相较于传统ODPS SQL在数据处理上的灵活性与便捷性,特别是在处理复杂JSON字段统计与多条件筛选方面展现出的独特优势。同时,文章诚实地指出了PyODPS学习曲线陡峭、运行效率较低及文档细节需完善等不足。借助一系列代码示例,作者不仅揭示了PyODPS中DataFrame操作的精髓,还贴心地总结了调试技巧与最佳实践,为读者搭建起一套实用的数据处理脚手架。
背景介绍
刚开始接触ODPS时,最初有一个需求比较简单,通过ODPS SQL的方式很快得到了解决。
不过最近收到了一个稍微棘手一点的数据处理需求:
统计某些展厅的uv,展厅商品的uv,计算一个比例统计展厅中某一个json字段内,满足某些条件的数量统计这里先总结一下PyODPS的优势:
灵活的row handle,能灵活地进行数据处理。事实上,需求中也需要对一个json对象进行统计分析,这点上用SQL会非常痛苦。可以全量加载内容比较少的表、文件资源,降低表处理逻辑上的复杂性。而SQL在这点上没有优势,只能疯狂的join。优秀的可配置能力,比如说在我这个需求中出现了需要hardCode配置的多关键字过滤复用SQL处理逻辑,在我的场景里,我需要统计总的比例,与最近15天的比例。但统计逻辑是一样的,不同的是数据的范围~劣势也很明显:
基本就是写SQL的思路写python。DataFrame基本就是以SQL的表达对数据处理的封装。运行贼慢,每次调试时间很久。不能说文档不全面,但是很多语法编译都能过,实际运行没效果。针对pyodps与python的区别, 我用一段条件判断代码来做个解释:
# 这段是生效的,最后的sql,包含where key in (?) and source == "A"uv_table = visit_table[ visit_table.key.isin(target_key_list) \ & (visit_table.source == "A")].groupby(visit_table.target_id)# 这段是有问题的。最后的sql,只有where source == "A"uv_table = visit_table[ visit_table.key.isin(target_key_list) and (visit_table.source == "A")].groupby(visit_table.target_id)# 这段也是ok的,看起来是官方文档中推荐的写法uv_table = visit_table[ visit_table.key.isin(target_key_list) & (visit_table.source == "A")].groupby(visit_table.target_id)# 这段也是ok的,这里换行没有任何问题,也就是说\可加可不加。uv_table = visit_table[ visit_table.key.isin(target_key_list) & (visit_table.source == "A")].groupby(visit_table.target_id)# 这段就是不行的,会丢失in。对应到SQL就是 where true and source == "A"uv_table = visit_table[ visit_table.key in target_key_list & (visit_table.source == "A")].groupby(visit_table.target_id)上面的代码示例,全部都可以正常编译且执行,但是从结果上来说却大有不同:
在PyOdps对象中,使用了python语言特性的判断条件,如a in a_list、a is None这类逻辑均不会生效,会被忽略。取而代之,应该使用a.isin(a_list)、a.isnull()这样的pyodps方法。所以先解释下为啥拿判断条件开头:已经被坑了n次了,编译全过,运行完成,但结果却经常没生效某一些条件,导致来来回回全文检查。甚至我感觉这个是目前来说最容易踩坑的点。
最后推荐的判断条件写法如下:
uv_table = visit_table[ (visit_table.key.isin(target_key_list)) & (visit_table.source == "A")].groupby(visit_table.target_id)每个判断条件均用()包裹,并换行or不换行&与、|或、~非,分割条件。
从这个点延伸开,我们已经发现了,PyODPS中,有两种思路。一种是面向DataFrame而另一种则是面向纯Python。
正常来说通篇均为面向DataFrame,除了以下情况:
通过TableReader、table.head(10)等方法将表数据读取为python的list对象数据。后续的处理逻辑均需要用python去解决。@output代码处理逻辑,全部为python的能力去解决。class Agg,这种自定义聚合代码,均为Python的逻辑进行处理。而所有与DataFrame相关的逻辑,都必须查文档来处理,比如说对json的处理,我们就需要使用df.func.get_json_obj(table_name.field),而不能使用python的json.loads()。
数据的空判断则需要用a.isnull()或者a.notnull()等方法。
pyodps文档:https://pyodps.readthedocs.io/zh-cn/stable/api-df.html
写完了脚本回来一看就有种理所当然的感觉~不得不说设计上还是巧妙的。
但是这里不得不提一个点:
PyODPS如果用了错误的方式调用,则也不会错误,必须仔细检查我们的SQL。是否达到我们预期的想法。所以调试我们的PyODPS,就是重中之重!
同时,对于去重来说,官方文档的方法好像是有问题的。
# 这段只会提示no field in XXXXGroupBySequence(具体是啥记不住了)show_room_uv = show_room_uv.agg(show_room_uv=show_room_uv.visitor_id.unique())# 反复验证后,正确的去重计数是nunique()吐槽结束,接下来开始本期的重点。
PyODPS开发的基本脚手架咱们的这个数据处理的功能非常适合以一个基础的脚手架起步~这里我根据自己的开发经验总结了一个:
from odps.df import DataFrame, Scalar, func, output# args也是一个内置对象,就是我们在调度配置中的参数bizdate = args["bizdate"]output_table = "xxxx"# 加载我们的数据表。o是一个内置对象。也有o.get_table("xxx").to_df()的写法。data_process_table = DataFrame(o.get_table("xxxx"))# 加载我们的数据import jsonfilters_words = []# filters_words.txt就是我们放在MaxCompute -> 资源下的文件。with o.get_resource('filters_words.txt').open('r', encoding='utf-8') as f: filters_words = json.loads(f.read())# 这里就是odps语法了,这里等同于 where content in (?, ?)。包括说content is null,就是content.isnull()。# 在DataFrame的范围内,需要遵从官方的API。data_process_table[ data_process_table.content.isin(filters_words)]# 如果要like怎么处理呢?data_process_table = data_process_table.query( " or ".join([f"content.contains('{x}')" for x in filters_words]))@output(["content_len"], ["int64"])def handle(row): # 这里是按行处理数据。如果要做reduce之类的多行处理,要通过agg自定义聚合的逻辑。 # python的数据处理 yield len(row.content)# 很有意思的列处理,这个操作相当于handle处理完多了一列content_len。# 另外我们可以理解每次[]处理完之后,是拿到了一个新的DataFrame对象。res_t = data_process_table[ data_process_table, data_process_table.apply(handle, axis = 1)]# 这一部分是后补的,纯手撕代码。class Agg(object): def buffer(self): # 定义你心仪的聚合结果对象。自定义聚合的本质就是将结果加到这个buffer对象里 return { "merge_length": 0 } def __call__(self, buffer, content_len): if content_len is not None: # 当前聚合对象数据合并。数据被分成了无数个小片,这是其中一片的n条数据聚合 buffer["merge_length"] += content_len def merge(self, buffer, pbuffer): # 和其他的聚合对象进行合并~ buffer["merge_length"] += pbuffer["merge_length"] def getvalue(self, buffer): return buffer["merge_length"]# 调用聚合方法to_agg = agg( [ # output输出的新字段,我们作为聚合的value去处理 res_t.content_len ], Agg, rtype="int64", )# 用id去做聚合,对content_len的值进行运算,最后输出一个新字段valueres_t = res_t.groupby("id").agg(value=to_agg)# 此时res_t的列有 id、value,两个字段。# 调试用,看看数据,最后换成persist持久化到output表里。res_t.head(10)# 最后要写数据库了,直接用下面的方法.# res_t.persist(output_table, partition=f"ds='{bizdate}'", drop_partition=True, create_partition=True)在总结脚手架的时候,不得不说PyODPS是一个精妙的设计,估计是再也回不去写SQL的日子了。
PyODPS核心思想就两点:
在DataFrame中做列处理和聚合。删除列,按条件过滤,整列计算。在handle中做行处理,同时定义按行处理后的输出列。难以分析的学习,直接用代码分析~核心文档,写的过程中还是需要不断借鉴:
列运算聚合操作,里面的unique应该是过期了,用nunique。一旦聚合后就是一个GroupBy对象,需要调用agg对聚合结果处理后,回到DataFrame另外还得吐槽一句,确实很难写。
# 看着是不是没问题。直接报错.agg Syntax Errorclosely_count_table = data_process_table.groupby('content_len').agg(closely_count = data_process_table.content_len)这个写法里有两个问题:
第一个是我怎么都摸不清的换行问题,即使不是这个情况,有的时候换行就会解析不了,包括条件判断。第二个呢,就是对象问题,agg函数的入参应该是一个GroupBy对象,而不是DataFrame。但是,自定义聚合连着写又没啥问题,只能说最终解释权都在PyOdps。所以这里这么写是最保险的。
closely_count_table = data_process_table.groupby('content_len')closely_count_table = closely_count_table.agg(closely_count = closely_count_table.content_len)即使同为DataFrame也有一样的问题,不要妄想用多个[][]来完成多次处理。第一个[]内可以用当前的DataFrame,但第二个[]就不一样了,它需要的是第一个[]返回的DataFrame对象。举个例子:
# 过滤了content_len小于等于10的数据,并输出content.# 但这个大概率是错的。虽然我没试过。data_process_table = data_process_table[ data_process_table.content_len > 10][ data_process_table.content]# 你可以这么写,因为过滤content_len的DataFrame仍然有content字段,通过字符是可以取出来的。data_process_table = data_process_table[ data_process_table.content_len > 10]["content"]# 保守写法data_process_table = data_process_table[data_process_table.content_len > 10]data_process_table = data_process_table[data_process_table.content]关于list type:
@output( [ "list_value" ], ["list<string>"])def handle_list_type(row): yield [["test1", "test2"]]试了很多次才得到这个结果。看到结果的瞬间一下次就想明白了。
用这个例子做个解释:
@output( [ "int_value", "string_value" ], ["int64", "string"])def handle_list_type(row): yield 10, "test"这里的10, "test"是一个元组,恐怕用了list()之类的方法对返回进行了包装。
我最初直接返回["test1", "test2"]的情况下,等同于返回2个string。
所以必须再包一层。想明白了这个原理,那么下面的写法会更加优雅:
@output( [ "list_value" ], ["list<string>"])def handle_list_type(row): res = ["test1", "test2"] yield res, #这里有一个逗号结语PyODPS的列处理与聚合功能、行处理自定义逻辑,为大数据处理提供了新的视角和工具,让作者乃至更多开发者在告别纯SQL编写的同时,开启了数据处理的新篇章。总之,拥抱变化,勇于实践,PyODPS的潜力等待着每一位数据工程师去挖掘。
团队介绍
我们是淘天集团的场景智能技术团队,作为一支专注于通过AI和3D技术驱动商业创新的技术团队, 依托大淘宝丰富的业务形态和海量的用户、数据, 致力于为消费者提供创新的场景化导购体验, 为商家提供高效的场景化内容创作工具, 为淘宝打造围绕家的场景的第一消费入口。我们不断探索并实践新的技术, 通过持续的技术创新和突破,创新用户导购体验, 提升商家内容生产力, 让用户享受更好的消费体验, 让商家更高效、低成本地经营。