/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.memory;

import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ProxyMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.usage.SystemUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link PersistenceAdapter} that keeps one message store per destination
 * in in-memory maps. The directory, broker-name, usage-manager, checkpoint
 * and transaction begin/commit/rollback hooks are all no-ops here.
 *
 * @org.apache.xbean.XBean
 *
 */
public class MemoryPersistenceAdapter implements PersistenceAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(MemoryPersistenceAdapter.class);

    /** Lazily created by {@link #createTransactionStore()}; {@code null} until then. */
    MemoryTransactionStore transactionStore;
    /** Topic stores handed out so far, keyed by destination. */
    ConcurrentHashMap<ActiveMQDestination, TopicMessageStore> topics = new ConcurrentHashMap<ActiveMQDestination, TopicMessageStore>();
    /** Queue stores handed out so far, keyed by destination. */
    ConcurrentHashMap<ActiveMQDestination, MessageStore> queues = new ConcurrentHashMap<ActiveMQDestination, MessageStore>();
    private boolean useExternalMessageReferences;

    /**
     * Returns a snapshot of every destination that currently has a store.
     *
     * @return a new, caller-owned set holding the queue and topic destinations
     */
    public Set<ActiveMQDestination> getDestinations() {
        Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(queues.size() + topics.size());
        rc.addAll(queues.keySet());
        rc.addAll(topics.keySet());
        return rc;
    }

    /**
     * Factory method; the supplied file is not used.
     *
     * @param file ignored
     * @return a new adapter instance
     */
    public static MemoryPersistenceAdapter newInstance(File file) {
        return new MemoryPersistenceAdapter();
    }

    /**
     * Returns the store registered for the given queue, creating and
     * registering one on first use. A newly created store is wrapped by the
     * transaction store's proxy when a transaction store already exists.
     *
     * @param destination the queue whose store is wanted
     * @return the single store registered for {@code destination}
     * @throws IOException declared by the interface; not thrown by this code directly
     */
    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
        MessageStore rc = queues.get(destination);
        if (rc == null) {
            MessageStore created = new MemoryMessageStore(destination);
            if (transactionStore != null) {
                created = transactionStore.proxy(created);
            }
            // Register atomically: a separate get-then-put lets two concurrent
            // callers each install a different store for the same destination,
            // leaving one of them holding a store that is no longer registered.
            MessageStore existing = queues.putIfAbsent(destination, created);
            rc = existing != null ? existing : created;
        }
        return rc;
    }

    /**
     * Returns the store registered for the given topic, creating and
     * registering one on first use. A newly created store is wrapped by the
     * transaction store's proxy when a transaction store already exists.
     *
     * @param destination the topic whose store is wanted
     * @return the single store registered for {@code destination}
     * @throws IOException declared by the interface; not thrown by this code directly
     */
    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
        TopicMessageStore rc = topics.get(destination);
        if (rc == null) {
            TopicMessageStore created = new MemoryTopicMessageStore(destination);
            if (transactionStore != null) {
                created = transactionStore.proxy(created);
            }
            // Same atomic registration as for queues: the first store wins and
            // every caller gets that instance.
            TopicMessageStore existing = topics.putIfAbsent(destination, created);
            rc = existing != null ? existing : created;
        }
        return rc;
    }

    /**
     * Cleanup method to remove any state associated with the given destination
     *
     * @param destination Destination to forget
     */
    public void removeQueueMessageStore(ActiveMQQueue destination) {
        queues.remove(destination);
    }

    /**
     * Cleanup method to remove any state associated with the given destination
     *
     * @param destination Destination to forget
     */
    public void removeTopicMessageStore(ActiveMQTopic destination) {
        topics.remove(destination);
    }

    /**
     * Returns this adapter's transaction store, creating it on first call.
     * Stores created before this call are not retroactively proxied.
     *
     * @return the transaction store
     * @throws IOException declared by the interface; not thrown by this code directly
     */
    public TransactionStore createTransactionStore() throws IOException {
        if (transactionStore == null) {
            transactionStore = new MemoryTransactionStore(this);
        }
        return transactionStore;
    }

    /** No-op. */
    public void beginTransaction(ConnectionContext context) {
    }

    /** No-op. */
    public void commitTransaction(ConnectionContext context) {
    }

    /** No-op. */
    public void rollbackTransaction(ConnectionContext context) {
    }

    /** No-op. */
    public void start() throws Exception {
    }

    /** No-op. */
    public void stop() throws Exception {
    }

    /**
     * @return always {@code 0}
     */
    public long getLastMessageBrokerSequenceId() throws IOException {
        return 0;
    }

    /**
     * Calls {@code delete()} on every registered topic and queue store that
     * can be resolved to a {@link MemoryMessageStore}, then on the transaction
     * store if one exists. The destination-to-store registrations themselves
     * are left in place.
     *
     * @throws IOException declared by the interface; not thrown by this code directly
     */
    public void deleteAllMessages() throws IOException {
        // NOTE(review): a proxied topic store is only cleared if the proxy is a
        // ProxyMessageStore; otherwise it is skipped with a warning - confirm
        // what transactionStore.proxy(TopicMessageStore) returns.
        for (TopicMessageStore topicStore : topics.values()) {
            MemoryMessageStore store = asMemoryMessageStore(topicStore);
            if (store != null) {
                store.delete();
            }
        }
        for (MessageStore queueStore : queues.values()) {
            MemoryMessageStore store = asMemoryMessageStore(queueStore);
            if (store != null) {
                store.delete();
            }
        }

        if (transactionStore != null) {
            transactionStore.delete();
        }
    }

    /**
     * @return the flag last set via {@link #setUseExternalMessageReferences(boolean)}
     */
    public boolean isUseExternalMessageReferences() {
        return useExternalMessageReferences;
    }

    /**
     * Stores the flag; nothing else in this class reads it.
     *
     * @param useExternalMessageReferences the new flag value
     */
    public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
        this.useExternalMessageReferences = useExternalMessageReferences;
    }

    /**
     * Resolves a registered store to its underlying {@link MemoryMessageStore}.
     *
     * @param value a store taken from one of the destination maps
     * @return {@code value} itself if it is a {@code MemoryMessageStore}, the
     *         delegate if {@code value} is a {@link ProxyMessageStore} wrapping
     *         one, otherwise {@code null} (after logging a warning)
     */
    protected MemoryMessageStore asMemoryMessageStore(Object value) {
        if (value instanceof MemoryMessageStore) {
            return (MemoryMessageStore) value;
        }
        if (value instanceof ProxyMessageStore) {
            MessageStore delegate = ((ProxyMessageStore) value).getDelegate();
            if (delegate instanceof MemoryMessageStore) {
                return (MemoryMessageStore) delegate;
            }
        }
        LOG.warn("Expected an instance of MemoryMessageStore but was: " + value);
        return null;
    }

    /**
     * No-op; the supplied usage manager is ignored.
     *
     * @param usageManager The UsageManager that is controlling the broker's
     *                memory usage.
     */
    public void setUsageManager(SystemUsage usageManager) {
    }

    public String toString() {
        return "MemoryPersistenceAdapter";
    }

    /** No-op; the broker name is ignored. */
    public void setBrokerName(String brokerName) {
    }

    /** No-op; the directory is ignored. */
    public void setDirectory(File dir) {
    }

    /**
     * @return always {@code null}
     */
    public File getDirectory() {
        return null;
    }

    /** No-op. */
    public void checkpoint(boolean sync) throws IOException {
    }

    /**
     * @return always {@code 0}
     */
    public long size() {
        return 0;
    }

    /**
     * Creates the transaction store immediately when {@code create} is true;
     * does nothing otherwise.
     *
     * @param create whether to create the transaction store now
     * @throws IOException if {@link #createTransactionStore()} fails
     */
    public void setCreateTransactionStore(boolean create) throws IOException {
        if (create) {
            createTransactionStore();
        }
    }

    /**
     * @return always {@code -1}
     */
    public long getLastProducerSequenceId(ProducerId id) {
        // memory map does duplicate suppression
        return -1;
    }
}