Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.clickhouse.client.api;

import com.clickhouse.client.api.data_formats.internal.AbstractBinaryFormatReader;
import com.clickhouse.client.api.data_formats.internal.BinaryString;

Check warning on line 4 in client-v2/src/main/java/com/clickhouse/client/api/ClientConfigProperties.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this unused import 'com.clickhouse.client.api.data_formats.internal.BinaryString'.

See more on https://sonarcloud.io/project/issues?id=ClickHouse_clickhouse-java&issues=AZ1zt1tZ_4yywg-9BRWz&open=AZ1zt1tZ_4yywg-9BRWz&pullRequest=2813
import com.clickhouse.client.api.internal.ClickHouseLZ4OutputStream;
import com.clickhouse.data.ClickHouseDataType;
import com.clickhouse.data.ClickHouseFormat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -70,14 +71,19 @@ public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryForm

private TableSchema schema;
private ClickHouseColumn[] columns;
private Class<?>[] columnTypeHints;
private Map[] convertions;
private Map<ClickHouseDataType, Class<?>> defaultTypeHintMap;
private boolean hasNext = true;
private boolean initialState = true; // reader is in initial state, no records have been read yet
private long row = -1; // before first row
private long lastNextCallTs; // for exception to detect slow reader

protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema,BinaryStreamReader.ByteBufferAllocator byteBufferAllocator, Map<ClickHouseDataType, Class<?>> defaultTypeHintMap) {
protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema,
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator,
Map<ClickHouseDataType, Class<?>> defaultTypeHintMap) {
this.input = inputStream;
this.defaultTypeHintMap = defaultTypeHintMap == null ? Collections.emptyMap() : defaultTypeHintMap;
Map<String, Object> settings = querySettings == null ? Collections.emptyMap() : querySettings.getAllSettings();
Boolean useServerTimeZone = (Boolean) settings.get(ClientConfigProperties.USE_SERVER_TIMEZONE.getKey());
TimeZone timeZone = (useServerTimeZone == Boolean.TRUE && querySettings != null) ?
Expand All @@ -89,7 +95,7 @@ protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings quer
boolean jsonAsString = MapUtils.getFlag(settings,
ClientConfigProperties.serverSetting(ServerSettings.OUTPUT_FORMAT_BINARY_WRITE_JSON_AS_STRING), false);
this.binaryStreamReader = new BinaryStreamReader(inputStream, timeZone, LOG, byteBufferAllocator, jsonAsString,
defaultTypeHintMap);
defaultTypeHintMap, ByteBuffer::allocate);
if (schema != null) {
setSchema(schema);
}
Expand Down Expand Up @@ -188,7 +194,7 @@ protected boolean readRecord(Object[] record) throws IOException {
boolean firstColumn = true;
for (int i = 0; i < columns.length; i++) {
try {
Object val = binaryStreamReader.readValue(columns[i]);
Object val = binaryStreamReader.readValue(columns[i], columnTypeHints[i]);
if (val != null) {
record[i] = val;
} else {
Expand All @@ -212,13 +218,18 @@ public <T> T readValue(int colIndex) {
if (colIndex < 1 || colIndex > getSchema().getColumns().size()) {
throw new ClientException("Column index out of bounds: " + colIndex);
}
return (T) currentRecord[colIndex - 1];

T value = (T) currentRecord[colIndex - 1];
if (value instanceof BinaryString) {
return (T) ((BinaryString) value).asString();
}
return value;
}

@SuppressWarnings("unchecked")
@Override
public <T> T readValue(String colName) {
return (T) currentRecord[getSchema().nameToIndex(colName)];
return readValue(getSchema().nameToColumnIndex(colName));
}

@Override
Expand Down Expand Up @@ -300,16 +311,21 @@ protected void setSchema(TableSchema schema) {
this.schema = schema;
this.columns = schema.getColumns().toArray(ClickHouseColumn.EMPTY_ARRAY);
this.convertions = new Map[columns.length];

this.columnTypeHints = new Class[columns.length];
this.currentRecord = new Object[columns.length];
this.nextRecord = new Object[columns.length];

Class<?> stringTypeHint = defaultTypeHintMap.get(ClickHouseDataType.String);

for (int i = 0; i < columns.length; i++) {
ClickHouseColumn column = columns[i];
ClickHouseDataType columnDataType = column.getDataType();
if (columnDataType.equals(ClickHouseDataType.SimpleAggregateFunction)){
columnDataType = column.getNestedColumns().get(0).getDataType();
}
if (columnDataType.equals(ClickHouseDataType.String)) {
columnTypeHints[i] = stringTypeHint;
}
switch (columnDataType) {
case Int8:
case Int16:
Expand Down Expand Up @@ -530,9 +546,11 @@ private <T> T getPrimitiveArray(int index, Class<?> componentType) {
for (int i = 0; i < list.size(); i++) {
Array.set(array, i, list.get(i));
}
return (T)array;
return (T) array;
} else if (componentType == byte.class) {
if (value instanceof String) {
if (value instanceof BinaryString) {
return (T) ((BinaryString)value).asBytes();
} else if(value instanceof String) {
return (T) ((String) value).getBytes(StandardCharsets.UTF_8);
} else if (value instanceof InetAddress) {
return (T) ((InetAddress) value).getAddress();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import java.math.BigInteger;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
Expand All @@ -33,6 +35,7 @@
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
import java.util.function.Function;

/**
* This class is not thread safe and should not be shared between multiple threads.
Expand All @@ -51,6 +54,8 @@

private final ByteBufferAllocator bufferAllocator;

private final StringBufferAllocator stringBufferAllocator;

private final boolean jsonAsString;

private final Class<?> arrayDefaultTypeHint;
Expand All @@ -69,11 +74,17 @@
* @param jsonAsString - use string to serialize/deserialize JSON columns
* @param typeHintMapping - what type use as hint if hint is not set or may not be known.
*/
BinaryStreamReader(InputStream input, TimeZone timeZone, Logger log, ByteBufferAllocator bufferAllocator, boolean jsonAsString, Map<ClickHouseDataType, Class<?>> typeHintMapping) {
BinaryStreamReader(InputStream input, TimeZone timeZone, Logger log,
ByteBufferAllocator bufferAllocator,
boolean jsonAsString,
Map<ClickHouseDataType,
Class<?>> typeHintMapping,
StringBufferAllocator stringBufferAllocator) {
this.log = log == null ? NOPLogger.NOP_LOGGER : log;
this.timeZone = timeZone;
this.input = input;
this.bufferAllocator = bufferAllocator;
this.stringBufferAllocator = stringBufferAllocator;
this.jsonAsString = jsonAsString;

this.arrayDefaultTypeHint = typeHintMapping == null ||
Expand Down Expand Up @@ -121,13 +132,19 @@
switch (dataType) {
// Primitives
case FixedString: {
byte[] bytes = precision > STRING_BUFF.length ?
new byte[precision] : STRING_BUFF;
readNBytes(input, bytes, 0, precision);
return (T) new String(bytes, 0, precision, StandardCharsets.UTF_8);
if (typeHint == BinaryString.class) {
return (T) readBinaryString(precision, stringBufferAllocator::allocate);
} else {
return (T) readString(input, precision);
}
}
case String: {
return (T) readString();
if (typeHint == BinaryString.class) {
int len = readVarInt(input);
return (T) readBinaryString(len, stringBufferAllocator::allocate);
} else {
return (T) readString(input);
}
}
case Int8:
return (T) Byte.valueOf(readByte());
Expand Down Expand Up @@ -627,10 +644,10 @@
if (itemTypeColumn.isNullable() || itemTypeColumn.getDataType() == ClickHouseDataType.Variant) {
array = new ArrayValue(Object.class, len);
for (int i = 0; i < len; i++) {
array.set(i, readValue(itemTypeColumn));
array.set(i, readArrayItemValue(itemTypeColumn));
}
} else {
Object firstValue = readValue(itemTypeColumn);
Object firstValue = readArrayItemValue(itemTypeColumn);
Class<?> itemClass = firstValue.getClass();
if (firstValue instanceof Byte) {
itemClass = byte.class;
Expand All @@ -657,12 +674,17 @@
array = new ArrayValue(itemClass, len);
array.set(0, firstValue);
for (int i = 1; i < len; i++) {
array.set(i, readValue(itemTypeColumn));
array.set(i, readArrayItemValue(itemTypeColumn));
}
}
return array;
}

private Object readArrayItemValue(ClickHouseColumn itemTypeColumn) throws IOException {
Class<?> typeHint = itemTypeColumn.getDataType() == ClickHouseDataType.String ? String.class : null;
return readValue(itemTypeColumn, typeHint);
}

public void skipValue(ClickHouseColumn column) throws IOException {
readValue(column, null);
}
Expand Down Expand Up @@ -835,13 +857,18 @@
ClickHouseColumn valueType = column.getValueInfo();
LinkedHashMap<Object, Object> map = new LinkedHashMap<>(len);
for (int i = 0; i < len; i++) {
Object key = readValue(keyType);
Object value = readValue(valueType);
Object key = readMapKeyOrValue(keyType);
Object value = readMapKeyOrValue(valueType);
map.put(key, value);
}
return map;
}

private Object readMapKeyOrValue(ClickHouseColumn c) throws IOException {
Class<?> typeHint = c.getDataType() == ClickHouseDataType.String ? String.class : null;
return readValue(c, typeHint);
}

/**
* Reads a tuple.
* @param column - column information
Expand Down Expand Up @@ -1114,6 +1141,28 @@
return new String(dest, 0, len, StandardCharsets.UTF_8);
}

public BinaryString readBinaryString(int len, Function<Integer, ByteBuffer> bufferAllocator) throws IOException {

Check warning on line 1144 in client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/BinaryStreamReader.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this code to use the more specialised Functional Interface 'IntFunction<ByteBuffer>'

See more on https://sonarcloud.io/project/issues?id=ClickHouse_clickhouse-java&issues=AZ1znqHzfrTUmNjb20JI&open=AZ1znqHzfrTUmNjb20JI&pullRequest=2813
ByteBuffer buffer = null;
if (len > 0) {
buffer = bufferAllocator.apply(len);
if (buffer == null) {
throw new IOException("bufferAllocator returned `null`");
}
if (buffer.hasArray()) {
readNBytes(input, buffer.array(), 0, len);
} else {
int left = len;
while (left > 0) {
int chunkSize = Math.min(STRING_BUFF.length, left);
readNBytes(input, STRING_BUFF, 0, chunkSize);
buffer.put(STRING_BUFF, 0, chunkSize);
left -= chunkSize;
}
}
}
return buffer == null ? null : new BinaryStringImpl(buffer);
}

/**
* Reads a decimal value from input stream.
* @param input - source of bytes
Expand All @@ -1122,6 +1171,10 @@
*/
public static String readString(InputStream input) throws IOException {
int len = readVarInt(input);
return readString(input, len);
}

public static String readString(InputStream input, int len) throws IOException {
if (len == 0) {
return "";
}
Expand All @@ -1140,6 +1193,10 @@
byte[] allocate(int size);
}

public interface StringBufferAllocator {
ByteBuffer allocate(int size);
}

/**
* Byte allocator that creates a new byte array for each request.
*/
Expand Down Expand Up @@ -1394,4 +1451,56 @@
}
return obj;
}

static final class BinaryStringImpl implements BinaryString {

private final ByteBuffer buffer;
private final int len;
private CharBuffer charBuffer = null;
private String strValue = null;

BinaryStringImpl(ByteBuffer buffer) {
this.buffer = buffer;
this.len = buffer.limit();
}

@Override
public String asString() {
if (strValue == null) {
if (buffer.hasArray()) {
strValue = new String(buffer.array(), StandardCharsets.UTF_8);
} else {
ensureCharBuffer();
strValue = charBuffer.toString();
}
}
return strValue;
}

@Override
public byte[] asBytes() {
if (buffer.hasArray()) {
return buffer.array();
}

throw new UnsupportedOperationException("String is stored out of the heap and has no byte buffer easily accessible");
}

@Override
public int length() {
return len;
}

private void ensureCharBuffer() {
if (charBuffer == null) {
buffer.rewind();
charBuffer = StandardCharsets.UTF_8.decode(buffer);
}
}

@Override
public int compareTo(String o) {
return asString().compareTo(o);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.clickhouse.client.api.data_formats.internal;

public interface BinaryString extends Comparable<String> {


int length();

/**
* Converts raw bytes to a string whenever size is.
* @return String object
*/
String asString();

byte[] asBytes();
}
Loading
Loading