Browse Source

更改发送的ws消息

master
zy_Java 6 years ago
parent
commit
105e859bcf
  1. 217
      game/src/main/java/com/ccsens/game/mq/GameScoreListener.java
  2. 2
      game/src/main/java/com/ccsens/game/service/MessageService.java
  3. 36
      game/src/main/java/com/ccsens/game/service/RabbitMQListener.java
  4. 29
      game/src/main/java/com/ccsens/game/util/SendMsg.java
  5. 2
      mt/src/main/java/com/ccsens/mt/service/MessageServicer.java
  6. 2
      tall/src/main/java/com/ccsens/tall/rabbitMQ/MessageTest.java
  7. 58
      tall/src/main/java/com/ccsens/tall/rabbitMQ/RabbitController.java
  8. 9
      tall/src/main/java/com/ccsens/tall/service/MessageService.java
  9. 2
      tall/src/main/java/com/ccsens/tall/service/ProjectMessageService.java
  10. 80
      tall/src/main/java/com/ccsens/tall/service/RingService.java
  11. 57
      tall/src/main/java/com/ccsens/tall/service/TaskDeliverService.java
  12. 2
      tall/src/main/java/com/ccsens/tall/service/WpsService.java
  13. 2
      tall/src/main/java/com/ccsens/tall/web/DebugController.java
  14. 4
      tall/src/main/resources/application.yml
  15. 70
      util/src/main/java/com/ccsens/util/config/RabbitMQConfig.java
  16. 66
      util/src/main/java/com/ccsens/util/mq/DelayConsumer.java
  17. 108
      util/src/main/java/com/ccsens/util/mq/DelayProducer.java

217
game/src/main/java/com/ccsens/game/mq/GameScoreListener.java

@ -1,109 +1,108 @@
//package com.ccsens.game.mq; package com.ccsens.game.mq;
//
//import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.CollectionUtil;
//import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
//import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
//import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
//import com.ccsens.game.bean.dto.ClientDto; import com.ccsens.game.bean.dto.ClientDto;
//import com.ccsens.game.bean.dto.message.GameMessageCountOut; import com.ccsens.game.bean.dto.message.GameMessageCountOut;
//import com.ccsens.game.bean.dto.message.GameMessageWithChangeStatusOut; import com.ccsens.game.util.GameConstant;
//import com.ccsens.game.netty.ChannelManager; import com.ccsens.util.RedisUtil;
//import com.ccsens.game.util.GameConstant; import com.ccsens.util.bean.message.common.InMessage;
//import com.ccsens.util.JacksonUtil; import com.ccsens.util.bean.message.common.MessageConstant;
//import com.ccsens.util.RedisUtil; import com.ccsens.util.bean.message.common.OutMessage;
//import com.ccsens.util.bean.message.common.InMessage; import com.ccsens.util.bean.message.common.OutMessageSet;
//import com.ccsens.util.bean.message.common.OutMessage; import com.ccsens.util.config.RabbitMQConfig;
//import com.ccsens.util.bean.message.common.OutMessageSet; import lombok.extern.slf4j.Slf4j;
//import com.ccsens.util.config.RabbitMQConfig; import org.springframework.amqp.core.AmqpTemplate;
//import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler;
//import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.rabbit.annotation.RabbitListener;
//import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.data.redis.core.ZSetOperations;
//import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
//import org.springframework.data.redis.core.ZSetOperations;
//import org.springframework.stereotype.Component; import java.io.IOException;
// import java.util.ArrayList;
//import java.io.IOException; import java.util.HashSet;
//import java.util.ArrayList; import java.util.List;
//import java.util.HashSet; import java.util.Set;
//import java.util.List;
//import java.util.Set; /**
// * @description:
///** * @author: whj
// * @description: * @time: 2020/6/5 14:30
// * @author: whj */
// * @time: 2020/6/5 14:30 @Slf4j
// */ @Component
//@Slf4j @RabbitListener(queues = RabbitMQConfig.GAME_SCORE)
//@Component public class GameScoreListener {
//@RabbitListener(queues = RabbitMQConfig.GAME_SCORE)
//public class GameScoreListener { @Autowired
// private RedisUtil redisUtil;
// @Autowired @Autowired
// private RedisUtil redisUtil; private AmqpTemplate rabbitTemplate;
// @Autowired
// private AmqpTemplate rabbitTemplate; @RabbitHandler
// public void process(String messageJson) throws IOException {
// @RabbitHandler log.info("计算分数 {}",messageJson);
// public void process(String messageJson) throws IOException { OutMessageSet outMessageSet = JSONObject.parseObject(messageJson, OutMessageSet.class);
// log.info("计算分数 {}",messageJson); Set<OutMessage> messageSet = outMessageSet.getMessageSet();
// OutMessageSet outMessageSet = JSONObject.parseObject(messageJson, OutMessageSet.class); if (CollectionUtil.isEmpty(messageSet)) {
// Set<OutMessage> messageSet = outMessageSet.getMessageSet(); return;
// if (CollectionUtil.isEmpty(messageSet)) { }
// return; messageSet.forEach(outMessage -> {
// }
// messageSet.forEach(outMessage -> { InMessage.To to = JSONObject.parseObject(outMessage.getFrom(), InMessage.To.class);
// Long userId = to.getId();
// InMessage.To to = JSONObject.parseObject(outMessage.getFrom(), InMessage.To.class); GameDto.GameMsg gameMsg = JSONObject.parseObject(outMessage.getData(), GameDto.GameMsg.class);
// Long userId = to.getId(); Long recordId = gameMsg.getRecordId();
// GameDto.GameMsg gameMsg = JSONObject.parseObject(outMessage.getAuthMessage(), GameDto.GameMsg.class); log.info("统计分数:{},{}",userId, recordId);
// Long recordId = gameMsg.getRecordId(); GameMessageCountOut gameMessageCountOut = clientAddTimes(userId,recordId);
// log.info("统计分数:{},{}",userId, recordId);
// GameMessageCountOut gameMessageCountOut = clientAddTimes(userId,recordId); // 发送给消息系统
// InMessage inMessage = new InMessage();
// // 发送给消息系统 Set<String> tos = new HashSet<>();
// InMessage inMessage = new InMessage(); tos.add(JSONObject.toJSONString(new InMessage.To(userId)));
// Set<String> tos = new HashSet<>(); inMessage.setToDomain(MessageConstant.DomainType.User);
// tos.add(JSONObject.toJSONString(new InMessage.To(userId))); inMessage.setTos(tos);
// inMessage.setTos(tos); inMessage.setData(JSONObject.toJSONString(gameMessageCountOut.getData()));
// inMessage.setData(JSONObject.toJSONString(gameMessageCountOut)); rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, inMessage);
// rabbitTemplate.convertAndSend(RabbitMQConfig.GAME_SCORE, inMessage); });
// });
// }
// }
// public GameMessageCountOut clientAddTimes(Long userId, Long recordId) {
// public GameMessageCountOut clientAddTimes(Long userId, Long recordId) { GameMessageCountOut gameMessageCountOut = new GameMessageCountOut();
// GameMessageCountOut gameMessageCountOut = new GameMessageCountOut(); log.info("userId:{}", userId);
// log.info("userId:{}", userId); if (userId == null || recordId == null) {
// if (userId == null || recordId == null) { return gameMessageCountOut;
// return gameMessageCountOut; }
// }
// String gameUserKey = GameConstant.generateGameKey(recordId);
// String gameUserKey = GameConstant.generateGameKey(recordId); Set<ZSetOperations.TypedTuple<Object>> typedTuples = redisUtil.zsRevGetWithScore(gameUserKey, 0, -1);
// Set<ZSetOperations.TypedTuple<Object>> typedTuples = redisUtil.zsRevGetWithScore(gameUserKey, 0, -1);
// if (CollectionUtil.isEmpty(typedTuples)) {
// if (CollectionUtil.isEmpty(typedTuples)) { return gameMessageCountOut;
// return gameMessageCountOut; }
// } for(ZSetOperations.TypedTuple<Object> type : typedTuples) {
// for(ZSetOperations.TypedTuple<Object> type : typedTuples) { ClientDto.RedisUser user = JSON.parseObject((String)type.getValue(), ClientDto.RedisUser.class);
// ClientDto.RedisUser user = JSON.parseObject((String)type.getValue(), ClientDto.RedisUser.class); if (user.getUserId().longValue() == userId.longValue()) {
// if (user.getUserId().longValue() == userId.longValue()) { int score = type.getScore().intValue();
// int score = type.getScore().intValue(); String userStatus = GameConstant.generateGameStatusKey(recordId);
// String userStatus = GameConstant.generateGameStatusKey(recordId); String gameStausObj = (String)redisUtil.get(userStatus);
// String gameStausObj = (String)redisUtil.get(userStatus); if (StrUtil.isBlank(gameStausObj) || gameStausObj.equals(String.valueOf(GameConstant.GAME_COMPLETED))){
// if (StrUtil.isBlank(gameStausObj) || gameStausObj.equals(String.valueOf(GameConstant.GAME_COMPLETED))){ gameMessageCountOut = new GameMessageCountOut(score/100, score);
// gameMessageCountOut = new GameMessageCountOut(score/100, score); return gameMessageCountOut;
// return gameMessageCountOut; }
// } score += 100;
// score += 100; redisUtil.zsSet(gameUserKey, JSON.toJSONString(user), score);
// redisUtil.zsSet(gameUserKey, JSON.toJSONString(user), score); gameMessageCountOut.getData().setTotalScore(score);
// gameMessageCountOut.getData().setTotalScore(score); gameMessageCountOut.getData().setTotalTimes(score/100);
// gameMessageCountOut.getData().setTotalTimes(score/100); return gameMessageCountOut;
// return gameMessageCountOut; }
// } };
// };
// return gameMessageCountOut;
// return gameMessageCountOut; }
// }
// }
//}

2
game/src/main/java/com/ccsens/game/service/MessageService.java

@ -11,6 +11,7 @@ import com.ccsens.game.persist.dao.GameUserJoinDao;
import com.ccsens.util.JacksonUtil; import com.ccsens.util.JacksonUtil;
import com.ccsens.util.WebConstant; import com.ccsens.util.WebConstant;
import com.ccsens.util.bean.message.common.InMessage; import com.ccsens.util.bean.message.common.InMessage;
import com.ccsens.util.bean.message.common.MessageConstant;
import com.ccsens.util.config.RabbitMQConfig; import com.ccsens.util.config.RabbitMQConfig;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -40,6 +41,7 @@ public class MessageService implements IMessageService {
// JacksonUtil.beanToJson(message)); // JacksonUtil.beanToJson(message));
InMessage inMessage = new InMessage(); InMessage inMessage = new InMessage();
inMessage.setToDomain(MessageConstant.DomainType.User);
inMessage.setTos(message.receiversTransTos()); inMessage.setTos(message.receiversTransTos());
inMessage.setData(JSONObject.toJSONString(message.getData())); inMessage.setData(JSONObject.toJSONString(message.getData()));
rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, inMessage); rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, inMessage);

36
game/src/main/java/com/ccsens/game/service/RabbitMQListener.java

@ -18,21 +18,21 @@ import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.TreeMap; import java.util.TreeMap;
@Slf4j //@Slf4j
@Component //@Component
@RabbitListener(queues = RabbitMQConfig.GAME_STATUS) //@RabbitListener(queues = RabbitMQConfig.GAME_STATUS)
public class RabbitMQListener { //public class RabbitMQListener {
private Logger logger = LoggerFactory.getLogger(RabbitMQListener.class); // private Logger logger = LoggerFactory.getLogger(RabbitMQListener.class);
@Autowired // @Autowired
private IMessageService messageService; // private IMessageService messageService;
//
@RabbitHandler // @RabbitHandler
public void process(String messageJson) throws IOException { // public void process(String messageJson) throws IOException {
System.out.println("++++++++++++++"+messageJson); // System.out.println("++++++++++++++"+messageJson);
// List<GameMessageWithChangeStatusOut> gameMessageList = JacksonUtil.jsonToBean(messageJson, //// List<GameMessageWithChangeStatusOut> gameMessageList = JacksonUtil.jsonToBean(messageJson,
// GameMessageWithChangeStatusOut.class, true); //// GameMessageWithChangeStatusOut.class, true);
log.info("认证MQ {}",messageJson); // log.info("认证MQ {}",messageJson);
messageService.doChangeStatusMessage(JacksonUtil.jsonToBean(messageJson, // messageService.doChangeStatusMessage(JacksonUtil.jsonToBean(messageJson,
GameMessageWithChangeStatusOut.class, true)); // GameMessageWithChangeStatusOut.class, true));
} // }
} //}

29
game/src/main/java/com/ccsens/game/util/SendMsg.java

@ -3,6 +3,7 @@ package com.ccsens.game.util;
import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ccsens.game.bean.dto.ClientDto; import com.ccsens.game.bean.dto.ClientDto;
import com.ccsens.game.bean.dto.message.ChangeStatusMessageDto; import com.ccsens.game.bean.dto.message.ChangeStatusMessageDto;
import com.ccsens.game.bean.dto.message.GameMessageWithChangeStatusOut; import com.ccsens.game.bean.dto.message.GameMessageWithChangeStatusOut;
@ -15,6 +16,9 @@ import com.ccsens.game.persist.dao.GameUserJoinDao;
import com.ccsens.game.service.ClientService; import com.ccsens.game.service.ClientService;
import com.ccsens.util.JacksonUtil; import com.ccsens.util.JacksonUtil;
import com.ccsens.util.RedisUtil; import com.ccsens.util.RedisUtil;
import com.ccsens.util.WebConstant;
import com.ccsens.util.bean.message.common.InMessage;
import com.ccsens.util.bean.message.common.MessageConstant;
import com.ccsens.util.config.RabbitMQConfig; import com.ccsens.util.config.RabbitMQConfig;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import io.swagger.models.auth.In; import io.swagger.models.auth.In;
@ -53,14 +57,21 @@ public class SendMsg {
if (CollectionUtil.isEmpty(userJoins)) { if (CollectionUtil.isEmpty(userJoins)) {
return; return;
} }
Set<String> userIds = new HashSet<>();
userJoins.forEach(join -> { userJoins.forEach(join -> {
outs.add(getMsg(userJoins, gameRecord, join, status)); // outs.add(getMsg(userJoins, gameRecord, join, status));
InMessage inMessage = new InMessage();
userIds.add(String.valueOf(join.getUserId()));
inMessage.setToDomain(MessageConstant.DomainType.User);
inMessage.setTos(userIds);
inMessage.setData(JSONObject.toJSONString(getMsg(userJoins, gameRecord, join, status)));
rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, JSONObject.toJSONString(inMessage));
}); });
if (CollectionUtil.isNotEmpty(outs)) {
rabbitTemplate.convertAndSend(RabbitMQConfig.GAME_STATUS, JacksonUtil.beanToJson(outs));
log.info("发送成功:{}", JacksonUtil.beanToJson(outs));
}
} }
public GameMessageWithChangeStatusOut getMsg(List<ClientDto.RedisUser> userJoins, GameRecord gameRecord, ClientDto.RedisUser join, byte gameStatus) { public GameMessageWithChangeStatusOut getMsg(List<ClientDto.RedisUser> userJoins, GameRecord gameRecord, ClientDto.RedisUser join, byte gameStatus) {
@ -70,6 +81,7 @@ public class SendMsg {
data.setRecordId(gameRecord.getId()); data.setRecordId(gameRecord.getId());
data.setGameStatus(gameStatus); data.setGameStatus(gameStatus);
out.setUserId(join.getUserId()); out.setUserId(join.getUserId());
out.setType(WebConstant.Message_Type.ChangeStatus.phase);
ChangeStatusMessageDto dtos = new ChangeStatusMessageDto(); ChangeStatusMessageDto dtos = new ChangeStatusMessageDto();
switch (gameStatus) { switch (gameStatus) {
case GameConstant.GAME_COMPLETED: case GameConstant.GAME_COMPLETED:
@ -100,6 +112,7 @@ public class SendMsg {
pendingData.setEndLocalTime(join.getLocalEndTime()); pendingData.setEndLocalTime(join.getLocalEndTime());
dtos.setPendingData(pendingData); dtos.setPendingData(pendingData);
break; break;
default:
} }
data.setChangeStatusMessageDto(dtos); data.setChangeStatusMessageDto(dtos);
out.setData(data); out.setData(data);
@ -109,9 +122,9 @@ public class SendMsg {
/** /**
* 定时任务 * 定时任务
* *
* @param executor * @param executor 执行者
* @param delayTime * @param delayTime 延时时间
* @param runnable * @param runnable 线程
*/ */
public void sendMsg(ScheduledExecutorService executor, long delayTime, Runnable runnable) { public void sendMsg(ScheduledExecutorService executor, long delayTime, Runnable runnable) {
executor.schedule(new Thread(runnable), delayTime, TimeUnit.MILLISECONDS); executor.schedule(new Thread(runnable), delayTime, TimeUnit.MILLISECONDS);

2
mt/src/main/java/com/ccsens/mt/service/MessageServicer.java

@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject;
import com.ccsens.mt.bean.dto.message.SyncMessageWithShowDto; import com.ccsens.mt.bean.dto.message.SyncMessageWithShowDto;
import com.ccsens.util.JacksonUtil; import com.ccsens.util.JacksonUtil;
import com.ccsens.util.bean.message.common.InMessage; import com.ccsens.util.bean.message.common.InMessage;
import com.ccsens.util.bean.message.common.MessageConstant;
import com.ccsens.util.config.RabbitMQConfig; import com.ccsens.util.config.RabbitMQConfig;
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -20,6 +21,7 @@ public class MessageServicer implements IMessageService{
// rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME, // rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME,
// JacksonUtil.beanToJson(message)); // JacksonUtil.beanToJson(message));
InMessage inMessage = new InMessage(); InMessage inMessage = new InMessage();
inMessage.setToDomain(MessageConstant.DomainType.User);
inMessage.setTos(message.receiversTransTos()); inMessage.setTos(message.receiversTransTos());
inMessage.setData(JSONObject.toJSONString(message.getData())); inMessage.setData(JSONObject.toJSONString(message.getData()));
rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, inMessage); rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, inMessage);

2
tall/src/main/java/com/ccsens/tall/rabbitMQ/MessageTest.java

@ -20,6 +20,6 @@ public class MessageTest {
String j = JacksonUtil.beanToJson(inMessage); String j = JacksonUtil.beanToJson(inMessage);
System.out.println(j); System.out.println(j);
//FixMe 发送到消息队列 //FixMe 发送到消息队列
rabbitTemplate.convertAndSend(RabbitMQConfig.TALL_MESSAGE_1,j); rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME,j);
} }
} }

58
tall/src/main/java/com/ccsens/tall/rabbitMQ/RabbitController.java

@ -1,29 +1,29 @@
package com.ccsens.tall.rabbitMQ; //package com.ccsens.tall.rabbitMQ;
//
//
import com.ccsens.util.config.RabbitMQConfig; //import com.ccsens.util.config.RabbitMQConfig;
import org.slf4j.Logger; //import org.slf4j.Logger;
import org.slf4j.LoggerFactory; //import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler; //import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener; //import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
//
//
@Component //@Component
@RabbitListener(queues = RabbitMQConfig.TALL_MESSAGE_2) //@RabbitListener(queues = RabbitMQConfig.TALL_MESSAGE_2)
public class RabbitController { //public class RabbitController {
private Logger logger = LoggerFactory.getLogger(RabbitController.class); // private Logger logger = LoggerFactory.getLogger(RabbitController.class);
//
@RabbitHandler // @RabbitHandler
public void process(String messageJson) { // public void process(String messageJson) {
logger.info("Rabbit Received: {}",messageJson); // logger.info("Rabbit Received: {}",messageJson);
try { // try {
System.out.println("Rabbit Received: " + messageJson); // System.out.println("Rabbit Received: " + messageJson);
//
}catch (Exception e){ // }catch (Exception e){
e.printStackTrace(); // e.printStackTrace();
} // }
//
} // }
//
} //}

9
tall/src/main/java/com/ccsens/tall/service/MessageService.java

@ -7,6 +7,7 @@ import com.ccsens.tall.bean.dto.message.*;
import com.ccsens.tall.bean.vo.MemberVo; import com.ccsens.tall.bean.vo.MemberVo;
import com.ccsens.util.JacksonUtil; import com.ccsens.util.JacksonUtil;
import com.ccsens.util.bean.message.common.InMessage; import com.ccsens.util.bean.message.common.InMessage;
import com.ccsens.util.bean.message.common.MessageConstant;
import com.ccsens.util.config.RabbitMQConfig; import com.ccsens.util.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.AmqpTemplate;
@ -66,6 +67,7 @@ public class MessageService implements IMessageService{
// JacksonUtil.beanToJson(message)); // JacksonUtil.beanToJson(message));
InMessage inMessage = new InMessage(); InMessage inMessage = new InMessage();
inMessage.setToDomain(MessageConstant.DomainType.User);
inMessage.setTos(message.receiversTransTos()); inMessage.setTos(message.receiversTransTos());
inMessage.setData(JSONObject.toJSONString(message.getData())); inMessage.setData(JSONObject.toJSONString(message.getData()));
rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, inMessage); rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, inMessage);
@ -76,14 +78,14 @@ public class MessageService implements IMessageService{
public void sendDeliverMessageWithUpload(InMessage inMessage) throws Exception { public void sendDeliverMessageWithUpload(InMessage inMessage) throws Exception {
System.out.println(JacksonUtil.beanToJson(inMessage)); System.out.println(JacksonUtil.beanToJson(inMessage));
//FixMe 发送到消息队列 //FixMe 发送到消息队列
rabbitTemplate.convertAndSend(RabbitMQConfig.TALL_MESSAGE_1,JacksonUtil.beanToJson(inMessage)); rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME,JacksonUtil.beanToJson(inMessage));
} }
@Override @Override
public void sendDeliverMessageWithChecker(InMessage inMessage) throws Exception { public void sendDeliverMessageWithChecker(InMessage inMessage) throws Exception {
System.out.println(JacksonUtil.beanToJson(inMessage)); System.out.println(JacksonUtil.beanToJson(inMessage));
//FixMe 发送到消息队列 //FixMe 发送到消息队列
rabbitTemplate.convertAndSend(RabbitMQConfig.TALL_MESSAGE_1, rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME,
JacksonUtil.beanToJson(inMessage)); JacksonUtil.beanToJson(inMessage));
} }
@ -91,7 +93,7 @@ public class MessageService implements IMessageService{
public void sendDeliverMessageWithDelete(InMessage inMessage) throws Exception { public void sendDeliverMessageWithDelete(InMessage inMessage) throws Exception {
System.out.println(JacksonUtil.beanToJson(inMessage)); System.out.println(JacksonUtil.beanToJson(inMessage));
//FixMe 发送到消息队列 //FixMe 发送到消息队列
rabbitTemplate.convertAndSend(RabbitMQConfig.TALL_MESSAGE_1 , rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME ,
JacksonUtil.beanToJson(inMessage)); JacksonUtil.beanToJson(inMessage));
} }
@ -102,6 +104,7 @@ public class MessageService implements IMessageService{
// rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME , // rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME ,
// JacksonUtil.beanToJson(syncMessage)); // JacksonUtil.beanToJson(syncMessage));
InMessage inMessage = new InMessage(); InMessage inMessage = new InMessage();
inMessage.setToDomain(MessageConstant.DomainType.User);
inMessage.setTos(syncMessage.receiversTransTos()); inMessage.setTos(syncMessage.receiversTransTos());
inMessage.setData(JSONObject.toJSONString(syncMessage.getData())); inMessage.setData(JSONObject.toJSONString(syncMessage.getData()));
rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, inMessage); rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, inMessage);

2
tall/src/main/java/com/ccsens/tall/service/ProjectMessageService.java

@ -23,6 +23,7 @@ import com.ccsens.tall.util.WxTemplateUtil;
import com.ccsens.util.*; import com.ccsens.util.*;
import com.ccsens.util.annotation.OperateType; import com.ccsens.util.annotation.OperateType;
import com.ccsens.util.bean.message.common.InMessage; import com.ccsens.util.bean.message.common.InMessage;
import com.ccsens.util.bean.message.common.MessageConstant;
import com.ccsens.util.config.RabbitMQConfig; import com.ccsens.util.config.RabbitMQConfig;
import com.ccsens.util.exception.BaseException; import com.ccsens.util.exception.BaseException;
import com.ccsens.util.wx.WxGzhUtil; import com.ccsens.util.wx.WxGzhUtil;
@ -105,6 +106,7 @@ public class ProjectMessageService implements IProjectMessageService {
// 操作发送 // 操作发送
initMessageSend(userIds, oauthUserIds, operationId); initMessageSend(userIds, oauthUserIds, operationId);
InMessage inMessage = new InMessage(); InMessage inMessage = new InMessage();
inMessage.setToDomain(MessageConstant.DomainType.User);
inMessage.setTos(InMessage.transTos(userIds)); inMessage.setTos(InMessage.transTos(userIds));
inMessage.setData(JSONObject.toJSONString(newMessages)); inMessage.setData(JSONObject.toJSONString(newMessages));
rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, JSONObject.toJSONString(inMessage)); rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, JSONObject.toJSONString(inMessage));

80
tall/src/main/java/com/ccsens/tall/service/RingService.java

@ -4,6 +4,7 @@ import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.lang.Snowflake; import cn.hutool.core.lang.Snowflake;
import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.ccsens.tall.bean.dto.RingDto; import com.ccsens.tall.bean.dto.RingDto;
import com.ccsens.tall.bean.dto.message.BaseMessageDto; import com.ccsens.tall.bean.dto.message.BaseMessageDto;
import com.ccsens.tall.bean.dto.message.RingMessageWithReadDto; import com.ccsens.tall.bean.dto.message.RingMessageWithReadDto;
@ -19,6 +20,8 @@ import com.ccsens.tall.persist.dao.SysUserDao;
import com.ccsens.util.CodeEnum; import com.ccsens.util.CodeEnum;
import com.ccsens.util.JacksonUtil; import com.ccsens.util.JacksonUtil;
import com.ccsens.util.PropUtil; import com.ccsens.util.PropUtil;
import com.ccsens.util.bean.message.common.InMessage;
import com.ccsens.util.bean.message.common.MessageConstant;
import com.ccsens.util.config.RabbitMQConfig; import com.ccsens.util.config.RabbitMQConfig;
import com.ccsens.util.exception.BaseException; import com.ccsens.util.exception.BaseException;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
@ -78,7 +81,7 @@ public class RingService implements IRingService {
} }
sysRingMsgDao.insertSelective(ringMsg); sysRingMsgDao.insertSelective(ringMsg);
//所有接收者的userId //所有接收者的userId
Set<Long> userIdSet = new HashSet<>(); Set<String> userIdSet = new HashSet<>();
//添加消息详情与接收角色的关联信息 //添加消息详情与接收角色的关联信息
if (CollectionUtil.isNotEmpty(ringSendDto.getRoleList())) { if (CollectionUtil.isNotEmpty(ringSendDto.getRoleList())) {
for (Long roleId : ringSendDto.getRoleList()) { for (Long roleId : ringSendDto.getRoleList()) {
@ -88,16 +91,33 @@ public class RingService implements IRingService {
sysRingSend.setRoleId(roleId); sysRingSend.setRoleId(roleId);
sysRingSendDao.insertSelective(sysRingSend); sysRingSendDao.insertSelective(sysRingSend);
List<Long> userIdList = userService.selectUserIdByRoleId(roleId); List<Long> userIdList = userService.selectUserIdByRoleId(roleId);
userIdSet.addAll(userIdList); userIdList.forEach(id -> {
userIdSet.add(String.valueOf(id));
});
// userIdSet.addAll(userIdList);
} }
} }
List<Long> userIdList = new ArrayList<>(userIdSet);
//发送消息 if (CollectionUtil.isNotEmpty(userIdSet)) {
RingMessageWithSendDto ringMessageWithSendDto = new RingMessageWithSendDto( //发送消息
ringMsg.getId(), ringSendDto.getProjectId(), ringMsg.getValue(), time); RingMessageWithSendDto ringMessageWithSendDto = new RingMessageWithSendDto(
ringMessageWithSendDto.setReceivers(BaseMessageDto.MessageUser.userIdToUsers(userIdList)); ringMsg.getId(), ringSendDto.getProjectId(), ringMsg.getValue(), time);
rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME, System.out.println(JSONObject.toJSONString(ringMessageWithSendDto.getData()));
JacksonUtil.beanToJson(ringMessageWithSendDto)); InMessage inMessage = new InMessage();
inMessage.setToDomain(MessageConstant.DomainType.User);
inMessage.setTos(userIdSet);
inMessage.setData(JSONObject.toJSONString(ringMessageWithSendDto.getData()));
rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, JSONObject.toJSONString(inMessage));
}
// List<Long> userIdList = new ArrayList<>(userIdSet);
// //发送消息
// RingMessageWithSendDto ringMessageWithSendDto = new RingMessageWithSendDto(
// ringMsg.getId(), ringSendDto.getProjectId(), ringMsg.getValue(), time);
// ringMessageWithSendDto.setReceivers(BaseMessageDto.MessageUser.userIdToUsers(userIdList));
// rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME,
// JacksonUtil.beanToJson(ringMessageWithSendDto));
//返回消息的详细信息 //返回消息的详细信息
RingVo.RingInfo ringInfo = new RingVo.RingInfo(); RingVo.RingInfo ringInfo = new RingVo.RingInfo();
ringInfo.setMessageId(ringMsg.getId()); ringInfo.setMessageId(ringMsg.getId());
@ -206,12 +226,21 @@ public class RingService implements IRingService {
sysRingSendDao.updateByPrimaryKeySelective(sysRingSend); sysRingSendDao.updateByPrimaryKeySelective(sysRingSend);
} }
//将已读消息返回给发送者 //将已读消息返回给发送者
List<Long> userIdList = new ArrayList<>(); // List<Long> userIdList = new ArrayList<>();
userIdList.add(sysRingMsg.getSenderId()); // userIdList.add(sysRingMsg.getSenderId());
// RingMessageWithReadDto ringMessageWithReadDto = new RingMessageWithReadDto(msgId, message.getProjectId(), roleId);
// ringMessageWithReadDto.setReceivers(BaseMessageDto.MessageUser.userIdToUsers(userIdList));
// rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME,
// JacksonUtil.beanToJson(ringMessageWithReadDto));
RingMessageWithReadDto ringMessageWithReadDto = new RingMessageWithReadDto(msgId, message.getProjectId(), roleId); RingMessageWithReadDto ringMessageWithReadDto = new RingMessageWithReadDto(msgId, message.getProjectId(), roleId);
ringMessageWithReadDto.setReceivers(BaseMessageDto.MessageUser.userIdToUsers(userIdList)); Set<String> userIdSet = new HashSet<>();
rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME, userIdSet.add(String.valueOf(sysRingMsg.getSenderId()));
JacksonUtil.beanToJson(ringMessageWithReadDto)); InMessage inMessage = new InMessage();
inMessage.setToDomain(MessageConstant.DomainType.User);
inMessage.setTos(userIdSet);
inMessage.setData(JSONObject.toJSONString(ringMessageWithReadDto.getData()));
rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, JSONObject.toJSONString(inMessage));
} }
} }
//查询被读的信息返回 //查询被读的信息返回
@ -261,14 +290,23 @@ public class RingService implements IRingService {
//获取消息 //获取消息
SysRingMsg sysRingMsg = sysRingMsgDao.selectByPrimaryKey(sysRingSend.getRingId()); SysRingMsg sysRingMsg = sysRingMsgDao.selectByPrimaryKey(sysRingSend.getRingId());
//ws消息接收者的userId //ws消息接收者的userId
Set<Long> userIdSet = new HashSet<>(); // Set<Long> userIdSet = new HashSet<>();
userIdSet.add(sysRingMsg.getSenderId()); // userIdSet.add(sysRingMsg.getSenderId());
List<Long> userIdList = new ArrayList<>(userIdSet); // List<Long> userIdList = new ArrayList<>(userIdSet);
//将已读消息返回给发送者 // //将已读消息返回给发送者
// RingMessageWithReadDto ringMessageWithReadDto = new RingMessageWithReadDto(sysRingMsg.getId(), projectId, roleId);
// ringMessageWithReadDto.setReceivers(BaseMessageDto.MessageUser.userIdToUsers(userIdList));
// rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME,
// JacksonUtil.beanToJson(ringMessageWithReadDto));
Set<String> userIdSet = new HashSet<>();
userIdSet.add(String.valueOf(sysRingMsg.getSenderId()));
RingMessageWithReadDto ringMessageWithReadDto = new RingMessageWithReadDto(sysRingMsg.getId(), projectId, roleId); RingMessageWithReadDto ringMessageWithReadDto = new RingMessageWithReadDto(sysRingMsg.getId(), projectId, roleId);
ringMessageWithReadDto.setReceivers(BaseMessageDto.MessageUser.userIdToUsers(userIdList)); InMessage inMessage = new InMessage();
rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME, inMessage.setToDomain(MessageConstant.DomainType.User);
JacksonUtil.beanToJson(ringMessageWithReadDto)); inMessage.setTos(userIdSet);
inMessage.setData(JSONObject.toJSONString(ringMessageWithReadDto.getData()));
rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME, JSONObject.toJSONString(inMessage));
} }
} }
} }

57
tall/src/main/java/com/ccsens/tall/service/TaskDeliverService.java

@ -248,11 +248,13 @@ public class TaskDeliverService implements ITaskDeliverService {
userIdSet.addAll(userService.selectUserIdByRoleId(postLogCheckerId)); userIdSet.addAll(userService.selectUserIdByRoleId(postLogCheckerId));
} }
} }
// ws 发送消息
Set<String> userIds = new HashSet<>();
for (Long userId : userIdSet) { for (Long userId : userIdSet) {
messageUser = new BaseMessageDto.MessageUser(); messageUser = new BaseMessageDto.MessageUser();
messageUser.setUserId(userId); messageUser.setUserId(userId);
messageUserList.add(messageUser); messageUserList.add(messageUser);
userIdStr.add(userId.toString()); userIds.add(userId.toString());
} }
Long roleId; Long roleId;
if (role.getName().equals(WebConstant.ROLE_NAME.AllMember.phase)) { if (role.getName().equals(WebConstant.ROLE_NAME.AllMember.phase)) {
@ -261,25 +263,29 @@ public class TaskDeliverService implements ITaskDeliverService {
} else { } else {
roleId = taskDetail.getExecutorRole(); roleId = taskDetail.getExecutorRole();
} }
DeliverMessageWithUploadDto uploadMessage = new DeliverMessageWithUploadDto(taskDetail.getProjectId(),roleId,taskDetail.getId(), // DeliverMessageWithUploadDto uploadMessage = new DeliverMessageWithUploadDto(taskDetail.getProjectId(),roleId,taskDetail.getId(),
taskDeliver.getId(),taskDeliver.getName(),currentUserId,now,uploadDeliver.getFileInfo(),messageUserList); // taskDeliver.getId(),taskDeliver.getName(),currentUserId,now,uploadDeliver.getFileInfo(),messageUserList);
log.info("上传交付物:{}",JacksonUtil.beanToJson(uploadMessage)); // log.info("上传交付物:{}",JacksonUtil.beanToJson(uploadMessage));
rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME, DeliverMessageWithUploadDto.Data data = new DeliverMessageWithUploadDto.Data();
JacksonUtil.beanToJson(uploadMessage)); data.setProjectId(taskDetail.getProjectId());
// DeliverMessageWithUploadDto.Data data = new DeliverMessageWithUploadDto.Data(); data.setDeliverId(taskDeliver.getId());
// data.setProjectId(taskDetail.getProjectId()); data.setDeliverName(taskDeliver.getName());
// log.info("上传交付物Data消息:{}",JacksonUtil.beanToJson(uploadMessage.getData())); data.setRoleId(roleId);
// InMessage inMessage = new InMessage(); data.setTaskId(taskDetail.getId());
// inMessage.setData(JacksonUtil.beanToJson(data)); data.setUploader(currentUserId);
// inMessage.setToDomain(MessageConstant.DomainType.User); data.setUploadTime(now);
// inMessage.setTos(userIdStr); data.setFile(uploadDeliver.getFileInfo());
// log.info("上传交付物发送消息:{}",JacksonUtil.beanToJson(inMessage));
// rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME, InMessage inMessage = new InMessage();
// JacksonUtil.beanToJson(inMessage)); inMessage.setToDomain(MessageConstant.DomainType.User);
//----------------------------------------------------------------- inMessage.setTos(userIds);
inMessage.setData(JacksonUtil.beanToJson(data));
rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME,
JacksonUtil.beanToJson(inMessage) );
// //用智能助手发送消息 // //用智能助手发送消息
// SysProject project = sysProjectDao.selectByPrimaryKey(taskDetail.getProjectId()); // SysProject project = sysProjectDao.selectByPrimaryKey(taskDetail.getProjectId());
// robotService.addDeliverRobotSend(currentUserId,taskDeliver.getName(),subTimeId, project); // robotService.addDeliverRobotSend(currentUserId,taskDeliver.getName(),subTimeId, project);
//返回 //返回
List<ProjectVo.DeliverInfo> deliverInfoList = taskDeliverDao.selectByDeliverId(uploadDeliver.getDeliverId()); List<ProjectVo.DeliverInfo> deliverInfoList = taskDeliverDao.selectByDeliverId(uploadDeliver.getDeliverId());
if (CollectionUtil.isNotEmpty(deliverInfoList)) { if (CollectionUtil.isNotEmpty(deliverInfoList)) {
@ -594,10 +600,8 @@ public class TaskDeliverService implements ITaskDeliverService {
} }
checkerDto.setReceivers(messageUserList); checkerDto.setReceivers(messageUserList);
log.info("检查交付物:{}",JacksonUtil.beanToJson(checkerDto)); log.info("检查交付物:{}",JacksonUtil.beanToJson(checkerDto));
// rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME,
// JacksonUtil.beanToJson(checkerDto));
MessageRule messageRule = MessageRule.defaultRule(MessageConstant.DomainType.User); MessageRule messageRule = MessageRule.defaultRule(MessageConstant.DomainType.User);
String s = JacksonUtil.beanToJson(checkerDto); String s = JacksonUtil.beanToJson(checkerDtoData);
InMessage inMessage = InMessage.newToUserMessage(currentUserId.toString(),userIdSet,null,messageRule,s); InMessage inMessage = InMessage.newToUserMessage(currentUserId.toString(),userIdSet,null,messageRule,s);
messageService.sendDeliverMessageWithChecker(inMessage); messageService.sendDeliverMessageWithChecker(inMessage);
@ -766,13 +770,13 @@ public class TaskDeliverService implements ITaskDeliverService {
ProTaskDetail task = taskDetailDao.selectByPrimaryKey(deliver.getTaskDetailId()); ProTaskDetail task = taskDetailDao.selectByPrimaryKey(deliver.getTaskDetailId());
ProRole role = proRoleDao.selectByPrimaryKey(task.getExecutorRole()); ProRole role = proRoleDao.selectByPrimaryKey(task.getExecutorRole());
//发送消息 //发送消息
// Set<String> userIdSet = new HashSet<>(); Set<String> userIdSet = new HashSet<>();
if (CollectionUtil.isNotEmpty(userIdList)) { if (CollectionUtil.isNotEmpty(userIdList)) {
HashSet<Long> h = new HashSet<>(userIdList); HashSet<Long> h = new HashSet<>(userIdList);
userIdList.clear(); userIdList.clear();
userIdList.addAll(h); userIdList.addAll(h);
for (Long userId : userIdList) { for (Long userId : userIdList) {
// userIdSet.add(userId.toString()); userIdSet.add(userId.toString());
messageUser = new BaseMessageDto.MessageUser(); messageUser = new BaseMessageDto.MessageUser();
messageUser.setUserId(userId); messageUser.setUserId(userId);
messageUserList.add(messageUser); messageUserList.add(messageUser);
@ -792,8 +796,13 @@ public class TaskDeliverService implements ITaskDeliverService {
deleteMessageData.setUserId(currentUserId); deleteMessageData.setUserId(currentUserId);
deleteMessage.setData(deleteMessageData); deleteMessage.setData(deleteMessageData);
deleteMessage.setReceivers(messageUserList); deleteMessage.setReceivers(messageUserList);
rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME,
JacksonUtil.beanToJson(deleteMessage)); InMessage inMessage = new InMessage();
inMessage.setToDomain(MessageConstant.DomainType.User);
inMessage.setTos(userIdSet);
inMessage.setData(JacksonUtil.beanToJson(deleteMessageData));
rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME,
JacksonUtil.beanToJson(inMessage));
//用智能助手发送消息 //用智能助手发送消息
robotService.deleteDeliverRobotSend(currentUserId,deliver.getName(),subTimeId); robotService.deleteDeliverRobotSend(currentUserId,deliver.getName(),subTimeId);

2
tall/src/main/java/com/ccsens/tall/service/WpsService.java

@ -212,7 +212,7 @@ public class WpsService implements IWpsService {
paramMap.put("_w_appid", WebConstant.Wps.APPID); paramMap.put("_w_appid", WebConstant.Wps.APPID);
paramMap.put("_w_token", token); paramMap.put("_w_token", token);
String newSignature = WpsSignature.getSignature(paramMap, WebConstant.Wps.APPKEY); String newSignature = WpsSignature.getSignature(paramMap, WebConstant.Wps.APPKEY);
String fileUrl = "http://wwo.wps.cn/office/{}/{}?_w_appid=" + WebConstant.Wps.APPID + "&_w_signature={}&_w_token={}"; String fileUrl = "https://wwo.wps.cn/office/{}/{}?_w_appid=" + WebConstant.Wps.APPID + "&_w_signature={}&_w_token={}";
return StrUtil.format(fileUrl, fileType, fileId, newSignature, token); return StrUtil.format(fileUrl, fileType, fileId, newSignature, token);
} }

2
tall/src/main/java/com/ccsens/tall/web/DebugController.java

@ -81,7 +81,7 @@ public class DebugController {
System.out.println(j); System.out.println(j);
//FixMe 发送到消息队列 //FixMe 发送到消息队列
rabbitTemplate.convertAndSend(RabbitMQConfig.TALL_MESSAGE_1,j); rabbitTemplate.convertAndSend(RabbitMQConfig.MESSAGE_QUEUE_NAME,j);
return JsonResponse.newInstance().ok(); return JsonResponse.newInstance().ok();
} }

4
tall/src/main/resources/application.yml

@ -1,4 +1,4 @@
spring: spring:
profiles: profiles:
active: dev active: test
include: util-dev,common include: util-test,common

70
util/src/main/java/com/ccsens/util/config/RabbitMQConfig.java

@ -9,13 +9,13 @@ import java.util.Map;
@Configuration @Configuration
public class RabbitMQConfig { public class RabbitMQConfig {
public static final String RabbitMQ_QUEUE_NAME = "Queue_Anyring"; private static final String RabbitMQ_QUEUE_NAME = "Queue_Anyring";
public static final String TALL_MESSAGE_1 = "tall_message_1"; private static final String TALL_MESSAGE_1 = "tall_message_1";
public static final String TALL_MESSAGE_2 = "tall_message_2"; private static final String TALL_MESSAGE_2 = "tall_message_2";
public static final String GAME_STATUS = "game_status"; private static final String GAME_STATUS = "game_status";
public static final String GAME_STATUS_DELAY_SEND_QUEUE = "gameStatusDelaySendQueue"; private static final String GAME_STATUS_DELAY_SEND_QUEUE = "gameStatusDelaySendQueue";
public static final String GAME_STATUS_DELAY_SEND_EXCHANGE = "gameStatusDelaySendExchange"; private static final String GAME_STATUS_DELAY_SEND_EXCHANGE = "gameStatusDelaySendExchange";
public static final String GAME_STATUS_DELAY_SEND_ROUTING_KEY = "gameStatusDelaySendRoutingKey"; private static final String GAME_STATUS_DELAY_SEND_ROUTING_KEY = "gameStatusDelaySendRoutingKey";
/**消息队列发送*/ /**消息队列发送*/
@ -46,34 +46,34 @@ public class RabbitMQConfig {
return new Queue(GAME_SCORE); return new Queue(GAME_SCORE);
} }
/** // /**
* 延时发送队列 // * 延时发送队列
* @return // * @return
*/ // */
@Bean // @Bean
public Queue delayQueue(){ // public Queue delayQueue(){
// return QueueBuilder.durable(GAME_STATUS_DELAY_SEND_QUEUE).build(); //// return QueueBuilder.durable(GAME_STATUS_DELAY_SEND_QUEUE).build();
return new Queue(GAME_STATUS_DELAY_SEND_QUEUE,true); // return new Queue(GAME_STATUS_DELAY_SEND_QUEUE,true);
} // }
//
/** // /**
* 延时交换机 // * 延时交换机
* @return // * @return
*/ // */
@Bean // @Bean
public CustomExchange delayExchange(){ // public CustomExchange delayExchange(){
Map<String, Object> args = new HashMap<>(); // Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct"); // args.put("x-delayed-type", "direct");
return new CustomExchange(GAME_STATUS_DELAY_SEND_EXCHANGE, "x-delayed-message", true, false, args); // return new CustomExchange(GAME_STATUS_DELAY_SEND_EXCHANGE, "x-delayed-message", true, false, args);
} // }
//
/** // /**
* 绑定 // * 绑定
*/ // */
@Bean // @Bean
Binding queueBinding(Queue delayQueue, CustomExchange customExchange) { // Binding queueBinding(Queue delayQueue, CustomExchange customExchange) {
return BindingBuilder.bind(delayQueue).to(customExchange).with(GAME_STATUS_DELAY_SEND_ROUTING_KEY).noargs(); // return BindingBuilder.bind(delayQueue).to(customExchange).with(GAME_STATUS_DELAY_SEND_ROUTING_KEY).noargs();
} // }
} }

66
util/src/main/java/com/ccsens/util/mq/DelayConsumer.java

@ -1,33 +1,33 @@
package com.ccsens.util.mq; //package com.ccsens.util.mq;
//
import com.ccsens.util.config.RabbitMQConfig; //import com.ccsens.util.config.RabbitMQConfig;
import com.rabbitmq.client.Channel; //import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler; //import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener; //import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders; //import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers; //import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload; //import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
//
import java.io.IOException; //import java.io.IOException;
import java.util.Date; //import java.util.Date;
import java.util.Map; //import java.util.Map;
//
/** ///**
* @description: // * @description:
* @author: wuHuiJuan // * @author: wuHuiJuan
* @create: 2019/12/27 15:40 // * @create: 2019/12/27 15:40
*/ // */
@Component //@Component
public class DelayConsumer { //public class DelayConsumer {
@RabbitListener(queues = RabbitMQConfig.GAME_STATUS_DELAY_SEND_QUEUE) // @RabbitListener(queues = RabbitMQConfig.GAME_STATUS_DELAY_SEND_QUEUE)
@RabbitHandler // @RabbitHandler
public void receive(@Payload Object msg, @Headers Map<String, Object> headers, Channel channel ) throws IOException { // public void receive(@Payload Object msg, @Headers Map<String, Object> headers, Channel channel ) throws IOException {
System.out.println("接收到的消息:"+msg +"||" + new Date()); // System.out.println("接收到的消息:"+msg +"||" + new Date());
//
//ACK 手工签收,通知rabbitMQ,消费端消费成功 // //ACK 手工签收,通知rabbitMQ,消费端消费成功
Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG); // Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag,false); // channel.basicAck(deliveryTag,false);
} // }
//
} //}

108
util/src/main/java/com/ccsens/util/mq/DelayProducer.java

@ -1,54 +1,54 @@
package com.ccsens.util.mq; //package com.ccsens.util.mq;
//
import com.ccsens.util.config.RabbitMQConfig; //import com.ccsens.util.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j; //import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData; //import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate; //import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
//
/** ///**
* @description: // * @description:
* @author: wuHuiJuan // * @author: wuHuiJuan
* @create: 2019/12/27 15:16 // * @create: 2019/12/27 15:16
*/ // */
@Slf4j //@Slf4j
@Component //@Component
public class DelayProducer { //public class DelayProducer {
@Autowired // @Autowired
private RabbitTemplate rabbitTemplate; // private RabbitTemplate rabbitTemplate;
//
//消息发送后的回调函数 // //消息发送后的回调函数
/** // /**
* 生产者回调函数confirm确认消息投递成功 // * 生产者回调函数:confirm确认消息投递成功
*/ // */
final RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> { // final RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> {
//
String messageId = correlationData.getId(); // String messageId = correlationData.getId();
if (ack) { // if (ack) {
System.out.println("消息发送成功" + correlationData); // System.out.println("消息发送成功" + correlationData);
log.info("消息投递成功,{}",messageId); // log.info("消息投递成功,{}",messageId);
//进行消息记录的数据库更新 // //进行消息记录的数据库更新
//
}else{ // }else{
log.info("消息投递失败"); // log.info("消息投递失败");
} // }
//
}; // };
//
/** // /**
* 通过延迟消息插件发动延迟消息 // * 通过延迟消息插件发动延迟消息
* @param msg // * @param msg
* @param expiration // * @param expiration
*/ // */
public void sendDelayMessage(Object msg, Long expiration){ // public void sendDelayMessage(Object msg, Long expiration){
//
//绑定异步监听回调函数 // //绑定异步监听回调函数
rabbitTemplate.setConfirmCallback(confirmCallback); // rabbitTemplate.setConfirmCallback(confirmCallback);
//
rabbitTemplate.convertAndSend(RabbitMQConfig.GAME_STATUS_DELAY_SEND_EXCHANGE,RabbitMQConfig.GAME_STATUS_DELAY_SEND_ROUTING_KEY, msg,(message)->{ // rabbitTemplate.convertAndSend(RabbitMQConfig.GAME_STATUS_DELAY_SEND_EXCHANGE,RabbitMQConfig.GAME_STATUS_DELAY_SEND_ROUTING_KEY, msg,(message)->{
message.getMessageProperties().setHeader("x-delay", expiration);//设置延迟时间 // message.getMessageProperties().setHeader("x-delay", expiration);//设置延迟时间
return message; // return message;
}, new CorrelationData("123")); // }, new CorrelationData("123"));
} // }
} //}

Loading…
Cancel
Save