Class Session
-
- All Implemented Interfaces:
-
java.lang.AutoCloseable
public final class Session implements AutoCloseable
A Zenoh Session, the core interaction point with a Zenoh network.
A session is typically associated with declarations such as Publishers, Subscribers, or Queryables, which are declared using declarePublisher, declareSubscriber, and declareQueryable, respectively. Other operations such as simple Put, Get or Delete can be performed from a session using put, get and delete.
Sessions are open upon creation and can be closed manually by calling close. Alternatively, the session will be automatically closed when used with Java's try-with-resources statement or its Kotlin counterpart, use.
For optimal performance and adherence to good practices, it is recommended to have only one running session, which is sufficient for most use cases. You should never construct one session per publisher/subscriber, as this will significantly increase the size of your Zenoh network, while preventing potential locality-based optimizations.
-
-
Method Summary
Modifier and Type Method Description Unit
close()
Close the session. final Publisher
declarePublisher(KeyExpr keyExpr, PublisherOptions publisherOptions)
Declare a Publisher on the session. final HandlerSubscriber<BlockingQueue<Optional<Sample>>>
declareSubscriber(KeyExpr keyExpr)
Declare a Subscriber on the session. final <R extends Any> HandlerSubscriber<R>
declareSubscriber(KeyExpr keyExpr, Handler<Sample, R> handler)
Declare a Subscriber on the session using a handler. final CallbackSubscriber
declareSubscriber(KeyExpr keyExpr, Callback<Sample> callback)
Declare a Subscriber on the session using a callback. final HandlerQueryable<BlockingQueue<Optional<Query>>>
declareQueryable(KeyExpr keyExpr, QueryableOptions options)
Declare a Queryable on the session. final <R extends Any> HandlerQueryable<R>
declareQueryable(KeyExpr keyExpr, Handler<Query, R> handler, QueryableOptions options)
Declare a Queryable on the session. final CallbackQueryable
declareQueryable(KeyExpr keyExpr, Callback<Query> callback, QueryableOptions options)
Declare a Queryable on the session. final Querier
declareQuerier(KeyExpr keyExpr, QuerierOptions options)
Declare a Querier. final KeyExpr
declareKeyExpr(String keyExpr)
Declare a KeyExpr. final Unit
undeclare(KeyExpr keyExpr)
Undeclare a KeyExpr. final BlockingQueue<Optional<Reply>>
get(IntoSelector selector, GetOptions options)
Perform a get query handling the replies through a BlockingQueue. final <R extends Any> R
get(IntoSelector selector, Handler<Reply, R> handler, GetOptions options)
Perform a get query handling the replies through a Handler. final Unit
get(IntoSelector selector, Callback<Reply> callback, GetOptions options)
Perform a get query, handling the replies with a Callback. final Unit
put(KeyExpr keyExpr, IntoZBytes payload, PutOptions options)
Perform a put with the provided payload to the specified keyExpr. final Unit
put(KeyExpr keyExpr, String payload, PutOptions options)
Perform a put with the provided payload to the specified keyExpr. final Unit
delete(KeyExpr keyExpr, DeleteOptions options)
Perform a delete operation to the specified keyExpr. final Boolean
isClosed()
Returns if session is open or has been closed. final SessionInfo
info()
Returns the SessionInfo of this session. final Liveliness
liveliness()
Obtain a Liveliness instance tied to this Zenoh session. -
-
Method Detail
-
close
Unit close()
Close the session.
Closing the session invalidates any attempt to perform a declaration or to perform an operation such as Put or Delete. Attempting to do so will result in a failure.
However, any session declaration that was still alive and bound to the session previous to closing it, will still be alive.
-
declarePublisher
final Publisher declarePublisher(KeyExpr keyExpr, PublisherOptions publisherOptions)
Declare a Publisher on the session.
Example:
try (Session session = Zenoh.open(config)) { // A publisher config can optionally be provided. PublisherOptions publisherOptions = new PublisherOptions(); publisherOptions.setEncoding(Encoding.ZENOH_STRING); publisherOptions.setCongestionControl(CongestionControl.BLOCK); publisherOptions.setReliability(Reliability.RELIABLE); // Declare the publisher Publisher publisher = session.declarePublisher(keyExpr, publisherOptions); int idx = 0; while (true) { Thread.sleep(1000); String payload = String.format("[%4d] %s", idx, value); System.out.println("Putting Data ('" + keyExpr + "': '" + payload + "')..."); publisher.put(payload); idx++; } }
- Parameters:
keyExpr
- The KeyExpr the publisher will be associated to.publisherOptions
- Optional PublisherOptions to configure the publisher.- Returns:
The declared Publisher.
-
declareSubscriber
final HandlerSubscriber<BlockingQueue<Optional<Sample>>> declareSubscriber(KeyExpr keyExpr)
Declare a Subscriber on the session.
Example with blocking queue (default receiver):
try (Session session = Zenoh.open(config)) { try (HandlerSubscriber<BlockingQueue<Optional<Sample>>> subscriber = session.declareSubscriber(keyExpr)) { BlockingQueue<Optional<Sample>> receiver = subscriber.getReceiver(); assert receiver != null; while (true) { Optional<Sample> wrapper = receiver.take(); if (wrapper.isEmpty()) { break; } System.out.println(wrapper.get()); handleSample(wrapper.get()); } } }
- Parameters:
keyExpr
- The KeyExpr the subscriber will be associated to.- Returns:
HandlerSubscriber with a BlockingQueue as a receiver.
-
declareSubscriber
final <R extends Any> HandlerSubscriber<R> declareSubscriber(KeyExpr keyExpr, Handler<Sample, R> handler)
Declare a Subscriber on the session using a handler.
Example with a custom handler:
// Example handler that stores the received samples into a queue. class QueueHandler implements Handler<Sample, ArrayDeque<Sample>> { final ArrayDeque<Sample> queue = new ArrayDeque<>(); @Override public void handle(Sample t) { queue.add(t); } @Override public ArrayDeque<Sample> receiver() { return queue; } @Override public void onClose() {} } // ... try (Session session = Zenoh.open(config)) { QueueHandler queueHandler = new QueueHandler(); var subscriber = session.declareSubscriber(keyExpr, queueHandler); // ... }
- Parameters:
keyExpr
- The KeyExpr the subscriber will be associated to.handler
- The Handler to process the incoming Samples received by the subscriber.- Returns:
A HandlerSubscriber with the handler's receiver.
-
declareSubscriber
final CallbackSubscriber declareSubscriber(KeyExpr keyExpr, Callback<Sample> callback)
Declare a Subscriber on the session using a callback.
Example with a callback:
try (Session session = Zenoh.open(config)) { var subscriber = session.declareSubscriber(keyExpr, sample -> System.out.println(sample)); // ... }
-
declareQueryable
final HandlerQueryable<BlockingQueue<Optional<Query>>> declareQueryable(KeyExpr keyExpr, QueryableOptions options)
Declare a Queryable on the session.
Example using a blocking queue (default receiver):
try (Session session = Zenoh.open(config)) { var queryable = session.declareQueryable(keyExpr); var receiver = queryable.getReceiver(); while (true) { Optional<Query> wrapper = receiver.take(); if (wrapper.isEmpty()) { break; } Query query = wrapper.get(); query.reply(query.getKeyExpr(), "Example reply"); } }
- Parameters:
keyExpr
- The KeyExpr the queryable will be associated to.options
- Optional QueryableOptions for configuring the queryable.- Returns:
A HandlerQueryable with a BlockingQueue receiver.
-
declareQueryable
final <R extends Any> HandlerQueryable<R> declareQueryable(KeyExpr keyExpr, Handler<Query, R> handler, QueryableOptions options)
Declare a Queryable on the session.
Example using a custom Handler:
// Example handler that replies with the amount of queries received. class QueryHandler implements Handler<Query, Void> { private Int counter = 0; @Override public void handle(Query query) { var keyExpr = query.getKeyExpr(); query.reply(keyExpr, "Reply #" + counter + "!"); counter++; } @Override public Void receiver() {} @Override public void onClose() {} } // ... try (Session session = Zenoh.open(config)) { var queryable = session.declareQueryable(keyExpr, new QueryHandler()); //... }
- Parameters:
keyExpr
- The KeyExpr the queryable will be associated to.handler
- The Handler to handle the incoming queries.options
- Optional QueryableOptions for configuring the queryable.- Returns:
A HandlerQueryable with the handler's receiver.
-
declareQueryable
final CallbackQueryable declareQueryable(KeyExpr keyExpr, Callback<Query> callback, QueryableOptions options)
Declare a Queryable on the session.
try (Session session = Zenoh.open(config)) { var queryable = session.declareQueryable(keyExpr, query -> query.reply(keyExpr, "Example reply")); //... }
- Parameters:
keyExpr
- The KeyExpr the queryable will be associated to.callback
- The Callback to handle the incoming queries.options
- Optional QueryableOptions for configuring the queryable.- Returns:
-
declareQuerier
final Querier declareQuerier(KeyExpr keyExpr, QuerierOptions options)
Declare a Querier.
A querier allows to send queries to a queryable.
Queriers are automatically undeclared when dropped.
Example:
try (Session session = Zenoh.open(config)) { QuerierOptions options = new QuerierOptions(); options.setTarget(QueryTarget.BEST_MATCHING); Querier querier = session.declareQuerier(selector.getKeyExpr(), options); //... Querier.GetOptions options = new Querier.GetOptions(); options.setPayload("Example payload"); querier.get(reply -> {...}, options); }
- Parameters:
keyExpr
- The KeyExpr for the querier.options
- Optional QuerierOptions to configure the querier.- Returns:
A Querier that will be undeclared on drop.
-
declareKeyExpr
final KeyExpr declareKeyExpr(String keyExpr)
Declare a KeyExpr.
Informs Zenoh that you intend to use the provided Key Expression repeatedly.
It is generally not needed to declare key expressions, as declaring a subscriber, a queryable, or a publisher will also inform Zenoh of your intent to use their key expressions repeatedly.
- Parameters:
keyExpr
- The intended Key expression.- Returns:
The declared KeyExpr.
-
undeclare
final Unit undeclare(KeyExpr keyExpr)
Undeclare a KeyExpr.
The key expression must have been previously declared on the session with declareKeyExpr, otherwise the operation will result in a failure.
- Parameters:
keyExpr
- The key expression to undeclare.- Returns:
A resolvable returning the status of the undeclare operation.
-
get
final BlockingQueue<Optional<Reply>> get(IntoSelector selector, GetOptions options)
Perform a get query handling the replies through a BlockingQueue.
Example using the default blocking queue receiver:
try (Session session = Zenoh.open(config)) { System.out.println("Performing Get on '" + selector + "'..."); BlockingQueue<Optional<Reply>> receiver = session.get(Selector.from("a/b/c")); while (true) { Optional<Reply> wrapper = receiver.take(); if (wrapper.isEmpty()) { break; } Reply reply = wrapper.get(); System.out.println(reply); } }
- Parameters:
selector
- The Selector for the get query.options
- Optional GetOptions to configure the get query.- Returns:
A BlockingQueue with the received replies.
-
get
final <R extends Any> R get(IntoSelector selector, Handler<Reply, R> handler, GetOptions options)
Perform a get query handling the replies through a Handler.
Example using a custom handler:
// Example handler that prints the replies along with a counter: class GetHandler implements Handler<Reply, Void> { private Int counter = 0; @Override public void handle(Reply reply) { System.out.println("Reply #" + counter + ": " + reply); counter++; } @Override public Void receiver() {} @Override public void onClose() {} } //... try (Session session = Zenoh.open(config)) { System.out.println("Performing Get on '" + selector + "'..."); session.get(Selector.from("a/b/c"), new GetHandler()); //... }
- Parameters:
selector
- The Selector for the get query.handler
- The Handler to handle the incoming replies.options
- Optional GetOptions to configure the query.- Returns:
The handler's receiver.
-
get
final Unit get(IntoSelector selector, Callback<Reply> callback, GetOptions options)
Perform a get query, handling the replies with a Callback.
Example:
try (Session session = Zenoh.open(config)) { session.get(Selector.from("a/b/c"), reply -> System.out.println(reply)); //... }
- Parameters:
selector
- The Selector for the get query.callback
- The Callback to handle the incoming replies.options
- Optional GetOptions to configure the query.
-
put
final Unit put(KeyExpr keyExpr, IntoZBytes payload, PutOptions options)
Perform a put with the provided payload to the specified keyExpr.
Example:
session.put(KeyExpr.from("a/b/c"), ZBytes.from("Example payload")); //...
- Parameters:
keyExpr
- The KeyExpr for performing the put.payload
- The payload to put.options
- Optional PutOptions to configure the put.
-
put
final Unit put(KeyExpr keyExpr, String payload, PutOptions options)
Perform a put with the provided payload to the specified keyExpr.
Example:
session.put(KeyExpr.from("a/b/c"), "Example payload"); //...
- Parameters:
keyExpr
- The KeyExpr for performing the put.payload
- The payload to put as a string.options
- Optional PutOptions to configure the put.
-
delete
final Unit delete(KeyExpr keyExpr, DeleteOptions options)
Perform a delete operation to the specified keyExpr.
- Parameters:
keyExpr
- The KeyExpr for performing the delete operation.options
- Optional DeleteOptions to configure the delete operation.
-
info
final SessionInfo info()
Returns the SessionInfo of this session.
-
liveliness
final Liveliness liveliness()
Obtain a Liveliness instance tied to this Zenoh session.
-
-
-
-