Skip to content

Commit 4063698

Browse files
authored
feat: add grpc socket and flattn tx batches to allow for lower allocations (#3297)
* add grpc socket and flattn tx batches to allow for lower allocations * redo proto * docs: update changelog for grpc execution transport * remove extra txs
1 parent 1f1f48e commit 4063698

18 files changed

Lines changed: 1316 additions & 152 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1212
### Changes
1313

1414
- Optimization of mutex usage in cache for reaper [#3286](https://github.com/evstack/ev-node/pull/3286)
15+
- Add Unix domain socket support for gRPC execution endpoints via `unix:///path/to/socket` [#3297](https://github.com/evstack/ev-node/pull/3297)
16+
- **BREAKING:** Replace legacy gRPC execution `txs` payload fields with `tx_batch` so clients and servers use contiguous transaction buffers [#3297](https://github.com/evstack/ev-node/pull/3297)
1517
- Optimize metadata writes by making it async in cache store [#3298](https://github.com/evstack/ev-node/pull/3298)
1618
- Reduce tx cache retention to avoid OOM under (really) heavy tx load [#3299](https://github.com/evstack/ev-node/pull/3299)
1719

apps/grpc/README.md

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
# gRPC Single Sequencer App
22

3-
This application runs a Evolve node with a single sequencer that connects to a remote execution client via gRPC. It allows you to use any execution layer that implements the Evolve execution gRPC interface.
3+
This application runs an Evolve node with a single sequencer that connects to an execution client via gRPC. It allows you to use any execution layer that implements the Evolve execution gRPC interface.
44

55
## Overview
66

77
The gRPC single sequencer app provides:
88

9-
- A Evolve consensus node with single sequencer
10-
- Connection to remote execution clients via gRPC
9+
- An Evolve consensus node with single sequencer
10+
- Connection to execution clients via TCP or Unix domain socket gRPC
1111
- Full data availability layer integration
1212
- P2P networking capabilities
1313

@@ -58,11 +58,20 @@ Start the Evolve node with:
5858
--da.auth-token your-da-token
5959
```
6060

61+
For a same-machine executor, use a Unix domain socket endpoint:
62+
63+
```bash
64+
./evgrpc start \
65+
--root-dir ~/.evgrpc \
66+
--grpc-executor-url unix:///tmp/evolve-executor.sock \
67+
--da.address http://localhost:7980
68+
```
69+
6170
## Command-Line Flags
6271

6372
### gRPC-specific Flags
6473

65-
- `--grpc-executor-url`: URL of the gRPC execution service (default: `http://localhost:50051`)
74+
- `--grpc-executor-url`: URL of the gRPC execution service, either `http://host:port` or `unix:///path/to/socket` (default: `http://localhost:50051`)
6675

6776
### Common Evolve Flags
6877

apps/grpc/cmd/run.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import (
2828

2929
const (
3030
grpcDbName = "grpc-single"
31-
// FlagGrpcExecutorURL is the flag for the gRPC executor endpoint
31+
// FlagGrpcExecutorURL is the flag for the gRPC executor endpoint.
3232
FlagGrpcExecutorURL = "grpc-executor-url"
3333
)
3434

@@ -163,11 +163,10 @@ func createGRPCExecutionClient(cmd *cobra.Command) (execution.Executor, error) {
163163
return nil, fmt.Errorf("%s flag is required", FlagGrpcExecutorURL)
164164
}
165165

166-
// Create and return the gRPC client
167-
return executiongrpc.NewClient(executorURL), nil
166+
return executiongrpc.NewClient(executorURL)
168167
}
169168

170169
// addGRPCFlags adds flags specific to the gRPC execution client
171170
func addGRPCFlags(cmd *cobra.Command) {
172-
cmd.Flags().String(FlagGrpcExecutorURL, "http://localhost:50051", "URL of the gRPC execution service")
171+
cmd.Flags().String(FlagGrpcExecutorURL, "http://localhost:50051", "URL of the gRPC execution service, or unix:///path/to/executor.sock")
173172
}

buf.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,6 @@ lint:
1414
breaking:
1515
use:
1616
- FILE
17+
ignore_only:
18+
FIELD_NO_DELETE:
19+
- proto/evnode/v1/execution.proto

client/crates/types/src/proto/evnode.v1.messages.rs

Lines changed: 147 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,19 @@ pub struct SignedHeader {
7676
#[prost(message, optional, tag = "3")]
7777
pub signer: ::core::option::Option<Signer>,
7878
}
79+
/// DAHeaderEnvelope is a wrapper around SignedHeader for DA submission.
80+
/// It is binary compatible with SignedHeader (fields 1-3) but adds an envelope signature.
81+
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
82+
pub struct DaHeaderEnvelope {
83+
#[prost(message, optional, tag = "1")]
84+
pub header: ::core::option::Option<Header>,
85+
#[prost(bytes = "vec", tag = "2")]
86+
pub signature: ::prost::alloc::vec::Vec<u8>,
87+
#[prost(message, optional, tag = "3")]
88+
pub signer: ::core::option::Option<Signer>,
89+
#[prost(bytes = "vec", tag = "4")]
90+
pub envelope_signature: ::prost::alloc::vec::Vec<u8>,
91+
}
7992
/// Signer is a signer of a block in the blockchain.
8093
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
8194
pub struct Signer {
@@ -139,6 +152,28 @@ pub struct Vote {
139152
#[prost(bytes = "vec", tag = "5")]
140153
pub validator_address: ::prost::alloc::vec::Vec<u8>,
141154
}
155+
/// P2PSignedHeader
156+
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
157+
pub struct P2pSignedHeader {
158+
#[prost(message, optional, tag = "1")]
159+
pub header: ::core::option::Option<Header>,
160+
#[prost(bytes = "vec", tag = "2")]
161+
pub signature: ::prost::alloc::vec::Vec<u8>,
162+
#[prost(message, optional, tag = "3")]
163+
pub signer: ::core::option::Option<Signer>,
164+
#[prost(uint64, optional, tag = "4")]
165+
pub da_height_hint: ::core::option::Option<u64>,
166+
}
167+
/// P2PData
168+
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
169+
pub struct P2pData {
170+
#[prost(message, optional, tag = "1")]
171+
pub metadata: ::core::option::Option<Metadata>,
172+
#[prost(bytes = "vec", repeated, tag = "2")]
173+
pub txs: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
174+
#[prost(uint64, optional, tag = "3")]
175+
pub da_height_hint: ::core::option::Option<u64>,
176+
}
142177
/// State is the state of the blockchain.
143178
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
144179
pub struct State {
@@ -159,6 +194,24 @@ pub struct State {
159194
#[prost(bytes = "vec", tag = "9")]
160195
pub last_header_hash: ::prost::alloc::vec::Vec<u8>,
161196
}
197+
/// RaftBlockState represents a replicated block state
198+
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
199+
pub struct RaftBlockState {
200+
#[prost(uint64, tag = "1")]
201+
pub height: u64,
202+
#[prost(uint64, tag = "2")]
203+
pub last_submitted_da_header_height: u64,
204+
#[prost(uint64, tag = "3")]
205+
pub last_submitted_da_data_height: u64,
206+
#[prost(bytes = "vec", tag = "4")]
207+
pub hash: ::prost::alloc::vec::Vec<u8>,
208+
#[prost(uint64, tag = "5")]
209+
pub timestamp: u64,
210+
#[prost(bytes = "vec", tag = "6")]
211+
pub header: ::prost::alloc::vec::Vec<u8>,
212+
#[prost(bytes = "vec", tag = "7")]
213+
pub data: ::prost::alloc::vec::Vec<u8>,
214+
}
162215
/// SequencerDACheckpoint tracks the position in the DA where transactions were last processed
163216
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
164217
pub struct SequencerDaCheckpoint {
@@ -212,6 +265,17 @@ pub struct Batch {
212265
#[prost(bytes = "vec", repeated, tag = "1")]
213266
pub txs: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
214267
}
268+
/// BlockData contains data retrieved from a single DA height.
269+
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
270+
pub struct BlockData {
271+
#[prost(uint64, tag = "1")]
272+
pub height: u64,
273+
/// Unix timestamp in nanoseconds
274+
#[prost(int64, tag = "2")]
275+
pub timestamp: i64,
276+
#[prost(bytes = "vec", repeated, tag = "3")]
277+
pub blobs: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
278+
}
215279
/// InitChainRequest contains the genesis parameters for chain initialization
216280
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
217281
pub struct InitChainRequest {
@@ -231,28 +295,32 @@ pub struct InitChainResponse {
231295
/// Hash representing initial state
232296
#[prost(bytes = "vec", tag = "1")]
233297
pub state_root: ::prost::alloc::vec::Vec<u8>,
234-
/// Maximum allowed bytes for transactions in a block
235-
#[prost(uint64, tag = "2")]
236-
pub max_bytes: u64,
237298
}
238299
/// GetTxsRequest is the request for fetching transactions
239300
///
240301
/// Empty for now, may include filtering criteria in the future
241302
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
242303
pub struct GetTxsRequest {}
304+
/// TxBatch stores ordered transactions in one contiguous bytes buffer.
305+
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
306+
pub struct TxBatch {
307+
/// Concatenated transaction bytes.
308+
#[prost(bytes = "vec", tag = "1")]
309+
pub data: ::prost::alloc::vec::Vec<u8>,
310+
/// Byte length for each transaction in order.
311+
#[prost(uint32, repeated, tag = "2")]
312+
pub tx_sizes: ::prost::alloc::vec::Vec<u32>,
313+
}
243314
/// GetTxsResponse contains the available transactions
244315
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
245316
pub struct GetTxsResponse {
246-
/// Slice of valid transactions from mempool
247-
#[prost(bytes = "vec", repeated, tag = "1")]
248-
pub txs: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
317+
/// Valid transactions from mempool in contiguous batch form.
318+
#[prost(message, optional, tag = "2")]
319+
pub tx_batch: ::core::option::Option<TxBatch>,
249320
}
250321
/// ExecuteTxsRequest contains transactions and block context for execution
251322
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
252323
pub struct ExecuteTxsRequest {
253-
/// Ordered list of transactions to execute
254-
#[prost(bytes = "vec", repeated, tag = "1")]
255-
pub txs: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
256324
/// Height of block being created (must be > 0)
257325
#[prost(uint64, tag = "2")]
258326
pub block_height: u64,
@@ -262,6 +330,9 @@ pub struct ExecuteTxsRequest {
262330
/// Previous block's state root hash
263331
#[prost(bytes = "vec", tag = "4")]
264332
pub prev_state_root: ::prost::alloc::vec::Vec<u8>,
333+
/// Ordered transactions to execute in contiguous batch form.
334+
#[prost(message, optional, tag = "5")]
335+
pub tx_batch: ::core::option::Option<TxBatch>,
265336
}
266337
/// ExecuteTxsResponse contains the result of transaction execution
267338
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
@@ -285,6 +356,73 @@ pub struct SetFinalRequest {
285356
/// Empty response, errors are returned via gRPC status
286357
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
287358
pub struct SetFinalResponse {}
359+
/// GetExecutionInfoRequest requests execution layer parameters
360+
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
361+
pub struct GetExecutionInfoRequest {}
362+
/// GetExecutionInfoResponse contains execution layer parameters
363+
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
364+
pub struct GetExecutionInfoResponse {
365+
/// Maximum gas allowed for transactions in a block
366+
/// For non-gas-based execution layers, this should be 0
367+
#[prost(uint64, tag = "1")]
368+
pub max_gas: u64,
369+
}
370+
/// FilterTxsRequest contains transactions to validate and filter
371+
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
372+
pub struct FilterTxsRequest {
373+
/// Maximum cumulative size allowed (0 means no size limit)
374+
#[prost(uint64, tag = "2")]
375+
pub max_bytes: u64,
376+
/// Maximum cumulative gas allowed (0 means no gas limit)
377+
#[prost(uint64, tag = "3")]
378+
pub max_gas: u64,
379+
/// Whether force-included transactions are present
380+
#[prost(bool, tag = "4")]
381+
pub has_force_included_transaction: bool,
382+
/// All transactions (force-included + mempool) in contiguous batch form.
383+
#[prost(message, optional, tag = "5")]
384+
pub tx_batch: ::core::option::Option<TxBatch>,
385+
}
386+
/// FilterTxsResponse contains the filter status for each transaction
387+
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
388+
pub struct FilterTxsResponse {
389+
/// Filter status for each transaction (same length as txs in request)
390+
#[prost(enumeration = "FilterStatus", repeated, tag = "1")]
391+
pub statuses: ::prost::alloc::vec::Vec<i32>,
392+
}
393+
/// FilterStatus represents the result of filtering a transaction
394+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
395+
#[repr(i32)]
396+
pub enum FilterStatus {
397+
/// Transaction will make it to the next batch
398+
FilterOk = 0,
399+
/// Transaction will be filtered out because invalid (too big, malformed, etc.)
400+
FilterRemove = 1,
401+
/// Transaction is valid but postponed for later processing due to size/gas constraint
402+
FilterPostpone = 2,
403+
}
404+
impl FilterStatus {
405+
/// String value of the enum field names used in the ProtoBuf definition.
406+
///
407+
/// The values are not transformed in any way and thus are considered stable
408+
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
409+
pub fn as_str_name(&self) -> &'static str {
410+
match self {
411+
Self::FilterOk => "FILTER_OK",
412+
Self::FilterRemove => "FILTER_REMOVE",
413+
Self::FilterPostpone => "FILTER_POSTPONE",
414+
}
415+
}
416+
/// Creates an enum from field names used in the ProtoBuf definition.
417+
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
418+
match value {
419+
"FILTER_OK" => Some(Self::FilterOk),
420+
"FILTER_REMOVE" => Some(Self::FilterRemove),
421+
"FILTER_POSTPONE" => Some(Self::FilterPostpone),
422+
_ => None,
423+
}
424+
}
425+
}
288426
/// Block contains all the components of a complete block
289427
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
290428
pub struct Block {

0 commit comments

Comments
 (0)