diff --git a/python/natsrpy/__init__.py b/python/natsrpy/__init__.py index 91e6406..624ff88 100644 --- a/python/natsrpy/__init__.py +++ b/python/natsrpy/__init__.py @@ -1,4 +1,4 @@ -from natsrpy._natsrpy_rs import Message, Nats, Subscription +from ._natsrpy_rs import Message, Nats, Subscription __all__ = [ "Message", diff --git a/python/natsrpy/_natsrpy_rs/js/consumers.pyi b/python/natsrpy/_natsrpy_rs/js/consumers.pyi index 8bdbdcc..2a96d23 100644 --- a/python/natsrpy/_natsrpy_rs/js/consumers.pyi +++ b/python/natsrpy/_natsrpy_rs/js/consumers.pyi @@ -26,8 +26,8 @@ class PriorityPolicy: PRIORITIZED: PriorityPolicy class PullConsumerConfig: - durable_name: str | None name: str | None + durable_name: str | None description: str | None deliver_policy: DeliverPolicy delivery_start_sequence: int | None @@ -57,8 +57,8 @@ class PullConsumerConfig: def __init__( self, - durable_name: str | None = None, name: str | None = None, + durable_name: str | None = None, description: str | None = None, deliver_policy: DeliverPolicy | None = None, delivery_start_sequence: int | None = None, @@ -89,8 +89,8 @@ class PullConsumerConfig: class PushConsumerConfig: deliver_subject: str - durable_name: str | None name: str | None + durable_name: str | None description: str | None deliver_group: str | None deliver_policy: DeliverPolicy @@ -119,8 +119,8 @@ class PushConsumerConfig: def __init__( self, deliver_subject: str, - durable_name: str | None = None, name: str | None = None, + durable_name: str | None = None, description: str | None = None, deliver_group: str | None = None, deliver_policy: DeliverPolicy | None = None, diff --git a/python/natsrpy/_natsrpy_rs/js/managers.pyi b/python/natsrpy/_natsrpy_rs/js/managers.pyi index 5da191a..07a595b 100644 --- a/python/natsrpy/_natsrpy_rs/js/managers.pyi +++ b/python/natsrpy/_natsrpy_rs/js/managers.pyi @@ -1,3 +1,4 @@ +from datetime import timedelta from typing import overload from .consumers import ( @@ -29,6 +30,15 @@ class ConsumersManager: async def create(self, config: PullConsumerConfig) -> PullConsumer: ... @overload async def create(self, config: PushConsumerConfig) -> PushConsumer: ... + @overload + async def update(self, config: PullConsumerConfig) -> PullConsumer: ... + @overload + async def update(self, config: PushConsumerConfig) -> PushConsumer: ... + async def get_pull(self, name: str) -> PullConsumer: ... + async def get_push(self, name: str) -> PushConsumer: ... + async def delete(self, name: str) -> bool: ... + async def pause(self, name: str, delay: float | timedelta) -> bool: ... + async def resume(self, name: str) -> bool: ... class ObjectStoreManager: async def create(self, config: ObjectStoreConfig) -> ObjectStore: ... diff --git a/python/natsrpy/js/__init__.py b/python/natsrpy/js.py similarity index 80% rename from python/natsrpy/js/__init__.py rename to python/natsrpy/js.py index 271ea54..ddb89b4 100644 --- a/python/natsrpy/js/__init__.py +++ b/python/natsrpy/js.py @@ -1,5 +1,5 @@ -from natsrpy._natsrpy_rs.js import JetStream -from natsrpy.js.consumers import ( +from ._natsrpy_rs.js import JetStream +from ._natsrpy_rs.js.consumers import ( AckPolicy, DeliverPolicy, PriorityPolicy, @@ -9,9 +9,9 @@ PushConsumerConfig, ReplayPolicy, ) -from natsrpy.js.kv import KeyValue, KVConfig -from natsrpy.js.object_store import ObjectStore, ObjectStoreConfig -from natsrpy.js.stream import ( +from ._natsrpy_rs.js.kv import KeyValue, KVConfig +from ._natsrpy_rs.js.object_store import ObjectStore, ObjectStoreConfig +from ._natsrpy_rs.js.stream import ( Compression, ConsumerLimits, DiscardPolicy, diff --git a/python/natsrpy/js/consumers.py b/python/natsrpy/js/consumers.py deleted file mode 100644 index e90e8f5..0000000 --- a/python/natsrpy/js/consumers.py +++ /dev/null @@ -1,21 +0,0 @@ -from natsrpy._natsrpy_rs.js.consumers import ( - AckPolicy, - DeliverPolicy, - PriorityPolicy, - PullConsumer, - PullConsumerConfig, - PushConsumer, - PushConsumerConfig, - ReplayPolicy, -) - -__all__ = [ - "AckPolicy", - "DeliverPolicy", - "PriorityPolicy", - "PullConsumer", - "PullConsumerConfig", - "PushConsumer", - "PushConsumerConfig", - "ReplayPolicy", -] diff --git a/python/natsrpy/js/kv.py b/python/natsrpy/js/kv.py deleted file mode 100644 index 6eee56e..0000000 --- a/python/natsrpy/js/kv.py +++ /dev/null @@ -1,6 +0,0 @@ -from natsrpy._natsrpy_rs.js.kv import KeyValue, KVConfig - -__all__ = [ - "KVConfig", - "KeyValue", -] diff --git a/python/natsrpy/js/object_store.py b/python/natsrpy/js/object_store.py deleted file mode 100644 index 0c568ad..0000000 --- a/python/natsrpy/js/object_store.py +++ /dev/null @@ -1,6 +0,0 @@ -from natsrpy._natsrpy_rs.js.object_store import ObjectStore, ObjectStoreConfig - -__all__ = [ - "ObjectStore", - "ObjectStoreConfig", -] diff --git a/python/natsrpy/js/stream.py b/python/natsrpy/js/stream.py deleted file mode 100644 index edf828c..0000000 --- a/python/natsrpy/js/stream.py +++ /dev/null @@ -1,50 +0,0 @@ -from natsrpy._natsrpy_rs.js.stream import ( - ClusterInfo, - Compression, - ConsumerLimits, - DiscardPolicy, - External, - PeerInfo, - PersistenceMode, - Placement, - Republish, - RetentionPolicy, - Source, - SourceInfo, - StorageType, - Stream, - StreamConfig, - StreamInfo, - StreamMessage, - StreamState, - SubjectTransform, -) - -__all__ = [ - "ClusterInfo", - "Compression", - "Compression", - "ConsumerLimits", - "ConsumerLimits", - "DiscardPolicy", - "DiscardPolicy", - "External", - "PeerInfo", - "PersistenceMode", - "PersistenceMode", - "Placement", - "Republish", - "RetentionPolicy", - "RetentionPolicy", - "Source", - "SourceInfo", - "StorageType", - "Stream", - "Stream", - "StreamConfig", - "StreamConfig", - "StreamInfo", - "StreamMessage", - "StreamState", - "SubjectTransform", -] diff --git a/src/exceptions/rust_err.rs b/src/exceptions/rust_err.rs index 02ca844..e005a29 100644 --- a/src/exceptions/rust_err.rs +++ b/src/exceptions/rust_err.rs @@ -67,7 +67,7 @@ pub enum NatsrpyError { #[error(transparent)] PullMessageError(#[from] async_nats::jetstream::consumer::pull::MessagesError), #[error(transparent)] - PullConsumerError(#[from] async_nats::jetstream::stream::ConsumerError), + ConsumerError(#[from] async_nats::jetstream::stream::ConsumerError), #[error(transparent)] PullConsumerBatchError(#[from] async_nats::jetstream::consumer::pull::BatchError), #[error(transparent)] @@ -75,6 +75,8 @@ pub enum NatsrpyError { #[error(transparent)] ConsumerStreamError(#[from] async_nats::jetstream::consumer::StreamError), #[error(transparent)] + ConsumerUpdateError(#[from] async_nats::jetstream::stream::ConsumerUpdateError), + #[error(transparent)] ObjectStoreError(#[from] async_nats::jetstream::context::ObjectStoreError), #[error(transparent)] ObjectStoreGetError(#[from] async_nats::jetstream::object_store::GetError), diff --git a/src/js/consumers/pull/config.rs b/src/js/consumers/pull/config.rs index a963433..fc8584a 100644 --- a/src/js/consumers/pull/config.rs +++ b/src/js/consumers/pull/config.rs @@ -8,8 +8,8 @@ use crate::{ #[pyo3::pyclass(from_py_object, get_all, set_all)] #[derive(Clone, Debug, Default)] pub struct PullConsumerConfig { - pub durable_name: Option, pub name: Option, + pub durable_name: Option, pub description: Option, pub deliver_policy: DeliverPolicy, pub delivery_start_sequence: Option, @@ -42,8 +42,8 @@ pub struct PullConsumerConfig { impl PullConsumerConfig { #[new] #[pyo3(signature=( - durable_name=None, name=None, + durable_name=None, description=None, deliver_policy=None, delivery_start_sequence=None, @@ -73,8 +73,8 @@ impl PullConsumerConfig { ))] #[must_use] pub fn __new__( - durable_name: Option, name: Option, + durable_name: Option, description: Option, deliver_policy: Option, delivery_start_sequence: Option, @@ -103,8 +103,8 @@ impl PullConsumerConfig { pause_until: Option, ) -> Self { let mut conf = Self { - durable_name, name, + durable_name, description, delivery_start_sequence, delivery_start_time, diff --git a/src/js/consumers/push/config.rs b/src/js/consumers/push/config.rs index 71cfc46..7b4df9f 100644 --- a/src/js/consumers/push/config.rs +++ b/src/js/consumers/push/config.rs @@ -9,8 +9,8 @@ use crate::{ #[derive(Clone, Debug)] pub struct PushConsumerConfig { pub deliver_subject: String, - pub durable_name: Option, pub name: Option, + pub durable_name: Option, pub description: Option, pub deliver_group: Option, pub deliver_policy: DeliverPolicy, @@ -42,8 +42,8 @@ impl PushConsumerConfig { #[new] #[pyo3(signature=( deliver_subject, - durable_name=None, name=None, + durable_name=None, description=None, deliver_group=None, deliver_policy=None, @@ -73,8 +73,8 @@ impl PushConsumerConfig { #[must_use] pub fn __new__( deliver_subject: String, - durable_name: Option, name: Option, + durable_name: Option, description: Option, deliver_group: Option, deliver_policy: Option, diff --git a/src/js/managers/consumers.rs b/src/js/managers/consumers.rs index 12e346a..2ec1697 100644 --- a/src/js/managers/consumers.rs +++ b/src/js/managers/consumers.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use pyo3::{Bound, FromPyObject, IntoPyObjectExt, PyAny, Python}; use tokio::sync::RwLock; @@ -6,7 +6,7 @@ use tokio::sync::RwLock; use crate::{ exceptions::rust_err::{NatsrpyError, NatsrpyResult}, js::consumers::{self, pull::PullConsumer, push::PushConsumer}, - utils::natsrpy_future, + utils::{natsrpy_future, py_types::TimeValue}, }; #[pyo3::pyclass] @@ -78,7 +78,31 @@ impl ConsumersManager { }) } - pub fn get<'py>(&self, py: Python<'py>, name: String) -> NatsrpyResult> { + pub fn update<'py>( + &self, + py: Python<'py>, + config: ConsumerConfigs, + ) -> NatsrpyResult> { + let ctx = self.stream.clone(); + natsrpy_future(py, async move { + match config { + ConsumerConfigs::Pull(config) => { + let consumer = PullConsumer::new( + ctx.read().await.update_consumer(config.try_into()?).await?, + ); + Ok(Python::attach(|gil| consumer.into_py_any(gil))?) + } + ConsumerConfigs::Push(config) => { + let consumer = PushConsumer::new( + ctx.read().await.update_consumer(config.try_into()?).await?, + ); + Ok(Python::attach(|gil| consumer.into_py_any(gil))?) + } + } + }) + } + + pub fn get_pull<'py>(&self, py: Python<'py>, name: String) -> NatsrpyResult> { let ctx = self.stream.clone(); natsrpy_future(py, async move { Ok(consumers::pull::consumer::PullConsumer::new( @@ -86,4 +110,33 @@ impl ConsumersManager { )) }) } + + pub fn get_push<'py>(&self, py: Python<'py>, name: String) -> NatsrpyResult> { + let ctx = self.stream.clone(); + natsrpy_future(py, async move { + Ok(consumers::push::consumer::PushConsumer::new( + ctx.read().await.get_consumer(&name).await?, + )) + }) + } + + pub fn pause<'py>( + &self, + py: Python<'py>, + name: String, + delay: TimeValue, + ) -> NatsrpyResult> { + let ctx = self.stream.clone(); + let untill = time::OffsetDateTime::now_utc() + Duration::from(delay); + natsrpy_future(py, async move { + Ok(ctx.read().await.pause_consumer(&name, untill).await?.paused) + }) + } + + pub fn resume<'py>(&self, py: Python<'py>, name: String) -> NatsrpyResult> { + let ctx = self.stream.clone(); + natsrpy_future(py, async move { + Ok(ctx.read().await.resume_consumer(&name).await?.paused) + }) + } }