001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.openwire; 018 019import java.io.DataInput; 020import java.io.DataOutput; 021import java.io.IOException; 022import java.lang.reflect.Method; 023import java.util.HashMap; 024import java.util.Map; 025 026import org.apache.activemq.command.CommandTypes; 027import org.apache.activemq.command.DataStructure; 028import org.apache.activemq.command.WireFormatInfo; 029import org.apache.activemq.util.ByteSequence; 030import org.apache.activemq.util.ByteSequenceData; 031import org.apache.activemq.util.DataByteArrayInputStream; 032import org.apache.activemq.util.DataByteArrayOutputStream; 033import org.apache.activemq.wireformat.WireFormat; 034 035/** 036 * 037 * 038 */ 039public final class OpenWireFormat implements WireFormat { 040 041 public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_STORE_VERSION; 042 public static final int DEFAULT_WIRE_VERSION = CommandTypes.PROTOCOL_VERSION; 043 public static final int DEFAULT_MAX_FRAME_SIZE = 100 * 1024 * 1024; //100 MB 044 045 static final byte NULL_TYPE = CommandTypes.NULL; 046 private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE / 2; 047 private static final int MARSHAL_CACHE_FREE_SPACE = 100; 048 049 private DataStreamMarshaller dataMarshallers[]; 050 private int version; 051 private boolean stackTraceEnabled; 052 private boolean tcpNoDelayEnabled; 053 private boolean cacheEnabled; 054 private boolean tightEncodingEnabled; 055 private boolean sizePrefixDisabled; 056 private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE; 057 058 // The following fields are used for value caching 059 private short nextMarshallCacheIndex; 060 private short nextMarshallCacheEvictionIndex; 061 private Map<DataStructure, Short> marshallCacheMap = new HashMap<DataStructure, Short>(); 062 private DataStructure marshallCache[] = null; 063 private DataStructure unmarshallCache[] = null; 064 private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream(); 065 private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream(); 066 private WireFormatInfo preferedWireFormatInfo; 067 068 public OpenWireFormat() { 069 this(DEFAULT_VERSION); 070 } 071 072 public OpenWireFormat(int i) { 073 setVersion(i); 074 } 075 076 public int hashCode() { 077 return version ^ (cacheEnabled ? 0x10000000 : 0x20000000) 078 ^ (stackTraceEnabled ? 0x01000000 : 0x02000000) 079 ^ (tightEncodingEnabled ? 0x00100000 : 0x00200000) 080 ^ (sizePrefixDisabled ? 0x00010000 : 0x00020000); 081 } 082 083 public OpenWireFormat copy() { 084 OpenWireFormat answer = new OpenWireFormat(version); 085 answer.stackTraceEnabled = stackTraceEnabled; 086 answer.tcpNoDelayEnabled = tcpNoDelayEnabled; 087 answer.cacheEnabled = cacheEnabled; 088 answer.tightEncodingEnabled = tightEncodingEnabled; 089 answer.sizePrefixDisabled = sizePrefixDisabled; 090 answer.preferedWireFormatInfo = preferedWireFormatInfo; 091 return answer; 092 } 093 094 public boolean equals(Object object) { 095 if (object == null) { 096 return false; 097 } 098 OpenWireFormat o = (OpenWireFormat)object; 099 return o.stackTraceEnabled == stackTraceEnabled && o.cacheEnabled == cacheEnabled 100 && o.version == version && o.tightEncodingEnabled == tightEncodingEnabled 101 && o.sizePrefixDisabled == sizePrefixDisabled; 102 } 103 104 105 public String toString() { 106 return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled=" 107 + tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled + ", maxFrameSize=" + maxFrameSize + "}"; 108 // return "OpenWireFormat{id="+id+", 109 // tightEncodingEnabled="+tightEncodingEnabled+"}"; 110 } 111 112 public int getVersion() { 113 return version; 114 } 115 116 public synchronized ByteSequence marshal(Object command) throws IOException { 117 118 if (cacheEnabled) { 119 runMarshallCacheEvictionSweep(); 120 } 121 122// MarshallAware ma = null; 123// // If not using value caching, then the marshaled form is always the 124// // same 125// if (!cacheEnabled && ((DataStructure)command).isMarshallAware()) { 126// ma = (MarshallAware)command; 127// } 128 129 ByteSequence sequence = null; 130 // if( ma!=null ) { 131 // sequence = ma.getCachedMarshalledForm(this); 132 // } 133 134 if (sequence == null) { 135 136 int size = 1; 137 if (command != null) { 138 139 DataStructure c = (DataStructure)command; 140 byte type = c.getDataStructureType(); 141 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; 142 if (dsm == null) { 143 throw new IOException("Unknown data type: " + type); 144 } 145 if (tightEncodingEnabled) { 146 147 BooleanStream bs = new BooleanStream(); 148 size += dsm.tightMarshal1(this, c, bs); 149 size += bs.marshalledSize(); 150 151 bytesOut.restart(size); 152 if (!sizePrefixDisabled) { 153 bytesOut.writeInt(size); 154 } 155 bytesOut.writeByte(type); 156 bs.marshal(bytesOut); 157 dsm.tightMarshal2(this, c, bytesOut, bs); 158 sequence = bytesOut.toByteSequence(); 159 160 } else { 161 bytesOut.restart(); 162 if (!sizePrefixDisabled) { 163 bytesOut.writeInt(0); // we don't know the final size 164 // yet but write this here for 165 // now. 166 } 167 bytesOut.writeByte(type); 168 dsm.looseMarshal(this, c, bytesOut); 169 sequence = bytesOut.toByteSequence(); 170 171 if (!sizePrefixDisabled) { 172 size = sequence.getLength() - 4; 173 int pos = sequence.offset; 174 ByteSequenceData.writeIntBig(sequence, size); 175 sequence.offset = pos; 176 } 177 } 178 179 } else { 180 bytesOut.restart(5); 181 bytesOut.writeInt(size); 182 bytesOut.writeByte(NULL_TYPE); 183 sequence = bytesOut.toByteSequence(); 184 } 185 186 // if( ma!=null ) { 187 // ma.setCachedMarshalledForm(this, sequence); 188 // } 189 } 190 return sequence; 191 } 192 193 public synchronized Object unmarshal(ByteSequence sequence) throws IOException { 194 bytesIn.restart(sequence); 195 // DataInputStream dis = new DataInputStream(new 196 // ByteArrayInputStream(sequence)); 197 198 if (!sizePrefixDisabled) { 199 int size = bytesIn.readInt(); 200 if (sequence.getLength() - 4 != size) { 201 // throw new IOException("Packet size does not match marshaled 202 // size"); 203 } 204 205 if (size > maxFrameSize) { 206 throw new IOException("Frame size of " + (size / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB"); 207 } 208 } 209 210 Object command = doUnmarshal(bytesIn); 211 // if( !cacheEnabled && ((DataStructure)command).isMarshallAware() ) { 212 // ((MarshallAware) command).setCachedMarshalledForm(this, sequence); 213 // } 214 return command; 215 } 216 217 public synchronized void marshal(Object o, DataOutput dataOut) throws IOException { 218 219 if (cacheEnabled) { 220 runMarshallCacheEvictionSweep(); 221 } 222 223 int size = 1; 224 if (o != null) { 225 226 DataStructure c = (DataStructure)o; 227 byte type = c.getDataStructureType(); 228 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; 229 if (dsm == null) { 230 throw new IOException("Unknown data type: " + type); 231 } 232 if (tightEncodingEnabled) { 233 BooleanStream bs = new BooleanStream(); 234 size += dsm.tightMarshal1(this, c, bs); 235 size += bs.marshalledSize(); 236 237 if (!sizePrefixDisabled) { 238 dataOut.writeInt(size); 239 } 240 241 dataOut.writeByte(type); 242 bs.marshal(dataOut); 243 dsm.tightMarshal2(this, c, dataOut, bs); 244 245 } else { 246 DataOutput looseOut = dataOut; 247 248 if (!sizePrefixDisabled) { 249 bytesOut.restart(); 250 looseOut = bytesOut; 251 } 252 253 looseOut.writeByte(type); 254 dsm.looseMarshal(this, c, looseOut); 255 256 if (!sizePrefixDisabled) { 257 ByteSequence sequence = bytesOut.toByteSequence(); 258 dataOut.writeInt(sequence.getLength()); 259 dataOut.write(sequence.getData(), sequence.getOffset(), sequence.getLength()); 260 } 261 262 } 263 264 } else { 265 if (!sizePrefixDisabled) { 266 dataOut.writeInt(size); 267 } 268 dataOut.writeByte(NULL_TYPE); 269 } 270 } 271 272 public Object unmarshal(DataInput dis) throws IOException { 273 DataInput dataIn = dis; 274 if (!sizePrefixDisabled) { 275 int size = dis.readInt(); 276 if (size > maxFrameSize) { 277 throw new IOException("Frame size of " + (size / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB"); 278 } 279 // int size = dis.readInt(); 280 // byte[] data = new byte[size]; 281 // dis.readFully(data); 282 // bytesIn.restart(data); 283 // dataIn = bytesIn; 284 } 285 return doUnmarshal(dataIn); 286 } 287 288 /** 289 * Used by NIO or AIO transports 290 */ 291 public int tightMarshal1(Object o, BooleanStream bs) throws IOException { 292 int size = 1; 293 if (o != null) { 294 DataStructure c = (DataStructure)o; 295 byte type = c.getDataStructureType(); 296 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; 297 if (dsm == null) { 298 throw new IOException("Unknown data type: " + type); 299 } 300 301 size += dsm.tightMarshal1(this, c, bs); 302 size += bs.marshalledSize(); 303 } 304 return size; 305 } 306 307 /** 308 * Used by NIO or AIO transports; note that the size is not written as part 309 * of this method. 310 */ 311 public void tightMarshal2(Object o, DataOutput ds, BooleanStream bs) throws IOException { 312 if (cacheEnabled) { 313 runMarshallCacheEvictionSweep(); 314 } 315 316 if (o != null) { 317 DataStructure c = (DataStructure)o; 318 byte type = c.getDataStructureType(); 319 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; 320 if (dsm == null) { 321 throw new IOException("Unknown data type: " + type); 322 } 323 ds.writeByte(type); 324 bs.marshal(ds); 325 dsm.tightMarshal2(this, c, ds, bs); 326 } 327 } 328 329 /** 330 * Allows you to dynamically switch the version of the openwire protocol 331 * being used. 332 * 333 * @param version 334 */ 335 public void setVersion(int version) { 336 String mfName = "org.apache.activemq.openwire.v" + version + ".MarshallerFactory"; 337 Class mfClass; 338 try { 339 mfClass = Class.forName(mfName, false, getClass().getClassLoader()); 340 } catch (ClassNotFoundException e) { 341 throw (IllegalArgumentException)new IllegalArgumentException("Invalid version: " + version 342 + ", could not load " + mfName) 343 .initCause(e); 344 } 345 try { 346 Method method = mfClass.getMethod("createMarshallerMap", new Class[] {OpenWireFormat.class}); 347 dataMarshallers = (DataStreamMarshaller[])method.invoke(null, new Object[] {this}); 348 } catch (Throwable e) { 349 throw (IllegalArgumentException)new IllegalArgumentException( 350 "Invalid version: " 351 + version 352 + ", " 353 + mfName 354 + " does not properly implement the createMarshallerMap method.") 355 .initCause(e); 356 } 357 this.version = version; 358 } 359 360 public Object doUnmarshal(DataInput dis) throws IOException { 361 byte dataType = dis.readByte(); 362 if (dataType != NULL_TYPE) { 363 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF]; 364 if (dsm == null) { 365 throw new IOException("Unknown data type: " + dataType); 366 } 367 Object data = dsm.createObject(); 368 if (this.tightEncodingEnabled) { 369 BooleanStream bs = new BooleanStream(); 370 bs.unmarshal(dis); 371 dsm.tightUnmarshal(this, data, dis, bs); 372 } else { 373 dsm.looseUnmarshal(this, data, dis); 374 } 375 return data; 376 } else { 377 return null; 378 } 379 } 380 381 // public void debug(String msg) { 382 // String t = (Thread.currentThread().getName()+" ").substring(0, 40); 383 // System.out.println(t+": "+msg); 384 // } 385 public int tightMarshalNestedObject1(DataStructure o, BooleanStream bs) throws IOException { 386 bs.writeBoolean(o != null); 387 if (o == null) { 388 return 0; 389 } 390 391 if (o.isMarshallAware()) { 392 // MarshallAware ma = (MarshallAware)o; 393 ByteSequence sequence = null; 394 // sequence=ma.getCachedMarshalledForm(this); 395 bs.writeBoolean(sequence != null); 396 if (sequence != null) { 397 return 1 + sequence.getLength(); 398 } 399 } 400 401 byte type = o.getDataStructureType(); 402 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; 403 if (dsm == null) { 404 throw new IOException("Unknown data type: " + type); 405 } 406 return 1 + dsm.tightMarshal1(this, o, bs); 407 } 408 409 public void tightMarshalNestedObject2(DataStructure o, DataOutput ds, BooleanStream bs) 410 throws IOException { 411 if (!bs.readBoolean()) { 412 return; 413 } 414 415 byte type = o.getDataStructureType(); 416 ds.writeByte(type); 417 418 if (o.isMarshallAware() && bs.readBoolean()) { 419 420 // We should not be doing any caching 421 throw new IOException("Corrupted stream"); 422 // MarshallAware ma = (MarshallAware) o; 423 // ByteSequence sequence=ma.getCachedMarshalledForm(this); 424 // ds.write(sequence.getData(), sequence.getOffset(), 425 // sequence.getLength()); 426 427 } else { 428 429 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; 430 if (dsm == null) { 431 throw new IOException("Unknown data type: " + type); 432 } 433 dsm.tightMarshal2(this, o, ds, bs); 434 435 } 436 } 437 438 public DataStructure tightUnmarshalNestedObject(DataInput dis, BooleanStream bs) throws IOException { 439 if (bs.readBoolean()) { 440 441 byte dataType = dis.readByte(); 442 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF]; 443 if (dsm == null) { 444 throw new IOException("Unknown data type: " + dataType); 445 } 446 DataStructure data = dsm.createObject(); 447 448 if (data.isMarshallAware() && bs.readBoolean()) { 449 450 dis.readInt(); 451 dis.readByte(); 452 453 BooleanStream bs2 = new BooleanStream(); 454 bs2.unmarshal(dis); 455 dsm.tightUnmarshal(this, data, dis, bs2); 456 457 // TODO: extract the sequence from the dis and associate it. 458 // MarshallAware ma = (MarshallAware)data 459 // ma.setCachedMarshalledForm(this, sequence); 460 461 } else { 462 dsm.tightUnmarshal(this, data, dis, bs); 463 } 464 465 return data; 466 } else { 467 return null; 468 } 469 } 470 471 public DataStructure looseUnmarshalNestedObject(DataInput dis) throws IOException { 472 if (dis.readBoolean()) { 473 474 byte dataType = dis.readByte(); 475 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF]; 476 if (dsm == null) { 477 throw new IOException("Unknown data type: " + dataType); 478 } 479 DataStructure data = dsm.createObject(); 480 dsm.looseUnmarshal(this, data, dis); 481 return data; 482 483 } else { 484 return null; 485 } 486 } 487 488 public void looseMarshalNestedObject(DataStructure o, DataOutput dataOut) throws IOException { 489 dataOut.writeBoolean(o != null); 490 if (o != null) { 491 byte type = o.getDataStructureType(); 492 dataOut.writeByte(type); 493 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; 494 if (dsm == null) { 495 throw new IOException("Unknown data type: " + type); 496 } 497 dsm.looseMarshal(this, o, dataOut); 498 } 499 } 500 501 public void runMarshallCacheEvictionSweep() { 502 // Do we need to start evicting?? 503 while (marshallCacheMap.size() > marshallCache.length - MARSHAL_CACHE_FREE_SPACE) { 504 505 marshallCacheMap.remove(marshallCache[nextMarshallCacheEvictionIndex]); 506 marshallCache[nextMarshallCacheEvictionIndex] = null; 507 508 nextMarshallCacheEvictionIndex++; 509 if (nextMarshallCacheEvictionIndex >= marshallCache.length) { 510 nextMarshallCacheEvictionIndex = 0; 511 } 512 513 } 514 } 515 516 public Short getMarshallCacheIndex(DataStructure o) { 517 return marshallCacheMap.get(o); 518 } 519 520 public Short addToMarshallCache(DataStructure o) { 521 short i = nextMarshallCacheIndex++; 522 if (nextMarshallCacheIndex >= marshallCache.length) { 523 nextMarshallCacheIndex = 0; 524 } 525 526 // We can only cache that item if there is space left. 527 if (marshallCacheMap.size() < marshallCache.length) { 528 marshallCache[i] = o; 529 Short index = new Short(i); 530 marshallCacheMap.put(o, index); 531 return index; 532 } else { 533 // Use -1 to indicate that the value was not cached due to cache 534 // being full. 535 return new Short((short)-1); 536 } 537 } 538 539 public void setInUnmarshallCache(short index, DataStructure o) { 540 541 // There was no space left in the cache, so we can't 542 // put this in the cache. 543 if (index == -1) { 544 return; 545 } 546 547 unmarshallCache[index] = o; 548 } 549 550 public DataStructure getFromUnmarshallCache(short index) { 551 return unmarshallCache[index]; 552 } 553 554 public void setStackTraceEnabled(boolean b) { 555 stackTraceEnabled = b; 556 } 557 558 public boolean isStackTraceEnabled() { 559 return stackTraceEnabled; 560 } 561 562 public boolean isTcpNoDelayEnabled() { 563 return tcpNoDelayEnabled; 564 } 565 566 public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) { 567 this.tcpNoDelayEnabled = tcpNoDelayEnabled; 568 } 569 570 public boolean isCacheEnabled() { 571 return cacheEnabled; 572 } 573 574 public void setCacheEnabled(boolean cacheEnabled) { 575 if(cacheEnabled){ 576 marshallCache = new DataStructure[MARSHAL_CACHE_SIZE]; 577 unmarshallCache = new DataStructure[MARSHAL_CACHE_SIZE]; 578 } 579 this.cacheEnabled = cacheEnabled; 580 } 581 582 public boolean isTightEncodingEnabled() { 583 return tightEncodingEnabled; 584 } 585 586 public void setTightEncodingEnabled(boolean tightEncodingEnabled) { 587 this.tightEncodingEnabled = tightEncodingEnabled; 588 } 589 590 public boolean isSizePrefixDisabled() { 591 return sizePrefixDisabled; 592 } 593 594 public void setSizePrefixDisabled(boolean prefixPacketSize) { 595 this.sizePrefixDisabled = prefixPacketSize; 596 } 597 598 public void setPreferedWireFormatInfo(WireFormatInfo info) { 599 this.preferedWireFormatInfo = info; 600 } 601 602 public WireFormatInfo getPreferedWireFormatInfo() { 603 return preferedWireFormatInfo; 604 } 605 606 public long getMaxFrameSize() { 607 return maxFrameSize; 608 } 609 610 public void setMaxFrameSize(long maxFrameSize) { 611 this.maxFrameSize = maxFrameSize; 612 } 613 614 public void renegotiateWireFormat(WireFormatInfo info) throws IOException { 615 616 if (preferedWireFormatInfo == null) { 617 throw new IllegalStateException("Wireformat cannot not be renegotiated."); 618 } 619 620 this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion())); 621 info.setVersion(this.getVersion()); 622 623 this.setMaxFrameSize(min(preferedWireFormatInfo.getMaxFrameSize(), info.getMaxFrameSize())); 624 info.setMaxFrameSize(this.getMaxFrameSize()); 625 626 this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled(); 627 info.setStackTraceEnabled(this.stackTraceEnabled); 628 629 this.tcpNoDelayEnabled = info.isTcpNoDelayEnabled() && preferedWireFormatInfo.isTcpNoDelayEnabled(); 630 info.setTcpNoDelayEnabled(this.tcpNoDelayEnabled); 631 632 this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled(); 633 info.setCacheEnabled(this.cacheEnabled); 634 635 this.tightEncodingEnabled = info.isTightEncodingEnabled() 636 && preferedWireFormatInfo.isTightEncodingEnabled(); 637 info.setTightEncodingEnabled(this.tightEncodingEnabled); 638 639 this.sizePrefixDisabled = info.isSizePrefixDisabled() 640 && preferedWireFormatInfo.isSizePrefixDisabled(); 641 info.setSizePrefixDisabled(this.sizePrefixDisabled); 642 643 if (cacheEnabled) { 644 645 int size = Math.min(preferedWireFormatInfo.getCacheSize(), info.getCacheSize()); 646 info.setCacheSize(size); 647 648 if (size == 0) { 649 size = MARSHAL_CACHE_SIZE; 650 } 651 652 marshallCache = new DataStructure[size]; 653 unmarshallCache = new DataStructure[size]; 654 nextMarshallCacheIndex = 0; 655 nextMarshallCacheEvictionIndex = 0; 656 marshallCacheMap = new HashMap<DataStructure, Short>(); 657 } else { 658 marshallCache = null; 659 unmarshallCache = null; 660 nextMarshallCacheIndex = 0; 661 nextMarshallCacheEvictionIndex = 0; 662 marshallCacheMap = null; 663 } 664 665 } 666 667 protected int min(int version1, int version2) { 668 if (version1 < version2 && version1 > 0 || version2 <= 0) { 669 return version1; 670 } 671 return version2; 672 } 673 674 protected long min(long version1, long version2) { 675 if (version1 < version2 && version1 > 0 || version2 <= 0) { 676 return version1; 677 } 678 return version2; 679 } 680}