// blob: 26c09f1bab0faa6ba44ac48bf13b95e35780a4e3 [file] [log] [blame]
// SPDX-License-Identifier: LGPL-2.1-or-later
// Copyright (c) 2012-2014 Monty Program Ab
// Copyright (c) 2015-2021 MariaDB Corporation Ab
package org.mariadb.jdbc.client.socket.impl;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import org.mariadb.jdbc.Configuration;
import org.mariadb.jdbc.HostAddress;
import org.mariadb.jdbc.client.ReadableByteBuf;
import org.mariadb.jdbc.client.impl.StandardReadableByteBuf;
import org.mariadb.jdbc.client.socket.Reader;
import org.mariadb.jdbc.client.util.MutableByte;
import org.mariadb.jdbc.util.log.Logger;
import org.mariadb.jdbc.util.log.LoggerHelper;
import org.mariadb.jdbc.util.log.Loggers;
/**
 * Packet reader.
 *
 * <p>Reads length-prefixed packets from an {@link InputStream}. Every packet starts with a 4 byte
 * header: a 3 byte little-endian payload length followed by a 1 byte sequence number. A payload
 * length of {@code 0xffffff} means the logical packet continues in the next physical packet.
 *
 * <p>The header array, the small-packet array and the returned {@link ReadableByteBuf} wrapper are
 * instance fields that are overwritten by each read, so an instance must not be used by several
 * threads at once without external synchronization.
 */
public class PacketReader implements Reader {
  private static final int REUSABLE_BUFFER_LENGTH = 1024;
  private static final int MAX_PACKET_SIZE = 0xffffff;
  private static final int HEADER_LENGTH = 4;

  /** Suffix appended to end-of-stream messages raised while reading the first physical packet. */
  private static final String CLOSED_BY_SERVER_HINT = " (socket was closed by server)";

  /** Suffix used for end-of-stream messages raised while reading continuation packets. */
  private static final String NO_HINT = "";

  private static final Logger logger = Loggers.getLogger(PacketReader.class);

  private final StandardReadableByteBuf readBuf = new StandardReadableByteBuf(null, 0);
  private final byte[] header = new byte[HEADER_LENGTH];
  private final byte[] reusableArray = new byte[REUSABLE_BUFFER_LENGTH];
  private final InputStream inputStream;
  private final int maxQuerySizeToLog;
  private final MutableByte sequence;
  private String serverThreadLog = "";

  /**
   * Constructor of standard socket MySQL packet stream reader.
   *
   * @param in stream
   * @param conf connection options
   * @param sequence current increment sequence
   */
  public PacketReader(InputStream in, Configuration conf, MutableByte sequence) {
    this.inputStream = in;
    this.maxQuerySizeToLog = conf.maxQuerySizeToLog();
    this.sequence = sequence;
  }

  /**
   * Wrap an existing array in this reader's shared buffer object.
   *
   * @param buf array to expose; the whole array is made readable, starting at position 0
   * @return the shared buffer instance, now backed by {@code buf}
   */
  public ReadableByteBuf readableBufFromArray(byte[] buf) {
    readBuf.buf(buf, buf.length, 0);
    return readBuf;
  }

  /**
   * Read one physical packet, tracing it when the logger has trace level enabled.
   *
   * @return the shared buffer instance positioned on the packet payload
   * @throws IOException if the stream fails or ends before the packet is complete
   * @see #readReusablePacket(boolean)
   */
  public ReadableByteBuf readReusablePacket() throws IOException {
    return readReusablePacket(logger.isTraceEnabled());
  }

  /**
   * Read one physical packet and update the shared sequence with the packet sequence byte.
   *
   * <p>The returned object is always the same shared buffer instance, and payloads shorter than
   * {@value #REUSABLE_BUFFER_LENGTH} bytes are stored in a shared array, so the content is only
   * valid until the next read on this reader. Continuation packets (payload of {@code 0xffffff}
   * bytes) are NOT followed by this method.
   *
   * @param traceEnable must trace packet.
   * @return the shared buffer instance positioned on the packet payload
   * @throws IOException if the stream fails or ends before the packet is complete
   */
  public ReadableByteBuf readReusablePacket(boolean traceEnable) throws IOException {
    int payloadLength = readHeader(CLOSED_BY_SERVER_HINT);
    sequence.set(header[3]);

    // small payloads go to the shared array to avoid one allocation per packet
    byte[] rawBytes =
        payloadLength < REUSABLE_BUFFER_LENGTH ? reusableArray : new byte[payloadLength];
    readFully(rawBytes, 0, payloadLength, CLOSED_BY_SERVER_HINT);

    if (traceEnable) {
      tracePacket(rawBytes, 0, payloadLength);
    }

    readBuf.buf(rawBytes, payloadLength, 0);
    return readBuf;
  }

  /**
   * Get next MySQL packet. If packet is more than 16M, read as many packet needed to finish reading
   * MySQL packet. (first that has not length = 16Mb)
   *
   * <p>The returned array is freshly allocated and holds the concatenated payloads. The shared
   * sequence is not modified by this method.
   *
   * @param traceEnable must trace packet.
   * @return array packet.
   * @throws IOException if socket exception occur.
   */
  public byte[] readPacket(boolean traceEnable) throws IOException {
    int chunkLength = readHeader(CLOSED_BY_SERVER_HINT);
    byte[] rawBytes = new byte[chunkLength];
    readFully(rawBytes, 0, chunkLength, CLOSED_BY_SERVER_HINT);
    if (traceEnable) {
      tracePacket(rawBytes, 0, chunkLength);
    }

    // A payload of exactly MAX_PACKET_SIZE means the content continues in the next packet;
    // the first packet with a different length (possibly 0) ends the sequence.
    while (chunkLength == MAX_PACKET_SIZE) {
      chunkLength = readHeader(NO_HINT);

      int alreadyRead = rawBytes.length;
      byte[] grown = new byte[alreadyRead + chunkLength];
      System.arraycopy(rawBytes, 0, grown, 0, alreadyRead);
      rawBytes = grown;

      readFully(rawBytes, alreadyRead, chunkLength, NO_HINT);
      if (traceEnable) {
        tracePacket(rawBytes, alreadyRead, chunkLength);
      }
    }
    return rawBytes;
  }

  /**
   * Consume the next packet, including any continuation packets, without returning its content.
   *
   * <p>When trace logging is enabled the content is read and logged instead of skipped, and the
   * shared sequence is updated by the first packet (it goes through {@link
   * #readReusablePacket(boolean)}). Otherwise the payload is skipped on the stream and the
   * sequence is left untouched.
   *
   * @throws IOException if the stream fails or ends before the packet is fully consumed
   */
  public void skipPacket() throws IOException {
    if (logger.isTraceEnabled()) {
      readReusablePacket(true);
      // readReusablePacket handles a single physical packet only; the header field still holds
      // the header it read, so continuation chunks are detected and consumed here.
      int chunkLength = decodeHeaderLength();
      while (chunkLength == MAX_PACKET_SIZE) {
        chunkLength = readHeader(NO_HINT);
        byte[] chunk = new byte[chunkLength];
        readFully(chunk, 0, chunkLength, NO_HINT);
        tracePacket(chunk, 0, chunkLength);
      }
      return;
    }

    int chunkLength = readHeader(CLOSED_BY_SERVER_HINT);
    skipFully(chunkLength, CLOSED_BY_SERVER_HINT);

    // In case content length is big, content will be separate in many 16Mb packets
    while (chunkLength == MAX_PACKET_SIZE) {
      chunkLength = readHeader(NO_HINT);
      skipFully(chunkLength, NO_HINT);
    }
  }

  /**
   * Shared sequence holder.
   *
   * @return the sequence instance given at construction
   */
  public MutableByte getSequence() {
    return sequence;
  }

  /**
   * Close the underlying stream.
   *
   * @throws IOException if closing the stream fails
   */
  public void close() throws IOException {
    inputStream.close();
  }

  /**
   * Set server thread id.
   *
   * <p>Builds the connection prefix used in trace messages: {@code conn=<id>} ({@code -1} when
   * the id is null), followed by {@code (M)} or {@code (S)} when the host primary flag is known.
   *
   * @param serverThreadId current server thread id.
   * @param hostAddress host information
   */
  public void setServerThreadId(Long serverThreadId, HostAddress hostAddress) {
    Boolean isMaster = hostAddress != null ? hostAddress.primary : null;
    this.serverThreadLog =
        "conn="
            + (serverThreadId == null ? "-1" : serverThreadId)
            + ((isMaster != null) ? " (" + (isMaster ? "M" : "S") + ")" : "");
  }

  /**
   * Read the 4 byte packet header into {@link #header}.
   *
   * @param eofHint suffix appended to the end-of-stream message
   * @return payload length announced by the header
   * @throws IOException if the stream fails or ends before 4 bytes are read
   */
  private int readHeader(String eofHint) throws IOException {
    readFully(header, 0, HEADER_LENGTH, eofHint);
    return decodeHeaderLength();
  }

  /** Decode the 3 byte little-endian payload length from the last header read. */
  private int decodeHeaderLength() {
    return (header[0] & 0xff) + ((header[1] & 0xff) << 8) + ((header[2] & 0xff) << 16);
  }

  /**
   * Read exactly {@code length} bytes into {@code dest}, starting at {@code offset}.
   *
   * @param dest destination array
   * @param offset first index written in {@code dest}
   * @param length number of bytes to read
   * @param eofHint suffix appended to the end-of-stream message
   * @throws EOFException if the stream ends before {@code length} bytes are read
   * @throws IOException if the stream fails
   */
  private void readFully(byte[] dest, int offset, int length, String eofHint)
      throws IOException {
    int remaining = length;
    int position = offset;
    // do/while: one read is issued even when length is 0
    do {
      int count = inputStream.read(dest, position, remaining);
      if (count < 0) {
        throw new EOFException(
            "unexpected end of stream, read "
                + (length - remaining)
                + " bytes from "
                + length
                + eofHint);
      }
      remaining -= count;
      position += count;
    } while (remaining > 0);
  }

  /**
   * Discard exactly {@code length} bytes from the stream.
   *
   * <p>{@link InputStream#skip(long)} is allowed to skip fewer bytes than requested, including
   * none at all, and does not signal end of stream. When a call makes no progress a single byte
   * is read instead, which either consumes one byte or reveals the end of the stream, so this
   * method cannot spin forever on a closed stream.
   *
   * @param length number of bytes to discard
   * @param eofHint suffix appended to the end-of-stream message
   * @throws EOFException if the stream ends before {@code length} bytes are discarded
   * @throws IOException if the stream fails
   */
  private void skipFully(int length, String eofHint) throws IOException {
    int remaining = length;
    while (remaining > 0) {
      long skipped = inputStream.skip(remaining);
      if (skipped > 0) {
        remaining -= (int) skipped;
        continue;
      }
      if (inputStream.read() < 0) {
        throw new EOFException(
            "unexpected end of stream, skipped "
                + (length - remaining)
                + " bytes from "
                + length
                + eofHint);
      }
      remaining--;
    }
  }

  /** Trace the last header read together with {@code length} payload bytes from {@code offset}. */
  private void tracePacket(byte[] payload, int offset, int length) {
    logger.trace(
        "read: {}\n{}",
        serverThreadLog,
        LoggerHelper.hex(header, payload, offset, length, maxQuerySizeToLog));
  }
}