Browse Source

去掉mq延迟

master
zy_Java 5 years ago
parent
commit
9c330df9f8
  1. 4
      tall/src/main/resources/application-prod.yml
  2. 61
      util/src/main/java/com/ccsens/util/config/RabbitMQConfig.java
  3. 66
      util/src/main/java/com/ccsens/util/mq/DelayConsumer.java
  4. 108
      util/src/main/java/com/ccsens/util/mq/DelayProducer.java

4
tall/src/main/resources/application-prod.yml

@ -4,8 +4,8 @@ server:
context-path: /v1.0 context-path: /v1.0
spring: spring:
snowflake: snowflake:
datacenterId: 2 datacenterId: 1
workerId: 2 workerId: 1
application: application:
name: tall name: tall
datasource: datasource:

61
util/src/main/java/com/ccsens/util/config/RabbitMQConfig.java

@ -1,12 +1,9 @@
package com.ccsens.util.config; package com.ccsens.util.config;
import org.springframework.amqp.core.*; import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration @Configuration
public class RabbitMQConfig { public class RabbitMQConfig {
public static final String RabbitMQ_QUEUE_NAME = "Queue_Anyring"; public static final String RabbitMQ_QUEUE_NAME = "Queue_Anyring";
@ -34,34 +31,34 @@ public class RabbitMQConfig {
return new Queue(TALL_MESSAGE_2); return new Queue(TALL_MESSAGE_2);
} }
/** // /**
* 延时发送队列 // * 延时发送队列
* @return // * @return
*/ // */
@Bean // @Bean
public Queue delayQueue(){ // public Queue delayQueue(){
// return QueueBuilder.durable(GAME_STATUS_DELAY_SEND_QUEUE).build(); //// return QueueBuilder.durable(GAME_STATUS_DELAY_SEND_QUEUE).build();
return new Queue(GAME_STATUS_DELAY_SEND_QUEUE,true); // return new Queue(GAME_STATUS_DELAY_SEND_QUEUE,true);
} // }
//
/** // /**
* 延时交换机 // * 延时交换机
* @return // * @return
*/ // */
@Bean // @Bean
public CustomExchange delayExchange(){ // public CustomExchange delayExchange(){
Map<String, Object> args = new HashMap<>(); // Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct"); // args.put("x-delayed-type", "direct");
return new CustomExchange(GAME_STATUS_DELAY_SEND_EXCHANGE, "x-delayed-message", true, false, args); // return new CustomExchange(GAME_STATUS_DELAY_SEND_EXCHANGE, "x-delayed-message", true, false, args);
} // }
//
/** // /**
* 绑定 // * 绑定
*/ // */
@Bean // @Bean
Binding queueBinding(Queue delayQueue, CustomExchange customExchange) { // Binding queueBinding(Queue delayQueue, CustomExchange customExchange) {
return BindingBuilder.bind(delayQueue).to(customExchange).with(GAME_STATUS_DELAY_SEND_ROUTING_KEY).noargs(); // return BindingBuilder.bind(delayQueue).to(customExchange).with(GAME_STATUS_DELAY_SEND_ROUTING_KEY).noargs();
} // }
} }

66
util/src/main/java/com/ccsens/util/mq/DelayConsumer.java

@ -1,33 +1,33 @@
package com.ccsens.util.mq; //package com.ccsens.util.mq;
//
import com.ccsens.util.config.RabbitMQConfig; //import com.ccsens.util.config.RabbitMQConfig;
import com.rabbitmq.client.Channel; //import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler; //import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener; //import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders; //import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers; //import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload; //import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
//
import java.io.IOException; //import java.io.IOException;
import java.util.Date; //import java.util.Date;
import java.util.Map; //import java.util.Map;
//
/** ///**
* @description: // * @description:
* @author: wuHuiJuan // * @author: wuHuiJuan
* @create: 2019/12/27 15:40 // * @create: 2019/12/27 15:40
*/ // */
@Component //@Component
public class DelayConsumer { //public class DelayConsumer {
@RabbitListener(queues = RabbitMQConfig.GAME_STATUS_DELAY_SEND_QUEUE) // @RabbitListener(queues = RabbitMQConfig.GAME_STATUS_DELAY_SEND_QUEUE)
@RabbitHandler // @RabbitHandler
public void receive(@Payload Object msg, @Headers Map<String, Object> headers, Channel channel ) throws IOException { // public void receive(@Payload Object msg, @Headers Map<String, Object> headers, Channel channel ) throws IOException {
System.out.println("接收到的消息:"+msg +"||" + new Date()); // System.out.println("接收到的消息:"+msg +"||" + new Date());
//
//ACK 手工签收,通知rabbitMQ,消费端消费成功 // //ACK 手工签收,通知rabbitMQ,消费端消费成功
Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG); // Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag,false); // channel.basicAck(deliveryTag,false);
} // }
//
} //}

108
util/src/main/java/com/ccsens/util/mq/DelayProducer.java

@ -1,54 +1,54 @@
package com.ccsens.util.mq; //package com.ccsens.util.mq;
//
import com.ccsens.util.config.RabbitMQConfig; //import com.ccsens.util.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j; //import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData; //import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate; //import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
//
/** ///**
* @description: // * @description:
* @author: wuHuiJuan // * @author: wuHuiJuan
* @create: 2019/12/27 15:16 // * @create: 2019/12/27 15:16
*/ // */
@Slf4j //@Slf4j
@Component //@Component
public class DelayProducer { //public class DelayProducer {
@Autowired // @Autowired
private RabbitTemplate rabbitTemplate; // private RabbitTemplate rabbitTemplate;
//
//消息发送后的回调函数 // //消息发送后的回调函数
/** // /**
* 生产者回调函数confirm确认消息投递成功 // * 生产者回调函数:confirm确认消息投递成功
*/ // */
final RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> { // final RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> {
//
String messageId = correlationData.getId(); // String messageId = correlationData.getId();
if (ack) { // if (ack) {
System.out.println("消息发送成功" + correlationData); // System.out.println("消息发送成功" + correlationData);
log.info("消息投递成功,{}",messageId); // log.info("消息投递成功,{}",messageId);
//进行消息记录的数据库更新 // //进行消息记录的数据库更新
//
}else{ // }else{
log.info("消息投递失败"); // log.info("消息投递失败");
} // }
//
}; // };
//
/** // /**
* 通过延迟消息插件发动延迟消息 // * 通过延迟消息插件发动延迟消息
* @param msg // * @param msg
* @param expiration // * @param expiration
*/ // */
public void sendDelayMessage(Object msg, Long expiration){ // public void sendDelayMessage(Object msg, Long expiration){
//
//绑定异步监听回调函数 // //绑定异步监听回调函数
rabbitTemplate.setConfirmCallback(confirmCallback); // rabbitTemplate.setConfirmCallback(confirmCallback);
//
rabbitTemplate.convertAndSend(RabbitMQConfig.GAME_STATUS_DELAY_SEND_EXCHANGE,RabbitMQConfig.GAME_STATUS_DELAY_SEND_ROUTING_KEY, msg,(message)->{ // rabbitTemplate.convertAndSend(RabbitMQConfig.GAME_STATUS_DELAY_SEND_EXCHANGE,RabbitMQConfig.GAME_STATUS_DELAY_SEND_ROUTING_KEY, msg,(message)->{
message.getMessageProperties().setHeader("x-delay", expiration);//设置延迟时间 // message.getMessageProperties().setHeader("x-delay", expiration);//设置延迟时间
return message; // return message;
}, new CorrelationData("123")); // }, new CorrelationData("123"));
} // }
} //}

Loading…
Cancel
Save