热文章计算
热文章计算
一、热点文章-定时计算
1、 需求分析
需求:为每个频道缓存热度较高的30条文章优先展示

判断文章热度较高的标准是什么?
文章:阅读,点赞,评论,收藏
2、 实现思路

3、 实现步骤
分值计算不涉及到前端工程,也无需提供api接口,是一个纯后台的功能的开发。
1) 频道列表远程接口准备
计算出最新的热点数据后,需要为每个频道各缓存一份数据,所以需要先查询所有频道信息
① 在heima-leadnews-feign-api定义远程接口
package com.heima.apis.wemedia;
import com.heima.model.common.dtos.ResponseResult;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
/**
 * Feign client for the {@code leadnews-wemedia} service.
 */
@FeignClient("leadnews-wemedia")
public interface IWemediaClient {

    /**
     * Issues {@code GET /api/v1/channel/list} against the wemedia service.
     *
     * @return the remote call result; the channel list is carried in its data field
     */
    @GetMapping("/api/v1/channel/list")
    ResponseResult getChannels();
}
② heima-leadnews-wemedia端提供接口
package com.heima.wemedia.feign;
import com.heima.apis.wemedia.IWemediaClient;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.wemedia.service.WmChannelService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * Server-side implementation of {@link IWemediaClient}, exposed as a REST controller.
 */
@RestController
public class WemediaClient implements IWemediaClient {

    @Autowired
    private WmChannelService wmChannelService;

    /**
     * Returns whatever {@code WmChannelService#findAll()} produces.
     *
     * @return the channel query result
     */
    @Override
    @GetMapping("/api/v1/channel/list")
    public ResponseResult getChannels() {
        ResponseResult channels = wmChannelService.findAll();
        return channels;
    }
}
在ApArticleMapper.xml新增方法
<!--
  Selects every ap_article row whose config row is neither deleted nor taken down.
  When dayParam is supplied, only articles published at or after dayParam are returned.
  Note: the conditions on aac.is_delete / aac.is_down reject rows where the LEFT JOIN found
  no matching ap_article_config row (NULL != 1 is not true), so the join behaves like an INNER JOIN.
-->
<select id="findArticleListByLast5days" resultMap="resultMap">
SELECT
aa.*
FROM
`ap_article` aa
LEFT JOIN ap_article_config aac ON aa.id = aac.article_id
<where>
<!-- the leading "and" of the first condition is stripped by the <where> element -->
and aac.is_delete != 1
and aac.is_down != 1
<if test="dayParam != null">
<!-- CDATA keeps the >= operator from being parsed as XML markup -->
and aa.publish_time <![CDATA[>=]]> #{dayParam}
</if>
</where>
</select>
修改ApArticleMapper类
package com.heima.article.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.heima.model.article.dtos.ArticleHomeDto;
import com.heima.model.article.pojos.ApArticle;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.Date;
import java.util.List;
/**
 * MyBatis mapper for {@link ApArticle}.
 */
@Mapper
public interface ApArticleMapper extends BaseMapper<ApArticle> {

    /**
     * Loads a list of articles.
     *
     * @param dto  query criteria
     * @param type 1 = load more, 2 = load newest
     * @return the matching articles
     */
    // NOTE(review): these two parameters carry no @Param names; confirm the XML statement
    // can still resolve them (e.g. the module is compiled with -parameters).
    List<ApArticle> loadArticleList(ArticleHomeDto dto, Short type);

    /**
     * Runs the {@code findArticleListByLast5days} statement.
     *
     * @param dayParam bound as {@code dayParam}; lower bound on the publish time, may be null
     * @return the matching articles
     */
    List<ApArticle> findArticleListByLast5days(@Param("dayParam") Date dayParam);
}
2) 热文章业务层
定义业务层接口
package com.heima.article.service;
public interface HotArticleService {
/**
* 计算热点文章
*/
public void computeHotArticle();
}
修改ArticleConstants
package com.heima.common.constants;
/**
 * Constants shared by the article features.
 */
public class ArticleConstants {

    // ---- list loading modes ----
    /** Load type: load more. */
    public static final Short LOADTYPE_LOAD_MORE = 1;
    /** Load type: load newest. */
    public static final Short LOADTYPE_LOAD_NEW = 2;

    /** Tag value used as the cache-key suffix for the cross-channel list. */
    public static final String DEFAULT_TAG = "__all__";

    /** Topic name for article-to-ES synchronisation messages. */
    public static final String ARTICLE_ES_SYNC_TOPIC = "article.es.sync.topic";

    // ---- hot-article score weights ----
    /** Score weight of one like. */
    public static final Integer HOT_ARTICLE_LIKE_WEIGHT = 3;
    /** Score weight of one comment. */
    public static final Integer HOT_ARTICLE_COMMENT_WEIGHT = 5;
    /** Score weight of one collection. */
    public static final Integer HOT_ARTICLE_COLLECTION_WEIGHT = 8;

    /** Cache-key prefix of the hot-article lists; a channel id or {@link #DEFAULT_TAG} is appended. */
    public static final String HOT_ARTICLE_FIRST_PAGE = "hot_article_first_page_";
}
创建一个vo接收计算分值后的对象
package com.heima.model.article.vos;
import com.heima.model.article.pojos.ApArticle;
import lombok.Data;
/**
 * An {@link ApArticle} extended with its computed score.
 */
@Data
public class HotArticleVo extends ApArticle {

    /** Article score. */
    private Integer score;
}
业务层实现类
package com.heima.article.service.impl;
import com.alibaba.fastjson.JSON;
import com.heima.apis.wemedia.IWemediaClient;
import com.heima.article.mapper.ApArticleMapper;
import com.heima.article.service.HotArticleService;
import com.heima.common.constants.ArticleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.article.pojos.ApArticle;
import com.heima.model.article.vos.HotArticleVo;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.wemedia.pojos.WmChannel;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Batch computation of hot articles: loads recently published articles, scores them and
 * caches the highest-scoring ones per channel plus one cross-channel list.
 */
@Service
@Slf4j
@Transactional
public class HotArticleServiceImpl implements HotArticleService {

    /** Look-back window, in days, for candidate articles (the "last 5 days" of the mapper query). */
    private static final int LOOKBACK_DAYS = 5;

    /** Maximum number of articles stored under one cache key. */
    private static final int MAX_CACHED_ARTICLES = 30;

    @Autowired
    private ApArticleMapper apArticleMapper;

    @Autowired
    private IWemediaClient wemediaClient;

    @Autowired
    private CacheService cacheService;

    /**
     * Computes the hot articles and stores them in the cache.
     */
    @Override
    public void computeHotArticle() {
        // 1. load the articles published within the look-back window
        Date dateParam = DateTime.now().minusDays(LOOKBACK_DAYS).toDate();
        List<ApArticle> apArticleList = apArticleMapper.findArticleListByLast5days(dateParam);
        // 2. score every article
        List<HotArticleVo> hotArticleVoList = computeHotArticle(apArticleList);
        // 3. cache the top articles per channel and for the default tag
        cacheTagToRedis(hotArticleVoList);
    }

    /**
     * Caches the top articles of every channel, then the top articles across all channels.
     *
     * @param hotArticleVoList all scored articles
     */
    private void cacheTagToRedis(List<HotArticleVo> hotArticleVoList) {
        ResponseResult responseResult = wemediaClient.getChannels();
        if (responseResult != null && Integer.valueOf(200).equals(responseResult.getCode())) {
            // Round-trip through JSON to turn the untyped payload into WmChannel objects.
            String channelJson = JSON.toJSONString(responseResult.getData());
            List<WmChannel> wmChannels = JSON.parseArray(channelJson, WmChannel.class);
            if (wmChannels != null) {
                for (WmChannel wmChannel : wmChannels) {
                    // Articles without a channel id belong to no channel list (and must not cause an NPE).
                    List<HotArticleVo> channelArticles = hotArticleVoList.stream()
                            .filter(x -> x.getChannelId() != null && x.getChannelId().equals(wmChannel.getId()))
                            .collect(Collectors.toList());
                    sortAndCache(channelArticles, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + wmChannel.getId());
                }
            }
        } else {
            log.warn("Channel list could not be loaded, per-channel hot articles were not refreshed: {}", responseResult);
        }
        // Cross-channel list, stored under the default tag.
        sortAndCache(hotArticleVoList, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG);
    }

    /**
     * Sorts by score (highest first), keeps at most {@link #MAX_CACHED_ARTICLES} entries and
     * stores the result as JSON under {@code key}.
     *
     * @param hotArticleVos scored articles
     * @param key           cache key
     */
    private void sortAndCache(List<HotArticleVo> hotArticleVos, String key) {
        List<HotArticleVo> top = hotArticleVos.stream()
                .sorted(Comparator.comparing(HotArticleVo::getScore).reversed())
                .limit(MAX_CACHED_ARTICLES)
                .collect(Collectors.toList());
        cacheService.set(key, JSON.toJSONString(top));
    }

    /**
     * Converts every article into a {@link HotArticleVo} carrying its score.
     *
     * @param apArticleList articles to score; may be null or empty
     * @return the scored articles, never null
     */
    private List<HotArticleVo> computeHotArticle(List<ApArticle> apArticleList) {
        List<HotArticleVo> hotArticleVoList = new ArrayList<>();
        if (apArticleList == null) {
            return hotArticleVoList;
        }
        for (ApArticle apArticle : apArticleList) {
            HotArticleVo hot = new HotArticleVo();
            BeanUtils.copyProperties(apArticle, hot);
            hot.setScore(computeScore(apArticle));
            hotArticleVoList.add(hot);
        }
        return hotArticleVoList;
    }

    /**
     * Computes the score of one article: views count once, likes, comments and collections
     * are multiplied by their {@link ArticleConstants} weights. Null counters contribute nothing.
     *
     * @param apArticle the article
     * @return the score
     */
    private Integer computeScore(ApArticle apArticle) {
        int score = 0;
        if (apArticle.getLikes() != null) {
            score += apArticle.getLikes() * ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;
        }
        if (apArticle.getViews() != null) {
            score += apArticle.getViews();
        }
        if (apArticle.getComment() != null) {
            score += apArticle.getComment() * ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;
        }
        if (apArticle.getCollection() != null) {
            score += apArticle.getCollection() * ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;
        }
        return score;
    }
}
在ArticleApplication的引导类中添加以下注解
@EnableFeignClients(basePackages = "com.heima.apis")
先在数据库中准备一些测试数据,然后编写测试类进行验证
package com.heima.article.service.impl;
import com.heima.article.ArticleApplication;
import com.heima.article.service.HotArticleService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Boots the article application context and runs the hot-article computation once.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ArticleApplication.class)
public class HotArticleServiceImplTest {

    @Autowired
    private HotArticleService hotArticleService;

    /** Invokes the computation; there are no assertions, the result is inspected manually. */
    @Test
    public void computeHotArticle() {
        hotArticleService.computeHotArticle();
    }
}
3) xxl-job定时计算-步骤
①:在heima-leadnews-article中的pom文件中新增依赖
<!-- xxl-job core library; provides XxlJobSpringExecutor and the @XxlJob annotation used by this module -->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.0</version>
</dependency>
② 在xxl-job-admin中新建执行器和任务
新建执行器:leadnews-hot-article-executor
新建任务:路由策略为轮询,Cron表达式:0 0 2 * * ?
③ leadnews-article中集成xxl-job
XxlJobConfig
package com.heima.article.config;
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that registers the xxl-job executor bean.
 *
 * @author xuxueli 2017-04-28
 */
@Configuration
public class XxlJobConfig {

    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    /** Value of {@code xxl.job.admin.addresses}. */
    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    /** Value of {@code xxl.job.executor.appname}. */
    @Value("${xxl.job.executor.appname}")
    private String appname;

    /** Value of {@code xxl.job.executor.port}. */
    @Value("${xxl.job.executor.port}")
    private int port;

    /**
     * Builds the executor from the injected admin addresses, app name and port.
     *
     * @return the configured executor
     */
    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor executor = new XxlJobSpringExecutor();
        executor.setAdminAddresses(adminAddresses);
        executor.setAppname(appname);
        executor.setPort(port);
        return executor;
    }
}
在nacos配置新增配置
# xxl-job settings; XxlJobConfig injects xxl.job.admin.addresses, xxl.job.executor.appname
# and xxl.job.executor.port from these keys.
# NOTE(review): the nesting indentation of this snippet appears to have been lost - restore it before use.
xxl:
job:
admin:
addresses: http://192.168.200.130:8888/xxl-job-admin
executor:
# same name as the executor created in xxl-job-admin in step 2
appname: leadnews-hot-article-executor
port: 9999
④:在article微服务中新建任务类
package com.heima.article.job;
import com.heima.article.service.HotArticleService;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * xxl-job task that triggers the hot-article computation.
 */
@Slf4j
@Component
public class ComputeHotArticleJob {

    @Autowired
    private HotArticleService hotArticleService;

    /**
     * Handler registered under the name {@code computeHotArticleJob}; logs before and after
     * delegating to {@link HotArticleService#computeHotArticle()}.
     */
    @XxlJob("computeHotArticleJob")
    public void handle() {
        log.info("热文章分值计算调度任务开始执行...");
        hotArticleService.computeHotArticle();
        log.info("热文章分值计算调度任务结束...");
    }
}
4、查询文章接口改造
1) 思路分析

2) 功能实现
(1) 在ApArticleService中新增方法
/**
 * Loads the article list.
 *
 * @param dto       query criteria
 * @param type      1 = load more, 2 = load newest
 * @param firstPage true for the first page, false otherwise
 * @return the article list result
 */
ResponseResult load2(ArticleHomeDto dto, Short type, boolean firstPage);
实现方法
/**
 * Loads the article list; the first page is served from the cached hot articles of the
 * requested tag when that cache entry is non-blank, every other case falls back to {@code load}.
 *
 * @param dto       query criteria; its tag selects the cache entry
 * @param type      1 = load more, 2 = load newest
 * @param firstPage true for the first page, false otherwise
 * @return the cached hot articles, or the result of {@code load}
 */
@Override
public ResponseResult load2(ArticleHomeDto dto, Short type, boolean firstPage) {
    if (!firstPage) {
        return load(type, dto);
    }
    String cachedJson = cacheService.get(ArticleConstants.HOT_ARTICLE_FIRST_PAGE + dto.getTag());
    if (StringUtils.isBlank(cachedJson)) {
        return load(type, dto);
    }
    List<HotArticleVo> cachedHotArticles = JSON.parseArray(cachedJson, HotArticleVo.class);
    return ResponseResult.okResult(cachedHotArticles);
}
(2)修改控制器
/**
 * Loads the home page. Delegates to {@code load2} with the first-page flag set
 * (this replaces the earlier call to {@code load(dto, LOADTYPE_LOAD_MORE)}).
 *
 * @param dto query criteria taken from the request body
 * @return the article list result
 */
@PostMapping("/load")
public ResponseResult load(@RequestBody ArticleHomeDto dto) {
    final boolean firstPage = true;
    return apArticleService.load2(dto, ArticleConstants.LOADTYPE_LOAD_MORE, firstPage);
}
二、热点文章实时计算
1、 思路说明

2、 功能实现
1) 用户行为(阅读量,评论,点赞,收藏)发送消息,以阅读和点赞为例
①在heima-leadnews-behavior微服务中集成kafka生产者配置
修改nacos,新增内容
# Kafka producer settings for the leadnews-behavior service.
# NOTE(review): the nesting indentation of this snippet appears to have been lost - restore it before use.
spring:
application:
name: leadnews-behavior
kafka:
bootstrap-servers: 192.168.200.130:9092
producer:
retries: 10
# keys and values are sent as plain strings (the services inject KafkaTemplate<String,String>)
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
②修改ApLikesBehaviorServiceImpl新增发送消息
定义消息发送封装类:UpdateArticleMess
package com.heima.model.mess;
import lombok.Data;
/**
 * Message describing a change to one counter of an article.
 */
@Data
public class UpdateArticleMess {

    /** Which counter of the article is changed. */
    private UpdateArticleType type;

    /** Article id. */
    private Long articleId;

    /** Increment to apply; may be positive or negative. */
    private Integer add;

    /** The counters that can be changed. */
    public enum UpdateArticleType {
        COLLECTION,
        COMMENT,
        LIKES,
        VIEWS
    }
}
topic常量类:
package com.heima.common.constants;
/**
 * Kafka topic names of the hot-article feature.
 */
public class HotArticleConstants {

    /** Topic carrying the individual behaviour messages. */
    public static final String HOT_ARTICLE_SCORE_TOPIC = "hot.article.score.topic";
}
完整代码如下:
package com.heima.behavior.service.impl;
import com.alibaba.fastjson.JSON;
import com.heima.behavior.service.ApLikesBehaviorService;
import com.heima.common.constants.BehaviorConstants;
import com.heima.common.constants.HotArticleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.behavior.dtos.LikesBehaviorDto;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.mess.UpdateArticleMess;
import com.heima.model.user.pojos.ApUser;
import com.heima.utils.thread.AppThreadLocalUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
 * Records like / cancel-like behaviour in the cache and publishes the resulting counter
 * change to the hot-article score topic.
 */
@Service
@Transactional
@Slf4j
public class ApLikesBehaviorServiceImpl implements ApLikesBehaviorService {

    @Autowired
    private CacheService cacheService;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Handles a like (operation 0) or cancel-like (any other accepted operation).
     * <p>
     * A like that is already recorded is rejected with "已点赞". Cancelling a like that was never
     * recorded is a no-op and publishes no message, so the aggregated like count cannot be
     * decremented for a like that was never counted.
     *
     * @param dto the like request
     * @return an error result for invalid parameters, a missing login or a duplicate like;
     *         a success result otherwise
     */
    @Override
    public ResponseResult like(LikesBehaviorDto dto) {
        // 1. validate parameters
        if (dto == null || dto.getArticleId() == null || checkParam(dto)) {
            return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
        }

        // 2. require a logged-in user
        ApUser user = AppThreadLocalUtil.getUser();
        if (user == null) {
            return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);
        }

        // Hash per article, one field per user.
        String likeKey = BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString();
        String userField = user.getId().toString();
        Object existingLike = cacheService.hGet(likeKey, userField);

        UpdateArticleMess mess = new UpdateArticleMess();
        mess.setArticleId(dto.getArticleId());
        mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);

        // 3. apply the operation
        if (dto.getOperation() == 0) {
            if (existingLike != null) {
                return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID, "已点赞");
            }
            log.info("保存当前key:{} ,{}, {}", dto.getArticleId(), user.getId(), dto);
            cacheService.hPut(likeKey, userField, JSON.toJSONString(dto));
            mess.setAdd(1);
        } else {
            if (existingLike == null) {
                // Nothing to cancel: sending -1 here would push the like count below the truth.
                return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
            }
            log.info("删除当前key:{}, {}", dto.getArticleId(), user.getId());
            cacheService.hDelete(likeKey, userField);
            mess.setAdd(-1);
        }

        // 4. publish the delta for aggregation
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(mess));
        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    }

    /**
     * Checks the value ranges of the request.
     *
     * @param dto the like request
     * @return true when the request is INVALID (type outside 0..2 or operation outside 0..1)
     */
    // NOTE(review): if getType()/getOperation() return boxed values, a missing field causes an
    // NPE here instead of PARAM_INVALID - confirm the DTO field types.
    private boolean checkParam(LikesBehaviorDto dto) {
        boolean typeOutOfRange = dto.getType() > 2 || dto.getType() < 0;
        boolean operationOutOfRange = dto.getOperation() > 1 || dto.getOperation() < 0;
        return typeOutOfRange || operationOutOfRange;
    }
}
③修改阅读行为的类ApReadBehaviorServiceImpl发送消息
完整代码:
package com.heima.behavior.service.impl;
import com.alibaba.fastjson.JSON;
import com.heima.behavior.service.ApReadBehaviorService;
import com.heima.common.constants.BehaviorConstants;
import com.heima.common.constants.HotArticleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.behavior.dtos.ReadBehaviorDto;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.mess.UpdateArticleMess;
import com.heima.model.user.pojos.ApUser;
import com.heima.utils.thread.AppThreadLocalUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
 * Records read behaviour in the cache and publishes a view increment to the
 * hot-article score topic.
 */
@Service
@Transactional
@Slf4j
public class ApReadBehaviorServiceImpl implements ApReadBehaviorService {

    @Autowired
    private CacheService cacheService;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Adds the request's read count to the count already stored for this user and article,
     * stores the updated record and publishes a VIEWS message with increment 1.
     *
     * @param dto the read request
     * @return an error result for invalid parameters or a missing login, otherwise success
     */
    @Override
    public ResponseResult readBehavior(ReadBehaviorDto dto) {
        // 1. validate parameters
        boolean invalid = dto == null || dto.getArticleId() == null;
        if (invalid) {
            return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
        }

        // 2. require a logged-in user
        ApUser currentUser = AppThreadLocalUtil.getUser();
        if (currentUser == null) {
            return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);
        }

        // 3. accumulate onto the previously stored read count, if any
        String stored = (String) cacheService.hGet(
                BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(),
                currentUser.getId().toString());
        if (StringUtils.isNotBlank(stored)) {
            ReadBehaviorDto previous = JSON.parseObject(stored, ReadBehaviorDto.class);
            dto.setCount((short) (previous.getCount() + dto.getCount()));
        }

        // 4. store the updated record
        log.info("保存当前key:{} {} {}", dto.getArticleId(), currentUser.getId(), dto);
        cacheService.hPut(
                BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(),
                currentUser.getId().toString(),
                JSON.toJSONString(dto));

        // 5. publish the view increment for aggregation
        UpdateArticleMess viewMess = new UpdateArticleMess();
        viewMess.setArticleId(dto.getArticleId());
        viewMess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);
        viewMess.setAdd(1);
        String payload = JSON.toJSONString(viewMess);
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, payload);

        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    }
}
2) 使用kafkaStream实时接收消息,聚合内容
①在leadnews-article微服务中集成kafkaStream (参考kafka-demo)
②定义实体类,用于聚合之后的分值封装
package com.heima.model.mess;
import lombok.Data;
/**
 * Aggregated behaviour counters of one article, as produced by the stream aggregation.
 */
@Data
public class ArticleVisitStreamMess {

    /** Article id. */
    private Long articleId;

    /** Views. */
    private int view;

    /** Collections. */
    private int collect;

    /** Comments. */
    private int comment;

    /** Likes. */
    private int like;
}
修改常量类:增加常量
package com.heima.common.constants;
/**
 * Kafka topic names of the hot-article feature.
 */
public class HotArticleConstants {

    /** Topic carrying the individual behaviour messages. */
    public static final String HOT_ARTICLE_SCORE_TOPIC = "hot.article.score.topic";

    /** Topic carrying the aggregated per-article increments. */
    public static final String HOT_ARTICLE_INCR_HANDLE_TOPIC = "hot.article.incr.handle.topic";
}
③ 定义stream,接收消息并聚合
package com.heima.article.stream;
import com.alibaba.fastjson.JSON;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import com.heima.model.mess.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
@Configuration
@Slf4j
public class HotArticleStreamHandler {
@Bean
public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
//接收消息
KStream<String,String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
//聚合流式处理
stream.map((key,value)->{
UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
//重置消息的key:1234343434 和 value: likes:1
return new KeyValue<>(mess.getArticleId().toString(),mess.getType().name()+":"+mess.getAdd());
})
//按照文章id进行聚合
.groupBy((key,value)->key)
//时间窗口
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
/**
* 自行的完成聚合的计算
*/
.aggregate(new Initializer<String>() {
/**
* 初始方法,返回值是消息的value
* @return
*/
@Override
public String apply() {
return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
}
/**
* 真正的聚合操作,返回值是消息的value
*/
}, new Aggregator<String, String, String>() {
@Override
public String apply(String key, String value, String aggValue) {
if(StringUtils.isBlank(value)){
return aggValue;
}
String[] aggAry = aggValue.split(",");
int col = 0,com=0,lik=0,vie=0;
for (String agg : aggAry) {
String[] split = agg.split(":");
/**
* 获得初始值,也是时间窗口内计算之后的值
*/
switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
case COLLECTION:
col = Integer.parseInt(split[1]);
break;
case COMMENT:
com = Integer.parseInt(split[1]);
break;
case LIKES:
lik = Integer.parseInt(split[1]);
break;
case VIEWS:
vie = Integer.parseInt(split[1]);
break;
}
}
/**
* 累加操作
*/
String[] valAry = value.split(":");
switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){
case COLLECTION:
col += Integer.parseInt(valAry[1]);
break;
case COMMENT:
com += Integer.parseInt(valAry[1]);
break;
case LIKES:
lik += Integer.parseInt(valAry[1]);
break;
case VIEWS:
vie += Integer.parseInt(valAry[1]);
break;
}
String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);
System.out.println("文章的id:"+key);
System.out.println("当前时间窗口内的消息处理结果:"+formatStr);
return formatStr;
}
}, Materialized.as("hot-atricle-stream-count-001"))
.toStream()
.map((key,value)->{
return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));
})
//发送消息
.to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);
return stream;
}
/**
* 格式化消息的value数据
* @param articleId
* @param value
* @return
*/
public String formatObj(String articleId,String value){
ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
mess.setArticleId(Long.valueOf(articleId));
//COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
String[] valAry = value.split(",");
for (String val : valAry) {
String[] split = val.split(":");
switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
case COLLECTION:
mess.setCollect(Integer.parseInt(split[1]));
break;
case COMMENT:
mess.setComment(Integer.parseInt(split[1]));
break;
case LIKES:
mess.setLike(Integer.parseInt(split[1]));
break;
case VIEWS:
mess.setView(Integer.parseInt(split[1]));
break;
}
}
log.info("聚合消息处理之后的结果为:{}",JSON.toJSONString(mess));
return JSON.toJSONString(mess);
}
}
3) 重新计算文章的分值,更新到数据库和缓存中
①在ApArticleService添加方法,用于更新数据库中的文章分值
/**
 * Updates the score of an article and refreshes the cached hot-article data accordingly.
 *
 * @param mess aggregated behaviour counters of one article
 */
void updateScore(ArticleVisitStreamMess mess);
实现类方法
/**
 * Updates the score of an article and refreshes the cached hot-article data accordingly.
 * Does nothing when the message carries no article id or the article cannot be resolved.
 *
 * @param mess aggregated behaviour counters of one article
 */
@Override
public void updateScore(ArticleVisitStreamMess mess) {
    if (mess == null || mess.getArticleId() == null) {
        return;
    }
    // 1. apply the view / like / collection / comment increments to the article
    ApArticle apArticle = updateArticle(mess);
    if (apArticle == null) {
        // no article to score
        return;
    }
    // 2. compute the score; scores from this path are tripled
    Integer score = computeScore(apArticle);
    score = score * 3;
    // 3. refresh the hot list of the article's channel
    replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId());
    // 4. refresh the cross-channel hot list
    replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG);
}
/**
 * Merges one scored article into the hot list cached under {@code s} and writes the list
 * back sorted by score, highest first. Nothing happens when the cache entry is blank.
 * <ul>
 *   <li>Article already cached: only its score is replaced.</li>
 *   <li>List holds fewer than 30 entries: the article is appended.</li>
 *   <li>Otherwise: the article replaces the lowest-scored entry, provided its score is higher.</li>
 * </ul>
 *
 * @param apArticle the article
 * @param score     its new score
 * @param s         cache key of the hot list
 */
private void replaceDataToRedis(ApArticle apArticle, Integer score, String s) {
    String cachedJson = cacheService.get(s);
    if (StringUtils.isBlank(cachedJson)) {
        return;
    }

    List<HotArticleVo> ranked = JSON.parseArray(cachedJson, HotArticleVo.class);
    HotArticleVo alreadyCached = ranked.stream()
            .filter(candidate -> candidate.getId().equals(apArticle.getId()))
            .findFirst()
            .orElse(null);

    boolean insert;
    if (alreadyCached != null) {
        alreadyCached.setScore(score);
        insert = false;
    } else if (ranked.size() < 30) {
        insert = true;
    } else {
        // list is full: compare against the lowest-scored entry
        ranked = ranked.stream()
                .sorted(Comparator.comparing(HotArticleVo::getScore).reversed())
                .collect(Collectors.toList());
        HotArticleVo lowest = ranked.get(ranked.size() - 1);
        insert = lowest.getScore() < score;
        if (insert) {
            ranked.remove(lowest);
        }
    }

    if (insert) {
        HotArticleVo fresh = new HotArticleVo();
        BeanUtils.copyProperties(apArticle, fresh);
        fresh.setScore(score);
        ranked.add(fresh);
    }

    // write back, sorted
    List<HotArticleVo> sorted = ranked.stream()
            .sorted(Comparator.comparing(HotArticleVo::getScore).reversed())
            .collect(Collectors.toList());
    cacheService.set(s, JSON.toJSONString(sorted));
}
/**
 * Adds the aggregated increments to the article's counters and persists the article.
 * A counter that is still null is treated as 0, so the increment is never lost.
 *
 * @param mess aggregated behaviour counters of one article
 * @return the updated article, or null when no article with the given id exists
 */
// NOTE(review): this is a read-modify-write; concurrent updates of the same article may
// overwrite each other - confirm whether that can happen in this deployment.
private ApArticle updateArticle(ArticleVisitStreamMess mess) {
    ApArticle apArticle = getById(mess.getArticleId());
    if (apArticle == null) {
        return null;
    }
    int collection = apArticle.getCollection() == null ? 0 : apArticle.getCollection();
    int comment = apArticle.getComment() == null ? 0 : apArticle.getComment();
    int likes = apArticle.getLikes() == null ? 0 : apArticle.getLikes();
    int views = apArticle.getViews() == null ? 0 : apArticle.getViews();

    apArticle.setCollection(collection + mess.getCollect());
    apArticle.setComment(comment + mess.getComment());
    apArticle.setLikes(likes + mess.getLike());
    apArticle.setViews(views + mess.getView());
    updateById(apArticle);
    return apArticle;
}
/**
 * Computes the score of one article: views count once, likes, comments and collections
 * are multiplied by their {@code ArticleConstants} weights. Null counters contribute nothing.
 *
 * @param apArticle the article
 * @return the score
 */
private Integer computeScore(ApArticle apArticle) {
    int total = 0;

    Integer likes = apArticle.getLikes();
    if (likes != null) {
        total += likes * ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;
    }

    Integer views = apArticle.getViews();
    if (views != null) {
        total += views;
    }

    Integer comments = apArticle.getComment();
    if (comments != null) {
        total += comments * ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;
    }

    Integer collections = apArticle.getCollection();
    if (collections != null) {
        total += collections * ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;
    }

    return total;
}
②定义监听,接收聚合之后的数据,文章的分值重新进行计算
package com.heima.article.listener;
import com.alibaba.fastjson.JSON;
import com.heima.article.service.ApArticleService;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
 * Kafka listener for the aggregated increments; triggers the score update of the article.
 */
@Slf4j
@Component
public class ArticleIncrHandleListener {

    @Autowired
    private ApArticleService apArticleService;

    /**
     * Parses the JSON payload into an {@link ArticleVisitStreamMess} and passes it to
     * {@code ApArticleService#updateScore}. Blank payloads are ignored.
     *
     * @param mess the raw message value
     */
    @KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)
    public void onMessage(String mess) {
        if (StringUtils.isBlank(mess)) {
            return;
        }
        ArticleVisitStreamMess increments = JSON.parseObject(mess, ArticleVisitStreamMess.class);
        apArticleService.updateScore(increments);
    }
}