本文主要意在介绍spark用户行为分析的流程和主要功点,仅会贴出部分代码,完整代码可以点击参考这里,代码中有详细的注释,所以可以也可以略过文章,直接看代码项目源码地址
项目说明 用户行为分析,是所有互联网公司都会进行的一个项目.通过用户的访问行为记录,从多种维度分析用户行为,帮助公司了解用户,辅助决策.
项目模拟数据 为了方便的测试代码,这里使用脚本生成类似生产环境的模拟数据
生成模拟数据 执行脚本,生成两个文件,一个是用户行为记录数据,一个是用户详细信息数据.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 import datetime import random import uuid import os def imitate_data(): """ 生成模拟数据 用户行为表: date, userid, sessionid, page_id, action_time, search_key_word, click_category_id,click_product_id, order_category_ids, order_product_ids, pay_category_ids, pay_product_ids, city_id 用户表: userid, username, name, age, professional, city, gender """ rows = [] """初始化 user_visit_action""" search_key_words = ["火锅", "蛋糕", "重庆辣子鸡", "重庆小面", "呷哺呷哺", "新辣道鱼火锅", "国贸大厦", "太古商场", "日本料理", "温泉"] date = datetime.date.today().strftime("%Y-%m-%d") actions = ["search", "click", "order", "pay"] for i in range(100): # 用户id userid = str(random.randint(1, 100)) city_id = str(random.randint(0, 9)) for j in range(10): # 使用随机 uuid 模拟 session_id, 代表某次访问事件 sessionid = str(uuid.uuid4()).replace("-", "") base_action_time = date + " " + str(random.randint(0, 23)).zfill(2) for k in range(random.randint(1, 100)): # 分页id page_id = str(random.randint(1, 10)) # 点击行为发生的具体时间 action_time = base_action_time + ":" + \ str(random.randint(0, 59)).zfill(2) + ":" + \ str(random.randint(0, 59)).zfill(2) # 如果是搜索行为,这里是搜索关键字 search_key_word = '' # 在网站首页点击了某个品类 click_category_id = '' # 可能是在网站首页或商品列表点击了某个商品 click_product_id = '' # 订单中的商品的品类 order_category_ids = '' # 订单中的商品 order_product_ids = '' # 某次支付行为下所有商品的品类 pay_category_ids = '' # 某次支付行为中的所有商品 pay_product_ids = '' action = actions[random.randint(0, 3)] if action == 'search': search_key_word = search_key_words[random.randint(0, 9)] elif action == 'click': click_category_id = str(random.randint(1, 100)) click_product_id = str(random.randint(1, 100)) elif action == 'order': order_category_ids = str(random.randint(1, 100)) order_product_ids = str(random.randint(1, 100)) elif action == 'pay': pay_category_ids = str(random.randint(1, 100)) pay_product_ids = str(random.randint(1, 100)) else: raise Exception("action error") row = (date, userid, sessionid, page_id, action_time, search_key_word, click_category_id, click_product_id, order_category_ids, order_product_ids, pay_category_ids, pay_product_ids, city_id) row = ",".join(row) rows.append(row) path = os.path.dirname(os.path.realpath(__file__)) with open(path + "/session_analysis_data.txt", 'w') as f: for line in rows: f.write(line + "\n") """初始化 user_info""" rows.clear() genders = ["male", "female"] for i in range(1, 101): userid = str(i) username = "user" + str(i) name = "name" + str(i) age = str(random.randint(1, 60)) professional = "professional" + str(i) city = "city" + str(random.randint(0, 9)) gender = genders[random.randint(0, 1)] row = (userid, username, name, age, professional, city, gender) row = ",".join(row) rows.append(row) with open(path + "/user_info.txt", 'w') as f: for line in rows: f.write(line + "\n") if __name__ == '__main__': imitate_data()
生成模拟表 spark 读取文件中的数据,并转换为hive表的形式供后续使用
功能点 按照 session 粒度聚合数据 需求 要求可以灵活的针对不同的筛选条件对用户进行分析,比如查询年龄在20~30岁的教师在当天浏览过哪些网页
方案 分析用户行为,首先应该考虑两个问题:
对于大部分互联网产品,记录用户行为的表或日志都会非常大,比如一个日获千万的应用,用户行为表可能一天就有 10 亿左右条新增记录.
“过滤”的条件的粒度是不同的,比如按照访问时长筛选是 session 粒度的,按照点击品类是 action 粒度的,按照年龄,性别是 用户粒度的,如果不做特别处理,每次查询都要做全表扫表,非常消耗性能
针对上述问题,一个可靠的解决方案是对原始按照session粒度聚合,聚合后的每一行表示某一用户在某个会话期间所进行的操作,比如访问过的网页,点击过的产品,会话的时长,以及当前用户的年龄,职业等用户信息.
统计访问时长和访问步长 需求 统计出符合条件的session中访问时长在1s3s、4s6s、7s9s、10s30s、30s60s、1m3m、3m10m、10m30m、30m以上各个范围内的session占比;访问步长在13、46、79、1030、3060、60以上各个范围内的session占比.假设有1000万个session记录,访问时长在 13s的有100万个,那么1~3s访问步长就占比10%.通过这里功能,可以了解到用户使用产品的一些习惯
方案 访问时长: 某个用户某次访问会话的时长(用最后一个行为记录的时间减去第一个行为记录的时间)访问步长: 某个用户某次访问会话中点击过的页面数量
用于 spark 是分布式计算,所以在做全局统计的时候,不能使用对自定义变量做累加操作的方式进行统计,针对这样的需求,spark提供了 Accumulators(累加器).
由于累加器仅仅支持int的累加,如果直接使用 Accumulator 的方式,需要很繁琐的定义十几个累加器,这样做不便于管理,会使代码显得繁琐,很容易出错,理想的方式是自定义一个累加器,对各个访问时长和访问步长统一进行累加操作
自定义累加器 自定义累加器需要实现一个继承 AccumulatorParam 的类, 并实现 zero
方法和 addInPlace
方法, zero
方法返回一个自定义类型的初始值, addInPlace
方法定义累加的方式.
注意: 要时刻清楚spark是分布式计算的, 可能会把在一个 executor 中累加好的结果累加到另一个结果上,所以对于下面的代码,在进行累加的时候,需要判断value2(累加值)是单个值还是多个值累加的结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 # 定义常量 TIME_PERIOD_1s_3s = "1s_3s" TIME_PERIOD_4s_6s = "4s_6s" TIME_PERIOD_7s_9s = "7s_9s" TIME_PERIOD_10s_30s = "10s_30s" TIME_PERIOD_30s_60s = "30s_60s" TIME_PERIOD_1m_3m = "1m_3m" TIME_PERIOD_3m_10m = "3m_10m" TIME_PERIOD_10m_30m = "10m_30m" TIME_PERIOD_30m = "30m" STEP_PERIOD_1_3 = "1_3" STEP_PERIOD_4_6 = "4_6" STEP_PERIOD_7_9 = "7_9" STEP_PERIOD_10_30 = "10_30" STEP_PERIOD_30_60 = "30_60" STEP_PERIOD_60 = "60" # 自定义累加器 from .constant import * from pyspark import AccumulatorParam class SessionAggrAccumulator(AccumulatorParam): """ 自定义累加器 """ def zero(self, value): return { SESSION_COUNT: 0, TIME_PERIOD_1s_3s: 0, TIME_PERIOD_4s_6s: 0, TIME_PERIOD_7s_9s: 0, TIME_PERIOD_10s_30s: 0, TIME_PERIOD_30s_60s: 0, TIME_PERIOD_1m_3m: 0, TIME_PERIOD_3m_10m: 0, TIME_PERIOD_10m_30m: 0, TIME_PERIOD_30m: 0, STEP_PERIOD_1_3: 0, STEP_PERIOD_4_6: 0, STEP_PERIOD_7_9: 0, STEP_PERIOD_10_30: 0, STEP_PERIOD_30_60: 0, STEP_PERIOD_60: 0 } def addInPlace(self, value1, value2): # print(value1, value2) if value1 == "": return value2 if isinstance(value2, dict): # rdd 可能会被分割成多分并行计算,所以这里处理当 value2 传入的是某个rdd某个部分计算的值 value = {k: v + value2[k] for k, v in value1.items()} return value else: value1[value2] += 1 return value1
按时间比例随机抽取指定数量的session 需求 需要从每天的访问记录中随机抽取100次访问记录,要求按照每小时session量的比例抽取,假设在00:00 ~ 01:00 有1000session记录,一天总session记录为10000条,那么就要在00:00 ~ 01:00中抽取10条,这样的方式可以保证抽取的公平性
方案
将 session的rdd 中的日期小时(yyyy-MM-dd_HH) 提取出来,转换成格式为<yyyy-MM-dd_HH,<row>>
的rdd, 记作 time_session_id_rdd
对 time_session_id_rdd
做 countByKey
操作,算出每小时session的数量, rdd格式转换为 <yyyy-MM-dd_HH,count>
, 记作 count_session_by_hour
将count_session_by_hour
用分隔符做处理,转换为 {yyyy-MM-dd:{HH:count,HH:count, …},}的字典, 记作 date_hour_counts
根据 date_hour_counts
的记录,就可以算出一天中每小时需求抽取多少条数据,然后生成每小时要抽取的 session 的随机索引
对从第一步中得到的time_session_id_rdd
做groupByKey()操作,遍历并按照随机索引抽取session_id
获取点击,下单,支付数量排名前 N 的品类 需求 为了比较产品的受欢迎程度,对所有 session 中对每个品类的按照点击次数,下单数量和支付数量进行倒序排序,取出排名前N的品类
方案 因为要对三个字段进行排序,即需要进行多次排序,比如两个session点击量相同,就去比较下单数量,如果下数量也相同,就再比较支付数量.spark 中使用 sortByKey
对rdd排序,为了满足多次排序,如果把 rdd 构造为 <(click_count,order_count,pay_count), <row>
的形式.
取出排名前 N 的品类中,被点击次数排名前 N 的session 需求 为了了解每个受欢迎的品类中的优质客户,计算每个品类中点击量排名前N的session
方案
取出所有 category_id, 并转换为 <category_id, category_id> 形式的rdd
将 session 原始rdd 转换为 <category_id, <session_id, click_count>> 形式的rdd
合并上面的两个 rdd, 进行groupByKey 操作,再对每个分组中的 click_count, 进行排序,取出排名前 N 的记录