blob: 7504cc28dd5eb100a241a4847d511f1b7d8a6fce [file] [log] [blame]
// SPDX-License-Identifier: LGPL-2.1-or-later
// Copyright (c) 2012-2014 Monty Program Ab
// Copyright (c) 2015-2021 MariaDB Corporation Ab
package org.mariadb.jdbc.client.socket.impl;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.mariadb.jdbc.HostAddress;
import org.mariadb.jdbc.client.socket.Writer;
import org.mariadb.jdbc.client.util.MutableByte;
import org.mariadb.jdbc.export.MaxAllowedPacketException;
import org.mariadb.jdbc.util.log.Logger;
import org.mariadb.jdbc.util.log.LoggerHelper;
import org.mariadb.jdbc.util.log.Loggers;
/** Packet writer */
@SuppressWarnings("SameReturnValue")
public class PacketWriter implements Writer {
/** initial buffer size (smallest of the capacities the internal buffer can take) */
public static final int SMALL_BUFFER_SIZE = 8192;
// class logger, used for packet tracing in writeSocket / writeEmptyPacket
private static final Logger logger = Loggers.getLogger(PacketWriter.class);
// bytes that receive special treatment in the *Escaped write methods
private static final byte QUOTE = (byte) '\'';
private static final byte DBL_QUOTE = (byte) '"';
private static final byte ZERO_BYTE = (byte) '\0';
private static final byte BACKSLASH = (byte) '\\';
// intermediate buffer capacities used by growBuffer before reaching maxPacketLength
private static final int MEDIUM_BUFFER_SIZE = 128 * 1024;
private static final int LARGE_BUFFER_SIZE = 1024 * 1024;
// 0x00ffffff is the biggest payload length that fits the 3 length bytes written by writeSocket;
// the extra 4 bytes are the packet header (3 length bytes + 1 sequence byte)
private static final int MAX_PACKET_LENGTH = 0x00ffffff + 4;
// limit handed to LoggerHelper.hex when tracing sent packets
private final int maxQuerySizeToLog;
// destination stream of the packets
private final OutputStream out;
// buffer length (header included) at which a packet has to be sent
private int maxPacketLength = MAX_PACKET_LENGTH;
// max_allowed_packet value; null when unknown, in which case size checks are skipped
private Integer maxAllowedPacket;
// payload bytes already written to the stream for the current command
private long cmdLength;
// when false, trace logs only the content length and hides the packet content
private boolean permitTrace = true;
// connection description appended to trace messages, built by setServerThreadId
private String serverThreadLog = "";
// buffer position recorded by mark(); -1 when no mark is set
private int mark = -1;
// set by flushBufferStopAtMark when bytes written after the mark were kept in the buffer
private boolean bufContainDataAfterMark = false;
/** internal buffer */
protected byte[] buf;
/** buffer position; starts at 4 because the first 4 bytes are reserved for the packet header */
protected int pos = 4;
/** packet sequence */
protected final MutableByte sequence;
/** compressed packet sequence */
protected final MutableByte compressSequence;
/**
 * Creates a writer that frames outgoing data as MariaDB packets on the given stream.
 *
 * <p>The internal buffer is allocated with {@link #SMALL_BUFFER_SIZE} bytes.
 *
 * @param out output stream receiving the packets
 * @param maxQuerySizeToLog maximum query size to log
 * @param maxAllowedPacket max allowed packet value, {@code null} if not known
 * @param sequence packet sequence
 * @param compressSequence compressed packet sequence
 */
public PacketWriter(
    OutputStream out,
    int maxQuerySizeToLog,
    Integer maxAllowedPacket,
    MutableByte sequence,
    MutableByte compressSequence) {
  // collaborators
  this.out = out;
  this.sequence = sequence;
  this.compressSequence = compressSequence;

  // limits
  this.maxQuerySizeToLog = maxQuerySizeToLog;
  this.maxAllowedPacket = maxAllowedPacket;

  // initial state
  this.buf = new byte[SMALL_BUFFER_SIZE];
  this.cmdLength = 0L;
}
/**
 * Gives the offset in the internal buffer where the next byte will be written.
 *
 * @return current buffer position
 */
public int pos() {
  return this.pos;
}
/**
 * Moves the write position, asking for a bigger buffer first when the requested position lies
 * beyond the current buffer length.
 *
 * @param pos new position
 * @throws IOException if growing the buffer fails
 */
public void pos(int pos) throws IOException {
  if (buf.length < pos) {
    growBuffer(pos);
  }
  this.pos = pos;
}
/**
 * Gives the number of payload bytes already written to the stream for the current command.
 *
 * @return current command length
 */
public long getCmdLength() {
  return this.cmdLength;
}
/**
 * Appends one byte to the buffer.
 *
 * <p>When the buffer is full, it is either sent as a packet (buffer already at the maximum
 * packet length and no data kept after a mark) or enlarged.
 *
 * @param value byte to send (only the low 8 bits are kept)
 * @throws IOException if socket error occur.
 */
public void writeByte(int value) throws IOException {
  final boolean bufferFull = pos >= buf.length;
  if (bufferFull) {
    final boolean mustSendPacket = pos >= maxPacketLength && !bufContainDataAfterMark;
    if (mustSendPacket) {
      // a whole packet is waiting in the buffer: send it and restart after the header
      writeSocket(false);
    } else {
      growBuffer(1);
    }
  }
  buf[pos++] = (byte) value;
}
/**
 * Appends a 2-byte little-endian value to the buffer.
 *
 * <p>When fewer than 2 bytes are free, the value is handed to {@code writeBytes}, which grows
 * or flushes the buffer as needed.
 *
 * @param value short value
 * @throws IOException if socket error occur
 */
public void writeShort(short value) throws IOException {
  final byte lowByte = (byte) value;
  final byte highByte = (byte) (value >> 8);

  if (buf.length - pos < 2) {
    // not enough space remaining
    writeBytes(new byte[] {lowByte, highByte}, 0, 2);
    return;
  }

  buf[pos] = lowByte;
  buf[pos + 1] = highByte;
  pos += 2;
}
/**
 * Appends a 4-byte little-endian value to the buffer.
 *
 * <p>When fewer than 4 bytes are free, the value is handed to {@code writeBytes}, which grows
 * or flushes the buffer as needed.
 *
 * @param value int value
 * @throws IOException if socket error occur
 */
public void writeInt(int value) throws IOException {
  if (buf.length - pos >= 4) {
    // enough room: encode straight into the buffer, least significant byte first
    for (int i = 0; i < 4; i++) {
      buf[pos + i] = (byte) (value >> (8 * i));
    }
    pos += 4;
    return;
  }

  // not enough space remaining
  final byte[] encoded = new byte[4];
  for (int i = 0; i < 4; i++) {
    encoded[i] = (byte) (value >> (8 * i));
  }
  writeBytes(encoded, 0, 4);
}
/**
 * Appends an 8-byte little-endian value to the buffer.
 *
 * <p>When fewer than 8 bytes are free, the value is handed to {@code writeBytes}, which grows
 * or flushes the buffer as needed.
 *
 * @param value long value
 * @throws IOException if socket error occur
 */
public void writeLong(long value) throws IOException {
  if (buf.length - pos >= 8) {
    // enough room: encode straight into the buffer, least significant byte first
    for (int i = 0; i < 8; i++) {
      buf[pos + i] = (byte) (value >> (8 * i));
    }
    pos += 8;
    return;
  }

  // not enough space remaining
  final byte[] encoded = new byte[8];
  for (int i = 0; i < 8; i++) {
    encoded[i] = (byte) (value >> (8 * i));
  }
  writeBytes(encoded, 0, 8);
}
/**
 * Appends a double to the buffer as the 8 little-endian bytes of its IEEE 754 bit pattern.
 *
 * @param value double value
 * @throws IOException if socket error occur
 */
public void writeDouble(double value) throws IOException {
  final long bits = Double.doubleToLongBits(value);
  writeLong(bits);
}
/**
 * Appends a float to the buffer as the 4 little-endian bytes of its IEEE 754 bit pattern.
 *
 * @param value float value
 * @throws IOException if socket error occur
 */
public void writeFloat(float value) throws IOException {
  final int bits = Float.floatToIntBits(value);
  writeInt(bits);
}
/**
 * Appends a whole byte array to the buffer.
 *
 * @param arr byte array, written from index 0 to its end
 * @throws IOException if socket error occur
 */
public void writeBytes(byte[] arr) throws IOException {
  final int length = arr.length;
  writeBytes(arr, 0, length);
}
/**
 * Overwrites buffer content at an absolute offset.
 *
 * <p>The current write position is left untouched and the buffer is not grown: the target
 * range must already lie inside the buffer.
 *
 * @param arr bytes to copy, copied entirely
 * @param pos buffer offset receiving the first byte
 */
public void writeBytesAtPos(byte[] arr, int pos) {
  final int length = arr.length;
  System.arraycopy(arr, 0, this.buf, pos, length);
}
/**
* Write byte array to buf. If buf is full, flush socket.
*
* <p>When the data does not fit in the remaining space, the method tries in order: growing the
* buffer, flushing up to the mark (when a mark is set), and finally streaming the data packet by
* packet. In the streaming case every filled buffer is sent with {@code writeSocket(false)} and
* only the last chunk stays in the buffer.
*
* @param arr byte array
* @param off offset of the first byte to write in {@code arr}
* @param len byte length to write
* @throws IOException if socket error occur
*/
public void writeBytes(byte[] arr, int off, int len) throws IOException {
if (len > buf.length - pos) {
// buffer too small: enlarge it unless it already has the maximum packet length
if (buf.length != maxPacketLength) {
growBuffer(len);
}
// max buf size
if (len > buf.length - pos) {
if (mark != -1) {
growBuffer(len);
// growBuffer may itself have flushed up to the mark (which clears it);
// only flush here when the mark is still set
if (mark != -1) {
flushBufferStopAtMark();
}
}
if (len > buf.length - pos) {
// not enough space in buf, will stream :
// fill buf and flush until all data are sent
int remainingLen = len;
do {
// copy as much as fits before reaching the maximum packet length
int lenToFillbuf = Math.min(maxPacketLength - pos, remainingLen);
System.arraycopy(arr, off, buf, pos, lenToFillbuf);
remainingLen -= lenToFillbuf;
off += lenToFillbuf;
pos += lenToFillbuf;
if (remainingLen > 0) {
// buffer is full and data remains: send this packet, writeSocket resets pos to 4
writeSocket(false);
} else {
break;
}
} while (true);
return;
}
}
}
// enough room (possibly after growing/flushing): plain copy
System.arraycopy(arr, off, buf, pos, len);
pos += len;
}
/**
 * Appends a length-encoded integer to the buffer.
 *
 * <ul>
 *   <li>values below 251 : the value itself, on 1 byte
 *   <li>values below 65536 : {@code 0xfc} followed by 2 little-endian bytes
 *   <li>values below 16777216 : {@code 0xfd} followed by 3 little-endian bytes
 *   <li>other values : {@code 0xfe} followed by 8 little-endian bytes
 * </ul>
 *
 * @param length field length
 * @throws IOException if socket error occur.
 */
public void writeLength(long length) throws IOException {
  if (length < 251) {
    writeByte((byte) length);
    return;
  }

  // pick the marker byte and the number of value bytes that follow it
  final byte marker;
  final int valueBytes;
  if (length < 65536) {
    marker = (byte) 0xfc;
    valueBytes = 2;
  } else if (length < 16777216) {
    marker = (byte) 0xfd;
    valueBytes = 3;
  } else {
    marker = (byte) 0xfe;
    valueBytes = 8;
  }
  final int encodedSize = valueBytes + 1;

  if (encodedSize > buf.length - pos) {
    // not enough space remaining: let writeBytes grow / flush the buffer
    final byte[] encoded = new byte[encodedSize];
    encoded[0] = marker;
    for (int i = 0; i < valueBytes; i++) {
      encoded[i + 1] = (byte) (length >>> (8 * i));
    }
    writeBytes(encoded, 0, encodedSize);
    return;
  }

  buf[pos] = marker;
  for (int i = 0; i < valueBytes; i++) {
    buf[pos + 1 + i] = (byte) (length >>> (8 * i));
  }
  pos += encodedSize;
}
/**
 * Appends a string to the buffer using one byte per character.
 *
 * <p>The string is expected to contain ASCII characters only. When it fits in the remaining
 * space each char is narrowed to a byte; otherwise it is encoded with {@code US_ASCII} and
 * handed to {@code writeBytes}.
 *
 * @param str ASCII string
 * @throws IOException if socket error occur
 */
public void writeAscii(String str) throws IOException {
  final int charCount = str.length();

  if (buf.length - pos < charCount) {
    final byte[] encoded = str.getBytes(StandardCharsets.US_ASCII);
    writeBytes(encoded, 0, encoded.length);
    return;
  }

  for (int i = 0; i < charCount; i++) {
    this.buf[this.pos++] = (byte) str.charAt(i);
  }
}
/**
 * Appends a string to the buffer, UTF-8 encoded, without any escaping.
 *
 * <p>When the worst-case encoded size (3 bytes per UTF-16 char) does not fit in the remaining
 * space, the string is encoded with {@code String.getBytes(UTF_8)} and handed to {@code
 * writeBytes}. Otherwise it is encoded directly into the buffer.
 *
 * <p>Unpaired surrogates (a high surrogate not followed by a low surrogate, including at the
 * very end of the string, or a lone low surrogate) are written as {@code '?'} (0x3f), which is
 * also what {@code String.getBytes(UTF_8)} produces on the other path.
 *
 * @param str string to write
 * @throws IOException if socket error occur
 */
public void writeString(String str) throws IOException {
  int charsLength = str.length();

  // not enough space remaining.
  // long arithmetic: charsLength * 3 would overflow int for very large strings and wrongly
  // select the direct-encoding path.
  if (charsLength * 3L >= buf.length - pos) {
    byte[] arr = str.getBytes(StandardCharsets.UTF_8);
    writeBytes(arr, 0, arr.length);
    return;
  }

  // create UTF-8 byte array
  // java chars are UTF-16 code units: a 4 bytes UTF-8 character is represented by 2 chars
  // (surrogate pair), example "\uD83C\uDFA4" = 🎤, so max size is 3 * charsLength
  int charsOffset = 0;
  char currChar;

  // quick loop while only ASCII chars
  for (;
      charsOffset < charsLength && (currChar = str.charAt(charsOffset)) < 0x80;
      charsOffset++) {
    buf[pos++] = (byte) currChar;
  }

  // if quick loop not finished
  while (charsOffset < charsLength) {
    currChar = str.charAt(charsOffset++);
    if (currChar < 0x80) {
      buf[pos++] = (byte) currChar;
    } else if (currChar < 0x800) {
      buf[pos++] = (byte) (0xc0 | (currChar >> 6));
      buf[pos++] = (byte) (0x80 | (currChar & 0x3f));
    } else if (Character.isSurrogate(currChar)) {
      // reserved for surrogate - see https://en.wikipedia.org/wiki/UTF-16
      if (Character.isHighSurrogate(currChar)
          && charsOffset < charsLength
          && Character.isLowSurrogate(str.charAt(charsOffset))) {
        // valid pair: consume the low surrogate and encode the code point on 4 bytes
        int codePoint = Character.toCodePoint(currChar, str.charAt(charsOffset++));
        buf[pos++] = (byte) (0xf0 | ((codePoint >> 18)));
        buf[pos++] = (byte) (0x80 | ((codePoint >> 12) & 0x3f));
        buf[pos++] = (byte) (0x80 | ((codePoint >> 6) & 0x3f));
        buf[pos++] = (byte) (0x80 | (codePoint & 0x3f));
      } else {
        // unpaired surrogate (dangling high surrogate or lone low surrogate) : '?'
        buf[pos++] = (byte) 0x3f;
      }
    } else {
      buf[pos++] = (byte) (0xe0 | ((currChar >> 12)));
      buf[pos++] = (byte) (0x80 | ((currChar >> 6) & 0x3f));
      buf[pos++] = (byte) (0x80 | (currChar & 0x3f));
    }
  }
}
/**
 * Gives direct access to the internal buffer (no copy is made).
 *
 * @return current buffer
 */
public byte[] buf() {
  return this.buf;
}
/**
 * Appends a string to the buffer, UTF-8 encoded and escaped.
 *
 * <p>With {@code noBackslashEscapes}, only the single quote is escaped, by doubling it.
 * Otherwise backslash, single quote, double quote and the zero byte are prefixed with a
 * backslash.
 *
 * <p>When the worst-case encoded size (3 bytes per UTF-16 char) does not fit in the remaining
 * space, the string is encoded with {@code String.getBytes(UTF_8)} and handed to {@code
 * writeBytesEscaped}. Otherwise it is encoded directly into the buffer.
 *
 * <p>Unpaired surrogates (a high surrogate not followed by a low surrogate, including at the
 * very end of the string, or a lone low surrogate) are written as {@code '?'} (0x3f), which is
 * also what {@code String.getBytes(UTF_8)} produces on the other path.
 *
 * @param str string
 * @param noBackslashEscapes escape method
 * @throws IOException if socket error occur
 */
public void writeStringEscaped(String str, boolean noBackslashEscapes) throws IOException {
  int charsLength = str.length();

  // not enough space remaining.
  // long arithmetic: charsLength * 3 would overflow int for very large strings and wrongly
  // select the direct-encoding path.
  if (charsLength * 3L >= buf.length - pos) {
    byte[] arr = str.getBytes(StandardCharsets.UTF_8);
    writeBytesEscaped(arr, arr.length, noBackslashEscapes);
    return;
  }

  // create UTF-8 byte array
  // java chars are UTF-16 code units: a 4 bytes UTF-8 character is represented by 2 chars
  // (surrogate pair), example "\uD83C\uDFA4" = 🎤, so max size is 3 * charsLength
  // (escaped characters are 1 byte encoded, so they take only 2 bytes once escaped)
  int charsOffset = 0;
  char currChar;

  // quick loop while only ASCII chars, for faster escape
  if (noBackslashEscapes) {
    for (;
        charsOffset < charsLength && (currChar = str.charAt(charsOffset)) < 0x80;
        charsOffset++) {
      if (currChar == QUOTE) {
        buf[pos++] = QUOTE;
      }
      buf[pos++] = (byte) currChar;
    }
  } else {
    for (;
        charsOffset < charsLength && (currChar = str.charAt(charsOffset)) < 0x80;
        charsOffset++) {
      if (currChar == BACKSLASH
          || currChar == QUOTE
          || currChar == ZERO_BYTE
          || currChar == DBL_QUOTE) {
        buf[pos++] = BACKSLASH;
      }
      buf[pos++] = (byte) currChar;
    }
  }

  // if quick loop not finished
  while (charsOffset < charsLength) {
    currChar = str.charAt(charsOffset++);
    if (currChar < 0x80) {
      if (noBackslashEscapes) {
        if (currChar == QUOTE) {
          buf[pos++] = QUOTE;
        }
      } else if (currChar == BACKSLASH
          || currChar == QUOTE
          || currChar == ZERO_BYTE
          || currChar == DBL_QUOTE) {
        buf[pos++] = BACKSLASH;
      }
      buf[pos++] = (byte) currChar;
    } else if (currChar < 0x800) {
      buf[pos++] = (byte) (0xc0 | (currChar >> 6));
      buf[pos++] = (byte) (0x80 | (currChar & 0x3f));
    } else if (Character.isSurrogate(currChar)) {
      // reserved for surrogate - see https://en.wikipedia.org/wiki/UTF-16
      if (Character.isHighSurrogate(currChar)
          && charsOffset < charsLength
          && Character.isLowSurrogate(str.charAt(charsOffset))) {
        // valid pair: consume the low surrogate and encode the code point on 4 bytes
        int codePoint = Character.toCodePoint(currChar, str.charAt(charsOffset++));
        buf[pos++] = (byte) (0xf0 | ((codePoint >> 18)));
        buf[pos++] = (byte) (0x80 | ((codePoint >> 12) & 0x3f));
        buf[pos++] = (byte) (0x80 | ((codePoint >> 6) & 0x3f));
        buf[pos++] = (byte) (0x80 | (codePoint & 0x3f));
      } else {
        // unpaired surrogate (dangling high surrogate or lone low surrogate) : '?'
        buf[pos++] = (byte) 0x3f;
      }
    } else {
      buf[pos++] = (byte) (0xe0 | ((currChar >> 12)));
      buf[pos++] = (byte) (0x80 | ((currChar >> 6) & 0x3f));
      buf[pos++] = (byte) (0x80 | (currChar & 0x3f));
    }
  }
}
/**
* Write escaped bytes to buf, flushing to socket when needed.
*
* <p>With {@code noBackslashEscapes}, only the single quote is escaped, by doubling it. Otherwise
* single quote, backslash, double quote and zero byte are prefixed with a backslash. The space
* needed is estimated as {@code len * 2} (every byte escaped).
*
* @param bytes bytes, read from index 0
* @param len len to write
* @param noBackslashEscapes escape method
* @throws IOException if socket error occur
*/
public void writeBytesEscaped(byte[] bytes, int len, boolean noBackslashEscapes)
throws IOException {
if (len * 2 > buf.length - pos) {
// makes buf bigger (up to 16M)
if (buf.length != maxPacketLength) {
growBuffer(len * 2);
}
// data may be bigger than buf.
// must flush buf when full (and reset position to 0)
if (len * 2 > buf.length - pos) {
if (mark != -1) {
growBuffer(len * 2);
// growBuffer may already have flushed up to the mark (which clears it)
if (mark != -1) {
flushBufferStopAtMark();
}
// NOTE(review): this branch then falls through to the unchecked fill loops at the end of
// the method; presumably the grow/flush above always frees enough room - confirm
} else {
// not enough space in buf, will fill buf
// byte-by-byte copy, sending a packet each time the buffer gets full
if (buf.length <= pos) {
writeSocket(false);
}
if (noBackslashEscapes) {
for (int i = 0; i < len; i++) {
if (QUOTE == bytes[i]) {
buf[pos++] = QUOTE;
if (buf.length <= pos) {
writeSocket(false);
}
}
buf[pos++] = bytes[i];
if (buf.length <= pos) {
writeSocket(false);
}
}
} else {
for (int i = 0; i < len; i++) {
if (bytes[i] == QUOTE
|| bytes[i] == BACKSLASH
|| bytes[i] == DBL_QUOTE
|| bytes[i] == ZERO_BYTE) {
buf[pos++] = '\\';
if (buf.length <= pos) {
writeSocket(false);
}
}
buf[pos++] = bytes[i];
if (buf.length <= pos) {
writeSocket(false);
}
}
}
return;
}
}
}
// sure to have enough place filling buf directly
if (noBackslashEscapes) {
for (int i = 0; i < len; i++) {
if (QUOTE == bytes[i]) {
buf[pos++] = QUOTE;
}
buf[pos++] = bytes[i];
}
} else {
for (int i = 0; i < len; i++) {
if (bytes[i] == QUOTE
|| bytes[i] == BACKSLASH
|| bytes[i] == '"'
|| bytes[i] == ZERO_BYTE) {
buf[pos++] = BACKSLASH; // add escape slash
}
buf[pos++] = bytes[i];
}
}
}
/**
* Grow the internal buffer so that {@code len} more bytes can be added after the current
* position. Only 4 sizes are used, to avoid creating/copying buffers, which are expensive
* operations. Possible sizes:
*
* <ol>
* <li>SMALL_BUFFER_SIZE = 8k (default)
* <li>MEDIUM_BUFFER_SIZE = 128k
* <li>LARGE_BUFFER_SIZE = 1M
* <li>maxPacketLength = 16M (+ 4 for the packet header)
* </ol>
*
* <p>When data is kept after a mark ({@code bufContainDataAfterMark}), the buffer may grow beyond
* maxPacketLength. When a mark is set and the requested size exceeds the selected capacity, the
* buffer is first flushed up to the mark. Without a mark, the buffer is grown to the selected
* capacity even if that is still smaller than {@code len + pos}; callers re-check the available
* space afterwards.
*
* @param len length to add
* @throws IOException if flushing up to the mark fails
*/
private void growBuffer(int len) throws IOException {
int bufLength = buf.length;
int newCapacity;
// select the next capacity step from the current buffer length
if (bufLength == SMALL_BUFFER_SIZE) {
if (len + pos <= MEDIUM_BUFFER_SIZE) {
newCapacity = MEDIUM_BUFFER_SIZE;
} else if (len + pos <= LARGE_BUFFER_SIZE) {
newCapacity = LARGE_BUFFER_SIZE;
} else {
newCapacity = maxPacketLength;
}
} else if (bufLength == MEDIUM_BUFFER_SIZE) {
if (len + pos < LARGE_BUFFER_SIZE) {
newCapacity = LARGE_BUFFER_SIZE;
} else {
newCapacity = maxPacketLength;
}
} else if (bufContainDataAfterMark) {
// want to add some information to buf without having the command Header
// must grow buf until having all the query
newCapacity = Math.max(len + pos, maxPacketLength);
} else {
newCapacity = maxPacketLength;
}
if (len + pos > newCapacity) {
if (mark != -1) {
// buf is > 16M with mark.
// flush until mark, reset pos at beginning
flushBufferStopAtMark();
// after the flush, pos only covers the bytes that followed the mark:
// the current buffer may now be big enough
if (len + pos <= bufLength) {
return;
}
// need to keep all data, buf can grow more than maxPacketLength
// grow buf if needed
if (bufLength == maxPacketLength) return;
if (len + pos > newCapacity) {
newCapacity = Math.min(maxPacketLength, len + pos);
}
}
}
// allocate the new buffer and keep the bytes written so far (header area included)
byte[] newBuf = new byte[newCapacity];
System.arraycopy(buf, 0, newBuf, 0, pos);
buf = newBuf;
}
/**
 * Sends a packet without payload: a 4-byte header holding a zero length and the next sequence
 * number. The stream is flushed and the command length is reset to 0.
 *
 * @throws IOException if socket error occur.
 */
public void writeEmptyPacket() throws IOException {
  // header : 3 length bytes (all zero) + sequence
  Arrays.fill(buf, 0, 3, (byte) 0x00);
  buf[3] = this.sequence.incrementAndGet();

  out.write(buf, 0, 4);

  if (logger.isTraceEnabled()) {
    logger.trace(
        "send com : content length=0 {}\n{}", serverThreadLog, LoggerHelper.hex(buf, 0, 4));
  }

  out.flush();
  cmdLength = 0;
}
/**
 * Sends the buffered data as the end of the current command, then resets the writer state
 * (position, command length, mark) for the next command.
 *
 * <p>A buffer bigger than {@link #SMALL_BUFFER_SIZE} is replaced by a default-sized one when
 * the command length is less than half of it.
 *
 * @throws IOException if socket error occur.
 */
public void flush() throws IOException {
  writeSocket(true);

  // if buf is big, and last query doesn't use at least half of it, resize buf to default value
  final boolean oversized = buf.length > SMALL_BUFFER_SIZE;
  if (oversized && cmdLength * 2 < buf.length) {
    buf = new byte[SMALL_BUFFER_SIZE];
  }

  mark = -1;
  cmdLength = 0;
  pos = 4;
}
/**
 * Writes the buffered data to the stream without requesting a stream flush, then resets the
 * writer state (position, command length, mark) for the next command.
 *
 * <p>A buffer bigger than {@link #SMALL_BUFFER_SIZE} is replaced by a default-sized one when
 * the command length is less than half of it.
 *
 * @throws IOException if socket error occur.
 */
public void flushPipeline() throws IOException {
  writeSocket(false);

  // if buf is big, and last query doesn't use at least half of it, resize buf to default value
  final boolean oversized = buf.length > SMALL_BUFFER_SIZE;
  if (oversized && cmdLength * 2 < buf.length) {
    buf = new byte[SMALL_BUFFER_SIZE];
  }

  mark = -1;
  cmdLength = 0;
  pos = 4;
}
/**
 * Checks the query size against max_allowed_packet.
 *
 * <p>Nothing is checked when max_allowed_packet is unknown. Otherwise, an exception is thrown
 * when the bytes already sent for the command plus {@code length} reach max_allowed_packet;
 * the exception is told whether part of the command has already been sent.
 *
 * @param length additional length to query size
 * @throws MaxAllowedPacketException if query has not to be sent.
 */
private void checkMaxAllowedLength(int length) throws MaxAllowedPacketException {
  if (maxAllowedPacket == null) {
    return;
  }

  final long querySize = cmdLength + length;
  if (querySize < maxAllowedPacket) {
    return;
  }

  final boolean alreadySentData = cmdLength != 0;
  throw new MaxAllowedPacketException(
      "query size (" + querySize + ") is >= to max_allowed_packet (" + maxAllowedPacket + ")",
      alreadySentData);
}
/**
 * Tells whether adding {@code length} bytes to the current command would reach
 * max_allowed_packet.
 *
 * @param length additional length to query size
 * @return true if max_allowed_packet is known and would be reached, false otherwise
 */
public boolean throwMaxAllowedLength(int length) {
  return maxAllowedPacket != null && cmdLength + length >= maxAllowedPacket;
}
/**
 * Enables or disables logging of packet content in trace messages. When disabled, trace
 * messages only contain the content length.
 *
 * @param permitTrace true to log packet content
 */
public void permitTrace(boolean permitTrace) {
  final boolean allowed = permitTrace;
  this.permitTrace = allowed;
}
/**
 * Builds the connection description appended to trace messages: {@code conn=<id>} (-1 when the
 * id is unknown), followed by {@code (M)} or {@code (S)} when the host primary flag is known.
 *
 * @param serverThreadId current server thread id.
 * @param hostAddress host information
 */
public void setServerThreadId(Long serverThreadId, HostAddress hostAddress) {
  Boolean isMaster = null;
  if (hostAddress != null) {
    isMaster = hostAddress.primary;
  }

  final StringBuilder description = new StringBuilder("conn=");
  if (serverThreadId == null) {
    description.append("-1");
  } else {
    description.append(serverThreadId);
  }
  if (isMaster != null) {
    description.append(" (").append(isMaster ? "M" : "S").append(")");
  }

  this.serverThreadLog = description.toString();
}
/** Records the current buffer position as the mark. */
public void mark() {
  this.mark = this.pos;
}
/**
 * Tells whether a mark is currently set.
 *
 * @return true if a mark is set
 */
public boolean isMarked() {
  return !(mark == -1);
}
/**
 * Tells whether the packet sequence differs from -1, the value set by {@code initPacket}.
 *
 * @return true if the sequence is not -1
 */
public boolean hasFlushed() {
  return -1 != sequence.get();
}
/**
* Flush to last mark.
*
* <p>Bytes written before the mark are sent as a complete command; bytes written after the mark
* are kept and moved to the beginning of the buffer (just after the 4-byte header area). The mark
* is then cleared and the writer remembers that the buffer holds data that followed the mark.
*
* @throws IOException if flush fail.
*/
public void flushBufferStopAtMark() throws IOException {
// remember where the written data ends, then pretend the buffer stops at the mark
final int end = pos;
pos = mark;
// send everything before the mark as a command end
writeSocket(true);
out.flush();
// reset sequences, command length and position (pos = 4) for a new command
initPacket();
// move the bytes that followed the mark right after the header area
System.arraycopy(buf, mark, buf, pos, end - mark);
pos += end - mark;
mark = -1;
bufContainDataAfterMark = true;
}
/**
 * Tells whether the buffer holds data that was kept after a flush up to the mark.
 *
 * @return true if the buffer contains data that followed the mark
 */
public boolean bufIsDataAfterMark() {
  return this.bufContainDataAfterMark;
}
/**
 * Rewinds the position to the mark and clears the mark.
 *
 * <p>When the buffer holds data kept after a previous flush up to the mark, the bytes between
 * the header area (offset 4) and the mark are returned as a copy and the packet state is
 * reinitialized.
 *
 * @return copy of the buffered bytes up to the mark, or {@code null} when the buffer holds no
 *     data kept after a mark
 */
public byte[] resetMark() {
  pos = mark;
  mark = -1;

  if (!bufContainDataAfterMark) {
    return null;
  }

  final byte[] pending = Arrays.copyOfRange(buf, 4, pos);
  initPacket();
  bufContainDataAfterMark = false;
  return pending;
}
/**
 * Reinitializes the packet state: both sequences are set to -1, the position goes back right
 * after the 4-byte header area and the command length is reset to 0.
 */
public void initPacket() {
  final byte initialSequence = (byte) -1;
  sequence.set(initialSequence);
  compressSequence.set(initialSequence);
  cmdLength = 0;
  pos = 4;
}
/**
* Flush the internal buf.
*
* <p>Does nothing when the buffer holds no payload (position not beyond the 4-byte header).
* Otherwise the header is filled (3-byte little-endian payload length + next sequence number), the
* query size is checked against max_allowed_packet, and the packet is written to the stream. The
* position is then reset right after the header area.
*
* @param commandEnd command end: the stream is flushed, and an empty packet is sent when the
* packet exactly fills the maximum packet length
* @throws IOException if connection error occur.
*/
protected void writeSocket(boolean commandEnd) throws IOException {
if (pos > 4) {
// packet header: payload length on 3 bytes (little-endian), then sequence number
buf[0] = (byte) (pos - 4);
buf[1] = (byte) ((pos - 4) >>> 8);
buf[2] = (byte) ((pos - 4) >>> 16);
buf[3] = this.sequence.incrementAndGet();
// may throw MaxAllowedPacketException; note the sequence has already been incremented
checkMaxAllowedLength(pos - 4);
out.write(buf, 0, pos);
if (commandEnd) out.flush();
cmdLength += pos - 4;
if (logger.isTraceEnabled()) {
if (permitTrace) {
logger.trace(
"send: {}\n{}", serverThreadLog, LoggerHelper.hex(buf, 0, pos, maxQuerySizeToLog));
} else {
// content is not logged when tracing is not permitted
logger.trace("send: content length={} {} com=<hidden>", pos - 4, serverThreadLog);
}
}
// if last com fill the max size, must send an empty com to indicate command end.
if (commandEnd && pos == maxPacketLength) {
writeEmptyPacket();
}
// restart right after the header area
pos = 4;
}
}
/**
 * Closes the underlying output stream.
 *
 * @throws IOException if closing the stream fails
 */
public void close() throws IOException {
  this.out.close();
}
}