配置文件
rabbitMQ:
  host: rabbitmq.com
  port: 30000
  username: xxxx
  password: xxxxx
  topic: amq.topic
  queueName: icost-beta
  virtualHost: icost-beta
  #開啟發送確認機制,將來消息到達交換機以后有一個回調
  publisher-confirm-type: correlated
  #消息到達消息隊列回調(如果消息沒有成功到達隊列,會觸發回調方法)
  publisher-returns: true
  template:
    retry:
      enabled: true # 開啟重發機制
      initial-interval: 1000ms #間隔 1秒
      max-attempts: 6 #最多發6次
      multiplier: 1.2 #每次間隔 時間*1.2
      max-interval: 10000ms #每次最大間隔時間
  listener:
    simple:
      acknowledge-mode: manual
讀取配置
/** User name passed to the RabbitMQ connection factory; injected from {@code rabbitMQ.username}. */
@Value("${rabbitMQ.username}")
private String username;
/** Password passed to the RabbitMQ connection factory; injected from {@code rabbitMQ.password}. */
@Value("${rabbitMQ.password}")
private String password;
/** Broker host name; injected from {@code rabbitMQ.host}. */
@Value("${rabbitMQ.host}")
private String host;
/** Broker port; injected from {@code rabbitMQ.port}. */
@Value("${rabbitMQ.port}")
private int port;
/**
 * Name of the exchange that messages are published to and the queue is bound to;
 * injected from {@code rabbitMQ.topic}. The methods using it declare it with type "topic".
 */
@Value("${rabbitMQ.topic}")
private String topic;
// Name of the queue to declare and consume from; injected from rabbitMQ.queueName.
@Value("${rabbitMQ.queueName}")
private String queueName;
/** Virtual host used for the connection; injected from {@code rabbitMQ.virtualHost}. */
@Value("${rabbitMQ.virtualHost}")
private String virtualHost;
發送消息
/** * 發布信息 */ private void sendProducer (MQMessage mqMessage)throws IOException, TimeoutException{ Gson gson = new Gson(); // 發送同步消息 String messageStr = gson.toJson(mqMessage); //創建連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(username); factory.setPassword(password); //設置 RabbitMQ 地址 factory.setHost(host); factory.setPort(port); factory.setVirtualHost(virtualHost); //建立到代理服務器到連接 Connection conn = factory.newConnection(); //獲得信道 Channel channel = conn.createChannel(); //聲明交換機 channel.exchangeDeclare(topic, "topic",true); //消息持久化 channel.queueDeclare(queueName,true,false,false,null); String domainType =null; if("ContractCreate".equals(mqMessage.getDomainType())){ domainType = "Contract."+mqMessage.getDomainType(); }else if("ContractModification".equals(mqMessage.getDomainType())){ domainType = "Contract."+mqMessage.getDomainType(); }else if("ContractPayment".equals(mqMessage.getDomainType())){ domainType = "Contract."+mqMessage.getDomainType(); }else if("ContractSettlement".equals(mqMessage.getDomainType())){ domainType = "Contract."+mqMessage.getDomainType(); }else if("EngineerInstruction".equals(mqMessage.getDomainType())){ domainType = "Contract."+mqMessage.getDomainType(); }else if("JicaiContract".equals(mqMessage.getDomainType())){ domainType = "Contract."+mqMessage.getDomainType(); }else if("bid".equals(mqMessage.getDomainType())){ domainType = "bid."+mqMessage.getDomainType(); }else if("vendor".equals(mqMessage.getDomainType())){ domainType = "vendor."+mqMessage.getDomainType(); }else if("SubProject".equals(mqMessage.getDomainType())){ domainType = "SubProject."+mqMessage.getDomainType(); }else if("Project".equals(mqMessage.getDomainType())){ domainType = "SubProject."+mqMessage.getDomainType(); }else if("ContractTemplate".equals(mqMessage.getDomainType())){ domainType = "TargetCost."+mqMessage.getDomainType(); }else if("ContractPlanning".equals(mqMessage.getDomainType())){ domainType = "TargetCost."+mqMessage.getDomainType(); 
}else if("TargetCost".equals(mqMessage.getDomainType())){ domainType = "TargetCost."+mqMessage.getDomainType(); }else{ domainType = mqMessage.getDomainType(); } //發布消息==>使用動態路由(通配符方式) String key = domainType; //指定發布的路由key ContractCreate||ContractModification||ContractPayment||ContractSettlement||EngineerInstruction||JicaiContract channel.basicPublish(topic, key, true,null,messageStr.getBytes(StandardCharsets.UTF_8)); // 同步發送消息,只要不拋異常就是成功。 log.info(new Date() + " Send mq message success:--"+messageStr.toString()); channel.close(); conn.close(); }
接收消息
/** * 訂閱消息,處理業務 */ public void normalSubscribe() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(username); factory.setPassword(password); //設置 RabbitMQ 地址 factory.setHost(host); factory.setPort(port); factory.setVirtualHost(virtualHost); //建立到代理服務器到連接 Connection conn = factory.newConnection(); //獲得信道 final Channel channel = conn.createChannel(); //聲明交換機 channel.exchangeDeclare(topic, "topic",true); //綁定臨時隊列與交換機并設置獲取交換機中動態路由 String key = "bid.*";//使用通配符指定路由key channel.queueBind(queueName, topic, key); //消費消息 channel.basicConsume(queueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { //接收到的消息內容 String msg = new String(body); MQWorkflowMessageBody messageBody = objectMapper.readValue(msg, MQWorkflowMessageBody.class); if ("bid".equals(messageBody.getDomainType()) && StringUtils.isNotBlank(messageBody.getBusinessKey()) && StringUtils.isNotBlank(String.valueOf(messageBody.getStatus()))) { handler.processRequest(messageBody.getBusinessKey(), String.valueOf(messageBody.getStatus())); } } catch (Exception e) { e.printStackTrace(); } } }); }
浙公網安備 33010602011771號