Browse Source

生ä延迟发送

master
6 years ago
parent
commit
a4aa078c42
  1. 10
      game/src/main/java/com/ccsens/game/service/ClientService.java
  2. 96
      game/src/main/java/com/ccsens/game/service/ScreenService.java
  3. 2
      game/src/main/resources/application-dev.yml
  4. 4
      game/src/main/resources/application.yml
  5. 53
      util/src/main/java/com/ccsens/util/config/RabbitMQConfig.java
  6. 62
      util/src/main/java/com/ccsens/util/mq/DelayConsumer.java
  7. 107
      util/src/main/java/com/ccsens/util/mq/DelayProducer.java

10
game/src/main/java/com/ccsens/game/service/ClientService.java

@ -17,7 +17,7 @@ import com.ccsens.util.CodeEnum;
import com.ccsens.util.JsonResponse;
import com.ccsens.util.RedisUtil;
import com.ccsens.util.exception.BaseException;
//import com.ccsens.util.mq.DelayProducer;
import com.ccsens.util.mq.DelayProducer;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import lombok.extern.slf4j.Slf4j;
@ -51,8 +51,8 @@ public class ClientService implements IClientService {
private RedisUtil redisUtil;
@Autowired
private TallFeignClient tallFeignClient;
// @Autowired
// private DelayProducer delaySender;
@Autowired
private DelayProducer delaySender;
@Override
public ClientVo.Join join(ClientDto.Join join, Long userId) {
@ -122,14 +122,14 @@ public class ClientService implements IClientService {
//延时发送开始
long expiration = gameRecord.getStartTime() - System.currentTimeMillis();
//TODO 发送内容未定
// delaySender.sendDelayMessage("开始啦", expiration);
delaySender.sendDelayMessage("开始啦", expiration > 0 ? expiration : 0);
}
if (prepare || processing) {
//延时发送结束
long expiration = gameRecord.getEndTime() - System.currentTimeMillis();
//TODO 发送内容未定
// delaySender.sendDelayMessage("结束了", expiration);
delaySender.sendDelayMessage("结束了", expiration > 0 ? expiration : 0);
}

96
game/src/main/java/com/ccsens/game/service/ScreenService.java

@ -3,6 +3,7 @@ package com.ccsens.game.service;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.lang.Snowflake;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSONObject;
import com.ccsens.game.bean.dto.ScreenDto;
import com.ccsens.game.bean.po.*;
import com.ccsens.game.bean.vo.ScreenVo;
@ -11,24 +12,27 @@ import com.ccsens.game.util.GameConstant;
import com.ccsens.util.CodeEnum;
import com.ccsens.util.WebConstant;
import com.ccsens.util.bean.dto.QueryDto;
import com.ccsens.util.config.RabbitMQConfig;
import com.ccsens.util.exception.BaseException;
//import com.ccsens.util.mq.DelayProducer;
import com.ccsens.util.mq.DelayProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.*;
import java.util.concurrent.*;
@Slf4j
@Service
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public class ScreenService implements IScreenService{
@Autowired
private GameUserPayDao typeMemberDao;
private GameUserPayDao gameUserPayDao;
@Autowired
private GameTypeDao gameTypeDao;
@Autowired
@ -40,12 +44,16 @@ public class ScreenService implements IScreenService{
@Autowired
private GamePrizeInstructionsDao prizeInstructionsDao;
@Autowired
private GameUserJoinDao gameMemberJoinDao;
// @Autowired
// private DelayProducer delayProducer;
private GameUserJoinDao gameUserJoinDao;
@Autowired
private DelayProducer delayProducer;
@Autowired
private DelayProducer delayProducer2;
@Autowired
private Snowflake snowflake;
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 传入用户信息返回游戏大屏路径
@ -69,7 +77,7 @@ public class ScreenService implements IScreenService{
GameUserPay gameUserPay = null;
GameUserPayExample gameUserPayExample = new GameUserPayExample();
gameUserPayExample.createCriteria().andUserIdEqualTo(params.getUserId()).andGameTypeIdEqualTo(gameType.getId());
List<GameUserPay> gameUserPayList = typeMemberDao.selectByExample(gameUserPayExample);
List<GameUserPay> gameUserPayList = gameUserPayDao.selectByExample(gameUserPayExample);
if(CollectionUtil.isNotEmpty(gameUserPayList)){
gameUserPay = gameUserPayList.get(0);
}else {
@ -81,7 +89,7 @@ public class ScreenService implements IScreenService{
gameUserPay.setUsedCount(0);
gameUserPay.setCreatedTime(System.currentTimeMillis());
gameUserPay.setDueTime(gameUserPay.getCreatedTime() + (3600*24*30));
typeMemberDao.insertSelective(gameUserPay);
gameUserPayDao.insertSelective(gameUserPay);
}
//3、根据用户购买的记录,添加一场新的游戏记录
@ -115,7 +123,7 @@ public class ScreenService implements IScreenService{
if(ObjectUtil.isNull(gameRecord)){
throw new BaseException(CodeEnum.NOT_GAME_RECORD);
}
GameUserPay gameUserPay = typeMemberDao.selectByPrimaryKey(gameRecord.getUserPayId());
GameUserPay gameUserPay = gameUserPayDao.selectByPrimaryKey(gameRecord.getUserPayId());
if(ObjectUtil.isNull(gameUserPay)){
throw new BaseException(CodeEnum.NOT_GAME_TYPE);
}
@ -127,7 +135,7 @@ public class ScreenService implements IScreenService{
//查询参加这个游戏的用户
GameUserJoinExample gameuserJoinExample = new GameUserJoinExample();
gameuserJoinExample.createCriteria().andRecordIdEqualTo(memberRecord.getMemberRecord());
List<GameUserJoin> userJoinList = gameMemberJoinDao.selectByExample(gameuserJoinExample);
List<GameUserJoin> userJoinList = gameUserJoinDao.selectByExample(gameuserJoinExample);
if(CollectionUtil.isNotEmpty(userJoinList)){
gameInfoVo.setTotalMembers(userJoinList.size());
}else {
@ -197,7 +205,7 @@ public class ScreenService implements IScreenService{
}
GameUserJoinExample gameuserJoinExample = new GameUserJoinExample();
gameuserJoinExample.createCriteria().andRecordIdEqualTo(memberRecord.getMemberRecord());
List<GameUserJoin> userJoinList = gameMemberJoinDao.selectByExample(gameuserJoinExample);
List<GameUserJoin> userJoinList = gameUserJoinDao.selectByExample(gameuserJoinExample);
gameStatusVo.setGameStatus(gameRecord.getGameStatus());
switch (gameStatusVo.getGameStatus()){
@ -249,7 +257,7 @@ public class ScreenService implements IScreenService{
if(ObjectUtil.isNull(gameRecord)){
throw new BaseException(CodeEnum.NOT_GAME_RECORD);
}
GameUserPay gameUserPay = typeMemberDao.selectByPrimaryKey(gameRecord.getUserPayId());
GameUserPay gameUserPay = gameUserPayDao.selectByPrimaryKey(gameRecord.getUserPayId());
if(ObjectUtil.isNull(gameUserPay)){
throw new BaseException(CodeEnum.NOT_GAME_TYPE);
}
@ -266,7 +274,7 @@ public class ScreenService implements IScreenService{
gameRecordDao.insertSelective(gameRecordNew);
//修改购买的游戏的使用次数
gameUserPay.setUsedCount(gameUserPay.getUsedCount() + 1);
typeMemberDao.updateByPrimaryKeySelective(gameUserPay);
gameUserPayDao.updateByPrimaryKeySelective(gameUserPay);
return gameRecordNew.getUrl();
}
@ -368,17 +376,73 @@ public class ScreenService implements IScreenService{
gameRecord.setGameStatus(GameConstant.GAME_PREPARATION);
gameRecord.setStartTime(current + GameConstant.COUNT_DOWN_TIME);
gameRecord.setEndTime(gameRecord.getStartTime() + GameConstant.GAME_TIME);
gameRecord.setTimeDifference((int)moreTime);
gameRecordDao.updateByPrimaryKeySelective(gameRecord);
//TODO
//延时通知
// delayProducer.sendDelayMessage("修改游戏开始状态",1L);
// delayProducer.sendDelayMessage("修改游戏结束状态",1L);
long startSend = gameRecord.getStartTime() - System.currentTimeMillis();
long endSend = gameRecord.getEndTime() - System.currentTimeMillis();
System.out.println("开始延时:" + startSend + ",结束延时:" + endSend);
System.out.println("发送开始时间" + new Date());
// delayProducer.sendDelayMessage(json.toJSONString(), startSend > 0 ? startSend : 0);
// JSONObject json2 = new JSONObject();
// json2.put("name", "修改游戏结束状态");
// delayProducer2.sendDelayMessage(json2.toJSONString(), endSend > 0 ? endSend : 0);
ScheduledExecutorService executor = Executors.newScheduledThreadPool(20);
Runnable startRunble = new Runnable() {
@Override
public void run() {
//更新状态
gameRecord.setGameStatus(GameConstant.GAME_PREPARATION);
gameRecordDao.updateByPrimaryKeySelective(gameRecord);
//发送消息
}
};
sendMsg(executor, endSend, startRunble);
Runnable endRunble = new Runnable() {
@Override
public void run() {
//更新状态
gameRecord.setGameStatus(GameConstant.GAME_PREPARATION);
gameRecordDao.updateByPrimaryKeySelective(gameRecord);
//发送消息
}
};
sendMsg(executor, endSend, endRunble);
//查询游戏用户,通知游戏开始和结束
GameUserJoinExample joinExample = new GameUserJoinExample();
joinExample.createCriteria().andRecordIdEqualTo(gameRecord.getId());
// List<GameUserJoin> userJoins = gameUserJoinDao.selectByExample(joinExample);
// userJoins.forEach(join -> {
// // 游戏开始
// long clientStartSend = gameRecord.getStartTime() - System.currentTimeMillis();
// long clientEndSend = gameRecord.getEndTime() - System.currentTimeMillis();
// log.info("{}:{}--{}", join.getUserId(), clientStartSend, clientEndSend);
// json.put("name", "开始通知客户开始"+ join.getUserId());
// delayProducer.sendDelayMessage(json, clientStartSend > 0 ? clientStartSend : 0);
// json.put("name", "开始通知客户结束"+ join.getUserId());
// delayProducer.sendDelayMessage(json , clientEndSend > 0 ? clientEndSend : 0);
//
// });
//返回大屏开始时间
ScreenVo.StartGame startGame = new ScreenVo.StartGame();
startGame.setStartLocalTime(gameRecord.getStartTime() + moreTime);
return startGame;
}
/**
* 定时任务
* @param executor
* @param delayTime
* @param runnable
*/
private void sendMsg(ScheduledExecutorService executor, long delayTime, Runnable runnable) {
executor.schedule(new Thread(runnable), delayTime, TimeUnit.MILLISECONDS);
}
}

2
game/src/main/resources/application-dev.yml

@ -8,7 +8,7 @@ spring:
datasource:
type: com.alibaba.druid.pool.DruidDataSource
rabbitmq:
host: api.ccsens.com
host: 49.233.89.188
password: 111111
port: 5672
username: admin

4
game/src/main/resources/application.yml

@ -1,4 +1,4 @@
spring:
profiles:
active: test
include: common, util-test
active: dev
include: common, util-dev

53
util/src/main/java/com/ccsens/util/config/RabbitMQConfig.java

@ -29,33 +29,34 @@ public class RabbitMQConfig {
return new Queue(TALL_MESSAGE_2);
}
// /**
// * 延时发送队列
// * @return
// */
// @Bean
// public Queue delayQueue(){
/**
* 延时发送队列
* @return
*/
@Bean
public Queue delayQueue(){
// return QueueBuilder.durable(GAME_STATUS_DELAY_SEND_QUEUE).build();
// }
//
// /**
// * 延时交换机
// * @return
// */
// @Bean
// public CustomExchange delayExchange(){
// Map<String, Object> args = new HashMap<>();
// args.put("x-delayed-type", "direct");
// return new CustomExchange(GAME_STATUS_DELAY_SEND_EXCHANGE, "x-delayed-message", true, false, args);
// }
//
// /**
// * 绑定
// */
// @Bean
// Binding queueBinding(Queue delayQueue, CustomExchange customExchange) {
// return BindingBuilder.bind(delayQueue).to(customExchange).with(GAME_STATUS_DELAY_SEND_ROUTING_KEY).noargs();
// }
return new Queue(GAME_STATUS_DELAY_SEND_QUEUE,true);
}
/**
* 延时交换机
* @return
*/
@Bean
public CustomExchange delayExchange(){
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(GAME_STATUS_DELAY_SEND_EXCHANGE, "x-delayed-message", true, false, args);
}
/**
* 绑定
*/
@Bean
Binding queueBinding(Queue delayQueue, CustomExchange customExchange) {
return BindingBuilder.bind(delayQueue).to(customExchange).with(GAME_STATUS_DELAY_SEND_ROUTING_KEY).noargs();
}
}

62
util/src/main/java/com/ccsens/util/mq/DelayConsumer.java

@ -1,29 +1,33 @@
//package com.ccsens.util.mq;
//
//import cn.hutool.core.date.DateUtil;
//import com.ccsens.util.config.RabbitMQConfig;
//import com.rabbitmq.client.AMQP;
//import org.springframework.amqp.rabbit.annotation.RabbitHandler;
//import org.springframework.amqp.rabbit.annotation.RabbitListener;
//import org.springframework.amqp.support.AmqpHeaders;
//import org.springframework.messaging.handler.annotation.Headers;
//import org.springframework.messaging.handler.annotation.Payload;
//import org.springframework.stereotype.Component;
//
//import java.util.Date;
//import java.util.Map;
//
///**
// * @description:
// * @author: wuHuiJuan
// * @create: 2019/12/27 15:40
// */
//@Component
//public class DelayConsumer {
// @RabbitListener(queues = RabbitMQConfig.GAME_STATUS_DELAY_SEND_QUEUE)
// @RabbitHandler
// public void receive(@Payload String msg, @Headers Map<String, Object> headers, AMQP.Channel channel ) {
// System.out.println("接收到的消息:"+msg +"||" + new Date());
// }
//
//}
package com.ccsens.util.mq;
import com.ccsens.util.config.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
import java.util.Map;
/**
* @description:
* @author: wuHuiJuan
* @create: 2019/12/27 15:40
*/
@Component
public class DelayConsumer {
@RabbitListener(queues = RabbitMQConfig.GAME_STATUS_DELAY_SEND_QUEUE)
@RabbitHandler
public void receive(@Payload Object msg, @Headers Map<String, Object> headers, Channel channel ) throws IOException {
System.out.println("接收到的消息:"+msg +"||" + new Date());
//ACK 手工签收,通知rabbitMQ,消费端消费成功
Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag,false);
}
}

107
util/src/main/java/com/ccsens/util/mq/DelayProducer.java

@ -1,53 +1,54 @@
//package com.ccsens.util.mq;
//
//import com.ccsens.util.config.RabbitMQConfig;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.amqp.rabbit.connection.CorrelationData;
//import org.springframework.amqp.rabbit.core.RabbitTemplate;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.stereotype.Component;
//
///**
// * @description:
// * @author: wuHuiJuan
// * @create: 2019/12/27 15:16
// */
//@Slf4j
//@Component
//public class DelayProducer {
// @Autowired
// private RabbitTemplate rabbitTemplate;
//
// //消息发送后的回调函数
// /**
// * 生产者回调函数:confirm确认消息投递成功
// */
// final RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> {
//
// String messageId = correlationData.getId();
// if (ack) {
// log.info("消息投递成功,{}",messageId);
// //进行消息记录的数据库更新
//
// }else{
// log.info("消息投递失败");
// }
//
// };
//
// /**
// * 通过延迟消息插件发动延迟消息
// * @param msg
// * @param expiration
// */
// public void sendDelayMessage(Object msg, Long expiration){
//
// //绑定异步监听回调函数
// rabbitTemplate.setConfirmCallback(confirmCallback);
//
// rabbitTemplate.convertAndSend(RabbitMQConfig.GAME_STATUS_DELAY_SEND_EXCHANGE,RabbitMQConfig.GAME_STATUS_DELAY_SEND_ROUTING_KEY, msg,(message)->{
// message.getMessageProperties().setHeader("x-delay", expiration);//设置延迟时间
// return message;
// },new CorrelationData("123"));
// }
//}
package com.ccsens.util.mq;
import com.ccsens.util.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @description:
* @author: wuHuiJuan
* @create: 2019/12/27 15:16
*/
@Slf4j
@Component
public class DelayProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
//消息发送后的回调函数
/**
* 生产者回调函数confirm确认消息投递成功
*/
final RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> {
String messageId = correlationData.getId();
if (ack) {
System.out.println("消息发送成功" + correlationData);
log.info("消息投递成功,{}",messageId);
//进行消息记录的数据库更新
}else{
log.info("消息投递失败");
}
};
/**
* 通过延迟消息插件发动延迟消息
* @param msg
* @param expiration
*/
public void sendDelayMessage(Object msg, Long expiration){
//绑定异步监听回调函数
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.convertAndSend(RabbitMQConfig.GAME_STATUS_DELAY_SEND_EXCHANGE,RabbitMQConfig.GAME_STATUS_DELAY_SEND_ROUTING_KEY, msg,(message)->{
message.getMessageProperties().setHeader("x-delay", expiration);//设置延迟时间
return message;
}, new CorrelationData("123"));
}
}

Loading…
Cancel
Save