Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,20 @@ atomic_immut = "0.1"
bytecodec = "0.4"
cannyls = "0.10"
factory = "0.1"
fibers = "0.1"
fibers_rpc = "0.3"
futures = "0.1"
futures = "0.3"
pin-project = "1.0.1"
protobuf_codec = "0.2"
slog = "2"
tokio = { version = "1.0", features = ["sync"] }
trackable = "0.2"

[dev-dependencies]
tokio = { version = "1.0", features = ["rt"] }
tempdir = "0.3"

[patch.crates-io]
fibers = { git = "https://github.com/dwango/fibers-rs", branch = "feature/tokio-1.0" }
fibers_rpc = { git = "https://github.com/koba-e964/fibers_rpc", branch = "feature/tokio-1.0" }
cannyls = { git = "https://github.com/frugalos/cannyls", branch = "feature/tokio-1.0" }
trackable = { git = "https://github.com/koba-e964/trackable", branch = "feature/trackable-for-poll" }
43 changes: 22 additions & 21 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use cannyls::deadline::Deadline;
use cannyls::lump::{LumpData, LumpHeader, LumpId};
use cannyls::storage::StorageUsage;
use cannyls::{Error, ErrorKind, Result};
use cannyls::{ErrorKind, Result};
use fibers_rpc::{self, Call};
use futures::{Async, Future, Poll};
use futures::future::FutureExt;
use futures::Future;
use std::net::SocketAddr;
use std::ops::Range;
use std::pin::Pin;
use std::task::{Context, Poll};
use trackable::error::ErrorKindExt;

use crate::device::DeviceId;
Expand Down Expand Up @@ -92,13 +95,13 @@ impl<'a> RequestBuilder<'a> {
&self,
device_id: DeviceId,
lump_id: LumpId,
) -> impl Future<Item = Option<Vec<u8>>, Error = Error> {
) -> impl Future<Output = Result<Option<Vec<u8>>>> {
let mut client = rpc::GetLumpRpc::client(&self.client.rpc_service);
*client.options_mut() = self.rpc_options.clone();

let request = self.lump_request(device_id, lump_id);
let future = Response::new(self.client.server, client.call(self.client.server, request));
future.map(|data| data.map(|d| d.into_bytes()))
future.map(|result| track!(result.map(|data| data.map(|d| d.into_bytes()))))
}

/// Lumpヘッダ(要約情報)の取得を行う.
Expand All @@ -114,7 +117,7 @@ impl<'a> RequestBuilder<'a> {
&self,
device_id: DeviceId,
lump_id: LumpId,
) -> impl Future<Item = Option<LumpHeader>, Error = Error> {
) -> impl Future<Output = Result<Option<LumpHeader>>> {
let mut client = rpc::HeadLumpRpc::client(&self.client.rpc_service);
*client.options_mut() = self.rpc_options.clone();

Expand Down Expand Up @@ -142,7 +145,7 @@ impl<'a> RequestBuilder<'a> {
device_id: DeviceId,
lump_id: LumpId,
lump_data: LumpData,
) -> impl Future<Item = bool, Error = Error> {
) -> impl Future<Output = Result<bool>> {
let mut client = rpc::PutLumpRpc::client(&self.client.rpc_service);
*client.options_mut() = self.rpc_options.clone();

Expand All @@ -169,7 +172,7 @@ impl<'a> RequestBuilder<'a> {
&self,
device_id: DeviceId,
lump_id: LumpId,
) -> impl Future<Item = bool, Error = Error> {
) -> impl Future<Output = Result<bool>> {
let mut client = rpc::DeleteLumpRpc::client(&self.client.rpc_service);
*client.options_mut() = self.rpc_options.clone();

Expand All @@ -184,10 +187,7 @@ impl<'a> RequestBuilder<'a> {
/// 例えば、以下のようなエラーが返されることがある:
/// - 指定されたデバイスが存在しない場合には`ErrorKind::InvalidInput`
/// - 指定されたデバイスが現在利用不可能な場合には`ErrorKind::DeviceBusy`
pub fn list_lumps(
&self,
device_id: DeviceId,
) -> impl Future<Item = Vec<LumpId>, Error = Error> {
pub fn list_lumps(&self, device_id: DeviceId) -> impl Future<Output = Result<Vec<LumpId>>> {
let mut client = rpc::ListLumpRpc::client(&self.client.rpc_service);
*client.options_mut() = self.rpc_options.clone();

Expand All @@ -209,7 +209,7 @@ impl<'a> RequestBuilder<'a> {
&self,
device_id: DeviceId,
range: Range<LumpId>,
) -> impl Future<Item = StorageUsage, Error = Error> {
) -> impl Future<Output = Result<StorageUsage>> {
let mut client = rpc::UsageRangeRpc::client(&self.client.rpc_service);
*client.options_mut() = self.rpc_options.clone();

Expand All @@ -232,7 +232,7 @@ impl<'a> RequestBuilder<'a> {
&self,
device_id: DeviceId,
range: Range<LumpId>,
) -> impl Future<Item = Vec<LumpId>, Error = Error> {
) -> impl Future<Output = Result<Vec<LumpId>>> {
let mut client = rpc::DeleteRangeRpc::client(&self.client.rpc_service);
*client.options_mut() = self.rpc_options.clone();

Expand Down Expand Up @@ -271,9 +271,11 @@ impl<'a> RequestBuilder<'a> {
}
}

#[pin_project]
#[derive(Debug)]
struct Response<T> {
server: SocketAddr,
#[pin]
inner: fibers_rpc::client::Response<Result<T>>,
}
impl<T> Response<T> {
Expand All @@ -282,11 +284,11 @@ impl<T> Response<T> {
}
}
impl<T> Future for Response<T> {
type Item = T;
type Error = Error;
type Output = Result<T>;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.inner.poll() {
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let server = self.server.clone();
track!(self.project().inner.poll(cx)).map(|result| match result {
Err(e) => {
let original_kind = *e.kind();
let kind = match original_kind {
Expand All @@ -295,10 +297,9 @@ impl<T> Future for Response<T> {
| fibers_rpc::ErrorKind::Unavailable
| fibers_rpc::ErrorKind::Other => ErrorKind::Other,
};
Err(track!(kind.takes_over(e); original_kind, self.server).into())
Err(track!(kind.takes_over(e); original_kind, server).into())
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(result)) => track!(result.map(Async::Ready); self.server),
}
Ok(result) => track!(result; server),
})
}
}
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ extern crate atomic_immut;
extern crate bytecodec;
extern crate cannyls;
extern crate factory;
extern crate fibers;
extern crate fibers_rpc;
extern crate futures;
#[macro_use]
extern crate pin_project;
extern crate protobuf_codec;
#[macro_use]
extern crate slog;
Expand Down
45 changes: 23 additions & 22 deletions src/registry.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use atomic_immut::AtomicImmut;
use cannyls::deadline::Deadline;
use cannyls::device::{Device, DeviceHandle};
use cannyls::{Error, ErrorKind, Result};
use fibers::sync::mpsc;
use futures::{Async, Future, Poll, Stream};
use cannyls::{ErrorKind, Result};
use futures::{Future, FutureExt};
use slog::Logger;
use std::borrow::Borrow;
use std::collections::HashMap;
use std::fmt::Debug;
use std::hash::Hash;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use tokio::sync::mpsc;
use trackable::error::ErrorKindExt;

use crate::device::DeviceId;
Expand All @@ -35,16 +37,16 @@ pub struct DeviceRegistry {
device_handles: DeviceHandles,

// レジストリに対するコマンド送受信チャンネル.
command_tx: mpsc::Sender<Command>,
command_rx: mpsc::Receiver<Command>,
command_tx: mpsc::UnboundedSender<Command>,
command_rx: mpsc::UnboundedReceiver<Command>,

// レジストリが停止中かどうかを示すためのフラグ.
being_stopped: bool,
}
impl DeviceRegistry {
/// 新しいレジストリインスタンスを生成する.
pub fn new(logger: Logger) -> Self {
let (command_tx, command_rx) = mpsc::channel();
let (command_tx, command_rx) = mpsc::unbounded_channel();
DeviceRegistry {
logger,
devices: HashMap::new(),
Expand Down Expand Up @@ -72,7 +74,7 @@ impl DeviceRegistry {
/// 1. 以後は、レジストリに対するデバイス登録要求、は全て無視される
/// 2. 全登録デバイスに停止命令が発行される
/// 3. 全デバイスが停止したら、レジストリ自体を停止する
/// - i.e., `Future:poll`の結果が`Ok(Async::Ready(()))`となる
/// - i.e., `Future:poll`の結果が`Poll::Ready(Ok(()))`となる
pub fn stop(&mut self) {
info!(self.logger, "Starts being_stopped all devices");
for (id, state) in &mut self.devices {
Expand Down Expand Up @@ -159,35 +161,34 @@ impl DeviceRegistry {
}
}
impl Future for DeviceRegistry {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
while let Async::Ready(command) = self.command_rx.poll().expect("Never fails") {
let command = command.expect("DeviceRegistryが`command_tx`を保持しているので、このストリームが終端することはない");
type Output = Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
while let Poll::Ready(Some(command)) = self.command_rx.poll_recv(cx) {
self.handle_command(command);
}

let logger = self.logger.clone();
for (id, state) in &mut self.devices {
if state.terminated {
continue;
}
match track!(state.device.poll()) {
Err(e) => {
error!(self.logger, "Device {:?} terminated abnormally: {}", id, e);
match track!(state.device.poll_unpin(cx)) {
Poll::Ready(Err(e)) => {
error!(logger, "Device {:?} terminated abnormally: {}", id, e);
state.terminated = true;
}
Ok(Async::Ready(())) => {
info!(self.logger, "Device {:?} terminated normally", id);
Poll::Ready(Ok(())) => {
info!(logger, "Device {:?} terminated normally", id);
state.terminated = true;
}
Ok(Async::NotReady) => {}
Poll::Pending => {}
}
}
if self.being_stopped && self.devices.values().all(|d| d.terminated) {
info!(self.logger, "All devices have stopped");
Ok(Async::Ready(()))
info!(logger, "All devices have stopped");
Poll::Ready(Ok(()))
} else {
Ok(Async::NotReady)
Poll::Pending
}
}
}
Expand All @@ -200,7 +201,7 @@ impl Drop for DeviceRegistry {
/// デバイスレジストリを操作するためのハンドル.
#[derive(Debug, Clone)]
pub struct DeviceRegistryHandle {
command_tx: mpsc::Sender<Command>,
command_tx: mpsc::UnboundedSender<Command>,
device_handles: DeviceHandles,
}
impl DeviceRegistryHandle {
Expand Down
28 changes: 7 additions & 21 deletions src/server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use fibers_rpc::server::{HandleCall, Reply, ServerBuilder};
use futures::Future;

use crate::protobuf::PutLumpRequestDecoderFactory;
use crate::registry::DeviceRegistryHandle;
Expand Down Expand Up @@ -46,14 +45,14 @@ impl Server {
impl HandleCall<rpc::GetLumpRpc> for Server {
fn handle_call(&self, request: rpc::LumpRequest) -> Reply<rpc::GetLumpRpc> {
let device = rpc_try!(self.registry.get_device(&request.device_id));
let future = request.options.with(&device).get(request.lump_id).then(Ok);
let future = request.options.with(&device).get(request.lump_id);
Reply::future(future)
}
}
impl HandleCall<rpc::HeadLumpRpc> for Server {
fn handle_call(&self, request: rpc::LumpRequest) -> Reply<rpc::HeadLumpRpc> {
let device = rpc_try!(self.registry.get_device(&request.device_id));
let future = request.options.with(&device).head(request.lump_id).then(Ok);
let future = request.options.with(&device).head(request.lump_id);
Reply::future(future)
}
}
Expand All @@ -64,48 +63,35 @@ impl HandleCall<rpc::PutLumpRpc> for Server {
let future = request
.options
.with(&device)
.put(request.lump_id, lump_data)
.then(Ok);
.put(request.lump_id, lump_data);
Reply::future(future)
}
}
impl HandleCall<rpc::DeleteLumpRpc> for Server {
fn handle_call(&self, request: rpc::LumpRequest) -> Reply<rpc::DeleteLumpRpc> {
let device = rpc_try!(self.registry.get_device(&request.device_id));
let future = request
.options
.with(&device)
.delete(request.lump_id)
.then(Ok);
let future = request.options.with(&device).delete(request.lump_id);
Reply::future(future)
}
}
impl HandleCall<rpc::ListLumpRpc> for Server {
fn handle_call(&self, request: rpc::DeviceRequest) -> Reply<rpc::ListLumpRpc> {
let device = rpc_try!(self.registry.get_device(&request.device_id));
let future = request.options.with(&device).list().then(Ok);
let future = request.options.with(&device).list();
Reply::future(future)
}
}
impl HandleCall<rpc::UsageRangeRpc> for Server {
fn handle_call(&self, request: rpc::UsageRangeRequest) -> Reply<rpc::UsageRangeRpc> {
let device = rpc_try!(self.registry.get_device(&request.device_id));
let future = request
.options
.with(&device)
.usage_range(request.range)
.then(Ok);
let future = request.options.with(&device).usage_range(request.range);
Reply::future(future)
}
}
impl HandleCall<rpc::DeleteRangeRpc> for Server {
fn handle_call(&self, request: rpc::RangeLumpRequest) -> Reply<rpc::DeleteRangeRpc> {
let device = rpc_try!(self.registry.get_device(&request.device_id));
let future = request
.options
.with(&device)
.delete_range(request.range)
.then(Ok);
let future = request.options.with(&device).delete_range(request.range);
Reply::future(future)
}
}
Loading