Skip to content

Commit 7533969

Browse files
cratelynconorsch
authored andcommitted
rpc: 🌽 add an rpc endpoint
implement the `FaucetService` trait.
1 parent b8a2eca commit 7533969

File tree

4 files changed

+312
-0
lines changed

4 files changed

+312
-0
lines changed

‎src/main.rs

+3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ pub use catchup::Catchup;
2323
#[cfg(feature = "rpc")]
2424
pub mod proto;
2525

26+
#[cfg(feature = "rpc")]
27+
pub mod rpc;
28+
2629
#[tokio::main]
2730
async fn main() -> anyhow::Result<()> {
2831
// Configuring logging

‎src/opt.rs

+3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ impl Opt {
2323
pub async fn exec(self) -> anyhow::Result<()> {
2424
match self.command {
2525
Command::Serve(serve) => serve.exec().await,
26+
Command::ServeRpc(rpc) => rpc.exec().await,
2627
Command::History(history) => history.exec().await,
2728
}
2829
}
@@ -32,6 +33,8 @@ impl Opt {
3233
pub enum Command {
3334
/// Run the bot, listening for Discord messages, dispensing tokens, and replying to users.
3435
Serve(serve::Serve),
36+
/// Run the bot, listening for RPC calls, and responding as appropriate.
37+
ServeRpc(rpc::ServeRpc),
3538
/// Export the history of requests from the channel as CSV to stdout.
3639
History(history::History),
3740
}

‎src/opt/rpc.rs

+162
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
use {
2+
crate::Responder,
3+
anyhow::{anyhow, Context},
4+
clap::Parser,
5+
penumbra_asset::Value,
6+
std::{net::SocketAddr, path::PathBuf},
7+
tap::Tap,
8+
};
9+
10+
/// Run galileo on an RPC endpoint.
11+
#[derive(Debug, Clone, Parser)]
12+
pub struct ServeRpc {
13+
/// The number of accounts to send funds from. Funds will send from account indices `[0, n-1]`.
14+
#[clap(long, default_value = "4")]
15+
account_count: u32,
16+
/// The path to the directory to use to store data [default: platform appdata directory].
17+
#[clap(long, short)]
18+
data_dir: Option<PathBuf>,
19+
/// The URL of the pd gRPC endpoint on the remote node.
20+
///
21+
/// This is the node that transactions will be sent to.
22+
#[clap(short, long, default_value = "https://grpc.testnet.penumbra.zone")]
23+
node: url::Url,
24+
/// Bind the gRPC server to this socket.
25+
///
26+
/// The gRPC server supports both grpc (HTTP/2) and grpc-web (HTTP/1.1) clients.
27+
///
28+
/// If `grpc_auto_https` is set, this defaults to `0.0.0.0:443` and uses HTTPS.
29+
///
30+
/// If `grpc_auto_https` is not set, this defaults to `127.0.0.1:8080` without HTTPS.
31+
#[clap(short, long)]
32+
grpc_bind: Option<SocketAddr>,
33+
/// If set, serve gRPC using auto-managed HTTPS with this domain name.
34+
///
35+
/// NOTE: This option automatically provisions TLS certificates from Let's Encrypt and caches
36+
/// them in the `home` directory. The production LE CA has rate limits, so be careful using
37+
/// this option. Avoid deleting the certificates and forcing re-issuance, which can lead to
38+
/// hitting the rate limit. See the `--acme-staging` option.
39+
#[clap(long, value_name = "DOMAIN")]
40+
grpc_auto_https: Option<String>,
41+
/// Enable use of the LetsEncrypt ACME staging API (https://letsencrypt.org/docs/staging-environment/),
42+
/// which is more forgiving of ratelimits. Set this option to `true` if you're trying out the
43+
/// `--grpc-auto-https` option for the first time, to validate your configuration, before
44+
/// subjecting yourself to production ratelimits.
45+
///
46+
/// This option has no effect if `--grpc-auto-https` is not set.
47+
#[clap(long)]
48+
acme_staging: bool,
49+
// Disable transaction sending. Useful for debugging.
50+
// TODO(kate): wire in dry-run mode.
51+
// #[clap(long)]
52+
// dry_run: bool,
53+
/// The amounts to send for each response, written as typed values 1.87penumbra, 12cubes, etc.
54+
values: Vec<Value>,
55+
}
56+
57+
impl ServeRpc {
58+
/// Run the bot, listening for RPC calls, and responding as appropriate.
59+
///
60+
/// This function should never return, unless an error of some kind is encountered.
61+
pub async fn exec(self) -> anyhow::Result<()> {
62+
self.preflight_checks()
63+
.await
64+
.context("failed preflight checks")?;
65+
66+
let Self {
67+
account_count,
68+
data_dir,
69+
node,
70+
grpc_bind,
71+
grpc_auto_https,
72+
acme_staging,
73+
values,
74+
} = self;
75+
76+
// Use the given `grpc_bind` address if one was specified. If not, we will choose a
77+
// default depending on whether or not `grpc_auto_https` was set. See the
78+
// `RootCommand::Start::grpc_bind` documentation above.
79+
let grpc_bind = {
80+
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
81+
const HTTP_DEFAULT: SocketAddr =
82+
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
83+
const HTTPS_DEFAULT: SocketAddr =
84+
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 443);
85+
let default = || {
86+
if grpc_auto_https.is_some() {
87+
HTTPS_DEFAULT
88+
} else {
89+
HTTP_DEFAULT
90+
}
91+
};
92+
grpc_bind.unwrap_or_else(default)
93+
};
94+
95+
// Make a worker to handle the address queue.
96+
let service =
97+
crate::sender::service::init(data_dir.as_deref(), account_count, &node).await?;
98+
let (request_tx, responder) = Responder::new(service, 1, values.clone());
99+
let (cancel_tx, mut cancel_rx) = tokio::sync::mpsc::channel(1);
100+
let responder = tokio::spawn(async move { responder.run(cancel_tx).await });
101+
102+
// Next, create the RPC service.
103+
let make_svc = crate::rpc::rpc(request_tx)?
104+
.into_router()
105+
// Set rather permissive CORS headers for pd's gRPC: the service
106+
// should be accessible from arbitrary web contexts, such as localhost,
107+
// or any FQDN that wants to reference its data.
108+
.layer(tower_http::cors::CorsLayer::permissive())
109+
.into_make_service();
110+
111+
// Now start the GRPC server, initializing an ACME client to use as a certificate
112+
// resolver if auto-https has been enabled.
113+
macro_rules! spawn_grpc_server {
114+
($server:expr) => {
115+
tokio::task::spawn($server.serve(make_svc))
116+
};
117+
}
118+
let grpc_server = axum_server::bind(grpc_bind);
119+
let grpc_server = match grpc_auto_https {
120+
// Auto-https is enabled. Configure the axum accepter, and spawn an ACME worker.
121+
Some(domain) => {
122+
let data_dir = // we wouldn't need an error here if we hoist the default up.
123+
data_dir.ok_or(anyhow!("data directory must be set to use auto-https"))?;
124+
let (acceptor, acme_worker) =
125+
penumbra_auto_https::axum_acceptor(data_dir, domain, !acme_staging);
126+
// TODO(kate): we should eventually propagate errors from the ACME worker task.
127+
tokio::spawn(acme_worker);
128+
spawn_grpc_server!(grpc_server.acceptor(acceptor))
129+
}
130+
// Auto-https is not enabled. Spawn only the GRPC server.
131+
None => {
132+
spawn_grpc_server!(grpc_server)
133+
}
134+
}
135+
.tap(|_| tracing::info!(address = %grpc_bind, "grpc service is running"));
136+
137+
// Start the RPC server and the responder worker, listening for a cancellation event.
138+
tokio::select! {
139+
result = responder => result.map_err(anyhow::Error::from)?.context("error in responder service"),
140+
result = grpc_server => result.map_err(anyhow::Error::from)?.context("error in grpc service"),
141+
_ = cancel_rx.recv() => Err(anyhow::anyhow!("cancellation received")),
142+
}
143+
}
144+
145+
/// Perform sanity checks on CLI args prior to running.
146+
async fn preflight_checks(&self) -> anyhow::Result<()> {
147+
use num_traits::identities::Zero;
148+
149+
// TODO(kate): wire up dry-run mode.
150+
// if self.dry_run {
151+
// tracing::info!("dry-run mode is enabled, won't send transactions or post messages");
152+
// }
153+
154+
if self.values.is_empty() {
155+
anyhow::bail!("at least one value must be provided");
156+
} else if self.values.iter().any(|v| v.amount.value().is_zero()) {
157+
anyhow::bail!("all values must be non-zero");
158+
}
159+
160+
Ok(())
161+
}
162+
}

‎src/rpc.rs

+144
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
//! RPC facilities.
2+
//!
3+
//! Call [`rpc()`] to obtain a [`Router`] that can handle Galileo RPC calls.
4+
5+
use {
6+
crate::proto::faucet::v1::{
7+
faucet_service_server::{FaucetService, FaucetServiceServer},
8+
FaucetRequest, FaucetResponse,
9+
},
10+
anyhow::Context,
11+
tokio::sync::mpsc,
12+
tonic::{transport::server::Router, Request, Response, Status},
13+
};
14+
15+
/// Returns a new [`Router`] for handing Galileo RPC calls.
16+
pub fn rpc(request_tx: mpsc::Sender<crate::responder::Request>) -> anyhow::Result<Router> {
17+
let faucet = FaucetServiceServer::new(FaucetServer::new(request_tx));
18+
let reflection = tonic_reflection::server::Builder::configure()
19+
.register_encoded_file_descriptor_set(crate::proto::FILE_DESCRIPTOR_SET)
20+
.build()
21+
.context("could not configure grpc reflection service")?;
22+
23+
Ok(tonic::transport::server::Server::builder()
24+
.add_service(tonic_web::enable(faucet))
25+
.add_service(tonic_web::enable(reflection)))
26+
}
27+
28+
/// A faucet server.
29+
///
30+
/// Most importantly, this type implements [`FaucetService`]. This type is generic over an
31+
/// `S` service that provides a [`tower::Service`] implementation from [`SenderRequest`] tuples to
32+
/// the [`TransactionId`] of the transaction that sent funds to the address.
33+
#[derive(Clone)]
34+
struct FaucetServer {
35+
request_tx: mpsc::Sender<crate::responder::Request>,
36+
}
37+
38+
/// Errors encountered when [`FaucetService::send_funds()`] is called.
39+
enum SendFundsError {
40+
/// The address was not given.
41+
AddressMissing,
42+
/// The address was not valid.
43+
AddressInvalid(
44+
<penumbra_keys::Address as TryFrom<penumbra_proto::core::keys::v1::Address>>::Error,
45+
),
46+
/// Failed to send a request to the responder worker.
47+
SendRequestFailed,
48+
/// Failed to receive a response from the responder worker.
49+
ReceiveResponseFailed,
50+
/// The sender service failed.
51+
SenderError(String),
52+
}
53+
54+
// === impl FaucetServer ===
55+
56+
impl FaucetServer {
57+
/// Returns a new faucet server, wrapping the provided sender service.
58+
fn new(request_tx: mpsc::Sender<crate::responder::Request>) -> Self {
59+
Self { request_tx }
60+
}
61+
}
62+
63+
#[tonic::async_trait]
64+
impl FaucetService for FaucetServer {
65+
/// Sends funds to an address, returning the transaction hash.
66+
async fn send_funds(
67+
&self,
68+
request: Request<FaucetRequest>,
69+
) -> Result<Response<FaucetResponse>, Status> {
70+
let Self { request_tx } = self;
71+
72+
// Parse the inbound request, getting the address we will send funds to.
73+
let FaucetRequest { address } = request.into_inner();
74+
let address: penumbra_keys::Address = address
75+
.ok_or(SendFundsError::AddressMissing)?
76+
.try_into()
77+
.map_err(SendFundsError::AddressInvalid)?;
78+
79+
// Send the address to the responder worker, which will dispense tokens to the address.
80+
let (response_rx, request) = crate::responder::Request::new(address.clone());
81+
request_tx
82+
.send(request)
83+
.await
84+
.map_err(|_| SendFundsError::SendRequestFailed)?;
85+
86+
// Wait for the responder to send a response back.
87+
let crate::responder::Response {
88+
succeeded,
89+
failed,
90+
unparsed,
91+
remaining,
92+
} = response_rx
93+
.await
94+
.map_err(|_| SendFundsError::ReceiveResponseFailed)?;
95+
assert!(unparsed.is_empty(), "unparsed addresses should be empty");
96+
assert!(remaining.is_empty(), "no addresses should be remaining");
97+
98+
let transaction_id = match (succeeded.as_slice(), failed.as_slice()) {
99+
// We have received the address we sent funds to, and a transaction id.
100+
([(address_rx, id)], []) => {
101+
assert_eq!(
102+
address,
103+
address_rx.clone(),
104+
"responder worker dispensed tokens to the wrong address",
105+
);
106+
id.clone()
107+
}
108+
// We failed to send funds to this address.
109+
([], [(address_rx, error)]) => {
110+
assert_eq!(
111+
address,
112+
address_rx.clone(),
113+
"responder worker dispensed tokens to the wrong address",
114+
);
115+
return Err(error.clone()).map_err(SendFundsError::SenderError).map_err(Status::from);
116+
}
117+
_ => panic!("something has gone wrong, responder worker dispensed tokens to unexpected addresses"),
118+
};
119+
120+
Ok(FaucetResponse {
121+
transaction_id: Some(transaction_id.into()),
122+
})
123+
.map(Response::new)
124+
}
125+
}
126+
127+
// === impl SendFundsError ===
128+
129+
impl From<SendFundsError> for Status {
130+
fn from(error: SendFundsError) -> Status {
131+
use SendFundsError::*;
132+
match error {
133+
AddressMissing => Status::invalid_argument("no address was provided"),
134+
AddressInvalid(e) => {
135+
Status::invalid_argument(format!("address could not be parsed: '{e:?}'"))
136+
}
137+
SendRequestFailed => Status::unavailable("failed to send request to send worker"),
138+
SenderError(e) => Status::internal(format!("failed to send funds: '{e:?}'")),
139+
ReceiveResponseFailed => {
140+
Status::unavailable("failed to receive response from send worker")
141+
}
142+
}
143+
}
144+
}

0 commit comments

Comments
 (0)