Browse Source

消息系统

master
zhizhi wu 6 years ago
parent
commit
321fa0089e
  1. 215
      game/src/main/java/com/ccsens/game/mq/GameScoreListener.java
  2. 36
      game/src/main/java/com/ccsens/game/service/RabbitMQListener.java
  3. 24
      game/src/main/java/com/ccsens/game/util/SendMsg.java
  4. 2
      tall/src/main/java/com/ccsens/tall/rabbitMQ/MessageTest.java
  5. 58
      tall/src/main/java/com/ccsens/tall/rabbitMQ/RabbitController.java
  6. 6
      tall/src/main/java/com/ccsens/tall/service/MessageService.java
  7. 75
      tall/src/main/java/com/ccsens/tall/service/RingService.java
  8. 28
      tall/src/main/java/com/ccsens/tall/service/TaskDeliverService.java
  9. 2
      tall/src/main/java/com/ccsens/tall/web/DebugController.java
  10. 70
      util/src/main/java/com/ccsens/util/config/RabbitMQConfig.java
  11. 66
      util/src/main/java/com/ccsens/util/mq/DelayConsumer.java
  12. 108
      util/src/main/java/com/ccsens/util/mq/DelayProducer.java

215
game/src/main/java/com/ccsens/game/mq/GameScoreListener.java

@ -1,109 +1,106 @@
//package com.ccsens.game.mq; package com.ccsens.game.mq;
//
//import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.CollectionUtil;
//import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
//import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
//import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
//import com.ccsens.game.bean.dto.ClientDto; import com.ccsens.game.bean.dto.ClientDto;
//import com.ccsens.game.bean.dto.message.GameMessageCountOut; import com.ccsens.game.bean.dto.message.GameMessageCountOut;
//import com.ccsens.game.bean.dto.message.GameMessageWithChangeStatusOut; import com.ccsens.game.util.GameConstant;
//import com.ccsens.game.netty.ChannelManager; import com.ccsens.util.RedisUtil;
//import com.ccsens.game.util.GameConstant; import com.ccsens.util.bean.message.common.InMessage;
//import com.ccsens.util.JacksonUtil; import com.ccsens.util.bean.message.common.OutMessage;
//import com.ccsens.util.RedisUtil; import com.ccsens.util.bean.message.common.OutMessageSet;
//import com.ccsens.util.bean.message.common.InMessage; import com.ccsens.util.config.RabbitMQConfig;
//import com.ccsens.util.bean.message.common.OutMessage; import lombok.extern.slf4j.Slf4j;
//import com.ccsens.util.bean.message.common.OutMessageSet; import org.springframework.amqp.core.AmqpTemplate;
//import com.ccsens.util.config.RabbitMQConfig; import org.springframework.amqp.rabbit.annotation.RabbitHandler;
//import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener;
//import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.data.redis.core.ZSetOperations;
//import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.data.redis.core.ZSetOperations; import java.io.IOException;
//import org.springframework.stereotype.Component; import java.util.ArrayList;
// import java.util.HashSet;
//import java.io.IOException; import java.util.List;
//import java.util.ArrayList; import java.util.Set;
//import java.util.HashSet;
//import java.util.List; /**
//import java.util.Set; * @description:
// * @author: whj
///** * @time: 2020/6/5 14:30
// * @description: */
// * @author: whj @Slf4j
// * @time: 2020/6/5 14:30 @Component
// */ @RabbitListener(queues = RabbitMQConfig.GAME_SCORE)
//@Slf4j public class GameScoreListener {
//@Component
//@RabbitListener(queues = RabbitMQConfig.GAME_SCORE) @Autowired
//public class GameScoreListener { private RedisUtil redisUtil;
// @Autowired
// @Autowired private AmqpTemplate rabbitTemplate;
// private RedisUtil redisUtil;
// @Autowired @RabbitHandler
// private AmqpTemplate rabbitTemplate; public void process(String messageJson) throws IOException {
// log.info("计算分数 {}",messageJson);
// @RabbitHandler OutMessageSet outMessageSet = JSONObject.parseObject(messageJson, OutMessageSet.class);
// public void process(String messageJson) throws IOException { Set<OutMessage> messageSet = outMessageSet.getMessageSet();
// log.info("计算分数 {}",messageJson); if (CollectionUtil.isEmpty(messageSet)) {
// OutMessageSet outMessageSet = JSONObject.parseObject(messageJson, OutMessageSet.class); return;
// Set<OutMessage> messageSet = outMessageSet.getMessageSet(); }
// if (CollectionUtil.isEmpty(messageSet)) { messageSet.forEach(outMessage -> {
// return;
// } InMessage.To to = JSONObject.parseObject(outMessage.getFrom(), InMessage.To.class);
// messageSet.forEach(outMessage -> { Long userId = to.getId();
// GameDto.GameMsg gameMsg = JSONObject.parseObject(outMessage.getData(), GameDto.GameMsg.class);
// InMessage.To to = JSONObject.parseObject(outMessage.getFrom(), InMessage.To.class); Long recordId = gameMsg.getRecordId();
// Long userId = to.getId(); log.info("统计分数:{},{}",userId, recordId);
// GameDto.GameMsg gameMsg = JSONObject.parseObject(outMessage.getAuthMessage(), GameDto.GameMsg.class); GameMessageCountOut gameMessageCountOut = clientAddTimes(userId,recordId);
// Long recordId = gameMsg.getRecordId();
// log.info("统计分数:{},{}",userId, recordId); // 发送给消息系统
// GameMessageCountOut gameMessageCountOut = clientAddTimes(userId,recordId); InMessage inMessage = new InMessage();
// Set<String> tos = new HashSet<>();
// // 发送给消息系统 tos.add(JSONObject.toJSONString(new InMessage.To(userId)));
// InMessage inMessage = new InMessage(); inMessage.setTos(tos);
// Set<String> tos = new HashSet<>(); inMessage.setData(JSONObject.toJSONString(gameMessageCountOut));
// tos.add(JSONObject.toJSONString(new InMessage.To(userId))); rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, inMessage);
// inMessage.setTos(tos); });
// inMessage.setData(JSONObject.toJSONString(gameMessageCountOut));
// rabbitTemplate.convertAndSend(RabbitMQConfig.GAME_SCORE, inMessage); }
// });
// public GameMessageCountOut clientAddTimes(Long userId, Long recordId) {
// } GameMessageCountOut gameMessageCountOut = new GameMessageCountOut();
// log.info("userId:{}", userId);
// public GameMessageCountOut clientAddTimes(Long userId, Long recordId) { if (userId == null || recordId == null) {
// GameMessageCountOut gameMessageCountOut = new GameMessageCountOut(); return gameMessageCountOut;
// log.info("userId:{}", userId); }
// if (userId == null || recordId == null) {
// return gameMessageCountOut; String gameUserKey = GameConstant.generateGameKey(recordId);
// } Set<ZSetOperations.TypedTuple<Object>> typedTuples = redisUtil.zsRevGetWithScore(gameUserKey, 0, -1);
//
// String gameUserKey = GameConstant.generateGameKey(recordId); if (CollectionUtil.isEmpty(typedTuples)) {
// Set<ZSetOperations.TypedTuple<Object>> typedTuples = redisUtil.zsRevGetWithScore(gameUserKey, 0, -1); return gameMessageCountOut;
// }
// if (CollectionUtil.isEmpty(typedTuples)) { for(ZSetOperations.TypedTuple<Object> type : typedTuples) {
// return gameMessageCountOut; ClientDto.RedisUser user = JSON.parseObject((String)type.getValue(), ClientDto.RedisUser.class);
// } if (user.getUserId().longValue() == userId.longValue()) {
// for(ZSetOperations.TypedTuple<Object> type : typedTuples) { int score = type.getScore().intValue();
// ClientDto.RedisUser user = JSON.parseObject((String)type.getValue(), ClientDto.RedisUser.class); String userStatus = GameConstant.generateGameStatusKey(recordId);
// if (user.getUserId().longValue() == userId.longValue()) { String gameStausObj = (String)redisUtil.get(userStatus);
// int score = type.getScore().intValue(); if (StrUtil.isBlank(gameStausObj) || gameStausObj.equals(String.valueOf(GameConstant.GAME_COMPLETED))){
// String userStatus = GameConstant.generateGameStatusKey(recordId); gameMessageCountOut = new GameMessageCountOut(score/100, score);
// String gameStausObj = (String)redisUtil.get(userStatus); return gameMessageCountOut;
// if (StrUtil.isBlank(gameStausObj) || gameStausObj.equals(String.valueOf(GameConstant.GAME_COMPLETED))){ }
// gameMessageCountOut = new GameMessageCountOut(score/100, score); score += 100;
// return gameMessageCountOut; redisUtil.zsSet(gameUserKey, JSON.toJSONString(user), score);
// } gameMessageCountOut.getData().setTotalScore(score);
// score += 100; gameMessageCountOut.getData().setTotalTimes(score/100);
// redisUtil.zsSet(gameUserKey, JSON.toJSONString(user), score); return gameMessageCountOut;
// gameMessageCountOut.getData().setTotalScore(score); }
// gameMessageCountOut.getData().setTotalTimes(score/100); };
// return gameMessageCountOut;
// } return gameMessageCountOut;
// }; }
//
// return gameMessageCountOut; }
// }
//
//}

36
game/src/main/java/com/ccsens/game/service/RabbitMQListener.java

@ -18,21 +18,21 @@ import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.TreeMap; import java.util.TreeMap;
@Slf4j //@Slf4j
@Component //@Component
@RabbitListener(queues = RabbitMQConfig.GAME_STATUS) //@RabbitListener(queues = RabbitMQConfig.GAME_STATUS)
public class RabbitMQListener { //public class RabbitMQListener {
private Logger logger = LoggerFactory.getLogger(RabbitMQListener.class); // private Logger logger = LoggerFactory.getLogger(RabbitMQListener.class);
@Autowired // @Autowired
private IMessageService messageService; // private IMessageService messageService;
//
@RabbitHandler // @RabbitHandler
public void process(String messageJson) throws IOException { // public void process(String messageJson) throws IOException {
System.out.println("++++++++++++++"+messageJson); // System.out.println("++++++++++++++"+messageJson);
// List<GameMessageWithChangeStatusOut> gameMessageList = JacksonUtil.jsonToBean(messageJson, //// List<GameMessageWithChangeStatusOut> gameMessageList = JacksonUtil.jsonToBean(messageJson,
// GameMessageWithChangeStatusOut.class, true); //// GameMessageWithChangeStatusOut.class, true);
log.info("认证MQ {}",messageJson); // log.info("认证MQ {}",messageJson);
messageService.doChangeStatusMessage(JacksonUtil.jsonToBean(messageJson, // messageService.doChangeStatusMessage(JacksonUtil.jsonToBean(messageJson,
GameMessageWithChangeStatusOut.class, true)); // GameMessageWithChangeStatusOut.class, true));
} // }
} //}

24
game/src/main/java/com/ccsens/game/util/SendMsg.java

@ -3,6 +3,7 @@ package com.ccsens.game.util;
import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ccsens.game.bean.dto.ClientDto; import com.ccsens.game.bean.dto.ClientDto;
import com.ccsens.game.bean.dto.message.ChangeStatusMessageDto; import com.ccsens.game.bean.dto.message.ChangeStatusMessageDto;
import com.ccsens.game.bean.dto.message.GameMessageWithChangeStatusOut; import com.ccsens.game.bean.dto.message.GameMessageWithChangeStatusOut;
@ -15,6 +16,8 @@ import com.ccsens.game.persist.dao.GameUserJoinDao;
import com.ccsens.game.service.ClientService; import com.ccsens.game.service.ClientService;
import com.ccsens.util.JacksonUtil; import com.ccsens.util.JacksonUtil;
import com.ccsens.util.RedisUtil; import com.ccsens.util.RedisUtil;
import com.ccsens.util.WebConstant;
import com.ccsens.util.bean.message.common.InMessage;
import com.ccsens.util.config.RabbitMQConfig; import com.ccsens.util.config.RabbitMQConfig;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import io.swagger.models.auth.In; import io.swagger.models.auth.In;
@ -53,14 +56,24 @@ public class SendMsg {
if (CollectionUtil.isEmpty(userJoins)) { if (CollectionUtil.isEmpty(userJoins)) {
return; return;
} }
Set<String> userIds = new HashSet<>();
userJoins.forEach(join -> { userJoins.forEach(join -> {
outs.add(getMsg(userJoins, gameRecord, join, status)); // outs.add(getMsg(userJoins, gameRecord, join, status));
InMessage inMessage = new InMessage();
userIds.add(String.valueOf(join.getUserId()));
inMessage.setTos(userIds);
inMessage.setData(JSONObject.toJSONString(getMsg(userJoins, gameRecord, join, status)));
rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, JSONObject.toJSONString(inMessage));
}); });
if (CollectionUtil.isNotEmpty(outs)) {
rabbitTemplate.convertAndSend(RabbitMQConfig.GAME_STATUS, JacksonUtil.beanToJson(outs));
log.info("发送成功:{}", JacksonUtil.beanToJson(outs));
} // if (CollectionUtil.isNotEmpty(outs)) {
// rabbitTemplate.convertAndSend(RabbitMQConfig.GAME_STATUS, JacksonUtil.beanToJson(outs));
// log.info("发送成功:{}", JacksonUtil.beanToJson(outs));
//
// }
} }
public GameMessageWithChangeStatusOut getMsg(List<ClientDto.RedisUser> userJoins, GameRecord gameRecord, ClientDto.RedisUser join, byte gameStatus) { public GameMessageWithChangeStatusOut getMsg(List<ClientDto.RedisUser> userJoins, GameRecord gameRecord, ClientDto.RedisUser join, byte gameStatus) {
@ -70,6 +83,7 @@ public class SendMsg {
data.setRecordId(gameRecord.getId()); data.setRecordId(gameRecord.getId());
data.setGameStatus(gameStatus); data.setGameStatus(gameStatus);
out.setUserId(join.getUserId()); out.setUserId(join.getUserId());
out.setType(WebConstant.Message_Type.ChangeStatus.phase);
ChangeStatusMessageDto dtos = new ChangeStatusMessageDto(); ChangeStatusMessageDto dtos = new ChangeStatusMessageDto();
switch (gameStatus) { switch (gameStatus) {
case GameConstant.GAME_COMPLETED: case GameConstant.GAME_COMPLETED:

2
tall/src/main/java/com/ccsens/tall/rabbitMQ/MessageTest.java

@ -20,6 +20,6 @@ public class MessageTest {
String j = JacksonUtil.beanToJson(inMessage); String j = JacksonUtil.beanToJson(inMessage);
System.out.println(j); System.out.println(j);
//FixMe 发送到消息队列 //FixMe 发送到消息队列
rabbitTemplate.convertAndSend(RabbitMQConfig.TALL_MESSAGE_1,j); rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME,j);
} }
} }

58
tall/src/main/java/com/ccsens/tall/rabbitMQ/RabbitController.java

@ -1,29 +1,29 @@
package com.ccsens.tall.rabbitMQ; //package com.ccsens.tall.rabbitMQ;
//
//
import com.ccsens.util.config.RabbitMQConfig; //import com.ccsens.util.config.RabbitMQConfig;
import org.slf4j.Logger; //import org.slf4j.Logger;
import org.slf4j.LoggerFactory; //import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler; //import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener; //import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
//
//
@Component //@Component
@RabbitListener(queues = RabbitMQConfig.TALL_MESSAGE_2) //@RabbitListener(queues = RabbitMQConfig.TALL_MESSAGE_2)
public class RabbitController { //public class RabbitController {
private Logger logger = LoggerFactory.getLogger(RabbitController.class); // private Logger logger = LoggerFactory.getLogger(RabbitController.class);
//
@RabbitHandler // @RabbitHandler
public void process(String messageJson) { // public void process(String messageJson) {
logger.info("Rabbit Received: {}",messageJson); // logger.info("Rabbit Received: {}",messageJson);
try { // try {
System.out.println("Rabbit Received: " + messageJson); // System.out.println("Rabbit Received: " + messageJson);
//
}catch (Exception e){ // }catch (Exception e){
e.printStackTrace(); // e.printStackTrace();
} // }
//
} // }
//
} //}

6
tall/src/main/java/com/ccsens/tall/service/MessageService.java

@ -76,14 +76,14 @@ public class MessageService implements IMessageService{
public void sendDeliverMessageWithUpload(InMessage inMessage) throws Exception { public void sendDeliverMessageWithUpload(InMessage inMessage) throws Exception {
System.out.println(JacksonUtil.beanToJson(inMessage)); System.out.println(JacksonUtil.beanToJson(inMessage));
//FixMe 发送到消息队列 //FixMe 发送到消息队列
rabbitTemplate.convertAndSend(RabbitMQConfig.TALL_MESSAGE_1,JacksonUtil.beanToJson(inMessage)); rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME,JacksonUtil.beanToJson(inMessage));
} }
@Override @Override
public void sendDeliverMessageWithChecker(InMessage inMessage) throws Exception { public void sendDeliverMessageWithChecker(InMessage inMessage) throws Exception {
System.out.println(JacksonUtil.beanToJson(inMessage)); System.out.println(JacksonUtil.beanToJson(inMessage));
//FixMe 发送到消息队列 //FixMe 发送到消息队列
rabbitTemplate.convertAndSend(RabbitMQConfig.TALL_MESSAGE_1, rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME,
JacksonUtil.beanToJson(inMessage)); JacksonUtil.beanToJson(inMessage));
} }
@ -91,7 +91,7 @@ public class MessageService implements IMessageService{
public void sendDeliverMessageWithDelete(InMessage inMessage) throws Exception { public void sendDeliverMessageWithDelete(InMessage inMessage) throws Exception {
System.out.println(JacksonUtil.beanToJson(inMessage)); System.out.println(JacksonUtil.beanToJson(inMessage));
//FixMe 发送到消息队列 //FixMe 发送到消息队列
rabbitTemplate.convertAndSend(RabbitMQConfig.TALL_MESSAGE_1 , rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME ,
JacksonUtil.beanToJson(inMessage)); JacksonUtil.beanToJson(inMessage));
} }

75
tall/src/main/java/com/ccsens/tall/service/RingService.java

@ -4,6 +4,7 @@ import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.lang.Snowflake; import cn.hutool.core.lang.Snowflake;
import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.ccsens.tall.bean.dto.RingDto; import com.ccsens.tall.bean.dto.RingDto;
import com.ccsens.tall.bean.dto.message.BaseMessageDto; import com.ccsens.tall.bean.dto.message.BaseMessageDto;
import com.ccsens.tall.bean.dto.message.RingMessageWithReadDto; import com.ccsens.tall.bean.dto.message.RingMessageWithReadDto;
@ -19,6 +20,7 @@ import com.ccsens.tall.persist.dao.SysUserDao;
import com.ccsens.util.CodeEnum; import com.ccsens.util.CodeEnum;
import com.ccsens.util.JacksonUtil; import com.ccsens.util.JacksonUtil;
import com.ccsens.util.PropUtil; import com.ccsens.util.PropUtil;
import com.ccsens.util.bean.message.common.InMessage;
import com.ccsens.util.config.RabbitMQConfig; import com.ccsens.util.config.RabbitMQConfig;
import com.ccsens.util.exception.BaseException; import com.ccsens.util.exception.BaseException;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
@ -78,7 +80,7 @@ public class RingService implements IRingService {
} }
sysRingMsgDao.insertSelective(ringMsg); sysRingMsgDao.insertSelective(ringMsg);
//所有接收者的userId //所有接收者的userId
Set<Long> userIdSet = new HashSet<>(); Set<String> userIdSet = new HashSet<>();
//添加消息详情与接收角色的关联信息 //添加消息详情与接收角色的关联信息
if (CollectionUtil.isNotEmpty(ringSendDto.getRoleList())) { if (CollectionUtil.isNotEmpty(ringSendDto.getRoleList())) {
for (Long roleId : ringSendDto.getRoleList()) { for (Long roleId : ringSendDto.getRoleList()) {
@ -88,16 +90,31 @@ public class RingService implements IRingService {
sysRingSend.setRoleId(roleId); sysRingSend.setRoleId(roleId);
sysRingSendDao.insertSelective(sysRingSend); sysRingSendDao.insertSelective(sysRingSend);
List<Long> userIdList = userService.selectUserIdByRoleId(roleId); List<Long> userIdList = userService.selectUserIdByRoleId(roleId);
userIdSet.addAll(userIdList); userIdList.forEach(id -> {
userIdSet.add(String.valueOf(id));
});
// userIdSet.addAll(userIdList);
} }
} }
List<Long> userIdList = new ArrayList<>(userIdSet);
//发送消息 if (CollectionUtil.isNotEmpty(userIdSet)) {
RingMessageWithSendDto ringMessageWithSendDto = new RingMessageWithSendDto( //发送消息
ringMsg.getId(), ringSendDto.getProjectId(), ringMsg.getValue(), time); RingMessageWithSendDto ringMessageWithSendDto = new RingMessageWithSendDto(
ringMessageWithSendDto.setReceivers(BaseMessageDto.MessageUser.userIdToUsers(userIdList)); ringMsg.getId(), ringSendDto.getProjectId(), ringMsg.getValue(), time);
rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME, InMessage inMessage = new InMessage();
JacksonUtil.beanToJson(ringMessageWithSendDto)); inMessage.setTos(userIdSet);
inMessage.setData(JSONObject.toJSONString(ringMessageWithSendDto));
rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, JSONObject.toJSONString(inMessage));
}
// List<Long> userIdList = new ArrayList<>(userIdSet);
// //发送消息
// RingMessageWithSendDto ringMessageWithSendDto = new RingMessageWithSendDto(
// ringMsg.getId(), ringSendDto.getProjectId(), ringMsg.getValue(), time);
// ringMessageWithSendDto.setReceivers(BaseMessageDto.MessageUser.userIdToUsers(userIdList));
// rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME,
// JacksonUtil.beanToJson(ringMessageWithSendDto));
//返回消息的详细信息 //返回消息的详细信息
RingVo.RingInfo ringInfo = new RingVo.RingInfo(); RingVo.RingInfo ringInfo = new RingVo.RingInfo();
ringInfo.setMessageId(ringMsg.getId()); ringInfo.setMessageId(ringMsg.getId());
@ -206,12 +223,20 @@ public class RingService implements IRingService {
sysRingSendDao.updateByPrimaryKeySelective(sysRingSend); sysRingSendDao.updateByPrimaryKeySelective(sysRingSend);
} }
//将已读消息返回给发送者 //将已读消息返回给发送者
List<Long> userIdList = new ArrayList<>(); // List<Long> userIdList = new ArrayList<>();
userIdList.add(sysRingMsg.getSenderId()); // userIdList.add(sysRingMsg.getSenderId());
// RingMessageWithReadDto ringMessageWithReadDto = new RingMessageWithReadDto(msgId, message.getProjectId(), roleId);
// ringMessageWithReadDto.setReceivers(BaseMessageDto.MessageUser.userIdToUsers(userIdList));
// rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME,
// JacksonUtil.beanToJson(ringMessageWithReadDto));
RingMessageWithReadDto ringMessageWithReadDto = new RingMessageWithReadDto(msgId, message.getProjectId(), roleId); RingMessageWithReadDto ringMessageWithReadDto = new RingMessageWithReadDto(msgId, message.getProjectId(), roleId);
ringMessageWithReadDto.setReceivers(BaseMessageDto.MessageUser.userIdToUsers(userIdList)); Set<String> userIdSet = new HashSet<>();
rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME, userIdSet.add(String.valueOf(sysRingMsg.getSenderId()));
JacksonUtil.beanToJson(ringMessageWithReadDto)); InMessage inMessage = new InMessage();
inMessage.setTos(userIdSet);
inMessage.setData(JSONObject.toJSONString(ringMessageWithReadDto));
rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, JSONObject.toJSONString(inMessage));
} }
} }
//查询被读的信息返回 //查询被读的信息返回
@ -261,14 +286,22 @@ public class RingService implements IRingService {
//获取消息 //获取消息
SysRingMsg sysRingMsg = sysRingMsgDao.selectByPrimaryKey(sysRingSend.getRingId()); SysRingMsg sysRingMsg = sysRingMsgDao.selectByPrimaryKey(sysRingSend.getRingId());
//ws消息接收者的userId //ws消息接收者的userId
Set<Long> userIdSet = new HashSet<>(); // Set<Long> userIdSet = new HashSet<>();
userIdSet.add(sysRingMsg.getSenderId()); // userIdSet.add(sysRingMsg.getSenderId());
List<Long> userIdList = new ArrayList<>(userIdSet); // List<Long> userIdList = new ArrayList<>(userIdSet);
//将已读消息返回给发送者 // //将已读消息返回给发送者
// RingMessageWithReadDto ringMessageWithReadDto = new RingMessageWithReadDto(sysRingMsg.getId(), projectId, roleId);
// ringMessageWithReadDto.setReceivers(BaseMessageDto.MessageUser.userIdToUsers(userIdList));
// rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME,
// JacksonUtil.beanToJson(ringMessageWithReadDto));
Set<String> userIdSet = new HashSet<>();
userIdSet.add(String.valueOf(sysRingMsg.getSenderId()));
RingMessageWithReadDto ringMessageWithReadDto = new RingMessageWithReadDto(sysRingMsg.getId(), projectId, roleId); RingMessageWithReadDto ringMessageWithReadDto = new RingMessageWithReadDto(sysRingMsg.getId(), projectId, roleId);
ringMessageWithReadDto.setReceivers(BaseMessageDto.MessageUser.userIdToUsers(userIdList)); InMessage inMessage = new InMessage();
rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME, inMessage.setTos(userIdSet);
JacksonUtil.beanToJson(ringMessageWithReadDto)); inMessage.setData(JSONObject.toJSONString(ringMessageWithReadDto));
rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, JSONObject.toJSONString(inMessage));
} }
} }
} }

28
tall/src/main/java/com/ccsens/tall/service/TaskDeliverService.java

@ -4,6 +4,7 @@ import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.lang.Snowflake; import cn.hutool.core.lang.Snowflake;
import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.ccsens.tall.bean.dto.DeliverDto; import com.ccsens.tall.bean.dto.DeliverDto;
import com.ccsens.tall.bean.dto.message.BaseMessageDto; import com.ccsens.tall.bean.dto.message.BaseMessageDto;
import com.ccsens.tall.bean.dto.message.DeliverMessageWithCheckerDto; import com.ccsens.tall.bean.dto.message.DeliverMessageWithCheckerDto;
@ -254,13 +255,13 @@ public class TaskDeliverService implements ITaskDeliverService {
userIdList.addAll(userService.selectUserIdByRoleId(postLogCheckerId)); userIdList.addAll(userService.selectUserIdByRoleId(postLogCheckerId));
} }
} }
// Set<String> userIdSet = new HashSet<>(); Set<String> userIdSet = new HashSet<>();
if (CollectionUtil.isNotEmpty(userIdList)) { if (CollectionUtil.isNotEmpty(userIdList)) {
HashSet<Long> h = new HashSet<>(userIdList); HashSet<Long> h = new HashSet<>(userIdList);
userIdList.clear(); userIdList.clear();
userIdList.addAll(h); userIdList.addAll(h);
for (Long userId : userIdList) { for (Long userId : userIdList) {
// userIdSet.add(userId.toString()); userIdSet.add(userId.toString());
messageUser = new BaseMessageDto.MessageUser(); messageUser = new BaseMessageDto.MessageUser();
messageUser.setUserId(userId); messageUser.setUserId(userId);
messageUserList.add(messageUser); messageUserList.add(messageUser);
@ -283,8 +284,12 @@ public class TaskDeliverService implements ITaskDeliverService {
uploadMessage.setData(uploadMessageData); uploadMessage.setData(uploadMessageData);
uploadMessage.setReceivers(messageUserList); uploadMessage.setReceivers(messageUserList);
log.info("检查交付物:{}",JacksonUtil.beanToJson(uploadMessage)); log.info("检查交付物:{}",JacksonUtil.beanToJson(uploadMessage));
rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME, // rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME,
JacksonUtil.beanToJson(uploadMessage)); // JacksonUtil.beanToJson(uploadMessage));
InMessage inMessage = new InMessage();
inMessage.setTos(userIdSet);
inMessage.setData(JSONObject.toJSONString(uploadMessage));
rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, JSONObject.toJSONString(inMessage));
} else { } else {
throw new BaseException(CodeEnum.IS_NOT_EXECUTOR); throw new BaseException(CodeEnum.IS_NOT_EXECUTOR);
@ -740,13 +745,13 @@ public class TaskDeliverService implements ITaskDeliverService {
ProTaskDetail task = taskDetailDao.selectByPrimaryKey(deliver.getTaskDetailId()); ProTaskDetail task = taskDetailDao.selectByPrimaryKey(deliver.getTaskDetailId());
ProRole role = proRoleDao.selectByPrimaryKey(task.getExecutorRole()); ProRole role = proRoleDao.selectByPrimaryKey(task.getExecutorRole());
//发送消息 //发送消息
// Set<String> userIdSet = new HashSet<>(); Set<String> userIdSet = new HashSet<>();
if (CollectionUtil.isNotEmpty(userIdList)) { if (CollectionUtil.isNotEmpty(userIdList)) {
HashSet<Long> h = new HashSet<>(userIdList); HashSet<Long> h = new HashSet<>(userIdList);
userIdList.clear(); userIdList.clear();
userIdList.addAll(h); userIdList.addAll(h);
for (Long userId : userIdList) { for (Long userId : userIdList) {
// userIdSet.add(userId.toString()); userIdSet.add(userId.toString());
messageUser = new BaseMessageDto.MessageUser(); messageUser = new BaseMessageDto.MessageUser();
messageUser.setUserId(userId); messageUser.setUserId(userId);
messageUserList.add(messageUser); messageUserList.add(messageUser);
@ -766,8 +771,15 @@ public class TaskDeliverService implements ITaskDeliverService {
deleteMessageData.setUserId(currentUserId); deleteMessageData.setUserId(currentUserId);
deleteMessage.setData(deleteMessageData); deleteMessage.setData(deleteMessageData);
deleteMessage.setReceivers(messageUserList); deleteMessage.setReceivers(messageUserList);
rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME,
JacksonUtil.beanToJson(deleteMessage)); InMessage inMessage = new InMessage();
inMessage.setTos(userIdSet);
inMessage.setData(JSONObject.toJSONString(deleteMessage));
rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, JSONObject.toJSONString(inMessage));
// rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME,
// JacksonUtil.beanToJson(deleteMessage));
// MessageRule messageRule = MessageRule.defaultRule(MessageConstant.DomainType.User); // MessageRule messageRule = MessageRule.defaultRule(MessageConstant.DomainType.User);
// String s = JacksonUtil.beanToJson(deleteMessage); // String s = JacksonUtil.beanToJson(deleteMessage);
// InMessage inMessage = InMessage.newToUserMessage(currentUserId.toString(),userIdSet,null,messageRule,s); // InMessage inMessage = InMessage.newToUserMessage(currentUserId.toString(),userIdSet,null,messageRule,s);

2
tall/src/main/java/com/ccsens/tall/web/DebugController.java

@ -81,7 +81,7 @@ public class DebugController {
System.out.println(j); System.out.println(j);
//FixMe 发送到消息队列 //FixMe 发送到消息队列
rabbitTemplate.convertAndSend(RabbitMQConfig.TALL_MESSAGE_1,j); rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME,j);
return JsonResponse.newInstance().ok(); return JsonResponse.newInstance().ok();
} }

70
util/src/main/java/com/ccsens/util/config/RabbitMQConfig.java

@ -9,13 +9,13 @@ import java.util.Map;
@Configuration @Configuration
public class RabbitMQConfig { public class RabbitMQConfig {
public static final String RabbitMQ_QUEUE_NAME = "Queue_Anyring"; private static final String RabbitMQ_QUEUE_NAME = "Queue_Anyring";
public static final String TALL_MESSAGE_1 = "tall_message_1"; private static final String TALL_MESSAGE_1 = "tall_message_1";
public static final String TALL_MESSAGE_2 = "tall_message_2"; private static final String TALL_MESSAGE_2 = "tall_message_2";
public static final String GAME_STATUS = "game_status"; private static final String GAME_STATUS = "game_status";
public static final String GAME_STATUS_DELAY_SEND_QUEUE = "gameStatusDelaySendQueue"; private static final String GAME_STATUS_DELAY_SEND_QUEUE = "gameStatusDelaySendQueue";
public static final String GAME_STATUS_DELAY_SEND_EXCHANGE = "gameStatusDelaySendExchange"; private static final String GAME_STATUS_DELAY_SEND_EXCHANGE = "gameStatusDelaySendExchange";
public static final String GAME_STATUS_DELAY_SEND_ROUTING_KEY = "gameStatusDelaySendRoutingKey"; private static final String GAME_STATUS_DELAY_SEND_ROUTING_KEY = "gameStatusDelaySendRoutingKey";
/**消息队列发送*/ /**消息队列发送*/
@ -46,34 +46,34 @@ public class RabbitMQConfig {
return new Queue(GAME_SCORE); return new Queue(GAME_SCORE);
} }
/** // /**
* 延时发送队列 // * 延时发送队列
* @return // * @return
*/ // */
@Bean // @Bean
public Queue delayQueue(){ // public Queue delayQueue(){
// return QueueBuilder.durable(GAME_STATUS_DELAY_SEND_QUEUE).build(); //// return QueueBuilder.durable(GAME_STATUS_DELAY_SEND_QUEUE).build();
return new Queue(GAME_STATUS_DELAY_SEND_QUEUE,true); // return new Queue(GAME_STATUS_DELAY_SEND_QUEUE,true);
} // }
//
/** // /**
* 延时交换机 // * 延时交换机
* @return // * @return
*/ // */
@Bean // @Bean
public CustomExchange delayExchange(){ // public CustomExchange delayExchange(){
Map<String, Object> args = new HashMap<>(); // Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct"); // args.put("x-delayed-type", "direct");
return new CustomExchange(GAME_STATUS_DELAY_SEND_EXCHANGE, "x-delayed-message", true, false, args); // return new CustomExchange(GAME_STATUS_DELAY_SEND_EXCHANGE, "x-delayed-message", true, false, args);
} // }
//
/** // /**
* 绑定 // * 绑定
*/ // */
@Bean // @Bean
Binding queueBinding(Queue delayQueue, CustomExchange customExchange) { // Binding queueBinding(Queue delayQueue, CustomExchange customExchange) {
return BindingBuilder.bind(delayQueue).to(customExchange).with(GAME_STATUS_DELAY_SEND_ROUTING_KEY).noargs(); // return BindingBuilder.bind(delayQueue).to(customExchange).with(GAME_STATUS_DELAY_SEND_ROUTING_KEY).noargs();
} // }
} }

66
util/src/main/java/com/ccsens/util/mq/DelayConsumer.java

@ -1,33 +1,33 @@
package com.ccsens.util.mq; //package com.ccsens.util.mq;
//
import com.ccsens.util.config.RabbitMQConfig; //import com.ccsens.util.config.RabbitMQConfig;
import com.rabbitmq.client.Channel; //import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler; //import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener; //import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders; //import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers; //import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload; //import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
//
import java.io.IOException; //import java.io.IOException;
import java.util.Date; //import java.util.Date;
import java.util.Map; //import java.util.Map;
//
/** ///**
* @description: // * @description:
* @author: wuHuiJuan // * @author: wuHuiJuan
* @create: 2019/12/27 15:40 // * @create: 2019/12/27 15:40
*/ // */
@Component //@Component
public class DelayConsumer { //public class DelayConsumer {
@RabbitListener(queues = RabbitMQConfig.GAME_STATUS_DELAY_SEND_QUEUE) // @RabbitListener(queues = RabbitMQConfig.GAME_STATUS_DELAY_SEND_QUEUE)
@RabbitHandler // @RabbitHandler
public void receive(@Payload Object msg, @Headers Map<String, Object> headers, Channel channel ) throws IOException { // public void receive(@Payload Object msg, @Headers Map<String, Object> headers, Channel channel ) throws IOException {
System.out.println("接收到的消息:"+msg +"||" + new Date()); // System.out.println("接收到的消息:"+msg +"||" + new Date());
//
//ACK 手工签收,通知rabbitMQ,消费端消费成功 // //ACK 手工签收,通知rabbitMQ,消费端消费成功
Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG); // Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag,false); // channel.basicAck(deliveryTag,false);
} // }
//
} //}

108
util/src/main/java/com/ccsens/util/mq/DelayProducer.java

@ -1,54 +1,54 @@
package com.ccsens.util.mq; //package com.ccsens.util.mq;
//
import com.ccsens.util.config.RabbitMQConfig; //import com.ccsens.util.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j; //import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData; //import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate; //import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
//
/** ///**
* @description: // * @description:
* @author: wuHuiJuan // * @author: wuHuiJuan
* @create: 2019/12/27 15:16 // * @create: 2019/12/27 15:16
*/ // */
@Slf4j //@Slf4j
@Component //@Component
public class DelayProducer { //public class DelayProducer {
@Autowired // @Autowired
private RabbitTemplate rabbitTemplate; // private RabbitTemplate rabbitTemplate;
//
//消息发送后的回调函数 // //消息发送后的回调函数
/** // /**
* 生产者回调函数confirm确认消息投递成功 // * 生产者回调函数:confirm确认消息投递成功
*/ // */
final RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> { // final RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> {
//
String messageId = correlationData.getId(); // String messageId = correlationData.getId();
if (ack) { // if (ack) {
System.out.println("消息发送成功" + correlationData); // System.out.println("消息发送成功" + correlationData);
log.info("消息投递成功,{}",messageId); // log.info("消息投递成功,{}",messageId);
//进行消息记录的数据库更新 // //进行消息记录的数据库更新
//
}else{ // }else{
log.info("消息投递失败"); // log.info("消息投递失败");
} // }
//
}; // };
//
/** // /**
* 通过延迟消息插件发动延迟消息 // * 通过延迟消息插件发动延迟消息
* @param msg // * @param msg
* @param expiration // * @param expiration
*/ // */
public void sendDelayMessage(Object msg, Long expiration){ // public void sendDelayMessage(Object msg, Long expiration){
//
//绑定异步监听回调函数 // //绑定异步监听回调函数
rabbitTemplate.setConfirmCallback(confirmCallback); // rabbitTemplate.setConfirmCallback(confirmCallback);
//
rabbitTemplate.convertAndSend(RabbitMQConfig.GAME_STATUS_DELAY_SEND_EXCHANGE,RabbitMQConfig.GAME_STATUS_DELAY_SEND_ROUTING_KEY, msg,(message)->{ // rabbitTemplate.convertAndSend(RabbitMQConfig.GAME_STATUS_DELAY_SEND_EXCHANGE,RabbitMQConfig.GAME_STATUS_DELAY_SEND_ROUTING_KEY, msg,(message)->{
message.getMessageProperties().setHeader("x-delay", expiration);//设置延迟时间 // message.getMessageProperties().setHeader("x-delay", expiration);//设置延迟时间
return message; // return message;
}, new CorrelationData("123")); // }, new CorrelationData("123"));
} // }
} //}

Loading…
Cancel
Save