基于storm实时热点统计的分布式并行缓存预热
一、基于nginx+lua完成商品详情页访问流量实时上报kafka的开发 ==================================== 在nginx这一层,接收到访问请求的时候,就把请求的流量上报发送给kafka 这样的话,storm才能去消费kafka中的实时的访问日志,然后去进行缓存热数据的统计 用得技术方案非常简单,从lua脚本直接创建一个kafka producer,发送数据到kafka ``` wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip yum install -y unzip unzip lua-resty-kafka-master.zip cp -rf /usr/local/lua-resty-kafka-master/lib/resty /usr/hello/lualib nginx -s reload local cjson = require("cjson") local producer = require("resty.kafka.producer") local broker_list = { { host = "192.168.31.187", port = 9092 }, { host = "192.168.31.19", port = 9092 }, { host = "192.168.31.227", port = 9092 } } local log_json = {} log_json["headers"] = ngx.req.get_headers() log_json["uri_args"] = ngx.req.get_uri_args() log_json["body"] = ngx.req.read_body() log_json["http_version"] = ngx.req.http_version() log_json["method"] =ngx.req.get_method() log_json["raw_reader"] = ngx.req.raw_header() log_json["body_data"] = ngx.req.get_body_data() local message = cjson.encode(log_json); local productId = ngx.req.get_uri_args()["productId"] local async_producer = producer:new(broker_list, { producer_type = "async" }) local ok, err = async_producer:send("access-log", productId, message) if not ok then ngx.log(ngx.ERR, "kafka send err:", err) return end ``` 两台机器上都这样做,才能统一上报流量到kafka ``` bin/kafka-topics.sh --zookeeper 192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181 --topic access-log --replication-factor 1 --partitions 1 --create bin/kafka-console-consumer.sh --zookeeper 192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181 --topic access-log --from-beginning ``` (1)kafka在187上的节点死掉了,可能是虚拟机的问题,杀掉进程,重新启动一下 nohup bin/kafka-server-start.sh config/server.properties & (2)需要在nginx.conf中,http部分,加入resolver 8.8.8.8; (3)需要在kafka中加入advertised.host.name = 192.168.31.187,重启三个kafka进程 (4)需要启动eshop-cache缓存服务,因为nginx中的本地缓存可能不在了 二、基于storm+kafka完成商品访问次数实时统计拓扑的开发 ================================ maven构建出的一些问题,直接从maven中央仓库可能下载不到jar包,自己去百度一下jar,下载下来 根据错误提示,拷贝到maven本地仓库对应的目录中去,然后手工安装一下 1、kafka consumer spout 单独的线程消费,写入队列 nextTuple,每次都是判断队列有没有数据,有的话再去获取并发射出去,不能阻塞 2、日志解析bolt 3、商品访问次数统计bolt 基于LRUMap完成统计 三、基于storm完成LRUMap中topn热门商品列表的算法讲解与编写 ==================================== **topn list生成算法讲解** ![这里写图片描述](//img-blog.csdn.net/20180319061103227?watermark/2/text/Ly9ibG9nLmNzZG4ubmV0L3FxMTEzNzYyMzE2MA==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70) 1、storm task启动的时候,基于分布式锁将自己的taskid累加到一个znode中 2、开启一个单独的后台线程,每隔1分钟算出top3热门商品list 3、每个storm task将自己统计出的热数据list写入自己对应的znode中 4、task初始化 5、热门商品list保存 四、基于双重zookeeper分布式锁完成分布式并行缓存预热的代码开发 =================================== 1、服务启动的时候,进行缓存预热 2、从zk中读取taskid列表 3、依次遍历每个taskid,尝试获取分布式锁,如果获取不到,快速报错,不要等待,因为说明已经有其他服务实例在预热了 4、直接尝试获取下一个taskid的分布式锁 5、即使获取到了分布式锁,也要检查一下这个taskid的预热状态,如果已经被预热过了,就不再预热了 6、执行预热操作,遍历productid列表,查询数据,然后写ehcache和redis 7、预热完成后,设置taskid对应的预热状态 代码地址:
- 1
- 粉丝: 164
- 资源: 67
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助