应用场景:A、B、C三台服务器使用ehcache缓存数据,其中A缓存数据发生变化,B、C需同步A服务器的数据变化。
1. 建立openMQ链接
Topic 链接建立
imqobjmgr add -t tf -l 'MyConnectionFactory'
-j java.naming.provider.url=file:///tmp
-j java.naming.factory.initial=com.sun.jndi.fscontext.RefFSContextFactory
-f
imqobjmgr add -t t -l 'ehcache' -o 'imqDestinationName=EhcacheTopicDest'
-j java.naming.provider.url=file:///tmp
-j java.naming.factory.initial=com.sun.jndi.fscontext.RefFSContextFactory
-f
Queue 链接建立
imqobjmgr add -t qf -l 'MyQueueConnectionFactory'
-j java.naming.provider.url=file:///tmp
-j java.naming.factory.initial=com.sun.jndi.fscontext.RefFSContextFactory
-f
imqobjmgr add -t q -l 'QueueEhcache' -o 'imqDestinationName=EhcacheQueueDest'
-j java.naming.provider.url=file:///tmp
-j java.naming.factory.initial=com.sun.jndi.fscontext.RefFSContextFactory
-f
参考资料: 进入mq目录,imqobjmgr -H
提示成功后,在c盘tmp下拿到 .bindings 文件(所有链接依赖此文件)
2.配置ehcache xml文件
监听工厂配置
<cacheManagerPeerProviderFactory
class="net.sf.ehcache.distribution.jms.JMSCacheManagerPeerProviderFactory"
properties="
initialContextFactoryName=com.sun.jndi.fscontext.RefFSContextFactory,
userName=admin,password=admin,
replicationTopicConnectionFactoryBindingName=MyConnectionFactory,
replicationTopicBindingName=ehcache,
getQueueConnectionFactoryBindingName=MyQueueConnectionFactory,
getQueueBindingName=QueueEhcache, providerURL=file:C:/tmp"
propertySeparator="," />
缓存节点配置
<cache name="sampleCacheAsync" maxElementsInMemory="1000" eternal="false" timeToIdleSeconds="1000" timeToLiveSeconds="1000" overflowToDisk="false">
<cacheEventListenerFactory class="net.sf.ehcache.distribution.jms.JMSCacheReplicatorFactory" properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true, asynchronousReplicationIntervalMillis=1000" />
<cacheLoaderFactory class="net.sf.ehcache.distribution.jms.JMSCacheLoaderFactory" properties="initialContextFactoryName=com.sun.jndi.fscontext.RefFSContextFactory, providerURL=file:C:/tmp, replicationTopicConnectionFactoryBindingName=MyConnectionFactory, replicationTopicBindingName=ehcache, getQueueConnectionFactoryBindingName=MyQueueConnectionFactory, getQueueBindingName=QueueEhcache, timeoutMillis=10000, userName=admin, password=admin" propertySeparator="," />
</cache>
服务端测试类
import java.util.Timer;
import java.util.TimerTask;
import net.sf.ehcache.Cache;
import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Element;

/**
 * Server-side test harness for the replicated cache.
 *
 * <p>{@link #main(String[])} creates the {@link CacheManager} and starts a timer that,
 * every 10 seconds, prints the value stored under key {@code "1234"} in the
 * {@code sampleCacheAsync} cache and then writes a fixed value back under that key.
 */
public class EhCacheManager {

    /** Shared manager; stays {@code null} until {@link #main(String[])} has run. */
    private static CacheManager cacheManager;

    /**
     * Creates the cache manager, reports whether {@code sampleCacheAsync} could be
     * resolved, and starts the periodic print task.
     *
     * @param args unused
     */
    public static void main(String args[]) {
        cacheManager = new CacheManager();
        Cache cache = EhCacheManager.getCacheObject("sampleCacheAsync");
        if (cache != null) {
            System.out.println("load cache success!");
        }
        new EhCacheManager().start();
    }

    /**
     * Looks up a cache by name.
     *
     * @param objName name of the cache to look up
     * @return the result of {@code cacheManager.getCache(objName)}, or {@code null} when
     *         the manager has not been created yet
     */
    public static Cache getCacheObject(String objName) {
        if (cacheManager == null) {
            System.out.println("no init....... ");
            // Previously fell through and dereferenced the null manager.
            return null;
        }
        return cacheManager.getCache(objName);
    }

    /** Schedules a {@link MasterListener} to run immediately and then every 10 seconds. */
    public void start() {
        Timer timer = new Timer(false);
        timer.schedule(new MasterListener(), 0, 10000);
    }

    /** Timer task that delegates to {@link EhCacheManager#printCache()}. */
    public class MasterListener extends TimerTask {
        @Override
        public void run() {
            printCache();
        }
    }

    /**
     * Prints the value stored under key {@code "1234"} in {@code sampleCacheAsync}
     * (or a placeholder message when absent), then stores a fixed value under that key.
     *
     * <p>When the cache cannot be resolved, only a message is printed. The original code
     * went on to call {@code put} on the null reference; an exception escaping a
     * {@link TimerTask} terminates the timer thread, so polling would stop for good.
     */
    public static void printCache() {
        Cache cache = EhCacheManager.getCacheObject("sampleCacheAsync");
        if (cache == null) {
            System.out.println("no cache object!");
            return;
        }

        Element element = cache.get("1234");
        if (element != null) {
            System.out.println(element.getValue());
        } else {
            System.out.println("no element object");
        }
        cache.put(new Element("1234", "============"));
    }
}
通过发送topic message 改变缓存
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.archermind.mercury.configure.PropertiesDeploy;
import com.sun.messaging.ConnectionConfiguration;
import com.sun.messaging.ConnectionFactory;
import com.sun.messaging.Queue;
import com.sun.messaging.Topic;

/**
 * Sends a JMS {@link ObjectMessage} carrying the {@code action}, {@code cacheName} and
 * {@code key} string properties to a topic or queue, in order to change the replicated
 * cache from outside.
 *
 * <p>Connection factories are created once per destination name and kept in a static map.
 */
public class JmsSender {
    private static final String ACTION_PROPERTY = "action";
    private static final String CACHE_NAME_PROPERTY = "cacheName";
    private static final String KEY_PROPERTY = "key";

    /** Connection factories keyed by destination name. */
    private static final ConcurrentHashMap<String, ConnectionFactory> hashMap =
            new ConcurrentHashMap<String, ConnectionFactory>();

    public static final Logger logger = LoggerFactory.getLogger(JmsSender.class);

    /**
     * Sends one {@code PUT} message for cache {@code sampleCacheAsync}, key {@code 1234},
     * to the given destination.
     *
     * @param target destination name; also the key under which the factory is cached
     * @param obj    currently unused: the payload is a fixed string
     *               (NOTE(review): presumably meant to become the message body - confirm)
     * @param type   {@code "topic"} (case-insensitive) for a topic, anything else for a queue
     * @throws Exception if creating the connection/session or sending fails
     */
    private void sent(String target, Object obj, String type) throws Exception {
        ConnectionFactory connectionFactory = hashMap.get(target);
        if (connectionFactory == null) {
            initConnectionFactory(target);
            connectionFactory = hashMap.get(target);
        }

        Connection connection = connectionFactory.createConnection();
        try {
            Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            try {
                MessageProducer messageProducer = session.createProducer(build(type, target));

                String payload = "this is an object";
                ObjectMessage objectMessage = session.createObjectMessage(payload);
                objectMessage.setStringProperty(ACTION_PROPERTY, "PUT");
                objectMessage.setStringProperty(CACHE_NAME_PROPERTY, "sampleCacheAsync");
                objectMessage.setStringProperty(KEY_PROPERTY, "1234");

                logger.info("send object.");
                messageProducer.send(objectMessage);
                logger.info("send complete.");
            } finally {
                // Release the session even when building or sending the message fails.
                session.close();
            }
        } finally {
            // Previously skipped whenever an earlier call threw, leaking the connection.
            connection.close();
        }
    }

    /**
     * Sends the message to a topic; failures are logged, not propagated.
     *
     * @param obj    forwarded to {@code sent}
     * @param target topic name
     */
    public void sendTopicMsg(Object obj, String target) {
        try {
            sent(target, obj, "topic");
        } catch (Exception e) {
            logger.error("sendTopicMsg error.", e);
        }
    }

    /**
     * Sends the message to a queue; failures are logged, not propagated.
     *
     * @param obj    forwarded to {@code sent}
     * @param target queue name
     */
    public void sendQueueMsg(Object obj, String target) {
        try {
            sent(target, obj, "Queue");
        } catch (Exception e) {
            // Was logged as "sendTopicMsg error.", which pointed at the wrong method.
            logger.error("sendQueueMsg error.", e);
        }
    }

    /**
     * Creates a connection factory with reconnect enabled and stores it under {@code target}.
     *
     * @param target destination name used as the map key
     * @throws JMSException if a factory property cannot be set
     */
    private void initConnectionFactory(String target) throws JMSException {
        // NOTE(review): the broker address is hard-coded; the log line below suggests it
        // was meant to come from the "<target>.url" configuration entry - confirm.
        String targetUrl = "mq://192.168.20.35:7676";
        logger.info("target.url{}", target);
        logger.info("configuremap contains target:{}",
                PropertiesDeploy.getConfigureMap().keySet().contains(target + ".url"));

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setProperty(ConnectionConfiguration.imqAddressList, targetUrl);
        connectionFactory.setProperty(ConnectionConfiguration.imqReconnectEnabled, "true");
        hashMap.put(target, connectionFactory);
    }

    /**
     * Builds the destination object for the given type.
     *
     * @param type   {@code "topic"} (case-insensitive) selects a topic, anything else a queue
     * @param target destination name
     * @return a {@link Topic} or {@link Queue} named {@code target}
     * @throws JMSException if the destination cannot be created
     */
    private Destination build(String type, String target) throws JMSException {
        if ("topic".equalsIgnoreCase(type)) {
            return new Topic(target);
        }
        return new Queue(target);
    }

    /**
     * Sends one message to the {@code EhcacheTopicDest} topic.
     *
     * @param args unused
     * @throws Exception if sending fails
     */
    public static void main(String[] args) throws Exception {
        JmsSender sender = new JmsSender();
        sender.sent("EhcacheTopicDest", null, "topic");
    }
}
-j java.naming.provider.url=file:///tmp
-j java.naming.factory.initial=com.sun.jndi.fscontext.RefFSContextFactory
-f
imqobjmgr add -t t -l 'ehcache' -o 'imqDestinationName=EhcacheTopicDest'
-j java.naming.provider.url=file:///tmp
-j java.naming.factory.initial=com.sun.jndi.fscontext.RefFSContextFactory
-f
Queue 链接建立
imqobjmgr add -t qf -l 'MyQueueConnectionFactory'
-j java.naming.provider.url=file:///tmp
-j java.naming.factory.initial=com.sun.jndi.fscontext.RefFSContextFactory
-f
imqobjmgr add -t q -l 'QueueEhcache' -o 'imqDestinationName=EhcacheQueueDest'
-j java.naming.provider.url=file:///tmp
-j java.naming.factory.initial=com.sun.jndi.fscontext.RefFSContextFactory
-f
参考资料: 进入mq目录,imqobjmgr -H
提示成功后,在c盘tmp下拿到 .bindings 文件(所有链接依赖此文件)
2.配置ehcache xml文件
监听工厂配置
<cacheManagerPeerProviderFactory
class="net.sf.ehcache.distribution.jms.JMSCacheManagerPeerProviderFactory"
properties="
initialContextFactoryName=com.sun.jndi.fscontext.RefFSContextFactory,
userName=admin,password=admin,
replicationTopicConnectionFactoryBindingName=MyConnectionFactory,
replicationTopicBindingName=ehcache,
getQueueConnectionFactoryBindingName=MyQueueConnectionFactory,
getQueueBindingName=QueueEhcache, providerURL=file:C:/tmp"
propertySeparator="," />
缓存节点配置
<cache name="sampleCacheAsync" maxElementsInMemory="1000" eternal="false" timeToIdleSeconds="1000" timeToLiveSeconds="1000" overflowToDisk="false">
<cacheEventListenerFactory class="net.sf.ehcache.distribution.jms.JMSCacheReplicatorFactory" properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true, asynchronousReplicationIntervalMillis=1000" />
<cacheLoaderFactory class="net.sf.ehcache.distribution.jms.JMSCacheLoaderFactory" properties="initialContextFactoryName=com.sun.jndi.fscontext.RefFSContextFactory, providerURL=file:C:/tmp, replicationTopicConnectionFactoryBindingName=MyConnectionFactory, replicationTopicBindingName=ehcache, getQueueConnectionFactoryBindingName=MyQueueConnectionFactory, getQueueBindingName=QueueEhcache, timeoutMillis=10000, userName=admin, password=admin" propertySeparator="," />
</cache>
服务端测试类
import java.util.Timer;
import java.util.TimerTask;
import net.sf.ehcache.Cache;
import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Element;

/**
 * Server-side test harness for the replicated cache.
 *
 * <p>{@link #main(String[])} creates the {@link CacheManager} and starts a timer that,
 * every 10 seconds, prints the value stored under key {@code "1234"} in the
 * {@code sampleCacheAsync} cache and then writes a fixed value back under that key.
 */
public class EhCacheManager {

    /** Shared manager; stays {@code null} until {@link #main(String[])} has run. */
    private static CacheManager cacheManager;

    /**
     * Creates the cache manager, reports whether {@code sampleCacheAsync} could be
     * resolved, and starts the periodic print task.
     *
     * @param args unused
     */
    public static void main(String args[]) {
        cacheManager = new CacheManager();
        Cache cache = EhCacheManager.getCacheObject("sampleCacheAsync");
        if (cache != null) {
            System.out.println("load cache success!");
        }
        new EhCacheManager().start();
    }

    /**
     * Looks up a cache by name.
     *
     * @param objName name of the cache to look up
     * @return the result of {@code cacheManager.getCache(objName)}, or {@code null} when
     *         the manager has not been created yet
     */
    public static Cache getCacheObject(String objName) {
        if (cacheManager == null) {
            System.out.println("no init....... ");
            // Previously fell through and dereferenced the null manager.
            return null;
        }
        return cacheManager.getCache(objName);
    }

    /** Schedules a {@link MasterListener} to run immediately and then every 10 seconds. */
    public void start() {
        Timer timer = new Timer(false);
        timer.schedule(new MasterListener(), 0, 10000);
    }

    /** Timer task that delegates to {@link EhCacheManager#printCache()}. */
    public class MasterListener extends TimerTask {
        @Override
        public void run() {
            printCache();
        }
    }

    /**
     * Prints the value stored under key {@code "1234"} in {@code sampleCacheAsync}
     * (or a placeholder message when absent), then stores a fixed value under that key.
     *
     * <p>When the cache cannot be resolved, only a message is printed. The original code
     * went on to call {@code put} on the null reference; an exception escaping a
     * {@link TimerTask} terminates the timer thread, so polling would stop for good.
     */
    public static void printCache() {
        Cache cache = EhCacheManager.getCacheObject("sampleCacheAsync");
        if (cache == null) {
            System.out.println("no cache object!");
            return;
        }

        Element element = cache.get("1234");
        if (element != null) {
            System.out.println(element.getValue());
        } else {
            System.out.println("no element object");
        }
        cache.put(new Element("1234", "============"));
    }
}
通过发送topic message 改变缓存
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.archermind.mercury.configure.PropertiesDeploy;
import com.sun.messaging.ConnectionConfiguration;
import com.sun.messaging.ConnectionFactory;
import com.sun.messaging.Queue;
import com.sun.messaging.Topic;

/**
 * Sends a JMS {@link ObjectMessage} carrying the {@code action}, {@code cacheName} and
 * {@code key} string properties to a topic or queue, in order to change the replicated
 * cache from outside.
 *
 * <p>Connection factories are created once per destination name and kept in a static map.
 */
public class JmsSender {
    private static final String ACTION_PROPERTY = "action";
    private static final String CACHE_NAME_PROPERTY = "cacheName";
    private static final String KEY_PROPERTY = "key";

    /** Connection factories keyed by destination name. */
    private static final ConcurrentHashMap<String, ConnectionFactory> hashMap =
            new ConcurrentHashMap<String, ConnectionFactory>();

    public static final Logger logger = LoggerFactory.getLogger(JmsSender.class);

    /**
     * Sends one {@code PUT} message for cache {@code sampleCacheAsync}, key {@code 1234},
     * to the given destination.
     *
     * @param target destination name; also the key under which the factory is cached
     * @param obj    currently unused: the payload is a fixed string
     *               (NOTE(review): presumably meant to become the message body - confirm)
     * @param type   {@code "topic"} (case-insensitive) for a topic, anything else for a queue
     * @throws Exception if creating the connection/session or sending fails
     */
    private void sent(String target, Object obj, String type) throws Exception {
        ConnectionFactory connectionFactory = hashMap.get(target);
        if (connectionFactory == null) {
            initConnectionFactory(target);
            connectionFactory = hashMap.get(target);
        }

        Connection connection = connectionFactory.createConnection();
        try {
            Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            try {
                MessageProducer messageProducer = session.createProducer(build(type, target));

                String payload = "this is an object";
                ObjectMessage objectMessage = session.createObjectMessage(payload);
                objectMessage.setStringProperty(ACTION_PROPERTY, "PUT");
                objectMessage.setStringProperty(CACHE_NAME_PROPERTY, "sampleCacheAsync");
                objectMessage.setStringProperty(KEY_PROPERTY, "1234");

                logger.info("send object.");
                messageProducer.send(objectMessage);
                logger.info("send complete.");
            } finally {
                // Release the session even when building or sending the message fails.
                session.close();
            }
        } finally {
            // Previously skipped whenever an earlier call threw, leaking the connection.
            connection.close();
        }
    }

    /**
     * Sends the message to a topic; failures are logged, not propagated.
     *
     * @param obj    forwarded to {@code sent}
     * @param target topic name
     */
    public void sendTopicMsg(Object obj, String target) {
        try {
            sent(target, obj, "topic");
        } catch (Exception e) {
            logger.error("sendTopicMsg error.", e);
        }
    }

    /**
     * Sends the message to a queue; failures are logged, not propagated.
     *
     * @param obj    forwarded to {@code sent}
     * @param target queue name
     */
    public void sendQueueMsg(Object obj, String target) {
        try {
            sent(target, obj, "Queue");
        } catch (Exception e) {
            // Was logged as "sendTopicMsg error.", which pointed at the wrong method.
            logger.error("sendQueueMsg error.", e);
        }
    }

    /**
     * Creates a connection factory with reconnect enabled and stores it under {@code target}.
     *
     * @param target destination name used as the map key
     * @throws JMSException if a factory property cannot be set
     */
    private void initConnectionFactory(String target) throws JMSException {
        // NOTE(review): the broker address is hard-coded; the log line below suggests it
        // was meant to come from the "<target>.url" configuration entry - confirm.
        String targetUrl = "mq://192.168.20.35:7676";
        logger.info("target.url{}", target);
        logger.info("configuremap contains target:{}",
                PropertiesDeploy.getConfigureMap().keySet().contains(target + ".url"));

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setProperty(ConnectionConfiguration.imqAddressList, targetUrl);
        connectionFactory.setProperty(ConnectionConfiguration.imqReconnectEnabled, "true");
        hashMap.put(target, connectionFactory);
    }

    /**
     * Builds the destination object for the given type.
     *
     * @param type   {@code "topic"} (case-insensitive) selects a topic, anything else a queue
     * @param target destination name
     * @return a {@link Topic} or {@link Queue} named {@code target}
     * @throws JMSException if the destination cannot be created
     */
    private Destination build(String type, String target) throws JMSException {
        if ("topic".equalsIgnoreCase(type)) {
            return new Topic(target);
        }
        return new Queue(target);
    }

    /**
     * Sends one message to the {@code EhcacheTopicDest} topic.
     *
     * @param args unused
     * @throws Exception if sending fails
     */
    public static void main(String[] args) throws Exception {
        JmsSender sender = new JmsSender();
        sender.sent("EhcacheTopicDest", null, "topic");
    }
}
解决方案 »
- RMI问题 RMI打成可运行jar后无法正常操作
- tepestry5.0中uploadfile如何使用?
- 本人在做一个的“参数化的视图”,现在有些问题,请大家一起讨论一下,给我点意见
- [100分求助]java 生成报表,预览,打印。设计及源码。
- 一个web服务部署与多台服务器,如何得到当前时间
- 提个菜鸟问题
- 如何从数据库里取出大字段,反映到网页上,通过单击自动选择程序打开?
- 请问我想用JFREECHART 在柱状图上生成URL,总是出错不知道是怎么回事,请帮助我,谢谢!
- 实体Bean问题:
- Jbuilder 开发Web Service的错误
- 请教一个关于Struts2的问题,哪位高手能帮我解决一下,谢谢
- Java 实现在线浏览pdf文档(如豆丁网www.docin.com)
官方文档存在误导嫌疑,尤其在ehcache文件配置中:topic、queue队列必须全部配置,否则会出现JNDI错误。
推荐一个同事的blog,其中对openMQ进行了详细的阐述,并分析了openMQ的吞吐量。