99#include < future>
1010#include < iostream>
1111#include < string>
12+ #include < thread>
1213#include < utility>
1314#include < vector>
14- #include < thread>
1515
1616#include " catalog/catalog.hpp"
1717#include " common/cluster_manager.hpp"
@@ -132,7 +132,7 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
132132 std::vector<ColumnInfo> catalog_cols;
133133 uint16_t pos = 0 ;
134134 for (const auto & col : ct.columns ()) {
135- common::ValueType vtype = common::ValueType::TYPE_INT32; // Simplified for POC
135+ common::ValueType vtype = common::ValueType::TYPE_INT32; // Simplified for POC
136136 if (col.type_ == " TEXT" ) vtype = common::ValueType::TYPE_TEXT;
137137 catalog_cols.emplace_back (col.name_ , vtype, pos++);
138138 }
@@ -145,7 +145,8 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
145145 }
146146 }
147147
148- // Explicit forward to data nodes to ensure they have metadata IMMEDIATELY (POC workaround for Raft lag)
148+ // Explicit forward to data nodes to ensure they have metadata IMMEDIATELY (POC
149+ // workaround for Raft lag)
149150 network::ExecuteFragmentArgs args;
150151 args.sql = raw_sql;
151152 args.context_id = " ddl_sync" ;
@@ -155,10 +156,11 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
155156 network::RpcClient client (node.address , node.cluster_port );
156157 if (client.connect ()) {
157158 std::vector<uint8_t > resp;
158- static_cast <void >(client.call (network::RpcType::ExecuteFragment, payload, resp));
159+ static_cast <void >(
160+ client.call (network::RpcType::ExecuteFragment, payload, resp));
159161 }
160162 }
161-
163+
162164 res.set_rows_affected (1 );
163165 // Small sleep after DDL to let things settle
164166 std::this_thread::sleep_for (std::chrono::milliseconds (500 ));
@@ -357,12 +359,13 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
357359 const auto * insert_stmt = dynamic_cast <const parser::InsertStatement*>(&stmt);
358360 if (insert_stmt != nullptr && !insert_stmt->values ().empty ()) {
359361 std::unordered_map<uint32_t , std::vector<std::vector<std::string>>> partitions;
360-
362+
361363 for (const auto & row_exprs : insert_stmt->values ()) {
362364 if (row_exprs.empty ()) continue ;
363365 // Assume first column is sharding key
364366 if (row_exprs[0 ]->type () == parser::ExprType::Constant) {
365- const auto * const_expr = dynamic_cast <const parser::ConstantExpr*>(row_exprs[0 ].get ());
367+ const auto * const_expr =
368+ dynamic_cast <const parser::ConstantExpr*>(row_exprs[0 ].get ());
366369 if (const_expr != nullptr ) {
367370 const common::Value pk_val = const_expr->value ();
368371 const uint32_t shard_idx = cluster::ShardManager::compute_shard (
@@ -384,11 +387,13 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
384387 const auto & node = data_nodes[shard_idx];
385388 network::RpcClient client (node.address , node.cluster_port );
386389 if (client.connect ()) {
387- std::string shard_sql = " INSERT INTO " + insert_stmt->table ()->to_string () + " VALUES " ;
390+ std::string shard_sql =
391+ " INSERT INTO " + insert_stmt->table ()->to_string () + " VALUES " ;
388392 for (size_t i = 0 ; i < rows.size (); ++i) {
389393 shard_sql += " (" ;
390394 for (size_t j = 0 ; j < rows[i].size (); ++j) {
391- shard_sql += rows[i][j] + std::string (j == rows[i].size () - 1 ? " " : " , " );
395+ shard_sql +=
396+ rows[i][j] + std::string (j == rows[i].size () - 1 ? " " : " , " );
392397 }
393398 shard_sql += std::string (" )" ) + (i == rows.size () - 1 ? " " : " , " );
394399 }
@@ -411,22 +416,22 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
411416 errors += " [" + node.id + " ] Connect failed; " ;
412417 }
413418 }
414-
419+
415420 QueryResult res;
416421 if (!errors.empty ()) res.set_error (errors);
417422 res.set_rows_affected (total_affected);
418423 return res;
419424 }
420425 } else if (type == parser::StmtType::Select || type == parser::StmtType::Update ||
421426 type == parser::StmtType::Delete) {
422-
423427 bool is_join = false ;
424428 if (type == parser::StmtType::Select) {
425429 const auto * sel = dynamic_cast <const parser::SelectStatement*>(&stmt);
426430 if (sel && !sel->joins ().empty ()) is_join = true ;
427431 }
428432
429- // Try shard pruning based on WHERE clause, but ONLY if NOT a join (joins are complex in POC)
433+ // Try shard pruning based on WHERE clause, but ONLY if NOT a join (joins are complex in
434+ // POC)
430435 const parser::Expression* where_expr = nullptr ;
431436 if (!is_join) {
432437 if (type == parser::StmtType::Select) {
@@ -465,7 +470,8 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
465470 }
466471
467472 network::ExecuteFragmentArgs fragment_args;
468- // Strip LIMIT/OFFSET from fragment SQL to ensure data nodes return all rows for global processing
473+ // Strip LIMIT/OFFSET from fragment SQL to ensure data nodes return all rows for global
474+ // processing
469475 fragment_args.sql = (type == parser::StmtType::Select) ? strip_limit_offset (raw_sql) : raw_sql;
470476 fragment_args.context_id = context_id;
471477 auto fragment_payload = fragment_args.serialize ();
@@ -526,8 +532,10 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
526532 if (col->type () == parser::ExprType::Function) {
527533 const auto * func = dynamic_cast <const parser::FunctionExpr*>(col.get ());
528534 std::string name = func->name ();
529- std::transform (name.begin (), name.end (), name.begin (), [](unsigned char c){ return std::toupper (c); });
530- if (name == " COUNT" || name == " SUM" || name == " MIN" || name == " MAX" || name == " AVG" ) {
535+ std::transform (name.begin (), name.end (), name.begin (),
536+ [](unsigned char c) { return std::toupper (c); });
537+ if (name == " COUNT" || name == " SUM" || name == " MIN" || name == " MAX" ||
538+ name == " AVG" ) {
531539 is_global_aggregate = true ;
532540 agg_types.push_back (name);
533541 } else {
@@ -548,7 +556,7 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
548556 if (row.size () < agg_types.size ()) continue ;
549557 for (size_t i = 0 ; i < agg_types.size (); ++i) {
550558 if (agg_types[i].empty ()) continue ;
551-
559+
552560 const auto & val = row.get (i);
553561 if (val.is_null ()) continue ;
554562
@@ -569,7 +577,7 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
569577 }
570578 }
571579 }
572-
580+
573581 executor::Tuple merged_tuple;
574582 for (auto & v : final_vals) {
575583 merged_tuple.values ().push_back (std::move (v));
@@ -587,18 +595,21 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
587595 if (col_idx == static_cast <size_t >(-1 )) {
588596 // try unqualified
589597 size_t dot = col_name.find_last_of (' .' );
590- if (dot != std::string::npos) col_idx = res.schema ().find_column (col_name.substr (dot+1 ));
598+ if (dot != std::string::npos)
599+ col_idx = res.schema ().find_column (col_name.substr (dot + 1 ));
591600 }
592601
593602 if (col_idx == static_cast <size_t >(-1 )) {
594603 // Fallback for POC if ORDER BY key is not in projection
595604 col_idx = 0 ;
596605 }
597606
598- if (col_idx != static_cast <size_t >(-1 ) && col_idx < res.schema ().columns ().size ()) {
599- std::sort (aggregated_rows.begin (), aggregated_rows.end (), [col_idx](const auto & a, const auto & b) {
600- return a.get (col_idx) < b.get (col_idx);
601- });
607+ if (col_idx != static_cast <size_t >(-1 ) &&
608+ col_idx < res.schema ().columns ().size ()) {
609+ std::sort (aggregated_rows.begin (), aggregated_rows.end (),
610+ [col_idx](const auto & a, const auto & b) {
611+ return a.get (col_idx) < b.get (col_idx);
612+ });
602613 }
603614 }
604615 }
@@ -609,15 +620,16 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
609620 if (sel && (sel->has_limit () || sel->has_offset ())) {
610621 int64_t limit = sel->limit ();
611622 int64_t offset = sel->offset ();
612-
623+
613624 if (offset > 0 ) {
614625 if (static_cast <size_t >(offset) >= aggregated_rows.size ()) {
615626 aggregated_rows.clear ();
616627 } else {
617- aggregated_rows.erase (aggregated_rows.begin (), aggregated_rows.begin () + offset);
628+ aggregated_rows.erase (aggregated_rows.begin (),
629+ aggregated_rows.begin () + offset);
618630 }
619631 }
620-
632+
621633 if (limit >= 0 && static_cast <size_t >(limit) < aggregated_rows.size ()) {
622634 aggregated_rows.resize (limit);
623635 }
0 commit comments