Skip to content

Commit 70185ee

Browse files
authored
GEODE-10300: C++ native client: Allow locator responses greater than … (#970)
* GEODE-10300: Fix locator response size limit in C++ client If a response message from the locator to the C++ native client is longer than 3000 bytes the C++ native client will only read the first 3000 bytes. * GEODE-10300: Updated after review * GEODE-10300: Updated after review * GEODE-10300: Updated after another review * GEODE-10300: Updated after some more reviews * GEODE-10300: Some more changes after review. * GEODE-10300: Small change after review * GEODE-10300: Remove unneeded space
1 parent b3f8a08 commit 70185ee

13 files changed

Lines changed: 416 additions & 79 deletions

cppcache/include/geode/DataInput.hpp

Lines changed: 45 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ class APACHE_GEODE_EXPORT DataInput {
7373
*/
7474
inline bool readBoolean() {
7575
_GEODE_CHECK_BUFFER_SIZE(1);
76-
return *(m_buf++) == 1 ? true : false;
76+
return *(buffer_++) == 1 ? true : false;
7777
}
7878

7979
/**
@@ -89,8 +89,8 @@ class APACHE_GEODE_EXPORT DataInput {
8989
inline void readBytesOnly(uint8_t* buffer, size_t len) {
9090
if (len > 0) {
9191
_GEODE_CHECK_BUFFER_SIZE(len);
92-
std::memcpy(buffer, m_buf, len);
93-
m_buf += len;
92+
std::memcpy(buffer, buffer_, len);
93+
buffer_ += len;
9494
}
9595
}
9696

@@ -107,8 +107,8 @@ class APACHE_GEODE_EXPORT DataInput {
107107
inline void readBytesOnly(int8_t* buffer, size_t len) {
108108
if (len > 0) {
109109
_GEODE_CHECK_BUFFER_SIZE(len);
110-
std::memcpy(buffer, m_buf, len);
111-
m_buf += len;
110+
std::memcpy(buffer, buffer_, len);
111+
buffer_ += len;
112112
}
113113
}
114114

@@ -129,8 +129,8 @@ class APACHE_GEODE_EXPORT DataInput {
129129
if (length > 0) {
130130
_GEODE_CHECK_BUFFER_SIZE(length);
131131
_GEODE_NEW(buffer, uint8_t[length]);
132-
std::memcpy(buffer, m_buf, length);
133-
m_buf += length;
132+
std::memcpy(buffer, buffer_, length);
133+
buffer_ += length;
134134
}
135135
*bytes = buffer;
136136
}
@@ -152,8 +152,8 @@ class APACHE_GEODE_EXPORT DataInput {
152152
if (length > 0) {
153153
_GEODE_CHECK_BUFFER_SIZE(length);
154154
_GEODE_NEW(buffer, int8_t[length]);
155-
std::memcpy(buffer, m_buf, length);
156-
m_buf += length;
155+
std::memcpy(buffer, buffer_, length);
156+
buffer_ += length;
157157
}
158158
*bytes = buffer;
159159
}
@@ -173,10 +173,10 @@ class APACHE_GEODE_EXPORT DataInput {
173173
*/
174174
inline int32_t readInt32() {
175175
_GEODE_CHECK_BUFFER_SIZE(4);
176-
int32_t tmp = *(m_buf++);
177-
tmp = (tmp << 8) | *(m_buf++);
178-
tmp = (tmp << 8) | *(m_buf++);
179-
tmp = (tmp << 8) | *(m_buf++);
176+
int32_t tmp = *(buffer_++);
177+
tmp = (tmp << 8) | *(buffer_++);
178+
tmp = (tmp << 8) | *(buffer_++);
179+
tmp = (tmp << 8) | *(buffer_++);
180180
return tmp;
181181
}
182182

@@ -186,14 +186,14 @@ class APACHE_GEODE_EXPORT DataInput {
186186
inline int64_t readInt64() {
187187
_GEODE_CHECK_BUFFER_SIZE(8);
188188
int64_t tmp;
189-
tmp = *(m_buf++);
190-
tmp = (tmp << 8) | *(m_buf++);
191-
tmp = (tmp << 8) | *(m_buf++);
192-
tmp = (tmp << 8) | *(m_buf++);
193-
tmp = (tmp << 8) | *(m_buf++);
194-
tmp = (tmp << 8) | *(m_buf++);
195-
tmp = (tmp << 8) | *(m_buf++);
196-
tmp = (tmp << 8) | *(m_buf++);
189+
tmp = *(buffer_++);
190+
tmp = (tmp << 8) | *(buffer_++);
191+
tmp = (tmp << 8) | *(buffer_++);
192+
tmp = (tmp << 8) | *(buffer_++);
193+
tmp = (tmp << 8) | *(buffer_++);
194+
tmp = (tmp << 8) | *(buffer_++);
195+
tmp = (tmp << 8) | *(buffer_++);
196+
tmp = (tmp << 8) | *(buffer_++);
197197
return tmp;
198198
}
199199

@@ -396,33 +396,33 @@ class APACHE_GEODE_EXPORT DataInput {
396396
* as readonly and modification of contents using this internal pointer
397397
* has undefined behavior.
398398
*/
399-
inline const uint8_t* currentBufferPosition() const { return m_buf; }
399+
inline const uint8_t* currentBufferPosition() const { return buffer_; }
400400

401401
/** get the number of bytes read in the buffer */
402-
inline size_t getBytesRead() const { return m_buf - m_bufHead; }
402+
inline size_t getBytesRead() const { return buffer_ - bufferHead_; }
403403

404404
/** get the number of bytes remaining to be read in the buffer */
405405
inline size_t getBytesRemaining() const {
406-
return (m_bufLength - getBytesRead());
406+
return (bufferLength_ - getBytesRead());
407407
}
408408

409409
/** advance the cursor by given offset */
410-
inline void advanceCursor(size_t offset) { m_buf += offset; }
410+
inline void advanceCursor(size_t offset) { buffer_ += offset; }
411411

412412
/** rewind the cursor by given offset */
413-
inline void rewindCursor(size_t offset) { m_buf -= offset; }
413+
inline void rewindCursor(size_t offset) { buffer_ -= offset; }
414414

415415
/** reset the cursor to the start of buffer */
416-
inline void reset() { m_buf = m_bufHead; }
416+
inline void reset() { buffer_ = bufferHead_; }
417417

418418
inline void setBuffer() {
419-
m_buf = currentBufferPosition();
420-
m_bufLength = getBytesRemaining();
419+
buffer_ = currentBufferPosition();
420+
bufferLength_ = getBytesRemaining();
421421
}
422422

423-
inline void resetPdx(size_t offset) { m_buf = m_bufHead + offset; }
423+
inline void resetPdx(size_t offset) { buffer_ = bufferHead_ + offset; }
424424

425-
inline size_t getPdxBytes() const { return m_bufLength; }
425+
inline size_t getPdxBytes() const { return bufferLength_; }
426426

427427
static uint8_t* getBufferCopy(const uint8_t* from, size_t length) {
428428
uint8_t* result;
@@ -432,7 +432,7 @@ class APACHE_GEODE_EXPORT DataInput {
432432
return result;
433433
}
434434

435-
inline void reset(size_t offset) { m_buf = m_bufHead + offset; }
435+
inline void reset(size_t offset) { buffer_ = bufferHead_ + offset; }
436436

437437
uint8_t* getBufferCopyFrom(const uint8_t* from, size_t length) {
438438
uint8_t* result;
@@ -452,19 +452,19 @@ class APACHE_GEODE_EXPORT DataInput {
452452
DataInput& operator=(DataInput&&) = default;
453453

454454
protected:
455+
const uint8_t* buffer_;
456+
const uint8_t* bufferHead_;
457+
size_t bufferLength_;
458+
Pool* pool_;
459+
const CacheImpl* cache_;
460+
455461
/** constructor given a pre-allocated byte array with size */
456462
DataInput(const uint8_t* buffer, size_t len, const CacheImpl* cache,
457463
Pool* pool);
458464

459465
virtual const SerializationRegistry& getSerializationRegistry() const;
460466

461467
private:
462-
const uint8_t* m_buf;
463-
const uint8_t* m_bufHead;
464-
size_t m_bufLength;
465-
Pool* m_pool;
466-
const CacheImpl* m_cache;
467-
468468
std::shared_ptr<Serializable> readObjectInternal(int8_t typeId = -1);
469469

470470
template <typename mType>
@@ -502,21 +502,21 @@ class APACHE_GEODE_EXPORT DataInput {
502502

503503
inline char readPdxChar() { return static_cast<char>(readInt16()); }
504504

505-
inline void _checkBufferSize(size_t size, int32_t line) {
506-
if ((m_bufLength - (m_buf - m_bufHead)) < size) {
505+
virtual void _checkBufferSize(size_t size, int32_t line) {
506+
if ((bufferLength_ - (buffer_ - bufferHead_)) < size) {
507507
throw OutOfRangeException(
508508
"DataInput: attempt to read beyond buffer at line " +
509509
std::to_string(line) + ": available buffer size " +
510-
std::to_string(m_bufLength - (m_buf - m_bufHead)) +
510+
std::to_string(bufferLength_ - (buffer_ - bufferHead_)) +
511511
", attempted read of size " + std::to_string(size));
512512
}
513513
}
514514

515-
inline int8_t readNoCheck() { return *(m_buf++); }
515+
inline int8_t readNoCheck() { return *(buffer_++); }
516516

517517
inline int16_t readInt16NoCheck() {
518-
int16_t tmp = *(m_buf++);
519-
tmp = static_cast<int16_t>((tmp << 8) | *(m_buf++));
518+
int16_t tmp = *(buffer_++);
519+
tmp = static_cast<int16_t>((tmp << 8) | *(buffer_++));
520520
return tmp;
521521
}
522522

@@ -605,7 +605,7 @@ class APACHE_GEODE_EXPORT DataInput {
605605
value.assign(reinterpret_cast<const wchar_t*>(tmp.data()), tmp.length());
606606
}
607607

608-
Pool* getPool() const { return m_pool; }
608+
Pool* getPool() const { return pool_; }
609609

610610
friend Cache;
611611
friend CacheImpl;

cppcache/src/Connector.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,11 @@ class Connector {
116116
*/
117117
virtual uint16_t getPort() = 0;
118118

119+
/**
120+
* Returns the remote endpoint for this connection in the form host:port
121+
*/
122+
virtual std::string getRemoteEndpoint() = 0;
123+
119124
/**
120125
* Writes an array of a known size to the underlying output stream.
121126
*

cppcache/src/DataInput.cpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,21 +30,21 @@ namespace client {
3030

3131
DataInput::DataInput(const uint8_t* buffer, size_t len, const CacheImpl* cache,
3232
Pool* pool)
33-
: m_buf(buffer),
34-
m_bufHead(buffer),
35-
m_bufLength(len),
36-
m_pool(pool),
37-
m_cache(cache) {}
33+
: buffer_(buffer),
34+
bufferHead_(buffer),
35+
bufferLength_(len),
36+
pool_(pool),
37+
cache_(cache) {}
3838

3939
std::shared_ptr<Serializable> DataInput::readObjectInternal(int8_t typeId) {
4040
return getSerializationRegistry().deserialize(*this, typeId);
4141
}
4242

4343
const SerializationRegistry& DataInput::getSerializationRegistry() const {
44-
return *m_cache->getSerializationRegistry();
44+
return *cache_->getSerializationRegistry();
4545
}
4646

47-
Cache* DataInput::getCache() const { return m_cache->getCache(); }
47+
Cache* DataInput::getCache() const { return cache_->getCache(); }
4848

4949
template <class _Traits, class _Allocator>
5050
void DataInput::readJavaModifiedUtf8(
@@ -63,7 +63,7 @@ void DataInput::readJavaModifiedUtf8(
6363
uint16_t length = readInt16();
6464
_GEODE_CHECK_BUFFER_SIZE(length);
6565
value = internal::JavaModifiedUtf8::decode(
66-
reinterpret_cast<const char*>(m_buf), length);
66+
reinterpret_cast<const char*>(buffer_), length);
6767
advanceCursor(length);
6868
}
6969
template APACHE_GEODE_EXPLICIT_TEMPLATE_EXPORT void

cppcache/src/GetAllServersResponse.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ namespace geode {
2121
namespace client {
2222

2323
void GetAllServersResponse::toData(DataOutput& output) const {
24-
int32_t numServers = static_cast<int32_t>(m_servers.size());
25-
output.writeInt(numServers);
26-
for (int32_t i = 0; i < numServers; i++) {
27-
output.writeObject(m_servers.at(i));
24+
auto numServers = servers_.size();
25+
output.writeInt(static_cast<int32_t>(numServers));
26+
for (unsigned int i = 0; i < numServers; i++) {
27+
output.writeObject(servers_.at(i));
2828
}
2929
}
3030
void GetAllServersResponse::fromData(DataInput& input) {
@@ -33,7 +33,7 @@ void GetAllServersResponse::fromData(DataInput& input) {
3333
for (int i = 0; i < numServers; i++) {
3434
std::shared_ptr<ServerLocation> sLoc = std::make_shared<ServerLocation>();
3535
sLoc->fromData(input);
36-
m_servers.push_back(sLoc);
36+
servers_.push_back(sLoc);
3737
}
3838
}
3939

cppcache/src/GetAllServersResponse.hpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,23 +35,27 @@ namespace client {
3535

3636
class GetAllServersResponse : public internal::DataSerializableFixedId_t<
3737
internal::DSFid::GetAllServersResponse> {
38-
std::vector<std::shared_ptr<ServerLocation> > m_servers;
39-
4038
public:
4139
static std::shared_ptr<Serializable> create() {
4240
return std::make_shared<GetAllServersResponse>();
4341
}
4442
GetAllServersResponse() : Serializable() {}
43+
explicit GetAllServersResponse(
44+
std::vector<std::shared_ptr<ServerLocation> > servers)
45+
: Serializable(), servers_(servers) {}
4546
void toData(DataOutput& output) const override;
4647
void fromData(DataInput& input) override;
4748

4849
size_t objectSize() const override {
49-
return sizeof(GetAllServersResponse) + m_servers.capacity();
50+
return sizeof(GetAllServersResponse) + servers_.capacity();
5051
}
5152
std::vector<std::shared_ptr<ServerLocation> > getServers() {
52-
return m_servers;
53+
return servers_;
5354
}
5455
~GetAllServersResponse() override = default;
56+
57+
private:
58+
std::vector<std::shared_ptr<ServerLocation> > servers_;
5559
};
5660

5761
} // namespace client

cppcache/src/StreamDataInput.cpp

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#include "StreamDataInput.hpp"
19+
20+
#include <geode/DataInput.hpp>
21+
22+
#include "Utils.hpp"
23+
#include "util/Log.hpp"
24+
25+
namespace apache {
26+
namespace geode {
27+
namespace client {
28+
29+
constexpr size_t kBufferSize = 3000;
30+
31+
StreamDataInput::StreamDataInput(std::chrono::milliseconds timeout,
32+
std::unique_ptr<Connector> connector,
33+
const CacheImpl* cache, Pool* pool)
34+
: DataInput(nullptr, 0, cache, pool),
35+
connector_(std::move(connector)),
36+
remainingTimeBeforeTimeout_(timeout) {}
37+
38+
void StreamDataInput::readDataIfNotAvailable(size_t size) {
39+
char buff[kBufferSize];
40+
while (getBytesRemaining() < size) {
41+
const auto start = std::chrono::system_clock::now();
42+
43+
const auto receivedLength = connector_->receive_nothrowiftimeout(
44+
buff, kBufferSize, remainingTimeBeforeTimeout_);
45+
46+
const auto timeSpent = std::chrono::system_clock::now() - start;
47+
48+
remainingTimeBeforeTimeout_ -=
49+
std::chrono::duration_cast<decltype(remainingTimeBeforeTimeout_)>(
50+
timeSpent);
51+
52+
LOGDEBUG(
53+
"received %d bytes from %s: %s, time spent: "
54+
"%ld millisecs, time remaining before timeout: %ld millisecs",
55+
receivedLength, connector_->getRemoteEndpoint().c_str(),
56+
Utils::convertBytesToString(reinterpret_cast<uint8_t*>(buff),
57+
receivedLength)
58+
.c_str(),
59+
std::chrono::duration_cast<std::chrono::milliseconds>(timeSpent)
60+
.count(),
61+
remainingTimeBeforeTimeout_.count());
62+
63+
if (remainingTimeBeforeTimeout_ <= std::chrono::milliseconds::zero()) {
64+
throw(TimeoutException(std::string("Timeout when receiving from ")
65+
.append(connector_->getRemoteEndpoint())));
66+
}
67+
68+
auto newLength = bufferLength_ + receivedLength;
69+
auto currentPosition = getBytesRead();
70+
streamBuf_.resize(newLength);
71+
memcpy(streamBuf_.data() + bufferLength_, buff, receivedLength);
72+
73+
bufferHead_ = streamBuf_.data();
74+
buffer_ = bufferHead_ + currentPosition;
75+
bufferLength_ = newLength;
76+
}
77+
}
78+
79+
} // namespace client
80+
} // namespace geode
81+
} // namespace apache

0 commit comments

Comments
 (0)