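First, a disclaimer: I have never used ActiveMQ. I have only used the JMS implementation in WebLogic.
But since you asked for me by name, I took a quick look.
I assume you are using it as a JMS service, so this is as far as I got: http://activemq.apache.org/how-do-i-get-started-with-jms.html ->
http://java.sun.com/j2ee/1.4/docs/tutorial/doc/JMS.html#wp84181
I rarely dig into things I do not use, sorry! I cannot help you any further for now. Try searching Google/Baidu, or have a look on CSDN.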
If your English is good, read them yourself. I am sure what you need is in there.
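MessageProducer producer = session.createProducer(destination);
// setDeliveryMode takes an int constant, not a String:
// DeliveryMode.PERSISTENT or DeliveryMode.NON_PERSISTENT (pick the one you need)
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// time to live in milliseconds; 0 (the default) means the message never expires
producer.setTimeToLive(0);
producer.send(message);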
1. Props class (reads the properties file; singleton)
package com.sitinspring.standardWeblogicJms;

import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Hashtable;
import java.util.Properties;

import javax.naming.Context;
import javax.naming.InitialContext;

public class Props {
    private static final String File_Name = "jmsSetting.properties";

    private static Properties propts;

    // Loads the properties file; the stream is closed automatically.
    public static void makeProptsInstance() {
        propts = new Properties();

        try (InputStream in = new FileInputStream(File_Name)) {
            propts.load(in);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    // Returns the value for the given key, loading the file on first use.
    public static String get(String name) {
        if (propts == null) {
            makeProptsInstance();
        }
        return propts.getProperty(name);
    }

    // Builds the JNDI context from jndi.factory and jndi.provider.url.
    // Returns null if the context cannot be created.
    public static Context getInitialContext() {
        Context context = null;
        String jndiFactory = Props.get("jndi.factory");
        String providerUrl = Props.get("jndi.provider.url");
        Hashtable<String, String> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, jndiFactory);
        env.put(Context.PROVIDER_URL, providerUrl);
        try {
            context = new InitialContext(env);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        return context;
    }
}
2. QueueBase class (base class of QueueComsumer and QueueSupplier; holds what the two have in common)
package com.sitinspring.standardWeblogicJms;

import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.Context;

public class QueueBase {
    protected QueueConnectionFactory queueConnectionFactory;
    protected QueueConnection queueConnection;
    protected QueueSession queueSession;
    protected Queue queue;

    public QueueBase(Context context) {
        try {
            // Look up the connection factory by its JNDI name
            String jmsFactory = Props.get("jms.factory");
            queueConnectionFactory = (QueueConnectionFactory) context.lookup(jmsFactory);
            queueConnection = queueConnectionFactory.createQueueConnection();
            queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            // Look up the queue by its JNDI name
            String queueName = Props.get("queue.name");
            queue = (Queue) context.lookup(queueName);
        } catch (Exception ex) {
            // Note: if a lookup fails, the fields above stay null
            ex.printStackTrace();
        }
    }
}
3. QueueComsumer class (receives messages)
package com.sitinspring.standardWeblogicJms;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.QueueReceiver;
import javax.jms.TextMessage;
import javax.naming.Context;

public class QueueComsumer extends QueueBase implements MessageListener {
    private QueueReceiver queueReceiver;

    public QueueComsumer(Context context) {
        super(context);

        try {
            queueReceiver = queueSession.createReceiver(queue);
            // Incoming messages are delivered asynchronously to onMessage()
            queueReceiver.setMessageListener(this);
            queueConnection.start();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage txtmsg = (TextMessage) message;

            try {
                System.out.println("I have received the TextMessage:"
                        + txtmsg.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    public void close() throws JMSException {
        queueReceiver.close();
        queueSession.close();
        queueConnection.close();
    }

    public static void main(String[] args) {
        Context context = Props.getInitialContext();
        QueueComsumer queueComsumer = new QueueComsumer(context);

        // Block the main thread so the listener keeps receiving
        synchronized (queueComsumer) {
            while (true) {
                try {
                    queueComsumer.wait();
                } catch (InterruptedException interruptedException) {
                    interruptedException.printStackTrace();

                    try {
                        queueComsumer.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}
4. QueueSupplier class (sends messages)
package com.sitinspring.standardWeblogicJms;

import javax.jms.JMSException;
import javax.jms.QueueSender;
import javax.jms.TextMessage;
import javax.naming.Context;

public class QueueSupplier extends QueueBase {
    private QueueSender queueSender;

    public QueueSupplier(Context context) {
        super(context);

        try {
            queueSender = queueSession.createSender(queue);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    public void sendMsg(String msg) throws JMSException {
        TextMessage txtMsg = queueSession.createTextMessage();
        txtMsg.setText(msg);

        queueConnection.start();
        queueSender.send(txtMsg);
    }

    public void close() throws JMSException {
        queueSender.close();
        queueSession.close();
        queueConnection.close();
    }

    public static void main(String[] args) {
        Context context = Props.getInitialContext();
        QueueSupplier queueSupplier = new QueueSupplier(context);

        try {
            queueSupplier.sendMsg("Hello World");

            System.out.println("A message has been sent!");
        } catch (JMSException ex) {
            ex.printStackTrace();
        } finally {
            try {
                queueSupplier.close();
            } catch (JMSException ex) {
                ex.printStackTrace();
            }
        }
    }
}
What is this used for?
5. Properties file (jmsSetting.properties)
#jndi factory
jndi.factory=weblogic.jndi.WLInitialContextFactory
#jndi provider url
jndi.provider.url=t3://127.0.0.1:7001/
#jms factory
jms.factory=MyJMSConnectionFactory
#queue name
queue.name=MyJMSQueue
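One thing to watch when copying this file: java.util.Properties only treats `#` as a comment marker at the start of a line, so each comment needs a line of its own. The sketch below illustrates this with an in-memory string instead of the real file. The class name PropsCommentDemo and its parse helper are made up for the illustration and are not part of the code posted above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class PropsCommentDemo {
    // Parses properties text from a string (hypothetical helper for this demo).
    public static Properties parse(String text) throws IOException {
        Properties p = new Properties();
        p.load(new StringReader(text));
        return p;
    }

    public static void main(String[] args) throws IOException {
        // Comment on its own line: the value is clean.
        Properties good = parse("#jms factory\njms.factory=MyJMSConnectionFactory\n");
        // Comment glued to the end of a value: it becomes part of the value.
        Properties bad = parse("jms.factory=MyJMSConnectionFactory#queue name\n");

        System.out.println("[" + good.getProperty("jms.factory") + "]");
        System.out.println("[" + bad.getProperty("jms.factory") + "]");
    }
}
```

If a JNDI lookup fails with a name you did not expect, printing the loaded values in brackets like this is a quick way to spot stray characters.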
at org.apache.activemq.jndi.ReadOnlyContext.lookup(ReadOnlyContext.java:215)
at javax.naming.InitialContext.lookup(InitialContext.java:351)
at com.QueueBase.<init>(QueueBase.java:19)
at com.QueueComsumer.<init>(QueueComsumer.java:14)
at com.QueueComsumer.main(QueueComsumer.java:46)
java.lang.NullPointerException
at com.QueueComsumer.<init>(QueueComsumer.java:17)
at com.QueueComsumer.main(QueueComsumer.java:46)
I am using Tomcat.
My properties file (jmsSetting.properties) looks like this:
#jndi factory
jndi.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
#jndi provider url
jndi.provider.url=tcp://10.64.8.54:51618/
#jms factory
jms.factory=MyJMSConnectionFactory
#queue name
queue.name=MyJMSQueue
I think the error is here: jms.factory=MyJMSConnectionFactory. But what is MyJMSConnectionFactory?
Also, in
#jndi provider url
jndi.provider.url=t3://127.0.0.1:7001/
what is this t3?
Sorry to trouble you so much, my friend!
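For context, t3 is WebLogic's own network protocol, and MyJMSConnectionFactory and MyJMSQueue in the sample are JNDI names that would have been configured on that WebLogic server. ActiveMQ's JNDI context is read-only and is populated from the environment passed to it, so those names do not exist there unless they are declared. The sketch below assumes ActiveMQ's documented JNDI conventions (a `connectionFactoryNames` entry and `queue.<jndiName>=<physicalName>` entries). The class name ActiveMqJndiEnv and the build helper are hypothetical, and the code only builds the environment, so it runs without the ActiveMQ jar.

```java
import java.util.Hashtable;

import javax.naming.Context;

public class ActiveMqJndiEnv {
    // Builds the JNDI environment only (hypothetical helper).
    // Creating the InitialContext itself needs the ActiveMQ jar on the classpath.
    public static Hashtable<String, String> build(String brokerUrl,
            String factoryJndiName, String queueJndiName, String physicalQueueName) {
        Hashtable<String, String> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY,
                "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        env.put(Context.PROVIDER_URL, brokerUrl);
        // Declares the JNDI name(s) under which a connection factory is bound
        env.put("connectionFactoryNames", factoryJndiName);
        // Declares a queue: JNDI name on the left, broker queue name on the right
        env.put("queue." + queueJndiName, physicalQueueName);
        return env;
    }

    public static void main(String[] args) {
        // Values taken from the properties file in the question
        Hashtable<String, String> env = build("tcp://10.64.8.54:51618",
                "MyJMSConnectionFactory", "MyJMSQueue", "MyJMSQueue");
        env.forEach((key, value) -> System.out.println(key + " = " + value));
        // With ActiveMQ on the classpath, the lookups would then be:
        //   Context ctx = new InitialContext(env);
        //   ctx.lookup("MyJMSConnectionFactory");
        //   ctx.lookup("MyJMSQueue");
    }
}
```

With an environment like this, the two lookups in QueueBase should find their names. Without it, the first lookup throws, the exception is only printed, and the null queueSession then produces the NullPointerException in QueueComsumer.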
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

// Sends a TextMessage
public class SendMessage {
    private static final String url = "tcp://localhost:61616";
    private static final String QUEUE_NAME = "choice.queue";
    protected String expectedBody = "<hello>world!</hello>";

    public void sendMessage() throws JMSException {
        Connection connection = null;
        try {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(QUEUE_NAME);
            MessageProducer producer = session.createProducer(destination);
            TextMessage message = session.createTextMessage(expectedBody);
            // Custom message property, set before sending
            message.setStringProperty("headname", "remoteB");
            producer.send(message);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // connection is still null if createConnection() failed
            if (connection != null) {
                connection.close();
            }
        }
    }
}
***************************************************************************************
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

// Sends a BytesMessage
public class SendMessage {
    private String url = "tcp://localhost:61616";

    public void sendMessage() throws JMSException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("test.queue");
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        BytesMessage message = session.createBytesMessage();
        // The file content becomes the message body
        byte[] content = getFileByte("d://test.jar");
        message.writeBytes(content);
        try {
            producer.send(message);
            System.out.println("successful send message");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            session.close();
            connection.close();
        }
    }

    // Reads the file into a byte array; returns null if the file cannot be opened.
    private byte[] getFileByte(String filename) {
        byte[] buffer = null;
        FileInputStream fin = null;
        try {
            File file = new File(filename);
            fin = new FileInputStream(file);
            buffer = new byte[fin.available()];
            fin.read(buffer);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                // fin is still null if the file could not be opened
                if (fin != null) {
                    fin.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return buffer;
    }
}

After sending the message, the OP can open
http://localhost:8161/admin/queues.jsp
to check whether the corresponding queue contains any messages.
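A side note on getFileByte: it relies on FileInputStream.available() and a single read() call, and neither is guaranteed to cover the whole file. A minimal sketch of an alternative, assuming Java 7 or later, is to let java.nio.file.Files.readAllBytes do the reading. The class name FileBytes and the temporary file are only there to make the example runnable on its own.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class FileBytes {
    // Reads the whole file into memory; throws IOException instead of returning null.
    public static byte[] read(String filename) throws IOException {
        return Files.readAllBytes(Paths.get(filename));
    }

    public static void main(String[] args) throws IOException {
        // Temporary file standing in for the real payload (e.g. d://test.jar)
        Path tmp = Files.createTempFile("jms-payload", ".bin");
        try {
            Files.write(tmp, new byte[] {1, 2, 3});
            byte[] content = read(tmp.toString());
            System.out.println("read " + content.length + " bytes");
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```

The returned array could be passed to message.writeBytes(content) in the same way as before. For very large files, loading everything into one message may not be a good idea in any case.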
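Hi everyone, so how do I get messages out of an ActiveMQ queue? I am using the code below.
The other party gave me a queue name: queue_test
Here is what I wrote:
private Destination destination;
if (topic) {
    destination = session.createTopic("queue_test");
} else {
    destination = session.createQueue("queue_test");
}
MessageConsumer consumer = session.createConsumer(destination);
consumer.receive();
The above is part of the receiving code.
But it seems I did not get anything out, or maybe I did but could not see it! How can I display it so that I can see it? I have figured out sending. Thanks, everyone!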
// receiver, session, queue_test and messageContent are declared elsewhere in the class (not shown)
public void receive() {
    try {
        receiver = session.createReceiver(queue_test);
        // Blocks until a message arrives
        TextMessage tm = (TextMessage) receiver.receive();
        System.out.println("receive successful!");
        messageContent = tm.getText();
        System.out.println("The content of the receiving message is:" + messageContent);
    } catch (JMSException jmse1) {
        System.out.println("JMSException occurred while receiving!");
        jmse1.printStackTrace();
        System.exit(-1);
    }
}
What I posted is a fragment from my own application. OP, tidy it up yourself.
Also: why not integrate it with Spring?
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ReceiveMessage {
    private static final String url = "tcp://172.16.168.167:61616";
    private static final String QUEUE_NAME = "szf.queue";

    public void receiveMessage() {
        Connection connection = null;
        try {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(QUEUE_NAME);
            MessageConsumer consumer = session.createConsumer(destination);
            consumeMessagesAndClose(connection, session, consumer);
        } catch (Exception e) {
            // Print the failure instead of swallowing it silently
            e.printStackTrace();
        }
    }

    protected void consumeMessagesAndClose(Connection connection,
            Session session, MessageConsumer consumer) throws JMSException {
        // Poll with a one-second timeout until one message has been received
        for (int i = 0; i < 1;) {
            Message message = consumer.receive(1000);
            if (message != null) {
                i++;
                onMessage(message);
            }
        }
        System.out.println("Closing connection");
        consumer.close();
        session.close();
        connection.close();
    }

    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                TextMessage txtMsg = (TextMessage) message;
                String msg = txtMsg.getText();
                System.out.println("Received: " + msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String args[]) {
        ReceiveMessage rm = new ReceiveMessage();
        rm.receiveMessage();
    }
}
This is what I use for my own testing.
It works for receiving TextMessage messages.
Hope it helps the OP.
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TryActiveMQ_Send {
    public static void main(String args[]) throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("TEST.FOO");
        MessageProducer producer = session.createProducer(destination);
        Message message = session.createTextMessage("My name is AAA!");
        producer.send(message);
        // session.commit(); // only needed for a transacted session
        session.close();
        connection.close();
    }
}
--------------------------------------------------------
package lzTest1;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TryActiveMQ_Receive {
    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("TEST.FOO");
        MessageConsumer consumer = session.createConsumer(destination);
        // Blocks until a message arrives
        Message message = consumer.receive();
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            String text = textMessage.getText();
            System.out.println("Received message: " + text);
        } else {
            System.out.println("Received message: " + message);
        }
        // session.commit(); // only needed for a transacted session
        session.close();
        connection.close();
    }
}