Browse Source

改变消息状态

ptos
zy_Java 3 years ago
parent
commit
33fa3420ac
  1. 12
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/MessageHandler.java
  2. 1
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/common/MessageConstant.java
  3. 2
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/persist/MessageDao.java
  4. 4
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/service/IUserService.java
  5. 26
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/service/UserService.java
  6. 4
      ccmq/src/main/java/wiki/tall/ccmq/common/config/SpringConfig.java
  7. 34
      ccmq/src/main/java/wiki/tall/ccmq/common/controller/web/DebugController.java
  8. 41
      ccmq/src/main/java/wiki/tall/ccmq/common/controller/web/MessageController.java
  9. 2
      ccmq/src/main/resources/application-prod.properties
  10. 2
      ccmq/src/main/resources/logback-spring.xml

12
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/MessageHandler.java

@ -371,17 +371,23 @@ public class MessageHandler {
new ServerAckMessage(MessageConstant.Error.AckParameterError))
);
}
//没有错误不回复消息
break;
}
case SetMsgSuccess: {
logger.info("消息处理成功:{}",data);
boolean setStatusSuccess = false;
SetSuccessStatusMessage inSysData = JacksonUtil.jsonToBean(data, SetSuccessStatusMessage.class);
WrapperedChannel wrapperedChannel = ChannelManager.getWrapperedChannelByChannel(ChannelManager.getCurrentChannel());
inSysData.setMessage(wrapperedChannel.getMessage());
// WrapperedChannel wrapperedChannel = ChannelManager.getWrapperedChannelByChannel(ChannelManager.getCurrentChannel());
// inSysData.setMessage(wrapperedChannel.getMessage());
if(null != inSysData.getData() && StrUtil.isNotEmpty(inSysData.getData().getMsgId())){
updateMessageStatus(inSysData.getData().getMsgId(), MessageConstant.Status.Succeed);
getMessageDao().updateMessageStatus(inSysData.getData().getMsgId(), MessageConstant.Status.Succeed);
//TODO 没有rawId
// updateMessageStatus(inSysData.getData().getMsgId(), MessageConstant.Status.Succeed);
setStatusSuccess = true;
logger.info("修改消息状态:{}",inSysData.getData().getMsgId());
}
if(!setStatusSuccess){
outMessage = new OutMessage(JacksonUtil.beanToJson(

1
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/common/MessageConstant.java

@ -204,6 +204,7 @@ public class MessageConstant {
/**
* (UserQueueRestServer)
*
*/
public enum DomainType{
//Netty Client

2
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/persist/MessageDao.java

@ -160,7 +160,7 @@ public class MessageDao implements IMessageDao {
//状态为pending
.addCriteria(Criteria.where("status").is(MessageConstant.Status.Pending))
.addCriteria(new Criteria().andOperator(
//未过期消息
//未过期消息
new Criteria().orOperator(
Criteria.where("rule.expireAt").is(0),
Criteria.where("rule.expireAt").gt(DateUtil.currentSeconds()

4
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/service/IUserService.java

@ -2,8 +2,12 @@ package com.ccsens.ccmq.lowlevel.service;
import org.springframework.web.bind.annotation.RequestParam;
import java.util.List;
import java.util.Set;
//@FeignClient(value = "eureka-client-btpro",fallback = TokenSer.class)
public interface IUserService {
//@GetMapping(value="/btpro/v1.0/token")
String getUserIdByToken(@RequestParam(value = "token") String token);
}

26
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/service/UserService.java

@ -5,6 +5,7 @@ import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
@ -17,10 +18,15 @@ import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import wiki.tall.ccmq.common.exception.BaseException;
import com.alibaba.fastjson.JSONObject;
import wiki.tall.ccmq.common.util.JsonResponse;
import wiki.tall.ccmq.common.util.WebConstant;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* @author wei
@ -65,9 +71,9 @@ public class UserService implements IUserService{
// }
UserInfoByToken userInfoByToken = new UserInfoByToken();
userInfoByToken.setToken(token);
userInfoByToken.setToken(WebConstant.HEADER_KEY_TOKEN_PREFIX + token);
String url = "https://localhost:7290/users/token";
String url = "http://localhost:7290/users/token";
HttpHeaders httpHeaders = new HttpHeaders();
MediaType type=MediaType.parseMediaType("application/json;charset=UTF-8");
@ -86,15 +92,25 @@ public class UserService implements IUserService{
String strBody = null;
if(response.getStatusCodeValue() == 200){
strBody = response.getBody();
System.out.println("认证接口返回:" + strBody);
}
if(StrUtil.isNotEmpty(strBody)) {
JSONObject jsonObject = JSONObject.parseObject(strBody);
//请求正确返回则,否则无操作
Integer code = jsonObject.getInteger("code");
if (code == null || code != 200) {
throw new BaseException("返回参数异常");
}
String data = jsonObject.getString("data");
if(ObjectUtil.isNotNull(data)){
try {
tokenToUserId = JSONObject.parseObject(strBody,TokenToUserId.class);
}catch (Exception e) {
tokenToUserId = JSON.parseObject(data, TokenToUserId.class);
}catch (Exception e){
throw new BaseException("返回参数异常");
}
}
if(ObjectUtil.isNotNull(tokenToUserId)){
System.out.println("返回的用户信息:" + tokenToUserId);
userId = tokenToUserId.getId().toString();
}
return userId;

4
ccmq/src/main/java/wiki/tall/ccmq/common/config/SpringConfig.java

@ -117,8 +117,8 @@ public class SpringConfig implements WebMvcConfigurer {
*/
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(tokenInterceptor())
.addPathPatterns("/message/**");
// registry.addInterceptor(tokenInterceptor())
// .addPathPatterns("/message/**");
}
@Bean

34
ccmq/src/main/java/wiki/tall/ccmq/common/controller/web/DebugController.java

@ -0,0 +1,34 @@
package wiki.tall.ccmq.common.controller.web;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import wiki.tall.ccmq.common.util.JsonResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@Api(tags = "DEBUG" , description = "DebugController | ")
@RestController
@RequestMapping("/debug")
public class DebugController {
@ApiOperation(value = "/测试",notes = "")
@ApiImplicitParams({
})
@RequestMapping(value="",method = RequestMethod.GET,produces = {"application/json;charset=UTF-8"})
public JsonResponse debug(HttpServletRequest request) throws Exception {
return JsonResponse.newInstance().ok("测试");
}
}

41
ccmq/src/main/java/wiki/tall/ccmq/common/controller/web/MessageController.java

@ -1,15 +1,35 @@
package wiki.tall.ccmq.common.controller.web;
import com.ccsens.ccmq.lowlevel.message.MessageHandler;
import com.ccsens.ccmq.lowlevel.message.common.InMessage;
import com.ccsens.ccmq.lowlevel.message.common.MessageConstant;
import com.ccsens.ccmq.lowlevel.message.common.MessageRule;
import com.ccsens.ccmq.lowlevel.service.IUserService;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import lombok.extern.slf4j.Slf4j;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import wiki.tall.ccmq.common.util.JsonResponse;
import javax.annotation.Resource;
import java.util.Set;
/**
* @author wei
*/
@RestController
@RequestMapping("/message")
@Slf4j
public class MessageController {
@Resource
private IUserService userService;
// @RequestMapping(value = "/nodeMsg",method = RequestMethod.GET,produces = {"application/json;charset=UTF-8"})
// public JsonResponse getNodeMessage(HttpServletRequest request,Long taskId) throws Exception {
// Long currentUserId = Long.valueOf(((Claims) request.getAttribute(WebConstant.REQUEST_KEY_CLAIMS)).getSubject());
@ -22,5 +42,26 @@ public class MessageController {
// Long currentUserId = Long.valueOf(((Claims) request.getAttribute(WebConstant.REQUEST_KEY_CLAIMS)).getSubject());
// List<BaseMessageDto> messages = messageService.getUserMessages(currentUserId,projectId,offset,limit);
// return JsonResponse.newInstance().ok(messages);
// }
@ApiOperation(value = "/发送消息",notes = "")
@ApiImplicitParams({
})
@RequestMapping(value="send",method = RequestMethod.POST,produces = {"application/json;charset=UTF-8"})
public JsonResponse sendMessage(@ApiParam @Validated @RequestBody InMessage message) throws Exception {
log.info("服务调用接口发送消息:{}",message);
message.setFromDomain(MessageConstant.DomainType.Rest);
MessageHandler.handleMessage(message);
return JsonResponse.newInstance().ok();
}
// @ApiOperation(value = "/处理消息",notes = "")
// @ApiImplicitParams({
// })
// @RequestMapping(value="",method = RequestMethod.POST,produces = {"application/json;charset=UTF-8"})
// public JsonResponse disposeMessage(@ApiParam @Validated @RequestBody InMessage message) throws Exception {
//
// return JsonResponse.newInstance().ok();
// }
}

2
ccmq/src/main/resources/application-prod.properties

@ -23,6 +23,7 @@ spring.redis.jedis.pool.max-idle=10
spring.redis.jedis.pool.min-idle=0
# RabbitMQ配置信息
#spring.rabbitmq.host=49.233.89.188
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
@ -30,6 +31,7 @@ spring.rabbitmq.password=111111
# spring.rabbitmq.listener.simple.acknowledge-mode=manual
# MongoDB配置信息
#spring.data.mongodb.uri=mongodb://wei:111111@49.233.89.188:27017/test?maxPoolSize=1000
spring.data.mongodb.uri=mongodb://wei:111111@127.0.0.1:27017/test?maxPoolSize=1000
# 最大连接数,每一台服务器

2
ccmq/src/main/resources/logback-spring.xml

@ -12,7 +12,7 @@
</springProfile>
<springProfile name="prod">
<property name="log.level.root" value="INFO"/>
<property name="log.path" value="/home/cloud/tall-message/${log.dir}"/>
<property name="log.path" value="/home/ptos_message/server/${log.dir}"/>
</springProfile>
<!--<property name="log.name.my" value="log-my.log" />-->
<property name="log.name.debug" value="log-debug.log" />

Loading…
Cancel
Save