用户行为分析(pyspark)

本文主要意在介绍spark用户行为分析的流程和主要功点,仅会贴出部分代码,完整代码可以点击参考这里,代码中有详细的注释,所以可以也可以略过文章,直接看代码项目源码地址

项目说明

用户行为分析,是所有互联网公司都会进行的一个项目.通过用户的访问行为记录,从多种维度分析用户行为,帮助公司了解用户,辅助决策.

项目模拟数据

为了方便的测试代码,这里使用脚本生成类似生产环境的模拟数据

生成模拟数据

执行脚本,生成两个文件,一个是用户行为记录数据,一个是用户详细信息数据.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import datetime
import random
import uuid
import os


def imitate_data():

"""
生成模拟数据
用户行为表: date, userid, sessionid, page_id, action_time, search_key_word, click_category_id,click_product_id, order_category_ids, order_product_ids, pay_category_ids, pay_product_ids, city_id
用户表: userid, username, name, age, professional, city, gender
"""
rows = []

"""初始化 user_visit_action"""
search_key_words = ["火锅", "蛋糕", "重庆辣子鸡", "重庆小面", "呷哺呷哺", "新辣道鱼火锅", "国贸大厦", "太古商场", "日本料理", "温泉"]
date = datetime.date.today().strftime("%Y-%m-%d")
actions = ["search", "click", "order", "pay"]

for i in range(100):
# 用户id
userid = str(random.randint(1, 100))
city_id = str(random.randint(0, 9))

for j in range(10):
# 使用随机 uuid 模拟 session_id, 代表某次访问事件
sessionid = str(uuid.uuid4()).replace("-", "")
base_action_time = date + " " + str(random.randint(0, 23)).zfill(2)

for k in range(random.randint(1, 100)):
# 分页id
page_id = str(random.randint(1, 10))
# 点击行为发生的具体时间
action_time = base_action_time + ":" + \
str(random.randint(0, 59)).zfill(2) + ":" + \
str(random.randint(0, 59)).zfill(2)
# 如果是搜索行为,这里是搜索关键字
search_key_word = ''
# 在网站首页点击了某个品类
click_category_id = ''
# 可能是在网站首页或商品列表点击了某个商品
click_product_id = ''
# 订单中的商品的品类
order_category_ids = ''
# 订单中的商品
order_product_ids = ''
# 某次支付行为下所有商品的品类
pay_category_ids = ''
# 某次支付行为中的所有商品
pay_product_ids = ''
action = actions[random.randint(0, 3)]

if action == 'search':
search_key_word = search_key_words[random.randint(0, 9)]
elif action == 'click':
click_category_id = str(random.randint(1, 100))
click_product_id = str(random.randint(1, 100))
elif action == 'order':
order_category_ids = str(random.randint(1, 100))
order_product_ids = str(random.randint(1, 100))
elif action == 'pay':
pay_category_ids = str(random.randint(1, 100))
pay_product_ids = str(random.randint(1, 100))
else:
raise Exception("action error")

row = (date, userid, sessionid, page_id, action_time, search_key_word, click_category_id,
click_product_id, order_category_ids, order_product_ids, pay_category_ids, pay_product_ids, city_id)
row = ",".join(row)
rows.append(row)

path = os.path.dirname(os.path.realpath(__file__))
with open(path + "/session_analysis_data.txt", 'w') as f:
for line in rows:
f.write(line + "\n")

"""初始化 user_info"""
rows.clear()
genders = ["male", "female"]
for i in range(1, 101):
userid = str(i)
username = "user" + str(i)
name = "name" + str(i)
age = str(random.randint(1, 60))
professional = "professional" + str(i)
city = "city" + str(random.randint(0, 9))
gender = genders[random.randint(0, 1)]
row = (userid, username, name, age, professional, city, gender)
row = ",".join(row)
rows.append(row)

with open(path + "/user_info.txt", 'w') as f:
for line in rows:
f.write(line + "\n")


if __name__ == '__main__':
imitate_data()

生成模拟表

spark 读取文件中的数据,并转换为hive表的形式供后续使用

功能点

按照 session 粒度聚合数据

需求

要求可以灵活的针对不同的筛选条件对用户进行分析,比如查询年龄在20~30岁的教师在当天浏览过哪些网页

方案

分析用户行为,首先应该考虑两个问题:

  1. 对于大部分互联网产品,记录用户行为的表或日志都会非常大,比如一个日获千万的应用,用户行为表可能一天就有 10 亿左右条新增记录.
  2. “过滤”的条件的粒度是不同的,比如按照访问时长筛选是 session 粒度的,按照点击品类是 action 粒度的,按照年龄,性别是 用户粒度的,如果不做特别处理,每次查询都要做全表扫表,非常消耗性能

针对上述问题,一个可靠的解决方案是对原始按照session粒度聚合,聚合后的每一行表示某一用户在某个会话期间所进行的操作,比如访问过的网页,点击过的产品,会话的时长,以及当前用户的年龄,职业等用户信息.

统计访问时长和访问步长

需求

统计出符合条件的session中访问时长在1s3s、4s6s、7s9s、10s30s、30s60s、1m3m、3m10m、10m30m、30m以上各个范围内的session占比;访问步长在13、46、79、1030、3060、60以上各个范围内的session占比.假设有1000万个session记录,访问时长在 13s的有100万个,那么1~3s访问步长就占比10%.通过这里功能,可以了解到用户使用产品的一些习惯

方案

访问时长: 某个用户某次访问会话的时长(用最后一个行为记录的时间减去第一个行为记录的时间)
访问步长: 某个用户某次访问会话中点击过的页面数量

用于 spark 是分布式计算,所以在做全局统计的时候,不能使用对自定义变量做累加操作的方式进行统计,针对这样的需求,spark提供了 Accumulators(累加器).

由于累加器仅仅支持int的累加,如果直接使用 Accumulator 的方式,需要很繁琐的定义十几个累加器,这样做不便于管理,会使代码显得繁琐,很容易出错,理想的方式是自定义一个累加器,对各个访问时长和访问步长统一进行累加操作

自定义累加器

自定义累加器需要实现一个继承 AccumulatorParam 的类, 并实现 zero方法和 addInPlace方法, zero方法返回一个自定义类型的初始值, addInPlace 方法定义累加的方式.

注意:要时刻清楚spark是分布式计算的, 可能会把在一个 executor 中累加好的结果累加到另一个结果上,所以对于下面的代码,在进行累加的时候,需要判断value2(累加值)是单个值还是多个值累加的结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# 定义常量
TIME_PERIOD_1s_3s = "1s_3s"
TIME_PERIOD_4s_6s = "4s_6s"
TIME_PERIOD_7s_9s = "7s_9s"
TIME_PERIOD_10s_30s = "10s_30s"
TIME_PERIOD_30s_60s = "30s_60s"
TIME_PERIOD_1m_3m = "1m_3m"
TIME_PERIOD_3m_10m = "3m_10m"
TIME_PERIOD_10m_30m = "10m_30m"
TIME_PERIOD_30m = "30m"

STEP_PERIOD_1_3 = "1_3"
STEP_PERIOD_4_6 = "4_6"
STEP_PERIOD_7_9 = "7_9"
STEP_PERIOD_10_30 = "10_30"
STEP_PERIOD_30_60 = "30_60"
STEP_PERIOD_60 = "60"

# 自定义累加器
from .constant import *
from pyspark import AccumulatorParam


class SessionAggrAccumulator(AccumulatorParam):

"""
自定义累加器
"""

def zero(self, value):
return {
SESSION_COUNT: 0,
TIME_PERIOD_1s_3s: 0,
TIME_PERIOD_4s_6s: 0,
TIME_PERIOD_7s_9s: 0,
TIME_PERIOD_10s_30s: 0,
TIME_PERIOD_30s_60s: 0,
TIME_PERIOD_1m_3m: 0,
TIME_PERIOD_3m_10m: 0,
TIME_PERIOD_10m_30m: 0,
TIME_PERIOD_30m: 0,
STEP_PERIOD_1_3: 0,
STEP_PERIOD_4_6: 0,
STEP_PERIOD_7_9: 0,
STEP_PERIOD_10_30: 0,
STEP_PERIOD_30_60: 0,
STEP_PERIOD_60: 0
}

def addInPlace(self, value1, value2):
# print(value1, value2)
if value1 == "":
return value2
if isinstance(value2, dict):
# rdd 可能会被分割成多分并行计算,所以这里处理当 value2 传入的是某个rdd某个部分计算的值
value = {k: v + value2[k] for k, v in value1.items()}
return value
else:
value1[value2] += 1
return value1

按时间比例随机抽取指定数量的session

需求

需要从每天的访问记录中随机抽取100次访问记录,要求按照每小时session量的比例抽取,假设在00:00 ~ 01:00 有1000session记录,一天总session记录为10000条,那么就要在00:00 ~ 01:00中抽取10条,这样的方式可以保证抽取的公平性

方案
  1. 将 session的rdd 中的日期小时(yyyy-MM-dd_HH) 提取出来,转换成格式为<yyyy-MM-dd_HH,<row>>的rdd, 记作 time_session_id_rdd
  2. time_session_id_rddcountByKey 操作,算出每小时session的数量, rdd格式转换为 <yyyy-MM-dd_HH,count>, 记作 count_session_by_hour
  3. count_session_by_hour用分隔符做处理,转换为 {yyyy-MM-dd:{HH:count,HH:count, …},}的字典, 记作 date_hour_counts
  4. 根据 date_hour_counts的记录,就可以算出一天中每小时需求抽取多少条数据,然后生成每小时要抽取的 session 的随机索引
  5. 对从第一步中得到的time_session_id_rdd做groupByKey()操作,遍历并按照随机索引抽取session_id

获取点击,下单,支付数量排名前 N 的品类

需求

为了比较产品的受欢迎程度,对所有 session 中对每个品类的按照点击次数,下单数量和支付数量进行倒序排序,取出排名前N的品类

方案

因为要对三个字段进行排序,即需要进行多次排序,比如两个session点击量相同,就去比较下单数量,如果下数量也相同,就再比较支付数量.spark 中使用 sortByKey 对rdd排序,为了满足多次排序,如果把 rdd 构造为 <(click_count,order_count,pay_count), <row>的形式.

取出排名前 N 的品类中,被点击次数排名前 N 的session

需求

为了了解每个受欢迎的品类中的优质客户,计算每个品类中点击量排名前N的session

方案
  1. 取出所有 category_id, 并转换为 <category_id, category_id> 形式的rdd
  2. 将 session 原始rdd 转换为 <category_id, <session_id, click_count>> 形式的rdd
  3. 合并上面的两个 rdd, 进行groupByKey 操作,再对每个分组中的 click_count, 进行排序,取出排名前 N 的记录

文章标题:用户行为分析(pyspark)

文章字数:2.4k

本文作者:Waterandair

发布时间:2018-04-19, 09:24:06

最后更新:2019-12-28, 14:03:59

原始链接:https://waterandair.github.io/2018-04-19-spark-app-session-analysis.html

版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。

目录
×

喜欢就点赞,疼爱就打赏

github