熟悉activemq的初步試用
1.在服務(wù)器(阿里云ubuntu16.04)上安裝activemq,我是直接下載activemq:
wget http://archive.apache.org/dist/activemq/apache-activemq/5.6.0/apache-activemq-5.6.0-bin.tar.gz
2.解壓及安裝,可以通過(guò)activemq --help 查看一些命令及參數(shù)信息;
3.啟動(dòng)activemq;
4.編寫簡(jiǎn)單的java demo:
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by IntelliJ IDEA.
*
* @author
* Description:
* Date: 2017/11/26
* Time: 12:07
*/
public class Producter {
/**
* ActiveMq 的默認(rèn)用戶名
*/
private static final String USERNAME = "xx";
/**
* ActiveMq 的默認(rèn)登錄密碼
*/
private static final String PASSWORD = "xx";
/**
* ActiveMQ 的鏈接地址
*/
private static final String BROKEN_URL = "xx";
AtomicInteger count = new AtomicInteger(0);
/**
* 鏈接工廠
*/
ConnectionFactory connectionFactory;
/**
* 鏈接對(duì)象
*/
Connection connection;
/**
* 事務(wù)管理
*/
Session session;
ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();
public void init(){
try {
//創(chuàng)建一個(gè)鏈接工廠
// connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://xx");
//從工廠中創(chuàng)建一個(gè)鏈接
connection = connectionFactory.createConnection();
//開(kāi)啟鏈接
connection.start();
//創(chuàng)建一個(gè)事務(wù)(這里通過(guò)參數(shù)可以設(shè)置事務(wù)的級(jí)別)
session = connection.createSession(true,Session.SESSION_TRANSACTED);
} catch (JMSException e) {
e.printStackTrace();
}
}
public void sendMessage(String disname){
try {
//創(chuàng)建一個(gè)消息隊(duì)列
Queue queue = session.createQueue(disname);
//消息生產(chǎn)者
MessageProducer messageProducer = null;
if(threadLocal.get()!=null){
messageProducer = threadLocal.get();
}else{
messageProducer = session.createProducer(queue);
threadLocal.set(messageProducer);
}
while(true){
Thread.sleep(1000);
int num = count.getAndIncrement();
//創(chuàng)建一條消息
TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+
"productor:生產(chǎn)者!,count:"+num);
System.out.println(Thread.currentThread().getName()+
"productor:生產(chǎn)東西!,count:"+num);
//發(fā)送消息
messageProducer.send(msg);
//提交事務(wù)
session.commit();
}
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by IntelliJ IDEA.
*
* @author
* Description:
* Date: 2017/11/26
* Time: 12:08
*/
public class Comsumer {
/**
* ActiveMq 的默認(rèn)用戶名
*/
private static final String USERNAME = "xx";
/**
* ActiveMq 的默認(rèn)登錄密碼
*/
private static final String PASSWORD = "xx";
/**
* ActiveMQ 的鏈接地址
*/
private static final String BROKEN_URL = "xx";
ConnectionFactory connectionFactory;
Connection connection;
Session session;
ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();
AtomicInteger count = new AtomicInteger();
public void init(){
try {
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://xx");
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
} catch (JMSException e) {
e.printStackTrace();
}
}
public void getMessage(String disname){
try {
Queue queue = session.createQueue(disname);
MessageConsumer consumer = null;
if(threadLocal.get()!=null){
consumer = threadLocal.get();
}else{
consumer = session.createConsumer(queue);
threadLocal.set(consumer);
}
while(true){
Thread.sleep(1000);
TextMessage msg = (TextMessage) consumer.receive();
if(msg!=null) {
msg.acknowledge();
System.out.println(Thread.currentThread().getName()+": Consumer:我是消費(fèi)者,我正在消費(fèi)Msg"+msg.getText()+"--->"+count.getAndIncrement());
}else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
test:
/**
* Created by IntelliJ IDEA.
*
* @author
* Description:
* Date: 2017/11/26
* Time: 12:48
*/
public class TestConsumer {
public static void main(String[] args){
Comsumer comsumer = new Comsumer();
comsumer.init();
TestConsumer testConsumer = new TestConsumer();
new Thread(testConsumer.new ConsumerMq(comsumer)).start();
new Thread(testConsumer.new ConsumerMq(comsumer)).start();
new Thread(testConsumer.new ConsumerMq(comsumer)).start();
new Thread(testConsumer.new ConsumerMq(comsumer)).start();
}
private class ConsumerMq implements Runnable{
Comsumer comsumer;
public ConsumerMq(Comsumer comsumer){
this.comsumer = comsumer;
}
@Override
public void run() {
while(true){
try {
comsumer.getMessage("zq-MQ");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
/**
* Created by IntelliJ IDEA.
*
* @author
* Description:
* Date: 2017/11/26
* Time: 12:17
*/
public class TestMq {
public static void main(String[] args){
Producter producter = new Producter();
producter.init();
TestMq testMq = new TestMq();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//Thread 1
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 2
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 3
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 4
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 5
new Thread(testMq.new ProductorMq(producter)).start();
}
private class ProductorMq implements Runnable{
Producter producter;
public ProductorMq(Producter producter){
this.producter = producter;
}
@Override
public void run() {
while(true){
try {
producter.sendMessage("zq-MQ");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
以上僅僅是一個(gè)簡(jiǎn)單的demo,實(shí)際生產(chǎn)環(huán)境種使用,須要考慮使用場(chǎng)景,配置activemq的配置文件,以及與項(xiàng)目的整合問(wèn)題。

浙公網(wǎng)安備 33010602011771號(hào)