diff --git a/openmessaging-api/src/main/java/io/openmessaging/api/Producer.java b/openmessaging-api/src/main/java/io/openmessaging/api/Producer.java index 76573660..ed5595a7 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/api/Producer.java +++ b/openmessaging-api/src/main/java/io/openmessaging/api/Producer.java @@ -16,13 +16,15 @@ */ package io.openmessaging.api; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutorService; + import io.openmessaging.api.exception.OMSDestinationException; import io.openmessaging.api.exception.OMSMessageFormatException; import io.openmessaging.api.exception.OMSRuntimeException; import io.openmessaging.api.exception.OMSSecurityException; import io.openmessaging.api.exception.OMSTimeOutException; -import java.util.Properties; -import java.util.concurrent.ExecutorService; /** * A {@code Producer} is a simple object used to send messages on behalf of a {@code MessagingAccessPoint}. An instance @@ -43,6 +45,23 @@ */ public interface Producer extends Admin { + /** + * Get metadata about the partitions for a given topic. This method will issue a remote call to the server if it + * does not already have any metadata about the given topic. + * + * @param topic + * @return + */ + Set topicPartitions(String topic); + + /** + * Register a callback for sensing topic metadata changes. + * + * @param topic + * @param callback + */ + void registerTopicPartitionChangedListener(String topic, TopicPartitionChangeListener callback); + /** * Sends a message to the specified destination synchronously, the destination should be preset to {@link * Message#setTopic(String)}, other header fields as well. @@ -57,6 +76,20 @@ public interface Producer extends Admin { */ SendResult send(final Message message); + /** + * Sends a message to the specified partition synchronously. + * + * @param message a message will be sent. + * @param topicPartition {@link TopicPartition}. + * @return the successful {@code SendResult}. + * @throws OMSSecurityException when have no authority to send messages to a given destination. + * @throws OMSMessageFormatException when an invalid message is specified. + * @throws OMSTimeOutException when the given timeout elapses before the send operation completes. + * @throws OMSDestinationException when have no given destination in the server. + * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. + */ + SendResult send(final Message message, TopicPartition topicPartition); + /** *

* There is no {@code Promise} related or {@code RuntimeException} thrown. The calling thread doesn't care about the @@ -66,6 +99,16 @@ public interface Producer extends Admin { */ void sendOneway(final Message message); + /** + *

+ * There is no {@code Promise} related or {@code RuntimeException} thrown. The calling thread doesn't care about the + * send result and also have no context to get the result. + * + * @param message a message will be sent. + * @param topicPartition {@link TopicPartition}. + */ + void sendOneway(final Message message, TopicPartition topicPartition); + /** * Sends a message to the specified destination asynchronously, the destination should be preset to {@link * Message#setTopic(String)}, other header fields as well. @@ -78,6 +121,18 @@ public interface Producer extends Admin { */ void sendAsync(final Message message, final SendCallback sendCallback); + /** + * Sends a message to the specified partition asynchronously. + *

+ * The returned {@code Promise} will have the result once the operation completes, and the registered {@link + * SendCallback} will be invoked, either because the operation was successful or because of an error. + * + * @param message a message will be sent. + * @param topicPartition {@link TopicPartition}. + * @param sendCallback {@link SendCallback} + */ + void sendAsync(final Message message, TopicPartition topicPartition, final SendCallback sendCallback); + /** * Set call back excutor * @param callbackExecutor diff --git a/openmessaging-api/src/main/java/io/openmessaging/api/TopicPartitionChangeListener.java b/openmessaging-api/src/main/java/io/openmessaging/api/TopicPartitionChangeListener.java new file mode 100644 index 00000000..36c1bced --- /dev/null +++ b/openmessaging-api/src/main/java/io/openmessaging/api/TopicPartitionChangeListener.java @@ -0,0 +1,13 @@ +package io.openmessaging.api; + +import java.util.Set; + +public interface TopicPartitionChangeListener { + /** + * This method will be invoked in the condition of partition numbers changed, These scenarios occur when the + * topic is expanded or shrunk. + * + * @param topicPartitions + */ + void onChanged(Set topicPartitions); +} \ No newline at end of file