申明,自我学习用,非原创。
接上节,
如何根据用户的标签,快速找到用户的相关信息。
- 建议通过二级索引(ES),筛选标签找到用户id,
- 再基于用户ID在Hbase 中查找用户的详细信息。
在线接口在查询HBase中数据时,由于Hbase无法像关系数据库那样根据多种条件对数据进行筛选(类似SQL语言中的where筛选条件)。一般地Hbase需要建立二级索引来满足根据复杂条件查询数据的需求,本案中选择Elastic search 存储Hbase 索引数据。
在组合标签查询对应的用户人群场景中,首先通过组合标签的条件在Elasticsearch 中查询对应的索引数据,然后通过索引数据去HBase中批量获取rowkey对应的数据。(elastic search 中的documentid 和HBase 中的rowkey都设计为用户id)
基于Elastic search 存储的HBase二级索引方案
为了避免从Hive向Hbase灌入数据时缺失,在向HBase数据同步完成后,还需要校验HBase和Hive中数据量是否一致,如出现较大的波动则发送告警信息。
下面通过Python脚本来看该Hbase状态表数据校验逻辑:
#查询Hvie中数据
def check_Hive_data(data_date):
r = os.popen("Hive -S -e\ "select count(1) from dw.userprofile_usergroup_labels_all where data_date='"+data_date+" '\ "")
Hive_userid_count = r.read()
r.close()
Hive_count = str(int(Hive_userid_count))
print "Hive_result: " + str(Hive_count)
print "Hive select finished"
#查询HBase中数据
def check_Hbase_data(data_date):
r = os.popen("HBase org.apache.hadoop.HBase.mapreduce.RowCounter 'userprofile_labels'\ " 2>&1 |grep ROWS")
HBase_count = r.read().strip()[5:]
r.close()
print "Hbase result: " + str(Hbase_count)
print "Hbase select finished!"
#连接db,将查询结果插入表
db = MySQLdb.connect(host ="xx.xx.xx.xx",port=3306,user="username",passwd="password",db="xxx",charset="utf8")
cursor = db.cursor()
cursor.execute("INSERT INTO service_monitor(date,service_type,Hive_count,Hbase_count) VALUES('"+Datestr_"','advertisement',"+str(Hive_userid_count)+", "+str(Hbase_count)+")")
db.commit()
本案例中将userid 作为rowkey 存入HBase,一方面在组合标签的场景中可以支持条件查询多用户人群,另一方面可以支持单个用户标签的查询,例如查看某id用户身上的标签,
以便运营人员决定是否对其进行运营操作。
Hbase 在离线数仓环境的服务架构如下图所示:
图:Hbase离线数仓服务架构
Elastic search 存储架构:
Elasticsearch 是一个开源的分布式全文检索引擎,可以近乎实时地存储、检索数据。
而且扩展性很好,可以扩展到上百台服务器,处理PB级别的数据。
对于用户标签查询、用户人群计算、用户群多维度透视分析这类对响应时间要求较高的场景,也可以考虑选用Elastic search存储。
Elasticsearch 是面向文档型数据库,一条数据在这里就是一个文档,用json作为文档格式。为了更清晰的理解Elasticsearch查询的一些概念,将其和关系数据库的类型进行对照。
在关系型数据库中查询数据时,可通过选中数据库、表、行、列 来定位所查找的内容,在Elasticsearch中通过索引(index)、类型(type)、文档(document)、字段来定位查找内容。Elastic search的交互可以使用Java API,也可以使用HTTP的RESTful API方式。
图:Elasticsearch 与关系型数据库的对比
应用场景:
基于Hbase的存储方案并没有解决数据的高效检索问题。在实际应用中,经常有根据特定的几个字段进行组合后检索的应用场景,而Hbase采用 rowkey 作为一级索引,不支持多条件查询,如果要对库里的非rowkey 进行数据检索和查询,往往需要通过MapReduce 等分布式框架进行计算,时间延迟上会比较高,难以同时满足用户对于复杂条件查询和高效率响应这两方面的需求。
主要查询过程包括:
- 在Elasticsearch 中存放用于检索条件的数据,并将rowkey也存储进去。
- 使用Elasticsearch 的API根据组合标签的条件查询出rowkey的集合。
- 使用上一步得到的rowkey去HBase数据库查询对应的结果。
Hbase数据存储数据的索引放在Elasticsearch中,实现了数据和索引的分离。在Elasticsearch 中documentid是文档的唯一标识,在HBase中rowkey是记录的唯一标识,在工程实践中,两者可同时选用用户在平台上得唯一标识(如 userid 或deviceid)作为rowkey 或 documentid,进而解决HBase和Elasticsearch 索引关联的问题。
基于Elasticsearch 存储的Hbase 二级索引方案
标签汇聚数据
通过scala 代码,把Hive 数据从总表中迁移到Elastic search中[略]:
提交命令:
"spark-submit --class com.example.HiveDataToEs --master yarn --deploy-mode client --executor-memory 2g --num-executors 50 --driver-memory 3g --executor-cores 2 spark-hive-to-es.jar 20190101"
#查询命令
GET userprofile/tags/_search
{
"size":0,
"aggs": {
"tagcounts": {
"terms":{
"field": "tags.ACTION_U_01_003"
}
}
}
}
}
methodurl地址描述
PUTlocalhost:9200/索引名称/类型名称/文档id创建文档(指定文档id)
POSTlocalhost:9200/索引名称/类型名称创建文档(随机文档id)
POSTlocalhost:9200/索引名称/类型名称/文档id/_update修改文档
DELETElocalhost:9200/索引名称/类型名称/文档id删除文档
GETlocalhost:9200/索引名称/类型名称/文档id查询文档通过文档id
POSTlocalhost:9200/索引名称/类型名称/_search查询所有数据
elasticsearch(集群)中可以包含多个索引(数据库),每个索引中可以包含多个类型(表),每个类型下又包含多个 个文档(行),每个文档中又包含多个字段(列)。
下面简单介绍下elasticsearch(ES)
1、添加数据
POST /db/user/1
{
"username": "wmyskxz1",
"password": "123456",
"age": "22"
}
POST /db/user/2
{
"username": "wmyskxz2",
"password": "123456",
"age": "22"
}
2、获取数据 GET
GET /carroll/user/1
3、修改数据
PUT /db/user/2
{
"username": "wmyskxz3",
"password": "123456",
"age": "22"
}
4、删除数据 DELETE
DELETE /db/user/1
————————————————
————————————————
版权声明:本文为CSDN博主「carroll18」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:
https://blog.csdn.net/qq_40722827/article/details/106287725
使用restfulapi 查询包含某个标签的用户量,可实时得到返回结果:
返回结果:
图:Elasticsearch 查询某标签的返回结果
从返回结果中可以看到,用户总量(total)为100000000人,包含标签“ACTION_U_01_003”的用户有2500000人(doc_count).
#查询命令:
GET userprofile/_search
{
"query":{
"match_all":{}
}
}
查询结果如图3-27所示。
Elasticsearch 查询某index数据总量
工程化案例
“用户人群”+ “人群分析”的功能解决方案
每天的ETL调度中,需要将Hive计算的标签数据导入ES中,如下图所示。
当标签完成“标签监控预警”后,将标签数据同步到ES中。
工程化调度中导入Elastic search
在与ES同步完成并通过校验后,向MySQL中维护的状态表中插入一条状态记录,
表示当前日期的Elasticsearch 数据可用,线上计算用户人群的接口则读取最近日期对应的数据。如果某天因为调度延迟等方面的原因,没有及时将当日数据导入Elasticsearch中,接口也能读取最近一天对应的数据,是一种可行的灾备方案:
数据同步完成后向MySQL状态表“elasticsearch_state”中插入记录,state字段为“0”,产出异常时为“1”。图3-29中,1月20日导入的数据出现异常,则“state” 状态字段置1,线上接口扫描该状态记录位后不读取1月20日数据,而是会读取最近的1月19日数据。
Elastic search状态记录表
为了避免从Hive向Elastic search中灌入数据时发生数据缺失,在向状态表更新状态位前需要校验ES 和Hive中的数据量
是否一致。下面通过Python 脚本来看数据校验逻辑:
【略】
之后业务人员在画像产品端计算人群或透视分析人群时(如图所示)
画像产品端计算人群
通过Restful API 访问Elasticsearch进行计算(如图所示)。
用户人群计算架构。