37 changed files with 394 additions and 127 deletions
@ -0,0 +1,44 @@ |
|||
package com.ccsens.ccmq.lowlevel.message.api; |
|||
|
|||
import cn.hutool.core.util.StrUtil; |
|||
import com.ccsens.ccmq.lowlevel.message.common.MessageConstant; |
|||
import com.rabbitmq.client.ConnectionFactory; |
|||
import io.swagger.annotations.Api; |
|||
import io.swagger.annotations.ApiOperation; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.web.bind.annotation.RequestMapping; |
|||
import org.springframework.web.bind.annotation.RequestMethod; |
|||
import org.springframework.web.bind.annotation.RestController; |
|||
import wiki.tall.ccmq.common.exception.BaseException; |
|||
import wiki.tall.ccmq.common.util.JsonResponse; |
|||
import wiki.tall.ccmq.common.util.MqUtil; |
|||
import wiki.tall.ccmq.common.util.WebConstant; |
|||
|
|||
import javax.annotation.Resource; |
|||
|
|||
/** |
|||
* 消息队列 |
|||
* @author: whj |
|||
* @time: 2020/6/4 14:46 |
|||
*/ |
|||
@Api(tags = "消息队列") |
|||
@RestController() |
|||
@RequestMapping("/queue") |
|||
@Slf4j |
|||
public class QueueController { |
|||
|
|||
@Resource |
|||
private ConnectionFactory connectionFactory; |
|||
|
|||
@ApiOperation(value = "添加消息队列") |
|||
@RequestMapping(value = "add", method = {RequestMethod.POST, RequestMethod.GET}) |
|||
public JsonResponse<MessageConstant> add(String queueName) { |
|||
log.info("添加消息队列:{}", queueName); |
|||
if (StrUtil.isEmpty(queueName)) { |
|||
throw new BaseException(MessageConstant.Error.QUEUE_NAME_EMPTY); |
|||
} |
|||
WebConstant.QueueAdd queueAdd = MqUtil.initQueue(connectionFactory, queueName); |
|||
log.info("消息队列{}添加结果:{}", queueName, queueAdd); |
|||
return JsonResponse.newInstance().ok(queueAdd); |
|||
} |
|||
} |
@ -0,0 +1,89 @@ |
|||
package wiki.tall.ccmq.common.util; |
|||
|
|||
import cn.hutool.core.collection.CollectionUtil; |
|||
import com.ccsens.ccmq.lowlevel.message.common.MessageConstant; |
|||
import com.rabbitmq.client.Channel; |
|||
import com.rabbitmq.client.Connection; |
|||
import com.rabbitmq.client.ConnectionFactory; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.amqp.core.AmqpTemplate; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.stereotype.Component; |
|||
import wiki.tall.ccmq.common.exception.BaseException; |
|||
|
|||
import javax.annotation.Resource; |
|||
import java.io.IOException; |
|||
import java.util.ArrayList; |
|||
import java.util.List; |
|||
import java.util.Set; |
|||
import java.util.concurrent.TimeoutException; |
|||
|
|||
/** |
|||
* @description: |
|||
* @author: whj |
|||
* @time: 2020/6/4 10:46 |
|||
*/ |
|||
@Slf4j |
|||
@Component |
|||
public class MqUtil { |
|||
|
|||
|
|||
/** |
|||
* 初始化某个消息队列 |
|||
* @param connectionFactory |
|||
* @param queueName |
|||
* @return 0:失败 1:成功 2:已存在 |
|||
*/ |
|||
public static WebConstant.QueueAdd initQueue(ConnectionFactory connectionFactory, String queueName) { |
|||
log.info("创建队列:{}", queueName); |
|||
if (RedisUtil.sHas(WebConstant.REDIS_QUEUE, queueName)) { |
|||
log.info("队列已在缓存中存在"); |
|||
return WebConstant.QueueAdd.EXISTED; |
|||
} |
|||
|
|||
Channel channel; |
|||
try { |
|||
channel = connectionFactory.newConnection().createChannel(); |
|||
} catch (Exception e) { |
|||
log.error("创建mq链接异常:{}", e); |
|||
return WebConstant.QueueAdd.FAIL; |
|||
} |
|||
try { |
|||
channel.queueDeclarePassive(queueName); |
|||
log.info("{}已存在", queueName); |
|||
RedisUtil.sSet(WebConstant.REDIS_QUEUE, queueName); |
|||
return WebConstant.QueueAdd.EXISTED; |
|||
} catch (IOException e) { |
|||
log.info("创建队列:{}", queueName); |
|||
try { |
|||
connectionFactory.newConnection().createChannel().queueDeclare(queueName, true, false, false, null); |
|||
} catch (Exception ex) { |
|||
ex.printStackTrace(); |
|||
log.error("创建队列时出现异常", e); |
|||
return WebConstant.QueueAdd.FAIL; |
|||
} |
|||
} |
|||
return WebConstant.QueueAdd.SUCCESS; |
|||
} |
|||
|
|||
/** |
|||
* 将缓存中的队列全部初始化 |
|||
* @return |
|||
*/ |
|||
public static String[] initAllQueue(ConnectionFactory connectionFactory){ |
|||
|
|||
Set<Object> queueNames = RedisUtil.sGet(WebConstant.REDIS_QUEUE); |
|||
if (CollectionUtil.isEmpty(queueNames)) { |
|||
log.info("没有需要创建的队列"); |
|||
return null; |
|||
} |
|||
queueNames.forEach(obj -> { |
|||
String queueName = (String)obj; |
|||
WebConstant.QueueAdd queueAdd = initQueue(connectionFactory, queueName); |
|||
if (queueAdd == WebConstant.QueueAdd.FAIL) { |
|||
throw new BaseException(MessageConstant.Error.QUEUE_NEW_FAIL); |
|||
} |
|||
}); |
|||
return (String[])queueNames.toArray(); |
|||
} |
|||
} |
Loading…
Reference in new issue