diff --git a/game/src/main/java/com/ccsens/game/mq/GameScoreListener.java b/game/src/main/java/com/ccsens/game/mq/GameScoreListener.java index ac40a350..660f4312 100644 --- a/game/src/main/java/com/ccsens/game/mq/GameScoreListener.java +++ b/game/src/main/java/com/ccsens/game/mq/GameScoreListener.java @@ -1,109 +1,106 @@ -//package com.ccsens.game.mq; -// -//import cn.hutool.core.collection.CollectionUtil; -//import cn.hutool.core.util.StrUtil; -//import com.alibaba.fastjson.JSON; -//import com.alibaba.fastjson.JSONObject; -//import com.ccsens.game.bean.dto.ClientDto; -//import com.ccsens.game.bean.dto.message.GameMessageCountOut; -//import com.ccsens.game.bean.dto.message.GameMessageWithChangeStatusOut; -//import com.ccsens.game.netty.ChannelManager; -//import com.ccsens.game.util.GameConstant; -//import com.ccsens.util.JacksonUtil; -//import com.ccsens.util.RedisUtil; -//import com.ccsens.util.bean.message.common.InMessage; -//import com.ccsens.util.bean.message.common.OutMessage; -//import com.ccsens.util.bean.message.common.OutMessageSet; -//import com.ccsens.util.config.RabbitMQConfig; -//import lombok.extern.slf4j.Slf4j; -//import org.springframework.amqp.core.AmqpTemplate; -//import org.springframework.amqp.rabbit.annotation.RabbitHandler; -//import org.springframework.amqp.rabbit.annotation.RabbitListener; -//import org.springframework.beans.factory.annotation.Autowired; -//import org.springframework.data.redis.core.ZSetOperations; -//import org.springframework.stereotype.Component; -// -//import java.io.IOException; -//import java.util.ArrayList; -//import java.util.HashSet; -//import java.util.List; -//import java.util.Set; -// -///** -// * @description: -// * @author: whj -// * @time: 2020/6/5 14:30 -// */ -//@Slf4j -//@Component -//@RabbitListener(queues = RabbitMQConfig.GAME_SCORE) -//public class GameScoreListener { -// -// @Autowired -// private RedisUtil redisUtil; -// @Autowired -// private AmqpTemplate rabbitTemplate; -// -// @RabbitHandler -// public void process(String messageJson) throws IOException { -// log.info("计算分数 {}",messageJson); -// OutMessageSet outMessageSet = JSONObject.parseObject(messageJson, OutMessageSet.class); -// Set messageSet = outMessageSet.getMessageSet(); -// if (CollectionUtil.isEmpty(messageSet)) { -// return; -// } -// messageSet.forEach(outMessage -> { -// -// InMessage.To to = JSONObject.parseObject(outMessage.getFrom(), InMessage.To.class); -// Long userId = to.getId(); -// GameDto.GameMsg gameMsg = JSONObject.parseObject(outMessage.getAuthMessage(), GameDto.GameMsg.class); -// Long recordId = gameMsg.getRecordId(); -// log.info("统计分数:{},{}",userId, recordId); -// GameMessageCountOut gameMessageCountOut = clientAddTimes(userId,recordId); -// -// // 发送给消息系统 -// InMessage inMessage = new InMessage(); -// Set tos = new HashSet<>(); -// tos.add(JSONObject.toJSONString(new InMessage.To(userId))); -// inMessage.setTos(tos); -// inMessage.setData(JSONObject.toJSONString(gameMessageCountOut)); -// rabbitTemplate.convertAndSend(RabbitMQConfig.GAME_SCORE, inMessage); -// }); -// -// } -// -// public GameMessageCountOut clientAddTimes(Long userId, Long recordId) { -// GameMessageCountOut gameMessageCountOut = new GameMessageCountOut(); -// log.info("userId:{}", userId); -// if (userId == null || recordId == null) { -// return gameMessageCountOut; -// } -// -// String gameUserKey = GameConstant.generateGameKey(recordId); -// Set> typedTuples = redisUtil.zsRevGetWithScore(gameUserKey, 0, -1); -// -// if (CollectionUtil.isEmpty(typedTuples)) { -// return gameMessageCountOut; -// } -// for(ZSetOperations.TypedTuple type : typedTuples) { -// ClientDto.RedisUser user = JSON.parseObject((String)type.getValue(), ClientDto.RedisUser.class); -// if (user.getUserId().longValue() == userId.longValue()) { -// int score = type.getScore().intValue(); -// String userStatus = GameConstant.generateGameStatusKey(recordId); -// String gameStausObj = (String)redisUtil.get(userStatus); -// if (StrUtil.isBlank(gameStausObj) || gameStausObj.equals(String.valueOf(GameConstant.GAME_COMPLETED))){ -// gameMessageCountOut = new GameMessageCountOut(score/100, score); -// return gameMessageCountOut; -// } -// score += 100; -// redisUtil.zsSet(gameUserKey, JSON.toJSONString(user), score); -// gameMessageCountOut.getData().setTotalScore(score); -// gameMessageCountOut.getData().setTotalTimes(score/100); -// return gameMessageCountOut; -// } -// }; -// -// return gameMessageCountOut; -// } -// -//} +package com.ccsens.game.mq; + +import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.util.StrUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.ccsens.game.bean.dto.ClientDto; +import com.ccsens.game.bean.dto.message.GameMessageCountOut; +import com.ccsens.game.util.GameConstant; +import com.ccsens.util.RedisUtil; +import com.ccsens.util.bean.message.common.InMessage; +import com.ccsens.util.bean.message.common.OutMessage; +import com.ccsens.util.bean.message.common.OutMessageSet; +import com.ccsens.util.config.RabbitMQConfig; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.AmqpTemplate; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.ZSetOperations; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * @description: + * @author: whj + * @time: 2020/6/5 14:30 + */ +@Slf4j +@Component +@RabbitListener(queues = RabbitMQConfig.GAME_SCORE) +public class GameScoreListener { + + @Autowired + private RedisUtil redisUtil; + @Autowired + private AmqpTemplate rabbitTemplate; + + @RabbitHandler + public void process(String messageJson) throws IOException { + log.info("计算分数 {}",messageJson); + OutMessageSet outMessageSet = JSONObject.parseObject(messageJson, OutMessageSet.class); + Set messageSet = outMessageSet.getMessageSet(); + if (CollectionUtil.isEmpty(messageSet)) { + return; + } + messageSet.forEach(outMessage -> { + + InMessage.To to = JSONObject.parseObject(outMessage.getFrom(), InMessage.To.class); + Long userId = to.getId(); + GameDto.GameMsg gameMsg = JSONObject.parseObject(outMessage.getData(), GameDto.GameMsg.class); + Long recordId = gameMsg.getRecordId(); + log.info("统计分数:{},{}",userId, recordId); + GameMessageCountOut gameMessageCountOut = clientAddTimes(userId,recordId); + + // 发送给消息系统 + InMessage inMessage = new InMessage(); + Set tos = new HashSet<>(); + tos.add(JSONObject.toJSONString(new InMessage.To(userId))); + inMessage.setTos(tos); + inMessage.setData(JSONObject.toJSONString(gameMessageCountOut)); + rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, inMessage); + }); + + } + + public GameMessageCountOut clientAddTimes(Long userId, Long recordId) { + GameMessageCountOut gameMessageCountOut = new GameMessageCountOut(); + log.info("userId:{}", userId); + if (userId == null || recordId == null) { + return gameMessageCountOut; + } + + String gameUserKey = GameConstant.generateGameKey(recordId); + Set> typedTuples = redisUtil.zsRevGetWithScore(gameUserKey, 0, -1); + + if (CollectionUtil.isEmpty(typedTuples)) { + return gameMessageCountOut; + } + for(ZSetOperations.TypedTuple type : typedTuples) { + ClientDto.RedisUser user = JSON.parseObject((String)type.getValue(), ClientDto.RedisUser.class); + if (user.getUserId().longValue() == userId.longValue()) { + int score = type.getScore().intValue(); + String userStatus = GameConstant.generateGameStatusKey(recordId); + String gameStausObj = (String)redisUtil.get(userStatus); + if (StrUtil.isBlank(gameStausObj) || gameStausObj.equals(String.valueOf(GameConstant.GAME_COMPLETED))){ + gameMessageCountOut = new GameMessageCountOut(score/100, score); + return gameMessageCountOut; + } + score += 100; + redisUtil.zsSet(gameUserKey, JSON.toJSONString(user), score); + gameMessageCountOut.getData().setTotalScore(score); + gameMessageCountOut.getData().setTotalTimes(score/100); + return gameMessageCountOut; + } + }; + + return gameMessageCountOut; + } + +} diff --git a/game/src/main/java/com/ccsens/game/service/RabbitMQListener.java b/game/src/main/java/com/ccsens/game/service/RabbitMQListener.java index 0c2c7b07..8b9d6a17 100644 --- a/game/src/main/java/com/ccsens/game/service/RabbitMQListener.java +++ b/game/src/main/java/com/ccsens/game/service/RabbitMQListener.java @@ -18,21 +18,21 @@ import java.io.IOException; import java.util.List; import java.util.TreeMap; -@Slf4j -@Component -@RabbitListener(queues = RabbitMQConfig.GAME_STATUS) -public class RabbitMQListener { - private Logger logger = LoggerFactory.getLogger(RabbitMQListener.class); - @Autowired - private IMessageService messageService; - - @RabbitHandler - public void process(String messageJson) throws IOException { - System.out.println("++++++++++++++"+messageJson); -// List gameMessageList = JacksonUtil.jsonToBean(messageJson, -// GameMessageWithChangeStatusOut.class, true); - log.info("认证MQ {}",messageJson); - messageService.doChangeStatusMessage(JacksonUtil.jsonToBean(messageJson, - GameMessageWithChangeStatusOut.class, true)); - } -} \ No newline at end of file +//@Slf4j +//@Component +//@RabbitListener(queues = RabbitMQConfig.GAME_STATUS) +//public class RabbitMQListener { +// private Logger logger = LoggerFactory.getLogger(RabbitMQListener.class); +// @Autowired +// private IMessageService messageService; +// +// @RabbitHandler +// public void process(String messageJson) throws IOException { +// System.out.println("++++++++++++++"+messageJson); +//// List gameMessageList = JacksonUtil.jsonToBean(messageJson, +//// GameMessageWithChangeStatusOut.class, true); +// log.info("认证MQ {}",messageJson); +// messageService.doChangeStatusMessage(JacksonUtil.jsonToBean(messageJson, +// GameMessageWithChangeStatusOut.class, true)); +// } +//} \ No newline at end of file diff --git a/game/src/main/java/com/ccsens/game/util/SendMsg.java b/game/src/main/java/com/ccsens/game/util/SendMsg.java index 6d81df28..067e0b19 100644 --- a/game/src/main/java/com/ccsens/game/util/SendMsg.java +++ b/game/src/main/java/com/ccsens/game/util/SendMsg.java @@ -3,6 +3,7 @@ package com.ccsens.game.util; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.util.ObjectUtil; import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import com.ccsens.game.bean.dto.ClientDto; import com.ccsens.game.bean.dto.message.ChangeStatusMessageDto; import com.ccsens.game.bean.dto.message.GameMessageWithChangeStatusOut; @@ -15,6 +16,8 @@ import com.ccsens.game.persist.dao.GameUserJoinDao; import com.ccsens.game.service.ClientService; import com.ccsens.util.JacksonUtil; import com.ccsens.util.RedisUtil; +import com.ccsens.util.WebConstant; +import com.ccsens.util.bean.message.common.InMessage; import com.ccsens.util.config.RabbitMQConfig; import com.fasterxml.jackson.core.JsonProcessingException; import io.swagger.models.auth.In; @@ -53,14 +56,24 @@ public class SendMsg { if (CollectionUtil.isEmpty(userJoins)) { return; } + Set userIds = new HashSet<>(); userJoins.forEach(join -> { - outs.add(getMsg(userJoins, gameRecord, join, status)); +// outs.add(getMsg(userJoins, gameRecord, join, status)); + + InMessage inMessage = new InMessage(); + + userIds.add(String.valueOf(join.getUserId())); + inMessage.setTos(userIds); + inMessage.setData(JSONObject.toJSONString(getMsg(userJoins, gameRecord, join, status))); + rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, JSONObject.toJSONString(inMessage)); + }); - if (CollectionUtil.isNotEmpty(outs)) { - rabbitTemplate.convertAndSend(RabbitMQConfig.GAME_STATUS, JacksonUtil.beanToJson(outs)); - log.info("发送成功:{}", JacksonUtil.beanToJson(outs)); - } +// if (CollectionUtil.isNotEmpty(outs)) { +// rabbitTemplate.convertAndSend(RabbitMQConfig.GAME_STATUS, JacksonUtil.beanToJson(outs)); +// log.info("发送成功:{}", JacksonUtil.beanToJson(outs)); +// +// } } public GameMessageWithChangeStatusOut getMsg(List userJoins, GameRecord gameRecord, ClientDto.RedisUser join, byte gameStatus) { @@ -70,6 +83,7 @@ public class SendMsg { data.setRecordId(gameRecord.getId()); data.setGameStatus(gameStatus); out.setUserId(join.getUserId()); + out.setType(WebConstant.Message_Type.ChangeStatus.phase); ChangeStatusMessageDto dtos = new ChangeStatusMessageDto(); switch (gameStatus) { case GameConstant.GAME_COMPLETED: diff --git a/tall/src/main/java/com/ccsens/tall/rabbitMQ/MessageTest.java b/tall/src/main/java/com/ccsens/tall/rabbitMQ/MessageTest.java index 84c9fa2d..86119df1 100644 --- a/tall/src/main/java/com/ccsens/tall/rabbitMQ/MessageTest.java +++ b/tall/src/main/java/com/ccsens/tall/rabbitMQ/MessageTest.java @@ -20,6 +20,6 @@ public class MessageTest { String j = JacksonUtil.beanToJson(inMessage); System.out.println(j); //FixMe 发送到消息队列 - rabbitTemplate.convertAndSend(RabbitMQConfig.TALL_MESSAGE_1,j); + rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME,j); } } diff --git a/tall/src/main/java/com/ccsens/tall/rabbitMQ/RabbitController.java b/tall/src/main/java/com/ccsens/tall/rabbitMQ/RabbitController.java index 8057ec65..89fe84d2 100644 --- a/tall/src/main/java/com/ccsens/tall/rabbitMQ/RabbitController.java +++ b/tall/src/main/java/com/ccsens/tall/rabbitMQ/RabbitController.java @@ -1,29 +1,29 @@ -package com.ccsens.tall.rabbitMQ; - - -import com.ccsens.util.config.RabbitMQConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.amqp.rabbit.annotation.RabbitHandler; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.stereotype.Component; - - -@Component -@RabbitListener(queues = RabbitMQConfig.TALL_MESSAGE_2) -public class RabbitController { - private Logger logger = LoggerFactory.getLogger(RabbitController.class); - - @RabbitHandler - public void process(String messageJson) { - logger.info("Rabbit Received: {}",messageJson); - try { - System.out.println("Rabbit Received: " + messageJson); - - }catch (Exception e){ - e.printStackTrace(); - } - - } - -} +//package com.ccsens.tall.rabbitMQ; +// +// +//import com.ccsens.util.config.RabbitMQConfig; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +//import org.springframework.amqp.rabbit.annotation.RabbitHandler; +//import org.springframework.amqp.rabbit.annotation.RabbitListener; +//import org.springframework.stereotype.Component; +// +// +//@Component +//@RabbitListener(queues = RabbitMQConfig.TALL_MESSAGE_2) +//public class RabbitController { +// private Logger logger = LoggerFactory.getLogger(RabbitController.class); +// +// @RabbitHandler +// public void process(String messageJson) { +// logger.info("Rabbit Received: {}",messageJson); +// try { +// System.out.println("Rabbit Received: " + messageJson); +// +// }catch (Exception e){ +// e.printStackTrace(); +// } +// +// } +// +//} diff --git a/tall/src/main/java/com/ccsens/tall/service/MessageService.java b/tall/src/main/java/com/ccsens/tall/service/MessageService.java index 38a5e0e7..d3ee2aec 100644 --- a/tall/src/main/java/com/ccsens/tall/service/MessageService.java +++ b/tall/src/main/java/com/ccsens/tall/service/MessageService.java @@ -76,14 +76,14 @@ public class MessageService implements IMessageService{ public void sendDeliverMessageWithUpload(InMessage inMessage) throws Exception { System.out.println(JacksonUtil.beanToJson(inMessage)); //FixMe 发送到消息队列 - rabbitTemplate.convertAndSend(RabbitMQConfig.TALL_MESSAGE_1,JacksonUtil.beanToJson(inMessage)); + rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME,JacksonUtil.beanToJson(inMessage)); } @Override public void sendDeliverMessageWithChecker(InMessage inMessage) throws Exception { System.out.println(JacksonUtil.beanToJson(inMessage)); //FixMe 发送到消息队列 - rabbitTemplate.convertAndSend(RabbitMQConfig.TALL_MESSAGE_1, + rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, JacksonUtil.beanToJson(inMessage)); } @@ -91,7 +91,7 @@ public class MessageService implements IMessageService{ public void sendDeliverMessageWithDelete(InMessage inMessage) throws Exception { System.out.println(JacksonUtil.beanToJson(inMessage)); //FixMe 发送到消息队列 - rabbitTemplate.convertAndSend(RabbitMQConfig.TALL_MESSAGE_1 , + rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME , JacksonUtil.beanToJson(inMessage)); } diff --git a/tall/src/main/java/com/ccsens/tall/service/RingService.java b/tall/src/main/java/com/ccsens/tall/service/RingService.java index 34b4926b..c337d743 100644 --- a/tall/src/main/java/com/ccsens/tall/service/RingService.java +++ b/tall/src/main/java/com/ccsens/tall/service/RingService.java @@ -4,6 +4,7 @@ import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.lang.Snowflake; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; +import com.alibaba.fastjson.JSONObject; import com.ccsens.tall.bean.dto.RingDto; import com.ccsens.tall.bean.dto.message.BaseMessageDto; import com.ccsens.tall.bean.dto.message.RingMessageWithReadDto; @@ -19,6 +20,7 @@ import com.ccsens.tall.persist.dao.SysUserDao; import com.ccsens.util.CodeEnum; import com.ccsens.util.JacksonUtil; import com.ccsens.util.PropUtil; +import com.ccsens.util.bean.message.common.InMessage; import com.ccsens.util.config.RabbitMQConfig; import com.ccsens.util.exception.BaseException; import com.fasterxml.jackson.core.JsonProcessingException; @@ -78,7 +80,7 @@ public class RingService implements IRingService { } sysRingMsgDao.insertSelective(ringMsg); //所有接收者的userId - Set userIdSet = new HashSet<>(); + Set userIdSet = new HashSet<>(); //添加消息详情与接收角色的关联信息 if (CollectionUtil.isNotEmpty(ringSendDto.getRoleList())) { for (Long roleId : ringSendDto.getRoleList()) { @@ -88,16 +90,31 @@ public class RingService implements IRingService { sysRingSend.setRoleId(roleId); sysRingSendDao.insertSelective(sysRingSend); List userIdList = userService.selectUserIdByRoleId(roleId); - userIdSet.addAll(userIdList); + userIdList.forEach(id -> { + userIdSet.add(String.valueOf(id)); + }); +// userIdSet.addAll(userIdList); } } - List userIdList = new ArrayList<>(userIdSet); - //发送消息 - RingMessageWithSendDto ringMessageWithSendDto = new RingMessageWithSendDto( - ringMsg.getId(), ringSendDto.getProjectId(), ringMsg.getValue(), time); - ringMessageWithSendDto.setReceivers(BaseMessageDto.MessageUser.userIdToUsers(userIdList)); - rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME, - JacksonUtil.beanToJson(ringMessageWithSendDto)); + + if (CollectionUtil.isNotEmpty(userIdSet)) { + //发送消息 + RingMessageWithSendDto ringMessageWithSendDto = new RingMessageWithSendDto( + ringMsg.getId(), ringSendDto.getProjectId(), ringMsg.getValue(), time); + InMessage inMessage = new InMessage(); + inMessage.setTos(userIdSet); + inMessage.setData(JSONObject.toJSONString(ringMessageWithSendDto)); + rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, JSONObject.toJSONString(inMessage)); + + } + +// List userIdList = new ArrayList<>(userIdSet); +// //发送消息 +// RingMessageWithSendDto ringMessageWithSendDto = new RingMessageWithSendDto( +// ringMsg.getId(), ringSendDto.getProjectId(), ringMsg.getValue(), time); +// ringMessageWithSendDto.setReceivers(BaseMessageDto.MessageUser.userIdToUsers(userIdList)); +// rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME, +// JacksonUtil.beanToJson(ringMessageWithSendDto)); //返回消息的详细信息 RingVo.RingInfo ringInfo = new RingVo.RingInfo(); ringInfo.setMessageId(ringMsg.getId()); @@ -206,12 +223,20 @@ public class RingService implements IRingService { sysRingSendDao.updateByPrimaryKeySelective(sysRingSend); } //将已读消息返回给发送者 - List userIdList = new ArrayList<>(); - userIdList.add(sysRingMsg.getSenderId()); +// List userIdList = new ArrayList<>(); +// userIdList.add(sysRingMsg.getSenderId()); +// RingMessageWithReadDto ringMessageWithReadDto = new RingMessageWithReadDto(msgId, message.getProjectId(), roleId); +// ringMessageWithReadDto.setReceivers(BaseMessageDto.MessageUser.userIdToUsers(userIdList)); +// rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME, +// JacksonUtil.beanToJson(ringMessageWithReadDto)); + RingMessageWithReadDto ringMessageWithReadDto = new RingMessageWithReadDto(msgId, message.getProjectId(), roleId); - ringMessageWithReadDto.setReceivers(BaseMessageDto.MessageUser.userIdToUsers(userIdList)); - rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME, - JacksonUtil.beanToJson(ringMessageWithReadDto)); + Set userIdSet = new HashSet<>(); + userIdSet.add(String.valueOf(sysRingMsg.getSenderId())); + InMessage inMessage = new InMessage(); + inMessage.setTos(userIdSet); + inMessage.setData(JSONObject.toJSONString(ringMessageWithReadDto)); + rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, JSONObject.toJSONString(inMessage)); } } //查询被读的信息返回 @@ -261,14 +286,22 @@ public class RingService implements IRingService { //获取消息 SysRingMsg sysRingMsg = sysRingMsgDao.selectByPrimaryKey(sysRingSend.getRingId()); //ws消息接收者的userId - Set userIdSet = new HashSet<>(); - userIdSet.add(sysRingMsg.getSenderId()); - List userIdList = new ArrayList<>(userIdSet); - //将已读消息返回给发送者 +// Set userIdSet = new HashSet<>(); +// userIdSet.add(sysRingMsg.getSenderId()); +// List userIdList = new ArrayList<>(userIdSet); +// //将已读消息返回给发送者 +// RingMessageWithReadDto ringMessageWithReadDto = new RingMessageWithReadDto(sysRingMsg.getId(), projectId, roleId); +// ringMessageWithReadDto.setReceivers(BaseMessageDto.MessageUser.userIdToUsers(userIdList)); +// rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME, +// JacksonUtil.beanToJson(ringMessageWithReadDto)); + + Set userIdSet = new HashSet<>(); + userIdSet.add(String.valueOf(sysRingMsg.getSenderId())); RingMessageWithReadDto ringMessageWithReadDto = new RingMessageWithReadDto(sysRingMsg.getId(), projectId, roleId); - ringMessageWithReadDto.setReceivers(BaseMessageDto.MessageUser.userIdToUsers(userIdList)); - rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME, - JacksonUtil.beanToJson(ringMessageWithReadDto)); + InMessage inMessage = new InMessage(); + inMessage.setTos(userIdSet); + inMessage.setData(JSONObject.toJSONString(ringMessageWithReadDto)); + rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, JSONObject.toJSONString(inMessage)); } } } diff --git a/tall/src/main/java/com/ccsens/tall/service/TaskDeliverService.java b/tall/src/main/java/com/ccsens/tall/service/TaskDeliverService.java index 907e1a86..cbe01ad0 100644 --- a/tall/src/main/java/com/ccsens/tall/service/TaskDeliverService.java +++ b/tall/src/main/java/com/ccsens/tall/service/TaskDeliverService.java @@ -4,6 +4,7 @@ import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.lang.Snowflake; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; +import com.alibaba.fastjson.JSONObject; import com.ccsens.tall.bean.dto.DeliverDto; import com.ccsens.tall.bean.dto.message.BaseMessageDto; import com.ccsens.tall.bean.dto.message.DeliverMessageWithCheckerDto; @@ -254,13 +255,13 @@ public class TaskDeliverService implements ITaskDeliverService { userIdList.addAll(userService.selectUserIdByRoleId(postLogCheckerId)); } } -// Set userIdSet = new HashSet<>(); + Set userIdSet = new HashSet<>(); if (CollectionUtil.isNotEmpty(userIdList)) { HashSet h = new HashSet<>(userIdList); userIdList.clear(); userIdList.addAll(h); for (Long userId : userIdList) { -// userIdSet.add(userId.toString()); + userIdSet.add(userId.toString()); messageUser = new BaseMessageDto.MessageUser(); messageUser.setUserId(userId); messageUserList.add(messageUser); @@ -283,8 +284,12 @@ public class TaskDeliverService implements ITaskDeliverService { uploadMessage.setData(uploadMessageData); uploadMessage.setReceivers(messageUserList); log.info("检查交付物:{}",JacksonUtil.beanToJson(uploadMessage)); - rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME, - JacksonUtil.beanToJson(uploadMessage)); +// rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME, +// JacksonUtil.beanToJson(uploadMessage)); + InMessage inMessage = new InMessage(); + inMessage.setTos(userIdSet); + inMessage.setData(JSONObject.toJSONString(uploadMessage)); + rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, JSONObject.toJSONString(inMessage)); } else { throw new BaseException(CodeEnum.IS_NOT_EXECUTOR); @@ -740,13 +745,13 @@ public class TaskDeliverService implements ITaskDeliverService { ProTaskDetail task = taskDetailDao.selectByPrimaryKey(deliver.getTaskDetailId()); ProRole role = proRoleDao.selectByPrimaryKey(task.getExecutorRole()); //发送消息 -// Set userIdSet = new HashSet<>(); + Set userIdSet = new HashSet<>(); if (CollectionUtil.isNotEmpty(userIdList)) { HashSet h = new HashSet<>(userIdList); userIdList.clear(); userIdList.addAll(h); for (Long userId : userIdList) { -// userIdSet.add(userId.toString()); + userIdSet.add(userId.toString()); messageUser = new BaseMessageDto.MessageUser(); messageUser.setUserId(userId); messageUserList.add(messageUser); @@ -766,8 +771,15 @@ public class TaskDeliverService implements ITaskDeliverService { deleteMessageData.setUserId(currentUserId); deleteMessage.setData(deleteMessageData); deleteMessage.setReceivers(messageUserList); - rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME, - JacksonUtil.beanToJson(deleteMessage)); + + InMessage inMessage = new InMessage(); + inMessage.setTos(userIdSet); + inMessage.setData(JSONObject.toJSONString(deleteMessage)); + rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, JSONObject.toJSONString(inMessage)); + + +// rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME, +// JacksonUtil.beanToJson(deleteMessage)); // MessageRule messageRule = MessageRule.defaultRule(MessageConstant.DomainType.User); // String s = JacksonUtil.beanToJson(deleteMessage); // InMessage inMessage = InMessage.newToUserMessage(currentUserId.toString(),userIdSet,null,messageRule,s); diff --git a/tall/src/main/java/com/ccsens/tall/web/DebugController.java b/tall/src/main/java/com/ccsens/tall/web/DebugController.java index 8df8d438..025fe5df 100644 --- a/tall/src/main/java/com/ccsens/tall/web/DebugController.java +++ b/tall/src/main/java/com/ccsens/tall/web/DebugController.java @@ -81,7 +81,7 @@ public class DebugController { System.out.println(j); //FixMe 发送到消息队列 - rabbitTemplate.convertAndSend(RabbitMQConfig.TALL_MESSAGE_1,j); + rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME,j); return JsonResponse.newInstance().ok(); } diff --git a/util/src/main/java/com/ccsens/util/config/RabbitMQConfig.java b/util/src/main/java/com/ccsens/util/config/RabbitMQConfig.java index 436bb298..f25bde5a 100644 --- a/util/src/main/java/com/ccsens/util/config/RabbitMQConfig.java +++ b/util/src/main/java/com/ccsens/util/config/RabbitMQConfig.java @@ -9,13 +9,13 @@ import java.util.Map; @Configuration public class RabbitMQConfig { - public static final String RabbitMQ_QUEUE_NAME = "Queue_Anyring"; - public static final String TALL_MESSAGE_1 = "tall_message_1"; - public static final String TALL_MESSAGE_2 = "tall_message_2"; - public static final String GAME_STATUS = "game_status"; - public static final String GAME_STATUS_DELAY_SEND_QUEUE = "gameStatusDelaySendQueue"; - public static final String GAME_STATUS_DELAY_SEND_EXCHANGE = "gameStatusDelaySendExchange"; - public static final String GAME_STATUS_DELAY_SEND_ROUTING_KEY = "gameStatusDelaySendRoutingKey"; + private static final String RabbitMQ_QUEUE_NAME = "Queue_Anyring"; + private static final String TALL_MESSAGE_1 = "tall_message_1"; + private static final String TALL_MESSAGE_2 = "tall_message_2"; + private static final String GAME_STATUS = "game_status"; + private static final String GAME_STATUS_DELAY_SEND_QUEUE = "gameStatusDelaySendQueue"; + private static final String GAME_STATUS_DELAY_SEND_EXCHANGE = "gameStatusDelaySendExchange"; + private static final String GAME_STATUS_DELAY_SEND_ROUTING_KEY = "gameStatusDelaySendRoutingKey"; /**消息队列发送*/ @@ -46,34 +46,34 @@ public class RabbitMQConfig { return new Queue(GAME_SCORE); } - /** - * 延时发送队列 - * @return - */ - @Bean - public Queue delayQueue(){ -// return QueueBuilder.durable(GAME_STATUS_DELAY_SEND_QUEUE).build(); - return new Queue(GAME_STATUS_DELAY_SEND_QUEUE,true); - } - - /** - * 延时交换机 - * @return - */ - @Bean - public CustomExchange delayExchange(){ - Map args = new HashMap<>(); - args.put("x-delayed-type", "direct"); - return new CustomExchange(GAME_STATUS_DELAY_SEND_EXCHANGE, "x-delayed-message", true, false, args); - } - - /** - * 绑定 - */ - @Bean - Binding queueBinding(Queue delayQueue, CustomExchange customExchange) { - return BindingBuilder.bind(delayQueue).to(customExchange).with(GAME_STATUS_DELAY_SEND_ROUTING_KEY).noargs(); - } +// /** +// * 延时发送队列 +// * @return +// */ +// @Bean +// public Queue delayQueue(){ +//// return QueueBuilder.durable(GAME_STATUS_DELAY_SEND_QUEUE).build(); +// return new Queue(GAME_STATUS_DELAY_SEND_QUEUE,true); +// } +// +// /** +// * 延时交换机 +// * @return +// */ +// @Bean +// public CustomExchange delayExchange(){ +// Map args = new HashMap<>(); +// args.put("x-delayed-type", "direct"); +// return new CustomExchange(GAME_STATUS_DELAY_SEND_EXCHANGE, "x-delayed-message", true, false, args); +// } +// +// /** +// * 绑定 +// */ +// @Bean +// Binding queueBinding(Queue delayQueue, CustomExchange customExchange) { +// return BindingBuilder.bind(delayQueue).to(customExchange).with(GAME_STATUS_DELAY_SEND_ROUTING_KEY).noargs(); +// } } diff --git a/util/src/main/java/com/ccsens/util/mq/DelayConsumer.java b/util/src/main/java/com/ccsens/util/mq/DelayConsumer.java index 6e1b4e94..1e6e57a7 100644 --- a/util/src/main/java/com/ccsens/util/mq/DelayConsumer.java +++ b/util/src/main/java/com/ccsens/util/mq/DelayConsumer.java @@ -1,33 +1,33 @@ -package com.ccsens.util.mq; - -import com.ccsens.util.config.RabbitMQConfig; -import com.rabbitmq.client.Channel; -import org.springframework.amqp.rabbit.annotation.RabbitHandler; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.amqp.support.AmqpHeaders; -import org.springframework.messaging.handler.annotation.Headers; -import org.springframework.messaging.handler.annotation.Payload; -import org.springframework.stereotype.Component; - -import java.io.IOException; -import java.util.Date; -import java.util.Map; - -/** - * @description: - * @author: wuHuiJuan - * @create: 2019/12/27 15:40 - */ -@Component -public class DelayConsumer { - @RabbitListener(queues = RabbitMQConfig.GAME_STATUS_DELAY_SEND_QUEUE) - @RabbitHandler - public void receive(@Payload Object msg, @Headers Map headers, Channel channel ) throws IOException { - System.out.println("接收到的消息:"+msg +"||" + new Date()); - - //ACK 手工签收,通知rabbitMQ,消费端消费成功 - Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG); - channel.basicAck(deliveryTag,false); - } - -} +//package com.ccsens.util.mq; +// +//import com.ccsens.util.config.RabbitMQConfig; +//import com.rabbitmq.client.Channel; +//import org.springframework.amqp.rabbit.annotation.RabbitHandler; +//import org.springframework.amqp.rabbit.annotation.RabbitListener; +//import org.springframework.amqp.support.AmqpHeaders; +//import org.springframework.messaging.handler.annotation.Headers; +//import org.springframework.messaging.handler.annotation.Payload; +//import org.springframework.stereotype.Component; +// +//import java.io.IOException; +//import java.util.Date; +//import java.util.Map; +// +///** +// * @description: +// * @author: wuHuiJuan +// * @create: 2019/12/27 15:40 +// */ +//@Component +//public class DelayConsumer { +// @RabbitListener(queues = RabbitMQConfig.GAME_STATUS_DELAY_SEND_QUEUE) +// @RabbitHandler +// public void receive(@Payload Object msg, @Headers Map headers, Channel channel ) throws IOException { +// System.out.println("接收到的消息:"+msg +"||" + new Date()); +// +// //ACK 手工签收,通知rabbitMQ,消费端消费成功 +// Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG); +// channel.basicAck(deliveryTag,false); +// } +// +//} diff --git a/util/src/main/java/com/ccsens/util/mq/DelayProducer.java b/util/src/main/java/com/ccsens/util/mq/DelayProducer.java index f4c371a6..58f05d74 100644 --- a/util/src/main/java/com/ccsens/util/mq/DelayProducer.java +++ b/util/src/main/java/com/ccsens/util/mq/DelayProducer.java @@ -1,54 +1,54 @@ -package com.ccsens.util.mq; - -import com.ccsens.util.config.RabbitMQConfig; -import lombok.extern.slf4j.Slf4j; -import org.springframework.amqp.rabbit.connection.CorrelationData; -import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -/** - * @description: - * @author: wuHuiJuan - * @create: 2019/12/27 15:16 - */ -@Slf4j -@Component -public class DelayProducer { - @Autowired - private RabbitTemplate rabbitTemplate; - - //消息发送后的回调函数 - /** - * 生产者回调函数:confirm确认消息投递成功 - */ - final RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> { - - String messageId = correlationData.getId(); - if (ack) { - System.out.println("消息发送成功" + correlationData); - log.info("消息投递成功,{}",messageId); - //进行消息记录的数据库更新 - - }else{ - log.info("消息投递失败"); - } - - }; - - /** - * 通过延迟消息插件发动延迟消息 - * @param msg - * @param expiration - */ - public void sendDelayMessage(Object msg, Long expiration){ - - //绑定异步监听回调函数 - rabbitTemplate.setConfirmCallback(confirmCallback); - - rabbitTemplate.convertAndSend(RabbitMQConfig.GAME_STATUS_DELAY_SEND_EXCHANGE,RabbitMQConfig.GAME_STATUS_DELAY_SEND_ROUTING_KEY, msg,(message)->{ - message.getMessageProperties().setHeader("x-delay", expiration);//设置延迟时间 - return message; - }, new CorrelationData("123")); - } -} +//package com.ccsens.util.mq; +// +//import com.ccsens.util.config.RabbitMQConfig; +//import lombok.extern.slf4j.Slf4j; +//import org.springframework.amqp.rabbit.connection.CorrelationData; +//import org.springframework.amqp.rabbit.core.RabbitTemplate; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.stereotype.Component; +// +///** +// * @description: +// * @author: wuHuiJuan +// * @create: 2019/12/27 15:16 +// */ +//@Slf4j +//@Component +//public class DelayProducer { +// @Autowired +// private RabbitTemplate rabbitTemplate; +// +// //消息发送后的回调函数 +// /** +// * 生产者回调函数:confirm确认消息投递成功 +// */ +// final RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> { +// +// String messageId = correlationData.getId(); +// if (ack) { +// System.out.println("消息发送成功" + correlationData); +// log.info("消息投递成功,{}",messageId); +// //进行消息记录的数据库更新 +// +// }else{ +// log.info("消息投递失败"); +// } +// +// }; +// +// /** +// * 通过延迟消息插件发动延迟消息 +// * @param msg +// * @param expiration +// */ +// public void sendDelayMessage(Object msg, Long expiration){ +// +// //绑定异步监听回调函数 +// rabbitTemplate.setConfirmCallback(confirmCallback); +// +// rabbitTemplate.convertAndSend(RabbitMQConfig.GAME_STATUS_DELAY_SEND_EXCHANGE,RabbitMQConfig.GAME_STATUS_DELAY_SEND_ROUTING_KEY, msg,(message)->{ +// message.getMessageProperties().setHeader("x-delay", expiration);//设置延迟时间 +// return message; +// }, new CorrelationData("123")); +// } +//}