001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.advisory; 018 019import java.util.Iterator; 020import java.util.Map; 021import java.util.Set; 022import java.util.concurrent.ConcurrentHashMap; 023import org.apache.activemq.broker.Broker; 024import org.apache.activemq.broker.BrokerFilter; 025import org.apache.activemq.broker.ConnectionContext; 026import org.apache.activemq.broker.ProducerBrokerExchange; 027import org.apache.activemq.broker.region.Destination; 028import org.apache.activemq.broker.region.MessageReference; 029import org.apache.activemq.broker.region.Subscription; 030import org.apache.activemq.broker.region.TopicSubscription; 031import org.apache.activemq.command.*; 032import org.apache.activemq.security.SecurityContext; 033import org.apache.activemq.state.ProducerState; 034import org.apache.activemq.usage.Usage; 035import org.apache.activemq.util.IdGenerator; 036import org.apache.activemq.util.LongSequenceGenerator; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040/** 041 * This broker filter handles tracking the state of the broker for purposes of 042 * publishing advisory messages to advisory consumers. 043 */ 044public class AdvisoryBroker extends BrokerFilter { 045 046 private static final Logger LOG = LoggerFactory.getLogger(AdvisoryBroker.class); 047 private static final IdGenerator ID_GENERATOR = new IdGenerator(); 048 049 protected final ConcurrentHashMap<ConnectionId, ConnectionInfo> connections = new ConcurrentHashMap<ConnectionId, ConnectionInfo>(); 050 protected final ConcurrentHashMap<ConsumerId, ConsumerInfo> consumers = new ConcurrentHashMap<ConsumerId, ConsumerInfo>(); 051 protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>(); 052 protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>(); 053 protected final ConcurrentHashMap<BrokerInfo, ActiveMQMessage> networkBridges = new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>(); 054 protected final ProducerId advisoryProducerId = new ProducerId(); 055 056 private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); 057 058 public AdvisoryBroker(Broker next) { 059 super(next); 060 advisoryProducerId.setConnectionId(ID_GENERATOR.generateId()); 061 } 062 063 @Override 064 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { 065 super.addConnection(context, info); 066 067 ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic(); 068 //do not distribute usernames or passwords in advisory 069 ConnectionInfo copy = info.copy(); 070 copy.setUserName(""); 071 copy.setPassword(""); 072 fireAdvisory(context, topic, copy); 073 connections.put(copy.getConnectionId(), copy); 074 } 075 076 @Override 077 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 078 Subscription answer = super.addConsumer(context, info); 079 080 // Don't advise advisory topics. 081 if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) { 082 ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()); 083 consumers.put(info.getConsumerId(), info); 084 fireConsumerAdvisory(context, info.getDestination(), topic, info); 085 } else { 086 // We need to replay all the previously collected state objects 087 // for this newly added consumer. 088 if (AdvisorySupport.isConnectionAdvisoryTopic(info.getDestination())) { 089 // Replay the connections. 090 for (Iterator<ConnectionInfo> iter = connections.values().iterator(); iter.hasNext();) { 091 ConnectionInfo value = iter.next(); 092 ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic(); 093 fireAdvisory(context, topic, value, info.getConsumerId()); 094 } 095 } 096 097 // We check here whether the Destination is Temporary Destination specific or not since we 098 // can avoid sending advisory messages to the consumer if it only wants Temporary Destination 099 // notifications. If its not just temporary destination related destinations then we have 100 // to send them all, a composite destination could want both. 101 if (AdvisorySupport.isTempDestinationAdvisoryTopic(info.getDestination())) { 102 // Replay the temporary destinations. 103 for (DestinationInfo destination : destinations.values()) { 104 if (destination.getDestination().isTemporary()) { 105 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination.getDestination()); 106 fireAdvisory(context, topic, destination, info.getConsumerId()); 107 } 108 } 109 } else if (AdvisorySupport.isDestinationAdvisoryTopic(info.getDestination())) { 110 // Replay all the destinations. 111 for (DestinationInfo destination : destinations.values()) { 112 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination.getDestination()); 113 fireAdvisory(context, topic, destination, info.getConsumerId()); 114 } 115 } 116 117 // Replay the producers. 118 if (AdvisorySupport.isProducerAdvisoryTopic(info.getDestination())) { 119 for (Iterator<ProducerInfo> iter = producers.values().iterator(); iter.hasNext();) { 120 ProducerInfo value = iter.next(); 121 ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(value.getDestination()); 122 fireProducerAdvisory(context, value.getDestination(),topic, value, info.getConsumerId()); 123 } 124 } 125 126 // Replay the consumers. 127 if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) { 128 for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext();) { 129 ConsumerInfo value = iter.next(); 130 ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination()); 131 fireConsumerAdvisory(context,value.getDestination(), topic, value, info.getConsumerId()); 132 } 133 } 134 135 // Replay network bridges 136 if (AdvisorySupport.isNetworkBridgeAdvisoryTopic(info.getDestination())) { 137 for (Iterator<BrokerInfo> iter = networkBridges.keySet().iterator(); iter.hasNext();) { 138 BrokerInfo key = iter.next(); 139 ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic(); 140 fireAdvisory(context, topic, key, null, networkBridges.get(key)); 141 } 142 } 143 } 144 return answer; 145 } 146 147 @Override 148 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 149 super.addProducer(context, info); 150 151 // Don't advise advisory topics. 152 if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) { 153 ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()); 154 fireProducerAdvisory(context, info.getDestination(), topic, info); 155 producers.put(info.getProducerId(), info); 156 } 157 } 158 159 @Override 160 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception { 161 Destination answer = super.addDestination(context, destination,create); 162 if (!AdvisorySupport.isAdvisoryTopic(destination)) { 163 DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination); 164 DestinationInfo previous = destinations.putIfAbsent(destination, info); 165 if( previous==null ) { 166 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination); 167 fireAdvisory(context, topic, info); 168 } 169 } 170 return answer; 171 } 172 173 @Override 174 public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 175 ActiveMQDestination destination = info.getDestination(); 176 next.addDestinationInfo(context, info); 177 178 if (!AdvisorySupport.isAdvisoryTopic(destination)) { 179 DestinationInfo previous = destinations.putIfAbsent(destination, info); 180 if( previous==null ) { 181 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination); 182 fireAdvisory(context, topic, info); 183 } 184 } 185 } 186 187 @Override 188 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { 189 super.removeDestination(context, destination, timeout); 190 DestinationInfo info = destinations.remove(destination); 191 if (info != null) { 192 // ensure we don't modify (and loose/overwrite) an in-flight add advisory, so duplicate 193 info = info.copy(); 194 info.setDestination(destination); 195 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 196 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination); 197 fireAdvisory(context, topic, info); 198 ActiveMQTopic[] advisoryDestinations = AdvisorySupport.getAllDestinationAdvisoryTopics(destination); 199 for(ActiveMQTopic advisoryDestination : advisoryDestinations) { 200 try { 201 next.removeDestination(context, advisoryDestination, -1); 202 } catch (Exception expectedIfDestinationDidNotExistYet) { 203 } 204 } 205 } 206 } 207 208 @Override 209 public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception { 210 super.removeDestinationInfo(context, destInfo); 211 DestinationInfo info = destinations.remove(destInfo.getDestination()); 212 if (info != null) { 213 // ensure we don't modify (and loose/overwrite) an in-flight add advisory, so duplicate 214 info = info.copy(); 215 info.setDestination(destInfo.getDestination()); 216 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 217 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destInfo.getDestination()); 218 fireAdvisory(context, topic, info); 219 ActiveMQTopic[] advisoryDestinations = AdvisorySupport.getAllDestinationAdvisoryTopics(destInfo.getDestination()); 220 for(ActiveMQTopic advisoryDestination : advisoryDestinations) { 221 try { 222 next.removeDestination(context, advisoryDestination, -1); 223 } catch (Exception expectedIfDestinationDidNotExistYet) { 224 } 225 } 226 } 227 } 228 229 @Override 230 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { 231 super.removeConnection(context, info, error); 232 233 ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic(); 234 fireAdvisory(context, topic, info.createRemoveCommand()); 235 connections.remove(info.getConnectionId()); 236 } 237 238 @Override 239 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 240 super.removeConsumer(context, info); 241 242 // Don't advise advisory topics. 243 ActiveMQDestination dest = info.getDestination(); 244 if (!AdvisorySupport.isAdvisoryTopic(dest)) { 245 ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest); 246 consumers.remove(info.getConsumerId()); 247 if (!dest.isTemporary() || destinations.containsKey(dest)) { 248 fireConsumerAdvisory(context,dest, topic, info.createRemoveCommand()); 249 } 250 } 251 } 252 253 @Override 254 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 255 super.removeProducer(context, info); 256 257 // Don't advise advisory topics. 258 ActiveMQDestination dest = info.getDestination(); 259 if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(dest)) { 260 ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(dest); 261 producers.remove(info.getProducerId()); 262 if (!dest.isTemporary() || destinations.contains(dest)) { 263 fireProducerAdvisory(context, dest,topic, info.createRemoveCommand()); 264 } 265 } 266 } 267 268 @Override 269 public void messageExpired(ConnectionContext context, MessageReference messageReference, Subscription subscription) { 270 super.messageExpired(context, messageReference, subscription); 271 try { 272 if(!messageReference.isAdvisory()) { 273 ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination()); 274 Message payload = messageReference.getMessage().copy(); 275 payload.clearBody(); 276 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 277 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString()); 278 fireAdvisory(context, topic, payload, null, advisoryMessage); 279 } 280 } catch (Exception e) { 281 handleFireFailure("expired", e); 282 } 283 } 284 285 @Override 286 public void messageConsumed(ConnectionContext context, MessageReference messageReference) { 287 super.messageConsumed(context, messageReference); 288 try { 289 if(!messageReference.isAdvisory()) { 290 ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(messageReference.getMessage().getDestination()); 291 Message payload = messageReference.getMessage().copy(); 292 payload.clearBody(); 293 fireAdvisory(context, topic,payload); 294 } 295 } catch (Exception e) { 296 handleFireFailure("consumed", e); 297 } 298 } 299 300 @Override 301 public void messageDelivered(ConnectionContext context, MessageReference messageReference) { 302 super.messageDelivered(context, messageReference); 303 try { 304 if (!messageReference.isAdvisory()) { 305 ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(messageReference.getMessage().getDestination()); 306 Message payload = messageReference.getMessage().copy(); 307 payload.clearBody(); 308 fireAdvisory(context, topic,payload); 309 } 310 } catch (Exception e) { 311 handleFireFailure("delivered", e); 312 } 313 } 314 315 @Override 316 public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { 317 super.messageDiscarded(context, sub, messageReference); 318 try { 319 if (!messageReference.isAdvisory()) { 320 ActiveMQTopic topic = AdvisorySupport.getMessageDiscardedAdvisoryTopic(messageReference.getMessage().getDestination()); 321 Message payload = messageReference.getMessage().copy(); 322 payload.clearBody(); 323 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 324 if (sub instanceof TopicSubscription) { 325 advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT, ((TopicSubscription)sub).discarded()); 326 } 327 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, sub.getConsumerInfo().getConsumerId().toString()); 328 fireAdvisory(context, topic, payload, null, advisoryMessage); 329 } 330 } catch (Exception e) { 331 handleFireFailure("discarded", e); 332 } 333 } 334 335 @Override 336 public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) { 337 super.slowConsumer(context, destination,subs); 338 try { 339 ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination.getActiveMQDestination()); 340 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 341 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, subs.getConsumerInfo().getConsumerId().toString()); 342 fireAdvisory(context, topic, subs.getConsumerInfo(), null, advisoryMessage); 343 } catch (Exception e) { 344 handleFireFailure("slow consumer", e); 345 } 346 } 347 348 @Override 349 public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) { 350 super.fastProducer(context, producerInfo); 351 try { 352 ActiveMQTopic topic = AdvisorySupport.getFastProducerAdvisoryTopic(producerInfo.getDestination()); 353 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 354 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_PRODUCER_ID, producerInfo.getProducerId().toString()); 355 fireAdvisory(context, topic, producerInfo, null, advisoryMessage); 356 } catch (Exception e) { 357 handleFireFailure("fast producer", e); 358 } 359 } 360 361 @Override 362 public void isFull(ConnectionContext context, Destination destination, Usage usage) { 363 super.isFull(context, destination, usage); 364 if (AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination()) == false) { 365 try { 366 367 ActiveMQTopic topic = AdvisorySupport.getFullAdvisoryTopic(destination.getActiveMQDestination()); 368 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 369 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_USAGE_NAME, usage.getName()); 370 fireAdvisory(context, topic, null, null, advisoryMessage); 371 372 } catch (Exception e) { 373 handleFireFailure("is full", e); 374 } 375 } 376 } 377 378 @Override 379 public void nowMasterBroker() { 380 super.nowMasterBroker(); 381 try { 382 ActiveMQTopic topic = AdvisorySupport.getMasterBrokerAdvisoryTopic(); 383 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 384 ConnectionContext context = new ConnectionContext(); 385 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 386 context.setBroker(getBrokerService().getBroker()); 387 fireAdvisory(context, topic,null,null,advisoryMessage); 388 } catch (Exception e) { 389 handleFireFailure("now master broker", e); 390 } 391 } 392 393 @Override 394 public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, 395 Subscription subscription){ 396 super.sendToDeadLetterQueue(context, messageReference, subscription); 397 try { 398 if(!messageReference.isAdvisory()) { 399 ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination()); 400 Message payload = messageReference.getMessage().copy(); 401 payload.clearBody(); 402 fireAdvisory(context, topic,payload); 403 } 404 } catch (Exception e) { 405 handleFireFailure("add to DLQ", e); 406 } 407 } 408 409 @Override 410 public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) { 411 try { 412 if (brokerInfo != null) { 413 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 414 advisoryMessage.setBooleanProperty("started", true); 415 advisoryMessage.setBooleanProperty("createdByDuplex", createdByDuplex); 416 advisoryMessage.setStringProperty("remoteIp", remoteIp); 417 networkBridges.putIfAbsent(brokerInfo, advisoryMessage); 418 419 ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic(); 420 421 ConnectionContext context = new ConnectionContext(); 422 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 423 context.setBroker(getBrokerService().getBroker()); 424 fireAdvisory(context, topic, brokerInfo, null, advisoryMessage); 425 } 426 } catch (Exception e) { 427 handleFireFailure("network bridge started", e); 428 } 429 } 430 431 @Override 432 public void networkBridgeStopped(BrokerInfo brokerInfo) { 433 try { 434 if (brokerInfo != null) { 435 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 436 advisoryMessage.setBooleanProperty("started", false); 437 networkBridges.remove(brokerInfo); 438 439 ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic(); 440 441 ConnectionContext context = new ConnectionContext(); 442 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 443 context.setBroker(getBrokerService().getBroker()); 444 fireAdvisory(context, topic, brokerInfo, null, advisoryMessage); 445 } 446 } catch (Exception e) { 447 handleFireFailure("network bridge stopped", e); 448 } 449 } 450 451 private void handleFireFailure(String message, Throwable cause) { 452 LOG.warn("Failed to fire " + message + " advisory, reason: " + cause); 453 if (LOG.isDebugEnabled()) { 454 LOG.debug(message + " detail", cause); 455 } 456 } 457 458 protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception { 459 fireAdvisory(context, topic, command, null); 460 } 461 462 protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception { 463 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 464 fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); 465 } 466 467 protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command) throws Exception { 468 fireConsumerAdvisory(context, consumerDestination,topic, command, null); 469 } 470 471 protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception { 472 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 473 int count = 0; 474 Set<Destination>set = getDestinations(consumerDestination); 475 if (set != null) { 476 for (Destination dest:set) { 477 count += dest.getDestinationStatistics().getConsumers().getCount(); 478 } 479 } 480 advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_COUNT, count); 481 482 fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); 483 } 484 485 protected void fireProducerAdvisory(ConnectionContext context,ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command) throws Exception { 486 fireProducerAdvisory(context,producerDestination, topic, command, null); 487 } 488 489 protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination,ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception { 490 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 491 int count = 0; 492 if (producerDestination != null) { 493 Set<Destination> set = getDestinations(producerDestination); 494 if (set != null) { 495 for (Destination dest : set) { 496 count += dest.getDestinationStatistics().getProducers().getCount(); 497 } 498 } 499 } 500 advisoryMessage.setIntProperty("producerCount", count); 501 fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); 502 } 503 504 protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception { 505 if (getBrokerService().isStarted()) { 506 //set properties 507 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName()); 508 String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET"; 509 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id); 510 511 String url = getBrokerService().getVmConnectorURI().toString(); 512 if (getBrokerService().getDefaultSocketURIString() != null) { 513 url = getBrokerService().getDefaultSocketURIString(); 514 } 515 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url); 516 517 //set the data structure 518 advisoryMessage.setDataStructure(command); 519 advisoryMessage.setPersistent(false); 520 advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE); 521 advisoryMessage.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId())); 522 advisoryMessage.setTargetConsumerId(targetConsumerId); 523 advisoryMessage.setDestination(topic); 524 advisoryMessage.setResponseRequired(false); 525 advisoryMessage.setProducerId(advisoryProducerId); 526 boolean originalFlowControl = context.isProducerFlowControl(); 527 final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); 528 producerExchange.setConnectionContext(context); 529 producerExchange.setMutable(true); 530 producerExchange.setProducerState(new ProducerState(new ProducerInfo())); 531 try { 532 context.setProducerFlowControl(false); 533 next.send(producerExchange, advisoryMessage); 534 } finally { 535 context.setProducerFlowControl(originalFlowControl); 536 } 537 } 538 } 539 540 public Map<ConnectionId, ConnectionInfo> getAdvisoryConnections() { 541 return connections; 542 } 543 544 public Map<ConsumerId, ConsumerInfo> getAdvisoryConsumers() { 545 return consumers; 546 } 547 548 public Map<ProducerId, ProducerInfo> getAdvisoryProducers() { 549 return producers; 550 } 551 552 public Map<ActiveMQDestination, DestinationInfo> getAdvisoryDestinations() { 553 return destinations; 554 } 555}