Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 103 additions & 86 deletions sentry/src/transports/curl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,28 @@ pub struct CurlHttpTransport {
impl CurlHttpTransport {
/// Creates a new Transport.
pub fn new(options: &ClientOptions) -> Self {
Self::new_internal(options, None)
Self::new_internal(options, None, 30)
}

/// Creates a new Transport that uses the specified [`CurlClient`].
pub fn with_client(options: &ClientOptions, client: CurlClient) -> Self {
Self::new_internal(options, Some(client))
Self::new_internal(options, Some(client), 30)
Comment thread
mvanhorn marked this conversation as resolved.
}

fn new_internal(options: &ClientOptions, client: Option<CurlClient>) -> Self {
/// Creates a new Transport with a custom transport channel capacity.
///
/// The channel capacity bounds how many envelopes may be queued before
/// `send_envelope` blocks. A higher capacity reduces the chance of
/// dropped events in high-throughput scenarios at the cost of memory.
pub fn with_channel_capacity(options: &ClientOptions, channel_capacity: usize) -> Self {
Self::new_internal(options, None, channel_capacity)
}

fn new_internal(
options: &ClientOptions,
client: Option<CurlClient>,
channel_capacity: usize,
) -> Self {
let client = client.unwrap_or_else(CurlClient::new);
let http_proxy = options.http_proxy.as_ref().map(ToString::to_string);
let https_proxy = options.https_proxy.as_ref().map(ToString::to_string);
Expand All @@ -38,99 +51,103 @@ impl CurlHttpTransport {
let accept_invalid_certs = options.accept_invalid_certs;

let mut handle = client;
let thread = TransportThread::new(move |envelope, rl| {
handle.reset();
handle.url(&url).unwrap();
handle.custom_request("POST").unwrap();

if accept_invalid_certs {
handle.ssl_verify_host(false).unwrap();
handle.ssl_verify_peer(false).unwrap();
}

match (scheme, &http_proxy, &https_proxy) {
(Scheme::Https, _, Some(proxy)) => {
if let Err(err) = handle.proxy(proxy) {
sentry_debug!("invalid proxy: {:?}", err);
}
let thread = TransportThread::with_capacity(
move |envelope, rl| {
handle.reset();
handle.url(&url).unwrap();
handle.custom_request("POST").unwrap();

if accept_invalid_certs {
handle.ssl_verify_host(false).unwrap();
handle.ssl_verify_peer(false).unwrap();
}
(_, Some(proxy), _) => {
if let Err(err) = handle.proxy(proxy) {
sentry_debug!("invalid proxy: {:?}", err);

match (scheme, &http_proxy, &https_proxy) {
(Scheme::Https, _, Some(proxy)) => {
if let Err(err) = handle.proxy(proxy) {
sentry_debug!("invalid proxy: {:?}", err);
}
}
(_, Some(proxy), _) => {
if let Err(err) = handle.proxy(proxy) {
sentry_debug!("invalid proxy: {:?}", err);
}
}
_ => {}
}
_ => {}
}

let mut body = Vec::new();
envelope.to_writer(&mut body).unwrap();
let mut body = Cursor::new(body);

let mut retry_after = None;
let mut sentry_header = None;
let mut headers = curl::easy::List::new();
headers.append(&format!("X-Sentry-Auth: {auth}")).unwrap();
headers.append("Expect:").unwrap();
handle.http_headers(headers).unwrap();
handle.upload(true).unwrap();
handle.in_filesize(body.get_ref().len() as u64).unwrap();
handle
.read_function(move |buf| Ok(body.read(buf).unwrap_or(0)))
.unwrap();
handle.verbose(true).unwrap();
handle
.debug_function(move |info, data| {
let prefix = match info {
curl::easy::InfoType::HeaderIn => "< ",
curl::easy::InfoType::HeaderOut => "> ",
curl::easy::InfoType::DataOut => "",
_ => return,
};
sentry_debug!("curl: {}{}", prefix, String::from_utf8_lossy(data).trim());
})
.unwrap();

{
let mut handle = handle.transfer();
let retry_after_setter = &mut retry_after;
let sentry_header_setter = &mut sentry_header;

let mut body = Vec::new();
envelope.to_writer(&mut body).unwrap();
let mut body = Cursor::new(body);

let mut retry_after = None;
let mut sentry_header = None;
let mut headers = curl::easy::List::new();
headers.append(&format!("X-Sentry-Auth: {auth}")).unwrap();
headers.append("Expect:").unwrap();
handle.http_headers(headers).unwrap();
handle.upload(true).unwrap();
handle.in_filesize(body.get_ref().len() as u64).unwrap();
handle
.read_function(move |buf| Ok(body.read(buf).unwrap_or(0)))
.unwrap();
handle.verbose(true).unwrap();
handle
.header_function(move |data| {
if let Ok(data) = std::str::from_utf8(data) {
let mut iter = data.split(':');
if let Some(key) = iter.next().map(str::to_lowercase) {
if key == "retry-after" {
*retry_after_setter = iter.next().map(|x| x.trim().to_string());
} else if key == "x-sentry-rate-limits" {
*sentry_header_setter =
iter.next().map(|x| x.trim().to_string());
.debug_function(move |info, data| {
let prefix = match info {
curl::easy::InfoType::HeaderIn => "< ",
curl::easy::InfoType::HeaderOut => "> ",
curl::easy::InfoType::DataOut => "",
_ => return,
};
sentry_debug!("curl: {}{}", prefix, String::from_utf8_lossy(data).trim());
})
.unwrap();

{
let mut handle = handle.transfer();
let retry_after_setter = &mut retry_after;
let sentry_header_setter = &mut sentry_header;
handle
.header_function(move |data| {
if let Ok(data) = std::str::from_utf8(data) {
let mut iter = data.split(':');
if let Some(key) = iter.next().map(str::to_lowercase) {
if key == "retry-after" {
*retry_after_setter =
iter.next().map(|x| x.trim().to_string());
} else if key == "x-sentry-rate-limits" {
*sentry_header_setter =
iter.next().map(|x| x.trim().to_string());
}
}
}
true
})
.unwrap();
handle.perform().ok();
}

match handle.response_code() {
Ok(response_code) => {
if let Some(sentry_header) = sentry_header {
rl.update_from_sentry_header(&sentry_header);
} else if let Some(retry_after) = retry_after {
rl.update_from_retry_after(&retry_after);
} else if response_code == 429 {
rl.update_from_429();
}
if response_code == HTTP_PAYLOAD_TOO_LARGE as u32 {
sentry_debug!("{HTTP_PAYLOAD_TOO_LARGE_MESSAGE}");
}
true
})
.unwrap();
handle.perform().ok();
}

match handle.response_code() {
Ok(response_code) => {
if let Some(sentry_header) = sentry_header {
rl.update_from_sentry_header(&sentry_header);
} else if let Some(retry_after) = retry_after {
rl.update_from_retry_after(&retry_after);
} else if response_code == 429 {
rl.update_from_429();
}
if response_code == HTTP_PAYLOAD_TOO_LARGE as u32 {
sentry_debug!("{HTTP_PAYLOAD_TOO_LARGE_MESSAGE}");
Err(err) => {
sentry_debug!("Failed to send envelope: {}", err);
}
}
Err(err) => {
sentry_debug!("Failed to send envelope: {}", err);
}
}
});
},
channel_capacity,
);
Self { thread }
}
}
Expand Down
98 changes: 57 additions & 41 deletions sentry/src/transports/reqwest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,28 @@ pub struct ReqwestHttpTransport {
impl ReqwestHttpTransport {
/// Creates a new Transport.
pub fn new(options: &ClientOptions) -> Self {
Self::new_internal(options, None)
Self::new_internal(options, None, 30)
Comment thread
mvanhorn marked this conversation as resolved.
}

/// Creates a new Transport that uses the specified [`ReqwestClient`].
pub fn with_client(options: &ClientOptions, client: ReqwestClient) -> Self {
Self::new_internal(options, Some(client))
Self::new_internal(options, Some(client), 30)
}

fn new_internal(options: &ClientOptions, client: Option<ReqwestClient>) -> Self {
/// Creates a new Transport with a custom transport channel capacity.
///
/// The channel capacity bounds how many envelopes may be queued before
/// `send_envelope` blocks. A higher capacity reduces the chance of
/// dropped events in high-throughput scenarios at the cost of memory.
pub fn with_channel_capacity(options: &ClientOptions, channel_capacity: usize) -> Self {
Self::new_internal(options, None, channel_capacity)
}

fn new_internal(
options: &ClientOptions,
client: Option<ReqwestClient>,
channel_capacity: usize,
) -> Self {
let client = client.unwrap_or_else(|| {
let mut builder = reqwest::Client::builder();
if options.accept_invalid_certs {
Expand Down Expand Up @@ -64,53 +77,56 @@ impl ReqwestHttpTransport {
let auth = dsn.to_auth(Some(&user_agent)).to_string();
let url = dsn.envelope_api_url().to_string();

let thread = TransportThread::new(move |envelope, mut rl| {
let mut body = Vec::new();
envelope.to_writer(&mut body).unwrap();
let request = client.post(&url).header("X-Sentry-Auth", &auth).body(body);
let thread = TransportThread::with_capacity(
move |envelope, mut rl| {
let mut body = Vec::new();
envelope.to_writer(&mut body).unwrap();
let request = client.post(&url).header("X-Sentry-Auth", &auth).body(body);

// NOTE: because of lifetime issues, building the request using the
// `client` has to happen outside of this async block.
async move {
match request.send().await {
Ok(response) => {
let headers = response.headers();
// NOTE: because of lifetime issues, building the request using the
// `client` has to happen outside of this async block.
async move {
match request.send().await {
Ok(response) => {
let headers = response.headers();

if let Some(sentry_header) = headers
.get("x-sentry-rate-limits")
.and_then(|x| x.to_str().ok())
{
rl.update_from_sentry_header(sentry_header);
} else if let Some(retry_after) = headers
.get(ReqwestHeaders::RETRY_AFTER)
.and_then(|x| x.to_str().ok())
{
rl.update_from_retry_after(retry_after);
} else if response.status() == StatusCode::TOO_MANY_REQUESTS {
rl.update_from_429();
}
if let Some(sentry_header) = headers
.get("x-sentry-rate-limits")
.and_then(|x| x.to_str().ok())
{
rl.update_from_sentry_header(sentry_header);
} else if let Some(retry_after) = headers
.get(ReqwestHeaders::RETRY_AFTER)
.and_then(|x| x.to_str().ok())
{
rl.update_from_retry_after(retry_after);
} else if response.status() == StatusCode::TOO_MANY_REQUESTS {
rl.update_from_429();
}

let is_payload_too_large =
response.status().as_u16() == HTTP_PAYLOAD_TOO_LARGE;
match response.text().await {
Err(err) => {
sentry_debug!("Failed to read sentry response: {}", err);
let is_payload_too_large =
response.status().as_u16() == HTTP_PAYLOAD_TOO_LARGE;
match response.text().await {
Err(err) => {
sentry_debug!("Failed to read sentry response: {}", err);
}
Ok(text) => {
sentry_debug!("Get response: `{}`", text);
}
}
Ok(text) => {
sentry_debug!("Get response: `{}`", text);
if is_payload_too_large {
sentry_debug!("{HTTP_PAYLOAD_TOO_LARGE_MESSAGE}");
}
}
if is_payload_too_large {
sentry_debug!("{HTTP_PAYLOAD_TOO_LARGE_MESSAGE}");
Err(err) => {
sentry_debug!("Failed to send envelope: {}", err);
}
}
Err(err) => {
sentry_debug!("Failed to send envelope: {}", err);
}
rl
}
rl
}
});
},
channel_capacity,
);
Self { thread }
}
}
Expand Down
19 changes: 16 additions & 3 deletions sentry/src/transports/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,25 @@ pub struct TransportThread {
}

impl TransportThread {
/// Spawn a new background thread.
pub fn new<SendFn>(mut send: SendFn) -> Self
/// Spawn a new background thread with the default channel capacity of 30.
pub fn new<SendFn>(send: SendFn) -> Self
where
SendFn: FnMut(Envelope, &mut RateLimiter) + Send + 'static,
{
let (sender, receiver) = sync_channel(30);
Self::with_capacity(send, 30)
}

/// Spawn a new background thread with a custom channel capacity.
///
/// The channel capacity bounds how many envelopes may be queued before
/// `send` blocks. `channel_capacity` is clamped to a minimum of 1 to
/// avoid a rendezvous channel, which would silently drop envelopes under
/// `try_send`.
pub fn with_capacity<SendFn>(mut send: SendFn, channel_capacity: usize) -> Self
where
SendFn: FnMut(Envelope, &mut RateLimiter) + Send + 'static,
{
let (sender, receiver) = sync_channel(channel_capacity.max(1));
let shutdown = Arc::new(AtomicBool::new(false));
let shutdown_worker = shutdown.clone();
let handle = thread::Builder::new()
Expand Down
Loading