001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.fanout; 018 019import java.io.IOException; 020import java.io.InterruptedIOException; 021import java.net.URI; 022import java.util.ArrayList; 023import java.util.Iterator; 024import java.util.concurrent.ConcurrentHashMap; 025import java.util.concurrent.atomic.AtomicInteger; 026 027import org.apache.activemq.command.Command; 028import org.apache.activemq.command.ConsumerInfo; 029import org.apache.activemq.command.Message; 030import org.apache.activemq.command.RemoveInfo; 031import org.apache.activemq.command.Response; 032import org.apache.activemq.state.ConnectionStateTracker; 033import org.apache.activemq.thread.DefaultThreadPools; 034import org.apache.activemq.thread.Task; 035import org.apache.activemq.thread.TaskRunner; 036import org.apache.activemq.transport.CompositeTransport; 037import org.apache.activemq.transport.DefaultTransportListener; 038import org.apache.activemq.transport.FutureResponse; 039import org.apache.activemq.transport.ResponseCallback; 040import org.apache.activemq.transport.Transport; 041import org.apache.activemq.transport.TransportFactory; 042import org.apache.activemq.transport.TransportListener; 043import org.apache.activemq.util.IOExceptionSupport; 044import org.apache.activemq.util.ServiceStopper; 045import org.apache.activemq.util.ServiceSupport; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049/** 050 * A Transport that fans out a connection to multiple brokers. 051 * 052 * 053 */ 054public class FanoutTransport implements CompositeTransport { 055 056 private static final Logger LOG = LoggerFactory.getLogger(FanoutTransport.class); 057 058 private TransportListener transportListener; 059 private boolean disposed; 060 private boolean connected; 061 062 private final Object reconnectMutex = new Object(); 063 private final ConnectionStateTracker stateTracker = new ConnectionStateTracker(); 064 private final ConcurrentHashMap<Integer, RequestCounter> requestMap = new ConcurrentHashMap<Integer, RequestCounter>(); 065 066 private final TaskRunner reconnectTask; 067 private boolean started; 068 069 private final ArrayList<FanoutTransportHandler> transports = new ArrayList<FanoutTransportHandler>(); 070 private int connectedCount; 071 072 private int minAckCount = 2; 073 074 private long initialReconnectDelay = 10; 075 private long maxReconnectDelay = 1000 * 30; 076 private long backOffMultiplier = 2; 077 private final boolean useExponentialBackOff = true; 078 private int maxReconnectAttempts; 079 private Exception connectionFailure; 080 private FanoutTransportHandler primary; 081 private boolean fanOutQueues = false; 082 083 static class RequestCounter { 084 085 final Command command; 086 final AtomicInteger ackCount; 087 088 RequestCounter(Command command, int count) { 089 this.command = command; 090 this.ackCount = new AtomicInteger(count); 091 } 092 093 @Override 094 public String toString() { 095 return command.getCommandId() + "=" + ackCount.get(); 096 } 097 } 098 099 class FanoutTransportHandler extends DefaultTransportListener { 100 101 private final URI uri; 102 private Transport transport; 103 104 private int connectFailures; 105 private long reconnectDelay = initialReconnectDelay; 106 private long reconnectDate; 107 108 public FanoutTransportHandler(URI uri) { 109 this.uri = uri; 110 } 111 112 @Override 113 public void onCommand(Object o) { 114 Command command = (Command)o; 115 if (command.isResponse()) { 116 Integer id = new Integer(((Response)command).getCorrelationId()); 117 RequestCounter rc = requestMap.get(id); 118 if (rc != null) { 119 if (rc.ackCount.decrementAndGet() <= 0) { 120 requestMap.remove(id); 121 transportListenerOnCommand(command); 122 } 123 } else { 124 transportListenerOnCommand(command); 125 } 126 } else { 127 transportListenerOnCommand(command); 128 } 129 } 130 131 @Override 132 public void onException(IOException error) { 133 try { 134 synchronized (reconnectMutex) { 135 if (transport == null || !transport.isConnected()) { 136 return; 137 } 138 139 LOG.debug("Transport failed, starting up reconnect task", error); 140 141 ServiceSupport.dispose(transport); 142 transport = null; 143 connectedCount--; 144 if (primary == this) { 145 primary = null; 146 } 147 reconnectTask.wakeup(); 148 } 149 } catch (InterruptedException e) { 150 Thread.currentThread().interrupt(); 151 if (transportListener != null) { 152 transportListener.onException(new InterruptedIOException()); 153 } 154 } 155 } 156 } 157 158 public FanoutTransport() throws InterruptedIOException { 159 // Setup a task that is used to reconnect the a connection async. 160 reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() { 161 public boolean iterate() { 162 return doConnect(); 163 } 164 }, "ActiveMQ Fanout Worker: " + System.identityHashCode(this)); 165 } 166 167 /** 168 * @return 169 */ 170 private boolean doConnect() { 171 long closestReconnectDate = 0; 172 synchronized (reconnectMutex) { 173 174 if (disposed || connectionFailure != null) { 175 reconnectMutex.notifyAll(); 176 } 177 178 if (transports.size() == connectedCount || disposed || connectionFailure != null) { 179 return false; 180 } else { 181 182 if (transports.isEmpty()) { 183 // connectionFailure = new IOException("No uris available to 184 // connect to."); 185 } else { 186 187 // Try to connect them up. 188 Iterator<FanoutTransportHandler> iter = transports.iterator(); 189 for (int i = 0; iter.hasNext() && !disposed; i++) { 190 191 long now = System.currentTimeMillis(); 192 193 FanoutTransportHandler fanoutHandler = iter.next(); 194 if (fanoutHandler.transport != null) { 195 continue; 196 } 197 198 // Are we waiting a little to try to reconnect this one? 199 if (fanoutHandler.reconnectDate != 0 && fanoutHandler.reconnectDate > now) { 200 if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) { 201 closestReconnectDate = fanoutHandler.reconnectDate; 202 } 203 continue; 204 } 205 206 URI uri = fanoutHandler.uri; 207 try { 208 LOG.debug("Stopped: " + this); 209 LOG.debug("Attempting connect to: " + uri); 210 Transport t = TransportFactory.compositeConnect(uri); 211 fanoutHandler.transport = t; 212 t.setTransportListener(fanoutHandler); 213 if (started) { 214 restoreTransport(fanoutHandler); 215 } 216 LOG.debug("Connection established"); 217 fanoutHandler.reconnectDelay = initialReconnectDelay; 218 fanoutHandler.connectFailures = 0; 219 if (primary == null) { 220 primary = fanoutHandler; 221 } 222 connectedCount++; 223 } catch (Exception e) { 224 LOG.debug("Connect fail to: " + uri + ", reason: " + e); 225 226 if( fanoutHandler.transport !=null ) { 227 ServiceSupport.dispose(fanoutHandler.transport); 228 fanoutHandler.transport=null; 229 } 230 231 if (maxReconnectAttempts > 0 && ++fanoutHandler.connectFailures >= maxReconnectAttempts) { 232 LOG.error("Failed to connect to transport after: " + fanoutHandler.connectFailures + " attempt(s)"); 233 connectionFailure = e; 234 reconnectMutex.notifyAll(); 235 return false; 236 } else { 237 238 if (useExponentialBackOff) { 239 // Exponential increment of reconnect delay. 240 fanoutHandler.reconnectDelay *= backOffMultiplier; 241 if (fanoutHandler.reconnectDelay > maxReconnectDelay) { 242 fanoutHandler.reconnectDelay = maxReconnectDelay; 243 } 244 } 245 246 fanoutHandler.reconnectDate = now + fanoutHandler.reconnectDelay; 247 248 if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) { 249 closestReconnectDate = fanoutHandler.reconnectDate; 250 } 251 } 252 } 253 } 254 if (transports.size() == connectedCount || disposed) { 255 reconnectMutex.notifyAll(); 256 return false; 257 } 258 259 } 260 } 261 262 } 263 264 try { 265 long reconnectDelay = closestReconnectDate - System.currentTimeMillis(); 266 if (reconnectDelay > 0) { 267 LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. "); 268 Thread.sleep(reconnectDelay); 269 } 270 } catch (InterruptedException e1) { 271 Thread.currentThread().interrupt(); 272 } 273 return true; 274 } 275 276 public void start() throws Exception { 277 synchronized (reconnectMutex) { 278 LOG.debug("Started."); 279 if (started) { 280 return; 281 } 282 started = true; 283 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 284 FanoutTransportHandler th = iter.next(); 285 if (th.transport != null) { 286 restoreTransport(th); 287 } 288 } 289 connected=true; 290 } 291 } 292 293 public void stop() throws Exception { 294 synchronized (reconnectMutex) { 295 ServiceStopper ss = new ServiceStopper(); 296 297 if (!started) { 298 return; 299 } 300 started = false; 301 disposed = true; 302 connected=false; 303 304 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 305 FanoutTransportHandler th = iter.next(); 306 if (th.transport != null) { 307 ss.stop(th.transport); 308 } 309 } 310 311 LOG.debug("Stopped: " + this); 312 ss.throwFirstException(); 313 } 314 reconnectTask.shutdown(); 315 } 316 317 public int getMinAckCount() { 318 return minAckCount; 319 } 320 321 public void setMinAckCount(int minAckCount) { 322 this.minAckCount = minAckCount; 323 } 324 325 public long getInitialReconnectDelay() { 326 return initialReconnectDelay; 327 } 328 329 public void setInitialReconnectDelay(long initialReconnectDelay) { 330 this.initialReconnectDelay = initialReconnectDelay; 331 } 332 333 public long getMaxReconnectDelay() { 334 return maxReconnectDelay; 335 } 336 337 public void setMaxReconnectDelay(long maxReconnectDelay) { 338 this.maxReconnectDelay = maxReconnectDelay; 339 } 340 341 public long getReconnectDelayExponent() { 342 return backOffMultiplier; 343 } 344 345 public void setReconnectDelayExponent(long reconnectDelayExponent) { 346 this.backOffMultiplier = reconnectDelayExponent; 347 } 348 349 public int getMaxReconnectAttempts() { 350 return maxReconnectAttempts; 351 } 352 353 public void setMaxReconnectAttempts(int maxReconnectAttempts) { 354 this.maxReconnectAttempts = maxReconnectAttempts; 355 } 356 357 public void oneway(Object o) throws IOException { 358 final Command command = (Command)o; 359 try { 360 synchronized (reconnectMutex) { 361 362 // Wait for transport to be connected. 363 while (connectedCount < minAckCount && !disposed && connectionFailure == null) { 364 LOG.debug("Waiting for at least " + minAckCount + " transports to be connected."); 365 reconnectMutex.wait(1000); 366 } 367 368 // Still not fully connected. 369 if (connectedCount < minAckCount) { 370 371 Exception error; 372 373 // Throw the right kind of error.. 374 if (disposed) { 375 error = new IOException("Transport disposed."); 376 } else if (connectionFailure != null) { 377 error = connectionFailure; 378 } else { 379 error = new IOException("Unexpected failure."); 380 } 381 382 if (error instanceof IOException) { 383 throw (IOException)error; 384 } 385 throw IOExceptionSupport.create(error); 386 } 387 388 // If it was a request and it was not being tracked by 389 // the state tracker, 390 // then hold it in the requestMap so that we can replay 391 // it later. 392 boolean fanout = isFanoutCommand(command); 393 if (stateTracker.track(command) == null && command.isResponseRequired()) { 394 int size = fanout ? minAckCount : 1; 395 requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size)); 396 } 397 398 // Send the message. 399 if (fanout) { 400 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 401 FanoutTransportHandler th = iter.next(); 402 if (th.transport != null) { 403 try { 404 th.transport.oneway(command); 405 } catch (IOException e) { 406 LOG.debug("Send attempt: failed."); 407 th.onException(e); 408 } 409 } 410 } 411 } else { 412 try { 413 primary.transport.oneway(command); 414 } catch (IOException e) { 415 LOG.debug("Send attempt: failed."); 416 primary.onException(e); 417 } 418 } 419 420 } 421 } catch (InterruptedException e) { 422 // Some one may be trying to stop our thread. 423 Thread.currentThread().interrupt(); 424 throw new InterruptedIOException(); 425 } 426 } 427 428 /** 429 * @param command 430 * @return 431 */ 432 private boolean isFanoutCommand(Command command) { 433 if (command.isMessage()) { 434 if( fanOutQueues ) { 435 return true; 436 } 437 return ((Message)command).getDestination().isTopic(); 438 } 439 if (command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE || 440 command.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) { 441 return false; 442 } 443 return true; 444 } 445 446 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { 447 throw new AssertionError("Unsupported Method"); 448 } 449 450 public Object request(Object command) throws IOException { 451 throw new AssertionError("Unsupported Method"); 452 } 453 454 public Object request(Object command, int timeout) throws IOException { 455 throw new AssertionError("Unsupported Method"); 456 } 457 458 public void reconnect() { 459 LOG.debug("Waking up reconnect task"); 460 try { 461 reconnectTask.wakeup(); 462 } catch (InterruptedException e) { 463 Thread.currentThread().interrupt(); 464 } 465 } 466 467 public TransportListener getTransportListener() { 468 return transportListener; 469 } 470 471 public void setTransportListener(TransportListener commandListener) { 472 this.transportListener = commandListener; 473 } 474 475 public <T> T narrow(Class<T> target) { 476 477 if (target.isAssignableFrom(getClass())) { 478 return target.cast(this); 479 } 480 481 synchronized (reconnectMutex) { 482 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 483 FanoutTransportHandler th = iter.next(); 484 if (th.transport != null) { 485 T rc = th.transport.narrow(target); 486 if (rc != null) { 487 return rc; 488 } 489 } 490 } 491 } 492 493 return null; 494 495 } 496 497 protected void restoreTransport(FanoutTransportHandler th) throws Exception, IOException { 498 th.transport.start(); 499 stateTracker.setRestoreConsumers(th.transport == primary); 500 stateTracker.restore(th.transport); 501 for (Iterator<RequestCounter> iter2 = requestMap.values().iterator(); iter2.hasNext();) { 502 RequestCounter rc = iter2.next(); 503 th.transport.oneway(rc.command); 504 } 505 } 506 507 public void add(boolean reblance,URI uris[]) { 508 509 synchronized (reconnectMutex) { 510 for (int i = 0; i < uris.length; i++) { 511 URI uri = uris[i]; 512 513 boolean match = false; 514 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 515 FanoutTransportHandler th = iter.next(); 516 if (th.uri.equals(uri)) { 517 match = true; 518 break; 519 } 520 } 521 if (!match) { 522 FanoutTransportHandler th = new FanoutTransportHandler(uri); 523 transports.add(th); 524 reconnect(); 525 } 526 } 527 } 528 529 } 530 531 public void remove(boolean rebalance,URI uris[]) { 532 533 synchronized (reconnectMutex) { 534 for (int i = 0; i < uris.length; i++) { 535 URI uri = uris[i]; 536 537 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 538 FanoutTransportHandler th = iter.next(); 539 if (th.uri.equals(uri)) { 540 if (th.transport != null) { 541 ServiceSupport.dispose(th.transport); 542 connectedCount--; 543 } 544 iter.remove(); 545 break; 546 } 547 } 548 } 549 } 550 551 } 552 553 public void reconnect(URI uri) throws IOException { 554 add(true,new URI[]{uri}); 555 556 } 557 558 public boolean isReconnectSupported() { 559 return true; 560 } 561 562 public boolean isUpdateURIsSupported() { 563 return true; 564 } 565 public void updateURIs(boolean reblance,URI[] uris) throws IOException { 566 add(reblance,uris); 567 } 568 569 570 public String getRemoteAddress() { 571 if (primary != null) { 572 if (primary.transport != null) { 573 return primary.transport.getRemoteAddress(); 574 } 575 } 576 return null; 577 } 578 579 protected void transportListenerOnCommand(Command command) { 580 if (transportListener != null) { 581 transportListener.onCommand(command); 582 } 583 } 584 585 public boolean isFaultTolerant() { 586 return true; 587 } 588 589 public boolean isFanOutQueues() { 590 return fanOutQueues; 591 } 592 593 public void setFanOutQueues(boolean fanOutQueues) { 594 this.fanOutQueues = fanOutQueues; 595 } 596 597 public boolean isDisposed() { 598 return disposed; 599 } 600 601 602 public boolean isConnected() { 603 return connected; 604 } 605 606 public int getReceiveCounter() { 607 int rc = 0; 608 synchronized (reconnectMutex) { 609 for (FanoutTransportHandler th : transports) { 610 if (th.transport != null) { 611 rc += th.transport.getReceiveCounter(); 612 } 613 } 614 } 615 return rc; 616 } 617}